Changeset - 5c858bc28930
[Not reviewed]
0 2 0
Christopher Esterhuyse - 5 years ago 2020-05-28 12:35:05
christopher.esterhuyse@gmail.com
incremental setup procedure in progress
2 files changed with 95 insertions and 0 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
///////////////////// PRELUDE /////////////////////
 

	
 
pub use core::{
 
    cmp::Ordering,
 
    fmt::{Debug, Formatter},
 
    hash::{Hash, Hasher},
 
    ops::{Range, RangeFrom},
 
    time::Duration,
 
};
 
pub use indexmap::{IndexMap, IndexSet};
 
pub use maplit::{hashmap, hashset};
 
pub use mio::{
 
    net::{TcpListener, TcpStream},
 
    Event, Evented, Events, Poll, PollOpt, Ready, Token,
 
};
 
pub use std::{
 
    collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
 
    convert::TryInto,
 
    net::SocketAddr,
 
    sync::Arc,
 
    time::Instant,
 
};
 
pub use Polarity::*;
 

	
 
///////////////////// DEFS /////////////////////
 

	
 
pub type ControllerId = u32;
 
pub type ChannelIndex = u32;
 

	
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd)]
 
pub struct PortId {
 
    pub(crate) controller_id: ControllerId,
 
    pub(crate) port_index: u32,
 
}
 

	
 
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
 
pub struct Payload(Arc<Vec<u8>>);
 

	
 
/// This is a unique identifier for a channel (i.e., port).
 
#[derive(Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd)]
 
pub struct ChannelId {
 
    pub(crate) controller_id: ControllerId,
 
    pub(crate) channel_index: ChannelIndex,
 
}
 

	
 
#[derive(Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd)]
 
pub enum Polarity {
 
    Putter, // output port (from the perspective of the component)
 
    Getter, // input port (from the perspective of the component)
 
}
 

	
 
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Copy, Clone)]
 
#[repr(C)]
 
pub struct Port(pub usize); // ports are COPY
 

	
 
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
 
pub enum MainComponentErr {
 
    NoSuchComponent,
 
    NonPortTypeParameters,
 
    CannotMovePort(Port),
 
    WrongNumberOfParamaters { expected: usize },
 
    UnknownPort(Port),
 
    WrongPortPolarity { param_index: usize, port: Port },
 
    DuplicateMovedPort(Port),
 
}
 
pub trait ProtocolDescription: Sized {
 
    type S: ComponentState<D = Self>;
 

	
 
    fn parse(pdl: &[u8]) -> Result<Self, String>;
 
    fn component_polarities(&self, identifier: &[u8]) -> Result<Vec<Polarity>, MainComponentErr>;
 
    fn new_main_component(&self, identifier: &[u8], ports: &[Port]) -> Self::S;
 
}
 

	
 
pub trait ComponentState: Sized + Clone {
 
    type D: ProtocolDescription;
 
    fn pre_sync_run<C: MonoContext<D = Self::D, S = Self>>(
 
        &mut self,
 
        runtime_ctx: &mut C,
 
        protocol_description: &Self::D,
 
    ) -> MonoBlocker;
 

	
 
    fn sync_run<C: PolyContext<D = Self::D>>(
 
        &mut self,
 
        runtime_ctx: &mut C,
 
        protocol_description: &Self::D,
 
    ) -> PolyBlocker;
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum MonoBlocker {
 
    Inconsistent,
 
    ComponentExit,
 
    SyncBlockStart,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum PolyBlocker {
 
    Inconsistent,
 
    SyncBlockEnd,
 
    CouldntReadMsg(Port),
 
    CouldntCheckFiring(Port),
 
    PutMsg(Port, Payload),
 
}
 

	
 
pub trait MonoContext {
 
    type D: ProtocolDescription;
 
    type S: ComponentState<D = Self::D>;
 

	
 
    fn new_component(&mut self, moved_ports: HashSet<Port>, init_state: Self::S);
 
    fn new_channel(&mut self) -> [Port; 2];
 
    fn new_random(&mut self) -> u64;
 
}
 
pub trait PolyContext {
 
    type D: ProtocolDescription;
 

	
 
    fn is_firing(&mut self, port: Port) -> Option<bool>;
 
    fn read_msg(&mut self, port: Port) -> Option<&Payload>;
 
}
 

	
 
///////////////////// IMPL /////////////////////
 
impl Payload {
 
    pub fn new(len: usize) -> Payload {
 
        let mut v = Vec::with_capacity(len);
 
        unsafe {
 
            v.set_len(len);
 
        }
 
        Payload(Arc::new(v))
 
    }
 
    pub fn len(&self) -> usize {
 
        self.0.len()
 
    }
 
    pub fn as_slice(&self) -> &[u8] {
 
        &self.0
 
    }
 
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
 
        Arc::make_mut(&mut self.0) as _
 
    }
 
    pub fn concat_with(&mut self, other: &Self) {
 
        let bytes = other.as_slice().iter().copied();
 
        let me = Arc::make_mut(&mut self.0);
 
        me.extend(bytes);
 
    }
 
}
 
impl std::iter::FromIterator<u8> for Payload {
 
