Changeset - 135965141596
[Not reviewed]
0 4 0
mh - 3 years ago 2022-03-30 14:35:10
contact@maxhenger.nl
Factor out common component shutdown handling
4 files changed with 141 insertions and 35 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -123,190 +123,246 @@ pub(crate) fn create_component(
 
        // User-defined component
 
        let prompt = Prompt::new(
 
            &protocol.types, &protocol.heap,
 
            definition_id, type_id, arguments
 
        );
 
        let component = CompPDL::new(prompt, num_ports);
 
        return Box::new(component);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Generic component messaging utilities (for sending and receiving)
 
// -----------------------------------------------------------------------------
 

	
 
/// Handles control messages in the default way. Note that this function may
 
/// take a lot of actions in the name of the caller: pending messages may be
 
/// sent, ports may become blocked/unblocked, etc. So the execution
 
/// (`CompExecState`), control (`ControlLayer`) and consensus (`Consensus`)
 
/// state may all change.
 
pub(crate) fn default_handle_control_message(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
 
    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
) {
 
    match message.content {
 
        ControlMessageContent::Ack => {
 
            default_handle_ack(control, message.id, sched_ctx, comp_ctx);
 
        },
 
        ControlMessageContent::BlockPort(port_id) => {
 
            // One of our messages was accepted, but the port should be
 
            // blocked.
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            let port_info = comp_ctx.get_port(port_handle);
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            if port_info.state == PortState::Open {
 
                // only when open: we don't do this when closed, and we we don't do this if we're blocked due to peer changes
 
                comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
 
            }
 
        },
 
        ControlMessageContent::ClosePort(port_id) => {
 
            // Request to close the port. We immediately comply and remove
 
            // the component handle as well
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id;
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
            // One exception to sending an `Ack` is if we just closed the
 
            // port ourselves, meaning that the `ClosePort` messages got
 
            // sent to one another.
 
            if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
 
                default_handle_ack(control, control_id, sched_ctx, comp_ctx);
 
            } else {
 
                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
                comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed
 
                comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed
 
            }
 
        },
 
        ControlMessageContent::UnblockPort(port_id) => {
 
            // We were previously blocked (or already closed)
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            let port_info = comp_ctx.get_port(port_handle);
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            if port_info.state == PortState::BlockedDueToFullBuffers {
 
                default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
            }
 
        },
 
        ControlMessageContent::PortPeerChangedBlock(port_id) => {
 
            // The peer of our port has just changed. So we are asked to
 
            // temporarily block the port (while our original recipient is
 
            // potentially rerouting some of the in-flight messages) and
 
            // Ack. Then we wait for the `unblock` call.
 
            debug_assert_eq!(message.target_port_id, Some(port_id));
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange);
 

	
 
            let port_info = comp_ctx.get_port(port_handle);
 
            let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 

	
 
            default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
        },
 
        ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
 
            let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap());
 
            let port_info = comp_ctx.get_port(port_handle);
 
            debug_assert!(port_info.state == PortState::BlockedDueToPeerChange);
 
            let old_peer_id = port_info.peer_comp_id;
 

	
 
            comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false);
 

	
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_comp_id = new_comp_id;
 
            port_info.peer_port_id = new_port_id;
 
            comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None);
 
            default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
        }
 
    }
 
}
 

	
 
/// Handles a component initiating the exiting procedure, and closing all of its
 
/// ports. Should only be called once per component (which is ensured by
 
/// checking and modifying the mode in the execution state).
 
pub(crate) fn default_handle_start_exit(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
 
    sched_ctx.log("Component starting exit");
 
    exec_state.mode = CompMode::BusyExit;
 

	
 
    // Iterating by index to work around borrowing rules
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port = comp_ctx.get_port_by_index_mut(port_index);
 
        if port.state == PortState::Closed {
 
            // Already closed, or in the process of being closed
 
            continue;
 
        }
 

	
 
        // Mark as closed
 
        let port_id = port.self_id;
 
        port.state = PortState::Closed;
 

	
 
        // Notify peer of closing
 
        let port_handle = comp_ctx.get_port_handle(port_id);
 
        let (peer, message) = control.initiate_port_closing(port_handle, comp_ctx);
 
        let peer_info = comp_ctx.get_peer(peer);
 
        peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
 
    }
 

	
 
    return CompScheduling::Immediate; // to check if we can shut down immediately
 
}
 

	
 
