diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index fcd9a8fe33a342cbaed4babdfef2d85ad7512f25..a18aaab3c7e49842356fd65651692255b7743501 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -222,6 +222,10 @@ pub(crate) struct CompPDL { } impl Component for CompPDL { + fn on_creation(&mut self, _sched_ctx: &SchedulerCtx) { + // Intentionally empty + } + fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage) { let port_handle = comp_ctx.get_port_handle(message.data_header.target_port); let port_index = comp_ctx.get_port_index(port_handle); @@ -325,18 +329,19 @@ impl Component for CompPDL { EC::Put(port_id, value) => { debug_assert_eq!(self.exec_state.mode, CompMode::Sync); sched_ctx.log(&format!("Putting value {:?}", value)); - let port_id = port_id_from_eval(port_id); - let port_handle = comp_ctx.get_port_handle(port_id); - let port_info = comp_ctx.get_port(port_handle); - if port_info.state.is_blocked() { - self.exec_state.set_as_blocked_put(port_id, value); - self.exec_ctx.stmt = ExecStmt::PerformedPut; // prepare for when we become unblocked - return Ok(CompScheduling::Sleep); - } else { - self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value); - self.exec_ctx.stmt = ExecStmt::PerformedPut; - return Ok(CompScheduling::Immediate); - } + + // Send the message + let target_port_id = port_id_from_eval(port_id); + let scheduling = component::default_send_data_message( + &mut self.exec_state, target_port_id, value, + sched_ctx, &mut self.consensus, comp_ctx + ); + + // When `run` is called again (potentially after becoming + // unblocked) we need to instruct the executor that we performed + // the `put` + self.exec_ctx.stmt = ExecStmt::PerformedPut; + return Ok(scheduling); }, EC::SelectStart(num_cases, _num_ports) => { debug_assert_eq!(self.exec_state.mode, CompMode::Sync); @@ -443,7 +448,7 @@ impl CompPDL { self.consensus.notify_sync_start(comp_ctx); for message in self.inbox_main.iter() { if let Some(message) = message { - self.consensus.handle_new_data_message(comp_ctx, message); + self.consensus.handle_incoming_data_message(comp_ctx, message); } } debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); @@ -513,64 +518,36 @@ impl CompPDL { // Handling messages // ------------------------------------------------------------------------- - fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_handle: LocalPortHandle, value: ValueGroup) { - let port_info = comp_ctx.get_port(source_port_handle); - let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); - let peer_info = comp_ctx.get_peer(peer_handle); - let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value); - peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(annotated_message), true); - } - /// Handles a message that came in through the public inbox. This function /// will handle putting it in the correct place, and potentially blocking /// the port in case too many messages are being received. fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) { + use component::IncomingData; + // Whatever we do, glean information from headers in message if self.exec_state.mode.is_in_sync_block() { - self.consensus.handle_new_data_message(comp_ctx, &message); + self.consensus.handle_incoming_data_message(comp_ctx, &message); } - // Check if we can insert it directly into the storage associated with - // the port - let target_port_id = message.data_header.target_port; - let port_handle = comp_ctx.get_port_handle(target_port_id); + let port_handle = comp_ctx.get_port_handle(message.data_header.target_port); let port_index = comp_ctx.get_port_index(port_handle); - if self.inbox_main[port_index].is_none() { - self.inbox_main[port_index] = Some(message); - - // After direct insertion, check if this component's execution is - // blocked on receiving a message on that port - debug_assert!(!comp_ctx.get_port(port_handle).state.is_blocked()); // because we could insert directly - if self.exec_state.is_blocked_on_get(target_port_id) { - // We were indeed blocked - self.exec_state.mode = CompMode::Sync; - self.exec_state.mode_port = PortId::new_invalid(); - } else if self.exec_state.mode == CompMode::BlockedSelect { - let select_decision = self.select_state.handle_updated_inbox(&self.inbox_main, comp_ctx); - if let SelectDecision::Case(case_index) = select_decision { - self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index); - self.exec_state.mode = CompMode::Sync; + match component::default_handle_incoming_data_message( + &mut self.exec_state, &mut self.inbox_main[port_index], comp_ctx, message, + sched_ctx, &mut self.control + ) { + IncomingData::PlacedInSlot => { + if self.exec_state.mode == CompMode::BlockedSelect { + let select_decision = self.select_state.handle_updated_inbox(&self.inbox_main, comp_ctx); + if let SelectDecision::Case(case_index) = select_decision { + self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index); + self.exec_state.mode = CompMode::Sync; + } } + }, + IncomingData::SlotFull(message) => { + self.inbox_backup.push(message); } - - return; } - - // The direct inbox is full, so the port will become (or was already) blocked - let port_info = comp_ctx.get_port_mut(port_handle); - debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked()); - - if port_info.state == PortState::Open { - comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); - let (peer_handle, message) = - self.control.initiate_port_blocking(comp_ctx, port_handle); - - let peer = comp_ctx.get_peer(peer_handle); - peer.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); - } - - // But we still need to remember the message, so: - self.inbox_backup.push(message); } /// Handles when a message has been handed off from the inbox to the PDL @@ -732,6 +709,7 @@ impl CompPDL { let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component( reservation, component, created_ctx, false, ); + component.component.on_creation(sched_ctx); // Now modify the creator's ports: remove every transferred port and // potentially remove the peer component.