Changeset - 0fb83f27a238
[Not reviewed]
0 6 0
Christopher Esterhuyse - 5 years ago 2020-02-05 18:07:44
christopheresterhuyse@gmail.com
fixed natives sometimes choosing incomplete branches as solutions
6 files changed with 31 insertions and 20 deletions:
0 comments (0 inline, 0 general)
src/protocol/mod.rs
Show inline comments
 
mod ast;
 
mod eval;
 
pub mod inputsource;
 
mod lexer;
 
mod library;
 
mod parser;
 

	
 
use crate::common::*;
 
use crate::protocol::ast::*;
 
use crate::protocol::eval::*;
 
use crate::protocol::inputsource::*;
 
use crate::protocol::parser::*;
 
use std::hint::unreachable_unchecked;
 

	
 
pub struct ProtocolDescriptionImpl {
 
    heap: Heap,
 
    source: InputSource,
 
    root: RootId
 
    root: RootId,
 
}
 

	
 
impl std::fmt::Debug for ProtocolDescriptionImpl {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 
        write!(f, "Protocol")
 
    }
 
}
 

	
 
impl ProtocolDescription for ProtocolDescriptionImpl {
 
    type S = ComponentStateImpl;
 

	
 
    fn parse(buffer: &[u8]) -> Result<Self, String> {
 
        let mut heap = Heap::new();
 
        let mut source = InputSource::from_buffer(buffer).unwrap();
 
        let mut parser = Parser::new(&mut source);
 
        match parser.parse(&mut heap) {
 
            Ok(root) => {
 
                return Ok(ProtocolDescriptionImpl { heap, source, root });
 
            }
 
            Err(err) => {
 
                let mut vec: Vec<u8> = Vec::new();
 
                err.write(&source, &mut vec).unwrap();
 
                Err(String::from_utf8_lossy(&vec).to_string())
 
            }
 
        }
 
    }
 
    fn component_polarities(&self, identifier: &[u8]) -> Result<Vec<Polarity>, MainComponentErr> {
 
        let h = &self.heap;
 
        let root = &h[self.root];
 
        let def = root.get_definition_ident(h, identifier);
 
        if def.is_none() {
 
            return Err(MainComponentErr::NoSuchComponent);
 
        }
 
        let def = &h[def.unwrap()];
 
        if !def.is_component() {
 
            return Err(MainComponentErr::NoSuchComponent);
 
        }
 
        for &param in def.parameters().iter() {
 
            let param = &h[param];
 
            let type_annot = &h[param.type_annotation];
 
            if type_annot.the_type.array {
 
                return Err(MainComponentErr::NonPortTypeParameters);
 
            }
 
            match type_annot.the_type.primitive {
 
                PrimitiveType::Input | PrimitiveType::Output => continue,
 
                _ => {
 
                    return Err(MainComponentErr::NonPortTypeParameters);
 
                }
 
            }
 
        }
 
        let mut result = Vec::new();
 
        for &param in def.parameters().iter() {
 
            let param = &h[param];
 
            let type_annot = &h[param.type_annotation];
 
            let ptype = &type_annot.the_type.primitive;
 
            if ptype == &PrimitiveType::Input {
 
                result.push(Polarity::Getter)
 
            } else if ptype == &PrimitiveType::Output {
 
                result.push(Polarity::Putter)
 
            } else {
 
                unreachable!()
 
            }
 
        }
 
        Ok(result)
 
    }
 
    fn new_main_component(&self, identifier: &[u8], ports: &[Key]) -> ComponentStateImpl {
 
        let mut args = Vec::new();
 
        for (&x, y) in ports.iter().zip(self.component_polarities(identifier).unwrap()) {
 
            match y {
 
                Polarity::Getter => args.push(Value::Input(InputValue(x))),
 
                Polarity::Putter => args.push(Value::Output(OutputValue(x)))
 
                Polarity::Putter => args.push(Value::Output(OutputValue(x))),
 
            }
 
        }
 
        let h = &self.heap;
 
        let root = &h[self.root];
 
        let def = root.get_definition_ident(h, identifier).unwrap();
 
        ComponentStateImpl {
 
            prompt: Prompt::new(h, def, &args)
 
        }
 
        ComponentStateImpl { prompt: Prompt::new(h, def, &args) }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct ComponentStateImpl {
 
    prompt: Prompt,
 
}
 
impl ComponentState for ComponentStateImpl {
 
    type D = ProtocolDescriptionImpl;
 

	
 
    fn pre_sync_run<C: MonoContext<D = ProtocolDescriptionImpl, S = Self>>(
 
        &mut self,
 
        context: &mut C,
 
        pd: &ProtocolDescriptionImpl,
 
    ) -> MonoBlocker {
 
        let mut context = EvalContext::Mono(context);
 
        loop {
 
            let result = self.prompt.step(&pd.heap, &mut context);
 
            match result {
 
                // In component definitions, there are no return statements
 
                Ok(_) => unreachable!(),
 
                Err(cont) => match cont {
 
                    EvalContinuation::Stepping => continue,
 
                    EvalContinuation::Inconsistent => return MonoBlocker::Inconsistent,
 
                    EvalContinuation::Terminal => return MonoBlocker::ComponentExit,
 
                    EvalContinuation::SyncBlockStart => return MonoBlocker::SyncBlockStart,
 
                    // Not possible to end sync block if never entered one
 
                    EvalContinuation::SyncBlockEnd => unreachable!(),
 
                    EvalContinuation::NewComponent(args) => {
 
                        todo!();
 
                        continue;
 
                    }
 
                    // Outside synchronous blocks, no fires/get/put happens
 
                    EvalContinuation::BlockFires(val) => unreachable!(),
 
                    EvalContinuation::BlockGet(val) => unreachable!(),
 
                    EvalContinuation::Put(port, msg) => unreachable!(),
 
                },
 
            }
 
        }
 
    }
 

	
 
    fn sync_run<C: PolyContext<D = ProtocolDescriptionImpl>>(
 
        &mut self,
 
        context: &mut C,
 
        pd: &ProtocolDescriptionImpl,
 
    ) -> PolyBlocker {
 
        let mut context = EvalContext::Poly(context);
 
        loop {
 
            let result = self.prompt.step(&pd.heap, &mut context);
 
            match result {
 
                // Inside synchronous blocks, there are no return statements
 
                Ok(_) => unreachable!(),
 
                Err(cont) => match cont {
 
                    EvalContinuation::Stepping => continue,
 
                    EvalContinuation::Inconsistent => return PolyBlocker::Inconsistent,
 
                    // First need to exit synchronous block before definition may end
 
                    EvalContinuation::Terminal => unreachable!(),
 
                    // No nested synchronous blocks
 
                    EvalContinuation::SyncBlockStart => unreachable!(),
 
                    EvalContinuation::SyncBlockEnd => return PolyBlocker::SyncBlockEnd,
 
                    // Not possible to create component in sync block
 
                    EvalContinuation::NewComponent(args) => unreachable!(),
 
                    EvalContinuation::BlockFires(port) => match port {
 
                        Value::Output(OutputValue(key)) => {
 
                            return PolyBlocker::CouldntCheckFiring(key);
 
                        }
 
                        Value::Input(InputValue(key)) => {
 
                            return PolyBlocker::CouldntCheckFiring(key);
 
                        }
 
                        _ => unreachable!(),
 
                    },
 
                    EvalContinuation::BlockGet(port) => match port {
 
                        Value::Output(OutputValue(key)) => {
 
                            return PolyBlocker::CouldntReadMsg(key);
 
                        }
 
                        Value::Input(InputValue(key)) => {
 
                            return PolyBlocker::CouldntReadMsg(key);
 
                        }
 
                        _ => unreachable!(),
 
                    },
 
                    EvalContinuation::Put(port, message) => {
 
                        let key;
 
                        match port {
 
                            Value::Output(OutputValue(the_key)) => {
 
                                key = the_key;
 
                            }
 
                            Value::Input(InputValue(the_key)) => {
 
                                key = the_key;
 
                            }
 
                            _ => unreachable!(),
 
                        }
 
                        let payload;
 
                        match message {
 
                            Value::Message(MessageValue(None)) => {
 
                                // Putting a null message is inconsistent
 
                                return PolyBlocker::Inconsistent;
 
                            }
 
                            Value::Message(MessageValue(Some(buffer))) => {
 
                                // Create a copy of the payload
 
                                payload = buffer.clone();
 
                            }
 
                            _ => unreachable!(),
 
                        }
 
                        return PolyBlocker::PutMsg(key, payload);
 
                    }
 
                },
 
            }
 
        }
 
    }
 
}
 

	
 
pub enum EvalContext<'a> {
 
    Mono(&'a mut dyn MonoContext<D = ProtocolDescriptionImpl, S = ComponentStateImpl>),
 
    Poly(&'a mut dyn PolyContext<D = ProtocolDescriptionImpl>),
 
    None,
 
}
 
impl EvalContext<'_> {
 
    fn random(&mut self) -> LongValue {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(context) => todo!(),
 
            EvalContext::Poly(context) => unreachable!(),
 
        }
 
    }
 
    fn channel(&mut self) -> (Value, Value) {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(context) => unreachable!(),
 
            EvalContext::Poly(context) => todo!(),
 
        }
 
    }
 
