diff --git a/src/runtime/setup2.rs b/src/runtime/setup2.rs index bc747db7960b1b734461fd8007b59628e2468f98..bd6281a1ae2c776f19a4859f0958fbee6265004b 100644 --- a/src/runtime/setup2.rs +++ b/src/runtime/setup2.rs @@ -29,29 +29,37 @@ impl Connector { id_manager: IdManager::new(controller_id), native_ports: Default::default(), proto_components: Default::default(), - outp_to_inp: Default::default(), - inp_to_route: Default::default(), + port_info: Default::default(), phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets }, } } pub fn add_port_pair(&mut self) -> [PortId; 2] { - let o = self.id_manager.next_port(); - let i = self.id_manager.next_port(); - self.outp_to_inp.insert(o, i); - self.inp_to_route.insert(i, InpRoute::NativeComponent); + let route = Route::LocalComponent(LocalComponentId::Native); + let [o, i] = [self.id_manager.next_port(), self.id_manager.next_port()]; self.native_ports.insert(o); self.native_ports.insert(i); + // {polarity, peer, route} known. {} unknown. + self.port_info.polarities.insert(o, Putter); + self.port_info.polarities.insert(i, Getter); + self.port_info.peers.insert(o, i); + self.port_info.peers.insert(i, o); + self.port_info.routes.insert(o, route); + self.port_info.routes.insert(i, route); log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); [o, i] } - pub fn add_net_port(&mut self, endpoint_setup: EndpointSetup) -> Result { + pub fn add_net_port( + &mut self, + polarity: Polarity, + endpoint_setup: EndpointSetup, + ) -> Result { match &mut self.phased { ConnectorPhased::Setup { endpoint_setups, .. } => { let p = self.id_manager.next_port(); self.native_ports.insert(p); - if endpoint_setup.polarity == Getter { - self.inp_to_route.insert(p, InpRoute::NativeComponent); - } + // {polarity, route} known. {peer} unknown. + self.port_info.polarities.insert(p, polarity); + self.port_info.routes.insert(p, Route::LocalComponent(LocalComponentId::Native)); log!(self.logger, "Added net port {:?} with info {:?} ", p, &endpoint_setup); endpoint_setups.push((p, endpoint_setup)); Ok(p) @@ -59,23 +67,6 @@ impl Connector { ConnectorPhased::Communication { .. } => Err(()), } } - fn check_polarity(&self, port: &PortId) -> Polarity { - if let ConnectorPhased::Setup { endpoint_setups, .. } = &self.phased { - for (setup_port, EndpointSetup { polarity, .. }) in endpoint_setups.iter() { - if setup_port == port { - // special case. this port's polarity isn't reflected by - // self.inp_to_route or self.outp_to_inp, because its still not paired to a peer - return *polarity; - } - } - } - if self.outp_to_inp.contains_key(port) { - Polarity::Putter - } else { - assert!(self.inp_to_route.contains_key(port)); - Polarity::Getter - } - } pub fn add_component( &mut self, identifier: &[u8], @@ -90,7 +81,7 @@ impl Connector { if !self.native_ports.contains(port) { return Err(UnknownPort(*port)); } - if expected_polarity != self.check_polarity(port) { + if expected_polarity != *self.port_info.polarities.get(port).unwrap() { return Err(WrongPortPolarity { port: *port, expected_polarity }); } } @@ -100,9 +91,11 @@ impl Connector { let proto_component_index = self.proto_components.len(); self.proto_components.push(proto_component); for port in ports.iter() { - if let Polarity::Getter = self.check_polarity(port) { - self.inp_to_route - .insert(*port, InpRoute::ProtoComponent { index: proto_component_index }); + if let Polarity::Getter = *self.port_info.polarities.get(port).unwrap() { + self.port_info.routes.insert( + *port, + Route::LocalComponent(LocalComponentId::Proto { index: proto_component_index }), + ); } } Ok(()) @@ -118,22 +111,8 @@ impl Connector { let deadline = Instant::now() + timeout; // connect all endpoints in parallel; send and receive peer ids through ports let mut endpoint_manager = { - let Self { outp_to_inp, inp_to_route, logger, .. } = self; - let logical_channel_callback = |lci: LogicalChannelInfo| { - if let Putter = lci.local_polarity { - outp_to_inp.insert(lci.local_port, lci.peer_port); - inp_to_route.insert( - lci.peer_port, - InpRoute::Endpoint { index: lci.endpoint_index }, - ); - } - }; - new_endpoint_manager( - &mut **logger, - endpoint_setups, - logical_channel_callback, - deadline, - )? + let Self { logger, port_info, .. } = self; + new_endpoint_manager(&mut **logger, endpoint_setups, port_info, deadline)? }; log!( self.logger, @@ -153,6 +132,10 @@ impl Connector { endpoint_manager, neighborhood, mem_inbox: Default::default(), + native_actor: NativeActor::Nonsync { + sync_result_branch: None, + next_batches: vec![SyncBatch::default()], + }, }; Ok(()) } @@ -163,14 +146,13 @@ impl Connector { fn new_endpoint_manager( logger: &mut dyn Logger, endpoint_setups: &[(PortId, EndpointSetup)], - mut logical_channel_callback: impl FnMut(LogicalChannelInfo), + port_info: &mut PortInfo, deadline: Instant, ) -> Result { //////////////////////////////////////////// const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); struct Todo { todo_endpoint: TodoEndpoint, - endpoint_setup: EndpointSetup, local_port: PortId, sent_local_port: bool, // true <-> I've sent my local port recv_peer_port: Option, // Some(..) <-> I've received my peer's port @@ -194,20 +176,15 @@ fn new_endpoint_manager( poll.registry().register(&mut listener, token, BOTH).unwrap(); TodoEndpoint::Listener(listener) }; - Ok(Todo { - todo_endpoint, - endpoint_setup: endpoint_setup.clone(), - local_port, - sent_local_port: false, - recv_peer_port: None, - }) + Ok(Todo { todo_endpoint, local_port, sent_local_port: false, recv_peer_port: None }) }; //////////////////////////////////////////// // 1. Start to construct EndpointManager let mut poll = Poll::new().map_err(drop)?; let mut events = Events::with_capacity(64); - let mut undrained_endpoints = IndexSet::::default(); + let mut polled_undrained = IndexSet::::default(); + let mut delayed_messages = vec![]; // 2. create a registered (TcpListener/Endpoint) for passive / active respectively let mut todos = endpoint_setups @@ -234,7 +211,7 @@ fn new_endpoint_manager( let (mut stream, peer_addr) = listener.accept().map_err(drop)?; poll.registry().deregister(listener).unwrap(); poll.registry().register(&mut stream, token, BOTH).unwrap(); - log!(logger, "Endpoint({}) accepted a connection from {:?}", index, peer_addr); + log!(logger, "Endpoint[{}] accepted a connection from {:?}", index, peer_addr); let endpoint = Endpoint { stream, inbox: vec![] }; todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint); } @@ -242,35 +219,49 @@ fn new_endpoint_manager( Todo { todo_endpoint: TodoEndpoint::Endpoint(endpoint), local_port, - endpoint_setup, sent_local_port, recv_peer_port, + .. } => { if !setup_incomplete.contains(&index) { continue; } + let local_polarity = *port_info.polarities.get(local_port).unwrap(); if event.is_writable() && !*sent_local_port { - let msg = - MyPortInfo { polarity: endpoint_setup.polarity, port: *local_port }; + let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo { + polarity: local_polarity, + port: *local_port, + })); endpoint.send(&msg)?; - log!(logger, "endpoint[{}] sent peer info {:?}", index, &msg); + log!(logger, "endpoint[{}] sent msg {:?}", index, &msg); *sent_local_port = true; } if event.is_readable() && recv_peer_port.is_none() { - undrained_endpoints.insert(index); - if let Some(peer_port_info) = - endpoint.try_recv::().map_err(drop)? - { - log!(logger, "endpoint[{}] got peer info {:?}", index, peer_port_info); - assert!(peer_port_info.polarity != endpoint_setup.polarity); - *recv_peer_port = Some(peer_port_info.port); - let lci = LogicalChannelInfo { - local_port: *local_port, - peer_port: peer_port_info.port, - local_polarity: endpoint_setup.polarity, - endpoint_index: index, - }; - logical_channel_callback(lci); + let maybe_msg = endpoint.try_recv().map_err(drop)?; + if maybe_msg.is_some() && !endpoint.inbox.is_empty() { + polled_undrained.insert(index); + } + match maybe_msg { + None => {} // msg deserialization incomplete + Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => { + log!(logger, "endpoint[{}] got peer info {:?}", index, peer_info); + assert!(peer_info.polarity != local_polarity); + *recv_peer_port = Some(peer_info.port); + // 1. finally learned the peer of this port! + port_info.peers.insert(*local_port, peer_info.port); + // 2. learned the info of this peer port + port_info.polarities.insert(peer_info.port, peer_info.polarity); + port_info.peers.insert(peer_info.port, *local_port); + port_info.routes.insert(peer_info.port, Route::Endpoint { index }); + } + Some(inappropriate_msg) => { + log!( + logger, + "delaying msg {:?} during channel setup phase", + inappropriate_msg + ); + delayed_messages.push((index, inappropriate_msg)); + } } } if *sent_local_port && recv_peer_port.is_some() { @@ -296,9 +287,9 @@ fn new_endpoint_manager( Ok(EndpointManager { poll, events, - undrained_endpoints, + polled_undrained, + undelayed_messages: delayed_messages, // no longer delayed delayed_messages: Default::default(), - undelayed_messages: Default::default(), endpoint_exts, }) } @@ -309,11 +300,7 @@ fn init_neighborhood( em: &mut EndpointManager, deadline: Instant, ) -> Result { - //////////////////////////////////////////// - use Msg::SetupMsg as S; - use SetupMsg::*; - //////////////////////////////////////////// - + use {Msg::SetupMsg as S, SetupMsg::*}; log!(logger, "beginning neighborhood construction"); // 1. broadcast my ID as the first echo. await reply from all neighbors let echo = S(LeaderEcho { maybe_leader: controller_id }); @@ -330,7 +317,7 @@ fn init_neighborhood( let mut my_leader = controller_id; em.undelay_all(); 'echo_loop: while !awaiting.is_empty() || parent.is_some() { - let (index, msg) = em.try_recv_any(deadline).map_err(drop)?; + let (index, msg) = em.try_recv_any(logger, deadline).map_err(drop)?; log!(logger, "GOT from index {:?} msg {:?}", &index, &msg); match msg { S(LeaderAnnounce { leader }) => { @@ -381,7 +368,10 @@ fn init_neighborhood( } } } - inappropriate_msg => em.delayed_messages.push((index, inappropriate_msg)), + inappropriate_msg => { + log!(logger, "delaying msg {:?} during echo phase", inappropriate_msg); + em.delayed_messages.push((index, inappropriate_msg)) + } } } match parent { @@ -413,21 +403,27 @@ fn init_neighborhood( ee.endpoint.send(msg)?; } let mut children = Vec::default(); + log!(logger, "delayed {:?} undelayed {:?}", &em.delayed_messages, &em.undelayed_messages); em.undelay_all(); + log!(logger, "delayed {:?} undelayed {:?}", &em.delayed_messages, &em.undelayed_messages); while !awaiting.is_empty() { - let (index, msg) = em.try_recv_any(deadline).map_err(drop)?; + log!(logger, "awaiting {:?}", &awaiting); + let (index, msg) = em.try_recv_any(logger, deadline).map_err(drop)?; match msg { S(YouAreMyParent) => { assert!(awaiting.remove(&index)); children.push(index); } - S(SetupMsg::LeaderAnnounce { leader }) => { + S(LeaderAnnounce { leader }) => { assert!(awaiting.remove(&index)); assert!(leader == my_leader); assert!(Some(index) != parent); // they wouldn't send me this if they considered me their parent } - inappropriate_msg => em.delayed_messages.push((index, inappropriate_msg)), + inappropriate_msg => { + log!(logger, "delaying msg {:?} during echo-reply phase", inappropriate_msg); + em.delayed_messages.push((index, inappropriate_msg)); + } } } children.sort();