/// Handles a component waiting until all peers are notified that it is quitting
 
/// (i.e. after calling `default_handle_start_exit`).
 
pub(crate) fn default_handle_busy_exit(
 
    exec_state: &mut CompExecState, control: &ControlLayer,
 
    sched_ctx: &SchedulerCtx
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::BusyExit);
 
    if control.has_acks_remaining() {
 
        sched_ctx.log("Component busy exiting, still has `Ack`s remaining");
 
        return CompScheduling::Sleep;
 
    } else {
 
        sched_ctx.log("Component busy exiting, now shutting down");
 
        exec_state.mode = CompMode::Exit;
 
        return CompScheduling::Exit;
 
    }
 
}
 

	
 
#[inline]
 
pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::Exit);
 
    return CompScheduling::Exit;
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Internal messaging/state utilities
 
// -----------------------------------------------------------------------------
 

	
 
/// Handles an `Ack` for the control layer.
 
fn default_handle_ack(
 
    control: &mut ControlLayer, control_id: ControlId,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
) {
 
    // Since an `Ack` may cause another one, handle them in a loop
 
    let mut to_ack = control_id;
 
    loop {
 
        let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx);
 
        match action {
 
            AckAction::SendMessage(target_comp, message) => {
 
                // FIX @NoDirectHandle
 
                let mut handle = sched_ctx.runtime.get_component_public(target_comp);
 
                handle.send_message(sched_ctx, Message::Control(message), true);
 
                let _should_remove = handle.decrement_users();
 
                debug_assert!(_should_remove.is_none());
 
            },
 
            AckAction::ScheduleComponent(to_schedule) => {
 
                // FIX @NoDirectHandle
 
                let mut handle = sched_ctx.runtime.get_component_public(to_schedule);
 

	
 
                // Note that the component is intentionally not
 
                // sleeping, so we just wake it up
 
                debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire));
 
                let key = unsafe { to_schedule.upgrade() };
 
                sched_ctx.runtime.enqueue_work(key);
 
                let _should_remove = handle.decrement_users();
 
                debug_assert!(_should_remove.is_none());
 
            },
 
            AckAction::None => {}
 
        }
 

	
 
        match new_to_ack {
 
            Some(new_to_ack) => to_ack = new_to_ack,
 
            None => break,
 
        }
 
    }
 
}
 

	
 
/// Little helper for sending the most common kind of `Ack`
 
fn default_send_ack(
 
    causer_of_ack_id: ControlId, peer_handle: LocalPeerHandle,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx
 
) {
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 
    peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{
 
        id: causer_of_ack_id,
 
        sender_comp_id: comp_ctx.id,
 
        target_port_id: None,
 
        content: ControlMessageContent::Ack
 
    }), true);
 
}
 

	
 
/// Handles the unblocking of a putter port. In case there is a pending message
 
/// on that port then it will be sent.
 
fn default_handle_unblock_put(
 
    exec_state: &mut CompExecState, consensus: &mut Consensus,
 
    port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
) {
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    let port_id = port_info.self_id;
 
    debug_assert!(port_info.state.is_blocked());
 
    port_info.state = PortState::Open;
 

	
 
    if exec_state.is_blocked_on_put(port_id) {
 
        // Annotate the message that we're going to send
 
        debug_assert_eq!(port_info.kind, PortKind::Putter);
 
        let to_send = exec_state.mode_value.take();
 
        let to_send = consensus.annotate_data_message(comp_ctx, port_info, to_send);
 

	
 
        // Retrieve peer to send the message
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message(sched_ctx, Message::Data(to_send), true);
 

	
 
        exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state.
 
        exec_state.mode_port = PortId::new_invalid();
 
    }
 
}
 

	
 

	
 
#[inline]
 
pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId {
 
    return PortId(port_id.id);
 
}
 

	
 
#[inline]
 
pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId {
 
    return EvalPortId{ id: port_id.0 };
 
}
src/runtime2/component/component_ip.rs
Show inline comments
 
use crate::protocol::eval::*;
 
use crate::protocol::eval::{ValueGroup, EvalError};
 
use crate::runtime2::*;
 
use super::*;
 
use super::component::*;
 
use super::component::{self, Component, CompExecState, CompScheduling, CompMode};
 
