Changeset - 3d271f7cfd7e
MH - 2022-02-02 16:07:36
contact@maxhenger.nl
WIP: Fix bug due to conflicting closed-port states
4 files changed with 71 insertions and 54 deletions:
src/runtime2/communication.rs
use crate::protocol::eval::*;
use super::runtime::*;
use super::component::*;

// -----------------------------------------------------------------------------
// Generic types
// -----------------------------------------------------------------------------

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PortId(pub u32);

impl PortId {
    /// This value is not significant; it is chosen to make debugging easier: a
    /// very large port number is more likely to shine a light on bugs.
    pub fn new_invalid() -> Self {
        return Self(u32::MAX);
    }
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PortKind {
    Putter,
    Getter,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PortState {
    Open,
-    Blocked,
+    BlockedDueToPeerChange,
+    BlockedDueToFullBuffers,
    Closed,
}

+impl PortState {
+    pub fn is_blocked(&self) -> bool {
+        match self {
+            PortState::BlockedDueToPeerChange | PortState::BlockedDueToFullBuffers => true,
+            PortState::Open | PortState::Closed => false,
+        }
+    }
+}

pub struct Channel {
    pub putter_id: PortId,
    pub getter_id: PortId,
}

// -----------------------------------------------------------------------------
// Data messages
// -----------------------------------------------------------------------------

#[derive(Debug)]
pub struct DataMessage {
    pub data_header: MessageDataHeader,
    pub sync_header: MessageSyncHeader,
    pub content: ValueGroup,
}

#[derive(Debug)]
pub struct MessageDataHeader {
    pub expected_mapping: Vec<(PortId, Option<u32>)>,
    pub new_mapping: u32,
    pub source_port: PortId,
    pub target_port: PortId,
}

// -----------------------------------------------------------------------------
// Sync messages
// -----------------------------------------------------------------------------

#[derive(Debug)]
pub struct SyncMessage {
    pub sync_header: MessageSyncHeader,
    pub content: SyncMessageContent,
}

#[derive(Debug)]
pub enum SyncLocalSolutionEntry {
    Putter(SyncSolutionPutterPort),
    Getter(SyncSolutionGetterPort),
}

pub type SyncLocalSolution = Vec<SyncLocalSolutionEntry>;

/// Getter port in a solution. Upon receiving a message it is certain about who
/// its peer is.
#[derive(Debug)]
pub struct SyncSolutionGetterPort {
    pub self_comp_id: CompId,
    pub self_port_id: PortId,
    pub peer_comp_id: CompId,
    pub peer_port_id: PortId,
    pub mapping: u32,
}

/// Putter port in a solution. A putter may not be certain about who its peer
/// component/port is.
#[derive(Debug)]
pub struct SyncSolutionPutterPort {
    pub self_comp_id: CompId,
    pub self_port_id: PortId,
    pub mapping: u32,
}

#[derive(Debug)]
pub struct SyncSolutionChannel {
    pub putter: Option<SyncSolutionPutterPort>,
    pub getter: Option<SyncSolutionGetterPort>,
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum SyncRoundDecision {
    None,
    Solution,
    Failure,
}

#[derive(Debug)]
pub struct SyncPartialSolution {
    pub channel_mapping: Vec<SyncSolutionChannel>,
    pub decision: SyncRoundDecision,
}

impl Default for SyncPartialSolution {
    fn default() -> Self {
        return Self{
            channel_mapping: Vec::new(),
            decision: SyncRoundDecision::None,
        }
    }
}

#[derive(Debug)]
pub enum SyncMessageContent {
    NotificationOfLeader,
    LocalSolution(CompId, SyncLocalSolution), // local solution of the specified component
    PartialSolution(SyncPartialSolution), // partial solution of multiple components
    GlobalSolution,
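
The old `Blocked` state is split into `BlockedDueToPeerChange` and `BlockedDueToFullBuffers` so the two blocking protocols can no longer clobber one another's state. A minimal, self-contained sketch (the enum is re-declared here purely for illustration) of why the distinction matters when an `UnblockPort` arrives:

    // Re-declared for illustration; mirrors PortState above.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum PortState { Open, BlockedDueToPeerChange, BlockedDueToFullBuffers, Closed }

    // An UnblockPort control message only undoes buffer-induced blocking. A
    // port blocked for a peer change must stay blocked until the matching
    // PortPeerChangedUnblock arrives, otherwise data could be sent to a peer
    // that is still being rerouted.
    fn handle_unblock(state: &mut PortState) {
        if *state == PortState::BlockedDueToFullBuffers {
            *state = PortState::Open;
        }
    }

    fn main() {
        let mut buffers = PortState::BlockedDueToFullBuffers;
        let mut rerouting = PortState::BlockedDueToPeerChange;
        handle_unblock(&mut buffers);
        handle_unblock(&mut rerouting);
        assert_eq!(buffers, PortState::Open);
        assert_eq!(rerouting, PortState::BlockedDueToPeerChange); // unaffected
    }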
src/runtime2/component/component_pdl.rs
@@ -140,535 +140,540 @@ impl CompPDL {
            target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
            let _should_remove = target.decrement_users();
            debug_assert!(_should_remove.is_none());
            return;
        }

        match message {
            Message::Data(message) => {
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
            },
            Message::Control(message) => {
                self.handle_incoming_control_message(sched_ctx, comp_ctx, message);
            },
            Message::Sync(message) => {
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
            }
        }
    }

    // -------------------------------------------------------------------------
    // Running component and handling changes in global component state
    // -------------------------------------------------------------------------

    pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
        use EvalContinuation as EC;

        sched_ctx.log(&format!("Running component (mode: {:?})", self.mode));

        // Depending on the mode don't do anything at all, take some special
        // actions, or fall through and run the PDL code.
        match self.mode {
            Mode::NonSync | Mode::Sync => {},
            Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut => {
                return Ok(CompScheduling::Sleep);
            }
            Mode::StartExit => {
                self.handle_component_exit(sched_ctx, comp_ctx);
                return Ok(CompScheduling::Immediate);
            },
            Mode::BusyExit => {
                if self.control.has_acks_remaining() {
                    return Ok(CompScheduling::Sleep);
                } else {
                    self.mode = Mode::Exit;
                    return Ok(CompScheduling::Exit);
                }
            },
            Mode::Exit => {
                return Ok(CompScheduling::Exit);
            }
        }

        let run_result = self.execute_prompt(&sched_ctx)?;

        match run_result {
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
            // Results that can be returned in sync mode
            EC::SyncBlockEnd => {
                debug_assert_eq!(self.mode, Mode::Sync);
                self.handle_sync_end(sched_ctx, comp_ctx);
                return Ok(CompScheduling::Immediate);
            },
            EC::BlockGet(port_id) => {
                debug_assert_eq!(self.mode, Mode::Sync);
                debug_assert!(self.exec_ctx.stmt.is_none());

                let port_id = port_id_from_eval(port_id);
                let port_handle = comp_ctx.get_port_handle(port_id);
                let port_index = comp_ctx.get_port_index(port_handle);
                if let Some(message) = &self.inbox_main[port_index] {
                    // Check if we can actually receive the message
                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
                        // Message was received. Make sure any blocked peers and
                        // pending messages are handled.
                        let message = self.inbox_main[port_index].take().unwrap();
                        self.handle_received_data_message(sched_ctx, comp_ctx, port_handle);

                        self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
                        return Ok(CompScheduling::Immediate);
                    } else {
                        todo!("handle sync failure due to message deadlock");
                        return Ok(CompScheduling::Sleep);
                    }
                } else {
                    // We need to wait
                    self.mode = Mode::BlockedGet;
                    self.mode_port = port_id;
                    return Ok(CompScheduling::Sleep);
                }
            },
            EC::Put(port_id, value) => {
                debug_assert_eq!(self.mode, Mode::Sync);
                let port_id = port_id_from_eval(port_id);
                let port_handle = comp_ctx.get_port_handle(port_id);
                let port_info = comp_ctx.get_port(port_handle);
-                if port_info.state == PortState::Blocked {
-                    todo!("handle blocked port");
+                if port_info.state.is_blocked() {
+                    self.mode = Mode::BlockedPut;
+                    self.mode_port = port_id;
+                    self.mode_value = value;
+                    self.exec_ctx.stmt = ExecStmt::PerformedPut; // prepare for when we become unblocked
+                    return Ok(CompScheduling::Sleep);
+                } else {
+                    self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value);
+                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
+                    return Ok(CompScheduling::Immediate);
                }
-                self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value);
-                self.exec_ctx.stmt = ExecStmt::PerformedPut;
-                return Ok(CompScheduling::Immediate);
            },
            // Results that can be returned outside of sync mode
            EC::ComponentTerminated => {
                self.mode = Mode::StartExit; // next call we'll take care of the exit
                return Ok(CompScheduling::Immediate);
            },
            EC::SyncBlockStart => {
                debug_assert_eq!(self.mode, Mode::NonSync);
                self.handle_sync_start(sched_ctx, comp_ctx);
                return Ok(CompScheduling::Immediate);
            },
            EC::NewComponent(definition_id, monomorph_idx, arguments) => {
                debug_assert_eq!(self.mode, Mode::NonSync);
                self.create_component_and_transfer_ports(
                    sched_ctx, comp_ctx,
                    definition_id, monomorph_idx, arguments
                );
                return Ok(CompScheduling::Requeue);
            },
            EC::NewChannel => {
                debug_assert_eq!(self.mode, Mode::NonSync);
                debug_assert!(self.exec_ctx.stmt.is_none());
                let channel = comp_ctx.create_channel();
                self.exec_ctx.stmt = ExecStmt::CreatedChannel((
                    Value::Output(port_id_to_eval(channel.putter_id)),
                    Value::Input(port_id_to_eval(channel.getter_id))
                ));
                self.inbox_main.push(None);
                self.inbox_main.push(None);
                return Ok(CompScheduling::Immediate);
            }
        }
    }

    fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult {
        let mut step_result = EvalContinuation::Stepping;
        while let EvalContinuation::Stepping = step_result {
            step_result = self.prompt.step(
                &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
                &sched_ctx.runtime.protocol.modules, &mut self.exec_ctx,
            )?;
        }

        return Ok(step_result)
    }

    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
        sched_ctx.log("Component starting sync mode");
        self.consensus.notify_sync_start(comp_ctx);
        debug_assert_eq!(self.mode, Mode::NonSync);
        self.mode = Mode::Sync;
    }

    /// Handles end of sync. The conclusion to the sync round might arise
    /// immediately (and be handled immediately), or might come later through
    /// messaging. In any case the component should be scheduled again
    /// immediately.
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
        sched_ctx.log("Component ending sync mode (now waiting for solution)");
        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
        self.mode = Mode::SyncEnd;
        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
    }

    /// Handles the decision from the consensus round. This will cause a change
    /// in the internal `Mode`, such that the next call to `run` can take the
    /// appropriate next steps.
    fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
        debug_assert_eq!(self.mode, Mode::SyncEnd);
        sched_ctx.log(&format!("Handling sync decision: {:?}", decision));
        let is_success = match decision {
            SyncRoundDecision::None => {
                // No decision yet
                return;
            },
            SyncRoundDecision::Solution => true,
            SyncRoundDecision::Failure => false,
        };

        // If here then we've reached a decision
        if is_success {
            self.mode = Mode::NonSync;
            self.consensus.notify_sync_decision(decision);
        } else {
            self.mode = Mode::StartExit;
        }
    }

    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
        sched_ctx.log("Component exiting");
        debug_assert_eq!(self.mode, Mode::StartExit);
        self.mode = Mode::BusyExit;

        // Doing this by index, then retrieving the handle, is a bit ridiculous,
        // but Rust is being Rust with its borrowing rules.
        for port_index in 0..comp_ctx.num_ports() {
            let port = comp_ctx.get_port_by_index_mut(port_index);
            if port.state == PortState::Closed {
                // Already closed, or in the process of being closed
                continue;
            }

            // Mark as closed
            let port_id = port.self_id;
            port.state = PortState::Closed;

            // Notify peer of closing
            let port_handle = comp_ctx.get_port_handle(port_id);
            let (peer, message) = self.control.initiate_port_closing(port_handle, comp_ctx);
            let peer_info = comp_ctx.get_peer(peer);
            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
        }
    }

    // -------------------------------------------------------------------------
    // Handling messages
    // -------------------------------------------------------------------------

    fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_handle: LocalPortHandle, value: ValueGroup) {
        let port_info = comp_ctx.get_port(source_port_handle);
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
        let peer_info = comp_ctx.get_peer(peer_handle);
        let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value);
        peer_info.handle.send_message(sched_ctx, Message::Data(annotated_message), true);
    }

    /// Handles a message that came in through the public inbox. This function
    /// will handle putting it in the correct place, and potentially blocking
    /// the port in case too many messages are being received.
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
        // Check if we can insert it directly into the storage associated with
        // the port
        let target_port_id = message.data_header.target_port;
        let port_handle = comp_ctx.get_port_handle(target_port_id);
        let port_index = comp_ctx.get_port_index(port_handle);
        if self.inbox_main[port_index].is_none() {
            self.inbox_main[port_index] = Some(message);

            // After direct insertion, check if this component's execution is
            // blocked on receiving a message on that port
-            debug_assert_ne!(comp_ctx.get_port(port_handle).state, PortState::Blocked); // because we could insert directly
+            debug_assert!(!comp_ctx.get_port(port_handle).state.is_blocked()); // because we could insert directly
            if self.mode == Mode::BlockedGet && self.mode_port == target_port_id {
                // We were indeed blocked
                self.mode = Mode::Sync;
                self.mode_port = PortId::new_invalid();
            }

            return;
        }

        // The direct inbox is full, so the port will become (or was already) blocked
        let port_info = comp_ctx.get_port_mut(port_handle);
-        debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
+        debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked());

        if port_info.state == PortState::Open {
-            comp_ctx.set_port_state(port_handle, PortState::Blocked);
+            comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
            let (peer_handle, message) =
                self.control.initiate_port_blocking(comp_ctx, port_handle);

            let peer = comp_ctx.get_peer(peer_handle);
            peer.handle.send_message(sched_ctx, Message::Control(message), true);
        }

        // But we still need to remember the message, so:
        self.inbox_backup.push(message);
    }

    /// Handles when a message has been handed off from the inbox to the PDL
    /// code. We check to see if there are more messages waiting and, if not,
    /// then we handle the case where the port might have been blocked
    /// previously.
    fn handle_received_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) {
        let port_index = comp_ctx.get_port_index(port_handle);
        debug_assert!(self.inbox_main[port_index].is_none()); // this function should be called after the message is taken out

        // Check for any more messages
        let port_info = comp_ctx.get_port(port_handle);
        for message_index in 0..self.inbox_backup.len() {
            let message = &self.inbox_backup[message_index];
            if message.data_header.target_port == port_info.self_id {
                // One more message for this port
                let message = self.inbox_backup.remove(message_index);
-                debug_assert_eq!(comp_ctx.get_port(port_handle).state, PortState::Blocked); // since we had >1 message on the port
+                debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we had >1 message on the port
                self.inbox_main[port_index] = Some(message);

                return;
            }
        }

        // Did not have any more messages. So if we were blocked, then we need
        // to send the "unblock" message.
-        if port_info.state == PortState::Blocked {
+        if port_info.state == PortState::BlockedDueToFullBuffers {
            comp_ctx.set_port_state(port_handle, PortState::Open);
            let (peer_handle, message) = self.control.cancel_port_blocking(comp_ctx, port_handle);
            let peer_info = comp_ctx.get_peer(peer_handle);
            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
        }
    }

    fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) {
        // Little local utility to send an Ack
        fn send_control_ack_message(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, causer_id: ControlId, peer_handle: LocalPeerHandle) {
            let peer_info = comp_ctx.get_peer(peer_handle);
            peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{
                id: causer_id,
                sender_comp_id: comp_ctx.id,
                target_port_id: None,
                content: ControlMessageContent::Ack,
            }), true);
        }

        // Handle the content of the control message, and optionally Ack it
        match message.content {
            ControlMessageContent::Ack => {
                self.handle_ack(sched_ctx, comp_ctx, message.id);
            },
            ControlMessageContent::BlockPort(port_id) => {
                // One of our messages was accepted, but the port should be
                // blocked.
                let port_handle = comp_ctx.get_port_handle(port_id);
                let port_info = comp_ctx.get_port(port_handle);
                debug_assert_eq!(port_info.kind, PortKind::Putter);
-                if port_info.state != PortState::Closed {
-                    comp_ctx.set_port_state(port_handle, PortState::Blocked);
+                if port_info.state == PortState::Open {
+                    // only when open: we don't do this when closed, and we don't do this if we're blocked due to peer changes
+                    comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
                }
            },
            ControlMessageContent::ClosePort(port_id) => {
                // Request to close the port. We immediately comply and remove
                // the component handle as well
                let port_handle = comp_ctx.get_port_handle(port_id);
                let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id;
                let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);

                // One exception to sending an `Ack` is if we just closed the
                // port ourselves, meaning that the `ClosePort` messages got
                // sent to one another.
                if let Some(control_id) = self.control.has_close_port_entry(port_handle, comp_ctx) {
                    self.handle_ack(sched_ctx, comp_ctx, control_id);
                } else {
                    send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle);
                    comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed
                    comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed
                }
            },
            ControlMessageContent::UnblockPort(port_id) => {
                // We were previously blocked (or already closed)
                let port_handle = comp_ctx.get_port_handle(port_id);
                let port_info = comp_ctx.get_port(port_handle);
                debug_assert_eq!(port_info.kind, PortKind::Putter);
-                debug_assert!(port_info.state == PortState::Blocked || port_info.state == PortState::Closed);
-                if port_info.state == PortState::Blocked {
+                if port_info.state == PortState::BlockedDueToFullBuffers {
                    self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle);
                }
            },
            ControlMessageContent::PortPeerChangedBlock(port_id) => {
                // The peer of our port has just changed. So we are asked to
                // temporarily block the port (while our original recipient is
                // potentially rerouting some of the in-flight messages) and
                // Ack. Then we wait for the `unblock` call.
                debug_assert_eq!(message.target_port_id, Some(port_id));
                let port_handle = comp_ctx.get_port_handle(port_id);
-                comp_ctx.set_port_state(port_handle, PortState::Blocked);
+                comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange);

                let port_info = comp_ctx.get_port(port_handle);
                let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);

                send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle);
            },
            ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
                let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap());
                let port_info = comp_ctx.get_port(port_handle);
-                debug_assert!(port_info.state == PortState::Blocked);
+                debug_assert!(port_info.state == PortState::BlockedDueToPeerChange);
                let old_peer_id = port_info.peer_comp_id;

                comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false);

                let port_info = comp_ctx.get_port_mut(port_handle);
                port_info.peer_comp_id = new_comp_id;
                port_info.peer_port_id = new_port_id;
                comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None);
                self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle);
            }
        }
    }

    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
        debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncEnd);
        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
    }

    /// Little helper that notifies the control layer of an `Ack`, and takes the
    /// appropriate subsequent action
    fn handle_ack(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, control_id: ControlId) {
        let mut to_ack = control_id;
        loop {
            let (action, new_to_ack) = self.control.handle_ack(to_ack, sched_ctx, comp_ctx);
            match action {
                AckAction::SendMessage(target_comp, message) => {
                    // FIX @NoDirectHandle
                    let mut handle = sched_ctx.runtime.get_component_public(target_comp);
                    handle.send_message(sched_ctx, Message::Control(message), true);
                    let _should_remove = handle.decrement_users();
                    debug_assert!(_should_remove.is_none());
                },
                AckAction::ScheduleComponent(to_schedule) => {
                    // FIX @NoDirectHandle
                    let mut handle = sched_ctx.runtime.get_component_public(to_schedule);

                    // Note that the component is intentionally not
                    // sleeping, so we just wake it up
                    debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire));
                    let key = unsafe{ to_schedule.upgrade() };
                    sched_ctx.runtime.enqueue_work(key);
                    let _should_remove = handle.decrement_users();
                    debug_assert!(_should_remove.is_none());
                },
                AckAction::None => {}
            }

            match new_to_ack {
                Some(new_to_ack) => to_ack = new_to_ack,
                None => break,
            }
        }
    }

    // -------------------------------------------------------------------------
    // Handling ports
    // -------------------------------------------------------------------------

    /// Unblocks a port, potentially continuing execution of the component, in
    /// response to a message that told us to unblock a previously blocked port.
    fn handle_unblock_port_instruction(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) {
        let port_info = comp_ctx.get_port_mut(port_handle);
        let port_id = port_info.self_id;
-        debug_assert_eq!(port_info.state, PortState::Blocked);
+        debug_assert!(port_info.state.is_blocked());
        port_info.state = PortState::Open;

        if self.mode == Mode::BlockedPut && port_id == self.mode_port {
            // We were blocked on the port that just became unblocked, so
            // send the message.
            debug_assert_eq!(port_info.kind, PortKind::Putter);
            let mut replacement = ValueGroup::default();
            std::mem::swap(&mut replacement, &mut self.mode_value);
            self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, replacement);

            self.mode = Mode::Sync;
            self.mode_port = PortId::new_invalid();
        }
    }

    fn create_component_and_transfer_ports(
        &mut self,
        sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx,
        definition_id: DefinitionId, monomorph_index: i32, mut arguments: ValueGroup
    ) {
        struct PortPair{
            creator_handle: LocalPortHandle,
            creator_id: PortId,
            created_handle: LocalPortHandle,
            created_id: PortId,
        }
        let mut port_id_pairs = Vec::new();

        let reservation = sched_ctx.runtime.start_create_pdl_component();
        let mut created_ctx = CompCtx::new(&reservation);

        // Take all the port IDs that are in the `args` (and currently belong to
        // the creator component) and translate them into new IDs that are
        // associated with the component we're about to create
        let mut arg_iter = ValueGroupIter::new(&mut arguments);
        while let Some(port_reference) = arg_iter.next() {
            // Create port entry for new component
            let creator_port_id = port_reference.id;
            let creator_port_handle = creator_ctx.get_port_handle(creator_port_id);
            let creator_port = creator_ctx.get_port(creator_port_handle);
            let created_port_handle = created_ctx.add_port(
                creator_port.peer_comp_id, creator_port.peer_port_id,
                creator_port.kind, creator_port.state
            );
            let created_port = created_ctx.get_port(created_port_handle);
            let created_port_id = created_port.self_id;

            port_id_pairs.push(PortPair{
                creator_handle: creator_port_handle,
                creator_id: creator_port_id,
                created_handle: created_port_handle,
                created_id: created_port_id,
            });

            // Modify value in arguments (bit dirty, but double vec in ValueGroup causes lifetime issues)
            let arg_value = if let Some(heap_pos) = port_reference.heap_pos {
                &mut arg_iter.group.regions[heap_pos][port_reference.index]
            } else {
                &mut arg_iter.group.values[port_reference.index]
            };
            match arg_value {
                Value::Input(id) => *id = port_id_to_eval(created_port_id),
                Value::Output(id) => *id = port_id_to_eval(created_port_id),
                _ => unreachable!(),
            }
        }

        // For each transferred port pair set their peer components to the
        // correct values. This will only change the values for the ports of
        // the new component.
        let mut created_component_has_remote_peers = false;

        for pair in port_id_pairs.iter() {
            let creator_port_info = creator_ctx.get_port(pair.creator_handle);
            let created_port_info = created_ctx.get_port_mut(pair.created_handle);

            if created_port_info.peer_comp_id == creator_ctx.id {
                // Port peer is owned by the creator as well
                let created_peer_port_index = port_id_pairs
                    .iter()
                    .position(|v| v.creator_id == creator_port_info.peer_port_id);
                match created_peer_port_index {
                    Some(created_peer_port_index) => {
                        // Peer port moved to the new component as well. So
                        // adjust IDs appropriately.
                        let peer_pair = &port_id_pairs[created_peer_port_index];
                        created_port_info.peer_port_id = peer_pair.created_id;
                        created_port_info.peer_comp_id = reservation.id();
                        todo!("either add 'self peer', or remove that idea from Ctx altogether")
                    },
                    None => {
                        // Peer port remains with creator component.
                        println!("DEBUG: Setting peer for port {:?} of component {:?} to {:?}", created_port_info.self_id, reservation.id(), creator_ctx.id);
                        created_port_info.peer_comp_id = creator_ctx.id;
                        created_ctx.add_peer(pair.created_handle, sched_ctx, creator_ctx.id, None);
                    }
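
The control-message arms above are where the single `Blocked` state previously caused conflicts: an `UnblockPort` racing with a close or a peer change could reopen a port that should have stayed blocked. A self-contained sketch of the putter-side transition rules the new variants enforce (illustrative types, not the runtime's API; the real `PortPeerChangedBlock` arm also sends an Ack):

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum PortState { Open, BlockedDueToPeerChange, BlockedDueToFullBuffers, Closed }

    enum Control { BlockPort, UnblockPort, PortPeerChangedBlock, PortPeerChangedUnblock }

    // Mirrors the match arms in handle_incoming_control_message: each control
    // message only moves a port between the states owned by its own protocol.
    fn apply(state: PortState, msg: Control) -> PortState {
        match (msg, state) {
            (Control::BlockPort, PortState::Open) => PortState::BlockedDueToFullBuffers,
            (Control::UnblockPort, PortState::BlockedDueToFullBuffers) => PortState::Open,
            (Control::PortPeerChangedBlock, _) => PortState::BlockedDueToPeerChange,
            (Control::PortPeerChangedUnblock, PortState::BlockedDueToPeerChange) => PortState::Open,
            (_, other) => other, // everything else leaves the state untouched
        }
    }

    fn main() {
        // The race the split targets: a late UnblockPort must not reopen a
        // port that has since been blocked for a peer change.
        let s = apply(PortState::BlockedDueToPeerChange, Control::UnblockPort);
        assert_eq!(s, PortState::BlockedDueToPeerChange);
    }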
src/runtime2/component/control_layer.rs
use crate::runtime2::runtime::*;
use crate::runtime2::communication::*;
use crate::runtime2::component::*;

use super::component_context::*;

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(crate) struct ControlId(u32);

impl ControlId {
    /// Like other invalid IDs, this one doesn't carry any significance, but is
    /// just set at u32::MAX to hopefully bring out bugs sooner.
    fn new_invalid() -> Self {
        return ControlId(u32::MAX);
    }
}

struct ControlEntry {
    id: ControlId,
    ack_countdown: u32,
    content: ControlContent,
}

enum ControlContent {
    PeerChange(ContentPeerChange),
    ScheduleComponent(CompId),
    BlockedPort(PortId),
    ClosedPort(PortId),
}

struct ContentPeerChange {
    source_port: PortId,
    source_comp: CompId,
    old_target_port: PortId,
    new_target_port: PortId,
    new_target_comp: CompId,
    schedule_entry_id: ControlId,
}

struct ControlClosedPort {
    closed_port: PortId,
    exit_entry_id: Option<ControlId>,
}

pub(crate) enum AckAction {
    None,
    SendMessage(CompId, ControlMessage),
    ScheduleComponent(CompId),
}

/// Handling/sending control messages.
pub(crate) struct ControlLayer {
    id_counter: ControlId,
    entries: Vec<ControlEntry>,
}

impl ControlLayer {
    pub(crate) fn should_reroute(&self, message: &mut Message) -> Option<CompId> {
        // Safety note: rerouting should occur during the time when we're
        // notifying a peer of a new component. During this period that
        // component hasn't been executed yet, so it cannot have died yet.
        // FIX @NoDirectHandle
        let target_port = message.target_port();
        if target_port.is_none() {
            return None;
        }

        let target_port = target_port.unwrap();
        for entry in &self.entries {
            if let ControlContent::PeerChange(entry) = &entry.content {
                if entry.old_target_port == target_port {
                    message.modify_target_port(entry.new_target_port);
                    return Some(entry.new_target_comp);
                }
            }
        }

        return None;
    }

    /// Handles an acknowledgement. The returned action must be performed by the
    /// caller. The optionally returned `ControlId` must be used and passed to
    /// `handle_ack` again.
    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> (AckAction, Option<ControlId>) {
        let entry_index = self.get_entry_index_by_id(entry_id).unwrap();
        let entry = &mut self.entries[entry_index];
        debug_assert!(entry.ack_countdown > 0);

        entry.ack_countdown -= 1;
        if entry.ack_countdown != 0 {
            return (AckAction::None, None);
        }

        let entry = self.entries.remove(entry_index);

        // All `Ack`s received, take action based on the kind of entry
        match entry.content {
            ControlContent::PeerChange(content) => {
                // If the change of peer is ack'd, then we are certain we have
                // rerouted all of the messages, and the sender's port can now
                // be unblocked again.
                let target_comp_id = content.source_comp;
                let message_to_send = ControlMessage{
                    id: ControlId::new_invalid(),
                    sender_comp_id: comp_ctx.id,
                    target_port_id: Some(content.source_port),
                    content: ControlMessageContent::PortPeerChangedUnblock(
                        content.new_target_port,
                        content.new_target_comp
                    )
                };
                let to_ack = content.schedule_entry_id;

                return (
                    AckAction::SendMessage(target_comp_id, message_to_send),
                    Some(to_ack)
                );
            },
            ControlContent::ScheduleComponent(to_schedule) => {
                // If all change-of-peers are `Ack`d, then we're ready to
                // schedule the component!
                return (AckAction::ScheduleComponent(to_schedule), None);
            },
            ControlContent::BlockedPort(_) => unreachable!(),
            ControlContent::ClosedPort(closed_port) => {
                // If a closed port is Ack'd, then we remove the reference to
                // that component.
                let port_handle = comp_ctx.get_port_handle(closed_port);
                let port_info = comp_ctx.get_port(port_handle);
                let port_peer_comp_id = port_info.peer_comp_id;
                debug_assert_eq!(port_info.state, PortState::Closed);
                comp_ctx.remove_peer(sched_ctx, port_handle, port_peer_comp_id, true); // remove if closed

                return (AckAction::None, None);
            }
        }
    }

    pub(crate) fn has_acks_remaining(&self) -> bool {
        return !self.entries.is_empty();
    }

    // -------------------------------------------------------------------------
    // Port transfer (due to component creation)
    // -------------------------------------------------------------------------

    /// Adds an entry that, when completely ack'd, will schedule a component.
    pub(crate) fn add_schedule_entry(&mut self, to_schedule_id: CompId) -> ControlId {
        let entry_id = self.take_id();
        self.entries.push(ControlEntry{
            id: entry_id,
            ack_countdown: 0, // incremented by calls to `add_reroute_entry`
            content: ControlContent::ScheduleComponent(to_schedule_id),
        });

        return entry_id;
    }

    /// Removes a schedule entry. Only used if the caller preemptively called
    /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`,
    /// hence the `ack_countdown` in the scheduling entry is at 0.
    pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) {
        let index = self.get_entry_index_by_id(schedule_entry_id).unwrap();
        debug_assert_eq!(self.entries[index].ack_countdown, 0);
        self.entries.remove(index);
    }

    pub(crate) fn add_reroute_entry(
        &mut self, creator_comp_id: CompId,
        source_port_id: PortId, source_comp_id: CompId,
        old_target_port_id: PortId, new_target_port_id: PortId, new_comp_id: CompId,
        schedule_entry_id: ControlId,
    ) -> Message {
        let entry_id = self.take_id();
        self.entries.push(ControlEntry{
            id: entry_id,
            ack_countdown: 1,
            content: ControlContent::PeerChange(ContentPeerChange{
                source_port: source_port_id,
                source_comp: source_comp_id,
                old_target_port: old_target_port_id,
                new_target_port: new_target_port_id,
                new_target_comp: new_comp_id,
                schedule_entry_id,
            }),
        });

        // increment counter on schedule entry
        let entry_index = self.get_entry_index_by_id(schedule_entry_id).unwrap();
        self.entries[entry_index].ack_countdown += 1;

        return Message::Control(ControlMessage{
            id: entry_id,
            sender_comp_id: creator_comp_id,
            target_port_id: Some(source_port_id),
            content: ControlMessageContent::PortPeerChangedBlock(source_port_id)
        })
    }

    // -------------------------------------------------------------------------
    // Blocking, unblocking, and closing ports
    // -------------------------------------------------------------------------

    pub(crate) fn has_close_port_entry(&self, port_handle: LocalPortHandle, comp_ctx: &CompCtx) -> Option<ControlId> {
        let port = comp_ctx.get_port(port_handle);
        let port_id = port.self_id;
        for entry in self.entries.iter() {
            if let ControlContent::ClosedPort(entry_port_id) = &entry.content {
                if *entry_port_id == port_id {
                    return Some(entry.id);
                }
            }
        }

        return None;
    }

    /// Initiates the control message procedures for closing a port. Caller must
    /// make sure that the port state has already been set to `Closed`.
    pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) {
        let port = comp_ctx.get_port(port_handle);
        let peer_port_id = port.peer_port_id;
        debug_assert!(port.state == PortState::Closed);

        // Construct the port-closing entry
        let entry_id = self.take_id();
        self.entries.push(ControlEntry{
            id: entry_id,
            ack_countdown: 1,
            content: ControlContent::ClosedPort(port.self_id),
        });

        // Return the messages notifying peer of the closed port
        let peer_handle = comp_ctx.get_peer_handle(port.peer_comp_id);
        return (
            peer_handle,
            ControlMessage{
                id: entry_id,
                sender_comp_id: comp_ctx.id,
                target_port_id: Some(peer_port_id),
                content: ControlMessageContent::ClosePort(peer_port_id),
            }
        );
    }

-    /// Adds a control entry to track that a port is blocked. Expects the caller
-    /// to have set the port's state to blocking already. The returned tuple
-    /// contains a message and the peer to send it to.
+    /// Generates the control message used to indicate to a peer that a port
+    /// should be blocked (expects the caller to have set the port's state to
+    /// blocked).
    pub(crate) fn initiate_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
        let port_info = comp_ctx.get_port(port_handle);
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're telling the putter to block
-        debug_assert_eq!(port_info.state, PortState::Blocked); // contract with caller
+        debug_assert_eq!(port_info.state, PortState::BlockedDueToFullBuffers); // contract with caller

        let peer_port_id = port_info.peer_port_id;
        let peer_comp_id = port_info.peer_comp_id;
        let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);

        let entry_id = self.take_id();
        self.entries.push(ControlEntry{
            id: entry_id,
            ack_countdown: 0,
            content: ControlContent::BlockedPort(port_info.self_id),
        });

        return (
            peer_handle,
            ControlMessage{
-                id: entry_id,
+                id: ControlId::new_invalid(),
                sender_comp_id: comp_ctx.id,
                target_port_id: Some(port_info.peer_port_id),
                content: ControlMessageContent::BlockPort(peer_port_id),
            }
        );
    }

-    /// Removes the control entry that tracks that a port is blocked. Expects
-    /// the caller to have already marked the port as unblocked. Again the
-    /// returned tuple contains a message and the target it is intended for.
+    /// Generates a message used to indicate to a peer that a port should be
+    /// unblocked again.
    pub(crate) fn cancel_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
        let port_info = comp_ctx.get_port(port_handle);
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're initiating the unblocking
        debug_assert_eq!(port_info.state, PortState::Open); // contract with caller, the locally stored entry ensures we were blocked before

        let position = self.entries.iter()
            .position(|v| {
                if let ControlContent::BlockedPort(blocked_port_id) = &v.content {
                    if *blocked_port_id == port_info.self_id {
                        return true;
                    }
                }
                return false;
            })
            .unwrap();

        let entry = self.entries.remove(position);
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);

        return (
            peer_handle,
            ControlMessage{
-                id: entry.id,
+                id: ControlId::new_invalid(),
                sender_comp_id: comp_ctx.id,
                target_port_id: Some(port_info.peer_port_id),
                content: ControlMessageContent::UnblockPort(port_info.peer_port_id)
            }
        );
    }

    // -------------------------------------------------------------------------
    // Internal utilities
    // -------------------------------------------------------------------------

    fn take_id(&mut self) -> ControlId {
        let id = self.id_counter;
        self.id_counter.0 = self.id_counter.0.wrapping_add(1);
        return id;
    }

    fn get_entry_index_by_id(&self, entry_id: ControlId) -> Option<usize> {
        for (index, entry) in self.entries.iter().enumerate() {
            if entry.id == entry_id {
                return Some(index);
            }
        }

        return None;
    }
}

impl Default for ControlLayer {
    fn default() -> Self {
        return ControlLayer{
            id_counter: ControlId(0),
            entries: Vec::new(),
        }
    }
}
\ No newline at end of file
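
A rough model of the bookkeeping `ControlLayer` performs (illustrative names, not the real types): an entry completes once its Ack countdown reaches zero, and a completed `PeerChange` entry chains into the schedule entry it guards, matching the `(AckAction, Option<ControlId>)` return of `handle_ack`:

    // Illustrative model of ControlLayer's entries and Ack counting.
    struct Entry {
        id: u32,
        ack_countdown: u32,
        chained: Option<u32>, // e.g. a PeerChange entry points at its schedule entry
    }

    // Returns the id of the next entry to "ack" if the completed entry chains
    // into another one; the caller loops, like CompPDL::handle_ack does.
    fn handle_ack(entries: &mut Vec<Entry>, id: u32) -> Option<u32> {
        let index = entries.iter().position(|e| e.id == id)?;
        entries[index].ack_countdown -= 1;
        if entries[index].ack_countdown > 0 {
            return None; // still waiting on more Acks
        }
        let entry = entries.remove(index);
        entry.chained
    }

    fn main() {
        // A schedule entry guarded by one reroute (PeerChange) entry:
        let mut entries = vec![
            Entry{ id: 0, ack_countdown: 1, chained: None },    // schedule entry
            Entry{ id: 1, ack_countdown: 1, chained: Some(0) }, // reroute entry
        ];
        let mut next = Some(1); // the Ack for the reroute entry arrives
        while let Some(id) = next {
            next = handle_ack(&mut entries, id);
        }
        assert!(entries.is_empty()); // both entries completed
    }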
src/runtime2/tests/mod.rs
use crate::protocol::*;
use crate::protocol::eval::*;
use crate::runtime2::runtime::*;
use crate::runtime2::component::{CompCtx, CompPDL};

fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) {
    let prompt = rt.inner.protocol.new_component(
        module_name.as_bytes(), routine_name.as_bytes(), args
    ).expect("create prompt");
    let reserved = rt.inner.start_create_pdl_component();
    let ctx = CompCtx::new(&reserved);
    let (key, _) = rt.inner.finish_create_pdl_component(reserved, CompPDL::new(prompt, 0), ctx, false);
    rt.inner.enqueue_work(key);
}

fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) }

#[test]
fn test_component_creation() {
    let pd = ProtocolDescription::parse(b"
    primitive nothing_at_all() {
        s32 a = 5;
        auto b = 5 + a;
    }
    ").expect("compilation");
    let rt = Runtime::new(1, pd);

    for _i in 0..20 {
        create_component(&rt, "", "nothing_at_all", no_args());
    }
}

#[test]
-fn test_component_communication() {
+fn test_component_communication_a() {
    let pd = ProtocolDescription::parse(b"
    primitive sender(out<u32> o) {
        print(\"sender\");
        sync put(o, 1);
    }
    primitive receiver(in<u32> i) {
        print(\"receiver\");
        sync auto a = get(i);
    }
    composite constructor() {
        channel o -> i;
        print(\"creating sender\");
        new sender(o);
        print(\"creating receiver\");
        new receiver(i);
        print(\"done\");
    }
    ").expect("compilation");
    let rt = Runtime::new(1, pd);

    create_component(&rt, "", "constructor", no_args());
}

#[test]
fn test_component_communication_b() {
    let pd = ProtocolDescription::parse(b"
    primitive sender(out<u32> o, u32 rounds) {
        u32 index = 0;
        sync while (index < rounds) {
            put(o, index);
            index += 1;
        }
    }

    primitive receiver(in<u32> i, u32 rounds) {
        u32 index = 0;
        sync while (index < rounds) {
            auto val = get(i);
            while (val != index) {} // infinite loop if incorrect value is received
            index += 1;
        }
    }

    composite constructor() {
        channel o -> i;
        new sender(o, 5);
        new receiver(i, 5);
    }").expect("compilation");
    let rt = Runtime::new(1, pd);
    create_component(&rt, "", "constructor", no_args());
}
\ No newline at end of file
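
The changeset title concerns ports being closed from both sides at once (see `handle_component_exit` and the `ClosePort` arm in component_pdl.rs). A hypothetical regression test in the style of the tests above, assuming trivially terminating component bodies compile, could exercise that race:

    #[test]
    fn test_simultaneous_close() {
        // Both components terminate immediately, so both sides close the
        // shared channel at roughly the same time and the ClosePort messages
        // may cross each other.
        let pd = ProtocolDescription::parse(b"
        primitive quitter_out(out<u32> o) {
            s32 a = 0;
        }
        primitive quitter_in(in<u32> i) {
            s32 a = 0;
        }
        composite constructor() {
            channel o -> i;
            new quitter_out(o);
            new quitter_in(i);
        }").expect("compilation");
        let rt = Runtime::new(1, pd);
        create_component(&rt, "", "constructor", no_args());
    }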