Changeset - 9e771c9cf8d3
[Not reviewed]
1 6 3
MH - 3 years ago 2022-01-12 09:41:07
contact@maxhenger.nl
WIP: Control messaging between components
10 files changed with 578 insertions and 261 deletions:
0 comments (0 inline, 0 general)
src/runtime2/communication.rs
Show inline comments
 
use crate::protocol::eval::*;
 
use super::runtime::*;
 

	
 
#[derive(Copy, Clone)]
 
@@ -11,6 +12,7 @@ impl PortId {
 

	
 
pub struct Peer {
 
    pub id: CompId,
 
    pub num_associated_ports: u32,
 
    pub(crate) handle: CompHandle,
 
}
 

	
 
@@ -21,6 +23,7 @@ pub enum PortKind {
 

	
 
pub enum PortState {
 
    Open,
 
    Blocked,
 
    Closed,
 
}
 

	
 
@@ -29,10 +32,36 @@ pub struct Port {
 
    pub peer_id: PortId,
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    pub local_peer_index: u32,
 
    pub peer_comp_id: CompId,
 
}
 

	
 
pub struct Channel {
 
    pub putter_id: PortId,
 
    pub getter_id: PortId,
 
}
 

	
 
pub struct DataMessage {
 
    pub source_port_id: PortId,
 
    pub target_port_id: PortId,
 
    pub content: ValueGroup,
 
}
 

	
 
pub struct ControlMessage {
 
    pub id: u32,
 
    pub sender_comp_id: CompId,
 
    pub content: ControlContent,
 
}
 

	
 
pub enum ControlContent {
 
    Ack,
 
    Ping,
 
    PortPeerChangedBlock,
 
    PortPeerChangedUnblock,
 
}
 

	
 
pub enum Message {
 
    Data(DataMessage),
 
    Control(ControlMessage),
 
}
 

	
 
/// Public inbox: accessible by all threads. Essentially a MPSC channel
 
pub struct InboxPublic {
 

	
 
}
 
\ No newline at end of file
src/runtime2/component.rs
Show inline comments
 
deleted file
src/runtime2/component/component_pdl.rs
Show inline comments
 
new file 100644
 
use crate::protocol::*;
 
use crate::protocol::eval::{
 
    PortId as EvalPortId, Prompt,
 
    ValueGroup, Value,
 
    EvalContinuation, EvalResult, EvalError
 
};
 

	
 
use crate::runtime2::store::QueueDynMpsc;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::scheduler::SchedulerCtx;
 
use crate::runtime2::communication::*;
 

	
 
pub enum CompScheduling {
 
    Immediate,
 
    Requeue,
 
    Sleep,
 
    Exit,
 
}
 

	
 
pub struct CompCtx {
 
    pub id: CompId,
 
    pub ports: Vec<Port>,
 
    pub peers: Vec<Peer>,
 
    pub messages: Vec<ValueGroup>, // same size as "ports"
 
    pub port_id_counter: u32,
 
}
 

	
 
impl Default for CompCtx {
 
    fn default() -> Self {
 
        return Self{
 
            id: CompId(0),
 
            ports: Vec::new(),
 
            peers: Vec::new(),
 
            messages: Vec::new(),
 
            port_id_counter: 0,
 
        }
 
    }
 
}
 

	
 
impl CompCtx {
 
    fn take_message(&mut self, port_id: PortId) -> Option<ValueGroup> {
 
        let port_index = self.get_port_index(port_id).unwrap();
 
        let old_value = &mut self.messages[port_index];
 
        if old_value.values.is_empty() {
 
            return None;
 
        }
 

	
 
        // Replace value in array with an empty one
 
        let mut message = ValueGroup::new_stack(Vec::new());
 
        std::mem::swap(old_value, &mut message);
 
        return Some(message);
 
    }
 

	
 
    fn find_peer(&self, port_id: PortId) -> (&Port, &Peer) {
 
        let port_index = self.get_port_index(port_id).unwrap();
 
        let port_info = &self.ports[port_index];
 
        let peer_index = self.get_peer_index(port_info.peer_comp_id).unwrap();
 
        let peer_info = &self.peers[peer_index];
 
        return (port_info, peer_info);
 
    }
 

	
 
    fn create_channel(&mut self) -> Channel {
 
        let putter_id = PortId(self.take_port_id());
 
        let getter_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            kind: PortKind::Putter,
 
            state: PortState::Open,
 
            peer_comp_id: self.id,
 
        });
 
        self.ports.push(Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            state: PortState::Closed,
 
            peer_comp_id: self.id,
 
        });
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
    fn get_port_index(&self, port_id: PortId) -> Option<usize> {
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.self_id == port_id {
 
                return Some(index);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn get_peer_index(&self, peer_id: CompId) -> Option<usize> {
 
        for (index, peer) in self.peers.iter().enumerate() {
 
            if peer.id == peer_id {
 
                return Some(index);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn take_port_id(&mut self) -> u32 {
 
        let port_id = self.port_id_counter;
 
        self.port_id_counter = self.port_id_counter.wrapping_add(1);
 
        return port_id;
 
    }
 
}
 

	
 
pub enum ExecStmt {
 
    CreatedChannel((Value, Value)),
 
    PerformedPut,
 
    PerformedGet(ValueGroup),
 
    None,
 
}
 

	
 
impl ExecStmt {
 
    fn take(&mut self) -> ExecStmt {
 
        let mut value = ExecStmt::None;
 
        std::mem::swap(self, &mut value);
 
        return value;
 
    }
 

	
 
    fn is_none(&self) -> bool {
 
        match self {
 
            ExecStmt::None => return true,
 
            _ => return false,
 
        }
 
    }
 
}
 

	
 
pub struct ExecCtx {
 
    stmt: ExecStmt,
 
}
 

	
 
impl RunContext for ExecCtx {
 
    fn performed_put(&mut self, _port: EvalPortId) -> bool {
 
        match self.stmt.take() {
 
            ExecStmt::None => return false,
 
            ExecStmt::PerformedPut => return true,
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn performed_get(&mut self, _port: EvalPortId) -> Option<ValueGroup> {
 
        match self.stmt.take() {
 
            ExecStmt::None => return None,
 
            ExecStmt::PerformedGet(value) => return Some(value),
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn fires(&mut self, _port: EvalPortId) -> Option<Value> {
 
        todo!("remove fires")
 
    }
 

	
 
    fn performed_fork(&mut self) -> Option<bool> {
 
        todo!("remove fork")
 
    }
 

	
 
    fn created_channel(&mut self) -> Option<(Value, Value)> {
 
        match self.stmt.take() {
 
            ExecStmt::None => return None,
 
            ExecStmt::CreatedChannel(ports) => return Some(ports),
 
            _ => unreachable!(),
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) enum Mode {
 
    NonSync,
 
    Sync,
 
    BlockedGet,
 
    BlockedPut,
 
}
 

	
 
pub(crate) struct CompPDL {
 
    pub mode: Mode,
 
    pub mode_port: PortId, // when blocked on a port
 
    pub mode_value: ValueGroup, // when blocked on a put
 
    pub prompt: Prompt,
 
    pub exec_ctx: ExecCtx,
 
}
 

	
 
impl CompPDL {
 
    pub(crate) fn new(initial_state: Prompt) -> Self {
 
        return Self{
 
            mode: Mode::NonSync,
 
            mode_port: PortId::new_invalid(),
 
            mode_value: ValueGroup::default(),
 
            prompt: initial_state,
 
            exec_ctx: ExecCtx{
 
                stmt: ExecStmt::None,
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn run(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        use EvalContinuation as EC;
 

	
 
        let run_result = self.execute_prompt(&sched_ctx)?;
 

	
 
        match run_result {
 
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
 
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
            },
 
            EC::BlockGet(port_id) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 

	
 
                let port_id = transform_port_id(port_id);
 
                if let Some(message) = comp_ctx.take_message(port_id) {
 
                    // We can immediately receive and continue
 
                    debug_assert!(self.exec_ctx.stmt.is_none());
 
                    self.exec_ctx.stmt = ExecStmt::PerformedGet(message);
 
                    return Ok(CompScheduling::Immediate);
 
                } else {
 
                    // We need to wait
 
                    self.mode = Mode::BlockedGet;
 
                    self.mode_port = port_id;
 
                    return Ok(CompScheduling::Sleep);
 
                }
 
            },
 
            EC::Put(port_id, value) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                let port_id = transform_port_id(port_id);
 
                Self::send_message_and_wake_up(sched_ctx, comp_ctx, port_id, value);
 
            },
 
            // Results that can be returned outside of sync mode
 
            EC::ComponentTerminated => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                return Ok(CompScheduling::Exit);
 
            },
 
            EC::SyncBlockStart => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                self.handle_sync_start(sched_ctx, comp_ctx);
 
            },
 
            EC::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
            },
 
            EC::NewChannel => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 
                let channel = comp_ctx.create_channel();
 
                self.exec_ctx.stmt = ExecStmt::CreatedChannel((
 
                    Value::Output(port_id_to_eval(channel.putter_id)),
 
                    Value::Input(port_id_to_eval(channel.getter_id))
 
                ));
 
                return Ok(CompScheduling::Immediate);
 
            }
 
        }
 

	
 
        return Ok(CompScheduling::Sleep);
 
    }
 

	
 
    fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult {
 
        let mut step_result = EvalContinuation::Stepping;
 
        while let EvalContinuation::Stepping = step_result {
 
            step_result = self.prompt.step(
 
                &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
 
                &sched_ctx.runtime.protocol.modules, &mut self.exec_ctx,
 
            )?;
 
        }
 

	
 
        return Ok(step_result)
 
    }
 

	
 
    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 

	
 
    }
 

	
 
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 

	
 
    }
 

	
 
    fn send_message_and_wake_up(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, port_id: PortId, value: ValueGroup) {
 
        use std::sync::atomic::Ordering;
 

	
 
        let (port_info, peer_info) = comp_ctx.find_peer(port_id);
 
        peer_info.handle.inbox.push(Message::Data(DataMessage{
 
            source_port_id: port_id,
 
            target_port_id: port_info.peer_id,
 
            content: value,
 
        }));
 

	
 
        let should_wake_up = peer_info.handle.sleeping.compare_exchange(
 
            true, false, Ordering::AcqRel, Ordering::Relaxed
 
        ).is_ok();
 

	
 
        if should_wake_up {
 
            let comp_key = unsafe{ peer_info.id.upgrade() };
 
            sched_ctx.runtime.enqueue_work(comp_key);
 
        }
 
    }
 

	
 
    fn create_component_and_transfer_ports(sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, prompt: Prompt, ports: &[PortId]) {
 
        let component = CompPDL::new(prompt);
 
        let (comp_key, component) = sched_ctx.runtime.create_pdl_component(component, true);
 
        let created_ctx = &mut component.ctx;
 

	
 
        for port_id in ports.iter().copied() {
 
            // Transfer port
 
            let (port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id);
 
            Self::add_port_to_component(sched_ctx, created_ctx, port_info);
 

	
 
            // Maybe remove peer from the creator
 
            if let Some(peer_info) = peer_info {
 
                let remove_from_runtime = peer_info.handle.decrement_users();
 
                if remove_from_runtime {
 
                    let removed_comp_key = unsafe{ peer_info.id.upgrade() };
 
                    sched_ctx.runtime.destroy_component(removed_comp_key);
 
                }
 
            }
 
        }
 

	
 
        // Start scheduling
 
        sched_ctx.runtime.enqueue_work(comp_key);
 
    }
 

	
 
    /// Removes a port from a component. Also decrements the port counter in
 
    /// the peer component's entry. If that hits 0 then it will be removed and
 
    /// returned. If returned then the caller is responsible for decrementing
 
    /// the atomic counters of the peer component's handle.
 
    fn remove_port_from_component(comp_ctx: &mut CompCtx, port_id: PortId) -> (Port, Option<Peer>) {
 
        use std::sync::atomic::Ordering;
 

	
 
        let port_index = comp_ctx.get_port_index(port_id).unwrap();
 
        let port_info = comp_ctx.ports.remove(port_index);
 

	
 
        // If the component owns the peer, then we don't have to decrement the
 
        // number of peers (because we don't have an entry for ourselves)
 
        if port_info.peer_comp_id == comp_ctx.id {
 
            return (port_info, None);
 
        }
 

	
 
        let peer_index = comp_ctx.get_peer_index(port_info.peer_comp_id).unwrap();
 
        let peer_info = &mut comp_ctx.peers[peer_index];
 
        peer_info.num_associated_ports -= 1;
 

	
 
        // Check if we still have other ports referencing this peer
 
        if peer_info.num_associated_ports != 0 {
 
            return (port_info, None);
 
        }
 

	
 
        let peer_info = comp_ctx.peers.remove(peer_index);
 
        return (port_info, Some(peer_info));
 
    }
 

	
 
    fn add_port_to_component(sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_info: Port) {
 
        // Add the port info
 
        let peer_comp_id = port_info.peer_comp_id;
 
        debug_assert!(!comp_ctx.ports.iter().any(|v| v.self_id == port_info.self_id));
 
        comp_ctx.ports.push(port_info);
 

	
 
        // Increment counters on peer, or create entry for peer if it doesn't
 
        // exist yet.
 
        match comp_ctx.peers.iter().position(|v| v.id == peer_comp_id) {
 
            Some(peer_index) => {
 
                let peer_info = &mut comp_ctx.peers[peer_index];
 
                peer_info.num_associated_ports += 1;
 
            },
 
            None => {
 
                let handle = sched_ctx.runtime.get_component_public(peer_comp_id);
 
                handle.increment_users();
 
                comp_ctx.peers.push(Peer{
 
                    id: peer_comp_id,
 
                    num_associated_ports: 1,
 
                    handle,
 
                });
 
            }
 
        }
 
    }
 
}
 

	
 
#[inline]
 
fn port_id_from_eval(port_id: EvalPortId) -> PortId {
 
    return PortId(port_id.id);
 
}
 

	
 
#[inline]
 
fn port_id_to_eval(port_id: PortId) -> EvalPortId {
 
    return EvalPortId{ id: port_id.0 };
 
}
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    use crate::protocol::eval::Value;
 

	
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortId>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortId(port_id.id);
 
                for prev_port in ports.iter() {
 
                    if *prev_port == cur_port {
 
                        // Already added
 
                        return;
 
                    }
 
                }
 

	
 
                ports.push(cur_port);
 
            },
 
            Value::Array(heap_pos) |
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for embedded_value in heap_region {
 
                    find_port_in_value(group, embedded_value, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 

	
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for value in &value_group.values {
 
        find_port_in_value(value_group, value, ports);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/control_layer.rs
Show inline comments
 
new file 100644
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 
use crate::runtime2::component::*;
 

	
 
struct ControlEntry {
 
    id: u32,
 
    ack_countdown: u32,
 
    content: ControlContent,
 
    ack_action: ControlAction,
 
}
 

	
 
enum ControlContent {
 
    PeerChange(ControlPeerChange),
 
}
 

	
 
struct ControlPeerChange {
 
    source_port: PortId,
 
    target_port: PortId, // if sent to this port
 
    new_target_comp: CompId, // redirect to this component
 
}
 

	
 
/// Action to be taken when the `Ack`s for a control entry has come in.
 
enum ControlAction {
 
    Nothing,
 
    AckOwnEntry(u32), // ack an entry we own ourselves
 
    ScheduleComponent(CompId), // schedule a particular component for execution
 
}
 

	
 
/// Handling/sending control messages.
 
pub(crate) struct ControlLayer {
 
    id_counter: u32,
 
    entries: Vec<ControlEntry>,
 
}
 

	
 
impl ControlLayer {
 
    fn handle_created_component(&mut self, creator_ctx: &CompCtx, created_ctx: &CompCtx) {
 
        for peer in &created_ctx.peers {
 
            // TODO: Optimize when we ourselves are the peer.
 

	
 
            // Create entry that will unblock the peer if it confirms that all
 
            // of its ports have been blocked
 

	
 
            peer.handle.inbox.push(Message::)
 
        }
 
    }
 

	
 
    fn take_id(&mut self) -> u32 {
 
        let id = self.id_counter;
 
        self.id_counter += 1;
 
        return id;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/mod.rs
Show inline comments
 
new file 100644
 
mod component_pdl;
 
mod control_layer;
 

	
 
pub(crate) use component_pdl::{CompPDL, CompCtx, CompScheduling};
 
\ No newline at end of file
src/runtime2/runtime.rs
Show inline comments
 
@@ -4,8 +4,9 @@ use std::collections::VecDeque;
 

	
 
use crate::protocol::*;
 

	
 
use super::communication::Message;
 
use super::component::{CompCtx, CompPDL};
 
use super::store::ComponentStore;
 
use super::store::{ComponentStore, QueueDynMpsc, QueueDynProducer};
 

	
 
// -----------------------------------------------------------------------------
 
// Component
 
@@ -40,23 +41,42 @@ impl CompId {
 
    }
 
}
 

	
 
/// In-runtime storage of a component
 
/// Private fields of a component, may only be modified by a single thread at
 
/// a time.
 
pub(crate) struct RuntimeComp {
 
    pub public: CompPublic,
 
    pub private: CompPrivate,
 
    pub code: CompPDL,
 
    pub ctx: CompCtx,
 
    pub inbox: QueueDynMpsc<Message>
 
}
 

	
 
/// Should contain everything that is accessible in a thread-safe manner
 
pub(crate) struct CompPublic {
 
    pub sleeping: AtomicBool,
 
    pub num_handles: AtomicU32, // modified upon creating/dropping `CompHandle` instances
 
    pub num_handles: AtomicU32, // manually modified (!)
 
    pub inbox: QueueDynProducer<Message>,
 
}
 

	
 
/// Handle to public part of a component.
 
/// Handle to public part of a component. Would be nice if we could
 
/// automagically manage the `num_handles` counter. But when it reaches zero we
 
/// need to manually remove the handle from the runtime. So be careful.
 
pub(crate) struct CompHandle {
 
    target: *const CompPublic,
 
}
 

	
 
impl CompHandle {
 
    pub(crate) fn increment_users(&self) {
 
        let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel);
 
        debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed
 
    }
 

	
 
    /// Returns true if the component should be destroyed
 
    pub(crate) fn decrement_users(&self) -> bool {
 
        let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel);
 
        return old_count == 1;
 
    }
 
}
 

	
 
impl std::ops::Deref for CompHandle {
 
    type Target = CompPublic;
 

	
 
@@ -65,13 +85,6 @@ impl std::ops::Deref for CompHandle {
 
    }
 
}
 

	
 
/// May contain non thread-safe fields. Accessed only by the scheduler which
 
/// will temporarily "own" the component.
 
pub(crate) struct CompPrivate {
 
    pub code: CompPDL,
 
    pub ctx: CompCtx,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Runtime
 
// -----------------------------------------------------------------------------
 
@@ -119,29 +132,30 @@ impl Runtime {
 

	
 
    // Creating/destroying components
 

	
 
    pub(crate) fn create_pdl_component(&self, comp: CompPDL, initially_sleeping: bool) -> CompKey {
 
    /// Creates a new component. Note that the public part will be properly
 
    /// initialized, but the private fields (e.g. owned ports, peers, etc.)
 
    /// are not.
 
    pub(crate) fn create_pdl_component(&self, comp: CompPDL, initially_sleeping: bool) -> (CompKey, &mut RuntimeComp) {
 
        let inbox_queue = QueueDynMpsc::new(16);
 
        let inbox_producer = inbox_queue.producer();
 
        let comp = RuntimeComp{
 
            public: CompPublic{
 
                sleeping: AtomicBool::new(initially_sleeping),
 
                num_handles: AtomicU32::new(1), // the component itself acts like a handle
 
                inbox: inbox_producer,
 
            },
 
            private: CompPrivate{
 
                code: comp,
 
                ctx: CompCtx{
 
                    id: CompId(0),
 
                    ports: Vec::new(),
 
                    peers: Vec::new(),
 
                    messages: Vec::new(),
 
                }
 
            }
 
            code: comp,
 
            ctx: CompCtx::default(),
 
            inbox: inbox_queue,
 
        };
 

	
 
        let index = self.components.create(comp);
 

	
 
        // TODO: just do a reserve_index followed by a consume_index or something
 
        self.components.get_mut(index).private.ctx.id = CompId(index);
 
        let component = self.components.get_mut(index);
 
        component.ctx.id = CompId(index);
 

	
 
        return CompKey(index);
 
        return (CompKey(index), component);
 
    }
 

	
 
    pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp {
 
@@ -149,9 +163,9 @@ impl Runtime {
 
        return component;
 
    }
 

	
 
    pub(crate) fn get_component_public(&self, id: CompId) -> &CompPublic {
 
    pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle {
 
        let component = self.components.get(id.0);
 
        return &component.public;
 
        return CompHandle{ target: &component.public };
 
    }
 

	
 
    pub(crate) fn destroy_component(&self, key: CompKey) {
src/runtime2/scheduler.rs
Show inline comments
 
@@ -2,6 +2,7 @@ use std::sync::atomic::Ordering;
 

	
 
use super::component::*;
 
use super::runtime::*;
 
use super::communication::*;
 

	
 
/// Data associated with a scheduler thread
 
pub(crate) struct Scheduler {
 
@@ -38,7 +39,7 @@ impl Scheduler {
 
            // be re-executed immediately.
 
            let mut new_scheduling = CompScheduling::Immediate;
 
            while let CompScheduling::Immediate = new_scheduling {
 
                new_scheduling = component.private.code.run(&scheduler_ctx, &mut component.private.ctx).expect("TODO: Handle error");
 
                new_scheduling = component.code.run(&scheduler_ctx, &mut component.private.ctx).expect("TODO: Handle error");
 
            }
 

	
 
            // Handle the new scheduling
src/runtime2/store/component.rs
Show inline comments
 
@@ -61,7 +61,7 @@ struct Inner<T: Sized> {
 
    index_mask: usize,
 
}
 

	
 
type InnerRead<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;
 
type InnerShared<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;
 

	
 
impl<T: Sized> ComponentStore<T> {
 
    pub fn new(initial_size: usize) -> Self {
 
@@ -133,22 +133,22 @@ impl<T: Sized> ComponentStore<T> {
 
    }
 

	
 
    #[inline]
 
    fn pop_freelist_index<'a>(&'a self, mut read_lock: InnerRead<'a, T>) -> (InnerRead<'a, T>, u32) {
 
    fn pop_freelist_index<'a>(&'a self, mut shared_lock: InnerShared<'a, T>) -> (InnerShared<'a, T>, u32) {
 
        'attempt_read: loop {
 
            // Load indices and check for reallocation condition
 
            let current_size = read_lock.size;
 
            let current_size = shared_lock.size;
 
            let mut read_index = self.read_head.load(Ordering::Relaxed);
 
            let limit_index = self.limit_head.load(Ordering::Acquire);
 

	
 
            if read_index == limit_index {
 
                read_lock = self.reallocate(current_size, read_lock);
 
                shared_lock = self.reallocate(current_size, shared_lock);
 
                continue 'attempt_read;
 
            }
 

	
 
            loop {
 
                let preemptive_read = read_lock.freelist[read_index & read_lock.index_mask];
 
                let preemptive_read = shared_lock.freelist[read_index & shared_lock.index_mask];
 
                if let Err(actual_read_index) = self.read_head.compare_exchange(
 
                    read_index, (read_index + 1) & read_lock.compare_mask,
 
                    read_index, (read_index + 1) & shared_lock.compare_mask,
 
                    Ordering::AcqRel, Ordering::Acquire
 
                ) {
 
                    // We need to try again
 
@@ -157,13 +157,13 @@ impl<T: Sized> ComponentStore<T> {
 
                }
 

	
 
                // If here then we performed the read
 
                return (read_lock, preemptive_read);
 
                return (shared_lock, preemptive_read);
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn initialize_at_index(&self, read_lock: InnerRead<T>, index: u32, value: T) {
 
    fn initialize_at_index(&self, read_lock: InnerShared<T>, index: u32, value: T) {
 
        let mut target_ptr = read_lock.data[index as usize];
 

	
 
        unsafe {
 
@@ -179,7 +179,7 @@ impl<T: Sized> ComponentStore<T> {
 
    }
 

	
 
    #[inline]
 
    fn push_freelist_index(&self, read_lock: &InnerRead<T>, index_to_put_back: u32) {
 
    fn push_freelist_index(&self, read_lock: &InnerShared<T>, index_to_put_back: u32) {
 
        // Acquire an index in the freelist to which we can write
 
        let mut cur_write_index = self.write_head.load(Ordering::Relaxed);
 
        let mut new_write_index = (cur_write_index + 1) & read_lock.compare_mask;
 
@@ -208,14 +208,14 @@ impl<T: Sized> ComponentStore<T> {
 
    }
 

	
 
    #[inline]
 
    fn destruct_at_index(&self, read_lock: &InnerRead<T>, index: u32) {
 
    fn destruct_at_index(&self, read_lock: &InnerShared<T>, index: u32) {
 
        let target_ptr = read_lock.data[index as usize];
 
        unsafe{ ptr::drop_in_place(target_ptr); }
 
    }
 

	
 
    // NOTE: Bit of a mess, and could have a cleanup with better logic for the
 
    // resizing. Maybe even a different indexing scheme...
 
    fn reallocate(&self, old_size: usize, inner: InnerRead<T>) -> InnerRead<T> {
 
    fn reallocate(&self, old_size: usize, inner: InnerShared<T>) -> InnerShared<T> {
 
        drop(inner);
 
        {
 
            // After dropping read lock, acquire write lock
src/runtime2/store/mod.rs
Show inline comments
 
@@ -2,8 +2,9 @@
 
#[cfg(test)]
 
mod tests;
 

	
 
pub mod component;
 
pub mod unfair_se_lock;
 
pub mod component;
 
pub mod queue_mpsc;
 

	
 
pub(crate) use component::ComponentStore;
 
\ No newline at end of file
 
pub(crate) use component::ComponentStore;
 
pub(crate) use queue_mpsc::{QueueDynMpsc, QueueDynProducer};
 
\ No newline at end of file
src/runtime2/store/queue_mpsc.rs
Show inline comments
 
@@ -257,7 +257,9 @@ impl<T> Drop for QueueDynProducer<T> {
 

	
 
// producer end is `Send`, because in debug mode we make sure that there are no
 
// more producers when the queue is destroyed. But is not sync, because that
 
// would circumvent our atomic counter shenanigans.
 
// would circumvent our atomic counter shenanigans. Although, now that I think
 
// about it, we're rather likely to just drop a single "producer" into the
 
// public part of a component.
 
unsafe impl<T> Send for QueueDynProducer<T>{}
 

	
 
#[inline]
0 comments (0 inline, 0 general)