From 9e771c9cf8d3a805279537378a07af20fd5f0a82 2022-01-12 09:41:07 From: MH Date: 2022-01-12 09:41:07 Subject: [PATCH] WIP: Control messaging between components --- diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs index f3397eeba71f6fff803b9329a782911f1e2652ee..4fee962a9b41daa870d84b2b60e71585d5b630a3 100644 --- a/src/runtime2/communication.rs +++ b/src/runtime2/communication.rs @@ -1,3 +1,4 @@ +use crate::protocol::eval::*; use super::runtime::*; #[derive(Copy, Clone)] @@ -11,6 +12,7 @@ impl PortId { pub struct Peer { pub id: CompId, + pub num_associated_ports: u32, pub(crate) handle: CompHandle, } @@ -21,6 +23,7 @@ pub enum PortKind { pub enum PortState { Open, + Blocked, Closed, } @@ -29,10 +32,36 @@ pub struct Port { pub peer_id: PortId, pub kind: PortKind, pub state: PortState, - pub local_peer_index: u32, + pub peer_comp_id: CompId, +} + +pub struct Channel { + pub putter_id: PortId, + pub getter_id: PortId, +} + +pub struct DataMessage { + pub source_port_id: PortId, + pub target_port_id: PortId, + pub content: ValueGroup, +} + +pub struct ControlMessage { + pub id: u32, + pub sender_comp_id: CompId, + pub content: ControlContent, +} + +pub enum ControlContent { + Ack, + Ping, + PortPeerChangedBlock, + PortPeerChangedUnblock, +} + +pub enum Message { + Data(DataMessage), + Control(ControlMessage), } -/// Public inbox: accessible by all threads. Essentially a MPSC channel -pub struct InboxPublic { -} \ No newline at end of file diff --git a/src/runtime2/component.rs b/src/runtime2/component.rs deleted file mode 100644 index 654c6ce2d4b6af1b9d4de3130ee304b4e0620985..0000000000000000000000000000000000000000 --- a/src/runtime2/component.rs +++ /dev/null @@ -1,216 +0,0 @@ -use crate::protocol::*; -use crate::protocol::eval::{ - PortId as EvalPortId, Prompt, - ValueGroup, Value, - EvalContinuation, EvalResult, EvalError -}; - -use super::runtime::*; -use super::scheduler::SchedulerCtx; -use super::communication::*; - -pub enum CompScheduling { - Immediate, - Requeue, - Sleep, - Exit, -} - -pub struct CompCtx { - pub id: CompId, - pub ports: Vec, - pub peers: Vec, - pub messages: Vec, // same size as "ports" -} - -impl CompCtx { - fn take_message(&mut self, port_id: PortId) -> Option { - let old_value = &mut self.messages[port_id.0 as usize]; - if old_value.values.is_empty() { - return None; - } - - // Replace value in array with an empty one - let mut message = ValueGroup::new_stack(Vec::new()); - std::mem::swap(old_value, &mut message); - return Some(message); - } - - fn find_peer(&self, port_id: PortId) -> &Peer { - let port_info = &self.ports[port_id.0 as usize]; - let peer_info = &self.peers[port_info.local_peer_index as usize]; - return peer_info; - } -} - -pub enum ExecStmt { - CreatedChannel((Value, Value)), - PerformedPut, - PerformedGet(ValueGroup), - None, -} - -impl ExecStmt { - fn take(&mut self) -> ExecStmt { - let mut value = ExecStmt::None; - std::mem::swap(self, &mut value); - return value; - } - - fn is_none(&self) -> bool { - match self { - ExecStmt::None => return true, - _ => return false, - } - } -} - -pub struct ExecCtx { - stmt: ExecStmt, -} - -impl RunContext for ExecCtx { - fn performed_put(&mut self, _port: EvalPortId) -> bool { - match self.stmt.take() { - ExecStmt::None => return false, - ExecStmt::PerformedPut => return true, - _ => unreachable!(), - } - } - - fn performed_get(&mut self, _port: EvalPortId) -> Option { - match self.stmt.take() { - ExecStmt::None => return None, - ExecStmt::PerformedGet(value) => return Some(value), - _ => unreachable!(), - } - } - - fn fires(&mut self, _port: EvalPortId) -> Option { - todo!("remove fires") - } - - fn performed_fork(&mut self) -> Option { - todo!("remove fork") - } - - fn created_channel(&mut self) -> Option<(Value, Value)> { - match self.stmt.take() { - ExecStmt::None => return None, - ExecStmt::CreatedChannel(ports) => return Some(ports), - _ => unreachable!(), - } - } -} - -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub(crate) enum Mode { - NonSync, - Sync, - BlockedGet, - BlockedPut, -} - -pub(crate) struct CompPDL { - pub mode: Mode, - pub mode_port: PortId, // when blocked on a port - pub mode_value: ValueGroup, // when blocked on a put - pub prompt: Prompt, - pub exec_ctx: ExecCtx, -} - -impl CompPDL { - pub(crate) fn new(initial_state: Prompt) -> Self { - return Self{ - mode: Mode::NonSync, - mode_port: PortId::new_invalid(), - mode_value: ValueGroup::default(), - prompt: initial_state, - exec_ctx: ExecCtx{ - stmt: ExecStmt::None, - } - } - } - - pub(crate) fn run(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { - use EvalContinuation as EC; - - let run_result = self.execute_prompt(&sched_ctx)?; - - match run_result { - EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned - EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"), - // Results that can be returned in sync mode - EC::SyncBlockEnd => { - debug_assert_eq!(self.mode, Mode::Sync); - self.handle_sync_end(sched_ctx, comp_ctx); - }, - EC::BlockGet(port_id) => { - debug_assert_eq!(self.mode, Mode::Sync); - - let port_id = transform_port_id(port_id); - if let Some(message) = comp_ctx.take_message(port_id) { - // We can immediately receive and continue - debug_assert!(self.exec_ctx.stmt.is_none()); - self.exec_ctx.stmt = ExecStmt::PerformedGet(message); - return Ok(CompScheduling::Immediate); - } else { - // We need to wait - self.mode = Mode::BlockedGet; - self.mode_port = port_id; - return Ok(CompScheduling::Sleep); - } - }, - EC::Put(port_id, value) => { - debug_assert_eq!(self.mode, Mode::Sync); - - let port_id = transform_port_id(port_id); - let peer = comp_ctx.find_peer(port_id); - }, - // Results that can be returned outside of sync mode - EC::ComponentTerminated => { - debug_assert_eq!(self.mode, Mode::NonSync); - - }, - EC::SyncBlockStart => { - debug_assert_eq!(self.mode, Mode::NonSync); - self.handle_sync_start(sched_ctx, comp_ctx); - }, - EC::NewComponent(definition_id, monomorph_idx, arguments) => { - debug_assert_eq!(self.mode, Mode::NonSync); - - }, - EC::NewChannel => { - debug_assert_eq!(self.mode, Mode::NonSync); - - } - } - - return Ok(CompScheduling::Sleep); - } - - fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult { - let mut step_result = EvalContinuation::Stepping; - while let EvalContinuation::Stepping = step_result { - step_result = self.prompt.step( - &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap, - &sched_ctx.runtime.protocol.modules, &mut self.exec_ctx, - )?; - } - - return Ok(step_result) - } - - fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { - - } - - fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { - - } -} - -#[inline] -fn transform_port_id(port_id: EvalPortId) -> PortId { - return PortId(port_id.id); -} \ No newline at end of file diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs new file mode 100644 index 0000000000000000000000000000000000000000..48f002ef7e5f0619d249bdcee564d62977161b9f --- /dev/null +++ b/src/runtime2/component/component_pdl.rs @@ -0,0 +1,430 @@ +use crate::protocol::*; +use crate::protocol::eval::{ + PortId as EvalPortId, Prompt, + ValueGroup, Value, + EvalContinuation, EvalResult, EvalError +}; + +use crate::runtime2::store::QueueDynMpsc; +use crate::runtime2::runtime::*; +use crate::runtime2::scheduler::SchedulerCtx; +use crate::runtime2::communication::*; + +pub enum CompScheduling { + Immediate, + Requeue, + Sleep, + Exit, +} + +pub struct CompCtx { + pub id: CompId, + pub ports: Vec, + pub peers: Vec, + pub messages: Vec, // same size as "ports" + pub port_id_counter: u32, +} + +impl Default for CompCtx { + fn default() -> Self { + return Self{ + id: CompId(0), + ports: Vec::new(), + peers: Vec::new(), + messages: Vec::new(), + port_id_counter: 0, + } + } +} + +impl CompCtx { + fn take_message(&mut self, port_id: PortId) -> Option { + let port_index = self.get_port_index(port_id).unwrap(); + let old_value = &mut self.messages[port_index]; + if old_value.values.is_empty() { + return None; + } + + // Replace value in array with an empty one + let mut message = ValueGroup::new_stack(Vec::new()); + std::mem::swap(old_value, &mut message); + return Some(message); + } + + fn find_peer(&self, port_id: PortId) -> (&Port, &Peer) { + let port_index = self.get_port_index(port_id).unwrap(); + let port_info = &self.ports[port_index]; + let peer_index = self.get_peer_index(port_info.peer_comp_id).unwrap(); + let peer_info = &self.peers[peer_index]; + return (port_info, peer_info); + } + + fn create_channel(&mut self) -> Channel { + let putter_id = PortId(self.take_port_id()); + let getter_id = PortId(self.take_port_id()); + self.ports.push(Port{ + self_id: putter_id, + peer_id: getter_id, + kind: PortKind::Putter, + state: PortState::Open, + peer_comp_id: self.id, + }); + self.ports.push(Port{ + self_id: getter_id, + peer_id: putter_id, + kind: PortKind::Getter, + state: PortState::Closed, + peer_comp_id: self.id, + }); + + return Channel{ putter_id, getter_id }; + } + + fn get_port_index(&self, port_id: PortId) -> Option { + for (index, port) in self.ports.iter().enumerate() { + if port.self_id == port_id { + return Some(index); + } + } + + return None; + } + + fn get_peer_index(&self, peer_id: CompId) -> Option { + for (index, peer) in self.peers.iter().enumerate() { + if peer.id == peer_id { + return Some(index); + } + } + + return None; + } + + fn take_port_id(&mut self) -> u32 { + let port_id = self.port_id_counter; + self.port_id_counter = self.port_id_counter.wrapping_add(1); + return port_id; + } +} + +pub enum ExecStmt { + CreatedChannel((Value, Value)), + PerformedPut, + PerformedGet(ValueGroup), + None, +} + +impl ExecStmt { + fn take(&mut self) -> ExecStmt { + let mut value = ExecStmt::None; + std::mem::swap(self, &mut value); + return value; + } + + fn is_none(&self) -> bool { + match self { + ExecStmt::None => return true, + _ => return false, + } + } +} + +pub struct ExecCtx { + stmt: ExecStmt, +} + +impl RunContext for ExecCtx { + fn performed_put(&mut self, _port: EvalPortId) -> bool { + match self.stmt.take() { + ExecStmt::None => return false, + ExecStmt::PerformedPut => return true, + _ => unreachable!(), + } + } + + fn performed_get(&mut self, _port: EvalPortId) -> Option { + match self.stmt.take() { + ExecStmt::None => return None, + ExecStmt::PerformedGet(value) => return Some(value), + _ => unreachable!(), + } + } + + fn fires(&mut self, _port: EvalPortId) -> Option { + todo!("remove fires") + } + + fn performed_fork(&mut self) -> Option { + todo!("remove fork") + } + + fn created_channel(&mut self) -> Option<(Value, Value)> { + match self.stmt.take() { + ExecStmt::None => return None, + ExecStmt::CreatedChannel(ports) => return Some(ports), + _ => unreachable!(), + } + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub(crate) enum Mode { + NonSync, + Sync, + BlockedGet, + BlockedPut, +} + +pub(crate) struct CompPDL { + pub mode: Mode, + pub mode_port: PortId, // when blocked on a port + pub mode_value: ValueGroup, // when blocked on a put + pub prompt: Prompt, + pub exec_ctx: ExecCtx, +} + +impl CompPDL { + pub(crate) fn new(initial_state: Prompt) -> Self { + return Self{ + mode: Mode::NonSync, + mode_port: PortId::new_invalid(), + mode_value: ValueGroup::default(), + prompt: initial_state, + exec_ctx: ExecCtx{ + stmt: ExecStmt::None, + } + } + } + + pub(crate) fn run(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { + use EvalContinuation as EC; + + let run_result = self.execute_prompt(&sched_ctx)?; + + match run_result { + EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned + EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"), + // Results that can be returned in sync mode + EC::SyncBlockEnd => { + debug_assert_eq!(self.mode, Mode::Sync); + self.handle_sync_end(sched_ctx, comp_ctx); + }, + EC::BlockGet(port_id) => { + debug_assert_eq!(self.mode, Mode::Sync); + + let port_id = transform_port_id(port_id); + if let Some(message) = comp_ctx.take_message(port_id) { + // We can immediately receive and continue + debug_assert!(self.exec_ctx.stmt.is_none()); + self.exec_ctx.stmt = ExecStmt::PerformedGet(message); + return Ok(CompScheduling::Immediate); + } else { + // We need to wait + self.mode = Mode::BlockedGet; + self.mode_port = port_id; + return Ok(CompScheduling::Sleep); + } + }, + EC::Put(port_id, value) => { + debug_assert_eq!(self.mode, Mode::Sync); + let port_id = transform_port_id(port_id); + Self::send_message_and_wake_up(sched_ctx, comp_ctx, port_id, value); + }, + // Results that can be returned outside of sync mode + EC::ComponentTerminated => { + debug_assert_eq!(self.mode, Mode::NonSync); + return Ok(CompScheduling::Exit); + }, + EC::SyncBlockStart => { + debug_assert_eq!(self.mode, Mode::NonSync); + self.handle_sync_start(sched_ctx, comp_ctx); + }, + EC::NewComponent(definition_id, monomorph_idx, arguments) => { + debug_assert_eq!(self.mode, Mode::NonSync); + }, + EC::NewChannel => { + debug_assert_eq!(self.mode, Mode::NonSync); + debug_assert!(self.exec_ctx.stmt.is_none()); + let channel = comp_ctx.create_channel(); + self.exec_ctx.stmt = ExecStmt::CreatedChannel(( + Value::Output(port_id_to_eval(channel.putter_id)), + Value::Input(port_id_to_eval(channel.getter_id)) + )); + return Ok(CompScheduling::Immediate); + } + } + + return Ok(CompScheduling::Sleep); + } + + fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult { + let mut step_result = EvalContinuation::Stepping; + while let EvalContinuation::Stepping = step_result { + step_result = self.prompt.step( + &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap, + &sched_ctx.runtime.protocol.modules, &mut self.exec_ctx, + )?; + } + + return Ok(step_result) + } + + fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { + + } + + fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { + + } + + fn send_message_and_wake_up(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, port_id: PortId, value: ValueGroup) { + use std::sync::atomic::Ordering; + + let (port_info, peer_info) = comp_ctx.find_peer(port_id); + peer_info.handle.inbox.push(Message::Data(DataMessage{ + source_port_id: port_id, + target_port_id: port_info.peer_id, + content: value, + })); + + let should_wake_up = peer_info.handle.sleeping.compare_exchange( + true, false, Ordering::AcqRel, Ordering::Relaxed + ).is_ok(); + + if should_wake_up { + let comp_key = unsafe{ peer_info.id.upgrade() }; + sched_ctx.runtime.enqueue_work(comp_key); + } + } + + fn create_component_and_transfer_ports(sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, prompt: Prompt, ports: &[PortId]) { + let component = CompPDL::new(prompt); + let (comp_key, component) = sched_ctx.runtime.create_pdl_component(component, true); + let created_ctx = &mut component.ctx; + + for port_id in ports.iter().copied() { + // Transfer port + let (port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id); + Self::add_port_to_component(sched_ctx, created_ctx, port_info); + + // Maybe remove peer from the creator + if let Some(peer_info) = peer_info { + let remove_from_runtime = peer_info.handle.decrement_users(); + if remove_from_runtime { + let removed_comp_key = unsafe{ peer_info.id.upgrade() }; + sched_ctx.runtime.destroy_component(removed_comp_key); + } + } + } + + // Start scheduling + sched_ctx.runtime.enqueue_work(comp_key); + } + + /// Removes a port from a component. Also decrements the port counter in + /// the peer component's entry. If that hits 0 then it will be removed and + /// returned. If returned then the caller is responsible for decrementing + /// the atomic counters of the peer component's handle. + fn remove_port_from_component(comp_ctx: &mut CompCtx, port_id: PortId) -> (Port, Option) { + use std::sync::atomic::Ordering; + + let port_index = comp_ctx.get_port_index(port_id).unwrap(); + let port_info = comp_ctx.ports.remove(port_index); + + // If the component owns the peer, then we don't have to decrement the + // number of peers (because we don't have an entry for ourselves) + if port_info.peer_comp_id == comp_ctx.id { + return (port_info, None); + } + + let peer_index = comp_ctx.get_peer_index(port_info.peer_comp_id).unwrap(); + let peer_info = &mut comp_ctx.peers[peer_index]; + peer_info.num_associated_ports -= 1; + + // Check if we still have other ports referencing this peer + if peer_info.num_associated_ports != 0 { + return (port_info, None); + } + + let peer_info = comp_ctx.peers.remove(peer_index); + return (port_info, Some(peer_info)); + } + + fn add_port_to_component(sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_info: Port) { + // Add the port info + let peer_comp_id = port_info.peer_comp_id; + debug_assert!(!comp_ctx.ports.iter().any(|v| v.self_id == port_info.self_id)); + comp_ctx.ports.push(port_info); + + // Increment counters on peer, or create entry for peer if it doesn't + // exist yet. + match comp_ctx.peers.iter().position(|v| v.id == peer_comp_id) { + Some(peer_index) => { + let peer_info = &mut comp_ctx.peers[peer_index]; + peer_info.num_associated_ports += 1; + }, + None => { + let handle = sched_ctx.runtime.get_component_public(peer_comp_id); + handle.increment_users(); + comp_ctx.peers.push(Peer{ + id: peer_comp_id, + num_associated_ports: 1, + handle, + }); + } + } + } +} + +#[inline] +fn port_id_from_eval(port_id: EvalPortId) -> PortId { + return PortId(port_id.id); +} + +#[inline] +fn port_id_to_eval(port_id: PortId) -> EvalPortId { + return EvalPortId{ id: port_id.0 }; +} + +/// Recursively goes through the value group, attempting to find ports. +/// Duplicates will only be added once. +pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { + // Helper to check a value for a port and recurse if needed. + use crate::protocol::eval::Value; + + fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { + match value { + Value::Input(port_id) | Value::Output(port_id) => { + // This is an actual port + let cur_port = PortId(port_id.id); + for prev_port in ports.iter() { + if *prev_port == cur_port { + // Already added + return; + } + } + + ports.push(cur_port); + }, + Value::Array(heap_pos) | + Value::Message(heap_pos) | + Value::String(heap_pos) | + Value::Struct(heap_pos) | + Value::Union(_, heap_pos) => { + // Reference to some dynamic thing which might contain ports, + // so recurse + let heap_region = &group.regions[*heap_pos as usize]; + for embedded_value in heap_region { + find_port_in_value(group, embedded_value, ports); + } + }, + _ => {}, // values we don't care about + } + } + + // Clear the ports, then scan all the available values + ports.clear(); + for value in &value_group.values { + find_port_in_value(value_group, value, ports); + } +} \ No newline at end of file diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs new file mode 100644 index 0000000000000000000000000000000000000000..75bd7c1f096fa10c0b355d0c4ea5577bab7eedcd --- /dev/null +++ b/src/runtime2/component/control_layer.rs @@ -0,0 +1,52 @@ +use crate::runtime2::runtime::*; +use crate::runtime2::communication::*; +use crate::runtime2::component::*; + +struct ControlEntry { + id: u32, + ack_countdown: u32, + content: ControlContent, + ack_action: ControlAction, +} + +enum ControlContent { + PeerChange(ControlPeerChange), +} + +struct ControlPeerChange { + source_port: PortId, + target_port: PortId, // if sent to this port + new_target_comp: CompId, // redirect to this component +} + +/// Action to be taken when the `Ack`s for a control entry has come in. +enum ControlAction { + Nothing, + AckOwnEntry(u32), // ack an entry we own ourselves + ScheduleComponent(CompId), // schedule a particular component for execution +} + +/// Handling/sending control messages. +pub(crate) struct ControlLayer { + id_counter: u32, + entries: Vec, +} + +impl ControlLayer { + fn handle_created_component(&mut self, creator_ctx: &CompCtx, created_ctx: &CompCtx) { + for peer in &created_ctx.peers { + // TODO: Optimize when we ourselves are the peer. + + // Create entry that will unblock the peer if it confirms that all + // of its ports have been blocked + + peer.handle.inbox.push(Message::) + } + } + + fn take_id(&mut self) -> u32 { + let id = self.id_counter; + self.id_counter += 1; + return id; + } +} \ No newline at end of file diff --git a/src/runtime2/component/mod.rs b/src/runtime2/component/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..cda01712f7e718a45a591d978e57d3b6f2c92d9c --- /dev/null +++ b/src/runtime2/component/mod.rs @@ -0,0 +1,4 @@ +mod component_pdl; +mod control_layer; + +pub(crate) use component_pdl::{CompPDL, CompCtx, CompScheduling}; \ No newline at end of file diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 0d8f13909ebaee5567dfdb99a6022e5d3bc58ba0..621b50e17daa9b070687640d9e6baab332fe03f5 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -4,8 +4,9 @@ use std::collections::VecDeque; use crate::protocol::*; +use super::communication::Message; use super::component::{CompCtx, CompPDL}; -use super::store::ComponentStore; +use super::store::{ComponentStore, QueueDynMpsc, QueueDynProducer}; // ----------------------------------------------------------------------------- // Component @@ -40,23 +41,42 @@ impl CompId { } } -/// In-runtime storage of a component +/// Private fields of a component, may only be modified by a single thread at +/// a time. pub(crate) struct RuntimeComp { pub public: CompPublic, - pub private: CompPrivate, + pub code: CompPDL, + pub ctx: CompCtx, + pub inbox: QueueDynMpsc } /// Should contain everything that is accessible in a thread-safe manner pub(crate) struct CompPublic { pub sleeping: AtomicBool, - pub num_handles: AtomicU32, // modified upon creating/dropping `CompHandle` instances + pub num_handles: AtomicU32, // manually modified (!) + pub inbox: QueueDynProducer, } -/// Handle to public part of a component. +/// Handle to public part of a component. Would be nice if we could +/// automagically manage the `num_handles` counter. But when it reaches zero we +/// need to manually remove the handle from the runtime. So be careful. pub(crate) struct CompHandle { target: *const CompPublic, } +impl CompHandle { + pub(crate) fn increment_users(&self) { + let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel); + debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed + } + + /// Returns true if the component should be destroyed + pub(crate) fn decrement_users(&self) -> bool { + let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel); + return old_count == 1; + } +} + impl std::ops::Deref for CompHandle { type Target = CompPublic; @@ -65,13 +85,6 @@ impl std::ops::Deref for CompHandle { } } -/// May contain non thread-safe fields. Accessed only by the scheduler which -/// will temporarily "own" the component. -pub(crate) struct CompPrivate { - pub code: CompPDL, - pub ctx: CompCtx, -} - // ----------------------------------------------------------------------------- // Runtime // ----------------------------------------------------------------------------- @@ -119,29 +132,30 @@ impl Runtime { // Creating/destroying components - pub(crate) fn create_pdl_component(&self, comp: CompPDL, initially_sleeping: bool) -> CompKey { + /// Creates a new component. Note that the public part will be properly + /// initialized, but the private fields (e.g. owned ports, peers, etc.) + /// are not. + pub(crate) fn create_pdl_component(&self, comp: CompPDL, initially_sleeping: bool) -> (CompKey, &mut RuntimeComp) { + let inbox_queue = QueueDynMpsc::new(16); + let inbox_producer = inbox_queue.producer(); let comp = RuntimeComp{ public: CompPublic{ sleeping: AtomicBool::new(initially_sleeping), num_handles: AtomicU32::new(1), // the component itself acts like a handle + inbox: inbox_producer, }, - private: CompPrivate{ - code: comp, - ctx: CompCtx{ - id: CompId(0), - ports: Vec::new(), - peers: Vec::new(), - messages: Vec::new(), - } - } + code: comp, + ctx: CompCtx::default(), + inbox: inbox_queue, }; let index = self.components.create(comp); // TODO: just do a reserve_index followed by a consume_index or something - self.components.get_mut(index).private.ctx.id = CompId(index); + let component = self.components.get_mut(index); + component.ctx.id = CompId(index); - return CompKey(index); + return (CompKey(index), component); } pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp { @@ -149,9 +163,9 @@ impl Runtime { return component; } - pub(crate) fn get_component_public(&self, id: CompId) -> &CompPublic { + pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle { let component = self.components.get(id.0); - return &component.public; + return CompHandle{ target: &component.public }; } pub(crate) fn destroy_component(&self, key: CompKey) { diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 047cd25f8717f6f22e55404210a125a5d2ad56ef..bf620950f3942fc085ebf3d4db60f089afd0f2ae 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -2,6 +2,7 @@ use std::sync::atomic::Ordering; use super::component::*; use super::runtime::*; +use super::communication::*; /// Data associated with a scheduler thread pub(crate) struct Scheduler { @@ -38,7 +39,7 @@ impl Scheduler { // be re-executed immediately. let mut new_scheduling = CompScheduling::Immediate; while let CompScheduling::Immediate = new_scheduling { - new_scheduling = component.private.code.run(&scheduler_ctx, &mut component.private.ctx).expect("TODO: Handle error"); + new_scheduling = component.code.run(&scheduler_ctx, &mut component.private.ctx).expect("TODO: Handle error"); } // Handle the new scheduling diff --git a/src/runtime2/store/component.rs b/src/runtime2/store/component.rs index 52f8ace8bd4d3b0161ee35d3ce417d5e111d811c..8076e2cd65bf6b646ba448ee27f53f227f7e2ce9 100644 --- a/src/runtime2/store/component.rs +++ b/src/runtime2/store/component.rs @@ -61,7 +61,7 @@ struct Inner { index_mask: usize, } -type InnerRead<'a, T> = UnfairSeLockSharedGuard<'a, Inner>; +type InnerShared<'a, T> = UnfairSeLockSharedGuard<'a, Inner>; impl ComponentStore { pub fn new(initial_size: usize) -> Self { @@ -133,22 +133,22 @@ impl ComponentStore { } #[inline] - fn pop_freelist_index<'a>(&'a self, mut read_lock: InnerRead<'a, T>) -> (InnerRead<'a, T>, u32) { + fn pop_freelist_index<'a>(&'a self, mut shared_lock: InnerShared<'a, T>) -> (InnerShared<'a, T>, u32) { 'attempt_read: loop { // Load indices and check for reallocation condition - let current_size = read_lock.size; + let current_size = shared_lock.size; let mut read_index = self.read_head.load(Ordering::Relaxed); let limit_index = self.limit_head.load(Ordering::Acquire); if read_index == limit_index { - read_lock = self.reallocate(current_size, read_lock); + shared_lock = self.reallocate(current_size, shared_lock); continue 'attempt_read; } loop { - let preemptive_read = read_lock.freelist[read_index & read_lock.index_mask]; + let preemptive_read = shared_lock.freelist[read_index & shared_lock.index_mask]; if let Err(actual_read_index) = self.read_head.compare_exchange( - read_index, (read_index + 1) & read_lock.compare_mask, + read_index, (read_index + 1) & shared_lock.compare_mask, Ordering::AcqRel, Ordering::Acquire ) { // We need to try again @@ -157,13 +157,13 @@ impl ComponentStore { } // If here then we performed the read - return (read_lock, preemptive_read); + return (shared_lock, preemptive_read); } } } #[inline] - fn initialize_at_index(&self, read_lock: InnerRead, index: u32, value: T) { + fn initialize_at_index(&self, read_lock: InnerShared, index: u32, value: T) { let mut target_ptr = read_lock.data[index as usize]; unsafe { @@ -179,7 +179,7 @@ impl ComponentStore { } #[inline] - fn push_freelist_index(&self, read_lock: &InnerRead, index_to_put_back: u32) { + fn push_freelist_index(&self, read_lock: &InnerShared, index_to_put_back: u32) { // Acquire an index in the freelist to which we can write let mut cur_write_index = self.write_head.load(Ordering::Relaxed); let mut new_write_index = (cur_write_index + 1) & read_lock.compare_mask; @@ -208,14 +208,14 @@ impl ComponentStore { } #[inline] - fn destruct_at_index(&self, read_lock: &InnerRead, index: u32) { + fn destruct_at_index(&self, read_lock: &InnerShared, index: u32) { let target_ptr = read_lock.data[index as usize]; unsafe{ ptr::drop_in_place(target_ptr); } } // NOTE: Bit of a mess, and could have a cleanup with better logic for the // resizing. Maybe even a different indexing scheme... - fn reallocate(&self, old_size: usize, inner: InnerRead) -> InnerRead { + fn reallocate(&self, old_size: usize, inner: InnerShared) -> InnerShared { drop(inner); { // After dropping read lock, acquire write lock diff --git a/src/runtime2/store/mod.rs b/src/runtime2/store/mod.rs index e5bc5f7fe36343e731f085ddf0304e04170c5b25..6f8669b4a8264897378c32714cafa33f69901b99 100644 --- a/src/runtime2/store/mod.rs +++ b/src/runtime2/store/mod.rs @@ -2,8 +2,9 @@ #[cfg(test)] mod tests; -pub mod component; pub mod unfair_se_lock; +pub mod component; pub mod queue_mpsc; -pub(crate) use component::ComponentStore; \ No newline at end of file +pub(crate) use component::ComponentStore; +pub(crate) use queue_mpsc::{QueueDynMpsc, QueueDynProducer}; \ No newline at end of file diff --git a/src/runtime2/store/queue_mpsc.rs b/src/runtime2/store/queue_mpsc.rs index e6a1c67340302f6c23d5d9e22b30d89499eeccb9..10e659838299bbee643fd5cadadcecbb2d8b1467 100644 --- a/src/runtime2/store/queue_mpsc.rs +++ b/src/runtime2/store/queue_mpsc.rs @@ -257,7 +257,9 @@ impl Drop for QueueDynProducer { // producer end is `Send`, because in debug mode we make sure that there are no // more producers when the queue is destroyed. But is not sync, because that -// would circumvent our atomic counter shenanigans. +// would circumvent our atomic counter shenanigans. Although, now that I think +// about it, we're rather likely to just drop a single "producer" into the +// public part of a component. unsafe impl Send for QueueDynProducer{} #[inline]