diff --git a/src/runtime/retired/endpoint.rs b/src/runtime/retired/endpoint.rs new file mode 100644 index 0000000000000000000000000000000000000000..359692552320f0bfbcbd269c5a5827581545e405 --- /dev/null +++ b/src/runtime/retired/endpoint.rs @@ -0,0 +1,219 @@ +use crate::common::*; +use crate::runtime::{errors::*, Predicate}; +use mio::{Evented, PollOpt, Ready}; + +pub(crate) enum Endpoint { + Memory { s: mio_extras::channel::Sender, r: mio_extras::channel::Receiver }, + Network(NetworkEndpoint), +} + +#[derive(Debug)] +pub(crate) struct EndpointExt { + pub endpoint: Endpoint, + pub info: EndpointInfo, +} +#[derive(Debug, Copy, Clone)] +pub struct EndpointInfo { + pub polarity: Polarity, + pub channel_id: ChannelId, +} + +#[derive(Debug, Clone)] +pub(crate) enum Decision { + Failure, + Success(Predicate), +} + +#[derive(Clone, Debug)] +pub(crate) enum Msg { + SetupMsg(SetupMsg), + CommMsg(CommMsg), +} +#[derive(Clone, Debug)] +pub(crate) enum SetupMsg { + // sent by the passive endpoint to the active endpoint + ChannelSetup { info: EndpointInfo }, + LeaderEcho { maybe_leader: ControllerId }, + LeaderAnnounce { leader: ControllerId }, + YouAreMyParent, +} +impl Into for SetupMsg { + fn into(self) -> Msg { + Msg::SetupMsg(self) + } +} + +#[derive(Clone, Debug)] +pub(crate) struct CommMsg { + pub round_index: usize, + pub contents: CommMsgContents, +} +#[derive(Clone, Debug)] +pub(crate) enum CommMsgContents { + SendPayload { payload_predicate: Predicate, payload: Payload }, + Elaborate { partial_oracle: Predicate }, // SINKWARD + Failure, // SINKWARD + Announce { decision: Decision }, // SINKAWAYS +} + +pub struct NetworkEndpoint { + stream: mio::net::TcpStream, + inbox: Vec, + outbox: Vec, +} + +impl std::fmt::Debug for Endpoint { + fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { + let s = match self { + Endpoint::Memory { .. } => "Memory", + Endpoint::Network(..) => "Network", + }; + f.write_fmt(format_args!("Endpoint::{}", s)) + } +} + +impl CommMsgContents { + pub fn into_msg(self, round_index: usize) -> Msg { + Msg::CommMsg(CommMsg { round_index, contents: self }) + } +} + +impl From for ConnectErr { + fn from(e: EndpointErr) -> Self { + match e { + EndpointErr::Disconnected => ConnectErr::Disconnected, + EndpointErr::MetaProtocolDeviation => ConnectErr::MetaProtocolDeviation, + } + } +} +impl Endpoint { + // asymmetric + // pub(crate) fn from_fresh_stream(stream: mio::net::TcpStream) -> Self { + // Self::Network(NetworkEndpoint { stream, inbox: vec![], outbox: vec![] }) + // } + pub(crate) fn from_fresh_stream_and_inbox(stream: mio::net::TcpStream, inbox: Vec) -> Self { + Self::Network(NetworkEndpoint { stream, inbox, outbox: vec![] }) + } + + // symmetric + pub fn new_memory_pair() -> [Self; 2] { + let (s1, r1) = mio_extras::channel::channel::(); + let (s2, r2) = mio_extras::channel::channel::(); + [Self::Memory { s: s1, r: r2 }, Self::Memory { s: s2, r: r1 }] + } + pub fn send(&mut self, msg: Msg) -> Result<(), EndpointErr> { + match self { + Self::Memory { s, .. } => s.send(msg).map_err(|_| EndpointErr::Disconnected), + Self::Network(NetworkEndpoint { stream, outbox, .. }) => { + use crate::runtime::serde::Ser; + outbox.ser(&msg).expect("ser failed"); + loop { + use std::io::Write; + match stream.write(outbox) { + Ok(0) => return Ok(()), + Ok(bytes_written) => { + outbox.drain(0..bytes_written); + } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + panic!("sending shouldn't WouldBlock") + } + Err(_e) => return Err(EndpointErr::Disconnected), + } + } + } + } + } + pub fn recv(&mut self) -> Result, EndpointErr> { + match self { + Self::Memory { r, .. } => match r.try_recv() { + Ok(msg) => Ok(Some(msg)), + Err(std::sync::mpsc::TryRecvError::Empty) => Ok(None), + Err(std::sync::mpsc::TryRecvError::Disconnected) => Err(EndpointErr::Disconnected), + }, + Self::Network(NetworkEndpoint { stream, inbox, .. }) => { + // populate inbox as much as possible + 'read_loop: loop { + use std::io::Read; + match stream.read_to_end(inbox) { + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break 'read_loop, + Ok(0) => break 'read_loop, + Ok(_) => (), + Err(_e) => return Err(EndpointErr::Disconnected), + } + } + use crate::runtime::serde::{De, MonitoredReader}; + let mut monitored = MonitoredReader::from(&inbox[..]); + match De::::de(&mut monitored) { + Ok(msg) => { + let msg_size2 = monitored.bytes_read(); + inbox.drain(0..(msg_size2.try_into().unwrap())); + Ok(Some(msg)) + } + Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => Ok(None), + Err(_) => Err(EndpointErr::MetaProtocolDeviation), + } + } + } + } +} + +impl Evented for Endpoint { + fn register( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> Result<(), std::io::Error> { + match self { + Self::Memory { r, .. } => r.register(poll, token, interest, opts), + Self::Network(n) => n.register(poll, token, interest, opts), + } + } + + fn reregister( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> Result<(), std::io::Error> { + match self { + Self::Memory { r, .. } => r.reregister(poll, token, interest, opts), + Self::Network(n) => n.reregister(poll, token, interest, opts), + } + } + + fn deregister(&self, poll: &Poll) -> Result<(), std::io::Error> { + match self { + Self::Memory { r, .. } => r.deregister(poll), + Self::Network(n) => n.deregister(poll), + } + } +} + +impl Evented for NetworkEndpoint { + fn register( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> Result<(), std::io::Error> { + self.stream.register(poll, token, interest, opts) + } + + fn reregister( + &self, + poll: &Poll, + token: Token, + interest: Ready, + opts: PollOpt, + ) -> Result<(), std::io::Error> { + self.stream.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &Poll) -> Result<(), std::io::Error> { + self.stream.deregister(poll) + } +}