Changeset - 0a71d0af9edf
[Not reviewed]
0 7 0
Christopher Esterhuyse - 5 years ago 2020-03-03 15:04:11
christopheresterhuyse@gmail.com
histories
7 files changed with 224 insertions and 86 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
///////////////////// PRELUDE /////////////////////
 

	
 
pub use core::{
 
    cmp::Ordering,
 
    fmt::{Debug, Formatter},
 
    hash::{Hash, Hasher},
 
    ops::{Range, RangeFrom},
 
    time::Duration,
 
};
 
pub use indexmap::{IndexMap, IndexSet};
 
pub use maplit::{hashmap, hashset};
 
pub use mio::{
 
    net::{TcpListener, TcpStream},
 
    Event, Evented, Events, Poll, PollOpt, Ready, Token,
 
};
 
pub use std::{
 
    collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
 
    convert::TryInto,
 
    net::SocketAddr,
 
    sync::Arc,
 
    time::Instant,
 
};
 
pub use Polarity::*;
 

	
 
///////////////////// DEFS /////////////////////
 

	
 
pub type Payload = Vec<u8>;
 
pub type ControllerId = u32;
 
pub type ChannelIndex = u32;
 

	
 
/// This is a unique identifier for a channel (i.e., port).
 
#[derive(Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd)]
 
pub struct ChannelId {
 
    pub(crate) controller_id: ControllerId,
 
    pub(crate) channel_index: ChannelIndex,
 
}
 

	
 
#[derive(Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd)]
 
pub enum Polarity {
 
    Putter, // output port (from the perspective of the component)
 
    Getter, // input port (from the perspective of the component)
 
}
 

	
 
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Copy, Clone, Debug)]
 
#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Copy, Clone)]
 
#[repr(C)]
 
pub struct Port(pub usize); // ports are COPY
 
pub type Key = Port;
 

	
 
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
 
pub enum MainComponentErr {
 
    NoSuchComponent,
 
    NonPortTypeParameters,
 
    CannotMovePort(Port),
 
    WrongNumberOfParamaters { expected: usize },
 
    UnknownPort(Port),
 
    WrongPortPolarity { param_index: usize, port: Port },
 
    DuplicateMovedPort(Port),
 
}
 
pub trait ProtocolDescription: Sized {
 
    type S: ComponentState<D = Self>;
 

	
 
    fn parse(pdl: &[u8]) -> Result<Self, String>;
 
    fn component_polarities(&self, identifier: &[u8]) -> Result<Vec<Polarity>, MainComponentErr>;
 
    fn new_main_component(&self, identifier: &[u8], ports: &[Key]) -> Self::S;
 
}
 

	
 
pub trait ComponentState: Sized + Clone {
 
    type D: ProtocolDescription;
 
    fn pre_sync_run<C: MonoContext<D = Self::D, S = Self>>(
 
        &mut self,
 
        runtime_ctx: &mut C,
 
        protocol_description: &Self::D,
 
    ) -> MonoBlocker;
 

	
 
    fn sync_run<C: PolyContext<D = Self::D>>(
 
        &mut self,
 
        runtime_ctx: &mut C,
 
        protocol_description: &Self::D,
 
    ) -> PolyBlocker;
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum MonoBlocker {
 
    Inconsistent,
 
    ComponentExit,
 
    SyncBlockStart,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum PolyBlocker {
 
    Inconsistent,
 
    SyncBlockEnd,
 
    CouldntReadMsg(Key),
 
    CouldntCheckFiring(Key),
 
    PutMsg(Key, Payload),
 
}
 

	
 
pub trait MonoContext {
 
    type D: ProtocolDescription;
 
    type S: ComponentState<D = Self::D>;
 

	
 
    fn new_component(&mut self, moved_keys: HashSet<Key>, init_state: Self::S);
 
    fn new_channel(&mut self) -> [Key; 2];
 
    fn new_random(&mut self) -> u64;
 
}
 
pub trait PolyContext {
 
    type D: ProtocolDescription;
 

	
 
    fn is_firing(&mut self, ekey: Key) -> Option<bool>;
 
    fn read_msg(&mut self, ekey: Key) -> Option<&Payload>;
 
}
 

	
 
///////////////////// IMPL /////////////////////
 
impl Debug for Port {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "Port({})", self.0)
 
    }
 
}
 
impl Key {
 
    pub fn from_raw(raw: usize) -> Self {
 
        Self(raw)
 
    }
 
    pub fn to_raw(self) -> usize {
 
        self.0
 
    }
 
    pub fn to_token(self) -> mio::Token {
 
        mio::Token(self.0.try_into().unwrap())
 
    }
 
    pub fn from_token(t: mio::Token) -> Self {
 
        Self(t.0.try_into().unwrap())
 
    }
 
}
src/protocol/eval.rs
Show inline comments
 
