From 3f6d45f84e7610990201e14b210a12bdfe3cb6fe 2020-06-23 09:30:20 From: Christopher Esterhuyse Date: 2020-06-23 09:30:20 Subject: [PATCH] trivial sync reintegrated. debug improvements. more logging --- diff --git a/src/lib.rs b/src/lib.rs index 755f30dc3d0c8e85f53e394469426a8fcb150bf1..a9014653332517454160244edebd54fb3ebb64be 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,7 @@ mod runtime; pub use common::Polarity; pub use protocol::ProtocolDescription; -pub use runtime::{Connector, EndpointSetup, StringLogger}; +pub use runtime::{error, Connector, EndpointSetup, StringLogger}; // #[cfg(feature = "ffi")] // pub use runtime::ffi; diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 839cc539d37d3e5071e143f6af6f952206b034a6..266e9c33be61cf7606ee988fbbd048d77e02a07d 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -1,6 +1,5 @@ use super::*; use crate::common::*; -use core::marker::PhantomData; //////////////// struct BranchingNative { @@ -11,6 +10,7 @@ struct NativeBranch { gotten: HashMap, to_get: HashSet, } +#[derive(Debug)] struct SolutionStorage { old_local: HashSet, new_local: HashSet, @@ -85,6 +85,17 @@ impl SyncProtoContext<'_> { } impl Connector { + pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> { + use GottenError::*; + match &mut self.phased { + ConnectorPhased::Setup { .. } => Err(NoPreviousRound), + ConnectorPhased::Communication { round_result, .. } => match round_result { + Err(_) => Err(PreviousSyncFailed), + Ok(None) => Err(NoPreviousRound), + Ok(Some((_index, gotten))) => gotten.get(&port).ok_or(PortDidntGet), + }, + } + } pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> { use PortOpError::*; if !self.native_ports.contains(&port) { @@ -105,20 +116,42 @@ impl Connector { } } } - pub fn sync(&mut self) -> Result { + pub fn sync(&mut self, timeout: Duration) -> Result { use SyncError::*; match &mut self.phased { ConnectorPhased::Setup { .. } => Err(NotConnected), - ConnectorPhased::Communication { native_batches, endpoint_manager, .. } => { + ConnectorPhased::Communication { + round_index, + neighborhood, + native_batches, + endpoint_manager, + round_result, + .. + } => { + let _deadline = Instant::now() + timeout; + let logger: &mut dyn Logger = &mut *self.logger; // 1. run all proto components to Nonsync blockers + log!( + logger, + "~~~ SYNC called with timeout {:?}; starting round {}", + &timeout, + round_index + ); let mut branching_proto_components = HashMap::::default(); let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> = self.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect(); + log!(logger, "Nonsync running {} proto components...", unrun_components.len()); while let Some((proto_component_id, mut component)) = unrun_components.pop() { // TODO coalesce fields + log!( + logger, + "Nonsync running proto component with ID {:?}. {} to go after this", + proto_component_id, + unrun_components.len() + ); let mut ctx = NonsyncProtoContext { - logger: &mut *self.logger, + logger: &mut *logger, port_info: &mut self.port_info, id_manager: &mut self.id_manager, proto_component_id, @@ -129,8 +162,15 @@ impl Connector { .unwrap() .ports, }; + let blocker = component.state.nonsync_run(&mut ctx, &self.proto_description); + log!( + logger, + "proto component {:?} ran to nonsync blocker {:?}", + proto_component_id, + &blocker + ); use NonsyncBlocker as B; - match component.state.nonsync_run(&mut ctx, &self.proto_description) { + match blocker { B::ComponentExit => drop(component), B::Inconsistent => { return Err(InconsistentProtoComponent(proto_component_id)) @@ -143,51 +183,87 @@ impl Connector { } } } + log!( + logger, + "All {} proto components are now done with Nonsync phase", + branching_proto_components.len(), + ); - // (Putter, ) + // NOTE: all msgs in outbox are of form (Putter, Payload) let mut payload_outbox: Vec<(PortId, SendPayloadMsg)> = vec![]; + // create the solution storage + let mut solution_storage = { + let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native)); + let c = self + .proto_components + .keys() + .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id))); + let e = (0..endpoint_manager.endpoint_exts.len()) + .map(|index| Route::Endpoint { index }); + SolutionStorage::new(n.chain(c).chain(e)) + }; + log!(logger, "Solution storage initialized"); + // 2. kick off the native + log!( + logger, + "Translating {} native batches into branches...", + native_batches.len() + ); let mut branching_native = BranchingNative { branches: Default::default() }; for (index, NativeBatch { to_get, to_put }) in native_batches.drain(..).enumerate() { - let mut predicate = Predicate::default(); - // assign trues - for &port in to_get.iter().chain(to_put.keys()) { - predicate.assigned.insert(port, true); - } - // assign falses - for &port in self.native_ports.iter() { - predicate.assigned.entry(port).or_insert(false); - } + let predicate = { + let mut predicate = Predicate::default(); + // assign trues + for &port in to_get.iter().chain(to_put.keys()) { + predicate.assigned.insert(port, true); + } + // assign falses + for &port in self.native_ports.iter() { + predicate.assigned.entry(port).or_insert(false); + } + predicate + }; + log!(logger, "Native branch {} has pred {:?}", index, &predicate); + // put all messages for (port, payload) in to_put { - payload_outbox.push(( - port, - SendPayloadMsg { payload_predicate: predicate.clone(), payload }, - )); + let msg = SendPayloadMsg { payload_predicate: predicate.clone(), payload }; + log!(logger, "Native branch {} sending msg {:?}", index, &msg); + payload_outbox.push((port, msg)); + } + if to_get.is_empty() { + log!(logger, "Native submitting trivial solution for index {}", index); + solution_storage.submit_and_digest_subtree_solution( + logger, + Route::LocalComponent(LocalComponentId::Native), + Predicate::default(), + ); } let branch = NativeBranch { index, gotten: Default::default(), to_get }; if let Some(existing) = branching_native.branches.insert(predicate, branch) { + // TODO return Err(IndistinguishableBatches([index, existing.index])); } } - - // create the solution storage - let mut solution_storage = { - let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native)); - let c = self - .proto_components - .keys() - .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id))); - let e = (0..endpoint_manager.endpoint_exts.len()) - .map(|index| Route::Endpoint { index }); - SolutionStorage::new(n.chain(c).chain(e)) - }; + log!(logger, "Done translating native batches into branches"); + native_batches.push(Default::default()); // run all proto components to their sync blocker + log!( + logger, + "Running all {} proto components to their sync blocker...", + branching_proto_components.len() + ); for (proto_component_id, proto_component) in branching_proto_components.iter_mut() { // run this component to sync blocker in-place + log!( + logger, + "Running proto component with id {:?} to blocker...", + proto_component_id + ); let blocked = &mut proto_component.branches; let [unblocked_from, unblocked_to] = [ &mut HashMap::::default(), @@ -198,15 +274,22 @@ impl Connector { while !unblocked_from.is_empty() { for (mut predicate, mut branch) in unblocked_from.drain() { let mut ctx = SyncProtoContext { - logger: &mut *self.logger, + logger, predicate: &predicate, proto_component_id: *proto_component_id, inbox: &branch.inbox, }; + let blocker = branch.state.sync_run(&mut ctx, &self.proto_description); + log!( + logger, + "Proto component with id {:?} branch with pred {:?} hit blocker {:?}", + proto_component_id, + &predicate, + &blocker, + ); use SyncBlocker as B; - match branch.state.sync_run(&mut ctx, &self.proto_description) { + match blocker { B::Inconsistent => { - log!(self.logger, "Proto component {:?} branch with pred {:?} became inconsistent", proto_component_id, &predicate); // branch is inconsistent. throw it away drop((predicate, branch)); } @@ -216,9 +299,8 @@ impl Connector { predicate.assigned.entry(port).or_insert(false); } // submit solution for this component - log!(self.logger, "Proto component {:?} branch with pred {:?} reached SyncBlockEnd", proto_component_id, &predicate); solution_storage.submit_and_digest_subtree_solution( - &mut *self.logger, + logger, Route::LocalComponent(LocalComponentId::Proto( *proto_component_id, )), @@ -249,13 +331,15 @@ impl Connector { assert_eq!(Some(&Putter), self.port_info.polarities.get(&port)); // overwrite assignment let var = self.port_info.firing_var_for(port); + let was = predicate.assigned.insert(var, true); if was == Some(false) { - log!(self.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, port, var); + log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, port, var); // discard forever drop((predicate, branch)); } else { // keep in "unblocked" + log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, port, var); payload_outbox.push(( port, SendPayloadMsg { @@ -271,24 +355,98 @@ impl Connector { std::mem::swap(unblocked_from, unblocked_to); } } - // now all components are blocked! - // + log!(logger, "All proto components are blocked"); + log!(logger, "Entering decision loop..."); let decision = 'undecided: loop { // check if we already have a solution - for _solution in solution_storage.iter_new_local_make_old() { - // todo check if parent, inform children etc. etc. - break 'undecided Ok(0); + log!(logger, "Check if we have any local decisions..."); + for solution in solution_storage.iter_new_local_make_old() { + log!(logger, "New local decision with solution {:?}...", &solution); + match neighborhood.parent { + Some(parent) => { + log!(logger, "Forwarding to my parent {:?}", parent); + let msg = Msg::CommMsg(CommMsg { + round_index: *round_index, + contents: CommMsgContents::Elaborate { + partial_oracle: solution, + }, + }); + endpoint_manager.send_to(parent, &msg).unwrap(); + } + None => { + log!(logger, "No parent. Deciding on solution {:?}", &solution); + break 'undecided Decision::Success(solution); + } + } } - // send / recv messages + // TODO send / recv messages }; - decision + log!(logger, "Committing to decision {:?}!", &decision); + + // propagate the decision to children + let msg = Msg::CommMsg(CommMsg { + round_index: *round_index, + contents: CommMsgContents::Announce { decision: decision.clone() }, + }); + log!( + logger, + "Announcing decision {:?} through child endpoints {:?}", + &msg, + &neighborhood.children + ); + for &child in neighborhood.children.iter() { + endpoint_manager.send_to(child, &msg).unwrap(); + } + + *round_result = match decision { + Decision::Failure => Err(DistributedTimeout), + Decision::Success(predicate) => { + // commit changes to component states + self.proto_components.clear(); + self.proto_components.extend( + branching_proto_components + .into_iter() + .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))), + ); + Ok(Some(branching_native.collapse_with(&predicate))) + } + }; + log!(logger, "Updated round_result to {:?}", round_result); + + let returning = round_result + .as_ref() + .map(|option| option.as_ref().unwrap().0) + .map_err(|sync_error| sync_error.clone()); + log!(logger, "Returning {:?}", &returning); + returning } } } } +impl BranchingNative { + fn collapse_with(self, solution_predicate: &Predicate) -> (usize, HashMap) { + for (branch_predicate, branch) in self.branches { + if branch_predicate.satisfies(solution_predicate) { + let NativeBranch { index, gotten, .. } = branch; + return (index, gotten); + } + } + panic!("Native had no branches matching pred {:?}", solution_predicate); + } +} impl BranchingProtoComponent { + fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { + let BranchingProtoComponent { ports, branches } = self; + for (branch_predicate, branch) in branches { + if branch_predicate.satisfies(solution_predicate) { + let ProtoComponentBranch { state, .. } = branch; + return ProtoComponent { state, ports }; + } + } + panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate); + } fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self { let branch = ProtoComponentBranch { inbox: Default::default(), state }; Self { ports, branches: hashmap! { Predicate::default() => branch } } @@ -393,7 +551,7 @@ impl SolutionStorage { // recursive stop condition. `partial` is a local subtree solution if !old_local.contains(&partial) { // ... and it hasn't been found before - log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial); + log!(logger, "storing NEW LOCAL SOLUTION {:?}", &partial); new_local.insert(partial); } } diff --git a/src/runtime/error.rs b/src/runtime/error.rs index 7de8323c2e0d468fbcf03a6b55ef1b8dd1d0b437..8b526dad37c198333c59a79c0890956fd121aaad 100644 --- a/src/runtime/error.rs +++ b/src/runtime/error.rs @@ -12,12 +12,13 @@ pub enum TryRecyAnyError { EndpointError { error: EndpointError, index: usize }, BrokenEndpoint(usize), } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum SyncError { Timeout, NotConnected, InconsistentProtoComponent(ProtoComponentId), IndistinguishableBatches([usize; 2]), + DistributedTimeout, } #[derive(Debug)] pub enum PortOpError { @@ -26,3 +27,9 @@ pub enum PortOpError { MultipleOpsOnPort, PortUnavailable, } +#[derive(Debug, Eq, PartialEq)] +pub enum GottenError { + NoPreviousRound, + PortDidntGet, + PreviousSyncFailed, +} diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 07bf1cb62b5e520d2d9356c9bc475e1cc4f7d19b..2223cf6be13fcbd670fa28299647756c2e8cea80 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -1,5 +1,5 @@ mod communication; -mod error; +pub mod error; mod setup2; #[cfg(test)] @@ -144,12 +144,12 @@ pub enum ConnectorPhased { neighborhood: Neighborhood, mem_inbox: Vec, native_batches: Vec, - round_result: Result, SyncError>, + round_result: Result)>, SyncError>, }, } #[derive(Debug)] pub struct StringLogger(ControllerId, String); -#[derive(Default, Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] +#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] pub struct Predicate { pub assigned: BTreeMap, } @@ -545,3 +545,24 @@ impl Predicate { self.assigned.get(&x).copied() } } +impl Debug for Predicate { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + struct MySet<'a>(&'a Predicate, bool); + impl Debug for MySet<'_> { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + let iter = self.0.assigned.iter().filter_map(|(port, &firing)| { + if firing == self.1 { + Some(port) + } else { + None + } + }); + f.debug_set().entries(iter).finish() + } + } + f.debug_struct("Predicate") + .field("Trues", &MySet(self, true)) + .field("Falses", &MySet(self, false)) + .finish() + } +} diff --git a/src/runtime/my_tests.rs b/src/runtime/my_tests.rs index d702b34971338a724d540e61cbbae627f120e604..b5d4a1ba92e524630d814722151f0cbb0ccecb23 100644 --- a/src/runtime/my_tests.rs +++ b/src/runtime/my_tests.rs @@ -113,3 +113,35 @@ fn dup_put_bad() { c.put(o, (b"hi" as &[_]).into()).unwrap(); c.put(o, (b"hi" as &[_]).into()).unwrap_err(); } + +#[test] +fn trivial_sync() { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + c.connect(Duration::from_secs(1)).unwrap(); + c.sync(Duration::from_secs(1)).unwrap(); + c.print_state(); +} + +#[test] +fn unconnected_gotten_err() { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let [_, i] = c.new_port_pair(); + assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err()); +} + +#[test] +fn connected_gotten_err_no_round() { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let [_, i] = c.new_port_pair(); + c.connect(Duration::from_secs(1)).unwrap(); + assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err()); +} + +#[test] +fn connected_gotten_err_ungotten() { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let [_, i] = c.new_port_pair(); + c.connect(Duration::from_secs(1)).unwrap(); + c.sync(Duration::from_secs(1)).unwrap(); + assert_eq!(reowolf::error::GottenError::PortDidntGet, c.gotten(i).unwrap_err()); +} diff --git a/src/runtime/setup2.rs b/src/runtime/setup2.rs index 72524062d4f1c1b0b9470717abc0f9b7fae7a475..b3e3000e767f812aa3c417ba9f1a777abaf8016b 100644 --- a/src/runtime/setup2.rs +++ b/src/runtime/setup2.rs @@ -96,7 +96,7 @@ impl Connector { Err(()) } ConnectorPhased::Setup { endpoint_setups, .. } => { - log!(self.logger, "Call to connecting in setup state. Timeout {:?}", timeout); + log!(self.logger, "~~~ CONNECT called with timeout {:?}", timeout); let deadline = Instant::now() + timeout; // connect all endpoints in parallel; send and receive peer ids through ports let mut endpoint_manager = new_endpoint_manager( @@ -394,9 +394,7 @@ fn init_neighborhood( ee.endpoint.send(msg)?; } let mut children = Vec::default(); - log!(logger, "delayed {:?} undelayed {:?}", &em.delayed_messages, &em.undelayed_messages); em.undelay_all(); - log!(logger, "delayed {:?} undelayed {:?}", &em.delayed_messages, &em.undelayed_messages); while !awaiting.is_empty() { log!(logger, "awaiting {:?}", &awaiting); let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;