use super::control_layer::*;
 
use super::consensus::*;
 

	
 
/// TODO: Temporary component to figure out what to do with custom components.
 
///     This component sends random numbers between two u32 limits
 
pub struct ComponentRandomU32 {
 
    // Properties for this specific component
 
    output_port_id: PortId,
 
    random_minimum: u32,
 
    random_maximum: u32,
 
    // Generic state-tracking
 
    exec_state: CompExecState,
 
    control: ControlLayer,
 
    consensus: Consensus,
 
}
 

	
 
impl Component for ComponentRandomU32 {
 
    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) {
 
        // Impossible since this component does not have any input ports in its
 
        // signature.
 
        unreachable!();
 
    }
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
        match message {
 
            Message::Data(message) => unreachable!(),
 
            Message::Data(_message) => unreachable!(),
 
            Message::Sync(message) => {
 

	
 
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
                self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
            },
 
            Message::Control(message) => {
 

	
 
                component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                );
 
            }
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        todo!()
 
        sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedGet | CompMode::BlockedSelect => {
 
                // impossible for this component, no input ports and no select
 
                // blocks
 
                unreachable!();
 
            }
 
            CompMode::NonSync => {
 
                // If in non-sync mode then we check if the arguments make sense
 
                // (at some point in the future, this is just a testing
 
                // component)
 
            },
 
            CompMode::Sync => {
 

	
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep),
 
            CompMode::StartExit => return Ok(component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx
 
            )),
 
            CompMode::BusyExit => return Ok(component::default_handle_busy_exit(
 
                &mut self.exec_state, &self.control, sched_ctx
 
            )),
 
            CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)),
 
        }
 
    }
 
}
 

	
 
impl ComponentRandomU32 {
 
    pub(crate) fn new(arguments: ValueGroup) -> Self {
 
        debug_assert_eq!(arguments.values.len(), 3);
 
        debug_assert!(arguments.regions.is_empty());
 
        let port_id = port_id_from_eval(arguments.values[0].as_port_id());
 
        let port_id = component::port_id_from_eval(arguments.values[0].as_port_id());
 
        let minimum = arguments.values[1].as_uint32();
 
        let maximum = arguments.values[2].as_uint32();
 

	
 
        return Self{
 
            output_port_id: port_id,
 
            random_minimum: minimum,
 
            random_maximum: maximum,
 
            exec_state: CompExecState::new(),
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
        }
 
    }
 

	
 

	
 

	
 
    fn handle_sync_decision(&mut self, _sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
 
        let success = match decision {
 
            SyncRoundDecision::None => return,
 
            SyncRoundDecision::Solution => true,
 
            SyncRoundDecision::Failure => false,
 
        };
 

	
 
        debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
 
        if success {
 
            self.exec_state.mode = CompMode::NonSync;
 
            self.consensus.notify_sync_decision(decision);
 
        } else {
 
            self.exec_state.mode = CompMode::StartExit;
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -179,407 +179,397 @@ impl SelectState {
 
        if self.cases.is_empty() {
 
            // If there are no cases then we can immediately reach a "bogus
 
            // decision".
 
            return SelectDecision::Case(0);
 
        }
 

	
 
        // Need to check for valid case
 
        'case_loop: for (case_index, case) in self.cases.iter().enumerate() {
 
            for port_handle in case.involved_ports.iter().copied() {
 
                let port_index = comp_ctx.get_port_index(port_handle);
 
                if inbox[port_index].is_none() {
 
                    // Condition not satisfied
 
                    continue 'case_loop;
 
                }
 
            }
 

	
 
            // If here then the case guard is satisfied
 
            self.candidates_workspace.push(case_index);
 
        }
 

	
 
        if self.candidates_workspace.is_empty() {
 
            return SelectDecision::None;
 
        } else {
 
            let candidate_index = self.random.get_u64() as usize % self.candidates_workspace.len();
 
            return SelectDecision::Case(self.candidates_workspace[candidate_index] as u32);
 
        }
 
    }
 
}
 

	
 
pub(crate) struct CompPDL {
 
    pub exec_state: CompExecState,
 
    pub select_state: SelectState,
 
    pub prompt: Prompt,
 
    pub control: ControlLayer,
 
    pub consensus: Consensus,
 
    pub sync_counter: u32,
 