@@ -1412,112 +1412,112 @@ impl Store {
 
                let var = var.declaration.unwrap();
 
                // Ensure value is compatible with type of variable
 
                let the_type = h[var].the_type(h);
 
                assert!(value.is_type_compatible(the_type));
 
                // Overwrite mapping
 
                self.map.insert(var, value.clone());
 
                Ok(value)
 
            }
 
            Expression::Indexing(indexing) => {
 
                // Evaluate index expression, which must be some integral type
 
                let index = self.eval(h, ctx, indexing.index)?;
 
                // Mutable reference to the subject
 
                let subject;
 
                match &h[indexing.subject] {
 
                    Expression::Variable(var) => {
 
                        let var = var.declaration.unwrap();
 
                        subject = self.map.get_mut(&var).unwrap();
 
                    }
 
                    _ => unreachable!(),
 
                }
 
                match subject.set(&index, &value) {
 
                    Some(value) => Ok(value),
 
                    None => Err(EvalContinuation::Inconsistent),
 
                }
 
            }
 
            _ => unimplemented!("{:?}", h[lexpr]),
 
        }
 
    }
 
    fn get(&mut self, h: &Heap, ctx: &mut EvalContext, rexpr: ExpressionId) -> EvalResult {
 
        match &h[rexpr] {
 
            Expression::Variable(var) => {
 
                let var = var.declaration.unwrap();
 
                let value = self
 
                    .map
 
                    .get(&var)
 
                    .expect(&format!("Uninitialized variable {:?}", h[h[var].identifier()]));
 
                Ok(value.clone())
 
            }
 
            Expression::Indexing(indexing) => {
 
                // Evaluate index expression, which must be some integral type
 
                let index = self.eval(h, ctx, indexing.index)?;
 
                // Reference to subject
 
                let subject;
 
                match &h[indexing.subject] {
 
                    Expression::Variable(var) => {
 
                        let var = var.declaration.unwrap();
 
                        subject = self.map.get(&var).unwrap();
 
                    }
 
                    _ => unreachable!(),
 
                    q => unreachable!("Reached {:?}", q),
 
                }
 
                match subject.get(&index) {
 
                    Some(value) => Ok(value),
 
                    None => Err(EvalContinuation::Inconsistent),
 
                }
 
            }
 
            Expression::Select(selecting) => {
 
                // Reference to subject
 
                let subject;
 
                match &h[selecting.subject] {
 
                    Expression::Variable(var) => {
 
                        let var = var.declaration.unwrap();
 
                        subject = self.map.get(&var).unwrap();
 
                    }
 
                    _ => unreachable!(),
 
                    q => unreachable!("Reached {:?}", q),
 
                }
 
                match subject.length() {
 
                    Some(value) => Ok(value),
 
                    None => Err(EvalContinuation::Inconsistent),
 
                }
 
            }
 
            _ => unimplemented!("{:?}", h[rexpr]),
 
        }
 
    }
 
    fn eval(&mut self, h: &Heap, ctx: &mut EvalContext, expr: ExpressionId) -> EvalResult {
 
        match &h[expr] {
 
            Expression::Assignment(expr) => {
 
                let value = self.eval(h, ctx, expr.right)?;
 
                match expr.operation {
 
                    AssignmentOperator::Set => {
 
                        self.update(h, ctx, expr.left, value.clone())?;
 
                    }
 
                    AssignmentOperator::Added => {
 
                        let old = self.get(h, ctx, expr.left)?;
 
                        self.update(h, ctx, expr.left, old.plus(&value))?;
 
                    }
 
                    AssignmentOperator::Subtracted => {
 
                        let old = self.get(h, ctx, expr.left)?;
 
                        self.update(h, ctx, expr.left, old.minus(&value))?;
 
                    }
 
                    _ => unimplemented!("{:?}", expr),
 
                }
 
                Ok(value)
 
            }
 
            Expression::Conditional(expr) => {
 
                let test = self.eval(h, ctx, expr.test)?;
 
                if test.as_boolean().0 {
 
                    self.eval(h, ctx, expr.true_expression)
 
                } else {
 
                    self.eval(h, ctx, expr.false_expression)
 
                }
 
            }
 
            Expression::Binary(expr) => {
 
                let left = self.eval(h, ctx, expr.left)?;
 
                let right;
 
                match expr.operation {
 
                    BinaryOperator::LogicalAnd => {
 
                        if left.as_boolean().0 == false {
 
                            return Ok(left);
 
                        }
 
                        right = self.eval(h, ctx, expr.right)?;
 
                        right.as_boolean(); // panics if not a boolean
 
                        return Ok(right);
src/runtime/actors.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{endpoint::*, *};
 

	
 
#[derive(Debug)]
 
#[derive(Debug, Clone)]
 
pub(crate) struct MonoN {
 
    pub ekeys: HashSet<Key>,
 
    pub result: Option<(usize, HashMap<Key, Payload>)>,
 
}
 
#[derive(Debug)]
 
pub(crate) struct PolyN {
 
    pub ekeys: HashSet<Key>,
 
    pub branches: HashMap<Predicate, BranchN>,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) struct BranchN {
 
    pub to_get: HashSet<Key>,
 
    pub gotten: HashMap<Key, Payload>,
 
    pub sync_batch_index: usize,
 
}
 

	
 
#[derive(Debug)]
 
#[derive(Debug, Clone)]
 
pub struct MonoP {
 
    pub state: ProtocolS,
 
    pub ekeys: HashSet<Key>,
 
}
 
#[derive(Debug)]
 
pub(crate) struct PolyP {
 
    pub incomplete: HashMap<Predicate, BranchP>,
 
    pub complete: HashMap<Predicate, BranchP>,
 
    pub ekeys: HashSet<Key>,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) struct BranchP {
 
    pub blocking_on: Option<Key>,
 
    pub outbox: HashMap<Key, Payload>,
 
    pub inbox: HashMap<Key, Payload>,
 
    pub state: ProtocolS,
 
}
 

	
 
//////////////////////////////////////////////////////////////////
 

	
 
impl PolyP {
 
    pub(crate) fn poly_run(
 
        &mut self,
 
        m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
        let to_run: Vec<_> = self.incomplete.drain().collect();
 
        self.poly_run_these_branches(m_ctx, protocol_description, to_run)
 
    }
 

	
 
    pub(crate) fn poly_run_these_branches(
 
        &mut self,
 
        mut m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
        mut to_run: Vec<(Predicate, BranchP)>,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
        use SyncRunResult as Srr;
 
        log!(&mut m_ctx.inner.logger, "~ Running branches for PolyP {:?}!", m_ctx.my_subtree_id,);
 
        'to_run_loop: while let Some((mut predicate, mut branch)) = to_run.pop() {
 
            let mut r_ctx = BranchPContext {
 
                m_ctx: m_ctx.reborrow(),
 
                ekeys: &self.ekeys,
 
                predicate: &predicate,
 
                inbox: &branch.inbox,
 
            };
 
            use PolyBlocker as Sb;
 
            let blocker = branch.state.sync_run(&mut r_ctx, protocol_description);
 
            log!(
 
                &mut r_ctx.m_ctx.inner.logger,
 
                "~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}",
 
                r_ctx.m_ctx.my_subtree_id,
 
                &predicate,
 
                &blocker
 
            );
 
            match blocker {
 
                Sb::Inconsistent => {} // DROP
 
                Sb::CouldntReadMsg(ekey) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let channel_id =
 
                        r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                    log!(
 
                        &mut r_ctx.m_ctx.inner.logger,
 
                        "~ ... {:?} couldnt read msg for port {:?}. has inbox {:?}",
 
                        r_ctx.m_ctx.my_subtree_id,
 
                        channel_id,
 
                        &branch.inbox,
 
                    );
 
                    if predicate.replace_assignment(channel_id, true) != Some(false) {
 
                        // don't rerun now. Rerun at next `sync_run`
 

	
 
                        log!(&mut m_ctx.inner.logger, "~ ... Delay {:?}", m_ctx.my_subtree_id,);
 
                        branch.blocking_on = Some(ekey);
 
                        self.incomplete.insert(predicate, branch);
 
                    } else {
 
                        log!(&mut m_ctx.inner.logger, "~ ... Drop {:?}", m_ctx.my_subtree_id,);
 
                    }
 
                    // ELSE DROP
 
                }
 
                Sb::CouldntCheckFiring(ekey) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let channel_id =
 
                        r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                    // split the branch!
 
                    let branch_f = branch.clone();
 
                    let mut predicate_f = predicate.clone();
 
                    if predicate_f.replace_assignment(channel_id, false).is_some() {
 
                        panic!("OI HANS QUERY FIRST!");
 
                    }
 
                    assert!(predicate.replace_assignment(channel_id, true).is_none());
 
                    to_run.push((predicate, branch));
 
                    to_run.push((predicate_f, branch_f));
 
                }
 
                Sb::SyncBlockEnd => {
 
                    let ControllerInner { logger, endpoint_exts, .. } = m_ctx.inner;
 
                    log!(
 
                        logger,
 
                        "~ ... ran {:?} reached SyncBlockEnd with pred {:?} ...",
 
                        m_ctx.my_subtree_id,
 
                        &predicate,
 
                    );
 
                    // come up with the predicate for this local solution
 

	
 
                    for ekey in self.ekeys.iter() {
 
                        let channel_id = endpoint_exts.get(*ekey).unwrap().info.channel_id;
 
                        let fired =
 
                            branch.inbox.contains_key(ekey) || branch.outbox.contains_key(ekey);
 
                        match predicate.query(channel_id) {
 
                            Some(true) => {
 
                                if !fired {
 
                                    // This branch should have fired but didn't!
 
                                    log!(
 
                                        logger,
 
                                        "~ ... ... should have fired {:?} and didn't! pruning!",
 
                                        channel_id,
 
                                    );
 
                                    continue 'to_run_loop;
 
                                }
 
                            }
 
                            Some(false) => assert!(!fired),
 
                            None => {
 
@@ -169,259 +171,258 @@ impl PolyP {
 
                            (info.channel_id.controller_id, info.channel_id.channel_index),
 
                        );
 
                        endpoint.send(msg)?;
 
                        to_run.push((predicate, branch));
 
                    }
 
                    // ELSE DROP
 
                }
 
            }
 
        }
 
        // all in self.incomplete most recently returned Blocker::CouldntReadMsg
 
        Ok(if self.incomplete.is_empty() {
 
            if self.complete.is_empty() {
 
                Srr::NoBranches
 
            } else {
 
                Srr::AllBranchesComplete
 
            }
 
        } else {
 
            Srr::BlockingForRecv
 
        })
 
    }
 

	
 
    pub(crate) fn poly_recv_run(
 
        &mut self,
 
        m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
        ekey: Key,
 
        payload_predicate: Predicate,
 
        payload: Payload,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
        // try exact match
 

	
 
        let to_run = if self.complete.contains_key(&payload_predicate) {
 
            // exact match with stopped machine
 

	
 
            log!(
 
                &mut m_ctx.inner.logger,
 
                "... poly_recv_run matched stopped machine exactly! nothing to do here",
 
            );
 
            vec![]
 
        } else if let Some(mut branch) = self.incomplete.remove(&payload_predicate) {
 
            // exact match with running machine
 

	
 
            log!(
 
                &mut m_ctx.inner.logger,
 
                "... poly_recv_run matched running machine exactly! pred is {:?}",
 
                &payload_predicate
 
            );
 
            branch.inbox.insert(ekey, payload);
 
            vec![(payload_predicate, branch)]
 
            if branch.blocking_on == Some(ekey) {
 
                branch.blocking_on = None;
 
                vec![(payload_predicate, branch)]
 
            } else {
 
                vec![]
 
            }
 
        } else {
 
            log!(
 
                &mut m_ctx.inner.logger,
 
                "... poly_recv_run didn't have any exact matches... Let's try feed it to all branches",
 

	
 
            );
 
            let mut incomplete2 = HashMap::<_, _>::default();
 
            let to_run = self
 
                .incomplete
 
                .drain()
 
                .filter_map(|(old_predicate, mut branch)| {
 
                    use CommonSatResult as Csr;
 
                    match old_predicate.common_satisfier(&payload_predicate) {
 
                        Csr::FormerNotLatter | Csr::Equivalent => {
 
                            log!(
 
                                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run This branch is compatible unaltered! branch pred: {:?}",
 
                                &old_predicate
 
                            );
 
                            // old_predicate COVERS the assumptions of payload_predicate
 

	
 
                            if let Some(prev_payload) = branch.inbox.get(&ekey) {
 
                                // Incorrect to receive two distinct messages in same branch!
 
                                assert_eq!(prev_payload, &payload);
 
                            }
 
                            branch.inbox.insert(ekey, payload.clone());
 
                            Some((old_predicate, branch))
 
                            if branch.blocking_on == Some(ekey) {
 
                                // run.
 
                                branch.blocking_on = None;
 
                                Some((old_predicate, branch))
 
                            } else {
 
                                // don't bother running. its awaiting something else
 
                                incomplete2.insert(old_predicate, branch);
 
                                None
 
                            }
 
                        }
 
                        Csr::New(new) => {
 
                            log!(
 
                                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run payloadpred {:?} and branchpred {:?} satisfied by new pred {:?}. FORKING",
 
                                &payload_predicate,
 
                                &old_predicate,
 
                                &new,
 
                            );
 
                            // payload_predicate has new assumptions. FORK!
 
                            let mut payload_branch = branch.clone();
 
                            if let Some(prev_payload) = payload_branch.inbox.get(&ekey) {
 
                                // Incorrect to receive two distinct messages in same branch!
 
                                assert_eq!(prev_payload, &payload);
 
                            }
 
                            payload_branch.inbox.insert(ekey, payload.clone());
 

	
 
                            // put the original back untouched
 
                            incomplete2.insert(old_predicate, branch);
 
                            Some((new, payload_branch))
 
                            if payload_branch.blocking_on == Some(ekey) {
 
                                // run the fork
 
                                payload_branch.blocking_on = None;
 
                                Some((new, payload_branch))
 
                            } else {
 
                                // don't bother running. its awaiting something else
 
                                incomplete2.insert(new, payload_branch);
 
                                None
 
                            }
 
                        }
 
                        Csr::LatterNotFormer => {
 
                            log!(
 
                                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run payloadpred {:?} subsumes branch pred {:?}. FORKING",
 
                                &old_predicate,
 
                                &payload_predicate,
 
                            );
 
                            // payload_predicate has new assumptions. FORK!
 
                            let mut payload_branch = branch.clone();
 
                            if let Some(prev_payload) = payload_branch.inbox.get(&ekey) {
 
                                // Incorrect to receive two distinct messages in same branch!
 
                                assert_eq!(prev_payload, &payload);
 
                            }
 
                            payload_branch.inbox.insert(ekey, payload.clone());
 

	
 
                            // put the original back untouched
 
                            incomplete2.insert(old_predicate, branch);
 
                            Some((payload_predicate.clone(), payload_branch))
 
                            incomplete2.insert(old_predicate.clone(), branch);
 
                            if payload_branch.blocking_on == Some(ekey) {
 
                                // run the fork
 
                                payload_branch.blocking_on = None;
 
                                Some((old_predicate, payload_branch))
 
                            } else {
 
                                // don't bother running. its awaiting something else
 
                                incomplete2.insert(old_predicate, payload_branch);
 
                                None
 
                            }
 
                        }
 
                        Csr::Nonexistant => {
 
                            log!(
 
                                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run SKIPPING because branchpred={:?}. payloadpred={:?}",
 
                                &old_predicate,
 
                                &payload_predicate,
 
                            );
 
                            // predicates contradict
 
                            incomplete2.insert(old_predicate, branch);
 
                            None
 
                        }
 
                    }
 
                })
 
                .collect();
 
            std::mem::swap(&mut self.incomplete, &mut incomplete2);
 
            to_run
 
        };
 
        log!(
 
            &mut m_ctx.inner.logger,
 
            "... DONE FEEDING BRANCHES. {} branches to run!",
 
            to_run.len(),
 
        );
 
        self.poly_run_these_branches(m_ctx, protocol_description, to_run)
 
    }
 

	
 
    pub(crate) fn become_mono(
 
        mut self,
 
        decision: &Predicate,
 
        table_row: &mut HashMap<Key, Payload>,
 
    ) -> MonoP {
 
        if let Some((_, branch)) = self.complete.drain().find(|(p, _)| decision.satisfies(p)) {
 
            let BranchP { inbox, state, outbox } = branch;
 
            for (key, payload) in inbox.into_iter().chain(outbox.into_iter()) {
 
                table_row.insert(key, payload);
 
            }
 
            self.incomplete.clear();
 
            MonoP { state, ekeys: self.ekeys }
 
        } else {
 
            panic!("No such solution!")
 
        }
 
    pub(crate) fn choose_mono(&self, decision: &Predicate) -> Option<MonoP> {
 
        self.complete
 
            .iter()
 
            .find(|(p, _)| decision.satisfies(p))
 
            .map(|(_, branch)| MonoP { state: branch.state.clone(), ekeys: self.ekeys.clone() })
 
    }
 
}
 

	
 
impl PolyN {
 
    pub fn sync_recv(
 
        &mut self,
 
        ekey: Key,
 
        logger: &mut String,
 
        payload: Payload,
 
        payload_predicate: Predicate,
 
        solution_storage: &mut SolutionStorage,
 
    ) {
 
        let mut branches2: HashMap<_, _> = Default::default();
 
        for (old_predicate, mut branch) in self.branches.drain() {
 
            use CommonSatResult as Csr;
 
            let case = old_predicate.common_satisfier(&payload_predicate);
 
            let mut report_if_solution =
 
                |branch: &BranchN, pred: &Predicate, logger: &mut String| {
 
                    if branch.to_get.is_empty() {
 
                        solution_storage.submit_and_digest_subtree_solution(
 
                            logger,
 
                            SubtreeId::PolyN,
 
                            pred.clone(),
 
                        );
 
                    }
 
                };
 
            log!(
 
                logger,
 
                "Feeding msg {:?} {:?} to native branch with pred {:?}. Predicate case {:?}",
 
                &payload_predicate,
 
                &payload,
 
                &old_predicate,
 
                &case
 
            );
 
            match case {
 
                Csr::Nonexistant => { /* skip branch */ }
 
                Csr::FormerNotLatter | Csr::Equivalent => {
 
                    // Feed the message to this branch in-place. no need to modify pred.
 
                    if branch.to_get.remove(&ekey) {
 
                        branch.gotten.insert(ekey, payload.clone());
 
                        report_if_solution(&branch, &old_predicate, logger);
 
                    }
 
                }
 
                Csr::LatterNotFormer => {
 
                    // create a new branch with the payload_predicate.
 
                    let mut forked = branch.clone();
 
                    if forked.to_get.remove(&ekey) {
 
                        forked.gotten.insert(ekey, payload.clone());
 
                        report_if_solution(&forked, &payload_predicate, logger);
 
                        branches2.insert(payload_predicate.clone(), forked);
 
                    }
 
                }
 
                Csr::New(new) => {
 
                    // create a new branch with the newly-created predicate
 
                    let mut forked = branch.clone();
 
                    if forked.to_get.remove(&ekey) {
 
                        forked.gotten.insert(ekey, payload.clone());
 
                        report_if_solution(&forked, &new, logger);
 
                        branches2.insert(new.clone(), forked);
 
                    }
 
                }
 
            }
 
            // unlike PolyP machines, Native branches do not become inconsistent
 
            branches2.insert(old_predicate, branch);
 
        }
 
        log!(
 
            logger,
 
            "Native now has {} branches with predicates: {:?}",
 
            branches2.len(),
 
            branches2.keys().collect::<Vec<_>>()
 
        );
 
        std::mem::swap(&mut branches2, &mut self.branches);
 
    }
 

	
 
    pub fn become_mono(
 
        mut self,
 
        logger: &mut String,
 
        decision: &Predicate,
 
        table_row: &mut HashMap<Key, Payload>,
 
    ) -> MonoN {
 
        log!(
 
            logger,
 
            "decision {:?} with branch preds {:?}",
 
            decision,
 
            self.branches.iter().collect::<Vec<_>>()
 
        );
 
        if let Some((branch_pred, branch)) = self
 
            .branches
 
            .drain()
 
    pub fn choose_mono(&self, decision: &Predicate) -> Option<MonoN> {
 
        self.branches
 
            .iter()
 
            .find(|(p, branch)| branch.to_get.is_empty() && decision.satisfies(p))
 
        {
 
            log!(logger, "decision {:?} mapped to branch {:?}", decision, branch_pred);
 
            let BranchN { gotten, sync_batch_index, .. } = branch;
 
            for (&key, payload) in gotten.iter() {
 
                assert!(table_row.insert(key, payload.clone()).is_none());
 
            }
 
            MonoN { ekeys: self.ekeys, result: Some((sync_batch_index, gotten)) }
 
        } else {
 
            log!(logger, "decision {:?} HAD NO SOLUTION!!?", decision);
 
            panic!("No such solution!")
 
        }
 
            .map(|(_, branch)| {
 
                let BranchN { gotten, sync_batch_index, .. } = branch.clone();
 
                MonoN { ekeys: self.ekeys.clone(), result: Some((sync_batch_index, gotten)) }
 
            })
 
    }
 
}
src/runtime/communication.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{actors::*, endpoint::*, errors::*, *};
 

	
 
impl Controller {
 
    //usable BETWEEN rounds
 
    fn consistent(&self) -> bool {
 
        self.inner.mono_n.is_some()
 
    }
 

	
 
    fn end_round_with_decision(&mut self, decision: Predicate) -> Result<(), SyncErr> {
 
        log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision);
 
        let mut table_row = HashMap::<Key, _>::default();
 
        // 1. become_mono for Poly actors
 
        self.inner.mono_n =
 
            self.ephemeral.poly_n.take().map(|poly_n| {
 
                poly_n.become_mono(&mut self.inner.logger, &decision, &mut table_row)
 
            });
 
        self.inner.mono_ps.extend(
 
            self.ephemeral.poly_ps.drain(..).map(|m| m.become_mono(&decision, &mut table_row)),
 
        );
 
        // let mut table_row = HashMap::<Key, _>::default();
 

	
 
        // convert (Key=>Payload) map to (ChannelId=>Payload) map.
 
        let table_row: HashMap<_, _> = table_row
 
            .into_iter()
 
            .map(|(ekey, msg)| {
 
                let channel_id = self.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                (channel_id, msg)
 
        let n_pair = {
 
            let poly_n = self.ephemeral.poly_n.take().unwrap();
 
            let mono_n = poly_n.choose_mono(&decision).unwrap();
 
            (mono_n, poly_n)
 
        };
 
        self.inner.mono_n = Some(n_pair.0.clone());
 

	
 
        let p_pairs = self
 
            .ephemeral
 
            .poly_ps
 
            .drain(..)
 
            .map(|poly_p| {
 
                let mono_p = poly_p.choose_mono(&decision).unwrap();
 
                (mono_p, poly_p)
 
            })
 
            .collect();
 
        // log all firing ports
 
        for (channel_id, payload) in table_row {
 
            log!(&mut self.inner.logger, "VALUE {:?} => Message({:?})", channel_id, payload);
 
        }
 
        // log all silent ports
 
        for channel_id in decision.iter_matching(false) {
 
            log!(&mut self.inner.logger, "VALUE {:?} => *", channel_id);
 
        }
 
            .collect::<Vec<_>>();
 
        self.inner.mono_ps.extend(p_pairs.iter().map(|p_pair| p_pair.0.clone()));
 
        self.round_histories.push(RoundHistory::Consistent(decision.clone(), n_pair, p_pairs));
 
        let announcement =
 
            CommMsgContents::Announce { oracle: decision }.into_msg(self.inner.round_index);
 
        for &child_ekey in self.inner.family.children_ekeys.iter() {
 
            log!(
 
                &mut self.inner.logger,
 
                "Forwarding {:?} to child with ekey {:?}",
 
                &announcement,
 
                child_ekey
 
            );
 
            self.inner
 
                .endpoint_exts
 
                .get_mut(child_ekey)
 
                .expect("eefef")
 
                .endpoint
 
                .send(announcement.clone())?;
 
        }
 
        self.inner.round_index += 1;
 
        self.ephemeral.clear();
 
        Ok(())
 
    }
 

	
 
    // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found
 
    fn handle_locals_maybe_decide(&mut self) -> Result<bool, SyncErr> {
 
        if let Some(parent_ekey) = self.inner.family.parent_ekey {
 
            // I have a parent -> I'm not the leader
 
            let parent_endpoint =
 
                &mut self.inner.endpoint_exts.get_mut(parent_ekey).expect("huu").endpoint;
 
            for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() {
 
                let msg =
 
                    CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index);
 
                log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_ekey);
 
                parent_endpoint.send(msg)?;
 
            }
 
            Ok(false)
 
        } else {
 
            // I have no parent -> I'm the leader
 
            assert!(self.inner.family.parent_ekey.is_none());
 
            let maybe_decision = self.ephemeral.solution_storage.iter_new_local_make_old().next();
 
            Ok(if let Some(decision) = maybe_decision {
 
                log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision);
 
                self.end_round_with_decision(decision)?;
 
                true
 
            } else {
 
                false
 
            })
 
        }
 
    }
 

	
 
@@ -94,105 +93,134 @@ impl Controller {
 

	
 
            // assign TRUE for puts and gets
 
            let true_ekeys = puts.keys().chain(gets.iter()).copied();
 
            let true_channel_ids = true_ekeys.clone().map(ekey_to_channel_id);
 
            predicate.batch_assign_nones(true_channel_ids, true);
 

	
 
            // assign FALSE for all in interface not assigned true
 
            predicate.batch_assign_nones(all_channel_ids.clone(), false);
 

	
 
            if branches.contains_key(&predicate) {
 
                // TODO what do I do with redundant predicates?
 
                unimplemented!(
 
                    "Having multiple batches with the same
 
                    predicate requires the support of oracle boolean variables"
 
                )
 
            }
 
            let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index };
 
            for (ekey, payload) in puts {
 
                log!(
 
                    &mut self.inner.logger,
 
                    "... ... Initial native put msg {:?} pred {:?} batch {:?}",
 
                    &payload,
 
                    &predicate,
 
                    sync_batch_index,
 
                );
 
                let msg =
 
                    CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload }
 
                        .into_msg(*round_index);
 
                endpoint_exts.get_mut(ekey).unwrap().endpoint.send(msg)?;
 
            }
 
            log!(
 
                &mut self.inner.logger,
 
                "... Initial native branch batch index={} with pred {:?}",
 
                sync_batch_index,
 
                &predicate
 
            );
 
            if branch.to_get.is_empty() {
 
                self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut self.inner.logger,
 
                    SubtreeId::PolyN,
 
                    predicate.clone(),
 
                );
 
            }
 
            branches.insert(predicate, branch);
 
        }
 
        Ok(PolyN { ekeys, branches })
 
    }
 

	
 
    // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent.
 
    // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches.
 
    pub fn sync_round(
 
        &mut self,
 
        deadline: Instant,
 
        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
    ) -> Result<(), SyncErr> {
 
        // TODO! fuse handle_locals_return_decision and end_round_return_decision
 
        if !self.consistent() {
 
            // was previously inconsistent
 
            return Err(SyncErr::Inconsistent);
 
        }
 
        match self.sync_round_inner(deadline, sync_batches) {
 
            Ok(()) => Ok(()),
 
            Err(e) => {
 
                log!(
 
                    &mut self.inner.logger,
 
                    "/\\/\\/\\/\\/\\/ Sync round failed! Preparing for diagnosis...",
 
                );
 
                let h = RoundHistory::Inconsistent(
 
                    std::mem::take(&mut self.ephemeral.solution_storage),
 
                    self.ephemeral.poly_n.take().unwrap(),
 
                    std::mem::take(&mut self.ephemeral.poly_ps),
 
                );
 
                self.round_histories.push(h);
 
                for (round_index, round) in self.round_histories.iter().enumerate() {
 
                    log!(&mut self.inner.logger, "round {}:{:#?}\n", round_index, round);
 
                }
 

	
 
                Err(e)
 
            }
 
        }
 
    }
 

	
 
    // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent.
 
    // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches.
 
    fn sync_round_inner(
 
        &mut self,
 
        deadline: Instant,
 
        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
    ) -> Result<(), SyncErr> {
 
        assert!(self.ephemeral.is_clear());
 

	
 
        log!(
 
            &mut self.inner.logger,
 
            "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~",
 
            self.inner.round_index
 
        );
 

	
 
        // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`).
 
        //    Some actors are dropped. some new actors are created.
 
        //    Ultimately, we have 0 Mono actors and a list of unnamed sync_actors
 
        log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.inner.mono_ps.len());
 
        self.ephemeral.poly_ps.clear();
 
        // let mut poly_ps: Vec<PolyP> = vec![];
 
        while let Some(mut mono_p) = self.inner.mono_ps.pop() {
 
            let mut m_ctx = MonoPContext {
 
                ekeys: &mut mono_p.ekeys,
 
                inner: &mut self.inner,
 
                // endpoint_exts: &mut self.endpoint_exts,
 
                // mono_ps: &mut self.mono_ps,
 
                // channel_id_stream: &mut self.channel_id_stream,
 
            };
 
            // cross boundary into crate::protocol
 
            let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description);
 
            log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker);
 
            match blocker {
 
                MonoBlocker::Inconsistent => return Err(SyncErr::Inconsistent),
 
                MonoBlocker::ComponentExit => drop(mono_p),
 
                MonoBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()),
 
            }
 
        }
 
        log!(
 
            &mut self.inner.logger,
 
            "Finished running all MonoPs! Have {} PolyPs waiting",
 
            self.ephemeral.poly_ps.len()
 
        );
 

	
 
        // 3. define the mapping from ekey -> actor
 
        //    this is needed during the event loop to determine which actor
 
        //    should receive the incoming message.
 
        //    TODO: store and update this mapping rather than rebuilding it each round.
 
        let ekey_to_holder: HashMap<Key, PolyId> = {
 
            use PolyId::*;
 
            let n = self.inner.mono_n.iter().flat_map(|m| m.ekeys.iter().map(move |&e| (e, N)));
 
            let p = self
 
                .ephemeral
 
                .poly_ps
 
                .iter()
 
@@ -242,101 +270,97 @@ impl Controller {
 
            Some(poly_n)
 
        } else {
 
            log!(&mut self.inner.logger, "NO NATIVE COMPONENT");
 
            None
 
        };
 

	
 
        // 6. Kick off the synchronous round of each protocol actor
 
        //    If just one actor becomes inconsistent now, there can be no solution!
 
        //    TODO distinguish between completed and not completed poly_p's?
 
        log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len());
 
        for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() {
 
            let my_subtree_id = SubtreeId::PolyP { index };
 
            let m_ctx = PolyPContext {
 
                my_subtree_id,
 
                inner: &mut self.inner,
 
                solution_storage: &mut self.ephemeral.solution_storage,
 
            };
 
            use SyncRunResult as Srr;
 
            let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?;
 
            log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker);
 
            match blocker {
 
                Srr::NoBranches => return Err(SyncErr::Inconsistent),
 
                Srr::AllBranchesComplete | Srr::BlockingForRecv => (),
 
            }
 
        }
 
        log!(&mut self.inner.logger, "All Poly machines have been kicked off!");
 

	
 
        // 7. `solution_storage` may have new solutions for this controller
 
        //    handle their discovery. LEADER => announce, otherwise => send to parent
 
        {
 
            let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::<Vec<_>>();
 
            log!(
 
                &mut self.inner.logger,
 
                "Got {} controller-local solutions before a single RECV: {:?}",
 
                peeked.len(),
 
                peeked
 
            );
 
        }
 
        if self.handle_locals_maybe_decide()? {
 
            return Ok(());
 
        }
 

	
 
        // 4. Receive incoming messages until the DECISION is made
 
        log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages");
 
        self.undelay_all();
 
        'recv_loop: loop {
 
            log!(&mut self.inner.logger, "`POLLING`...");
 
            let received = self.recv(deadline)?.ok_or_else(|| {
 
                log!(
 
                    &mut self.inner.logger,
 
                    ":( timing out. Solutions storage in state... {:#?}",
 
                    &self.ephemeral.solution_storage
 
                );
 
                log!(&mut self.inner.logger, ":( timing out");
 
                SyncErr::Timeout
 
            })?;
 
            log!(&mut self.inner.logger, "::: message {:?}...", &received);
 
            let current_content = match received.msg {
 
                Msg::SetupMsg(_) => {
 
                    // This occurs in the event the connector was malformed during connect()
 
                    return Err(SyncErr::UnexpectedSetupMsg);
 
                }
 
                Msg::CommMsg(CommMsg { round_index, .. })
 
                    if round_index < self.inner.round_index =>
 
                {
 
                    // Old message! Can safely discard
 
                    log!(&mut self.inner.logger, "...and its OLD! :(");
 
                    drop(received);
 
                    continue 'recv_loop;
 
                }
 
                Msg::CommMsg(CommMsg { round_index, .. })
 
                    if round_index > self.inner.round_index =>
 
                {
 
                    // Message from a next round. Keep for later!
 
                    log!(&mut self.inner.logger, "... DELAY! :(");
 
                    self.delay(received);
 
                    continue 'recv_loop;
 
                }
 
                Msg::CommMsg(CommMsg { contents, round_index }) => {
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "... its a round-appropriate CommMsg with key {:?}",
 
                        received.recipient
 
                    );
 
                    assert_eq!(round_index, self.inner.round_index);
 
                    contents
 
                }
 
            };
 
            match current_content {
 
                CommMsgContents::Elaborate { partial_oracle } => {
 
                    // Child controller submitted a subtree solution.
 
                    if !self.inner.family.children_ekeys.contains(&received.recipient) {
 
                        return Err(SyncErr::ElaborateFromNonChild);
 
                    }
 
                    let subtree_id = SubtreeId::ChildController { ekey: received.recipient };
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received elaboration from child for subtree {:?}: {:?}",
 
                        subtree_id,
 
                        &partial_oracle
 
                    );
 
                    self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
@@ -436,96 +460,97 @@ impl Controller {
 
                                        let peeked = self
 
                                            .ephemeral
 
                                            .solution_storage
 
                                            .peek_new_locals()
 
                                            .collect::<Vec<_>>();
 
                                        log!(
 
                                            &mut self.inner.logger,
 
                                            "Got {} new controller-local solutions from RECV: {:?}",
 
                                            peeked.len(),
 
                                            peeked
 
                                        );
 
                                    }
 
                                    if self.handle_locals_maybe_decide()? {
 
                                        return Ok(());
 
                                    }
 
                                }
 
                            }
 
                        }
 
                    };
 
                }
 
            }
 
        }
 
    }
 
}
 
