diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 338d05bde302f39e2d796e59f12173102be2b09d..dd3081ee1d12512335b33c706a690cfa21fc7612 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -227,6 +227,10 @@ impl Component for CompPDL { // Intentionally empty } + fn on_shutdown(&mut self, _sched_ctx: &SchedulerCtx) { + // Intentionally empty + } + fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage) { let port_handle = comp_ctx.get_port_handle(message.data_header.target_port); let port_index = comp_ctx.get_port_index(port_handle); @@ -238,7 +242,7 @@ impl Component for CompPDL { } fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) { - sched_ctx.log(&format!("handling message: {:#?}", message)); + // sched_ctx.log(&format!("handling message: {:?}", message)); if let Some(new_target) = self.control.should_reroute(&mut message) { let mut target = sched_ctx.runtime.get_component_public(new_target); // TODO: @NoDirectHandle target.send_message(&sched_ctx.runtime, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks @@ -313,7 +317,10 @@ impl Component for CompPDL { // Message was received. Make sure any blocked peers and // pending messages are handled. let message = self.inbox_main[port_index].take().unwrap(); - self.handle_received_data_message(sched_ctx, comp_ctx, port_handle); + component::default_handle_received_data_message( + port_id, &mut self.inbox_main[port_index], &mut self.inbox_backup, + comp_ctx, sched_ctx, &mut self.control + ); self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content); return Ok(CompScheduling::Immediate); @@ -551,38 +558,6 @@ impl CompPDL { } } - /// Handles when a message has been handed off from the inbox to the PDL - /// code. We check to see if there are more messages waiting and, if not, - /// then we handle the case where the port might have been blocked - /// previously. - fn handle_received_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) { - let port_index = comp_ctx.get_port_index(port_handle); - debug_assert!(self.inbox_main[port_index].is_none()); // this function should be called after the message is taken out - - // Check for any more messages - let port_info = comp_ctx.get_port(port_handle); - for message_index in 0..self.inbox_backup.len() { - let message = &self.inbox_backup[message_index]; - if message.data_header.target_port == port_info.self_id { - // One more message for this port - let message = self.inbox_backup.remove(message_index); - debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we had >1 message on the port - self.inbox_main[port_index] = Some(message); - - return; - } - } - - // Did not have any more messages. So if we were blocked, then we need - // to send the "unblock" message. - if port_info.state == PortState::BlockedDueToFullBuffers { - comp_ctx.set_port_state(port_handle, PortState::Open); - let (peer_handle, message) = self.control.cancel_port_blocking(comp_ctx, port_handle); - let peer_info = comp_ctx.get_peer(peer_handle); - peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); - } - } - fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); self.handle_sync_decision(sched_ctx, comp_ctx, decision); @@ -618,13 +593,13 @@ impl CompPDL { let other_proc = &sched_ctx.runtime.protocol.heap[definition_id]; let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition]; - dbg_code!({ - sched_ctx.log(&format!( - "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})", - self_proc.identifier.value.as_str(), creator_ctx.id, - other_proc.identifier.value.as_str(), reservation.id() - )); - }); + // dbg_code!({ + // sched_ctx.log(&format!( + // "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})", + // self_proc.identifier.value.as_str(), creator_ctx.id, + // other_proc.identifier.value.as_str(), reservation.id() + // )); + // }); // Take all the ports ID that are in the `args` (and currently belong to // the creator component) and translate them into new IDs that are