From dedf89df1602f8d140e898e20aa96b2251188319 2020-05-28 11:14:52 From: Christopher Esterhuyse Date: 2020-05-28 11:14:52 Subject: [PATCH] retiring `Endpoint Key` term in favor of port --- diff --git a/src/common.rs b/src/common.rs index 8fd148c70ac89a5c07a4fefbf7a129bd40fedd55..7bfe716bc3725604ad1f7b3b4debf49a825e3337 100644 --- a/src/common.rs +++ b/src/common.rs @@ -24,7 +24,6 @@ pub use Polarity::*; ///////////////////// DEFS ///////////////////// -// pub type Payload = Vec; pub type ControllerId = u32; pub type ChannelIndex = u32; @@ -47,7 +46,6 @@ pub enum Polarity { #[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Copy, Clone)] #[repr(C)] pub struct Port(pub usize); // ports are COPY -pub type Key = Port; #[derive(Eq, PartialEq, Copy, Clone, Debug)] pub enum MainComponentErr { @@ -64,7 +62,7 @@ pub trait ProtocolDescription: Sized { fn parse(pdl: &[u8]) -> Result; fn component_polarities(&self, identifier: &[u8]) -> Result, MainComponentErr>; - fn new_main_component(&self, identifier: &[u8], ports: &[Key]) -> Self::S; + fn new_main_component(&self, identifier: &[u8], ports: &[Port]) -> Self::S; } pub trait ComponentState: Sized + Clone { @@ -93,24 +91,24 @@ pub enum MonoBlocker { pub enum PolyBlocker { Inconsistent, SyncBlockEnd, - CouldntReadMsg(Key), - CouldntCheckFiring(Key), - PutMsg(Key, Payload), + CouldntReadMsg(Port), + CouldntCheckFiring(Port), + PutMsg(Port, Payload), } pub trait MonoContext { type D: ProtocolDescription; type S: ComponentState; - fn new_component(&mut self, moved_keys: HashSet, init_state: Self::S); - fn new_channel(&mut self) -> [Key; 2]; + fn new_component(&mut self, moved_ports: HashSet, init_state: Self::S); + fn new_channel(&mut self) -> [Port; 2]; fn new_random(&mut self) -> u64; } pub trait PolyContext { type D: ProtocolDescription; - fn is_firing(&mut self, ekey: Key) -> Option; - fn read_msg(&mut self, ekey: Key) -> Option<&Payload>; + fn is_firing(&mut self, port: Port) -> Option; + fn read_msg(&mut self, port: Port) -> Option<&Payload>; } ///////////////////// IMPL ///////////////////// @@ -152,7 +150,7 @@ impl Debug for Port { write!(f, "Port({})", self.0) } } -impl Key { +impl Port { pub fn from_raw(raw: usize) -> Self { Self(raw) } diff --git a/src/protocol/eval.rs b/src/protocol/eval.rs index 3ad9110d69958a3f3eb4957f91c116ead9536998..4fa17c5a2eede0bf1999d491f20214461f3fb0b3 100644 --- a/src/protocol/eval.rs +++ b/src/protocol/eval.rs @@ -886,7 +886,7 @@ impl Display for Value { } #[derive(Debug, Clone)] -pub struct InputValue(pub Key); +pub struct InputValue(pub Port); impl Display for InputValue { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { @@ -911,7 +911,7 @@ impl ValueImpl for InputValue { } #[derive(Debug, Clone)] -pub struct OutputValue(pub Key); +pub struct OutputValue(pub Port); impl Display for OutputValue { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { diff --git a/src/protocol/inputsource.rs b/src/protocol/inputsource.rs index ec3e5020ef3a1c3969d390e4919f55e047a48820..f1e2c82c78856bbfc4ec21cb1c7909dcd6e9808b 100644 --- a/src/protocol/inputsource.rs +++ b/src/protocol/inputsource.rs @@ -21,10 +21,10 @@ primitive forward(in i, out o) { primitive sync(in i, out o) { while(true) synchronous() if(fires(i)) put(o, get(i)); } -primitive alternator_2(in i, out a, out b) { +primitive alternator_2(in i, out l, out r) { while(true) { - synchronous() put(a, get(i)); - synchronous() put(b, get(i)); + synchronous() put(l, get(i)); + synchronous() put(r, get(i)); } } primitive replicator_2(in i, out l, out r) { @@ -43,7 +43,7 @@ primitive merger_2(in l, in r, out o) { impl InputSource { // Constructors - pub fn new(filename: S, reader: &mut A) -> io::Result { + pub fn new(filename: S, reader: &mut R) -> io::Result { let mut vec = STD_LIB_PDL.to_vec(); reader.read_to_end(&mut vec)?; Ok(InputSource { diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 7ce0b5ff2e22c6041368d8c1da25329d2e34795d..8eaf5e1398e1462da4be7eace773cb7c61443368 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -80,7 +80,7 @@ impl ProtocolDescription for ProtocolDescriptionImpl { } Ok(result) } - fn new_main_component(&self, identifier: &[u8], ports: &[Key]) -> ComponentStateImpl { + fn new_main_component(&self, identifier: &[u8], ports: &[Port]) -> ComponentStateImpl { let mut args = Vec::new(); for (&x, y) in ports.iter().zip(self.component_polarities(identifier).unwrap()) { match y { @@ -160,31 +160,31 @@ impl ComponentState for ComponentStateImpl { // Not possible to create component in sync block EvalContinuation::NewComponent(_, _) => unreachable!(), EvalContinuation::BlockFires(port) => match port { - Value::Output(OutputValue(key)) => { - return PolyBlocker::CouldntCheckFiring(key); + Value::Output(OutputValue(port)) => { + return PolyBlocker::CouldntCheckFiring(port); } - Value::Input(InputValue(key)) => { - return PolyBlocker::CouldntCheckFiring(key); + Value::Input(InputValue(port)) => { + return PolyBlocker::CouldntCheckFiring(port); } _ => unreachable!(), }, EvalContinuation::BlockGet(port) => match port { - Value::Output(OutputValue(key)) => { - return PolyBlocker::CouldntReadMsg(key); + Value::Output(OutputValue(port)) => { + return PolyBlocker::CouldntReadMsg(port); } - Value::Input(InputValue(key)) => { - return PolyBlocker::CouldntReadMsg(key); + Value::Input(InputValue(port)) => { + return PolyBlocker::CouldntReadMsg(port); } _ => unreachable!(), }, EvalContinuation::Put(port, message) => { - let key; + let value; match port { - Value::Output(OutputValue(the_key)) => { - key = the_key; + Value::Output(OutputValue(port_value)) => { + value = port_value; } - Value::Input(InputValue(the_key)) => { - key = the_key; + Value::Input(InputValue(port_value)) => { + value = port_value; } _ => unreachable!(), } @@ -200,7 +200,7 @@ impl ComponentState for ComponentStateImpl { } _ => unreachable!(), } - return PolyBlocker::PutMsg(key, payload); + return PolyBlocker::PutMsg(value, payload); } }, } @@ -214,30 +214,30 @@ pub enum EvalContext<'a> { None, } impl EvalContext<'_> { - fn random(&mut self) -> LongValue { - match self { - EvalContext::None => unreachable!(), - EvalContext::Mono(context) => todo!(), - EvalContext::Poly(_) => unreachable!(), - } - } + // fn random(&mut self) -> LongValue { + // match self { + // EvalContext::None => unreachable!(), + // EvalContext::Mono(_context) => todo!(), + // EvalContext::Poly(_) => unreachable!(), + // } + // } fn new_component(&mut self, args: &[Value], init_state: ComponentStateImpl) -> () { match self { EvalContext::None => unreachable!(), EvalContext::Mono(context) => { - let mut moved_keys = HashSet::new(); + let mut moved_ports = HashSet::new(); for arg in args.iter() { match arg { - Value::Output(OutputValue(key)) => { - moved_keys.insert(*key); + Value::Output(OutputValue(port)) => { + moved_ports.insert(*port); } - Value::Input(InputValue(key)) => { - moved_keys.insert(*key); + Value::Input(InputValue(port)) => { + moved_ports.insert(*port); } _ => {} } } - context.new_component(moved_keys, init_state) + context.new_component(moved_ports, init_state) } EvalContext::Poly(_) => unreachable!(), } @@ -259,8 +259,8 @@ impl EvalContext<'_> { EvalContext::None => unreachable!(), EvalContext::Mono(_) => unreachable!(), EvalContext::Poly(context) => match port { - Value::Output(OutputValue(key)) => context.is_firing(key).map(Value::from), - Value::Input(InputValue(key)) => context.is_firing(key).map(Value::from), + Value::Output(OutputValue(port)) => context.is_firing(port).map(Value::from), + Value::Input(InputValue(port)) => context.is_firing(port).map(Value::from), _ => unreachable!(), }, } @@ -270,10 +270,12 @@ impl EvalContext<'_> { EvalContext::None => unreachable!(), EvalContext::Mono(_) => unreachable!(), EvalContext::Poly(context) => match port { - Value::Output(OutputValue(key)) => { - context.read_msg(key).map(Value::receive_message) + Value::Output(OutputValue(port)) => { + context.read_msg(port).map(Value::receive_message) + } + Value::Input(InputValue(port)) => { + context.read_msg(port).map(Value::receive_message) } - Value::Input(InputValue(key)) => context.read_msg(key).map(Value::receive_message), _ => unreachable!(), }, } diff --git a/src/runtime/actors.rs b/src/runtime/actors.rs index d47ad39644c7b60806e31e35bdf2f692227d0704..0f8cd88f76b65de172c197a066c6308719cd6230 100644 --- a/src/runtime/actors.rs +++ b/src/runtime/actors.rs @@ -3,37 +3,37 @@ use crate::runtime::{endpoint::*, *}; #[derive(Debug, Clone)] pub(crate) struct MonoN { - pub ekeys: HashSet, - pub result: Option<(usize, HashMap)>, + pub ports: HashSet, + pub result: Option<(usize, HashMap)>, } #[derive(Debug)] pub(crate) struct PolyN { - pub ekeys: HashSet, + pub ports: HashSet, pub branches: HashMap, } #[derive(Debug, Clone)] pub(crate) struct BranchN { - pub to_get: HashSet, - pub gotten: HashMap, + pub to_get: HashSet, + pub gotten: HashMap, pub sync_batch_index: usize, } #[derive(Debug, Clone)] pub struct MonoP { pub state: ProtocolS, - pub ekeys: HashSet, + pub ports: HashSet, } #[derive(Debug)] pub(crate) struct PolyP { pub incomplete: HashMap, pub complete: HashMap, - pub ekeys: HashSet, + pub ports: HashSet, } #[derive(Debug, Clone)] pub(crate) struct BranchP { - pub blocking_on: Option, - pub outbox: HashMap, - pub inbox: HashMap, + pub blocking_on: Option, + pub outbox: HashMap, + pub inbox: HashMap, pub state: ProtocolS, } @@ -60,7 +60,7 @@ impl PolyP { 'to_run_loop: while let Some((mut predicate, mut branch)) = to_run.pop() { let mut r_ctx = BranchPContext { m_ctx: m_ctx.reborrow(), - ekeys: &self.ekeys, + ports: &self.ports, predicate: &predicate, inbox: &branch.inbox, }; @@ -75,10 +75,10 @@ impl PolyP { ); match blocker { Sb::Inconsistent => {} // DROP - Sb::CouldntReadMsg(ekey) => { - assert!(self.ekeys.contains(&ekey)); + Sb::CouldntReadMsg(port) => { + assert!(self.ports.contains(&port)); let channel_id = - r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; + r_ctx.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; log!( &mut r_ctx.m_ctx.inner.logger, "~ ... {:?} couldnt read msg for port {:?}. has inbox {:?}", @@ -90,17 +90,17 @@ impl PolyP { // don't rerun now. Rerun at next `sync_run` log!(&mut m_ctx.inner.logger, "~ ... Delay {:?}", m_ctx.my_subtree_id,); - branch.blocking_on = Some(ekey); + branch.blocking_on = Some(port); self.incomplete.insert(predicate, branch); } else { log!(&mut m_ctx.inner.logger, "~ ... Drop {:?}", m_ctx.my_subtree_id,); } // ELSE DROP } - Sb::CouldntCheckFiring(ekey) => { - assert!(self.ekeys.contains(&ekey)); + Sb::CouldntCheckFiring(port) => { + assert!(self.ports.contains(&port)); let channel_id = - r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; + r_ctx.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; // split the branch! let branch_f = branch.clone(); let mut predicate_f = predicate.clone(); @@ -121,10 +121,10 @@ impl PolyP { ); // come up with the predicate for this local solution - for ekey in self.ekeys.iter() { - let channel_id = endpoint_exts.get(*ekey).unwrap().info.channel_id; + for port in self.ports.iter() { + let channel_id = endpoint_exts.get(*port).unwrap().info.channel_id; let fired = - branch.inbox.contains_key(ekey) || branch.outbox.contains_key(ekey); + branch.inbox.contains_key(port) || branch.outbox.contains_key(port); match predicate.query(channel_id) { Some(true) => { if !fired { @@ -142,8 +142,8 @@ impl PolyP { println!( "pred {:#?} in {:#?} out {:#?}", &predicate, - branch.inbox.get(ekey), - branch.outbox.get(ekey) + branch.inbox.get(port), + branch.outbox.get(port) ); panic!("channel_id {:?} fired (based on outbox/inbox) but the predicate had Some(false)!" ,channel_id) } @@ -154,8 +154,8 @@ impl PolyP { println!( "pred {:#?} in {:#?} out {:#?}", &predicate, - branch.inbox.get(ekey), - branch.outbox.get(ekey) + branch.inbox.get(port), + branch.outbox.get(port) ); panic!("channel_id {:?} fired (based on outbox/inbox) but the predicate had None!" ,channel_id) } @@ -170,12 +170,12 @@ impl PolyP { ); self.complete.insert(predicate, branch); } - Sb::PutMsg(ekey, payload) => { - assert!(self.ekeys.contains(&ekey)); + Sb::PutMsg(port, payload) => { + assert!(self.ports.contains(&port)); let EndpointExt { info, endpoint } = - m_ctx.inner.endpoint_exts.get_mut(ekey).unwrap(); + m_ctx.inner.endpoint_exts.get_mut(port).unwrap(); if predicate.replace_assignment(info.channel_id, true) != Some(false) { - branch.outbox.insert(ekey, payload.clone()); + branch.outbox.insert(port, payload.clone()); let msg = CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload, @@ -185,7 +185,7 @@ impl PolyP { &mut m_ctx.inner.logger, "~ ... ... PolyP sending msg {:?} to {:?} ({:?}) now!", &msg, - ekey, + port, (info.channel_id.controller_id, info.channel_id.channel_index), ); endpoint.send(msg)?; @@ -211,7 +211,7 @@ impl PolyP { &mut self, m_ctx: PolyPContext, protocol_description: &ProtocolD, - ekey: Key, + port: Port, payload_predicate: Predicate, payload: Payload, ) -> Result { @@ -233,8 +233,8 @@ impl PolyP { "... poly_recv_run matched running machine exactly! pred is {:?}", &payload_predicate ); - branch.inbox.insert(ekey, payload); - if branch.blocking_on == Some(ekey) { + branch.inbox.insert(port, payload); + if branch.blocking_on == Some(port) { branch.blocking_on = None; vec![(payload_predicate, branch)] } else { @@ -260,12 +260,12 @@ impl PolyP { ); // old_predicate COVERS the assumptions of payload_predicate - if let Some(prev_payload) = branch.inbox.get(&ekey) { + if let Some(prev_payload) = branch.inbox.get(&port) { // Incorrect to receive two distinct messages in same branch! assert_eq!(prev_payload, &payload); } - branch.inbox.insert(ekey, payload.clone()); - if branch.blocking_on == Some(ekey) { + branch.inbox.insert(port, payload.clone()); + if branch.blocking_on == Some(port) { // run. branch.blocking_on = None; Some((old_predicate, branch)) @@ -285,15 +285,15 @@ impl PolyP { ); // payload_predicate has new assumptions. FORK! let mut payload_branch = branch.clone(); - if let Some(prev_payload) = payload_branch.inbox.get(&ekey) { + if let Some(prev_payload) = payload_branch.inbox.get(&port) { // Incorrect to receive two distinct messages in same branch! assert_eq!(prev_payload, &payload); } - payload_branch.inbox.insert(ekey, payload.clone()); + payload_branch.inbox.insert(port, payload.clone()); // put the original back untouched incomplete2.insert(old_predicate, branch); - if payload_branch.blocking_on == Some(ekey) { + if payload_branch.blocking_on == Some(port) { // run the fork payload_branch.blocking_on = None; Some((new, payload_branch)) @@ -312,15 +312,15 @@ impl PolyP { ); // payload_predicate has new assumptions. FORK! let mut payload_branch = branch.clone(); - if let Some(prev_payload) = payload_branch.inbox.get(&ekey) { + if let Some(prev_payload) = payload_branch.inbox.get(&port) { // Incorrect to receive two distinct messages in same branch! assert_eq!(prev_payload, &payload); } - payload_branch.inbox.insert(ekey, payload.clone()); + payload_branch.inbox.insert(port, payload.clone()); // put the original back untouched incomplete2.insert(old_predicate.clone(), branch); - if payload_branch.blocking_on == Some(ekey) { + if payload_branch.blocking_on == Some(port) { // run the fork payload_branch.blocking_on = None; Some((payload_predicate.clone(), payload_branch)) @@ -359,14 +359,14 @@ impl PolyP { self.complete .iter() .find(|(p, _)| decision.satisfies(p)) - .map(|(_, branch)| MonoP { state: branch.state.clone(), ekeys: self.ekeys.clone() }) + .map(|(_, branch)| MonoP { state: branch.state.clone(), ports: self.ports.clone() }) } } impl PolyN { pub fn sync_recv( &mut self, - ekey: Key, + port: Port, logger: &mut String, payload: Payload, payload_predicate: Predicate, @@ -399,16 +399,16 @@ impl PolyN { Csr::Nonexistant => { /* skip branch */ } Csr::LatterNotFormer | Csr::Equivalent => { // Feed the message to this branch in-place. no need to modify pred. - if branch.to_get.remove(&ekey) { - branch.gotten.insert(ekey, payload.clone()); + if branch.to_get.remove(&port) { + branch.gotten.insert(port, payload.clone()); report_if_solution(&branch, &old_predicate, logger); } } Csr::FormerNotLatter => { // create a new branch with the payload_predicate. let mut forked = branch.clone(); - if forked.to_get.remove(&ekey) { - forked.gotten.insert(ekey, payload.clone()); + if forked.to_get.remove(&port) { + forked.gotten.insert(port, payload.clone()); report_if_solution(&forked, &payload_predicate, logger); branches2.insert(payload_predicate.clone(), forked); } @@ -416,8 +416,8 @@ impl PolyN { Csr::New(new) => { // create a new branch with the newly-created predicate let mut forked = branch.clone(); - if forked.to_get.remove(&ekey) { - forked.gotten.insert(ekey, payload.clone()); + if forked.to_get.remove(&port) { + forked.gotten.insert(port, payload.clone()); report_if_solution(&forked, &new, logger); branches2.insert(new.clone(), forked); } @@ -436,7 +436,7 @@ impl PolyN { .find(|(p, branch)| branch.to_get.is_empty() && decision.satisfies(p)) .map(|(_, branch)| { let BranchN { gotten, sync_batch_index, .. } = branch.clone(); - MonoN { ekeys: self.ekeys.clone(), result: Some((sync_batch_index, gotten)) } + MonoN { ports: self.ports.clone(), result: Some((sync_batch_index, gotten)) } }) } } diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 09c99516ea20c2d9fa74789d4b088b30e6b68af6..8e364b07e78c33ae92abdbfb7082aa8e6d5136bf 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -28,16 +28,16 @@ impl Controller { Decision::Failure => Err(SyncErr::Timeout), }; let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index); - for &child_ekey in self.inner.family.children_ekeys.iter() { + for &child_port in self.inner.family.children_ports.iter() { log!( &mut self.inner.logger, - "Forwarding {:?} to child with ekey {:?}", + "Forwarding {:?} to child with port {:?}", &announcement, - child_ekey + child_port ); self.inner .endpoint_exts - .get_mut(child_ekey) + .get_mut(child_port) .expect("eefef") .endpoint .send(announcement.clone())?; @@ -49,20 +49,20 @@ impl Controller { // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found fn handle_locals_maybe_decide(&mut self) -> Result { - if let Some(parent_ekey) = self.inner.family.parent_ekey { + if let Some(parent_port) = self.inner.family.parent_port { // I have a parent -> I'm not the leader let parent_endpoint = - &mut self.inner.endpoint_exts.get_mut(parent_ekey).expect("huu").endpoint; + &mut self.inner.endpoint_exts.get_mut(parent_port).expect("huu").endpoint; for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() { let msg = CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index); - log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_ekey); + log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_port); parent_endpoint.send(msg)?; } Ok(false) } else { // I have no parent -> I'm the leader - assert!(self.inner.family.parent_ekey.is_none()); + assert!(self.inner.family.parent_port.is_none()); let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next(); Ok(if let Some(predicate) = maybe_predicate { let decision = Decision::Success(predicate); @@ -79,19 +79,19 @@ impl Controller { &mut self, sync_batches: impl Iterator, ) -> Result { - let MonoN { ekeys, .. } = self.inner.mono_n.clone(); + let MonoN { ports, .. } = self.inner.mono_n.clone(); let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self; let mut branches = HashMap::<_, _>::default(); for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() { - let ekey_to_channel_id = |ekey| endpoint_exts.get(ekey).unwrap().info.channel_id; - let all_ekeys = ekeys.iter().copied(); - let all_channel_ids = all_ekeys.map(ekey_to_channel_id); + let port_to_channel_id = |port| endpoint_exts.get(port).unwrap().info.channel_id; + let all_ports = ports.iter().copied(); + let all_channel_ids = all_ports.map(port_to_channel_id); let mut predicate = Predicate::new_trivial(); // assign TRUE for puts and gets - let true_ekeys = puts.keys().chain(gets.iter()).copied(); - let true_channel_ids = true_ekeys.clone().map(ekey_to_channel_id); + let true_ports = puts.keys().chain(gets.iter()).copied(); + let true_channel_ids = true_ports.clone().map(port_to_channel_id); predicate.batch_assign_nones(true_channel_ids, true); // assign FALSE for all in interface not assigned true @@ -106,7 +106,7 @@ impl Controller { ) } let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index }; - for (ekey, payload) in puts { + for (port, payload) in puts { log!( &mut self.inner.logger, "... ... Initial native put msg {:?} pred {:?} batch {:?}", @@ -117,7 +117,7 @@ impl Controller { let msg = CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload } .into_msg(*round_index); - endpoint_exts.get_mut(ekey).unwrap().endpoint.send(msg)?; + endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?; } log!( &mut self.inner.logger, @@ -134,7 +134,7 @@ impl Controller { } branches.insert(predicate, branch); } - Ok(PolyN { ekeys, branches }) + Ok(PolyN { ports, branches }) } pub fn sync_round( &mut self, @@ -178,7 +178,7 @@ impl Controller { log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len()); while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() { let mut m_ctx = MonoPContext { - ekeys: &mut mono_p.ekeys, + ports: &mut mono_p.ports, mono_ps: &mut self.ephemeral.mono_ps, inner: &mut self.inner, }; @@ -197,25 +197,25 @@ impl Controller { self.ephemeral.poly_ps.len() ); - // 3. define the mapping from ekey -> actor + // 3. define the mapping from port -> actor // this is needed during the event loop to determine which actor // should receive the incoming message. // TODO: store and update this mapping rather than rebuilding it each round. - let ekey_to_holder: HashMap = { + let port_to_holder: HashMap = { use PolyId::*; - let n = self.inner.mono_n.ekeys.iter().map(move |&e| (e, N)); + let n = self.inner.mono_n.ports.iter().map(move |&e| (e, N)); let p = self .ephemeral .poly_ps .iter() .enumerate() - .flat_map(|(index, m)| m.ekeys.iter().map(move |&e| (e, P { index }))); + .flat_map(|(index, m)| m.ports.iter().map(move |&e| (e, P { index }))); n.chain(p).collect() }; log!( &mut self.inner.logger, - "SET OF PolyPs and MonoPs final! ekey lookup map is {:?}", - &ekey_to_holder + "SET OF PolyPs and MonoPs final! port lookup map is {:?}", + &port_to_holder ); // 4. Create the solution storage. it tracks the solutions of "subtrees" @@ -226,9 +226,9 @@ impl Controller { let c = self .inner .family - .children_ekeys + .children_ports .iter() - .map(|&ekey| SubtreeId::ChildController { ekey }); + .map(|&port| SubtreeId::ChildController { port }); let subtree_id_iter = n.chain(m).chain(c); log!( &mut self.inner.logger, @@ -309,12 +309,12 @@ impl Controller { // timed out! send a FAILURE message to the sink, // and henceforth don't time out on polling. deadline = None; - match self.inner.family.parent_ekey { + match self.inner.family.parent_port { None => { // I am the sink! announce failure and return. return self.end_round_with_decision(Decision::Failure); } - Some(parent_ekey) => { + Some(parent_port) => { // I am not the sink! send a failure message. let announcement = Msg::CommMsg(CommMsg { round_index: self.inner.round_index, @@ -322,13 +322,13 @@ impl Controller { }); log!( &mut self.inner.logger, - "Forwarding {:?} to parent with ekey {:?}", + "Forwarding {:?} to parent with port {:?}", &announcement, - parent_ekey + parent_port ); self.inner .endpoint_exts - .get_mut(parent_ekey) + .get_mut(parent_port) .expect("ss") .endpoint .send(announcement.clone())?; @@ -364,7 +364,7 @@ impl Controller { Msg::CommMsg(CommMsg { contents, round_index }) => { log!( &mut self.inner.logger, - "... its a round-appropriate CommMsg with key {:?}", + "... its a round-appropriate CommMsg with port {:?}", received.recipient ); assert_eq!(round_index, self.inner.round_index); @@ -372,21 +372,21 @@ impl Controller { } }; match current_content { - CommMsgContents::Failure => match self.inner.family.parent_ekey { - Some(parent_ekey) => { + CommMsgContents::Failure => match self.inner.family.parent_port { + Some(parent_port) => { let announcement = Msg::CommMsg(CommMsg { round_index: self.inner.round_index, contents: CommMsgContents::Failure, }); log!( &mut self.inner.logger, - "Forwarding {:?} to parent with ekey {:?}", + "Forwarding {:?} to parent with port {:?}", &announcement, - parent_ekey + parent_port ); self.inner .endpoint_exts - .get_mut(parent_ekey) + .get_mut(parent_port) .expect("ss") .endpoint .send(announcement.clone())?; @@ -395,10 +395,10 @@ impl Controller { }, CommMsgContents::Elaborate { partial_oracle } => { // Child controller submitted a subtree solution. - if !self.inner.family.children_ekeys.contains(&received.recipient) { + if !self.inner.family.children_ports.contains(&received.recipient) { return Err(SyncErr::ElaborateFromNonChild); } - let subtree_id = SubtreeId::ChildController { ekey: received.recipient }; + let subtree_id = SubtreeId::ChildController { port: received.recipient }; log!( &mut self.inner.logger, "Received elaboration from child for subtree {:?}: {:?}", @@ -415,7 +415,7 @@ impl Controller { } } CommMsgContents::Announce { decision } => { - if self.inner.family.parent_ekey != Some(received.recipient) { + if self.inner.family.parent_port != Some(received.recipient) { return Err(SyncErr::AnnounceFromNonParent); } log!( @@ -435,7 +435,7 @@ impl Controller { // message for some actor. Feed it to the appropriate actor // and then give them another chance to run. - let subtree_id = ekey_to_holder.get(&received.recipient); + let subtree_id = port_to_holder.get(&received.recipient); log!( &mut self.inner.logger, "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}", @@ -530,13 +530,13 @@ impl ControllerEphemeral { && self.poly_n.is_none() && self.poly_ps.is_empty() && self.mono_ps.is_empty() - && self.ekey_to_holder.is_empty() + && self.port_to_holder.is_empty() } fn clear(&mut self) { self.solution_storage.clear(); self.poly_n.take(); self.poly_ps.clear(); - self.ekey_to_holder.clear(); + self.port_to_holder.clear(); } } impl Into for MonoP { @@ -552,7 +552,7 @@ impl Into for MonoP { blocking_on: None, } }, - ekeys: self.ekeys, + ports: self.ports, } } } @@ -566,41 +566,41 @@ impl From for SyncErr { impl MonoContext for MonoPContext<'_> { type D = ProtocolD; type S = ProtocolS; - fn new_component(&mut self, moved_ekeys: HashSet, init_state: Self::S) { + fn new_component(&mut self, moved_ports: HashSet, init_state: Self::S) { log!( &mut self.inner.logger, - "!! MonoContext callback to new_component with ekeys {:?}!", - &moved_ekeys, + "!! MonoContext callback to new_component with ports {:?}!", + &moved_ports, ); - if moved_ekeys.is_subset(self.ekeys) { - self.ekeys.retain(|x| !moved_ekeys.contains(x)); - self.mono_ps.push(MonoP { state: init_state, ekeys: moved_ekeys }); + if moved_ports.is_subset(self.ports) { + self.ports.retain(|x| !moved_ports.contains(x)); + self.mono_ps.push(MonoP { state: init_state, ports: moved_ports }); } else { - panic!("MachineP attempting to move alien ekey!"); + panic!("MachineP attempting to move alien port!"); } } - fn new_channel(&mut self) -> [Key; 2] { + fn new_channel(&mut self) -> [Port; 2] { let [a, b] = Endpoint::new_memory_pair(); let channel_id = self.inner.channel_id_stream.next(); let mut clos = |endpoint, polarity| { let endpoint_ext = EndpointExt { info: EndpointInfo { polarity, channel_id }, endpoint }; - let ekey = self.inner.endpoint_exts.alloc(endpoint_ext); - let endpoint = &self.inner.endpoint_exts.get(ekey).unwrap().endpoint; - let token = Key::to_token(ekey); + let port = self.inner.endpoint_exts.alloc(endpoint_ext); + let endpoint = &self.inner.endpoint_exts.get(port).unwrap().endpoint; + let token = Port::to_token(port); self.inner .messenger_state .poll .register(endpoint, token, Ready::readable(), PollOpt::edge()) .expect("AAGAGGGGG"); - self.ekeys.insert(ekey); - ekey + self.ports.insert(port); + port }; let [kp, kg] = [clos(a, Putter), clos(b, Getter)]; log!( &mut self.inner.logger, - "!! MonoContext callback to new_channel. returning ekeys {:?}!", + "!! MonoContext callback to new_channel. returning ports {:?}!", [kp, kg], ); [kp, kg] @@ -713,9 +713,9 @@ impl SolutionStorage { impl PolyContext for BranchPContext<'_, '_> { type D = ProtocolD; - fn is_firing(&mut self, ekey: Key) -> Option { - assert!(self.ekeys.contains(&ekey)); - let channel_id = self.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; + fn is_firing(&mut self, port: Port) -> Option { + assert!(self.ports.contains(&port)); + let channel_id = self.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; let val = self.predicate.query(channel_id); log!( &mut self.m_ctx.inner.logger, @@ -725,9 +725,9 @@ impl PolyContext for BranchPContext<'_, '_> { ); val } - fn read_msg(&mut self, ekey: Key) -> Option<&Payload> { - assert!(self.ekeys.contains(&ekey)); - let val = self.inbox.get(&ekey); + fn read_msg(&mut self, port: Port) -> Option<&Payload> { + assert!(self.ports.contains(&port)); + let val = self.inbox.get(&port); log!( &mut self.m_ctx.inner.logger, "!! PolyContext callback to read_msg by {:?}! returning {:?}", diff --git a/src/runtime/connector.rs b/src/runtime/connector.rs index 88d5b2607eb9e17283d4a8fdd8b33118e3b93b2f..5a10a83706fdca6bb462cc2d8b0ec2dea08a846f 100644 --- a/src/runtime/connector.rs +++ b/src/runtime/connector.rs @@ -113,16 +113,16 @@ impl Connector { Connector::Connected(connected) => connected, _ => return Err(NotConnected), }; - let (ekey, native_polarity) = + let (port, native_polarity) = *connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?; if native_polarity != Putter { return Err(WrongPolarity); } let sync_batch = connected.sync_batches.iter_mut().last().expect("no sync batch!"); - if sync_batch.puts.contains_key(&ekey) { + if sync_batch.puts.contains_key(&port) { return Err(DuplicateOperation); } - sync_batch.puts.insert(ekey, payload); + sync_batch.puts.insert(port, payload); Ok(()) } @@ -132,16 +132,16 @@ impl Connector { Connector::Connected(connected) => connected, _ => return Err(NotConnected), }; - let (ekey, native_polarity) = + let (port, native_polarity) = *connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?; if native_polarity != Getter { return Err(WrongPolarity); } let sync_batch = connected.sync_batches.iter_mut().last().expect("no sync batch!"); - if sync_batch.gets.contains(&ekey) { + if sync_batch.gets.contains(&port) { return Err(DuplicateOperation); } - sync_batch.gets.insert(ekey); + sync_batch.gets.insert(port); Ok(()) } pub fn next_batch(&mut self) -> Result { diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index d4811b397bbdcf22bac1c5033a4cd2caad11a94b..80cf9765469db0064ad0f4d0f9d90f4268a2bdde 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -34,8 +34,8 @@ pub(crate) struct Predicate { #[derive(Debug, Default)] struct SyncBatch { - puts: HashMap, - gets: HashSet, + puts: HashMap, + gets: HashSet, } #[derive(Debug)] @@ -59,7 +59,7 @@ pub struct Configured { } #[derive(Debug)] pub struct Connected { - native_interface: Vec<(Key, Polarity)>, + native_interface: Vec<(Port, Polarity)>, sync_batches: Vec, controller: Controller, } @@ -78,7 +78,7 @@ struct Arena { #[derive(Debug)] struct ReceivedMsg { - recipient: Key, + recipient: Port, msg: Msg, } @@ -88,7 +88,7 @@ struct MessengerState { events: Events, delayed: Vec, undelayed: Vec, - polled_undrained: IndexSet, + polled_undrained: IndexSet, } #[derive(Debug)] struct ChannelIdStream { @@ -122,13 +122,13 @@ struct ControllerEphemeral { poly_n: Option, poly_ps: Vec, mono_ps: Vec, - ekey_to_holder: HashMap, + port_to_holder: HashMap, } #[derive(Debug)] struct ControllerFamily { - parent_ekey: Option, - children_ekeys: Vec, + parent_port: Option, + children_ports: Vec, } #[derive(Debug)] @@ -149,12 +149,12 @@ enum PolyId { pub(crate) enum SubtreeId { PolyN, PolyP { index: usize }, - ChildController { ekey: Key }, + ChildController { port: Port }, } pub(crate) struct MonoPContext<'a> { inner: &'a mut ControllerInner, - ekeys: &'a mut HashSet, + ports: &'a mut HashSet, mono_ps: &'a mut Vec, } pub(crate) struct PolyPContext<'a> { @@ -171,9 +171,9 @@ impl PolyPContext<'_> { } struct BranchPContext<'m, 'r> { m_ctx: PolyPContext<'m>, - ekeys: &'r HashSet, + ports: &'r HashSet, predicate: &'r Predicate, - inbox: &'r HashMap, + inbox: &'r HashMap, } #[derive(Default)] @@ -187,7 +187,7 @@ pub(crate) struct SolutionStorage { trait Messengerlike { fn get_state_mut(&mut self) -> &mut MessengerState; - fn get_endpoint_mut(&mut self, eekey: Key) -> &mut Endpoint; + fn get_endpoint_mut(&mut self, eport: Port) -> &mut Endpoint; fn delay(&mut self, received: ReceivedMsg) { self.get_state_mut().delayed.push(received); @@ -197,7 +197,7 @@ trait Messengerlike { undelayed.extend(delayed.drain(..)) } - fn send(&mut self, to: Key, msg: Msg) -> Result<(), EndpointErr> { + fn send(&mut self, to: Port, msg: Msg) -> Result<(), EndpointErr> { self.get_endpoint_mut(to).send(msg) } @@ -210,15 +210,15 @@ trait Messengerlike { loop { // polled_undrained may not be empty - while let Some(eekey) = self.get_state_mut().polled_undrained.pop() { + while let Some(eport) = self.get_state_mut().polled_undrained.pop() { if let Some(msg) = self - .get_endpoint_mut(eekey) + .get_endpoint_mut(eport) .recv() - .map_err(|e| MessengerRecvErr::EndpointErr(eekey, e))? + .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))? { // this endpoint MAY still have messages! check again in future - self.get_state_mut().polled_undrained.insert(eekey); - return Ok(Some(ReceivedMsg { recipient: eekey, msg })); + self.get_state_mut().polled_undrained.insert(eport); + return Ok(Some(ReceivedMsg { recipient: eport, msg })); } } @@ -226,7 +226,7 @@ trait Messengerlike { match state.poll_events(deadline) { Ok(()) => { for e in state.events.iter() { - state.polled_undrained.insert(Key::from_token(e.token())); + state.polled_undrained.insert(Port::from_token(e.token())); } } Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed), @@ -242,15 +242,15 @@ trait Messengerlike { loop { // polled_undrained may not be empty - while let Some(eekey) = self.get_state_mut().polled_undrained.pop() { + while let Some(eport) = self.get_state_mut().polled_undrained.pop() { if let Some(msg) = self - .get_endpoint_mut(eekey) + .get_endpoint_mut(eport) .recv() - .map_err(|e| MessengerRecvErr::EndpointErr(eekey, e))? + .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))? { // this endpoint MAY still have messages! check again in future - self.get_state_mut().polled_undrained.insert(eekey); - return Ok(ReceivedMsg { recipient: eekey, msg }); + self.get_state_mut().polled_undrained.insert(eport); + return Ok(ReceivedMsg { recipient: eport, msg }); } } @@ -261,7 +261,7 @@ trait Messengerlike { .poll(&mut state.events, None) .map_err(|_| MessengerRecvErr::PollingFailed)?; for e in state.events.iter() { - state.polled_undrained.insert(Key::from_token(e.token())); + state.polled_undrained.insert(Port::from_token(e.token())); } } } @@ -293,38 +293,33 @@ impl From for ConnectErr { ConnectErr::MessengerRecvErr(e) } } -// impl From for MessengerRecvErr { -// fn from(e: EndpointErr) -> MessengerRecvErr { -// MessengerRecvErr::EndpointErr(e) -// } -// } impl Default for Arena { fn default() -> Self { Self { storage: vec![] } } } impl Arena { - pub fn alloc(&mut self, t: T) -> Key { + pub fn alloc(&mut self, t: T) -> Port { self.storage.push(t); - Key::from_raw(self.storage.len() - 1) + Port::from_raw(self.storage.len() - 1) } - pub fn get(&self, key: Key) -> Option<&T> { + pub fn get(&self, key: Port) -> Option<&T> { self.storage.get(key.to_raw() as usize) } - pub fn get_mut(&mut self, key: Key) -> Option<&mut T> { + pub fn get_mut(&mut self, key: Port) -> Option<&mut T> { self.storage.get_mut(key.to_raw() as usize) } - pub fn type_convert(self, f: impl FnMut((Key, T)) -> X) -> Arena { + pub fn type_convert(self, f: impl FnMut((Port, T)) -> X) -> Arena { Arena { storage: self.keyspace().zip(self.storage.into_iter()).map(f).collect() } } - pub fn iter(&self) -> impl Iterator { + pub fn iter(&self) -> impl Iterator { self.keyspace().zip(self.storage.iter()) } pub fn len(&self) -> usize { self.storage.len() } - pub fn keyspace(&self) -> impl Iterator { - (0..self.storage.len()).map(Key::from_raw) + pub fn keyspace(&self) -> impl Iterator { + (0..self.storage.len()).map(Port::from_raw) } } diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 6296c5feb66807ea14d489d7686bdbe5bbc30c3d..a814dd78afe58d847588da5a9615a1d04ae2216a 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -25,7 +25,7 @@ impl Controller { bound_proto_interface: &[(PortBinding, Polarity)], logger: &mut String, deadline: Instant, - ) -> Result<(Self, Vec<(Key, Polarity)>), ConnectErr> { + ) -> Result<(Self, Vec<(Port, Polarity)>), ConnectErr> { use ConnectErr::*; log!(logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major); @@ -33,15 +33,15 @@ impl Controller { let mut channel_id_stream = ChannelIdStream::new(major); let mut endpoint_ext_todos = Arena::default(); - let mut ekeys_native = vec![]; - let mut ekeys_proto = vec![]; - let mut ekeys_network = vec![]; + let mut ports_native = vec![]; + let mut ports_proto = vec![]; + let mut ports_network = vec![]; let mut native_interface = vec![]; /* 1. - allocate an EndpointExtTodo for every native and interface port - - store all the resulting keys in two keylists for the interfaces of the native and proto components + - store all the resulting ports in two portlists for the interfaces of the native and proto components native: [a, c, f] | | | | | | @@ -53,44 +53,44 @@ impl Controller { match binding { PortBinding::Native => { let channel_id = channel_id_stream.next(); - let ([ekey_native, ekey_proto], native_polarity) = { + let ([port_native, port_proto], native_polarity) = { let [p, g] = Endpoint::new_memory_pair(); - let mut endpoint_to_key = |endpoint, polarity| { + let mut endpoint_to_port = |endpoint, polarity| { endpoint_ext_todos.alloc(EndpointExtTodo::Finished(EndpointExt { endpoint, info: EndpointInfo { polarity, channel_id }, })) }; - let pkey = endpoint_to_key(p, Putter); - let gkey = endpoint_to_key(g, Getter); - let key_pair = match polarity { - Putter => [gkey, pkey], - Getter => [pkey, gkey], + let pport = endpoint_to_port(p, Putter); + let gport = endpoint_to_port(g, Getter); + let port_pair = match polarity { + Putter => [gport, pport], + Getter => [pport, gport], }; - (key_pair, !polarity) + (port_pair, !polarity) }; - native_interface.push((ekey_native, native_polarity)); - ekeys_native.push(ekey_native); - ekeys_proto.push(ekey_proto); + native_interface.push((port_native, native_polarity)); + ports_native.push(port_native); + ports_proto.push(port_proto); } PortBinding::Passive(addr) => { let channel_id = channel_id_stream.next(); - let ekey_proto = endpoint_ext_todos.alloc(EndpointExtTodo::PassiveAccepting { + let port_proto = endpoint_ext_todos.alloc(EndpointExtTodo::PassiveAccepting { addr, info: EndpointInfo { polarity, channel_id }, listener: TcpListener::bind(&addr).map_err(|_| BindFailed(addr))?, }); - ekeys_network.push(ekey_proto); - ekeys_proto.push(ekey_proto); + ports_network.push(port_proto); + ports_proto.push(port_proto); } PortBinding::Active(addr) => { - let ekey_proto = endpoint_ext_todos.alloc(EndpointExtTodo::ActiveConnecting { + let port_proto = endpoint_ext_todos.alloc(EndpointExtTodo::ActiveConnecting { addr, polarity, stream: TcpStream::connect(&addr).unwrap(), }); - ekeys_network.push(ekey_proto); - ekeys_proto.push(ekey_proto); + ports_network.push(port_proto); + ports_proto.push(port_proto); } } } @@ -100,10 +100,10 @@ impl Controller { let (mut messenger_state, mut endpoint_exts) = Self::finish_endpoint_ext_todos(major, logger, endpoint_ext_todos, deadline)?; - let n_mono = MonoN { ekeys: ekeys_native.into_iter().collect(), result: None }; + let n_mono = MonoN { ports: ports_native.into_iter().collect(), result: None }; let p_monos = vec![MonoP { - state: protocol_description.new_main_component(main_component, &ekeys_proto), - ekeys: ekeys_proto.into_iter().collect(), + state: protocol_description.new_main_component(main_component, &ports_proto), + ports: ports_proto.into_iter().collect(), }]; // 6. Become a node in a sink tree, computing {PARENT, CHILDREN} from {NEIGHBORS} @@ -112,7 +112,7 @@ impl Controller { logger, &mut endpoint_exts, &mut messenger_state, - ekeys_network, + ports_network, deadline, )?; @@ -179,22 +179,22 @@ impl Controller { }; // 2. Register all EndpointExtTodos with ms.poll. each has one of {Endpoint, TcpStream, TcpListener} - // 3. store the keyset of EndpointExtTodos which are not Finished in `to_finish`. + // 3. store the portset of EndpointExtTodos which are not Finished in `to_finish`. let mut to_finish = HashSet::<_>::default(); log!(logger, "endpoint_ext_todos len {:?}", endpoint_ext_todos.len()); - for (key, t) in endpoint_ext_todos.iter() { - let token = key.to_token(); + for (port, t) in endpoint_ext_todos.iter() { + let token = port.to_token(); match t { ActiveRecving { .. } | PassiveConnecting { .. } => unreachable!(), Finished(EndpointExt { endpoint, .. }) => { ms.poll.register(endpoint, token, ready_r, edge) } ActiveConnecting { stream, .. } => { - to_finish.insert(key); + to_finish.insert(port); ms.poll.register(stream, token, ready_w, edge) } PassiveAccepting { listener, .. } => { - to_finish.insert(key); + to_finish.insert(port); ms.poll.register(listener, token, ready_r, edge) } } @@ -215,11 +215,11 @@ impl Controller { for event in ms.events.iter() { log!(logger, "event {:#?}", event); let token = event.token(); - let ekey = Key::from_token(token); - let entry = endpoint_ext_todos.get_mut(ekey).unwrap(); + let port = Port::from_token(token); + let entry = endpoint_ext_todos.get_mut(port).unwrap(); match entry { Finished(_) => { - polled_undrained_later.insert(ekey); + polled_undrained_later.insert(port); } PassiveAccepting { addr, listener, .. } => { log!(logger, "{:03?} start PassiveAccepting...", major); @@ -255,7 +255,7 @@ impl Controller { }); res?; log!(logger, "{:03?} ... end PassiveConnecting", major); - assert!(to_finish.remove(&ekey)); + assert!(to_finish.remove(&port)); } ActiveConnecting { addr, stream, .. } => { log!(logger, "{:03?} start ActiveConnecting...", major); @@ -298,11 +298,11 @@ impl Controller { Finished(EndpointExt { info, endpoint }) }] }); - ms.polled_undrained.insert(ekey); - assert!(to_finish.remove(&ekey)); + ms.polled_undrained.insert(port); + assert!(to_finish.remove(&port)); break 'recv_loop; } else { - ms.delayed.push(ReceivedMsg { recipient: ekey, msg }); + ms.delayed.push(ReceivedMsg { recipient: port, msg }); } } log!(logger, "{:03?} ... end ActiveRecving", major); @@ -310,8 +310,8 @@ impl Controller { } } } - for ekey in polled_undrained_later { - ms.polled_undrained.insert(ekey); + for port in polled_undrained_later { + ms.polled_undrained.insert(port); } let endpoint_exts = endpoint_ext_todos.type_convert(|(_, todo)| match todo { Finished(endpoint_ext) => endpoint_ext, @@ -325,7 +325,7 @@ impl Controller { logger: &mut String, endpoint_exts: &mut Arena, messenger_state: &mut MessengerState, - neighbors: Vec, + neighbors: Vec, deadline: Instant, ) -> Result { use {ConnectErr::*, Msg::SetupMsg as S, SetupMsg::*}; @@ -337,12 +337,12 @@ impl Controller { fn get_state_mut(&mut self) -> &mut MessengerState { self.0 } - fn get_endpoint_mut(&mut self, ekey: Key) -> &mut Endpoint { - &mut self.1.get_mut(ekey).expect("OUT OF BOUNDS").endpoint + fn get_endpoint_mut(&mut self, port: Port) -> &mut Endpoint { + &mut self.1.get_mut(port).expect("OUT OF BOUNDS").endpoint } } - // 1. broadcast my ID as the first echo. await reply from all in net_keylist + // 1. broadcast my ID as the first echo. await reply from all in net_portlist let echo = S(LeaderEcho { maybe_leader: major }); let mut awaiting = IndexSet::with_capacity(neighbors.len()); for &n in neighbors.iter() { @@ -353,7 +353,7 @@ impl Controller { // 2. Receive incoming replies. whenever a higher-id echo arrives, // adopt it as leader, sender as parent, and reset the await set. - let mut parent: Option = None; + let mut parent: Option = None; let mut my_leader = major; messenger.undelay_all(); 'echo_loop: while !awaiting.is_empty() || parent.is_some() { @@ -470,7 +470,7 @@ impl Controller { _ => messenger.delay(ReceivedMsg { recipient, msg }), } } - Ok(ControllerFamily { parent_ekey: parent, children_ekeys: children }) + Ok(ControllerFamily { parent_port: parent, children_ports: children }) } } @@ -478,7 +478,7 @@ impl Messengerlike for Controller { fn get_state_mut(&mut self) -> &mut MessengerState { &mut self.inner.messenger_state } - fn get_endpoint_mut(&mut self, ekey: Key) -> &mut Endpoint { - &mut self.inner.endpoint_exts.get_mut(ekey).expect("OUT OF BOUNDS").endpoint + fn get_endpoint_mut(&mut self, port: Port) -> &mut Endpoint { + &mut self.inner.endpoint_exts.get_mut(port).expect("OUT OF BOUNDS").endpoint } }