impl ControllerEphemeral {
 
    fn is_clear(&self) -> bool {
 
        self.solution_storage.is_clear()
 
            && self.poly_n.is_none()
 
            && self.poly_ps.is_empty()
 
            && self.ekey_to_holder.is_empty()
 
    }
 
    fn clear(&mut self) {
 
        self.solution_storage.clear();
 
        self.poly_n.take();
 
        self.poly_ps.clear();
 
        self.ekey_to_holder.clear();
 
    }
 
}
 
impl Into<PolyP> for MonoP {
 
    fn into(self) -> PolyP {
 
        PolyP {
 
            complete: Default::default(),
 
            incomplete: hashmap! {
 
                Predicate::new_trivial() =>
 
                BranchP {
 
                    state: self.state,
 
                    inbox: Default::default(),
 
                    outbox: Default::default(),
 
                    blocking_on: None,
 
                }
 
            },
 
            ekeys: self.ekeys,
 
        }
 
    }
 
}
 

	
 
impl From<EndpointErr> for SyncErr {
 
    fn from(e: EndpointErr) -> SyncErr {
 
        SyncErr::EndpointErr(e)
 
    }
 
}
 

	
 
impl MonoContext for MonoPContext<'_> {
 
    type D = ProtocolD;
 
    type S = ProtocolS;
 
    fn new_component(&mut self, moved_ekeys: HashSet<Key>, init_state: Self::S) {
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_component with ekeys {:?}!",
 
            &moved_ekeys,
 
        );
 
        if moved_ekeys.is_subset(self.ekeys) {
 
            self.ekeys.retain(|x| !moved_ekeys.contains(x));
 
            self.inner.mono_ps.push(MonoP { state: init_state, ekeys: moved_ekeys });
 
        } else {
 
            panic!("MachineP attempting to move alien ekey!");
 
        }
 
    }
 
    fn new_channel(&mut self) -> [Key; 2] {
 
        let [a, b] = Endpoint::new_memory_pair();
 
        let channel_id = self.inner.channel_id_stream.next();
 

	
 
        let mut clos = |endpoint, polarity| {
 
            let endpoint_ext =
 
                EndpointExt { info: EndpointInfo { polarity, channel_id }, endpoint };
 
            let ekey = self.inner.endpoint_exts.alloc(endpoint_ext);
 
            let endpoint = &self.inner.endpoint_exts.get(ekey).unwrap().endpoint;
 
            let token = Key::to_token(ekey);
 
            self.inner
 
                .messenger_state
 
                .poll
 
                .register(endpoint, token, Ready::readable(), PollOpt::edge())
 
                .expect("AAGAGGGGG");
 
            self.ekeys.insert(ekey);
 
            ekey
 
        };
 
        let [kp, kg] = [clos(a, Putter), clos(b, Getter)];
src/runtime/mod.rs
Show inline comments
 
#[cfg(feature = "ffi")]
 
pub mod ffi;
 

	
 
mod actors;
 
pub(crate) mod communication;
 
pub(crate) mod connector;
 
pub(crate) mod endpoint;
 
pub mod errors;
 
pub mod experimental;
 
// pub mod experimental;
 
mod serde;
 
pub(crate) mod setup;
 

	
 
pub(crate) type ProtocolD = crate::protocol::ProtocolDescriptionImpl;
 
pub(crate) type ProtocolS = crate::protocol::ComponentStateImpl;
 

	
 
use crate::common::*;
 
use actors::*;
 
use endpoint::*;
 
use errors::*;
 

	
 
#[derive(Debug, PartialEq)]
 
pub(crate) enum CommonSatResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
    New(Predicate),
 
    Nonexistant,
 
}
 

	
 
