From 9ab53cf3934270c167650e6c3a299e339a180683 2021-11-29 11:30:44 From: MH Date: 2021-11-29 11:30:44 Subject: [PATCH] Replace rerouting with lock-step confirmation --- diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 36b11b4505fa8a4dbeebcf94fb7fd23b140b799f..94adc0ef7b05e5ab570ecf219923b309aa5799ce 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -206,9 +206,8 @@ impl Scheduler { self.runtime.send_message(message.sending_component_id, ack_message); }, ControlContent::Ack => { - if let Some((target_component, new_control_message)) = scheduled.router.handle_ack(connector_id, message.id) { - self.debug_conn(connector_id, &format!("Sending message to {:?} [ack ack] \n --- {:?}", target_component, new_control_message)); - self.runtime.send_message(target_component, new_control_message); + if let Some(component_key) = scheduled.router.handle_ack(message.id) { + self.runtime.push_work(component_key); }; }, ControlContent::Ping => {}, @@ -303,13 +302,14 @@ impl Scheduler { while let Some(state_change) = scheduled.ctx.state_changes.pop_front() { match state_change { ComponentStateChange::CreatedComponent(component, initial_ports) => { - // Creating a new component. The creator needs to relinquish - // ownership of the ports that are given to the new - // component. All data messages that were intended for that - // port also needs to be transferred. - let new_key = self.runtime.create_pdl_component(component, false); - let new_connector = self.runtime.get_component_private(&new_key); - + // Creating a new component. Need to relinquish control of + // the ports. 
+ let new_component_key = self.runtime.create_pdl_component(component, false); + let new_connector = self.runtime.get_component_private(&new_component_key); + + // First pass: transfer ports and the associated messages, + // also count the number of ports that have peers + let mut num_peers = 0; for port_id in initial_ports { // Transfer messages associated with the transferred port scheduled.ctx.inbox.transfer_messages_for_port(port_id, &mut new_connector.ctx.inbox); @@ -321,23 +321,32 @@ impl Scheduler { let port = scheduled.ctx.ports.remove(port_index); new_connector.ctx.ports.push(port.clone()); - // Notify the peer that the port has changed, but only - // if the port wasn't already closed (otherwise the peer - // is gone). if port.state == PortState::Open { - let reroute_message = scheduled.router.prepare_reroute( - port.self_id, port.peer_id, scheduled.ctx.id, - port.peer_connector, new_connector.ctx.id, - &mut new_connector.router - ); - - self.debug_conn(connector_id, &format!("Sending message to {:?} [newcon]\n --- {:?}", port.peer_connector, reroute_message)); - self.runtime.send_message(port.peer_connector, Message::Control(reroute_message)); + num_peers += 1; } } - // Schedule new connector to run - self.runtime.push_work(new_key); + if num_peers == 0 { + // No peers to notify, so just schedule the component + self.runtime.push_work(new_component_key); + } else { + // Some peers to notify + let new_component_id = new_component_key.downcast(); + let control_id = scheduled.router.prepare_new_component(new_component_key); + for port in new_connector.ctx.ports.iter() { + if port.state == PortState::Closed { + continue; + } + + let control_message = scheduled.router.prepare_changed_port_peer( + control_id, scheduled.ctx.id, + port.peer_connector, port.peer_id, + new_component_id, port.self_id + ); + self.debug_conn(connector_id, &format!("Sending message to {:?} [newcom]\n --- {:#?}", port.peer_connector, control_message)); + 
self.runtime.send_message(port.peer_connector, Message::Control(control_message)); + } + } }, ComponentStateChange::CreatedPort(port) => { scheduled.ctx.ports.push(port); @@ -854,16 +863,32 @@ struct ControlEntry { } enum ControlVariant { + NewComponent(ControlNewComponent), ChangedPort(ControlChangedPort), ClosedChannel(ControlClosedChannel), - ReroutePending, +} + +impl ControlVariant { + fn as_new_component_mut(&mut self) -> &mut ControlNewComponent { + match self { + ControlVariant::NewComponent(v) => v, + _ => unreachable!(), + } + } +} + +/// Entry for a new component waiting for execution after all of its peers have +/// confirmed the `ControlChangedPort` messages. +struct ControlNewComponent { + num_acks_pending: u32, // if it hits 0, we schedule the component + component_key: ConnectorKey, // this is the component we schedule } struct ControlChangedPort { - target_port: PortIdLocal, // if send to this port, then reroute - source_connector: ConnectorId, // connector we expect messages from - target_connector: ConnectorId, // connector we need to reroute to - id_of_ack_after_confirmation: u32, // control message ID we need to send to the target upon receiving an ack + reroute_if_sent_to_this_port: PortIdLocal, // if sent to this port, then reroute + source_connector: ConnectorId, // connector we expect messages from + target_connector: ConnectorId, // connector we need to reroute to + new_component_entry_id: u32, // if Ack'd, we reduce the counter on this `ControlNewComponent` entry } struct ControlClosedChannel { @@ -907,37 +932,49 @@ impl ControlMessageHandler { }; } - /// Prepares rerouting messages due to changed ownership of a port. The - /// control message returned by this function must be sent to the - /// transferred port's peer connector. 
- pub fn prepare_reroute( - &mut self, - port_id: PortIdLocal, peer_port_id: PortIdLocal, - self_connector_id: ConnectorId, peer_connector_id: ConnectorId, - new_owner_connector_id: ConnectorId, new_owner_ctrl_handler: &mut ControlMessageHandler, - ) -> ControlMessage { + /// Prepares a control entry for a new component. This returns the id of + /// the entry for calls to `prepare_changed_port_peer`. Don't call this + /// function if the component has no peers that need to be messaged. + pub fn prepare_new_component(&mut self, component_key: ConnectorKey) -> u32 { let id = self.take_id(); - - let new_owner_id = new_owner_ctrl_handler.take_id(); self.active.push(ControlEntry{ id, - variant: ControlVariant::ChangedPort(ControlChangedPort{ - target_port: port_id, - source_connector: peer_connector_id, - target_connector: new_owner_connector_id, - id_of_ack_after_confirmation: new_owner_id, + variant: ControlVariant::NewComponent(ControlNewComponent{ + num_acks_pending: 0, + component_key, }), }); - new_owner_ctrl_handler.active.push(ControlEntry{ - id: new_owner_id, - variant: ControlVariant::ReroutePending, + return id; + } + + pub fn prepare_changed_port_peer( + &mut self, new_component_entry_id: u32, creating_component_id: ConnectorId, + changed_component_id: ConnectorId, changed_port_id: PortIdLocal, + new_target_component_id: ConnectorId, new_target_port_id: PortIdLocal + ) -> ControlMessage { + // Add the peer-changed entry + let change_port_entry_id = self.take_id(); + self.active.push(ControlEntry{ + id: change_port_entry_id, + variant: ControlVariant::ChangedPort(ControlChangedPort{ + reroute_if_sent_to_this_port: new_target_port_id, + source_connector: changed_component_id, + target_connector: new_target_component_id, + new_component_entry_id, + }) }); - return ControlMessage { - id, - sending_component_id: self_connector_id, - content: ControlContent::PortPeerChanged(peer_port_id, new_owner_connector_id), + // Increment counter on "new component" entry + 
let position = self.position(new_component_entry_id).unwrap(); + let new_component_entry = &mut self.active[position]; + let new_component_entry = new_component_entry.variant.as_new_component_mut(); + new_component_entry.num_acks_pending += 1; + + return ControlMessage{ + id: change_port_entry_id, + sending_component_id: creating_component_id, + content: ControlContent::PortPeerChanged(changed_port_id, new_target_component_id), }; } @@ -946,7 +983,7 @@ impl ControlMessageHandler { pub fn should_reroute(&self, target_port: PortIdLocal) -> Option<ConnectorId> { for entry in &self.active { if let ControlVariant::ChangedPort(entry) = &entry.variant { - if entry.target_port == target_port { + if entry.reroute_if_sent_to_this_port == target_port { // Need to reroute this message return Some(entry.target_connector); } @@ -958,26 +995,36 @@ impl ControlMessageHandler { /// Handles an Ack as an answer to a previously sent control message. /// Handling an Ack might spawn a new message that needs to be sent. - pub fn handle_ack(&mut self, handler_component_id: ConnectorId, id: u32) -> Option<(ConnectorId, Message)> { - let index = self.active.iter() - .position(|v| v.id == id); + pub fn handle_ack(&mut self, id: u32) -> Option<ConnectorKey> { + let index = self.position(id); match index { Some(index) => { - let removed = self.active.remove(index); - match removed.variant { + // Remove the entry. If `ChangedPort`, then retrieve associated + // `NewComponent`. 
Otherwise: early exits + let removed_entry = self.active.remove(index); + let new_component_idx = match removed_entry.variant { ControlVariant::ChangedPort(message) => { - return Some(( - message.target_connector, - Message::Control(ControlMessage{ - id: message.id_of_ack_after_confirmation, - sending_component_id: handler_component_id, - content: ControlContent::Ack - }) - )); + self.position(message.new_component_entry_id).unwrap() }, _ => return None, + }; + + // Decrement counter, if 0, then schedule component + let new_component_entry = self.active[new_component_idx].variant.as_new_component_mut(); + new_component_entry.num_acks_pending -= 1; + if new_component_entry.num_acks_pending != 0 { + return None; } + + // Return component key for scheduling + let new_component_entry = self.active.remove(new_component_idx); + let new_component_entry = match new_component_entry.variant { + ControlVariant::NewComponent(entry) => entry, + _ => unreachable!(), + }; + + return Some(new_component_entry.component_key); }, None => { todo!("handling of nefarious ACKs"); @@ -1000,4 +1047,9 @@ impl ControlMessageHandler { return generated_id; } + + #[inline] + fn position(&self, id: u32) -> Option<usize> { + return self.active.iter().position(|v| v.id == id); + } } \ No newline at end of file diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index c0fd39082af541d28257a921922101ba7993011a..31d3f161c6d982dd33fee742b5f57224dd756947 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -11,13 +11,13 @@ use crate::protocol::eval::*; use crate::runtime2::native::{ApplicationSyncAction}; // Generic testing constants, use when appropriate to simplify stress-testing -// pub(crate) const NUM_THREADS: u32 = 8; // number of threads in runtime -// pub(crate) const NUM_INSTANCES: u32 = 250; // number of test instances constructed -// pub(crate) const NUM_LOOPS: u32 = 10; // number of loops within a single test (not used by all tests) +pub(crate) const NUM_THREADS: 
u32 = 8; // number of threads in runtime +pub(crate) const NUM_INSTANCES: u32 = 1500; // number of test instances constructed +pub(crate) const NUM_LOOPS: u32 = 10; // number of loops within a single test (not used by all tests) -pub(crate) const NUM_THREADS: u32 = 6; -pub(crate) const NUM_INSTANCES: u32 = 1; -pub(crate) const NUM_LOOPS: u32 = 15; +// pub(crate) const NUM_THREADS: u32 = 6; +// pub(crate) const NUM_INSTANCES: u32 = 1; +// pub(crate) const NUM_LOOPS: u32 = 15; fn create_runtime(pdl: &str) -> Runtime {