diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index b38ef2db7bb7e121b63361fafcced1793eacac0c..ae4c9985bebf731f91caeaf5bec0a4158c378339 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -42,26 +42,30 @@ struct CyclicDrainInner<'a, K: Eq + Hash, V> { impl Connector { pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> { use GottenError::*; - match &mut self.phased { + let Self { phased, .. } = self; + match phased { ConnectorPhased::Setup { .. } => Err(NoPreviousRound), - ConnectorPhased::Communication { round_result, .. } => match round_result { - Err(_) => Err(PreviousSyncFailed), - Ok(None) => Err(NoPreviousRound), - Ok(Some((_index, gotten))) => gotten.get(&port).ok_or(PortDidntGet), - }, + ConnectorPhased::Communication(ConnectorCommunication { round_result, .. }) => { + match round_result { + Err(_) => Err(PreviousSyncFailed), + Ok(None) => Err(NoPreviousRound), + Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(PortDidntGet), + } + } } } pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> { use PortOpError::*; - if !self.native_ports.contains(&port) { + let Self { unphased, phased } = self; + if !unphased.native_ports.contains(&port) { return Err(PortUnavailable); } - if Putter != *self.port_info.polarities.get(&port).unwrap() { + if Putter != *unphased.port_info.polarities.get(&port).unwrap() { return Err(WrongPolarity); } - match &mut self.phased { + match phased { ConnectorPhased::Setup { .. } => Err(NotConnected), - ConnectorPhased::Communication { native_batches, .. } => { + ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => { let batch = native_batches.last_mut().unwrap(); if batch.to_put.contains_key(&port) { return Err(MultipleOpsOnPort); @@ -74,9 +78,10 @@ impl Connector { pub fn next_batch(&mut self) -> Result { // returns index of new batch use NextBatchError::*; - match &mut self.phased { + let Self { phased, .. } = self; + match phased { ConnectorPhased::Setup { .. } => Err(NotConnected), - ConnectorPhased::Communication { native_batches, .. } => { + ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => { native_batches.push(Default::default()); Ok(native_batches.len() - 1) } @@ -84,15 +89,16 @@ impl Connector { } pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> { use PortOpError::*; - if !self.native_ports.contains(&port) { + let Self { unphased, phased } = self; + if !unphased.native_ports.contains(&port) { return Err(PortUnavailable); } - if Getter != *self.port_info.polarities.get(&port).unwrap() { + if Getter != *unphased.port_info.polarities.get(&port).unwrap() { return Err(WrongPolarity); } - match &mut self.phased { + match phased { ConnectorPhased::Setup { .. } => Err(NotConnected), - ConnectorPhased::Communication { native_batches, .. } => { + ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => { let batch = native_batches.last_mut().unwrap(); if !batch.to_get.insert(port) { return Err(MultipleOpsOnPort); @@ -101,417 +107,405 @@ impl Connector { } } } + // entrypoint for caller. overwrites round result enum, and returns what happened pub fn sync(&mut self, timeout: Option) -> Result { - use SyncError::*; - match &mut self.phased { - ConnectorPhased::Setup { .. } => Err(NotConnected), - ConnectorPhased::Communication { - round_index, - neighborhood, - native_batches, - endpoint_manager, - round_result, - .. - } => { - let mut deadline = timeout.map(|to| Instant::now() + to); - let logger: &mut dyn Logger = &mut *self.logger; - // 1. run all proto components to Nonsync blockers - log!( - logger, - "~~~ SYNC called with timeout {:?}; starting round {}", - &timeout, - round_index - ); - let mut branching_proto_components = - HashMap::::default(); - let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> = - self.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect(); - log!(logger, "Nonsync running {} proto components...", unrun_components.len()); - while let Some((proto_component_id, mut component)) = unrun_components.pop() { - // TODO coalesce fields - log!( - logger, - "Nonsync running proto component with ID {:?}. {} to go after this", - proto_component_id, - unrun_components.len() - ); - let mut ctx = NonsyncProtoContext { - logger: &mut *logger, - port_info: &mut self.port_info, - id_manager: &mut self.id_manager, - proto_component_id, - unrun_components: &mut unrun_components, - proto_component_ports: &mut self - .proto_components - .get_mut(&proto_component_id) - .unwrap() - .ports, - }; - let blocker = component.state.nonsync_run(&mut ctx, &self.proto_description); - log!( - logger, - "proto component {:?} ran to nonsync blocker {:?}", - proto_component_id, - &blocker - ); - use NonsyncBlocker as B; - match blocker { - B::ComponentExit => drop(component), - B::Inconsistent => { - return Err(InconsistentProtoComponent(proto_component_id)) - } - B::SyncBlockStart => { - branching_proto_components.insert( - proto_component_id, - BranchingProtoComponent::initial(component), - ); - } - } + let Self { unphased, phased } = self; + match phased { + ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected), + ConnectorPhased::Communication(comm) => { + comm.round_result = Self::connected_sync(unphased, comm, timeout); + match &comm.round_result { + Ok(None) => unreachable!(), + Ok(Some(ok_result)) => Ok(ok_result.batch_index), + Err(sync_error) => Err(sync_error.clone()), } - log!( - logger, - "All {} proto components are now done with Nonsync phase", - branching_proto_components.len(), - ); + } + } + } - // NOTE: all msgs in outbox are of form (Getter, Payload) - let mut payloads_to_get: Vec<(PortId, SendPayloadMsg)> = vec![]; + // TODO make cu immutable + fn connected_sync( + cu: &mut ConnectorUnphased, + comm: &mut ConnectorCommunication, + timeout: Option, + ) -> Result, SyncError> { + use SyncError as Se; + let mut deadline = timeout.map(|to| Instant::now() + to); + // 1. run all proto components to Nonsync blockers + log!( + cu.logger, + "~~~ SYNC called with timeout {:?}; starting round {}", + &timeout, + comm.round_index + ); + let mut branching_proto_components = + HashMap::::default(); + let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> = + cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect(); + log!(cu.logger, "Nonsync running {} proto components...", unrun_components.len()); + while let Some((proto_component_id, mut component)) = unrun_components.pop() { + // TODO coalesce fields + log!( + cu.logger, + "Nonsync running proto component with ID {:?}. {} to go after this", + proto_component_id, + unrun_components.len() + ); + let mut ctx = NonsyncProtoContext { + logger: &mut *cu.logger, + port_info: &mut cu.port_info, + id_manager: &mut cu.id_manager, + proto_component_id, + unrun_components: &mut unrun_components, + proto_component_ports: &mut cu + .proto_components + .get_mut(&proto_component_id) + .unwrap() + .ports, + }; + let blocker = component.state.nonsync_run(&mut ctx, &cu.proto_description); + log!( + cu.logger, + "proto component {:?} ran to nonsync blocker {:?}", + proto_component_id, + &blocker + ); + use NonsyncBlocker as B; + match blocker { + B::ComponentExit => drop(component), + B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)), + B::SyncBlockStart => { + branching_proto_components + .insert(proto_component_id, BranchingProtoComponent::initial(component)); + } + } + } + log!( + cu.logger, + "All {} proto components are now done with Nonsync phase", + branching_proto_components.len(), + ); - // create the solution storage - let mut solution_storage = { - let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native)); - let c = self - .proto_components - .keys() - .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id))); - let e = neighborhood.children.iter().map(|&index| Route::Endpoint { index }); - SolutionStorage::new(n.chain(c).chain(e)) - }; - log!(logger, "Solution storage initialized"); + // NOTE: all msgs in outbox are of form (Getter, Payload) + let mut payloads_to_get: Vec<(PortId, SendPayloadMsg)> = vec![]; - // 2. kick off the native + // create the solution storage + let mut solution_storage = { + let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native)); + let c = cu + .proto_components + .keys() + .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id))); + let e = comm.neighborhood.children.iter().map(|&index| Route::Endpoint { index }); + SolutionStorage::new(n.chain(c).chain(e)) + }; + log!(cu.logger, "Solution storage initialized"); + + // 2. kick off the native + log!( + cu.logger, + "Translating {} native batches into branches...", + comm.native_batches.len() + ); + let mut branching_native = BranchingNative { branches: Default::default() }; + for (index, NativeBatch { to_get, to_put }) in comm.native_batches.drain(..).enumerate() { + let predicate = { + let mut predicate = Predicate::default(); + // assign trues + for &port in to_get.iter().chain(to_put.keys()) { + let var = cu.port_info.firing_var_for(port); + predicate.assigned.insert(var, true); + } + // assign falses + for &port in cu.native_ports.iter() { + let var = cu.port_info.firing_var_for(port); + predicate.assigned.entry(var).or_insert(false); + } + predicate + }; + log!(cu.logger, "Native branch {} has pred {:?}", index, &predicate); + + // put all messages + for (putter, payload) in to_put { + let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; + log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg); + // rely on invariant: sync batches respect port polarity + let getter = *cu.port_info.peers.get(&putter).unwrap(); + payloads_to_get.push((getter, msg)); + } + if to_get.is_empty() { log!( - logger, - "Translating {} native batches into branches...", - native_batches.len() + cu.logger, + "Native submitting solution for batch {} with {:?}", + index, + &predicate ); - let mut branching_native = BranchingNative { branches: Default::default() }; - for (index, NativeBatch { to_get, to_put }) in native_batches.drain(..).enumerate() - { - let predicate = { - let mut predicate = Predicate::default(); - // assign trues - for &port in to_get.iter().chain(to_put.keys()) { - let var = self.port_info.firing_var_for(port); - predicate.assigned.insert(var, true); - } - // assign falses - for &port in self.native_ports.iter() { - let var = self.port_info.firing_var_for(port); - predicate.assigned.entry(var).or_insert(false); - } - predicate - }; - log!(logger, "Native branch {} has pred {:?}", index, &predicate); + solution_storage.submit_and_digest_subtree_solution( + &mut *cu.logger, + Route::LocalComponent(LocalComponentId::Native), + predicate.clone(), + ); + } + let branch = NativeBranch { index, gotten: Default::default(), to_get }; + if let Some(existing) = branching_native.branches.insert(predicate, branch) { + // TODO + return Err(Se::IndistinguishableBatches([index, existing.index])); + } + } + log!(cu.logger, "Done translating native batches into branches"); + comm.native_batches.push(Default::default()); - // put all messages - for (putter, payload) in to_put { - let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; - log!(logger, "Native branch {} sending msg {:?}", index, &msg); - // rely on invariant: sync batches respect port polarity - let getter = *self.port_info.peers.get(&putter).unwrap(); - payloads_to_get.push((getter, msg)); - } - if to_get.is_empty() { - log!( - logger, - "Native submitting solution for batch {} with {:?}", - index, - &predicate - ); - solution_storage.submit_and_digest_subtree_solution( - logger, - Route::LocalComponent(LocalComponentId::Native), - predicate.clone(), - ); + // run all proto components to their sync blocker + log!( + cu.logger, + "Running all {} proto components to their sync blocker...", + branching_proto_components.len() + ); + for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() { + let ConnectorUnphased { port_info, proto_description, .. } = cu; + let BranchingProtoComponent { ports, branches } = proto_component; + let mut swap = HashMap::default(); + let mut blocked = HashMap::default(); + // drain from branches --> blocked + let cd = CyclicDrainer::new(branches, &mut swap, &mut blocked); + BranchingProtoComponent::drain_branches_to_blocked( + cd, + cu, + &mut solution_storage, + &mut payloads_to_get, + proto_component_id, + ports, + ); + // swap the blocked branches back + std::mem::swap(&mut blocked, branches); + } + log!(cu.logger, "All proto components are blocked"); + + log!(cu.logger, "Entering decision loop..."); + comm.endpoint_manager.undelay_all(); + let decision = 'undecided: loop { + // drain payloads_to_get, sending them through endpoints / feeding them to components + while let Some((getter, send_payload_msg)) = payloads_to_get.pop() { + assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter)); + match cu.port_info.routes.get(&getter).unwrap() { + Route::Endpoint { index } => { + let msg = Msg::CommMsg(CommMsg { + round_index: comm.round_index, + contents: CommMsgContents::SendPayload(send_payload_msg), + }); + comm.endpoint_manager.send_to(*index, &msg).unwrap(); } - let branch = NativeBranch { index, gotten: Default::default(), to_get }; - if let Some(existing) = branching_native.branches.insert(predicate, branch) { - // TODO - return Err(IndistinguishableBatches([index, existing.index])); + Route::LocalComponent(LocalComponentId::Native) => branching_native.feed_msg( + cu, + &mut solution_storage, + // &mut Pay + getter, + send_payload_msg, + ), + Route::LocalComponent(LocalComponentId::Proto(proto_component_id)) => { + if let Some(branching_component) = + branching_proto_components.get_mut(proto_component_id) + { + let proto_component_id = *proto_component_id; + // let ConnectorUnphased { port_info, proto_description, .. } = cu; + branching_component.feed_msg( + cu, + &mut solution_storage, + proto_component_id, + &mut payloads_to_get, + getter, + send_payload_msg, + ) + } } } - log!(logger, "Done translating native batches into branches"); - native_batches.push(Default::default()); + } - // run all proto components to their sync blocker - log!( - logger, - "Running all {} proto components to their sync blocker...", - branching_proto_components.len() - ); - for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() - { - let Self { port_info, proto_description, .. } = self; - let BranchingProtoComponent { ports, branches } = proto_component; - let mut swap = HashMap::default(); - let mut blocked = HashMap::default(); - // drain from branches --> blocked - let cd = CyclicDrainer::new(branches, &mut swap, &mut blocked); - BranchingProtoComponent::drain_branches_to_blocked( - cd, - logger, - port_info, - proto_description, - &mut solution_storage, - |putter, m| { - let getter = *port_info.peers.get(&putter).unwrap(); - payloads_to_get.push((getter, m)); - }, - proto_component_id, - ports, - ); - // swap the blocked branches back - std::mem::swap(&mut blocked, branches); + // check if we have a solution yet + log!(cu.logger, "Check if we have any local decisions..."); + for solution in solution_storage.iter_new_local_make_old() { + log!(cu.logger, "New local decision with solution {:?}...", &solution); + match comm.neighborhood.parent { + Some(parent) => { + log!(cu.logger, "Forwarding to my parent {:?}", parent); + let suggestion = Decision::Success(solution); + let msg = Msg::CommMsg(CommMsg { + round_index: comm.round_index, + contents: CommMsgContents::Suggest { suggestion }, + }); + comm.endpoint_manager.send_to(parent, &msg).unwrap(); + } + None => { + log!(cu.logger, "No parent. Deciding on solution {:?}", &solution); + break 'undecided Decision::Success(solution); + } } - log!(logger, "All proto components are blocked"); + } - log!(logger, "Entering decision loop..."); - endpoint_manager.undelay_all(); - let decision = 'undecided: loop { - // drain payloads_to_get, sending them through endpoints / feeding them to components - while let Some((getter, send_payload_msg)) = payloads_to_get.pop() { - assert!(self.port_info.polarities.get(&getter).copied() == Some(Getter)); - match self.port_info.routes.get(&getter).unwrap() { - Route::Endpoint { index } => { + // stuck! make progress by receiving a msg + // try recv messages arriving through endpoints + log!(cu.logger, "No decision yet. Let's recv an endpoint msg..."); + { + let (endpoint_index, msg) = loop { + match comm.endpoint_manager.try_recv_any_comms(&mut *cu.logger, deadline)? { + None => { + log!(cu.logger, "Reached user-defined deadling without decision..."); + if let Some(parent) = comm.neighborhood.parent { + log!( + cu.logger, + "Sending failure request to parent index {}", + parent + ); let msg = Msg::CommMsg(CommMsg { - round_index: *round_index, - contents: CommMsgContents::SendPayload(send_payload_msg), + round_index: comm.round_index, + contents: CommMsgContents::Suggest { + suggestion: Decision::Failure, + }, }); - endpoint_manager.send_to(*index, &msg).unwrap(); - } - Route::LocalComponent(LocalComponentId::Native) => branching_native - .feed_msg( - logger, - &self.port_info, - &mut solution_storage, - getter, - send_payload_msg, - ), - Route::LocalComponent(LocalComponentId::Proto(proto_component_id)) => { - if let Some(branching_component) = - branching_proto_components.get_mut(proto_component_id) - { - let proto_component_id = *proto_component_id; - let Self { port_info, proto_description, .. } = self; - branching_component.feed_msg( - logger, - port_info, - proto_description, - &mut solution_storage, - proto_component_id, - |putter, m| { - let getter = *port_info.peers.get(&putter).unwrap(); - payloads_to_get.push((getter, m)); - }, - getter, - send_payload_msg, - ) - } + comm.endpoint_manager.send_to(parent, &msg).unwrap(); + } else { + log!(cu.logger, "As the leader, deciding on timeout"); + break 'undecided Decision::Failure; } + deadline = None; } + Some((endpoint_index, msg)) => break (endpoint_index, msg), } - - // check if we have a solution yet - log!(logger, "Check if we have any local decisions..."); - for solution in solution_storage.iter_new_local_make_old() { - log!(logger, "New local decision with solution {:?}...", &solution); - match neighborhood.parent { - Some(parent) => { - log!(logger, "Forwarding to my parent {:?}", parent); - let suggestion = Decision::Success(solution); - let msg = Msg::CommMsg(CommMsg { - round_index: *round_index, - contents: CommMsgContents::Suggest { suggestion }, - }); - endpoint_manager.send_to(parent, &msg).unwrap(); - } - None => { - log!(logger, "No parent. Deciding on solution {:?}", &solution); - break 'undecided Decision::Success(solution); - } + }; + log!(cu.logger, "Received from endpoint {} msg {:?}", endpoint_index, &msg); + let comm_msg_contents = match msg { + Msg::SetupMsg(..) => { + log!(cu.logger, "Discarding setup message; that phase is over"); + continue 'undecided; + } + Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(&comm.round_index) { + Ordering::Equal => comm_msg.contents, + Ordering::Less => { + log!( + cu.logger, + "We are in round {}, but msg is for round {}. Discard", + comm_msg.round_index, + comm.round_index, + ); + drop(comm_msg); + continue 'undecided; } + Ordering::Greater => { + log!( + cu.logger, + "We are in round {}, but msg is for round {}. Buffer", + comm_msg.round_index, + comm.round_index, + ); + comm.endpoint_manager + .delayed_messages + .push((endpoint_index, Msg::CommMsg(comm_msg))); + continue 'undecided; + } + }, + }; + match comm_msg_contents { + CommMsgContents::SendPayload(send_payload_msg) => { + let getter = + comm.endpoint_manager.endpoint_exts[endpoint_index].getter_for_incoming; + assert!(cu.port_info.polarities.get(&getter) == Some(&Getter)); + log!( + cu.logger, + "Msg routed to getter port {:?}. Buffer for recv loop", + getter, + ); + payloads_to_get.push((getter, send_payload_msg)); } - - // stuck! make progress by receiving a msg - // try recv messages arriving through endpoints - log!(logger, "No decision yet. Let's recv an endpoint msg..."); - { - let (endpoint_index, msg) = loop { - match endpoint_manager.try_recv_any_comms(logger, deadline)? { - None => { - log!( - logger, - "Reached user-defined deadling without decision..." + CommMsgContents::Suggest { suggestion } => { + // only accept this control msg through a child endpoint + if comm.neighborhood.children.contains(&endpoint_index) { + match suggestion { + Decision::Success(predicate) => { + // child solution contributes to local solution + log!(cu.logger, "Child provided solution {:?}", &predicate); + let route = Route::Endpoint { index: endpoint_index }; + solution_storage.submit_and_digest_subtree_solution( + &mut *cu.logger, + route, + predicate, ); - if let Some(parent) = neighborhood.parent { - log!( - logger, - "Sending failure request to parent index {}", - parent - ); - let msg = Msg::CommMsg(CommMsg { - round_index: *round_index, - contents: CommMsgContents::Suggest { - suggestion: Decision::Failure, - }, - }); - endpoint_manager.send_to(parent, &msg).unwrap(); - } else { - log!(logger, "As the leader, deciding on timeout"); - break 'undecided Decision::Failure; - } - deadline = None; } - Some((endpoint_index, msg)) => break (endpoint_index, msg), - } - }; - log!(logger, "Received from endpoint {} msg {:?}", endpoint_index, &msg); - let comm_msg_contents = match msg { - Msg::SetupMsg(..) => { - log!(logger, "Discarding setup message; that phase is over"); - continue 'undecided; - } - Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(round_index) { - Ordering::Equal => comm_msg.contents, - Ordering::Less => { - log!( - logger, - "We are in round {}, but msg is for round {}. Discard", - comm_msg.round_index, - round_index, - ); - drop(comm_msg); - continue 'undecided; - } - Ordering::Greater => { - log!( - logger, - "We are in round {}, but msg is for round {}. Buffer", - comm_msg.round_index, - round_index, - ); - endpoint_manager - .delayed_messages - .push((endpoint_index, Msg::CommMsg(comm_msg))); - continue 'undecided; - } - }, - }; - match comm_msg_contents { - CommMsgContents::SendPayload(send_payload_msg) => { - let getter = endpoint_manager.endpoint_exts[endpoint_index] - .getter_for_incoming; - assert!(self.port_info.polarities.get(&getter) == Some(&Getter)); - log!( - logger, - "Msg routed to getter port {:?}. Buffer for recv loop", - getter, - ); - payloads_to_get.push((getter, send_payload_msg)); - } - CommMsgContents::Suggest { suggestion } => { - // only accept this control msg through a child endpoint - if neighborhood.children.contains(&endpoint_index) { - match suggestion { - Decision::Success(predicate) => { - // child solution contributes to local solution + Decision::Failure => { + match comm.neighborhood.parent { + None => { log!( - logger, - "Child provided solution {:?}", - &predicate - ); - let route = Route::Endpoint { index: endpoint_index }; - solution_storage.submit_and_digest_subtree_solution( - logger, route, predicate, + cu.logger, + "As sink, I decide on my child's failure" ); + // I am the sink. Decide on failed + break 'undecided Decision::Failure; + } + Some(parent) => { + log!(cu.logger, "Forwarding failure through my parent endpoint {:?}", parent); + // I've got a parent. Forward the failure suggestion. + let msg = Msg::CommMsg(CommMsg { + round_index: comm.round_index, + contents: CommMsgContents::Suggest { suggestion }, + }); + comm.endpoint_manager.send_to(parent, &msg).unwrap(); } - Decision::Failure => match neighborhood.parent { - None => { - log!( - logger, - "As sink, I decide on my child's failure" - ); - // I am the sink. Decide on failed - break 'undecided Decision::Failure; - } - Some(parent) => { - log!(logger, "Forwarding failure through my parent endpoint {:?}", parent); - // I've got a parent. Forward the failure suggestion. - let msg = Msg::CommMsg(CommMsg { - round_index: *round_index, - contents: CommMsgContents::Suggest { - suggestion, - }, - }); - endpoint_manager.send_to(parent, &msg).unwrap(); - } - }, } - } else { - log!(logger, "Discarding suggestion {:?} from non-child endpoint idx {:?}", &suggestion, endpoint_index); - } - } - CommMsgContents::Announce { decision } => { - if Some(endpoint_index) == neighborhood.parent { - // adopt this decision - break 'undecided decision; - } else { - log!(logger, "Discarding announcement {:?} from non-parent endpoint idx {:?}", &decision, endpoint_index); } } + } else { + log!( + cu.logger, + "Discarding suggestion {:?} from non-child endpoint idx {:?}", + &suggestion, + endpoint_index + ); + } + } + CommMsgContents::Announce { decision } => { + if Some(endpoint_index) == comm.neighborhood.parent { + // adopt this decision + break 'undecided decision; + } else { + log!( + cu.logger, + "Discarding announcement {:?} from non-parent endpoint idx {:?}", + &decision, + endpoint_index + ); } } - log!(logger, "Endpoint msg recv done"); - }; - log!(logger, "Committing to decision {:?}!", &decision); - - // propagate the decision to children - let msg = Msg::CommMsg(CommMsg { - round_index: *round_index, - contents: CommMsgContents::Announce { decision: decision.clone() }, - }); - log!( - logger, - "Announcing decision {:?} through child endpoints {:?}", - &msg, - &neighborhood.children - ); - for &child in neighborhood.children.iter() { - endpoint_manager.send_to(child, &msg).unwrap(); } + } + log!(cu.logger, "Endpoint msg recv done"); + }; + log!(cu.logger, "Committing to decision {:?}!", &decision); - *round_result = match decision { - Decision::Failure => Err(RoundFailure), - Decision::Success(predicate) => { - // commit changes to component states - self.proto_components.clear(); - self.proto_components.extend( - branching_proto_components - .into_iter() - .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))), - ); - Ok(Some(branching_native.collapse_with(&predicate))) - } - }; - log!(logger, "Updated round_result to {:?}", round_result); + // propagate the decision to children + let msg = Msg::CommMsg(CommMsg { + round_index: comm.round_index, + contents: CommMsgContents::Announce { decision: decision.clone() }, + }); + log!( + cu.logger, + "Announcing decision {:?} through child endpoints {:?}", + &msg, + &comm.neighborhood.children + ); + for &child in comm.neighborhood.children.iter() { + comm.endpoint_manager.send_to(child, &msg).unwrap(); + } - let returning = round_result - .as_ref() - .map(|option| option.as_ref().unwrap().0) - .map_err(|sync_error| sync_error.clone()); - log!(logger, "Returning {:?}", &returning); - returning + match decision { + Decision::Failure => Err(Se::RoundFailure), + Decision::Success(predicate) => { + // commit changes to component states + cu.proto_components.clear(); + cu.proto_components.extend( + branching_proto_components + .into_iter() + .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))), + ); + Ok(Some(branching_native.collapse_with(&predicate))) } } } @@ -519,21 +513,21 @@ impl Connector { impl BranchingNative { fn feed_msg( &mut self, - logger: &mut dyn Logger, - port_info: &PortInfo, + cu: &mut ConnectorUnphased, solution_storage: &mut SolutionStorage, + // payloads_to_get: &mut Vec<(PortId, CommMsgContents)>, getter: PortId, send_payload_msg: SendPayloadMsg, ) { - log!(logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg); - assert!(port_info.polarities.get(&getter).copied() == Some(Getter)); + log!(cu.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg); + assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter)); let mut draining = HashMap::default(); let finished = &mut self.branches; std::mem::swap(&mut draining, finished); for (predicate, mut branch) in draining.drain() { - log!(logger, "visiting native branch {:?} with {:?}", &branch, &predicate); + log!(cu.logger, "visiting native branch {:?} with {:?}", &branch, &predicate); // check if this branch expects to receive it - let var = port_info.firing_var_for(getter); + let var = cu.port_info.firing_var_for(getter); let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| { let was = branch.gotten.insert(getter, send_payload_msg.payload.clone()); assert!(was.is_none()); @@ -541,7 +535,7 @@ impl BranchingNative { if branch.to_get.is_empty() { let route = Route::LocalComponent(LocalComponentId::Native); solution_storage.submit_and_digest_subtree_solution( - logger, + &mut *cu.logger, route, predicate.clone(), ); @@ -550,7 +544,7 @@ impl BranchingNative { if predicate.query(var) != Some(true) { // optimization. Don't bother trying this branch log!( - logger, + cu.logger, "skipping branch with {:?} that doesn't want the message (fastpath)", &predicate ); @@ -562,7 +556,7 @@ impl BranchingNative { Csr::Nonexistant => { // this branch does not receive the message log!( - logger, + cu.logger, "skipping branch with {:?} that doesn't want the message (slowpath)", &predicate ); @@ -571,7 +565,7 @@ impl BranchingNative { Csr::Equivalent | Csr::FormerNotLatter => { // retain the existing predicate, but add this payload feed_branch(&mut branch, &predicate); - log!(logger, "branch pred covers it! Accept the msg"); + log!(cu.logger, "branch pred covers it! Accept the msg"); finished.insert(predicate, branch); } Csr::LatterNotFormer => { @@ -580,7 +574,7 @@ impl BranchingNative { let predicate2 = send_payload_msg.predicate.clone(); feed_branch(&mut branch2, &predicate2); log!( - logger, + cu.logger, "payload pred {:?} covers branch pred {:?}", &predicate2, &predicate @@ -593,7 +587,7 @@ impl BranchingNative { let mut branch2 = branch.clone(); feed_branch(&mut branch2, &predicate2); log!( - logger, + cu.logger, "new subsuming pred created {:?}. forking and feeding", &predicate2 ); @@ -603,38 +597,40 @@ impl BranchingNative { } } } - fn collapse_with(self, solution_predicate: &Predicate) -> (usize, HashMap) { + fn collapse_with(self, solution_predicate: &Predicate) -> RoundOk { for (branch_predicate, branch) in self.branches { if solution_predicate.satisfies(&branch_predicate) { let NativeBranch { index, gotten, .. } = branch; - return (index, gotten); + return RoundOk { batch_index: index, gotten }; } } panic!("Native had no branches matching pred {:?}", solution_predicate); } } + +// |putter, m| { +// let getter = *cu.port_info.peers.get(&putter).unwrap(); +// payloads_to_get.push((getter, m)); +// }, impl BranchingProtoComponent { fn drain_branches_to_blocked( cd: CyclicDrainer, - // - logger: &mut dyn Logger, - port_info: &PortInfo, - proto_description: &ProtocolDescription, + cu: &mut ConnectorUnphased, solution_storage: &mut SolutionStorage, - mut outbox_unqueue: impl FnMut(PortId, SendPayloadMsg), + payloads_to_get: &mut Vec<(PortId, SendPayloadMsg)>, proto_component_id: ProtoComponentId, ports: &HashSet, ) { cd.cylic_drain(|mut predicate, mut branch, mut drainer| { let mut ctx = SyncProtoContext { - logger, + logger: &mut *cu.logger, predicate: &predicate, - port_info, + port_info: &cu.port_info, inbox: &branch.inbox, }; - let blocker = branch.state.sync_run(&mut ctx, proto_description); + let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description); log!( - logger, + cu.logger, "Proto component with id {:?} branch with pred {:?} hit blocker {:?}", proto_component_id, &predicate, @@ -649,12 +645,12 @@ impl BranchingProtoComponent { B::SyncBlockEnd => { // make concrete all variables for &port in ports.iter() { - let var = port_info.firing_var_for(port); + let var = cu.port_info.firing_var_for(port); predicate.assigned.entry(var).or_insert(false); } // submit solution for this component solution_storage.submit_and_digest_subtree_solution( - logger, + &mut *cu.logger, Route::LocalComponent(LocalComponentId::Proto(proto_component_id)), predicate.clone(), ); @@ -668,7 +664,7 @@ impl BranchingProtoComponent { } B::CouldntCheckFiring(port) => { // sanity check - let var = port_info.firing_var_for(port); + let var = cu.port_info.firing_var_for(port); assert!(predicate.query(var).is_none()); // keep forks in "unblocked" drainer.add_input(predicate.clone().inserted(var, false), branch.clone()); @@ -676,21 +672,20 @@ impl BranchingProtoComponent { } B::PutMsg(putter, payload) => { // sanity check - assert_eq!(Some(&Putter), port_info.polarities.get(&putter)); + assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter)); // overwrite assignment - let var = port_info.firing_var_for(putter); + let var = cu.port_info.firing_var_for(putter); let was = predicate.assigned.insert(var, true); if was == Some(false) { - log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); + log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); // discard forever drop((predicate, branch)); } else { // keep in "unblocked" - log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); - outbox_unqueue( - putter, - SendPayloadMsg { predicate: predicate.clone(), payload }, - ); + log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); + let getter = *cu.port_info.peers.get(&putter).unwrap(); + let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; + payloads_to_get.push((getter, msg)); drainer.add_input(predicate, branch); } } @@ -699,15 +694,14 @@ impl BranchingProtoComponent { } fn feed_msg( &mut self, - logger: &mut dyn Logger, - port_info: &PortInfo, - proto_description: &ProtocolDescription, + cu: &mut ConnectorUnphased, solution_storage: &mut SolutionStorage, proto_component_id: ProtoComponentId, - outbox_unqueue: impl FnMut(PortId, SendPayloadMsg), + payloads_to_get: &mut Vec<(PortId, SendPayloadMsg)>, getter: PortId, send_payload_msg: SendPayloadMsg, ) { + let logger = &mut *cu.logger; log!( logger, "feeding proto component {:?} getter {:?} {:?}", @@ -760,17 +754,15 @@ impl BranchingProtoComponent { let cd = CyclicDrainer::new(&mut unblocked, &mut swap, &mut blocked); BranchingProtoComponent::drain_branches_to_blocked( cd, - logger, - port_info, - proto_description, + cu, solution_storage, - outbox_unqueue, + payloads_to_get, proto_component_id, ports, ); // swap the blocked branches back std::mem::swap(&mut blocked, branches); - log!(logger, "component settles down with branches: {:?}", branches.keys()); + log!(cu.logger, "component settles down with branches: {:?}", branches.keys()); } fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { let BranchingProtoComponent { ports, branches } = self; @@ -957,7 +949,6 @@ impl ProtoComponentBranch { assert!(was.is_none()) } } - impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { fn new( input: &'a mut HashMap,