diff --git a/src/common.rs b/src/common.rs index 21f3d91b2c6977da112626c46870f0687e451248..b6186c5b499b84d41fc7a2ece31ec2618c9ba200 100644 --- a/src/common.rs +++ b/src/common.rs @@ -52,6 +52,10 @@ pub struct PortId(Id); #[derive( Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, )] +pub struct FiringVar(pub(crate) PortId); +#[derive( + Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, +)] pub struct ProtoComponentId(Id); #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] @@ -175,7 +179,12 @@ impl From> for Payload { } impl Debug for PortId { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "PortId({},{})", self.0.controller_id, self.0.u32_suffix) + write!(f, "PID<{},{}>", self.0.controller_id, self.0.u32_suffix) + } +} +impl Debug for FiringVar { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "VID<{},{}>", (self.0).0.controller_id, (self.0).0.u32_suffix) } } impl Debug for ProtoComponentId { diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index b4991f68b89f9b470feb528b563867ed5c1436b1..87f4d4f5c6f37d018cfbcf3056c3dc17705d9a79 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -39,62 +39,6 @@ struct CyclicDrainInner<'a, K: Eq + Hash, V> { } //////////////// -impl NonsyncProtoContext<'_> { - pub fn new_component(&mut self, moved_ports: HashSet, state: ComponentState) { - // called by a PROTO COMPONENT. moves its own ports. - // 1. sanity check: this component owns these ports - log!( - self.logger, - "Component {:?} added new component with state {:?}, moving ports {:?}", - self.proto_component_id, - &state, - &moved_ports - ); - assert!(self.proto_component_ports.is_subset(&moved_ports)); - // 2. remove ports from old component & update port->route - let new_id = self.id_manager.new_proto_component_id(); - for port in moved_ports.iter() { - self.proto_component_ports.remove(port); - self.port_info - .routes - .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id))); - } - // 3. create a new component - self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports })); - } - pub fn new_port_pair(&mut self) -> [PortId; 2] { - // adds two new associated ports, related to each other, and exposed to the proto component - let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()]; - self.proto_component_ports.insert(o); - self.proto_component_ports.insert(i); - // {polarity, peer, route} known. {} unknown. - self.port_info.polarities.insert(o, Putter); - self.port_info.polarities.insert(i, Getter); - self.port_info.peers.insert(o, i); - self.port_info.peers.insert(i, o); - let route = Route::LocalComponent(LocalComponentId::Proto(self.proto_component_id)); - self.port_info.routes.insert(o, route); - self.port_info.routes.insert(i, route); - log!( - self.logger, - "Component {:?} port pair (out->in) {:?} -> {:?}", - self.proto_component_id, - o, - i - ); - [o, i] - } -} -impl SyncProtoContext<'_> { - pub fn is_firing(&mut self, port: PortId) -> Option { - let var = self.port_info.firing_var_for(port); - self.predicate.query(var) - } - pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> { - self.inbox.get(&port) - } -} - impl Connector { pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> { use GottenError::*; @@ -634,7 +578,7 @@ impl BranchingNative { } fn collapse_with(self, solution_predicate: &Predicate) -> (usize, HashMap) { for (branch_predicate, branch) in self.branches { - if branch_predicate.satisfies(solution_predicate) { + if solution_predicate.satisfies(&branch_predicate) { let NativeBranch { index, gotten, .. } = branch; return (index, gotten); } @@ -642,40 +586,6 @@ impl BranchingNative { panic!("Native had no branches matching pred {:?}", solution_predicate); } } - -impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { - fn new( - input: &'a mut HashMap, - swap: &'a mut HashMap, - output: &'a mut HashMap, - ) -> Self { - Self { input, inner: CyclicDrainInner { swap, output } } - } - fn cylic_drain(self, mut func: impl FnMut(K, V, CyclicDrainInner<'_, K, V>)) { - let Self { input, inner: CyclicDrainInner { swap, output } } = self; - // assert!(swap.is_empty()); - while !input.is_empty() { - for (k, v) in input.drain() { - func(k, v, CyclicDrainInner { swap, output }) - } - } - } -} -impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> { - fn add_input(&mut self, k: K, v: V) { - self.swap.insert(k, v); - } - fn add_output(&mut self, k: K, v: V) { - self.output.insert(k, v); - } -} - -impl ProtoComponentBranch { - fn feed_msg(&mut self, getter: PortId, payload: Payload) { - let was = self.inbox.insert(getter, payload); - assert!(was.is_none()) - } -} impl BranchingProtoComponent { fn drain_branches_to_blocked( cd: CyclicDrainer, @@ -693,7 +603,6 @@ impl BranchingProtoComponent { logger, predicate: &predicate, port_info, - proto_component_id, inbox: &branch.inbox, }; let blocker = branch.state.sync_run(&mut ctx, proto_description); @@ -743,7 +652,6 @@ impl BranchingProtoComponent { assert_eq!(Some(&Putter), port_info.polarities.get(&putter)); // overwrite assignment let var = port_info.firing_var_for(putter); - let was = predicate.assigned.insert(var, true); if was == Some(false) { log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); @@ -773,24 +681,36 @@ impl BranchingProtoComponent { getter: PortId, send_payload_msg: SendPayloadMsg, ) { + log!( + logger, + "feeding proto component {:?} getter {:?} {:?}", + proto_component_id, + getter, + &send_payload_msg + ); let BranchingProtoComponent { branches, ports } = self; let mut unblocked = HashMap::default(); let mut blocked = HashMap::default(); // partition drain from branches -> {unblocked, blocked} + log!(logger, "visiting {} blocked branches...", branches.len()); for (predicate, mut branch) in branches.drain() { use CommonSatResult as Csr; + log!(logger, "visiting branch with pred {:?}", &predicate); match predicate.common_satisfier(&send_payload_msg.predicate) { Csr::Nonexistant => { // this branch does not receive the message + log!(logger, "skipping branch"); blocked.insert(predicate, branch); } Csr::Equivalent | Csr::FormerNotLatter => { // retain the existing predicate, but add this payload + log!(logger, "feeding this branch without altering its predicate"); branch.feed_msg(getter, send_payload_msg.payload.clone()); unblocked.insert(predicate, branch); } Csr::LatterNotFormer => { // fork branch, give fork the message and payload predicate. original branch untouched + log!(logger, "Forking this branch, giving it the predicate of the msg"); let mut branch2 = branch.clone(); let predicate2 = send_payload_msg.predicate.clone(); branch2.feed_msg(getter, send_payload_msg.payload.clone()); @@ -799,6 +719,7 @@ impl BranchingProtoComponent { } Csr::New(predicate2) => { // fork branch, give fork the message and the new predicate. original branch untouched + log!(logger, "Forking this branch with new predicate {:?}", &predicate2); let mut branch2 = branch.clone(); branch2.feed_msg(getter, send_payload_msg.payload.clone()); blocked.insert(predicate, branch); @@ -806,6 +727,7 @@ impl BranchingProtoComponent { } } } + log!(logger, "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len()); // drain from unblocked --> blocked let mut swap = HashMap::default(); let cd = CyclicDrainer::new(&mut unblocked, &mut swap, &mut blocked); @@ -821,6 +743,7 @@ impl BranchingProtoComponent { ); // swap the blocked branches back std::mem::swap(&mut blocked, branches); + log!(logger, "component settles down with branches: {:?}", branches.keys()); } fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { let BranchingProtoComponent { ports, branches } = self; @@ -874,11 +797,9 @@ impl SolutionStorage { self.subtree_solutions.push(Default::default()) } } - - pub(crate) fn peek_new_locals(&self) -> impl Iterator + '_ { - self.new_local.iter() - } - + // pub(crate) fn peek_new_locals(&self) -> impl Iterator + '_ { + // self.new_local.iter() + // } pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator + '_ { let Self { old_local, new_local, .. } = self; new_local.drain().map(move |local| { @@ -886,7 +807,6 @@ impl SolutionStorage { local }) } - pub(crate) fn submit_and_digest_subtree_solution( &mut self, logger: &mut dyn Logger, @@ -911,7 +831,6 @@ impl SolutionStorage { ); } } - fn elaborate_into_new_local_rec<'a, 'b>( logger: &mut dyn Logger, partial: Predicate, @@ -942,3 +861,92 @@ impl SolutionStorage { } } } +impl SyncProtoContext<'_> { + pub fn is_firing(&mut self, port: PortId) -> Option { + let var = self.port_info.firing_var_for(port); + self.predicate.query(var) + } + pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> { + self.inbox.get(&port) + } +} +impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> { + fn add_input(&mut self, k: K, v: V) { + self.swap.insert(k, v); + } + fn add_output(&mut self, k: K, v: V) { + self.output.insert(k, v); + } +} +impl NonsyncProtoContext<'_> { + pub fn new_component(&mut self, moved_ports: HashSet, state: ComponentState) { + // called by a PROTO COMPONENT. moves its own ports. + // 1. sanity check: this component owns these ports + log!( + self.logger, + "Component {:?} added new component with state {:?}, moving ports {:?}", + self.proto_component_id, + &state, + &moved_ports + ); + assert!(self.proto_component_ports.is_subset(&moved_ports)); + // 2. remove ports from old component & update port->route + let new_id = self.id_manager.new_proto_component_id(); + for port in moved_ports.iter() { + self.proto_component_ports.remove(port); + self.port_info + .routes + .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id))); + } + // 3. create a new component + self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports })); + } + pub fn new_port_pair(&mut self) -> [PortId; 2] { + // adds two new associated ports, related to each other, and exposed to the proto component + let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()]; + self.proto_component_ports.insert(o); + self.proto_component_ports.insert(i); + // {polarity, peer, route} known. {} unknown. + self.port_info.polarities.insert(o, Putter); + self.port_info.polarities.insert(i, Getter); + self.port_info.peers.insert(o, i); + self.port_info.peers.insert(i, o); + let route = Route::LocalComponent(LocalComponentId::Proto(self.proto_component_id)); + self.port_info.routes.insert(o, route); + self.port_info.routes.insert(i, route); + log!( + self.logger, + "Component {:?} port pair (out->in) {:?} -> {:?}", + self.proto_component_id, + o, + i + ); + [o, i] + } +} +impl ProtoComponentBranch { + fn feed_msg(&mut self, getter: PortId, payload: Payload) { + let was = self.inbox.insert(getter, payload); + assert!(was.is_none()) + } +} + +impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { + fn new( + input: &'a mut HashMap, + swap: &'a mut HashMap, + output: &'a mut HashMap, + ) -> Self { + Self { input, inner: CyclicDrainInner { swap, output } } + } + fn cylic_drain(self, mut func: impl FnMut(K, V, CyclicDrainInner<'_, K, V>)) { + let Self { input, inner: CyclicDrainInner { swap, output } } = self; + // assert!(swap.is_empty()); + while !input.is_empty() { + for (k, v) in input.drain() { + func(k, v, CyclicDrainInner { swap, output }) + } + std::mem::swap(input, swap); + } + } +} diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs new file mode 100644 index 0000000000000000000000000000000000000000..947bbadd2915b8b24b4ff1bafc98fc26844e7297 --- /dev/null +++ b/src/runtime/endpoints.rs @@ -0,0 +1,115 @@ +use super::*; + +struct MonitoredReader { + bytes: usize, + r: R, +} + +///////////////////// + +impl Endpoint { + pub fn try_recv(&mut self) -> Result, EndpointError> { + use EndpointError::*; + // populate inbox as much as possible + 'read_loop: loop { + match self.stream.read_to_end(&mut self.inbox) { + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break 'read_loop, + Ok(0) => break 'read_loop, + Ok(_) => (), + Err(_e) => return Err(BrokenEndpoint), + } + } + let mut monitored = MonitoredReader::from(&self.inbox[..]); + match bincode::deserialize_from(&mut monitored) { + Ok(msg) => { + let msg_size = monitored.bytes_read(); + self.inbox.drain(0..(msg_size.try_into().unwrap())); + Ok(Some(msg)) + } + Err(e) => match *e { + bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => { + Ok(None) + } + _ => Err(MalformedMessage), + // println!("SERDE ERRKIND {:?}", e); + // Err(MalformedMessage) + }, + } + } + pub fn send(&mut self, msg: &T) -> Result<(), ()> { + bincode::serialize_into(&mut self.stream, msg).map_err(drop) + } +} + +impl EndpointManager { + pub fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> { + self.endpoint_exts[index].endpoint.send(msg) + } + pub fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> { + use TryRecyAnyError::*; + // 1. try messages already buffered + if let Some(x) = self.undelayed_messages.pop() { + return Ok(x); + } + loop { + // 2. try read a message from an endpoint that raised an event with poll() but wasn't drained + while let Some(index) = self.polled_undrained.pop() { + let endpoint = &mut self.endpoint_exts[index].endpoint; + if let Some(msg) = + endpoint.try_recv().map_err(|error| EndpointError { error, index })? + { + if !endpoint.inbox.is_empty() { + // there may be another message waiting! + self.polled_undrained.insert(index); + } + return Ok((index, msg)); + } + } + // 3. No message yet. Do we have enough time to poll? + let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; + self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?; + for event in self.events.iter() { + let Token(index) = event.token(); + self.polled_undrained.insert(index); + } + self.events.clear(); + } + } + pub fn undelay_all(&mut self) { + if self.undelayed_messages.is_empty() { + // fast path + std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages); + return; + } + // slow path + self.undelayed_messages.extend(self.delayed_messages.drain(..)); + } +} +impl Debug for Endpoint { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + f.debug_struct("Endpoint").field("inbox", &self.inbox).finish() + } +} +impl From for MonitoredReader { + fn from(r: R) -> Self { + Self { r, bytes: 0 } + } +} +impl MonitoredReader { + pub fn bytes_read(&self) -> usize { + self.bytes + } +} +impl Read for MonitoredReader { + fn read(&mut self, buf: &mut [u8]) -> Result { + let n = self.r.read(buf)?; + self.bytes += n; + Ok(n) + } +} + +impl Into for SetupMsg { + fn into(self) -> Msg { + Msg::SetupMsg(self) + } +} diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 6d98eebb584cd9b32dfde5c9b2549072669cceb8..a6192caa1e694f193953d0b684cb38e368306f73 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -1,6 +1,7 @@ mod communication; +mod endpoints; pub mod error; -mod setup2; +mod setup; #[cfg(test)] mod tests; @@ -8,10 +9,6 @@ mod tests; use crate::common::*; use error::*; -#[derive( - Debug, Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, -)] -pub struct FiringVar(PortId); #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] pub enum LocalComponentId { Native, @@ -81,6 +78,10 @@ pub trait Logger: Debug { fn line_writer(&mut self) -> &mut dyn std::fmt::Write; fn dump_log(&self, w: &mut dyn std::io::Write); } +#[derive(Debug)] +pub struct StringLogger(ControllerId, String); +#[derive(Debug)] +pub struct DummyLogger; #[derive(Debug, Clone)] pub struct EndpointSetup { pub sock_addr: SocketAddr, @@ -150,8 +151,6 @@ pub enum ConnectorPhased { round_result: Result)>, SyncError>, }, } -#[derive(Debug)] -pub struct StringLogger(ControllerId, String); #[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] pub struct Predicate { pub assigned: BTreeMap, @@ -162,10 +161,6 @@ pub struct NativeBatch { to_put: HashMap, to_get: HashSet, } -pub struct MonitoredReader { - bytes: usize, - r: R, -} pub struct NonsyncProtoContext<'a> { logger: &'a mut dyn Logger, proto_component_id: ProtoComponentId, @@ -177,59 +172,9 @@ pub struct NonsyncProtoContext<'a> { pub struct SyncProtoContext<'a> { logger: &'a mut dyn Logger, predicate: &'a Predicate, - proto_component_id: ProtoComponentId, port_info: &'a PortInfo, inbox: &'a HashMap, } - -// pub struct MonoPContext<'a> { -// inner: &'a mut ControllerInner, -// ports: &'a mut HashSet, -// mono_ps: &'a mut Vec, -// } -// pub struct PolyPContext<'a> { -// my_subtree_id: SubtreeId, -// inner: &'a mut Connector, -// solution_storage: &'a mut SolutionStorage, -// } -// impl PolyPContext<'_> { -// #[inline(always)] -// fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> { -// let Self { solution_storage, my_subtree_id, inner } = self; -// PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner } -// } -// } -// struct BranchPContext<'m, 'r> { -// m_ctx: PolyPContext<'m>, -// ports: &'r HashSet, -// predicate: &'r Predicate, -// inbox: &'r HashMap, -// } - -// #[derive(Debug)] -// pub enum SyncRunResult { -// BlockingForRecv, -// AllBranchesComplete, -// NoBranches, -// } -// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -// pub enum PolyId { -// N, -// P { index: usize }, -// } - -// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -// pub enum SubtreeId { -// PolyN, -// PolyP { index: usize }, -// ChildController { port: PortId }, -// } -// #[derive(Debug)] -// pub struct NativeBranch { -// gotten: HashMap, -// to_get: HashSet, -// } - //////////////// impl PortInfo { fn firing_var_for(&self, port: PortId) -> FiringVar { @@ -314,76 +259,16 @@ impl Connector { Ok(()) } } -impl EndpointManager { - fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> { - self.endpoint_exts[index].endpoint.send(msg) - } - fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> { - use TryRecyAnyError::*; - // 1. try messages already buffered - if let Some(x) = self.undelayed_messages.pop() { - return Ok(x); - } - loop { - // 2. try read a message from an endpoint that raised an event with poll() but wasn't drained - while let Some(index) = self.polled_undrained.pop() { - let endpoint = &mut self.endpoint_exts[index].endpoint; - if let Some(msg) = - endpoint.try_recv().map_err(|error| EndpointError { error, index })? - { - if !endpoint.inbox.is_empty() { - // there may be another message waiting! - self.polled_undrained.insert(index); - } - return Ok((index, msg)); - } - } - // 3. No message yet. Do we have enough time to poll? - let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; - self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?; - for event in self.events.iter() { - let Token(index) = event.token(); - self.polled_undrained.insert(index); +impl Logger for DummyLogger { + fn line_writer(&mut self) -> &mut dyn std::fmt::Write { + impl std::fmt::Write for DummyLogger { + fn write_str(&mut self, _: &str) -> Result<(), std::fmt::Error> { + Ok(()) } - self.events.clear(); - } - } - fn undelay_all(&mut self) { - if self.undelayed_messages.is_empty() { - // fast path - std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages); - return; } - // slow path - self.undelayed_messages.extend(self.delayed_messages.drain(..)); - } -} -impl Debug for Endpoint { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.debug_struct("Endpoint").field("inbox", &self.inbox).finish() - } -} -impl From for MonitoredReader { - fn from(r: R) -> Self { - Self { r, bytes: 0 } - } -} -impl MonitoredReader { - pub fn bytes_read(&self) -> usize { - self.bytes - } -} -impl Read for MonitoredReader { - fn read(&mut self, buf: &mut [u8]) -> Result { - let n = self.r.read(buf)?; - self.bytes += n; - Ok(n) - } -} -impl Into for SetupMsg { - fn into(self) -> Msg { - Msg::SetupMsg(self) + self } + fn dump_log(&self, _: &mut dyn std::io::Write) {} } impl StringLogger { pub fn new(controller_id: ControllerId) -> Self { @@ -392,10 +277,12 @@ impl StringLogger { } impl Drop for StringLogger { fn drop(&mut self) { - let stdout = std::io::stdout(); + let stdout = std::io::stderr(); let mut lock = stdout.lock(); writeln!(lock, "--- DROP LOG DUMP ---").unwrap(); self.dump_log(&mut lock); + // lock.flush().unwrap(); + // std::thread::sleep(Duration::from_millis(50)); } } impl Logger for StringLogger { @@ -413,39 +300,6 @@ impl std::fmt::Write for StringLogger { self.1.write_str(s) } } -impl Endpoint { - fn try_recv(&mut self) -> Result, EndpointError> { - use EndpointError::*; - // populate inbox as much as possible - 'read_loop: loop { - match self.stream.read_to_end(&mut self.inbox) { - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break 'read_loop, - Ok(0) => break 'read_loop, - Ok(_) => (), - Err(_e) => return Err(BrokenEndpoint), - } - } - let mut monitored = MonitoredReader::from(&self.inbox[..]); - match bincode::deserialize_from(&mut monitored) { - Ok(msg) => { - let msg_size = monitored.bytes_read(); - self.inbox.drain(0..(msg_size.try_into().unwrap())); - Ok(Some(msg)) - } - Err(e) => match *e { - bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => { - Ok(None) - } - _ => Err(MalformedMessage), - // println!("SERDE ERRKIND {:?}", e); - // Err(MalformedMessage) - }, - } - } - fn send(&mut self, msg: &T) -> Result<(), ()> { - bincode::serialize_into(&mut self.stream, msg).map_err(drop) - } -} impl Connector { pub fn get_logger(&self) -> &dyn Logger { &*self.logger @@ -463,16 +317,6 @@ impl Connector { writeln!(lock, "\n\nDEBUG_PRINT:\n{:#?}\n", self).unwrap(); } } -// impl Debug for SolutionStorage { -// fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { -// f.pad("Solutions: [")?; -// for (subtree_id, &index) in self.subtree_id_to_index.iter() { -// let sols = &self.subtree_solutions[index]; -// f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?; -// } -// f.pad("]") -// } -// } impl Predicate { #[inline] pub fn inserted(mut self, k: FiringVar, v: bool) -> Self { @@ -565,21 +409,6 @@ impl Predicate { } } } - - // pub fn iter_matching(&self, value: bool) -> impl Iterator + '_ { - // self.assigned - // .iter() - // .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None }) - // } - - // pub fn batch_assign_nones(&mut self, channel_ids: impl Iterator, value: bool) { - // for channel_id in channel_ids { - // self.assigned.entry(channel_id).or_insert(value); - // } - // } - // pub fn replace_assignment(&mut self, channel_id: PortId, value: bool) -> Option { - // self.assigned.insert(channel_id, value) - // } pub fn union_with(&self, other: &Self) -> Option { let mut res = self.clone(); for (&channel_id, &assignment_1) in other.assigned.iter() { diff --git a/src/runtime/setup2.rs b/src/runtime/setup.rs similarity index 99% rename from src/runtime/setup2.rs rename to src/runtime/setup.rs index 2aa51de799e5e414ae44b4391db92f44cfc1a45d..f6233c1b4a40ea2bfdd0b6239a8fb31401f46eb1 100644 --- a/src/runtime/setup2.rs +++ b/src/runtime/setup.rs @@ -7,6 +7,7 @@ impl Connector { controller_id: ControllerId, ) -> Self { let logger = Box::new(StringLogger::new(controller_id)); + // let logger = Box::new(DummyLogger); let surplus_sockets = 8; Self::new(logger, proto_description, controller_id, surplus_sockets) } diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index 38ba52506a6eb324a609614d931f189b3d92cd98..89eec1b24bdaf8969138bb3edabbbf96f9ac38d6 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -21,8 +21,7 @@ lazy_static::lazy_static! { #[test] fn simple_connector() { - let c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); - println!("{:#?}", c); + Connector::new_simple(MINIMAL_PROTO.clone(), 0); } #[test] @@ -30,7 +29,6 @@ fn new_port_pair() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let [_, _] = c.new_port_pair(); let [_, _] = c.new_port_pair(); - println!("{:#?}", c); } #[test] @@ -38,7 +36,6 @@ fn new_sync() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let [o, i] = c.new_port_pair(); c.add_component(b"sync", &[i, o]).unwrap(); - println!("{:#?}", c); } #[test] @@ -47,14 +44,12 @@ fn new_net_port() { let sock_addr = next_test_addr(); let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap(); let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap(); - println!("{:#?}", c); } #[test] fn trivial_connect() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); c.connect(Duration::from_secs(1)).unwrap(); - println!("{:#?}", c); } #[test] @@ -63,10 +58,7 @@ fn single_node_connect() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap(); let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap(); - let res = c.connect(Duration::from_secs(1)); - println!("{:#?}", c); - c.get_logger().dump_log(&mut std::io::stdout().lock()); - res.unwrap(); + c.connect(Duration::from_secs(1)).unwrap(); } #[test] @@ -77,13 +69,11 @@ fn multithreaded_connect() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap(); c.connect(Duration::from_secs(1)).unwrap(); - c.print_state(); }); s.spawn(|_| { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap(); c.connect(Duration::from_secs(1)).unwrap(); - c.print_state(); }); }) .unwrap(); @@ -119,7 +109,6 @@ fn trivial_sync() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); c.connect(Duration::from_secs(1)).unwrap(); c.sync(Duration::from_secs(1)).unwrap(); - c.print_state(); } #[test] @@ -250,12 +239,12 @@ fn connector_pair_nondet() { #[test] fn cannot_use_moved_ports() { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); - let [p, g] = c.new_port_pair(); - c.add_component(b"sync", &[g, p]).unwrap(); /* native p|-->|g sync */ + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); + let [p, g] = c.new_port_pair(); + c.add_component(b"sync", &[g, p]).unwrap(); c.connect(Duration::from_secs(1)).unwrap(); c.put(p, (b"hello" as &[_]).into()).unwrap_err(); c.get(g).unwrap_err(); @@ -263,17 +252,103 @@ fn cannot_use_moved_ports() { #[test] fn sync_sync() { - let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); - let [p0, g0] = c.new_port_pair(); - let [p1, g1] = c.new_port_pair(); - c.add_component(b"sync", &[g0, p1]).unwrap(); /* native p0|-->|g0 sync g1|<--|p1 */ + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let [p0, g0] = c.new_port_pair(); + let [p1, g1] = c.new_port_pair(); + c.add_component(b"sync", &[g0, p1]).unwrap(); c.connect(Duration::from_secs(1)).unwrap(); c.put(p0, (b"hello" as &[_]).into()).unwrap(); c.get(g1).unwrap(); c.sync(Duration::from_secs(1)).unwrap(); c.gotten(g1).unwrap(); } + +#[test] +fn double_net_connect() { + let sock_addrs = [next_test_addr(), next_test_addr()]; + scope(|s| { + s.spawn(|_| { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 2); + let [_p, _g] = [ + c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[0], is_active: true }) + .unwrap(), + c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[1], is_active: true }) + .unwrap(), + ]; + c.connect(Duration::from_secs(1)).unwrap(); + }); + s.spawn(|_| { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 3); + let [_g, _p] = [ + c.new_net_port( + Getter, + EndpointSetup { sock_addr: sock_addrs[0], is_active: false }, + ) + .unwrap(), + c.new_net_port( + Putter, + EndpointSetup { sock_addr: sock_addrs[1], is_active: false }, + ) + .unwrap(), + ]; + c.connect(Duration::from_secs(1)).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn distributed_msg_bounce() { + /* + native[0] | sync 0.p|-->|1.p native[1] + 0.g|<--|1.g + */ + let sock_addrs = [next_test_addr(), next_test_addr()]; + scope(|s| { + s.spawn(|_| { + /* + native | sync p|--> + | g|<-- + */ + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 4); + let [p, g] = [ + c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[0], is_active: true }) + .unwrap(), + c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[1], is_active: true }) + .unwrap(), + ]; + c.add_component(b"sync", &[g, p]).unwrap(); + c.connect(Duration::from_secs(1)).unwrap(); + c.sync(Duration::from_secs(1)).unwrap(); + }); + s.spawn(|_| { + /* + native p|--> + g|<-- + */ + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 5); + let [g, p] = [ + c.new_net_port( + Getter, + EndpointSetup { sock_addr: sock_addrs[0], is_active: false }, + ) + .unwrap(), + c.new_net_port( + Putter, + EndpointSetup { sock_addr: sock_addrs[1], is_active: false }, + ) + .unwrap(), + ]; + c.connect(Duration::from_secs(1)).unwrap(); + c.put(p, (b"hello" as &[_]).into()).unwrap(); + c.get(g).unwrap(); + c.sync(Duration::from_secs(1)).unwrap(); + c.gotten(g).unwrap(); + }); + }) + .unwrap(); +}