Changeset - 5415acc02756
[Not reviewed]
0 7 0
mh - 3 years ago 2022-01-28 18:40:04
contact@maxhenger.nl
WIP: Refactored port/peer management, pending more bugfixes to component shutdown
7 files changed with 173 insertions and 121 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component_context.rs
Show inline comments
 
use crate::runtime2::scheduler::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 

	
 
#[derive(Debug)]
 
pub struct Port {
 
    pub self_id: PortId,
 
    pub peer_comp_id: CompId, // eventually consistent
 
    pub peer_port_id: PortId, // eventually consistent
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    #[cfg(debug_assertions)] pub(crate) associated_with_peer: bool,
 
}
 

	
 
pub struct Peer {
 
    pub id: CompId,
 
    pub num_associated_ports: u32,
 
    pub(crate) handle: CompHandle,
 
}
 

	
 
/// Port and peer management structure. Will keep a local reference counter to
 
/// the ports associate with peers, additionally manages the atomic reference
 
/// counter associated with the peers' component handles.
 
pub struct CompCtx {
 
    pub id: CompId,
 
    ports: Vec<Port>,
 
    peers: Vec<Peer>,
 
    port_id_counter: u32,
 
}
 

	
 
#[derive(Copy, Clone)]
 
pub struct LocalPortHandle(PortId);
 

	
 
#[derive(Copy, Clone)]
 
pub struct LocalPeerHandle(CompId);
 

	
 
impl CompCtx {
 
    /// Creates a new component context based on a reserved entry in the
 
    /// component store. This reservation is used such that we already know our
 
    /// assigned ID.
 
    pub(crate) fn new(reservation: &CompReserved) -> Self {
 
        return Self{
 
            id: reservation.id(),
 
            ports: Vec::new(),
 
            peers: Vec::new(),
 
            port_id_counter: 0,
 
        }
 
    }
 

	
 
    /// Creates a new channel that is fully owned by the component associated
 
    /// with this context.
 