    pub exec_ctx: ExecCtx,
 
    // TODO: Temporary field, simulates future plans of having one storage place
 
    //  reserved per port.
 
    // Should be same length as the number of ports. Corresponding indices imply
 
    // message is intended for that port.
 
    pub inbox_main: InboxMain,
 
    pub inbox_backup: Vec<DataMessage>,
 
}
 

	
 
impl Component for CompPDL {
 
    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
        if self.inbox_main[port_index].is_none() {
 
            self.inbox_main[port_index] = Some(message);
 
        } else {
 
            self.inbox_backup.push(message);
 
        }
 
    }
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
 
        sched_ctx.log(&format!("handling message: {:#?}", message));
 
        if let Some(new_target) = self.control.should_reroute(&mut message) {
 
            let mut target = sched_ctx.runtime.get_component_public(new_target);
 
            target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            let _should_remove = target.decrement_users();
 
            debug_assert!(_should_remove.is_none());
 
            return;
 
        }
 

	
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
                component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                );
 
            },
 
            Message::Sync(message) => {
 
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
 
            }
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        use EvalContinuation as EC;
 

	
 
        sched_ctx.log(&format!("Running component (mode: {:?})", self.exec_state.mode));
 

	
 
        // Depending on the mode don't do anything at all, take some special
 
        // actions, or fall through and run the PDL code.
 
        match self.exec_state.mode {
 
            CompMode::NonSync | CompMode::Sync => {
 
                // continue and run PDL code
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => {
 
                return Ok(CompScheduling::Sleep);
 
            }
 
            CompMode::StartExit => {
 
                self.handle_component_exit(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            CompMode::BusyExit => {
 
                if self.control.has_acks_remaining() {
 
                    return Ok(CompScheduling::Sleep);
 
                } else {
 
                    self.exec_state.mode = CompMode::Exit;
 
                    return Ok(CompScheduling::Exit);
 
                }
 
            },
 
            CompMode::Exit => {
 
                return Ok(CompScheduling::Exit);
 
            }
 
            CompMode::StartExit => return Ok(component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx
 
            )),
 
            CompMode::BusyExit => return Ok(component::default_handle_busy_exit(
 
                &mut self.exec_state, &self.control, sched_ctx
 
            )),
 
            CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)),
 
        }
 

	
 
        let run_result = self.execute_prompt(&sched_ctx)?;
 

	
 
        match run_result {
 
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
 
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::BlockGet(port_id) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 

	
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_index = comp_ctx.get_port_index(port_handle);
 
                if let Some(message) = &self.inbox_main[port_index] {
 
                    // Check if we can actually receive the message
 
                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
                        // Message was received. Make sure any blocked peers and
 
                        // pending messages are handled.
 
                        let message = self.inbox_main[port_index].take().unwrap();
 
                        self.handle_received_data_message(sched_ctx, comp_ctx, port_handle);
 

	
 
                        self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
 
                        return Ok(CompScheduling::Immediate);
 
                    } else {
 
                        todo!("handle sync failure due to message deadlock");
 
                        return Ok(CompScheduling::Sleep);
 
                    }
 
                } else {
 
                    // We need to wait
 
                    self.exec_state.set_as_blocked_get(port_id);
 
                    return Ok(CompScheduling::Sleep);
 
                }
 
            },
 
            EC::Put(port_id, value) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                sched_ctx.log(&format!("Putting value {:?}", value));
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                if port_info.state.is_blocked() {
 
                    self.exec_state.set_as_blocked_put(port_id, value);
 
                    self.exec_ctx.stmt = ExecStmt::PerformedPut; // prepare for when we become unblocked
 
                    return Ok(CompScheduling::Sleep);
 
                } else {
 
                    self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value);
 
                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
 
                    return Ok(CompScheduling::Immediate);
 
                }
 
            },
 
            EC::SelectStart(num_cases, _num_ports) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                self.select_state.handle_select_start(num_cases);
 
                return Ok(CompScheduling::Requeue);
 
            },
 
            EC::SelectRegisterPort(case_index, port_index, port_id) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                let port_id = port_id_from_eval(port_id);
 
                if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) {
 
                    todo!("handle registering a port multiple times");
 
                }
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::SelectWait => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                let select_decision = self.select_state.handle_select_waiting_point(&self.inbox_main, comp_ctx);
 
                if let SelectDecision::Case(case_index) = select_decision {
 
                    // Reached a conclusion, so we can continue immediately
 
                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
 
                    self.exec_state.mode = CompMode::Sync;
 
                    return Ok(CompScheduling::Immediate);
 
                } else {
 
                    // No decision yet
 
                    self.exec_state.mode = CompMode::BlockedSelect;
 
                    return Ok(CompScheduling::Sleep);
 
                }
 
            },
 
            // Results that can be returned outside of sync mode
 
            EC::ComponentTerminated => {
 
                self.exec_state.mode = CompMode::StartExit; // next call we'll take care of the exit
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::SyncBlockStart => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                self.handle_sync_start(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::NewComponent(definition_id, type_id, arguments) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                self.create_component_and_transfer_ports(
 
                    sched_ctx, comp_ctx,
 
                    definition_id, type_id, arguments
 
                );
 
                return Ok(CompScheduling::Requeue);
 
            },
 
            EC::NewChannel => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 
                let channel = comp_ctx.create_channel();
 
                self.exec_ctx.stmt = ExecStmt::CreatedChannel((
 
                    Value::Output(port_id_to_eval(channel.putter_id)),
 
                    Value::Input(port_id_to_eval(channel.getter_id))
 
                ));
 
                self.inbox_main.push(None);
 
                self.inbox_main.push(None);
 
                return Ok(CompScheduling::Immediate);
 
            }
 
        }
 
    }
 
}
 

	
 
