diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 901835c3ad34d5be048affc1bca121407f36a163..b5f63c8958d8bc0e79d923734538dbd57c1ba0f8 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -10,6 +10,11 @@ mod tests; use crate::common::*; use error::*; +#[derive(Debug)] +pub struct RoundOk { + batch_index: usize, + gotten: HashMap, +} #[derive(Debug)] pub struct VecSet { // invariant: ordered, deduplicated @@ -135,28 +140,31 @@ pub struct PortInfo { } #[derive(Debug)] pub struct Connector { + unphased: ConnectorUnphased, + phased: ConnectorPhased, +} +#[derive(Debug)] +pub struct ConnectorCommunication { + round_index: usize, + endpoint_manager: EndpointManager, + neighborhood: Neighborhood, + mem_inbox: Vec, + native_batches: Vec, + round_result: Result, SyncError>, +} +#[derive(Debug)] +pub struct ConnectorUnphased { proto_description: Arc, proto_components: HashMap, logger: Box, id_manager: IdManager, native_ports: HashSet, port_info: PortInfo, - phased: ConnectorPhased, } #[derive(Debug)] pub enum ConnectorPhased { - Setup { - endpoint_setups: Vec<(PortId, EndpointSetup)>, - surplus_sockets: u16, - }, - Communication { - round_index: usize, - endpoint_manager: EndpointManager, - neighborhood: Neighborhood, - mem_inbox: Vec, - native_batches: Vec, - round_result: Result)>, SyncError>, - }, + Setup { endpoint_setups: Vec<(PortId, EndpointSetup)>, surplus_sockets: u16 }, + Communication(ConnectorCommunication), } #[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] pub struct Predicate { @@ -225,31 +233,32 @@ impl IdManager { } impl Drop for Connector { fn drop(&mut self) { - log!(&mut *self.logger, "Connector dropping. Goodbye!"); + log!(&mut *self.unphased.logger, "Connector dropping. Goodbye!"); } } impl Connector { pub fn swap_logger(&mut self, mut new_logger: Box) -> Box { - std::mem::swap(&mut self.logger, &mut new_logger); + std::mem::swap(&mut self.unphased.logger, &mut new_logger); new_logger } pub fn get_logger(&mut self) -> &mut dyn Logger { - &mut *self.logger + &mut *self.unphased.logger } pub fn new_port_pair(&mut self) -> [PortId; 2] { + let cu = &mut self.unphased; // adds two new associated ports, related to each other, and exposed to the native - let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()]; - self.native_ports.insert(o); - self.native_ports.insert(i); + let [o, i] = [cu.id_manager.new_port_id(), cu.id_manager.new_port_id()]; + cu.native_ports.insert(o); + cu.native_ports.insert(i); // {polarity, peer, route} known. {} unknown. - self.port_info.polarities.insert(o, Putter); - self.port_info.polarities.insert(i, Getter); - self.port_info.peers.insert(o, i); - self.port_info.peers.insert(i, o); + cu.port_info.polarities.insert(o, Putter); + cu.port_info.polarities.insert(i, Getter); + cu.port_info.peers.insert(o, i); + cu.port_info.peers.insert(i, o); let route = Route::LocalComponent(LocalComponentId::Native); - self.port_info.routes.insert(o, route); - self.port_info.routes.insert(i, route); - log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); + cu.port_info.routes.insert(o, route); + cu.port_info.routes.insert(i, route); + log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); [o, i] } pub fn add_component( @@ -260,31 +269,32 @@ impl Connector { // called by the USER. moves ports owned by the NATIVE use AddComponentError::*; // 1. check if this is OK - let polarities = self.proto_description.component_polarities(identifier)?; + let cu = &mut self.unphased; + let polarities = cu.proto_description.component_polarities(identifier)?; if polarities.len() != ports.len() { return Err(WrongNumberOfParamaters { expected: polarities.len() }); } for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) { - if !self.native_ports.contains(port) { + if !cu.native_ports.contains(port) { return Err(UnknownPort(*port)); } - if expected_polarity != *self.port_info.polarities.get(port).unwrap() { + if expected_polarity != *cu.port_info.polarities.get(port).unwrap() { return Err(WrongPortPolarity { port: *port, expected_polarity }); } } // 3. remove ports from old component & update port->route - let new_id = self.id_manager.new_proto_component_id(); + let new_id = cu.id_manager.new_proto_component_id(); for port in ports.iter() { - self.port_info + cu.port_info .routes .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id))); } - self.native_ports.retain(|port| !ports.contains(port)); + cu.native_ports.retain(|port| !ports.contains(port)); // 4. add new component - self.proto_components.insert( + cu.proto_components.insert( new_id, ProtoComponent { - state: self.proto_description.new_main_component(identifier, ports), + state: cu.proto_description.new_main_component(identifier, ports), ports: ports.iter().copied().collect(), }, );