    pub(crate) fn create_channel(&mut self) -> Channel {
 
        let putter_id = PortId(self.take_port_id());
 
        let getter_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id: putter_id,
 
            peer_port_id: getter_id,
 
            kind: PortKind::Putter,
 
            state: PortState::Open,
 
            peer_comp_id: self.id,
 
            associated_with_peer: false,
 
        });
 
        self.ports.push(Port{
 
            self_id: getter_id,
 
            peer_port_id: putter_id,
 
            kind: PortKind::Getter,
 
            state: PortState::Open,
 
            peer_comp_id: self.id,
 
            associated_with_peer: false,
 
        });
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
    /// Adds a new port. Make sure to call `add_peer` afterwards.
 
    pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle {
 
        let self_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id, peer_comp_id, peer_port_id, kind, state,
 
            #[cfg(debug_assertions)] associated_with_peer: false,
 
        });
 
        return LocalPortHandle(self_id);
 
    }
 

	
 
    /// Removes a port. Make sure you called `remove_peer` first.
 
    pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port {
 
        let port_index = self.must_get_port_index(port_handle);
 
        let port = self.ports.remove(port_index);
 
        debug_assert!(!port.associated_with_peer);
 
        return port;
 
    }
 

	
 
    /// Adds a new peer. This must be called for every port, no matter the
 
    /// component the channel is connected to. If a `CompHandle` is supplied,
 
    /// then it will be used to add the peer. Otherwise it will be retrieved
 
    /// from the runtime using its ID.
 
    pub(crate) fn add_peer(&mut self, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, peer_comp_id: CompId, handle: Option<&CompHandle>) {
 
        let self_id = self.id;
 
        let port = self.get_port_mut(port_handle);
 
        debug_assert_eq!(port.peer_comp_id, peer_comp_id);
 
        debug_assert!(!port.associated_with_peer);
 
        if !self.requires_peer_reference(port) {
 
        if !Self::requires_peer_reference(port, self_id) {
 
            return;
 
        }
 

	
 
        dbg_code!(port.associated_with_peer = true);
 
        match self.get_peer_index_by_id(peer_comp_id) {
 
            Some(peer_index) => {
 
                let peer = &mut self.peers[peer_index];
 
                peer.num_associated_ports += 1;
 
            },
 
            None => {
 
                let handle = match handle {
 
                    Some(handle) => handle.clone(),
 
                    None => sched_ctx.runtime.get_component_public(peer_comp_id)
 
                };
 
                self.peers.push(Peer{
 
                    id: peer_comp_id,
 
                    num_associated_ports: 1,
 
                    handle,
 
                });
 
            }
 
        }
 
    }
 

	
 
    /// Removes a peer associated with a port.
 
    pub(crate) fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, peer_id: CompId) {
 
        let self_id = self.id;
 
        let port = self.get_port_mut(port_handle);
 
        debug_assert_eq!(port.peer_comp_id, peer_id);
 
        if !self.requires_peer_reference(port) {
 
        if !Self::requires_peer_reference(port, self_id) {
 
            return;
 
        }
 

	
 
        debug_assert!(port.associated_with_peer);
 
        dbg_code!(port.associated_with_peer = false);
 
        let peer_index = self.get_peer_index_by_id(peer_id).unwrap();
 
        let peer = &mut self.peers[peer_index];
 
        peer.num_associated_ports -= 1;
 
        println!(" ****** DEBUG: Removed peer {:?} from {:?}, now at {}", peer.id, self_id, peer.num_associated_ports);
 
        if peer.num_associated_ports == 0 {
 
            let mut peer = self.peers.remove(peer_index);
 
            if let Some(key) = peer.handle.decrement_users() {
 
                debug_assert_ne!(key.downgrade(), self.id); // should be upheld by the code that shuts down a component
 
                sched_ctx.runtime.destroy_component(key);
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn set_port_state(&mut self, port_handle: LocalPortHandle, new_state: PortState) {
 
        let port_info = self.get_port_mut(port_handle);
 
        debug_assert_ne!(port_info.state, PortState::Closed); // because then we do not expect to change the state
 
        port_info.state = new_state;
 
    }
 

	
 
    pub(crate) fn get_port_handle(&self, port_id: PortId) -> LocalPortHandle {
 
        return LocalPortHandle(port_id);
 
    }
 

	
 
    // should perhaps be revised, used in main inbox
 
    pub(crate) fn get_port_index(&self, port_handle: LocalPortHandle) -> usize {
 
        return self.must_get_port_index(port_handle);
 
    }
 

	
 
    pub(crate) fn get_peer_handle(&self, peer_id: CompId) -> LocalPeerHandle {
 
        return LocalPeerHandle(peer_id);
 
    }
 

	
 
    pub(crate) fn get_port(&self, port_handle: LocalPortHandle) -> &Port {
 
        let index = self.must_get_port_index(port_handle);
 
        return &self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_mut(&mut self, port_handle: LocalPortHandle) -> &mut Port {
 
        let index = self.must_get_port_index(port_handle);
 
        return &mut self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_by_index_mut(&mut self, index: usize) -> &mut Port {
 
        return &mut self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_peer(&self, peer_handle: LocalPeerHandle) -> &Peer {
 
        let index = self.must_get_peer_index(peer_handle);
 
        return &self.peers[index];
 
    }
 

	
 
    pub(crate) fn get_peer_mut(&mut self, peer_handle: LocalPeerHandle) -> &mut Peer {
 
        let index = self.must_get_peer_index(peer_handle);
 
        return &mut self.peers[index];
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn iter_ports(&self) -> impl Iterator<Item=&Port> {
 
        return self.ports.iter();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn iter_ports_mut(&mut self) -> impl Iterator<Item=&mut Port> {
 
        return self.ports.iter_mut();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn iter_peers(&self) -> impl Iterator<Item=&Peer> {
 
        return self.peers.iter();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn num_ports(&self) -> usize {
 
        return self.ports.len();
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Local utilities
 
    // -------------------------------------------------------------------------
 

	
 
    #[inline]
 
    fn requires_peer_reference(&self, port: &Port) -> bool {
 
        return port.state == PortState::Closed;
 
    fn requires_peer_reference(port: &Port, self_id: CompId) -> bool {
 
        return port.state != PortState::Closed && port.peer_comp_id != self_id;
 
    }
 

	
 
    fn must_get_port_index(&self, handle: LocalPortHandle) -> usize {
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.self_id == handle.0 {
 
                return index;
 
            }
 
        }
 

	
 
        unreachable!()
 
    }
 

	
 
    fn must_get_peer_index(&self, handle: LocalPeerHandle) -> usize {
 
        for (index, peer) in self.peers.iter().enumerate() {
 
            if peer.id == handle.0 {
 
                return index;
 
            }
 
        }
 

	
 
        unreachable!()
 
    }
 

	
 
    fn get_peer_index_by_id(&self, comp_id: CompId) -> Option<usize> {
 
        for (index, peer) in self.peers.iter().enumerate() {
 
            if peer.id == comp_id {
 
                return Some(index);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn take_port_id(&mut self) -> u32 {
 
        let port_id = self.port_id_counter;
 
        self.port_id_counter = self.port_id_counter.wrapping_add(1);
 
        return port_id;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/component_pdl.rs
Show inline comments
 
use crate::protocol::*;
 
use crate::protocol::ast::DefinitionId;
 
use crate::protocol::eval::{
 
    PortId as EvalPortId, Prompt,
 
    ValueGroup, Value,
 
    EvalContinuation, EvalResult, EvalError
 
};
 

	
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::scheduler::SchedulerCtx;
 
use crate::runtime2::communication::*;
 

	
 
use super::*;
 
use super::component_context::*;
 
use super::control_layer::*;
 
use super::consensus::Consensus;
 

	
 
pub enum CompScheduling {
 
    Immediate,
 
    Requeue,
 
    Sleep,
 
    Exit,
 
}
 

	
 
pub enum ExecStmt {
 
    CreatedChannel((Value, Value)),
 
    PerformedPut,
 
    PerformedGet(ValueGroup),
 
    None,
 
}
 

	
 
impl ExecStmt {
 
    fn take(&mut self) -> ExecStmt {
 
        let mut value = ExecStmt::None;
 
        std::mem::swap(self, &mut value);
 
        return value;
 
    }
 

	
 
    fn is_none(&self) -> bool {
 
        match self {
 
            ExecStmt::None => return true,
 
            _ => return false,
 
        }
 
    }
 
}
 

	
 
pub struct ExecCtx {
 
    stmt: ExecStmt,
 
}
 

	
 
impl RunContext for ExecCtx {
 
    fn performed_put(&mut self, _port: EvalPortId) -> bool {
 
        match self.stmt.take() {
 
            ExecStmt::None => return false,
 
            ExecStmt::PerformedPut => return true,
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn performed_get(&mut self, _port: EvalPortId) -> Option<ValueGroup> {
 
        match self.stmt.take() {
 
            ExecStmt::None => return None,
 
            ExecStmt::PerformedGet(value) => return Some(value),
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn fires(&mut self, _port: EvalPortId) -> Option<Value> {
 
        todo!("remove fires")
 
    }
 

	
 
    fn performed_fork(&mut self) -> Option<bool> {
 
        todo!("remove fork")
 
    }
 

	
 
    fn created_channel(&mut self) -> Option<(Value, Value)> {
 
        match self.stmt.take() {
 
            ExecStmt::None => return None,
 
            ExecStmt::CreatedChannel(ports) => return Some(ports),
 
            _ => unreachable!(),
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) enum Mode {
 
    NonSync, // not in sync mode
 
    Sync, // in sync mode, can interact with other components
 
    SyncFail, // something went wrong during sync mode (deadlocked, error, whatever)
 
    SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
 
    BlockedGet,
 
    BlockedPut,
 
    StartExit, // temp state
 
    Exit,
 
}
 

	
 
impl Mode {
 
    fn can_run(&self) -> bool {
 
        match self {
 
            Mode::NonSync | Mode::Sync =>
 
                return true,
 
            Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut | Mode::StartExit | Mode::Exit =>
 
                return false,
 
        }
 
    }
 
    StartExit, // temporary state: if encountered then we start the shutdown process
 
    BusyExit, // temporary state: waiting for Acks for all the closed ports
 
    Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0
 
}
 

	
 
pub(crate) struct CompPDL {
 
    pub mode: Mode,
 
    pub mode_port: PortId, // when blocked on a port
 
    pub mode_value: ValueGroup, // when blocked on a put
 
    pub prompt: Prompt,
 
    pub control: ControlLayer,
 
    pub consensus: Consensus,
 
    pub sync_counter: u32,
 
    pub exec_ctx: ExecCtx,
 
    // TODO: Temporary field, simulates future plans of having one storage place
 
    //  reserved per port.
 
    // Should be same length as the number of ports. Corresponding indices imply
 
    // message is intended for that port.
 
    pub inbox_main: Vec<Option<DataMessage>>,
 
    pub inbox_backup: Vec<DataMessage>,
 
}
 

	
 
impl CompPDL {
 
    pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self {
 
        let mut inbox_main = Vec::new();
 
        inbox_main.reserve(num_ports);
 
        for _ in 0..num_ports {
 
            inbox_main.push(None);
 
        }
 

	
 
        return Self{
 
            mode: Mode::NonSync,
 
            mode_port: PortId::new_invalid(),
 
            mode_value: ValueGroup::default(),
 
            prompt: initial_state,
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
            sync_counter: 0,
 
            exec_ctx: ExecCtx{
 
                stmt: ExecStmt::None,
 
            },
 
            inbox_main,
 
            inbox_backup: Vec::new(),
 
        }
 
    }
 

	
 
    pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
 
        sched_ctx.log(&format!("handling message: {:#?}", message));
 
        if let Some(new_target) = self.control.should_reroute(&mut message) {
 
            let mut target = sched_ctx.runtime.get_component_public(new_target);
 
            target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            let _should_remove = target.decrement_users();
 
            debug_assert!(_should_remove.is_none());
 
            return;
 
        }
 

	
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
                self.handle_incoming_control_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Sync(message) => {
 
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
 
            }
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Running component and handling changes in global component state
 
    // -------------------------------------------------------------------------
 

	
 
    pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        use EvalContinuation as EC;
 

	
 
        if self.mode == Mode::StartExit {
 
            self.mode = Mode::Exit;
 
            return Ok(CompScheduling::Exit);
 
        }
 
        sched_ctx.log(&format!("Running component (mode: {:?})", self.mode));
 

	
 
        let can_run = self.mode.can_run();
 
        sched_ctx.log(&format!("Running component (mode: {:?}, can run: {})", self.mode, can_run));
 
        if !can_run {
 
            return Ok(CompScheduling::Sleep);
 
        // Depending on the mode don't do anything at all, take some special
 
        // actions, or fall through and run the PDL code.
 
        match self.mode {
 
            Mode::NonSync | Mode::Sync => {},
 
            Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut => {
 
                return Ok(CompScheduling::Sleep);
 
            }
 
            Mode::StartExit => {
 
                self.handle_component_exit(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            Mode::BusyExit => {
 
                if self.control.has_acks_remaining() {
 
                    return Ok(CompScheduling::Sleep);
 
                } else {
 
                    self.mode = Mode::Exit;
 
                    return Ok(CompScheduling::Exit);
 
                }
 
            },
 
            Mode::Exit => {
 
                return Ok(CompScheduling::Exit);
 
            }
 
        }
 

	
 
        let run_result = self.execute_prompt(&sched_ctx)?;
 

	
 
        match run_result {
 
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
 
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                let scheduling = self.handle_sync_end(sched_ctx, comp_ctx);
 
                return Ok(scheduling.unwrap_or(CompScheduling::Immediate));
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::BlockGet(port_id) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 

	
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_index = comp_ctx.get_port_index(port_handle);
 
                if let Some(message) = &self.inbox_main[port_index] {
 
                    // Check if we can actually receive the message
 
                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
                        // Message was received. Make sure any blocked peers and
 
                        // pending messages are handled.
 
                        let message = self.inbox_main[port_index].take().unwrap();
 
                        self.handle_received_data_message(sched_ctx, comp_ctx, port_handle);
 

	
 
                        self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
 
                        return Ok(CompScheduling::Immediate);
 
                    } else {
 
                        self.mode = Mode::SyncFail;
 
                        todo!("handle sync failure due to message deadlock");
 
                        return Ok(CompScheduling::Sleep);
 
                    }
 
                } else {
 
                    // We need to wait
 
                    self.mode = Mode::BlockedGet;
 
                    self.mode_port = port_id;
 
                    return Ok(CompScheduling::Sleep);
 
                }
 
            },
 
            EC::Put(port_id, value) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                if port_info.state == PortState::Blocked {
 
                    todo!("handle blocked port");
 
                }
 
                self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value);
 
                self.exec_ctx.stmt = ExecStmt::PerformedPut;
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            // Results that can be returned outside of sync mode
 
            EC::ComponentTerminated => {
 
                self.handle_component_exit(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Exit);
 
                self.mode = Mode::StartExit; // next call we'll take care of the exit
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::SyncBlockStart => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                self.handle_sync_start(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                self.create_component_and_transfer_ports(
 
                    sched_ctx, comp_ctx,
 
                    definition_id, monomorph_idx, arguments
 
                );
 
                return Ok(CompScheduling::Requeue);
 
            },
 
            EC::NewChannel => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 
                let channel = comp_ctx.create_channel();
 
                self.exec_ctx.stmt = ExecStmt::CreatedChannel((
 
                    Value::Output(port_id_to_eval(channel.putter_id)),
 
                    Value::Input(port_id_to_eval(channel.getter_id))
 
                ));
 
                self.inbox_main.push(None);
 
                self.inbox_main.push(None);
 
                return Ok(CompScheduling::Immediate);
 
            }
 
        }
 
    }
 

	
 
    fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult {
 
        let mut step_result = EvalContinuation::Stepping;
 
        while let EvalContinuation::Stepping = step_result {
 
            step_result = self.prompt.step(
 
                &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
 
                &sched_ctx.runtime.protocol.modules, &mut self.exec_ctx,
 
            )?;
 
        }
 

	
 
        return Ok(step_result)
 
    }
 

	
 
    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component starting sync mode");
 
        self.consensus.notify_sync_start(comp_ctx);
 
        debug_assert_eq!(self.mode, Mode::NonSync);
 
        self.mode = Mode::Sync;
 
    }
 

	
 
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Option<CompScheduling> {
 
    /// Handles end of sync. The conclusion to the sync round might arise
 
    /// immediately (and be handled immediately), or might come later through
 
    /// messaging. In any case the component should be scheduled again
 
    /// immediately
 
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component ending sync mode (now waiting for solution)");
 
        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
        self.mode = Mode::SyncEnd;
 
        self.handle_sync_decision(sched_ctx, comp_ctx, decision)
 
        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
    }
 

	
 
    fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, decision: SyncRoundDecision) -> Option<CompScheduling> {
 
    /// Handles decision from the consensus round. This will cause a change in
 
    /// the internal `Mode`, such that the next call to `run` can take the
 
    /// appropriate next steps.
 
    fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
 
        debug_assert_eq!(self.mode, Mode::SyncEnd);
 
        sched_ctx.log(&format!("Handling sync decision: {:?}", decision));
 
        let is_success = match decision {
 
            SyncRoundDecision::None => {
 
                // No decision yet
 
                return None;
 
                return;
 
            },
 
            SyncRoundDecision::Solution => true,
 
            SyncRoundDecision::Failure => false,
 
        };
 

	
 
        // If here then we've reached a decision
 
        self.mode = Mode::NonSync;
 
        if is_success {
 
            self.mode = Mode::NonSync;
 
            self.consensus.notify_sync_decision(decision);
 
            return None;
 
        } else {
 
            todo!("handle this better, show some kind of error");
 
            self.handle_component_exit(sched_ctx, comp_ctx);
 
            self.mode = Mode::Exit;
 
            return Some(CompScheduling::Exit);
 
            self.mode = Mode::StartExit;
 
        }
 
    }
 

	
 
    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component exiting");
 
        debug_assert_eq!(self.mode, Mode::NonSync); // not a perfect assert, but just to remind myself: cannot exit while in sync
 
        debug_assert_eq!(self.mode, Mode::StartExit);
 
        self.mode = Mode::BusyExit;
 

	
 
        // Doing this by index, then retrieving the handle is a bit rediculous,
 
        // but Rust is being Rust with its borrowing rules.
 
        for port_index in 0..comp_ctx.num_ports() {
 
            let port = comp_ctx.get_port_by_index_mut(port_index);
 
            if port.state == PortState::Closed {
 
                // Already closed, or in the process of being closed
 
                continue;
 
            }
 

	
 
            // Mark as closed
 
            let port_id = port.self_id;
 
            port.state = PortState::Closed;
 

	
 
        // Note: for now we have that the scheduler handles exiting. I don't
 
        // know if that is a good idea, we'll see
 
        self.mode = Mode::Exit;
 
            // Notify peer of closing
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            let (peer, message) = self.control.initiate_port_closing(port_handle, comp_ctx);
 
            let peer_info = comp_ctx.get_peer(peer);
 
            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling messages
 
    // -------------------------------------------------------------------------
 

	
 
    fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_handle: LocalPortHandle, value: ValueGroup) {
 
        let port_info = comp_ctx.get_port(source_port_handle);
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value);
 
        peer_info.handle.send_message(sched_ctx, Message::Data(annotated_message), true);
 
    }
 

	
 
    /// Handles a message that came in through the public inbox. This function
 
    /// will handle putting it in the correct place, and potentially blocking
 
    /// the port in case too many messages are being received.
 
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        // Check if we can insert it directly into the storage associated with
 
        // the port
 
        let target_port_id = message.data_header.target_port;
 
        let port_handle = comp_ctx.get_port_handle(target_port_id);
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
        if self.inbox_main[port_index].is_none() {
 
            self.inbox_main[port_index] = Some(message);
 

	
 
            // After direct insertion, check if this component's execution is 
 
            // blocked on receiving a message on that port
 
            debug_assert_ne!(comp_ctx.get_port(port_handle).state, PortState::Blocked); // because we could insert directly
 
            if self.mode == Mode::BlockedGet && self.mode_port == target_port_id {
 
                // We were indeed blocked
 
                self.mode = Mode::Sync;
 
                self.mode_port = PortId::new_invalid();
 
            }
 
            
 
            return;
 
        }
 

	
 
        // The direct inbox is full, so the port will become (or was already) blocked
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
 

	
 
        if port_info.state == PortState::Open {
 
            comp_ctx.set_port_state(port_handle, PortState::Blocked);
 
            let (peer_handle, message) =
 
                self.control.initiate_port_blocking(comp_ctx, port_handle);
 

	
 
            let peer = comp_ctx.get_peer(peer_handle);
 
            peer.handle.send_message(sched_ctx, Message::Control(message), true);
 
        }
 

	
 
        // But we still need to remember the message, so:
 
        self.inbox_backup.push(message);
 
    }
 

	
 
    /// Handles when a message has been handed off from the inbox to the PDL
 
    /// code. We check to see if there are more messages waiting and, if not,
 
    /// then we handle the case where the port might have been blocked
 
    /// previously.
 
    fn handle_received_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) {
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
        debug_assert!(self.inbox_main[port_index].is_none()); // this function should be called after the message is taken out
 

	
 
        // Check for any more messages
 
        let port_info = comp_ctx.get_port(port_handle);
 
        for message_index in 0..self.inbox_backup.len() {
 
            let message = &self.inbox_backup[message_index];
 
            if message.data_header.target_port == port_info.self_id {
 
                // One more message for this port
 
                let message = self.inbox_backup.remove(message_index);
 
                debug_assert_eq!(comp_ctx.get_port(port_handle).state, PortState::Blocked); // since we had >1 message on the port
 
                self.inbox_main[port_index] = Some(message);
 

	
 
                return;
 
            }
 
        }
 

	
 
        // Did not have any more messages. So if we were blocked, then we need
 
        // to send the "unblock" message.
 
        if port_info.state == PortState::Blocked {
 
            comp_ctx.set_port_state(port_handle, PortState::Open);
 
            let (peer_handle, message) = self.control.cancel_port_blocking(comp_ctx, port_handle);
 
            let peer_info = comp_ctx.get_peer(peer_handle);
 
            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
 
        }
 
    }
 

	
 
    fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) {
 
        // Little local utility to send an Ack
 
        fn send_control_ack_message(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, causer_id: ControlId, peer_handle: LocalPeerHandle) {
 
            let peer_info = comp_ctx.get_peer(peer_handle);
 
            peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{
 
                id: causer_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: None,
 
                content: ControlMessageContent::Ack,
 
            }), true);
 
        }
 

	
 
        // Handle the content of the control message, and optionally Ack it
 
        match message.content {
 
            ControlMessageContent::Ack => {
 
                let mut to_ack = message.id;
 
                loop {
 
                    let action = self.control.handle_ack(to_ack, sched_ctx, comp_ctx);
 
                    let (action, new_to_ack) = self.control.handle_ack(to_ack, sched_ctx, comp_ctx);
 
                    match action {
 
                        AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => {
 
                        AckAction::SendMessage(target_comp, message) => {
 
                            // FIX @NoDirectHandle
 
                            let mut handle = sched_ctx.runtime.get_component_public(target_comp);
 
                            handle.send_message(sched_ctx, Message::Control(message), true);
 
                            let _should_remove = handle.decrement_users();
 
                            debug_assert!(_should_remove.is_none());
 
                            to_ack = new_to_ack;
 
                        },
 
                        AckAction::ScheduleComponent(to_schedule) => {
 
                            // FIX @NoDirectHandle
 
                            let mut handle = sched_ctx.runtime.get_component_public(to_schedule);
 

	
 
                            // Note that the component is intentionally not
 
                            // sleeping, so we just wake it up
 
                            debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire));
 
                            let key = unsafe{ to_schedule.upgrade() };
 
                            sched_ctx.runtime.enqueue_work(key);
 
                            let _should_remove = handle.decrement_users();
 
                            debug_assert!(_should_remove.is_none());
 
                            break;
 
                        },
 
                        AckAction::None => {
 
                            break;
 
                        }
 
                        AckAction::None => {}
 
                    }
 

	
 
                    match new_to_ack {
 
                        Some(new_to_ack) => to_ack = new_to_ack,
 
                        None => break,
 
                    }
 
                }
 
            },
 
            ControlMessageContent::BlockPort(port_id) => {
 
                // On of our messages was accepted, but the port should be
 
                // blocked.
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                debug_assert_eq!(port_info.kind, PortKind::Putter);
 
                if port_info.state != PortState::Closed {
 
                    comp_ctx.set_port_state(port_handle, PortState::Blocked);
 
                }
 
            },
 
            ControlMessageContent::ClosePort(port_id) => {
 
                // Request to close the port. We immediately comply and remove
 
                // the component handle as well
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id;
 
                let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
                comp_ctx.set_port_state(port_handle, PortState::Closed);
 
                send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle);
 
                comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id);
 
                comp_ctx.set_port_state(port_handle, PortState::Closed);
 
            },
 
            ControlMessageContent::UnblockPort(port_id) => {
 
                // We were previously blocked (or already closed)
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                debug_assert_eq!(port_info.kind, PortKind::Putter);
 
                debug_assert!(port_info.state == PortState::Blocked || port_info.state == PortState::Closed);
 
                if port_info.state == PortState::Blocked {
 
                    self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle);
 
                }
 
            },
 
            ControlMessageContent::PortPeerChangedBlock(port_id) => {
 
                // The peer of our port has just changed. So we are asked to
 
                // temporarily block the port (while our original recipient is
 
                // potentially rerouting some of the in-flight messages) and
 
                // Ack. Then we wait for the `unblock` call.
 
                debug_assert_eq!(message.target_port_id, Some(port_id));
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                comp_ctx.set_port_state(port_handle, PortState::Blocked);
 

	
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 

	
 
                send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle);
 
            },
 
            ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
 
                let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap());
 
                let port_info = comp_ctx.get_port(port_handle);
 
                debug_assert!(port_info.state == PortState::Blocked);
 
                let old_peer_id = port_info.peer_comp_id;
 

	
 
                comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id);
 

	
 
                let port_info = comp_ctx.get_port_mut(port_handle);
 
                port_info.peer_comp_id = new_comp_id;
 
                port_info.peer_port_id = new_port_id;
 
                comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None);
 
                self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle);
 
            }
 
        }
 
    }
 

	
 
    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
 
        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
        debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncEnd);
 
        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
        if self.mode == Mode::Exit {
 
            // TODO: Bit hacky, move this around
 
            self.mode = Mode::StartExit;
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling ports
 
    // -------------------------------------------------------------------------
 

	
 
    /// Unblocks a port, potentially continuing execution of the component, in
 
    /// response to a message that told us to unblock a previously blocked
 
    fn handle_unblock_port_instruction(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) {
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        let port_id = port_info.self_id;
 
        debug_assert_eq!(port_info.state, PortState::Blocked);
 
        port_info.state = PortState::Open;
 

	
 
        if self.mode == Mode::BlockedPut && port_id == self.mode_port {
 
            // We were blocked on the port that just became unblocked, so
 
            // send the message.
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            let mut replacement = ValueGroup::default();
 
            std::mem::swap(&mut replacement, &mut self.mode_value);
 
            self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, replacement);
 

	
 
            self.mode = Mode::Sync;
 
            self.mode_port = PortId::new_invalid();
 
        }
 
    }
 

	
 
    fn create_component_and_transfer_ports(
 
        &mut self,
 
        sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx,
 
        definition_id: DefinitionId, monomorph_index: i32, mut arguments: ValueGroup
 
    ) {
 
        struct PortPair{
 
            creator_handle: LocalPortHandle,
 
            creator_id: PortId,
 
            created_handle: LocalPortHandle,
 
            created_id: PortId,
 
        }
 
        let mut port_id_pairs = Vec::new();
 

	
 
        let reservation = sched_ctx.runtime.start_create_pdl_component();
 
        let mut created_ctx = CompCtx::new(&reservation);
 

	
 
        // Take all the ports ID that are in the `args` (and currently belong to
 
        // the creator component) and translate them into new IDs that are
 
        // associated with the component we're about to create
 
        let mut arg_iter = ValueGroupIter::new(&mut arguments);
 
        while let Some(port_reference) = arg_iter.next() {
 
            // Create port entry for new component
 
            let creator_port_id = port_reference.id;
 
            let creator_port_handle = creator_ctx.get_port_handle(creator_port_id);
 
            let creator_port = creator_ctx.get_port(creator_port_handle);
 
            let created_port_handle = created_ctx.add_port(
 
                creator_port.peer_comp_id, creator_port.peer_port_id,
 
                creator_port.kind, creator_port.state
 
            );
 
            let created_port = created_ctx.get_port(created_port_handle);
 
            let created_port_id = created_port.self_id;
 

	
 
            port_id_pairs.push(PortPair{
 
                creator_handle: creator_port_handle,
 
                creator_id: creator_port_id,
 
                created_handle: created_port_handle,
 
                created_id: created_port_id,
 
            });
 

	
 
            // Modify value in arguments (bit dirty, but double vec in ValueGroup causes lifetime issues)
 
            let arg_value = if let Some(heap_pos) = port_reference.heap_pos {
 
                &mut arg_iter.group.regions[heap_pos][port_reference.index]
 
            } else {
 
                &mut arg_iter.group.values[port_reference.index]
 
            };
 
            match arg_value {
 
                Value::Input(id) => *id = port_id_to_eval(created_port_id),
 
                Value::Output(id) => *id = port_id_to_eval(created_port_id),
 
                _ => unreachable!(),
 
            }
 
        }
 

	
 
        // For each transferred port pair set their peer components to the
 
        // correct values. This will only change the values for the ports of
 
        // the new component.
 
        let mut created_component_has_remote_peers = false;
 

	
 
        for pair in port_id_pairs.iter() {
 
            let creator_port_info = creator_ctx.get_port(pair.creator_handle);
 
            let created_port_info = created_ctx.get_port_mut(pair.created_handle);
 

	
 
            if created_port_info.peer_comp_id == creator_ctx.id {
 
                // Port peer is owned by the creator as well
 
                let created_peer_port_index = port_id_pairs
 
                    .iter()
 
                    .position(|v| v.creator_id == creator_port_info.peer_port_id);
 
                match created_peer_port_index {
 
                    Some(created_peer_port_index) => {
 
                        // Peer port moved to the new component as well. So
 
                        // adjust IDs appropriately.
 
                        let peer_pair = &port_id_pairs[created_peer_port_index];
 
                        created_port_info.peer_port_id = peer_pair.created_id;
 
                        created_port_info.peer_comp_id = reservation.id();
 
                        todo!("either add 'self peer', or remove that idea from Ctx altogether")
 
                    },
 
                    None => {
 
                        // Peer port remains with creator component.
 
                        println!("DEBUG: Setting peer for port {:?} of component {:?} to {:?}", created_port_info.self_id, reservation.id(), creator_ctx.id);
 
                        created_port_info.peer_comp_id = creator_ctx.id;
 
                        created_ctx.add_peer(pair.created_handle, sched_ctx, creator_ctx.id, None);
 
                    }
 
                }
 
            } else {
 
                // Peer is a different component
 
                let peer_handle = creator_ctx.get_peer_handle(created_port_info.peer_comp_id);
 
                let peer_info = creator_ctx.get_peer(peer_handle);
 
                created_ctx.add_peer(pair.created_handle, sched_ctx, peer_info.id, Some(&peer_info.handle));
 
                created_component_has_remote_peers = true;
 
            }
 
        }
 

	
 
        // We'll now actually turn our reservation for a new component into an
 
        // actual component. Note that we initialize it as "not sleeping" as
 
        // its initial scheduling might be performed based on `Ack`s in response
 
        // to message exchanges between remote peers.
 
        let prompt = Prompt::new(
 
            &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
 
            definition_id, monomorph_index, arguments,
 
        );
 
        let component = CompPDL::new(prompt, port_id_pairs.len());
 
        let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component(
 
            reservation, component, created_ctx, false,
 
        );
 
        let created_ctx = &component.ctx;
 

	
 
        // Now modify the creator's ports: remove every transferred port and
 
        // potentially remove the peer component. Here is also where we will
 
        // transfer messages in the main inbox.
 
        for pair in port_id_pairs.iter() {
 
            // Remove peer if appropriate
 
            let creator_port_info = creator_ctx.get_port(pair.creator_handle);
 
            let creator_port_index = creator_ctx.get_port_index(pair.creator_handle);
 
            creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_port_info.peer_comp_id);
 
            let creator_peer_comp_id = creator_port_info.peer_comp_id;
 
            creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_peer_comp_id);
 
            creator_ctx.remove_port(pair.creator_handle);
 

	
 
            // Transfer any messages
 
            let created_port_index = created_ctx.get_port_index(pair.created_handle);
 
            let created_port_info = created_ctx.get_port(pair.created_handle);
 
            debug_assert!(component.code.inbox_main[created_port_index].is_none());
 
            if let Some(mut message) = self.inbox_main.remove(creator_port_index) {
 
                message.data_header.target_port = pair.created_id;
 
                component.code.inbox_main[created_port_index] = Some(message);
 
            }
 

	
 
            let mut message_index = 0;
 
            while message_index < self.inbox_backup.len() {
 
                let message = &self.inbox_backup[message_index];
 
                if message.data_header.target_port == pair.creator_id {
 
                    // transfer message
 
                    let mut message = self.inbox_backup.remove(message_index);
 
                    message.data_header.target_port = pair.created_id;
 
                    component.code.inbox_backup.push(message);
 
                } else {
 
                    message_index += 1;
 
                }
 
            }
 

	
 
            // Handle potential channel between creator and created component
 
            if created_port_info.peer_comp_id == creator_ctx.id {
 
                let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id);
 
                let peer_port_info = creator_ctx.get_port_mut(peer_port_handle);
 
                peer_port_info.peer_comp_id = created_ctx.id;
 
                creator_ctx.add_peer(pair.created_handle, sched_ctx, created_ctx.id, None);
 
                creator_ctx.add_peer(peer_port_handle, sched_ctx, created_ctx.id, None);
 
            }
 
        }
 

	
 
        // By now all ports have been transferred. We'll now do any of the setup
 
        // for rerouting/messaging
 
        if created_component_has_remote_peers {
 
            let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
 
            for pair in port_id_pairs.iter() {
 
                let port_info = created_ctx.get_port(pair.created_handle);
 
                if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id {
 
                    let message = self.control.add_reroute_entry(
 
                        creator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id,
 
                        pair.creator_id, pair.created_id, created_ctx.id,
 
                        schedule_entry_id
 
                    );
 
                    let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id);
 
                    let peer_info = created_ctx.get_peer(peer_handle);
 
                    peer_info.handle.send_message(sched_ctx, message, true);
 
                }
 
            }
 
        } else {
 
            // Peer can be scheduled immediately
 
            sched_ctx.runtime.enqueue_work(created_key);
 
        }
 
    }
 
}
 

	
 