#[derive(Clone, Eq, PartialEq, Hash)]
 
pub(crate) struct Predicate {
 
    pub assigned: BTreeMap<ChannelId, bool>,
 
}
 

	
 
#[derive(Debug, Default)]
 
struct SyncBatch {
 
    puts: HashMap<Key, Payload>,
 
    gets: HashSet<Key>,
 
}
 

	
 
#[derive(Debug)]
 
pub enum Connector {
 
    Unconfigured(Unconfigured),
 
    Configured(Configured),
 
    Connected(Connected), // TODO consider boxing. currently takes up a lot of stack real estate
 
}
 
#[derive(Debug)]
 
pub struct Unconfigured {
 
    pub controller_id: ControllerId,
 
}
 
#[derive(Debug)]
 
pub struct Configured {
 
    controller_id: ControllerId,
 
    polarities: Vec<Polarity>,
 
    bindings: HashMap<usize, PortBinding>,
 
    protocol_description: Arc<ProtocolD>,
 
    main_component: Vec<u8>,
 
}
 
#[derive(Debug)]
 
pub struct Connected {
 
    native_interface: Vec<(Key, Polarity)>,
 
    sync_batches: Vec<SyncBatch>,
 
    controller: Controller,
 
}
 

	
 
#[derive(Debug, Copy, Clone)]
 
