Changeset - 44a98be4e4b4
[Not reviewed]
0 17 2
Christopher Esterhuyse - 5 years ago 2020-06-19 17:04:03
christopher.esterhuyse@gmail.com
beginning large overhaul: moving to globally-unique ports & port -> endpoint route mappings
19 files changed with 1582 insertions and 1304 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -13,6 +13,7 @@ maplit = "1.0.2"
 
derive_more = "0.99.2"
 

	
 
# runtime
 
bincode = "1.2.1"
 
serde = { version = "1.0.112", features = ["derive"] }
 
getrandom = "0.1.14" # tiny crate. used to guess controller-id
 
take_mut = "0.2.2"
 
@@ -21,8 +22,9 @@ indexmap = "1.3.0" # hashsets/hashmaps with efficient arbitrary element removal
 
# network
 
integer-encoding = "1.0.7"
 
byteorder = "1.3.2"
 
mio = "0.6.21" # migrate to mio v0.7.0 when it stabilizes
 
mio = "0.6.21"
 
mio-extras = "2.0.6"
 
mio07 = { version = "0.7.0", package = "mio", features = ["tcp", "os-poll"] }
 

	
 
# protocol
 
# id-arena = "2.2.1"
src/common.rs
Show inline comments
 
///////////////////// PRELUDE /////////////////////
 

	
 
pub use crate::protocol::{ComponentState, ProtocolDescription};
 
pub use crate::runtime::{NonsyncContext, SyncContext};
 

	
 
pub use core::{
 
    cmp::Ordering,
 
    fmt::{Debug, Formatter},
 
@@ -16,6 +19,7 @@ pub use mio::{
 
pub use std::{
 
    collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
 
    convert::TryInto,
 
    io::{Read, Write},
 
    net::SocketAddr,
 
    sync::Arc,
 
    time::Instant,
 
@@ -25,98 +29,53 @@ pub use Polarity::*;
 
///////////////////// DEFS /////////////////////
 

	
 
pub type ControllerId = u32;
 
pub type ChannelIndex = u32;
 
pub type PortSuffix = u32;
 

	
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd)]
 
// globally unique
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub struct PortId {
 
    pub(crate) controller_id: ControllerId,
 
    pub(crate) port_index: u32,
 
    pub(crate) port_index: PortSuffix,
 
}
 

	
 
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
 
pub struct Payload(Arc<Vec<u8>>);
 

	
 
/// This is a unique identifier for a channel (i.e., port).
 
#[derive(Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd)]
 
pub struct ChannelId {
 
    pub(crate) controller_id: ControllerId,
 
    pub(crate) channel_index: ChannelIndex,
 
}
 

	
 
#[derive(Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd)]
 
#[derive(
 
    Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub enum Polarity {
 
    Putter, // output port (from the perspective of the component)
 
    Getter, // input port (from the perspective of the component)
 
}
 

	
 
#[derive(
 
    Eq, PartialEq, Ord, PartialOrd, Hash, Copy, Clone, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(C)]
 
pub struct Port(pub u32); // ports are COPY
 

	
 
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
 
pub enum MainComponentErr {
 
pub enum AddComponentError {
 
    NoSuchComponent,
 
    NonPortTypeParameters,
 
    CannotMovePort(Port),
 
    CannotMovePort(PortId),
 
    WrongNumberOfParamaters { expected: usize },
 
    UnknownPort(Port),
 
    WrongPortPolarity { param_index: usize, port: Port },
 
    DuplicateMovedPort(Port),
 
}
 
pub trait ProtocolDescription: Sized {
 
    type S: ComponentState<D = Self>;
 

	
 
    fn parse(pdl: &[u8]) -> Result<Self, String>;
 
    fn component_polarities(&self, identifier: &[u8]) -> Result<Vec<Polarity>, MainComponentErr>;
 
    fn new_main_component(&self, identifier: &[u8], ports: &[Port]) -> Self::S;
 
}
 

	
 
pub trait ComponentState: Sized + Clone {
 
    type D: ProtocolDescription;
 
    fn pre_sync_run<C: MonoContext<D = Self::D, S = Self>>(
 
        &mut self,
 
        runtime_ctx: &mut C,
 
        protocol_description: &Self::D,
 
    ) -> MonoBlocker;
 

	
 
    fn sync_run<C: PolyContext<D = Self::D>>(
 
        &mut self,
 
        runtime_ctx: &mut C,
 
        protocol_description: &Self::D,
 
    ) -> PolyBlocker;
 
    UnknownPort(PortId),
 
    WrongPortPolarity { port: PortId, expected_polarity: Polarity },
 
    DuplicateMovedPort(PortId),
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum MonoBlocker {
 
pub enum NonsyncBlocker {
 
    Inconsistent,
 
    ComponentExit,
 
    SyncBlockStart,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum PolyBlocker {
 
pub enum SyncBlocker {
 
    Inconsistent,
 
    SyncBlockEnd,
 
    CouldntReadMsg(Port),
 
    CouldntCheckFiring(Port),
 
    PutMsg(Port, Payload),
 
}
 

	
 
pub trait MonoContext {
 
    type D: ProtocolDescription;
 
    type S: ComponentState<D = Self::D>;
 

	
 
    fn new_component(&mut self, moved_ports: HashSet<Port>, init_state: Self::S);
 
    fn new_channel(&mut self) -> [Port; 2];
 
    fn new_random(&mut self) -> u64;
 
}
 
pub trait PolyContext {
 
    type D: ProtocolDescription;
 

	
 
    fn is_firing(&mut self, port: Port) -> Option<bool>;
 
    fn read_msg(&mut self, port: Port) -> Option<&Payload>;
 
    CouldntReadMsg(PortId),
 
    CouldntCheckFiring(PortId),
 
    PutMsg(PortId, Payload),
 
}
 

	
 
///////////////////// IMPL /////////////////////
 
@@ -176,22 +135,8 @@ impl From<Vec<u8>> for Payload {
 
        Self(s.into())
 
    }
 
}
 
impl Debug for Port {
 
impl Debug for PortId {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "Port({})", self.0)
 
    }
 
}
 
impl Port {
 
    pub fn from_raw(raw: u32) -> Self {
 
        Self(raw)
 
    }
 
    pub fn to_raw(self) -> u32 {
 
        self.0
 
    }
 
    pub fn to_token(self) -> mio::Token {
 
        mio::Token(self.0.try_into().unwrap())
 
    }
 
    pub fn from_token(t: mio::Token) -> Self {
 
        Self(t.0.try_into().unwrap())
 
        write!(f, "PortId({},{})", self.controller_id, self.port_index)
 
    }
 
}
src/lib.rs
Show inline comments
 
@@ -5,10 +5,12 @@ mod common;
 
mod protocol;
 
mod runtime;
 

	
 
#[cfg(test)]
 
mod test;
 
// #[cfg(test)]
 
// mod test;
 

	
 
pub use runtime::{errors, Connector, PortBinding};
 
pub use common::Polarity;
 
pub use protocol::ProtocolDescription;
 
pub use runtime::{Connector, EndpointSetup, StringLogger};
 

	
 
#[cfg(feature = "ffi")]
 
pub use runtime::ffi;
 
// #[cfg(feature = "ffi")]
 
// pub use runtime::ffi;
src/macros.rs
Show inline comments
 
macro_rules! log {
 
    ($logger:expr, $($arg:tt)*) => {{
 
        use std::fmt::Write;
 
        writeln!($logger, $($arg)*).unwrap();
 
        let _ = write!($logger.line_writer(), $($arg)*).unwrap();
 
    }};
 
}
 
macro_rules! assert_let {
 
    ($pat:pat = $expr:expr => $work:expr) => {
 
        if let $pat = $expr {
 
            $work
 
        } else {
 
            panic!("assert_let failed");
 
        }
 
    };
 
}
 
// macro_rules! assert_let {
 
//     ($pat:pat = $expr:expr => $work:expr) => {
 
//         if let $pat = $expr {
 
//             $work
 
//         } else {
 
//             panic!("assert_let failed");
 
//         }
 
//     };
 
// }
 

	
 
#[test]
 
fn assert_let() {
 
    let x = Some(5);
 
    let z = assert_let![Some(y) = x => {
 
        println!("{:?}", y);
 
        3
 
    }];
 
    println!("{:?}", z);
 
}
 
// #[test]
 
// fn assert_let() {
 
//     let x = Some(5);
 
//     let z = assert_let![Some(y) = x => {
 
//         println!("{:?}", y);
 
//         3
 
//     }];
 
//     println!("{:?}", z);
 
// }
 

	
 
#[test]
 
#[should_panic]
 
fn must_let_panic() {
 
    let x: Option<u32> = None;
 
    assert_let![Some(y) = x => {
 
        println!("{:?}", y);
 
    }];
 
}
 
// #[test]
 
// #[should_panic]
 
// fn must_let_panic() {
 
//     let x: Option<u32> = None;
 
//     assert_let![Some(y) = x => {
 
//         println!("{:?}", y);
 
//     }];
 
// }
src/protocol/arena.rs
Show inline comments
 
use crate::common::*;
 
use core::hash::Hash;
 
use core::marker::PhantomData;
 

	
 
#[derive(Debug, serde::Serialize, serde::Deserialize)]
 
#[derive(serde::Serialize, serde::Deserialize)]
 
pub struct Id<T> {
 
    index: u32,
 
    _phantom: PhantomData<T>,
 
}
 
#[derive(Debug, serde::Serialize, serde::Deserialize)]
 
pub struct Arena<T> {
 
    store: Vec<T>,
 
}
 
//////////////////////////////////
 

	
 
impl<T> Debug for Id<T> {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_struct("Id").field("index", &self.index).finish()
 
    }
 
}
 
impl<T> Clone for Id<T> {
 
    fn clone(&self) -> Self {
 
        *self
 
@@ -23,11 +35,6 @@ impl<T> Hash for Id<T> {
 
        self.index.hash(h);
 
    }
 
}
 

	
 
#[derive(Debug, serde::Serialize, serde::Deserialize)]
 
pub struct Arena<T> {
 
    store: Vec<T>,
 
}
 
impl<T> Arena<T> {
 
    pub fn new() -> Self {
 
        Self { store: vec![] }
src/protocol/eval.rs
Show inline comments
 
@@ -886,7 +886,7 @@ impl Display for Value {
 
}
 

	
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub struct InputValue(pub Port);
 
pub struct InputValue(pub PortId);
 

	
 
impl Display for InputValue {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
 
@@ -911,7 +911,7 @@ impl ValueImpl for InputValue {
 
}
 

	
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub struct OutputValue(pub Port);
 
pub struct OutputValue(pub PortId);
 

	
 
impl Display for OutputValue {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
 
@@ -1803,38 +1803,38 @@ impl Prompt {
 
    }
 
}
 

	
 
#[cfg(test)]
 
mod tests {
 
    extern crate test_generator;
 

	
 
    use std::fs::File;
 
    use std::io::Read;
 
    use std::path::Path;
 
    use test_generator::test_resources;
 

	
 
    use super::*;
 

	
 
    #[test_resources("testdata/eval/positive/*.pdl")]
 
    fn batch1(resource: &str) {
 
        let path = Path::new(resource);
 
        let expect = path.with_extension("txt");
 
        let mut heap = Heap::new();
 
        let mut source = InputSource::from_file(&path).unwrap();
 
        let mut parser = Parser::new(&mut source);
 
        let pd = parser.parse(&mut heap).unwrap();
 
        let def = heap[pd].get_definition_ident(&heap, b"test").unwrap();
 
        let fun = heap[def].as_function().this;
 
        let args = Vec::new();
 
        let result = Prompt::compute_function(&heap, fun, &args).unwrap();
 
        let valstr: String = format!("{}", result);
 
        println!("{}", valstr);
 

	
 
        let mut cev: Vec<u8> = Vec::new();
 
        let mut f = File::open(expect).unwrap();
 
        f.read_to_end(&mut cev).unwrap();
 
        let lavstr = String::from_utf8_lossy(&cev);
 
        println!("{}", lavstr);
 

	
 
        assert_eq!(valstr, lavstr);
 
    }
 
}
 
// #[cfg(test)]
 
// mod tests {
 
//     extern crate test_generator;
 

	
 
//     use std::fs::File;
 
//     use std::io::Read;
 
//     use std::path::Path;
 
//     use test_generator::test_resources;
 

	
 
//     use super::*;
 

	
 
//     #[test_resources("testdata/eval/positive/*.pdl")]
 
//     fn batch1(resource: &str) {
 
//         let path = Path::new(resource);
 
//         let expect = path.with_extension("txt");
 
//         let mut heap = Heap::new();
 
//         let mut source = InputSource::from_file(&path).unwrap();
 
//         let mut parser = Parser::new(&mut source);
 
//         let pd = parser.parse(&mut heap).unwrap();
 
//         let def = heap[pd].get_definition_ident(&heap, b"test").unwrap();
 
//         let fun = heap[def].as_function().this;
 
//         let args = Vec::new();
 
//         let result = Prompt::compute_function(&heap, fun, &args).unwrap();
 
//         let valstr: String = format!("{}", result);
 
//         println!("{}", valstr);
 

	
 
//         let mut cev: Vec<u8> = Vec::new();
 
//         let mut f = File::open(expect).unwrap();
 
//         f.read_to_end(&mut cev).unwrap();
 
//         let lavstr = String::from_utf8_lossy(&cev);
 
//         println!("{}", lavstr);
 

	
 
//         assert_eq!(valstr, lavstr);
 
//     }
 
// }
src/protocol/inputsource.rs
Show inline comments
 
@@ -311,96 +311,96 @@ impl fmt::Display for DisplayEvalError<'_> {
 
    }
 
}
 

	
 
#[cfg(test)]
 
mod tests {
 
    use super::*;
 
// #[cfg(test)]
 
// mod tests {
 
//     use super::*;
 

	
 
    #[test]
 
