Changeset - 0de39654770f
[Not reviewed]
0 7 0
MH - 3 years ago 2022-01-16 12:30:16
contact@maxhenger.nl
WIP: Closing ports
7 files changed with 147 insertions and 40 deletions:
0 comments (0 inline, 0 general)
src/runtime2/communication.rs
Show inline comments
 
use crate::protocol::eval::*;
 
use super::runtime::*;
 
use super::component::*;
 

	
 
#[derive(Copy, Clone)]
 
pub struct PortId(pub u32);
 

	
 

	
 
impl PortId {
 
    /// This value is not significant, it is chosen to make debugging easier: a
 
    /// very large port number is more likely to shine a light on bugs.
 
    pub fn new_invalid() -> Self {
 
        return Self(u32::MAX);
 
    }
 
}
 

	
 
pub struct Peer {
 
    pub id: CompId,
 
    pub num_associated_ports: u32,
 
    pub(crate) handle: CompHandle,
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub enum PortState {
 
    Open,
 
    Blocked,
 
    Closed,
 
}
 

	
 
pub struct Port {
 
    pub self_id: PortId,
 
    pub peer_id: PortId,
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    pub peer_comp_id: CompId,
 
}
 

	
 
pub struct Channel {
 
    pub putter_id: PortId,
 
    pub getter_id: PortId,
 
}
 

	
 
pub struct DataMessage {
 
    pub data_header: MessageDataHeader,
 
    pub sync_header: MessageSyncHeader,
 
    pub content: ValueGroup,
 
}
 

	
 
pub struct MessageSyncHeader {
 
    pub sync_round: u32,
 
}
 

	
 
pub struct MessageDataHeader {
 
    pub expected_mapping: Vec<(PortId, u32)>,
 
    pub new_mapping: u32,
 
    pub source_port: PortId,
 
    pub target_port: PortId,
 
}
 

	
 
pub struct ControlMessage {
 
    pub id: ControlId,
 
    pub sender_comp_id: CompId,
 
    pub target_port_id: Option<PortId>,
 
    pub content: ControlMessageContent,
 
}
 

	
 
#[derive(Copy, Clone)]
 
pub enum ControlMessageContent {
 
    Ack,
 
    BlockPort(PortId),
 
    UnblockPort(PortId),
 
    ClosePort(PortId),
 
    PortPeerChangedBlock(PortId),
 
    PortPeerChangedUnblock(PortId, CompId),
 
}
 

	
 
pub enum Message {
 
    Data(DataMessage),
 
    Control(ControlMessage),
 
}
 

	
 
impl Message {
 
    pub(crate) fn target_port(&self) -> Option<PortId> {
 
        match self {
 
            Message::Data(v) =>
 
                return Some(v.data_header.target_port),
 
            Message::Control(v) =>
 
                return v.target_port_id,
 
        }
 
    }
 
}
 

	
 

	
src/runtime2/component/component_pdl.rs
Show inline comments
 
use crate::protocol::*;
 
use crate::protocol::eval::{
 
    PortId as EvalPortId, Prompt,
 
    ValueGroup, Value,
 
    EvalContinuation, EvalResult, EvalError
 
};
 

	
 
use crate::runtime2::store::QueueDynMpsc;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::scheduler::SchedulerCtx;
 
use crate::runtime2::communication::*;
 

	
 
use super::*;
 
use super::control_layer::*;
 
use super::consensus::Consensus;
 

	
 
pub enum CompScheduling {
 
    Immediate,
 
    Requeue,
 
    Sleep,
 
    Exit,
 
}
 

	
 
pub struct CompCtx {
 
    pub id: CompId,
 
    pub ports: Vec<Port>,
 
    pub peers: Vec<Peer>,
 
    pub messages: Vec<ValueGroup>, // same size as "ports"
 
    pub port_id_counter: u32,
 
}
 

	
 
impl Default for CompCtx {
 
    fn default() -> Self {
 
        return Self{
 
            id: CompId(0),
 
            ports: Vec::new(),
 
            peers: Vec::new(),
 
            messages: Vec::new(),
 
            port_id_counter: 0,
 
        }
 
    }
 
}
 

	
 
impl CompCtx {
 
    fn take_message(&mut self, port_id: PortId) -> Option<ValueGroup> {
 
        let port_index = self.get_port_index(port_id).unwrap();
 
        let old_value = &mut self.messages[port_index];
 
        if old_value.values.is_empty() {
 
            return None;
 
        }
 

	
 
        // Replace value in array with an empty one
 
        let mut message = ValueGroup::new_stack(Vec::new());
 
        std::mem::swap(old_value, &mut message);
 
        return Some(message);
 
    }
 

	
 
    fn create_channel(&mut self) -> Channel {
 
        let putter_id = PortId(self.take_port_id());
 
        let getter_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            kind: PortKind::Putter,
 
            state: PortState::Open,
 
            peer_comp_id: self.id,
 
        });
 
