diff --git a/src/runtime/errors.rs b/src/runtime/errors.rs index f245a07ec047eebe0389499cbfa006d3f364a667..0a654f24e527816ee61793c61bffa5c8b3506ead 100644 --- a/src/runtime/errors.rs +++ b/src/runtime/errors.rs @@ -79,7 +79,7 @@ pub enum EvalErr { #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum MessengerRecvErr { PollingFailed, - EndpointErr(EndpointErr), + EndpointErr(Port, EndpointErr), } impl From for ConfigErr { fn from(e: MainComponentErr) -> Self { diff --git a/src/runtime/experimental/api.rs b/src/runtime/experimental/api.rs index 78f1353f41c5180115899939cec7f314e4378212..599ef40c0e2f2e424d3433a64025ea454b51bded 100644 --- a/src/runtime/experimental/api.rs +++ b/src/runtime/experimental/api.rs @@ -3,6 +3,9 @@ use crate::common::*; use crate::runtime::endpoint::EndpointExt; use crate::runtime::endpoint::EndpointInfo; use crate::runtime::endpoint::{Endpoint, Msg, SetupMsg}; +use crate::runtime::errors::EndpointErr; +use crate::runtime::errors::MessengerRecvErr; +use crate::runtime::errors::PollDeadlineErr; use crate::runtime::MessengerState; use crate::runtime::Messengerlike; use crate::runtime::ReceivedMsg; @@ -15,6 +18,7 @@ pub enum Coupling { Passive, } +#[derive(Debug)] struct Family { parent: Option, children: HashSet, @@ -39,7 +43,7 @@ impl From for Port { } } -#[derive(Default)] +#[derive(Default, Debug)] struct ChannelIndexStream { next: u32, } @@ -74,6 +78,37 @@ impl Binds for Connecting { OutPort(Port(self.bindings.len() - 1)) } } + +#[derive(Debug, Clone)] +pub enum ConnectErr { + BindErr(SocketAddr), + NewSocketErr(SocketAddr), + AcceptErr(SocketAddr), + ConnectionShutdown(SocketAddr), + PortKindMismatch(SocketAddr), + EndpointErr(Port, EndpointErr), + PollInitFailed, + PollingFailed, + Timeout, +} +impl From for ConnectErr { + fn from(e: PollDeadlineErr) -> Self { + use PollDeadlineErr as P; + match e { + P::PollingFailed => Self::PollingFailed, + P::Timeout => Self::Timeout, + } + } +} +impl From for ConnectErr { + fn from(e: MessengerRecvErr) -> Self { + use MessengerRecvErr as M; + match e { + M::PollingFailed => Self::PollingFailed, + M::EndpointErr(port, err) => Self::EndpointErr(port, err), + } + } +} impl Connecting { fn random_controller_id() -> ControllerId { type Bytes8 = [u8; std::mem::size_of::()]; @@ -95,16 +130,8 @@ impl Connecting { controller_id: ControllerId, protocol: &Arc, timeout: Option, - ) -> Result { - // TEMP: helper functions until Key is unified with Port - #[inline] - fn key2port(ekey: Key) -> Port { - Port(ekey.to_raw() as usize) - } - #[inline] - fn port2key(port: Port) -> Key { - Key::from_raw(port.0) - } + ) -> Result { + use ConnectErr::*; // 1. bindings correspond with ports 0..bindings.len(). For each: // - reserve a slot in endpoint_exts. @@ -115,13 +142,8 @@ impl Connecting { // 2. create MessengerState structure for polling channels let edge = PollOpt::edge(); let [ready_r, ready_w] = [Ready::readable(), Ready::writable()]; - let mut ms = MessengerState { - poll: Poll::new().map_err(drop)?, - events: Events::with_capacity(self.bindings.len()), - delayed: vec![], - undelayed: vec![], - polled_undrained: Default::default(), - }; + let mut ms = + MessengerState::with_event_capacity(self.bindings.len()).map_err(|_| PollInitFailed)?; // 3. create one TODO task per (port,binding) as a vector with indices in lockstep. // we will drain it gradually so we store elements of type Option where all are initially Some(_) @@ -141,18 +163,20 @@ impl Connecting { Coupling::Passive => { let channel_index = channel_index_stream.next(); let channel_id = ChannelId { controller_id, channel_index }; - let listener = TcpListener::bind(&binding.addr).map_err(drop)?; + let listener = + TcpListener::bind(&binding.addr).map_err(|_| BindErr(binding.addr))?; ms.poll.register(&listener, Token(index), ready_r, edge).unwrap(); // registration unique Todo::PassiveAccepting { listener, channel_id } } Coupling::Active => { - let stream = TcpStream::connect(&binding.addr).map_err(drop)?; + let stream = TcpStream::connect(&binding.addr) + .map_err(|_| NewSocketErr(binding.addr))?; ms.poll.register(&stream, Token(index), ready_w, edge).unwrap(); // registration unique Todo::ActiveConnecting { stream } } })) }) - .collect::>, ()>>()?; + .collect::>, ConnectErr>>()?; let mut num_todos_remaining = todos.len(); // 4. handle incoming events until all TODOs are completed OR we timeout @@ -160,7 +184,7 @@ impl Connecting { let mut polled_undrained_later = IndexSet::<_>::default(); let mut backoff_millis = 10; while num_todos_remaining > 0 { - ms.poll_events_until(deadline).map_err(drop)?; + ms.poll_events_until(deadline)?; for event in ms.events.iter() { let token = event.token(); let index = token.0; @@ -170,7 +194,8 @@ impl Connecting { polled_undrained_later.insert(index); } Some(Todo::PassiveAccepting { listener, channel_id }) => { - let (stream, _peer_addr) = listener.accept().map_err(drop)?; + let (stream, _peer_addr) = + listener.accept().map_err(|_| AcceptErr(binding.addr))?; ms.poll.deregister(&listener).expect("wer"); ms.poll.register(&stream, token, ready_w, edge).expect("3y5"); todos[index] = Some(Todo::PassiveConnecting { stream, channel_id }); @@ -192,26 +217,26 @@ impl Connecting { } Some(Todo::PassiveConnecting { mut stream, channel_id }) => { if !Self::test_stream_connectivity(&mut stream) { - return Err(()); + return Err(ConnectionShutdown(binding.addr)); } ms.poll.reregister(&stream, token, ready_r, edge).expect("55"); let polarity = binding.polarity; let info = EndpointInfo { polarity, channel_id }; let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info }); let mut endpoint = Endpoint::from_fresh_stream(stream); - endpoint.send(msg).map_err(drop)?; + endpoint.send(msg).map_err(|e| EndpointErr(Port(index), e))?; let endpoint_ext = EndpointExt { endpoint, info }; endpoint_exts.occupy_reserved(index, endpoint_ext); num_todos_remaining -= 1; } Some(Todo::ActiveRecving { mut endpoint }) => { - // log!(logger, "{:03?} start ActiveRecving...", major); - // assert!(event.readiness().is_readable()); let ekey = Key::from_raw(index); - 'recv_loop: while let Some(msg) = endpoint.recv().map_err(drop)? { + 'recv_loop: while let Some(msg) = + endpoint.recv().map_err(|e| EndpointErr(Port(index), e))? + { if let Msg::SetupMsg(SetupMsg::ChannelSetup { info }) = msg { if info.polarity == binding.polarity { - return Err(()); + return Err(PortKindMismatch(binding.addr)); } let channel_id = info.channel_id; let info = EndpointInfo { polarity: binding.polarity, channel_id }; @@ -232,7 +257,6 @@ impl Connecting { drop(todos); // 1. construct `family', i.e. perform the sink tree setup procedure - use {Msg::SetupMsg as S, SetupMsg::*}; let mut messenger = (&mut ms, &mut endpoint_exts); impl Messengerlike for (&mut MessengerState, &mut VecStorage) { @@ -253,7 +277,7 @@ impl Connecting { let echo = S(LeaderEcho { maybe_leader: controller_id }); let mut awaiting = IndexSet::::with_capacity(neighbors.len()); for n in neighbors.clone() { - messenger.send(port2key(n), echo.clone()).map_err(drop)?; + messenger.send(n, echo.clone()).map_err(|e| EndpointErr(n, e))?; awaiting.insert(n); } @@ -263,9 +287,7 @@ impl Connecting { let mut my_leader = controller_id; messenger.undelay_all(); 'echo_loop: while !awaiting.is_empty() || parent.is_some() { - let ReceivedMsg { recipient, msg } = - messenger.recv_until(deadline).map_err(drop)?.ok_or(())?; - let recipient = key2port(recipient); + let ReceivedMsg { recipient, msg } = messenger.recv_until(deadline)?.ok_or(Timeout)?; match msg { S(LeaderAnnounce { leader }) => { // someone else completed the echo and became leader first! @@ -285,8 +307,8 @@ impl Connecting { if let Some(p) = parent { // return the echo to my parent messenger - .send(port2key(p), S(LeaderEcho { maybe_leader })) - .map_err(drop)?; + .send(p, S(LeaderEcho { maybe_leader })) + .map_err(|e| EndpointErr(p, e))?; } else { // DECIDE! break 'echo_loop; @@ -301,11 +323,15 @@ impl Connecting { awaiting.clear(); if neighbors.len() == 1 { // immediately reply to parent - messenger.send(port2key(recipient), echo.clone()).map_err(drop)?; + messenger + .send(recipient, echo.clone()) + .map_err(|e| EndpointErr(recipient, e))?; } else { for n in neighbors.clone() { if n != recipient { - messenger.send(port2key(n), echo.clone()).map_err(drop)?; + messenger + .send(n, echo.clone()) + .map_err(|e| EndpointErr(n, e))?; awaiting.insert(n); } } @@ -313,7 +339,7 @@ impl Connecting { } } } - msg => messenger.delay(ReceivedMsg { recipient: port2key(recipient), msg }), + msg => messenger.delay(ReceivedMsg { recipient, msg }), } } match parent { @@ -335,7 +361,7 @@ impl Connecting { for n in neighbors.clone() { let msg = if Some(n) == parent { S(YouAreMyParent) } else { msg_for_non_parents.clone() }; - messenger.send(port2key(n), msg).map_err(drop)?; + messenger.send(n, msg).map_err(|e| EndpointErr(n, e))?; } // await 1 message from all non-parents @@ -347,9 +373,8 @@ impl Connecting { let mut children = HashSet::default(); messenger.undelay_all(); while !awaiting.is_empty() { - let ReceivedMsg { recipient, msg } = - messenger.recv_until(deadline).map_err(drop)?.ok_or(())?; - let recipient = key2port(recipient); + let ReceivedMsg { recipient, msg } = messenger.recv_until(deadline)?.ok_or(Timeout)?; + let recipient = recipient; match msg { S(YouAreMyParent) => { assert!(awaiting.remove(&recipient)); @@ -361,7 +386,7 @@ impl Connecting { assert!(Some(recipient) != parent); // they wouldn't send me this if they considered me their parent } - _ => messenger.delay(ReceivedMsg { recipient: port2key(recipient), msg }), + _ => messenger.delay(ReceivedMsg { recipient, msg }), } } let family = Family { parent, children }; @@ -382,7 +407,7 @@ impl Connecting { controller_id: ControllerId, protocol: &Arc, timeout: Option, - ) -> Result { + ) -> Result { // 1. try and create a connection from these bindings with self immutable. let connected = self.new_connected(controller_id, protocol, timeout)?; // 2. success! drain self and return @@ -393,21 +418,18 @@ impl Connecting { &mut self, protocol: &Arc, timeout: Option, - ) -> Result { + ) -> Result { self.connect_using_id(Self::random_controller_id(), protocol, timeout) } } +#[derive(Debug)] pub struct Protocol; impl Protocol { pub fn parse(_pdl_text: &[u8]) -> Result { Ok(Protocol) } } -// struct ComponentExt { -// protocol: Arc, -// ports: HashSet, -// name: Vec, -// } +#[derive(Debug)] pub struct Connected { native_ports: HashSet, controller_id: ControllerId, @@ -415,7 +437,6 @@ pub struct Connected { endpoint_exts: VecStorage, protocol: Arc, family: Family, - // components: Vec, } impl Connected { pub fn new_channel(&mut self) -> (OutPort, InPort) { @@ -567,3 +588,31 @@ unsafe fn as_mut_slice<'a, T>(len: usize, ptr: *mut T) -> &'a mut [T] { unsafe fn as_const_slice<'a, T>(len: usize, ptr: *const T) -> &'a [T] { std::slice::from_raw_parts(ptr, len) } + +#[test] +fn api_connecting() { + let addrs: [SocketAddr; 3] = [ + "127.0.0.1:8888".parse().unwrap(), + "127.0.0.1:8889".parse().unwrap(), + "127.0.0.1:8890".parse().unwrap(), + ]; + let protocol1 = Arc::new(Protocol::parse(b"").unwrap()); + let protocol2 = protocol1.clone(); + let handles = vec![ + std::thread::spawn(move || { + let mut connecting = Connecting::default(); + let _a: OutPort = connecting.bind(Coupling::Active, addrs[0]); + let connected = connecting.connect(&protocol1, None); + println!("A: {:#?}", connected); + }), + std::thread::spawn(move || { + let mut connecting = Connecting::default(); + let _a: OutPort = connecting.bind(Coupling::Passive, addrs[0]); + let connected = connecting.connect(&protocol2, Some(Duration::from_secs(2))); + println!("B: {:#?}", connected); + }), + ]; + for h in handles { + h.join().unwrap(); + } +} diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 8d32b65070a7ecdfabb545fad1ff440046b1db05..eb8b9070a228c6849028a08cdbb06c0c6d2d5ddf 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -207,7 +207,11 @@ trait Messengerlike { loop { // polled_undrained may not be empty while let Some(eekey) = self.get_state_mut().polled_undrained.pop() { - if let Some(msg) = self.get_endpoint_mut(eekey).recv()? { + if let Some(msg) = self + .get_endpoint_mut(eekey) + .recv() + .map_err(|e| MessengerRecvErr::EndpointErr(eekey, e))? + { // this endpoint MAY still have messages! check again in future self.get_state_mut().polled_undrained.insert(eekey); return Ok(Some(ReceivedMsg { recipient: eekey, msg })); @@ -240,7 +244,11 @@ trait Messengerlike { loop { // polled_undrained may not be empty while let Some(eekey) = self.get_state_mut().polled_undrained.pop() { - if let Some(msg) = self.get_endpoint_mut(eekey).recv()? { + if let Some(msg) = self + .get_endpoint_mut(eekey) + .recv() + .map_err(|e| MessengerRecvErr::EndpointErr(eekey, e))? + { // this endpoint MAY still have messages! check again in future self.get_state_mut().polled_undrained.insert(eekey); return Ok(Some(ReceivedMsg { recipient: eekey, msg })); @@ -287,11 +295,11 @@ impl From for ConnectErr { ConnectErr::MessengerRecvErr(e) } } -impl From for MessengerRecvErr { - fn from(e: EndpointErr) -> MessengerRecvErr { - MessengerRecvErr::EndpointErr(e) - } -} +// impl From for MessengerRecvErr { +// fn from(e: EndpointErr) -> MessengerRecvErr { +// MessengerRecvErr::EndpointErr(e) +// } +// } impl Default for Arena { fn default() -> Self { Self { storage: vec![] } @@ -333,6 +341,15 @@ impl ChannelIdStream { } impl MessengerState { + fn with_event_capacity(event_capacity: usize) -> Result { + Ok(Self { + poll: Poll::new()?, + events: Events::with_capacity(event_capacity), + delayed: Default::default(), + undelayed: Default::default(), + polled_undrained: Default::default(), + }) + } // does NOT guarantee that events is non-empty fn poll_events(&mut self, deadline: Instant) -> Result<(), PollDeadlineErr> { use PollDeadlineErr::*;