    fn fires(&mut self, port: Value) -> Option<Value> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(context) => unreachable!(),
 
            EvalContext::Poly(context) => match port {
 
                Value::Output(OutputValue(key)) => context.is_firing(key).map(Value::from),
 
                Value::Input(InputValue(key)) => context.is_firing(key).map(Value::from),
 
                _ => unreachable!(),
 
            },
 
        }
 
    }
 
    fn get(&mut self, port: Value) -> Option<Value> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(context) => unreachable!(),
 
            EvalContext::Poly(context) => match port {
 
                Value::Output(OutputValue(key)) => {
 
                    context.read_msg(key).map(Value::receive_message)
 
                }
 
                Value::Input(InputValue(key)) => context.read_msg(key).map(Value::receive_message),
 
                _ => unreachable!(),
 
            },
 
        }
 
    }
 
}
src/runtime/actors.rs
Show inline comments
 
@@ -3,395 +3,399 @@ use crate::runtime::{endpoint::*, *};
 

	
 
#[derive(Debug)]
 
pub(crate) struct MonoN {
 
    pub ekeys: HashSet<Key>,
 
    pub result: Option<(usize, HashMap<Key, Payload>)>,
 
}
 
#[derive(Debug)]
 
pub(crate) struct PolyN {
 
    pub ekeys: HashSet<Key>,
 
    pub branches: HashMap<Predicate, BranchN>,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) struct BranchN {
 
    pub to_get: HashSet<Key>,
 
    pub gotten: HashMap<Key, Payload>,
 
    pub sync_batch_index: usize,
 
}
 

	
 
#[derive(Debug)]
 
pub struct MonoP {
 
    pub state: ProtocolS,
 
    pub ekeys: HashSet<Key>,
 
}
 
#[derive(Debug)]
 
pub(crate) struct PolyP {
 
    pub incomplete: HashMap<Predicate, BranchP>,
 
    pub complete: HashMap<Predicate, BranchP>,
 
    pub ekeys: HashSet<Key>,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) struct BranchP {
 
    pub outbox: HashMap<Key, Payload>,
 
    pub inbox: HashMap<Key, Payload>,
 
    pub state: ProtocolS,
 
}
 

	
 
//////////////////////////////////////////////////////////////////
 

	
 
impl PolyP {
 
    pub(crate) fn poly_run(
 
        &mut self,
 
        m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
        let to_run: Vec<_> = self.incomplete.drain().collect();
 
        self.poly_run_these_branches(m_ctx, protocol_description, to_run)
 
    }
 

	
 
    pub(crate) fn poly_run_these_branches(
 
        &mut self,
 
        mut m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
        mut to_run: Vec<(Predicate, BranchP)>,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
        use SyncRunResult as Srr;
 
        log!(&mut m_ctx.inner.logger, "~ Running branches for PolyP {:?}!", m_ctx.my_subtree_id,);
 
        'to_run_loop: while let Some((mut predicate, mut branch)) = to_run.pop() {
 
            let mut r_ctx = BranchPContext {
 
                m_ctx: m_ctx.reborrow(),
 
                ekeys: &self.ekeys,
 
                predicate: &predicate,
 
                inbox: &branch.inbox,
 
            };
 
            use PolyBlocker as Sb;
 
            let blocker = branch.state.sync_run(&mut r_ctx, protocol_description);
 
            log!(
 
                &mut r_ctx.m_ctx.inner.logger,
 
                "~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}",
 
                r_ctx.m_ctx.my_subtree_id,
 
                &predicate,
 
                &blocker
 
            );
 
            match blocker {
 
                Sb::Inconsistent => {} // DROP
 
                Sb::CouldntReadMsg(ekey) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let channel_id =
 
                        r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                    log!(
 
                        &mut r_ctx.m_ctx.inner.logger,
 
                        "~ ... {:?} couldnt read msg for port {:?}. has inbox {:?}",
 
                        r_ctx.m_ctx.my_subtree_id,
 
                        channel_id,
 
                        &branch.inbox,
 
                    );
 
                    if predicate.replace_assignment(channel_id, true) != Some(false) {
 
                        // don't rerun now. Rerun at next `sync_run`
 

	
 
                        log!(&mut m_ctx.inner.logger, "~ ... Delay {:?}", m_ctx.my_subtree_id,);
 
                        self.incomplete.insert(predicate, branch);
 
                    } else {
 
                        log!(&mut m_ctx.inner.logger, "~ ... Drop {:?}", m_ctx.my_subtree_id,);
 
                    }
 
                    // ELSE DROP
 
                }
 
                Sb::CouldntCheckFiring(ekey) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let channel_id =
 
                        r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                    // split the branch!
 
                    let branch_f = branch.clone();
 
                    let mut predicate_f = predicate.clone();
 
                    if predicate_f.replace_assignment(channel_id, false).is_some() {
 
                        panic!("OI HANS QUERY FIRST!");
 
                    }
 
                    assert!(predicate.replace_assignment(channel_id, true).is_none());
 
                    to_run.push((predicate, branch));
 
                    to_run.push((predicate_f, branch_f));
 
                }
 
                Sb::SyncBlockEnd => {
 
                    let ControllerInner { logger, endpoint_exts, .. } = m_ctx.inner;
 
                    log!(
 
                        logger,
 
                        "~ ... ran {:?} reached SyncBlockEnd with pred {:?} ...",
 
                        m_ctx.my_subtree_id,
 
                        &predicate,
 
                    );
 
                    // come up with the predicate for this local solution
 

	
 
                    for ekey in self.ekeys.iter() {
 
                        let channel_id = endpoint_exts.get(*ekey).unwrap().info.channel_id;
 
                        let fired =
 
                            branch.inbox.contains_key(ekey) || branch.outbox.contains_key(ekey);
 
                        match predicate.query(channel_id) {
 
                            Some(true) => {
 
                                if !fired {
 
                                    // This branch should have fired but didn't!
 
                                    log!(
 
                                        logger,
 
                                        "~ ... ... should have fired {:?} and didn't! pruning!",
 
                                        channel_id,
 
                                    );
 
                                    continue 'to_run_loop;
 
                                }
 
                            }
 
                            Some(false) => assert!(!fired),
 
                            None => {
 
                                predicate.replace_assignment(channel_id, false);
 
                                assert!(!fired)
 
                            }
 
                        }
 
                    }
 
                    log!(logger, "~ ... ... and finished just fine!",);
 
                    m_ctx.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut m_ctx.inner.logger,
 
                        m_ctx.my_subtree_id,
 
                        predicate.clone(),
 
                    );
 
