From d230100064869dd9cbc0464614b120786dd756f9 2020-07-03 13:09:20 From: Christopher Esterhuyse Date: 2020-07-03 13:09:20 Subject: [PATCH] bugfix: increment round index on recovery to avoid mixing messages once we switch away from tcp. refactoring communication internals to simplify bookkeeping structures. more consts in tests to make them terser --- diff --git a/src/common.rs b/src/common.rs index feda52f86db093001f6840cd7c3ba370cdf87b61..4d6f9107083e5e953f7f4bd9095d549071aa827a 100644 --- a/src/common.rs +++ b/src/common.rs @@ -81,6 +81,7 @@ pub(crate) enum SyncBlocker { CouldntReadMsg(PortId), CouldntCheckFiring(PortId), PutMsg(PortId, Payload), + NondetChoice { n: u16 }, } pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]); diff --git a/src/protocol/library.rs b/src/protocol/library.rs index 8bdbd29972c8d3aeb45be3474c8cd07c8d0d3619..e2926aefbf7c683a9e5c8c34087303f611d82315 100644 --- a/src/protocol/library.rs +++ b/src/protocol/library.rs @@ -1,6 +1,8 @@ use crate::protocol::ast::*; use crate::protocol::inputsource::*; +// TABBED OUT FOR NOW + pub fn get_declarations(h: &mut Heap, i: ImportId) -> Result, ParseError> { if h[i].value == b"std.reo" { let mut vec = Vec::new(); diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index ab8afb4fb6e22d9cccf63b16b7fe72dfb17dfff0..ebf22a0ae740cc082787d58171826616053c445e 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -3,7 +3,7 @@ mod ast; mod eval; pub(crate) mod inputsource; mod lexer; -mod library; +// mod library; mod parser; use crate::common::*; @@ -32,7 +32,7 @@ pub(crate) enum EvalContext<'a> { impl std::fmt::Debug for ProtocolDescription { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "(A big honkin' protocol description)") + write!(f, "(An opaque protocol description)") } } impl ProtocolDescription { diff --git a/src/protocol/parser.rs b/src/protocol/parser.rs index 7a66d156d202beb48c7ce09d85ca6b798ca398eb..f8b5e1b5b17f47d05fffa08762dd912cf6ca5257 100644 --- a/src/protocol/parser.rs +++ b/src/protocol/parser.rs @@ -1,7 +1,7 @@ use crate::protocol::ast::*; use crate::protocol::inputsource::*; use crate::protocol::lexer::*; -use crate::protocol::library; +// use crate::protocol::library; // The following indirection is needed due to a bug in the cbindgen tool. type Unit = (); @@ -797,12 +797,13 @@ impl Visitor for BuildSymbolDeclarations { Ok(()) } fn visit_import(&mut self, h: &mut Heap, import: ImportId) -> VisitorResult { - let vec = library::get_declarations(h, import)?; - // Destructively iterate over the vector - for decl in vec { - self.checked_add(h, decl)?; - } - Ok(()) + todo!() + // let vec = library::get_declarations(h, import)?; + // // Destructively iterate over the vector + // for decl in vec { + // self.checked_add(h, decl)?; + // } + // Ok(()) } fn visit_symbol_definition(&mut self, h: &mut Heap, definition: DefinitionId) -> VisitorResult { let signature = Signature::from_definition(h, definition); diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 779169ca54a1a4786de5439773d8066b78457ba1..080249437130290a12be7caf72c4bf8183468cb7 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -2,6 +2,16 @@ use super::*; use crate::common::*; //////////////// +#[derive(Default)] +struct GetterBuffer { + getters_and_sends: Vec<(PortId, SendPayloadMsg)>, +} +struct RoundCtx { + solution_storage: SolutionStorage, + spec_var_stream: SpecVarStream, + getter_buffer: GetterBuffer, + deadline: Option, +} struct BranchingNative { branches: HashMap, } @@ -28,6 +38,7 @@ struct BranchingProtoComponent { struct ProtoComponentBranch { inbox: HashMap, state: ComponentState, + untaken_choice: Option, ended: bool, } struct CyclicDrainer<'a, K: Eq + Hash, V> { @@ -38,14 +49,6 @@ struct CyclicDrainInner<'a, K: Eq + Hash, V> { swap: &'a mut HashMap, output: &'a mut HashMap, } -trait PayloadMsgSender { - fn putter_send( - &mut self, - cu: &mut ConnectorUnphased, - putter: PortId, - msg: SendPayloadMsg, - ) -> Result<(), SyncError>; -} trait ReplaceBoolTrue { fn replace_with_true(&mut self) -> bool; } @@ -132,6 +135,7 @@ impl Connector { ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected), ConnectorPhased::Communication(comm) => { comm.round_result = Self::connected_sync(unphased, comm, timeout); + comm.round_index += 1; match &comm.round_result { Ok(None) => unreachable!(), Ok(Some(ok_result)) => Ok(ok_result.batch_index), @@ -148,7 +152,7 @@ impl Connector { timeout: Option, ) -> Result, SyncError> { use SyncError as Se; - let deadline = timeout.map(|to| Instant::now() + to); + // let deadline = timeout.map(|to| Instant::now() + to); log!( cu.logger, "~~~ SYNC called with timeout {:?}; starting round {}", @@ -156,8 +160,6 @@ impl Connector { comm.round_index ); - let mut spec_var_stream = cu.id_manager.new_spec_var_stream(); - // 1. run all proto components to Nonsync blockers let mut branching_proto_components = HashMap::::default(); @@ -207,18 +209,21 @@ impl Connector { branching_proto_components.len(), ); - // NOTE: all msgs in outbox are of form (Getter, Payload) - let mut payloads_to_get: Vec<(PortId, SendPayloadMsg)> = vec![]; - - // create the solution storage - let mut solution_storage = { - let n = std::iter::once(Route::LocalComponent(ComponentId::Native)); - let c = - cu.proto_components.keys().map(|&id| Route::LocalComponent(ComponentId::Proto(id))); - let e = comm.neighborhood.children.iter().map(|&index| Route::Endpoint { index }); - SolutionStorage::new(n.chain(c).chain(e)) + let mut rctx = RoundCtx { + solution_storage: { + let n = std::iter::once(Route::LocalComponent(ComponentId::Native)); + let c = cu + .proto_components + .keys() + .map(|&id| Route::LocalComponent(ComponentId::Proto(id))); + let e = comm.neighborhood.children.iter().map(|&index| Route::Endpoint { index }); + SolutionStorage::new(n.chain(c).chain(e)) + }, + spec_var_stream: cu.id_manager.new_spec_var_stream(), + getter_buffer: Default::default(), + deadline: timeout.map(|to| Instant::now() + to), }; - log!(cu.logger, "Solution storage initialized"); + log!(cu.logger, "Round context structure initialized"); // 2. kick off the native log!( @@ -226,7 +231,7 @@ impl Connector { "Translating {} native batches into branches...", comm.native_batches.len() ); - let native_branch_spec_var = spec_var_stream.next(); + let native_branch_spec_var = rctx.spec_var_stream.next(); log!(cu.logger, "Native branch spec var is {:?}", native_branch_spec_var); let mut branching_native = BranchingNative { branches: Default::default() }; 'native_branches: for ((native_branch, index), branch_spec_val) in @@ -259,7 +264,7 @@ impl Connector { for (putter, payload) in to_put { let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg); - payloads_to_get.putter_send(cu, putter, msg)?; + rctx.getter_buffer.putter_add(cu, putter, msg)?; } if to_get.is_empty() { log!( @@ -268,16 +273,16 @@ impl Connector { index, &predicate ); - solution_storage.submit_and_digest_subtree_solution( + rctx.solution_storage.submit_and_digest_subtree_solution( &mut *cu.logger, Route::LocalComponent(ComponentId::Native), predicate.clone(), ); } let branch = NativeBranch { index, gotten: Default::default(), to_get }; - if let Some(_existing) = branching_native.branches.insert(predicate, branch) { + if let Some(_) = branching_native.branches.insert(predicate, branch) { + // thanks to the native_branch_spec_var, each batch has a distinct predicate unreachable!() - // return Err(Se::IndistinguishableBatches([index, existing.index])); } } // restore the invariant @@ -287,9 +292,7 @@ impl Connector { comm, &mut branching_native, &mut branching_proto_components, - solution_storage, - payloads_to_get, - deadline, + &mut rctx, )?; log!(cu.logger, "Committing to decision {:?}!", &decision); @@ -340,9 +343,7 @@ impl Connector { comm: &mut ConnectorCommunication, branching_native: &mut BranchingNative, branching_proto_components: &mut HashMap, - mut solution_storage: SolutionStorage, - mut payloads_to_get: Vec<(PortId, SendPayloadMsg)>, - mut deadline: Option, + rctx: &mut RoundCtx, ) -> Result { let mut already_requested_failure = false; if branching_native.branches.is_empty() { @@ -379,8 +380,7 @@ impl Connector { BranchingProtoComponent::drain_branches_to_blocked( cd, cu, - &mut solution_storage, - &mut payloads_to_get, + rctx, proto_component_id, ports, )?; @@ -406,8 +406,8 @@ impl Connector { comm.endpoint_manager.undelay_all(); 'undecided: loop { // drain payloads_to_get, sending them through endpoints / feeding them to components - log!(cu.logger, "Decision loop! have {} messages to recv", payloads_to_get.len()); - while let Some((getter, send_payload_msg)) = payloads_to_get.pop() { + log!(cu.logger, "Decision loop! have {} messages to recv", rctx.getter_buffer.len()); + while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() { assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter)); let route = cu.port_info.routes.get(&getter); log!(cu.logger, "Routing msg {:?} to {:?}", &send_payload_msg, &route); @@ -429,7 +429,7 @@ impl Connector { } Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg( cu, - &mut solution_storage, + &mut rctx.solution_storage, getter, &send_payload_msg, ), @@ -440,9 +440,8 @@ impl Connector { let proto_component_id = *proto_component_id; branching_component.feed_msg( cu, - &mut solution_storage, + rctx, proto_component_id, - &mut payloads_to_get, getter, &send_payload_msg, )?; @@ -478,7 +477,7 @@ impl Connector { // check if we have a solution yet log!(cu.logger, "Check if we have any local decisions..."); - for solution in solution_storage.iter_new_local_make_old() { + for solution in rctx.solution_storage.iter_new_local_make_old() { log!(cu.logger, "New local decision with solution {:?}...", &solution); match comm.neighborhood.parent { Some(parent) => { @@ -502,7 +501,10 @@ impl Connector { log!(cu.logger, "No decision yet. Let's recv an endpoint msg..."); { let (endpoint_index, msg) = loop { - match comm.endpoint_manager.try_recv_any_comms(&mut *cu.logger, deadline)? { + match comm + .endpoint_manager + .try_recv_any_comms(&mut *cu.logger, rctx.deadline)? + { None => { log!(cu.logger, "Reached user-defined deadling without decision..."); if let Some(parent) = comm.neighborhood.parent { @@ -515,7 +517,7 @@ impl Connector { log!(cu.logger, "As the leader, deciding on timeout"); return Ok(Decision::Failure); } - deadline = None; + rctx.deadline = None; } Some((endpoint_index, msg)) => break (endpoint_index, msg), } @@ -562,7 +564,7 @@ impl Connector { "Msg routed to getter port {:?}. Buffer for recv loop", getter, ); - payloads_to_get.push((getter, send_payload_msg)); + rctx.getter_buffer.getter_add(getter, send_payload_msg); } CommMsgContents::Suggest { suggestion } => { // only accept this control msg through a child endpoint @@ -572,7 +574,7 @@ impl Connector { // child solution contributes to local solution log!(cu.logger, "Child provided solution {:?}", &predicate); let route = Route::Endpoint { index: endpoint_index }; - solution_storage.submit_and_digest_subtree_solution( + rctx.solution_storage.submit_and_digest_subtree_solution( &mut *cu.logger, route, predicate, @@ -753,13 +755,13 @@ impl BranchingProtoComponent { fn drain_branches_to_blocked( cd: CyclicDrainer, cu: &mut ConnectorUnphased, - solution_storage: &mut SolutionStorage, - payload_msg_sender: &mut impl PayloadMsgSender, + rctx: &mut RoundCtx, proto_component_id: ProtoComponentId, ports: &HashSet, ) -> Result<(), SyncError> { cd.cylic_drain(|mut predicate, mut branch, mut drainer| { let mut ctx = SyncProtoContext { + untaken_choice: &mut branch.untaken_choice, logger: &mut *cu.logger, predicate: &predicate, port_info: &cu.port_info, @@ -775,6 +777,15 @@ impl BranchingProtoComponent { ); use SyncBlocker as B; match blocker { + B::NondetChoice { n } => { + let var = rctx.spec_var_stream.next(); + for val in SpecVal::iter_domain().take(n as usize) { + let pred = predicate.clone().inserted(var, val); + let mut branch_n = branch.clone(); + branch_n.untaken_choice = Some(val.0); + drainer.add_input(pred, branch_n); + } + } B::Inconsistent => { // branch is inconsistent. throw it away drop((predicate, branch)); @@ -786,7 +797,7 @@ impl BranchingProtoComponent { predicate.assigned.entry(var).or_insert(SpecVal::SILENT); } // submit solution for this component - solution_storage.submit_and_digest_subtree_solution( + rctx.solution_storage.submit_and_digest_subtree_solution( &mut *cu.logger, Route::LocalComponent(ComponentId::Proto(proto_component_id)), predicate.clone(), @@ -822,7 +833,7 @@ impl BranchingProtoComponent { // keep in "unblocked" log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; - payload_msg_sender.putter_send(cu, putter, msg)?; + rctx.getter_buffer.putter_add(cu, putter, msg)?; drainer.add_input(predicate, branch); } } @@ -833,9 +844,8 @@ impl BranchingProtoComponent { fn feed_msg( &mut self, cu: &mut ConnectorUnphased, - solution_storage: &mut SolutionStorage, + rctx: &mut RoundCtx, proto_component_id: ProtoComponentId, - payload_msg_sender: &mut impl PayloadMsgSender, getter: PortId, send_payload_msg: &SendPayloadMsg, ) -> Result<(), SyncError> { @@ -898,8 +908,7 @@ impl BranchingProtoComponent { BranchingProtoComponent::drain_branches_to_blocked( cd, cu, - solution_storage, - payload_msg_sender, + rctx, proto_component_id, ports, )?; @@ -919,7 +928,12 @@ impl BranchingProtoComponent { panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate); } fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self { - let branch = ProtoComponentBranch { inbox: Default::default(), state, ended: false }; + let branch = ProtoComponentBranch { + inbox: Default::default(), + state, + ended: false, + untaken_choice: None, + }; Self { ports, branches: hashmap! { Predicate::default() => branch } } } } @@ -960,9 +974,6 @@ impl SolutionStorage { self.subtree_solutions.push(Default::default()) } } - // pub(crate) fn peek_new_locals(&self) -> impl Iterator + '_ { - // self.new_local.iter() - // } pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator + '_ { let Self { old_local, new_local, .. } = self; new_local.drain().map(move |local| { @@ -1024,15 +1035,24 @@ impl SolutionStorage { } } } -impl PayloadMsgSender for Vec<(PortId, SendPayloadMsg)> { - fn putter_send( +impl GetterBuffer { + fn len(&self) -> usize { + self.getters_and_sends.len() + } + fn pop(&mut self) -> Option<(PortId, SendPayloadMsg)> { + self.getters_and_sends.pop() + } + fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) { + self.getters_and_sends.push((getter, msg)); + } + fn putter_add( &mut self, cu: &mut ConnectorUnphased, putter: PortId, msg: SendPayloadMsg, ) -> Result<(), SyncError> { if let Some(&getter) = cu.port_info.peers.get(&putter) { - self.push((getter, msg)); + self.getter_add(getter, msg); Ok(()) } else { Err(SyncError::MalformedStateError(MalformedStateError::GetterUnknownFor { putter })) @@ -1047,6 +1067,9 @@ impl SyncProtoContext<'_> { pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> { self.inbox.get(&port) } + pub(crate) fn take_choice(&mut self) -> Option { + self.untaken_choice.take() + } } impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> { fn add_input(&mut self, k: K, v: V) { diff --git a/src/runtime/error.rs b/src/runtime/error.rs index 26e5792541c411071f4b4a98e6907138b9d23391..c156a4ac99d9a11ae70d463086107a12beb90b89 100644 --- a/src/runtime/error.rs +++ b/src/runtime/error.rs @@ -27,7 +27,6 @@ pub enum AddComponentError { pub enum SyncError { NotConnected, InconsistentProtoComponent(ProtoComponentId), - IndistinguishableBatches([usize; 2]), RoundFailure, PollFailed, BrokenEndpoint(usize), diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index cd2e179e27f9ca79d96f3972d9121b55177890a9..12cc34285eb35ebb65f10d2336aec215e407aab5 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -41,6 +41,7 @@ pub(crate) struct NonsyncProtoContext<'a> { } pub(crate) struct SyncProtoContext<'a> { logger: &'a mut dyn Logger, + untaken_choice: &'a mut Option, predicate: &'a Predicate, port_info: &'a PortInfo, inbox: &'a HashMap, diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index 111b78c27c30c090af0255c18085ec52122ab3c5..40558cf2521a8f05239f3857864bc5f116a32709 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -8,6 +8,10 @@ use reowolf::{ }; use std::{fs::File, net::SocketAddr, path::Path, sync::Arc, time::Duration}; ////////////////////////////////////////// +const SEC1: Option = Some(Duration::from_secs(1)); +const SEC5: Option = Some(Duration::from_secs(5)); +const SEC15: Option = Some(Duration::from_secs(15)); +const MS300: Option = Some(Duration::from_millis(300)); fn next_test_addr() -> SocketAddr { use std::{ net::{Ipv4Addr, SocketAddrV4}, @@ -87,7 +91,7 @@ fn new_net_port() { fn trivial_connect() { let test_log_path = Path::new("./logs/trivial_connect"); let mut c = file_logged_connector(0, test_log_path); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); } #[test] @@ -97,7 +101,7 @@ fn single_node_connect() { let mut c = file_logged_connector(0, test_log_path); let _ = c.new_net_port(Getter, sock_addr, Passive).unwrap(); let _ = c.new_net_port(Putter, sock_addr, Active).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); } #[test] @@ -108,12 +112,12 @@ fn minimal_net_connect() { s.spawn(|_| { let mut c = file_logged_connector(0, test_log_path); let _ = c.new_net_port(Getter, sock_addr, Active).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); }); s.spawn(|_| { let mut c = file_logged_connector(1, test_log_path); let _ = c.new_net_port(Putter, sock_addr, Passive).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); }); }) .unwrap(); @@ -124,7 +128,7 @@ fn put_no_sync() { let test_log_path = Path::new("./logs/put_no_sync"); let mut c = file_logged_connector(0, test_log_path); let [o, _] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(o, TEST_MSG.clone()).unwrap(); } @@ -133,7 +137,7 @@ fn wrong_polarity_bad() { let test_log_path = Path::new("./logs/wrong_polarity_bad"); let mut c = file_logged_connector(0, test_log_path); let [_, i] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(i, TEST_MSG.clone()).unwrap_err(); } @@ -142,7 +146,7 @@ fn dup_put_bad() { let test_log_path = Path::new("./logs/dup_put_bad"); let mut c = file_logged_connector(0, test_log_path); let [o, _] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(o, TEST_MSG.clone()).unwrap(); c.put(o, TEST_MSG.clone()).unwrap_err(); } @@ -151,8 +155,8 @@ fn dup_put_bad() { fn trivial_sync() { let test_log_path = Path::new("./logs/trivial_sync"); let mut c = file_logged_connector(0, test_log_path); - c.connect(Some(Duration::from_secs(1))).unwrap(); - c.sync(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); + c.sync(SEC1).unwrap(); } #[test] @@ -168,7 +172,7 @@ fn connected_gotten_err_no_round() { let test_log_path = Path::new("./logs/connected_gotten_err_no_round"); let mut c = file_logged_connector(0, test_log_path); let [_, i] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err()); } @@ -177,8 +181,8 @@ fn connected_gotten_err_ungotten() { let test_log_path = Path::new("./logs/connected_gotten_err_ungotten"); let mut c = file_logged_connector(0, test_log_path); let [_, i] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); - c.sync(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); + c.sync(SEC1).unwrap(); assert_eq!(reowolf::error::GottenError::PortDidntGet, c.gotten(i).unwrap_err()); } @@ -187,7 +191,7 @@ fn native_polarity_checks() { let test_log_path = Path::new("./logs/native_polarity_checks"); let mut c = file_logged_connector(0, test_log_path); let [o, i] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); // fail... c.get(o).unwrap_err(); c.put(i, TEST_MSG.clone()).unwrap_err(); @@ -201,7 +205,7 @@ fn native_multiple_gets() { let test_log_path = Path::new("./logs/native_multiple_gets"); let mut c = file_logged_connector(0, test_log_path); let [_, i] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.get(i).unwrap(); c.get(i).unwrap_err(); } @@ -211,7 +215,7 @@ fn next_batch() { let test_log_path = Path::new("./logs/next_batch"); let mut c = file_logged_connector(0, test_log_path); c.next_batch().unwrap_err(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.next_batch().unwrap(); c.next_batch().unwrap(); c.next_batch().unwrap(); @@ -222,10 +226,10 @@ fn native_self_msg() { let test_log_path = Path::new("./logs/native_self_msg"); let mut c = file_logged_connector(0, test_log_path); let [o, i] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.get(i).unwrap(); c.put(o, TEST_MSG.clone()).unwrap(); - c.sync(Some(Duration::from_secs(1))).unwrap(); + c.sync(SEC1).unwrap(); } #[test] @@ -236,17 +240,17 @@ fn two_natives_msg() { s.spawn(|_| { let mut c = file_logged_connector(0, test_log_path); let g = c.new_net_port(Getter, sock_addr, Active).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.get(g).unwrap(); - c.sync(Some(Duration::from_secs(1))).unwrap(); + c.sync(SEC1).unwrap(); c.gotten(g).unwrap(); }); s.spawn(|_| { let mut c = file_logged_connector(1, test_log_path); let p = c.new_net_port(Putter, sock_addr, Passive).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(p, TEST_MSG.clone()).unwrap(); - c.sync(Some(Duration::from_secs(1))).unwrap(); + c.sync(SEC1).unwrap(); }); }) .unwrap(); @@ -257,12 +261,12 @@ fn trivial_nondet() { let test_log_path = Path::new("./logs/trivial_nondet"); let mut c = file_logged_connector(0, test_log_path); let [_, i] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.get(i).unwrap(); // getting 0 batch c.next_batch().unwrap(); // silent 1 batch - assert_eq!(1, c.sync(Some(Duration::from_secs(1))).unwrap()); + assert_eq!(1, c.sync(SEC1).unwrap()); c.gotten(i).unwrap_err(); } @@ -274,18 +278,18 @@ fn connector_pair_nondet() { s.spawn(|_| { let mut c = file_logged_connector(0, test_log_path); let g = c.new_net_port(Getter, sock_addr, Active).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.next_batch().unwrap(); c.get(g).unwrap(); - assert_eq!(1, c.sync(Some(Duration::from_secs(1))).unwrap()); + assert_eq!(1, c.sync(SEC1).unwrap()); c.gotten(g).unwrap(); }); s.spawn(|_| { let mut c = file_logged_connector(1, test_log_path); let p = c.new_net_port(Putter, sock_addr, Passive).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(p, TEST_MSG.clone()).unwrap(); - c.sync(Some(Duration::from_secs(1))).unwrap(); + c.sync(SEC1).unwrap(); }); }) .unwrap(); @@ -296,9 +300,9 @@ fn native_immediately_inconsistent() { let test_log_path = Path::new("./logs/native_immediately_inconsistent"); let mut c = file_logged_connector(0, test_log_path); let [_, g] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.get(g).unwrap(); - c.sync(Some(Duration::from_secs(30))).unwrap_err(); + c.sync(SEC15).unwrap_err(); } #[test] @@ -306,12 +310,12 @@ fn native_recovers() { let test_log_path = Path::new("./logs/native_recovers"); let mut c = file_logged_connector(0, test_log_path); let [p, g] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.get(g).unwrap(); - c.sync(Some(Duration::from_secs(30))).unwrap_err(); + c.sync(SEC15).unwrap_err(); c.put(p, TEST_MSG.clone()).unwrap(); c.get(g).unwrap(); - c.sync(Some(Duration::from_secs(30))).unwrap(); + c.sync(SEC15).unwrap(); } #[test] @@ -323,7 +327,7 @@ fn cannot_use_moved_ports() { let mut c = file_logged_connector(0, test_log_path); let [p, g] = c.new_port_pair(); c.add_component(b"sync", &[g, p]).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(p, TEST_MSG.clone()).unwrap_err(); c.get(g).unwrap_err(); } @@ -339,10 +343,10 @@ fn sync_sync() { let [p0, g0] = c.new_port_pair(); let [p1, g1] = c.new_port_pair(); c.add_component(b"sync", &[g0, p1]).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(p0, TEST_MSG.clone()).unwrap(); c.get(g1).unwrap(); - c.sync(Some(Duration::from_secs(1))).unwrap(); + c.sync(SEC1).unwrap(); c.gotten(g1).unwrap(); } @@ -357,7 +361,7 @@ fn double_net_connect() { c.new_net_port(Putter, sock_addrs[0], Active).unwrap(), c.new_net_port(Getter, sock_addrs[1], Active).unwrap(), ]; - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); }); s.spawn(|_| { let mut c = file_logged_connector(1, test_log_path); @@ -365,7 +369,7 @@ fn double_net_connect() { c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(), c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(), ]; - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); }); }) .unwrap(); @@ -391,8 +395,8 @@ fn distributed_msg_bounce() { c.new_net_port(Getter, sock_addrs[1], Active).unwrap(), ]; c.add_component(b"sync", &[g, p]).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); - c.sync(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); + c.sync(SEC1).unwrap(); }); s.spawn(|_| { /* @@ -404,10 +408,10 @@ fn distributed_msg_bounce() { c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(), c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(), ]; - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(p, TEST_MSG.clone()).unwrap(); c.get(g).unwrap(); - c.sync(Some(Duration::from_secs(1))).unwrap(); + c.sync(SEC1).unwrap(); c.gotten(g).unwrap(); }); }) @@ -419,9 +423,9 @@ fn local_timeout() { let test_log_path = Path::new("./logs/local_timeout"); let mut c = file_logged_connector(0, test_log_path); let [_, g] = c.new_port_pair(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.get(g).unwrap(); - match c.sync(Some(Duration::from_millis(200))) { + match c.sync(MS300) { Err(SyncError::RoundFailure) => {} res => panic!("expeted timeout. but got {:?}", res), } @@ -436,14 +440,14 @@ fn parent_timeout() { // parent; times out let mut c = file_logged_connector(999, test_log_path); let _ = c.new_net_port(Putter, sock_addr, Active).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); - c.sync(Some(Duration::from_millis(300))).unwrap_err(); // timeout + c.connect(SEC1).unwrap(); + c.sync(MS300).unwrap_err(); // timeout }); s.spawn(|_| { // child let mut c = file_logged_connector(000, test_log_path); let g = c.new_net_port(Getter, sock_addr, Passive).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.get(g).unwrap(); // not matched by put c.sync(None).unwrap_err(); // no timeout }); @@ -460,14 +464,14 @@ fn child_timeout() { // child; times out let mut c = file_logged_connector(000, test_log_path); let _ = c.new_net_port(Putter, sock_addr, Active).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); - c.sync(Some(Duration::from_millis(300))).unwrap_err(); // timeout + c.connect(SEC1).unwrap(); + c.sync(MS300).unwrap_err(); // timeout }); s.spawn(|_| { // parent let mut c = file_logged_connector(999, test_log_path); let g = c.new_net_port(Getter, sock_addr, Passive).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.get(g).unwrap(); // not matched by put c.sync(None).unwrap_err(); // no timeout }); @@ -483,31 +487,31 @@ fn chain_connect() { s.spawn(|_| { let mut c = file_logged_connector(0, test_log_path); c.new_net_port(Putter, sock_addrs[0], Passive).unwrap(); - c.connect(Some(Duration::from_secs(2))).unwrap(); + c.connect(SEC5).unwrap(); }); s.spawn(|_| { let mut c = file_logged_connector(10, test_log_path); c.new_net_port(Getter, sock_addrs[0], Active).unwrap(); c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(); - c.connect(Some(Duration::from_secs(2))).unwrap(); + c.connect(SEC5).unwrap(); }); s.spawn(|_| { // LEADER let mut c = file_logged_connector(7, test_log_path); c.new_net_port(Getter, sock_addrs[1], Active).unwrap(); c.new_net_port(Putter, sock_addrs[2], Passive).unwrap(); - c.connect(Some(Duration::from_secs(2))).unwrap(); + c.connect(SEC5).unwrap(); }); s.spawn(|_| { let mut c = file_logged_connector(4, test_log_path); c.new_net_port(Getter, sock_addrs[2], Active).unwrap(); c.new_net_port(Putter, sock_addrs[3], Passive).unwrap(); - c.connect(Some(Duration::from_secs(2))).unwrap(); + c.connect(SEC5).unwrap(); }); s.spawn(|_| { let mut c = file_logged_connector(1, test_log_path); c.new_net_port(Getter, sock_addrs[3], Active).unwrap(); - c.connect(Some(Duration::from_secs(2))).unwrap(); + c.connect(SEC5).unwrap(); }); }) .unwrap(); @@ -520,10 +524,10 @@ fn net_self_loop() { let mut c = file_logged_connector(0, test_log_path); let p = c.new_net_port(Putter, sock_addr, Active).unwrap(); let g = c.new_net_port(Getter, sock_addr, Passive).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(p, TEST_MSG.clone()).unwrap(); c.get(g).unwrap(); - c.sync(Some(Duration::from_millis(500))).unwrap(); + c.sync(MS300).unwrap(); } #[test] @@ -555,10 +559,10 @@ fn together() { let p3 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); let [p4, p5] = c.new_port_pair(); c.add_component(b"together", &[p1, p2, p3, p4]).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(p0, TEST_MSG.clone()).unwrap(); c.get(p5).unwrap(); - c.sync(Some(Duration::from_millis(500))).unwrap(); + c.sync(MS300).unwrap(); c.gotten(p5).unwrap(); }); s.spawn(|_| { @@ -568,10 +572,10 @@ fn together() { let p3 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); let [p4, p5] = c.new_port_pair(); c.add_component(b"together", &[p1, p2, p3, p4]).unwrap(); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.put(p0, TEST_MSG.clone()).unwrap(); c.get(p5).unwrap(); - c.sync(Some(Duration::from_millis(500))).unwrap(); + c.sync(MS300).unwrap(); c.gotten(p5).unwrap(); }); }) @@ -582,7 +586,74 @@ fn together() { fn native_batch_distinguish() { let test_log_path = Path::new("./logs/native_batch_distinguish"); let mut c = file_logged_connector(0, test_log_path); - c.connect(Some(Duration::from_secs(1))).unwrap(); + c.connect(SEC1).unwrap(); c.next_batch().unwrap(); - c.sync(Some(Duration::from_secs(3))).unwrap(); + c.sync(SEC1).unwrap(); +} + +#[test] +fn multirounds() { + let test_log_path = Path::new("./logs/multirounds"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + scope(|s| { + s.spawn(|_| { + let mut c = file_logged_connector(0, test_log_path); + let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); + let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); + c.connect(SEC1).unwrap(); + for _ in 0..10 { + c.put(p0, TEST_MSG.clone()).unwrap(); + c.get(p1).unwrap(); + c.sync(SEC1).unwrap(); + } + }); + s.spawn(|_| { + let mut c = file_logged_connector(1, test_log_path); + let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); + c.connect(SEC1).unwrap(); + for _ in 0..10 { + c.get(p0).unwrap(); + c.put(p1, TEST_MSG.clone()).unwrap(); + c.sync(SEC1).unwrap(); + } + }); + }) + .unwrap(); +} + +#[test] +fn multi_recover() { + let test_log_path = Path::new("./logs/multi_recover"); + let sock_addrs = [next_test_addr(), next_test_addr()]; + let success_iter = [true, false].iter().copied().cycle().take(10); + scope(|s| { + s.spawn(|_| { + let mut c = file_logged_connector(0, test_log_path); + let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap(); + let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap(); + c.connect(SEC1).unwrap(); + for succeeds in success_iter.clone() { + c.put(p0, TEST_MSG.clone()).unwrap(); + if succeeds { + c.get(p1).unwrap(); + } + let res = c.sync(MS300); + assert_eq!(res.is_ok(), succeeds); + } + }); + s.spawn(|_| { + let mut c = file_logged_connector(1, test_log_path); + let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(); + let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap(); + c.connect(SEC1).unwrap(); + for succeeds in success_iter.clone() { + c.get(p0).unwrap(); + c.put(p1, TEST_MSG.clone()).unwrap(); + let res = c.sync(MS300); + assert_eq!(res.is_ok(), succeeds); + } + }); + }) + .unwrap(); }