impl CompPDL {
 
    pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self {
 
        let mut inbox_main = Vec::new();
 
        inbox_main.reserve(num_ports);
 
        for _ in 0..num_ports {
 
            inbox_main.push(None);
 
        }
 

	
 
        return Self{
 
            exec_state: CompExecState::new(),
 
            select_state: SelectState::new(),
 
            prompt: initial_state,
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
            sync_counter: 0,
 
            exec_ctx: ExecCtx{
 
                stmt: ExecStmt::None,
 
            },
 
            inbox_main,
 
            inbox_backup: Vec::new(),
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Running component and handling changes in global component state
 
    // -------------------------------------------------------------------------
 

	
 
    fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult {
 
        let mut step_result = EvalContinuation::Stepping;
 
        while let EvalContinuation::Stepping = step_result {
 
            step_result = self.prompt.step(
 
                &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
 
                &sched_ctx.runtime.protocol.modules, &mut self.exec_ctx,
 
            )?;
 
        }
 

	
 
        return Ok(step_result)
 
    }
 

	
 
    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component starting sync mode");
 
        self.consensus.notify_sync_start(comp_ctx);
 
        for message in self.inbox_main.iter() {
 
            if let Some(message) = message {
 
                self.consensus.handle_new_data_message(comp_ctx, message);
 
            }
 
        }
 
        debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
        self.exec_state.mode = CompMode::Sync;
 
    }
 

	
 
    /// Handles end of sync. The conclusion to the sync round might arise
 
    /// immediately (and be handled immediately), or might come later through
 
    /// messaging. In any case the component should be scheduled again
 
    /// immediately
 
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component ending sync mode (now waiting for solution)");
 
        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
        self.exec_state.mode = CompMode::SyncEnd;
 
        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
    }
 

	
 
    /// Handles decision from the consensus round. This will cause a change in
 
    /// the internal `Mode`, such that the next call to `run` can take the
 
    /// appropriate next steps.
 
    fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
 
        sched_ctx.log(&format!("Handling sync decision: {:?} (in mode {:?})", decision, self.mode));
 