                    self.complete.insert(predicate, branch);
 
                }
 
                Sb::PutMsg(ekey, payload) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let EndpointExt { info, endpoint } =
 
                        m_ctx.inner.endpoint_exts.get_mut(ekey).unwrap();
 
                    if predicate.replace_assignment(info.channel_id, true) != Some(false) {
 
                        branch.outbox.insert(ekey, payload.clone());
 
                        let msg = CommMsgContents::SendPayload {
 
                            payload_predicate: predicate.clone(),
 
                            payload,
 
                        }
 
                        .into_msg(m_ctx.inner.round_index);
 
                        endpoint.send(msg)?;
 
                        to_run.push((predicate, branch));
 
                    }
 
                    // ELSE DROP
 
                }
 
            }
 
        }
 
        // all in self.incomplete most recently returned Blocker::CouldntReadMsg
 
        Ok(if self.incomplete.is_empty() {
 
            if self.complete.is_empty() {
 
                Srr::NoBranches
 
            } else {
 
                Srr::AllBranchesComplete
 
            }
 
        } else {
 
            Srr::BlockingForRecv
 
        })
 
    }
 

	
 
    pub(crate) fn poly_recv_run(
 
        &mut self,
 
        m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
        ekey: Key,
 
        payload_predicate: Predicate,
 
        payload: Payload,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
        // try exact match
 

	
 
        let to_run = if self.complete.contains_key(&payload_predicate) {
 
            // exact match with stopped machine
 

	
 
            log!(
 
                &mut m_ctx.inner.logger,
 
                "... poly_recv_run matched stopped machine exactly! nothing to do here",
 
            );
 
            vec![]
 
        } else if let Some(mut branch) = self.incomplete.remove(&payload_predicate) {
 
            // exact match with running machine
 

	
 
            log!(
 
                &mut m_ctx.inner.logger,
 
                "... poly_recv_run matched running machine exactly! pred is {:?}",
 
                &payload_predicate
 
            );
 
            branch.inbox.insert(ekey, payload);
 
            vec![(payload_predicate, branch)]
 
        } else {
 
            log!(
 
                &mut m_ctx.inner.logger,
 
                "... poly_recv_run didn't have any exact matches... Let's try feed it to all branches",
 

	
 
            );
 
            let mut incomplete2 = HashMap::<_, _>::default();
 
            let to_run = self
 
                .incomplete
 
                .drain()
 
                .filter_map(|(old_predicate, mut branch)| {
 
                    use CommonSatResult as Csr;
 
                    match old_predicate.common_satisfier(&payload_predicate) {
 
                        Csr::FormerNotLatter | Csr::Equivalent => {
 
                            log!(
 
                                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run This branch is compatible unaltered! branch pred: {:?}",
 
                                &old_predicate
 
                            );
 
                            // old_predicate COVERS the assumptions of payload_predicate
 
                            let was = branch.inbox.insert(ekey, payload.clone());
 
                            assert!(was.is_none()); // INBOX MUST BE EMPTY!
 
                            Some((old_predicate, branch))
 
                        }
 
                        Csr::New(new) => {
 
                            log!(
 
                                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run payloadpred {:?} and branchpred {:?} satisfied by new pred {:?}. FORKING",
 
                                &payload_predicate,
 
                                &old_predicate,
 
                                &new,
 
                            );
 
                            // payload_predicate has new assumptions. FORK!
 
                            let mut payload_branch = branch.clone();
 
                            let was = payload_branch.inbox.insert(ekey, payload.clone());
 
                            assert!(was.is_none()); // INBOX MUST BE EMPTY!
 

	
 
                            // put the original back untouched
 
                            incomplete2.insert(old_predicate, branch);
 
                            Some((new, payload_branch))
 
                        }
 
                        Csr::LatterNotFormer => {
 
                            log!(
 
                                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run payloadpred {:?} subsumes branch pred {:?}. FORKING",
 
                                &old_predicate,
 
                                &payload_predicate,
 
                            );
 
                            // payload_predicate has new assumptions. FORK!
 
                            let mut payload_branch = branch.clone();
 
                            let was = payload_branch.inbox.insert(ekey, payload.clone());
 
                            assert!(was.is_none()); // INBOX MUST BE EMPTY!
 

	
 
                            // put the original back untouched
 
                            incomplete2.insert(old_predicate, branch);
 
                            Some((payload_predicate.clone(), payload_branch))
 
                        }
 
                        Csr::Nonexistant => {
 
                            log!(
 
                                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run SKIPPING because branchpred={:?}. payloadpred={:?}",
 
                                &old_predicate,
 
                                &payload_predicate,
 
                            );
 
                            // predicates contradict
 
                            incomplete2.insert(old_predicate, branch);
 
                            None
 
                        }
 
                    }
 
                })
 
                .collect();
 
            std::mem::swap(&mut self.incomplete, &mut incomplete2);
 
            to_run
 
        };
 
        log!(
 
            &mut m_ctx.inner.logger,
 
            "... DONE FEEDING BRANCHES. {} branches to run!",
 
            to_run.len(),
 
        );
 
        self.poly_run_these_branches(m_ctx, protocol_description, to_run)
 
    }
 

	
 
    pub(crate) fn become_mono(
 
        mut self,
 
        decision: &Predicate,
 
        table_row: &mut HashMap<Key, Payload>,
 
    ) -> MonoP {
 
        if let Some((_, branch)) = self.complete.drain().find(|(p, _)| decision.satisfies(p)) {
 
            let BranchP { inbox, state, outbox } = branch;
 
            for (key, payload) in inbox.into_iter().chain(outbox.into_iter()) {
 
                table_row.insert(key, payload);
 
            }
 
            self.incomplete.clear();
 
            MonoP { state, ekeys: self.ekeys }
 
        } else {
 
            panic!("No such solution!")
 
        }
 
    }
 
}
 

	
 
impl PolyN {
 
    pub fn sync_recv(
 
        &mut self,
 
        ekey: Key,
 
        logger: &mut String,
 
        payload: Payload,
 
        payload_predicate: Predicate,
 
        solution_storage: &mut SolutionStorage,
 
    ) {
 
        let mut branches2: HashMap<_, _> = Default::default();
 
        for (old_predicate, mut branch) in self.branches.drain() {
 
            use CommonSatResult as Csr;
 
            let case = old_predicate.common_satisfier(&payload_predicate);
 
            let mut report_if_solution =
 
                |branch: &BranchN, pred: &Predicate, logger: &mut String| {
 
                    if branch.to_get.is_empty() {
 
                        solution_storage.submit_and_digest_subtree_solution(
 
                            logger,
 
                            SubtreeId::PolyN,
 
                            pred.clone(),
 
                        );
 
                    }
 
                };
 
            log!(
 
                logger,
 
                "Feeding msg {:?} {:?} to native branch with pred {:?}. Predicate case {:?}",
 
                &payload_predicate,
 
                &payload,
 
                &old_predicate,
 
                &case
 
            );
 
            match case {
 
                Csr::Nonexistant => { /* skip branch */ }
 
                Csr::FormerNotLatter | Csr::Equivalent => {
 
                    // Feed the message to this branch in-place. no need to modify pred.
 
                    if branch.to_get.remove(&ekey) {
 
                        branch.gotten.insert(ekey, payload.clone());
 
                        report_if_solution(&branch, &old_predicate, logger);
 
                    }
 
                }
 
                Csr::LatterNotFormer => {
 
                    // create a new branch with the payload_predicate.
 
                    let mut forked = branch.clone();
 
                    if forked.to_get.remove(&ekey) {
 
                        forked.gotten.insert(ekey, payload.clone());
 
                        report_if_solution(&forked, &payload_predicate, logger);
 
                        branches2.insert(payload_predicate.clone(), forked);
 
                    }
 
                }
 
                Csr::New(new) => {
 
                    // create a new branch with the newly-created predicate
 
                    let mut forked = branch.clone();
 
                    if forked.to_get.remove(&ekey) {
 
                        forked.gotten.insert(ekey, payload.clone());
 
                        report_if_solution(&forked, &new, logger);
 
                        branches2.insert(new.clone(), forked);
 
                    }
 
                }
 
            }
 
            // unlike PolyP machines, Native branches do not become inconsistent
 
            branches2.insert(old_predicate, branch);
 
        }
 
        log!(
 
            logger,
 
            "Native now has {} branches with predicates: {:?}",
 
            branches2.len(),
 
            branches2.keys().collect::<Vec<_>>()
 
        );
 
        std::mem::swap(&mut branches2, &mut self.branches);
 
    }
 

	
 
    pub fn become_mono(
 
        mut self,
 
        decision: &Predicate,
 
        table_row: &mut HashMap<Key, Payload>,
 
    ) -> MonoN {
 
        if let Some((_, branch)) = self.branches.drain().find(|(p, _)| decision.satisfies(p)) {
 
        if let Some((_, branch)) = self
 
            .branches
 
            .drain()
 
            .find(|(p, branch)| branch.to_get.is_empty() && decision.satisfies(p))
 
        {
 
            let BranchN { gotten, sync_batch_index, .. } = branch;
 
            for (&key, payload) in gotten.iter() {
 
                assert!(table_row.insert(key, payload.clone()).is_none());
 
            }
 
            MonoN { ekeys: self.ekeys, result: Some((sync_batch_index, gotten)) }
 
        } else {
 
            panic!("No such solution!")
 
        }
 
    }
 
}
src/runtime/connector.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{errors::*, *};
 

	
 
pub fn random_controller_id() -> ControllerId {
 
    type Bytes8 = [u8; std::mem::size_of::<ControllerId>()];
 
    let mut bytes = Bytes8::default();
 
    getrandom::getrandom(&mut bytes).unwrap();
 
    unsafe { std::mem::transmute::<Bytes8, ControllerId>(bytes) }
 
}
 

	
 
impl Default for Unconfigured {
 
    fn default() -> Self {
 
        let controller_id = random_controller_id();
 
        Self { controller_id }
 
    }
 
}
 
impl Default for Connector {
 
    fn default() -> Self {
 
        Self::Unconfigured(Unconfigured::default())
 
    }
 
}
 
impl Connector {
 
    /// Configure the Connector with the given Pdl description.
 
    pub fn configure(&mut self, pdl: &[u8], main_component: &[u8]) -> Result<(), ConfigErr> {
 
        use ConfigErr::*;
 
        let controller_id = match self {
 
            Connector::Configured(_) => return Err(AlreadyConfigured),
 
            Connector::Connected(_) => return Err(AlreadyConnected),
 
            Connector::Unconfigured(Unconfigured { controller_id }) => *controller_id,
 
        };
 
        let protocol_description = Arc::new(ProtocolD::parse(pdl).map_err(ParseErr)?);
 
        let polarities = protocol_description.component_polarities(main_component)?;
 
        let configured = Configured {
 
            controller_id,
 
            protocol_description,
 
            bindings: Default::default(),
 
            polarities,
 
            main_component: main_component.to_vec(),
 
        };
 
        *self = Connector::Configured(configured);
 
        Ok(())
 
    }
 

	
 
