Changeset - 0de39654770f
MH <contact@maxhenger.nl> - 2022-01-16 12:30:16
WIP: Closing ports
7 files changed with 147 insertions and 40 deletions:
src/runtime2/communication.rs

@@ -75,6 +75,7 @@ pub enum ControlMessageContent {
     Ack,
     BlockPort(PortId),
     UnblockPort(PortId),
+    ClosePort(PortId),
     PortPeerChangedBlock(PortId),
     PortPeerChangedUnblock(PortId, CompId),
 }
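
The new `ClosePort` variant follows the same request/acknowledge handshake as the existing `BlockPort`/`UnblockPort` messages: the closing component sends `ClosePort` to its peer, and the peer complies and answers with `Ack`. A minimal sketch of the receiving half of that handshake, using hypothetical simplified stand-ins (`ControlMsg`, `Port`, `PortState`) rather than the runtime's actual types:

    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    struct PortId(u32);

    enum ControlMsg {
        Ack,
        ClosePort(PortId),
    }

    #[derive(PartialEq, Debug)]
    enum PortState { Open, Blocked, Closed }

    struct Port { id: PortId, state: PortState }

    // On receiving ClosePort we comply immediately and answer with an Ack,
    // so the sender knows it may release its handle to us.
    fn handle_control(ports: &mut [Port], msg: ControlMsg) -> Option<ControlMsg> {
        match msg {
            ControlMsg::ClosePort(port_id) => {
                let port = ports.iter_mut().find(|p| p.id == port_id)?;
                port.state = PortState::Closed;
                Some(ControlMsg::Ack)
            }
            ControlMsg::Ack => None, // resolved against a pending control entry
        }
    }
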
src/runtime2/component/component_pdl.rs

@@ -10,6 +10,7 @@ use crate::runtime2::runtime::*;
 use crate::runtime2::scheduler::SchedulerCtx;
 use crate::runtime2::communication::*;
 
 use super::*;
 use super::control_layer::*;
+use super::consensus::Consensus;
 
@@ -95,7 +96,7 @@ impl CompCtx {
         return None;
     }
 
-    fn get_peer(&self, peer_id: CompId) -> &Peer {
+    pub(crate) fn get_peer(&self, peer_id: CompId) -> &Peer {
         let index = self.get_peer_index(peer_id).unwrap();
         return &self.peers[index];
     }
@@ -216,7 +217,7 @@ impl CompPDL {
             prompt: initial_state,
             control: ControlLayer::default(),
             consensus: Consensus::new(),
-            sync_counter: u32,
+            sync_counter: 0,
             exec_ctx: ExecCtx{
                 stmt: ExecStmt::None,
             },
@@ -410,7 +411,7 @@ impl CompPDL {
             ControlMessageContent::Ack => {
                 let mut to_ack = message.id;
                 loop {
-                    let action = self.control.handle_ack(to_ack, comp_ctx);
+                    let action = self.control.handle_ack(to_ack, sched_ctx, comp_ctx);
                     match action {
                         AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => {
                             // FIX @NoDirectHandle
@@ -441,6 +442,27 @@ impl CompPDL {
                     port_info.state = PortState::Blocked;
                 }
             },
+            ControlMessageContent::ClosePort(port_id) => {
+                // Request to close the port. We immediately comply and remove
+                // the component handle as well
+                let port_index = comp_ctx.get_port_index(port_id).unwrap();
+                let port_info = &mut comp_ctx.ports[port_index];
+                port_info.state = PortState::Closed;
+
+                let peer_index = comp_ctx.get_peer_index(port_info.peer_comp_id).unwrap();
+                let peer_info = &mut comp_ctx.peers[peer_index];
+                peer_info.num_associated_ports -= 1;
+                if peer_info.num_associated_ports == 0 {
+                    // TODO: @Refactor clean up all these uses of "num_associated_ports"
+                    let should_remove = peer_info.handle.decrement_users();
+                    if should_remove {
+                        let comp_key = unsafe{ peer_info.id.upgrade() };
+                        sched_ctx.runtime.destroy_component(comp_key);
+                    }
+
+                    comp_ctx.peers.remove(peer_index);
+                }
+            }
             ControlMessageContent::UnblockPort(port_id) => {
                 // We were previously blocked (or already closed)
                 let port_info = comp_ctx.get_port(port_id);
@@ -665,20 +687,4 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Ve
     for value in &value_group.values {
         find_port_in_value(value_group, value, ports);
    }
 }
-
-/// If the component is sleeping, then that flag will be atomically set to
-/// false. If we're the ones that made that happen then we add it to the work
-/// queue.
-fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, handle: &CompHandle) {
-    use std::sync::atomic::Ordering;
-
-    let should_wake_up = handle.sleeping
-        .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
-        .is_ok();
-
-    if should_wake_up {
-        let comp_key = unsafe{ comp_id.upgrade() };
-        sched_ctx.runtime.enqueue_work(comp_key);
-    }
-}
\ No newline at end of file
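
The `ClosePort` handler above ties peer-handle cleanup to `num_associated_ports`: each port closed against a peer decrements that peer's counter, and when it hits zero the handle is released and, if this was the last user, the component is destroyed. The counting pattern in isolation, with a hypothetical `PeerBook` standing in for `comp_ctx.peers`:

    use std::collections::HashMap;

    #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
    struct CompId(u32);

    // Hypothetical stand-in for the peer list: maps a peer component to the
    // number of local ports still associated with it.
    struct PeerBook {
        ports_per_peer: HashMap<CompId, u32>,
    }

    impl PeerBook {
        /// Returns `true` when the last port referencing `peer` was closed,
        /// i.e. the caller should now release its handle to that component.
        fn on_port_closed(&mut self, peer: CompId) -> bool {
            let count = self.ports_per_peer.get_mut(&peer).expect("known peer");
            *count -= 1;
            if *count == 0 {
                self.ports_per_peer.remove(&peer);
                true
            } else {
                false
            }
        }
    }
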
src/runtime2/component/control_layer.rs

@@ -21,8 +21,9 @@ struct ControlEntry {
 
 enum ControlContent {
     PeerChange(ContentPeerChange),
-    ScheduleComponent(ContentScheduleComponent),
-    BlockedPort(ContentBlockedPort),
+    ScheduleComponent(CompId),
+    BlockedPort(PortId),
+    ClosedPort(PortId),
 }
 
 struct ContentPeerChange {
@@ -33,14 +34,6 @@ struct ContentPeerChange {
     schedule_entry_id: ControlId,
 }
 
-struct ContentScheduleComponent {
-    to_schedule: CompId,
-}
-
-struct ContentBlockedPort {
-    blocked_port: PortId,
-}
-
 pub(crate) enum AckAction {
     None,
     SendMessageAndAck(CompId, ControlMessage, ControlId),
@@ -76,7 +69,7 @@ impl ControlLayer {
         return None;
     }
 
-    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, comp_ctx: &CompCtx) -> AckAction {
+    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> AckAction {
        let entry_index = self.get_entry_index(entry_id).unwrap();
         let entry = &mut self.entries[entry_index];
         debug_assert!(entry.ack_countdown > 0);
@@ -105,7 +98,7 @@ impl ControlLayer {
                 let to_ack = content.schedule_entry_id;
 
                 self.entries.remove(entry_index);
-                self.handle_ack(to_ack, comp_ctx);
+                self.handle_ack(to_ack, sched_ctx, comp_ctx);
 
                 return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack);
             },
@@ -115,6 +108,28 @@ impl ControlLayer {
                 return AckAction::ScheduleComponent(content.to_schedule);
             },
             ControlContent::BlockedPort(_) => unreachable!(),
+            ControlContent::ClosedPort(port_id) => {
+                // If a closed port is Ack'd, then we remove the reference to
+                // that component.
+                let port_index = comp_ctx.get_port_index(*port_id).unwrap();
+                debug_assert_eq!(comp_ctx.ports[port_index].state, PortState::Blocked);
+                let peer_id = comp_ctx.ports[port_index].peer_comp_id;
+                let peer_index = comp_ctx.get_peer_index(peer_id).unwrap();
+                let peer_info = &mut comp_ctx.peers[peer_index];
+                peer_info.num_associated_ports -= 1;
+
+                if peer_info.num_associated_ports == 0 {
+                    let should_remove = peer_info.handle.decrement_users();
+                    if should_remove {
+                        let comp_key = unsafe{ peer_info.id.upgrade() };
+                        sched_ctx.runtime.destroy_component(comp_key);
+                    }
+
+                    comp_ctx.peers.remove(peer_index);
+                }
+
+                return AckAction::None;
+            }
         }
     }
 
@@ -181,9 +196,38 @@ impl ControlLayer {
     }
 
     // -------------------------------------------------------------------------
-    // Blocking and unblocking ports
+    // Blocking, unblocking, and closing ports
     // -------------------------------------------------------------------------
 
+    pub(crate) fn mark_port_closed<'a>(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> {
+        let port = comp_ctx.get_port_mut(port_id);
+        debug_assert!(port.state == PortState::Open || port.state == PortState::Blocked);
+
+        port.state = PortState::Closed;
+
+        if port.peer_comp_id == comp_ctx.id {
+            // We own the other end of the channel as well
+            return None;
+        }
+
+        let entry_id = self.take_id();
+        self.entries.push(ControlEntry{
+            id: entry_id,
+            ack_countdown: 1,
+            content: ControlContent::ClosedPort(port_id),
+        });
+
+        return Some((
+            port.peer_comp_id,
+            ControlMessage{
+                id: entry_id,
+                sender_comp_id: comp_ctx.id,
+                target_port_id: Some(port.peer_id),
+                content: ControlMessageContent::ClosePort(port.peer_id),
+            }
+        ));
+    }
+
     pub(crate) fn mark_port_blocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
         // TODO: Feels like this shouldn't be an entry. Hence this class should
         //  be renamed. Lets see where the code ends up being
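
Both the new `ClosedPort` entry and the pre-existing entries follow one scheme: `mark_port_closed` registers an entry with `ack_countdown: 1`, and `handle_ack` resolves it once the countdown reaches zero. Reduced to its essentials, the control layer is a table of pending operations keyed by id, each waiting on a known number of acknowledgements. A generic sketch of that shape, under assumed simplified types (not the actual `ControlLayer` API):

    struct Pending<T> {
        id: u32,
        acks_remaining: u32,
        payload: T,
    }

    struct AckTable<T> {
        next_id: u32,
        pending: Vec<Pending<T>>,
    }

    impl<T> AckTable<T> {
        fn new() -> Self { AckTable { next_id: 0, pending: Vec::new() } }

        // Register an operation that resolves after `acks_expected` Acks.
        fn register(&mut self, acks_expected: u32, payload: T) -> u32 {
            let id = self.next_id;
            self.next_id += 1;
            self.pending.push(Pending { id, acks_remaining: acks_expected, payload });
            id
        }

        /// Returns the payload once the final Ack for `id` comes in.
        fn ack(&mut self, id: u32) -> Option<T> {
            let idx = self.pending.iter().position(|p| p.id == id)?;
            self.pending[idx].acks_remaining -= 1;
            if self.pending[idx].acks_remaining == 0 {
                Some(self.pending.swap_remove(idx).payload)
            } else {
                None
            }
        }
    }

In these terms, sending `ClosePort` amounts to `register(1, cleanup)` and the peer's `Ack` amounts to `ack(id)` returning the cleanup work to run.
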
src/runtime2/component/mod.rs

@@ -3,4 +3,23 @@ mod control_layer;
 mod consensus;
 
 pub(crate) use component_pdl::{CompPDL, CompCtx, CompScheduling};
-pub(crate) use control_layer::{ControlId};
\ No newline at end of file
+pub(crate) use control_layer::{ControlId};
+
+use super::scheduler::*;
+use super::runtime::*;
+
+/// If the component is sleeping, then that flag will be atomically set to
+/// false. If we're the ones that made that happen then we add it to the work
+/// queue.
+pub(crate) fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, handle: &CompHandle) {
+    use std::sync::atomic::Ordering;
+
+    let should_wake_up = handle.sleeping
+        .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
+        .is_ok();
+
+    if should_wake_up {
+        let comp_key = unsafe{ comp_id.upgrade() };
+        sched_ctx.runtime.enqueue_work(comp_key);
+    }
+}
\ No newline at end of file
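
`wake_up_if_sleeping` (moved here from component_pdl.rs so the scheduler can reuse it) relies on `compare_exchange` so that, when several threads race to wake the same component, exactly one of them flips `sleeping` from `true` to `false` and therefore exactly one enqueues it. The same pattern in a self-contained form, using a bare `AtomicBool` instead of `CompHandle` (illustrative only):

    use std::sync::atomic::{AtomicBool, Ordering};

    // Only the caller that wins the true -> false exchange may enqueue the
    // component; every other caller sees it is already awake or being woken.
    fn try_wake(sleeping: &AtomicBool) -> bool {
        sleeping
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    fn main() {
        let flag = AtomicBool::new(true);
        assert!(try_wake(&flag));  // first caller wins and enqueues
        assert!(!try_wake(&flag)); // second caller backs off
    }
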
src/runtime2/runtime.rs

@@ -16,7 +16,7 @@ use super::store::{ComponentStore, QueueDynMpsc, QueueDynProducer};
 /// of these. Only with a key one may retrieve privately-accessible memory for
 /// a component. Practically just a generational index, like `CompId` is.
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
-pub(crate) struct CompKey(u32);
+pub(crate) struct CompKey(pub u32);
 
 impl CompKey {
     pub(crate) fn downgrade(&self) -> CompId {
@@ -26,7 +26,7 @@ impl CompKey {
 
 /// Generational ID of a component
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
-pub struct CompId(u32);
+pub struct CompId(pub u32);
 
 impl CompId {
     pub(crate) fn new_invalid() -> CompId {
@@ -51,6 +51,9 @@ pub(crate) struct RuntimeComp {
 }
 
 /// Should contain everything that is accessible in a thread-safe manner
+// TODO: Do something about the `num_handles` thing. This needs to be a bit more
+//  "foolproof" to lighten the mental burden of using the `num_handles`
+//  variable.
 pub(crate) struct CompPublic {
     pub sleeping: AtomicBool,
     pub num_handles: AtomicU32, // manually modified (!)
@@ -199,6 +202,7 @@ impl Runtime {
     }
 
     pub(crate) fn destroy_component(&self, key: CompKey) {
+        debug_assert_eq!(self.get_component(key).public.num_handles.load(Ordering::Acquire), 0);
         self.components.destroy(key.0);
     }
 
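
The new `debug_assert_eq!` makes the contract of `destroy_component` explicit: it may only run once `num_handles` has dropped to zero. Because the counter is, as the comment notes, "manually modified", every owner must pair its decrement with a destroy-on-zero check. A sketch of that discipline (illustrative, not the runtime's API):

    use std::sync::atomic::{AtomicU32, Ordering};

    struct CompPublic { num_handles: AtomicU32 }

    /// Drops one handle. Returns `true` if this was the last one, in which
    /// case the caller is responsible for destroying the component.
    fn release_handle(public: &CompPublic) -> bool {
        let old = public.num_handles.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(old > 0, "handle count underflow");
        old == 1
    }
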
src/runtime2/scheduler.rs

@@ -40,7 +40,6 @@ impl Scheduler {
             }
 
             let comp_key = comp_key.unwrap();
-            let comp_id = comp_key.downgrade();
             let component = self.runtime.get_component(comp_key);
 
             // Run the component until it no longer indicates that it needs to
@@ -55,7 +54,7 @@ impl Scheduler {
                 CompScheduling::Immediate => unreachable!(),
                 CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); },
                 CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); },
-                CompScheduling::Exit => { self.mark_component_as_exiting(comp_key, component); }
+                CompScheduling::Exit => { self.mark_component_as_exiting(&scheduler_ctx, component); }
             }
         }
     }
@@ -67,10 +66,33 @@ impl Scheduler {
         debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping
 
         component.public.sleeping.store(true, Ordering::Release);
-        todo!("check for messages");
+        if component.inbox.can_pop() {
+            let should_reschedule = component.public.sleeping
+                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
+                .is_ok();
+
+            if should_reschedule {
+                self.runtime.enqueue_work(key);
+            }
+        }
     }
 
-    fn mark_component_as_exiting(&self, key: CompKey, component: &mut RuntimeComp) {
-        todo!("do something")
+    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
+        for port_index in 0..component.ctx.ports.len() {
+            let port_info = &component.ctx.ports[port_index];
+            if let Some((peer_id, message)) = component.code.control.mark_port_closed(port_info.id, comp_ctx) {
+                let peer_info = component.ctx.get_peer(peer_id);
+                peer_info.handle.inbox.push(Message::Control(message));
+
+                wake_up_if_sleeping(sched_ctx, peer_id, &peer_info.handle);
+            }
+        }
+
+        let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
+        let new_count = old_count - 1;
+        if new_count == 0 {
+            let comp_key = unsafe{ component.ctx.id.upgrade() };
+            sched_ctx.runtime.destroy_component(comp_key);
+        }
+    }
 }
\ No newline at end of file
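
The code replacing `todo!("check for messages")` closes a classic lost-wakeup window: the scheduler first publishes `sleeping = true`, then re-checks the inbox. A sender that pushed a message just before the store saw `sleeping == false` and did not enqueue the component, so the scheduler must reclaim the flag and re-enqueue it itself; whoever wins the exchange, scheduler or sender, does so exactly once. The sequence in miniature, with a plain `VecDeque` standing in for the real MPSC inbox (illustrative only):

    use std::collections::VecDeque;
    use std::sync::atomic::{AtomicBool, Ordering};

    // Returns `true` if the caller must re-enqueue the component instead of
    // letting it sleep, because a message raced in before the store.
    fn go_to_sleep(sleeping: &AtomicBool, inbox: &VecDeque<u32>) -> bool {
        sleeping.store(true, Ordering::Release);
        if !inbox.is_empty() {
            return sleeping
                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok();
        }
        false
    }
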
src/runtime2/store/queue_mpsc.rs

@@ -78,6 +78,17 @@ impl<T> QueueDynMpsc<T> {
         return QueueDynProducer::new(self);
     }
 
+    /// Return `true` if a subsequent call to `pop` will return a value. Note
+    /// that even if it returns `false`, a concurrent push may mean that `pop`
+    /// still returns a value.
+    pub fn can_pop(&mut self) -> bool {
+        let data_lock = self.inner.data.lock_shared();
+        let cur_read = self.inner.read_head.load(Ordering::Acquire);
+        let cur_limit = self.inner.limit_head.load(Ordering::Acquire);
+        let buf_size = data_lock.data.cap() as u32;
+        return (cur_read + buf_size) & data_lock.compare_mask != cur_limit;
+    }
+
     /// Perform an attempted read from the queue. It might be that some producer
     /// is putting something in the queue while this function is executing, and
     /// we don't get to consume it.
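
`can_pop` tests emptiness without popping: the read head, advanced by one full buffer length and wrapped with `compare_mask`, lands exactly on the limit head when nothing is committed. A worked example of that arithmetic, assuming a capacity of 4 and heads that wrap modulo 8 (twice the capacity), with the limit head initialized one buffer length ahead of the read head; these initial values are inferred from the comparison above, not taken from the queue's actual constructor:

    // Same test as QueueDynMpsc::can_pop, on plain integers.
    fn can_pop(cur_read: u32, cur_limit: u32, buf_size: u32, compare_mask: u32) -> bool {
        (cur_read + buf_size) & compare_mask != cur_limit
    }

    fn main() {
        // Assumed empty state: limit head exactly one buffer length ahead.
        assert!(!can_pop(0, 4, 4, 7)); // (0 + 4) & 7 == 4 == limit -> empty
        // One element committed: the limit head has advanced one slot.
        assert!(can_pop(0, 5, 4, 7));  // (0 + 4) & 7 == 4 != 5 -> pop succeeds
    }
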