Changeset - a226385adc2d
[Not reviewed]
0 11 0
Christopher Esterhuyse - 5 years ago 2020-02-05 16:50:00
christopheresterhuyse@gmail.com
natives working
11 files changed with 453 insertions and 280 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -24,12 +24,13 @@ mio-extras = "2.0.6"
 
# protocol stuff
 
id-arena = "2.2.1"
 
backtrace = "0.3"
 

	
 
[dev-dependencies]
 
test-generator = "0.3.0"
 
crossbeam-utils = "0.7.0"
 

	
 
[lib]
 
crate-type = ["cdylib"]
 

	
 
[features]
 
default = ["ffi"]
src/protocol/ast.rs
Show inline comments
 
@@ -761,19 +761,14 @@ impl Heap {
 
    pub fn alloc_pragma(&mut self, f: impl FnOnce(PragmaId) -> Pragma) -> PragmaId {
 
        PragmaId(self.pragmas.alloc_with_id(|id| f(PragmaId(id))))
 
    }
 
    pub fn alloc_import(&mut self, f: impl FnOnce(ImportId) -> Import) -> ImportId {
 
        ImportId(self.imports.alloc_with_id(|id| f(ImportId(id))))
 
    }
 
    pub fn alloc_protocol_description(
 
        &mut self,
 
        f: impl FnOnce(RootId) -> Root,
 
    ) -> RootId {
 
        RootId(
 
            self.protocol_descriptions.alloc_with_id(|id| f(RootId(id))),
 
        )
 
    pub fn alloc_protocol_description(&mut self, f: impl FnOnce(RootId) -> Root) -> RootId {
 
        RootId(self.protocol_descriptions.alloc_with_id(|id| f(RootId(id))))
 
    }
 
    pub fn alloc_imported_declaration(
 
        &mut self,
 
        f: impl FnOnce(ImportedDeclarationId) -> ImportedDeclaration,
 
    ) -> ImportedDeclarationId {
 
        ImportedDeclarationId(DeclarationId(self.declarations.alloc_with_id(|id| {
 
@@ -1633,13 +1628,13 @@ pub enum Definition {
 

	
 
impl Definition {
 
    pub fn is_component(&self) -> bool {
 
        match self {
 
            Definition::Component(_) => true,
 
            _ => false,
 
         }
 
        }
 
    }
 
    pub fn as_component(&self) -> &Component {
 
        match self {
 
            Definition::Component(result) => result,
 
            _ => panic!("Unable to cast `Definition` to `Component`"),
 
        }
src/protocol/eval.rs
Show inline comments
 
@@ -51,21 +51,21 @@ impl Value {
 
    pub fn receive_message(buffer: &Vec<u8>) -> Value {
 
        Value::Message(MessageValue(Some(buffer.clone())))
 
    }
 
    fn create_message(length: Value) -> Value {
 
        match length {
 
            Value::Byte(_) | Value::Short(_) | Value::Int(_) | Value::Long(_) => {
 
                let length : i64 = i64::from(length);
 
                let length: i64 = i64::from(length);
 
                if length < 0 || length > MESSAGE_MAX_LENGTH {
 
                    // Only messages within the expected length are allowed
 
                    Value::Message(MessageValue(None))
 
                } else {
 
                    Value::Message(MessageValue(Some(vec![0; length.try_into().unwrap()])))
 
                }
 
            }
 
            _ => unimplemented!()
 
            _ => unimplemented!(),
 
        }
 
    }
 
    fn from_constant(constant: &Constant) -> Value {
 
        match constant {
 
            Constant::Null => Value::Message(MessageValue(None)),
 
            Constant::True => Value::Boolean(BooleanValue(true)),
 
@@ -86,23 +86,23 @@ impl Value {
 
            }
 
            Constant::Character(data) => unimplemented!(),
 
        }
 
    }
 
    fn set(&mut self, index: &Value, value: &Value) -> Option<Value> {
 
        // The index must be of integer type, and non-negative
 
        let the_index : usize;
 
        let the_index: usize;
 
        match index {
 
            Value::Byte(_) | Value::Short(_) | Value::Int(_) | Value::Long(_) => {
 
                let index = i64::from(index);
 
                if index < 0 || index > MESSAGE_MAX_LENGTH {
 
                    // It is inconsistent to update out of bounds
 
                    return None;
 
                }
 
                the_index = index.try_into().unwrap();
 
            }
 
            _ => unreachable!()
 
            _ => unreachable!(),
 
        }
 
        // The subject must be either a message or an array
 
        // And the value and the subject must be compatible
 
        match (self, value) {
 
            (Value::Message(MessageValue(None)), _) => {
 
                // It is inconsistent to update the null message
 
@@ -139,13 +139,13 @@ impl Value {
 
            (Value::MessageArray(_), Value::Message(_)) => todo!(),
 
            (Value::BooleanArray(_), Value::Boolean(_)) => todo!(),
 
            (Value::ByteArray(_), Value::Byte(_)) => todo!(),
 
            (Value::ShortArray(_), Value::Short(_)) => todo!(),
 
            (Value::IntArray(_), Value::Int(_)) => todo!(),
 
            (Value::LongArray(_), Value::Long(_)) => todo!(),
 
            _ => unreachable!()
 
            _ => unreachable!(),
 
        }
 
    }
 
    fn plus(&self, other: &Value) -> Value {
 
        // TODO: do a match on the value directly
 
        assert!(!self.exact_type().array);
 
        assert!(!other.exact_type().array);
 
@@ -888,13 +888,13 @@ impl Display for MessageValue {
 
                    if i >= 10 {
 
                        write!(f, ",...")?;
 
                        break;
 
                    }
 
                }
 
                write!(f, ")")
 
            },
 
            }
 
        }
 
    }
 
}
 

	
 
impl ValueImpl for MessageValue {
 
    fn exact_type(&self) -> Type {
 
@@ -1331,13 +1331,19 @@ impl Store {
 
        // Ensure value is compatible with type of variable
 
        let the_type = h[var].the_type(h);
 
        assert!(value.is_type_compatible(the_type));
 
        // Overwrite mapping
 
        self.map.insert(var, value.clone());
 
    }
 
    fn update(&mut self, h: &Heap, ctx: &mut EvalContext, lexpr: ExpressionId, value: Value) -> EvalResult {
 
    fn update(
 
        &mut self,
 
        h: &Heap,
 
        ctx: &mut EvalContext,
 
        lexpr: ExpressionId,
 
        value: Value,
 
    ) -> EvalResult {
 
        match &h[lexpr] {
 
            Expression::Variable(var) => {
 
                let var = var.declaration.unwrap();
 
                // Ensure value is compatible with type of variable
 
                let the_type = h[var].the_type(h);
 
                assert!(value.is_type_compatible(the_type));
 
@@ -1356,13 +1362,13 @@ impl Store {
 
                        subject = self.map.get_mut(&var).unwrap();
 
                    }
 
                    _ => unreachable!(),
 
                }
 
                match subject.set(&index, &value) {
 
                    Some(value) => Ok(value),
 
                    None => Err(EvalContinuation::Inconsistent)
 
                    None => Err(EvalContinuation::Inconsistent),
 
                }
 
            }
 
            _ => unimplemented!("{:?}", h[lexpr]),
 
        }
 
    }
 
    fn get(&mut self, h: &Heap, rexpr: ExpressionId) -> EvalResult {
 
@@ -1402,13 +1408,13 @@ impl Store {
 
                } else {
 
                    self.eval(h, ctx, expr.false_expression)
 
                }
 
            }
 
            Expression::Binary(expr) => {
 
                let left = self.eval(h, ctx, expr.left)?;
 
                let right = self.eval(h, ctx,expr.right)?;
 
                let right = self.eval(h, ctx, expr.right)?;
 
                match expr.operation {
 
                    BinaryOperator::Equality => Ok(left.eq(&right)),
 
                    BinaryOperator::Inequality => Ok(left.neq(&right)),
 
                    BinaryOperator::LessThan => Ok(left.lt(&right)),
 
                    BinaryOperator::LessThanEqual => Ok(left.lte(&right)),
 
                    BinaryOperator::GreaterThan => Ok(left.gt(&right)),
 
@@ -1440,38 +1446,36 @@ impl Store {
 
            }
 
            Expression::Indexing(expr) => self.get(h, expr.this.upcast()),
 
            Expression::Slicing(expr) => unimplemented!(),
 
            Expression::Select(expr) => self.get(h, expr.this.upcast()),
 
            Expression::Array(expr) => unimplemented!(),
 
            Expression::Constant(expr) => Ok(Value::from_constant(&expr.value)),
 
            Expression::Call(expr) => {
 
                match expr.method {
 
                    Method::Create => {
 
                        assert_eq!(1, expr.arguments.len());
 
                        let length = self.eval(h, ctx, expr.arguments[0])?;
 
                        Ok(Value::create_message(length))
 
                    }
 
                    Method::Fires => {
 
                        assert_eq!(1, expr.arguments.len());
 
                        let value = self.eval(h, ctx, expr.arguments[0])?;
 
                        match ctx.fires(value.clone()) {
 
                            None => Err(EvalContinuation::BlockFires(value)),
 
                            Some(result) => Ok(result),
 
                        }
 
            Expression::Call(expr) => match expr.method {
 
                Method::Create => {
 
                    assert_eq!(1, expr.arguments.len());
 
                    let length = self.eval(h, ctx, expr.arguments[0])?;
 
                    Ok(Value::create_message(length))
 
                }
 
                Method::Fires => {
 
                    assert_eq!(1, expr.arguments.len());
 
                    let value = self.eval(h, ctx, expr.arguments[0])?;
 
                    match ctx.fires(value.clone()) {
 
                        None => Err(EvalContinuation::BlockFires(value)),
 
                        Some(result) => Ok(result),
 
                    }
 
                    Method::Get => {
 
                        assert_eq!(1, expr.arguments.len());
 
                        let value = self.eval(h, ctx, expr.arguments[0])?;
 
                        match ctx.get(value.clone()) {
 
                            None => Err(EvalContinuation::BlockGet(value)),
 
                            Some(result) => Ok(result)
 
                        }
 
                }
 
                Method::Get => {
 
                    assert_eq!(1, expr.arguments.len());
 
                    let value = self.eval(h, ctx, expr.arguments[0])?;
 
                    match ctx.get(value.clone()) {
 
                        None => Err(EvalContinuation::BlockGet(value)),
 
                        Some(result) => Ok(result),
 
                    }
 
                    Method::Symbolic(symbol) => unimplemented!()
 
                }
 
            }
 
                Method::Symbolic(symbol) => unimplemented!(),
 
            },
 
            Expression::Variable(expr) => self.get(h, expr.this.upcast()),
 
        }
 
    }
 
}
 

	
 
type EvalResult = Result<Value, EvalContinuation>;
 
@@ -1493,17 +1497,14 @@ pub struct Prompt {
 
    store: Store,
 
    position: Option<StatementId>,
 
}
 

	
 
impl Prompt {
 
    pub fn new(h: &Heap, def: DefinitionId, args: &Vec<Value>) -> Self {
 
        let mut prompt = Prompt {
 
            definition: def,
 
            store: Store::new(),
 
            position: Some((&h[def]).body())
 
        };
 
        let mut prompt =
 
            Prompt { definition: def, store: Store::new(), position: Some((&h[def]).body()) };
 
        prompt.set_arguments(h, args);
 
        prompt
 
    }
 
    fn set_arguments(&mut self, h: &Heap, args: &Vec<Value>) {
 
        let def = &h[self.definition];
 
        let params = def.parameters();
 
@@ -1639,14 +1640,14 @@ impl Prompt {
 
                    // Functions never encounter any blocking behavior
 
                    EvalContinuation::SyncBlockStart => unreachable!(),
 
                    EvalContinuation::SyncBlockEnd => unreachable!(),
 
                    EvalContinuation::NewComponent(args) => unreachable!(),
 
                    EvalContinuation::BlockFires(val) => unreachable!(),
 
                    EvalContinuation::BlockGet(val) => unreachable!(),
 
                    EvalContinuation::Put(port, msg) => unreachable!()
 
                }
 
                    EvalContinuation::Put(port, msg) => unreachable!(),
 
                },
 
            }
 
        }
 
    }
 
}
 

	
 
#[cfg(test)]
src/protocol/lexer.rs
Show inline comments
 
@@ -1615,16 +1615,13 @@ impl Lexer<'_> {
 
            value.append(&mut ident);
 
        }
 
        self.consume_whitespace(false)?;
 
        self.consume_string(b";")?;
 
        Ok(h.alloc_import(|this| Import { this, position, value }))
 
    }
 
    pub fn consume_protocol_description(
 
        &mut self,
 
        h: &mut Heap,
 
    ) -> Result<RootId, ParseError> {
 
    pub fn consume_protocol_description(&mut self, h: &mut Heap) -> Result<RootId, ParseError> {
 
        let position = self.source.pos();
 
        let mut pragmas = Vec::new();
 
        let mut imports = Vec::new();
 
        let mut definitions = Vec::new();
 
        self.consume_whitespace(false)?;
 
        while self.has_pragma() {
src/protocol/mod.rs
Show inline comments
 
@@ -4,15 +4,15 @@ pub mod inputsource;
 
mod lexer;
 
mod library;
 
mod parser;
 

	
 
use crate::common::*;
 
use crate::protocol::ast::*;
 
use crate::protocol::eval::*;
 
use crate::protocol::inputsource::*;
 
use crate::protocol::parser::*;
 
use crate::protocol::eval::*;
 
use std::hint::unreachable_unchecked;
 

	
 
pub struct ProtocolDescriptionImpl {
 
    heap: Heap,
 
    source: InputSource,
 
    root: RootId,
 
@@ -66,30 +66,30 @@ impl ProtocolDescription for ProtocolDescriptionImpl {
 
    }
 
    fn new_main_component(&self, interface: &[Key]) -> ComponentStateImpl {
 
        let mut args = Vec::new();
 
        for (&x, y) in interface.iter().zip(self.main_interface_polarities()) {
 
            match y {
 
                Polarity::Getter => args.push(Value::Input(InputValue(x))),
 
                Polarity::Putter => args.push(Value::Output(OutputValue(x)))
 
                Polarity::Putter => args.push(Value::Output(OutputValue(x))),
 
            }
 
        }
 
        ComponentStateImpl {
 
            prompt: Prompt::new(&self.heap, self.main.upcast(), &args)
 
        }
 
        ComponentStateImpl { prompt: Prompt::new(&self.heap, self.main.upcast(), &args) }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct ComponentStateImpl {
 
    prompt: Prompt
 
    prompt: Prompt,
 
}
 
impl ComponentState for ComponentStateImpl {
 
    type D = ProtocolDescriptionImpl;
 

	
 
    fn pre_sync_run<C: MonoContext<D = ProtocolDescriptionImpl, S = Self>>(
 
        &mut self, context: &mut C, pd: &ProtocolDescriptionImpl,
 
        &mut self,
 
        context: &mut C,
 
        pd: &ProtocolDescriptionImpl,
 
    ) -> MonoBlocker {
 
        let mut context = EvalContext::Mono(context);
 
        loop {
 
            let result = self.prompt.step(&pd.heap, &mut context);
 
            match result {
 
                // In component definitions, there are no return statements
 
@@ -100,25 +100,27 @@ impl ComponentState for ComponentStateImpl {
 
                    EvalContinuation::Terminal => return MonoBlocker::ComponentExit,
 
                    EvalContinuation::SyncBlockStart => return MonoBlocker::SyncBlockStart,
 
                    // Not possible to end sync block if never entered one
 
                    EvalContinuation::SyncBlockEnd => unreachable!(),
 
                    EvalContinuation::NewComponent(args) => {
 
                        todo!();
 
                        continue
 
                        continue;
 
                    }
 
                    // Outside synchronous blocks, no fires/get/put happens
 
                    EvalContinuation::BlockFires(val) => unreachable!(),
 
                    EvalContinuation::BlockGet(val) => unreachable!(),
 
                    EvalContinuation::Put(port, msg) => unreachable!()
 
                }
 
                    EvalContinuation::Put(port, msg) => unreachable!(),
 
                },
 
            }
 
        }
 
    }
 

	
 
    fn sync_run<C: PolyContext<D = ProtocolDescriptionImpl>>(
 
        &mut self, context: &mut C,  pd: &ProtocolDescriptionImpl,
 
        &mut self,
 
        context: &mut C,
 
        pd: &ProtocolDescriptionImpl,
 
    ) -> PolyBlocker {
 
        let mut context = EvalContext::Poly(context);
 
        loop {
 
            let result = self.prompt.step(&pd.heap, &mut context);
 
            match result {
 
                // Inside synchronous blocks, there are no return statements
 
@@ -130,69 +132,65 @@ impl ComponentState for ComponentStateImpl {
 
                    EvalContinuation::Terminal => unreachable!(),
 
                    // No nested synchronous blocks
 
                    EvalContinuation::SyncBlockStart => unreachable!(),
 
                    EvalContinuation::SyncBlockEnd => return PolyBlocker::SyncBlockEnd,
 
                    // Not possible to create component in sync block
 
                    EvalContinuation::NewComponent(args) => unreachable!(),
 
                    EvalContinuation::BlockFires(port) => {
 
                        match port {
 
                            Value::Output(OutputValue(key)) => {
 
                                return PolyBlocker::CouldntCheckFiring(key);
 
                            }
 
                            Value::Input(InputValue(key)) => {
 
                                return PolyBlocker::CouldntCheckFiring(key);
 
                            }
 
                            _ => unreachable!()
 
                    EvalContinuation::BlockFires(port) => match port {
 
                        Value::Output(OutputValue(key)) => {
 
                            return PolyBlocker::CouldntCheckFiring(key);
 
                        }
 
                    }
 
                    EvalContinuation::BlockGet(port) => {
 
                        match port {
 
                            Value::Output(OutputValue(key)) => {
 
                                return PolyBlocker::CouldntReadMsg(key);
 
                            }
 
                            Value::Input(InputValue(key)) => {
 
                                return PolyBlocker::CouldntReadMsg(key);
 
                            }
 
                            _ => unreachable!()
 
                        Value::Input(InputValue(key)) => {
 
                            return PolyBlocker::CouldntCheckFiring(key);
 
                        }
 
                    }
 
                        _ => unreachable!(),
 
                    },
 
                    EvalContinuation::BlockGet(port) => match port {
 
                        Value::Output(OutputValue(key)) => {
 
                            return PolyBlocker::CouldntReadMsg(key);
 
                        }
 
                        Value::Input(InputValue(key)) => {
 
                            return PolyBlocker::CouldntReadMsg(key);
 
                        }
 
                        _ => unreachable!(),
 
                    },
 
                    EvalContinuation::Put(port, message) => {
 
                        let key;
 
                        match port {
 
                            Value::Output(OutputValue(the_key)) => {
 
                                key = the_key;
 
                            }
 
                            Value::Input(InputValue(the_key)) => {
 
                                key = the_key;
 
                            }
 
                            _ => unreachable!()
 
                            _ => unreachable!(),
 
                        }
 
                        let payload;
 
                        match message {
 
                            Value::Message(MessageValue(None)) => {
 
                                // Putting a null message is inconsistent
 
                                return PolyBlocker::Inconsistent;
 
                            }
 
                            Value::Message(MessageValue(Some(buffer))) => {
 
                                // Create a copy of the payload
 
                                payload = buffer.clone();
 
                            }
 
                            _ => unreachable!()
 
                            _ => unreachable!(),
 
                        }
 
                        return PolyBlocker::PutMsg(key, payload);
 
                    }
 
                }
 
                },
 
            }
 
        }
 
    }
 
}
 

	
 
pub enum EvalContext<'a> {
 
    Mono(&'a mut dyn MonoContext<D = ProtocolDescriptionImpl, S = ComponentStateImpl>),
 
    Poly(&'a mut dyn PolyContext<D = ProtocolDescriptionImpl>),
 
    None
 
    None,
 
}
 
impl EvalContext<'_> {
 
    fn random(&mut self) -> LongValue {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(context) => todo!(),
 
@@ -207,37 +205,27 @@ impl EvalContext<'_> {
 
        }
 
    }
 
    fn fires(&mut self, port: Value) -> Option<Value> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(context) => unreachable!(),
 
            EvalContext::Poly(context) => {
 
                match port {
 
                    Value::Output(OutputValue(key)) => {
 
                        context.is_firing(key).map(Value::from)
 
                    }
 
                    Value::Input(InputValue(key)) => {
 
                        context.is_firing(key).map(Value::from)
 
                    }
 
                    _ => unreachable!()
 
                }
 
            }
 
            EvalContext::Poly(context) => match port {
 
                Value::Output(OutputValue(key)) => context.is_firing(key).map(Value::from),
 
                Value::Input(InputValue(key)) => context.is_firing(key).map(Value::from),
 
                _ => unreachable!(),
 
            },
 
        }
 
    }
 
    fn get(&mut self, port: Value) -> Option<Value> {
 
        match self {
 
            EvalContext::None => unreachable!(),
 
            EvalContext::Mono(context) => unreachable!(),
 
            EvalContext::Poly(context) => {
 
                match port {
 
                    Value::Output(OutputValue(key)) => {
 
                        context.read_msg(key).map(Value::receive_message)
 
                    }
 
                    Value::Input(InputValue(key)) => {
 
                        context.read_msg(key).map(Value::receive_message)
 
                    }
 
                    _ => unreachable!()
 
            EvalContext::Poly(context) => match port {
 
                Value::Output(OutputValue(key)) => {
 
                    context.read_msg(key).map(Value::receive_message)
 
                }
 
                Value::Input(InputValue(key)) => context.read_msg(key).map(Value::receive_message),
 
                _ => unreachable!(),
 
            },
 
        }
 
    }
 
}
 
\ No newline at end of file
 
}
src/runtime/actors.rs
Show inline comments
 
@@ -52,15 +52,14 @@ impl PolyP {
 
        &mut self,
 
        mut m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
        mut to_run: Vec<(Predicate, BranchP)>,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
        use SyncRunResult as Srr;
 
        let cid = m_ctx.inner.channel_id_stream.controller_id;
 
        log!(&mut m_ctx.inner.logger, "~ Running branches for PolyP {:?}!", m_ctx.my_subtree_id,);
 
        while let Some((mut predicate, mut branch)) = to_run.pop() {
 
        'to_run_loop: while let Some((mut predicate, mut branch)) = to_run.pop() {
 
            let mut r_ctx = BranchPContext {
 
                m_ctx: m_ctx.reborrow(),
 
                ekeys: &self.ekeys,
 
                predicate: &predicate,
 
                inbox: &branch.inbox,
 
            };
 
@@ -108,52 +107,51 @@ impl PolyP {
 
                    }
 
                    assert!(predicate.replace_assignment(channel_id, true).is_none());
 
                    to_run.push((predicate, branch));
 
                    to_run.push((predicate_f, branch_f));
 
                }
 
                Sb::SyncBlockEnd => {
 
                    let ControllerInner { logger, endpoint_exts, .. } = m_ctx.inner;
 
                    log!(
 
                        &mut m_ctx.inner.logger,
 
                        "~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}",
 
                        logger,
 
                        "~ ... ran {:?} reached SyncBlockEnd with pred {:?} ...",
 
                        m_ctx.my_subtree_id,
 
                        &predicate,
 
                        &blocker
 
                    );
 
                    // come up with the predicate for this local solution
 
                    let lookup =
 
                        |&ekey| m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                    let ekeys_channel_id_iter = self.ekeys.iter().map(lookup);
 
                    predicate.batch_assign_nones(ekeys_channel_id_iter, false);
 

	
 
                    // OK now check we really received all the messages we expected to
 
                    let num_fired = predicate.iter_matching(true).count();
 
                    let num_msgs =
 
                        branch.inbox.keys().chain(branch.outbox.keys()).map(lookup).count();
 
                    match num_fired.cmp(&num_msgs) {
 
                        Ordering::Less => unreachable!(),
 
                        Ordering::Greater => log!(
 
                            &mut m_ctx.inner.logger,
 
                            "{:?} with pred {:?} finished but |inbox|+|outbox| < .",
 
                            m_ctx.my_subtree_id,
 
                            &predicate,
 
                        ),
 
                        Ordering::Equal => {
 
                            log!(
 
                                &mut m_ctx.inner.logger,
 
                                "{:?} with pred {:?} finished! Storing this solution locally.",
 
                                m_ctx.my_subtree_id,
 
                                &predicate,
 
                            );
 
                            m_ctx.solution_storage.submit_and_digest_subtree_solution(
 
                                m_ctx.my_subtree_id,
 
                                predicate.clone(),
 
                            );
 
                            // store the solution for recovering later
 
                            self.complete.insert(predicate, branch);
 
                    for ekey in self.ekeys.iter() {
 
                        let channel_id = endpoint_exts.get(*ekey).unwrap().info.channel_id;
 
                        let fired =
 
                            branch.inbox.contains_key(ekey) || branch.outbox.contains_key(ekey);
 
                        match predicate.query(channel_id) {
 
                            Some(true) => {
 
                                if !fired {
 
                                    // This branch should have fired but didn't!
 
                                    log!(
 
                                        logger,
 
                                        "~ ... ... should have fired {:?} and didn't! pruning!",
 
                                        channel_id,
 
                                    );
 
                                    continue 'to_run_loop;
 
                                }
 
                            }
 
                            Some(false) => assert!(!fired),
 
                            None => {
 
                                predicate.replace_assignment(channel_id, false);
 
                                assert!(!fired)
 
                            }
 
                        }
 
                    }
 
                    log!(logger, "~ ... ... and finished just fine!",);
 
                    m_ctx.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut m_ctx.inner.logger,
 
                        m_ctx.my_subtree_id,
 
                        predicate.clone(),
 
                    );
 
                    self.complete.insert(predicate, branch);
 
                }
 
                Sb::PutMsg(ekey, payload) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let EndpointExt { info, endpoint } =
 
                        m_ctx.inner.endpoint_exts.get_mut(ekey).unwrap();
 
                    if predicate.replace_assignment(info.channel_id, true) != Some(false) {
 
@@ -222,28 +220,25 @@ impl PolyP {
 
                .drain()
 
                .filter_map(|(old_predicate, mut branch)| {
 
                    use CommonSatResult as Csr;
 
                    match old_predicate.common_satisfier(&payload_predicate) {
 
                        Csr::FormerNotLatter | Csr::Equivalent => {
 
                            log!(
 
                &mut m_ctx.inner.logger,
 
                                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run This branch is compatible unaltered! branch pred: {:?}",
 
                              
 
                                &old_predicate
 
                            );
 
                            // old_predicate COVERS the assumptions of payload_predicate
 
                            let was = branch.inbox.insert(ekey, payload.clone());
 
                            assert!(was.is_none()); // INBOX MUST BE EMPTY!
 
                            Some((old_predicate, branch))
 
                        }
 
                        Csr::New(new) => {
 

	
 
                            log!(
 
                &mut m_ctx.inner.logger,
 
                                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run payloadpred {:?} and branchpred {:?} satisfied by new pred {:?}. FORKING",
 
                              
 
                                &payload_predicate,
 
                                &old_predicate,
 
                                &new,
 
                            );
 
                            // payload_predicate has new assumptions. FORK!
 
                            let mut payload_branch = branch.clone();
 
@@ -252,17 +247,15 @@ impl PolyP {
 

	
 
                            // put the original back untouched
 
                            incomplete2.insert(old_predicate, branch);
 
                            Some((new, payload_branch))
 
                        }
 
                        Csr::LatterNotFormer => {
 

	
 
                            log!(
 
                &mut m_ctx.inner.logger,
 
                                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run payloadpred {:?} subsumes branch pred {:?}. FORKING",
 
                           
 
                                &old_predicate,
 
                                &payload_predicate,
 
                            );
 
                            // payload_predicate has new assumptions. FORK!
 
                            let mut payload_branch = branch.clone();
 
                            let was = payload_branch.inbox.insert(ekey, payload.clone());
 
@@ -271,15 +264,14 @@ impl PolyP {
 
                            // put the original back untouched
 
                            incomplete2.insert(old_predicate, branch);
 
                            Some((payload_predicate.clone(), payload_branch))
 
                        }
 
                        Csr::Nonexistant => {
 
                            log!(
 
                &mut m_ctx.inner.logger,
 
                                &mut m_ctx.inner.logger,
 
                                "... poly_recv_run SKIPPING because branchpred={:?}. payloadpred={:?}",
 
                              
 
                                &old_predicate,
 
                                &payload_predicate,
 
                            );
 
                            // predicates contradict
 
                            incomplete2.insert(old_predicate, branch);
 
                            None
 
@@ -317,24 +309,77 @@ impl PolyP {
 
}
 

	
 
impl PolyN {
 
    pub fn sync_recv(
 
        &mut self,
 
        ekey: Key,
 
        logger: &mut String,
 
        payload: Payload,
 
        payload_predicate: Predicate,
 
        solution_storage: &mut SolutionStorage,
 
    ) {
 
        for (predicate, branch) in self.branches.iter_mut() {
 
            if branch.to_get.remove(&ekey) {
 
                branch.gotten.insert(ekey, payload.clone());
 
                if branch.to_get.is_empty() {
 
                    solution_storage
 
                        .submit_and_digest_subtree_solution(SubtreeId::PolyN, predicate.clone());
 
        let mut branches2: HashMap<_, _> = Default::default();
 
        for (old_predicate, mut branch) in self.branches.drain() {
 
            use CommonSatResult as Csr;
 
            let case = old_predicate.common_satisfier(&payload_predicate);
 
            let mut report_if_solution =
 
                |branch: &BranchN, pred: &Predicate, logger: &mut String| {
 
                    if branch.to_get.is_empty() {
 
                        solution_storage.submit_and_digest_subtree_solution(
 
                            logger,
 
                            SubtreeId::PolyN,
 
                            pred.clone(),
 
                        );
 
                    }
 
                };
 
            log!(
 
                logger,
 
                "Feeding msg {:?} {:?} to native branch with pred {:?}. Predicate case {:?}",
 
                &payload_predicate,
 
                &payload,
 
                &old_predicate,
 
                &case
 
            );
 
            match case {
 
                Csr::Nonexistant => { /* skip branch */ }
 
                Csr::FormerNotLatter | Csr::Equivalent => {
 
                    // Feed the message to this branch in-place. no need to modify pred.
 
                    if branch.to_get.remove(&ekey) {
 
                        branch.gotten.insert(ekey, payload.clone());
 
                        report_if_solution(&branch, &old_predicate, logger);
 
                    }
 
                }
 
                Csr::LatterNotFormer => {
 
                    // create a new branch with the payload_predicate.
 
                    let mut forked = branch.clone();
 
                    if forked.to_get.remove(&ekey) {
 
                        forked.gotten.insert(ekey, payload.clone());
 
                        report_if_solution(&forked, &payload_predicate, logger);
 
                        branches2.insert(payload_predicate.clone(), forked);
 
                    }
 
                }
 
                Csr::New(new) => {
 
                    // create a new branch with the newly-created predicate
 
                    let mut forked = branch.clone();
 
                    if forked.to_get.remove(&ekey) {
 
                        forked.gotten.insert(ekey, payload.clone());
 
                        report_if_solution(&forked, &new, logger);
 
                        branches2.insert(new.clone(), forked);
 
                    }
 
                }
 
            }
 
            // unlike PolyP machines, Native branches do not become inconsistent
 
            branches2.insert(old_predicate, branch);
 
        }
 
        log!(
 
            logger,
 
            "Native now has {} branches with predicates: {:?}",
 
            branches2.len(),
 
            branches2.keys().collect::<Vec<_>>()
 
        );
 
        std::mem::swap(&mut branches2, &mut self.branches);
 
    }
 

	
 
    pub fn become_mono(
 
        mut self,
 
        decision: &Predicate,
 
        table_row: &mut HashMap<Key, Payload>,
src/runtime/communication.rs
Show inline comments
 
@@ -95,27 +95,38 @@ impl Controller {
 
                // TODO what do I do with redundant predicates?
 
                unimplemented!(
 
                    "Having multiple batches with the same
 
                    predicate requires the support of oracle boolean variables"
 
                )
 
            }
 
            let branch = BranchN {
 
                to_get: true_ekeys.collect(),
 
                gotten: Default::default(),
 
                sync_batch_index,
 
            };
 
            let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index };
 
            for (ekey, payload) in puts {
 
                log!(
 
                    &mut self.inner.logger,
 
                    "... ... Initial native put msg {:?} pred {:?} batch {:?}",
 
                    &payload,
 
                    &predicate,
 
                    sync_batch_index,
 
                );
 
                let msg =
 
                    CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload }
 
                        .into_msg(*round_index);
 
                endpoint_exts.get_mut(ekey).unwrap().endpoint.send(msg)?;
 
            }
 
            log!(
 
                &mut self.inner.logger,
 
                "... Initial native branch (batch index={} with pred {:?}",
 
                sync_batch_index,
 
                &predicate
 
            );
 
            if branch.to_get.is_empty() {
 
                self.ephemeral
 
                    .solution_storage
 
                    .submit_and_digest_subtree_solution(SubtreeId::PolyN, predicate.clone());
 
                self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut self.inner.logger,
 
                    SubtreeId::PolyN,
 
                    predicate.clone(),
 
                );
 
            }
 
            branches.insert(predicate, branch);
 
        }
 
        Ok(PolyN { ekeys, branches })
 
    }
 

	
 
@@ -312,15 +323,17 @@ impl Controller {
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received elaboration from child for subtree {:?}: {:?}",
 
                        subtree_id,
 
                        &partial_oracle
 
                    );
 
                    self.ephemeral
 
                        .solution_storage
 
                        .submit_and_digest_subtree_solution(subtree_id, partial_oracle);
 
                    self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut self.inner.logger,
 
                        subtree_id,
 
                        partial_oracle,
 
                    );
 

	
 
                    if self.handle_locals_maybe_decide()? {
 
                        return Ok(());
 
                    }
 
                }
 
                CommMsgContents::Announce { oracle } => {
 
@@ -348,21 +361,25 @@ impl Controller {
 
                    );
 
                    match subtree_id {
 
                        None => {
 
                            // this happens when a message is sent to a component that has exited.
 
                            // It's safe to drop this message;
 
                            // The sender branch will certainly not be part of the solution
 
                            continue 'recv_loop;
 
                        }
 
                        Some(PolyId::N) => {
 
                            // Message for NativeMachine
 
                            self.ephemeral.poly_n.as_mut().unwrap().sync_recv(
 
                                received.recipient,
 
                                &mut self.inner.logger,
 
                                payload,
 
                                payload_predicate,
 
                                &mut self.ephemeral.solution_storage,
 
                            );
 
                            if self.handle_locals_maybe_decide()? {
 
                                return Ok(());
 
                            }
 
                        }
 
                        Some(PolyId::P { index }) => {
 
                            // Message for protocol actor
 
                            let channel_id = self
 
                                .inner
 
                                .endpoint_exts
 
@@ -543,49 +560,60 @@ impl SolutionStorage {
 
            local
 
        })
 
    }
 

	
 
    pub(crate) fn submit_and_digest_subtree_solution(
 
        &mut self,
 
        logger: &mut String,
 
        subtree_id: SubtreeId,
 
        predicate: Predicate,
 
    ) {
 
        log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate);
 
        let index = self.subtree_id_to_index[&subtree_id];
 
        let left = 0..index;
 
        let right = (index + 1)..self.subtree_solutions.len();
 

	
 
        let Self { subtree_solutions, new_local, old_local, .. } = self;
 
        let was_new = subtree_solutions[index].insert(predicate.clone());
 
        if was_new {
 
            let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
            Self::elaborate_into_new_local_rec(predicate, set_visitor, old_local, new_local);
 
            Self::elaborate_into_new_local_rec(
 
                logger,
 
                predicate,
 
                set_visitor,
 
                old_local,
 
                new_local,
 
            );
 
        }
 
    }
 

	
 
    fn elaborate_into_new_local_rec<'a, 'b>(
 
        logger: &mut String,
 
        partial: Predicate,
 
        mut set_visitor: impl Iterator<Item = &'b HashSet<Predicate>> + Clone,
 
        old_local: &'b HashSet<Predicate>,
 
        new_local: &'a mut HashSet<Predicate>,
 
    ) {
 
        if let Some(set) = set_visitor.next() {
 
            // incomplete solution. keep traversing
 
            for pred in set.iter() {
 
                if let Some(elaborated) = pred.union_with(&partial) {
 
                    Self::elaborate_into_new_local_rec(
 
                        logger,
 
                        elaborated,
 
                        set_visitor.clone(),
 
                        old_local,
 
                        new_local,
 
                    )
 
                }
 
            }
 
        } else {
 
            // recursive stop condition. `partial` is a local subtree solution
 
            if !old_local.contains(&partial) {
 
                // ... and it hasn't been found before
 
                log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial);
 
                new_local.insert(partial);
 
            }
 
        }
 
    }
 
}
 
impl PolyContext for BranchPContext<'_, '_> {
src/runtime/endpoint.rs
Show inline comments
 
@@ -58,13 +58,13 @@ pub struct NetworkEndpoint {
 
impl std::fmt::Debug for Endpoint {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
 
        let s = match self {
 
            Endpoint::Memory { .. } => "Memory",
 
            Endpoint::Network(..) => "Network",
 
        };
 
        write!(f, "Endpoint::{}", s)
 
        f.write_fmt(format_args!("Endpoint::{}", s))
 
    }
 
}
 

	
 
impl CommMsgContents {
 
    pub fn into_msg(self, round_index: usize) -> Msg {
 
        Msg::CommMsg(CommMsg { round_index, contents: self })
src/runtime/mod.rs
Show inline comments
 
@@ -441,22 +441,22 @@ impl Predicate {
 
    pub fn new_trivial() -> Self {
 
        Self { assigned: Default::default() }
 
    }
 
}
 
impl Debug for Predicate {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 
        f.pad("{")?;
 
        for (ChannelId { controller_id, channel_index }, &v) in self.assigned.iter() {
 
            write!(
 
                f,
 
            f.write_fmt(format_args!(
 
                "({:?},{:?})=>{}, ",
 
                controller_id,
 
                channel_index,
 
                if v { 'T' } else { 'F' }
 
            )?;
 
            ))?
 
        }
 
        Ok(())
 
        f.pad("}")
 
    }
 
}
 

	
 
#[test]
 
fn pred_sat() {
 
    use maplit::btreemap;
src/test/connector.rs
Show inline comments
 
extern crate test_generator;
 

	
 
use super::*;
 

	
 
use std::fs;
 
use std::path::Path;
 
use std::thread;
 
use test_generator::test_resources;
 

	
 
use crate::common::*;
 
use crate::runtime::*;
 
use crate::runtime::errors::*;
 
use crate::runtime::{errors::*, PortBinding::*, *};
 

	
 
// using a static AtomicU16, shared between all tests in the binary,
 
// allocate and return a socketaddr of the form 127.0.0.1:X where X in 7000..
 
fn next_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(7_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port).into()
 
}
 

	
 
#[test]
 
fn incremental() {
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = ["127.0.0.1:7010".parse().unwrap(), "127.0.0.1:7011".parse().unwrap()];
 
    let a = thread::spawn(move || {
 
        let controller_id = 0;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(
 
            b"primitive main(out a, out b) {
 
            synchronous {
 
                msg m = create(0);
 
                put(a, m);
 
            }
 
        }",
 
        )
 
        .unwrap();
 
        x.bind_port(0, PortBinding::Passive(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Passive(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
    });
 
    let b = thread::spawn(move || {
 
        let controller_id = 1;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(
 
            b"primitive main(in a, in b) {
 
            synchronous {
 
                get(a);
 
            }
 
        }",
 
        )
 
        .unwrap();
 
        x.bind_port(0, PortBinding::Active(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Active(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
    });
 
    handle(a.join());
 
    handle(b.join());
 
    let addrs = [next_addr(), next_addr()];
 
    let handles = vec![
 
        thread::spawn(move || {
 
            let controller_id = 0;
 
            let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
            x.configure(
 
                b"primitive main(out a, out b) {
 
                    synchronous {
 
                        msg m = create(0);
 
                        put(a, m);
 
                    }
 
                }",
 
            )
 
            .unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Passive(addrs[1])).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
            println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
        }),
 
        thread::spawn(move || {
 
            let controller_id = 1;
 
            let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
            x.configure(
 
                b"primitive main(in a, in b) {
 
                    synchronous {
 
                        get(a);
 
                    }
 
                }",
 
            )
 
            .unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Active(addrs[1])).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
            println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
        }),
 
    ];
 
    for h in handles {
 
        handle(h.join())
 
    }
 
}
 

	
 
#[test]
 
fn duo_positive() {
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = ["127.0.0.1:7012".parse().unwrap(), "127.0.0.1:7013".parse().unwrap()];
 
    let addrs = [next_addr(), next_addr()];
 
    let a = thread::spawn(move || {
 
        let controller_id = 0;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(
 
            b"
 
        primitive main(out a, out b) {
 
            synchronous {}
 
            synchronous {}
 
            synchronous {
 
                msg m = create(0);
 
                put(a, m);
 
            }
 
            synchronous {
 
                msg m = create(0);
 
                put(b, m);
 
            }
 
        }",
 
            b"primitive main(out a, out b) {
 
                synchronous {}
 
                synchronous {}
 
                synchronous {
 
                    msg m = create(0);
 
                    put(a, m);
 
                }
 
                synchronous {
 
                    msg m = create(0);
 
                    put(b, m);
 
                }
 
            }",
 
        )
 
        .unwrap();
 
        x.bind_port(0, PortBinding::Passive(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Passive(addrs[1])).unwrap();
 
        x.bind_port(0, Passive(addrs[0])).unwrap();
 
        x.bind_port(1, Passive(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
    });
 
    let b = thread::spawn(move || {
 
        let controller_id = 1;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(
 
            b"
 
        primitive main(in a, in b) {
 
            while (true) {
 
                synchronous {
 
                    if (fires(a)) {
 
                        get(a);
 
            b"primitive main(in a, in b) {
 
                while (true) {
 
                    synchronous {
 
                        if (fires(a)) {
 
                            get(a);
 
                        }
 
                    }
 
                }
 
                synchronous {
 
                    if (fires(b)) {
 
                        get(b);
 
                    synchronous {
 
                        if (fires(b)) {
 
                            get(b);
 
                        }
 
                    }
 
                }
 
            }
 
        }",
 
            }",
 
        )
 
        .unwrap();
 
        x.bind_port(0, PortBinding::Active(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Active(addrs[1])).unwrap();
 
        x.bind_port(0, Active(addrs[0])).unwrap();
 
        x.bind_port(1, Active(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
@@ -120,61 +129,127 @@ fn duo_positive() {
 
    handle(b.join());
 
}
 

	
 
#[test]
 
fn duo_negative() {
 
    let timeout = Duration::from_millis(500);
 
    let addrs = ["127.0.0.1:7014".parse().unwrap(), "127.0.0.1:7015".parse().unwrap()];
 
    let addrs = [next_addr(), next_addr()];
 
    let a = thread::spawn(move || {
 
        let controller_id = 0;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(b"
 
        primitive main(out a, out b) {
 
            synchronous {}
 
            synchronous {
 
                msg m = create(0);
 
                put(a, m); // fires a on second round
 
            }
 
        }").unwrap();
 
        x.bind_port(0, PortBinding::Passive(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Passive(addrs[1])).unwrap();
 
        x.configure(
 
            b"primitive main(out a, out b) {
 
                synchronous {}
 
                synchronous {
 
                    msg m = create(0);
 
                    put(a, m); // fires a on second round
 
                }
 
            }",
 
        )
 
        .unwrap();
 
        x.bind_port(0, Passive(addrs[0])).unwrap();
 
        x.bind_port(1, Passive(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        let r = x.sync(timeout);
 
        println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
        match r {
 
            Err(SyncErr::Timeout) => {}
 
            x => unreachable!("{:?}", x)
 
            x => unreachable!("{:?}", x),
 
        }
 
    });
 
    let b = thread::spawn(move || {
 
        let controller_id = 1;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(b"
 
        primitive main(in a, in b) {
 
            while (true) {
 
                synchronous {
 
                    if (fires(a)) {
 
                        get(a);
 
        x.configure(
 
            b"primitive main(in a, in b) {
 
                while (true) {
 
                    synchronous {
 
                        if (fires(a)) {
 
                            get(a);
 
                        }
 
                    }
 
                }
 
                synchronous {
 
                    if (fires(b)) { // never fire a on even round
 
                        get(b);
 
                    synchronous {
 
                        if (fires(b)) { // never fire a on even round
 
                            get(b);
 
                        }
 
                    }
 
                }
 
            }
 
        }").unwrap();
 
        x.bind_port(0, PortBinding::Active(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Active(addrs[1])).unwrap();
 
            }",
 
        )
 
        .unwrap();
 
        x.bind_port(0, Active(addrs[0])).unwrap();
 
        x.bind_port(1, Active(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        let r = x.sync(timeout);
 
        println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
        match r {
 
            Err(SyncErr::Timeout) => {}
 
            x => unreachable!("{:?}", x)
 
            x => unreachable!("{:?}", x),
 
        }
 
    });
 
    handle(a.join());
 
    handle(b.join());
 
}
 

	
 
#[test]
 
fn connect_natives() {
 
    static CHAIN: &[u8] = b"
 
    primitive main(in i, out o) {
 
        while(true) synchronous {}
 
    }";
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    do_all(&[
 
        &|x| {
 
            x.configure(CHAIN).unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
        },
 
        &|x| {
 
            x.configure(CHAIN).unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
        },
 
    ]);
 
}
 

	
 
#[test]
 
fn forward() {
 
    static FORWARD: &[u8] = b"
 
    primitive main(in i, out o) {
 
        while(true) synchronous {
 
            put(o, get(i));
 
        }
 
    }";
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    do_all(&[
 
        //
 
        &|x| {
 
            x.configure(FORWARD).unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            let msg = b"HELLO!".to_vec();
 
            x.put(0, msg).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
        },
 
        &|x| {
 
            x.configure(FORWARD).unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            let expect = b"HELLO!".to_vec();
 
            x.get(0).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
            assert_eq!(expect, x.read_gotten(0).unwrap());
 
        },
 
    ]);
 
}
src/test/mod.rs
Show inline comments
 
use crate::common::ControllerId;
 
use crate::runtime::Connector;
 
use crate::runtime::Unconfigured;
 
use core::fmt::Debug;
 

	
 
mod connector;
 
mod setup;
 

	
 
struct Panicked(Box<dyn std::any::Any>);
 
@@ -12,14 +15,54 @@ impl Debug for Panicked {
 
            f.pad(string)
 
        } else {
 
            f.pad("Box<Any>")
 
        }
 
    }
 
}
 
fn handle(result: Result<(), std::boxed::Box<(dyn std::any::Any + std::marker::Send + 'static)>>) {
 
    match result {
 
        Ok(_) => {}
 
        Err(x) => {
 
            panic!("Worker panicked: {:?}", Panicked(x));
 
fn handle(result: Result<(), Box<(dyn std::any::Any + Send + 'static)>>) {
 
    if let Err(x) = result {
 
        panic!("Worker panicked: {:?}", Panicked(x))
 
    }
 
}
 

	
 
fn do_all(i: &[&(dyn Fn(&mut Connector) + Sync)]) {
 
    let cid_iter = 0..(i.len() as ControllerId);
 
    let mut connectors = cid_iter
 
        .clone()
 
        .map(|controller_id| Connector::Unconfigured(Unconfigured { controller_id }))
 
        .collect::<Vec<_>>();
 

	
 
    let mut results = vec![];
 
    crossbeam_utils::thread::scope(|s| {
 
        let handles: Vec<_> = i
 
            .iter()
 
            .zip(connectors.iter_mut())
 
            .map(|(func, connector)| s.spawn(move |_| func(connector)))
 
            .collect();
 
        for h in handles {
 
            results.push(h.join());
 
        }
 
    })
 
    .unwrap();
 

	
 
    let mut failures = false;
 

	
 
    for ((controller_id, connector), res) in
 
        cid_iter.zip(connectors.iter_mut()).zip(results.into_iter())
 
    {
 
        println!("====================\n CID {:?} ...", controller_id);
 
        match connector.get_mut_logger() {
 
            Some(logger) => println!("{}", logger),
 
            None => println!("<No Log>"),
 
        }
 
        match res {
 
            Ok(()) => println!("CID {:?} OK!", controller_id),
 
            Err(e) => {
 
                failures = true;
 
                println!("CI {:?} PANIC! {:?}", controller_id, Panicked(e));
 
            }
 
        };
 
    }
 
    if failures {
 
        panic!("FAILURES!");
 
    }
 
}
0 comments (0 inline, 0 general)