diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs
new file mode 100644
index 0000000000000000000000000000000000000000..a0cc811fcc5e0ee8cfbf52a3be015719545f2dbd
--- /dev/null
+++ b/src/runtime2/component/component.rs
@@ -0,0 +1,533 @@
+use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId};
+use crate::protocol::*;
+use crate::runtime2::*;
+use crate::runtime2::communication::*;
+
+use super::{CompCtx, CompPDL, CompId};
+use super::component_context::*;
+use super::component_random::*;
+use super::component_internet::*;
+use super::control_layer::*;
+use super::consensus::*;
+
+pub enum CompScheduling {
+    Immediate,
+    Requeue,
+    Sleep,
+    Exit,
+}
+
+/// Generic representation of a component (as viewed by a scheduler).
+pub(crate) trait Component {
+    /// Called upon the creation of the component. Note that the scheduler
+    /// context is officially running another component (the component that is
+    /// creating the new component).
+    fn on_creation(&mut self, comp_id: CompId, sched_ctx: &SchedulerCtx);
+
+    /// Called when a component crashes or wishes to exit. Note that this is
+    /// not necessarily called right before destruction: other components may
+    /// still hold a handle to the component and send it messages!
+    fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx);
+
+    /// Called when the component is created by another component and messages
+    /// are being transferred between the two.
+    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage);
+
+    /// Called when the component receives a new message. The component is
+    /// responsible for deciding where that message goes.
+    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message);
+
+    /// Called when the component's routine should be executed. The return
+    /// value can be used to indicate when the routine should be run again.
+    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError>;
+}
+
+/// Representation of the generic operating mode of a component.
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub(crate) enum CompMode {
+    NonSync, // not in sync mode
+    Sync, // in sync mode, can interact with other components
+    SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
+    BlockedGet, // blocked because we need to receive a message on a particular port
+    BlockedPut, // blocked because the port we want to send on is blocked
+    BlockedSelect, // waiting on a message to complete the select statement
+    StartExit, // temporary state: if encountered then we start the shutdown process
+    BusyExit, // temporary state: waiting for Acks for all the closed ports
+    Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0
+}
+
+impl CompMode {
+    pub(crate) fn is_in_sync_block(&self) -> bool {
+        use CompMode::*;
+
+        match self {
+            Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true,
+            NonSync | StartExit | BusyExit | Exit => false,
+        }
+    }
+}
+
+/// Component execution state: the execution mode along with some descriptive
+/// fields. Fields are public for ergonomic reasons, use member functions when
+/// appropriate.
+pub(crate) struct CompExecState {
+    pub mode: CompMode,
+    pub mode_port: PortId, // valid if blocked on a port (put/get)
+    pub mode_value: ValueGroup, // valid if blocked on a put
+}
+
+impl CompExecState {
+    pub(crate) fn new() -> Self {
+        return Self{
+            mode: CompMode::NonSync,
+            mode_port: PortId::new_invalid(),
+            mode_value: ValueGroup::default(),
+        }
+    }
+
+    pub(crate) fn set_as_blocked_get(&mut self, port: PortId) {
+        self.mode = CompMode::BlockedGet;
+        self.mode_port = port;
+        debug_assert!(self.mode_value.values.is_empty());
+    }
+
+    pub(crate) fn is_blocked_on_get(&self, port: PortId) -> bool {
+        return
+            self.mode == CompMode::BlockedGet &&
+            self.mode_port == port;
+    }
+
+    pub(crate) fn set_as_blocked_put(&mut self, port: PortId, value: ValueGroup) {
+        self.mode = CompMode::BlockedPut;
+        self.mode_port = port;
+        self.mode_value = value;
+    }
+
+    pub(crate) fn is_blocked_on_put(&self, port: PortId) -> bool {
+        return
+            self.mode == CompMode::BlockedPut &&
+            self.mode_port == port;
+    }
+}
+
+/// Creates a new component based on its definition: if it is a user-defined
+/// component then we set up the PDL code state, otherwise we construct one of
+/// the builtin components. This does NOT take care of port and message
+/// management.
+pub(crate) fn create_component(
+    protocol: &ProtocolDescription,
+    definition_id: ProcedureDefinitionId, type_id: TypeId,
+    arguments: ValueGroup, num_ports: usize
+) -> Box<dyn Component> {
+    let definition = &protocol.heap[definition_id];
+    debug_assert!(definition.kind == ProcedureKind::Primitive || definition.kind == ProcedureKind::Composite);
+
+    if definition.source.is_builtin() {
+        // Builtin component
+        let component: Box<dyn Component> = match definition.source {
+            ProcedureSource::CompRandomU32 => Box::new(ComponentRandomU32::new(arguments)),
+            ProcedureSource::CompTcpClient => Box::new(ComponentTcpClient::new(arguments)),
+            _ => unreachable!(),
+        };
+
+        return component;
+    } else {
+        // User-defined component
+        let prompt = Prompt::new(
+            &protocol.types, &protocol.heap,
+            definition_id, type_id, arguments
+        );
+        let component = CompPDL::new(prompt, num_ports);
+        return Box::new(component);
+    }
+}
+
+// -----------------------------------------------------------------------------
+// Generic component messaging utilities (for sending and receiving)
+// -----------------------------------------------------------------------------
+
+/// Default handling of sending a data message. In case the port is blocked then
+/// the `CompExecState` will become blocked as well. Note that
+/// `default_handle_control_message` will ensure that the port becomes
+/// unblocked if so instructed by the receiving component. The returned
+/// scheduling value must be used.
+#[must_use]
+pub(crate) fn default_send_data_message(
+    exec_state: &mut CompExecState, transmitting_port_id: PortId, value: ValueGroup,
+    sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx
+) -> CompScheduling {
+    debug_assert_eq!(exec_state.mode, CompMode::Sync);
+
+    // TODO: Handle closed ports
+    let port_handle = comp_ctx.get_port_handle(transmitting_port_id);
+    let port_info = comp_ctx.get_port(port_handle);
+    debug_assert_eq!(port_info.kind, PortKind::Putter);
+    if port_info.state.is_blocked() {
+        // Port is blocked, so we cannot send
+        exec_state.set_as_blocked_put(transmitting_port_id, value);
+
+        return CompScheduling::Sleep;
+    } else {
+        // Port is not blocked, so send to the peer
+        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
+        let peer_info = comp_ctx.get_peer(peer_handle);
+        let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
+        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(annotated_message), true);
+
+        return CompScheduling::Immediate;
+    }
+}
+
+pub(crate) enum IncomingData {
+    PlacedInSlot,
+    SlotFull(DataMessage),
+}
+
+/// Default handling of receiving a data message. In case there is no room for
+/// the message it is returned from this function. Note that this function is
+/// different from PDL code performing a `get` on a port; this is the case where
+/// the message first arrives at the component.
+// NOTE: This is supposed to be a somewhat temporary implementation. It would be
+// nicer if the sending component could figure out by itself that it cannot send
+// any more data.
+#[must_use]
+pub(crate) fn default_handle_incoming_data_message(
+    exec_state: &mut CompExecState, port_value_slot: &mut Option<DataMessage>,
+    comp_ctx: &mut CompCtx, incoming_message: DataMessage,
+    sched_ctx: &SchedulerCtx, control: &mut ControlLayer
+) -> IncomingData {
+    let target_port_id = incoming_message.data_header.target_port;
+
+    if port_value_slot.is_none() {
+        // We can put the value in the slot
+        *port_value_slot = Some(incoming_message);
+
+        // Check if we're blocked on receiving this message.
+        dbg_code!({
+            // Our port cannot have been blocked itself, because we're able to
+            // directly insert the message into its slot.
+            let port_handle = comp_ctx.get_port_handle(target_port_id);
+            assert!(!comp_ctx.get_port(port_handle).state.is_blocked());
+        });
+
+        if exec_state.is_blocked_on_get(target_port_id) {
+            // Return to normal operation
+            exec_state.mode = CompMode::Sync;
+            exec_state.mode_port = PortId::new_invalid();
+            debug_assert!(exec_state.mode_value.values.is_empty());
+        }
+
+        return IncomingData::PlacedInSlot;
+    } else {
+        // Slot is already full, so if the port was previously open, it will
+        // now become blocked
+        let port_handle = comp_ctx.get_port_handle(target_port_id);
+        let port_info = comp_ctx.get_port_mut(port_handle);
+        debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked()); // i.e. not closed, but will go off if more states are added in the future
+
+        if port_info.state == PortState::Open {
+            comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
+            let (peer_handle, message) =
+                control.initiate_port_blocking(comp_ctx, port_handle);
+            let peer = comp_ctx.get_peer(peer_handle);
+            peer.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
+        }
+
+        return IncomingData::SlotFull(incoming_message);
+    }
+}
+
+/// Default handling of a message that has been received through a `get`. Will
+/// check if any more messages are waiting, and if the corresponding port was
+/// blocked because of full buffers (hence, will use the control layer to make
+/// sure the peer will become unblocked).
+pub(crate) fn default_handle_received_data_message(
+    targeted_port: PortId, slot: &mut Option<DataMessage>, inbox_backup: &mut Vec<DataMessage>,
+    comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer
+) {
+    debug_assert!(slot.is_none()); // because we've just received from it
+
+    // Check if there are any more messages in the backup buffer
+    let port_handle = comp_ctx.get_port_handle(targeted_port);
+    let port_info = comp_ctx.get_port(port_handle);
+    for message_index in 0..inbox_backup.len() {
+        let message = &inbox_backup[message_index];
+        if message.data_header.target_port == targeted_port {
+            // One more message, place it in the slot
+            let message = inbox_backup.remove(message_index);
+            debug_assert!(port_info.state.is_blocked()); // since we're removing another message from the backup
+            *slot = Some(message);
+
+            return;
+        }
+    }
+
+    // Did not have any more messages, so if we were blocked, then we need to
+    // unblock the port now (and inform the peer of this unblocking)
+    if port_info.state == PortState::BlockedDueToFullBuffers {
+        comp_ctx.set_port_state(port_handle, PortState::Open);
+        let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle);
+        let peer_info = comp_ctx.get_peer(peer_handle);
+        peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
+    }
+}
+
+/// Handles control messages in the default way. Note that this function may
+/// take a lot of actions on behalf of the caller: pending messages may be
+/// sent, ports may become blocked/unblocked, etc. So the execution
+/// (`CompExecState`), control (`ControlLayer`) and consensus (`Consensus`)
+/// state may all change.
+pub(crate) fn default_handle_control_message(
+    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
+    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
+) {
+    match message.content {
+        ControlMessageContent::Ack => {
+            default_handle_ack(control, message.id, sched_ctx, comp_ctx);
+        },
+        ControlMessageContent::BlockPort(port_id) => {
+            // One of our messages was accepted, but the port should be
+            // blocked.
+            let port_handle = comp_ctx.get_port_handle(port_id);
+            let port_info = comp_ctx.get_port(port_handle);
+            debug_assert_eq!(port_info.kind, PortKind::Putter);
+            if port_info.state == PortState::Open {
+                // only when open: we don't do this when the port is closed, and
+                // we don't do this if we're blocked due to a peer change
+                comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
+            }
+        },
+        ControlMessageContent::ClosePort(port_id) => {
+            // Request to close the port. We immediately comply and remove
+            // the component handle as well.
+            let port_handle = comp_ctx.get_port_handle(port_id);
+            let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id;
+            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
+
+            // One exception to sending an `Ack` is if we just closed the
+            // port ourselves, meaning that both components sent a `ClosePort`
+            // message to one another.
+            if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
+                default_handle_ack(control, control_id, sched_ctx, comp_ctx);
+            } else {
+                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
+                comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed
+                comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed
+            }
+        },
+        ControlMessageContent::UnblockPort(port_id) => {
+            // We were previously blocked (or already closed)
+            let port_handle = comp_ctx.get_port_handle(port_id);
+            let port_info = comp_ctx.get_port(port_handle);
+            debug_assert_eq!(port_info.kind, PortKind::Putter);
+            if port_info.state == PortState::BlockedDueToFullBuffers {
+                default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
+            }
+        },
+        ControlMessageContent::PortPeerChangedBlock(port_id) => {
+            // The peer of our port has just changed. So we are asked to
+            // temporarily block the port (while our original recipient is
+            // potentially rerouting some of the in-flight messages) and
+            // Ack. Then we wait for the `unblock` call.
+            debug_assert_eq!(message.target_port_id, Some(port_id));
+            let port_handle = comp_ctx.get_port_handle(port_id);
+            comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange);
+
+            let port_info = comp_ctx.get_port(port_handle);
+            let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
+
+            default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
+        },
+        ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
+            let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap());
+            let port_info = comp_ctx.get_port(port_handle);
+            debug_assert!(port_info.state == PortState::BlockedDueToPeerChange);
+            let old_peer_id = port_info.peer_comp_id;
+
+            comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false);
+
+            let port_info = comp_ctx.get_port_mut(port_handle);
+            port_info.peer_comp_id = new_comp_id;
+            port_info.peer_port_id = new_port_id;
+            comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None);
+            default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
+        }
+    }
+}
+
+/// Handles a component initiating the exiting procedure, and closing all of its
+/// ports. Should only be called once per component (which is ensured by
+/// checking and modifying the mode in the execution state).
+#[must_use]
+pub(crate) fn default_handle_start_exit(
+    exec_state: &mut CompExecState, control: &mut ControlLayer,
+    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
+) -> CompScheduling {
+    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
+    sched_ctx.log("Component starting exit");
+    exec_state.mode = CompMode::BusyExit;
+
+    // Iterating by index to work around borrowing rules
+    for port_index in 0..comp_ctx.num_ports() {
+        let port = comp_ctx.get_port_by_index_mut(port_index);
+        if port.state == PortState::Closed {
+            // Already closed, or in the process of being closed
+            continue;
+        }
+
+        // Mark as closed
+        let port_id = port.self_id;
+        port.state = PortState::Closed;
+
+        // Notify peer of closing
+        let port_handle = comp_ctx.get_port_handle(port_id);
+        let (peer, message) = control.initiate_port_closing(port_handle, comp_ctx);
+        let peer_info = comp_ctx.get_peer(peer);
+        peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
+    }
+
+    return CompScheduling::Immediate; // to check if we can shut down immediately
+}
+
+/// Handles a component waiting until all peers are notified that it is quitting
+/// (i.e. after calling `default_handle_start_exit`).
+#[must_use]
+pub(crate) fn default_handle_busy_exit(
+    exec_state: &mut CompExecState, control: &ControlLayer,
+    sched_ctx: &SchedulerCtx
+) -> CompScheduling {
+    debug_assert_eq!(exec_state.mode, CompMode::BusyExit);
+    if control.has_acks_remaining() {
+        sched_ctx.log("Component busy exiting, still has `Ack`s remaining");
+        return CompScheduling::Sleep;
+    } else {
+        sched_ctx.log("Component busy exiting, now shutting down");
+        exec_state.mode = CompMode::Exit;
+        return CompScheduling::Exit;
+    }
+}
+
+/// Handles a potential synchronous round decision. If there was a decision then
+/// the `Some(success)` value indicates whether the round succeeded or not.
+/// Might also end up changing the `CompExecState`.
+pub(crate) fn default_handle_sync_decision(
+    exec_state: &mut CompExecState, decision: SyncRoundDecision,
+    consensus: &mut Consensus
+) -> Option<bool> {
+    debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
+    let success = match decision {
+        SyncRoundDecision::None => return None,
+        SyncRoundDecision::Solution => true,
+        SyncRoundDecision::Failure => false,
+    };
+
+    if success {
+        exec_state.mode = CompMode::NonSync;
+        consensus.notify_sync_decision(decision);
+        return Some(true);
+    } else {
+        exec_state.mode = CompMode::StartExit;
+        return Some(false);
+    }
+}
+
+#[inline]
+pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
+    debug_assert_eq!(_exec_state.mode, CompMode::Exit);
+    return CompScheduling::Exit;
+}
+
+// -----------------------------------------------------------------------------
+// Internal messaging/state utilities
+// -----------------------------------------------------------------------------
+
+/// Handles an `Ack` for the control layer.
+fn default_handle_ack(
+    control: &mut ControlLayer, control_id: ControlId,
+    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
+) {
+    // Since an `Ack` may cause another one, handle them in a loop
+    let mut to_ack = control_id;
+    loop {
+        let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx);
+        match action {
+            AckAction::SendMessage(target_comp, message) => {
+                // FIX @NoDirectHandle
+                let mut handle = sched_ctx.runtime.get_component_public(target_comp);
+                handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
+                let _should_remove = handle.decrement_users();
+                debug_assert!(_should_remove.is_none());
+            },
+            AckAction::ScheduleComponent(to_schedule) => {
+                // FIX @NoDirectHandle
+                let mut handle = sched_ctx.runtime.get_component_public(to_schedule);
+
+                // Note that the component is intentionally not
+                // sleeping, so we just wake it up
+                debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire));
+                let key = unsafe { to_schedule.upgrade() };
+                sched_ctx.runtime.enqueue_work(key);
+                let _should_remove = handle.decrement_users();
+                debug_assert!(_should_remove.is_none());
+            },
+            AckAction::None => {}
+        }
+
+        match new_to_ack {
+            Some(new_to_ack) => to_ack = new_to_ack,
+            None => break,
+        }
+    }
+}
+
+/// Little helper for sending the most common kind of `Ack`
+fn default_send_ack(
+    causer_of_ack_id: ControlId, peer_handle: LocalPeerHandle,
+    sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx
+) {
+    let peer_info = comp_ctx.get_peer(peer_handle);
+    peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(ControlMessage{
+        id: causer_of_ack_id,
+        sender_comp_id: comp_ctx.id,
+        target_port_id: None,
+        content: ControlMessageContent::Ack
+    }), true);
+}
+
+/// Handles the unblocking of a putter port. In case there is a pending message
+/// on that port then it will be sent.
+fn default_handle_unblock_put(
+    exec_state: &mut CompExecState, consensus: &mut Consensus,
+    port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
+) {
+    let port_info = comp_ctx.get_port_mut(port_handle);
+    let port_id = port_info.self_id;
+    debug_assert!(port_info.state.is_blocked());
+    port_info.state = PortState::Open;
+
+    if exec_state.is_blocked_on_put(port_id) {
+        // Annotate the message that we're going to send
+        let port_info = comp_ctx.get_port(port_handle); // for immutable access
+        debug_assert_eq!(port_info.kind, PortKind::Putter);
+        let to_send = exec_state.mode_value.take();
+        let to_send = consensus.annotate_data_message(comp_ctx, port_info, to_send);
+
+        // Retrieve peer to send the message
+        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
+        let peer_info = comp_ctx.get_peer(peer_handle);
+        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(to_send), true);
+
+        exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state.
+        exec_state.mode_port = PortId::new_invalid();
+    }
+}
+
+#[inline]
+pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId {
+    return PortId(port_id.id);
+}
+
+#[inline]
+pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId {
+    return EvalPortId{ id: port_id.0 };
+}
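
For reviewers: below is a minimal, hypothetical sketch (not part of this patch) of how a builtin component could wire the `default_*` helpers together. The struct `ComponentEcho` and its fields are invented for illustration, and the `use` paths assume the sketch lives next to `component.rs` in `src/runtime2/component/`; the trait, helper functions and types (`Component`, `CompExecState`, `CompMode`, `CompScheduling`, and friends) are the ones introduced by this file.

```rust
// Hypothetical illustration only, not part of the patch.
use crate::protocol::eval::EvalError;
use crate::runtime2::*;
use crate::runtime2::communication::*;

use super::{CompCtx, CompId};
use super::component::*; // the items added in component.rs above
use super::control_layer::*;
use super::consensus::*;

struct ComponentEcho {
    exec_state: CompExecState,
    control: ControlLayer,
    consensus: Consensus,
    // component-specific state (ports, message slots, ...) would live here
}

impl Component for ComponentEcho {
    fn on_creation(&mut self, _comp_id: CompId, _sched_ctx: &SchedulerCtx) {}

    fn on_shutdown(&mut self, _sched_ctx: &SchedulerCtx) {}

    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) {
        // store the transferred message in the component's own inbox
    }

    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
        match message {
            // Control messages get the default treatment: blocking/unblocking
            // ports, handling `Ack`s, peer changes, etc.
            Message::Control(message) => default_handle_control_message(
                &mut self.exec_state, &mut self.control, &mut self.consensus,
                message, sched_ctx, comp_ctx,
            ),
            // Data/sync messages would be handled by the component itself,
            // e.g. through `default_handle_incoming_data_message`.
            _ => {},
        }
    }

    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
        match self.exec_state.mode {
            // The shutdown path is fully covered by the default helpers.
            CompMode::StartExit => Ok(default_handle_start_exit(
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx)),
            CompMode::BusyExit => Ok(default_handle_busy_exit(
                &mut self.exec_state, &self.control, sched_ctx)),
            CompMode::Exit => Ok(default_handle_exit(&self.exec_state)),
            // The component's actual behaviour (sends via
            // `default_send_data_message`, receives, sync rounds) goes here.
            _ => Ok(CompScheduling::Sleep),
        }
    }
}
```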