use crate::common::*; use crate::runtime::endpoint::Endpoint; use crate::runtime::endpoint::EndpointExt; use crate::runtime::endpoint::EndpointInfo; use std::net::SocketAddr; use std::sync::Arc; pub enum Polarity { In, Out, } pub enum Coupling { Active, Passive, } pub struct Binding { pub coupling: Coupling, pub polarity: Polarity, pub addr: SocketAddr, } impl From<(Coupling, Polarity, SocketAddr)> for Binding { fn from((coupling, polarity, addr): (Coupling, Polarity, SocketAddr)) -> Self { Self { coupling, polarity, addr } } } pub struct MsgBuffer<'a> { slice: &'a mut [u8], len: usize, } impl MsgBuffer<'_> { pub fn clear(&mut self) { self.len = 0; } pub fn write_msg(&mut self, r: &[u8]) -> std::io::Result<()> { use std::io::Write; self.slice.write_all(r)?; self.len = r.len(); Ok(()) } pub fn read_msg(&self) -> &[u8] { &self.slice[0..self.len] } } impl<'a> From<&'a mut [u8]> for MsgBuffer<'a> { fn from(slice: &'a mut [u8]) -> Self { Self { slice, len: 0 } } } #[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] pub struct Port(pub u32); impl From for Port { fn from(x: InPort) -> Self { x.0 } } impl From for Port { fn from(x: OutPort) -> Self { x.0 } } pub struct InPort(Port); pub struct OutPort(Port); pub enum PortOp<'a> { In { port: &'a InPort, poll: bool, msg: Option<&'a mut [u8]> }, Out { port: &'a OutPort, offer: bool, msg: Option<&'a [u8]> }, } #[derive(Default)] struct ChannelIndexStream { next: u32, } impl ChannelIndexStream { fn next(&mut self) -> u32 { self.next += 1; self.next - 1 } } enum Connector { Connecting(Connecting), Connected(Connected), } #[derive(Default)] pub struct Connecting { bindings: Vec, // invariant: no more than std::u32::MAX entries } trait Binds { fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> T; } impl Binds for Connecting { fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> InPort { self.bindings.push((coupling, Polarity::In, addr).into()); let pid: u32 = (self.bindings.len() - 1).try_into().expect("Port ID overflow!"); InPort(Port(pid)) } } impl Binds for Connecting { fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> OutPort { self.bindings.push((coupling, Polarity::Out, addr).into()); let pid: u32 = (self.bindings.len() - 1).try_into().expect("Port ID overflow!"); OutPort(Port(pid)) } } impl Connecting { pub fn connect(&mut self, timeout: Option) -> Result { let controller_id = 42; let channel_index_stream = ChannelIndexStream::default(); // drain self if successful todo!() } } pub struct Protocol; impl Protocol { pub fn parse(_pdl_text: &[u8]) -> Result { todo!() } } struct ComponentExt { protocol: Arc, ports: HashSet, name: Vec, } pub struct Connected { native_ports: HashSet, controller_id: ControllerId, channel_index_stream: ChannelIndexStream, endpoint_exts: Vec, // invaraint components: Vec, } impl Connected { pub fn new_channel(&mut self) -> (OutPort, InPort) { assert!(self.endpoint_exts.len() <= std::u32::MAX as usize - 2); let ports = ( OutPort(Port(self.endpoint_exts.len() as u32 - 1)), InPort(Port(self.endpoint_exts.len() as u32)), ); let channel_id = ChannelId { controller_id: self.controller_id, channel_index: self.channel_index_stream.next(), }; let [e0, e1] = Endpoint::new_memory_pair(); self.endpoint_exts.push(EndpointExt { info: EndpointInfo { channel_id, polarity: Putter }, endpoint: e0, }); self.endpoint_exts.push(EndpointExt { info: EndpointInfo { channel_id, polarity: Getter }, endpoint: e1, }); ports } pub fn new_component( &mut self, protocol: &Arc, name: Vec, moved_ports: &[Port], ) -> Result<(), ()> { let moved_ports = moved_ports.iter().copied().collect(); if !self.native_ports.is_superset(&moved_ports) { return Err(()); } self.native_ports.retain(|e| !moved_ports.contains(e)); self.components.push(ComponentExt { ports: moved_ports, protocol: protocol.clone(), name }); // TODO add a singleton machine Ok(()) } pub fn sync_set(&mut self, ops: &mut [PortOp]) { todo!() } pub fn sync_subsets( &mut self, _ops: &mut [PortOp], bit_subsets: &[&[usize]], ) -> Result { for &bit_subset in bit_subsets { use super::bits::BitChunkIter; let chunk_iter = bit_subset.iter().copied(); for index in BitChunkIter::new(chunk_iter) { println!("index {:?}", index); } } todo!() } } #[test] fn test() { let mut c = Connecting::default(); let net_out: OutPort = c.bind(Coupling::Active, "127.0.0.1:8001".parse().unwrap()); let net_in: InPort = c.bind(Coupling::Active, "127.0.0.1:8001".parse().unwrap()); let proto_0 = Arc::new(Protocol::parse(b"").unwrap()); let mut c = c.connect(None).unwrap(); let (mem_out, mem_in) = c.new_channel(); c.new_component(&proto_0, b"sync".to_vec(), &[net_in.into(), mem_out.into()]).unwrap(); let mut buf = vec![0; 32]; let mut ops = [ PortOp::Out { port: &net_out, offer: false, msg: Some(b"hi!") }, PortOp::Out { port: &net_out, offer: false, msg: Some(b"hey!") }, PortOp::Out { port: &net_out, offer: false, msg: Some(b"hello, there!") }, PortOp::In { port: &mem_in, poll: false, msg: Some(&mut buf) }, ]; c.sync_subsets(&mut ops, &[&[0b001], &[0b010], &[0b100]]).unwrap(); }