pub enum PortBinding {
 
    Native,
 
    Active(SocketAddr),
 
    Passive(SocketAddr),
 
}
 

	
 
#[derive(Debug)]
 
struct Arena<T> {
 
    storage: Vec<T>,
 
}
 

	
 
#[derive(Debug)]
 
struct ReceivedMsg {
 
    recipient: Key,
 
    msg: Msg,
 
}
 

	
 
#[derive(Debug)]
 
struct MessengerState {
 
    poll: Poll,
 
    events: Events,
 
    delayed: Vec<ReceivedMsg>,
 
    undelayed: Vec<ReceivedMsg>,
 
    polled_undrained: IndexSet<Key>,
 
}
 
#[derive(Debug)]
 
struct ChannelIdStream {
 
    controller_id: ControllerId,
 
    next_channel_index: ChannelIndex,
 
}
 

	
 
#[derive(Debug)]
 
enum RoundHistory {
 
    Consistent(Predicate, (MonoN, PolyN), Vec<(MonoP, PolyP)>),
 
    Inconsistent(SolutionStorage, PolyN, Vec<PolyP>),
 
}
 

	
 
#[derive(Debug)]
 
struct Controller {
 
    protocol_description: Arc<ProtocolD>,
 
    inner: ControllerInner,
 
    ephemeral: ControllerEphemeral,
 