    fn from_iter<I: IntoIterator<Item = u8>>(it: I) -> Self {
 
        Self(Arc::new(it.into_iter().collect()))
 
    }
 
}
 
impl From<Vec<u8>> for Payload {
 
    fn from(s: Vec<u8>) -> Self {
 
        Self(s.into())
 
    }
 
}
 
impl Debug for Port {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "Port({})", self.0)
 
    }
 
}
 
impl Port {
 
    pub fn from_raw(raw: usize) -> Self {
 
        Self(raw)
 
    }
 
    pub fn to_raw(self) -> usize {
 
        self.0
 
    }
 
    pub fn to_token(self) -> mio::Token {
 
        mio::Token(self.0.try_into().unwrap())
 
    }
 
    pub fn from_token(t: mio::Token) -> Self {
 
        Self(t.0.try_into().unwrap())
 
    }
 
}
src/runtime/mod.rs
Show inline comments
 
#[cfg(feature = "ffi")]
 
pub mod ffi;
 

	
 
mod actors;
 
pub(crate) mod communication;
 
pub(crate) mod connector;
 
pub(crate) mod endpoint;
 
pub mod errors;
 
// pub mod experimental;
 
mod serde;
 
pub(crate) mod setup;
 

	
 
pub(crate) type ProtocolD = crate::protocol::ProtocolDescriptionImpl;
 
pub(crate) type ProtocolS = crate::protocol::ComponentStateImpl;
 

	
 
use crate::common::*;
 
use actors::*;
 
use endpoint::*;
 
use errors::*;
 

	
 
#[derive(Debug, PartialEq)]
 
pub(crate) enum CommonSatResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
    New(Predicate),
 
    Nonexistant,
 
}
 

	
 
#[derive(Clone, Eq, PartialEq, Hash)]
 
pub(crate) struct Predicate {
 
    pub assigned: BTreeMap<ChannelId, bool>,
 
}
 

	
 
#[derive(Debug, Default)]
 
struct SyncBatch {
 
    puts: HashMap<Port, Payload>,
 
    gets: HashSet<Port>,
 
}
 

	
 
/////////////////////
 
pub(crate) struct ProtoComponentTodo {
 
    name: Vec<u8>,
 
    ports: Vec<PortId>,
 
}
 
pub(crate) struct EndpointId(usize);
 
pub(crate) enum EndpointPolarity {
 
    Accept,
 
    Connect,
 
}
 
pub(crate) enum RecvRoute {
 
    ThroughEndpoint { endpoint_id: EndpointId },
 
    NativeComponent,
 
    ProtocolComponent { component_idx: usize },
 
}
 
pub(crate) struct PortLogical {
 
    peer: PortId,
 
    polarity: Polarity,
 
}
 
pub(crate) struct Configured2 {
 
    controller_id: ControllerId,
 
    next_port_index: u32,
 
    protocol_description: Arc<ProtocolD>,
 
    net_ports_todo: HashMap<PortId, TodoEndpoint>,
 
    port_logical: HashMap<PortId, PortLogical>,
 
    port_recv_route: HashMap<PortId, RecvRoute>,
 
    proto_component_todos: Vec<ProtoComponentTodo>,
 
}
 
pub(crate) struct TodoEndpoint {
 
    port_polarity: Polarity,
 
    endpoint_polarity: EndpointPolarity,
 
    addr: SocketAddr,
 
}
 
impl Configured2 {
 
    fn generate_port(&mut self) -> PortId {
 
        PortId {
 
            controller_id: self.controller_id,
 
            port_index: {
 
                // ensure we don't overflow
 
                assert_ne!(self.next_port_index, std::u32::MAX);
 
                self.next_port_index += 1;
 
                self.next_port_index - 1
 
            },
 
        }
 
    }
 
