diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 914ad380d3a355ed5516cf05c7063c80499fcdcc..cddd244fa3f52157eb6ff24e617dd061daf02408 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -181,13 +181,11 @@ impl CompExecState { } } -/// Generic representation of a component's inbox. -// NOTE: It would be nice to remove the `backup` at some point and make the -// blocking occur the moment a peer tries to send a message. -pub(crate) struct CompInbox { - main: Vec>, - backup: Vec, -} +// TODO: Replace when implementing port sending. Should probably be incorporated +// into CompCtx (and rename CompCtx into CompComms) +pub(crate) type InboxMain = Vec>; +pub(crate) type InboxMainRef = [Option]; +pub(crate) type InboxBackup = Vec; /// Creates a new component based on its definition. Meaning that if it is a /// user-defined component then we set up the PDL code state. Otherwise we @@ -281,10 +279,13 @@ pub(crate) enum IncomingData { // nicest if the sending component can figure out it cannot send any more data. #[must_use] pub(crate) fn default_handle_incoming_data_message( - exec_state: &mut CompExecState, port_value_slot: &mut Option, + exec_state: &mut CompExecState, inbox_main: &mut InboxMain, comp_ctx: &mut CompCtx, incoming_message: DataMessage, sched_ctx: &SchedulerCtx, control: &mut ControlLayer ) -> IncomingData { + let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port); + let port_index = comp_ctx.get_port_index(port_handle); + let port_value_slot = &mut inbox_main[port_index]; let target_port_id = incoming_message.data_header.target_port; if port_value_slot.is_none() { @@ -295,7 +296,6 @@ pub(crate) fn default_handle_incoming_data_message( dbg_code!({ // Our port cannot have been blocked itself, because we're able to // directly insert the message into its slot. - let port_handle = comp_ctx.get_port_handle(target_port_id); assert!(!comp_ctx.get_port(port_handle).state.is_blocked()); }); @@ -310,7 +310,6 @@ pub(crate) fn default_handle_incoming_data_message( } else { // Slot is already full, so if the port was previously opened, it will // now become closed - let port_handle = comp_ctx.get_port_handle(target_port_id); let port_info = comp_ctx.get_port_mut(port_handle); debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked()); // i.e. not closed, but will go off if more states are added in the future @@ -326,19 +325,80 @@ pub(crate) fn default_handle_incoming_data_message( } } +pub(crate) enum GetResult { + Received(DataMessage), + NoMessage, + Error((PortInstruction, String)), +} + +/// Default attempt at trying to receive from a port (i.e. through a `get`, or +/// the equivalent operation for a builtin component). `target_port` is the port +/// we're trying to receive from, and the `target_port_instruction` is the +/// instruction we're attempting on this port. +pub(crate) fn default_attempt_get( + exec_state: &mut CompExecState, target_port: PortId, target_port_instruction: PortInstruction, + inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx, + comp_ctx: &mut CompCtx, control: &mut ControlLayer, consensus: &mut Consensus +) -> GetResult { + let port_handle = comp_ctx.get_port_handle(target_port); + let port_index = comp_ctx.get_port_index(port_handle); + + if let Some(message) = &inbox_main[port_index] { + if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) { + // We're allowed to receive this message + let message = inbox_main[port_index].take().unwrap(); + debug_assert_eq!(target_port, message.data_header.target_port); + + // Note: we can still run into an unrecoverable error when actually + // receiving this message + match default_handle_received_data_message( + target_port, target_port_instruction, inbox_main, inbox_backup, + comp_ctx, sched_ctx, control, + ) { + Ok(()) => return GetResult::Received(message), + Err(location_and_message) => return GetResult::Error(location_and_message) + } + } else { + // We're not allowed to receive this message. This means that the + // receiver is attempting to receive something out of order with + // respect to the sender. + return GetResult::Error((target_port_instruction, String::from( + "Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s" + ))); + } + } else { + // We don't have a message waiting for us. + let port_info = comp_ctx.get_port(port_handle); + let port_is_closed = port_info.state == PortState::Closed; + if port_is_closed { + let peer_id = port_info.peer_comp_id; + return GetResult::Error(( + target_port_instruction, + format!("Cannot get from this port, as the peer component (id:{}) shut down", peer_id.0) + )); + } + + // No error ocurred, so enter the BlockedGet state + exec_state.set_as_blocked_get(target_port); + return GetResult::NoMessage; + } +} + /// Default handling that has been received through a `get`. Will check if any /// more messages are waiting, and if the corresponding port was blocked because /// of full buffers (hence, will use the control layer to make sure the peer /// will become unblocked). pub(crate) fn default_handle_received_data_message( targeted_port: PortId, port_instruction: PortInstruction, - slot: &mut Option, inbox_backup: &mut Vec, + inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer ) -> Result<(), (PortInstruction, String)> { + let port_handle = comp_ctx.get_port_handle(targeted_port); + let port_index = comp_ctx.get_port_index(port_handle); + let slot = &mut inbox_main[port_index]; debug_assert!(slot.is_none()); // because we've just received from it // Modify last-known location where port instruction was retrieved - let port_handle = comp_ctx.get_port_handle(targeted_port); let port_info = comp_ctx.get_port_mut(port_handle); port_info.last_instruction = port_instruction; @@ -498,9 +558,42 @@ pub(crate) fn default_handle_control_message( /// Handles a component entering the synchronous block. Will ensure that the /// `Consensus` and the `ComponentCtx` are initialized properly. -pub(crate) fn default_handle_start_sync( - exec_state: &mut CompExecState, -) {} +pub(crate) fn default_handle_sync_start( + exec_state: &mut CompExecState, inbox_main: &mut InboxMainRef, + sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus +) { + sched_ctx.log("Component starting sync mode"); + + // If any messages are present for this sync round, set the appropriate flag + // and notify the consensus handler of the present messages + consensus.notify_sync_start(comp_ctx); + for (port_index, message) in inbox_main.iter().enumerate() { + if let Some(message) = message { + consensus.handle_incoming_data_message(comp_ctx, message); + let port_info = comp_ctx.get_port_by_index_mut(port_index); + port_info.received_message_for_sync = true; + } + } + + // Modify execution state + debug_assert_eq!(exec_state.mode, CompMode::NonSync); + exec_state.mode = CompMode::Sync; +} + +/// Handles a component that has reached the end of the sync block. This does +/// not necessarily mean that the component will go into the `NonSync` mode, as +/// it might have to wait for the leader to finish the round for everyone (see +/// `default_handle_sync_decision`) +pub(crate) fn default_handle_sync_end( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, + consensus: &mut Consensus +) { + sched_ctx.log("Component ending sync mode (but possibly waiting for a solution)"); + debug_assert_eq!(exec_state.mode, CompMode::Sync); + let decision = consensus.notify_sync_end_success(sched_ctx, comp_ctx); + exec_state.mode = CompMode::SyncEnd; + default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus); +} /// Handles a component initiating the exiting procedure, and closing all of its /// ports. Should only be called once per component (which is ensured by diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index cd80e3b817f8818688e722288db1721673a55d6c..9a9ea3d99d1f8737734c2680d40beb4bbb6fd797 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -41,8 +41,8 @@ pub struct ComponentTcpClient { socket_state: SocketState, sync_state: SyncState, poll_ticket: Option, - inbox_main: Option, - inbox_backup: Vec, + inbox_main: InboxMain, + inbox_backup: InboxBackup, pdl_input_port_id: PortId, // input from PDL, so transmitted over socket pdl_output_port_id: PortId, // output towards PDL, so received over socket input_union_send_tag_value: i64, @@ -90,8 +90,9 @@ impl Component for ComponentTcpClient { } fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) { - if self.inbox_main.is_none() { - self.inbox_main = Some(message); + let slot = &mut self.inbox_main[0]; + if slot.is_none() { + *slot = Some(message); } else { self.inbox_backup.push(message); } @@ -138,8 +139,9 @@ impl Component for ComponentTcpClient { } else { // Reset for a new request self.sync_state = SyncState::AwaitingCmd; - self.consensus.notify_sync_start(comp_ctx); - self.exec_state.mode = CompMode::Sync; + component::default_handle_sync_start( + &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus + ); } return CompScheduling::Immediate; }, @@ -155,62 +157,48 @@ impl Component for ComponentTcpClient { // When in sync mode: wait for a command to come in match self.sync_state { SyncState::AwaitingCmd => { - if let Some(message) = &self.inbox_main { - self.consensus.handle_incoming_data_message(comp_ctx, &message); - if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, &message) { - // Check which command we're supposed to execute. - let message = self.inbox_main.take().unwrap(); - let target_port_id = message.data_header.target_port; - let receive_result = component::default_handle_received_data_message( - target_port_id, PortInstruction::NoSource, - &mut self.inbox_main, &mut self.inbox_backup, - comp_ctx, sched_ctx, &mut self.control - ); - - if let Err(location_and_message) = receive_result { - component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); - return CompScheduling::Immediate; - } else { - let (tag_value, embedded_heap_pos) = message.content.values[0].as_union(); - if tag_value == self.input_union_send_tag_value { - // Retrieve bytes from the message - self.byte_buffer.clear(); - let union_content = &message.content.regions[embedded_heap_pos as usize]; - debug_assert_eq!(union_content.len(), 1); - let array_heap_pos = union_content[0].as_array(); - let array_values = &message.content.regions[array_heap_pos as usize]; - self.byte_buffer.reserve(array_values.len()); - for value in array_values { - self.byte_buffer.push(value.as_uint8()); - } - - self.sync_state = SyncState::Putting; - return CompScheduling::Immediate; - } else if tag_value == self.input_union_receive_tag_value { - // Component requires a `recv` - self.sync_state = SyncState::Getting; - return CompScheduling::Immediate; - } else if tag_value == self.input_union_finish_tag_value { - // Component requires us to end the sync round - self.sync_state = SyncState::FinishSync; - return CompScheduling::Immediate; - } else if tag_value == self.input_union_shutdown_tag_value { - // Component wants to close the connection - self.sync_state = SyncState::FinishSyncThenQuit; - return CompScheduling::Immediate; - } else { - unreachable!("got tag_value {}", tag_value) + match component::default_attempt_get( + &mut self.exec_state, self.pdl_input_port_id, PortInstruction::NoSource, + &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx, + &mut self.control, &mut self.consensus + ) { + GetResult::Received(message) => { + let (tag_value, embedded_heap_pos) = message.content.values[0].as_union(); + if tag_value == self.input_union_send_tag_value { + // Retrieve bytes from the message + self.byte_buffer.clear(); + let union_content = &message.content.regions[embedded_heap_pos as usize]; + debug_assert_eq!(union_content.len(), 1); + let array_heap_pos = union_content[0].as_array(); + let array_values = &message.content.regions[array_heap_pos as usize]; + self.byte_buffer.reserve(array_values.len()); + for value in array_values { + self.byte_buffer.push(value.as_uint8()); } + + self.sync_state = SyncState::Putting; + } else if tag_value == self.input_union_receive_tag_value { + // Component requires a `recv` + self.sync_state = SyncState::Getting; + } else if tag_value == self.input_union_finish_tag_value { + // Component requires us to end the sync round + self.sync_state = SyncState::FinishSync; + } else if tag_value == self.input_union_shutdown_tag_value { + // Component wants to close the connection + self.sync_state = SyncState::FinishSyncThenQuit; + } else { + unreachable!("got tag_value {}", tag_value) } - } else { - todo!("handle sync failure due to message deadlock"); + + return CompScheduling::Immediate; + }, + GetResult::NoMessage => { return CompScheduling::Sleep; + }, + GetResult::Error(location_and_message) => { + component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); + return CompScheduling::Immediate; } - } else { - let port_handle = comp_ctx.get_port_handle(self.pdl_input_port_id); - comp_ctx.get_port_mut(port_handle).last_instruction = PortInstruction::NoSource; - self.exec_state.set_as_blocked_get(self.pdl_input_port_id); - return CompScheduling::Sleep; } }, SyncState::Putting => { @@ -236,10 +224,8 @@ impl Component for ComponentTcpClient { // If here then we're done putting the data, we can // finish the sync round - let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx); - self.exec_state.mode = CompMode::SyncEnd; - component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); - return CompScheduling::Immediate; + component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); + return CompScheduling::Requeue; }, SyncState::Getting => { // We're going to try and receive a single message. If @@ -273,9 +259,7 @@ impl Component for ComponentTcpClient { } }, SyncState::FinishSync | SyncState::FinishSyncThenQuit => { - let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx); - self.exec_state.mode = CompMode::SyncEnd; - component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); + component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); return CompScheduling::Requeue; }, } @@ -327,7 +311,7 @@ impl ComponentTcpClient { socket_state: SocketState::Connected(socket.unwrap()), sync_state: SyncState::AwaitingCmd, poll_ticket: None, - inbox_main: None, + inbox_main: vec![None], inbox_backup: Vec::new(), input_union_send_tag_value: -1, input_union_receive_tag_value: -1, diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 092b5d9e444392d7cb5a700e5143f1979e258a6b..8278f9b270dbf19094fabca7e6de128b79f9d9a0 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -13,6 +13,7 @@ use crate::runtime2::communication::*; use super::component::{ self, + InboxMain, InboxBackup, GetResult, CompExecState, Component, CompScheduling, CompError, CompMode, ExitReason, port_id_from_eval, port_id_to_eval }; @@ -107,8 +108,6 @@ enum SelectDecision { Case(u32), // contains case index, should be passed along to PDL code } -type InboxMain = Vec>; - impl SelectState { fn new() -> Self { return Self{ @@ -308,8 +307,7 @@ impl Component for CompPDL { EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"), // Results that can be returned in sync mode EC::SyncBlockEnd => { - debug_assert_eq!(self.exec_state.mode, CompMode::Sync); - self.handle_sync_end(sched_ctx, comp_ctx); + component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); return CompScheduling::Immediate; }, EC::BlockGet(expr_id, port_id) => { @@ -317,53 +315,22 @@ impl Component for CompPDL { debug_assert!(self.exec_ctx.stmt.is_none()); let port_id = port_id_from_eval(port_id); - let port_handle = comp_ctx.get_port_handle(port_id); - - let port_info = comp_ctx.get_port_mut(port_handle); - port_info.last_instruction = PortInstruction::SourceLocation(expr_id); - let port_is_closed = port_info.state == PortState::Closed; - - let port_index = comp_ctx.get_port_index(port_handle); - if let Some(message) = &self.inbox_main[port_index] { - // Check if we can actually receive the message - if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) { - // Message was received. Make sure any blocked peers and - // pending messages are handled. - let message = self.inbox_main[port_index].take().unwrap(); - let receive_result = component::default_handle_received_data_message( - port_id, PortInstruction::SourceLocation(expr_id), - &mut self.inbox_main[port_index], &mut self.inbox_backup, - comp_ctx, sched_ctx, &mut self.control - ); - if let Err(location_and_message) = receive_result { - self.handle_generic_component_error(sched_ctx, location_and_message); - return CompScheduling::Immediate - } else { - self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content); - return CompScheduling::Immediate; - } - } else { - let protocol = &sched_ctx.runtime.protocol; - self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr( - &self.prompt, &protocol.modules, &protocol.heap, expr_id, - String::from("Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s") - ))); + match component::default_attempt_get( + &mut self.exec_state, port_id, PortInstruction::SourceLocation(expr_id), + &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx, + &mut self.control, &mut self.consensus + ) { + GetResult::Received(message) => { + self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content); + return CompScheduling::Immediate; + }, + GetResult::NoMessage => { return CompScheduling::Sleep; + }, + GetResult::Error(location_and_message) => { + self.handle_generic_component_error(sched_ctx, location_and_message); + return CompScheduling::Immediate; } - } else if port_is_closed { - // No messages, but getting makes no sense as the port is - // closed. - let peer_id = comp_ctx.get_port(port_handle).peer_comp_id; - let protocol = &sched_ctx.runtime.protocol; - self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr( - &self.prompt, &protocol.modules, &protocol.heap, expr_id, - format!("Cannot get from this port, as the peer component (id:{}) shut down", peer_id.0) - ))); - return CompScheduling::Immediate; - } else { - // We need to wait - self.exec_state.set_as_blocked_get(port_id); - return CompScheduling::Sleep; } }, EC::Put(expr_id, port_id, value) => { @@ -446,8 +413,9 @@ impl Component for CompPDL { return CompScheduling::Immediate; }, EC::SyncBlockStart => { - debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); - self.handle_sync_start(sched_ctx, comp_ctx); + component::default_handle_sync_start( + &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus + ); return CompScheduling::Immediate; }, EC::NewComponent(definition_id, type_id, arguments) => { @@ -525,17 +493,6 @@ impl CompPDL { self.exec_state.mode = CompMode::Sync; } - /// Handles end of sync. The conclusion to the sync round might arise - /// immediately (and be handled immediately), or might come later through - /// messaging. In any case the component should be scheduled again - /// immediately - fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { - sched_ctx.log("Component ending sync mode (now waiting for solution)"); - let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx); - self.exec_state.mode = CompMode::SyncEnd; - component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); - } - fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { sched_ctx.log(&format!("Component exiting (reason: {:?}", self.exec_state.exit_reason)); debug_assert_eq!(self.exec_state.mode, CompMode::StartExit); @@ -578,10 +535,8 @@ impl CompPDL { self.consensus.handle_incoming_data_message(comp_ctx, &message); } - let port_handle = comp_ctx.get_port_handle(message.data_header.target_port); - let port_index = comp_ctx.get_port_index(port_handle); match component::default_handle_incoming_data_message( - &mut self.exec_state, &mut self.inbox_main[port_index], comp_ctx, message, + &mut self.exec_state, &mut self.inbox_main, comp_ctx, message, sched_ctx, &mut self.control ) { IncomingData::PlacedInSlot => { diff --git a/src/runtime2/component/component_random.rs b/src/runtime2/component/component_random.rs index 07bb9bbee0bdedaaa31cd3339b942d37b71ac007..d493f00a294c4ad9d274aca600a7120f8a1126e3 100644 --- a/src/runtime2/component/component_random.rs +++ b/src/runtime2/component/component_random.rs @@ -83,8 +83,9 @@ impl Component for ComponentRandomU32 { } else { sched_ctx.log("Entering sync mode"); self.did_perform_send = false; - self.consensus.notify_sync_start(comp_ctx); - self.exec_state.mode = CompMode::Sync; + component::default_handle_sync_start( + &mut self.exec_state, &mut [], sched_ctx, comp_ctx, &mut self.consensus + ); } return CompScheduling::Immediate; @@ -120,10 +121,7 @@ impl Component for ComponentRandomU32 { } } else { // Message was sent, finish this sync round - sched_ctx.log("Waiting for consensus"); - self.exec_state.mode = CompMode::SyncEnd; - let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx); - component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); + component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); return CompScheduling::Requeue; } },