diff --git a/src/common.rs b/src/common.rs index 698ac11837209e13af4a196cc69036d1c85e89e6..f64a289d9c192138cca32e3d48dabe45fab72302 100644 --- a/src/common.rs +++ b/src/common.rs @@ -31,15 +31,29 @@ pub use Polarity::*;
 pub type ControllerId = u32;
 pub type PortSuffix = u32;

-// globally unique
 #[derive(
     Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 )]
-pub struct PortId {
+pub struct Id {
     pub(crate) controller_id: ControllerId,
-    pub(crate) port_index: PortSuffix,
+    pub(crate) u32_suffix: PortSuffix,
+}
+
+#[derive(Debug, Default)]
+pub struct U32Stream {
+    next: u32,
 }
+// globally unique
+#[derive(
+    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
+)]
+pub struct PortId(Id);
+#[derive(
+    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
+)]
+pub struct ProtoComponentId(Id);
+
 #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
 pub struct Payload(Arc<Vec<u8>>);
@@ -79,6 +93,25 @@ pub enum SyncBlocker {
 }
 ///////////////////// IMPL /////////////////////
+impl U32Stream {
+    pub fn next(&mut self) -> u32 {
+        if self.next == u32::MAX {
+            panic!("NO NEXT!")
+        }
+        self.next += 1;
+        self.next - 1
+    }
+}
+impl From<Id> for PortId {
+    fn from(id: Id) -> PortId {
+        Self(id)
+    }
+}
+impl From<Id> for ProtoComponentId {
+    fn from(id: Id) -> ProtoComponentId {
+        Self(id)
+    }
+}
 impl Payload {
     pub fn new(len: usize) -> Payload {
         let mut v = Vec::with_capacity(len);
@@ -137,7 +170,12 @@ impl From<Vec<u8>> for Payload {
 }
 impl Debug for PortId {
     fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "PortId({},{})", self.controller_id, self.port_index)
+        write!(f, "PortId({},{})", self.0.controller_id, self.0.u32_suffix)
+    }
+}
+impl Debug for ProtoComponentId {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "ProtoComponentId({},{})", self.0.controller_id, self.0.u32_suffix)
     }
 }
 impl std::ops::Not for Polarity {
diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 4a901047cbc4513c18fb04d3f58a511a93ae694a..a76dd76dfb6a964fb93ca42ea626d737d198f086 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -247,7 +247,7 @@ impl EvalContext<'_> {
         match self {
             EvalContext::None => unreachable!(),
             EvalContext::Nonsync(context) => {
-                let [from, to] = context.new_channel();
+                let [from, to] = context.new_port_pair();
                 let from = Value::Output(OutputValue(from));
                 let to = Value::Input(InputValue(to));
                 return [from, to];
diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 4ddefc9f54cef6af8c49f37b6db0e5cd6cf5eb4c..3b949ca7953721d334e1fcc5b74545d9110f07d6 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -1,67 +1,153 @@
 use super::*;
 use crate::common::*;
+
+////////////////
+struct BranchingNative {
+    branches: HashMap<Predicate, NativeBranch>,
+}
+struct NativeBranch {
+    index: usize,
+    gotten: HashMap<PortId, Payload>,
+    to_get: HashSet<PortId>,
+}
+
+////////////////
 impl NonsyncProtoContext<'_> {
-    pub fn new_component(&mut self, moved_ports: HashSet<PortId>, init_state: ComponentState) {
-        let component = &mut self.connector.proto_components[self.proto_component_index];
-        assert!(component.ports.is_subset(&moved_ports));
-        // let polarities = self.proto_description.component_polarities(identifier).expect("BAD ");
-        // if polarities.len() != ports.len() {
-        //     return Err(WrongNumberOfParamaters { expected: polarities.len() });
-        // }
-        // for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) {
-        //     if !self.native_ports.contains(port) {
-        //         return 
Err(UnknownPort(*port));
-        //     }
-        //     if expected_polarity != self.check_polarity(port) {
-        //         return Err(WrongPortPolarity { port: *port, expected_polarity });
-        //     }
-        // }
-        // // ok!
-        // let state = self.proto_description.new_main_component(identifier, ports);
-        // let proto_component = ProtoComponent { ports: ports.iter().copied().collect(), state };
-        // let proto_component_index = self.proto_components.len();
-        // self.proto_components.push(proto_component);
-        // for port in ports.iter() {
-        //     if let Polarity::Getter = self.check_polarity(port) {
-        //         self.inp_to_route.insert(
-        //             *port,
-        //             InpRoute::LocalComponent(LocalComponentId::Proto {
-        //                 index: proto_component_index,
-        //             }),
-        //         );
-        //     }
-        // }
+    pub fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
+        // called by a PROTO COMPONENT. moves its own ports.
+        // 1. sanity check: this component owns all the ports it is moving
+        log!(
+            self.logger,
+            "Component {:?} added new component with state {:?}, moving ports {:?}",
+            self.proto_component_id,
+            &state,
+            &moved_ports
+        );
+        assert!(moved_ports.is_subset(&self.proto_component_ports));
+        // 2. remove ports from old component & update port->route
+        let new_id = self.id_manager.new_proto_component_id();
+        for port in moved_ports.iter() {
+            self.proto_component_ports.remove(port);
+            self.port_info
+                .routes
+                .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id)));
+        }
+        // 3. create a new component
+        self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports }));
     }
-    pub fn new_channel(&mut self) -> [PortId; 2] {
-        self.connector.add_port_pair()
+    pub fn new_port_pair(&mut self) -> [PortId; 2] {
+        // adds two new associated ports, related to each other, and exposed to the proto component
+        let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()];
+        self.proto_component_ports.insert(o);
+        self.proto_component_ports.insert(i);
+        // {polarity, peer, route} known. {} unknown.
+        self.port_info.polarities.insert(o, Putter);
+        self.port_info.polarities.insert(i, Getter);
+        self.port_info.peers.insert(o, i);
+        self.port_info.peers.insert(i, o);
+        let route = Route::LocalComponent(LocalComponentId::Proto(self.proto_component_id));
+        self.port_info.routes.insert(o, route);
+        self.port_info.routes.insert(i, route);
+        log!(
+            self.logger,
+            "Component {:?} created port pair (out->in) {:?} -> {:?}",
+            self.proto_component_id,
+            o,
+            i
+        );
+        [o, i]
     }
 }
 impl SyncProtoContext<'_> {
     pub fn is_firing(&mut self, port: PortId) -> Option<bool> {
-        todo!()
+        self.predicate.query(port)
     }
     pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
-        todo!()
+        self.inbox.get(&port)
+    }
+}
+
+impl Connector {
+    pub fn sync(&mut self) -> Result<usize, SyncError> {
+        use SyncError::*;
+        match &mut self.phased {
+            ConnectorPhased::Setup { .. } => Err(NotConnected),
+            ConnectorPhased::Communication { native_batches, .. } => {
+                // 1. 
run all proto components to Nonsync blockers + let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> = + self.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect(); + while let Some((proto_component_id, mut proto_component)) = unrun_components.pop() { + let mut ctx = NonsyncProtoContext { + logger: &mut *self.logger, + port_info: &mut self.port_info, + id_manager: &mut self.id_manager, + proto_component_id, + unrun_components: &mut unrun_components, + proto_component_ports: &mut proto_component.ports, + }; + match proto_component.state.nonsync_run(&mut ctx, &self.proto_description) { + NonsyncBlocker::ComponentExit => drop(proto_component), + NonsyncBlocker::Inconsistent => { + return Err(InconsistentProtoComponent(proto_component_id)) + } + NonsyncBlocker::SyncBlockStart => { + self.proto_components.insert(proto_component_id, proto_component); + } + } + } + + // all ports are GETTER + let mut mem_inbox: Vec<(PortId, SendPayloadMsg)> = vec![]; + + // 2. kick off the native + let mut branching_native = BranchingNative { branches: Default::default() }; + for (index, NativeBatch { to_get, to_put }) in native_batches.drain(..).enumerate() + { + let mut predicate = Predicate::default(); + // assign trues + for &port in to_get.iter().chain(to_put.keys()) { + predicate.assigned.insert(port, true); + } + // assign falses + for &port in self.native_ports.iter() { + predicate.assigned.entry(port).or_insert(false); + } + // put all messages + for (port, payload) in to_put { + let getter = *self.port_info.peers.get(&port).unwrap(); + mem_inbox.push(( + getter, + SendPayloadMsg { payload_predicate: predicate.clone(), payload }, + )); + } + let branch = NativeBranch { index, gotten: Default::default(), to_get }; + if let Some(existing) = branching_native.branches.insert(predicate, branch) { + return Err(IndistinguishableBatches([index, existing.index])); + } + } + todo!() + } + } } } // impl Connector { // fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncError> { -// log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); +// log!(&mut self.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); // let ret = match &decision { // Decision::Success(predicate) => { // // overwrite MonoN/P -// self.inner.mono_n = { +// self.mono_n = { // let poly_n = self.ephemeral.poly_n.take().unwrap(); // poly_n.choose_mono(predicate).unwrap_or_else(|| { // panic!( // "Ending round with decision pred {:#?} but poly_n has branches {:#?}. My log is... {}", -// &predicate, &poly_n.branches, &self.inner.logger +// &predicate, &poly_n.branches, &self.logger // ); // }) // }; -// self.inner.mono_ps.clear(); -// self.inner.mono_ps.extend( +// self.mono_ps.clear(); +// self.mono_ps.extend( // self.ephemeral // .poly_ps // .drain(..) 
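
// NOTE (sketch, editor's annotation -- not part of the patch above): each
// native batch is keyed in `branching_native.branches` by the Predicate built
// from it: true for every port the batch uses, false for every other native
// port. Two batches that use exactly the same port set therefore build the
// same key and collide, regardless of payload. With hypothetical native ports
// {a, b}:
//
//     batch 0: put on a  => {a: true,  b: false}
//     batch 1: get on b  => {a: false, b: true }
//     batch 2: put on a  => {a: true,  b: false}   // same key as batch 0
//
// the insert for batch 2 returns `Some(existing)` with `existing.index == 0`,
// and sync() fails with IndistinguishableBatches([2, 0]).
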
@@ -71,46 +157,46 @@ impl SyncProtoContext<'_> { // } // Decision::Failure => Err(SyncError::Timeout), // }; -// let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index); -// for &child_port in self.inner.family.children_ports.iter() { +// let announcement = CommMsgContents::Announce { decision }.into_msg(self.round_index); +// for &child_port in self.family.children_ports.iter() { // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Forwarding {:?} to child with port {:?}", // &announcement, // child_port // ); -// self.inner +// self // .endpoint_exts // .get_mut(child_port) // .expect("eefef") // .endpoint // .send(announcement.clone())?; // } -// self.inner.round_index += 1; +// self.round_index += 1; // self.ephemeral.clear(); // ret // } // // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found // fn handle_locals_maybe_decide(&mut self) -> Result { -// if let Some(parent_port) = self.inner.family.parent_port { +// if let Some(parent_port) = self.family.parent_port { // // I have a parent -> I'm not the leader // let parent_endpoint = -// &mut self.inner.endpoint_exts.get_mut(parent_port).expect("huu").endpoint; +// &mut self.endpoint_exts.get_mut(parent_port).expect("huu").endpoint; // for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() { // let msg = -// CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index); -// log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_port); +// CommMsgContents::Elaborate { partial_oracle }.into_msg(self.round_index); +// log!(&mut self.logger, "Sending {:?} to parent {:?}", &msg, parent_port); // parent_endpoint.send(msg)?; // } // Ok(false) // } else { // // I have no parent -> I'm the leader -// assert!(self.inner.family.parent_port.is_none()); +// assert!(self.family.parent_port.is_none()); // let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next(); // Ok(if let Some(predicate) = maybe_predicate { // let decision = Decision::Success(predicate); -// log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision); +// log!(&mut self.logger, "DECIDE ON {:?} AS LEADER!", &decision); // self.end_round_with_decision(decision)?; // true // } else { @@ -123,7 +209,7 @@ impl SyncProtoContext<'_> { // &mut self, // sync_batches: impl Iterator, // ) -> Result { -// let MonoN { ports, .. } = self.inner.mono_n.clone(); +// let MonoN { ports, .. } = self.mono_n.clone(); // let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self; // let mut branches = HashMap::<_, _>::default(); // for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() { @@ -152,7 +238,7 @@ impl SyncProtoContext<'_> { // let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index }; // for (port, payload) in puts { // log!( -// &mut self.inner.logger, +// &mut self.logger, // "... ... Initial native put msg {:?} pred {:?} batch {:?}", // &payload, // &predicate, @@ -164,14 +250,14 @@ impl SyncProtoContext<'_> { // endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?; // } // log!( -// &mut self.inner.logger, +// &mut self.logger, // "... 
Initial native branch batch index={} with pred {:?}", // sync_batch_index, // &predicate // ); // if branch.to_get.is_empty() { // self.ephemeral.solution_storage.submit_and_digest_subtree_solution( -// &mut self.inner.logger, +// &mut self.logger, // SubtreeId::PolyN, // predicate.clone(), // ); @@ -194,7 +280,7 @@ impl SyncProtoContext<'_> { // // Must set unrecoverable error! and tear down our net channels // self.unrecoverable_error = Some(e); // self.ephemeral.clear(); -// self.inner.endpoint_exts = Default::default(); +// self.endpoint_exts = Default::default(); // e // } // }) @@ -208,9 +294,9 @@ impl SyncProtoContext<'_> { // sync_batches: Option>, // ) -> Result<(), SyncError> { // log!( -// &mut self.inner.logger, +// &mut self.logger, // "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~", -// self.inner.round_index +// self.round_index // ); // assert!(self.ephemeral.is_clear()); // assert!(self.unrecoverable_error.is_none()); @@ -218,17 +304,17 @@ impl SyncProtoContext<'_> { // // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`). // // Some actors are dropped. some new actors are created. // // Ultimately, we have 0 Mono actors and a list of unnamed sync_actors -// self.ephemeral.mono_ps.extend(self.inner.mono_ps.iter().cloned()); -// log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len()); +// self.ephemeral.mono_ps.extend(self.mono_ps.iter().cloned()); +// log!(&mut self.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len()); // while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() { // let mut m_ctx = ProtoSyncContext { // ports: &mut mono_p.ports, // mono_ps: &mut self.ephemeral.mono_ps, -// inner: &mut self.inner, +// inner: &mut self, // }; // // cross boundary into crate::protocol // let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description); -// log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker); +// log!(&mut self.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker); // match blocker { // NonsyncBlocker::Inconsistent => return Err(SyncError::Inconsistent), // NonsyncBlocker::ComponentExit => drop(mono_p), @@ -236,7 +322,7 @@ impl SyncProtoContext<'_> { // } // } // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Finished running all MonoPs! Have {} PolyPs waiting", // self.ephemeral.poly_ps.len() // ); @@ -247,7 +333,7 @@ impl SyncProtoContext<'_> { // // TODO: store and update this mapping rather than rebuilding it each round. // let port_to_holder: HashMap = { // use PolyId::*; -// let n = self.inner.mono_n.ports.iter().map(move |&e| (e, N)); +// let n = self.mono_n.ports.iter().map(move |&e| (e, N)); // let p = self // .ephemeral // .poly_ps @@ -257,7 +343,7 @@ impl SyncProtoContext<'_> { // n.chain(p).collect() // }; // log!( -// &mut self.inner.logger, +// &mut self.logger, // "SET OF PolyPs and MonoPs final! port lookup map is {:?}", // &port_to_holder // ); @@ -268,14 +354,14 @@ impl SyncProtoContext<'_> { // let n = std::iter::once(SubtreeId::PolyN); // let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index }); // let c = self -// .inner +// // .family // .children_ports // .iter() // .map(|&port| SubtreeId::ChildController { port }); // let subtree_id_iter = n.chain(m).chain(c); // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Solution Storage has subtree Ids: {:?}", // &subtree_id_iter.clone().collect::>() // ); @@ -284,49 +370,49 @@ impl SyncProtoContext<'_> { // // 5. 
kick off the synchronous round of the native actor if it exists -// log!(&mut self.inner.logger, "Kicking off native's synchronous round..."); +// log!(&mut self.logger, "Kicking off native's synchronous round..."); // self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches { // // using if let because of nested ? operator // // TODO check that there are 1+ branches or NO SOLUTION // let poly_n = self.kick_off_native(sync_batches)?; // log!( -// &mut self.inner.logger, +// &mut self.logger, // "PolyN kicked off, and has branches with predicates... {:?}", // poly_n.branches.keys().collect::>() // ); // Some(poly_n) // } else { -// log!(&mut self.inner.logger, "NO NATIVE COMPONENT"); +// log!(&mut self.logger, "NO NATIVE COMPONENT"); // None // }; // // 6. Kick off the synchronous round of each protocol actor // // If just one actor becomes inconsistent now, there can be no solution! // // TODO distinguish between completed and not completed poly_p's? -// log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len()); +// log!(&mut self.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len()); // for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() { // let my_subtree_id = SubtreeId::PolyP { index }; // let m_ctx = PolyPContext { // my_subtree_id, -// inner: &mut self.inner, +// inner: &mut self, // solution_storage: &mut self.ephemeral.solution_storage, // }; // use SyncRunResult as Srr; // let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?; -// log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker); +// log!(&mut self.logger, "... PolyP's poly_run got blocker {:?}", &blocker); // match blocker { // Srr::NoBranches => return Err(SyncError::Inconsistent), // Srr::AllBranchesComplete | Srr::BlockingForRecv => (), // } // } -// log!(&mut self.inner.logger, "All Poly machines have been kicked off!"); +// log!(&mut self.logger, "All Poly machines have been kicked off!"); // // 7. `solution_storage` may have new solutions for this controller // // handle their discovery. LEADER => announce, otherwise => send to parent // { // let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::>(); // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Got {} controller-local solutions before a single RECV: {:?}", // peeked.len(), // peeked @@ -337,10 +423,10 @@ impl SyncProtoContext<'_> { // } // // 4. Receive incoming messages until the DECISION is made OR some unrecoverable error -// log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages"); +// log!(&mut self.logger, "`No decision yet`. Time to recv messages"); // self.undelay_all(); // 'recv_loop: loop { -// log!(&mut self.inner.logger, "`POLLING` with deadline {:?}...", deadline); +// log!(&mut self.logger, "`POLLING` with deadline {:?}...", deadline); // let received = match deadline { // None => { // // we have personally timed out. perform a "long" poll. @@ -353,7 +439,7 @@ impl SyncProtoContext<'_> { // // timed out! send a FAILURE message to the sink, // // and henceforth don't time out on polling. // deadline = None; -// match self.inner.family.parent_port { +// match self.family.parent_port { // None => { // // I am the sink! announce failure and return. // return self.end_round_with_decision(Decision::Failure); @@ -361,16 +447,16 @@ impl SyncProtoContext<'_> { // Some(parent_port) => { // // I am not the sink! send a failure message. 
// let announcement = Msg::CommMsg(CommMsg { -// round_index: self.inner.round_index, +// round_index: self.round_index, // contents: CommMsgContents::Failure, // }); // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Forwarding {:?} to parent with port {:?}", // &announcement, // parent_port // ); -// self.inner +// self // .endpoint_exts // .get_mut(parent_port) // .expect("ss") @@ -382,7 +468,7 @@ impl SyncProtoContext<'_> { // } // }, // }; -// log!(&mut self.inner.logger, "::: message {:?}...", &received); +// log!(&mut self.logger, "::: message {:?}...", &received); // let current_content = match received.msg { // Msg::SetupMsg(s) => { // // This occurs in the event the connector was malformed during connect() @@ -390,45 +476,45 @@ impl SyncProtoContext<'_> { // return Err(SyncError::UnexpectedSetupMsg); // } // Msg::CommMsg(CommMsg { round_index, .. }) -// if round_index < self.inner.round_index => +// if round_index < self.round_index => // { // // Old message! Can safely discard -// log!(&mut self.inner.logger, "...and its OLD! :("); +// log!(&mut self.logger, "...and its OLD! :("); // drop(received); // continue 'recv_loop; // } // Msg::CommMsg(CommMsg { round_index, .. }) -// if round_index > self.inner.round_index => +// if round_index > self.round_index => // { // // Message from a next round. Keep for later! -// log!(&mut self.inner.logger, "... DELAY! :("); +// log!(&mut self.logger, "... DELAY! :("); // self.delay(received); // continue 'recv_loop; // } // Msg::CommMsg(CommMsg { contents, round_index }) => { // log!( -// &mut self.inner.logger, +// &mut self.logger, // "... its a round-appropriate CommMsg with port {:?}", // received.recipient // ); -// assert_eq!(round_index, self.inner.round_index); +// assert_eq!(round_index, self.round_index); // contents // } // }; // match current_content { -// CommMsgContents::Failure => match self.inner.family.parent_port { +// CommMsgContents::Failure => match self.family.parent_port { // Some(parent_port) => { // let announcement = Msg::CommMsg(CommMsg { -// round_index: self.inner.round_index, +// round_index: self.round_index, // contents: CommMsgContents::Failure, // }); // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Forwarding {:?} to parent with port {:?}", // &announcement, // parent_port // ); -// self.inner +// self // .endpoint_exts // .get_mut(parent_port) // .expect("ss") @@ -439,18 +525,18 @@ impl SyncProtoContext<'_> { // }, // CommMsgContents::Elaborate { partial_oracle } => { // // Child controller submitted a subtree solution. 
-// if !self.inner.family.children_ports.contains(&received.recipient) { +// if !self.family.children_ports.contains(&received.recipient) { // return Err(SyncError::ElaborateFromNonChild); // } // let subtree_id = SubtreeId::ChildController { port: received.recipient }; // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Received elaboration from child for subtree {:?}: {:?}", // subtree_id, // &partial_oracle // ); // self.ephemeral.solution_storage.submit_and_digest_subtree_solution( -// &mut self.inner.logger, +// &mut self.logger, // subtree_id, // partial_oracle, // ); @@ -459,11 +545,11 @@ impl SyncProtoContext<'_> { // } // } // CommMsgContents::Announce { decision } => { -// if self.inner.family.parent_port != Some(received.recipient) { +// if self.family.parent_port != Some(received.recipient) { // return Err(SyncError::AnnounceFromNonParent); // } // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Received ANNOUNCEMENT from from parent {:?}: {:?}", // received.recipient, // &decision @@ -474,21 +560,21 @@ impl SyncProtoContext<'_> { // // check that we expect to be able to receive payloads from this sender // assert_eq!( // Getter, -// self.inner.endpoint_exts.get(received.recipient).unwrap().info.polarity +// self.endpoint_exts.get(received.recipient).unwrap().info.polarity // ); // // message for some actor. Feed it to the appropriate actor // // and then give them another chance to run. // let subtree_id = port_to_holder.get(&received.recipient); // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}", // subtree_id, // &payload_predicate, // &payload // ); // let channel_id = self -// .inner +// // .endpoint_exts // .get(received.recipient) // .expect("UEHFU") @@ -508,7 +594,7 @@ impl SyncProtoContext<'_> { // // Message for NativeMachine // self.ephemeral.poly_n.as_mut().unwrap().sync_recv( // received.recipient, -// &mut self.inner.logger, +// &mut self.logger, // payload, // payload_predicate, // &mut self.ephemeral.solution_storage, @@ -523,7 +609,7 @@ impl SyncProtoContext<'_> { // let m_ctx = PolyPContext { // my_subtree_id: SubtreeId::PolyP { index: *index }, -// inner: &mut self.inner, +// inner: &mut self, // solution_storage: &mut self.ephemeral.solution_storage, // }; // use SyncRunResult as Srr; @@ -535,7 +621,7 @@ impl SyncProtoContext<'_> { // payload, // )?; // log!( -// &mut self.inner.logger, +// &mut self.logger, // "... Fed the msg to PolyP {:?} and ran it to blocker {:?}", // subtree_id, // blocker @@ -550,7 +636,7 @@ impl SyncProtoContext<'_> { // .peek_new_locals() // .collect::>(); // log!( -// &mut self.inner.logger, +// &mut self.logger, // "Got {} new controller-local solutions from RECV: {:?}", // peeked.len(), // peeked @@ -712,10 +798,10 @@ impl SyncProtoContext<'_> { // fn is_firing(&mut self, port: PortId) -> Option { // assert!(self.ports.contains(&port)); -// let channel_id = self.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; +// let channel_id = self.m_ctx.endpoint_exts.get(port).unwrap().info.channel_id; // let val = self.predicate.query(channel_id); // log!( -// &mut self.m_ctx.inner.logger, +// &mut self.m_ctx.logger, // "!! PolyContext callback to is_firing by {:?}! returning {:?}", // self.m_ctx.my_subtree_id, // val, @@ -726,7 +812,7 @@ impl SyncProtoContext<'_> { // assert!(self.ports.contains(&port)); // let val = self.inbox.get(&port); // log!( -// &mut self.m_ctx.inner.logger, +// &mut self.m_ctx.logger, // "!! 
PolyContext callback to read_msg by {:?}! returning {:?}", // self.m_ctx.my_subtree_id, // val, diff --git a/src/runtime/error.rs b/src/runtime/error.rs index 0072ca449f2d0d940ad49d0341c745f5c40cf978..672a3d6554cb00feecfe4579beed4fbe9111c55b 100644 --- a/src/runtime/error.rs +++ b/src/runtime/error.rs @@ -1,15 +1,21 @@ use crate::common::*; +#[derive(Debug)] pub enum EndpointError { MalformedMessage, BrokenEndpoint, } +#[derive(Debug)] pub enum TryRecyAnyError { Timeout, PollFailed, EndpointError { error: EndpointError, index: usize }, BrokenEndpoint(usize), } +#[derive(Debug)] pub enum SyncError { Timeout, + NotConnected, + InconsistentProtoComponent(ProtoComponentId), + IndistinguishableBatches([usize; 2]), } diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 327b5ffcdd162e0dcff5664b368d52f6c5dd9443..15b2bc12c0cb13ac0c1190ea5102044f13fbe5b9 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -11,7 +11,7 @@ use error::*; #[derive(Clone, Copy, Debug)] pub enum LocalComponentId { Native, - Proto { index: usize }, + Proto(ProtoComponentId), } #[derive(Debug, Clone, Copy)] pub enum Route { @@ -47,11 +47,16 @@ pub struct CommMsg { } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum CommMsgContents { - SendPayload { payload_predicate: Predicate, payload: Payload }, + SendPayload(SendPayloadMsg), Elaborate { partial_oracle: Predicate }, // SINKWARD Failure, // SINKWARD Announce { decision: Decision }, // SINKAWAYS } +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct SendPayloadMsg { + payload_predicate: Predicate, + payload: Payload, +} #[derive(Debug, PartialEq)] pub enum CommonSatResult { FormerNotLatter, @@ -64,16 +69,7 @@ pub struct Endpoint { inbox: Vec, stream: TcpStream, } -#[derive(Debug, Default)] -pub struct IntStream { - next: u32, -} -#[derive(Debug)] -pub struct IdManager { - controller_id: ControllerId, - port_suffix_stream: IntStream, -} -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ProtoComponent { state: ComponentState, ports: HashSet, @@ -103,6 +99,12 @@ pub struct MemInMsg { msg: Payload, } #[derive(Debug)] +pub struct IdManager { + controller_id: ControllerId, + port_suffix_stream: U32Stream, + proto_component_suffix_stream: U32Stream, +} +#[derive(Debug)] pub struct EndpointManager { // invariants: // 1. 
endpoint N is registered READ | WRITE with poller @@ -122,11 +124,11 @@ pub struct PortInfo { } #[derive(Debug)] pub struct Connector { - logger: Box, proto_description: Arc, + proto_components: HashMap, + logger: Box, id_manager: IdManager, native_ports: HashSet, - proto_components: Vec, port_info: PortInfo, phased: ConnectorPhased, } @@ -137,29 +139,42 @@ pub enum ConnectorPhased { surplus_sockets: u16, }, Communication { + round_index: usize, endpoint_manager: EndpointManager, neighborhood: Neighborhood, mem_inbox: Vec, - native_actor: NativeActor, // sync invariant: in Nonsync state + native_batches: Vec, + round_result: Result, SyncError>, }, } #[derive(Debug)] pub struct StringLogger(ControllerId, String); -#[derive(Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] +#[derive(Default, Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] pub struct Predicate { pub assigned: BTreeMap, } +#[derive(Debug, Default)] +pub struct NativeBatch { + // invariant: putters' and getters' polarities respected + to_put: HashMap, + to_get: HashSet, +} pub struct MonitoredReader { bytes: usize, r: R, } -pub struct SyncProtoContext<'a> { - connector: &'a mut Connector, - proto_component_index: usize, -} pub struct NonsyncProtoContext<'a> { - connector: &'a mut Connector, - proto_component_index: usize, + logger: &'a mut dyn Logger, + proto_component_id: ProtoComponentId, + port_info: &'a mut PortInfo, + id_manager: &'a mut IdManager, + proto_component_ports: &'a mut HashSet, + unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>, +} +pub struct SyncProtoContext<'a> { + predicate: &'a Predicate, + proto_component_id: ProtoComponentId, + inbox: HashMap, } // pub struct MonoPContext<'a> { @@ -186,72 +201,96 @@ pub struct NonsyncProtoContext<'a> { // inbox: &'r HashMap, // } -#[derive(Default)] -pub struct SolutionStorage { - old_local: HashSet, - new_local: HashSet, - // this pair acts as SubtreeId -> HashSet which is friendlier to iteration - subtree_solutions: Vec>, - subtree_id_to_index: HashMap, -} -#[derive(Debug)] -pub enum SyncRunResult { - BlockingForRecv, - AllBranchesComplete, - NoBranches, -} -#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -pub enum PolyId { - N, - P { index: usize }, -} +// #[derive(Default)] +// pub struct SolutionStorage { +// old_local: HashSet, +// new_local: HashSet, +// // this pair acts as SubtreeId -> HashSet which is friendlier to iteration +// subtree_solutions: Vec>, +// subtree_id_to_index: HashMap, +// } +// #[derive(Debug)] +// pub enum SyncRunResult { +// BlockingForRecv, +// AllBranchesComplete, +// NoBranches, +// } +// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +// pub enum PolyId { +// N, +// P { index: usize }, +// } -#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -pub enum SubtreeId { - PolyN, - PolyP { index: usize }, - ChildController { port: PortId }, -} -#[derive(Debug, Default)] -pub struct SyncBatch { - to_put: HashMap, - to_get: HashSet, +// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +// pub enum SubtreeId { +// PolyN, +// PolyP { index: usize }, +// ChildController { port: PortId }, +// } +// #[derive(Debug)] +// pub struct NativeBranch { +// gotten: HashMap, +// to_get: HashSet, +// } + +//////////////// +impl PortInfo { + fn firing_var_for(&self, port: PortId) -> PortId { + match self.polarities.get(&port).unwrap() { + Getter => port, + Putter => *self.peers.get(&port).unwrap(), + } + } } 
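
// NOTE (sketch, editor's annotation): `firing_var_for` maps both ends of a
// channel onto a single variable, the getter's PortId, so a Predicate can
// carry one "does this channel fire?" boolean per channel rather than one
// per port. Assuming a pair created by `new_port_pair` below, with `connector`
// and `port_info` as hypothetical bindings (o: Putter and i: Getter, peers of
// one another):
//
//     let [o, i] = connector.new_port_pair();
//     assert_eq!(port_info.firing_var_for(o), i); // putter -> its peer getter
//     assert_eq!(port_info.firing_var_for(i), i); // getter -> itself
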
-#[derive(Debug)] -pub enum NativeActor { - Nonsync { - sync_result_branch: Option, // invariant: sync_result_branch.to_get.is_empty() - next_batches: Vec, // invariant: nonempty - }, - Sync { - branches: HashMap, - }, +impl IdManager { + fn new(controller_id: ControllerId) -> Self { + Self { + controller_id, + port_suffix_stream: Default::default(), + proto_component_suffix_stream: Default::default(), + } + } + fn new_port_id(&mut self) -> PortId { + Id { controller_id: self.controller_id, u32_suffix: self.port_suffix_stream.next() }.into() + } + fn new_proto_component_id(&mut self) -> ProtoComponentId { + Id { + controller_id: self.controller_id, + u32_suffix: self.proto_component_suffix_stream.next(), + } + .into() + } } -#[derive(Debug)] -pub struct NativeBranch { - batch_index: usize, - gotten: HashMap, - to_get: HashSet, +impl Connector { + pub fn new_port_pair(&mut self) -> [PortId; 2] { + // adds two new associated ports, related to each other, and exposed to the native + let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()]; + self.native_ports.insert(o); + self.native_ports.insert(i); + // {polarity, peer, route} known. {} unknown. + self.port_info.polarities.insert(o, Putter); + self.port_info.polarities.insert(i, Getter); + self.port_info.peers.insert(o, i); + self.port_info.peers.insert(i, o); + let route = Route::LocalComponent(LocalComponentId::Native); + self.port_info.routes.insert(o, route); + self.port_info.routes.insert(i, route); + log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); + [o, i] + } } - -//////////////// impl EndpointManager { fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> { self.endpoint_exts[index].endpoint.send(msg) } - fn try_recv_any( - &mut self, - logger: &mut dyn Logger, - deadline: Instant, - ) -> Result<(usize, Msg), TryRecyAnyError> { + fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> { use TryRecyAnyError::*; // 1. try messages already buffered if let Some(x) = self.undelayed_messages.pop() { return Ok(x); } - loop { - // 2. try read a message from an enpoint that previously raised an event + // 2. try read a message from an endpoint that raised an event with poll() but wasn't drained while let Some(index) = self.polled_undrained.pop() { let endpoint = &mut self.endpoint_exts[index].endpoint; if let Some(msg) = @@ -264,14 +303,14 @@ impl EndpointManager { return Ok((index, msg)); } } - // 3. No message yet. poll! + // 3. No message yet. Do we have enough time to poll? 
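            // NOTE (editor's annotation): `checked_duration_since` yields None
            // once Instant::now() is past `deadline`, and `.ok_or(Timeout)?`
            // turns that into Err(TryRecyAnyError::Timeout) -- the poll budget
            // is whatever remains of the deadline, never a negative duration.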
let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?; for event in self.events.iter() { - log!(logger, "Poll event {:?}", event); let Token(index) = event.token(); self.polled_undrained.insert(index); } + self.events.clear(); } } fn undelay_all(&mut self) { @@ -331,25 +370,6 @@ impl std::fmt::Write for StringLogger { self.1.write_str(s) } } -impl IntStream { - fn next(&mut self) -> u32 { - if self.next == u32::MAX { - panic!("NO NEXT!") - } - self.next += 1; - self.next - 1 - } -} -impl IdManager { - fn next_port(&mut self) -> PortId { - let port_suffix = self.port_suffix_stream.next(); - let controller_id = self.controller_id; - PortId { controller_id, port_index: port_suffix } - } - fn new(controller_id: ControllerId) -> Self { - Self { controller_id, port_suffix_stream: Default::default() } - } -} impl Endpoint { fn try_recv(&mut self) -> Result, EndpointError> { use EndpointError::*; @@ -397,19 +417,19 @@ impl Connector { ) .unwrap(); self.get_logger().dump_log(&mut lock); - writeln!(lock, "DEBUG_PRINT:\n{:#?}\n", self).unwrap(); - } -} -impl Debug for SolutionStorage { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.pad("Solutions: [")?; - for (subtree_id, &index) in self.subtree_id_to_index.iter() { - let sols = &self.subtree_solutions[index]; - f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?; - } - f.pad("]") + writeln!(lock, "\n\nDEBUG_PRINT:\n{:#?}\n", self).unwrap(); } } +// impl Debug for SolutionStorage { +// fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { +// f.pad("Solutions: [")?; +// for (subtree_id, &index) in self.subtree_id_to_index.iter() { +// let sols = &self.subtree_solutions[index]; +// f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?; +// } +// f.pad("]") +// } +// } impl Predicate { // returns true IFF self.unify would return Equivalent OR FormerNotLatter @@ -526,7 +546,4 @@ impl Predicate { pub fn query(&self, x: PortId) -> Option { self.assigned.get(&x).copied() } - pub fn new_trivial() -> Self { - Self { assigned: Default::default() } - } } diff --git a/src/runtime/my_tests.rs b/src/runtime/my_tests.rs index 40d099a473fe58eebb3950ed6259e7e4fd510fd2..d59d4007dfae04c3bb2d03a627ca054e9c4d495e 100644 --- a/src/runtime/my_tests.rs +++ b/src/runtime/my_tests.rs @@ -26,27 +26,27 @@ fn simple_connector() { } #[test] -fn add_port_pair() { +fn new_port_pair() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); - let [_, _] = c.add_port_pair(); - let [_, _] = c.add_port_pair(); + let [_, _] = c.new_port_pair(); + let [_, _] = c.new_port_pair(); println!("{:#?}", c); } #[test] -fn add_sync() { +fn new_sync() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); - let [o, i] = c.add_port_pair(); + let [o, i] = c.new_port_pair(); c.add_component(b"sync", &[i, o]).unwrap(); println!("{:#?}", c); } #[test] -fn add_net_port() { +fn new_net_port() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let sock_addr = next_test_addr(); - let _ = c.add_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap(); - let _ = c.add_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap(); + let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap(); + let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap(); println!("{:#?}", c); } @@ -61,8 +61,8 @@ fn trivial_connect() { fn single_node_connect() { let sock_addr = 
next_test_addr(); let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); - let _ = c.add_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap(); - let _ = c.add_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap(); + let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap(); + let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap(); let res = c.connect(Duration::from_secs(1)); println!("{:#?}", c); c.get_logger().dump_log(&mut std::io::stdout().lock()); @@ -75,13 +75,13 @@ fn multithreaded_connect() { scope(|s| { s.spawn(|_| { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); - let _ = c.add_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap(); + let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap(); c.connect(Duration::from_secs(1)).unwrap(); c.print_state(); }); s.spawn(|_| { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); - let _ = c.add_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap(); + let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap(); c.connect(Duration::from_secs(1)).unwrap(); c.print_state(); }); diff --git a/src/runtime/setup2.rs b/src/runtime/setup2.rs index bd6281a1ae2c776f19a4859f0958fbee6265004b..72524062d4f1c1b0b9470717abc0f9b7fae7a475 100644 --- a/src/runtime/setup2.rs +++ b/src/runtime/setup2.rs @@ -24,38 +24,23 @@ impl Connector { surplus_sockets: u16, ) -> Self { Self { - logger, proto_description, + proto_components: Default::default(), + logger, id_manager: IdManager::new(controller_id), native_ports: Default::default(), - proto_components: Default::default(), port_info: Default::default(), phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets }, } } - pub fn add_port_pair(&mut self) -> [PortId; 2] { - let route = Route::LocalComponent(LocalComponentId::Native); - let [o, i] = [self.id_manager.next_port(), self.id_manager.next_port()]; - self.native_ports.insert(o); - self.native_ports.insert(i); - // {polarity, peer, route} known. {} unknown. - self.port_info.polarities.insert(o, Putter); - self.port_info.polarities.insert(i, Getter); - self.port_info.peers.insert(o, i); - self.port_info.peers.insert(i, o); - self.port_info.routes.insert(o, route); - self.port_info.routes.insert(i, route); - log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); - [o, i] - } - pub fn add_net_port( + pub fn new_net_port( &mut self, polarity: Polarity, endpoint_setup: EndpointSetup, ) -> Result { match &mut self.phased { ConnectorPhased::Setup { endpoint_setups, .. } => { - let p = self.id_manager.next_port(); + let p = self.id_manager.new_port_id(); self.native_ports.insert(p); // {polarity, route} known. {peer} unknown. self.port_info.polarities.insert(p, polarity); @@ -72,7 +57,9 @@ impl Connector { identifier: &[u8], ports: &[PortId], ) -> Result<(), AddComponentError> { + // called by the USER. moves ports owned by the NATIVE use AddComponentError::*; + // 1. check if this is OK let polarities = self.proto_description.component_polarities(identifier)?; if polarities.len() != ports.len() { return Err(WrongNumberOfParamaters { expected: polarities.len() }); @@ -85,19 +72,21 @@ impl Connector { return Err(WrongPortPolarity { port: *port, expected_polarity }); } } - // ok! 
- let state = self.proto_description.new_main_component(identifier, ports); - let proto_component = ProtoComponent { ports: ports.iter().copied().collect(), state }; - let proto_component_index = self.proto_components.len(); - self.proto_components.push(proto_component); + // 3. remove ports from old component & update port->route + let new_id = self.id_manager.new_proto_component_id(); for port in ports.iter() { - if let Polarity::Getter = *self.port_info.polarities.get(port).unwrap() { - self.port_info.routes.insert( - *port, - Route::LocalComponent(LocalComponentId::Proto { index: proto_component_index }), - ); - } + self.port_info + .routes + .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id))); } + // 4. add new component + self.proto_components.insert( + new_id, + ProtoComponent { + state: self.proto_description.new_main_component(identifier, ports), + ports: ports.iter().copied().collect(), + }, + ); Ok(()) } pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> { @@ -110,10 +99,12 @@ impl Connector { log!(self.logger, "Call to connecting in setup state. Timeout {:?}", timeout); let deadline = Instant::now() + timeout; // connect all endpoints in parallel; send and receive peer ids through ports - let mut endpoint_manager = { - let Self { logger, port_info, .. } = self; - new_endpoint_manager(&mut **logger, endpoint_setups, port_info, deadline)? - }; + let mut endpoint_manager = new_endpoint_manager( + &mut *self.logger, + endpoint_setups, + &mut self.port_info, + deadline, + )?; log!( self.logger, "Successfully connected {} endpoints", @@ -129,13 +120,12 @@ impl Connector { log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood); // TODO session optimization goes here self.phased = ConnectorPhased::Communication { + round_index: 0, endpoint_manager, neighborhood, mem_inbox: Default::default(), - native_actor: NativeActor::Nonsync { - sync_result_branch: None, - next_batches: vec![SyncBatch::default()], - }, + native_batches: vec![Default::default()], + round_result: Ok(None), }; Ok(()) } @@ -317,7 +307,7 @@ fn init_neighborhood( let mut my_leader = controller_id; em.undelay_all(); 'echo_loop: while !awaiting.is_empty() || parent.is_some() { - let (index, msg) = em.try_recv_any(logger, deadline).map_err(drop)?; + let (index, msg) = em.try_recv_any(deadline).map_err(drop)?; log!(logger, "GOT from index {:?} msg {:?}", &index, &msg); match msg { S(LeaderAnnounce { leader }) => { @@ -331,7 +321,7 @@ fn init_neighborhood( S(LeaderEcho { maybe_leader }) => { use Ordering::*; match maybe_leader.cmp(&my_leader) { - Less => { /* ignore */ } + Less => { /* ignore this wave */ } Equal => { awaiting.remove(&index); if awaiting.is_empty() { @@ -339,7 +329,7 @@ fn init_neighborhood( // return the echo to my parent em.send_to(p, &S(LeaderEcho { maybe_leader }))?; } else { - // DECIDE! + // wave completed! 
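                        // NOTE (editor's annotation): `awaiting` is empty and
                        // there is no parent to echo to, so every neighbor has
                        // echoed `my_leader` back: the wave is complete and
                        // this controller stops listening for echoes.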
break 'echo_loop; } } @@ -358,6 +348,7 @@ fn init_neighborhood( } else { for (index2, ee) in em.endpoint_exts.iter_mut().enumerate() { if index2 == index { + // don't propagate echo to my parent continue; } log!(logger, "repeating echo {:?} to {:?}", &echo, index2); @@ -408,7 +399,7 @@ fn init_neighborhood( log!(logger, "delayed {:?} undelayed {:?}", &em.delayed_messages, &em.undelayed_messages); while !awaiting.is_empty() { log!(logger, "awaiting {:?}", &awaiting); - let (index, msg) = em.try_recv_any(logger, deadline).map_err(drop)?; + let (index, msg) = em.try_recv_any(deadline).map_err(drop)?; match msg { S(YouAreMyParent) => { assert!(awaiting.remove(&index));
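
// ---------------------------------------------------------------------------
// NOTE (sketch, editor's annotation appended after the diff): the new
// id-allocation scheme gives ports and proto components independent suffix
// streams under one ControllerId, and the PortId/ProtoComponentId newtypes
// keep the two id spaces from mixing even though both wrap the same `Id`.
// Assuming the definitions from src/common.rs and src/runtime/mod.rs above:
//
//     let mut m = IdManager::new(0);
//     let p0 = m.new_port_id();            // Debug-prints as PortId(0,0)
//     let c0 = m.new_proto_component_id(); // ProtoComponentId(0,0): no clash
//     let p1 = m.new_port_id();            // PortId(0,1)
//
// `U32Stream::next` panics at u32::MAX rather than wrapping, so suffixes are
// never silently reused within a controller's lifetime.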