Changeset - db17da820a3b
[Not reviewed]
0 11 0
Christopher Esterhuyse - 5 years ago 2020-07-01 08:41:15
christopher.esterhuyse@gmail.com
pruned dependencies. cleaned up visibility
11 files changed with 68 insertions and 75 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
[package]
 
name = "reowolf_rs"
 
version = "0.1.4"
 
authors = [
 
	"Christopher Esterhuyse <esterhuy@cwi.nl, christopher.esterhuyse@gmail.com>",
 
	"Hans-Dieter Hiep <hdh@cwi.nl>"
 
]
 
edition = "2018"
 

	
 
[dependencies]
 
# convenience macros
 
maplit = "1.0.2"
 
derive_more = "0.99.2"
 

	
 
# runtime
 
bincode = "1.2.1"
 
serde = { version = "1.0.114", features = ["derive"] }
 
getrandom = "0.1.14" # tiny crate. used to guess controller-id
 
# take_mut = "0.2.2"
 
indexmap = "1.4.0" # hashsets/hashmaps with efficient arbitrary element removal
 

	
 
# network
 
# integer-encoding = "1.1.5"
 
# byteorder = "1.3.4"
 
mio = { version = "0.7.0", package = "mio", features = ["tcp", "os-poll"] }
 

	
 
# protocol
 
backtrace = "0.3"
 

	
 
[dev-dependencies]
 
# test-generator = "0.3.0"
 
crossbeam-utils = "0.7.2"
 
lazy_static = "1.4.0"
 

	
 
[lib]
 
# compile target: dynamically linked library using C ABI
 
crate-type = ["cdylib"]
 

	
 
[features]
 
default = ["ffi"]
 
ffi = [] # no feature dependencies
 
\ No newline at end of file
cbindgen.toml
Show inline comments
 
language = "C"
 
header = "/* CBindgen generated */"
 
include_guard = "REOWOLF_HEADER_DEFINED"
 
[parse]
 
parse_deps = false
 
\ No newline at end of file
 
parse_deps = false
 
[enum]
 
prefix_with_name = true
 
\ No newline at end of file
examples/8_net_ports/amy.c
Show inline comments
 
#include <stdio.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./8_amy_log.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	PortId putter, getter;
 
	char addr_str[] = "127.0.0.1:8000";
 
	connector_add_net_port(
 
		c, &putter, addr_str, sizeof(addr_str)-1, Putter, Active);
 
		c, &putter, addr_str, sizeof(addr_str)-1, Polarity_Putter, EndpointPolarity_Active);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	connector_add_net_port(
 
		c, &getter, addr_str, sizeof(addr_str)-1, Getter, Passive);
 
		c, &getter, addr_str, sizeof(addr_str)-1, Polarity_Getter, EndpointPolarity_Passive);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	connector_connect(c, 4000);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	return 0;
 
}
 
\ No newline at end of file
examples/9_net_self_putget/amy.c
Show inline comments
 
#include <stdio.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./9_amy_log.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	PortId putter, getter;
 
	char addr_str[] = "127.0.0.1:8000";
 
	connector_add_net_port(
 
		c, &putter, addr_str, sizeof(addr_str)-1, Putter, Active);
 
		c, &putter, addr_str, sizeof(addr_str)-1, Polarity_Putter, EndpointPolarity_Active);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	connector_add_net_port(
 
		c, &getter, addr_str, sizeof(addr_str)-1, Getter, Passive);
 
		c, &getter, addr_str, sizeof(addr_str)-1, Polarity_Getter, EndpointPolarity_Passive);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	connector_connect(c, 4000);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	connector_put_bytes(c, putter, "hi", 2);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	connector_get(c, getter);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	connector_sync(c, 4000);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	return 0;
 
}
 
\ No newline at end of file
examples/a_swap/amy.c
Show inline comments
 
