diff --git a/src/runtime/experimental/api.rs b/src/runtime/experimental/api.rs index b2fb237337db0cbd99dede2e0181562eb8e11620..2532056a07867a730c8730444571df18107c5bea 100644 --- a/src/runtime/experimental/api.rs +++ b/src/runtime/experimental/api.rs @@ -1,4 +1,4 @@ -use super::bits::{usizes_for_bits, BitChunkIter, BitMatrix}; +use super::bits::{usizes_for_bits, BitChunkIter, BitMatrix, Pair, TRUE_CHUNK}; use super::vec_storage::VecStorage; use crate::common::*; use crate::runtime::endpoint::EndpointExt; @@ -99,7 +99,7 @@ struct Component { protocol: Arc, port_set: HashSet, identifier: Arc<[u8]>, - state: ProtocolS, + state: Option, // invariant between rounds: Some() } impl From for ConnectErr { @@ -443,7 +443,41 @@ pub struct Connected { } #[derive(Debug, Default)] struct Ephemeral { + // invariant: between rounds this is cleared + machines: Vec, bit_matrix: BitMatrix, + assignment_to_bit_property: HashMap<(ChannelId, bool), usize>, + usize_buf: Vec, +} +impl Ephemeral { + fn clear(&mut self) { + self.bit_matrix = Default::default(); + self.usize_buf.clear(); + self.machines.clear(); + self.assignment_to_bit_property.clear(); + } +} +#[derive(Debug)] +struct Machine { + component_index: usize, + state: ProtocolS, +} +struct MonoCtx<'a> { + another_pass: &'a mut bool, +} +impl MonoContext for MonoCtx<'_> { + type D = ProtocolD; + type S = ProtocolS; + + fn new_component(&mut self, moved_keys: HashSet, init_state: Self::S) { + todo!() + } + fn new_channel(&mut self) -> [Key; 2] { + todo!() + } + fn new_random(&mut self) -> u64 { + todo!() + } } impl Connected { pub fn new_component( @@ -482,7 +516,7 @@ impl Connected { return Err(WrongPortPolarity { param_index, port }); } } - let state = protocol.new_main_component(identifier, &moved_port_list); + let state = Some(protocol.new_main_component(identifier, &moved_port_list)); let component = Component { port_set: moved_port_set, protocol: protocol.clone(), @@ -513,6 +547,83 @@ impl Connected { (OutPort(Port(kp)), InPort(Port(kg))) } pub fn sync_set(&mut self, _inbuf: &mut [u8], _ops: &mut [PortOpRs]) -> Result<(), ()> { + // For every component, take its state and make a singleton machine + for (component_index, component) in self.components.iter_mut().enumerate() { + let machine = Machine { component_index, state: component.state.take().unwrap() }; + self.ephemeral.machines.push(machine); + } + // Grow property matrix. has |machines| entities and {to_run => 0, to_remove => 1} properties + const PROP_TO_RUN: usize = 0; + const PROP_TO_REMOVE: usize = 1; + self.ephemeral + .bit_matrix + .grow_to(Pair { property: 2, entity: self.ephemeral.machines.len() as u32 }); + // Set to_run property for all existing machines + self.ephemeral.bit_matrix.batch_mut(move |p| p[PROP_TO_RUN] = TRUE_CHUNK); + + ///////////// + // perform mono runs, adding and removing TO_RUN property bits + let mut usize_buf = vec![]; + let mut another_pass = true; + while another_pass { + another_pass = false; + let machine_index_iter = self + .ephemeral + .bit_matrix + .iter_entities_where(&mut usize_buf, move |p| p[PROP_TO_RUN]); + for machine_index in machine_index_iter { + let machine = &mut self.ephemeral.machines[machine_index as usize]; + let component = self.components.get_occupied(machine.component_index).unwrap(); + let mut ctx = MonoCtx { another_pass: &mut another_pass }; + match machine.state.pre_sync_run(&mut ctx, &component.protocol) { + MonoBlocker::Inconsistent => todo!(), + MonoBlocker::ComponentExit => self + .ephemeral + .bit_matrix + .set(Pair { entity: machine_index, property: PROP_TO_REMOVE as u32 }), + MonoBlocker::SyncBlockStart => self + .ephemeral + .bit_matrix + .unset(Pair { entity: machine_index, property: PROP_TO_RUN as u32 }), + } + } + } + // no machines have property TO_RUN + + // from back to front, swap_remove all machines with PROP_TO_REMOVE + let machine_index_iter = self + .ephemeral + .bit_matrix + .iter_entities_where_rev(&mut usize_buf, move |p| p[PROP_TO_REMOVE]); + self.ephemeral.bit_matrix = Default::default(); // clear matrix + for machine_index in machine_index_iter { + self.ephemeral.machines.swap_remove(machine_index as usize); + } + + // logically destructure self so we can read and write to different fields interleaved... + let solution_assignments: Vec<(ChannelId, bool)> = vec![]; + let Self { + components, + ephemeral: Ephemeral { bit_matrix, assignment_to_bit_property, usize_buf, machines }, + .. + } = self; + + // !!!!!!! TODO MORE HERE + + let machine_index_iter = bit_matrix.iter_entities_where(usize_buf, move |p| { + solution_assignments.iter().fold(TRUE_CHUNK, |chunk, assignment| { + let &bit_property = assignment_to_bit_property.get(assignment).unwrap(); + chunk & p[bit_property] + }) + }); + for machine_index in machine_index_iter { + let machine = &machines[machine_index as usize]; + let component = &mut components.get_occupied_mut(machine.component_index).unwrap(); + component.state = Some(machine.state.clone()); + println!("visiting machine at index {:?}", machine_index); + } + self.ephemeral.clear(); + println!("B {:#?}", self); Ok(()) } pub fn sync_subsets( @@ -660,14 +771,19 @@ fn api_connecting() { const TIMEOUT: Option = Some(Duration::from_secs(1)); let handles = vec![ std::thread::spawn(move || { - let mut connecting = Connecting::default(); - let p_in: InPort = connecting.bind(Coupling::Passive, addrs[0]); - let p_out: OutPort = connecting.bind(Coupling::Active, addrs[1]); - let mut connected = connecting.connect(TIMEOUT).unwrap(); + let mut c = Connecting::default(); + let p_in: InPort = c.bind(Coupling::Passive, addrs[0]); + let p_out: OutPort = c.bind(Coupling::Active, addrs[1]); + let mut c = c.connect(TIMEOUT).unwrap(); + println!("c {:#?}", &c); + let identifier = b"sync".to_vec().into(); - println!("connected {:#?}", &connected); - connected.new_component(&PROTOCOL, &identifier, &[p_in.into(), p_out.into()]).unwrap(); - println!("connected {:#?}", &connected); + c.new_component(&PROTOCOL, &identifier, &[p_in.into(), p_out.into()]).unwrap(); + println!("c {:#?}", &c); + + let mut inbuf = vec![]; + let mut port_ops = []; + c.sync_set(&mut inbuf, &mut port_ops).unwrap(); }), std::thread::spawn(move || { let mut connecting = Connecting::default();