diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 09c99516ea20c2d9fa74789d4b088b30e6b68af6..8e364b07e78c33ae92abdbfb7082aa8e6d5136bf 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -28,16 +28,16 @@ impl Controller { Decision::Failure => Err(SyncErr::Timeout), }; let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index); - for &child_ekey in self.inner.family.children_ekeys.iter() { + for &child_port in self.inner.family.children_ports.iter() { log!( &mut self.inner.logger, - "Forwarding {:?} to child with ekey {:?}", + "Forwarding {:?} to child with port {:?}", &announcement, - child_ekey + child_port ); self.inner .endpoint_exts - .get_mut(child_ekey) + .get_mut(child_port) .expect("eefef") .endpoint .send(announcement.clone())?; @@ -49,20 +49,20 @@ impl Controller { // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found fn handle_locals_maybe_decide(&mut self) -> Result { - if let Some(parent_ekey) = self.inner.family.parent_ekey { + if let Some(parent_port) = self.inner.family.parent_port { // I have a parent -> I'm not the leader let parent_endpoint = - &mut self.inner.endpoint_exts.get_mut(parent_ekey).expect("huu").endpoint; + &mut self.inner.endpoint_exts.get_mut(parent_port).expect("huu").endpoint; for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() { let msg = CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index); - log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_ekey); + log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_port); parent_endpoint.send(msg)?; } Ok(false) } else { // I have no parent -> I'm the leader - assert!(self.inner.family.parent_ekey.is_none()); + assert!(self.inner.family.parent_port.is_none()); let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next(); Ok(if let Some(predicate) = maybe_predicate { let decision = Decision::Success(predicate); @@ -79,19 +79,19 @@ impl Controller { &mut self, sync_batches: impl Iterator, ) -> Result { - let MonoN { ekeys, .. } = self.inner.mono_n.clone(); + let MonoN { ports, .. } = self.inner.mono_n.clone(); let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self; let mut branches = HashMap::<_, _>::default(); for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() { - let ekey_to_channel_id = |ekey| endpoint_exts.get(ekey).unwrap().info.channel_id; - let all_ekeys = ekeys.iter().copied(); - let all_channel_ids = all_ekeys.map(ekey_to_channel_id); + let port_to_channel_id = |port| endpoint_exts.get(port).unwrap().info.channel_id; + let all_ports = ports.iter().copied(); + let all_channel_ids = all_ports.map(port_to_channel_id); let mut predicate = Predicate::new_trivial(); // assign TRUE for puts and gets - let true_ekeys = puts.keys().chain(gets.iter()).copied(); - let true_channel_ids = true_ekeys.clone().map(ekey_to_channel_id); + let true_ports = puts.keys().chain(gets.iter()).copied(); + let true_channel_ids = true_ports.clone().map(port_to_channel_id); predicate.batch_assign_nones(true_channel_ids, true); // assign FALSE for all in interface not assigned true @@ -106,7 +106,7 @@ impl Controller { ) } let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index }; - for (ekey, payload) in puts { + for (port, payload) in puts { log!( &mut self.inner.logger, "... ... Initial native put msg {:?} pred {:?} batch {:?}", @@ -117,7 +117,7 @@ impl Controller { let msg = CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload } .into_msg(*round_index); - endpoint_exts.get_mut(ekey).unwrap().endpoint.send(msg)?; + endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?; } log!( &mut self.inner.logger, @@ -134,7 +134,7 @@ impl Controller { } branches.insert(predicate, branch); } - Ok(PolyN { ekeys, branches }) + Ok(PolyN { ports, branches }) } pub fn sync_round( &mut self, @@ -178,7 +178,7 @@ impl Controller { log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len()); while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() { let mut m_ctx = MonoPContext { - ekeys: &mut mono_p.ekeys, + ports: &mut mono_p.ports, mono_ps: &mut self.ephemeral.mono_ps, inner: &mut self.inner, }; @@ -197,25 +197,25 @@ impl Controller { self.ephemeral.poly_ps.len() ); - // 3. define the mapping from ekey -> actor + // 3. define the mapping from port -> actor // this is needed during the event loop to determine which actor // should receive the incoming message. // TODO: store and update this mapping rather than rebuilding it each round. - let ekey_to_holder: HashMap = { + let port_to_holder: HashMap = { use PolyId::*; - let n = self.inner.mono_n.ekeys.iter().map(move |&e| (e, N)); + let n = self.inner.mono_n.ports.iter().map(move |&e| (e, N)); let p = self .ephemeral .poly_ps .iter() .enumerate() - .flat_map(|(index, m)| m.ekeys.iter().map(move |&e| (e, P { index }))); + .flat_map(|(index, m)| m.ports.iter().map(move |&e| (e, P { index }))); n.chain(p).collect() }; log!( &mut self.inner.logger, - "SET OF PolyPs and MonoPs final! ekey lookup map is {:?}", - &ekey_to_holder + "SET OF PolyPs and MonoPs final! port lookup map is {:?}", + &port_to_holder ); // 4. Create the solution storage. it tracks the solutions of "subtrees" @@ -226,9 +226,9 @@ impl Controller { let c = self .inner .family - .children_ekeys + .children_ports .iter() - .map(|&ekey| SubtreeId::ChildController { ekey }); + .map(|&port| SubtreeId::ChildController { port }); let subtree_id_iter = n.chain(m).chain(c); log!( &mut self.inner.logger, @@ -309,12 +309,12 @@ impl Controller { // timed out! send a FAILURE message to the sink, // and henceforth don't time out on polling. deadline = None; - match self.inner.family.parent_ekey { + match self.inner.family.parent_port { None => { // I am the sink! announce failure and return. return self.end_round_with_decision(Decision::Failure); } - Some(parent_ekey) => { + Some(parent_port) => { // I am not the sink! send a failure message. let announcement = Msg::CommMsg(CommMsg { round_index: self.inner.round_index, @@ -322,13 +322,13 @@ impl Controller { }); log!( &mut self.inner.logger, - "Forwarding {:?} to parent with ekey {:?}", + "Forwarding {:?} to parent with port {:?}", &announcement, - parent_ekey + parent_port ); self.inner .endpoint_exts - .get_mut(parent_ekey) + .get_mut(parent_port) .expect("ss") .endpoint .send(announcement.clone())?; @@ -364,7 +364,7 @@ impl Controller { Msg::CommMsg(CommMsg { contents, round_index }) => { log!( &mut self.inner.logger, - "... its a round-appropriate CommMsg with key {:?}", + "... its a round-appropriate CommMsg with port {:?}", received.recipient ); assert_eq!(round_index, self.inner.round_index); @@ -372,21 +372,21 @@ impl Controller { } }; match current_content { - CommMsgContents::Failure => match self.inner.family.parent_ekey { - Some(parent_ekey) => { + CommMsgContents::Failure => match self.inner.family.parent_port { + Some(parent_port) => { let announcement = Msg::CommMsg(CommMsg { round_index: self.inner.round_index, contents: CommMsgContents::Failure, }); log!( &mut self.inner.logger, - "Forwarding {:?} to parent with ekey {:?}", + "Forwarding {:?} to parent with port {:?}", &announcement, - parent_ekey + parent_port ); self.inner .endpoint_exts - .get_mut(parent_ekey) + .get_mut(parent_port) .expect("ss") .endpoint .send(announcement.clone())?; @@ -395,10 +395,10 @@ impl Controller { }, CommMsgContents::Elaborate { partial_oracle } => { // Child controller submitted a subtree solution. - if !self.inner.family.children_ekeys.contains(&received.recipient) { + if !self.inner.family.children_ports.contains(&received.recipient) { return Err(SyncErr::ElaborateFromNonChild); } - let subtree_id = SubtreeId::ChildController { ekey: received.recipient }; + let subtree_id = SubtreeId::ChildController { port: received.recipient }; log!( &mut self.inner.logger, "Received elaboration from child for subtree {:?}: {:?}", @@ -415,7 +415,7 @@ impl Controller { } } CommMsgContents::Announce { decision } => { - if self.inner.family.parent_ekey != Some(received.recipient) { + if self.inner.family.parent_port != Some(received.recipient) { return Err(SyncErr::AnnounceFromNonParent); } log!( @@ -435,7 +435,7 @@ impl Controller { // message for some actor. Feed it to the appropriate actor // and then give them another chance to run. - let subtree_id = ekey_to_holder.get(&received.recipient); + let subtree_id = port_to_holder.get(&received.recipient); log!( &mut self.inner.logger, "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}", @@ -530,13 +530,13 @@ impl ControllerEphemeral { && self.poly_n.is_none() && self.poly_ps.is_empty() && self.mono_ps.is_empty() - && self.ekey_to_holder.is_empty() + && self.port_to_holder.is_empty() } fn clear(&mut self) { self.solution_storage.clear(); self.poly_n.take(); self.poly_ps.clear(); - self.ekey_to_holder.clear(); + self.port_to_holder.clear(); } } impl Into for MonoP { @@ -552,7 +552,7 @@ impl Into for MonoP { blocking_on: None, } }, - ekeys: self.ekeys, + ports: self.ports, } } } @@ -566,41 +566,41 @@ impl From for SyncErr { impl MonoContext for MonoPContext<'_> { type D = ProtocolD; type S = ProtocolS; - fn new_component(&mut self, moved_ekeys: HashSet, init_state: Self::S) { + fn new_component(&mut self, moved_ports: HashSet, init_state: Self::S) { log!( &mut self.inner.logger, - "!! MonoContext callback to new_component with ekeys {:?}!", - &moved_ekeys, + "!! MonoContext callback to new_component with ports {:?}!", + &moved_ports, ); - if moved_ekeys.is_subset(self.ekeys) { - self.ekeys.retain(|x| !moved_ekeys.contains(x)); - self.mono_ps.push(MonoP { state: init_state, ekeys: moved_ekeys }); + if moved_ports.is_subset(self.ports) { + self.ports.retain(|x| !moved_ports.contains(x)); + self.mono_ps.push(MonoP { state: init_state, ports: moved_ports }); } else { - panic!("MachineP attempting to move alien ekey!"); + panic!("MachineP attempting to move alien port!"); } } - fn new_channel(&mut self) -> [Key; 2] { + fn new_channel(&mut self) -> [Port; 2] { let [a, b] = Endpoint::new_memory_pair(); let channel_id = self.inner.channel_id_stream.next(); let mut clos = |endpoint, polarity| { let endpoint_ext = EndpointExt { info: EndpointInfo { polarity, channel_id }, endpoint }; - let ekey = self.inner.endpoint_exts.alloc(endpoint_ext); - let endpoint = &self.inner.endpoint_exts.get(ekey).unwrap().endpoint; - let token = Key::to_token(ekey); + let port = self.inner.endpoint_exts.alloc(endpoint_ext); + let endpoint = &self.inner.endpoint_exts.get(port).unwrap().endpoint; + let token = Port::to_token(port); self.inner .messenger_state .poll .register(endpoint, token, Ready::readable(), PollOpt::edge()) .expect("AAGAGGGGG"); - self.ekeys.insert(ekey); - ekey + self.ports.insert(port); + port }; let [kp, kg] = [clos(a, Putter), clos(b, Getter)]; log!( &mut self.inner.logger, - "!! MonoContext callback to new_channel. returning ekeys {:?}!", + "!! MonoContext callback to new_channel. returning ports {:?}!", [kp, kg], ); [kp, kg] @@ -713,9 +713,9 @@ impl SolutionStorage { impl PolyContext for BranchPContext<'_, '_> { type D = ProtocolD; - fn is_firing(&mut self, ekey: Key) -> Option { - assert!(self.ekeys.contains(&ekey)); - let channel_id = self.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; + fn is_firing(&mut self, port: Port) -> Option { + assert!(self.ports.contains(&port)); + let channel_id = self.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; let val = self.predicate.query(channel_id); log!( &mut self.m_ctx.inner.logger, @@ -725,9 +725,9 @@ impl PolyContext for BranchPContext<'_, '_> { ); val } - fn read_msg(&mut self, ekey: Key) -> Option<&Payload> { - assert!(self.ekeys.contains(&ekey)); - let val = self.inbox.get(&ekey); + fn read_msg(&mut self, port: Port) -> Option<&Payload> { + assert!(self.ports.contains(&port)); + let val = self.inbox.get(&port); log!( &mut self.m_ctx.inner.logger, "!! PolyContext callback to read_msg by {:?}! returning {:?}",