From 330b9c117fa53f5bbb0c89f341ea0d03ba9754b3 2020-09-17 14:27:27 From: Christopher Esterhuyse Date: 2020-09-17 14:27:27 Subject: [PATCH] started refactor (safe state). goals: clarify ownership. make cu immutable while round is fallable. distinguish routing from ownership --- diff --git a/examples/bench_3/getter.c b/examples/bench_3/getter.c index 7e9de8a9e4c753780e21df16a34ac328b7a21452..47b760d78abc674552a4e0a4c0f383c1603d7d17 100644 --- a/examples/bench_3/getter.c +++ b/examples/bench_3/getter.c @@ -7,7 +7,7 @@ int main(int argc, char** argv) { rw_err_peek(c); PortId getter; - FfiSocketAddr addr = {{192, 168, 1, 124}, 8009}; + FfiSocketAddr addr = {{127, 0, 0, 1}, 8001}; rw_err_peek(c); connector_add_net_port(c, &getter, addr, Polarity_Getter, EndpointPolarity_Passive); connector_connect(c, -1); diff --git a/examples/bench_3/putter.c b/examples/bench_3/putter.c index 4d08f71ca2e9a68570db242a08d39b62619a5db0..3103eb026578b83551f15ad17b9f45fb0582b9f7 100644 --- a/examples/bench_3/putter.c +++ b/examples/bench_3/putter.c @@ -7,7 +7,7 @@ int main(int argc, char** argv) { rw_err_peek(c); PortId putter; - FfiSocketAddr addr = {{192, 168, 1, 124}, 8009}; + FfiSocketAddr addr = {{127, 0, 0, 1}, 8001}; rw_err_peek(c); connector_add_net_port(c, &putter, addr, Polarity_Putter, EndpointPolarity_Active); connector_connect(c, -1); diff --git a/src/common.rs b/src/common.rs index 4a9fc999d47977834f69e25ca118e5462a7356d5..dbdbdc02169b6deb0cd7dff6225728b2affb08f1 100644 --- a/src/common.rs +++ b/src/common.rs @@ -34,8 +34,8 @@ pub type U32Suffix = u32; #[derive( Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, )] -// acquired via error in the Rust API -pub struct ProtoComponentId(Id); +// pub, because it can be acquired via error in the Rust API +pub struct ComponentId(Id); #[derive( Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, )] @@ -99,7 +99,7 @@ impl IdParts for PortId { self.0.id_parts() } } 
-impl IdParts for ProtoComponentId { +impl IdParts for ComponentId { fn id_parts(self) -> (ConnectorId, U32Suffix) { self.0.id_parts() } @@ -122,8 +122,8 @@ impl From for PortId { Self(id) } } -impl From for ProtoComponentId { - fn from(id: Id) -> ProtoComponentId { +impl From for ComponentId { + fn from(id: Id) -> Self { Self(id) } } @@ -184,7 +184,7 @@ impl Debug for PortId { write!(f, "pid{}_{}", a, b) } } -impl Debug for ProtoComponentId { +impl Debug for ComponentId { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { let (a, b) = self.id_parts(); write!(f, "cid{}_{}", a, b) diff --git a/src/macros.rs b/src/macros.rs index ec1917ee0461819d67d420c31e395d5bc7dcac36..ae727cd4f73bb7e2625a5aebd65d4ad43ac0d84e 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -4,6 +4,11 @@ Change the definition of these macros to control the logging level statically macro_rules! log { (@BENCH, $logger:expr, $($arg:tt)*) => {{ + // if let Some(w) = $logger.line_writer() { + // let _ = writeln!(w, $($arg)*); + // } + }}; + (@MARK, $logger:expr, $($arg:tt)*) => {{ if let Some(w) = $logger.line_writer() { let _ = writeln!(w, $($arg)*); } diff --git a/src/protocol/eval.rs b/src/protocol/eval.rs index 98ac5816ce85366414d4bd5075963fe5bd1b81eb..4e77eec893d793d355ee1b192726e82ebbbe8577 100644 --- a/src/protocol/eval.rs +++ b/src/protocol/eval.rs @@ -1533,6 +1533,7 @@ impl Store { BinaryOperator::GreaterThan => Ok(left.gt(&right)), BinaryOperator::GreaterThanEqual => Ok(left.gte(&right)), BinaryOperator::Remainder => Ok(left.modulus(&right)), + BinaryOperator::Add => Ok(left.plus(&right)), _ => unimplemented!("{:?}", expr.operation), } } diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 09c7fb9545a9b9275a89efef775b0cb6370d58de..22834bbf2cc17c68fe44c83e4de4b5ccd0f0a8c2 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -203,6 +203,8 @@ impl Connector { ////////////////////////////////// use SyncError as Se; 
////////////////////////////////// + + log!(@MARK, cu.inner.logger, "sync start {}", comm.round_index); log!( cu.inner.logger, "~~~ SYNC called with timeout {:?}; starting round {}", @@ -212,15 +214,17 @@ impl Connector { log!(@BENCH, cu.inner.logger, ""); // 1. run all proto components to Nonsync blockers - // NOTE: original components are immutable until Decision::Success + // iterate let mut branching_proto_components = - HashMap::::default(); - let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> = - cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect(); + HashMap::::default(); + let mut unrun_components: Vec<(ComponentId, ProtoComponent)> = cu + .proto_components + .iter() + .map(|(&proto_id, proto)| (proto_id, proto.clone())) + .collect(); log!(cu.inner.logger, "Nonsync running {} proto components...", unrun_components.len()); // drains unrun_components, and populates branching_proto_components. while let Some((proto_component_id, mut component)) = unrun_components.pop() { - // TODO coalesce fields log!( cu.inner.logger, "Nonsync running proto component with ID {:?}. 
{} to go after this", @@ -264,11 +268,8 @@ impl Connector { // Create temp structures needed for the synchronous phase of the round let mut rctx = RoundCtx { solution_storage: { - let n = std::iter::once(SubtreeId::LocalComponent(ComponentId::Native)); - let c = cu - .proto_components - .keys() - .map(|&id| SubtreeId::LocalComponent(ComponentId::Proto(id))); + let n = std::iter::once(SubtreeId::LocalComponent(cu.inner.native_component_id)); + let c = cu.proto_components.keys().map(|&cid| SubtreeId::LocalComponent(cid)); let e = comm .neighborhood .children @@ -350,7 +351,7 @@ impl Connector { ); rctx.solution_storage.submit_and_digest_subtree_solution( &mut *cu.inner.logger, - SubtreeId::LocalComponent(ComponentId::Native), + SubtreeId::LocalComponent(cu.inner.native_component_id), predicate.clone(), ); } @@ -371,6 +372,7 @@ impl Connector { &mut branching_proto_components, &mut rctx, )?; + log!(@MARK, cu.inner.logger, "got decision!"); log!(cu.inner.logger, "Committing to decision {:?}!", &decision); log!(@BENCH, cu.inner.logger, ""); comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.inner.logger, &decision)?; @@ -388,6 +390,7 @@ impl Connector { &msg, &comm.neighborhood.children ); + log!(@MARK, cu.inner.logger, "forwarding decision!"); for &child in comm.neighborhood.children.iter() { comm.endpoint_manager.send_to_comms(child, &msg)?; } @@ -423,9 +426,10 @@ impl Connector { cu: &mut ConnectorUnphased, comm: &mut ConnectorCommunication, branching_native: &mut BranchingNative, - branching_proto_components: &mut HashMap, + branching_proto_components: &mut HashMap, rctx: &mut RoundCtx, ) -> Result { + log!(@MARK, cu.inner.logger, "decide start"); let mut already_requested_failure = false; if branching_native.branches.is_empty() { log!(cu.inner.logger, "Native starts with no branches! 
Failure!"); @@ -443,8 +447,6 @@ impl Connector { } } } - log!(cu.inner.logger, "Done translating native batches into branches"); - let mut pcb_temps_owner = <[HashMap; 3]>::default(); let mut pcb_temps = MapTempsGuard(&mut pcb_temps_owner); let mut bn_temp_owner = >::default(); @@ -498,6 +500,7 @@ impl Connector { rctx.getter_buffer.len() ); while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() { + log!(@MARK, cu.inner.logger, "handling payload msg"); assert!(cu.inner.port_info.polarities.get(&getter).copied() == Some(Getter)); let route = cu.inner.port_info.routes.get(&getter); log!( @@ -517,38 +520,35 @@ impl Connector { udp_endpoint_ext.outgoing_payloads.insert(predicate, payload); } Some(Route::NetEndpoint { index }) => { + log!(@MARK, cu.inner.logger, "sending payload"); let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, contents: CommMsgContents::SendPayload(send_payload_msg), }); comm.endpoint_manager.send_to_comms(*index, &msg)?; } - Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg( - cu, - rctx, - getter, - &send_payload_msg, - MapTempGuard::new(&mut bn_temp_owner), - ), - Some(Route::LocalComponent(ComponentId::Proto(proto_component_id))) => { - if let Some(branching_component) = - branching_proto_components.get_mut(proto_component_id) - { - let proto_component_id = *proto_component_id; + Some(Route::LocalComponent(cid)) if *cid == cu.inner.native_component_id => { + branching_native.feed_msg( + cu, + rctx, + getter, + &send_payload_msg, + MapTempGuard::new(&mut bn_temp_owner), + ) + } + Some(Route::LocalComponent(cid)) => { + if let Some(branching_component) = branching_proto_components.get_mut(cid) { + let cid = *cid; branching_component.feed_msg( cu, rctx, - proto_component_id, + cid, getter, &send_payload_msg, pcb_temps.reborrow(), )?; if branching_component.branches.is_empty() { - log!( - cu.inner.logger, - "{:?} has become inconsistent!", - proto_component_id - ); + log!(cu.inner.logger, 
"{:?} has become inconsistent!", cid); if let Some(parent) = comm.neighborhood.parent { if already_requested_failure.replace_with_true() { Self::request_failure(cu, comm, parent)? @@ -577,6 +577,7 @@ impl Connector { log!(cu.inner.logger, "Check if we have any local decisions..."); for solution in rctx.solution_storage.iter_new_local_make_old() { log!(cu.inner.logger, "New local decision with solution {:?}...", &solution); + log!(@MARK, cu.inner.logger, "local solution"); match comm.neighborhood.parent { Some(parent) => { log!(cu.inner.logger, "Forwarding to my parent {:?}", parent); @@ -743,7 +744,7 @@ impl BranchingNative { &predicate, &branch.gotten ); - let subtree_id = SubtreeId::LocalComponent(ComponentId::Native); + let subtree_id = SubtreeId::LocalComponent(cu.inner.native_component_id); round_ctx.solution_storage.submit_and_digest_subtree_solution( &mut *cu.inner.logger, subtree_id, @@ -869,7 +870,7 @@ impl BranchingProtoComponent { cd: CyclicDrainer, cu: &mut ConnectorUnphased, rctx: &mut RoundCtx, - proto_component_id: ProtoComponentId, + proto_component_id: ComponentId, ports: &HashSet, ) -> Result<(), UnrecoverableSyncError> { cd.cyclic_drain(|mut predicate, mut branch, mut drainer| { @@ -918,13 +919,15 @@ impl BranchingProtoComponent { let var = cu.inner.port_info.spec_var_for(putter); let was = predicate.assigned.insert(var, SpecVal::FIRING); if was == Some(SpecVal::SILENT) { - log!(cu.inner.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); + log!(cu.inner.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). 
inconsistent!", + proto_component_id, putter, var); // discard forever drop((predicate, branch)); } else { // keep in "unblocked" branch.inner.did_put_or_get.insert(putter); - log!(cu.inner.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); + log!(cu.inner.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", + proto_component_id, &payload, putter, var); let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; rctx.getter_buffer.putter_add(cu, putter, msg); drainer.add_input(predicate, branch); @@ -938,14 +941,15 @@ impl BranchingProtoComponent { let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT); let did_fire = val == SpecVal::FIRING; if did_fire != should_have_fired { - log!(cu.inner.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired); + log!(cu.inner.logger, "Inconsistent wrt. 
port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", + port, var, val, did_fire, should_have_fired); // IMPLICIT inconsistency drop((predicate, branch)); return Ok(()); } } // submit solution for this component - let subtree_id = SubtreeId::LocalComponent(ComponentId::Proto(proto_component_id)); + let subtree_id = SubtreeId::LocalComponent(proto_component_id); rctx.solution_storage.submit_and_digest_subtree_solution( &mut *cu.inner.logger, subtree_id, @@ -973,7 +977,7 @@ impl BranchingProtoComponent { &mut self, cu: &mut ConnectorUnphased, rctx: &mut RoundCtx, - proto_component_id: ProtoComponentId, + proto_component_id: ComponentId, getter: PortId, send_payload_msg: &SendPayloadMsg, pcb_temps: MapTempsGuard<'_, Predicate, ProtoComponentBranch>, @@ -1218,19 +1222,6 @@ impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> { fn add_input(&mut self, k: K, v: V) { self.swap.insert(k, v); } - // fn merge_input_with V>(&mut self, k: K, v: V, mut func: F) { - // use std::collections::hash_map::Entry; - // let e = self.swap.entry(k); - // match e { - // Entry::Vacant(ev) => { - // ev.insert(v); - // } - // Entry::Occupied(mut eo) => { - // let old = eo.get_mut(); - // *old = func(v, old); - // } - // } - // } fn add_output(&mut self, k: K, v: V) { self.output.insert(k, v); } @@ -1246,15 +1237,13 @@ impl NonsyncProtoContext<'_> { &state, &moved_ports ); - assert!(self.proto_component_ports.is_subset(&moved_ports)); + println!("MOVED PORTS {:#?}. got {:#?}", &moved_ports, &self.proto_component_ports); + assert!(self.proto_component_ports.is_superset(&moved_ports)); // 2. 
remove ports from old component & update port->route - let new_id = self.cu_inner.id_manager.new_proto_component_id(); + let new_id = self.cu_inner.id_manager.new_component_id(); for port in moved_ports.iter() { self.proto_component_ports.remove(port); - self.cu_inner - .port_info - .routes - .insert(*port, Route::LocalComponent(ComponentId::Proto(new_id))); + self.cu_inner.port_info.routes.insert(*port, Route::LocalComponent(new_id)); } // 3. create a new component self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports })); @@ -1270,7 +1259,7 @@ impl NonsyncProtoContext<'_> { self.cu_inner.port_info.polarities.insert(i, Getter); self.cu_inner.port_info.peers.insert(o, i); self.cu_inner.port_info.peers.insert(i, o); - let route = Route::LocalComponent(ComponentId::Proto(self.proto_component_id)); + let route = Route::LocalComponent(self.proto_component_id); self.cu_inner.port_info.routes.insert(o, route); self.cu_inner.port_info.routes.insert(i, route); log!( diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index abce03511bc17e7038a05971f2fac31f116fd564..661b55a5a5dd05e9992fd89087e29bb40cf879d3 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -73,7 +73,9 @@ impl NetEndpoint { use NetEndpointError as Nee; Self::bincode_opts() .serialize_into(&mut self.stream, msg) - .map_err(|_| Nee::BrokenNetEndpoint) + .map_err(|_| Nee::BrokenNetEndpoint)?; + let _ = self.stream.flush(); + Ok(()) } } @@ -379,7 +381,19 @@ impl EndpointManager { } impl Debug for NetEndpoint { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.debug_struct("Endpoint").field("inbox", &self.inbox).finish() + struct DebugStream<'a>(&'a TcpStream); + impl Debug for DebugStream<'_> { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + f.debug_struct("Endpoint") + .field("local_addr", &self.0.local_addr()) + .field("peer_addr", &self.0.peer_addr()) + .finish() + } + } + f.debug_struct("Endpoint") + .field("inbox", &self.inbox) + 
.field("stream", &DebugStream(&self.stream)) + .finish() } } impl From for MonitoredReader { diff --git a/src/runtime/error.rs b/src/runtime/error.rs index 16322977191072c35e6259929d78cf68be9fb515..231fa8a7e0d0794c4cac69f1bb886e6d478407e9 100644 --- a/src/runtime/error.rs +++ b/src/runtime/error.rs @@ -34,7 +34,7 @@ pub enum UnrecoverableSyncError { #[derive(Debug, Clone)] pub enum SyncError { NotConnected, - InconsistentProtoComponent(ProtoComponentId), + InconsistentProtoComponent(ComponentId), RoundFailure, Unrecoverable(UnrecoverableSyncError), } diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 91b233b0363546af1d305a65d0f98362829e89a4..b089a69e4a8f084681e49dbd1438dc383614e707 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -32,8 +32,8 @@ pub struct FileLogger(ConnectorId, std::fs::File); pub(crate) struct NonsyncProtoContext<'a> { cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds proto_component_ports: &'a mut HashSet, // sub-structure of component - unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>, // lives for Nonsync phase - proto_component_id: ProtoComponentId, // KEY in id->component map + unrun_components: &'a mut Vec<(ComponentId, ProtoComponent)>, // lives for Nonsync phase + proto_component_id: ComponentId, // KEY in id->component map } pub(crate) struct SyncProtoContext<'a> { cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds @@ -65,11 +65,6 @@ struct VecSet { vec: Vec, } #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] -enum ComponentId { - Native, - Proto(ProtoComponentId), -} -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] enum Route { LocalComponent(ComponentId), NetEndpoint { index: usize }, @@ -109,7 +104,7 @@ struct SessionInfo { serde_proto_description: SerdeProtocolDescription, port_info: PortInfo, endpoint_incoming_to_getter: Vec, - proto_components: HashMap, + proto_components: 
HashMap, } #[derive(Debug, Clone)] struct SerdeProtocolDescription(Arc); @@ -184,9 +179,8 @@ struct Neighborhood { struct IdManager { connector_id: ConnectorId, port_suffix_stream: U32Stream, - proto_component_suffix_stream: U32Stream, + component_suffix_stream: U32Stream, } -#[derive(Debug)] struct UdpInBuffer { byte_vec: Vec, } @@ -215,6 +209,7 @@ struct EndpointStore { } #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] struct PortInfo { + owners: HashMap, polarities: HashMap, peers: HashMap, routes: HashMap, @@ -230,7 +225,7 @@ struct ConnectorCommunication { #[derive(Debug)] struct ConnectorUnphased { proto_description: Arc, - proto_components: HashMap, + proto_components: HashMap, inner: ConnectorUnphasedInner, } #[derive(Debug)] @@ -239,6 +234,7 @@ struct ConnectorUnphasedInner { id_manager: IdManager, native_ports: HashSet, port_info: PortInfo, + native_component_id: ComponentId, } #[derive(Debug)] struct ConnectorSetup { @@ -350,7 +346,7 @@ impl IdManager { Self { connector_id, port_suffix_stream: Default::default(), - proto_component_suffix_stream: Default::default(), + component_suffix_stream: Default::default(), } } fn new_spec_var_stream(&self) -> SpecVarStream { @@ -365,12 +361,9 @@ impl IdManager { fn new_port_id(&mut self) -> PortId { Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into() } - fn new_proto_component_id(&mut self) -> ProtoComponentId { - Id { - connector_id: self.connector_id, - u32_suffix: self.proto_component_suffix_stream.next(), - } - .into() + fn new_component_id(&mut self) -> ComponentId { + Id { connector_id: self.connector_id, u32_suffix: self.component_suffix_stream.next() } + .into() } } impl Drop for Connector { @@ -418,7 +411,7 @@ impl Connector { cu.inner.port_info.polarities.insert(i, Getter); cu.inner.port_info.peers.insert(o, i); cu.inner.port_info.peers.insert(i, o); - let route = Route::LocalComponent(ComponentId::Native); + let route = 
Route::LocalComponent(cu.inner.native_component_id); cu.inner.port_info.routes.insert(o, route); cu.inner.port_info.routes.insert(i, route); log!(cu.inner.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); @@ -446,17 +439,14 @@ impl Connector { } } // 3. remove ports from old component & update port->route - let new_id = cu.inner.id_manager.new_proto_component_id(); + let new_cid = cu.inner.id_manager.new_component_id(); for port in ports.iter() { - cu.inner - .port_info - .routes - .insert(*port, Route::LocalComponent(ComponentId::Proto(new_id))); + cu.inner.port_info.routes.insert(*port, Route::LocalComponent(new_cid)); } cu.inner.native_ports.retain(|port| !ports.contains(port)); // 4. add new component cu.proto_components.insert( - new_id, + new_cid, ProtoComponent { state: cu.proto_description.new_main_component(identifier, ports), ports: ports.iter().copied().collect(), @@ -650,3 +640,9 @@ impl UdpInBuffer { self.byte_vec.as_mut_slice() } } + +impl Debug for UdpInBuffer { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "UdpInBuffer") + } +} diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 476bdb1ca51f259bf91cbcf905ff6edcc70109d4..ed9962ed069473e98fe41613a225d9da56c1b91b 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -8,13 +8,15 @@ impl Connector { connector_id: ConnectorId, ) -> Self { log!(&mut *logger, "Created with connector_id {:?}", connector_id); + let mut id_manager = IdManager::new(connector_id); Self { unphased: ConnectorUnphased { proto_description, proto_components: Default::default(), inner: ConnectorUnphasedInner { logger, - id_manager: IdManager::new(connector_id), + native_component_id: id_manager.new_component_id(), + id_manager, native_ports: Default::default(), port_info: Default::default(), }, @@ -52,8 +54,14 @@ impl Connector { cu.inner.port_info.peers.insert(nout, uin); cu.inner.port_info.peers.insert(uin, nout); cu.inner.port_info.peers.insert(uout, nin); - 
cu.inner.port_info.routes.insert(nin, Route::LocalComponent(ComponentId::Native)); - cu.inner.port_info.routes.insert(nout, Route::LocalComponent(ComponentId::Native)); + cu.inner + .port_info + .routes + .insert(nin, Route::LocalComponent(cu.inner.native_component_id)); + cu.inner + .port_info + .routes + .insert(nout, Route::LocalComponent(cu.inner.native_component_id)); cu.inner.port_info.routes.insert(uin, Route::UdpEndpoint { index: udp_index }); cu.inner.port_info.routes.insert(uout, Route::UdpEndpoint { index: udp_index }); setup.udp_endpoint_setups.push(UdpEndpointSetup { @@ -82,7 +90,7 @@ impl Connector { cu.inner .port_info .routes - .insert(local_port, Route::LocalComponent(ComponentId::Native)); + .insert(local_port, Route::LocalComponent(cu.inner.native_component_id)); log!( cu.inner.logger, "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}", diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index 66e56fc5deaf489582c0f3a77b2b0bb8400a45d2..660e6da4fef7a9dd59b1ff8b040ff9bd66e741d9 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -1014,3 +1014,110 @@ fn pdl_msg_consensus() { c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap(); c.sync(SEC1).unwrap_err(); } + +#[test] +fn sequencer3_prim() { + let test_log_path = Path::new("./logs/sequencer3_prim"); + let pdl = b" + primitive seq3primitive(out a, out b, out c) { + int i = 0; + while(true) synchronous { + out to = a; + if (i==1) to = b; + else if(i==2) to = c; + if(fires(to)) { + put(to, create(0)); + i = (i + 1)%3; + } + } + } + "; + let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); + let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); + + // setup a session between (a) native, and (b) primitive sequencer3, connected by 3 ports. 
+ let [p0, g0] = c.new_port_pair(); + let [p1, g1] = c.new_port_pair(); + let [p2, g2] = c.new_port_pair(); + c.add_component(b"seq3primitive", &[p0, p1, p2]).unwrap(); + c.connect(None).unwrap(); + + let mut which_of_three = move || { + // setup three sync batches. sync. return which succeeded + c.get(g0).unwrap(); + c.next_batch().unwrap(); + c.get(g1).unwrap(); + c.next_batch().unwrap(); + c.get(g2).unwrap(); + c.sync(None).unwrap() + }; + + const TEST_ROUNDS: usize = 50; + // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...] + for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) { + assert_eq!(expected_batch_idx, which_of_three()); + } +} + +// #[test] +// fn sequencer3_comp() { +// let test_log_path = Path::new("./logs/sequencer3_comp"); +// let pdl = b" +// primitive fifo1_init(msg m, in a, out b) { +// while(true) synchronous { +// if(m != null && fires(b)) { +// put(b, m); +// m = null; +// } else if (m == null && fires(a)) { +// m = get(a); +// } +// } +// } +// composite fifo1_full(in a, out b) { +// new fifo1_init(create(0), a, b); +// } +// composite fifo1(in a, out b) { +// new fifo1_init(null, a, b); +// } +// composite seq3composite(out a, out b, out c) { +// channel d -> e; +// channel f -> g; +// channel h -> i; +// channel j -> k; +// channel l -> m; +// channel n -> o; + +// new fifo1_full(o, d); +// new replicator2(e, f, a); +// new fifo1(g, h); +// new replicator2(i, j, b); +// new fifo1(k, l); +// new replicator2(m, n, c); +// } +// "; +// let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); +// let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); + +// // setup a session between (a) native, and (b) composite sequencer3, connected by 3 ports. 
+// let [p0, g0] = c.new_port_pair(); +// let [p1, g1] = c.new_port_pair(); +// let [p2, g2] = c.new_port_pair(); +// c.add_component(b"seq3composite", &[p0, p1, p2]).unwrap(); +// c.connect(None).unwrap(); + +// let mut which_of_three = move || { +// // setup three sync batches. sync. return which succeeded +// c.get(g0).unwrap(); +// c.next_batch().unwrap(); +// c.get(g1).unwrap(); +// c.next_batch().unwrap(); +// c.get(g2).unwrap(); +// c.sync(None).unwrap() +// }; + +// const TEST_ROUNDS: usize = 50; +// // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...] +// for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) { +// assert_eq!(expected_batch_idx, which_of_three()); +// } +// }