use super::bits::{usizes_for_bits, BitChunkIter, BitMatrix, Pair, TRUE_CHUNK}; use super::vec_storage::VecStorage; use crate::common::*; use crate::runtime::endpoint::EndpointExt; use crate::runtime::endpoint::EndpointInfo; use crate::runtime::endpoint::{Endpoint, Msg, SetupMsg}; use crate::runtime::errors::EndpointErr; use crate::runtime::errors::MessengerRecvErr; use crate::runtime::errors::PollDeadlineErr; use crate::runtime::MessengerState; use crate::runtime::Messengerlike; use crate::runtime::ReceivedMsg; use crate::runtime::{ProtocolD, ProtocolS}; use std::net::SocketAddr; use std::sync::Arc; pub enum Coupling { Active, Passive, } #[derive(Debug)] struct Family { parent: Option, children: HashSet, } pub struct Binding { pub coupling: Coupling, pub polarity: Polarity, pub addr: SocketAddr, } pub struct InPort(Port); // InPort and OutPort are AFFINE (exposed to Rust API) pub struct OutPort(Port); impl From for Port { fn from(x: InPort) -> Self { x.0 } } impl From for Port { fn from(x: OutPort) -> Self { x.0 } } #[derive(Default, Debug)] struct ChannelIndexStream { next: u32, } impl ChannelIndexStream { fn next(&mut self) -> u32 { self.next += 1; self.next - 1 } } enum Connector { Connecting(Connecting), Connected(Connected), } #[derive(Default)] pub struct Connecting { bindings: Vec, } trait Binds { fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> T; } impl Binds for Connecting { fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> InPort { self.bindings.push(Binding { coupling, polarity: Polarity::Getter, addr }); InPort(Port(self.bindings.len() - 1)) } } impl Binds for Connecting { fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> OutPort { self.bindings.push(Binding { coupling, polarity: Polarity::Putter, addr }); OutPort(Port(self.bindings.len() - 1)) } } #[derive(Debug, Clone)] pub enum ConnectErr { BindErr(SocketAddr), NewSocketErr(SocketAddr), AcceptErr(SocketAddr), ConnectionShutdown(SocketAddr), PortKindMismatch(Port, SocketAddr), EndpointErr(Port, EndpointErr), PollInitFailed, PollingFailed, Timeout, } #[derive(Debug)] struct Component { protocol: Arc, port_set: HashSet, identifier: Arc<[u8]>, state: Option, // invariant between rounds: Some() } impl From for ConnectErr { fn from(e: PollDeadlineErr) -> Self { use PollDeadlineErr as P; match e { P::PollingFailed => Self::PollingFailed, P::Timeout => Self::Timeout, } } } impl From for ConnectErr { fn from(e: MessengerRecvErr) -> Self { use MessengerRecvErr as M; match e { M::PollingFailed => Self::PollingFailed, M::EndpointErr(port, err) => Self::EndpointErr(port, err), } } } impl Connecting { fn random_controller_id() -> ControllerId { type Bytes8 = [u8; std::mem::size_of::()]; let mut bytes = Bytes8::default(); getrandom::getrandom(&mut bytes).unwrap(); unsafe { // safe: // 1. All random bytes give valid Bytes8 // 2. Bytes8 and ControllerId have same valid representations std::mem::transmute::(bytes) } } fn test_stream_connectivity(stream: &mut TcpStream) -> bool { use std::io::Write; stream.write(&[]).is_ok() } fn new_connected( &self, controller_id: ControllerId, timeout: Option, ) -> Result { use ConnectErr::*; /////////////////////////////////////////////////////// // 1. bindings correspond with ports 0..bindings.len(). For each: // - reserve a slot in endpoint_exts. // - store the port in `native_ports' set. let mut endpoint_exts = VecStorage::::with_reserved_range(self.bindings.len()); let native_ports = (0..self.bindings.len()).map(Port).collect(); // 2. create MessengerState structure for polling channels let edge = PollOpt::edge(); let [ready_r, ready_w] = [Ready::readable(), Ready::writable()]; let mut ms = MessengerState::with_event_capacity(self.bindings.len()).map_err(|_| PollInitFailed)?; // 3. create one TODO task per (port,binding) as a vector with indices in lockstep. // we will drain it gradually so we store elements of type Option where all are initially Some(_) enum Todo { PassiveAccepting { listener: TcpListener, channel_id: ChannelId }, ActiveConnecting { stream: TcpStream }, PassiveConnecting { stream: TcpStream, channel_id: ChannelId }, ActiveRecving { endpoint: Endpoint }, } let mut channel_index_stream = ChannelIndexStream::default(); let mut todos = self .bindings .iter() .enumerate() .map(|(index, binding)| { Ok(Some(match binding.coupling { Coupling::Passive => { let channel_index = channel_index_stream.next(); let channel_id = ChannelId { controller_id, channel_index }; let listener = TcpListener::bind(&binding.addr).map_err(|_| BindErr(binding.addr))?; ms.poll.register(&listener, Token(index), ready_r, edge).unwrap(); // registration unique Todo::PassiveAccepting { listener, channel_id } } Coupling::Active => { let stream = TcpStream::connect(&binding.addr) .map_err(|_| NewSocketErr(binding.addr))?; ms.poll.register(&stream, Token(index), ready_w, edge).unwrap(); // registration unique Todo::ActiveConnecting { stream } } })) }) .collect::>, ConnectErr>>()?; let mut num_todos_remaining = todos.len(); // 4. handle incoming events until all TODOs are completed OR we timeout let deadline = timeout.map(|t| Instant::now() + t); let mut polled_undrained_later = IndexSet::<_>::default(); let mut backoff_millis = 10; while num_todos_remaining > 0 { ms.poll_events_until(deadline)?; for event in ms.events.iter() { let token = event.token(); let index = token.0; let binding = &self.bindings[index]; match todos[index].take() { None => { polled_undrained_later.insert(index); } Some(Todo::PassiveAccepting { listener, channel_id }) => { let (stream, _peer_addr) = listener.accept().map_err(|_| AcceptErr(binding.addr))?; ms.poll.deregister(&listener).expect("wer"); ms.poll.register(&stream, token, ready_w, edge).expect("3y5"); todos[index] = Some(Todo::PassiveConnecting { stream, channel_id }); } Some(Todo::ActiveConnecting { mut stream }) => { let todo = if Self::test_stream_connectivity(&mut stream) { ms.poll.reregister(&stream, token, ready_r, edge).expect("52"); let endpoint = Endpoint::from_fresh_stream(stream); Todo::ActiveRecving { endpoint } } else { ms.poll.deregister(&stream).expect("wt"); std::thread::sleep(Duration::from_millis(backoff_millis)); backoff_millis = ((backoff_millis as f32) * 1.2) as u64 + 3; let stream = TcpStream::connect(&binding.addr).unwrap(); ms.poll.register(&stream, token, ready_w, edge).expect("PAC 3"); Todo::ActiveConnecting { stream } }; todos[index] = Some(todo); } Some(Todo::PassiveConnecting { mut stream, channel_id }) => { if !Self::test_stream_connectivity(&mut stream) { return Err(ConnectionShutdown(binding.addr)); } ms.poll.reregister(&stream, token, ready_r, edge).expect("55"); let polarity = binding.polarity; let info = EndpointInfo { polarity, channel_id }; let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info }); let mut endpoint = Endpoint::from_fresh_stream(stream); endpoint.send(msg).map_err(|e| EndpointErr(Port(index), e))?; let endpoint_ext = EndpointExt { endpoint, info }; endpoint_exts.occupy_reserved(index, endpoint_ext); num_todos_remaining -= 1; } Some(Todo::ActiveRecving { mut endpoint }) => { let ekey = Port(index); 'recv_loop: while let Some(msg) = endpoint.recv().map_err(|e| EndpointErr(ekey, e))? { if let Msg::SetupMsg(SetupMsg::ChannelSetup { info }) = msg { if info.polarity == binding.polarity { return Err(PortKindMismatch(ekey, binding.addr)); } let channel_id = info.channel_id; let info = EndpointInfo { polarity: binding.polarity, channel_id }; ms.polled_undrained.insert(ekey); let endpoint_ext = EndpointExt { endpoint, info }; endpoint_exts.occupy_reserved(index, endpoint_ext); num_todos_remaining -= 1; break 'recv_loop; } else { ms.delayed.push(ReceivedMsg { recipient: ekey, msg }); } } } } } } assert_eq!(None, endpoint_exts.iter_reserved().next()); drop(todos); /////////////////////////////////////////////////////// // 1. construct `family', i.e. perform the sink tree setup procedure use {Msg::SetupMsg as S, SetupMsg::*}; let mut messenger = (&mut ms, &mut endpoint_exts); impl Messengerlike for (&mut MessengerState, &mut VecStorage) { fn get_state_mut(&mut self) -> &mut MessengerState { self.0 } fn get_endpoint_mut(&mut self, ekey: Key) -> &mut Endpoint { &mut self .1 .get_occupied_mut(ekey.to_raw() as usize) .expect("OUT OF BOUNDS") .endpoint } } // 1. broadcast my ID as the first echo. await reply from all in net_keylist let neighbors = (0..self.bindings.len()).map(Port); let echo = S(LeaderEcho { maybe_leader: controller_id }); let mut awaiting = IndexSet::::with_capacity(neighbors.len()); for n in neighbors.clone() { messenger.send(n, echo.clone()).map_err(|e| EndpointErr(n, e))?; awaiting.insert(n); } // 2. Receive incoming replies. whenever a higher-id echo arrives, // adopt it as leader, sender as parent, and reset the await set. let mut parent: Option = None; let mut my_leader = controller_id; messenger.undelay_all(); 'echo_loop: while !awaiting.is_empty() || parent.is_some() { let ReceivedMsg { recipient, msg } = messenger.recv_until(deadline)?.ok_or(Timeout)?; match msg { S(LeaderAnnounce { leader }) => { // someone else completed the echo and became leader first! // the sender is my parent parent = Some(recipient); my_leader = leader; awaiting.clear(); break 'echo_loop; } S(LeaderEcho { maybe_leader }) => { use Ordering::*; match maybe_leader.cmp(&my_leader) { Less => { /* ignore */ } Equal => { awaiting.remove(&recipient); if awaiting.is_empty() { if let Some(p) = parent { // return the echo to my parent messenger .send(p, S(LeaderEcho { maybe_leader })) .map_err(|e| EndpointErr(p, e))?; } else { // DECIDE! break 'echo_loop; } } } Greater => { // join new echo parent = Some(recipient); my_leader = maybe_leader; let echo = S(LeaderEcho { maybe_leader: my_leader }); awaiting.clear(); if neighbors.len() == 1 { // immediately reply to parent messenger .send(recipient, echo.clone()) .map_err(|e| EndpointErr(recipient, e))?; } else { for n in neighbors.clone() { if n != recipient { messenger .send(n, echo.clone()) .map_err(|e| EndpointErr(n, e))?; awaiting.insert(n); } } } } } } msg => messenger.delay(ReceivedMsg { recipient, msg }), } } match parent { None => assert_eq!( my_leader, controller_id, "I've got no parent, but I consider {:?} the leader?", my_leader ), Some(parent) => assert_ne!( my_leader, controller_id, "I have {:?} as parent, but I consider myself ({:?}) the leader?", parent, controller_id ), } // 3. broadcast leader announcement (except to parent: confirm they are your parent) // in this loop, every node sends 1 message to each neighbor let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader }); for n in neighbors.clone() { let msg = if Some(n) == parent { S(YouAreMyParent) } else { msg_for_non_parents.clone() }; messenger.send(n, msg).map_err(|e| EndpointErr(n, e))?; } // await 1 message from all non-parents for n in neighbors.clone() { if Some(n) != parent { awaiting.insert(n); } } let mut children = HashSet::default(); messenger.undelay_all(); while !awaiting.is_empty() { let ReceivedMsg { recipient, msg } = messenger.recv_until(deadline)?.ok_or(Timeout)?; let recipient = recipient; match msg { S(YouAreMyParent) => { assert!(awaiting.remove(&recipient)); children.insert(recipient); } S(SetupMsg::LeaderAnnounce { leader }) => { assert!(awaiting.remove(&recipient)); assert!(leader == my_leader); assert!(Some(recipient) != parent); // they wouldn't send me this if they considered me their parent } _ => messenger.delay(ReceivedMsg { recipient, msg }), } } let family = Family { parent, children }; // done! Ok(Connected { components: Default::default(), controller_id, channel_index_stream, endpoint_exts, native_ports, family, ephemeral: Default::default(), }) } ///////// pub fn connect_using_id( &mut self, controller_id: ControllerId, timeout: Option, ) -> Result { // 1. try and create a connection from these bindings with self immutable. let connected = self.new_connected(controller_id, timeout)?; // 2. success! drain self and return self.bindings.clear(); Ok(connected) } pub fn connect(&mut self, timeout: Option) -> Result { self.connect_using_id(Self::random_controller_id(), timeout) } } #[derive(Debug)] pub struct Connected { native_ports: HashSet, controller_id: ControllerId, channel_index_stream: ChannelIndexStream, endpoint_exts: VecStorage, components: VecStorage, family: Family, ephemeral: Ephemeral, } #[derive(Debug, Default)] struct Ephemeral { // invariant: between rounds this is cleared machines: Vec, bit_matrix: BitMatrix, assignment_to_bit_property: HashMap<(ChannelId, bool), usize>, usize_buf: Vec, } impl Ephemeral { fn clear(&mut self) { self.bit_matrix = Default::default(); self.usize_buf.clear(); self.machines.clear(); self.assignment_to_bit_property.clear(); } } #[derive(Debug)] struct Machine { component_index: usize, state: ProtocolS, } struct MonoCtx<'a> { another_pass: &'a mut bool, } impl MonoContext for MonoCtx<'_> { type D = ProtocolD; type S = ProtocolS; fn new_component(&mut self, moved_keys: HashSet, init_state: Self::S) { todo!() } fn new_channel(&mut self) -> [Key; 2] { todo!() } fn new_random(&mut self) -> u64 { todo!() } } impl Connected { pub fn new_component( &mut self, protocol: &Arc, identifier: &Arc<[u8]>, moved_port_list: &[Port], ) -> Result<(), MainComponentErr> { ////////////////////////////////////////// // 1. try and create a new component (without mutating self) use MainComponentErr::*; let moved_port_set = { let mut set: HashSet = Default::default(); for &port in moved_port_list.iter() { if !self.native_ports.contains(&port) { return Err(CannotMovePort(port)); } if !set.insert(port) { return Err(DuplicateMovedPort(port)); } } set }; // moved_port_set is disjoint to native_ports let expected_polarities = protocol.component_polarities(identifier)?; if moved_port_list.len() != expected_polarities.len() { return Err(WrongNumberOfParamaters { expected: expected_polarities.len() }); } // correct polarity list for (param_index, (&port, &expected_polarity)) in moved_port_list.iter().zip(expected_polarities.iter()).enumerate() { let polarity = self.endpoint_exts.get_occupied(port.0).ok_or(UnknownPort(port))?.info.polarity; if polarity != expected_polarity { return Err(WrongPortPolarity { param_index, port }); } } let state = Some(protocol.new_main_component(identifier, &moved_port_list)); let component = Component { port_set: moved_port_set, protocol: protocol.clone(), identifier: identifier.clone(), state, }; ////////////////////////////// // success! mutate self and return Ok self.native_ports.retain(|e| !component.port_set.contains(e)); self.components.new_occupied(component); Ok(()) } pub fn new_channel(&mut self) -> (OutPort, InPort) { assert!(self.endpoint_exts.len() <= std::u32::MAX as usize - 2); let channel_id = ChannelId { controller_id: self.controller_id, channel_index: self.channel_index_stream.next(), }; let [e0, e1] = Endpoint::new_memory_pair(); let kp = self.endpoint_exts.new_occupied(EndpointExt { info: EndpointInfo { channel_id, polarity: Putter }, endpoint: e0, }); let kg = self.endpoint_exts.new_occupied(EndpointExt { info: EndpointInfo { channel_id, polarity: Getter }, endpoint: e1, }); (OutPort(Port(kp)), InPort(Port(kg))) } pub fn sync_set(&mut self, _inbuf: &mut [u8], _ops: &mut [PortOpRs]) -> Result<(), ()> { // For every component, take its state and make a singleton machine for (component_index, component) in self.components.iter_mut().enumerate() { let state = component.state.take().unwrap(); let machine = Machine { component_index, state }; self.ephemeral.machines.push(machine); } // Grow property matrix. has |machines| entities and {to_run => 0, to_remove => 1} properties const PROP_TO_RUN: usize = 0; const PROP_TO_REMOVE: usize = 1; self.ephemeral .bit_matrix .grow_to(Pair { property: 2, entity: self.ephemeral.machines.len() as u32 }); // Set to_run property for all existing machines self.ephemeral.bit_matrix.batch_mut(move |p| p[PROP_TO_RUN] = TRUE_CHUNK); ///////////// // perform mono runs, adding and removing TO_RUN property bits bits, and adding PROP_TO_REMOVE property bits let mut usize_buf = vec![]; let mut another_pass = true; while another_pass { another_pass = false; let machine_index_iter = self .ephemeral .bit_matrix .iter_entities_where(&mut usize_buf, move |p| p[PROP_TO_RUN]); for machine_index in machine_index_iter { let machine = &mut self.ephemeral.machines[machine_index as usize]; let component = self.components.get_occupied(machine.component_index).unwrap(); let mut ctx = MonoCtx { another_pass: &mut another_pass }; // TODO ctx doesn't work. it may callback to create new machines (setting their TO_RUN and another_pass=true) match machine.state.pre_sync_run(&mut ctx, &component.protocol) { MonoBlocker::Inconsistent => todo!(), // make entire state inconsistent! MonoBlocker::ComponentExit => self .ephemeral .bit_matrix .set(Pair { entity: machine_index, property: PROP_TO_REMOVE as u32 }), MonoBlocker::SyncBlockStart => self .ephemeral .bit_matrix .unset(Pair { entity: machine_index, property: PROP_TO_RUN as u32 }), } } } // no machines have property TO_RUN // from back to front, swap_remove all machines with PROP_TO_REMOVE let machine_index_iter = self .ephemeral .bit_matrix .iter_entities_where_rev(&mut usize_buf, move |p| p[PROP_TO_REMOVE]); for machine_index in machine_index_iter { let machine = self.ephemeral.machines.swap_remove(machine_index as usize); drop(machine); } // replace old matrix full of bogus data with a new (fresh) one for the set of machines // henceforth, machines(entities) and properties won't shrink or move. self.ephemeral.bit_matrix = BitMatrix::new(Pair { entity: self.ephemeral.machines.len() as u32 * 2, property: 8 }); // !!! TODO poly run until solution is found //////////////////// let solution_assignments: Vec<(ChannelId, bool)> = vec![]; // solution has been found. time to find a // logically destructure self so we can read and write to different fields interleaved... let Self { components, ephemeral: Ephemeral { bit_matrix, assignment_to_bit_property, usize_buf, machines }, .. } = self; // !!!!!!! TODO MORE HERE let machine_index_iter = bit_matrix.iter_entities_where(usize_buf, move |p| { solution_assignments.iter().fold(TRUE_CHUNK, |chunk, assignment| { let &bit_property = assignment_to_bit_property.get(assignment).unwrap(); chunk & p[bit_property] }) }); for machine_index in machine_index_iter { let machine = &machines[machine_index as usize]; let component = &mut components.get_occupied_mut(machine.component_index).unwrap(); let was = component.state.replace(machine.state.clone()); assert!(was.is_none()); // 2+ machines matched the solution for this component! println!("visiting machine at index {:?}", machine_index); } for component in self.components.iter() { assert!(component.state.is_some()); // 0 machines matched the solution for this component! } self.ephemeral.clear(); println!("B {:#?}", self); Ok(()) } pub fn sync_subsets( &mut self, _inbuf: &mut [u8], _ops: &mut [PortOpRs], bit_subsets: &[&[usize]], ) -> Result { for (batch_index, bit_subset) in bit_subsets.iter().enumerate() { println!("batch_index {:?}", batch_index); let chunk_iter = bit_subset.iter().copied(); for index in BitChunkIter::new(chunk_iter) { println!(" index {:?}", index); } } Ok(0) } } macro_rules! bitslice { ($( $num:expr ),*) => {{ &[0 $( | (1usize << $num) )*] }}; } #[test] fn api_new_test() { let mut c = Connecting::default(); let net_out: OutPort = c.bind(Coupling::Active, "127.0.0.1:8000".parse().unwrap()); let net_in: InPort = c.bind(Coupling::Active, "127.0.0.1:8001".parse().unwrap()); let proto_0 = Arc::new(ProtocolD::parse(b"").unwrap()); let mut c = c.connect(None).unwrap(); let (mem_out, mem_in) = c.new_channel(); let mut inbuf = [0u8; 64]; let identifier: Arc<[u8]> = b"sync".to_vec().into(); c.new_component(&proto_0, &identifier, &[net_in.into(), mem_out.into()]).unwrap(); let mut ops = [ PortOpRs::In { msg_range: None, port: &mem_in }, PortOpRs::Out { msg: b"hey", port: &net_out, optional: false }, PortOpRs::Out { msg: b"hi?", port: &net_out, optional: true }, PortOpRs::Out { msg: b"yo!", port: &net_out, optional: false }, ]; c.sync_set(&mut inbuf, &mut ops).unwrap(); c.sync_subsets(&mut inbuf, &mut ops, &[bitslice! {0,1,2}]).unwrap(); } #[repr(C)] pub struct PortOp { msgptr: *mut u8, // read if OUT, field written if IN, will point into buf msglen: usize, // read if OUT, written if IN, won't exceed buf port: Port, optional: bool, // no meaning if } pub enum PortOpRs<'a> { In { msg_range: Option>, port: &'a InPort }, Out { msg: &'a [u8], port: &'a OutPort, optional: bool }, } unsafe fn c_sync_set( connected: &mut Connected, inbuflen: usize, inbufptr: *mut u8, opslen: usize, opsptr: *mut PortOp, ) -> i32 { let buf = as_mut_slice(inbuflen, inbufptr); let ops = as_mut_slice(opslen, opsptr); let (subset_index, wrote) = sync_inner(connected, buf); assert_eq!(0, subset_index); for op in ops { if let Some(range) = wrote.get(&op.port) { op.msgptr = inbufptr.add(range.start); op.msglen = range.end - range.start; } } 0 } unsafe fn c_sync_subset( connected: &mut Connected, inbuflen: usize, inbufptr: *mut u8, opslen: usize, opsptr: *mut PortOp, subsetslen: usize, subsetsptr: *const *const usize, ) -> i32 { let buf: &mut [u8] = as_mut_slice(inbuflen, inbufptr); let ops: &mut [PortOp] = as_mut_slice(opslen, opsptr); let subsets: &[*const usize] = as_const_slice(subsetslen, subsetsptr); let subsetlen = usizes_for_bits(opslen); // don't yet know subsetptr; which subset fires unknown! let (subset_index, wrote) = sync_inner(connected, buf); let subsetptr: *const usize = subsets[subset_index]; let subset: &[usize] = as_const_slice(subsetlen, subsetptr); for index in BitChunkIter::new(subset.iter().copied()) { let op = &mut ops[index as usize]; if let Some(range) = wrote.get(&op.port) { op.msgptr = inbufptr.add(range.start); op.msglen = range.end - range.start; } } subset_index as i32 } // dummy fn for the actual synchronous round fn sync_inner<'c, 'b>( _connected: &'c mut Connected, _buf: &'b mut [u8], ) -> (usize, &'b HashMap>) { todo!() } unsafe fn as_mut_slice<'a, T>(len: usize, ptr: *mut T) -> &'a mut [T] { std::slice::from_raw_parts_mut(ptr, len) } unsafe fn as_const_slice<'a, T>(len: usize, ptr: *const T) -> &'a [T] { std::slice::from_raw_parts(ptr, len) } #[test] fn api_connecting() { let addrs: [SocketAddr; 3] = [ "127.0.0.1:8888".parse().unwrap(), "127.0.0.1:8889".parse().unwrap(), "127.0.0.1:8890".parse().unwrap(), ]; lazy_static::lazy_static! { static ref PROTOCOL: Arc = { static PDL: &[u8] = b" primitive sync(in i, out o) { while(true) synchronous { put(o, get(i)); } } "; Arc::new(ProtocolD::parse(PDL).unwrap()) }; } const TIMEOUT: Option = Some(Duration::from_secs(1)); let handles = vec![ std::thread::spawn(move || { let mut c = Connecting::default(); let p_in: InPort = c.bind(Coupling::Passive, addrs[0]); let p_out: OutPort = c.bind(Coupling::Active, addrs[1]); let mut c = c.connect(TIMEOUT).unwrap(); println!("c {:#?}", &c); let identifier = b"sync".to_vec().into(); c.new_component(&PROTOCOL, &identifier, &[p_in.into(), p_out.into()]).unwrap(); println!("c {:#?}", &c); let mut inbuf = vec![]; let mut port_ops = []; c.sync_set(&mut inbuf, &mut port_ops).unwrap(); }), std::thread::spawn(move || { let mut connecting = Connecting::default(); let _a: OutPort = connecting.bind(Coupling::Active, addrs[0]); let _b: InPort = connecting.bind(Coupling::Passive, addrs[1]); let _c: InPort = connecting.bind(Coupling::Active, addrs[2]); let _connected = connecting.connect(TIMEOUT).unwrap(); }), std::thread::spawn(move || { let mut connecting = Connecting::default(); let _a: OutPort = connecting.bind(Coupling::Passive, addrs[2]); let _connected = connecting.connect(TIMEOUT).unwrap(); }), ]; for h in handles { h.join().unwrap(); } }