/*
 * Default toolkit for creating components. Contains handlers for initiating and
 * responding to various events.
 */

use std::fmt::{Display as FmtDisplay, Result as FmtResult, Formatter};

use crate::protocol::eval::{Prompt, EvalError, ValueGroup, Value, ValueId, PortId as EvalPortId};
use crate::protocol::*;
use crate::runtime2::*;
use crate::runtime2::communication::*;

use super::{CompCtx, CompPDL, CompId};
use super::component_context::*;
use super::component_random::*;
use super::component_internet::*;
use super::control_layer::*;
use super::consensus::*;

/// Scheduling decision returned by a component's `run` routine: tells the
/// scheduler when (or whether) to run the component again.
pub enum CompScheduling {
    Immediate, // run again right away
    Requeue,   // put back into the scheduler queue
    Sleep,     // do not reschedule until woken up (e.g. by a message)
    Exit,      // component is done, shut it down
}

/// Potential error emitted by a component
pub enum CompError {
    /// Error originating from the code executor. Hence has an associated
    /// source location.
    Executor(EvalError),
    /// Error originating from a component, but not necessarily associated with
    /// a location in the source.
    Component(String), // TODO: Maybe a different embedded value in the future?
    /// Pure runtime error. Not necessarily originating from the component
    /// itself. Should be treated as a very severe runtime-compromising error.
    Runtime(RtError),
}

impl FmtDisplay for CompError {
    // Delegates to the wrapped error's own Display implementation.
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        match self {
            CompError::Executor(v) => v.fmt(f),
            CompError::Component(v) => v.fmt(f),
            CompError::Runtime(v) => v.fmt(f),
        }
    }
}

/// Generic representation of a component (as viewed by a scheduler).
pub(crate) trait Component {
    /// Called upon the creation of the component. Note that the scheduler
    /// context is officially running another component (the component that is
    /// creating the new component).
    fn on_creation(&mut self, comp_id: CompId, sched_ctx: &SchedulerCtx);

    /// Called when a component crashes or wishes to exit. So is not called
    /// right before destruction, other components may still hold a handle to
    /// the component and send it messages!
    fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx);

    /// Called if the component is created by another component and the messages
    /// are being transferred between the two.
    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage);

    /// Called if the component receives a new message. The component is
    /// responsible for deciding where that messages goes.
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message);

    /// Called if the component's routine should be executed. The return value
    /// can be used to indicate when the routine should be run again.
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling;
}

/// Representation of the generic operating mode of a component. Although not
/// every state may be used by every kind of (builtin) component, this allows
/// writing standard handlers for particular events in a component's lifetime.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum CompMode {
    NonSync, // not in sync mode
    Sync, // in sync mode, can interact with other components
    SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
    BlockedGet, // blocked because we need to receive a message on a particular port
    BlockedPut, // component is blocked because the port is blocked
    BlockedSelect, // waiting on message to complete the select statement
    PutPortsBlockedTransferredPorts, // sending a message with ports, those sent ports are (partly) blocked
    PutPortsBlockedAwaitingAcks, // sent out PPC message for blocking transferred ports, now awaiting Acks
    PutPortsBlockedSendingPort, // sending a message with ports, message sent through a still-blocked port
    NewComponentBlocked, // waiting until ports are in the appropriate state to create a new component
    StartExit, // temporary state: if encountered then we start the shutdown process.
    BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish
    Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0
}

impl CompMode {
    /// True for every state that is part of an ongoing sync round (including
    /// all of the "blocked while in sync" states).
    pub(crate) fn is_in_sync_block(&self) -> bool {
        use CompMode::*;

        match self {
            Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect |
            PutPortsBlockedTransferredPorts |
            PutPortsBlockedAwaitingAcks |
            PutPortsBlockedSendingPort => true,
            NonSync | NewComponentBlocked | StartExit | BusyExit | Exit => false,
        }
    }

    /// True only for the two transitional shutdown states (`StartExit`,
    /// `BusyExit`); the final `Exit` state is deliberately excluded.
    pub(crate) fn is_busy_exiting(&self) -> bool {
        use CompMode::*;

        match self {
            NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect |
            PutPortsBlockedTransferredPorts |
            PutPortsBlockedAwaitingAcks |
            PutPortsBlockedSendingPort |
            NewComponentBlocked => false,
            StartExit | BusyExit => true,
            Exit => false,
        }
    }
}

/// Why a component is exiting. Drives both error reporting and whether the
/// sync-round leader must be notified of a failure.
#[derive(Debug)]
pub(crate) enum ExitReason {
    Termination, // regular termination of component
    ErrorInSync,
    ErrorNonSync,
}

impl ExitReason {
    /// True if the exit was triggered while a sync round was in progress.
    pub(crate) fn is_in_sync(&self) -> bool {
        use ExitReason::*;

        match self {
            Termination | ErrorNonSync => false,
            ErrorInSync => true,
        }
    }

    /// True if the exit is due to an error (rather than normal termination).
    pub(crate) fn is_error(&self) -> bool {
        use ExitReason::*;

        match self {
            Termination => false,
            ErrorInSync | ErrorNonSync => true,
        }
    }
}

