Changeset - 135965141596
[Not reviewed]
0 4 0
mh - 3 years ago 2022-03-30 14:35:10
contact@maxhenger.nl
Factor out common component shutdown handling
4 files changed with 137 insertions and 31 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -216,6 +216,62 @@ pub(crate) fn default_handle_control_message(
 
    }
 
}
 

	
 
/// Handles a component initiating the exiting procedure, and closing all of its
 
/// ports. Should only be called once per component (which is ensured by
 
/// checking and modifying the mode in the execution state).
 
pub(crate) fn default_handle_start_exit(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
 
    sched_ctx.log("Component starting exit");
 
    exec_state.mode = CompMode::BusyExit;
 

	
 
    // Iterating by index to work around borrowing rules
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port = comp_ctx.get_port_by_index_mut(port_index);
 
        if port.state == PortState::Closed {
 
            // Already closed, or in the process of being closed
 
            continue;
 
        }
 

	
 
        // Mark as closed
 
        let port_id = port.self_id;
 
        port.state = PortState::Closed;
 

	
 
        // Notify peer of closing
 
        let port_handle = comp_ctx.get_port_handle(port_id);
 
        let (peer, message) = control.initiate_port_closing(port_handle, comp_ctx);
 
        let peer_info = comp_ctx.get_peer(peer);
 
        peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
 
    }
 

	
 
    return CompScheduling::Immediate; // to check if we can shut down immediately
 
}
 

	
 
/// Handles a component waiting until all peers are notified that it is quitting
 
/// (i.e. after calling `default_handle_start_exit`).
 
pub(crate) fn default_handle_busy_exit(
 
    exec_state: &mut CompExecState, control: &ControlLayer,
 
    sched_ctx: &SchedulerCtx
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::BusyExit);
 
    if control.has_acks_remaining() {
 
        sched_ctx.log("Component busy exiting, still has `Ack`s remaining");
 
        return CompScheduling::Sleep;
 
    } else {
 
        sched_ctx.log("Component busy exiting, now shutting down");
 
        exec_state.mode = CompMode::Exit;
 
        return CompScheduling::Exit;
 
    }
 
}
 

	
 
#[inline]
 
pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::Exit);
 
    return CompScheduling::Exit;
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Internal messaging/state utilities
 
// -----------------------------------------------------------------------------
src/runtime2/component/component_ip.rs
Show inline comments
 
use crate::protocol::eval::*;
 
use crate::protocol::eval::{ValueGroup, EvalError};
 
use crate::runtime2::*;
 
use super::*;
 
use super::component::*;
 
use super::component::{self, Component, CompExecState, CompScheduling, CompMode};
 
use super::control_layer::*;
 
use super::consensus::*;
 

	
 
/// TODO: Temporary component to figure out what to do with custom components.
 
///     This component sends random numbers between two u32 limits
 
pub struct ComponentRandomU32 {
 
    // Properties for this specific component
 
    output_port_id: PortId,
 
    random_minimum: u32,
 
    random_maximum: u32,
 
    // Generic state-tracking
 
    exec_state: CompExecState,
 
    control: ControlLayer,
 
    consensus: Consensus,
 
}
 

	
 
impl Component for ComponentRandomU32 {
 
@@ -20,18 +27,46 @@ impl Component for ComponentRandomU32 {
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
        match message {
 
            Message::Data(message) => unreachable!(),
 
            Message::Data(_message) => unreachable!(),
 
            Message::Sync(message) => {
 

	
 
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
                self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
            },
 
            Message::Control(message) => {
 

	
 
                component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                );
 
            }
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        todo!()
 
        sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedGet | CompMode::BlockedSelect => {
 
                // impossible for this component, no input ports and no select
 
                // blocks
 
                unreachable!();
 
            }
 
            CompMode::NonSync => {
 
                // If in non-sync mode then we check if the arguments make sense
 
                // (at some point in the future, this is just a testing
 
                // component)
 
            },
 
            CompMode::Sync => {
 

	
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep),
 
            CompMode::StartExit => return Ok(component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx
 
            )),
 
