Changeset - e7df1d2ae35f
[Not reviewed]
0 7 1
mh - 3 years ago 2022-01-27 19:51:05
contact@maxhenger.nl
WIP: Updated port management to be more maintainable
8 files changed with 469 insertions and 418 deletions:
0 comments (0 inline, 0 general)
src/runtime2/communication.rs
Show inline comments
 
@@ -17,12 +17,6 @@ impl PortId {
 
    }
 
}
 

	
 
pub struct Peer {
 
    pub id: CompId,
 
    pub num_associated_ports: u32,
 
    pub(crate) handle: CompHandle,
 
}
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub enum PortKind {
 
    Putter,
 
@@ -36,15 +30,6 @@ pub enum PortState {
 
    Closed,
 
}
 

	
 
#[derive(Debug)]
 
pub struct Port {
 
    pub self_id: PortId,
 
    pub peer_id: PortId, // eventually consistent
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    pub peer_comp_id: CompId, // eventually consistent
 
}
 

	
 
pub struct Channel {
 
    pub putter_id: PortId,
 
    pub getter_id: PortId,
src/runtime2/component/component_context.rs
Show inline comments
 
new file 100644
 
use crate::runtime2::scheduler::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 

	
 
#[derive(Debug)]
 
pub struct Port {
 
    pub self_id: PortId,
 
    pub peer_comp_id: CompId, // eventually consistent
 
    pub peer_port_id: PortId, // eventually consistent
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    #[cfg(debug_assertions)] pub(crate) associated_with_peer: bool,
 
}
 

	
 
pub struct Peer {
 
    pub id: CompId,
 
    pub num_associated_ports: u32,
 
    pub(crate) handle: CompHandle,
 
}
 

	
 
/// Port and peer management structure. Will keep a local reference counter to
 
/// the ports associate with peers, additionally manages the atomic reference
 
/// counter associated with the peers' component handles.
 
pub struct CompCtx {
 
    pub id: CompId,
 
    ports: Vec<Port>,
 
    peers: Vec<Peer>,
 
    port_id_counter: u32,
 
}
 

	
 
#[derive(Copy, Clone)]
 
pub struct LocalPortHandle(PortId);
 

	
 
#[derive(Copy, Clone)]
 
pub struct LocalPeerHandle(CompId);
 

	
 
impl CompCtx {
 
    /// Creates a new component context based on a reserved entry in the
 
    /// component store. This reservation is used such that we already know our
 
    /// assigned ID.
 
    pub(crate) fn new(reservation: &CompReserved) -> Self {
 
        return Self{
 
            id: reservation.id(),
 
            ports: Vec::new(),
 
            peers: Vec::new(),
 
            port_id_counter: 0,
 
        }
 
    }
 

	
 
    /// Creates a new channel that is fully owned by the component associated
 
    /// with this context.
 
    pub(crate) fn create_channel(&mut self) -> Channel {
 
        let putter_id = PortId(self.take_port_id());
 
        let getter_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id: putter_id,
 
            peer_port_id: getter_id,
 
            kind: PortKind::Putter,
 
            state: PortState::Open,
 
            peer_comp_id: self.id,
 
            associated_with_peer: false,
 
        });
 
        self.ports.push(Port{
 
            self_id: getter_id,
 
            peer_port_id: putter_id,
 
            kind: PortKind::Getter,
 
            state: PortState::Open,
 
            peer_comp_id: self.id,
 
            associated_with_peer: false,
 
        });
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
    /// Adds a new port. Make sure to call `add_peer` afterwards.
 
    pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle {
 
        let self_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id, peer_comp_id, peer_port_id, kind, state,
 
            #[cfg(debug_assertions)] associated_with_peer: false,
 
        });
 
        return LocalPortHandle(self_id);
 
    }
 

	
 
    /// Removes a port. Make sure you called `remove_peer` first.
 
    pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port {
 
        let port_index = self.must_get_port_index(port_handle);
 
        let port = self.ports.remove(port_index);
 
        debug_assert!(!port.associated_with_peer);
 
        return port;
 
    }
 

	
 
    /// Adds a new peer. This must be called for every port, no matter the
 
    /// component the channel is connected to. If a `CompHandle` is supplied,
 
    /// then it will be used to add the peer. Otherwise it will be retrieved
 
    /// from the runtime using its ID.
 
    pub(crate) fn add_peer(&mut self, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, peer_comp_id: CompId, handle: Option<&CompHandle>) {
 
        let port = self.get_port_mut(port_handle);
 
        debug_assert_eq!(port.peer_comp_id, peer_comp_id);
 
        debug_assert!(!port.associated_with_peer);
 
        if !self.requires_peer_reference(port) {
 
            return;
 
        }
 

	
 
        dbg_code!(port.associated_with_peer = true);
 
        match self.get_peer_index_by_id(peer_comp_id) {
 
            Some(peer_index) => {
 
                let peer = &mut self.peers[peer_index];
 
                peer.num_associated_ports += 1;
 
            },
 
            None => {
 
                let handle = match handle {
 
                    Some(handle) => handle.clone(),
 
                    None => sched_ctx.runtime.get_component_public(peer_comp_id)
 
                };
 
                self.peers.push(Peer{
 
                    id: peer_comp_id,
 
                    num_associated_ports: 1,
 
                    handle,
 
                });
 
            }
 
        }
 
    }
 

	
 
    /// Removes a peer associated with a port.
 
    pub(crate) fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, peer_id: CompId) {
 
        let port = self.get_port_mut(port_handle);
 
        debug_assert_eq!(port.peer_comp_id, peer_id);
 
        if !self.requires_peer_reference(port) {
 
            return;
 
        }
 

	
 
        debug_assert!(port.associated_with_peer);
 
        dbg_code!(port.associated_with_peer = false);
 
        let peer_index = self.get_peer_index_by_id(peer_id).unwrap();
 
        let peer = &mut self.peers[peer_index];
 
        peer.num_associated_ports -= 1;
 
        if peer.num_associated_ports == 0 {
 
            let mut peer = self.peers.remove(peer_index);
 
            if let Some(key) = peer.handle.decrement_users() {
 
                debug_assert_ne!(key.downgrade(), self.id); // should be upheld by the code that shuts down a component
 
                sched_ctx.runtime.destroy_component(key);
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn set_port_state(&mut self, port_handle: LocalPortHandle, new_state: PortState) {
 
        let port_info = self.get_port_mut(port_handle);
 
        debug_assert_ne!(port_info.state, PortState::Closed); // because then we do not expect to change the state
 
        port_info.state = new_state;
 
    }
 

	
 
    pub(crate) fn get_port_handle(&self, port_id: PortId) -> LocalPortHandle {
 
        return LocalPortHandle(port_id);
 
    }
 

	
 
    // should perhaps be revised, used in main inbox
 
    pub(crate) fn get_port_index(&self, port_handle: LocalPortHandle) -> usize {
 
        return self.must_get_port_index(port_handle);
 
    }
 

	
 
    pub(crate) fn get_peer_handle(&self, peer_id: CompId) -> LocalPeerHandle {
 
        return LocalPeerHandle(peer_id);
 
    }
 

	
 
    pub(crate) fn get_port(&self, port_handle: LocalPortHandle) -> &Port {
 
        let index = self.must_get_port_index(port_handle);
 
        return &self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_mut(&mut self, port_handle: LocalPortHandle) -> &mut Port {
 
        let index = self.must_get_port_index(port_handle);
 
        return &mut self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_peer(&self, peer_handle: LocalPeerHandle) -> &Peer {
 
        let index = self.must_get_peer_index(peer_handle);
 
        return &self.peers[index];
 
    }
 

	
 
    pub(crate) fn get_peer_mut(&mut self, peer_handle: LocalPeerHandle) -> &mut Peer {
 
        let index = self.must_get_peer_index(peer_handle);
 
        return &mut self.peers[index];
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn iter_ports(&self) -> impl Iterator<Item=&Port> {
 
        return self.ports.iter();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn iter_peers(&self) -> impl Iterator<Item=&Peer> {
 
        return self.peers.iter();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn num_ports(&self) -> usize {
 
        return self.ports.len();
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Local utilities
 
    // -------------------------------------------------------------------------
 

	
 
    #[inline]
 
    fn requires_peer_reference(&self, port: &Port) -> bool {
 
        return port.state == PortState::Closed;
 
    }
 

	
 
    fn must_get_port_index(&self, handle: LocalPortHandle) -> usize {
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.self_id == handle.0 {
 
                return index;
 
            }
 
        }
 

	
 
        unreachable!()
 
    }
 

	
 
    fn must_get_peer_index(&self, handle: LocalPeerHandle) -> usize {
 
        for (index, peer) in self.peers.iter().enumerate() {
 
            if peer.id == handle.0 {
 
                return index;
 
            }
 
        }
 

	
 
        unreachable!()
 
    }
 

	
 
    fn get_peer_index_by_id(&self, comp_id: CompId) -> Option<usize> {
 
        for (index, peer) in self.peers.iter().enumerate() {
 
            if peer.id == comp_id {
 
                return Some(index);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn take_port_id(&mut self) -> u32 {
 
        let port_id = self.port_id_counter;
 
        self.port_id_counter = self.port_id_counter.wrapping_add(1);
 
        return port_id;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -11,6 +11,7 @@ use crate::runtime2::scheduler::SchedulerCtx;
 
use crate::runtime2::communication::*;
 

	
 
use super::*;
 
use super::component_context::*;
 
use super::control_layer::*;
 
use super::consensus::Consensus;
 

	
 
@@ -21,159 +22,6 @@ pub enum CompScheduling {
 
    Exit,
 
}
 

	
 
pub struct CompCtx {
 
    pub id: CompId,
 
    pub ports: Vec<Port>,
 
    pub peers: Vec<Peer>,
 
    pub messages: Vec<Option<DataMessage>>, // same size as "ports"
 
    pub port_id_counter: u32,
 
}
 

	
 
impl CompCtx {
 
    pub(crate) fn new(reservation: &CompReserved) -> Self {
 
        return Self{
 
            id: reservation.id(),
 
            ports: Vec::new(),
 
            peers: Vec::new(),
 
            messages: Vec::new(),
 
            port_id_counter: 0,
 
        }
 
    }
 
}
 

	
 
struct MessageView<'a> {
 
    index: usize,
 
    pub message: &'a DataMessage,
 
}
 

	
 
impl CompCtx {
 
    /// Creates a new channel that is fully owned by the component associated
 
    /// with this context.
 
    fn create_channel(&mut self) -> Channel {
 
        let putter_id = PortId(self.take_port_id());
 
        let getter_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            kind: PortKind::Putter,
 
            state: PortState::Open,
 
            peer_comp_id: self.id,
 
        });
 
        self.ports.push(Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            state: PortState::Open,
 
            peer_comp_id: self.id,
 
        });
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
    /// Adopts a port transferred by another component. Essentially copies all
 
    /// port data but creates a new ID. Caller should ensure that the other
 
    /// endpoint becomes aware of this ID.
 
    fn adopt_port(&mut self, to_transfer: &Port) -> &mut Port {
 
        let port_id = PortId(self.take_port_id());
 
        let port_index = self.ports.len();
 
        self.ports.push(Port{
 
            self_id: port_id,
 
            peer_id: to_transfer.peer_id,
 
            kind: to_transfer.kind,
 
            state: to_transfer.state,
 
            peer_comp_id: to_transfer.peer_comp_id,
 
        });
 
        return &mut self.ports[port_index];
 
    }
 

	
 
    /// Adds a peer (or increments the "associated port" counter). Hence caller
 
    /// must make sure that this makes sense.
 
    fn add_peer(&mut self, sched_ctx: &SchedulerCtx, peer_id: CompId, peer_handle: Option<&CompHandle>) {
 
        match self.get_peer_index(peer_id) {
 
            Some(peer_index) => {
 
                let peer_info = &mut self.peers[peer_index];
 
                peer_info.num_associated_ports += 1;
 
            },
 
            None => {
 
                let handle = if let Some(handle) = peer_handle {
 
                    handle.clone()
 
                } else {
 
                    sched_ctx.runtime.get_component_public(peer_id)
 
                };
 

	
 
                self.peers.push(Peer{
 
                    id: peer_id,
 
                    num_associated_ports: 1,
 
                    handle,
 
                })
 
            }
 
        }
 
    }
 

	
 
    /// Removes a peer (or decrements the "associated port" counter). If there
 
    /// are no more references to the peer then the handle will be destroyed.
 
    fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, peer_id: CompId) {
 
        let peer_index = self.get_peer_index(peer_id).unwrap();
 
        let peer_info = &mut self.peers[peer_index];
 
        peer_info.num_associated_ports -= 1;
 

	
 
        if peer_info.num_associated_ports == 0 {
 
            let mut peer = self.peers.remove(peer_index);
 
            let should_remove = peer.handle.decrement_users();
 
            if should_remove {
 
                let key = unsafe{ peer.id.upgrade() };
 
                sched_ctx.runtime.destroy_component(key);
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn get_port(&self, port_id: PortId) -> &Port {
 
        let index = self.get_port_index(port_id).unwrap();
 
        return &self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_mut(&mut self, port_id: PortId) -> &mut Port {
 
        let index = self.get_port_index(port_id).unwrap();
 
        return &mut self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_index(&self, port_id: PortId) -> Option<usize> {
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.self_id == port_id {
 
                return Some(index);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    pub(crate) fn get_peer(&self, peer_id: CompId) -> &Peer {
 
        let index = self.get_peer_index(peer_id).unwrap();
 
        return &self.peers[index];
 
    }
 

	
 
    fn get_peer_mut(&mut self, peer_id: CompId) -> &mut Peer {
 
        let index = self.get_peer_index(peer_id).unwrap();
 
        return &mut self.peers[index];
 
    }
 

	
 
    pub(crate) fn get_peer_index(&self, peer_id: CompId) -> Option<usize> {
 
        for (index, peer) in self.peers.iter().enumerate() {
 
            if peer.id == peer_id {
 
                return Some(index);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn take_port_id(&mut self) -> u32 {
 
        let port_id = self.port_id_counter;
 
        self.port_id_counter = self.port_id_counter.wrapping_add(1);
 
        return port_id;
 
    }
 
}
 

	
 
pub enum ExecStmt {
 
    CreatedChannel((Value, Value)),
 
    PerformedPut,
 
@@ -242,6 +90,7 @@ pub(crate) enum Mode {
 
    SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
 
    BlockedGet,
 
    BlockedPut,
 
    StartExit, // temp state
 
    Exit,
 
}
 

	
 
@@ -250,7 +99,7 @@ impl Mode {
 
        match self {
 
            Mode::NonSync | Mode::Sync =>
 
                return true,
 
            Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut | Mode::Exit =>
 
            Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut | Mode::StartExit | Mode::Exit =>
 
                return false,
 
        }
 
    }
 
@@ -303,7 +152,7 @@ impl CompPDL {
 
            let mut target = sched_ctx.runtime.get_component_public(new_target);
 
            target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            let _should_remove = target.decrement_users();
 
            debug_assert!(!_should_remove);
 
            debug_assert!(_should_remove.is_none());
 
            return;
 
        }
 

	
 
@@ -327,6 +176,11 @@ impl CompPDL {
 
    pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        use EvalContinuation as EC;
 

	
 
        if self.mode == Mode::StartExit {
 
            self.mode = Mode::Exit;
 
            return Ok(CompScheduling::Exit);
 
        }
 

	
 
        let can_run = self.mode.can_run();
 
        sched_ctx.log(&format!("Running component (mode: {:?}, can run: {})", self.mode, can_run));
 
        if !can_run {
 
@@ -349,13 +203,15 @@ impl CompPDL {
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 

	
 
                let port_id = port_id_from_eval(port_id);
 
                let port_index = comp_ctx.get_port_index(port_id).unwrap();
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_index = comp_ctx.get_port_index(port_handle);
 
                if let Some(message) = &self.inbox_main[port_index] {
 
                    // Check if we can actually receive the message
 
                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
                        // Message was received. Make sure any blocked peers and
 
                        // pending messages are handled.
 
                        let message = self.inbox_main[port_index].take().unwrap();
 
                        self.handle_received_data_message(sched_ctx, comp_ctx, port_handle);
 

	
 
                        self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
 
                        return Ok(CompScheduling::Immediate);
 
@@ -373,11 +229,12 @@ impl CompPDL {
 
            EC::Put(port_id, value) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                let port_id = port_id_from_eval(port_id);
 
                let port_info = comp_ctx.get_port(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                if port_info.state == PortState::Blocked {
 
                    todo!("handle blocked port");
 
                }
 
                self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_id, value);
 
                self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value);
 
                self.exec_ctx.stmt = ExecStmt::PerformedPut;
 
                return Ok(CompScheduling::Immediate);
 
            },
 
@@ -393,7 +250,7 @@ impl CompPDL {
 
            },
 
            EC::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                self.create_component_and_transfer_ports2(
 
                self.create_component_and_transfer_ports(
 
                    sched_ctx, comp_ctx,
 
                    definition_id, monomorph_idx, arguments
 
                );
 
@@ -436,11 +293,13 @@ impl CompPDL {
 
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Option<CompScheduling> {
 
        sched_ctx.log("Component ending sync mode (now waiting for solution)");
 
        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
        self.mode = Mode::SyncEnd;
 
        self.handle_sync_decision(sched_ctx, comp_ctx, decision)
 
    }
 

	
 
    fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, decision: SyncRoundDecision) -> Option<CompScheduling> {
 
        debug_assert_eq!(self.mode, Mode::Sync);
 
        debug_assert_eq!(self.mode, Mode::SyncEnd);
 
        sched_ctx.log(&format!("Handling sync decision: {:?}", decision));
 
        let is_success = match decision {
 
            SyncRoundDecision::None => {
 
                // No decision yet
 
@@ -451,14 +310,14 @@ impl CompPDL {
 
        };
 

	
 
        // If here then we've reached a decision
 
        self.mode = Mode::NonSync;
 
        if is_success {
 
            self.mode = Mode::NonSync;
 
            self.consensus.notify_sync_decision(decision);
 
            return None;
 
        } else {
 
            todo!("handle this better, show some kind of error");
 
            self.mode = Mode::Exit;
 
            self.handle_component_exit(sched_ctx, comp_ctx);
 
            self.mode = Mode::Exit;
 
            return Some(CompScheduling::Exit);
 
        }
 
    }
 
@@ -476,9 +335,10 @@ impl CompPDL {
 
    // Handling messages
 
    // -------------------------------------------------------------------------
 

	
 
    fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) {
 
        let port_info = comp_ctx.get_port(source_port_id);
 
        let peer_info = comp_ctx.get_peer(port_info.peer_comp_id);
 
    fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_handle: LocalPortHandle, value: ValueGroup) {
 
        let port_info = comp_ctx.get_port(source_port_handle);
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value);
 
        peer_info.handle.send_message(sched_ctx, Message::Data(annotated_message), true);
 
    }
 
@@ -490,13 +350,14 @@ impl CompPDL {
 
        // Check if we can insert it directly into the storage associated with
 
        // the port
 
        let target_port_id = message.data_header.target_port;
 
        let port_index = comp_ctx.get_port_index(target_port_id).unwrap();
 
        let port_handle = comp_ctx.get_port_handle(target_port_id);
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
        if self.inbox_main[port_index].is_none() {
 
            self.inbox_main[port_index] = Some(message);
 

	
 
            // After direct insertion, check if this component's execution is 
 
            // blocked on receiving a message on that port
 
            debug_assert_ne!(comp_ctx.ports[port_index].state, PortState::Blocked); // because we could insert directly
 
            debug_assert_ne!(comp_ctx.get_port(port_handle).state, PortState::Blocked); // because we could insert directly
 
            if self.mode == Mode::BlockedGet && self.mode_port == target_port_id {
 
                // We were indeed blocked
 
                self.mode = Mode::Sync;
 
@@ -507,17 +368,16 @@ impl CompPDL {
 
        }
 

	
 
        // The direct inbox is full, so the port will become (or was already) blocked
 
        let port_info = &mut comp_ctx.ports[port_index];
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
 
        let _peer_comp_id = port_info.peer_comp_id;
 

	
 
        if port_info.state == PortState::Open {
 
            let (target_comp_id, block_message) =
 
                self.control.set_port_and_peer_blocked(target_port_id, comp_ctx);
 
            debug_assert_eq!(_peer_comp_id, target_comp_id);
 
            comp_ctx.set_port_state(port_handle, PortState::Blocked);
 
            let (peer_handle, message) =
 
                self.control.initiate_port_blocking(comp_ctx, port_handle);
 

	
 
            let peer = comp_ctx.get_peer(target_comp_id);
 
            peer.handle.send_message(sched_ctx, Message::Control(block_message), true);
 
            let peer = comp_ctx.get_peer(peer_handle);
 
            peer.handle.send_message(sched_ctx, Message::Control(message), true);
 
        }
 

	
 
        // But we still need to remember the message, so:
 
@@ -528,36 +388,38 @@ impl CompPDL {
 
    /// code. We check to see if there are more messages waiting and, if not,
 
    /// then we handle the case where the port might have been blocked
 
    /// previously.
 
    fn handle_received_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) {
 
        let port_index = comp_ctx.get_port_index(port_id).unwrap();
 
        debug_assert!(self.inbox_main[port_index].is_none()); // because we just received it
 
    fn handle_received_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) {
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
        debug_assert!(self.inbox_main[port_index].is_none()); // this function should be called after the message is taken out
 

	
 
        // Check for any more messages
 
        let port_info = comp_ctx.get_port(port_handle);
 
        for message_index in 0..self.inbox_backup.len() {
 
            let message = &self.inbox_backup[message_index];
 
            if message.data_header.target_port == port_id {
 
            if message.data_header.target_port == port_info.self_id {
 
                // One more message for this port
 
                let message = self.inbox_backup.remove(message_index);
 
                debug_assert_eq!(comp_ctx.get_port(port_id).state, PortState::Blocked); // since we had >1 message on the port
 
                debug_assert_eq!(comp_ctx.get_port(port_handle).state, PortState::Blocked); // since we had >1 message on the port
 
                self.inbox_main[port_index] = Some(message);
 

	
 
                return;
 
            }
 
        }
 

	
 
        // Did not have any more messages. So if we were blocked, then we need
 
        // to send the "unblock" message.
 
        let port_info = &comp_ctx.ports[port_index];
 
        if port_info.state == PortState::Blocked {
 
            let (peer_comp_id, message) = self.control.set_port_and_peer_unblocked(port_id, comp_ctx);
 
            let peer_info = comp_ctx.get_peer(peer_comp_id);
 
            comp_ctx.set_port_state(port_handle, PortState::Open);
 
            let (peer_handle, message) = self.control.cancel_port_blocking(comp_ctx, port_handle);
 
            let peer_info = comp_ctx.get_peer(peer_handle);
 
            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
 
        }
 
    }
 

	
 
    fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) {
 
        // Little local utility to send an Ack
 
        fn send_control_ack_message(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, causer_id: ControlId, peer_port_id: PortId, peer_comp_id: CompId) {
 
            let peer_info = comp_ctx.get_peer(peer_comp_id);
 
        fn send_control_ack_message(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, causer_id: ControlId, peer_handle: LocalPeerHandle) {
 
            let peer_info = comp_ctx.get_peer(peer_handle);
 
            peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{
 
                id: causer_id,
 
                sender_comp_id: comp_ctx.id,
 
@@ -578,7 +440,7 @@ impl CompPDL {
 
                            let mut handle = sched_ctx.runtime.get_component_public(target_comp);
 
                            handle.send_message(sched_ctx, Message::Control(message), true);
 
                            let _should_remove = handle.decrement_users();
 
                            debug_assert!(!_should_remove);
 
                            debug_assert!(_should_remove.is_none());
 
                            to_ack = new_to_ack;
 
                        },
 
                        AckAction::ScheduleComponent(to_schedule) => {
 
@@ -591,7 +453,7 @@ impl CompPDL {
 
                            let key = unsafe{ to_schedule.upgrade() };
 
                            sched_ctx.runtime.enqueue_work(key);
 
                            let _should_remove = handle.decrement_users();
 
                            debug_assert!(!_should_remove);
 
                            debug_assert!(_should_remove.is_none());
 
                            break;
 
                        },
 
                        AckAction::None => {
 
@@ -603,45 +465,32 @@ impl CompPDL {
 
            ControlMessageContent::BlockPort(port_id) => {
 
                // On of our messages was accepted, but the port should be
 
                // blocked.
 
                let port_info = comp_ctx.get_port_mut(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                debug_assert_eq!(port_info.kind, PortKind::Putter);
 
                if port_info.state != PortState::Closed {
 
                    debug_assert_ne!(port_info.state, PortState::Blocked); // implies unnecessary messages
 
                    port_info.state = PortState::Blocked;
 
                    comp_ctx.set_port_state(port_handle, PortState::Blocked);
 
                }
 
            },
 
            ControlMessageContent::ClosePort(port_id) => {
 
                // Request to close the port. We immediately comply and remove
 
                // the component handle as well
 
                let port_index = comp_ctx.get_port_index(port_id).unwrap();
 
                let port_info = &mut comp_ctx.ports[port_index];
 
                let peer_port_id = port_info.peer_id;
 
                let peer_comp_id = port_info.peer_comp_id;
 
                port_info.state = PortState::Closed;
 

	
 
                let peer_index = comp_ctx.get_peer_index(peer_comp_id).unwrap();
 
                let peer_info = &mut comp_ctx.peers[peer_index];
 
                peer_info.num_associated_ports -= 1;
 
                if peer_info.num_associated_ports == 0 {
 
                    // TODO: @Refactor clean up all these uses of "num_associated_ports"
 
                    let should_remove = peer_info.handle.decrement_users();
 
                    if should_remove {
 
                        let comp_key = unsafe{ peer_info.id.upgrade() };
 
                        sched_ctx.runtime.destroy_component(comp_key);
 
                    }
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id;
 
                let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
                    comp_ctx.peers.remove(peer_index);
 
                }
 

	
 
                send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_port_id, peer_comp_id);
 
            }
 
                comp_ctx.set_port_state(port_handle, PortState::Closed);
 
                send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle);
 
                comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id);
 
            },
 
            ControlMessageContent::UnblockPort(port_id) => {
 
                // We were previously blocked (or already closed)
 
                let port_info = comp_ctx.get_port(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                debug_assert_eq!(port_info.kind, PortKind::Putter);
 
                debug_assert!(port_info.state == PortState::Blocked || port_info.state == PortState::Closed);
 
                if port_info.state == PortState::Blocked {
 
                    self.unblock_local_port(sched_ctx, comp_ctx, port_id);
 
                    self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle);
 
                }
 
            },
 
            ControlMessageContent::PortPeerChangedBlock(port_id) => {
 
@@ -650,43 +499,50 @@ impl CompPDL {
 
                // potentially rerouting some of the in-flight messages) and
 
                // Ack. Then we wait for the `unblock` call.
 
                debug_assert_eq!(message.target_port_id, Some(port_id));
 
                let port_info = comp_ctx.get_port_mut(port_id);
 
                debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
 
                if port_info.state == PortState::Open {
 
                    port_info.state = PortState::Blocked;
 
                }
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                comp_ctx.set_port_state(port_handle, PortState::Blocked);
 

	
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 

	
 
                let peer_port_id = port_info.peer_id;
 
                let peer_comp_id = port_info.peer_comp_id;
 
                send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_port_id, peer_comp_id);
 
                send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle);
 
            },
 
            ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => {
 
                debug_assert_eq!(message.target_port_id, Some(port_id));
 
                let port_info = comp_ctx.get_port_mut(port_id);
 
                let old_peer_comp_id = port_info.peer_comp_id;
 
            ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
 
                let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap());
 
                let port_info = comp_ctx.get_port(port_handle);
 
                debug_assert!(port_info.state == PortState::Blocked);
 
                let old_peer_id = port_info.peer_comp_id;
 

	
 
                comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id);
 

	
 
                let port_info = comp_ctx.get_port_mut(port_handle);
 
                port_info.peer_comp_id = new_comp_id;
 
                comp_ctx.add_peer(sched_ctx, new_comp_id, None);
 
                comp_ctx.remove_peer(sched_ctx, old_peer_comp_id);
 
                self.unblock_local_port(sched_ctx, comp_ctx, port_id);
 
                port_info.peer_port_id = new_port_id;
 
                comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None);
 
                self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle);
 
            }
 
        }
 
    }
 

	
 
    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> Option<CompScheduling> {
 
    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
 
        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
        return self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
        debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncEnd);
 
        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
        if self.mode == Mode::Exit {
 
            // TODO: Bit hacky, move this around
 
            self.mode = Mode::StartExit;
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling ports
 
    // -------------------------------------------------------------------------
 

	
 
    /// Marks the local port as being unblocked. If the execution was blocked on
 
    /// sending a message over this port, then execution will continue and the
 
    /// message will be sent.
 
    fn unblock_local_port(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_id: PortId) {
 
        let port_info = comp_ctx.get_port_mut(port_id);
 
    /// Unblocks a port, potentially continuing execution of the component, in
 
    /// response to a message that told us to unblock a previously blocked
 
    fn handle_unblock_port_instruction(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) {
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        let port_id = port_info.self_id;
 
        debug_assert_eq!(port_info.state, PortState::Blocked);
 
        port_info.state = PortState::Open;
 

	
 
@@ -696,19 +552,24 @@ impl CompPDL {
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            let mut replacement = ValueGroup::default();
 
            std::mem::swap(&mut replacement, &mut self.mode_value);
 
            self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_id, replacement);
 
            self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, replacement);
 

	
 
            self.mode = Mode::Sync;
 
            self.mode_port = PortId::new_invalid();
 
        }
 
    }
 

	
 
    fn create_component_and_transfer_ports2(
 
    fn create_component_and_transfer_ports(
 
        &mut self,
 
        sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx,
 
        definition_id: DefinitionId, monomorph_index: i32, mut arguments: ValueGroup
 
    ) {
 
        struct PortPair{ creator: PortId, created: PortId }
 
        struct PortPair{
 
            creator_handle: LocalPortHandle,
 
            creator_id: PortId,
 
            created_handle: LocalPortHandle,
 
            created_id: PortId,
 
        }
 
        let mut port_id_pairs = Vec::new();
 

	
 
        let reservation = sched_ctx.runtime.start_create_pdl_component();
 
@@ -721,13 +582,20 @@ impl CompPDL {
 
        while let Some(port_reference) = arg_iter.next() {
 
            // Create port entry for new component
 
            let creator_port_id = port_reference.id;
 
            let creator_port = creator_ctx.get_port(creator_port_id);
 
            let created_port = created_ctx.adopt_port(creator_port);
 
            let creator_port_handle = creator_ctx.get_port_handle(creator_port_id);
 
            let creator_port = creator_ctx.get_port(creator_port_handle);
 
            let created_port_handle = created_ctx.add_port(
 
                creator_port.peer_comp_id, creator_port.peer_port_id,
 
                creator_port.kind, creator_port.state
 
            );
 
            let created_port = created_ctx.get_port(created_port_handle);
 
            let created_port_id = created_port.self_id;
 

	
 
            port_id_pairs.push(PortPair{
 
                creator: creator_port_id,
 
                created: created_port_id,
 
                creator_handle: creator_port_handle,
 
                creator_id: creator_port_id,
 
                created_handle: created_port_handle,
 
                created_id: created_port_id,
 
            });
 

	
 
            // Modify value in arguments (bit dirty, but double vec in ValueGroup causes lifetime issues)
 
@@ -749,31 +617,34 @@ impl CompPDL {
 
        let mut created_component_has_remote_peers = false;
 

	
 
        for pair in port_id_pairs.iter() {
 
            let creator_port_info = creator_ctx.get_port(pair.creator);
 
            let created_port_info = created_ctx.get_port_mut(pair.created);
 
            let creator_port_info = creator_ctx.get_port(pair.creator_handle);
 
            let created_port_info = created_ctx.get_port_mut(pair.created_handle);
 

	
 
            if created_port_info.peer_comp_id == creator_ctx.id {
 
                // Port peer is owned by the creator as well
 
                let created_peer_port_index = port_id_pairs
 
                    .iter()
 
                    .position(|v| v.creator == creator_port_info.peer_id);
 
                    .position(|v| v.creator_id == creator_port_info.peer_port_id);
 
                match created_peer_port_index {
 
                    Some(created_peer_port_index) => {
 
                        // Peer port moved to the new component as well
 
                        // Peer port moved to the new component as well. So
 
                        // adjust IDs appropriately.
 
                        let peer_pair = &port_id_pairs[created_peer_port_index];
 
                        created_port_info.peer_id = peer_pair.created;
 
                        created_port_info.peer_port_id = peer_pair.created_id;
 
                        created_port_info.peer_comp_id = reservation.id();
 
                        todo!("either add 'self peer', or remove that idea from Ctx altogether")
 
                    },
 
                    None => {
 
                        // Peer port remains with creator component.
 
                        created_port_info.peer_comp_id = creator_ctx.id;
 
                        created_ctx.add_peer(sched_ctx, creator_ctx.id, None);
 
                        created_ctx.add_peer(pair.created_handle, sched_ctx, creator_ctx.id, None);
 
                    }
 
                }
 
            } else {
 
                // Peer is a different component
 
                let peer_info = creator_ctx.get_peer(created_port_info.peer_comp_id);
 
                created_ctx.add_peer(sched_ctx, peer_info.id, Some(&peer_info.handle));
 
                let peer_handle = creator_ctx.get_peer_handle(created_port_info.peer_comp_id);
 
                let peer_info = creator_ctx.get_peer(peer_handle);
 
                created_ctx.add_peer(pair.created_handle, sched_ctx, peer_info.id, Some(&peer_info.handle));
 
                created_component_has_remote_peers = true;
 
            }
 
        }
 
@@ -797,28 +668,27 @@ impl CompPDL {
 
        // transfer messages in the main inbox.
 
        for pair in port_id_pairs.iter() {
 
            // Remove peer if appropriate
 
            let creator_port_index = creator_ctx.get_port_index(pair.creator).unwrap();
 
            let creator_port_info = creator_ctx.ports.remove(creator_port_index);
 
            if creator_port_info.peer_comp_id != creator_ctx.id {
 
                creator_ctx.remove_peer(sched_ctx, creator_port_info.peer_comp_id);
 
            }
 
            let creator_port_info = creator_ctx.get_port(pair.creator_handle);
 
            let creator_port_index = creator_ctx.get_port_index(pair.creator_handle);
 
            creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_port_info.peer_comp_id);
 
            creator_ctx.remove_port(pair.creator_handle);
 

	
 
            // Transfer any messages
 
            let created_port_index = created_ctx.get_port_index(pair.created).unwrap();
 
            let created_port_info = &created_ctx.ports[created_port_index];
 
            let created_port_index = created_ctx.get_port_index(pair.created_handle);
 
            let created_port_info = created_ctx.get_port(pair.created_handle);
 
            debug_assert!(component.code.inbox_main[created_port_index].is_none());
 
            if let Some(mut message) = self.inbox_main.remove(creator_port_index) {
 
                message.data_header.target_port = pair.created;
 
                message.data_header.target_port = pair.created_id;
 
                component.code.inbox_main[created_port_index] = Some(message);
 
            }
 

	
 
            let mut message_index = 0;
 
            while message_index < self.inbox_backup.len() {
 
                let message = &self.inbox_backup[message_index];
 
                if message.data_header.target_port == pair.creator {
 
                if message.data_header.target_port == pair.creator_id {
 
                    // transfer message
 
                    let mut message = self.inbox_backup.remove(message_index);
 
                    message.data_header.target_port = pair.created;
 
                    message.data_header.target_port = pair.created_id;
 
                    component.code.inbox_backup.push(message);
 
                } else {
 
                    message_index += 1;
 
@@ -827,9 +697,10 @@ impl CompPDL {
 

	
 
            // Handle potential channel between creator and created component
 
            if created_port_info.peer_comp_id == creator_ctx.id {
 
                let peer_port_info = creator_ctx.get_port_mut(created_port_info.peer_id);
 
                let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id);
 
                let peer_port_info = creator_ctx.get_port_mut(peer_port_handle);
 
                peer_port_info.peer_comp_id = created_ctx.id;
 
                creator_ctx.add_peer(sched_ctx, created_ctx.id, None);
 
                creator_ctx.add_peer(pair.created_handle, sched_ctx, created_ctx.id, None);
 
            }
 
        }
 

	
 
@@ -838,14 +709,15 @@ impl CompPDL {
 
        if created_component_has_remote_peers {
 
            let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
 
            for pair in port_id_pairs.iter() {
 
                let port_info = created_ctx.get_port(pair.created);
 
                let port_info = created_ctx.get_port(pair.created_handle);
 
                if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id {
 
                    let message = self.control.add_reroute_entry(
 
                        creator_ctx.id, port_info.peer_id, port_info.peer_comp_id,
 
                        pair.creator, pair.created, created_ctx.id,
 
                        creator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id,
 
                        pair.creator_id, pair.created_id, created_ctx.id,
 
                        schedule_entry_id
 
                    );
 
                    let peer_info = created_ctx.get_peer(port_info.peer_comp_id);
 
                    let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id);
 
                    let peer_info = created_ctx.get_peer(peer_handle);
 
                    peer_info.handle.send_message(sched_ctx, message, true);
 
                }
 
            }
 
@@ -854,72 +726,6 @@ impl CompPDL {
 
            sched_ctx.runtime.enqueue_work(created_key);
 
        }
 
    }
 

	
 
    /// Removes a port from a component. Also decrements the port counter in
 
    /// the peer component's entry. If that hits 0 then it will be removed and
 
    /// returned. If returned then the caller is responsible for decrementing
 
    /// the atomic counters of the peer component's handle.
 
    fn remove_port_from_component(comp_ctx: &mut CompCtx, port_id: PortId) -> (Port, Option<Peer>) {
 
        let port_index = comp_ctx.get_port_index(port_id).unwrap();
 
        let port_info = comp_ctx.ports.remove(port_index);
 

	
 
        // If the component owns the peer, then we don't have to decrement the
 
        // number of peers (because we don't have an entry for ourselves)
 
        if port_info.peer_comp_id == comp_ctx.id {
 
            return (port_info, None);
 
        }
 

	
 
        let peer_index = comp_ctx.get_peer_index(port_info.peer_comp_id).unwrap();
 
        let peer_info = &mut comp_ctx.peers[peer_index];
 
        peer_info.num_associated_ports -= 1;
 

	
 
        // Check if we still have other ports referencing this peer
 
        if peer_info.num_associated_ports != 0 {
 
            return (port_info, None);
 
        }
 

	
 
        let peer_info = comp_ctx.peers.remove(peer_index);
 
        return (port_info, Some(peer_info));
 
    }
 

	
 
    /// Only adds/updates a peer for a given port. This function assumes (but
 
    /// does not check!) that the port was not considered to belong to that peer
 
    /// before calling this function.
 
    fn add_peer_associated_port_to_component(sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, peer_id: CompId) {
 
        match comp_ctx.get_peer_index(peer_id) {
 
            Some(peer_index) => {
 
                let peer_info = &mut comp_ctx.peers[peer_index];
 
                peer_info.num_associated_ports += 1;
 
            },
 
            None => {
 
                let handle = sched_ctx.runtime.get_component_public(peer_id);
 
                comp_ctx.peers.push(Peer{
 
                    id: peer_id,
 
                    num_associated_ports: 1,
 
                    handle,
 
                });
 
            }
 
        }
 
    }
 

	
 
    fn change_port_peer_component(
 
        &mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
        port_id: PortId, new_peer_comp_id: CompId
 
    ) {
 
        let port_info = comp_ctx.get_port_mut(port_id);
 
        let cur_peer_comp_id = port_info.peer_comp_id;
 
        let cur_peer_info = comp_ctx.get_peer_mut(cur_peer_comp_id);
 
        cur_peer_info.num_associated_ports -= 1;
 

	
 
        if cur_peer_info.num_associated_ports == 0 {
 
            let should_remove = cur_peer_info.handle.decrement_users();
 
            if should_remove {
 
                let cur_peer_comp_key = unsafe{ cur_peer_comp_id.upgrade() };
 
                sched_ctx.runtime.destroy_component(cur_peer_comp_key);
 

	
 
            }
 
        }
 
    }
 
}
 

	
 
#[inline]
src/runtime2/component/consensus.rs
Show inline comments
 
@@ -4,6 +4,7 @@ use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 

	
 
use super::component_pdl::*;
 
use super::component_context::*;
 

	
 
pub struct PortAnnotation {
 
    self_comp_id: CompId,
 
@@ -286,7 +287,8 @@ impl Consensus {
 
        let mut local_solution = Vec::with_capacity(self.ports.len());
 
        for port in &self.ports {
 
            if let Some(mapping) = port.mapping {
 
                let port_info = comp_ctx.get_port(port.self_port_id);
 
                let port_handle = comp_ctx.get_port_handle(port.self_port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let new_entry = match port_info.kind {
 
                    PortKind::Putter => SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{
 
                        self_comp_id: comp_ctx.id,
 
@@ -327,11 +329,11 @@ impl Consensus {
 

	
 
    fn make_ports_consistent_with_ctx(&mut self, comp_ctx: &CompCtx) {
 
        let mut needs_setting_ports = false;
 
        if comp_ctx.ports.len() != self.ports.len() {
 
        if comp_ctx.num_ports() != self.ports.len() {
 
            needs_setting_ports = true;
 
        } else {
 
            for idx in 0..comp_ctx.ports.len() {
 
                let comp_port_id = comp_ctx.ports[idx].self_id;
 
            for (idx, port) in comp_ctx.iter_ports().enumerate() {
 
                let comp_port_id = port.self_id;
 
                let cons_port_id = self.ports[idx].self_port_id;
 
                if comp_port_id != cons_port_id {
 
                    needs_setting_ports = true;
 
@@ -342,8 +344,8 @@ impl Consensus {
 

	
 
        if needs_setting_ports {
 
            self.ports.clear();
 
            self.ports.reserve(comp_ctx.ports.len());
 
            for port in &comp_ctx.ports {
 
            self.ports.reserve(comp_ctx.num_ports());
 
            for port in comp_ctx.iter_ports() {
 
                self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id))
 
            }
 
        }
 
@@ -419,7 +421,7 @@ impl Consensus {
 
            // Sender knows of someone with a higher ID. So store highest ID,
 
            // notify all peers, and forward local solutions
 
            self.highest_id = header.highest_id;
 
            for peer in &comp_ctx.peers {
 
            for peer in comp_ctx.iter_peers() {
 
                if peer.id == header.sending_id {
 
                    continue;
 
                }
 
@@ -438,7 +440,8 @@ impl Consensus {
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: SyncMessageContent::NotificationOfLeader,
 
            };
 
            let peer_info = comp_ctx.get_peer(header.sending_id);
 
            let peer_handle = comp_ctx.get_peer_handle(header.sending_id);
 
            let peer_info = comp_ctx.get_peer(peer_handle);
 
            peer_info.handle.send_message(sched_ctx, Message::Sync(message), true);
 
        } // else: exactly equal
 
    }
 
@@ -589,7 +592,7 @@ impl Consensus {
 
            expected_mapping,
 
            new_mapping,
 
            source_port: port_info.self_id,
 
            target_port: port_info.peer_id,
 
            target_port: port_info.peer_port_id,
 
        };
 
    }
 

	
src/runtime2/component/control_layer.rs
Show inline comments
 
@@ -2,6 +2,8 @@ use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 
use crate::runtime2::component::*;
 

	
 
use super::component_context::*;
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub(crate) struct ControlId(u32);
 

	
 
@@ -72,7 +74,7 @@ impl ControlLayer {
 
    }
 

	
 
    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> AckAction {
 
        let entry_index = self.get_entry_index(entry_id).unwrap();
 
        let entry_index = self.get_entry_index_by_id(entry_id).unwrap();
 
        let entry = &mut self.entries[entry_index];
 
        debug_assert!(entry.ack_countdown > 0);
 

	
 
@@ -111,22 +113,10 @@ impl ControlLayer {
 
            ControlContent::ClosedPort(port_id) => {
 
                // If a closed port is Ack'd, then we remove the reference to
 
                // that component.
 
                let port_index = comp_ctx.get_port_index(*port_id).unwrap();
 
                debug_assert_eq!(comp_ctx.ports[port_index].state, PortState::Blocked);
 
                let peer_id = comp_ctx.ports[port_index].peer_comp_id;
 
                let peer_index = comp_ctx.get_peer_index(peer_id).unwrap();
 
                let peer_info = &mut comp_ctx.peers[peer_index];
 
                peer_info.num_associated_ports -= 1;
 

	
 
                if peer_info.num_associated_ports == 0 {
 
                    let should_remove = peer_info.handle.decrement_users();
 
                    if should_remove {
 
                        let comp_key = unsafe{ peer_info.id.upgrade() };
 
                        sched_ctx.runtime.destroy_component(comp_key);
 
                    }
 

	
 
                    comp_ctx.peers.remove(peer_index);
 
                }
 
                let port_handle = comp_ctx.get_port_handle(*port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                debug_assert_eq!(port_info.state, PortState::Closed);
 
                comp_ctx.remove_peer(sched_ctx, port_handle, port_info.peer_comp_id);
 

	
 
                return AckAction::None;
 
            }
 
@@ -153,7 +143,7 @@ impl ControlLayer {
 
    /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`,
 
    /// hence the `ack_countdown` in the scheduling entry is at 0.
 
    pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) {
 
        let index = self.get_entry_index(schedule_entry_id).unwrap();
 
        let index = self.get_entry_index_by_id(schedule_entry_id).unwrap();
 
        debug_assert_eq!(self.entries[index].ack_countdown, 0);
 
        self.entries.remove(index);
 
    }
 
@@ -198,9 +188,10 @@ impl ControlLayer {
 
    // Blocking, unblocking, and closing ports
 
    // -------------------------------------------------------------------------
 

	
 
    pub(crate) fn mark_port_closed<'a>(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> {
 
        let port = comp_ctx.get_port_mut(port_id);
 
        let peer_port_id = port.peer_id;
 
    pub(crate) fn initiate_port_closing(&mut self, port_handle: PortHandle, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> {
 
        let port = comp_ctx.get_port_mut(port_handle);
 
        let port_id = port.self_id;
 
        let peer_port_id = port.peer_port_id;
 
        let peer_comp_id = port.peer_comp_id;
 
        debug_assert!(port.state == PortState::Open || port.state == PortState::Blocked);
 

	
 
@@ -229,63 +220,67 @@ impl ControlLayer {
 
        ));
 
    }
 

	
 
    pub(crate) fn set_port_and_peer_blocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
 
        // TODO: Feels like this shouldn't be an entry. Hence this class should
 
        //  be renamed. Lets see where the code ends up being
 
        let entry_id = self.take_id();
 
        let port_info = comp_ctx.get_port_mut(port_id);
 
        let peer_port_id = port_info.peer_id;
 
    /// Adds a control entry to track that a port is blocked. Expects the caller
 
    /// to have set the port's state to blocking already. The returned tuple
 
    /// contains a message and the peer to send it to.
 
    pub(crate) fn initiate_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
 
        let port_info = comp_ctx.get_port(port_handle);
 
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're telling the putter to block
 
        debug_assert_eq!(port_info.state, PortState::Blocked); // contract with caller
 

	
 
        let peer_port_id = port_info.peer_port_id;
 
        let peer_comp_id = port_info.peer_comp_id;
 
        debug_assert_eq!(port_info.state, PortState::Open); // prevent unforeseen issues
 
        port_info.state = PortState::Blocked;
 
        let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 0,
 
            content: ControlContent::BlockedPort(port_id),
 
            content: ControlContent::BlockedPort(port_info.self_id),
 
        });
 

	
 
        return (
 
            peer_comp_id,
 
            peer_handle,
 
            ControlMessage{
 
                id: entry_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(peer_port_id),
 
                target_port_id: Some(port_info.peer_port_id),
 
                content: ControlMessageContent::BlockPort(peer_port_id),
 
            }
 
        );
 
    }
 

	
 
    pub(crate) fn set_port_and_peer_unblocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
 
        // Find the entry that contains the blocking entry for the port
 
        let mut entry_index = usize::MAX;
 
        let mut entry_id = ControlId::new_invalid();
 
        for (index, entry) in self.entries.iter().enumerate() {
 
            if let ControlContent::BlockedPort(blocked_port) = &entry.content {
 
                if *blocked_port == port_id {
 
                    entry_index = index;
 
                    entry_id = entry.id;
 
                    break;
 
    /// Removes the control entry that tracks that a port is blocked. Expects
 
    /// the caller to have already marked the port as unblocked. Again the
 
    /// returned tuple contains a message and the target it is intended for
 
    pub(crate) fn cancel_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
 
        let port_info = comp_ctx.get_port(port_handle);
 
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're initiating the unblocking
 
        debug_assert_eq!(port_info.state, PortState::Open); // contract with caller, the locally stored entry ensures we were blocked before
 

	
 
        let position = self.entries.iter()
 
            .position(|v| {
 
                if let ControlContent::BlockedPort(blocked_port_id) = &v.content {
 
                    if *blocked_port_id == port_info.self_id {
 
                        return true;
 
                    }
 
                }
 
            }
 
        }
 
                return false;
 
            })
 
            .unwrap();
 

	
 
        let port_info = comp_ctx.get_port_mut(port_id);
 
        let peer_port_id = port_info.peer_id;
 
        let peer_comp_id = port_info.peer_comp_id;
 
        debug_assert_eq!(port_info.state, PortState::Blocked);
 
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we blocked it because of receiving too many messages
 
        port_info.state = PortState::Open;
 
        let entry = self.entries.remove(position);
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 

	
 
        return (
 
            peer_comp_id,
 
            peer_handle,
 
            ControlMessage{
 
                id: entry_id,
 
                id: entry.id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(peer_port_id),
 
                content: ControlMessageContent::UnblockPort(peer_port_id),
 
                target_port_id: Some(port_info.peer_port_id),
 
                content: ControlMessageContent::UnblockPort(port_info.peer_port_id)
 
            }
 
        )
 
        );
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
@@ -298,7 +293,7 @@ impl ControlLayer {
 
        return id;
 
    }
 

	
 
    fn get_entry_index(&self, entry_id: ControlId) -> Option<usize> {
 
    fn get_entry_index_by_id(&self, entry_id: ControlId) -> Option<usize> {
 
        for (index, entry) in self.entries.iter().enumerate() {
 
            if entry.id == entry_id {
 
                return Some(index);
src/runtime2/component/mod.rs
Show inline comments
 
mod component_pdl;
 
mod component_context;
 
mod control_layer;
 
mod consensus;
 

	
 
pub(crate) use component_pdl::{CompPDL, CompCtx, CompScheduling};
 
pub(crate) use component_pdl::{CompPDL, CompScheduling};
 
pub(crate) use component_context::CompCtx;
 
pub(crate) use control_layer::{ControlId};
 

	
 
use super::scheduler::*;
src/runtime2/runtime.rs
Show inline comments
 
@@ -3,10 +3,9 @@ use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
 
use std::collections::VecDeque;
 

	
 
use crate::protocol::*;
 
use crate::runtime2::component::wake_up_if_sleeping;
 

	
 
use super::communication::Message;
 
use super::component::{CompCtx, CompPDL};
 
use super::component::{wake_up_if_sleeping, CompPDL, CompCtx};
 
use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer};
 
use super::scheduler::*;
 

	
 
@@ -107,12 +106,17 @@ impl CompHandle {
 
        debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed
 
    }
 

	
 
    /// Returns true if the component should be destroyed
 
    pub(crate) fn decrement_users(&mut self) -> bool {
 
    /// Returns the `CompKey` to the component if it should be destroyed
 
    pub(crate) fn decrement_users(&mut self) -> Option<CompKey> {
 
        debug_assert!(!self.decremented, "illegal to 'decrement_users' twice");
 
        dbg_code!(self.decremented = true);
 
        let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel);
 
        return old_count == 1;
 
        let new_count = old_count - 1;
 
        if new_count == 0 {
 
            return Some(unsafe{ self.id.upgrade() });
 
        }
 

	
 
        return None;
 
    }
 
}
 

	
 
@@ -132,6 +136,7 @@ impl std::ops::Deref for CompHandle {
 
    type Target = CompPublic;
 

	
 
    fn deref(&self) -> &Self::Target {
 
        debug_assert!(!self.decremented); // cannot access if control is relinquished
 
        return unsafe{ &*self.target };
 
    }
 
}
src/runtime2/scheduler.rs
Show inline comments
 
@@ -91,14 +91,24 @@ impl Scheduler {
 
    }
 

	
 
    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
 
        // Send messages that all ports will be closed
 
        for port_index in 0..component.ctx.ports.len() {
 
            let port_info = &component.ctx.ports[port_index];
 
            if let Some((peer_id, message)) = component.code.control.mark_port_closed(port_info.self_id, &mut component.ctx) {
 
            if let Some((peer_id, message)) = component.code.control.initiate_port_closing(port_info.self_id, &mut component.ctx) {
 
                let peer_info = component.ctx.get_peer(peer_id);
 
                peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
 
            }
 
        }
 

	
 
        // Remove all references to the peers that we have
 
        for mut peer in component.ctx.peers.drain(..) {
 
            let should_remove = peer.handle.decrement_users();
 
            if should_remove {
 
                let key = unsafe{ peer.id.upgrade() };
 
                sched_ctx.runtime.destroy_component(key);
 
            }
 
        }
 

	
 
        let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
 
        let new_count = old_count - 1;
 
        if new_count == 0 {
0 comments (0 inline, 0 general)