/// Component execution state: the execution mode along with some descriptive
/// fields. Fields are public for ergonomic reasons, use member functions when
/// appropriate.
pub(crate) struct CompExecState { pub mode: CompMode, pub mode_port: PortId, // valid if blocked on a port (put/get) pub mode_value: ValueGroup, // valid if blocked on a put pub mode_component: (ProcedureDefinitionId, TypeId), pub exit_reason: ExitReason, // valid if in StartExit/BusyExit/Exit mode } impl CompExecState { pub(crate) fn new() -> Self { return Self{ mode: CompMode::NonSync, mode_port: PortId::new_invalid(), mode_value: ValueGroup::default(), mode_component: (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()), exit_reason: ExitReason::Termination, } } pub(crate) fn set_as_start_exit(&mut self, reason: ExitReason) { self.mode = CompMode::StartExit; self.exit_reason = reason; } pub(crate) fn set_as_blocked_get(&mut self, port: PortId) { self.mode = CompMode::BlockedGet; self.mode_port = port; debug_assert!(self.mode_value.values.is_empty()); } pub(crate) fn set_as_create_component_blocked( &mut self, proc_id: ProcedureDefinitionId, type_id: TypeId, arguments: ValueGroup ) { self.mode = CompMode::NewComponentBlocked; self.mode_value = arguments; self.mode_component = (proc_id, type_id); } pub(crate) fn is_blocked_on_get(&self, port: PortId) -> bool { return self.mode == CompMode::BlockedGet && self.mode_port == port; } pub(crate) fn set_as_blocked_put_without_ports(&mut self, port: PortId, value: ValueGroup) { self.mode = CompMode::BlockedPut; self.mode_port = port; self.mode_value = value; } pub(crate) fn set_as_blocked_put_with_ports(&mut self, port: PortId, value: ValueGroup) { self.mode = CompMode::PutPortsBlockedTransferredPorts; self.mode_port = port; self.mode_value = value; } pub(crate) fn is_blocked_on_put_without_ports(&self, port: PortId) -> bool { return self.mode == CompMode::BlockedPut && self.mode_port == port; } pub(crate) fn is_blocked_on_create_component(&self) -> bool { return self.mode == CompMode::NewComponentBlocked; } } // TODO: Replace when implementing port sending. 
Should probably be incorporated // into CompCtx (and rename CompCtx into CompComms) pub(crate) type InboxMain = Vec>; pub(crate) type InboxMainRef = [Option]; pub(crate) type InboxBackup = Vec; /// Creates a new component based on its definition. Meaning that if it is a /// user-defined component then we set up the PDL code state. Otherwise we /// construct a custom component. This does NOT take care of port and message /// management. pub(crate) fn create_component( protocol: &ProtocolDescription, definition_id: ProcedureDefinitionId, type_id: TypeId, arguments: ValueGroup, num_ports: usize ) -> Box { let definition = &protocol.heap[definition_id]; debug_assert!(definition.kind == ProcedureKind::Primitive || definition.kind == ProcedureKind::Composite); if definition.source.is_builtin() { // Builtin component let component: Box = match definition.source { ProcedureSource::CompRandomU32 => Box::new(ComponentRandomU32::new(arguments)), ProcedureSource::CompTcpClient => Box::new(ComponentTcpClient::new(arguments)), _ => unreachable!(), }; return component; } else { // User-defined component let prompt = Prompt::new( &protocol.types, &protocol.heap, definition_id, type_id, arguments ); let component = CompPDL::new(prompt, num_ports); return Box::new(component); } } // ----------------------------------------------------------------------------- // Generic component messaging utilities (for sending and receiving) // ----------------------------------------------------------------------------- /// Default handling of sending a data message. In case the port is blocked then /// the `ExecState` will become blocked as well. Note that /// `default_handle_control_message` will ensure that the port becomes /// unblocked if so instructed by the receiving component. The returned /// scheduling value must be used. 
#[must_use] pub(crate) fn default_send_data_message( exec_state: &mut CompExecState, transmitting_port_id: PortId, port_instruction: PortInstruction, value: ValueGroup, sched_ctx: &SchedulerCtx, consensus: &mut Consensus, control: &mut ControlLayer, comp_ctx: &mut CompCtx ) -> Result { debug_assert_eq!(exec_state.mode, CompMode::Sync); let port_handle = comp_ctx.get_port_handle(transmitting_port_id); let port_info = comp_ctx.get_port_mut(port_handle); port_info.last_instruction = port_instruction; let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); let mut ports = Vec::new(); find_ports_in_value_group(&value, &mut ports); if port_info.state.is_closed() { // Note: normally peer is eventually consistent, but if it has shut down // then we can be sure it is consistent (I think?) return Err(( port_info.last_instruction, format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0) )) } else if !ports.is_empty() { start_send_message_with_ports( transmitting_port_id, port_instruction, value, exec_state, comp_ctx, sched_ctx, control )?; return Ok(CompScheduling::Sleep); } else if port_info.state.is_blocked() { // Port is blocked, so we cannot send exec_state.set_as_blocked_put_without_ports(transmitting_port_id, value); return Ok(CompScheduling::Sleep); } else { // Port is not blocked and no ports to transfer: send to the peer let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); return Ok(CompScheduling::Immediate); } } pub(crate) enum IncomingData { PlacedInSlot, SlotFull(DataMessage), } /// Default handling of receiving a data message. In case there is no room for /// the message it is returned from this function. 
/// Note that this function is
/// different from PDL code performing a `get` on a port; this is the case where
/// the message first arrives at the component.
// NOTE: This is supposed to be a somewhat temporary implementation. It would be
// nicest if the sending component can figure out it cannot send any more data.
#[must_use]
pub(crate) fn default_handle_incoming_data_message(
    exec_state: &mut CompExecState, inbox_main: &mut InboxMain,
    comp_ctx: &mut CompCtx, incoming_message: DataMessage,
    sched_ctx: &SchedulerCtx, control: &mut ControlLayer
) -> IncomingData {
    // Look up the targeted port and remember that it saw traffic this round.
    let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port);
    let port_index = comp_ctx.get_port_index(port_handle);
    comp_ctx.get_port_mut(port_handle).received_message_for_sync = true;

    let port_value_slot = &mut inbox_main[port_index];
    let target_port_id = incoming_message.data_header.target_port;

    if port_value_slot.is_none() {
        // We can put the value in the slot
        *port_value_slot = Some(incoming_message);

        // Check if we're blocked on receiving this message.
        dbg_code!({
            // Our port cannot have been blocked itself, because we're able to
            // directly insert the message into its slot.
            assert!(!comp_ctx.get_port(port_handle).state.is_blocked());
        });
        if exec_state.is_blocked_on_get(target_port_id) {
            // Return to normal operation
            exec_state.mode = CompMode::Sync;
            exec_state.mode_port = PortId::new_invalid();
            debug_assert!(exec_state.mode_value.values.is_empty());
        }

        return IncomingData::PlacedInSlot
    } else {
        // Slot is already full, so if the port was previously open, it will
        // now become blocked (due to full buffers), and the peer is told to
        // stop sending.
        let port_info = comp_ctx.get_port_mut(port_handle);
        if port_info.state.is_open() {
            port_info.state.set(PortStateFlag::BlockedDueToFullBuffers);
            let (peer_handle, message) = control.initiate_port_blocking(comp_ctx, port_handle);

            let peer = comp_ctx.get_peer(peer_handle);
            peer.handle.send_message_logged(sched_ctx, Message::Control(message), true);
        }

        // Hand the message back; the caller is expected to stash it elsewhere
        // (e.g. the backup inbox).
        return IncomingData::SlotFull(incoming_message)
    }
}

/// Outcome of attempting to receive a message via a `get`.
pub(crate) enum GetResult {
    Received(DataMessage),                // a message was consumed
    NoMessage,                            // nothing available; caller blocked
    Error((PortInstruction, String)),     // unrecoverable error (location, text)
}

