diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index f54d36201d575e58d5f2d89c450ca272f575d4b7..eb8ccbae8bcebfe6e4c6279c845c0c23e0065445 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -68,7 +68,7 @@ impl Scheduler { } else { self.debug_conn(connector_id, "Running ..."); let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime }; - let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx_fancy); + let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx); self.debug_conn(connector_id, "Finished running"); // Handle all of the output from the current run: messages to @@ -98,7 +98,7 @@ impl Scheduler { // Prepare for exit. Set the shutdown flag and broadcast // messages to notify peers of closing channels scheduled.shutting_down = true; - for port in &scheduled.ctx_fancy.ports { + for port in &scheduled.ctx.ports { if port.state != PortState::Closed { let message = scheduled.router.prepare_closing_channel( port.self_id, port.peer_id, @@ -123,7 +123,7 @@ impl Scheduler { /// Receiving messages from the public inbox and handling them or storing /// them in the component's private inbox fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) { - let connector_id = scheduled.ctx_fancy.id; + let connector_id = scheduled.ctx.id; while let Some(message) = scheduled.public.inbox.take_message() { // Check if the message has to be rerouted because we have moved the @@ -145,14 +145,14 @@ impl Scheduler { match message.content { ControlContent::PortPeerChanged(port_id, new_target_connector_id) => { // Need to change port target - let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap(); + let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap(); port.peer_connector = new_target_connector_id; // Note: for simplicity we program the scheduler to always finish // running a connector with an empty outbox. If this ever changes // then accepting the "port peer changed" message implies we need // to change the recipient of the message in the outbox. - debug_assert!(scheduled.ctx_fancy.outbox.is_empty()); + debug_assert!(scheduled.ctx.outbox.is_empty()); // And respond with an Ack let ack_message = Message::Control(ControlMessage { @@ -165,7 +165,7 @@ impl Scheduler { }, ControlContent::CloseChannel(port_id) => { // Mark the port as being closed - let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap(); + let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap(); port.state = PortState::Closed; // Send an Ack @@ -185,7 +185,7 @@ impl Scheduler { }, _ => { // All other cases have to be handled by the component - scheduled.ctx_fancy.inbox_messages.push(message); + scheduled.ctx.inbox_messages.push(message); } } } @@ -196,17 +196,17 @@ impl Scheduler { /// component's state that the scheduler needs to know about (e.g. a message /// that the component wants to send, a port that has been added). fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) { - let connector_id = scheduled.ctx_fancy.id; + let connector_id = scheduled.ctx.id; // Handling any messages that were sent - while let Some(message) = scheduled.ctx_fancy.outbox.pop_front() { + while let Some(message) = scheduled.ctx.outbox.pop_front() { self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message)); let target_component_id = match &message { Message::Data(content) => { // Data messages are always sent to a particular port, and // may end up being rerouted. - let port_desc = scheduled.ctx_fancy.get_port_by_id(content.data_header.sending_port).unwrap(); + let port_desc = scheduled.ctx.get_port_by_id(content.data_header.sending_port).unwrap(); debug_assert_eq!(port_desc.peer_id, content.data_header.target_port); if port_desc.state == PortState::Closed { @@ -230,7 +230,7 @@ impl Scheduler { self.runtime.send_message(target_component_id, message); } - while let Some(state_change) = scheduled.ctx_fancy.state_changes.pop_front() { + while let Some(state_change) = scheduled.ctx.state_changes.pop_front() { match state_change { ComponentStateChange::CreatedComponent(component, initial_ports) => { // Creating a new component. The creator needs to relinquish @@ -243,28 +243,28 @@ impl Scheduler { for port_id in initial_ports { // Transfer messages associated with the transferred port let mut message_idx = 0; - while message_idx < scheduled.ctx_fancy.inbox_messages.len() { - let message = &scheduled.ctx_fancy.inbox_messages[message_idx]; + while message_idx < scheduled.ctx.inbox_messages.len() { + let message = &scheduled.ctx.inbox_messages[message_idx]; if Self::get_message_target_port(message) == Some(port_id) { // Need to transfer this message - let message = scheduled.ctx_fancy.inbox_messages.remove(message_idx); - new_connector.ctx_fancy.inbox_messages.push(message); + let message = scheduled.ctx.inbox_messages.remove(message_idx); + new_connector.ctx.inbox_messages.push(message); } else { message_idx += 1; } } // Transfer the port itself - let port_index = scheduled.ctx_fancy.ports.iter() + let port_index = scheduled.ctx.ports.iter() .position(|v| v.self_id == port_id) .unwrap(); - let port = scheduled.ctx_fancy.ports.remove(port_index); - new_connector.ctx_fancy.ports.push(port.clone()); + let port = scheduled.ctx.ports.remove(port_index); + new_connector.ctx.ports.push(port.clone()); // Notify the peer that the port has changed let reroute_message = scheduled.router.prepare_reroute( - port.self_id, port.peer_id, scheduled.ctx_fancy.id, - port.peer_connector, new_connector.ctx_fancy.id + port.self_id, port.peer_id, scheduled.ctx.id, + port.peer_connector, new_connector.ctx.id ); self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message)); @@ -275,38 +275,38 @@ impl Scheduler { self.runtime.push_work(new_key); }, ComponentStateChange::CreatedPort(port) => { - scheduled.ctx_fancy.ports.push(port); + scheduled.ctx.ports.push(port); }, ComponentStateChange::ChangedPort(port_change) => { if port_change.is_acquired { - scheduled.ctx_fancy.ports.push(port_change.port); + scheduled.ctx.ports.push(port_change.port); } else { - let index = scheduled.ctx_fancy.ports + let index = scheduled.ctx.ports .iter() .position(|v| v.self_id == port_change.port.self_id) .unwrap(); - scheduled.ctx_fancy.ports.remove(index); + scheduled.ctx.ports.remove(index); } } } } // Finally, check if we just entered or just left a sync region - if scheduled.ctx_fancy.changed_in_sync { - if scheduled.ctx_fancy.is_in_sync { + if scheduled.ctx.changed_in_sync { + if scheduled.ctx.is_in_sync { // Just entered sync region } else { // Just left sync region. So clear inbox - scheduled.ctx_fancy.inbox_messages.clear(); - scheduled.ctx_fancy.inbox_len_read = 0; + scheduled.ctx.inbox_messages.clear(); + scheduled.ctx.inbox_len_read = 0; } - scheduled.ctx_fancy.changed_in_sync = false; // reset flag + scheduled.ctx.changed_in_sync = false; // reset flag } } fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) { - debug_assert_eq!(connector_key.index, connector.ctx_fancy.id.0); + debug_assert_eq!(connector_key.index, connector.ctx.id.0); debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false); // This is the running connector, and only the running connector may