        self.ports.push(Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            state: PortState::Closed,
 
            peer_comp_id: self.id,
 
        });
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
    fn get_port(&self, port_id: PortId) -> &Port {
 
        let index = self.get_port_index(port_id).unwrap();
 
        return &self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_mut(&mut self, port_id: PortId) -> &mut Port {
 
        let index = self.get_port_index(port_id).unwrap();
 
        return &mut self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_index(&self, port_id: PortId) -> Option<usize> {
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.self_id == port_id {
 
                return Some(index);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn get_peer(&self, peer_id: CompId) -> &Peer {
 
    pub(crate) fn get_peer(&self, peer_id: CompId) -> &Peer {
 
        let index = self.get_peer_index(peer_id).unwrap();
 
        return &self.peers[index];
 
    }
 

	
 
    fn get_peer_mut(&mut self, peer_id: CompId) -> &mut Peer {
 
        let index = self.get_peer_index(peer_id).unwrap();
 
        return &mut self.peers[index];
 
    }
 

	
 
    pub(crate) fn get_peer_index(&self, peer_id: CompId) -> Option<usize> {
 
        for (index, peer) in self.peers.iter().enumerate() {
 
            if peer.id == peer_id {
 
                return Some(index);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn take_port_id(&mut self) -> u32 {
 
        let port_id = self.port_id_counter;
 
        self.port_id_counter = self.port_id_counter.wrapping_add(1);
 
        return port_id;
 
    }
 
}
 

	
 
pub enum ExecStmt {
 
    CreatedChannel((Value, Value)),
 
    PerformedPut,
 
    PerformedGet(ValueGroup),
 
    None,
 
}
 

	
 
impl ExecStmt {
 
    fn take(&mut self) -> ExecStmt {
 
        let mut value = ExecStmt::None;
 
        std::mem::swap(self, &mut value);
 
        return value;
 
    }
 

	
 
    fn is_none(&self) -> bool {
 
        match self {
 
            ExecStmt::None => return true,
 
            _ => return false,
 
        }
 
    }
 
}
 

	
 
pub struct ExecCtx {
 
    stmt: ExecStmt,
 
}
 

	
 
impl RunContext for ExecCtx {
 
    fn performed_put(&mut self, _port: EvalPortId) -> bool {
 
        match self.stmt.take() {
 
            ExecStmt::None => return false,
 
            ExecStmt::PerformedPut => return true,
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn performed_get(&mut self, _port: EvalPortId) -> Option<ValueGroup> {
 
        match self.stmt.take() {
 
            ExecStmt::None => return None,
 
            ExecStmt::PerformedGet(value) => return Some(value),
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn fires(&mut self, _port: EvalPortId) -> Option<Value> {
 
        todo!("remove fires")
 
    }
 

	
 
    fn performed_fork(&mut self) -> Option<bool> {
 
        todo!("remove fork")
 
    }
 

	
 
    fn created_channel(&mut self) -> Option<(Value, Value)> {
 
        match self.stmt.take() {
 
            ExecStmt::None => return None,
 
            ExecStmt::CreatedChannel(ports) => return Some(ports),
 
            _ => unreachable!(),
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) enum Mode {
 
    NonSync,
 
    Sync,
 
    BlockedGet,
 
    BlockedPut,
 
}
 

	
 
pub(crate) struct CompPDL {
 
    pub mode: Mode,
 
    pub mode_port: PortId, // when blocked on a port
 
    pub mode_value: ValueGroup, // when blocked on a put
 
    pub prompt: Prompt,
 
    pub control: ControlLayer,
 
    pub consensus: Consensus,
 
    pub sync_counter: u32,
 
    pub exec_ctx: ExecCtx,
 
    // TODO: Temporary field, simulates future plans of having one storage place
 
    //  reserved per port.
 
    // Should be same length as the number of ports. Corresponding indices imply
 
    // message is intended for that port.
 
    pub inbox_main: Vec<Option<DataMessage>>,
 
    pub inbox_backup: Vec<DataMessage>,
 
}
 

	
 
impl CompPDL {
 
    pub(crate) fn new(initial_state: Prompt) -> Self {
 
        return Self{
 
            mode: Mode::NonSync,
 
            mode_port: PortId::new_invalid(),
 
            mode_value: ValueGroup::default(),
 
            prompt: initial_state,
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
            sync_counter: u32,
 
            sync_counter: 0,
 
            exec_ctx: ExecCtx{
 
                stmt: ExecStmt::None,
 
            },
 
            inbox_main: Vec::new(),
 
            inbox_backup: Vec::new(),
 
        }
 
    }
 

	
 
    pub(crate) fn handle_setup(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        self.inbox.resize(comp_ctx.ports.len(), None);
 
    }
 

	
 
    pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
        if let Some(new_target) = self.control.should_reroute(&message) {
 
            let target = sched_ctx.runtime.get_component_public(new_target);
 
            target.inbox.push(message);
 

	
 
            return;
 
        }
 

	
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
                self.handle_incoming_control_message(sched_ctx, comp_Ctx, message);
 
            },
 
        }
 
    }
 

	
 
    pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        use EvalContinuation as EC;
 

	
 
        let run_result = self.execute_prompt(&sched_ctx)?;
 

	
 
        match run_result {
 
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
 
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::BlockGet(port_id) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 

	
 
                let port_id = transform_port_id(port_id);
 
                if let Some(message) = comp_ctx.take_message(port_id) {
 
                    // We can immediately receive and continue
 
                    debug_assert!(self.exec_ctx.stmt.is_none());
 
                    self.exec_ctx.stmt = ExecStmt::PerformedGet(message);
 
                    return Ok(CompScheduling::Immediate);
 
                } else {
 
                    // We need to wait
 
                    self.mode = Mode::BlockedGet;
 
                    self.mode_port = port_id;
 
                    return Ok(CompScheduling::Sleep);
 
                }
 
            },
 
            EC::Put(port_id, value) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                let port_id = port_id_from_eval(port_id);
 
                let port_info = comp_ctx.get_port(port_id);
 
                if port_info.state == PortState::Blocked {
 

	
 
                } else {
 

	
 
                }
 
                self.send_message_and_wake_up(sched_ctx, comp_ctx, port_id, value);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            // Results that can be returned outside of sync mode
 
            EC::ComponentTerminated => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                return Ok(CompScheduling::Exit);
 
            },
 
            EC::SyncBlockStart => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                self.handle_sync_start(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 

	
 
                let mut ports = Vec::new(); // TODO: Optimize
 
                let protocol = &sched_ctx.runtime.protocol;
 
                find_ports_in_value_group(&arguments, &mut ports);
 
                let prompt = Prompt::new(
 
                    &protocol.types, &protocol.heap,
 
                    definition_id, monomorph_idx, arguments
 
                );
 
                self.create_component_and_transfer_ports(sched_ctx, comp_ctx, prompt, &workspace_ports);
 
                return Ok(CompScheduling::Requeue);
 
            },
 
            EC::NewChannel => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 
                let channel = comp_ctx.create_channel();
 
                self.exec_ctx.stmt = ExecStmt::CreatedChannel((
 
                    Value::Output(port_id_to_eval(channel.putter_id)),
 
                    Value::Input(port_id_to_eval(channel.getter_id))
 
                ));
 
                return Ok(CompScheduling::Immediate);
 
            }
 
        }
 
    }
 

	
 
    fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult {
 
        let mut step_result = EvalContinuation::Stepping;
 
        while let EvalContinuation::Stepping = step_result {
 
            step_result = self.prompt.step(
 
                &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
 
                &sched_ctx.runtime.protocol.modules, &mut self.exec_ctx,
 
            )?;
 
        }
 

	
 
        return Ok(step_result)
 
    }
 

	
 
    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        self.consensus.notify_sync_start(comp_ctx);
 
        debug_assert_eq!(self.mode, Mode::NonSync);
 
        self.mode = Mode::Sync;
 
    }
 

	
 
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        self.consensus.notify_sync_end();
 
        debug_assert_eq!(self.mode, Mode::Sync);
 
        self.mode = Mode::NonSync;
 
    }
 

	
 
    fn send_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) {
 
        use std::sync::atomic::Ordering;
 

	
 
        let port_info = comp_ctx.get_port(source_port_id);
 
        let peer_info = comp_ctx.get_peer(port_info.peer_comp_id);
 
        let annotated_message = self.consensus.annotate_message_data(port_info, value);
 
        peer_info.handle.inbox.push(Message::Data(annotated_message));
 

	
 
        let should_wake_up = peer_info.handle.sleeping.compare_exchange(
 
            true, false, Ordering::AcqRel, Ordering::Relaxed
 
        ).is_ok();
 

	
 
        if should_wake_up {
 
            let comp_key = unsafe{ peer_info.id.upgrade() };
 
            sched_ctx.runtime.enqueue_work(comp_key);
 
        }
 
    }
 

	
 
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        // Check if we can insert it directly into the storage associated with
 
        // the port
 
        let target_port_id = message.data_header.target_port;
 
        let port_index = comp_ctx.get_port_index(target_port_id).unwrap();
 
        if self.inbox_main[port_index].is_none() {
 
            self.inbox_main[port_index] = Some(message);
 

	
 
            // After direct insertion, check if this component's execution is 
 
            // blocked on receiving a message on that port
 
            debug_assert_ne!(comp_ctx.ports[port_index].state, PortState::Blocked); // because we could insert directly
 
            if self.mode == Mode::BlockedGet && self.mode_port == message.data_header.target_port {
 
                // We were indeed blocked
 
                self.mode = Mode::Sync;
 
                self.mode_port = PortId::new_invalid();
 
            }
 
            
 
            return;
 
        }
 

	
 
        // The direct inbox is full, so the port will become (or was already) blocked
 
        let port_info = &mut comp_ctx.ports[port_index];
 
        debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
 

	
 
        if port_info.state == PortState::Open {
 
            let (target_comp_id, block_message) =
 
                self.control.mark_port_blocked(target_port_id, comp_ctx);
 
            debug_assert_eq!(port_info.peer_comp_id, target_comp_id);
 

	
 
            let peer = comp_ctx.get_peer(target_comp_id);
 
            peer.handle.inbox.push(Message::Control(block_message));
 
            wake_up_if_sleeping(sched_ctx, target_comp_id, &peer.handle);
 
        }
 

	
 
        // But we still need to remember the message, so:
 
        self.inbox_backup.push(message);
 
    }
 

	
 
    fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) {
 
        match message.content {
 
            ControlMessageContent::Ack => {
 
                let mut to_ack = message.id;
 
                loop {
 
                    let action = self.control.handle_ack(to_ack, comp_ctx);
 
                    let action = self.control.handle_ack(to_ack, sched_ctx, comp_ctx);
 
                    match action {
 
                        AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => {
 
                            // FIX @NoDirectHandle
 
                            let handle = sched_ctx.runtime.get_component_public(target_comp);
 
                            handle.inbox.push(Message::Control(message));
 
                            wake_up_if_sleeping(sched_ctx, target_comp, &handle);
 
                            to_ack = new_to_ack;
 
                        },
 
                        AckAction::ScheduleComponent(to_schedule) => {
 
                            // FIX @NoDirectHandle
 
                            let handle = sched_ctx.runtime.get_component_public(to_schedule);
 
                            wake_up_if_sleeping(sched_ctx, to_schedule, &handle);
 
                            break;
 
                        },
 
                        AckAction::None => {
 
                            break;
 
                        }
 
                    }
 
                }
 
            },
 
            ControlMessageContent::BlockPort(port_id) => {
 
                // On of our messages was accepted, but the port should be
 
                // blocked.
 
                let port_info = comp_ctx.get_port_mut(port_id);
 
                debug_assert_eq!(port_info.kind, PortKind::Putter);
 
                if port_info.state != PortState::Closed {
 
                    debug_assert_ne!(port_info.state, PortState::Blocked); // implies unnecessary messages
 
                    port_info.state = PortState::Blocked;
 
                }
 
            },
 
            ControlMessageContent::ClosePort(port_id) => {
 
                // Request to close the port. We immediately comply and remove
 
                // the component handle as well
 
                let port_index = comp_ctx.get_port_index(port_id).unwrap();
 
                let port_info = &mut comp_ctx.ports[port_index];
 
                port_info.state = PortState::Closed;
 

	
 
                let peer_index = comp_ctx.get_peer_index(port_info.peer_comp_id).unwrap();
 
                let peer_info = &mut comp_ctx.peers[peer_index];
 
                peer_info.num_associated_ports -= 1;
 
                if peer_info.num_associated_ports == 0 {
 
                    // TODO: @Refactor clean up all these uses of "num_associated_ports"
 
                    let should_remove = peer_info.handle.decrement_users();
 
                    if should_remove {
 
                        let comp_key = unsafe{ peer_info.id.upgrade() };
 
                        sched_ctx.runtime.destroy_component(comp_key);
 
                    }
 

	
 
                    comp_ctx.peers.remove(peer_index);
 
                }
 
            }
 
            ControlMessageContent::UnblockPort(port_id) => {
 
                // We were previously blocked (or already closed)
 
                let port_info = comp_ctx.get_port(port_id);
 
                debug_assert_eq!(port_info.kind, PortKind::Putter);
 
                debug_assert!(port_info.state == PortState::Blocked || port_info.state == PortState::Closed);
 
                if port_info.state == PortState::Blocked {
 
                    self.unblock_port(sched_ctx, comp_ctx, port_id);
 
                }
 
            },
 
            ControlMessageContent::PortPeerChangedBlock(port_id) => {
 
                // The peer of our port has just changed. So we are asked to
 
                // temporarily block the port (while our original recipient is
 
                // potentially rerouting some of the in-flight messages) and
 
                // Ack. Then we wait for the `unblock` call.
 
                debug_assert_eq!(message.target_port_id, port_id);
 
                let port_info = comp_ctx.get_port_mut(port_id);
 
                debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
 
                if port_info.state == PortState::Open {
 
                    port_info.state = PortState::Blocked;
 
                }
 
            },
 
            ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => {
 
                debug_assert_eq!(message.target_port_id, port_id);
 
                let port_info = comp_ctx.get_port_mut(port_id);
 
                debug_assert!(port_info.state == PortState::Blocked);
 
                port_info.peer_comp_id = new_comp_id;
 
                self.unblock_port(sched_ctx, comp_ctx, port_id);
 
            }
 
        }
 
    }
 

	
 
    fn unblock_port(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) {
 
        let port_info = comp_ctx.get_port_mut(port_id);
 
        debug_assert_eq!(port_info.state, PortState::Blocked);
 
        port_info.state = PortState::Open;
 

	
 
        if self.mode == Mode::BlockedPut && port_id == self.mode_port {
 
            // We were blocked on the port that just became unblocked, so
 
            // send the message.
 
            let mut replacement = ValueGroup::default();
 
            std::mem::swap(&mut replacement, &mut self.mode_value);
 
            self.send_message_and_wake_up(sched_ctx, comp_ctx, port_id, replacement);
 

	
 
            self.mode = Mode::Sync;
 
            self.mode_port = PortId::new_invalid();
 
        }
 
    }
 

	
 
    fn create_component_and_transfer_ports(&mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, prompt: Prompt, ports: &[PortId]) {
 
        let component = CompPDL::new(prompt);
 
        let (comp_key, component) = sched_ctx.runtime.create_pdl_component(component, true);
 
        let created_ctx = &mut component.ctx;
 

	
 
        let mut has_reroute_entry = false;
 
        let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
 

	
 
        for port_id in ports.iter().copied() {
 
            // Create temporary reroute entry if the peer is another component
 
            let port_info = creator_ctx.get_port(port_id);
 
            debug_assert_ne!(port_info.state, PortState::Blocked);
 
            if port_info.peer_comp_id == creator_ctx.id {
 
                // We own the peer port. So retrieve it and modify the peer directly
 
                let port_info = creator_ctx.get_port_mut(port_info.peer_id);
 
                port_info.peer_comp_id = created_ctx.id;
 
            } else {
 
                // We don't own the port, so send the appropriate messages and
 
                // notify the control layer
 
                has_reroute_entry = true;
 
                let message = self.control.add_reroute_entry(
 
                    creator_ctx.id, port_info.peer_id, port_info.peer_comp_id,
 
                    port_info.self_id, created_ctx.id, schedule_entry_id
 
                );
 
                let peer_info = creator_ctx.get_peer(port_info.peer_comp_id);
 
                peer_info.handle.inbox.push(message);
 
            }
 

	
 
            // Transfer port and create temporary reroute entry
 
            let (mut port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id);
 
            if port_info.state == PortState::Blocked {
 
                todo!("Think about this when you're not tired!");
 
            }
 
            Self::add_port_to_component(sched_ctx, created_ctx, port_info);
 

	
 
            // Maybe remove peer from the creator
 
            if let Some(mut peer_info) = peer_info {
 
                let remove_from_runtime = peer_info.handle.decrement_users();
 
                if remove_from_runtime {
 
                    let removed_comp_key = unsafe{ peer_info.id.upgrade() };
 
                    sched_ctx.runtime.destroy_component(removed_comp_key);
 
                }
 
            }
 
        }
 

	
 
        if !has_reroute_entry {
 
            // We can schedule the component immediately
 
            self.control.remove_schedule_entry(schedule_entry_id);
 
            sched_ctx.runtime.enqueue_work(comp_key);
 
        } // else: wait for the `Ack`s, they will trigger the scheduling of the component
 
    }
 

	
 
    /// Removes a port from a component. Also decrements the port counter in
 
    /// the peer component's entry. If that hits 0 then it will be removed and
 
    /// returned. If returned then the caller is responsible for decrementing
 
    /// the atomic counters of the peer component's handle.
 
    fn remove_port_from_component(comp_ctx: &mut CompCtx, port_id: PortId) -> (Port, Option<Peer>) {
 
        use std::sync::atomic::Ordering;
 

	
 
        let port_index = comp_ctx.get_port_index(port_id).unwrap();
 
        let port_info = comp_ctx.ports.remove(port_index);
 

	
 
        // If the component owns the peer, then we don't have to decrement the
 
        // number of peers (because we don't have an entry for ourselves)
 
        if port_info.peer_comp_id == comp_ctx.id {
 
            return (port_info, None);
 
        }
 

	
 
        let peer_index = comp_ctx.get_peer_index(port_info.peer_comp_id).unwrap();
 
        let peer_info = &mut comp_ctx.peers[peer_index];
 
        peer_info.num_associated_ports -= 1;
 

	
 
        // Check if we still have other ports referencing this peer
 
        if peer_info.num_associated_ports != 0 {
 
            return (port_info, None);
 
        }
 

	
 
        let peer_info = comp_ctx.peers.remove(peer_index);
 
        return (port_info, Some(peer_info));
 
    }
 

	
 
    fn add_port_to_component(sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_info: Port) {
 
        // Add the port info
 
        let peer_comp_id = port_info.peer_comp_id;
 
        debug_assert!(!comp_ctx.ports.iter().any(|v| v.self_id == port_info.self_id));
 
        comp_ctx.ports.push(port_info);
 

	
 
        // Increment counters on peer, or create entry for peer if it doesn't
 
        // exist yet.
 
        match comp_ctx.peers.iter().position(|v| v.id == peer_comp_id) {
 
            Some(peer_index) => {
 
                let peer_info = &mut comp_ctx.peers[peer_index];
 
                peer_info.num_associated_ports += 1;
 
            },
 
            None => {
 
                let handle = sched_ctx.runtime.get_component_public(peer_comp_id);
 
                comp_ctx.peers.push(Peer{
 
                    id: peer_comp_id,
 
                    num_associated_ports: 1,
 
                    handle,
 
                });
 
            }
 
        }
 
    }
 

	
 
    fn change_port_peer_component(
 
        &mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
        port_id: PortId, new_peer_comp_id: CompId
 
    ) {
 
        let port_info = comp_ctx.get_port_mut(port_id);
 
        let cur_peer_comp_id = port_info.peer_comp_id;
 
        let cur_peer_info = comp_ctx.get_peer_mut(cur_peer_comp_id);
 
        cur_peer_info.num_associated_ports -= 1;
 

	
 
        if cur_peer_info.num_associated_ports == 0 {
 
            let should_remove = cur_peer_info.handle.decrement_users();
 
            if should_remove {
 
                let cur_peer_comp_key = unsafe{ cur_peer_comp_id.upgrade() };
 
                sched_ctx.runtime.destroy_component(cur_peer_comp_key);
 

	
 
            }
 
        }
 
    }
 
}
 

	
 
#[inline]
 
fn port_id_from_eval(port_id: EvalPortId) -> PortId {
 
    return PortId(port_id.id);
 
}
 

	
 
#[inline]
 
fn port_id_to_eval(port_id: PortId) -> EvalPortId {
 
    return EvalPortId{ id: port_id.0 };
 
}
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    use crate::protocol::eval::Value;
 

	
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortId>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortId(port_id.id);
 
                for prev_port in ports.iter() {
 
                    if *prev_port == cur_port {
 
                        // Already added
 
                        return;
 
                    }
 
                }
 

	
 
                ports.push(cur_port);
 
            },
 
            Value::Array(heap_pos) |
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for embedded_value in heap_region {
 
                    find_port_in_value(group, embedded_value, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 

	
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for value in &value_group.values {
 
        find_port_in_value(value_group, value, ports);
 
    }
 
}
 

	
 
/// If the component is sleeping, then that flag will be atomically set to
 
/// false. If we're the ones that made that happen then we add it to the work
 
/// queue.
 
fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, handle: &CompHandle) {
 
    use std::sync::atomic::Ordering;
 

	
 
    let should_wake_up = handle.sleeping
 
        .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
 
        .is_ok();
 

	
 
    if should_wake_up {
 
        let comp_key = unsafe{ comp_id.upgrade() };
 
        sched_ctx.runtime.enqueue_work(comp_key);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/control_layer.rs
Show inline comments
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 
use crate::runtime2::component::*;
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub(crate) struct ControlId(u32);
 

	
 
impl ControlId {
 
    /// Like other invalid IDs, this one doesn't care any significance, but is
 
    /// just set at u32::MAX to hopefully bring out bugs sooner.
 
    fn new_invalid() -> Self {
 
        return ControlId(u32::MAX);
 
    }
 
}
 

	
 
struct ControlEntry {
 
    id: ControlId,
 
    ack_countdown: u32,
 
    content: ControlContent,
 
}
 

	
 
enum ControlContent {
 
    PeerChange(ContentPeerChange),
 
    ScheduleComponent(ContentScheduleComponent),
 
    BlockedPort(ContentBlockedPort),
 
    ScheduleComponent(CompId),
 
    BlockedPort(PortId),
 
    ClosedPort(PortId),
 
}
 

	
 
struct ContentPeerChange {
 
    source_port: PortId,
 
    source_comp: CompId,
 
    target_port: PortId,
 
    new_target_comp: CompId,
 
    schedule_entry_id: ControlId,
 
}
 

	
 
struct ContentScheduleComponent {
 
    to_schedule: CompId,
 
}
 

	
 
struct ContentBlockedPort {
 
    blocked_port: PortId,
 
}
 

	
 
pub(crate) enum AckAction {
 
    None,
 
    SendMessageAndAck(CompId, ControlMessage, ControlId),
 
    ScheduleComponent(CompId),
 
}
 

	
 
/// Handling/sending control messages.
 
pub(crate) struct ControlLayer {
 
    id_counter: ControlId,
 
    entries: Vec<ControlEntry>,
 
}
 

	
 
impl ControlLayer {
 
    pub(crate) fn should_reroute(&self, message: &Message) -> Option<CompId> {
 
        // Safety note: rerouting should occur during the time when we're
 
        // notifying a peer of a new component. During this period that
 
        // component hasn't been executed yet, so cannot have died yet.
 
        // FIX @NoDirectHandle
 
        let target_port = message.target_port();
 
        if target_port.is_none() {
 
            return None;
 
        }
 

	
 
        let target_port = target_port.unwrap();
 
        for entry in &self.entries {
 
            if let ControlContent::PeerChange(entry) = &entry.content {
 
                if entry.target_port == target_port {
 
                    return Some(entry.new_target_comp);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, comp_ctx: &CompCtx) -> AckAction {
 
    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> AckAction {
 
        let entry_index = self.get_entry_index(entry_id).unwrap();
 
        let entry = &mut self.entries[entry_index];
 
        debug_assert!(entry.ack_countdown > 0);
 

	
 
        entry.ack_countdown -= 1;
 
        if entry.ack_countdown != 0 {
 
            return AckAction::None;
 
        }
 

	
 
        // All `Ack`s received, take action based on the kind of entry
 
        match &entry.content {
 
            ControlContent::PeerChange(content) => {
 
                // If change of peer is ack'd. Then we are certain we have
 
                // rerouted all of the messages, and the sender's port can now
 
                // be unblocked again.
 
                let target_comp_id = content.source_comp;
 
                let message_to_send = ControlMessage{
 
                    id: ControlId::new_invalid(),
 
                    sender_comp_id: comp_ctx.id,
 
                    target_port_id: Some(content.source_port),
 
                    content: ControlMessageContent::PortPeerChangedUnblock(
 
                        content.source_port,
 
                        content.new_target_comp
 
                    )
 
                };
 
                let to_ack = content.schedule_entry_id;
 

	
 
                self.entries.remove(entry_index);
 
                self.handle_ack(to_ack, comp_ctx);
 
                self.handle_ack(to_ack, sched_ctx, comp_ctx);
 

	
 
                return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack);
 
            },
 
            ControlContent::ScheduleComponent(content) => {
 
                // If all change-of-peers are `Ack`d, then we're ready to
 
                // schedule the component!
 
                return AckAction::ScheduleComponent(content.to_schedule);
 
            },
 
            ControlContent::BlockedPort(_) => unreachable!(),
 
            ControlContent::ClosedPort(port_id) => {
 
                // If a closed port is Ack'd, then we remove the reference to
 
                // that component.
 
                let port_index = comp_ctx.get_port_index(*port_id).unwrap();
 
                debug_assert_eq!(comp_ctx.ports[port_index].state, PortState::Blocked);
 
                let peer_id = comp_ctx.ports[port_index].peer_comp_id;
 
                let peer_index = comp_ctx.get_peer_index(peer_id).unwrap();
 
                let peer_info = &mut comp_ctx.peers[peer_index];
 
                peer_info.num_associated_ports -= 1;
 

	
 
                if peer_info.num_associated_ports == 0 {
 
                    let should_remove = peer_info.handle.decrement_users();
 
                    if should_remove {
 
                        let comp_key = unsafe{ peer_info.id.upgrade() };
 
                        sched_ctx.runtime.destroy_component(comp_key);
 
                    }
 

	
 
                    comp_ctx.peers.remove(peer_index);
 
                }
 

	
 
                return AckAction::None;
 
            }
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Port transfer (due to component creation)
 
    // -------------------------------------------------------------------------
 

	
 
    /// Adds an entry that, when completely ack'd, will schedule a component.
 
    pub(crate) fn add_schedule_entry(&mut self, to_schedule_id: CompId) -> ControlId {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 0, // incremented by calls to `add_reroute_entry`
 
            content: ControlContent::ScheduleComponent(ContentScheduleComponent{
 
                to_schedule: to_schedule_id
 
            }),
 
        });
 

	
 
        return entry_id;
 
    }
 

	
 
    /// Removes a schedule entry. Only used if the caller preemptively called
 
    /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`,
 
    /// hence the `ack_countdown` in the scheduling entry is at 0.
 
    pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) {
 
        let index = self.get_entry_index(schedule_entry_id).unwrap();
 
        debug_assert_eq!(self.entries[index].ack_countdown, 0);
 
        self.entries.remove(index);
 
    }
 

	
 
    pub(crate) fn add_reroute_entry(
 
        &mut self, creator_comp_id: CompId,
 
        source_port_id: PortId, source_comp_id: CompId,
 
        target_port_id: PortId, new_comp_id: CompId,
 
        schedule_entry_id: ControlId,
 
    ) -> Message {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 1,
 
            content: ControlContent::PeerChange(ContentPeerChange{
 
                source_port: source_port_id,
 
                source_comp: source_comp_id,
 
                target_port: target_port_id,
 
                new_target_comp: new_comp_id,
 
                schedule_entry_id,
 
            }),
 
        });
 

	
 
        // increment counter on schedule entry
 
        for entry in &mut self.entries {
 
            if entry.id == schedule_entry_id {
 
                entry.ack_countdown += 1;
 
                break;
 
            }
 
        }
 

	
 
        return Message::Control(ControlMessage{
 
            id: entry_id,
 
            sender_comp_id: creator_comp_id,
 
            target_port_id: Some(source_port_id),
 
            content: ControlMessageContent::PortPeerChangedBlock(source_port_id)
 
        })
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Blocking and unblocking ports
 
    // Blocking, unblocking, and closing ports
 
    // -------------------------------------------------------------------------
 

	
 
    pub(crate) fn mark_port_closed<'a>(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> {
 
        let port = comp_ctx.get_port_mut(port_id);
 
        debug_assert!(port.state == PortState::Open || port.state == PortState::Blocked);
 

	
 
        port.state = PortState::Closed;
 

	
 
        if port.peer_comp_id == comp_ctx.id {
 
            // We own the other end of the channel as well
 
            return None;
 
        }
 

	
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 1,
 
            content: ControlContent::ClosedPort(port_id),
 
        });
 

	
 
        return Some((
 
            port.peer_comp_id,
 
            ControlMessage{
 
                id: entry_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(port.peer_id),
 
                content: ControlMessageContent::ClosePort(port.peer_id),
 
            }
 
        ));
 
    }
 

	
 
    pub(crate) fn mark_port_blocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
 
        // TODO: Feels like this shouldn't be an entry. Hence this class should
 
        //  be renamed. Lets see where the code ends up being
 
        let entry_id = self.take_id();
 
        let port_info = comp_ctx.get_port_mut(port_id);
 
        debug_assert_eq!(port_info.state, PortState::Open); // prevent unforeseen issues
 
        port_info.state = PortState::Blocked;
 

	
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 0,
 
            content: ControlContent::BlockedPort(ContentBlockedPort{
 
                blocked_port: port_id,
 
            }),
 
        });
 

	
 
        return (
 
            port_info.peer_comp_id,
 
            ControlMessage{
 
                id: entry_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(port_info.peer_id),
 
                content: ControlMessageContent::BlockPort(port_info.peer_id),
 
            }
 
        );
 
    }
 

	
 
    pub(crate) fn mark_port_unblocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
 
        // Find the entry that contains the blocking entry for the port
 
        let mut entry_index = usize::MAX;
 
        let mut entry_id = ControlId::MAX;
 
        for (index, entry) in self.entries.iter().enumerate() {
 
            if let ControlContent::BlockedPort(block_entry) = &entry.content {
 
                if block_entry.blocked_port == port_id {
 
                    entry_index = index;
 
                    entry_id = entry.id;
 
                    break;
 
                }
 
            }
 
        }
 

	
 
        let port_info = comp_ctx.get_port_mut(port_id);
 
        debug_assert_eq!(port_info.state, PortState::Blocked);
 
        port_info.state = PortState::Open;
 

	
 
        return (
 
            port_info.peer_comp_id,
 
            ControlMessage{
 
                id: entry_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(port_info.peer_id),
 
                content: ControlMessageContent::UnblockPort(port_info.peer_id),
 
            }
 
        )
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Internal utilities
 
    // -------------------------------------------------------------------------
 

	
 
    fn take_id(&mut self) -> ControlId {
 
        let id = self.id_counter;
 
        self.id_counter.0 = self.id_counter.0.wrapping_add(1);
 
        return id;
 
    }
 

	
 
    fn get_entry_index(&self, entry_id: ControlId) -> Option<usize> {
 
        for (index, entry) in self.entries.iter().enumerate() {
 
            if entry.id == entry_id {
 
                return Some(index);
 
            }
 
        }
 

	
 
        return None;
 
    }
 
}
 

	
 
impl Default for ControlLayer {
 
    fn default() -> Self {
 
        return ControlLayer{
 
            id_counter: ControlId(0),
 
            entries: Vec::new(),
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/mod.rs
Show inline comments
 
mod component_pdl;
 
mod control_layer;
 
mod consensus;
 

	
 
pub(crate) use component_pdl::{CompPDL, CompCtx, CompScheduling};
 
pub(crate) use control_layer::{ControlId};
 
\ No newline at end of file
 
pub(crate) use control_layer::{ControlId};
 

	
 
use super::scheduler::*;
 
use super::runtime::*;
 

	
 
/// If the component is sleeping, then that flag will be atomically set to
 
/// false. If we're the ones that made that happen then we add it to the work
 
/// queue.
 
pub(crate) fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, handle: &CompHandle) {
 
    use std::sync::atomic::Ordering;
 

	
 
    let should_wake_up = handle.sleeping
 
        .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
 
        .is_ok();
 

	
 
    if should_wake_up {
 
        let comp_key = unsafe{ comp_id.upgrade() };
 
        sched_ctx.runtime.enqueue_work(comp_key);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/runtime.rs
Show inline comments
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
 
use std::collections::VecDeque;
 

	
 
use crate::protocol::*;
 

	
 
use super::communication::Message;
 
use super::component::{CompCtx, CompPDL};
 
use super::store::{ComponentStore, QueueDynMpsc, QueueDynProducer};
 

	
 
// -----------------------------------------------------------------------------
 
// Component
 
// -----------------------------------------------------------------------------
 

	
 
/// Key to a component. Type system somewhat ensures that there can only be one
 
/// of these. Only with a key one may retrieve privately-accessible memory for
 
/// a component. Practically just a generational index, like `CompId` is.
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) struct CompKey(u32);
 
pub(crate) struct CompKey(pub u32);
 

	
 
impl CompKey {
 
    pub(crate) fn downgrade(&self) -> CompId {
 
        return CompId(self.0);
 
    }
 
}
 

	
 
/// Generational ID of a component
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub struct CompId(u32);
 
pub struct CompId(pub u32);
 

	
 
impl CompId {
 
    pub(crate) fn new_invalid() -> CompId {
 
        return CompId(u32::MAX);
 
    }
 

	
 
    /// Upgrade component ID to component key. Unsafe because the caller needs
 
    /// to make sure that only one component key can exist at a time (to ensure
 
    /// a component can only be scheduled/executed by one thread).
 
    pub(crate) unsafe fn upgrade(&self) -> CompKey {
 
        return CompKey(self.0);
 
    }
 
}
 

	
 
/// Private fields of a component, may only be modified by a single thread at
 
/// a time.
 
pub(crate) struct RuntimeComp {
 
    pub public: CompPublic,
 
    pub code: CompPDL,
 
    pub ctx: CompCtx,
 
    pub inbox: QueueDynMpsc<Message>
 
}
 

	
 
/// Should contain everything that is accessible in a thread-safe manner
 
// TODO: Do something about the `num_handles` thing. This needs to be a bit more
 
//  "foolproof" to lighten the mental burden of using the `num_handles`
 
//  variable.
 
pub(crate) struct CompPublic {
 
    pub sleeping: AtomicBool,
 
    pub num_handles: AtomicU32, // manually modified (!)
 
    pub inbox: QueueDynProducer<Message>,
 
}
 

	
 
/// Handle to public part of a component. Would be nice if we could
 
/// automagically manage the `num_handles` counter. But when it reaches zero we
 
/// need to manually remove the handle from the runtime. So we just have debug
 
/// code to make sure this actually happens.
 
pub(crate) struct CompHandle {
 
    target: *const CompPublic,
 
    #[cfg(debug_assertions)] decremented: bool,
 
}
 

	
 
impl CompHandle {
 
    fn new(public: &CompPublic) -> CompHandle {
 
        let handle = CompHandle{
 
            target: public,
 
            #[cfg(debug_assertions)] decremented: false,
 
        };
 
        handle.increment_users();
 
        return handle;
 
    }
 

	
 
    fn increment_users(&self) {
 
        let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel);
 
        debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed
 
    }
 

	
 
    /// Returns true if the component should be destroyed
 
    pub(crate) fn decrement_users(&mut self) -> bool {
 
        debug_assert!(!self.decremented, "illegal to 'decrement_users' twice");
 
        dbg_code!(self.decremented = true);
 
        let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel);
 
        return old_count == 1;
 
    }
 
}
 

	
 
impl Clone for CompHandle {
 
    fn clone(&self) -> Self {
 
        debug_assert!(!self.decremented, "illegal to clone after 'decrement_users'");
 
        self.increment_users();
 
        return CompHandle{
 
            target: self.target,
 
            #[cfg(debug_assertions)] decremented: false,
 
        };
 
    }
 
}
 

	
 
impl std::ops::Deref for CompHandle {
 
    type Target = CompPublic;
 

	
 
    fn deref(&self) -> &Self::Target {
 
        return unsafe{ &*self.target };
 
    }
 
}
 

	
 
impl Drop for CompHandle {
 
    fn drop(&mut self) {
 
        debug_assert!(self.decremented, "need call to 'decrement_users' before dropping");
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Runtime
 
// -----------------------------------------------------------------------------
 

	
 
pub type RuntimeHandle = Arc<Runtime>;
 

	
 
/// Memory that is maintained by "the runtime". In practice it is maintained by
 
/// multiple schedulers, and this serves as the common interface to that memory.
 
pub struct Runtime {
 
    pub protocol: ProtocolDescription,
 
    components: ComponentStore<RuntimeComp>,
 
    work_queue: Mutex<VecDeque<CompKey>>,
 
    work_condvar: Condvar,
 
    active_elements: AtomicU32, // active components and APIs (i.e. component creators)
 
}
 

	
 
impl Runtime {
 
    pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
 
        assert!(num_threads > 0, "need a thread to perform work");
 
        return Runtime{
 
            protocol: protocol_description,
 
            components: ComponentStore::new(128),
 
            work_queue: Mutex::new(VecDeque::with_capacity(128)),
 
            work_condvar: Condvar::new(),
 
            active_elements: AtomicU32::new(0),
 
        };
 
    }
 

	
 
    // Scheduling and retrieving work
 

	
 
    pub(crate) fn take_work(&self) -> Option<CompKey> {
 
        let mut lock = self.work_queue.lock().unwrap();
 
        while lock.is_empty() && self.active_elements.load(Ordering::Acquire) != 0 {
 
            lock = self.work_condvar.wait(lock).unwrap();
 
        }
 

	
 
        return lock.pop_front();
 
    }
 

	
 
    pub(crate) fn enqueue_work(&self, key: CompKey) {
 
        let mut lock = self.work_queue.lock().unwrap();
 
        lock.push_back(key);
 
        self.work_condvar.notify_one();
 
    }
 

	
 
    // Creating/destroying components
 

	
 
    /// Creates a new component. Note that the public part will be properly
 
    /// initialized, but the private fields (e.g. owned ports, peers, etc.)
 
    /// are not.
 
    pub(crate) fn create_pdl_component(&self, comp: CompPDL, initially_sleeping: bool) -> (CompKey, &mut RuntimeComp) {
 
        let inbox_queue = QueueDynMpsc::new(16);
 
        let inbox_producer = inbox_queue.producer();
 
        let comp = RuntimeComp{
 
            public: CompPublic{
 
                sleeping: AtomicBool::new(initially_sleeping),
 
                num_handles: AtomicU32::new(1), // the component itself acts like a handle
 
                inbox: inbox_producer,
 
            },
 
            code: comp,
 
            ctx: CompCtx::default(),
 
            inbox: inbox_queue,
 
        };
 

	
 
        let index = self.components.create(comp);
 

	
 
        // TODO: just do a reserve_index followed by a consume_index or something
 
        let component = self.components.get_mut(index);
 
        component.ctx.id = CompId(index);
 

	
 
        return (CompKey(index), component);
 
    }
 

	
 
    pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp {
 
        let component = self.components.get_mut(key.0);
 
        return component;
 
    }
 

	
 
    pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle {
 
        let component = self.components.get(id.0);
 
        return CompHandle::new(&component.public);
 
    }
 

	
 
    pub(crate) fn destroy_component(&self, key: CompKey) {
 
        debug_assert_eq!(self.get_component(key).public.num_handles.load(Ordering::Acquire), 0);
 
        self.components.destroy(key.0);
 
    }
 

	
 
    // Interacting with components
 
}
src/runtime2/scheduler.rs
Show inline comments
 
use std::sync::atomic::Ordering;
 

	
 
use super::component::*;
 
use super::runtime::*;
 
use super::communication::*;
 

	
 
/// Data associated with a scheduler thread
 
pub(crate) struct Scheduler {
 
    runtime: RuntimeHandle,
 
    scheduler_id: u32,
 
}
 

	
 
pub(crate) struct SchedulerCtx<'a> {
 
    pub runtime: &'a Runtime,
 
}
 

	
 
impl<'a> SchedulerCtx<'a> {
 
    pub fn new(runtime: &'a Runtime) -> Self {
 
        return Self {
 
            runtime,
 
        }
 
    }
 
}
 

	
 
impl Scheduler {
 
    // public interface to thread
 

	
 
    pub fn new(runtime: RuntimeHandle, scheduler_id: u32) -> Self {
 
        return Scheduler{ runtime, scheduler_id }
 
    }
 

	
 
    pub fn run(&mut self) {
 
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime);
 

	
 
        'run_loop: loop {
 
            // Wait until we have something to do (or need to quit)
 
            let comp_key = self.runtime.take_work();
 
            if comp_key.is_none() {
 
                break 'run_loop;
 
            }
 

	
 
            let comp_key = comp_key.unwrap();
 
            let comp_id = comp_key.downgrade();
 
            let component = self.runtime.get_component(comp_key);
 

	
 
            // Run the component until it no longer indicates that it needs to
 
            // be re-executed immediately.
 
            let mut new_scheduling = CompScheduling::Immediate;
 
            while let CompScheduling::Immediate = new_scheduling {
 
                new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.private.ctx).expect("TODO: Handle error");
 
            }
 

	
 
            // Handle the new scheduling
 
            match new_scheduling {
 
                CompScheduling::Immediate => unreachable!(),
 
                CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); },
 
                CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); },
 
                CompScheduling::Exit => { self.mark_component_as_exiting(comp_key, component); }
 
                CompScheduling::Exit => { self.mark_component_as_exiting(&scheduler_ctx, component); }
 
            }
 
        }
 
    }
 

	
 
    // local utilities
 

	
 
    fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
 
        debug_assert_eq!(key.downgrade(), component.private.ctx.id); // make sure component matches key
 
        debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping
 

	
 
        component.public.sleeping.store(true, Ordering::Release);
 
        todo!("check for messages");
 
        if component.inbox.can_pop() {
 
            let should_reschedule = component.public.sleeping
 
                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
 
                .is_ok();
 

	
 
            if should_reschedule {
 
                self.runtime.enqueue_work(key);
 
            }
 
        }
 
    }
 

	
 
    fn mark_component_as_exiting(&self, key: CompKey, component: &mut RuntimeComp) {
 
        todo!("do something")
 
    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
 
        for port_index in 0..component.ctx.ports.len() {
 
            let port_info = &component.ctx.ports[port_index];
 
            if let Some((peer_id, message)) = component.code.control.mark_port_closed(port_info.id, comp_ctx) {
 
                let peer_info = component.ctx.get_peer(peer_id);
 
                peer_info.handle.inbox.push(Message::Control(message));
 

	
 
                wake_up_if_sleeping(sched_ctx, peer_id, &peer_info.handle);
 
            }
 
        }
 

	
 
        let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
 
        let new_count = old_count - 1;
 
        if new_count == 0 {
 
            let comp_key = unsafe{ component.ctx.id.upgrade() };
 
            sched_ctx.runtime.destroy_component(comp_key);
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/store/queue_mpsc.rs
Show inline comments
 
use std::sync::atomic::{AtomicU32, Ordering};
 

	
 
use crate::collections::RawArray;
 
use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};
 

	
 
/// Multiple-producer single-consumer queue. Generally used in the publicly
 
/// accessible fields of a component. The holder of this struct should be the
 
/// consumer. To retrieve access to the producer-side: call `producer()`.
 
///
 
/// This is a queue that will resize (indefinitely) if it becomes full, and will
 
/// not shrink. So probably a temporary thing.
 
///
 
/// In debug mode we'll make sure that there are no producers when the queue is
 
/// dropped. We don't do this in release mode because the runtime is written
 
/// such that components always remain alive (hence, this queue will remain
 
/// accessible) while there are references to it.
 
// NOTE: Addendum to the above remark, not true if the thread owning the
 
// consumer sides crashes, unwinds, and drops the `Box` with it. Question is: do
 
// I want to take that into account?
 
pub struct QueueDynMpsc<T> {
 
    // Entire contents are boxed up such that we can create producers that have
 
    // a pointer to the contents.
 
    inner: Box<Shared<T>>
 
}
 

	
 
// One may move around the queue between threads, as long as there is only one
 
// instance of it.
 
unsafe impl<T> Send for QueueDynMpsc<T>{}
 

	
 
/// Shared data between queue consumer and the queue producers
 
struct Shared<T> {
 
    data: UnfairSeLock<Inner<T>>,
 
    read_head: AtomicU32,
 
    write_head: AtomicU32,
 
    limit_head: AtomicU32,
 
    #[cfg(debug_assertions)] dbg: AtomicU32,
 
}
 

	
 
/// Locked by an exclusive/shared lock. Exclusive lock is obtained when the
 
/// inner data array is resized.
 
struct Inner<T> {
 
    data: RawArray<T>,
 
    compare_mask: u32,
 
    read_mask: u32,
 
}
 

	
 
type InnerRead<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;
 

	
 
impl<T> QueueDynMpsc<T> {
 
    /// Constructs a new MPSC queue. Note that the initial capacity is always
 
    /// increased to the next power of 2 (if it isn't already).
 
    pub fn new(initial_capacity: usize) -> Self {
 
        let initial_capacity = initial_capacity.next_power_of_two();
 
        assert_correct_capacity(initial_capacity);
 

	
 
        let mut data = RawArray::new();
 
        data.resize(initial_capacity);
 

	
 
        let initial_capacity = initial_capacity as u32;
 

	
 
        return Self{
 
            inner: Box::new(Shared {
 
                data: UnfairSeLock::new(Inner{
 
                    data,
 
                    compare_mask: (2 * initial_capacity) - 1,
 
                    read_mask: initial_capacity - 1,
 
                }),
 
                read_head: AtomicU32::new(0),
 
                write_head: AtomicU32::new(initial_capacity),
 
                limit_head: AtomicU32::new(initial_capacity),
 
                #[cfg(debug_assertions)] dbg: AtomicU32::new(0),
 
            }),
 
        };
 
    }
 

	
 
    #[inline]
 
    pub fn producer(&self) -> QueueDynProducer<T> {
 
        return QueueDynProducer::new(self);
 
    }
 

	
 
    /// Return `true` if a subsequent call to `pop` will return a value. Note
 
    /// that if it returns `false`, there *might* also be a value returned by
 
    /// `pop`.
 
    pub fn can_pop(&mut self) -> bool {
 
        let data_lock = self.inner.data.lock_shared();
 
        let cur_read = self.inner.read_head.load(Ordering::Acquire);
 
        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
 
        let buf_size = data_lock.data.cap() as u32;
 
        return (cur_read + buf_size) & data_lock.compare_mask != cur_limit;
 
    }
 

	
 
    /// Perform an attempted read from the queue. It might be that some producer
 
    /// is putting something in the queue while this function is executing, and
 
    /// we don't get the consume it.
 
    pub fn pop(&mut self) -> Option<T> {
 
        let data_lock = self.inner.data.lock_shared();
 
        let cur_read = self.inner.read_head.load(Ordering::Acquire);
 
        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
 
        let buf_size = data_lock.data.cap() as u32;
 

	
 
        if (cur_read + buf_size) & data_lock.compare_mask != cur_limit {
 
            // Make a bitwise copy of the value and return it. The receiver is
 
            // responsible for dropping it.
 
            unsafe {
 
                let source = data_lock.data.get((cur_read & data_lock.read_mask) as usize);
 
                let value = std::ptr::read(source);
 
                // We can perform a store since we're the only ones modifying
 
                // the atomic.
 
                self.inner.read_head.store((cur_read + 1) & data_lock.compare_mask, Ordering::Release);
 
                return Some(value);
 
            }
 
        } else {
 
            return None;
 
        }
 
    }
 
}
 

	
 
impl<T> Drop for QueueDynMpsc<T> {
 
    fn drop(&mut self) {
 
        // There should be no more `QueueDynProducer` pointers to this queue
 
        dbg_code!(assert_eq!(self.inner.dbg.load(Ordering::Acquire), 0));
 
        // And so the limit head should be equal to the write head
 
        let data_lock = self.inner.data.lock_shared();
 
        let write_index = self.inner.write_head.load(Ordering::Acquire);
 
        assert_eq!(self.inner.limit_head.load(Ordering::Acquire), write_index);
 

	
 
        // Every item that has not yet been taken out of the queue needs to
 
        // have its destructor called. We immediately apply the
 
        // increment-by-size trick and wait until we've hit the write head.
 
        let mut read_index = self.inner.read_head.load(Ordering::Acquire);
 
        read_index += data_lock.data.cap() as u32;
 
        while read_index & data_lock.compare_mask != write_index {
 
            unsafe {
 
                let target = data_lock.data.get((read_index & data_lock.read_mask) as usize);
 
                std::ptr::drop_in_place(target);
 
            }
 
            read_index += 1;
 
        }
 
    }
 
}
 

	
 
pub struct QueueDynProducer<T> {
 
    queue: *const Shared<T>,
 
}
 

	
 
impl<T> QueueDynProducer<T> {
 
    fn new(consumer: &QueueDynMpsc<T>) -> Self {
 
        dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel));
 
        unsafe {
 
            // If you only knew the power of the dark side! Obi-Wan never told
 
            // you what happened to your father!
 
            let queue: *const _ = std::mem::transmute(consumer.inner.as_ref());
 
            return Self{ queue };
 
        }
 
    }
 

	
 
    pub fn push(&self, value: T) {
 
        let queue = unsafe{ &*self.queue };
 

	
 
        let mut data_lock = queue.data.lock_shared();
 
        let mut write_index = queue.write_head.load(Ordering::Acquire);
 

	
 
        'attempt_write: loop {
 
            let read_index = queue.read_head.load(Ordering::Acquire);
 

	
 
            if write_index == read_index { // both stored as [0, 2*capacity), so we can check equality without bitwise ANDing
 
                // Need to resize, try loading read/write index afterwards
 
                let expected_capacity = data_lock.data.cap();
 
                data_lock = self.resize(data_lock, expected_capacity);
 
                write_index = queue.write_head.load(Ordering::Acquire);
 
                continue 'attempt_write;
 
            }
 

	
 
            // If here try to advance write index
 
            let new_write_index = (write_index + 1) & data_lock.compare_mask;
 
            if let Err(actual_write_index) = queue.write_head.compare_exchange(
 
                write_index, new_write_index, Ordering::AcqRel, Ordering::Acquire
 
            ) {
 
                write_index = actual_write_index;
 
                continue 'attempt_write;
 
            }
 

	
 
            // We're now allowed to write at `write_index`
 
            unsafe {
 
                std::ptr::write(data_lock.data.get((write_index & data_lock.read_mask) as usize), value);
 
            }
 

	
 
            // Update limit head to let reader obtain the written value in a
 
            // CAS-loop
 
            while let Err(_) = queue.limit_head.compare_exchange_weak(
 
                write_index, new_write_index,
 
                Ordering::AcqRel, Ordering::Relaxed
 
            ) {}
 

	
 
            return;
 
        }
 
    }
 

	
 
    fn resize(&self, shared_lock: InnerRead<T>, expected_capacity: usize) -> InnerRead<T> {
 
        drop(shared_lock);
 
        let queue = unsafe{ &*self.queue };
 

	
 
        {
 
            let mut exclusive_lock = queue.data.lock_exclusive();
 

	
 
            // We hold the exclusive lock, but someone else might have done the resizing, and so:
 
            if exclusive_lock.data.cap() == expected_capacity {
 
                let old_capacity = expected_capacity;
 
                let new_capacity = 2 * old_capacity;
 
                assert_correct_capacity(new_capacity);
 

	
 
                // Resize by a factor of two, and make the two halves identical.
 
                exclusive_lock.data.resize(new_capacity);
 
                for idx in old_capacity..new_capacity {
 
                    unsafe {
 
                        let target = exclusive_lock.data.get(idx);
 
                        let source = exclusive_lock.data.get(idx - old_capacity);
 
                        std::ptr::write(target, std::ptr::read(source));
 
                    }
 
                }
 

	
 
                // Modify all atomics to reflect that we just resized the
 
                // underlying buffer. We have that everything between the read
 
                // index and the write index is readable. And the following
 
                // preserves that property, while increasing the size from
 
                // `old_capacity` to `new_capacity`.
 
                // Note that the addition of `new_capacity` to `write_head` is
 
                // to ensure the ringbuffer can distinguish the cases where the
 
                // ringbuffer is full, and when it is empty.
 
                let mut read_index = queue.read_head.load(Ordering::Acquire);
 
                let mut write_index = queue.write_head.load(Ordering::Acquire);
 
                debug_assert_eq!(write_index, queue.limit_head.load(Ordering::Acquire)); // since we have exclusive access
 

	
 
                let is_full = read_index == write_index; // before bitwise AND-mask
 
                read_index &= exclusive_lock.read_mask;
 
                write_index &= exclusive_lock.read_mask;
 

	
 
                let new_capacity = new_capacity as u32;
 
                if read_index <= write_index && !is_full { // which means: (read index < write_index) || buffer_is_empty
 
                    // The readable elements do not wrap around the ringbuffer
 
                    write_index += new_capacity;
 
                } else {
 
                    // The readable elements do wrap around the ringbuffer
 
                    write_index += old_capacity as u32;
 
                    write_index += new_capacity;
 
                }
 

	
 
                queue.read_head.store(read_index, Ordering::Release);
 
                queue.limit_head.store(write_index, Ordering::Release);
 
                queue.write_head.store(write_index, Ordering::Release);
 

	
 
                // Update the masks
 
                exclusive_lock.read_mask = new_capacity - 1;
 
                exclusive_lock.compare_mask = (2 * new_capacity) - 1;
 
            }
 
        }
 

	
 
        // Reacquire shared lock
 
        return queue.data.lock_shared();
 
    }
 
}
 

	
 
impl<T> Drop for QueueDynProducer<T> {
 
    fn drop(&mut self) {
 
        dbg_code!(unsafe{ (*self.queue).dbg.fetch_sub(1, Ordering::AcqRel) });
 
    }
 
}
 

	
 
// producer end is `Send`, because in debug mode we make sure that there are no
 
// more producers when the queue is destroyed. But is not sync, because that
 
// would circumvent our atomic counter shenanigans. Although, now that I think
 
// about it, we're rather likely to just drop a single "producer" into the
 
// public part of a component.
 
unsafe impl<T> Send for QueueDynProducer<T>{}
 

	
 
#[inline]
 
fn assert_correct_capacity(capacity: usize) {
 
    assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
 
}
 

	
 
#[cfg(test)]
 
mod tests {
 
    use super::*;
0 comments (0 inline, 0 general)