diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index e350d706a207397f98e63d266c188ed64efcefec..2a49e852f3e325391f077e5ea1978c0524b1fd9c 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -216,6 +216,62 @@ pub(crate) fn default_handle_control_message( } } +/// Handles a component initiating the exiting procedure, and closing all of its +/// ports. Should only be called once per component (which is ensured by +/// checking and modifying the mode in the execution state). +pub(crate) fn default_handle_start_exit( + exec_state: &mut CompExecState, control: &mut ControlLayer, + sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx +) -> CompScheduling { + debug_assert_eq!(exec_state.mode, CompMode::StartExit); + sched_ctx.log("Component starting exit"); + exec_state.mode = CompMode::BusyExit; + + // Iterating by index to work around borrowing rules + for port_index in 0..comp_ctx.num_ports() { + let port = comp_ctx.get_port_by_index_mut(port_index); + if port.state == PortState::Closed { + // Already closed, or in the process of being closed + continue; + } + + // Mark as closed + let port_id = port.self_id; + port.state = PortState::Closed; + + // Notify peer of closing + let port_handle = comp_ctx.get_port_handle(port_id); + let (peer, message) = control.initiate_port_closing(port_handle, comp_ctx); + let peer_info = comp_ctx.get_peer(peer); + peer_info.handle.send_message(sched_ctx, Message::Control(message), true); + } + + return CompScheduling::Immediate; // to check if we can shut down immediately +} + +/// Handles a component waiting until all peers are notified that it is quitting +/// (i.e. after calling `default_handle_start_exit`). 
+pub(crate) fn default_handle_busy_exit( + exec_state: &mut CompExecState, control: &ControlLayer, + sched_ctx: &SchedulerCtx +) -> CompScheduling { + debug_assert_eq!(exec_state.mode, CompMode::BusyExit); + if control.has_acks_remaining() { + sched_ctx.log("Component busy exiting, still has `Ack`s remaining"); + return CompScheduling::Sleep; + } else { + sched_ctx.log("Component busy exiting, now shutting down"); + exec_state.mode = CompMode::Exit; + return CompScheduling::Exit; + } +} + +#[inline] +pub(crate) fn default_handle_exit(exec_state: &CompExecState) -> CompScheduling { + debug_assert_eq!(exec_state.mode, CompMode::Exit); + return CompScheduling::Exit; +} + // ----------------------------------------------------------------------------- // Internal messaging/state utilities // ----------------------------------------------------------------------------- diff --git a/src/runtime2/component/component_ip.rs b/src/runtime2/component/component_ip.rs index a3a83aa59df50bf6bb336fd7cbd53aaaf66551b8..87c65d3baa7173c5dabd768677bd9cea5cce17ba 100644 --- a/src/runtime2/component/component_ip.rs +++ b/src/runtime2/component/component_ip.rs @@ -1,14 +1,21 @@ -use crate::protocol::eval::*; +use crate::protocol::eval::{ValueGroup, EvalError}; use crate::runtime2::*; use super::*; -use super::component::*; +use super::component::{self, Component, CompExecState, CompScheduling, CompMode}; +use super::control_layer::*; +use super::consensus::*; /// TODO: Temporary component to figure out what to do with custom components. 
/// This component sends random numbers between two u32 limits pub struct ComponentRandomU32 { + // Properties for this specific component output_port_id: PortId, random_minimum: u32, random_maximum: u32, + // Generic state-tracking + exec_state: CompExecState, + control: ControlLayer, + consensus: Consensus, } impl Component for ComponentRandomU32 { @@ -20,18 +27,46 @@ impl Component for ComponentRandomU32 { fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { match message { - Message::Data(message) => unreachable!(), + Message::Data(_message) => unreachable!(), Message::Sync(message) => { - + let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); + self.handle_sync_decision(sched_ctx, comp_ctx, decision); }, Message::Control(message) => { - + component::default_handle_control_message( + &mut self.exec_state, &mut self.control, &mut self.consensus, + message, sched_ctx, comp_ctx + ); } } } fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { - todo!() + sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode)); + + match self.exec_state.mode { + CompMode::BlockedGet | CompMode::BlockedSelect => { + // impossible for this component, no input ports and no select + // blocks + unreachable!(); + } + CompMode::NonSync => { + // If in non-sync mode then we check if the arguments make sense + // (at some point in the future, this is just a testing + // component) + }, + CompMode::Sync => { + + }, + CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep), + CompMode::StartExit => return Ok(component::default_handle_start_exit( + &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx + )), + CompMode::BusyExit => return Ok(component::default_handle_busy_exit( + &mut self.exec_state, &self.control, sched_ctx + )), + CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)), + } } 
} @@ -39,7 +74,7 @@ impl ComponentRandomU32 { pub(crate) fn new(arguments: ValueGroup) -> Self { debug_assert_eq!(arguments.values.len(), 3); debug_assert!(arguments.regions.is_empty()); - let port_id = port_id_from_eval(arguments.values[0].as_port_id()); + let port_id = component::port_id_from_eval(arguments.values[0].as_port_id()); let minimum = arguments.values[1].as_uint32(); let maximum = arguments.values[2].as_uint32(); @@ -47,6 +82,27 @@ impl ComponentRandomU32 { output_port_id: port_id, random_minimum: minimum, random_maximum: maximum, + exec_state: CompExecState::new(), + control: ControlLayer::default(), + consensus: Consensus::new(), + } + } + + + + fn handle_sync_decision(&mut self, _sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) { + let success = match decision { + SyncRoundDecision::None => return, + SyncRoundDecision::Solution => true, + SyncRoundDecision::Failure => false, + }; + + debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd); + if success { + self.exec_state.mode = CompMode::NonSync; + self.consensus.notify_sync_decision(decision); + } else { + self.exec_state.mode = CompMode::StartExit; } } } \ No newline at end of file diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 236cff963e1047d61a0c3f607543776650f6b2b6..1ff5db7821f8ffaa23ef05b0672f5d5a7f0afb4e 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -272,21 +272,13 @@ impl Component for CompPDL { CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => { return Ok(CompScheduling::Sleep); } - CompMode::StartExit => { - self.handle_component_exit(sched_ctx, comp_ctx); - return Ok(CompScheduling::Immediate); - }, - CompMode::BusyExit => { - if self.control.has_acks_remaining() { - return Ok(CompScheduling::Sleep); - } else { - self.exec_state.mode = CompMode::Exit; - return Ok(CompScheduling::Exit); - } - }, - 
CompMode::Exit => { - return Ok(CompScheduling::Exit); - } + CompMode::StartExit => return Ok(component::default_handle_start_exit( + &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx + )), + CompMode::BusyExit => return Ok(component::default_handle_busy_exit( + &mut self.exec_state, &self.control, sched_ctx + )), + CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)), } let run_result = self.execute_prompt(&sched_ctx)?; @@ -471,22 +463,20 @@ impl CompPDL { /// appropriate next steps. fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) { sched_ctx.log(&format!("Handling sync decision: {:?} (in mode {:?})", decision, self.mode)); - let is_success = match decision { + match decision { SyncRoundDecision::None => { // No decision yet return; }, - SyncRoundDecision::Solution => true, - SyncRoundDecision::Failure => false, - }; - - // If here then we've reached a decision - debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd); - if is_success { - self.exec_state.mode = CompMode::NonSync; - self.consensus.notify_sync_decision(decision); - } else { - self.exec_state.mode = CompMode::StartExit; + SyncRoundDecision::Solution => { + debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd); + self.exec_state.mode = CompMode::NonSync; + self.consensus.notify_sync_decision(decision); + }, + SyncRoundDecision::Failure => { + debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd); + self.exec_state.mode = CompMode::StartExit; + }, } } diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index 6ec88c480c45afdf81dd51a0bf1bc7810fc2bc4e..e5a3b8eb2fba77ff1fb0480a18e6aad656c5d425 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -7,4 +7,8 @@ mod scheduler; pub use runtime::Runtime; pub(crate) use scheduler::SchedulerCtx; -pub(crate) use communication::{Message, ControlMessage, SyncMessage, DataMessage}; \ No newline at end of file +pub(crate) use communication::{ + 
PortId, PortKind, PortState, + Message, ControlMessage, SyncMessage, DataMessage, + SyncRoundDecision +}; \ No newline at end of file