    fn test_from_string() {
 
        let mut is = InputSource::from_string("#version 100\n").unwrap();
 
        assert!(is.input.len() == 13);
 
        assert!(is.line == 1);
 
        assert!(is.column == 1);
 
        assert!(is.offset == 0);
 
        let ps = is.pos();
 
        assert!(ps.line == 1);
 
        assert!(ps.column == 1);
 
        assert!(ps.offset == 0);
 
        assert!(is.next() == Some(b'#'));
 
        is.consume();
 
        assert!(is.next() == Some(b'v'));
 
        assert!(is.lookahead(1) == Some(b'e'));
 
        is.consume();
 
        assert!(is.next() == Some(b'e'));
 
        is.consume();
 
        assert!(is.next() == Some(b'r'));
 
        is.consume();
 
        assert!(is.next() == Some(b's'));
 
        is.consume();
 
        assert!(is.next() == Some(b'i'));
 
        is.consume();
 
        {
 
            let ps = is.pos();
 
            assert_eq!(b"#version 100", ps.context(&is));
 
            let er = is.error("hello world!");
 
            let mut vec: Vec<u8> = Vec::new();
 
            er.write(&is, &mut vec).unwrap();
 
            assert_eq!(
 
                "Parse error at 1:7: hello world!\n#version 100\n      ^\n",
 
                String::from_utf8_lossy(&vec)
 
            );
 
        }
 
        assert!(is.next() == Some(b'o'));
 
        is.consume();
 
        assert!(is.next() == Some(b'n'));
 
        is.consume();
 
        assert!(is.input.len() == 13);
 
        assert!(is.line == 1);
 
        assert!(is.column == 9);
 
        assert!(is.offset == 8);
 
        assert!(is.next() == Some(b' '));
 
        is.consume();
 
        assert!(is.next() == Some(b'1'));
 
        is.consume();
 
        assert!(is.next() == Some(b'0'));
 
        is.consume();
 
        assert!(is.next() == Some(b'0'));
 
        is.consume();
 
        assert!(is.input.len() == 13);
 
        assert!(is.line == 1);
 
        assert!(is.column == 13);
 
        assert!(is.offset == 12);
 
        assert!(is.next() == Some(b'\n'));
 
        is.consume();
 
        assert!(is.input.len() == 13);
 
        assert!(is.line == 2);
 
        assert!(is.column == 1);
 
        assert!(is.offset == 13);
 
        {
 
            let ps = is.pos();
 
            assert_eq!(b"", ps.context(&is));
 
        }
 
        assert!(is.next() == None);
 
        is.consume();
 
        assert!(is.next() == None);
 
    }
 
//     #[test]
 
//     fn test_from_string() {
 
//         let mut is = InputSource::from_string("#version 100\n").unwrap();
 
//         assert!(is.input.len() == 13);
 
//         assert!(is.line == 1);
 
//         assert!(is.column == 1);
 
//         assert!(is.offset == 0);
 
//         let ps = is.pos();
 
//         assert!(ps.line == 1);
 
//         assert!(ps.column == 1);
 
//         assert!(ps.offset == 0);
 
//         assert!(is.next() == Some(b'#'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'v'));
 
//         assert!(is.lookahead(1) == Some(b'e'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'e'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'r'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b's'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'i'));
 
//         is.consume();
 
//         {
 
//             let ps = is.pos();
 
//             assert_eq!(b"#version 100", ps.context(&is));
 
//             let er = is.error("hello world!");
 
//             let mut vec: Vec<u8> = Vec::new();
 
//             er.write(&is, &mut vec).unwrap();
 
//             assert_eq!(
 
//                 "Parse error at 1:7: hello world!\n#version 100\n      ^\n",
 
//                 String::from_utf8_lossy(&vec)
 
//             );
 
//         }
 
//         assert!(is.next() == Some(b'o'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'n'));
 
//         is.consume();
 
//         assert!(is.input.len() == 13);
 
//         assert!(is.line == 1);
 
//         assert!(is.column == 9);
 
//         assert!(is.offset == 8);
 
//         assert!(is.next() == Some(b' '));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'1'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'0'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'0'));
 
//         is.consume();
 
//         assert!(is.input.len() == 13);
 
//         assert!(is.line == 1);
 
//         assert!(is.column == 13);
 
//         assert!(is.offset == 12);
 
//         assert!(is.next() == Some(b'\n'));
 
//         is.consume();
 
//         assert!(is.input.len() == 13);
 
//         assert!(is.line == 2);
 
//         assert!(is.column == 1);
 
//         assert!(is.offset == 13);
 
//         {
 
//             let ps = is.pos();
 
//             assert_eq!(b"", ps.context(&is));
 
//         }
 
//         assert!(is.next() == None);
 
//         is.consume();
 
//         assert!(is.next() == None);
 
//     }
 

	
 
    #[test]
 
    fn test_split() {
 
        let mut is = InputSource::from_string("#version 100\n").unwrap();
 
        let backup = is.clone();
 
        assert!(is.next() == Some(b'#'));
 
        is.consume();
 
        assert!(is.next() == Some(b'v'));
 
        is.consume();
 
        assert!(is.next() == Some(b'e'));
 
        is.consume();
 
        is = backup;
 
        assert!(is.next() == Some(b'#'));
 
        is.consume();
 
        assert!(is.next() == Some(b'v'));
 
        is.consume();
 
        assert!(is.next() == Some(b'e'));
 
        is.consume();
 
    }
 
}
 
//     #[test]
 
//     fn test_split() {
 
//         let mut is = InputSource::from_string("#version 100\n").unwrap();
 
//         let backup = is.clone();
 
//         assert!(is.next() == Some(b'#'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'v'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'e'));
 
//         is.consume();
 
//         is = backup;
 
//         assert!(is.next() == Some(b'#'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'v'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'e'));
 
//         is.consume();
 
//     }
 
// }
src/protocol/lexer.rs
Show inline comments
 
@@ -1656,124 +1656,124 @@ impl Lexer<'_> {
 
    }
 
}
 

	
 
#[cfg(test)]
 
mod tests {
 
    use crate::protocol::ast::Expression::*;
 
    use crate::protocol::{ast, lexer::*};
 
// #[cfg(test)]
 
// mod tests {
 
//     use crate::protocol::ast::Expression::*;
 
//     use crate::protocol::{ast, lexer::*};
 

	
 
    #[test]
 
    fn test_lowercase() {
 
        assert_eq!(lowercase(b'a'), b'a');
 
        assert_eq!(lowercase(b'A'), b'a');
 
        assert_eq!(lowercase(b'z'), b'z');
 
        assert_eq!(lowercase(b'Z'), b'z');
 
    }
 
//     #[test]
 
//     fn test_lowercase() {
 
//         assert_eq!(lowercase(b'a'), b'a');
 
//         assert_eq!(lowercase(b'A'), b'a');
 
//         assert_eq!(lowercase(b'z'), b'z');
 
//         assert_eq!(lowercase(b'Z'), b'z');
 
//     }
 

	
 
    #[test]
 
    fn test_basic_expression() {
 
        let mut h = Heap::new();
 
        let mut is = InputSource::from_string("a+b;").unwrap();
 
        let mut lex = Lexer::new(&mut is);
 
        match lex.consume_expression(&mut h) {
 
            Ok(expr) => {
 
                println!("{:?}", expr);
 
                if let Binary(bin) = &h[expr] {
 
                    if let Variable(left) = &h[bin.left] {
 
                        if let Variable(right) = &h[bin.right] {
 
                            assert_eq!("a", format!("{}", h[left.identifier]));
 
                            assert_eq!("b", format!("{}", h[right.identifier]));
 
                            assert_eq!(Some(b';'), is.next());
 
                            return;
 
                        }
 
                    }
 
                }
 
                assert!(false);
 
            }
 
            Err(err) => {
 
                err.print(&is);
 
                assert!(false);
 
            }
 
        }
 
    }
 
//     #[test]
 
//     fn test_basic_expression() {
 
//         let mut h = Heap::new();
 
//         let mut is = InputSource::from_string("a+b;").unwrap();
 
//         let mut lex = Lexer::new(&mut is);
 
//         match lex.consume_expression(&mut h) {
 
//             Ok(expr) => {
 
//                 println!("{:?}", expr);
 
//                 if let Binary(bin) = &h[expr] {
 
//                     if let Variable(left) = &h[bin.left] {
 
//                         if let Variable(right) = &h[bin.right] {
 
//                             assert_eq!("a", format!("{}", h[left.identifier]));
 
//                             assert_eq!("b", format!("{}", h[right.identifier]));
 
//                             assert_eq!(Some(b';'), is.next());
 
//                             return;
 
//                         }
 
//                     }
 
//                 }
 
//                 assert!(false);
 
//             }
 
//             Err(err) => {
 
//                 err.print(&is);
 
//                 assert!(false);
 
//             }
 
//         }
 
//     }
 

	
 
    #[test]
 
    fn test_paren_expression() {
 
        let mut h = Heap::new();
 
        let mut is = InputSource::from_string("(true)").unwrap();
 
        let mut lex = Lexer::new(&mut is);
 
        match lex.consume_paren_expression(&mut h) {
 
            Ok(expr) => {
 
                println!("{:#?}", expr);
 
                if let Constant(con) = &h[expr] {
 
                    if let ast::Constant::True = con.value {
 
                        return;
 
                    }
 
                }
 
                assert!(false);
 
            }
 
            Err(err) => {
 
                err.print(&is);
 
                assert!(false);
 
            }
 
        }
 
    }
 
//     #[test]
 
//     fn test_paren_expression() {
 
//         let mut h = Heap::new();
 
//         let mut is = InputSource::from_string("(true)").unwrap();
 
//         let mut lex = Lexer::new(&mut is);
 
//         match lex.consume_paren_expression(&mut h) {
 
//             Ok(expr) => {
 
//                 println!("{:#?}", expr);
 
//                 if let Constant(con) = &h[expr] {
 
//                     if let ast::Constant::True = con.value {
 
//                         return;
 
//                     }
 
//                 }
 
//                 assert!(false);
 
//             }
 
//             Err(err) => {
 
//                 err.print(&is);
 
//                 assert!(false);
 
//             }
 
//         }
 
//     }
 

	
 
    #[test]
 
    fn test_expression() {
 
        let mut h = Heap::new();
 
        let mut is = InputSource::from_string("(x(1+5,get(y))-w[5])+z++\n").unwrap();
 
        let mut lex = Lexer::new(&mut is);
 
        match lex.consume_expression(&mut h) {
 
            Ok(expr) => {
 
                println!("{:#?}", expr);
 
            }
 
            Err(err) => {
 
                err.print(&is);
 
                assert!(false);
 
            }
 
        }
 
    }
 
//     #[test]
 
//     fn test_expression() {
 
//         let mut h = Heap::new();
 
//         let mut is = InputSource::from_string("(x(1+5,get(y))-w[5])+z++\n").unwrap();
 
//         let mut lex = Lexer::new(&mut is);
 
//         match lex.consume_expression(&mut h) {
 
//             Ok(expr) => {
 
//                 println!("{:#?}", expr);
 
//             }
 
//             Err(err) => {
 
//                 err.print(&is);
 
//                 assert!(false);
 
//             }
 
//         }
 
//     }
 

	
 
    #[test]
 
    fn test_basic_statement() {
 
        let mut h = Heap::new();
 
        let mut is = InputSource::from_string("while (true) { skip; }").unwrap();
 
        let mut lex = Lexer::new(&mut is);
 
        match lex.consume_statement(&mut h) {
 
            Ok(stmt) => {
 
                println!("{:#?}", stmt);
 
                if let Statement::While(w) = &h[stmt] {
 
                    if let Expression::Constant(_) = h[w.test] {
 
                        if let Statement::Block(_) = h[w.body] {
 
                            return;
 
                        }
 
                    }
 
                }
 
                assert!(false);
 
            }
 
            Err(err) => {
 
                err.print(&is);
 
                assert!(false);
 
            }
 
        }
 
    }
 
//     #[test]
 
//     fn test_basic_statement() {
 
//         let mut h = Heap::new();
 
//         let mut is = InputSource::from_string("while (true) { skip; }").unwrap();
 
//         let mut lex = Lexer::new(&mut is);
 
//         match lex.consume_statement(&mut h) {
 
//             Ok(stmt) => {
 
//                 println!("{:#?}", stmt);
 
//                 if let Statement::While(w) = &h[stmt] {
 
//                     if let Expression::Constant(_) = h[w.test] {
 
//                         if let Statement::Block(_) = h[w.body] {
 
//                             return;
 
//                         }
 
//                     }
 
//                 }
 
//                 assert!(false);
 
//             }
 
//             Err(err) => {
 
//                 err.print(&is);
 
//                 assert!(false);
 
//             }
 
//         }
 
//     }
 

	
 
    #[test]
 
    fn test_statement() {
 
        let mut h = Heap::new();
 
        let mut is = InputSource::from_string(
 
            "label: while (true) { if (x++ > y[0]) break label; else continue; }\n",
 
        )
 
        .unwrap();
 
        let mut lex = Lexer::new(&mut is);
 
        match lex.consume_statement(&mut h) {
 
            Ok(stmt) => {
 
                println!("{:#?}", stmt);
 
            }
 
            Err(err) => {
 
                err.print(&is);
 
                assert!(false);
 
            }
 
        }
 
    }
 
}
 
//     #[test]
 
//     fn test_statement() {
 
//         let mut h = Heap::new();
 
//         let mut is = InputSource::from_string(
 
//             "label: while (true) { if (x++ > y[0]) break label; else continue; }\n",
 
//         )
 
//         .unwrap();
 
//         let mut lex = Lexer::new(&mut is);
 
//         match lex.consume_statement(&mut h) {
 
//             Ok(stmt) => {
 
//                 println!("{:#?}", stmt);
 
//             }
 
//             Err(err) => {
 
//                 err.print(&is);
 
//                 assert!(false);
 
//             }
 
//         }
 
//     }
 
// }
src/protocol/mod.rs
Show inline comments
 
@@ -13,28 +13,35 @@ use crate::protocol::inputsource::*;
 
use crate::protocol::parser::*;
 

	
 
#[derive(serde::Serialize, serde::Deserialize)]
 
pub struct ProtocolDescriptionImpl {
 
pub struct ProtocolDescription {
 
    heap: Heap,
 
    source: InputSource,
 
    root: RootId,
 
}
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub struct ComponentState {
 
    prompt: Prompt,
 
}
 
pub enum EvalContext<'a> {
 
    Nonsync(&'a mut NonsyncContext<'a>),
 
    Sync(&'a mut SyncContext<'a>),
 
    None,
 
}
 
//////////////////////////////////////////////
 

	
 
impl std::fmt::Debug for ProtocolDescriptionImpl {
 
impl std::fmt::Debug for ProtocolDescription {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 
        write!(f, "Protocol")
 
    }
 
}
 

	
 
impl ProtocolDescription for ProtocolDescriptionImpl {
 
    type S = ComponentStateImpl;
 

	
 
