diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index b089a69e4a8f084681e49dbd1438dc383614e707..4dfb9af6d3a5b20a47842fefa838eac1b1f8679d 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -29,10 +29,14 @@ pub struct VecLogger(ConnectorId, Vec); pub struct DummyLogger; #[derive(Debug)] pub struct FileLogger(ConnectorId, std::fs::File); +#[derive(Debug)] +struct CurrentState { + port_info: HashMap, + id_manager: IdManager, +} pub(crate) struct NonsyncProtoContext<'a> { cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds - proto_component_ports: &'a mut HashSet, // sub-structure of component - unrun_components: &'a mut Vec<(ComponentId, ProtoComponent)>, // lives for Nonsync phase + unrun_components: &'a mut Vec<(ComponentId, ComponentState)>, // lives for Nonsync phase proto_component_id: ComponentId, // KEY in id->component map } pub(crate) struct SyncProtoContext<'a> { @@ -66,7 +70,7 @@ struct VecSet { } #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] enum Route { - LocalComponent(ComponentId), + LocalComponent, NetEndpoint { index: usize }, UdpEndpoint { index: usize }, } @@ -79,6 +83,7 @@ enum SubtreeId { struct MyPortInfo { polarity: Polarity, port: PortId, + owner: ComponentId, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] enum Decision { @@ -102,9 +107,9 @@ enum SetupMsg { #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] struct SessionInfo { serde_proto_description: SerdeProtocolDescription, - port_info: PortInfo, + port_info: HashMap, endpoint_incoming_to_getter: Vec, - proto_components: HashMap, + proto_components: HashMap, } #[derive(Debug, Clone)] struct SerdeProtocolDescription(Arc); @@ -140,11 +145,6 @@ struct NetEndpoint { inbox: Vec, stream: TcpStream, } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -struct ProtoComponent { - state: ComponentState, - ports: HashSet, -} #[derive(Debug, Clone)] struct NetEndpointSetup { getter_for_incoming: PortId, @@ -207,13 +207,14 @@ struct EndpointStore { endpoint_exts: Vec, polled_undrained: VecSet, } -#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] struct PortInfo { - owners: HashMap, - polarities: HashMap, - peers: HashMap, - routes: HashMap, + owner: ComponentId, + peer: Option, + polarity: Polarity, + route: Route, } + #[derive(Debug)] struct ConnectorCommunication { round_index: usize, @@ -225,15 +226,13 @@ struct ConnectorCommunication { #[derive(Debug)] struct ConnectorUnphased { proto_description: Arc, - proto_components: HashMap, + proto_components: HashMap, inner: ConnectorUnphasedInner, } #[derive(Debug)] struct ConnectorUnphasedInner { logger: Box, - id_manager: IdManager, - native_ports: HashSet, - port_info: PortInfo, + current_state: CurrentState, native_component_id: ComponentId, } #[derive(Debug)] @@ -325,11 +324,12 @@ impl VecSet { self.vec.pop() } } -impl PortInfo { +impl CurrentState { fn spec_var_for(&self, port: PortId) -> SpecVar { - SpecVar(match self.polarities.get(&port).unwrap() { + let info = self.port_info.get(&port).unwrap(); + SpecVar(match info.polarity { Getter => port, - Putter => *self.peers.get(&port).unwrap(), + Putter => info.peer.unwrap(), }) } } @@ -403,17 +403,26 @@ impl Connector { pub fn new_port_pair(&mut self) -> [PortId; 2] { let cu = &mut self.unphased; // adds two new associated ports, related to each other, and exposed to the native - let [o, i] = [cu.inner.id_manager.new_port_id(), cu.inner.id_manager.new_port_id()]; - cu.inner.native_ports.insert(o); - cu.inner.native_ports.insert(i); - // {polarity, peer, route} known. {} unknown. - cu.inner.port_info.polarities.insert(o, Putter); - cu.inner.port_info.polarities.insert(i, Getter); - cu.inner.port_info.peers.insert(o, i); - cu.inner.port_info.peers.insert(i, o); - let route = Route::LocalComponent(cu.inner.native_component_id); - cu.inner.port_info.routes.insert(o, route); - cu.inner.port_info.routes.insert(i, route); + let mut new_cid = || cu.inner.current_state.id_manager.new_port_id(); + let [o, i] = [new_cid(), new_cid()]; + cu.inner.current_state.port_info.insert( + o, + PortInfo { + route: Route::LocalComponent, + peer: Some(i), + owner: cu.inner.native_component_id, + polarity: Putter, + }, + ); + cu.inner.current_state.port_info.insert( + i, + PortInfo { + route: Route::LocalComponent, + peer: Some(o), + owner: cu.inner.native_component_id, + polarity: Getter, + }, + ); log!(cu.inner.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); [o, i] } @@ -426,32 +435,30 @@ impl Connector { use AddComponentError as Ace; // 1. check if this is OK let cu = &mut self.unphased; - let polarities = cu.proto_description.component_polarities(identifier)?; - if polarities.len() != ports.len() { - return Err(Ace::WrongNumberOfParamaters { expected: polarities.len() }); + let expected_polarities = cu.proto_description.component_polarities(identifier)?; + if expected_polarities.len() != ports.len() { + return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() }); } - for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) { - if !cu.inner.native_ports.contains(port) { - return Err(Ace::UnknownPort(*port)); + for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) { + let info = cu.inner.current_state.port_info.get(&port).ok_or(Ace::UnknownPort(port))?; + if info.owner != cu.inner.native_component_id { + return Err(Ace::UnknownPort(port)); } - if expected_polarity != *cu.inner.port_info.polarities.get(port).unwrap() { - return Err(Ace::WrongPortPolarity { port: *port, expected_polarity }); + if info.polarity != expected_polarity { + return Err(Ace::WrongPortPolarity { port, expected_polarity }); } } - // 3. remove ports from old component & update port->route - let new_cid = cu.inner.id_manager.new_component_id(); + // 2. add new component + let new_cid = cu.inner.current_state.id_manager.new_component_id(); + cu.proto_components + .insert(new_cid, cu.proto_description.new_main_component(identifier, ports)); + // 3. update port ownership for port in ports.iter() { - cu.inner.port_info.routes.insert(*port, Route::LocalComponent(new_cid)); + match cu.inner.current_state.port_info.get_mut(port) { + Some(port_info) => port_info.owner = new_cid, + None => unreachable!(), + } } - cu.inner.native_ports.retain(|port| !ports.contains(port)); - // 4. add new component - cu.proto_components.insert( - new_cid, - ProtoComponent { - state: cu.proto_description.new_main_component(identifier, ports), - ports: ports.iter().copied().collect(), - }, - ); Ok(()) } }