diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 953151bdc93532b35e4ca164763cd350ecd96ba7..b4342f823c424d21082fcd76c5d1cbfa7a5eb750 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -95,28 +95,31 @@ impl<'a, K, V> DerefMut for MapTempGuard<'a, K, V> { } } impl Connector { - fn get_comm_mut(&mut self) -> Option<&mut ConnectorCommunication> { - if let ConnectorPhased::Communication(comm) = &mut self.phased { - Some(comm) - } else { - None - } - } - pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> { + /// Read the message received by the given port in the previous synchronous round. + pub fn gotten(&self, port: PortId) -> Result<&Payload, GottenError> { use GottenError as Ge; - let comm = self.get_comm_mut().ok_or(Ge::NoPreviousRound)?; - match &comm.round_result { - Err(_) => Err(Ge::PreviousSyncFailed), - Ok(None) => Err(Ge::NoPreviousRound), - Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet), + if let ConnectorPhased::Communication(comm) = &self.phased { + match &comm.round_result { + Err(_) => Err(Ge::PreviousSyncFailed), + Ok(None) => Err(Ge::NoPreviousRound), + Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet), + } + } else { + return Err(Ge::NoPreviousRound); } } + /// Creates a new, empty synchronous batch for the connector and selects it. + /// Subsequent calls to `put` and `get` with populate the new batch with port operations. pub fn next_batch(&mut self) -> Result { // returns index of new batch - let comm = self.get_comm_mut().ok_or(WrongStateError)?; - comm.native_batches.push(Default::default()); - Ok(comm.native_batches.len() - 1) + if let ConnectorPhased::Communication(comm) = &mut self.phased { + comm.native_batches.push(Default::default()); + Ok(comm.native_batches.len() - 1) + } else { + Err(WrongStateError) + } } + fn port_op_access( &mut self, port: PortId, @@ -139,6 +142,10 @@ impl Connector { } } } + + /// Add a `put` operation to the connector's currently-selected synchronous batch. + /// Returns an error if the given port is not owned by the native component, + /// has the wrong polarity, or is already included in the batch. pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> { use PortOpError as Poe; let batch = self.port_op_access(port, Putter)?; @@ -149,6 +156,10 @@ impl Connector { Ok(()) } } + + /// Add a `get` operation to the connector's currently-selected synchronous batch. + /// Returns an error if the given port is not owned by the native component, + /// has the wrong polarity, or is already included in the batch. pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> { use PortOpError as Poe; let batch = self.port_op_access(port, Getter)?; @@ -158,7 +169,21 @@ impl Connector { Err(Poe::MultipleOpsOnPort) } } - // entrypoint for caller. overwrites round result enum, and returns what happened + + /// Participate in the completion of the next synchronous round, in which + /// the native component will perform the set of prepared operations of exactly one + /// of the synchronous batches. At the end of the procedure, the synchronous + /// batches will be reset to a singleton set, whose only element is selected, and empty. + /// The caller yields control over to the connector runtime to faciltiate the underlying + /// coordination work until either (a) the round is completed with all components' states + /// updated accordingly, (b) a distributed failure event resets all components' + /// states to what they were prior to the sync call, or (c) the sync procedure encounters + /// an unrecoverable error which ends the call early, and breaks the session and connector's + /// states irreversably. + /// Note that the (b) case necessitates the success of a distributed rollback procedure, + /// which this component may initiate, but cannot guarantee will succeed in time or at all. + /// consequently, the given timeout duration represents a duration in which the connector + /// will make a best effort to fail the round and return control flow to the caller. pub fn sync(&mut self, timeout: Option) -> Result { let Self { unphased: cu, phased } = self; match phased { @@ -1068,28 +1093,6 @@ impl SolutionStorage { new_local: Default::default(), } } - // fn is_clear(&self) -> bool { - // self.subtree_id_to_index.is_empty() - // && self.subtree_solutions.is_empty() - // && self.old_local.is_empty() - // && self.new_local.is_empty() - // } - // fn clear(&mut self) { - // self.subtree_id_to_index.clear(); - // self.subtree_solutions.clear(); - // self.old_local.clear(); - // self.new_local.clear(); - // } - // fn reset(&mut self, subtree_ids: impl Iterator) { - // self.subtree_id_to_index.clear(); - // self.subtree_solutions.clear(); - // self.old_local.clear(); - // self.new_local.clear(); - // for key in subtree_ids { - // self.subtree_id_to_index.insert(key, self.subtree_solutions.len()); - // self.subtree_solutions.push(Default::default()) - // } - // } pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator + '_ { let Self { old_local, new_local, .. } = self; new_local.drain().map(move |local| { @@ -1168,7 +1171,7 @@ impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> { } } impl NonsyncProtoContext<'_> { - pub fn new_component(&mut self, moved_ports: HashSet, state: ComponentState) { + pub(crate) fn new_component(&mut self, moved_ports: HashSet, state: ComponentState) { // called by a PROTO COMPONENT. moves its own ports. // 1. sanity check: this component owns these ports // sanity check @@ -1195,7 +1198,7 @@ impl NonsyncProtoContext<'_> { } // 3. create a new component } - pub fn new_port_pair(&mut self) -> [PortId; 2] { + pub(crate) fn new_port_pair(&mut self) -> [PortId; 2] { // adds two new associated ports, related to each other, and exposed to the proto component let mut new_cid_fn = || self.current_state.id_manager.new_port_id(); let [o, i] = [new_cid_fn(), new_cid_fn()];