Changeset - a226385adc2d
[Not reviewed]
0 11 0
Christopher Esterhuyse - 5 years ago 2020-02-05 16:50:00
christopheresterhuyse@gmail.com
natives working
11 files changed with 346 insertions and 173 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -27,6 +27,7 @@ backtrace = "0.3"
 

	
 
[dev-dependencies]
 
test-generator = "0.3.0"
 
crossbeam-utils = "0.7.0"
 

	
 
[lib]
 
crate-type = ["cdylib"]
src/protocol/ast.rs
Show inline comments
 
@@ -764,13 +764,8 @@ impl Heap {
 
    pub fn alloc_import(&mut self, f: impl FnOnce(ImportId) -> Import) -> ImportId {
 
        ImportId(self.imports.alloc_with_id(|id| f(ImportId(id))))
 
    }
 
    pub fn alloc_protocol_description(
 
        &mut self,
 
        f: impl FnOnce(RootId) -> Root,
 
    ) -> RootId {
 
        RootId(
 
            self.protocol_descriptions.alloc_with_id(|id| f(RootId(id))),
 
        )
 
    pub fn alloc_protocol_description(&mut self, f: impl FnOnce(RootId) -> Root) -> RootId {
 
        RootId(self.protocol_descriptions.alloc_with_id(|id| f(RootId(id))))
 
    }
 
    pub fn alloc_imported_declaration(
 
        &mut self,
src/protocol/eval.rs
Show inline comments
 
@@ -62,7 +62,7 @@ impl Value {
 
                    Value::Message(MessageValue(Some(vec![0; length.try_into().unwrap()])))
 
                }
 
            }
 
            _ => unimplemented!()
 
            _ => unimplemented!(),
 
        }
 
    }
 
    fn from_constant(constant: &Constant) -> Value {
 
@@ -99,7 +99,7 @@ impl Value {
 
                }
 
                the_index = index.try_into().unwrap();
 
            }
 
            _ => unreachable!()
 
            _ => unreachable!(),
 
        }
 
        // The subject must be either a message or an array
 
        // And the value and the subject must be compatible
 
@@ -142,7 +142,7 @@ impl Value {
 
            (Value::ShortArray(_), Value::Short(_)) => todo!(),
 
            (Value::IntArray(_), Value::Int(_)) => todo!(),
 
            (Value::LongArray(_), Value::Long(_)) => todo!(),
 
            _ => unreachable!()
 
            _ => unreachable!(),
 
        }
 
    }
 
    fn plus(&self, other: &Value) -> Value {
 
@@ -891,7 +891,7 @@ impl Display for MessageValue {
 
                    }
 
                }
 
                write!(f, ")")
 
            },
 
            }
 
        }
 
    }
 
}
 