/// Default attempt at trying to receive from a port (i.e. through a `get`, or
/// the equivalent operation for a builtin component). `target_port` is the port
/// we're trying to receive from, and the `target_port_instruction` is the
/// instruction we're attempting on this port.
pub(crate) fn default_attempt_get(
    exec_state: &mut CompExecState, target_port: PortId,
    target_port_instruction: PortInstruction,
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup,
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
    control: &mut ControlLayer, consensus: &mut Consensus
) -> GetResult {
    // Record which instruction last touched this port (used in error reports).
    let port_handle = comp_ctx.get_port_handle(target_port);
    let port_index = comp_ctx.get_port_index(port_handle);
    let port_info = comp_ctx.get_port_mut(port_handle);
    port_info.last_instruction = target_port_instruction;

    if port_info.state.is_closed() {
        let peer_id = port_info.peer_comp_id;
        return GetResult::Error((
            target_port_instruction,
            format!("Cannot get from this port, as the peer component (id:{}) closed the port", peer_id.0)
        ));
    }

    if let Some(message) = &inbox_main[port_index] {
        if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
            // We're allowed to receive this message
            let mut message = inbox_main[port_index].take().unwrap();
            debug_assert_eq!(target_port, message.data_header.target_port);

            // Note: we can still run into an unrecoverable error when actually
            // receiving this message
            match default_handle_received_data_message(
                target_port, target_port_instruction,
                &mut message, inbox_main, inbox_backup,
                comp_ctx, sched_ctx, control,
            ) {
                Ok(()) => return GetResult::Received(message),
                Err(location_and_message) => return GetResult::Error(location_and_message)
            }
        } else {
            // We're not allowed to receive this message. This means that the
            // receiver is attempting to receive something out of order with
            // respect to the sender.
            return GetResult::Error((target_port_instruction, String::from(
                "Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s"
            )));
        }
    } else {
        // We don't have a message waiting for us and the port is not blocked.
        // So enter the BlockedGet state
        exec_state.set_as_blocked_get(target_port);
        return GetResult::NoMessage;
    }
}

/// Default handling of a message that has been received through a `get`. Will
/// check if any more messages are waiting, and if the corresponding port was
/// blocked because of full buffers (hence, will use the control layer to make
/// sure the peer will become unblocked).
pub(crate) fn default_handle_received_data_message(
    targeted_port: PortId, _port_instruction: PortInstruction,
    message: &mut DataMessage, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup,
    comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer
) -> Result<(), (PortInstruction, String)> {
    let port_handle = comp_ctx.get_port_handle(targeted_port);
    let port_index = comp_ctx.get_port_index(port_handle);
    debug_assert!(inbox_main[port_index].is_none()); // because we've just received from it

    // If we received any ports, add them to the port tracking and inbox struct.
    // Then notify the peers that they can continue sending to this port, but
    // now at a new address.
    for received_port in &mut message.ports {
        // Transfer messages to main/backup inbox. The first pending message
        // (if any) goes into the new port's main slot; the rest overflow into
        // the backup inbox.
        let _new_inbox_index = inbox_main.len();
        if !received_port.messages.is_empty() {
            inbox_main.push(Some(received_port.messages.remove(0)));
            inbox_backup.extend(received_port.messages.drain(..));
        } else {
            inbox_main.push(None);
        }

        // Create a new port locally
        let mut new_port_state = received_port.state;
        new_port_state.set(PortStateFlag::Received);
        let new_port_handle = comp_ctx.add_port(
            received_port.peer_comp, received_port.peer_port,
            received_port.kind, new_port_state
        );
        // New port's inbox slot must line up with its port index.
        debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle));
        comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(received_port.peer_comp));
        let new_port = comp_ctx.get_port(new_port_handle);

        // Replace all references to the port in the received message, so the
        // message content refers to the port's new local id.
        for message_location in received_port.locations.iter().copied() {
            let value = message.content.get_value_mut(message_location);
            match value {
                Value::Input(_) => {
                    debug_assert_eq!(new_port.kind, PortKind::Getter);
                    *value = Value::Input(port_id_to_eval(new_port.self_id));
                },
                Value::Output(_) => {
                    debug_assert_eq!(new_port.kind, PortKind::Putter);
                    *value = Value::Output(port_id_to_eval(new_port.self_id));
                },
                _ => unreachable!(),
            }
        }

        // Let the peer know that the port can now be used
        let peer_handle = comp_ctx.get_peer_handle(new_port.peer_comp_id);
        let peer_info = comp_ctx.get_peer(peer_handle);
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{
            id: ControlId::new_invalid(),
            sender_comp_id: comp_ctx.id,
            target_port_id: Some(new_port.peer_port_id),
            content: ControlMessageContent::PortPeerChangedUnblock(new_port.self_id, comp_ctx.id)
        }), true);
    }

    // Modify last-known location where port instruction was retrieved
    let port_info = comp_ctx.get_port(port_handle);
    debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller
    debug_assert!(port_info.state.is_open()); // checked by caller

    // Check if there are any more messages in the backup buffer
    for message_index in 0..inbox_backup.len() {
        let message = &inbox_backup[message_index];
        if message.data_header.target_port == targeted_port {
            // One more message, place it in the slot
            let message = inbox_backup.remove(message_index);
            debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we're removing another message from the backup
            inbox_main[port_index] = Some(message);

            return Ok(());
        }
    }

    // Did not have any more messages, so if we were blocked, then we need to
    // unblock the port now (and inform the peer of this unblocking)
    if port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers) {
        let port_info = comp_ctx.get_port_mut(port_handle);
        port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers);
        let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle);
        let peer_info = comp_ctx.get_peer(peer_handle);
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true);
    }

    return Ok(());
}

