diff --git a/src/runtime/experimental/api.rs b/src/runtime/experimental/api.rs new file mode 100644 index 0000000000000000000000000000000000000000..697d6157c1fee6ae8707cc8b7066b2ce2cfbbe28 --- /dev/null +++ b/src/runtime/experimental/api.rs @@ -0,0 +1,183 @@ +use crate::common::*; +use crate::runtime::endpoint::Endpoint; +use crate::runtime::endpoint::EndpointExt; +use crate::runtime::endpoint::EndpointInfo; + +use std::net::SocketAddr; +use std::sync::Arc; + +pub enum Coupling { + Active, + Passive, +} +pub struct Binding { + pub coupling: Coupling, + pub polarity: Polarity, + pub addr: SocketAddr, +} +impl From<(Coupling, Polarity, SocketAddr)> for Binding { + fn from((coupling, polarity, addr): (Coupling, Polarity, SocketAddr)) -> Self { + Self { coupling, polarity, addr } + } +} + +pub struct MsgBuffer<'a> { + slice: &'a mut [u8], + len: usize, +} +impl MsgBuffer<'_> { + pub fn clear(&mut self) { + self.len = 0; + } + pub fn write_msg(&mut self, r: &[u8]) -> std::io::Result<()> { + use std::io::Write; + self.slice.write_all(r)?; + self.len = r.len(); + Ok(()) + } + pub fn read_msg(&self) -> &[u8] { + &self.slice[0..self.len] + } +} +impl<'a> From<&'a mut [u8]> for MsgBuffer<'a> { + fn from(slice: &'a mut [u8]) -> Self { + Self { slice, len: 0 } + } +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] +pub struct Port(pub u32); +pub struct PortOp<'a> { + pub port: Port, + pub msg: Option<&'a [u8]>, +} + +#[derive(Default)] +struct ChannelIndexStream { + next: u32, +} +impl ChannelIndexStream { + fn next(&mut self) -> u32 { + self.next += 1; + self.next - 1 + } +} + +enum Connector { + Connecting(Connecting), + Connected(Connected), +} + +#[derive(Default)] +pub struct Connecting { + bindings: Vec, // invariant: no more than std::u32::MAX entries +} +impl Connecting { + pub fn bind(&mut self, binding: Binding) -> Port { + self.bindings.push(binding); + // preserve invariant + let pid: u32 = (self.bindings.len() - 1).try_into().expect("Port ID overflow!"); + Port(pid) + } + pub fn connect(&mut self, timeout: Option) -> Result { + let controller_id = 42; + let channel_index_stream = ChannelIndexStream::default(); + // drain self if successful + todo!() + } +} +pub struct Protocol; +impl Protocol { + pub fn parse(_pdl_text: &[u8]) -> Result { + todo!() + } +} +struct ComponentExt { + protocol: Arc, + ports: HashSet, + name: Vec, +} +pub struct Connected { + native_ports: HashSet, + controller_id: ControllerId, + channel_index_stream: ChannelIndexStream, + endpoint_exts: Vec, // invaraint + components: Vec, +} +impl Connected { + pub fn new_channel(&mut self) -> [Port; 2] { + assert!(self.endpoint_exts.len() <= std::u32::MAX as usize - 2); + let ports = + [Port(self.endpoint_exts.len() as u32 - 1), Port(self.endpoint_exts.len() as u32)]; + let channel_id = ChannelId { + controller_id: self.controller_id, + channel_index: self.channel_index_stream.next(), + }; + let [e0, e1] = Endpoint::new_memory_pair(); + self.endpoint_exts.push(EndpointExt { + info: EndpointInfo { channel_id, polarity: Putter }, + endpoint: e0, + }); + self.endpoint_exts.push(EndpointExt { + info: EndpointInfo { channel_id, polarity: Getter }, + endpoint: e1, + }); + ports + } + pub fn new_component( + &mut self, + protocol: &Arc, + name: Vec, + moved_ports: &[Port], + ) -> Result<(), ()> { + let moved_ports = moved_ports.iter().copied().collect(); + if !self.native_ports.is_superset(&moved_ports) { + return Err(()); + } + self.native_ports.retain(|e| !moved_ports.contains(e)); + self.components.push(ComponentExt { ports: moved_ports, protocol: protocol.clone(), name }); + // TODO add a singleton machine + Ok(()) + } + pub fn sync_set(&mut self, ops: &mut [PortOp]) { + todo!() + } + pub fn sync_subsets( + &mut self, + _ops: &mut [PortOp], + bit_subsets: &[&[usize]], + ) -> Result { + for &bit_subset in bit_subsets { + use super::bits::BitChunkIter; + BitChunkIter::new(bit_subset.iter().copied()); + } + todo!() + } +} + +#[test] +fn test() { + let mut c = Connecting::default(); + let p0 = c.bind(Binding { + coupling: Coupling::Active, + polarity: Putter, + addr: "127.0.0.1:8000".parse().unwrap(), + }); + let p1 = c.bind(Binding { + coupling: Coupling::Passive, + polarity: Putter, + addr: "127.0.0.1:8001".parse().unwrap(), + }); + + let proto_0 = Arc::new(Protocol::parse(b"").unwrap()); + let mut c = c.connect(None).unwrap(); + let [p2, p3] = c.new_channel(); + c.new_component(&proto_0, b"sync".to_vec(), &[p0, p2]).unwrap(); + let mut ops = [ + // + PortOp { port: p1, msg: Some(b"hi!") }, + PortOp { port: p1, msg: Some(b"ahoy!") }, + PortOp { port: p1, msg: Some(b"hello!") }, + ]; + c.sync_subsets(&mut ops, &[&[0b001], &[0b010], &[0b100]]).unwrap(); +}