diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs
index c8691e1acf8b09a2c770d95d69fb4dfdcaf2d9d3..03dd5523e71f6d15f47ad885cc4fd19d9ebe235c 100644
--- a/src/runtime2/component/component_pdl.rs
+++ b/src/runtime2/component/component_pdl.rs
@@ -182,6 +182,7 @@ pub(crate) enum Mode {
     SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
     BlockedGet,
     BlockedPut,
+    Exit,
 }
 
 impl Mode {
@@ -189,7 +190,7 @@ impl Mode {
         match self {
             Mode::NonSync | Mode::Sync =>
                 return true,
-            Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut =>
+            Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut | Mode::Exit =>
                 return false,
         }
     }
@@ -239,9 +240,10 @@ impl CompPDL {
     pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
         sched_ctx.log(&format!("handling message: {:?}", message));
         if let Some(new_target) = self.control.should_reroute(&message) {
-            let target = sched_ctx.runtime.get_component_public(new_target);
-            target.inbox.push(message);
-
+            let mut target = sched_ctx.runtime.get_component_public(new_target);
+            target.send_message(sched_ctx, message, true);
+            let _should_remove = target.decrement_users();
+            debug_assert!(!_should_remove);
             return;
         }
 
@@ -258,6 +260,10 @@
         }
     }
 
+    // -------------------------------------------------------------------------
+    // Running component and handling changes in global component state
+    // -------------------------------------------------------------------------
+
     pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
         use EvalContinuation as EC;
 
@@ -286,7 +292,7 @@
                 let port_index = comp_ctx.get_port_index(port_id).unwrap();
                 if let Some(message) = &self.inbox_main[port_index] {
                     // Check if we can actually receive the message
-                    if self.consensus.try_receive_data_message(message) {
+                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
                         // Message was received. Make sure any blocked peers and
                         // pending messages are handled.
                         let message = self.inbox_main[port_index].take().unwrap();
@@ -317,7 +323,7 @@
             },
             // Results that can be returned outside of sync mode
             EC::ComponentTerminated => {
-                debug_assert_eq!(self.mode, Mode::NonSync);
+                self.handle_component_exit(sched_ctx, comp_ctx);
                 return Ok(CompScheduling::Exit);
             },
             EC::SyncBlockStart => {
@@ -346,6 +352,8 @@
                     Value::Output(port_id_to_eval(channel.putter_id)),
                     Value::Input(port_id_to_eval(channel.getter_id))
                 ));
+                self.inbox_main.push(None);
+                self.inbox_main.push(None);
                 return Ok(CompScheduling::Immediate);
             }
         }
@@ -370,20 +378,28 @@
     }
 
     fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
-        self.consensus.notify_sync_end();
+        self.consensus.notify_sync_end(sched_ctx, comp_ctx);
         debug_assert_eq!(self.mode, Mode::Sync);
         self.mode = Mode::SyncEnd;
     }
 
-    fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) {
-        use std::sync::atomic::Ordering;
+    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
+        debug_assert_eq!(self.mode, Mode::NonSync); // not a perfect assert, but a reminder: we cannot exit while inside a sync round
+        // Note: for now the scheduler is responsible for handling the exit.
+        // Whether that is a good idea remains to be seen.
+        self.mode = Mode::Exit;
+    }
+
+    // -------------------------------------------------------------------------
+    // Handling messages
+    // -------------------------------------------------------------------------
+
+    fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) {
 
         let port_info = comp_ctx.get_port(source_port_id);
         let peer_info = comp_ctx.get_peer(port_info.peer_comp_id);
         let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value);
-        peer_info.handle.inbox.push(Message::Data(annotated_message));
-
-        wake_up_if_sleeping(sched_ctx, peer_info.id, &peer_info.handle);
+        peer_info.handle.send_message(sched_ctx, Message::Data(annotated_message), true);
     }
 
     /// Handles a message that came in through the public inbox. This function
@@ -420,8 +436,7 @@
             debug_assert_eq!(_peer_comp_id, target_comp_id);
             let peer = comp_ctx.get_peer(target_comp_id);
-            peer.handle.inbox.push(Message::Control(block_message));
-            wake_up_if_sleeping(sched_ctx, target_comp_id, &peer.handle);
+            peer.handle.send_message(sched_ctx, Message::Control(block_message), true);
         }
 
         // But we still need to remember the message, so:
@@ -454,8 +469,7 @@
         if port_info.state == PortState::Blocked {
             let (peer_comp_id, message) = self.control.set_port_and_peer_unblocked(port_id, comp_ctx);
             let peer_info = comp_ctx.get_peer(peer_comp_id);
-            peer_info.handle.inbox.push(Message::Control(message));
-            wake_up_if_sleeping(sched_ctx, peer_comp_id, &peer_info.handle);
+            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
         }
     }
 
@@ -469,8 +483,7 @@
             AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => {
                 // FIX @NoDirectHandle
                 let handle = sched_ctx.runtime.get_component_public(target_comp);
-                handle.inbox.push(Message::Control(message));
-                wake_up_if_sleeping(sched_ctx, target_comp, &handle);
+                handle.send_message(sched_ctx, Message::Control(message), true);
                 to_ack = new_to_ack;
             },
             AckAction::ScheduleComponent(to_schedule) => {
@@ -537,6 +550,12 @@
                 if port_info.state == PortState::Open {
                     port_info.state = PortState::Blocked;
                 }
+
+                let peer_info = comp_ctx.get_peer(port_info.peer_comp_id);
+                // TODO: Continue here. Send the ack, but consider whether the
+                // peer is always present in our list of peers. A first analysis
+                // suggests it is, but a series of consecutive port transfers
+                // might change that.
             },
             ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => {
                 debug_assert_eq!(message.target_port_id, Some(port_id));
@@ -548,10 +567,36 @@
         }
     }
 
-    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
+    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> Option<CompScheduling> {
+        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
+        let is_success = match decision {
+            SyncRoundDecision::None => {
+                // No decision yet
+                return None;
+            },
+            SyncRoundDecision::Solution => true,
+            SyncRoundDecision::Failure => false,
+        };
+
+        // If we get here then the sync round has reached a conclusion
+        debug_assert_eq!(self.mode, Mode::SyncEnd);
+        self.mode = Mode::NonSync;
+
+        if is_success {
+            // We can simply continue executing, so there is nothing extra to do.
+        } else {
+            // TODO: Handle this better, e.g. by reporting some kind of error
+            self.handle_component_exit(sched_ctx, comp_ctx);
+            return Some(CompScheduling::Exit);
+        }
+
+        return None;
     }
 
+    // -------------------------------------------------------------------------
+    // Handling ports
+    // -------------------------------------------------------------------------
+
     /// Marks the local port as being unblocked. If the execution was blocked on
     /// sending a message over this port, then execution will continue and the
     /// message will be sent.
@@ -590,18 +635,23 @@
             let peer_port_id = port_info.peer_id;
             let port_info = creator_ctx.get_port_mut(peer_port_id);
             port_info.peer_comp_id = created_ctx.id;
+            Self::add_peer_associated_port_to_component(sched_ctx, creator_ctx, created_ctx.id);
         } else {
-            // We don't own the port, so send the appropriate messages and
-            // notify the control layer
+            // We don't own the peer port, so send the appropriate messages
+            // to the peer component and notify the control layer
             has_reroute_entry = true;
             let message = self.control.add_reroute_entry(
                 creator_ctx.id, port_info.peer_id, port_info.peer_comp_id,
                 port_info.self_id, created_ctx.id, schedule_entry_id
             );
             let peer_info = creator_ctx.get_peer(port_info.peer_comp_id);
-            peer_info.handle.inbox.push(message);
+            peer_info.handle.send_message(sched_ctx, message, true);
         }
 
+        // Take out any messages queued up for the transferred port
+        let creator_port_index = creator_ctx.get_port_index(port_id).unwrap();
+        let port_main_message = self.inbox_main[creator_port_index].take();
+
         // Transfer port and create temporary reroute entry
         let (port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id);
         if port_info.state == PortState::Blocked {
@@ -609,6 +659,20 @@
         }
         Self::add_port_to_component(sched_ctx, created_ctx, port_info);
 
+        // Transfer the taken messages
+        let created_port_index = created_ctx.get_port_index(port_id).unwrap();
+        component.code.inbox_main[created_port_index] = port_main_message;
+        let mut message_index = 0;
+        while message_index < self.inbox_backup.len() {
+            if self.inbox_backup[message_index].data_header.target_port == port_id {
+                // Move this message
+                let message = self.inbox_backup.remove(message_index);
+                component.code.inbox_backup.push(message);
+            } else {
+                message_index += 1;
+            }
+        }
+
         // Maybe remove peer from the creator
         if let Some(mut peer_info) = peer_info {
             let remove_from_runtime = peer_info.handle.decrement_users();
@@ -622,6 +686,7 @@
         if !has_reroute_entry {
             // We can schedule the component immediately
             self.control.remove_schedule_entry(schedule_entry_id);
+            component.public.sleeping.store(false, std::sync::atomic::Ordering::Release);
             sched_ctx.runtime.enqueue_work(comp_key);
         } // else: wait for the `Ack`s, they will trigger the scheduling of the component
     }
@@ -653,23 +718,29 @@
         return (port_info, Some(peer_info));
     }
 
+    /// Adds a port to the component context. The peer (or its counter) will be
+    /// updated accordingly.
     fn add_port_to_component(sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_info: Port) {
         // Add the port info
         let peer_comp_id = port_info.peer_comp_id;
         debug_assert!(!comp_ctx.ports.iter().any(|v| v.self_id == port_info.self_id));
         comp_ctx.ports.push(port_info);
+        Self::add_peer_associated_port_to_component(sched_ctx, comp_ctx, peer_comp_id);
+    }
 
-        // Increment counters on peer, or create entry for peer if it doesn't
-        // exist yet.
-        match comp_ctx.peers.iter().position(|v| v.id == peer_comp_id) {
+    /// Only adds/updates a peer for a given port. This function assumes (but
+    /// does not check!) that the port was not considered to belong to that peer
+    /// before calling this function.
+    fn add_peer_associated_port_to_component(sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, peer_id: CompId) {
+        match comp_ctx.get_peer_index(peer_id) {
             Some(peer_index) => {
                 let peer_info = &mut comp_ctx.peers[peer_index];
                 peer_info.num_associated_ports += 1;
             },
             None => {
-                let handle = sched_ctx.runtime.get_component_public(peer_comp_id);
+                let handle = sched_ctx.runtime.get_component_public(peer_id);
                 comp_ctx.peers.push(Peer{
-                    id: peer_comp_id,
+                    id: peer_id,
                     num_associated_ports: 1,
                     handle,
                 });
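
Reviewer note on the recurring send_message pattern: throughout the hunks above,
the manual two-step of `handle.inbox.push(message)` followed by
`wake_up_if_sleeping(sched_ctx, ...)` is replaced by a single
`handle.send_message(sched_ctx, message, true)` call. The definition of
`send_message` is not part of this diff, so the following is only a minimal,
self-contained sketch of what the removed call sites suggest it does. It assumes
the boolean parameter is a "try to wake the receiver" flag; every type and field
name other than `inbox` and `sleeping` is invented for the example.

    use std::collections::VecDeque;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicBool, Ordering};

    struct Message; // stand-in for the runtime's Message enum

    // Minimal stand-in for the public part of a component handle.
    struct PublicHandle {
        inbox: Mutex<VecDeque<Message>>, // the real runtime likely uses a concurrent queue
        sleeping: AtomicBool,            // true while the scheduler has parked the component
    }

    impl PublicHandle {
        // Deliver a message and optionally wake a sleeping receiver. Returns
        // true when the caller won the wake-up and thus becomes responsible
        // for scheduling the component (cf. `enqueue_work` in the diff).
        fn send_message(&self, message: Message, try_wake_up: bool) -> bool {
            self.inbox.lock().unwrap().push_back(message);
            if try_wake_up {
                // Only one sender can flip `sleeping` from true to false, so
                // at most one party becomes responsible for rescheduling.
                return self
                    .sleeping
                    .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok();
            }
            false
        }
    }

    fn main() {
        let handle = PublicHandle {
            inbox: Mutex::new(VecDeque::new()),
            sleeping: AtomicBool::new(true),
        };
        assert!(handle.send_message(Message, true));  // first sender wins the wake-up
        assert!(!handle.send_message(Message, true)); // later senders only deliver
    }

Read this way, the `component.public.sleeping.store(false, Ordering::Release)`
added before `enqueue_work` in the component-creation path also makes sense: the
new component is marked awake before it is enqueued, so a concurrent sender
cannot win the compare-exchange and enqueue it a second time.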
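A second note on the inbox bookkeeping: the hunks appear to maintain the
invariant that `inbox_main` holds at most one buffered message per owned port,
indexed in lockstep with the component's port list, while `inbox_backup` holds
any overflow. That is why creating a channel pushes `None` twice (a channel adds
two ports), and why transferring a port must move both the main slot and the
matching backup messages. The sketch below models this invariant with
simplified types (a `u32` port id and `String` payload standing in for `PortId`
and `DataMessage`); it is an illustration, not the runtime's code.

    // One optional buffered message per owned port, plus an overflow buffer.
    struct Inbox {
        port_ids: Vec<u32>,               // stands in for CompCtx's port list
        inbox_main: Vec<Option<String>>,  // same length and order as `port_ids`
        inbox_backup: Vec<(u32, String)>, // (target port, payload)
    }

    impl Inbox {
        // Creating a channel adds two ports, hence two empty main slots; this
        // mirrors the paired `self.inbox_main.push(None)` calls in `run`.
        fn add_channel(&mut self, putter: u32, getter: u32) {
            self.port_ids.push(putter);
            self.inbox_main.push(None);
            self.port_ids.push(getter);
            self.inbox_main.push(None);
        }

        // Transferring a port moves its main slot and any backup messages to
        // the new owner, mirroring the while-loop in the transfer hunk.
        fn transfer_port(&mut self, port_id: u32, target: &mut Inbox) {
            let index = self.port_ids.iter().position(|&p| p == port_id).unwrap();
            self.port_ids.remove(index);
            let main_slot = self.inbox_main.remove(index);
            target.port_ids.push(port_id);
            target.inbox_main.push(main_slot);

            let mut i = 0;
            while i < self.inbox_backup.len() {
                if self.inbox_backup[i].0 == port_id {
                    // `Vec::remove` shifts the tail down, so `i` must not advance
                    let message = self.inbox_backup.remove(i);
                    target.inbox_backup.push(message);
                } else {
                    i += 1;
                }
            }
        }
    }

    fn main() {
        let mut a = Inbox { port_ids: vec![], inbox_main: vec![], inbox_backup: vec![] };
        let mut b = Inbox { port_ids: vec![], inbox_main: vec![], inbox_backup: vec![] };
        a.add_channel(0, 1);
        a.inbox_main[1] = Some("hello".to_string());
        a.inbox_backup.push((1, "world".to_string()));
        a.transfer_port(1, &mut b);
        assert_eq!(b.inbox_main[0].as_deref(), Some("hello"));
        assert_eq!(b.inbox_backup.len(), 1);
    }

The index-based while loop (rather than `Vec::retain`) is used because matching
messages are moved out into the other component instead of being dropped, and
removal shifts the remaining elements, so the index only advances on the
non-matching branch.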