@@ -1334,7 +1334,13 @@ impl Store {
 
        // Overwrite mapping
 
        self.map.insert(var, value.clone());
 
    }
 
    fn update(&mut self, h: &Heap, ctx: &mut EvalContext, lexpr: ExpressionId, value: Value) -> EvalResult {
 
    fn update(
 
        &mut self,
 
        h: &Heap,
 
        ctx: &mut EvalContext,
 
        lexpr: ExpressionId,
 
        value: Value,
 
    ) -> EvalResult {
 
        match &h[lexpr] {
 
            Expression::Variable(var) => {
 
                let var = var.declaration.unwrap();
 
@@ -1359,7 +1365,7 @@ impl Store {
 
                }
 
                match subject.set(&index, &value) {
 
                    Some(value) => Ok(value),
 
                    None => Err(EvalContinuation::Inconsistent)
 
                    None => Err(EvalContinuation::Inconsistent),
 
                }
 
            }
 
            _ => unimplemented!("{:?}", h[lexpr]),
 
@@ -1443,8 +1449,7 @@ impl Store {
 
            Expression::Select(expr) => self.get(h, expr.this.upcast()),
 
            Expression::Array(expr) => unimplemented!(),
 
            Expression::Constant(expr) => Ok(Value::from_constant(&expr.value)),
 
            Expression::Call(expr) => {
 
                match expr.method {
 
            Expression::Call(expr) => match expr.method {
 
                Method::Create => {
 
                    assert_eq!(1, expr.arguments.len());
 
                    let length = self.eval(h, ctx, expr.arguments[0])?;
 
@@ -1463,12 +1468,11 @@ impl Store {
 
                    let value = self.eval(h, ctx, expr.arguments[0])?;
 
                    match ctx.get(value.clone()) {
 
                        None => Err(EvalContinuation::BlockGet(value)),
 
                            Some(result) => Ok(result)
 
                        }
 
                    }
 
                    Method::Symbolic(symbol) => unimplemented!()
 
                        Some(result) => Ok(result),
 
                    }
 
                }
 
                Method::Symbolic(symbol) => unimplemented!(),
 
            },
 
            Expression::Variable(expr) => self.get(h, expr.this.upcast()),
 
        }
 
    }
 
@@ -1496,11 +1500,8 @@ pub struct Prompt {
 

	
 
impl Prompt {
 
    pub fn new(h: &Heap, def: DefinitionId, args: &Vec<Value>) -> Self {
 
        let mut prompt = Prompt {
 
            definition: def,
 
            store: Store::new(),
 
            position: Some((&h[def]).body())
 
        };
 
        let mut prompt =
 
            Prompt { definition: def, store: Store::new(), position: Some((&h[def]).body()) };
 
        prompt.set_arguments(h, args);
 
        prompt
 
    }
 
@@ -1642,8 +1643,8 @@ impl Prompt {
 
                    EvalContinuation::NewComponent(args) => unreachable!(),
 
                    EvalContinuation::BlockFires(val) => unreachable!(),
 
                    EvalContinuation::BlockGet(val) => unreachable!(),
 
                    EvalContinuation::Put(port, msg) => unreachable!()
 
                }
 
                    EvalContinuation::Put(port, msg) => unreachable!(),
 
                },
 
            }
 
        }
 
    }
src/protocol/lexer.rs
Show inline comments
 
@@ -1618,10 +1618,7 @@ impl Lexer<'_> {
 
        self.consume_string(b";")?;
 
        Ok(h.alloc_import(|this| Import { this, position, value }))
 
    }
 
    pub fn consume_protocol_description(
 
        &mut self,
 
        h: &mut Heap,
 
    ) -> Result<RootId, ParseError> {
 
    pub fn consume_protocol_description(&mut self, h: &mut Heap) -> Result<RootId, ParseError> {
 
        let position = self.source.pos();
 
        let mut pragmas = Vec::new();
 
        let mut imports = Vec::new();
src/protocol/mod.rs
Show inline comments
 
@@ -7,9 +7,9 @@ mod parser;
 

	
 
use crate::common::*;
 
use crate::protocol::ast::*;
 
use crate::protocol::eval::*;
 
use crate::protocol::inputsource::*;
 
use crate::protocol::parser::*;
 
use crate::protocol::eval::*;
 
use std::hint::unreachable_unchecked;
 

	
 
pub struct ProtocolDescriptionImpl {
 
@@ -69,24 +69,24 @@ impl ProtocolDescription for ProtocolDescriptionImpl {
 
        for (&x, y) in interface.iter().zip(self.main_interface_polarities()) {
 
            match y {
 
                Polarity::Getter => args.push(Value::Input(InputValue(x))),
 
                Polarity::Putter => args.push(Value::Output(OutputValue(x)))
 
                Polarity::Putter => args.push(Value::Output(OutputValue(x))),
 
            }
 
        }
 
        ComponentStateImpl {
 
            prompt: Prompt::new(&self.heap, self.main.upcast(), &args)
 
        }
 
        ComponentStateImpl { prompt: Prompt::new(&self.heap, self.main.upcast(), &args) }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct ComponentStateImpl {
 
    prompt: Prompt
 
    prompt: Prompt,
 
}
 
impl ComponentState for ComponentStateImpl {
 
    type D = ProtocolDescriptionImpl;
 

	
 
    fn pre_sync_run<C: MonoContext<D = ProtocolDescriptionImpl, S = Self>>(
 
        &mut self, context: &mut C, pd: &ProtocolDescriptionImpl,
 
        &mut self,
 
        context: &mut C,
 
        pd: &ProtocolDescriptionImpl,
 
    ) -> MonoBlocker {
 
        let mut context = EvalContext::Mono(context);
 
        loop {
 
@@ -103,19 +103,21 @@ impl ComponentState for ComponentStateImpl {
 
                    EvalContinuation::SyncBlockEnd => unreachable!(),
 
                    EvalContinuation::NewComponent(args) => {
 
                        todo!();
 
                        continue
 
                        continue;
 
                    }
 
                    // Outside synchronous blocks, no fires/get/put happens
 
                    EvalContinuation::BlockFires(val) => unreachable!(),
 
                    EvalContinuation::BlockGet(val) => unreachable!(),
 
                    EvalContinuation::Put(port, msg) => unreachable!()
 
                }
 
                    EvalContinuation::Put(port, msg) => unreachable!(),
 
                },
 
            }
 
        }
 
    }
 

	
 
    fn sync_run<C: PolyContext<D = ProtocolDescriptionImpl>>(
 
        &mut self, context: &mut C,  pd: &ProtocolDescriptionImpl,
 
        &mut self,
 
        context: &mut C,
 
        pd: &ProtocolDescriptionImpl,
 
    ) -> PolyBlocker {
 
        let mut context = EvalContext::Poly(context);
 
        loop {
 
@@ -133,28 +135,24 @@ impl ComponentState for ComponentStateImpl {
 
                    EvalContinuation::SyncBlockEnd => return PolyBlocker::SyncBlockEnd,
 
                    // Not possible to create component in sync block
 
                    EvalContinuation::NewComponent(args) => unreachable!(),
 
                    EvalContinuation::BlockFires(port) => {
 
                        match port {
 
                    EvalContinuation::BlockFires(port) => match port {
 
                        Value::Output(OutputValue(key)) => {
 
                            return PolyBlocker::CouldntCheckFiring(key);
 
                        }
 
                        Value::Input(InputValue(key)) => {
 
                            return PolyBlocker::CouldntCheckFiring(key);
 
                        }
 
                            _ => unreachable!()
 
                        }
 
                    }
 
                    EvalContinuation::BlockGet(port) => {
 
                        match port {
 
                        _ => unreachable!(),
 
                    },
 
                    EvalContinuation::BlockGet(port) => match port {
 
                        Value::Output(OutputValue(key)) => {
 
                            return PolyBlocker::CouldntReadMsg(key);
 
                        }
 
                        Value::Input(InputValue(key)) => {
 
                            return PolyBlocker::CouldntReadMsg(key);
 
                        }
 
                            _ => unreachable!()
 
                        }
 
                    }
 
                        _ => unreachable!(),
 
                    },
 
                    EvalContinuation::Put(port, message) => {
 
                        let key;
 
                        match port {
 
@@ -164,7 +162,7 @@ impl ComponentState for ComponentStateImpl {
 
                            Value::Input(InputValue(the_key)) => {
 
                                key = the_key;
 
                            }
 
                            _ => unreachable!()
 
                            _ => unreachable!(),
 
                        }
 
                        let payload;
 
                        match message {
 
@@ -176,11 +174,11 @@ impl ComponentState for ComponentStateImpl {
 
                                // Create a copy of the payload
 
                                payload = buffer.clone();
 
                            }
 
                            _ => unreachable!()
 
                            _ => unreachable!(),
 
                        }
 
                        return PolyBlocker::PutMsg(key, payload);
 
                    }
 
                }
 
                },
 
            }
 
        }
 
    }
 
@@ -189,7 +187,7 @@ impl ComponentState for ComponentStateImpl {
 
pub enum EvalContext<'a> {
 
    Mono(&'a mut dyn MonoContext<D = ProtocolDescriptionImpl, S = ComponentStateImpl>),
 
    Poly(&'a mut dyn PolyContext<D = ProtocolDescriptionImpl>),
 
    None
 
    None,
 
}
 
impl EvalContext<'_> {
 
    fn random(&mut self) -> LongValue {
 
@@ -210,33 +208,23 @@ impl EvalContext<'_> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(context) => unreachable!(),
 
            EvalContext::Poly(context) => {
 
                match port {
 
                    Value::Output(OutputValue(key)) => {
 
                        context.is_firing(key).map(Value::from)
 
                    }
 
                    Value::Input(InputValue(key)) => {
 
                        context.is_firing(key).map(Value::from)
 
                    }
 
                    _ => unreachable!()
 
                }
 
            }
 
            EvalContext::Poly(context) => match port {
 
                Value::Output(OutputValue(key)) => context.is_firing(key).map(Value::from),
 
                Value::Input(InputValue(key)) => context.is_firing(key).map(Value::from),
 
                _ => unreachable!(),
 
            },
 
        }
 
    }
 
    fn get(&mut self, port: Value) -> Option<Value> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(context) => unreachable!(),
 
            EvalContext::Poly(context) => {
 
                match port {
 
            EvalContext::Poly(context) => match port {
 
                Value::Output(OutputValue(key)) => {
 
                    context.read_msg(key).map(Value::receive_message)
 
                }
 
                    Value::Input(InputValue(key)) => {
 
                        context.read_msg(key).map(Value::receive_message)
 
                    }
 
                    _ => unreachable!()
 
                }
 
                Value::Input(InputValue(key)) => context.read_msg(key).map(Value::receive_message),
 
                _ => unreachable!(),
 
            },
 
        }
 
    }
src/runtime/actors.rs
Show inline comments
 
@@ -55,9 +55,8 @@ impl PolyP {
 
        mut to_run: Vec<(Predicate, BranchP)>,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
        use SyncRunResult as Srr;
 
        let cid = m_ctx.inner.channel_id_stream.controller_id;
 
        log!(&mut m_ctx.inner.logger, "~ Running branches for PolyP {:?}!", m_ctx.my_subtree_id,);
 
        while let Some((mut predicate, mut branch)) = to_run.pop() {
 
        'to_run_loop: while let Some((mut predicate, mut branch)) = to_run.pop() {
 
            let mut r_ctx = BranchPContext {
 
                m_ctx: m_ctx.reborrow(),
 
                ekeys: &self.ekeys,
 
@@ -111,47 +110,46 @@ impl PolyP {
 
                    to_run.push((predicate_f, branch_f));
 
                }
 
                Sb::SyncBlockEnd => {
 
                    let ControllerInner { logger, endpoint_exts, .. } = m_ctx.inner;
 
                    log!(
 
                        &mut m_ctx.inner.logger,
 
                        "~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}",
 
                        logger,
 
                        "~ ... ran {:?} reached SyncBlockEnd with pred {:?} ...",
 
                        m_ctx.my_subtree_id,
 
                        &predicate,
 
                        &blocker
 
                    );
 
                    // come up with the predicate for this local solution
 
                    let lookup =
 
                        |&ekey| m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                    let ekeys_channel_id_iter = self.ekeys.iter().map(lookup);
 
                    predicate.batch_assign_nones(ekeys_channel_id_iter, false);
 

	
 
                    // OK now check we really received all the messages we expected to
 
                    let num_fired = predicate.iter_matching(true).count();
 
                    let num_msgs =
 
                        branch.inbox.keys().chain(branch.outbox.keys()).map(lookup).count();
 
                    match num_fired.cmp(&num_msgs) {
 
                        Ordering::Less => unreachable!(),
 
                        Ordering::Greater => log!(
 
                            &mut m_ctx.inner.logger,
 
                            "{:?} with pred {:?} finished but |inbox|+|outbox| < .",
 
                            m_ctx.my_subtree_id,
 
                            &predicate,
 
                        ),
 
                        Ordering::Equal => {
 
                    for ekey in self.ekeys.iter() {
 
                        let channel_id = endpoint_exts.get(*ekey).unwrap().info.channel_id;
 
                        let fired =
 
                            branch.inbox.contains_key(ekey) || branch.outbox.contains_key(ekey);
 
                        match predicate.query(channel_id) {
 
                            Some(true) => {
 
                                if !fired {
 
                                    // This branch should have fired but didn't!
 
                                    log!(
 
                                &mut m_ctx.inner.logger,
 
                                "{:?} with pred {:?} finished! Storing this solution locally.",
 
                                m_ctx.my_subtree_id,
 
                                &predicate,
 
                                        logger,
 
                                        "~ ... ... should have fired {:?} and didn't! pruning!",
 
                                        channel_id,
 
                                    );
 
                                    continue 'to_run_loop;
 
                                }
 
                            }
 
                            Some(false) => assert!(!fired),
 
                            None => {
 
                                predicate.replace_assignment(channel_id, false);
 
                                assert!(!fired)
 
                            }
 
                        }
 
                    }
 
                    log!(logger, "~ ... ... and finished just fine!",);
 
                    m_ctx.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut m_ctx.inner.logger,
 
                        m_ctx.my_subtree_id,
 
                        predicate.clone(),
 
                    );
 
                            // store the solution for recovering later
 
                    self.complete.insert(predicate, branch);
 
                }
 
                    }
 
                }
 
                Sb::PutMsg(ekey, payload) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let EndpointExt { info, endpoint } =
 
@@ -227,7 +225,6 @@ impl PolyP {
 
                            log!(
 
                                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run This branch is compatible unaltered! branch pred: {:?}",
 
                              
 
                                &old_predicate
 
                            );
 
                            // old_predicate COVERS the assumptions of payload_predicate
 
@@ -236,11 +233,9 @@ impl PolyP {
 
                            Some((old_predicate, branch))
 
                        }
 
                        Csr::New(new) => {
 

	
 
                            log!(
 
                                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run payloadpred {:?} and branchpred {:?} satisfied by new pred {:?}. FORKING",
 
                              
 
                                &payload_predicate,
 
                                &old_predicate,
 
                                &new,
 
@@ -255,11 +250,9 @@ impl PolyP {
 
                            Some((new, payload_branch))
 
                        }
 
                        Csr::LatterNotFormer => {
 

	
 
                            log!(
 
                                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run payloadpred {:?} subsumes branch pred {:?}. FORKING",
 
                           
 
                                &old_predicate,
 
                                &payload_predicate,
 
                            );
 
@@ -276,7 +269,6 @@ impl PolyP {
 
                            log!(
 
                                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run SKIPPING because branchpred={:?}. payloadpred={:?}",
 
                              
 
                                &old_predicate,
 
                                &payload_predicate,
 
                            );
 
@@ -320,19 +312,72 @@ impl PolyN {
 
    pub fn sync_recv(
 
        &mut self,
 
        ekey: Key,
 
        logger: &mut String,
 
        payload: Payload,
 
        payload_predicate: Predicate,
 
        solution_storage: &mut SolutionStorage,
 
    ) {
 
        for (predicate, branch) in self.branches.iter_mut() {
 
        let mut branches2: HashMap<_, _> = Default::default();
 
        for (old_predicate, mut branch) in self.branches.drain() {
 
            use CommonSatResult as Csr;
 
            let case = old_predicate.common_satisfier(&payload_predicate);
 
            let mut report_if_solution =
 
                |branch: &BranchN, pred: &Predicate, logger: &mut String| {
 
                    if branch.to_get.is_empty() {
 
                        solution_storage.submit_and_digest_subtree_solution(
 
                            logger,
 
                            SubtreeId::PolyN,
 
                            pred.clone(),
 
                        );
 
                    }
 
                };
 
            log!(
 
                logger,
 
                "Feeding msg {:?} {:?} to native branch with pred {:?}. Predicate case {:?}",
 
                &payload_predicate,
 
                &payload,
 
                &old_predicate,
 
                &case
 
            );
 
            match case {
 
                Csr::Nonexistant => { /* skip branch */ }
 
                Csr::FormerNotLatter | Csr::Equivalent => {
 
                    // Feed the message to this branch in-place. no need to modify pred.
 
                    if branch.to_get.remove(&ekey) {
 
                        branch.gotten.insert(ekey, payload.clone());
 
                if branch.to_get.is_empty() {
 
                    solution_storage
 
                        .submit_and_digest_subtree_solution(SubtreeId::PolyN, predicate.clone());
 
                        report_if_solution(&branch, &old_predicate, logger);
 
                    }
 
                }
 
                Csr::LatterNotFormer => {
 
                    // create a new branch with the payload_predicate.
 
                    let mut forked = branch.clone();
 
                    if forked.to_get.remove(&ekey) {
 
                        forked.gotten.insert(ekey, payload.clone());
 
                        report_if_solution(&forked, &payload_predicate, logger);
 
                        branches2.insert(payload_predicate.clone(), forked);
 
                    }
 
                }
 
                Csr::New(new) => {
 
                    // create a new branch with the newly-created predicate
 
                    let mut forked = branch.clone();
 
                    if forked.to_get.remove(&ekey) {
 
                        forked.gotten.insert(ekey, payload.clone());
 
                        report_if_solution(&forked, &new, logger);
 
                        branches2.insert(new.clone(), forked);
 
                    }
 
                }
 
            }
 
            // unlike PolyP machines, Native branches do not become inconsistent
 
            branches2.insert(old_predicate, branch);
 
        }
 
        log!(
 
            logger,
 
            "Native now has {} branches with predicates: {:?}",
 
            branches2.len(),
 
            branches2.keys().collect::<Vec<_>>()
 
        );
 
        std::mem::swap(&mut branches2, &mut self.branches);
 
    }
 

	
 
    pub fn become_mono(
 
        mut self,
src/runtime/communication.rs
Show inline comments
 
@@ -98,21 +98,32 @@ impl Controller {
 
                    predicate requires the support of oracle boolean variables"
 
                )
 
            }
 
            let branch = BranchN {
 
                to_get: true_ekeys.collect(),
 
                gotten: Default::default(),
 
                sync_batch_index,
 
            };
 
            let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index };
 
            for (ekey, payload) in puts {
 
                log!(
 
                    &mut self.inner.logger,
 
                    "... ... Initial native put msg {:?} pred {:?} batch {:?}",
 
                    &payload,
 
                    &predicate,
 
                    sync_batch_index,
 
                );
 
                let msg =
 
                    CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload }
 
                        .into_msg(*round_index);
 
                endpoint_exts.get_mut(ekey).unwrap().endpoint.send(msg)?;
 
            }
 
            log!(
 
                &mut self.inner.logger,
 
                "... Initial native branch (batch index={} with pred {:?}",
 
                sync_batch_index,
 
                &predicate
 
            );
 
            if branch.to_get.is_empty() {
 
                self.ephemeral
 
                    .solution_storage
 
                    .submit_and_digest_subtree_solution(SubtreeId::PolyN, predicate.clone());
 
                self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut self.inner.logger,
 
                    SubtreeId::PolyN,
 
                    predicate.clone(),
 
                );
 
            }
 
            branches.insert(predicate, branch);
 
        }
 
@@ -315,9 +326,11 @@ impl Controller {
 
                        subtree_id,
 
                        &partial_oracle
 
                    );
 
                    self.ephemeral
 
                        .solution_storage
 
                        .submit_and_digest_subtree_solution(subtree_id, partial_oracle);
 
                    self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut self.inner.logger,
 
                        subtree_id,
 
                        partial_oracle,
 
                    );
 

	
 
                    if self.handle_locals_maybe_decide()? {
 
                        return Ok(());
 
@@ -351,15 +364,19 @@ impl Controller {
 
                            // this happens when a message is sent to a component that has exited.
 
                            // It's safe to drop this message;
 
                            // The sender branch will certainly not be part of the solution
 
                            continue 'recv_loop;
 
                        }
 
                        Some(PolyId::N) => {
 
                            // Message for NativeMachine
 
                            self.ephemeral.poly_n.as_mut().unwrap().sync_recv(
 
                                received.recipient,
 
                                &mut self.inner.logger,
 
                                payload,
 
                                payload_predicate,
 
                                &mut self.ephemeral.solution_storage,
 
                            );
 
                            if self.handle_locals_maybe_decide()? {
 
                                return Ok(());
 
                            }
 
                        }
 
                        Some(PolyId::P { index }) => {
 
                            // Message for protocol actor
 
@@ -546,9 +563,11 @@ impl SolutionStorage {
 

	
 
    pub(crate) fn submit_and_digest_subtree_solution(
 
        &mut self,
 
        logger: &mut String,
 
        subtree_id: SubtreeId,
 
        predicate: Predicate,
 
    ) {
 
        log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate);
 
        let index = self.subtree_id_to_index[&subtree_id];
 
        let left = 0..index;
 
        let right = (index + 1)..self.subtree_solutions.len();
 
@@ -557,11 +576,18 @@ impl SolutionStorage {
 
        let was_new = subtree_solutions[index].insert(predicate.clone());
 
        if was_new {
 
            let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
            Self::elaborate_into_new_local_rec(predicate, set_visitor, old_local, new_local);
 
            Self::elaborate_into_new_local_rec(
 
                logger,
 
                predicate,
 
                set_visitor,
 
                old_local,
 
                new_local,
 
            );
 
        }
 
    }
 

	
 
    fn elaborate_into_new_local_rec<'a, 'b>(
 
        logger: &mut String,
 
        partial: Predicate,
 
        mut set_visitor: impl Iterator<Item = &'b HashSet<Predicate>> + Clone,
 
        old_local: &'b HashSet<Predicate>,
 
@@ -572,6 +598,7 @@ impl SolutionStorage {
 
            for pred in set.iter() {
 
                if let Some(elaborated) = pred.union_with(&partial) {
 
                    Self::elaborate_into_new_local_rec(
 
                        logger,
 
                        elaborated,
 
                        set_visitor.clone(),
 
                        old_local,
 
@@ -583,6 +610,7 @@ impl SolutionStorage {
 
            // recursive stop condition. `partial` is a local subtree solution
 
            if !old_local.contains(&partial) {
 
                // ... and it hasn't been found before
 
                log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial);
 
                new_local.insert(partial);
 
            }
 
        }
src/runtime/endpoint.rs
Show inline comments
 
@@ -61,7 +61,7 @@ impl std::fmt::Debug for Endpoint {
 
            Endpoint::Memory { .. } => "Memory",
 
            Endpoint::Network(..) => "Network",
 
        };
 
        write!(f, "Endpoint::{}", s)
 
        f.write_fmt(format_args!("Endpoint::{}", s))
 
    }
 
}
 

	
src/runtime/mod.rs
Show inline comments
 
@@ -444,16 +444,16 @@ impl Predicate {
 
}
 
impl Debug for Predicate {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 
        f.pad("{")?;
 
        for (ChannelId { controller_id, channel_index }, &v) in self.assigned.iter() {
 
            write!(
 
                f,
 
            f.write_fmt(format_args!(
 
                "({:?},{:?})=>{}, ",
 
                controller_id,
 
                channel_index,
 
                if v { 'T' } else { 'F' }
 
            )?;
 
            ))?
 
        }
 
        Ok(())
 
        f.pad("}")
 
    }
 
}
 

	
src/test/connector.rs
Show inline comments
 
@@ -2,20 +2,29 @@ extern crate test_generator;
 

	
 
use super::*;
 

	
 
use std::fs;
 
use std::path::Path;
 
use std::thread;
 
use test_generator::test_resources;
 

	
 
use crate::common::*;
 
use crate::runtime::*;
 
use crate::runtime::errors::*;
 
use crate::runtime::{errors::*, PortBinding::*, *};
 

	
 
// using a static AtomicU16, shared between all tests in the binary,
 
// allocate and return a socketaddr of the form 127.0.0.1:X where X in 7000..
 
fn next_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(7_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port).into()
 
}
 

	
 
#[test]
 
fn incremental() {
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = ["127.0.0.1:7010".parse().unwrap(), "127.0.0.1:7011".parse().unwrap()];
 
    let a = thread::spawn(move || {
 
    let addrs = [next_addr(), next_addr()];
 
    let handles = vec![
 
        thread::spawn(move || {
 
            let controller_id = 0;
 
            let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
            x.configure(
 
@@ -27,13 +36,13 @@ fn incremental() {
 
                }",
 
            )
 
            .unwrap();
 
        x.bind_port(0, PortBinding::Passive(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Passive(addrs[1])).unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Passive(addrs[1])).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
            println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
    });
 
    let b = thread::spawn(move || {
 
        }),
 
        thread::spawn(move || {
 
            let controller_id = 1;
 
            let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
            x.configure(
 
@@ -44,26 +53,27 @@ fn incremental() {
 
                }",
 
            )
 
            .unwrap();
 
        x.bind_port(0, PortBinding::Active(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Active(addrs[1])).unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Active(addrs[1])).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
            println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
    });
 
    handle(a.join());
 
    handle(b.join());
 
        }),
 
    ];
 
    for h in handles {
 
        handle(h.join())
 
    }
 
}
 

	
 
#[test]
 
fn duo_positive() {
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = ["127.0.0.1:7012".parse().unwrap(), "127.0.0.1:7013".parse().unwrap()];
 
    let addrs = [next_addr(), next_addr()];
 
    let a = thread::spawn(move || {
 
        let controller_id = 0;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(
 
            b"
 
        primitive main(out a, out b) {
 
            b"primitive main(out a, out b) {
 
                synchronous {}
 
                synchronous {}
 
                synchronous {
 
@@ -77,8 +87,8 @@ fn duo_positive() {
 
            }",
 
        )
 
        .unwrap();
 
        x.bind_port(0, PortBinding::Passive(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Passive(addrs[1])).unwrap();
 
        x.bind_port(0, Passive(addrs[0])).unwrap();
 
        x.bind_port(1, Passive(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
@@ -90,8 +100,7 @@ fn duo_positive() {
 
        let controller_id = 1;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(
 
            b"
 
        primitive main(in a, in b) {
 
            b"primitive main(in a, in b) {
 
                while (true) {
 
                    synchronous {
 
                        if (fires(a)) {
 
@@ -107,8 +116,8 @@ fn duo_positive() {
 
            }",
 
        )
 
        .unwrap();
 
        x.bind_port(0, PortBinding::Active(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Active(addrs[1])).unwrap();
 
        x.bind_port(0, Active(addrs[0])).unwrap();
 
        x.bind_port(1, Active(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
@@ -123,34 +132,36 @@ fn duo_positive() {
 
#[test]
 
fn duo_negative() {
 
    let timeout = Duration::from_millis(500);
 
    let addrs = ["127.0.0.1:7014".parse().unwrap(), "127.0.0.1:7015".parse().unwrap()];
 
    let addrs = [next_addr(), next_addr()];
 
    let a = thread::spawn(move || {
 
        let controller_id = 0;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(b"
 
        primitive main(out a, out b) {
 
        x.configure(
 
            b"primitive main(out a, out b) {
 
                synchronous {}
 
                synchronous {
 
                    msg m = create(0);
 
                    put(a, m); // fires a on second round
 
                }
 
        }").unwrap();
 
        x.bind_port(0, PortBinding::Passive(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Passive(addrs[1])).unwrap();
 
            }",
 
        )
 
        .unwrap();
 
        x.bind_port(0, Passive(addrs[0])).unwrap();
 
        x.bind_port(1, Passive(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        let r = x.sync(timeout);
 
        println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
        match r {
 
            Err(SyncErr::Timeout) => {}
 
            x => unreachable!("{:?}", x)
 
            x => unreachable!("{:?}", x),
 
        }
 
    });
 
    let b = thread::spawn(move || {
 
        let controller_id = 1;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(b"
 
        primitive main(in a, in b) {
 
        x.configure(
 
            b"primitive main(in a, in b) {
 
                while (true) {
 
                    synchronous {
 
                        if (fires(a)) {
 
@@ -163,18 +174,82 @@ fn duo_negative() {
 
                        }
 
                    }
 
                }
 
        }").unwrap();
 
        x.bind_port(0, PortBinding::Active(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Active(addrs[1])).unwrap();
 
            }",
 
        )
 
        .unwrap();
 
        x.bind_port(0, Active(addrs[0])).unwrap();
 
        x.bind_port(1, Active(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        let r = x.sync(timeout);
 
        println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
        match r {
 
            Err(SyncErr::Timeout) => {}
 
            x => unreachable!("{:?}", x)
 
            x => unreachable!("{:?}", x),
 
        }
 
    });
 
    handle(a.join());
 
    handle(b.join());
 
}
 

	
 
#[test]
 
fn connect_natives() {
 
    static CHAIN: &[u8] = b"
 
    primitive main(in i, out o) {
 
        while(true) synchronous {}
 
    }";
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    do_all(&[
 
        &|x| {
 
            x.configure(CHAIN).unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
        },
 
        &|x| {
 
            x.configure(CHAIN).unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
        },
 
    ]);
 
}
 

	
 
#[test]
 
fn forward() {
 
    static FORWARD: &[u8] = b"
 
    primitive main(in i, out o) {
 
        while(true) synchronous {
 
            put(o, get(i));
 
        }
 
    }";
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    do_all(&[
 
        //
 
        &|x| {
 
            x.configure(FORWARD).unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            let msg = b"HELLO!".to_vec();
 
            x.put(0, msg).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
        },
 
        &|x| {
 
            x.configure(FORWARD).unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            let expect = b"HELLO!".to_vec();
 
            x.get(0).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
            assert_eq!(expect, x.read_gotten(0).unwrap());
 
        },
 
    ]);
 
}
src/test/mod.rs
Show inline comments
 
use crate::common::ControllerId;
 
use crate::runtime::Connector;
 
use crate::runtime::Unconfigured;
 
use core::fmt::Debug;
 

	
 
mod connector;
 
@@ -15,11 +18,51 @@ impl Debug for Panicked {
 
        }
 
    }
 
}
 
fn handle(result: Result<(), std::boxed::Box<(dyn std::any::Any + std::marker::Send + 'static)>>) {
 
    match result {
 
        Ok(_) => {}
 
        Err(x) => {
 
            panic!("Worker panicked: {:?}", Panicked(x));
 
fn handle(result: Result<(), Box<(dyn std::any::Any + Send + 'static)>>) {
 
    if let Err(x) = result {
 
        panic!("Worker panicked: {:?}", Panicked(x))
 
    }
 
}
 

	
 
fn do_all(i: &[&(dyn Fn(&mut Connector) + Sync)]) {
 
    let cid_iter = 0..(i.len() as ControllerId);
 
    let mut connectors = cid_iter
 
        .clone()
 
        .map(|controller_id| Connector::Unconfigured(Unconfigured { controller_id }))
 
        .collect::<Vec<_>>();
 

	
 
    let mut results = vec![];
 
    crossbeam_utils::thread::scope(|s| {
 
        let handles: Vec<_> = i
 
            .iter()
 
            .zip(connectors.iter_mut())
 
            .map(|(func, connector)| s.spawn(move |_| func(connector)))
 
            .collect();
 
        for h in handles {
 
            results.push(h.join());
 
        }
 
    })
 
    .unwrap();
 

	
 
    let mut failures = false;
 

	
 
    for ((controller_id, connector), res) in
 
        cid_iter.zip(connectors.iter_mut()).zip(results.into_iter())
 
    {
 
        println!("====================\n CID {:?} ...", controller_id);
 
        match connector.get_mut_logger() {
 
            Some(logger) => println!("{}", logger),
 
            None => println!("<No Log>"),
 
        }
 
        match res {
 
            Ok(()) => println!("CID {:?} OK!", controller_id),
 
            Err(e) => {
 
                failures = true;
 
                println!("CI {:?} PANIC! {:?}", controller_id, Panicked(e));
 
            }
 
        };
 
    }
 
    if failures {
 
        panic!("FAILURES!");
 
    }
 
}
0 comments (0 inline, 0 general)