From 6555f56a22a9ae2a1adcfeeba0574d658741ec02 2022-01-21 17:20:57
From: mh
Date: 2022-01-21 17:20:57
Subject: [PATCH] WIP: First sync test, partially correct

---
diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs
index 1f5cbaca826bf937a6facc7a952e517388a04e74..0d2fd6149906e06134d68780b528a22d5a1101b9 100644
--- a/src/runtime2/communication.rs
+++ b/src/runtime2/communication.rs
@@ -78,6 +78,7 @@ pub struct SyncMessage {
     pub content: SyncMessageContent,
 }
 
+#[derive(Debug)]
 pub struct SyncLocalSolutionEntry {
     pub self_port_id: PortId,
     pub peer_comp_id: CompId,
@@ -88,6 +89,7 @@ pub struct SyncLocalSolutionEntry {
 
 pub type SyncLocalSolution = Vec<SyncLocalSolutionEntry>;
 
+#[derive(Debug)]
 pub struct SyncSolutionPort {
     pub self_comp_id: CompId,
     pub self_port_id: PortId,
@@ -97,29 +99,37 @@ pub struct SyncSolutionPort {
     pub port_kind: PortKind,
 }
 
+#[derive(Debug)]
 pub struct SyncSolutionChannel {
     pub putter: Option<SyncSolutionPort>,
     pub getter: Option<SyncSolutionPort>,
 }
 
-#[derive(Copy, Clone)]
-pub enum RoundDecision {
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum SyncRoundDecision {
     None,
     Solution,
     Failure,
 }
 
-impl RoundDecision {
-    fn is_some(&self)
-}
-
+#[derive(Debug)]
 pub struct SyncPartialSolution {
     pub submissions_by: Vec<(CompId, bool)>,
     pub channel_mapping: Vec<SyncSolutionChannel>,
     pub decision: SyncRoundDecision,
 }
 
-#[derive(Debug, Clone)]
+impl Default for SyncPartialSolution {
+    fn default() -> Self {
+        return Self{
+            submissions_by: Vec::new(),
+            channel_mapping: Vec::new(),
+            decision: SyncRoundDecision::None,
+        }
+    }
+}
+
+#[derive(Debug)]
 pub enum SyncMessageContent {
     NotificationOfLeader,
     LocalSolution(CompId, SyncLocalSolution), // local solution of the specified component
diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs
index c8691e1acf8b09a2c770d95d69fb4dfdcaf2d9d3..03dd5523e71f6d15f47ad885cc4fd19d9ebe235c 100644
--- a/src/runtime2/component/component_pdl.rs
+++ b/src/runtime2/component/component_pdl.rs
@@ -182,6 +182,7 @@ pub(crate) enum Mode {
     SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
     BlockedGet,
     BlockedPut,
+    Exit,
 }
 
 impl Mode {
@@ -189,7 +190,7 @@ impl Mode {
         match self {
             Mode::NonSync | Mode::Sync =>
                 return true,
-            Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut =>
+            Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut | Mode::Exit =>
                 return false,
         }
     }
@@ -239,9 +240,10 @@ impl CompPDL {
     pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
         sched_ctx.log(&format!("handling message: {:?}", message));
         if let Some(new_target) = self.control.should_reroute(&message) {
-            let target = sched_ctx.runtime.get_component_public(new_target);
-            target.inbox.push(message);
-
+            let mut target = sched_ctx.runtime.get_component_public(new_target);
+            target.send_message(sched_ctx, message, true);
+            let _should_remove = target.decrement_users();
+            debug_assert!(!_should_remove);
             return;
         }
@@ -258,6 +260,10 @@
         }
     }
 
+    // -------------------------------------------------------------------------
+    // Running component and handling changes in global component state
+    // -------------------------------------------------------------------------
+
     pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
         use EvalContinuation as EC;
@@ -286,7 +292,7 @@
                 let port_index = comp_ctx.get_port_index(port_id).unwrap();
                 if let Some(message) = &self.inbox_main[port_index] {
                     // Check if we can actually receive the message
-                    if self.consensus.try_receive_data_message(message) {
+                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
                         // Message was received. Make sure any blocked peers and
                         // pending messages are handled.
                         let message = self.inbox_main[port_index].take().unwrap();
@@ -317,7 +323,7 @@
             },
             // Results that can be returned outside of sync mode
             EC::ComponentTerminated => {
-                debug_assert_eq!(self.mode, Mode::NonSync);
+                self.handle_component_exit(sched_ctx, comp_ctx);
                 return Ok(CompScheduling::Exit);
             },
             EC::SyncBlockStart => {
@@ -346,6 +352,8 @@
                     Value::Output(port_id_to_eval(channel.putter_id)),
                     Value::Input(port_id_to_eval(channel.getter_id))
                 ));
+                self.inbox_main.push(None);
+                self.inbox_main.push(None);
                 return Ok(CompScheduling::Immediate);
             }
         }
@@ -370,20 +378,28 @@
     }
 
     fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
-        self.consensus.notify_sync_end();
+        self.consensus.notify_sync_end(sched_ctx, comp_ctx);
         debug_assert_eq!(self.mode, Mode::Sync);
         self.mode = Mode::SyncEnd;
     }
 
-    fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) {
-        use std::sync::atomic::Ordering;
+    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
+        debug_assert_eq!(self.mode, Mode::NonSync); // not a perfect assert, but just to remind myself: cannot exit while in sync
+        // Note: for now the scheduler handles exiting. I don't know if that
+        // is a good idea, we'll see.
+        self.mode = Mode::Exit;
+    }
+
+    // -------------------------------------------------------------------------
+    // Handling messages
+    // -------------------------------------------------------------------------
+
+    fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_id: PortId, value: ValueGroup) {
         let port_info = comp_ctx.get_port(source_port_id);
         let peer_info = comp_ctx.get_peer(port_info.peer_comp_id);
         let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value);
 
-        peer_info.handle.inbox.push(Message::Data(annotated_message));
-
-        wake_up_if_sleeping(sched_ctx, peer_info.id, &peer_info.handle);
+        peer_info.handle.send_message(sched_ctx, Message::Data(annotated_message), true);
     }
 
     /// Handles a message that came in through the public inbox. This function
@@ -420,8 +436,7 @@
             debug_assert_eq!(_peer_comp_id, target_comp_id);
 
             let peer = comp_ctx.get_peer(target_comp_id);
-            peer.handle.inbox.push(Message::Control(block_message));
-            wake_up_if_sleeping(sched_ctx, target_comp_id, &peer.handle);
+            peer.handle.send_message(sched_ctx, Message::Control(block_message), true);
         }
 
         // But we still need to remember the message, so:
@@ -454,8 +469,7 @@
         if port_info.state == PortState::Blocked {
             let (peer_comp_id, message) = self.control.set_port_and_peer_unblocked(port_id, comp_ctx);
             let peer_info = comp_ctx.get_peer(peer_comp_id);
-            peer_info.handle.inbox.push(Message::Control(message));
-            wake_up_if_sleeping(sched_ctx, peer_comp_id, &peer_info.handle);
+            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
         }
     }
@@ -469,8 +483,7 @@
             AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => {
                 // FIX @NoDirectHandle
                 let handle = sched_ctx.runtime.get_component_public(target_comp);
-                handle.inbox.push(Message::Control(message));
-                wake_up_if_sleeping(sched_ctx, target_comp, &handle);
+                handle.send_message(sched_ctx, Message::Control(message), true);
                 to_ack = new_to_ack;
             },
             AckAction::ScheduleComponent(to_schedule) => {
@@ -537,6 +550,12 @@
                 if port_info.state == PortState::Open {
                     port_info.state = PortState::Blocked;
                 }
+
+                let peer_info = comp_ctx.get_peer(port_info.peer_comp_id);
+                // TODO: Continue here. Send an ack, but think about whether we
+                // always have the peer in our list of peers. Quickly thinking
+                // about it, I think so, but we may have a series of port
+                // transfers. Does that change things?
             },
             ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => {
                 debug_assert_eq!(message.target_port_id, Some(port_id));
@@ -548,10 +567,36 @@
         }
     }
 
-    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
+    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> Option<CompScheduling> {
+        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
+        let is_success = match decision {
+            SyncRoundDecision::None => {
+                // No decision yet
+                return None;
+            },
+            SyncRoundDecision::Solution => true,
+            SyncRoundDecision::Failure => false,
+        };
+
+        // If here then we've reached a conclusion
+        debug_assert_eq!(self.mode, Mode::SyncEnd);
+        self.mode = Mode::NonSync;
+
+        if is_success {
+            // We can simply continue executing. So we do nothing extra!
+        } else {
+            todo!("handle this better, show some kind of error");
+            self.handle_component_exit(sched_ctx, comp_ctx);
+            return Some(CompScheduling::Exit);
+        }
+        return None;
     }
 
+    // -------------------------------------------------------------------------
+    // Handling ports
+    // -------------------------------------------------------------------------
+
     /// Marks the local port as being unblocked. If the execution was blocked on
     /// sending a message over this port, then execution will continue and the
     /// message will be sent.
@@ -590,18 +635,23 @@
                 let peer_port_id = port_info.peer_id;
                 let port_info = creator_ctx.get_port_mut(peer_port_id);
                 port_info.peer_comp_id = created_ctx.id;
+                Self::add_peer_associated_port_to_component(sched_ctx, creator_ctx, created_ctx.id);
             } else {
-                // We don't own the port, so send the appropriate messages and
-                // notify the control layer
+                // We don't own the peer port, so send the appropriate messages
+                // to the peer component and notify the control layer
                 has_reroute_entry = true;
                 let message = self.control.add_reroute_entry(
                     creator_ctx.id, port_info.peer_id, port_info.peer_comp_id,
                     port_info.self_id, created_ctx.id, schedule_entry_id
                 );
                 let peer_info = creator_ctx.get_peer(port_info.peer_comp_id);
-                peer_info.handle.inbox.push(message);
+                peer_info.handle.send_message(sched_ctx, message, true);
             }
 
+            // Take out any potential messages for the peer
+            let creator_port_index = creator_ctx.get_port_index(port_id).unwrap();
+            let port_main_message = self.inbox_main[creator_port_index].take();
+
             // Transfer port and create temporary reroute entry
             let (port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id);
             if port_info.state == PortState::Blocked {
@@ -609,6 +659,20 @@
             }
             Self::add_port_to_component(sched_ctx, created_ctx, port_info);
 
+            // Transfer the taken messages
+            let created_port_index = created_ctx.get_port_index(port_id).unwrap();
+            component.code.inbox_main[created_port_index] = port_main_message;
+
+            let mut message_index = 0;
+            while message_index < self.inbox_backup.len() {
+                if self.inbox_backup[message_index].data_header.target_port == port_id {
+                    // Move this message
+                    let message = self.inbox_backup.remove(message_index);
+                    component.code.inbox_backup.push(message);
+                } else {
+                    message_index += 1;
+                }
+            }
+
             // Maybe remove peer from the creator
             if let Some(mut peer_info) = peer_info {
                 let remove_from_runtime = peer_info.handle.decrement_users();
@@ -622,6 +686,7 @@
         if !has_reroute_entry {
             // We can schedule the component immediately
             self.control.remove_schedule_entry(schedule_entry_id);
+            component.public.sleeping.store(false, std::sync::atomic::Ordering::Release);
             sched_ctx.runtime.enqueue_work(comp_key);
         } // else: wait for the `Ack`s, they will trigger the scheduling of the component
     }
@@ -653,23 +718,29 @@
         return (port_info, Some(peer_info));
     }
 
+    /// Adds a port to the component context. The peer (or its counter) will be
+    /// updated accordingly.
     fn add_port_to_component(sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_info: Port) {
         // Add the port info
         let peer_comp_id = port_info.peer_comp_id;
         debug_assert!(!comp_ctx.ports.iter().any(|v| v.self_id == port_info.self_id));
         comp_ctx.ports.push(port_info);
+        Self::add_peer_associated_port_to_component(sched_ctx, comp_ctx, peer_comp_id);
+    }
 
-        // Increment counters on peer, or create entry for peer if it doesn't
-        // exist yet.
-        match comp_ctx.peers.iter().position(|v| v.id == peer_comp_id) {
+    /// Only adds/updates a peer for a given port. This function assumes (but
+    /// does not check!) that the port was not considered to belong to that peer
+    /// before calling this function.
+    fn add_peer_associated_port_to_component(sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, peer_id: CompId) {
+        match comp_ctx.get_peer_index(peer_id) {
             Some(peer_index) => {
                 let peer_info = &mut comp_ctx.peers[peer_index];
                 peer_info.num_associated_ports += 1;
             },
             None => {
-                let handle = sched_ctx.runtime.get_component_public(peer_comp_id);
+                let handle = sched_ctx.runtime.get_component_public(peer_id);
                 comp_ctx.peers.push(Peer{
-                    id: peer_comp_id,
+                    id: peer_id,
                     num_associated_ports: 1,
                     handle,
                 });
diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs
index d989a63febc246bd6ed3d643c8cd6fa08b15d9d3..d30f5874a04afb8df60b09bb39b3ad8f0c4f08e7 100644
--- a/src/runtime2/component/consensus.rs
+++ b/src/runtime2/component/consensus.rs
@@ -2,7 +2,6 @@ use crate::protocol::eval::ValueGroup;
 use crate::runtime2::scheduler::*;
 use crate::runtime2::runtime::*;
 use crate::runtime2::communication::*;
-use crate::runtime2::component::wake_up_if_sleeping;
 
 use super::component_pdl::*;
@@ -17,7 +16,7 @@ impl PortAnnotation {
     }
 }
 
-#[derive(Eq, PartialEq)]
+#[derive(Debug, Eq, PartialEq)]
 enum Mode {
     NonSync,
     SyncBusy,
@@ -32,24 +31,25 @@ struct SolutionCombiner {
 
 impl SolutionCombiner {
     fn new() -> Self {
         return Self {
-            solution: SyncPartialSolution{
-                submissions_by: Vec::new(),
-                channel_mapping: Vec::new(),
-                decision: RoundDecision::None,
-            },
+            solution: SyncPartialSolution::default(),
             all_present: false,
         }
     }
 
+    #[inline]
+    fn has_contributions(&self) -> bool {
+        return !self.solution.submissions_by.is_empty();
+    }
+
     /// Returns a decision for the current round. If there is no decision (yet)
-    /// then `RoundDecision::None` is returned.
-    fn get_decision(&self) -> RoundDecision {
+    /// then `SyncRoundDecision::None` is returned.
+    fn get_decision(&self) -> SyncRoundDecision {
         if self.all_present {
-            debug_assert_ne!(self.solution.decision, RoundDecision::None);
+            debug_assert_ne!(self.solution.decision, SyncRoundDecision::None);
             return self.solution.decision;
         }
 
-        return RoundDecision::None; // even if failure: wait for everyone.
+        return SyncRoundDecision::None; // even in case of failure: wait for everyone.
     }
 
     fn combine_with_partial_solution(&mut self, partial: SyncPartialSolution) {
@@ -58,8 +58,8 @@
             self.mark_single_component_submission(comp_id, present);
         }
 
-        debug_assert_ne!(self.solution.decision, RoundDecision::Solution);
-        debug_assert_ne!(partial.decision, RoundDecision::Solution);
+        debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution);
+        debug_assert_ne!(partial.decision, SyncRoundDecision::Solution);
 
         // Combine our partial solution with the provided partial solution.
         // This algorithm *could* allow overlap in the partial solutions, but
@@ -125,18 +125,18 @@
             // Make sure the new entry is consistent
             let channel = &self.solution.channel_mapping[channel_index];
             if !Self::channel_is_consistent(channel) {
-                self.solution.decision = RoundDecision::Failure;
+                self.solution.decision = SyncRoundDecision::Failure;
             }
         }
 
         // Check to see if we have a global solution already
         self.update_all_present();
-        if self.all_present && self.solution.decision != RoundDecision::Failure {
-            debug_assert_eq!(self.solution.decision, RoundDecision::None);
+        if self.all_present && self.solution.decision != SyncRoundDecision::Failure {
+            debug_assert_eq!(self.solution.decision, SyncRoundDecision::None);
             dbg_code!(for entry in &self.solution.channel_mapping {
                 debug_assert!(entry.putter.is_some() && entry.getter.is_some());
             });
-            self.solution.decision = RoundDecision::Solution;
+            self.solution.decision = SyncRoundDecision::Solution;
         }
     }
@@ -151,19 +151,20 @@
             self.mark_single_component_submission(entry.peer_comp_id, false);
         }
 
-        debug_assert_ne!(self.solution.decision, RoundDecision::Solution);
+        debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution);
 
         // Go through all entries and check if the submitted local solution is
         // consistent with our partial solution
         let mut had_new_entry = false;
         for entry in solution.iter() {
             let preexisting_index = self.find_channel_index_for_local_entry(comp_id, entry);
-            let new_port = SolutionPort{
+            let new_port = SyncSolutionPort{
                 self_comp_id: comp_id,
                 self_port_id: entry.self_port_id,
                 peer_comp_id: entry.peer_comp_id,
                 peer_port_id: entry.peer_port_id,
                 mapping: entry.mapping,
+                port_kind: entry.port_kind,
             };
 
             match preexisting_index {
@@ -172,26 +173,28 @@
                     // the global solution. We'll handle any mismatches along
                     // the way.
                     let channel = &mut self.solution.channel_mapping[entry_index];
-                    if entry.is_putter {
-                        // Getter should be present in existing entry
-                        debug_assert!(channel.getter.is_some() && channel.putter.is_none());
-                        channel.putter = Some(new_port);
-                    } else {
-                        // Putter should be present in existing entry
-                        debug_assert!(channel.putter.is_some() && channel.getter.is_none());
-                        channel.getter = Some(new_port);
-                    };
+                    match entry.port_kind {
+                        PortKind::Putter => {
+                            // Getter should be present in existing entry
+                            debug_assert!(channel.getter.is_some() && channel.putter.is_none());
+                            channel.putter = Some(new_port);
+                        },
+                        PortKind::Getter => {
+                            // Putter should be present in existing entry
+                            debug_assert!(channel.putter.is_some() && channel.getter.is_none());
+                            channel.getter = Some(new_port);
+                        }
+                    }
 
                     if !Self::channel_is_consistent(channel) {
-                        self.solution.decision = RoundDecision::Failure;
+                        self.solution.decision = SyncRoundDecision::Failure;
                     }
                 },
                 None => {
                     // No entry yet. So add it
-                    let new_solution = if entry.is_putter {
-                        SolutionChannel{ putter: Some(new_port), getter: None }
-                    } else {
-                        SolutionChannel{ putter: None, getter: Some(new_port) }
+                    let new_solution = match entry.port_kind {
+                        PortKind::Putter => SyncSolutionChannel{ putter: Some(new_port), getter: None },
+                        PortKind::Getter => SyncSolutionChannel{ putter: None, getter: Some(new_port) },
                     };
                     self.solution.channel_mapping.push(new_solution);
                     had_new_entry = true;
@@ -201,19 +204,39 @@
 
         if !had_new_entry {
             self.update_all_present();
-            if self.all_present && self.solution.decision != RoundDecision::Failure {
+            if self.all_present && self.solution.decision != SyncRoundDecision::Failure {
                 // No new entries and every component is present. This implies that
                 // every component successfully added their local solutions to the
                 // global solution. Hence: we have a global solution
-                debug_assert_eq!(self.solution.decision, RoundDecision::None);
+                debug_assert_eq!(self.solution.decision, SyncRoundDecision::None);
                 dbg_code!(for entry in &self.solution.channel_mapping {
                     debug_assert!(entry.putter.is_some() && entry.getter.is_some());
                 });
-                self.solution.decision = RoundDecision::Solution;
+                self.solution.decision = SyncRoundDecision::Solution;
             }
         }
     }
 
+    /// Takes whatever partial solution is present in the solution combiner and
+    /// returns it. The solution combiner's solution will end up being empty.
+    /// This is used when a new leader is found and we need to pass along our
+    /// partial results.
+    fn take_partial_solution(&mut self) -> SyncPartialSolution {
+        let mut partial_solution = SyncPartialSolution::default();
+        std::mem::swap(&mut partial_solution, &mut self.solution);
+        self.all_present = false;
+
+        return partial_solution;
+    }
+
+    fn clear(&mut self) {
+        self.solution.submissions_by.clear();
+        self.solution.channel_mapping.clear();
+        self.solution.decision = SyncRoundDecision::None;
+    }
+
+    // --- Small utilities for combining solutions
+
     fn mark_single_component_submission(&mut self, comp_id: CompId, will_contribute: bool) {
         debug_assert!(!will_contribute || !self.solution.submissions_by.iter().any(|(id, val)| *id == comp_id && *val)); // if submitting a solution, then we do not expect an existing entry
         for (entry, has_contributed) in self.solution.submissions_by.iter_mut() {
@@ -290,17 +313,20 @@
         // port B is part of its channel, but port B may consider port A not
         // to be part of its channel. Before merging the entries (outside of
         // this function) we'll make sure this is not the case.
-        if new_entry.is_putter {
-            // Expect getter to be present
-            if let Some(cur_entry) = &cur_entry.getter {
-                if might_belong_to_same_channel(cur_entry, comp_id, new_entry) {
-                    return Some(entry_index);
+        match new_entry.port_kind {
+            PortKind::Putter => {
+                // Expect getter to be present
+                if let Some(cur_entry) = &cur_entry.getter {
+                    if might_belong_to_same_channel(cur_entry, comp_id, new_entry) {
+                        return Some(entry_index);
+                    }
                 }
-            }
-        } else {
-            if let Some(cur_entry) = &cur_entry.putter {
-                if might_belong_to_same_channel(cur_entry, comp_id, new_entry) {
-                    return Some(entry_index);
+            },
+            PortKind::Getter => {
+                if let Some(cur_entry) = &cur_entry.putter {
+                    if might_belong_to_same_channel(cur_entry, comp_id, new_entry) {
+                        return Some(entry_index);
+                    }
                 }
             }
         }
@@ -362,6 +388,8 @@
     // Managing sync state
     // -------------------------------------------------------------------------
 
+    /// Notifies the consensus management that the PDL code has reached the
+    /// start of a sync block.
     pub(crate) fn notify_sync_start(&mut self, comp_ctx: &CompCtx) {
         debug_assert_eq!(self.mode, Mode::NonSync);
         self.highest_id = comp_ctx.id;
@@ -370,7 +398,10 @@
         self.make_ports_consistent_with_ctx(comp_ctx);
     }
 
-    pub(crate) fn notify_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> RoundDecision {
+    /// Notifies the consensus management that the PDL code has reached the end
+    /// of a sync block. A local solution will be submitted, after which we wait
+    /// until the participants in the round (hopefully) reach a conclusion.
+    pub(crate) fn notify_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
         debug_assert_eq!(self.mode, Mode::SyncBusy);
         self.mode = Mode::SyncAwaitingSolution;
@@ -393,6 +424,22 @@
         return decision;
     }
 
+    /// Notifies that a decision has been reached. Note that the caller should
+    /// still take the appropriate actions based on the decision it is supplying
+    /// to the consensus layer.
+    pub(crate) fn notify_sync_decision(&mut self, _decision: SyncRoundDecision) {
+        // Reset everything for the next round
+        debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
+        self.mode = Mode::NonSync;
+        self.round_index = self.round_index.wrapping_add(1);
+
+        for port in self.ports.iter_mut() {
+            port.mapping = None;
+        }
+
+        self.solution.clear();
+    }
+
     fn make_ports_consistent_with_ctx(&mut self, comp_ctx: &CompCtx) {
         let mut needs_setting_ports = false;
         if comp_ctx.ports.len() != self.ports.len() {
@@ -423,7 +470,7 @@
     pub(crate) fn annotate_data_message(&mut self, comp_ctx: &CompCtx, port_info: &Port, content: ValueGroup) -> DataMessage {
         debug_assert_eq!(self.mode, Mode::SyncBusy); // can only send between sync start and sync end
-        debug_assert!(self.round.ports.iter().any(|v| v.id == port_info.self_id));
+        debug_assert!(self.ports.iter().any(|v| v.id == port_info.self_id));
         let data_header = self.create_data_header_and_update_mapping(port_info);
         let sync_header = self.create_sync_header(comp_ctx);
@@ -436,12 +483,12 @@
     /// received.
     pub(crate) fn try_receive_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: &DataMessage) -> bool {
         debug_assert_eq!(self.mode, Mode::SyncBusy);
-        debug_assert!(self.round.ports.iter().any(|v| v.id == message.data_header.target_port));
+        debug_assert!(self.ports.iter().any(|v| v.id == message.data_header.target_port));
 
         // Make sure the expected mapping matches the currently stored mapping
         for (expected_id, expected_annotation) in &message.data_header.expected_mapping {
             let got_annotation = self.get_annotation(*expected_id);
-            if got_annotation != expected_annotation {
+            if got_annotation != *expected_annotation {
                 return false;
             }
         }
@@ -456,35 +503,38 @@
     }
 
     /// Receives the sync message and updates the consensus state appropriately.
-    pub(crate) fn receive_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> RoundDecision {
+    pub(crate) fn receive_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> SyncRoundDecision {
         // Whatever happens: handle the sync header (possibly changing the
         // currently registered leader)
         self.handle_sync_header(sched_ctx, comp_ctx, &message.sync_header);
 
         match message.content {
             SyncMessageContent::NotificationOfLeader => {
-                return RoundDecision::None;
+                return SyncRoundDecision::None;
             },
             SyncMessageContent::LocalSolution(solution_generator_id, local_solution) => {
                 return self.handle_local_solution(sched_ctx, comp_ctx, solution_generator_id, local_solution);
             },
             SyncMessageContent::PartialSolution(partial_solution) => {
                 return self.handle_partial_solution(sched_ctx, comp_ctx, partial_solution);
-            }
+            },
             SyncMessageContent::GlobalSolution => {
-                // Global solution has been found
                 debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution); // leader can only find a global solution if we submitted our local solution
                 todo!("clear port mapping or something");
-                return RoundDecision::Solution;
+                return SyncRoundDecision::Solution;
             },
+            SyncMessageContent::GlobalFailure => {
+                debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
+                return SyncRoundDecision::Failure;
+            }
         }
     }
 
     fn handle_sync_header(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, header: &MessageSyncHeader) {
-        if header.highest_id > self.round.highest_id {
+        if header.highest_id.0 > self.highest_id.0 {
             // Sender knows of someone with a higher ID. So store highest ID,
             // notify all peers, and forward local solutions
-            self.round.highest_id = header.highest_id;
+            self.highest_id = header.highest_id;
             for peer in &comp_ctx.peers {
                 if peer.id == header.sending_id {
                     continue;
                 }
@@ -494,20 +544,18 @@
                     sync_header: self.create_sync_header(comp_ctx),
                     content: SyncMessageContent::NotificationOfLeader,
                 };
-                peer.handle.inbox.push(Message::Sync(message));
-                wake_up_if_sleeping(sched_ctx, peer.id, &peer.handle);
+                peer.handle.send_message(sched_ctx, Message::Sync(message), true);
             }
 
-            self.forward_local_solutions(sched_ctx, comp_ctx);
-        } else if header.highest_id < self.round.highest_id {
+            self.forward_partial_solution(sched_ctx, comp_ctx);
+        } else if header.highest_id.0 < self.highest_id.0 {
             // Sender has a lower ID, so notify it of our higher one
             let message = SyncMessage{
                 sync_header: self.create_sync_header(comp_ctx),
                 content: SyncMessageContent::NotificationOfLeader,
             };
             let peer_info = comp_ctx.get_peer(header.sending_id);
-            peer_info.handle.inbox.push(Message::Sync(message));
-            wake_up_if_sleeping(sched_ctx, peer_info.id, &peer_info.handle);
+            peer_info.handle.send_message(sched_ctx, Message::Sync(message), true);
         } // else: exactly equal
     }
@@ -534,47 +582,30 @@
     // Leader-related methods
     // -------------------------------------------------------------------------
 
-    fn forward_local_solutions(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
-        todo!("implement")
+    fn forward_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
+        debug_assert_ne!(self.highest_id, comp_ctx.id); // not leader
+
+        // Make sure that we have something to send
+        if !self.solution.has_contributions() {
+            return;
+        }
+
+        // Swap the container with the partial solution and then send it along
+        let partial_solution = self.solution.take_partial_solution();
+        self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(SyncMessage{
+            sync_header: self.create_sync_header(comp_ctx),
+            content: SyncMessageContent::PartialSolution(partial_solution),
+        }));
     }
 
-    fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution) -> RoundDecision {
+    fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution) -> SyncRoundDecision {
         if self.highest_id == comp_ctx.id {
             // We are the leader
             self.solution.combine_with_local_solution(solution_sender_id, solution);
             let round_decision = self.solution.get_decision();
-            let decision_is_solution = match round_decision {
-                RoundDecision::None => {
-                    // No solution yet
-                    return RoundDecision::None;
-                },
-                RoundDecision::Solution => true,
-                RoundDecision::Failure => false,
-            };
-
-            // If here then we've reached a decision, broadcast it
-            for (peer_id, _is_present) in self.solution.solution.submissions_by.iter().copied() {
-                debug_assert!(_is_present);
-                if peer_id == comp_ctx.id {
-                    // Do not send the result to ourselves
-                    continue;
-                }
-
-                let mut handle = sched_ctx.runtime.get_component_public(peer_id);
-                handle.inbox.push(Message::Sync(SyncMessage{
-                    sync_header: self.create_sync_header(comp_ctx),
-                    content: if decision_is_solution {
-                        SyncMessageContent::GlobalSolution
-                    } else {
-                        SyncMessageContent::GlobalFailure
-                    },
-                }));
-                wake_up_if_sleeping(sched_ctx, peer_id, &handle);
-
-                let _should_remove = handle.decrement_users();
-                debug_assert!(!_should_remove);
+            if round_decision != SyncRoundDecision::None {
+                self.broadcast_decision(sched_ctx, comp_ctx, round_decision);
             }
-
             return round_decision;
         } else {
             // Forward the solution
             let message = SyncMessage{
@@ -583,18 +614,19 @@
                 content: SyncMessageContent::LocalSolution(solution_sender_id, solution),
             };
             self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(message));
-            return RoundDecision::None;
+            return SyncRoundDecision::None;
         }
     }
 
-    fn handle_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, solution: SyncPartialSolution) -> RoundDecision {
+    fn handle_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, solution: SyncPartialSolution) -> SyncRoundDecision {
         if self.highest_id == comp_ctx.id {
             // We are the leader, combine existing and new solution
             self.solution.combine_with_partial_solution(solution);
             let round_decision = self.solution.get_decision();
-
-
-            return RoundDecision::None;
+            if round_decision != SyncRoundDecision::None {
+                self.broadcast_decision(sched_ctx, comp_ctx, round_decision);
+            }
+            return round_decision;
         } else {
             // Forward the partial solution
             let message = SyncMessage{
@@ -602,15 +634,40 @@
                 content: SyncMessageContent::PartialSolution(solution),
             };
             self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(message));
-            return RoundDecision::None;
+            return SyncRoundDecision::None;
         }
     }
 
+    fn broadcast_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, decision: SyncRoundDecision) {
+        debug_assert_eq!(self.highest_id, comp_ctx.id);
+
+        let is_success = match decision {
+            SyncRoundDecision::None => unreachable!(),
+            SyncRoundDecision::Solution => true,
+            SyncRoundDecision::Failure => false,
+        };
+
+        for (peer, _) in self.solution.solution.submissions_by.iter().copied() {
+            if peer == comp_ctx.id {
+                // Do not send to ourselves
+                continue;
+            }
+
+            let mut handle = sched_ctx.runtime.get_component_public(peer);
+            let message = Message::Sync(SyncMessage{
+                sync_header: self.create_sync_header(comp_ctx),
+                content: if is_success { SyncMessageContent::GlobalSolution } else { SyncMessageContent::GlobalFailure },
+            });
+            handle.send_message(sched_ctx, message, true);
+            let _should_remove = handle.decrement_users();
+            debug_assert!(!_should_remove);
+        }
+    }
+
     fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) {
         debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader
         let leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
-        leader_info.inbox.push(message);
-        wake_up_if_sleeping(sched_ctx, self.highest_id, &leader_info);
+        leader_info.send_message(sched_ctx, message, true);
     }
@@ -620,15 +677,15 @@
     fn create_data_header_and_update_mapping(&mut self, port_info: &Port) -> MessageDataHeader {
         let mut expected_mapping = Vec::with_capacity(self.ports.len());
         let mut port_index = usize::MAX;
-        for (index, port) in self.round.ports.iter().enumerate() {
+        for (index, port) in self.ports.iter().enumerate() {
             if port.id == port_info.self_id {
                 port_index = index;
             }
-            expected_mapping.push((port.id, Some(mapping)));
+            expected_mapping.push((port.id, port.mapping));
         }
 
         let new_mapping = self.take_mapping();
-        self.round.ports[port_index].mapping = Some(new_mapping);
+        self.ports[port_index].mapping = Some(new_mapping);
         debug_assert_eq!(port_info.kind, PortKind::Putter);
         return MessageDataHeader{
             expected_mapping,
@@ -638,14 +695,16 @@
         };
     }
 
+    #[inline]
     fn create_sync_header(&self, comp_ctx: &CompCtx) -> MessageSyncHeader {
         return MessageSyncHeader{
-            sync_round: self.round.index,
+            sync_round: self.round_index,
             sending_id: comp_ctx.id,
             highest_id: self.highest_id,
         };
     }
 
+    #[inline]
     fn take_mapping(&mut self) -> u32 {
         let mapping = self.mapping_counter;
         self.mapping_counter = self.mapping_counter.wrapping_add(1);
diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs
index 9c93bc995682b519124f27f9cc381babc0f2c16a..0d12f51fae13996eeb669a122d263887ce7181b2 100644
--- a/src/runtime2/component/control_layer.rs
+++ b/src/runtime2/component/control_layer.rs
@@ -206,7 +206,7 @@ impl ControlLayer {
         port.state = PortState::Closed;
 
         if peer_comp_id == comp_ctx.id {
-            // We own the other end of the channel as well
+            // We own the other end of the channel as well.
             return None;
         }
diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs
index bf6a9baed2d0680b331672d0fcfd60f5328d2cbf..24844a612a6856a7090dc72c3948aeae6e1626b8 100644
--- a/src/runtime2/runtime.rs
+++ b/src/runtime2/runtime.rs
@@ -3,11 +3,12 @@ use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
 use std::collections::VecDeque;
 
 use crate::protocol::*;
+use crate::runtime2::component::wake_up_if_sleeping;
 
 use super::communication::Message;
 use super::component::{CompCtx, CompPDL};
 use super::store::{ComponentStore, QueueDynMpsc, QueueDynProducer};
-use super::scheduler::Scheduler;
+use super::scheduler::*;
 
 // -----------------------------------------------------------------------------
 // Component
@@ -58,7 +59,7 @@ pub(crate) struct RuntimeComp {
 pub(crate) struct CompPublic {
     pub sleeping: AtomicBool,
     pub num_handles: AtomicU32, // manually modified (!)
-    pub inbox: QueueDynProducer<Message>,
+    inbox: QueueDynProducer<Message>,
 }
 
 /// Handle to public part of a component. Would be nice if we could
@@ -67,19 +68,29 @@ pub(crate) struct CompPublic {
 /// code to make sure this actually happens.
 pub(crate) struct CompHandle {
     target: *const CompPublic,
+    id: CompId, // TODO: @Remove after debugging
     #[cfg(debug_assertions)] decremented: bool,
 }
 
 impl CompHandle {
-    fn new(public: &CompPublic) -> CompHandle {
+    fn new(id: CompId, public: &CompPublic) -> CompHandle {
         let handle = CompHandle{
             target: public,
+            id,
             #[cfg(debug_assertions)] decremented: false,
         };
         handle.increment_users();
         return handle;
     }
 
+    pub(crate) fn send_message(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) {
+        sched_ctx.log(&format!("Sending message to [c:{:03}, wakeup:{}]: {:?}", self.id.0, try_wake_up, message));
+        self.inbox.push(message);
+        if try_wake_up {
+            wake_up_if_sleeping(sched_ctx, self.id, self);
+        }
+    }
+
     fn increment_users(&self) {
         let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel);
         debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed
@@ -100,6 +111,7 @@ impl Clone for CompHandle {
         self.increment_users();
         return CompHandle{
             target: self.target,
+            id: self.id,
             #[cfg(debug_assertions)] decremented: false,
         };
     }
@@ -230,7 +242,7 @@ impl RuntimeInner {
     pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle {
         let component = self.components.get(id.0);
-        return CompHandle::new(&component.public);
+        return CompHandle::new(id, &component.public);
     }
 
     pub(crate) fn destroy_component(&self, key: CompKey) {
diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs
index cdc329f9892abb8c3a290a7528604c5773899328..c2b6c834d7720443944a418ba07dd0353de7499e 100644
--- a/src/runtime2/scheduler.rs
+++ b/src/runtime2/scheduler.rs
@@ -95,9 +95,7 @@
             let port_info = &component.ctx.ports[port_index];
             if let Some((peer_id, message)) = component.code.control.mark_port_closed(port_info.self_id, &mut component.ctx) {
                 let peer_info = component.ctx.get_peer(peer_id);
-                peer_info.handle.inbox.push(Message::Control(message));
-
-                wake_up_if_sleeping(sched_ctx, peer_id, &peer_info.handle);
+                peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
             }
         }
diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs
index 49a2f41a5183022d5f967d62d9efa27006c6e2e1..5523acfc876170b88d69435262ec2e3956e6fd19 100644
--- a/src/runtime2/tests/mod.rs
+++ b/src/runtime2/tests/mod.rs
@@ -17,7 +17,7 @@ fn test_component_creation() {
         let prompt = rt.inner.protocol.new_component(b"", b"nothing_at_all", ValueGroup::new_stack(Vec::new()))
             .expect("component creation");
         let comp = CompPDL::new(prompt, 0);
-        let (key, _) = rt.inner.create_pdl_component(comp, true);
+        let (key, _) = rt.inner.create_pdl_component(comp, false);
         rt.inner.enqueue_work(key);
     }
 }
@@ -46,6 +46,6 @@ fn test_component_communication() {
         let prompt = rt.inner.protocol.new_component(b"", b"constructor", ValueGroup::new_stack(Vec::new()))
             .expect("creation");
 
-        let (key, _) = rt.inner.create_pdl_component(CompPDL::new(prompt, 0), true);
+        let (key, _) = rt.inner.create_pdl_component(CompPDL::new(prompt, 0), false);
         rt.inner.enqueue_work(key);
     }
\ No newline at end of file
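
Note on the round decision: `SolutionCombiner::get_decision` deliberately withholds its verdict until every component registered in `submissions_by` has submitted, even when a failure is already certain ("even in case of failure: wait for everyone"). A consequence is that no participant can observe a decision before the leader has heard from all of them. The rule in isolation, as a self-contained sketch; `Combiner`, `expect` and `submit` are illustrative stand-ins with plain `u32` ids, not the real runtime2 types:

    #[derive(Debug, Copy, Clone, PartialEq, Eq)]
    enum SyncRoundDecision {
        None,
        Solution,
        Failure,
    }

    /// Stand-in for the leader's SolutionCombiner: tracks who submitted and
    /// whether any submission was inconsistent.
    struct Combiner {
        submissions_by: Vec<(u32, bool)>, // (component id, has submitted)
        failed: bool,
    }

    impl Combiner {
        fn new() -> Self {
            return Self{ submissions_by: Vec::new(), failed: false };
        }

        /// Registers that a component is expected to contribute to the round.
        fn expect(&mut self, comp_id: u32) {
            if !self.submissions_by.iter().any(|(id, _)| *id == comp_id) {
                self.submissions_by.push((comp_id, false));
            }
        }

        /// Records a submission; an inconsistent one taints the round, but the
        /// decision is still postponed until everyone has submitted.
        fn submit(&mut self, comp_id: u32, consistent: bool) {
            self.expect(comp_id);
            for (id, present) in self.submissions_by.iter_mut() {
                if *id == comp_id { *present = true; }
            }
            if !consistent { self.failed = true; }
        }

        fn get_decision(&self) -> SyncRoundDecision {
            if !self.submissions_by.iter().all(|(_, present)| *present) {
                return SyncRoundDecision::None; // even in case of failure: wait for everyone
            }
            return if self.failed { SyncRoundDecision::Failure } else { SyncRoundDecision::Solution };
        }
    }

    fn main() {
        let mut combiner = Combiner::new();
        combiner.expect(1);
        combiner.expect(2);

        combiner.submit(1, true);
        assert_eq!(combiner.get_decision(), SyncRoundDecision::None); // still waiting for component 2

        combiner.submit(2, true);
        assert_eq!(combiner.get_decision(), SyncRoundDecision::Solution);
    }

This is also what `broadcast_decision` relies on when it walks `submissions_by`: by the time a decision exists, every entry in that list names a participant that must be told the outcome.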
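Note on leader election: `handle_sync_header` implements a highest-id election. Every sync header carries the highest component id its sender knows of; a receiver either adopts that id as its new leader (and must then forward its partial solution, which `forward_partial_solution` handles), or answers with a `NotificationOfLeader` when it knows a higher one. A sketch of just that comparison rule; `Action` and `handle_header` are hypothetical names, and the id is a plain `u32` wrapper rather than the real `CompId` and message types:

    #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
    struct CompId(u32);

    #[derive(Debug)]
    enum Action {
        None,                    // ids match: nothing to do
        AdoptAndForward(CompId), // adopt the new leader, forward our partial solution
        NotifySender,            // we know a higher id: correct the sender
    }

    fn handle_header(our_highest: &mut CompId, header_highest: CompId) -> Action {
        if header_highest > *our_highest {
            *our_highest = header_highest;
            Action::AdoptAndForward(header_highest)
        } else if header_highest < *our_highest {
            Action::NotifySender
        } else {
            Action::None
        }
    }

    fn main() {
        let mut highest = CompId(3);
        assert!(matches!(handle_header(&mut highest, CompId(7)), Action::AdoptAndForward(CompId(7))));
        assert_eq!(highest, CompId(7));
        assert!(matches!(handle_header(&mut highest, CompId(5)), Action::NotifySender));
        assert!(matches!(handle_header(&mut highest, CompId(7)), Action::None));
    }

Because every message carries the header and every component relays the maximum it has seen, all participants of a round converge on the same leader without a dedicated election phase.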
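Note on `CompHandle::send_message`: the patch replaces every `inbox.push(...)` followed by `wake_up_if_sleeping(...)` pair with a single call, so a message can no longer be enqueued while forgetting the wakeup. The sketch below shows the enqueue-then-wake ordering that makes this safe; the `compare_exchange` on `sleeping` is an assumption about what `wake_up_if_sleeping` does internally (its body is not part of this patch), and the queue and scheduler types are simplified stand-ins:

    use std::collections::VecDeque;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicBool, Ordering};

    struct Public {
        sleeping: AtomicBool,
        inbox: Mutex<VecDeque<String>>, // stand-in for the runtime's QueueDynProducer
    }

    impl Public {
        fn send_message(&self, message: String, try_wake_up: bool) {
            // Enqueue first, so a component woken below always observes the message.
            self.inbox.lock().unwrap().push_back(message);
            if try_wake_up {
                // Only one sender wins the sleeping -> awake transition, so the
                // component is handed to the scheduler at most once.
                if self.sleeping.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire).is_ok() {
                    println!("(would enqueue the component for execution here)");
                }
            }
        }
    }

    fn main() {
        let public = Public{ sleeping: AtomicBool::new(true), inbox: Mutex::new(VecDeque::new()) };
        public.send_message("data".to_string(), true); // wins the wakeup
        public.send_message("more".to_string(), true); // already awake: no second wakeup
        assert_eq!(public.inbox.lock().unwrap().len(), 2);
    }

The same ordering explains the new `component.public.sleeping.store(false, ...)` before `enqueue_work` in component creation: a component must never sit in the run queue while still flagged as sleeping.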