Files @ 869ae2e270a1
Branch filter:

Location: CSY/reowolf/src/runtime/experimental/api.rs - annotation

869ae2e270a1 6.0 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
Christopher Esterhuyse
more
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
6471206c5c59
869ae2e270a1
6471206c5c59
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
869ae2e270a1
6471206c5c59
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
6471206c5c59
869ae2e270a1
869ae2e270a1
6471206c5c59
6471206c5c59
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
6471206c5c59
869ae2e270a1
869ae2e270a1
869ae2e270a1
869ae2e270a1
6471206c5c59
6471206c5c59
6471206c5c59
use crate::common::*;
use crate::runtime::endpoint::Endpoint;
use crate::runtime::endpoint::EndpointExt;
use crate::runtime::endpoint::EndpointInfo;

use std::net::SocketAddr;
use std::sync::Arc;

pub enum Polarity {
    In,
    Out,
}
pub enum Coupling {
    Active,
    Passive,
}
pub struct Binding {
    pub coupling: Coupling,
    pub polarity: Polarity,
    pub addr: SocketAddr,
}
impl From<(Coupling, Polarity, SocketAddr)> for Binding {
    fn from((coupling, polarity, addr): (Coupling, Polarity, SocketAddr)) -> Self {
        Self { coupling, polarity, addr }
    }
}

pub struct MsgBuffer<'a> {
    slice: &'a mut [u8],
    len: usize,
}
impl MsgBuffer<'_> {
    pub fn clear(&mut self) {
        self.len = 0;
    }
    pub fn write_msg(&mut self, r: &[u8]) -> std::io::Result<()> {
        use std::io::Write;
        self.slice.write_all(r)?;
        self.len = r.len();
        Ok(())
    }
    pub fn read_msg(&self) -> &[u8] {
        &self.slice[0..self.len]
    }
}
impl<'a> From<&'a mut [u8]> for MsgBuffer<'a> {
    fn from(slice: &'a mut [u8]) -> Self {
        Self { slice, len: 0 }
    }
}

#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct Port(pub u32);
impl From<InPort> for Port {
    fn from(x: InPort) -> Self {
        x.0
    }
}
impl From<OutPort> for Port {
    fn from(x: OutPort) -> Self {
        x.0
    }
}
pub struct InPort(Port);
pub struct OutPort(Port);
pub enum PortOp<'a> {
    In { port: &'a InPort, poll: bool, msg: Option<&'a mut [u8]> },
    Out { port: &'a OutPort, offer: bool, msg: Option<&'a [u8]> },
}

#[derive(Default)]
struct ChannelIndexStream {
    next: u32,
}
impl ChannelIndexStream {
    fn next(&mut self) -> u32 {
        self.next += 1;
        self.next - 1
    }
}

enum Connector {
    Connecting(Connecting),
    Connected(Connected),
}

#[derive(Default)]
pub struct Connecting {
    bindings: Vec<Binding>, // invariant: no more than std::u32::MAX entries
}
trait Binds<T> {
    fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> T;
}
impl Binds<InPort> for Connecting {
    fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> InPort {
        self.bindings.push((coupling, Polarity::In, addr).into());
        let pid: u32 = (self.bindings.len() - 1).try_into().expect("Port ID overflow!");
        InPort(Port(pid))
    }
}
impl Binds<OutPort> for Connecting {
    fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> OutPort {
        self.bindings.push((coupling, Polarity::Out, addr).into());
        let pid: u32 = (self.bindings.len() - 1).try_into().expect("Port ID overflow!");
        OutPort(Port(pid))
    }
}
impl Connecting {
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<Connected, ()> {
        let controller_id = 42;
        let channel_index_stream = ChannelIndexStream::default();
        // drain self if successful
        todo!()
    }
}
pub struct Protocol;
impl Protocol {
    pub fn parse(_pdl_text: &[u8]) -> Result<Self, ()> {
        todo!()
    }
}
struct ComponentExt {
    protocol: Arc<Protocol>,
    ports: HashSet<Port>,
    name: Vec<u8>,
}
pub struct Connected {
    native_ports: HashSet<Port>,
    controller_id: ControllerId,
    channel_index_stream: ChannelIndexStream,
    endpoint_exts: Vec<EndpointExt>, // invaraint
    components: Vec<ComponentExt>,
}
impl Connected {
    pub fn new_channel(&mut self) -> (OutPort, InPort) {
        assert!(self.endpoint_exts.len() <= std::u32::MAX as usize - 2);
        let ports = (
            OutPort(Port(self.endpoint_exts.len() as u32 - 1)),
            InPort(Port(self.endpoint_exts.len() as u32)),
        );
        let channel_id = ChannelId {
            controller_id: self.controller_id,
            channel_index: self.channel_index_stream.next(),
        };
        let [e0, e1] = Endpoint::new_memory_pair();
        self.endpoint_exts.push(EndpointExt {
            info: EndpointInfo { channel_id, polarity: Putter },
            endpoint: e0,
        });
        self.endpoint_exts.push(EndpointExt {
            info: EndpointInfo { channel_id, polarity: Getter },
            endpoint: e1,
        });
        ports
    }
    pub fn new_component(
        &mut self,
        protocol: &Arc<Protocol>,
        name: Vec<u8>,
        moved_ports: &[Port],
    ) -> Result<(), ()> {
        let moved_ports = moved_ports.iter().copied().collect();
        if !self.native_ports.is_superset(&moved_ports) {
            return Err(());
        }
        self.native_ports.retain(|e| !moved_ports.contains(e));
        self.components.push(ComponentExt { ports: moved_ports, protocol: protocol.clone(), name });
        // TODO add a singleton machine
        Ok(())
    }
    pub fn sync_set(&mut self, ops: &mut [PortOp]) {
        todo!()
    }
    pub fn sync_subsets(
        &mut self,
        _ops: &mut [PortOp],
        bit_subsets: &[&[usize]],
    ) -> Result<usize, ()> {
        for &bit_subset in bit_subsets {
            use super::bits::BitChunkIter;
            let chunk_iter = bit_subset.iter().copied();
            for index in BitChunkIter::new(chunk_iter) {
                println!("index {:?}", index);
            }
        }
        todo!()
    }
}

#[test]
fn test() {
    let mut c = Connecting::default();
    let net_out: OutPort = c.bind(Coupling::Active, "127.0.0.1:8001".parse().unwrap());
    let net_in: InPort = c.bind(Coupling::Active, "127.0.0.1:8001".parse().unwrap());
    let proto_0 = Arc::new(Protocol::parse(b"").unwrap());
    let mut c = c.connect(None).unwrap();
    let (mem_out, mem_in) = c.new_channel();
    c.new_component(&proto_0, b"sync".to_vec(), &[net_in.into(), mem_out.into()]).unwrap();

    let mut buf = vec![0; 32];
    let mut ops = [
        PortOp::Out { port: &net_out, offer: false, msg: Some(b"hi!") },
        PortOp::Out { port: &net_out, offer: false, msg: Some(b"hey!") },
        PortOp::Out { port: &net_out, offer: false, msg: Some(b"hello, there!") },
        PortOp::In { port: &mem_in, poll: false, msg: Some(&mut buf) },
    ];
    c.sync_subsets(&mut ops, &[&[0b001], &[0b010], &[0b100]]).unwrap();
}