From adb23c48418269be0445d9cb9725c3303bc2125e 2020-02-21 13:00:16 From: Christopher Esterhuyse Date: 2020-02-21 13:00:16 Subject: [PATCH] component and port storages on 2 axes --- diff --git a/src/common.rs b/src/common.rs index e88907af761cb1b3bcbeab88a7239e59c8cbaa99..8a90df240d15bf2ff155733cd9228de1738694c2 100644 --- a/src/common.rs +++ b/src/common.rs @@ -50,6 +50,11 @@ pub type Key = Port; pub enum MainComponentErr { NoSuchComponent, NonPortTypeParameters, + CannotMovePort(Port), + WrongNumberOfParamaters { expected: usize }, + UnknownPort(Port), + WrongPortPolarity { param_index: usize, port: Port }, + DuplicateMovedPort(Port), } pub trait ProtocolDescription: Sized { type S: ComponentState; diff --git a/src/runtime/errors.rs b/src/runtime/errors.rs index 0a654f24e527816ee61793c61bffa5c8b3506ead..b34d96cfe33483c620428adff9c6840b81129f98 100644 --- a/src/runtime/errors.rs +++ b/src/runtime/errors.rs @@ -88,6 +88,7 @@ impl From for ConfigErr { match e { M::NoSuchComponent => C::NoSuchComponent, M::NonPortTypeParameters => C::NonPortTypeParameters, + _ => todo!(), } } } diff --git a/src/runtime/experimental/api.rs b/src/runtime/experimental/api.rs index 599ef40c0e2f2e424d3433a64025ea454b51bded..96dfde154fee08002136efd5d6ecf5639ebe3732 100644 --- a/src/runtime/experimental/api.rs +++ b/src/runtime/experimental/api.rs @@ -9,6 +9,7 @@ use crate::runtime::errors::PollDeadlineErr; use crate::runtime::MessengerState; use crate::runtime::Messengerlike; use crate::runtime::ReceivedMsg; +use crate::runtime::{ProtocolD, ProtocolS}; use std::net::SocketAddr; use std::sync::Arc; @@ -91,6 +92,15 @@ pub enum ConnectErr { PollingFailed, Timeout, } + +#[derive(Debug)] +struct Component { + protocol: Arc, + port_set: HashSet, + identifier: Arc<[u8]>, + state: ProtocolS, +} + impl From for ConnectErr { fn from(e: PollDeadlineErr) -> Self { use PollDeadlineErr as P; @@ -128,11 +138,11 @@ impl Connecting { fn new_connected( &self, controller_id: ControllerId, - protocol: &Arc, timeout: Option, ) -> Result { use ConnectErr::*; + /////////////////////////////////////////////////////// // 1. bindings correspond with ports 0..bindings.len(). For each: // - reserve a slot in endpoint_exts. // - store the port in `native_ports' set. @@ -256,6 +266,7 @@ impl Connecting { assert_eq!(None, endpoint_exts.iter_reserved().next()); drop(todos); + /////////////////////////////////////////////////////// // 1. construct `family', i.e. perform the sink tree setup procedure use {Msg::SetupMsg as S, SetupMsg::*}; let mut messenger = (&mut ms, &mut endpoint_exts); @@ -391,11 +402,11 @@ impl Connecting { } let family = Family { parent, children }; - // 1. done! return + // done! Ok(Connected { + components: Default::default(), controller_id, channel_index_stream, - protocol: protocol.clone(), endpoint_exts, native_ports, family, @@ -405,40 +416,78 @@ impl Connecting { pub fn connect_using_id( &mut self, controller_id: ControllerId, - protocol: &Arc, timeout: Option, ) -> Result { // 1. try and create a connection from these bindings with self immutable. - let connected = self.new_connected(controller_id, protocol, timeout)?; + let connected = self.new_connected(controller_id, timeout)?; // 2. success! drain self and return self.bindings.clear(); Ok(connected) } - pub fn connect( - &mut self, - protocol: &Arc, - timeout: Option, - ) -> Result { - self.connect_using_id(Self::random_controller_id(), protocol, timeout) - } -} -#[derive(Debug)] -pub struct Protocol; -impl Protocol { - pub fn parse(_pdl_text: &[u8]) -> Result { - Ok(Protocol) + pub fn connect(&mut self, timeout: Option) -> Result { + self.connect_using_id(Self::random_controller_id(), timeout) } } + #[derive(Debug)] pub struct Connected { native_ports: HashSet, controller_id: ControllerId, channel_index_stream: ChannelIndexStream, endpoint_exts: VecStorage, - protocol: Arc, + components: VecStorage, family: Family, } impl Connected { + pub fn new_component( + &mut self, + protocol: &Arc, + identifier: &Arc<[u8]>, + moved_port_list: &[Port], + ) -> Result<(), MainComponentErr> { + ////////////////////////////////////////// + // 1. try and create a new component (without mutating self) + use MainComponentErr::*; + let moved_port_set = { + let mut set: HashSet = Default::default(); + for &port in moved_port_list.iter() { + if !self.native_ports.contains(&port) { + return Err(CannotMovePort(port)); + } + if !set.insert(port) { + return Err(DuplicateMovedPort(port)); + } + } + set + }; + // moved_port_set is disjoint to native_ports + let expected_polarities = protocol.component_polarities(identifier)?; + if moved_port_list.len() != expected_polarities.len() { + return Err(WrongNumberOfParamaters { expected: expected_polarities.len() }); + } + // correct polarity list + for (param_index, (&port, &expected_polarity)) in + moved_port_list.iter().zip(expected_polarities.iter()).enumerate() + { + let polarity = + self.endpoint_exts.get_occupied(port.0).ok_or(UnknownPort(port))?.info.polarity; + if polarity != expected_polarity { + return Err(WrongPortPolarity { param_index, port }); + } + } + let state = protocol.new_main_component(identifier, &moved_port_list); + let component = Component { + port_set: moved_port_set, + protocol: protocol.clone(), + identifier: identifier.clone(), + state, + }; + ////////////////////////////// + // success! mutate self and return Ok + self.native_ports.retain(|e| !component.port_set.contains(e)); + self.components.new_occupied(component); + Ok(()) + } pub fn new_channel(&mut self) -> (OutPort, InPort) { assert!(self.endpoint_exts.len() <= std::u32::MAX as usize - 2); let channel_id = ChannelId { @@ -456,15 +505,6 @@ impl Connected { }); (OutPort(Port(kp)), InPort(Port(kg))) } - pub fn new_component(&mut self, _name: Vec, moved_ports: &[Port]) -> Result<(), ()> { - let moved_ports = moved_ports.iter().copied().collect(); - if !self.native_ports.is_superset(&moved_ports) { - return Err(()); - } - self.native_ports.retain(|e| !moved_ports.contains(e)); - // self.components.push(ComponentExt { ports: moved_ports, protocol: protocol.clone(), name }); - todo!() - } pub fn sync_set(&mut self, _inbuf: &mut [u8], _ops: &mut [PortOpRs]) -> Result<(), ()> { Ok(()) } @@ -496,11 +536,12 @@ fn api_new_test() { let mut c = Connecting::default(); let net_out: OutPort = c.bind(Coupling::Active, "127.0.0.1:8000".parse().unwrap()); let net_in: InPort = c.bind(Coupling::Active, "127.0.0.1:8001".parse().unwrap()); - let proto_0 = Arc::new(Protocol::parse(b"").unwrap()); - let mut c = c.connect(&proto_0, None).unwrap(); + let proto_0 = Arc::new(ProtocolD::parse(b"").unwrap()); + let mut c = c.connect(None).unwrap(); let (mem_out, mem_in) = c.new_channel(); let mut inbuf = [0u8; 64]; - c.new_component(b"sync".to_vec(), &[net_in.into(), mem_out.into()]).unwrap(); + let identifier: Arc<[u8]> = b"sync".to_vec().into(); + c.new_component(&proto_0, &identifier, &[net_in.into(), mem_out.into()]).unwrap(); let mut ops = [ PortOpRs::In { msg_range: None, port: &mem_in }, PortOpRs::Out { msg: b"hey", port: &net_out, optional: false }, @@ -596,19 +637,17 @@ fn api_connecting() { "127.0.0.1:8889".parse().unwrap(), "127.0.0.1:8890".parse().unwrap(), ]; - let protocol1 = Arc::new(Protocol::parse(b"").unwrap()); - let protocol2 = protocol1.clone(); let handles = vec![ std::thread::spawn(move || { let mut connecting = Connecting::default(); let _a: OutPort = connecting.bind(Coupling::Active, addrs[0]); - let connected = connecting.connect(&protocol1, None); + let connected = connecting.connect(None); println!("A: {:#?}", connected); }), std::thread::spawn(move || { let mut connecting = Connecting::default(); let _a: OutPort = connecting.bind(Coupling::Passive, addrs[0]); - let connected = connecting.connect(&protocol2, Some(Duration::from_secs(2))); + let connected = connecting.connect(Some(Duration::from_secs(2))); println!("B: {:#?}", connected); }), ];