    /// Bind the (configured) connector's port corresponding to the
 
    pub fn bind_port(
 
        &mut self,
 
        proto_port_index: usize,
 
        binding: PortBinding,
 
    ) -> Result<(), PortBindErr> {
 
        use PortBindErr::*;
 
        match self {
 
            Connector::Unconfigured { .. } => Err(NotConfigured),
 
            Connector::Connected(_) => Err(AlreadyConnected),
 
            Connector::Configured(configured) => {
 
                if configured.polarities.len() <= proto_port_index {
 
                    return Err(IndexOutOfBounds);
 
                }
 
                configured.bindings.insert(proto_port_index, binding);
 
                Ok(())
 
            }
 
        }
 
    }
 
    pub fn connect(&mut self, timeout: Duration) -> Result<(), ConnectErr> {
 
        let deadline = Instant::now() + timeout;
 
        use ConnectErr::*;
 
        let configured = match self {
 
            Connector::Unconfigured { .. } => return Err(NotConfigured),
 
            Connector::Connected(_) => return Err(AlreadyConnected),
 
            Connector::Configured(configured) => configured,
 
        };
 
        // 1. Unwrap bindings or err
 
        let bound_proto_interface: Vec<(_, _)> = configured
 
            .proto_maybe_bindings
 
            .polarities
 
            .iter()
 
            .copied()
 
            .enumerate()
 
            .map(|(native_index, (polarity, maybe_binding))| {
 
                Ok((maybe_binding.ok_or(PortNotBound { native_index })?, polarity))
 
            .map(|(native_index, polarity)| {
 
                let binding = configured
 
                    .bindings
 
                    .get(&native_index)
 
                    .copied()
 
                    .ok_or(PortNotBound { native_index })?;
 
                Ok((binding, polarity))
 
            })
 
            .collect::<Result<Vec<(_, _)>, ConnectErr>>()?;
 
        let (controller, native_interface) = Controller::connect(
 
            configured.controller_id,
 
            &configured.main_component,
 
            configured.protocol_description.clone(),
 
            &bound_proto_interface[..],
 
            deadline,
 
        )?;
 
        *self = Connector::Connected(Connected {
 
            native_interface,
 
            sync_batches: vec![Default::default()],
 
            controller,
 
        });
 
        Ok(())
 
    }
 
    pub fn get_mut_logger(&mut self) -> Option<&mut String> {
 
        match self {
 
            Connector::Connected(connected) => Some(&mut connected.controller.inner.logger),
 
            _ => None,
 
        }
 
    }
 

	
 
    pub fn put(&mut self, native_port_index: usize, payload: Payload) -> Result<(), PortOpErr> {
 
        use PortOpErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 
        let (ekey, native_polarity) =
 
            *connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?;
 
        if native_polarity != Putter {
 
            return Err(WrongPolarity);
 
        }
 
        let sync_batch = connected.sync_batches.iter_mut().last().unwrap();
 
        if sync_batch.puts.contains_key(&ekey) {
 
            return Err(DuplicateOperation);
 
        }
 
        sync_batch.puts.insert(ekey, payload);
 
        Ok(())
 
    }
 

	
 
    pub fn get(&mut self, native_port_index: usize) -> Result<(), PortOpErr> {
 
        use PortOpErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 
        let (ekey, native_polarity) =
 
            *connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?;
 
        if native_polarity != Getter {
 
            return Err(WrongPolarity);
 
        }
 
        let sync_batch = connected.sync_batches.iter_mut().last().unwrap();
 
        if sync_batch.gets.contains(&ekey) {
 
            return Err(DuplicateOperation);
 
        }
 
        sync_batch.gets.insert(ekey);
 
        Ok(())
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, ()> {
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(()),
 
        };
 
        connected.sync_batches.push(SyncBatch::default());
 
        Ok(connected.sync_batches.len() - 1)
 
    }
 

	
 
    pub fn sync(&mut self, timeout: Duration) -> Result<usize, SyncErr> {
 
        let deadline = Instant::now() + timeout;
 
        use SyncErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 

	
 
        // do the synchronous round!
 
        connected.controller.sync_round(deadline, Some(connected.sync_batches.drain(..)))?;
 
        connected.sync_batches.push(SyncBatch::default());
 

	
 
        let mono_n = connected.controller.inner.mono_n.as_mut().unwrap();
 
        let result = mono_n.result.as_mut().unwrap();
 
        Ok(result.0)
 
    }
 

	
 
    pub fn read_gotten(&self, native_port_index: usize) -> Result<&[u8], ReadGottenErr> {
 
        use ReadGottenErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 
        let &(key, polarity) =
 
            connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?;
 
        if polarity != Getter {
 
            return Err(WrongPolarity);
 
        }
 
        let mono_n = connected.controller.inner.mono_n.as_ref().expect("controller has no mono_n?");
 
        let result = mono_n.result.as_ref().ok_or(NoPreviousRound)?;
 
        let payload = result.1.get(&key).ok_or(DidntGet)?;
 
        Ok(payload)
 
    }
 
}
src/runtime/mod.rs
Show inline comments
 
#[cfg(feature = "ffi")]
 
pub mod ffi;
 

	
 
mod actors;
 
pub(crate) mod communication;
 
pub(crate) mod connector;
 
pub(crate) mod endpoint;
 
pub mod errors;
 
mod predicate; // TODO later
 
mod serde;
 
pub(crate) mod setup;
 

	
 
pub(crate) type ProtocolD = crate::protocol::ProtocolDescriptionImpl;
 
pub(crate) type ProtocolS = crate::protocol::ComponentStateImpl;
 

	
 
use crate::common::*;
 
use actors::*;
 
use endpoint::*;
 
use errors::*;
 

	
 
#[derive(Debug, PartialEq)]
 
pub(crate) enum CommonSatResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
    New(Predicate),
 
    Nonexistant,
 
}
 

	
 
#[derive(Clone, Eq, PartialEq, Hash)]
 
pub(crate) struct Predicate {
 
    pub assigned: BTreeMap<ChannelId, bool>,
 
}
 

	
 
#[derive(Debug, Default)]
 
struct SyncBatch {
 
    puts: HashMap<Key, Payload>,
 
    gets: HashSet<Key>,
 
}
 

	
 
#[derive(Debug)]
 
pub enum Connector {
 
    Unconfigured(Unconfigured),
 
    Configured(Configured),
 
    Connected(Connected), // TODO consider boxing. currently takes up a lot of stack real estate
 
}
 
#[derive(Debug)]
 
pub struct Unconfigured {
 
    pub controller_id: ControllerId,
 
}
 
#[derive(Debug)]
 
pub struct Configured {
 
    controller_id: ControllerId,
 
    polarities: Vec<Polarity>,
 
    bindings: HashMap<usize, PortBinding>,
 
    protocol_description: Arc<ProtocolD>,
 
    main_component: Vec<u8>,
 
}
 
#[derive(Debug)]
 
pub struct Connected {
 
    native_interface: Vec<(Key, Polarity)>,
 
    sync_batches: Vec<SyncBatch>,
 
    controller: Controller,
 
}
 

	
 
#[derive(Debug, Copy, Clone)]
 
pub enum PortBinding {
 
    Native,
 
    Active(SocketAddr),
 
    Passive(SocketAddr),
 
}
 

	
 
#[derive(Debug)]
 
struct Arena<T> {
 
    storage: Vec<T>,
 
}
 

	
 
#[derive(Debug)]
 
struct ReceivedMsg {
 
    recipient: Key,
 
    msg: Msg,
 
}
 

	
 
#[derive(Debug)]
 
struct MessengerState {
 
    poll: Poll,
 
    events: Events,
 
    delayed: Vec<ReceivedMsg>,
 
    undelayed: Vec<ReceivedMsg>,
 
    polled_undrained: IndexSet<Key>,
 
}
 
#[derive(Debug)]
 
struct ChannelIdStream {
 
    controller_id: ControllerId,
 
    next_channel_index: ChannelIndex,
 
}
 

	
 
#[derive(Debug)]
 
struct Controller {
 
    protocol_description: Arc<ProtocolD>,
 
    inner: ControllerInner,
 
    ephemeral: ControllerEphemeral,
 
}
 
#[derive(Debug)]
 
struct ControllerInner {
 
    round_index: usize,
 
    channel_id_stream: ChannelIdStream,
 
    endpoint_exts: Arena<EndpointExt>,
 
    messenger_state: MessengerState,
 
    mono_n: Option<MonoN>,
 
    mono_ps: Vec<MonoP>,
 
    family: ControllerFamily,
 
    logger: String,
 
}
 

	
 
/// This structure has its state entirely reset between synchronous rounds
 
#[derive(Debug, Default)]
 
struct ControllerEphemeral {
 
