diff --git a/src/runtime/experimental/api.rs b/src/runtime/experimental/api.rs
index b2fb237337db0cbd99dede2e0181562eb8e11620..2532056a07867a730c8730444571df18107c5bea 100644
--- a/src/runtime/experimental/api.rs
+++ b/src/runtime/experimental/api.rs
@@ -1,4 +1,4 @@
-use super::bits::{usizes_for_bits, BitChunkIter, BitMatrix};
+use super::bits::{usizes_for_bits, BitChunkIter, BitMatrix, Pair, TRUE_CHUNK};
 use super::vec_storage::VecStorage;
 use crate::common::*;
 use crate::runtime::endpoint::EndpointExt;
@@ -99,7 +99,7 @@ struct Component {
     protocol: Arc,
     port_set: HashSet,
     identifier: Arc<[u8]>,
-    state: ProtocolS,
+    state: Option<ProtocolS>, // invariant between rounds: Some()
 }
 
 impl From for ConnectErr {
@@ -443,7 +443,44 @@ pub struct Connected {
 }
 #[derive(Debug, Default)]
 struct Ephemeral {
+    // invariant: between rounds this is cleared
+    machines: Vec<Machine>,
     bit_matrix: BitMatrix,
+    assignment_to_bit_property: HashMap<(ChannelId, bool), usize>,
+    usize_buf: Vec<usize>,
+}
+impl Ephemeral {
+    /// Resets every field to its empty state.
+    fn clear(&mut self) {
+        self.bit_matrix = Default::default();
+        self.usize_buf.clear();
+        self.machines.clear();
+        self.assignment_to_bit_property.clear();
+    }
+}
+/// A component's protocol state, moved out of its `Component` while a round is running.
+#[derive(Debug)]
+struct Machine {
+    component_index: usize,
+    state: ProtocolS,
+}
+/// Context passed to `pre_sync_run`; its `MonoContext` callbacks are not implemented yet.
+struct MonoCtx<'a> {
+    another_pass: &'a mut bool,
+}
+impl MonoContext for MonoCtx<'_> {
+    type D = ProtocolD;
+    type S = ProtocolS;
+
+    fn new_component(&mut self, moved_keys: HashSet, init_state: Self::S) {
+        todo!()
+    }
+    fn new_channel(&mut self) -> [Key; 2] {
+        todo!()
+    }
+    fn new_random(&mut self) -> u64 {
+        todo!()
+    }
 }
 impl Connected {
     pub fn new_component(
@@ -482,7 +519,7 @@ impl Connected {
                 return Err(WrongPortPolarity { param_index, port });
             }
         }
-        let state = protocol.new_main_component(identifier, &moved_port_list);
+        let state = Some(protocol.new_main_component(identifier, &moved_port_list));
         let component = Component {
             port_set: moved_port_set,
             protocol: protocol.clone(),
@@ -513,6 +550,93 @@ impl Connected {
         (OutPort(Port(kp)), InPort(Port(kg)))
     }
+    /// Runs one (still incomplete) synchronous round: every component's state is moved into
+    /// a temporary machine, machines are run with `pre_sync_run` until none is left to run,
+    /// machines whose component exited are dropped, and the remaining states are meant to be
+    /// written back. `_inbuf` and `_ops` are not used yet.
     pub fn sync_set(&mut self, _inbuf: &mut [u8], _ops: &mut [PortOpRs]) -> Result<(), ()> {
+        // For every component, take its state and make a singleton machine
+        for (component_index, component) in self.components.iter_mut().enumerate() {
+            let machine = Machine { component_index, state: component.state.take().unwrap() };
+            self.ephemeral.machines.push(machine);
+        }
+        // Grow property matrix. has |machines| entities and {to_run => 0, to_remove => 1} properties
+        const PROP_TO_RUN: usize = 0;
+        const PROP_TO_REMOVE: usize = 1;
+        self.ephemeral
+            .bit_matrix
+            .grow_to(Pair { property: 2, entity: self.ephemeral.machines.len() as u32 });
+        // Set to_run property for all existing machines
+        self.ephemeral.bit_matrix.batch_mut(move |p| p[PROP_TO_RUN] = TRUE_CHUNK);
+
+        /////////////
+        // perform mono runs, adding and removing TO_RUN property bits
+        let mut usize_buf = vec![];
+        let mut another_pass = true;
+        while another_pass {
+            another_pass = false;
+            let machine_index_iter = self
+                .ephemeral
+                .bit_matrix
+                .iter_entities_where(&mut usize_buf, move |p| p[PROP_TO_RUN]);
+            for machine_index in machine_index_iter {
+                let machine = &mut self.ephemeral.machines[machine_index as usize];
+                let component = self.components.get_occupied(machine.component_index).unwrap();
+                let mut ctx = MonoCtx { another_pass: &mut another_pass };
+                match machine.state.pre_sync_run(&mut ctx, &component.protocol) {
+                    MonoBlocker::Inconsistent => todo!(),
+                    MonoBlocker::ComponentExit => {
+                        // an exited machine is removed below; it must not stay runnable
+                        let bits = &mut self.ephemeral.bit_matrix;
+                        bits.set(Pair { entity: machine_index, property: PROP_TO_REMOVE as u32 });
+                        bits.unset(Pair { entity: machine_index, property: PROP_TO_RUN as u32 });
+                    }
+                    MonoBlocker::SyncBlockStart => self
+                        .ephemeral
+                        .bit_matrix
+                        .unset(Pair { entity: machine_index, property: PROP_TO_RUN as u32 }),
+                }
+            }
+        }
+        // no machines have property TO_RUN
+
+        // from back to front, swap_remove all machines with PROP_TO_REMOVE
+        let machine_index_iter = self
+            .ephemeral
+            .bit_matrix
+            .iter_entities_where_rev(&mut usize_buf, move |p| p[PROP_TO_REMOVE]);
+        self.ephemeral.bit_matrix = Default::default(); // clear matrix
+        for machine_index in machine_index_iter {
+            self.ephemeral.machines.swap_remove(machine_index as usize);
+        }
+
+        // logically destructure self so we can read and write to different fields interleaved...
+        let solution_assignments: Vec<(ChannelId, bool)> = vec![];
+        let Self {
+            components,
+            ephemeral: Ephemeral { bit_matrix, assignment_to_bit_property, usize_buf, machines },
+            ..
+        } = self;
+
+        // !!!!!!! TODO MORE HERE
+        // NOTE(review): `bit_matrix` was replaced by its default value above and the machine
+        // indices shifted during `swap_remove`. Unless the missing code rebuilds the matrix,
+        // the loop below presumably visits no machine, leaving every `component.state` as
+        // `None` for the next round's `take().unwrap()` -- confirm.
+
+        let machine_index_iter = bit_matrix.iter_entities_where(usize_buf, move |p| {
+            solution_assignments.iter().fold(TRUE_CHUNK, |chunk, assignment| {
+                let &bit_property = assignment_to_bit_property.get(assignment).unwrap();
+                chunk & p[bit_property]
+            })
+        });
+        for machine_index in machine_index_iter {
+            let machine = &machines[machine_index as usize];
+            let component = &mut components.get_occupied_mut(machine.component_index).unwrap();
+            component.state = Some(machine.state.clone());
+            println!("visiting machine at index {:?}", machine_index);
+        }
+        self.ephemeral.clear();
+        println!("B {:#?}", self);
         Ok(())
     }
     pub fn sync_subsets(
@@ -660,14 +784,19 @@ fn api_connecting() {
     const TIMEOUT: Option<Duration> = Some(Duration::from_secs(1));
     let handles = vec![
         std::thread::spawn(move || {
-            let mut connecting = Connecting::default();
-            let p_in: InPort = connecting.bind(Coupling::Passive, addrs[0]);
-            let p_out: OutPort = connecting.bind(Coupling::Active, addrs[1]);
-            let mut connected = connecting.connect(TIMEOUT).unwrap();
+            let mut c = Connecting::default();
+            let p_in: InPort = c.bind(Coupling::Passive, addrs[0]);
+            let p_out: OutPort = c.bind(Coupling::Active, addrs[1]);
+            let mut c = c.connect(TIMEOUT).unwrap();
+            println!("c {:#?}", &c);
+
             let identifier = b"sync".to_vec().into();
-            println!("connected {:#?}", &connected);
-            connected.new_component(&PROTOCOL, &identifier, &[p_in.into(), p_out.into()]).unwrap();
-            println!("connected {:#?}", &connected);
+            c.new_component(&PROTOCOL, &identifier, &[p_in.into(), p_out.into()]).unwrap();
+            println!("c {:#?}", &c);
+
+            let mut inbuf = vec![];
+            let mut port_ops = [];
+            c.sync_set(&mut inbuf, &mut port_ops).unwrap();
         }),
         std::thread::spawn(move || {
             let mut connecting = Connecting::default();
diff --git a/src/runtime/experimental/bits.rs b/src/runtime/experimental/bits.rs
index ebe9b0a6c41e6585190951947255e9ae498bc4d4..cc943eecdc99d552b2c3a2c647661bfe81e186bd 100644
--- a/src/runtime/experimental/bits.rs
+++ b/src/runtime/experimental/bits.rs
@@ -19,6 +19,7 @@ pub const fn usizes_for_bits(bits: usize) -> usize {
 
 type Chunk = usize;
 type BitIndex = usize;
+
 pub(crate) struct BitChunkIter<I: Iterator<Item = Chunk>> {
     cached: usize,
     chunk_iter: I,
@@ -26,7 +27,7 @@ pub(crate) struct BitChunkIter<I: Iterator<Item = Chunk>> {
 }
 impl<I: Iterator<Item = Chunk>> BitChunkIter<I> {
     pub fn new(chunk_iter: I) -> Self {
-        // first chunk is always a dummy zero, as if chunk_iter yielded Some(FALSE).
+        // first chunk is always a dummy zero, as if chunk_iter yielded Some(FALSE_CHUNK).
         // Consequences:
         // 1. our next_bit_index is always off by usize_bits() (we correct for it in Self::next) (no additional overhead)
         // 2. we cache Chunk and not Option<Chunk>, because chunk_iter.next() is only called in Self::next.
@@ -84,6 +85,56 @@ impl<I: Iterator<Item = Chunk>> Iterator for BitChunkIter<I> {
     }
 }
 
+/// Iterates over the indices of the set bits of a chunk sequence, highest index first.
+/// `chunk_iter` must yield the chunks in descending order (most significant chunk first).
+pub(crate) struct BitChunkIterRev<I: ExactSizeIterator<Item = Chunk>> {
+    // unvisited bits of the current chunk, shifted up so the next candidate is the top bit
+    cached: usize,
+    chunk_iter: I,
+    // exclusive upper bound of the bit indices that are still unvisited
+    next_bit_index: BitIndex,
+}
+impl<I: ExactSizeIterator<Item = Chunk>> BitChunkIterRev<I> {
+    pub fn new(chunk_iter: I) -> Self {
+        let next_bit_index = chunk_iter.len() * usize_bits();
+        Self { chunk_iter, next_bit_index, cached: 0 }
+    }
+}
+impl<I: ExactSizeIterator<Item = Chunk>> Iterator for BitChunkIterRev<I> {
+    type Item = BitIndex;
+    fn next(&mut self) -> Option<Self::Item> {
+        let mut chunk = self.cached;
+        if chunk == 0 {
+            // The current chunk is used up: drop to its lower boundary before fetching the
+            // next one. Without this, a chunk whose lowest set bit is not bit 0 would skew
+            // the index of every bit that follows.
+            self.next_bit_index -= self.next_bit_index % usize_bits();
+            loop {
+                chunk = self.chunk_iter.next()?;
+                if chunk != 0 {
+                    break;
+                }
+                // skip an all-zero chunk
+                self.next_bit_index -= usize_bits();
+            }
+        }
+        // binary search for the most significant set bit, shifting it to the top
+        const N_INIT: BitIndex = usize_bits() / 2;
+        let mut n = N_INIT;
+        while n >= 1 {
+            let n_most_significant_mask = !0 << (usize_bits() - n);
+            if chunk & n_most_significant_mask == 0 {
+                self.next_bit_index -= n;
+                chunk <<= n;
+            }
+            n /= 2;
+        }
+        self.cached = chunk << 1;
+        self.next_bit_index -= 1;
+        Some(self.next_bit_index)
+    }
+}
+
 /*  --properties-->
      ___ ___ ___ ___
     |___|___|___|___|
@@ -98,9 +149,9 @@ impl<I: Iterator<Item = Chunk>> Iterator for BitChunkIter<I> {
 
 // TODO newtypes Entity and Property
 #[derive(Debug, Copy, Clone, Eq, PartialEq)]
-struct Pair {
-    entity: u32,
-    property: u32,
+pub struct Pair {
+    pub entity: u32,
+    pub property: u32,
 }
 impl From<[u32; 2]> for Pair {
     fn from([entity, property]: [u32; 2]) -> Self {
@@ -173,7 +224,7 @@ impl BitMatrix {
     }
     #[inline]
     const fn column_chunks(entity_bound: usize) -> usize {
-        usizes_for_bits(entity_bound + 1)
+        usizes_for_bits(entity_bound)
     }
     #[inline]
     fn offsets_unchecked(&self, at: Pair) -> [usize; 2] {
@@ -211,12 +262,44 @@ impl BitMatrix {
         }
     }
     /////////
+    /// Grows the matrix to `bounds`, keeping all existing bits; new bits start out unset.
+    /// Panics if `bounds` is smaller than the current bounds in either dimension.
+    pub fn grow_to(&mut self, bounds: Pair) {
+        assert!(bounds.entity >= self.bounds.entity);
+        assert!(bounds.property >= self.bounds.property);
 
-    fn reshape(&mut self, bounds: Pair) {
-        todo!()
+        let old_row_chunks = Self::row_chunks(self.bounds.property as usize);
+        let old_col_chunks = Self::column_chunks(self.bounds.entity as usize);
+        let new_row_chunks = Self::row_chunks(bounds.property as usize);
+        let new_col_chunks = Self::column_chunks(bounds.entity as usize);
+
+        let new_layout = Self::layout_for(new_row_chunks * new_col_chunks);
+        // NOTE(review): the previous buffer is not released before being replaced below, and
+        // the result of `alloc` is not checked for null (or a zero-sized layout) -- confirm.
+        let new_buffer = unsafe {
+            let new_buffer = std::alloc::alloc(new_layout) as *mut usize;
+            let mut src: *mut usize = self.buffer;
+            let mut dest: *mut usize = new_buffer;
+            let row_chunk_diff = new_row_chunks - old_row_chunks;
+            for _col_idx in 0..old_col_chunks {
+                src.copy_to_nonoverlapping(dest, old_row_chunks);
+                src = src.add(old_row_chunks);
+                dest = dest.add(old_row_chunks);
+                if row_chunk_diff > 0 {
+                    dest.write_bytes(0u8, row_chunk_diff);
+                    dest = dest.add(row_chunk_diff);
+                }
+            }
+            let last_zero_chunks = (new_col_chunks - old_col_chunks) * new_row_chunks;
+            dest.write_bytes(0u8, last_zero_chunks);
+            new_buffer
+        };
+        self.layout = new_layout;
+        self.buffer = new_buffer;
+        self.bounds = bounds;
     }
 
-    fn new(bounds: Pair) -> Self {
+    pub fn new(bounds: Pair) -> Self {
         let total_chunks = Self::row_chunks(bounds.property as usize)
             * Self::column_chunks(bounds.entity as usize);
         let layout = Self::layout_for(total_chunks);
@@ -227,23 +310,23 @@ impl BitMatrix {
         };
         Self { buffer, bounds, layout }
     }
-    fn set(&mut self, at: Pair) {
+    pub fn set(&mut self, at: Pair) {
         self.assert_within_bounds(at);
         let [o_of, o_in] = self.offsets_unchecked(at);
         unsafe { *self.buffer.add(o_of) |= 1 << o_in };
     }
-    fn unset(&mut self, at: Pair) {
+    pub fn unset(&mut self, at: Pair) {
         self.assert_within_bounds(at);
         let [o_of, o_in] = self.offsets_unchecked(at);
         unsafe { *self.buffer.add(o_of) &= !(1 << o_in) };
     }
-    fn test(&self, at: Pair) -> bool {
+    pub fn test(&self, at: Pair) -> bool {
         self.assert_within_bounds(at);
         let [o_of, o_in] = self.offsets_unchecked(at);
         unsafe { *self.buffer.add(o_of) & 1 << o_in != 0 }
     }
 
-    fn batch_mut<'a, 'b>(&mut self, mut chunk_mut_fn: impl FnMut(&'b mut [BitChunk])) {
+    pub fn batch_mut<'a, 'b>(&mut self, mut chunk_mut_fn: impl FnMut(&'b mut [BitChunk])) {
         let row_chunks = Self::row_chunks(self.bounds.property as usize);
         let column_chunks = Self::column_chunks(self.bounds.entity as usize);
         let mut ptr = self.buffer;
@@ -272,7 +355,7 @@ impl BitMatrix {
     /// 1. a buffer to work with
     /// 2. a _fold function_ for combining the properties of a given entity
     ///     and returning a new derived property (working )
-    fn iter_entities_where<'a, 'b>(
+    pub fn iter_entities_where<'a, 'b>(
         &'a self,
         buf: &'b mut Vec<usize>,
         mut fold_fn: impl FnMut(&'b [BitChunk]) -> BitChunk,
@@ -296,6 +379,31 @@ impl BitMatrix {
         }
         BitChunkIter::new(buf.drain(buf_start..)).map(|x| x as u32)
     }
+    /// Counterpart of `iter_entities_where` that yields the matching entities highest first.
+    pub fn iter_entities_where_rev<'a, 'b>(
+        &'a self,
+        buf: &'b mut Vec<usize>,
+        mut fold_fn: impl FnMut(&'b [BitChunk]) -> BitChunk,
+    ) -> impl Iterator<Item = u32> + 'b {
+        let buf_start = buf.len();
+        let row_chunks = Self::row_chunks(self.bounds.property as usize);
+        let column_chunks = Self::column_chunks(self.bounds.entity as usize);
+        let mut ptr = self.buffer;
+        for _row in 0..column_chunks {
+            let slice;
+            unsafe {
+                let slicey = std::slice::from_raw_parts(ptr, row_chunks);
+                slice = std::mem::transmute(slicey);
+                ptr = ptr.add(row_chunks);
+            }
+            let chunk = fold_fn(slice);
+            buf.push(chunk.0);
+        }
+        if let Some(mask) = Self::last_row_chunk_mask(self.bounds.entity) {
+            *buf.last_mut().unwrap() &= mask;
+        }
+        BitChunkIterRev::new(buf.drain(buf_start..).rev()).map(|x| x as u32)
+    }
 }
 
 use derive_more::*;
@@ -306,14 +414,14 @@ use derive_more::*;
 pub struct BitChunk(usize);
 impl BitChunk {
     const fn any(self) -> bool {
-        self.0 != FALSE.0
+        self.0 != FALSE_CHUNK.0
     }
     const fn all(self) -> bool {
-        self.0 == TRUE.0
+        self.0 == TRUE_CHUNK.0
     }
 }
-const TRUE: BitChunk = BitChunk(!0);
-const FALSE: BitChunk = BitChunk(0);
+pub const TRUE_CHUNK: BitChunk = BitChunk(!0);
+pub const FALSE_CHUNK: BitChunk = BitChunk(0);
 
 #[test]
 fn matrix_test() {
@@ -322,22 +430,38 @@ fn matrix_test() {
     m.set([40, 1].into());
     m.set([40, 2].into());
     m.set([40, 0].into());
-    println!("{:?}", &m);
+    println!("{:#?}", &m);
 
-    m.batch_mut(|p| p[0] = TRUE);
-    println!("{:?}", &m);
+    m.batch_mut(|p| p[0] = TRUE_CHUNK);
+    println!("{:#?}", &m);
 
     for i in (0..40).step_by(7) {
         m.unset([i, 0].into());
     }
     m.unset([62, 0].into());
-    println!("{:?}", &m);
+    println!("{:#?}", &m);
 
-    m.batch_mut(move |p| p[1] = p[0] ^ TRUE);
-    println!("{:?}", &m);
+    m.batch_mut(move |p| p[1] = p[0] ^ TRUE_CHUNK);
+    println!("{:#?}", &m);
 
     let mut buf = vec![];
     for index in m.iter_entities_where(&mut buf, move |p| p[1]) {
         println!("index {}", index);
     }
+    for index in m.iter_entities_where_rev(&mut buf, move |p| p[1]) {
+        println!("index {}", index);
+    }
+}
+
+#[test]
+fn bit_chunk_iter_rev() {
+    // chunks are given most significant first
+    let bits = usize_bits();
+    let x = &[0b1, 0b1000011, 0, 0, 0b101];
+    let indices: Vec<BitIndex> = BitChunkIterRev::new(x.iter().copied()).collect();
+    assert_eq!(indices, [4 * bits, 3 * bits + 6, 3 * bits + 1, 3 * bits, 2, 0]);
+    // the lowest set bit of the first chunk is not bit 0
+    let y = &[0b10, 0b1];
+    let indices: Vec<BitIndex> = BitChunkIterRev::new(y.iter().copied()).collect();
+    assert_eq!(indices, [bits + 1, 0]);
 }