From bf4c0ee5ba65b15be6083517ba501b66c05b6640 2022-01-25 16:46:08 From: mh Date: 2022-01-25 16:46:08 Subject: [PATCH] WIP: Changed component creation logic once more --- diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs index 0d2fd6149906e06134d68780b528a22d5a1101b9..7f84183fdc9885c463fad673a24142500abeceb7 100644 --- a/src/runtime2/communication.rs +++ b/src/runtime2/communication.rs @@ -29,7 +29,7 @@ pub enum PortKind { Getter, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum PortState { Open, Blocked, @@ -189,6 +189,16 @@ impl Message { return None, } } + + pub(crate) fn modify_target_port(&mut self, port_id: PortId) { + match self { + Message::Data(v) => + v.data_header.target_port = port_id, + Message::Control(v) => + v.target_port_id = Some(port_id), + Message::Sync(_) => unreachable!(), // should never be called for this message type + } + } } diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 412526fa881f5b902fa8cbb023a8b80070be17e5..283de1a84b879d95adbb7446ff8bbee7bca11b4d 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -1,4 +1,5 @@ use crate::protocol::*; +use crate::protocol::ast::DefinitionId; use crate::protocol::eval::{ PortId as EvalPortId, Prompt, ValueGroup, Value, @@ -28,10 +29,10 @@ pub struct CompCtx { pub port_id_counter: u32, } -impl Default for CompCtx { - fn default() -> Self { +impl CompCtx { + pub(crate) fn new(reservation: &CompReserved) -> Self { return Self{ - id: CompId(0), + id: reservation.id(), ports: Vec::new(), peers: Vec::new(), messages: Vec::new(), @@ -46,6 +47,8 @@ struct MessageView<'a> { } impl CompCtx { + /// Creates a new channel that is fully owned by the component associated + /// with this context. fn create_channel(&mut self) -> Channel { let putter_id = PortId(self.take_port_id()); let getter_id = PortId(self.take_port_id()); @@ -67,6 +70,63 @@ impl CompCtx { return Channel{ putter_id, getter_id }; } + /// Adopts a port transferred by another component. Essentially copies all + /// port data but creates a new ID. Caller should ensure that the other + /// endpoint becomes aware of this ID. + fn adopt_port(&mut self, to_transfer: &Port) -> &mut Port { + let port_id = PortId(self.take_port_id()); + let port_index = self.ports.len(); + self.ports.push(Port{ + self_id: port_id, + peer_id: to_transfer.peer_id, + kind: to_transfer.kind, + state: to_transfer.state, + peer_comp_id: to_transfer.peer_comp_id, + }); + return &mut self.ports[port_index]; + } + + /// Adds a peer (or increments the "associated port" counter). Hence caller + /// must make sure that this makes sense. + fn add_peer(&mut self, sched_ctx: &SchedulerCtx, peer_id: CompId, peer_handle: Option<&CompHandle>) { + match self.get_peer_index(peer_id) { + Some(peer_index) => { + let peer_info = &mut self.peers[peer_index]; + peer_info.num_associated_ports += 1; + }, + None => { + let handle = if let Some(handle) = peer_handle { + handle.clone() + } else { + sched_ctx.runtime.get_component_public(peer_id) + }; + + self.peers.push(Peer{ + id: peer_id, + num_associated_ports: 1, + handle, + }) + } + } + } + + /// Removes a peer (or decrements the "associated port" counter). If there + /// are no more references to the peer then the handle will be destroyed. + fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, peer_id: CompId) { + let peer_index = self.get_peer_index(peer_id).unwrap(); + let peer_info = &mut self.peers[peer_index]; + peer_info.num_associated_ports -= 1; + + if peer_info.num_associated_ports == 0 { + let mut peer = self.peers.remove(peer_index); + let should_remove = peer.handle.decrement_users(); + if should_remove { + let key = unsafe{ peer.id.upgrade() }; + sched_ctx.runtime.destroy_component(key); + } + } + } + pub(crate) fn get_port(&self, port_id: PortId) -> &Port { let index = self.get_port_index(port_id).unwrap(); return &self.ports[index]; @@ -237,9 +297,9 @@ impl CompPDL { } } - pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { + pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) { sched_ctx.log(&format!("handling message: {:?}", message)); - if let Some(new_target) = self.control.should_reroute(&message) { + if let Some(new_target) = self.control.should_reroute(&mut message) { let mut target = sched_ctx.runtime.get_component_public(new_target); target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks let _should_remove = target.decrement_users(); @@ -333,15 +393,10 @@ impl CompPDL { }, EC::NewComponent(definition_id, monomorph_idx, arguments) => { debug_assert_eq!(self.mode, Mode::NonSync); - - let mut ports = Vec::new(); // TODO: Optimize - let protocol = &sched_ctx.runtime.protocol; - find_ports_in_value_group(&arguments, &mut ports); - let prompt = Prompt::new( - &protocol.types, &protocol.heap, + self.create_component_and_transfer_ports2( + sched_ctx, comp_ctx, definition_id, monomorph_idx, arguments ); - self.create_component_and_transfer_ports(sched_ctx, comp_ctx, prompt, &ports); return Ok(CompScheduling::Requeue); }, EC::NewChannel => { @@ -635,77 +690,134 @@ impl CompPDL { } } - fn create_component_and_transfer_ports(&mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, prompt: Prompt, ports: &[PortId]) { - let component = CompPDL::new(prompt, ports.len()); - let (comp_key, component) = sched_ctx.runtime.create_pdl_component(component, true); - let created_ctx = &mut component.ctx; - - let mut has_reroute_entry = false; - let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id); - - for port_id in ports.iter().copied() { - // Create temporary reroute entry if the peer is another component - let port_info = creator_ctx.get_port(port_id); - debug_assert_ne!(port_info.state, PortState::Blocked); - if port_info.peer_comp_id == creator_ctx.id { - // We own the peer port. So retrieve it and modify the peer directly - let peer_port_id = port_info.peer_id; - let port_info = creator_ctx.get_port_mut(peer_port_id); - port_info.peer_comp_id = created_ctx.id; - Self::add_peer_associated_port_to_component(sched_ctx, creator_ctx, created_ctx.id); + fn create_component_and_transfer_ports2( + &mut self, + sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, + definition_id: DefinitionId, monomorph_index: i32, mut arguments: ValueGroup + ) { + struct PortPair{ creator: PortId, created: PortId } + let mut port_id_pairs = Vec::new(); + + let reservation = sched_ctx.runtime.start_create_pdl_component(); + let mut created_ctx = CompCtx::new(&reservation); + + // Take all the ports ID that are in the `args` (and currently belong to + // the creator component) and translate them into new IDs that are + // associated with the component we're about to create + let mut arg_iter = ValueGroupIter::new(&mut arguments); + while let Some(port_reference) = arg_iter.next() { + // Create port entry for new component + let creator_port_id = port_reference.id; + let creator_port = creator_ctx.get_port(creator_port_id); + let created_port = created_ctx.adopt_port(creator_port); + let created_port_id = created_port.self_id; + + port_id_pairs.push(PortPair{ + creator: creator_port_id, + created: created_port_id, + }); + + // Modify value in arguments (bit dirty, but double vec in ValueGroup causes lifetime issues) + let arg_value = if let Some(heap_pos) = port_reference.heap_pos { + &mut arg_iter.group.regions[heap_pos][port_reference.index] } else { - // We don't own the peer port, so send the appropriate messages - // to the peer component and notify the control layer - has_reroute_entry = true; - let message = self.control.add_reroute_entry( - creator_ctx.id, port_info.peer_id, port_info.peer_comp_id, - port_info.self_id, created_ctx.id, schedule_entry_id - ); - let peer_info = creator_ctx.get_peer(port_info.peer_comp_id); - peer_info.handle.send_message(sched_ctx, message, true); + &mut arg_iter.group.values[port_reference.index] + }; + match arg_value { + Value::Input(id) => *id = port_id_to_eval(created_port_id), + Value::Output(id) => *id = port_id_to_eval(created_port_id), + _ => unreachable!(), } + } - // Take out any potential messages for the peer - let creator_port_index = creator_ctx.get_port_index(port_id).unwrap(); - let port_main_message = self.inbox_main[creator_port_index].take(); + // For each transferred port pair set their peer components to the + // correct values. This will only change the values for the ports of + // the new component. + let mut created_component_has_remote_peers = false; + + for pair in port_id_pairs.iter() { + let creator_port_info = creator_ctx.get_port(pair.creator); + let created_port_info = created_ctx.get_port_mut(pair.created); + + if created_port_info.peer_comp_id == creator_ctx.id { + // Port peer is owned by the creator as well + let created_peer_port_index = port_id_pairs + .iter() + .position(|v| v.creator == creator_port_info.peer_id); + match created_peer_port_index { + Some(created_peer_port_index) => { + // Peer port moved to the new component as well + let peer_pair = &port_id_pairs[created_peer_port_index]; + created_port_info.peer_id = peer_pair.created; + created_port_info.peer_comp_id = reservation.id(); + }, + None => { + // Peer port remains with creator component. + created_port_info.peer_comp_id = creator_ctx.id; + created_ctx.add_peer(sched_ctx, creator_ctx.id, None); + } + } + } else { + // Peer is a different component + let peer_info = creator_ctx.get_peer(created_port_info.peer_comp_id); + created_ctx.add_peer(sched_ctx, peer_info.id, Some(&peer_info.handle)); + created_component_has_remote_peers = true; + } + } - // Transfer port and create temporary reroute entry - let (port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id); - if port_info.state == PortState::Blocked { - todo!("Think about this when you're not tired!"); + // We'll now actually turn our reservation for a new component into an + // actual component. Note that we initialize it as "not sleeping" as + // its initial scheduling might be performed based on `Ack`s in response + // to message exchanges between remote peers. + let prompt = Prompt::new( + &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap, + definition_id, monomorph_index, arguments, + ); + let component = CompPDL::new(prompt, port_id_pairs.len()); + let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component( + reservation, component, created_ctx, false, + ); + let created_ctx = &component.ctx; + + // Now modify the creator's ports: remove every transferred port and + // potentially remove the peer component + for pair in port_id_pairs.iter() { + let creator_port_index = creator_ctx.get_port_index(pair.creator).unwrap(); + let creator_port_info = creator_ctx.ports.remove(creator_port_index); + if creator_port_info.peer_comp_id != creator_ctx.id { + creator_ctx.remove_peer(sched_ctx, creator_port_info.peer_comp_id); } - Self::add_port_to_component(sched_ctx, created_ctx, port_info); - - // Transfer the taken messages - let created_port_index = created_ctx.get_port_index(port_id).unwrap(); - component.code.inbox_main[created_port_index] = port_main_message; - let mut message_index = 0; - while message_index < self.inbox_backup.len() { - if self.inbox_backup[message_index].data_header.target_port == port_id { - // Move this message - let message = self.inbox_backup.remove(message_index); - component.code.inbox_backup.push(message); - } else { - message_index += 1; - } + + let created_port_info = created_ctx.get_port(pair.created); + if created_port_info.peer_comp_id == creator_ctx.id { + // This is the cause where the creator obtains a reference + // to the created component + let peer_port_info = creator_ctx.get_port_mut(created_port_info.peer_id); + peer_port_info.peer_comp_id = created_ctx.id; + creator_ctx.add_peer(sched_ctx, created_ctx.id, None); } + } - // Maybe remove peer from the creator - if let Some(mut peer_info) = peer_info { - let remove_from_runtime = peer_info.handle.decrement_users(); - if remove_from_runtime { - let removed_comp_key = unsafe{ peer_info.id.upgrade() }; - sched_ctx.runtime.destroy_component(removed_comp_key); + // By now all ports have been transferred. We'll now do any of the setup + // for rerouting/messaging + if created_component_has_remote_peers { + let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id); + for pair in port_id_pairs.iter() { + let port_info = created_ctx.get_port(pair.created); + if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id { + let message = self.control.add_reroute_entry( + creator_ctx.id, port_info.peer_id, port_info.peer_comp_id, + pair.creator, pair.created, created_ctx.id, + schedule_entry_id + ); + let peer_info = created_ctx.get_peer(port_info.peer_comp_id); + peer_info.handle.send_message(sched_ctx, message, true); } } + } else { + // Peer can be scheduled immediately + sched_ctx.runtime.enqueue_work(created_key); } - - if !has_reroute_entry { - // We can schedule the component immediately - self.control.remove_schedule_entry(schedule_entry_id); - component.public.sleeping.store(false, std::sync::atomic::Ordering::Release); - sched_ctx.runtime.enqueue_work(comp_key); - } // else: wait for the `Ack`s, they will trigger the scheduling of the component } /// Removes a port from a component. Also decrements the port counter in @@ -735,16 +847,6 @@ impl CompPDL { return (port_info, Some(peer_info)); } - /// Adds a port to the component context. The peer (or its counter) will be - /// updated accordingly. - fn add_port_to_component(sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_info: Port) { - // Add the port info - let peer_comp_id = port_info.peer_comp_id; - debug_assert!(!comp_ctx.ports.iter().any(|v| v.self_id == port_info.self_id)); - comp_ctx.ports.push(port_info); - Self::add_peer_associated_port_to_component(sched_ctx, comp_ctx, peer_comp_id); - } - /// Only adds/updates a peer for a given port. This function assumes (but /// does not check!) that the port was not considered to belong to that peer /// before calling this function. @@ -834,4 +936,82 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Ve for value in &value_group.values { find_port_in_value(value_group, value, ports); } +} + +struct ValueGroupIter<'a> { + group: &'a mut ValueGroup, + heap_stack: Vec<(usize, usize)>, + index: usize, +} + +impl<'a> ValueGroupIter<'a> { + fn new(group: &'a mut ValueGroup) -> Self { + return Self{ group, heap_stack: Vec::new(), index: 0 } + } +} + +struct ValueGroupPortRef { + id: PortId, + heap_pos: Option, // otherwise: on stack + index: usize, +} + +impl<'a> Iterator for ValueGroupIter<'a> { + type Item = ValueGroupPortRef; + + fn next(&mut self) -> Option { + // Enter loop that keeps iterating until a port is found + loop { + if let Some(pos) = self.heap_stack.last() { + let (heap_pos, region_index) = *pos; + if region_index >= self.group.regions[heap_pos].len() { + self.heap_stack.pop(); + continue; + } + + let value = &self.group.regions[heap_pos][region_index]; + self.heap_stack.last_mut().unwrap().1 += 1; + + match value { + Value::Input(id) | Value::Output(id) => { + let id = PortId(id.id); + return Some(ValueGroupPortRef{ + id, + heap_pos: Some(heap_pos), + index: region_index, + }); + }, + _ => {}, + } + + if let Some(heap_pos) = value.get_heap_pos() { + self.heap_stack.push((heap_pos as usize, 0)); + } + } else { + if self.index >= self.group.values.len() { + return None; + } + + let value = &mut self.group.values[self.index]; + self.index += 1; + + match value { + Value::Input(id) | Value::Output(id) => { + let id = PortId(id.id); + return Some(ValueGroupPortRef{ + id, + heap_pos: None, + index: self.index - 1 + }); + }, + _ => {}, + } + + // Not a port, check if we need to enter a heap region + if let Some(heap_pos) = value.get_heap_pos() { + self.heap_stack.push((heap_pos as usize, 0)); + } // else: just consider the next value + } + } + } } \ No newline at end of file diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index e3b2fe6ed59fa15590522db25db480dca6c7dfe8..86d9e8f1f040229c5a56a9336370a13ee9327aa6 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -29,7 +29,8 @@ enum ControlContent { struct ContentPeerChange { source_port: PortId, source_comp: CompId, - target_port: PortId, + old_target_port: PortId, + new_target_port: PortId, new_target_comp: CompId, schedule_entry_id: ControlId, } @@ -47,7 +48,7 @@ pub(crate) struct ControlLayer { } impl ControlLayer { - pub(crate) fn should_reroute(&self, message: &Message) -> Option { + pub(crate) fn should_reroute(&self, message: &mut Message) -> Option { // Safety note: rerouting should occur during the time when we're // notifying a peer of a new component. During this period that // component hasn't been executed yet, so cannot have died yet. @@ -60,7 +61,8 @@ impl ControlLayer { let target_port = target_port.unwrap(); for entry in &self.entries { if let ControlContent::PeerChange(entry) = &entry.content { - if entry.target_port == target_port { + if entry.old_target_port == target_port { + message.modify_target_port(entry.new_target_port); return Some(entry.new_target_comp); } } @@ -91,7 +93,7 @@ impl ControlLayer { sender_comp_id: comp_ctx.id, target_port_id: Some(content.source_port), content: ControlMessageContent::PortPeerChangedUnblock( - content.source_port, + content.new_target_port, content.new_target_comp ) }; @@ -159,7 +161,7 @@ impl ControlLayer { pub(crate) fn add_reroute_entry( &mut self, creator_comp_id: CompId, source_port_id: PortId, source_comp_id: CompId, - target_port_id: PortId, new_comp_id: CompId, + old_target_port_id: PortId, new_target_port_id: PortId, new_comp_id: CompId, schedule_entry_id: ControlId, ) -> Message { let entry_id = self.take_id(); @@ -169,7 +171,8 @@ impl ControlLayer { content: ControlContent::PeerChange(ContentPeerChange{ source_port: source_port_id, source_comp: source_comp_id, - target_port: target_port_id, + old_target_port: old_target_port_id, + new_target_port: new_target_port_id, new_target_comp: new_comp_id, schedule_entry_id, }), diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 24844a612a6856a7090dc72c3948aeae6e1626b8..9b4c2ad4eda55e0eaf97a0b220c8417f253acc8e 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -7,7 +7,7 @@ use crate::runtime2::component::wake_up_if_sleeping; use super::communication::Message; use super::component::{CompCtx, CompPDL}; -use super::store::{ComponentStore, QueueDynMpsc, QueueDynProducer}; +use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer}; use super::scheduler::*; // ----------------------------------------------------------------------------- @@ -43,6 +43,17 @@ impl CompId { } } +/// Handle to a component that is being created. +pub(crate) struct CompReserved { + reservation: ComponentReservation, +} + +impl CompReserved { + pub(crate) fn id(&self) -> CompId { + return CompId(self.reservation.index) + } +} + /// Private fields of a component, may only be modified by a single thread at /// a time. pub(crate) struct RuntimeComp { @@ -208,10 +219,42 @@ impl RuntimeInner { // Creating/destroying components + pub(crate) fn start_create_pdl_component(&self) -> CompReserved { + self.increment_active_components(); + let reservation = self.components.reserve(); + return CompReserved{ reservation }; + } + + pub(crate) fn finish_create_pdl_component( + &self, reserved: CompReserved, + component: CompPDL, mut context: CompCtx, initially_sleeping: bool, + ) -> (CompKey, &mut RuntimeComp) { + let inbox_queue = QueueDynMpsc::new(16); + let inbox_producer = inbox_queue.producer(); + + let _id = reserved.id(); + context.id = reserved.id(); + let component = RuntimeComp { + public: CompPublic{ + sleeping: AtomicBool::new(initially_sleeping), + num_handles: AtomicU32::new(1), // the component itself acts like a handle + inbox: inbox_producer, + }, + code: component, + ctx: context, + inbox: inbox_queue, + }; + + let index = self.components.submit(reserved.reservation, component); + debug_assert_eq!(index, _id.0); + let component = self.components.get_mut(index); + + return (CompKey(index), component); + } + /// Creates a new component. Note that the public part will be properly - /// initialized, but the private fields (e.g. owned ports, peers, etc.) - /// are not. - pub(crate) fn create_pdl_component(&self, comp: CompPDL, initially_sleeping: bool) -> (CompKey, &mut RuntimeComp) { + /// initialized, but not all private fields are. + pub(crate) fn create_pdl_component(&self, comp: CompPDL, ctx: CompCtx, initially_sleeping: bool) -> (CompKey, &mut RuntimeComp) { let inbox_queue = QueueDynMpsc::new(16); let inbox_producer = inbox_queue.producer(); let comp = RuntimeComp{ @@ -221,7 +264,7 @@ impl RuntimeInner { inbox: inbox_producer, }, code: comp, - ctx: CompCtx::default(), + ctx, inbox: inbox_queue, }; diff --git a/src/runtime2/store/component.rs b/src/runtime2/store/component.rs index d5dc283cdb2a376c8286b7eef849feb9f4c85c66..cd35878591ace87d2922fd5483a73d85c508fdd8 100644 --- a/src/runtime2/store/component.rs +++ b/src/runtime2/store/component.rs @@ -43,6 +43,8 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard}; +/// Generic store of components. Essentially a resizable freelist (implemented +/// as a ringbuffer) combined with an array of actual elements. pub struct ComponentStore { inner: UnfairSeLock>, read_head: AtomicUsize, @@ -53,6 +55,8 @@ pub struct ComponentStore { unsafe impl Send for ComponentStore{} unsafe impl Sync for ComponentStore{} +/// Contents of the `ComponentStore` that require a shared/exclusive locking +/// mechanism for consistency. struct Inner { freelist: Vec, data: Vec<*mut T>, @@ -63,6 +67,29 @@ struct Inner { type InnerShared<'a, T> = UnfairSeLockSharedGuard<'a, Inner>; +/// Reservation of a slot in the component store. Corresponds to the case where +/// an index has been taken from the freelist, but the element has not yet been +/// initialized +pub struct ComponentReservation { + pub(crate) index: u32, + #[cfg(debug_assertions)] submitted: bool, +} + +impl ComponentReservation { + fn new(index: u32) -> Self { + return Self{ + index, + #[cfg(debug_assertions)] submitted: false, + } + } +} + +impl Drop for ComponentReservation { + fn drop(&mut self) { + debug_assert!(self.submitted); + } +} + impl ComponentStore { pub fn new(initial_size: usize) -> Self { Self::assert_valid_size(initial_size); @@ -96,10 +123,23 @@ impl ComponentStore { pub fn create(&self, value: T) -> u32 { let lock = self.inner.lock_shared(); let (lock, index) = self.pop_freelist_index(lock); - self.initialize_at_index(lock, index, value); + Self::initialize_at_index(lock, index, value); return index; } + pub fn reserve(&self) -> ComponentReservation { + let lock = self.inner.lock_shared(); + let (lock, index) = self.pop_freelist_index(lock); + return ComponentReservation::new(index); + } + + pub fn submit(&self, mut reservation: ComponentReservation, value: T) -> u32 { + dbg_code!({ reservation.submitted = true; }); + let lock = self.inner.lock_shared(); + Self::initialize_at_index(lock, reservation.index, value); + return reservation.index; + } + /// Destroys an element at the provided `index`. The caller must make sure /// that it does not use any previously received references to the data at /// this index, and that no more calls to `get` are performed using this @@ -163,7 +203,7 @@ impl ComponentStore { } #[inline] - fn initialize_at_index(&self, read_lock: InnerShared, index: u32, value: T) { + fn initialize_at_index(read_lock: InnerShared, index: u32, value: T) { let mut target_ptr = read_lock.data[index as usize]; unsafe { diff --git a/src/runtime2/store/mod.rs b/src/runtime2/store/mod.rs index 6f8669b4a8264897378c32714cafa33f69901b99..4a98436f426f3914a1784eb7a8eb9353e1861b52 100644 --- a/src/runtime2/store/mod.rs +++ b/src/runtime2/store/mod.rs @@ -6,5 +6,5 @@ pub mod unfair_se_lock; pub mod component; pub mod queue_mpsc; -pub(crate) use component::ComponentStore; +pub(crate) use component::{ComponentStore, ComponentReservation}; pub(crate) use queue_mpsc::{QueueDynMpsc, QueueDynProducer}; \ No newline at end of file diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 5523acfc876170b88d69435262ec2e3956e6fd19..00483c0c9a5217fe43870aa270e3e618ffad4fe0 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -1,7 +1,19 @@ use crate::protocol::*; use crate::protocol::eval::*; use crate::runtime2::runtime::*; -use crate::runtime2::component::CompPDL; +use crate::runtime2::component::{CompCtx, CompPDL}; + +fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) { + let prompt = rt.inner.protocol.new_component( + module_name.as_bytes(), routine_name.as_bytes(), args + ).expect("create prompt"); + let reserved = rt.inner.start_create_pdl_component(); + let ctx = CompCtx::new(&reserved); + let (key, _) = rt.inner.finish_create_pdl_component(reserved, CompPDL::new(prompt, 0), ctx, false); + rt.inner.enqueue_work(key); +} + +fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) } #[test] fn test_component_creation() { @@ -14,11 +26,7 @@ fn test_component_creation() { let rt = Runtime::new(1, pd); for i in 0..20 { - let prompt = rt.inner.protocol.new_component(b"", b"nothing_at_all", ValueGroup::new_stack(Vec::new())) - .expect("component creation"); - let comp = CompPDL::new(prompt, 0); - let (key, _) = rt.inner.create_pdl_component(comp, false); - rt.inner.enqueue_work(key); + create_component(&rt, "", "nothing_at_all", no_args()); } } @@ -44,8 +52,5 @@ fn test_component_communication() { ").expect("compilation"); let rt = Runtime::new(1, pd); - let prompt = rt.inner.protocol.new_component(b"", b"constructor", ValueGroup::new_stack(Vec::new())) - .expect("creation"); - let (key, _) = rt.inner.create_pdl_component(CompPDL::new(prompt, 0), false); - rt.inner.enqueue_work(key); + create_component(&rt, "", "constructor", no_args()); } \ No newline at end of file