Changeset - 5415acc02756
mh - 2022-01-28 18:40:04
contact@maxhenger.nl
WIP: Refactored port/peer management, pending more bugfixes to component shutdown
7 files changed with 173 insertions and 121 deletions:
src/runtime2/component/component_context.rs
@@ -95,10 +95,11 @@ impl CompCtx {
     /// then it will be used to add the peer. Otherwise it will be retrieved
     /// from the runtime using its ID.
     pub(crate) fn add_peer(&mut self, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, peer_comp_id: CompId, handle: Option<&CompHandle>) {
+        let self_id = self.id;
         let port = self.get_port_mut(port_handle);
         debug_assert_eq!(port.peer_comp_id, peer_comp_id);
         debug_assert!(!port.associated_with_peer);
-        if !self.requires_peer_reference(port) {
+        if !Self::requires_peer_reference(port, self_id) {
             return;
         }

@@ -124,9 +125,10 @@ impl CompCtx {

     /// Removes a peer associated with a port.
     pub(crate) fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, peer_id: CompId) {
+        let self_id = self.id;
         let port = self.get_port_mut(port_handle);
         debug_assert_eq!(port.peer_comp_id, peer_id);
-        if !self.requires_peer_reference(port) {
+        if !Self::requires_peer_reference(port, self_id) {
             return;
         }

@@ -135,6 +137,7 @@ impl CompCtx {
         let peer_index = self.get_peer_index_by_id(peer_id).unwrap();
         let peer = &mut self.peers[peer_index];
         peer.num_associated_ports -= 1;
+        println!(" ****** DEBUG: Removed peer {:?} from {:?}, now at {}", peer.id, self_id, peer.num_associated_ports);
         if peer.num_associated_ports == 0 {
             let mut peer = self.peers.remove(peer_index);
             if let Some(key) = peer.handle.decrement_users() {
@@ -173,6 +176,10 @@ impl CompCtx {
         return &mut self.ports[index];
     }

+    pub(crate) fn get_port_by_index_mut(&mut self, index: usize) -> &mut Port {
+        return &mut self.ports[index];
+    }
+
     pub(crate) fn get_peer(&self, peer_handle: LocalPeerHandle) -> &Peer {
         let index = self.must_get_peer_index(peer_handle);
         return &self.peers[index];
@@ -188,6 +195,11 @@ impl CompCtx {
         return self.ports.iter();
     }

+    #[inline]
+    pub(crate) fn iter_ports_mut(&mut self) -> impl Iterator<Item=&mut Port> {
+        return self.ports.iter_mut();
+    }
+
     #[inline]
     pub(crate) fn iter_peers(&self) -> impl Iterator<Item=&Peer> {
         return self.peers.iter();
@@ -203,8 +215,8 @@ impl CompCtx {
     // -------------------------------------------------------------------------

     #[inline]
-    fn requires_peer_reference(&self, port: &Port) -> bool {
-        return port.state == PortState::Closed;
+    fn requires_peer_reference(port: &Port, self_id: CompId) -> bool {
+        return port.state != PortState::Closed && port.peer_comp_id != self_id;
     }

     fn must_get_port_index(&self, handle: LocalPortHandle) -> usize {
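The inverted `requires_peer_reference` predicate is the heart of this file's change: a peer entry is now tracked only while a port is still open and its peer lives in another component. A minimal, self-contained sketch of that rule, using toy `Port`/`CompId` stand-ins rather than the crate's real types:

```rust
#[derive(PartialEq, Clone, Copy)]
enum PortState { Open, Closed }

#[derive(PartialEq, Clone, Copy)]
struct CompId(u32);

struct Port { state: PortState, peer_comp_id: CompId }

// Mirrors the new rule: hold a peer handle only while the port can still
// carry messages and the peer is a *different* component.
fn requires_peer_reference(port: &Port, self_id: CompId) -> bool {
    port.state != PortState::Closed && port.peer_comp_id != self_id
}

fn main() {
    let me = CompId(1);
    let other = CompId(2);
    // Open port to another component: reference required.
    assert!(requires_peer_reference(&Port { state: PortState::Open, peer_comp_id: other }, me));
    // Closed port, or a self-loop channel: no reference needed.
    assert!(!requires_peer_reference(&Port { state: PortState::Closed, peer_comp_id: other }, me));
    assert!(!requires_peer_reference(&Port { state: PortState::Open, peer_comp_id: me }, me));
}
```

Making the function static (taking `self_id` instead of `&self`) sidesteps the borrow conflict with the `&mut Port` obtained from `get_port_mut` a few lines earlier.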
src/runtime2/component/component_pdl.rs
@@ -6,11 +6,9 @@ use crate::protocol::eval::{
     EvalContinuation, EvalResult, EvalError
 };

 use crate::runtime2::runtime::*;
 use crate::runtime2::scheduler::SchedulerCtx;
 use crate::runtime2::communication::*;

 use super::*;
 use super::component_context::*;
 use super::control_layer::*;
 use super::consensus::Consensus;
@@ -86,23 +84,12 @@ impl RunContext for ExecCtx {
 pub(crate) enum Mode {
     NonSync, // not in sync mode
     Sync, // in sync mode, can interact with other components
     SyncFail, // something went wrong during sync mode (deadlocked, error, whatever)
     SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
     BlockedGet,
     BlockedPut,
-    StartExit, // temp state
-    Exit,
-}
-
-impl Mode {
-    fn can_run(&self) -> bool {
-        match self {
-            Mode::NonSync | Mode::Sync =>
-                return true,
-            Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut | Mode::StartExit | Mode::Exit =>
-                return false,
-        }
-    }
+    StartExit, // temporary state: if encountered then we start the shutdown process
+    BusyExit, // temporary state: waiting for Acks for all the closed ports
+    Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0
 }

 pub(crate) struct CompPDL {
@@ -176,15 +163,30 @@ impl CompPDL {
     pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
         use EvalContinuation as EC;

-        if self.mode == Mode::StartExit {
-            self.mode = Mode::Exit;
-            return Ok(CompScheduling::Exit);
-        }
-
-        let can_run = self.mode.can_run();
-        sched_ctx.log(&format!("Running component (mode: {:?}, can run: {})", self.mode, can_run));
-        if !can_run {
-            return Ok(CompScheduling::Sleep);
-        }
+        sched_ctx.log(&format!("Running component (mode: {:?})", self.mode));

+        // Depending on the mode don't do anything at all, take some special
+        // actions, or fall through and run the PDL code.
+        match self.mode {
+            Mode::NonSync | Mode::Sync => {},
+            Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut => {
+                return Ok(CompScheduling::Sleep);
+            }
+            Mode::StartExit => {
+                self.handle_component_exit(sched_ctx, comp_ctx);
+                return Ok(CompScheduling::Immediate);
+            },
+            Mode::BusyExit => {
+                if self.control.has_acks_remaining() {
+                    return Ok(CompScheduling::Sleep);
+                } else {
+                    self.mode = Mode::Exit;
+                    return Ok(CompScheduling::Exit);
+                }
+            },
+            Mode::Exit => {
+                return Ok(CompScheduling::Exit);
+            }
+        }

         let run_result = self.execute_prompt(&sched_ctx)?;
@@ -195,8 +197,8 @@ impl CompPDL {
             // Results that can be returned in sync mode
             EC::SyncBlockEnd => {
                 debug_assert_eq!(self.mode, Mode::Sync);
-                let scheduling = self.handle_sync_end(sched_ctx, comp_ctx);
-                return Ok(scheduling.unwrap_or(CompScheduling::Immediate));
+                self.handle_sync_end(sched_ctx, comp_ctx);
+                return Ok(CompScheduling::Immediate);
            },
             EC::BlockGet(port_id) => {
                 debug_assert_eq!(self.mode, Mode::Sync);
@@ -216,7 +218,7 @@ impl CompPDL {
                         self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
                         return Ok(CompScheduling::Immediate);
                     } else {
                         self.mode = Mode::SyncFail;
-                        todo!("handle sync failure due to message deadlock");
+                        return Ok(CompScheduling::Sleep);
                     }
                 } else {
@@ -240,8 +242,8 @@ impl CompPDL {
             },
             // Results that can be returned outside of sync mode
             EC::ComponentTerminated => {
-                self.handle_component_exit(sched_ctx, comp_ctx);
-                return Ok(CompScheduling::Exit);
+                self.mode = Mode::StartExit; // next call we'll take care of the exit
+                return Ok(CompScheduling::Immediate);
             },
             EC::SyncBlockStart => {
                 debug_assert_eq!(self.mode, Mode::NonSync);
@@ -290,45 +292,65 @@ impl CompPDL {
         self.mode = Mode::Sync;
     }

-    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Option<CompScheduling> {
+    /// Handles end of sync. The conclusion to the sync round might arise
+    /// immediately (and be handled immediately), or might come later through
+    /// messaging. In any case the component should be scheduled again
+    /// immediately.
+    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
         sched_ctx.log("Component ending sync mode (now waiting for solution)");
         let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
         self.mode = Mode::SyncEnd;
-        self.handle_sync_decision(sched_ctx, comp_ctx, decision)
+        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
     }

-    fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, decision: SyncRoundDecision) -> Option<CompScheduling> {
+    /// Handles the decision from the consensus round. This will cause a change
+    /// in the internal `Mode`, such that the next call to `run` can take the
+    /// appropriate next steps.
+    fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
         debug_assert_eq!(self.mode, Mode::SyncEnd);
         sched_ctx.log(&format!("Handling sync decision: {:?}", decision));
         let is_success = match decision {
             SyncRoundDecision::None => {
                 // No decision yet
-                return None;
+                return;
             },
             SyncRoundDecision::Solution => true,
             SyncRoundDecision::Failure => false,
         };

         // If here then we've reached a decision
+        self.mode = Mode::NonSync;
         if is_success {
-            self.mode = Mode::NonSync;
             self.consensus.notify_sync_decision(decision);
-            return None;
         } else {
-            todo!("handle this better, show some kind of error");
-            self.handle_component_exit(sched_ctx, comp_ctx);
-            self.mode = Mode::Exit;
-            return Some(CompScheduling::Exit);
+            self.mode = Mode::StartExit;
         }
     }

     fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
         sched_ctx.log("Component exiting");
-        debug_assert_eq!(self.mode, Mode::NonSync); // not a perfect assert, but just to remind myself: cannot exit while in sync
-
-        // Note: for now we have that the scheduler handles exiting. I don't
-        // know if that is a good idea, we'll see
-        self.mode = Mode::Exit;
+        debug_assert_eq!(self.mode, Mode::StartExit);
+        self.mode = Mode::BusyExit;
+
+        // Doing this by index, then retrieving the handle is a bit ridiculous,
+        // but Rust is being Rust with its borrowing rules.
+        for port_index in 0..comp_ctx.num_ports() {
+            let port = comp_ctx.get_port_by_index_mut(port_index);
+            if port.state == PortState::Closed {
+                // Already closed, or in the process of being closed
+                continue;
+            }
+
+            // Mark as closed
+            let port_id = port.self_id;
+            port.state = PortState::Closed;
+
+            // Notify peer of closing
+            let port_handle = comp_ctx.get_port_handle(port_id);
+            let (peer, message) = self.control.initiate_port_closing(port_handle, comp_ctx);
+            let peer_info = comp_ctx.get_peer(peer);
+            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
+        }
     }

     // -------------------------------------------------------------------------
@@ -433,15 +455,14 @@ impl CompPDL {
             ControlMessageContent::Ack => {
                 let mut to_ack = message.id;
                 loop {
-                    let action = self.control.handle_ack(to_ack, sched_ctx, comp_ctx);
+                    let (action, new_to_ack) = self.control.handle_ack(to_ack, sched_ctx, comp_ctx);
                     match action {
-                        AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => {
+                        AckAction::SendMessage(target_comp, message) => {
                             // FIX @NoDirectHandle
                             let mut handle = sched_ctx.runtime.get_component_public(target_comp);
                             handle.send_message(sched_ctx, Message::Control(message), true);
                             let _should_remove = handle.decrement_users();
                             debug_assert!(_should_remove.is_none());
-                            to_ack = new_to_ack;
                         },
                         AckAction::ScheduleComponent(to_schedule) => {
                             // FIX @NoDirectHandle
@@ -454,11 +475,13 @@ impl CompPDL {
                             sched_ctx.runtime.enqueue_work(key);
                             let _should_remove = handle.decrement_users();
                             debug_assert!(_should_remove.is_none());
                             break;
                         },
-                        AckAction::None => {
-                            break;
-                        }
+                        AckAction::None => {}
                     }
+
+                    match new_to_ack {
+                        Some(new_to_ack) => to_ack = new_to_ack,
+                        None => break,
+                    }
                 }
             },
@@ -479,9 +502,9 @@ impl CompPDL {
                 let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id;
                 let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);

-                comp_ctx.set_port_state(port_handle, PortState::Closed);
                 send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle);
                 comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id);
+                comp_ctx.set_port_state(port_handle, PortState::Closed);
             },
             ControlMessageContent::UnblockPort(port_id) => {
                 // We were previously blocked (or already closed)
@@ -636,6 +659,7 @@ impl CompPDL {
                     },
                     None => {
                         // Peer port remains with creator component.
+                        println!("DEBUG: Setting peer for port {:?} of component {:?} to {:?}", created_port_info.self_id, reservation.id(), creator_ctx.id);
                         created_port_info.peer_comp_id = creator_ctx.id;
                         created_ctx.add_peer(pair.created_handle, sched_ctx, creator_ctx.id, None);
                     }
@@ -670,7 +694,8 @@ impl CompPDL {
             // Remove peer if appropriate
             let creator_port_info = creator_ctx.get_port(pair.creator_handle);
             let creator_port_index = creator_ctx.get_port_index(pair.creator_handle);
-            creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_port_info.peer_comp_id);
+            let creator_peer_comp_id = creator_port_info.peer_comp_id;
+            creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_peer_comp_id);
             creator_ctx.remove_port(pair.creator_handle);

             // Transfer any messages
@@ -700,7 +725,7 @@ impl CompPDL {
                 let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id);
                 let peer_port_info = creator_ctx.get_port_mut(peer_port_handle);
                 peer_port_info.peer_comp_id = created_ctx.id;
-                creator_ctx.add_peer(pair.created_handle, sched_ctx, created_ctx.id, None);
+                creator_ctx.add_peer(peer_port_handle, sched_ctx, created_ctx.id, None);
             }
         }
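The `Mode` changes above replace the old `can_run` check with an explicit shutdown state machine: `StartExit` performs the port-closing work once, `BusyExit` sleeps until every `ClosePort` has been `Ack`'d, and only then does `run` report `Exit` to the scheduler. A toy replica of just that scheduling flow (the surrounding runtime types are elided; names mirror the diff):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Mode { NonSync, StartExit, BusyExit, Exit }

#[derive(Debug, PartialEq)]
enum Scheduling { Immediate, Sleep, Exit }

struct Component { mode: Mode, pending_acks: u32 }

impl Component {
    // Mirrors CompPDL::run's new match: StartExit does the port-closing work
    // once, BusyExit sleeps until every ClosePort is Ack'd, and only then
    // does the component report Exit to the scheduler.
    fn run(&mut self) -> Scheduling {
        match self.mode {
            Mode::NonSync => Scheduling::Immediate, // would execute PDL code here
            Mode::StartExit => {
                // close ports, send ClosePort control messages...
                self.mode = Mode::BusyExit;
                Scheduling::Immediate
            }
            Mode::BusyExit => {
                if self.pending_acks > 0 {
                    Scheduling::Sleep // a peer's Ack will reschedule us
                } else {
                    self.mode = Mode::Exit;
                    Scheduling::Exit
                }
            }
            Mode::Exit => Scheduling::Exit,
        }
    }
}

fn main() {
    let mut c = Component { mode: Mode::StartExit, pending_acks: 1 };
    assert_eq!(c.run(), Scheduling::Immediate); // StartExit -> BusyExit
    assert_eq!(c.run(), Scheduling::Sleep);     // waiting for Acks
    c.pending_acks = 0;                          // Ack arrives
    assert_eq!(c.run(), Scheduling::Exit);       // BusyExit -> Exit
}
```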
src/runtime2/component/consensus.rs
@@ -3,7 +3,6 @@ use crate::runtime2::scheduler::*;
 use crate::runtime2::runtime::*;
 use crate::runtime2::communication::*;

 use super::component_pdl::*;
 use super::component_context::*;

 pub struct PortAnnotation {
@@ -556,7 +555,7 @@ impl Consensus {
             });
             handle.send_message(sched_ctx, message, true);
             let _should_remove = handle.decrement_users();
-            debug_assert!(!_should_remove);
+            debug_assert!(_should_remove.is_none());
         }
     }

@@ -565,8 +564,7 @@ impl Consensus {
         let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
         leader_info.send_message(sched_ctx, message, true);
         let should_remove = leader_info.decrement_users();
-        if should_remove {
-            let key = unsafe{ self.highest_id.upgrade() };
+        if let Some(key) = should_remove {
             sched_ctx.runtime.destroy_component(key);
         }
     }
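Both hunks here are call-site updates for the new `decrement_users` signature, which returns `Option<CompKey>` instead of a bool: the final user receives the destruction key directly, so the `unsafe` ID upgrade no longer has to be repeated at every call site. A sketch of that contract, with hypothetical stand-in types rather than the runtime's real `CompId`/`CompKey`:

```rust
use std::sync::atomic::{AtomicU32, Ordering};

// Stand-in for the runtime's CompKey: it should only come into existence
// once the reference count has hit zero.
struct CompKey(u32);

struct Handle<'a> {
    id: u32,
    num_handles: &'a AtomicU32,
}

impl<'a> Handle<'a> {
    // Instead of a bool ("should the caller destroy this?"), the final
    // decrement hands back the destruction key itself.
    fn decrement_users(&mut self) -> Option<CompKey> {
        let old = self.num_handles.fetch_sub(1, Ordering::AcqRel);
        if old == 1 { Some(CompKey(self.id)) } else { None }
    }
}

fn main() {
    let count = AtomicU32::new(2);
    let mut a = Handle { id: 7, num_handles: &count };
    let mut b = Handle { id: 7, num_handles: &count };
    assert!(a.decrement_users().is_none()); // one user remains
    let key = b.decrement_users().expect("last user gets the key");
    // the caller would now run: sched_ctx.runtime.destroy_component(key)
    assert_eq!(key.0, 7);
}
```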
src/runtime2/component/control_layer.rs
@@ -37,9 +37,14 @@ struct ContentPeerChange {
     schedule_entry_id: ControlId,
 }

+struct ControlClosedPort {
+    closed_port: PortId,
+    exit_entry_id: Option<ControlId>,
+}
+
 pub(crate) enum AckAction {
     None,
-    SendMessageAndAck(CompId, ControlMessage, ControlId),
+    SendMessage(CompId, ControlMessage),
     ScheduleComponent(CompId),
 }

@@ -73,18 +78,23 @@ impl ControlLayer {
         return None;
     }

-    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> AckAction {
+    /// Handles an acknowledgement. The returned action must be performed by the
+    /// caller. The optionally returned `ControlId` must be used and passed to
+    /// `handle_ack` again.
+    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> (AckAction, Option<ControlId>) {
         let entry_index = self.get_entry_index_by_id(entry_id).unwrap();
         let entry = &mut self.entries[entry_index];
         debug_assert!(entry.ack_countdown > 0);

         entry.ack_countdown -= 1;
         if entry.ack_countdown != 0 {
-            return AckAction::None;
+            return (AckAction::None, None);
         }

+        let entry = self.entries.remove(entry_index);
+
         // All `Ack`s received, take action based on the kind of entry
-        match &entry.content {
+        match entry.content {
             ControlContent::PeerChange(content) => {
                 // If change of peer is ack'd. Then we are certain we have
                 // rerouted all of the messages, and the sender's port can now
@@ -100,29 +110,36 @@ impl ControlLayer {
                     )
                 };
                 let to_ack = content.schedule_entry_id;
-                self.entries.remove(entry_index);

-                return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack);
+                return (
+                    AckAction::SendMessage(target_comp_id, message_to_send),
+                    Some(to_ack)
+                );
             },
             ControlContent::ScheduleComponent(to_schedule) => {
                 // If all change-of-peers are `Ack`d, then we're ready to
                 // schedule the component!
-                return AckAction::ScheduleComponent(*to_schedule);
+                return (AckAction::ScheduleComponent(to_schedule), None);
             },
             ControlContent::BlockedPort(_) => unreachable!(),
-            ControlContent::ClosedPort(port_id) => {
+            ControlContent::ClosedPort(closed_port) => {
                 // If a closed port is Ack'd, then we remove the reference to
                 // that component.
-                let port_handle = comp_ctx.get_port_handle(*port_id);
+                let port_handle = comp_ctx.get_port_handle(closed_port);
                 let port_info = comp_ctx.get_port(port_handle);
+                let port_peer_comp_id = port_info.peer_comp_id;
                 debug_assert_eq!(port_info.state, PortState::Closed);
-                comp_ctx.remove_peer(sched_ctx, port_handle, port_info.peer_comp_id);
+                comp_ctx.remove_peer(sched_ctx, port_handle, port_peer_comp_id);

-                return AckAction::None;
+                return (AckAction::None, None);
             }
         }
     }

+    pub(crate) fn has_acks_remaining(&self) -> bool {
+        return !self.entries.is_empty();
+    }
+
     // -------------------------------------------------------------------------
     // Port transfer (due to component creation)
     // -------------------------------------------------------------------------
@@ -169,12 +186,8 @@ impl ControlLayer {
         });

         // increment counter on schedule entry
-        for entry in &mut self.entries {
-            if entry.id == schedule_entry_id {
-                entry.ack_countdown += 1;
-                break;
-            }
-        }
+        let entry_index = self.get_entry_index_by_id(schedule_entry_id).unwrap();
+        self.entries[entry_index].ack_countdown += 1;

         return Message::Control(ControlMessage{
             id: entry_id,
@@ -188,36 +201,33 @@ impl ControlLayer {
     // Blocking, unblocking, and closing ports
     // -------------------------------------------------------------------------

-    pub(crate) fn initiate_port_closing(&mut self, port_handle: PortHandle, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> {
-        let port = comp_ctx.get_port_mut(port_handle);
-        let port_id = port.self_id;
-        let peer_port_id = port.peer_port_id;
-        let peer_comp_id = port.peer_comp_id;
-        debug_assert!(port.state == PortState::Open || port.state == PortState::Blocked);
-
-        port.state = PortState::Closed;
-
-        if peer_comp_id == comp_ctx.id {
-            // We own the other end of the channel as well.
-            return None;
-        }
+    /// Initiates the control message procedures for closing a port. Caller must
+    /// make sure that the port state has already been set to `Closed`.
+    pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) {
+        let port = comp_ctx.get_port(port_handle);
+        let peer_port_id = port.peer_port_id;
+        debug_assert!(port.state == PortState::Closed);

         // Construct the port-closing entry
         let entry_id = self.take_id();
         self.entries.push(ControlEntry{
             id: entry_id,
             ack_countdown: 1,
-            content: ControlContent::ClosedPort(port_id),
+            content: ControlContent::ClosedPort(port.self_id),
         });

-        return Some((
-            peer_comp_id,
+        // Return the messages notifying peer of the closed port
+        let peer_handle = comp_ctx.get_peer_handle(port.peer_comp_id);
+        return (
+            peer_handle,
             ControlMessage{
                 id: entry_id,
                 sender_comp_id: comp_ctx.id,
                 target_port_id: Some(peer_port_id),
                 content: ControlMessageContent::ClosePort(peer_port_id),
             }
-        ));
+        );
     }

     /// Adds a control entry to track that a port is blocked. Expects the caller
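The reworked `handle_ack` returns `(AckAction, Option<ControlId>)`, moving the `Ack`-chaining loop out of the control layer and into the caller (see the matching loop in component_pdl.rs above). A pared-down model of that contract, with toy types in place of the real control entries:

```rust
#[derive(Clone, Copy, Debug)]
struct ControlId(u32);

enum AckAction {
    None,
    SendMessage(ControlId), // stand-in payload; really (CompId, ControlMessage)
    ScheduleComponent(u32), // stand-in for CompId
}

struct ControlLayer;

impl ControlLayer {
    // Each completed entry may hand back another entry to Ack; the caller,
    // not the control layer, drives that chain to completion.
    fn handle_ack(&mut self, entry_id: ControlId) -> (AckAction, Option<ControlId>) {
        // Toy behaviour: entry 0 completes a peer change and asks the caller
        // to Ack entry 1 next; entry 1 terminates the chain.
        match entry_id.0 {
            0 => (AckAction::SendMessage(ControlId(99)), Some(ControlId(1))),
            _ => (AckAction::ScheduleComponent(7), None),
        }
    }
}

fn main() {
    let mut control = ControlLayer;
    let mut to_ack = ControlId(0);
    loop {
        let (action, next) = control.handle_ack(to_ack);
        match action {
            AckAction::SendMessage(_msg) => { /* send the control message */ }
            AckAction::ScheduleComponent(_comp) => { /* enqueue the component */ }
            AckAction::None => {}
        }
        match next {
            Some(next_id) => to_ack = next_id, // keep walking the chain
            None => break,
        }
    }
}
```

Separating "what to do" from "what to Ack next" also lets the caller perform an action *and* continue the chain in the same iteration, which the old `SendMessageAndAck` variant had to bundle into one case.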
src/runtime2/runtime.rs
@@ -59,7 +59,8 @@ pub(crate) struct RuntimeComp {
     pub public: CompPublic,
     pub code: CompPDL,
     pub ctx: CompCtx,
-    pub inbox: QueueDynMpsc<Message>
+    pub inbox: QueueDynMpsc<Message>,
+    pub exiting: bool,
 }

 /// Should contain everything that is accessible in a thread-safe manner
@@ -109,9 +110,10 @@ impl CompHandle {
     /// Returns the `CompKey` to the component if it should be destroyed
     pub(crate) fn decrement_users(&mut self) -> Option<CompKey> {
         debug_assert!(!self.decremented, "illegal to 'decrement_users' twice");
-        dbg_code!(self.decremented = true);
         let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel);
         let new_count = old_count - 1;
+        dbg_code!(self.decremented = true);
+        println!(" ****** DEBUG [handle]: Decremented count to {} for {:?}", new_count, self.id);
         if new_count == 0 {
             return Some(unsafe{ self.id.upgrade() });
         }
@@ -248,6 +250,7 @@ impl RuntimeInner {
             code: component,
             ctx: context,
             inbox: inbox_queue,
+            exiting: false,
         };

         let index = self.components.submit(reserved.reservation, component);
@@ -271,6 +274,7 @@ impl RuntimeInner {
             code: comp,
             ctx,
             inbox: inbox_queue,
+            exiting: false,
         };

         let index = self.components.create(comp);
@@ -294,7 +298,11 @@ impl RuntimeInner {
     }

     pub(crate) fn destroy_component(&self, key: CompKey) {
-        debug_assert_eq!(self.get_component(key).public.num_handles.load(Ordering::Acquire), 0);
+        dbg_code!({
+            let component = self.get_component(key);
+            debug_assert!(component.exiting);
+            debug_assert_eq!(component.public.num_handles.load(Ordering::Acquire), 0);
+        });
         self.decrement_active_components();
         self.components.destroy(key.0);
     }
@@ -314,7 +322,7 @@ impl RuntimeInner {
         if new_val == 0 {
             // Just to be sure, in case the last thing that gets destroyed is an
             // API instead of a thread.
-            let lock = self.work_queue.lock();
+            let _lock = self.work_queue.lock();
             self.work_condvar.notify_all();
         }
     }
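`dbg_code!` now wraps the whole precondition block of `destroy_component`. The macro is internal to this codebase and its definition is not shown here; a guess at its shape, assuming it simply compiles its body out of release builds:

```rust
// Assumed shape of a dbg_code!-style helper: run the wrapped statements only
// when debug assertions are enabled. The real macro in this codebase may
// differ.
macro_rules! dbg_code {
    ($code:expr) => {
        if cfg!(debug_assertions) { $code }
    };
}

struct Component { exiting: bool, num_handles: u32 }

fn destroy_component(component: &Component) {
    // Mirrors the new destroy_component checks: a component must have been
    // marked as exiting *and* have dropped its last handle before its slot
    // is reclaimed.
    dbg_code!({
        assert!(component.exiting);
        assert_eq!(component.num_handles, 0);
    });
    // ... the slot would be returned to the component store here ...
}

fn main() {
    destroy_component(&Component { exiting: true, num_handles: 0 });
}
```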
src/runtime2/scheduler.rs
@@ -3,7 +3,6 @@ use std::sync::atomic::Ordering;

 use super::component::*;
 use super::runtime::*;
 use super::communication::*;

 /// Data associated with a scheduler thread
 pub(crate) struct Scheduler {
@@ -74,6 +73,10 @@ impl Scheduler {

     // local utilities

+    /// Marks the component as sleeping. If, after marking itself as sleeping,
+    /// the inbox contains messages, then the component will be immediately
+    /// rescheduled. After calling this function the component should not be
+    /// executed anymore.
     fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
         debug_assert_eq!(key.downgrade(), component.ctx.id); // make sure component matches key
         debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping
@@ -90,30 +93,26 @@ impl Scheduler {
         }
     }

+    /// Marks the component as exiting by removing the reference it holds to
+    /// itself. Afterward the component will enter "normal" sleeping mode (if it
+    /// has not yet been destroyed).
     fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
-        // Send messages that all ports will be closed
-        for port_index in 0..component.ctx.ports.len() {
-            let port_info = &component.ctx.ports[port_index];
-            if let Some((peer_id, message)) = component.code.control.initiate_port_closing(port_info.self_id, &mut component.ctx) {
-                let peer_info = component.ctx.get_peer(peer_id);
-                peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
-            }
-        }
-
-        // Remove all references to the peers that we have
-        for mut peer in component.ctx.peers.drain(..) {
-            let should_remove = peer.handle.decrement_users();
-            if should_remove {
-                let key = unsafe{ peer.id.upgrade() };
-                sched_ctx.runtime.destroy_component(key);
-            }
-        }
-
-        let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
-        let new_count = old_count - 1;
-        if new_count == 0 {
-            let comp_key = unsafe{ component.ctx.id.upgrade() };
-            sched_ctx.runtime.destroy_component(comp_key);
-        }
+        // If we didn't yet decrement our reference count, do so now
+        let comp_key = unsafe{ component.ctx.id.upgrade() };
+
+        if !component.exiting {
+            component.exiting = true;
+
+            let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
+            let new_count = old_count - 1;
+            println!(" ****** DEBUG [ sched]: Decremented count to {} for {:?}", new_count, component.ctx.id);
+            if new_count == 0 {
+                sched_ctx.runtime.destroy_component(comp_key);
+                return;
+            }
+        }
+
+        // Enter "regular" sleeping mode
+        self.mark_component_as_sleeping(comp_key, component);
     }
 }
\ No newline at end of file
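The rewritten `mark_component_as_exiting` makes exiting idempotent: the `exiting` flag ensures the component drops its self-reference exactly once, and destruction happens on whichever side performs the final decrement. A toy model of that logic (stand-in types, single-threaded for brevity):

```rust
use std::sync::atomic::{AtomicU32, Ordering};

// Stand-in component: the count covers the self-reference plus one handle
// per peer that still knows about us.
struct Comp {
    exiting: bool,
    num_handles: AtomicU32,
}

// Returns true when the component would be destroyed immediately.
fn mark_component_as_exiting(comp: &mut Comp) -> bool {
    if !comp.exiting {
        comp.exiting = true;
        // Drop the self-reference exactly once; whoever performs the final
        // decrement is responsible for destroying the component.
        if comp.num_handles.fetch_sub(1, Ordering::AcqRel) == 1 {
            return true; // destroy_component(...) would run here
        }
    }
    // Otherwise go back to sleep; a peer's final decrement destroys us later.
    false
}

fn main() {
    let mut c = Comp { exiting: false, num_handles: AtomicU32::new(2) };
    assert!(!mark_component_as_exiting(&mut c)); // a peer still holds a handle
    assert!(!mark_component_as_exiting(&mut c)); // idempotent: no double decrement
}
```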
src/runtime2/store/component.rs
@@ -129,7 +129,7 @@ impl<T: Sized> ComponentStore<T> {

     pub fn reserve(&self) -> ComponentReservation {
         let lock = self.inner.lock_shared();
-        let (lock, index) = self.pop_freelist_index(lock);
+        let (_lock, index) = self.pop_freelist_index(lock);
         return ComponentReservation::new(index);
     }
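The `lock` to `_lock` renames here and in runtime.rs above are not cosmetic: an underscore-prefixed binding still holds the guard for the rest of the scope, unlike a bare `_` pattern, which drops it immediately. A small demonstration:

```rust
use std::sync::Mutex;

fn notify_under_lock(queue: &Mutex<Vec<u32>>) {
    // `_lock` keeps the guard (and thus the lock) alive until end of scope
    // while silencing the unused-variable lint. Writing `let _ = ...` would
    // drop the guard at once, releasing the lock before the work below runs.
    let _lock = queue.lock().unwrap();
    // ... notify a condition variable, touch shared state, etc. ...
}

fn main() {
    let q = Mutex::new(vec![1, 2, 3]);
    notify_under_lock(&q);
}
```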