From 8ea9f0e9a5abe06e775cabba8d2b4bf80e8807d1 2020-02-21 10:23:17 From: Christopher Esterhuyse Date: 2020-02-21 10:23:17 Subject: [PATCH] fusing ports and keys --- diff --git a/src/common.rs b/src/common.rs index e7a6037b9085fbbe67ec46ab9eafe6bbf6140448..e88907af761cb1b3bcbeab88a7239e59c8cbaa99 100644 --- a/src/common.rs +++ b/src/common.rs @@ -42,7 +42,9 @@ pub enum Polarity { } #[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Copy, Clone, Debug)] -pub struct Key(u64); +#[repr(C)] +pub struct Port(pub usize); // ports are COPY +pub type Key = Port; #[derive(Eq, PartialEq, Copy, Clone, Debug)] pub enum MainComponentErr { @@ -105,10 +107,10 @@ pub trait PolyContext { ///////////////////// IMPL ///////////////////// impl Key { - pub fn from_raw(raw: u64) -> Self { + pub fn from_raw(raw: usize) -> Self { Self(raw) } - pub fn to_raw(self) -> u64 { + pub fn to_raw(self) -> usize { self.0 } pub fn to_token(self) -> mio::Token { diff --git a/src/runtime/experimental/api.rs b/src/runtime/experimental/api.rs index bb8b1ff79eb5ad2c8fbf721b2362973404e3b6f3..78f1353f41c5180115899939cec7f314e4378212 100644 --- a/src/runtime/experimental/api.rs +++ b/src/runtime/experimental/api.rs @@ -1,33 +1,33 @@ +use super::vec_storage::VecStorage; use crate::common::*; -use crate::runtime::endpoint::Endpoint; use crate::runtime::endpoint::EndpointExt; use crate::runtime::endpoint::EndpointInfo; +use crate::runtime::endpoint::{Endpoint, Msg, SetupMsg}; +use crate::runtime::MessengerState; +use crate::runtime::Messengerlike; +use crate::runtime::ReceivedMsg; use std::net::SocketAddr; use std::sync::Arc; -pub enum Polarity { - In, - Out, -} pub enum Coupling { Active, Passive, } + +struct Family { + parent: Option, + children: HashSet, +} + pub struct Binding { pub coupling: Coupling, pub polarity: Polarity, pub addr: SocketAddr, } -impl From<(Coupling, Polarity, SocketAddr)> for Binding { - fn from((coupling, polarity, addr): (Coupling, Polarity, SocketAddr)) -> Self { - Self { coupling, polarity, addr } - } -} -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] -#[repr(C)] -pub struct Port(pub u32); +pub struct InPort(Port); // InPort and OutPort are AFFINE (exposed to Rust API) +pub struct OutPort(Port); impl From for Port { fn from(x: InPort) -> Self { x.0 @@ -38,8 +38,6 @@ impl From for Port { x.0 } } -pub struct InPort(Port); -pub struct OutPort(Port); #[derive(Default)] struct ChannelIndexStream { @@ -59,39 +57,345 @@ enum Connector { #[derive(Default)] pub struct Connecting { - bindings: Vec, // invariant: no more than std::u32::MAX entries + bindings: Vec, } trait Binds { fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> T; } impl Binds for Connecting { fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> InPort { - self.bindings.push((coupling, Polarity::In, addr).into()); - let pid: u32 = (self.bindings.len() - 1).try_into().expect("Port ID overflow!"); - InPort(Port(pid)) + self.bindings.push(Binding { coupling, polarity: Polarity::Getter, addr }); + InPort(Port(self.bindings.len() - 1)) } } impl Binds for Connecting { fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> OutPort { - self.bindings.push((coupling, Polarity::Out, addr).into()); - let pid: u32 = (self.bindings.len() - 1).try_into().expect("Port ID overflow!"); - OutPort(Port(pid)) + self.bindings.push(Binding { coupling, polarity: Polarity::Putter, addr }); + OutPort(Port(self.bindings.len() - 1)) } } impl Connecting { - pub fn connect(&mut self, _timeout: Option) -> Result { - let controller_id = 42; - let channel_index_stream = ChannelIndexStream::default(); - let native_ports = (0..self.bindings.len()).map(|x| Port(x as u32)).collect(); - self.bindings.clear(); + fn random_controller_id() -> ControllerId { + type Bytes8 = [u8; std::mem::size_of::()]; + let mut bytes = Bytes8::default(); + getrandom::getrandom(&mut bytes).unwrap(); + unsafe { + // safe: + // 1. All random bytes give valid Bytes8 + // 2. Bytes8 and ControllerId have same valid representations + std::mem::transmute::(bytes) + } + } + fn test_stream_connectivity(stream: &mut TcpStream) -> bool { + use std::io::Write; + stream.write(&[]).is_ok() + } + fn new_connected( + &self, + controller_id: ControllerId, + protocol: &Arc, + timeout: Option, + ) -> Result { + // TEMP: helper functions until Key is unified with Port + #[inline] + fn key2port(ekey: Key) -> Port { + Port(ekey.to_raw() as usize) + } + #[inline] + fn port2key(port: Port) -> Key { + Key::from_raw(port.0) + } + + // 1. bindings correspond with ports 0..bindings.len(). For each: + // - reserve a slot in endpoint_exts. + // - store the port in `native_ports' set. + let mut endpoint_exts = VecStorage::::with_reserved_range(self.bindings.len()); + let native_ports = (0..self.bindings.len()).map(Port).collect(); + + // 2. create MessengerState structure for polling channels + let edge = PollOpt::edge(); + let [ready_r, ready_w] = [Ready::readable(), Ready::writable()]; + let mut ms = MessengerState { + poll: Poll::new().map_err(drop)?, + events: Events::with_capacity(self.bindings.len()), + delayed: vec![], + undelayed: vec![], + polled_undrained: Default::default(), + }; + + // 3. create one TODO task per (port,binding) as a vector with indices in lockstep. + // we will drain it gradually so we store elements of type Option where all are initially Some(_) + enum Todo { + PassiveAccepting { listener: TcpListener, channel_id: ChannelId }, + ActiveConnecting { stream: TcpStream }, + PassiveConnecting { stream: TcpStream, channel_id: ChannelId }, + ActiveRecving { endpoint: Endpoint }, + } + let mut channel_index_stream = ChannelIndexStream::default(); + let mut todos = self + .bindings + .iter() + .enumerate() + .map(|(index, binding)| { + Ok(Some(match binding.coupling { + Coupling::Passive => { + let channel_index = channel_index_stream.next(); + let channel_id = ChannelId { controller_id, channel_index }; + let listener = TcpListener::bind(&binding.addr).map_err(drop)?; + ms.poll.register(&listener, Token(index), ready_r, edge).unwrap(); // registration unique + Todo::PassiveAccepting { listener, channel_id } + } + Coupling::Active => { + let stream = TcpStream::connect(&binding.addr).map_err(drop)?; + ms.poll.register(&stream, Token(index), ready_w, edge).unwrap(); // registration unique + Todo::ActiveConnecting { stream } + } + })) + }) + .collect::>, ()>>()?; + let mut num_todos_remaining = todos.len(); + + // 4. handle incoming events until all TODOs are completed OR we timeout + let deadline = timeout.map(|t| Instant::now() + t); + let mut polled_undrained_later = IndexSet::<_>::default(); + let mut backoff_millis = 10; + while num_todos_remaining > 0 { + ms.poll_events_until(deadline).map_err(drop)?; + for event in ms.events.iter() { + let token = event.token(); + let index = token.0; + let binding = &self.bindings[index]; + match todos[index].take() { + None => { + polled_undrained_later.insert(index); + } + Some(Todo::PassiveAccepting { listener, channel_id }) => { + let (stream, _peer_addr) = listener.accept().map_err(drop)?; + ms.poll.deregister(&listener).expect("wer"); + ms.poll.register(&stream, token, ready_w, edge).expect("3y5"); + todos[index] = Some(Todo::PassiveConnecting { stream, channel_id }); + } + Some(Todo::ActiveConnecting { mut stream }) => { + let todo = if Self::test_stream_connectivity(&mut stream) { + ms.poll.reregister(&stream, token, ready_r, edge).expect("52"); + let endpoint = Endpoint::from_fresh_stream(stream); + Todo::ActiveRecving { endpoint } + } else { + ms.poll.deregister(&stream).expect("wt"); + std::thread::sleep(Duration::from_millis(backoff_millis)); + backoff_millis = ((backoff_millis as f32) * 1.2) as u64 + 3; + let stream = TcpStream::connect(&binding.addr).unwrap(); + ms.poll.register(&stream, token, ready_w, edge).expect("PAC 3"); + Todo::ActiveConnecting { stream } + }; + todos[index] = Some(todo); + } + Some(Todo::PassiveConnecting { mut stream, channel_id }) => { + if !Self::test_stream_connectivity(&mut stream) { + return Err(()); + } + ms.poll.reregister(&stream, token, ready_r, edge).expect("55"); + let polarity = binding.polarity; + let info = EndpointInfo { polarity, channel_id }; + let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info }); + let mut endpoint = Endpoint::from_fresh_stream(stream); + endpoint.send(msg).map_err(drop)?; + let endpoint_ext = EndpointExt { endpoint, info }; + endpoint_exts.occupy_reserved(index, endpoint_ext); + num_todos_remaining -= 1; + } + Some(Todo::ActiveRecving { mut endpoint }) => { + // log!(logger, "{:03?} start ActiveRecving...", major); + // assert!(event.readiness().is_readable()); + let ekey = Key::from_raw(index); + 'recv_loop: while let Some(msg) = endpoint.recv().map_err(drop)? { + if let Msg::SetupMsg(SetupMsg::ChannelSetup { info }) = msg { + if info.polarity == binding.polarity { + return Err(()); + } + let channel_id = info.channel_id; + let info = EndpointInfo { polarity: binding.polarity, channel_id }; + ms.polled_undrained.insert(ekey); + let endpoint_ext = EndpointExt { endpoint, info }; + endpoint_exts.occupy_reserved(index, endpoint_ext); + num_todos_remaining -= 1; + break 'recv_loop; + } else { + ms.delayed.push(ReceivedMsg { recipient: ekey, msg }); + } + } + } + } + } + } + assert_eq!(None, endpoint_exts.iter_reserved().next()); + drop(todos); + + // 1. construct `family', i.e. perform the sink tree setup procedure + + use {Msg::SetupMsg as S, SetupMsg::*}; + let mut messenger = (&mut ms, &mut endpoint_exts); + impl Messengerlike for (&mut MessengerState, &mut VecStorage) { + fn get_state_mut(&mut self) -> &mut MessengerState { + self.0 + } + fn get_endpoint_mut(&mut self, ekey: Key) -> &mut Endpoint { + &mut self + .1 + .get_occupied_mut(ekey.to_raw() as usize) + .expect("OUT OF BOUNDS") + .endpoint + } + } + + // 1. broadcast my ID as the first echo. await reply from all in net_keylist + let neighbors = (0..self.bindings.len()).map(Port); + let echo = S(LeaderEcho { maybe_leader: controller_id }); + let mut awaiting = IndexSet::::with_capacity(neighbors.len()); + for n in neighbors.clone() { + messenger.send(port2key(n), echo.clone()).map_err(drop)?; + awaiting.insert(n); + } + + // 2. Receive incoming replies. whenever a higher-id echo arrives, + // adopt it as leader, sender as parent, and reset the await set. + let mut parent: Option = None; + let mut my_leader = controller_id; + messenger.undelay_all(); + 'echo_loop: while !awaiting.is_empty() || parent.is_some() { + let ReceivedMsg { recipient, msg } = + messenger.recv_until(deadline).map_err(drop)?.ok_or(())?; + let recipient = key2port(recipient); + match msg { + S(LeaderAnnounce { leader }) => { + // someone else completed the echo and became leader first! + // the sender is my parent + parent = Some(recipient); + my_leader = leader; + awaiting.clear(); + break 'echo_loop; + } + S(LeaderEcho { maybe_leader }) => { + use Ordering::*; + match maybe_leader.cmp(&my_leader) { + Less => { /* ignore */ } + Equal => { + awaiting.remove(&recipient); + if awaiting.is_empty() { + if let Some(p) = parent { + // return the echo to my parent + messenger + .send(port2key(p), S(LeaderEcho { maybe_leader })) + .map_err(drop)?; + } else { + // DECIDE! + break 'echo_loop; + } + } + } + Greater => { + // join new echo + parent = Some(recipient); + my_leader = maybe_leader; + let echo = S(LeaderEcho { maybe_leader: my_leader }); + awaiting.clear(); + if neighbors.len() == 1 { + // immediately reply to parent + messenger.send(port2key(recipient), echo.clone()).map_err(drop)?; + } else { + for n in neighbors.clone() { + if n != recipient { + messenger.send(port2key(n), echo.clone()).map_err(drop)?; + awaiting.insert(n); + } + } + } + } + } + } + msg => messenger.delay(ReceivedMsg { recipient: port2key(recipient), msg }), + } + } + match parent { + None => assert_eq!( + my_leader, controller_id, + "I've got no parent, but I consider {:?} the leader?", + my_leader + ), + Some(parent) => assert_ne!( + my_leader, controller_id, + "I have {:?} as parent, but I consider myself ({:?}) the leader?", + parent, controller_id + ), + } + + // 3. broadcast leader announcement (except to parent: confirm they are your parent) + // in this loop, every node sends 1 message to each neighbor + let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader }); + for n in neighbors.clone() { + let msg = + if Some(n) == parent { S(YouAreMyParent) } else { msg_for_non_parents.clone() }; + messenger.send(port2key(n), msg).map_err(drop)?; + } + + // await 1 message from all non-parents + for n in neighbors.clone() { + if Some(n) != parent { + awaiting.insert(n); + } + } + let mut children = HashSet::default(); + messenger.undelay_all(); + while !awaiting.is_empty() { + let ReceivedMsg { recipient, msg } = + messenger.recv_until(deadline).map_err(drop)?.ok_or(())?; + let recipient = key2port(recipient); + match msg { + S(YouAreMyParent) => { + assert!(awaiting.remove(&recipient)); + children.insert(recipient); + } + S(SetupMsg::LeaderAnnounce { leader }) => { + assert!(awaiting.remove(&recipient)); + assert!(leader == my_leader); + assert!(Some(recipient) != parent); + // they wouldn't send me this if they considered me their parent + } + _ => messenger.delay(ReceivedMsg { recipient: port2key(recipient), msg }), + } + } + let family = Family { parent, children }; + + // 1. done! return Ok(Connected { controller_id, channel_index_stream, - components: vec![], - endpoint_exts: vec![], + protocol: protocol.clone(), + endpoint_exts, native_ports, + family, }) } + ///////// + pub fn connect_using_id( + &mut self, + controller_id: ControllerId, + protocol: &Arc, + timeout: Option, + ) -> Result { + // 1. try and create a connection from these bindings with self immutable. + let connected = self.new_connected(controller_id, protocol, timeout)?; + // 2. success! drain self and return + self.bindings.clear(); + Ok(connected) + } + pub fn connect( + &mut self, + protocol: &Arc, + timeout: Option, + ) -> Result { + self.connect_using_id(Self::random_controller_id(), protocol, timeout) + } } pub struct Protocol; impl Protocol { @@ -99,55 +403,46 @@ impl Protocol { Ok(Protocol) } } -struct ComponentExt { - protocol: Arc, - ports: HashSet, - name: Vec, -} +// struct ComponentExt { +// protocol: Arc, +// ports: HashSet, +// name: Vec, +// } pub struct Connected { native_ports: HashSet, controller_id: ControllerId, channel_index_stream: ChannelIndexStream, - endpoint_exts: Vec, // invaraint - components: Vec, + endpoint_exts: VecStorage, + protocol: Arc, + family: Family, + // components: Vec, } impl Connected { pub fn new_channel(&mut self) -> (OutPort, InPort) { assert!(self.endpoint_exts.len() <= std::u32::MAX as usize - 2); - let ports = - [Port(self.endpoint_exts.len() as u32 - 1), Port(self.endpoint_exts.len() as u32)]; let channel_id = ChannelId { controller_id: self.controller_id, channel_index: self.channel_index_stream.next(), }; let [e0, e1] = Endpoint::new_memory_pair(); - self.endpoint_exts.push(EndpointExt { + let kp = self.endpoint_exts.new_occupied(EndpointExt { info: EndpointInfo { channel_id, polarity: Putter }, endpoint: e0, }); - self.endpoint_exts.push(EndpointExt { + let kg = self.endpoint_exts.new_occupied(EndpointExt { info: EndpointInfo { channel_id, polarity: Getter }, endpoint: e1, }); - for p in ports.iter() { - self.native_ports.insert(Port(p.0)); - } - (OutPort(ports[0]), InPort(ports[1])) + (OutPort(Port(kp)), InPort(Port(kg))) } - pub fn new_component( - &mut self, - protocol: &Arc, - name: Vec, - moved_ports: &[Port], - ) -> Result<(), ()> { + pub fn new_component(&mut self, _name: Vec, moved_ports: &[Port]) -> Result<(), ()> { let moved_ports = moved_ports.iter().copied().collect(); if !self.native_ports.is_superset(&moved_ports) { return Err(()); } self.native_ports.retain(|e| !moved_ports.contains(e)); - self.components.push(ComponentExt { ports: moved_ports, protocol: protocol.clone(), name }); - // TODO add a singleton machine - Ok(()) + // self.components.push(ComponentExt { ports: moved_ports, protocol: protocol.clone(), name }); + todo!() } pub fn sync_set(&mut self, _inbuf: &mut [u8], _ops: &mut [PortOpRs]) -> Result<(), ()> { Ok(()) @@ -181,10 +476,10 @@ fn api_new_test() { let net_out: OutPort = c.bind(Coupling::Active, "127.0.0.1:8000".parse().unwrap()); let net_in: InPort = c.bind(Coupling::Active, "127.0.0.1:8001".parse().unwrap()); let proto_0 = Arc::new(Protocol::parse(b"").unwrap()); - let mut c = c.connect(None).unwrap(); + let mut c = c.connect(&proto_0, None).unwrap(); let (mem_out, mem_in) = c.new_channel(); let mut inbuf = [0u8; 64]; - c.new_component(&proto_0, b"sync".to_vec(), &[net_in.into(), mem_out.into()]).unwrap(); + c.new_component(b"sync".to_vec(), &[net_in.into(), mem_out.into()]).unwrap(); let mut ops = [ PortOpRs::In { msg_range: None, port: &mem_in }, PortOpRs::Out { msg: b"hey", port: &net_out, optional: false }, diff --git a/src/runtime/experimental/vec_storage.rs b/src/runtime/experimental/vec_storage.rs index 41aa9f95114eb73b104367899ab0cae88cbc11ad..9fb241c250abd9eb2c13ff5760a795255f250f21 100644 --- a/src/runtime/experimental/vec_storage.rs +++ b/src/runtime/experimental/vec_storage.rs @@ -64,14 +64,21 @@ impl Bitvec { // invariant C: (vacant U occupied) subset of (0..data.len) // invariant D: last element of data is not in VACANT state // invariant E: number of allocated bits in vacant and occupied >= data.len() +// invariant F: vacant_bit_count == vacant.iter().count() pub struct VecStorage { data: Vec>, occupied: Bitvec, vacant: Bitvec, + occupied_bit_count: usize, } impl Default for VecStorage { fn default() -> Self { - Self { data: Default::default(), vacant: Default::default(), occupied: Default::default() } + Self { + data: Default::default(), + vacant: Default::default(), + occupied: Default::default(), + occupied_bit_count: 0, + } } } impl Debug for VecStorage { @@ -122,6 +129,9 @@ impl VecStorage { } } ////////////// + pub fn len(&self) -> usize { + self.occupied_bit_count + } pub fn with_reserved_range(range_end: usize) -> Self { let mut data = Vec::with_capacity(range_end); unsafe { @@ -134,6 +144,7 @@ impl VecStorage { data, vacant: Bitvec(chunk_iter.clone().collect()), occupied: Bitvec(chunk_iter.collect()), + occupied_bit_count: 0, } } pub fn clear(&mut self) { @@ -150,6 +161,7 @@ impl VecStorage { } self.vacant.0.clear(); self.occupied.0.clear(); + self.occupied_bit_count = 0; } pub fn iter(&self) -> impl Iterator { (0..self.data.len()).filter_map(move |i| unsafe { self.get_occupied_unchecked(i) }) @@ -213,6 +225,7 @@ impl VecStorage { self.data.get_unchecked_mut(i).as_mut_ptr().write(t); self.occupied.insert(i); }; + self.occupied_bit_count += 1; } pub fn new_occupied(&mut self, t: T) -> usize { let i = self.new_reserved(); @@ -222,6 +235,7 @@ impl VecStorage { self.data.get_unchecked_mut(i).as_mut_ptr().write(t); self.occupied.insert(i); }; + self.occupied_bit_count += 1; i } pub fn vacate(&mut self, i: usize) -> Option { @@ -236,6 +250,7 @@ impl VecStorage { unsafe { // 1. index is within bounds // 2. i is occupied => initialized data is being read + self.occupied_bit_count -= 1; Some(self.data.get_unchecked_mut(i).as_ptr().read()) } } else { diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index a4a927db72cda173496233c78d0a025d78ef8b7b..8d32b65070a7ecdfabb545fad1ff440046b1db05 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -226,6 +226,39 @@ trait Messengerlike { } } } + + // attempt to receive a message from one of the endpoints before the deadline + fn recv_until( + &mut self, + deadline: Option, + ) -> Result, MessengerRecvErr> { + // try get something buffered + if let Some(x) = self.get_state_mut().undelayed.pop() { + return Ok(Some(x)); + } + + loop { + // polled_undrained may not be empty + while let Some(eekey) = self.get_state_mut().polled_undrained.pop() { + if let Some(msg) = self.get_endpoint_mut(eekey).recv()? { + // this endpoint MAY still have messages! check again in future + self.get_state_mut().polled_undrained.insert(eekey); + return Ok(Some(ReceivedMsg { recipient: eekey, msg })); + } + } + + let state = self.get_state_mut(); + match state.poll_events_until(deadline) { + Ok(()) => { + for e in state.events.iter() { + state.polled_undrained.insert(Key::from_token(e.token())); + } + } + Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed), + Err(PollDeadlineErr::Timeout) => return Ok(None), + } + } + } } ///////////////////////////////// @@ -267,7 +300,7 @@ impl Default for Arena { impl Arena { pub fn alloc(&mut self, t: T) -> Key { self.storage.push(t); - Key::from_raw(self.storage.len() as u64 - 1) + Key::from_raw(self.storage.len() - 1) } pub fn get(&self, key: Key) -> Option<&T> { self.storage.get(key.to_raw() as usize) @@ -285,7 +318,7 @@ impl Arena { self.storage.len() } pub fn keyspace(&self) -> impl Iterator { - (0..(self.storage.len() as u64)).map(Key::from_raw) + (0..self.storage.len()).map(Key::from_raw) } } @@ -308,6 +341,17 @@ impl MessengerState { self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?; Ok(()) } + fn poll_events_until(&mut self, deadline: Option) -> Result<(), PollDeadlineErr> { + use PollDeadlineErr::*; + self.events.clear(); + let poll_timeout = if let Some(d) = deadline { + Some(d.checked_duration_since(Instant::now()).ok_or(Timeout)?) + } else { + None + }; + self.poll.poll(&mut self.events, poll_timeout).map_err(|_| PollingFailed)?; + Ok(()) + } } impl From for ConnectErr { fn from(e: PollDeadlineErr) -> ConnectErr { diff --git a/src/test/connector.rs b/src/test/connector.rs index d89e4546e00a6cd89d7317e65ea9a4e4a1eef6e5..c0454ba8b84f8a78b03090d030b3ab13c3f79f99 100644 --- a/src/test/connector.rs +++ b/src/test/connector.rs @@ -82,7 +82,7 @@ composite fifo_1_e(in i, out o) { "; #[test] -fn connects_ok() { +fn connector_connects_ok() { // Test if we can connect natives using the given PDL /* Alice -->silence--P|A-->silence--> Bob @@ -108,7 +108,7 @@ fn connects_ok() { } #[test] -fn connected_but_silent_natives() { +fn connector_connected_but_silent_natives() { // Test if we can connect natives and have a trivial sync round /* Alice -->silence--P|A-->silence--> Bob @@ -136,7 +136,7 @@ fn connected_but_silent_natives() { } #[test] -fn self_forward_ok() { +fn connector_self_forward_ok() { // Test a deterministic system // where a native has no network bindings // and sends messages to itself @@ -166,7 +166,7 @@ fn self_forward_ok() { ])); } #[test] -fn token_spout_ok() { +fn connector_token_spout_ok() { // Test a deterministic system where the proto // creates token messages /* @@ -191,7 +191,7 @@ fn token_spout_ok() { } #[test] -fn waiter_ok() { +fn connector_waiter_ok() { // Test a stateful proto that blocks port 0 for 10 rounds // and then sends a single token on the 11th /* @@ -217,7 +217,7 @@ fn waiter_ok() { } #[test] -fn self_forward_timeout() { +fn connector_self_forward_timeout() { // Test a deterministic system // where a native has no network bindings // and sends messages to itself @@ -244,7 +244,7 @@ fn self_forward_timeout() { } #[test] -fn forward_det() { +fn connector_forward_det() { // Test if a deterministic protocol and natives can pass one message /* Alice -->forward--P|A-->forward--> Bob @@ -280,7 +280,7 @@ fn forward_det() { } #[test] -fn nondet_proto_det_natives() { +fn connector_nondet_proto_det_natives() { // Test the use of a nondeterministic protocol // where Alice decides the choice and the others conform /* @@ -318,7 +318,7 @@ fn nondet_proto_det_natives() { } #[test] -fn putter_determines() { +fn connector_putter_determines() { // putter and getter /* Alice -->sync--A|P-->sync--> Bob @@ -358,7 +358,7 @@ fn putter_determines() { } #[test] -fn getter_determines() { +fn connector_getter_determines() { // putter and getter /* Alice -->sync--A|P-->sync--> Bob @@ -399,7 +399,7 @@ fn getter_determines() { } #[test] -fn alternator_2() { +fn connector_alternator_2() { // Test a deterministic system which // alternates sending Sender's messages to A or B /* /--|-->A @@ -466,7 +466,7 @@ fn alternator_2() { } #[test] -fn composite_chain_a() { +fn connector_composite_chain_a() { // Check if composition works. Forward messages through long chains /* Alice -->sync-->sync-->A|P-->sync--> Bob @@ -505,7 +505,7 @@ fn composite_chain_a() { } #[test] -fn composite_chain_b() { +fn connector_composite_chain_b() { // Check if composition works. Forward messages through long chains /* Alice -->sync-->sync-->A|P-->sync-->sync--> Bob @@ -544,7 +544,7 @@ fn composite_chain_b() { } #[test] -fn exchange() { +fn connector_exchange() { /* /-->\ /-->P|A-->\ /-->\ Alice exchange exchange Bob @@ -589,7 +589,7 @@ fn exchange() { } #[test] -fn routing_filter() { +fn connector_routing_filter() { // Make a protocol whose behavior is a function of the contents of // a message. Here, the putter determines what is sent, and the proto // determines how it is routed @@ -650,7 +650,7 @@ fn routing_filter() { } #[test] -fn fifo_1_e() { +fn connector_fifo_1_e() { /* /-->\ Alice fifo_1