diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index e9680d4ed5cb218f9167365de80b67554db3b3fc..1c285fb9c3124e9b303d560e87ca230c80b5ae70 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -53,6 +53,8 @@ impl ConnectorCtx { } } +// Because it contains pointers we're going to do a copy by value on this one +#[derive(Clone, Copy)] pub(crate) struct SchedulerCtx<'a> { pub(crate) runtime: &'a RuntimeInner } @@ -62,10 +64,6 @@ pub(crate) struct Scheduler { scheduler_id: u32, } -// Thinking aloud: actual ports should be accessible by connector, but managed -// by the scheduler (to handle rerouting messages). We could just give a read- -// only context, instead of an extra call on the "Connector" trait. - impl Scheduler { pub fn new(runtime: Arc, scheduler_id: u32) -> Self { return Self{ runtime, scheduler_id }; @@ -144,9 +142,12 @@ impl Scheduler { } // Actually run the connector + println!("DEBUG [{}]: Running {} ...", scheduler_id, connector_key.index); + let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime }; let new_schedule = scheduled.connector.run( - &self.runtime.protocol_description, &scheduled.context, &mut delta_state + scheduler_ctx, &scheduled.context, &mut delta_state ); + println!("DEBUG [{}]: ... Finished running {}", scheduler_id, connector_key.index); // Handle all of the output from the current run: messages to // send and connectors to instantiate. @@ -185,9 +186,19 @@ impl Scheduler { } }, ConnectorScheduling::Exit => { - // TODO: Better way of doing this, when exiting then - // connected components must know their channels are invalid - self.runtime.destroy_component(connector_key); + // Prepare for exit. Set the shutdown flag and broadcast + // messages to notify peers of closing channels + scheduled.shutting_down = true; + for port in &scheduled.context.ports { + self.runtime.send_message(port.peer_connector, Message{ + sending_connector: connector_key.downcast(), + receiving_port: port.peer_id, + contents: MessageContents::Control(ControlMessage{ + id: 0, + content: ControlMessageVariant::Ack + }) + }) + } } } } @@ -221,7 +232,7 @@ impl Scheduler { receiving_port: PortIdLocal::new_invalid(), contents: MessageContents::ConfirmCommit(contents.clone()), }; - self.send_message_and_wake_up_if_sleeping(*to_visit, message); + self.runtime.send_message(*to_visit, message); } (ConnectorId::new_invalid(), PortIdLocal::new_invalid()) }, @@ -239,7 +250,7 @@ impl Scheduler { receiving_port: peer_port, contents: message, }; - self.send_message_and_wake_up_if_sleeping(peer_connector, message); + self.runtime.send_message(peer_connector, message); } } } @@ -264,12 +275,13 @@ impl Scheduler { // let the other end of the channel know that the port has // changed location. for port in &new_connector.context.ports { + cur_connector.pending_acks += 1; let reroute_message = cur_connector.router.prepare_reroute( port.self_id, port.peer_id, cur_connector.context.id, port.peer_connector, new_connector.context.id ); - self.send_message_and_wake_up_if_sleeping(port.peer_connector, reroute_message); + self.runtime.send_message(port.peer_connector, reroute_message); } // Schedule new connector to run @@ -281,39 +293,40 @@ impl Scheduler { debug_assert!(delta_state.new_ports.is_empty()); debug_assert!(delta_state.new_connectors.is_empty()); } +} - fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) { - let connector = self.runtime.get_component_public(connector_id); +// ----------------------------------------------------------------------------- +// Control messages +// ----------------------------------------------------------------------------- - connector.inbox.insert_message(message); - let should_wake_up = connector.sleeping - .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) - .is_ok(); +struct ControlEntry { + id: u32, + variant: ControlVariant, +} - if should_wake_up { - let key = unsafe { ConnectorKey::from_id(connector_id) }; - self.runtime.push_work(key); - } - } +enum ControlVariant { + ChangedPort(ControlChangedPort), + ClosedChannel(ControlClosedChannel), } -/// Represents a rerouting entry due to a moved port -// TODO: Optimize -struct ReroutedTraffic { - id: u32, // ID of control message - target_port: PortIdLocal, // targeted port +struct ControlChangedPort { + target_port: PortIdLocal, // if send to this port, then reroute source_connector: ConnectorId, // connector we expect messages from - target_connector: ConnectorId, // connector they should be rerouted to + target_connector: ConnectorId, // connector we need to reroute to +} + +struct ControlClosedChannel { + } -pub(crate) struct Router { +pub(crate) struct ControlMessageHandler { id_counter: u32, - active: Vec, + active: Vec, } -impl Router { +impl ControlMessageHandler { pub fn new() -> Self { - Router{ + ControlMessageHandler { id_counter: 0, active: Vec::new(), } @@ -341,7 +354,7 @@ impl Router { return Message{ sending_connector: self_connector_id, - receiving_port: PortIdLocal::new_invalid(), + receiving_port: peer_port_id, contents: MessageContents::Control(ControlMessage{ id, content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id),