    solution_storage: SolutionStorage,
 
    poly_n: Option<PolyN>,
 
    poly_ps: Vec<PolyP>,
 
    ekey_to_holder: HashMap<Key, PolyId>,
 
}
 

	
 
#[derive(Debug)]
 
struct ControllerFamily {
 
    parent_ekey: Option<Key>,
 
    children_ekeys: Vec<Key>,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum SyncRunResult {
 
    BlockingForRecv,
 
    AllBranchesComplete,
 
    NoBranches,
 
}
 

	
 
// Used to identify poly actors
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
enum PolyId {
 
    N,
 
    P { index: usize },
 
}
 

	
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
pub(crate) enum SubtreeId {
 
    PolyN,
 
    PolyP { index: usize },
 
    ChildController { ekey: Key },
 
}
 

	
 
pub(crate) struct MonoPContext<'a> {
 
    inner: &'a mut ControllerInner,
 
    ekeys: &'a mut HashSet<Key>,
 
}
 
pub(crate) struct PolyPContext<'a> {
 
    my_subtree_id: SubtreeId,
 
    inner: &'a mut ControllerInner,
 
    solution_storage: &'a mut SolutionStorage,
 
}
 
impl PolyPContext<'_> {
 
    #[inline(always)]
 
    fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> {
 
        let Self { solution_storage, my_subtree_id, inner } = self;
 
        PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner }
 
    }
 
}
 
struct BranchPContext<'m, 'r> {
 
    m_ctx: PolyPContext<'m>,
 
    ekeys: &'r HashSet<Key>,
 
    predicate: &'r Predicate,
 
    inbox: &'r HashMap<Key, Payload>,
 
}
 

	
 
#[derive(Debug, Default)]
 
pub(crate) struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
 

	
 
trait Messengerlike {
 
    fn get_state_mut(&mut self) -> &mut MessengerState;
 
    fn get_endpoint_mut(&mut self, eekey: Key) -> &mut Endpoint;
 

	
 
    fn delay(&mut self, received: ReceivedMsg) {
 
        self.get_state_mut().delayed.push(received);
 
    }
 
    fn undelay_all(&mut self) {
 
        let MessengerState { delayed, undelayed, .. } = self.get_state_mut();
 
        undelayed.extend(delayed.drain(..))
 
    }
 

	
 
    fn send(&mut self, to: Key, msg: Msg) -> Result<(), EndpointErr> {
 
        self.get_endpoint_mut(to).send(msg)
 
    }
 

	
 
    // attempt to receive a message from one of the endpoints before the deadline
 
    fn recv(&mut self, deadline: Instant) -> Result<Option<ReceivedMsg>, MessengerRecvErr> {
 
        // try get something buffered
 
        if let Some(x) = self.get_state_mut().undelayed.pop() {
 
            return Ok(Some(x));
 
        }
 

	
 
        loop {
 
            // polled_undrained may not be empty
 
            while let Some(eekey) = self.get_state_mut().polled_undrained.pop() {
 
                if let Some(msg) = self.get_endpoint_mut(eekey).recv()? {
 
                    // this endpoint MAY still have messages! check again in future
 
                    self.get_state_mut().polled_undrained.insert(eekey);
 
                    return Ok(Some(ReceivedMsg { recipient: eekey, msg }));
 
                }
 
            }
 

	
 
            let state = self.get_state_mut();
 
            match state.poll_events(deadline) {
 
                Ok(()) => {
 
                    for e in state.events.iter() {
 
                        state.polled_undrained.insert(Key::from_token(e.token()));
 
                    }
 
                }
 
                Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed),
 
                Err(PollDeadlineErr::Timeout) => return Ok(None),
 
            }
 
        }
 
    }
 
}
 

	
 
/////////////////////////////////
 

	
 
impl From<EvalErr> for SyncErr {
 
    fn from(e: EvalErr) -> SyncErr {
 
        SyncErr::EvalErr(e)
 
    }
 
}
 
impl From<MessengerRecvErr> for SyncErr {
 
    fn from(e: MessengerRecvErr) -> SyncErr {
 
        SyncErr::MessengerRecvErr(e)
 
    }
 
}
 
impl From<MessengerRecvErr> for ConnectErr {
 
    fn from(e: MessengerRecvErr) -> ConnectErr {
 
        ConnectErr::MessengerRecvErr(e)
 
    }
 
}
 
impl From<EndpointErr> for MessengerRecvErr {
 
    fn from(e: EndpointErr) -> MessengerRecvErr {
 
        MessengerRecvErr::EndpointErr(e)
 
    }
 
}
 
impl<T> Default for Arena<T> {
 
    fn default() -> Self {
 
        Self { storage: vec![] }
 
    }
 
}
 
impl<T> Arena<T> {
 
    pub fn alloc(&mut self, t: T) -> Key {
 
        self.storage.push(t);
 
        Key::from_raw(self.storage.len() as u64 - 1)
 
    }
 
    pub fn get(&self, key: Key) -> Option<&T> {
 
        self.storage.get(key.to_raw() as usize)
 
    }
 
    pub fn get_mut(&mut self, key: Key) -> Option<&mut T> {
 
        self.storage.get_mut(key.to_raw() as usize)
 
    }
 
    pub fn type_convert<X>(self, f: impl FnMut((Key, T)) -> X) -> Arena<X> {
 
        Arena { storage: self.keyspace().zip(self.storage.into_iter()).map(f).collect() }
 
    }
 
    pub fn iter(&self) -> impl Iterator<Item = (Key, &T)> {
 
        self.keyspace().zip(self.storage.iter())
 
    }
 
    pub fn len(&self) -> usize {
 
        self.storage.len()
 
    }
 
    pub fn keyspace(&self) -> impl Iterator<Item = Key> {
 
        (0..(self.storage.len() as u64)).map(Key::from_raw)
 
    }
 
}
 

	
 
impl ChannelIdStream {
 
    fn new(controller_id: ControllerId) -> Self {
 
        Self { controller_id, next_channel_index: 0 }
 
    }
 
    fn next(&mut self) -> ChannelId {
 
        self.next_channel_index += 1;
 
        ChannelId { controller_id: self.controller_id, channel_index: self.next_channel_index - 1 }
 
    }
 
}
 

	
 
impl MessengerState {
 
    // does NOT guarantee that events is non-empty
 
    fn poll_events(&mut self, deadline: Instant) -> Result<(), PollDeadlineErr> {
 
        use PollDeadlineErr::*;
 
        self.events.clear();
 
        let poll_timeout = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?;
 
        self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?;
 
        Ok(())
 
    }
 
}
 
impl From<PollDeadlineErr> for ConnectErr {
 
    fn from(e: PollDeadlineErr) -> ConnectErr {
 
        match e {
 
            PollDeadlineErr::Timeout => ConnectErr::Timeout,
 
            PollDeadlineErr::PollingFailed => ConnectErr::PollingFailed,
 
        }
 
    }
 
}
 

	
 
impl std::ops::Not for Polarity {
 
    type Output = Self;
 
    fn not(self) -> Self::Output {
 
        use Polarity::*;
 
        match self {
 
            Putter => Getter,
 
            Getter => Putter,
 
        }
 
    }
 
}
 

	
 
impl Predicate {
 
    // returns true IFF self.unify would return Equivalent OR FormerNotLatter
 
    pub fn satisfies(&self, other: &Self) -> bool {
 
        let mut s_it = self.assigned.iter();
 
        let mut s = if let Some(s) = s_it.next() {
 
            s
 
        } else {
 
            return other.assigned.is_empty();
 
        };
 
        for (oid, ob) in other.assigned.iter() {
 
            while s.0 < oid {
 
                s = if let Some(s) = s_it.next() {
 
                    s
 
                } else {
 
                    return false;
 
                };
 
            }
 
            if s.0 > oid || s.1 != ob {
 
                return false;
 
            }
 
        }
 
        true
 
    }
 

	
 
    /// Given self and other, two predicates, return the most general Predicate possible, N
 
    /// such that n.satisfies(self) && n.satisfies(other).
 
    /// If none exists Nonexistant is returned.
 
    /// If the resulting predicate is equivlanet to self, other, or both,
 
    /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively.
 
    /// otherwise New(N) is returned.
 
