diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs
index a73582435ecaeecf86030c59d95f085e71cff9b1..dd3081ee1d12512335b33c706a690cfa21fc7612 100644
--- a/src/runtime2/component/component_pdl.rs
+++ b/src/runtime2/component/component_pdl.rs
@@ -7,20 +7,19 @@ use crate::protocol::eval::{
     EvalContinuation, EvalResult, EvalError
 };
+use crate::runtime2::runtime::CompId;
 use crate::runtime2::scheduler::SchedulerCtx;
 use crate::runtime2::communication::*;
+use super::component::{
+    self,
+    CompExecState, Component, CompScheduling, CompMode,
+    port_id_from_eval, port_id_to_eval
+};
 use super::component_context::*;
 use super::control_layer::*;
 use super::consensus::Consensus;
 
-pub enum CompScheduling {
-    Immediate,
-    Requeue,
-    Sleep,
-    Exit,
-}
-
 pub enum ExecStmt {
     CreatedChannel((Value, Value)),
     PerformedPut,
@@ -90,30 +89,6 @@ impl RunContext for ExecCtx {
     }
 }
 
-#[derive(Debug, Copy, Clone, PartialEq, Eq)]
-pub(crate) enum Mode {
-    NonSync, // not in sync mode
-    Sync, // in sync mode, can interact with other components
-    SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
-    BlockedGet, // blocked because we need to receive a message on a particular port
-    BlockedPut, // component is blocked because the port is blocked
-    BlockedSelect, // waiting on message to complete the select statement
-    StartExit, // temporary state: if encountered then we start the shutdown process
-    BusyExit, // temporary state: waiting for Acks for all the closed ports
-    Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0
-}
-
-impl Mode {
-    fn is_in_sync_block(&self) -> bool {
-        use Mode::*;
-
-        match self {
-            Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true,
-            NonSync | StartExit | BusyExit | Exit => false,
-        }
-    }
-}
-
 struct SelectCase {
     involved_ports: Vec<PortId>,
 }
@@ -232,10 +207,8 @@ impl SelectState {
 }
 
 pub(crate) struct CompPDL {
-    pub mode: Mode,
-    pub mode_port: PortId, // when blocked on a port
-    pub mode_value: ValueGroup, // when blocked on a put
-    select: SelectState,
+    pub exec_state: CompExecState,
+    select_state: SelectState,
     pub prompt: Prompt,
     pub control: ControlLayer,
     pub consensus: Consensus,
@@ -249,36 +222,30 @@ pub(crate) struct CompPDL {
     pub inbox_backup: Vec<DataMessage>,
 }
 
-impl CompPDL {
-    pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self {
-        let mut inbox_main = Vec::new();
-        inbox_main.reserve(num_ports);
-        for _ in 0..num_ports {
-            inbox_main.push(None);
-        }
+impl Component for CompPDL {
+    fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) {
+        // Intentionally empty
+    }
 
-        return Self{
-            mode: Mode::NonSync,
-            mode_port: PortId::new_invalid(),
-            mode_value: ValueGroup::default(),
-            select: SelectState::new(),
-            prompt: initial_state,
-            control: ControlLayer::default(),
-            consensus: Consensus::new(),
-            sync_counter: 0,
-            exec_ctx: ExecCtx{
-                stmt: ExecStmt::None,
-            },
-            inbox_main,
-            inbox_backup: Vec::new(),
+    fn on_shutdown(&mut self, _sched_ctx: &SchedulerCtx) {
+        // Intentionally empty
+    }
+
+    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage) {
+        let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
+        let port_index = comp_ctx.get_port_index(port_handle);
+        if self.inbox_main[port_index].is_none() {
+            self.inbox_main[port_index] = Some(message);
+        } else {
+            self.inbox_backup.push(message);
         }
     }
 
-    pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
-        sched_ctx.log(&format!("handling message: {:#?}", message));
+    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
+        // sched_ctx.log(&format!("handling message: {:?}", message));
         if let Some(new_target) = self.control.should_reroute(&mut message) {
-            let mut target = sched_ctx.runtime.get_component_public(new_target);
-            target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
+            let mut target = sched_ctx.runtime.get_component_public(new_target); // TODO: @NoDirectHandle
+            target.send_message(&sched_ctx.runtime, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
             let _should_remove = target.decrement_users();
             debug_assert!(_should_remove.is_none());
             return;
@@ -289,47 +256,41 @@ impl CompPDL {
                 self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
             },
             Message::Control(message) => {
-                self.handle_incoming_control_message(sched_ctx, comp_ctx, message);
+                component::default_handle_control_message(
+                    &mut self.exec_state, &mut self.control, &mut self.consensus,
+                    message, sched_ctx, comp_ctx
+                );
             },
             Message::Sync(message) => {
                 self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
+            },
+            Message::Poll => {
+                unreachable!(); // because we never register at the polling thread
             }
         }
     }
 
-    // -------------------------------------------------------------------------
-    // Running component and handling changes in global component state
-    // -------------------------------------------------------------------------
-
-    pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
+    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
         use EvalContinuation as EC;
 
-        sched_ctx.log(&format!("Running component (mode: {:?})", self.mode));
+        sched_ctx.log(&format!("Running component (mode: {:?})", self.exec_state.mode));
 
         // Depending on the mode don't do anything at all, take some special
        // actions, or fall through and run the PDL code.
-        match self.mode {
-            Mode::NonSync | Mode::Sync | Mode::BlockedSelect => {
+        match self.exec_state.mode {
+            CompMode::NonSync | CompMode::Sync => {
                 // continue and run PDL code
             },
-            Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut => {
+            CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => {
                 return Ok(CompScheduling::Sleep);
             }
-            Mode::StartExit => {
-                self.handle_component_exit(sched_ctx, comp_ctx);
-                return Ok(CompScheduling::Immediate);
-            },
-            Mode::BusyExit => {
-                if self.control.has_acks_remaining() {
-                    return Ok(CompScheduling::Sleep);
-                } else {
-                    self.mode = Mode::Exit;
-                    return Ok(CompScheduling::Exit);
-                }
-            },
-            Mode::Exit => {
-                return Ok(CompScheduling::Exit);
-            }
+            CompMode::StartExit => return Ok(component::default_handle_start_exit(
+                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx
+            )),
+            CompMode::BusyExit => return Ok(component::default_handle_busy_exit(
+                &mut self.exec_state, &self.control, sched_ctx
+            )),
+            CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)),
        }
 
         let run_result = self.execute_prompt(&sched_ctx)?;
 
@@ -339,12 +300,12 @@
             EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
             // Results that can be returned in sync mode
             EC::SyncBlockEnd => {
-                debug_assert_eq!(self.mode, Mode::Sync);
+                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                 self.handle_sync_end(sched_ctx, comp_ctx);
                 return Ok(CompScheduling::Immediate);
             },
             EC::BlockGet(port_id) => {
-                debug_assert_eq!(self.mode, Mode::Sync);
+                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                 debug_assert!(self.exec_ctx.stmt.is_none());
 
                 let port_id = port_id_from_eval(port_id);
@@ -356,7 +317,10 @@
                         // Message was received. Make sure any blocked peers and
                        // pending messages are handled.
                         let message = self.inbox_main[port_index].take().unwrap();
-                        self.handle_received_data_message(sched_ctx, comp_ctx, port_handle);
+                        component::default_handle_received_data_message(
+                            port_id, &mut self.inbox_main[port_index], &mut self.inbox_backup,
+                            comp_ctx, sched_ctx, &mut self.control
+                        );
 
                         self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
                         return Ok(CompScheduling::Immediate);
@@ -366,68 +330,66 @@
                     }
                 } else {
                     // We need to wait
-                    self.mode = Mode::BlockedGet;
-                    self.mode_port = port_id;
+                    self.exec_state.set_as_blocked_get(port_id);
                     return Ok(CompScheduling::Sleep);
                 }
             },
             EC::Put(port_id, value) => {
-                debug_assert_eq!(self.mode, Mode::Sync);
+                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                 sched_ctx.log(&format!("Putting value {:?}", value));
 
-                let port_id = port_id_from_eval(port_id);
-                let port_handle = comp_ctx.get_port_handle(port_id);
-                let port_info = comp_ctx.get_port(port_handle);
-                if port_info.state.is_blocked() {
-                    self.mode = Mode::BlockedPut;
-                    self.mode_port = port_id;
-                    self.mode_value = value;
-                    self.exec_ctx.stmt = ExecStmt::PerformedPut; // prepare for when we become unblocked
-                    return Ok(CompScheduling::Sleep);
-                } else {
-                    self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value);
-                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
-                    return Ok(CompScheduling::Immediate);
-                }
+
+                // Send the message
+                let target_port_id = port_id_from_eval(port_id);
+                let scheduling = component::default_send_data_message(
+                    &mut self.exec_state, target_port_id, value,
+                    sched_ctx, &mut self.consensus, comp_ctx
+                );
+
+                // When `run` is called again (potentially after becoming
+                // unblocked) we need to instruct the executor that we performed
+                // the `put`
+                self.exec_ctx.stmt = ExecStmt::PerformedPut;
+                return Ok(scheduling);
             },
             EC::SelectStart(num_cases, _num_ports) => {
-                debug_assert_eq!(self.mode, Mode::Sync);
-                self.select.handle_select_start(num_cases);
+                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
+                self.select_state.handle_select_start(num_cases);
                 return Ok(CompScheduling::Requeue);
             },
             EC::SelectRegisterPort(case_index, port_index, port_id) => {
-                debug_assert_eq!(self.mode, Mode::Sync);
+                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
                 let port_id = port_id_from_eval(port_id);
-                if let Err(_err) = self.select.register_select_case_port(comp_ctx, case_index, port_index, port_id) {
+                if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) {
                     todo!("handle registering a port multiple times");
                 }
                 return Ok(CompScheduling::Immediate);
             },
             EC::SelectWait => {
-                debug_assert_eq!(self.mode, Mode::Sync);
-                let select_decision = self.select.handle_select_waiting_point(&self.inbox_main, comp_ctx);
+                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
+                let select_decision = self.select_state.handle_select_waiting_point(&self.inbox_main, comp_ctx);
                 if let SelectDecision::Case(case_index) = select_decision {
                     // Reached a conclusion, so we can continue immediately
                     self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
-                    self.mode = Mode::Sync;
+                    self.exec_state.mode = CompMode::Sync;
                     return Ok(CompScheduling::Immediate);
                 } else {
                     // No decision yet
-                    self.mode = Mode::BlockedSelect;
+                    self.exec_state.mode = CompMode::BlockedSelect;
                     return Ok(CompScheduling::Sleep);
                 }
             },
             // Results that can be returned outside of sync mode
             EC::ComponentTerminated => {
-                self.mode = Mode::StartExit; // next call we'll take care of the exit
+                self.exec_state.mode = CompMode::StartExit; // next call we'll take care of the exit
                 return Ok(CompScheduling::Immediate);
             },
             EC::SyncBlockStart => {
-                debug_assert_eq!(self.mode, Mode::NonSync);
+                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
                 self.handle_sync_start(sched_ctx, comp_ctx);
                 return Ok(CompScheduling::Immediate);
             },
             EC::NewComponent(definition_id, type_id, arguments) => {
-                debug_assert_eq!(self.mode, Mode::NonSync);
+                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
                 self.create_component_and_transfer_ports(
                     sched_ctx, comp_ctx,
                     definition_id, type_id, arguments
@@ -435,7 +397,7 @@
                 );
                 return Ok(CompScheduling::Requeue);
             },
             EC::NewChannel => {
-                debug_assert_eq!(self.mode, Mode::NonSync);
+                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
                 debug_assert!(self.exec_ctx.stmt.is_none());
                 let channel = comp_ctx.create_channel();
                 self.exec_ctx.stmt = ExecStmt::CreatedChannel((
@@ -448,6 +410,34 @@
             }
         }
     }
+}
+
+impl CompPDL {
+    pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self {
+        let mut inbox_main = Vec::new();
+        inbox_main.reserve(num_ports);
+        for _ in 0..num_ports {
+            inbox_main.push(None);
+        }
+
+        return Self{
+            exec_state: CompExecState::new(),
+            select_state: SelectState::new(),
+            prompt: initial_state,
+            control: ControlLayer::default(),
+            consensus: Consensus::new(),
+            sync_counter: 0,
+            exec_ctx: ExecCtx{
+                stmt: ExecStmt::None,
+            },
+            inbox_main,
+            inbox_backup: Vec::new(),
+        }
+    }
+
+    // -------------------------------------------------------------------------
+    // Running component and handling changes in global component state
+    // -------------------------------------------------------------------------
 
     fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult {
         let mut step_result = EvalContinuation::Stepping;
@@ -466,11 +456,11 @@
         self.consensus.notify_sync_start(comp_ctx);
         for message in self.inbox_main.iter() {
             if let Some(message) = message {
-                self.consensus.handle_new_data_message(comp_ctx, message);
+                self.consensus.handle_incoming_data_message(comp_ctx, message);
             }
         }
-        debug_assert_eq!(self.mode, Mode::NonSync);
-        self.mode = Mode::Sync;
+        debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
+        self.exec_state.mode = CompMode::Sync;
     }
 
     /// Handles end of sync. The conclusion to the sync round might arise
@@ -480,7 +470,7 @@
     fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
         sched_ctx.log("Component ending sync mode (now waiting for solution)");
         let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
-        self.mode = Mode::SyncEnd;
+        self.exec_state.mode = CompMode::SyncEnd;
         self.handle_sync_decision(sched_ctx, comp_ctx, decision);
     }
 
@@ -488,30 +478,28 @@
     /// the internal `Mode`, such that the next call to `run` can take the
     /// appropriate next steps.
     fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
-        sched_ctx.log(&format!("Handling sync decision: {:?} (in mode {:?})", decision, self.mode));
-        let is_success = match decision {
+        sched_ctx.log(&format!("Handling sync decision: {:?} (in mode {:?})", decision, self.exec_state.mode));
+        match decision {
             SyncRoundDecision::None => {
                 // No decision yet
                 return;
             },
-            SyncRoundDecision::Solution => true,
-            SyncRoundDecision::Failure => false,
-        };
-
-        // If here then we've reached a decision
-        debug_assert_eq!(self.mode, Mode::SyncEnd);
-        if is_success {
-            self.mode = Mode::NonSync;
-            self.consensus.notify_sync_decision(decision);
-        } else {
-            self.mode = Mode::StartExit;
+            SyncRoundDecision::Solution => {
+                debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
+                self.exec_state.mode = CompMode::NonSync;
+                self.consensus.notify_sync_decision(decision);
+            },
+            SyncRoundDecision::Failure => {
+                debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
+                self.exec_state.mode = CompMode::StartExit;
+            },
         }
     }
 
     fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
         sched_ctx.log("Component exiting");
-        debug_assert_eq!(self.mode, Mode::StartExit);
-        self.mode = Mode::BusyExit;
+        debug_assert_eq!(self.exec_state.mode, CompMode::StartExit);
+        self.exec_state.mode = CompMode::BusyExit;
 
         // Doing this by index, then retrieving the handle is a bit ridiculous,
         // but Rust is being Rust with its borrowing rules.
@@ -530,7 +518,7 @@
             let port_handle = comp_ctx.get_port_handle(port_id);
             let (peer, message) = self.control.initiate_port_closing(port_handle, comp_ctx);
             let peer_info = comp_ctx.get_peer(peer);
-            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
+            peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
         }
     }
 
@@ -538,180 +526,34 @@
     // Handling messages
     // -------------------------------------------------------------------------
 
-    fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_handle: LocalPortHandle, value: ValueGroup) {
-        let port_info = comp_ctx.get_port(source_port_handle);
-        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
-        let peer_info = comp_ctx.get_peer(peer_handle);
-        let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value);
-        peer_info.handle.send_message(sched_ctx, Message::Data(annotated_message), true);
-    }
-
     /// Handles a message that came in through the public inbox. This function
     /// will handle putting it in the correct place, and potentially blocking
     /// the port in case too many messages are being received.
     fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
-        // Whatever we do, glean information from headers in message
-        if self.mode.is_in_sync_block() {
-            self.consensus.handle_new_data_message(comp_ctx, &message);
-        }
-
-        // Check if we can insert it directly into the storage associated with
-        // the port
-        let target_port_id = message.data_header.target_port;
-        let port_handle = comp_ctx.get_port_handle(target_port_id);
-        let port_index = comp_ctx.get_port_index(port_handle);
-        if self.inbox_main[port_index].is_none() {
-            self.inbox_main[port_index] = Some(message);
+        use component::IncomingData;
 
-            // After direct insertion, check if this component's execution is
-            // blocked on receiving a message on that port
-            debug_assert!(!comp_ctx.get_port(port_handle).state.is_blocked()); // because we could insert directly
-            if self.mode == Mode::BlockedGet && self.mode_port == target_port_id {
-                // We were indeed blocked
-                self.mode = Mode::Sync;
-                self.mode_port = PortId::new_invalid();
-            } else if self.mode == Mode::BlockedSelect {
-                let select_decision = self.select.handle_updated_inbox(&self.inbox_main, comp_ctx);
-                if let SelectDecision::Case(case_index) = select_decision {
-                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
-                    self.mode = Mode::Sync;
-                }
-            }
-
-            return;
-        }
-
-        // The direct inbox is full, so the port will become (or was already) blocked
-        let port_info = comp_ctx.get_port_mut(port_handle);
-        debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked());
-
-        if port_info.state == PortState::Open {
-            comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
-            let (peer_handle, message) =
-                self.control.initiate_port_blocking(comp_ctx, port_handle);
-
-            let peer = comp_ctx.get_peer(peer_handle);
-            peer.handle.send_message(sched_ctx, Message::Control(message), true);
+        // Whatever we do, glean information from headers in message
+        if self.exec_state.mode.is_in_sync_block() {
+            self.consensus.handle_incoming_data_message(comp_ctx, &message);
         }
 
-        // But we still need to remember the message, so:
-        self.inbox_backup.push(message);
-    }
-
-    /// Handles when a message has been handed off from the inbox to the PDL
-    /// code. We check to see if there are more messages waiting and, if not,
-    /// then we handle the case where the port might have been blocked
-    /// previously.
-    fn handle_received_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) {
+        let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
         let port_index = comp_ctx.get_port_index(port_handle);
-        debug_assert!(self.inbox_main[port_index].is_none()); // this function should be called after the message is taken out
-
-        // Check for any more messages
-        let port_info = comp_ctx.get_port(port_handle);
-        for message_index in 0..self.inbox_backup.len() {
-            let message = &self.inbox_backup[message_index];
-            if message.data_header.target_port == port_info.self_id {
-                // One more message for this port
-                let message = self.inbox_backup.remove(message_index);
-                debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we had >1 message on the port
-                self.inbox_main[port_index] = Some(message);
-
-                return;
-            }
-        }
-
-        // Did not have any more messages. So if we were blocked, then we need
-        // to send the "unblock" message.
-        if port_info.state == PortState::BlockedDueToFullBuffers {
-            comp_ctx.set_port_state(port_handle, PortState::Open);
-            let (peer_handle, message) = self.control.cancel_port_blocking(comp_ctx, port_handle);
-            let peer_info = comp_ctx.get_peer(peer_handle);
-            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
-        }
-    }
-
-    fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) {
-        // Little local utility to send an Ack
-        fn send_control_ack_message(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, causer_id: ControlId, peer_handle: LocalPeerHandle) {
-            let peer_info = comp_ctx.get_peer(peer_handle);
-            peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{
-                id: causer_id,
-                sender_comp_id: comp_ctx.id,
-                target_port_id: None,
-                content: ControlMessageContent::Ack,
-            }), true);
-        }
-
-        // Handle the content of the control message, and optionally Ack it
-        match message.content {
-            ControlMessageContent::Ack => {
-                self.handle_ack(sched_ctx, comp_ctx, message.id);
-            },
-            ControlMessageContent::BlockPort(port_id) => {
-                // One of our messages was accepted, but the port should be
-                // blocked.
-                let port_handle = comp_ctx.get_port_handle(port_id);
-                let port_info = comp_ctx.get_port(port_handle);
-                debug_assert_eq!(port_info.kind, PortKind::Putter);
-                if port_info.state == PortState::Open {
-                    // only when open: we don't do this when closed, and we don't do this if we're blocked due to peer changes
-                    comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
-                }
-            },
-            ControlMessageContent::ClosePort(port_id) => {
-                // Request to close the port. We immediately comply and remove
-                // the component handle as well
-                let port_handle = comp_ctx.get_port_handle(port_id);
-                let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id;
-                let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
-
-                // One exception to sending an `Ack` is if we just closed the
-                // port ourselves, meaning that the `ClosePort` messages got
-                // sent to one another.
-                if let Some(control_id) = self.control.has_close_port_entry(port_handle, comp_ctx) {
-                    self.handle_ack(sched_ctx, comp_ctx, control_id);
-                } else {
-                    send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle);
-                    comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed
-                    comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed
-                }
-            },
-            ControlMessageContent::UnblockPort(port_id) => {
-                // We were previously blocked (or already closed)
-                let port_handle = comp_ctx.get_port_handle(port_id);
-                let port_info = comp_ctx.get_port(port_handle);
-                debug_assert_eq!(port_info.kind, PortKind::Putter);
-                if port_info.state == PortState::BlockedDueToFullBuffers {
-                    self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle);
+        match component::default_handle_incoming_data_message(
+            &mut self.exec_state, &mut self.inbox_main[port_index], comp_ctx, message,
+            sched_ctx, &mut self.control
+        ) {
+            IncomingData::PlacedInSlot => {
+                if self.exec_state.mode == CompMode::BlockedSelect {
+                    let select_decision = self.select_state.handle_updated_inbox(&self.inbox_main, comp_ctx);
+                    if let SelectDecision::Case(case_index) = select_decision {
+                        self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
+                        self.exec_state.mode = CompMode::Sync;
+                    }
                 }
             },
-            ControlMessageContent::PortPeerChangedBlock(port_id) => {
-                // The peer of our port has just changed. So we are asked to
-                // temporarily block the port (while our original recipient is
-                // potentially rerouting some of the in-flight messages) and
-                // Ack. Then we wait for the `unblock` call.
-                debug_assert_eq!(message.target_port_id, Some(port_id));
-                let port_handle = comp_ctx.get_port_handle(port_id);
-                comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange);
-
-                let port_info = comp_ctx.get_port(port_handle);
-                let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
-
-                send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle);
-            },
-            ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
-                let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap());
-                let port_info = comp_ctx.get_port(port_handle);
-                debug_assert!(port_info.state == PortState::BlockedDueToPeerChange);
-                let old_peer_id = port_info.peer_comp_id;
-
-                comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false);
-
-                let port_info = comp_ctx.get_port_mut(port_handle);
-                port_info.peer_comp_id = new_comp_id;
-                port_info.peer_port_id = new_port_id;
-                comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None);
-                self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle);
+            IncomingData::SlotFull(message) => {
+                self.inbox_backup.push(message);
             }
         }
     }
@@ -721,67 +563,16 @@
         self.handle_sync_decision(sched_ctx, comp_ctx, decision);
     }
 
-    /// Little helper that notifies the control layer of an `Ack`, and takes the
-    /// appropriate subsequent action
-    fn handle_ack(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, control_id: ControlId) {
-        let mut to_ack = control_id;
-        loop {
-            let (action, new_to_ack) = self.control.handle_ack(to_ack, sched_ctx, comp_ctx);
-            match action {
-                AckAction::SendMessage(target_comp, message) => {
-                    // FIX @NoDirectHandle
-                    let mut handle = sched_ctx.runtime.get_component_public(target_comp);
-                    handle.send_message(sched_ctx, Message::Control(message), true);
-                    let _should_remove = handle.decrement_users();
-                    debug_assert!(_should_remove.is_none());
-                },
-                AckAction::ScheduleComponent(to_schedule) => {
-                    // FIX @NoDirectHandle
-                    let mut handle = sched_ctx.runtime.get_component_public(to_schedule);
-
-                    // Note that the component is intentionally not
-                    // sleeping, so we just wake it up
-                    debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire));
-                    let key = unsafe{ to_schedule.upgrade() };
-                    sched_ctx.runtime.enqueue_work(key);
-                    let _should_remove = handle.decrement_users();
-                    debug_assert!(_should_remove.is_none());
-                },
-                AckAction::None => {}
-            }
-
-            match new_to_ack {
-                Some(new_to_ack) => to_ack = new_to_ack,
-                None => break,
-            }
-        }
-    }
-
     // -------------------------------------------------------------------------
     // Handling ports
     // -------------------------------------------------------------------------
 
-    /// Unblocks a port, potentially continuing execution of the component, in
-    /// response to a message that told us to unblock a previously blocked
-    fn handle_unblock_port_instruction(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) {
-        let port_info = comp_ctx.get_port_mut(port_handle);
-        let port_id = port_info.self_id;
-        debug_assert!(port_info.state.is_blocked());
-        port_info.state = PortState::Open;
-
-        if self.mode == Mode::BlockedPut && port_id == self.mode_port {
-            // We were blocked on the port that just became unblocked, so
-            // send the message.
-            debug_assert_eq!(port_info.kind, PortKind::Putter);
-            let mut replacement = ValueGroup::default();
-            std::mem::swap(&mut replacement, &mut self.mode_value);
-            self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, replacement);
-
-            self.mode = Mode::Sync;
-            self.mode_port = PortId::new_invalid();
-        }
-    }
-
+    /// Creates a new component and transfers ports. Because of the stepwise
+    /// process in which memory is allocated, ports are transferred, messages
+    /// are exchanged, and component lifecycle methods are called, this
+    /// function relies on a lot of implicit assumptions (e.g. when the
+    /// `Component::on_creation` method is called, the component is already
+    /// registered at the runtime).
     fn create_component_and_transfer_ports(
         &mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx,
@@ -793,15 +584,27 @@
             created_handle: LocalPortHandle,
             created_id: PortId,
         }
-        let mut port_id_pairs = Vec::new();
+        let mut opened_port_id_pairs = Vec::new();
+        let mut closed_port_id_pairs = Vec::new();
 
         let reservation = sched_ctx.runtime.start_create_pdl_component();
         let mut created_ctx = CompCtx::new(&reservation);
+        let other_proc = &sched_ctx.runtime.protocol.heap[definition_id];
+        let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition];
+
+        // dbg_code!({
+        //     sched_ctx.log(&format!(
+        //         "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})",
+        //         self_proc.identifier.value.as_str(), creator_ctx.id,
+        //         other_proc.identifier.value.as_str(), reservation.id()
+        //     ));
+        // });
+
         // Take all the port IDs that are in the `args` (and currently belong to
         // the creator component) and translate them into new IDs that are
         // associated with the component we're about to create
-        let mut arg_iter = ValueGroupIter::new(&mut arguments);
+        let mut arg_iter = ValueGroupPortIter::new(&mut arguments);
         while let Some(port_reference) = arg_iter.next() {
             // Create port entry for new component
             let creator_port_id = port_reference.id;
@@ -814,12 +617,18 @@
             let created_port = created_ctx.get_port(created_port_handle);
             let created_port_id = created_port.self_id;
 
-            port_id_pairs.push(PortPair{
+            let port_id_pair = PortPair {
                 creator_handle: creator_port_handle,
                 creator_id: creator_port_id,
                 created_handle: created_port_handle,
                 created_id: created_port_id,
-            });
+            };
+
+            if creator_port.state == PortState::Closed {
+                closed_port_id_pairs.push(port_id_pair)
+            } else {
+                opened_port_id_pairs.push(port_id_pair);
+            }
 
             // Modify value in arguments (bit dirty, but double vec in ValueGroup causes lifetime issues)
             let arg_value = if let Some(heap_pos) = port_reference.heap_pos {
@@ -839,20 +648,20 @@
         // the new component.
         let mut created_component_has_remote_peers = false;
 
-        for pair in port_id_pairs.iter() {
+        for pair in opened_port_id_pairs.iter() {
             let creator_port_info = creator_ctx.get_port(pair.creator_handle);
             let created_port_info = created_ctx.get_port_mut(pair.created_handle);
 
             if created_port_info.peer_comp_id == creator_ctx.id {
                 // Port peer is owned by the creator as well
-                let created_peer_port_index = port_id_pairs
+                let created_peer_port_index = opened_port_id_pairs
                     .iter()
                     .position(|v| v.creator_id == creator_port_info.peer_port_id);
                 match created_peer_port_index {
                     Some(created_peer_port_index) => {
                         // Peer port moved to the new component as well. So
                         // adjust IDs appropriately.
-                        let peer_pair = &port_id_pairs[created_peer_port_index];
+                        let peer_pair = &opened_port_id_pairs[created_peer_port_index];
                         created_port_info.peer_port_id = peer_pair.created_id;
                         created_port_info.peer_comp_id = reservation.id();
                         todo!("either add 'self peer', or remove that idea from Ctx altogether")
@@ -877,20 +686,16 @@
         // actual component. Note that we initialize it as "not sleeping" as
         // its initial scheduling might be performed based on `Ack`s in response
         // to message exchanges between remote peers.
-        let prompt = Prompt::new(
-            &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
-            definition_id, type_id, arguments,
-        );
-        let component = CompPDL::new(prompt, port_id_pairs.len());
+        let total_num_ports = opened_port_id_pairs.len() + closed_port_id_pairs.len();
+        let component = component::create_component(&sched_ctx.runtime.protocol, definition_id, type_id, arguments, total_num_ports);
         let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component(
             reservation, component, created_ctx, false,
         );
-        let created_ctx = &component.ctx;
+        component.component.on_creation(created_key.downgrade(), sched_ctx);
 
         // Now modify the creator's ports: remove every transferred port and
-        // potentially remove the peer component. Here is also where we will
-        // transfer messages in the main inbox.
-        for pair in port_id_pairs.iter() {
+        // potentially remove the peer component.
+        for pair in opened_port_id_pairs.iter() {
             // Remove peer if appropriate
             let creator_port_info = creator_ctx.get_port(pair.creator_handle);
             let creator_port_index = creator_ctx.get_port_index(pair.creator_handle);
@@ -899,12 +704,9 @@
             creator_ctx.remove_port(pair.creator_handle);
 
             // Transfer any messages
-            let created_port_index = created_ctx.get_port_index(pair.created_handle);
-            let created_port_info = created_ctx.get_port(pair.created_handle);
-            debug_assert!(component.code.inbox_main[created_port_index].is_none());
             if let Some(mut message) = self.inbox_main.remove(creator_port_index) {
                 message.data_header.target_port = pair.created_id;
-                component.code.inbox_main[created_port_index] = Some(message);
+                component.component.adopt_message(&mut component.ctx, message)
             }
 
             let mut message_index = 0;
@@ -914,27 +716,43 @@
                     // transfer message
                     let mut message = self.inbox_backup.remove(message_index);
                     message.data_header.target_port = pair.created_id;
-                    component.code.inbox_backup.push(message);
+                    component.component.adopt_message(&mut component.ctx, message);
                 } else {
                     message_index += 1;
                 }
             }
 
             // Handle potential channel between creator and created component
+            let created_port_info = component.ctx.get_port(pair.created_handle);
+
             if created_port_info.peer_comp_id == creator_ctx.id {
                 let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id);
                 let peer_port_info = creator_ctx.get_port_mut(peer_port_handle);
-                peer_port_info.peer_comp_id = created_ctx.id;
+                peer_port_info.peer_comp_id = component.ctx.id;
                 peer_port_info.peer_port_id = created_port_info.self_id;
-                creator_ctx.add_peer(peer_port_handle, sched_ctx, created_ctx.id, None);
+                creator_ctx.add_peer(peer_port_handle, sched_ctx, component.ctx.id, None);
             }
         }
 
-        // By now all ports have been transferred. We'll now do any of the setup
-        // for rerouting/messaging
+        // Do the same for the closed ports
+        for pair in closed_port_id_pairs.iter() {
+            let port_index = creator_ctx.get_port_index(pair.creator_handle);
+            creator_ctx.remove_port(pair.creator_handle);
+            let _removed_message = self.inbox_main.remove(port_index);
+
+            // In debug mode: since we've closed the port we shouldn't have any
+            // messages for that port.
+            debug_assert!(_removed_message.is_none());
+            debug_assert!(!self.inbox_backup.iter().any(|v| v.data_header.target_port == pair.creator_id));
+        }
+
+        // By now all ports and messages have been transferred. If there are any
+        // peers that need to be notified about this new component, then we
+        // initiate the protocol that will notify everyone here.
         if created_component_has_remote_peers {
+            let created_ctx = &component.ctx;
             let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
-            for pair in port_id_pairs.iter() {
+            for pair in opened_port_id_pairs.iter() {
                 let port_info = created_ctx.get_port(pair.created_handle);
                 if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id {
                     let message = self.control.add_reroute_entry(
@@ -944,7 +762,7 @@
                     );
                     let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id);
                     let peer_info = created_ctx.get_peer(peer_handle);
-                    peer_info.handle.send_message(sched_ctx, message, true);
+                    peer_info.handle.send_message(&sched_ctx.runtime, message, true);
                 }
            }
        } else {
@@ -954,16 +772,6 @@
     }
 }
 
-#[inline]
-fn port_id_from_eval(port_id: EvalPortId) -> PortId {
-    return PortId(port_id.id);
-}
-
-#[inline]
-fn port_id_to_eval(port_id: PortId) -> EvalPortId {
-    return EvalPortId{ id: port_id.0 };
-}
-
 /// Recursively goes through the value group, attempting to find ports.
 /// Duplicates will only be added once.
 pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>) {
@@ -1005,13 +813,13 @@
     }
 }
 
-struct ValueGroupIter<'a> {
+struct ValueGroupPortIter<'a> {
     group: &'a mut ValueGroup,
     heap_stack: Vec<(usize, usize)>,
     index: usize,
 }
 
-impl<'a> ValueGroupIter<'a> {
+impl<'a> ValueGroupPortIter<'a> {
     fn new(group: &'a mut ValueGroup) -> Self {
         return Self{ group, heap_stack: Vec::new(), index: 0 }
     }
@@ -1023,7 +831,7 @@ struct ValueGroupPortRef {
     index: usize,
 }
 
-impl<'a> Iterator for ValueGroupIter<'a> {
+impl<'a> Iterator for ValueGroupPortIter<'a> {
     type Item = ValueGroupPortRef;
 
     fn next(&mut self) -> Option<Self::Item> {
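
Reviewer note on the mode machine this patch moves from the local `Mode` enum into the shared `CompExecState`: the comments on the removed enum above still describe the states accurately. The sketch below is a simplified, self-contained stand-in for the real `CompMode`/`CompScheduling` in `super::component` (the names mirror the patch, but `pre_run_scheduling` is a paraphrase of the dispatch at the top of `CompPDL::run`, not code copied from the runtime — in particular, the real exit states are handled by the `component::default_handle_*` helpers, which may also return `Exit` once all `Ack`s have arrived):

// Simplified stand-in for the runtime's CompMode; descriptions taken from the
// removed `Mode` enum above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompMode {
    NonSync,       // not in sync mode
    Sync,          // in sync mode, can interact with other components
    SyncEnd,       // encountered end of sync block, awaiting a solution
    BlockedGet,    // waiting for a message on a particular port
    BlockedPut,    // waiting for a blocked port to become unblocked
    BlockedSelect, // waiting for a message that completes a select statement
    StartExit,     // temporary state: begin the shutdown process
    BusyExit,      // temporary state: waiting for Acks for all closed ports
    Exit,          // waiting until the reference count drops to zero
}

enum CompScheduling { Immediate, Requeue, Sleep, Exit }

impl CompMode {
    // Mirrors `is_in_sync_block`: every state from the start of a sync round
    // until its decision counts as "inside" the sync block.
    fn is_in_sync_block(self) -> bool {
        use CompMode::*;
        matches!(self, Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect)
    }

    // Paraphrase of the dispatch at the top of `run`: `None` means "fall
    // through and execute PDL code".
    fn pre_run_scheduling(self) -> Option<CompScheduling> {
        use CompMode::*;
        match self {
            NonSync | Sync => None,
            SyncEnd | BlockedGet | BlockedPut | BlockedSelect => Some(CompScheduling::Sleep),
            StartExit => Some(CompScheduling::Immediate), // default_handle_start_exit
            BusyExit => Some(CompScheduling::Sleep),      // or Exit, once all Acks arrived
            Exit => Some(CompScheduling::Exit),
        }
    }
}

fn main() {
    assert!(CompMode::BlockedGet.is_in_sync_block());
    assert!(!CompMode::StartExit.is_in_sync_block());
}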
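Second note, on the two-tier inbox that `adopt_message` (and the new `component::default_handle_incoming_data_message`) maintain: one pre-reserved slot per port (`inbox_main`) plus an overflow buffer (`inbox_backup`). A minimal self-contained sketch with toy types — the real `DataMessage` carries data/sync headers and consensus annotations, so `PortId`, `DataMessage`, and `Inbox` here are illustrative only:

#[derive(Clone, Copy, PartialEq, Eq)]
struct PortId(u32);

struct DataMessage {
    target_port: PortId,
    content: u64,
}

struct Inbox {
    main: Vec<Option<DataMessage>>, // one slot per port, indexed by port index
    backup: Vec<DataMessage>,       // overflow for ports whose slot is full
}

impl Inbox {
    fn new(num_ports: usize) -> Self {
        Self { main: (0..num_ports).map(|_| None).collect(), backup: Vec::new() }
    }

    // Mirrors `adopt_message`: fill the per-port slot first, spill otherwise.
    fn adopt(&mut self, port_index: usize, message: DataMessage) {
        if self.main[port_index].is_none() {
            self.main[port_index] = Some(message);
        } else {
            self.backup.push(message);
        }
    }

    // Mirrors the refill after a successful `get`: hand the slot's message to
    // the caller, then promote the oldest backed-up message for that port.
    fn take_and_refill(&mut self, port_index: usize, port_id: PortId) -> Option<DataMessage> {
        let taken = self.main[port_index].take();
        if let Some(pos) = self.backup.iter().position(|m| m.target_port == port_id) {
            self.main[port_index] = Some(self.backup.remove(pos));
        }
        taken
    }
}

fn main() {
    let mut inbox = Inbox::new(1);
    let port = PortId(0);
    inbox.adopt(0, DataMessage { target_port: port, content: 1 });
    inbox.adopt(0, DataMessage { target_port: port, content: 2 }); // slot full: goes to backup
    assert_eq!(inbox.take_and_refill(0, port).map(|m| m.content), Some(1));
    assert_eq!(inbox.take_and_refill(0, port).map(|m| m.content), Some(2)); // refilled from backup
}

The part the sketch leaves out is exactly what `default_handle_received_data_message` takes over in this patch: when the backup for a port drains, the runtime also unblocks the peer via the control layer.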