#include <stdio.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main(int argc, char** argv) {
 
    if(argc != 3) {
 
        printf("Expected arg[1] and arg[2] for use as addr str\n");
 
        exit(1);
 
    }            
 
    char * pdl_ptr = buffer_pdl("eg_protocols.pdl");
 
    size_t pdl_len = strlen(pdl_ptr);
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len);
 
    char logpath[] = "./a_amy_log.txt";
 
    Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    
 
    PortId ports[6]; 
 
    connector_add_port_pair(c, &ports[0], &ports[1]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_net_port(c, &ports[2], argv[1], strlen(argv[1]), Getter, Passive);
 
    connector_add_net_port(c, &ports[2], argv[1], strlen(argv[1]), Polarity_Getter, EndpointPolarity_Passive);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_net_port(c, &ports[3], argv[2], strlen(argv[2]), Putter, Active);
 
    connector_add_net_port(c, &ports[3], argv[2], strlen(argv[2]), Polarity_Putter, EndpointPolarity_Active);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_port_pair(c, &ports[4], &ports[5]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    // native {0,1,2,3,4,5}
 
    
 
    connector_add_component(c, "together", 8, &ports[1], 4);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    // native {0,5} together {1,2,3,4}
 
    
 
    connector_connect(c, 4000);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	connector_put_bytes(c, ports[0], "hi", 2);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	connector_get(c, ports[5]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
    connector_sync(c, 1000);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	size_t msg_len;
 
	const char * msg_ptr = connector_gotten_bytes(c, ports[5], &msg_len);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	printf("Got msg `%.*s`\n", msg_len, msg_ptr);
examples/a_swap/bob.c
Show inline comments
 
#include <stdio.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main(int argc, char** argv) {
 
    if(argc != 3) {
 
        printf("Expected arg[1] and arg[2] for use as addr str\n");
 
        exit(1);
 
    }            
 
    char * pdl_ptr = buffer_pdl("eg_protocols.pdl");
 
    size_t pdl_len = strlen(pdl_ptr);
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len);
 
    char logpath[] = "./a_bob_log.txt";
 
    Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    
 
    PortId ports[6]; 
 
    connector_add_port_pair(c, &ports[0], &ports[1]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_net_port(c, &ports[2], argv[1], strlen(argv[1]), Getter, Passive);
 
    connector_add_net_port(c, &ports[2], argv[1], strlen(argv[1]), Polarity_Getter, EndpointPolarity_Passive);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_net_port(c, &ports[3], argv[2], strlen(argv[2]), Putter, Active);
 
    connector_add_net_port(c, &ports[3], argv[2], strlen(argv[2]), Polarity_Putter, EndpointPolarity_Active);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_port_pair(c, &ports[4], &ports[5]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    // native {0,1,2,3,4,5}
 
    
 
    connector_add_component(c, "together", 8, &ports[1], 4);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    // native {0,5} together {1,2,3,4}
 
    
 
    connector_connect(c, 4000);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	connector_put_bytes(c, ports[0], "hi", 2);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	connector_get(c, ports[5]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
    connector_sync(c, 1000);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	size_t msg_len;
 
	const char * msg_ptr = connector_gotten_bytes(c, ports[5], &msg_len);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	printf("Got msg `%.*s`\n", msg_len, msg_ptr);
reowolf.h
Show inline comments
 
/* CBindgen generated */
 

	
 
#ifndef REOWOLF_HEADER_DEFINED
 
#define REOWOLF_HEADER_DEFINED
 

	
 
#include <stdarg.h>
 
#include <stdbool.h>
 
#include <stdint.h>
 
#include <stdlib.h>
 

	
 
typedef enum {
 
  Active,
 
  Passive,
 
  EndpointPolarity_Active,
 
  EndpointPolarity_Passive,
 
} EndpointPolarity;
 

	
 
typedef enum {
 
  Putter,
 
  Getter,
 
  Polarity_Putter,
 
  Polarity_Getter,
 
} Polarity;
 

	
 
typedef struct Arc_ProtocolDescription Arc_ProtocolDescription;
 

	
 
typedef struct Connector Connector;
 

	
 
typedef int32_t ErrorCode;
 

	
 
typedef uint32_t ConnectorId;
 

	
 
typedef uint32_t PortSuffix;
 

	
 
typedef struct {
 
  ConnectorId connector_id;
 
  PortSuffix u32_suffix;
 
} PortId;
 

	
 
/**
 
 * Given
 
 * - an initialized connector in setup or connecting state,
 
 * - a string slice for the component's identifier in the connector's configured protocol description,
 
 * - a set of ports (represented as a slice; duplicates are ignored) in the native component's interface,
 
 * the connector creates a new (internal) protocol component C, such that the set of native ports are moved to C.
 
 * Usable in {setup, communication} states.
src/common.rs
Show inline comments
 
///////////////////// PRELUDE /////////////////////
 

	
 
pub(crate) use crate::protocol::{ComponentState, ProtocolDescription};
 
pub(crate) use crate::runtime::{NonsyncProtoContext, SyncProtoContext};
 
pub(crate) use crate::runtime::{error::AddComponentError, NonsyncProtoContext, SyncProtoContext};
 

	
 
pub use core::{
 
pub(crate) use core::{
 
    cmp::Ordering,
 
    fmt::{Debug, Formatter},
 
    hash::{Hash, Hasher},
 
    ops::{Range, RangeFrom},
 
    hash::Hash,
 
    ops::Range,
 
    time::Duration,
 
};
 
pub use indexmap::{IndexMap, IndexSet};
 
pub use maplit::{hashmap, hashset};
 
pub use mio::{
 
// pub(crate) use indexmap::IndexSet;
 
pub(crate) use maplit::hashmap;
 
pub(crate) use mio::{
 
    net::{TcpListener, TcpStream},
 
    Events, Interest, Poll, Token,
 
};
 
pub use std::{
 
    collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
 
pub(crate) use std::{
 
    collections::{BTreeMap, HashMap, HashSet},
 
    convert::TryInto,
 
    io::{Read, Write},
 
    net::SocketAddr,
 
    sync::Arc,
 
    time::Instant,
 
};
 
pub use Polarity::*;
 

	
 
///////////////////// DEFS /////////////////////
 
pub(crate) use Polarity::*;
 

	
 
pub type ConnectorId = u32;
 
pub type PortSuffix = u32;
 

	
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
// acquired via error in the Rust API
 
pub struct ProtoComponentId(Id);
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(C)]
 
pub struct Id {
 
    pub(crate) connector_id: ConnectorId,
 
    pub(crate) u32_suffix: PortSuffix,
 
}
 

	
 
#[derive(Debug, Default)]
 
pub struct U32Stream {
 
    next: u32,
 
}
 

	
 
// globally unique
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(transparent)]
 
pub struct PortId(Id);
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub struct FiringVar(pub(crate) PortId);
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub struct ProtoComponentId(Id);
 

	
 
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
 
// #[repr(C)]
 
#[derive(Default, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
 
pub struct Payload(Arc<Vec<u8>>);
 

	
 
#[derive(
 
    Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(C)]
 
pub enum Polarity {
 
    Putter, // output port (from the perspective of the component)
 
    Getter, // input port (from the perspective of the component)
 
}
 
#[derive(
 
    Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(C)]
 
pub enum EndpointPolarity {
 
    Active,  // calls connect()
 
    Passive, // calls bind() listen() accept()
 
}
 

	
 
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
 
pub enum AddComponentError {
 
    NoSuchComponent,
 
    NonPortTypeParameters,
 
    CannotMovePort(PortId),
 
    WrongNumberOfParamaters { expected: usize },
 
    UnknownPort(PortId),
 
    WrongPortPolarity { port: PortId, expected_polarity: Polarity },
 
    DuplicateMovedPort(PortId),
 
}
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub(crate) struct FiringVar(pub(crate) PortId);
 

	
 
#[derive(Debug, Clone)]
 
pub enum NonsyncBlocker {
 
pub(crate) enum NonsyncBlocker {
 
    Inconsistent,
 
    ComponentExit,
 
    SyncBlockStart,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum SyncBlocker {
 
pub(crate) enum SyncBlocker {
 
    Inconsistent,
 
    SyncBlockEnd,
 
    CouldntReadMsg(PortId),
 
    CouldntCheckFiring(PortId),
 
    PutMsg(PortId, Payload),
 
}
 

	
 
///////////////////// IMPL /////////////////////
 
impl U32Stream {
 
    pub fn next(&mut self) -> u32 {
 
    pub(crate) fn next(&mut self) -> u32 {
 
        if self.next == u32::MAX {
 
            panic!("NO NEXT!")
 
        }
 
        self.next += 1;
 
        self.next - 1
 
    }
 
}
 
impl From<Id> for PortId {
 
    fn from(id: Id) -> PortId {
 
        Self(id)
 
    }
 
}
 
impl From<Id> for ProtoComponentId {
 
    fn from(id: Id) -> ProtoComponentId {
 
        Self(id)
 
    }
 
}
 
impl From<&[u8]> for Payload {
 
    fn from(s: &[u8]) -> Payload {
 
        Payload(Arc::new(s.to_vec()))
 
    }
 
}
 
impl Payload {
 
    pub fn new(len: usize) -> Payload {
 
        let mut v = Vec::with_capacity(len);
 
        unsafe {
 
            v.set_len(len);
 
        }
 
        Payload(Arc::new(v))
 
    }
 
    pub fn len(&self) -> usize {
 
        self.0.len()
 
    }
 
    pub fn as_slice(&self) -> &[u8] {
 
        &self.0
 
    }
 
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
 
        Arc::make_mut(&mut self.0) as _
 
    }
 
    pub fn concat_with(&mut self, other: &Self) {
 
    pub fn concatenate_with(&mut self, other: &Self) {
 
        let bytes = other.as_slice().iter().copied();
 
        let me = Arc::make_mut(&mut self.0);
 
        me.extend(bytes);
 
    }
 
}
 
impl serde::Serialize for Payload {
 
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
 
    where
 
        S: serde::Serializer,
 
    {
 
        let inner: &Vec<u8> = &self.0;
 
        inner.serialize(serializer)
 
    }
 
}
 
impl<'de> serde::Deserialize<'de> for Payload {
 
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
 
    where
 
        D: serde::Deserializer<'de>,
 
    {
 
        let inner: Vec<u8> = Vec::deserialize(deserializer)?;
 
        Ok(Self(Arc::new(inner)))
 
    }
 
}
 
impl std::iter::FromIterator<u8> for Payload {
 
    fn from_iter<I: IntoIterator<Item = u8>>(it: I) -> Self {
 
        Self(Arc::new(it.into_iter().collect()))
 
    }
 
}
 
impl From<Vec<u8>> for Payload {
 
    fn from(s: Vec<u8>) -> Self {
 
        Self(s.into())
 
    }
 
}
 
impl Debug for PortId {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "ptID({}'{})", self.0.connector_id, self.0.u32_suffix)
 
    }
 
}
 
impl Debug for FiringVar {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "fvID({}'{})", (self.0).0.connector_id, (self.0).0.u32_suffix)
 
    }
 
}
 
impl Debug for ProtoComponentId {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "pcID({}'{})", self.0.connector_id, self.0.u32_suffix)
 
    }
 
}
 
impl std::ops::Not for Polarity {
 
    type Output = Self;
 
    fn not(self) -> Self::Output {
 
        use Polarity::*;
src/runtime/error.rs
Show inline comments
 
use crate::common::*;
 

	
 
#[derive(Debug)]
 
pub enum ConnectError {
 
    BindFailed(SocketAddr),
 
    PollInitFailed,
 
    Timeout,
 
    PollFailed,
 
    AcceptFailed(SocketAddr),
 
    AlreadyConnected,
 
    PortPeerPolarityMismatch(PortId),
 
    EndpointSetupError(SocketAddr, EndpointError),
 
    SetupAlgMisbehavior,
 
}
 
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
 
pub enum AddComponentError {
 
    NoSuchComponent,
 
    NonPortTypeParameters,
 
    CannotMovePort(PortId),
 
    WrongNumberOfParamaters { expected: usize },
 
    UnknownPort(PortId),
 
    WrongPortPolarity { port: PortId, expected_polarity: Polarity },
 
    DuplicateMovedPort(PortId),
 
}
 
////////////////////////
 
#[derive(Debug, Clone)]
 
pub enum SyncError {
 
    NotConnected,
 
    InconsistentProtoComponent(ProtoComponentId),
 
    IndistinguishableBatches([usize; 2]),
 
    RoundFailure,
 
    PollFailed,
 
    BrokenEndpoint(usize),
 
    MalformedStateError(MalformedStateError),
 
}
 
#[derive(Debug, Clone)]
 
pub enum MalformedStateError {
 
    PortCannotPut(PortId),
 
    GetterUnknownFor { putter: PortId },
 
}
 
#[derive(Debug, Clone)]
 
pub enum EndpointError {
 
    MalformedMessage,
 
    BrokenEndpoint,
 
}
 
#[derive(Debug)]
 
pub enum PortOpError {
 
    WrongPolarity,
src/runtime/mod.rs
Show inline comments
 
/// cbindgen:ignore
 
mod communication;
 
/// cbindgen:ignore
 
mod endpoints;
 
pub mod error;
 
/// cbindgen:ignore
 
mod logging;
 
/// cbindgen:ignore
 
mod setup;
 

	
 
#[cfg(feature = "ffi")]
 
pub mod ffi;
 

	
 
#[cfg(test)]
 
mod tests;
 

	
 
use crate::common::*;
 
use error::*;
 

	
 
#[derive(Debug)]
 
pub struct Connector {
 
    unphased: ConnectorUnphased,
 
    phased: ConnectorPhased,
 
}
 
pub trait Logger: Debug {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write;
 
}
 
#[derive(Debug)]
 
pub struct VecLogger(ConnectorId, Vec<u8>);
 
#[derive(Debug)]
 
pub struct DummyLogger;
 
#[derive(Debug)]
 
pub struct FileLogger(ConnectorId, std::fs::File);
 
pub(crate) struct NonsyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    proto_component_id: ProtoComponentId,
 
    port_info: &'a mut PortInfo,
 
    id_manager: &'a mut IdManager,
 
    proto_component_ports: &'a mut HashSet<PortId>,
 
    unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>,
 
}
 
pub(crate) struct SyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    predicate: &'a Predicate,
 
    port_info: &'a PortInfo,
 
    inbox: &'a HashMap<PortId, Payload>,
 
}
 
#[derive(Debug)]
 
struct RoundOk {
 
    batch_index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
}
 
#[derive(Default)]
 
struct VecSet<T: std::cmp::Ord> {
 
    // invariant: ordered, deduplicated
 
    vec: Vec<T>,
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
enum ComponentId {
 
    Native,
 
    Proto(ProtoComponentId),
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
enum Route {
 
    LocalComponent(ComponentId),
 
    Endpoint { index: usize },
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
}
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
enum Decision {
 
    Failure,
 
    Success(Predicate),
 
}
 
@@ -132,49 +137,49 @@ struct EndpointSetup {
 
}
 
#[derive(Debug)]
 
struct EndpointExt {
 
    endpoint: Endpoint,
 
    getter_for_incoming: PortId,
 
}
 
#[derive(Debug)]
 
struct Neighborhood {
 
    parent: Option<usize>,
 
    children: VecSet<usize>,
 
}
 
#[derive(Debug)]
 
struct IdManager {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
    proto_component_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
struct EndpointManager {
 
    // invariants:
 
    // 1. endpoint N is registered READ | WRITE with poller
 
    // 2. Events is empty
 
    poll: Poll,
 
    events: Events,
 
    polled_undrained: IndexSet<usize>,
 
    polled_undrained: VecSet<usize>,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    endpoint_exts: Vec<EndpointExt>,
 
}
 
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
 
struct PortInfo {
 
    polarities: HashMap<PortId, Polarity>,
 
    peers: HashMap<PortId, PortId>,
 
    routes: HashMap<PortId, Route>,
 
}
 
#[derive(Debug)]
 
struct ConnectorCommunication {
 
    round_index: usize,
 
    endpoint_manager: EndpointManager,
 
    neighborhood: Neighborhood,
 
    native_batches: Vec<NativeBatch>,
 
    round_result: Result<Option<RoundOk>, SyncError>,
 
}
 
#[derive(Debug)]
 
struct ConnectorUnphased {
 
    proto_description: Arc<ProtocolDescription>,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
    logger: Box<dyn Logger>,
 
    id_manager: IdManager,
 
@@ -188,60 +193,63 @@ enum ConnectorPhased {
 
}
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
struct Predicate {
 
    assigned: BTreeMap<FiringVar, bool>,
 
}
 
#[derive(Debug, Default)]
 
struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
////////////////
 
pub fn would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
 
impl<T: std::cmp::Ord> VecSet<T> {
 
    fn new(mut vec: Vec<T>) -> Self {
 
        vec.sort();
 
        vec.dedup();
 
        Self { vec }
 
    }
 
    fn contains(&self, element: &T) -> bool {
 
        self.vec.binary_search(element).is_ok()
 
    }
 
    // fn insert(&mut self, element: T) -> bool {
 
    //     match self.vec.binary_search(&element) {
 
    //         Ok(_) => false,
 
    //         Err(index) => {
 
    //             self.vec.insert(index, element);
 
    //             true
 
    //         }
 
    //     }
 
    // }
 
    fn insert(&mut self, element: T) -> bool {
 
        match self.vec.binary_search(&element) {
 
            Ok(_) => false,
 
            Err(index) => {
 
                self.vec.insert(index, element);
 
                true
 
            }
 
        }
 
    }
 
    fn iter(&self) -> std::slice::Iter<T> {
 
        self.vec.iter()
 
    }
 
    fn pop(&mut self) -> Option<T> {
 
        self.vec.pop()
 
    }
 
}
 
impl PortInfo {
 
    fn firing_var_for(&self, port: PortId) -> FiringVar {
 
        FiringVar(match self.polarities.get(&port).unwrap() {
 
            Getter => port,
 
            Putter => *self.peers.get(&port).unwrap(),
 
        })
 
    }
 
}
 
impl IdManager {
 
    fn new(connector_id: ConnectorId) -> Self {
 
        Self {
 
            connector_id,
 
            port_suffix_stream: Default::default(),
 
            proto_component_suffix_stream: Default::default(),
 
        }
 
    }
 
    fn new_port_id(&mut self) -> PortId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into()
 
    }
 
    fn new_proto_component_id(&mut self) -> ProtoComponentId {
 
        Id {
 
            connector_id: self.connector_id,
 
            u32_suffix: self.proto_component_suffix_stream.next(),
src/runtime/setup.rs
Show inline comments
 
@@ -131,49 +131,49 @@ fn new_endpoint_manager(
 
            let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                .map_err(|_| BindFailed(endpoint_setup.sock_addr))?;
 
            poll.registry().register(&mut listener, token, BOTH).unwrap();
 
            TodoEndpoint::Accepting(listener)
 
        };
 
        Ok(Todo {
 
            todo_endpoint,
 
            local_port,
 
            sent_local_port: false,
 
            recv_peer_port: None,
 
            endpoint_setup: endpoint_setup.clone(),
 
        })
 
    }
 
    ////////////////////////////////////////////
 

	
 
    // 1. Start to construct EndpointManager
 
    const WAKER_TOKEN: Token = Token(usize::MAX);
 
    const WAKER_PERIOD: Duration = Duration::from_millis(300);
 

	
 
    assert!(endpoint_setups.len() < WAKER_TOKEN.0); // using MAX usize as waker token
 

	
 
    let mut waker_continue_signal: Option<Arc<AtomicBool>> = None;
 
    let mut poll = Poll::new().map_err(|_| PollInitFailed)?;
 
    let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4);
 
    let mut polled_undrained = IndexSet::default();
 
    let mut polled_undrained = VecSet::default();
 
    let mut delayed_messages = vec![];
 

	
 
    // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
 
    let mut todos = endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
            init_todo(Token(index), *local_port, endpoint_setup, &mut poll)
 
        })
 
        .collect::<Result<Vec<Todo>, ConnectError>>()?;
 

	
 
    // 3. Using poll to drive progress:
 
    //    - accept an incoming connection for each TcpListener (turning them into endpoints too)
 
    //    - for each endpoint, send the local PortId
 
    //    - for each endpoint, recv the peer's PortId, and
 

	
 
    // all in connect_failed are NOT registered with Poll
 
    let mut connect_failed: HashSet<usize> = Default::default();
 

	
 
    let mut setup_incomplete: HashSet<usize> = (0..todos.len()).collect();
 
    while !setup_incomplete.is_empty() {
 
        let remaining = if let Some(deadline) = deadline {
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?)
 
        } else {
0 comments (0 inline, 0 general)