diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index 022ab16496f3aa2ab43511267f73f022916e8654..c71ec0d15da177c50795ef34949e878eafb347ba 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -44,25 +44,135 @@ pub struct DataMessage { pub enum SyncBranchConstraint { SilentPort(PortIdLocal), - BranchNumber(u32), - PortMapping(PortIdLocal, u32), + BranchNumber(BranchId), + PortMapping(PortIdLocal, BranchId), } pub struct SyncConnectorSolution { - connector_id: ConnectorId, - terminating_branch_id: BranchId, - execution_branch_ids: Vec, // ends with terminating branch ID + pub connector_id: ConnectorId, + pub terminating_branch_id: BranchId, + pub execution_branch_ids: Vec, // no particular ordering of IDs enforced + pub final_port_mapping: Vec<(PortIdLocal, BranchId)> } pub struct SyncConnectorConstraints { - connector_id: ConnectorId, - constraints: Vec, + pub connector_id: ConnectorId, + pub constraints: Vec, } pub struct SyncMessage { - connector_solutions: Vec, - connector_constraints: Vec, - connectors_to_visit: Vec, + pub local_solutions: Vec, + pub constraints: Vec, + pub to_visit: Vec, +} + +// TODO: Shouldn't really be here, right? +impl SyncMessage { + /// Creates a new sync message. Assumes that it is created by a connector + /// that has just encountered a new local solution. + pub(crate) fn new(initial_solution: SyncConnectorSolution, approximate_peers: usize) -> Self { + let mut local_solutions = Vec::with_capacity(approximate_peers); + local_solutions.push(initial_solution); + + return Self{ + local_solutions, + constraints: Vec::with_capacity(approximate_peers), + to_visit: Vec::with_capacity(approximate_peers), + }; + } + + /// Checks if a connector has already provided a local solution + pub(crate) fn has_local_solution_for(&self, connector_id: ConnectorId) -> bool { + return self.local_solutions + .iter() + .any(|v| v.connector_id == connector_id); + } + + /// Adds a new constraint. If the connector has already provided a local + /// solution then the constraint will be checked. Otherwise the constraint + /// will be added to the solution. If this is the first constraint for a + /// connector then it will be added to the connectors that still have to be + /// visited. + /// + /// If this returns true then the constraint was added, or the local + /// solution for the specified connector satisfies the constraint. If this + /// function returns an error then we're dealing with a nefarious peer. + pub(crate) fn add_or_check_constraint( + &mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint + ) -> Result { + if self.has_local_solution_for(connector_id) { + return self.check_constraint(connector_id, constraint); + } else { + self.add_constraint(connector_id); + return Ok(true); + } + } + + /// Pushes a new connector constraint. Caller must ensure that the solution + /// has not yet arrived at the specified connector (because then it would no + /// longer have constraints, but a proposed solution instead). + pub(crate) fn add_constraint(&mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint) { + debug_assert!(!self.has_local_solution_for(connector_id)); + + let position = self.constraints + .iter() + .position(|v| v.connector_id == connector_id); + + match position { + Some(index) => { + // Has pre-existing constraints + debug_assert!(self.to_visit.contains(&connector_id)); + let entry = &mut self.constraints[index]; + entry.constraints.push(constraint); + }, + None => { + debug_assert!(!self.to_visit.contains(&connector_id)); + self.constraints.push(SyncConnectorConstraints{ + connector_id, + constraints: vec![constraint], + }); + self.to_visit.push(connector_id); + } + } + } + + /// Checks if a constraint is satisfied by a solution. Caller must make sure + /// that a local solution has already been provided. Will return an error + /// value only if the provided constraint does not make sense (i.e. a + /// nefarious peer has supplied a constraint with a port we do not own). + pub(crate) fn check_constraint(&self, connector_id: ConnectorId, constraint: SyncBranchConstraint) -> Result { + debug_assert!(self.has_local_solution_for(connector_id)); + + let entry = self.local_solutions + .iter() + .find(|v| v.connector_id == connector_id) + .unwrap(); + + match constraint { + SyncBranchConstraint::SilentPort(silent_port_id) => { + for (port_id, mapped_id) in &entry.final_port_mapping { + if port_id == silent_port_id { + // If silent, then mapped ID is invalid + return Ok(!mapped_id.is_valid()) + } + } + + return Err(()); + }, + SyncBranchConstraint::BranchNumber(expected_branch_id) => { + return Ok(entry.execution_branch_ids.contains(&expected_branch_id)); + }, + SyncBranchConstraint::PortMapping(port_id, expected_branch_id) => { + for (port_id, mapped_id) in &entry.final_port_mapping { + if port_id == port_id { + return Ok(*mapped_id == expected_branch_id); + } + } + + return Err(()); + }, + } + } } /// A control message. These might be sent by the scheduler to notify eachother