/// Handles control messages in the default way. Note that this function may
/// take a lot of actions in the name of the caller: pending messages may be
/// sent, ports may become blocked/unblocked, etc. So the execution
/// (`CompExecState`), control (`ControlLayer`) and consensus (`Consensus`)
/// state may all change.
pub(crate) fn default_handle_control_message(
    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
) -> Result<(), (PortInstruction, String)> {
    match message.content {
        ControlMessageContent::Ack => {
            default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup)?;
        },
        ControlMessageContent::BlockPort => {
            // One of our messages was accepted, but the port should be
            // blocked.
            let port_to_block = message.target_port_id.unwrap();
            let port_handle = comp_ctx.get_port_handle(port_to_block);
            let port_info = comp_ctx.get_port_mut(port_handle);
            debug_assert_eq!(port_info.kind, PortKind::Putter);
            port_info.state.set(PortStateFlag::BlockedDueToFullBuffers);
        },
        ControlMessageContent::ClosePort(content) => {
            // Request to close the port. We immediately comply and remove
            // the component handle as well
            let port_to_close = message.target_port_id.unwrap();
            let port_handle = comp_ctx.get_port_handle(port_to_close);

            // We're closing the port, so we will always update the peer of the
            // port (in case of error messages)
            let port_info = comp_ctx.get_port_mut(port_handle);
            port_info.peer_comp_id = message.sender_comp_id;
            port_info.close_at_sync_end = true; // might be redundant (we might set it closed now)

            let peer_comp_id = port_info.peer_comp_id;
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);

            // One exception to sending an `Ack` is if we just closed the
            // port ourselves, meaning that the `ClosePort` messages got
            // sent to one another.
            if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
                // The two components (sender and this component) are closing
                // the channel at the same time. So we don't care about the
                // content of the `ClosePort` message.
                default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup)?;
            } else {
                // Respond to the message
                let port_info = comp_ctx.get_port(port_handle);
                let last_instruction = port_info.last_instruction;
                let port_has_had_message = port_info.received_message_for_sync;
                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
                comp_ctx.change_port_peer(sched_ctx, port_handle, None);

                // Handle any possible error conditions (which boil down to: the
                // port has been used, but the peer has died). If not in sync
                // mode then we close the port immediately.
                // Note that `port_was_used` does not mean that any messages
                // were actually received. It might also mean that e.g. the
                // component attempted a `get`, but there were no messages, so
                // now it is in the `BlockedGet` state.
                let port_was_used = last_instruction != PortInstruction::None;

                if exec_state.mode.is_in_sync_block() {
                    let closed_during_sync_round = content.closed_in_sync_round && port_was_used;
                    let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message && port_was_used;

                    if closed_during_sync_round || closed_before_sync_round {
                        return Err((
                            last_instruction,
                            format!("Peer component (id:{}) shut down, so communication cannot (have) succeed(ed)", peer_comp_id.0)
                        ));
                    }
                } else {
                    let port_info = comp_ctx.get_port_mut(port_handle);
                    port_info.state.set(PortStateFlag::Closed);
                }
            }
        },
        ControlMessageContent::UnblockPort => {
            // We were previously blocked (or already closed)
            let port_to_unblock = message.target_port_id.unwrap();
            let port_handle = comp_ctx.get_port_handle(port_to_unblock);
            let port_info = comp_ctx.get_port_mut(port_handle);
            debug_assert_eq!(port_info.kind, PortKind::Putter);
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers));
            port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers);
            // Port may have a pending `put`; retry it now that we're unblocked.
            default_handle_recently_unblocked_port(
                exec_state, control, consensus, port_handle,
                sched_ctx, comp_ctx, inbox_main, inbox_backup
            )?;
        },
        ControlMessageContent::PortPeerChangedBlock => {
            // The peer of our port has just changed. So we are asked to
            // temporarily block the port (while our original recipient is
            // potentially rerouting some of the in-flight messages) and
            // Ack. Then we wait for the `unblock` call.
            let port_to_change = message.target_port_id.unwrap();
            let port_handle = comp_ctx.get_port_handle(port_to_change);
            let port_info = comp_ctx.get_port_mut(port_handle);
            let peer_comp_id = port_info.peer_comp_id;
            port_info.state.set(PortStateFlag::BlockedDueToPeerChange);

            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
            default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
        },
        ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
            // Counterpart of `PortPeerChangedBlock`: adopt the new peer
            // (port and component id) and unblock the port.
            let port_to_change = message.target_port_id.unwrap();
            let port_handle = comp_ctx.get_port_handle(port_to_change);
            let port_info = comp_ctx.get_port(port_handle);
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToPeerChange));

            let port_info = comp_ctx.get_port_mut(port_handle);
            port_info.peer_port_id = new_port_id;
            port_info.state.clear(PortStateFlag::BlockedDueToPeerChange);
            comp_ctx.change_port_peer(sched_ctx, port_handle, Some(new_comp_id));
            // Port may have a pending `put`; retry it now that we're unblocked.
            default_handle_recently_unblocked_port(
                exec_state, control, consensus, port_handle,
                sched_ctx, comp_ctx, inbox_main, inbox_backup
            )?;
        }
    }

    return Ok(());
}

