diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 6d98eebb584cd9b32dfde5c9b2549072669cceb8..a6192caa1e694f193953d0b684cb38e368306f73 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -1,6 +1,7 @@ mod communication; +mod endpoints; pub mod error; -mod setup2; +mod setup; #[cfg(test)] mod tests; @@ -8,10 +9,6 @@ mod tests; use crate::common::*; use error::*; -#[derive( - Debug, Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, -)] -pub struct FiringVar(PortId); #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] pub enum LocalComponentId { Native, @@ -81,6 +78,10 @@ pub trait Logger: Debug { fn line_writer(&mut self) -> &mut dyn std::fmt::Write; fn dump_log(&self, w: &mut dyn std::io::Write); } +#[derive(Debug)] +pub struct StringLogger(ControllerId, String); +#[derive(Debug)] +pub struct DummyLogger; #[derive(Debug, Clone)] pub struct EndpointSetup { pub sock_addr: SocketAddr, @@ -150,8 +151,6 @@ pub enum ConnectorPhased { round_result: Result)>, SyncError>, }, } -#[derive(Debug)] -pub struct StringLogger(ControllerId, String); #[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] pub struct Predicate { pub assigned: BTreeMap, @@ -162,10 +161,6 @@ pub struct NativeBatch { to_put: HashMap, to_get: HashSet, } -pub struct MonitoredReader { - bytes: usize, - r: R, -} pub struct NonsyncProtoContext<'a> { logger: &'a mut dyn Logger, proto_component_id: ProtoComponentId, @@ -177,59 +172,9 @@ pub struct NonsyncProtoContext<'a> { pub struct SyncProtoContext<'a> { logger: &'a mut dyn Logger, predicate: &'a Predicate, - proto_component_id: ProtoComponentId, port_info: &'a PortInfo, inbox: &'a HashMap, } - -// pub struct MonoPContext<'a> { -// inner: &'a mut ControllerInner, -// ports: &'a mut HashSet, -// mono_ps: &'a mut Vec, -// } -// pub struct PolyPContext<'a> { -// my_subtree_id: SubtreeId, -// inner: &'a mut Connector, -// solution_storage: &'a mut SolutionStorage, -// } -// impl PolyPContext<'_> { -// #[inline(always)] -// fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> { -// let Self { solution_storage, my_subtree_id, inner } = self; -// PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner } -// } -// } -// struct BranchPContext<'m, 'r> { -// m_ctx: PolyPContext<'m>, -// ports: &'r HashSet, -// predicate: &'r Predicate, -// inbox: &'r HashMap, -// } - -// #[derive(Debug)] -// pub enum SyncRunResult { -// BlockingForRecv, -// AllBranchesComplete, -// NoBranches, -// } -// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -// pub enum PolyId { -// N, -// P { index: usize }, -// } - -// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -// pub enum SubtreeId { -// PolyN, -// PolyP { index: usize }, -// ChildController { port: PortId }, -// } -// #[derive(Debug)] -// pub struct NativeBranch { -// gotten: HashMap, -// to_get: HashSet, -// } - //////////////// impl PortInfo { fn firing_var_for(&self, port: PortId) -> FiringVar { @@ -314,76 +259,16 @@ impl Connector { Ok(()) } } -impl EndpointManager { - fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> { - self.endpoint_exts[index].endpoint.send(msg) - } - fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> { - use TryRecyAnyError::*; - // 1. try messages already buffered - if let Some(x) = self.undelayed_messages.pop() { - return Ok(x); - } - loop { - // 2. try read a message from an endpoint that raised an event with poll() but wasn't drained - while let Some(index) = self.polled_undrained.pop() { - let endpoint = &mut self.endpoint_exts[index].endpoint; - if let Some(msg) = - endpoint.try_recv().map_err(|error| EndpointError { error, index })? - { - if !endpoint.inbox.is_empty() { - // there may be another message waiting! - self.polled_undrained.insert(index); - } - return Ok((index, msg)); - } - } - // 3. No message yet. Do we have enough time to poll? - let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; - self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?; - for event in self.events.iter() { - let Token(index) = event.token(); - self.polled_undrained.insert(index); +impl Logger for DummyLogger { + fn line_writer(&mut self) -> &mut dyn std::fmt::Write { + impl std::fmt::Write for DummyLogger { + fn write_str(&mut self, _: &str) -> Result<(), std::fmt::Error> { + Ok(()) } - self.events.clear(); - } - } - fn undelay_all(&mut self) { - if self.undelayed_messages.is_empty() { - // fast path - std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages); - return; } - // slow path - self.undelayed_messages.extend(self.delayed_messages.drain(..)); - } -} -impl Debug for Endpoint { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.debug_struct("Endpoint").field("inbox", &self.inbox).finish() - } -} -impl From for MonitoredReader { - fn from(r: R) -> Self { - Self { r, bytes: 0 } - } -} -impl MonitoredReader { - pub fn bytes_read(&self) -> usize { - self.bytes - } -} -impl Read for MonitoredReader { - fn read(&mut self, buf: &mut [u8]) -> Result { - let n = self.r.read(buf)?; - self.bytes += n; - Ok(n) - } -} -impl Into for SetupMsg { - fn into(self) -> Msg { - Msg::SetupMsg(self) + self } + fn dump_log(&self, _: &mut dyn std::io::Write) {} } impl StringLogger { pub fn new(controller_id: ControllerId) -> Self { @@ -392,10 +277,12 @@ impl StringLogger { } impl Drop for StringLogger { fn drop(&mut self) { - let stdout = std::io::stdout(); + let stdout = std::io::stderr(); let mut lock = stdout.lock(); writeln!(lock, "--- DROP LOG DUMP ---").unwrap(); self.dump_log(&mut lock); + // lock.flush().unwrap(); + // std::thread::sleep(Duration::from_millis(50)); } } impl Logger for StringLogger { @@ -413,39 +300,6 @@ impl std::fmt::Write for StringLogger { self.1.write_str(s) } } -impl Endpoint { - fn try_recv(&mut self) -> Result, EndpointError> { - use EndpointError::*; - // populate inbox as much as possible - 'read_loop: loop { - match self.stream.read_to_end(&mut self.inbox) { - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break 'read_loop, - Ok(0) => break 'read_loop, - Ok(_) => (), - Err(_e) => return Err(BrokenEndpoint), - } - } - let mut monitored = MonitoredReader::from(&self.inbox[..]); - match bincode::deserialize_from(&mut monitored) { - Ok(msg) => { - let msg_size = monitored.bytes_read(); - self.inbox.drain(0..(msg_size.try_into().unwrap())); - Ok(Some(msg)) - } - Err(e) => match *e { - bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => { - Ok(None) - } - _ => Err(MalformedMessage), - // println!("SERDE ERRKIND {:?}", e); - // Err(MalformedMessage) - }, - } - } - fn send(&mut self, msg: &T) -> Result<(), ()> { - bincode::serialize_into(&mut self.stream, msg).map_err(drop) - } -} impl Connector { pub fn get_logger(&self) -> &dyn Logger { &*self.logger @@ -463,16 +317,6 @@ impl Connector { writeln!(lock, "\n\nDEBUG_PRINT:\n{:#?}\n", self).unwrap(); } } -// impl Debug for SolutionStorage { -// fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { -// f.pad("Solutions: [")?; -// for (subtree_id, &index) in self.subtree_id_to_index.iter() { -// let sols = &self.subtree_solutions[index]; -// f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?; -// } -// f.pad("]") -// } -// } impl Predicate { #[inline] pub fn inserted(mut self, k: FiringVar, v: bool) -> Self { @@ -565,21 +409,6 @@ impl Predicate { } } } - - // pub fn iter_matching(&self, value: bool) -> impl Iterator + '_ { - // self.assigned - // .iter() - // .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None }) - // } - - // pub fn batch_assign_nones(&mut self, channel_ids: impl Iterator, value: bool) { - // for channel_id in channel_ids { - // self.assigned.entry(channel_id).or_insert(value); - // } - // } - // pub fn replace_assignment(&mut self, channel_id: PortId, value: bool) -> Option { - // self.assigned.insert(channel_id, value) - // } pub fn union_with(&self, other: &Self) -> Option { let mut res = self.clone(); for (&channel_id, &assignment_1) in other.assigned.iter() {