diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 6296c5feb66807ea14d489d7686bdbe5bbc30c3d..a814dd78afe58d847588da5a9615a1d04ae2216a 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -25,7 +25,7 @@ impl Controller { bound_proto_interface: &[(PortBinding, Polarity)], logger: &mut String, deadline: Instant, - ) -> Result<(Self, Vec<(Key, Polarity)>), ConnectErr> { + ) -> Result<(Self, Vec<(Port, Polarity)>), ConnectErr> { use ConnectErr::*; log!(logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major); @@ -33,15 +33,15 @@ impl Controller { let mut channel_id_stream = ChannelIdStream::new(major); let mut endpoint_ext_todos = Arena::default(); - let mut ekeys_native = vec![]; - let mut ekeys_proto = vec![]; - let mut ekeys_network = vec![]; + let mut ports_native = vec![]; + let mut ports_proto = vec![]; + let mut ports_network = vec![]; let mut native_interface = vec![]; /* 1. - allocate an EndpointExtTodo for every native and interface port - - store all the resulting keys in two keylists for the interfaces of the native and proto components + - store all the resulting ports in two portlists for the interfaces of the native and proto components native: [a, c, f] | | | | | | @@ -53,44 +53,44 @@ impl Controller { match binding { PortBinding::Native => { let channel_id = channel_id_stream.next(); - let ([ekey_native, ekey_proto], native_polarity) = { + let ([port_native, port_proto], native_polarity) = { let [p, g] = Endpoint::new_memory_pair(); - let mut endpoint_to_key = |endpoint, polarity| { + let mut endpoint_to_port = |endpoint, polarity| { endpoint_ext_todos.alloc(EndpointExtTodo::Finished(EndpointExt { endpoint, info: EndpointInfo { polarity, channel_id }, })) }; - let pkey = endpoint_to_key(p, Putter); - let gkey = endpoint_to_key(g, Getter); - let key_pair = match polarity { - Putter => [gkey, pkey], - Getter => [pkey, gkey], + let pport = endpoint_to_port(p, Putter); + let gport = endpoint_to_port(g, Getter); + let port_pair = match polarity { + Putter => [gport, pport], + Getter => [pport, gport], }; - (key_pair, !polarity) + (port_pair, !polarity) }; - native_interface.push((ekey_native, native_polarity)); - ekeys_native.push(ekey_native); - ekeys_proto.push(ekey_proto); + native_interface.push((port_native, native_polarity)); + ports_native.push(port_native); + ports_proto.push(port_proto); } PortBinding::Passive(addr) => { let channel_id = channel_id_stream.next(); - let ekey_proto = endpoint_ext_todos.alloc(EndpointExtTodo::PassiveAccepting { + let port_proto = endpoint_ext_todos.alloc(EndpointExtTodo::PassiveAccepting { addr, info: EndpointInfo { polarity, channel_id }, listener: TcpListener::bind(&addr).map_err(|_| BindFailed(addr))?, }); - ekeys_network.push(ekey_proto); - ekeys_proto.push(ekey_proto); + ports_network.push(port_proto); + ports_proto.push(port_proto); } PortBinding::Active(addr) => { - let ekey_proto = endpoint_ext_todos.alloc(EndpointExtTodo::ActiveConnecting { + let port_proto = endpoint_ext_todos.alloc(EndpointExtTodo::ActiveConnecting { addr, polarity, stream: TcpStream::connect(&addr).unwrap(), }); - ekeys_network.push(ekey_proto); - ekeys_proto.push(ekey_proto); + ports_network.push(port_proto); + ports_proto.push(port_proto); } } } @@ -100,10 +100,10 @@ impl Controller { let (mut messenger_state, mut endpoint_exts) = Self::finish_endpoint_ext_todos(major, logger, endpoint_ext_todos, deadline)?; - let n_mono = MonoN { ekeys: ekeys_native.into_iter().collect(), result: None }; + let n_mono = MonoN { ports: ports_native.into_iter().collect(), result: None }; let p_monos = vec![MonoP { - state: protocol_description.new_main_component(main_component, &ekeys_proto), - ekeys: ekeys_proto.into_iter().collect(), + state: protocol_description.new_main_component(main_component, &ports_proto), + ports: ports_proto.into_iter().collect(), }]; // 6. Become a node in a sink tree, computing {PARENT, CHILDREN} from {NEIGHBORS} @@ -112,7 +112,7 @@ impl Controller { logger, &mut endpoint_exts, &mut messenger_state, - ekeys_network, + ports_network, deadline, )?; @@ -179,22 +179,22 @@ impl Controller { }; // 2. Register all EndpointExtTodos with ms.poll. each has one of {Endpoint, TcpStream, TcpListener} - // 3. store the keyset of EndpointExtTodos which are not Finished in `to_finish`. + // 3. store the portset of EndpointExtTodos which are not Finished in `to_finish`. let mut to_finish = HashSet::<_>::default(); log!(logger, "endpoint_ext_todos len {:?}", endpoint_ext_todos.len()); - for (key, t) in endpoint_ext_todos.iter() { - let token = key.to_token(); + for (port, t) in endpoint_ext_todos.iter() { + let token = port.to_token(); match t { ActiveRecving { .. } | PassiveConnecting { .. } => unreachable!(), Finished(EndpointExt { endpoint, .. }) => { ms.poll.register(endpoint, token, ready_r, edge) } ActiveConnecting { stream, .. } => { - to_finish.insert(key); + to_finish.insert(port); ms.poll.register(stream, token, ready_w, edge) } PassiveAccepting { listener, .. } => { - to_finish.insert(key); + to_finish.insert(port); ms.poll.register(listener, token, ready_r, edge) } } @@ -215,11 +215,11 @@ impl Controller { for event in ms.events.iter() { log!(logger, "event {:#?}", event); let token = event.token(); - let ekey = Key::from_token(token); - let entry = endpoint_ext_todos.get_mut(ekey).unwrap(); + let port = Port::from_token(token); + let entry = endpoint_ext_todos.get_mut(port).unwrap(); match entry { Finished(_) => { - polled_undrained_later.insert(ekey); + polled_undrained_later.insert(port); } PassiveAccepting { addr, listener, .. } => { log!(logger, "{:03?} start PassiveAccepting...", major); @@ -255,7 +255,7 @@ impl Controller { }); res?; log!(logger, "{:03?} ... end PassiveConnecting", major); - assert!(to_finish.remove(&ekey)); + assert!(to_finish.remove(&port)); } ActiveConnecting { addr, stream, .. } => { log!(logger, "{:03?} start ActiveConnecting...", major); @@ -298,11 +298,11 @@ impl Controller { Finished(EndpointExt { info, endpoint }) }] }); - ms.polled_undrained.insert(ekey); - assert!(to_finish.remove(&ekey)); + ms.polled_undrained.insert(port); + assert!(to_finish.remove(&port)); break 'recv_loop; } else { - ms.delayed.push(ReceivedMsg { recipient: ekey, msg }); + ms.delayed.push(ReceivedMsg { recipient: port, msg }); } } log!(logger, "{:03?} ... end ActiveRecving", major); @@ -310,8 +310,8 @@ impl Controller { } } } - for ekey in polled_undrained_later { - ms.polled_undrained.insert(ekey); + for port in polled_undrained_later { + ms.polled_undrained.insert(port); } let endpoint_exts = endpoint_ext_todos.type_convert(|(_, todo)| match todo { Finished(endpoint_ext) => endpoint_ext, @@ -325,7 +325,7 @@ impl Controller { logger: &mut String, endpoint_exts: &mut Arena, messenger_state: &mut MessengerState, - neighbors: Vec, + neighbors: Vec, deadline: Instant, ) -> Result { use {ConnectErr::*, Msg::SetupMsg as S, SetupMsg::*}; @@ -337,12 +337,12 @@ impl Controller { fn get_state_mut(&mut self) -> &mut MessengerState { self.0 } - fn get_endpoint_mut(&mut self, ekey: Key) -> &mut Endpoint { - &mut self.1.get_mut(ekey).expect("OUT OF BOUNDS").endpoint + fn get_endpoint_mut(&mut self, port: Port) -> &mut Endpoint { + &mut self.1.get_mut(port).expect("OUT OF BOUNDS").endpoint } } - // 1. broadcast my ID as the first echo. await reply from all in net_keylist + // 1. broadcast my ID as the first echo. await reply from all in net_portlist let echo = S(LeaderEcho { maybe_leader: major }); let mut awaiting = IndexSet::with_capacity(neighbors.len()); for &n in neighbors.iter() { @@ -353,7 +353,7 @@ impl Controller { // 2. Receive incoming replies. whenever a higher-id echo arrives, // adopt it as leader, sender as parent, and reset the await set. - let mut parent: Option = None; + let mut parent: Option = None; let mut my_leader = major; messenger.undelay_all(); 'echo_loop: while !awaiting.is_empty() || parent.is_some() { @@ -470,7 +470,7 @@ impl Controller { _ => messenger.delay(ReceivedMsg { recipient, msg }), } } - Ok(ControllerFamily { parent_ekey: parent, children_ekeys: children }) + Ok(ControllerFamily { parent_port: parent, children_ports: children }) } } @@ -478,7 +478,7 @@ impl Messengerlike for Controller { fn get_state_mut(&mut self) -> &mut MessengerState { &mut self.inner.messenger_state } - fn get_endpoint_mut(&mut self, ekey: Key) -> &mut Endpoint { - &mut self.inner.endpoint_exts.get_mut(ekey).expect("OUT OF BOUNDS").endpoint + fn get_endpoint_mut(&mut self, port: Port) -> &mut Endpoint { + &mut self.inner.endpoint_exts.get_mut(port).expect("OUT OF BOUNDS").endpoint } }