diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 8c71e3c3b8e83d32f2d7fc35058766d520740164..b4991f68b89f9b470feb528b563867ed5c1436b1 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -19,15 +19,24 @@ struct SolutionStorage { subtree_solutions: Vec>, subtree_id_to_index: HashMap, } +#[derive(Debug)] struct BranchingProtoComponent { ports: HashSet, branches: HashMap, } -#[derive(Clone)] +#[derive(Debug, Clone)] struct ProtoComponentBranch { inbox: HashMap, state: ComponentState, } +struct CyclicDrainer<'a, K: Eq + Hash, V> { + input: &'a mut HashMap, + inner: CyclicDrainInner<'a, K, V>, +} +struct CyclicDrainInner<'a, K: Eq + Hash, V> { + swap: &'a mut HashMap, + output: &'a mut HashMap, +} //////////////// impl NonsyncProtoContext<'_> { @@ -297,110 +306,29 @@ impl Connector { "Running all {} proto components to their sync blocker...", branching_proto_components.len() ); - for (proto_component_id, proto_component) in branching_proto_components.iter_mut() { - // run this component to sync blocker in-place - log!( + for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() + { + let Self { port_info, proto_description, .. } = self; + let BranchingProtoComponent { ports, branches } = proto_component; + let mut swap = HashMap::default(); + let mut blocked = HashMap::default(); + // drain from branches --> blocked + let cd = CyclicDrainer::new(branches, &mut swap, &mut blocked); + BranchingProtoComponent::drain_branches_to_blocked( + cd, logger, - "Running proto component with id {:?} to blocker...", - proto_component_id + port_info, + proto_description, + &mut solution_storage, + |putter, m| { + let getter = *port_info.peers.get(&putter).unwrap(); + payloads_to_get.push((getter, m)); + }, + proto_component_id, + ports, ); - let blocked = &mut proto_component.branches; - let [unblocked_from, unblocked_to] = [ - &mut HashMap::::default(), - &mut Default::default(), - ]; - // DRAIN-AND-POPULATE PATTERN: DRAINING unblocked into blocked while POPULATING unblocked - std::mem::swap(unblocked_from, blocked); - while !unblocked_from.is_empty() { - for (mut predicate, mut branch) in unblocked_from.drain() { - let mut ctx = SyncProtoContext { - logger, - predicate: &predicate, - port_info: &self.port_info, - proto_component_id: *proto_component_id, - inbox: &branch.inbox, - }; - let blocker = branch.state.sync_run(&mut ctx, &self.proto_description); - log!( - logger, - "Proto component with id {:?} branch with pred {:?} hit blocker {:?}", - proto_component_id, - &predicate, - &blocker, - ); - use SyncBlocker as B; - match blocker { - B::Inconsistent => { - // branch is inconsistent. throw it away - drop((predicate, branch)); - } - B::SyncBlockEnd => { - // make concrete all variables - for &port in proto_component.ports.iter() { - let var = self.port_info.firing_var_for(port); - predicate.assigned.entry(var).or_insert(false); - } - // submit solution for this component - solution_storage.submit_and_digest_subtree_solution( - logger, - Route::LocalComponent(LocalComponentId::Proto( - *proto_component_id, - )), - predicate.clone(), - ); - // move to "blocked" - blocked.insert(predicate, branch); - } - B::CouldntReadMsg(port) => { - // move to "blocked" - let var = self.port_info.firing_var_for(port); - assert!(predicate.query(var).is_none()); - assert!(!branch.inbox.contains_key(&port)); - blocked.insert(predicate, branch); - } - B::CouldntCheckFiring(port) => { - // sanity check - let var = self.port_info.firing_var_for(port); - assert!(predicate.query(var).is_none()); - // keep forks in "unblocked" - unblocked_to.insert( - predicate.clone().inserted(var, false), - branch.clone(), - ); - unblocked_to.insert(predicate.inserted(var, true), branch); - } - B::PutMsg(putter, payload) => { - // sanity check - assert_eq!( - Some(&Putter), - self.port_info.polarities.get(&putter) - ); - // overwrite assignment - let var = self.port_info.firing_var_for(putter); - - let was = predicate.assigned.insert(var, true); - if was == Some(false) { - log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); - // discard forever - drop((predicate, branch)); - } else { - // keep in "unblocked" - let getter = *self.port_info.peers.get(&putter).unwrap(); - log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); - payloads_to_get.push(( - getter, - SendPayloadMsg { - predicate: predicate.clone(), - payload, - }, - )); - unblocked_to.insert(predicate, branch); - } - } - } - } - std::mem::swap(unblocked_from, unblocked_to); - } + // swap the blocked branches back + std::mem::swap(&mut blocked, branches); } log!(logger, "All proto components are blocked"); @@ -428,12 +356,20 @@ impl Connector { ), Route::LocalComponent(LocalComponentId::Proto(proto_component_id)) => { if let Some(branching_component) = - branching_proto_components.get_mut(&proto_component_id) + branching_proto_components.get_mut(proto_component_id) { + let proto_component_id = *proto_component_id; + let Self { port_info, proto_description, .. } = self; branching_component.feed_msg( logger, - &self.port_info, + port_info, + proto_description, &mut solution_storage, + proto_component_id, + |putter, m| { + let getter = *port_info.peers.get(&putter).unwrap(); + payloads_to_get.push((getter, m)); + }, getter, send_payload_msg, ) @@ -624,6 +560,7 @@ impl BranchingNative { let finished = &mut self.branches; std::mem::swap(&mut draining, finished); for (predicate, mut branch) in draining.drain() { + log!(logger, "visiting native branch {:?} with {:?}", &branch, &predicate); // check if this branch expects to receive it let var = port_info.firing_var_for(getter); let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| { @@ -705,16 +642,185 @@ impl BranchingNative { panic!("Native had no branches matching pred {:?}", solution_predicate); } } + +impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { + fn new( + input: &'a mut HashMap, + swap: &'a mut HashMap, + output: &'a mut HashMap, + ) -> Self { + Self { input, inner: CyclicDrainInner { swap, output } } + } + fn cylic_drain(self, mut func: impl FnMut(K, V, CyclicDrainInner<'_, K, V>)) { + let Self { input, inner: CyclicDrainInner { swap, output } } = self; + // assert!(swap.is_empty()); + while !input.is_empty() { + for (k, v) in input.drain() { + func(k, v, CyclicDrainInner { swap, output }) + } + } + } +} +impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> { + fn add_input(&mut self, k: K, v: V) { + self.swap.insert(k, v); + } + fn add_output(&mut self, k: K, v: V) { + self.output.insert(k, v); + } +} + +impl ProtoComponentBranch { + fn feed_msg(&mut self, getter: PortId, payload: Payload) { + let was = self.inbox.insert(getter, payload); + assert!(was.is_none()) + } +} impl BranchingProtoComponent { + fn drain_branches_to_blocked( + cd: CyclicDrainer, + // + logger: &mut dyn Logger, + port_info: &PortInfo, + proto_description: &ProtocolDescription, + solution_storage: &mut SolutionStorage, + mut outbox_unqueue: impl FnMut(PortId, SendPayloadMsg), + proto_component_id: ProtoComponentId, + ports: &HashSet, + ) { + cd.cylic_drain(|mut predicate, mut branch, mut drainer| { + let mut ctx = SyncProtoContext { + logger, + predicate: &predicate, + port_info, + proto_component_id, + inbox: &branch.inbox, + }; + let blocker = branch.state.sync_run(&mut ctx, proto_description); + log!( + logger, + "Proto component with id {:?} branch with pred {:?} hit blocker {:?}", + proto_component_id, + &predicate, + &blocker, + ); + use SyncBlocker as B; + match blocker { + B::Inconsistent => { + // branch is inconsistent. throw it away + drop((predicate, branch)); + } + B::SyncBlockEnd => { + // make concrete all variables + for &port in ports.iter() { + let var = port_info.firing_var_for(port); + predicate.assigned.entry(var).or_insert(false); + } + // submit solution for this component + solution_storage.submit_and_digest_subtree_solution( + logger, + Route::LocalComponent(LocalComponentId::Proto(proto_component_id)), + predicate.clone(), + ); + // move to "blocked" + drainer.add_output(predicate, branch); + } + B::CouldntReadMsg(port) => { + // move to "blocked" + assert!(!branch.inbox.contains_key(&port)); + drainer.add_output(predicate, branch); + } + B::CouldntCheckFiring(port) => { + // sanity check + let var = port_info.firing_var_for(port); + assert!(predicate.query(var).is_none()); + // keep forks in "unblocked" + drainer.add_input(predicate.clone().inserted(var, false), branch.clone()); + drainer.add_input(predicate.inserted(var, true), branch); + } + B::PutMsg(putter, payload) => { + // sanity check + assert_eq!(Some(&Putter), port_info.polarities.get(&putter)); + // overwrite assignment + let var = port_info.firing_var_for(putter); + + let was = predicate.assigned.insert(var, true); + if was == Some(false) { + log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); + // discard forever + drop((predicate, branch)); + } else { + // keep in "unblocked" + log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); + outbox_unqueue( + putter, + SendPayloadMsg { predicate: predicate.clone(), payload }, + ); + drainer.add_input(predicate, branch); + } + } + } + }); + } fn feed_msg( &mut self, - _logger: &mut dyn Logger, - _port_info: &PortInfo, - _solution_storage: &mut SolutionStorage, - _getter: PortId, - _send_payload_msg: SendPayloadMsg, + logger: &mut dyn Logger, + port_info: &PortInfo, + proto_description: &ProtocolDescription, + solution_storage: &mut SolutionStorage, + proto_component_id: ProtoComponentId, + outbox_unqueue: impl FnMut(PortId, SendPayloadMsg), + getter: PortId, + send_payload_msg: SendPayloadMsg, ) { - todo!() + let BranchingProtoComponent { branches, ports } = self; + let mut unblocked = HashMap::default(); + let mut blocked = HashMap::default(); + // partition drain from branches -> {unblocked, blocked} + for (predicate, mut branch) in branches.drain() { + use CommonSatResult as Csr; + match predicate.common_satisfier(&send_payload_msg.predicate) { + Csr::Nonexistant => { + // this branch does not receive the message + blocked.insert(predicate, branch); + } + Csr::Equivalent | Csr::FormerNotLatter => { + // retain the existing predicate, but add this payload + branch.feed_msg(getter, send_payload_msg.payload.clone()); + unblocked.insert(predicate, branch); + } + Csr::LatterNotFormer => { + // fork branch, give fork the message and payload predicate. original branch untouched + let mut branch2 = branch.clone(); + let predicate2 = send_payload_msg.predicate.clone(); + branch2.feed_msg(getter, send_payload_msg.payload.clone()); + blocked.insert(predicate, branch); + unblocked.insert(predicate2, branch2); + } + Csr::New(predicate2) => { + // fork branch, give fork the message and the new predicate. original branch untouched + let mut branch2 = branch.clone(); + branch2.feed_msg(getter, send_payload_msg.payload.clone()); + blocked.insert(predicate, branch); + unblocked.insert(predicate2, branch2); + } + } + } + // drain from unblocked --> blocked + let mut swap = HashMap::default(); + let cd = CyclicDrainer::new(&mut unblocked, &mut swap, &mut blocked); + BranchingProtoComponent::drain_branches_to_blocked( + cd, + logger, + port_info, + proto_description, + solution_storage, + outbox_unqueue, + proto_component_id, + ports, + ); + // swap the blocked branches back + std::mem::swap(&mut blocked, branches); } fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { let BranchingProtoComponent { ports, branches } = self; @@ -836,583 +942,3 @@ impl SolutionStorage { } } } - -// impl ControllerEphemeral { -// fn is_clear(&self) -> bool { -// self.solution_storage.is_clear() -// && self.poly_n.is_none() -// && self.poly_ps.is_empty() -// && self.mono_ps.is_empty() -// && self.port_to_holder.is_empty() -// } -// fn clear(&mut self) { -// self.solution_storage.clear(); -// self.poly_n.take(); -// self.poly_ps.clear(); -// self.port_to_holder.clear(); -// } -// } -// impl Into for MonoP { -// fn into(self) -> PolyP { -// PolyP { -// complete: Default::default(), -// incomplete: hashmap! { -// Predicate::new_trivial() => -// BranchP { -// state: self.state, -// inbox: Default::default(), -// outbox: Default::default(), -// blocking_on: None, -// } -// }, -// ports: self.ports, -// } -// } -// } - -// impl From for SyncError { -// fn from(e: EndpointError) -> SyncError { -// SyncError::EndpointError(e) -// } -// } - -// impl ProtoSyncContext<'_> { -// fn new_component(&mut self, moved_ports: HashSet, init_state: Self::S) { -// todo!() -// } -// fn new_channel(&mut self) -> [PortId; 2] { -// todo!() -// } -// } - -// impl PolyContext for BranchPContext<'_, '_> { -// type D = ProtocolD; - -// fn is_firing(&mut self, port: PortId) -> Option { -// assert!(self.ports.contains(&port)); -// let channel_id = self.m_ctx.endpoint_exts.get(port).unwrap().info.channel_id; -// let val = self.predicate.query(channel_id); -// log!( -// &mut self.m_ctx.logger, -// "!! PolyContext callback to is_firing by {:?}! returning {:?}", -// self.m_ctx.my_subtree_id, -// val, -// ); -// val -// } -// fn read_msg(&mut self, port: PortId) -> Option<&Payload> { -// assert!(self.ports.contains(&port)); -// let val = self.inbox.get(&port); -// log!( -// &mut self.m_ctx.logger, -// "!! PolyContext callback to read_msg by {:?}! returning {:?}", -// self.m_ctx.my_subtree_id, -// val, -// ); -// val -// } -// } - -////////////// - -// impl Connector { -// fn end_round_with_decision(&mut self, decision: Decision) -> Result { -// log!(&mut self.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); -// let ret = match &decision { -// Decision::Success(predicate) => { -// // overwrite MonoN/P -// self.mono_n = { -// let poly_n = self.ephemeral.poly_n.take().unwrap(); -// poly_n.choose_mono(predicate).unwrap_or_else(|| { -// panic!( -// "Ending round with decision pred {:#?} but poly_n has branches {:#?}. My log is... {}", -// &predicate, &poly_n.branches, &self.logger -// ); -// }) -// }; -// self.mono_ps.clear(); -// self.mono_ps.extend( -// self.ephemeral -// .poly_ps -// .drain(..) -// .map(|poly_p| poly_p.choose_mono(predicate).unwrap()), -// ); -// Ok(()) -// } -// Decision::Failure => Err(SyncError::Timeout), -// }; -// let announcement = CommMsgContents::Announce { decision }.into_msg(self.round_index); -// for &child_port in self.family.children_ports.iter() { -// log!( -// &mut self.logger, -// "Forwarding {:?} to child with port {:?}", -// &announcement, -// child_port -// ); -// self.endpoint_exts -// .get_mut(child_port) -// .expect("eefef") -// .endpoint -// .send(announcement.clone())?; -// } -// self.round_index += 1; -// self.ephemeral.clear(); -// ret -// } - -// // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found -// fn handle_locals_maybe_decide(&mut self) -> Result { -// if let Some(parent_port) = self.family.parent_port { -// // I have a parent -> I'm not the leader -// let parent_endpoint = -// &mut self.endpoint_exts.get_mut(parent_port).expect("huu").endpoint; -// for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() { -// let msg = CommMsgContents::Elaborate { partial_oracle }.into_msg(self.round_index); -// log!(&mut self.logger, "Sending {:?} to parent {:?}", &msg, parent_port); -// parent_endpoint.send(msg)?; -// } -// Ok(false) -// } else { -// // I have no parent -> I'm the leader -// assert!(self.family.parent_port.is_none()); -// let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next(); -// Ok(if let Some(predicate) = maybe_predicate { -// let decision = Decision::Success(predicate); -// log!(&mut self.logger, "DECIDE ON {:?} AS LEADER!", &decision); -// self.end_round_with_decision(decision)?; -// true -// } else { -// false -// }) -// } -// } - -// fn kick_off_native( -// &mut self, -// sync_batches: impl Iterator, -// ) -> Result { -// let MonoN { ports, .. } = self.mono_n.clone(); -// let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self; -// let mut branches = HashMap::<_, _>::default(); -// for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() { -// let port_to_channel_id = |port| endpoint_exts.get(port).unwrap().info.channel_id; -// let all_ports = ports.iter().copied(); -// let all_channel_ids = all_ports.map(port_to_channel_id); - -// let mut predicate = Predicate::new_trivial(); - -// // assign TRUE for puts and gets -// let true_ports = puts.keys().chain(gets.iter()).copied(); -// let true_channel_ids = true_ports.clone().map(port_to_channel_id); -// predicate.batch_assign_nones(true_channel_ids, true); - -// // assign FALSE for all in interface not assigned true -// predicate.batch_assign_nones(all_channel_ids.clone(), false); - -// if branches.contains_key(&predicate) { -// // TODO what do I do with redundant predicates? -// unimplemented!( -// "Duplicate predicate {:#?}!\nHaving multiple batches with the same -// predicate requires the support of oracle boolean variables", -// &predicate, -// ) -// } -// let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index }; -// for (port, payload) in puts { -// log!( -// &mut self.logger, -// "... ... Initial native put msg {:?} pred {:?} batch {:?}", -// &payload, -// &predicate, -// sync_batch_index, -// ); -// let msg = -// CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload } -// .into_msg(*round_index); -// endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?; -// } -// log!( -// &mut self.logger, -// "... Initial native branch batch index={} with pred {:?}", -// sync_batch_index, -// &predicate -// ); -// if branch.to_get.is_empty() { -// self.ephemeral.solution_storage.submit_and_digest_subtree_solution( -// &mut self.logger, -// Route::PolyN, -// predicate.clone(), -// ); -// } -// branches.insert(predicate, branch); -// } -// Ok(PolyN { ports, branches }) -// } -// pub fn sync_round( -// &mut self, -// deadline: Option, -// sync_batches: Option>, -// ) -> Result<(), SyncError> { -// if let Some(e) = self.unrecoverable_error { -// return Err(e.clone()); -// } -// self.sync_round_inner(deadline, sync_batches).map_err(move |e| match e { -// SyncError::Timeout => e, // this isn't unrecoverable -// _ => { -// // Must set unrecoverable error! and tear down our net channels -// self.unrecoverable_error = Some(e); -// self.ephemeral.clear(); -// self.endpoint_exts = Default::default(); -// e -// } -// }) -// } - -// // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent. -// // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches. -// fn sync_round_inner( -// &mut self, -// mut deadline: Option, -// sync_batches: Option>, -// ) -> Result<(), SyncError> { -// log!(&mut self.logger, "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~", self.round_index); -// assert!(self.ephemeral.is_clear()); -// assert!(self.unrecoverable_error.is_none()); - -// // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`). -// // Some actors are dropped. some new actors are created. -// // Ultimately, we have 0 Mono actors and a list of unnamed sync_actors -// self.ephemeral.mono_ps.extend(self.mono_ps.iter().cloned()); -// log!(&mut self.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len()); -// while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() { -// let mut m_ctx = ProtoSyncContext { -// ports: &mut mono_p.ports, -// mono_ps: &mut self.ephemeral.mono_ps, -// inner: &mut self, -// }; -// // cross boundary into crate::protocol -// let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description); -// log!(&mut self.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker); -// match blocker { -// NonsyncBlocker::Inconsistent => return Err(SyncError::Inconsistent), -// NonsyncBlocker::ComponentExit => drop(mono_p), -// NonsyncBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()), -// } -// } -// log!( -// &mut self.logger, -// "Finished running all MonoPs! Have {} PolyPs waiting", -// self.ephemeral.poly_ps.len() -// ); - -// // 3. define the mapping from port -> actor -// // this is needed during the event loop to determine which actor -// // should receive the incoming message. -// // TODO: store and update this mapping rather than rebuilding it each round. -// let port_to_holder: HashMap = { -// use PolyId::*; -// let n = self.mono_n.ports.iter().map(move |&e| (e, N)); -// let p = self -// .ephemeral -// .poly_ps -// .iter() -// .enumerate() -// .flat_map(|(index, m)| m.ports.iter().map(move |&e| (e, P { index }))); -// n.chain(p).collect() -// }; -// log!( -// &mut self.logger, -// "SET OF PolyPs and MonoPs final! port lookup map is {:?}", -// &port_to_holder -// ); - -// // 4. Create the solution storage. it tracks the solutions of "subtrees" -// // of the controller in the overlay tree. -// self.ephemeral.solution_storage.reset({ -// let n = std::iter::once(Route::PolyN); -// let m = (0..self.ephemeral.poly_ps.len()).map(|index| Route::PolyP { index }); -// let c = self.family.children_ports.iter().map(|&port| Route::ChildController { port }); -// let subtree_id_iter = n.chain(m).chain(c); -// log!( -// &mut self.logger, -// "Solution Storage has subtree Ids: {:?}", -// &subtree_id_iter.clone().collect::>() -// ); -// subtree_id_iter -// }); - -// // 5. kick off the synchronous round of the native actor if it exists - -// log!(&mut self.logger, "Kicking off native's synchronous round..."); -// self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches { -// // using if let because of nested ? operator -// // TODO check that there are 1+ branches or NO SOLUTION -// let poly_n = self.kick_off_native(sync_batches)?; -// log!( -// &mut self.logger, -// "PolyN kicked off, and has branches with predicates... {:?}", -// poly_n.branches.keys().collect::>() -// ); -// Some(poly_n) -// } else { -// log!(&mut self.logger, "NO NATIVE COMPONENT"); -// None -// }; - -// // 6. Kick off the synchronous round of each protocol actor -// // If just one actor becomes inconsistent now, there can be no solution! -// // TODO distinguish between completed and not completed poly_p's? -// log!(&mut self.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len()); -// for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() { -// let my_subtree_id = Route::PolyP { index }; -// let m_ctx = PolyPContext { -// my_subtree_id, -// inner: &mut self, -// solution_storage: &mut self.ephemeral.solution_storage, -// }; -// use SyncRunResult as Srr; -// let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?; -// log!(&mut self.logger, "... PolyP's poly_run got blocker {:?}", &blocker); -// match blocker { -// Srr::NoBranches => return Err(SyncError::Inconsistent), -// Srr::AllBranchesComplete | Srr::BlockingForRecv => (), -// } -// } -// log!(&mut self.logger, "All Poly machines have been kicked off!"); - -// // 7. `solution_storage` may have new solutions for this controller -// // handle their discovery. LEADER => announce, otherwise => send to parent -// { -// let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::>(); -// log!( -// &mut self.logger, -// "Got {} controller-local solutions before a single RECV: {:?}", -// peeked.len(), -// peeked -// ); -// } -// if self.handle_locals_maybe_decide()? { -// return Ok(()); -// } - -// // 4. Receive incoming messages until the DECISION is made OR some unrecoverable error -// log!(&mut self.logger, "`No decision yet`. Time to recv messages"); -// self.undelay_all(); -// 'recv_loop: loop { -// log!(&mut self.logger, "`POLLING` with deadline {:?}...", deadline); -// let received = match deadline { -// None => { -// // we have personally timed out. perform a "long" poll. -// self.recv(Instant::now() + Duration::from_secs(10))?.expect("DRIED UP") -// } -// Some(d) => match self.recv(d)? { -// // we have not yet timed out. performed a time-limited poll -// Some(received) => received, -// None => { -// // timed out! send a FAILURE message to the sink, -// // and henceforth don't time out on polling. -// deadline = None; -// match self.family.parent_port { -// None => { -// // I am the sink! announce failure and return. -// return self.end_round_with_decision(Decision::Failure); -// } -// Some(parent_port) => { -// // I am not the sink! send a failure message. -// let announcement = Msg::CommMsg(CommMsg { -// round_index: self.round_index, -// contents: CommMsgContents::Failure, -// }); -// log!( -// &mut self.logger, -// "Forwarding {:?} to parent with port {:?}", -// &announcement, -// parent_port -// ); -// self.endpoint_exts -// .get_mut(parent_port) -// .expect("ss") -// .endpoint -// .send(announcement.clone())?; -// continue; // poll some more -// } -// } -// } -// }, -// }; -// log!(&mut self.logger, "::: message {:?}...", &received); -// let current_content = match received.msg { -// Msg::SetupMsg(s) => { -// // This occurs in the event the connector was malformed during connect() -// println!("WASNT EXPECTING {:?}", s); -// return Err(SyncError::UnexpectedSetupMsg); -// } -// Msg::CommMsg(CommMsg { round_index, .. }) if round_index < self.round_index => { -// // Old message! Can safely discard -// log!(&mut self.logger, "...and its OLD! :("); -// drop(received); -// continue 'recv_loop; -// } -// Msg::CommMsg(CommMsg { round_index, .. }) if round_index > self.round_index => { -// // Message from a next round. Keep for later! -// log!(&mut self.logger, "... DELAY! :("); -// self.delay(received); -// continue 'recv_loop; -// } -// Msg::CommMsg(CommMsg { contents, round_index }) => { -// log!( -// &mut self.logger, -// "... its a round-appropriate CommMsg with port {:?}", -// received.recipient -// ); -// assert_eq!(round_index, self.round_index); -// contents -// } -// }; -// match current_content { -// CommMsgContents::Failure => match self.family.parent_port { -// Some(parent_port) => { -// let announcement = Msg::CommMsg(CommMsg { -// round_index: self.round_index, -// contents: CommMsgContents::Failure, -// }); -// log!( -// &mut self.logger, -// "Forwarding {:?} to parent with port {:?}", -// &announcement, -// parent_port -// ); -// self.endpoint_exts -// .get_mut(parent_port) -// .expect("ss") -// .endpoint -// .send(announcement.clone())?; -// } -// None => return self.end_round_with_decision(Decision::Failure), -// }, -// CommMsgContents::Elaborate { partial_oracle } => { -// // Child controller submitted a subtree solution. -// if !self.family.children_ports.contains(&received.recipient) { -// return Err(SyncError::ElaborateFromNonChild); -// } -// let subtree_id = Route::ChildController { port: received.recipient }; -// log!( -// &mut self.logger, -// "Received elaboration from child for subtree {:?}: {:?}", -// subtree_id, -// &partial_oracle -// ); -// self.ephemeral.solution_storage.submit_and_digest_subtree_solution( -// &mut self.logger, -// subtree_id, -// partial_oracle, -// ); -// if self.handle_locals_maybe_decide()? { -// return Ok(()); -// } -// } -// CommMsgContents::Announce { decision } => { -// if self.family.parent_port != Some(received.recipient) { -// return Err(SyncError::AnnounceFromNonParent); -// } -// log!( -// &mut self.logger, -// "Received ANNOUNCEMENT from from parent {:?}: {:?}", -// received.recipient, -// &decision -// ); -// return self.end_round_with_decision(decision); -// } -// CommMsgContents::SendPayload { payload_predicate, payload } => { -// // check that we expect to be able to receive payloads from this sender -// assert_eq!( -// Getter, -// self.endpoint_exts.get(received.recipient).unwrap().info.polarity -// ); - -// // message for some actor. Feed it to the appropriate actor -// // and then give them another chance to run. -// let subtree_id = port_to_holder.get(&received.recipient); -// log!( -// &mut self.logger, -// "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}", -// subtree_id, -// &payload_predicate, -// &payload -// ); -// let channel_id = -// self.endpoint_exts.get(received.recipient).expect("UEHFU").info.channel_id; -// if payload_predicate.query(channel_id) != Some(true) { -// // sender didn't preserve the invariant -// return Err(SyncError::PayloadPremiseExcludesTheChannel(channel_id)); -// } -// match subtree_id { -// None => { -// // this happens when a message is sent to a component that has exited. -// // It's safe to drop this message; -// // The sender branch will certainly not be part of the solution -// } -// Some(PolyId::N) => { -// // Message for NativeMachine -// self.ephemeral.poly_n.as_mut().unwrap().sync_recv( -// received.recipient, -// &mut self.logger, -// payload, -// payload_predicate, -// &mut self.ephemeral.solution_storage, -// ); -// if self.handle_locals_maybe_decide()? { -// return Ok(()); -// } -// } -// Some(PolyId::P { index }) => { -// // Message for protocol actor -// let poly_p = &mut self.ephemeral.poly_ps[*index]; - -// let m_ctx = PolyPContext { -// my_subtree_id: Route::PolyP { index: *index }, -// inner: &mut self, -// solution_storage: &mut self.ephemeral.solution_storage, -// }; -// use SyncRunResult as Srr; -// let blocker = poly_p.poly_recv_run( -// m_ctx, -// &self.protocol_description, -// received.recipient, -// payload_predicate, -// payload, -// )?; -// log!( -// &mut self.logger, -// "... Fed the msg to PolyP {:?} and ran it to blocker {:?}", -// subtree_id, -// blocker -// ); -// match blocker { -// Srr::NoBranches => return Err(SyncError::Inconsistent), -// Srr::BlockingForRecv | Srr::AllBranchesComplete => { -// { -// let peeked = self -// .ephemeral -// .solution_storage -// .peek_new_locals() -// .collect::>(); -// log!( -// &mut self.logger, -// "Got {} new controller-local solutions from RECV: {:?}", -// peeked.len(), -// peeked -// ); -// } -// if self.handle_locals_maybe_decide()? { -// return Ok(()); -// } -// } -// } -// } -// }; -// } -// } -// } -// } -// } diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 5c605e16603e8a90263735197d5beb1a6263d255..6d98eebb584cd9b32dfde5c9b2549072669cceb8 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -275,6 +275,44 @@ impl Connector { log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); [o, i] } + pub fn add_component( + &mut self, + identifier: &[u8], + ports: &[PortId], + ) -> Result<(), AddComponentError> { + // called by the USER. moves ports owned by the NATIVE + use AddComponentError::*; + // 1. check if this is OK + let polarities = self.proto_description.component_polarities(identifier)?; + if polarities.len() != ports.len() { + return Err(WrongNumberOfParamaters { expected: polarities.len() }); + } + for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) { + if !self.native_ports.contains(port) { + return Err(UnknownPort(*port)); + } + if expected_polarity != *self.port_info.polarities.get(port).unwrap() { + return Err(WrongPortPolarity { port: *port, expected_polarity }); + } + } + // 3. remove ports from old component & update port->route + let new_id = self.id_manager.new_proto_component_id(); + for port in ports.iter() { + self.port_info + .routes + .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id))); + } + self.native_ports.retain(|port| !ports.contains(port)); + // 4. add new component + self.proto_components.insert( + new_id, + ProtoComponent { + state: self.proto_description.new_main_component(identifier, ports), + ports: ports.iter().copied().collect(), + }, + ); + Ok(()) + } } impl EndpointManager { fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> { diff --git a/src/runtime/setup2.rs b/src/runtime/setup2.rs index 3721383b660b9ca11709b752e6f57a2ac8f86f41..2aa51de799e5e414ae44b4391db92f44cfc1a45d 100644 --- a/src/runtime/setup2.rs +++ b/src/runtime/setup2.rs @@ -1,13 +1,6 @@ use crate::common::*; use crate::runtime::*; -struct LogicalChannelInfo { - local_port: PortId, - peer_port: PortId, - local_polarity: Polarity, - endpoint_index: usize, -} -/////////////// impl Connector { pub fn new_simple( proto_description: Arc, @@ -58,43 +51,6 @@ impl Connector { ConnectorPhased::Communication { .. } => Err(()), } } - pub fn add_component( - &mut self, - identifier: &[u8], - ports: &[PortId], - ) -> Result<(), AddComponentError> { - // called by the USER. moves ports owned by the NATIVE - use AddComponentError::*; - // 1. check if this is OK - let polarities = self.proto_description.component_polarities(identifier)?; - if polarities.len() != ports.len() { - return Err(WrongNumberOfParamaters { expected: polarities.len() }); - } - for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) { - if !self.native_ports.contains(port) { - return Err(UnknownPort(*port)); - } - if expected_polarity != *self.port_info.polarities.get(port).unwrap() { - return Err(WrongPortPolarity { port: *port, expected_polarity }); - } - } - // 3. remove ports from old component & update port->route - let new_id = self.id_manager.new_proto_component_id(); - for port in ports.iter() { - self.port_info - .routes - .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id))); - } - // 4. add new component - self.proto_components.insert( - new_id, - ProtoComponent { - state: self.proto_description.new_main_component(identifier, ports), - ports: ports.iter().copied().collect(), - }, - ); - Ok(()) - } pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> { match &mut self.phased { ConnectorPhased::Communication { .. } => { diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index 1caa7f2477df097bd555ac26c988250c29c98d8b..38ba52506a6eb324a609614d931f189b3d92cd98 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -179,7 +179,7 @@ fn next_batch() { } #[test] -fn native_sync() { +fn native_self_msg() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let [o, i] = c.new_port_pair(); c.connect(Duration::from_secs(1)).unwrap(); @@ -189,7 +189,7 @@ fn native_sync() { } #[test] -fn native_message_pass() { +fn two_natives_msg() { let sock_addr = next_test_addr(); scope(|s| { s.spawn(|_| { @@ -210,3 +210,70 @@ fn native_message_pass() { }) .unwrap(); } + +#[test] +fn trivial_nondet() { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let [_, i] = c.new_port_pair(); + c.connect(Duration::from_secs(1)).unwrap(); + c.get(i).unwrap(); + // getting 0 batch + c.next_batch().unwrap(); + // silent 1 batch + assert_eq!(1, c.sync(Duration::from_secs(1)).unwrap()); + c.gotten(i).unwrap_err(); +} + +#[test] +fn connector_pair_nondet() { + let sock_addr = next_test_addr(); + scope(|s| { + s.spawn(|_| { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap(); + c.connect(Duration::from_secs(1)).unwrap(); + c.next_batch().unwrap(); + c.get(g).unwrap(); + assert_eq!(1, c.sync(Duration::from_secs(1)).unwrap()); + c.gotten(g).unwrap(); + }); + s.spawn(|_| { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); + let p = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap(); + c.connect(Duration::from_secs(1)).unwrap(); + c.put(p, (b"hello" as &[_]).into()).unwrap(); + c.sync(Duration::from_secs(1)).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn cannot_use_moved_ports() { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); + let [p, g] = c.new_port_pair(); + c.add_component(b"sync", &[g, p]).unwrap(); + /* + native p|-->|g sync + */ + c.connect(Duration::from_secs(1)).unwrap(); + c.put(p, (b"hello" as &[_]).into()).unwrap_err(); + c.get(g).unwrap_err(); +} + +#[test] +fn sync_sync() { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); + let [p0, g0] = c.new_port_pair(); + let [p1, g1] = c.new_port_pair(); + c.add_component(b"sync", &[g0, p1]).unwrap(); + /* + native p0|-->|g0 sync + g1|<--|p1 + */ + c.connect(Duration::from_secs(1)).unwrap(); + c.put(p0, (b"hello" as &[_]).into()).unwrap(); + c.get(g1).unwrap(); + c.sync(Duration::from_secs(1)).unwrap(); + c.gotten(g1).unwrap(); +}