From 8a530d2dc72f20b2300f7975c1bb04fdb08d8caf 2021-10-15 17:53:09 From: mh Date: 2021-10-15 17:53:09 Subject: [PATCH] basic message passing sync-resolving --- diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 71e86f483ce3f0640ff404e959dc4040f0049c9d..64edbdde1602c20a4130b8dd1fc6d55ca69ca186 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -6,10 +6,11 @@ use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; use crate::runtime2::global_store::{ConnectorKey, GlobalStore}; +use crate::runtime2::inbox::{OutgoingMessage, SolutionMessage}; use crate::runtime2::port::PortKind; use super::global_store::ConnectorId; use super::inbox::{ - PrivateInbox, PublicInbox, OutgoingMessage, DataMessage, SyncMessage, + PrivateInbox, PublicInbox, OutgoingDataMessage, DataMessage, SyncMessage, SyncBranchConstraint, SyncConnectorSolution }; use super::port::PortIdLocal; @@ -340,8 +341,146 @@ impl ConnectorPDL { return self.in_sync; } - pub fn insert_sync_message(&mut self, message: SyncMessage, results: &mut RunDeltaState) { + /// Accepts a synchronous message and combines it with the locally stored + /// solution(s). + pub fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, results: &mut RunDeltaState) { + debug_assert!(!message.to_visit.contains(&self.id)); // own ID already removed + debug_assert!(message.constraints.iter().any(|v| v.connector_id == self.id)); // we have constraints + + // TODO: Optimize, use some kind of temp workspace vector + let mut execution_path_branch_ids = Vec::new(); + + if self.sync_finished_last_handled != 0 { + // We have some solutions to match against + let constraints_index = message.constraints + .iter() + .position(|v| v.connector_id == self.id) + .unwrap(); + let constraints = &message.constraints[constraints_index].constraints; + debug_assert!(!constraints.is_empty()); + + // Note that we only iterate over the solutions we've already + // handled ourselves, not necessarily + let mut branch_index = self.sync_finished.first; + 'branch_loop: loop { + // Load solution branch + let branch = &self.branches[branch_index as usize]; + execution_path_branch_ids.clear(); + self.branch_ids_of_execution_path(BranchId::new(branch_index), &mut execution_path_branch_ids); + + // Check if the branch matches all of the applied constraints + for constraint in constraints { + match constraint { + SyncBranchConstraint::SilentPort(silent_port_id) => { + let port_index = self.ports.get_port_index(*silent_port_id); + if port_index.is_none() { + // Nefarious peer + continue 'branch_loop; + } + let port_index = port_index.unwrap(); + + let mapping = self.ports.get_port(branch_index, port_index); + debug_assert!(mapping.is_assigned); + + if mapping.num_times_fired != 0 { + // Not silent, constraint not satisfied + continue 'branch_loop; + } + }, + SyncBranchConstraint::BranchNumber(expected_branch_id) => { + if !execution_path_branch_ids.contains(expected_branch_id) { + // Not the expected execution path, constraint not satisfied + continue 'branch_loop; + } + }, + SyncBranchConstraint::PortMapping(port_id, expected_branch_id) => { + let port_index = self.ports.get_port_index(*port_id); + if port_index.is_none() { + // Nefarious peer + continue 'branch_loop; + } + let port_index = port_index.unwrap(); + + let mapping = self.ports.get_port(branch_index, port_index); + if mapping.last_registered_branch_id != expected_branch_id { + // Not the expected interaction on this port, constraint not satisfied + continue 'branch_loop; + } + }, + } + } + + // If here, then all of the external constraints were satisfied + // for the current branch. But the branch itself also imposes + // constraints. So while building up the new solution, make sure + // that those are satisfied as well. + // TODO: Code below can probably be merged with initial solution + // generation. + + // - clone old solution so we can add to it + let mut new_solution = message.clone(); + + // - determine the initial port mapping + let num_ports = self.ports.num_ports(); + let mut new_solution_mapping = Vec::with_capacity(num_ports); + for port_index in 0..self.ports.num_ports() { + let port_id = self.ports.get_port_id(port_index); + let mapping = self.ports.get_port(branch_index, port_index); + new_solution_mapping.push((port_id, mapping.last_registered_branch_id)); + } + + // - replace constraints with a local solution + new_solution.constraints.remove(constraints_index); + new_solution.local_solutions.push(SyncConnectorSolution{ + connector_id: self.id, + terminating_branch_id: BranchId::new(branch_index), + execution_branch_ids: execution_path_branch_ids.clone(), + final_port_mapping: new_solution_mapping, + }); + + // - do a second pass on the ports to generate and add the + // constraints that should be applied to other connectors + for port_index in 0..self.ports.num_ports() { + let port_id = self.ports.get_port_id(port_index); + + let (peer_connector_id, peer_port_id, peer_is_getter) = { + let key = unsafe{ ConnectorKey::from_id(self.id) }; + let port = global.ports.get(&key, port_id); + (port.peer_connector, port.peer_id, port.kind == PortKind::Putter) + }; + + let mapping = self.ports.get_port(branch_index, port_index); + let constraint = if mapping.num_times_fired == 0 { + SyncBranchConstraint::SilentPort(peer_port_id) + } else { + if peer_is_getter { + SyncBranchConstraint::PortMapping(peer_port_id, mapping.last_registered_branch_id) + } else { + SyncBranchConstraint::BranchNumber(mapping.last_registered_branch_id) + } + }; + + match new_solution.add_or_check_constraint(peer_connector_id, constraint) { + None => continue 'branch_loop, + Some(false) => continue 'branch_loop, + Some(true) => {}, + } + } + + // If here, then the newly generated solution is completely + // compatible. + Self::submit_sync_solution(new_solution, results); + + // Consider the next branch + if branch_index == self.sync_finished_last_handled { + // At the end of the previously handled solutions + break; + } + debug_assert!(branch.next_branch_in_queue.is_some()); // because we cannot be at the end of the queue + branch_index = branch.next_branch_in_queue.unwrap(); + } + } } // TODO: Remove GlobalStore, is used to retrieve ports. Ports belong with @@ -354,6 +493,7 @@ impl ConnectorPDL { // When in speculative mode we might have generated new sync // solutions, we need to turn them into proposed solutions here. if self.sync_finished_last_handled != self.sync_finished.last { + // Retrieve first element in queue let mut next_id; if self.sync_finished_last_handled == 0 { next_id = self.sync_finished.first; @@ -363,7 +503,37 @@ impl ConnectorPDL { next_id = last_handled.next_branch_in_queue.unwrap(); } - // Transform branch into proposed global solution + loop { + let branch_id = BranchId::new(next_id); + let branch = &self.branches[next_id as usize]; + let branch_next = branch.next_branch_in_queue; + + // Turn local solution into a message and send it along + // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed + let connector_key = unsafe{ ConnectorKey::from_id(self.id) }; + let solution_message = self.generate_initial_solution_for_branch(branch_id, &connector_key, global); + if let Some(valid_solution) = solution_message { + Self::submit_sync_solution(valid_solution, results); + } else { + // Branch is actually invalid, but we only just figured + // it out. We need to mark it as invalid to prevent + // future use + Self::remove_branch_from_queue(&mut self.branches, &mut self.sync_finished, branch_id); + if branch_id == self.sync_finished_last_handled { + self.sync_finished_last_handled = self.sync_finished.last; + } + + let branch = &mut self.branches[next_id as usize]; + branch.sync_state = SpeculativeState::Inconsistent; + } + + match branch_next { + Some(id) => next_id = id, + None => break, + } + } + + self.sync_finished_last_handled = next_id; } return scheduling; @@ -537,7 +707,7 @@ impl ConnectorPDL { // Put in run results for thread to pick up and transfer to // the correct connector inbox. port_mapping.mark_definitive(branch.index, 1); - let message = OutgoingMessage { + let message = OutgoingDataMessage { sending_port: local_port_id, sender_prev_branch_id: BranchId::new_invalid(), sender_cur_branch_id: branch.index, @@ -551,7 +721,7 @@ impl ConnectorPDL { Self::release_ports_during_sync(&mut self.ports, branch, &results.ports); results.ports.clear(); - results.outbox.push(message); + results.outbox.push(OutgoingMessage::Data(message)); return ConnectorScheduling::Immediate } else { branch.sync_state = SpeculativeState::Inconsistent; @@ -640,7 +810,6 @@ impl ConnectorPDL { // `next_branch_in_queue` and the `BranchQueue` objects. Essentially forming // linked lists inside of the vector of branches. - #[inline] fn pop_branch(branches: &mut Vec, queue: &mut BranchQueue) -> &mut Branch { debug_assert!(queue.first != 0); let branch = &mut branches[queue.first as usize]; @@ -656,7 +825,6 @@ impl ConnectorPDL { return branch; } - #[inline] fn push_branch_into_queue( branches: &mut Vec, queue: &mut BranchQueue, to_push: BranchId, ) { @@ -677,6 +845,50 @@ impl ConnectorPDL { } } + /// Removes branch from linked-list queue. Walks through the entire list to + /// find the element (!). Assumption is that this is not called often. + fn remove_branch_from_queue( + branches: &mut Vec, queue: &mut BranchQueue, to_delete: BranchId, + ) { + debug_assert!(to_delete.is_valid()); // we're deleting a valid item + debug_assert!(queue.first != 0 && queue.last != 0); // queue isn't empty to begin with + + // Retrieve branch and its next element + let branch_to_delete = &mut branches[to_delete.index as usize]; + let branch_next_index_option = branch_to_delete.next_branch_in_queue; + let branch_next_index_unwrapped = branch_next_index_option.unwrap_or(0); + branch_to_delete.next_branch_in_queue = None; + + // Walk through all elements in queue to find branch to delete + let mut prev_index = 0; + let mut next_index = queue.first; + + while next_index != 0 { + if next_index == to_delete.index { + // Found the element we're going to delete + // - check if at the first element or not + if prev_index == 0 { + queue.first = branch_next_index_unwrapped; + } else { + let prev_branch = &mut branches[prev_index as usize]; + prev_branch.next_branch_in_queue = branch_next_index_option; + } + + // - check if at last element or not (also takes care of "no elements left in queue") + if branch_next_index_option.is_none() { + queue.last = prev_index; + } + + return; + } + + prev_index = next_index; + } + + // If here, then we didn't find the element + panic!("branch does not exist in provided queue"); + } + // Helpers for local port management. Specifically for adopting/losing // ownership over ports, and for checking if specific ports can be sent // over another port. @@ -866,6 +1078,23 @@ impl ConnectorPDL { return Some(sync_message); } + fn submit_sync_solution(partial_solution: SyncMessage, results: &mut RunDeltaState) { + if partial_solution.to_visit.is_empty() { + // Solution is completely consistent + let mut full_solution = SolutionMessage{ + local_solutions: Vec::with_capacity(partial_solution.local_solutions.len()), + }; + for local_solution in &partial_solution.local_solutions { + full_solution.local_solutions.push((local_solution.connector_id, local_solution.terminating_branch_id)); + } + + results.outbox.push(OutgoingMessage::Solution(full_solution)); + } else { + // Still have connectors to visit + results.outbox.push(OutgoingMessage::Sync(partial_solution)); + } + } + fn branch_ids_of_execution_path(&self, leaf_branch_id: BranchId, parents: &mut Vec) { debug_assert!(parents.is_empty()); diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index c71ec0d15da177c50795ef34949e878eafb347ba..db5c94535cae7f96ccc86ca0bc81faa4b00df51c 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -23,13 +23,19 @@ use super::global_store::ConnectorId; /// A message prepared by a connector. Waiting to be picked up by the runtime to /// be sent to another connector. #[derive(Clone)] -pub struct OutgoingMessage { +pub struct OutgoingDataMessage { pub sending_port: PortIdLocal, pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id pub sender_cur_branch_id: BranchId, // always valid pub message: ValueGroup, } +pub enum OutgoingMessage { + Data(OutgoingDataMessage), + Sync(SyncMessage), + Solution(SolutionMessage), +} + /// A message that has been delivered (after being imbued with the receiving /// port by the scheduler) to a connector. #[derive(Clone)] @@ -42,12 +48,14 @@ pub struct DataMessage { pub message: ValueGroup, } +#[derive(Clone)] pub enum SyncBranchConstraint { SilentPort(PortIdLocal), BranchNumber(BranchId), PortMapping(PortIdLocal, BranchId), } +#[derive(Clone)] pub struct SyncConnectorSolution { pub connector_id: ConnectorId, pub terminating_branch_id: BranchId, @@ -55,11 +63,13 @@ pub struct SyncConnectorSolution { pub final_port_mapping: Vec<(PortIdLocal, BranchId)> } +#[derive(Clone)] pub struct SyncConnectorConstraints { pub connector_id: ConnectorId, pub constraints: Vec, } +#[derive(Clone)] pub struct SyncMessage { pub local_solutions: Vec, pub constraints: Vec, @@ -103,7 +113,7 @@ impl SyncMessage { if self.has_local_solution_for(connector_id) { return self.check_constraint(connector_id, constraint); } else { - self.add_constraint(connector_id); + self.add_constraint(connector_id, constraint); return Ok(true); } } @@ -175,6 +185,10 @@ impl SyncMessage { } } +pub struct SolutionMessage { + pub local_solutions: Vec<(ConnectorId, BranchId)>, +} + /// A control message. These might be sent by the scheduler to notify eachother /// of asynchronous state changes. pub struct ControlMessage { @@ -194,6 +208,7 @@ pub enum ControlMessageVariant { pub enum Message { Data(DataMessage), // data message, handled by connector Sync(SyncMessage), // sync message, handled by both connector/scheduler + Solution(SolutionMessage), // solution message, finishing a sync round Control(ControlMessage), // control message, handled by scheduler Ping, // ping message, intentionally waking up a connector (used for native connectors) } diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 00a1ba5f112aba960e8d99d6dd7015ce6977ed26..b6706e8be16729c1218fa97812bd3c1916225be2 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -61,6 +61,12 @@ impl Scheduler { } else { scheduled.connector.inbox.insert_message(message); } + }, + Message::Sync(message) => { + scheduled.connector + }, + Message::Solution(solution) => { + }, Message::Control(message) => { match message.content {