diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 327b5ffcdd162e0dcff5664b368d52f6c5dd9443..15b2bc12c0cb13ac0c1190ea5102044f13fbe5b9 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -11,7 +11,7 @@ use error::*; #[derive(Clone, Copy, Debug)] pub enum LocalComponentId { Native, - Proto { index: usize }, + Proto(ProtoComponentId), } #[derive(Debug, Clone, Copy)] pub enum Route { @@ -47,11 +47,16 @@ pub struct CommMsg { } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum CommMsgContents { - SendPayload { payload_predicate: Predicate, payload: Payload }, + SendPayload(SendPayloadMsg), Elaborate { partial_oracle: Predicate }, // SINKWARD Failure, // SINKWARD Announce { decision: Decision }, // SINKAWAYS } +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct SendPayloadMsg { + payload_predicate: Predicate, + payload: Payload, +} #[derive(Debug, PartialEq)] pub enum CommonSatResult { FormerNotLatter, @@ -64,16 +69,7 @@ pub struct Endpoint { inbox: Vec, stream: TcpStream, } -#[derive(Debug, Default)] -pub struct IntStream { - next: u32, -} -#[derive(Debug)] -pub struct IdManager { - controller_id: ControllerId, - port_suffix_stream: IntStream, -} -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ProtoComponent { state: ComponentState, ports: HashSet, @@ -103,6 +99,12 @@ pub struct MemInMsg { msg: Payload, } #[derive(Debug)] +pub struct IdManager { + controller_id: ControllerId, + port_suffix_stream: U32Stream, + proto_component_suffix_stream: U32Stream, +} +#[derive(Debug)] pub struct EndpointManager { // invariants: // 1. endpoint N is registered READ | WRITE with poller @@ -122,11 +124,11 @@ pub struct PortInfo { } #[derive(Debug)] pub struct Connector { - logger: Box, proto_description: Arc, + proto_components: HashMap, + logger: Box, id_manager: IdManager, native_ports: HashSet, - proto_components: Vec, port_info: PortInfo, phased: ConnectorPhased, } @@ -137,29 +139,42 @@ pub enum ConnectorPhased { surplus_sockets: u16, }, Communication { + round_index: usize, endpoint_manager: EndpointManager, neighborhood: Neighborhood, mem_inbox: Vec, - native_actor: NativeActor, // sync invariant: in Nonsync state + native_batches: Vec, + round_result: Result, SyncError>, }, } #[derive(Debug)] pub struct StringLogger(ControllerId, String); -#[derive(Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] +#[derive(Default, Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] pub struct Predicate { pub assigned: BTreeMap, } +#[derive(Debug, Default)] +pub struct NativeBatch { + // invariant: putters' and getters' polarities respected + to_put: HashMap, + to_get: HashSet, +} pub struct MonitoredReader { bytes: usize, r: R, } -pub struct SyncProtoContext<'a> { - connector: &'a mut Connector, - proto_component_index: usize, -} pub struct NonsyncProtoContext<'a> { - connector: &'a mut Connector, - proto_component_index: usize, + logger: &'a mut dyn Logger, + proto_component_id: ProtoComponentId, + port_info: &'a mut PortInfo, + id_manager: &'a mut IdManager, + proto_component_ports: &'a mut HashSet, + unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>, +} +pub struct SyncProtoContext<'a> { + predicate: &'a Predicate, + proto_component_id: ProtoComponentId, + inbox: HashMap, } // pub struct MonoPContext<'a> { @@ -186,72 +201,96 @@ pub struct NonsyncProtoContext<'a> { // inbox: &'r HashMap, // } -#[derive(Default)] -pub struct SolutionStorage { - old_local: HashSet, - new_local: HashSet, - // this pair acts as SubtreeId -> HashSet which is friendlier to iteration - subtree_solutions: Vec>, - subtree_id_to_index: HashMap, -} -#[derive(Debug)] -pub enum SyncRunResult { - BlockingForRecv, - AllBranchesComplete, - NoBranches, -} -#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -pub enum PolyId { - N, - P { index: usize }, -} +// #[derive(Default)] +// pub struct SolutionStorage { +// old_local: HashSet, +// new_local: HashSet, +// // this pair acts as SubtreeId -> HashSet which is friendlier to iteration +// subtree_solutions: Vec>, +// subtree_id_to_index: HashMap, +// } +// #[derive(Debug)] +// pub enum SyncRunResult { +// BlockingForRecv, +// AllBranchesComplete, +// NoBranches, +// } +// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +// pub enum PolyId { +// N, +// P { index: usize }, +// } -#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -pub enum SubtreeId { - PolyN, - PolyP { index: usize }, - ChildController { port: PortId }, -} -#[derive(Debug, Default)] -pub struct SyncBatch { - to_put: HashMap, - to_get: HashSet, +// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +// pub enum SubtreeId { +// PolyN, +// PolyP { index: usize }, +// ChildController { port: PortId }, +// } +// #[derive(Debug)] +// pub struct NativeBranch { +// gotten: HashMap, +// to_get: HashSet, +// } + +//////////////// +impl PortInfo { + fn firing_var_for(&self, port: PortId) -> PortId { + match self.polarities.get(&port).unwrap() { + Getter => port, + Putter => *self.peers.get(&port).unwrap(), + } + } } -#[derive(Debug)] -pub enum NativeActor { - Nonsync { - sync_result_branch: Option, // invariant: sync_result_branch.to_get.is_empty() - next_batches: Vec, // invariant: nonempty - }, - Sync { - branches: HashMap, - }, +impl IdManager { + fn new(controller_id: ControllerId) -> Self { + Self { + controller_id, + port_suffix_stream: Default::default(), + proto_component_suffix_stream: Default::default(), + } + } + fn new_port_id(&mut self) -> PortId { + Id { controller_id: self.controller_id, u32_suffix: self.port_suffix_stream.next() }.into() + } + fn new_proto_component_id(&mut self) -> ProtoComponentId { + Id { + controller_id: self.controller_id, + u32_suffix: self.proto_component_suffix_stream.next(), + } + .into() + } } -#[derive(Debug)] -pub struct NativeBranch { - batch_index: usize, - gotten: HashMap, - to_get: HashSet, +impl Connector { + pub fn new_port_pair(&mut self) -> [PortId; 2] { + // adds two new associated ports, related to each other, and exposed to the native + let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()]; + self.native_ports.insert(o); + self.native_ports.insert(i); + // {polarity, peer, route} known. {} unknown. + self.port_info.polarities.insert(o, Putter); + self.port_info.polarities.insert(i, Getter); + self.port_info.peers.insert(o, i); + self.port_info.peers.insert(i, o); + let route = Route::LocalComponent(LocalComponentId::Native); + self.port_info.routes.insert(o, route); + self.port_info.routes.insert(i, route); + log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); + [o, i] + } } - -//////////////// impl EndpointManager { fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> { self.endpoint_exts[index].endpoint.send(msg) } - fn try_recv_any( - &mut self, - logger: &mut dyn Logger, - deadline: Instant, - ) -> Result<(usize, Msg), TryRecyAnyError> { + fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> { use TryRecyAnyError::*; // 1. try messages already buffered if let Some(x) = self.undelayed_messages.pop() { return Ok(x); } - loop { - // 2. try read a message from an enpoint that previously raised an event + // 2. try read a message from an endpoint that raised an event with poll() but wasn't drained while let Some(index) = self.polled_undrained.pop() { let endpoint = &mut self.endpoint_exts[index].endpoint; if let Some(msg) = @@ -264,14 +303,14 @@ impl EndpointManager { return Ok((index, msg)); } } - // 3. No message yet. poll! + // 3. No message yet. Do we have enough time to poll? let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?; for event in self.events.iter() { - log!(logger, "Poll event {:?}", event); let Token(index) = event.token(); self.polled_undrained.insert(index); } + self.events.clear(); } } fn undelay_all(&mut self) { @@ -331,25 +370,6 @@ impl std::fmt::Write for StringLogger { self.1.write_str(s) } } -impl IntStream { - fn next(&mut self) -> u32 { - if self.next == u32::MAX { - panic!("NO NEXT!") - } - self.next += 1; - self.next - 1 - } -} -impl IdManager { - fn next_port(&mut self) -> PortId { - let port_suffix = self.port_suffix_stream.next(); - let controller_id = self.controller_id; - PortId { controller_id, port_index: port_suffix } - } - fn new(controller_id: ControllerId) -> Self { - Self { controller_id, port_suffix_stream: Default::default() } - } -} impl Endpoint { fn try_recv(&mut self) -> Result, EndpointError> { use EndpointError::*; @@ -397,19 +417,19 @@ impl Connector { ) .unwrap(); self.get_logger().dump_log(&mut lock); - writeln!(lock, "DEBUG_PRINT:\n{:#?}\n", self).unwrap(); - } -} -impl Debug for SolutionStorage { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.pad("Solutions: [")?; - for (subtree_id, &index) in self.subtree_id_to_index.iter() { - let sols = &self.subtree_solutions[index]; - f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?; - } - f.pad("]") + writeln!(lock, "\n\nDEBUG_PRINT:\n{:#?}\n", self).unwrap(); } } +// impl Debug for SolutionStorage { +// fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { +// f.pad("Solutions: [")?; +// for (subtree_id, &index) in self.subtree_id_to_index.iter() { +// let sols = &self.subtree_solutions[index]; +// f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?; +// } +// f.pad("]") +// } +// } impl Predicate { // returns true IFF self.unify would return Equivalent OR FormerNotLatter @@ -526,7 +546,4 @@ impl Predicate { pub fn query(&self, x: PortId) -> Option { self.assigned.get(&x).copied() } - pub fn new_trivial() -> Self { - Self { assigned: Default::default() } - } }