/// Handles a component entering the synchronous block. Will ensure that the
/// `Consensus` and the `ComponentCtx` are initialized properly.
pub(crate) fn default_handle_sync_start( exec_state: &mut CompExecState, inbox_main: &mut InboxMainRef, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus ) { sched_ctx.info("Component starting sync mode"); // If any messages are present for this sync round, set the appropriate flag // and notify the consensus handler of the present messages consensus.notify_sync_start(comp_ctx); for (port_index, message) in inbox_main.iter().enumerate() { if let Some(message) = message { consensus.handle_incoming_data_message(comp_ctx, message); let port_info = comp_ctx.get_port_by_index_mut(port_index); port_info.received_message_for_sync = true; } } // Modify execution state debug_assert_eq!(exec_state.mode, CompMode::NonSync); exec_state.mode = CompMode::Sync; } /// Handles a component that has reached the end of the sync block. This does /// not necessarily mean that the component will go into the `NonSync` mode, as /// it might have to wait for the leader to finish the round for everyone (see /// `default_handle_sync_decision`) pub(crate) fn default_handle_sync_end( exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus ) { sched_ctx.info("Component ending sync mode (but possibly waiting for a solution)"); debug_assert_eq!(exec_state.mode, CompMode::Sync); let decision = consensus.notify_sync_end_success(sched_ctx, comp_ctx); exec_state.mode = CompMode::SyncEnd; default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus); } /// Handles a component initiating the exiting procedure, and closing all of its /// ports. Should only be called once per component (which is ensured by /// checking and modifying the mode in the execution state). 
#[must_use] pub(crate) fn default_handle_start_exit( exec_state: &mut CompExecState, control: &mut ControlLayer, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus ) -> CompScheduling { debug_assert_eq!(exec_state.mode, CompMode::StartExit); for port_index in 0..comp_ctx.num_ports() { let port_info = comp_ctx.get_port_by_index_mut(port_index); if port_info.state.is_blocked() { return CompScheduling::Sleep; } } sched_ctx.info(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason)); exec_state.mode = CompMode::BusyExit; let exit_inside_sync = exec_state.exit_reason.is_in_sync(); // If exiting while inside sync mode, report to the leader of the current // round that we've failed. if exit_inside_sync { let decision = consensus.notify_sync_end_failure(sched_ctx, comp_ctx); default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus); } // Iterating over ports by index to work around borrowing rules for port_index in 0..comp_ctx.num_ports() { let port = comp_ctx.get_port_by_index_mut(port_index); println!("DEBUG: Considering port:\n{:?}", port); if port.state.is_closed() || port.state.is_set(PortStateFlag::Transmitted) || port.close_at_sync_end { // Already closed, or in the process of being closed continue; } // Mark as closed let port_id = port.self_id; port.state.set(PortStateFlag::Closed); // Notify peer of closing let port_handle = comp_ctx.get_port_handle(port_id); let (peer, message) = control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx); let peer_info = comp_ctx.get_peer(peer); peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true); } return CompScheduling::Immediate; // to check if we can shut down immediately } /// Handles a component waiting until all peers are notified that it is quitting /// (i.e. after calling `default_handle_start_exit`). 
#[must_use] pub(crate) fn default_handle_busy_exit( exec_state: &mut CompExecState, control: &ControlLayer, sched_ctx: &SchedulerCtx ) -> CompScheduling { debug_assert_eq!(exec_state.mode, CompMode::BusyExit); if control.has_acks_remaining() { sched_ctx.info("Component busy exiting, still has `Ack`s remaining"); return CompScheduling::Sleep; } else { sched_ctx.info("Component busy exiting, now shutting down"); exec_state.mode = CompMode::Exit; return CompScheduling::Exit; } } /// Handles a potential synchronous round decision. If there was a decision then /// the `Some(success)` value indicates whether the round succeeded or not. /// Might also end up changing the `ExecState`. /// /// Might be called in two cases: /// 1. The component is in regular execution mode, at the end of a sync round, /// and is waiting for a solution to the round. /// 2. The component has encountered an error during a sync round and is /// exiting, hence is waiting for a "Failure" message from the leader. pub(crate) fn default_handle_sync_decision( sched_ctx: &SchedulerCtx, exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, decision: SyncRoundDecision, consensus: &mut Consensus ) -> Option { let success = match decision { SyncRoundDecision::None => return None, SyncRoundDecision::Solution => true, SyncRoundDecision::Failure => false, }; debug_assert!( exec_state.mode == CompMode::SyncEnd || ( exec_state.mode.is_busy_exiting() && exec_state.exit_reason.is_error() ) || ( exec_state.mode.is_in_sync_block() && decision == SyncRoundDecision::Failure ) ); sched_ctx.info(&format!("Handling decision {:?} (in mode: {:?})", decision, exec_state.mode)); consensus.notify_sync_decision(decision); if success { // We cannot get a success message if the component has encountered an // error. 
for port_index in 0..comp_ctx.num_ports() { let port_info = comp_ctx.get_port_by_index_mut(port_index); if port_info.close_at_sync_end { port_info.state.set(PortStateFlag::Closed); } port_info.state.clear(PortStateFlag::Received); } debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); exec_state.mode = CompMode::NonSync; return Some(true); } else { // We may get failure both in all possible cases. But we should only // modify the execution state if we're not already in exit mode if !exec_state.mode.is_busy_exiting() { sched_ctx.error("failed synchronous round, initiating exit"); exec_state.set_as_start_exit(ExitReason::ErrorNonSync); } return Some(false); } } pub(crate) fn default_start_create_component( exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, definition_id: ProcedureDefinitionId, type_id: TypeId, arguments: ValueGroup ) { debug_assert_eq!(exec_state.mode, CompMode::NonSync); let mut transferred_ports = Vec::new(); find_ports_in_value_group(&arguments, &mut transferred_ports); // Set execution state as waiting until we can create the component. If we // can do so right away, then we will. exec_state.set_as_create_component_blocked(definition_id, type_id, arguments); if ports_not_blocked(comp_ctx, &transferred_ports) { perform_create_component(exec_state, sched_ctx, comp_ctx, control, inbox_main, inbox_backup); } } /// Actually creates a component (and assumes that the caller made sure that /// none of the ports are involved in a blocking operation). 
/// Creates a new component from the instantiator's execution state: moves the
/// argument values (and the ports embedded in them) to the new component,
/// re-routes pending inbox messages for those ports, and notifies remote peers
/// of the ports' new owner when required.
///
/// Preconditions (debug-asserted below): the instantiator is in
/// `CompMode::NewComponentBlocked` and none of the transferred ports is
/// blocked due to a pending port change.
pub(crate) fn perform_create_component(
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx,
    instantiator_ctx: &mut CompCtx, control: &mut ControlLayer,
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
) {
    // Small internal utilities
    // Pairs a port as known by the instantiator with its replacement in the
    // created component.
    struct PortPair {
        instantiator_id: PortId,
        instantiator_handle: LocalPortHandle,
        created_id: PortId,
        created_handle: LocalPortHandle,
        is_open: bool, // captured before the transfer; closed ports need no peer updates
    }

    // Retrieve ports from the arguments
    debug_assert_eq!(exec_state.mode, CompMode::NewComponentBlocked);
    let (procedure_id, procedure_type_id) = exec_state.mode_component;
    let mut arguments = exec_state.mode_value.take();
    let mut ports = Vec::new();
    find_ports_in_value_group(&arguments, &mut ports);
    debug_assert!(ports_not_blocked(instantiator_ctx, &ports));

    // Reserve a location for the new component
    let reservation = sched_ctx.runtime.start_create_component();
    let mut created_ctx = CompCtx::new(&reservation);
    let mut port_pairs = Vec::with_capacity(ports.len());

    // Go over all the ports that will be transferred. Since the ports will get
    // a new ID in the new component, we will take care of that here.
    for (port_location, instantiator_port_id) in &ports {
        // Retrieve port information from instantiator
        let instantiator_port_id = *instantiator_port_id;
        let instantiator_port_handle = instantiator_ctx.get_port_handle(instantiator_port_id);
        let instantiator_port = instantiator_ctx.get_port(instantiator_port_handle);

        // Create port at created component, initially copying the peer
        // information verbatim; it is patched up in the next loop.
        let created_port_handle = created_ctx.add_port(
            instantiator_port.peer_comp_id, instantiator_port.peer_port_id,
            instantiator_port.kind, instantiator_port.state
        );
        let created_port = created_ctx.get_port(created_port_handle);
        let created_port_id = created_port.self_id;

        // Modify port ID in the arguments to the new component and store them
        // for later access
        let is_open = instantiator_port.state.is_open();
        port_pairs.push(PortPair{
            instantiator_id: instantiator_port_id,
            instantiator_handle: instantiator_port_handle,
            created_id: created_port_id,
            created_handle: created_port_handle,
            is_open,
        });

        // Rewrite every occurrence of the old port ID inside the argument
        // values so the new component sees its own port IDs.
        for location in port_location.iter().copied() {
            let value = arguments.get_value_mut(location);
            match value {
                Value::Input(id) => *id = port_id_to_eval(created_port_id),
                Value::Output(id) => *id = port_id_to_eval(created_port_id),
                _ => unreachable!(), // find_ports_in_value_group only records ports
            }
        }
    }

    // For each of the ports in the newly created component we set the peer to
    // the correct value. We will not yet change the peer on the instantiator's
    // ports (as we haven't yet stored the new component in the runtime's
    // component storage)
    let mut created_component_has_remote_peers = false;

    for pair in port_pairs.iter() {
        let instantiator_port_info = instantiator_ctx.get_port(pair.instantiator_handle);
        let created_port_info = created_ctx.get_port_mut(pair.created_handle);

        if created_port_info.peer_comp_id == instantiator_ctx.id {
            // The peer of the created component's port seems to be the
            // instantiator. Check whether the peer port is transferred too.
            let created_port_peer_index = port_pairs.iter()
                .position(|v| v.instantiator_id == instantiator_port_info.peer_port_id);
            match created_port_peer_index {
                Some(created_port_peer_index) => {
                    // However, the peer port is also moved to the new
                    // component, so the complete channel is owned by the new
                    // component.
                    let peer_pair = &port_pairs[created_port_peer_index];
                    created_port_info.peer_port_id = peer_pair.created_id;
                    created_port_info.peer_comp_id = reservation.id();
                },
                None => {
                    // Peer port remains with instantiator. However, we cannot
                    // set the peer on the instantiator yet, because the new
                    // component has not yet been stored in the runtime's
                    // component storage. So we do this later
                    created_port_info.peer_comp_id = instantiator_ctx.id;
                    if pair.is_open {
                        created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(instantiator_ctx.id));
                    }
                }
            }
        } else {
            // Peer is a different component
            if pair.is_open {
                // And the port is still open, so we need to notify the peer
                let peer_handle = instantiator_ctx.get_peer_handle(created_port_info.peer_comp_id);
                let peer_info = instantiator_ctx.get_peer(peer_handle);
                created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id));
                created_component_has_remote_peers = true;
            }
        }
    }

    // Now we store the new component into the runtime's component storage using
    // the reservation.
    let component = create_component(
        &sched_ctx.runtime.protocol, procedure_id, procedure_type_id,
        arguments, port_pairs.len()
    );
    let (created_key, created_runtime_component) = sched_ctx.runtime.finish_create_component(
        reservation, component, created_ctx, false
    );
    let created_ctx = &mut created_runtime_component.ctx;
    let created_component = &mut created_runtime_component.component;
    created_component.on_creation(created_key.downgrade(), sched_ctx);

    // We now pass along the messages that the instantiator component still has
    // that belong to the new component. At the same time we'll take care of
    // setting the correct peer of the instantiator component
    for pair in port_pairs.iter() {
        // Transferring the messages and removing the port from the
        // instantiator component
        let instantiator_port_index = instantiator_ctx.get_port_index(pair.instantiator_handle);
        instantiator_ctx.change_port_peer(sched_ctx, pair.instantiator_handle, None);
        instantiator_ctx.remove_port(pair.instantiator_handle);

        // Main inbox slot for the port: retarget and hand to the new component.
        if let Some(mut message) = inbox_main[instantiator_port_index].take() {
            message.data_header.target_port = pair.created_id;
            created_component.adopt_message(created_ctx, message);
        }

        // Backup inbox: scan for any remaining messages aimed at this port.
        let mut message_index = 0;
        while message_index < inbox_backup.len() {
            let message = &inbox_backup[message_index];
            if message.data_header.target_port == pair.instantiator_id {
                // Transfer the message
                let mut message = inbox_backup.remove(message_index);
                message.data_header.target_port = pair.created_id;
                created_component.adopt_message(created_ctx, message);
            } else {
                // Message does not belong to the port pair that we're
                // transferring to the new component.
                message_index += 1;
            }
        }

        // Here we take care of the case where the instantiator previously owned
        // both ends of the channel, but has transferred one port to the new
        // component (hence creating a channel between the instantiator
        // component and the new component).
        let created_port_info = created_ctx.get_port(pair.created_handle);
        if pair.is_open && created_port_info.peer_comp_id == instantiator_ctx.id {
            // Note: the port we're receiving here belongs to the instantiator
            // and is NOT in the "port_pairs" array.
            let instantiator_port_handle = instantiator_ctx.get_port_handle(created_port_info.peer_port_id);
            let instantiator_port_info = instantiator_ctx.get_port_mut(instantiator_port_handle);
            instantiator_port_info.peer_port_id = created_port_info.self_id;
            instantiator_ctx.change_port_peer(sched_ctx, instantiator_port_handle, Some(created_ctx.id));
        }
    }

    // Finally: if we did move ports around whose peers are different
    // components, then we'll initiate the appropriate protocol to notify them.
    if created_component_has_remote_peers {
        let schedule_entry_id = control.add_schedule_entry(created_ctx.id);
        for pair in &port_pairs {
            let port_info = created_ctx.get_port(pair.created_handle);
            if pair.is_open && port_info.peer_comp_id != instantiator_ctx.id && port_info.peer_comp_id != created_ctx.id {
                // Peer component is not the instantiator, and it is not the
                // new component itself
                let message = control.add_reroute_entry(
                    instantiator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id,
                    pair.instantiator_id, pair.created_id, created_ctx.id,
                    schedule_entry_id
                );
                let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id);
                let peer_info = created_ctx.get_peer(peer_handle);
                peer_info.handle.send_message_logged(sched_ctx, message, true);
            }
        }
    } else {
        // We can schedule the component immediately, we do not have to wait
        // for any peers: there are none.
        sched_ctx.runtime.enqueue_work(created_key);
    }

    // Instantiator resumes normal (non-sync) execution.
    exec_state.mode = CompMode::NonSync;
    exec_state.mode_component = (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid());
}

