diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 092b5d9e444392d7cb5a700e5143f1979e258a6b..8278f9b270dbf19094fabca7e6de128b79f9d9a0 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -13,6 +13,7 @@ use crate::runtime2::communication::*; use super::component::{ self, + InboxMain, InboxBackup, GetResult, CompExecState, Component, CompScheduling, CompError, CompMode, ExitReason, port_id_from_eval, port_id_to_eval }; @@ -107,8 +108,6 @@ enum SelectDecision { Case(u32), // contains case index, should be passed along to PDL code } -type InboxMain = Vec>; - impl SelectState { fn new() -> Self { return Self{ @@ -308,8 +307,7 @@ impl Component for CompPDL { EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"), // Results that can be returned in sync mode EC::SyncBlockEnd => { - debug_assert_eq!(self.exec_state.mode, CompMode::Sync); - self.handle_sync_end(sched_ctx, comp_ctx); + component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); return CompScheduling::Immediate; }, EC::BlockGet(expr_id, port_id) => { @@ -317,53 +315,22 @@ impl Component for CompPDL { debug_assert!(self.exec_ctx.stmt.is_none()); let port_id = port_id_from_eval(port_id); - let port_handle = comp_ctx.get_port_handle(port_id); - - let port_info = comp_ctx.get_port_mut(port_handle); - port_info.last_instruction = PortInstruction::SourceLocation(expr_id); - let port_is_closed = port_info.state == PortState::Closed; - - let port_index = comp_ctx.get_port_index(port_handle); - if let Some(message) = &self.inbox_main[port_index] { - // Check if we can actually receive the message - if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) { - // Message was received. Make sure any blocked peers and - // pending messages are handled. - let message = self.inbox_main[port_index].take().unwrap(); - let receive_result = component::default_handle_received_data_message( - port_id, PortInstruction::SourceLocation(expr_id), - &mut self.inbox_main[port_index], &mut self.inbox_backup, - comp_ctx, sched_ctx, &mut self.control - ); - if let Err(location_and_message) = receive_result { - self.handle_generic_component_error(sched_ctx, location_and_message); - return CompScheduling::Immediate - } else { - self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content); - return CompScheduling::Immediate; - } - } else { - let protocol = &sched_ctx.runtime.protocol; - self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr( - &self.prompt, &protocol.modules, &protocol.heap, expr_id, - String::from("Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s") - ))); + match component::default_attempt_get( + &mut self.exec_state, port_id, PortInstruction::SourceLocation(expr_id), + &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx, + &mut self.control, &mut self.consensus + ) { + GetResult::Received(message) => { + self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content); + return CompScheduling::Immediate; + }, + GetResult::NoMessage => { return CompScheduling::Sleep; + }, + GetResult::Error(location_and_message) => { + self.handle_generic_component_error(sched_ctx, location_and_message); + return CompScheduling::Immediate; } - } else if port_is_closed { - // No messages, but getting makes no sense as the port is - // closed. - let peer_id = comp_ctx.get_port(port_handle).peer_comp_id; - let protocol = &sched_ctx.runtime.protocol; - self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr( - &self.prompt, &protocol.modules, &protocol.heap, expr_id, - format!("Cannot get from this port, as the peer component (id:{}) shut down", peer_id.0) - ))); - return CompScheduling::Immediate; - } else { - // We need to wait - self.exec_state.set_as_blocked_get(port_id); - return CompScheduling::Sleep; } }, EC::Put(expr_id, port_id, value) => { @@ -446,8 +413,9 @@ impl Component for CompPDL { return CompScheduling::Immediate; }, EC::SyncBlockStart => { - debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); - self.handle_sync_start(sched_ctx, comp_ctx); + component::default_handle_sync_start( + &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus + ); return CompScheduling::Immediate; }, EC::NewComponent(definition_id, type_id, arguments) => { @@ -525,17 +493,6 @@ impl CompPDL { self.exec_state.mode = CompMode::Sync; } - /// Handles end of sync. The conclusion to the sync round might arise - /// immediately (and be handled immediately), or might come later through - /// messaging. In any case the component should be scheduled again - /// immediately - fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { - sched_ctx.log("Component ending sync mode (now waiting for solution)"); - let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx); - self.exec_state.mode = CompMode::SyncEnd; - component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); - } - fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { sched_ctx.log(&format!("Component exiting (reason: {:?}", self.exec_state.exit_reason)); debug_assert_eq!(self.exec_state.mode, CompMode::StartExit); @@ -578,10 +535,8 @@ impl CompPDL { self.consensus.handle_incoming_data_message(comp_ctx, &message); } - let port_handle = comp_ctx.get_port_handle(message.data_header.target_port); - let port_index = comp_ctx.get_port_index(port_handle); match component::default_handle_incoming_data_message( - &mut self.exec_state, &mut self.inbox_main[port_index], comp_ctx, message, + &mut self.exec_state, &mut self.inbox_main, comp_ctx, message, sched_ctx, &mut self.control ) { IncomingData::PlacedInSlot => {