diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 87fe7756d1a7f8d1ee6370396576792e7c77b974..0d5acfc819d7d402dc50a6668c417039b1efbfbd 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -38,7 +38,12 @@ struct CyclicDrainInner<'a, K: Eq + Hash, V> { output: &'a mut HashMap, } trait PayloadMsgSender { - fn send(&mut self, port_info: &PortInfo, putter: &PortId, msg: SendPayloadMsg); + fn putter_send( + &mut self, + cu: &mut ConnectorUnphased, + putter: PortId, + msg: SendPayloadMsg, + ) -> Result<(), SyncError>; } //////////////// @@ -57,27 +62,6 @@ impl Connector { } } } - pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> { - use PortOpError::*; - let Self { unphased, phased } = self; - if !unphased.native_ports.contains(&port) { - return Err(PortUnavailable); - } - if Putter != *unphased.port_info.polarities.get(&port).unwrap() { - return Err(WrongPolarity); - } - match phased { - ConnectorPhased::Setup { .. } => Err(NotConnected), - ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => { - let batch = native_batches.last_mut().unwrap(); - if batch.to_put.contains_key(&port) { - return Err(MultipleOpsOnPort); - } - batch.to_put.insert(port, payload); - Ok(()) - } - } - } pub fn next_batch(&mut self) -> Result { // returns index of new batch use NextBatchError::*; @@ -90,26 +74,48 @@ impl Connector { } } } - pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> { + fn port_op_access( + &mut self, + port: PortId, + expect_polarity: Polarity, + ) -> Result<&mut NativeBatch, PortOpError> { use PortOpError::*; let Self { unphased, phased } = self; if !unphased.native_ports.contains(&port) { return Err(PortUnavailable); } - if Getter != *unphased.port_info.polarities.get(&port).unwrap() { - return Err(WrongPolarity); + match unphased.port_info.polarities.get(&port) { + Some(p) if *p == expect_polarity => {} + Some(_) => return Err(WrongPolarity), + None => return Err(UnknownPolarity), } match phased { ConnectorPhased::Setup { .. } => Err(NotConnected), ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => { - let batch = native_batches.last_mut().unwrap(); - if !batch.to_get.insert(port) { - return Err(MultipleOpsOnPort); - } - Ok(()) + let batch = native_batches.last_mut().unwrap(); // length >= invariant + Ok(batch) } } } + pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> { + use PortOpError::*; + let batch = self.port_op_access(port, Putter)?; + if batch.to_put.contains_key(&port) { + Err(MultipleOpsOnPort) + } else { + batch.to_put.insert(port, payload); + Ok(()) + } + } + pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> { + use PortOpError::*; + let batch = self.port_op_access(port, Getter)?; + if batch.to_get.insert(port) { + Ok(()) + } else { + Err(MultipleOpsOnPort) + } + } // entrypoint for caller. overwrites round result enum, and returns what happened pub fn sync(&mut self, timeout: Option) -> Result { let Self { unphased, phased } = self; @@ -164,7 +170,7 @@ impl Connector { proto_component_ports: &mut cu .proto_components .get_mut(&proto_component_id) - .unwrap() + .unwrap() // unrun_components' keys originate from proto_components .ports, }; let blocker = component.state.nonsync_run(&mut ctx, &cu.proto_description); @@ -231,9 +237,7 @@ impl Connector { for (putter, payload) in to_put { let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg); - // rely on invariant: sync batches respect port polarity - let getter = *cu.port_info.peers.get(&putter).unwrap(); - payloads_to_get.push((getter, msg)); + payloads_to_get.putter_send(cu, putter, msg)?; } if to_get.is_empty() { log!( @@ -276,7 +280,7 @@ impl Connector { &mut payloads_to_get, proto_component_id, ports, - ); + )?; // swap the blocked branches back std::mem::swap(&mut blocked, branches); } @@ -288,22 +292,30 @@ impl Connector { // drain payloads_to_get, sending them through endpoints / feeding them to components while let Some((getter, send_payload_msg)) = payloads_to_get.pop() { assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter)); - match cu.port_info.routes.get(&getter).unwrap() { - Route::Endpoint { index } => { + match cu.port_info.routes.get(&getter) { + None => { + log!( + cu.logger, + "Delivery to getter {:?} msg {:?} failed. Physical route unmapped!", + getter, + &send_payload_msg + ); + } + Some(Route::Endpoint { index }) => { let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, contents: CommMsgContents::SendPayload(send_payload_msg), }); - comm.endpoint_manager.send_to(*index, &msg).unwrap(); + comm.endpoint_manager.send_to_comms(*index, &msg)?; } - Route::LocalComponent(ComponentId::Native) => branching_native.feed_msg( + Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg( cu, &mut solution_storage, // &mut Pay getter, - send_payload_msg, + &send_payload_msg, ), - Route::LocalComponent(ComponentId::Proto(proto_component_id)) => { + Some(Route::LocalComponent(ComponentId::Proto(proto_component_id))) => { if let Some(branching_component) = branching_proto_components.get_mut(proto_component_id) { @@ -315,8 +327,16 @@ impl Connector { proto_component_id, &mut payloads_to_get, getter, - send_payload_msg, - ) + &send_payload_msg, + )?; + } else { + log!( + cu.logger, + "Delivery to getter {:?} msg {:?} failed because {:?} isn't here", + getter, + &send_payload_msg, + proto_component_id + ); } } } @@ -334,7 +354,7 @@ impl Connector { round_index: comm.round_index, contents: CommMsgContents::Suggest { suggestion }, }); - comm.endpoint_manager.send_to(parent, &msg).unwrap(); + comm.endpoint_manager.send_to_comms(parent, &msg)?; } None => { log!(cu.logger, "No parent. Deciding on solution {:?}", &solution); @@ -363,7 +383,7 @@ impl Connector { suggestion: Decision::Failure, }, }); - comm.endpoint_manager.send_to(parent, &msg).unwrap(); + comm.endpoint_manager.send_to_comms(parent, &msg)?; } else { log!(cu.logger, "As the leader, deciding on timeout"); break 'undecided Decision::Failure; @@ -448,7 +468,7 @@ impl Connector { round_index: comm.round_index, contents: CommMsgContents::Suggest { suggestion }, }); - comm.endpoint_manager.send_to(parent, &msg).unwrap(); + comm.endpoint_manager.send_to_comms(parent, &msg)?; } } } @@ -493,9 +513,8 @@ impl Connector { &comm.neighborhood.children ); for &child in comm.neighborhood.children.iter() { - comm.endpoint_manager.send_to(child, &msg).unwrap(); + comm.endpoint_manager.send_to_comms(child, &msg)?; } - match decision { Decision::Failure => Err(Se::RoundFailure), Decision::Success(predicate) => { @@ -517,7 +536,7 @@ impl BranchingNative { cu: &mut ConnectorUnphased, solution_storage: &mut SolutionStorage, getter: PortId, - send_payload_msg: SendPayloadMsg, + send_payload_msg: &SendPayloadMsg, ) { log!(cu.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg); assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter)); @@ -620,76 +639,77 @@ impl BranchingProtoComponent { payload_msg_sender: &mut impl PayloadMsgSender, proto_component_id: ProtoComponentId, ports: &HashSet, - ) { + ) -> Result<(), SyncError> { cd.cylic_drain(|mut predicate, mut branch, mut drainer| { let mut ctx = SyncProtoContext { - logger: &mut *cu.logger, - predicate: &predicate, - port_info: &cu.port_info, - inbox: &branch.inbox, - }; - let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description); - log!( - cu.logger, - "Proto component with id {:?} branch with pred {:?} hit blocker {:?}", - proto_component_id, - &predicate, - &blocker, - ); - use SyncBlocker as B; - match blocker { - B::Inconsistent => { - // branch is inconsistent. throw it away - drop((predicate, branch)); - } - B::SyncBlockEnd => { - // make concrete all variables - for &port in ports.iter() { - let var = cu.port_info.firing_var_for(port); - predicate.assigned.entry(var).or_insert(false); - } - // submit solution for this component - solution_storage.submit_and_digest_subtree_solution( - &mut *cu.logger, - Route::LocalComponent(ComponentId::Proto(proto_component_id)), - predicate.clone(), - ); - // move to "blocked" - drainer.add_output(predicate, branch); - } - B::CouldntReadMsg(port) => { - // move to "blocked" - assert!(!branch.inbox.contains_key(&port)); - drainer.add_output(predicate, branch); - } - B::CouldntCheckFiring(port) => { - // sanity check + logger: &mut *cu.logger, + predicate: &predicate, + port_info: &cu.port_info, + inbox: &branch.inbox, + }; + let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description); + log!( + cu.logger, + "Proto component with id {:?} branch with pred {:?} hit blocker {:?}", + proto_component_id, + &predicate, + &blocker, + ); + use SyncBlocker as B; + match blocker { + B::Inconsistent => { + // branch is inconsistent. throw it away + drop((predicate, branch)); + } + B::SyncBlockEnd => { + // make concrete all variables + for &port in ports.iter() { let var = cu.port_info.firing_var_for(port); - assert!(predicate.query(var).is_none()); - // keep forks in "unblocked" - drainer.add_input(predicate.clone().inserted(var, false), branch.clone()); - drainer.add_input(predicate.inserted(var, true), branch); + predicate.assigned.entry(var).or_insert(false); } - B::PutMsg(putter, payload) => { - // sanity check - assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter)); - // overwrite assignment - let var = cu.port_info.firing_var_for(putter); - let was = predicate.assigned.insert(var, true); - if was == Some(false) { - log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); - // discard forever - drop((predicate, branch)); - } else { - // keep in "unblocked" - log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); - let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; - payload_msg_sender.send(&cu.port_info, &putter, msg); - drainer.add_input(predicate, branch); - } + // submit solution for this component + solution_storage.submit_and_digest_subtree_solution( + &mut *cu.logger, + Route::LocalComponent(ComponentId::Proto(proto_component_id)), + predicate.clone(), + ); + // move to "blocked" + drainer.add_output(predicate, branch); + } + B::CouldntReadMsg(port) => { + // move to "blocked" + assert!(!branch.inbox.contains_key(&port)); + drainer.add_output(predicate, branch); + } + B::CouldntCheckFiring(port) => { + // sanity check + let var = cu.port_info.firing_var_for(port); + assert!(predicate.query(var).is_none()); + // keep forks in "unblocked" + drainer.add_input(predicate.clone().inserted(var, false), branch.clone()); + drainer.add_input(predicate.inserted(var, true), branch); + } + B::PutMsg(putter, payload) => { + // sanity check + assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter)); + // overwrite assignment + let var = cu.port_info.firing_var_for(putter); + let was = predicate.assigned.insert(var, true); + if was == Some(false) { + log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); + // discard forever + drop((predicate, branch)); + } else { + // keep in "unblocked" + log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); + let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; + payload_msg_sender.putter_send(cu, putter, msg)?; + drainer.add_input(predicate, branch); } } - }); + } + Ok(()) + }) } fn feed_msg( &mut self, @@ -698,8 +718,8 @@ impl BranchingProtoComponent { proto_component_id: ProtoComponentId, payload_msg_sender: &mut impl PayloadMsgSender, getter: PortId, - send_payload_msg: SendPayloadMsg, - ) { + send_payload_msg: &SendPayloadMsg, + ) -> Result<(), SyncError> { let logger = &mut *cu.logger; log!( logger, @@ -758,10 +778,11 @@ impl BranchingProtoComponent { payload_msg_sender, proto_component_id, ports, - ); + )?; // swap the blocked branches back std::mem::swap(&mut blocked, branches); log!(cu.logger, "component settles down with branches: {:?}", branches.keys()); + Ok(()) } fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { let BranchingProtoComponent { ports, branches } = self; @@ -880,9 +901,18 @@ impl SolutionStorage { } } impl PayloadMsgSender for Vec<(PortId, SendPayloadMsg)> { - fn send(&mut self, port_info: &PortInfo, putter: &PortId, msg: SendPayloadMsg) { - let getter = *port_info.peers.get(putter).unwrap(); - self.push((getter, msg)); + fn putter_send( + &mut self, + cu: &mut ConnectorUnphased, + putter: PortId, + msg: SendPayloadMsg, + ) -> Result<(), SyncError> { + if let Some(&getter) = cu.port_info.peers.get(&putter) { + self.push((getter, msg)); + Ok(()) + } else { + Err(SyncError::MalformedStateError(MalformedStateError::GetterUnknownFor { putter })) + } } } impl SyncProtoContext<'_> { @@ -960,14 +990,18 @@ impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { ) -> Self { Self { input, inner: CyclicDrainInner { swap, output } } } - fn cylic_drain(self, mut func: impl FnMut(K, V, CyclicDrainInner<'_, K, V>)) { + fn cylic_drain( + self, + mut func: impl FnMut(K, V, CyclicDrainInner<'_, K, V>) -> Result<(), E>, + ) -> Result<(), E> { let Self { input, inner: CyclicDrainInner { swap, output } } = self; // assert!(swap.is_empty()); while !input.is_empty() { for (k, v) in input.drain() { - func(k, v, CyclicDrainInner { swap, output }) + func(k, v, CyclicDrainInner { swap, output })? } std::mem::swap(input, swap); } + Ok(()) } }