/// Returns `true` when none of the given ports is blocked due to a pending
/// port change (the condition required before ports may be transferred).
pub(crate) fn ports_not_blocked(comp_ctx: &CompCtx, ports: &EncounteredPorts) -> bool {
    for (_port_locations, port_id) in ports {
        let port_handle = comp_ctx.get_port_handle(*port_id);
        let port_info = comp_ctx.get_port(port_handle);
        if port_info.state.is_blocked_due_to_port_change() {
            return false;
        }
    }

    return true;
}

/// Performs the default action of printing the provided error, and then putting
/// the component in the state where it will shut down.
Only to be used for /// builtin components: their error message construction is simpler (and more /// common) as they don't have any source code. pub(crate) fn default_handle_error_for_builtin( exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, location_and_message: (PortInstruction, String) ) { let (_location, message) = location_and_message; sched_ctx.error(&message); let exit_reason = if exec_state.mode.is_in_sync_block() { ExitReason::ErrorInSync } else { ExitReason::ErrorNonSync }; exec_state.set_as_start_exit(exit_reason); } #[inline] pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling { debug_assert_eq!(_exec_state.mode, CompMode::Exit); return CompScheduling::Exit; } // ----------------------------------------------------------------------------- // Internal messaging/state utilities // ----------------------------------------------------------------------------- /// Sends a message without any transmitted ports. Does not check if sending /// is actually valid. fn send_message_without_ports( sending_port_handle: LocalPortHandle, value: ValueGroup, comp_ctx: &CompCtx, sched_ctx: &SchedulerCtx, consensus: &mut Consensus, ) { let port_info = comp_ctx.get_port(sending_port_handle); debug_assert!(port_info.state.can_send()); let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); } /// Prepares sending a message that contains ports. Only once a particular /// protocol has completed (where we notify all the peers that the ports will /// be transferred) will we actually send the message to the recipient. 
fn start_send_message_with_ports( sending_port_id: PortId, sending_port_instruction: PortInstruction, value: ValueGroup, exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer ) -> Result<(), (PortInstruction, String)> { debug_assert_eq!(exec_state.mode, CompMode::Sync); // busy in sync, trying to send // Retrieve ports we're going to transfer let sending_port_handle = comp_ctx.get_port_handle(sending_port_id); let sending_port_info = comp_ctx.get_port_mut(sending_port_handle); sending_port_info.last_instruction = sending_port_instruction; let mut transmit_ports = Vec::new(); find_ports_in_value_group(&value, &mut transmit_ports); debug_assert!(!transmit_ports.is_empty()); // required from caller // Enter the state where we'll wait until all transferred ports are not // blocked. exec_state.set_as_blocked_put_with_ports(sending_port_id, value); if ports_not_blocked(comp_ctx, &transmit_ports) { // Ports are not blocked, so we can send them right away. 
perform_send_message_with_ports_notify_peers( exec_state, comp_ctx, sched_ctx, control, transmit_ports )?; } // else: wait until they become unblocked return Ok(()) } fn perform_send_message_with_ports_notify_peers( exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer, transmit_ports: EncounteredPorts ) -> Result<(), (PortInstruction, String)> { // Check we're in the correct state in debug mode debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedTransferredPorts); debug_assert!(ports_not_blocked(comp_ctx, &transmit_ports)); // Set up the final Ack that triggers us to send our final message let unblock_put_control_id = control.add_unblock_put_with_ports_entry(); for (_, port_id) in &transmit_ports { let transmit_port_handle = comp_ctx.get_port_handle(*port_id); let transmit_port_info = comp_ctx.get_port_mut(transmit_port_handle); let peer_comp_id = transmit_port_info.peer_comp_id; let peer_port_id = transmit_port_info.peer_port_id; // Note: we checked earlier that we are currently in sync mode. Now we // will check if we've already used the port we're about to transmit. 
if !transmit_port_info.last_instruction.is_none() { let sending_port_handle = comp_ctx.get_port_handle(exec_state.mode_port); let sending_port_instruction = comp_ctx.get_port(sending_port_handle).last_instruction; return Err(( sending_port_instruction, String::from("Cannot transmit one of the ports in this message, as it is used in this sync round") )); } if transmit_port_info.state.is_set(PortStateFlag::Transmitted) { let sending_port_handle = comp_ctx.get_port_handle(exec_state.mode_port); let sending_port_instruction = comp_ctx.get_port(sending_port_handle).last_instruction; return Err(( sending_port_instruction, String::from("Cannot transmit one of the ports in this message, as that port is already transmitted") )); } // Set the flag for transmission transmit_port_info.state.set(PortStateFlag::Transmitted); // Block the peer of the port let message = control.create_port_transfer_message(unblock_put_control_id, comp_ctx.id, peer_port_id); println!("DEBUG: Port transfer message\nControl ID: {:?}\nMessage: {:?}", unblock_put_control_id, message); let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); peer_info.handle.send_message_logged(sched_ctx, message, true); } // We've set up the protocol, once all the PPC's are blocked we are supposed // to transfer the message to the recipient. So store it temporarily exec_state.mode = CompMode::PutPortsBlockedAwaitingAcks; return Ok(()); } /// Performs the transmission of a data message that contains ports. These were /// all stored in the component's execution state by the /// `prepare_send_message_with_ports` function. Port must be ready to send! 
fn perform_send_message_with_ports_to_receiver( exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) -> Result<(), (PortInstruction, String)> { debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedSendingPort); // Find all ports again let mut transmit_ports = Vec::new(); find_ports_in_value_group(&exec_state.mode_value, &mut transmit_ports); // Retrieve the port over which we're going to send the message let port_handle = comp_ctx.get_port_handle(exec_state.mode_port); let port_info = comp_ctx.get_port(port_handle); if !port_info.state.is_open() { return Err(( port_info.last_instruction, String::from("cannot send over this port, as it is closed") )); } debug_assert!(!port_info.state.is_blocked_due_to_port_change()); // caller should have checked this let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); // Change state back to its default exec_state.mode = CompMode::Sync; let message_value = exec_state.mode_value.take(); exec_state.mode_port = PortId::new_invalid(); // Annotate the data message let mut annotated_message = consensus.annotate_data_message(comp_ctx, port_info, message_value); // And further enhance the message by adding data about the ports that are // being transferred for (port_locations, transmit_port_id) in transmit_ports { let transmit_port_handle = comp_ctx.get_port_handle(transmit_port_id); let transmit_port_info = comp_ctx.get_port(transmit_port_handle); let transmit_messages = take_port_messages(comp_ctx, transmit_port_id, inbox_main, inbox_backup); let mut transmit_port_state = transmit_port_info.state; debug_assert!(transmit_port_state.is_set(PortStateFlag::Transmitted)); transmit_port_state.clear(PortStateFlag::Transmitted); annotated_message.ports.push(TransmittedPort{ locations: port_locations, messages: transmit_messages, peer_comp: transmit_port_info.peer_comp_id, peer_port: 
transmit_port_info.peer_port_id, kind: transmit_port_info.kind, state: transmit_port_state }); comp_ctx.change_port_peer(sched_ctx, transmit_port_handle, None); } // And finally, send the message to the peer let peer_info = comp_ctx.get_peer(peer_handle); peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); return Ok(()); } /// Handles an `Ack` for the control layer. fn default_handle_ack( exec_state: &mut CompExecState, control: &mut ControlLayer, control_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) -> Result<(), (PortInstruction, String)>{ // Since an `Ack` may cause another one, handle them in a loop let mut to_ack = control_id; loop { let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx); match action { AckAction::SendMessage(target_comp, message) => { // FIX @NoDirectHandle let mut handle = sched_ctx.runtime.get_component_public(target_comp); handle.send_message_logged(sched_ctx, Message::Control(message), true); let _should_remove = handle.decrement_users(); debug_assert!(_should_remove.is_none()); }, AckAction::ScheduleComponent(to_schedule) => { // FIX @NoDirectHandle let mut handle = sched_ctx.runtime.get_component_public(to_schedule); // Note that the component is intentionally not // sleeping, so we just wake it up debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire)); let key = unsafe { to_schedule.upgrade() }; sched_ctx.runtime.enqueue_work(key); let _should_remove = handle.decrement_users(); debug_assert!(_should_remove.is_none()); }, AckAction::UnblockPutWithPorts => { // Send the message (containing ports) stored in the component // execution state to the recipient println!("DEBUG: Unblocking put with ports"); debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedAwaitingAcks); exec_state.mode = CompMode::PutPortsBlockedSendingPort; let port_handle = 
comp_ctx.get_port_handle(exec_state.mode_port); // Little bit of a hack, we didn't really unblock the sending // port, but this will mesh nicely with waiting for the sending // port to become unblocked. default_handle_recently_unblocked_port( exec_state, control, consensus, port_handle, sched_ctx, comp_ctx, inbox_main, inbox_backup )?; }, AckAction::None => {} } match new_to_ack { Some(new_to_ack) => to_ack = new_to_ack, None => break, } } return Ok(()); } /// Little helper for sending the most common kind of `Ack` fn default_send_ack( causer_of_ack_id: ControlId, peer_handle: LocalPeerHandle, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx ) { let peer_info = comp_ctx.get_peer(peer_handle); peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{ id: causer_of_ack_id, sender_comp_id: comp_ctx.id, target_port_id: None, content: ControlMessageContent::Ack }), true); } /// Handles the unblocking of a putter port. In case there is a pending message /// on that port then it will be sent. There are two reasons for calling this /// function: either a port was blocked (i.e. the Blocked state flag was /// cleared), or the component is ready to send a message containing ports /// (stored in the execution state). In this latter case we might still have /// a blocked port. fn default_handle_recently_unblocked_port( exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) -> Result<(), (PortInstruction, String)> { let port_info = comp_ctx.get_port_mut(port_handle); let port_id = port_info.self_id; if port_info.state.is_blocked() { // Port is still blocked. We wait until the next control message where // we unblock the port. 
return Ok(()); } if exec_state.is_blocked_on_put_without_ports(port_id) { // Annotate the message that we're going to send let port_info = comp_ctx.get_port(port_handle); // for immutable access debug_assert_eq!(port_info.kind, PortKind::Putter); let to_send = exec_state.mode_value.take(); let to_send = consensus.annotate_data_message(comp_ctx, port_info, to_send); // Retrieve peer to send the message let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); peer_info.handle.send_message_logged(sched_ctx, Message::Data(to_send), true); // Return to the regular execution mode exec_state.mode = CompMode::Sync; exec_state.mode_port = PortId::new_invalid(); } else if exec_state.mode == CompMode::PutPortsBlockedTransferredPorts { // We are waiting until all of the transferred ports become unblocked, // check so here. let mut transfer_ports = Vec::new(); find_ports_in_value_group(&exec_state.mode_value, &mut transfer_ports); if ports_not_blocked(comp_ctx, &transfer_ports) { perform_send_message_with_ports_notify_peers( exec_state, comp_ctx, sched_ctx, control, transfer_ports )?; } } else if exec_state.mode == CompMode::PutPortsBlockedSendingPort && exec_state.mode_port == port_id { // We checked above that the port became unblocked, so we can send the // message perform_send_message_with_ports_to_receiver( exec_state, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup )?; } else if exec_state.is_blocked_on_create_component() { let mut ports = Vec::new(); find_ports_in_value_group(&exec_state.mode_value, &mut ports); if ports_not_blocked(comp_ctx, &ports) { perform_create_component( exec_state, sched_ctx, comp_ctx, control, inbox_main, inbox_backup ); } } return Ok(()); } #[inline] pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId { return PortId(port_id.id); } #[inline] pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId { return EvalPortId{ id: port_id.0 }; } // TODO: Optimize double 
vec type EncounteredPorts = Vec<(Vec, PortId)>; /// Recursively goes through the value group, attempting to find ports. /// Duplicates will only be added once. pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut EncounteredPorts) { // Helper to check a value for a port and recurse if needed. fn find_port_in_value(group: &ValueGroup, value: &Value, value_location: ValueId, ports: &mut EncounteredPorts) { match value { Value::Input(port_id) | Value::Output(port_id) => { // This is an actual port let cur_port = PortId(port_id.id); for prev_port in ports.iter_mut() { if prev_port.1 == cur_port { // Already added prev_port.0.push(value_location); return; } } ports.push((vec![value_location], cur_port)); }, Value::Array(heap_pos) | Value::Message(heap_pos) | Value::String(heap_pos) | Value::Struct(heap_pos) | Value::Union(_, heap_pos) => { // Reference to some dynamic thing which might contain ports, // so recurse let heap_region = &group.regions[*heap_pos as usize]; for (value_index, embedded_value) in heap_region.iter().enumerate() { let value_location = ValueId::Heap(*heap_pos, value_index as u32); find_port_in_value(group, embedded_value, value_location, ports); } }, _ => {}, // values we don't care about } } // Clear the ports, then scan all the available values ports.clear(); for (value_index, value) in value_group.values.iter().enumerate() { find_port_in_value(value_group, value, ValueId::Stack(value_index as u32), ports); } } /// Goes through the inbox of a component and takes out all the messages that /// are targeted at a specific port pub(crate) fn take_port_messages( comp_ctx: &CompCtx, port_id: PortId, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) -> Vec { let mut messages = Vec::new(); let port_handle = comp_ctx.get_port_handle(port_id); let port_index = comp_ctx.get_port_index(port_handle); if let Some(message) = inbox_main[port_index].take() { messages.push(message); } let mut message_index = 0; while message_index 
< inbox_backup.len() { let message = &inbox_backup[message_index]; if message.data_header.target_port == port_id { let message = inbox_backup.remove(message_index); messages.push(message); } else { message_index += 1; } } return messages; }