diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index ae4c9985bebf731f91caeaf5bec0a4158c378339..e7db616c1ac626a4471c7c0ba5a446b86ad02755 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -37,6 +37,9 @@ struct CyclicDrainInner<'a, K: Eq + Hash, V> { swap: &'a mut HashMap, output: &'a mut HashMap, } +trait PayloadMsgSender { + fn send(&mut self, port_info: &PortInfo, putter: &PortId, msg: SendPayloadMsg); +} //////////////// impl Connector { @@ -122,8 +125,8 @@ impl Connector { } } } - - // TODO make cu immutable + // private function. mutates state but returns with round + // result ASAP (allows for convenient error return with ?) fn connected_sync( cu: &mut ConnectorUnphased, comm: &mut ConnectorCommunication, @@ -131,13 +134,14 @@ impl Connector { ) -> Result, SyncError> { use SyncError as Se; let mut deadline = timeout.map(|to| Instant::now() + to); - // 1. run all proto components to Nonsync blockers log!( cu.logger, "~~~ SYNC called with timeout {:?}; starting round {}", &timeout, comm.round_index ); + + // 1. run all proto components to Nonsync blockers let mut branching_proto_components = HashMap::::default(); let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> = @@ -262,7 +266,6 @@ impl Connector { branching_proto_components.len() ); for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() { - let ConnectorUnphased { port_info, proto_description, .. } = cu; let BranchingProtoComponent { ports, branches } = proto_component; let mut swap = HashMap::default(); let mut blocked = HashMap::default(); @@ -515,7 +518,6 @@ impl BranchingNative { &mut self, cu: &mut ConnectorUnphased, solution_storage: &mut SolutionStorage, - // payloads_to_get: &mut Vec<(PortId, CommMsgContents)>, getter: PortId, send_payload_msg: SendPayloadMsg, ) { @@ -617,7 +619,7 @@ impl BranchingProtoComponent { cd: CyclicDrainer, cu: &mut ConnectorUnphased, solution_storage: &mut SolutionStorage, - payloads_to_get: &mut Vec<(PortId, SendPayloadMsg)>, + payload_msg_sender: &mut impl PayloadMsgSender, proto_component_id: ProtoComponentId, ports: &HashSet, ) { @@ -683,9 +685,8 @@ impl BranchingProtoComponent { } else { // keep in "unblocked" log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); - let getter = *cu.port_info.peers.get(&putter).unwrap(); let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; - payloads_to_get.push((getter, msg)); + payload_msg_sender.send(&cu.port_info, &putter, msg); drainer.add_input(predicate, branch); } } @@ -697,7 +698,7 @@ impl BranchingProtoComponent { cu: &mut ConnectorUnphased, solution_storage: &mut SolutionStorage, proto_component_id: ProtoComponentId, - payloads_to_get: &mut Vec<(PortId, SendPayloadMsg)>, + payload_msg_sender: &mut impl PayloadMsgSender, getter: PortId, send_payload_msg: SendPayloadMsg, ) { @@ -756,7 +757,7 @@ impl BranchingProtoComponent { cd, cu, solution_storage, - payloads_to_get, + payload_msg_sender, proto_component_id, ports, ); @@ -806,7 +807,7 @@ impl SolutionStorage { self.old_local.clear(); self.new_local.clear(); } - pub(crate) fn reset(&mut self, subtree_ids: impl Iterator) { + fn reset(&mut self, subtree_ids: impl Iterator) { self.subtree_id_to_index.clear(); self.subtree_solutions.clear(); self.old_local.clear(); @@ -880,12 +881,18 @@ impl SolutionStorage { } } } +impl PayloadMsgSender for Vec<(PortId, SendPayloadMsg)> { + fn send(&mut self, port_info: &PortInfo, putter: &PortId, msg: SendPayloadMsg) { + let getter = *port_info.peers.get(putter).unwrap(); + self.push((getter, msg)); + } +} impl SyncProtoContext<'_> { - pub fn is_firing(&mut self, port: PortId) -> Option { + pub(crate) fn is_firing(&mut self, port: PortId) -> Option { let var = self.port_info.firing_var_for(port); self.predicate.query(var) } - pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> { + pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> { self.inbox.get(&port) } }