use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId}; use crate::protocol::*; use crate::runtime2::*; use crate::runtime2::communication::*; use super::{CompCtx, CompPDL, CompId}; use super::component_context::*; use super::component_random::*; use super::component_internet::*; use super::control_layer::*; use super::consensus::*; pub enum CompScheduling { Immediate, Requeue, Sleep, Exit, } /// Generic representation of a component (as viewed by a scheduler). pub(crate) trait Component { /// Called upon the creation of the component. Note that the scheduler /// context is officially running another component (the component that is /// creating the new component). fn on_creation(&mut self, comp_id: CompId, sched_ctx: &SchedulerCtx); /// Called when a component crashes or wishes to exit. So is not called /// right before destruction, other components may still hold a handle to /// the component and send it messages! fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx); /// Called if the component is created by another component and the messages /// are being transferred between the two. fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage); /// Called if the component receives a new message. The component is /// responsible for deciding where that messages goes. fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message); /// Called if the component's routine should be executed. The return value /// can be used to indicate when the routine should be run again. fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result; } /// Representation of the generic operating mode of a component. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub(crate) enum CompMode { NonSync, // not in sync mode Sync, // in sync mode, can interact with other components SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block BlockedGet, // blocked because we need to receive a message on a particular port BlockedPut, // component is blocked because the port is blocked BlockedSelect, // waiting on message to complete the select statement StartExit, // temporary state: if encountered then we start the shutdown process BusyExit, // temporary state: waiting for Acks for all the closed ports Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 } impl CompMode { pub(crate) fn is_in_sync_block(&self) -> bool { use CompMode::*; match self { Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true, NonSync | StartExit | BusyExit | Exit => false, } } } /// Component execution state: the execution mode along with some descriptive /// fields. Fields are public for ergonomic reasons, use member functions when /// appropriate. pub(crate) struct CompExecState { pub mode: CompMode, pub mode_port: PortId, // valid if blocked on a port (put/get) pub mode_value: ValueGroup, // valid if blocked on a put } impl CompExecState { pub(crate) fn new() -> Self { return Self{ mode: CompMode::NonSync, mode_port: PortId::new_invalid(), mode_value: ValueGroup::default(), } } pub(crate) fn set_as_blocked_get(&mut self, port: PortId) { self.mode = CompMode::BlockedGet; self.mode_port = port; debug_assert!(self.mode_value.values.is_empty()); } pub(crate) fn is_blocked_on_get(&self, port: PortId) -> bool { return self.mode == CompMode::BlockedGet && self.mode_port == port; } pub(crate) fn set_as_blocked_put(&mut self, port: PortId, value: ValueGroup) { self.mode = CompMode::BlockedPut; self.mode_port = port; self.mode_value = value; } pub(crate) fn is_blocked_on_put(&self, port: PortId) -> bool { return self.mode == CompMode::BlockedPut && self.mode_port == port; } } /// Creates a new component based on its definition. Meaning that if it is a /// user-defined component then we set up the PDL code state. Otherwise we /// construct a custom component. This does NOT take care of port and message /// management. pub(crate) fn create_component( protocol: &ProtocolDescription, definition_id: ProcedureDefinitionId, type_id: TypeId, arguments: ValueGroup, num_ports: usize ) -> Box { let definition = &protocol.heap[definition_id]; debug_assert!(definition.kind == ProcedureKind::Primitive || definition.kind == ProcedureKind::Composite); if definition.source.is_builtin() { // Builtin component let component: Box = match definition.source { ProcedureSource::CompRandomU32 => Box::new(ComponentRandomU32::new(arguments)), ProcedureSource::CompTcpClient => Box::new(ComponentTcpClient::new(arguments)), _ => unreachable!(), }; return component; } else { // User-defined component let prompt = Prompt::new( &protocol.types, &protocol.heap, definition_id, type_id, arguments ); let component = CompPDL::new(prompt, num_ports); return Box::new(component); } } // ----------------------------------------------------------------------------- // Generic component messaging utilities (for sending and receiving) // ----------------------------------------------------------------------------- /// Default handling of sending a data message. In case the port is blocked then /// the `ExecState` will become blocked as well. Note that /// `default_handle_control_message` will ensure that the port becomes /// unblocked if so instructed by the receiving component. The returned /// scheduling value must be used. #[must_use] pub(crate) fn default_send_data_message( exec_state: &mut CompExecState, transmitting_port_id: PortId, value: ValueGroup, sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx ) -> Result { // @nocommit: Something better than Err(String) debug_assert_eq!(exec_state.mode, CompMode::Sync); let port_handle = comp_ctx.get_port_handle(transmitting_port_id); let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); if port_info.state == PortState::Closed { // Note: normally peer is eventually consistent, but if it has shut down // then we can be sure it is consistent (I think?) return Err(format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0)) } else if port_info.state.is_blocked() { // Port is blocked, so we cannot send exec_state.set_as_blocked_put(transmitting_port_id, value); return Ok(CompScheduling::Sleep); } else { // Port is not blocked, so send to the peer let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(annotated_message), true); return Ok(CompScheduling::Immediate); } } pub(crate) enum IncomingData { PlacedInSlot, SlotFull(DataMessage), } /// Default handling of receiving a data message. In case there is no room for /// the message it is returned from this function. Note that this function is /// different from PDL code performing a `get` on a port; this is the case where /// the message first arrives at the component. // NOTE: This is supposed to be a somewhat temporary implementation. It would be // nicest if the sending component can figure out it cannot send any more data. #[must_use] pub(crate) fn default_handle_incoming_data_message( exec_state: &mut CompExecState, port_value_slot: &mut Option, comp_ctx: &mut CompCtx, incoming_message: DataMessage, sched_ctx: &SchedulerCtx, control: &mut ControlLayer ) -> IncomingData { let target_port_id = incoming_message.data_header.target_port; if port_value_slot.is_none() { // We can put the value in the slot *port_value_slot = Some(incoming_message); // Check if we're blocked on receiving this message. dbg_code!({ // Our port cannot have been blocked itself, because we're able to // directly insert the message into its slot. let port_handle = comp_ctx.get_port_handle(target_port_id); assert!(!comp_ctx.get_port(port_handle).state.is_blocked()); }); if exec_state.is_blocked_on_get(target_port_id) { // Return to normal operation exec_state.mode = CompMode::Sync; exec_state.mode_port = PortId::new_invalid(); debug_assert!(exec_state.mode_value.values.is_empty()); } return IncomingData::PlacedInSlot } else { // Slot is already full, so if the port was previously opened, it will // now become closed let port_handle = comp_ctx.get_port_handle(target_port_id); let port_info = comp_ctx.get_port_mut(port_handle); debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked()); // i.e. not closed, but will go off if more states are added in the future if port_info.state == PortState::Open { comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); let (peer_handle, message) = control.initiate_port_blocking(comp_ctx, port_handle); let peer = comp_ctx.get_peer(peer_handle); peer.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); } return IncomingData::SlotFull(incoming_message) } } /// Default handling that has been received through a `get`. Will check if any /// more messages are waiting, and if the corresponding port was blocked because /// of full buffers (hence, will use the control layer to make sure the peer /// will become unblocked). pub(crate) fn default_handle_received_data_message( targeted_port: PortId, slot: &mut Option, inbox_backup: &mut Vec, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer ) { debug_assert!(slot.is_none()); // because we've just received from it // Check if there are any more messages in the backup buffer let port_handle = comp_ctx.get_port_handle(targeted_port); let port_info = comp_ctx.get_port(port_handle); for message_index in 0..inbox_backup.len() { let message = &inbox_backup[message_index]; if message.data_header.target_port == targeted_port { // One more message, place it in the slot let message = inbox_backup.remove(message_index); debug_assert!(port_info.state.is_blocked()); // since we're removing another message from the backup *slot = Some(message); return; } } // Did not have any more messages, so if we were blocked, then we need to // unblock the port now (and inform the peer of this unblocking) if port_info.state == PortState::BlockedDueToFullBuffers { comp_ctx.set_port_state(port_handle, PortState::Open); let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle); let peer_info = comp_ctx.get_peer(peer_handle); peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); } } /// Handles control messages in the default way. Note that this function may /// take a lot of actions in the name of the caller: pending messages may be /// sent, ports may become blocked/unblocked, etc. So the execution /// (`CompExecState`), control (`ControlLayer`) and consensus (`Consensus`) /// state may all change. pub(crate) fn default_handle_control_message( exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus, message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx ) -> Result<(), String> { // @nocommit, use something else than Err(String) match message.content { ControlMessageContent::Ack => { default_handle_ack(control, message.id, sched_ctx, comp_ctx); }, ControlMessageContent::BlockPort(port_id) => { // One of our messages was accepted, but the port should be // blocked. let port_handle = comp_ctx.get_port_handle(port_id); let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); if port_info.state == PortState::Open { // only when open: we don't do this when closed, and we we don't do this if we're blocked due to peer changes comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); } }, ControlMessageContent::ClosePort(content) => { // Request to close the port. We immediately comply and remove // the component handle as well let port_handle = comp_ctx.get_port_handle(content.port_id); let port_info = comp_ctx.get_port_mut(port_handle); let peer_comp_id = port_info.peer_comp_id; let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); // We're closing the port, so we will always update the peer of the // port (in case of error messages) port_info.peer_comp_id = message.sender_comp_id; // One exception to sending an `Ack` is if we just closed the // port ourselves, meaning that the `ClosePort` messages got // sent to one another. if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) { // The two components (sender and this component) are closing // the channel at the same time. default_handle_ack(control, control_id, sched_ctx, comp_ctx); } else { // Respond to the message let last_instruction = port_info.last_instruction; let port_was_used = last_instruction != PortInstruction::None; default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed // Make sure that we've not reached an error condition. Note // that if this condition is not met, then we don't error out // now, but we may error out in the next sync block when we // try to `put`/`get` on the port. This condition makes sure // that if we have a successful sync round, followed by the peer // closing the port, that we don't consider the sync round to // have failed by mistake. if content.closed_in_sync_round && exec_state.mode.is_in_sync_block() && port_was_used { let error_message = match last_instruction { PortInstruction::None => unreachable!(), // port was used PortInstruction::NoSource => format!( "Peer component (id:{}) shut down, so operation on port cannot have succeeded", message.sender_comp_id.0 ), PortInstruction::SourceLocation(source_location) => format!( "Peer component (id:{}) shut down, so this operation cannot have succeeded", message.sender_comp_id.0 ), }; return Err(error_message); } } }, ControlMessageContent::UnblockPort(port_id) => { // We were previously blocked (or already closed) let port_handle = comp_ctx.get_port_handle(port_id); let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); if port_info.state == PortState::BlockedDueToFullBuffers { default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx); } }, ControlMessageContent::PortPeerChangedBlock(port_id) => { // The peer of our port has just changed. So we are asked to // temporarily block the port (while our original recipient is // potentially rerouting some of the in-flight messages) and // Ack. Then we wait for the `unblock` call. debug_assert_eq!(message.target_port_id, Some(port_id)); let port_handle = comp_ctx.get_port_handle(port_id); comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange); let port_info = comp_ctx.get_port(port_handle); let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); }, ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => { let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap()); let port_info = comp_ctx.get_port(port_handle); debug_assert!(port_info.state == PortState::BlockedDueToPeerChange); let old_peer_id = port_info.peer_comp_id; comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false); let port_info = comp_ctx.get_port_mut(port_handle); port_info.peer_comp_id = new_comp_id; port_info.peer_port_id = new_port_id; comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None); default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx); } } return Ok(()); } /// Handles a component initiating the exiting procedure, and closing all of its /// ports. Should only be called once per component (which is ensured by /// checking and modifying the mode in the execution state). #[must_use] pub(crate) fn default_handle_start_exit( exec_state: &mut CompExecState, control: &mut ControlLayer, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx ) -> CompScheduling { debug_assert_eq!(exec_state.mode, CompMode::StartExit); sched_ctx.log("Component starting exit"); exec_state.mode = CompMode::BusyExit; // Iterating by index to work around borrowing rules for port_index in 0..comp_ctx.num_ports() { let port = comp_ctx.get_port_by_index_mut(port_index); if port.state == PortState::Closed { // Already closed, or in the process of being closed continue; } // Mark as closed let port_id = port.self_id; port.state = PortState::Closed; // Notify peer of closing let port_handle = comp_ctx.get_port_handle(port_id); let (peer, message) = control.initiate_port_closing(port_handle, comp_ctx); let peer_info = comp_ctx.get_peer(peer); peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); } return CompScheduling::Immediate; // to check if we can shut down immediately } /// Handles a component waiting until all peers are notified that it is quitting /// (i.e. after calling `default_handle_start_exit`). #[must_use] pub(crate) fn default_handle_busy_exit( exec_state: &mut CompExecState, control: &ControlLayer, sched_ctx: &SchedulerCtx ) -> CompScheduling { debug_assert_eq!(exec_state.mode, CompMode::BusyExit); if control.has_acks_remaining() { sched_ctx.log("Component busy exiting, still has `Ack`s remaining"); return CompScheduling::Sleep; } else { sched_ctx.log("Component busy exiting, now shutting down"); exec_state.mode = CompMode::Exit; return CompScheduling::Exit; } } /// Handles a potential synchronous round decision. If there was a decision then /// the `Some(success)` value indicates whether the round succeeded or not. /// Might also end up changing the `ExecState`. pub(crate) fn default_handle_sync_decision( exec_state: &mut CompExecState, decision: SyncRoundDecision, consensus: &mut Consensus ) -> Option { debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); let success = match decision { SyncRoundDecision::None => return None, SyncRoundDecision::Solution => true, SyncRoundDecision::Failure => false, }; debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); if success { exec_state.mode = CompMode::NonSync; consensus.notify_sync_decision(decision); return Some(true); } else { exec_state.mode = CompMode::StartExit; return Some(false); } } #[inline] pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling { debug_assert_eq!(_exec_state.mode, CompMode::Exit); return CompScheduling::Exit; } // ----------------------------------------------------------------------------- // Internal messaging/state utilities // ----------------------------------------------------------------------------- /// Handles an `Ack` for the control layer. fn default_handle_ack( control: &mut ControlLayer, control_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx ) { // Since an `Ack` may cause another one, handle them in a loop let mut to_ack = control_id; loop { let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx); match action { AckAction::SendMessage(target_comp, message) => { // FIX @NoDirectHandle let mut handle = sched_ctx.runtime.get_component_public(target_comp); handle.send_message(&sched_ctx.runtime, Message::Control(message), true); let _should_remove = handle.decrement_users(); debug_assert!(_should_remove.is_none()); }, AckAction::ScheduleComponent(to_schedule) => { // FIX @NoDirectHandle let mut handle = sched_ctx.runtime.get_component_public(to_schedule); // Note that the component is intentionally not // sleeping, so we just wake it up debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire)); let key = unsafe { to_schedule.upgrade() }; sched_ctx.runtime.enqueue_work(key); let _should_remove = handle.decrement_users(); debug_assert!(_should_remove.is_none()); }, AckAction::None => {} } match new_to_ack { Some(new_to_ack) => to_ack = new_to_ack, None => break, } } } /// Little helper for sending the most common kind of `Ack` fn default_send_ack( causer_of_ack_id: ControlId, peer_handle: LocalPeerHandle, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx ) { let peer_info = comp_ctx.get_peer(peer_handle); peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(ControlMessage{ id: causer_of_ack_id, sender_comp_id: comp_ctx.id, target_port_id: None, content: ControlMessageContent::Ack }), true); } /// Handles the unblocking of a putter port. In case there is a pending message /// on that port then it will be sent. fn default_handle_unblock_put( exec_state: &mut CompExecState, consensus: &mut Consensus, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, ) { let port_info = comp_ctx.get_port_mut(port_handle); let port_id = port_info.self_id; debug_assert!(port_info.state.is_blocked()); port_info.state = PortState::Open; if exec_state.is_blocked_on_put(port_id) { // Annotate the message that we're going to send let port_info = comp_ctx.get_port(port_handle); // for immutable access debug_assert_eq!(port_info.kind, PortKind::Putter); let to_send = exec_state.mode_value.take(); let to_send = consensus.annotate_data_message(comp_ctx, port_info, to_send); // Retrieve peer to send the message let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(to_send), true); exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state. exec_state.mode_port = PortId::new_invalid(); } } #[inline] pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId { return PortId(port_id.id); } #[inline] pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId { return EvalPortId{ id: port_id.0 }; }