#[inline]
 
fn port_id_from_eval(port_id: EvalPortId) -> PortId {
 
    return PortId(port_id.id);
 
}
 

	
 
#[inline]
 
fn port_id_to_eval(port_id: PortId) -> EvalPortId {
 
    return EvalPortId{ id: port_id.0 };
 
}
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortId>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortId(port_id.id);
 
                for prev_port in ports.iter() {
 
                    if *prev_port == cur_port {
 
                        // Already added
 
                        return;
 
                    }
 
                }
 

	
 
                ports.push(cur_port);
 
            },
 
            Value::Array(heap_pos) |
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for embedded_value in heap_region {
 
                    find_port_in_value(group, embedded_value, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 

	
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for value in &value_group.values {
 
        find_port_in_value(value_group, value, ports);
 
    }
 
}
 

	
 
struct ValueGroupIter<'a> {
 
    group: &'a mut ValueGroup,
 
    heap_stack: Vec<(usize, usize)>,
 
    index: usize,
 
}
 

	
 
impl<'a> ValueGroupIter<'a> {
 
    fn new(group: &'a mut ValueGroup) -> Self {
 
        return Self{ group, heap_stack: Vec::new(), index: 0 }
 
    }
 
}
 

	
 
struct ValueGroupPortRef {
 
    id: PortId,
 
    heap_pos: Option<usize>, // otherwise: on stack
 
    index: usize,
 
}
 

	
 
impl<'a> Iterator for ValueGroupIter<'a> {
 
    type Item = ValueGroupPortRef;
 

	
 
