From 44a98be4e4b4329fc2f6816973481f93207f84de 2020-06-19 17:04:03 From: Christopher Esterhuyse Date: 2020-06-19 17:04:03 Subject: [PATCH] beginning large overhaul: moving to globally-unique ports & port -> endpoint route mappings --- diff --git a/Cargo.toml b/Cargo.toml index f32b593bd1b50de04db616b3f2c5057863479f39..ed868539539bbcd2b9428c4396b049087982cb50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ maplit = "1.0.2" derive_more = "0.99.2" # runtime +bincode = "1.2.1" serde = { version = "1.0.112", features = ["derive"] } getrandom = "0.1.14" # tiny crate. used to guess controller-id take_mut = "0.2.2" @@ -21,8 +22,9 @@ indexmap = "1.3.0" # hashsets/hashmaps with efficient arbitrary element removal # network integer-encoding = "1.0.7" byteorder = "1.3.2" -mio = "0.6.21" # migrate to mio v0.7.0 when it stabilizes +mio = "0.6.21" mio-extras = "2.0.6" +mio07 = { version = "0.7.0", package = "mio", features = ["tcp", "os-poll"] } # protocol # id-arena = "2.2.1" diff --git a/src/common.rs b/src/common.rs index 810d1d29f9a313a7005336f337ad784916cfee61..633372524b6c50ed005f0ddf0263f79bf8b427cd 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1,5 +1,8 @@ ///////////////////// PRELUDE ///////////////////// +pub use crate::protocol::{ComponentState, ProtocolDescription}; +pub use crate::runtime::{NonsyncContext, SyncContext}; + pub use core::{ cmp::Ordering, fmt::{Debug, Formatter}, @@ -16,6 +19,7 @@ pub use mio::{ pub use std::{ collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, convert::TryInto, + io::{Read, Write}, net::SocketAddr, sync::Arc, time::Instant, @@ -25,98 +29,53 @@ pub use Polarity::*; ///////////////////// DEFS ///////////////////// pub type ControllerId = u32; -pub type ChannelIndex = u32; +pub type PortSuffix = u32; -#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd)] +// globally unique +#[derive( + Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, +)] pub struct PortId { 
pub(crate) controller_id: ControllerId, - pub(crate) port_index: u32, + pub(crate) port_index: PortSuffix, } #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] pub struct Payload(Arc>); -/// This is a unique identifier for a channel (i.e., port). -#[derive(Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd)] -pub struct ChannelId { - pub(crate) controller_id: ControllerId, - pub(crate) channel_index: ChannelIndex, -} - -#[derive(Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd)] +#[derive( + Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize, +)] pub enum Polarity { Putter, // output port (from the perspective of the component) Getter, // input port (from the perspective of the component) } -#[derive( - Eq, PartialEq, Ord, PartialOrd, Hash, Copy, Clone, serde::Serialize, serde::Deserialize, -)] -#[repr(C)] -pub struct Port(pub u32); // ports are COPY - #[derive(Eq, PartialEq, Copy, Clone, Debug)] -pub enum MainComponentErr { +pub enum AddComponentError { NoSuchComponent, NonPortTypeParameters, - CannotMovePort(Port), + CannotMovePort(PortId), WrongNumberOfParamaters { expected: usize }, - UnknownPort(Port), - WrongPortPolarity { param_index: usize, port: Port }, - DuplicateMovedPort(Port), -} -pub trait ProtocolDescription: Sized { - type S: ComponentState; - - fn parse(pdl: &[u8]) -> Result; - fn component_polarities(&self, identifier: &[u8]) -> Result, MainComponentErr>; - fn new_main_component(&self, identifier: &[u8], ports: &[Port]) -> Self::S; -} - -pub trait ComponentState: Sized + Clone { - type D: ProtocolDescription; - fn pre_sync_run>( - &mut self, - runtime_ctx: &mut C, - protocol_description: &Self::D, - ) -> MonoBlocker; - - fn sync_run>( - &mut self, - runtime_ctx: &mut C, - protocol_description: &Self::D, - ) -> PolyBlocker; + UnknownPort(PortId), + WrongPortPolarity { port: PortId, expected_polarity: Polarity }, + DuplicateMovedPort(PortId), } #[derive(Debug, Clone)] -pub enum 
MonoBlocker { +pub enum NonsyncBlocker { Inconsistent, ComponentExit, SyncBlockStart, } #[derive(Debug, Clone)] -pub enum PolyBlocker { +pub enum SyncBlocker { Inconsistent, SyncBlockEnd, - CouldntReadMsg(Port), - CouldntCheckFiring(Port), - PutMsg(Port, Payload), -} - -pub trait MonoContext { - type D: ProtocolDescription; - type S: ComponentState; - - fn new_component(&mut self, moved_ports: HashSet, init_state: Self::S); - fn new_channel(&mut self) -> [Port; 2]; - fn new_random(&mut self) -> u64; -} -pub trait PolyContext { - type D: ProtocolDescription; - - fn is_firing(&mut self, port: Port) -> Option; - fn read_msg(&mut self, port: Port) -> Option<&Payload>; + CouldntReadMsg(PortId), + CouldntCheckFiring(PortId), + PutMsg(PortId, Payload), } ///////////////////// IMPL ///////////////////// @@ -176,22 +135,8 @@ impl From> for Payload { Self(s.into()) } } -impl Debug for Port { +impl Debug for PortId { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "Port({})", self.0) - } -} -impl Port { - pub fn from_raw(raw: u32) -> Self { - Self(raw) - } - pub fn to_raw(self) -> u32 { - self.0 - } - pub fn to_token(self) -> mio::Token { - mio::Token(self.0.try_into().unwrap()) - } - pub fn from_token(t: mio::Token) -> Self { - Self(t.0.try_into().unwrap()) + write!(f, "PortId({},{})", self.controller_id, self.port_index) } } diff --git a/src/lib.rs b/src/lib.rs index fd637d24a3487022a4d9b261671d5c13ffe1cc71..755f30dc3d0c8e85f53e394469426a8fcb150bf1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,10 +5,12 @@ mod common; mod protocol; mod runtime; -#[cfg(test)] -mod test; +// #[cfg(test)] +// mod test; -pub use runtime::{errors, Connector, PortBinding}; +pub use common::Polarity; +pub use protocol::ProtocolDescription; +pub use runtime::{Connector, EndpointSetup, StringLogger}; -#[cfg(feature = "ffi")] -pub use runtime::ffi; +// #[cfg(feature = "ffi")] +// pub use runtime::ffi; diff --git a/src/macros.rs b/src/macros.rs index 
655828c1a752270d7cb6bfc35242baea40e7e6df..bf66dde8d0c64dfd5e7024a99ae3f82e939afe35 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -1,34 +1,33 @@ macro_rules! log { ($logger:expr, $($arg:tt)*) => {{ - use std::fmt::Write; - writeln!($logger, $($arg)*).unwrap(); + let _ = write!($logger.line_writer(), $($arg)*).unwrap(); }}; } -macro_rules! assert_let { - ($pat:pat = $expr:expr => $work:expr) => { - if let $pat = $expr { - $work - } else { - panic!("assert_let failed"); - } - }; -} +// macro_rules! assert_let { +// ($pat:pat = $expr:expr => $work:expr) => { +// if let $pat = $expr { +// $work +// } else { +// panic!("assert_let failed"); +// } +// }; +// } -#[test] -fn assert_let() { - let x = Some(5); - let z = assert_let![Some(y) = x => { - println!("{:?}", y); - 3 - }]; - println!("{:?}", z); -} +// #[test] +// fn assert_let() { +// let x = Some(5); +// let z = assert_let![Some(y) = x => { +// println!("{:?}", y); +// 3 +// }]; +// println!("{:?}", z); +// } -#[test] -#[should_panic] -fn must_let_panic() { - let x: Option = None; - assert_let![Some(y) = x => { - println!("{:?}", y); - }]; -} +// #[test] +// #[should_panic] +// fn must_let_panic() { +// let x: Option = None; +// assert_let![Some(y) = x => { +// println!("{:?}", y); +// }]; +// } diff --git a/src/protocol/arena.rs b/src/protocol/arena.rs index e55efe6a0da9a3f0b48378dedb3f230847a216a1..1e4b64f9ae2b6d7e47f4e24d33b1a6a1b3b7d905 100644 --- a/src/protocol/arena.rs +++ b/src/protocol/arena.rs @@ -1,11 +1,23 @@ +use crate::common::*; use core::hash::Hash; use core::marker::PhantomData; -#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[derive(serde::Serialize, serde::Deserialize)] pub struct Id { index: u32, _phantom: PhantomData, } +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct Arena { + store: Vec, +} +////////////////////////////////// + +impl Debug for Id { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + f.debug_struct("Id").field("index", 
&self.index).finish() + } +} impl Clone for Id { fn clone(&self) -> Self { *self @@ -23,11 +35,6 @@ impl Hash for Id { self.index.hash(h); } } - -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct Arena { - store: Vec, -} impl Arena { pub fn new() -> Self { Self { store: vec![] } diff --git a/src/protocol/eval.rs b/src/protocol/eval.rs index 6abae219507a6695ef85228d29e85aa628fca629..9fb2531c1176c94b65cc4ce1f89d27cf966d585f 100644 --- a/src/protocol/eval.rs +++ b/src/protocol/eval.rs @@ -886,7 +886,7 @@ impl Display for Value { } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct InputValue(pub Port); +pub struct InputValue(pub PortId); impl Display for InputValue { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { @@ -911,7 +911,7 @@ impl ValueImpl for InputValue { } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct OutputValue(pub Port); +pub struct OutputValue(pub PortId); impl Display for OutputValue { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { @@ -1803,38 +1803,38 @@ impl Prompt { } } -#[cfg(test)] -mod tests { - extern crate test_generator; - - use std::fs::File; - use std::io::Read; - use std::path::Path; - use test_generator::test_resources; - - use super::*; - - #[test_resources("testdata/eval/positive/*.pdl")] - fn batch1(resource: &str) { - let path = Path::new(resource); - let expect = path.with_extension("txt"); - let mut heap = Heap::new(); - let mut source = InputSource::from_file(&path).unwrap(); - let mut parser = Parser::new(&mut source); - let pd = parser.parse(&mut heap).unwrap(); - let def = heap[pd].get_definition_ident(&heap, b"test").unwrap(); - let fun = heap[def].as_function().this; - let args = Vec::new(); - let result = Prompt::compute_function(&heap, fun, &args).unwrap(); - let valstr: String = format!("{}", result); - println!("{}", valstr); - - let mut cev: Vec = Vec::new(); - let mut f = File::open(expect).unwrap(); - f.read_to_end(&mut cev).unwrap(); - let 
lavstr = String::from_utf8_lossy(&cev); - println!("{}", lavstr); - - assert_eq!(valstr, lavstr); - } -} +// #[cfg(test)] +// mod tests { +// extern crate test_generator; + +// use std::fs::File; +// use std::io::Read; +// use std::path::Path; +// use test_generator::test_resources; + +// use super::*; + +// #[test_resources("testdata/eval/positive/*.pdl")] +// fn batch1(resource: &str) { +// let path = Path::new(resource); +// let expect = path.with_extension("txt"); +// let mut heap = Heap::new(); +// let mut source = InputSource::from_file(&path).unwrap(); +// let mut parser = Parser::new(&mut source); +// let pd = parser.parse(&mut heap).unwrap(); +// let def = heap[pd].get_definition_ident(&heap, b"test").unwrap(); +// let fun = heap[def].as_function().this; +// let args = Vec::new(); +// let result = Prompt::compute_function(&heap, fun, &args).unwrap(); +// let valstr: String = format!("{}", result); +// println!("{}", valstr); + +// let mut cev: Vec = Vec::new(); +// let mut f = File::open(expect).unwrap(); +// f.read_to_end(&mut cev).unwrap(); +// let lavstr = String::from_utf8_lossy(&cev); +// println!("{}", lavstr); + +// assert_eq!(valstr, lavstr); +// } +// } diff --git a/src/protocol/inputsource.rs b/src/protocol/inputsource.rs index 8f1801d8b74e282ca0e8db7d5fde44b3462d9d56..654371b0d3f8704d8763ccf41fa2289d7cf2c214 100644 --- a/src/protocol/inputsource.rs +++ b/src/protocol/inputsource.rs @@ -311,96 +311,96 @@ impl fmt::Display for DisplayEvalError<'_> { } } -#[cfg(test)] -mod tests { - use super::*; +// #[cfg(test)] +// mod tests { +// use super::*; - #[test] - fn test_from_string() { - let mut is = InputSource::from_string("#version 100\n").unwrap(); - assert!(is.input.len() == 13); - assert!(is.line == 1); - assert!(is.column == 1); - assert!(is.offset == 0); - let ps = is.pos(); - assert!(ps.line == 1); - assert!(ps.column == 1); - assert!(ps.offset == 0); - assert!(is.next() == Some(b'#')); - is.consume(); - assert!(is.next() == Some(b'v')); - 
assert!(is.lookahead(1) == Some(b'e')); - is.consume(); - assert!(is.next() == Some(b'e')); - is.consume(); - assert!(is.next() == Some(b'r')); - is.consume(); - assert!(is.next() == Some(b's')); - is.consume(); - assert!(is.next() == Some(b'i')); - is.consume(); - { - let ps = is.pos(); - assert_eq!(b"#version 100", ps.context(&is)); - let er = is.error("hello world!"); - let mut vec: Vec = Vec::new(); - er.write(&is, &mut vec).unwrap(); - assert_eq!( - "Parse error at 1:7: hello world!\n#version 100\n ^\n", - String::from_utf8_lossy(&vec) - ); - } - assert!(is.next() == Some(b'o')); - is.consume(); - assert!(is.next() == Some(b'n')); - is.consume(); - assert!(is.input.len() == 13); - assert!(is.line == 1); - assert!(is.column == 9); - assert!(is.offset == 8); - assert!(is.next() == Some(b' ')); - is.consume(); - assert!(is.next() == Some(b'1')); - is.consume(); - assert!(is.next() == Some(b'0')); - is.consume(); - assert!(is.next() == Some(b'0')); - is.consume(); - assert!(is.input.len() == 13); - assert!(is.line == 1); - assert!(is.column == 13); - assert!(is.offset == 12); - assert!(is.next() == Some(b'\n')); - is.consume(); - assert!(is.input.len() == 13); - assert!(is.line == 2); - assert!(is.column == 1); - assert!(is.offset == 13); - { - let ps = is.pos(); - assert_eq!(b"", ps.context(&is)); - } - assert!(is.next() == None); - is.consume(); - assert!(is.next() == None); - } +// #[test] +// fn test_from_string() { +// let mut is = InputSource::from_string("#version 100\n").unwrap(); +// assert!(is.input.len() == 13); +// assert!(is.line == 1); +// assert!(is.column == 1); +// assert!(is.offset == 0); +// let ps = is.pos(); +// assert!(ps.line == 1); +// assert!(ps.column == 1); +// assert!(ps.offset == 0); +// assert!(is.next() == Some(b'#')); +// is.consume(); +// assert!(is.next() == Some(b'v')); +// assert!(is.lookahead(1) == Some(b'e')); +// is.consume(); +// assert!(is.next() == Some(b'e')); +// is.consume(); +// assert!(is.next() == Some(b'r')); +// 
is.consume(); +// assert!(is.next() == Some(b's')); +// is.consume(); +// assert!(is.next() == Some(b'i')); +// is.consume(); +// { +// let ps = is.pos(); +// assert_eq!(b"#version 100", ps.context(&is)); +// let er = is.error("hello world!"); +// let mut vec: Vec = Vec::new(); +// er.write(&is, &mut vec).unwrap(); +// assert_eq!( +// "Parse error at 1:7: hello world!\n#version 100\n ^\n", +// String::from_utf8_lossy(&vec) +// ); +// } +// assert!(is.next() == Some(b'o')); +// is.consume(); +// assert!(is.next() == Some(b'n')); +// is.consume(); +// assert!(is.input.len() == 13); +// assert!(is.line == 1); +// assert!(is.column == 9); +// assert!(is.offset == 8); +// assert!(is.next() == Some(b' ')); +// is.consume(); +// assert!(is.next() == Some(b'1')); +// is.consume(); +// assert!(is.next() == Some(b'0')); +// is.consume(); +// assert!(is.next() == Some(b'0')); +// is.consume(); +// assert!(is.input.len() == 13); +// assert!(is.line == 1); +// assert!(is.column == 13); +// assert!(is.offset == 12); +// assert!(is.next() == Some(b'\n')); +// is.consume(); +// assert!(is.input.len() == 13); +// assert!(is.line == 2); +// assert!(is.column == 1); +// assert!(is.offset == 13); +// { +// let ps = is.pos(); +// assert_eq!(b"", ps.context(&is)); +// } +// assert!(is.next() == None); +// is.consume(); +// assert!(is.next() == None); +// } - #[test] - fn test_split() { - let mut is = InputSource::from_string("#version 100\n").unwrap(); - let backup = is.clone(); - assert!(is.next() == Some(b'#')); - is.consume(); - assert!(is.next() == Some(b'v')); - is.consume(); - assert!(is.next() == Some(b'e')); - is.consume(); - is = backup; - assert!(is.next() == Some(b'#')); - is.consume(); - assert!(is.next() == Some(b'v')); - is.consume(); - assert!(is.next() == Some(b'e')); - is.consume(); - } -} +// #[test] +// fn test_split() { +// let mut is = InputSource::from_string("#version 100\n").unwrap(); +// let backup = is.clone(); +// assert!(is.next() == Some(b'#')); +// 
is.consume(); +// assert!(is.next() == Some(b'v')); +// is.consume(); +// assert!(is.next() == Some(b'e')); +// is.consume(); +// is = backup; +// assert!(is.next() == Some(b'#')); +// is.consume(); +// assert!(is.next() == Some(b'v')); +// is.consume(); +// assert!(is.next() == Some(b'e')); +// is.consume(); +// } +// } diff --git a/src/protocol/lexer.rs b/src/protocol/lexer.rs index 9928f47823ed69ca7f575313db73947ea22cee32..6718ca6da65deb1a82047a005ba8b4da7fe4406c 100644 --- a/src/protocol/lexer.rs +++ b/src/protocol/lexer.rs @@ -1656,124 +1656,124 @@ impl Lexer<'_> { } } -#[cfg(test)] -mod tests { - use crate::protocol::ast::Expression::*; - use crate::protocol::{ast, lexer::*}; +// #[cfg(test)] +// mod tests { +// use crate::protocol::ast::Expression::*; +// use crate::protocol::{ast, lexer::*}; - #[test] - fn test_lowercase() { - assert_eq!(lowercase(b'a'), b'a'); - assert_eq!(lowercase(b'A'), b'a'); - assert_eq!(lowercase(b'z'), b'z'); - assert_eq!(lowercase(b'Z'), b'z'); - } +// #[test] +// fn test_lowercase() { +// assert_eq!(lowercase(b'a'), b'a'); +// assert_eq!(lowercase(b'A'), b'a'); +// assert_eq!(lowercase(b'z'), b'z'); +// assert_eq!(lowercase(b'Z'), b'z'); +// } - #[test] - fn test_basic_expression() { - let mut h = Heap::new(); - let mut is = InputSource::from_string("a+b;").unwrap(); - let mut lex = Lexer::new(&mut is); - match lex.consume_expression(&mut h) { - Ok(expr) => { - println!("{:?}", expr); - if let Binary(bin) = &h[expr] { - if let Variable(left) = &h[bin.left] { - if let Variable(right) = &h[bin.right] { - assert_eq!("a", format!("{}", h[left.identifier])); - assert_eq!("b", format!("{}", h[right.identifier])); - assert_eq!(Some(b';'), is.next()); - return; - } - } - } - assert!(false); - } - Err(err) => { - err.print(&is); - assert!(false); - } - } - } +// #[test] +// fn test_basic_expression() { +// let mut h = Heap::new(); +// let mut is = InputSource::from_string("a+b;").unwrap(); +// let mut lex = Lexer::new(&mut is); +// match 
lex.consume_expression(&mut h) { +// Ok(expr) => { +// println!("{:?}", expr); +// if let Binary(bin) = &h[expr] { +// if let Variable(left) = &h[bin.left] { +// if let Variable(right) = &h[bin.right] { +// assert_eq!("a", format!("{}", h[left.identifier])); +// assert_eq!("b", format!("{}", h[right.identifier])); +// assert_eq!(Some(b';'), is.next()); +// return; +// } +// } +// } +// assert!(false); +// } +// Err(err) => { +// err.print(&is); +// assert!(false); +// } +// } +// } - #[test] - fn test_paren_expression() { - let mut h = Heap::new(); - let mut is = InputSource::from_string("(true)").unwrap(); - let mut lex = Lexer::new(&mut is); - match lex.consume_paren_expression(&mut h) { - Ok(expr) => { - println!("{:#?}", expr); - if let Constant(con) = &h[expr] { - if let ast::Constant::True = con.value { - return; - } - } - assert!(false); - } - Err(err) => { - err.print(&is); - assert!(false); - } - } - } +// #[test] +// fn test_paren_expression() { +// let mut h = Heap::new(); +// let mut is = InputSource::from_string("(true)").unwrap(); +// let mut lex = Lexer::new(&mut is); +// match lex.consume_paren_expression(&mut h) { +// Ok(expr) => { +// println!("{:#?}", expr); +// if let Constant(con) = &h[expr] { +// if let ast::Constant::True = con.value { +// return; +// } +// } +// assert!(false); +// } +// Err(err) => { +// err.print(&is); +// assert!(false); +// } +// } +// } - #[test] - fn test_expression() { - let mut h = Heap::new(); - let mut is = InputSource::from_string("(x(1+5,get(y))-w[5])+z++\n").unwrap(); - let mut lex = Lexer::new(&mut is); - match lex.consume_expression(&mut h) { - Ok(expr) => { - println!("{:#?}", expr); - } - Err(err) => { - err.print(&is); - assert!(false); - } - } - } +// #[test] +// fn test_expression() { +// let mut h = Heap::new(); +// let mut is = InputSource::from_string("(x(1+5,get(y))-w[5])+z++\n").unwrap(); +// let mut lex = Lexer::new(&mut is); +// match lex.consume_expression(&mut h) { +// Ok(expr) => { +// 
println!("{:#?}", expr); +// } +// Err(err) => { +// err.print(&is); +// assert!(false); +// } +// } +// } - #[test] - fn test_basic_statement() { - let mut h = Heap::new(); - let mut is = InputSource::from_string("while (true) { skip; }").unwrap(); - let mut lex = Lexer::new(&mut is); - match lex.consume_statement(&mut h) { - Ok(stmt) => { - println!("{:#?}", stmt); - if let Statement::While(w) = &h[stmt] { - if let Expression::Constant(_) = h[w.test] { - if let Statement::Block(_) = h[w.body] { - return; - } - } - } - assert!(false); - } - Err(err) => { - err.print(&is); - assert!(false); - } - } - } +// #[test] +// fn test_basic_statement() { +// let mut h = Heap::new(); +// let mut is = InputSource::from_string("while (true) { skip; }").unwrap(); +// let mut lex = Lexer::new(&mut is); +// match lex.consume_statement(&mut h) { +// Ok(stmt) => { +// println!("{:#?}", stmt); +// if let Statement::While(w) = &h[stmt] { +// if let Expression::Constant(_) = h[w.test] { +// if let Statement::Block(_) = h[w.body] { +// return; +// } +// } +// } +// assert!(false); +// } +// Err(err) => { +// err.print(&is); +// assert!(false); +// } +// } +// } - #[test] - fn test_statement() { - let mut h = Heap::new(); - let mut is = InputSource::from_string( - "label: while (true) { if (x++ > y[0]) break label; else continue; }\n", - ) - .unwrap(); - let mut lex = Lexer::new(&mut is); - match lex.consume_statement(&mut h) { - Ok(stmt) => { - println!("{:#?}", stmt); - } - Err(err) => { - err.print(&is); - assert!(false); - } - } - } -} +// #[test] +// fn test_statement() { +// let mut h = Heap::new(); +// let mut is = InputSource::from_string( +// "label: while (true) { if (x++ > y[0]) break label; else continue; }\n", +// ) +// .unwrap(); +// let mut lex = Lexer::new(&mut is); +// match lex.consume_statement(&mut h) { +// Ok(stmt) => { +// println!("{:#?}", stmt); +// } +// Err(err) => { +// err.print(&is); +// assert!(false); +// } +// } +// } +// } diff --git 
a/src/protocol/mod.rs b/src/protocol/mod.rs index 053695195dcba4e66bf3115f31c25a8118ba0572..9bbc5c2e0c4750de5971f78716eeeee48e251c52 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -13,28 +13,35 @@ use crate::protocol::inputsource::*; use crate::protocol::parser::*; #[derive(serde::Serialize, serde::Deserialize)] -pub struct ProtocolDescriptionImpl { +pub struct ProtocolDescription { heap: Heap, source: InputSource, root: RootId, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct ComponentState { + prompt: Prompt, +} +pub enum EvalContext<'a> { + Nonsync(&'a mut NonsyncContext<'a>), + Sync(&'a mut SyncContext<'a>), + None, +} +////////////////////////////////////////////// -impl std::fmt::Debug for ProtocolDescriptionImpl { +impl std::fmt::Debug for ProtocolDescription { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "Protocol") } } - -impl ProtocolDescription for ProtocolDescriptionImpl { - type S = ComponentStateImpl; - - fn parse(buffer: &[u8]) -> Result { +impl ProtocolDescription { + pub fn parse(buffer: &[u8]) -> Result { let mut heap = Heap::new(); let mut source = InputSource::from_buffer(buffer).unwrap(); let mut parser = Parser::new(&mut source); match parser.parse(&mut heap) { Ok(root) => { - return Ok(ProtocolDescriptionImpl { heap, source, root }); + return Ok(ProtocolDescription { heap, source, root }); } Err(err) => { let mut vec: Vec = Vec::new(); @@ -43,27 +50,31 @@ impl ProtocolDescription for ProtocolDescriptionImpl { } } } - fn component_polarities(&self, identifier: &[u8]) -> Result, MainComponentErr> { + pub fn component_polarities( + &self, + identifier: &[u8], + ) -> Result, AddComponentError> { + use AddComponentError::*; let h = &self.heap; let root = &h[self.root]; let def = root.get_definition_ident(h, identifier); if def.is_none() { - return Err(MainComponentErr::NoSuchComponent); + return Err(NoSuchComponent); } let def = &h[def.unwrap()]; if !def.is_component() { - 
return Err(MainComponentErr::NoSuchComponent); + return Err(NoSuchComponent); } for &param in def.parameters().iter() { let param = &h[param]; let type_annot = &h[param.type_annotation]; if type_annot.the_type.array { - return Err(MainComponentErr::NonPortTypeParameters); + return Err(NonPortTypeParameters); } match type_annot.the_type.primitive { PrimitiveType::Input | PrimitiveType::Output => continue, _ => { - return Err(MainComponentErr::NonPortTypeParameters); + return Err(NonPortTypeParameters); } } } @@ -83,7 +94,7 @@ impl ProtocolDescription for ProtocolDescriptionImpl { Ok(result) } // expects port polarities to be correct - fn new_main_component(&self, identifier: &[u8], ports: &[Port]) -> ComponentStateImpl { + pub fn new_main_component(&self, identifier: &[u8], ports: &[PortId]) -> ComponentState { let mut args = Vec::new(); for (&x, y) in ports.iter().zip(self.component_polarities(identifier).unwrap()) { match y { @@ -94,23 +105,16 @@ impl ProtocolDescription for ProtocolDescriptionImpl { let h = &self.heap; let root = &h[self.root]; let def = root.get_definition_ident(h, identifier).unwrap(); - ComponentStateImpl { prompt: Prompt::new(h, def, &args) } + ComponentState { prompt: Prompt::new(h, def, &args) } } } - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct ComponentStateImpl { - prompt: Prompt, -} -impl ComponentState for ComponentStateImpl { - type D = ProtocolDescriptionImpl; - - fn pre_sync_run>( - &mut self, - context: &mut C, - pd: &ProtocolDescriptionImpl, - ) -> MonoBlocker { - let mut context = EvalContext::Mono(context); +impl ComponentState { + pub fn nonsync_run<'a: 'b, 'b>( + &'a mut self, + context: &'b mut NonsyncContext<'b>, + pd: &'a ProtocolDescription, + ) -> NonsyncBlocker { + let mut context = EvalContext::Nonsync(context); loop { let result = self.prompt.step(&pd.heap, &mut context); match result { @@ -118,16 +122,16 @@ impl ComponentState for ComponentStateImpl { Ok(_) => unreachable!(), Err(cont) => 
match cont { EvalContinuation::Stepping => continue, - EvalContinuation::Inconsistent => return MonoBlocker::Inconsistent, - EvalContinuation::Terminal => return MonoBlocker::ComponentExit, - EvalContinuation::SyncBlockStart => return MonoBlocker::SyncBlockStart, + EvalContinuation::Inconsistent => return NonsyncBlocker::Inconsistent, + EvalContinuation::Terminal => return NonsyncBlocker::ComponentExit, + EvalContinuation::SyncBlockStart => return NonsyncBlocker::SyncBlockStart, // Not possible to end sync block if never entered one EvalContinuation::SyncBlockEnd => unreachable!(), EvalContinuation::NewComponent(decl, args) => { // Look up definition (TODO for now, assume it is a definition) let h = &pd.heap; let def = h[decl].as_defined().definition; - let init_state = ComponentStateImpl { prompt: Prompt::new(h, def, &args) }; + let init_state = ComponentState { prompt: Prompt::new(h, def, &args) }; context.new_component(&args, init_state); // Continue stepping continue; @@ -141,12 +145,12 @@ impl ComponentState for ComponentStateImpl { } } - fn sync_run>( - &mut self, - context: &mut C, - pd: &ProtocolDescriptionImpl, - ) -> PolyBlocker { - let mut context = EvalContext::Poly(context); + pub fn sync_run<'a: 'b, 'b>( + &'a mut self, + context: &'b mut SyncContext<'b>, + pd: &'a ProtocolDescription, + ) -> SyncBlocker { + let mut context = EvalContext::Sync(context); loop { let result = self.prompt.step(&pd.heap, &mut context); match result { @@ -154,29 +158,29 @@ impl ComponentState for ComponentStateImpl { Ok(_) => unreachable!(), Err(cont) => match cont { EvalContinuation::Stepping => continue, - EvalContinuation::Inconsistent => return PolyBlocker::Inconsistent, + EvalContinuation::Inconsistent => return SyncBlocker::Inconsistent, // First need to exit synchronous block before definition may end EvalContinuation::Terminal => unreachable!(), // No nested synchronous blocks EvalContinuation::SyncBlockStart => unreachable!(), - EvalContinuation::SyncBlockEnd => 
return PolyBlocker::SyncBlockEnd, + EvalContinuation::SyncBlockEnd => return SyncBlocker::SyncBlockEnd, // Not possible to create component in sync block EvalContinuation::NewComponent(_, _) => unreachable!(), EvalContinuation::BlockFires(port) => match port { Value::Output(OutputValue(port)) => { - return PolyBlocker::CouldntCheckFiring(port); + return SyncBlocker::CouldntCheckFiring(port); } Value::Input(InputValue(port)) => { - return PolyBlocker::CouldntCheckFiring(port); + return SyncBlocker::CouldntCheckFiring(port); } _ => unreachable!(), }, EvalContinuation::BlockGet(port) => match port { Value::Output(OutputValue(port)) => { - return PolyBlocker::CouldntReadMsg(port); + return SyncBlocker::CouldntReadMsg(port); } Value::Input(InputValue(port)) => { - return PolyBlocker::CouldntReadMsg(port); + return SyncBlocker::CouldntReadMsg(port); } _ => unreachable!(), }, @@ -195,7 +199,7 @@ impl ComponentState for ComponentStateImpl { match message { Value::Message(MessageValue(None)) => { // Putting a null message is inconsistent - return PolyBlocker::Inconsistent; + return SyncBlocker::Inconsistent; } Value::Message(MessageValue(Some(buffer))) => { // Create a copy of the payload @@ -203,31 +207,25 @@ impl ComponentState for ComponentStateImpl { } _ => unreachable!(), } - return PolyBlocker::PutMsg(value, payload); + return SyncBlocker::PutMsg(value, payload); } }, } } } } - -pub enum EvalContext<'a> { - Mono(&'a mut dyn MonoContext), - Poly(&'a mut dyn PolyContext), - None, -} impl EvalContext<'_> { // fn random(&mut self) -> LongValue { // match self { // EvalContext::None => unreachable!(), - // EvalContext::Mono(_context) => todo!(), - // EvalContext::Poly(_) => unreachable!(), + // EvalContext::Nonsync(_context) => todo!(), + // EvalContext::Sync(_) => unreachable!(), // } // } - fn new_component(&mut self, args: &[Value], init_state: ComponentStateImpl) -> () { + fn new_component(&mut self, args: &[Value], init_state: ComponentState) -> () { match self { 
EvalContext::None => unreachable!(), - EvalContext::Mono(context) => { + EvalContext::Nonsync(context) => { let mut moved_ports = HashSet::new(); for arg in args.iter() { match arg { @@ -242,26 +240,26 @@ impl EvalContext<'_> { } context.new_component(moved_ports, init_state) } - EvalContext::Poly(_) => unreachable!(), + EvalContext::Sync(_) => unreachable!(), } } fn new_channel(&mut self) -> [Value; 2] { match self { EvalContext::None => unreachable!(), - EvalContext::Mono(context) => { + EvalContext::Nonsync(context) => { let [from, to] = context.new_channel(); let from = Value::Output(OutputValue(from)); let to = Value::Input(InputValue(to)); return [from, to]; } - EvalContext::Poly(_) => unreachable!(), + EvalContext::Sync(_) => unreachable!(), } } fn fires(&mut self, port: Value) -> Option { match self { EvalContext::None => unreachable!(), - EvalContext::Mono(_) => unreachable!(), - EvalContext::Poly(context) => match port { + EvalContext::Nonsync(_) => unreachable!(), + EvalContext::Sync(context) => match port { Value::Output(OutputValue(port)) => context.is_firing(port).map(Value::from), Value::Input(InputValue(port)) => context.is_firing(port).map(Value::from), _ => unreachable!(), @@ -271,8 +269,8 @@ impl EvalContext<'_> { fn get(&mut self, port: Value) -> Option { match self { EvalContext::None => unreachable!(), - EvalContext::Mono(_) => unreachable!(), - EvalContext::Poly(context) => match port { + EvalContext::Nonsync(_) => unreachable!(), + EvalContext::Sync(context) => match port { Value::Output(OutputValue(port)) => { context.read_msg(port).map(Value::receive_message) } diff --git a/src/protocol/parser.rs b/src/protocol/parser.rs index ab1611acf6699235c4208c300f26d606283ddeb7..dd1bae03bcf1a1f6e8a6aac49c694c4bba94ab04 100644 --- a/src/protocol/parser.rs +++ b/src/protocol/parser.rs @@ -1575,7 +1575,9 @@ struct IndexableExpressions { } impl IndexableExpressions { - fn new() -> Self { IndexableExpressions { indexable: false } } + fn new() -> Self { + 
IndexableExpressions { indexable: false } + } fn error(&self, position: InputPosition) -> VisitorResult { Err(ParseError::new(position, "Unindexable expression")) } @@ -1818,66 +1820,66 @@ impl<'a> Parser<'a> { } } -#[cfg(test)] -mod tests { - extern crate test_generator; - - use std::fs::File; - use std::io::Read; - use std::path::Path; - - use test_generator::test_resources; - - use super::*; - - #[test_resources("testdata/parser/positive/*.pdl")] - fn batch1(resource: &str) { - let path = Path::new(resource); - let mut heap = Heap::new(); - let mut source = InputSource::from_file(&path).unwrap(); - let mut parser = Parser::new(&mut source); - match parser.parse(&mut heap) { - Ok(_) => {} - Err(err) => { - println!("{}", err.display(&source)); - println!("{:?}", err); - assert!(false); - } - } - } - - #[test_resources("testdata/parser/negative/*.pdl")] - fn batch2(resource: &str) { - let path = Path::new(resource); - let expect = path.with_extension("txt"); - let mut heap = Heap::new(); - let mut source = InputSource::from_file(&path).unwrap(); - let mut parser = Parser::new(&mut source); - match parser.parse(&mut heap) { - Ok(pd) => { - println!("{:?}", heap[pd]); - println!("Expected parse error:"); - - let mut cev: Vec = Vec::new(); - let mut f = File::open(expect).unwrap(); - f.read_to_end(&mut cev).unwrap(); - println!("{}", String::from_utf8_lossy(&cev)); - assert!(false); - } - Err(err) => { - println!("{:?}", err); - - let mut vec: Vec = Vec::new(); - err.write(&source, &mut vec).unwrap(); - println!("{}", String::from_utf8_lossy(&vec)); - - let mut cev: Vec = Vec::new(); - let mut f = File::open(expect).unwrap(); - f.read_to_end(&mut cev).unwrap(); - println!("{}", String::from_utf8_lossy(&cev)); - - assert_eq!(vec, cev); - } - } - } -} +// #[cfg(test)] +// mod tests { +// extern crate test_generator; + +// use std::fs::File; +// use std::io::Read; +// use std::path::Path; + +// use test_generator::test_resources; + +// use super::*; + +// 
#[test_resources("testdata/parser/positive/*.pdl")] +// fn batch1(resource: &str) { +// let path = Path::new(resource); +// let mut heap = Heap::new(); +// let mut source = InputSource::from_file(&path).unwrap(); +// let mut parser = Parser::new(&mut source); +// match parser.parse(&mut heap) { +// Ok(_) => {} +// Err(err) => { +// println!("{}", err.display(&source)); +// println!("{:?}", err); +// assert!(false); +// } +// } +// } + +// #[test_resources("testdata/parser/negative/*.pdl")] +// fn batch2(resource: &str) { +// let path = Path::new(resource); +// let expect = path.with_extension("txt"); +// let mut heap = Heap::new(); +// let mut source = InputSource::from_file(&path).unwrap(); +// let mut parser = Parser::new(&mut source); +// match parser.parse(&mut heap) { +// Ok(pd) => { +// println!("{:?}", heap[pd]); +// println!("Expected parse error:"); + +// let mut cev: Vec = Vec::new(); +// let mut f = File::open(expect).unwrap(); +// f.read_to_end(&mut cev).unwrap(); +// println!("{}", String::from_utf8_lossy(&cev)); +// assert!(false); +// } +// Err(err) => { +// println!("{:?}", err); + +// let mut vec: Vec = Vec::new(); +// err.write(&source, &mut vec).unwrap(); +// println!("{}", String::from_utf8_lossy(&vec)); + +// let mut cev: Vec = Vec::new(); +// let mut f = File::open(expect).unwrap(); +// f.read_to_end(&mut cev).unwrap(); +// println!("{}", String::from_utf8_lossy(&cev)); + +// assert_eq!(vec, cev); +// } +// } +// } +// } diff --git a/src/runtime/actors.rs b/src/runtime/actors.rs index 0f8cd88f76b65de172c197a066c6308719cd6230..c86044f404990f03c55c5a2ed2b9f47aa472c1ee 100644 --- a/src/runtime/actors.rs +++ b/src/runtime/actors.rs @@ -3,37 +3,37 @@ use crate::runtime::{endpoint::*, *}; #[derive(Debug, Clone)] pub(crate) struct MonoN { - pub ports: HashSet, - pub result: Option<(usize, HashMap)>, + pub ports: HashSet, + pub result: Option<(usize, HashMap)>, } #[derive(Debug)] pub(crate) struct PolyN { - pub ports: HashSet, + pub ports: HashSet, 
pub branches: HashMap, } #[derive(Debug, Clone)] pub(crate) struct BranchN { - pub to_get: HashSet, - pub gotten: HashMap, + pub to_get: HashSet, + pub gotten: HashMap, pub sync_batch_index: usize, } #[derive(Debug, Clone)] pub struct MonoP { pub state: ProtocolS, - pub ports: HashSet, + pub ports: HashSet, } #[derive(Debug)] pub(crate) struct PolyP { pub incomplete: HashMap, pub complete: HashMap, - pub ports: HashSet, + pub ports: HashSet, } #[derive(Debug, Clone)] pub(crate) struct BranchP { - pub blocking_on: Option, - pub outbox: HashMap, - pub inbox: HashMap, + pub blocking_on: Option, + pub outbox: HashMap, + pub inbox: HashMap, pub state: ProtocolS, } @@ -211,7 +211,7 @@ impl PolyP { &mut self, m_ctx: PolyPContext, protocol_description: &ProtocolD, - port: Port, + port: PortId, payload_predicate: Predicate, payload: Payload, ) -> Result { @@ -366,7 +366,7 @@ impl PolyP { impl PolyN { pub fn sync_recv( &mut self, - port: Port, + port: PortId, logger: &mut String, payload: Payload, payload_predicate: Predicate, diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 8e364b07e78c33ae92abdbfb7082aa8e6d5136bf..e619b0712bbc97002d4b7cb762fd5e264fafdb49 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -201,7 +201,7 @@ impl Controller { // this is needed during the event loop to determine which actor // should receive the incoming message. // TODO: store and update this mapping rather than rebuilding it each round. - let port_to_holder: HashMap = { + let port_to_holder: HashMap = { use PolyId::*; let n = self.inner.mono_n.ports.iter().map(move |&e| (e, N)); let p = self @@ -566,7 +566,7 @@ impl From for SyncErr { impl MonoContext for MonoPContext<'_> { type D = ProtocolD; type S = ProtocolS; - fn new_component(&mut self, moved_ports: HashSet, init_state: Self::S) { + fn new_component(&mut self, moved_ports: HashSet, init_state: Self::S) { log!( &mut self.inner.logger, "!! 
MonoContext callback to new_component with ports {:?}!", @@ -579,7 +579,7 @@ impl MonoContext for MonoPContext<'_> { panic!("MachineP attempting to move alien port!"); } } - fn new_channel(&mut self) -> [Port; 2] { + fn new_channel(&mut self) -> [PortId; 2] { let [a, b] = Endpoint::new_memory_pair(); let channel_id = self.inner.channel_id_stream.next(); @@ -588,7 +588,7 @@ impl MonoContext for MonoPContext<'_> { EndpointExt { info: EndpointInfo { polarity, channel_id }, endpoint }; let port = self.inner.endpoint_exts.alloc(endpoint_ext); let endpoint = &self.inner.endpoint_exts.get(port).unwrap().endpoint; - let token = Port::to_token(port); + let token = PortId::to_token(port); self.inner .messenger_state .poll @@ -713,7 +713,7 @@ impl SolutionStorage { impl PolyContext for BranchPContext<'_, '_> { type D = ProtocolD; - fn is_firing(&mut self, port: Port) -> Option { + fn is_firing(&mut self, port: PortId) -> Option { assert!(self.ports.contains(&port)); let channel_id = self.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; let val = self.predicate.query(channel_id); @@ -725,7 +725,7 @@ impl PolyContext for BranchPContext<'_, '_> { ); val } - fn read_msg(&mut self, port: Port) -> Option<&Payload> { + fn read_msg(&mut self, port: PortId) -> Option<&Payload> { assert!(self.ports.contains(&port)); let val = self.inbox.get(&port); log!( diff --git a/src/runtime/errors.rs b/src/runtime/errors.rs index b34d96cfe33483c620428adff9c6840b81129f98..13e0b13fcdb65e7fc63feecce192c378817c2714 100644 --- a/src/runtime/errors.rs +++ b/src/runtime/errors.rs @@ -79,7 +79,7 @@ pub enum EvalErr { #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum MessengerRecvErr { PollingFailed, - EndpointErr(Port, EndpointErr), + EndpointErr(PortId, EndpointErr), } impl From for ConfigErr { fn from(e: MainComponentErr) -> Self { diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index e33c900e34e8c07738ec7b82234d58841469ba2f..3999f7a07d4287caa3eaccb111afff0a6a0286e3 100644 --- 
a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -1,23 +1,57 @@ -#[cfg(feature = "ffi")] -pub mod ffi; - -mod actors; -pub(crate) mod communication; -pub(crate) mod connector; -pub(crate) mod endpoint; -pub mod errors; -mod serde; -pub(crate) mod setup; +// #[cfg(feature = "ffi")] +// pub mod ffi; + +// mod actors; +// pub(crate) mod communication; +// pub(crate) mod connector; +// pub(crate) mod endpoint; +// pub mod errors; +// mod serde; +mod my_tests; +mod setup2; +// pub(crate) mod setup; // mod v2; -pub(crate) type ProtocolD = crate::protocol::ProtocolDescriptionImpl; -pub(crate) type ProtocolS = crate::protocol::ComponentStateImpl; - use crate::common::*; -use actors::*; -use endpoint::*; -use errors::*; - +// use actors::*; +// use endpoint::*; +// use errors::*; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub(crate) enum Decision { + Failure, + Success(Predicate), +} +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub(crate) enum Msg { + SetupMsg(SetupMsg), + CommMsg(CommMsg), +} +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct MyPortInfo { + polarity: Polarity, + port: PortId, +} +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub(crate) enum SetupMsg { + // sent by the passive endpoint to the active endpoint + // MyPortInfo(MyPortInfo), + LeaderEcho { maybe_leader: ControllerId }, + LeaderAnnounce { leader: ControllerId }, + YouAreMyParent, +} +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub(crate) struct CommMsg { + pub round_index: usize, + pub contents: CommMsgContents, +} +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub(crate) enum CommMsgContents { + SendPayload { payload_predicate: Predicate, payload: Payload }, + Elaborate { partial_oracle: Predicate }, // SINKWARD + Failure, // SINKWARD + Announce { decision: Decision }, // SINKAWAYS +} #[derive(Debug, PartialEq)] pub(crate) enum CommonSatResult { FormerNotLatter, @@ -26,543 
+60,753 @@ pub(crate) enum CommonSatResult { New(Predicate), Nonexistant, } - -#[derive(Clone, Eq, PartialEq, Hash)] -pub(crate) struct Predicate { - pub assigned: BTreeMap, +pub struct Endpoint { + inbox: Vec, + stream: mio07::net::TcpStream, } - #[derive(Debug, Default)] -struct SyncBatch { - puts: HashMap, - gets: HashSet, +pub struct IntStream { + next: u32, } - #[derive(Debug)] -pub enum Connector { - Unconfigured(Unconfigured), - Configured(Configured), - Connected(Connected), // TODO consider boxing. currently takes up a lot of stack space +pub struct IdManager { + controller_id: ControllerId, + port_suffix_stream: IntStream, } #[derive(Debug)] -pub struct Unconfigured { - pub controller_id: ControllerId, +pub struct ProtoComponent { + state: ComponentState, + ports: HashSet, } #[derive(Debug)] -pub struct Configured { - controller_id: ControllerId, - polarities: Vec, - bindings: HashMap, - protocol_description: Arc, - main_component: Vec, - logger: String, +pub enum InpRoute { + NativeComponent, + ProtoComponent { index: usize }, + Endpoint { index: usize }, } -#[derive(Debug)] -pub struct Connected { - native_interface: Vec<(Port, Polarity)>, - sync_batches: Vec, - // controller is cooperatively scheduled with the native application - // (except for transport layer behind Endpoints, which are managed by the OS) - // control flow is passed to the controller during methods on Connector (primarily, connect and sync). 
- controller: Controller, +pub trait Logger: Debug { + fn line_writer(&mut self) -> &mut dyn std::fmt::Write; + fn dump_log(&self, w: &mut dyn std::io::Write); } - -#[derive(Debug, Copy, Clone)] -pub enum PortBinding { - Native, - Active(SocketAddr), - Passive(SocketAddr), +#[derive(Debug, Clone)] +pub struct EndpointSetup { + pub polarity: Polarity, + pub sock_addr: SocketAddr, + pub is_active: bool, } - #[derive(Debug)] -struct Arena { - storage: Vec, +pub struct EndpointExt { + endpoint: Endpoint, + inp_for_emerging_msgs: PortId, } - #[derive(Debug)] -struct ReceivedMsg { - recipient: Port, - msg: Msg, +pub struct Neighborhood { + parent: Option, + children: Vec, // ordered, deduplicated } - #[derive(Debug)] -struct MessengerState { - poll: Poll, - events: Events, - delayed: Vec, - undelayed: Vec, - polled_undrained: IndexSet, +pub struct MemInMsg { + inp: PortId, + msg: Payload, } #[derive(Debug)] -struct ChannelIdStream { - controller_id: ControllerId, - next_channel_index: ChannelIndex, +pub struct EndpointPoller { + poll: mio07::Poll, + events: mio07::Events, + undrained_endpoints: HashSet, + delayed_inp_messages: Vec<(PortId, Msg)>, } - #[derive(Debug)] -struct Controller { - protocol_description: Arc, - inner: ControllerInner, - ephemeral: ControllerEphemeral, - unrecoverable_error: Option, // prevents future calls to Sync +pub struct Connector { + logger: Box, + proto_description: Arc, + id_manager: IdManager, + native_ports: HashSet, + proto_components: Vec, + outp_to_inp: HashMap, + inp_to_route: HashMap, + phased: ConnectorPhased, } #[derive(Debug)] -struct ControllerInner { - round_index: usize, - channel_id_stream: ChannelIdStream, - endpoint_exts: Arena, - messenger_state: MessengerState, - mono_n: MonoN, // state at next round start - mono_ps: Vec, // state at next round start - family: ControllerFamily, - logger: String, -} - -/// This structure has its state entirely reset between synchronous rounds -#[derive(Debug, Default)] -struct 
ControllerEphemeral { - solution_storage: SolutionStorage, - poly_n: Option, - poly_ps: Vec, - mono_ps: Vec, - port_to_holder: HashMap, +pub enum ConnectorPhased { + Setup { + endpoint_setups: Vec<(PortId, EndpointSetup)>, + surplus_sockets: u16, + }, + Communication { + endpoint_poller: EndpointPoller, + endpoint_exts: Vec, + neighborhood: Neighborhood, + mem_inbox: Vec, + }, } - #[derive(Debug)] -struct ControllerFamily { - parent_port: Option, - children_ports: Vec, +pub struct StringLogger(ControllerId, String); +#[derive(Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] +pub(crate) struct Predicate { + pub assigned: BTreeMap, } - -#[derive(Debug)] -pub(crate) enum SyncRunResult { - BlockingForRecv, - AllBranchesComplete, - NoBranches, +#[derive(Debug, Default)] +struct SyncBatch { + puts: HashMap, + gets: HashSet, } - -// Used to identify poly actors -#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -enum PolyId { - N, - P { index: usize }, +pub struct MonitoredReader { + bytes: usize, + r: R, } - -#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -pub(crate) enum SubtreeId { - PolyN, - PolyP { index: usize }, - ChildController { port: Port }, +pub enum EndpointRecvErr { + MalformedMessage, + BrokenEndpoint, } - -pub(crate) struct MonoPContext<'a> { - inner: &'a mut ControllerInner, - ports: &'a mut HashSet, - mono_ps: &'a mut Vec, +pub struct SyncContext<'a> { + connector: &'a mut Connector, } -pub(crate) struct PolyPContext<'a> { - my_subtree_id: SubtreeId, - inner: &'a mut ControllerInner, - solution_storage: &'a mut SolutionStorage, +pub struct NonsyncContext<'a> { + connector: &'a mut Connector, } -impl PolyPContext<'_> { - #[inline(always)] - fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> { - let Self { solution_storage, my_subtree_id, inner } = self; - PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner } +//////////////// +impl Debug for Endpoint { + fn fmt(&self, f: &mut 
Formatter) -> std::fmt::Result { + f.debug_struct("Endpoint").field("inbox", &self.inbox).finish() } } -struct BranchPContext<'m, 'r> { - m_ctx: PolyPContext<'m>, - ports: &'r HashSet, - predicate: &'r Predicate, - inbox: &'r HashMap, -} - -#[derive(Default)] -pub(crate) struct SolutionStorage { - old_local: HashSet, - new_local: HashSet, - // this pair acts as SubtreeId -> HashSet which is friendlier to iteration - subtree_solutions: Vec>, - subtree_id_to_index: HashMap, -} - -trait Messengerlike { - fn get_state_mut(&mut self) -> &mut MessengerState; - fn get_endpoint_mut(&mut self, eport: Port) -> &mut Endpoint; - - fn delay(&mut self, received: ReceivedMsg) { - self.get_state_mut().delayed.push(received); +impl NonsyncContext<'_> { + pub fn new_component(&mut self, moved_ports: HashSet, init_state: ComponentState) { + todo!() } - fn undelay_all(&mut self) { - let MessengerState { delayed, undelayed, .. } = self.get_state_mut(); - undelayed.extend(delayed.drain(..)) + pub fn new_channel(&mut self) -> [PortId; 2] { + todo!() } - - fn send(&mut self, to: Port, msg: Msg) -> Result<(), EndpointErr> { - self.get_endpoint_mut(to).send(msg) - } - - // attempt to receive a message from one of the endpoints before the deadline - fn recv(&mut self, deadline: Instant) -> Result, MessengerRecvErr> { - // try get something buffered - if let Some(x) = self.get_state_mut().undelayed.pop() { - return Ok(Some(x)); - } - - loop { - // polled_undrained may not be empty - while let Some(eport) = self.get_state_mut().polled_undrained.pop() { - if let Some(msg) = self - .get_endpoint_mut(eport) - .recv() - .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))? - { - // this endpoint MAY still have messages! 
check again in future - self.get_state_mut().polled_undrained.insert(eport); - return Ok(Some(ReceivedMsg { recipient: eport, msg })); - } - } - - let state = self.get_state_mut(); - match state.poll_events(deadline) { - Ok(()) => { - for e in state.events.iter() { - state.polled_undrained.insert(Port::from_token(e.token())); - } - } - Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed), - Err(PollDeadlineErr::Timeout) => return Ok(None), - } - } +} +impl SyncContext<'_> { + pub fn is_firing(&mut self, port: PortId) -> Option { + todo!() } - fn recv_blocking(&mut self) -> Result { - // try get something buffered - if let Some(x) = self.get_state_mut().undelayed.pop() { - return Ok(x); - } - - loop { - // polled_undrained may not be empty - while let Some(eport) = self.get_state_mut().polled_undrained.pop() { - if let Some(msg) = self - .get_endpoint_mut(eport) - .recv() - .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))? - { - // this endpoint MAY still have messages! 
check again in future - self.get_state_mut().polled_undrained.insert(eport); - return Ok(ReceivedMsg { recipient: eport, msg }); - } - } - - let state = self.get_state_mut(); - - state - .poll - .poll(&mut state.events, None) - .map_err(|_| MessengerRecvErr::PollingFailed)?; - for e in state.events.iter() { - state.polled_undrained.insert(Port::from_token(e.token())); - } - } + pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> { + todo!() } } - -///////////////////////////////// -impl Debug for SolutionStorage { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.pad("Solutions: [")?; - for (subtree_id, &index) in self.subtree_id_to_index.iter() { - let sols = &self.subtree_solutions[index]; - f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?; - } - f.pad("]") +impl From for MonitoredReader { + fn from(r: R) -> Self { + Self { r, bytes: 0 } } } -impl From for SyncErr { - fn from(e: EvalErr) -> SyncErr { - SyncErr::EvalErr(e) +impl MonitoredReader { + pub fn bytes_read(&self) -> usize { + self.bytes } } -impl From for SyncErr { - fn from(e: MessengerRecvErr) -> SyncErr { - SyncErr::MessengerRecvErr(e) +impl Read for MonitoredReader { + fn read(&mut self, buf: &mut [u8]) -> Result { + let n = self.r.read(buf)?; + self.bytes += n; + Ok(n) } } -impl From for ConnectErr { - fn from(e: MessengerRecvErr) -> ConnectErr { - ConnectErr::MessengerRecvErr(e) +impl Into for SetupMsg { + fn into(self) -> Msg { + Msg::SetupMsg(self) } } -impl Default for Arena { - fn default() -> Self { - Self { storage: vec![] } +impl Debug for Predicate { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + f.pad("{")?; + for (port, &v) in self.assigned.iter() { + f.write_fmt(format_args!("{:?}=>{}, ", port, if v { 'T' } else { 'F' }))? 
+ } + f.pad("}") } } -impl Arena { - pub fn alloc(&mut self, t: T) -> Port { - self.storage.push(t); - let l: u32 = self.storage.len().try_into().unwrap(); - Port::from_raw(l - 1u32) - } - pub fn get(&self, key: Port) -> Option<&T> { - self.storage.get(key.to_raw() as usize) - } - pub fn get_mut(&mut self, key: Port) -> Option<&mut T> { - self.storage.get_mut(key.to_raw() as usize) - } - pub fn type_convert(self, f: impl FnMut((Port, T)) -> X) -> Arena { - Arena { storage: self.keyspace().zip(self.storage.into_iter()).map(f).collect() } - } - pub fn iter(&self) -> impl Iterator { - self.keyspace().zip(self.storage.iter()) - } - pub fn len(&self) -> usize { - self.storage.len() - } - pub fn keyspace(&self) -> impl Iterator { - (0u32..self.storage.len().try_into().unwrap()).map(Port::from_raw) +impl StringLogger { + pub fn new(controller_id: ControllerId) -> Self { + Self(controller_id, String::default()) } } - -impl ChannelIdStream { - fn new(controller_id: ControllerId) -> Self { - Self { controller_id, next_channel_index: 0 } +impl Logger for StringLogger { + fn line_writer(&mut self) -> &mut dyn std::fmt::Write { + use std::fmt::Write; + let _ = write!(&mut self.1, "\nCID({}): ", self.0); + self } - fn next(&mut self) -> ChannelId { - self.next_channel_index += 1; - ChannelId { controller_id: self.controller_id, channel_index: self.next_channel_index - 1 } + fn dump_log(&self, w: &mut dyn std::io::Write) { + let _ = w.write(self.1.as_bytes()); } } - -impl MessengerState { - // does NOT guarantee that events is non-empty - fn poll_events(&mut self, deadline: Instant) -> Result<(), PollDeadlineErr> { - use PollDeadlineErr::*; - self.events.clear(); - let poll_timeout = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; - self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?; - Ok(()) +impl std::fmt::Write for StringLogger { + fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> { + self.1.write_str(s) } } -impl 
From for ConnectErr { - fn from(e: PollDeadlineErr) -> ConnectErr { - match e { - PollDeadlineErr::Timeout => ConnectErr::Timeout, - PollDeadlineErr::PollingFailed => ConnectErr::PollingFailed, +impl IntStream { + fn next(&mut self) -> u32 { + if self.next == u32::MAX { + panic!("NO NEXT!") } + self.next += 1; + self.next - 1 } } - -impl std::ops::Not for Polarity { - type Output = Self; - fn not(self) -> Self::Output { - use Polarity::*; - match self { - Putter => Getter, - Getter => Putter, - } +impl IdManager { + fn next_port(&mut self) -> PortId { + let port_suffix = self.port_suffix_stream.next(); + let controller_id = self.controller_id; + PortId { controller_id, port_index: port_suffix } } -} - -impl Predicate { - // returns true IFF self.unify would return Equivalent OR FormerNotLatter - pub fn satisfies(&self, other: &Self) -> bool { - let mut s_it = self.assigned.iter(); - let mut s = if let Some(s) = s_it.next() { - s - } else { - return other.assigned.is_empty(); - }; - for (oid, ob) in other.assigned.iter() { - while s.0 < oid { - s = if let Some(s) = s_it.next() { - s - } else { - return false; - }; - } - if s.0 > oid || s.1 != ob { - return false; + fn new(controller_id: ControllerId) -> Self { + Self { controller_id, port_suffix_stream: Default::default() } + } +} +impl Endpoint { + fn try_recv(&mut self) -> Result, EndpointRecvErr> { + use EndpointRecvErr::*; + // populate inbox as much as possible + 'read_loop: loop { + match self.stream.read_to_end(&mut self.inbox) { + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break 'read_loop, + Ok(0) => break 'read_loop, + Ok(_) => (), + Err(_e) => return Err(BrokenEndpoint), } } - true - } - - /// Given self and other, two predicates, return the most general Predicate possible, N - /// such that n.satisfies(self) && n.satisfies(other). - /// If none exists Nonexistant is returned. 
- /// If the resulting predicate is equivlanet to self, other, or both, - /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively. - /// otherwise New(N) is returned. - pub fn common_satisfier(&self, other: &Self) -> CommonSatResult { - use CommonSatResult::*; - // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. - let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; - let [mut s, mut o] = [s_it.next(), o_it.next()]; - // lists of assignments in self but not other and vice versa. - let [mut s_not_o, mut o_not_s] = [vec![], vec![]]; - loop { - match [s, o] { - [None, None] => break, - [None, Some(x)] => { - o_not_s.push(x); - o_not_s.extend(o_it); - break; - } - [Some(x), None] => { - s_not_o.push(x); - s_not_o.extend(s_it); - break; - } - [Some((sid, sb)), Some((oid, ob))] => { - if sid < oid { - // o is missing this element - s_not_o.push((sid, sb)); - s = s_it.next(); - } else if sid > oid { - // s is missing this element - o_not_s.push((oid, ob)); - o = o_it.next(); - } else if sb != ob { - assert_eq!(sid, oid); - // both predicates assign the variable but differ on the value - return Nonexistant; - } else { - // both predicates assign the variable to the same value - s = s_it.next(); - o = o_it.next(); - } - } + let mut monitored = MonitoredReader::from(&self.inbox[..]); + match bincode::deserialize_from(&mut monitored) { + Ok(msg) => { + let msg_size = monitored.bytes_read(); + self.inbox.drain(0..(msg_size.try_into().unwrap())); + Ok(Some(msg)) } - } - // Observed zero inconsistencies. A unified predicate exists... - match [s_not_o.is_empty(), o_not_s.is_empty()] { - [true, true] => Equivalent, // ... equivalent to both. - [false, true] => FormerNotLatter, // ... equivalent to self. - [true, false] => LatterNotFormer, // ... equivalent to other. - [false, false] => { - // ... which is the union of the predicates' assignments but - // is equivalent to neither self nor other. 
- let mut new = self.clone(); - for (&id, &b) in o_not_s { - new.assigned.insert(id, b); + Err(e) => match *e { + bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => { + Ok(None) } - New(new) - } - } - } - - pub fn iter_matching(&self, value: bool) -> impl Iterator + '_ { - self.assigned - .iter() - .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None }) - } - - pub fn batch_assign_nones( - &mut self, - channel_ids: impl Iterator, - value: bool, - ) { - for channel_id in channel_ids { - self.assigned.entry(channel_id).or_insert(value); - } - } - pub fn replace_assignment(&mut self, channel_id: ChannelId, value: bool) -> Option { - self.assigned.insert(channel_id, value) - } - pub fn union_with(&self, other: &Self) -> Option { - let mut res = self.clone(); - for (&channel_id, &assignment_1) in other.assigned.iter() { - match res.assigned.insert(channel_id, assignment_1) { - Some(assignment_2) if assignment_1 != assignment_2 => return None, - _ => {} - } + _ => Err(MalformedMessage), + // println!("SERDE ERRKIND {:?}", e); + // Err(MalformedMessage) + }, } - Some(res) - } - pub fn query(&self, x: ChannelId) -> Option { - self.assigned.get(&x).copied() } - pub fn new_trivial() -> Self { - Self { assigned: Default::default() } - } -} -impl Debug for Predicate { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.pad("{")?; - for (ChannelId { controller_id, channel_index }, &v) in self.assigned.iter() { - f.write_fmt(format_args!( - "({:?},{:?})=>{}, ", - controller_id, - channel_index, - if v { 'T' } else { 'F' } - ))? - } - f.pad("}") - } -} - -#[test] -fn pred_sat() { - use maplit::btreemap; - let mut c = ChannelIdStream::new(0); - let ch = std::iter::repeat_with(move || c.next()).take(5).collect::>(); - let p = Predicate::new_trivial(); - let p_0t = Predicate { assigned: btreemap! { ch[0] => true } }; - let p_0f = Predicate { assigned: btreemap! 
{ ch[0] => false } }; - let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } }; - let p_0f_3t = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => true } }; - - assert!(p.satisfies(&p)); - assert!(p_0t.satisfies(&p_0t)); - assert!(p_0f.satisfies(&p_0f)); - assert!(p_0f_3f.satisfies(&p_0f_3f)); - assert!(p_0f_3t.satisfies(&p_0f_3t)); - - assert!(p_0t.satisfies(&p)); - assert!(p_0f.satisfies(&p)); - assert!(p_0f_3f.satisfies(&p_0f)); - assert!(p_0f_3t.satisfies(&p_0f)); - - assert!(!p.satisfies(&p_0t)); - assert!(!p.satisfies(&p_0f)); - assert!(!p_0f.satisfies(&p_0t)); - assert!(!p_0t.satisfies(&p_0f)); - assert!(!p_0f_3f.satisfies(&p_0f_3t)); - assert!(!p_0f_3t.satisfies(&p_0f_3f)); - assert!(!p_0t.satisfies(&p_0f_3f)); - assert!(!p_0f.satisfies(&p_0f_3f)); - assert!(!p_0t.satisfies(&p_0f_3t)); - assert!(!p_0f.satisfies(&p_0f_3t)); -} - -#[test] -fn pred_common_sat() { - use maplit::btreemap; - use CommonSatResult::*; - - let mut c = ChannelIdStream::new(0); - let ch = std::iter::repeat_with(move || c.next()).take(5).collect::>(); - let p = Predicate::new_trivial(); - let p_0t = Predicate { assigned: btreemap! { ch[0] => true } }; - let p_0f = Predicate { assigned: btreemap! { ch[0] => false } }; - let p_3f = Predicate { assigned: btreemap! { ch[3] => false } }; - let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } }; - let p_0f_3t = Predicate { assigned: btreemap! 
{ ch[0] => false, ch[3] => true } }; - - assert_eq![p.common_satisfier(&p), Equivalent]; - assert_eq![p_0t.common_satisfier(&p_0t), Equivalent]; - - assert_eq![p.common_satisfier(&p_0t), LatterNotFormer]; - assert_eq![p_0t.common_satisfier(&p), FormerNotLatter]; - - assert_eq![p_0t.common_satisfier(&p_0f), Nonexistant]; - assert_eq![p_0f_3t.common_satisfier(&p_0f_3f), Nonexistant]; - assert_eq![p_0f_3t.common_satisfier(&p_3f), Nonexistant]; - assert_eq![p_3f.common_satisfier(&p_0f_3t), Nonexistant]; - - assert_eq![p_0f.common_satisfier(&p_3f), New(p_0f_3f)]; -} + fn send(&mut self, msg: &T) -> Result<(), ()> { + bincode::serialize_into(&mut self.stream, msg).map_err(drop) + } +} +impl Connector { + fn get_logger(&self) -> &dyn Logger { + &*self.logger + } +} + +// #[derive(Debug)] +// pub enum Connector { +// Unconfigured(Unconfigured), +// Configured(Configured), +// Connected(Connected), // TODO consider boxing. currently takes up a lot of stack space +// } +// #[derive(Debug)] +// pub struct Unconfigured { +// pub controller_id: ControllerId, +// } +// #[derive(Debug)] +// pub struct Configured { +// controller_id: ControllerId, +// polarities: Vec, +// bindings: HashMap, +// protocol_description: Arc, +// main_component: Vec, +// logger: String, +// } +// #[derive(Debug)] +// pub struct Connected { +// native_interface: Vec<(PortId, Polarity)>, +// sync_batches: Vec, +// // controller is cooperatively scheduled with the native application +// // (except for transport layer behind Endpoints, which are managed by the OS) +// // control flow is passed to the controller during methods on Connector (primarily, connect and sync). 
+// controller: Controller, +// } + +// #[derive(Debug, Copy, Clone)] +// pub enum PortBinding { +// Native, +// Active(SocketAddr), +// Passive(SocketAddr), +// } + +// #[derive(Debug)] +// struct Arena { +// storage: Vec, +// } + +// #[derive(Debug)] +// struct ReceivedMsg { +// recipient: PortId, +// msg: Msg, +// } + +// #[derive(Debug)] +// struct MessengerState { +// poll: Poll, +// events: Events, +// delayed: Vec, +// undelayed: Vec, +// polled_undrained: IndexSet, +// } +// #[derive(Debug)] +// struct ChannelIdStream { +// controller_id: ControllerId, +// next_channel_index: ChannelIndex, +// } + +// #[derive(Debug)] +// struct Controller { +// protocol_description: Arc, +// inner: ControllerInner, +// ephemeral: ControllerEphemeral, +// unrecoverable_error: Option, // prevents future calls to Sync +// } +// #[derive(Debug)] +// struct ControllerInner { +// round_index: usize, +// channel_id_stream: ChannelIdStream, +// endpoint_exts: Arena, +// messenger_state: MessengerState, +// mono_n: MonoN, // state at next round start +// mono_ps: Vec, // state at next round start +// family: ControllerFamily, +// logger: String, +// } + +// /// This structure has its state entirely reset between synchronous rounds +// #[derive(Debug, Default)] +// struct ControllerEphemeral { +// solution_storage: SolutionStorage, +// poly_n: Option, +// poly_ps: Vec, +// mono_ps: Vec, +// port_to_holder: HashMap, +// } + +// #[derive(Debug)] +// struct ControllerFamily { +// parent_port: Option, +// children_ports: Vec, +// } + +// #[derive(Debug)] +// pub(crate) enum SyncRunResult { +// BlockingForRecv, +// AllBranchesComplete, +// NoBranches, +// } + +// // Used to identify poly actors +// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +// enum PolyId { +// N, +// P { index: usize }, +// } + +// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +// pub(crate) enum SubtreeId { +// PolyN, +// PolyP { index: usize }, +// ChildController { 
port: PortId }, +// } + +// pub(crate) struct MonoPContext<'a> { +// inner: &'a mut ControllerInner, +// ports: &'a mut HashSet, +// mono_ps: &'a mut Vec, +// } +// pub(crate) struct PolyPContext<'a> { +// my_subtree_id: SubtreeId, +// inner: &'a mut ControllerInner, +// solution_storage: &'a mut SolutionStorage, +// } +// impl PolyPContext<'_> { +// #[inline(always)] +// fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> { +// let Self { solution_storage, my_subtree_id, inner } = self; +// PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner } +// } +// } +// struct BranchPContext<'m, 'r> { +// m_ctx: PolyPContext<'m>, +// ports: &'r HashSet, +// predicate: &'r Predicate, +// inbox: &'r HashMap, +// } + +// #[derive(Default)] +// pub(crate) struct SolutionStorage { +// old_local: HashSet, +// new_local: HashSet, +// // this pair acts as SubtreeId -> HashSet which is friendlier to iteration +// subtree_solutions: Vec>, +// subtree_id_to_index: HashMap, +// } + +// trait Messengerlike { +// fn get_state_mut(&mut self) -> &mut MessengerState; +// fn get_endpoint_mut(&mut self, eport: PortId) -> &mut Endpoint; + +// fn delay(&mut self, received: ReceivedMsg) { +// self.get_state_mut().delayed.push(received); +// } +// fn undelay_all(&mut self) { +// let MessengerState { delayed, undelayed, .. 
} = self.get_state_mut(); +// undelayed.extend(delayed.drain(..)) +// } + +// fn send(&mut self, to: PortId, msg: Msg) -> Result<(), EndpointErr> { +// self.get_endpoint_mut(to).send(msg) +// } + +// // attempt to receive a message from one of the endpoints before the deadline +// fn recv(&mut self, deadline: Instant) -> Result, MessengerRecvErr> { +// // try get something buffered +// if let Some(x) = self.get_state_mut().undelayed.pop() { +// return Ok(Some(x)); +// } + +// loop { +// // polled_undrained may not be empty +// while let Some(eport) = self.get_state_mut().polled_undrained.pop() { +// if let Some(msg) = self +// .get_endpoint_mut(eport) +// .recv() +// .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))? +// { +// // this endpoint MAY still have messages! check again in future +// self.get_state_mut().polled_undrained.insert(eport); +// return Ok(Some(ReceivedMsg { recipient: eport, msg })); +// } +// } + +// let state = self.get_state_mut(); +// match state.poll_events(deadline) { +// Ok(()) => { +// for e in state.events.iter() { +// state.polled_undrained.insert(PortId::from_token(e.token())); +// } +// } +// Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed), +// Err(PollDeadlineErr::Timeout) => return Ok(None), +// } +// } +// } +// fn recv_blocking(&mut self) -> Result { +// // try get something buffered +// if let Some(x) = self.get_state_mut().undelayed.pop() { +// return Ok(x); +// } + +// loop { +// // polled_undrained may not be empty +// while let Some(eport) = self.get_state_mut().polled_undrained.pop() { +// if let Some(msg) = self +// .get_endpoint_mut(eport) +// .recv() +// .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))? +// { +// // this endpoint MAY still have messages! 
check again in future +// self.get_state_mut().polled_undrained.insert(eport); +// return Ok(ReceivedMsg { recipient: eport, msg }); +// } +// } + +// let state = self.get_state_mut(); + +// state +// .poll +// .poll(&mut state.events, None) +// .map_err(|_| MessengerRecvErr::PollingFailed)?; +// for e in state.events.iter() { +// state.polled_undrained.insert(PortId::from_token(e.token())); +// } +// } +// } +// } + +// ///////////////////////////////// +// impl Debug for SolutionStorage { +// fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { +// f.pad("Solutions: [")?; +// for (subtree_id, &index) in self.subtree_id_to_index.iter() { +// let sols = &self.subtree_solutions[index]; +// f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?; +// } +// f.pad("]") +// } +// } +// impl From for SyncErr { +// fn from(e: EvalErr) -> SyncErr { +// SyncErr::EvalErr(e) +// } +// } +// impl From for SyncErr { +// fn from(e: MessengerRecvErr) -> SyncErr { +// SyncErr::MessengerRecvErr(e) +// } +// } +// impl From for ConnectErr { +// fn from(e: MessengerRecvErr) -> ConnectErr { +// ConnectErr::MessengerRecvErr(e) +// } +// } +// impl Default for Arena { +// fn default() -> Self { +// Self { storage: vec![] } +// } +// } +// impl Arena { +// pub fn alloc(&mut self, t: T) -> PortId { +// self.storage.push(t); +// let l: u32 = self.storage.len().try_into().unwrap(); +// PortId::from_raw(l - 1u32) +// } +// pub fn get(&self, key: PortId) -> Option<&T> { +// self.storage.get(key.to_raw() as usize) +// } +// pub fn get_mut(&mut self, key: PortId) -> Option<&mut T> { +// self.storage.get_mut(key.to_raw() as usize) +// } +// pub fn type_convert(self, f: impl FnMut((PortId, T)) -> X) -> Arena { +// Arena { storage: self.keyspace().zip(self.storage.into_iter()).map(f).collect() } +// } +// pub fn iter(&self) -> impl Iterator { +// self.keyspace().zip(self.storage.iter()) +// } +// pub fn len(&self) -> usize { +// self.storage.len() +// } +// pub fn keyspace(&self) -> impl 
Iterator { +// (0u32..self.storage.len().try_into().unwrap()).map(PortId::from_raw) +// } +// } + +// impl ChannelIdStream { +// fn new(controller_id: ControllerId) -> Self { +// Self { controller_id, next_channel_index: 0 } +// } +// fn next(&mut self) -> ChannelId { +// self.next_channel_index += 1; +// ChannelId { controller_id: self.controller_id, channel_index: self.next_channel_index - 1 } +// } +// } + +// impl MessengerState { +// // does NOT guarantee that events is non-empty +// fn poll_events(&mut self, deadline: Instant) -> Result<(), PollDeadlineErr> { +// use PollDeadlineErr::*; +// self.events.clear(); +// let poll_timeout = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; +// self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?; +// Ok(()) +// } +// } +// impl From for ConnectErr { +// fn from(e: PollDeadlineErr) -> ConnectErr { +// match e { +// PollDeadlineErr::Timeout => ConnectErr::Timeout, +// PollDeadlineErr::PollingFailed => ConnectErr::PollingFailed, +// } +// } +// } + +// impl std::ops::Not for Polarity { +// type Output = Self; +// fn not(self) -> Self::Output { +// use Polarity::*; +// match self { +// Putter => Getter, +// Getter => Putter, +// } +// } +// } + +// impl Predicate { +// // returns true IFF self.unify would return Equivalent OR FormerNotLatter +// pub fn satisfies(&self, other: &Self) -> bool { +// let mut s_it = self.assigned.iter(); +// let mut s = if let Some(s) = s_it.next() { +// s +// } else { +// return other.assigned.is_empty(); +// }; +// for (oid, ob) in other.assigned.iter() { +// while s.0 < oid { +// s = if let Some(s) = s_it.next() { +// s +// } else { +// return false; +// }; +// } +// if s.0 > oid || s.1 != ob { +// return false; +// } +// } +// true +// } + +// /// Given self and other, two predicates, return the most general Predicate possible, N +// /// such that n.satisfies(self) && n.satisfies(other). +// /// If none exists Nonexistant is returned. 
+// /// If the resulting predicate is equivlanet to self, other, or both, +// /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively. +// /// otherwise New(N) is returned. +// pub fn common_satisfier(&self, other: &Self) -> CommonSatResult { +// use CommonSatResult::*; +// // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. +// let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; +// let [mut s, mut o] = [s_it.next(), o_it.next()]; +// // lists of assignments in self but not other and vice versa. +// let [mut s_not_o, mut o_not_s] = [vec![], vec![]]; +// loop { +// match [s, o] { +// [None, None] => break, +// [None, Some(x)] => { +// o_not_s.push(x); +// o_not_s.extend(o_it); +// break; +// } +// [Some(x), None] => { +// s_not_o.push(x); +// s_not_o.extend(s_it); +// break; +// } +// [Some((sid, sb)), Some((oid, ob))] => { +// if sid < oid { +// // o is missing this element +// s_not_o.push((sid, sb)); +// s = s_it.next(); +// } else if sid > oid { +// // s is missing this element +// o_not_s.push((oid, ob)); +// o = o_it.next(); +// } else if sb != ob { +// assert_eq!(sid, oid); +// // both predicates assign the variable but differ on the value +// return Nonexistant; +// } else { +// // both predicates assign the variable to the same value +// s = s_it.next(); +// o = o_it.next(); +// } +// } +// } +// } +// // Observed zero inconsistencies. A unified predicate exists... +// match [s_not_o.is_empty(), o_not_s.is_empty()] { +// [true, true] => Equivalent, // ... equivalent to both. +// [false, true] => FormerNotLatter, // ... equivalent to self. +// [true, false] => LatterNotFormer, // ... equivalent to other. +// [false, false] => { +// // ... which is the union of the predicates' assignments but +// // is equivalent to neither self nor other. 
+// let mut new = self.clone(); +// for (&id, &b) in o_not_s { +// new.assigned.insert(id, b); +// } +// New(new) +// } +// } +// } + +// pub fn iter_matching(&self, value: bool) -> impl Iterator + '_ { +// self.assigned +// .iter() +// .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None }) +// } + +// pub fn batch_assign_nones( +// &mut self, +// channel_ids: impl Iterator, +// value: bool, +// ) { +// for channel_id in channel_ids { +// self.assigned.entry(channel_id).or_insert(value); +// } +// } +// pub fn replace_assignment(&mut self, channel_id: ChannelId, value: bool) -> Option { +// self.assigned.insert(channel_id, value) +// } +// pub fn union_with(&self, other: &Self) -> Option { +// let mut res = self.clone(); +// for (&channel_id, &assignment_1) in other.assigned.iter() { +// match res.assigned.insert(channel_id, assignment_1) { +// Some(assignment_2) if assignment_1 != assignment_2 => return None, +// _ => {} +// } +// } +// Some(res) +// } +// pub fn query(&self, x: ChannelId) -> Option { +// self.assigned.get(&x).copied() +// } +// pub fn new_trivial() -> Self { +// Self { assigned: Default::default() } +// } +// } + +// #[test] +// fn pred_sat() { +// use maplit::btreemap; +// let mut c = ChannelIdStream::new(0); +// let ch = std::iter::repeat_with(move || c.next()).take(5).collect::>(); +// let p = Predicate::new_trivial(); +// let p_0t = Predicate { assigned: btreemap! { ch[0] => true } }; +// let p_0f = Predicate { assigned: btreemap! { ch[0] => false } }; +// let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } }; +// let p_0f_3t = Predicate { assigned: btreemap! 
{ ch[0] => false, ch[3] => true } }; + +// assert!(p.satisfies(&p)); +// assert!(p_0t.satisfies(&p_0t)); +// assert!(p_0f.satisfies(&p_0f)); +// assert!(p_0f_3f.satisfies(&p_0f_3f)); +// assert!(p_0f_3t.satisfies(&p_0f_3t)); + +// assert!(p_0t.satisfies(&p)); +// assert!(p_0f.satisfies(&p)); +// assert!(p_0f_3f.satisfies(&p_0f)); +// assert!(p_0f_3t.satisfies(&p_0f)); + +// assert!(!p.satisfies(&p_0t)); +// assert!(!p.satisfies(&p_0f)); +// assert!(!p_0f.satisfies(&p_0t)); +// assert!(!p_0t.satisfies(&p_0f)); +// assert!(!p_0f_3f.satisfies(&p_0f_3t)); +// assert!(!p_0f_3t.satisfies(&p_0f_3f)); +// assert!(!p_0t.satisfies(&p_0f_3f)); +// assert!(!p_0f.satisfies(&p_0f_3f)); +// assert!(!p_0t.satisfies(&p_0f_3t)); +// assert!(!p_0f.satisfies(&p_0f_3t)); +// } + +// #[test] +// fn pred_common_sat() { +// use maplit::btreemap; +// use CommonSatResult::*; + +// let mut c = ChannelIdStream::new(0); +// let ch = std::iter::repeat_with(move || c.next()).take(5).collect::>(); +// let p = Predicate::new_trivial(); +// let p_0t = Predicate { assigned: btreemap! { ch[0] => true } }; +// let p_0f = Predicate { assigned: btreemap! { ch[0] => false } }; +// let p_3f = Predicate { assigned: btreemap! { ch[3] => false } }; +// let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } }; +// let p_0f_3t = Predicate { assigned: btreemap! 
{ ch[0] => false, ch[3] => true } }; + +// assert_eq![p.common_satisfier(&p), Equivalent]; +// assert_eq![p_0t.common_satisfier(&p_0t), Equivalent]; + +// assert_eq![p.common_satisfier(&p_0t), LatterNotFormer]; +// assert_eq![p_0t.common_satisfier(&p), FormerNotLatter]; + +// assert_eq![p_0t.common_satisfier(&p_0f), Nonexistant]; +// assert_eq![p_0f_3t.common_satisfier(&p_0f_3f), Nonexistant]; +// assert_eq![p_0f_3t.common_satisfier(&p_3f), Nonexistant]; +// assert_eq![p_3f.common_satisfier(&p_0f_3t), Nonexistant]; + +// assert_eq![p_0f.common_satisfier(&p_3f), New(p_0f_3f)]; +// } diff --git a/src/runtime/my_tests.rs b/src/runtime/my_tests.rs new file mode 100644 index 0000000000000000000000000000000000000000..56d9f89c5429c586800e84ff96dcc5824bd3cab2 --- /dev/null +++ b/src/runtime/my_tests.rs @@ -0,0 +1,76 @@ +use crate as reowolf; +use reowolf::Polarity::*; +use std::net::SocketAddr; +use std::{sync::Arc, time::Duration}; + +fn next_test_addr() -> SocketAddr { + use std::{ + net::{Ipv4Addr, SocketAddrV4}, + sync::atomic::{AtomicU16, Ordering::SeqCst}, + }; + static TEST_PORT: AtomicU16 = AtomicU16::new(5_000); + let port = TEST_PORT.fetch_add(1, SeqCst); + SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into() +} + +lazy_static::lazy_static! 
{ + static ref MINIMAL_PROTO: Arc = + { Arc::new(reowolf::ProtocolDescription::parse(b"").unwrap()) }; +} + +#[test] +fn simple_connector() { + let c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0); + println!("{:#?}", c); +} + +#[test] +fn add_port_pair() { + let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let [_, _] = c.add_port_pair(); + let [_, _] = c.add_port_pair(); + println!("{:#?}", c); +} + +#[test] +fn add_sync() { + let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let [o, i] = c.add_port_pair(); + c.add_component(b"sync", &[i, o]).unwrap(); + println!("{:#?}", c); +} + +#[test] +fn add_net_port() { + let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let sock_addr = next_test_addr(); + let _ = c + .add_net_port(reowolf::EndpointSetup { polarity: Getter, sock_addr, is_active: false }) + .unwrap(); + let _ = c + .add_net_port(reowolf::EndpointSetup { polarity: Putter, sock_addr, is_active: true }) + .unwrap(); + println!("{:#?}", c); +} + +#[test] +fn trivial_connect() { + let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0); + c.connect(Duration::from_secs(1)).unwrap(); + println!("{:#?}", c); +} + +#[test] +fn single_node_connect() { + let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let sock_addr = next_test_addr(); + let _ = c + .add_net_port(reowolf::EndpointSetup { polarity: Getter, sock_addr, is_active: false }) + .unwrap(); + let _ = c + .add_net_port(reowolf::EndpointSetup { polarity: Putter, sock_addr, is_active: true }) + .unwrap(); + c.connect(Duration::from_secs(1)).unwrap(); + println!("{:#?}", c); + c.get_logger().dump_log(&mut std::io::stdout().lock()); +} diff --git a/src/runtime/serde.rs b/src/runtime/serde.rs index 87fd04a704e8c5b9b2318e9bcd1119d73d22798d..c9a4ec1cc7c843c8452e1058dda4d79bab2866d2 100644 --- a/src/runtime/serde.rs +++ b/src/runtime/serde.rs @@ -1,8 +1,5 @@ use crate::common::*; -use crate::runtime::{ - 
endpoint::{CommMsg, CommMsgContents, Decision, EndpointInfo, Msg, SetupMsg}, - Predicate, -}; +use crate::runtime::*; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use std::io::{ErrorKind::InvalidData, Read, Write}; @@ -53,6 +50,19 @@ macro_rules! ser_seq { } ///////////////////////////////////////// +impl Ser for W { + fn ser(&mut self, t: &PortId) -> Result<(), std::io::Error> { + self.ser(&t.controller_id)?; + self.ser(&VarLenInt(t.port_index as u64)) + } +} + +impl De for R { + fn de(&mut self) -> Result { + Ok(PortId { controller_id: self.de()?, port_index: De::::de(self)?.0 as u32 }) + } +} + impl Ser for W { fn ser(&mut self, t: &bool) -> Result<(), std::io::Error> { self.ser(&match t { @@ -147,21 +157,6 @@ impl De for R { } } -impl Ser for W { - fn ser(&mut self, t: &ChannelId) -> Result<(), std::io::Error> { - self.ser(&t.controller_id)?; - self.ser(&VarLenInt(t.channel_index as u64)) - } -} -impl De for R { - fn de(&mut self) -> Result { - Ok(ChannelId { - controller_id: self.de()?, - channel_index: De::::de(self)?.0 as ChannelIndex, - }) - } -} - impl Ser for W { fn ser(&mut self, t: &Predicate) -> Result<(), std::io::Error> { self.ser(&VarLenInt(t.assigned.len() as u64))?; @@ -174,7 +169,7 @@ impl Ser for W { impl De for R { fn de(&mut self) -> Result { let VarLenInt(len) = self.de()?; - let mut assigned = BTreeMap::::default(); + let mut assigned = BTreeMap::::default(); for _ in 0..len { assigned.insert(self.de()?, self.de()?); } @@ -221,26 +216,13 @@ impl De for R { }) } } - -impl Ser for W { - fn ser(&mut self, t: &EndpointInfo) -> Result<(), std::io::Error> { - let EndpointInfo { channel_id, polarity } = t; - ser_seq![self, channel_id, polarity] - } -} -impl De for R { - fn de(&mut self) -> Result { - Ok(EndpointInfo { channel_id: self.de()?, polarity: self.de()? 
}) - } -} - impl Ser for W { fn ser(&mut self, t: &Msg) -> Result<(), std::io::Error> { use {CommMsgContents::*, SetupMsg::*}; match t { Msg::SetupMsg(s) => match s { // [flag, data] - ChannelSetup { info } => ser_seq![self, &0u8, info], + MyPortInfo { polarity, port } => ser_seq![self, &0u8, polarity, port], LeaderEcho { maybe_leader } => ser_seq![self, &1u8, maybe_leader], LeaderAnnounce { leader } => ser_seq![self, &2u8, leader], YouAreMyParent => ser_seq![self, &3u8], @@ -267,7 +249,7 @@ impl De for R { Ok(match b { 0..=3 => Msg::SetupMsg(match b { // [flag, data] - 0u8 => ChannelSetup { info: self.de()? }, + 0u8 => MyPortInfo { polarity: self.de()?, port: self.de()? }, 1u8 => LeaderEcho { maybe_leader: self.de()? }, 2u8 => LeaderAnnounce { leader: self.de()? }, 3u8 => YouAreMyParent, diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index a814dd78afe58d847588da5a9615a1d04ae2216a..7e9d70da2fd3abcbb45dfdeac03859ae25d1b645 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -25,7 +25,7 @@ impl Controller { bound_proto_interface: &[(PortBinding, Polarity)], logger: &mut String, deadline: Instant, - ) -> Result<(Self, Vec<(Port, Polarity)>), ConnectErr> { + ) -> Result<(Self, Vec<(PortId, Polarity)>), ConnectErr> { use ConnectErr::*; log!(logger, "CONNECT PHASE START! 
MY CID={:?} STARTING LOGGER ~", major); @@ -215,7 +215,7 @@ impl Controller { for event in ms.events.iter() { log!(logger, "event {:#?}", event); let token = event.token(); - let port = Port::from_token(token); + let port = PortId::from_token(token); let entry = endpoint_ext_todos.get_mut(port).unwrap(); match entry { Finished(_) => { @@ -325,7 +325,7 @@ impl Controller { logger: &mut String, endpoint_exts: &mut Arena, messenger_state: &mut MessengerState, - neighbors: Vec, + neighbors: Vec, deadline: Instant, ) -> Result { use {ConnectErr::*, Msg::SetupMsg as S, SetupMsg::*}; @@ -337,7 +337,7 @@ impl Controller { fn get_state_mut(&mut self) -> &mut MessengerState { self.0 } - fn get_endpoint_mut(&mut self, port: Port) -> &mut Endpoint { + fn get_endpoint_mut(&mut self, port: PortId) -> &mut Endpoint { &mut self.1.get_mut(port).expect("OUT OF BOUNDS").endpoint } } @@ -353,7 +353,7 @@ impl Controller { // 2. Receive incoming replies. whenever a higher-id echo arrives, // adopt it as leader, sender as parent, and reset the await set. 
- let mut parent: Option = None; + let mut parent: Option = None; let mut my_leader = major; messenger.undelay_all(); 'echo_loop: while !awaiting.is_empty() || parent.is_some() { @@ -478,7 +478,7 @@ impl Messengerlike for Controller { fn get_state_mut(&mut self) -> &mut MessengerState { &mut self.inner.messenger_state } - fn get_endpoint_mut(&mut self, port: Port) -> &mut Endpoint { + fn get_endpoint_mut(&mut self, port: PortId) -> &mut Endpoint { &mut self.inner.endpoint_exts.get_mut(port).expect("OUT OF BOUNDS").endpoint } } diff --git a/src/runtime/setup2.rs b/src/runtime/setup2.rs new file mode 100644 index 0000000000000000000000000000000000000000..fcd4c87a48d131ae8c230ce6de899cf5f03ad353 --- /dev/null +++ b/src/runtime/setup2.rs @@ -0,0 +1,275 @@ +use crate::common::*; +use crate::runtime::*; + +impl Connector { + pub fn new_simple( + proto_description: Arc, + controller_id: ControllerId, + ) -> Self { + let logger = Box::new(StringLogger::new(controller_id)); + let surplus_sockets = 8; + Self::new(logger, proto_description, controller_id, surplus_sockets) + } + pub fn new( + logger: Box, + proto_description: Arc, + controller_id: ControllerId, + surplus_sockets: u16, + ) -> Self { + Self { + logger, + proto_description, + id_manager: IdManager::new(controller_id), + native_ports: Default::default(), + proto_components: Default::default(), + outp_to_inp: Default::default(), + inp_to_route: Default::default(), + phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets }, + } + } + pub fn add_port_pair(&mut self) -> [PortId; 2] { + let o = self.id_manager.next_port(); + let i = self.id_manager.next_port(); + self.outp_to_inp.insert(o, i); + self.inp_to_route.insert(i, InpRoute::NativeComponent); + self.native_ports.insert(o); + self.native_ports.insert(i); + log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); + [o, i] + } + pub fn add_net_port(&mut self, endpoint_setup: EndpointSetup) -> Result { + match &mut 
self.phased { + ConnectorPhased::Setup { endpoint_setups, .. } => { + let p = self.id_manager.next_port(); + self.native_ports.insert(p); + log!(self.logger, "Added net port {:?} with info {:?} ", p, &endpoint_setup); + endpoint_setups.push((p, endpoint_setup)); + Ok(p) + } + ConnectorPhased::Communication { .. } => Err(()), + } + } + fn check_polarity(&self, port: &PortId) -> Polarity { + if let ConnectorPhased::Setup { endpoint_setups, .. } = &self.phased { + for (setup_port, EndpointSetup { polarity, .. }) in endpoint_setups.iter() { + if setup_port == port { + // special case. this port's polarity isn't reflected by + // self.inp_to_route or self.outp_to_inp, because its still not paired to a peer + return *polarity; + } + } + } + if self.outp_to_inp.contains_key(port) { + Polarity::Putter + } else { + assert!(self.inp_to_route.contains_key(port)); + Polarity::Getter + } + } + pub fn add_component( + &mut self, + identifier: &[u8], + ports: &[PortId], + ) -> Result<(), AddComponentError> { + use AddComponentError::*; + let polarities = self.proto_description.component_polarities(identifier)?; + if polarities.len() != ports.len() { + return Err(WrongNumberOfParamaters { expected: polarities.len() }); + } + for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) { + if !self.native_ports.contains(port) { + return Err(UnknownPort(*port)); + } + if expected_polarity != self.check_polarity(port) { + return Err(WrongPortPolarity { port: *port, expected_polarity }); + } + } + // ok! 
+ let state = self.proto_description.new_main_component(identifier, ports); + let proto_component = ProtoComponent { ports: ports.iter().copied().collect(), state }; + let proto_component_index = self.proto_components.len(); + self.proto_components.push(proto_component); + for port in ports.iter() { + if let Polarity::Getter = self.check_polarity(port) { + self.inp_to_route + .insert(*port, InpRoute::ProtoComponent { index: proto_component_index }); + } + } + Ok(()) + } + pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> { + match &mut self.phased { + ConnectorPhased::Communication { .. } => { + log!(self.logger, "Call to connecting in connected state"); + Err(()) + } + ConnectorPhased::Setup { endpoint_setups, .. } => { + log!(self.logger, "Call to connecting in setup state. Timeout {:?}", timeout); + let deadline = Instant::now() + timeout; + // connect all endpoints in parallel; send and receive peer ids through ports + let (mut endpoint_exts, mut endpoint_poller) = init_endpoints( + &mut *self.logger, + endpoint_setups, + &mut self.inp_to_route, + deadline, + )?; + log!(self.logger, "Successfully connected {} endpoints", endpoint_exts.len()); + // leader election and tree construction + let neighborhood = + init_neighborhood(&mut *self.logger, &mut endpoint_exts, &mut endpoint_poller)?; + log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood); + // TODO session optimization goes here + self.phased = ConnectorPhased::Communication { + endpoint_poller, + endpoint_exts, + neighborhood, + mem_inbox: Default::default(), + }; + Ok(()) + } + } + } +} + +fn init_endpoints( + logger: &mut dyn Logger, + endpoint_setups: &[(PortId, EndpointSetup)], + inp_to_route: &mut HashMap, + deadline: Instant, +) -> Result<(Vec, EndpointPoller), ()> { + use mio07::{ + net::{TcpListener, TcpStream}, + Events, Interest, Poll, Token, + }; + const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); + struct Todo { + todo_endpoint: TodoEndpoint, + 
endpoint_setup: EndpointSetup, + local_port: PortId, + sent_local_port: bool, // true <-> I've sent my local port + recv_peer_port: Option, // Some(..) <-> I've received my peer's port + } + enum TodoEndpoint { + Listener(TcpListener), + Endpoint(Endpoint), + } + fn init( + token: Token, + local_port: PortId, + endpoint_setup: &EndpointSetup, + poll: &mut Poll, + ) -> Result { + let todo_endpoint = if endpoint_setup.is_active { + let mut stream = TcpStream::connect(endpoint_setup.sock_addr).map_err(drop)?; + poll.registry().register(&mut stream, token, BOTH).unwrap(); + TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] }) + } else { + let mut listener = TcpListener::bind(endpoint_setup.sock_addr).map_err(drop)?; + poll.registry().register(&mut listener, token, BOTH).unwrap(); + TodoEndpoint::Listener(listener) + }; + Ok(Todo { + todo_endpoint, + endpoint_setup: endpoint_setup.clone(), + local_port, + sent_local_port: false, + recv_peer_port: None, + }) + }; + //////////////////////// + + let mut ep = EndpointPoller { + poll: Poll::new().map_err(drop)?, + events: Events::with_capacity(64), + undrained_endpoints: Default::default(), + delayed_inp_messages: Default::default(), + }; + + let mut todos = endpoint_setups + .iter() + .enumerate() + .map(|(index, (local_port, endpoint_setup))| { + init(Token(index), *local_port, endpoint_setup, &mut ep.poll) + }) + .collect::, _>>()?; + + let mut unfinished: HashSet = (0..todos.len()).collect(); + while !unfinished.is_empty() { + let remaining = deadline.checked_duration_since(Instant::now()).ok_or(())?; + ep.poll.poll(&mut ep.events, Some(remaining)).map_err(drop)?; + for event in ep.events.iter() { + let token = event.token(); + let Token(index) = token; + let todo: &mut Todo = &mut todos[index]; + if let TodoEndpoint::Listener(listener) = &mut todo.todo_endpoint { + let (mut stream, peer_addr) = listener.accept().map_err(drop)?; + ep.poll.registry().deregister(listener).unwrap(); + ep.poll.registry().register(&mut 
stream, token, BOTH).unwrap(); + log!(logger, "Endpoint({}) accepted a connection from {:?}", index, peer_addr); + let endpoint = Endpoint { stream, inbox: vec![] }; + todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint); + } + match todo { + Todo { + todo_endpoint: TodoEndpoint::Endpoint(endpoint), + local_port, + endpoint_setup, + sent_local_port, + recv_peer_port, + } => { + if !unfinished.contains(&index) { + continue; + } + if event.is_writable() && !*sent_local_port { + let msg = + MyPortInfo { polarity: endpoint_setup.polarity, port: *local_port }; + endpoint.send(&msg)?; + log!(logger, "endpoint[{}] sent peer info {:?}", index, &msg); + *sent_local_port = true; + } + if event.is_readable() && recv_peer_port.is_none() { + ep.undrained_endpoints.insert(index); + if let Some(peer_port_info) = + endpoint.try_recv::().map_err(drop)? + { + log!(logger, "endpoint[{}] got peer info {:?}", index, peer_port_info); + assert!(peer_port_info.polarity != endpoint_setup.polarity); + if let Putter = endpoint_setup.polarity { + inp_to_route.insert(*local_port, InpRoute::Endpoint { index }); + } + *recv_peer_port = Some(peer_port_info.port); + } + } + if *sent_local_port && recv_peer_port.is_some() { + unfinished.remove(&index); + log!(logger, "endpoint[{}] is finished!", index); + } + } + Todo { todo_endpoint: TodoEndpoint::Listener(_), .. } => unreachable!(), + } + } + ep.events.clear(); + } + let endpoint_exts = todos + .into_iter() + .map(|Todo { todo_endpoint, recv_peer_port, .. }| EndpointExt { + endpoint: match todo_endpoint { + TodoEndpoint::Endpoint(endpoint) => endpoint, + TodoEndpoint::Listener(..) 
=> unreachable!(), + }, + inp_for_emerging_msgs: recv_peer_port.unwrap(), + }) + .collect(); + Ok((endpoint_exts, ep)) +} + +fn init_neighborhood( + logger: &mut dyn Logger, + endpoint_exts: &mut [EndpointExt], + endpoint_poller: &mut EndpointPoller, +) -> Result { + log!(logger, "Time to construct my neighborhood"); + let parent = None; + let children = Default::default(); + Ok(Neighborhood { parent, children }) +} diff --git a/src/runtime/v2.rs b/src/runtime/v2.rs index 610036ae0e4477f6581fade79ea3e32775880d80..abd043454dd94e5c1060e66ecada817add63cf96 100644 --- a/src/runtime/v2.rs +++ b/src/runtime/v2.rs @@ -1,262 +1,8 @@ -use crate::common::*; -use crate::runtime::endpoint::Endpoint; -use crate::runtime::endpoint::Msg; -use crate::runtime::ProtocolD; -use crate::runtime::ProtocolS; -use std::io::Write; +use crate::common::{ + Arc, ControllerId, Duration, HashMap, HashSet, Instant, Payload, Polarity, Port, PortId, + ProtocolDescription, SocketAddr, +}; +use crate::runtime::endpoint::{Endpoint, Msg}; +use crate::runtime::*; -#[derive(Default)] -struct IntStream { - next: u32, -} -struct IdManager { - controller_id: ControllerId, - port_suffix_stream: IntStream, -} - -struct ProtoComponent { - state: ProtocolS, - ports: HashSet, -} -enum InpRoute { - NativeComponent, - ProtoComponent { index: usize }, - Endpoint { index: usize }, -} -trait Logger { - fn line_writer(&mut self) -> &mut dyn Write; -} -#[derive(Clone)] -struct EndpointSetup { - polarity: Polarity, - sock_addr: SocketAddr, - is_active: bool, -} -struct EndpointExt { - net_endpoint: Endpoint, - // data-messages emerging from this endpoint are destined for this inp - inp: Port, -} -struct Neighborhood { - parent: Option, - children: Vec, // ordered, deduplicated -} -struct MemInMsg { - inp: Port, - msg: Payload, -} -struct EndpointPoller { - poll: Poll, - events: Events, - undrained_endpoints: HashSet, - delayed_inp_messages: Vec<(Port, Msg)>, -} -struct Connector { - logger: Box, - proto_description: 
Arc, - id_manager: IdManager, - native_ports: HashSet, - proto_components: Vec, - outp_to_inp: HashMap, - inp_to_route: HashMap, - phased: ConnectorPhased, -} -enum ConnectorPhased { - Setup { - endpoint_setups: Vec<(PortId, EndpointSetup)>, - surplus_sockets: u16, - }, - Communication { - endpoint_poller: EndpointPoller, - endpoint_exts: Vec, - neighborhood: Neighborhood, - mem_inbox: Vec, - }, -} ///////////////////////////// -impl IntStream { - fn next(&mut self) -> u32 { - if self.next == u32::MAX { - panic!("NO NEXT!") - } - self.next += 1; - self.next - 1 - } -} -impl IdManager { - fn next_port(&mut self) -> PortId { - let port_suffix = self.port_suffix_stream.next(); - let controller_id = self.controller_id; - PortId { controller_id, port_index: port_suffix } - } - fn new(controller_id: ControllerId) -> Self { - Self { controller_id, port_suffix_stream: Default::default() } - } -} -impl Connector { - pub fn new( - logger: Box, - proto_description: Arc, - controller_id: ControllerId, - surplus_sockets: u16, - ) -> Self { - Self { - logger, - proto_description, - id_manager: IdManager::new(controller_id), - native_ports: Default::default(), - proto_components: Default::default(), - outp_to_inp: Default::default(), - inp_to_route: Default::default(), - phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets }, - } - } - pub fn add_port_pair(&mut self) -> [PortId; 2] { - let o = self.id_manager.next_port(); - let i = self.id_manager.next_port(); - self.outp_to_inp.insert(o, i); - self.native_ports.insert(o); - self.native_ports.insert(i); - [o, i] - } - pub fn add_net_port(&mut self, endpoint_setup: EndpointSetup) -> Result { - match &mut self.phased { - ConnectorPhased::Setup { endpoint_setups, .. } => { - let p = self.id_manager.next_port(); - endpoint_setups.push((p, endpoint_setup)); - Ok(p) - } - ConnectorPhased::Communication { .. 
} => Err(()), - } - } - fn check_polarity(&self, port: &PortId) -> Polarity { - if self.outp_to_inp.contains_key(port) { - Polarity::Putter - } else { - assert!(self.inp_to_route.contains_key(port)); - Polarity::Getter - } - } - pub fn add_proto_component(&mut self, identifier: &[u8], ports: &[PortId]) -> Result<(), ()> { - let polarities = self.proto_description.component_polarities(identifier).map_err(drop)?; - if polarities.len() != ports.len() { - return Err(()); - } - for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) { - if !self.native_ports.contains(port) { - return Err(()); - } - if expected_polarity != self.check_polarity(port) { - return Err(()); - } - } - // ok! - let state = self.proto_description.new_main_component(identifier, ports); - let proto_component = ProtoComponent { ports: ports.iter().copied().collect(), state }; - let proto_component_index = self.proto_components.len(); - self.proto_components.push(proto_component); - for port in ports.iter() { - if let Polarity::Getter = self.check_polarity(port) { - self.inp_to_route - .insert(*port, InpRoute::ProtoComponent { index: proto_component_index }); - } - } - Ok(()) - } - pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> { - match &mut self.phased { - ConnectorPhased::Communication { .. } => Err(()), - ConnectorPhased::Setup { endpoint_setups, surplus_sockets } => { - // connect all endpoints in parallel; send and receive peer ids through ports - let (mut endpoint_exts, mut endpoint_poller) = - init_endpoints(endpoint_setups, timeout)?; - write!( - self.logger.line_writer(), - "hello! 
I am controller_id:{}", - self.id_manager.controller_id - ); - // leader election and tree construction - let neighborhood = init_neighborhood(&mut endpoint_exts, &mut endpoint_poller)?; - // TODO session optimization goes here - self.phased = ConnectorPhased::Communication { - endpoint_poller, - endpoint_exts, - neighborhood, - mem_inbox: Default::default(), - }; - Ok(()) - } - } - } -} - -fn init_endpoints( - endpoint_setups: &[(PortId, EndpointSetup)], - timeout: Duration, -) -> Result<(Vec, EndpointPoller), ()> { - let mut endpoint_poller = EndpointPoller { - poll: Poll::new().map_err(drop)?, - events: Events::with_capacity(64), - undrained_endpoints: Default::default(), - delayed_inp_messages: Default::default(), - }; - const PORT_ID_LEN: usize = std::mem::size_of::(); - enum MaybeRecvPort { - Complete(Port), - Partial { buf: [u8; PORT_ID_LEN], read: u8 }, - } - struct Todo { - endpoint: TodoEndpoint, - polarity: Polarity, - local_port: Port, - sent_local_port: bool, - recv_peer_port: MaybeRecvPort, - } - enum TodoEndpoint { - Listener(mio::net::TcpListener), - Stream(mio::net::TcpStream), - } - const BOTH: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE); - fn init( - token: Token, - local_port: Port, - endpoint_setup: &EndpointSetup, - poll: &mut Poll, - ) -> Result { - let endpoint = if endpoint_setup.is_active { - let mut stream = - mio::net::TcpStream::connect(&endpoint_setup.sock_addr).map_err(drop)?; - poll.registry().register(&mut stream, token, BOTH).unwrap(); - TodoEndpoint::Stream(stream) - } else { - let mut listener = - mio::net::TcpListener::bind(&endpoint_setup.sock_addr).map_err(drop)?; - poll.registry().register(&mut listener, token, BOTH).unwrap(); - TodoEndpoint::Listener(listener) - }; - Ok(Todo { - endpoint, - endpoint_setup: endpoint_setup.clone(), - local_port, - sent_local_port: false, - recv_peer_port: MaybeRecvPort::Partial { buf: [0; 8], read: 0 }, - }) - }; - - let todos = endpoint_setups - .iter() - .enumerate() 
- .map(|(index, (local_port, endpoint_setup))| { - init(Token(index), local_port, endpoint_setup, &mut endpoint_poller.poll) - }) - .collect::, _>>()?; - let endpoint_exts = vec![]; - Ok((endpoint_exts, endpoint_poller)) -} - -fn init_neighborhood( - endpoint_exts: &mut [EndpointExt], - endpoint_poller: &mut EndpointPoller, -) -> Result { - todo!() -}