        let is_success = match decision {
 
        match decision {
 
            SyncRoundDecision::None => {
 
                // No decision yet
 
                return;
 
            },
 
            SyncRoundDecision::Solution => true,
 
            SyncRoundDecision::Failure => false,
 
        };
 

	
 
        // If here then we've reached a decision
 
        debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
 
        if is_success {
 
            self.exec_state.mode = CompMode::NonSync;
 
            self.consensus.notify_sync_decision(decision);
 
        } else {
 
            self.exec_state.mode = CompMode::StartExit;
 
            SyncRoundDecision::Solution => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
 
                self.exec_state.mode = CompMode::NonSync;
 
                self.consensus.notify_sync_decision(decision);
 
            },
 
            SyncRoundDecision::Failure => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
 
                self.exec_state.mode = CompMode::StartExit;
 
            },
 
        }
 
    }
 

	
 
    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component exiting");
 
        debug_assert_eq!(self.exec_state.mode, CompMode::StartExit);
 
        self.exec_state.mode = CompMode::BusyExit;
 

	
 
        // Doing this by index, then retrieving the handle is a bit rediculous,
 
        // but Rust is being Rust with its borrowing rules.
 
        for port_index in 0..comp_ctx.num_ports() {
 
            let port = comp_ctx.get_port_by_index_mut(port_index);
 
            if port.state == PortState::Closed {
 
                // Already closed, or in the process of being closed
 
                continue;
 
            }
 

	
 
            // Mark as closed
 
            let port_id = port.self_id;
 
            port.state = PortState::Closed;
 

	
 
            // Notify peer of closing
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            let (peer, message) = self.control.initiate_port_closing(port_handle, comp_ctx);
 
            let peer_info = comp_ctx.get_peer(peer);
 
            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling messages
 
    // -------------------------------------------------------------------------
 

	
 
    fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_handle: LocalPortHandle, value: ValueGroup) {
 
        let port_info = comp_ctx.get_port(source_port_handle);
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value);
 
        peer_info.handle.send_message(sched_ctx, Message::Data(annotated_message), true);
 
    }
 

	
 
    /// Handles a message that came in through the public inbox. This function
 
    /// will handle putting it in the correct place, and potentially blocking
 
    /// the port in case too many messages are being received.
 
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        // Whatever we do, glean information from headers in message
 
        if self.exec_state.mode.is_in_sync_block() {
 
            self.consensus.handle_new_data_message(comp_ctx, &message);
 
        }
 

	
 
        // Check if we can insert it directly into the storage associated with
 
        // the port
 
        let target_port_id = message.data_header.target_port;
 
        let port_handle = comp_ctx.get_port_handle(target_port_id);
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
        if self.inbox_main[port_index].is_none() {
 
            self.inbox_main[port_index] = Some(message);
 

	
 
            // After direct insertion, check if this component's execution is 
 
            // blocked on receiving a message on that port
 
            debug_assert!(!comp_ctx.get_port(port_handle).state.is_blocked()); // because we could insert directly
 
            if self.exec_state.is_blocked_on_get(target_port_id) {
 
                // We were indeed blocked
 
                self.exec_state.mode = CompMode::Sync;
 
                self.exec_state.mode_port = PortId::new_invalid();
 
            } else if self.exec_state.mode == CompMode::BlockedSelect {
 
                let select_decision = self.select_state.handle_updated_inbox(&self.inbox_main, comp_ctx);
 
                if let SelectDecision::Case(case_index) = select_decision {
 
                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
 
                    self.exec_state.mode = CompMode::Sync;
 
                }
 
            }
 
            
 
            return;
 
        }
 

	
 
        // The direct inbox is full, so the port will become (or was already) blocked
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked());
 

	
 
        if port_info.state == PortState::Open {
 
            comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
 
            let (peer_handle, message) =
 
                self.control.initiate_port_blocking(comp_ctx, port_handle);
 

	
 
            let peer = comp_ctx.get_peer(peer_handle);
 
            peer.handle.send_message(sched_ctx, Message::Control(message), true);
 
        }
 

	
 
        // But we still need to remember the message, so:
 
        self.inbox_backup.push(message);
 
    }
 

	
 
    /// Handles when a message has been handed off from the inbox to the PDL
 
    /// code. We check to see if there are more messages waiting and, if not,
 
    /// then we handle the case where the port might have been blocked
src/runtime2/mod.rs
Show inline comments
 
mod store;
 
mod runtime;
 
mod component;
 
mod communication;
 
mod scheduler;
 
#[cfg(test)] mod tests;
 

	
 
pub use runtime::Runtime;
 
pub(crate) use scheduler::SchedulerCtx;
 
pub(crate) use communication::{Message, ControlMessage, SyncMessage, DataMessage};
 
\ No newline at end of file
 
pub(crate) use communication::{
 
    PortId, PortKind, PortState,
 
    Message, ControlMessage, SyncMessage, DataMessage,
 
    SyncRoundDecision
 
};
 
\ No newline at end of file
0 comments (0 inline, 0 general)