    round_histories: Vec<RoundHistory>,
 
}
 
#[derive(Debug)]
 
struct ControllerInner {
 
    round_index: usize,
 
    channel_id_stream: ChannelIdStream,
 
    endpoint_exts: Arena<EndpointExt>,
 
    messenger_state: MessengerState,
 
    mono_n: Option<MonoN>,
 
    mono_ps: Vec<MonoP>,
 
    family: ControllerFamily,
 
    logger: String,
 
}
 

	
 
/// This structure has its state entirely reset between synchronous rounds
 
#[derive(Debug, Default)]
 
struct ControllerEphemeral {
 
    solution_storage: SolutionStorage,
 
    poly_n: Option<PolyN>,
 
    poly_ps: Vec<PolyP>,
 
    ekey_to_holder: HashMap<Key, PolyId>,
 
}
 

	
 
#[derive(Debug)]
 
struct ControllerFamily {
 
    parent_ekey: Option<Key>,
 
    children_ekeys: Vec<Key>,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum SyncRunResult {
 
    BlockingForRecv,
 
    AllBranchesComplete,
 
    NoBranches,
 
}
 

	
 
// Used to identify poly actors
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
enum PolyId {
 
    N,
 
    P { index: usize },
 
}
 

	
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
pub(crate) enum SubtreeId {
 
