diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 080249437130290a12be7caf72c4bf8183468cb7..5b8ce5558b6dfef3374b4b715c6f36bc60e8befd 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -130,11 +130,18 @@ impl Connector { } // entrypoint for caller. overwrites round result enum, and returns what happened pub fn sync(&mut self, timeout: Option) -> Result { - let Self { unphased, phased } = self; + let Self { unphased: cu, phased } = self; match phased { ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected), ConnectorPhased::Communication(comm) => { - comm.round_result = Self::connected_sync(unphased, comm, timeout); + match &comm.round_result { + Err(SyncError::Unrecoverable(e)) => { + log!(cu.logger, "Attempted to start sync round, but previous error {:?} was unrecoverable!", e); + return Err(SyncError::Unrecoverable(e.clone())); + } + _ => {} + } + comm.round_result = Self::connected_sync(cu, comm, timeout); comm.round_index += 1; match &comm.round_result { Ok(None) => unreachable!(), @@ -151,8 +158,9 @@ impl Connector { comm: &mut ConnectorCommunication, timeout: Option, ) -> Result, SyncError> { + ////////////////////////////////// use SyncError as Se; - // let deadline = timeout.map(|to| Instant::now() + to); + ////////////////////////////////// log!( cu.logger, "~~~ SYNC called with timeout {:?}; starting round {}", @@ -166,6 +174,7 @@ impl Connector { let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> = cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect(); log!(cu.logger, "Nonsync running {} proto components...", unrun_components.len()); + // drains unrun_components, and populates branching_proto_components. while let Some((proto_component_id, mut component)) = unrun_components.pop() { // TODO coalesce fields log!( @@ -209,6 +218,7 @@ impl Connector { branching_proto_components.len(), ); + // Create temp structures needed for the synchronous phase of the round let mut rctx = RoundCtx { solution_storage: { let n = std::iter::once(Route::LocalComponent(ComponentId::Native)); @@ -217,7 +227,8 @@ impl Connector { .keys() .map(|&id| Route::LocalComponent(ComponentId::Proto(id))); let e = comm.neighborhood.children.iter().map(|&index| Route::Endpoint { index }); - SolutionStorage::new(n.chain(c).chain(e)) + let route_iter = n.chain(c).chain(e); + SolutionStorage::new(route_iter) }, spec_var_stream: cu.id_manager.new_spec_var_stream(), getter_buffer: Default::default(), @@ -225,7 +236,7 @@ impl Connector { }; log!(cu.logger, "Round context structure initialized"); - // 2. kick off the native + // Explore all native branches eagerly. Find solutions, buffer messages, etc. log!( cu.logger, "Translating {} native batches into branches...", @@ -239,8 +250,7 @@ impl Connector { { let NativeBatch { to_get, to_put } = native_branch; let predicate = { - let mut predicate = - Predicate::default().inserted(native_branch_spec_var, branch_spec_val); + let mut predicate = Predicate::default(); // assign trues for ports that fire let firing_ports: HashSet = to_get.iter().chain(to_put.keys()).copied().collect(); @@ -248,7 +258,7 @@ impl Connector { let var = cu.port_info.spec_var_for(port); predicate.assigned.insert(var, SpecVal::FIRING); } - // assign falses for silent ports + // assign falses for all silent (not firing) ports for &port in cu.native_ports.difference(&firing_ports) { let var = cu.port_info.spec_var_for(port); if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) { @@ -256,17 +266,18 @@ impl Connector { continue 'native_branches; } } - predicate + // this branch is consistent. distinguish it with a unique var:val mapping and proceed + predicate.inserted(native_branch_spec_var, branch_spec_val) }; log!(cu.logger, "Native branch index={:?} has consistent {:?}", index, &predicate); - - // put all messages + // send all outgoing messages (by buffering them) for (putter, payload) in to_put { let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg); - rctx.getter_buffer.putter_add(cu, putter, msg)?; + rctx.getter_buffer.putter_add(cu, putter, msg); } if to_get.is_empty() { + // this branch is immediately ready to be part of a solution log!( cu.logger, "Native submitting solution for batch {} with {:?}", @@ -285,8 +296,9 @@ impl Connector { unreachable!() } } - // restore the invariant + // restore the invariant: !native_batches.is_empty() comm.native_batches.push(Default::default()); + // Call to another big method; keep running this round until a distributed decision is reached let decision = Self::sync_reach_decision( cu, comm, @@ -344,7 +356,7 @@ impl Connector { branching_native: &mut BranchingNative, branching_proto_components: &mut HashMap, rctx: &mut RoundCtx, - ) -> Result { + ) -> Result { let mut already_requested_failure = false; if branching_native.branches.is_empty() { log!(cu.logger, "Native starts with no branches! Failure!"); @@ -628,7 +640,7 @@ impl Connector { cu: &mut ConnectorUnphased, comm: &mut ConnectorCommunication, parent: usize, - ) -> Result<(), SyncError> { + ) -> Result<(), UnrecoverableSyncError> { log!(cu.logger, "Forwarding to my parent {:?}", parent); let suggestion = Decision::Failure; let msg = Msg::CommMsg(CommMsg { @@ -747,10 +759,6 @@ impl BranchingNative { panic!("Native had no branches matching pred {:?}", solution_predicate); } } -// |putter, m| { -// let getter = *cu.port_info.peers.get(&putter).unwrap(); -// payloads_to_get.push((getter, m)); -// }, impl BranchingProtoComponent { fn drain_branches_to_blocked( cd: CyclicDrainer, @@ -758,7 +766,7 @@ impl BranchingProtoComponent { rctx: &mut RoundCtx, proto_component_id: ProtoComponentId, ports: &HashSet, - ) -> Result<(), SyncError> { + ) -> Result<(), UnrecoverableSyncError> { cd.cylic_drain(|mut predicate, mut branch, mut drainer| { let mut ctx = SyncProtoContext { untaken_choice: &mut branch.untaken_choice, @@ -833,7 +841,7 @@ impl BranchingProtoComponent { // keep in "unblocked" log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; - rctx.getter_buffer.putter_add(cu, putter, msg)?; + rctx.getter_buffer.putter_add(cu, putter, msg); drainer.add_input(predicate, branch); } } @@ -848,7 +856,7 @@ impl BranchingProtoComponent { proto_component_id: ProtoComponentId, getter: PortId, send_payload_msg: &SendPayloadMsg, - ) -> Result<(), SyncError> { + ) -> Result<(), UnrecoverableSyncError> { let logger = &mut *cu.logger; log!( logger, @@ -1045,17 +1053,12 @@ impl GetterBuffer { fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) { self.getters_and_sends.push((getter, msg)); } - fn putter_add( - &mut self, - cu: &mut ConnectorUnphased, - putter: PortId, - msg: SendPayloadMsg, - ) -> Result<(), SyncError> { + fn putter_add(&mut self, cu: &mut ConnectorUnphased, putter: PortId, msg: SendPayloadMsg) { if let Some(&getter) = cu.port_info.peers.get(&putter) { self.getter_add(getter, msg); - Ok(()) } else { - Err(SyncError::MalformedStateError(MalformedStateError::GetterUnknownFor { putter })) + log!(cu.logger, "Putter {:?} has no known peer!", putter); + panic!("Putter {:?} has no known peer!"); } } }