    fn add_net_port(&mut self, todo_endpoint: TodoEndpoint) -> PortId {
 
        let port = self.generate_port();
 
        self.net_ports_todo.insert(port, todo_endpoint); // Peer unknown!
 
        self.port_recv_route.insert(port, RecvRoute::NativeComponent);
 
        port
 
    }
 
    fn add_port_pair(&mut self) -> [PortId; 2] {
 
        let [a, b] = [self.generate_port(), self.generate_port()];
 
        self.port_logical.insert(a, PortLogical { peer: b, polarity: Polarity::Putter });
 
        self.port_logical.insert(b, PortLogical { peer: a, polarity: Polarity::Putter });
 
        self.port_recv_route.insert(a, RecvRoute::NativeComponent);
 
        self.port_recv_route.insert(b, RecvRoute::NativeComponent);
 
        [a, b]
 
    }
 
    fn add_component(&mut self, name: Vec<u8>, ports: &[PortId]) -> Result<(), PortId> {
 
        // 1. check that they all route to the native
 
        for port in ports {
 
            match self.port_recv_route.get(port) {
 
                Some(RecvRoute::NativeComponent) => { /* do nothing */ }
 
                _ => return Err(*port),
 
            }
 
        }
 
        // 2. create a new component todo
 
        self.proto_component_todos.push(ProtoComponentTodo { name, ports: ports.to_vec() });
 
        let component_idx = self.proto_component_todos.len();
 
        // 3. overwrite route mappings, route to this component
 
        for &port in ports {
 
            self.port_recv_route.insert(port, RecvRoute::ProtocolComponent { component_idx });
 
        }
 
        Ok(())
 
    }
 
    fn connect(&mut self) -> Result<(), ConnectErr2> {
 
        // 1. bind all acceptors
 
        // 2. connect all acceptors and connectors and send my (PortId, Polarity).
 
        // 3. finish populating port_logical mappings
 
        // 4. echo alg. with extinction to build neighborhood
 
        Ok(())
 
    }
 
}
 
enum ConnectErr2 {
 
    AcceptBindErr(PortId),
 
}
 
/////////////////////////////////
 

	
 
#[derive(Debug)]
 
pub enum Connector {
 
    Unconfigured(Unconfigured),
 
    Configured(Configured),
 
    Connected(Connected), // TODO consider boxing. currently takes up a lot of stack real estate
 
}
 
#[derive(Debug)]
 
pub struct Unconfigured {
 
    pub controller_id: ControllerId,
 
}
 
#[derive(Debug)]
 
pub struct Configured {
 
    controller_id: ControllerId,
 
    polarities: Vec<Polarity>,
 
    bindings: HashMap<usize, PortBinding>,
 
    protocol_description: Arc<ProtocolD>,
 
    main_component: Vec<u8>,
 
    logger: String,
 
}
 
#[derive(Debug)]
 
pub struct Connected {
 
    native_interface: Vec<(Port, Polarity)>,
 
    sync_batches: Vec<SyncBatch>,
 
    controller: Controller,
 
}
 

	
 
#[derive(Debug, Copy, Clone)]
 
pub enum PortBinding {
 
    Native,
 
    Active(SocketAddr),
 
    Passive(SocketAddr),
 
}
 

	
 
#[derive(Debug)]
 
struct Arena<T> {
 
    storage: Vec<T>,
 
}
 

	
 
#[derive(Debug)]
 
struct ReceivedMsg {
 
    recipient: Port,
 
    msg: Msg,
 
}
 

	
 
#[derive(Debug)]
 
struct MessengerState {
 
    poll: Poll,
 
    events: Events,
 
    delayed: Vec<ReceivedMsg>,
 
    undelayed: Vec<ReceivedMsg>,
 
    polled_undrained: IndexSet<Port>,
 
}
 
#[derive(Debug)]
 
struct ChannelIdStream {
 
    controller_id: ControllerId,
 
    next_channel_index: ChannelIndex,
 
}
 

	
 
#[derive(Debug)]
 
struct Controller {
 
    protocol_description: Arc<ProtocolD>,
 
    inner: ControllerInner,
 
    ephemeral: ControllerEphemeral,
 
