diff --git a/Cargo.toml b/Cargo.toml
index 28e7ab867343cca968fe958faddcabc70318fd59..523e1e4b9d460a5f3742532f6bb5c76ee82e00aa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "reowolf_rs"
-version = "0.1.3"
+version = "0.1.4"
 authors = [
     "Christopher Esterhuyse ",
     "Hans-Dieter Hiep "
diff --git a/README.md b/README.md
index f18a11eecd15e0af40fc59a9a19e9b76bfd55497..f54136870589115064d92d4e87c085159e34be02 100644
--- a/README.md
+++ b/README.md
@@ -6,6 +6,3 @@
 - The resulting dylib can be found in target/release/, to be used with the header file reowolf.h.
 - Note: A list of immediate ancestor dependencies is visible in Cargo.toml.
 - Note: Run `cargo test --release` to run unit tests with release-level optimizations.
-
-## Structure
-- The user-facing API is visible in src/runtime/connector.rs
diff --git a/reowolf.h b/reowolf_old.h
similarity index 100%
rename from reowolf.h
rename to reowolf_old.h
diff --git a/src/runtime/ffi.rs b/src/runtime/ffi.rs
new file mode 100644
index 0000000000000000000000000000000000000000..de43f4181242967f4393013ef30171d267bfcef8
--- /dev/null
+++ b/src/runtime/ffi.rs
@@ -0,0 +1,109 @@
+use super::*;
+
+use core::cell::RefCell;
+use std::os::raw::{c_char, c_int, c_uchar, c_uint};
+
+#[derive(Default)]
+struct StoredError {
+    // invariant: buf is empty IFF no error is stored
+    // when occupied, contents are 1+ bytes, because we also store the NULL TERMINATOR
+    buf: Vec<u8>,
+}
+impl StoredError {
+    const NULL_TERMINATOR: u8 = 0;
+    fn clear(&mut self) {
+        // no null terminator either!
+        self.buf.clear();
+    }
+    fn store<E: std::fmt::Debug>(&mut self, error: &E) {
+        let _ = write!(&mut self.buf, "{:?}", error);
+        self.buf.push(Self::NULL_TERMINATOR);
+    }
+    fn tl_store<E: std::fmt::Debug>(error: &E) {
+        STORED_ERROR.with(|stored_error| {
+            let mut stored_error = stored_error.borrow_mut();
+            stored_error.clear();
+            stored_error.store(error);
+        })
+    }
+    fn tl_clear() {
+        STORED_ERROR.with(|stored_error| {
+            let mut stored_error = stored_error.borrow_mut();
+            stored_error.clear();
+        })
+    }
+    fn tl_raw_peek() -> (*const u8, usize) {
+        STORED_ERROR.with(|stored_error| {
+            let stored_error = stored_error.borrow();
+            match stored_error.buf.len() {
+                0 => (core::ptr::null(), 0), // no error!
+                n => {
+                    // stores an error of length n-1 AND a NULL TERMINATOR
+                    (stored_error.buf.as_ptr(), n - 1)
+                }
+            }
+        })
+    }
+}
+thread_local! {
+    static STORED_ERROR: RefCell<StoredError> = RefCell::new(StoredError::default());
+}
+
+type ErrorCode = i32;
+
+//////////////////////////////////////
+
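+// Error-handling convention for the fallible FFI calls below: each call first
+// clears the thread-local StoredError, returns 0 on success, and on failure
+// stores a Debug-formatted, null-terminated message and returns a negative
+// ErrorCode. A C caller might then retrieve the message roughly as follows
+// (an illustrative sketch, assuming the corresponding declarations are
+// available from the C header):
+//
+//   size_t len;
+//   const unsigned char* msg = reowolf_error_peek(&len);
+//   if (msg != NULL) fprintf(stderr, "%.*s\n", (int)len, (const char*)msg);
+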
+/// Returns length (via out pointer) and pointer (via return value) of the last Reowolf error.
+/// - pointer is NULL iff there was no last error
+/// - data at pointer is null-terminated
+/// - len does NOT include the null terminator
+#[no_mangle]
+pub unsafe extern "C" fn reowolf_error_peek(len: *mut usize) -> *const u8 {
+    let (err_ptr, err_len) = StoredError::tl_raw_peek();
+    len.write(err_len);
+    err_ptr
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn protocol_description_parse(
+    pdl: *const u8,
+    pdl_len: usize,
+    pd: *mut Arc<ProtocolDescription>,
+) -> ErrorCode {
+    StoredError::tl_clear();
+    let slice: *const [u8] = std::slice::from_raw_parts(pdl, pdl_len);
+    let slice: &[u8] = &*slice;
+    match ProtocolDescription::parse(slice) {
+        Ok(new) => {
+            pd.write(Arc::new(new));
+            0
+        }
+        Err(err) => {
+            StoredError::tl_store(&err);
+            -1
+        }
+    }
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn protocol_description_destroy(pd: Arc<ProtocolDescription>) {
+    drop(pd)
+}
+
+#[no_mangle]
+pub unsafe extern "C" fn protocol_description_clone(
+    pd: &Arc<ProtocolDescription>,
+) -> Arc<ProtocolDescription> {
+    pd.clone()
+}
+
+// #[no_mangle]
+// pub extern "C" fn connector_new(pd: *const Arc<ProtocolDescription>) -> *mut Connector {
+//     Box::into_raw(Box::new(Connector::default()))
+// }
+
+// /// Creates and returns Reowolf Connector structure allocated on the heap.
+// #[no_mangle]
+// pub extern "C" fn connector_with_controller_id(controller_id: ControllerId) -> *mut Connector {
+//     Box::into_raw(Box::new(Connector::Unconfigured(Unconfigured { controller_id })))
+// }
diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs
index 2a3984726b3efe32bb18e86d6164fa22d6d616d4..97b6c36ddfb3fcd0a90f670c1292de4198f8aec6 100644
--- a/src/runtime/mod.rs
+++ b/src/runtime/mod.rs
@@ -1,6 +1,7 @@
 mod communication;
 mod endpoints;
 pub mod error;
+mod ffi;
 mod logging;
 mod setup;
 
@@ -202,6 +203,7 @@ pub struct SyncProtoContext<'a> {
     inbox: &'a HashMap,
 }
 ////////////////
+
 pub fn would_block(err: &std::io::Error) -> bool {
     err.kind() == std::io::ErrorKind::WouldBlock
 }
diff --git a/src/runtime/retired/actors.rs b/src/runtime/retired/actors.rs
deleted file mode 100644
index c86044f404990f03c55c5a2ed2b9f47aa472c1ee..0000000000000000000000000000000000000000
--- a/src/runtime/retired/actors.rs
+++ /dev/null
@@ -1,442 +0,0 @@
-use crate::common::*;
-use crate::runtime::{endpoint::*, *};
-
-#[derive(Debug, Clone)]
-pub(crate) struct MonoN {
-    pub ports: HashSet,
-    pub result: Option<(usize, HashMap)>,
-}
-#[derive(Debug)]
-pub(crate) struct PolyN {
-    pub ports: HashSet,
-    pub branches: HashMap,
-}
-#[derive(Debug, Clone)]
-pub(crate) struct BranchN {
-    pub to_get: HashSet,
-    pub gotten: HashMap,
-    pub sync_batch_index: usize,
-}
-
-#[derive(Debug, Clone)]
-pub struct MonoP {
-    pub state: ProtocolS,
-    pub ports: HashSet,
-}
-#[derive(Debug)]
-pub(crate) struct PolyP {
-    pub incomplete: HashMap,
-    pub complete: HashMap,
-    pub ports: HashSet,
-}
-#[derive(Debug, Clone)]
-pub(crate) struct BranchP {
-    pub blocking_on: Option,
-    pub outbox: HashMap,
-    pub inbox: HashMap,
-    pub state: ProtocolS,
-}
-
-//////////////////////////////////////////////////////////////////
-
-impl PolyP {
-    pub(crate) fn poly_run(
-        &mut self,
-        m_ctx: PolyPContext,
-        protocol_description: &ProtocolD,
-    ) -> Result {
-        let to_run: Vec<_> = self.incomplete.drain().collect();
-        self.poly_run_these_branches(m_ctx, protocol_description, to_run)
-    }
-
-    pub(crate) fn poly_run_these_branches(
-        &mut self,
-        mut m_ctx: PolyPContext,
-        protocol_description: &ProtocolD,
-        mut to_run: Vec<(Predicate, BranchP)>,
-    ) -> Result {
-        use SyncRunResult as Srr;
-        log!(&mut m_ctx.inner.logger, "~ Running branches for PolyP {:?}!", m_ctx.my_subtree_id,);
- 'to_run_loop: while let Some((mut predicate, mut branch)) = to_run.pop() { - let mut r_ctx = BranchPContext { - m_ctx: m_ctx.reborrow(), - ports: &self.ports, - predicate: &predicate, - inbox: &branch.inbox, - }; - use PolyBlocker as Sb; - let blocker = branch.state.sync_run(&mut r_ctx, protocol_description); - log!( - &mut r_ctx.m_ctx.inner.logger, - "~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}", - r_ctx.m_ctx.my_subtree_id, - &predicate, - &blocker - ); - match blocker { - Sb::Inconsistent => {} // DROP - Sb::CouldntReadMsg(port) => { - assert!(self.ports.contains(&port)); - let channel_id = - r_ctx.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; - log!( - &mut r_ctx.m_ctx.inner.logger, - "~ ... {:?} couldnt read msg for port {:?}. has inbox {:?}", - r_ctx.m_ctx.my_subtree_id, - channel_id, - &branch.inbox, - ); - if predicate.replace_assignment(channel_id, true) != Some(false) { - // don't rerun now. Rerun at next `sync_run` - - log!(&mut m_ctx.inner.logger, "~ ... Delay {:?}", m_ctx.my_subtree_id,); - branch.blocking_on = Some(port); - self.incomplete.insert(predicate, branch); - } else { - log!(&mut m_ctx.inner.logger, "~ ... Drop {:?}", m_ctx.my_subtree_id,); - } - // ELSE DROP - } - Sb::CouldntCheckFiring(port) => { - assert!(self.ports.contains(&port)); - let channel_id = - r_ctx.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; - // split the branch! - let branch_f = branch.clone(); - let mut predicate_f = predicate.clone(); - if predicate_f.replace_assignment(channel_id, false).is_some() { - panic!("OI HANS QUERY FIRST!"); - } - assert!(predicate.replace_assignment(channel_id, true).is_none()); - to_run.push((predicate, branch)); - to_run.push((predicate_f, branch_f)); - } - Sb::SyncBlockEnd => { - let ControllerInner { logger, endpoint_exts, .. } = m_ctx.inner; - log!( - logger, - "~ ... ran {:?} reached SyncBlockEnd with pred {:?} ...", - m_ctx.my_subtree_id, - &predicate, - ); - // come up with the predicate for this local solution - - for port in self.ports.iter() { - let channel_id = endpoint_exts.get(*port).unwrap().info.channel_id; - let fired = - branch.inbox.contains_key(port) || branch.outbox.contains_key(port); - match predicate.query(channel_id) { - Some(true) => { - if !fired { - // This branch should have fired but didn't! - log!( - logger, - "~ ... ... should have fired {:?} and didn't! pruning!", - channel_id, - ); - continue 'to_run_loop; - } - } - Some(false) => { - if fired { - println!( - "pred {:#?} in {:#?} out {:#?}", - &predicate, - branch.inbox.get(port), - branch.outbox.get(port) - ); - panic!("channel_id {:?} fired (based on outbox/inbox) but the predicate had Some(false)!" ,channel_id) - } - } - None => { - predicate.replace_assignment(channel_id, false); - if fired { - println!( - "pred {:#?} in {:#?} out {:#?}", - &predicate, - branch.inbox.get(port), - branch.outbox.get(port) - ); - panic!("channel_id {:?} fired (based on outbox/inbox) but the predicate had None!" ,channel_id) - } - } - } - } - log!(logger, "~ ... ... 
and finished just fine!",); - m_ctx.solution_storage.submit_and_digest_subtree_solution( - &mut m_ctx.inner.logger, - m_ctx.my_subtree_id, - predicate.clone(), - ); - self.complete.insert(predicate, branch); - } - Sb::PutMsg(port, payload) => { - assert!(self.ports.contains(&port)); - let EndpointExt { info, endpoint } = - m_ctx.inner.endpoint_exts.get_mut(port).unwrap(); - if predicate.replace_assignment(info.channel_id, true) != Some(false) { - branch.outbox.insert(port, payload.clone()); - let msg = CommMsgContents::SendPayload { - payload_predicate: predicate.clone(), - payload, - } - .into_msg(m_ctx.inner.round_index); - log!( - &mut m_ctx.inner.logger, - "~ ... ... PolyP sending msg {:?} to {:?} ({:?}) now!", - &msg, - port, - (info.channel_id.controller_id, info.channel_id.channel_index), - ); - endpoint.send(msg)?; - to_run.push((predicate, branch)); - } - // ELSE DROP - } - } - } - // all in self.incomplete most recently returned Blocker::CouldntReadMsg - Ok(if self.incomplete.is_empty() { - if self.complete.is_empty() { - Srr::NoBranches - } else { - Srr::AllBranchesComplete - } - } else { - Srr::BlockingForRecv - }) - } - - pub(crate) fn poly_recv_run( - &mut self, - m_ctx: PolyPContext, - protocol_description: &ProtocolD, - port: PortId, - payload_predicate: Predicate, - payload: Payload, - ) -> Result { - // try exact match - - let to_run = if self.complete.contains_key(&payload_predicate) { - // exact match with stopped machine - - log!( - &mut m_ctx.inner.logger, - "... poly_recv_run matched stopped machine exactly! nothing to do here", - ); - vec![] - } else if let Some(mut branch) = self.incomplete.remove(&payload_predicate) { - // exact match with running machine - - log!( - &mut m_ctx.inner.logger, - "... poly_recv_run matched running machine exactly! pred is {:?}", - &payload_predicate - ); - branch.inbox.insert(port, payload); - if branch.blocking_on == Some(port) { - branch.blocking_on = None; - vec![(payload_predicate, branch)] - } else { - vec![] - } - } else { - log!( - &mut m_ctx.inner.logger, - "... poly_recv_run didn't have any exact matches... Let's try feed it to all branches", - ); - let mut incomplete2 = HashMap::<_, _>::default(); - let to_run = self - .incomplete - .drain() - .filter_map(|(old_predicate, mut branch)| { - use CommonSatResult as Csr; - match old_predicate.common_satisfier(&payload_predicate) { - Csr::FormerNotLatter | Csr::Equivalent => { - log!( - &mut m_ctx.inner.logger, - "... poly_recv_run This branch is compatible unaltered! branch pred: {:?}", - &old_predicate - ); - // old_predicate COVERS the assumptions of payload_predicate - - if let Some(prev_payload) = branch.inbox.get(&port) { - // Incorrect to receive two distinct messages in same branch! - assert_eq!(prev_payload, &payload); - } - branch.inbox.insert(port, payload.clone()); - if branch.blocking_on == Some(port) { - // run. - branch.blocking_on = None; - Some((old_predicate, branch)) - } else { - // don't bother running. its awaiting something else - incomplete2.insert(old_predicate, branch); - None - } - } - Csr::New(new) => { - log!( - &mut m_ctx.inner.logger, - "... poly_recv_run payloadpred {:?} and branchpred {:?} satisfied by new pred {:?}. FORKING", - &payload_predicate, - &old_predicate, - &new, - ); - // payload_predicate has new assumptions. FORK! - let mut payload_branch = branch.clone(); - if let Some(prev_payload) = payload_branch.inbox.get(&port) { - // Incorrect to receive two distinct messages in same branch! 
- assert_eq!(prev_payload, &payload); - } - payload_branch.inbox.insert(port, payload.clone()); - - // put the original back untouched - incomplete2.insert(old_predicate, branch); - if payload_branch.blocking_on == Some(port) { - // run the fork - payload_branch.blocking_on = None; - Some((new, payload_branch)) - } else { - // don't bother running. its awaiting something else - incomplete2.insert(new, payload_branch); - None - } - } - Csr::LatterNotFormer => { - log!( - &mut m_ctx.inner.logger, - "... poly_recv_run payloadpred {:?} subsumes branch pred {:?}. FORKING", - &old_predicate, - &payload_predicate, - ); - // payload_predicate has new assumptions. FORK! - let mut payload_branch = branch.clone(); - if let Some(prev_payload) = payload_branch.inbox.get(&port) { - // Incorrect to receive two distinct messages in same branch! - assert_eq!(prev_payload, &payload); - } - payload_branch.inbox.insert(port, payload.clone()); - - // put the original back untouched - incomplete2.insert(old_predicate.clone(), branch); - if payload_branch.blocking_on == Some(port) { - // run the fork - payload_branch.blocking_on = None; - Some((payload_predicate.clone(), payload_branch)) - } else { - // don't bother running. its awaiting something else - incomplete2.insert(payload_predicate.clone(), payload_branch); - None - } - } - Csr::Nonexistant => { - log!( - &mut m_ctx.inner.logger, - "... poly_recv_run SKIPPING because branchpred={:?}. payloadpred={:?}", - &old_predicate, - &payload_predicate, - ); - // predicates contradict - incomplete2.insert(old_predicate, branch); - None - } - } - }) - .collect(); - std::mem::swap(&mut self.incomplete, &mut incomplete2); - to_run - }; - log!( - &mut m_ctx.inner.logger, - "... DONE FEEDING BRANCHES. {} branches to run!", - to_run.len(), - ); - self.poly_run_these_branches(m_ctx, protocol_description, to_run) - } - - pub(crate) fn choose_mono(&self, decision: &Predicate) -> Option { - self.complete - .iter() - .find(|(p, _)| decision.satisfies(p)) - .map(|(_, branch)| MonoP { state: branch.state.clone(), ports: self.ports.clone() }) - } -} - -impl PolyN { - pub fn sync_recv( - &mut self, - port: PortId, - logger: &mut String, - payload: Payload, - payload_predicate: Predicate, - solution_storage: &mut SolutionStorage, - ) { - let mut branches2: HashMap<_, _> = Default::default(); - for (old_predicate, mut branch) in self.branches.drain() { - use CommonSatResult as Csr; - let case = old_predicate.common_satisfier(&payload_predicate); - let mut report_if_solution = - |branch: &BranchN, pred: &Predicate, logger: &mut String| { - if branch.to_get.is_empty() { - log!(logger, "Native reporting solution with inbox {:#?}", &branch.gotten); - solution_storage.submit_and_digest_subtree_solution( - logger, - SubtreeId::PolyN, - pred.clone(), - ); - } - }; - log!( - logger, - "Feeding msg {:?} {:?} to native branch with pred {:?}. Predicate case {:?}", - &payload_predicate, - &payload, - &old_predicate, - &case - ); - match case { - Csr::Nonexistant => { /* skip branch */ } - Csr::LatterNotFormer | Csr::Equivalent => { - // Feed the message to this branch in-place. no need to modify pred. - if branch.to_get.remove(&port) { - branch.gotten.insert(port, payload.clone()); - report_if_solution(&branch, &old_predicate, logger); - } - } - Csr::FormerNotLatter => { - // create a new branch with the payload_predicate. 
- let mut forked = branch.clone(); - if forked.to_get.remove(&port) { - forked.gotten.insert(port, payload.clone()); - report_if_solution(&forked, &payload_predicate, logger); - branches2.insert(payload_predicate.clone(), forked); - } - } - Csr::New(new) => { - // create a new branch with the newly-created predicate - let mut forked = branch.clone(); - if forked.to_get.remove(&port) { - forked.gotten.insert(port, payload.clone()); - report_if_solution(&forked, &new, logger); - branches2.insert(new.clone(), forked); - } - } - } - // unlike PolyP machines, Native branches do NOT become inconsistent - branches2.insert(old_predicate, branch); - } - log!(logger, "Native now has branches {:#?}", &branches2); - std::mem::swap(&mut branches2, &mut self.branches); - } - - pub fn choose_mono(&self, decision: &Predicate) -> Option { - self.branches - .iter() - .find(|(p, branch)| branch.to_get.is_empty() && decision.satisfies(p)) - .map(|(_, branch)| { - let BranchN { gotten, sync_batch_index, .. } = branch.clone(); - MonoN { ports: self.ports.clone(), result: Some((sync_batch_index, gotten)) } - }) - } -} diff --git a/src/runtime/retired/communication.rs b/src/runtime/retired/communication.rs deleted file mode 100644 index e619b0712bbc97002d4b7cb762fd5e264fafdb49..0000000000000000000000000000000000000000 --- a/src/runtime/retired/communication.rs +++ /dev/null @@ -1,739 +0,0 @@ -use crate::common::*; -use crate::runtime::{actors::*, endpoint::*, errors::*, *}; - -impl Controller { - fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncErr> { - log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); - let ret = match &decision { - Decision::Success(predicate) => { - // overwrite MonoN/P - self.inner.mono_n = { - let poly_n = self.ephemeral.poly_n.take().unwrap(); - poly_n.choose_mono(predicate).unwrap_or_else(|| { - panic!( - "Ending round with decision pred {:#?} but poly_n has branches {:#?}. My log is... {}", - &predicate, &poly_n.branches, &self.inner.logger - ); - }) - }; - self.inner.mono_ps.clear(); - self.inner.mono_ps.extend( - self.ephemeral - .poly_ps - .drain(..) - .map(|poly_p| poly_p.choose_mono(predicate).unwrap()), - ); - Ok(()) - } - Decision::Failure => Err(SyncErr::Timeout), - }; - let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index); - for &child_port in self.inner.family.children_ports.iter() { - log!( - &mut self.inner.logger, - "Forwarding {:?} to child with port {:?}", - &announcement, - child_port - ); - self.inner - .endpoint_exts - .get_mut(child_port) - .expect("eefef") - .endpoint - .send(announcement.clone())?; - } - self.inner.round_index += 1; - self.ephemeral.clear(); - ret - } - - // Drain self.ephemeral.solution_storage and handle the new locals. 
Return decision if one is found - fn handle_locals_maybe_decide(&mut self) -> Result { - if let Some(parent_port) = self.inner.family.parent_port { - // I have a parent -> I'm not the leader - let parent_endpoint = - &mut self.inner.endpoint_exts.get_mut(parent_port).expect("huu").endpoint; - for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() { - let msg = - CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index); - log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_port); - parent_endpoint.send(msg)?; - } - Ok(false) - } else { - // I have no parent -> I'm the leader - assert!(self.inner.family.parent_port.is_none()); - let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next(); - Ok(if let Some(predicate) = maybe_predicate { - let decision = Decision::Success(predicate); - log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision); - self.end_round_with_decision(decision)?; - true - } else { - false - }) - } - } - - fn kick_off_native( - &mut self, - sync_batches: impl Iterator, - ) -> Result { - let MonoN { ports, .. } = self.inner.mono_n.clone(); - let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self; - let mut branches = HashMap::<_, _>::default(); - for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() { - let port_to_channel_id = |port| endpoint_exts.get(port).unwrap().info.channel_id; - let all_ports = ports.iter().copied(); - let all_channel_ids = all_ports.map(port_to_channel_id); - - let mut predicate = Predicate::new_trivial(); - - // assign TRUE for puts and gets - let true_ports = puts.keys().chain(gets.iter()).copied(); - let true_channel_ids = true_ports.clone().map(port_to_channel_id); - predicate.batch_assign_nones(true_channel_ids, true); - - // assign FALSE for all in interface not assigned true - predicate.batch_assign_nones(all_channel_ids.clone(), false); - - if branches.contains_key(&predicate) { - // TODO what do I do with redundant predicates? - unimplemented!( - "Duplicate predicate {:#?}!\nHaving multiple batches with the same - predicate requires the support of oracle boolean variables", - &predicate, - ) - } - let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index }; - for (port, payload) in puts { - log!( - &mut self.inner.logger, - "... ... Initial native put msg {:?} pred {:?} batch {:?}", - &payload, - &predicate, - sync_batch_index, - ); - let msg = - CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload } - .into_msg(*round_index); - endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?; - } - log!( - &mut self.inner.logger, - "... Initial native branch batch index={} with pred {:?}", - sync_batch_index, - &predicate - ); - if branch.to_get.is_empty() { - self.ephemeral.solution_storage.submit_and_digest_subtree_solution( - &mut self.inner.logger, - SubtreeId::PolyN, - predicate.clone(), - ); - } - branches.insert(predicate, branch); - } - Ok(PolyN { ports, branches }) - } - pub fn sync_round( - &mut self, - deadline: Option, - sync_batches: Option>, - ) -> Result<(), SyncErr> { - if let Some(e) = self.unrecoverable_error { - return Err(e.clone()); - } - self.sync_round_inner(deadline, sync_batches).map_err(move |e| match e { - SyncErr::Timeout => e, // this isn't unrecoverable - _ => { - // Must set unrecoverable error! 
and tear down our net channels - self.unrecoverable_error = Some(e); - self.ephemeral.clear(); - self.inner.endpoint_exts = Default::default(); - e - } - }) - } - - // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent. - // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches. - fn sync_round_inner( - &mut self, - mut deadline: Option, - sync_batches: Option>, - ) -> Result<(), SyncErr> { - log!( - &mut self.inner.logger, - "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~", - self.inner.round_index - ); - assert!(self.ephemeral.is_clear()); - assert!(self.unrecoverable_error.is_none()); - - // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`). - // Some actors are dropped. some new actors are created. - // Ultimately, we have 0 Mono actors and a list of unnamed sync_actors - self.ephemeral.mono_ps.extend(self.inner.mono_ps.iter().cloned()); - log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len()); - while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() { - let mut m_ctx = MonoPContext { - ports: &mut mono_p.ports, - mono_ps: &mut self.ephemeral.mono_ps, - inner: &mut self.inner, - }; - // cross boundary into crate::protocol - let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description); - log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker); - match blocker { - MonoBlocker::Inconsistent => return Err(SyncErr::Inconsistent), - MonoBlocker::ComponentExit => drop(mono_p), - MonoBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()), - } - } - log!( - &mut self.inner.logger, - "Finished running all MonoPs! Have {} PolyPs waiting", - self.ephemeral.poly_ps.len() - ); - - // 3. define the mapping from port -> actor - // this is needed during the event loop to determine which actor - // should receive the incoming message. - // TODO: store and update this mapping rather than rebuilding it each round. - let port_to_holder: HashMap = { - use PolyId::*; - let n = self.inner.mono_n.ports.iter().map(move |&e| (e, N)); - let p = self - .ephemeral - .poly_ps - .iter() - .enumerate() - .flat_map(|(index, m)| m.ports.iter().map(move |&e| (e, P { index }))); - n.chain(p).collect() - }; - log!( - &mut self.inner.logger, - "SET OF PolyPs and MonoPs final! port lookup map is {:?}", - &port_to_holder - ); - - // 4. Create the solution storage. it tracks the solutions of "subtrees" - // of the controller in the overlay tree. - self.ephemeral.solution_storage.reset({ - let n = std::iter::once(SubtreeId::PolyN); - let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index }); - let c = self - .inner - .family - .children_ports - .iter() - .map(|&port| SubtreeId::ChildController { port }); - let subtree_id_iter = n.chain(m).chain(c); - log!( - &mut self.inner.logger, - "Solution Storage has subtree Ids: {:?}", - &subtree_id_iter.clone().collect::>() - ); - subtree_id_iter - }); - - // 5. kick off the synchronous round of the native actor if it exists - - log!(&mut self.inner.logger, "Kicking off native's synchronous round..."); - self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches { - // using if let because of nested ? operator - // TODO check that there are 1+ branches or NO SOLUTION - let poly_n = self.kick_off_native(sync_batches)?; - log!( - &mut self.inner.logger, - "PolyN kicked off, and has branches with predicates... 
{:?}", - poly_n.branches.keys().collect::>() - ); - Some(poly_n) - } else { - log!(&mut self.inner.logger, "NO NATIVE COMPONENT"); - None - }; - - // 6. Kick off the synchronous round of each protocol actor - // If just one actor becomes inconsistent now, there can be no solution! - // TODO distinguish between completed and not completed poly_p's? - log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len()); - for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() { - let my_subtree_id = SubtreeId::PolyP { index }; - let m_ctx = PolyPContext { - my_subtree_id, - inner: &mut self.inner, - solution_storage: &mut self.ephemeral.solution_storage, - }; - use SyncRunResult as Srr; - let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?; - log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker); - match blocker { - Srr::NoBranches => return Err(SyncErr::Inconsistent), - Srr::AllBranchesComplete | Srr::BlockingForRecv => (), - } - } - log!(&mut self.inner.logger, "All Poly machines have been kicked off!"); - - // 7. `solution_storage` may have new solutions for this controller - // handle their discovery. LEADER => announce, otherwise => send to parent - { - let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::>(); - log!( - &mut self.inner.logger, - "Got {} controller-local solutions before a single RECV: {:?}", - peeked.len(), - peeked - ); - } - if self.handle_locals_maybe_decide()? { - return Ok(()); - } - - // 4. Receive incoming messages until the DECISION is made OR some unrecoverable error - log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages"); - self.undelay_all(); - 'recv_loop: loop { - log!(&mut self.inner.logger, "`POLLING` with deadline {:?}...", deadline); - let received = match deadline { - None => { - // we have personally timed out. perform a "long" poll. - self.recv(Instant::now() + Duration::from_secs(10))?.expect("DRIED UP") - } - Some(d) => match self.recv(d)? { - // we have not yet timed out. performed a time-limited poll - Some(received) => received, - None => { - // timed out! send a FAILURE message to the sink, - // and henceforth don't time out on polling. - deadline = None; - match self.inner.family.parent_port { - None => { - // I am the sink! announce failure and return. - return self.end_round_with_decision(Decision::Failure); - } - Some(parent_port) => { - // I am not the sink! send a failure message. - let announcement = Msg::CommMsg(CommMsg { - round_index: self.inner.round_index, - contents: CommMsgContents::Failure, - }); - log!( - &mut self.inner.logger, - "Forwarding {:?} to parent with port {:?}", - &announcement, - parent_port - ); - self.inner - .endpoint_exts - .get_mut(parent_port) - .expect("ss") - .endpoint - .send(announcement.clone())?; - continue; // poll some more - } - } - } - }, - }; - log!(&mut self.inner.logger, "::: message {:?}...", &received); - let current_content = match received.msg { - Msg::SetupMsg(s) => { - // This occurs in the event the connector was malformed during connect() - println!("WASNT EXPECTING {:?}", s); - return Err(SyncErr::UnexpectedSetupMsg); - } - Msg::CommMsg(CommMsg { round_index, .. }) - if round_index < self.inner.round_index => - { - // Old message! Can safely discard - log!(&mut self.inner.logger, "...and its OLD! :("); - drop(received); - continue 'recv_loop; - } - Msg::CommMsg(CommMsg { round_index, .. }) - if round_index > self.inner.round_index => - { - // Message from a next round. 
Keep for later! - log!(&mut self.inner.logger, "... DELAY! :("); - self.delay(received); - continue 'recv_loop; - } - Msg::CommMsg(CommMsg { contents, round_index }) => { - log!( - &mut self.inner.logger, - "... its a round-appropriate CommMsg with port {:?}", - received.recipient - ); - assert_eq!(round_index, self.inner.round_index); - contents - } - }; - match current_content { - CommMsgContents::Failure => match self.inner.family.parent_port { - Some(parent_port) => { - let announcement = Msg::CommMsg(CommMsg { - round_index: self.inner.round_index, - contents: CommMsgContents::Failure, - }); - log!( - &mut self.inner.logger, - "Forwarding {:?} to parent with port {:?}", - &announcement, - parent_port - ); - self.inner - .endpoint_exts - .get_mut(parent_port) - .expect("ss") - .endpoint - .send(announcement.clone())?; - } - None => return self.end_round_with_decision(Decision::Failure), - }, - CommMsgContents::Elaborate { partial_oracle } => { - // Child controller submitted a subtree solution. - if !self.inner.family.children_ports.contains(&received.recipient) { - return Err(SyncErr::ElaborateFromNonChild); - } - let subtree_id = SubtreeId::ChildController { port: received.recipient }; - log!( - &mut self.inner.logger, - "Received elaboration from child for subtree {:?}: {:?}", - subtree_id, - &partial_oracle - ); - self.ephemeral.solution_storage.submit_and_digest_subtree_solution( - &mut self.inner.logger, - subtree_id, - partial_oracle, - ); - if self.handle_locals_maybe_decide()? { - return Ok(()); - } - } - CommMsgContents::Announce { decision } => { - if self.inner.family.parent_port != Some(received.recipient) { - return Err(SyncErr::AnnounceFromNonParent); - } - log!( - &mut self.inner.logger, - "Received ANNOUNCEMENT from from parent {:?}: {:?}", - received.recipient, - &decision - ); - return self.end_round_with_decision(decision); - } - CommMsgContents::SendPayload { payload_predicate, payload } => { - // check that we expect to be able to receive payloads from this sender - assert_eq!( - Getter, - self.inner.endpoint_exts.get(received.recipient).unwrap().info.polarity - ); - - // message for some actor. Feed it to the appropriate actor - // and then give them another chance to run. - let subtree_id = port_to_holder.get(&received.recipient); - log!( - &mut self.inner.logger, - "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}", - subtree_id, - &payload_predicate, - &payload - ); - let channel_id = self - .inner - .endpoint_exts - .get(received.recipient) - .expect("UEHFU") - .info - .channel_id; - if payload_predicate.query(channel_id) != Some(true) { - // sender didn't preserve the invariant - return Err(SyncErr::PayloadPremiseExcludesTheChannel(channel_id)); - } - match subtree_id { - None => { - // this happens when a message is sent to a component that has exited. - // It's safe to drop this message; - // The sender branch will certainly not be part of the solution - } - Some(PolyId::N) => { - // Message for NativeMachine - self.ephemeral.poly_n.as_mut().unwrap().sync_recv( - received.recipient, - &mut self.inner.logger, - payload, - payload_predicate, - &mut self.ephemeral.solution_storage, - ); - if self.handle_locals_maybe_decide()? 
{ - return Ok(()); - } - } - Some(PolyId::P { index }) => { - // Message for protocol actor - let poly_p = &mut self.ephemeral.poly_ps[*index]; - - let m_ctx = PolyPContext { - my_subtree_id: SubtreeId::PolyP { index: *index }, - inner: &mut self.inner, - solution_storage: &mut self.ephemeral.solution_storage, - }; - use SyncRunResult as Srr; - let blocker = poly_p.poly_recv_run( - m_ctx, - &self.protocol_description, - received.recipient, - payload_predicate, - payload, - )?; - log!( - &mut self.inner.logger, - "... Fed the msg to PolyP {:?} and ran it to blocker {:?}", - subtree_id, - blocker - ); - match blocker { - Srr::NoBranches => return Err(SyncErr::Inconsistent), - Srr::BlockingForRecv | Srr::AllBranchesComplete => { - { - let peeked = self - .ephemeral - .solution_storage - .peek_new_locals() - .collect::>(); - log!( - &mut self.inner.logger, - "Got {} new controller-local solutions from RECV: {:?}", - peeked.len(), - peeked - ); - } - if self.handle_locals_maybe_decide()? { - return Ok(()); - } - } - } - } - }; - } - } - } - } -} -impl ControllerEphemeral { - fn is_clear(&self) -> bool { - self.solution_storage.is_clear() - && self.poly_n.is_none() - && self.poly_ps.is_empty() - && self.mono_ps.is_empty() - && self.port_to_holder.is_empty() - } - fn clear(&mut self) { - self.solution_storage.clear(); - self.poly_n.take(); - self.poly_ps.clear(); - self.port_to_holder.clear(); - } -} -impl Into for MonoP { - fn into(self) -> PolyP { - PolyP { - complete: Default::default(), - incomplete: hashmap! { - Predicate::new_trivial() => - BranchP { - state: self.state, - inbox: Default::default(), - outbox: Default::default(), - blocking_on: None, - } - }, - ports: self.ports, - } - } -} - -impl From for SyncErr { - fn from(e: EndpointErr) -> SyncErr { - SyncErr::EndpointErr(e) - } -} - -impl MonoContext for MonoPContext<'_> { - type D = ProtocolD; - type S = ProtocolS; - fn new_component(&mut self, moved_ports: HashSet, init_state: Self::S) { - log!( - &mut self.inner.logger, - "!! MonoContext callback to new_component with ports {:?}!", - &moved_ports, - ); - if moved_ports.is_subset(self.ports) { - self.ports.retain(|x| !moved_ports.contains(x)); - self.mono_ps.push(MonoP { state: init_state, ports: moved_ports }); - } else { - panic!("MachineP attempting to move alien port!"); - } - } - fn new_channel(&mut self) -> [PortId; 2] { - let [a, b] = Endpoint::new_memory_pair(); - let channel_id = self.inner.channel_id_stream.next(); - - let mut clos = |endpoint, polarity| { - let endpoint_ext = - EndpointExt { info: EndpointInfo { polarity, channel_id }, endpoint }; - let port = self.inner.endpoint_exts.alloc(endpoint_ext); - let endpoint = &self.inner.endpoint_exts.get(port).unwrap().endpoint; - let token = PortId::to_token(port); - self.inner - .messenger_state - .poll - .register(endpoint, token, Ready::readable(), PollOpt::edge()) - .expect("AAGAGGGGG"); - self.ports.insert(port); - port - }; - let [kp, kg] = [clos(a, Putter), clos(b, Getter)]; - log!( - &mut self.inner.logger, - "!! MonoContext callback to new_channel. returning ports {:?}!", - [kp, kg], - ); - [kp, kg] - } - fn new_random(&mut self) -> u64 { - type Bytes8 = [u8; std::mem::size_of::()]; - let mut bytes = Bytes8::default(); - getrandom::getrandom(&mut bytes).unwrap(); - let val = unsafe { std::mem::transmute::(bytes) }; - log!( - &mut self.inner.logger, - "!! MonoContext callback to new_random. 
returning val {:?}!", - val, - ); - val - } -} - -impl SolutionStorage { - fn is_clear(&self) -> bool { - self.subtree_id_to_index.is_empty() - && self.subtree_solutions.is_empty() - && self.old_local.is_empty() - && self.new_local.is_empty() - } - fn clear(&mut self) { - self.subtree_id_to_index.clear(); - self.subtree_solutions.clear(); - self.old_local.clear(); - self.new_local.clear(); - } - pub(crate) fn reset(&mut self, subtree_ids: impl Iterator) { - self.subtree_id_to_index.clear(); - self.subtree_solutions.clear(); - self.old_local.clear(); - self.new_local.clear(); - for key in subtree_ids { - self.subtree_id_to_index.insert(key, self.subtree_solutions.len()); - self.subtree_solutions.push(Default::default()) - } - } - - pub(crate) fn peek_new_locals(&self) -> impl Iterator + '_ { - self.new_local.iter() - } - - pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator + '_ { - let Self { old_local, new_local, .. } = self; - new_local.drain().map(move |local| { - old_local.insert(local.clone()); - local - }) - } - - pub(crate) fn submit_and_digest_subtree_solution( - &mut self, - logger: &mut String, - subtree_id: SubtreeId, - predicate: Predicate, - ) { - log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate); - let index = self.subtree_id_to_index[&subtree_id]; - let left = 0..index; - let right = (index + 1)..self.subtree_solutions.len(); - - let Self { subtree_solutions, new_local, old_local, .. } = self; - let was_new = subtree_solutions[index].insert(predicate.clone()); - if was_new { - let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]); - Self::elaborate_into_new_local_rec( - logger, - predicate, - set_visitor, - old_local, - new_local, - ); - } - } - - fn elaborate_into_new_local_rec<'a, 'b>( - logger: &mut String, - partial: Predicate, - mut set_visitor: impl Iterator> + Clone, - old_local: &'b HashSet, - new_local: &'a mut HashSet, - ) { - if let Some(set) = set_visitor.next() { - // incomplete solution. keep traversing - for pred in set.iter() { - if let Some(elaborated) = pred.union_with(&partial) { - Self::elaborate_into_new_local_rec( - logger, - elaborated, - set_visitor.clone(), - old_local, - new_local, - ) - } - } - } else { - // recursive stop condition. `partial` is a local subtree solution - if !old_local.contains(&partial) { - // ... and it hasn't been found before - log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial); - new_local.insert(partial); - } - } - } -} -impl PolyContext for BranchPContext<'_, '_> { - type D = ProtocolD; - - fn is_firing(&mut self, port: PortId) -> Option { - assert!(self.ports.contains(&port)); - let channel_id = self.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; - let val = self.predicate.query(channel_id); - log!( - &mut self.m_ctx.inner.logger, - "!! PolyContext callback to is_firing by {:?}! returning {:?}", - self.m_ctx.my_subtree_id, - val, - ); - val - } - fn read_msg(&mut self, port: PortId) -> Option<&Payload> { - assert!(self.ports.contains(&port)); - let val = self.inbox.get(&port); - log!( - &mut self.m_ctx.inner.logger, - "!! PolyContext callback to read_msg by {:?}! 
returning {:?}", - self.m_ctx.my_subtree_id, - val, - ); - val - } -} diff --git a/src/runtime/retired/connector.rs b/src/runtime/retired/connector.rs deleted file mode 100644 index 5a10a83706fdca6bb462cc2d8b0ec2dea08a846f..0000000000000000000000000000000000000000 --- a/src/runtime/retired/connector.rs +++ /dev/null @@ -1,187 +0,0 @@ -use crate::common::*; -use crate::runtime::{errors::*, *}; - -pub fn random_controller_id() -> ControllerId { - type Bytes8 = [u8; std::mem::size_of::()]; - let mut bytes = Bytes8::default(); - getrandom::getrandom(&mut bytes).unwrap(); - unsafe { std::mem::transmute::(bytes) } -} - -impl Default for Unconfigured { - fn default() -> Self { - let controller_id = random_controller_id(); - Self { controller_id } - } -} -impl Default for Connector { - fn default() -> Self { - Self::Unconfigured(Unconfigured::default()) - } -} -impl Connector { - /// Configure the Connector with the given Pdl description. - pub fn configure(&mut self, pdl: &[u8], main_component: &[u8]) -> Result<(), ConfigErr> { - use ConfigErr::*; - let controller_id = match self { - Connector::Configured(_) => return Err(AlreadyConfigured), - Connector::Connected(_) => return Err(AlreadyConnected), - Connector::Unconfigured(Unconfigured { controller_id }) => *controller_id, - }; - let protocol_description = Arc::new(ProtocolD::parse(pdl).map_err(ParseErr)?); - let polarities = protocol_description.component_polarities(main_component)?; - let configured = Configured { - controller_id, - protocol_description, - bindings: Default::default(), - polarities, - main_component: main_component.to_vec(), - logger: "Logger created!\n".into(), - }; - *self = Connector::Configured(configured); - Ok(()) - } - - /// Bind the (configured) connector's port corresponding to the - pub fn bind_port( - &mut self, - proto_port_index: usize, - binding: PortBinding, - ) -> Result<(), PortBindErr> { - use PortBindErr::*; - match self { - Connector::Unconfigured { .. } => Err(NotConfigured), - Connector::Connected(_) => Err(AlreadyConnected), - Connector::Configured(configured) => { - if configured.polarities.len() <= proto_port_index { - return Err(IndexOutOfBounds); - } - configured.bindings.insert(proto_port_index, binding); - Ok(()) - } - } - } - pub fn connect(&mut self, timeout: Duration) -> Result<(), ConnectErr> { - let deadline = Instant::now() + timeout; - use ConnectErr::*; - let configured = match self { - Connector::Unconfigured { .. } => return Err(NotConfigured), - Connector::Connected(_) => return Err(AlreadyConnected), - Connector::Configured(configured) => configured, - }; - // 1. 
Unwrap bindings or err - let bound_proto_interface: Vec<(_, _)> = configured - .polarities - .iter() - .copied() - .enumerate() - .map(|(native_index, polarity)| { - let binding = configured - .bindings - .get(&native_index) - .copied() - .ok_or(PortNotBound { native_index })?; - Ok((binding, polarity)) - }) - .collect::, ConnectErr>>()?; - let (controller, native_interface) = Controller::connect( - configured.controller_id, - &configured.main_component, - configured.protocol_description.clone(), - &bound_proto_interface[..], - &mut configured.logger, - deadline, - )?; - *self = Connector::Connected(Connected { - native_interface, - sync_batches: vec![Default::default()], - controller, - }); - Ok(()) - } - pub fn get_mut_logger(&mut self) -> Option<&mut String> { - match self { - Connector::Configured(configured) => Some(&mut configured.logger), - Connector::Connected(connected) => Some(&mut connected.controller.inner.logger), - _ => None, - } - } - - pub fn put(&mut self, native_port_index: usize, payload: Payload) -> Result<(), PortOpErr> { - use PortOpErr::*; - let connected = match self { - Connector::Connected(connected) => connected, - _ => return Err(NotConnected), - }; - let (port, native_polarity) = - *connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?; - if native_polarity != Putter { - return Err(WrongPolarity); - } - let sync_batch = connected.sync_batches.iter_mut().last().expect("no sync batch!"); - if sync_batch.puts.contains_key(&port) { - return Err(DuplicateOperation); - } - sync_batch.puts.insert(port, payload); - Ok(()) - } - - pub fn get(&mut self, native_port_index: usize) -> Result<(), PortOpErr> { - use PortOpErr::*; - let connected = match self { - Connector::Connected(connected) => connected, - _ => return Err(NotConnected), - }; - let (port, native_polarity) = - *connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?; - if native_polarity != Getter { - return Err(WrongPolarity); - } - let sync_batch = connected.sync_batches.iter_mut().last().expect("no sync batch!"); - if sync_batch.gets.contains(&port) { - return Err(DuplicateOperation); - } - sync_batch.gets.insert(port); - Ok(()) - } - pub fn next_batch(&mut self) -> Result { - let connected = match self { - Connector::Connected(connected) => connected, - _ => return Err(()), - }; - connected.sync_batches.push(SyncBatch::default()); - Ok(connected.sync_batches.len() - 2) - } - - pub fn sync(&mut self, timeout: Duration) -> Result { - let deadline = Instant::now() + timeout; - use SyncErr::*; - let connected = match self { - Connector::Connected(connected) => connected, - _ => return Err(NotConnected), - }; - - // do the synchronous round! 
- let res = - connected.controller.sync_round(Some(deadline), Some(connected.sync_batches.drain(..))); - connected.sync_batches.push(SyncBatch::default()); - res?; - Ok(connected.controller.inner.mono_n.result.as_mut().expect("qqqs").0) - } - - pub fn read_gotten(&self, native_port_index: usize) -> Result<&[u8], ReadGottenErr> { - use ReadGottenErr::*; - let connected = match self { - Connector::Connected(connected) => connected, - _ => return Err(NotConnected), - }; - let &(key, polarity) = - connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?; - if polarity != Getter { - return Err(WrongPolarity); - } - let result = connected.controller.inner.mono_n.result.as_ref().ok_or(NoPreviousRound)?; - let payload = result.1.get(&key).ok_or(DidNotGet)?; - Ok(payload.as_slice()) - } -} diff --git a/src/runtime/retired/endpoint.rs b/src/runtime/retired/endpoint.rs deleted file mode 100644 index 359692552320f0bfbcbd269c5a5827581545e405..0000000000000000000000000000000000000000 --- a/src/runtime/retired/endpoint.rs +++ /dev/null @@ -1,219 +0,0 @@ -use crate::common::*; -use crate::runtime::{errors::*, Predicate}; -use mio::{Evented, PollOpt, Ready}; - -pub(crate) enum Endpoint { - Memory { s: mio_extras::channel::Sender, r: mio_extras::channel::Receiver }, - Network(NetworkEndpoint), -} - -#[derive(Debug)] -pub(crate) struct EndpointExt { - pub endpoint: Endpoint, - pub info: EndpointInfo, -} -#[derive(Debug, Copy, Clone)] -pub struct EndpointInfo { - pub polarity: Polarity, - pub channel_id: ChannelId, -} - -#[derive(Debug, Clone)] -pub(crate) enum Decision { - Failure, - Success(Predicate), -} - -#[derive(Clone, Debug)] -pub(crate) enum Msg { - SetupMsg(SetupMsg), - CommMsg(CommMsg), -} -#[derive(Clone, Debug)] -pub(crate) enum SetupMsg { - // sent by the passive endpoint to the active endpoint - ChannelSetup { info: EndpointInfo }, - LeaderEcho { maybe_leader: ControllerId }, - LeaderAnnounce { leader: ControllerId }, - YouAreMyParent, -} -impl Into for SetupMsg { - fn into(self) -> Msg { - Msg::SetupMsg(self) - } -} - -#[derive(Clone, Debug)] -pub(crate) struct CommMsg { - pub round_index: usize, - pub contents: CommMsgContents, -} -#[derive(Clone, Debug)] -pub(crate) enum CommMsgContents { - SendPayload { payload_predicate: Predicate, payload: Payload }, - Elaborate { partial_oracle: Predicate }, // SINKWARD - Failure, // SINKWARD - Announce { decision: Decision }, // SINKAWAYS -} - -pub struct NetworkEndpoint { - stream: mio::net::TcpStream, - inbox: Vec, - outbox: Vec, -} - -impl std::fmt::Debug for Endpoint { - fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { - let s = match self { - Endpoint::Memory { .. } => "Memory", - Endpoint::Network(..) 
=> "Network", - }; - f.write_fmt(format_args!("Endpoint::{}", s)) - } -} - -impl CommMsgContents { - pub fn into_msg(self, round_index: usize) -> Msg { - Msg::CommMsg(CommMsg { round_index, contents: self }) - } -} - -impl From for ConnectErr { - fn from(e: EndpointErr) -> Self { - match e { - EndpointErr::Disconnected => ConnectErr::Disconnected, - EndpointErr::MetaProtocolDeviation => ConnectErr::MetaProtocolDeviation, - } - } -} -impl Endpoint { - // asymmetric - // pub(crate) fn from_fresh_stream(stream: mio::net::TcpStream) -> Self { - // Self::Network(NetworkEndpoint { stream, inbox: vec![], outbox: vec![] }) - // } - pub(crate) fn from_fresh_stream_and_inbox(stream: mio::net::TcpStream, inbox: Vec) -> Self { - Self::Network(NetworkEndpoint { stream, inbox, outbox: vec![] }) - } - - // symmetric - pub fn new_memory_pair() -> [Self; 2] { - let (s1, r1) = mio_extras::channel::channel::(); - let (s2, r2) = mio_extras::channel::channel::(); - [Self::Memory { s: s1, r: r2 }, Self::Memory { s: s2, r: r1 }] - } - pub fn send(&mut self, msg: Msg) -> Result<(), EndpointErr> { - match self { - Self::Memory { s, .. } => s.send(msg).map_err(|_| EndpointErr::Disconnected), - Self::Network(NetworkEndpoint { stream, outbox, .. }) => { - use crate::runtime::serde::Ser; - outbox.ser(&msg).expect("ser failed"); - loop { - use std::io::Write; - match stream.write(outbox) { - Ok(0) => return Ok(()), - Ok(bytes_written) => { - outbox.drain(0..bytes_written); - } - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - panic!("sending shouldn't WouldBlock") - } - Err(_e) => return Err(EndpointErr::Disconnected), - } - } - } - } - } - pub fn recv(&mut self) -> Result, EndpointErr> { - match self { - Self::Memory { r, .. } => match r.try_recv() { - Ok(msg) => Ok(Some(msg)), - Err(std::sync::mpsc::TryRecvError::Empty) => Ok(None), - Err(std::sync::mpsc::TryRecvError::Disconnected) => Err(EndpointErr::Disconnected), - }, - Self::Network(NetworkEndpoint { stream, inbox, .. }) => { - // populate inbox as much as possible - 'read_loop: loop { - use std::io::Read; - match stream.read_to_end(inbox) { - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break 'read_loop, - Ok(0) => break 'read_loop, - Ok(_) => (), - Err(_e) => return Err(EndpointErr::Disconnected), - } - } - use crate::runtime::serde::{De, MonitoredReader}; - let mut monitored = MonitoredReader::from(&inbox[..]); - match De::::de(&mut monitored) { - Ok(msg) => { - let msg_size2 = monitored.bytes_read(); - inbox.drain(0..(msg_size2.try_into().unwrap())); - Ok(Some(msg)) - } - Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => Ok(None), - Err(_) => Err(EndpointErr::MetaProtocolDeviation), - } - } - } - } -} - -impl Evented for Endpoint { - fn register( - &self, - poll: &Poll, - token: Token, - interest: Ready, - opts: PollOpt, - ) -> Result<(), std::io::Error> { - match self { - Self::Memory { r, .. } => r.register(poll, token, interest, opts), - Self::Network(n) => n.register(poll, token, interest, opts), - } - } - - fn reregister( - &self, - poll: &Poll, - token: Token, - interest: Ready, - opts: PollOpt, - ) -> Result<(), std::io::Error> { - match self { - Self::Memory { r, .. } => r.reregister(poll, token, interest, opts), - Self::Network(n) => n.reregister(poll, token, interest, opts), - } - } - - fn deregister(&self, poll: &Poll) -> Result<(), std::io::Error> { - match self { - Self::Memory { r, .. 
} => r.deregister(poll), - Self::Network(n) => n.deregister(poll), - } - } -} - -impl Evented for NetworkEndpoint { - fn register( - &self, - poll: &Poll, - token: Token, - interest: Ready, - opts: PollOpt, - ) -> Result<(), std::io::Error> { - self.stream.register(poll, token, interest, opts) - } - - fn reregister( - &self, - poll: &Poll, - token: Token, - interest: Ready, - opts: PollOpt, - ) -> Result<(), std::io::Error> { - self.stream.reregister(poll, token, interest, opts) - } - - fn deregister(&self, poll: &Poll) -> Result<(), std::io::Error> { - self.stream.deregister(poll) - } -} diff --git a/src/runtime/retired/errors.rs b/src/runtime/retired/errors.rs deleted file mode 100644 index 13e0b13fcdb65e7fc63feecce192c378817c2714..0000000000000000000000000000000000000000 --- a/src/runtime/retired/errors.rs +++ /dev/null @@ -1,94 +0,0 @@ -use crate::common::*; - -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum PortBindErr { - AlreadyConnected, - IndexOutOfBounds, - NotConfigured, - ParseErr, - AlreadyConfigured, -} -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum ReadGottenErr { - NotConnected, - IndexOutOfBounds, - WrongPolarity, - NoPreviousRound, - DidNotGet, -} -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum PortOpErr { - IndexOutOfBounds, - NotConnected, - WrongPolarity, - DuplicateOperation, -} -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum ConfigErr { - AlreadyConnected, - ParseErr(String), - AlreadyConfigured, - NoSuchComponent, - NonPortTypeParameters, -} -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum ConnectErr { - PortNotBound { native_index: usize }, - NotConfigured, - AlreadyConnected, - MetaProtocolDeviation, - Disconnected, - PollInitFailed, - MessengerRecvErr(MessengerRecvErr), - Timeout, - PollingFailed, - PolarityMatched(SocketAddr), - AcceptFailed(SocketAddr), - PassiveConnectFailed(SocketAddr), - BindFailed(SocketAddr), -} -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum PollDeadlineErr { - PollingFailed, - Timeout, -} - -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum EndpointErr { - Disconnected, - MetaProtocolDeviation, -} - -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum SyncErr { - NotConnected, - MessengerRecvErr(MessengerRecvErr), - Inconsistent, - Timeout, - ElaborateFromNonChild, - AnnounceFromNonParent, - PayloadPremiseExcludesTheChannel(ChannelId), - UnexpectedSetupMsg, - EndpointErr(EndpointErr), - EvalErr(EvalErr), -} -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum EvalErr { - ComponentExitWhileBranching, -} -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum MessengerRecvErr { - PollingFailed, - EndpointErr(PortId, EndpointErr), -} -impl From for ConfigErr { - fn from(e: MainComponentErr) -> Self { - use ConfigErr as C; - use MainComponentErr as M; - match e { - M::NoSuchComponent => C::NoSuchComponent, - M::NonPortTypeParameters => C::NonPortTypeParameters, - _ => todo!(), - } - } -} diff --git a/src/runtime/retired/experimental/api.rs b/src/runtime/retired/experimental/api.rs deleted file mode 100644 index d7c14448e78077fececb9450df8805217fc68e23..0000000000000000000000000000000000000000 --- a/src/runtime/retired/experimental/api.rs +++ /dev/null @@ -1,821 +0,0 @@ -use super::bits::{usizes_for_bits, BitChunkIter, BitMatrix, Pair, TRUE_CHUNK}; -use super::vec_storage::VecStorage; -use crate::common::*; -use crate::runtime::endpoint::EndpointExt; -use crate::runtime::endpoint::EndpointInfo; -use crate::runtime::endpoint::{Endpoint, Msg, SetupMsg}; -use 
crate::runtime::errors::EndpointErr; -use crate::runtime::errors::MessengerRecvErr; -use crate::runtime::errors::PollDeadlineErr; -use crate::runtime::MessengerState; -use crate::runtime::Messengerlike; -use crate::runtime::ReceivedMsg; -use crate::runtime::{ProtocolD, ProtocolS}; - -use std::net::SocketAddr; -use std::sync::Arc; - -pub enum Coupling { - Active, - Passive, -} - -#[derive(Debug)] -struct Family { - parent: Option, - children: HashSet, -} - -pub struct Binding { - pub coupling: Coupling, - pub polarity: Polarity, - pub addr: SocketAddr, -} - -pub struct InPort(Port); // InPort and OutPort are AFFINE (exposed to Rust API) -pub struct OutPort(Port); -impl From for Port { - fn from(x: InPort) -> Self { - x.0 - } -} -impl From for Port { - fn from(x: OutPort) -> Self { - x.0 - } -} - -#[derive(Default, Debug)] -struct ChannelIndexStream { - next: u32, -} -impl ChannelIndexStream { - fn next(&mut self) -> u32 { - self.next += 1; - self.next - 1 - } -} - -enum Connector { - Connecting(Connecting), - Connected(Connected), -} - -#[derive(Default)] -pub struct Connecting { - bindings: Vec, -} -trait Binds { - fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> T; -} -impl Binds for Connecting { - fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> InPort { - self.bindings.push(Binding { coupling, polarity: Polarity::Getter, addr }); - InPort(Port(self.bindings.len() - 1)) - } -} -impl Binds for Connecting { - fn bind(&mut self, coupling: Coupling, addr: SocketAddr) -> OutPort { - self.bindings.push(Binding { coupling, polarity: Polarity::Putter, addr }); - OutPort(Port(self.bindings.len() - 1)) - } -} - -#[derive(Debug, Clone)] -pub enum ConnectErr { - BindErr(SocketAddr), - NewSocketErr(SocketAddr), - AcceptErr(SocketAddr), - ConnectionShutdown(SocketAddr), - PortKindMismatch(Port, SocketAddr), - EndpointErr(Port, EndpointErr), - PollInitFailed, - PollingFailed, - Timeout, -} - -#[derive(Debug)] -struct Component { - protocol: Arc, - port_set: HashSet, - identifier: Arc<[u8]>, - state: Option, // invariant between rounds: Some() -} - -impl From for ConnectErr { - fn from(e: PollDeadlineErr) -> Self { - use PollDeadlineErr as P; - match e { - P::PollingFailed => Self::PollingFailed, - P::Timeout => Self::Timeout, - } - } -} -impl From for ConnectErr { - fn from(e: MessengerRecvErr) -> Self { - use MessengerRecvErr as M; - match e { - M::PollingFailed => Self::PollingFailed, - M::EndpointErr(port, err) => Self::EndpointErr(port, err), - } - } -} -impl Connecting { - fn random_controller_id() -> ControllerId { - type Bytes8 = [u8; std::mem::size_of::()]; - let mut bytes = Bytes8::default(); - getrandom::getrandom(&mut bytes).unwrap(); - unsafe { - // safe: - // 1. All random bytes give valid Bytes8 - // 2. Bytes8 and ControllerId have same valid representations - std::mem::transmute::(bytes) - } - } - fn test_stream_connectivity(stream: &mut TcpStream) -> bool { - use std::io::Write; - stream.write(&[]).is_ok() - } - fn new_connected( - &self, - controller_id: ControllerId, - timeout: Option, - ) -> Result { - use ConnectErr::*; - - /////////////////////////////////////////////////////// - // 1. bindings correspond with ports 0..bindings.len(). For each: - // - reserve a slot in endpoint_exts. - // - store the port in `native_ports' set. - let mut endpoint_exts = VecStorage::::with_reserved_range(self.bindings.len()); - let native_ports = (0..self.bindings.len()).map(Port).collect(); - - // 2. 
create MessengerState structure for polling channels - let edge = PollOpt::edge(); - let [ready_r, ready_w] = [Ready::readable(), Ready::writable()]; - let mut ms = - MessengerState::with_event_capacity(self.bindings.len()).map_err(|_| PollInitFailed)?; - - // 3. create one TODO task per (port,binding) as a vector with indices in lockstep. - // we will drain it gradually so we store elements of type Option where all are initially Some(_) - enum Todo { - PassiveAccepting { listener: TcpListener, channel_id: ChannelId }, - ActiveConnecting { stream: TcpStream }, - PassiveConnecting { stream: TcpStream, channel_id: ChannelId }, - ActiveRecving { endpoint: Endpoint }, - } - let mut channel_index_stream = ChannelIndexStream::default(); - let mut todos = self - .bindings - .iter() - .enumerate() - .map(|(index, binding)| { - Ok(Some(match binding.coupling { - Coupling::Passive => { - let channel_index = channel_index_stream.next(); - let channel_id = ChannelId { controller_id, channel_index }; - let listener = - TcpListener::bind(&binding.addr).map_err(|_| BindErr(binding.addr))?; - ms.poll.register(&listener, Token(index), ready_r, edge).unwrap(); // registration unique - Todo::PassiveAccepting { listener, channel_id } - } - Coupling::Active => { - let stream = TcpStream::connect(&binding.addr) - .map_err(|_| NewSocketErr(binding.addr))?; - ms.poll.register(&stream, Token(index), ready_w, edge).unwrap(); // registration unique - Todo::ActiveConnecting { stream } - } - })) - }) - .collect::>, ConnectErr>>()?; - let mut num_todos_remaining = todos.len(); - - // 4. handle incoming events until all TODOs are completed OR we timeout - let deadline = timeout.map(|t| Instant::now() + t); - let mut polled_undrained_later = IndexSet::<_>::default(); - let mut backoff_millis = 10; - while num_todos_remaining > 0 { - ms.poll_events_until(deadline)?; - for event in ms.events.iter() { - let token = event.token(); - let index = token.0; - let binding = &self.bindings[index]; - match todos[index].take() { - None => { - polled_undrained_later.insert(index); - } - Some(Todo::PassiveAccepting { listener, channel_id }) => { - let (stream, _peer_addr) = - listener.accept().map_err(|_| AcceptErr(binding.addr))?; - ms.poll.deregister(&listener).expect("wer"); - ms.poll.register(&stream, token, ready_w, edge).expect("3y5"); - todos[index] = Some(Todo::PassiveConnecting { stream, channel_id }); - } - Some(Todo::ActiveConnecting { mut stream }) => { - let todo = if Self::test_stream_connectivity(&mut stream) { - ms.poll.reregister(&stream, token, ready_r, edge).expect("52"); - let endpoint = Endpoint::from_fresh_stream(stream); - Todo::ActiveRecving { endpoint } - } else { - ms.poll.deregister(&stream).expect("wt"); - std::thread::sleep(Duration::from_millis(backoff_millis)); - backoff_millis = ((backoff_millis as f32) * 1.2) as u64 + 3; - let stream = TcpStream::connect(&binding.addr).unwrap(); - ms.poll.register(&stream, token, ready_w, edge).expect("PAC 3"); - Todo::ActiveConnecting { stream } - }; - todos[index] = Some(todo); - } - Some(Todo::PassiveConnecting { mut stream, channel_id }) => { - if !Self::test_stream_connectivity(&mut stream) { - return Err(ConnectionShutdown(binding.addr)); - } - ms.poll.reregister(&stream, token, ready_r, edge).expect("55"); - let polarity = binding.polarity; - let info = EndpointInfo { polarity, channel_id }; - let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info }); - let mut endpoint = Endpoint::from_fresh_stream(stream); - endpoint.send(msg).map_err(|e| 
EndpointErr(Port(index), e))?; - let endpoint_ext = EndpointExt { endpoint, info }; - endpoint_exts.occupy_reserved(index, endpoint_ext); - num_todos_remaining -= 1; - } - Some(Todo::ActiveRecving { mut endpoint }) => { - let ekey = Port(index); - 'recv_loop: while let Some(msg) = - endpoint.recv().map_err(|e| EndpointErr(ekey, e))? - { - if let Msg::SetupMsg(SetupMsg::ChannelSetup { info }) = msg { - if info.polarity == binding.polarity { - return Err(PortKindMismatch(ekey, binding.addr)); - } - let channel_id = info.channel_id; - let info = EndpointInfo { polarity: binding.polarity, channel_id }; - ms.polled_undrained.insert(ekey); - let endpoint_ext = EndpointExt { endpoint, info }; - endpoint_exts.occupy_reserved(index, endpoint_ext); - num_todos_remaining -= 1; - break 'recv_loop; - } else { - ms.delayed.push(ReceivedMsg { recipient: ekey, msg }); - } - } - } - } - } - } - assert_eq!(None, endpoint_exts.iter_reserved().next()); - drop(todos); - - /////////////////////////////////////////////////////// - // 1. construct `family', i.e. perform the sink tree setup procedure - use {Msg::SetupMsg as S, SetupMsg::*}; - let mut messenger = (&mut ms, &mut endpoint_exts); - impl Messengerlike for (&mut MessengerState, &mut VecStorage) { - fn get_state_mut(&mut self) -> &mut MessengerState { - self.0 - } - fn get_endpoint_mut(&mut self, ekey: Key) -> &mut Endpoint { - &mut self - .1 - .get_occupied_mut(ekey.to_raw() as usize) - .expect("OUT OF BOUNDS") - .endpoint - } - } - - // 1. broadcast my ID as the first echo. await reply from all in net_keylist - let neighbors = (0..self.bindings.len()).map(Port); - let echo = S(LeaderEcho { maybe_leader: controller_id }); - let mut awaiting = IndexSet::::with_capacity(neighbors.len()); - for n in neighbors.clone() { - messenger.send(n, echo.clone()).map_err(|e| EndpointErr(n, e))?; - awaiting.insert(n); - } - - // 2. Receive incoming replies. whenever a higher-id echo arrives, - // adopt it as leader, sender as parent, and reset the await set. - let mut parent: Option = None; - let mut my_leader = controller_id; - messenger.undelay_all(); - 'echo_loop: while !awaiting.is_empty() || parent.is_some() { - let ReceivedMsg { recipient, msg } = messenger.recv_until(deadline)?.ok_or(Timeout)?; - match msg { - S(LeaderAnnounce { leader }) => { - // someone else completed the echo and became leader first! - // the sender is my parent - parent = Some(recipient); - my_leader = leader; - awaiting.clear(); - break 'echo_loop; - } - S(LeaderEcho { maybe_leader }) => { - use Ordering::*; - match maybe_leader.cmp(&my_leader) { - Less => { /* ignore */ } - Equal => { - awaiting.remove(&recipient); - if awaiting.is_empty() { - if let Some(p) = parent { - // return the echo to my parent - messenger - .send(p, S(LeaderEcho { maybe_leader })) - .map_err(|e| EndpointErr(p, e))?; - } else { - // DECIDE! 
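// (Illustrative walkthrough of the decision point above, using hypothetical ids.)
// Reaching "DECIDE!" means every neighbor has echoed this controller's own id back
// and no larger id was ever observed, so this controller is the unique maximum and
// elects itself leader of the sink tree. For example, with controllers {3, 7, 5}
// connected in a line 3--7--5: nodes 3 and 5 each receive LeaderEcho{7}, adopt 7 as
// my_leader with the sender as parent, and (having a single neighbor) echo 7 straight
// back; node 7 ignores the smaller echo carrying 3, removes both neighbors from
// `awaiting` as its own id returns, and with parent == None breaks out of 'echo_loop here.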
- break 'echo_loop; - } - } - } - Greater => { - // join new echo - parent = Some(recipient); - my_leader = maybe_leader; - let echo = S(LeaderEcho { maybe_leader: my_leader }); - awaiting.clear(); - if neighbors.len() == 1 { - // immediately reply to parent - messenger - .send(recipient, echo.clone()) - .map_err(|e| EndpointErr(recipient, e))?; - } else { - for n in neighbors.clone() { - if n != recipient { - messenger - .send(n, echo.clone()) - .map_err(|e| EndpointErr(n, e))?; - awaiting.insert(n); - } - } - } - } - } - } - msg => messenger.delay(ReceivedMsg { recipient, msg }), - } - } - match parent { - None => assert_eq!( - my_leader, controller_id, - "I've got no parent, but I consider {:?} the leader?", - my_leader - ), - Some(parent) => assert_ne!( - my_leader, controller_id, - "I have {:?} as parent, but I consider myself ({:?}) the leader?", - parent, controller_id - ), - } - - // 3. broadcast leader announcement (except to parent: confirm they are your parent) - // in this loop, every node sends 1 message to each neighbor - let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader }); - for n in neighbors.clone() { - let msg = - if Some(n) == parent { S(YouAreMyParent) } else { msg_for_non_parents.clone() }; - messenger.send(n, msg).map_err(|e| EndpointErr(n, e))?; - } - - // await 1 message from all non-parents - for n in neighbors.clone() { - if Some(n) != parent { - awaiting.insert(n); - } - } - let mut children = HashSet::default(); - messenger.undelay_all(); - while !awaiting.is_empty() { - let ReceivedMsg { recipient, msg } = messenger.recv_until(deadline)?.ok_or(Timeout)?; - let recipient = recipient; - match msg { - S(YouAreMyParent) => { - assert!(awaiting.remove(&recipient)); - children.insert(recipient); - } - S(SetupMsg::LeaderAnnounce { leader }) => { - assert!(awaiting.remove(&recipient)); - assert!(leader == my_leader); - assert!(Some(recipient) != parent); - // they wouldn't send me this if they considered me their parent - } - _ => messenger.delay(ReceivedMsg { recipient, msg }), - } - } - let family = Family { parent, children }; - - // done! - Ok(Connected { - components: Default::default(), - controller_id, - channel_index_stream, - endpoint_exts, - native_ports, - family, - ephemeral: Default::default(), - }) - } - ///////// - pub fn connect_using_id( - &mut self, - controller_id: ControllerId, - timeout: Option, - ) -> Result { - // 1. try and create a connection from these bindings with self immutable. - let connected = self.new_connected(controller_id, timeout)?; - // 2. success! 
drain self and return - self.bindings.clear(); - Ok(connected) - } - pub fn connect(&mut self, timeout: Option) -> Result { - self.connect_using_id(Self::random_controller_id(), timeout) - } -} - -#[derive(Debug)] -pub struct Connected { - native_ports: HashSet, - controller_id: ControllerId, - channel_index_stream: ChannelIndexStream, - endpoint_exts: VecStorage, - components: VecStorage, - family: Family, - ephemeral: Ephemeral, -} -#[derive(Debug, Default)] -struct Ephemeral { - // invariant: between rounds this is cleared - machines: Vec, - bit_matrix: BitMatrix, - assignment_to_bit_property: HashMap<(ChannelId, bool), usize>, - usize_buf: Vec, -} -impl Ephemeral { - fn clear(&mut self) { - self.bit_matrix = Default::default(); - self.usize_buf.clear(); - self.machines.clear(); - self.assignment_to_bit_property.clear(); - } -} -#[derive(Debug)] -struct Machine { - component_index: usize, - state: ProtocolS, -} -struct MonoCtx<'a> { - another_pass: &'a mut bool, -} -impl MonoContext for MonoCtx<'_> { - type D = ProtocolD; - type S = ProtocolS; - - fn new_component(&mut self, moved_keys: HashSet, init_state: Self::S) { - todo!() - } - fn new_channel(&mut self) -> [Key; 2] { - todo!() - } - fn new_random(&mut self) -> u64 { - todo!() - } -} -impl Connected { - pub fn new_component( - &mut self, - protocol: &Arc, - identifier: &Arc<[u8]>, - moved_port_list: &[Port], - ) -> Result<(), MainComponentErr> { - ////////////////////////////////////////// - // 1. try and create a new component (without mutating self) - use MainComponentErr::*; - let moved_port_set = { - let mut set: HashSet = Default::default(); - for &port in moved_port_list.iter() { - if !self.native_ports.contains(&port) { - return Err(CannotMovePort(port)); - } - if !set.insert(port) { - return Err(DuplicateMovedPort(port)); - } - } - set - }; - // moved_port_set is disjoint to native_ports - let expected_polarities = protocol.component_polarities(identifier)?; - if moved_port_list.len() != expected_polarities.len() { - return Err(WrongNumberOfParamaters { expected: expected_polarities.len() }); - } - // correct polarity list - for (param_index, (&port, &expected_polarity)) in - moved_port_list.iter().zip(expected_polarities.iter()).enumerate() - { - let polarity = - self.endpoint_exts.get_occupied(port.0).ok_or(UnknownPort(port))?.info.polarity; - if polarity != expected_polarity { - return Err(WrongPortPolarity { param_index, port }); - } - } - let state = Some(protocol.new_main_component(identifier, &moved_port_list)); - let component = Component { - port_set: moved_port_set, - protocol: protocol.clone(), - identifier: identifier.clone(), - state, - }; - ////////////////////////////// - // success! 
mutate self and return Ok - self.native_ports.retain(|e| !component.port_set.contains(e)); - self.components.new_occupied(component); - Ok(()) - } - pub fn new_channel(&mut self) -> (OutPort, InPort) { - assert!(self.endpoint_exts.len() <= std::u32::MAX as usize - 2); - let channel_id = ChannelId { - controller_id: self.controller_id, - channel_index: self.channel_index_stream.next(), - }; - let [e0, e1] = Endpoint::new_memory_pair(); - let kp = self.endpoint_exts.new_occupied(EndpointExt { - info: EndpointInfo { channel_id, polarity: Putter }, - endpoint: e0, - }); - let kg = self.endpoint_exts.new_occupied(EndpointExt { - info: EndpointInfo { channel_id, polarity: Getter }, - endpoint: e1, - }); - (OutPort(Port(kp)), InPort(Port(kg))) - } - pub fn sync_set(&mut self, _inbuf: &mut [u8], _ops: &mut [PortOpRs]) -> Result<(), ()> { - // For every component, take its state and make a singleton machine - for (component_index, component) in self.components.iter_mut().enumerate() { - let state = component.state.take().unwrap(); - let machine = Machine { component_index, state }; - self.ephemeral.machines.push(machine); - } - - // Grow property matrix. has |machines| entities and {to_run => 0, to_remove => 1} properties - const PROP_TO_RUN: usize = 0; - const PROP_TO_REMOVE: usize = 1; - self.ephemeral - .bit_matrix - .grow_to(Pair { property: 2, entity: self.ephemeral.machines.len() as u32 }); - // Set to_run property for all existing machines - self.ephemeral.bit_matrix.batch_mut(move |p| p[PROP_TO_RUN] = TRUE_CHUNK); - - ///////////// - // perform mono runs, adding and removing TO_RUN property bits bits, and adding PROP_TO_REMOVE property bits - let mut usize_buf = vec![]; - let mut another_pass = true; - while another_pass { - another_pass = false; - let machine_index_iter = self - .ephemeral - .bit_matrix - .iter_entities_where(&mut usize_buf, move |p| p[PROP_TO_RUN]); - for machine_index in machine_index_iter { - let machine = &mut self.ephemeral.machines[machine_index as usize]; - let component = self.components.get_occupied(machine.component_index).unwrap(); - let mut ctx = MonoCtx { another_pass: &mut another_pass }; - // TODO ctx doesn't work. it may callback to create new machines (setting their TO_RUN and another_pass=true) - match machine.state.pre_sync_run(&mut ctx, &component.protocol) { - MonoBlocker::Inconsistent => todo!(), // make entire state inconsistent! - MonoBlocker::ComponentExit => self - .ephemeral - .bit_matrix - .set(Pair { entity: machine_index, property: PROP_TO_REMOVE as u32 }), - MonoBlocker::SyncBlockStart => self - .ephemeral - .bit_matrix - .unset(Pair { entity: machine_index, property: PROP_TO_RUN as u32 }), - } - } - } - // no machines have property TO_RUN - - // from back to front, swap_remove all machines with PROP_TO_REMOVE - let machine_index_iter = self - .ephemeral - .bit_matrix - .iter_entities_where_rev(&mut usize_buf, move |p| p[PROP_TO_REMOVE]); - for machine_index in machine_index_iter { - let machine = self.ephemeral.machines.swap_remove(machine_index as usize); - drop(machine); - } - - // replace old matrix full of bogus data with a new (fresh) one for the set of machines - // henceforth, machines(entities) and properties won't shrink or move. - self.ephemeral.bit_matrix = - BitMatrix::new(Pair { entity: self.ephemeral.machines.len() as u32 * 2, property: 8 }); - - // !!! TODO poly run until solution is found - - //////////////////// - let solution_assignments: Vec<(ChannelId, bool)> = vec![]; - // solution has been found. 
time to find a - - // logically destructure self so we can read and write to different fields interleaved... - let Self { - components, - ephemeral: Ephemeral { bit_matrix, assignment_to_bit_property, usize_buf, machines }, - .. - } = self; - - // !!!!!!! TODO MORE HERE - - let machine_index_iter = bit_matrix.iter_entities_where(usize_buf, move |p| { - solution_assignments.iter().fold(TRUE_CHUNK, |chunk, assignment| { - let &bit_property = assignment_to_bit_property.get(assignment).unwrap(); - chunk & p[bit_property] - }) - }); - for machine_index in machine_index_iter { - let machine = &machines[machine_index as usize]; - let component = &mut components.get_occupied_mut(machine.component_index).unwrap(); - let was = component.state.replace(machine.state.clone()); - assert!(was.is_none()); // 2+ machines matched the solution for this component! - println!("visiting machine at index {:?}", machine_index); - } - for component in self.components.iter() { - assert!(component.state.is_some()); // 0 machines matched the solution for this component! - } - self.ephemeral.clear(); - println!("B {:#?}", self); - Ok(()) - } - pub fn sync_subsets( - &mut self, - _inbuf: &mut [u8], - _ops: &mut [PortOpRs], - bit_subsets: &[&[usize]], - ) -> Result { - for (batch_index, bit_subset) in bit_subsets.iter().enumerate() { - println!("batch_index {:?}", batch_index); - let chunk_iter = bit_subset.iter().copied(); - for index in BitChunkIter::new(chunk_iter) { - println!(" index {:?}", index); - } - } - Ok(0) - } -} - -macro_rules! bitslice { - ($( $num:expr ),*) => {{ - &[0 $( | (1usize << $num) )*] - }}; -} - -#[test] -fn api_new_test() { - let mut c = Connecting::default(); - let net_out: OutPort = c.bind(Coupling::Active, "127.0.0.1:8000".parse().unwrap()); - let net_in: InPort = c.bind(Coupling::Active, "127.0.0.1:8001".parse().unwrap()); - let proto_0 = Arc::new(ProtocolD::parse(b"").unwrap()); - let mut c = c.connect(None).unwrap(); - let (mem_out, mem_in) = c.new_channel(); - let mut inbuf = [0u8; 64]; - let identifier: Arc<[u8]> = b"sync".to_vec().into(); - c.new_component(&proto_0, &identifier, &[net_in.into(), mem_out.into()]).unwrap(); - let mut ops = [ - PortOpRs::In { msg_range: None, port: &mem_in }, - PortOpRs::Out { msg: b"hey", port: &net_out, optional: false }, - PortOpRs::Out { msg: b"hi?", port: &net_out, optional: true }, - PortOpRs::Out { msg: b"yo!", port: &net_out, optional: false }, - ]; - c.sync_set(&mut inbuf, &mut ops).unwrap(); - c.sync_subsets(&mut inbuf, &mut ops, &[bitslice! 
{0,1,2}]).unwrap(); -} - -#[repr(C)] -pub struct PortOp { - msgptr: *mut u8, // read if OUT, field written if IN, will point into buf - msglen: usize, // read if OUT, written if IN, won't exceed buf - port: Port, - optional: bool, // no meaning if -} - -pub enum PortOpRs<'a> { - In { msg_range: Option>, port: &'a InPort }, - Out { msg: &'a [u8], port: &'a OutPort, optional: bool }, -} - -unsafe fn c_sync_set( - connected: &mut Connected, - inbuflen: usize, - inbufptr: *mut u8, - opslen: usize, - opsptr: *mut PortOp, -) -> i32 { - let buf = as_mut_slice(inbuflen, inbufptr); - let ops = as_mut_slice(opslen, opsptr); - let (subset_index, wrote) = sync_inner(connected, buf); - assert_eq!(0, subset_index); - for op in ops { - if let Some(range) = wrote.get(&op.port) { - op.msgptr = inbufptr.add(range.start); - op.msglen = range.end - range.start; - } - } - 0 -} - -unsafe fn c_sync_subset( - connected: &mut Connected, - inbuflen: usize, - inbufptr: *mut u8, - opslen: usize, - opsptr: *mut PortOp, - subsetslen: usize, - subsetsptr: *const *const usize, -) -> i32 { - let buf: &mut [u8] = as_mut_slice(inbuflen, inbufptr); - let ops: &mut [PortOp] = as_mut_slice(opslen, opsptr); - let subsets: &[*const usize] = as_const_slice(subsetslen, subsetsptr); - let subsetlen = usizes_for_bits(opslen); - // don't yet know subsetptr; which subset fires unknown! - - let (subset_index, wrote) = sync_inner(connected, buf); - let subsetptr: *const usize = subsets[subset_index]; - let subset: &[usize] = as_const_slice(subsetlen, subsetptr); - - for index in BitChunkIter::new(subset.iter().copied()) { - let op = &mut ops[index as usize]; - if let Some(range) = wrote.get(&op.port) { - op.msgptr = inbufptr.add(range.start); - op.msglen = range.end - range.start; - } - } - subset_index as i32 -} - -// dummy fn for the actual synchronous round -fn sync_inner<'c, 'b>( - _connected: &'c mut Connected, - _buf: &'b mut [u8], -) -> (usize, &'b HashMap>) { - todo!() -} - -unsafe fn as_mut_slice<'a, T>(len: usize, ptr: *mut T) -> &'a mut [T] { - std::slice::from_raw_parts_mut(ptr, len) -} -unsafe fn as_const_slice<'a, T>(len: usize, ptr: *const T) -> &'a [T] { - std::slice::from_raw_parts(ptr, len) -} - -#[test] -fn api_connecting() { - let addrs: [SocketAddr; 3] = [ - "127.0.0.1:8888".parse().unwrap(), - "127.0.0.1:8889".parse().unwrap(), - "127.0.0.1:8890".parse().unwrap(), - ]; - - lazy_static::lazy_static! 
{ - static ref PROTOCOL: Arc = { - static PDL: &[u8] = b" - primitive sync(in i, out o) { - while(true) synchronous { - put(o, get(i)); - } - } - "; - Arc::new(ProtocolD::parse(PDL).unwrap()) - }; - } - - const TIMEOUT: Option = Some(Duration::from_secs(1)); - let handles = vec![ - std::thread::spawn(move || { - let mut c = Connecting::default(); - let p_in: InPort = c.bind(Coupling::Passive, addrs[0]); - let p_out: OutPort = c.bind(Coupling::Active, addrs[1]); - let mut c = c.connect(TIMEOUT).unwrap(); - println!("c {:#?}", &c); - - let identifier = b"sync".to_vec().into(); - c.new_component(&PROTOCOL, &identifier, &[p_in.into(), p_out.into()]).unwrap(); - println!("c {:#?}", &c); - - let mut inbuf = vec![]; - let mut port_ops = []; - c.sync_set(&mut inbuf, &mut port_ops).unwrap(); - }), - std::thread::spawn(move || { - let mut connecting = Connecting::default(); - let _a: OutPort = connecting.bind(Coupling::Active, addrs[0]); - let _b: InPort = connecting.bind(Coupling::Passive, addrs[1]); - let _c: InPort = connecting.bind(Coupling::Active, addrs[2]); - let _connected = connecting.connect(TIMEOUT).unwrap(); - }), - std::thread::spawn(move || { - let mut connecting = Connecting::default(); - let _a: OutPort = connecting.bind(Coupling::Passive, addrs[2]); - let _connected = connecting.connect(TIMEOUT).unwrap(); - }), - ]; - for h in handles { - h.join().unwrap(); - } -} diff --git a/src/runtime/retired/experimental/bits.rs b/src/runtime/retired/experimental/bits.rs deleted file mode 100644 index 03166bd28f37e82ebc65c230be5a794f6ae4143e..0000000000000000000000000000000000000000 --- a/src/runtime/retired/experimental/bits.rs +++ /dev/null @@ -1,457 +0,0 @@ -use crate::common::*; -use std::alloc::Layout; - -/// Given an iterator over BitChunk Items, iterates over the indices (each represented as a u32) for which the bit is SET, -/// treating the bits in the BitChunk as a contiguous array. -/// e.g. input [0b111000, 0b11] gives output [3, 4, 5, 32, 33]. -/// observe that the bits per chunk are ordered from least to most significant bits, yielding smaller to larger usizes. -/// assumes chunk_iter will yield no more than std::u32::MAX / 32 chunks - -pub const fn usize_bytes() -> usize { - std::mem::size_of::() -} -pub const fn usize_bits() -> usize { - usize_bytes() * 8 -} -pub const fn usizes_for_bits(bits: usize) -> usize { - (bits + (usize_bits() - 1)) / usize_bits() -} - -type Chunk = usize; -type BitIndex = usize; - -pub(crate) struct BitChunkIter> { - cached: usize, - chunk_iter: I, - next_bit_index: BitIndex, -} -impl> BitChunkIter { - pub fn new(chunk_iter: I) -> Self { - // first chunk is always a dummy zero, as if chunk_iter yielded Some(FALSE_CHUNK). - // Consequences: - // 1. our next_bit_index is always off by usize_bits() (we correct for it in Self::next) (no additional overhead) - // 2. we cache Chunk and not Option, because chunk_iter.next() is only called in Self::next. - Self { chunk_iter, next_bit_index: 0, cached: 0 } - } -} -impl> Iterator for BitChunkIter { - type Item = BitIndex; - fn next(&mut self) -> Option { - let mut chunk = self.cached; - - // loop until either: - // 1. there are no more Items to return, or - // 2. chunk encodes 1+ Items, one of which we will return. - while chunk == 0 { - // chunk has no bits set! get the next one... - chunk = self.chunk_iter.next()?; - - // ... and jump self.next_bit_index to the next multiple of usize_bits(). 
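// (Clarifying notes on chunk sizing, added as comments.) Chunks in this module are
// usize-sized, so on a 64-bit target usize_bits() == 64 and e.g. usizes_for_bits(65) == 2;
// the module-level example above then yields [3, 4, 5, 64, 65] rather than [.., 32, 33],
// which assumes 32-bit chunks. Also note that the jump below always advances to the next
// multiple of usize_bits() strictly above next_bit_index; in the corner case where the
// most significant bit of the previous chunk was just yielded (next_bit_index already a
// multiple of usize_bits()), this appears to skip one chunk too far.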
- self.next_bit_index = (self.next_bit_index + usize_bits()) & !(usize_bits() - 1); - } - // there exists 1+ set bits in chunk - // assert(chunk > 0); - - // Until the least significant bit of chunk is 1: - // 1. shift chunk to the right, - // 2. and increment self.next_bit_index accordingly - // effectively performs a little binary search, shifting 32, then 16, ... - // TODO perhaps there is a more efficient SIMD op for this? - const N_INIT: BitIndex = usize_bits() / 2; - let mut n = N_INIT; - while n >= 1 { - // n is [32,16,8,4,2,1] on 64-bit machine - // this loop is unrolled with release optimizations - let n_least_significant_mask = (1 << n) - 1; - if chunk & n_least_significant_mask == 0 { - // no 1 set within 0..n least significant bits. - self.next_bit_index += n; - chunk >>= n; - } - n /= 2; - } - // least significant bit of chunk is 1. Item to return is known. - // assert(chunk & 1 == 1) - - // prepare our state for the next time Self::next is called. - // Overwrite self.cached such that its shifted state is retained, - // and jump over the bit whose index we are about to return. - self.next_bit_index += 1; - self.cached = chunk >> 1; - - // returned index is usize_bits() smaller than self.next_bit_index because we use an - // off-by-usize_bits() encoding to avoid having to cache an Option. - Some(self.next_bit_index - 1 - usize_bits()) - } -} - -pub(crate) struct BitChunkIterRev> { - cached: usize, - chunk_iter: I, - next_bit_index: BitIndex, -} -impl> BitChunkIterRev { - pub fn new(chunk_iter: I) -> Self { - let next_bit_index = chunk_iter.len() * usize_bits(); - Self { chunk_iter, next_bit_index, cached: 0 } - } -} -impl> Iterator for BitChunkIterRev { - type Item = BitIndex; - fn next(&mut self) -> Option { - let mut chunk = self.cached; - if chunk == 0 { - self.next_bit_index += usize_bits(); - loop { - self.next_bit_index -= usize_bits(); - chunk = self.chunk_iter.next()?; - if chunk != 0 { - break; - } - } - } - const N_INIT: BitIndex = usize_bits() / 2; - let mut n = N_INIT; - while n >= 1 { - let n_most_significant_mask = !0 << (usize_bits() - n); - if chunk & n_most_significant_mask == 0 { - self.next_bit_index -= n; - chunk <<= n; - } - n /= 2; - } - self.cached = chunk << 1; - self.next_bit_index -= 1; - Some(self.next_bit_index) - } -} - -/* --properties--> - ___ ___ ___ ___ - |___|___|___|___| - | |___|___|___|___| - | |___|___|___|___| - | |___|___|___|___| - | - V - entity chunks (groups of size usize_bits()) -*/ - -// TODO newtypes Entity and Property - -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub struct Pair { - pub entity: u32, - pub property: u32, -} -impl From<[u32; 2]> for Pair { - fn from([entity, property]: [u32; 2]) -> Self { - Pair { entity, property } - } -} -impl Default for BitMatrix { - fn default() -> Self { - Self::new(Pair { entity: 0, property: 0 }) - } -} -pub struct BitMatrix { - buffer: *mut usize, - bounds: Pair, - layout: Layout, // layout of the currently-allocated buffer -} -impl Drop for BitMatrix { - fn drop(&mut self) { - unsafe { - // ? 
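// Safety: `self.layout` is kept in lockstep with `self.buffer` (both are written
// together in `new` and in `grow_to`), so the pointer/layout pair passed to dealloc
// below is exactly the one used by the matching alloc call.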
- std::alloc::dealloc(self.buffer as *mut u8, self.layout); - } - } -} -impl Debug for BitMatrix { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - struct FmtRow<'a> { - me: &'a BitMatrix, - property: usize, - }; - impl Debug for FmtRow<'_> { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - let row_chunks = BitMatrix::row_chunks(self.me.bounds.property as usize); - let column_chunks = BitMatrix::column_chunks(self.me.bounds.entity as usize); - write!(f, "|")?; - for entity_chunk in 0..column_chunks { - let mut chunk = - unsafe { *self.me.buffer.add(row_chunks * entity_chunk + self.property) }; - let end = if entity_chunk + 1 == column_chunks { - self.me.bounds.entity % usize_bits() as u32 - } else { - usize_bits() as u32 - }; - for _ in 0..end { - let c = match chunk & 1 { - 0 => '0', - _ => '1', - }; - write!(f, "{}", c)?; - chunk >>= 1; - } - write!(f, "_")?; - } - Ok(()) - } - } - let row_chunks = BitMatrix::row_chunks(self.bounds.property as usize); - let iter = (0..row_chunks).map(move |property| FmtRow { me: self, property }); - f.debug_list().entries(iter).finish() - } -} -impl BitMatrix { - #[inline] - const fn row_of(entity: usize) -> usize { - entity / usize_bits() - } - #[inline] - const fn row_chunks(property_bound: usize) -> usize { - property_bound - } - #[inline] - const fn column_chunks(entity_bound: usize) -> usize { - usizes_for_bits(entity_bound) - } - #[inline] - fn offsets_unchecked(&self, at: Pair) -> [usize; 2] { - let o_in = at.entity as usize % usize_bits(); - let row = Self::row_of(at.entity as usize); - let row_chunks = self.bounds.property as usize; - let o_of = row * row_chunks + at.property as usize; - [o_of, o_in] - } - // returns a u32 which has bits 000...000111...111 - // for the last JAGGED chunk given the column size - // if the last chunk is not jagged (when entity_bound % 32 == 0) - // None is returned, - // otherwise Some(x) is returned such that x & chunk would mask out - // the bits NOT in 0..entity_bound - fn last_row_chunk_mask(entity_bound: u32) -> Option { - let zero_prefix_len = entity_bound as usize % usize_bits(); - if zero_prefix_len == 0 { - None - } else { - Some(!0 >> (usize_bits() - zero_prefix_len)) - } - } - fn assert_within_bounds(&self, at: Pair) { - assert!(at.entity < self.bounds.entity); - assert!(at.property < self.bounds.property); - } - - fn layout_for(total_chunks: usize) -> std::alloc::Layout { - unsafe { - // this layout is ALWAYS valid: - // 1. size is always nonzero - // 2. 
size is always a multiple of 4 and 4-aligned - Layout::from_size_align_unchecked(usize_bytes() * total_chunks.max(1), usize_bytes()) - } - } - ///////// - pub fn get_bounds(&self) -> &Pair { - &self.bounds - } - pub fn grow_to(&mut self, bounds: Pair) { - assert!(bounds.entity >= self.bounds.entity); - assert!(bounds.property >= self.bounds.property); - - let old_row_chunks = Self::row_chunks(self.bounds.property as usize); - let old_col_chunks = Self::column_chunks(self.bounds.entity as usize); - let new_row_chunks = Self::row_chunks(bounds.property as usize); - let new_col_chunks = Self::column_chunks(bounds.entity as usize); - - let new_layout = Self::layout_for(new_row_chunks * new_col_chunks); - let new_buffer = unsafe { - let new_buffer = std::alloc::alloc(new_layout) as *mut usize; - let mut src: *mut usize = self.buffer; - let mut dest: *mut usize = new_buffer; - let row_chunk_diff = new_row_chunks - old_row_chunks; - for _col_idx in 0..old_col_chunks { - src.copy_to_nonoverlapping(dest, old_row_chunks); - src = src.add(old_row_chunks); - dest = dest.add(old_row_chunks); - if row_chunk_diff > 0 { - dest.write_bytes(0u8, row_chunk_diff); - dest = dest.add(row_chunk_diff); - } - } - let last_zero_chunks = (new_col_chunks - old_col_chunks) * new_row_chunks; - dest.write_bytes(0u8, last_zero_chunks); - new_buffer - }; - self.layout = new_layout; - self.buffer = new_buffer; - self.bounds = bounds; - } - pub fn clear(&mut self) { - let total_chunks = Self::row_chunks(self.bounds.property as usize) - * Self::column_chunks(self.bounds.entity as usize); - unsafe { - self.buffer.write_bytes(0u8, total_chunks); - } - } - pub fn new(bounds: Pair) -> Self { - let total_chunks = Self::row_chunks(bounds.property as usize) - * Self::column_chunks(bounds.entity as usize); - let layout = Self::layout_for(total_chunks); - let buffer; - unsafe { - buffer = std::alloc::alloc(layout) as *mut usize; - buffer.write_bytes(0u8, total_chunks); - }; - Self { buffer, bounds, layout } - } - pub fn set(&mut self, at: Pair) { - self.assert_within_bounds(at); - let [o_of, o_in] = self.offsets_unchecked(at); - unsafe { *self.buffer.add(o_of) |= 1 << o_in }; - } - pub fn unset(&mut self, at: Pair) { - self.assert_within_bounds(at); - let [o_of, o_in] = self.offsets_unchecked(at); - unsafe { *self.buffer.add(o_of) &= !(1 << o_in) }; - } - pub fn test(&self, at: Pair) -> bool { - self.assert_within_bounds(at); - let [o_of, o_in] = self.offsets_unchecked(at); - unsafe { *self.buffer.add(o_of) & 1 << o_in != 0 } - } - - pub fn batch_mut<'a, 'b>(&mut self, mut chunk_mut_fn: impl FnMut(&'b mut [BitChunk])) { - let row_chunks = Self::row_chunks(self.bounds.property as usize); - let column_chunks = Self::column_chunks(self.bounds.entity as usize); - let mut ptr = self.buffer; - for _row in 0..column_chunks { - let slice; - unsafe { - let slicey = std::slice::from_raw_parts_mut(ptr, row_chunks); - slice = std::mem::transmute(slicey); - ptr = ptr.add(row_chunks); - } - chunk_mut_fn(slice); - } - if let Some(mask) = Self::last_row_chunk_mask(self.bounds.entity) { - // TODO TEST - let mut ptr = unsafe { self.buffer.add((column_chunks - 1) * row_chunks) }; - for _ in 0..row_chunks { - unsafe { - *ptr &= mask; - ptr = ptr.add(1); - } - } - } - } - - /// given: - /// 1. a buffer to work with - /// 2. 
a _fold function_ for combining the properties of a given entity - /// and returning a new derived property (working ) - pub fn iter_entities_where<'a, 'b>( - &'a self, - buf: &'b mut Vec, - mut fold_fn: impl FnMut(&'b [BitChunk]) -> BitChunk, - ) -> impl Iterator + 'b { - let buf_start = buf.len(); - let row_chunks = Self::row_chunks(self.bounds.property as usize); - let column_chunks = Self::column_chunks(self.bounds.entity as usize); - let mut ptr = self.buffer; - for _row in 0..column_chunks { - let slice; - unsafe { - let slicey = std::slice::from_raw_parts(ptr, row_chunks); - slice = std::mem::transmute(slicey); - ptr = ptr.add(row_chunks); - } - let chunk = fold_fn(slice); - buf.push(chunk.0); - } - if let Some(mask) = Self::last_row_chunk_mask(self.bounds.entity) { - *buf.iter_mut().last().unwrap() &= mask; - } - BitChunkIter::new(buf.drain(buf_start..)).map(|x| x as u32) - } - pub fn iter_entities_where_rev<'a, 'b>( - &'a self, - buf: &'b mut Vec, - mut fold_fn: impl FnMut(&'b [BitChunk]) -> BitChunk, - ) -> impl Iterator + 'b { - let buf_start = buf.len(); - let row_chunks = Self::row_chunks(self.bounds.property as usize); - let column_chunks = Self::column_chunks(self.bounds.entity as usize); - let mut ptr = self.buffer; - for _row in 0..column_chunks { - let slice; - unsafe { - let slicey = std::slice::from_raw_parts(ptr, row_chunks); - slice = std::mem::transmute(slicey); - ptr = ptr.add(row_chunks); - } - let chunk = fold_fn(slice); - buf.push(chunk.0); - } - if let Some(mask) = Self::last_row_chunk_mask(self.bounds.entity) { - *buf.iter_mut().last().unwrap() &= mask; - } - BitChunkIterRev::new(buf.drain(buf_start..).rev()).map(|x| x as u32) - } -} - -use derive_more::*; -#[derive( - Debug, Copy, Clone, BitAnd, Not, BitOr, BitXor, BitAndAssign, BitOrAssign, BitXorAssign, -)] -#[repr(transparent)] -pub struct BitChunk(usize); -impl BitChunk { - const fn any(self) -> bool { - self.0 != FALSE_CHUNK.0 - } - const fn all(self) -> bool { - self.0 == TRUE_CHUNK.0 - } -} -pub const TRUE_CHUNK: BitChunk = BitChunk(!0); -pub const FALSE_CHUNK: BitChunk = BitChunk(0); - -#[test] -fn matrix_test() { - let mut m = BitMatrix::new(Pair { entity: 70, property: 3 }); - m.set([2, 0].into()); - m.set([40, 1].into()); - m.set([40, 2].into()); - m.set([40, 0].into()); - println!("{:#?}", &m); - - m.batch_mut(|p| p[0] = TRUE_CHUNK); - println!("{:#?}", &m); - - for i in (0..40).step_by(7) { - m.unset([i, 0].into()); - } - m.unset([62, 0].into()); - println!("{:#?}", &m); - - m.batch_mut(move |p| p[1] = p[0] ^ TRUE_CHUNK); - println!("{:#?}", &m); - - let mut buf = vec![]; - for index in m.iter_entities_where(&mut buf, move |p| p[1]) { - println!("index {}", index); - } - for index in m.iter_entities_where_rev(&mut buf, move |p| p[1]) { - println!("index {}", index); - } -} - -#[test] -fn bit_chunk_iter_rev() { - let x = &[0b1, 0b1000011, 0, 0, 0b101]; - for i in BitChunkIterRev::new(x.iter().copied()) { - println!("i = {:?}", i); - } -} diff --git a/src/runtime/retired/experimental/ecs.rs b/src/runtime/retired/experimental/ecs.rs deleted file mode 100644 index d4452d82c61141bf4f92cecc669c10ea0fa7afae..0000000000000000000000000000000000000000 --- a/src/runtime/retired/experimental/ecs.rs +++ /dev/null @@ -1,906 +0,0 @@ -use crate::common::*; -use crate::runtime::endpoint::EndpointExt; -use crate::runtime::ProtocolS; -use std::collections::HashMap; - -/// invariant: last element is not zero. -/// => all values out of bounds are implicitly absent. 
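/// (Reading aid for the example below: chunks are u32 and least-significant-bit-first,
/// so chunk i contributes indices 32*i ..= 32*i+31. Hence &[0, 1] denotes the set {32},
/// &[1] denotes {0}, and a representation with a trailing zero chunk such as &[1, 0]
/// would denote the same set as &[1], which is exactly what the invariant rules out.)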
-/// i.e., &[0,1] means {1<<32, 0} while &[0,1] is identical to &[1] and means {1}. -#[derive(Debug, Default)] -struct BitSet(Vec); -impl BitSet { - fn as_slice(&self) -> &[u32] { - self.0.as_slice() - } - fn iter(&self) -> impl Iterator + '_ { - self.0.iter().copied() - } - fn is_empty(&self) -> bool { - // relies on the invariant: no trailing zero u32's - self.0.is_empty() - } - fn clear(&mut self) { - self.0.clear(); - } - fn set_ones_until(&mut self, mut end: usize) { - self.0.clear(); - loop { - if end >= 32 { - // full 32 bits of 1 - self.0.push(!0u32); - } else { - if end > 0 { - // #end ones, with a (32-end) prefix of zeroes - self.0.push(!0u32 >> (32 - end)); - } - return; - } - } - } - #[inline(always)] - fn index_decomposed(index: usize) -> [usize; 2] { - // [chunk_index, chunk_bit] - [index / 32, index % 32] - } - fn test(&self, at: usize) -> bool { - let [chunk_index, chunk_bit] = Self::index_decomposed(at); - match self.0.get(chunk_index) { - None => false, - Some(&chunk) => (chunk & (1 << chunk_bit)) != 0, - } - } - fn set(&mut self, at: usize) { - let [chunk_index, chunk_bit] = Self::index_decomposed(at); - if chunk_index >= self.0.len() { - self.0.resize(chunk_index + 1, 0u32); - } - let chunk = unsafe { - // SAFE! previous line ensures sufficient size - self.0.get_unchecked_mut(chunk_index) - }; - *chunk |= 1 << chunk_bit; - } - fn unset(&mut self, at: usize) { - let [chunk_index, chunk_bit] = Self::index_decomposed(at); - if chunk_index < self.0.len() { - let chunk = unsafe { - // SAFE! previous line ensures sufficient size - self.0.get_unchecked_mut(chunk_index) - }; - *chunk &= !(1 << chunk_bit); - while let Some(0u32) = self.0.iter().copied().last() { - self.0.pop(); - } - } - } -} - -/// Converts an iterator over contiguous u32 chunks into an iterator over usize -/// e.g. input [0b111000, 0b11] gives output [3, 4, 5, 32, 33] -/// observe that the bits per chunk are ordered from least to most significant bits, yielding smaller to larger usizes. -/// works by draining the inner u32 chunk iterator one u32 at a time, then draining that chunk until its 0. -struct BitChunkIter> { - chunk_iter: I, - next_bit_index: usize, - cached: u32, -} - -impl> BitChunkIter { - fn new(chunk_iter: I) -> Self { - // first chunk is always a dummy zero, as if chunk_iter yielded Some(0). - // Consequences: - // 1. our next_bit_index is always off by 32 (we correct for it in Self::next) (no additional overhead) - // 2. we cache u32 and not Option, because chunk_iter.next() is only called in Self::next. - Self { chunk_iter, next_bit_index: 0, cached: 0 } - } -} -impl> Iterator for BitChunkIter { - type Item = usize; - fn next(&mut self) -> Option { - let mut chunk = self.cached; - - // loop until either: - // 1. there are no more Items to return, or - // 2. chunk encodes 1+ Items, one of which we will return. - while chunk == 0 { - // chunk is still empty! get the next one... - chunk = self.chunk_iter.next()?; - - // ... and jump self.next_bit_index to the next multiple of 32. - self.next_bit_index = (self.next_bit_index + 32) & !(32 - 1); - } - // assert(chunk > 0); - - // Shift the contents of chunk until the least significant bit is 1. - // ... being sure to increment next_bit_index accordingly. - #[inline(always)] - fn skip_n_zeroes(chunk: &mut u32, n: usize, next_bit_index: &mut usize) { - if *chunk & ((1 << n) - 1) == 0 { - // n least significant bits are zero. skip n bits. 
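// (With n = 16, 8, 4, 2, 1 the five calls below strip runs of low zero bits in halving
// widths, an unrolled binary search: afterwards, for any nonzero chunk, its least
// significant set bit has been shifted down to position 0.)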
- *next_bit_index += n; - *chunk >>= n; - } - } - skip_n_zeroes(&mut chunk, 16, &mut self.next_bit_index); - skip_n_zeroes(&mut chunk, 08, &mut self.next_bit_index); - skip_n_zeroes(&mut chunk, 04, &mut self.next_bit_index); - skip_n_zeroes(&mut chunk, 02, &mut self.next_bit_index); - skip_n_zeroes(&mut chunk, 01, &mut self.next_bit_index); - // least significant bit of chunk is 1. - // assert(chunk & 1 == 1) - - // prepare our state for the next time Self::next is called. - // Overwrite self.cached such that its shifted state is retained, - // and jump over the bit whose index we are about to return. - self.next_bit_index += 1; - self.cached = chunk >> 1; - - // returned index is 32 smaller than self.next_bit_index because we use an - // off-by-32 encoding to avoid having to cache an Option. - Some(self.next_bit_index - 1 - 32) - } -} - -/// Returns an iterator over chunks of bits where ALL of the given -/// bitsets have 1. -struct AndChunkIter<'a> { - // this value is not overwritten during iteration - // invariant: !sets.is_empty() - sets: &'a [&'a [u32]], - - next_chunk_index: usize, -} -impl<'a> AndChunkIter<'a> { - fn new(sets: &'a [&'a [u32]]) -> Self { - let sets = if sets.is_empty() { &[&[] as &[_]] } else { sets }; - Self { sets, next_chunk_index: 0 } - } -} -impl Iterator for AndChunkIter<'_> { - type Item = u32; - fn next(&mut self) -> Option { - let old_chunk_index = self.next_chunk_index; - self.next_chunk_index += 1; - self.sets.iter().fold(Some(!0u32), move |a, b| { - let a = a?; - let b = *b.get(old_chunk_index)?; - Some(a & b) - }) - } -} - -#[test] -fn test_bit_iter() { - static SETS: &[&[u32]] = &[ - // - &[0b101001, 0b101001], - &[0b100001, 0b101001], - ]; - let iter = BitChunkIter::new(AndChunkIter::new(SETS)); - let indices = iter.collect::>(); - println!("indices {:?}", indices); -} - -enum Entity { - Payload(Payload), - Machine { state: ProtocolS, component_index: usize }, -} - -struct PortKey(usize); -struct EntiKey(usize); -struct CompKey(usize); - -struct ComponentInfo { - port_keyset: HashSet, - protocol: Arc, -} -#[derive(Default)] -struct Connection { - ecs: Ecs, - round_solution: Vec<(ChannelId, bool)>, // encodes an ASSIGNMENT - ekey_channel_ids: Vec, // all channel Ids for local keys - component_info: Vec, - endpoint_exts: Vec, -} - -/// Invariant: every component is either: -/// in to_run = (to_run_r U to_run_w) -/// or in ONE of the ekeys (which means it is blocked by a get on that ekey) -/// or in sync_ended (because they reached the end of their sync block) -/// or in inconsistent (because they are inconsistent) -#[derive(Default)] -struct Ecs { - entities: Vec, // machines + payloads - assignments: HashMap<(ChannelId, bool), BitSet>, - payloads: BitSet, - ekeys: HashMap, - inconsistent: BitSet, - sync_ended: BitSet, - to_run_r: BitSet, // read from and drained while... - to_run_w: BitSet, // .. written to and populated. 
} -} -impl Debug for Ecs { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - let elen = self.entities.len(); - - write!(f, "{:<30}", "payloads")?; - print_flag_bits(f, &self.payloads, elen)?; - - write!(f, "{:<30}", "inconsistent")?; - print_flag_bits(f, &self.inconsistent, elen)?; - write!(f, "{:<30}", "sync_ended")?; - print_flag_bits(f, &self.sync_ended, elen)?; - write!(f, "{:<30}", "to_run_r")?; - print_flag_bits(f, &self.to_run_r, elen)?; - write!(f, "{:<30}", "to_run_w")?; - print_flag_bits(f, &self.to_run_w, elen)?; - - for (assignment, bitset) in self.assignments.iter() { - write!(f, "{:<30?}", assignment)?; - print_flag_bits(f, bitset, elen)?; - } - for (ekey, bitset) in self.ekeys.iter() { - write!(f, "Ekey {:<30?}", ekey)?; - print_flag_bits(f, bitset, elen)?; - } - Ok(()) - } -} -fn print_flag_bits(f: &mut Formatter, bitset: &BitSet, elen: usize) -> std::fmt::Result { - for i in 0..elen { - f.pad(match bitset.test(i) { - true => "1", - false => "0", - })?; - } - write!(f, "\n") -} - -struct Protocol { - // TODO -} - -struct Msg { - assignments: Vec<(ChannelId, bool)>, // invariant: no two elements have same ChannelId value - payload: Payload, -} - -impl Connection { - fn new_channel(&mut self) -> [PortKey; 2] { - todo!() - } - fn round(&mut self) { - // 1. at the start of the round we throw away all assignments. - // we are going to shift entities around, so all bitsets need to be cleared anyway. - self.ecs.assignments.clear(); - self.ecs.payloads.clear(); - self.ecs.ekeys.clear(); - self.ecs.inconsistent.clear(); - self.ecs.to_run_r.clear(); - self.ecs.to_run_w.clear(); - self.ecs.sync_ended.clear(); - - // 2. We discard all payloads; they are all stale now. - // All machines are contiguous in the vector - self.ecs - .entities - .retain(move |entity| if let Entity::Machine { .. } = entity { true } else { false }); - - // 3. initially, all the components need a chance to run in MONO mode - self.ecs.to_run_r.set_ones_until(self.ecs.entities.len()); - - // 4. INVARIANT established: - // for all State variants in self.entities, - // exactly one bit throughout the fields of csb is set. - - // 5. Run all machines in (csb.to_run_r U csb.to_run_w). - // Single, logical set is broken into readable / writable parts to allow concurrent reads / writes safely. - while !self.ecs.to_run_r.is_empty() { - for _eid in self.ecs.to_run_r.iter() { - // TODO run and possbibly manipulate self.to_run_w - } - self.ecs.to_run_r.clear(); - std::mem::swap(&mut self.ecs.to_run_r, &mut self.ecs.to_run_w); - } - assert!(self.ecs.to_run_w.is_empty()); - - #[allow(unreachable_code)] // DEBUG - 'recv_loop: loop { - let ekey: usize = todo!(); - let msg: Msg = todo!(); - // 1. check if this message is redundant, i.e., there is already an equivalent payload with predicate >= this one. - // ie. starting from all payloads - - // 2. try and find a payload whose predicate is the same or more general than this one - // if it exists, drop the message; it is uninteresting. 
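// Concretely: InNoneExceptIter below yields, chunk by chunk, ekey_bitset AND NOT
// (union of the collected bitsets), i.e. the entities flagged at this ekey that carry
// none of the assignments conflicting with the incoming message; finding any such
// entity is taken as evidence that an at-least-as-general payload is already stored.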
- let ekey_bitset = self.ecs.ekeys.get(&ekey); - if let Some(_eid) = ekey_bitset.map(move |ekey_bitset| { - let mut slice_builder = vec![]; - // collect CONFLICTING assignments into slice_builder - for &(channel_id, boolean) in msg.assignments.iter() { - if let Some(bitset) = self.ecs.assignments.get(&(channel_id, !boolean)) { - slice_builder.push(bitset.as_slice()); - } - } - let chunk_iter = - InNoneExceptIter::new(slice_builder.as_slice(), ekey_bitset.as_slice()); - BitChunkIter::new(chunk_iter).next() - }) { - // _eid is a payload whose predicate is at least as general - // drop this message! - continue 'recv_loop; - } - - // 3. insert this payload as an entity, overwriting an existing LESS GENERAL payload if it exists. - let payload_eid: usize = if let Some(eid) = ekey_bitset.and_then(move |ekey_bitset| { - let mut slice_builder = vec![]; - slice_builder.push(ekey_bitset.as_slice()); - for assignment in msg.assignments.iter() { - if let Some(bitset) = self.ecs.assignments.get(assignment) { - slice_builder.push(bitset.as_slice()); - } - } - let chunk_iter = AndChunkIter::new(slice_builder.as_slice()); - BitChunkIter::new(chunk_iter).next() - }) { - // overwrite this entity index. - eid - } else { - // nothing to overwrite. add a new payload entity. - let eid = self.ecs.entities.len(); - self.ecs.entities.push(Entity::Payload(msg.payload)); - for &assignment in msg.assignments.iter() { - let mut bitset = self.ecs.assignments.entry(assignment).or_default(); - bitset.set(eid); - } - self.ecs.payloads.set(eid); - eid - }; - - self.feed_msg(payload_eid, ekey); - // TODO run all in self.ecs.to_run_w - } - } - - fn run_poly_p(&mut self, machine_eid: usize) { - match self.ecs.entities.get_mut(machine_eid) { - Some(Entity::Machine { component_index, state }) => { - // TODO run the machine - use PolyBlocker as Pb; - let blocker: Pb = todo!(); - match blocker { - Pb::Inconsistent => self.ecs.inconsistent.set(machine_eid), - Pb::CouldntCheckFiring(key) => { - // 1. clone the machine - let state_true = state.clone(); - let machine_eid_true = self.ecs.entities.len(); - self.ecs.entities.push(Entity::Machine { - state: state_true, - component_index: *component_index, - }); - // 2. copy the assignments of the existing machine to the new one - for bitset in self.ecs.assignments.values() { - if bitset.test(machine_eid) { - bitset.set(machine_eid_true); - } - } - // 3. give the old machine FALSE and the new machine TRUE - let channel_id = - self.endpoint_exts.get(key.to_raw() as usize).unwrap().info.channel_id; - self.ecs - .assignments - .entry((channel_id, false)) - .or_default() - .set(machine_eid); - self.ecs - .assignments - .entry((channel_id, true)) - .or_default() - .set(machine_eid_true); - self.run_poly_p(machine_eid); - self.run_poly_p(machine_eid_true); - } - _ => todo!(), - } - - // 1. make the assignment of this machine concrete WRT its ports - let component_info = self.component_info.get(*component_index).unwrap(); - for &ekey in component_info.port_keyset.iter() { - let channel_id = self.endpoint_exts.get(ekey.0).unwrap().info.channel_id; - let test = self - .ecs - .assignments - .get(&(channel_id, true)) - .map(move |bitset| bitset.test(machine_eid)) - .unwrap_or(false); - if !test { - // TRUE assignment wasn't set - // so set FALSE assignment (no effect if already set) - self.ecs - .assignments - .entry((channel_id, false)) - .or_default() - .set(machine_eid); - } - } - // 2. 
this machine becomes solved - self.ecs.sync_ended.set(machine_eid); - self.generate_new_solutions(machine_eid); - // TODO run this machine to a poly blocker - // potentially mark as inconsistent, blocked on some key, or solved - // if solved - } - _ => unreachable!(), - } - } - - fn generate_new_solutions(&mut self, newly_solved_machine_eid: usize) { - // this vector will be used to store assignments from self.ekey_channel_ids to elements in {true, false} - let mut solution_prefix = vec![]; - self.generate_new_solutions_rec(newly_solved_machine_eid, &mut solution_prefix); - // let all_channel_ids = - // let mut slice_builder = vec![]; - } - fn generate_new_solutions_rec( - &mut self, - newly_solved_machine_eid: usize, - solution_prefix: &mut Vec, - ) { - let eid = newly_solved_machine_eid; - let n = solution_prefix.len(); - if let Some(&channel_id) = self.ekey_channel_ids.get(n) { - if let Some(assignment) = self.machine_assignment_for(eid, channel_id) { - // this machine already gives an assignment - solution_prefix.push(assignment); - self.generate_new_solutions_rec(eid, solution_prefix); - solution_prefix.pop(); - } else { - // this machine does not give an assignment. try both branches! - solution_prefix.push(false); - self.generate_new_solutions_rec(eid, solution_prefix); - solution_prefix.pop(); - solution_prefix.push(true); - self.generate_new_solutions_rec(eid, solution_prefix); - solution_prefix.pop(); - } - } else { - println!("SOLUTION:"); - for (channel_id, assignment) in self.ekey_channel_ids.iter().zip(solution_prefix.iter()) - { - println!("{:?} => {:?}", channel_id, assignment); - } - // SOLUTION COMPLETE! - return; - } - } - - fn machine_assignment_for(&self, machine_eid: usize, channel_id: ChannelId) -> Option { - let test = move |bitset: &BitSet| bitset.test(machine_eid); - self.ecs - .assignments - .get(&(channel_id, true)) - .map(test) - .or_else(move || self.ecs.assignments.get(&(channel_id, false)).map(test)) - } - - fn feed_msg(&mut self, payload_eid: usize, ekey: usize) { - // 1. identify the component who: - // * is blocked on this ekey, - // * and has a predicate at least as strict as that of this payload - let mut slice_builder = vec![]; - let ekey_bitset = - self.ecs.ekeys.get_mut(&ekey).expect("Payload sets this => cannot be empty"); - slice_builder.push(ekey_bitset.as_slice()); - for bitset in self.ecs.assignments.values() { - // it doesn't matter which assignment! just that this payload sets it too - if bitset.test(payload_eid) { - slice_builder.push(bitset.as_slice()); - } - } - let chunk_iter = - InAllExceptIter::new(slice_builder.as_slice(), self.ecs.payloads.as_slice()); - let mut iter = BitChunkIter::new(chunk_iter); - if let Some(machine_eid) = iter.next() { - // TODO is it possible for there to be 2+ iterations? 
I'm thinking No - // RUN THIS MACHINE - ekey_bitset.unset(machine_eid); - self.ecs.to_run_w.set(machine_eid); - } - } -} - -struct InAllExceptIter<'a> { - next_chunk_index: usize, - in_all: &'a [&'a [u32]], - except: &'a [u32], -} -impl<'a> InAllExceptIter<'a> { - fn new(in_all: &'a [&'a [u32]], except: &'a [u32]) -> Self { - Self { in_all, except, next_chunk_index: 0 } - } -} -impl<'a> Iterator for InAllExceptIter<'a> { - type Item = u32; - fn next(&mut self) -> Option { - let i = self.next_chunk_index; - self.next_chunk_index += 1; - let init = self.except.get(i).map(move |&x| !x).or(Some(1)); - self.in_all.iter().fold(init, move |folding, slice| { - let a = folding?; - let b = slice.get(i).copied().unwrap_or(0); - Some(a & !b) - }) - } -} - -struct InNoneExceptIter<'a> { - next_chunk_index: usize, - in_none: &'a [&'a [u32]], - except: &'a [u32], -} -impl<'a> InNoneExceptIter<'a> { - fn new(in_none: &'a [&'a [u32]], except: &'a [u32]) -> Self { - Self { in_none, except, next_chunk_index: 0 } - } -} -impl<'a> Iterator for InNoneExceptIter<'a> { - type Item = u32; - fn next(&mut self) -> Option { - let i = self.next_chunk_index; - self.next_chunk_index += 1; - let init = self.except.get(i).copied()?; - Some(self.in_none.iter().fold(init, move |folding, slice| { - let a = folding; - let b = slice.get(i).copied().unwrap_or(0); - a & !b - })) - } -} - -/* -The idea is we have a set of component machines that fork whenever they reflect on the oracle to make concrete their predicates. -their speculative execution procedure BLOCKS whenever they reflect on the contents of a message that has not yet arrived. -the promise is, therefore, never to forget about these blocked machines. -the only event that unblocks a machine - -operations needed: -1. FORK -given a component and a predicate, -create and retain a clone of the component, and the predicate, with one additional assignment - -2. GET -when running a machine with {state S, predicate P}, it may try to get a message at K. -IF there exists a payload at K with predicate P2 s.t. P2 >= P, feed S the message and continue. -ELSE list (S,P,K) as BLOCKED and... -for all payloads X at K with predicate P2 s.t. P2 < P, fork S to create S2. store it with predicate P2 and feed it X and continue - -2. RECV -when receiving a payload at key K with predicate P, -IF there exists a payload at K with predicate P2 where P2 >= P, discard the new one and continue. -ELSE if there exists a payload at K with predicate P2 where P2 < P, assert their contents are identical, overwrite P2 with P try feed this payload to any BLOCKED machines -ELSE insert this payload with P and K as a new payload, and feed it to any compatible machines blocked on K - - - -==================== -EXTREME approach: -1. entities: {states} U {payloads} -2. ecs: {} - -================== -*/ - -impl Debug for FlagMatrix { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - for r in 0..self.dims[0] { - write!(f, "|")?; - for c in 0..self.dims[1] { - write!( - f, - "{}", - match self.test([r, c]) { - false => '0', - true => '1', - } - )?; - } - write!(f, "|\n")?; - } - Ok(()) - } -} - -// invariant: all bits outside of 0..columns and 0..rows BUT in the allocated space are ZERO -struct FlagMatrix { - bytes: *mut u32, - u32s_total: usize, - u32s_per_row: usize, - dims: [usize; 2], -} -#[inline(always)] -fn ceiling_to_mul_32(value: usize) -> usize { - (value + 31) & !31 -} -impl Drop for FlagMatrix { - fn drop(&mut self) { - let layout = Self::layout_for(self.u32s_total); - unsafe { - // ? 
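// Safety: Self::layout_for(self.u32s_total) reconstructs exactly the Layout this
// buffer was allocated with, because u32s_total is updated together with `bytes`
// whenever `new` or `reshape` (re)allocates.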
- std::alloc::dealloc(self.bytes as *mut u8, layout); - } - } -} -impl FlagMatrix { - fn get_dims(&self) -> &[usize; 2] { - &self.dims - } - - fn set_entire_row(&mut self, row: usize) { - assert!(row < self.dims[0]); - let mut cols_left = self.dims[1]; - unsafe { - let mut ptr = self.bytes.add(self.offset_of_chunk_unchecked([row, 0])); - while cols_left >= 32 { - *ptr = !0u32; - cols_left -= 32; - ptr = ptr.add(1); - } - if cols_left > 0 { - // jagged chunk! - *ptr |= (!0) >> (32 - cols_left); - } - } - } - fn unset_entire_row(&mut self, row: usize) { - assert!(row < self.dims[0]); - let mut cols_left = self.dims[1]; - unsafe { - let mut ptr = self.bytes.add(self.offset_of_chunk_unchecked([row, 0])); - while cols_left > 0 { - *ptr = 0u32; - cols_left -= 32; - ptr = ptr.add(1); - } - } - } - - fn reshape(&mut self, new_dims: [usize; 2]) { - dbg!(self.u32s_total, self.u32s_per_row); - - // 1. calc new u32s_per_row - let new_u32s_per_row = match ceiling_to_mul_32(new_dims[1]) / 32 { - min if min > self.u32s_per_row => Some(min * 2), - _ => None, - }; - - // 2. calc new u32s_total - let new_u32s_total = match new_u32s_per_row.unwrap_or(self.u32s_per_row) * new_dims[0] { - min if min > self.u32s_total => Some(min * 2), - _ => None, - }; - - // 3. set any bits no longer in columns to zero - let new_last_chunk_zero_prefix = new_dims[1] % 32; - if new_dims[1] < self.dims[1] { - let old_min_u32_per_row = ceiling_to_mul_32(new_dims[1]) / 32; - let new_min_u32_per_row = ceiling_to_mul_32(self.dims[1]) / 32; - let common_rows = self.dims[0].min(new_dims[0]); - if old_min_u32_per_row < new_min_u32_per_row { - // zero chunks made entirely of removed columns - for row in 0..common_rows { - unsafe { - self.bytes - .add(self.offset_of_chunk_unchecked([row, old_min_u32_per_row])) - .write_bytes(0u8, new_min_u32_per_row - old_min_u32_per_row); - } - } - } - if new_last_chunk_zero_prefix > 0 { - // wipe out new_last_chunk_zero_prefix-most significant bits of all new last column chunks - let mask: u32 = !0u32 >> new_last_chunk_zero_prefix; - for row in 0..common_rows { - let o_of = self.offset_of_chunk_unchecked([row, new_min_u32_per_row - 1]); - unsafe { *self.bytes.add(o_of) &= mask }; - } - } - } - - // 4. if we won't do a new allocation, zero any bit no longer in rows - if new_dims[0] < self.dims[0] && new_u32s_total.is_none() { - // zero all bytes from beginning of first removed row, - // to end of last removed row - unsafe { - self.bytes - .add(self.offset_of_chunk_unchecked([new_dims[0], 0])) - .write_bytes(0u8, self.u32s_per_row * (self.dims[0] - new_dims[0])); - } - } - - dbg!(new_u32s_per_row, new_u32s_total); - match [new_u32s_per_row, new_u32s_total] { - [None, None] => { /* do nothing */ } - [None, Some(new_u32s_total)] => { - // realloc only! column alignment is still OK - // assert!(new_u32s_total > self.u32s_total); - let old_layout = Self::layout_for(self.u32s_total); - let new_layout = Self::layout_for(new_u32s_total); - let new_bytes = unsafe { - let new_bytes = std::alloc::alloc(new_layout) as *mut u32; - // copy the previous total - self.bytes.copy_to_nonoverlapping(new_bytes, self.u32s_total); - // and zero the remainder - new_bytes - .add(self.u32s_total) - .write_bytes(0u8, new_u32s_total - self.u32s_total); - // drop the previous buffer - std::alloc::dealloc(self.bytes as *mut u8, old_layout); - new_bytes - }; - self.bytes = new_bytes; - println!("AFTER {:?}", self.bytes); - self.u32s_total = new_u32s_total; - } - [Some(new_u32s_per_row), None] => { - // shift only! 
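// ("Shift only": the existing allocation is already large enough, but every row must
// be re-homed at the new, wider stride. E.g. widening u32s_per_row from 2 to 6 moves
// row r from chunk offset 2*r to chunk offset 6*r within the same buffer and
// zero-fills the 4 freshly exposed chunks of each row.)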
- // assert!(new_u32s_per_row > self.u32s_per_row); - for r in (0..self.dims[0]).rev() { - // iterate in REVERSE order because new row[n] may overwrite old row[n+m] - unsafe { - let src = self.bytes.add(r * self.u32s_per_row); - let dest = self.bytes.add(r * new_u32s_per_row); - // copy the used prefix - src.copy_to(dest, self.u32s_per_row); - // and zero the remainder - dest.add(self.u32s_per_row) - .write_bytes(0u8, new_u32s_per_row - self.u32s_per_row); - } - } - self.u32s_per_row = new_u32s_per_row; - } - [Some(new_u32s_per_row), Some(new_u32s_total)] => { - // alloc AND shift! - // assert!(new_u32s_total > self.u32s_total); - // assert!(new_u32s_per_row > self.u32s_per_row); - let old_layout = Self::layout_for(self.u32s_total); - let new_layout = Self::layout_for(new_u32s_total); - let new_bytes = unsafe { std::alloc::alloc(new_layout) as *mut u32 }; - for r in 0..self.dims[0] { - // iterate forwards over rows! - unsafe { - let src = self.bytes.add(r * self.u32s_per_row); - let dest = new_bytes.add(r * new_u32s_per_row); - // copy the used prefix - src.copy_to_nonoverlapping(dest, self.u32s_per_row); - // and zero the remainder - dest.add(self.u32s_per_row) - .write_bytes(0u8, new_u32s_per_row - self.u32s_per_row); - } - } - let fresh_rows_at = self.dims[0] * new_u32s_per_row; - unsafe { - new_bytes.add(fresh_rows_at).write_bytes(0u8, new_u32s_total - fresh_rows_at); - } - unsafe { std::alloc::dealloc(self.bytes as *mut u8, old_layout) }; - self.u32s_per_row = new_u32s_per_row; - self.bytes = new_bytes; - self.u32s_total = new_u32s_total; - } - } - self.dims = new_dims; - } - - fn layout_for(u32s_total: usize) -> std::alloc::Layout { - unsafe { - // this layout is ALWAYS valid: - // 1. size is always nonzero - // 2. size is always a multiple of 4 and 4-aligned - std::alloc::Layout::from_size_align_unchecked(4 * u32s_total.max(1), 4) - } - } - fn new(dims: [usize; 2], extra_dim_space: [usize; 2]) -> Self { - let u32s_per_row = ceiling_to_mul_32(dims[1] + extra_dim_space[1]) / 32; - let u32s_total = u32s_per_row * (dims[0] + extra_dim_space[0]); - let layout = Self::layout_for(u32s_total); - let bytes = unsafe { - // allocate - let bytes = std::alloc::alloc(layout) as *mut u32; - // and zero - bytes.write_bytes(0u8, u32s_total); - bytes - }; - Self { bytes, u32s_total, u32s_per_row, dims } - } - fn assert_within_bounds(&self, at: [usize; 2]) { - assert!(at[0] < self.dims[0]); - assert!(at[1] < self.dims[1]); - } - #[inline(always)] - fn offset_of_chunk_unchecked(&self, at: [usize; 2]) -> usize { - (self.u32s_per_row * at[0]) + at[1] / 32 - } - #[inline(always)] - fn offsets_unchecked(&self, at: [usize; 2]) -> [usize; 2] { - let of_chunk = self.offset_of_chunk_unchecked(at); - let in_chunk = at[1] % 32; - [of_chunk, in_chunk] - } - fn set(&mut self, at: [usize; 2]) { - self.assert_within_bounds(at); - let [o_of, o_in] = self.offsets_unchecked(at); - unsafe { *self.bytes.add(o_of) |= 1 << o_in }; - } - fn unset(&mut self, at: [usize; 2]) { - self.assert_within_bounds(at); - let [o_of, o_in] = self.offsets_unchecked(at); - unsafe { *self.bytes.add(o_of) &= !(1 << o_in) }; - } - fn test(&self, at: [usize; 2]) -> bool { - self.assert_within_bounds(at); - let [o_of, o_in] = self.offsets_unchecked(at); - unsafe { *self.bytes.add(o_of) & (1 << o_in) != 0 } - } - unsafe fn copy_chunk_unchecked(&self, row: usize, col_chunk_index: usize) -> u32 { - let o_of = (self.u32s_per_row * row) + col_chunk_index; - *self.bytes.add(o_of) - } - - /// return an efficient interator over column indices c in the 
range 0..self.dims[1] - /// where self.test([t_row, c]) && f_rows.iter().all(|&f_row| !self.test([f_row, c])) - fn col_iter_t1fn<'a, 'b: 'a>( - &'a self, - t_row: usize, - f_rows: &'b [usize], - ) -> impl Iterator + 'a { - // 1. ensure all ROWS indices are in range. - assert!(t_row < self.dims[0]); - for &f_row in f_rows.iter() { - assert!(f_row < self.dims[0]); - } - - // 2. construct an unsafe iterator over chunks - // column_chunk_range ensures all col_chunk_index values are in range. - let column_chunk_range = 0..ceiling_to_mul_32(self.dims[1]) / 32; - let chunk_iter = column_chunk_range.map(move |col_chunk_index| { - // SAFETY: all rows and columns have already been bounds-checked. - let t_chunk = unsafe { self.copy_chunk_unchecked(t_row, col_chunk_index) }; - f_rows.iter().fold(t_chunk, |chunk, &f_row| { - let f_chunk = unsafe { self.copy_chunk_unchecked(f_row, col_chunk_index) }; - chunk & !f_chunk - }) - }); - - // 3. yield columns indices from the chunk iterator - BitChunkIter::new(chunk_iter) - } -} - -// trait RwMatrixBits { -// fn set(&mut self, at: [usize;2]); -// fn unset(&mut self, at: [usize;2]); -// fn set_entire_row(&mut self, row: usize); -// fn unset_entire_row(&mut self, row: usize); -// } - -// struct MatrixRefW<'a> { -// _inner: usize, -// } -// impl<'a> MatrixRefW<'a> { - -// } - -#[test] -fn matrix() { - let mut m = FlagMatrix::new([6, 6], [0, 0]); - for i in 0..5 { - m.set([0, i]); - m.set([i, i]); - } - m.set_entire_row(5); - println!("{:?}", &m); - m.reshape([6, 40]); - let iter = m.col_iter_t1fn(0, &[1, 2, 3]); - for c in iter { - println!("{:?}", c); - } - println!("{:?}", &m); -} diff --git a/src/runtime/retired/experimental/mod.rs b/src/runtime/retired/experimental/mod.rs deleted file mode 100644 index b7882ebf608663b00d91a7f37c681971f1e5b9e4..0000000000000000000000000000000000000000 --- a/src/runtime/retired/experimental/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -// mod api; -// mod bits; -mod pdl; -// mod vec_storage; diff --git a/src/runtime/retired/experimental/pdl.rs b/src/runtime/retired/experimental/pdl.rs deleted file mode 100644 index 0f2d07cbbc6af35251da6794fd5fb9b624c58b2c..0000000000000000000000000000000000000000 --- a/src/runtime/retired/experimental/pdl.rs +++ /dev/null @@ -1,46 +0,0 @@ -use crate::common::Port; - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -struct Pdl { - ops: Vec, -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -enum Op { - JumpIfEq, // - SyncStart, // - SyncEnd, // - Exit, // - Firing, // pop port A. push bool B. - Put, // pop port A. pop payload B. - Get, // pop port A. push payload B. - // - PushConst(Value), - Store, // pop unslong A. pop B. store[A] := B. - Load, // pop unslong A. push store[A]. - Dup, // pop A. push A. push A. - // - Lt, // pop A. pop B. push bool C. - Eq, // pop A. pop B. push bool C. - Add, // pop integer A. pop integer B. push A+B - Neg, // pop signed integer A. push -A. - // - Nand, // pop bool A. pop bool B. 
push nand(A,B) -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -struct State { - op_index: usize, - store: Vec, // TODO multiple frames - stack: Vec, -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -enum Value { - Null, - Int(i32), - Bool(bool), - UnsLong(u64), - Payload(Vec), - Port(Port), -} diff --git a/src/runtime/retired/experimental/predicate.rs b/src/runtime/retired/experimental/predicate.rs deleted file mode 100644 index c3c1299a25a78fe1044543127a8a705f17949bdc..0000000000000000000000000000000000000000 --- a/src/runtime/retired/experimental/predicate.rs +++ /dev/null @@ -1,240 +0,0 @@ -use crate::common::*; -use crate::runtime::ProtocolS; -use core::ops::Index; -use core::ops::IndexMut; - -use std::collections::BTreeMap; - -// we assume a dense ChannelIndex domain! - -enum CommonSatisfier { - FormerNotLatter, - LatterNotFormer, - Equivalent, - New(T), - Nonexistant, -} - -type ChunkType = u16; -const MASK_BITS: ChunkType = 0x_AA_AA; // 101010... - -#[test] -fn mask_ok() { - assert_eq!(!0, MASK_BITS | (MASK_BITS >> 1)); - assert_eq!(0, MASK_BITS & (MASK_BITS >> 1)); -} - -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -struct TernChunk(ChunkType); // invariant: no pair is 01 - -impl TernChunk { - fn overwrite(&mut self, index: usize, value: bool) -> Option { - assert!(index < Self::vars_per_chunk()); - let mask_bit_mask = 1 << (index * 2 + 1); - let bool_bit_mask = 1 << (index * 2); - let ret = if self.0 & mask_bit_mask != 0 { - let was_value = self.0 & bool_bit_mask != 0; - if was_value != value { - // flip the value bit - self.0 ^= bool_bit_mask; - } - Some(was_value) - } else { - if value { - // set the value bit - self.0 |= bool_bit_mask; - } - None - }; - // set the mask bit - self.0 |= mask_bit_mask; - ret - } - fn new_singleton(index: usize, value: bool) -> Self { - assert!(index < Self::vars_per_chunk()); - let mask_bits = 1 << (index * 2 + 1); - let maybe_bit: ChunkType = value as ChunkType; - assert_eq!(maybe_bit == 1, value); - assert!(maybe_bit <= 1); - let bool_bits = maybe_bit << (index * 2); - Self(mask_bits | bool_bits) - } - const fn vars_per_chunk() -> usize { - std::mem::size_of::() / 2 - } - #[inline] - fn query(self, index: usize) -> Option { - assert!(index < Self::vars_per_chunk()); - let mask_bit_mask = 1 << (index * 2 + 1); - let bool_bit_mask = 1 << (index * 2); - if self.0 & mask_bit_mask != 0 { - Some(self.0 & bool_bit_mask != 0) - } else { - None - } - } - fn mutual_satisfaction(self, othe: Self) -> [bool; 2] { - let s_mask = self.0 & MASK_BITS; - let o_mask = othe.0 & MASK_BITS; - let both_mask = s_mask & o_mask; - let diff = self.0 ^ othe.0; - let masked_diff = diff & (both_mask >> 1); - if masked_diff != 0 { - [false; 2] - } else { - let s_sat_o = s_mask & !o_mask == 0; - let o_sat_s = o_mask & !s_mask == 0; - [s_sat_o, o_sat_s] - } - } - - /// Returns whether self satisfies other - /// false iff either: - /// 1. there exists a pair which you specify and I dont. - // i.e., self has 00, othe has 1? - /// 2. we both specify a variable with different values. - /// i.e., self has 10, othe has 11 or vice versa. - fn satisfies(self, othe: Self) -> bool { - let s_mask = self.0 & MASK_BITS; - let o_mask = othe.0 & MASK_BITS; - let both_mask = s_mask & o_mask; - let diff = self.0 ^ othe.0; - - // FALSE if othe has a 1X pair where self has a 1(!X) pair - let masked_diff = diff & (both_mask >> 1); - - // FALSE if othe has a 1X pair where self has a 0Y pair. 
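// o_not_s_mask has a (mask-position) bit set exactly for each variable that `othe`
// assigns but `self` leaves unassigned, i.e. case 1 in the doc comment above.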
- let o_not_s_mask = o_mask & !s_mask; - - o_not_s_mask | masked_diff == 0 - } - - fn common_satisfier(self, othe: Self) -> Option { - let s_mask = self.0 & MASK_BITS; - let o_mask = othe.0 & MASK_BITS; - let both_mask = s_mask & o_mask; - let diff = self.0 ^ othe.0; - let masked_diff = diff & (both_mask >> 1); - if masked_diff != 0 { - // an inconsistency exists - None - } else { - let s_vals = (s_mask >> 1) & self.0; - let o_vals = (o_mask >> 1) & othe.0; - let new = s_mask | o_mask | s_vals | o_vals; - Some(Self(new)) - } - } -} - -struct TernSet(Vec); // invariant: last byte != 00 -impl TernSet { - fn new_singleton(index: ChannelIndex, value: bool) -> Self { - let which_chunk = index as usize / TernChunk::vars_per_chunk(); - let inner_index = index as usize % TernChunk::vars_per_chunk(); - let it = std::iter::repeat(TernChunk(0)) - .take(which_chunk) - .chain(std::iter::once(TernChunk::new_singleton(inner_index, value))); - Self(it.collect()) - } - fn overwrite(&mut self, index: ChannelIndex, value: bool) -> Option { - let which_chunk = index as usize / TernChunk::vars_per_chunk(); - let inner_index = index as usize % TernChunk::vars_per_chunk(); - if let Some(tern_chunk) = self.0.get_mut(which_chunk) { - tern_chunk.overwrite(inner_index, value) - } else { - self.0.reserve(which_chunk - self.0.len()); - self.0.resize(which_chunk, TernChunk(0)); - self.0.push(TernChunk::new_singleton(inner_index, value)); - None - } - } - - fn query(&self, index: ChannelIndex) -> Option { - let which_chunk = index as usize / TernChunk::vars_per_chunk(); - self.0.get(which_chunk).copied().and_then(move |tern_chunk| { - tern_chunk.query(index as usize % TernChunk::vars_per_chunk()) - }) - } - fn satisfies(&self, othe: &Self) -> bool { - self.0.len() >= othe.0.len() && self.0.iter().zip(&othe.0).all(|(s, o)| s.satisfies(*o)) - } - fn common_satisfier(&self, othe: &Self) -> CommonSatisfier { - use CommonSatisfier as Cs; - let [slen, olen] = [self.0.len(), othe.0.len()]; - let [mut s_sat_o, mut o_sat_s] = [slen >= olen, slen <= olen]; - for (s, o) in self.0.iter().zip(&othe.0) { - let [s2, o2] = s.mutual_satisfaction(*o); - s_sat_o &= s2; - o_sat_s &= o2; - } - match [s_sat_o, o_sat_s] { - [true, true] => Cs::Equivalent, - [true, false] => Cs::FormerNotLatter, - [false, true] => Cs::LatterNotFormer, - [false, false] => Cs::New(Self( - self.0.iter().zip(&othe.0).map(|(s, o)| s.common_satisfier(*o).unwrap()).collect(), - )), - } - } - #[inline] - fn restore_invariant(&mut self) { - while self.0.iter().copied().last() == Some(TernChunk(0)) { - self.0.pop(); - } - } - fn is_empty(&self) -> bool { - self.0.is_empty() - } -} - -struct Predicate(BTreeMap); -impl Predicate { - pub fn overwrite(&mut self, channel_id: ChannelId, value: bool) -> Option { - let ChannelId { controller_id, channel_index } = channel_id; - use std::collections::btree_map::Entry; - match self.0.entry(controller_id) { - Entry::Occupied(mut x) => x.get_mut().overwrite(channel_index, value), - Entry::Vacant(x) => { - x.insert(TernSet::new_singleton(channel_index, value)); - None - } - } - } - pub fn query(&self, channel_id: ChannelId) -> Option { - let ChannelId { controller_id, channel_index } = channel_id; - self.0.get(&controller_id).and_then(move |tern_set| tern_set.query(channel_index)) - } - pub fn satisfies(&self, other: &Self) -> bool { - let mut s_it = self.0.iter(); - let mut s = if let Some(s) = s_it.next() { - s - } else { - return other.0.is_empty(); - }; - for (oid, ob) in other.0.iter() { - while s.0 < oid { - s = if let Some(s) 
= s_it.next() { - s - } else { - return false; - }; - } - if s.0 > oid || !s.1.satisfies(ob) { - return false; - } - } - true - } - - pub fn common_satisfier(&self, othe: &Self) -> CommonSatisfier { - // use CommonSatisfier as Cs; - // let [slen, olen] = [self.0.len(), othe.0.len()]; - // let [mut s_sat_o, mut o_sat_s] = [slen >= olen, slen <= olen]; - // let [mut s_it, mut o_it] = [self.0.iter(), othe.0.iter()]; - // let [mut s, mut o] = [s_it.next(), o_it.next()]; - todo!() - } -} - -//////////////////////////// diff --git a/src/runtime/retired/experimental/vec_storage.rs b/src/runtime/retired/experimental/vec_storage.rs deleted file mode 100644 index eaed13575a8f1ec095491a317c6f6e0ed023eb31..0000000000000000000000000000000000000000 --- a/src/runtime/retired/experimental/vec_storage.rs +++ /dev/null @@ -1,333 +0,0 @@ -use super::bits::{usize_bits, BitChunkIter}; -use crate::common::*; -use core::mem::MaybeUninit; - -#[derive(Default)] -struct Bitvec(Vec); -impl Bitvec { - #[inline(always)] - fn offsets_of(i: usize) -> [usize; 2] { - [i / usize_bits(), i % usize_bits()] - } - // assumes read will not go out of bounds - unsafe fn insert(&mut self, i: usize) { - let [o_of, o_in] = Self::offsets_of(i); - let chunk = self.0.get_unchecked_mut(o_of); - *chunk |= 1 << o_in; - } - // assumes read will not go out of bounds - unsafe fn remove(&mut self, i: usize) -> bool { - let [o_of, o_in] = Self::offsets_of(i); - let chunk = self.0.get_unchecked_mut(o_of); - let singleton_mask = 1 << o_in; - let was = (*chunk & singleton_mask) != 0; - *chunk &= !singleton_mask; - was - } - // assumes read will not go out of bounds - #[inline] - unsafe fn contains(&self, i: usize) -> bool { - let [o_of, o_in] = Self::offsets_of(i); - (*self.0.get_unchecked(o_of) & (1 << o_in)) != 0 - } - fn pop_first(&mut self) -> Option { - let i = self.first()?; - unsafe { self.remove(i) }; - Some(i) - } - fn iter(&self) -> impl Iterator + '_ { - BitChunkIter::new(self.0.iter().copied()).map(|x| x as usize) - } - fn first(&self) -> Option { - self.iter().next() - } -} - -// A T-type arena which: -// 1. does not check for the ABA problem -// 2. imposes the object keys on the user -// 3. allows the reservation of a space (getting the key) to precede the value being provided. -// 4. checks for user error -// -// Data contains values in one of three states: -// 1. occupied: ininitialized. will be dropped. -// 2. vacant: uninitialized. may be reused implicitly. won't be dropped. -// 2. reserved: uninitialized. may be occupied implicitly. won't be dropped. -// -// element access is O(1) -// removal is O(1) amortized. 
-// insertion is O(N) with a small constant factor, -// doing at worst one linear probe through N/(word_size) contiguous words -// -// invariant A: data elements are inititalized <=> occupied bit is set -// invariant B: occupied and vacant have an empty intersection -// invariant C: (vacant U occupied) subset of (0..data.len) -// invariant D: last element of data is not in VACANT state -// invariant E: number of allocated bits in vacant and occupied >= data.len() -// invariant F: vacant_bit_count == vacant.iter().count() -pub struct VecStorage { - data: Vec>, - occupied: Bitvec, - vacant: Bitvec, - occupied_bit_count: usize, -} -impl Default for VecStorage { - fn default() -> Self { - Self { - data: Default::default(), - vacant: Default::default(), - occupied: Default::default(), - occupied_bit_count: 0, - } - } -} -impl Debug for VecStorage { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - enum FmtT<'a, T> { - Vacant(usize), - Reserved(usize), - Occupied(usize, &'a T), - }; - impl Debug for FmtT<'_, T> { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - match self { - FmtT::Vacant(i) => write!(f, "{} => Vacant", i), - FmtT::Reserved(i) => write!(f, "{} =>Reserved", i), - FmtT::Occupied(i, t) => { - write!(f, "{} => Occupied(", i)?; - t.fmt(f)?; - write!(f, ")") - } - } - } - } - - let iter = (0..self.data.len()).map(|i| unsafe { - // 1. data bounds are checked by construction of i. - // 2. occupied index => valid data is read. - // 3. bitset bounds are ensured by invariant E. - if self.occupied.contains(i) { - FmtT::Occupied(i, &*self.data.get_unchecked(i).as_ptr()) - } else if self.vacant.contains(i) { - FmtT::Vacant(i) - } else { - FmtT::Reserved(i) - } - }); - f.debug_list().entries(iter).finish() - } -} -impl Drop for VecStorage { - fn drop(&mut self) { - self.clear(); - } -} -impl VecStorage { - // ASSUMES that i in 0..self.data.len() - unsafe fn get_occupied_unchecked(&self, i: usize) -> Option<&T> { - if self.occupied.contains(i) { - // 2. Invariant A => reading valid ata - Some(&*self.data.get_unchecked(i).as_ptr()) - } else { - None - } - } - ////////////// - pub fn len(&self) -> usize { - self.occupied_bit_count - } - pub fn with_reserved_range(range_end: usize) -> Self { - let mut data = Vec::with_capacity(range_end); - unsafe { - // data is uninitialized, as intended - data.set_len(range_end); - } - let bitset_len = (range_end + (usize_bits() - 1)) / usize_bits(); - let chunk_iter = std::iter::repeat(0usize).take(bitset_len); - Self { - data, - vacant: Bitvec(chunk_iter.clone().collect()), - occupied: Bitvec(chunk_iter.collect()), - occupied_bit_count: 0, - } - } - pub fn clear(&mut self) { - for i in 0..self.data.len() { - // SAFE: bitvec bounds ensured by invariant E - if unsafe { self.occupied.contains(i) } { - // invariant A: this element is OCCUPIED - unsafe { - // 1. by construction, i is in bounds - // 2. 
i is occupied => initialized data is being dropped - drop(self.data.get_unchecked_mut(i).as_ptr().read()); - } - } - } - self.vacant.0.clear(); - self.occupied.0.clear(); - self.occupied_bit_count = 0; - } - pub fn iter(&self) -> impl Iterator { - (0..self.data.len()).filter_map(move |i| unsafe { self.get_occupied_unchecked(i) }) - } - pub fn iter_mut(&mut self) -> impl Iterator { - (0..self.data.len()).filter_map(move |i| unsafe { - // SAFE: bitvec bounds ensured by invariant E - if self.occupied.contains(i) { - // Invariant A => reading valid data - Some(&mut *self.data.get_unchecked_mut(i).as_mut_ptr()) - } else { - None - } - }) - } - pub fn get_occupied(&self, i: usize) -> Option<&T> { - if i >= self.data.len() { - None - } else { - unsafe { - // index is within bounds - self.get_occupied_unchecked(i) - } - } - } - pub fn get_occupied_mut(&mut self, i: usize) -> Option<&mut T> { - // SAFE: bitvec bounds ensured by invariant E - if i < self.data.len() && unsafe { self.occupied.contains(i) } { - unsafe { - // 1. index is within bounds - // 2. Invariant A => reading valid ata - Some(&mut *self.data.get_unchecked_mut(i).as_mut_ptr()) - } - } else { - None - } - } - pub fn new_reserved(&mut self) -> usize { - if let Some(i) = self.vacant.pop_first() { - i - } else { - let bitsets_need_another_chunk = self.data.len() % usize_bits() == 0; - // every (usize_bits())th time self.data grows by 1, bitsets grow by usize_bits(). - if bitsets_need_another_chunk { - self.vacant.0.push(0usize); - self.occupied.0.push(0usize); - } - self.data.push(MaybeUninit::uninit()); - self.data.len() - 1 - } - } - pub fn occupy_reserved(&mut self, i: usize, t: T) { - // SAFE: bitvec bounds ensured by invariant E - assert!(i < self.data.len()); - // element is within bounds - assert!(unsafe { !self.occupied.contains(i) && !self.vacant.contains(i) }); - // element is surely reserved - unsafe { - // 1. invariant C => write is within bounds - // 2. i WAS reserved => no initialized data is being overwritten - self.data.get_unchecked_mut(i).as_mut_ptr().write(t); - self.occupied.insert(i); - }; - self.occupied_bit_count += 1; - } - pub fn new_occupied(&mut self, t: T) -> usize { - let i = self.new_reserved(); - unsafe { - // 1. invariant C => write is within bounds - // 2. i WAS reserved => no initialized data is being overwritten - self.data.get_unchecked_mut(i).as_mut_ptr().write(t); - self.occupied.insert(i); - }; - self.occupied_bit_count += 1; - i - } - pub fn vacate(&mut self, i: usize) -> Option { - // SAFE: bitvec bounds ensured by invariant E - if i >= self.data.len() || unsafe { self.vacant.contains(i) } { - // already vacant. nothing to do here - return None; - } - // i is certainly within bounds of self.data - // SAFE: bitvec bounds ensured by invariant E - let value = if unsafe { self.occupied.remove(i) } { - unsafe { - // 1. index is within bounds - // 2. i is occupied => initialized data is being read - self.occupied_bit_count -= 1; - Some(self.data.get_unchecked_mut(i).as_ptr().read()) - } - } else { - // reservations have no data to drop - None - }; - // Mark as vacant... - if i + 1 == self.data.len() { - // ... by truncating self.data. - // must truncate to avoid violating invariant D. - // pops at least once: - while let Some(_) = self.data.pop() { - let pop_next = self - .data - .len() - .checked_sub(1) - .map(|index| unsafe { - // SAFE: bitvec bounds ensured by invariant E - self.vacant.remove(index) - }) - .unwrap_or(false); - if !pop_next { - break; - } - } - } else { - // ... 
by populating self.vacant. - // SAFE: bitvec bounds ensured by invariant E - unsafe { self.vacant.insert(i) }; - } - value - } - pub fn iter_reserved(&self) -> impl Iterator + '_ { - BitChunkIter::new(self.occupied.0.iter().zip(self.vacant.0.iter()).map(|(&a, &b)| !(a | b))) - .take_while(move |&x| x < self.data.len()) - } -} - -#[test] -fn vec_storage() { - #[derive(Debug)] - struct Foo; - impl Drop for Foo { - fn drop(&mut self) { - println!("DROPPING FOO!"); - } - } - let mut v = VecStorage::with_reserved_range(4); - let i0 = v.new_occupied(Foo); - println!("{:?}", &v); - - let i1 = v.new_reserved(); - println!("{:?}", &v); - - println!("reserved {:?}", v.iter_reserved().collect::>()); - - println!("q {:?}", v.vacate(i0)); - println!("{:?}", &v); - - println!("q {:?}", v.vacate(2)); - println!("{:?}", &v); - - println!("q {:?}", v.vacate(1)); - println!("{:?}", &v); - - v.occupy_reserved(i1, Foo); - println!("{:?}", &v); - - *v.get_occupied_mut(i1).unwrap() = Foo; - println!("{:?}", &v); - - println!("q {:?}", v.vacate(i1)); - println!("{:?}", &v); - println!("q {:?}", v.vacate(3)); - println!("{:?}", &v); -} diff --git a/src/runtime/retired/ffi.rs b/src/runtime/retired/ffi.rs deleted file mode 100644 index 977ed37f9c9b2b968279f7a2734cdece37daa5c5..0000000000000000000000000000000000000000 --- a/src/runtime/retired/ffi.rs +++ /dev/null @@ -1,374 +0,0 @@ -use crate::common::*; -use crate::runtime::*; - -use core::cell::RefCell; -use std::os::raw::{c_char, c_int, c_uchar, c_uint}; - -struct StoredError { - filled: bool, - buf: Vec, -} -thread_local! { - // stores a string. DOES store the null terminator - static LAST_ERROR: RefCell = RefCell::new(StoredError { filled: false, buf: Vec::with_capacity(128) } ); -} - -const NULL_TERMINATOR: c_char = b'\0' as c_char; -// Silly HACK: rust uses MAX alignment of 128 bytes for fields (no effect) but causes -// cbindgen tool to make this struct OPAQUE (which is what we want). - -// NOT null terminated -fn overwrite_last_error(error_msg: &[u8]) { - LAST_ERROR.with(|stored_error| { - let mut stored_error = stored_error.borrow_mut(); - stored_error.filled = true; - stored_error.buf.clear(); - let error_msg = unsafe { &*(error_msg as *const [u8] as *const [i8]) }; - stored_error.buf.extend_from_slice(error_msg); - stored_error.buf.push(NULL_TERMINATOR); - }) -} - -unsafe fn as_rust_str R>(s: *const c_char, f: F) -> Option { - as_rust_bytes(s, |bytes| { - let s = std::str::from_utf8(bytes).ok()?; - Some(f(s)) - }) -} - -unsafe fn as_rust_bytes R>(s: *const c_char, f: F) -> R { - let len = c_str_len(s); - let s = s as *const u8; - let bytes: &[u8] = std::slice::from_raw_parts(s, len); - f(bytes) -} - -unsafe fn c_str_len(s: *const c_char) -> usize { - let mut len = 0; - while *(s.offset(len.try_into().unwrap())) != NULL_TERMINATOR { - len += 1; - } - len -} - -unsafe fn try_parse_addr(s: *const c_char) -> Option { - as_rust_str(s, |s| s.parse().ok()).and_then(|x| x) -} - -/////////////////////////////////////// - -/// Returns a pointer into the error buffer for reading as a null-terminated string -/// Returns null if there is no error in the buffer. -/// # Safety -/// TODO -#[no_mangle] -pub unsafe extern "C" fn connector_error_peek() -> *const c_char { - LAST_ERROR.with(|stored_error| { - let stored_error = stored_error.borrow(); - if stored_error.filled { - stored_error.buf.as_ptr() - } else { - std::ptr::null() - } - }) -} - -/// Resets the error message buffer. 
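/// (The error buffer is thread-local, so this only affects the calling thread.)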
-/// Returns: -/// - 0 if an error was cleared -/// - 1 if there was no error to clear -/// # Safety -/// TODO -#[no_mangle] -pub extern "C" fn connector_error_clear() -> c_int { - LAST_ERROR.with(|stored_error| { - let mut stored_error = stored_error.borrow_mut(); - if stored_error.filled { - stored_error.buf.clear(); - stored_error.filled = false; - 0 - } else { - 1 - } - }) -} - -/// Creates and returns Reowolf Connector structure allocated on the heap. -#[no_mangle] -pub extern "C" fn connector_new() -> *mut Connector { - Box::into_raw(Box::new(Connector::default())) -} - -/// Creates and returns Reowolf Connector structure allocated on the heap. -#[no_mangle] -pub extern "C" fn connector_with_controller_id(controller_id: ControllerId) -> *mut Connector { - Box::into_raw(Box::new(Connector::Unconfigured(Unconfigured { controller_id }))) -} - -/// Configures the given Reowolf connector with a protocol description in PDL. -/// Returns: -/// # Safety -/// TODO -#[no_mangle] -pub unsafe extern "C" fn connector_configure( - connector: *mut Connector, - pdl: *mut c_char, - main: *mut c_char, -) -> c_int { - let mut b = Box::from_raw(connector); // unsafe! - let ret = as_rust_bytes(pdl, |pdl_bytes| { - as_rust_bytes(main, |main_bytes| match b.configure(pdl_bytes, main_bytes) { - Ok(()) => 0, - Err(e) => { - overwrite_last_error(format!("{:?}", e).as_bytes()); - -1 - } - }) - }); - Box::into_raw(b); // don't drop! - ret -} - -/// Provides a binding annotation for the port with the given index with "native": -/// (The port is exposed for reading and writing from the application) -/// Returns: -/// # Safety -/// TODO -#[no_mangle] -pub unsafe extern "C" fn connector_bind_native( - connector: *mut Connector, - proto_port_index: usize, -) -> c_int { - // use PortBindErr::*; - let mut b = Box::from_raw(connector); // unsafe! - let ret = match b.bind_port(proto_port_index, PortBinding::Native) { - Ok(()) => 0, - Err(e) => { - overwrite_last_error(format!("{:?}", e).as_bytes()); - -1 - } - }; - Box::into_raw(b); // don't drop! - ret -} - -/// Provides a binding annotation for the port with the given index with "native": -/// (The port is exposed for reading and writing from the application) -/// Returns: -/// # Safety -/// TODO -#[no_mangle] -pub unsafe extern "C" fn connector_bind_passive( - connector: *mut Connector, - proto_port_index: c_uint, - address: *const c_char, -) -> c_int { - if let Some(addr) = try_parse_addr(address) { - // use PortBindErr::*; - let mut b = Box::from_raw(connector); // unsafe! - let ret = - match b.bind_port(proto_port_index.try_into().unwrap(), PortBinding::Passive(addr)) { - Ok(()) => 0, - Err(e) => { - overwrite_last_error(format!("{:?}", e).as_bytes()); - -1 - } - }; - Box::into_raw(b); // don't drop! - ret - } else { - overwrite_last_error(b"Failed to parse input as ip address!"); - -1 - } -} - -/// Provides a binding annotation for the port with the given index with "active": -/// (The port will conenct to a "passive" port at the given address during connect()) -/// Returns: -/// - 0 for success -/// - 1 if the port was already bound and was left unchanged -/// # Safety -/// TODO -#[no_mangle] -pub unsafe extern "C" fn connector_bind_active( - connector: *mut Connector, - proto_port_index: c_uint, - address: *const c_char, -) -> c_int { - if let Some(addr) = try_parse_addr(address) { - // use PortBindErr::*; - let mut b = Box::from_raw(connector); // unsafe! 
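// Temporarily re-own the connector as a Box so its methods can be called;
// Box::into_raw below hands ownership back to the caller instead of dropping it.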
- let ret = match b.bind_port(proto_port_index.try_into().unwrap(), PortBinding::Active(addr)) - { - Ok(()) => 0, - Err(e) => { - overwrite_last_error(format!("{:?}", e).as_bytes()); - -1 - } - }; - Box::into_raw(b); // don't drop! - ret - } else { - overwrite_last_error(b"Failed to parse input as ip address!"); - -1 - } -} - -/// Provides a binding annotation for the port with the given index with "active": -/// (The port will conenct to a "passive" port at the given address during connect()) -/// Returns: -/// - 0 SUCCESS: connected successfully -/// - TODO error codes -/// # Safety -/// TODO -#[no_mangle] -pub unsafe extern "C" fn connector_connect( - connector: *mut Connector, - timeout_millis: u64, -) -> c_int { - let mut b = Box::from_raw(connector); // unsafe! - let ret = match b.connect(Duration::from_millis(timeout_millis)) { - Ok(()) => 0, - Err(e) => { - overwrite_last_error(format!("{:?}", e).as_bytes()); - -1 - } - }; - Box::into_raw(b); // don't drop! - ret -} - -/// Destroys the given connector, freeing its underlying resources. -/// # Safety -/// TODO -#[no_mangle] -pub unsafe extern "C" fn connector_destroy(connector: *mut Connector) { - let c = Box::from_raw(connector); // unsafe! - drop(c); // for readability -} - -/// Prepares to synchronously put a message at the given port, reading it from the given buffer. -/// # Safety -/// TODO -#[no_mangle] -pub unsafe extern "C" fn connector_put( - connector: *mut Connector, - proto_port_index: c_uint, - buf_ptr: *mut c_uchar, - msg_len: c_uint, -) -> c_int { - let buf = std::slice::from_raw_parts_mut(buf_ptr, msg_len.try_into().unwrap()); - let vec: Vec = buf.to_vec(); // unsafe - let mut b = Box::from_raw(connector); // unsafe! - let ret = b.put(proto_port_index.try_into().unwrap(), vec.into()); - Box::into_raw(b); // don't drop! - match ret { - Ok(()) => 0, - Err(e) => { - overwrite_last_error(format!("{:?}", e).as_bytes()); - -1 - } - } -} - -/// Prepares to synchronously put a message at the given port, writing it to the given buffer. -/// - 0 SUCCESS -/// - 1 this port has the wrong direction -/// - 2 this port is already marked to get -/// # Safety -/// TODO -#[no_mangle] -pub unsafe extern "C" fn connector_get( - connector: *mut Connector, - proto_port_index: c_uint, -) -> c_int { - let mut b = Box::from_raw(connector); // unsafe! - let ret = b.get(proto_port_index.try_into().unwrap()); - Box::into_raw(b); // don't drop! - // use PortOperationErr::*; - match ret { - Ok(()) => 0, - Err(e) => { - overwrite_last_error(format!("{:?}", e).as_bytes()); - -1 - } - } -} - -/// # Safety -/// TODO -#[no_mangle] -pub unsafe extern "C" fn connector_gotten( - connector: *mut Connector, - proto_port_index: c_uint, - buf_ptr_outptr: *mut *const c_uchar, - len_outptr: *mut c_uint, -) -> c_int { - let b = Box::from_raw(connector); // unsafe! - let ret = b.read_gotten(proto_port_index.try_into().unwrap()); - // use ReadGottenErr::*; - let result = match ret { - Ok(ptr_slice) => { - let buf_ptr = ptr_slice.as_ptr(); - let len = ptr_slice.len().try_into().unwrap(); - buf_ptr_outptr.write(buf_ptr); - len_outptr.write(len); - 0 - } - Err(e) => { - overwrite_last_error(format!("{:?}", e).as_bytes()); - -1 - } - }; - Box::into_raw(b); // don't drop! - result -} - -/// # Safety -/// TODO -#[no_mangle] -pub unsafe extern "C" fn connector_dump_log(connector: *mut Connector) -> c_int { - let mut b = Box::from_raw(connector); // unsafe! 
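// Print the connector's accumulated log to stdout if one is available (returning 0),
// otherwise return 1.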
- let result = match b.get_mut_logger() { - Some(s) => { - println!("{}", s); - 0 - } - None => 1, - }; - Box::into_raw(b); // don't drop! - result -} - -/// # Safety -/// TODO -#[no_mangle] -pub unsafe extern "C" fn connector_next_batch(connector: *mut Connector) -> c_int { - let mut b = Box::from_raw(connector); // unsafe! - let result = match b.next_batch() { - Ok(batch_index) => batch_index.try_into().unwrap(), - Err(e) => { - overwrite_last_error(format!("{:?}", e).as_bytes()); - -1 - } - }; - Box::into_raw(b); // don't drop! - result -} - -/// # Safety -/// TODO -#[no_mangle] -pub unsafe extern "C" fn connector_sync(connector: *mut Connector, timeout_millis: u64) -> c_int { - let mut b = Box::from_raw(connector); // unsafe! - let result = match b.sync(Duration::from_millis(timeout_millis)) { - Ok(batch_index) => batch_index.try_into().unwrap(), - Err(SyncErr::Timeout) => -1, // timeout! - Err(e) => { - overwrite_last_error(format!("{:?}", e).as_bytes()); - -2 - } - }; - Box::into_raw(b); // don't drop! - result -} diff --git a/src/runtime/retired/serde.rs b/src/runtime/retired/serde.rs deleted file mode 100644 index c9a4ec1cc7c843c8452e1058dda4d79bab2866d2..0000000000000000000000000000000000000000 --- a/src/runtime/retired/serde.rs +++ /dev/null @@ -1,273 +0,0 @@ -use crate::common::*; -use crate::runtime::*; -use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; -use std::io::{ErrorKind::InvalidData, Read, Write}; - -pub trait Ser: Write { - fn ser(&mut self, t: &T) -> Result<(), std::io::Error>; -} -pub trait De: Read { - fn de(&mut self) -> Result; -} - -pub struct MonitoredReader { - bytes: usize, - r: R, -} -impl From for MonitoredReader { - fn from(r: R) -> Self { - Self { r, bytes: 0 } - } -} -impl MonitoredReader { - pub fn bytes_read(&self) -> usize { - self.bytes - } -} -impl Read for MonitoredReader { - fn read(&mut self, buf: &mut [u8]) -> Result { - let n = self.r.read(buf)?; - self.bytes += n; - Ok(n) - } -} - -///////////////////////////////////////// - -struct VarLenInt(u64); - -macro_rules! 
ser_seq { - ( $w:expr ) => {{ - io::Result::Ok(()) - }}; - ( $w:expr, $first:expr ) => {{ - $w.ser($first) - }}; - ( $w:expr, $first:expr, $( $x:expr ),+ ) => {{ - $w.ser($first)?; - ser_seq![$w, $( $x ),*] - }}; -} -///////////////////////////////////////// - -impl Ser for W { - fn ser(&mut self, t: &PortId) -> Result<(), std::io::Error> { - self.ser(&t.controller_id)?; - self.ser(&VarLenInt(t.port_index as u64)) - } -} - -impl De for R { - fn de(&mut self) -> Result { - Ok(PortId { controller_id: self.de()?, port_index: De::::de(self)?.0 as u32 }) - } -} - -impl Ser for W { - fn ser(&mut self, t: &bool) -> Result<(), std::io::Error> { - self.ser(&match t { - true => b'T', - false => b'F', - }) - } -} -impl De for R { - fn de(&mut self) -> Result { - let b: u8 = self.de()?; - Ok(match b { - b'T' => true, - b'F' => false, - _ => return Err(InvalidData.into()), - }) - } -} - -impl Ser for W { - fn ser(&mut self, t: &u8) -> Result<(), std::io::Error> { - self.write_u8(*t) - } -} -impl De for R { - fn de(&mut self) -> Result { - self.read_u8() - } -} - -impl Ser for W { - fn ser(&mut self, t: &u16) -> Result<(), std::io::Error> { - self.write_u16::(*t) - } -} -impl De for R { - fn de(&mut self) -> Result { - self.read_u16::() - } -} - -impl Ser for W { - fn ser(&mut self, t: &u32) -> Result<(), std::io::Error> { - self.write_u32::(*t) - } -} -impl De for R { - fn de(&mut self) -> Result { - self.read_u32::() - } -} - -impl Ser for W { - fn ser(&mut self, t: &u64) -> Result<(), std::io::Error> { - self.write_u64::(*t) - } -} -impl De for R { - fn de(&mut self) -> Result { - self.read_u64::() - } -} - -impl Ser for W { - fn ser(&mut self, t: &Payload) -> Result<(), std::io::Error> { - self.ser(&VarLenInt(t.len() as u64))?; - for byte in t.as_slice() { - self.ser(byte)?; - } - Ok(()) - } -} -impl De for R { - fn de(&mut self) -> Result { - let VarLenInt(len) = self.de()?; - let mut x = Vec::with_capacity(len as usize); - for _ in 0..len { - x.push(self.de()?); - } - Ok(x.into()) - } -} - -impl Ser for W { - fn ser(&mut self, t: &VarLenInt) -> Result<(), std::io::Error> { - integer_encoding::VarIntWriter::write_varint(self, t.0).map(|_| ()) - } -} -impl De for R { - fn de(&mut self) -> Result { - integer_encoding::VarIntReader::read_varint(self).map(VarLenInt) - } -} - -impl Ser for W { - fn ser(&mut self, t: &Predicate) -> Result<(), std::io::Error> { - self.ser(&VarLenInt(t.assigned.len() as u64))?; - for (channel_id, boolean) in &t.assigned { - ser_seq![self, channel_id, boolean]?; - } - Ok(()) - } -} -impl De for R { - fn de(&mut self) -> Result { - let VarLenInt(len) = self.de()?; - let mut assigned = BTreeMap::::default(); - for _ in 0..len { - assigned.insert(self.de()?, self.de()?); - } - Ok(Predicate { assigned }) - } -} -impl Ser for W { - fn ser(&mut self, t: &Decision) -> Result<(), std::io::Error> { - match t { - Decision::Failure => self.ser(&b'F'), - Decision::Success(predicate) => { - self.ser(&b'S')?; - self.ser(predicate) - } - } - } -} -impl De for R { - fn de(&mut self) -> Result { - let b: u8 = self.de()?; - Ok(match b { - b'F' => Decision::Failure, - b'S' => Decision::Success(self.de()?), - _ => return Err(InvalidData.into()), - }) - } -} - -impl Ser for W { - fn ser(&mut self, t: &Polarity) -> Result<(), std::io::Error> { - self.ser(&match t { - Polarity::Putter => b'P', - Polarity::Getter => b'G', - }) - } -} -impl De for R { - fn de(&mut self) -> Result { - let b: u8 = self.de()?; - Ok(match b { - b'P' => Polarity::Putter, - b'G' => Polarity::Getter, - _ => return 
Err(InvalidData.into()), - }) - } -} -impl Ser for W { - fn ser(&mut self, t: &Msg) -> Result<(), std::io::Error> { - use {CommMsgContents::*, SetupMsg::*}; - match t { - Msg::SetupMsg(s) => match s { - // [flag, data] - MyPortInfo { polarity, port } => ser_seq![self, &0u8, polarity, port], - LeaderEcho { maybe_leader } => ser_seq![self, &1u8, maybe_leader], - LeaderAnnounce { leader } => ser_seq![self, &2u8, leader], - YouAreMyParent => ser_seq![self, &3u8], - }, - Msg::CommMsg(CommMsg { round_index, contents }) => { - // [flag, round_num, data] - let varlenint = &VarLenInt(*round_index as u64); - match contents { - SendPayload { payload_predicate, payload } => { - ser_seq![self, &4u8, varlenint, payload_predicate, payload] - } - Elaborate { partial_oracle } => ser_seq![self, &5u8, varlenint, partial_oracle], - Announce { decision } => ser_seq![self, &6u8, varlenint, decision], - Failure => ser_seq![self, &7u8, varlenint], - } - } - } - } -} -impl De for R { - fn de(&mut self) -> Result { - use {CommMsgContents::*, SetupMsg::*}; - let b: u8 = self.de()?; - Ok(match b { - 0..=3 => Msg::SetupMsg(match b { - // [flag, data] - 0u8 => MyPortInfo { polarity: self.de()?, port: self.de()? }, - 1u8 => LeaderEcho { maybe_leader: self.de()? }, - 2u8 => LeaderAnnounce { leader: self.de()? }, - 3u8 => YouAreMyParent, - _ => unreachable!(), - }), - 4..=7 => { - // [flag, round_num, data] - let VarLenInt(varlenint) = self.de()?; - let contents = match b { - 4u8 => SendPayload { payload_predicate: self.de()?, payload: self.de()? }, - 5u8 => Elaborate { partial_oracle: self.de()? }, - 6u8 => Announce { decision: self.de()? }, - 7u8 => Failure, - _ => unreachable!(), - }; - Msg::CommMsg(CommMsg { round_index: varlenint as usize, contents }) - } - _ => return Err(InvalidData.into()), - }) - } -} diff --git a/src/runtime/retired/setup.rs b/src/runtime/retired/setup.rs deleted file mode 100644 index 7e9d70da2fd3abcbb45dfdeac03859ae25d1b645..0000000000000000000000000000000000000000 --- a/src/runtime/retired/setup.rs +++ /dev/null @@ -1,484 +0,0 @@ -use crate::common::*; -use crate::runtime::{ - actors::{MonoN, MonoP}, - endpoint::*, - errors::*, - *, -}; - -#[derive(Debug)] -enum EndpointExtTodo { - Finished(EndpointExt), - ActiveConnecting { addr: SocketAddr, polarity: Polarity, stream: TcpStream }, - ActiveRecving { addr: SocketAddr, polarity: Polarity, endpoint: Endpoint }, - PassiveAccepting { addr: SocketAddr, info: EndpointInfo, listener: TcpListener }, - PassiveConnecting { addr: SocketAddr, info: EndpointInfo, stream: TcpStream }, -} - -///////////////////// IMPL ///////////////////// -impl Controller { - // Given port bindings and a protocol config, create a connector with 1 native node - pub fn connect( - major: ControllerId, - main_component: &[u8], - protocol_description: Arc, - bound_proto_interface: &[(PortBinding, Polarity)], - logger: &mut String, - deadline: Instant, - ) -> Result<(Self, Vec<(PortId, Polarity)>), ConnectErr> { - use ConnectErr::*; - - log!(logger, "CONNECT PHASE START! MY CID={:?} STARTING LOGGER ~", major); - - let mut channel_id_stream = ChannelIdStream::new(major); - let mut endpoint_ext_todos = Arena::default(); - - let mut ports_native = vec![]; - let mut ports_proto = vec![]; - let mut ports_network = vec![]; - - let mut native_interface = vec![]; - - /* - 1. 
- allocate an EndpointExtTodo for every native and interface port - - store all the resulting ports in two portlists for the interfaces of the native and proto components - native: [a, c, f] - | | | - | | | - proto: [b, d, e, g] - ^todo - arena: - */ - for &(binding, polarity) in bound_proto_interface.iter() { - match binding { - PortBinding::Native => { - let channel_id = channel_id_stream.next(); - let ([port_native, port_proto], native_polarity) = { - let [p, g] = Endpoint::new_memory_pair(); - let mut endpoint_to_port = |endpoint, polarity| { - endpoint_ext_todos.alloc(EndpointExtTodo::Finished(EndpointExt { - endpoint, - info: EndpointInfo { polarity, channel_id }, - })) - }; - let pport = endpoint_to_port(p, Putter); - let gport = endpoint_to_port(g, Getter); - let port_pair = match polarity { - Putter => [gport, pport], - Getter => [pport, gport], - }; - (port_pair, !polarity) - }; - native_interface.push((port_native, native_polarity)); - ports_native.push(port_native); - ports_proto.push(port_proto); - } - PortBinding::Passive(addr) => { - let channel_id = channel_id_stream.next(); - let port_proto = endpoint_ext_todos.alloc(EndpointExtTodo::PassiveAccepting { - addr, - info: EndpointInfo { polarity, channel_id }, - listener: TcpListener::bind(&addr).map_err(|_| BindFailed(addr))?, - }); - ports_network.push(port_proto); - ports_proto.push(port_proto); - } - PortBinding::Active(addr) => { - let port_proto = endpoint_ext_todos.alloc(EndpointExtTodo::ActiveConnecting { - addr, - polarity, - stream: TcpStream::connect(&addr).unwrap(), - }); - ports_network.push(port_proto); - ports_proto.push(port_proto); - } - } - } - log!(logger, "{:03?} setup todos...", major); - - // 2. convert the arena to Arena and return the - let (mut messenger_state, mut endpoint_exts) = - Self::finish_endpoint_ext_todos(major, logger, endpoint_ext_todos, deadline)?; - - let n_mono = MonoN { ports: ports_native.into_iter().collect(), result: None }; - let p_monos = vec![MonoP { - state: protocol_description.new_main_component(main_component, &ports_proto), - ports: ports_proto.into_iter().collect(), - }]; - - // 6. Become a node in a sink tree, computing {PARENT, CHILDREN} from {NEIGHBORS} - let family = Self::setup_sink_tree_family( - major, - logger, - &mut endpoint_exts, - &mut messenger_state, - ports_network, - deadline, - )?; - - log!(logger, "CONNECT PHASE END! ~"); - let inner = ControllerInner { - family, - messenger_state, - channel_id_stream, - endpoint_exts, - mono_ps: p_monos, - mono_n: n_mono, - round_index: 0, - logger: { - let mut l = String::default(); - std::mem::swap(&mut l, logger); - l - }, - }; - let controller = Self { - protocol_description, - inner, - ephemeral: Default::default(), - // round_histories: vec![], - unrecoverable_error: None, - }; - Ok((controller, native_interface)) - } - - // with mio v0.6 attempting to read bytes into a nonempty buffer appears to - // be the only reliably platform-independent means of testing the connectivity of - // a mio::TcpStream (see Self::connection_testing_read). - // as this unavoidably MAY read some crucial payload bytes, we have to be careful - // to pass these potentially populated buffers into the Endpoint, or bytes may be lost. - // This is done with Endpoint::from_fresh_stream_and_inbox. 
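// In short: reaching EOF or getting WouldBlock counts as evidence of a live connection,
// while any other error fails the connection attempt; bytes read as a side effect stay
// in `inbox` and are later handed to the Endpoint rather than discarded.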
- fn connection_testing_read(stream: &mut TcpStream, inbox: &mut Vec) -> std::io::Result<()> { - inbox.clear(); - use std::io::Read; - match stream.read_to_end(inbox) { - Ok(0) => unreachable!("Ok(0) on read should return Err instead!"), - Ok(_) => Ok(()), - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(()), - Err(e) => Err(e), - } - } - - // inserts - fn finish_endpoint_ext_todos( - major: ControllerId, - logger: &mut String, - mut endpoint_ext_todos: Arena, - deadline: Instant, - ) -> Result<(MessengerState, Arena), ConnectErr> { - use {ConnectErr::*, EndpointExtTodo::*}; - - // 1. define and setup a poller and event loop - let edge = PollOpt::edge(); - let [ready_r, ready_w] = [Ready::readable(), Ready::writable()]; - let mut ms = MessengerState { - poll: Poll::new().map_err(|_| PollInitFailed)?, - events: Events::with_capacity(endpoint_ext_todos.len()), - delayed: vec![], - undelayed: vec![], - polled_undrained: Default::default(), - }; - - // 2. Register all EndpointExtTodos with ms.poll. each has one of {Endpoint, TcpStream, TcpListener} - // 3. store the portset of EndpointExtTodos which are not Finished in `to_finish`. - let mut to_finish = HashSet::<_>::default(); - log!(logger, "endpoint_ext_todos len {:?}", endpoint_ext_todos.len()); - for (port, t) in endpoint_ext_todos.iter() { - let token = port.to_token(); - match t { - ActiveRecving { .. } | PassiveConnecting { .. } => unreachable!(), - Finished(EndpointExt { endpoint, .. }) => { - ms.poll.register(endpoint, token, ready_r, edge) - } - ActiveConnecting { stream, .. } => { - to_finish.insert(port); - ms.poll.register(stream, token, ready_w, edge) - } - PassiveAccepting { listener, .. } => { - to_finish.insert(port); - ms.poll.register(listener, token, ready_r, edge) - } - } - .expect("register first"); - } - // invariant: every EndpointExtTodo has one thing registered with mio - - // 4. until all in endpoint_ext_todos are Finished variant, handle events - let mut polled_undrained_later = IndexSet::<_>::default(); - let mut backoff_millis = 10; - // see Self::connection_testing_read for why we populate Endpoint inboxes here. - let mut next_inbox = vec![]; - while !to_finish.is_empty() { - ms.poll_events(deadline).map_err(|e| { - log!(logger, "{:03?} timing out", major); - e - })?; - for event in ms.events.iter() { - log!(logger, "event {:#?}", event); - let token = event.token(); - let port = PortId::from_token(token); - let entry = endpoint_ext_todos.get_mut(port).unwrap(); - match entry { - Finished(_) => { - polled_undrained_later.insert(port); - } - PassiveAccepting { addr, listener, .. } => { - log!(logger, "{:03?} start PassiveAccepting...", major); - assert!(event.readiness().is_readable()); - let (stream, _peer_addr) = - listener.accept().map_err(|_| AcceptFailed(*addr))?; - ms.poll.deregister(listener).expect("wer"); - ms.poll.register(&stream, token, ready_w, edge).expect("3y5"); - take_mut::take(entry, |e| { - assert_let![PassiveAccepting { addr, info, .. } = e => { - PassiveConnecting { addr, info, stream } - }] - }); - log!(logger, "{:03?} ... end PassiveAccepting", major); - } - PassiveConnecting { addr, stream, .. 
} => { - log!(logger, "{:03?} start PassiveConnecting...", major); - assert!(event.readiness().is_writable()); - if Self::connection_testing_read(stream, &mut next_inbox).is_err() { - return Err(PassiveConnectFailed(*addr)); - } - ms.poll.reregister(stream, token, ready_r, edge).expect("52"); - let mut res = Ok(()); - take_mut::take(entry, |e| { - let mut inbox = vec![]; - std::mem::swap(&mut inbox, &mut next_inbox); - assert_let![PassiveConnecting { info, stream, .. } = e => { - let mut endpoint = Endpoint::from_fresh_stream_and_inbox(stream, inbox); - let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info }); - res = endpoint.send(msg); - Finished(EndpointExt { info, endpoint }) - }] - }); - res?; - log!(logger, "{:03?} ... end PassiveConnecting", major); - assert!(to_finish.remove(&port)); - } - ActiveConnecting { addr, stream, .. } => { - log!(logger, "{:03?} start ActiveConnecting...", major); - assert!(event.readiness().is_writable()); - if Self::connection_testing_read(stream, &mut next_inbox).is_ok() { - // connect successful - log!(logger, "Connectivity test passed"); - ms.poll.reregister(stream, token, ready_r, edge).expect("52"); - take_mut::take(entry, |e| { - let mut inbox = vec![]; - std::mem::swap(&mut inbox, &mut next_inbox); - assert_let![ActiveConnecting { stream, polarity, addr } = e => { - let endpoint = Endpoint::from_fresh_stream_and_inbox(stream, inbox); - ActiveRecving { endpoint, polarity, addr } - }] - }); - } else { - // connect failure. retry! - log!(logger, "CONNECT FAIL"); - ms.poll.deregister(stream).expect("wt"); - std::thread::sleep(Duration::from_millis(backoff_millis)); - backoff_millis = ((backoff_millis as f32) * 1.2) as u64 + 3; - let mut new_stream = TcpStream::connect(addr).unwrap(); - ms.poll.register(&new_stream, token, ready_w, edge).expect("PAC 3"); - std::mem::swap(stream, &mut new_stream); - } - log!(logger, "{:03?} ... end ActiveConnecting", major); - } - ActiveRecving { addr, polarity, endpoint } => { - log!(logger, "{:03?} start ActiveRecving...", major); - assert!(event.readiness().is_readable()); - 'recv_loop: while let Some(msg) = endpoint.recv()? { - if let Msg::SetupMsg(SetupMsg::ChannelSetup { info }) = msg { - if info.polarity == *polarity { - return Err(PolarityMatched(*addr)); - } - take_mut::take(entry, |e| { - assert_let![ActiveRecving { polarity, endpoint, .. } = e => { - let info = EndpointInfo { polarity, channel_id: info.channel_id }; - Finished(EndpointExt { info, endpoint }) - }] - }); - ms.polled_undrained.insert(port); - assert!(to_finish.remove(&port)); - break 'recv_loop; - } else { - ms.delayed.push(ReceivedMsg { recipient: port, msg }); - } - } - log!(logger, "{:03?} ... 
end ActiveRecving", major); - } - } - } - } - for port in polled_undrained_later { - ms.polled_undrained.insert(port); - } - let endpoint_exts = endpoint_ext_todos.type_convert(|(_, todo)| match todo { - Finished(endpoint_ext) => endpoint_ext, - _ => unreachable!(), - }); - Ok((ms, endpoint_exts)) - } - - fn setup_sink_tree_family( - major: ControllerId, - logger: &mut String, - endpoint_exts: &mut Arena, - messenger_state: &mut MessengerState, - neighbors: Vec, - deadline: Instant, - ) -> Result { - use {ConnectErr::*, Msg::SetupMsg as S, SetupMsg::*}; - - log!(logger, "neighbors {:?}", &neighbors); - - let mut messenger = (messenger_state, endpoint_exts); - impl Messengerlike for (&mut MessengerState, &mut Arena) { - fn get_state_mut(&mut self) -> &mut MessengerState { - self.0 - } - fn get_endpoint_mut(&mut self, port: PortId) -> &mut Endpoint { - &mut self.1.get_mut(port).expect("OUT OF BOUNDS").endpoint - } - } - - // 1. broadcast my ID as the first echo. await reply from all in net_portlist - let echo = S(LeaderEcho { maybe_leader: major }); - let mut awaiting = IndexSet::with_capacity(neighbors.len()); - for &n in neighbors.iter() { - log!(logger, "{:?}'s initial echo to {:?}, {:?}", major, n, &echo); - messenger.send(n, echo.clone())?; - awaiting.insert(n); - } - - // 2. Receive incoming replies. whenever a higher-id echo arrives, - // adopt it as leader, sender as parent, and reset the await set. - let mut parent: Option = None; - let mut my_leader = major; - messenger.undelay_all(); - 'echo_loop: while !awaiting.is_empty() || parent.is_some() { - let ReceivedMsg { recipient, msg } = messenger.recv(deadline)?.ok_or(Timeout)?; - log!(logger, "{:?} GOT {:?} {:?}", major, &recipient, &msg); - match msg { - S(LeaderAnnounce { leader }) => { - // someone else completed the echo and became leader first! - // the sender is my parent - parent = Some(recipient); - my_leader = leader; - awaiting.clear(); - break 'echo_loop; - } - S(LeaderEcho { maybe_leader }) => { - use Ordering::*; - match maybe_leader.cmp(&my_leader) { - Less => { /* ignore */ } - Equal => { - awaiting.remove(&recipient); - if awaiting.is_empty() { - if let Some(p) = parent { - // return the echo to my parent - messenger.send(p, S(LeaderEcho { maybe_leader }))?; - } else { - // DECIDE! - break 'echo_loop; - } - } - } - Greater => { - // join new echo - log!(logger, "{:?} setting leader to {:?}", major, recipient); - parent = Some(recipient); - my_leader = maybe_leader; - let echo = S(LeaderEcho { maybe_leader: my_leader }); - awaiting.clear(); - if neighbors.len() == 1 { - // immediately reply to parent - log!( - logger, - "{:?} replying echo to parent {:?} immediately", - major, - recipient - ); - messenger.send(recipient, echo.clone())?; - } else { - for &n in neighbors.iter() { - if n != recipient { - log!( - logger, - "{:?} repeating echo {:?} to {:?}", - major, - &echo, - n - ); - messenger.send(n, echo.clone())?; - awaiting.insert(n); - } - } - } - } - } - } - msg => messenger.delay(ReceivedMsg { recipient, msg }), - } - } - match parent { - None => assert_eq!( - my_leader, major, - "I've got no parent, but I consider {:?} the leader?", - my_leader - ), - Some(parent) => assert_ne!( - my_leader, major, - "I have {:?} as parent, but I consider myself ({:?}) the leader?", - parent, major - ), - } - - log!(logger, "{:?} DONE WITH ECHO! Leader has cid={:?}", major, my_leader); - - // 3. 
broadcast leader announcement (except to parent: confirm they are your parent) - // in this loop, every node sends 1 message to each neighbor - let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader }); - for &k in neighbors.iter() { - let msg = - if Some(k) == parent { S(YouAreMyParent) } else { msg_for_non_parents.clone() }; - log!(logger, "{:?} ANNOUNCING to {:?} {:?}", major, k, &msg); - messenger.send(k, msg)?; - } - - // await 1 message from all non-parents - for &n in neighbors.iter() { - if Some(n) != parent { - awaiting.insert(n); - } - } - let mut children = Vec::default(); - messenger.undelay_all(); - while !awaiting.is_empty() { - let ReceivedMsg { recipient, msg } = messenger.recv(deadline)?.ok_or(Timeout)?; - match msg { - S(YouAreMyParent) => { - assert!(awaiting.remove(&recipient)); - children.push(recipient); - } - S(SetupMsg::LeaderAnnounce { leader }) => { - assert!(awaiting.remove(&recipient)); - assert!(leader == my_leader); - assert!(Some(recipient) != parent); - // they wouldn't send me this if they considered me their parent - } - _ => messenger.delay(ReceivedMsg { recipient, msg }), - } - } - Ok(ControllerFamily { parent_port: parent, children_ports: children }) - } -} - -impl Messengerlike for Controller { - fn get_state_mut(&mut self) -> &mut MessengerState { - &mut self.inner.messenger_state - } - fn get_endpoint_mut(&mut self, port: PortId) -> &mut Endpoint { - &mut self.inner.endpoint_exts.get_mut(port).expect("OUT OF BOUNDS").endpoint - } -} diff --git a/src/runtime/retired/v2.rs b/src/runtime/retired/v2.rs deleted file mode 100644 index abd043454dd94e5c1060e66ecada817add63cf96..0000000000000000000000000000000000000000 --- a/src/runtime/retired/v2.rs +++ /dev/null @@ -1,8 +0,0 @@ -use crate::common::{ - Arc, ControllerId, Duration, HashMap, HashSet, Instant, Payload, Polarity, Port, PortId, - ProtocolDescription, SocketAddr, -}; -use crate::runtime::endpoint::{Endpoint, Msg}; -use crate::runtime::*; - -/////////////////////////////
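// The retired setup_sink_tree_family above is essentially an instance of the classic
// "echo algorithm with extinction": every controller floods the largest ControllerId it
// has seen, adopts the sender of a higher id as its parent, and the single node whose
// own echo wave closes becomes the leader/root of the sink tree. The sketch below is
// NOT the retired code: it is a minimal, self-contained simulation of that echo phase
// over an in-memory graph, with a FIFO message queue standing in for the TCP endpoints
// (names like `echo_elect` and `Cid` are made up for this illustration).
use std::collections::{HashMap, HashSet, VecDeque};

type Cid = u32; // stands in for ControllerId

/// Returns the elected leader and every node's parent pointer (None for the leader),
/// assuming `adjacency` describes a connected, symmetric graph with at least one edge.
fn echo_elect(adjacency: &HashMap<Cid, Vec<Cid>>) -> (Cid, HashMap<Cid, Option<Cid>>) {
    struct Node {
        neighbors: Vec<Cid>,
        my_leader: Cid,
        parent: Option<Cid>,
        awaiting: HashSet<Cid>,
    }
    let mut nodes: HashMap<Cid, Node> = adjacency
        .iter()
        .map(|(&id, nbrs)| {
            let node = Node {
                neighbors: nbrs.clone(),
                my_leader: id,
                parent: None,
                awaiting: nbrs.iter().copied().collect(),
            };
            (id, node)
        })
        .collect();
    // (sender, recipient, proposed leader): every node starts by echoing its own id.
    let mut msgs: VecDeque<(Cid, Cid, Cid)> = adjacency
        .iter()
        .flat_map(|(&id, nbrs)| nbrs.iter().map(move |&n| (id, n, id)))
        .collect();
    let mut leader = None;
    while let Some((from, to, l)) = msgs.pop_front() {
        let node = nodes.get_mut(&to).unwrap();
        if l > node.my_leader {
            // Higher id seen: adopt the sender as parent and re-flood its echo.
            node.my_leader = l;
            node.parent = Some(from);
            node.awaiting = node.neighbors.iter().copied().filter(|&n| n != from).collect();
            if node.awaiting.is_empty() {
                msgs.push_back((to, from, l)); // sole neighbor: reply immediately
            } else {
                for &n in &node.neighbors {
                    if n != from {
                        msgs.push_back((to, n, l));
                    }
                }
            }
        } else if l == node.my_leader && node.awaiting.remove(&from) && node.awaiting.is_empty() {
            match node.parent {
                Some(p) => msgs.push_back((to, p, l)), // wave closed here: report upward
                None => leader = Some(to),             // own wave closed: this node decides
            }
        }
        // l < my_leader: a stale echo of an extinguished wave; simply drop it.
    }
    let parents = nodes.iter().map(|(&id, n)| (id, n.parent)).collect();
    (leader.expect("connected graph with at least one edge"), parents)
}

#[test]
fn echo_elect_picks_max_id() {
    let adjacency: HashMap<Cid, Vec<Cid>> =
        vec![(1, vec![2]), (2, vec![1, 7]), (7, vec![2])].into_iter().collect();
    let (leader, parents) = echo_elect(&adjacency);
    assert_eq!(leader, 7);
    assert_eq!(parents[&7], None); // the leader is the root of the sink tree
    assert_eq!(parents[&2], Some(7));
}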