    fn next(&mut self) -> Option<Self::Item> {
 
        // Enter loop that keeps iterating until a port is found
 
        loop {
 
            if let Some(pos) = self.heap_stack.last() {
 
                let (heap_pos, region_index) = *pos;
 
                if region_index >= self.group.regions[heap_pos].len() {
 
                    self.heap_stack.pop();
 
                    continue;
 
                }
 

	
 
                let value = &self.group.regions[heap_pos][region_index];
 
                self.heap_stack.last_mut().unwrap().1 += 1;
 

	
 
                match value {
 
                    Value::Input(id) | Value::Output(id) => {
 
                        let id = PortId(id.id);
 
                        return Some(ValueGroupPortRef{
 
                            id,
 
                            heap_pos: Some(heap_pos),
 
                            index: region_index,
 
                        });
 
                    },
 
                    _ => {},
 
                }
 

	
 
                if let Some(heap_pos) = value.get_heap_pos() {
 
                    self.heap_stack.push((heap_pos as usize, 0));
 
                }
 
            } else {
 
                if self.index >= self.group.values.len() {
 
                    return None;
 
                }
 

	
 
                let value = &mut self.group.values[self.index];
 
                self.index += 1;
 

	
 
                match value {
 
                    Value::Input(id) | Value::Output(id) => {
 
                        let id = PortId(id.id);
 
                        return Some(ValueGroupPortRef{
 
                            id,
 
                            heap_pos: None,
 
                            index: self.index - 1
 
                        });
 
                    },
 
                    _ => {},
 
                }
 

	
 
                // Not a port, check if we need to enter a heap region
 
                if let Some(heap_pos) = value.get_heap_pos() {
 
                    self.heap_stack.push((heap_pos as usize, 0));
 
                } // else: just consider the next value
 
            }
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/consensus.rs
Show inline comments
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::scheduler::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 

	
 
use super::component_pdl::*;
 
use super::component_context::*;
 

	
 
pub struct PortAnnotation {
 
    self_comp_id: CompId,
 
    self_port_id: PortId,
 
    peer_comp_id: CompId, // only valid for getter ports
 
    peer_port_id: PortId, // only valid for getter ports
 
    mapping: Option<u32>,
 
}
 

	
 
impl PortAnnotation {
 
    fn new(comp_id: CompId, port_id: PortId) -> Self {
 
        return Self{
 
            self_comp_id: comp_id,
 
            self_port_id: port_id,
 
            peer_comp_id: CompId::new_invalid(),
 
            peer_port_id: PortId::new_invalid(),
 
            mapping: None
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
enum Mode {
 
    NonSync,
 
    SyncBusy,
 
    SyncAwaitingSolution,
 
}
 

	
 
struct SolutionCombiner {
 
    solution: SyncPartialSolution,
 
    matched_channels: usize,
 
}
 

	
 
impl SolutionCombiner {
 
    fn new() -> Self {
 
        return Self {
 
            solution: SyncPartialSolution::default(),
 
            matched_channels: 0,
 
        }
 
    }
 

	
 
    #[inline]
 
    fn has_contributions(&self) -> bool {
 
        return !self.solution.channel_mapping.is_empty();
 
    }
 

	
 
    /// Returns a decision for the current round. If there is no decision (yet)
 
    /// then `RoundDecision::None` is returned.
 
    fn get_decision(&self) -> SyncRoundDecision {
 
        if self.matched_channels == self.solution.channel_mapping.len() {
 
            debug_assert_ne!(self.solution.decision, SyncRoundDecision::None);
 
            return self.solution.decision;
 
        }
 

	
 
        return SyncRoundDecision::None; // even in case of failure: wait for everyone.
 
    }
 

	
 
    fn combine_with_partial_solution(&mut self, partial: SyncPartialSolution) {
 
        debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution);
 
        debug_assert_ne!(partial.decision, SyncRoundDecision::Solution);
 

	
 
        if partial.decision == SyncRoundDecision::Failure {
 
            self.solution.decision = SyncRoundDecision::Failure;
 
        }
 

	
 
        for entry in partial.channel_mapping {
 
            let channel_index = if entry.getter.is_some() && entry.putter.is_some() {
 
                let channel_index = self.solution.channel_mapping.len();
 
                self.solution.channel_mapping.push(entry);
 
                self.matched_channels += 1;
 

	
 
                channel_index
 
            } else if let Some(putter) = entry.putter {
 
                self.combine_with_putter_port(putter)
 
            } else if let Some(getter) = entry.getter {
 
                self.combine_with_getter_port(getter)
 
            } else {
 
                unreachable!(); // both putter and getter are None
 
            };
 

	
 
            let channel = &self.solution.channel_mapping[channel_index];
 
            if let Some(consistent) = Self::channel_is_consistent(channel) {
 
                if !consistent {
 
                    self.solution.decision = SyncRoundDecision::Failure;
 
                }
 
                self.matched_channels += 1;
 
            }
 
        }
 

	
 
        self.update_solution();
 
    }
 

	
 
    /// Combines the currently stored global solution (if any) with the newly
 
    /// provided local solution. Make sure to check the `has_decision` return
 
    /// value afterwards.
 
    fn combine_with_local_solution(&mut self, comp_id: CompId, solution: SyncLocalSolution) {
 
        debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution);
 

	
 
        // Combine partial solution with the local solution entries
 
        for entry in solution {
 
            // Match the current entry up with its peer endpoint, or add a new
 
            // entry.
 
            let channel_index = match entry {
 
                SyncLocalSolutionEntry::Putter(putter) => {
 
                    self.combine_with_putter_port(putter)
 
                },
 
                SyncLocalSolutionEntry::Getter(getter) => {
 
                    self.combine_with_getter_port(getter)
 
                }
 
            };
 

	
 
            // Check if channel is now consistent
 
            let channel = &self.solution.channel_mapping[channel_index];
 
            if let Some(consistent) = Self::channel_is_consistent(channel) {
 
                if !consistent {
 
                    self.solution.decision = SyncRoundDecision::Failure;
 
                }
 
                self.matched_channels += 1;
 
            }
 
        }
 

	
 
        self.update_solution();
 
    }
 

	
 
    /// Takes whatever partial solution is present in the solution combiner and
 
    /// returns it. The solution combiner's solution will end up being empty.
 
    /// This is used when a new leader is found and we need to pass along our
 
    /// partial results.
 
    fn take_partial_solution(&mut self) -> SyncPartialSolution {
 
        let mut partial_solution = SyncPartialSolution::default();
 
        std::mem::swap(&mut partial_solution, &mut self.solution);
 
        self.clear();
 

	
 
        return partial_solution;
 
    }
 

	
 
    fn clear(&mut self) {
 
        self.solution.channel_mapping.clear();
 
        self.solution.decision = SyncRoundDecision::None;
 
        self.matched_channels = 0;
 
    }
 

	
 
    // --- Small utilities for combining solutions
 

	
 
    fn combine_with_putter_port(&mut self, putter: SyncSolutionPutterPort) -> usize {
 
        let channel_index = self.get_channel_index_for_putter(putter.self_comp_id, putter.self_port_id);
 
        if let Some(channel_index) = channel_index {
 
            let channel = &mut self.solution.channel_mapping[channel_index];
 
            debug_assert!(channel.putter.is_none());
 
            channel.putter = Some(putter);
 

	
 
            return channel_index;
 
        } else {
 
            let channel_index = self.solution.channel_mapping.len();
 
            self.solution.channel_mapping.push(SyncSolutionChannel{
 
                putter: Some(putter),
 
                getter: None,
 
            });
 

	
 
            return channel_index;
 
        }
 
    }
 

	
 
    fn combine_with_getter_port(&mut self, getter: SyncSolutionGetterPort) -> usize {
 
        let channel_index = self.get_channel_index_for_getter(getter.peer_comp_id, getter.peer_port_id);
 
        if let Some(channel_index) = channel_index {
 
            let channel = &mut self.solution.channel_mapping[channel_index];
 
            debug_assert!(channel.getter.is_none());
 
            channel.getter = Some(getter);
 

	
 
            return channel_index;
 
        } else {
 
            let channel_index = self.solution.channel_mapping.len();
 
            self.solution.channel_mapping.push(SyncSolutionChannel{
 
                putter: None,
 
                getter: Some(getter)
 
            });
 

	
 
            return channel_index;
 
        }
 
    }
 

	
 
    /// Retrieve index of the channel containing a getter port that has received
 
    /// from the specified putter port.
 
    fn get_channel_index_for_putter(&self, putter_comp_id: CompId, putter_port_id: PortId) -> Option<usize> {
 
        for (channel_index, channel) in self.solution.channel_mapping.iter().enumerate() {
 
            if let Some(getter) = &channel.getter {
 
                if getter.peer_comp_id == putter_comp_id && getter.peer_port_id == putter_port_id {
 
                    return Some(channel_index);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Retrieve index of the channel for a getter port. To find this channel
 
    /// the **peer** component/port IDs of the getter port are used.
 
    fn get_channel_index_for_getter(&self, peer_comp_id: CompId, peer_port_id: PortId) -> Option<usize> {
 
        for (channel_index, channel) in self.solution.channel_mapping.iter().enumerate() {
 
            if let Some(putter) = &channel.putter {
 
                if putter.self_comp_id == peer_comp_id && putter.self_port_id == peer_port_id {
 
                    return Some(channel_index);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn channel_is_consistent(channel: &SyncSolutionChannel) -> Option<bool> {
 
        if channel.putter.is_none() || channel.getter.is_none() {
 
            return None;
 
        }
 

	
 
        let putter = channel.putter.as_ref().unwrap();
 
        let getter = channel.getter.as_ref().unwrap();
 
        return Some(putter.mapping == getter.mapping);
 
    }
 

	
 
    /// Determines the global solution if all components have contributed their
 
    /// local solutions.
 
    fn update_solution(&mut self) {
 
        if self.matched_channels == self.solution.channel_mapping.len() {
 
            if self.solution.decision != SyncRoundDecision::Failure {
 
                self.solution.decision = SyncRoundDecision::Solution;
 
            }
 
        }
 
    }
 
}
 

	
 
/// Tracking consensus state
 
pub struct Consensus {
 
    // General state of consensus manager
 
    mapping_counter: u32,
 
    mode: Mode,
 
    // State associated with sync round
 
    round_index: u32,
 
    highest_id: CompId,
 
    ports: Vec<PortAnnotation>,
 
    // State associated with arriving at a solution and being a (temporary)
 
    // leader in the consensus round
 
    solution: SolutionCombiner,
 
}
 

	
 
impl Consensus {
 
    pub(crate) fn new() -> Self {
 
        return Self{
 
            round_index: 0,
 
            highest_id: CompId::new_invalid(),
 
            ports: Vec::new(),
 
            mapping_counter: 0,
 
            mode: Mode::NonSync,
 
            solution: SolutionCombiner::new(),
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Managing sync state
 
    // -------------------------------------------------------------------------
 

	
 
    /// Notifies the consensus management that the PDL code has reached the
 
    /// start of a sync block.
 
    pub(crate) fn notify_sync_start(&mut self, comp_ctx: &CompCtx) {
 
        debug_assert_eq!(self.mode, Mode::NonSync);
 
        self.highest_id = comp_ctx.id;
 
        self.mapping_counter = 0;
 
        self.mode = Mode::SyncBusy;
 
        self.make_ports_consistent_with_ctx(comp_ctx);
 
    }
 

	
 
    /// Notifies the consensus management that the PDL code has reached the end
 
    /// of a sync block. A local solution will be submitted, after which we wait
 
    /// until the participants in the round (hopefully) reach a conclusion.
 
    pub(crate) fn notify_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        self.mode = Mode::SyncAwaitingSolution;
 

	
 
        // Submit our port mapping as a solution
 
        let mut local_solution = Vec::with_capacity(self.ports.len());
 
        for port in &self.ports {
 
            if let Some(mapping) = port.mapping {
 
                let port_handle = comp_ctx.get_port_handle(port.self_port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let new_entry = match port_info.kind {
 
                    PortKind::Putter => SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{
 
                        self_comp_id: comp_ctx.id,
 
                        self_port_id: port_info.self_id,
 
                        mapping
 
                    }),
 
                    PortKind::Getter => SyncLocalSolutionEntry::Getter(SyncSolutionGetterPort{
 
                        self_comp_id: comp_ctx.id,
 
                        self_port_id: port_info.self_id,
 
                        peer_comp_id: port.peer_comp_id,
 
                        peer_port_id: port.peer_port_id,
 
                        mapping
 
                    })
 
                };
 
                local_solution.push(new_entry);
 
            }
 
        }
 

	
 
        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution);
 
        return decision;
 
    }
 

	
 
    /// Notifies that a decision has been reached. Note that the caller should
 
    /// still take the appropriate actions based on the decision it is supplying
 
    /// to the consensus layer.
 
    pub(crate) fn notify_sync_decision(&mut self, _decision: SyncRoundDecision) {
 
        // Reset everything for the next round
 
        debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
 
        self.mode = Mode::NonSync;
 
        self.round_index = self.round_index.wrapping_add(1);
 

	
 
        for port in self.ports.iter_mut() {
 
            port.mapping = None;
 
        }
 

	
 
        self.solution.clear();
 
    }
 

	
 
    fn make_ports_consistent_with_ctx(&mut self, comp_ctx: &CompCtx) {
 
        let mut needs_setting_ports = false;
 
        if comp_ctx.num_ports() != self.ports.len() {
 
            needs_setting_ports = true;
 
        } else {
 
            for (idx, port) in comp_ctx.iter_ports().enumerate() {
 
                let comp_port_id = port.self_id;
 
                let cons_port_id = self.ports[idx].self_port_id;
 
                if comp_port_id != cons_port_id {
 
                    needs_setting_ports = true;
 
                    break;
 
                }
 
            }
 
        }
 

	
 
        if needs_setting_ports {
 
            self.ports.clear();
 
            self.ports.reserve(comp_ctx.num_ports());
 
            for port in comp_ctx.iter_ports() {
 
                self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id))
 
            }
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling inbound and outbound messages
 
    // -------------------------------------------------------------------------
 

	
 
    pub(crate) fn annotate_data_message(&mut self, comp_ctx: &CompCtx, port_info: &Port, content: ValueGroup) -> DataMessage {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy); // can only send between sync start and sync end
 
        debug_assert!(self.ports.iter().any(|v| v.self_port_id == port_info.self_id));
 
        let data_header = self.create_data_header_and_update_mapping(port_info);
 
        let sync_header = self.create_sync_header(comp_ctx);
 

	
 
        return DataMessage{ data_header, sync_header, content };
 
    }
 

	
 
    /// Checks if the data message can be received (due to port annotations), if
 
    /// it can then `true` is returned and the caller is responsible for handing
 
    /// the message of to the PDL code. Otherwise the message cannot be
 
    /// received.
 
    pub(crate) fn try_receive_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: &DataMessage) -> bool {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        debug_assert!(self.ports.iter().any(|v| v.self_port_id == message.data_header.target_port));
 

	
 
        // Make sure the expected mapping matches the currently stored mapping
 
        for (expected_id, expected_annotation) in &message.data_header.expected_mapping {
 
            let got_annotation = self.get_annotation(*expected_id);
 
            if got_annotation != *expected_annotation {
 
                return false;
 
            }
 
        }
 

	
 
        // Expected mapping matches current mapping, so we will receive the message
 
        self.set_annotation(message.sync_header.sending_id, &message.data_header);
 

	
 
        // Handle the sync header embedded within the data message
 
        self.handle_sync_header(sched_ctx, comp_ctx, &message.sync_header);
 

	
 
        return true;
 
    }
 

	
 
    /// Receives the sync message and updates the consensus state appropriately.
 
    pub(crate) fn receive_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> SyncRoundDecision {
 
        // Whatever happens: handle the sync header (possibly changing the
 
        // currently registered leader)
 
        self.handle_sync_header(sched_ctx, comp_ctx, &message.sync_header);
 

	
 
        match message.content {
 
            SyncMessageContent::NotificationOfLeader => {
 
                return SyncRoundDecision::None;
 
            },
 
            SyncMessageContent::LocalSolution(solution_generator_id, local_solution) => {
 
                return self.handle_local_solution(sched_ctx, comp_ctx, solution_generator_id, local_solution);
 
            },
 
            SyncMessageContent::PartialSolution(partial_solution) => {
 
                return self.handle_partial_solution(sched_ctx, comp_ctx, partial_solution);
 
            },
 
            SyncMessageContent::GlobalSolution => {
 
                debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution); // leader can only find global- if we submitted local solution
 
                return SyncRoundDecision::Solution;
 
            },
 
            SyncMessageContent::GlobalFailure => {
 
                debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
 
                return SyncRoundDecision::Failure;
 
            }
 
        }
 
    }
 

	
 
    fn handle_sync_header(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, header: &MessageSyncHeader) {
 
        if header.highest_id.0 > self.highest_id.0 {
 
            // Sender knows of someone with a higher ID. So store highest ID,
 
            // notify all peers, and forward local solutions
 
            self.highest_id = header.highest_id;
 
            for peer in comp_ctx.iter_peers() {
 
                if peer.id == header.sending_id {
 
                    continue;
 
                }
 

	
 
                let message = SyncMessage{
 
                    sync_header: self.create_sync_header(comp_ctx),
 
                    content: SyncMessageContent::NotificationOfLeader,
 
                };
 
                peer.handle.send_message(sched_ctx, Message::Sync(message), true);
 
            }
 

	
 
            self.forward_partial_solution(sched_ctx, comp_ctx);
 
        } else if header.highest_id.0 < self.highest_id.0 {
 
            // Sender has a lower ID, so notify it of our higher one
 
            let message = SyncMessage{
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: SyncMessageContent::NotificationOfLeader,
 
            };
 
            let peer_handle = comp_ctx.get_peer_handle(header.sending_id);
 
            let peer_info = comp_ctx.get_peer(peer_handle);
 
            peer_info.handle.send_message(sched_ctx, Message::Sync(message), true);
 
        } // else: exactly equal
 
    }
 

	
 
    fn get_annotation(&self, port_id: PortId) -> Option<u32> {
 
        for annotation in self.ports.iter() {
 
            if annotation.self_port_id == port_id {
 
                return annotation.mapping;
 
            }
 
        }
 

	
 
        debug_assert!(false);
 
        return None;
 
    }
 

	
 
    fn set_annotation(&mut self, source_comp_id: CompId, data_header: &MessageDataHeader) {
 
        for annotation in self.ports.iter_mut() {
 
            if annotation.self_port_id == data_header.target_port {
 
                annotation.peer_comp_id = source_comp_id;
 
                annotation.peer_port_id = data_header.source_port;
 
                annotation.mapping = Some(data_header.new_mapping);
 
            }
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Leader-related methods
 
    // -------------------------------------------------------------------------
 

	
 
    fn forward_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        debug_assert_ne!(self.highest_id, comp_ctx.id); // not leader
 

	
 
        // Make sure that we have something to send
 
        if !self.solution.has_contributions() {
 
            return;
 
        }
 

	
 
        // Swap the container with the partial solution and then send it along
 
        let partial_solution = self.solution.take_partial_solution();
 
        self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(SyncMessage{
 
            sync_header: self.create_sync_header(comp_ctx),
 
            content: SyncMessageContent::PartialSolution(partial_solution),
 
        }));
 
    }
 

	
 
    fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution) -> SyncRoundDecision {
 
        if self.highest_id == comp_ctx.id {
 
            // We are the leader
 
            self.solution.combine_with_local_solution(solution_sender_id, solution);
 
            let round_decision = self.solution.get_decision();
 
            if round_decision != SyncRoundDecision::None {
 
                self.broadcast_decision(sched_ctx, comp_ctx, round_decision);
 
            }
 
            return round_decision;
 
        } else {
 
            // Forward the solution
 
            let message = SyncMessage{
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: SyncMessageContent::LocalSolution(solution_sender_id, solution),
 
            };
 
            self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(message));
 
            return SyncRoundDecision::None;
 
        }
 
    }
 

	
 
    fn handle_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, solution: SyncPartialSolution) -> SyncRoundDecision {
 
        if self.highest_id == comp_ctx.id {
 
            // We are the leader, combine existing and new solution
 
            self.solution.combine_with_partial_solution(solution);
 
            let round_decision = self.solution.get_decision();
 
            if round_decision != SyncRoundDecision::None {
 
                self.broadcast_decision(sched_ctx, comp_ctx, round_decision);
 
            }
 
            return round_decision;
 
        } else {
 
            // Forward the partial solution
 
            let message = SyncMessage{
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: SyncMessageContent::PartialSolution(solution),
 
            };
 
            self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(message));
 
            return SyncRoundDecision::None;
 
        }
 
    }
 

	
 
    fn broadcast_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, decision: SyncRoundDecision) {
 
        debug_assert_eq!(self.highest_id, comp_ctx.id);
 

	
 
        let is_success = match decision {
 
            SyncRoundDecision::None => unreachable!(),
 
            SyncRoundDecision::Solution => true,
 
            SyncRoundDecision::Failure => false,
 
        };
 

	
 
        let mut peers = Vec::with_capacity(self.solution.solution.channel_mapping.len()); // TODO: @Performance
 

	
 
        for channel in self.solution.solution.channel_mapping.iter() {
 
            let getter = channel.getter.as_ref().unwrap();
 
            if getter.self_comp_id != comp_ctx.id && !peers.contains(&getter.self_comp_id) {
 
                peers.push(getter.self_comp_id);
 
            }
 
            if getter.peer_comp_id != comp_ctx.id && !peers.contains(&getter.peer_comp_id) {
 
                peers.push(getter.peer_comp_id);
 
            }
 
        }
 

	
 
        for peer in peers {
 
            let mut handle = sched_ctx.runtime.get_component_public(peer);
 
            let message = Message::Sync(SyncMessage{
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: if is_success { SyncMessageContent::GlobalSolution } else { SyncMessageContent::GlobalFailure },
 
            });
 
            handle.send_message(sched_ctx, message, true);
 
            let _should_remove = handle.decrement_users();
 
            debug_assert!(!_should_remove);
 
            debug_assert!(_should_remove.is_none());
 
        }
 
    }
 

	
 
    fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) {
 
        debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader
 
        let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
 
        leader_info.send_message(sched_ctx, message, true);
 
        let should_remove = leader_info.decrement_users();
 
        if should_remove {
 
            let key = unsafe{ self.highest_id.upgrade() };
 
        if let Some(key) = should_remove {
 
            sched_ctx.runtime.destroy_component(key);
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Creating message headers
 
    // -------------------------------------------------------------------------
 

	
 
    fn create_data_header_and_update_mapping(&mut self, port_info: &Port) -> MessageDataHeader {
 
        let mut expected_mapping = Vec::with_capacity(self.ports.len());
 
        let mut port_index = usize::MAX;
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.self_port_id == port_info.self_id {
 
                port_index = index;
 
            }
 
            expected_mapping.push((port.self_port_id, port.mapping));
 
        }
 

	
 
        let new_mapping = self.take_mapping();
 
        self.ports[port_index].mapping = Some(new_mapping);
 
        debug_assert_eq!(port_info.kind, PortKind::Putter);
 
        return MessageDataHeader{
 
            expected_mapping,
 
            new_mapping,
 
            source_port: port_info.self_id,
 
            target_port: port_info.peer_port_id,
 
        };
 
    }
 

	
 
    #[inline]
 
    fn create_sync_header(&self, comp_ctx: &CompCtx) -> MessageSyncHeader {
 
        return MessageSyncHeader{
 
            sync_round: self.round_index,
 
            sending_id: comp_ctx.id,
 
            highest_id: self.highest_id,
 
        };
 
    }
 

	
 
    #[inline]
 
    fn take_mapping(&mut self) -> u32 {
 
        let mapping = self.mapping_counter;
 
        self.mapping_counter = self.mapping_counter.wrapping_add(1);
 
        return mapping;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/control_layer.rs
Show inline comments
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 
use crate::runtime2::component::*;
 

	
 
use super::component_context::*;
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub(crate) struct ControlId(u32);
 

	
 
impl ControlId {
 
    /// Like other invalid IDs, this one doesn't care any significance, but is
 
    /// just set at u32::MAX to hopefully bring out bugs sooner.
 
    fn new_invalid() -> Self {
 
        return ControlId(u32::MAX);
 
    }
 
}
 

	
 
struct ControlEntry {
 
    id: ControlId,
 
    ack_countdown: u32,
 
    content: ControlContent,
 
}
 

	
 
enum ControlContent {
 
    PeerChange(ContentPeerChange),
 
    ScheduleComponent(CompId),
 
    BlockedPort(PortId),
 
    ClosedPort(PortId),
 
}
 

	
 
struct ContentPeerChange {
 
    source_port: PortId,
 
    source_comp: CompId,
 
    old_target_port: PortId,
 
    new_target_port: PortId,
 
    new_target_comp: CompId,
 
    schedule_entry_id: ControlId,
 
}
 

	
 
struct ControlClosedPort {
 
    closed_port: PortId,
 
    exit_entry_id: Option<ControlId>,
 
}
 

	
 
pub(crate) enum AckAction {
 
    None,
 
    SendMessageAndAck(CompId, ControlMessage, ControlId),
 
    SendMessage(CompId, ControlMessage),
 
    ScheduleComponent(CompId),
 
}
 

	
 
/// Handling/sending control messages.
 
pub(crate) struct ControlLayer {
 
    id_counter: ControlId,
 
    entries: Vec<ControlEntry>,
 
}
 

	
 
impl ControlLayer {
 
    pub(crate) fn should_reroute(&self, message: &mut Message) -> Option<CompId> {
 
        // Safety note: rerouting should occur during the time when we're
 
        // notifying a peer of a new component. During this period that
 
        // component hasn't been executed yet, so cannot have died yet.
 
        // FIX @NoDirectHandle
 
        let target_port = message.target_port();
 
        if target_port.is_none() {
 
            return None;
 
        }
 

	
 
        let target_port = target_port.unwrap();
 
        for entry in &self.entries {
 
            if let ControlContent::PeerChange(entry) = &entry.content {
 
                if entry.old_target_port == target_port {
 
                    message.modify_target_port(entry.new_target_port);
 
                    return Some(entry.new_target_comp);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> AckAction {
 
    /// Handles an acknowledgement. The returned action must be performed by the
 
    /// caller. The optionally returned `ControlId` must be used and passed to
 
    /// `handle_ack` again.
 
    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> (AckAction, Option<ControlId>) {
 
        let entry_index = self.get_entry_index_by_id(entry_id).unwrap();
 
        let entry = &mut self.entries[entry_index];
 
        debug_assert!(entry.ack_countdown > 0);
 

	
 
        entry.ack_countdown -= 1;
 
        if entry.ack_countdown != 0 {
 
            return AckAction::None;
 
            return (AckAction::None, None);
 
        }
 

	
 
        let entry = self.entries.remove(entry_index);
 

	
 
        // All `Ack`s received, take action based on the kind of entry
 
        match &entry.content {
 
        match entry.content {
 
            ControlContent::PeerChange(content) => {
 
                // If change of peer is ack'd. Then we are certain we have
 
                // rerouted all of the messages, and the sender's port can now
 
                // be unblocked again.
 
                let target_comp_id = content.source_comp;
 
                let message_to_send = ControlMessage{
 
                    id: ControlId::new_invalid(),
 
                    sender_comp_id: comp_ctx.id,
 
                    target_port_id: Some(content.source_port),
 
                    content: ControlMessageContent::PortPeerChangedUnblock(
 
                        content.new_target_port,
 
                        content.new_target_comp
 
                    )
 
                };
 
                let to_ack = content.schedule_entry_id;
 
                self.entries.remove(entry_index);
 

	
 
                return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack);
 
                return (
 
                    AckAction::SendMessage(target_comp_id, message_to_send),
 
                    Some(to_ack)
 
                );
 
            },
 
            ControlContent::ScheduleComponent(to_schedule) => {
 
                // If all change-of-peers are `Ack`d, then we're ready to
 
                // schedule the component!
 
                return AckAction::ScheduleComponent(*to_schedule);
 
                return (AckAction::ScheduleComponent(to_schedule), None);
 
            },
 
            ControlContent::BlockedPort(_) => unreachable!(),
 
            ControlContent::ClosedPort(port_id) => {
 
            ControlContent::ClosedPort(closed_port) => {
 
                // If a closed port is Ack'd, then we remove the reference to
 
                // that component.
 
                let port_handle = comp_ctx.get_port_handle(*port_id);
 
                let port_handle = comp_ctx.get_port_handle(closed_port);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let port_peer_comp_id = port_info.peer_comp_id;
 
                debug_assert_eq!(port_info.state, PortState::Closed);
 
                comp_ctx.remove_peer(sched_ctx, port_handle, port_info.peer_comp_id);
 
                comp_ctx.remove_peer(sched_ctx, port_handle, port_peer_comp_id);
 

	
 
                return AckAction::None;
 
                return (AckAction::None, None);
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn has_acks_remaining(&self) -> bool {
 
        return !self.entries.is_empty();
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Port transfer (due to component creation)
 
    // -------------------------------------------------------------------------
 

	
 
    /// Adds an entry that, when completely ack'd, will schedule a component.
 
    pub(crate) fn add_schedule_entry(&mut self, to_schedule_id: CompId) -> ControlId {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 0, // incremented by calls to `add_reroute_entry`
 
            content: ControlContent::ScheduleComponent(to_schedule_id),
 
        });
 

	
 
        return entry_id;
 
    }
 

	
 
    /// Removes a schedule entry. Only used if the caller preemptively called
 
    /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`,
 
    /// hence the `ack_countdown` in the scheduling entry is at 0.
 
    pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) {
 
        let index = self.get_entry_index_by_id(schedule_entry_id).unwrap();
 
        debug_assert_eq!(self.entries[index].ack_countdown, 0);
 
        self.entries.remove(index);
 
    }
 

	
 
    pub(crate) fn add_reroute_entry(
 
        &mut self, creator_comp_id: CompId,
 
        source_port_id: PortId, source_comp_id: CompId,
 
        old_target_port_id: PortId, new_target_port_id: PortId, new_comp_id: CompId,
 
        schedule_entry_id: ControlId,
 
    ) -> Message {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 1,
 
            content: ControlContent::PeerChange(ContentPeerChange{
 
                source_port: source_port_id,
 
                source_comp: source_comp_id,
 
                old_target_port: old_target_port_id,
 
                new_target_port: new_target_port_id,
 
                new_target_comp: new_comp_id,
 
                schedule_entry_id,
 
            }),
 
        });
 

	
 
        // increment counter on schedule entry
 
        for entry in &mut self.entries {
 
            if entry.id == schedule_entry_id {
 
                entry.ack_countdown += 1;
 
                break;
 
            }
 
        }
 
        let entry_index = self.get_entry_index_by_id(schedule_entry_id).unwrap();
 
        self.entries[entry_index].ack_countdown += 1;
 

	
 
        return Message::Control(ControlMessage{
 
            id: entry_id,
 
            sender_comp_id: creator_comp_id,
 
            target_port_id: Some(source_port_id),
 
            content: ControlMessageContent::PortPeerChangedBlock(source_port_id)
 
        })
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Blocking, unblocking, and closing ports
 
    // -------------------------------------------------------------------------
 

	
 
    pub(crate) fn initiate_port_closing(&mut self, port_handle: PortHandle, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> {
 
        let port = comp_ctx.get_port_mut(port_handle);
 
        let port_id = port.self_id;
 
        let peer_port_id = port.peer_port_id;
 
        let peer_comp_id = port.peer_comp_id;
 
        debug_assert!(port.state == PortState::Open || port.state == PortState::Blocked);
 

	
 
        port.state = PortState::Closed;
 

	
 
        if peer_comp_id == comp_ctx.id {
 
            // We own the other end of the channel as well.
 
            return None;
 
        }
 
    /// Initiates the control message procedures for closing a port. Caller must
 
    /// make sure that the port state has already been set to `Closed`.
 
    pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) {
 
        let port = comp_ctx.get_port(port_handle);
 
        let peer_port_id = port.peer_port_id;
 
        debug_assert!(port.state == PortState::Closed);
 

	
 
        // Construct the port-closing entry
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 1,
 
            content: ControlContent::ClosedPort(port_id),
 
            content: ControlContent::ClosedPort(port.self_id),
 
        });
 

	
 
        return Some((
 
            peer_comp_id,
 
        // Return the messages notifying peer of the closed port
 
        let peer_handle = comp_ctx.get_peer_handle(port.peer_comp_id);
 
        return (
 
            peer_handle,
 
            ControlMessage{
 
                id: entry_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(peer_port_id),
 
                content: ControlMessageContent::ClosePort(peer_port_id),
 
            }
 
        ));
 
        );
 
    }
 

	
 
    /// Adds a control entry to track that a port is blocked. Expects the caller
 
    /// to have set the port's state to blocking already. The returned tuple
 
    /// contains a message and the peer to send it to.
 
    pub(crate) fn initiate_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
 
        let port_info = comp_ctx.get_port(port_handle);
 
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're telling the putter to block
 
        debug_assert_eq!(port_info.state, PortState::Blocked); // contract with caller
 

	
 
        let peer_port_id = port_info.peer_port_id;
 
        let peer_comp_id = port_info.peer_comp_id;
 
        let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 0,
 
            content: ControlContent::BlockedPort(port_info.self_id),
 
        });
 

	
 
        return (
 
            peer_handle,
 
            ControlMessage{
 
                id: entry_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(port_info.peer_port_id),
 
                content: ControlMessageContent::BlockPort(peer_port_id),
 
            }
 
        );
 
    }
 

	
 
    /// Removes the control entry that tracks that a port is blocked. Expects
 
    /// the caller to have already marked the port as unblocked. Again the
 
    /// returned tuple contains a message and the target it is intended for
 
    pub(crate) fn cancel_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
 
        let port_info = comp_ctx.get_port(port_handle);
 
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're initiating the unblocking
 
        debug_assert_eq!(port_info.state, PortState::Open); // contract with caller, the locally stored entry ensures we were blocked before
 

	
 
        let position = self.entries.iter()
 
            .position(|v| {
 
                if let ControlContent::BlockedPort(blocked_port_id) = &v.content {
 
                    if *blocked_port_id == port_info.self_id {
 
                        return true;
 
                    }
 
                }
 
                return false;
 
            })
 
            .unwrap();
 

	
 
        let entry = self.entries.remove(position);
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 

	
 
        return (
 
            peer_handle,
 
            ControlMessage{
 
                id: entry.id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(port_info.peer_port_id),
 
                content: ControlMessageContent::UnblockPort(port_info.peer_port_id)
 
            }
 
        );
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Internal utilities
 
    // -------------------------------------------------------------------------
 

	
 
    fn take_id(&mut self) -> ControlId {
 
        let id = self.id_counter;
 
        self.id_counter.0 = self.id_counter.0.wrapping_add(1);
 
        return id;
 
    }
 

	
 
    fn get_entry_index_by_id(&self, entry_id: ControlId) -> Option<usize> {
 
        for (index, entry) in self.entries.iter().enumerate() {
 
            if entry.id == entry_id {
 
                return Some(index);
 
            }
 
        }
 

	
 
        return None;
 
    }
 
}
 

	
 
impl Default for ControlLayer {
 
    fn default() -> Self {
 
        return ControlLayer{
 
            id_counter: ControlId(0),
 
            entries: Vec::new(),
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/runtime.rs
Show inline comments
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
 
use std::collections::VecDeque;
 

	
 
use crate::protocol::*;
 

	
 
use super::communication::Message;
 
use super::component::{wake_up_if_sleeping, CompPDL, CompCtx};
 
use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer};
 
use super::scheduler::*;
 

	
 
// -----------------------------------------------------------------------------
 
// Component
 
// -----------------------------------------------------------------------------
 

	
 
/// Key to a component. Type system somewhat ensures that there can only be one
 
/// of these. Only with a key one may retrieve privately-accessible memory for
 
/// a component. Practically just a generational index, like `CompId` is.
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) struct CompKey(pub u32);
 

	
 
impl CompKey {
 
    pub(crate) fn downgrade(&self) -> CompId {
 
        return CompId(self.0);
 
    }
 
}
 

	
 
/// Generational ID of a component
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub struct CompId(pub u32);
 

	
 
impl CompId {
 
    pub(crate) fn new_invalid() -> CompId {
 
        return CompId(u32::MAX);
 
    }
 

	
 
    /// Upgrade component ID to component key. Unsafe because the caller needs
 
    /// to make sure that only one component key can exist at a time (to ensure
 
    /// a component can only be scheduled/executed by one thread).
 
    pub(crate) unsafe fn upgrade(&self) -> CompKey {
 
        return CompKey(self.0);
 
    }
 
}
 

	
 
/// Handle to a component that is being created.
 
pub(crate) struct CompReserved {
 
    reservation: ComponentReservation,
 
}
 

	
 
impl CompReserved {
 
    pub(crate) fn id(&self) -> CompId {
 
        return CompId(self.reservation.index)
 
    }
 
}
 

	
 
/// Private fields of a component, may only be modified by a single thread at
 
/// a time.
 
pub(crate) struct RuntimeComp {
 
    pub public: CompPublic,
 
    pub code: CompPDL,
 
    pub ctx: CompCtx,
 
    pub inbox: QueueDynMpsc<Message>
 
    pub inbox: QueueDynMpsc<Message>,
 
    pub exiting: bool,
 
}
 

	
 
/// Should contain everything that is accessible in a thread-safe manner
 
// TODO: Do something about the `num_handles` thing. This needs to be a bit more
 
//  "foolproof" to lighten the mental burden of using the `num_handles`
 
//  variable.
 
pub(crate) struct CompPublic {
 
    pub sleeping: AtomicBool,
 
    pub num_handles: AtomicU32, // manually modified (!)
 
    inbox: QueueDynProducer<Message>,
 
}
 

	
 
/// Handle to public part of a component. Would be nice if we could
 
/// automagically manage the `num_handles` counter. But when it reaches zero we
 
/// need to manually remove the handle from the runtime. So we just have debug
 
/// code to make sure this actually happens.
 
pub(crate) struct CompHandle {
 
    target: *const CompPublic,
 
    id: CompId, // TODO: @Remove after debugging
 
    #[cfg(debug_assertions)] decremented: bool,
 
}
 

	
 
impl CompHandle {
 
    fn new(id: CompId, public: &CompPublic) -> CompHandle {
 
        let handle = CompHandle{
 
            target: public,
 
            id,
 
            #[cfg(debug_assertions)] decremented: false,
 
        };
 
        handle.increment_users();
 
        return handle;
 
    }
 

	
 
    pub(crate) fn send_message(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) {
 
        sched_ctx.log(&format!("Sending message to [c:{:03}, wakeup:{}]: {:?}", self.id.0, try_wake_up, message));
 
        self.inbox.push(message);
 
        if try_wake_up {
 
            wake_up_if_sleeping(sched_ctx, self.id, self);
 
        }
 
    }
 

	
 
    fn increment_users(&self) {
 
        let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel);
 
        debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed
 
    }
 

	
 
    /// Returns the `CompKey` to the component if it should be destroyed
 
    pub(crate) fn decrement_users(&mut self) -> Option<CompKey> {
 
        debug_assert!(!self.decremented, "illegal to 'decrement_users' twice");
 
        dbg_code!(self.decremented = true);
 
        let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel);
 
        let new_count = old_count - 1;
 
        dbg_code!(self.decremented = true);
 
        println!(" ****** DEBUG [handle]: Decremented count to {} for {:?}", new_count, self.id);
 
        if new_count == 0 {
 
            return Some(unsafe{ self.id.upgrade() });
 
        }
 

	
 
        return None;
 
    }
 
}
 

	
 
impl Clone for CompHandle {
 
    fn clone(&self) -> Self {
 
        debug_assert!(!self.decremented, "illegal to clone after 'decrement_users'");
 
        self.increment_users();
 
        return CompHandle{
 
            target: self.target,
 
            id: self.id,
 
            #[cfg(debug_assertions)] decremented: false,
 
        };
 
    }
 
}
 

	
 
impl std::ops::Deref for CompHandle {
 
    type Target = CompPublic;
 

	
 
    fn deref(&self) -> &Self::Target {
 
        debug_assert!(!self.decremented); // cannot access if control is relinquished
 
        return unsafe{ &*self.target };
 
    }
 
}
 

	
 
impl Drop for CompHandle {
 
    fn drop(&mut self) {
 
        debug_assert!(self.decremented, "need call to 'decrement_users' before dropping");
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Runtime
 
// -----------------------------------------------------------------------------
 

	
 
pub struct Runtime {
 
    pub(crate) inner: Arc<RuntimeInner>,
 
    threads: Vec<std::thread::JoinHandle<()>>,
 
}
 

	
 
impl Runtime {
 
    pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
 
        assert!(num_threads > 0, "need a thread to perform work");
 
        let runtime_inner = Arc::new(RuntimeInner {
 
            protocol: protocol_description,
 
            components: ComponentStore::new(128),
 
            work_queue: Mutex::new(VecDeque::with_capacity(128)),
 
            work_condvar: Condvar::new(),
 
            active_elements: AtomicU32::new(1),
 
        });
 
        let mut runtime = Runtime {
 
            inner: runtime_inner,
 
            threads: Vec::with_capacity(num_threads as usize),
 
        };
 

	
 
        for thread_index in 0..num_threads {
 
            let mut scheduler = Scheduler::new(runtime.inner.clone(), thread_index);
 
            let thread_handle = std::thread::spawn(move || {
 
                scheduler.run();
 
            });
 

	
 
            runtime.threads.push(thread_handle);
 
        }
 

	
 
        return runtime;
 
    }
 
}
 

	
 
impl Drop for Runtime {
 
    fn drop(&mut self) {
 
        self.inner.decrement_active_components();
 
        for handle in self.threads.drain(..) {
 
            handle.join().expect("join scheduler thread");
 
        }
 
    }
 
}
 

	
 
/// Memory that is maintained by "the runtime". In practice it is maintained by
 
/// multiple schedulers, and this serves as the common interface to that memory.
 
pub(crate) struct RuntimeInner {
 
    pub protocol: ProtocolDescription,
 
    components: ComponentStore<RuntimeComp>,
 
    work_queue: Mutex<VecDeque<CompKey>>,
 
    work_condvar: Condvar,
 
    active_elements: AtomicU32, // active components and APIs (i.e. component creators)
 
}
 

	
 
impl RuntimeInner {
 
    // Scheduling and retrieving work
 

	
 
    pub(crate) fn take_work(&self) -> Option<CompKey> {
 
        let mut lock = self.work_queue.lock().unwrap();
 
        while lock.is_empty() && self.active_elements.load(Ordering::Acquire) != 0 {
 
            lock = self.work_condvar.wait(lock).unwrap();
 
        }
 

	
 
        // We have work, or the schedulers should exit.
 
        return lock.pop_front();
 
    }
 

	
 
    pub(crate) fn enqueue_work(&self, key: CompKey) {
 
        let mut lock = self.work_queue.lock().unwrap();
 
        lock.push_back(key);
 
        self.work_condvar.notify_one();
 
    }
 

	
 
    // Creating/destroying components
 

	
 
    pub(crate) fn start_create_pdl_component(&self) -> CompReserved {
 
        self.increment_active_components();
 
        let reservation = self.components.reserve();
 
        return CompReserved{ reservation };
 
    }
 

	
 
    pub(crate) fn finish_create_pdl_component(
 
        &self, reserved: CompReserved,
 
        component: CompPDL, mut context: CompCtx, initially_sleeping: bool,
 
    ) -> (CompKey, &mut RuntimeComp) {
 
        let inbox_queue = QueueDynMpsc::new(16);
 
        let inbox_producer = inbox_queue.producer();
 

	
 
        let _id = reserved.id();
 
        context.id = reserved.id();
 
        let component = RuntimeComp {
 
            public: CompPublic{
 
                sleeping: AtomicBool::new(initially_sleeping),
 
                num_handles: AtomicU32::new(1), // the component itself acts like a handle
 
                inbox: inbox_producer,
 
            },
 
            code: component,
 
            ctx: context,
 
            inbox: inbox_queue,
 
            exiting: false,
 
        };
 

	
 
        let index = self.components.submit(reserved.reservation, component);
 
        debug_assert_eq!(index, _id.0);
 
        let component = self.components.get_mut(index);
 

	
 
        return (CompKey(index), component);
 
    }
 

	
 
    /// Creates a new component. Note that the public part will be properly
 
    /// initialized, but not all private fields are.
 
    pub(crate) fn create_pdl_component(&self, comp: CompPDL, ctx: CompCtx, initially_sleeping: bool) -> (CompKey, &mut RuntimeComp) {
 
        let inbox_queue = QueueDynMpsc::new(16);
 
        let inbox_producer = inbox_queue.producer();
 
        let comp = RuntimeComp{
 
            public: CompPublic{
 
                sleeping: AtomicBool::new(initially_sleeping),
 
                num_handles: AtomicU32::new(1), // the component itself acts like a handle
 
                inbox: inbox_producer,
 
            },
 
            code: comp,
 
            ctx,
 
            inbox: inbox_queue,
 
            exiting: false,
 
        };
 

	
 
        let index = self.components.create(comp);
 

	
 
        // TODO: just do a reserve_index followed by a consume_index or something
 
        self.increment_active_components();
 
        let component = self.components.get_mut(index);
 
        component.ctx.id = CompId(index);
 

	
 
        return (CompKey(index), component);
 
    }
 

	
 
    pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp {
 
        let component = self.components.get_mut(key.0);
 
        return component;
 
    }
 

	
 
    pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle {
 
        let component = self.components.get(id.0);
 
        return CompHandle::new(id, &component.public);
 
    }
 

	
 
    pub(crate) fn destroy_component(&self, key: CompKey) {
 
        debug_assert_eq!(self.get_component(key).public.num_handles.load(Ordering::Acquire), 0);
 
        dbg_code!({
 
            let component = self.get_component(key);
 
            debug_assert!(component.exiting);
 
            debug_assert_eq!(component.public.num_handles.load(Ordering::Acquire), 0);
 
        });
 
        self.decrement_active_components();
 
        self.components.destroy(key.0);
 
    }
 

	
 
    // Tracking number of active interfaces and the active components
 

	
 
    #[inline]
 
    fn increment_active_components(&self) {
 
        let _old_val = self.active_elements.fetch_add(1, Ordering::AcqRel);
 
        debug_assert!(_old_val > 0); // can only create a component from a API/component, so can never be 0.
 
    }
 

	
 
    fn decrement_active_components(&self) {
 
        let old_val = self.active_elements.fetch_sub(1, Ordering::AcqRel);
 
        debug_assert!(old_val > 0); // something wrong with incr/decr logic
 
        let new_val = old_val - 1;
 
        if new_val == 0 {
 
            // Just to be sure, in case the last thing that gets destroyed is an
 
            // API instead of a thread.
 
            let lock = self.work_queue.lock();
 
            let _lock = self.work_queue.lock();
 
            self.work_condvar.notify_all();
 
        }
 
    }
 
}
src/runtime2/scheduler.rs
Show inline comments
 
use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 

	
 
use super::component::*;
 
use super::runtime::*;
 
use super::communication::*;
 

	
 
/// Data associated with a scheduler thread
 
pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
    scheduler_id: u32,
 
}
 

	
 
pub(crate) struct SchedulerCtx<'a> {
 
    pub runtime: &'a RuntimeInner,
 
    pub id: u32,
 
    pub comp: u32,
 
}
 

	
 
impl<'a> SchedulerCtx<'a> {
 
    pub fn new(runtime: &'a RuntimeInner, id: u32) -> Self {
 
        return Self {
 
            runtime,
 
            id,
 
            comp: 0,
 
        }
 
    }
 

	
 
    pub(crate) fn log(&self, text: &str) {
 
        println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text);
 
    }
 
}
 

	
 
impl Scheduler {
 
    // public interface to thread
 

	
 
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
 
        return Scheduler{ runtime, scheduler_id }
 
    }
 

	
 
    pub fn run(&mut self) {
 
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, self.scheduler_id);
 

	
 
        'run_loop: loop {
 
            // Wait until we have something to do (or need to quit)
 
            let comp_key = self.runtime.take_work();
 
            if comp_key.is_none() {
 
                break 'run_loop;
 
            }
 

	
 
            let comp_key = comp_key.unwrap();
 
            let component = self.runtime.get_component(comp_key);
 
            scheduler_ctx.comp = comp_key.0;
 

	
 
            // Run the component until it no longer indicates that it needs to
 
            // be re-executed immediately.
 
            let mut new_scheduling = CompScheduling::Immediate;
 
            while let CompScheduling::Immediate = new_scheduling {
 
                while let Some(message) = component.inbox.pop() {
 
                    component.code.handle_message(&mut scheduler_ctx, &mut component.ctx, message);
 
                }
 
                new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error");
 
            }
 

	
 
            // Handle the new scheduling
 
            match new_scheduling {
 
                CompScheduling::Immediate => unreachable!(),
 
                CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); },
 
                CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); },
 
                CompScheduling::Exit => { self.mark_component_as_exiting(&scheduler_ctx, component); }
 
            }
 
        }
 
    }
 

	
 
    // local utilities
 

	
 
    /// Marks component as sleeping, if after marking itself as sleeping the
 
    /// inbox contains messages then the component will be immediately
 
    /// rescheduled. After calling this function the component should not be
 
    /// executed anymore.
 
    fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
 
        debug_assert_eq!(key.downgrade(), component.ctx.id); // make sure component matches key
 
        debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping
 

	
 
        component.public.sleeping.store(true, Ordering::Release);
 
        if component.inbox.can_pop() {
 
            let should_reschedule = component.public.sleeping
 
                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
 
                .is_ok();
 

	
 
            if should_reschedule {
 
                self.runtime.enqueue_work(key);
 
            }
 
        }
 
    }
 

	
 
    /// Marks the component as exiting by removing the reference it holds to
 
    /// itself. Afterward the component will enter "normal" sleeping mode (if it
 
    /// has not yet been destroyed)
 
    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
 
        // Send messages that all ports will be closed
 
        for port_index in 0..component.ctx.ports.len() {
 
            let port_info = &component.ctx.ports[port_index];
 
            if let Some((peer_id, message)) = component.code.control.initiate_port_closing(port_info.self_id, &mut component.ctx) {
 
                let peer_info = component.ctx.get_peer(peer_id);
 
                peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
 
        // If we didn't yet decrement our reference count, do so now
 
        let comp_key = unsafe{ component.ctx.id.upgrade() };
 

	
 
        if !component.exiting {
 
            component.exiting = true;
 

	
 
            let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
 
            let new_count = old_count - 1;
 
            println!(" ****** DEBUG [ sched]: Decremented count to {} for {:?}", new_count, component.ctx.id);
 
            if new_count == 0 {
 
                sched_ctx.runtime.destroy_component(comp_key);
 
                return;
 
            }
 
        }
 

	
 
        // Remove all references to the peers that we have
 
        for mut peer in component.ctx.peers.drain(..) {
 
            let should_remove = peer.handle.decrement_users();
 
            if should_remove {
 
                let key = unsafe{ peer.id.upgrade() };
 
                sched_ctx.runtime.destroy_component(key);
 
            }
 
        }
 

	
 
        let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
 
        let new_count = old_count - 1;
 
        if new_count == 0 {
 
            let comp_key = unsafe{ component.ctx.id.upgrade() };
 
            sched_ctx.runtime.destroy_component(comp_key);
 
        }
 
        // Enter "regular" sleeping mode
 
        self.mark_component_as_sleeping(comp_key, component);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/store/component.rs
Show inline comments
 
/*
 
 * Component Store
 
 *
 
 * Concurrent datastructure for creating/destroying/retrieving components using
 
 * their ID. It is essentially a variation on a concurrent freelist. We store an
 
 * array of (potentially null) pointers to data. Indices into this array that
 
 * are unused (but may be left allocated) are in a freelist. So creating a new
 
 * bit of data involves taking an index from this freelist. Destruction involves
 
 * putting the index back.
 
 *
 
 * This datastructure takes care of the threadsafe implementation of the
 
 * freelist and calling the data's destructor when needed. Note that it is not
 
 * completely safe (in Rust's sense of the word) because it is possible to
 
 * get more than one mutable reference to a piece of data. Likewise it is
 
 * possible to put back bogus indices into the freelist, which will destroy the
 
 * integrity of the datastructure.
 
 *
 
 * Some underlying assumptions that led to this design (note that I haven't
 
 * actually checked these conditions or performed any real profiling, yet):
 
 *  - Resizing the freelist should be very rare. The datastructure should grow
 
 *    to some kind of maximum size and stay at that size.
 
 *  - Creation should (preferably) be faster than deletion of data. Reason being
 
 *    that creation implies we're creating a component that has code to be
 
 *    executed. Better to quickly be able to execute code than being able to
 
 *    quickly tear down finished components.
 
 *  - Retrieval is much more likely than creation/destruction.
 
 *
 
 * Some obvious flaws with this implementation:
 
 *  - Because of the freelist implementation we will generally allocate all of
 
 *    the data pointers that are available (i.e. if we have a buffer of size
 
 *    64, but we generally use 33 elements, than we'll have 64 elements
 
 *    allocated), which might be wasteful at larger array sizes (which are
 
 *    always powers of two).
 
 *  - A lot of concurrent operations are not necessary: we may move some of the
 
 *    access to the global concurrent datastructure by an initial access to some
 
 *    kind of thread-local datastructure.
 
 */
 

	
 
use std::mem::transmute;
 
use std::alloc::{dealloc, Layout};
 
use std::ptr;
 
use std::sync::atomic::{AtomicUsize, Ordering};
 

	
 
use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};
 

	
 
/// Generic store of components. Essentially a resizable freelist (implemented
 
/// as a ringbuffer) combined with an array of actual elements.
 
pub struct ComponentStore<T: Sized> {
 
    inner: UnfairSeLock<Inner<T>>,
 
    read_head: AtomicUsize,
 
    write_head: AtomicUsize,
 
    limit_head: AtomicUsize,
 
}
 

	
 
unsafe impl<T: Sized> Send for ComponentStore<T>{}
 
unsafe impl<T: Sized> Sync for ComponentStore<T>{}
 

	
 
/// Contents of the `ComponentStore` that require a shared/exclusive locking
 
/// mechanism for consistency.
 
struct Inner<T: Sized> {
 
    freelist: Vec<u32>,
 
    data: Vec<*mut T>,
 
    size: usize,
 
    compare_mask: usize,
 
    index_mask: usize,
 
}
 

	
 
type InnerShared<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;
 

	
 
/// Reservation of a slot in the component store. Corresponds to the case where
 
/// an index has been taken from the freelist, but the element has not yet been
 
/// initialized
 
pub struct ComponentReservation {
 
    pub(crate) index: u32,
 
    #[cfg(debug_assertions)] submitted: bool,
 
}
 

	
 
impl ComponentReservation {
 
    fn new(index: u32) -> Self {
 
        return Self{
 
            index,
 
            #[cfg(debug_assertions)] submitted: false,
 
        }
 
    }
 
}
 

	
 
impl Drop for ComponentReservation {
 
    fn drop(&mut self) {
 
        debug_assert!(self.submitted);
 
    }
 
}
 

	
 
impl<T: Sized> ComponentStore<T> {
 
    pub fn new(initial_size: usize) -> Self {
 
        Self::assert_valid_size(initial_size);
 

	
 
        // Fill initial freelist and preallocate data array
 
        let mut initial_freelist = Vec::with_capacity(initial_size);
 
        for idx in 0..initial_size {
 
            initial_freelist.push(idx as u32)
 
        }
 

	
 
        let mut initial_data = Vec::new();
 
        initial_data.resize(initial_size, ptr::null_mut());
 

	
 
        // Return initial store
 
        return Self{
 
            inner: UnfairSeLock::new(Inner{
 
                freelist: initial_freelist,
 
                data: initial_data,
 
                size: initial_size,
 
                compare_mask: 2*initial_size - 1,
 
                index_mask: initial_size - 1,
 
            }),
 
            read_head: AtomicUsize::new(0),
 
            write_head: AtomicUsize::new(initial_size),
 
            limit_head: AtomicUsize::new(initial_size),
 
        };
 
    }
 

	
 
    /// Creates a new element initialized to the provided `value`. This returns
 
    /// the index at which the element can be retrieved.
 
    pub fn create(&self, value: T) -> u32 {
 
        let lock = self.inner.lock_shared();
 
        let (lock, index) = self.pop_freelist_index(lock);
 
        Self::initialize_at_index(lock, index, value);
 
        return index;
 
    }
 

	
 
    pub fn reserve(&self) -> ComponentReservation {
 
        let lock = self.inner.lock_shared();
 
        let (lock, index) = self.pop_freelist_index(lock);
 
        let (_lock, index) = self.pop_freelist_index(lock);
 
        return ComponentReservation::new(index);
 
    }
 

	
 
    pub fn submit(&self, mut reservation: ComponentReservation, value: T) -> u32 {
 
        dbg_code!({ reservation.submitted = true; });
 
        let lock = self.inner.lock_shared();
 
        Self::initialize_at_index(lock, reservation.index, value);
 
        return reservation.index;
 
    }
 

	
 
    /// Destroys an element at the provided `index`. The caller must make sure
 
    /// that it does not use any previously received references to the data at
 
    /// this index, and that no more calls to `get` are performed using this
 
    /// index. This is allowed again if the index has been reacquired using
 
    /// `create`.
 
    pub fn destroy(&self, index: u32) {
 
        let lock = self.inner.lock_shared();
 
        self.destruct_at_index(&lock, index);
 
        self.push_freelist_index(&lock, index);
 
    }
 

	
 
    /// Retrieves an element by reference
 
    pub fn get(&self, index: u32) -> &T {
 
        let lock = self.inner.lock_shared();
 
        let value = lock.data[index as usize];
 
        unsafe {
 
            debug_assert!(!value.is_null());
 
            return &*value;
 
        }
 
    }
 

	
 
    /// Retrieves an element by mutable reference. The caller should ensure that
 
    /// use of that mutability is thread-safe
 
    pub fn get_mut(&self, index: u32) -> &mut T {
 
        let lock = self.inner.lock_shared();
 
        let value = lock.data[index as usize];
 
        unsafe {
 
            debug_assert!(!value.is_null());
 
            return &mut *value;
 
        }
 
    }
 

	
 
    #[inline]
 
    fn pop_freelist_index<'a>(&'a self, mut shared_lock: InnerShared<'a, T>) -> (InnerShared<'a, T>, u32) {
 
        'attempt_read: loop {
 
            // Load indices and check for reallocation condition
 
            let current_size = shared_lock.size;
 
            let mut read_index = self.read_head.load(Ordering::Relaxed);
 
            let limit_index = self.limit_head.load(Ordering::Acquire);
 

	
 
            if read_index == limit_index {
 
                shared_lock = self.reallocate(current_size, shared_lock);
 
                continue 'attempt_read;
 
            }
 

	
 
            loop {
 
                let preemptive_read = shared_lock.freelist[read_index & shared_lock.index_mask];
 
                if let Err(actual_read_index) = self.read_head.compare_exchange(
 
                    read_index, (read_index + 1) & shared_lock.compare_mask,
 
                    Ordering::AcqRel, Ordering::Acquire
 
                ) {
 
                    // We need to try again
 
                    read_index = actual_read_index;
 
                    continue 'attempt_read;
 
                }
 

	
 
                // If here then we performed the read
 
                return (shared_lock, preemptive_read);
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn initialize_at_index(read_lock: InnerShared<T>, index: u32, value: T) {
 
        let mut target_ptr = read_lock.data[index as usize];
 

	
 
        unsafe {
 
            if target_ptr.is_null() {
 
                let layout = Layout::for_value(&value);
 
                target_ptr = std::alloc::alloc(layout).cast();
 
                let rewrite: *mut *mut T = transmute(read_lock.data.as_ptr());
 
                *rewrite.add(index as usize) = target_ptr;
 
            }
 

	
 
            std::ptr::write(target_ptr, value);
 
        }
 
    }
 

	
 
    #[inline]
 
    fn push_freelist_index(&self, read_lock: &InnerShared<T>, index_to_put_back: u32) {
 
        // Acquire an index in the freelist to which we can write
 
        let mut cur_write_index = self.write_head.load(Ordering::Relaxed);
 
        let mut new_write_index = (cur_write_index + 1) & read_lock.compare_mask;
 
        while let Err(actual_write_index) = self.write_head.compare_exchange(
 
            cur_write_index, new_write_index,
 
            Ordering::AcqRel, Ordering::Acquire
 
        ) {
 
            cur_write_index = actual_write_index;
 
            new_write_index = (cur_write_index + 1) & read_lock.compare_mask;
 
        }
 

	
 
        // We own the data at the index, write to it and notify reader through
 
        // limit_head that it can be read from. Note that we cheat around the
 
        // rust mutability system here :)
 
        unsafe {
 
            let target: *mut u32 = transmute(read_lock.freelist.as_ptr());
 
            *(target.add(cur_write_index & read_lock.index_mask)) = index_to_put_back;
 
        }
 

	
 
        // Essentially spinlocking, relaxed failure ordering because the logic
 
        // is that a write first moves the `write_head`, then the `limit_head`.
 
        while let Err(_) = self.limit_head.compare_exchange(
 
            cur_write_index, new_write_index,
 
            Ordering::AcqRel, Ordering::Relaxed
 
        ) {};
 
    }
 

	
 
    #[inline]
 
    fn destruct_at_index(&self, read_lock: &InnerShared<T>, index: u32) {
 
        let target_ptr = read_lock.data[index as usize];
 
        unsafe{ ptr::drop_in_place(target_ptr); }
 
    }
 

	
 
    // NOTE: Bit of a mess, and could have a cleanup with better logic for the
 
    // resizing. Maybe even a different indexing scheme...
 
    fn reallocate(&self, old_size: usize, inner: InnerShared<T>) -> InnerShared<T> {
 
        drop(inner);
 
        {
 
            // After dropping read lock, acquire write lock
 
            let mut lock = self.inner.lock_exclusive();
 

	
 
            if old_size == lock.size {
 
                // We are the thread that is supposed to reallocate
 
                let new_size = old_size * 2;
 
                Self::assert_valid_size(new_size);
 

	
 
                // Note that the atomic indices are in the range [0, new_size)
 
                // already, so we need to be careful
 
                let new_index_mask = new_size - 1;
 
                let new_compare_mask = (2 * new_size) - 1;
 
                lock.data.resize(new_size, ptr::null_mut());
 
                lock.freelist.resize(new_size, 0);
 
                for idx in 0..old_size {
 
                    lock.freelist[old_size + idx] = lock.freelist[idx];
 
                }
 

	
 
                // We need to fill the freelist with the indices of all of the
 
                // new elements that we have just created.
 
                debug_assert_eq!(self.limit_head.load(Ordering::SeqCst), self.write_head.load(Ordering::SeqCst));
 
                let old_read_index = self.read_head.load(Ordering::SeqCst);
 
                let old_write_index = self.write_head.load(Ordering::SeqCst);
 

	
 
                if old_read_index > old_write_index {
 
                    // Read index wraps, so keep it as-is and fill
 
                    let new_read_index = old_read_index + old_size;
 
                    for index in 0..old_size {
 
                        let target_idx = (new_read_index + index) & new_index_mask;
 
                        lock.freelist[target_idx] = (old_size + index) as u32;
 
                    }
 

	
 
                    self.read_head.store(new_read_index, Ordering::SeqCst);
 
                    debug_assert!(new_read_index < 2*new_size);
 
                    debug_assert!(old_write_index.wrapping_sub(new_read_index) & new_compare_mask <= new_size);
 
                } else {
 
                    // No wrapping, so increment write index
 
                    let new_write_index = old_write_index + old_size;
 
                    for index in 0..old_size {
 
                        let target_idx = (old_write_index + index) & new_index_mask;
 
                        lock.freelist[target_idx] = (old_size + index) as u32;
 
                    }
 

	
 
                    // Update write/limit heads
 
                    self.write_head.store(new_write_index, Ordering::SeqCst);
 
                    self.limit_head.store(new_write_index, Ordering::SeqCst);
 
                    debug_assert!(new_write_index < 2*new_size);
 
                    debug_assert!(new_write_index.wrapping_sub(old_read_index) & new_compare_mask <= new_size);
 
                }
 

	
 
                // Update sizes and masks
 
                lock.size = new_size;
 
                lock.compare_mask = new_compare_mask;
 
                lock.index_mask = new_index_mask;
 
            } // else: someone else allocated, so we don't have to
 
        }
 

	
 
        // We've dropped the write lock, acquire the read lock again
 
        return self.inner.lock_shared();
 
    }
 

	
 
    #[inline]
 
    fn assert_valid_size(size: usize) {
 
        // Condition the size needs to adhere to. Some are a bit excessive, but
 
        // we don't hit this check very often
 
        assert!(
 
            size.is_power_of_two() &&
 
                size >= 4 &&
 
                size <= usize::MAX / 2 &&
 
                size <= u32::MAX as usize
 
        );
 
    }
 
}
 

	
 
impl<T: Sized> Drop for ComponentStore<T> {
 
    fn drop(&mut self) {
 
        let value_layout = Layout::from_size_align(
 
            std::mem::size_of::<T>(), std::mem::align_of::<T>()
 
        ).unwrap();
 

	
 
        // Note that if the indices exist in the freelist then the destructor
 
        // has already been called. So handle them first
 
        let mut lock = self.inner.lock_exclusive();
 

	
 
        let read_index = self.read_head.load(Ordering::Acquire);
 
        let write_index = self.write_head.load(Ordering::Acquire);
 
        debug_assert_eq!(write_index, self.limit_head.load(Ordering::Acquire));
 

	
 
        let mut index = read_index;
 
        while index != write_index {
 
            let dealloc_index = lock.freelist[index & lock.index_mask] as usize;
 
            let target_ptr = lock.data[dealloc_index];
 

	
 
            unsafe {
 
                dealloc(target_ptr.cast(), value_layout);
 
                lock.data[dealloc_index] = ptr::null_mut();
 
            }
 

	
 
            index += 1;
 
            index &= lock.compare_mask;
 
        }
 

	
 
        // With all of those set to null, we'll just iterate through all
 
        // pointers and destruct+deallocate the ones not set to null yet
 
        for target_ptr in lock.data.iter().copied() {
 
            if !target_ptr.is_null() {
 
                unsafe {
 
                    ptr::drop_in_place(target_ptr);
 
                    dealloc(target_ptr.cast(), value_layout);
 
                }
 
            }
 
        }
 
    }
 
}
 

	
 
#[cfg(test)]
 
mod tests {
 
    use super::*;
 
    use super::super::tests::*;
 

	
 
    use rand::prelude::*;
 
    use rand_pcg::Pcg32;
 

	
 
    fn seeds() -> Vec<[u8;16]> {
 
        return vec![
 
            [241, 47, 70, 87, 240, 246, 20, 173, 219, 143, 74, 23, 158, 58, 205, 172],
 
            [178, 112, 230, 205, 230, 178, 2, 90, 162, 218, 49, 196, 224, 222, 208, 43],
 
            [245, 42, 35, 167, 153, 205, 221, 144, 200, 253, 144, 117, 176, 231, 17, 70],
 
            [143, 39, 177, 216, 124, 96, 225, 39, 30, 82, 239, 193, 133, 58, 255, 193],
 
            [25, 105, 10, 52, 161, 212, 190, 112, 178, 193, 68, 249, 167, 153, 172, 144],
 
        ]
 
    }
 

	
 
    #[test]
 
    fn test_ctor_dtor_simple_unthreaded() {
 
        const NUM_ROUNDS: usize = 5;
 
        const NUM_ELEMENTS: usize = 1024;
 

	
 
        let store = ComponentStore::new(32);
 
        let counters = Counters::new();
 

	
 
        let mut indices = Vec::with_capacity(NUM_ELEMENTS);
 
        for _round_index in 0..NUM_ROUNDS {
 
            // Creation round
 
            for value in 0..NUM_ELEMENTS {
 
                let new_resource = Resource::new(&counters, value as u64);
 
                let new_index = store.create(new_resource);
 
                indices.push(new_index);
 
            }
 

	
 
            // Checking round
 
            for el_index in indices.iter().copied() {
 
                let element = store.get(el_index);
 
                assert_eq!(element.val, el_index as u64);
 
            }
 

	
 
            // Destruction round
 
            for el_index in indices.iter().copied() {
 
                store.destroy(el_index);
 
            }
 

	
 
            indices.clear();
 
        }
 

	
 
        let num_expected = (NUM_ROUNDS * NUM_ELEMENTS) as u64;
 
        assert_ctor_eq!(counters, num_expected);
 
        assert_dtor_eq!(counters, num_expected);
 
    }
 

	
 
    #[test]
 
    fn test_ctor_dtor_simple_threaded() {
 
        const MAX_SIZE: usize = 1024;
 
        const NUM_THREADS: usize = 4;
 
        const NUM_PER_THREAD: usize = MAX_SIZE / NUM_THREADS;
 
        const NUM_ROUNDS: usize = 4;
 

	
 
        assert!(MAX_SIZE % NUM_THREADS == 0);
 

	
 
        let store = Arc::new(ComponentStore::new(16));
 
        let counters = Counters::new();
 

	
 
        let mut threads = Vec::with_capacity(NUM_THREADS);
 
        for thread_index in 0..NUM_THREADS {
 
            // Setup local clones to move into the thread
 
            let store = store.clone();
 
            let first_index = thread_index * NUM_PER_THREAD;
 
            let last_index = (thread_index + 1) * NUM_PER_THREAD;
 
            let counters = counters.clone();
 

	
 
            let handle = std::thread::spawn(move || {
 
                let mut indices = Vec::with_capacity(last_index - first_index);
 
                for _round_index in 0..NUM_ROUNDS {
 
                    // Creation round
 
                    for value in first_index..last_index {
 
                        let el_index = store.create(Resource::new(&counters, value as u64));
 
                        indices.push(el_index);
 
                    }
 

	
 
                    // Checking round
 
                    for (value_offset, el_index) in indices.iter().copied().enumerate() {
 
                        let element = store.get(el_index);
 
                        assert_eq!(element.val, (first_index + value_offset) as u64);
 
                    }
 

	
 
                    // Destruction round
 
                    for el_index in indices.iter().copied() {
 
                        store.destroy(el_index);
 
                    }
 

	
 
                    indices.clear();
 
                }
 
            });
 
            threads.push(handle);
 
        }
 

	
 
        for thread in threads {
 
            thread.join().expect("clean exit");
 
        }
 

	
 
        let num_expected = (NUM_ROUNDS * MAX_SIZE) as u64;
 
        assert_ctor_eq!(counters, num_expected);
 
        assert_dtor_eq!(counters, num_expected);
 
    }
 

	
 
    #[test]
 
    fn test_ctor_dtor_random_threaded() {
 
        const NUM_ROUNDS: usize = 4;
 
        const NUM_THREADS: usize = 4;
 
        const NUM_OPERATIONS: usize = 1024;
 
        const NUM_OPS_PER_THREAD: usize = NUM_OPERATIONS / NUM_THREADS;
 
        const NUM_OPS_PER_ROUND: usize = NUM_OPS_PER_THREAD / NUM_ROUNDS;
 
        const NUM_STORED_PER_THREAD: usize = 32;
 

	
 
        assert!(NUM_OPERATIONS % NUM_THREADS == 0);
 
        assert!(NUM_OPS_PER_THREAD / 2 > NUM_STORED_PER_THREAD);
 

	
 
        let seeds = seeds();
 
        for seed_index in 0..seeds.len() {
 
            // Setup store, counters and threads
 
            let store = Arc::new(ComponentStore::new(16));
 
            let counters = Counters::new();
 

	
 
            let mut threads = Vec::with_capacity(NUM_THREADS);
 
            for thread_index in 0..NUM_THREADS {
 
                // Setup local clones to move into the thread
 
                let store = store.clone();
 
                let counters = counters.clone();
 

	
 
                // Setup local rng
 
                let mut seed = seeds[seed_index];
 
                for seed_val_idx in 0..16 {
 
                    seed[seed_val_idx] ^= thread_index as u8; // blegh
 
                }
 
                let mut rng = Pcg32::from_seed(seed);
 

	
 
                let handle = std::thread::spawn(move || {
0 comments (0 inline, 0 general)