    pub fn common_satisfier(&self, other: &Self) -> CommonSatResult {
 
        use CommonSatResult::*;
 
        // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
 
        let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
 
        let [mut s, mut o] = [s_it.next(), o_it.next()];
 
        // lists of assignments in self but not other and vice versa.
 
        let [mut s_not_o, mut o_not_s] = [vec![], vec![]];
 
        loop {
 
            match [s, o] {
 
                [None, None] => break,
 
                [None, Some(x)] => {
 
                    o_not_s.push(x);
 
                    o_not_s.extend(o_it);
 
                    break;
 
                }
 
                [Some(x), None] => {
 
                    s_not_o.push(x);
 
                    s_not_o.extend(s_it);
 
                    break;
 
                }
 
                [Some((sid, sb)), Some((oid, ob))] => {
 
                    if sid < oid {
 
                        // o is missing this element
 
                        s_not_o.push((sid, sb));
 
                        s = s_it.next();
 
                    } else if sid > oid {
 
                        // s is missing this element
 
                        o_not_s.push((sid, sb));
 
                        o = o_it.next();
 
                    } else if sb != ob {
 
                        assert_eq!(sid, oid);
 
                        // both predicates assign the variable but differ on the value
 
                        return Nonexistant;
 
                    } else {
 
                        // both predicates assign the variable to the same value
 
                        s = s_it.next();
 
                        o = o_it.next();
 
                    }
 
                }
 
            }
 
        }
 
        // Observed zero inconsistencies. A unified predicate exists...
 
        match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
            [true, true] => Equivalent,       // ... equivalent to both.
 
            [false, true] => FormerNotLatter, // ... equivalent to self.
 
            [true, false] => LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
                let mut predicate = self.clone();
 
                for (&id, &b) in o_not_s {
 
                    predicate.assigned.insert(id, b);
 
                }
 
                New(predicate)
 
            }
 
        }
 
    }
 

	
 
    pub fn iter_matching(&self, value: bool) -> impl Iterator<Item = ChannelId> + '_ {
 
        self.assigned
 
            .iter()
 
            .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None })
 
    }
 

	
 
    pub fn batch_assign_nones(
 
        &mut self,
 
        channel_ids: impl Iterator<Item = ChannelId>,
 
        value: bool,
 
    ) {
 
        for channel_id in channel_ids {
 
            self.assigned.entry(channel_id).or_insert(value);
 
        }
 
    }
 
    pub fn replace_assignment(&mut self, channel_id: ChannelId, value: bool) -> Option<bool> {
 
        self.assigned.insert(channel_id, value)
 
    }
 
    pub fn union_with(&self, other: &Self) -> Option<Self> {
 
        let mut res = self.clone();
 
        for (&channel_id, &assignment_1) in other.assigned.iter() {
 
            match res.assigned.insert(channel_id, assignment_1) {
 
                Some(assignment_2) if assignment_1 != assignment_2 => return None,
 
                _ => {}
 
            }
 
        }
 
        Some(res)
 
    }
 
    pub fn query(&self, x: ChannelId) -> Option<bool> {
 
        self.assigned.get(&x).copied()
 
    }
src/runtime/setup.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{
 
    actors::{MonoN, MonoP},
 
    endpoint::*,
 
    errors::*,
 
    *,
 
};
 

	
 
#[derive(Debug)]
 
enum EndpointExtTodo {
 
    Finished(EndpointExt),
 
    ActiveConnecting { addr: SocketAddr, polarity: Polarity, stream: TcpStream },
 
    ActiveRecving { addr: SocketAddr, polarity: Polarity, endpoint: Endpoint },
 
    PassiveAccepting { addr: SocketAddr, info: EndpointInfo, listener: TcpListener },
 
    PassiveConnecting { addr: SocketAddr, info: EndpointInfo, stream: TcpStream },
 
}
 

	
 
///////////////////// IMPL /////////////////////
 
impl Controller {
 
    // Given port bindings and a protocol config, create a connector with 1 native node
 
    pub fn connect(
 
        major: ControllerId,
 
        main_component: &[u8],
 
        protocol_description: Arc<ProtocolD>,
 
        bound_proto_interface: &[(PortBinding, Polarity)],
 
        deadline: Instant,
 
    ) -> Result<(Self, Vec<(Key, Polarity)>), ConnectErr> {
 
        use ConnectErr::*;
 

	
 
        let mut logger = String::default();
 
        log!(&mut logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major);
 

	
 
        let mut channel_id_stream = ChannelIdStream::new(major);
 
        let mut endpoint_ext_todos = Arena::default();
 

	
 
        let mut ekeys_native = vec![];
 
        let mut ekeys_proto = vec![];
 
        let mut ekeys_network = vec![];
 

	
 
        let mut native_interface = vec![];
 

	
 
        /*
 
        1.  - allocate an EndpointExtTodo for every native and interface port
 
            - store all the resulting keys in two keylists for the interfaces of the native and proto components
 
                native: [a, c,    f]
 
                         |  |     |
 
                         |  |     |
 
                proto:  [b, d, e, g]
 
                               ^todo
 
                arena: <A,B,C,D,E,F,G>
 
        */
 
        for &(binding, polarity) in bound_proto_interface.iter() {
 
            match binding {
 
                PortBinding::Native => {
 
                    let channel_id = channel_id_stream.next();
 
                    let ([ekey_native, ekey_proto], native_polarity) = {
 
                        let [p, g] = Endpoint::new_memory_pair();
 
                        let mut endpoint_to_key = |endpoint, polarity| {
 
                            endpoint_ext_todos.alloc(EndpointExtTodo::Finished(EndpointExt {
 
                                endpoint,
 
                                info: EndpointInfo { polarity, channel_id },
 
                            }))
 
                        };
 
                        let pkey = endpoint_to_key(p, Putter);
 
                        let gkey = endpoint_to_key(g, Getter);
 
                        let key_pair = match polarity {
 
                            Putter => [gkey, pkey],
 
                            Getter => [pkey, gkey],
 
                        };
 
                        (key_pair, !polarity)
 
                    };
 
                    native_interface.push((ekey_native, native_polarity));
 
                    ekeys_native.push(ekey_native);
 
                    ekeys_proto.push(ekey_proto);
 
                }
 
                PortBinding::Passive(addr) => {
 
                    let channel_id = channel_id_stream.next();
 
                    let ekey_proto = endpoint_ext_todos.alloc(EndpointExtTodo::PassiveAccepting {
 
                        addr,
 
                        info: EndpointInfo { polarity, channel_id },
 
                        listener: TcpListener::bind(&addr).map_err(|_| BindFailed(addr))?,
 
                    });
 
                    ekeys_network.push(ekey_proto);
 
                    ekeys_proto.push(ekey_proto);
 
                }
 
                PortBinding::Active(addr) => {
 
                    let ekey_proto = endpoint_ext_todos.alloc(EndpointExtTodo::ActiveConnecting {
 
                        addr,
 
                        polarity,
 
                        stream: TcpStream::connect(&addr).unwrap(),
 
                    });
 
                    ekeys_network.push(ekey_proto);
 
                    ekeys_proto.push(ekey_proto);
 
                }
 
            }
 
        }
 
        log!(&mut logger, "{:03?} setup todos...", major);
 

	
 
        // 2. convert the arena to Arena<EndpointExt>  and return the
 
        let (mut messenger_state, mut endpoint_exts) =
 
            Self::finish_endpoint_ext_todos(major, &mut logger, endpoint_ext_todos, deadline)?;
 

	
 
        let n_mono = Some(MonoN { ekeys: ekeys_native.into_iter().collect(), result: None });
 
        let p_monos = vec![MonoP {
 
            state: protocol_description.new_main_component(&ekeys_proto),
 
            state: protocol_description.new_main_component(main_component, &ekeys_proto),
 
            ekeys: ekeys_proto.into_iter().collect(),
 
        }];
 

	
 
        // 6. Become a node in a sink tree, computing {PARENT, CHILDREN} from {NEIGHBORS}
 
        let family = Self::setup_sink_tree_family(
 
            major,
 
            &mut logger,
 
            &mut endpoint_exts,
 
            &mut messenger_state,
 
            ekeys_network,
 
            deadline,
 
        )?;
 

	
 
        log!(&mut logger, "CONNECT PHASE END! ~");
 
        let inner = ControllerInner {
 
            family,
 
            messenger_state,
 
            channel_id_stream,
 
            endpoint_exts,
 
            mono_ps: p_monos,
 
            mono_n: n_mono,
 
            round_index: 0,
 
            logger,
 
        };
 
        let controller = Self { protocol_description, inner, ephemeral: Default::default() };
 
        Ok((controller, native_interface))
 
    }
 

	
 
    fn test_stream_connectivity(stream: &mut TcpStream) -> bool {
 
        use std::io::Write;
 
        stream.write(&[]).is_ok()
 
    }
 

	
 
    // inserts
 
