diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 42546271a1f6f01dfc042f0e78615f0446a12171..47ddc84473aeb08641da02f3cb67ff2b56f388e2 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -4,7 +4,7 @@ use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; -use crate::runtime2::inbox::{MessageContents, SolutionMessage}; +use crate::runtime2::inbox::{Message, MessageContents, SolutionMessage}; use crate::runtime2::native::Connector; use crate::runtime2::port::{Port, PortKind}; use crate::runtime2::scheduler::ConnectorCtx; @@ -19,7 +19,7 @@ use super::port::PortIdLocal; /// ID of `0` generally means "no branch" (e.g. no parent, or a port did not /// yet receive anything from any branch). #[derive(Clone, Copy, PartialEq, Eq)] -pub(crate) struct BranchId { +pub struct BranchId { pub index: u32, } @@ -145,6 +145,7 @@ struct PortOwnershipDelta { port_id: PortIdLocal, } +#[derive(Debug)] enum PortOwnershipError { UsedInInteraction(PortIdLocal), AlreadyGivenAway(PortIdLocal) @@ -158,7 +159,7 @@ pub(crate) struct ConnectorPorts { // Contains P*B entries, where P is the number of ports and B is the number // of branches. One can find the appropriate mapping of port p at branch b // at linear index `b*P+p`. - pub port_mapping: Vec + port_mapping: Vec } impl ConnectorPorts { @@ -188,7 +189,8 @@ impl ConnectorPorts { self.port_mapping.reserve(num_ports); for offset in 0..num_ports { let parent_port = &self.port_mapping[parent_base_idx + offset]; - self.port_mapping.push(parent_port.clone()); + let parent_port = parent_port.clone(); + self.port_mapping.push(parent_port); } } @@ -303,10 +305,10 @@ pub(crate) struct ConnectorPublic { } impl ConnectorPublic { - pub fn new() -> Self { + pub fn new(initialize_as_sleeping: bool) -> Self { ConnectorPublic{ inbox: PublicInbox::new(), - sleeping: AtomicBool::new(false), + sleeping: AtomicBool::new(initialize_as_sleeping), } } } @@ -349,14 +351,14 @@ impl RunContext for TempCtx { } impl Connector for ConnectorPDL { - fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { + fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { use MessageContents as MC; - match message { - MC::Data(message) => self.handle_data_message(message), - MC::Sync(message) => self.handle_sync_message(message, ctx, delta_state), - MC::RequestCommit(message) => self.handle_request_commit_message(message, ctx, delta_state), - MC::ConfirmCommit(message) => self.handle_confirm_commit_message(message, ctx, delta_state), + match message.contents { + MC::Data(content) => self.handle_data_message(message.receiving_port, content), + MC::Sync(content) => self.handle_sync_message(content, ctx, delta_state), + MC::RequestCommit(content) => self.handle_request_commit_message(content, ctx, delta_state), + MC::ConfirmCommit(content) => self.handle_confirm_commit_message(content, ctx, delta_state), MC::Control(_) | MC::Ping => {}, } } @@ -446,8 +448,8 @@ impl ConnectorPDL { // ------------------------------------------------------------------------- #[inline] - pub fn handle_data_message(&mut self, message: DataMessage) { - self.inbox.insert_message(message); + pub fn handle_data_message(&mut self, target_port: PortIdLocal, message: DataMessage) { + self.inbox.insert_message(target_port, message); } /// Accepts a synchronous message and combines it with the locally stored @@ -577,6 +579,7 @@ impl ConnectorPDL { // If here, then the newly generated solution is completely // compatible. + let next_branch = branch.next_branch_in_queue; self.submit_sync_solution(new_solution, ctx, results); // Consider the next branch @@ -585,8 +588,8 @@ impl ConnectorPDL { break; } - debug_assert!(branch.next_branch_in_queue.is_some()); // because we cannot be at the end of the queue - branch_index = branch.next_branch_in_queue.unwrap(); + debug_assert!(next_branch.is_some()); // because we cannot be at the end of the queue + branch_index = next_branch.unwrap(); } } } @@ -629,7 +632,7 @@ impl ConnectorPDL { } } - fn handle_confirm_commit_message(&mut self, message: SolutionMessage, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { + fn handle_confirm_commit_message(&mut self, message: SolutionMessage, ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) { // Make sure this is the message we actually committed to. As long as // we're running on a single machine this is fine. // TODO: Take care of nefarious peers @@ -683,7 +686,7 @@ impl ConnectorPDL { /// where it is the caller's responsibility to immediately take care of /// those changes. The return value indicates when (and if) the connector /// needs to be scheduled again. - pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(self.in_sync); debug_assert!(!self.sync_active.is_empty()); @@ -709,24 +712,35 @@ impl ConnectorPDL { let local_port_index = self.ports.get_port_index(local_port_id).unwrap(); debug_assert!(self.ports.owned_ports.contains(&local_port_id)); - let silent_branch = &*branch; - // Create a copied branch who will have the port set to firing - let firing_index = self.branches.len() as u32; - let mut firing_branch = Branch::new_sync_branching_from(firing_index, silent_branch); - self.ports.prepare_sync_branch(branch.index.index, firing_index); + // Create two copied branches, one silent and one firing + branch.sync_state = SpeculativeState::HaltedAtBranchPoint; + let parent_branch_id = branch.index; + let parent_branch = &self.branches[parent_branch_id.index as usize]; - let firing_port = self.ports.get_port_mut(firing_index, local_port_index); - firing_port.mark_speculative(1); + let silent_index = self.branches.len() as u32; + let firing_index = silent_index + 1; + + let silent_branch = Branch::new_sync_branching_from(silent_index, parent_branch); + self.ports.prepare_sync_branch(parent_branch.index.index, silent_index); - // Assign the old branch a silent value - let silent_port = self.ports.get_port_mut(silent_branch.index.index, local_port_index); + let firing_branch = Branch::new_sync_branching_from(firing_index, parent_branch); + self.ports.prepare_sync_branch(parent_branch.index.index, firing_index); + + // Assign the port values of the two new branches + let silent_port = self.ports.get_port_mut(silent_index, local_port_index); silent_port.mark_speculative(0); + let firing_port = self.ports.get_port_mut(firing_index, local_port_index); + firing_port.mark_speculative(1); + // Run both branches again - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, silent_branch.index); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, firing_branch.index); + let silent_branch_id = silent_branch.index; + self.branches.push(silent_branch); + let firing_branch_id = firing_branch.index; self.branches.push(firing_branch); + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, silent_branch_id); + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, firing_branch_id); return ConnectorScheduling::Immediate; }, @@ -755,7 +769,8 @@ impl ConnectorPDL { if is_valid_get { // Mark as a branching point for future messages branch.sync_state = SpeculativeState::HaltedAtBranchPoint; - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch.index); + let branch_id = branch.index; + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch_id); // But if some messages can be immediately applied, do so // now. @@ -766,9 +781,10 @@ impl ConnectorPDL { did_have_messages = true; // For each message prepare a new branch to execute + let parent_branch = &self.branches[branch_id.index as usize]; let new_branch_index = self.branches.len() as u32; - let mut new_branch = Branch::new_sync_branching_from(new_branch_index, branch); - self.ports.prepare_sync_branch(branch.index.index, new_branch_index); + let mut new_branch = Branch::new_sync_branching_from(new_branch_index, parent_branch); + self.ports.prepare_sync_branch(branch_id.index, new_branch_index); let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index); port_mapping.last_registered_branch_id = message.sender_cur_branch_id; @@ -785,8 +801,9 @@ impl ConnectorPDL { // Schedule the new branch debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch.index); + let new_branch_id = new_branch.index; self.branches.push(new_branch); + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id); } if did_have_messages { @@ -808,8 +825,9 @@ impl ConnectorPDL { } } + let branch_id = branch.index; branch.sync_state = SpeculativeState::ReachedSyncEnd; - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_finished, branch.index); + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_finished, branch_id); }, RunResult::BranchPut(port_id, value_group) => { // Branch performed a `put` on a particualar port. @@ -853,7 +871,7 @@ impl ConnectorPDL { // ownership over them in this branch debug_assert!(results.ports.is_empty()); find_ports_in_value_group(&message.message, &mut results.ports); - Self::release_ports_during_sync(&mut self.ports, branch, &results.ports); + Self::release_ports_during_sync(&mut self.ports, branch, &results.ports).unwrap(); results.ports.clear(); results.outbox.push(MessageContents::Data(message)); @@ -875,7 +893,7 @@ impl ConnectorPDL { } /// Runs the connector in non-synchronous mode. - pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(!self.in_sync); debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty()); debug_assert!(self.branches.len() == 1); @@ -897,8 +915,9 @@ impl ConnectorPDL { // Prepare for sync execution and reschedule immediately self.in_sync = true; let first_sync_branch = Branch::new_sync_branching_from(1, branch); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch.index); + let first_sync_branch_id = first_sync_branch.index; self.branches.push(first_sync_branch); + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch_id); return ConnectorScheduling::Later; }, @@ -1020,6 +1039,8 @@ impl ConnectorPDL { } prev_index = next_index; + let entry = &branches[next_index as usize]; + next_index = entry.next_branch_in_queue.unwrap_or(0); } // If here, then we didn't find the element @@ -1216,7 +1237,7 @@ impl ConnectorPDL { // TODO: Maybe another package for random? let comparison_number: u64 = unsafe { let mut random_array = [0u8; 8]; - getrandom::getrandom(&mut random_array); + getrandom::getrandom(&mut random_array).unwrap(); std::mem::transmute(random_array) };