diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 4e735be3142b0246610035fb13e7d4a7befa2b01..236cff963e1047d61a0c3f607543776650f6b2b6 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -10,7 +10,11 @@ use crate::protocol::eval::{ use crate::runtime2::scheduler::SchedulerCtx; use crate::runtime2::communication::*; -use super::component::*; +use super::component::{ + self, + CompExecState, Component, CompScheduling, CompMode, + port_id_from_eval, port_id_to_eval +}; use super::component_context::*; use super::control_layer::*; use super::consensus::Consensus; @@ -84,30 +88,6 @@ impl RunContext for ExecCtx { } } -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub(crate) enum Mode { - NonSync, // not in sync mode - Sync, // in sync mode, can interact with other components - SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block - BlockedGet, // blocked because we need to receive a message on a particular port - BlockedPut, // component is blocked because the port is blocked - BlockedSelect, // waiting on message to complete the select statement - StartExit, // temporary state: if encountered then we start the shutdown process - BusyExit, // temporary state: waiting for Acks for all the closed ports - Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 -} - -impl Mode { - fn is_in_sync_block(&self) -> bool { - use Mode::*; - - match self { - Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true, - NonSync | StartExit | BusyExit | Exit => false, - } - } -} - struct SelectCase { involved_ports: Vec, } @@ -226,10 +206,8 @@ impl SelectState { } pub(crate) struct CompPDL { - pub mode: Mode, - pub mode_port: PortId, // when blocked on a port - pub mode_value: ValueGroup, // when blocked on a put - select: SelectState, + pub exec_state: CompExecState, + pub select_state: SelectState, pub prompt: Prompt, pub control: ControlLayer, pub consensus: Consensus, @@ -269,7 +247,10 @@ impl Component for CompPDL { self.handle_incoming_data_message(sched_ctx, comp_ctx, message); }, Message::Control(message) => { - self.handle_incoming_control_message(sched_ctx, comp_ctx, message); + component::default_handle_control_message( + &mut self.exec_state, &mut self.control, &mut self.consensus, + message, sched_ctx, comp_ctx + ); }, Message::Sync(message) => { self.handle_incoming_sync_message(sched_ctx, comp_ctx, message); @@ -280,30 +261,30 @@ impl Component for CompPDL { fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { use EvalContinuation as EC; - sched_ctx.log(&format!("Running component (mode: {:?})", self.mode)); + sched_ctx.log(&format!("Running component (mode: {:?})", self.exec_state.mode)); // Depending on the mode don't do anything at all, take some special // actions, or fall through and run the PDL code. - match self.mode { - Mode::NonSync | Mode::Sync => { + match self.exec_state.mode { + CompMode::NonSync | CompMode::Sync => { // continue and run PDL code }, - Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut | Mode::BlockedSelect => { + CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => { return Ok(CompScheduling::Sleep); } - Mode::StartExit => { + CompMode::StartExit => { self.handle_component_exit(sched_ctx, comp_ctx); return Ok(CompScheduling::Immediate); }, - Mode::BusyExit => { + CompMode::BusyExit => { if self.control.has_acks_remaining() { return Ok(CompScheduling::Sleep); } else { - self.mode = Mode::Exit; + self.exec_state.mode = CompMode::Exit; return Ok(CompScheduling::Exit); } }, - Mode::Exit => { + CompMode::Exit => { return Ok(CompScheduling::Exit); } } @@ -315,12 +296,12 @@ impl Component for CompPDL { EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"), // Results that can be returned in sync mode EC::SyncBlockEnd => { - debug_assert_eq!(self.mode, Mode::Sync); + debug_assert_eq!(self.exec_state.mode, CompMode::Sync); self.handle_sync_end(sched_ctx, comp_ctx); return Ok(CompScheduling::Immediate); }, EC::BlockGet(port_id) => { - debug_assert_eq!(self.mode, Mode::Sync); + debug_assert_eq!(self.exec_state.mode, CompMode::Sync); debug_assert!(self.exec_ctx.stmt.is_none()); let port_id = port_id_from_eval(port_id); @@ -342,21 +323,18 @@ impl Component for CompPDL { } } else { // We need to wait - self.mode = Mode::BlockedGet; - self.mode_port = port_id; + self.exec_state.set_as_blocked_get(port_id); return Ok(CompScheduling::Sleep); } }, EC::Put(port_id, value) => { - debug_assert_eq!(self.mode, Mode::Sync); + debug_assert_eq!(self.exec_state.mode, CompMode::Sync); sched_ctx.log(&format!("Putting value {:?}", value)); let port_id = port_id_from_eval(port_id); let port_handle = comp_ctx.get_port_handle(port_id); let port_info = comp_ctx.get_port(port_handle); if port_info.state.is_blocked() { - self.mode = Mode::BlockedPut; - self.mode_port = port_id; - self.mode_value = value; + self.exec_state.set_as_blocked_put(port_id, value); self.exec_ctx.stmt = ExecStmt::PerformedPut; // prepare for when we become unblocked return Ok(CompScheduling::Sleep); } else { @@ -366,44 +344,44 @@ impl Component for CompPDL { } }, EC::SelectStart(num_cases, _num_ports) => { - debug_assert_eq!(self.mode, Mode::Sync); - self.select.handle_select_start(num_cases); + debug_assert_eq!(self.exec_state.mode, CompMode::Sync); + self.select_state.handle_select_start(num_cases); return Ok(CompScheduling::Requeue); }, EC::SelectRegisterPort(case_index, port_index, port_id) => { - debug_assert_eq!(self.mode, Mode::Sync); + debug_assert_eq!(self.exec_state.mode, CompMode::Sync); let port_id = port_id_from_eval(port_id); - if let Err(_err) = self.select.register_select_case_port(comp_ctx, case_index, port_index, port_id) { + if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) { todo!("handle registering a port multiple times"); } return Ok(CompScheduling::Immediate); }, EC::SelectWait => { - debug_assert_eq!(self.mode, Mode::Sync); - let select_decision = self.select.handle_select_waiting_point(&self.inbox_main, comp_ctx); + debug_assert_eq!(self.exec_state.mode, CompMode::Sync); + let select_decision = self.select_state.handle_select_waiting_point(&self.inbox_main, comp_ctx); if let SelectDecision::Case(case_index) = select_decision { // Reached a conclusion, so we can continue immediately self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index); - self.mode = Mode::Sync; + self.exec_state.mode = CompMode::Sync; return Ok(CompScheduling::Immediate); } else { // No decision yet - self.mode = Mode::BlockedSelect; + self.exec_state.mode = CompMode::BlockedSelect; return Ok(CompScheduling::Sleep); } }, // Results that can be returned outside of sync mode EC::ComponentTerminated => { - self.mode = Mode::StartExit; // next call we'll take care of the exit + self.exec_state.mode = CompMode::StartExit; // next call we'll take care of the exit return Ok(CompScheduling::Immediate); }, EC::SyncBlockStart => { - debug_assert_eq!(self.mode, Mode::NonSync); + debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); self.handle_sync_start(sched_ctx, comp_ctx); return Ok(CompScheduling::Immediate); }, EC::NewComponent(definition_id, type_id, arguments) => { - debug_assert_eq!(self.mode, Mode::NonSync); + debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); self.create_component_and_transfer_ports( sched_ctx, comp_ctx, definition_id, type_id, arguments @@ -411,7 +389,7 @@ impl Component for CompPDL { return Ok(CompScheduling::Requeue); }, EC::NewChannel => { - debug_assert_eq!(self.mode, Mode::NonSync); + debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); debug_assert!(self.exec_ctx.stmt.is_none()); let channel = comp_ctx.create_channel(); self.exec_ctx.stmt = ExecStmt::CreatedChannel(( @@ -435,10 +413,8 @@ impl CompPDL { } return Self{ - mode: Mode::NonSync, - mode_port: PortId::new_invalid(), - mode_value: ValueGroup::default(), - select: SelectState::new(), + exec_state: CompExecState::new(), + select_state: SelectState::new(), prompt: initial_state, control: ControlLayer::default(), consensus: Consensus::new(), @@ -475,8 +451,8 @@ impl CompPDL { self.consensus.handle_new_data_message(comp_ctx, message); } } - debug_assert_eq!(self.mode, Mode::NonSync); - self.mode = Mode::Sync; + debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); + self.exec_state.mode = CompMode::Sync; } /// Handles end of sync. The conclusion to the sync round might arise @@ -486,7 +462,7 @@ impl CompPDL { fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { sched_ctx.log("Component ending sync mode (now waiting for solution)"); let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); - self.mode = Mode::SyncEnd; + self.exec_state.mode = CompMode::SyncEnd; self.handle_sync_decision(sched_ctx, comp_ctx, decision); } @@ -505,19 +481,19 @@ impl CompPDL { }; // If here then we've reached a decision - debug_assert_eq!(self.mode, Mode::SyncEnd); + debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd); if is_success { - self.mode = Mode::NonSync; + self.exec_state.mode = CompMode::NonSync; self.consensus.notify_sync_decision(decision); } else { - self.mode = Mode::StartExit; + self.exec_state.mode = CompMode::StartExit; } } fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { sched_ctx.log("Component exiting"); - debug_assert_eq!(self.mode, Mode::StartExit); - self.mode = Mode::BusyExit; + debug_assert_eq!(self.exec_state.mode, CompMode::StartExit); + self.exec_state.mode = CompMode::BusyExit; // Doing this by index, then retrieving the handle is a bit rediculous, // but Rust is being Rust with its borrowing rules. @@ -557,7 +533,7 @@ impl CompPDL { /// the port in case too many messages are being received. fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) { // Whatever we do, glean information from headers in message - if self.mode.is_in_sync_block() { + if self.exec_state.mode.is_in_sync_block() { self.consensus.handle_new_data_message(comp_ctx, &message); } @@ -572,15 +548,15 @@ impl CompPDL { // After direct insertion, check if this component's execution is // blocked on receiving a message on that port debug_assert!(!comp_ctx.get_port(port_handle).state.is_blocked()); // because we could insert directly - if self.mode == Mode::BlockedGet && self.mode_port == target_port_id { + if self.exec_state.is_blocked_on_get(target_port_id) { // We were indeed blocked - self.mode = Mode::Sync; - self.mode_port = PortId::new_invalid(); - } else if self.mode == Mode::BlockedSelect { - let select_decision = self.select.handle_updated_inbox(&self.inbox_main, comp_ctx); + self.exec_state.mode = CompMode::Sync; + self.exec_state.mode_port = PortId::new_invalid(); + } else if self.exec_state.mode == CompMode::BlockedSelect { + let select_decision = self.select_state.handle_updated_inbox(&self.inbox_main, comp_ctx); if let SelectDecision::Case(case_index) = select_decision { self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index); - self.mode = Mode::Sync; + self.exec_state.mode = CompMode::Sync; } } @@ -636,158 +612,15 @@ impl CompPDL { } } - fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) { - // Little local utility to send an Ack - fn send_control_ack_message(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, causer_id: ControlId, peer_handle: LocalPeerHandle) { - let peer_info = comp_ctx.get_peer(peer_handle); - peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{ - id: causer_id, - sender_comp_id: comp_ctx.id, - target_port_id: None, - content: ControlMessageContent::Ack, - }), true); - } - - // Handle the content of the control message, and optionally Ack it - match message.content { - ControlMessageContent::Ack => { - self.handle_ack(sched_ctx, comp_ctx, message.id); - }, - ControlMessageContent::BlockPort(port_id) => { - // On of our messages was accepted, but the port should be - // blocked. - let port_handle = comp_ctx.get_port_handle(port_id); - let port_info = comp_ctx.get_port(port_handle); - debug_assert_eq!(port_info.kind, PortKind::Putter); - if port_info.state == PortState::Open { - // only when open: we don't do this when closed, and we we don't do this if we're blocked due to peer changes - comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); - } - }, - ControlMessageContent::ClosePort(port_id) => { - // Request to close the port. We immediately comply and remove - // the component handle as well - let port_handle = comp_ctx.get_port_handle(port_id); - let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id; - let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); - - // One exception to sending an `Ack` is if we just closed the - // port ourselves, meaning that the `ClosePort` messages got - // sent to one another. - if let Some(control_id) = self.control.has_close_port_entry(port_handle, comp_ctx) { - self.handle_ack(sched_ctx, comp_ctx, control_id); - } else { - send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle); - comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed - comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed - } - }, - ControlMessageContent::UnblockPort(port_id) => { - // We were previously blocked (or already closed) - let port_handle = comp_ctx.get_port_handle(port_id); - let port_info = comp_ctx.get_port(port_handle); - debug_assert_eq!(port_info.kind, PortKind::Putter); - if port_info.state == PortState::BlockedDueToFullBuffers { - self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle); - } - }, - ControlMessageContent::PortPeerChangedBlock(port_id) => { - // The peer of our port has just changed. So we are asked to - // temporarily block the port (while our original recipient is - // potentially rerouting some of the in-flight messages) and - // Ack. Then we wait for the `unblock` call. - debug_assert_eq!(message.target_port_id, Some(port_id)); - let port_handle = comp_ctx.get_port_handle(port_id); - comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange); - - let port_info = comp_ctx.get_port(port_handle); - let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); - - send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle); - }, - ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => { - let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap()); - let port_info = comp_ctx.get_port(port_handle); - debug_assert!(port_info.state == PortState::BlockedDueToPeerChange); - let old_peer_id = port_info.peer_comp_id; - - comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false); - - let port_info = comp_ctx.get_port_mut(port_handle); - port_info.peer_comp_id = new_comp_id; - port_info.peer_port_id = new_port_id; - comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None); - self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle); - } - } - } - fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); self.handle_sync_decision(sched_ctx, comp_ctx, decision); } - /// Little helper that notifies the control layer of an `Ack`, and takes the - /// appropriate subsequent action - fn handle_ack(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, control_id: ControlId) { - let mut to_ack = control_id; - loop { - let (action, new_to_ack) = self.control.handle_ack(to_ack, sched_ctx, comp_ctx); - match action { - AckAction::SendMessage(target_comp, message) => { - // FIX @NoDirectHandle - let mut handle = sched_ctx.runtime.get_component_public(target_comp); - handle.send_message(sched_ctx, Message::Control(message), true); - let _should_remove = handle.decrement_users(); - debug_assert!(_should_remove.is_none()); - }, - AckAction::ScheduleComponent(to_schedule) => { - // FIX @NoDirectHandle - let mut handle = sched_ctx.runtime.get_component_public(to_schedule); - - // Note that the component is intentionally not - // sleeping, so we just wake it up - debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire)); - let key = unsafe{ to_schedule.upgrade() }; - sched_ctx.runtime.enqueue_work(key); - let _should_remove = handle.decrement_users(); - debug_assert!(_should_remove.is_none()); - }, - AckAction::None => {} - } - - match new_to_ack { - Some(new_to_ack) => to_ack = new_to_ack, - None => break, - } - } - } - // ------------------------------------------------------------------------- // Handling ports // ------------------------------------------------------------------------- - /// Unblocks a port, potentially continuing execution of the component, in - /// response to a message that told us to unblock a previously blocked - fn handle_unblock_port_instruction(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) { - let port_info = comp_ctx.get_port_mut(port_handle); - let port_id = port_info.self_id; - debug_assert!(port_info.state.is_blocked()); - port_info.state = PortState::Open; - - if self.mode == Mode::BlockedPut && port_id == self.mode_port { - // We were blocked on the port that just became unblocked, so - // send the message. - debug_assert_eq!(port_info.kind, PortKind::Putter); - let mut replacement = ValueGroup::default(); - std::mem::swap(&mut replacement, &mut self.mode_value); - self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, replacement); - - self.mode = Mode::Sync; - self.mode_port = PortId::new_invalid(); - } - } - fn create_component_and_transfer_ports( &mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, @@ -810,9 +643,9 @@ impl CompPDL { let mut created_ctx = CompCtx::new(&reservation); println!( - "DEBUG: Comp '{}' is creating comp '{}' at ID {:?}", - self_proc.identifier.value.as_str(), other_proc.identifier.value.as_str(), - reservation.id() + "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})", + self_proc.identifier.value.as_str(), creator_ctx.id, + other_proc.identifier.value.as_str(), reservation.id() ); // Take all the ports ID that are in the `args` (and currently belong to @@ -901,7 +734,7 @@ impl CompPDL { // its initial scheduling might be performed based on `Ack`s in response // to message exchanges between remote peers. let total_num_ports = opened_port_id_pairs.len() + closed_port_id_pairs.len(); - let component = create_component(&sched_ctx.runtime.protocol, definition_id, type_id, arguments, total_num_ports); + let component = component::create_component(&sched_ctx.runtime.protocol, definition_id, type_id, arguments, total_num_ports); let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component( reservation, component, created_ctx, false, );