From 1807ef57538822264749ee8fb33b30360d099b93 2021-12-03 15:53:58 From: MH Date: 2021-12-03 15:53:58 Subject: [PATCH] remove old runtime files --- diff --git a/src/runtime_old/communication.rs b/src/runtime_old/communication.rs deleted file mode 100644 index fbc0f14ad660eaa7757fb1eaff39e4407c631f36..0000000000000000000000000000000000000000 --- a/src/runtime_old/communication.rs +++ /dev/null @@ -1,1433 +0,0 @@ -use super::*; -use crate::common::*; -use core::ops::{Deref, DerefMut}; - -// Guard protecting an incrementally unfoldable slice of MapTempGuard elements -struct MapTempsGuard<'a, K, V>(&'a mut [HashMap]); - -// Type protecting a temporary map; At the start and end of the Guard's lifetime, self.0.is_empty() must be true -struct MapTempGuard<'a, K, V>(&'a mut HashMap); - -// Once the synchronous round has begun, this structure manages the -// native component's speculative branches, one per synchronous batch. -struct BranchingNative { - branches: HashMap, -} - -// Corresponds to one of the native's synchronous batches during the synchronous round. -// ports marked for message receipt correspond to entries of -// (a) `gotten` if they have not received yet, -// (b) `to_get` if they have already received, with the given payload. -// The branch corresponds to a component solution IFF to_get is empty. -#[derive(Clone, Debug)] -struct NativeBranch { - index: usize, - gotten: HashMap, - to_get: HashSet, -} - -// Manages a protocol component's speculative branches for the duration -// of the synchronous round. -#[derive(Debug)] -struct BranchingProtoComponent { - branches: HashMap, -} - -// One specualtive branch of a protocol component. -// `ended` IFF this branch has reached SyncBlocker::SyncBlockEnd before. 
-#[derive(Debug, Clone)] -struct ProtoComponentBranch { - state: ComponentState, - inner: ProtoComponentBranchInner, - ended: bool, -} - -// A structure wrapping a set of three pointers, making it impossible -// to miss that they are being setup for `cyclic_drain`. -struct CyclicDrainer<'a, K: Eq + Hash, V> { - input: &'a mut HashMap, - swap: &'a mut HashMap, - output: &'a mut HashMap, -} - -// Small convenience trait for extending the stdlib's bool type with -// an optionlike replace method for increasing brevity. -trait ReplaceBoolTrue { - fn replace_with_true(&mut self) -> bool; -} - -//////////////// IMPL //////////////////////////// - -impl ReplaceBoolTrue for bool { - fn replace_with_true(&mut self) -> bool { - let was = *self; - *self = true; - !was - } -} - -// CuUndecided provides a mostly immutable view into the ConnectorUnphased structure, -// making it harder to accidentally mutate its contents in a way that cannot be rolled back. -impl CuUndecided for ConnectorUnphased { - fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription) { - (&mut *self.logger, &self.proto_description) - } - fn logger_and_protocol_components( - &mut self, - ) -> (&mut dyn Logger, &mut HashMap) { - (&mut *self.logger, &mut self.proto_components) - } - fn logger(&mut self) -> &mut dyn Logger { - &mut *self.logger - } - fn proto_description(&self) -> &ProtocolDescription { - &self.proto_description - } - fn native_component_id(&self) -> ComponentId { - self.native_component_id - } -} -impl<'a, K, V> MapTempsGuard<'a, K, V> { - fn reborrow(&mut self) -> MapTempsGuard<'_, K, V> { - MapTempsGuard(self.0) - } - fn split_first_mut(self) -> (MapTempGuard<'a, K, V>, MapTempsGuard<'a, K, V>) { - let (head, tail) = self.0.split_first_mut().expect("Cache exhausted"); - (MapTempGuard::new(head), MapTempsGuard(tail)) - } -} -impl<'a, K, V> MapTempGuard<'a, K, V> { - fn new(map: &'a mut HashMap) -> Self { - assert!(map.is_empty()); // sanity check - Self(map) - 
} -} -impl<'a, K, V> Drop for MapTempGuard<'a, K, V> { - fn drop(&mut self) { - assert!(self.0.is_empty()); // sanity check - } -} -impl<'a, K, V> Deref for MapTempGuard<'a, K, V> { - type Target = HashMap; - fn deref(&self) -> &::Target { - self.0 - } -} -impl<'a, K, V> DerefMut for MapTempGuard<'a, K, V> { - fn deref_mut(&mut self) -> &mut ::Target { - self.0 - } -} -impl Connector { - /// Read the message received by the given port in the previous synchronous round. - pub fn gotten(&self, port: PortId) -> Result<&Payload, GottenError> { - use GottenError as Ge; - if let ConnectorPhased::Communication(comm) = &self.phased { - match &comm.round_result { - Err(_) => Err(Ge::PreviousSyncFailed), - Ok(None) => Err(Ge::NoPreviousRound), - Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet), - } - } else { - return Err(Ge::NoPreviousRound); - } - } - /// Creates a new, empty synchronous batch for the connector and selects it. - /// Subsequent calls to `put` and `get` with populate the new batch with port operations. - pub fn next_batch(&mut self) -> Result { - // returns index of new batch - if let ConnectorPhased::Communication(comm) = &mut self.phased { - comm.native_batches.push(Default::default()); - Ok(comm.native_batches.len() - 1) - } else { - Err(WrongStateError) - } - } - - fn port_op_access( - &mut self, - port: PortId, - expect_polarity: Polarity, - ) -> Result<&mut NativeBatch, PortOpError> { - use PortOpError as Poe; - let Self { unphased: cu, phased } = self; - let info = cu.ips.port_info.map.get(&port).ok_or(Poe::UnknownPolarity)?; - if info.owner != cu.native_component_id { - return Err(Poe::PortUnavailable); - } - if info.polarity != expect_polarity { - return Err(Poe::WrongPolarity); - } - match phased { - ConnectorPhased::Setup { .. 
} => Err(Poe::NotConnected), - ConnectorPhased::Communication(comm) => { - let batch = comm.native_batches.last_mut().unwrap(); // length >= 1 is invariant - Ok(batch) - } - } - } - - /// Add a `put` operation to the connector's currently-selected synchronous batch. - /// Returns an error if the given port is not owned by the native component, - /// has the wrong polarity, or is already included in the batch. - pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> { - use PortOpError as Poe; - let batch = self.port_op_access(port, Putter)?; - if batch.to_put.contains_key(&port) { - Err(Poe::MultipleOpsOnPort) - } else { - batch.to_put.insert(port, payload); - Ok(()) - } - } - - /// Add a `get` operation to the connector's currently-selected synchronous batch. - /// Returns an error if the given port is not owned by the native component, - /// has the wrong polarity, or is already included in the batch. - pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> { - use PortOpError as Poe; - let batch = self.port_op_access(port, Getter)?; - if batch.to_get.insert(port) { - Ok(()) - } else { - Err(Poe::MultipleOpsOnPort) - } - } - - /// Participate in the completion of the next synchronous round, in which - /// the native component will perform the set of prepared operations of exactly one - /// of the synchronous batches. At the end of the procedure, the synchronous - /// batches will be reset to a singleton set, whose only element is selected, and empty. - /// The caller yields control over to the connector runtime to faciltiate the underlying - /// coordination work until either (a) the round is completed with all components' states - /// updated accordingly, (b) a distributed failure event resets all components' - /// states to what they were prior to the sync call, or (c) the sync procedure encounters - /// an unrecoverable error which ends the call early, and breaks the session and connector's - /// states irreversably. 
- /// Note that the (b) case necessitates the success of a distributed rollback procedure, - /// which this component may initiate, but cannot guarantee will succeed in time or at all. - /// consequently, the given timeout duration represents a duration in which the connector - /// will make a best effort to fail the round and return control flow to the caller. - pub fn sync(&mut self, timeout: Option) -> Result { - // This method first destructures the connector, and checks for obvious - // failure cases. The bulk of the behavior continues in `connected_sync`, - // to minimize indentation, and enable convient ?-style short circuit syntax. - let Self { unphased: cu, phased } = self; - match phased { - ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected), - ConnectorPhased::Communication(comm) => { - match &comm.round_result { - Err(SyncError::Unrecoverable(e)) => { - log!(cu.logger(), "Attempted to start sync round, but previous error {:?} was unrecoverable!", e); - return Err(SyncError::Unrecoverable(e.clone())); - } - _ => {} - } - comm.round_result = Self::connected_sync(cu, comm, timeout); - comm.round_index += 1; - match &comm.round_result { - Ok(None) => unreachable!(), - Ok(Some(ok_result)) => Ok(ok_result.batch_index), - Err(sync_error) => Err(sync_error.clone()), - } - } - } - } - - // Attempts to complete the synchronous round for the given - // communication-phased connector structure. - // Modifies components and ports in `cu` IFF the round succeeds. - #[inline] - fn connected_sync( - cu: &mut ConnectorUnphased, - comm: &mut ConnectorCommunication, - timeout: Option, - ) -> Result, SyncError> { - ////////////////////////////////// - use SyncError as Se; - ////////////////////////////////// - - // Create separate storages for ports and components stored in `cu`, - // while kicking off the branching of components until the set of - // components entering their synchronous block is finalized in `branching_proto_components`. 
- // This is the last time cu's components and ports are accessed until the round is decided. - let mut ips = cu.ips.clone(); - let mut branching_proto_components = - HashMap::::default(); - let mut unrun_components: Vec<(ComponentId, ComponentState)> = cu - .proto_components - .iter() - .map(|(&proto_id, proto)| (proto_id, proto.clone())) - .collect(); - log!(cu.logger(), "Nonsync running {} proto components...", unrun_components.len()); - // initially, the set of components to run is the set of components stored by `cu`, - // but they are eventually drained into `branching_proto_components`. - // Some components exit first, and others are created and put into `unrun_components`. - while let Some((proto_component_id, mut component)) = unrun_components.pop() { - log!( - cu.logger(), - "Nonsync running proto component with ID {:?}. {} to go after this", - proto_component_id, - unrun_components.len() - ); - let (logger, proto_description) = cu.logger_and_protocol_description(); - let mut ctx = NonsyncProtoContext { - ips: &mut ips, - logger, - proto_component_id, - unrun_components: &mut unrun_components, - }; - let blocker = component.nonsync_run(&mut ctx, proto_description); - log!( - logger, - "proto component {:?} ran to nonsync blocker {:?}", - proto_component_id, - &blocker - ); - use NonsyncBlocker as B; - match blocker { - B::ComponentExit => drop(component), - B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)), - B::SyncBlockStart => assert!(branching_proto_components - .insert(proto_component_id, BranchingProtoComponent::initial(component)) - .is_none()), // Some(_) returned IFF some component identifier key is overwritten (BAD!) 
- } - } - log!( - cu.logger(), - "All {} proto components are now done with Nonsync phase", - branching_proto_components.len(), - ); - - // Create temporary structures needed for the synchronous phase of the round - let mut rctx = RoundCtx { - ips, // already used previously, now moved into RoundCtx - solution_storage: { - let subtree_id_iter = { - // Create an iterator over the identifiers of this - // connector's childen in the _solution tree_. - // Namely, the native, all locally-managed components, - // and all this connector's children in the _consensus tree_ (other connectors). - let n = std::iter::once(SubtreeId::LocalComponent(cu.native_component_id)); - let c = branching_proto_components - .keys() - .map(|&cid| SubtreeId::LocalComponent(cid)); - let e = comm - .neighborhood - .children - .iter() - .map(|&index| SubtreeId::NetEndpoint { index }); - n.chain(c).chain(e) - }; - log!( - cu.logger, - "Children in subtree are: {:?}", - DebuggableIter(subtree_id_iter.clone()) - ); - SolutionStorage::new(subtree_id_iter) - }, - spec_var_stream: cu.ips.id_manager.new_spec_var_stream(), - payload_inbox: Default::default(), // buffer for in-memory payloads to be handled - deadline: timeout.map(|to| Instant::now() + to), - }; - log!(cu.logger(), "Round context structure initialized"); - - // Prepare the branching native component, involving the conversion - // of its synchronous batches (user provided) into speculative branches eagerly. - // As a side effect, send all PUTs with the appropriate predicates. - // Afterwards, each native component's speculative branch finds a local - // solution the moment it's received all the messages it's awaiting. - log!( - cu.logger(), - "Translating {} native batches into branches...", - comm.native_batches.len() - ); - // Allocate a single speculative variable to distinguish each native branch. - // This enables native components to have distinct branches with identical - // FIRING variables. 
- let native_spec_var = rctx.spec_var_stream.next(); - log!(cu.logger(), "Native branch spec var is {:?}", native_spec_var); - let mut branching_native = BranchingNative { branches: Default::default() }; - 'native_branches: for ((native_branch, index), branch_spec_val) in - comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain()) - { - let NativeBatch { to_get, to_put } = native_branch; - // compute the solution predicate to associate with this branch. - let predicate = { - let mut predicate = Predicate::default(); - // all firing ports have SpecVal::FIRING - let firing_iter = to_get.iter().chain(to_put.keys()).copied(); - log!( - cu.logger(), - "New native with firing ports {:?}", - firing_iter.clone().collect::>() - ); - let firing_ports: HashSet = firing_iter.clone().collect(); - for port in firing_iter { - let var = cu.ips.port_info.spec_var_for(port); - predicate.assigned.insert(var, SpecVal::FIRING); - } - // all silent ports have SpecVal::SILENT - for port in cu.ips.port_info.ports_owned_by(cu.native_component_id) { - if firing_ports.contains(port) { - // this one is FIRING - continue; - } - let var = cu.ips.port_info.spec_var_for(*port); - if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) { - log!(&mut *cu.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var); - continue 'native_branches; - } - } - // this branch is consistent. 
distinguish it with a unique var:val mapping and proceed - predicate.inserted(native_spec_var, branch_spec_val) - }; - log!(cu.logger(), "Native branch index={:?} has consistent {:?}", index, &predicate); - // send all outgoing messages (by buffering them) - for (putter, payload) in to_put { - let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; - log!( - cu.logger(), - "Native branch {} sending msg {:?} with putter {:?}", - index, - &msg, - putter - ); - // sanity check - assert_eq!(Putter, cu.ips.port_info.map.get(&putter).unwrap().polarity); - rctx.putter_push(cu, putter, msg); - } - let branch = NativeBranch { index, gotten: Default::default(), to_get }; - if branch.is_ended() { - // empty to_get set => already corresponds with a component solution - log!( - cu.logger(), - "Native submitting solution for batch {} with {:?}", - index, - &predicate - ); - rctx.solution_storage.submit_and_digest_subtree_solution( - cu, - SubtreeId::LocalComponent(cu.native_component_id), - predicate.clone(), - ); - } - if let Some(_) = branching_native.branches.insert(predicate, branch) { - // thanks to the native_spec_var, each batch has a distinct predicate - unreachable!() - } - } - // restore the invariant: !native_batches.is_empty() - comm.native_batches.push(Default::default()); - // Call to another big method; keep running this round - // until a distributed decision is reached! 
- log!(cu.logger(), "Searching for decision..."); - let decision = Self::sync_reach_decision( - cu, - comm, - &mut branching_native, - &mut branching_proto_components, - &mut rctx, - )?; - log!(cu.logger(), "Committing to decision {:?}!", &decision); - comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.logger(), &decision)?; - - // propagate the decision to children - let msg = Msg::CommMsg(CommMsg { - round_index: comm.round_index, - contents: CommMsgContents::CommCtrl(CommCtrlMsg::Announce { - decision: decision.clone(), - }), - }); - log!( - cu.logger(), - "Announcing decision {:?} through child endpoints {:?}", - &msg, - &comm.neighborhood.children - ); - for &child in comm.neighborhood.children.iter() { - comm.endpoint_manager.send_to_comms(child, &msg)?; - } - let ret = match decision { - Decision::Failure => { - // untouched port/component fields of `cu` are NOT overwritten. - // the result is a rollback. - Err(Se::RoundFailure) - } - Decision::Success(predicate) => { - // commit changes to component states - cu.proto_components.clear(); - let (logger, proto_components) = cu.logger_and_protocol_components(); - proto_components.extend( - // "flatten" branching components, committing the speculation - // consistent with the predicate decided upon. - branching_proto_components - .into_iter() - .map(|(cid, bpc)| (cid, bpc.collapse_with(logger, &predicate))), - ); - // commit changes to ports and id_manager - log!( - logger, - "End round with (updated) component states {:?}", - proto_components.keys() - ); - cu.ips = rctx.ips; - // consume native - let round_ok = branching_native.collapse_with(cu.logger(), &predicate); - Ok(Some(round_ok)) - } - }; - log!(cu.logger(), "Sync round ending! Cleaning up"); - ret - } - - // Once the synchronous round has been started, this procedure - // routs and handles payloads, receives control messages from neighboring connectors, - // checks for timeout, and aggregates solutions until a distributed decision is reached. 
- // The decision is either a solution (success case), or a distributed timeout rollback (failure case) - // The final possible outcome is an unrecoverable error, which results from some fundamental misbehavior, - // a network channel breaking, etc. - fn sync_reach_decision( - cu: &mut impl CuUndecided, - comm: &mut ConnectorCommunication, - branching_native: &mut BranchingNative, - branching_proto_components: &mut HashMap, - rctx: &mut RoundCtx, - ) -> Result { - // The round is in progress, and now its just a matter of arriving at a decision. - let mut already_requested_failure = false; - if branching_native.branches.is_empty() { - // An unsatisfiable native is the easiest way to detect failure - log!(cu.logger(), "Native starts with no branches! Failure!"); - match comm.neighborhood.parent { - Some(parent) => { - if already_requested_failure.replace_with_true() { - Self::request_failure(cu, comm, parent)? - } else { - log!(cu.logger(), "Already requested failure"); - } - } - None => { - log!(cu.logger(), "No parent. Deciding on failure"); - return Ok(Decision::Failure); - } - } - } - - // Create a small set of "workspace" hashmaps, to be passed by-reference into various calls. - // This is an optimization, avoiding repeated allocation. - let mut pcb_temps_owner = <[HashMap; 3]>::default(); - let mut pcb_temps = MapTempsGuard(&mut pcb_temps_owner); - let mut bn_temp_owner = >::default(); - - // first, we run every protocol component to their sync blocker. 
- // Afterwards we establish a loop invariant: no new decision can be reached - // without handling messages in the buffer or arriving from the network - log!( - cu.logger(), - "Running all {} proto components to their sync blocker...", - branching_proto_components.len() - ); - for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() { - let BranchingProtoComponent { branches } = proto_component; - // must reborrow to constrain the lifetime of pcb_temps to inside the loop - let (swap, pcb_temps) = pcb_temps.reborrow().split_first_mut(); - let (blocked, _pcb_temps) = pcb_temps.split_first_mut(); - // initially, no protocol components have .ended==true - // drain from branches --> blocked - let cd = CyclicDrainer { input: branches, swap: swap.0, output: blocked.0 }; - BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?; - // swap the blocked branches back - std::mem::swap(blocked.0, branches); - if branches.is_empty() { - log!(cu.logger(), "{:?} has become inconsistent!", proto_component_id); - if let Some(parent) = comm.neighborhood.parent { - if already_requested_failure.replace_with_true() { - Self::request_failure(cu, comm, parent)? - } else { - log!(cu.logger(), "Already requested failure"); - } - } else { - log!(cu.logger(), "As the leader, deciding on timeout"); - return Ok(Decision::Failure); - } - } - } - log!(cu.logger(), "All proto components are blocked"); - // ...invariant established! - - log!(cu.logger(), "Entering decision loop..."); - comm.endpoint_manager.undelay_all(); - 'undecided: loop { - // handle all buffered messages, sending them through endpoints / feeding them to components - log!(cu.logger(), "Decision loop! 
have {} messages to recv", rctx.payload_inbox.len()); - while let Some((getter, send_payload_msg)) = rctx.getter_pop() { - let getter_info = rctx.ips.port_info.map.get(&getter).unwrap(); - let cid = getter_info.owner; // the id of the component owning `getter` port - assert_eq!(Getter, getter_info.polarity); // sanity check - log!( - cu.logger(), - "Routing msg {:?} to {:?} via {:?}", - &send_payload_msg, - getter, - &getter_info.route - ); - match getter_info.route { - Route::UdpEndpoint { index } => { - // this is a message sent over the network through a UDP endpoint - let udp_endpoint_ext = - &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[index]; - let SendPayloadMsg { predicate, payload } = send_payload_msg; - log!(cu.logger(), "Delivering to udp endpoint index={}", index); - // UDP mediator messages are buffered until the end of the round, - // because they are still speculative - udp_endpoint_ext.outgoing_payloads.insert(predicate, payload); - } - Route::NetEndpoint { index } => { - // this is a message sent over the network as a control message - let msg = Msg::CommMsg(CommMsg { - round_index: comm.round_index, - contents: CommMsgContents::SendPayload(send_payload_msg), - }); - // actually send the message now - comm.endpoint_manager.send_to_comms(index, &msg)?; - } - Route::LocalComponent if cid == cu.native_component_id() => branching_native - .feed_msg( - cu, - rctx, - getter, - &send_payload_msg, - MapTempGuard::new(&mut bn_temp_owner), - ), - Route::LocalComponent => { - // some other component_id routed locally. must be a protocol component! - if let Some(branching_component) = branching_proto_components.get_mut(&cid) - { - // The recipient component is still running! - // Feed it this message AND run it again until all branches are blocked - branching_component.feed_msg( - cu, - rctx, - cid, - getter, - &send_payload_msg, - pcb_temps.reborrow(), - )?; - if branching_component.branches.is_empty() { - // A solution is impossible! 
this component has zero branches - // Initiate a rollback - log!(cu.logger(), "{:?} has become inconsistent!", cid); - if let Some(parent) = comm.neighborhood.parent { - if already_requested_failure.replace_with_true() { - Self::request_failure(cu, comm, parent)? - } else { - log!(cu.logger(), "Already requested failure"); - } - } else { - log!(cu.logger(), "As the leader, deciding on timeout"); - return Ok(Decision::Failure); - } - } - } else { - // This case occurs when the component owning `getter` has exited, - // but the putter is still running (and sent this message). - // we drop the message on the floor, because it cannot be involved - // in a solution (requires sending a message over a dead channel!). - log!( - cu.logger(), - "Delivery to getter {:?} msg {:?} failed because {:?} isn't here", - getter, - &send_payload_msg, - cid - ); - } - } - } - } - // payload buffer is empty. - // check if we have a solution yet - log!(cu.logger(), "Check if we have any local decisions..."); - for solution in rctx.solution_storage.iter_new_local_make_old() { - log!(cu.logger(), "New local decision with solution {:?}...", &solution); - match comm.neighborhood.parent { - Some(parent) => { - // Always forward connector-local solutions to my parent - // AS they are moved from new->old in solution storage. - log!(cu.logger(), "Forwarding to my parent {:?}", parent); - let suggestion = Decision::Success(solution); - let msg = Msg::CommMsg(CommMsg { - round_index: comm.round_index, - contents: CommMsgContents::CommCtrl(CommCtrlMsg::Suggest { - suggestion, - }), - }); - comm.endpoint_manager.send_to_comms(parent, &msg)?; - } - None => { - log!(cu.logger(), "No parent. Deciding on solution {:?}", &solution); - return Ok(Decision::Success(solution)); - } - } - } - - // stuck! make progress by receiving a msg - // try recv ONE message arriving through an endpoint - log!(cu.logger(), "No decision yet. 
Let's recv an endpoint msg..."); - { - // This is the first call that may block the thread! - // Until a message arrives over the network, no new solutions are possible. - let (net_index, comm_ctrl_msg): (usize, CommCtrlMsg) = - match comm.endpoint_manager.try_recv_any_comms(cu, rctx, comm.round_index)? { - CommRecvOk::NewControlMsg { net_index, msg } => (net_index, msg), - CommRecvOk::NewPayloadMsgs => { - // 1+ speculative payloads have been buffered - // but no other control messages that require further handling - // restart the loop to process the messages before blocking - continue 'undecided; - } - CommRecvOk::TimeoutWithoutNew => { - log!(cu.logger(), "Reached user-defined deadling without decision..."); - if let Some(parent) = comm.neighborhood.parent { - if already_requested_failure.replace_with_true() { - Self::request_failure(cu, comm, parent)? - } else { - log!(cu.logger(), "Already requested failure"); - } - } else { - log!(cu.logger(), "As the leader, deciding on timeout"); - return Ok(Decision::Failure); - } - // disable future timeout events! our request for failure has been sent - // all we can do at this point is wait. 
- rctx.deadline = None; - continue 'undecided; - } - }; - // We received a control message that requires further action - log!( - cu.logger(), - "Received from endpoint {} ctrl msg {:?}", - net_index, - &comm_ctrl_msg - ); - match comm_ctrl_msg { - CommCtrlMsg::Suggest { suggestion } => { - // We receive the solution of another connector (part of the decision process) - // (only accept this through a child endpoint) - if comm.neighborhood.children.contains(&net_index) { - match suggestion { - Decision::Success(predicate) => { - // child solution contributes to local solution - log!(cu.logger(), "Child provided solution {:?}", &predicate); - let subtree_id = SubtreeId::NetEndpoint { index: net_index }; - rctx.solution_storage.submit_and_digest_subtree_solution( - cu, subtree_id, predicate, - ); - } - Decision::Failure => { - // Someone timed out! propagate this to parent or decide - match comm.neighborhood.parent { - None => { - log!(cu.logger(), "I decide on my child's failure"); - break 'undecided Ok(Decision::Failure); - } - Some(parent) => { - log!(cu.logger(), "Forwarding failure through my parent endpoint {:?}", parent); - if already_requested_failure.replace_with_true() { - Self::request_failure(cu, comm, parent)? - } else { - log!(cu.logger(), "Already requested failure"); - } - } - } - } - } - } else { - // Unreachable if all connectors are playing by the rules. - // Silently ignored instead of causing panic to make the - // runtime more robust against network fuzz - log!( - cu.logger(), - "Discarding suggestion {:?} from non-child endpoint idx {:?}", - &suggestion, - net_index - ); - } - } - CommCtrlMsg::Announce { decision } => { - // Apparently this round is over! A decision has been reached - if Some(net_index) == comm.neighborhood.parent { - // We accept the decision because it comes from our parent. 
- // end this loop, and and the synchronous round - return Ok(decision); - } else { - // Again, unreachable if all connectors are playing by the rules - log!( - cu.logger(), - "Discarding announcement {:?} from non-parent endpoint idx {:?}", - &decision, - net_index - ); - } - } - } - } - log!(cu.logger(), "Endpoint msg recv done"); - } - } - - // Send a failure request to my parent in the consensus tree - fn request_failure( - cu: &mut impl CuUndecided, - comm: &mut ConnectorCommunication, - parent: usize, - ) -> Result<(), UnrecoverableSyncError> { - log!(cu.logger(), "Forwarding to my parent {:?}", parent); - let suggestion = Decision::Failure; - let msg = Msg::CommMsg(CommMsg { - round_index: comm.round_index, - contents: CommMsgContents::CommCtrl(CommCtrlMsg::Suggest { suggestion }), - }); - comm.endpoint_manager.send_to_comms(parent, &msg) - } -} -impl NativeBranch { - fn is_ended(&self) -> bool { - self.to_get.is_empty() - } -} -impl BranchingNative { - // Feed the given payload to the native component - // May result in discovering new component solutions, - // or fork speculative branches if the message's predicate - // is MORE SPECIFIC than the branches of the native - fn feed_msg( - &mut self, - cu: &mut impl CuUndecided, - rctx: &mut RoundCtx, - getter: PortId, - send_payload_msg: &SendPayloadMsg, - bn_temp: MapTempGuard<'_, Predicate, NativeBranch>, - ) { - log!(cu.logger(), "feeding native getter {:?} {:?}", getter, &send_payload_msg); - assert_eq!(Getter, rctx.ips.port_info.map.get(&getter).unwrap().polarity); - let mut draining = bn_temp; - let finished = &mut self.branches; - std::mem::swap(draining.0, finished); - // Visit all native's branches, and feed those whose current predicates are - // consistent with that of the received message. 
- for (predicate, mut branch) in draining.drain() { - log!(cu.logger(), "visiting native branch {:?} with {:?}", &branch, &predicate); - let var = rctx.ips.port_info.spec_var_for(getter); - if predicate.query(var) != Some(SpecVal::FIRING) { - // optimization. Don't bother trying this branch, - // because the resulting branch would have an inconsistent predicate. - // the existing branch asserts the getter port is SILENT - log!( - cu.logger(), - "skipping branch with {:?} that doesn't want the message (fastpath)", - &predicate - ); - Self::insert_branch_merging(finished, predicate, branch); - continue; - } - // Define a little helper closure over `rctx` - // for feeding the given branch this new payload, - // and submitting any resulting solutions - let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| { - // This branch notes the getter port as "gotten" - branch.to_get.remove(&getter); - if let Some(was) = branch.gotten.insert(getter, send_payload_msg.payload.clone()) { - // Sanity check. Payload mapping (Predicate,Port) should be unique each round - assert_eq!(&was, &send_payload_msg.payload); - } - if branch.is_ended() { - // That was the last message the branch was awaiting! - // Submitting new component solution. 
- log!( - cu.logger(), - "new native solution with {:?} is_ended() with gotten {:?}", - &predicate, - &branch.gotten - ); - let subtree_id = SubtreeId::LocalComponent(cu.native_component_id()); - rctx.solution_storage.submit_and_digest_subtree_solution( - cu, - subtree_id, - predicate.clone(), - ); - } else { - // This branch still has ports awaiting their messages - log!( - cu.logger(), - "Fed native {:?} still has to_get {:?}", - &predicate, - &branch.to_get - ); - } - }; - use AssignmentUnionResult as Aur; - match predicate.assignment_union(&send_payload_msg.predicate) { - Aur::Nonexistant => { - // The predicates of this branch and the payload are incompatible - // retain this branch as-is - log!( - cu.logger(), - "skipping branch with {:?} that doesn't want the message (slowpath)", - &predicate - ); - Self::insert_branch_merging(finished, predicate, branch); - } - Aur::Equivalent | Aur::FormerNotLatter => { - // The branch's existing predicate "covers" (is at least as specific) - // as that of the payload. Can feed this branch the message without altering - // the branch predicate. - feed_branch(&mut branch, &predicate); - log!(cu.logger(), "branch pred covers it! Accept the msg"); - Self::insert_branch_merging(finished, predicate, branch); - } - Aur::LatterNotFormer => { - // The predicates of branch and payload are compatible, - // but that of the payload is strictly more specific than that of the latter. - // FORK the branch, feed the fork the message, and give it the payload's predicate. 
- let mut branch2 = branch.clone(); - let predicate2 = send_payload_msg.predicate.clone(); - feed_branch(&mut branch2, &predicate2); - log!( - cu.logger(), - "payload pred {:?} covers branch pred {:?}", - &predicate2, - &predicate - ); - Self::insert_branch_merging(finished, predicate, branch); - Self::insert_branch_merging(finished, predicate2, branch2); - } - Aur::New(predicate2) => { - // The predicates of branch and payload are compatible, - // but their union is some new predicate (both preds assign something new). - // FORK the branch, feed the fork the message, and give it the new predicate. - let mut branch2 = branch.clone(); - feed_branch(&mut branch2, &predicate2); - log!( - cu.logger(), - "new subsuming pred created {:?}. forking and feeding", - &predicate2 - ); - Self::insert_branch_merging(finished, predicate, branch); - Self::insert_branch_merging(finished, predicate2, branch2); - } - } - } - } - - // Insert a new speculate branch into the given storage, - // MERGING it with an existing branch if their predicate keys clash. - fn insert_branch_merging( - branches: &mut HashMap, - predicate: Predicate, - mut branch: NativeBranch, - ) { - let e = branches.entry(predicate); - use std::collections::hash_map::Entry; - match e { - Entry::Vacant(ev) => { - // no existing branch present. We insert it no problem. (The most common case) - ev.insert(branch); - } - Entry::Occupied(mut eo) => { - // Oh dear, there is already a branch with this predicate. - // Rather than choosing either branch, we MERGE them. - // This means taking the UNION of their .gotten and the INTERSECTION of their .to_get - let old = eo.get_mut(); - for (k, v) in branch.gotten.drain() { - if old.gotten.insert(k, v).is_none() { - // added a gotten element in `branch` not already in `old` - old.to_get.remove(&k); - } - } - } - } - } - - // Given the predicate for the round's solution, collapse this - // branching native to an ended branch whose predicate is consistent with it. 
- // return as `RoundEndedNative` the result of a native completing successful round - fn collapse_with( - self, - logger: &mut dyn Logger, - solution_predicate: &Predicate, - ) -> RoundEndedNative { - log!( - logger, - "Collapsing native with {} branch preds {:?}", - self.branches.len(), - self.branches.keys() - ); - for (branch_predicate, branch) in self.branches { - log!( - logger, - "Considering native branch {:?} with to_get {:?} gotten {:?}", - &branch_predicate, - &branch.to_get, - &branch.gotten - ); - if branch.is_ended() && branch_predicate.assigns_subset(solution_predicate) { - let NativeBranch { index, gotten, .. } = branch; - log!(logger, "Collapsed native has gotten {:?}", &gotten); - return RoundEndedNative { batch_index: index, gotten }; - } - } - log!(logger, "Native had no branches matching pred {:?}", solution_predicate); - panic!("Native had no branches matching pred {:?}", solution_predicate); - } -} -impl BranchingProtoComponent { - // Create a singleton-branch branching protocol component as - // speculation begins, with the given protocol state. - fn initial(state: ComponentState) -> Self { - let branch = ProtoComponentBranch { state, inner: Default::default(), ended: false }; - Self { branches: hashmap! { Predicate::default() => branch } } - } - - // run all the given branches (cd.input) to their SyncBlocker, - // populating cd.output by cyclically draining "input" -> "cd."input" / cd.output. - // (to prevent concurrent r/w of one structure, we realize "input" as cd.input for reading and cd.swap for writing) - // This procedure might lose branches, and it might create new branches. 
- fn drain_branches_to_blocked( - cd: CyclicDrainer, - cu: &mut impl CuUndecided, - rctx: &mut RoundCtx, - proto_component_id: ComponentId, - ) -> Result<(), UnrecoverableSyncError> { - // let CyclicDrainer { input, swap, output } = cd; - while !cd.input.is_empty() { - 'branch_iter: for (mut predicate, mut branch) in cd.input.drain() { - let mut ctx = SyncProtoContext { - rctx, - predicate: &predicate, - branch_inner: &mut branch.inner, - }; - // Run this component's state to the next syncblocker for handling - let blocker = branch.state.sync_run(&mut ctx, cu.proto_description()); - log!( - cu.logger(), - "Proto component with id {:?} branch with pred {:?} hit blocker {:?}", - proto_component_id, - &predicate, - &blocker, - ); - use SyncBlocker as B; - match blocker { - B::Inconsistent => drop((predicate, branch)), // EXPLICIT inconsistency - B::CouldntReadMsg(port) => { - // sanity check: `CouldntReadMsg` returned IFF the message is unavailable - assert!(!branch.inner.inbox.contains_key(&port)); - // This branch hit a proper blocker: progress awaits the receipt of some message. Exit the cycle. - Self::insert_branch_merging(cd.output, predicate, branch); - } - B::CouldntCheckFiring(port) => { - // sanity check: `CouldntCheckFiring` returned IFF the variable is speculatively assigned - let var = rctx.ips.port_info.spec_var_for(port); - assert!(predicate.query(var).is_none()); - // speculate on the two possible values of `var`. Schedule both branches to be rerun. 
- - Self::insert_branch_merging( - cd.swap, - predicate.clone().inserted(var, SpecVal::SILENT), - branch.clone(), - ); - Self::insert_branch_merging( - cd.swap, - predicate.inserted(var, SpecVal::FIRING), - branch, - ); - } - B::PutMsg(putter, payload) => { - // sanity check: The given port indeed has `Putter` polarity - assert_eq!(Putter, rctx.ips.port_info.map.get(&putter).unwrap().polarity); - // assign FIRING to this port's associated firing variable - let var = rctx.ips.port_info.spec_var_for(putter); - let was = predicate.assigned.insert(var, SpecVal::FIRING); - if was == Some(SpecVal::SILENT) { - // Discard the branch, as it clearly has contradictory requirements for this value. - log!(cu.logger(), "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", - proto_component_id, putter, var); - drop((predicate, branch)); - } else { - // Note that this port has put this round, - // and assert that this isn't its 2nd time putting this round (otheriwse PDL programming error) - assert!(branch.inner.did_put_or_get.insert(putter)); - log!(cu.logger(), "Proto component {:?} with pred {:?} putting payload {:?} on port {:?} (using var {:?})", - proto_component_id, &predicate, &payload, putter, var); - // Send the given payload (by buffering it). - let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; - rctx.putter_push(cu, putter, msg); - // Branch can still make progress. 
Schedule to be rerun - - Self::insert_branch_merging(cd.swap, predicate, branch); - } - } - B::SyncBlockEnd => { - // This branch reached the end of it's synchronous block - // assign all variables of owned ports that DIDN'T fire to SILENT - for port in rctx.ips.port_info.ports_owned_by(proto_component_id) { - let var = rctx.ips.port_info.spec_var_for(*port); - let actually_exchanged = branch.inner.did_put_or_get.contains(port); - let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT); - let speculated_to_fire = val == SpecVal::FIRING; - if actually_exchanged != speculated_to_fire { - log!(cu.logger(), "Inconsistent wrt. port {:?} var {:?} val {:?} actually_exchanged={}, speculated_to_fire={}", - port, var, val, actually_exchanged, speculated_to_fire); - // IMPLICIT inconsistency - drop((predicate, branch)); - continue 'branch_iter; - } - } - // submit solution for this component - let subtree_id = SubtreeId::LocalComponent(proto_component_id); - rctx.solution_storage.submit_and_digest_subtree_solution( - cu, - subtree_id, - predicate.clone(), - ); - branch.ended = true; - // This branch exits the cyclic drain - Self::insert_branch_merging(cd.output, predicate, branch); - } - } - } - std::mem::swap(cd.input, cd.swap); - } - Ok(()) - } - - // Feed this branching protocol component the given message, and - // then run all branches until they are once again blocked. 
- fn feed_msg( - &mut self, - cu: &mut impl CuUndecided, - rctx: &mut RoundCtx, - proto_component_id: ComponentId, - getter: PortId, - send_payload_msg: &SendPayloadMsg, - pcb_temps: MapTempsGuard<'_, Predicate, ProtoComponentBranch>, - ) -> Result<(), UnrecoverableSyncError> { - log!( - cu.logger(), - "feeding proto component {:?} getter {:?} {:?}", - proto_component_id, - getter, - &send_payload_msg - ); - let (mut unblocked, pcb_temps) = pcb_temps.split_first_mut(); - let (mut blocked, pcb_temps) = pcb_temps.split_first_mut(); - // partition drain from self.branches -> {unblocked, blocked} (not cyclic) - log!(cu.logger(), "visiting {} blocked branches...", self.branches.len()); - for (predicate, mut branch) in self.branches.drain() { - if branch.ended { - log!(cu.logger(), "Skipping ended branch with {:?}", &predicate); - Self::insert_branch_merging(&mut blocked, predicate, branch); - continue; - } - use AssignmentUnionResult as Aur; - log!(cu.logger(), "visiting branch with pred {:?}", &predicate); - // We give each branch a chance to receive this message, - // those that do are maybe UNBLOCKED, and all others remain BLOCKED. - match predicate.assignment_union(&send_payload_msg.predicate) { - Aur::Nonexistant => { - // this branch does not receive the message. categorize into blocked. - log!(cu.logger(), "skipping branch"); - Self::insert_branch_merging(&mut blocked, predicate, branch); - } - Aur::Equivalent | Aur::FormerNotLatter => { - // retain the existing predicate, but add this payload - log!(cu.logger(), "feeding this branch without altering its predicate"); - branch.feed_msg(getter, send_payload_msg.payload.clone()); - // this branch does receive the message. categorize into unblocked. - Self::insert_branch_merging(&mut unblocked, predicate, branch); - } - Aur::LatterNotFormer => { - // fork branch, give fork the message and payload predicate. 
original branch untouched - log!(cu.logger(), "Forking this branch, giving it the predicate of the msg"); - let mut branch2 = branch.clone(); - let predicate2 = send_payload_msg.predicate.clone(); - branch2.feed_msg(getter, send_payload_msg.payload.clone()); - // the branch that receives the message is unblocked, the original one is blocked - Self::insert_branch_merging(&mut blocked, predicate, branch); - Self::insert_branch_merging(&mut unblocked, predicate2, branch2); - } - Aur::New(predicate2) => { - // fork branch, give fork the message and the new predicate. original branch untouched - log!(cu.logger(), "Forking this branch with new predicate {:?}", &predicate2); - let mut branch2 = branch.clone(); - branch2.feed_msg(getter, send_payload_msg.payload.clone()); - // the branch that receives the message is unblocked, the original one is blocked - Self::insert_branch_merging(&mut blocked, predicate, branch); - Self::insert_branch_merging(&mut unblocked, predicate2, branch2); - } - } - } - log!(cu.logger(), "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len()); - // drain from unblocked --> blocked - let (swap, _pcb_temps) = pcb_temps.split_first_mut(); // peel off ONE temp storage map - let cd = CyclicDrainer { input: unblocked.0, swap: swap.0, output: blocked.0 }; - BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?; - // swap the blocked branches back - std::mem::swap(blocked.0, &mut self.branches); - log!(cu.logger(), "component settles down with branches: {:?}", self.branches.keys()); - Ok(()) - } - - // Insert a new speculate branch into the given storage, - // MERGING it with an existing branch if their predicate keys clash. - fn insert_branch_merging( - branches: &mut HashMap, - predicate: Predicate, - mut branch: ProtoComponentBranch, - ) { - let e = branches.entry(predicate); - use std::collections::hash_map::Entry; - match e { - Entry::Vacant(ev) => { - // no existing branch present. We insert it no problem. 
(The most common case) - ev.insert(branch); - } - Entry::Occupied(mut eo) => { - // Oh dear, there is already a branch with this predicate. - // Rather than choosing either branch, we MERGE them. - // This means keeping the existing one in-place, and giving it the UNION of the inboxes - let old = eo.get_mut(); - for (k, v) in branch.inner.inbox.drain() { - old.inner.inbox.insert(k, v); - } - } - } - } - - // Given the predicate for the round's solution, collapse this - // branching native to an ended branch whose predicate is consistent with it. - fn collapse_with( - self, - logger: &mut dyn Logger, - solution_predicate: &Predicate, - ) -> ComponentState { - let BranchingProtoComponent { branches } = self; - for (branch_predicate, branch) in branches { - if branch.ended && branch_predicate.assigns_subset(solution_predicate) { - let ProtoComponentBranch { state, .. } = branch; - return state; - } - } - log!(logger, "ProtoComponent had no branches matching pred {:?}", solution_predicate); - panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate); - } -} -impl ProtoComponentBranch { - // Feed this branch received message. - // It's safe to receive the same message repeatedly, - // but if we receive a message with different contents, - // it's a sign something has gone wrong! keys of type (port, round, predicate) - // should always map to at most one message value! - fn feed_msg(&mut self, getter: PortId, payload: Payload) { - let e = self.inner.inbox.entry(getter); - use std::collections::hash_map::Entry; - match e { - Entry::Vacant(ev) => { - // new message - ev.insert(payload); - } - Entry::Occupied(eo) => { - // redundant recv. can happen as a result of a - // component A having two branches X and Y related by - assert_eq!(eo.get(), &payload); - } - } - } -} -impl SolutionStorage { - // Create a new solution storage, to manage the local solutions for - // this connector and all of it's children (subtrees) in the solution tree. 
- fn new(subtree_ids: impl Iterator) -> Self { - // For easy iteration, we store this SubtreeId => {Predicate} - // structure instead as a pair of structures: a vector of predicate sets, - // and a subtree_id-to-index lookup map - let mut subtree_id_to_index: HashMap = Default::default(); - let mut subtree_solutions = vec![]; - for id in subtree_ids { - subtree_id_to_index.insert(id, subtree_solutions.len()); - subtree_solutions.push(Default::default()) - } - // new_local U old_local represents the solutions of this connector itself: - // namely, those that can be created from the union of one element from each child's solution set. - // The difference between new and old is that new stores those NOT YET sent over the network - // to this connector's parent in the solution tree. - // invariant: old_local and new_local have an empty intersection - Self { - subtree_solutions, - subtree_id_to_index, - old_local: Default::default(), - new_local: Default::default(), - } - } - // drain old_local to new_local, visiting all new additions to old_local - pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator + '_ { - let Self { old_local, new_local, .. } = self; - new_local.drain().map(move |local| { - // rely on invariant: empty intersection between old and new local sets - assert!(old_local.insert(local.clone())); - local - }) - } - // insert a solution for the given subtree ID, - // AND update new_local to include any solutions that become - // possible as a result of this new addition - pub(crate) fn submit_and_digest_subtree_solution( - &mut self, - cu: &mut impl CuUndecided, - subtree_id: SubtreeId, - predicate: Predicate, - ) { - log!(cu.logger(), "++ new component solution {:?} {:?}", subtree_id, &predicate); - let Self { subtree_solutions, new_local, old_local, subtree_id_to_index } = self; - let index = subtree_id_to_index[&subtree_id]; - let was_new = subtree_solutions[index].insert(predicate.clone()); - if was_new { - // This is a newly-added solution! 
update new_local - // consider ALL consistent combinations of one element from each solution set - // to our right or left in the solution-set vector - // but with THIS PARTICULAR predicate from our own index. - let left = 0..index; - let right = (index + 1)..subtree_solutions.len(); - // iterator over SETS of solutions, one for every component except `subtree_id` (me) - let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]); - // Recursively enumerate all solutions matching the description above, - Self::elaborate_into_new_local_rec(cu, predicate, set_visitor, old_local, new_local); - } - } - - // Recursively build local solutions for this connector, - // see `submit_and_digest_subtree_solution` - fn elaborate_into_new_local_rec<'a, 'b>( - cu: &mut impl CuUndecided, - partial: Predicate, - mut set_visitor: impl Iterator> + Clone, - old_local: &'b HashSet, - new_local: &'a mut HashSet, - ) { - if let Some(set) = set_visitor.next() { - // incomplete solution. keep recursively creating combined solutions - for pred in set.iter() { - if let Some(elaborated) = pred.union_with(&partial) { - Self::elaborate_into_new_local_rec( - cu, - elaborated, - set_visitor.clone(), - old_local, - new_local, - ) - } - } - } else { - // recursive stop condition. This is a solution for this connector... - if !old_local.contains(&partial) { - // ... and it hasn't been found before - log!(cu.logger(), "storing NEW LOCAL SOLUTION {:?}", &partial); - new_local.insert(partial); - } - } - } -} -impl NonsyncProtoContext<'_> { - // Facilitates callback from the component to the connector runtime, - // creating a new component and changing the given port's ownership to that - // of the new component. - pub(crate) fn new_component(&mut self, moved_ports: HashSet, state: ComponentState) { - // Sanity check! 
The moved ports are owned by this component to begin with - for port in moved_ports.iter() { - assert_eq!(self.proto_component_id, self.ips.port_info.map.get(port).unwrap().owner); - } - // Create the new component, and schedule it to be run - let new_cid = self.ips.id_manager.new_component_id(); - log!( - self.logger, - "Component {:?} added new component {:?} with state {:?}, moving ports {:?}", - self.proto_component_id, - new_cid, - &state, - &moved_ports - ); - self.unrun_components.push((new_cid, state)); - // Update the ownership of the moved ports - for port in moved_ports.iter() { - self.ips.port_info.map.get_mut(port).unwrap().owner = new_cid; - } - if let Some(set) = self.ips.port_info.owned.get_mut(&self.proto_component_id) { - set.retain(|x| !moved_ports.contains(x)); - } - self.ips.port_info.owned.insert(new_cid, moved_ports.clone()); - } - - // Facilitates callback from the component to the connector runtime, - // creating a new port-pair connected by an memory channel - pub(crate) fn new_port_pair(&mut self) -> [PortId; 2] { - // adds two new associated ports, related to each other, and exposed to the proto component - let mut new_cid_fn = || self.ips.id_manager.new_port_id(); - let [o, i] = [new_cid_fn(), new_cid_fn()]; - self.ips.port_info.map.insert( - o, - PortInfo { - route: Route::LocalComponent, - peer: Some(i), - polarity: Putter, - owner: self.proto_component_id, - }, - ); - self.ips.port_info.map.insert( - i, - PortInfo { - route: Route::LocalComponent, - peer: Some(o), - polarity: Getter, - owner: self.proto_component_id, - }, - ); - self.ips - .port_info - .owned - .entry(self.proto_component_id) - .or_default() - .extend([o, i].iter().copied()); - log!( - self.logger, - "Component {:?} port pair (out->in) {:?} -> {:?}", - self.proto_component_id, - o, - i - ); - [o, i] - } -} -impl SyncProtoContext<'_> { - // The component calls the runtime back, inspecting whether it's associated - // preidcate has already determined a (speculative) 
value for the given port's firing variable. - pub(crate) fn is_firing(&mut self, port: PortId) -> Option { - let var = self.rctx.ips.port_info.spec_var_for(port); - self.predicate.query(var).map(SpecVal::is_firing) - } - - pub(crate) fn did_put_or_get(&mut self, port: PortId) -> bool { - self.branch_inner.did_put_or_get.contains(&port) - } - - // The component calls the runtime back, trying to inspect a port's message - pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> { - let maybe_msg = self.branch_inner.inbox.get(&port); - if maybe_msg.is_some() { - // Make a note that this component has received - // this port's message 1+ times this round - self.branch_inner.did_put_or_get.insert(port); - } - maybe_msg - } -} diff --git a/src/runtime_old/endpoints.rs b/src/runtime_old/endpoints.rs deleted file mode 100644 index ba531398af9ba9f4d4efbdc534aed68298c29238..0000000000000000000000000000000000000000 --- a/src/runtime_old/endpoints.rs +++ /dev/null @@ -1,462 +0,0 @@ -use super::*; - -enum PollAndPopulateError { - PollFailed, - Timeout, -} - -struct TryRecvAnyNetError { - error: NetEndpointError, - index: usize, -} -///////////////////// -impl NetEndpoint { - // Returns the bincode configuration the NetEndpoint uses pervasively - // for configuration on ser/de operations. - fn bincode_opts() -> impl bincode::config::Options { - // uses variable-length encoding everywhere; great! 
- bincode::config::DefaultOptions::default() - } - - // Attempt to return some deserializable T-type from - // the inbox or network stream - pub(super) fn try_recv( - &mut self, - logger: &mut dyn Logger, - ) -> Result, NetEndpointError> { - use NetEndpointError as Nee; - // populate inbox with bytes as much as possible (mio::TcpStream is nonblocking) - let before_len = self.inbox.len(); - 'read_loop: loop { - let res = self.stream.read_to_end(&mut self.inbox); - match res { - Err(e) if err_would_block(&e) => break 'read_loop, - Ok(0) => break 'read_loop, - Ok(_) => (), - Err(_e) => return Err(Nee::BrokenNetEndpoint), - } - } - log!( - @ENDPT, - logger, - "Inbox bytes [{:x?}| {:x?}]", - DenseDebugHex(&self.inbox[..before_len]), - DenseDebugHex(&self.inbox[before_len..]), - ); - // Try deserialize from the inbox. `reading_slice' is updated by read() - // in-place to truncate the read part. In the event of success, - // the message bytes are contained in the truncated prefix - let mut reading_slice = self.inbox.as_slice(); - let before_len = reading_slice.len(); - use bincode::config::Options; - match Self::bincode_opts().deserialize_from(&mut reading_slice) { - Ok(msg) => { - let msg_size = before_len - reading_slice.len(); - // inbox[..msg_size] was deserialized into one message! - self.inbox.drain(..msg_size); - log!( - @ENDPT, - logger, - "Yielding msg. 
Inbox len {}-{}=={}: [{:?}]", - self.inbox.len() + msg_size, - msg_size, - self.inbox.len(), - DenseDebugHex(&self.inbox[..]), - ); - Ok(Some(msg)) - } - Err(e) => match *e { - bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => { - // Contents of inbox insufficient for deserializing a message - Ok(None) - } - _ => Err(Nee::MalformedMessage), - }, - } - } - - // Send the given serializable type into the stream - pub(super) fn send( - &mut self, - msg: &T, - io_byte_buffer: &mut IoByteBuffer, - ) -> Result<(), NetEndpointError> { - use bincode::config::Options; - use NetEndpointError as Nee; - // Create a buffer for our bytes: a slice of the io_byte_buffer - let mut buf_slice = io_byte_buffer.as_mut_slice(); - // serialize into the slice, truncating as its filled - Self::bincode_opts().serialize_into(&mut buf_slice, msg).expect("Serialize failed!"); - // written segment is the part missing from buf_slice. Write this as one segment to the TCP stream - let wrote = IoByteBuffer::CAPACITY - buf_slice.len(); - self.stream - .write_all(&io_byte_buffer.as_mut_slice()[..wrote]) - .map_err(|_| Nee::BrokenNetEndpoint)?; - let _ = self.stream.flush(); - Ok(()) - } -} - -impl EndpointManager { - pub(super) fn index_iter(&self) -> Range { - 0..self.num_net_endpoints() - } - pub(super) fn num_net_endpoints(&self) -> usize { - self.net_endpoint_store.endpoint_exts.len() - } - - // Setup-phase particular send procedure. - // Used pervasively, allows for some brevity with the ? operator. - pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> { - let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint; - net_endpoint.send(msg, &mut self.io_byte_buffer).map_err(|err| { - ConnectError::NetEndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err) - }) - } - - // Communication-phase particular send procedure. - // Used pervasively, allows for some brevity with the ? operator. 
- pub(super) fn send_to_comms( - &mut self, - index: usize, - msg: &Msg, - ) -> Result<(), UnrecoverableSyncError> { - use UnrecoverableSyncError as Use; - let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint; - net_endpoint - .send(msg, &mut self.io_byte_buffer) - .map_err(|_| Use::BrokenNetEndpoint { index }) - } - - /// Receive the first message of any kind at all. - /// Why not return SetupMsg? Because often this message will be forwarded to several others, - /// and by returning a Msg, it can be serialized in-place (NetEndpoints allow the sending of Msg types!) - pub(super) fn try_recv_any_setup( - &mut self, - logger: &mut dyn Logger, - deadline: &Option, - ) -> Result<(usize, Msg), ConnectError> { - // helper function, mapping a TryRecvAnySetup type error - // into a ConnectError - fn map_trane( - trane: TryRecvAnyNetError, - net_endpoint_store: &EndpointStore, - ) -> ConnectError { - ConnectError::NetEndpointSetupError( - net_endpoint_store.endpoint_exts[trane.index] - .net_endpoint - .stream - .local_addr() - .unwrap(), // stream must already be connected - trane.error, - ) - } - // try yield undelayed net message - if let Some(tup) = self.undelayed_messages.pop() { - log!(@ENDPT, logger, "RECV undelayed_msg {:?}", &tup); - return Ok(tup); - } - loop { - // try recv from some polled undrained NET endpoint - if let Some(tup) = self - .try_recv_undrained_net(logger) - .map_err(|trane| map_trane(trane, &self.net_endpoint_store))? 
- { - return Ok(tup); - } - // poll if time remains - self.poll_and_populate(logger, deadline)?; - } - } - - // drops all Setup messages, - // buffers all future round messages, - // drops all previous round messages, - // enqueues all current round SendPayload messages using rctx.getter_push - // returns the first comm_ctrl_msg encountered - // only polls until SOME message is enqueued - pub(super) fn try_recv_any_comms( - &mut self, - cu: &mut impl CuUndecided, - rctx: &mut RoundCtx, - round_index: usize, - ) -> Result { - /////////////////////////////////////////// - // adds scoped functionality for EndpointManager - impl EndpointManager { - // Given some Msg structure in a particular context, - // return a control message for the current round - // if its a payload message, buffer it instead - fn handle_msg( - &mut self, - cu: &mut impl CuUndecided, - rctx: &mut RoundCtx, - net_index: usize, - msg: Msg, - round_index: usize, - some_message_enqueued: &mut bool, - ) -> Option<(usize, CommCtrlMsg)> { - let comm_msg_contents = match msg { - Msg::SetupMsg(..) => return None, // discard setup messages - Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(&round_index) { - Ordering::Equal => comm_msg.contents, // ok, keep going - Ordering::Less => { - // discard this message - log!( - cu.logger(), - "We are in round {}, but msg is for round {}. Discard", - comm_msg.round_index, - round_index, - ); - return None; - } - Ordering::Greater => { - // "delay" this message, enqueueing it for a future round - log!( - cu.logger(), - "We are in round {}, but msg is for round {}. Buffer", - comm_msg.round_index, - round_index, - ); - self.delayed_messages.push((net_index, Msg::CommMsg(comm_msg))); - return None; - } - }, - }; - // inspect the contents of this contemporary message, sorting it - match comm_msg_contents { - CommMsgContents::CommCtrl(comm_ctrl_msg) => { - // yes! 
this is a CommCtrlMsg - Some((net_index, comm_ctrl_msg)) - } - CommMsgContents::SendPayload(send_payload_msg) => { - // Enqueue this payload message - // Still not a CommCtrlMsg, so return None - let getter = - self.net_endpoint_store.endpoint_exts[net_index].getter_for_incoming; - rctx.getter_push(getter, send_payload_msg); - *some_message_enqueued = true; - None - } - } - } - } - use {PollAndPopulateError as Pape, UnrecoverableSyncError as Use}; - /////////////////////////////////////////// - let mut some_message_enqueued = false; - // pop undelayed messages, handling them. Return the first CommCtrlMsg popped - while let Some((net_index, msg)) = self.undelayed_messages.pop() { - if let Some((net_index, msg)) = - self.handle_msg(cu, rctx, net_index, msg, round_index, &mut some_message_enqueued) - { - return Ok(CommRecvOk::NewControlMsg { net_index, msg }); - } - } - loop { - // drain endpoints of incoming messages (without blocking) - // return first CommCtrlMsg received - while let Some((net_index, msg)) = self.try_recv_undrained_net(cu.logger())? { - if let Some((net_index, msg)) = self.handle_msg( - cu, - rctx, - net_index, - msg, - round_index, - &mut some_message_enqueued, - ) { - return Ok(CommRecvOk::NewControlMsg { net_index, msg }); - } - } - // try receive a udp message - let recv_buffer = self.io_byte_buffer.as_mut_slice(); - while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() { - let ee = &mut self.udp_endpoint_store.endpoint_exts[index]; - if let Some(bytes_written) = ee.sock.recv(recv_buffer).ok() { - // I received a payload! 
- self.udp_endpoint_store.polled_undrained.insert(index); - if !ee.received_this_round { - let payload = Payload::from(&recv_buffer[..bytes_written]); - let port_spec_var = rctx.ips.port_info.spec_var_for(ee.getter_for_incoming); - let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING); - rctx.getter_push( - ee.getter_for_incoming, - SendPayloadMsg { payload, predicate }, - ); - some_message_enqueued = true; - ee.received_this_round = true; - } else { - // lose the message! - } - } - } - if some_message_enqueued { - return Ok(CommRecvOk::NewPayloadMsgs); - } - // poll if time remains - match self.poll_and_populate(cu.logger(), &rctx.deadline) { - Ok(()) => {} // continue looping - Err(Pape::Timeout) => return Ok(CommRecvOk::TimeoutWithoutNew), - Err(Pape::PollFailed) => return Err(Use::PollFailed), - } - } - } - - // Try receive some message from any net endpoint without blocking - fn try_recv_undrained_net( - &mut self, - logger: &mut dyn Logger, - ) -> Result, TryRecvAnyNetError> { - while let Some(index) = self.net_endpoint_store.polled_undrained.pop() { - let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint; - if let Some(msg) = net_endpoint - .try_recv(logger) - .map_err(|error| TryRecvAnyNetError { error, index })? - { - log!(@ENDPT, logger, "RECV polled_undrained {:?}", &msg); - if !net_endpoint.inbox.is_empty() { - // there may be another message waiting! - self.net_endpoint_store.polled_undrained.insert(index); - } - return Ok(Some((index, msg))); - } - } - Ok(None) - } - - // Poll the network, raising `polled_undrained` flags for endpoints - // as they receive events. - fn poll_and_populate( - &mut self, - logger: &mut dyn Logger, - deadline: &Option, - ) -> Result<(), PollAndPopulateError> { - use PollAndPopulateError as Pape; - // No message yet. Do we have enough time to poll? - let remaining = if let Some(deadline) = deadline { - Some(deadline.checked_duration_since(Instant::now()).ok_or(Pape::Timeout)?) 
- } else { - None - }; - // Yes we do! Poll with remaining time as poll deadline - self.poll.poll(&mut self.events, remaining).map_err(|_| Pape::PollFailed)?; - for event in self.events.iter() { - match TokenTarget::from(event.token()) { - TokenTarget::NetEndpoint { index } => { - self.net_endpoint_store.polled_undrained.insert(index); - log!( - @ENDPT, - logger, - "RECV poll event {:?} for NET endpoint index {:?}. undrained: {:?}", - &event, - index, - self.net_endpoint_store.polled_undrained.iter() - ); - } - TokenTarget::UdpEndpoint { index } => { - self.udp_endpoint_store.polled_undrained.insert(index); - log!( - @ENDPT, - logger, - "RECV poll event {:?} for UDP endpoint index {:?}. undrained: {:?}", - &event, - index, - self.udp_endpoint_store.polled_undrained.iter() - ); - } - } - } - self.events.clear(); - Ok(()) - } - - // Move all delayed messages to undelayed, making it possible to yield them - pub(super) fn undelay_all(&mut self) { - if self.undelayed_messages.is_empty() { - // fast path - std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages); - return; - } - // slow path - self.undelayed_messages.extend(self.delayed_messages.drain(..)); - } - - // End the synchronous round for the udp endpoints given the round decision - pub(super) fn udp_endpoints_round_end( - &mut self, - logger: &mut dyn Logger, - decision: &Decision, - ) -> Result<(), UnrecoverableSyncError> { - // retain received_from_this_round for use in pseudo_socket_api::recv_from - log!( - logger, - "Ending round for {} udp endpoints", - self.udp_endpoint_store.endpoint_exts.len() - ); - use UnrecoverableSyncError as Use; - if let Decision::Success(solution_predicate) = decision { - // Similar to a native component, we commit the branch of the component - // consistent with the predicate decided upon, making its effects visible - // to the world outside the connector's internals. 
- // In this case, this takes the form of emptying the component's outbox buffer, - // actually sending payloads 'on the wire' as UDP messages. - for (index, ee) in self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate() { - 'outgoing_loop: for (payload_predicate, payload) in ee.outgoing_payloads.drain() { - if payload_predicate.assigns_subset(solution_predicate) { - ee.sock.send(payload.as_slice()).map_err(|e| { - println!("{:?}", e); - Use::BrokenUdpEndpoint { index } - })?; - log!( - logger, - "Sent payload {:?} with pred {:?} through Udp endpoint {}", - &payload, - &payload_predicate, - index - ); - // send at most one payload per endpoint per round - break 'outgoing_loop; - } - } - ee.received_this_round = false; - } - } - Ok(()) - } -} -impl Debug for NetEndpoint { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - struct DebugStream<'a>(&'a TcpStream); - impl Debug for DebugStream<'_> { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.debug_struct("Endpoint") - .field("local_addr", &self.0.local_addr()) - .field("peer_addr", &self.0.peer_addr()) - .finish() - } - } - f.debug_struct("Endpoint") - .field("inbox", &self.inbox) - .field("stream", &DebugStream(&self.stream)) - .finish() - } -} -impl Into for SetupMsg { - fn into(self) -> Msg { - Msg::SetupMsg(self) - } -} -impl From for ConnectError { - fn from(pape: PollAndPopulateError) -> ConnectError { - use {ConnectError as Ce, PollAndPopulateError as Pape}; - match pape { - Pape::PollFailed => Ce::PollFailed, - Pape::Timeout => Ce::Timeout, - } - } -} -impl From for UnrecoverableSyncError { - fn from(trane: TryRecvAnyNetError) -> UnrecoverableSyncError { - let TryRecvAnyNetError { index, .. 
} = trane; - UnrecoverableSyncError::BrokenNetEndpoint { index } - } -} diff --git a/src/runtime_old/error.rs b/src/runtime_old/error.rs deleted file mode 100644 index 59bca37f40dc3ddf0db08b6f5c800ded8618b10c..0000000000000000000000000000000000000000 --- a/src/runtime_old/error.rs +++ /dev/null @@ -1,75 +0,0 @@ -use crate::common::*; - -#[derive(Debug)] -pub enum ConnectError { - BindFailed(SocketAddr), - UdpConnectFailed(SocketAddr), - TcpInvalidConnect(SocketAddr), - PollInitFailed, - Timeout, - PollFailed, - AcceptFailed(SocketAddr), - AlreadyConnected, - PortPeerPolarityMismatch(PortId), - NetEndpointSetupError(SocketAddr, NetEndpointError), - SetupAlgMisbehavior, -} -#[derive(Eq, PartialEq, Copy, Clone, Debug)] -pub enum AddComponentError { - DuplicatePort(PortId), - NoSuchModule, - NoSuchComponent, - NonPortTypeParameters, - CannotMovePort(PortId), - WrongNumberOfParamaters { expected: usize }, - UnknownPort(PortId), - WrongPortPolarity { port: PortId, expected_polarity: Polarity }, - DuplicateMovedPort(PortId), -} -//////////////////////// -#[derive(Debug, Clone)] -pub enum UnrecoverableSyncError { - PollFailed, - BrokenNetEndpoint { index: usize }, - BrokenUdpEndpoint { index: usize }, - MalformedStateError(MalformedStateError), -} -#[derive(Debug, Clone)] -pub enum SyncError { - NotConnected, - InconsistentProtoComponent(ComponentId), - RoundFailure, - Unrecoverable(UnrecoverableSyncError), -} -#[derive(Debug, Clone)] -pub enum MalformedStateError { - PortCannotPut(PortId), - GetterUnknownFor { putter: PortId }, -} -#[derive(Debug, Clone)] -pub enum NetEndpointError { - MalformedMessage, - BrokenNetEndpoint, -} -#[derive(Debug)] -pub enum PortOpError { - WrongPolarity, - UnknownPolarity, - NotConnected, - MultipleOpsOnPort, - PortUnavailable, -} -#[derive(Debug, Eq, PartialEq)] -pub enum GottenError { - NoPreviousRound, - PortDidntGet, - PreviousSyncFailed, -} -#[derive(Debug, Eq, PartialEq)] -pub struct WrongStateError; -///////////////////// -impl From 
for SyncError { - fn from(e: UnrecoverableSyncError) -> Self { - Self::Unrecoverable(e) - } -} diff --git a/src/runtime_old/logging.rs b/src/runtime_old/logging.rs deleted file mode 100644 index 7a97cb4fdc5ea40ca161798a889e2afb24d3c037..0000000000000000000000000000000000000000 --- a/src/runtime_old/logging.rs +++ /dev/null @@ -1,56 +0,0 @@ -use super::*; - -// Used in the loggers' format string -fn secs_since_unix_epoch() -> f64 { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .map(|dur| dur.as_secs_f64()) - .unwrap_or(0.) -} -impl FileLogger { - pub fn new(connector_id: ConnectorId, file: std::fs::File) -> Self { - Self(connector_id, file) - } -} -impl VecLogger { - pub fn new(connector_id: ConnectorId) -> Self { - Self(connector_id, Default::default()) - } -} -///////////////// -impl Logger for DummyLogger { - fn line_writer(&mut self) -> Option<&mut dyn std::io::Write> { - None - } -} - -impl Logger for VecLogger { - fn line_writer(&mut self) -> Option<&mut dyn std::io::Write> { - let _ = write!(&mut self.1, "CID({}) at {:.6} ", self.0, secs_since_unix_epoch()); - Some(self) - } -} -impl Logger for FileLogger { - fn line_writer(&mut self) -> Option<&mut dyn std::io::Write> { - let _ = write!(&mut self.1, "CID({}) at {:.6} ", self.0, secs_since_unix_epoch()); - Some(&mut self.1) - } -} -/////////////////// -impl Drop for VecLogger { - fn drop(&mut self) { - let stdout = std::io::stderr(); - let mut lock = stdout.lock(); - writeln!(lock, "--- DROP LOG DUMP ---").unwrap(); - let _ = std::io::Write::write(&mut lock, self.1.as_slice()); - } -} -impl std::io::Write for VecLogger { - fn flush(&mut self) -> Result<(), std::io::Error> { - Ok(()) - } - fn write(&mut self, data: &[u8]) -> Result { - self.1.extend_from_slice(data); - Ok(data.len()) - } -} diff --git a/src/runtime_old/mod.rs b/src/runtime_old/mod.rs deleted file mode 100644 index 95bc3e85c6f4df6c9a221764f5eb8ad46745159e..0000000000000000000000000000000000000000 --- 
a/src/runtime_old/mod.rs +++ /dev/null @@ -1,903 +0,0 @@ -/// cbindgen:ignore -mod communication; -/// cbindgen:ignore -mod endpoints; -pub mod error; -/// cbindgen:ignore -mod logging; -/// cbindgen:ignore -mod setup; - -#[cfg(test)] -mod tests; - -use crate::common::*; -use error::*; -use mio::net::UdpSocket; - -/// The interface between the user's application and a communication session, -/// in which the application plays the part of a (native) component. This structure provides the application -/// with functionality available to all components: the ability to add new channels (port pairs), and to -/// instantiate new components whose definitions are defined in the connector's configured protocol -/// description. Native components have the additional ability to add `dangling' ports backed by local/remote -/// IP addresses, to be coupled with a counterpart once the connector's setup is completed by `connect`. -/// This allows sets of applications to cooperate in constructing shared sessions that span the network. -#[derive(Debug)] -pub struct Connector { - unphased: ConnectorUnphased, - phased: ConnectorPhased, -} - -/// Characterizes a type which can write lines of logging text. -/// The implementations provided in the `logging` module are likely to be sufficient, -/// but for added flexibility, users are able to implement their own loggers for use -/// by connectors. -pub trait Logger: Debug + Send + Sync { - fn line_writer(&mut self) -> Option<&mut dyn std::io::Write>; -} - -/// A logger that appends the logged strings to a growing byte buffer -#[derive(Debug)] -pub struct VecLogger(ConnectorId, Vec); - -/// A trivial logger that always returns None, such that no logging information is ever written. -#[derive(Debug)] -pub struct DummyLogger; - -/// A logger that writes the logged lines to a given file. 
-#[derive(Debug)] -pub struct FileLogger(ConnectorId, std::fs::File); - -// Interface between protocol state and the connector runtime BEFORE all components -// ave begun their branching speculation. See ComponentState::nonsync_run. -pub(crate) struct NonsyncProtoContext<'a> { - ips: &'a mut IdAndPortState, - logger: &'a mut dyn Logger, - unrun_components: &'a mut Vec<(ComponentId, ComponentState)>, // lives for Nonsync phase - proto_component_id: ComponentId, // KEY in id->component map -} - -// Interface between protocol state and the connector runtime AFTER all components -// have begun their branching speculation. See ComponentState::sync_run. -pub(crate) struct SyncProtoContext<'a> { - rctx: &'a RoundCtx, - branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch - predicate: &'a Predicate, // KEY in pred->branch map -} - -// The data coupled with a particular protocol component branch, but crucially omitting -// the `ComponentState` such that this may be passed by reference to the state with separate -// access control. -#[derive(Default, Debug, Clone)] -struct ProtoComponentBranchInner { - did_put_or_get: HashSet, - inbox: HashMap, -} - -// A speculative variable that lives for the duration of the synchronous round. -// Each is assigned a value in domain `SpecVal`. -#[derive( - Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, -)] -struct SpecVar(PortId); - -// The codomain of SpecVal. Has two associated constants for values FIRING and SILENT, -// but may also enumerate many more values to facilitate finer-grained nondeterministic branching. 
-#[derive( - Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, -)] -struct SpecVal(u16); - -// Data associated with a successful synchronous round, retained afterwards such that the -// native component can freely reflect on how it went, reading the messages received at their -// inputs, and reflecting on which of their connector's synchronous batches succeeded. -#[derive(Debug)] -struct RoundEndedNative { - batch_index: usize, - gotten: HashMap, -} - -// Implementation of a set in terms of a vector (optimized for reading, not writing) -#[derive(Default)] -struct VecSet { - // invariant: ordered, deduplicated - vec: Vec, -} - -// Allows a connector to remember how to forward payloads towards the component that -// owns their destination port. `LocalComponent` corresponds with messages for components -// managed by the connector itself (hinting for it to look it up in a local structure), -// whereas the other variants direct the connector to forward the messages over the network. -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] -enum Route { - LocalComponent, - NetEndpoint { index: usize }, - UdpEndpoint { index: usize }, -} - -// The outcome of a synchronous round, representing the distributed consensus. -// In the success case, the attached predicate encodes a row in the session's trace table. -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -enum Decision { - Failure, // some connector timed out! 
- Success(Predicate), -} - -// The type of control messages exchanged between connectors over the network -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -enum Msg { - SetupMsg(SetupMsg), - CommMsg(CommMsg), -} - -// Control messages exchanged during the setup phase only -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -enum SetupMsg { - MyPortInfo(MyPortInfo), - LeaderWave { wave_leader: ConnectorId }, - LeaderAnnounce { tree_leader: ConnectorId }, - YouAreMyParent, -} - -// Control message particular to the communication phase. -// as such, it's annotated with a round_index -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -struct CommMsg { - round_index: usize, - contents: CommMsgContents, -} - -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -enum CommMsgContents { - SendPayload(SendPayloadMsg), - CommCtrl(CommCtrlMsg), -} - -// Connector <-> connector control messages for use in the communication phase -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -enum CommCtrlMsg { - Suggest { suggestion: Decision }, // child->parent - Announce { decision: Decision }, // parent->child -} - -// Speculative payload message, communicating the value for the given -// port's message predecated on the given speculative variable assignments. -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -struct SendPayloadMsg { - predicate: Predicate, - payload: Payload, -} - -// Return result of `Predicate::assignment_union`, communicating the contents -// of the predicate which represents the (consistent) union of their mappings, -// if it exists (no variable mapped distinctly by the input predicates) -#[derive(Debug, PartialEq)] -enum AssignmentUnionResult { - FormerNotLatter, - LatterNotFormer, - Equivalent, - New(Predicate), - Nonexistant, -} - -// One of two endpoints for a control channel with a connector on either end. 
-// The underlying transport is TCP, so we use an inbox buffer to allow -// discrete payload receipt. -struct NetEndpoint { - inbox: Vec, - stream: TcpStream, -} - -// Datastructure used during the setup phase representing a NetEndpoint TO BE SETUP -#[derive(Debug, Clone)] -struct NetEndpointSetup { - getter_for_incoming: PortId, - sock_addr: SocketAddr, - endpoint_polarity: EndpointPolarity, -} - -// Datastructure used during the setup phase representing a UdpEndpoint TO BE SETUP -#[derive(Debug, Clone)] -struct UdpEndpointSetup { - getter_for_incoming: PortId, - local_addr: SocketAddr, - peer_addr: SocketAddr, -} - -// NetEndpoint annotated with the ID of the port that receives payload -// messages received through the endpoint. This approach assumes that NetEndpoints -// DO NOT multiplex port->port channels, and so a mapping such as this is possible. -// As a result, the messages themselves don't need to carry the PortID with them. -#[derive(Debug)] -struct NetEndpointExt { - net_endpoint: NetEndpoint, - getter_for_incoming: PortId, -} - -// Endpoint for a "raw" UDP endpoint. Corresponds to the "Udp Mediator Component" -// described in the literature. -// It acts as an endpoint by receiving messages via the poller etc. (managed by EndpointManager), -// It acts as a native component by managing a (speculative) set of payload messages (an outbox, -// protecting the peer on the other side of the network). -#[derive(Debug)] -struct UdpEndpointExt { - sock: UdpSocket, // already bound and connected - received_this_round: bool, - outgoing_payloads: HashMap, - getter_for_incoming: PortId, -} - -// Meta-data for the connector: its role in the consensus tree. -#[derive(Debug)] -struct Neighborhood { - parent: Option, - children: VecSet, -} - -// Manages the connector's ID, and manages allocations for connector/port IDs. 
-#[derive(Debug, Clone)] -struct IdManager { - connector_id: ConnectorId, - port_suffix_stream: U32Stream, - component_suffix_stream: U32Stream, -} - -// Newtype wrapper around a byte buffer, used for UDP mediators to receive incoming datagrams. -struct IoByteBuffer { - byte_vec: Vec, -} - -// A generator of speculative variables. Created on-demand during the synchronous round -// by the IdManager. -#[derive(Debug)] -struct SpecVarStream { - connector_id: ConnectorId, - port_suffix_stream: U32Stream, -} - -// Manages the messy state of the various endpoints, pollers, buffers, etc. -#[derive(Debug)] -struct EndpointManager { - // invariants: - // 1. net and udp endpoints are registered with poll with tokens computed with TargetToken::into - // 2. Events is empty - poll: Poll, - events: Events, - delayed_messages: Vec<(usize, Msg)>, - undelayed_messages: Vec<(usize, Msg)>, // ready to yield - net_endpoint_store: EndpointStore, - udp_endpoint_store: EndpointStore, - io_byte_buffer: IoByteBuffer, -} - -// A storage of endpoints, which keeps track of which components have raised -// an event during poll(), signifying that they need to be checked for new incoming data -#[derive(Debug)] -struct EndpointStore { - endpoint_exts: Vec, - polled_undrained: VecSet, -} - -// The information associated with a port identifier, designed for local storage. -#[derive(Clone, Debug)] -struct PortInfo { - owner: ComponentId, - peer: Option, - polarity: Polarity, - route: Route, -} - -// Similar to `PortInfo`, but designed for communication during the setup procedure. 
-#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -struct MyPortInfo { - polarity: Polarity, - port: PortId, - owner: ComponentId, -} - -// Newtype around port info map, allowing the implementation of some -// useful methods -#[derive(Default, Debug, Clone)] -struct PortInfoMap { - // invariant: self.invariant_preserved() - // `owned` is redundant information, allowing for fast lookup - // of a component's owned ports (which occurs during the sync round a lot) - map: HashMap, - owned: HashMap>, -} - -// A convenient substructure for containing port info and the ID manager. -// Houses the bulk of the connector's persistent state between rounds. -// It turns out several situations require access to both things. -#[derive(Debug, Clone)] -struct IdAndPortState { - port_info: PortInfoMap, - id_manager: IdManager, -} - -// A component's setup-phase-specific data -#[derive(Debug)] -struct ConnectorCommunication { - round_index: usize, - endpoint_manager: EndpointManager, - neighborhood: Neighborhood, - native_batches: Vec, - round_result: Result, SyncError>, -} - -// A component's data common to both setup and communication phases -#[derive(Debug)] -struct ConnectorUnphased { - proto_description: Arc, - proto_components: HashMap, - logger: Box, - ips: IdAndPortState, - native_component_id: ComponentId, -} - -// A connector's phase-specific data -#[derive(Debug)] -enum ConnectorPhased { - Setup(Box), - Communication(Box), -} - -// A connector's setup-phase-specific data -#[derive(Debug)] -struct ConnectorSetup { - net_endpoint_setups: Vec, - udp_endpoint_setups: Vec, -} - -// A newtype wrapper for a map from speculative variable to speculative value -// A missing mapping corresponds with "unspecified". -#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] -struct Predicate { - assigned: BTreeMap, -} - -// Identifies a child of this connector in the _solution tree_. 
-// Each connector creates its own local solutions for the consensus procedure during `sync`, -// from the solutions of its children. Those children are either locally-managed components, -// (which are leaves in the solution tree), or other connectors reachable through the given -// network endpoint (which are internal nodes in the solution tree). -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] -enum SubtreeId { - LocalComponent(ComponentId), - NetEndpoint { index: usize }, -} - -// An accumulation of the connector's knowledge of all (a) the local solutions its children -// in the solution tree have found, and (b) its own solutions derivable from those of its children. -// This structure starts off each round with an empty set, and accumulates solutions as they are found -// by local components, or received over the network in control messages. -// IMPORTANT: solutions, once found, don't go away until the end of the round. That is to -// say that these sets GROW until the round is over, and all solutions are reset. -#[derive(Debug)] -struct SolutionStorage { - // invariant: old_local U new_local solutions are those that can be created from - // the UNION of one element from each set in `subtree_solution`. - // invariant is maintained by potentially populating new_local whenever subtree_solutions is populated. - old_local: HashSet, // already sent to this connector's parent OR decided - new_local: HashSet, // not yet sent to this connector's parent OR decided - // this pair acts as SubtreeId -> HashSet which is friendlier to iteration - subtree_solutions: Vec>, - subtree_id_to_index: HashMap, -} - -// Stores the transient data of a synchronous round. -// Some of it is for bookkeeping, and the rest is a temporary mirror of fields of -// `ConnectorUnphased`, such that any changes are safely contained within RoundCtx, -// and can be undone if the round fails. 
-struct RoundCtx { - solution_storage: SolutionStorage, - spec_var_stream: SpecVarStream, - payload_inbox: Vec<(PortId, SendPayloadMsg)>, - deadline: Option, - ips: IdAndPortState, -} - -// A trait intended to limit the access of the ConnectorUnphased structure -// such that we don't accidentally modify any important component/port data -// while the results of the round are undecided. Why? Any actions during Connector::sync -// are _speculative_ until the round is decided, and we need a safe way of rolling -// back any changes. -trait CuUndecided { - fn logger(&mut self) -> &mut dyn Logger; - fn proto_description(&self) -> &ProtocolDescription; - fn native_component_id(&self) -> ComponentId; - fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription); - fn logger_and_protocol_components( - &mut self, - ) -> (&mut dyn Logger, &mut HashMap); -} - -// Represents a set of synchronous port operations that the native component -// has described as an "option" for completing during the synchronous rounds. -// Operations contained here succeed together or not at all. -// A native with N=2+ batches are expressing an N-way nondeterministic choice -#[derive(Debug, Default)] -struct NativeBatch { - // invariant: putters' and getters' polarities respected - to_put: HashMap, - to_get: HashSet, -} - -// Parallels a mio::Token type, but more clearly communicates -// the way it identifies the evented structre it corresponds to. -// See runtime/setup for methods converting between TokenTarget and mio::Token -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] -enum TokenTarget { - NetEndpoint { index: usize }, - UdpEndpoint { index: usize }, -} - -// Returned by the endpoint manager as a result of comm_recv, telling the connector what happened, -// such that it can know when to continue polling, and when to block. 
-enum CommRecvOk { - TimeoutWithoutNew, - NewPayloadMsgs, - NewControlMsg { net_index: usize, msg: CommCtrlMsg }, -} -//////////////// -fn err_would_block(err: &std::io::Error) -> bool { - err.kind() == std::io::ErrorKind::WouldBlock -} -impl VecSet { - fn new(mut vec: Vec) -> Self { - // establish the invariant - vec.sort(); - vec.dedup(); - Self { vec } - } - fn contains(&self, element: &T) -> bool { - self.vec.binary_search(element).is_ok() - } - // Insert the given element. Returns whether it was already present. - fn insert(&mut self, element: T) -> bool { - match self.vec.binary_search(&element) { - Ok(_) => false, - Err(index) => { - self.vec.insert(index, element); - true - } - } - } - fn iter(&self) -> std::slice::Iter { - self.vec.iter() - } - fn pop(&mut self) -> Option { - self.vec.pop() - } -} -impl PortInfoMap { - fn ports_owned_by(&self, owner: ComponentId) -> impl Iterator { - self.owned.get(&owner).into_iter().flat_map(HashSet::iter) - } - fn spec_var_for(&self, port: PortId) -> SpecVar { - // Every port maps to a speculative variable - // Two distinct ports map to the same variable - // IFF they are two ends of the same logical channel. 
- let info = self.map.get(&port).unwrap(); - SpecVar(match info.polarity { - Getter => port, - Putter => info.peer.unwrap(), - }) - } - fn invariant_preserved(&self) -> bool { - // for every port P with some owner O, - // P is in O's owned set - for (port, info) in self.map.iter() { - match self.owned.get(&info.owner) { - Some(set) if set.contains(port) => {} - _ => { - println!("{:#?}\n WITH port {:?}", self, port); - return false; - } - } - } - // for every port P owned by every owner O, - // P's owner is O - for (&owner, set) in self.owned.iter() { - for port in set { - match self.map.get(port) { - Some(info) if info.owner == owner => {} - _ => { - println!("{:#?}\n WITH owner {:?} port {:?}", self, owner, port); - return false; - } - } - } - } - true - } -} -impl SpecVarStream { - fn next(&mut self) -> SpecVar { - let phantom_port: PortId = - Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() } - .into(); - SpecVar(phantom_port) - } -} -impl IdManager { - fn new(connector_id: ConnectorId) -> Self { - Self { - connector_id, - port_suffix_stream: Default::default(), - component_suffix_stream: Default::default(), - } - } - fn new_spec_var_stream(&self) -> SpecVarStream { - // Spec var stream starts where the current port_id stream ends, with gap of SKIP_N. - // This gap is entirely unnecessary (i.e. 0 is fine) - // It's purpose is only to make SpecVars easier to spot in logs. - // E.g. 
spot the spec var: { v0_0, v1_2, v1_103 } - const SKIP_N: u32 = 100; - let port_suffix_stream = self.port_suffix_stream.clone().n_skipped(SKIP_N); - SpecVarStream { connector_id: self.connector_id, port_suffix_stream } - } - fn new_port_id(&mut self) -> PortId { - Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into() - } - fn new_component_id(&mut self) -> ComponentId { - Id { connector_id: self.connector_id, u32_suffix: self.component_suffix_stream.next() } - .into() - } -} -impl Drop for Connector { - fn drop(&mut self) { - log!(self.unphased.logger(), "Connector dropping. Goodbye!"); - } -} -// Given a slice of ports, return the first, if any, port is present repeatedly -fn duplicate_port(slice: &[PortId]) -> Option { - let mut vec = Vec::with_capacity(slice.len()); - for port in slice.iter() { - match vec.binary_search(port) { - Err(index) => vec.insert(index, *port), - Ok(_) => return Some(*port), - } - } - None -} -impl Connector { - /// Generate a random connector identifier from the system's source of randomness. - pub fn random_id() -> ConnectorId { - type Bytes8 = [u8; std::mem::size_of::()]; - unsafe { - let mut bytes = std::mem::MaybeUninit::::uninit(); - // getrandom is the canonical crate for a small, secure rng - getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap(); - // safe! representations of all valid Byte8 values are valid ConnectorId values - std::mem::transmute::<_, _>(bytes.assume_init()) - } - } - - /// Returns true iff the connector is in connected state, i.e., it's setup phase is complete, - /// and it is ready to participate in synchronous rounds of communication. - pub fn is_connected(&self) -> bool { - // If designed for Rust usage, connectors would be exposed as an enum type from the start. - // consequently, this "phased" business would also include connector variants and this would - // get a lot closer to the connector impl. itself. 
- // Instead, the C-oriented implementation doesn't distinguish connector states as types, - // and distinguish them as enum variants instead - match self.phased { - ConnectorPhased::Setup(..) => false, - ConnectorPhased::Communication(..) => true, - } - } - - /// Enables the connector's current logger to be swapped out for another - pub fn swap_logger(&mut self, mut new_logger: Box) -> Box { - std::mem::swap(&mut self.unphased.logger, &mut new_logger); - new_logger - } - - /// Access the connector's current logger - pub fn get_logger(&mut self) -> &mut dyn Logger { - &mut *self.unphased.logger - } - - /// Create a new synchronous channel, returning its ends as a pair of ports, - /// with polarity output, input respectively. Available during either setup/communication phase. - /// # Panics - /// This function panics if the connector's (large) port id space is exhausted. - pub fn new_port_pair(&mut self) -> [PortId; 2] { - let cu = &mut self.unphased; - // adds two new associated ports, related to each other, and exposed to the native - let mut new_cid = || cu.ips.id_manager.new_port_id(); - // allocate two fresh port identifiers - let [o, i] = [new_cid(), new_cid()]; - // store info for each: - // - they are each others' peers - // - they are owned by a local component with id `cid` - // - polarity putter, getter respectively - cu.ips.port_info.map.insert( - o, - PortInfo { - route: Route::LocalComponent, - peer: Some(i), - owner: cu.native_component_id, - polarity: Putter, - }, - ); - cu.ips.port_info.map.insert( - i, - PortInfo { - route: Route::LocalComponent, - peer: Some(o), - owner: cu.native_component_id, - polarity: Getter, - }, - ); - cu.ips - .port_info - .owned - .entry(cu.native_component_id) - .or_default() - .extend([o, i].iter().copied()); - - log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); - [o, i] - } - - /// Instantiates a new component for the connector runtime to manage, and passing - /// the given set of ports from the 
interface of the native component, to that of the - /// newly created component (passing their ownership). - /// # Errors - /// Error is returned if the moved ports are not owned by the native component, - /// if the given component name is not defined in the connector's protocol, - /// the given sequence of ports contains a duplicate port, - /// or if the component is unfit for instantiation with the given port sequence. - /// # Panics - /// This function panics if the connector's (large) component id space is exhausted. - pub fn add_component( - &mut self, - module_name: &[u8], - identifier: &[u8], - ports: &[PortId], - ) -> Result<(), AddComponentError> { - // Check for error cases first before modifying `cu` - use AddComponentError as Ace; - let cu = &self.unphased; - if let Some(port) = duplicate_port(ports) { - return Err(Ace::DuplicatePort(port)); - } - let expected_polarities = cu.proto_description.component_polarities(module_name, identifier)?; - if expected_polarities.len() != ports.len() { - return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() }); - } - for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) { - let info = cu.ips.port_info.map.get(&port).ok_or(Ace::UnknownPort(port))?; - if info.owner != cu.native_component_id { - return Err(Ace::UnknownPort(port)); - } - if info.polarity != expected_polarity { - return Err(Ace::WrongPortPolarity { port, expected_polarity }); - } - } - // No errors! 
Time to modify `cu` - // create a new component and identifier - let Connector { phased, unphased: cu } = self; - let new_cid = cu.ips.id_manager.new_component_id(); - cu.proto_components.insert(new_cid, cu.proto_description.new_component(module_name, identifier, ports)); - // update the ownership of moved ports - for port in ports.iter() { - match cu.ips.port_info.map.get_mut(port) { - Some(port_info) => port_info.owner = new_cid, - None => unreachable!(), - } - } - // Build the set of moved ports once, so every membership test below is a hash lookup rather than a scan of `ports` - let moved_port_set: HashSet<PortId> = ports.iter().copied().collect(); - if let Some(set) = cu.ips.port_info.owned.get_mut(&cu.native_component_id) { - set.retain(|x| !moved_port_set.contains(x)); - } - if let ConnectorPhased::Communication(comm) = phased { - // Preserve invariant: batches only reason about native's ports. - // Remove batch puts/gets for moved ports. - for batch in comm.native_batches.iter_mut() { - batch.to_put.retain(|port, _| !moved_port_set.contains(port)); - batch.to_get.retain(|port| !moved_port_set.contains(port)); - } - } - cu.ips.port_info.owned.insert(new_cid, moved_port_set); - Ok(()) - } -} -impl Predicate { - #[inline] - pub fn singleton(k: SpecVar, v: SpecVal) -> Self { - Self::default().inserted(k, v) - } - #[inline] - pub fn inserted(mut self, k: SpecVar, v: SpecVal) -> Self { - self.assigned.insert(k, v); - self - } - - // Return true iff every assignment of `self` is also an assignment of `maybe_superset` - pub fn assigns_subset(&self, maybe_superset: &Self) -> bool { - for (var, val) in self.assigned.iter() { - match maybe_superset.assigned.get(var) { - Some(val2) if val2 == val => {} - _ => return false, // var unmapped, or mapped differently - } - } - // `maybe_superset` mirrored all my assignments! - true - } - - /// Given the two predicates {self, other}, return that whose - /// assignments are the union of those of both. - fn assignment_union(&self, other: &Self) -> AssignmentUnionResult { - use AssignmentUnionResult as Aur; - // iterators over assignments of both predicates.
Rely on SORTED ordering of BTreeMap's keys. - let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; - let [mut s, mut o] = [s_it.next(), o_it.next()]; - // populate lists of assignments in self but not other and vice versa. - // do this by incrementally unfolding the iterators, keeping an eye - // on the ordering between the head elements [s, o]. - // whenever s < o (by variable), other has no assignment for s's variable, and vice versa. - let [mut s_not_o, mut o_not_s] = [vec![], vec![]]; - loop { - match [s, o] { - [None, None] => break, // both iterators are empty - [None, Some(x)] => { - // self's iterator is empty. - // all remaining elements are in other but not self - o_not_s.push(x); - o_not_s.extend(o_it); - break; - } - [Some(x), None] => { - // other's iterator is empty. - // all remaining elements are in self but not other - s_not_o.push(x); - s_not_o.extend(s_it); - break; - } - [Some((sid, sb)), Some((oid, ob))] => { - if sid < oid { - // o is missing this element - s_not_o.push((sid, sb)); - s = s_it.next(); - } else if sid > oid { - // s is missing this element - o_not_s.push((oid, ob)); - o = o_it.next(); - } else if sb != ob { - assert_eq!(sid, oid); - // both predicates assign the variable but differ on the value - // No predicate exists which satisfies both! - return Aur::Nonexistant; - } else { - // both predicates assign the variable to the same value - s = s_it.next(); - o = o_it.next(); - } - } - } - } - // Observed zero inconsistencies. A unified predicate exists... - match [s_not_o.is_empty(), o_not_s.is_empty()] { - [true, true] => Aur::Equivalent, // ... equivalent to both. - [false, true] => Aur::FormerNotLatter, // ... equivalent to self. - [true, false] => Aur::LatterNotFormer, // ... equivalent to other. - [false, false] => { - // ... which is the union of the predicates' assignments but - // is equivalent to neither self nor other. - let mut new = self.clone(); - for (&id, &b) in o_not_s { - new.assigned.insert(id, b); - } - Aur::New(new) - } - } - } - - // Compute the union of the assignments of the two given predicates, if it exists.
- // It doesn't exist if there is some value which the predicates assign to different values. - pub(crate) fn union_with(&self, other: &Self) -> Option { - let mut res = self.clone(); - for (&channel_id, &assignment_1) in other.assigned.iter() { - match res.assigned.insert(channel_id, assignment_1) { - Some(assignment_2) if assignment_1 != assignment_2 => return None, - _ => {} - } - } - Some(res) - } - pub(crate) fn query(&self, var: SpecVar) -> Option { - self.assigned.get(&var).copied() - } -} - -impl RoundCtx { - // remove an arbitrary buffered message, along with the ID of the getter who receives it - fn getter_pop(&mut self) -> Option<(PortId, SendPayloadMsg)> { - self.payload_inbox.pop() - } - - // buffer a message along with the ID of the getter who receives it - fn getter_push(&mut self, getter: PortId, msg: SendPayloadMsg) { - self.payload_inbox.push((getter, msg)); - } - - // buffer a message along with the ID of the putter who sent it - fn putter_push(&mut self, cu: &mut impl CuUndecided, putter: PortId, msg: SendPayloadMsg) { - if let Some(getter) = self.ips.port_info.map.get(&putter).unwrap().peer { - log!(cu.logger(), "Putter add (putter:{:?} => getter:{:?})", putter, getter); - self.getter_push(getter, msg); - } else { - log!(cu.logger(), "Putter {:?} has no known peer!", putter); - panic!("Putter {:?} has no known peer!", putter); - } - } -} - -impl Debug for VecSet { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.debug_set().entries(self.vec.iter()).finish() - } -} -impl Debug for Predicate { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - struct Assignment<'a>((&'a SpecVar, &'a SpecVal)); - impl Debug for Assignment<'_> { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "{:?}={:?}", (self.0).0, (self.0).1) - } - } - f.debug_set().entries(self.assigned.iter().map(Assignment)).finish() - } -} -impl IdParts for SpecVar { - fn id_parts(self) -> (ConnectorId, U32Suffix) { - self.0.id_parts() - } -} -impl 
Debug for SpecVar { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - let (a, b) = self.id_parts(); - write!(f, "v{}_{}", a, b) - } -} -impl SpecVal { - const FIRING: Self = SpecVal(1); - const SILENT: Self = SpecVal(0); - fn is_firing(self) -> bool { - self == Self::FIRING - // all else treated as SILENT - } - fn iter_domain() -> impl Iterator { - (0..).map(SpecVal) - } -} -impl Debug for SpecVal { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - self.0.fmt(f) - } -} -impl Default for IoByteBuffer { - fn default() -> Self { - let mut byte_vec = Vec::with_capacity(Self::CAPACITY); - unsafe { - // safe! this vector is guaranteed to have sufficient capacity - byte_vec.set_len(Self::CAPACITY); - } - Self { byte_vec } - } -} -impl IoByteBuffer { - const CAPACITY: usize = u16::MAX as usize + 1000; - fn as_mut_slice(&mut self) -> &mut [u8] { - self.byte_vec.as_mut_slice() - } -} - -impl Debug for IoByteBuffer { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "IoByteBuffer") - } -} diff --git a/src/runtime_old/setup.rs b/src/runtime_old/setup.rs deleted file mode 100644 index 596c6e14d5343322de78c2c95eec4eda548bd4d5..0000000000000000000000000000000000000000 --- a/src/runtime_old/setup.rs +++ /dev/null @@ -1,879 +0,0 @@ -use crate::common::*; -use crate::runtime::*; - -impl TokenTarget { - // subdivides the domain of usize into - // [NET_ENDPOINT][UDP_ENDPOINT ] - // ^0 ^usize::MAX/2 ^usize::MAX - const HALFWAY_INDEX: usize = usize::MAX / 2; -} -impl From for TokenTarget { - fn from(Token(index): Token) -> Self { - if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) { - TokenTarget::UdpEndpoint { index: shifted } - } else { - TokenTarget::NetEndpoint { index } - } - } -} -impl Into for TokenTarget { - fn into(self) -> Token { - match self { - TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX), - TokenTarget::NetEndpoint { index } => Token(index), - } - } -} -impl Connector { - /// Create a new 
connector structure with the given protocol description (via Arc to facilitate sharing). - /// The resulting connector will start in the setup phase, and cannot be used for communication until the - /// `connect` procedure completes. - /// # Safety - /// The correctness of the system's underlying distributed algorithms requires that no two - /// connectors have the same ID. If the user does not know the identifiers of other connectors in the - /// system, it is advised to guess it using Connector::random_id (relying on the exceptionally low probability of an error). - /// Sessions with duplicate connector identifiers will not result in any memory unsafety, but cannot be guaranteed - /// to preserve their configured protocols. - /// Fortunately, in most realistic cases, the presence of duplicate connector identifiers will result in an - /// error during `connect`, observed as a peer misbehaving. - pub fn new( - mut logger: Box, - proto_description: Arc, - connector_id: ConnectorId, - ) -> Self { - log!(&mut *logger, "Created with connector_id {:?}", connector_id); - let mut id_manager = IdManager::new(connector_id); - let native_component_id = id_manager.new_component_id(); - Self { - unphased: ConnectorUnphased { - proto_description, - proto_components: Default::default(), - logger, - native_component_id, - ips: IdAndPortState { id_manager, port_info: Default::default() }, - }, - phased: ConnectorPhased::Setup(Box::new(ConnectorSetup { - net_endpoint_setups: Default::default(), - udp_endpoint_setups: Default::default(), - })), - } - } - - /// Conceptually, this returning [p0, g1] is sugar for: - /// 1. create port pair [p0, g0] - /// 2. create port pair [p1, g1] - /// 3. create udp component with interface of moved ports [p1, g0] - /// 4. 
return [p0, g1] - pub fn new_udp_mediator_component( - &mut self, - local_addr: SocketAddr, - peer_addr: SocketAddr, - ) -> Result<[PortId; 2], WrongStateError> { - let Self { unphased: cu, phased } = self; - match phased { - ConnectorPhased::Communication(..) => Err(WrongStateError), - ConnectorPhased::Setup(setup) => { - let udp_index = setup.udp_endpoint_setups.len(); - let udp_cid = cu.ips.id_manager.new_component_id(); - // allocates 4 new port identifiers, two for each logical channel, - // one channel per direction (into and out of the component) - let mut npid = || cu.ips.id_manager.new_port_id(); - let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()]; - // allocate the native->udp_mediator channel's ports - // (nout and uin are each other's peers) - cu.ips.port_info.map.insert( - nout, - PortInfo { - route: Route::LocalComponent, - polarity: Putter, - peer: Some(uin), - owner: cu.native_component_id, - }, - ); - cu.ips.port_info.map.insert( - uin, - PortInfo { - route: Route::UdpEndpoint { index: udp_index }, - polarity: Getter, - peer: Some(nout), - owner: udp_cid, - }, - ); - // allocate the udp_mediator->native channel's ports - // (uout and nin are each other's peers) - cu.ips.port_info.map.insert( - uout, - PortInfo { - route: Route::UdpEndpoint { index: udp_index }, - polarity: Putter, - peer: Some(nin), - owner: udp_cid, - }, - ); - cu.ips.port_info.map.insert( - nin, - PortInfo { - route: Route::LocalComponent, - polarity: Getter, - peer: Some(uout), - owner: cu.native_component_id, - }, - ); - // allocate the two ports owned by the UdpMediator component - // Remember to setup this UdpEndpoint setup during `connect` later. - setup.udp_endpoint_setups.push(UdpEndpointSetup { - local_addr, - peer_addr, - getter_for_incoming: nin, - }); - - // update owned sets - cu.ips - .port_info - .owned - .entry(cu.native_component_id) - .or_default() - .extend([nin, nout].iter().copied()); - cu.ips.port_info.owned.insert(udp_cid, maplit::hashset!
{uin, uout}); - // Return the native's output, input port pair - Ok([nout, nin]) - } - } - } - - /// Adds a "dangling" port to the connector in the setup phase, - /// to be formed into channel during the connect procedure with the given - /// transport layer information. - pub fn new_net_port( - &mut self, - polarity: Polarity, - sock_addr: SocketAddr, - endpoint_polarity: EndpointPolarity, - ) -> Result { - let Self { unphased: cu, phased } = self; - match phased { - ConnectorPhased::Communication(..) => Err(WrongStateError), - ConnectorPhased::Setup(setup) => { - // allocate a single dangling port with a `None` peer (for now) - let new_pid = cu.ips.id_manager.new_port_id(); - cu.ips.port_info.map.insert( - new_pid, - PortInfo { - route: Route::LocalComponent, - peer: None, - owner: cu.native_component_id, - polarity, - }, - ); - log!( - cu.logger, - "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}", - new_pid, - polarity, - &sock_addr, - endpoint_polarity - ); - // Remember to setup this NetEndpoint setup during `connect` later. - setup.net_endpoint_setups.push(NetEndpointSetup { - sock_addr, - endpoint_polarity, - getter_for_incoming: new_pid, - }); - // update owned set - cu.ips.port_info.owned.entry(cu.native_component_id).or_default().insert(new_pid); - Ok(new_pid) - } - } - } - - /// Finalizes the connector's setup procedure and forms a distributed system with - /// all other connectors reachable through network channels. This procedure represents - /// a synchronization barrier, and upon successful return, the connector can no longer add new network ports, - /// but is ready to begin the first communication round. - /// Initially, the connector has a singleton set of _batches_, the only element of which is empty. - /// This single element starts off selected. The selected batch is modified with `put` and `get`, - /// and new batches are added and selected with `next_batch`. 
See `sync` for an explanation of the - /// purpose of these batches. - pub fn connect(&mut self, timeout: Option) -> Result<(), ConnectError> { - use ConnectError as Ce; - let Self { unphased: cu, phased } = self; - match &phased { - ConnectorPhased::Communication { .. } => { - log!(cu.logger, "Call to connecting in connected state"); - Err(Ce::AlreadyConnected) - } - ConnectorPhased::Setup(setup) => { - // Idea: Clone `self.unphased`, and then pass the replica to - // `connect_inner` to do the work, attempting to create a new connector structure - // in connected state without encountering any errors. - // If anything goes wrong during `connect_inner`, we simply keep the original `cu`. - - // Ideally, we'd simply clone `cu` in its entirety. - // However, it isn't clonable, because of the pesky logger. - // Solution: the original and clone ConnectorUnphased structures - // 'share' the original logger by using `mem::swap` strategically to pass a dummy back and forth, - // such that the real logger is wherever we need it to be without violating any invariants. - let mut cu_clone = ConnectorUnphased { - logger: Box::new(DummyLogger), - proto_components: cu.proto_components.clone(), - native_component_id: cu.native_component_id.clone(), - ips: cu.ips.clone(), - proto_description: cu.proto_description.clone(), - }; - // cu has REAL logger... - std::mem::swap(&mut cu.logger, &mut cu_clone.logger); - // ... cu_clone has REAL logger. - match Self::connect_inner(cu_clone, setup, timeout) { - Ok(connected_connector) => { - *self = connected_connector; - Ok(()) - } - Err((err, mut logger)) => { - // Put the original logger back in place (in self.unphased, AKA `cu`). - // cu_clone has REAL logger... - std::mem::swap(&mut cu.logger, &mut logger); - // ... cu has REAL logger. - Err(err) - } - } - } - } - } - - // Given an immutable setup structure, and my own (cloned) ConnetorUnphased, - // attempt to complete the setup procedure and return a new connector in Connected state. 
- // If anything goes wrong, throw everything in the bin, except for the Logger, which is - // the only structure that sees lasting effects of the failed attempt. - fn connect_inner( - mut cu: ConnectorUnphased, - setup: &ConnectorSetup, - timeout: Option, - ) -> Result)> { - log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout); - let deadline = timeout.map(|to| Instant::now() + to); - // `try_complete` is a helper function, which DOES NOT own `cu`, and returns ConnectError on err. - // This outer function takes its output and wraps it alongside `cu` (which it owns) - // as appropriate for Err(...) and OK(...) cases. - let mut try_complete = || { - // connect all endpoints in parallel; send and receive peer ids through ports - let mut endpoint_manager = setup_endpoints_and_pair_ports( - &mut *cu.logger, - &setup.net_endpoint_setups, - &setup.udp_endpoint_setups, - &mut cu.ips.port_info, - &deadline, - )?; - log!( - cu.logger, - "Successfully connected {} endpoints. info now {:#?} {:#?}", - endpoint_manager.net_endpoint_store.endpoint_exts.len(), - &cu.ips.port_info, - &endpoint_manager, - ); - // leader election and tree construction. Learn our role in the consensus tree, - // from learning who are our children/parents (neighbors) in the consensus tree. - let neighborhood = init_neighborhood( - cu.ips.id_manager.connector_id, - &mut *cu.logger, - &mut endpoint_manager, - &deadline, - )?; - log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood); - // Put it all together with an initial round index of zero. - let comm = ConnectorCommunication { - round_index: 0, - endpoint_manager, - neighborhood, - native_batches: vec![Default::default()], - round_result: Ok(None), // no previous round yet - }; - log!(cu.logger, "connect() finished. 
setup phase complete"); - Ok(comm) - }; - match try_complete() { - Ok(comm) => { - Ok(Self { unphased: cu, phased: ConnectorPhased::Communication(Box::new(comm)) }) - } - Err(err) => Err((err, cu.logger)), - } - } -} - -// Given a set of net_ and udp_ endpoints to setup, -// port information to flesh out (by discovering peers through channels) -// and a deadline in which to do it, -// try to return: -// - An EndpointManager, containing all the set up endpoints -// - new information about ports acquired through the newly-created channels -fn setup_endpoints_and_pair_ports( - logger: &mut dyn Logger, - net_endpoint_setups: &[NetEndpointSetup], - udp_endpoint_setups: &[UdpEndpointSetup], - port_info: &mut PortInfoMap, - deadline: &Option, -) -> Result { - use ConnectError as Ce; - const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); - const RETRY_PERIOD: Duration = Duration::from_millis(200); - - // The data for a net endpoint's setup in progress - struct NetTodo { - // becomes completed once sent_local_port && recv_peer_port.is_some() - // we send local port if we haven't already and we receive a writable event - // we recv peer port if we haven't already and we receive a readbale event - todo_endpoint: NetTodoEndpoint, - endpoint_setup: NetEndpointSetup, - sent_local_port: bool, // true <-> I've sent my local port - recv_peer_port: Option, // Some(..) 
<-> I've received my peer's port - } - - // The data for a udp endpoint's setup in progress - struct UdpTodo { - // becomes completed once we receive our first writable event - getter_for_incoming: PortId, - sock: UdpSocket, - } - - // Substructure of `NetTodo`, which represents the endpoint itself - enum NetTodoEndpoint { - Accepting(TcpListener), // awaiting it's peer initiating the connection - PeerInfoRecving(NetEndpoint), // awaiting info about peer port through the channel - } - //////////////////////////////////////////// - - // Start to construct our return values - let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?; - let mut events = - Events::with_capacity((net_endpoint_setups.len() + udp_endpoint_setups.len()) * 2 + 4); - let [mut net_polled_undrained, udp_polled_undrained] = [VecSet::default(), VecSet::default()]; - let mut delayed_messages = vec![]; - let mut last_retry_at = Instant::now(); - let mut io_byte_buffer = IoByteBuffer::default(); - - // Create net/udp todo structures, each already registered with poll - let mut net_todos = net_endpoint_setups - .iter() - .enumerate() - .map(|(index, endpoint_setup)| { - let token = TokenTarget::NetEndpoint { index }.into(); - log!(logger, "Net endpoint {} beginning setup with {:?}", index, &endpoint_setup); - let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity { - let mut stream = TcpStream::connect(endpoint_setup.sock_addr) - .map_err(|_| Ce::TcpInvalidConnect(endpoint_setup.sock_addr))?; - poll.registry().register(&mut stream, token, BOTH).unwrap(); - NetTodoEndpoint::PeerInfoRecving(NetEndpoint { stream, inbox: vec![] }) - } else { - let mut listener = TcpListener::bind(endpoint_setup.sock_addr) - .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?; - poll.registry().register(&mut listener, token, BOTH).unwrap(); - NetTodoEndpoint::Accepting(listener) - }; - Ok(NetTodo { - todo_endpoint, - sent_local_port: false, - recv_peer_port: None, - endpoint_setup: 
endpoint_setup.clone(), - }) - }) - .collect::, ConnectError>>()?; - let udp_todos = udp_endpoint_setups - .iter() - .enumerate() - .map(|(index, endpoint_setup)| { - let mut sock = UdpSocket::bind(endpoint_setup.local_addr) - .map_err(|_| Ce::BindFailed(endpoint_setup.local_addr))?; - sock.connect(endpoint_setup.peer_addr) - .map_err(|_| Ce::UdpConnectFailed(endpoint_setup.peer_addr))?; - poll.registry() - .register(&mut sock, TokenTarget::UdpEndpoint { index }.into(), Interest::WRITABLE) - .unwrap(); - Ok(UdpTodo { sock, getter_for_incoming: endpoint_setup.getter_for_incoming }) - }) - .collect::, ConnectError>>()?; - - // Initially no net connections have failed, and all udp and net endpoint setups are incomplete - let mut net_connect_to_retry: HashSet = Default::default(); - let mut setup_incomplete: HashSet = { - let net_todo_targets_iter = - (0..net_todos.len()).map(|index| TokenTarget::NetEndpoint { index }); - let udp_todo_targets_iter = - (0..udp_todos.len()).map(|index| TokenTarget::UdpEndpoint { index }); - net_todo_targets_iter.chain(udp_todo_targets_iter).collect() - }; - // progress by reacting to poll events. 
continue until every endpoint is set up - while !setup_incomplete.is_empty() { - // recompute the timeout for the poll call - let remaining = match (deadline, net_connect_to_retry.is_empty()) { - (None, true) => None, - (None, false) => Some(RETRY_PERIOD), - (Some(deadline), is_empty) => { - let dur_to_timeout = - deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?; - Some(if is_empty { dur_to_timeout } else { dur_to_timeout.min(RETRY_PERIOD) }) - } - }; - // block until either - // (a) `events` has been populated with 1+ elements - // (b) timeout elapses, or - // (c) RETRY_PERIOD elapses - poll.poll(&mut events, remaining).map_err(|_| Ce::PollFailed)?; - if last_retry_at.elapsed() > RETRY_PERIOD { - // Retry all net connections and reset `last_retry_at` - last_retry_at = Instant::now(); - for net_index in net_connect_to_retry.drain() { - // Restart connect procedure for this net endpoint - let net_todo = &mut net_todos[net_index]; - log!( - logger, - "Restarting connection with endpoint {:?} {:?}", - net_index, - net_todo.endpoint_setup.sock_addr - ); - match &mut net_todo.todo_endpoint { - NetTodoEndpoint::PeerInfoRecving(endpoint) => { - let mut new_stream = TcpStream::connect(net_todo.endpoint_setup.sock_addr) - .expect("mio::TcpStream connect should not fail!"); - std::mem::swap(&mut endpoint.stream, &mut new_stream); - let token = TokenTarget::NetEndpoint { index: net_index }.into(); - poll.registry().register(&mut endpoint.stream, token, BOTH).unwrap(); - } - _ => unreachable!(), - } - } - } - for event in events.iter() { - let token = event.token(); - // figure out which endpoint the event belonged to - let token_target = TokenTarget::from(token); - match token_target { - TokenTarget::UdpEndpoint { index } => { - // UdpEndpoints are easy to complete. - // Their setup event just has to succeed without error - if !setup_incomplete.contains(&token_target) { - // spurious wakeup. this endpoint has already been set up! 
- continue; - } - let udp_todo: &UdpTodo = &udp_todos[index]; - if event.is_error() { - return Err(Ce::BindFailed(udp_todo.sock.local_addr().unwrap())); - } - setup_incomplete.remove(&token_target); - } - TokenTarget::NetEndpoint { index } => { - // NetEndpoints are complex to complete, - // they must accept/connect to their peer, - // and then exchange port info successfully - let net_todo = &mut net_todos[index]; - if let NetTodoEndpoint::Accepting(listener) = &mut net_todo.todo_endpoint { - // Passive endpoint that will first try accept the peer's connection - match listener.accept() { - Err(e) if err_would_block(&e) => continue, // spurious wakeup - Err(_) => { - log!(logger, "accept() failure on index {}", index); - return Err(Ce::AcceptFailed(listener.local_addr().unwrap())); - } - Ok((mut stream, peer_addr)) => { - // successfully accepted the active peer - // reusing the token, but now for the stream and not the listener - poll.registry().deregister(listener).unwrap(); - poll.registry().register(&mut stream, token, BOTH).unwrap(); - log!( - logger, - "Endpoint[{}] accepted a connection from {:?}", - index, - peer_addr - ); - let net_endpoint = NetEndpoint { stream, inbox: vec![] }; - net_todo.todo_endpoint = - NetTodoEndpoint::PeerInfoRecving(net_endpoint); - } - } - } - // OK now let's try and finish exchanging port info - if let NetTodoEndpoint::PeerInfoRecving(net_endpoint) = - &mut net_todo.todo_endpoint - { - if event.is_error() { - // event signals some error! :( - if net_todo.endpoint_setup.endpoint_polarity - == EndpointPolarity::Passive - { - // breaking as the acceptor is currently unrecoverable - return Err(Ce::AcceptFailed( - net_endpoint.stream.local_addr().unwrap(), - )); - } - // this actively-connecting endpoint failed to connect! - // We will schedule it for a retry - net_connect_to_retry.insert(index); - continue; - } - // event wasn't ERROR - if net_connect_to_retry.contains(&index) { - // spurious wakeup. 
already scheduled to retry connect later - continue; - } - if !setup_incomplete.contains(&token_target) { - // spurious wakeup. this endpoint has already been completed! - if event.is_readable() { - net_polled_undrained.insert(index); - } - continue; - } - let local_info = port_info - .map - .get(&net_todo.endpoint_setup.getter_for_incoming) - .expect("Net Setup's getter port info isn't known"); // unreachable - if event.is_writable() && !net_todo.sent_local_port { - // can write and didn't send setup msg yet? Do so! - let _ = net_endpoint.stream.set_nodelay(true); - let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo { - owner: local_info.owner, - polarity: local_info.polarity, - port: net_todo.endpoint_setup.getter_for_incoming, - })); - // a failed send is returned to the caller as a ConnectError (as for try_recv below), not a panic - net_endpoint - .send(&msg, &mut io_byte_buffer) - .map_err(|e| { - Ce::NetEndpointSetupError( - net_endpoint.stream.local_addr().unwrap(), - e, - ) - })?; - log!(logger, "endpoint[{}] sent msg {:?}", index, &msg); - net_todo.sent_local_port = true; - } - if event.is_readable() && net_todo.recv_peer_port.is_none() { - // can read and didn't finish recving setup msg yet? Do so! - let maybe_msg = net_endpoint.try_recv(logger).map_err(|e| { - Ce::NetEndpointSetupError( - net_endpoint.stream.local_addr().unwrap(), - e, - ) - })?; - if maybe_msg.is_some() && !net_endpoint.inbox.is_empty() { - net_polled_undrained.insert(index); - } - match maybe_msg { - None => {} // msg deserialization incomplete - Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => { - log!( - logger, - "endpoint[{}] got peer info {:?}", - index, - peer_info - ); - if peer_info.polarity == local_info.polarity { - return Err(ConnectError::PortPeerPolarityMismatch( - net_todo.endpoint_setup.getter_for_incoming, - )); - } - net_todo.recv_peer_port = Some(peer_info.port); - // finally learned the peer of this port!
- port_info - .map - .get_mut(&net_todo.endpoint_setup.getter_for_incoming) - .unwrap() - .peer = Some(peer_info.port); - // learned the info of this peer port - port_info.map.entry(peer_info.port).or_insert({ - port_info - .owned - .entry(peer_info.owner) - .or_default() - .insert(peer_info.port); - PortInfo { - peer: Some(net_todo.endpoint_setup.getter_for_incoming), - polarity: peer_info.polarity, - owner: peer_info.owner, - route: Route::NetEndpoint { index }, - } - }); - } - Some(inappropriate_msg) => { - log!( - logger, - "delaying msg {:?} during channel setup phase", - inappropriate_msg - ); - delayed_messages.push((index, inappropriate_msg)); - } - } - } - // is the setup for this net_endpoint now complete? - if net_todo.sent_local_port && net_todo.recv_peer_port.is_some() { - // yes! connected, sent my info and received peer's info - setup_incomplete.remove(&token_target); - log!(logger, "endpoint[{}] is finished!", index); - } - } - } - } - } - events.clear(); - } - log!(logger, "Endpoint setup complete! Cleaning up and building structures"); - let net_endpoint_exts = net_todos - .into_iter() - .enumerate() - .map(|(index, NetTodo { todo_endpoint, endpoint_setup, .. 
})| NetEndpointExt { - net_endpoint: match todo_endpoint { - NetTodoEndpoint::PeerInfoRecving(mut net_endpoint) => { - let token = TokenTarget::NetEndpoint { index }.into(); - poll.registry() - .reregister(&mut net_endpoint.stream, token, Interest::READABLE) - .unwrap(); - net_endpoint - } - _ => unreachable!(), - }, - getter_for_incoming: endpoint_setup.getter_for_incoming, - }) - .collect(); - let udp_endpoint_exts = udp_todos - .into_iter() - .enumerate() - .map(|(index, udp_todo)| { - let UdpTodo { mut sock, getter_for_incoming } = udp_todo; - let token = TokenTarget::UdpEndpoint { index }.into(); - poll.registry().reregister(&mut sock, token, Interest::READABLE).unwrap(); - UdpEndpointExt { - sock, - outgoing_payloads: Default::default(), - received_this_round: false, - getter_for_incoming, - } - }) - .collect(); - let endpoint_manager = EndpointManager { - poll, - events, - undelayed_messages: delayed_messages, // no longer delayed - delayed_messages: Default::default(), - net_endpoint_store: EndpointStore { - endpoint_exts: net_endpoint_exts, - polled_undrained: net_polled_undrained, - }, - udp_endpoint_store: EndpointStore { - endpoint_exts: udp_endpoint_exts, - polled_undrained: udp_polled_undrained, - }, - io_byte_buffer, - }; - Ok(endpoint_manager) -} - -// Given a fully-formed endpoint manager, -// construct the consensus tree with: -// 1. decentralized leader election -// 2. centralized tree construction -fn init_neighborhood( - connector_id: ConnectorId, - logger: &mut dyn Logger, - em: &mut EndpointManager, - deadline: &Option, -) -> Result { - use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm}; - - // storage structure for the state of a distributed wave - // (for readability) - #[derive(Debug)] - struct WaveState { - parent: Option, - leader: ConnectorId, - } - - // kick off a leader-election wave rooted at myself - // given the desired wave information - // (e.g. 
don't inform my parent if they exist) - fn do_wave( - em: &mut EndpointManager, - awaiting: &mut HashSet, - ws: &WaveState, - ) -> Result<(), ConnectError> { - awaiting.clear(); - let msg = S(Sm::LeaderWave { wave_leader: ws.leader }); - for index in em.index_iter() { - if Some(index) != ws.parent { - em.send_to_setup(index, &msg)?; - awaiting.insert(index); - } - } - Ok(()) - } - /////////////////////// - /* - Conceptually, we have two distinct disstributed algorithms back-to-back - 1. Leader election using echo algorithm with extinction. - - Each connector initiates a wave tagged with their ID - - Connectors participate in waves of GREATER ID, abandoning previous waves - - Only the wave of the connector with GREATEST ID completes, whereupon they are the leader - 2. Tree construction - - The leader broadcasts their leadership with msg A - - Upon receiving their first announcement, connectors reply B, and send A to all peers - - A controller exits once they have received A or B from each neighbor - - The actual implementation is muddier, because non-leaders aren't aware of termiantion of algorithm 1, - so they rely on receipt of the leader's announcement to realize that algorithm 2 has begun. - - NOTE the distinction between PARENT and LEADER - */ - log!(logger, "beginning neighborhood construction"); - if em.num_net_endpoints() == 0 { - log!(logger, "Edge case of no neighbors! No parent an no children!"); - return Ok(Neighborhood { parent: None, children: VecSet::new(vec![]) }); - } - log!(logger, "Have {} endpoints. Must participate in distributed alg.", em.num_net_endpoints()); - let mut awaiting = HashSet::with_capacity(em.num_net_endpoints()); - // 1+ neighbors. Leader can only be learned by receiving messages - // loop ends when I know my sink tree parent (implies leader was elected) - let election_result: WaveState = { - // initially: No parent, I'm the best leader. 
- let mut best_wave = WaveState { parent: None, leader: connector_id }; - // start a wave for this initial state - do_wave(em, &mut awaiting, &best_wave)?; - // with 1+ neighbors, progress is only made in response to incoming messages - em.undelay_all(); - 'election: loop { - log!(logger, "Election loop. awaiting {:?}...", awaiting.iter()); - let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?; - log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg); - match msg { - S(Sm::LeaderAnnounce { tree_leader }) => { - // A neighbor explicitly tells me who is the leader - // they become my parent, and I adopt their announced leader - let election_result = - WaveState { leader: tree_leader, parent: Some(recv_index) }; - log!(logger, "Election lost! Result {:?}", &election_result); - assert!(election_result.leader >= best_wave.leader); - assert_ne!(election_result.leader, connector_id); - break 'election election_result; - } - S(Sm::LeaderWave { wave_leader }) => { - use Ordering as O; - match wave_leader.cmp(&best_wave.leader) { - O::Less => log!( - logger, - "Ignoring wave with Id {:?}<{:?}", - wave_leader, - best_wave.leader - ), - O::Greater => { - log!( - logger, - "Joining wave with Id {:?}>{:?}", - wave_leader, - best_wave.leader - ); - best_wave = WaveState { leader: wave_leader, parent: Some(recv_index) }; - log!(logger, "New wave state {:?}", &best_wave); - do_wave(em, &mut awaiting, &best_wave)?; - if awaiting.is_empty() { - log!(logger, "Special case! Only neighbor is parent. Replying to {:?} msg {:?}", recv_index, &msg); - em.send_to_setup(recv_index, &msg)?; - } - } - O::Equal => { - assert!(awaiting.remove(&recv_index)); - log!( - logger, - "Wave reply from index {:?} for leader {:?}. Now awaiting {} replies", - recv_index, - best_wave.leader, - awaiting.len() - ); - if awaiting.is_empty() { - if let Some(parent) = best_wave.parent { - log!( - logger, - "Sub-wave done! 
replying to parent {:?} msg {:?}", - parent, - &msg - ); - em.send_to_setup(parent, &msg)?; - } else { - let election_result: WaveState = best_wave; - log!(logger, "Election won! Result {:?}", &election_result); - break 'election election_result; - } - } - } - } - } - msg @ S(Sm::YouAreMyParent) | msg @ S(Sm::MyPortInfo(_)) => { - log!(logger, "Endpont {:?} sent unexpected msg! {:?}", recv_index, &msg); - return Err(Ce::SetupAlgMisbehavior); - } - msg @ Msg::CommMsg { .. } => { - log!(logger, "delaying msg {:?} during election algorithm", msg); - em.delayed_messages.push((recv_index, msg)); - } - } - } - }; - - // starting algorithm 2. Send a message to every neighbor - // namely, send "YouAreMyParent" to parent (if they exist), - // and LeaderAnnounce to everyone else - log!(logger, "Starting tree construction. Step 1: send one msg per neighbor"); - awaiting.clear(); - for index in em.index_iter() { - if Some(index) == election_result.parent { - em.send_to_setup(index, &S(Sm::YouAreMyParent))?; - } else { - awaiting.insert(index); - em.send_to_setup( - index, - &S(Sm::LeaderAnnounce { tree_leader: election_result.leader }), - )?; - } - } - // Receive one message from each neighbor to learn - // whether they consider me their parent or not. - let mut children = vec![]; - em.undelay_all(); - while !awaiting.is_empty() { - log!(logger, "Tree construction_loop loop. awaiting {:?}...", awaiting.iter()); - let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?; - log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg); - match msg { - S(Sm::LeaderAnnounce { .. }) => { - // `recv_index` is not my child - log!( - logger, - "Got reply from non-child index {:?}. Children: {:?}", - recv_index, - children.iter() - ); - if !awaiting.remove(&recv_index) { - return Err(Ce::SetupAlgMisbehavior); - } - } - S(Sm::YouAreMyParent) => { - if !awaiting.remove(&recv_index) { - log!( - logger, - "Got reply from child index {:?}. Children before... 
{:?}", - recv_index, - children.iter() - ); - return Err(Ce::SetupAlgMisbehavior); - } - // `recv_index` is my child - children.push(recv_index); - } - msg @ S(Sm::MyPortInfo(_)) | msg @ S(Sm::LeaderWave { .. }) => { - log!(logger, "discarding old message {:?} during election", msg); - } - msg @ Msg::CommMsg { .. } => { - log!(logger, "delaying msg {:?} during election", msg); - em.delayed_messages.push((recv_index, msg)); - } - } - } - // Neighborhood complete! - children.shrink_to_fit(); - let neighborhood = - Neighborhood { parent: election_result.parent, children: VecSet::new(children) }; - log!(logger, "Neighborhood constructed {:?}", &neighborhood); - Ok(neighborhood) -} diff --git a/src/runtime_old/tests.rs b/src/runtime_old/tests.rs deleted file mode 100644 index b9cf20cd3d9a847e5d489335d4b310605476d26c..0000000000000000000000000000000000000000 --- a/src/runtime_old/tests.rs +++ /dev/null @@ -1,1491 +0,0 @@ -use crate as reowolf; -use crossbeam_utils::thread::scope; -use reowolf::{ - error::*, - EndpointPolarity::{Active, Passive}, - Polarity::{Getter, Putter}, - *, -}; -use std::{fs::File, net::SocketAddr, path::Path, sync::Arc, time::Duration}; -////////////////////////////////////////// -const MS100: Option = Some(Duration::from_millis(100)); -const MS300: Option = Some(Duration::from_millis(300)); -const SEC1: Option = Some(Duration::from_secs(1)); -const SEC5: Option = Some(Duration::from_secs(5)); -const SEC15: Option = Some(Duration::from_secs(15)); -fn next_test_addr() -> SocketAddr { - use std::{ - net::{Ipv4Addr, SocketAddrV4}, - sync::atomic::{AtomicU16, Ordering::SeqCst}, - }; - static TEST_PORT: AtomicU16 = AtomicU16::new(5_000); - let port = TEST_PORT.fetch_add(1, SeqCst); - SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into() -} -fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector { - file_logged_configured_connector(connector_id, dir_path, MINIMAL_PROTO.clone()) -} -fn file_logged_configured_connector( - 
connector_id: ConnectorId, - dir_path: &Path, - pd: Arc, -) -> Connector { - let _ = std::fs::create_dir_all(dir_path).expect("Failed to create log output dir"); - let path = dir_path.join(format!("cid_{:?}.txt", connector_id)); - let file = File::create(path).expect("Failed to create log output file!"); - let file_logger = Box::new(FileLogger::new(connector_id, file)); - Connector::new(file_logger, pd, connector_id) -} -static MINIMAL_PDL: &'static [u8] = b" -primitive sync_component(in a, out b) { - while (true) { - sync { - if (fires(a) && fires(b)) { - msg x = get(a); - put(b, x); - } else { - assert(!fires(a) && !fires(b)); - } - } - } -} - -primitive together(in ia, in ib, out oa, out ob){ - while(true) sync { - if(fires(ia)) { - put(oa, get(ia)); - put(ob, get(ib)); - } - } -} -"; -lazy_static::lazy_static! { - static ref MINIMAL_PROTO: Arc = { - Arc::new(reowolf::ProtocolDescription::parse(MINIMAL_PDL).unwrap()) - }; -} -static TEST_MSG_BYTES: &'static [u8] = b"hello"; -lazy_static::lazy_static! { - static ref TEST_MSG: Payload = { - Payload::from(TEST_MSG_BYTES) - }; -} -fn new_u8_buffer(cap: usize) -> Vec { - let mut v = Vec::with_capacity(cap); - // Safe! 
len will cover owned bytes in valid state - unsafe { v.set_len(cap) } - v -} -////////////////////////////////////////// - -#[test] -fn basic_connector() { - Connector::new(Box::new(DummyLogger), MINIMAL_PROTO.clone(), 0); -} - -#[test] -fn basic_logged_connector() { - let test_log_path = Path::new("./logs/basic_logged_connector"); - file_logged_connector(0, test_log_path); -} - -#[test] -fn new_port_pair() { - let test_log_path = Path::new("./logs/new_port_pair"); - let mut c = file_logged_connector(0, test_log_path); - let [_, _] = c.new_port_pair(); - let [_, _] = c.new_port_pair(); -} - -#[test] -fn new_sync() { - let test_log_path = Path::new("./logs/new_sync"); - let mut c = file_logged_connector(0, test_log_path); - let [o, i] = c.new_port_pair(); - c.add_component(b"", b"sync_component", &[i, o]).unwrap(); -} - -#[test] -fn new_net_port() { - let test_log_path = Path::new("./logs/new_net_port"); - let mut c = file_logged_connector(0, test_log_path); - let sock_addrs = [next_test_addr()]; - let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); -} - -#[test] -fn trivial_connect() { - let test_log_path = Path::new("./logs/trivial_connect"); - let mut c = file_logged_connector(0, test_log_path); - c.connect(SEC1).unwrap(); -} - -#[test] -fn single_node_connect() { - let test_log_path = Path::new("./logs/single_node_connect"); - let sock_addrs = [next_test_addr()]; - let mut c = file_logged_connector(0, test_log_path); - let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - c.connect(SEC1).unwrap(); -} - -#[test] -fn minimal_net_connect() { - let test_log_path = Path::new("./logs/minimal_net_connect"); - let sock_addrs = [next_test_addr()]; - scope(|s| { - s.spawn(|_| { - let mut c = file_logged_connector(0, test_log_path); - let _ = c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); - 
c.connect(SEC1).unwrap(); - }); - s.spawn(|_| { - let mut c = file_logged_connector(1, test_log_path); - let _ = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); - c.connect(SEC1).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn put_no_sync() { - let test_log_path = Path::new("./logs/put_no_sync"); - let mut c = file_logged_connector(0, test_log_path); - let [o, _] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.put(o, TEST_MSG.clone()).unwrap(); -} - -#[test] -fn wrong_polarity_bad() { - let test_log_path = Path::new("./logs/wrong_polarity_bad"); - let mut c = file_logged_connector(0, test_log_path); - let [_, i] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.put(i, TEST_MSG.clone()).unwrap_err(); -} - -#[test] -fn dup_put_bad() { - let test_log_path = Path::new("./logs/dup_put_bad"); - let mut c = file_logged_connector(0, test_log_path); - let [o, _] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.put(o, TEST_MSG.clone()).unwrap(); - c.put(o, TEST_MSG.clone()).unwrap_err(); -} - -#[test] -fn trivial_sync() { - let test_log_path = Path::new("./logs/trivial_sync"); - let mut c = file_logged_connector(0, test_log_path); - c.connect(SEC1).unwrap(); - c.sync(SEC1).unwrap(); -} - -#[test] -fn unconnected_gotten_err() { - let test_log_path = Path::new("./logs/unconnected_gotten_err"); - let mut c = file_logged_connector(0, test_log_path); - let [_, i] = c.new_port_pair(); - assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err()); -} - -#[test] -fn connected_gotten_err_no_round() { - let test_log_path = Path::new("./logs/connected_gotten_err_no_round"); - let mut c = file_logged_connector(0, test_log_path); - let [_, i] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err()); -} - -#[test] -fn connected_gotten_err_ungotten() { - let test_log_path = Path::new("./logs/connected_gotten_err_ungotten"); - let mut c = file_logged_connector(0, 
test_log_path); - let [_, i] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.sync(SEC1).unwrap(); - assert_eq!(reowolf::error::GottenError::PortDidntGet, c.gotten(i).unwrap_err()); -} - -#[test] -fn native_polarity_checks() { - let test_log_path = Path::new("./logs/native_polarity_checks"); - let mut c = file_logged_connector(0, test_log_path); - let [o, i] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - // fail... - c.get(o).unwrap_err(); - c.put(i, TEST_MSG.clone()).unwrap_err(); - // succeed.. - c.get(i).unwrap(); - c.put(o, TEST_MSG.clone()).unwrap(); -} - -#[test] -fn native_multiple_gets() { - let test_log_path = Path::new("./logs/native_multiple_gets"); - let mut c = file_logged_connector(0, test_log_path); - let [_, i] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.get(i).unwrap(); - c.get(i).unwrap_err(); -} - -#[test] -fn next_batch() { - let test_log_path = Path::new("./logs/next_batch"); - let mut c = file_logged_connector(0, test_log_path); - c.next_batch().unwrap_err(); - c.connect(SEC1).unwrap(); - c.next_batch().unwrap(); - c.next_batch().unwrap(); - c.next_batch().unwrap(); -} - -#[test] -fn native_self_msg() { - let test_log_path = Path::new("./logs/native_self_msg"); - let mut c = file_logged_connector(0, test_log_path); - let [o, i] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.get(i).unwrap(); - c.put(o, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap(); -} - -#[test] -fn two_natives_msg() { - let test_log_path = Path::new("./logs/two_natives_msg"); - let sock_addrs = [next_test_addr()]; - scope(|s| { - s.spawn(|_| { - let mut c = file_logged_connector(0, test_log_path); - let g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); - c.connect(SEC1).unwrap(); - c.get(g).unwrap(); - c.sync(SEC1).unwrap(); - c.gotten(g).unwrap(); - }); - s.spawn(|_| { - let mut c = file_logged_connector(1, test_log_path); - let p = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p, 
TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn trivial_nondet() { - let test_log_path = Path::new("./logs/trivial_nondet"); - let mut c = file_logged_connector(0, test_log_path); - let [_, i] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.get(i).unwrap(); - // getting 0 batch - c.next_batch().unwrap(); - // silent 1 batch - assert_eq!(1, c.sync(SEC1).unwrap()); - c.gotten(i).unwrap_err(); -} - -#[test] -fn connector_pair_nondet() { - let test_log_path = Path::new("./logs/connector_pair_nondet"); - let sock_addrs = [next_test_addr()]; - scope(|s| { - s.spawn(|_| { - let mut c = file_logged_connector(0, test_log_path); - let g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); - c.connect(SEC1).unwrap(); - c.next_batch().unwrap(); - c.get(g).unwrap(); - assert_eq!(1, c.sync(SEC1).unwrap()); - c.gotten(g).unwrap(); - }); - s.spawn(|_| { - let mut c = file_logged_connector(1, test_log_path); - let p = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn native_immediately_inconsistent() { - let test_log_path = Path::new("./logs/native_immediately_inconsistent"); - let mut c = file_logged_connector(0, test_log_path); - let [_, g] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.get(g).unwrap(); - c.sync(SEC15).unwrap_err(); -} - -#[test] -fn native_recovers() { - let test_log_path = Path::new("./logs/native_recovers"); - let mut c = file_logged_connector(0, test_log_path); - let [p, g] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.get(g).unwrap(); - c.sync(SEC15).unwrap_err(); - c.put(p, TEST_MSG.clone()).unwrap(); - c.get(g).unwrap(); - c.sync(SEC15).unwrap(); -} - -#[test] -fn cannot_use_moved_ports() { - /* - native p|-->|g sync - */ - let test_log_path = Path::new("./logs/cannot_use_moved_ports"); - let mut c = file_logged_connector(0, test_log_path); - let 
[p, g] = c.new_port_pair(); - c.add_component(b"", b"sync_component", &[g, p]).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p, TEST_MSG.clone()).unwrap_err(); - c.get(g).unwrap_err(); -} - -#[test] -fn sync_sync() { - /* - native p0|-->|g0 sync - g1|<--|p1 - */ - let test_log_path = Path::new("./logs/sync_sync"); - let mut c = file_logged_connector(0, test_log_path); - let [p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - c.add_component(b"", b"sync_component", &[g0, p1]).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(g1).unwrap(); - c.sync(SEC1).unwrap(); - c.gotten(g1).unwrap(); -} - -#[test] -fn double_net_connect() { - let test_log_path = Path::new("./logs/double_net_connect"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - scope(|s| { - s.spawn(|_| { - let mut c = file_logged_connector(0, test_log_path); - let [_p, _g] = [ - c.new_net_port(Putter, sock_addrs[0], Active).unwrap(), - c.new_net_port(Getter, sock_addrs[1], Active).unwrap(), - ]; - c.connect(SEC1).unwrap(); - }); - s.spawn(|_| { - let mut c = file_logged_connector(1, test_log_path); - let [_g, _p] = [ - c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(), - c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(), - ]; - c.connect(SEC1).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn distributed_msg_bounce() { - /* - native[0] | sync 0.p|-->|1.p native[1] - 0.g|<--|1.g - */ - let test_log_path = Path::new("./logs/distributed_msg_bounce"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - scope(|s| { - s.spawn(|_| { - /* - native | sync p|--> - | g|<-- - */ - let mut c = file_logged_connector(0, test_log_path); - let [p, g] = [ - c.new_net_port(Putter, sock_addrs[0], Active).unwrap(), - c.new_net_port(Getter, sock_addrs[1], Active).unwrap(), - ]; - c.add_component(b"", b"sync_component", &[g, p]).unwrap(); - c.connect(SEC1).unwrap(); - c.sync(SEC1).unwrap(); - }); - s.spawn(|_| { - /* - native p|--> - g|<-- - */ - 
let mut c = file_logged_connector(1, test_log_path); - let [g, p] = [ - c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(), - c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(), - ]; - c.connect(SEC1).unwrap(); - c.put(p, TEST_MSG.clone()).unwrap(); - c.get(g).unwrap(); - c.sync(SEC1).unwrap(); - c.gotten(g).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn local_timeout() { - let test_log_path = Path::new("./logs/local_timeout"); - let mut c = file_logged_connector(0, test_log_path); - let [_, g] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - c.get(g).unwrap(); - match c.sync(MS300) { - Err(SyncError::RoundFailure) => {} - res => panic!("expeted timeout. but got {:?}", res), - } -} - -#[test] -fn parent_timeout() { - let test_log_path = Path::new("./logs/parent_timeout"); - let sock_addrs = [next_test_addr()]; - scope(|s| { - s.spawn(|_| { - // parent; times out - let mut c = file_logged_connector(999, test_log_path); - let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - c.connect(SEC1).unwrap(); - c.sync(MS300).unwrap_err(); // timeout - }); - s.spawn(|_| { - // child - let mut c = file_logged_connector(000, test_log_path); - let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - c.connect(SEC1).unwrap(); - c.get(g).unwrap(); // not matched by put - c.sync(None).unwrap_err(); // no timeout - }); - }) - .unwrap(); -} - -#[test] -fn child_timeout() { - let test_log_path = Path::new("./logs/child_timeout"); - let sock_addrs = [next_test_addr()]; - scope(|s| { - s.spawn(|_| { - // child; times out - let mut c = file_logged_connector(000, test_log_path); - let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - c.connect(SEC1).unwrap(); - c.sync(MS300).unwrap_err(); // timeout - }); - s.spawn(|_| { - // parent - let mut c = file_logged_connector(999, test_log_path); - let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - c.connect(SEC1).unwrap(); - c.get(g).unwrap(); // not matched by put - 
c.sync(None).unwrap_err(); // no timeout - }); - }) - .unwrap(); -} - -#[test] -fn chain_connect() { - let test_log_path = Path::new("./logs/chain_connect"); - let sock_addrs = [next_test_addr(), next_test_addr(), next_test_addr(), next_test_addr()]; - scope(|s| { - s.spawn(|_| { - let mut c = file_logged_connector(0, test_log_path); - c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); - c.connect(SEC5).unwrap(); - }); - s.spawn(|_| { - let mut c = file_logged_connector(10, test_log_path); - c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); - c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(); - c.connect(SEC5).unwrap(); - }); - s.spawn(|_| { - // LEADER - let mut c = file_logged_connector(7, test_log_path); - c.new_net_port(Getter, sock_addrs[1], Active).unwrap(); - c.new_net_port(Putter, sock_addrs[2], Passive).unwrap(); - c.connect(SEC5).unwrap(); - }); - s.spawn(|_| { - let mut c = file_logged_connector(4, test_log_path); - c.new_net_port(Getter, sock_addrs[2], Active).unwrap(); - c.new_net_port(Putter, sock_addrs[3], Passive).unwrap(); - c.connect(SEC5).unwrap(); - }); - s.spawn(|_| { - let mut c = file_logged_connector(1, test_log_path); - c.new_net_port(Getter, sock_addrs[3], Active).unwrap(); - c.connect(SEC5).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn net_self_loop() { - let test_log_path = Path::new("./logs/net_self_loop"); - let sock_addrs = [next_test_addr()]; - let mut c = file_logged_connector(0, test_log_path); - let p = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - let g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p, TEST_MSG.clone()).unwrap(); - c.get(g).unwrap(); - c.sync(MS300).unwrap(); -} - -#[test] -fn nobody_connects_active() { - let test_log_path = Path::new("./logs/nobody_connects_active"); - let sock_addrs = [next_test_addr()]; - let mut c = file_logged_connector(0, test_log_path); - let _g = c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); - 
c.connect(Some(Duration::from_secs(5))).unwrap_err(); -} -#[test] -fn nobody_connects_passive() { - let test_log_path = Path::new("./logs/nobody_connects_passive"); - let sock_addrs = [next_test_addr()]; - let mut c = file_logged_connector(0, test_log_path); - let _g = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - c.connect(Some(Duration::from_secs(5))).unwrap_err(); -} - -#[test] -fn together() { - let test_log_path = Path::new("./logs/together"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - scope(|s| { - s.spawn(|_| { - let mut c = file_logged_connector(0, test_log_path); - let [p0, p1] = c.new_port_pair(); - let p2 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - let p3 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); - let [p4, p5] = c.new_port_pair(); - c.add_component(b"", b"together", &[p1, p2, p3, p4]).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(p5).unwrap(); - c.sync(MS300).unwrap(); - c.gotten(p5).unwrap(); - }); - s.spawn(|_| { - let mut c = file_logged_connector(1, test_log_path); - let [p0, p1] = c.new_port_pair(); - let p2 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); - let p3 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - let [p4, p5] = c.new_port_pair(); - c.add_component(b"", b"together", &[p1, p2, p3, p4]).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(p5).unwrap(); - c.sync(MS300).unwrap(); - c.gotten(p5).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn native_batch_distinguish() { - let test_log_path = Path::new("./logs/native_batch_distinguish"); - let mut c = file_logged_connector(0, test_log_path); - c.connect(SEC1).unwrap(); - c.next_batch().unwrap(); - c.sync(SEC1).unwrap(); -} - -#[test] -fn multirounds() { - let test_log_path = Path::new("./logs/multirounds"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - scope(|s| { - s.spawn(|_| { - let mut c = file_logged_connector(0, 
test_log_path); - let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); - c.connect(SEC1).unwrap(); - for _ in 0..10 { - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(p1).unwrap(); - c.sync(SEC1).unwrap(); - } - }); - s.spawn(|_| { - let mut c = file_logged_connector(1, test_log_path); - let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); - c.connect(SEC1).unwrap(); - for _ in 0..10 { - c.get(p0).unwrap(); - c.put(p1, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap(); - } - }); - }) - .unwrap(); -} - -#[test] -fn multi_recover() { - let test_log_path = Path::new("./logs/multi_recover"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - let success_iter = [true, false].iter().copied().cycle().take(10); - scope(|s| { - s.spawn(|_| { - let mut c = file_logged_connector(0, test_log_path); - let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); - c.connect(SEC1).unwrap(); - for succeeds in success_iter.clone() { - c.put(p0, TEST_MSG.clone()).unwrap(); - if succeeds { - c.get(p1).unwrap(); - } - let res = c.sync(MS300); - assert_eq!(res.is_ok(), succeeds); - } - }); - s.spawn(|_| { - let mut c = file_logged_connector(1, test_log_path); - let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); - c.connect(SEC1).unwrap(); - for succeeds in success_iter.clone() { - c.get(p0).unwrap(); - c.put(p1, TEST_MSG.clone()).unwrap(); - let res = c.sync(MS300); - assert_eq!(res.is_ok(), succeeds); - } - }); - }) - .unwrap(); -} - -#[test] -fn udp_self_connect() { - let test_log_path = Path::new("./logs/udp_self_connect"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - let mut c = file_logged_connector(0, test_log_path); - 
c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); - c.new_udp_mediator_component(sock_addrs[1], sock_addrs[0]).unwrap(); - c.connect(SEC1).unwrap(); -} - -#[test] -fn solo_udp_put_success() { - let test_log_path = Path::new("./logs/solo_udp_put_success"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - let mut c = file_logged_connector(0, test_log_path); - let [p0, _] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p0, TEST_MSG.clone()).unwrap(); - c.sync(MS300).unwrap(); -} - -#[test] -fn solo_udp_get_fail() { - let test_log_path = Path::new("./logs/solo_udp_get_fail"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - let mut c = file_logged_connector(0, test_log_path); - let [_, p0] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); - c.connect(SEC1).unwrap(); - c.get(p0).unwrap(); - c.sync(MS300).unwrap_err(); -} - -#[ignore] -#[test] -fn reowolf_to_udp() { - let test_log_path = Path::new("./logs/reowolf_to_udp"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - let barrier = std::sync::Barrier::new(2); - scope(|s| { - s.spawn(|_| { - barrier.wait(); - // reowolf thread - let mut c = file_logged_connector(0, test_log_path); - let [p0, _] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p0, TEST_MSG.clone()).unwrap(); - c.sync(MS300).unwrap(); - barrier.wait(); - }); - s.spawn(|_| { - barrier.wait(); - // udp thread - let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap(); - udp.connect(sock_addrs[0]).unwrap(); - let mut buf = new_u8_buffer(256); - let len = udp.recv(&mut buf).unwrap(); - assert_eq!(TEST_MSG_BYTES, &buf[0..len]); - barrier.wait(); - }); - }) - .unwrap(); -} - -#[ignore] -#[test] -fn udp_to_reowolf() { - let test_log_path = Path::new("./logs/udp_to_reowolf"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - let barrier = std::sync::Barrier::new(2); 
- scope(|s| { - s.spawn(|_| { - barrier.wait(); - // reowolf thread - let mut c = file_logged_connector(0, test_log_path); - let [_, p0] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); - c.connect(SEC1).unwrap(); - c.get(p0).unwrap(); - c.sync(SEC5).unwrap(); - assert_eq!(c.gotten(p0).unwrap().as_slice(), TEST_MSG_BYTES); - barrier.wait(); - }); - s.spawn(|_| { - barrier.wait(); - // udp thread - let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap(); - udp.connect(sock_addrs[0]).unwrap(); - for _ in 0..15 { - udp.send(TEST_MSG_BYTES).unwrap(); - std::thread::sleep(MS100.unwrap()); - } - barrier.wait(); - }); - }) - .unwrap(); -} - -#[test] -fn udp_reowolf_swap() { - let test_log_path = Path::new("./logs/udp_reowolf_swap"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - let barrier = std::sync::Barrier::new(2); - scope(|s| { - s.spawn(|_| { - barrier.wait(); - // reowolf thread - let mut c = file_logged_connector(0, test_log_path); - let [p0, p1] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap(); - c.connect(SEC1).unwrap(); - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(p1).unwrap(); - c.sync(SEC5).unwrap(); - assert_eq!(c.gotten(p1).unwrap().as_slice(), TEST_MSG_BYTES); - barrier.wait(); - }); - s.spawn(|_| { - barrier.wait(); - // udp thread - let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap(); - udp.connect(sock_addrs[0]).unwrap(); - let mut buf = new_u8_buffer(256); - for _ in 0..5 { - std::thread::sleep(Duration::from_millis(60)); - udp.send(TEST_MSG_BYTES).unwrap(); - } - let len = udp.recv(&mut buf).unwrap(); - assert_eq!(TEST_MSG_BYTES, &buf[0..len]); - barrier.wait(); - }); - }) - .unwrap(); -} - -#[test] -fn example_pres_3() { - let test_log_path = Path::new("./logs/example_pres_3"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - scope(|s| { - s.spawn(|_| { - // "amy" - let mut c = file_logged_connector(0, test_log_path); - let p0 = c.new_net_port(Putter, sock_addrs[0], 
Active).unwrap(); - let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); - c.connect(SEC1).unwrap(); - // put {A} and FAIL - c.put(p0, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap_err(); - // put {B} and FAIL - c.put(p1, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap_err(); - // put {A, B} and SUCCEED - c.put(p0, TEST_MSG.clone()).unwrap(); - c.put(p1, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap(); - }); - s.spawn(|_| { - // "bob" - let mut c = file_logged_connector(1, test_log_path); - let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); - c.connect(SEC1).unwrap(); - for _ in 0..2 { - // get {A, B} and FAIL - c.get(p0).unwrap(); - c.get(p1).unwrap(); - c.sync(SEC1).unwrap_err(); - } - // get {A, B} and SUCCEED - c.get(p0).unwrap(); - c.get(p1).unwrap(); - c.sync(SEC1).unwrap(); - }); - }) - .unwrap(); -} - -#[test] -fn ac_not_b() { - let test_log_path = Path::new("./logs/ac_not_b"); - let sock_addrs = [next_test_addr(), next_test_addr()]; - scope(|s| { - s.spawn(|_| { - // "amy" - let mut c = file_logged_connector(0, test_log_path); - let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); - c.connect(SEC5).unwrap(); - - // put both A and B - c.put(p0, TEST_MSG.clone()).unwrap(); - c.put(p1, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap_err(); - }); - s.spawn(|_| { - // "bob" - let pdl = b" - primitive ac_not_b(in a, in b, out c){ - // forward A to C but keep B silent - sync { put(c, get(a)); } - }"; - let pd = Arc::new(reowolf::ProtocolDescription::parse(pdl).unwrap()); - let mut c = file_logged_configured_connector(1, test_log_path, pd); - let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); - let [a, b] = c.new_port_pair(); - - c.add_component(b"", b"ac_not_b", &[p0, p1, a]).unwrap(); - - 
c.connect(SEC1).unwrap(); - - c.get(b).unwrap(); - c.sync(SEC1).unwrap_err(); - }); - }) - .unwrap(); -} - -#[test] -fn many_rounds_net() { - let test_log_path = Path::new("./logs/many_rounds_net"); - let sock_addrs = [next_test_addr()]; - const NUM_ROUNDS: usize = 1_000; - scope(|s| { - s.spawn(|_| { - let mut c = file_logged_connector(0, test_log_path); - let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); - c.connect(SEC1).unwrap(); - for _ in 0..NUM_ROUNDS { - c.put(p0, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap(); - } - }); - s.spawn(|_| { - let mut c = file_logged_connector(1, test_log_path); - let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); - c.connect(SEC1).unwrap(); - for _ in 0..NUM_ROUNDS { - c.get(p0).unwrap(); - c.sync(SEC1).unwrap(); - } - }); - }) - .unwrap(); -} -#[test] -fn many_rounds_mem() { - let test_log_path = Path::new("./logs/many_rounds_mem"); - const NUM_ROUNDS: usize = 1_000; - let mut c = file_logged_connector(0, test_log_path); - let [p0, p1] = c.new_port_pair(); - c.connect(SEC1).unwrap(); - for _ in 0..NUM_ROUNDS { - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(p1).unwrap(); - c.sync(SEC1).unwrap(); - } -} - -#[test] -fn pdl_reo_lossy() { - let pdl = b" - primitive lossy(in a, out b) { - while(true) sync { - msg m = null; - if(fires(a)) { - m = get(a); - if(fires(b)) { - put(b, m); - } - } - } - } - "; - reowolf::ProtocolDescription::parse(pdl).unwrap(); -} - -#[test] -fn pdl_reo_fifo1() { - let pdl = b" - primitive fifo1(in a, out b) { - msg m = null; - while(true) sync { - if(m == null) { - if(fires(a)) m=get(a); - } else { - if(fires(b)) put(b, m); - m = null; - } - } - } - "; - reowolf::ProtocolDescription::parse(pdl).unwrap(); -} - -#[test] -fn pdl_reo_fifo1full() { - let test_log_path = Path::new("./logs/pdl_reo_fifo1full"); - let pdl = b" - primitive fifo1full(in a, out b) { - bool is_set = true; - msg m = create(0); - while(true) sync { - if(!is_set) { - if(fires(a)) m=get(a); - is_set = 
false; - } else { - if(fires(b)) put(b, m); - is_set = true; - } - } - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - let [_p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - c.add_component(b"", b"fifo1full", &[g0, p1]).unwrap(); - c.connect(None).unwrap(); - c.get(g1).unwrap(); - c.sync(None).unwrap(); - assert_eq!(0, c.gotten(g1).unwrap().len()); -} - -#[test] -fn pdl_msg_consensus() { - let test_log_path = Path::new("./logs/pdl_msg_consensus"); - let pdl = b" - primitive msgconsensus(in a, in b) { - while(true) sync { - msg x = get(a); - msg y = get(b); - assert(x == y); - } - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - let [p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - c.add_component(b"", b"msgconsensus", &[g0, g1]).unwrap(); - c.connect(None).unwrap(); - c.put(p0, Payload::from(b"HELLO" as &[_])).unwrap(); - c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap(); - c.sync(SEC1).unwrap(); - - c.put(p0, Payload::from(b"HEY" as &[_])).unwrap(); - c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap(); - c.sync(SEC1).unwrap_err(); -} - -#[test] -fn sequencer3_prim() { - let test_log_path = Path::new("./logs/sequencer3_prim"); - let pdl = b" - primitive sequencer3(out a, out b, out c) { - u32 i = 0; - while(true) sync { - out to = a; - if (i==1) to = b; - else if(i==2) to = c; - if(fires(to)) { - put(to, create(0)); - i = (i + 1)%3; - } - } - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - - // setup a session between (a) native, and (b) sequencer3, connected by 3 ports. 
- let [p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - let [p2, g2] = c.new_port_pair(); - c.add_component(b"", b"sequencer3", &[p0, p1, p2]).unwrap(); - c.connect(None).unwrap(); - - let which_of_three = move |c: &mut Connector| { - // setup three sync batches. sync. return which succeeded - c.get(g0).unwrap(); - c.next_batch().unwrap(); - c.get(g1).unwrap(); - c.next_batch().unwrap(); - c.get(g2).unwrap(); - c.sync(None).unwrap() - }; - - const TEST_ROUNDS: usize = 50; - // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...] - for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) { - // silent round - assert_eq!(0, c.sync(None).unwrap()); - // non silent round - assert_eq!(expected_batch_idx, which_of_three(&mut c)); - } -} - -#[test] -fn sequencer3_comp() { - let test_log_path = Path::new("./logs/sequencer3_comp"); - let pdl = b" - primitive replicator(in a, out b, out c) { - while (true) { - sync { - if (fires(a) && fires(b) && fires(c)) { - msg x = get(a); - put(b, x); - put(c, x); - } else { - assert(!fires(a) && !fires(b) && !fires(c)); - } - } - } - } - primitive fifo1_init(bool has_value, T m, in a, out b) { - while(true) sync { - if(has_value && fires(b)) { - put(b, m); - has_value = false; - } else if (!has_value && fires(a)) { - m = get(a); - has_value = true; - } - } - } - composite fifo1_full(in a, out b) { - new fifo1_init(true, create(0), a, b); - } - composite fifo1(in a, out b) { - new fifo1_init(false, create(0), a, b); - } - composite sequencer3(out a, out b, out c) { - channel d -> e; - channel f -> g; - channel h -> i; - channel j -> k; - channel l -> m; - channel n -> o; - - new fifo1_full(o, d); - new replicator(e, f, a); - new fifo1(g, h); - new replicator(i, j, b); - new fifo1(k, l); - new replicator(m, n, c); - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - - // setup a session 
between (a) native, and (b) sequencer3, connected by 3 ports. - let [p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - let [p2, g2] = c.new_port_pair(); - c.add_component(b"", b"sequencer3", &[p0, p1, p2]).unwrap(); - c.connect(None).unwrap(); - - let which_of_three = move |c: &mut Connector| { - // setup three sync batches. sync. return which succeeded - c.get(g0).unwrap(); - c.next_batch().unwrap(); - c.get(g1).unwrap(); - c.next_batch().unwrap(); - c.get(g2).unwrap(); - c.sync(SEC1).unwrap() - }; - - const TEST_ROUNDS: usize = 50; - // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...] - for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) { - // silent round - assert_eq!(0, c.sync(SEC1).unwrap()); - // non silent round - assert_eq!(expected_batch_idx, which_of_three(&mut c)); - } -} - -enum XRouterItem { - Silent, - GetA, - GetB, -} -// Hardcoded pseudo-random sequence of round behaviors for the native component -const XROUTER_ITEMS: &[XRouterItem] = { - use XRouterItem::{GetA as A, GetB as B, Silent as S}; - &[ - B, A, S, B, A, A, B, S, B, S, A, A, S, B, B, S, B, S, B, B, S, B, B, A, B, B, A, B, A, B, - S, B, S, B, S, A, S, B, A, S, B, A, B, S, B, S, B, S, S, B, B, A, A, A, S, S, S, B, A, A, - A, S, S, B, B, B, A, B, S, S, A, A, B, A, B, B, A, A, A, B, A, B, S, A, B, S, A, A, B, S, - ] -}; - -#[test] -fn xrouter_prim() { - let test_log_path = Path::new("./logs/xrouter_prim"); - let pdl = b" - primitive xrouter(in a, out b, out c) { - while(true) sync { - if(fires(a)) { - if(fires(b)) put(b, get(a)); - else put(c, get(a)); - } - } - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - - // setup a session between (a) native, and (b) xrouter2, connected by 3 ports. 
- let [p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - let [p2, g2] = c.new_port_pair(); - c.add_component(b"", b"xrouter", &[g0, p1, p2]).unwrap(); - c.connect(None).unwrap(); - - let now = std::time::Instant::now(); - for item in XROUTER_ITEMS.iter() { - match item { - XRouterItem::Silent => {} - XRouterItem::GetA => { - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(g1).unwrap(); - } - XRouterItem::GetB => { - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(g2).unwrap(); - } - } - assert_eq!(0, c.sync(SEC1).unwrap()); - } - println!("PRIM {:?}", now.elapsed()); -} -#[test] -fn xrouter_comp() { - let test_log_path = Path::new("./logs/xrouter_comp"); - let pdl = b" - primitive replicator(in a, out b, out c) { - while (true) { - sync { - if (fires(a) && fires(b) && fires(c)) { - msg x = get(a); - put(b, x); - put(c, x); - } else { - assert(!fires(a) && !fires(b) && !fires(c)); - } - } - } - } - - primitive merger(in a, in b, out c) { - while (true) { - sync { - if (fires(a) && !fires(b) && fires(c)) { - put(c, get(a)); - } else if (!fires(a) && fires(b) && fires(c)) { - put(c, get(b)); - } else { - assert(!fires(a) && !fires(b) && !fires(c)); - } - } - } - } - - primitive lossy(in a, out b) { - while(true) sync { - if(fires(a)) { - auto m = get(a); - if(fires(b)) put(b, m); - } - } - } - primitive sync_drain(in a, in b) { - while(true) sync { - if(fires(a)) { - msg drop_it = get(a); - msg on_the_floor = get(b); - } - } - } - composite xrouter(in a, out b, out c) { - channel d -> e; - channel f -> g; - channel h -> i; - channel j -> k; - channel l -> m; - channel n -> o; - channel p -> q; - channel r -> s; - channel t -> u; - - new replicator(a, d, f); - new replicator(g, t, h); - new lossy(e, l); - new lossy(i, j); - new replicator(m, b, p); - new replicator(k, n, c); - new merger(q, o, r); - new sync_drain(u, s); - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, 
Arc::new(pd)); - - // setup a session between (a) native, and (b) xrouter2, connected by 3 ports. - let [p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - let [p2, g2] = c.new_port_pair(); - c.add_component(b"", b"xrouter", &[g0, p1, p2]).unwrap(); - c.connect(None).unwrap(); - - let now = std::time::Instant::now(); - for item in XROUTER_ITEMS.iter() { - match item { - XRouterItem::Silent => {} - XRouterItem::GetA => { - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(g1).unwrap(); - } - XRouterItem::GetB => { - c.put(p0, TEST_MSG.clone()).unwrap(); - c.get(g2).unwrap(); - } - } - assert_eq!(0, c.sync(SEC1).unwrap()); - } - println!("COMP {:?}", now.elapsed()); -} - -#[test] -fn count_stream() { - let test_log_path = Path::new("./logs/count_stream"); - let pdl = b" - primitive count_stream(out o) { - msg m = create(1); - m[0] = 0; - while(true) sync { - put(o, m); - m[0] += 1; - } - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - - // setup a session between (a) native, and (b) sequencer3, connected by 3 ports. - let [p0, g0] = c.new_port_pair(); - c.add_component(b"", b"count_stream", &[p0]).unwrap(); - c.connect(None).unwrap(); - - for expecting in 0u8..16 { - c.get(g0).unwrap(); - c.sync(None).unwrap(); - assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice()); - } -} - -#[test] -fn for_msg_byte() { - let test_log_path = Path::new("./logs/for_msg_byte"); - let pdl = b" - primitive for_msg_byte(out o) { - u8 i = 0; - u32 idx = 0; - while(i<8) { - msg m = create(1); - m[idx] = i; - sync put(o, m); - i += 1; - } - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - - // setup a session between (a) native, and (b) sequencer3, connected by 3 ports. 
- let [p0, g0] = c.new_port_pair(); - c.add_component(b"", b"for_msg_byte", &[p0]).unwrap(); - c.connect(None).unwrap(); - - for expecting in 0u8..8 { - c.get(g0).unwrap(); - c.sync(None).unwrap(); - assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice()); - } - c.sync(None).unwrap(); -} - -#[test] -fn eq_causality() { - let test_log_path = Path::new("./logs/eq_causality"); - let pdl = b" - primitive eq(in a, in b, out c) { - msg ma = create(0); - msg mb = create(0); - while(true) sync { - if(fires(a)) { - // b and c also fire! - // left first! - ma = get(a); - put(c, ma); - mb = get(b); - assert(ma == mb); - } - } - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - - /* - [native]p0-->g0[eq]p1--. - g1 | - ^---------` - */ - let [p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - c.add_component(b"", b"eq", &[g0, g1, p1]).unwrap(); - - /* - V--------. - g2 | - [native]p2-->g3[eq]p3--` - */ - let [p2, g2] = c.new_port_pair(); - let [p3, g3] = c.new_port_pair(); - c.add_component(b"", b"eq", &[g3, g2, p3]).unwrap(); - c.connect(None).unwrap(); - - for _ in 0..4 { - // everything is fine with LEFT FIRST - c.put(p0, TEST_MSG.clone()).unwrap(); - c.sync(MS100).unwrap(); - - // no solution when left is NOT FIRST - c.put(p2, TEST_MSG.clone()).unwrap(); - c.sync(MS100).unwrap_err(); - } -} - -#[test] -fn eq_no_causality() { - let test_log_path = Path::new("./logs/eq_no_causality"); - let pdl = b" - composite eq(in a, in b, out c) { - channel leftfirsto -> leftfirsti; - new eqinner(a, b, c, leftfirsto, leftfirsti); - } - primitive eqinner(in a, in b, out c, out leftfirsto, in leftfirsti) { - msg ma = create(0); - msg mb = create(0); - while(true) sync { - if(fires(a)) { - // b and c also fire! - if(fires(leftfirsti)) { - // left first! DO USE DUMMY - ma = get(a); - put(c, ma); - mb = get(b); - - // using dummy! 
- put(leftfirsto, ma); - auto drop_it = get(leftfirsti); - } else { - // right first! DON'T USE DUMMY - mb = get(b); - put(c, mb); - ma = get(a); - } - assert(ma == mb); - } - } - } - "; - let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); - let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - - /* - [native]p0-->g0[eq]p1--. - g1 | - ^---------` - */ - let [p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - c.add_component(b"", b"eq", &[g0, g1, p1]).unwrap(); - - /* - V--------. - g2 | - [native]p2-->g3[eq]p3--` - */ - let [p2, g2] = c.new_port_pair(); - let [p3, g3] = c.new_port_pair(); - c.add_component(b"", b"eq", &[g3, g2, p3]).unwrap(); - c.connect(None).unwrap(); - - for _ in 0..32 { - // ok when they send - c.put(p0, TEST_MSG.clone()).unwrap(); - c.put(p2, TEST_MSG.clone()).unwrap(); - c.sync(SEC1).unwrap(); - // ok when they don't - c.sync(SEC1).unwrap(); - } -}