From a226385adc2d1eb53cb642436aec18ffd0ed4964 2020-02-05 16:50:00 From: Christopher Esterhuyse Date: 2020-02-05 16:50:00 Subject: [PATCH] natives working --- diff --git a/Cargo.toml b/Cargo.toml index 2f05a3e211af95e51c6c5df665f9d111756e0765..75b3a4215d44c41b9c752867d3a21c40acc63490 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ backtrace = "0.3" [dev-dependencies] test-generator = "0.3.0" +crossbeam-utils = "0.7.0" [lib] crate-type = ["cdylib"] diff --git a/src/protocol/ast.rs b/src/protocol/ast.rs index 9df653dede77fb9515afcc835ff18a68c61548f4..8de88d47194be4ef92063ac95d3e4139b4ee6db1 100644 --- a/src/protocol/ast.rs +++ b/src/protocol/ast.rs @@ -764,13 +764,8 @@ impl Heap { pub fn alloc_import(&mut self, f: impl FnOnce(ImportId) -> Import) -> ImportId { ImportId(self.imports.alloc_with_id(|id| f(ImportId(id)))) } - pub fn alloc_protocol_description( - &mut self, - f: impl FnOnce(RootId) -> Root, - ) -> RootId { - RootId( - self.protocol_descriptions.alloc_with_id(|id| f(RootId(id))), - ) + pub fn alloc_protocol_description(&mut self, f: impl FnOnce(RootId) -> Root) -> RootId { + RootId(self.protocol_descriptions.alloc_with_id(|id| f(RootId(id)))) } pub fn alloc_imported_declaration( &mut self, @@ -1636,7 +1631,7 @@ impl Definition { match self { Definition::Component(_) => true, _ => false, - } + } } pub fn as_component(&self) -> &Component { match self { diff --git a/src/protocol/eval.rs b/src/protocol/eval.rs index 5f1166344a0335178806023514058faeefee7e5a..37c4580088d9d32a2d128f9e417d4dfe8ee2435b 100644 --- a/src/protocol/eval.rs +++ b/src/protocol/eval.rs @@ -54,7 +54,7 @@ impl Value { fn create_message(length: Value) -> Value { match length { Value::Byte(_) | Value::Short(_) | Value::Int(_) | Value::Long(_) => { - let length : i64 = i64::from(length); + let length: i64 = i64::from(length); if length < 0 || length > MESSAGE_MAX_LENGTH { // Only messages within the expected length are allowed Value::Message(MessageValue(None)) @@ -62,7 +62,7 @@ impl Value { Value::Message(MessageValue(Some(vec![0; length.try_into().unwrap()]))) } } - _ => unimplemented!() + _ => unimplemented!(), } } fn from_constant(constant: &Constant) -> Value { @@ -89,7 +89,7 @@ impl Value { } fn set(&mut self, index: &Value, value: &Value) -> Option { // The index must be of integer type, and non-negative - let the_index : usize; + let the_index: usize; match index { Value::Byte(_) | Value::Short(_) | Value::Int(_) | Value::Long(_) => { let index = i64::from(index); @@ -99,7 +99,7 @@ impl Value { } the_index = index.try_into().unwrap(); } - _ => unreachable!() + _ => unreachable!(), } // The subject must be either a message or an array // And the value and the subject must be compatible @@ -142,7 +142,7 @@ impl Value { (Value::ShortArray(_), Value::Short(_)) => todo!(), (Value::IntArray(_), Value::Int(_)) => todo!(), (Value::LongArray(_), Value::Long(_)) => todo!(), - _ => unreachable!() + _ => unreachable!(), } } fn plus(&self, other: &Value) -> Value { @@ -891,7 +891,7 @@ impl Display for MessageValue { } } write!(f, ")") - }, + } } } } @@ -1334,7 +1334,13 @@ impl Store { // Overwrite mapping self.map.insert(var, value.clone()); } - fn update(&mut self, h: &Heap, ctx: &mut EvalContext, lexpr: ExpressionId, value: Value) -> EvalResult { + fn update( + &mut self, + h: &Heap, + ctx: &mut EvalContext, + lexpr: ExpressionId, + value: Value, + ) -> EvalResult { match &h[lexpr] { Expression::Variable(var) => { let var = var.declaration.unwrap(); @@ -1359,7 +1365,7 @@ impl Store { } match subject.set(&index, &value) { Some(value) => Ok(value), - None => Err(EvalContinuation::Inconsistent) + None => Err(EvalContinuation::Inconsistent), } } _ => unimplemented!("{:?}", h[lexpr]), @@ -1405,7 +1411,7 @@ impl Store { } Expression::Binary(expr) => { let left = self.eval(h, ctx, expr.left)?; - let right = self.eval(h, ctx,expr.right)?; + let right = self.eval(h, ctx, expr.right)?; match expr.operation { BinaryOperator::Equality => Ok(left.eq(&right)), BinaryOperator::Inequality => Ok(left.neq(&right)), @@ -1443,32 +1449,30 @@ impl Store { Expression::Select(expr) => self.get(h, expr.this.upcast()), Expression::Array(expr) => unimplemented!(), Expression::Constant(expr) => Ok(Value::from_constant(&expr.value)), - Expression::Call(expr) => { - match expr.method { - Method::Create => { - assert_eq!(1, expr.arguments.len()); - let length = self.eval(h, ctx, expr.arguments[0])?; - Ok(Value::create_message(length)) - } - Method::Fires => { - assert_eq!(1, expr.arguments.len()); - let value = self.eval(h, ctx, expr.arguments[0])?; - match ctx.fires(value.clone()) { - None => Err(EvalContinuation::BlockFires(value)), - Some(result) => Ok(result), - } + Expression::Call(expr) => match expr.method { + Method::Create => { + assert_eq!(1, expr.arguments.len()); + let length = self.eval(h, ctx, expr.arguments[0])?; + Ok(Value::create_message(length)) + } + Method::Fires => { + assert_eq!(1, expr.arguments.len()); + let value = self.eval(h, ctx, expr.arguments[0])?; + match ctx.fires(value.clone()) { + None => Err(EvalContinuation::BlockFires(value)), + Some(result) => Ok(result), } - Method::Get => { - assert_eq!(1, expr.arguments.len()); - let value = self.eval(h, ctx, expr.arguments[0])?; - match ctx.get(value.clone()) { - None => Err(EvalContinuation::BlockGet(value)), - Some(result) => Ok(result) - } + } + Method::Get => { + assert_eq!(1, expr.arguments.len()); + let value = self.eval(h, ctx, expr.arguments[0])?; + match ctx.get(value.clone()) { + None => Err(EvalContinuation::BlockGet(value)), + Some(result) => Ok(result), } - Method::Symbolic(symbol) => unimplemented!() } - } + Method::Symbolic(symbol) => unimplemented!(), + }, Expression::Variable(expr) => self.get(h, expr.this.upcast()), } } @@ -1496,11 +1500,8 @@ pub struct Prompt { impl Prompt { pub fn new(h: &Heap, def: DefinitionId, args: &Vec) -> Self { - let mut prompt = Prompt { - definition: def, - store: Store::new(), - position: Some((&h[def]).body()) - }; + let mut prompt = + Prompt { definition: def, store: Store::new(), position: Some((&h[def]).body()) }; prompt.set_arguments(h, args); prompt } @@ -1642,8 +1643,8 @@ impl Prompt { EvalContinuation::NewComponent(args) => unreachable!(), EvalContinuation::BlockFires(val) => unreachable!(), EvalContinuation::BlockGet(val) => unreachable!(), - EvalContinuation::Put(port, msg) => unreachable!() - } + EvalContinuation::Put(port, msg) => unreachable!(), + }, } } } diff --git a/src/protocol/lexer.rs b/src/protocol/lexer.rs index 48caa4716e6ca486c2436181f3453cfcb8ae7396..9928f47823ed69ca7f575313db73947ea22cee32 100644 --- a/src/protocol/lexer.rs +++ b/src/protocol/lexer.rs @@ -1618,10 +1618,7 @@ impl Lexer<'_> { self.consume_string(b";")?; Ok(h.alloc_import(|this| Import { this, position, value })) } - pub fn consume_protocol_description( - &mut self, - h: &mut Heap, - ) -> Result { + pub fn consume_protocol_description(&mut self, h: &mut Heap) -> Result { let position = self.source.pos(); let mut pragmas = Vec::new(); let mut imports = Vec::new(); diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index d770d19f288f2d7bb087fcbfdbca58a2de1899fd..c3deda22f0cfdfe0559ccda1054ada090fc6ae46 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -7,9 +7,9 @@ mod parser; use crate::common::*; use crate::protocol::ast::*; +use crate::protocol::eval::*; use crate::protocol::inputsource::*; use crate::protocol::parser::*; -use crate::protocol::eval::*; use std::hint::unreachable_unchecked; pub struct ProtocolDescriptionImpl { @@ -69,24 +69,24 @@ impl ProtocolDescription for ProtocolDescriptionImpl { for (&x, y) in interface.iter().zip(self.main_interface_polarities()) { match y { Polarity::Getter => args.push(Value::Input(InputValue(x))), - Polarity::Putter => args.push(Value::Output(OutputValue(x))) + Polarity::Putter => args.push(Value::Output(OutputValue(x))), } } - ComponentStateImpl { - prompt: Prompt::new(&self.heap, self.main.upcast(), &args) - } + ComponentStateImpl { prompt: Prompt::new(&self.heap, self.main.upcast(), &args) } } } #[derive(Debug, Clone)] pub struct ComponentStateImpl { - prompt: Prompt + prompt: Prompt, } impl ComponentState for ComponentStateImpl { type D = ProtocolDescriptionImpl; fn pre_sync_run>( - &mut self, context: &mut C, pd: &ProtocolDescriptionImpl, + &mut self, + context: &mut C, + pd: &ProtocolDescriptionImpl, ) -> MonoBlocker { let mut context = EvalContext::Mono(context); loop { @@ -103,19 +103,21 @@ impl ComponentState for ComponentStateImpl { EvalContinuation::SyncBlockEnd => unreachable!(), EvalContinuation::NewComponent(args) => { todo!(); - continue + continue; } // Outside synchronous blocks, no fires/get/put happens EvalContinuation::BlockFires(val) => unreachable!(), EvalContinuation::BlockGet(val) => unreachable!(), - EvalContinuation::Put(port, msg) => unreachable!() - } + EvalContinuation::Put(port, msg) => unreachable!(), + }, } } } fn sync_run>( - &mut self, context: &mut C, pd: &ProtocolDescriptionImpl, + &mut self, + context: &mut C, + pd: &ProtocolDescriptionImpl, ) -> PolyBlocker { let mut context = EvalContext::Poly(context); loop { @@ -133,28 +135,24 @@ impl ComponentState for ComponentStateImpl { EvalContinuation::SyncBlockEnd => return PolyBlocker::SyncBlockEnd, // Not possible to create component in sync block EvalContinuation::NewComponent(args) => unreachable!(), - EvalContinuation::BlockFires(port) => { - match port { - Value::Output(OutputValue(key)) => { - return PolyBlocker::CouldntCheckFiring(key); - } - Value::Input(InputValue(key)) => { - return PolyBlocker::CouldntCheckFiring(key); - } - _ => unreachable!() + EvalContinuation::BlockFires(port) => match port { + Value::Output(OutputValue(key)) => { + return PolyBlocker::CouldntCheckFiring(key); } - } - EvalContinuation::BlockGet(port) => { - match port { - Value::Output(OutputValue(key)) => { - return PolyBlocker::CouldntReadMsg(key); - } - Value::Input(InputValue(key)) => { - return PolyBlocker::CouldntReadMsg(key); - } - _ => unreachable!() + Value::Input(InputValue(key)) => { + return PolyBlocker::CouldntCheckFiring(key); } - } + _ => unreachable!(), + }, + EvalContinuation::BlockGet(port) => match port { + Value::Output(OutputValue(key)) => { + return PolyBlocker::CouldntReadMsg(key); + } + Value::Input(InputValue(key)) => { + return PolyBlocker::CouldntReadMsg(key); + } + _ => unreachable!(), + }, EvalContinuation::Put(port, message) => { let key; match port { @@ -164,7 +162,7 @@ impl ComponentState for ComponentStateImpl { Value::Input(InputValue(the_key)) => { key = the_key; } - _ => unreachable!() + _ => unreachable!(), } let payload; match message { @@ -176,11 +174,11 @@ impl ComponentState for ComponentStateImpl { // Create a copy of the payload payload = buffer.clone(); } - _ => unreachable!() + _ => unreachable!(), } return PolyBlocker::PutMsg(key, payload); } - } + }, } } } @@ -189,7 +187,7 @@ impl ComponentState for ComponentStateImpl { pub enum EvalContext<'a> { Mono(&'a mut dyn MonoContext), Poly(&'a mut dyn PolyContext), - None + None, } impl EvalContext<'_> { fn random(&mut self) -> LongValue { @@ -210,34 +208,24 @@ impl EvalContext<'_> { match self { EvalContext::None => unreachable!(), EvalContext::Mono(context) => unreachable!(), - EvalContext::Poly(context) => { - match port { - Value::Output(OutputValue(key)) => { - context.is_firing(key).map(Value::from) - } - Value::Input(InputValue(key)) => { - context.is_firing(key).map(Value::from) - } - _ => unreachable!() - } - } + EvalContext::Poly(context) => match port { + Value::Output(OutputValue(key)) => context.is_firing(key).map(Value::from), + Value::Input(InputValue(key)) => context.is_firing(key).map(Value::from), + _ => unreachable!(), + }, } } fn get(&mut self, port: Value) -> Option { match self { EvalContext::None => unreachable!(), EvalContext::Mono(context) => unreachable!(), - EvalContext::Poly(context) => { - match port { - Value::Output(OutputValue(key)) => { - context.read_msg(key).map(Value::receive_message) - } - Value::Input(InputValue(key)) => { - context.read_msg(key).map(Value::receive_message) - } - _ => unreachable!() + EvalContext::Poly(context) => match port { + Value::Output(OutputValue(key)) => { + context.read_msg(key).map(Value::receive_message) } + Value::Input(InputValue(key)) => context.read_msg(key).map(Value::receive_message), + _ => unreachable!(), }, } } -} \ No newline at end of file +} diff --git a/src/runtime/actors.rs b/src/runtime/actors.rs index 81bd02c3c84d296c3230280c5a87ed5444909d60..2302702f7bdb5602ab30406b9d2845b8ed1e06d2 100644 --- a/src/runtime/actors.rs +++ b/src/runtime/actors.rs @@ -55,9 +55,8 @@ impl PolyP { mut to_run: Vec<(Predicate, BranchP)>, ) -> Result { use SyncRunResult as Srr; - let cid = m_ctx.inner.channel_id_stream.controller_id; log!(&mut m_ctx.inner.logger, "~ Running branches for PolyP {:?}!", m_ctx.my_subtree_id,); - while let Some((mut predicate, mut branch)) = to_run.pop() { + 'to_run_loop: while let Some((mut predicate, mut branch)) = to_run.pop() { let mut r_ctx = BranchPContext { m_ctx: m_ctx.reborrow(), ekeys: &self.ekeys, @@ -111,46 +110,45 @@ impl PolyP { to_run.push((predicate_f, branch_f)); } Sb::SyncBlockEnd => { + let ControllerInner { logger, endpoint_exts, .. } = m_ctx.inner; log!( - &mut m_ctx.inner.logger, - "~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}", + logger, + "~ ... ran {:?} reached SyncBlockEnd with pred {:?} ...", m_ctx.my_subtree_id, &predicate, - &blocker ); // come up with the predicate for this local solution - let lookup = - |&ekey| m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; - let ekeys_channel_id_iter = self.ekeys.iter().map(lookup); - predicate.batch_assign_nones(ekeys_channel_id_iter, false); - // OK now check we really received all the messages we expected to - let num_fired = predicate.iter_matching(true).count(); - let num_msgs = - branch.inbox.keys().chain(branch.outbox.keys()).map(lookup).count(); - match num_fired.cmp(&num_msgs) { - Ordering::Less => unreachable!(), - Ordering::Greater => log!( - &mut m_ctx.inner.logger, - "{:?} with pred {:?} finished but |inbox|+|outbox| < .", - m_ctx.my_subtree_id, - &predicate, - ), - Ordering::Equal => { - log!( - &mut m_ctx.inner.logger, - "{:?} with pred {:?} finished! Storing this solution locally.", - m_ctx.my_subtree_id, - &predicate, - ); - m_ctx.solution_storage.submit_and_digest_subtree_solution( - m_ctx.my_subtree_id, - predicate.clone(), - ); - // store the solution for recovering later - self.complete.insert(predicate, branch); + for ekey in self.ekeys.iter() { + let channel_id = endpoint_exts.get(*ekey).unwrap().info.channel_id; + let fired = + branch.inbox.contains_key(ekey) || branch.outbox.contains_key(ekey); + match predicate.query(channel_id) { + Some(true) => { + if !fired { + // This branch should have fired but didn't! + log!( + logger, + "~ ... ... should have fired {:?} and didn't! pruning!", + channel_id, + ); + continue 'to_run_loop; + } + } + Some(false) => assert!(!fired), + None => { + predicate.replace_assignment(channel_id, false); + assert!(!fired) + } } } + log!(logger, "~ ... ... and finished just fine!",); + m_ctx.solution_storage.submit_and_digest_subtree_solution( + &mut m_ctx.inner.logger, + m_ctx.my_subtree_id, + predicate.clone(), + ); + self.complete.insert(predicate, branch); } Sb::PutMsg(ekey, payload) => { assert!(self.ekeys.contains(&ekey)); @@ -225,9 +223,8 @@ impl PolyP { match old_predicate.common_satisfier(&payload_predicate) { Csr::FormerNotLatter | Csr::Equivalent => { log!( - &mut m_ctx.inner.logger, + &mut m_ctx.inner.logger, "... poly_recv_run This branch is compatible unaltered! branch pred: {:?}", - &old_predicate ); // old_predicate COVERS the assumptions of payload_predicate @@ -236,11 +233,9 @@ impl PolyP { Some((old_predicate, branch)) } Csr::New(new) => { - log!( - &mut m_ctx.inner.logger, + &mut m_ctx.inner.logger, "... poly_recv_run payloadpred {:?} and branchpred {:?} satisfied by new pred {:?}. FORKING", - &payload_predicate, &old_predicate, &new, @@ -255,11 +250,9 @@ impl PolyP { Some((new, payload_branch)) } Csr::LatterNotFormer => { - log!( - &mut m_ctx.inner.logger, + &mut m_ctx.inner.logger, "... poly_recv_run payloadpred {:?} subsumes branch pred {:?}. FORKING", - &old_predicate, &payload_predicate, ); @@ -274,9 +267,8 @@ impl PolyP { } Csr::Nonexistant => { log!( - &mut m_ctx.inner.logger, + &mut m_ctx.inner.logger, "... poly_recv_run SKIPPING because branchpred={:?}. payloadpred={:?}", - &old_predicate, &payload_predicate, ); @@ -320,18 +312,71 @@ impl PolyN { pub fn sync_recv( &mut self, ekey: Key, + logger: &mut String, payload: Payload, + payload_predicate: Predicate, solution_storage: &mut SolutionStorage, ) { - for (predicate, branch) in self.branches.iter_mut() { - if branch.to_get.remove(&ekey) { - branch.gotten.insert(ekey, payload.clone()); - if branch.to_get.is_empty() { - solution_storage - .submit_and_digest_subtree_solution(SubtreeId::PolyN, predicate.clone()); + let mut branches2: HashMap<_, _> = Default::default(); + for (old_predicate, mut branch) in self.branches.drain() { + use CommonSatResult as Csr; + let case = old_predicate.common_satisfier(&payload_predicate); + let mut report_if_solution = + |branch: &BranchN, pred: &Predicate, logger: &mut String| { + if branch.to_get.is_empty() { + solution_storage.submit_and_digest_subtree_solution( + logger, + SubtreeId::PolyN, + pred.clone(), + ); + } + }; + log!( + logger, + "Feeding msg {:?} {:?} to native branch with pred {:?}. Predicate case {:?}", + &payload_predicate, + &payload, + &old_predicate, + &case + ); + match case { + Csr::Nonexistant => { /* skip branch */ } + Csr::FormerNotLatter | Csr::Equivalent => { + // Feed the message to this branch in-place. no need to modify pred. + if branch.to_get.remove(&ekey) { + branch.gotten.insert(ekey, payload.clone()); + report_if_solution(&branch, &old_predicate, logger); + } + } + Csr::LatterNotFormer => { + // create a new branch with the payload_predicate. + let mut forked = branch.clone(); + if forked.to_get.remove(&ekey) { + forked.gotten.insert(ekey, payload.clone()); + report_if_solution(&forked, &payload_predicate, logger); + branches2.insert(payload_predicate.clone(), forked); + } + } + Csr::New(new) => { + // create a new branch with the newly-created predicate + let mut forked = branch.clone(); + if forked.to_get.remove(&ekey) { + forked.gotten.insert(ekey, payload.clone()); + report_if_solution(&forked, &new, logger); + branches2.insert(new.clone(), forked); + } } } + // unlike PolyP machines, Native branches do not become inconsistent + branches2.insert(old_predicate, branch); } + log!( + logger, + "Native now has {} branches with predicates: {:?}", + branches2.len(), + branches2.keys().collect::>() + ); + std::mem::swap(&mut branches2, &mut self.branches); } pub fn become_mono( diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 233df882222cf12f109238462a28fd3648f44650..4f02976bf9624fd2bfc64dba565a5de0b56d3698 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -98,21 +98,32 @@ impl Controller { predicate requires the support of oracle boolean variables" ) } - let branch = BranchN { - to_get: true_ekeys.collect(), - gotten: Default::default(), - sync_batch_index, - }; + let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index }; for (ekey, payload) in puts { + log!( + &mut self.inner.logger, + "... ... Initial native put msg {:?} pred {:?} batch {:?}", + &payload, + &predicate, + sync_batch_index, + ); let msg = CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload } .into_msg(*round_index); endpoint_exts.get_mut(ekey).unwrap().endpoint.send(msg)?; } + log!( + &mut self.inner.logger, + "... Initial native branch (batch index={} with pred {:?}", + sync_batch_index, + &predicate + ); if branch.to_get.is_empty() { - self.ephemeral - .solution_storage - .submit_and_digest_subtree_solution(SubtreeId::PolyN, predicate.clone()); + self.ephemeral.solution_storage.submit_and_digest_subtree_solution( + &mut self.inner.logger, + SubtreeId::PolyN, + predicate.clone(), + ); } branches.insert(predicate, branch); } @@ -315,9 +326,11 @@ impl Controller { subtree_id, &partial_oracle ); - self.ephemeral - .solution_storage - .submit_and_digest_subtree_solution(subtree_id, partial_oracle); + self.ephemeral.solution_storage.submit_and_digest_subtree_solution( + &mut self.inner.logger, + subtree_id, + partial_oracle, + ); if self.handle_locals_maybe_decide()? { return Ok(()); @@ -351,15 +364,19 @@ impl Controller { // this happens when a message is sent to a component that has exited. // It's safe to drop this message; // The sender branch will certainly not be part of the solution - continue 'recv_loop; } Some(PolyId::N) => { // Message for NativeMachine self.ephemeral.poly_n.as_mut().unwrap().sync_recv( received.recipient, + &mut self.inner.logger, payload, + payload_predicate, &mut self.ephemeral.solution_storage, ); + if self.handle_locals_maybe_decide()? { + return Ok(()); + } } Some(PolyId::P { index }) => { // Message for protocol actor @@ -546,9 +563,11 @@ impl SolutionStorage { pub(crate) fn submit_and_digest_subtree_solution( &mut self, + logger: &mut String, subtree_id: SubtreeId, predicate: Predicate, ) { + log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate); let index = self.subtree_id_to_index[&subtree_id]; let left = 0..index; let right = (index + 1)..self.subtree_solutions.len(); @@ -557,11 +576,18 @@ impl SolutionStorage { let was_new = subtree_solutions[index].insert(predicate.clone()); if was_new { let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]); - Self::elaborate_into_new_local_rec(predicate, set_visitor, old_local, new_local); + Self::elaborate_into_new_local_rec( + logger, + predicate, + set_visitor, + old_local, + new_local, + ); } } fn elaborate_into_new_local_rec<'a, 'b>( + logger: &mut String, partial: Predicate, mut set_visitor: impl Iterator> + Clone, old_local: &'b HashSet, @@ -572,6 +598,7 @@ impl SolutionStorage { for pred in set.iter() { if let Some(elaborated) = pred.union_with(&partial) { Self::elaborate_into_new_local_rec( + logger, elaborated, set_visitor.clone(), old_local, @@ -583,6 +610,7 @@ impl SolutionStorage { // recursive stop condition. `partial` is a local subtree solution if !old_local.contains(&partial) { // ... and it hasn't been found before + log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial); new_local.insert(partial); } } diff --git a/src/runtime/endpoint.rs b/src/runtime/endpoint.rs index 342db8d7230a5216faee4e3464b721fea2b8b0b4..c5f11524fa972919851acdd8a0552aab266bcd03 100644 --- a/src/runtime/endpoint.rs +++ b/src/runtime/endpoint.rs @@ -61,7 +61,7 @@ impl std::fmt::Debug for Endpoint { Endpoint::Memory { .. } => "Memory", Endpoint::Network(..) => "Network", }; - write!(f, "Endpoint::{}", s) + f.write_fmt(format_args!("Endpoint::{}", s)) } } diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index deeeaa443d29def446a15f508ead3d5603ba5bcd..8c3629e2eb715bc1ab2241594ec80afbebe396b2 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -444,16 +444,16 @@ impl Predicate { } impl Debug for Predicate { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.pad("{")?; for (ChannelId { controller_id, channel_index }, &v) in self.assigned.iter() { - write!( - f, + f.write_fmt(format_args!( "({:?},{:?})=>{}, ", controller_id, channel_index, if v { 'T' } else { 'F' } - )?; + ))? } - Ok(()) + f.pad("}") } } diff --git a/src/test/connector.rs b/src/test/connector.rs index a6f70c2cb4e9baa585bead6db95d4033287702a6..c328ec417f8cbaa41aba411d0a466cd41c9136aa 100644 --- a/src/test/connector.rs +++ b/src/test/connector.rs @@ -2,83 +2,93 @@ extern crate test_generator; use super::*; -use std::fs; -use std::path::Path; use std::thread; -use test_generator::test_resources; use crate::common::*; -use crate::runtime::*; -use crate::runtime::errors::*; +use crate::runtime::{errors::*, PortBinding::*, *}; + +// using a static AtomicU16, shared between all tests in the binary, +// allocate and return a socketaddr of the form 127.0.0.1:X where X in 7000.. +fn next_addr() -> SocketAddr { + use std::{ + net::{Ipv4Addr, SocketAddrV4}, + sync::atomic::{AtomicU16, Ordering::SeqCst}, + }; + static TEST_PORT: AtomicU16 = AtomicU16::new(7_000); + let port = TEST_PORT.fetch_add(1, SeqCst); + SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port).into() +} #[test] fn incremental() { let timeout = Duration::from_millis(1_500); - let addrs = ["127.0.0.1:7010".parse().unwrap(), "127.0.0.1:7011".parse().unwrap()]; - let a = thread::spawn(move || { - let controller_id = 0; - let mut x = Connector::Unconfigured(Unconfigured { controller_id }); - x.configure( - b"primitive main(out a, out b) { - synchronous { - msg m = create(0); - put(a, m); - } - }", - ) - .unwrap(); - x.bind_port(0, PortBinding::Passive(addrs[0])).unwrap(); - x.bind_port(1, PortBinding::Passive(addrs[1])).unwrap(); - x.connect(timeout).unwrap(); - assert_eq!(0, x.sync(timeout).unwrap()); - println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap()); - }); - let b = thread::spawn(move || { - let controller_id = 1; - let mut x = Connector::Unconfigured(Unconfigured { controller_id }); - x.configure( - b"primitive main(in a, in b) { - synchronous { - get(a); - } - }", - ) - .unwrap(); - x.bind_port(0, PortBinding::Active(addrs[0])).unwrap(); - x.bind_port(1, PortBinding::Active(addrs[1])).unwrap(); - x.connect(timeout).unwrap(); - assert_eq!(0, x.sync(timeout).unwrap()); - println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap()); - }); - handle(a.join()); - handle(b.join()); + let addrs = [next_addr(), next_addr()]; + let handles = vec![ + thread::spawn(move || { + let controller_id = 0; + let mut x = Connector::Unconfigured(Unconfigured { controller_id }); + x.configure( + b"primitive main(out a, out b) { + synchronous { + msg m = create(0); + put(a, m); + } + }", + ) + .unwrap(); + x.bind_port(0, Passive(addrs[0])).unwrap(); + x.bind_port(1, Passive(addrs[1])).unwrap(); + x.connect(timeout).unwrap(); + assert_eq!(0, x.sync(timeout).unwrap()); + println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap()); + }), + thread::spawn(move || { + let controller_id = 1; + let mut x = Connector::Unconfigured(Unconfigured { controller_id }); + x.configure( + b"primitive main(in a, in b) { + synchronous { + get(a); + } + }", + ) + .unwrap(); + x.bind_port(0, Active(addrs[0])).unwrap(); + x.bind_port(1, Active(addrs[1])).unwrap(); + x.connect(timeout).unwrap(); + assert_eq!(0, x.sync(timeout).unwrap()); + println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap()); + }), + ]; + for h in handles { + handle(h.join()) + } } #[test] fn duo_positive() { let timeout = Duration::from_millis(1_500); - let addrs = ["127.0.0.1:7012".parse().unwrap(), "127.0.0.1:7013".parse().unwrap()]; + let addrs = [next_addr(), next_addr()]; let a = thread::spawn(move || { let controller_id = 0; let mut x = Connector::Unconfigured(Unconfigured { controller_id }); x.configure( - b" - primitive main(out a, out b) { - synchronous {} - synchronous {} - synchronous { - msg m = create(0); - put(a, m); - } - synchronous { - msg m = create(0); - put(b, m); - } - }", + b"primitive main(out a, out b) { + synchronous {} + synchronous {} + synchronous { + msg m = create(0); + put(a, m); + } + synchronous { + msg m = create(0); + put(b, m); + } + }", ) .unwrap(); - x.bind_port(0, PortBinding::Passive(addrs[0])).unwrap(); - x.bind_port(1, PortBinding::Passive(addrs[1])).unwrap(); + x.bind_port(0, Passive(addrs[0])).unwrap(); + x.bind_port(1, Passive(addrs[1])).unwrap(); x.connect(timeout).unwrap(); assert_eq!(0, x.sync(timeout).unwrap()); assert_eq!(0, x.sync(timeout).unwrap()); @@ -90,25 +100,24 @@ fn duo_positive() { let controller_id = 1; let mut x = Connector::Unconfigured(Unconfigured { controller_id }); x.configure( - b" - primitive main(in a, in b) { - while (true) { - synchronous { - if (fires(a)) { - get(a); + b"primitive main(in a, in b) { + while (true) { + synchronous { + if (fires(a)) { + get(a); + } } - } - synchronous { - if (fires(b)) { - get(b); + synchronous { + if (fires(b)) { + get(b); + } } } - } - }", + }", ) .unwrap(); - x.bind_port(0, PortBinding::Active(addrs[0])).unwrap(); - x.bind_port(1, PortBinding::Active(addrs[1])).unwrap(); + x.bind_port(0, Active(addrs[0])).unwrap(); + x.bind_port(1, Active(addrs[1])).unwrap(); x.connect(timeout).unwrap(); assert_eq!(0, x.sync(timeout).unwrap()); assert_eq!(0, x.sync(timeout).unwrap()); @@ -123,58 +132,124 @@ fn duo_positive() { #[test] fn duo_negative() { let timeout = Duration::from_millis(500); - let addrs = ["127.0.0.1:7014".parse().unwrap(), "127.0.0.1:7015".parse().unwrap()]; + let addrs = [next_addr(), next_addr()]; let a = thread::spawn(move || { let controller_id = 0; let mut x = Connector::Unconfigured(Unconfigured { controller_id }); - x.configure(b" - primitive main(out a, out b) { - synchronous {} - synchronous { - msg m = create(0); - put(a, m); // fires a on second round - } - }").unwrap(); - x.bind_port(0, PortBinding::Passive(addrs[0])).unwrap(); - x.bind_port(1, PortBinding::Passive(addrs[1])).unwrap(); + x.configure( + b"primitive main(out a, out b) { + synchronous {} + synchronous { + msg m = create(0); + put(a, m); // fires a on second round + } + }", + ) + .unwrap(); + x.bind_port(0, Passive(addrs[0])).unwrap(); + x.bind_port(1, Passive(addrs[1])).unwrap(); x.connect(timeout).unwrap(); assert_eq!(0, x.sync(timeout).unwrap()); let r = x.sync(timeout); println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap()); match r { Err(SyncErr::Timeout) => {} - x => unreachable!("{:?}", x) + x => unreachable!("{:?}", x), } }); let b = thread::spawn(move || { let controller_id = 1; let mut x = Connector::Unconfigured(Unconfigured { controller_id }); - x.configure(b" - primitive main(in a, in b) { - while (true) { - synchronous { - if (fires(a)) { - get(a); + x.configure( + b"primitive main(in a, in b) { + while (true) { + synchronous { + if (fires(a)) { + get(a); + } } - } - synchronous { - if (fires(b)) { // never fire a on even round - get(b); + synchronous { + if (fires(b)) { // never fire a on even round + get(b); + } } } - } - }").unwrap(); - x.bind_port(0, PortBinding::Active(addrs[0])).unwrap(); - x.bind_port(1, PortBinding::Active(addrs[1])).unwrap(); + }", + ) + .unwrap(); + x.bind_port(0, Active(addrs[0])).unwrap(); + x.bind_port(1, Active(addrs[1])).unwrap(); x.connect(timeout).unwrap(); assert_eq!(0, x.sync(timeout).unwrap()); let r = x.sync(timeout); println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap()); match r { Err(SyncErr::Timeout) => {} - x => unreachable!("{:?}", x) + x => unreachable!("{:?}", x), } }); handle(a.join()); handle(b.join()); } + +#[test] +fn connect_natives() { + static CHAIN: &[u8] = b" + primitive main(in i, out o) { + while(true) synchronous {} + }"; + let timeout = Duration::from_millis(1_500); + let addrs = [next_addr()]; + do_all(&[ + &|x| { + x.configure(CHAIN).unwrap(); + x.bind_port(0, Native).unwrap(); + x.bind_port(1, Passive(addrs[0])).unwrap(); + x.connect(timeout).unwrap(); + assert_eq!(0, x.sync(timeout).unwrap()); + }, + &|x| { + x.configure(CHAIN).unwrap(); + x.bind_port(0, Active(addrs[0])).unwrap(); + x.bind_port(1, Native).unwrap(); + x.connect(timeout).unwrap(); + assert_eq!(0, x.sync(timeout).unwrap()); + }, + ]); +} + +#[test] +fn forward() { + static FORWARD: &[u8] = b" + primitive main(in i, out o) { + while(true) synchronous { + put(o, get(i)); + } + }"; + let timeout = Duration::from_millis(1_500); + let addrs = [next_addr()]; + do_all(&[ + // + &|x| { + x.configure(FORWARD).unwrap(); + x.bind_port(0, Native).unwrap(); + x.bind_port(1, Passive(addrs[0])).unwrap(); + x.connect(timeout).unwrap(); + + let msg = b"HELLO!".to_vec(); + x.put(0, msg).unwrap(); + assert_eq!(0, x.sync(timeout).unwrap()); + }, + &|x| { + x.configure(FORWARD).unwrap(); + x.bind_port(0, Active(addrs[0])).unwrap(); + x.bind_port(1, Native).unwrap(); + x.connect(timeout).unwrap(); + + let expect = b"HELLO!".to_vec(); + x.get(0).unwrap(); + assert_eq!(0, x.sync(timeout).unwrap()); + assert_eq!(expect, x.read_gotten(0).unwrap()); + }, + ]); +} diff --git a/src/test/mod.rs b/src/test/mod.rs index 19df860ad7ae855b0c1309654005b8f1956e7aa3..329b07c7c7edcaede8ca5f0f7b1be29f588454b3 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -1,3 +1,6 @@ +use crate::common::ControllerId; +use crate::runtime::Connector; +use crate::runtime::Unconfigured; use core::fmt::Debug; mod connector; @@ -15,11 +18,51 @@ impl Debug for Panicked { } } } -fn handle(result: Result<(), std::boxed::Box<(dyn std::any::Any + std::marker::Send + 'static)>>) { - match result { - Ok(_) => {} - Err(x) => { - panic!("Worker panicked: {:?}", Panicked(x)); +fn handle(result: Result<(), Box<(dyn std::any::Any + Send + 'static)>>) { + if let Err(x) = result { + panic!("Worker panicked: {:?}", Panicked(x)) + } +} + +fn do_all(i: &[&(dyn Fn(&mut Connector) + Sync)]) { + let cid_iter = 0..(i.len() as ControllerId); + let mut connectors = cid_iter + .clone() + .map(|controller_id| Connector::Unconfigured(Unconfigured { controller_id })) + .collect::>(); + + let mut results = vec![]; + crossbeam_utils::thread::scope(|s| { + let handles: Vec<_> = i + .iter() + .zip(connectors.iter_mut()) + .map(|(func, connector)| s.spawn(move |_| func(connector))) + .collect(); + for h in handles { + results.push(h.join()); } + }) + .unwrap(); + + let mut failures = false; + + for ((controller_id, connector), res) in + cid_iter.zip(connectors.iter_mut()).zip(results.into_iter()) + { + println!("====================\n CID {:?} ...", controller_id); + match connector.get_mut_logger() { + Some(logger) => println!("{}", logger), + None => println!(""), + } + match res { + Ok(()) => println!("CID {:?} OK!", controller_id), + Err(e) => { + failures = true; + println!("CI {:?} PANIC! {:?}", controller_id, Panicked(e)); + } + }; + } + if failures { + panic!("FAILURES!"); } }