    fn parse(buffer: &[u8]) -> Result<Self, String> {
 
impl ProtocolDescription {
 
    pub fn parse(buffer: &[u8]) -> Result<Self, String> {
 
        let mut heap = Heap::new();
 
        let mut source = InputSource::from_buffer(buffer).unwrap();
 
        let mut parser = Parser::new(&mut source);
 
        match parser.parse(&mut heap) {
 
            Ok(root) => {
 
                return Ok(ProtocolDescriptionImpl { heap, source, root });
 
                return Ok(ProtocolDescription { heap, source, root });
 
            }
 
            Err(err) => {
 
                let mut vec: Vec<u8> = Vec::new();
 
@@ -43,27 +50,31 @@ impl ProtocolDescription for ProtocolDescriptionImpl {
 
            }
 
        }
 
    }
 
    fn component_polarities(&self, identifier: &[u8]) -> Result<Vec<Polarity>, MainComponentErr> {
 
    pub fn component_polarities(
 
        &self,
 
        identifier: &[u8],
 
    ) -> Result<Vec<Polarity>, AddComponentError> {
 
        use AddComponentError::*;
 
        let h = &self.heap;
 
        let root = &h[self.root];
 
        let def = root.get_definition_ident(h, identifier);
 
        if def.is_none() {
 
            return Err(MainComponentErr::NoSuchComponent);
 
            return Err(NoSuchComponent);
 
        }
 
        let def = &h[def.unwrap()];
 
        if !def.is_component() {
 
            return Err(MainComponentErr::NoSuchComponent);
 
            return Err(NoSuchComponent);
 
        }
 
        for &param in def.parameters().iter() {
 
            let param = &h[param];
 
            let type_annot = &h[param.type_annotation];
 
            if type_annot.the_type.array {
 
                return Err(MainComponentErr::NonPortTypeParameters);
 
                return Err(NonPortTypeParameters);
 
            }
 
            match type_annot.the_type.primitive {
 
                PrimitiveType::Input | PrimitiveType::Output => continue,
 
                _ => {
 
                    return Err(MainComponentErr::NonPortTypeParameters);
 
                    return Err(NonPortTypeParameters);
 
                }
 
            }
 
        }
 
@@ -83,7 +94,7 @@ impl ProtocolDescription for ProtocolDescriptionImpl {
 
        Ok(result)
 
    }
 
    // expects port polarities to be correct
 
    fn new_main_component(&self, identifier: &[u8], ports: &[Port]) -> ComponentStateImpl {
 
    pub fn new_main_component(&self, identifier: &[u8], ports: &[PortId]) -> ComponentState {
 
        let mut args = Vec::new();
 
        for (&x, y) in ports.iter().zip(self.component_polarities(identifier).unwrap()) {
 
            match y {
 
@@ -94,23 +105,16 @@ impl ProtocolDescription for ProtocolDescriptionImpl {
 
        let h = &self.heap;
 
        let root = &h[self.root];
 
        let def = root.get_definition_ident(h, identifier).unwrap();
 
        ComponentStateImpl { prompt: Prompt::new(h, def, &args) }
 
        ComponentState { prompt: Prompt::new(h, def, &args) }
 
    }
 
}
 

	
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub struct ComponentStateImpl {
 
    prompt: Prompt,
 
}
 
impl ComponentState for ComponentStateImpl {
 
    type D = ProtocolDescriptionImpl;
 

	
 
    fn pre_sync_run<C: MonoContext<D = ProtocolDescriptionImpl, S = Self>>(
 
        &mut self,
 
        context: &mut C,
 
        pd: &ProtocolDescriptionImpl,
 
    ) -> MonoBlocker {
 
        let mut context = EvalContext::Mono(context);
 
impl ComponentState {
 
    pub fn nonsync_run<'a: 'b, 'b>(
 
        &'a mut self,
 
        context: &'b mut NonsyncContext<'b>,
 
        pd: &'a ProtocolDescription,
 
    ) -> NonsyncBlocker {
 
        let mut context = EvalContext::Nonsync(context);
 
        loop {
 
            let result = self.prompt.step(&pd.heap, &mut context);
 
            match result {
 
@@ -118,16 +122,16 @@ impl ComponentState for ComponentStateImpl {
 
                Ok(_) => unreachable!(),
 
                Err(cont) => match cont {
 
                    EvalContinuation::Stepping => continue,
 
                    EvalContinuation::Inconsistent => return MonoBlocker::Inconsistent,
 
                    EvalContinuation::Terminal => return MonoBlocker::ComponentExit,
 
                    EvalContinuation::SyncBlockStart => return MonoBlocker::SyncBlockStart,
 
                    EvalContinuation::Inconsistent => return NonsyncBlocker::Inconsistent,
 
                    EvalContinuation::Terminal => return NonsyncBlocker::ComponentExit,
 
                    EvalContinuation::SyncBlockStart => return NonsyncBlocker::SyncBlockStart,
 
                    // Not possible to end sync block if never entered one
 
                    EvalContinuation::SyncBlockEnd => unreachable!(),
 
                    EvalContinuation::NewComponent(decl, args) => {
 
                        // Look up definition (TODO for now, assume it is a definition)
 
                        let h = &pd.heap;
 
                        let def = h[decl].as_defined().definition;
 
                        let init_state = ComponentStateImpl { prompt: Prompt::new(h, def, &args) };
 
                        let init_state = ComponentState { prompt: Prompt::new(h, def, &args) };
 
                        context.new_component(&args, init_state);
 
                        // Continue stepping
 
                        continue;
 
@@ -141,12 +145,12 @@ impl ComponentState for ComponentStateImpl {
 
        }
 
    }
 

	
 
    fn sync_run<C: PolyContext<D = ProtocolDescriptionImpl>>(
 
        &mut self,
 
        context: &mut C,
 
        pd: &ProtocolDescriptionImpl,
 
    ) -> PolyBlocker {
 
        let mut context = EvalContext::Poly(context);
 
    pub fn sync_run<'a: 'b, 'b>(
 
        &'a mut self,
 
        context: &'b mut SyncContext<'b>,
 
        pd: &'a ProtocolDescription,
 
    ) -> SyncBlocker {
 
        let mut context = EvalContext::Sync(context);
 
        loop {
 
            let result = self.prompt.step(&pd.heap, &mut context);
 
            match result {
 
@@ -154,29 +158,29 @@ impl ComponentState for ComponentStateImpl {
 
                Ok(_) => unreachable!(),
 
                Err(cont) => match cont {
 
                    EvalContinuation::Stepping => continue,
 
                    EvalContinuation::Inconsistent => return PolyBlocker::Inconsistent,
 
                    EvalContinuation::Inconsistent => return SyncBlocker::Inconsistent,
 
                    // First need to exit synchronous block before definition may end
 
                    EvalContinuation::Terminal => unreachable!(),
 
                    // No nested synchronous blocks
 
                    EvalContinuation::SyncBlockStart => unreachable!(),
 
                    EvalContinuation::SyncBlockEnd => return PolyBlocker::SyncBlockEnd,
 
                    EvalContinuation::SyncBlockEnd => return SyncBlocker::SyncBlockEnd,
 
                    // Not possible to create component in sync block
 
                    EvalContinuation::NewComponent(_, _) => unreachable!(),
 
                    EvalContinuation::BlockFires(port) => match port {
 
                        Value::Output(OutputValue(port)) => {
 
                            return PolyBlocker::CouldntCheckFiring(port);
 
                            return SyncBlocker::CouldntCheckFiring(port);
 
                        }
 
                        Value::Input(InputValue(port)) => {
 
                            return PolyBlocker::CouldntCheckFiring(port);
 
                            return SyncBlocker::CouldntCheckFiring(port);
 
                        }
 
                        _ => unreachable!(),
 
                    },
 
                    EvalContinuation::BlockGet(port) => match port {
 
                        Value::Output(OutputValue(port)) => {
 
                            return PolyBlocker::CouldntReadMsg(port);
 
                            return SyncBlocker::CouldntReadMsg(port);
 
                        }
 
                        Value::Input(InputValue(port)) => {
 
                            return PolyBlocker::CouldntReadMsg(port);
 
                            return SyncBlocker::CouldntReadMsg(port);
 
                        }
 
                        _ => unreachable!(),
 
                    },
 
@@ -195,7 +199,7 @@ impl ComponentState for ComponentStateImpl {
 
                        match message {
 
                            Value::Message(MessageValue(None)) => {
 
                                // Putting a null message is inconsistent
 
                                return PolyBlocker::Inconsistent;
 
                                return SyncBlocker::Inconsistent;
 
                            }
 
                            Value::Message(MessageValue(Some(buffer))) => {
 
                                // Create a copy of the payload
 
@@ -203,31 +207,25 @@ impl ComponentState for ComponentStateImpl {
 
                            }
 
                            _ => unreachable!(),
 
                        }
 
                        return PolyBlocker::PutMsg(value, payload);
 
                        return SyncBlocker::PutMsg(value, payload);
 
                    }
 
                },
 
            }
 
        }
 
    }
 
}
 

	
 
pub enum EvalContext<'a> {
 
    Mono(&'a mut dyn MonoContext<D = ProtocolDescriptionImpl, S = ComponentStateImpl>),
 
    Poly(&'a mut dyn PolyContext<D = ProtocolDescriptionImpl>),
 
    None,
 
}
 
impl EvalContext<'_> {
 
    // fn random(&mut self) -> LongValue {
 
    //     match self {
 
    //         EvalContext::None => unreachable!(),
 
    //         EvalContext::Mono(_context) => todo!(),
 
    //         EvalContext::Poly(_) => unreachable!(),
 
    //         EvalContext::Nonsync(_context) => todo!(),
 
    //         EvalContext::Sync(_) => unreachable!(),
 
    //     }
 
    // }
 
    fn new_component(&mut self, args: &[Value], init_state: ComponentStateImpl) -> () {
 
    fn new_component(&mut self, args: &[Value], init_state: ComponentState) -> () {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(context) => {
 
            EvalContext::Nonsync(context) => {
 
                let mut moved_ports = HashSet::new();
 
                for arg in args.iter() {
 
                    match arg {
 
@@ -242,26 +240,26 @@ impl EvalContext<'_> {
 
                }
 
                context.new_component(moved_ports, init_state)
 
            }
 
            EvalContext::Poly(_) => unreachable!(),
 
            EvalContext::Sync(_) => unreachable!(),
 
        }
 
    }
 
    fn new_channel(&mut self) -> [Value; 2] {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(context) => {
 
            EvalContext::Nonsync(context) => {
 
                let [from, to] = context.new_channel();
 
                let from = Value::Output(OutputValue(from));
 
                let to = Value::Input(InputValue(to));
 
                return [from, to];
 
            }
 
            EvalContext::Poly(_) => unreachable!(),
 
            EvalContext::Sync(_) => unreachable!(),
 
        }
 
    }
 
    fn fires(&mut self, port: Value) -> Option<Value> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(_) => unreachable!(),
 
            EvalContext::Poly(context) => match port {
 
            EvalContext::Nonsync(_) => unreachable!(),
 
            EvalContext::Sync(context) => match port {
 
                Value::Output(OutputValue(port)) => context.is_firing(port).map(Value::from),
 
                Value::Input(InputValue(port)) => context.is_firing(port).map(Value::from),
 
                _ => unreachable!(),
 
@@ -271,8 +269,8 @@ impl EvalContext<'_> {
 
    fn get(&mut self, port: Value) -> Option<Value> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(_) => unreachable!(),
 
            EvalContext::Poly(context) => match port {
 
            EvalContext::Nonsync(_) => unreachable!(),
 
            EvalContext::Sync(context) => match port {
 
                Value::Output(OutputValue(port)) => {
 
                    context.read_msg(port).map(Value::receive_message)
 
                }
src/protocol/parser.rs
Show inline comments
 
@@ -1575,7 +1575,9 @@ struct IndexableExpressions {
 
}
 

	
 
impl IndexableExpressions {
 
    fn new() -> Self {  IndexableExpressions { indexable: false }  }
 
    fn new() -> Self {
 
        IndexableExpressions { indexable: false }
 
    }
 
    fn error(&self, position: InputPosition) -> VisitorResult {
 
        Err(ParseError::new(position, "Unindexable expression"))
 
    }
 
@@ -1818,66 +1820,66 @@ impl<'a> Parser<'a> {
 
    }
 
}
 

	
 
#[cfg(test)]
 
mod tests {
 
    extern crate test_generator;
 

	
 
    use std::fs::File;
 
    use std::io::Read;
 
    use std::path::Path;
 

	
 
    use test_generator::test_resources;
 

	
 
    use super::*;
 

	
 
    #[test_resources("testdata/parser/positive/*.pdl")]
 
    fn batch1(resource: &str) {
 
        let path = Path::new(resource);
 
        let mut heap = Heap::new();
 
        let mut source = InputSource::from_file(&path).unwrap();
 
        let mut parser = Parser::new(&mut source);
 
        match parser.parse(&mut heap) {
 
            Ok(_) => {}
 
            Err(err) => {
 
                println!("{}", err.display(&source));
 
                println!("{:?}", err);
 
                assert!(false);
 
            }
 
        }
 
    }
 

	
 
    #[test_resources("testdata/parser/negative/*.pdl")]
 
    fn batch2(resource: &str) {
 
        let path = Path::new(resource);
 
        let expect = path.with_extension("txt");
 
        let mut heap = Heap::new();
 
        let mut source = InputSource::from_file(&path).unwrap();
 
        let mut parser = Parser::new(&mut source);
 
        match parser.parse(&mut heap) {
 
            Ok(pd) => {
 
                println!("{:?}", heap[pd]);
 
                println!("Expected parse error:");
 

	
 
                let mut cev: Vec<u8> = Vec::new();
 
                let mut f = File::open(expect).unwrap();
 
                f.read_to_end(&mut cev).unwrap();
 
                println!("{}", String::from_utf8_lossy(&cev));
 
                assert!(false);
 
            }
 
            Err(err) => {
 
                println!("{:?}", err);
 

	
 
                let mut vec: Vec<u8> = Vec::new();
 
                err.write(&source, &mut vec).unwrap();
 
                println!("{}", String::from_utf8_lossy(&vec));
 

	
 
                let mut cev: Vec<u8> = Vec::new();
 
                let mut f = File::open(expect).unwrap();
 
                f.read_to_end(&mut cev).unwrap();
 
                println!("{}", String::from_utf8_lossy(&cev));
 

	
 
                assert_eq!(vec, cev);
 
            }
 
        }
 
    }
 
}
 
// #[cfg(test)]
 
// mod tests {
 
//     extern crate test_generator;
 

	
 
//     use std::fs::File;
 
//     use std::io::Read;
 
//     use std::path::Path;
 

	
 
//     use test_generator::test_resources;
 

	
 
//     use super::*;
 

	
 
//     #[test_resources("testdata/parser/positive/*.pdl")]
 
//     fn batch1(resource: &str) {
 
//         let path = Path::new(resource);
 
//         let mut heap = Heap::new();
 
//         let mut source = InputSource::from_file(&path).unwrap();
 
//         let mut parser = Parser::new(&mut source);
 
//         match parser.parse(&mut heap) {
 
//             Ok(_) => {}
 
//             Err(err) => {
 
//                 println!("{}", err.display(&source));
 
//                 println!("{:?}", err);
 
//                 assert!(false);
 
//             }
 
//         }
 
//     }
 

	
 
//     #[test_resources("testdata/parser/negative/*.pdl")]
 
//     fn batch2(resource: &str) {
 
//         let path = Path::new(resource);
 
//         let expect = path.with_extension("txt");
 
//         let mut heap = Heap::new();
 
//         let mut source = InputSource::from_file(&path).unwrap();
 
//         let mut parser = Parser::new(&mut source);
 
//         match parser.parse(&mut heap) {
 
//             Ok(pd) => {
 
//                 println!("{:?}", heap[pd]);
 
//                 println!("Expected parse error:");
 

	
 
//                 let mut cev: Vec<u8> = Vec::new();
 
//                 let mut f = File::open(expect).unwrap();
 
//                 f.read_to_end(&mut cev).unwrap();
 
//                 println!("{}", String::from_utf8_lossy(&cev));
 
//                 assert!(false);
 
//             }
 
//             Err(err) => {
 
//                 println!("{:?}", err);
 

	
 
//                 let mut vec: Vec<u8> = Vec::new();
 
//                 err.write(&source, &mut vec).unwrap();
 
//                 println!("{}", String::from_utf8_lossy(&vec));
 

	
 
//                 let mut cev: Vec<u8> = Vec::new();
 
//                 let mut f = File::open(expect).unwrap();
 
//                 f.read_to_end(&mut cev).unwrap();
 
//                 println!("{}", String::from_utf8_lossy(&cev));
 

	
 
//                 assert_eq!(vec, cev);
 
//             }
 
//         }
 
//     }
 
// }
src/runtime/actors.rs
Show inline comments
 
@@ -3,37 +3,37 @@ use crate::runtime::{endpoint::*, *};
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct MonoN {
 
    pub ports: HashSet<Port>,
 
    pub result: Option<(usize, HashMap<Port, Payload>)>,
 
    pub ports: HashSet<PortId>,
 
    pub result: Option<(usize, HashMap<PortId, Payload>)>,
 
}
 
#[derive(Debug)]
 
pub(crate) struct PolyN {
 
    pub ports: HashSet<Port>,
 
    pub ports: HashSet<PortId>,
 
    pub branches: HashMap<Predicate, BranchN>,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) struct BranchN {
 
    pub to_get: HashSet<Port>,
 
    pub gotten: HashMap<Port, Payload>,
 
    pub to_get: HashSet<PortId>,
 
    pub gotten: HashMap<PortId, Payload>,
 
    pub sync_batch_index: usize,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct MonoP {
 
    pub state: ProtocolS,
 
    pub ports: HashSet<Port>,
 
    pub ports: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
pub(crate) struct PolyP {
 
    pub incomplete: HashMap<Predicate, BranchP>,
 
    pub complete: HashMap<Predicate, BranchP>,
 
    pub ports: HashSet<Port>,
 
    pub ports: HashSet<PortId>,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) struct BranchP {
 
    pub blocking_on: Option<Port>,
 
    pub outbox: HashMap<Port, Payload>,
 
    pub inbox: HashMap<Port, Payload>,
 
    pub blocking_on: Option<PortId>,
 
    pub outbox: HashMap<PortId, Payload>,
 
    pub inbox: HashMap<PortId, Payload>,
 
    pub state: ProtocolS,
 
}
 

	
 
@@ -211,7 +211,7 @@ impl PolyP {
 
        &mut self,
 
        m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
        port: Port,
 
        port: PortId,
 
        payload_predicate: Predicate,
 
        payload: Payload,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
@@ -366,7 +366,7 @@ impl PolyP {
 
impl PolyN {
 
    pub fn sync_recv(
 
        &mut self,
 
        port: Port,
 
        port: PortId,
 
        logger: &mut String,
 
        payload: Payload,
 
        payload_predicate: Predicate,
src/runtime/communication.rs
Show inline comments
 
@@ -201,7 +201,7 @@ impl Controller {
 
        //    this is needed during the event loop to determine which actor
 
        //    should receive the incoming message.
 
        //    TODO: store and update this mapping rather than rebuilding it each round.
 
        let port_to_holder: HashMap<Port, PolyId> = {
 
        let port_to_holder: HashMap<PortId, PolyId> = {
 
            use PolyId::*;
 
            let n = self.inner.mono_n.ports.iter().map(move |&e| (e, N));
 
            let p = self
 
@@ -566,7 +566,7 @@ impl From<EndpointErr> for SyncErr {
 
impl MonoContext for MonoPContext<'_> {
 
    type D = ProtocolD;
 
    type S = ProtocolS;
 
    fn new_component(&mut self, moved_ports: HashSet<Port>, init_state: Self::S) {
 
    fn new_component(&mut self, moved_ports: HashSet<PortId>, init_state: Self::S) {
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_component with ports {:?}!",
 
@@ -579,7 +579,7 @@ impl MonoContext for MonoPContext<'_> {
 
            panic!("MachineP attempting to move alien port!");
 
        }
 
    }
 
    fn new_channel(&mut self) -> [Port; 2] {
 
    fn new_channel(&mut self) -> [PortId; 2] {
 
        let [a, b] = Endpoint::new_memory_pair();
 
        let channel_id = self.inner.channel_id_stream.next();
 

	
 
@@ -588,7 +588,7 @@ impl MonoContext for MonoPContext<'_> {
 
                EndpointExt { info: EndpointInfo { polarity, channel_id }, endpoint };
 
            let port = self.inner.endpoint_exts.alloc(endpoint_ext);
 
            let endpoint = &self.inner.endpoint_exts.get(port).unwrap().endpoint;
 
            let token = Port::to_token(port);
 
            let token = PortId::to_token(port);
 
            self.inner
 
                .messenger_state
 
                .poll
 
@@ -713,7 +713,7 @@ impl SolutionStorage {
 
impl PolyContext for BranchPContext<'_, '_> {
 
    type D = ProtocolD;
 

	
 
    fn is_firing(&mut self, port: Port) -> Option<bool> {
 
    fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        assert!(self.ports.contains(&port));
 
        let channel_id = self.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id;
 
        let val = self.predicate.query(channel_id);
 
@@ -725,7 +725,7 @@ impl PolyContext for BranchPContext<'_, '_> {
 
        );
 
        val
 
    }
 
    fn read_msg(&mut self, port: Port) -> Option<&Payload> {
 
    fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        assert!(self.ports.contains(&port));
 
        let val = self.inbox.get(&port);
 
        log!(
src/runtime/errors.rs
Show inline comments
 
@@ -79,7 +79,7 @@ pub enum EvalErr {
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum MessengerRecvErr {
 
    PollingFailed,
 
    EndpointErr(Port, EndpointErr),
 
    EndpointErr(PortId, EndpointErr),
 
}
 
impl From<MainComponentErr> for ConfigErr {
 
    fn from(e: MainComponentErr) -> Self {
src/runtime/mod.rs
Show inline comments
 
#[cfg(feature = "ffi")]
 
pub mod ffi;
 

	
 
mod actors;
 
pub(crate) mod communication;
 
pub(crate) mod connector;
 
pub(crate) mod endpoint;
 
pub mod errors;
 
mod serde;
 
pub(crate) mod setup;
 
// #[cfg(feature = "ffi")]
 
// pub mod ffi;
 

	
 
// mod actors;
 
// pub(crate) mod communication;
 
// pub(crate) mod connector;
 
// pub(crate) mod endpoint;
 
// pub mod errors;
 
// mod serde;
 
mod my_tests;
 
mod setup2;
 
// pub(crate) mod setup;
 
// mod v2;
 

	
 
pub(crate) type ProtocolD = crate::protocol::ProtocolDescriptionImpl;
 
pub(crate) type ProtocolS = crate::protocol::ComponentStateImpl;
 

	
 
use crate::common::*;
 
use actors::*;
 
use endpoint::*;
 
use errors::*;
 

	
 
// use actors::*;
 
// use endpoint::*;
 
// use errors::*;
 

	
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub(crate) enum Decision {
 
    Failure,
 
    Success(Predicate),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub(crate) enum Msg {
 
    SetupMsg(SetupMsg),
 
    CommMsg(CommMsg),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub(crate) enum SetupMsg {
 
    // sent by the passive endpoint to the active endpoint
 
    // MyPortInfo(MyPortInfo),
 
    LeaderEcho { maybe_leader: ControllerId },
 
    LeaderAnnounce { leader: ControllerId },
 
    YouAreMyParent,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub(crate) struct CommMsg {
 
    pub round_index: usize,
 
    pub contents: CommMsgContents,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub(crate) enum CommMsgContents {
 
    SendPayload { payload_predicate: Predicate, payload: Payload },
 
    Elaborate { partial_oracle: Predicate }, // SINKWARD
 
    Failure,                                 // SINKWARD
 
    Announce { decision: Decision },         // SINKAWAYS
 
}
 
#[derive(Debug, PartialEq)]
 
pub(crate) enum CommonSatResult {
 
    FormerNotLatter,
 
@@ -26,543 +60,753 @@ pub(crate) enum CommonSatResult {
 
    New(Predicate),
 
    Nonexistant,
 
}
 

	
 
#[derive(Clone, Eq, PartialEq, Hash)]
 
pub(crate) struct Predicate {
 
    pub assigned: BTreeMap<ChannelId, bool>,
 
pub struct Endpoint {
 
    inbox: Vec<u8>,
 
    stream: mio07::net::TcpStream,
 
}
 

	
 
#[derive(Debug, Default)]
 
struct SyncBatch {
 
    puts: HashMap<Port, Payload>,
 
    gets: HashSet<Port>,
 
pub struct IntStream {
 
    next: u32,
 
}
 

	
 
#[derive(Debug)]
 
pub enum Connector {
 
    Unconfigured(Unconfigured),
 
    Configured(Configured),
 
    Connected(Connected), // TODO consider boxing. currently takes up a lot of stack space
 
pub struct IdManager {
 
    controller_id: ControllerId,
 
    port_suffix_stream: IntStream,
 
}
 
#[derive(Debug)]
 
pub struct Unconfigured {
 
    pub controller_id: ControllerId,
 
pub struct ProtoComponent {
 
    state: ComponentState,
 
    ports: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
pub struct Configured {
 
    controller_id: ControllerId,
 
    polarities: Vec<Polarity>,
 
    bindings: HashMap<usize, PortBinding>,
 
    protocol_description: Arc<ProtocolD>,
 
    main_component: Vec<u8>,
 
    logger: String,
 
pub enum InpRoute {
 
    NativeComponent,
 
    ProtoComponent { index: usize },
 
    Endpoint { index: usize },
 
}
 
#[derive(Debug)]
 
pub struct Connected {
 
    native_interface: Vec<(Port, Polarity)>,
 
    sync_batches: Vec<SyncBatch>,
 
    // controller is cooperatively scheduled with the native application
 
    // (except for transport layer behind Endpoints, which are managed by the OS)
 
    // control flow is passed to the controller during methods on Connector (primarily, connect and sync).
 
    controller: Controller,
 
pub trait Logger: Debug {
 
    fn line_writer(&mut self) -> &mut dyn std::fmt::Write;
 
    fn dump_log(&self, w: &mut dyn std::io::Write);
 
}
 

	
 
#[derive(Debug, Copy, Clone)]
 
pub enum PortBinding {
 
    Native,
 
    Active(SocketAddr),
 
    Passive(SocketAddr),
 
#[derive(Debug, Clone)]
 
pub struct EndpointSetup {
 
    pub polarity: Polarity,
 
    pub sock_addr: SocketAddr,
 
    pub is_active: bool,
 
}
 

	
 
#[derive(Debug)]
 
struct Arena<T> {
 
    storage: Vec<T>,
 
pub struct EndpointExt {
 
    endpoint: Endpoint,
 
    inp_for_emerging_msgs: PortId,
 
}
 

	
 
#[derive(Debug)]
 
struct ReceivedMsg {
 
    recipient: Port,
 
    msg: Msg,
 
pub struct Neighborhood {
 
    parent: Option<usize>,
 
    children: Vec<usize>, // ordered, deduplicated
 
}
 

	
 
#[derive(Debug)]
 
struct MessengerState {
 
    poll: Poll,
 
    events: Events,
 
    delayed: Vec<ReceivedMsg>,
 
    undelayed: Vec<ReceivedMsg>,
 
    polled_undrained: IndexSet<Port>,
 
pub struct MemInMsg {
 
    inp: PortId,
 
    msg: Payload,
 
}
 
#[derive(Debug)]
 
struct ChannelIdStream {
 
    controller_id: ControllerId,
 
    next_channel_index: ChannelIndex,
 
pub struct EndpointPoller {
 
    poll: mio07::Poll,
 
    events: mio07::Events,
 
    undrained_endpoints: HashSet<usize>,
 
    delayed_inp_messages: Vec<(PortId, Msg)>,
 
}
 

	
 
#[derive(Debug)]
 
struct Controller {
 
    protocol_description: Arc<ProtocolD>,
 
    inner: ControllerInner,
 
    ephemeral: ControllerEphemeral,
 
    unrecoverable_error: Option<SyncErr>, // prevents future calls to Sync
 
pub struct Connector {
 
    logger: Box<dyn Logger>,
 
    proto_description: Arc<ProtocolDescription>,
 
    id_manager: IdManager,
 
    native_ports: HashSet<PortId>,
 
    proto_components: Vec<ProtoComponent>,
 
    outp_to_inp: HashMap<PortId, PortId>,
 
    inp_to_route: HashMap<PortId, InpRoute>,
 
    phased: ConnectorPhased,
 
}
 
#[derive(Debug)]
 
struct ControllerInner {
 
    round_index: usize,
 
    channel_id_stream: ChannelIdStream,
 
    endpoint_exts: Arena<EndpointExt>,
 
    messenger_state: MessengerState,
 
    mono_n: MonoN,       // state at next round start
 
    mono_ps: Vec<MonoP>, // state at next round start
 
    family: ControllerFamily,
 
    logger: String,
 
}
 

	
 
/// This structure has its state entirely reset between synchronous rounds
 
#[derive(Debug, Default)]
 
struct ControllerEphemeral {
 
    solution_storage: SolutionStorage,
 
    poly_n: Option<PolyN>,
 
    poly_ps: Vec<PolyP>,
 
    mono_ps: Vec<MonoP>,
 
    port_to_holder: HashMap<Port, PolyId>,
 
pub enum ConnectorPhased {
 
    Setup {
 
        endpoint_setups: Vec<(PortId, EndpointSetup)>,
 
        surplus_sockets: u16,
 
    },
 
    Communication {
 
        endpoint_poller: EndpointPoller,
 
        endpoint_exts: Vec<EndpointExt>,
 
        neighborhood: Neighborhood,
 
        mem_inbox: Vec<MemInMsg>,
 
    },
 
}
 

	
 
#[derive(Debug)]
 
struct ControllerFamily {
 
    parent_port: Option<Port>,
 
    children_ports: Vec<Port>,
 
pub struct StringLogger(ControllerId, String);
 
#[derive(Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub(crate) struct Predicate {
 
    pub assigned: BTreeMap<PortId, bool>,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum SyncRunResult {
 
    BlockingForRecv,
 
    AllBranchesComplete,
 
    NoBranches,
 
#[derive(Debug, Default)]
 
struct SyncBatch {
 
    puts: HashMap<PortId, Payload>,
 
    gets: HashSet<PortId>,
 
}
 

	
 
// Used to identify poly actors
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
enum PolyId {
 
    N,
 
    P { index: usize },
 
pub struct MonitoredReader<R: Read> {
 
    bytes: usize,
 
    r: R,
 
}
 

	
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
pub(crate) enum SubtreeId {
 
    PolyN,
 
    PolyP { index: usize },
 
    ChildController { port: Port },
 
pub enum EndpointRecvErr {
 
    MalformedMessage,
 
    BrokenEndpoint,
 
}
 

	
 
pub(crate) struct MonoPContext<'a> {
 
    inner: &'a mut ControllerInner,
 
    ports: &'a mut HashSet<Port>,
 
    mono_ps: &'a mut Vec<MonoP>,
 
pub struct SyncContext<'a> {
 
    connector: &'a mut Connector,
 
}
 
pub(crate) struct PolyPContext<'a> {
 
    my_subtree_id: SubtreeId,
 
    inner: &'a mut ControllerInner,
 
    solution_storage: &'a mut SolutionStorage,
 
pub struct NonsyncContext<'a> {
 
    connector: &'a mut Connector,
 
}
 
impl PolyPContext<'_> {
 
    #[inline(always)]
 
    fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> {
 
        let Self { solution_storage, my_subtree_id, inner } = self;
 
        PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner }
 
////////////////
 
impl Debug for Endpoint {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_struct("Endpoint").field("inbox", &self.inbox).finish()
 
    }
 
}
 
struct BranchPContext<'m, 'r> {
 
    m_ctx: PolyPContext<'m>,
 
    ports: &'r HashSet<Port>,
 
    predicate: &'r Predicate,
 
    inbox: &'r HashMap<Port, Payload>,
 
}
 

	
 
#[derive(Default)]
 
pub(crate) struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
 

	
 
trait Messengerlike {
 
    fn get_state_mut(&mut self) -> &mut MessengerState;
 
    fn get_endpoint_mut(&mut self, eport: Port) -> &mut Endpoint;
 

	
 
    fn delay(&mut self, received: ReceivedMsg) {
 
        self.get_state_mut().delayed.push(received);
 
impl NonsyncContext<'_> {
 
    pub fn new_component(&mut self, moved_ports: HashSet<PortId>, init_state: ComponentState) {
 
        todo!()
 
    }
 
    fn undelay_all(&mut self) {
 
        let MessengerState { delayed, undelayed, .. } = self.get_state_mut();
 
        undelayed.extend(delayed.drain(..))
 
    pub fn new_channel(&mut self) -> [PortId; 2] {
 
        todo!()
 
    }
 

	
 
    fn send(&mut self, to: Port, msg: Msg) -> Result<(), EndpointErr> {
 
        self.get_endpoint_mut(to).send(msg)
 
    }
 

	
 
    // attempt to receive a message from one of the endpoints before the deadline
 
    fn recv(&mut self, deadline: Instant) -> Result<Option<ReceivedMsg>, MessengerRecvErr> {
 
        // try get something buffered
 
        if let Some(x) = self.get_state_mut().undelayed.pop() {
 
            return Ok(Some(x));
 
        }
 

	
 
        loop {
 
            // polled_undrained may not be empty
 
            while let Some(eport) = self.get_state_mut().polled_undrained.pop() {
 
                if let Some(msg) = self
 
                    .get_endpoint_mut(eport)
 
                    .recv()
 
                    .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))?
 
                {
 
                    // this endpoint MAY still have messages! check again in future
 
                    self.get_state_mut().polled_undrained.insert(eport);
 
                    return Ok(Some(ReceivedMsg { recipient: eport, msg }));
 
                }
 
            }
 

	
 
            let state = self.get_state_mut();
 
            match state.poll_events(deadline) {
 
                Ok(()) => {
 
                    for e in state.events.iter() {
 
                        state.polled_undrained.insert(Port::from_token(e.token()));
 
                    }
 
                }
 
                Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed),
 
                Err(PollDeadlineErr::Timeout) => return Ok(None),
 
            }
 
        }
 
}
 
impl SyncContext<'_> {
 
    pub fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        todo!()
 
    }
 
    fn recv_blocking(&mut self) -> Result<ReceivedMsg, MessengerRecvErr> {
 
        // try get something buffered
 
        if let Some(x) = self.get_state_mut().undelayed.pop() {
 
            return Ok(x);
 
        }
 

	
 
        loop {
 
            // polled_undrained may not be empty
 
            while let Some(eport) = self.get_state_mut().polled_undrained.pop() {
 
                if let Some(msg) = self
 
                    .get_endpoint_mut(eport)
 
                    .recv()
 
                    .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))?
 
                {
 
                    // this endpoint MAY still have messages! check again in future
 
                    self.get_state_mut().polled_undrained.insert(eport);
 
                    return Ok(ReceivedMsg { recipient: eport, msg });
 
                }
 
            }
 

	
 
            let state = self.get_state_mut();
 

	
 
            state
 
                .poll
 
                .poll(&mut state.events, None)
 
                .map_err(|_| MessengerRecvErr::PollingFailed)?;
 
            for e in state.events.iter() {
 
                state.polled_undrained.insert(Port::from_token(e.token()));
 
            }
 
        }
 
    pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        todo!()
 
    }
 
}
 

	
 
/////////////////////////////////
 
impl Debug for SolutionStorage {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.pad("Solutions: [")?;
 
        for (subtree_id, &index) in self.subtree_id_to_index.iter() {
 
            let sols = &self.subtree_solutions[index];
 
            f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?;
 
        }
 
        f.pad("]")
 
impl<R: Read> From<R> for MonitoredReader<R> {
 
    fn from(r: R) -> Self {
 
        Self { r, bytes: 0 }
 
    }
 
}
 
impl From<EvalErr> for SyncErr {
 
    fn from(e: EvalErr) -> SyncErr {
 
        SyncErr::EvalErr(e)
 
impl<R: Read> MonitoredReader<R> {
 
    pub fn bytes_read(&self) -> usize {
 
        self.bytes
 
    }
 
}
 
impl From<MessengerRecvErr> for SyncErr {
 
    fn from(e: MessengerRecvErr) -> SyncErr {
 
        SyncErr::MessengerRecvErr(e)
 
impl<R: Read> Read for MonitoredReader<R> {
 
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
 
        let n = self.r.read(buf)?;
 
        self.bytes += n;
 
        Ok(n)
 
    }
 
}
 
impl From<MessengerRecvErr> for ConnectErr {
 
    fn from(e: MessengerRecvErr) -> ConnectErr {
 
        ConnectErr::MessengerRecvErr(e)
 
impl Into<Msg> for SetupMsg {
 
    fn into(self) -> Msg {
 
        Msg::SetupMsg(self)
 
    }
 
}
 
impl<T> Default for Arena<T> {
 
    fn default() -> Self {
 
        Self { storage: vec![] }
 
impl Debug for Predicate {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.pad("{")?;
 
        for (port, &v) in self.assigned.iter() {
 
            f.write_fmt(format_args!("{:?}=>{}, ", port, if v { 'T' } else { 'F' }))?
 
        }
 
        f.pad("}")
 
    }
 
}
 
impl<T> Arena<T> {
 
    pub fn alloc(&mut self, t: T) -> Port {
 
        self.storage.push(t);
 
        let l: u32 = self.storage.len().try_into().unwrap();
 
        Port::from_raw(l - 1u32)
 
    }
 
    pub fn get(&self, key: Port) -> Option<&T> {
 
        self.storage.get(key.to_raw() as usize)
 
    }
 
    pub fn get_mut(&mut self, key: Port) -> Option<&mut T> {
 
        self.storage.get_mut(key.to_raw() as usize)
 
    }
 
    pub fn type_convert<X>(self, f: impl FnMut((Port, T)) -> X) -> Arena<X> {
 
        Arena { storage: self.keyspace().zip(self.storage.into_iter()).map(f).collect() }
 
    }
 
    pub fn iter(&self) -> impl Iterator<Item = (Port, &T)> {
 
        self.keyspace().zip(self.storage.iter())
 
    }
 
    pub fn len(&self) -> usize {
 
        self.storage.len()
 
    }
 
    pub fn keyspace(&self) -> impl Iterator<Item = Port> {
 
        (0u32..self.storage.len().try_into().unwrap()).map(Port::from_raw)
 
impl StringLogger {
 
    pub fn new(controller_id: ControllerId) -> Self {
 
        Self(controller_id, String::default())
 
    }
 
}
 

	
 
impl ChannelIdStream {
 
    fn new(controller_id: ControllerId) -> Self {
 
        Self { controller_id, next_channel_index: 0 }
 
impl Logger for StringLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::fmt::Write {
 
        use std::fmt::Write;
 
        let _ = write!(&mut self.1, "\nCID({}): ", self.0);
 
        self
 
    }
 
    fn next(&mut self) -> ChannelId {
 
        self.next_channel_index += 1;
 
        ChannelId { controller_id: self.controller_id, channel_index: self.next_channel_index - 1 }
 
    fn dump_log(&self, w: &mut dyn std::io::Write) {
 
        let _ = w.write(self.1.as_bytes());
 
    }
 
}
 

	
 
impl MessengerState {
 
    // does NOT guarantee that events is non-empty
 
    fn poll_events(&mut self, deadline: Instant) -> Result<(), PollDeadlineErr> {
 
        use PollDeadlineErr::*;
 
        self.events.clear();
 
        let poll_timeout = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?;
 
        self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?;
 
        Ok(())
 
impl std::fmt::Write for StringLogger {
 
    fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> {
 
        self.1.write_str(s)
 
    }
 
}
 
impl From<PollDeadlineErr> for ConnectErr {
 
    fn from(e: PollDeadlineErr) -> ConnectErr {
 
        match e {
 
            PollDeadlineErr::Timeout => ConnectErr::Timeout,
 
            PollDeadlineErr::PollingFailed => ConnectErr::PollingFailed,
 
impl IntStream {
 
    fn next(&mut self) -> u32 {
 
        if self.next == u32::MAX {
 
            panic!("NO NEXT!")
 
        }
 
        self.next += 1;
 
        self.next - 1
 
    }
 
}
 

	
 
impl std::ops::Not for Polarity {
 
    type Output = Self;
 
    fn not(self) -> Self::Output {
 
        use Polarity::*;
 
        match self {
 
            Putter => Getter,
 
            Getter => Putter,
 
        }
 
impl IdManager {
 
    fn next_port(&mut self) -> PortId {
 
        let port_suffix = self.port_suffix_stream.next();
 
        let controller_id = self.controller_id;
 
        PortId { controller_id, port_index: port_suffix }
 
    }
 
}
 

	
 
impl Predicate {
 
    // returns true IFF self.unify would return Equivalent OR FormerNotLatter
 
    pub fn satisfies(&self, other: &Self) -> bool {
 
        let mut s_it = self.assigned.iter();
 
        let mut s = if let Some(s) = s_it.next() {
 
            s
 
        } else {
 
            return other.assigned.is_empty();
 
        };
 
        for (oid, ob) in other.assigned.iter() {
 
            while s.0 < oid {
 
                s = if let Some(s) = s_it.next() {
 
                    s
 
                } else {
 
                    return false;
 
                };
 
            }
 
            if s.0 > oid || s.1 != ob {
 
                return false;
 
    fn new(controller_id: ControllerId) -> Self {
 
        Self { controller_id, port_suffix_stream: Default::default() }
 
    }
 
}
 
impl Endpoint {
 
    fn try_recv<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, EndpointRecvErr> {
 
        use EndpointRecvErr::*;
 
        // populate inbox as much as possible
 
        'read_loop: loop {
 
            match self.stream.read_to_end(&mut self.inbox) {
 
                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break 'read_loop,
 
                Ok(0) => break 'read_loop,
 
                Ok(_) => (),
 
                Err(_e) => return Err(BrokenEndpoint),
 
            }
 
        }
 
        true
 
    }
 

	
 
    /// Given self and other, two predicates, return the most general Predicate possible, N
 
    /// such that n.satisfies(self) && n.satisfies(other).
 
    /// If none exists Nonexistant is returned.
 
    /// If the resulting predicate is equivlanet to self, other, or both,
 
    /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively.
 
    /// otherwise New(N) is returned.
 
    pub fn common_satisfier(&self, other: &Self) -> CommonSatResult {
 
        use CommonSatResult::*;
 
        // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
 
        let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
 
        let [mut s, mut o] = [s_it.next(), o_it.next()];
 
        // lists of assignments in self but not other and vice versa.
 
        let [mut s_not_o, mut o_not_s] = [vec![], vec![]];
 
        loop {
 
            match [s, o] {
 
                [None, None] => break,
 
                [None, Some(x)] => {
 
                    o_not_s.push(x);
 
                    o_not_s.extend(o_it);
 
                    break;
 
                }
 
                [Some(x), None] => {
 
                    s_not_o.push(x);
 
                    s_not_o.extend(s_it);
 
                    break;
 
                }
 
                [Some((sid, sb)), Some((oid, ob))] => {
 
                    if sid < oid {
 
                        // o is missing this element
 
                        s_not_o.push((sid, sb));
 
                        s = s_it.next();
 
                    } else if sid > oid {
 
                        // s is missing this element
 
                        o_not_s.push((oid, ob));
 
                        o = o_it.next();
 
                    } else if sb != ob {
 
                        assert_eq!(sid, oid);
 
                        // both predicates assign the variable but differ on the value
 
                        return Nonexistant;
 
                    } else {
 
                        // both predicates assign the variable to the same value
 
                        s = s_it.next();
 
                        o = o_it.next();
 
                    }
 
                }
 
        let mut monitored = MonitoredReader::from(&self.inbox[..]);
 
        match bincode::deserialize_from(&mut monitored) {
 
            Ok(msg) => {
 
                let msg_size = monitored.bytes_read();
 
                self.inbox.drain(0..(msg_size.try_into().unwrap()));
 
                Ok(Some(msg))
 
            }
 
        }
 
        // Observed zero inconsistencies. A unified predicate exists...
 
        match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
            [true, true] => Equivalent,       // ... equivalent to both.
 
            [false, true] => FormerNotLatter, // ... equivalent to self.
 
            [true, false] => LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
                let mut new = self.clone();
 
                for (&id, &b) in o_not_s {
 
                    new.assigned.insert(id, b);
 
            Err(e) => match *e {
 
                bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
 
                    Ok(None)
 
                }
 
                New(new)
 
            }
 
        }
 
    }
 

	
 
    pub fn iter_matching(&self, value: bool) -> impl Iterator<Item = ChannelId> + '_ {
 
        self.assigned
 
            .iter()
 
            .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None })
 
    }
 

	
 
    pub fn batch_assign_nones(
 
        &mut self,
 
        channel_ids: impl Iterator<Item = ChannelId>,
 
        value: bool,
 
    ) {
 
        for channel_id in channel_ids {
 
            self.assigned.entry(channel_id).or_insert(value);
 
        }
 
    }
 
    pub fn replace_assignment(&mut self, channel_id: ChannelId, value: bool) -> Option<bool> {
 
        self.assigned.insert(channel_id, value)
 
    }
 
    pub fn union_with(&self, other: &Self) -> Option<Self> {
 
        let mut res = self.clone();
 
        for (&channel_id, &assignment_1) in other.assigned.iter() {
 
            match res.assigned.insert(channel_id, assignment_1) {
 
                Some(assignment_2) if assignment_1 != assignment_2 => return None,
 
                _ => {}
 
            }
 
                _ => Err(MalformedMessage),
 
                // println!("SERDE ERRKIND {:?}", e);
 
                // Err(MalformedMessage)
 
            },
 
        }
 
        Some(res)
 
    }
 
    pub fn query(&self, x: ChannelId) -> Option<bool> {
 
        self.assigned.get(&x).copied()
 
    }
 
    pub fn new_trivial() -> Self {
 
        Self { assigned: Default::default() }
 
    }
 
}
 
impl Debug for Predicate {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.pad("{")?;
 
        for (ChannelId { controller_id, channel_index }, &v) in self.assigned.iter() {
 
            f.write_fmt(format_args!(
 
                "({:?},{:?})=>{}, ",
 
                controller_id,
 
                channel_index,
 
                if v { 'T' } else { 'F' }
 
            ))?
 
        }
 
        f.pad("}")
 
    }
 
}
 

	
 
#[test]
 
fn pred_sat() {
 
    use maplit::btreemap;
 
    let mut c = ChannelIdStream::new(0);
 
    let ch = std::iter::repeat_with(move || c.next()).take(5).collect::<Vec<_>>();
 
    let p = Predicate::new_trivial();
 
    let p_0t = Predicate { assigned: btreemap! { ch[0] => true } };
 
    let p_0f = Predicate { assigned: btreemap! { ch[0] => false } };
 
    let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } };
 
    let p_0f_3t = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => true } };
 

	
 
    assert!(p.satisfies(&p));
 
    assert!(p_0t.satisfies(&p_0t));
 
    assert!(p_0f.satisfies(&p_0f));
 
    assert!(p_0f_3f.satisfies(&p_0f_3f));
 
    assert!(p_0f_3t.satisfies(&p_0f_3t));
 

	
 
    assert!(p_0t.satisfies(&p));
 
    assert!(p_0f.satisfies(&p));
 
    assert!(p_0f_3f.satisfies(&p_0f));
 
    assert!(p_0f_3t.satisfies(&p_0f));
 

	
 
    assert!(!p.satisfies(&p_0t));
 
    assert!(!p.satisfies(&p_0f));
 
    assert!(!p_0f.satisfies(&p_0t));
 
    assert!(!p_0t.satisfies(&p_0f));
 
    assert!(!p_0f_3f.satisfies(&p_0f_3t));
 
    assert!(!p_0f_3t.satisfies(&p_0f_3f));
 
    assert!(!p_0t.satisfies(&p_0f_3f));
 
    assert!(!p_0f.satisfies(&p_0f_3f));
 
    assert!(!p_0t.satisfies(&p_0f_3t));
 
    assert!(!p_0f.satisfies(&p_0f_3t));
 
}
 

	
 
#[test]
 
fn pred_common_sat() {
 
    use maplit::btreemap;
 
    use CommonSatResult::*;
 

	
 
    let mut c = ChannelIdStream::new(0);
 
    let ch = std::iter::repeat_with(move || c.next()).take(5).collect::<Vec<_>>();
 
    let p = Predicate::new_trivial();
 
    let p_0t = Predicate { assigned: btreemap! { ch[0] => true } };
 
    let p_0f = Predicate { assigned: btreemap! { ch[0] => false } };
 
    let p_3f = Predicate { assigned: btreemap! { ch[3] => false } };
 
    let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } };
 
    let p_0f_3t = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => true } };
 

	
 
    assert_eq![p.common_satisfier(&p), Equivalent];
 
    assert_eq![p_0t.common_satisfier(&p_0t), Equivalent];
 

	
 
    assert_eq![p.common_satisfier(&p_0t), LatterNotFormer];
 
    assert_eq![p_0t.common_satisfier(&p), FormerNotLatter];
 

	
 
    assert_eq![p_0t.common_satisfier(&p_0f), Nonexistant];
 
    assert_eq![p_0f_3t.common_satisfier(&p_0f_3f), Nonexistant];
 
    assert_eq![p_0f_3t.common_satisfier(&p_3f), Nonexistant];
 
    assert_eq![p_3f.common_satisfier(&p_0f_3t), Nonexistant];
 

	
 
    assert_eq![p_0f.common_satisfier(&p_3f), New(p_0f_3f)];
 
}
 
    fn send<T: serde::ser::Serialize>(&mut self, msg: &T) -> Result<(), ()> {
 
        bincode::serialize_into(&mut self.stream, msg).map_err(drop)
 
    }
 
}
 
impl Connector {
 
    fn get_logger(&self) -> &dyn Logger {
 
        &*self.logger
 
    }
 
}
 

	
 
// #[derive(Debug)]
 
// pub enum Connector {
 
//     Unconfigured(Unconfigured),
 
//     Configured(Configured),
 
//     Connected(Connected), // TODO consider boxing. currently takes up a lot of stack space
 
// }
 
// #[derive(Debug)]
 
// pub struct Unconfigured {
 
//     pub controller_id: ControllerId,
 
// }
 
// #[derive(Debug)]
 
// pub struct Configured {
 
//     controller_id: ControllerId,
 
//     polarities: Vec<Polarity>,
 
//     bindings: HashMap<usize, PortBinding>,
 
//     protocol_description: Arc<ProtocolD>,
 
//     main_component: Vec<u8>,
 
//     logger: String,
 
// }
 
// #[derive(Debug)]
 
// pub struct Connected {
 
//     native_interface: Vec<(PortId, Polarity)>,
 
//     sync_batches: Vec<SyncBatch>,
 
//     // controller is cooperatively scheduled with the native application
 
//     // (except for transport layer behind Endpoints, which are managed by the OS)
 
//     // control flow is passed to the controller during methods on Connector (primarily, connect and sync).
 
//     controller: Controller,
 
// }
 

	
 
// #[derive(Debug, Copy, Clone)]
 
// pub enum PortBinding {
 
//     Native,
 
//     Active(SocketAddr),
 
//     Passive(SocketAddr),
 
// }
 

	
 
// #[derive(Debug)]
 
// struct Arena<T> {
 
//     storage: Vec<T>,
 
// }
 

	
 
// #[derive(Debug)]
 
// struct ReceivedMsg {
 
//     recipient: PortId,
 
//     msg: Msg,
 
// }
 

	
 
// #[derive(Debug)]
 
// struct MessengerState {
 
//     poll: Poll,
 
//     events: Events,
 
//     delayed: Vec<ReceivedMsg>,
 
//     undelayed: Vec<ReceivedMsg>,
 
//     polled_undrained: IndexSet<PortId>,
 
// }
 
// #[derive(Debug)]
 
// struct ChannelIdStream {
 
//     controller_id: ControllerId,
 
//     next_channel_index: ChannelIndex,
 
// }
 

	
 
// #[derive(Debug)]
 
// struct Controller {
 
//     protocol_description: Arc<ProtocolD>,
 
//     inner: ControllerInner,
 
//     ephemeral: ControllerEphemeral,
 
//     unrecoverable_error: Option<SyncErr>, // prevents future calls to Sync
 
// }
 
// #[derive(Debug)]
 
// struct ControllerInner {
 
//     round_index: usize,
 
//     channel_id_stream: ChannelIdStream,
 
//     endpoint_exts: Arena<EndpointExt>,
 
//     messenger_state: MessengerState,
 
//     mono_n: MonoN,       // state at next round start
 
//     mono_ps: Vec<MonoP>, // state at next round start
 
//     family: ControllerFamily,
 
//     logger: String,
 
// }
 

	
 
// /// This structure has its state entirely reset between synchronous rounds
 
// #[derive(Debug, Default)]
 
// struct ControllerEphemeral {
 
//     solution_storage: SolutionStorage,
 
//     poly_n: Option<PolyN>,
 
//     poly_ps: Vec<PolyP>,
 
//     mono_ps: Vec<MonoP>,
 
//     port_to_holder: HashMap<PortId, PolyId>,
 
// }
 

	
 
// #[derive(Debug)]
 
// struct ControllerFamily {
 
//     parent_port: Option<PortId>,
 
//     children_ports: Vec<PortId>,
 
// }
 

	
 
// #[derive(Debug)]
 
// pub(crate) enum SyncRunResult {
 
//     BlockingForRecv,
 
//     AllBranchesComplete,
 
//     NoBranches,
 
// }
 

	
 
// // Used to identify poly actors
 
// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
// enum PolyId {
 
//     N,
 
//     P { index: usize },
 
// }
 

	
 
// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
// pub(crate) enum SubtreeId {
 
//     PolyN,
 
//     PolyP { index: usize },
 
//     ChildController { port: PortId },
 
// }
 

	
 
// pub(crate) struct MonoPContext<'a> {
 
//     inner: &'a mut ControllerInner,
 
//     ports: &'a mut HashSet<PortId>,
 
//     mono_ps: &'a mut Vec<MonoP>,
 
// }
 
// pub(crate) struct PolyPContext<'a> {
 
//     my_subtree_id: SubtreeId,
 
//     inner: &'a mut ControllerInner,
 
//     solution_storage: &'a mut SolutionStorage,
 
// }
 
// impl PolyPContext<'_> {
 
//     #[inline(always)]
 
//     fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> {
 
//         let Self { solution_storage, my_subtree_id, inner } = self;
 
//         PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner }
 
//     }
 
// }
 
// struct BranchPContext<'m, 'r> {
 
//     m_ctx: PolyPContext<'m>,
 
//     ports: &'r HashSet<PortId>,
 
//     predicate: &'r Predicate,
 
//     inbox: &'r HashMap<PortId, Payload>,
 
// }
 

	
 
// #[derive(Default)]
 
// pub(crate) struct SolutionStorage {
 
//     old_local: HashSet<Predicate>,
 
//     new_local: HashSet<Predicate>,
 
//     // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
//     subtree_solutions: Vec<HashSet<Predicate>>,
 
//     subtree_id_to_index: HashMap<SubtreeId, usize>,
 
// }
 

	
 
// trait Messengerlike {
 
//     fn get_state_mut(&mut self) -> &mut MessengerState;
 
//     fn get_endpoint_mut(&mut self, eport: PortId) -> &mut Endpoint;
 

	
 
//     fn delay(&mut self, received: ReceivedMsg) {
 
//         self.get_state_mut().delayed.push(received);
 
//     }
 
//     fn undelay_all(&mut self) {
 
//         let MessengerState { delayed, undelayed, .. } = self.get_state_mut();
 
//         undelayed.extend(delayed.drain(..))
 
//     }
 

	
 
//     fn send(&mut self, to: PortId, msg: Msg) -> Result<(), EndpointErr> {
 
//         self.get_endpoint_mut(to).send(msg)
 
//     }
 

	
 
//     // attempt to receive a message from one of the endpoints before the deadline
 
//     fn recv(&mut self, deadline: Instant) -> Result<Option<ReceivedMsg>, MessengerRecvErr> {
 
//         // try get something buffered
 
//         if let Some(x) = self.get_state_mut().undelayed.pop() {
 
//             return Ok(Some(x));
 
//         }
 

	
 
//         loop {
 
//             // polled_undrained may not be empty
 
//             while let Some(eport) = self.get_state_mut().polled_undrained.pop() {
 
//                 if let Some(msg) = self
 
//                     .get_endpoint_mut(eport)
 
//                     .recv()
 
//                     .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))?
 
//                 {
 
//                     // this endpoint MAY still have messages! check again in future
 
//                     self.get_state_mut().polled_undrained.insert(eport);
 
//                     return Ok(Some(ReceivedMsg { recipient: eport, msg }));
 
//                 }
 
//             }
 

	
 
//             let state = self.get_state_mut();
 
//             match state.poll_events(deadline) {
 
//                 Ok(()) => {
 
//                     for e in state.events.iter() {
 
//                         state.polled_undrained.insert(PortId::from_token(e.token()));
 
//                     }
 
//                 }
 
//                 Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed),
 
//                 Err(PollDeadlineErr::Timeout) => return Ok(None),
 
//             }
 
//         }
 
//     }
 
//     fn recv_blocking(&mut self) -> Result<ReceivedMsg, MessengerRecvErr> {
 
//         // try get something buffered
 
//         if let Some(x) = self.get_state_mut().undelayed.pop() {
 
//             return Ok(x);
 
//         }
 

	
 
//         loop {
 
//             // polled_undrained may not be empty
 
//             while let Some(eport) = self.get_state_mut().polled_undrained.pop() {
 
//                 if let Some(msg) = self
 
//                     .get_endpoint_mut(eport)
 
//                     .recv()
 
//                     .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))?
 
//                 {
 
//                     // this endpoint MAY still have messages! check again in future
 
//                     self.get_state_mut().polled_undrained.insert(eport);
 
//                     return Ok(ReceivedMsg { recipient: eport, msg });
 
//                 }
 
//             }
 

	
 
//             let state = self.get_state_mut();
 

	
 
//             state
 
//                 .poll
 
//                 .poll(&mut state.events, None)
 
//                 .map_err(|_| MessengerRecvErr::PollingFailed)?;
 
//             for e in state.events.iter() {
 
//                 state.polled_undrained.insert(PortId::from_token(e.token()));
 
//             }
 
//         }
 
//     }
 
// }
 

	
 
// /////////////////////////////////
 
// impl Debug for SolutionStorage {
 
//     fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
//         f.pad("Solutions: [")?;
 
//         for (subtree_id, &index) in self.subtree_id_to_index.iter() {
 
//             let sols = &self.subtree_solutions[index];
 
//             f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?;
 
//         }
 
//         f.pad("]")
 
//     }
 
// }
 
// impl From<EvalErr> for SyncErr {
 
//     fn from(e: EvalErr) -> SyncErr {
 
//         SyncErr::EvalErr(e)
 
//     }
 
// }
 
// impl From<MessengerRecvErr> for SyncErr {
 
//     fn from(e: MessengerRecvErr) -> SyncErr {
 
//         SyncErr::MessengerRecvErr(e)
 
//     }
 
// }
 
// impl From<MessengerRecvErr> for ConnectErr {
 
//     fn from(e: MessengerRecvErr) -> ConnectErr {
 
//         ConnectErr::MessengerRecvErr(e)
 
//     }
 
// }
 
// impl<T> Default for Arena<T> {
 
//     fn default() -> Self {
 
//         Self { storage: vec![] }
 
//     }
 
// }
 
// impl<T> Arena<T> {
 
//     pub fn alloc(&mut self, t: T) -> PortId {
 
//         self.storage.push(t);
 
//         let l: u32 = self.storage.len().try_into().unwrap();
 
//         PortId::from_raw(l - 1u32)
 
//     }
 
//     pub fn get(&self, key: PortId) -> Option<&T> {
 
//         self.storage.get(key.to_raw() as usize)
 
//     }
 
//     pub fn get_mut(&mut self, key: PortId) -> Option<&mut T> {
 
//         self.storage.get_mut(key.to_raw() as usize)
 
//     }
 
//     pub fn type_convert<X>(self, f: impl FnMut((PortId, T)) -> X) -> Arena<X> {
 
//         Arena { storage: self.keyspace().zip(self.storage.into_iter()).map(f).collect() }
 
//     }
 
//     pub fn iter(&self) -> impl Iterator<Item = (PortId, &T)> {
 
//         self.keyspace().zip(self.storage.iter())
 
//     }
 
//     pub fn len(&self) -> usize {
 
//         self.storage.len()
 
//     }
 
//     pub fn keyspace(&self) -> impl Iterator<Item = PortId> {
 
//         (0u32..self.storage.len().try_into().unwrap()).map(PortId::from_raw)
 
//     }
 
// }
 

	
 
// impl ChannelIdStream {
 
//     fn new(controller_id: ControllerId) -> Self {
 
//         Self { controller_id, next_channel_index: 0 }
 
//     }
 
//     fn next(&mut self) -> ChannelId {
 
//         self.next_channel_index += 1;
 
//         ChannelId { controller_id: self.controller_id, channel_index: self.next_channel_index - 1 }
 
//     }
 
// }
 

	
 
// impl MessengerState {
 
//     // does NOT guarantee that events is non-empty
 
//     fn poll_events(&mut self, deadline: Instant) -> Result<(), PollDeadlineErr> {
 
//         use PollDeadlineErr::*;
 
//         self.events.clear();
 
//         let poll_timeout = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?;
 
//         self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?;
 
//         Ok(())
 
//     }
 
// }
 
// impl From<PollDeadlineErr> for ConnectErr {
 
//     fn from(e: PollDeadlineErr) -> ConnectErr {
 
//         match e {
 
//             PollDeadlineErr::Timeout => ConnectErr::Timeout,
 
//             PollDeadlineErr::PollingFailed => ConnectErr::PollingFailed,
 
//         }
 
//     }
 
// }
 

	
 
// impl std::ops::Not for Polarity {
 
//     type Output = Self;
 
//     fn not(self) -> Self::Output {
 
//         use Polarity::*;
 
//         match self {
 
//             Putter => Getter,
 
//             Getter => Putter,
 
//         }
 
//     }
 
// }
 

	
 
// impl Predicate {
 
//     // returns true IFF self.unify would return Equivalent OR FormerNotLatter
 
//     pub fn satisfies(&self, other: &Self) -> bool {
 
//         let mut s_it = self.assigned.iter();
 
//         let mut s = if let Some(s) = s_it.next() {
 
//             s
 
//         } else {
 
//             return other.assigned.is_empty();
 
//         };
 
//         for (oid, ob) in other.assigned.iter() {
 
//             while s.0 < oid {
 
//                 s = if let Some(s) = s_it.next() {
 
//                     s
 
//                 } else {
 
//                     return false;
 
//                 };
 
//             }
 
//             if s.0 > oid || s.1 != ob {
 
//                 return false;
 
//             }
 
//         }
 
//         true
 
//     }
 

	
 
//     /// Given self and other, two predicates, return the most general Predicate possible, N
 
//     /// such that n.satisfies(self) && n.satisfies(other).
 
//     /// If none exists Nonexistant is returned.
 
//     /// If the resulting predicate is equivlanet to self, other, or both,
 
//     /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively.
 
//     /// otherwise New(N) is returned.
 
//     pub fn common_satisfier(&self, other: &Self) -> CommonSatResult {
 
//         use CommonSatResult::*;
 
//         // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
 
//         let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
 
//         let [mut s, mut o] = [s_it.next(), o_it.next()];
 
//         // lists of assignments in self but not other and vice versa.
 
//         let [mut s_not_o, mut o_not_s] = [vec![], vec![]];
 
//         loop {
 
//             match [s, o] {
 
//                 [None, None] => break,
 
//                 [None, Some(x)] => {
 
//                     o_not_s.push(x);
 
//                     o_not_s.extend(o_it);
 
//                     break;
 
//                 }
 
//                 [Some(x), None] => {
 
//                     s_not_o.push(x);
 
//                     s_not_o.extend(s_it);
 
//                     break;
 
//                 }
 
//                 [Some((sid, sb)), Some((oid, ob))] => {
 
//                     if sid < oid {
 
//                         // o is missing this element
 
//                         s_not_o.push((sid, sb));
 
//                         s = s_it.next();
 
//                     } else if sid > oid {
 
//                         // s is missing this element
 
//                         o_not_s.push((oid, ob));
 
//                         o = o_it.next();
 
//                     } else if sb != ob {
 
//                         assert_eq!(sid, oid);
 
//                         // both predicates assign the variable but differ on the value
 
//                         return Nonexistant;
 
//                     } else {
 
//                         // both predicates assign the variable to the same value
 
//                         s = s_it.next();
 
//                         o = o_it.next();
 
//                     }
 
//                 }
 
//             }
 
//         }
 
//         // Observed zero inconsistencies. A unified predicate exists...
 
//         match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
//             [true, true] => Equivalent,       // ... equivalent to both.
 
//             [false, true] => FormerNotLatter, // ... equivalent to self.
 
//             [true, false] => LatterNotFormer, // ... equivalent to other.
 
//             [false, false] => {
 
//                 // ... which is the union of the predicates' assignments but
 
//                 //     is equivalent to neither self nor other.
 
//                 let mut new = self.clone();
 
//                 for (&id, &b) in o_not_s {
 
//                     new.assigned.insert(id, b);
 
//                 }
 
//                 New(new)
 
//             }
 
//         }
 
//     }
 

	
 
//     pub fn iter_matching(&self, value: bool) -> impl Iterator<Item = ChannelId> + '_ {
 
//         self.assigned
 
//             .iter()
 
//             .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None })
 
//     }
 

	
 
//     pub fn batch_assign_nones(
 
//         &mut self,
 
//         channel_ids: impl Iterator<Item = ChannelId>,
 
//         value: bool,
 
//     ) {
 
//         for channel_id in channel_ids {
 
//             self.assigned.entry(channel_id).or_insert(value);
 
//         }
 
//     }
 
//     pub fn replace_assignment(&mut self, channel_id: ChannelId, value: bool) -> Option<bool> {
 
//         self.assigned.insert(channel_id, value)
 
//     }
 
//     pub fn union_with(&self, other: &Self) -> Option<Self> {
 
//         let mut res = self.clone();
 
//         for (&channel_id, &assignment_1) in other.assigned.iter() {
 
//             match res.assigned.insert(channel_id, assignment_1) {
 
//                 Some(assignment_2) if assignment_1 != assignment_2 => return None,
 
//                 _ => {}
 
//             }
 
//         }
 
//         Some(res)
 
//     }
 
//     pub fn query(&self, x: ChannelId) -> Option<bool> {
 
//         self.assigned.get(&x).copied()
 
//     }
 
//     pub fn new_trivial() -> Self {
 
//         Self { assigned: Default::default() }
 
//     }
 
// }
 

	
 
// #[test]
 
// fn pred_sat() {
 
//     use maplit::btreemap;
 
//     let mut c = ChannelIdStream::new(0);
 
//     let ch = std::iter::repeat_with(move || c.next()).take(5).collect::<Vec<_>>();
 
//     let p = Predicate::new_trivial();
 
//     let p_0t = Predicate { assigned: btreemap! { ch[0] => true } };
 
//     let p_0f = Predicate { assigned: btreemap! { ch[0] => false } };
 
//     let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } };
 
//     let p_0f_3t = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => true } };
 

	
 
//     assert!(p.satisfies(&p));
 
//     assert!(p_0t.satisfies(&p_0t));
 
//     assert!(p_0f.satisfies(&p_0f));
 
//     assert!(p_0f_3f.satisfies(&p_0f_3f));
 
//     assert!(p_0f_3t.satisfies(&p_0f_3t));
 

	
 
//     assert!(p_0t.satisfies(&p));
 
//     assert!(p_0f.satisfies(&p));
 
//     assert!(p_0f_3f.satisfies(&p_0f));
 
//     assert!(p_0f_3t.satisfies(&p_0f));
 

	
 
//     assert!(!p.satisfies(&p_0t));
 
//     assert!(!p.satisfies(&p_0f));
 
//     assert!(!p_0f.satisfies(&p_0t));
 
//     assert!(!p_0t.satisfies(&p_0f));
 
//     assert!(!p_0f_3f.satisfies(&p_0f_3t));
 
//     assert!(!p_0f_3t.satisfies(&p_0f_3f));
 
//     assert!(!p_0t.satisfies(&p_0f_3f));
 
//     assert!(!p_0f.satisfies(&p_0f_3f));
 
//     assert!(!p_0t.satisfies(&p_0f_3t));
 
//     assert!(!p_0f.satisfies(&p_0f_3t));
 
// }
 

	
 
// #[test]
 
// fn pred_common_sat() {
 
//     use maplit::btreemap;
 
//     use CommonSatResult::*;
 

	
 
//     let mut c = ChannelIdStream::new(0);
 
//     let ch = std::iter::repeat_with(move || c.next()).take(5).collect::<Vec<_>>();
 
//     let p = Predicate::new_trivial();
 
//     let p_0t = Predicate { assigned: btreemap! { ch[0] => true } };
 
//     let p_0f = Predicate { assigned: btreemap! { ch[0] => false } };
 
//     let p_3f = Predicate { assigned: btreemap! { ch[3] => false } };
 
//     let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } };
 
//     let p_0f_3t = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => true } };
 

	
 
//     assert_eq![p.common_satisfier(&p), Equivalent];
 
//     assert_eq![p_0t.common_satisfier(&p_0t), Equivalent];
 

	
 
//     assert_eq![p.common_satisfier(&p_0t), LatterNotFormer];
 
//     assert_eq![p_0t.common_satisfier(&p), FormerNotLatter];
 

	
 
//     assert_eq![p_0t.common_satisfier(&p_0f), Nonexistant];
 
//     assert_eq![p_0f_3t.common_satisfier(&p_0f_3f), Nonexistant];
 
//     assert_eq![p_0f_3t.common_satisfier(&p_3f), Nonexistant];
 
//     assert_eq![p_3f.common_satisfier(&p_0f_3t), Nonexistant];
 

	
 
//     assert_eq![p_0f.common_satisfier(&p_3f), New(p_0f_3f)];
 
// }
src/runtime/my_tests.rs
Show inline comments
 
new file 100644
 
use crate as reowolf;
 
use reowolf::Polarity::*;
 
use std::net::SocketAddr;
 
use std::{sync::Arc, time::Duration};
 

	
 
fn next_test_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(5_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()
 
}
 

	
 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<reowolf::ProtocolDescription> =
 
        { Arc::new(reowolf::ProtocolDescription::parse(b"").unwrap()) };
 
}
 

	
 
#[test]
 
fn simple_connector() {
 
    let c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    println!("{:#?}", c);
 
}
 

	
 
#[test]
 
fn add_port_pair() {
 
    let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, _] = c.add_port_pair();
 
    let [_, _] = c.add_port_pair();
 
    println!("{:#?}", c);
 
}
 

	
 
#[test]
 
fn add_sync() {
 
    let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [o, i] = c.add_port_pair();
 
    c.add_component(b"sync", &[i, o]).unwrap();
 
    println!("{:#?}", c);
 
}
 

	
 
#[test]
 
fn add_net_port() {
 
    let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let sock_addr = next_test_addr();
 
    let _ = c
 
        .add_net_port(reowolf::EndpointSetup { polarity: Getter, sock_addr, is_active: false })
 
        .unwrap();
 
    let _ = c
 
        .add_net_port(reowolf::EndpointSetup { polarity: Putter, sock_addr, is_active: true })
 
        .unwrap();
 
    println!("{:#?}", c);
 
}
 

	
 
#[test]
 
fn trivial_connect() {
 
    let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    println!("{:#?}", c);
 
}
 

	
 
#[test]
 
fn single_node_connect() {
 
    let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let sock_addr = next_test_addr();
 
    let _ = c
 
        .add_net_port(reowolf::EndpointSetup { polarity: Getter, sock_addr, is_active: false })
 
        .unwrap();
 
    let _ = c
 
        .add_net_port(reowolf::EndpointSetup { polarity: Putter, sock_addr, is_active: true })
 
        .unwrap();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    println!("{:#?}", c);
 
    c.get_logger().dump_log(&mut std::io::stdout().lock());
 
}
src/runtime/serde.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{
 
    endpoint::{CommMsg, CommMsgContents, Decision, EndpointInfo, Msg, SetupMsg},
 
    Predicate,
 
};
 
use crate::runtime::*;
 
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
 
use std::io::{ErrorKind::InvalidData, Read, Write};
 

	
 
@@ -53,6 +50,19 @@ macro_rules! ser_seq {
 
}
 
/////////////////////////////////////////
 

	
 
impl<W: Write> Ser<PortId> for W {
 
    fn ser(&mut self, t: &PortId) -> Result<(), std::io::Error> {
 
        self.ser(&t.controller_id)?;
 
        self.ser(&VarLenInt(t.port_index as u64))
 
    }
 
}
 

	
 
impl<R: Read> De<PortId> for R {
 
    fn de(&mut self) -> Result<PortId, std::io::Error> {
 
        Ok(PortId { controller_id: self.de()?, port_index: De::<VarLenInt>::de(self)?.0 as u32 })
 
    }
 
}
 

	
 
impl<W: Write> Ser<bool> for W {
 
    fn ser(&mut self, t: &bool) -> Result<(), std::io::Error> {
 
        self.ser(&match t {
 
@@ -147,21 +157,6 @@ impl<R: Read> De<VarLenInt> for R {
 
    }
 
}
 

	
 
impl<W: Write> Ser<ChannelId> for W {
 
    fn ser(&mut self, t: &ChannelId) -> Result<(), std::io::Error> {
 
        self.ser(&t.controller_id)?;
 
        self.ser(&VarLenInt(t.channel_index as u64))
 
    }
 
}
 
impl<R: Read> De<ChannelId> for R {
 
    fn de(&mut self) -> Result<ChannelId, std::io::Error> {
 
        Ok(ChannelId {
 
            controller_id: self.de()?,
 
            channel_index: De::<VarLenInt>::de(self)?.0 as ChannelIndex,
 
        })
 
    }
 
}
 

	
 
impl<W: Write> Ser<Predicate> for W {
 
    fn ser(&mut self, t: &Predicate) -> Result<(), std::io::Error> {
 
        self.ser(&VarLenInt(t.assigned.len() as u64))?;
 
@@ -174,7 +169,7 @@ impl<W: Write> Ser<Predicate> for W {
 
impl<R: Read> De<Predicate> for R {
 
    fn de(&mut self) -> Result<Predicate, std::io::Error> {
 
        let VarLenInt(len) = self.de()?;
 
        let mut assigned = BTreeMap::<ChannelId, bool>::default();
 
        let mut assigned = BTreeMap::<PortId, bool>::default();
 
        for _ in 0..len {
 
            assigned.insert(self.de()?, self.de()?);
 
        }
 
@@ -221,26 +216,13 @@ impl<R: Read> De<Polarity> for R {
 
        })
 
    }
 
}
 

	
 
impl<W: Write> Ser<EndpointInfo> for W {
 
    fn ser(&mut self, t: &EndpointInfo) -> Result<(), std::io::Error> {
 
        let EndpointInfo { channel_id, polarity } = t;
 
        ser_seq![self, channel_id, polarity]
 
    }
 
}
 
impl<R: Read> De<EndpointInfo> for R {
 
    fn de(&mut self) -> Result<EndpointInfo, std::io::Error> {
 
        Ok(EndpointInfo { channel_id: self.de()?, polarity: self.de()? })
 
    }
 
}
 

	
 
impl<W: Write> Ser<Msg> for W {
 
    fn ser(&mut self, t: &Msg) -> Result<(), std::io::Error> {
 
        use {CommMsgContents::*, SetupMsg::*};
 
        match t {
 
            Msg::SetupMsg(s) => match s {
 
                // [flag, data]
 
                ChannelSetup { info } => ser_seq![self, &0u8, info],
 
                MyPortInfo { polarity, port } => ser_seq![self, &0u8, polarity, port],
 
                LeaderEcho { maybe_leader } => ser_seq![self, &1u8, maybe_leader],
 
                LeaderAnnounce { leader } => ser_seq![self, &2u8, leader],
 
                YouAreMyParent => ser_seq![self, &3u8],
 
@@ -267,7 +249,7 @@ impl<R: Read> De<Msg> for R {
 
        Ok(match b {
 
            0..=3 => Msg::SetupMsg(match b {
 
                // [flag, data]
 
                0u8 => ChannelSetup { info: self.de()? },
 
                0u8 => MyPortInfo { polarity: self.de()?, port: self.de()? },
 
                1u8 => LeaderEcho { maybe_leader: self.de()? },
 
                2u8 => LeaderAnnounce { leader: self.de()? },
 
                3u8 => YouAreMyParent,
src/runtime/setup.rs
Show inline comments
 
@@ -25,7 +25,7 @@ impl Controller {
 
        bound_proto_interface: &[(PortBinding, Polarity)],
 
        logger: &mut String,
 
        deadline: Instant,
 
    ) -> Result<(Self, Vec<(Port, Polarity)>), ConnectErr> {
 
    ) -> Result<(Self, Vec<(PortId, Polarity)>), ConnectErr> {
 
        use ConnectErr::*;
 

	
 
        log!(logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major);
 
@@ -215,7 +215,7 @@ impl Controller {
 
            for event in ms.events.iter() {
 
                log!(logger, "event {:#?}", event);
 
                let token = event.token();
 
                let port = Port::from_token(token);
 
                let port = PortId::from_token(token);
 
                let entry = endpoint_ext_todos.get_mut(port).unwrap();
 
                match entry {
 
                    Finished(_) => {
 
@@ -325,7 +325,7 @@ impl Controller {
 
        logger: &mut String,
 
        endpoint_exts: &mut Arena<EndpointExt>,
 
        messenger_state: &mut MessengerState,
 
        neighbors: Vec<Port>,
 
        neighbors: Vec<PortId>,
 
        deadline: Instant,
 
    ) -> Result<ControllerFamily, ConnectErr> {
 
        use {ConnectErr::*, Msg::SetupMsg as S, SetupMsg::*};
 
@@ -337,7 +337,7 @@ impl Controller {
 
            fn get_state_mut(&mut self) -> &mut MessengerState {
 
                self.0
 
            }
 
            fn get_endpoint_mut(&mut self, port: Port) -> &mut Endpoint {
 
            fn get_endpoint_mut(&mut self, port: PortId) -> &mut Endpoint {
 
                &mut self.1.get_mut(port).expect("OUT OF BOUNDS").endpoint
 
            }
 
        }
 
@@ -353,7 +353,7 @@ impl Controller {
 

	
 
        // 2. Receive incoming replies. whenever a higher-id echo arrives,
 
        //    adopt it as leader, sender as parent, and reset the await set.
 
        let mut parent: Option<Port> = None;
 
        let mut parent: Option<PortId> = None;
 
        let mut my_leader = major;
 
        messenger.undelay_all();
 
        'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
@@ -478,7 +478,7 @@ impl Messengerlike for Controller {
 
    fn get_state_mut(&mut self) -> &mut MessengerState {
 
        &mut self.inner.messenger_state
 
    }
 
    fn get_endpoint_mut(&mut self, port: Port) -> &mut Endpoint {
 
    fn get_endpoint_mut(&mut self, port: PortId) -> &mut Endpoint {
 
        &mut self.inner.endpoint_exts.get_mut(port).expect("OUT OF BOUNDS").endpoint
 
    }
 
}
src/runtime/setup2.rs
Show inline comments
 
new file 100644
 
use crate::common::*;
 
use crate::runtime::*;
 

	
 
impl Connector {
 
    pub fn new_simple(
 
        proto_description: Arc<ProtocolDescription>,
 
        controller_id: ControllerId,
 
    ) -> Self {
 
        let logger = Box::new(StringLogger::new(controller_id));
 
        let surplus_sockets = 8;
 
        Self::new(logger, proto_description, controller_id, surplus_sockets)
 
    }
 
    pub fn new(
 
        logger: Box<dyn Logger>,
 
        proto_description: Arc<ProtocolDescription>,
 
        controller_id: ControllerId,
 
        surplus_sockets: u16,
 
    ) -> Self {
 
        Self {
 
            logger,
 
            proto_description,
 
            id_manager: IdManager::new(controller_id),
 
            native_ports: Default::default(),
 
            proto_components: Default::default(),
 
            outp_to_inp: Default::default(),
 
            inp_to_route: Default::default(),
 
            phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets },
 
        }
 
    }
 
    pub fn add_port_pair(&mut self) -> [PortId; 2] {
 
        let o = self.id_manager.next_port();
 
        let i = self.id_manager.next_port();
 
        self.outp_to_inp.insert(o, i);
 
        self.inp_to_route.insert(i, InpRoute::NativeComponent);
 
        self.native_ports.insert(o);
 
        self.native_ports.insert(i);
 
        log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 
    pub fn add_net_port(&mut self, endpoint_setup: EndpointSetup) -> Result<PortId, ()> {
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                let p = self.id_manager.next_port();
 
                self.native_ports.insert(p);
 
                log!(self.logger, "Added net port {:?} with info {:?} ", p, &endpoint_setup);
 
                endpoint_setups.push((p, endpoint_setup));
 
                Ok(p)
 
            }
 
            ConnectorPhased::Communication { .. } => Err(()),
 
        }
 
    }
 
    fn check_polarity(&self, port: &PortId) -> Polarity {
 
        if let ConnectorPhased::Setup { endpoint_setups, .. } = &self.phased {
 
            for (setup_port, EndpointSetup { polarity, .. }) in endpoint_setups.iter() {
 
                if setup_port == port {
 
                    // special case. this port's polarity isn't reflected by
 
                    // self.inp_to_route or self.outp_to_inp, because its still not paired to a peer
 
                    return *polarity;
 
                }
 
            }
 
        }
 
        if self.outp_to_inp.contains_key(port) {
 
            Polarity::Putter
 
        } else {
 
            assert!(self.inp_to_route.contains_key(port));
 
            Polarity::Getter
 
        }
 
    }
 
    pub fn add_component(
 
        &mut self,
 
        identifier: &[u8],
 
        ports: &[PortId],
 
    ) -> Result<(), AddComponentError> {
 
        use AddComponentError::*;
 
        let polarities = self.proto_description.component_polarities(identifier)?;
 
        if polarities.len() != ports.len() {
 
            return Err(WrongNumberOfParamaters { expected: polarities.len() });
 
        }
 
        for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) {
 
            if !self.native_ports.contains(port) {
 
                return Err(UnknownPort(*port));
 
            }
 
            if expected_polarity != self.check_polarity(port) {
 
                return Err(WrongPortPolarity { port: *port, expected_polarity });
 
            }
 
        }
 
        // ok!
 
        let state = self.proto_description.new_main_component(identifier, ports);
 
        let proto_component = ProtoComponent { ports: ports.iter().copied().collect(), state };
 
        let proto_component_index = self.proto_components.len();
 
        self.proto_components.push(proto_component);
 
        for port in ports.iter() {
 
            if let Polarity::Getter = self.check_polarity(port) {
 
                self.inp_to_route
 
                    .insert(*port, InpRoute::ProtoComponent { index: proto_component_index });
 
            }
 
        }
 
        Ok(())
 
    }
 
    pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> {
 
        match &mut self.phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(self.logger, "Call to connecting in connected state");
 
                Err(())
 
            }
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                log!(self.logger, "Call to connecting in setup state. Timeout {:?}", timeout);
 
                let deadline = Instant::now() + timeout;
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let (mut endpoint_exts, mut endpoint_poller) = init_endpoints(
 
                    &mut *self.logger,
 
                    endpoint_setups,
 
                    &mut self.inp_to_route,
 
                    deadline,
 
                )?;
 
                log!(self.logger, "Successfully connected {} endpoints", endpoint_exts.len());
 
                // leader election and tree construction
 
                let neighborhood =
 
                    init_neighborhood(&mut *self.logger, &mut endpoint_exts, &mut endpoint_poller)?;
 
                log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                // TODO session optimization goes here
 
                self.phased = ConnectorPhased::Communication {
 
                    endpoint_poller,
 
                    endpoint_exts,
 
                    neighborhood,
 
                    mem_inbox: Default::default(),
 
                };
 
                Ok(())
 
            }
 
        }
 
    }
 
}
 

	
 
fn init_endpoints(
 
    logger: &mut dyn Logger,
 
    endpoint_setups: &[(PortId, EndpointSetup)],
 
    inp_to_route: &mut HashMap<PortId, InpRoute>,
 
    deadline: Instant,
 
) -> Result<(Vec<EndpointExt>, EndpointPoller), ()> {
 
    use mio07::{
 
        net::{TcpListener, TcpStream},
 
        Events, Interest, Poll, Token,
 
    };
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
        endpoint_setup: EndpointSetup,
 
        local_port: PortId,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 
    enum TodoEndpoint {
 
        Listener(TcpListener),
 
        Endpoint(Endpoint),
 
    }
 
    fn init(
 
        token: Token,
 
        local_port: PortId,
 
        endpoint_setup: &EndpointSetup,
 
        poll: &mut Poll,
 
    ) -> Result<Todo, ()> {
 
        let todo_endpoint = if endpoint_setup.is_active {
 
            let mut stream = TcpStream::connect(endpoint_setup.sock_addr).map_err(drop)?;
 
            poll.registry().register(&mut stream, token, BOTH).unwrap();
 
            TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] })
 
        } else {
 
            let mut listener = TcpListener::bind(endpoint_setup.sock_addr).map_err(drop)?;
 
            poll.registry().register(&mut listener, token, BOTH).unwrap();
 
            TodoEndpoint::Listener(listener)
 
        };
 
        Ok(Todo {
 
            todo_endpoint,
 
            endpoint_setup: endpoint_setup.clone(),
 
            local_port,
 
            sent_local_port: false,
 
            recv_peer_port: None,
 
        })
 
    };
 
    ////////////////////////
 

	
 
    let mut ep = EndpointPoller {
 
        poll: Poll::new().map_err(drop)?,
 
        events: Events::with_capacity(64),
 
        undrained_endpoints: Default::default(),
 
        delayed_inp_messages: Default::default(),
 
    };
 

	
 
    let mut todos = endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
            init(Token(index), *local_port, endpoint_setup, &mut ep.poll)
 
        })
 
        .collect::<Result<Vec<Todo>, _>>()?;
 

	
 
    let mut unfinished: HashSet<usize> = (0..todos.len()).collect();
 
    while !unfinished.is_empty() {
 
        let remaining = deadline.checked_duration_since(Instant::now()).ok_or(())?;
 
        ep.poll.poll(&mut ep.events, Some(remaining)).map_err(drop)?;
 
        for event in ep.events.iter() {
 
            let token = event.token();
 
            let Token(index) = token;
 
            let todo: &mut Todo = &mut todos[index];
 
            if let TodoEndpoint::Listener(listener) = &mut todo.todo_endpoint {
 
                let (mut stream, peer_addr) = listener.accept().map_err(drop)?;
 
                ep.poll.registry().deregister(listener).unwrap();
 
                ep.poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                log!(logger, "Endpoint({}) accepted a connection from {:?}", index, peer_addr);
 
                let endpoint = Endpoint { stream, inbox: vec![] };
 
                todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
 
            }
 
            match todo {
 
                Todo {
 
                    todo_endpoint: TodoEndpoint::Endpoint(endpoint),
 
                    local_port,
 
                    endpoint_setup,
 
                    sent_local_port,
 
                    recv_peer_port,
 
                } => {
 
                    if !unfinished.contains(&index) {
 
                        continue;
 
                    }
 
                    if event.is_writable() && !*sent_local_port {
 
                        let msg =
 
                            MyPortInfo { polarity: endpoint_setup.polarity, port: *local_port };
 
                        endpoint.send(&msg)?;
 
                        log!(logger, "endpoint[{}] sent peer info {:?}", index, &msg);
 
                        *sent_local_port = true;
 
                    }
 
                    if event.is_readable() && recv_peer_port.is_none() {
 
                        ep.undrained_endpoints.insert(index);
 
                        if let Some(peer_port_info) =
 
                            endpoint.try_recv::<MyPortInfo>().map_err(drop)?
 
                        {
 
                            log!(logger, "endpoint[{}] got peer info {:?}", index, peer_port_info);
 
                            assert!(peer_port_info.polarity != endpoint_setup.polarity);
 
                            if let Putter = endpoint_setup.polarity {
 
                                inp_to_route.insert(*local_port, InpRoute::Endpoint { index });
 
                            }
 
                            *recv_peer_port = Some(peer_port_info.port);
 
                        }
 
                    }
 
                    if *sent_local_port && recv_peer_port.is_some() {
 
                        unfinished.remove(&index);
 
                        log!(logger, "endpoint[{}] is finished!", index);
 
                    }
 
                }
 
                Todo { todo_endpoint: TodoEndpoint::Listener(_), .. } => unreachable!(),
 
            }
 
        }
 
        ep.events.clear();
 
    }
 
    let endpoint_exts = todos
 
        .into_iter()
 
        .map(|Todo { todo_endpoint, recv_peer_port, .. }| EndpointExt {
 
            endpoint: match todo_endpoint {
 
                TodoEndpoint::Endpoint(endpoint) => endpoint,
 
                TodoEndpoint::Listener(..) => unreachable!(),
 
            },
 
            inp_for_emerging_msgs: recv_peer_port.unwrap(),
 
        })
 
        .collect();
 
    Ok((endpoint_exts, ep))
 
}
 

	
 
fn init_neighborhood(
 
    logger: &mut dyn Logger,
 
    endpoint_exts: &mut [EndpointExt],
 
    endpoint_poller: &mut EndpointPoller,
 
) -> Result<Neighborhood, ()> {
 
    log!(logger, "Time to construct my neighborhood");
 
    let parent = None;
 
    let children = Default::default();
 
    Ok(Neighborhood { parent, children })
 
}
src/runtime/v2.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::endpoint::Endpoint;
 
use crate::runtime::endpoint::Msg;
 
use crate::runtime::ProtocolD;
 
use crate::runtime::ProtocolS;
 
use std::io::Write;
 
use crate::common::{
 
    Arc, ControllerId, Duration, HashMap, HashSet, Instant, Payload, Polarity, Port, PortId,
 
    ProtocolDescription, SocketAddr,
 
};
 
use crate::runtime::endpoint::{Endpoint, Msg};
 
use crate::runtime::*;
 

	
 
#[derive(Default)]
 
struct IntStream {
 
    next: u32,
 
}
 
struct IdManager {
 
    controller_id: ControllerId,
 
    port_suffix_stream: IntStream,
 
}
 

	
 
struct ProtoComponent {
 
    state: ProtocolS,
 
    ports: HashSet<PortId>,
 
}
 
enum InpRoute {
 
    NativeComponent,
 
    ProtoComponent { index: usize },
 
    Endpoint { index: usize },
 
}
 
trait Logger {
 
    fn line_writer(&mut self) -> &mut dyn Write;
 
}
 
#[derive(Clone)]
 
struct EndpointSetup {
 
    polarity: Polarity,
 
    sock_addr: SocketAddr,
 
    is_active: bool,
 
}
 
struct EndpointExt {
 
    net_endpoint: Endpoint,
 
    // data-messages emerging from this endpoint are destined for this inp
 
    inp: Port,
 
}
 
struct Neighborhood {
 
    parent: Option<usize>,
 
    children: Vec<usize>, // ordered, deduplicated
 
}
 
struct MemInMsg {
 
    inp: Port,
 
    msg: Payload,
 
}
 
struct EndpointPoller {
 
    poll: Poll,
 
    events: Events,
 
    undrained_endpoints: HashSet<usize>,
 
    delayed_inp_messages: Vec<(Port, Msg)>,
 
}
 
struct Connector {
 
    logger: Box<dyn Logger>,
 
    proto_description: Arc<ProtocolD>,
 
    id_manager: IdManager,
 
    native_ports: HashSet<PortId>,
 
    proto_components: Vec<ProtoComponent>,
 
    outp_to_inp: HashMap<PortId, PortId>,
 
    inp_to_route: HashMap<PortId, InpRoute>,
 
    phased: ConnectorPhased,
 
}
 
enum ConnectorPhased {
 
    Setup {
 
        endpoint_setups: Vec<(PortId, EndpointSetup)>,
 
        surplus_sockets: u16,
 
    },
 
    Communication {
 
        endpoint_poller: EndpointPoller,
 
        endpoint_exts: Vec<EndpointExt>,
 
        neighborhood: Neighborhood,
 
        mem_inbox: Vec<MemInMsg>,
 
    },
 
}
 
/////////////////////////////
 
impl IntStream {
 
    fn next(&mut self) -> u32 {
 
        if self.next == u32::MAX {
 
            panic!("NO NEXT!")
 
        }
 
        self.next += 1;
 
        self.next - 1
 
    }
 
}
 
impl IdManager {
 
    fn next_port(&mut self) -> PortId {
 
        let port_suffix = self.port_suffix_stream.next();
 
        let controller_id = self.controller_id;
 
        PortId { controller_id, port_index: port_suffix }
 
    }
 
    fn new(controller_id: ControllerId) -> Self {
 
        Self { controller_id, port_suffix_stream: Default::default() }
 
    }
 
}
 
impl Connector {
 
    pub fn new(
 
        logger: Box<dyn Logger>,
 
        proto_description: Arc<ProtocolD>,
 
        controller_id: ControllerId,
 
        surplus_sockets: u16,
 
    ) -> Self {
 
        Self {
 
            logger,
 
            proto_description,
 
            id_manager: IdManager::new(controller_id),
 
            native_ports: Default::default(),
 
            proto_components: Default::default(),
 
            outp_to_inp: Default::default(),
 
            inp_to_route: Default::default(),
 
            phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets },
 
        }
 
    }
 
    pub fn add_port_pair(&mut self) -> [PortId; 2] {
 
        let o = self.id_manager.next_port();
 
        let i = self.id_manager.next_port();
 
        self.outp_to_inp.insert(o, i);
 
        self.native_ports.insert(o);
 
        self.native_ports.insert(i);
 
        [o, i]
 
    }
 
    pub fn add_net_port(&mut self, endpoint_setup: EndpointSetup) -> Result<PortId, ()> {
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                let p = self.id_manager.next_port();
 
                endpoint_setups.push((p, endpoint_setup));
 
                Ok(p)
 
            }
 
            ConnectorPhased::Communication { .. } => Err(()),
 
        }
 
    }
 
    fn check_polarity(&self, port: &PortId) -> Polarity {
 
        if self.outp_to_inp.contains_key(port) {
 
            Polarity::Putter
 
        } else {
 
            assert!(self.inp_to_route.contains_key(port));
 
            Polarity::Getter
 
        }
 
    }
 
    pub fn add_proto_component(&mut self, identifier: &[u8], ports: &[PortId]) -> Result<(), ()> {
 
        let polarities = self.proto_description.component_polarities(identifier).map_err(drop)?;
 
        if polarities.len() != ports.len() {
 
            return Err(());
 
        }
 
        for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) {
 
            if !self.native_ports.contains(port) {
 
                return Err(());
 
            }
 
            if expected_polarity != self.check_polarity(port) {
 
                return Err(());
 
            }
 
        }
 
        // ok!
 
        let state = self.proto_description.new_main_component(identifier, ports);
 
        let proto_component = ProtoComponent { ports: ports.iter().copied().collect(), state };
 
        let proto_component_index = self.proto_components.len();
 
        self.proto_components.push(proto_component);
 
        for port in ports.iter() {
 
            if let Polarity::Getter = self.check_polarity(port) {
 
                self.inp_to_route
 
                    .insert(*port, InpRoute::ProtoComponent { index: proto_component_index });
 
            }
 
        }
 
        Ok(())
 
    }
 
    pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> {
 
        match &mut self.phased {
 
            ConnectorPhased::Communication { .. } => Err(()),
 
            ConnectorPhased::Setup { endpoint_setups, surplus_sockets } => {
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let (mut endpoint_exts, mut endpoint_poller) =
 
                    init_endpoints(endpoint_setups, timeout)?;
 
                write!(
 
                    self.logger.line_writer(),
 
                    "hello! I am controller_id:{}",
 
                    self.id_manager.controller_id
 
                );
 
                // leader election and tree construction
 
                let neighborhood = init_neighborhood(&mut endpoint_exts, &mut endpoint_poller)?;
 
                // TODO session optimization goes here
 
                self.phased = ConnectorPhased::Communication {
 
                    endpoint_poller,
 
                    endpoint_exts,
 
                    neighborhood,
 
                    mem_inbox: Default::default(),
 
                };
 
                Ok(())
 
            }
 
        }
 
    }
 
}
 

	
 
fn init_endpoints(
 
    endpoint_setups: &[(PortId, EndpointSetup)],
 
    timeout: Duration,
 
) -> Result<(Vec<EndpointExt>, EndpointPoller), ()> {
 
    let mut endpoint_poller = EndpointPoller {
 
        poll: Poll::new().map_err(drop)?,
 
        events: Events::with_capacity(64),
 
        undrained_endpoints: Default::default(),
 
        delayed_inp_messages: Default::default(),
 
    };
 
    const PORT_ID_LEN: usize = std::mem::size_of::<PortId>();
 
    enum MaybeRecvPort {
 
        Complete(Port),
 
        Partial { buf: [u8; PORT_ID_LEN], read: u8 },
 
    }
 
    struct Todo {
 
        endpoint: TodoEndpoint,
 
        polarity: Polarity,
 
        local_port: Port,
 
        sent_local_port: bool,
 
        recv_peer_port: MaybeRecvPort,
 
    }
 
    enum TodoEndpoint {
 
        Listener(mio::net::TcpListener),
 
        Stream(mio::net::TcpStream),
 
    }
 
    const BOTH: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE);
 
    fn init(
 
        token: Token,
 
        local_port: Port,
 
        endpoint_setup: &EndpointSetup,
 
        poll: &mut Poll,
 
    ) -> Result<Todo, ()> {
 
        let endpoint = if endpoint_setup.is_active {
 
            let mut stream =
 
                mio::net::TcpStream::connect(&endpoint_setup.sock_addr).map_err(drop)?;
 
            poll.registry().register(&mut stream, token, BOTH).unwrap();
 
            TodoEndpoint::Stream(stream)
 
        } else {
 
            let mut listener =
 
                mio::net::TcpListener::bind(&endpoint_setup.sock_addr).map_err(drop)?;
 
            poll.registry().register(&mut listener, token, BOTH).unwrap();
 
            TodoEndpoint::Listener(listener)
 
        };
 
        Ok(Todo {
 
            endpoint,
 
            endpoint_setup: endpoint_setup.clone(),
 
            local_port,
 
            sent_local_port: false,
 
            recv_peer_port: MaybeRecvPort::Partial { buf: [0; 8], read: 0 },
 
        })
 
    };
 

	
 
    let todos = endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
            init(Token(index), local_port, endpoint_setup, &mut endpoint_poller.poll)
 
        })
 
        .collect::<Result<Vec<Todo>, _>>()?;
 
    let endpoint_exts = vec![];
 
    Ok((endpoint_exts, endpoint_poller))
 
}
 

	
 
fn init_neighborhood(
 
    endpoint_exts: &mut [EndpointExt],
 
    endpoint_poller: &mut EndpointPoller,
 
) -> Result<Neighborhood, ()> {
 
    todo!()
 
}
0 comments (0 inline, 0 general)