    PolyN,
 
    PolyP { index: usize },
 
    ChildController { ekey: Key },
 
}
src/runtime/setup.rs
Show inline comments
 
@@ -82,97 +82,102 @@ impl Controller {
 
                    });
 
                    ekeys_network.push(ekey_proto);
 
                    ekeys_proto.push(ekey_proto);
 
                }
 
                PortBinding::Active(addr) => {
 
                    let ekey_proto = endpoint_ext_todos.alloc(EndpointExtTodo::ActiveConnecting {
 
                        addr,
 
                        polarity,
 
                        stream: TcpStream::connect(&addr).unwrap(),
 
                    });
 
                    ekeys_network.push(ekey_proto);
 
                    ekeys_proto.push(ekey_proto);
 
                }
 
            }
 
        }
 
        log!(&mut logger, "{:03?} setup todos...", major);
 

	
 
        // 2. convert the arena to Arena<EndpointExt>  and return the
 
        let (mut messenger_state, mut endpoint_exts) =
 
            Self::finish_endpoint_ext_todos(major, &mut logger, endpoint_ext_todos, deadline)?;
 

	
 
        let n_mono = Some(MonoN { ekeys: ekeys_native.into_iter().collect(), result: None });
 
        let p_monos = vec![MonoP {
 
            state: protocol_description.new_main_component(main_component, &ekeys_proto),
 
            ekeys: ekeys_proto.into_iter().collect(),
 
        }];
 

	
 
        // 6. Become a node in a sink tree, computing {PARENT, CHILDREN} from {NEIGHBORS}
 
        let family = Self::setup_sink_tree_family(
 
            major,
 
            &mut logger,
 
            &mut endpoint_exts,
 
            &mut messenger_state,
 
            ekeys_network,
 
            deadline,
 
        )?;
 

	
 
        log!(&mut logger, "CONNECT PHASE END! ~");
 
        let inner = ControllerInner {
 
            family,
 
            messenger_state,
 
            channel_id_stream,
 
            endpoint_exts,
 
            mono_ps: p_monos,
 
            mono_n: n_mono,
 
            round_index: 0,
 
            logger,
 
        };
 
        let controller = Self { protocol_description, inner, ephemeral: Default::default() };
 
        let controller = Self {
 
            protocol_description,
 
            inner,
 
            ephemeral: Default::default(),
 
            round_histories: vec![],
 
        };
 
        Ok((controller, native_interface))
 
    }
 

	
 
    fn test_stream_connectivity(stream: &mut TcpStream) -> bool {
 
        use std::io::Write;
 
        stream.write(&[]).is_ok()
 
    }
 

	
 
    // inserts
 
    fn finish_endpoint_ext_todos(
 
        major: ControllerId,
 
        logger: &mut String,
 
        mut endpoint_ext_todos: Arena<EndpointExtTodo>,
 
        deadline: Instant,
 
    ) -> Result<(MessengerState, Arena<EndpointExt>), ConnectErr> {
 
        use {ConnectErr::*, EndpointExtTodo::*};
 

	
 
        // 1. define and setup a poller and event loop
 
        let edge = PollOpt::edge();
 
        let [ready_r, ready_w] = [Ready::readable(), Ready::writable()];
 
        let mut ms = MessengerState {
 
            poll: Poll::new().map_err(|_| PollInitFailed)?,
 
            events: Events::with_capacity(endpoint_ext_todos.len()),
 
            delayed: vec![],
 
            undelayed: vec![],
 
            polled_undrained: Default::default(),
 
        };
 

	
 
        // 2. Register all EndpointExtTodos with ms.poll. each has one of {Endpoint, TcpStream, TcpListener}
 
        // 3. store the keyset of EndpointExtTodos which are not Finished in `to_finish`.
 
        let mut to_finish = HashSet::<_>::default();
 
        log!(logger, "endpoint_ext_todos len {:?}", endpoint_ext_todos.len());
 
        for (key, t) in endpoint_ext_todos.iter() {
 
            let token = key.to_token();
 
            match t {
 
                ActiveRecving { .. } | PassiveConnecting { .. } => unreachable!(),
 
                Finished(EndpointExt { endpoint, .. }) => {
 
                    ms.poll.register(endpoint, token, ready_r, edge)
 
                }
 
                ActiveConnecting { stream, .. } => {
 
                    to_finish.insert(key);
 
                    ms.poll.register(stream, token, ready_w, edge)
 
                }
 
                PassiveAccepting { listener, .. } => {
 
                    to_finish.insert(key);
 
                    ms.poll.register(listener, token, ready_r, edge)
 
                }
 
            }
src/test/connector.rs
Show inline comments
 
@@ -34,96 +34,117 @@ composite sync_2(in i, out o) {
 
    new sync(y, o);
 
}
 
primitive exchange(in ai, out ao, in bi, out bo) {
 
    // Note the implicit causal relationship
 
    while(true) synchronous {
 
        if(fires(ai)) {
 
            put(bo, get(ai));
 
            put(ao, get(bi));
 
        }
 
    }
 
}
 
primitive filter(in i, out ok, out err) {
 
    while(true) synchronous {
 
        if (fires(i)) {
 
            msg m = get(i);
 
            if(m.length > 0) {
 
                put(ok, m);
 
            } else {
 
                put(err, m);
 
            } 
 
        }
 
    }
 
}
 
primitive token_spout(out o) {
 
    while(true) synchronous {
 
        put(o, create(0));
 
    }
 
}
 
primitive wait_n(int to_wait, out o) {
 
    while(to_wait > 0) synchronous() to_wait -= 1;
 
    synchronous { put(o, create(0)); }
 
}
 
composite wait_10(out o) {
 
    new wait_n(10, o);
 
}
 
primitive fifo_1(msg m, in i, out o) {
 
    while(true) synchronous {
 
        if (m == null && fires(i)) {
 
            m = get(i);
 
        } else if (m != null && fires(o)) {
 
            put(o, m);
 
            m = null;
 
        }
 
    }
 
}
 
composite fifo_1_e(in i, out o) {
 
    new fifo_1(null, i, o);
 
}
 
primitive samelen(in a, in b, out c) {
 
    synchronous {
 
        msg m = get(a);
 
        msg n = get(b);
 
        assert(m.length == n.length);
 
        put(c, m);
 
    }
 
}
 
primitive repl2(in a, out b, out c) {
 
    synchronous {
 
        msg m = get(a);
 
        put(b, m);
 
        put(c, m);
 
    }
 
}
 
composite samelen_repl(in a, out b) {
 
    channel c -> d;   
 
    channel e -> f;
 
    new samelen(a, f, c);
 
    new repl2(d, b, e);
 
}
 
";
 

	
 
#[test]
 
fn connector_connects_ok() {
 
    // Test if we can connect natives using the given PDL
 
    /*
 
    Alice -->silence--P|A-->silence--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    assert!(run_connector_set(&[
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"blocked").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"blocked").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_connected_but_silent_natives() {
 
    // Test if we can connect natives and have a trivial sync round
 
    /*
 
    Alice -->silence--P|A-->silence--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    assert!(run_connector_set(&[
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"blocked").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(Ok(0), x.sync(timeout));
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"blocked").unwrap();
 
@@ -635,48 +656,122 @@ fn connector_routing_filter() {
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                // empty batch
 
                x.next_batch().unwrap();
 

	
 
                // got a message
 
                x.get(0).unwrap();
 
                match x.sync(timeout).unwrap() {
 
                    0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)),
 
                    1 => assert_ne!(Ok(&[] as &[u8]), x.read_gotten(0)),
 
                    _ => unreachable!(),
 
                }
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
fn connector_fifo_1_e() {
 
    /*
 
        /-->\
 
    Alice   fifo_1
 
        \<--/
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    const N: usize = 10;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"fifo_1_e").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                // put
 
                assert_eq!(Ok(()), x.put(0, b"message~".to_vec()));
 
                assert_eq!(Ok(0), x.sync(timeout));
 

	
 
                // get
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"message~" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
#[should_panic]
 
fn connector_causal_loop() {
 
    /*
 
        /-->\      /-->P|A-->\      /-->\
 
    Alice   exchange         exchange   Bob
 
        \<--/      \<--P|A<--/      \<--/
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"exchange").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap(); // peer out
 
            x.bind_port(1, Passive(addrs[1])).unwrap(); // peer in
 
            x.bind_port(2, Native).unwrap(); // native in
 
            x.bind_port(3, Native).unwrap(); // native out
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"A->B".to_vec()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"B->A" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
        &|x| {
 
            // Bob
 
            x.configure(PDL, b"exchange").unwrap();
 
            x.bind_port(0, Active(addrs[1])).unwrap(); // peer out
 
            x.bind_port(1, Active(addrs[0])).unwrap(); // peer in
 
            x.bind_port(2, Native).unwrap(); // native in
 
            x.bind_port(3, Native).unwrap(); // native out
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"B->A".to_vec()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
                assert_eq!(Ok(b"A->B" as &[u8]), x.read_gotten(1));
 
            }
 
        },
 
    ]));
 
}
 

	
 
#[test]
 
#[should_panic]
 
fn connector_causal_loop2() {
 
    /*
 
        /-->\     /<---\
 
    Alice   samelen-->repl
 
        \<-------------/
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    // let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    assert!(run_connector_set(&[
 
        //
 
        &|x| {
 
            // Alice
 
            x.configure(PDL, b"samelen_repl").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            for _ in 0..N {
 
                assert_eq!(Ok(()), x.put(0, b"foo".to_vec()));
 
                assert_eq!(Ok(()), x.get(1));
 
                assert_eq!(Ok(0), x.sync(timeout));
 
            }
 
        },
 
    ]));
 
}
0 comments (0 inline, 0 general)