    unrecoverable_error: Option<SyncErr>, // prevents future calls to Sync
 
}
 
#[derive(Debug)]
 
struct ControllerInner {
 
    round_index: usize,
 
    channel_id_stream: ChannelIdStream,
 
    endpoint_exts: Arena<EndpointExt>,
 
    messenger_state: MessengerState,
 
    mono_n: MonoN,       // state at next round start
 
    mono_ps: Vec<MonoP>, // state at next round start
 
    family: ControllerFamily,
 
    logger: String,
 
}
 

	
 
/// This structure has its state entirely reset between synchronous rounds
 
#[derive(Debug, Default)]
 
struct ControllerEphemeral {
 
    solution_storage: SolutionStorage,
 
    poly_n: Option<PolyN>,
 
    poly_ps: Vec<PolyP>,
 
    mono_ps: Vec<MonoP>,
 
    port_to_holder: HashMap<Port, PolyId>,
 
}
 

	
 
#[derive(Debug)]
 
struct ControllerFamily {
 
    parent_port: Option<Port>,
 
    children_ports: Vec<Port>,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum SyncRunResult {
 
    BlockingForRecv,
 
    AllBranchesComplete,
 
    NoBranches,
 
}
 

	
 
// Used to identify poly actors
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
enum PolyId {
 
    N,
 
    P { index: usize },
 
}
 

	
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
pub(crate) enum SubtreeId {
 
    PolyN,
 
    PolyP { index: usize },
 
    ChildController { port: Port },
 
}
 

	
 
pub(crate) struct MonoPContext<'a> {
 
    inner: &'a mut ControllerInner,
 
    ports: &'a mut HashSet<Port>,
 
    mono_ps: &'a mut Vec<MonoP>,
 
}
 
pub(crate) struct PolyPContext<'a> {
 
    my_subtree_id: SubtreeId,
 
    inner: &'a mut ControllerInner,
 
    solution_storage: &'a mut SolutionStorage,
 
}
 
impl PolyPContext<'_> {
 
    #[inline(always)]
 
    fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> {
 
        let Self { solution_storage, my_subtree_id, inner } = self;
 
        PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner }
 
    }
 
}
 
struct BranchPContext<'m, 'r> {
 
    m_ctx: PolyPContext<'m>,
 
    ports: &'r HashSet<Port>,
 
    predicate: &'r Predicate,
 
    inbox: &'r HashMap<Port, Payload>,
 
}
 

	
 
#[derive(Default)]
 
pub(crate) struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
 

	
 
trait Messengerlike {
 
    fn get_state_mut(&mut self) -> &mut MessengerState;
 
    fn get_endpoint_mut(&mut self, eport: Port) -> &mut Endpoint;
 

	
 
    fn delay(&mut self, received: ReceivedMsg) {
 
        self.get_state_mut().delayed.push(received);
 
    }
 
    fn undelay_all(&mut self) {
 
        let MessengerState { delayed, undelayed, .. } = self.get_state_mut();
 
        undelayed.extend(delayed.drain(..))
 
    }
 

	
 
    fn send(&mut self, to: Port, msg: Msg) -> Result<(), EndpointErr> {
 
        self.get_endpoint_mut(to).send(msg)
 
    }
 

	
 
    // attempt to receive a message from one of the endpoints before the deadline
 
    fn recv(&mut self, deadline: Instant) -> Result<Option<ReceivedMsg>, MessengerRecvErr> {
 
        // try get something buffered
 
        if let Some(x) = self.get_state_mut().undelayed.pop() {
 
            return Ok(Some(x));
 
        }
 

	
 
        loop {
 
            // polled_undrained may not be empty
 
            while let Some(eport) = self.get_state_mut().polled_undrained.pop() {
 
                if let Some(msg) = self
 
                    .get_endpoint_mut(eport)
 
                    .recv()
 
                    .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))?
 
                {
 
                    // this endpoint MAY still have messages! check again in future
 
                    self.get_state_mut().polled_undrained.insert(eport);
 
                    return Ok(Some(ReceivedMsg { recipient: eport, msg }));
 
                }
 
            }
 

	
 
            let state = self.get_state_mut();
 
            match state.poll_events(deadline) {
 
                Ok(()) => {
 
                    for e in state.events.iter() {
 
                        state.polled_undrained.insert(Port::from_token(e.token()));
 
                    }
 
                }
 
                Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed),
0 comments (0 inline, 0 general)