From 93081320c9fa81d20832dc602d2d92ac3622d157 2022-03-23 11:25:13 From: mh Date: 2022-03-23 11:25:13 Subject: [PATCH] Introduce trait for components --- diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index d68a0207767ee01904a1e6028d7742ccf9f63d8d..73ee7e4cc4ce5f8a272539990fa31a4ae357d8b0 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -1,3 +1,4 @@ +use crate::protocol::eval::EvalError; use crate::runtime2::*; use super::CompCtx; @@ -8,7 +9,17 @@ pub enum CompScheduling { Exit, } +/// Generic representation of a component (as viewed by a scheduler). pub(crate) trait Component { - fn handle_message(sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message); - fn run(sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling; + /// Called if the component is created by another component and the messages + /// are being transferred between the two. + fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage); + + /// Called if the component receives a new message. The component is + /// responsible for deciding where that messages goes. + fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message); + + /// Called if the component's routine should be executed. The return value + /// can be used to indicate when the routine should be run again. + fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result; } \ No newline at end of file diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index f312249d6981fb59443ae041b5fe53248bfffde0..6486b1c87a6d42a94cc1cd75d577e4e036523d1b 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -10,7 +10,7 @@ use crate::protocol::eval::{ use crate::runtime2::scheduler::SchedulerCtx; use crate::runtime2::communication::*; -use super::component::CompScheduling; +use super::component::*; use super::component_context::*; use super::control_layer::*; use super::consensus::Consensus; @@ -243,32 +243,18 @@ pub(crate) struct CompPDL { pub inbox_backup: Vec, } -impl CompPDL { - pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self { - let mut inbox_main = Vec::new(); - inbox_main.reserve(num_ports); - for _ in 0..num_ports { - inbox_main.push(None); - } - - return Self{ - mode: Mode::NonSync, - mode_port: PortId::new_invalid(), - mode_value: ValueGroup::default(), - select: SelectState::new(), - prompt: initial_state, - control: ControlLayer::default(), - consensus: Consensus::new(), - sync_counter: 0, - exec_ctx: ExecCtx{ - stmt: ExecStmt::None, - }, - inbox_main, - inbox_backup: Vec::new(), +impl Component for CompPDL { + fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage) { + let port_handle = comp_ctx.get_port_handle(message.data_header.target_port); + let port_index = comp_ctx.get_port_index(port_handle); + if self.inbox_main[port_index].is_none() { + self.inbox_main[port_index] = Some(message); + } else { + self.inbox_backup.push(message); } } - pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) { + fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) { sched_ctx.log(&format!("handling message: {:#?}", message)); if let Some(new_target) = self.control.should_reroute(&mut message) { let mut target = sched_ctx.runtime.get_component_public(new_target); @@ -291,11 +277,7 @@ impl CompPDL { } } - // ------------------------------------------------------------------------- - // Running component and handling changes in global component state - // ------------------------------------------------------------------------- - - pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { + fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { use EvalContinuation as EC; sched_ctx.log(&format!("Running component (mode: {:?})", self.mode)); @@ -442,6 +424,36 @@ impl CompPDL { } } } +} + +impl CompPDL { + pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self { + let mut inbox_main = Vec::new(); + inbox_main.reserve(num_ports); + for _ in 0..num_ports { + inbox_main.push(None); + } + + return Self{ + mode: Mode::NonSync, + mode_port: PortId::new_invalid(), + mode_value: ValueGroup::default(), + select: SelectState::new(), + prompt: initial_state, + control: ControlLayer::default(), + consensus: Consensus::new(), + sync_counter: 0, + exec_ctx: ExecCtx{ + stmt: ExecStmt::None, + }, + inbox_main, + inbox_backup: Vec::new(), + } + } + + // ------------------------------------------------------------------------- + // Running component and handling changes in global component state + // ------------------------------------------------------------------------- fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult { let mut step_result = EvalContinuation::Stepping; @@ -879,11 +891,9 @@ impl CompPDL { let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component( reservation, component, created_ctx, false, ); - let created_ctx = &component.ctx; // Now modify the creator's ports: remove every transferred port and - // potentially remove the peer component. Here is also where we will - // transfer messages in the main inbox. + // potentially remove the peer component. for pair in port_id_pairs.iter() { // Remove peer if appropriate let creator_port_info = creator_ctx.get_port(pair.creator_handle); @@ -893,12 +903,9 @@ impl CompPDL { creator_ctx.remove_port(pair.creator_handle); // Transfer any messages - let created_port_index = created_ctx.get_port_index(pair.created_handle); - let created_port_info = created_ctx.get_port(pair.created_handle); - debug_assert!(component.code.inbox_main[created_port_index].is_none()); if let Some(mut message) = self.inbox_main.remove(creator_port_index) { message.data_header.target_port = pair.created_id; - component.code.inbox_main[created_port_index] = Some(message); + component.component.adopt_message(&mut component.ctx, message) } let mut message_index = 0; @@ -908,25 +915,29 @@ impl CompPDL { // transfer message let mut message = self.inbox_backup.remove(message_index); message.data_header.target_port = pair.created_id; - component.code.inbox_backup.push(message); + component.component.adopt_message(&mut component.ctx, message); } else { message_index += 1; } } // Handle potential channel between creator and created component + let created_port_info = component.ctx.get_port(pair.created_handle); + if created_port_info.peer_comp_id == creator_ctx.id { let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id); let peer_port_info = creator_ctx.get_port_mut(peer_port_handle); - peer_port_info.peer_comp_id = created_ctx.id; + peer_port_info.peer_comp_id = component.ctx.id; peer_port_info.peer_port_id = created_port_info.self_id; - creator_ctx.add_peer(peer_port_handle, sched_ctx, created_ctx.id, None); + creator_ctx.add_peer(peer_port_handle, sched_ctx, component.ctx.id, None); } } - // By now all ports have been transferred. We'll now do any of the setup - // for rerouting/messaging + // By now all ports and messages have been transferred. If there are any + // peers that need to be notified about this new component, then we + // initiate the protocol that will notify everyone here. if created_component_has_remote_peers { + let created_ctx = &component.ctx; let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id); for pair in port_id_pairs.iter() { let port_info = created_ctx.get_port(pair.created_handle); diff --git a/src/runtime2/component/mod.rs b/src/runtime2/component/mod.rs index 72240a091c4b44ed376ff93fef76f4dc15b5e120..3b34d472f560a256641693c1d22ea24e77edc60a 100644 --- a/src/runtime2/component/mod.rs +++ b/src/runtime2/component/mod.rs @@ -4,7 +4,8 @@ mod control_layer; mod consensus; mod component; -pub(crate) use component_pdl::{CompPDL, CompScheduling}; +pub(crate) use component::{Component, CompScheduling}; +pub(crate) use component_pdl::{CompPDL}; pub(crate) use component_context::CompCtx; pub(crate) use control_layer::{ControlId}; diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index 93d9db7f74210151b07b2858bb925c97c1ede825..6ec88c480c45afdf81dd51a0bf1bc7810fc2bc4e 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -7,4 +7,4 @@ mod scheduler; pub use runtime::Runtime; pub(crate) use scheduler::SchedulerCtx; -pub(crate) use communication::Message; \ No newline at end of file +pub(crate) use communication::{Message, ControlMessage, SyncMessage, DataMessage}; \ No newline at end of file diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 5c6ec8c980e4c03212e62d4123cdac96b437f0a2..4f2a18d823c102b3582bfd433ff9c2ae37619d4c 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -5,7 +5,7 @@ use std::collections::VecDeque; use crate::protocol::*; use super::communication::Message; -use super::component::{wake_up_if_sleeping, CompPDL, CompCtx}; +use super::component::{Component, wake_up_if_sleeping, CompPDL, CompCtx}; use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer}; use super::scheduler::*; @@ -53,11 +53,12 @@ impl CompReserved { } } -/// Private fields of a component, may only be modified by a single thread at -/// a time. +/// Representation of a runtime component. Contains the bookkeeping variables +/// for the schedulers, the publicly accessible fields, and the private fields +/// that should only be accessed by the thread running the component's routine. pub(crate) struct RuntimeComp { pub public: CompPublic, - pub code: CompPDL, + pub component: Box, pub ctx: CompCtx, pub inbox: QueueDynMpsc, pub exiting: bool, @@ -246,9 +247,9 @@ impl RuntimeInner { return CompReserved{ reservation }; } - pub(crate) fn finish_create_pdl_component( + pub(crate) fn finish_create_pdl_component( &self, reserved: CompReserved, - component: CompPDL, mut context: CompCtx, initially_sleeping: bool, + component: C, mut context: CompCtx, initially_sleeping: bool, ) -> (CompKey, &mut RuntimeComp) { let inbox_queue = QueueDynMpsc::new(16); let inbox_producer = inbox_queue.producer(); @@ -261,7 +262,7 @@ impl RuntimeInner { num_handles: AtomicU32::new(1), // the component itself acts like a handle inbox: inbox_producer, }, - code: component, + component: Box::new(component), ctx: context, inbox: inbox_queue, exiting: false, diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index e7359488f13d940ccc259ceabb1cb744c0b6de23..87bdb80e316c95be1016a1347b9f6c5aaaaba5fa 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -61,9 +61,9 @@ impl Scheduler { let mut new_scheduling = CompScheduling::Immediate; while let CompScheduling::Immediate = new_scheduling { while let Some(message) = component.inbox.pop() { - component.code.handle_message(&mut scheduler_ctx, &mut component.ctx, message); + component.component.handle_message(&mut scheduler_ctx, &mut component.ctx, message); } - new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error"); + new_scheduling = component.component.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error"); } // Handle the new scheduling