diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 12cc34285eb35ebb65f10d2336aec215e407aab5..87b4702dfc5aa477113523e94ecac1bfc55dc765 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -16,6 +16,7 @@ mod tests; use crate::common::*; use error::*; +use mio::net::UdpSocket; #[derive(Debug)] pub struct Connector { @@ -132,7 +133,7 @@ enum AllMapperResult { New(Predicate), Nonexistant, } -struct Endpoint { +struct NetEndpoint { inbox: Vec, stream: TcpStream, } @@ -142,13 +143,19 @@ struct ProtoComponent { ports: HashSet, } #[derive(Debug, Clone)] -struct EndpointSetup { +struct NetEndpointSetup { sock_addr: SocketAddr, endpoint_polarity: EndpointPolarity, } + +#[derive(Debug, Clone)] +struct UdpEndpointSetup { + local_addr: SocketAddr, + peer_addr: SocketAddr, +} #[derive(Debug)] -struct EndpointExt { - endpoint: Endpoint, +struct NetEndpointExt { + net_endpoint: NetEndpoint, getter_for_incoming: PortId, } #[derive(Debug)] @@ -177,7 +184,13 @@ struct EndpointManager { polled_undrained: VecSet, delayed_messages: Vec<(usize, Msg)>, undelayed_messages: Vec<(usize, Msg)>, - endpoint_exts: Vec, + net_endpoint_exts: Vec, +} +struct UdpEndpointExt { + sock: UdpSocket, + getter_for_incoming: PortId, + outgoing_buffer: HashMap, + incoming_buffer: Vec, } #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] struct PortInfo { @@ -203,8 +216,14 @@ struct ConnectorUnphased { port_info: PortInfo, } #[derive(Debug)] +struct ConnectorSetup { + net_endpoint_setups: Vec<(PortId, NetEndpointSetup)>, + udp_endpoint_setups: Vec<(PortId, UdpEndpointSetup)>, + surplus_sockets: u16, +} +#[derive(Debug)] enum ConnectorPhased { - Setup { endpoint_setups: Vec<(PortId, EndpointSetup)>, surplus_sockets: u16 }, + Setup(Box), Communication(Box), } #[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] @@ -217,10 +236,40 @@ struct NativeBatch { to_put: HashMap, to_get: HashSet, } +enum TokenTarget { + NetEndpoint { index: usize }, + UdpEndpoint { index: usize }, + Waker, +} //////////////// -pub fn would_block(err: &std::io::Error) -> bool { +fn would_block(err: &std::io::Error) -> bool { err.kind() == std::io::ErrorKind::WouldBlock } +impl TokenTarget { + const HALFWAY_INDEX: usize = usize::MAX / 2; + const MAX_INDEX: usize = usize::MAX; + const WAKER_TOKEN: usize = Self::MAX_INDEX; +} +impl From for TokenTarget { + fn from(Token(index): Token) -> Self { + if index == Self::MAX_INDEX { + TokenTarget::Waker + } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) { + TokenTarget::UdpEndpoint { index: shifted } + } else { + TokenTarget::NetEndpoint { index } + } + } +} +impl Into for TokenTarget { + fn into(self) -> Token { + match self { + TokenTarget::Waker => Token(Self::MAX_INDEX), + TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX), + TokenTarget::NetEndpoint { index } => Token(index), + } + } +} impl VecSet { fn new(mut vec: Vec) -> Self { vec.sort();