            CompMode::BusyExit => return Ok(component::default_handle_busy_exit(
 
                &mut self.exec_state, &self.control, sched_ctx
 
            )),
 
            CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)),
 
        }
 
    }
 
}
 

	
 
@@ -39,7 +74,7 @@ impl ComponentRandomU32 {
 
    pub(crate) fn new(arguments: ValueGroup) -> Self {
 
        debug_assert_eq!(arguments.values.len(), 3);
 
        debug_assert!(arguments.regions.is_empty());
 
        let port_id = port_id_from_eval(arguments.values[0].as_port_id());
 
        let port_id = component::port_id_from_eval(arguments.values[0].as_port_id());
 
        let minimum = arguments.values[1].as_uint32();
 
        let maximum = arguments.values[2].as_uint32();
 

	
 
@@ -47,6 +82,27 @@ impl ComponentRandomU32 {
 
            output_port_id: port_id,
 
            random_minimum: minimum,
 
            random_maximum: maximum,
 
            exec_state: CompExecState::new(),
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
        }
 
    }
 

	
 

	
 

	
 
    fn handle_sync_decision(&mut self, _sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
 
        let success = match decision {
 
            SyncRoundDecision::None => return,
 
            SyncRoundDecision::Solution => true,
 
            SyncRoundDecision::Failure => false,
 
        };
 

	
 
        debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
 
        if success {
 
            self.exec_state.mode = CompMode::NonSync;
 
            self.consensus.notify_sync_decision(decision);
 
        } else {
 
            self.exec_state.mode = CompMode::StartExit;
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -272,21 +272,13 @@ impl Component for CompPDL {
 
            CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => {
 
                return Ok(CompScheduling::Sleep);
 
            }
 
            CompMode::StartExit => {
 
                self.handle_component_exit(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            CompMode::BusyExit => {
 
                if self.control.has_acks_remaining() {
 
                    return Ok(CompScheduling::Sleep);
 
                } else {
 
                    self.exec_state.mode = CompMode::Exit;
 
                    return Ok(CompScheduling::Exit);
 
                }
 
            },
 
            CompMode::Exit => {
 
                return Ok(CompScheduling::Exit);
 
            }
 
            CompMode::StartExit => return Ok(component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx
 
            )),
 
            CompMode::BusyExit => return Ok(component::default_handle_busy_exit(
 
                &mut self.exec_state, &self.control, sched_ctx
 
            )),
 
            CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)),
 
        }
 

	
 
        let run_result = self.execute_prompt(&sched_ctx)?;
 
@@ -471,22 +463,20 @@ impl CompPDL {
 
    /// appropriate next steps.
 
    fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
 
        sched_ctx.log(&format!("Handling sync decision: {:?} (in mode {:?})", decision, self.mode));
 
        let is_success = match decision {
 
        match decision {
 
            SyncRoundDecision::None => {
 
                // No decision yet
 
                return;
 
            },
 
            SyncRoundDecision::Solution => true,
 
            SyncRoundDecision::Failure => false,
 
        };
 

	
 
        // If here then we've reached a decision
 
            SyncRoundDecision::Solution => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
 
        if is_success {
 
                self.exec_state.mode = CompMode::NonSync;
 
                self.consensus.notify_sync_decision(decision);
 
        } else {
 
            },
 
            SyncRoundDecision::Failure => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
 
                self.exec_state.mode = CompMode::StartExit;
 
            },
 
        }
 
    }
 

	
src/runtime2/mod.rs
Show inline comments
 
@@ -7,4 +7,8 @@ mod scheduler;
 

	
 
pub use runtime::Runtime;
 
pub(crate) use scheduler::SchedulerCtx;
 
pub(crate) use communication::{Message, ControlMessage, SyncMessage, DataMessage};
 
\ No newline at end of file
 
pub(crate) use communication::{
 
    PortId, PortKind, PortState,
 
    Message, ControlMessage, SyncMessage, DataMessage,
 
    SyncRoundDecision
 
};
 
\ No newline at end of file
0 comments (0 inline, 0 general)