diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index f312249d6981fb59443ae041b5fe53248bfffde0..6486b1c87a6d42a94cc1cd75d577e4e036523d1b 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -10,7 +10,7 @@ use crate::protocol::eval::{ use crate::runtime2::scheduler::SchedulerCtx; use crate::runtime2::communication::*; -use super::component::CompScheduling; +use super::component::*; use super::component_context::*; use super::control_layer::*; use super::consensus::Consensus; @@ -243,32 +243,18 @@ pub(crate) struct CompPDL { pub inbox_backup: Vec, } -impl CompPDL { - pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self { - let mut inbox_main = Vec::new(); - inbox_main.reserve(num_ports); - for _ in 0..num_ports { - inbox_main.push(None); - } - - return Self{ - mode: Mode::NonSync, - mode_port: PortId::new_invalid(), - mode_value: ValueGroup::default(), - select: SelectState::new(), - prompt: initial_state, - control: ControlLayer::default(), - consensus: Consensus::new(), - sync_counter: 0, - exec_ctx: ExecCtx{ - stmt: ExecStmt::None, - }, - inbox_main, - inbox_backup: Vec::new(), +impl Component for CompPDL { + fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage) { + let port_handle = comp_ctx.get_port_handle(message.data_header.target_port); + let port_index = comp_ctx.get_port_index(port_handle); + if self.inbox_main[port_index].is_none() { + self.inbox_main[port_index] = Some(message); + } else { + self.inbox_backup.push(message); } } - pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) { + fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) { sched_ctx.log(&format!("handling message: {:#?}", message)); if let Some(new_target) = self.control.should_reroute(&mut message) { let mut target = sched_ctx.runtime.get_component_public(new_target); @@ -291,11 +277,7 @@ impl CompPDL { } } - // ------------------------------------------------------------------------- - // Running component and handling changes in global component state - // ------------------------------------------------------------------------- - - pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { + fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { use EvalContinuation as EC; sched_ctx.log(&format!("Running component (mode: {:?})", self.mode)); @@ -442,6 +424,36 @@ impl CompPDL { } } } +} + +impl CompPDL { + pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self { + let mut inbox_main = Vec::new(); + inbox_main.reserve(num_ports); + for _ in 0..num_ports { + inbox_main.push(None); + } + + return Self{ + mode: Mode::NonSync, + mode_port: PortId::new_invalid(), + mode_value: ValueGroup::default(), + select: SelectState::new(), + prompt: initial_state, + control: ControlLayer::default(), + consensus: Consensus::new(), + sync_counter: 0, + exec_ctx: ExecCtx{ + stmt: ExecStmt::None, + }, + inbox_main, + inbox_backup: Vec::new(), + } + } + + // ------------------------------------------------------------------------- + // Running component and handling changes in global component state + // ------------------------------------------------------------------------- fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult { let mut step_result = EvalContinuation::Stepping; @@ -879,11 +891,9 @@ impl CompPDL { let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component( reservation, component, created_ctx, false, ); - let created_ctx = &component.ctx; // Now modify the creator's ports: remove every transferred port and - // potentially remove the peer component. Here is also where we will - // transfer messages in the main inbox. + // potentially remove the peer component. for pair in port_id_pairs.iter() { // Remove peer if appropriate let creator_port_info = creator_ctx.get_port(pair.creator_handle); @@ -893,12 +903,9 @@ impl CompPDL { creator_ctx.remove_port(pair.creator_handle); // Transfer any messages - let created_port_index = created_ctx.get_port_index(pair.created_handle); - let created_port_info = created_ctx.get_port(pair.created_handle); - debug_assert!(component.code.inbox_main[created_port_index].is_none()); if let Some(mut message) = self.inbox_main.remove(creator_port_index) { message.data_header.target_port = pair.created_id; - component.code.inbox_main[created_port_index] = Some(message); + component.component.adopt_message(&mut component.ctx, message) } let mut message_index = 0; @@ -908,25 +915,29 @@ impl CompPDL { // transfer message let mut message = self.inbox_backup.remove(message_index); message.data_header.target_port = pair.created_id; - component.code.inbox_backup.push(message); + component.component.adopt_message(&mut component.ctx, message); } else { message_index += 1; } } // Handle potential channel between creator and created component + let created_port_info = component.ctx.get_port(pair.created_handle); + if created_port_info.peer_comp_id == creator_ctx.id { let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id); let peer_port_info = creator_ctx.get_port_mut(peer_port_handle); - peer_port_info.peer_comp_id = created_ctx.id; + peer_port_info.peer_comp_id = component.ctx.id; peer_port_info.peer_port_id = created_port_info.self_id; - creator_ctx.add_peer(peer_port_handle, sched_ctx, created_ctx.id, None); + creator_ctx.add_peer(peer_port_handle, sched_ctx, component.ctx.id, None); } } - // By now all ports have been transferred. We'll now do any of the setup - // for rerouting/messaging + // By now all ports and messages have been transferred. If there are any + // peers that need to be notified about this new component, then we + // initiate the protocol that will notify everyone here. if created_component_has_remote_peers { + let created_ctx = &component.ctx; let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id); for pair in port_id_pairs.iter() { let port_info = created_ctx.get_port(pair.created_handle);