diff --git a/Cargo.toml b/Cargo.toml
index 0111c5509eb738484982e085a3da446386f226d0..2155619a3cad5b2052b080c0d95f0bd937d99be1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,8 +14,7 @@ internet=["libc"]
 
 [dependencies]
-libc = { version = "^0.2", optional = true } # raw sockets
-mio = { version = "0.8", features = ["os-poll"] } # cross-platform IO notification queue
+libc = { version = "^0.2", optional = true } # raw sockets, epoll access
 
 # randomness
 rand = "0.8.4"
diff --git a/src/protocol/eval/executor.rs b/src/protocol/eval/executor.rs
index 51fd02d09163c89b2881c71691a6931795858990..610e58090392659f40c6b10c781a3d93fb55c371 100644
--- a/src/protocol/eval/executor.rs
+++ b/src/protocol/eval/executor.rs
@@ -203,8 +203,8 @@ pub enum EvalContinuation {
     SyncBlockEnd,
     NewFork,
     BlockFires(PortId),
-    BlockGet(PortId),
-    Put(PortId, ValueGroup),
+    BlockGet(ExpressionId, PortId),
+    Put(ExpressionId, PortId, ValueGroup),
     SelectStart(u32, u32), // (num_cases, num_ports_total)
     SelectRegisterPort(u32, u32, PortId), // (case_index, port_index_in_case, port_id)
     SelectWait, // wait until select can continue
@@ -630,7 +630,7 @@ impl Prompt {
                     // get run again after we've received a message.
                     cur_frame.expr_values.push_front(value.clone());
                     cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id));
-                    return Ok(EvalContinuation::BlockGet(port_id));
+                    return Ok(EvalContinuation::BlockGet(expr_id, port_id));
                 }
             }
         },
@@ -657,7 +657,7 @@ impl Prompt {
                     cur_frame.expr_values.push_front(port_value);
                     cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id));
                     let value_group = ValueGroup::from_store(&self.store, &[deref_msg_value]);
-                    return Ok(EvalContinuation::Put(port_id, value_group));
+                    return Ok(EvalContinuation::Put(expr_id, port_id, value_group));
                 }
             },
             Method::Fires => {
diff --git a/src/runtime/connector.rs b/src/runtime/connector.rs
index 2aabac130545b88c7fb292d41e42d6a7a12b7493..61c5cf3d33ceefe32088c782e08608ecce5e0d81 100644
--- a/src/runtime/connector.rs
+++ b/src/runtime/connector.rs
@@ -333,7 +333,7 @@ impl ConnectorPDL {
 
                 return ConnectorScheduling::Immediate;
             },
-            EvalContinuation::BlockGet(port_id) => {
+            EvalContinuation::BlockGet(_expr_id, port_id) => {
                 // Branch performed a `get()` on a port that does not have a
                 // received message on that port.
                 let port_id = PortIdLocal::new(port_id.id);
@@ -391,7 +391,7 @@ impl ConnectorPDL {
                 let right_branch = &mut self.tree[right_id];
                 right_branch.prepared = PreparedStatement::ForkedExecution(false);
             }
-            EvalContinuation::Put(port_id, content) => {
+            EvalContinuation::Put(_expr_id, port_id, content) => {
                 // Branch is attempting to send data
                 let port_id = PortIdLocal::new(port_id.id);
                 let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs
index 7be0b8b8ec97bb4981dccfaab90c50de37fcba94..f563446fba8df70db1e867ae1f42ab2d0d0c9b80 100644
--- a/src/runtime2/communication.rs
+++ b/src/runtime2/communication.rs
@@ -111,6 +111,7 @@ pub struct SyncSolutionGetterPort {
     pub peer_comp_id: CompId,
     pub peer_port_id: PortId,
     pub mapping: u32,
+    pub failed: bool,
 }
 
 /// Putter port in a solution. A putter may not be certain about who its peer
@@ -120,6 +121,7 @@ pub struct SyncSolutionPutterPort {
     pub self_comp_id: CompId,
     pub self_port_id: PortId,
     pub mapping: u32,
+    pub failed: bool,
 }
 
 #[derive(Debug)]
@@ -181,7 +183,7 @@ pub enum ControlMessageContent {
     PortPeerChangedUnblock(PortId, CompId),
 }
 
-#[derive(Debug)]
+#[derive(Copy, Clone, Debug)]
 pub struct ControlMessageClosePort {
     pub port_to_close: PortId, // ID of the receiving port
     pub closed_in_sync_round: bool, // needed to ensure correct handling of errors
diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs
index f3a14e53bda533ed9cf9c8fcb57c487321bf6225..1ce574c330f24cb8b74fc25d7d5fdb35d5e6c00e 100644
--- a/src/runtime2/component/component.rs
+++ b/src/runtime2/component/component.rs
@@ -1,3 +1,5 @@
+use std::fmt::{Display as FmtDisplay, Result as FmtResult, Formatter};
+
 use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId};
 use crate::protocol::*;
 use crate::runtime2::*;
@@ -17,6 +19,29 @@ pub enum CompScheduling {
     Exit,
 }
 
+/// Potential error emitted by a component
+pub enum CompError {
+    /// Error originating from the code executor. Hence has an associated
+    /// source location.
+    Executor(EvalError),
+    /// Error originating from a component, but not necessarily associated with
+    /// a location in the source.
+    Component(String), // TODO: Maybe a different embedded value in the future?
+    /// Pure runtime error. Not necessarily originating from the component
+    /// itself. Should be treated as a very severe runtime-compromising error.
+    Runtime(RtError),
+}
+
+impl FmtDisplay for CompError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
+        match self {
+            CompError::Executor(v) => v.fmt(f),
+            CompError::Component(v) => v.fmt(f),
+            CompError::Runtime(v) => v.fmt(f),
+        }
+    }
+}
+
 /// Generic representation of a component (as viewed by a scheduler).
 pub(crate) trait Component {
     /// Called upon the creation of the component. Note that the scheduler
@@ -39,7 +64,7 @@ pub(crate) trait Component {
 
     /// Called if the component's routine should be executed. The return value
     /// can be used to indicate when the routine should be run again.
-    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError>;
+    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling;
 }
 
 /// Representation of the generic operating mode of a component.
@@ -51,8 +76,8 @@ pub(crate) enum CompMode {
     BlockedGet, // blocked because we need to receive a message on a particular port
     BlockedPut, // component is blocked because the port is blocked
     BlockedSelect, // waiting on message to complete the select statement
-    StartExit, // temporary state: if encountered then we start the shutdown process
-    BusyExit, // temporary state: waiting for Acks for all the closed ports
+    StartExit, // temporary state: if encountered then we start the shutdown process.
+    BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for the sync round to finish
     Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0
 }
 
@@ -65,6 +90,43 @@ impl CompMode {
             NonSync | StartExit | BusyExit | Exit => false,
         }
     }
+
+    pub(crate) fn is_busy_exiting(&self) -> bool {
+        use CompMode::*;
+
+        match self {
+            NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => false,
+            StartExit | BusyExit => true,
+            Exit => false,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub(crate) enum ExitReason {
+    Termination, // regular termination of component
+    ErrorInSync,
+    ErrorNonSync,
+}
+
+impl ExitReason {
+    pub(crate) fn is_in_sync(&self) -> bool {
+        use ExitReason::*;
+
+        match self {
+            Termination | ErrorNonSync => false,
+            ErrorInSync => true,
+        }
+    }
+
+    pub(crate) fn is_error(&self) -> bool {
+        use ExitReason::*;
+
+        match self {
+            Termination => false,
+            ErrorInSync | ErrorNonSync => true,
+        }
+    }
 }
 
 /// Component execution state: the execution mode along with some descriptive
@@ -74,6 +136,7 @@ pub(crate) struct CompExecState {
     pub mode: CompMode,
     pub mode_port: PortId, // valid if blocked on a port (put/get)
     pub mode_value: ValueGroup, // valid if blocked on a put
+    pub exit_reason: ExitReason, // valid if in StartExit/BusyExit/Exit mode
 }
 
 impl CompExecState {
@@ -82,9 +145,15 @@ impl CompExecState {
             mode: CompMode::NonSync,
             mode_port: PortId::new_invalid(),
             mode_value: ValueGroup::default(),
+            exit_reason: ExitReason::Termination,
         }
     }
 
+    pub(crate) fn set_as_start_exit(&mut self, reason: ExitReason) {
+        self.mode = CompMode::StartExit;
+        self.exit_reason = reason;
+    }
+
     pub(crate) fn set_as_blocked_get(&mut self, port: PortId) {
         self.mode = CompMode::BlockedGet;
         self.mode_port = port;
@@ -153,19 +222,26 @@ pub(crate) fn create_component(
 /// scheduling value must be used.
 #[must_use]
 pub(crate) fn default_send_data_message(
-    exec_state: &mut CompExecState, transmitting_port_id: PortId, value: ValueGroup,
+    exec_state: &mut CompExecState, transmitting_port_id: PortId,
+    port_instruction: PortInstruction, value: ValueGroup,
     sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx
-) -> Result<CompScheduling, String> { // @nocommit: Something better than Err(String)
+) -> Result<CompScheduling, (PortInstruction, String)> {
     debug_assert_eq!(exec_state.mode, CompMode::Sync);
 
     let port_handle = comp_ctx.get_port_handle(transmitting_port_id);
+    let port_info = comp_ctx.get_port_mut(port_handle);
+    port_info.last_instruction = port_instruction;
+
     let port_info = comp_ctx.get_port(port_handle);
     debug_assert_eq!(port_info.kind, PortKind::Putter);
 
     if port_info.state == PortState::Closed {
         // Note: normally peer is eventually consistent, but if it has shut down
         // then we can be sure it is consistent (I think?)
-        return Err(format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0))
+        return Err((
+            port_info.last_instruction,
+            format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0)
+        ))
    } else if port_info.state.is_blocked() {
        // Port is blocked, so we cannot send
        exec_state.set_as_blocked_put(transmitting_port_id, value);
@@ -245,13 +321,25 @@ pub(crate) fn default_handle_incoming_data_message(
 /// of full buffers (hence, will use the control layer to make sure the peer
 /// will become unblocked).
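+/// Returns an error, tagged with the port's last instruction, when the
+/// targeted port turns out to have been closed.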
 pub(crate) fn default_handle_received_data_message(
-    targeted_port: PortId, slot: &mut Option<DataMessage>, inbox_backup: &mut Vec<DataMessage>,
+    targeted_port: PortId, port_instruction: PortInstruction,
+    slot: &mut Option<DataMessage>, inbox_backup: &mut Vec<DataMessage>,
     comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer
-) {
+) -> Result<(), (PortInstruction, String)> {
     debug_assert!(slot.is_none()); // because we've just received from it
 
-    // Check if there are any more messages in the backup buffer
+    // Modify last-known location where port instruction was retrieved
     let port_handle = comp_ctx.get_port_handle(targeted_port);
+    let port_info = comp_ctx.get_port_mut(port_handle);
+    port_info.last_instruction = port_instruction;
+
+    if port_info.state == PortState::Closed {
+        return Err((
+            port_info.last_instruction,
+            format!("Cannot 'get' because the channel is closed")
+        ));
+    }
+
+    // Check if there are any more messages in the backup buffer
     let port_info = comp_ctx.get_port(port_handle);
     for message_index in 0..inbox_backup.len() {
         let message = &inbox_backup[message_index];
@@ -261,7 +349,7 @@ pub(crate) fn default_handle_received_data_message(
             debug_assert!(port_info.state.is_blocked()); // since we're removing another message from the backup
 
             *slot = Some(message);
-            return;
+            return Ok(());
         }
     }
 
@@ -273,6 +361,8 @@ pub(crate) fn default_handle_received_data_message(
         let peer_info = comp_ctx.get_peer(peer_handle);
         peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
     }
+
+    return Ok(());
 }
 
 /// Handles control messages in the default way. Note that this function may
@@ -283,7 +373,7 @@ pub(crate) fn default_handle_received_data_message(
 pub(crate) fn default_handle_control_message(
     exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
     message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
-) -> Result<(), String> { // @nocommit, use something else than Err(String)
+) -> Result<(), (PortInstruction, String)> {
     match message.content {
         ControlMessageContent::Ack => {
             default_handle_ack(control, message.id, sched_ctx, comp_ctx);
@@ -302,15 +392,17 @@ pub(crate) fn default_handle_control_message(
         ControlMessageContent::ClosePort(content) => {
             // Request to close the port. We immediately comply and remove
             // the component handle as well
-            let port_handle = comp_ctx.get_port_handle(content.port_id);
-            let port_info = comp_ctx.get_port_mut(port_handle);
-            let peer_comp_id = port_info.peer_comp_id;
-            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
+            let port_handle = comp_ctx.get_port_handle(content.port_to_close);
 
             // We're closing the port, so we will always update the peer of the
             // port (in case of error messages)
+            let port_info = comp_ctx.get_port_mut(port_handle);
             port_info.peer_comp_id = message.sender_comp_id;
+            let port_info = comp_ctx.get_port(port_handle);
+            let peer_comp_id = port_info.peer_comp_id;
+            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
+
             // One exception to sending an `Ack` is if we just closed the
             // port ourselves, meaning that the `ClosePort` messages got
             // sent to one another.
@@ -334,19 +426,10 @@ pub(crate) fn default_handle_control_message(
             // closing the port, that we don't consider the sync round to
             // have failed by mistake.
             if content.closed_in_sync_round && exec_state.mode.is_in_sync_block() && port_was_used {
-                let error_message = match last_instruction {
-                    PortInstruction::None => unreachable!(), // port was used
-                    PortInstruction::NoSource => format!(
-                        "Peer component (id:{}) shut down, so operation on port cannot have succeeded",
-                        message.sender_comp_id.0
-                    ),
-                    PortInstruction::SourceLocation(source_location) => format!(
-                        "Peer component (id:{}) shut down, so this operation cannot have succeeded",
-                        message.sender_comp_id.0
-                    ),
-                };
-
-                return Err(error_message);
+                return Err((
+                    last_instruction,
+                    format!("Peer component (id:{}) shut down, so previous communication cannot have succeeded", peer_comp_id.0)
+                ));
             }
         }
     },
@@ -398,13 +481,21 @@ pub(crate) fn default_handle_control_message(
 #[must_use]
 pub(crate) fn default_handle_start_exit(
     exec_state: &mut CompExecState, control: &mut ControlLayer,
-    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
+    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 ) -> CompScheduling {
     debug_assert_eq!(exec_state.mode, CompMode::StartExit);
     sched_ctx.log("Component starting exit");
     exec_state.mode = CompMode::BusyExit;
+    let exit_inside_sync = exec_state.exit_reason.is_in_sync();
+
+    // If exiting while inside sync mode, report to the leader of the current
+    // round that we've failed.
+    if exit_inside_sync {
+        let decision = consensus.notify_sync_end_failure(sched_ctx, comp_ctx);
+        default_handle_sync_decision(sched_ctx, exec_state, decision, consensus);
+    }
 
-    // Iterating by index to work around borrowing rules
+    // Iterating over ports by index to work around borrowing rules
     for port_index in 0..comp_ctx.num_ports() {
         let port = comp_ctx.get_port_by_index_mut(port_index);
         if port.state == PortState::Closed {
@@ -418,7 +509,7 @@ pub(crate) fn default_handle_start_exit(
 
         // Notify peer of closing
         let port_handle = comp_ctx.get_port_handle(port_id);
-        let (peer, message) = control.initiate_port_closing(port_handle, comp_ctx);
+        let (peer, message) = control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx);
         let peer_info = comp_ctx.get_peer(peer);
         peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
     }
@@ -447,28 +538,68 @@ pub(crate) fn default_handle_busy_exit(
 /// Handles a potential synchronous round decision. If there was a decision then
 /// the `Some(success)` value indicates whether the round succeeded or not.
 /// Might also end up changing the `ExecState`.
+///
+/// Might be called in two cases:
+/// 1. The component is in regular execution mode, at the end of a sync round,
+///    and is waiting for a solution to the round.
+/// 2. The component has encountered an error during a sync round and is
+///    exiting, hence is waiting for a "Failure" message from the leader.
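+///
+/// In the second case only a "Failure" decision should arrive, as a failed
+/// component never contributes a valid local solution to the round.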
 pub(crate) fn default_handle_sync_decision(
-    exec_state: &mut CompExecState, decision: SyncRoundDecision,
-    consensus: &mut Consensus
+    sched_ctx: &SchedulerCtx, exec_state: &mut CompExecState,
+    decision: SyncRoundDecision, consensus: &mut Consensus
 ) -> Option<bool> {
-    debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
     let success = match decision {
         SyncRoundDecision::None => return None,
         SyncRoundDecision::Solution => true,
         SyncRoundDecision::Failure => false,
     };
 
-    debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
+    debug_assert!(
+        exec_state.mode == CompMode::SyncEnd || (
+            exec_state.mode.is_busy_exiting() && exec_state.exit_reason.is_error()
+        ) || (
+            exec_state.mode.is_in_sync_block() && decision == SyncRoundDecision::Failure
+        )
+    );
+
+    sched_ctx.log(&format!("Handling decision {:?} (in mode: {:?})", decision, exec_state.mode));
+    consensus.notify_sync_decision(decision);
     if success {
+        // We cannot get a success message if the component has encountered an
+        // error.
+        debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
         exec_state.mode = CompMode::NonSync;
-        consensus.notify_sync_decision(decision);
         return Some(true);
     } else {
-        exec_state.mode = CompMode::StartExit;
+        // We may get a failure in all of the possible cases, but we should only
+        // modify the execution state if we're not already in exit mode.
+        if !exec_state.mode.is_busy_exiting() {
+            sched_ctx.error("failed synchronous round, initiating exit");
+            exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
+        }
         return Some(false);
     }
 }
 
+/// Performs the default action of printing the provided error, and then putting
+/// the component in the state where it will shut down. Only to be used for
+/// builtin components: their error message construction is simpler (and more
+/// common) as they don't have any source code.
+pub(crate) fn default_handle_error_for_builtin(
+    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx,
+    location_and_message: (PortInstruction, String)
+) {
+    let (_location, message) = location_and_message;
+    sched_ctx.error(&message);
+
+    let exit_reason = if exec_state.mode.is_in_sync_block() {
+        ExitReason::ErrorInSync
+    } else {
+        ExitReason::ErrorNonSync
+    };
+
+    exec_state.set_as_start_exit(exit_reason);
+}
 
 #[inline]
 pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs
index fc49944ad2baeff421181bece2c7b2567731d203..1a8ae2ec64d3393c240a88d43ed921d95a9ad204 100644
--- a/src/runtime2/component/component_context.rs
+++ b/src/runtime2/component/component_context.rs
@@ -4,7 +4,7 @@ use crate::runtime2::communication::*;
 
 use crate::protocol::ExpressionId;
 
-/// Helper struct to remember when the last operation on the port took place
+/// Helper struct to remember when the last operation on the port took place.
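+/// Used to attribute a runtime error to the source location (if any) of the
+/// last `put` or `get` performed on the port.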
 #[derive(Debug, PartialEq, Copy, Clone)]
 pub enum PortInstruction {
     None,
diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs
index 3b9c9ca77e42b135a396a2965dbfcba32f8b81cb..e9e41ac593a5b19ee65ba52fb9d25647a1a9fe4c 100644
--- a/src/runtime2/component/component_internet.rs
+++ b/src/runtime2/component/component_internet.rs
@@ -1,6 +1,6 @@
-use crate::protocol::eval::{ValueGroup, Value, EvalError};
+use crate::protocol::eval::{ValueGroup, Value};
 use crate::runtime2::*;
-use crate::runtime2::component::{CompCtx, CompId};
+use crate::runtime2::component::{CompCtx, CompId, PortInstruction};
 use crate::runtime2::stdlib::internet::*;
 use crate::runtime2::poll::*;
 
@@ -104,13 +104,15 @@ impl Component for ComponentTcpClient {
             },
             Message::Sync(message) => {
                 let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
-                component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
+                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
             },
             Message::Control(message) => {
-                component::default_handle_control_message(
+                if let Err(location_and_message) = component::default_handle_control_message(
                     &mut self.exec_state, &mut self.control, &mut self.consensus,
                     message, sched_ctx, comp_ctx
-                );
+                ) {
+                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
+                }
             },
             Message::Poll => {
                 sched_ctx.log("Received polling event");
@@ -118,7 +120,7 @@ impl Component for ComponentTcpClient {
         }
     }
 
-    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
+    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
         sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state));
 
         match self.exec_state.mode {
@@ -132,20 +134,20 @@ impl Component for ComponentTcpClient {
                     SocketState::Connected(_socket) => {
                         if self.sync_state == SyncState::FinishSyncThenQuit {
                             // Previous request was to let the component shut down
-                            self.exec_state.mode = CompMode::StartExit;
+                            self.exec_state.set_as_start_exit(ExitReason::Termination);
                         } else {
                             // Reset for a new request
                             self.sync_state = SyncState::AwaitingCmd;
                             self.consensus.notify_sync_start(comp_ctx);
                             self.exec_state.mode = CompMode::Sync;
                         }
-                        return Ok(CompScheduling::Immediate);
+                        return CompScheduling::Immediate;
                     },
                     SocketState::Error => {
                         // Could potentially send an error message to the
                         // connected component.
-                        self.exec_state.mode = CompMode::StartExit;
-                        return Ok(CompScheduling::Immediate);
+                        self.exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
+                        return CompScheduling::Immediate;
                     }
                 }
             },
@@ -159,48 +161,54 @@ impl Component for ComponentTcpClient {
                         // Check which command we're supposed to execute.
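+                        // Receiving goes through the default handler first so
+                        // that a `get` on a closed channel is reported as an
+                        // error instead of being ignored silently.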
                         let message = self.inbox_main.take().unwrap();
                         let target_port_id = message.data_header.target_port;
-                        component::default_handle_received_data_message(
-                            target_port_id, &mut self.inbox_main, &mut self.inbox_backup,
+                        let receive_result = component::default_handle_received_data_message(
+                            target_port_id, PortInstruction::NoSource,
+                            &mut self.inbox_main, &mut self.inbox_backup,
                             comp_ctx, sched_ctx, &mut self.control
                         );
 
-                        let (tag_value, embedded_heap_pos) = message.content.values[0].as_union();
-                        if tag_value == self.input_union_send_tag_value {
-                            // Retrieve bytes from the message
-                            self.byte_buffer.clear();
-                            let union_content = &message.content.regions[embedded_heap_pos as usize];
-                            debug_assert_eq!(union_content.len(), 1);
-                            let array_heap_pos = union_content[0].as_array();
-                            let array_values = &message.content.regions[array_heap_pos as usize];
-                            self.byte_buffer.reserve(array_values.len());
-                            for value in array_values {
-                                self.byte_buffer.push(value.as_uint8());
-                            }
-
-                            self.sync_state = SyncState::Putting;
-                            return Ok(CompScheduling::Immediate);
-                        } else if tag_value == self.input_union_receive_tag_value {
-                            // Component requires a `recv`
-                            self.sync_state = SyncState::Getting;
-                            return Ok(CompScheduling::Immediate);
-                        } else if tag_value == self.input_union_finish_tag_value {
-                            // Component requires us to end the sync round
-                            self.sync_state = SyncState::FinishSync;
-                            return Ok(CompScheduling::Immediate);
-                        } else if tag_value == self.input_union_shutdown_tag_value {
-                            // Component wants to close the connection
-                            self.sync_state = SyncState::FinishSyncThenQuit;
-                            return Ok(CompScheduling::Immediate);
+                        if let Err(location_and_message) = receive_result {
+                            component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
+                            return CompScheduling::Immediate;
                         } else {
-                            unreachable!("got tag_value {}", tag_value)
+                            let (tag_value, embedded_heap_pos) = message.content.values[0].as_union();
+                            if tag_value == self.input_union_send_tag_value {
+                                // Retrieve bytes from the message
+                                self.byte_buffer.clear();
+                                let union_content = &message.content.regions[embedded_heap_pos as usize];
+                                debug_assert_eq!(union_content.len(), 1);
+                                let array_heap_pos = union_content[0].as_array();
+                                let array_values = &message.content.regions[array_heap_pos as usize];
+                                self.byte_buffer.reserve(array_values.len());
+                                for value in array_values {
+                                    self.byte_buffer.push(value.as_uint8());
+                                }
+
+                                self.sync_state = SyncState::Putting;
+                                return CompScheduling::Immediate;
+                            } else if tag_value == self.input_union_receive_tag_value {
+                                // Component requires a `recv`
+                                self.sync_state = SyncState::Getting;
+                                return CompScheduling::Immediate;
+                            } else if tag_value == self.input_union_finish_tag_value {
+                                // Component requires us to end the sync round
+                                self.sync_state = SyncState::FinishSync;
+                                return CompScheduling::Immediate;
+                            } else if tag_value == self.input_union_shutdown_tag_value {
+                                // Component wants to close the connection
+                                self.sync_state = SyncState::FinishSyncThenQuit;
+                                return CompScheduling::Immediate;
+                            } else {
+                                unreachable!("got tag_value {}", tag_value)
+                            }
                         }
                     } else {
                         todo!("handle sync failure due to message deadlock");
-                        return Ok(CompScheduling::Sleep);
+                        return CompScheduling::Sleep;
                     }
                 } else {
                     self.exec_state.set_as_blocked_get(self.pdl_input_port_id);
-                    return Ok(CompScheduling::Sleep);
+                    return CompScheduling::Sleep;
                 }
             },
             SyncState::Putting => {
@@ -216,7 +224,7 @@ impl Component for ComponentTcpClient {
                     },
                     Err(err) => {
                         if err.kind() == IoErrorKind::WouldBlock {
-                            return Ok(CompScheduling::Sleep); // wait until notified
+                            return CompScheduling::Sleep; // wait until notified
                         } else {
                             todo!("handle socket.send error {:?}", err)
                         }
@@ -226,10 +234,10 @@ impl Component for ComponentTcpClient {
 
                 // If here then we're done putting the data, we can
                 // finish the sync round
-                let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
+                let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
                 self.exec_state.mode = CompMode::SyncEnd;
-                component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
-                return Ok(CompScheduling::Immediate);
+                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
+                return CompScheduling::Immediate;
             },
             SyncState::Getting => {
                 // We're going to try and receive a single message. If
@@ -243,13 +251,19 @@ impl Component for ComponentTcpClient {
                     Ok(num_received) => {
                         self.byte_buffer.resize(num_received, 0);
                         let message_content = self.bytes_to_data_message_content(&self.byte_buffer);
-                        let scheduling = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, message_content, sched_ctx, &mut self.consensus, comp_ctx);
-                        self.sync_state = SyncState::AwaitingCmd;
-                        return Ok(scheduling);
+                        let send_result = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, message_content, sched_ctx, &mut self.consensus, comp_ctx);
+                        if let Err(location_and_message) = send_result {
+                            component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
+                            return CompScheduling::Immediate;
+                        } else {
+                            let scheduling = send_result.unwrap();
+                            self.sync_state = SyncState::AwaitingCmd;
+                            return scheduling;
+                        }
                     },
                     Err(err) => {
                         if err.kind() == IoErrorKind::WouldBlock {
-                            return Ok(CompScheduling::Sleep); // wait until polled
+                            return CompScheduling::Sleep; // wait until polled
                         } else {
                             todo!("handle socket.receive error {:?}", err)
                         }
@@ -257,26 +271,26 @@ impl Component for ComponentTcpClient {
                 }
             },
             SyncState::FinishSync | SyncState::FinishSyncThenQuit => {
-                let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
+                let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
                 self.exec_state.mode = CompMode::SyncEnd;
-                component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
-                return Ok(CompScheduling::Requeue);
+                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
+                return CompScheduling::Requeue;
             },
         }
     },
     CompMode::BlockedGet => {
         // Entered when awaiting a new command
         debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd);
-        return Ok(CompScheduling::Sleep);
+        return CompScheduling::Sleep;
     },
     CompMode::SyncEnd | CompMode::BlockedPut =>
-        return Ok(CompScheduling::Sleep),
+        return CompScheduling::Sleep,
     CompMode::StartExit =>
-        return Ok(component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx)),
+        return component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus),
     CompMode::BusyExit =>
-        return Ok(component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx)),
+        return component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx),
     CompMode::Exit =>
-        return Ok(component::default_handle_exit(&self.exec_state)),
+        return component::default_handle_exit(&self.exec_state),
         }
     }
 }
diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs
index dd3081ee1d12512335b33c706a690cfa21fc7612..46424fe798589d6830b7a9877dfb6cf8e960a2bb 100644
--- a/src/runtime2/component/component_pdl.rs
+++ b/src/runtime2/component/component_pdl.rs
@@ -13,7 +13,7 @@ use crate::runtime2::communication::*;
 
 use super::component::{
     self,
-    CompExecState, Component, CompScheduling, CompMode,
+    CompExecState, Component, CompScheduling, CompError, CompMode, ExitReason,
     port_id_from_eval, port_id_to_eval
 };
 use super::component_context::*;
@@ -256,10 +256,12 @@ impl Component for CompPDL {
             self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
         },
         Message::Control(message) => {
-            component::default_handle_control_message(
+            if let Err(location_and_message) = component::default_handle_control_message(
                 &mut self.exec_state, &mut self.control, &mut self.consensus,
                 message, sched_ctx, comp_ctx
-            );
+            ) {
+                self.handle_component_error(sched_ctx, location_and_message);
+            }
         },
         Message::Sync(message) => {
             self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
@@ -270,7 +272,7 @@ impl Component for CompPDL {
         }
     }
 
-    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
+    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
         use EvalContinuation as EC;
 
         sched_ctx.log(&format!("Running component (mode: {:?})", self.exec_state.mode));
@@ -282,18 +284,32 @@ impl Component for CompPDL {
                 // continue and run PDL code
             },
             CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => {
-                return Ok(CompScheduling::Sleep);
+                return CompScheduling::Sleep;
             }
-            CompMode::StartExit => return Ok(component::default_handle_start_exit(
-                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx
-            )),
-            CompMode::BusyExit => return Ok(component::default_handle_busy_exit(
+            CompMode::StartExit => return component::default_handle_start_exit(
+                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
+            ),
+            CompMode::BusyExit => return component::default_handle_busy_exit(
                 &mut self.exec_state, &self.control, sched_ctx
-            )),
-            CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)),
+            ),
+            CompMode::Exit => return component::default_handle_exit(&self.exec_state),
+        }
+
+        let run_result = self.execute_prompt(&sched_ctx);
+        if let Err(error) = run_result {
+            // TODO: Cleanup before @nocommit
+            sched_ctx.error(&format!("{}", error));
+            let exit_reason = if self.exec_state.mode.is_in_sync_block() {
+                ExitReason::ErrorInSync
+            } else {
+                ExitReason::ErrorNonSync
+            };
+
+            self.exec_state.set_as_start_exit(exit_reason);
+            return CompScheduling::Immediate;
         }
 
-        let run_result = self.execute_prompt(&sched_ctx)?;
+        let run_result = run_result.unwrap();
 
         match run_result {
             EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
@@ -302,9 +318,9 @@ impl Component for CompPDL {
             EC::SyncBlockEnd => {
                 debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                 self.handle_sync_end(sched_ctx, comp_ctx);
-                return Ok(CompScheduling::Immediate);
+                return CompScheduling::Immediate;
             },
-            EC::BlockGet(port_id) => {
+            EC::BlockGet(expr_id, port_id) => {
                 debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                 debug_assert!(self.exec_ctx.stmt.is_none());
 
@@ -317,44 +333,55 @@ impl Component for CompPDL {
                         // Message was received. Make sure any blocked peers and
                         // pending messages are handled.
                         let message = self.inbox_main[port_index].take().unwrap();
-                        component::default_handle_received_data_message(
-                            port_id, &mut self.inbox_main[port_index], &mut self.inbox_backup,
+                        let receive_result = component::default_handle_received_data_message(
+                            port_id, PortInstruction::SourceLocation(expr_id),
+                            &mut self.inbox_main[port_index], &mut self.inbox_backup,
                             comp_ctx, sched_ctx, &mut self.control
                         );
-
-                        self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
-                        return Ok(CompScheduling::Immediate);
+                        if let Err(location_and_message) = receive_result {
+                            self.handle_component_error(sched_ctx, location_and_message);
+                            return CompScheduling::Immediate;
+                        } else {
+                            self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
+                            return CompScheduling::Immediate;
+                        }
                    } else {
                        todo!("handle sync failure due to message deadlock");
-                        return Ok(CompScheduling::Sleep);
+                        return CompScheduling::Sleep;
                    }
                } else {
                    // We need to wait
                    self.exec_state.set_as_blocked_get(port_id);
-                    return Ok(CompScheduling::Sleep);
+                    return CompScheduling::Sleep;
                }
            },
-            EC::Put(port_id, value) => {
+            EC::Put(expr_id, port_id, value) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                sched_ctx.log(&format!("Putting value {:?}", value));
 
                // Send the message
                let target_port_id = port_id_from_eval(port_id);
-                let scheduling = component::default_send_data_message(
-                    &mut self.exec_state, target_port_id, value,
+                let send_result = component::default_send_data_message(
+                    &mut self.exec_state, target_port_id,
+                    PortInstruction::SourceLocation(expr_id), value,
                    sched_ctx, &mut self.consensus, comp_ctx
                );
-
-                // When `run` is called again (potentially after becoming
-                // unblocked) we need to instruct the executor that we performed
-                // the `put`
-                self.exec_ctx.stmt = ExecStmt::PerformedPut;
-                return Ok(scheduling);
+                if let Err(location_and_message) = send_result {
+                    self.handle_component_error(sched_ctx, location_and_message);
+                    return CompScheduling::Immediate;
+                } else {
+                    // When `run` is called again (potentially after becoming
+                    // unblocked) we need to instruct the executor that we performed
+                    // the `put`
+                    let scheduling = send_result.unwrap();
+                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
+                    return scheduling;
+                }
            },
            EC::SelectStart(num_cases, _num_ports) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                self.select_state.handle_select_start(num_cases);
-                return Ok(CompScheduling::Requeue);
+                return CompScheduling::Requeue;
            },
            EC::SelectRegisterPort(case_index, port_index, port_id) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
@@ -362,7 +389,7 @@ impl Component for CompPDL {
                if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) {
                    todo!("handle registering a port multiple times");
                }
-                return Ok(CompScheduling::Immediate);
+                return CompScheduling::Immediate;
            },
            EC::SelectWait => {
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
@@ -371,22 +398,22 @@ impl Component for CompPDL {
                    // Reached a conclusion, so we can continue immediately
                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
                    self.exec_state.mode = CompMode::Sync;
-                    return Ok(CompScheduling::Immediate);
+                    return CompScheduling::Immediate;
                } else {
                    // No decision yet
                    self.exec_state.mode = CompMode::BlockedSelect;
-                    return Ok(CompScheduling::Sleep);
+                    return CompScheduling::Sleep;
                }
            },
            // Results that can be returned outside of sync mode
            EC::ComponentTerminated => {
-                self.exec_state.mode = CompMode::StartExit; // next call we'll take care of the exit
-                return Ok(CompScheduling::Immediate);
+                self.exec_state.set_as_start_exit(ExitReason::Termination);
+                return CompScheduling::Immediate;
            },
            EC::SyncBlockStart => {
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
                self.handle_sync_start(sched_ctx, comp_ctx);
-                return Ok(CompScheduling::Immediate);
+                return CompScheduling::Immediate;
            },
            EC::NewComponent(definition_id, type_id, arguments) => {
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
@@ -394,7 +421,7 @@ impl Component for CompPDL {
                    sched_ctx, comp_ctx, definition_id, type_id, arguments
                );
 
-                return Ok(CompScheduling::Requeue);
+                return CompScheduling::Requeue;
            },
            EC::NewChannel => {
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
@@ -406,7 +433,7 @@ impl Component for CompPDL {
                ));
                self.inbox_main.push(None);
                self.inbox_main.push(None);
-                return Ok(CompScheduling::Immediate);
+                return CompScheduling::Immediate;
            }
        }
    }
@@ -469,37 +496,16 @@ impl CompPDL {
    /// immediately
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
        sched_ctx.log("Component ending sync mode (now waiting for solution)");
-        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
+        let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
        self.exec_state.mode = CompMode::SyncEnd;
-        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
-    }
-
-    /// Handles decision from the consensus round. This will cause a change in
-    /// the internal `Mode`, such that the next call to `run` can take the
-    /// appropriate next steps.
-    fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
-        sched_ctx.log(&format!("Handling sync decision: {:?} (in mode {:?})", decision, self.exec_state.mode));
-        match decision {
-            SyncRoundDecision::None => {
-                // No decision yet
-                return;
-            },
-            SyncRoundDecision::Solution => {
-                debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
-                self.exec_state.mode = CompMode::NonSync;
-                self.consensus.notify_sync_decision(decision);
-            },
-            SyncRoundDecision::Failure => {
-                debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
-                self.exec_state.mode = CompMode::StartExit;
-            },
-        }
+        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
    }
 
    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
-        sched_ctx.log("Component exiting");
+        sched_ctx.log(&format!("Component exiting (reason: {:?})", self.exec_state.exit_reason));
        debug_assert_eq!(self.exec_state.mode, CompMode::StartExit);
        self.exec_state.mode = CompMode::BusyExit;
+        let exit_inside_sync = self.exec_state.exit_reason.is_in_sync();
 
        // Doing this by index, then retrieving the handle is a bit ridiculous,
        // but Rust is being Rust with its borrowing rules.
@@ -516,7 +522,7 @@ impl CompPDL {
 
        // Notify peer of closing
        let port_handle = comp_ctx.get_port_handle(port_id);
-        let (peer, message) = self.control.initiate_port_closing(port_handle, comp_ctx);
+        let (peer, message) = self.control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx);
        let peer_info = comp_ctx.get_peer(peer);
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
    }
@@ -560,7 +566,36 @@ impl CompPDL {
    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
-        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
+        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
+    }
+
+    /// Handles an error coming from the generic `component::handle_xxx`
+    /// functions. Hence accepts argument as a tuple.
+    fn handle_component_error(&mut self, sched_ctx: &SchedulerCtx, location_and_message: (PortInstruction, String)) {
+        // Retrieve location and message, display in terminal
+        let (location, message) = location_and_message;
+        let error = match location {
+            PortInstruction::None => CompError::Component(message),
+            PortInstruction::NoSource => unreachable!(), // for debugging: all in-sync errors are associated with a source location
+            PortInstruction::SourceLocation(expression_id) => {
+                let protocol = &sched_ctx.runtime.protocol;
+                CompError::Executor(EvalError::new_error_at_expr(
+                    &self.prompt, &protocol.modules, &protocol.heap,
+                    expression_id, message
+                ))
+            }
+        };
+
+        sched_ctx.error(&format!("{}", error));
+
+        // Set state to handle subsequent error
+        let exit_reason = if self.exec_state.mode.is_in_sync_block() {
+            ExitReason::ErrorInSync
+        } else {
+            ExitReason::ErrorNonSync
+        };
+
+        self.exec_state.set_as_start_exit(exit_reason);
    }
 
    // -------------------------------------------------------------------------
diff --git a/src/runtime2/component/component_random.rs b/src/runtime2/component/component_random.rs
index dce5d709a86cf6b0e73d63bef292113aa38f691c..dc8a63334b8e19a37604d51f35b79cd67734cf8d 100644
--- a/src/runtime2/component/component_random.rs
+++ b/src/runtime2/component/component_random.rs
@@ -1,11 +1,15 @@
 use rand::prelude as random;
 use rand::RngCore;
 
-use crate::protocol::eval::{ValueGroup, Value, EvalError};
+use crate::protocol::eval::{ValueGroup, Value};
 use crate::runtime2::*;
 
 use super::*;
-use super::component::{self, Component, CompExecState, CompScheduling, CompMode};
+use super::component::{
+    self,
+    Component, CompExecState, CompScheduling,
+    CompMode, ExitReason
+};
 use super::control_layer::*;
 use super::consensus::*;
 
@@ -42,19 +46,21 @@ impl Component for ComponentRandomU32 {
            Message::Data(_message) => unreachable!(),
            Message::Sync(message) => {
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
-                component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
+                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
            },
            Message::Control(message) => {
-                component::default_handle_control_message(
+                if let Err(location_and_message) = component::default_handle_control_message(
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
                    message, sched_ctx, comp_ctx
-                );
+                ) {
+                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
+                }
            },
            Message::Poll => unreachable!(),
        }
    }
 
-    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
+    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
        sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));
 
        match self.exec_state.mode {
@@ -73,7 +79,7 @@ impl Component for ComponentRandomU32 {
                }
 
                if self.num_sends >= self.max_num_sends {
-                    self.exec_state.mode = CompMode::StartExit;
+                    self.exec_state.set_as_start_exit(ExitReason::Termination);
                } else {
                    sched_ctx.log("Entering sync mode");
                    self.did_perform_send = false;
@@ -81,7 +87,7 @@ impl Component for ComponentRandomU32 {
                    self.exec_state.mode = CompMode::Sync;
                }
 
-                return Ok(CompScheduling::Immediate);
+                return CompScheduling::Immediate;
            },
            CompMode::Sync => {
                // This component just sends a single message, then waits until
@@ -94,34 +100,41 @@ impl Component for ComponentRandomU32 {
                    random += self.random_minimum;
 
                    let value_group = ValueGroup::new_stack(vec![Value::UInt32(random)]);
-                    let scheduling = component::default_send_data_message(
-                        &mut self.exec_state, self.output_port_id, value_group,
+                    let send_result = component::default_send_data_message(
+                        &mut self.exec_state, self.output_port_id,
+                        PortInstruction::NoSource, value_group,
                        sched_ctx, &mut self.consensus, comp_ctx
                    );
 
-                    // Blocked or not, we set `did_perform_send` to true. If
-                    // blocked then the moment we become unblocked (and are back
-                    // at the `Sync` mode) we have sent the message.
-                    self.did_perform_send = true;
-                    self.num_sends += 1;
-                    return Ok(scheduling)
+                    if let Err(location_and_message) = send_result {
+                        component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
+                        return CompScheduling::Immediate;
+                    } else {
+                        // Blocked or not, we set `did_perform_send` to true. If
+                        // blocked then the moment we become unblocked (and are back
+                        // at the `Sync` mode) we have sent the message.
+                        let scheduling = send_result.unwrap();
+                        self.did_perform_send = true;
+                        self.num_sends += 1;
+                        return scheduling;
+                    }
                } else {
                    // Message was sent, finish this sync round
                    sched_ctx.log("Waiting for consensus");
                    self.exec_state.mode = CompMode::SyncEnd;
-                    let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
-                    component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
-                    return Ok(CompScheduling::Requeue);
+                    let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
+                    component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
+                    return CompScheduling::Requeue;
                }
            },
-            CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep),
-            CompMode::StartExit => return Ok(component::default_handle_start_exit(
-                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx
-            )),
-            CompMode::BusyExit => return Ok(component::default_handle_busy_exit(
+            CompMode::SyncEnd | CompMode::BlockedPut => return CompScheduling::Sleep,
+            CompMode::StartExit => return component::default_handle_start_exit(
+                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
+            ),
+            CompMode::BusyExit => return component::default_handle_busy_exit(
                &mut self.exec_state, &self.control, sched_ctx
-            )),
-            CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)),
+            ),
+            CompMode::Exit => return component::default_handle_exit(&self.exec_state),
        }
    }
 }
diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs
index 881a7f6fc3400aded9a2d379dc1f08f2f2212613..8ce9cc65383a4572dec0b20619390671d6e85d25 100644
--- a/src/runtime2/component/consensus.rs
+++ b/src/runtime2/component/consensus.rs
@@ -226,7 +226,11 @@ impl SolutionCombiner {
        let putter = channel.putter.as_ref().unwrap();
        let getter = channel.getter.as_ref().unwrap();
 
-        return Some(putter.mapping == getter.mapping);
+        return Some(
+            !putter.failed &&
+            !getter.failed &&
+            putter.mapping == getter.mapping
+        );
    }
 
    /// Determines the global solution if all components have contributed their
@@ -313,34 +317,24 @@ impl Consensus {
    /// Notifies the consensus management that the PDL code has reached the end
    /// of a sync block. A local solution will be submitted, after which we wait
    /// until the participants in the round (hopefully) reach a conclusion.
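    /// See `notify_sync_end_failure` for the counterpart used when the
    /// component has failed during the round.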
-    pub(crate) fn notify_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
+    pub(crate) fn notify_sync_end_success(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
        debug_assert_eq!(self.mode, Mode::SyncBusy);
        self.mode = Mode::SyncAwaitingSolution;
 
-        // Submit our port mapping as a solution
-        let mut local_solution = Vec::with_capacity(self.ports.len());
-        for port in &self.ports {
-            if let Some(mapping) = port.mapping {
-                let port_handle = comp_ctx.get_port_handle(port.self_port_id);
-                let port_info = comp_ctx.get_port(port_handle);
-                let new_entry = match port_info.kind {
-                    PortKind::Putter => SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{
-                        self_comp_id: comp_ctx.id,
-                        self_port_id: port_info.self_id,
-                        mapping
-                    }),
-                    PortKind::Getter => SyncLocalSolutionEntry::Getter(SyncSolutionGetterPort{
-                        self_comp_id: comp_ctx.id,
-                        self_port_id: port_info.self_id,
-                        peer_comp_id: port.peer_comp_id,
-                        peer_port_id: port.peer_port_id,
-                        mapping
-                    })
-                };
-                local_solution.push(new_entry);
-            }
-        }
+        let local_solution = self.generate_local_solution(comp_ctx, false);
+        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution);
+        return decision;
+    }
+
+    /// Notifies the consensus management that the component has encountered a
+    /// critical error during the synchronous round. Hence we should report that
+    /// we've failed and wait until all the participants have been notified of
+    /// the error.
+    pub(crate) fn notify_sync_end_failure(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
+        debug_assert_eq!(self.mode, Mode::SyncBusy);
+        self.mode = Mode::SyncAwaitingSolution;
 
+        let local_solution = self.generate_local_solution(comp_ctx, true);
        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution);
        return decision;
    }
@@ -380,7 +374,6 @@ impl Consensus {
    /// is used to determine peers of `get`ter ports.
    // TODO: The use of this function is rather ugly. Find a more robust
    //  scheme about owners of `get`ter ports not knowing about their peers.
-    //  (also, figure out why this was written again, I forgot).
    pub(crate) fn handle_incoming_data_message(&mut self, comp_ctx: &CompCtx, message: &DataMessage) {
        let target_handle = comp_ctx.get_port_handle(message.data_header.target_port);
        let target_index = comp_ctx.get_port_index(target_handle);
@@ -642,9 +635,38 @@ impl Consensus {
    }
 
    // -------------------------------------------------------------------------
-    // Creating message headers
+    // Small utilities
    // -------------------------------------------------------------------------
 
+    fn generate_local_solution(&self, comp_ctx: &CompCtx, failed: bool) -> SyncLocalSolution {
+        let mut local_solution = Vec::with_capacity(self.ports.len());
+        for port in &self.ports {
+            if let Some(mapping) = port.mapping {
+                let port_handle = comp_ctx.get_port_handle(port.self_port_id);
+                let port_info = comp_ctx.get_port(port_handle);
+                let new_entry = match port_info.kind {
+                    PortKind::Putter => SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{
+                        self_comp_id: comp_ctx.id,
+                        self_port_id: port_info.self_id,
+                        mapping,
+                        failed
+                    }),
+                    PortKind::Getter => SyncLocalSolutionEntry::Getter(SyncSolutionGetterPort{
+                        self_comp_id: comp_ctx.id,
+                        self_port_id: port_info.self_id,
+                        peer_comp_id: port.peer_comp_id,
+                        peer_port_id: port.peer_port_id,
+                        mapping,
+                        failed
+                    })
+                };
+                local_solution.push(new_entry);
+            }
+        }
+
+        return local_solution;
+    }
+
    fn create_data_header_and_update_mapping(&mut self, port_info: &Port) -> MessageDataHeader {
        let mut expected_mapping = Vec::with_capacity(self.ports.len());
        let mut port_index = usize::MAX;
diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs
index 15f4cac2faa53c69514a2da1fc15064197ad6c27..17d575a6a87403c4d4ee81b8b29f8004a42df580 100644
--- a/src/runtime2/component/control_layer.rs
+++ b/src/runtime2/component/control_layer.rs
@@ -215,7 +215,7 @@ impl ControlLayer {
 
    /// Initiates the control message procedures for closing a port. Caller must
    /// make sure that the port state has already been set to `Closed`.
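+    /// The `exit_inside_sync` flag is carried in the `ClosePort` message, so
+    /// the peer can fail the sync round if it already used the port this round.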
-    pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) {
+    pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, exit_inside_sync: bool, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) {
        let port = comp_ctx.get_port(port_handle);
        let peer_port_id = port.peer_port_id;
        debug_assert!(port.state == PortState::Closed);
@@ -236,7 +236,10 @@ impl ControlLayer {
                id: entry_id,
                sender_comp_id: comp_ctx.id,
                target_port_id: Some(peer_port_id),
-                content: ControlMessageContent::ClosePort(peer_port_id),
+                content: ControlMessageContent::ClosePort(ControlMessageClosePort{
+                    port_to_close: peer_port_id,
+                    closed_in_sync_round: exit_inside_sync,
+                }),
            }
        );
    }
diff --git a/src/runtime2/component/mod.rs b/src/runtime2/component/mod.rs
index e0c03a618ed4196164f14d4721d4cd67ac5da257..e64f6bd1b2674b57f3bb68a2a470c5f51c2f3ebc 100644
--- a/src/runtime2/component/mod.rs
+++ b/src/runtime2/component/mod.rs
@@ -8,7 +8,7 @@ mod component_internet;
 
 pub(crate) use component::{Component, CompScheduling};
 pub(crate) use component_pdl::{CompPDL};
-pub(crate) use component_context::CompCtx;
+pub(crate) use component_context::{CompCtx, PortInstruction};
 pub(crate) use control_layer::{ControlId};
 
 use super::scheduler::*;
diff --git a/src/runtime2/poll/mod.rs b/src/runtime2/poll/mod.rs
index 75cf8e9a90c00b6f9d785db8c8ce75221c69f0a3..541a3536be2429a8d61424d696731668c305774c 100644
--- a/src/runtime2/poll/mod.rs
+++ b/src/runtime2/poll/mod.rs
@@ -172,7 +172,6 @@ impl PollingThread {
    }
 
    pub(crate) fn run(&mut self) {
-        use crate::runtime2::scheduler::SchedulerCtx;
        use crate::runtime2::communication::Message;
 
        const NUM_EVENTS: usize = 256;
diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs
index d3159c82aec89f2d63ea6b11a71917be3a285417..806afa09d70cd7d809d1b06eac90437ad16928a9 100644
--- a/src/runtime2/scheduler.rs
+++ b/src/runtime2/scheduler.rs
@@ -37,6 +37,11 @@ impl<'a> SchedulerCtx<'a> {
            println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text);
        }
    }
+
+    pub(crate) fn error(&self, text: &str) {
+        // TODO: Probably not always use colour
+        println!("[s:{:02}, c:{:03}] \x1b[0;31m{}\x1b[0m", self.id, self.comp, text);
+    }
 }
 
 impl Scheduler {
@@ -67,7 +72,7 @@ impl Scheduler {
            while let Some(message) = component.inbox.pop() {
                component.component.handle_message(&mut scheduler_ctx, &mut component.ctx, message);
            }
-            new_scheduling = component.component.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error");
+            new_scheduling = component.component.run(&mut scheduler_ctx, &mut component.ctx);
        }
 
        // Handle the new scheduling
diff --git a/src/runtime2/stdlib/internet.rs b/src/runtime2/stdlib/internet.rs
index a3d150a9efea7e821564ec0521e37461918cb2dc..19235bcc6ba9280a75aa6770ad44135da2f89485 100644
--- a/src/runtime2/stdlib/internet.rs
+++ b/src/runtime2/stdlib/internet.rs
@@ -1,13 +1,12 @@
 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
 use std::mem::size_of;
-use std::io::{Error as IoError, ErrorKind as IoErrorKind};
+use std::io::Error as IoError;
 
 use libc::{
    c_int,
    sockaddr_in, sockaddr_in6, in_addr, in6_addr,
    socket, bind, listen, accept, connect, close,
 };
-use mio::{event, Interest, Registry, Token};
 
 use crate::runtime2::poll::{AsFileDescriptor, FileDescriptor};