diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 839cc539d37d3e5071e143f6af6f952206b034a6..266e9c33be61cf7606ee988fbbd048d77e02a07d 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -1,6 +1,5 @@ use super::*; use crate::common::*; -use core::marker::PhantomData; //////////////// struct BranchingNative { @@ -11,6 +10,7 @@ struct NativeBranch { gotten: HashMap, to_get: HashSet, } +#[derive(Debug)] struct SolutionStorage { old_local: HashSet, new_local: HashSet, @@ -85,6 +85,17 @@ impl SyncProtoContext<'_> { } impl Connector { + pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> { + use GottenError::*; + match &mut self.phased { + ConnectorPhased::Setup { .. } => Err(NoPreviousRound), + ConnectorPhased::Communication { round_result, .. } => match round_result { + Err(_) => Err(PreviousSyncFailed), + Ok(None) => Err(NoPreviousRound), + Ok(Some((_index, gotten))) => gotten.get(&port).ok_or(PortDidntGet), + }, + } + } pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> { use PortOpError::*; if !self.native_ports.contains(&port) { @@ -105,20 +116,42 @@ impl Connector { } } } - pub fn sync(&mut self) -> Result { + pub fn sync(&mut self, timeout: Duration) -> Result { use SyncError::*; match &mut self.phased { ConnectorPhased::Setup { .. } => Err(NotConnected), - ConnectorPhased::Communication { native_batches, endpoint_manager, .. } => { + ConnectorPhased::Communication { + round_index, + neighborhood, + native_batches, + endpoint_manager, + round_result, + .. + } => { + let _deadline = Instant::now() + timeout; + let logger: &mut dyn Logger = &mut *self.logger; // 1. run all proto components to Nonsync blockers + log!( + logger, + "~~~ SYNC called with timeout {:?}; starting round {}", + &timeout, + round_index + ); let mut branching_proto_components = HashMap::::default(); let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> = self.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect(); + log!(logger, "Nonsync running {} proto components...", unrun_components.len()); while let Some((proto_component_id, mut component)) = unrun_components.pop() { // TODO coalesce fields + log!( + logger, + "Nonsync running proto component with ID {:?}. {} to go after this", + proto_component_id, + unrun_components.len() + ); let mut ctx = NonsyncProtoContext { - logger: &mut *self.logger, + logger: &mut *logger, port_info: &mut self.port_info, id_manager: &mut self.id_manager, proto_component_id, @@ -129,8 +162,15 @@ impl Connector { .unwrap() .ports, }; + let blocker = component.state.nonsync_run(&mut ctx, &self.proto_description); + log!( + logger, + "proto component {:?} ran to nonsync blocker {:?}", + proto_component_id, + &blocker + ); use NonsyncBlocker as B; - match component.state.nonsync_run(&mut ctx, &self.proto_description) { + match blocker { B::ComponentExit => drop(component), B::Inconsistent => { return Err(InconsistentProtoComponent(proto_component_id)) @@ -143,51 +183,87 @@ impl Connector { } } } + log!( + logger, + "All {} proto components are now done with Nonsync phase", + branching_proto_components.len(), + ); - // (Putter, ) + // NOTE: all msgs in outbox are of form (Putter, Payload) let mut payload_outbox: Vec<(PortId, SendPayloadMsg)> = vec![]; + // create the solution storage + let mut solution_storage = { + let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native)); + let c = self + .proto_components + .keys() + .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id))); + let e = (0..endpoint_manager.endpoint_exts.len()) + .map(|index| Route::Endpoint { index }); + SolutionStorage::new(n.chain(c).chain(e)) + }; + log!(logger, "Solution storage initialized"); + // 2. kick off the native + log!( + logger, + "Translating {} native batches into branches...", + native_batches.len() + ); let mut branching_native = BranchingNative { branches: Default::default() }; for (index, NativeBatch { to_get, to_put }) in native_batches.drain(..).enumerate() { - let mut predicate = Predicate::default(); - // assign trues - for &port in to_get.iter().chain(to_put.keys()) { - predicate.assigned.insert(port, true); - } - // assign falses - for &port in self.native_ports.iter() { - predicate.assigned.entry(port).or_insert(false); - } + let predicate = { + let mut predicate = Predicate::default(); + // assign trues + for &port in to_get.iter().chain(to_put.keys()) { + predicate.assigned.insert(port, true); + } + // assign falses + for &port in self.native_ports.iter() { + predicate.assigned.entry(port).or_insert(false); + } + predicate + }; + log!(logger, "Native branch {} has pred {:?}", index, &predicate); + // put all messages for (port, payload) in to_put { - payload_outbox.push(( - port, - SendPayloadMsg { payload_predicate: predicate.clone(), payload }, - )); + let msg = SendPayloadMsg { payload_predicate: predicate.clone(), payload }; + log!(logger, "Native branch {} sending msg {:?}", index, &msg); + payload_outbox.push((port, msg)); + } + if to_get.is_empty() { + log!(logger, "Native submitting trivial solution for index {}", index); + solution_storage.submit_and_digest_subtree_solution( + logger, + Route::LocalComponent(LocalComponentId::Native), + Predicate::default(), + ); } let branch = NativeBranch { index, gotten: Default::default(), to_get }; if let Some(existing) = branching_native.branches.insert(predicate, branch) { + // TODO return Err(IndistinguishableBatches([index, existing.index])); } } - - // create the solution storage - let mut solution_storage = { - let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native)); - let c = self - .proto_components - .keys() - .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id))); - let e = (0..endpoint_manager.endpoint_exts.len()) - .map(|index| Route::Endpoint { index }); - SolutionStorage::new(n.chain(c).chain(e)) - }; + log!(logger, "Done translating native batches into branches"); + native_batches.push(Default::default()); // run all proto components to their sync blocker + log!( + logger, + "Running all {} proto components to their sync blocker...", + branching_proto_components.len() + ); for (proto_component_id, proto_component) in branching_proto_components.iter_mut() { // run this component to sync blocker in-place + log!( + logger, + "Running proto component with id {:?} to blocker...", + proto_component_id + ); let blocked = &mut proto_component.branches; let [unblocked_from, unblocked_to] = [ &mut HashMap::::default(), @@ -198,15 +274,22 @@ impl Connector { while !unblocked_from.is_empty() { for (mut predicate, mut branch) in unblocked_from.drain() { let mut ctx = SyncProtoContext { - logger: &mut *self.logger, + logger, predicate: &predicate, proto_component_id: *proto_component_id, inbox: &branch.inbox, }; + let blocker = branch.state.sync_run(&mut ctx, &self.proto_description); + log!( + logger, + "Proto component with id {:?} branch with pred {:?} hit blocker {:?}", + proto_component_id, + &predicate, + &blocker, + ); use SyncBlocker as B; - match branch.state.sync_run(&mut ctx, &self.proto_description) { + match blocker { B::Inconsistent => { - log!(self.logger, "Proto component {:?} branch with pred {:?} became inconsistent", proto_component_id, &predicate); // branch is inconsistent. throw it away drop((predicate, branch)); } @@ -216,9 +299,8 @@ impl Connector { predicate.assigned.entry(port).or_insert(false); } // submit solution for this component - log!(self.logger, "Proto component {:?} branch with pred {:?} reached SyncBlockEnd", proto_component_id, &predicate); solution_storage.submit_and_digest_subtree_solution( - &mut *self.logger, + logger, Route::LocalComponent(LocalComponentId::Proto( *proto_component_id, )), @@ -249,13 +331,15 @@ impl Connector { assert_eq!(Some(&Putter), self.port_info.polarities.get(&port)); // overwrite assignment let var = self.port_info.firing_var_for(port); + let was = predicate.assigned.insert(var, true); if was == Some(false) { - log!(self.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, port, var); + log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, port, var); // discard forever drop((predicate, branch)); } else { // keep in "unblocked" + log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, port, var); payload_outbox.push(( port, SendPayloadMsg { @@ -271,24 +355,98 @@ impl Connector { std::mem::swap(unblocked_from, unblocked_to); } } - // now all components are blocked! - // + log!(logger, "All proto components are blocked"); + log!(logger, "Entering decision loop..."); let decision = 'undecided: loop { // check if we already have a solution - for _solution in solution_storage.iter_new_local_make_old() { - // todo check if parent, inform children etc. etc. - break 'undecided Ok(0); + log!(logger, "Check if we have any local decisions..."); + for solution in solution_storage.iter_new_local_make_old() { + log!(logger, "New local decision with solution {:?}...", &solution); + match neighborhood.parent { + Some(parent) => { + log!(logger, "Forwarding to my parent {:?}", parent); + let msg = Msg::CommMsg(CommMsg { + round_index: *round_index, + contents: CommMsgContents::Elaborate { + partial_oracle: solution, + }, + }); + endpoint_manager.send_to(parent, &msg).unwrap(); + } + None => { + log!(logger, "No parent. Deciding on solution {:?}", &solution); + break 'undecided Decision::Success(solution); + } + } } - // send / recv messages + // TODO send / recv messages }; - decision + log!(logger, "Committing to decision {:?}!", &decision); + + // propagate the decision to children + let msg = Msg::CommMsg(CommMsg { + round_index: *round_index, + contents: CommMsgContents::Announce { decision: decision.clone() }, + }); + log!( + logger, + "Announcing decision {:?} through child endpoints {:?}", + &msg, + &neighborhood.children + ); + for &child in neighborhood.children.iter() { + endpoint_manager.send_to(child, &msg).unwrap(); + } + + *round_result = match decision { + Decision::Failure => Err(DistributedTimeout), + Decision::Success(predicate) => { + // commit changes to component states + self.proto_components.clear(); + self.proto_components.extend( + branching_proto_components + .into_iter() + .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))), + ); + Ok(Some(branching_native.collapse_with(&predicate))) + } + }; + log!(logger, "Updated round_result to {:?}", round_result); + + let returning = round_result + .as_ref() + .map(|option| option.as_ref().unwrap().0) + .map_err(|sync_error| sync_error.clone()); + log!(logger, "Returning {:?}", &returning); + returning } } } } +impl BranchingNative { + fn collapse_with(self, solution_predicate: &Predicate) -> (usize, HashMap) { + for (branch_predicate, branch) in self.branches { + if branch_predicate.satisfies(solution_predicate) { + let NativeBranch { index, gotten, .. } = branch; + return (index, gotten); + } + } + panic!("Native had no branches matching pred {:?}", solution_predicate); + } +} impl BranchingProtoComponent { + fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { + let BranchingProtoComponent { ports, branches } = self; + for (branch_predicate, branch) in branches { + if branch_predicate.satisfies(solution_predicate) { + let ProtoComponentBranch { state, .. } = branch; + return ProtoComponent { state, ports }; + } + } + panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate); + } fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self { let branch = ProtoComponentBranch { inbox: Default::default(), state }; Self { ports, branches: hashmap! { Predicate::default() => branch } } @@ -393,7 +551,7 @@ impl SolutionStorage { // recursive stop condition. `partial` is a local subtree solution if !old_local.contains(&partial) { // ... and it hasn't been found before - log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial); + log!(logger, "storing NEW LOCAL SOLUTION {:?}", &partial); new_local.insert(partial); } }