diff --git a/src/macros.rs b/src/macros.rs
index ae727cd4f73bb7e2625a5aebd65d4ad43ac0d84e..a377585f4426a71b9c85d6181a502652a69e7891 100644
--- a/src/macros.rs
+++ b/src/macros.rs
@@ -17,8 +17,8 @@ macro_rules! log {
         // ignore
     }};
     ($logger:expr, $($arg:tt)*) => {{
-        // if let Some(w) = $logger.line_writer() {
-        //     let _ = writeln!(w, $($arg)*);
-        // }
+        if let Some(w) = $logger.line_writer() {
+            let _ = writeln!(w, $($arg)*);
+        }
     }};
 }
diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs
index 22834bbf2cc17c68fe44c83e4de4b5ccd0f0a8c2..3f693584689850f084c091987960dc22a891a1b5 100644
--- a/src/runtime/communication.rs
+++ b/src/runtime/communication.rs
@@ -37,7 +37,6 @@ struct SolutionStorage {
 }
 #[derive(Debug)]
 struct BranchingProtoComponent {
-    ports: HashSet<PortId>,
     branches: HashMap<Predicate, ProtoComponentBranch>,
 }
 #[derive(Debug, Clone)]
@@ -135,13 +134,12 @@ impl Connector {
     ) -> Result<&mut NativeBatch, PortOpError> {
         use PortOpError as Poe;
         let Self { unphased: cu, phased } = self;
-        if !cu.inner.native_ports.contains(&port) {
+        let info = cu.inner.current_state.port_info.get(&port).ok_or(Poe::UnknownPolarity)?;
+        if info.owner != cu.inner.native_component_id {
             return Err(Poe::PortUnavailable);
         }
-        match cu.inner.port_info.polarities.get(&port) {
-            Some(p) if *p == expect_polarity => {}
-            Some(_) => return Err(Poe::WrongPolarity),
-            None => return Err(Poe::UnknownPolarity),
+        if info.polarity != expect_polarity {
+            return Err(Poe::WrongPolarity);
         }
         match phased {
             ConnectorPhased::Setup { .. } => Err(Poe::NotConnected),
@@ -217,7 +215,7 @@ impl Connector {
         // iterate
         let mut branching_proto_components = HashMap::<ComponentId, BranchingProtoComponent>::default();
-        let mut unrun_components: Vec<(ComponentId, ProtoComponent)> = cu
+        let mut unrun_components: Vec<(ComponentId, ComponentState)> = cu
             .proto_components
             .iter()
             .map(|(&proto_id, proto)| (proto_id, proto.clone()))
             .collect();
@@ -235,13 +233,8 @@ impl Connector {
                 cu_inner: &mut cu.inner,
                 proto_component_id,
                 unrun_components: &mut unrun_components,
-                proto_component_ports: &mut cu
-                    .proto_components
-                    .get_mut(&proto_component_id)
-                    .unwrap() // unrun_components' keys originate from proto_components
-                    .ports,
             };
-            let blocker = component.state.nonsync_run(&mut ctx, &cu.proto_description);
+            let blocker = component.nonsync_run(&mut ctx, &cu.proto_description);
             log!(
                 cu.inner.logger,
                 "proto component {:?} ran to nonsync blocker {:?}",
@@ -283,7 +276,7 @@ impl Connector {
                 );
                 SolutionStorage::new(subtree_id_iter)
             },
-            spec_var_stream: cu.inner.id_manager.new_spec_var_stream(),
+            spec_var_stream: cu.inner.current_state.id_manager.new_spec_var_stream(),
            getter_buffer: Default::default(),
            deadline: timeout.map(|to| Instant::now() + to),
        };
@@ -305,16 +298,30 @@ impl Connector {
            let NativeBatch { to_get, to_put } = native_branch;
            let predicate = {
                let mut predicate = Predicate::default();
-                // assign trues for ports that fire
-                let firing_ports: HashSet<PortId> =
-                    to_get.iter().chain(to_put.keys()).copied().collect();
-                for &port in to_get.iter().chain(to_put.keys()) {
-                    let var = cu.inner.port_info.spec_var_for(port);
+                // all firing ports have SpecVal::FIRING
+                let firing_iter = to_get.iter().chain(to_put.keys()).copied();
+
+                log!(
+                    cu.inner.logger,
+                    "New native with firing ports {:?}",
+                    firing_iter.clone().collect::<Vec<_>>()
+                );
+                let firing_ports: HashSet<PortId> = firing_iter.clone().collect();
+                for port in firing_iter {
+                    let var = cu.inner.current_state.spec_var_for(port);
                     predicate.assigned.insert(var, SpecVal::FIRING);
                 }
-                // assign falses for all silent (not firing) ports
-                for &port in 
cu.inner.native_ports.difference(&firing_ports) { - let var = cu.inner.port_info.spec_var_for(port); + // all silent ports have SpecVal::SILENT + for (port, port_info) in cu.inner.current_state.port_info.iter() { + if port_info.owner != cu.inner.native_component_id { + // not my port + continue; + } + if firing_ports.contains(port) { + // this one is FIRING + continue; + } + let var = cu.inner.current_state.spec_var_for(*port); if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) { log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var); continue 'native_branches; @@ -339,6 +346,8 @@ impl Connector { &msg, putter ); + // sanity check + assert_eq!(Putter, cu.inner.current_state.port_info.get(&putter).unwrap().polarity); rctx.getter_buffer.putter_add(cu, putter, msg); } let branch = NativeBranch { index, gotten: Default::default(), to_get }; @@ -458,20 +467,14 @@ impl Connector { branching_proto_components.len() ); for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() { - let BranchingProtoComponent { ports, branches } = proto_component; + let BranchingProtoComponent { branches } = proto_component; // must reborrow to constrain the lifetime of pcb_temps to inside the loop let (swap, pcb_temps) = pcb_temps.reborrow().split_first_mut(); let (blocked, _pcb_temps) = pcb_temps.split_first_mut(); // initially, no components have .ended==true // drain from branches --> blocked let cd = CyclicDrainer::new(branches, swap.0, blocked.0); - BranchingProtoComponent::drain_branches_to_blocked( - cd, - cu, - rctx, - proto_component_id, - ports, - )?; + BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?; // swap the blocked branches back std::mem::swap(blocked.0, branches); if branches.is_empty() { @@ -500,34 +503,34 @@ impl Connector { rctx.getter_buffer.len() ); while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() { - log!(@MARK, cu.inner.logger, "handling payload msg"); - assert!(cu.inner.port_info.polarities.get(&getter).copied() == Some(Getter)); - let route = cu.inner.port_info.routes.get(&getter); + log!(@MARK, cu.inner.logger, "handling payload msg for getter {:?} of {:?}", getter, &send_payload_msg); + let getter_info = cu.inner.current_state.port_info.get(&getter).unwrap(); + let cid = getter_info.owner; + assert_eq!(Getter, getter_info.polarity); log!( cu.inner.logger, "Routing msg {:?} to {:?} via {:?}", &send_payload_msg, getter, - &route + &getter_info.route ); - match route { - None => log!(cu.inner.logger, "Delivery failed. 
Physical route unmapped!"), - Some(Route::UdpEndpoint { index }) => { + match getter_info.route { + Route::UdpEndpoint { index } => { let udp_endpoint_ext = - &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[*index]; + &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[index]; let SendPayloadMsg { predicate, payload } = send_payload_msg; log!(cu.inner.logger, "Delivering to udp endpoint index={}", index); udp_endpoint_ext.outgoing_payloads.insert(predicate, payload); } - Some(Route::NetEndpoint { index }) => { + Route::NetEndpoint { index } => { log!(@MARK, cu.inner.logger, "sending payload"); let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, contents: CommMsgContents::SendPayload(send_payload_msg), }); - comm.endpoint_manager.send_to_comms(*index, &msg)?; + comm.endpoint_manager.send_to_comms(index, &msg)?; } - Some(Route::LocalComponent(cid)) if *cid == cu.inner.native_component_id => { + Route::LocalComponent if cid == cu.inner.native_component_id => { branching_native.feed_msg( cu, rctx, @@ -536,9 +539,9 @@ impl Connector { MapTempGuard::new(&mut bn_temp_owner), ) } - Some(Route::LocalComponent(cid)) => { - if let Some(branching_component) = branching_proto_components.get_mut(cid) { - let cid = *cid; + Route::LocalComponent => { + if let Some(branching_component) = branching_proto_components.get_mut(&cid) + { branching_component.feed_msg( cu, rctx, @@ -566,7 +569,7 @@ impl Connector { "Delivery to getter {:?} msg {:?} failed because {:?} isn't here", getter, &send_payload_msg, - proto_component_id + cid ); } } @@ -605,7 +608,7 @@ impl Connector { .endpoint_manager .try_recv_any_comms( &mut *cu.inner.logger, - &cu.inner.port_info, + &cu.inner.current_state, rctx, comm.round_index, )? { @@ -725,14 +728,14 @@ impl BranchingNative { bn_temp: MapTempGuard<'_, Predicate, NativeBranch>, ) { log!(cu.inner.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg); - assert!(cu.inner.port_info.polarities.get(&getter).copied() == Some(Getter)); + assert_eq!(Getter, cu.inner.current_state.port_info.get(&getter).unwrap().polarity); let mut draining = bn_temp; let finished = &mut self.branches; std::mem::swap(draining.0, finished); for (predicate, mut branch) in draining.drain() { log!(cu.inner.logger, "visiting native branch {:?} with {:?}", &branch, &predicate); // check if this branch expects to receive it - let var = cu.inner.port_info.spec_var_for(getter); + let var = cu.inner.current_state.spec_var_for(getter); let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| { branch.to_get.remove(&getter); let was = branch.gotten.insert(getter, send_payload_msg.payload.clone()); @@ -871,7 +874,6 @@ impl BranchingProtoComponent { cu: &mut ConnectorUnphased, rctx: &mut RoundCtx, proto_component_id: ComponentId, - ports: &HashSet, ) -> Result<(), UnrecoverableSyncError> { cd.cyclic_drain(|mut predicate, mut branch, mut drainer| { let mut ctx = SyncProtoContext { @@ -906,7 +908,7 @@ impl BranchingProtoComponent { } B::CouldntCheckFiring(port) => { // sanity check - let var = cu.inner.port_info.spec_var_for(port); + let var = cu.inner.current_state.spec_var_for(port); assert!(predicate.query(var).is_none()); // keep forks in "unblocked" drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone()); @@ -914,9 +916,9 @@ impl BranchingProtoComponent { } B::PutMsg(putter, payload) => { // sanity check - assert_eq!(Some(&Putter), cu.inner.port_info.polarities.get(&putter)); + assert_eq!(Putter, 
cu.inner.current_state.port_info.get(&putter).unwrap().polarity); // overwrite assignment - let var = cu.inner.port_info.spec_var_for(putter); + let var = cu.inner.current_state.spec_var_for(putter); let was = predicate.assigned.insert(var, SpecVal::FIRING); if was == Some(SpecVal::SILENT) { log!(cu.inner.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", @@ -935,13 +937,16 @@ impl BranchingProtoComponent { } B::SyncBlockEnd => { // make concrete all variables - for port in ports.iter() { - let var = cu.inner.port_info.spec_var_for(*port); + for (port, port_info) in cu.inner.current_state.port_info.iter() { + if port_info.owner != proto_component_id { + continue; + } + let var = cu.inner.current_state.spec_var_for(*port); let should_have_fired = branch.inner.did_put_or_get.contains(port); let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT); let did_fire = val == SpecVal::FIRING; if did_fire != should_have_fired { - log!(cu.inner.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}" + log!(cu.inner.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired); // IMPLICIT inconsistency drop((predicate, branch)); @@ -963,16 +968,6 @@ impl BranchingProtoComponent { Ok(()) }) } - // fn branch_merge_func( - // mut a: ProtoComponentBranch, - // b: &mut ProtoComponentBranch, - // ) -> ProtoComponentBranch { - // if b.ended && !a.ended { - // a.ended = true; - // std::mem::swap(&mut a, b); - // } - // a - // } fn feed_msg( &mut self, cu: &mut ConnectorUnphased, @@ -990,7 +985,7 @@ impl BranchingProtoComponent { getter, &send_payload_msg ); - let BranchingProtoComponent { branches, ports } = self; + let BranchingProtoComponent { branches } = self; let (mut unblocked, pcb_temps) = pcb_temps.split_first_mut(); let (mut blocked, pcb_temps) = pcb_temps.split_first_mut(); // partition drain from branches -> {unblocked, blocked} @@ -1038,13 +1033,7 @@ impl BranchingProtoComponent { // drain from unblocked --> blocked let (swap, _pcb_temps) = pcb_temps.split_first_mut(); let cd = CyclicDrainer::new(unblocked.0, swap.0, blocked.0); - BranchingProtoComponent::drain_branches_to_blocked( - cd, - cu, - rctx, - proto_component_id, - ports, - )?; + BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?; // swap the blocked branches back std::mem::swap(blocked.0, branches); log!(cu.inner.logger, "component settles down with branches: {:?}", branches.keys()); @@ -1073,19 +1062,19 @@ impl BranchingProtoComponent { } } } - fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { - let BranchingProtoComponent { ports, branches } = self; + fn collapse_with(self, solution_predicate: &Predicate) -> ComponentState { + let BranchingProtoComponent { branches } = self; for (branch_predicate, branch) in branches { if branch.ended && branch_predicate.assigns_subset(solution_predicate) { let ProtoComponentBranch { state, .. } = branch; - return ProtoComponent { state, ports }; + return state; } } panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate); } - fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self { + fn initial(state: ComponentState) -> Self { let branch = ProtoComponentBranch { state, inner: Default::default(), ended: false }; - Self { ports, branches: hashmap! { Predicate::default() => branch } } + Self { branches: hashmap! 
{ Predicate::default() => branch } } } } impl SolutionStorage { @@ -1197,7 +1186,8 @@ impl GetterBuffer { self.getters_and_sends.push((getter, msg)); } fn putter_add(&mut self, cu: &mut ConnectorUnphased, putter: PortId, msg: SendPayloadMsg) { - if let Some(&getter) = cu.inner.port_info.peers.get(&putter) { + if let Some(getter) = cu.inner.current_state.port_info.get(&putter).unwrap().peer { + log!(cu.inner.logger, "Putter add (putter:{:?} => getter:{:?})", putter, getter); self.getter_add(getter, msg); } else { log!(cu.inner.logger, "Putter {:?} has no known peer!", putter); @@ -1207,7 +1197,7 @@ impl GetterBuffer { } impl SyncProtoContext<'_> { pub(crate) fn is_firing(&mut self, port: PortId) -> Option { - let var = self.cu_inner.port_info.spec_var_for(port); + let var = self.cu_inner.current_state.spec_var_for(port); self.predicate.query(var).map(SpecVal::is_firing) } pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> { @@ -1237,31 +1227,45 @@ impl NonsyncProtoContext<'_> { &state, &moved_ports ); - println!("MOVED PORTS {:#?}. got {:#?}", &moved_ports, &self.proto_component_ports); - assert!(self.proto_component_ports.is_superset(&moved_ports)); - // 2. remove ports from old component & update port->route - let new_id = self.cu_inner.id_manager.new_component_id(); + println!("MOVED PORTS {:#?}", &moved_ports); + // sanity check + for port in moved_ports.iter() { + assert_eq!( + self.proto_component_id, + self.cu_inner.current_state.port_info.get(port).unwrap().owner + ); + } + // 2. create new component + let new_cid = self.cu_inner.current_state.id_manager.new_component_id(); + self.unrun_components.push((new_cid, state)); + // 3. update ownership of moved ports for port in moved_ports.iter() { - self.proto_component_ports.remove(port); - self.cu_inner.port_info.routes.insert(*port, Route::LocalComponent(new_id)); + self.cu_inner.current_state.port_info.get_mut(port).unwrap().owner = new_cid; } // 3. create a new component - self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports })); } pub fn new_port_pair(&mut self) -> [PortId; 2] { // adds two new associated ports, related to each other, and exposed to the proto component - let [o, i] = - [self.cu_inner.id_manager.new_port_id(), self.cu_inner.id_manager.new_port_id()]; - self.proto_component_ports.insert(o); - self.proto_component_ports.insert(i); - // {polarity, peer, route} known. {} unknown. 
-        self.cu_inner.port_info.polarities.insert(o, Putter);
-        self.cu_inner.port_info.polarities.insert(i, Getter);
-        self.cu_inner.port_info.peers.insert(o, i);
-        self.cu_inner.port_info.peers.insert(i, o);
-        let route = Route::LocalComponent(self.proto_component_id);
-        self.cu_inner.port_info.routes.insert(o, route);
-        self.cu_inner.port_info.routes.insert(i, route);
+        let mut new_cid_fn = || self.cu_inner.current_state.id_manager.new_port_id();
+        let [o, i] = [new_cid_fn(), new_cid_fn()];
+        self.cu_inner.current_state.port_info.insert(
+            o,
+            PortInfo {
+                route: Route::LocalComponent,
+                peer: Some(i),
+                polarity: Putter,
+                owner: self.proto_component_id,
+            },
+        );
+        self.cu_inner.current_state.port_info.insert(
+            i,
+            PortInfo {
+                route: Route::LocalComponent,
+                peer: Some(o),
+                polarity: Getter,
+                owner: self.proto_component_id,
+            },
+        );
         log!(
             self.cu_inner.logger,
             "Component {:?} port pair (out->in) {:?} -> {:?}",
diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs
index 661b55a5a5dd05e9992fd89087e29bb40cf879d3..054123fb03ba031c5ad1b8541323b83139e89c29 100644
--- a/src/runtime/endpoints.rs
+++ b/src/runtime/endpoints.rs
@@ -152,7 +152,7 @@ impl EndpointManager {
     pub(super) fn try_recv_any_comms(
         &mut self,
         logger: &mut dyn Logger,
-        port_info: &PortInfo,
+        current_state: &CurrentState,
         round_ctx: &mut impl RoundCtxTrait,
         round_index: usize,
     ) -> Result {
@@ -243,7 +243,7 @@ impl EndpointManager {
                 self.udp_endpoint_store.polled_undrained.insert(index);
                 if !ee.received_this_round {
                     let payload = Payload::from(&recv_buffer[..bytes_written]);
-                    let port_spec_var = port_info.spec_var_for(ee.getter_for_incoming);
+                    let port_spec_var = current_state.spec_var_for(ee.getter_for_incoming);
                     let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING);
                     round_ctx.getter_add(
                         ee.getter_for_incoming,
diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs
index b089a69e4a8f084681e49dbd1438dc383614e707..4dfb9af6d3a5b20a47842fefa838eac1b1f8679d 100644
--- a/src/runtime/mod.rs
+++ b/src/runtime/mod.rs
@@ -29,10 +29,14 @@ pub struct VecLogger(ConnectorId, Vec<u8>);
 pub struct DummyLogger;
 #[derive(Debug)]
 pub struct FileLogger(ConnectorId, std::fs::File);
+#[derive(Debug)]
+struct CurrentState {
+    port_info: HashMap<PortId, PortInfo>,
+    id_manager: IdManager,
+}
 pub(crate) struct NonsyncProtoContext<'a> {
     cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds
-    proto_component_ports: &'a mut HashSet<PortId>, // sub-structure of component
-    unrun_components: &'a mut Vec<(ComponentId, ProtoComponent)>, // lives for Nonsync phase
+    unrun_components: &'a mut Vec<(ComponentId, ComponentState)>, // lives for Nonsync phase
     proto_component_id: ComponentId, // KEY in id->component map
 }
 pub(crate) struct SyncProtoContext<'a> {
@@ -66,7 +70,7 @@ struct VecSet {
 }
 #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 enum Route {
-    LocalComponent(ComponentId),
+    LocalComponent,
     NetEndpoint { index: usize },
     UdpEndpoint { index: usize },
 }
@@ -79,6 +83,7 @@ enum SubtreeId {
 struct MyPortInfo {
     polarity: Polarity,
     port: PortId,
+    owner: ComponentId,
 }
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 enum Decision {
@@ -102,9 +107,9 @@ enum SetupMsg {
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 struct SessionInfo {
     serde_proto_description: SerdeProtocolDescription,
-    port_info: PortInfo,
+    port_info: HashMap<PortId, PortInfo>,
     endpoint_incoming_to_getter: Vec<PortId>,
-    proto_components: HashMap<ComponentId, ProtoComponent>,
+    proto_components: HashMap<ComponentId, ComponentState>,
 }
 #[derive(Debug, Clone)]
 struct SerdeProtocolDescription(Arc<ProtocolDescription>);
@@ -140,11 +145,6 @@ struct NetEndpoint {
     inbox: Vec<u8>,
     stream: TcpStream,
 }
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-struct ProtoComponent {
-    state: ComponentState,
-    ports: HashSet<PortId>,
-}
 #[derive(Debug, Clone)]
 struct NetEndpointSetup {
     getter_for_incoming: PortId,
@@ -207,13 +207,14 @@ struct EndpointStore {
     endpoint_exts: Vec<T>,
     polled_undrained: VecSet<usize>,
 }
-#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 struct PortInfo {
-    owners: HashMap<PortId, ComponentId>,
-    polarities: HashMap<PortId, Polarity>,
-    peers: HashMap<PortId, PortId>,
-    routes: HashMap<PortId, Route>,
+    owner: ComponentId,
+    peer: Option<PortId>,
+    polarity: Polarity,
+    route: Route,
 }
+
 #[derive(Debug)]
 struct ConnectorCommunication {
     round_index: usize,
@@ -225,15 +226,13 @@ struct ConnectorCommunication {
 #[derive(Debug)]
 struct ConnectorUnphased {
     proto_description: Arc<ProtocolDescription>,
-    proto_components: HashMap<ComponentId, ProtoComponent>,
+    proto_components: HashMap<ComponentId, ComponentState>,
     inner: ConnectorUnphasedInner,
 }
 #[derive(Debug)]
 struct ConnectorUnphasedInner {
     logger: Box<dyn Logger>,
-    id_manager: IdManager,
-    native_ports: HashSet<PortId>,
-    port_info: PortInfo,
+    current_state: CurrentState,
     native_component_id: ComponentId,
 }
 #[derive(Debug)]
@@ -325,11 +324,12 @@ impl VecSet {
         self.vec.pop()
     }
 }
-impl PortInfo {
+impl CurrentState {
     fn spec_var_for(&self, port: PortId) -> SpecVar {
-        SpecVar(match self.polarities.get(&port).unwrap() {
+        let info = self.port_info.get(&port).unwrap();
+        SpecVar(match info.polarity {
             Getter => port,
-            Putter => *self.peers.get(&port).unwrap(),
+            Putter => info.peer.unwrap(),
         })
     }
 }
@@ -403,17 +403,26 @@ impl Connector {
     pub fn new_port_pair(&mut self) -> [PortId; 2] {
         let cu = &mut self.unphased;
         // adds two new associated ports, related to each other, and exposed to the native
-        let [o, i] = [cu.inner.id_manager.new_port_id(), cu.inner.id_manager.new_port_id()];
-        cu.inner.native_ports.insert(o);
-        cu.inner.native_ports.insert(i);
-        // {polarity, peer, route} known. {} unknown.
-        cu.inner.port_info.polarities.insert(o, Putter);
-        cu.inner.port_info.polarities.insert(i, Getter);
-        cu.inner.port_info.peers.insert(o, i);
-        cu.inner.port_info.peers.insert(i, o);
-        let route = Route::LocalComponent(cu.inner.native_component_id);
-        cu.inner.port_info.routes.insert(o, route);
-        cu.inner.port_info.routes.insert(i, route);
+        let mut new_cid = || cu.inner.current_state.id_manager.new_port_id();
+        let [o, i] = [new_cid(), new_cid()];
+        cu.inner.current_state.port_info.insert(
+            o,
+            PortInfo {
+                route: Route::LocalComponent,
+                peer: Some(i),
+                owner: cu.inner.native_component_id,
+                polarity: Putter,
+            },
+        );
+        cu.inner.current_state.port_info.insert(
+            i,
+            PortInfo {
+                route: Route::LocalComponent,
+                peer: Some(o),
+                owner: cu.inner.native_component_id,
+                polarity: Getter,
+            },
+        );
         log!(cu.inner.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
         [o, i]
     }
@@ -426,32 +435,30 @@ impl Connector {
         use AddComponentError as Ace;
         // 1. 
check if this is OK let cu = &mut self.unphased; - let polarities = cu.proto_description.component_polarities(identifier)?; - if polarities.len() != ports.len() { - return Err(Ace::WrongNumberOfParamaters { expected: polarities.len() }); + let expected_polarities = cu.proto_description.component_polarities(identifier)?; + if expected_polarities.len() != ports.len() { + return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() }); } - for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) { - if !cu.inner.native_ports.contains(port) { - return Err(Ace::UnknownPort(*port)); + for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) { + let info = cu.inner.current_state.port_info.get(&port).ok_or(Ace::UnknownPort(port))?; + if info.owner != cu.inner.native_component_id { + return Err(Ace::UnknownPort(port)); } - if expected_polarity != *cu.inner.port_info.polarities.get(port).unwrap() { - return Err(Ace::WrongPortPolarity { port: *port, expected_polarity }); + if info.polarity != expected_polarity { + return Err(Ace::WrongPortPolarity { port, expected_polarity }); } } - // 3. remove ports from old component & update port->route - let new_cid = cu.inner.id_manager.new_component_id(); + // 2. add new component + let new_cid = cu.inner.current_state.id_manager.new_component_id(); + cu.proto_components + .insert(new_cid, cu.proto_description.new_main_component(identifier, ports)); + // 3. update port ownership for port in ports.iter() { - cu.inner.port_info.routes.insert(*port, Route::LocalComponent(new_cid)); + match cu.inner.current_state.port_info.get_mut(port) { + Some(port_info) => port_info.owner = new_cid, + None => unreachable!(), + } } - cu.inner.native_ports.retain(|port| !ports.contains(port)); - // 4. add new component - cu.proto_components.insert( - new_cid, - ProtoComponent { - state: cu.proto_description.new_main_component(identifier, ports), - ports: ports.iter().copied().collect(), - }, - ); Ok(()) } } diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index ed9962ed069473e98fe41613a225d9da56c1b91b..c87129d90ebd6abd5ff12099db7087bad6580003 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -9,16 +9,15 @@ impl Connector { ) -> Self { log!(&mut *logger, "Created with connector_id {:?}", connector_id); let mut id_manager = IdManager::new(connector_id); + let native_component_id = id_manager.new_component_id(); Self { unphased: ConnectorUnphased { proto_description, proto_components: Default::default(), inner: ConnectorUnphasedInner { logger, - native_component_id: id_manager.new_component_id(), - id_manager, - native_ports: Default::default(), - port_info: Default::default(), + native_component_id, + current_state: CurrentState { id_manager, port_info: Default::default() }, }, }, phased: ConnectorPhased::Setup(Box::new(ConnectorSetup { @@ -42,28 +41,46 @@ impl Connector { ConnectorPhased::Communication(..) 
=> Err(WrongStateError),
             ConnectorPhased::Setup(setup) => {
                 let udp_index = setup.udp_endpoint_setups.len();
-                let mut npid = || cu.inner.id_manager.new_port_id();
+                let udp_cid = cu.inner.current_state.id_manager.new_component_id();
+                let mut npid = || cu.inner.current_state.id_manager.new_port_id();
                 let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()];
-                cu.inner.native_ports.insert(nin);
-                cu.inner.native_ports.insert(nout);
-                cu.inner.port_info.polarities.insert(nin, Getter);
-                cu.inner.port_info.polarities.insert(nout, Putter);
-                cu.inner.port_info.polarities.insert(uin, Getter);
-                cu.inner.port_info.polarities.insert(uout, Putter);
-                cu.inner.port_info.peers.insert(nin, uout);
-                cu.inner.port_info.peers.insert(nout, uin);
-                cu.inner.port_info.peers.insert(uin, nout);
-                cu.inner.port_info.peers.insert(uout, nin);
-                cu.inner
-                    .port_info
-                    .routes
-                    .insert(nin, Route::LocalComponent(cu.inner.native_component_id));
-                cu.inner
-                    .port_info
-                    .routes
-                    .insert(nout, Route::LocalComponent(cu.inner.native_component_id));
-                cu.inner.port_info.routes.insert(uin, Route::UdpEndpoint { index: udp_index });
-                cu.inner.port_info.routes.insert(uout, Route::UdpEndpoint { index: udp_index });
+
+                cu.inner.current_state.port_info.insert(
+                    nin,
+                    PortInfo {
+                        route: Route::LocalComponent,
+                        polarity: Getter,
+                        peer: Some(uout),
+                        owner: cu.inner.native_component_id,
+                    },
+                );
+                cu.inner.current_state.port_info.insert(
+                    nout,
+                    PortInfo {
+                        route: Route::LocalComponent,
+                        polarity: Putter,
+                        peer: Some(uin),
+                        owner: cu.inner.native_component_id,
+                    },
+                );
+                cu.inner.current_state.port_info.insert(
+                    uin,
+                    PortInfo {
+                        route: Route::UdpEndpoint { index: udp_index },
+                        polarity: Getter,
+                        peer: Some(nout),
+                        owner: udp_cid,
+                    },
+                );
+                cu.inner.current_state.port_info.insert(
+                    uout,
+                    PortInfo {
+                        route: Route::UdpEndpoint { index: udp_index },
+                        polarity: Putter,
+                        peer: Some(nin),
+                        owner: udp_cid,
+                    },
+                );
                 setup.udp_endpoint_setups.push(UdpEndpointSetup {
                     local_addr,
                     peer_addr,
@@ -83,18 +100,20 @@ impl Connector {
         match phased {
             ConnectorPhased::Communication(..) => Err(WrongStateError),
             ConnectorPhased::Setup(setup) => {
-                let local_port = cu.inner.id_manager.new_port_id();
-                cu.inner.native_ports.insert(local_port);
-                // {polarity, route} known. {peer} unknown.
-                cu.inner.port_info.polarities.insert(local_port, polarity);
-                cu.inner
-                    .port_info
-                    .routes
-                    .insert(local_port, Route::LocalComponent(cu.inner.native_component_id));
+                let new_pid = cu.inner.current_state.id_manager.new_port_id();
+                cu.inner.current_state.port_info.insert(
+                    new_pid,
+                    PortInfo {
+                        route: Route::LocalComponent,
+                        peer: None,
+                        owner: cu.inner.native_component_id,
+                        polarity,
+                    },
+                );
                 log!(
                     cu.inner.logger,
                     "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}",
-                    local_port,
+                    new_pid,
                     polarity,
                     &sock_addr,
                     endpoint_polarity
@@ -102,9 +121,9 @@ impl Connector {
                 setup.net_endpoint_setups.push(NetEndpointSetup {
                     sock_addr,
                     endpoint_polarity,
-                    getter_for_incoming: local_port,
+                    getter_for_incoming: new_pid,
                 });
-                Ok(local_port)
+                Ok(new_pid)
             }
         }
     }
@@ -124,17 +143,19 @@ impl Connector {
             &mut *cu.inner.logger,
             &setup.net_endpoint_setups,
             &setup.udp_endpoint_setups,
-            &mut cu.inner.port_info,
+            &mut cu.inner.current_state.port_info,
             &deadline,
         )?;
         log!(
             cu.inner.logger,
-            "Successfully connected {} endpoints",
-            endpoint_manager.net_endpoint_store.endpoint_exts.len()
+            "Successfully connected {} endpoints. info now {:#?} {:#?}",
+            endpoint_manager.net_endpoint_store.endpoint_exts.len(),
+            &cu.inner.current_state.port_info,
+            &endpoint_manager,
         );
         // leader election and tree construction
         let neighborhood = init_neighborhood(
-            cu.inner.id_manager.connector_id,
+            cu.inner.current_state.id_manager.connector_id,
             &mut *cu.inner.logger,
             &mut endpoint_manager,
             &deadline,
@@ -151,7 +172,7 @@ impl Connector {
             session_optimize(cu, &mut comm, &deadline)?;
         }
         log!(cu.inner.logger, "connect() finished. setup phase complete");
-        self.phased = ConnectorPhased::Communication(Box::new(comm));
+        *phased = ConnectorPhased::Communication(Box::new(comm));
         Ok(())
     }
 }
@@ -161,7 +182,7 @@ fn new_endpoint_manager(
     logger: &mut dyn Logger,
     net_endpoint_setups: &[NetEndpointSetup],
     udp_endpoint_setups: &[UdpEndpointSetup],
-    port_info: &mut PortInfo,
+    port_info: &mut HashMap<PortId, PortInfo>,
     deadline: &Option<Instant>,
 ) -> Result<EndpointManager, ConnectError> {
     ////////////////////////////////////////////
@@ -393,14 +414,14 @@ fn new_endpoint_manager(
                     }
                     continue;
                 }
-                let local_polarity = *port_info
-                    .polarities
-                    .get(&net_todo.endpoint_setup.getter_for_incoming)
+                let local_info = port_info
+                    .get_mut(&net_todo.endpoint_setup.getter_for_incoming)
                     .unwrap();
                 if event.is_writable() && !net_todo.sent_local_port {
                     // can write and didn't send setup msg yet? Do so!
                     let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
-                        polarity: local_polarity,
+                        owner: local_info.owner,
+                        polarity: local_info.polarity,
                         port: net_todo.endpoint_setup.getter_for_incoming,
                     }));
                     net_endpoint
@@ -435,36 +456,21 @@ fn new_endpoint_manager(
                                 index,
                                 peer_info
                             );
-                            if peer_info.polarity == local_polarity {
+                            if peer_info.polarity == local_info.polarity {
                                 return Err(ConnectError::PortPeerPolarityMismatch(
                                     net_todo.endpoint_setup.getter_for_incoming,
                                 ));
                             }
                             net_todo.recv_peer_port = Some(peer_info.port);
                             // 1. finally learned the peer of this port!
-                            port_info.peers.insert(
-                                net_todo.endpoint_setup.getter_for_incoming,
-                                peer_info.port,
-                            );
+                            local_info.peer = Some(peer_info.port);
                             // 2. learned the info of this peer port
-                            port_info.polarities.insert(peer_info.port, peer_info.polarity);
-                            port_info.peers.insert(
-                                peer_info.port,
-                                net_todo.endpoint_setup.getter_for_incoming,
-                            );
-                            if let Some(route) = port_info.routes.get(&peer_info.port) {
-                                // check just for logging purposes
-                                log!(
-                                    logger,
-                                    "Special case! Route to peer {:?} already known to be {:?}. Leave untouched",
-                                    peer_info.port,
-                                    route
-                                );
-                            }
-                            port_info
-                                .routes
-                                .entry(peer_info.port)
-                                .or_insert(Route::NetEndpoint { index });
+                            port_info.entry(peer_info.port).or_insert(PortInfo {
+                                peer: Some(net_todo.endpoint_setup.getter_for_incoming),
+                                polarity: peer_info.polarity,
+                                owner: peer_info.owner,
+                                route: Route::NetEndpoint { index },
+                            });
                         }
                         Some(inappropriate_msg) => {
                             log!(
@@ -805,7 +811,7 @@ fn session_optimize(
         unoptimized_map.keys()
     );
     let my_session_info = SessionInfo {
-        port_info: cu.inner.port_info.clone(),
+        port_info: cu.inner.current_state.port_info.clone(),
         proto_components: cu.proto_components.clone(),
         serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()),
         endpoint_incoming_to_getter: comm
@@ -816,7 +822,7 @@ fn session_optimize(
             .map(|ee| ee.getter_for_incoming)
             .collect(),
     };
-    unoptimized_map.insert(cu.inner.id_manager.connector_id, my_session_info);
+    unoptimized_map.insert(cu.inner.current_state.id_manager.connector_id, my_session_info);
     log!(
         cu.inner.logger,
        "Inserting my own info. 
Unoptimized subtree map is {:?}", @@ -871,8 +877,10 @@ fn session_optimize( comm.neighborhood.children.iter() ); log!(cu.inner.logger, "All session info dumped!: {:#?}", &optimized_map); - let optimized_info = - optimized_map.get(&cu.inner.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone(); + let optimized_info = optimized_map + .get(&cu.inner.current_state.id_manager.connector_id) + .expect("HEY NO INFO FOR ME?") + .clone(); let msg = S(Sm::SessionScatter { optimized_map }); for &child in comm.neighborhood.children.iter() { comm.endpoint_manager.send_to_setup(child, &msg)?; @@ -901,7 +909,7 @@ fn apply_optimizations( endpoint_incoming_to_getter, } = session_info; // TODO some info which should be read-only can be mutated with the current scheme - cu.inner.port_info = port_info; + cu.inner.current_state.port_info = port_info; cu.proto_components = proto_components; cu.proto_description = serde_proto_description.0; for (ee, getter) in comm
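
Note on the data-model change above: the four parallel maps of the old `PortInfo` (`owners`, `polarities`, `peers`, `routes`) collapse into one record per port, held in `CurrentState.port_info`, and `Route::LocalComponent` drops its `ComponentId` payload because delivery now reads the owner from that same record. Below is a minimal, self-contained sketch of how lookups work against the consolidated map; the concrete types (`PortId(u32)`, `ComponentId(u32)`, `SpecVar`, and the `main` wiring) are simplified stand-ins for illustration, not the crate's actual definitions.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the crate's real newtypes.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct PortId(u32);
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct ComponentId(u32);
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct SpecVar(PortId);
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Polarity { Putter, Getter }
#[derive(Clone, Copy, Debug)]
enum Route { LocalComponent, NetEndpoint { index: usize } }

// One record per port replaces the old owners/polarities/peers/routes maps.
#[derive(Debug)]
struct PortInfo {
    owner: ComponentId,
    peer: Option<PortId>,
    polarity: Polarity,
    route: Route,
}

#[derive(Default, Debug)]
struct CurrentState {
    port_info: HashMap<PortId, PortInfo>,
}

impl CurrentState {
    // Both ends of a channel share one speculative variable, named after the
    // getter-side port (mirrors the patched spec_var_for helper).
    fn spec_var_for(&self, port: PortId) -> SpecVar {
        let info = &self.port_info[&port];
        SpecVar(match info.polarity {
            Polarity::Getter => port,
            Polarity::Putter => info.peer.expect("putter with unknown peer"),
        })
    }
}

fn main() {
    let native = ComponentId(0);
    let [o, i] = [PortId(1), PortId(2)];
    let mut state = CurrentState::default();
    // A port pair: the route says "local"; the owner field says *which* local component.
    state.port_info.insert(
        o,
        PortInfo { owner: native, peer: Some(i), polarity: Polarity::Putter, route: Route::LocalComponent },
    );
    state.port_info.insert(
        i,
        PortInfo { owner: native, peer: Some(o), polarity: Polarity::Getter, route: Route::LocalComponent },
    );

    // Routing a payload consults the getter's record instead of a Route::LocalComponent(cid) payload.
    let getter = state.port_info[&o].peer.unwrap();
    let dest = &state.port_info[&getter];
    match dest.route {
        Route::LocalComponent => println!("deliver locally to component {:?}", dest.owner),
        Route::NetEndpoint { index } => println!("send over endpoint {}", index),
    }
    // Putter and getter of one channel agree on the same speculative variable.
    assert_eq!(state.spec_var_for(o), state.spec_var_for(i));
}
```

Keeping ownership in the per-port record is what lets `Route::LocalComponent` become a unit variant: moving a port to a new component (as in `new_component` or `add_component`) only rewrites `owner`, and every delivery or sanity check looks the owner up at the moment it routes.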