diff --git a/src/runtime/setup2.rs b/src/runtime/setup2.rs
index bd6281a1ae2c776f19a4859f0958fbee6265004b..72524062d4f1c1b0b9470717abc0f9b7fae7a475 100644
--- a/src/runtime/setup2.rs
+++ b/src/runtime/setup2.rs
@@ -24,38 +24,23 @@ impl Connector {
         surplus_sockets: u16,
     ) -> Self {
         Self {
-            logger,
             proto_description,
+            proto_components: Default::default(),
+            logger,
             id_manager: IdManager::new(controller_id),
             native_ports: Default::default(),
-            proto_components: Default::default(),
             port_info: Default::default(),
             phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets },
         }
     }
-    pub fn add_port_pair(&mut self) -> [PortId; 2] {
-        let route = Route::LocalComponent(LocalComponentId::Native);
-        let [o, i] = [self.id_manager.next_port(), self.id_manager.next_port()];
-        self.native_ports.insert(o);
-        self.native_ports.insert(i);
-        // {polarity, peer, route} known. {} unknown.
-        self.port_info.polarities.insert(o, Putter);
-        self.port_info.polarities.insert(i, Getter);
-        self.port_info.peers.insert(o, i);
-        self.port_info.peers.insert(i, o);
-        self.port_info.routes.insert(o, route);
-        self.port_info.routes.insert(i, route);
-        log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
-        [o, i]
-    }
-    pub fn add_net_port(
+    pub fn new_net_port(
         &mut self,
         polarity: Polarity,
         endpoint_setup: EndpointSetup,
     ) -> Result<PortId, ()> {
         match &mut self.phased {
             ConnectorPhased::Setup { endpoint_setups, .. } => {
-                let p = self.id_manager.next_port();
+                let p = self.id_manager.new_port_id();
                 self.native_ports.insert(p);
                 // {polarity, route} known. {peer} unknown.
                 self.port_info.polarities.insert(p, polarity);
@@ -72,7 +57,9 @@ impl Connector {
         identifier: &[u8],
         ports: &[PortId],
     ) -> Result<(), AddComponentError> {
+        // called by the USER. moves ports owned by the NATIVE
         use AddComponentError::*;
+        // 1. check if this is OK
         let polarities = self.proto_description.component_polarities(identifier)?;
         if polarities.len() != ports.len() {
             return Err(WrongNumberOfParamaters { expected: polarities.len() });
@@ -85,19 +72,21 @@ impl Connector {
                 return Err(WrongPortPolarity { port: *port, expected_polarity });
             }
         }
-        // ok!
-        let state = self.proto_description.new_main_component(identifier, ports);
-        let proto_component = ProtoComponent { ports: ports.iter().copied().collect(), state };
-        let proto_component_index = self.proto_components.len();
-        self.proto_components.push(proto_component);
+        // 3. remove ports from old component & update port->route
+        let new_id = self.id_manager.new_proto_component_id();
         for port in ports.iter() {
-            if let Polarity::Getter = *self.port_info.polarities.get(port).unwrap() {
-                self.port_info.routes.insert(
-                    *port,
-                    Route::LocalComponent(LocalComponentId::Proto { index: proto_component_index }),
-                );
-            }
+            self.port_info
+                .routes
+                .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id)));
         }
+        // 4. add new component
+        self.proto_components.insert(
+            new_id,
+            ProtoComponent {
+                state: self.proto_description.new_main_component(identifier, ports),
+                ports: ports.iter().copied().collect(),
+            },
+        );
         Ok(())
     }
     pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> {
@@ -110,10 +99,12 @@ impl Connector {
                 log!(self.logger, "Call to connecting in setup state. Timeout {:?}", timeout);
                 let deadline = Instant::now() + timeout;
                 // connect all endpoints in parallel; send and receive peer ids through ports
-                let mut endpoint_manager = {
-                    let Self { logger, port_info, .. } = self;
-                    new_endpoint_manager(&mut **logger, endpoint_setups, port_info, deadline)?
-                };
+                let mut endpoint_manager = new_endpoint_manager(
+                    &mut *self.logger,
+                    endpoint_setups,
+                    &mut self.port_info,
+                    deadline,
+                )?;
                 log!(
                     self.logger,
                     "Successfully connected {} endpoints",
@@ -129,13 +120,12 @@ impl Connector {
                 log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood);
                 // TODO session optimization goes here
                 self.phased = ConnectorPhased::Communication {
+                    round_index: 0,
                     endpoint_manager,
                     neighborhood,
                     mem_inbox: Default::default(),
-                    native_actor: NativeActor::Nonsync {
-                        sync_result_branch: None,
-                        next_batches: vec![SyncBatch::default()],
-                    },
+                    native_batches: vec![Default::default()],
+                    round_result: Ok(None),
                 };
                 Ok(())
             }
@@ -317,7 +307,7 @@ fn init_neighborhood(
     let mut my_leader = controller_id;
     em.undelay_all();
     'echo_loop: while !awaiting.is_empty() || parent.is_some() {
-        let (index, msg) = em.try_recv_any(logger, deadline).map_err(drop)?;
+        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
         log!(logger, "GOT from index {:?} msg {:?}", &index, &msg);
         match msg {
             S(LeaderAnnounce { leader }) => {
@@ -331,7 +321,7 @@ fn init_neighborhood(
             S(LeaderEcho { maybe_leader }) => {
                 use Ordering::*;
                 match maybe_leader.cmp(&my_leader) {
-                    Less => { /* ignore */ }
+                    Less => { /* ignore this wave */ }
                     Equal => {
                         awaiting.remove(&index);
                         if awaiting.is_empty() {
@@ -339,7 +329,7 @@ fn init_neighborhood(
                                 // return the echo to my parent
                                 em.send_to(p, &S(LeaderEcho { maybe_leader }))?;
                             } else {
-                                // DECIDE!
+                                // wave completed!
                                 break 'echo_loop;
                             }
                         }
@@ -358,6 +348,7 @@ fn init_neighborhood(
                         } else {
                             for (index2, ee) in em.endpoint_exts.iter_mut().enumerate() {
                                 if index2 == index {
+                                    // don't propagate echo to my parent
                                     continue;
                                 }
                                 log!(logger, "repeating echo {:?} to {:?}", &echo, index2);
@@ -408,7 +399,7 @@ fn init_neighborhood(
     log!(logger, "delayed {:?} undelayed {:?}", &em.delayed_messages, &em.undelayed_messages);
     while !awaiting.is_empty() {
         log!(logger, "awaiting {:?}", &awaiting);
-        let (index, msg) = em.try_recv_any(logger, deadline).map_err(drop)?;
+        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
         match msg {
             S(YouAreMyParent) => {
                 assert!(awaiting.remove(&index));
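
Note on the init_neighborhood hunks: the function elects a leader by echo waves over the endpoint graph. Each connector proposes a leader id, ignores smaller proposals (the `Less` arm), echoes matching ones back toward its parent, and adopts and re-echoes larger ones to its other neighbours; `YouAreMyParent` messages then confirm the resulting spanning tree, so the largest proposed id ends up as leader. The snippet below is a minimal, single-process sketch of that wave idea only, not the crate's code: `Proposal`, `elect`, and the synchronous queue are invented for illustration, whereas the real implementation is asynchronous, tracks an `awaiting` set, and exchanges `LeaderAnnounce`/`YouAreMyParent` messages over sockets.

use std::collections::VecDeque;

// One proposal in flight: `from` tells `to` that `leader` might be the leader.
#[derive(Clone, Copy, Debug)]
struct Proposal {
    leader: usize,
    from: usize,
    to: usize,
}

// Returns (elected leader, parent edge per node; None for the leader itself).
fn elect(adjacency: &[Vec<usize>]) -> (usize, Vec<Option<usize>>) {
    let n = adjacency.len();
    let mut best: Vec<usize> = (0..n).collect(); // best leader id seen by each node
    let mut parent: Vec<Option<usize>> = vec![None; n];
    let mut queue: VecDeque<Proposal> = VecDeque::new();
    // every node starts its own wave, proposing itself
    for v in 0..n {
        for &w in &adjacency[v] {
            queue.push_back(Proposal { leader: v, from: v, to: w });
        }
    }
    while let Some(Proposal { leader, from, to }) = queue.pop_front() {
        if leader > best[to] {
            // adopt the larger proposal and remember which edge it arrived on
            best[to] = leader;
            parent[to] = Some(from);
            // re-echo to every neighbour except the sender
            for &w in &adjacency[to] {
                if w != from {
                    queue.push_back(Proposal { leader, from: to, to: w });
                }
            }
        }
        // smaller or equal proposals are dropped, like the `Less` arm above
    }
    let leader = best[0]; // all entries agree once the queue drains (connected graph)
    parent[leader] = None;
    (leader, parent)
}

fn main() {
    // 0 - 1 - 2
    //     |
    //     3
    let adjacency = vec![vec![1], vec![0, 2, 3], vec![1], vec![1]];
    let (leader, parents) = elect(&adjacency);
    println!("leader = {}, parents = {:?}", leader, parents);
}

Running this on the small four-node graph in main prints leader 3 with parent edges forming a tree rooted at node 3, which mirrors how the connector with the largest proposed id survives the echo phase before the announce phase distributes the result.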