diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs
index 76970d1e2f9c3558f0e82ab1b8cb222cae795f4d..34023b2d4f910d5eadec4fd066abd37ce49737c4 100644
--- a/src/runtime2/inbox.rs
+++ b/src/runtime2/inbox.rs
@@ -187,6 +187,7 @@ pub struct ControlMessage {
 #[derive(Debug, Clone)]
 pub enum ControlMessageVariant {
     ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port
+    CloseChannel(PortIdLocal), // close the port associated with this ID
     Ack, // acknowledgement of previous control message, matching occurs through control message ID.
 }
diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs
index a98334a747a63370f502708fc830e019e335b43c..32b1be9a1b059aa7e1b4c2aef2380acd93708848 100644
--- a/src/runtime2/mod.rs
+++ b/src/runtime2/mod.rs
@@ -99,7 +99,6 @@ pub(crate) struct ScheduledConnector {
     pub public: ConnectorPublic, // accessible by all schedulers and connectors
     pub router: ControlMessageHandler,
     pub shutting_down: bool,
-    pub pending_acks: u32,
 }
 
 // -----------------------------------------------------------------------------
diff --git a/src/runtime2/port.rs b/src/runtime2/port.rs
index a4503b7ca237a1a045c634fc4210715ef8d9cf61..42b9ee27cfa77e70e25dc084aa8a382aeeefd9f3 100644
--- a/src/runtime2/port.rs
+++ b/src/runtime2/port.rs
@@ -27,14 +27,22 @@ pub enum PortKind {
     Getter,
 }
 
-/// Represents a port inside of the runtime. May be without owner if it is
-/// created by the application interfacing with the runtime, instead of being
-/// created by a connector.
+#[derive(Debug, Eq, PartialEq)]
+pub enum PortState {
+    Open,
+    Closed,
+}
+
+/// Represents a port inside of the runtime. This is generally the local view of
+/// a connector on its port, which may not be consistent with the rest of the
+/// global system (e.g. its peer was moved to a new connector, or the peer might
+/// have died in the meantime, so it is no longer usable).
 pub struct Port {
     pub self_id: PortIdLocal,
     pub peer_id: PortIdLocal,
     pub kind: PortKind,
-    pub peer_connector: ConnectorId, // might be temporarily inconsistent while peer port is sent around in non-sync phase.
+    pub state: PortState,
+    pub peer_connector: ConnectorId, // might be temporarily inconsistent while peer port is sent around in non-sync phase
 }
diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs
index 1c285fb9c3124e9b303d560e87ca230c80b5ae70..2f6607005df909e7d0a6eb7849a81ec5f566af7b 100644
--- a/src/runtime2/scheduler.rs
+++ b/src/runtime2/scheduler.rs
@@ -2,7 +2,7 @@ use std::sync::Arc;
 use std::sync::atomic::Ordering;
 
 use super::{RuntimeInner, ConnectorId, ConnectorKey};
-use super::port::{Port, PortIdLocal};
+use super::port::{Port, PortState, PortIdLocal};
 use super::native::Connector;
 use super::connector::{ConnectorScheduling, RunDeltaState};
 use super::inbox::{Message, MessageContents, ControlMessageVariant, ControlMessage};
@@ -87,6 +87,7 @@ impl Scheduler {
 
             // We have something to do
            let connector_key = connector_key.unwrap();
+            let connector_id = connector_key.downcast();
            println!("DEBUG [{}]: ... Got work, running {}", scheduler_id, connector_key.index);
 
            let scheduled = self.runtime.get_component_private(&connector_key);
@@ -119,10 +120,11 @@
                                    // messages in the outbox that send to the port whose owning
                                    // connector we just changed. This is because the `ack` will
                                    // clear the rerouting entry of the `ack`-receiver.
-                                    self.send_message_and_wake_up_if_sleeping(
+                                    // TODO: Question from Max from the past: what the hell did you mean?
+                                    self.runtime.send_message(
                                         message.sending_connector,
                                         Message{
-                                            sending_connector: connector_key.downcast(),
+                                            sending_connector: connector_id,
                                             receiving_port: PortIdLocal::new_invalid(),
                                             contents: MessageContents::Control(ControlMessage{
                                                 id: content.id,
@@ -131,6 +133,25 @@
                                         }
                                     );
                                },
+                                ControlMessageVariant::CloseChannel(port_id) => {
+                                    // Mark the port as being closed
+                                    let port = scheduled.context.get_port_mut(port_id);
+                                    port.state = PortState::Closed;
+
+                                    // Send an Ack
+                                    self.runtime.send_message(
+                                        message.sending_connector,
+                                        Message{
+                                            sending_connector: connector_id,
+                                            receiving_port: PortIdLocal::new_invalid(),
+                                            contents: MessageContents::Control(ControlMessage{
+                                                id: content.id,
+                                                content: ControlMessageVariant::Ack,
+                                            }),
+                                        }
+                                    );
+
+                                },
                                ControlMessageVariant::Ack => {
                                    scheduled.router.handle_ack(content.id);
                                }
@@ -141,19 +162,33 @@
                    }
                }
 
-                // Actually run the connector
-                println!("DEBUG [{}]: Running {} ...", scheduler_id, connector_key.index);
-                let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
-                let new_schedule = scheduled.connector.run(
-                    scheduler_ctx, &scheduled.context, &mut delta_state
-                );
-                println!("DEBUG [{}]: ... Finished running {}", scheduler_id, connector_key.index);
+                // Run the main behaviour of the connector, depending on its
+                // current state.
+                if scheduled.shutting_down {
+                    // Nothing to do. But we're still waiting for all our pending
+                    // control messages to be answered.
+                    if scheduled.router.num_pending_acks() == 0 {
+                        // We're actually done, so we can safely destroy the
+                        // currently running connector
+                        self.runtime.destroy_component(connector_key);
+                        continue 'thread_loop;
+                    } else {
+                        cur_schedule = ConnectorScheduling::NotNow;
+                    }
+                } else {
+                    println!("DEBUG [{}]: Running {} ...", scheduler_id, connector_key.index);
+                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
+                    let new_schedule = scheduled.connector.run(
+                        scheduler_ctx, &scheduled.context, &mut delta_state
+                    );
+                    println!("DEBUG [{}]: ... Finished running {}", scheduler_id, connector_key.index);
 
-                // Handle all of the output from the current run: messages to
-                // send and connectors to instantiate.
-                self.handle_delta_state(&connector_key, &mut scheduled.context, &mut delta_state);
+                    // Handle all of the output from the current run: messages to
+                    // send and connectors to instantiate.
+                    self.handle_delta_state(&connector_key, &mut scheduled.context, &mut delta_state);
 
-                cur_schedule = new_schedule;
+                    cur_schedule = new_schedule;
+                }
            }
 
            // If here then the connector does not require immediate execution.
@@ -190,14 +225,11 @@ impl Scheduler {
                // messages to notify peers of closing channels
                scheduled.shutting_down = true;
                for port in &scheduled.context.ports {
-                    self.runtime.send_message(port.peer_connector, Message{
-                        sending_connector: connector_key.downcast(),
-                        receiving_port: port.peer_id,
-                        contents: MessageContents::Control(ControlMessage{
-                            id: 0,
-                            content: ControlMessageVariant::Ack
-                        })
-                    })
+                    let message = scheduled.router.prepare_closing_channel(
+                        port.self_id, port.peer_id,
+                        connector_id
+                    );
+                    self.runtime.send_message(port.peer_connector, message);
                }
            }
        }
@@ -316,7 +348,8 @@ struct ControlChangedPort {
 }
 
 struct ControlClosedChannel {
-
+    source_port: PortIdLocal,
+    target_port: PortIdLocal,
 }
 
 pub(crate) struct ControlMessageHandler {
@@ -332,6 +365,32 @@ impl ControlMessageHandler {
        }
    }
 
+    /// Prepares a message indicating that a channel has been closed. We keep a
+    /// local entry to match against the (hopefully) returned `Ack` message.
+    pub fn prepare_closing_channel(
+        &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal,
+        self_connector_id: ConnectorId
+    ) -> Message {
+        let id = self.take_id();
+
+        self.active.push(ControlEntry{
+            id,
+            variant: ControlVariant::ClosedChannel(ControlClosedChannel{
+                source_port: self_port_id,
+                target_port: peer_port_id,
+            }),
+        });
+
+        return Message{
+            sending_connector: self_connector_id,
+            receiving_port: peer_port_id,
+            contents: MessageContents::Control(ControlMessage{
+                id,
+                content: ControlMessageVariant::CloseChannel(peer_port_id),
+            }),
+        };
+    }
+
    /// Prepares rerouting messages due to changed ownership of a port. The
    /// control message returned by this function must be sent to the
    /// transferred port's peer connector.
@@ -341,15 +400,15 @@ impl ControlMessageHandler {
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
        new_owner_connector_id: ConnectorId
    ) -> Message {
-        let id = self.id_counter;
-        let (new_id_counter, _) = self.id_counter.overflowing_add(1);
-        self.id_counter = new_id_counter;
+        let id = self.take_id();
 
-        self.active.push(ReroutedTraffic{
+        self.active.push(ControlEntry{
            id,
-            target_port: port_id,
-            source_connector: peer_connector_id,
-            target_connector: new_owner_connector_id,
+            variant: ControlVariant::ChangedPort(ControlChangedPort{
+                target_port: port_id,
+                source_connector: peer_connector_id,
+                target_connector: new_owner_connector_id,
+            }),
        });
 
        return Message{
@@ -365,11 +424,13 @@
    /// Returns true if the supplied message should be rerouted. If so then this
    /// function returns the connector that should retrieve this message.
    pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option<ConnectorId> {
-        for reroute in &self.active {
-            if reroute.source_connector == sending_connector &&
-                reroute.target_port == target_port {
-                // Need to reroute this message
-                return Some(reroute.target_connector);
+        for entry in &self.active {
+            if let ControlVariant::ChangedPort(entry) = entry {
+                if entry.source_connector == sending_connector &&
+                    entry.target_port == target_port {
+                    // Need to reroute this message
+                    return Some(entry.target_connector);
+                }
            }
        }
@@ -386,4 +447,19 @@
            None => { todo!("handling of nefarious ACKs"); },
        }
    }
+
+    /// Retrieves the number of responses we still expect to receive from our
+    /// peers.
+    #[inline]
+    pub fn num_pending_acks(&self) -> usize {
+        return self.active.len();
+    }
+
+    fn take_id(&mut self) -> u32 {
+        let generated_id = self.id_counter;
+        let (new_id, _) = self.id_counter.overflowing_add(1);
+        self.id_counter = new_id;
+
+        return generated_id;
+    }
 }
\ No newline at end of file
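Taken together, the patch replaces the old fire-and-forget Ack on shutdown with a proper close-channel handshake: the shutting-down connector registers a pending entry and sends CloseChannel for each of its ports, the peer marks its local port as Closed and answers with Ack, and the scheduler only destroys the shutting-down connector once num_pending_acks() reaches zero. The sketch below illustrates that sequence in isolation; the Mini* types are hypothetical stand-ins used only for this illustration (not the runtime's actual Port, Message, or ControlMessageHandler types), and the exchange is shown single-threaded, without schedulers or inboxes.

// Minimal, self-contained sketch of the close-channel handshake, assuming
// hypothetical Mini* types rather than the runtime's real API.

#[derive(Debug, PartialEq, Eq)]
enum MiniPortState { Open, Closed }

#[derive(Debug)]
enum MiniControl {
    CloseChannel(u32), // close the peer's port with this (local) id
    Ack,               // acknowledges the control message with the same control_id
}

#[derive(Debug)]
struct MiniMessage {
    control_id: u32,
    content: MiniControl,
}

#[derive(Default)]
struct MiniControlHandler {
    id_counter: u32,
    pending: Vec<u32>, // control ids still awaiting an Ack
}

impl MiniControlHandler {
    // Mirrors prepare_closing_channel: record a pending entry, hand back the message to send.
    fn prepare_closing_channel(&mut self, peer_port: u32) -> MiniMessage {
        let id = self.id_counter;
        self.id_counter = self.id_counter.wrapping_add(1);
        self.pending.push(id);
        MiniMessage { control_id: id, content: MiniControl::CloseChannel(peer_port) }
    }

    // Mirrors handle_ack: drop the matching pending entry.
    fn handle_ack(&mut self, id: u32) {
        if let Some(index) = self.pending.iter().position(|&pending_id| pending_id == id) {
            self.pending.remove(index);
        }
    }

    // Mirrors num_pending_acks: how many peers still have to answer.
    fn num_pending_acks(&self) -> usize {
        self.pending.len()
    }
}

fn main() {
    // Connector A shuts down; its single port's peer (port id 7) lives on connector B.
    let mut router_a = MiniControlHandler::default();
    let mut port_b_state = MiniPortState::Open;
    assert_eq!(port_b_state, MiniPortState::Open);

    // A announces the closing channel and keeps a pending entry for the expected Ack.
    let close_msg = router_a.prepare_closing_channel(7);
    assert_eq!(router_a.num_pending_acks(), 1);

    // B receives CloseChannel, marks its local port as closed, and replies with an Ack.
    let ack_msg = match close_msg.content {
        MiniControl::CloseChannel(_port_id) => {
            port_b_state = MiniPortState::Closed;
            MiniMessage { control_id: close_msg.control_id, content: MiniControl::Ack }
        }
        MiniControl::Ack => unreachable!("B only receives CloseChannel in this sketch"),
    };

    // A processes the Ack; once nothing is pending, the scheduler may destroy the
    // shutting-down connector (the num_pending_acks() == 0 check in the patch).
    if let MiniControl::Ack = ack_msg.content {
        router_a.handle_ack(ack_msg.control_id);
    }
    assert_eq!(port_b_state, MiniPortState::Closed);
    assert_eq!(router_a.num_pending_acks(), 0);
    println!("close-channel handshake complete");
}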