diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 080249437130290a12be7caf72c4bf8183468cb7..5b8ce5558b6dfef3374b4b715c6f36bc60e8befd 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -130,11 +130,18 @@ impl Connector { } // entrypoint for caller. overwrites round result enum, and returns what happened pub fn sync(&mut self, timeout: Option) -> Result { - let Self { unphased, phased } = self; + let Self { unphased: cu, phased } = self; match phased { ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected), ConnectorPhased::Communication(comm) => { - comm.round_result = Self::connected_sync(unphased, comm, timeout); + match &comm.round_result { + Err(SyncError::Unrecoverable(e)) => { + log!(cu.logger, "Attempted to start sync round, but previous error {:?} was unrecoverable!", e); + return Err(SyncError::Unrecoverable(e.clone())); + } + _ => {} + } + comm.round_result = Self::connected_sync(cu, comm, timeout); comm.round_index += 1; match &comm.round_result { Ok(None) => unreachable!(), @@ -151,8 +158,9 @@ impl Connector { comm: &mut ConnectorCommunication, timeout: Option, ) -> Result, SyncError> { + ////////////////////////////////// use SyncError as Se; - // let deadline = timeout.map(|to| Instant::now() + to); + ////////////////////////////////// log!( cu.logger, "~~~ SYNC called with timeout {:?}; starting round {}", @@ -166,6 +174,7 @@ impl Connector { let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> = cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect(); log!(cu.logger, "Nonsync running {} proto components...", unrun_components.len()); + // drains unrun_components, and populates branching_proto_components. 
while let Some((proto_component_id, mut component)) = unrun_components.pop() { // TODO coalesce fields log!( @@ -209,6 +218,7 @@ impl Connector { branching_proto_components.len(), ); + // Create temp structures needed for the synchronous phase of the round let mut rctx = RoundCtx { solution_storage: { let n = std::iter::once(Route::LocalComponent(ComponentId::Native)); @@ -217,7 +227,8 @@ impl Connector { .keys() .map(|&id| Route::LocalComponent(ComponentId::Proto(id))); let e = comm.neighborhood.children.iter().map(|&index| Route::Endpoint { index }); - SolutionStorage::new(n.chain(c).chain(e)) + let route_iter = n.chain(c).chain(e); + SolutionStorage::new(route_iter) }, spec_var_stream: cu.id_manager.new_spec_var_stream(), getter_buffer: Default::default(), @@ -225,7 +236,7 @@ impl Connector { }; log!(cu.logger, "Round context structure initialized"); - // 2. kick off the native + // Explore all native branches eagerly. Find solutions, buffer messages, etc. log!( cu.logger, "Translating {} native batches into branches...", @@ -239,8 +250,7 @@ impl Connector { { let NativeBatch { to_get, to_put } = native_branch; let predicate = { - let mut predicate = - Predicate::default().inserted(native_branch_spec_var, branch_spec_val); + let mut predicate = Predicate::default(); // assign trues for ports that fire let firing_ports: HashSet = to_get.iter().chain(to_put.keys()).copied().collect(); @@ -248,7 +258,7 @@ impl Connector { let var = cu.port_info.spec_var_for(port); predicate.assigned.insert(var, SpecVal::FIRING); } - // assign falses for silent ports + // assign falses for all silent (not firing) ports for &port in cu.native_ports.difference(&firing_ports) { let var = cu.port_info.spec_var_for(port); if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) { @@ -256,17 +266,18 @@ impl Connector { continue 'native_branches; } } - predicate + // this branch is consistent. 
distinguish it with a unique var:val mapping and proceed + predicate.inserted(native_branch_spec_var, branch_spec_val) }; log!(cu.logger, "Native branch index={:?} has consistent {:?}", index, &predicate); - - // put all messages + // send all outgoing messages (by buffering them) for (putter, payload) in to_put { let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg); - rctx.getter_buffer.putter_add(cu, putter, msg)?; + rctx.getter_buffer.putter_add(cu, putter, msg); } if to_get.is_empty() { + // this branch is immediately ready to be part of a solution log!( cu.logger, "Native submitting solution for batch {} with {:?}", @@ -285,8 +296,9 @@ impl Connector { unreachable!() } } - // restore the invariant + // restore the invariant: !native_batches.is_empty() comm.native_batches.push(Default::default()); + // Call to another big method; keep running this round until a distributed decision is reached let decision = Self::sync_reach_decision( cu, comm, @@ -344,7 +356,7 @@ impl Connector { branching_native: &mut BranchingNative, branching_proto_components: &mut HashMap, rctx: &mut RoundCtx, - ) -> Result { + ) -> Result { let mut already_requested_failure = false; if branching_native.branches.is_empty() { log!(cu.logger, "Native starts with no branches! 
Failure!"); @@ -628,7 +640,7 @@ impl Connector { cu: &mut ConnectorUnphased, comm: &mut ConnectorCommunication, parent: usize, - ) -> Result<(), SyncError> { + ) -> Result<(), UnrecoverableSyncError> { log!(cu.logger, "Forwarding to my parent {:?}", parent); let suggestion = Decision::Failure; let msg = Msg::CommMsg(CommMsg { @@ -747,10 +759,6 @@ impl BranchingNative { panic!("Native had no branches matching pred {:?}", solution_predicate); } } -// |putter, m| { -// let getter = *cu.port_info.peers.get(&putter).unwrap(); -// payloads_to_get.push((getter, m)); -// }, impl BranchingProtoComponent { fn drain_branches_to_blocked( cd: CyclicDrainer, @@ -758,7 +766,7 @@ impl BranchingProtoComponent { rctx: &mut RoundCtx, proto_component_id: ProtoComponentId, ports: &HashSet, - ) -> Result<(), SyncError> { + ) -> Result<(), UnrecoverableSyncError> { cd.cylic_drain(|mut predicate, mut branch, mut drainer| { let mut ctx = SyncProtoContext { untaken_choice: &mut branch.untaken_choice, @@ -833,7 +841,7 @@ impl BranchingProtoComponent { // keep in "unblocked" log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; - rctx.getter_buffer.putter_add(cu, putter, msg)?; + rctx.getter_buffer.putter_add(cu, putter, msg); drainer.add_input(predicate, branch); } } @@ -848,7 +856,7 @@ impl BranchingProtoComponent { proto_component_id: ProtoComponentId, getter: PortId, send_payload_msg: &SendPayloadMsg, - ) -> Result<(), SyncError> { + ) -> Result<(), UnrecoverableSyncError> { let logger = &mut *cu.logger; log!( logger, @@ -1045,17 +1053,12 @@ impl GetterBuffer { fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) { self.getters_and_sends.push((getter, msg)); } - fn putter_add( - &mut self, - cu: &mut ConnectorUnphased, - putter: PortId, - msg: SendPayloadMsg, - ) -> Result<(), SyncError> { + fn putter_add(&mut self, cu: &mut 
ConnectorUnphased, putter: PortId, msg: SendPayloadMsg) { if let Some(&getter) = cu.port_info.peers.get(&putter) { self.getter_add(getter, msg); - Ok(()) } else { - Err(SyncError::MalformedStateError(MalformedStateError::GetterUnknownFor { putter })) + log!(cu.logger, "Putter {:?} has no known peer!", putter); + panic!("Putter {:?} has no known peer!", putter); } } } diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index 3118126f12f8118c6e8219beda332e544c883cc8..0fd9398b3bc6f8fa76658af6e9a49fc1f9b450d2 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -75,9 +75,14 @@ impl EndpointManager { pub(super) fn num_endpoints(&self) -> usize { self.endpoint_exts.len() } - pub(super) fn send_to_comms(&mut self, index: usize, msg: &Msg) -> Result<(), SyncError> { + pub(super) fn send_to_comms( + &mut self, + index: usize, + msg: &Msg, + ) -> Result<(), UnrecoverableSyncError> { + use UnrecoverableSyncError as Use; let endpoint = &mut self.endpoint_exts[index].endpoint; - endpoint.send(msg).map_err(|_| SyncError::BrokenEndpoint(index)) + endpoint.send(msg).map_err(|_| Use::BrokenEndpoint(index)) } pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> { let endpoint = &mut self.endpoint_exts[index].endpoint; @@ -89,13 +94,13 @@ impl EndpointManager { &mut self, logger: &mut dyn Logger, deadline: Option, - ) -> Result, SyncError> { - use {SyncError as Se, TryRecyAnyError as Trae}; + ) -> Result, UnrecoverableSyncError> { + use {TryRecyAnyError as Trae, UnrecoverableSyncError as Use}; match self.try_recv_any(logger, deadline) { Ok(tup) => Ok(Some(tup)), Err(Trae::Timeout) => Ok(None), - Err(Trae::PollFailed) => Err(Se::PollFailed), - Err(Trae::EndpointError { error: _, index }) => Err(Se::BrokenEndpoint(index)), + Err(Trae::PollFailed) => Err(Use::PollFailed), + Err(Trae::EndpointError { error: _, index }) => Err(Use::BrokenEndpoint(index)), } } pub(super) fn try_recv_any_setup( diff --git a/src/runtime/error.rs 
b/src/runtime/error.rs index c156a4ac99d9a11ae70d463086107a12beb90b89..c226b4368b276d20e7fa161f8300b96a7db46a35 100644 --- a/src/runtime/error.rs +++ b/src/runtime/error.rs @@ -24,13 +24,17 @@ pub enum AddComponentError { } //////////////////////// #[derive(Debug, Clone)] +pub enum UnrecoverableSyncError { + PollFailed, + BrokenEndpoint(usize), + MalformedStateError(MalformedStateError), +} +#[derive(Debug, Clone)] pub enum SyncError { NotConnected, InconsistentProtoComponent(ProtoComponentId), RoundFailure, - PollFailed, - BrokenEndpoint(usize), - MalformedStateError(MalformedStateError), + Unrecoverable(UnrecoverableSyncError), } #[derive(Debug, Clone)] pub enum MalformedStateError { @@ -65,3 +69,9 @@ pub enum NextBatchError { pub enum NewNetPortError { AlreadyConnected, } +///////////////////// +impl From<UnrecoverableSyncError> for SyncError { + fn from(e: UnrecoverableSyncError) -> Self { + Self::Unrecoverable(e) + } +}