From 3f2759e5fc57c9fa46bdf6f3fa096e0460bc7a98 2021-10-25 13:08:11
From: MH
Date: 2021-10-25 13:08:11
Subject: [PATCH] somewhat correctly handling port closing and rerouting

---

diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs
index 419df819b60b039f6930dbfea3d6269311a82723..a79d9810109b7215372f2f19213c6546221829ec 100644
--- a/src/runtime2/connector.rs
+++ b/src/runtime2/connector.rs
@@ -61,6 +61,7 @@ pub(crate) struct Branch {
     code_state: ComponentState,
     prepared_channel: Option<(Value, Value)>,
     sync_state: SpeculativeState,
+    halted_at_port: PortIdLocal, // invalid if not halted
     next_branch_in_queue: Option<u32>,
     // Message/port state
     received: HashMap, // TODO: @temporary, remove together with fires()
@@ -77,6 +78,7 @@ impl Branch {
             code_state: component_state,
             prepared_channel: None,
             sync_state: SpeculativeState::RunningNonSync,
+            halted_at_port: PortIdLocal::new_invalid(),
             next_branch_in_queue: None,
             received: HashMap::new(),
             ports_delta: Vec::new(),
@@ -98,6 +100,7 @@ impl Branch {
             code_state: parent_branch.code_state.clone(),
             prepared_channel: None,
             sync_state: SpeculativeState::RunningInSync,
+            halted_at_port: PortIdLocal::new_invalid(),
             next_branch_in_queue: None,
             received: parent_branch.received.clone(),
             ports_delta: parent_branch.ports_delta.clone(),
@@ -105,7 +108,13 @@ impl Branch {
     }

     fn commit_to_sync(&mut self) {
-        self.index = BranchId::new(0);
+        // Logically impossible conditions (because we have a finished branch
+        // we are going to commit to)
+        debug_assert!(self.prepared_channel.is_none());
+        debug_assert!(!self.halted_at_port.is_valid());
+
+        // Reset other variables to their defaults
+        self.index = BranchId::new_invalid();
         self.parent_index = BranchId::new_invalid();
         self.sync_state = SpeculativeState::RunningNonSync;
         self.next_branch_in_queue = None;
@@ -404,6 +413,38 @@ impl Connector for ConnectorPDL {
     fn run(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
         if self.in_sync {
+            // Check for new messages we haven't seen before. If any of the
+            // pending branches can accept the message, do so.
+            while let Some((target_port_id, message)) = self.inbox.next_message() {
+                let mut branch_idx = self.sync_pending_get.first;
+                while branch_idx != 0 {
+                    let branch = &self.branches[branch_idx as usize];
+                    let next_branch_idx = branch.next_branch_in_queue.unwrap_or(0);
+
+                    let target_port_index = self.ports.get_port_index(*target_port_id).unwrap();
+                    let port_mapping = self.ports.get_port(branch_idx, target_port_index);
+
+                    if branch.sync_state == SpeculativeState::HaltedAtBranchPoint &&
+                            branch.halted_at_port == *target_port_id &&
+                            port_mapping.last_registered_branch_id == message.sender_prev_branch_id {
+                        // Branch may accept this message, so create a fork that
+                        // contains this message in the inbox.
+                        let new_branch_idx = self.branches.len() as u32;
+                        let new_branch = Branch::new_sync_branching_from(new_branch_idx, branch);
+
+                        self.ports.prepare_sync_branch(branch_idx, new_branch_idx);
+                        let mapping = self.ports.get_port_mut(branch_idx, target_port_index);
+                        mapping.last_registered_branch_id = message.sender_cur_branch_id;
+
+                        let new_branch_id = BranchId::new(new_branch_idx);
+                        self.branches.push(new_branch);
+                        Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id)
+                    }
+
+                    branch_idx = next_branch_idx;
+                }
+            }
+
             let scheduling = self.run_in_speculative_mode(sched_ctx, conn_ctx, delta_state);

             // When in speculative mode we might have generated new sync
@@ -486,7 +527,6 @@ impl ConnectorPDL {
     // Handling connector messages
     // -------------------------------------------------------------------------

-    #[inline]
     pub fn handle_data_message(&mut self, target_port: PortIdLocal, message: DataMessage) {
         self.inbox.insert_message(target_port, message);
     }
@@ -819,6 +859,7 @@ impl ConnectorPDL {
                 if is_valid_get {
                     // Mark as a branching point for future messages
                     branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
+                    branch.halted_at_port = local_port_id;

                     let branch_id = branch.index;
                     Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch_id);
diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs
index 34023b2d4f910d5eadec4fd066abd37ce49737c4..8b1a6fd9b7a04dca1fd591b7fff46e2127c7975a 100644
--- a/src/runtime2/inbox.rs
+++ b/src/runtime2/inbox.rs
@@ -293,14 +293,14 @@ impl PrivateInbox {

     /// Retrieves the next unread message. Should only be called by the
     /// inbox-reader.
-    pub(crate) fn next_message(&mut self) -> Option<&DataMessage> {
+    pub(crate) fn next_message(&mut self) -> Option<(&PortIdLocal, &DataMessage)> {
         if self.len_read == self.messages.len() {
             return None;
         }

-        let (_, to_return) = &self.messages[self.len_read];
+        let (target_port, message) = &self.messages[self.len_read];
         self.len_read += 1;
-        return Some(to_return);
+        return Some((target_port, message));
     }

     /// Simply empties the inbox
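Context for the two files above: `next_message` now yields (target port, message) pairs precisely so the sync run loop can match incoming messages against branches halted at a `get`. The standalone sketch below shows that fork-on-match rule in isolation; every name in it (`PortId`, `BranchId`, the `received` vector) is a simplified stand-in, not the runtime's real definition:

    // Standalone model of "fork a halted branch when a message matches".
    // All types here are simplified stand-ins, not the runtime's own ones.
    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    struct PortId(u32);
    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    struct BranchId(u32);

    #[derive(Clone, Debug)]
    struct Message {
        target_port: PortId,
        sender_prev_branch: BranchId,
        sender_cur_branch: BranchId,
        payload: u64,
    }

    #[derive(Clone, Debug)]
    struct Branch {
        halted_at_port: Option<PortId>, // Some(port) if halted at a `get`
        last_registered: BranchId,      // sender branch last seen on that port
        received: Vec<u64>,             // values consumed so far
    }

    /// Fork every halted branch that can accept `msg`; return fork indices.
    fn fork_matching_branches(branches: &mut Vec<Branch>, msg: &Message) -> Vec<usize> {
        let mut forks = Vec::new();
        // Range is computed up front, so freshly pushed forks are not revisited.
        for idx in 0..branches.len() {
            let b = &branches[idx];
            // Same acceptance test as the patch: halted at this port, and the
            // sender's previous branch id lines up with what we registered.
            if b.halted_at_port == Some(msg.target_port)
                && b.last_registered == msg.sender_prev_branch
            {
                let mut fork = branches[idx].clone();
                fork.halted_at_port = None;                   // fork resumes running
                fork.last_registered = msg.sender_cur_branch; // advance the mapping
                fork.received.push(msg.payload);
                branches.push(fork);
                forks.push(branches.len() - 1);
            }
        }
        forks
    }

    fn main() {
        let mut branches = vec![Branch {
            halted_at_port: Some(PortId(1)),
            last_registered: BranchId(0),
            received: vec![],
        }];
        let msg = Message {
            target_port: PortId(1),
            sender_prev_branch: BranchId(0),
            sender_cur_branch: BranchId(7),
            payload: 42,
        };
        let forks = fork_matching_branches(&mut branches, &msg);
        println!("forked branches: {:?} -> {:?}", forks, branches);
    }

Note that the halted branch itself stays halted: it remains a branching point that can fork again for the next message whose sender lineage matches.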
diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs
index 32b1be9a1b059aa7e1b4c2aef2380acd93708848..2622104e2d5aa40d557ffa26ec7c5fc44cd8908d 100644
--- a/src/runtime2/mod.rs
+++ b/src/runtime2/mod.rs
@@ -24,7 +24,7 @@ use inbox::Message;
 use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 use scheduler::{Scheduler, ConnectorCtx, ControlMessageHandler};
 use native::{Connector, ConnectorApplication, ApplicationInterface};
-use crate::runtime2::port::Port;
+use crate::runtime2::port::{Port, PortState};
 use crate::runtime2::scheduler::SchedulerCtx;

 /// A kind of token that, once obtained, allows mutable access to a connector.
@@ -231,12 +231,14 @@ impl RuntimeInner {
             self_id: getter_id,
             peer_id: putter_id,
             kind: PortKind::Getter,
+            state: PortState::Open,
             peer_connector: creating_connector,
         };
         let putter_port = Port{
             self_id: putter_id,
             peer_id: getter_id,
             kind: PortKind::Putter,
+            state: PortState::Open,
             peer_connector: creating_connector,
         };
@@ -289,7 +291,7 @@ impl RuntimeInner {
             ConnectorVariant::UserDefined(connector) => {
                 for port_id in connector.ports.owned_ports.iter().copied() {
                     println!("DEBUG: Transferring port {:?} from {} to {}", port_id, created_by.context.id.0, key.index);
-                    let mut port = created_by.context.remove_port(port_id);
+                    let port = created_by.context.remove_port(port_id);
                     created.context.add_port(port);
                 }
             },
@@ -350,7 +352,7 @@ impl RuntimeInner {
         let old_num = self.active_connectors.fetch_sub(1, Ordering::SeqCst);
         println!("DEBUG: Decremented components to {}", old_num - 1);
         debug_assert!(old_num > 0);
-        if old_num == 0 { // such that we have no more active connectors (for now!)
+        if old_num == 1 { // such that we have no more active connectors (for now!)
             let num_interfaces = self.active_interfaces.load(Ordering::Acquire);
             if num_interfaces == 0 {
                 self.signal_for_shutdown();
@@ -428,7 +430,6 @@ impl ConnectorStore {
             public: ConnectorPublic::new(initially_sleeping),
             router: ControlMessageHandler::new(),
             shutting_down: false,
-            pending_acks: 0,
         };

         let index;
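The `old_num == 1` change above fixes an off-by-one: `fetch_sub` returns the counter's value from *before* the subtraction, so the thread that removes the last component observes 1, not 0. A minimal illustration:

    use std::sync::atomic::{AtomicU32, Ordering};

    fn main() {
        let active = AtomicU32::new(1); // one component still alive

        // fetch_sub returns the PREVIOUS value; the counter is now 0.
        let old_num = active.fetch_sub(1, Ordering::SeqCst);
        assert_eq!(old_num, 1);
        assert_eq!(active.load(Ordering::SeqCst), 0);

        // So "we just removed the last component" is old_num == 1:
        if old_num == 1 {
            println!("no more active components, may signal shutdown");
        }
    }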
diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs
index 2f6607005df909e7d0a6eb7849a81ec5f566af7b..5623f2f59e5c52e89029c2adcd4533c4400b492a 100644
--- a/src/runtime2/scheduler.rs
+++ b/src/runtime2/scheduler.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 use std::sync::atomic::Ordering;
+use crate::runtime2::ScheduledConnector;

 use super::{RuntimeInner, ConnectorId, ConnectorKey};
 use super::port::{Port, PortState, PortIdLocal};
@@ -77,18 +78,18 @@ impl Scheduler {
         'thread_loop: loop {
             // Retrieve a unit of work
-            println!("DEBUG [{}]: Waiting for work", scheduler_id);
+            self.debug("Waiting for work");
             let connector_key = self.runtime.wait_for_work();
             if connector_key.is_none() {
                 // We should exit
-                println!("DEBUG [{}]: ... No more work, quitting", scheduler_id);
+                self.debug(" ... No more work, quitting");
                 break 'thread_loop;
             }

             // We have something to do
             let connector_key = connector_key.unwrap();
             let connector_id = connector_key.downcast();
-            println!("DEBUG [{}]: ... Got work, running {}", scheduler_id, connector_key.index);
+            self.debug_conn(connector_id, &format!(" ... Got work, running {}", connector_key.index));

             let scheduled = self.runtime.get_component_private(&connector_key);
@@ -99,9 +100,9 @@ impl Scheduler {
             // Check all the messages that are in the shared inbox
             while let Some(message) = scheduled.public.inbox.take_message() {
                 // Check for rerouting
-                println!("DEBUG [{}]: Handling message from {}:{}\n{:#?}", scheduler_id, message.sending_connector.0, message.receiving_port.index, message);
+                self.debug_conn(connector_id, &format!("Handling message from {}:{}\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message));
                 if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
-                    self.send_message_and_wake_up_if_sleeping(other_connector_id, message);
+                    self.runtime.send_message(other_connector_id, message);
                     continue;
                 }
@@ -113,14 +114,14 @@ impl Scheduler {
                         // Need to change port target
                         let port = scheduled.context.get_port_mut(port_id);
                         port.peer_connector = new_target_connector_id;
+
+                        // Note: for simplicity we program the scheduler to always finish
+                        // running a connector with an empty outbox. If this ever changes
+                        // then accepting the "port peer changed" message implies we need
+                        // to change the recipient of the message in the outbox.
                         debug_assert!(delta_state.outbox.is_empty());

                         // And respond with an Ack
-                        // Note: after this code has been reached, we may not have any
-                        // messages in the outbox that send to the port whose owning
-                        // connector we just changed. This is because the `ack` will
-                        // clear the rerouting entry of the `ack`-receiver.
-                        // TODO: Question from Max from the past: what the hell did you mean?
                         self.runtime.send_message(
                             message.sending_connector,
                             Message{
@@ -167,6 +168,7 @@ impl Scheduler {
                 if scheduled.shutting_down {
                     // Nothing to do. But we're still waiting for all our pending
                     // control messages to be answered.
+                    self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
                     if scheduled.router.num_pending_acks() == 0 {
                         // We're actually done, we can safely destroy the
                         // currently running connector
@@ -176,16 +178,16 @@ impl Scheduler {
                         cur_schedule = ConnectorScheduling::NotNow;
                     }
                 } else {
-                    println!("DEBUG [{}]: Running {} ...", scheduler_id, connector_key.index);
+                    self.debug_conn(connector_id, "Running ...");
                     let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
                     let new_schedule = scheduled.connector.run(
                         scheduler_ctx, &scheduled.context, &mut delta_state
                     );
-                    println!("DEBUG [{}]: ... Finished running {}", scheduler_id, connector_key.index);
+                    self.debug_conn(connector_id, "Finished running");

                     // Handle all of the output from the current run: messages to
                     // send and connectors to instantiate.
-                    self.handle_delta_state(&connector_key, &mut scheduled.context, &mut delta_state);
+                    self.handle_delta_state(scheduled, connector_key.downcast(), &mut delta_state);

                     cur_schedule = new_schedule;
                 }
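The rerouting check at the top of the inbox loop consults the router before delivering a message: when a port was handed to a newly created connector, in-flight messages addressed to the old owner are forwarded until the peer acknowledges the move. A sketch of such a reroute table, with hypothetical names (the real `ControlMessageHandler` keeps these entries alongside its other pending control messages):

    // Minimal reroute table: (sending connector, target port) -> new owner.
    // An entry lives until the peer acks the "port moved" control message.
    use std::collections::HashMap;

    #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
    struct ConnectorId(u32);
    #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
    struct PortId(u32);

    #[derive(Default)]
    struct RerouteTable {
        entries: HashMap<(ConnectorId, PortId), ConnectorId>,
    }

    impl RerouteTable {
        /// Register: messages from `source` aimed at `port` now belong to `new_owner`.
        fn prepare_reroute(&mut self, source: ConnectorId, port: PortId, new_owner: ConnectorId) {
            self.entries.insert((source, port), new_owner);
        }

        /// Mirrors `should_reroute`: who, if anyone, should receive this instead?
        fn should_reroute(&self, sender: ConnectorId, port: PortId) -> Option<ConnectorId> {
            self.entries.get(&(sender, port)).copied()
        }

        /// The peer's `Ack` clears the entry; from then on it sends directly.
        fn handle_ack(&mut self, source: ConnectorId, port: PortId) {
            self.entries.remove(&(source, port));
        }
    }

    fn main() {
        let mut table = RerouteTable::default();
        table.prepare_reroute(ConnectorId(3), PortId(8), ConnectorId(9));
        assert_eq!(table.should_reroute(ConnectorId(3), PortId(8)), Some(ConnectorId(9)));
        table.handle_ack(ConnectorId(3), PortId(8));
        assert_eq!(table.should_reroute(ConnectorId(3), PortId(8)), None);
    }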
@@ -204,58 +206,55 @@ impl Scheduler {
                     // Need to sleep, note that we are the only ones which are
                     // allowed to set the sleeping state to `true`, and since
                     // we're running it must currently be `false`.
-                    debug_assert_eq!(scheduled.public.sleeping.load(Ordering::Acquire), false);
-                    scheduled.public.sleeping.store(true, Ordering::Release);
-
-                    // We might have received a message in the meantime from a
-                    // thread that did not see the sleeping flag set to `true`,
-                    // so:
-                    if !scheduled.public.inbox.is_empty() {
-                        let should_reschedule_self = scheduled.public.sleeping
-                            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
-                            .is_ok();
-
-                        if should_reschedule_self {
-                            self.runtime.push_work(connector_key);
-                        }
-                    }
+                    self.try_go_to_sleep(connector_key, scheduled);
                 },
                 ConnectorScheduling::Exit => {
                     // Prepare for exit. Set the shutdown flag and broadcast
                     // messages to notify peers of closing channels
                     scheduled.shutting_down = true;
                     for port in &scheduled.context.ports {
-                        let message = scheduled.router.prepare_closing_channel(
-                            port.self_id, port.peer_id,
-                            connector_id
-                        );
-                        self.runtime.send_message(port.peer_connector, message);
+                        if port.state != PortState::Closed {
+                            let message = scheduled.router.prepare_closing_channel(
+                                port.self_id, port.peer_id,
+                                connector_id
+                            );
+                            self.runtime.send_message(port.peer_connector, message);
+                        }
                     }
+
+                    if scheduled.router.num_pending_acks() == 0 {
+                        self.runtime.destroy_component(connector_key);
+                        continue 'thread_loop;
+                    }
+
+                    self.try_go_to_sleep(connector_key, scheduled);
                 }
             }
         }
     }

-    fn handle_delta_state(&mut self, connector_key: &ConnectorKey, context: &mut ConnectorCtx, delta_state: &mut RunDeltaState) {
+    fn handle_delta_state(&mut self,
+        cur_connector: &mut ScheduledConnector, connector_id: ConnectorId,
+        delta_state: &mut RunDeltaState
+    ) {
         // Handling any messages that were sent
-        let connector_id = connector_key.downcast();
-
         if !delta_state.outbox.is_empty() {
             for mut message in delta_state.outbox.drain(..) {
                 // Based on the message contents, decide where the message
                 // should be sent to. This might end up modifying the message.
-                let (peer_connector, peer_port) = match &mut message {
+                self.debug_conn(connector_id, &format!("Sending message\n --- {:?}", message));
+                let (peer_connector, self_port, peer_port) = match &mut message {
                     MessageContents::Data(contents) => {
-                        let port = context.get_port(contents.sending_port);
-                        (port.peer_connector, port.peer_id)
+                        let port = cur_connector.context.get_port(contents.sending_port);
+                        (port.peer_connector, contents.sending_port, port.peer_id)
                     },
                     MessageContents::Sync(contents) => {
                         let connector = contents.to_visit.pop().unwrap();
-                        (connector, PortIdLocal::new_invalid())
+                        (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
                     },
                     MessageContents::RequestCommit(contents) => {
                         let connector = contents.to_visit.pop().unwrap();
-                        (connector, PortIdLocal::new_invalid())
+                        (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
                     },
                     MessageContents::ConfirmCommit(contents) => {
                         for to_visit in &contents.to_visit {
@@ -266,7 +265,7 @@ impl Scheduler {
                             };
                             self.runtime.send_message(*to_visit, message);
                         }
-                        (ConnectorId::new_invalid(), PortIdLocal::new_invalid())
+                        (ConnectorId::new_invalid(), PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
                     },
                     MessageContents::Control(_) | MessageContents::Ping => {
                         // Never generated by the user's code
@@ -277,6 +276,17 @@ impl Scheduler {
                 // TODO: Maybe clean this up, perhaps special case for
                 // ConfirmCommit can be handled differently.
                 if peer_connector.is_valid() {
+                    if peer_port.is_valid() {
+                        // Sending a message to a port, so the port may not be
+                        // closed.
+                        let port = cur_connector.context.get_port(self_port);
+                        match port.state {
+                            PortState::Open => {},
+                            PortState::Closed => {
+                                todo!("Handling sending over a closed port");
+                            }
+                        }
+                    }
                     let message = Message {
                         sending_connector: connector_id,
                         receiving_port: peer_port,
@@ -289,15 +299,13 @@ impl Scheduler {

         if !delta_state.new_ports.is_empty() {
             for port in delta_state.new_ports.drain(..) {
-                context.ports.push(port);
+                cur_connector.context.ports.push(port);
             }
         }

         // Handling any new connectors that were scheduled
         // TODO: Pool outgoing messages to reduce atomic access
         if !delta_state.new_connectors.is_empty() {
-            let cur_connector = self.runtime.get_component_private(connector_key);
-
             for new_connector in delta_state.new_connectors.drain(..) {
                 // Add to global registry to obtain key
                 let new_key = self.runtime.create_pdl_component(cur_connector, new_connector);
@@ -307,7 +315,6 @@ impl Scheduler {
                 // let the other end of the channel know that the port has
                 // changed location.
                 for port in &new_connector.context.ports {
-                    cur_connector.pending_acks += 1;
                     let reroute_message = cur_connector.router.prepare_reroute(
                         port.self_id, port.peer_id, cur_connector.context.id,
                         port.peer_connector, new_connector.context.id
@@ -325,6 +332,37 @@ impl Scheduler {
         debug_assert!(delta_state.new_ports.is_empty());
         debug_assert!(delta_state.new_connectors.is_empty());
     }
+
+    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
+        debug_assert_eq!(connector_key.index, connector.context.id.0);
+        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);
+
+        // This is the running connector, and only the running connector may
+        // decide it wants to sleep again.
+        connector.public.sleeping.store(true, Ordering::Release);
+
+        // But due to reordering we might have received messages from peers who
+        // did not consider us sleeping. If so, then we wake ourselves again.
+        if !connector.public.inbox.is_empty() {
+            // Try to wake ourselves up
+            let should_wake_up_again = connector.public.sleeping
+                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
+                .is_ok();

+            if should_wake_up_again {
+                self.runtime.push_work(connector_key)
+            }
+        }
+    }
+
+    // TODO: Remove, this is debugging stuff
+    fn debug(&self, message: &str) {
+        println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message);
+    }
+
+    fn debug_conn(&self, conn: ConnectorId, message: &str) {
+        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
+    }
 }

 // -----------------------------------------------------------------------------
@@ -369,7 +407,7 @@ impl ControlMessageHandler {
     /// entry to match against the (hopefully) returned `Ack` message.
     pub fn prepare_closing_channel(
         &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal,
-        self_connector_id: connectorId
+        self_connector_id: ConnectorId
     ) -> Message {
         let id = self.take_id();
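`try_go_to_sleep` above closes a lost-wakeup window: a sender may observe `sleeping == false` right before the scheduler sets it, deposit a message, and never wake the connector. Re-checking the inbox after publishing the flag, and cancelling the sleep via `compare_exchange`, guarantees exactly one side wins the race. A self-contained model of the same flag discipline, using a plain `Mutex<Vec<u64>>` where the runtime has a lock-free inbox:

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Mutex;

    struct Component {
        sleeping: AtomicBool,
        inbox: Mutex<Vec<u64>>,
    }

    impl Component {
        /// Sender side: deposit a message; returns true if the caller must
        /// reschedule the component (it flipped sleeping true -> false).
        fn send(&self, msg: u64) -> bool {
            self.inbox.lock().unwrap().push(msg);
            self.sleeping
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
                .is_ok()
        }

        /// Scheduler side, mirroring `try_go_to_sleep`. Returns true if the
        /// component must be pushed back onto the work queue immediately.
        fn try_go_to_sleep(&self) -> bool {
            self.sleeping.store(true, Ordering::Release);
            // A message may have arrived from a sender that still saw
            // sleeping == false; if so, try to cancel our own sleep.
            if !self.inbox.lock().unwrap().is_empty() {
                return self
                    .sleeping
                    .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
                    .is_ok();
            }
            false
        }
    }

    fn main() {
        let c = Component { sleeping: AtomicBool::new(false), inbox: Mutex::new(Vec::new()) };

        // A message raced in while the component was still running...
        c.inbox.lock().unwrap().push(1);

        // ...so going to sleep must immediately reschedule it.
        assert!(c.try_go_to_sleep());

        // The component is awake again; a new sender need not wake it.
        assert!(!c.send(2));
    }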
@@ -425,7 +463,7 @@ impl ControlMessageHandler {
     /// function returns the connector that should retrieve this message.
     pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option<ConnectorId> {
         for entry in &self.active {
-            if let ControlVariant::ChangedPort(entry) = entry {
+            if let ControlVariant::ChangedPort(entry) = &entry.variant {
                 if entry.source_connector == sending_connector &&
                     entry.target_port == target_port {
                     // Need to reroute this message
diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs
index 4be969ef20327dcbec9388bcd93cd991d53c5fe4..6e2c177a086fd82ac787b34c665d28b1da2a4ae0 100644
--- a/src/runtime2/tests/mod.rs
+++ b/src/runtime2/tests/mod.rs
@@ -14,7 +14,7 @@ fn runtime_for(num_threads: u32, pdl: &str) -> Runtime {

 #[test]
 fn test_put_and_get() {
-    let rt = runtime_for(1, "
+    let rt = runtime_for(4, "
primitive putter(out sender, u32 loops) {
    u32 index = 0;
    while (index < loops) {
@@ -33,6 +33,7 @@ primitive getter(in receiver, u32 loops) {
        print(\"getting!\");
        auto result = get(receiver);
        assert(result);
+        }

        index += 1;
    }
@@ -41,7 +42,7 @@ primitive getter(in receiver, u32 loops) {
     let mut api = rt.create_interface();
     let channel = api.create_channel();

-    let num_loops = 5;
+    let num_loops = 100;
     api.create_connector("", "putter", ValueGroup::new_stack(vec![
         Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })),
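Finally, the shutdown protocol this patch works toward: an exiting component broadcasts a closing message per still-open port, then may only be destroyed once every peer has acknowledged. A compact sketch of that bookkeeping (hypothetical names, transport omitted):

    // Shutdown bookkeeping: one pending-ack counter per exiting component.
    #[derive(PartialEq)]
    enum PortState { Open, Closed }

    struct ExitingComponent {
        ports: Vec<PortState>,
        pending_acks: usize,
    }

    impl ExitingComponent {
        /// Broadcast "channel closing" for every port that is still open;
        /// each broadcast expects one Ack back.
        fn start_shutdown(&mut self) {
            for port in &self.ports {
                if *port == PortState::Open {
                    // (real code: router.prepare_closing_channel + send_message)
                    self.pending_acks += 1;
                }
            }
        }

        /// Called when a peer acknowledges a closing message.
        /// Returns true once the component may actually be destroyed.
        fn handle_ack(&mut self) -> bool {
            self.pending_acks -= 1;
            self.pending_acks == 0
        }
    }

    fn main() {
        let mut c = ExitingComponent {
            ports: vec![PortState::Open, PortState::Closed, PortState::Open],
            pending_acks: 0,
        };
        c.start_shutdown();
        assert_eq!(c.pending_acks, 2); // the closed port sends nothing
        assert!(!c.handle_ack());
        assert!(c.handle_ack()); // last ack: safe to destroy
    }

This mirrors the scheduler's exit arm: if `num_pending_acks()` is already zero the component is destroyed on the spot, otherwise it goes back to sleep and is destroyed when the last Ack arrives.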