    fn finish_endpoint_ext_todos(
 
        major: ControllerId,
 
        logger: &mut String,
 
        mut endpoint_ext_todos: Arena<EndpointExtTodo>,
 
        deadline: Instant,
 
    ) -> Result<(MessengerState, Arena<EndpointExt>), ConnectErr> {
 
        use {ConnectErr::*, EndpointExtTodo::*};
 

	
 
        // 1. define and setup a poller and event loop
 
        let edge = PollOpt::edge();
 
        let [ready_r, ready_w] = [Ready::readable(), Ready::writable()];
 
        let mut ms = MessengerState {
 
            poll: Poll::new().map_err(|_| PollInitFailed)?,
 
            events: Events::with_capacity(endpoint_ext_todos.len()),
 
            delayed: vec![],
 
            undelayed: vec![],
 
            polled_undrained: Default::default(),
 
        };
 

	
 
        // 2. Register all EndpointExtTodos with ms.poll. each has one of {Endpoint, TcpStream, TcpListener}
 
        // 3. store the keyset of EndpointExtTodos which are not Finished in `to_finish`.
 
        let mut to_finish = HashSet::<_>::default();
 
        log!(logger, "endpoint_ext_todos len {:?}", endpoint_ext_todos.len());
 
        for (key, t) in endpoint_ext_todos.iter() {
 
            let token = key.to_token();
 
            match t {
 
                ActiveRecving { .. } | PassiveConnecting { .. } => unreachable!(),
 
                Finished(EndpointExt { endpoint, .. }) => {
 
                    ms.poll.register(endpoint, token, ready_r, edge)
 
                }
 
                ActiveConnecting { stream, .. } => {
 
                    to_finish.insert(key);
 
                    ms.poll.register(stream, token, ready_w, edge)
 
                }
 
                PassiveAccepting { listener, .. } => {
 
                    to_finish.insert(key);
 
                    ms.poll.register(listener, token, ready_r, edge)
 
                }
 
            }
 
            .expect("register first");
 
        }
 
        // invariant: every EndpointExtTodo has one thing registered with mio
 

	
 
        // 4. until all in endpoint_ext_todos are Finished variant, handle events
 
        let mut polled_undrained_later = IndexSet::<_>::default();
 
        let mut backoff_millis = 10;
 
        while !to_finish.is_empty() {
 
            ms.poll_events(deadline)?;
 
            for event in ms.events.iter() {
 
                let token = event.token();
 
                let ekey = Key::from_token(token);
 
                let entry = endpoint_ext_todos.get_mut(ekey).unwrap();
 
                match entry {
 
                    Finished(_) => {
 
                        polled_undrained_later.insert(ekey);
 
                    }
 
                    PassiveAccepting { addr, listener, .. } => {
 
                        log!(logger, "{:03?} start PassiveAccepting...", major);
 
                        assert!(event.readiness().is_readable());
 
                        let (stream, _peer_addr) =
 
                            listener.accept().map_err(|_| AcceptFailed(*addr))?;
 
                        ms.poll.deregister(listener).expect("wer");
 
                        ms.poll.register(&stream, token, ready_w, edge).expect("3y5");
 
                        take_mut::take(entry, |e| {
 
                            assert_let![PassiveAccepting { addr, info, .. } = e => {
 
                                PassiveConnecting { addr, info, stream }
 
                            }]
 
                        });
 
                        log!(logger, "{:03?} ... end PassiveAccepting", major);
 
                    }
 
                    PassiveConnecting { addr, stream, .. } => {
 
                        log!(logger, "{:03?} start PassiveConnecting...", major);
 
                        assert!(event.readiness().is_writable());
 
                        if !Self::test_stream_connectivity(stream) {
 
                            return Err(PassiveConnectFailed(*addr));
 
                        }
 
                        ms.poll.reregister(stream, token, ready_r, edge).expect("52");
 
                        let mut res = Ok(());
 
                        take_mut::take(entry, |e| {
 
                            assert_let![PassiveConnecting { info, stream, .. } = e => {
 
                                let mut endpoint = Endpoint::from_fresh_stream(stream);
 
                                let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info });
 
                                res = endpoint.send(msg);
 
                                Finished(EndpointExt { info, endpoint })
 
                            }]
 
                        });
 
                        res?;
 
                        log!(logger, "{:03?} ... end PassiveConnecting", major);
 
                        assert!(to_finish.remove(&ekey));
 
                    }
 
                    ActiveConnecting { addr, stream, .. } => {
 
                        log!(logger, "{:03?} start ActiveConnecting...", major);
 
                        assert!(event.readiness().is_writable());
 
                        if Self::test_stream_connectivity(stream) {
 
                            // connect successful
 
                            log!(logger, "CONNECT SUCCESS");
 
                            ms.poll.reregister(stream, token, ready_r, edge).expect("52");
 
                            take_mut::take(entry, |e| {
 
                                assert_let![ActiveConnecting { stream, polarity, addr } = e => {
 
                                    let endpoint = Endpoint::from_fresh_stream(stream);
 
                                    ActiveRecving { endpoint, polarity, addr }
 
                                }]
 
                            });
 
                            log!(logger, ".. ok");
 
                        } else {
 
                            // connect failure. retry!
 
                            log!(logger, "CONNECT FAIL");
 
                            ms.poll.deregister(stream).expect("wt");
 
                            std::thread::sleep(Duration::from_millis(backoff_millis));
 
                            backoff_millis = ((backoff_millis as f32) * 1.2) as u64 + 3;
 
                            let mut new_stream = TcpStream::connect(addr).unwrap();
 
                            ms.poll.register(&new_stream, token, ready_w, edge).expect("PAC 3");
 
                            std::mem::swap(stream, &mut new_stream);
 
                        }
 
                        log!(logger, "{:03?} ... end ActiveConnecting", major);
 
                    }
 
                    ActiveRecving { addr, polarity, endpoint } => {
 
                        log!(logger, "{:03?} start ActiveRecving...", major);
 
                        assert!(event.readiness().is_readable());
 
                        'recv_loop: while let Some(msg) = endpoint.recv()? {
 
                            if let Msg::SetupMsg(SetupMsg::ChannelSetup { info }) = msg {
 
                                if info.polarity == *polarity {
 
                                    return Err(PolarityMatched(*addr));
 
                                }
 
                                take_mut::take(entry, |e| {
 
                                    assert_let![ActiveRecving { polarity, endpoint, .. } = e => {
 
                                        let info = EndpointInfo { polarity, channel_id: info.channel_id };
 
                                        Finished(EndpointExt { info, endpoint })
 
                                    }]
 
                                });
 
                                ms.polled_undrained.insert(ekey);
 
                                assert!(to_finish.remove(&ekey));
 
                                break 'recv_loop;
 
                            } else {
 
                                ms.delayed.push(ReceivedMsg { recipient: ekey, msg });
 
                            }
 
                        }
 
                        log!(logger, "{:03?} ... end ActiveRecving", major);
 
                    }
 
                }
 
            }
 
        }
 
        for ekey in polled_undrained_later {
 
            ms.polled_undrained.insert(ekey);
 
        }
 
        let endpoint_exts = endpoint_ext_todos.type_convert(|(_, todo)| match todo {
 
            Finished(endpoint_ext) => endpoint_ext,
 
            _ => unreachable!(),
 
        });
 
        Ok((ms, endpoint_exts))
 
    }
 

	
 
    fn setup_sink_tree_family(
 
        major: ControllerId,
 
        logger: &mut String,
 
        endpoint_exts: &mut Arena<EndpointExt>,
 
        messenger_state: &mut MessengerState,
 
        neighbors: Vec<Key>,
 
        deadline: Instant,
 
    ) -> Result<ControllerFamily, ConnectErr> {
 
        use {ConnectErr::*, Msg::SetupMsg as S, SetupMsg::*};
 

	
 
        log!(logger, "neighbors {:?}", &neighbors);
 

	
 
        let mut messenger = (messenger_state, endpoint_exts);
 
        impl Messengerlike for (&mut MessengerState, &mut Arena<EndpointExt>) {
 
            fn get_state_mut(&mut self) -> &mut MessengerState {
 
                self.0
 
            }
 
            fn get_endpoint_mut(&mut self, ekey: Key) -> &mut Endpoint {
 
                &mut self.1.get_mut(ekey).expect("OUT OF BOUNDS").endpoint
 
            }
 
        }
 

	
 
        // 1. broadcast my ID as the first echo. await reply from all in net_keylist
 
        let echo = S(LeaderEcho { maybe_leader: major });
 
        let mut awaiting = IndexSet::with_capacity(neighbors.len());
 
        for &n in neighbors.iter() {
 
            log!(logger, "{:?}'s initial echo to {:?}, {:?}", major, n, &echo);
 
            messenger.send(n, echo.clone())?;
 
            awaiting.insert(n);
 
        }
 

	
 
        // 2. Receive incoming replies. whenever a higher-id echo arrives,
 
        //    adopt it as leader, sender as parent, and reset the await set.
 
        let mut parent: Option<Key> = None;
 
        let mut my_leader = major;
 
        messenger.undelay_all();
 
        'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
            let ReceivedMsg { recipient, msg } = messenger.recv(deadline)?.ok_or(Timeout)?;
 
            log!(logger, "{:?} GOT {:?} {:?}", major, &recipient, &msg);
 
            match msg {
 
                S(LeaderAnnounce { leader }) => {
 
                    // someone else completed the echo and became leader first!
 
                    // the sender is my parent
 
                    parent = Some(recipient);
 
                    my_leader = leader;
 
                    awaiting.clear();
 
                    break 'echo_loop;
 
                }
 
                S(LeaderEcho { maybe_leader }) => {
 
                    use Ordering::*;
 
                    match maybe_leader.cmp(&my_leader) {
 
                        Less => { /* ignore */ }
 
                        Equal => {
 
                            awaiting.remove(&recipient);
 
                            if awaiting.is_empty() {
 
                                if let Some(p) = parent {
 
                                    // return the echo to my parent
 
                                    messenger.send(p, S(LeaderEcho { maybe_leader }))?;
 
                                } else {
 
                                    // DECIDE!
 
                                    break 'echo_loop;
 
                                }
 
                            }
 
                        }
 
                        Greater => {
 
                            // join new echo
 
                            log!(logger, "{:?} setting leader to {:?}", major, recipient);
 
                            parent = Some(recipient);
 
                            my_leader = maybe_leader;
 
                            let echo = S(LeaderEcho { maybe_leader: my_leader });
 
                            awaiting.clear();
 
                            if neighbors.len() == 1 {
 
                                // immediately reply to parent
 
                                log!(
 
                                    logger,
 
                                    "{:?} replying echo to parent {:?} immediately",
 
                                    major,
 
                                    recipient
 
                                );
 
                                messenger.send(recipient, echo.clone())?;
 
                            } else {
 
                                for &n in neighbors.iter() {
 
                                    if n != recipient {
 
                                        log!(
 
                                            logger,
 
                                            "{:?} repeating echo {:?} to {:?}",
 
                                            major,
 
                                            &echo,
 
                                            n
 
                                        );
 
                                        messenger.send(n, echo.clone())?;
 
                                        awaiting.insert(n);
 
                                    }
 
                                }
 
                            }
 
                        }
 
                    }
 
                }
 
                msg => messenger.delay(ReceivedMsg { recipient, msg }),
 
            }
 
        }
 
        match parent {
 
            None => assert_eq!(
 
                my_leader, major,
 
                "I've got no parent, but I consider {:?} the leader?",
 
                my_leader
 
            ),
 
            Some(parent) => assert_ne!(
 
                my_leader, major,
 
                "I have {:?} as parent, but I consider myself ({:?}) the leader?",
 
                parent, major
 
            ),
 
        }
 
        log!(logger, "{:?} DONE WITH ECHO", major);
 

	
 
        // 3. broadcast leader announcement (except to parent: confirm they are your parent)
 
        //    in this loop, every node sends 1 message to each neighbor
 
        let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
 
        for &k in neighbors.iter() {
 
            let msg =
 
                if Some(k) == parent { S(YouAreMyParent) } else { msg_for_non_parents.clone() };
 
            log!(logger, "{:?} ANNOUNCING to {:?} {:?}", major, k, &msg);
 
            messenger.send(k, msg)?;
 
        }
 

	
 
        // await 1 message from all non-parents
 
        for &n in neighbors.iter() {
 
            if Some(n) != parent {
 
                awaiting.insert(n);
 
            }
 
        }
 
        let mut children = Vec::default();
 
        messenger.undelay_all();
 
        while !awaiting.is_empty() {
 
            let ReceivedMsg { recipient, msg } = messenger.recv(deadline)?.ok_or(Timeout)?;
 
            match msg {
 
                S(YouAreMyParent) => {
 
                    assert!(awaiting.remove(&recipient));
 
                    children.push(recipient);
 
                }
 
                S(SetupMsg::LeaderAnnounce { leader }) => {
 
                    assert!(awaiting.remove(&recipient));
 
                    assert!(leader == my_leader);
 
                    assert!(Some(recipient) != parent);
 
                    // they wouldn't send me this if they considered me their parent
 
                }
 
                _ => messenger.delay(ReceivedMsg { recipient, msg }),
 
            }
 
        }
 
        Ok(ControllerFamily { parent_ekey: parent, children_ekeys: children })
 
    }
 
}
 

	
 
