diff --git a/src/protocol/eval/value.rs b/src/protocol/eval/value.rs index d8bf773b7bc74426a37fa54ad573c4c0d6d8bd00..c4c5060485d669df5943ff6af48090d89b0f604e 100644 --- a/src/protocol/eval/value.rs +++ b/src/protocol/eval/value.rs @@ -183,6 +183,7 @@ impl ValueGroup { regions: Vec::new(), } } + pub(crate) fn from_store(store: &Store, values: &[Value]) -> Self { let mut group = ValueGroup{ values: Vec::with_capacity(values.len()), @@ -197,6 +198,15 @@ impl ValueGroup { group } + /// Creates a clone of the value group, but leaves the memory inside of the + /// ValueGroup vectors allocated. + pub(crate) fn take(&mut self) -> ValueGroup { + let cloned = self.clone(); + self.values.clear(); + self.regions.clear(); + return cloned; + } + /// Transfers a provided value from a store into a local value with its /// heap allocations (if any) stored in the ValueGroup. Calling this /// function will not store the returned value in the `values` member. diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index f1d24bd7abf6821c74823e93f5f4e59401463f83..e350d706a207397f98e63d266c188ed64efcefec 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -1,8 +1,13 @@ -use crate::protocol::eval::*; +use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId}; use crate::protocol::*; use crate::runtime2::*; +use crate::runtime2::communication::*; + use super::{CompCtx, CompPDL}; +use super::component_context::*; use super::component_ip::*; +use super::control_layer::*; +use super::consensus::*; pub enum CompScheduling { Immediate, @@ -26,6 +31,74 @@ pub(crate) trait Component { fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result; } +/// Representation of the generic operating mode of a component. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub(crate) enum CompMode { + NonSync, // not in sync mode + Sync, // in sync mode, can interact with other components + SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block + BlockedGet, // blocked because we need to receive a message on a particular port + BlockedPut, // component is blocked because the port is blocked + BlockedSelect, // waiting on message to complete the select statement + StartExit, // temporary state: if encountered then we start the shutdown process + BusyExit, // temporary state: waiting for Acks for all the closed ports + Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 +} + +impl CompMode { + pub(crate) fn is_in_sync_block(&self) -> bool { + use CompMode::*; + + match self { + Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true, + NonSync | StartExit | BusyExit | Exit => false, + } + } +} + +/// Component execution state: the execution mode along with some descriptive +/// fields. Fields are public for ergonomic reasons, use member functions when +/// appropriate. +pub(crate) struct CompExecState { + pub mode: CompMode, + pub mode_port: PortId, // valid if blocked on a port (put/get) + pub mode_value: ValueGroup, // valid if blocked on a put +} + +impl CompExecState { + pub(crate) fn new() -> Self { + return Self{ + mode: CompMode::NonSync, + mode_port: PortId::new_invalid(), + mode_value: ValueGroup::default(), + } + } + + pub(crate) fn set_as_blocked_get(&mut self, port: PortId) { + self.mode = CompMode::BlockedGet; + self.mode_port = port; + debug_assert!(self.mode_value.values.is_empty()); + } + + pub(crate) fn is_blocked_on_get(&self, port: PortId) -> bool { + return + self.mode == CompMode::BlockedGet && + self.mode_port == port; + } + + pub(crate) fn set_as_blocked_put(&mut self, port: PortId, value: ValueGroup) { + self.mode = CompMode::BlockedPut; + self.mode_port = port; + self.mode_value = value; + } + + pub(crate) fn is_blocked_on_put(&self, port: PortId) -> bool { + return + self.mode == CompMode::BlockedPut && + self.mode_port == port; + } +} + /// Creates a new component based on its definition. Meaning that if it is a /// user-defined component then we set up the PDL code state. Otherwise we /// construct a custom component. This does NOT take care of port and message @@ -57,6 +130,176 @@ pub(crate) fn create_component( } } +// ----------------------------------------------------------------------------- +// Generic component messaging utilities (for sending and receiving) +// ----------------------------------------------------------------------------- + +/// Handles control messages in the default way. Note that this function may +/// take a lot of actions in the name of the caller: pending messages may be +/// sent, ports may become blocked/unblocked, etc. So the execution +/// (`CompExecState`), control (`ControlLayer`) and consensus (`Consensus`) +/// state may all change. +pub(crate) fn default_handle_control_message( + exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus, + message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx +) { + match message.content { + ControlMessageContent::Ack => { + default_handle_ack(control, message.id, sched_ctx, comp_ctx); + }, + ControlMessageContent::BlockPort(port_id) => { + // One of our messages was accepted, but the port should be + // blocked. + let port_handle = comp_ctx.get_port_handle(port_id); + let port_info = comp_ctx.get_port(port_handle); + debug_assert_eq!(port_info.kind, PortKind::Putter); + if port_info.state == PortState::Open { + // only when open: we don't do this when closed, and we we don't do this if we're blocked due to peer changes + comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); + } + }, + ControlMessageContent::ClosePort(port_id) => { + // Request to close the port. We immediately comply and remove + // the component handle as well + let port_handle = comp_ctx.get_port_handle(port_id); + let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id; + let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); + + // One exception to sending an `Ack` is if we just closed the + // port ourselves, meaning that the `ClosePort` messages got + // sent to one another. + if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) { + default_handle_ack(control, control_id, sched_ctx, comp_ctx); + } else { + default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); + comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed + comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed + } + }, + ControlMessageContent::UnblockPort(port_id) => { + // We were previously blocked (or already closed) + let port_handle = comp_ctx.get_port_handle(port_id); + let port_info = comp_ctx.get_port(port_handle); + debug_assert_eq!(port_info.kind, PortKind::Putter); + if port_info.state == PortState::BlockedDueToFullBuffers { + default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx); + } + }, + ControlMessageContent::PortPeerChangedBlock(port_id) => { + // The peer of our port has just changed. So we are asked to + // temporarily block the port (while our original recipient is + // potentially rerouting some of the in-flight messages) and + // Ack. Then we wait for the `unblock` call. + debug_assert_eq!(message.target_port_id, Some(port_id)); + let port_handle = comp_ctx.get_port_handle(port_id); + comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange); + + let port_info = comp_ctx.get_port(port_handle); + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + + default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); + }, + ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => { + let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap()); + let port_info = comp_ctx.get_port(port_handle); + debug_assert!(port_info.state == PortState::BlockedDueToPeerChange); + let old_peer_id = port_info.peer_comp_id; + + comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false); + + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.peer_comp_id = new_comp_id; + port_info.peer_port_id = new_port_id; + comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None); + default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx); + } + } +} + +// ----------------------------------------------------------------------------- +// Internal messaging/state utilities +// ----------------------------------------------------------------------------- + +/// Handles an `Ack` for the control layer. +fn default_handle_ack( + control: &mut ControlLayer, control_id: ControlId, + sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx +) { + // Since an `Ack` may cause another one, handle them in a loop + let mut to_ack = control_id; + loop { + let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx); + match action { + AckAction::SendMessage(target_comp, message) => { + // FIX @NoDirectHandle + let mut handle = sched_ctx.runtime.get_component_public(target_comp); + handle.send_message(sched_ctx, Message::Control(message), true); + let _should_remove = handle.decrement_users(); + debug_assert!(_should_remove.is_none()); + }, + AckAction::ScheduleComponent(to_schedule) => { + // FIX @NoDirectHandle + let mut handle = sched_ctx.runtime.get_component_public(to_schedule); + + // Note that the component is intentionally not + // sleeping, so we just wake it up + debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire)); + let key = unsafe { to_schedule.upgrade() }; + sched_ctx.runtime.enqueue_work(key); + let _should_remove = handle.decrement_users(); + debug_assert!(_should_remove.is_none()); + }, + AckAction::None => {} + } + + match new_to_ack { + Some(new_to_ack) => to_ack = new_to_ack, + None => break, + } + } +} + +/// Little helper for sending the most common kind of `Ack` +fn default_send_ack( + causer_of_ack_id: ControlId, peer_handle: LocalPeerHandle, + sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx +) { + let peer_info = comp_ctx.get_peer(peer_handle); + peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{ + id: causer_of_ack_id, + sender_comp_id: comp_ctx.id, + target_port_id: None, + content: ControlMessageContent::Ack + }), true); +} + +/// Handles the unblocking of a putter port. In case there is a pending message +/// on that port then it will be sent. +fn default_handle_unblock_put( + exec_state: &mut CompExecState, consensus: &mut Consensus, + port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, +) { + let port_info = comp_ctx.get_port_mut(port_handle); + let port_id = port_info.self_id; + debug_assert!(port_info.state.is_blocked()); + port_info.state = PortState::Open; + + if exec_state.is_blocked_on_put(port_id) { + // Annotate the message that we're going to send + debug_assert_eq!(port_info.kind, PortKind::Putter); + let to_send = exec_state.mode_value.take(); + let to_send = consensus.annotate_data_message(comp_ctx, port_info, to_send); + + // Retrieve peer to send the message + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + peer_info.handle.send_message(sched_ctx, Message::Data(to_send), true); + + exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state. + exec_state.mode_port = PortId::new_invalid(); + } +} + #[inline] pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId { diff --git a/src/runtime2/component/component_ip.rs b/src/runtime2/component/component_ip.rs index c4f438ed4c93990fd82b1e16345ed6f1c78d5a73..a3a83aa59df50bf6bb336fd7cbd53aaaf66551b8 100644 --- a/src/runtime2/component/component_ip.rs +++ b/src/runtime2/component/component_ip.rs @@ -13,11 +13,21 @@ pub struct ComponentRandomU32 { impl Component for ComponentRandomU32 { fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) { - unreachable!("should not adopt messages"); + // Impossible since this component does not have any input ports in its + // signature. + unreachable!(); } fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { - todo!() + match message { + Message::Data(message) => unreachable!(), + Message::Sync(message) => { + + }, + Message::Control(message) => { + + } + } } fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { @@ -33,7 +43,7 @@ impl ComponentRandomU32 { let minimum = arguments.values[1].as_uint32(); let maximum = arguments.values[2].as_uint32(); - return ComponentRandomU32{ + return Self{ output_port_id: port_id, random_minimum: minimum, random_maximum: maximum, diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 4e735be3142b0246610035fb13e7d4a7befa2b01..236cff963e1047d61a0c3f607543776650f6b2b6 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -10,7 +10,11 @@ use crate::protocol::eval::{ use crate::runtime2::scheduler::SchedulerCtx; use crate::runtime2::communication::*; -use super::component::*; +use super::component::{ + self, + CompExecState, Component, CompScheduling, CompMode, + port_id_from_eval, port_id_to_eval +}; use super::component_context::*; use super::control_layer::*; use super::consensus::Consensus; @@ -84,30 +88,6 @@ impl RunContext for ExecCtx { } } -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub(crate) enum Mode { - NonSync, // not in sync mode - Sync, // in sync mode, can interact with other components - SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block - BlockedGet, // blocked because we need to receive a message on a particular port - BlockedPut, // component is blocked because the port is blocked - BlockedSelect, // waiting on message to complete the select statement - StartExit, // temporary state: if encountered then we start the shutdown process - BusyExit, // temporary state: waiting for Acks for all the closed ports - Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 -} - -impl Mode { - fn is_in_sync_block(&self) -> bool { - use Mode::*; - - match self { - Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true, - NonSync | StartExit | BusyExit | Exit => false, - } - } -} - struct SelectCase { involved_ports: Vec, } @@ -226,10 +206,8 @@ impl SelectState { } pub(crate) struct CompPDL { - pub mode: Mode, - pub mode_port: PortId, // when blocked on a port - pub mode_value: ValueGroup, // when blocked on a put - select: SelectState, + pub exec_state: CompExecState, + pub select_state: SelectState, pub prompt: Prompt, pub control: ControlLayer, pub consensus: Consensus, @@ -269,7 +247,10 @@ impl Component for CompPDL { self.handle_incoming_data_message(sched_ctx, comp_ctx, message); }, Message::Control(message) => { - self.handle_incoming_control_message(sched_ctx, comp_ctx, message); + component::default_handle_control_message( + &mut self.exec_state, &mut self.control, &mut self.consensus, + message, sched_ctx, comp_ctx + ); }, Message::Sync(message) => { self.handle_incoming_sync_message(sched_ctx, comp_ctx, message); @@ -280,30 +261,30 @@ impl Component for CompPDL { fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { use EvalContinuation as EC; - sched_ctx.log(&format!("Running component (mode: {:?})", self.mode)); + sched_ctx.log(&format!("Running component (mode: {:?})", self.exec_state.mode)); // Depending on the mode don't do anything at all, take some special // actions, or fall through and run the PDL code. - match self.mode { - Mode::NonSync | Mode::Sync => { + match self.exec_state.mode { + CompMode::NonSync | CompMode::Sync => { // continue and run PDL code }, - Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut | Mode::BlockedSelect => { + CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => { return Ok(CompScheduling::Sleep); } - Mode::StartExit => { + CompMode::StartExit => { self.handle_component_exit(sched_ctx, comp_ctx); return Ok(CompScheduling::Immediate); }, - Mode::BusyExit => { + CompMode::BusyExit => { if self.control.has_acks_remaining() { return Ok(CompScheduling::Sleep); } else { - self.mode = Mode::Exit; + self.exec_state.mode = CompMode::Exit; return Ok(CompScheduling::Exit); } }, - Mode::Exit => { + CompMode::Exit => { return Ok(CompScheduling::Exit); } } @@ -315,12 +296,12 @@ impl Component for CompPDL { EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"), // Results that can be returned in sync mode EC::SyncBlockEnd => { - debug_assert_eq!(self.mode, Mode::Sync); + debug_assert_eq!(self.exec_state.mode, CompMode::Sync); self.handle_sync_end(sched_ctx, comp_ctx); return Ok(CompScheduling::Immediate); }, EC::BlockGet(port_id) => { - debug_assert_eq!(self.mode, Mode::Sync); + debug_assert_eq!(self.exec_state.mode, CompMode::Sync); debug_assert!(self.exec_ctx.stmt.is_none()); let port_id = port_id_from_eval(port_id); @@ -342,21 +323,18 @@ impl Component for CompPDL { } } else { // We need to wait - self.mode = Mode::BlockedGet; - self.mode_port = port_id; + self.exec_state.set_as_blocked_get(port_id); return Ok(CompScheduling::Sleep); } }, EC::Put(port_id, value) => { - debug_assert_eq!(self.mode, Mode::Sync); + debug_assert_eq!(self.exec_state.mode, CompMode::Sync); sched_ctx.log(&format!("Putting value {:?}", value)); let port_id = port_id_from_eval(port_id); let port_handle = comp_ctx.get_port_handle(port_id); let port_info = comp_ctx.get_port(port_handle); if port_info.state.is_blocked() { - self.mode = Mode::BlockedPut; - self.mode_port = port_id; - self.mode_value = value; + self.exec_state.set_as_blocked_put(port_id, value); self.exec_ctx.stmt = ExecStmt::PerformedPut; // prepare for when we become unblocked return Ok(CompScheduling::Sleep); } else { @@ -366,44 +344,44 @@ impl Component for CompPDL { } }, EC::SelectStart(num_cases, _num_ports) => { - debug_assert_eq!(self.mode, Mode::Sync); - self.select.handle_select_start(num_cases); + debug_assert_eq!(self.exec_state.mode, CompMode::Sync); + self.select_state.handle_select_start(num_cases); return Ok(CompScheduling::Requeue); }, EC::SelectRegisterPort(case_index, port_index, port_id) => { - debug_assert_eq!(self.mode, Mode::Sync); + debug_assert_eq!(self.exec_state.mode, CompMode::Sync); let port_id = port_id_from_eval(port_id); - if let Err(_err) = self.select.register_select_case_port(comp_ctx, case_index, port_index, port_id) { + if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) { todo!("handle registering a port multiple times"); } return Ok(CompScheduling::Immediate); }, EC::SelectWait => { - debug_assert_eq!(self.mode, Mode::Sync); - let select_decision = self.select.handle_select_waiting_point(&self.inbox_main, comp_ctx); + debug_assert_eq!(self.exec_state.mode, CompMode::Sync); + let select_decision = self.select_state.handle_select_waiting_point(&self.inbox_main, comp_ctx); if let SelectDecision::Case(case_index) = select_decision { // Reached a conclusion, so we can continue immediately self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index); - self.mode = Mode::Sync; + self.exec_state.mode = CompMode::Sync; return Ok(CompScheduling::Immediate); } else { // No decision yet - self.mode = Mode::BlockedSelect; + self.exec_state.mode = CompMode::BlockedSelect; return Ok(CompScheduling::Sleep); } }, // Results that can be returned outside of sync mode EC::ComponentTerminated => { - self.mode = Mode::StartExit; // next call we'll take care of the exit + self.exec_state.mode = CompMode::StartExit; // next call we'll take care of the exit return Ok(CompScheduling::Immediate); }, EC::SyncBlockStart => { - debug_assert_eq!(self.mode, Mode::NonSync); + debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); self.handle_sync_start(sched_ctx, comp_ctx); return Ok(CompScheduling::Immediate); }, EC::NewComponent(definition_id, type_id, arguments) => { - debug_assert_eq!(self.mode, Mode::NonSync); + debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); self.create_component_and_transfer_ports( sched_ctx, comp_ctx, definition_id, type_id, arguments @@ -411,7 +389,7 @@ impl Component for CompPDL { return Ok(CompScheduling::Requeue); }, EC::NewChannel => { - debug_assert_eq!(self.mode, Mode::NonSync); + debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); debug_assert!(self.exec_ctx.stmt.is_none()); let channel = comp_ctx.create_channel(); self.exec_ctx.stmt = ExecStmt::CreatedChannel(( @@ -435,10 +413,8 @@ impl CompPDL { } return Self{ - mode: Mode::NonSync, - mode_port: PortId::new_invalid(), - mode_value: ValueGroup::default(), - select: SelectState::new(), + exec_state: CompExecState::new(), + select_state: SelectState::new(), prompt: initial_state, control: ControlLayer::default(), consensus: Consensus::new(), @@ -475,8 +451,8 @@ impl CompPDL { self.consensus.handle_new_data_message(comp_ctx, message); } } - debug_assert_eq!(self.mode, Mode::NonSync); - self.mode = Mode::Sync; + debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); + self.exec_state.mode = CompMode::Sync; } /// Handles end of sync. The conclusion to the sync round might arise @@ -486,7 +462,7 @@ impl CompPDL { fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { sched_ctx.log("Component ending sync mode (now waiting for solution)"); let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); - self.mode = Mode::SyncEnd; + self.exec_state.mode = CompMode::SyncEnd; self.handle_sync_decision(sched_ctx, comp_ctx, decision); } @@ -505,19 +481,19 @@ impl CompPDL { }; // If here then we've reached a decision - debug_assert_eq!(self.mode, Mode::SyncEnd); + debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd); if is_success { - self.mode = Mode::NonSync; + self.exec_state.mode = CompMode::NonSync; self.consensus.notify_sync_decision(decision); } else { - self.mode = Mode::StartExit; + self.exec_state.mode = CompMode::StartExit; } } fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { sched_ctx.log("Component exiting"); - debug_assert_eq!(self.mode, Mode::StartExit); - self.mode = Mode::BusyExit; + debug_assert_eq!(self.exec_state.mode, CompMode::StartExit); + self.exec_state.mode = CompMode::BusyExit; // Doing this by index, then retrieving the handle is a bit rediculous, // but Rust is being Rust with its borrowing rules. @@ -557,7 +533,7 @@ impl CompPDL { /// the port in case too many messages are being received. fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) { // Whatever we do, glean information from headers in message - if self.mode.is_in_sync_block() { + if self.exec_state.mode.is_in_sync_block() { self.consensus.handle_new_data_message(comp_ctx, &message); } @@ -572,15 +548,15 @@ impl CompPDL { // After direct insertion, check if this component's execution is // blocked on receiving a message on that port debug_assert!(!comp_ctx.get_port(port_handle).state.is_blocked()); // because we could insert directly - if self.mode == Mode::BlockedGet && self.mode_port == target_port_id { + if self.exec_state.is_blocked_on_get(target_port_id) { // We were indeed blocked - self.mode = Mode::Sync; - self.mode_port = PortId::new_invalid(); - } else if self.mode == Mode::BlockedSelect { - let select_decision = self.select.handle_updated_inbox(&self.inbox_main, comp_ctx); + self.exec_state.mode = CompMode::Sync; + self.exec_state.mode_port = PortId::new_invalid(); + } else if self.exec_state.mode == CompMode::BlockedSelect { + let select_decision = self.select_state.handle_updated_inbox(&self.inbox_main, comp_ctx); if let SelectDecision::Case(case_index) = select_decision { self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index); - self.mode = Mode::Sync; + self.exec_state.mode = CompMode::Sync; } } @@ -636,158 +612,15 @@ impl CompPDL { } } - fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) { - // Little local utility to send an Ack - fn send_control_ack_message(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, causer_id: ControlId, peer_handle: LocalPeerHandle) { - let peer_info = comp_ctx.get_peer(peer_handle); - peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{ - id: causer_id, - sender_comp_id: comp_ctx.id, - target_port_id: None, - content: ControlMessageContent::Ack, - }), true); - } - - // Handle the content of the control message, and optionally Ack it - match message.content { - ControlMessageContent::Ack => { - self.handle_ack(sched_ctx, comp_ctx, message.id); - }, - ControlMessageContent::BlockPort(port_id) => { - // On of our messages was accepted, but the port should be - // blocked. - let port_handle = comp_ctx.get_port_handle(port_id); - let port_info = comp_ctx.get_port(port_handle); - debug_assert_eq!(port_info.kind, PortKind::Putter); - if port_info.state == PortState::Open { - // only when open: we don't do this when closed, and we we don't do this if we're blocked due to peer changes - comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); - } - }, - ControlMessageContent::ClosePort(port_id) => { - // Request to close the port. We immediately comply and remove - // the component handle as well - let port_handle = comp_ctx.get_port_handle(port_id); - let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id; - let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); - - // One exception to sending an `Ack` is if we just closed the - // port ourselves, meaning that the `ClosePort` messages got - // sent to one another. - if let Some(control_id) = self.control.has_close_port_entry(port_handle, comp_ctx) { - self.handle_ack(sched_ctx, comp_ctx, control_id); - } else { - send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle); - comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed - comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed - } - }, - ControlMessageContent::UnblockPort(port_id) => { - // We were previously blocked (or already closed) - let port_handle = comp_ctx.get_port_handle(port_id); - let port_info = comp_ctx.get_port(port_handle); - debug_assert_eq!(port_info.kind, PortKind::Putter); - if port_info.state == PortState::BlockedDueToFullBuffers { - self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle); - } - }, - ControlMessageContent::PortPeerChangedBlock(port_id) => { - // The peer of our port has just changed. So we are asked to - // temporarily block the port (while our original recipient is - // potentially rerouting some of the in-flight messages) and - // Ack. Then we wait for the `unblock` call. - debug_assert_eq!(message.target_port_id, Some(port_id)); - let port_handle = comp_ctx.get_port_handle(port_id); - comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange); - - let port_info = comp_ctx.get_port(port_handle); - let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); - - send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle); - }, - ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => { - let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap()); - let port_info = comp_ctx.get_port(port_handle); - debug_assert!(port_info.state == PortState::BlockedDueToPeerChange); - let old_peer_id = port_info.peer_comp_id; - - comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false); - - let port_info = comp_ctx.get_port_mut(port_handle); - port_info.peer_comp_id = new_comp_id; - port_info.peer_port_id = new_port_id; - comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None); - self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle); - } - } - } - fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); self.handle_sync_decision(sched_ctx, comp_ctx, decision); } - /// Little helper that notifies the control layer of an `Ack`, and takes the - /// appropriate subsequent action - fn handle_ack(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, control_id: ControlId) { - let mut to_ack = control_id; - loop { - let (action, new_to_ack) = self.control.handle_ack(to_ack, sched_ctx, comp_ctx); - match action { - AckAction::SendMessage(target_comp, message) => { - // FIX @NoDirectHandle - let mut handle = sched_ctx.runtime.get_component_public(target_comp); - handle.send_message(sched_ctx, Message::Control(message), true); - let _should_remove = handle.decrement_users(); - debug_assert!(_should_remove.is_none()); - }, - AckAction::ScheduleComponent(to_schedule) => { - // FIX @NoDirectHandle - let mut handle = sched_ctx.runtime.get_component_public(to_schedule); - - // Note that the component is intentionally not - // sleeping, so we just wake it up - debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire)); - let key = unsafe{ to_schedule.upgrade() }; - sched_ctx.runtime.enqueue_work(key); - let _should_remove = handle.decrement_users(); - debug_assert!(_should_remove.is_none()); - }, - AckAction::None => {} - } - - match new_to_ack { - Some(new_to_ack) => to_ack = new_to_ack, - None => break, - } - } - } - // ------------------------------------------------------------------------- // Handling ports // ------------------------------------------------------------------------- - /// Unblocks a port, potentially continuing execution of the component, in - /// response to a message that told us to unblock a previously blocked - fn handle_unblock_port_instruction(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) { - let port_info = comp_ctx.get_port_mut(port_handle); - let port_id = port_info.self_id; - debug_assert!(port_info.state.is_blocked()); - port_info.state = PortState::Open; - - if self.mode == Mode::BlockedPut && port_id == self.mode_port { - // We were blocked on the port that just became unblocked, so - // send the message. - debug_assert_eq!(port_info.kind, PortKind::Putter); - let mut replacement = ValueGroup::default(); - std::mem::swap(&mut replacement, &mut self.mode_value); - self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, replacement); - - self.mode = Mode::Sync; - self.mode_port = PortId::new_invalid(); - } - } - fn create_component_and_transfer_ports( &mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, @@ -810,9 +643,9 @@ impl CompPDL { let mut created_ctx = CompCtx::new(&reservation); println!( - "DEBUG: Comp '{}' is creating comp '{}' at ID {:?}", - self_proc.identifier.value.as_str(), other_proc.identifier.value.as_str(), - reservation.id() + "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})", + self_proc.identifier.value.as_str(), creator_ctx.id, + other_proc.identifier.value.as_str(), reservation.id() ); // Take all the ports ID that are in the `args` (and currently belong to @@ -901,7 +734,7 @@ impl CompPDL { // its initial scheduling might be performed based on `Ack`s in response // to message exchanges between remote peers. let total_num_ports = opened_port_id_pairs.len() + closed_port_id_pairs.len(); - let component = create_component(&sched_ctx.runtime.protocol, definition_id, type_id, arguments, total_num_ports); + let component = component::create_component(&sched_ctx.runtime.protocol, definition_id, type_id, arguments, total_num_ports); let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component( reservation, component, created_ctx, false, );