Changeset - c04f7fea1a62
mh <contact@maxhenger.nl> - 2022-01-16 13:40:10
WIP: Fixing compile errors
7 files changed with 54 insertions and 52 deletions:
src/runtime2/communication.rs

use crate::protocol::eval::*;
use super::runtime::*;
use super::component::*;

-#[derive(Copy, Clone)]
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PortId(pub u32);

impl PortId {
    /// This value is not significant; it is chosen to make debugging easier: a
    /// very large port number is more likely to shine a light on bugs.
    pub fn new_invalid() -> Self {
        return Self(u32::MAX);
    }
}

pub struct Peer {
@@ -55,25 +55,25 @@ pub struct DataMessage {
pub struct MessageSyncHeader {
    pub sync_round: u32,
}

pub struct MessageDataHeader {
    pub expected_mapping: Vec<(PortId, u32)>,
    pub new_mapping: u32,
    pub source_port: PortId,
    pub target_port: PortId,
}

pub struct ControlMessage {
-    pub id: ControlId,
+    pub(crate) id: ControlId,
    pub sender_comp_id: CompId,
    pub target_port_id: Option<PortId>,
    pub content: ControlMessageContent,
}

#[derive(Copy, Clone)]
pub enum ControlMessageContent {
    Ack,
    BlockPort(PortId),
    UnblockPort(PortId),
    ClosePort(PortId),
    PortPeerChangedBlock(PortId),
src/runtime2/component/component_pdl.rs

use crate::protocol::*;
use crate::protocol::eval::{
    PortId as EvalPortId, Prompt,
    ValueGroup, Value,
    EvalContinuation, EvalResult, EvalError
};

use crate::runtime2::store::QueueDynMpsc;
use crate::runtime2::runtime::*;
use crate::runtime2::scheduler::SchedulerCtx;
use crate::runtime2::communication::*;

use super::*;
use super::control_layer::*;
use super::consensus::Consensus;

pub enum CompScheduling {
    Immediate,
    Requeue,
    Sleep,
@@ -200,81 +199,83 @@ pub(crate) struct CompPDL {
    pub consensus: Consensus,
    pub sync_counter: u32,
    pub exec_ctx: ExecCtx,
    // TODO: Temporary field, simulates future plans of having one storage place
    //  reserved per port.
    // Should be the same length as the number of ports; corresponding indices
    // imply the message is intended for that port.
    pub inbox_main: Vec<Option<DataMessage>>,
    pub inbox_backup: Vec<DataMessage>,
}

impl CompPDL {
-    pub(crate) fn new(initial_state: Prompt) -> Self {
+    pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self {
+        let mut inbox_main = Vec::new();
+        inbox_main.reserve(num_ports);
+        for _ in 0..num_ports {
+            inbox_main.push(None);
+        }

        return Self{
            mode: Mode::NonSync,
            mode_port: PortId::new_invalid(),
            mode_value: ValueGroup::default(),
            prompt: initial_state,
            control: ControlLayer::default(),
            consensus: Consensus::new(),
            sync_counter: 0,
            exec_ctx: ExecCtx{
                stmt: ExecStmt::None,
            },
-            inbox_main: Vec::new(),
+            inbox_main,
            inbox_backup: Vec::new(),
        }
    }

    pub(crate) fn handle_setup(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
        self.inbox.resize(comp_ctx.ports.len(), None);
    }

    pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
        if let Some(new_target) = self.control.should_reroute(&message) {
            let target = sched_ctx.runtime.get_component_public(new_target);
            target.inbox.push(message);

            return;
        }

        match message {
            Message::Data(message) => {
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
            },
            Message::Control(message) => {
-                self.handle_incoming_control_message(sched_ctx, comp_Ctx, message);
+                self.handle_incoming_control_message(sched_ctx, comp_ctx, message);
            },
        }
    }

    pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
        use EvalContinuation as EC;

        let run_result = self.execute_prompt(&sched_ctx)?;

        match run_result {
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
            // Results that can be returned in sync mode
            EC::SyncBlockEnd => {
                debug_assert_eq!(self.mode, Mode::Sync);
                self.handle_sync_end(sched_ctx, comp_ctx);
                return Ok(CompScheduling::Immediate);
            },
            EC::BlockGet(port_id) => {
                debug_assert_eq!(self.mode, Mode::Sync);

-                let port_id = transform_port_id(port_id);
+                let port_id = port_id_from_eval(port_id);
                if let Some(message) = comp_ctx.take_message(port_id) {
                    // We can immediately receive and continue
                    debug_assert!(self.exec_ctx.stmt.is_none());
                    self.exec_ctx.stmt = ExecStmt::PerformedGet(message);
                    return Ok(CompScheduling::Immediate);
                } else {
                    // We need to wait
                    self.mode = Mode::BlockedGet;
                    self.mode_port = port_id;
                    return Ok(CompScheduling::Sleep);
                }
            },
@@ -301,25 +302,25 @@ impl CompPDL {
                return Ok(CompScheduling::Immediate);
            },
            EC::NewComponent(definition_id, monomorph_idx, arguments) => {
                debug_assert_eq!(self.mode, Mode::NonSync);

                let mut ports = Vec::new(); // TODO: Optimize
                let protocol = &sched_ctx.runtime.protocol;
                find_ports_in_value_group(&arguments, &mut ports);
                let prompt = Prompt::new(
                    &protocol.types, &protocol.heap,
                    definition_id, monomorph_idx, arguments
                );
-                self.create_component_and_transfer_ports(sched_ctx, comp_ctx, prompt, &workspace_ports);
+                self.create_component_and_transfer_ports(sched_ctx, comp_ctx, prompt, &ports);
                return Ok(CompScheduling::Requeue);
            },
            EC::NewChannel => {
                debug_assert_eq!(self.mode, Mode::NonSync);
                debug_assert!(self.exec_ctx.stmt.is_none());
                let channel = comp_ctx.create_channel();
                self.exec_ctx.stmt = ExecStmt::CreatedChannel((
                    Value::Output(port_id_to_eval(channel.putter_id)),
                    Value::Input(port_id_to_eval(channel.getter_id))
                ));
                return Ok(CompScheduling::Immediate);
            }
@@ -370,41 +371,42 @@ impl CompPDL {

    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
        // Check if we can insert it directly into the storage associated with
        // the port
        let target_port_id = message.data_header.target_port;
        let port_index = comp_ctx.get_port_index(target_port_id).unwrap();
        if self.inbox_main[port_index].is_none() {
            self.inbox_main[port_index] = Some(message);

            // After direct insertion, check if this component's execution is
            // blocked on receiving a message on that port
            debug_assert_ne!(comp_ctx.ports[port_index].state, PortState::Blocked); // because we could insert directly
-            if self.mode == Mode::BlockedGet && self.mode_port == message.data_header.target_port {
+            if self.mode == Mode::BlockedGet && self.mode_port == target_port_id {
                // We were indeed blocked
                self.mode = Mode::Sync;
                self.mode_port = PortId::new_invalid();
            }

            return;
        }

        // The direct inbox is full, so the port will become (or was already) blocked
        let port_info = &mut comp_ctx.ports[port_index];
        debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
+        let _peer_comp_id = port_info.peer_comp_id;

        if port_info.state == PortState::Open {
            let (target_comp_id, block_message) =
                self.control.mark_port_blocked(target_port_id, comp_ctx);
-            debug_assert_eq!(port_info.peer_comp_id, target_comp_id);
+            debug_assert_eq!(_peer_comp_id, target_comp_id);

            let peer = comp_ctx.get_peer(target_comp_id);
            peer.handle.inbox.push(Message::Control(block_message));
            wake_up_if_sleeping(sched_ctx, target_comp_id, &peer.handle);
        }

        // But we still need to remember the message, so:
        self.inbox_backup.push(message);
    }

    fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) {
        match message.content {
@@ -438,27 +440,28 @@ impl CompPDL {
                let port_info = comp_ctx.get_port_mut(port_id);
                debug_assert_eq!(port_info.kind, PortKind::Putter);
                if port_info.state != PortState::Closed {
                    debug_assert_ne!(port_info.state, PortState::Blocked); // implies unnecessary messages
                    port_info.state = PortState::Blocked;
                }
            },
            ControlMessageContent::ClosePort(port_id) => {
                // Request to close the port. We immediately comply and remove
                // the component handle as well
                let port_index = comp_ctx.get_port_index(port_id).unwrap();
                let port_info = &mut comp_ctx.ports[port_index];
+                let peer_comp_id = port_info.peer_comp_id;
                port_info.state = PortState::Closed;

-                let peer_index = comp_ctx.get_peer_index(port_info.peer_comp_id).unwrap();
+                let peer_index = comp_ctx.get_peer_index(peer_comp_id).unwrap();
                let peer_info = &mut comp_ctx.peers[peer_index];
                peer_info.num_associated_ports -= 1;
                if peer_info.num_associated_ports == 0 {
                    // TODO: @Refactor clean up all these uses of "num_associated_ports"
                    let should_remove = peer_info.handle.decrement_users();
                    if should_remove {
                        let comp_key = unsafe{ peer_info.id.upgrade() };
                        sched_ctx.runtime.destroy_component(comp_key);
                    }

                    comp_ctx.peers.remove(peer_index);
                }
@@ -468,33 +471,33 @@ impl CompPDL {
                let port_info = comp_ctx.get_port(port_id);
                debug_assert_eq!(port_info.kind, PortKind::Putter);
                debug_assert!(port_info.state == PortState::Blocked || port_info.state == PortState::Closed);
                if port_info.state == PortState::Blocked {
                    self.unblock_port(sched_ctx, comp_ctx, port_id);
                }
            },
            ControlMessageContent::PortPeerChangedBlock(port_id) => {
                // The peer of our port has just changed. So we are asked to
                // temporarily block the port (while our original recipient is
                // potentially rerouting some of the in-flight messages) and
                // Ack. Then we wait for the `unblock` call.
-                debug_assert_eq!(message.target_port_id, port_id);
+                debug_assert_eq!(message.target_port_id, Some(port_id));
                let port_info = comp_ctx.get_port_mut(port_id);
                debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
                if port_info.state == PortState::Open {
                    port_info.state = PortState::Blocked;
                }
            },
            ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => {
-                debug_assert_eq!(message.target_port_id, port_id);
+                debug_assert_eq!(message.target_port_id, Some(port_id));
                let port_info = comp_ctx.get_port_mut(port_id);
                debug_assert!(port_info.state == PortState::Blocked);
                port_info.peer_comp_id = new_comp_id;
                self.unblock_port(sched_ctx, comp_ctx, port_id);
            }
        }
    }

    fn unblock_port(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) {
        let port_info = comp_ctx.get_port_mut(port_id);
        debug_assert_eq!(port_info.state, PortState::Blocked);
        port_info.state = PortState::Open;
@@ -503,53 +506,54 @@ impl CompPDL {
            // We were blocked on the port that just became unblocked, so
            // send the message.
            let mut replacement = ValueGroup::default();
            std::mem::swap(&mut replacement, &mut self.mode_value);
            self.send_message_and_wake_up(sched_ctx, comp_ctx, port_id, replacement);

            self.mode = Mode::Sync;
            self.mode_port = PortId::new_invalid();
        }
    }

    fn create_component_and_transfer_ports(&mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, prompt: Prompt, ports: &[PortId]) {
-        let component = CompPDL::new(prompt);
+        let component = CompPDL::new(prompt, ports.len());
        let (comp_key, component) = sched_ctx.runtime.create_pdl_component(component, true);
        let created_ctx = &mut component.ctx;

        let mut has_reroute_entry = false;
        let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);

        for port_id in ports.iter().copied() {
            // Create temporary reroute entry if the peer is another component
            let port_info = creator_ctx.get_port(port_id);
            debug_assert_ne!(port_info.state, PortState::Blocked);
            if port_info.peer_comp_id == creator_ctx.id {
                // We own the peer port. So retrieve it and modify the peer directly
-                let port_info = creator_ctx.get_port_mut(port_info.peer_id);
+                let peer_port_id = port_info.peer_id;
+                let port_info = creator_ctx.get_port_mut(peer_port_id);
                port_info.peer_comp_id = created_ctx.id;
            } else {
                // We don't own the port, so send the appropriate messages and
                // notify the control layer
                has_reroute_entry = true;
                let message = self.control.add_reroute_entry(
                    creator_ctx.id, port_info.peer_id, port_info.peer_comp_id,
                    port_info.self_id, created_ctx.id, schedule_entry_id
                );
                let peer_info = creator_ctx.get_peer(port_info.peer_comp_id);
                peer_info.handle.inbox.push(message);
            }

            // Transfer port and create temporary reroute entry
-            let (mut port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id);
+            let (port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id);
            if port_info.state == PortState::Blocked {
                todo!("Think about this when you're not tired!");
            }
            Self::add_port_to_component(sched_ctx, created_ctx, port_info);

            // Maybe remove peer from the creator
            if let Some(mut peer_info) = peer_info {
                let remove_from_runtime = peer_info.handle.decrement_users();
                if remove_from_runtime {
                    let removed_comp_key = unsafe{ peer_info.id.upgrade() };
                    sched_ctx.runtime.destroy_component(removed_comp_key);
                }
@@ -559,26 +563,24 @@ impl CompPDL {
        if !has_reroute_entry {
            // We can schedule the component immediately
            self.control.remove_schedule_entry(schedule_entry_id);
            sched_ctx.runtime.enqueue_work(comp_key);
        } // else: wait for the `Ack`s, they will trigger the scheduling of the component
    }

    /// Removes a port from a component. Also decrements the port counter in
    /// the peer component's entry. If that hits 0 then it will be removed and
    /// returned. If returned then the caller is responsible for decrementing
    /// the atomic counters of the peer component's handle.
    fn remove_port_from_component(comp_ctx: &mut CompCtx, port_id: PortId) -> (Port, Option<Peer>) {
        use std::sync::atomic::Ordering;

        let port_index = comp_ctx.get_port_index(port_id).unwrap();
        let port_info = comp_ctx.ports.remove(port_index);

        // If the component owns the peer, then we don't have to decrement the
        // number of peers (because we don't have an entry for ourselves)
        if port_info.peer_comp_id == comp_ctx.id {
            return (port_info, None);
        }

        let peer_index = comp_ctx.get_peer_index(port_info.peer_comp_id).unwrap();
        let peer_info = &mut comp_ctx.peers[peer_index];
        peer_info.num_associated_ports -= 1;
@@ -641,26 +643,24 @@ fn port_id_from_eval(port_id: EvalPortId) -> PortId {
    return PortId(port_id.id);
}

#[inline]
fn port_id_to_eval(port_id: PortId) -> EvalPortId {
    return EvalPortId{ id: port_id.0 };
}

/// Recursively goes through the value group, attempting to find ports.
/// Duplicates will only be added once.
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>) {
    // Helper to check a value for a port and recurse if needed.
    use crate::protocol::eval::Value;

    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortId>) {
        match value {
            Value::Input(port_id) | Value::Output(port_id) => {
                // This is an actual port
                let cur_port = PortId(port_id.id);
                for prev_port in ports.iter() {
                    if *prev_port == cur_port {
                        // Already added
                        return;
                    }
                }
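
The inbox handling above reduces to a small standalone pattern: one reserved slot per port, overflow into a backup queue, and a request for the sender to block. A minimal sketch with simplified stand-in types (`Mailbox` and `Delivery` are illustrative names, not the runtime's actual API):

// Sketch of the per-port mailbox discipline this file moves towards: a
// reserved slot per port (`inbox_main`), with overflow spilling into
// `inbox_backup` while the sending side is asked to block.
struct Mailbox<M> {
    inbox_main: Vec<Option<M>>, // index = port index, length = number of ports
    inbox_backup: Vec<M>,       // overflow, drained when a slot frees up
}

enum Delivery {
    Accepted,          // stored in the port's reserved slot
    AcceptedMustBlock, // stored in backup; the peer should be told to block
}

impl<M> Mailbox<M> {
    fn new(num_ports: usize) -> Self {
        let mut inbox_main = Vec::with_capacity(num_ports);
        for _ in 0..num_ports {
            inbox_main.push(None);
        }
        Self { inbox_main, inbox_backup: Vec::new() }
    }

    fn deliver(&mut self, port_index: usize, message: M) -> Delivery {
        if self.inbox_main[port_index].is_none() {
            self.inbox_main[port_index] = Some(message);
            Delivery::Accepted
        } else {
            self.inbox_backup.push(message);
            Delivery::AcceptedMustBlock
        }
    }

    fn take(&mut self, port_index: usize) -> Option<M> {
        self.inbox_main[port_index].take()
    }
}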
src/runtime2/component/consensus.rs

-use crate::protocol::eval::*;
+use crate::protocol::eval::ValueGroup;
use crate::runtime2::communication::*;

use super::component_pdl::*;

pub struct PortAnnotation {
    id: PortId,
    mapping: Option<u32>,
}

impl PortAnnotation {
    fn new(id: PortId) -> Self {
        return Self{ id, mapping: None }
@@ -43,25 +43,25 @@ impl Consensus {

        return DataMessage{ data_header, sync_header, content };
    }

    pub(crate) fn notify_sync_end(&mut self) {
        self.round = self.round.wrapping_add(1);
        todo!("implement sync end")
    }

    pub(crate) fn transfer_ports(&mut self, comp_ctx: &CompCtx) {
        let mut needs_setting_ports = false;
        if comp_ctx.ports.len() != self.ports.len() {
-            ports_same = true;
+            needs_setting_ports = true;
        } else {
            for idx in 0..comp_ctx.ports.len() {
                let comp_port_id = comp_ctx.ports[idx].self_id;
                let cons_port_id = self.ports[idx].id;
                if comp_port_id != cons_port_id {
                    needs_setting_ports = true;
                    break;
                }
            }
        }

        if needs_setting_ports {
src/runtime2/component/control_layer.rs

@@ -93,28 +93,28 @@ impl ControlLayer {
                    content: ControlMessageContent::PortPeerChangedUnblock(
                        content.source_port,
                        content.new_target_comp
                    )
                };
                let to_ack = content.schedule_entry_id;

                self.entries.remove(entry_index);
                self.handle_ack(to_ack, sched_ctx, comp_ctx);

                return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack);
            },
-            ControlContent::ScheduleComponent(content) => {
+            ControlContent::ScheduleComponent(to_schedule) => {
                // If all change-of-peers are `Ack`d, then we're ready to
                // schedule the component!
-                return AckAction::ScheduleComponent(content.to_schedule);
+                return AckAction::ScheduleComponent(*to_schedule);
            },
            ControlContent::BlockedPort(_) => unreachable!(),
            ControlContent::ClosedPort(port_id) => {
                // If a closed port is Ack'd, then we remove the reference to
                // that component.
                let port_index = comp_ctx.get_port_index(*port_id).unwrap();
                debug_assert_eq!(comp_ctx.ports[port_index].state, PortState::Blocked);
                let peer_id = comp_ctx.ports[port_index].peer_comp_id;
                let peer_index = comp_ctx.get_peer_index(peer_id).unwrap();
                let peer_info = &mut comp_ctx.peers[peer_index];
                peer_info.num_associated_ports -= 1;

@@ -134,27 +134,25 @@ impl ControlLayer {
    }

    // -------------------------------------------------------------------------
    // Port transfer (due to component creation)
    // -------------------------------------------------------------------------

    /// Adds an entry that, when completely ack'd, will schedule a component.
    pub(crate) fn add_schedule_entry(&mut self, to_schedule_id: CompId) -> ControlId {
        let entry_id = self.take_id();
        self.entries.push(ControlEntry{
            id: entry_id,
            ack_countdown: 0, // incremented by calls to `add_reroute_entry`
-            content: ControlContent::ScheduleComponent(ContentScheduleComponent{
-                to_schedule: to_schedule_id
-            }),
+            content: ControlContent::ScheduleComponent(to_schedule_id),
        });

        return entry_id;
    }

    /// Removes a schedule entry. Only used if the caller preemptively called
    /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`,
    /// hence the `ack_countdown` in the scheduling entry is at 0.
    pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) {
        let index = self.get_entry_index(schedule_entry_id).unwrap();
        debug_assert_eq!(self.entries[index].ack_countdown, 0);
        self.entries.remove(index);
@@ -192,103 +190,107 @@ impl ControlLayer {
            sender_comp_id: creator_comp_id,
            target_port_id: Some(source_port_id),
            content: ControlMessageContent::PortPeerChangedBlock(source_port_id)
        })
    }

    // -------------------------------------------------------------------------
    // Blocking, unblocking, and closing ports
    // -------------------------------------------------------------------------

    pub(crate) fn mark_port_closed<'a>(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> {
        let port = comp_ctx.get_port_mut(port_id);
+        let peer_port_id = port.peer_id;
+        let peer_comp_id = port.peer_comp_id;
        debug_assert!(port.state == PortState::Open || port.state == PortState::Blocked);

        port.state = PortState::Closed;

-        if port.peer_comp_id == comp_ctx.id {
+        if peer_comp_id == comp_ctx.id {
            // We own the other end of the channel as well
            return None;
        }

        let entry_id = self.take_id();
        self.entries.push(ControlEntry{
            id: entry_id,
            ack_countdown: 1,
            content: ControlContent::ClosedPort(port_id),
        });

        return Some((
-            port.peer_comp_id,
+            peer_comp_id,
            ControlMessage{
                id: entry_id,
                sender_comp_id: comp_ctx.id,
-                target_port_id: Some(port.peer_id),
-                content: ControlMessageContent::ClosePort(port.peer_id),
+                target_port_id: Some(peer_port_id),
+                content: ControlMessageContent::ClosePort(peer_port_id),
            }
        ));
    }

    pub(crate) fn mark_port_blocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
        // TODO: Feels like this shouldn't be an entry. Hence this class should
        //  be renamed. Let's see where the code ends up.
        let entry_id = self.take_id();
        let port_info = comp_ctx.get_port_mut(port_id);
+        let peer_port_id = port_info.peer_id;
+        let peer_comp_id = port_info.peer_comp_id;
        debug_assert_eq!(port_info.state, PortState::Open); // prevent unforeseen issues
        port_info.state = PortState::Blocked;

        self.entries.push(ControlEntry{
            id: entry_id,
            ack_countdown: 0,
-            content: ControlContent::BlockedPort(ContentBlockedPort{
-                blocked_port: port_id,
-            }),
+            content: ControlContent::BlockedPort(port_id),
        });

        return (
-            port_info.peer_comp_id,
+            peer_comp_id,
            ControlMessage{
                id: entry_id,
                sender_comp_id: comp_ctx.id,
-                target_port_id: Some(port_info.peer_id),
-                content: ControlMessageContent::BlockPort(port_info.peer_id),
+                target_port_id: Some(peer_port_id),
+                content: ControlMessageContent::BlockPort(peer_port_id),
            }
        );
    }

    pub(crate) fn mark_port_unblocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
        // Find the entry that contains the blocking entry for the port
        let mut entry_index = usize::MAX;
-        let mut entry_id = ControlId::MAX;
+        let mut entry_id = ControlId::new_invalid();
        for (index, entry) in self.entries.iter().enumerate() {
-            if let ControlContent::BlockedPort(block_entry) = &entry.content {
-                if block_entry.blocked_port == port_id {
+            if let ControlContent::BlockedPort(blocked_port) = &entry.content {
+                if *blocked_port == port_id {
                    entry_index = index;
                    entry_id = entry.id;
                    break;
                }
            }
        }

        let port_info = comp_ctx.get_port_mut(port_id);
+        let peer_port_id = port_info.peer_id;
+        let peer_comp_id = port_info.peer_comp_id;
        debug_assert_eq!(port_info.state, PortState::Blocked);
        port_info.state = PortState::Open;

        return (
-            port_info.peer_comp_id,
+            peer_comp_id,
            ControlMessage{
                id: entry_id,
                sender_comp_id: comp_ctx.id,
-                target_port_id: Some(port_info.peer_id),
-                content: ControlMessageContent::UnblockPort(port_info.peer_id),
+                target_port_id: Some(peer_port_id),
+                content: ControlMessageContent::UnblockPort(peer_port_id),
            }
        )
    }

    // -------------------------------------------------------------------------
    // Internal utilities
    // -------------------------------------------------------------------------

    fn take_id(&mut self) -> ControlId {
        let id = self.id_counter;
        self.id_counter.0 = self.id_counter.0.wrapping_add(1);
        return id;
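
The schedule and reroute entries above implement an ack-countdown: every reroute request sent on behalf of a newly created component bumps a counter on its schedule entry, and the component only runs once every peer has Ack'd. A minimal sketch with simplified stand-in types (`EntryId`, `ControlSketch` and the method bodies are illustrative assumptions, not the actual implementation):

// Sketch of the ack-countdown bookkeeping around component creation.
#[derive(Clone, Copy, PartialEq, Eq)]
struct EntryId(u32);

struct ScheduleEntry {
    id: EntryId,
    ack_countdown: u32, // one tick per outstanding reroute request
}

struct ControlSketch {
    entries: Vec<ScheduleEntry>,
}

impl ControlSketch {
    fn add_schedule_entry(&mut self, id: EntryId) {
        self.entries.push(ScheduleEntry { id, ack_countdown: 0 });
    }

    fn add_reroute_entry(&mut self, schedule_id: EntryId) {
        let entry = self.entries.iter_mut().find(|e| e.id == schedule_id).unwrap();
        entry.ack_countdown += 1; // expect one more Ack before scheduling
    }

    // Returns true when the final Ack arrived and the component may be run;
    // if no reroutes were ever added, the caller removes the entry directly
    // and schedules immediately (cf. remove_schedule_entry above).
    fn handle_ack(&mut self, schedule_id: EntryId) -> bool {
        let index = self.entries.iter().position(|e| e.id == schedule_id).unwrap();
        self.entries[index].ack_countdown -= 1;
        if self.entries[index].ack_countdown == 0 {
            self.entries.remove(index);
            true
        } else {
            false
        }
    }
}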
src/runtime2/scheduler.rs

@@ -37,59 +37,59 @@ impl Scheduler {
            let comp_key = self.runtime.take_work();
            if comp_key.is_none() {
                break 'run_loop;
            }

            let comp_key = comp_key.unwrap();
            let component = self.runtime.get_component(comp_key);

            // Run the component until it no longer indicates that it needs to
            // be re-executed immediately.
            let mut new_scheduling = CompScheduling::Immediate;
            while let CompScheduling::Immediate = new_scheduling {
-                new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.private.ctx).expect("TODO: Handle error");
+                new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error");
            }

            // Handle the new scheduling
            match new_scheduling {
                CompScheduling::Immediate => unreachable!(),
                CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); },
                CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); },
                CompScheduling::Exit => { self.mark_component_as_exiting(&scheduler_ctx, component); }
            }
        }
    }

    // local utilities

    fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
-        debug_assert_eq!(key.downgrade(), component.private.ctx.id); // make sure component matches key
+        debug_assert_eq!(key.downgrade(), component.ctx.id); // make sure component matches key
        debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping

        component.public.sleeping.store(true, Ordering::Release);
        if component.inbox.can_pop() {
            let should_reschedule = component.public.sleeping
                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok();

            if should_reschedule {
                self.runtime.enqueue_work(key);
            }
        }
    }

    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
        for port_index in 0..component.ctx.ports.len() {
            let port_info = &component.ctx.ports[port_index];
-            if let Some((peer_id, message)) = component.code.control.mark_port_closed(port_info.id, comp_ctx) {
+            if let Some((peer_id, message)) = component.code.control.mark_port_closed(port_info.self_id, &mut component.ctx) {
                let peer_info = component.ctx.get_peer(peer_id);
                peer_info.handle.inbox.push(Message::Control(message));

                wake_up_if_sleeping(sched_ctx, peer_id, &peer_info.handle);
            }
        }

        let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
        let new_count = old_count - 1;
        if new_count == 0 {
            let comp_key = unsafe{ component.ctx.id.upgrade() };
            sched_ctx.runtime.destroy_component(comp_key);
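
The sleeping logic in mark_component_as_sleeping resolves a classic race: a message may arrive between the component's last inbox check and the moment the sleeping flag is set. The rule is that whoever wins the compare_exchange from true to false owns the sole right to reschedule. A minimal sketch of just this handshake (names are illustrative, not the runtime's actual API):

// Standalone sketch of the sleep/wake handshake. Invariant: the party that
// flips `sleeping` from true to false is the unique party responsible for
// (re)scheduling the component, so it is never enqueued twice.
use std::sync::atomic::{AtomicBool, Ordering};

struct PublicState {
    sleeping: AtomicBool,
}

// Scheduler side: called when a component runs out of work.
fn go_to_sleep(
    public: &PublicState,
    inbox_can_pop: impl Fn() -> bool,
    reschedule: impl FnOnce(),
) {
    public.sleeping.store(true, Ordering::Release);
    // A message may have arrived before the store above was visible to the
    // sender; re-check the inbox, then race any senders for the right to
    // reschedule.
    if inbox_can_pop() {
        let won = public.sleeping
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok();
        if won {
            reschedule();
        }
    }
}

// Sender side: called after pushing a message into the component's inbox.
fn wake_up_if_sleeping(public: &PublicState, schedule: impl FnOnce()) {
    let won = public.sleeping
        .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
        .is_ok();
    if won {
        schedule();
    }
}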
src/runtime2/store/component.rs

@@ -28,25 +28,25 @@
 * Some obvious flaws with this implementation:
 *  - Because of the freelist implementation we will generally allocate all of
 *    the data pointers that are available (i.e. if we have a buffer of size
 *    64, but we generally use 33 elements, then we'll have 64 elements
 *    allocated), which might be wasteful at larger array sizes (which are
 *    always powers of two).
 *  - A lot of concurrent operations are not necessary: we may avoid some of
 *    the accesses to the global concurrent datastructure by first going
 *    through some kind of thread-local datastructure.
 */

use std::mem::transmute;
-use std::alloc::{alloc, dealloc, Layout};
+use std::alloc::{dealloc, Layout};
use std::ptr;
use std::sync::atomic::{AtomicUsize, Ordering};

use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};

pub struct ComponentStore<T: Sized> {
    inner: UnfairSeLock<Inner<T>>,
    read_head: AtomicUsize,
    write_head: AtomicUsize,
    limit_head: AtomicUsize,
}
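
The first flaw listed in the comment block is easiest to see in a single-threaded reduction of the store: because freed indices are recycled through a FIFO freelist (the real store cycles an atomic ring via the heads above), sustained create/destroy churn eventually touches, and therefore allocates, every slot of the power-of-two buffer, even when few slots are live at once. A simplified sketch (`FreelistSketch` is an illustrative stand-in, not the concurrent implementation):

// Single-threaded stand-in for the freelist behaviour described above.
use std::collections::VecDeque;

struct FreelistSketch<T> {
    slots: Vec<Option<Box<T>>>, // one lazily-allocated pointer per slot
    free: VecDeque<usize>,      // recycled slot indices, FIFO
}

impl<T> FreelistSketch<T> {
    fn with_capacity_pow2(capacity: usize) -> Self {
        assert!(capacity.is_power_of_two());
        let mut slots = Vec::new();
        slots.resize_with(capacity, || None);
        Self { slots, free: (0..capacity).collect() }
    }

    fn create(&mut self, value: T) -> usize {
        // FIFO recycling cycles through all slots, so each slot's allocation
        // happens sooner or later even at low occupancy.
        let index = self.free.pop_front().expect("store full");
        self.slots[index] = Some(Box::new(value));
        index
    }

    fn destroy(&mut self, index: usize) {
        // Here the Box is dropped; the real store keeps the data pointer
        // around for reuse, which is exactly the wasteful behaviour the
        // comment block describes.
        self.slots[index] = None;
        self.free.push_back(index);
    }
}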
src/runtime2/store/unfair_se_lock.rs

@@ -69,25 +69,25 @@ impl<T> UnfairSeLock<T> {
            if shared & EXCLUSIVE_BIT != 0 {
                shared = self.wait_until_not_exclusive(shared);
            }

            // We want to set the write bit
            let new_shared = shared | EXCLUSIVE_BIT;
            match self.shared.compare_exchange(shared, new_shared, Ordering::AcqRel, Ordering::Acquire) {
                Ok(_) => {
                    // We've acquired the write lock, but we still might have
                    // to wait until the reader count is at 0.
                    shared = new_shared;
                    if shared != EXCLUSIVE_BIT {
-                        shared = self.wait_until_not_shared(shared);
+                        self.wait_until_not_shared(shared);
                    }

                    return UnfairSeLockExclusiveGuard::new(self);
                },
                Err(actual_value) => { shared = actual_value; }
            }
        }
    }

    fn wait_until_not_exclusive(&self, mut shared: u32) -> u32 {
        // Assume this is only called when the EXCLUSIVE_BIT is set
        debug_assert_eq!(shared & EXCLUSIVE_BIT, EXCLUSIVE_BIT);
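
The exclusive path above implies a word layout of one EXCLUSIVE_BIT plus a reader count packed into a single atomic integer; the check `shared != EXCLUSIVE_BIT` after setting the bit asks whether any readers are still inside. A sketch of the matching reader side under that assumed layout (`RawSeLock` is illustrative; the real type hands out RAII guards):

// Sketch of the assumed shared/exclusive word: high bit = writer, low bits =
// reader count. Readers spin while a writer holds or claims the lock.
use std::sync::atomic::{AtomicU32, Ordering};

const EXCLUSIVE_BIT: u32 = 1 << 31;

struct RawSeLock {
    shared: AtomicU32,
}

impl RawSeLock {
    // Readers wait for the exclusive bit to clear, then bump the count.
    fn lock_shared(&self) {
        let mut cur = self.shared.load(Ordering::Relaxed);
        loop {
            if cur & EXCLUSIVE_BIT != 0 {
                std::hint::spin_loop();
                cur = self.shared.load(Ordering::Relaxed);
                continue;
            }
            match self.shared.compare_exchange_weak(
                cur, cur + 1, Ordering::Acquire, Ordering::Relaxed,
            ) {
                Ok(_) => return,
                Err(actual) => cur = actual,
            }
        }
    }

    // Dropping the reader count to 0 is what lets a waiting writer proceed.
    fn unlock_shared(&self) {
        self.shared.fetch_sub(1, Ordering::Release);
    }
}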