impl Messengerlike for Controller {
 
    fn get_state_mut(&mut self) -> &mut MessengerState {
 
        &mut self.inner.messenger_state
 
    }
 
    fn get_endpoint_mut(&mut self, ekey: Key) -> &mut Endpoint {
 
        &mut self.inner.endpoint_exts.get_mut(ekey).expect("OUT OF BOUNDS").endpoint
 
    }
 
}
src/test/setup.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::*;
 

	
 
use PortBinding::*;
 

	
 
use super::*;
 

	
 
#[test]
 
fn config_ok_0() {
 
    let pdl = b"primitive main() {}";
 
    let d = ProtocolD::parse(pdl).unwrap();
 
    let pol = d.main_interface_polarities();
 
    let pol = d.component_polarities(b"main").unwrap();
 
    assert_eq!(&pol[..], &[]);
 
}
 

	
 
#[test]
 
fn config_ok_2() {
 
    let pdl = b"primitive main(in x, out y) {}";
 
    let d = ProtocolD::parse(pdl).unwrap();
 
    let pol = d.main_interface_polarities();
 
    assert_eq!(&pol[..], &[Polarity::Getter, Polarity::Putter]);
 
    let pol = d.component_polarities(b"main").unwrap();
 
    assert_eq!(&pol[..], &[Getter, Putter]);
 
}
 

	
 
#[test]
 
#[should_panic]
 
fn config_non_port() {
 
    let pdl = b"primitive main(in q, int q) {}";
 
    ProtocolD::parse(pdl).unwrap();
 
}
 

	
 
#[test]
 
fn config_and_connect_2() {
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = ["127.0.0.1:9000".parse().unwrap(), "127.0.0.1:9001".parse().unwrap()];
 
    use std::thread;
 
    let handles = vec![
 
        //
 
        thread::spawn(move || {
 
            let mut x = Connector::Unconfigured(Unconfigured { controller_id: 0 });
 
            x.configure(b"primitive main(in a, out b) {}").unwrap();
 
            x.configure(b"primitive main(in a, out b) {}", b"main").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Passive(addrs[1])).unwrap();
 
            x.connect(timeout).unwrap();
 
        }),
 
        thread::spawn(move || {
 
            let mut x = Connector::Unconfigured(Unconfigured { controller_id: 1 });
 
            x.configure(b"primitive main(out a, in b) {}").unwrap();
 
            x.configure(b"primitive main(out a, in b) {}", b"main").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Active(addrs[1])).unwrap();
 
            x.connect(timeout).unwrap();
 
        }),
 
    ];
 
    for h in handles {
 
        handle(h.join())
 
    }
 
}
 

	
 
#[test]
 
fn bind_too_much() {
 
    let mut x = Connector::Unconfigured(Unconfigured { controller_id: 0 });
 
    x.configure(b"primitive main(in a) {}").unwrap();
 
    x.configure(b"primitive main(in a) {}", b"main").unwrap();
 
    x.bind_port(0, Native).unwrap();
 
    assert!(x.bind_port(1, Native).is_err());
 
}
 

	
 
#[test]
 
fn config_and_connect_chain() {
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [
 
        "127.0.0.1:9002".parse().unwrap(),
 
        "127.0.0.1:9003".parse().unwrap(),
 
        "127.0.0.1:9004".parse().unwrap(),
 
    ];
 
    use std::thread;
 
    let handles = vec![
 
        //
 
        thread::spawn(move || {
 
            // PRODUCER A->
 
            let mut x = Connector::Unconfigured(Unconfigured { controller_id: 0 });
 
            x.configure(b"primitive main(out a) {}").unwrap();
 
            x.configure(b"primitive main(out a) {}", b"main").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
        }),
 
        thread::spawn(move || {
 
            // FORWARDER ->B->
 
            let mut x = Connector::Unconfigured(Unconfigured { controller_id: 1 });
 
            x.configure(b"primitive main(in a, out b) {}").unwrap();
 
            x.configure(b"primitive main(in a, out b) {}", b"main").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Active(addrs[1])).unwrap();
 
            x.connect(timeout).unwrap();
 
        }),
 
        thread::spawn(move || {
 
            // FORWARDER ->C->
 
            let mut x = Connector::Unconfigured(Unconfigured { controller_id: 2 });
 
            x.configure(b"primitive main(in a, out b) {}").unwrap();
 
            x.configure(b"primitive main(in a, out b) {}", b"main").unwrap();
 
            x.bind_port(0, Passive(addrs[1])).unwrap();
 
            x.bind_port(1, Active(addrs[2])).unwrap();
 
            x.connect(timeout).unwrap();
 
        }),
 
        thread::spawn(move || {
 
            // CONSUMER ->D
 
            let mut x = Connector::Unconfigured(Unconfigured { controller_id: 3 });
 
            x.configure(b"primitive main(in a) {}").unwrap();
 
            x.configure(b"primitive main(in a) {}", b"main").unwrap();
 
            x.bind_port(0, Passive(addrs[2])).unwrap();
 
            x.connect(timeout).unwrap();
 
        }),
 
    ];
 
    for h in handles {
 
        handle(h.join())
 
    }
 
}
0 comments (0 inline, 0 general)