diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index b6c1c4b3d0221c6223fe21e9be93bfde25cd7a9e..a86101c3ad92818e07791e2e9ef9a23829db3e4f 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -4,7 +4,7 @@ use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; -use crate::runtime2::inbox::{PrivateInbox, PublicInbox, OutgoingMessage}; +use crate::runtime2::inbox::{PrivateInbox, PublicInbox, OutgoingMessage, DataMessage, SyncMessage}; use crate::runtime2::port::PortIdLocal; /// Represents the identifier of a branch (the index within its container). An @@ -52,7 +52,7 @@ pub(crate) struct Branch { sync_state: SpeculativeState, next_branch_in_queue: Option, // Message/port state - inbox: HashMap, // TODO: @temporary, remove together with fires() + received: HashMap, // TODO: @temporary, remove together with fires() ports_delta: Vec, } @@ -66,7 +66,7 @@ impl Branch { code_state: component_state, sync_state: SpeculativeState::RunningNonSync, next_branch_in_queue: None, - inbox: HashMap::new(), + received: HashMap::new(), ports_delta: Vec::new(), } } @@ -85,7 +85,7 @@ impl Branch { code_state: parent_branch.code_state.clone(), sync_state: SpeculativeState::RunningInSync, next_branch_in_queue: None, - inbox: parent_branch.inbox.clone(), + received: parent_branch.received.clone(), ports_delta: parent_branch.ports_delta.clone(), } } @@ -264,7 +264,7 @@ impl ConnectorPublic { // TODO: Maybe prevent false sharing by aligning `public` to next cache line. // TODO: Do this outside of the connector, create a wrapping struct -pub(crate) struct Connector { +pub(crate) struct ConnectorPDL { // State and properties of connector itself id: u32, in_sync: bool, @@ -273,6 +273,7 @@ pub(crate) struct Connector { sync_active: BranchQueue, sync_pending_get: BranchQueue, sync_finished: BranchQueue, + sync_finished_last_handled: u32, // Port/message management pub inbox: PrivateInbox, pub ports: ConnectorPorts, @@ -297,7 +298,7 @@ impl RunContext for TempCtx { } } -impl Connector { +impl ConnectorPDL { /// Constructs a representation of a connector. The assumption is that the /// initial branch is at the first instruction of the connector's code, /// hence is in a non-sync state. @@ -309,6 +310,7 @@ impl Connector { sync_active: BranchQueue::new(), sync_pending_get: BranchQueue::new(), sync_finished: BranchQueue::new(), + sync_finished_last_handled: 0, // none at all inbox: PrivateInbox::new(), ports: ConnectorPorts::new(owned_ports), } @@ -318,6 +320,34 @@ impl Connector { return self.in_sync; } + pub fn insert_sync_message(&mut self, message: SyncMessage, results: &mut RunDeltaState) { + + } + + pub fn run(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling { + if self.in_sync { + let scheduling = self.run_in_speculative_mode(pd, results); + + // When in speculative mode we might have generated new sync + // solutions, we need to turn them into proposed solutions here. + if self.sync_finished_last_handled != self.sync_finished.last { + let mut next_id; + if self.sync_finished_last_handled == 0 { + next_id = self.sync_finished.first; + } else { + let last_handled = &self.branches[self.sync_finished_last_handled as usize]; + debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue" + next_id = last_handled.next_branch_in_queue.unwrap(); + } + + // Transform branch into proposed global solution + } + } else { + let scheduling = self.run_in_deterministic_mode(pd, results); + return scheduling; + } + } + /// Runs the connector in synchronous mode. Potential changes to the global /// system's state are added to the `RunDeltaState` object by the connector, /// where it is the caller's responsibility to immediately take care of @@ -400,29 +430,38 @@ impl Connector { // But if some messages can be immediately applied, do so // now. let messages = self.inbox.get_messages(local_port_id, port_mapping.last_registered_branch_id); - if !messages.is_empty() { - // TODO: If message contains ports, transfer ownership of port. - for message in messages { - // For each message, for the execution and feed it - // the provided message - let new_branch_index = self.branches.len() as u32; - let mut new_branch = Branch::new_sync_branching_from(new_branch_index, branch); - self.ports.prepare_sync_branch(branch.index.index, new_branch_index); - - let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index); - port_mapping.last_registered_branch_id = message.sender_cur_branch_id; - debug_assert!(port_mapping.is_assigned && port_mapping.num_times_fired == 1); - - new_branch.inbox.insert(local_port_id, message.clone()); - - // Schedule the new branch - debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch.index); - self.branches.push(new_branch); - } + let mut did_have_messages = false; + + for message in messages { + did_have_messages = true; + + // For each message prepare a new branch to execute + let new_branch_index = self.branches.len() as u32; + let mut new_branch = Branch::new_sync_branching_from(new_branch_index, branch); + self.ports.prepare_sync_branch(branch.index.index, new_branch_index); + + let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index); + port_mapping.last_registered_branch_id = message.sender_cur_branch_id; + debug_assert!(port_mapping.is_assigned && port_mapping.num_times_fired == 1); + + new_branch.received.insert(local_port_id, message.clone()); + + // If the message contains any ports then they will now + // be owned by the new branch + debug_assert!(results.ports.is_empty()); + find_ports_in_value_group(&message.message, &mut results.ports); + Self::acquire_ports_during_sync(&mut self.ports, &mut new_branch, &results.ports); + results.ports.clear(); - // Because we have new branches to run, schedule - // immediately + // Schedule the new branch + debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync); + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch.index); + self.branches.push(new_branch); + } + + if did_have_messages { + // If we did create any new branches, then we can run + // them immediately. return ConnectorScheduling::Immediate; } } else { @@ -431,7 +470,7 @@ impl Connector { }, RunResult::BranchAtSyncEnd => { // Branch is done, go through all of the ports that are not yet - // assigned and modify them to be + // assigned and map them to non-firing. for port_idx in 0..self.ports.num_ports() { let port_mapping = self.ports.get_port_mut(branch.index.index, port_idx); if !port_mapping.is_assigned { @@ -480,6 +519,13 @@ impl Connector { message: value_group, }; + // If the message contains any ports then we release our + // ownership over them in this branch + debug_assert!(results.ports.is_empty()); + find_ports_in_value_group(&message.message, &mut results.ports); + Self::release_ports_during_sync(&mut self.ports, branch, &results.ports); + results.ports.clear(); + results.outbox.push(message); return ConnectorScheduling::Immediate } else { @@ -545,7 +591,7 @@ impl Connector { }; let new_connector_ports = results.ports.clone(); // TODO: Do something with this let new_connector_branch = Branch::new_initial_branch(new_connector_state); - let new_connector = Connector::new(0, new_connector_branch, new_connector_ports); + let new_connector = ConnectorPDL::new(0, new_connector_branch, new_connector_ports); results.new_connectors.push(new_connector); @@ -586,7 +632,9 @@ impl Connector { } #[inline] - fn push_branch_into_queue(branches: &mut Vec, queue: &mut BranchQueue, to_push: BranchId) { + fn push_branch_into_queue( + branches: &mut Vec, queue: &mut BranchQueue, to_push: BranchId, + ) { debug_assert!(to_push.is_valid()); let to_push = to_push.index; @@ -702,6 +750,11 @@ impl Connector { return Ok(()) } + + // Helpers for generating and handling sync messages (and the solutions that + // are described by those sync messages) + + fn generate_initial_solution_for_branch(&self, branch_id: BranchId,) } /// A data structure passed to a connector whose code is being executed that is @@ -712,7 +765,7 @@ pub(crate) struct RunDeltaState { // Variables that allow the thread running the connector to pick up global // state changes and try to apply them. pub outbox: Vec, - pub new_connectors: Vec, + pub new_connectors: Vec, // Workspaces pub ports: Vec, }