diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 266e9c33be61cf7606ee988fbbd048d77e02a07d..921d409c7b68d2e5b83d5bae2c3a4b2675a06a45 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -5,6 +5,7 @@ use crate::common::*; struct BranchingNative { branches: HashMap, } +#[derive(Clone, Debug)] struct NativeBranch { index: usize, gotten: HashMap, @@ -77,7 +78,8 @@ impl NonsyncProtoContext<'_> { } impl SyncProtoContext<'_> { pub fn is_firing(&mut self, port: PortId) -> Option { - self.predicate.query(port) + let var = self.port_info.firing_var_for(port); + self.predicate.query(var) } pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> { self.inbox.get(&port) @@ -116,6 +118,36 @@ impl Connector { } } } + pub fn next_batch(&mut self) -> Result { + // returns index of new batch + use NextBatchError::*; + match &mut self.phased { + ConnectorPhased::Setup { .. } => Err(NotConnected), + ConnectorPhased::Communication { native_batches, .. } => { + native_batches.push(Default::default()); + Ok(native_batches.len() - 1) + } + } + } + pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> { + use PortOpError::*; + if !self.native_ports.contains(&port) { + return Err(PortUnavailable); + } + if Getter != *self.port_info.polarities.get(&port).unwrap() { + return Err(WrongPolarity); + } + match &mut self.phased { + ConnectorPhased::Setup { .. } => Err(NotConnected), + ConnectorPhased::Communication { native_batches, .. } => { + let batch = native_batches.last_mut().unwrap(); + if !batch.to_get.insert(port) { + return Err(MultipleOpsOnPort); + } + Ok(()) + } + } + } pub fn sync(&mut self, timeout: Duration) -> Result { use SyncError::*; match &mut self.phased { @@ -128,7 +160,7 @@ impl Connector { round_result, .. } => { - let _deadline = Instant::now() + timeout; + let deadline = Instant::now() + timeout; let logger: &mut dyn Logger = &mut *self.logger; // 1. run all proto components to Nonsync blockers log!( @@ -189,8 +221,8 @@ impl Connector { branching_proto_components.len(), ); - // NOTE: all msgs in outbox are of form (Putter, Payload) - let mut payload_outbox: Vec<(PortId, SendPayloadMsg)> = vec![]; + // NOTE: all msgs in outbox are of form (Getter, Payload) + let mut payloads_to_get: Vec<(PortId, SendPayloadMsg)> = vec![]; // create the solution storage let mut solution_storage = { @@ -218,21 +250,25 @@ impl Connector { let mut predicate = Predicate::default(); // assign trues for &port in to_get.iter().chain(to_put.keys()) { - predicate.assigned.insert(port, true); + let var = self.port_info.firing_var_for(port); + predicate.assigned.insert(var, true); } // assign falses for &port in self.native_ports.iter() { - predicate.assigned.entry(port).or_insert(false); + let var = self.port_info.firing_var_for(port); + predicate.assigned.entry(var).or_insert(false); } predicate }; log!(logger, "Native branch {} has pred {:?}", index, &predicate); // put all messages - for (port, payload) in to_put { - let msg = SendPayloadMsg { payload_predicate: predicate.clone(), payload }; + for (putter, payload) in to_put { + let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; log!(logger, "Native branch {} sending msg {:?}", index, &msg); - payload_outbox.push((port, msg)); + // rely on invariant: sync batches respect port polarity + let getter = *self.port_info.peers.get(&putter).unwrap(); + payloads_to_get.push((getter, msg)); } if to_get.is_empty() { log!(logger, "Native submitting trivial solution for index {}", index); @@ -276,6 +312,7 @@ impl Connector { let mut ctx = SyncProtoContext { logger, predicate: &predicate, + port_info: &self.port_info, proto_component_id: *proto_component_id, inbox: &branch.inbox, }; @@ -296,7 +333,8 @@ impl Connector { B::SyncBlockEnd => { // make concrete all variables for &port in proto_component.ports.iter() { - predicate.assigned.entry(port).or_insert(false); + let var = self.port_info.firing_var_for(port); + predicate.assigned.entry(var).or_insert(false); } // submit solution for this component solution_storage.submit_and_digest_subtree_solution( @@ -311,14 +349,15 @@ impl Connector { } B::CouldntReadMsg(port) => { // move to "blocked" - assert!(predicate.query(port).is_none()); + let var = self.port_info.firing_var_for(port); + assert!(predicate.query(var).is_none()); assert!(!branch.inbox.contains_key(&port)); blocked.insert(predicate, branch); } B::CouldntCheckFiring(port) => { // sanity check - assert!(predicate.query(port).is_none()); let var = self.port_info.firing_var_for(port); + assert!(predicate.query(var).is_none()); // keep forks in "unblocked" unblocked_to.insert( predicate.clone().inserted(var, false), @@ -326,24 +365,28 @@ impl Connector { ); unblocked_to.insert(predicate.inserted(var, true), branch); } - B::PutMsg(port, payload) => { + B::PutMsg(putter, payload) => { // sanity check - assert_eq!(Some(&Putter), self.port_info.polarities.get(&port)); + assert_eq!( + Some(&Putter), + self.port_info.polarities.get(&putter) + ); // overwrite assignment - let var = self.port_info.firing_var_for(port); + let var = self.port_info.firing_var_for(putter); let was = predicate.assigned.insert(var, true); if was == Some(false) { - log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, port, var); + log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); // discard forever drop((predicate, branch)); } else { // keep in "unblocked" - log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, port, var); - payload_outbox.push(( - port, + let getter = *self.port_info.peers.get(&putter).unwrap(); + log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); + payloads_to_get.push(( + getter, SendPayloadMsg { - payload_predicate: predicate.clone(), + predicate: predicate.clone(), payload, }, )); @@ -358,19 +401,54 @@ impl Connector { log!(logger, "All proto components are blocked"); log!(logger, "Entering decision loop..."); + endpoint_manager.undelay_all(); let decision = 'undecided: loop { - // check if we already have a solution + // drain payloads_to_get, sending them through endpoints / feeding them to components + while let Some((getter, send_payload_msg)) = payloads_to_get.pop() { + assert!(self.port_info.polarities.get(&getter).copied() == Some(Getter)); + match self.port_info.routes.get(&getter).unwrap() { + Route::Endpoint { index } => { + let msg = Msg::CommMsg(CommMsg { + round_index: *round_index, + contents: CommMsgContents::SendPayload(send_payload_msg), + }); + endpoint_manager.send_to(*index, &msg).unwrap(); + } + Route::LocalComponent(LocalComponentId::Native) => branching_native + .feed_msg( + logger, + &self.port_info, + &mut solution_storage, + getter, + send_payload_msg, + ), + Route::LocalComponent(LocalComponentId::Proto(proto_component_id)) => { + if let Some(branching_component) = + branching_proto_components.get_mut(&proto_component_id) + { + branching_component.feed_msg( + logger, + &self.port_info, + &mut solution_storage, + getter, + send_payload_msg, + ) + } + } + } + } + + // check if we have a solution yet log!(logger, "Check if we have any local decisions..."); for solution in solution_storage.iter_new_local_make_old() { log!(logger, "New local decision with solution {:?}...", &solution); match neighborhood.parent { Some(parent) => { log!(logger, "Forwarding to my parent {:?}", parent); + let suggestion = Decision::Success(solution); let msg = Msg::CommMsg(CommMsg { round_index: *round_index, - contents: CommMsgContents::Elaborate { - partial_oracle: solution, - }, + contents: CommMsgContents::Suggest { suggestion }, }); endpoint_manager.send_to(parent, &msg).unwrap(); } @@ -381,7 +459,109 @@ impl Connector { } } - // TODO send / recv messages + // stuck! make progress by receiving a msg + // try recv messages arriving through endpoints + log!(logger, "No decision yet. Let's recv an endpoint msg..."); + { + let (endpoint_index, msg) = + endpoint_manager.try_recv_any(deadline).unwrap(); + log!(logger, "Received from endpoint {} msg {:?}", endpoint_index, &msg); + let comm_msg_contents = match msg { + Msg::SetupMsg(..) => { + log!(logger, "Discarding setup message; that phase is over"); + continue 'undecided; + } + Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(round_index) { + Ordering::Equal => comm_msg.contents, + Ordering::Less => { + log!( + logger, + "We are in round {}, but msg is for round {}. Discard", + comm_msg.round_index, + round_index, + ); + drop(comm_msg); + continue 'undecided; + } + Ordering::Greater => { + log!( + logger, + "We are in round {}, but msg is for round {}. Buffer", + comm_msg.round_index, + round_index, + ); + endpoint_manager + .delayed_messages + .push((endpoint_index, Msg::CommMsg(comm_msg))); + continue 'undecided; + } + }, + }; + match comm_msg_contents { + CommMsgContents::SendPayload(send_payload_msg) => { + let getter = endpoint_manager.endpoint_exts[endpoint_index] + .getter_for_incoming; + assert!(self.port_info.polarities.get(&getter) == Some(&Getter)); + log!( + logger, + "Msg routed to getter port {:?}. Buffer for recv loop", + getter, + ); + payloads_to_get.push((getter, send_payload_msg)); + } + CommMsgContents::Suggest { suggestion } => { + // only accept this control msg through a child endpoint + if neighborhood.children.binary_search(&endpoint_index).is_ok() { + match suggestion { + Decision::Success(predicate) => { + // child solution contributes to local solution + log!( + logger, + "Child provided solution {:?}", + &predicate + ); + let route = Route::Endpoint { index: endpoint_index }; + solution_storage.submit_and_digest_subtree_solution( + logger, route, predicate, + ); + } + Decision::Failure => match neighborhood.parent { + None => { + log!( + logger, + "As sink, I decide on my child's failure" + ); + // I am the sink. Decide on failed + break 'undecided Decision::Failure; + } + Some(parent) => { + log!(logger, "Forwarding failure through my parent endpoint {:?}", parent); + // I've got a parent. Forward the failure suggestion. + let msg = Msg::CommMsg(CommMsg { + round_index: *round_index, + contents: CommMsgContents::Suggest { + suggestion, + }, + }); + endpoint_manager.send_to(parent, &msg).unwrap(); + } + }, + } + } else { + log!(logger, "Discarding suggestion {:?} from non-child endpoint idx {:?}", &suggestion, endpoint_index); + } + } + CommMsgContents::Announce { decision } => { + if Some(endpoint_index) == neighborhood.parent { + // adopt this decision + break 'undecided decision; + } else { + log!(logger, "Discarding announcement {:?} from non-parent endpoint idx {:?}", &decision, endpoint_index); + } + } + } + } + log!(logger, "Endpoint msg recv done"); }; log!(logger, "Committing to decision {:?}!", &decision); @@ -426,6 +606,72 @@ impl Connector { } } impl BranchingNative { + fn feed_msg( + &mut self, + logger: &mut dyn Logger, + port_info: &PortInfo, + solution_storage: &mut SolutionStorage, + getter: PortId, + send_payload_msg: SendPayloadMsg, + ) { + assert!(port_info.polarities.get(&getter).copied() == Some(Getter)); + println!("BEFORE {:#?}", &self.branches); + let mut draining = HashMap::default(); + let finished = &mut self.branches; + std::mem::swap(&mut draining, finished); + for (predicate, mut branch) in draining.drain() { + // check if this branch expects to receive it + let var = port_info.firing_var_for(getter); + let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| { + let was = branch.gotten.insert(getter, send_payload_msg.payload.clone()); + assert!(was.is_none()); + branch.to_get.remove(&getter); + if branch.to_get.is_empty() { + let route = Route::LocalComponent(LocalComponentId::Native); + solution_storage.submit_and_digest_subtree_solution( + logger, + route, + predicate.clone(), + ); + } + }; + if predicate.query(var) != Some(true) { + // optimization. Don't bother trying this branch + finished.insert(predicate, branch); + continue; + } + use CommonSatResult as Csr; + match predicate.common_satisfier(&send_payload_msg.predicate) { + Csr::Equivalent | Csr::FormerNotLatter => { + // retain the existing predicate, but add this payload + feed_branch(&mut branch, &predicate); + finished.insert(predicate, branch); + } + Csr::Nonexistant => { + // this branch does not receive the message + finished.insert(predicate, branch); + } + Csr::LatterNotFormer => { + // fork branch, give fork the message and payload predicate + let mut branch2 = branch.clone(); + // original branch untouched + finished.insert(predicate, branch); + let predicate2 = send_payload_msg.predicate.clone(); + feed_branch(&mut branch2, &predicate2); + finished.insert(predicate2, branch2); + } + Csr::New(new_predicate) => { + // fork branch, give fork the message and the new predicate + let mut branch2 = branch.clone(); + // original branch untouched + finished.insert(predicate, branch); + feed_branch(&mut branch2, &new_predicate); + finished.insert(new_predicate, branch2); + } + } + } + println!("AFTER {:#?}", &self.branches); + } fn collapse_with(self, solution_predicate: &Predicate) -> (usize, HashMap) { for (branch_predicate, branch) in self.branches { if branch_predicate.satisfies(solution_predicate) { @@ -437,6 +683,16 @@ impl BranchingNative { } } impl BranchingProtoComponent { + fn feed_msg( + &mut self, + _logger: &mut dyn Logger, + _port_info: &PortInfo, + _solution_storage: &mut SolutionStorage, + _getter: PortId, + _send_payload_msg: SendPayloadMsg, + ) { + todo!() + } fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { let BranchingProtoComponent { ports, branches } = self; for (branch_predicate, branch) in branches {