From af328ac5eadf9ab2748cba7e186bdfeb1f13ec62 2022-01-26 15:06:00 From: mh Date: 2022-01-26 15:06:00 Subject: [PATCH] WIP: Only trust getter port mapping in consensus --- diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs index f879550f7307f7dc5a0b96d908adab1299f2175e..27cd90aa4cbd3e9b6bd6740aa221d885b378eef5 100644 --- a/src/runtime2/communication.rs +++ b/src/runtime2/communication.rs @@ -36,12 +36,13 @@ pub enum PortState { Closed, } +#[derive(Debug)] pub struct Port { pub self_id: PortId, - pub peer_id: PortId, + pub peer_id: PortId, // eventually consistent pub kind: PortKind, pub state: PortState, - pub peer_comp_id: CompId, + pub peer_comp_id: CompId, // eventually consistent } pub struct Channel { @@ -122,7 +123,6 @@ pub enum SyncRoundDecision { #[derive(Debug)] pub struct SyncPartialSolution { pub channel_mapping: Vec, - pub matched_channels: usize, pub decision: SyncRoundDecision, } @@ -130,7 +130,6 @@ impl Default for SyncPartialSolution { fn default() -> Self { return Self{ channel_mapping: Vec::new(), - matched_channels: 0, decision: SyncRoundDecision::None, } } diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 05119e155abc9827abad7eaa7cbe7f6222af6a50..2a6a9283b31b053a5aef528984a648f10cdddac5 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -63,7 +63,7 @@ impl CompCtx { self_id: getter_id, peer_id: putter_id, kind: PortKind::Getter, - state: PortState::Closed, + state: PortState::Open, peer_comp_id: self.id, }); @@ -341,8 +341,8 @@ impl CompPDL { // Results that can be returned in sync mode EC::SyncBlockEnd => { debug_assert_eq!(self.mode, Mode::Sync); - self.handle_sync_end(sched_ctx, comp_ctx); - return Ok(CompScheduling::Immediate); + let scheduling = self.handle_sync_end(sched_ctx, comp_ctx); + return Ok(scheduling.unwrap_or(CompScheduling::Immediate)); }, EC::BlockGet(port_id) => { debug_assert_eq!(self.mode, Mode::Sync); 
@@ -427,18 +427,44 @@ impl CompPDL { } fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { + sched_ctx.log("Component starting sync mode"); self.consensus.notify_sync_start(comp_ctx); debug_assert_eq!(self.mode, Mode::NonSync); self.mode = Mode::Sync; } - fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { - self.consensus.notify_sync_end(sched_ctx, comp_ctx); + fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Option { + sched_ctx.log("Component ending sync mode (now waiting for solution)"); + let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); + self.handle_sync_decision(sched_ctx, comp_ctx, decision) + } + + fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, decision: SyncRoundDecision) -> Option { debug_assert_eq!(self.mode, Mode::Sync); - self.mode = Mode::SyncEnd; + let is_success = match decision { + SyncRoundDecision::None => { + // No decision yet + return None; + }, + SyncRoundDecision::Solution => true, + SyncRoundDecision::Failure => false, + }; + + // If here then we've reached a decision + if is_success { + self.mode = Mode::NonSync; + self.consensus.notify_sync_decision(decision); + return None; + } else { + todo!("handle this better, show some kind of error"); + self.mode = Mode::Exit; + self.handle_component_exit(sched_ctx, comp_ctx); + return Some(CompScheduling::Exit); + } } fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { + sched_ctx.log("Component exiting"); debug_assert_eq!(self.mode, Mode::NonSync); // not a perfect assert, but just to remind myself: cannot exit while in sync // Note: for now we have that the scheduler handles exiting. 
I don't @@ -637,8 +663,11 @@ impl CompPDL { ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => { debug_assert_eq!(message.target_port_id, Some(port_id)); let port_info = comp_ctx.get_port_mut(port_id); + let old_peer_comp_id = port_info.peer_comp_id; debug_assert!(port_info.state == PortState::Blocked); port_info.peer_comp_id = new_comp_id; + comp_ctx.add_peer(sched_ctx, new_comp_id, None); + comp_ctx.remove_peer(sched_ctx, old_peer_comp_id); self.unblock_local_port(sched_ctx, comp_ctx, port_id); } } @@ -646,28 +675,7 @@ impl CompPDL { fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> Option { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); - let is_success = match decision { - SyncRoundDecision::None => { - // No decision yet - return None; - }, - SyncRoundDecision::Solution => true, - SyncRoundDecision::Failure => false, - }; - - // If here then we've reached a conclusion - debug_assert_eq!(self.mode, Mode::SyncEnd); - self.mode = Mode::NonSync; - - if is_success { - // We can simply continue executing. So we do nothing extra! 
- } else { - todo!("handle this better, show some kind of error"); - self.handle_component_exit(sched_ctx, comp_ctx); - return Some(CompScheduling::Exit); - } - - return None; + return self.handle_sync_decision(sched_ctx, comp_ctx, decision); } // ------------------------------------------------------------------------- diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index f8db817c114619e1361b8edca40ca69049cbd55c..428f88e615e5459211b971730ded896652c8c5ce 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -6,13 +6,22 @@ use crate::runtime2::communication::*; use super::component_pdl::*; pub struct PortAnnotation { - id: PortId, + self_comp_id: CompId, + self_port_id: PortId, + peer_comp_id: CompId, // only valid for getter ports + peer_port_id: PortId, // only valid for getter ports mapping: Option, } impl PortAnnotation { - fn new(id: PortId) -> Self { - return Self{ id, mapping: None } + fn new(comp_id: CompId, port_id: PortId) -> Self { + return Self{ + self_comp_id: comp_id, + self_port_id: port_id, + peer_comp_id: CompId::new_invalid(), + peer_port_id: PortId::new_invalid(), + mapping: None + } } } @@ -25,14 +34,14 @@ enum Mode { struct SolutionCombiner { solution: SyncPartialSolution, - all_present: bool, // set if the `submissions_by` only contains (_, true) entries. + matched_channels: usize, } impl SolutionCombiner { fn new() -> Self { return Self { solution: SyncPartialSolution::default(), - all_present: false, + matched_channels: 0, } } @@ -44,7 +53,7 @@ impl SolutionCombiner { /// Returns a decision for the current round. If there is no decision (yet) /// then `RoundDecision::None` is returned. 
fn get_decision(&self) -> SyncRoundDecision { - if self.all_present { + if self.matched_channels == self.solution.channel_mapping.len() { debug_assert_ne!(self.solution.decision, SyncRoundDecision::None); return self.solution.decision; } @@ -56,83 +65,35 @@ impl SolutionCombiner { debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution); debug_assert_ne!(partial.decision, SyncRoundDecision::Solution); - // Combine our partial solution with the provided partial solution. - // This algorithm *could* allow overlap in the partial solutions, but - // in practice this means something is going wrong (a component stored - // a local solution *and* transmitted it to the leader, then later - // submitted its partial solution), hence we will do some debug asserts - // for now. - for new_entry in partial.channel_mapping { - let channel_index = if new_entry.putter.is_some() && new_entry.getter.is_some() { - // Channel is completely specified - debug_assert!( - self.find_channel_index_for_partial_entry(new_entry.putter.as_ref().unwrap()).is_none() && - self.find_channel_index_for_partial_entry(new_entry.getter.as_ref().unwrap()).is_none() - ); + if partial.decision == SyncRoundDecision::Failure { + self.solution.decision = SyncRoundDecision::Failure; + } + + for entry in partial.channel_mapping { + let channel_index = if entry.getter.is_some() && entry.putter.is_some() { let channel_index = self.solution.channel_mapping.len(); - self.solution.channel_mapping.push(new_entry); + self.solution.channel_mapping.push(entry); + // Not counted here: the consistency check below counts every fully matched channel exactly once. channel_index - } else if let Some(new_port) = new_entry.putter { - // Only putter is present in new entry - match self.find_channel_index_for_partial_entry(&new_port) { - Some(channel_index) => { - let entry = &mut self.solution.channel_mapping[channel_index]; - debug_assert!(entry.putter.is_none()); - entry.putter = Some(new_port); - - channel_index - }, - None => { - let channel_index =
self.solution.channel_mapping.len(); - self.solution.channel_mapping.push(SyncSolutionChannel{ - putter: Some(new_port), - getter: None, - }); - - channel_index - } - } - } else if let Some(new_port) = new_entry.getter { - // Only getter is present in new entry - match self.find_channel_index_for_partial_entry(&new_port) { - Some(channel_index) => { - let entry = &mut self.solution.channel_mapping[channel_index]; - debug_assert!(entry.getter.is_none()); - entry.getter = Some(new_port); - - channel_index - }, - None => { - let channel_index = self.solution.channel_mapping.len(); - self.solution.channel_mapping.push(SyncSolutionChannel{ - putter: None, - getter: Some(new_port) - }); - - channel_index - } - } + } else if let Some(putter) = entry.putter { + self.combine_with_putter_port(putter) + } else if let Some(getter) = entry.getter { + self.combine_with_getter_port(getter) } else { - unreachable!() + unreachable!(); // both putter and getter are None }; - // Make sure the new entry is consistent let channel = &self.solution.channel_mapping[channel_index]; - if !Self::channel_is_consistent(channel) { - self.solution.decision = SyncRoundDecision::Failure; + if let Some(consistent) = Self::channel_is_consistent(channel) { + if !consistent { + self.solution.decision = SyncRoundDecision::Failure; + } + self.matched_channels += 1; } } - // Check to see if we have a global solution already - self.update_all_present(); - if self.all_present && self.solution.decision != SyncRoundDecision::Failure { - debug_assert_eq!(self.solution.decision, SyncRoundDecision::None); - dbg_code!(for entry in &self.solution.channel_mapping { - debug_assert!(entry.putter.is_some() && entry.getter.is_some()); - }); - self.solution.decision = SyncRoundDecision::Solution; - } + self.update_solution(); } /// Combines the currently stored global solution (if any) with the newly @@ -143,29 +104,28 @@ impl SolutionCombiner { // Combine partial solution with the local solution entries for entry in 
solution { - match entry { + // Match the current entry up with its peer endpoint, or add a new + // entry. + let channel_index = match entry { SyncLocalSolutionEntry::Putter(putter) => { - + self.combine_with_putter_port(putter) }, SyncLocalSolutionEntry::Getter(getter) => { + self.combine_with_getter_port(getter) + } + }; + // Check if channel is now consistent + let channel = &self.solution.channel_mapping[channel_index]; + if let Some(consistent) = Self::channel_is_consistent(channel) { + if !consistent { + self.solution.decision = SyncRoundDecision::Failure; } + self.matched_channels += 1; } } - if !had_new_entry { - self.update_all_present(); - if self.all_present && self.solution.decision != SyncRoundDecision::Failure { - // No new entries and every component is present. This implies that - // every component successfully added their local solutions to the - // global solution. Hence: we have a global solution - debug_assert_eq!(self.solution.decision, SyncRoundDecision::None); - dbg_code!(for entry in &self.solution.channel_mapping { - debug_assert!(entry.putter.is_some() && entry.getter.is_some()); - }); - self.solution.decision = SyncRoundDecision::Solution; - } - } + self.update_solution(); } /// Takes whatever partial solution is present in the solution combiner and @@ -175,34 +135,102 @@ impl SolutionCombiner { fn take_partial_solution(&mut self) -> SyncPartialSolution { let mut partial_solution = SyncPartialSolution::default(); std::mem::swap(&mut partial_solution, &mut self.solution); - self.all_present = false; + self.clear(); return partial_solution; } fn clear(&mut self) { - self.solution.submissions_by.clear(); self.solution.channel_mapping.clear(); self.solution.decision = SyncRoundDecision::None; + self.matched_channels = 0; } // --- Small utilities for combining solutions - fn update_all_present(&mut self) { - debug_assert!(!self.all_present); // upheld by caller - for (_, present) in self.solution.submissions_by.iter() { - if !*present { - 
return; - } + fn combine_with_putter_port(&mut self, putter: SyncSolutionPutterPort) -> usize { + let channel_index = self.get_channel_index_for_putter(putter.self_comp_id, putter.self_port_id); + if let Some(channel_index) = channel_index { + let channel = &mut self.solution.channel_mapping[channel_index]; + debug_assert!(channel.putter.is_none()); + channel.putter = Some(putter); + + return channel_index; + } else { + let channel_index = self.solution.channel_mapping.len(); + self.solution.channel_mapping.push(SyncSolutionChannel{ + putter: Some(putter), + getter: None, + }); + + return channel_index; } + } + + fn combine_with_getter_port(&mut self, getter: SyncSolutionGetterPort) -> usize { + let channel_index = self.get_channel_index_for_getter(getter.peer_comp_id, getter.peer_port_id); + if let Some(channel_index) = channel_index { + let channel = &mut self.solution.channel_mapping[channel_index]; + debug_assert!(channel.getter.is_none()); + channel.getter = Some(getter); + + return channel_index; + } else { + let channel_index = self.solution.channel_mapping.len(); + self.solution.channel_mapping.push(SyncSolutionChannel{ + putter: None, + getter: Some(getter) + }); - self.all_present = true; + return channel_index; + } } - /// Retrieve channel index based on a putter port + /// Retrieve index of the channel containing a getter port that has received + /// from the specified putter port. fn get_channel_index_for_putter(&self, putter_comp_id: CompId, putter_port_id: PortId) -> Option { - for entry in self.solution.channel_mapping.iter() { + for (channel_index, channel) in self.solution.channel_mapping.iter().enumerate() { + if let Some(getter) = &channel.getter { + if getter.peer_comp_id == putter_comp_id && getter.peer_port_id == putter_port_id { + return Some(channel_index); + } + } + } + + return None; + } + + /// Retrieve index of the channel for a getter port. To find this channel + /// the **peer** component/port IDs of the getter port are used. 
+ fn get_channel_index_for_getter(&self, peer_comp_id: CompId, peer_port_id: PortId) -> Option { + for (channel_index, channel) in self.solution.channel_mapping.iter().enumerate() { + if let Some(putter) = &channel.putter { + if putter.self_comp_id == peer_comp_id && putter.self_port_id == peer_port_id { + return Some(channel_index); + } + } + } + + return None; + } + + fn channel_is_consistent(channel: &SyncSolutionChannel) -> Option { + if channel.putter.is_none() || channel.getter.is_none() { + return None; + } + + let putter = channel.putter.as_ref().unwrap(); + let getter = channel.getter.as_ref().unwrap(); + return Some(putter.mapping == getter.mapping); + } + /// Determines the global solution if all components have contributed their + /// local solutions. + fn update_solution(&mut self) { + if self.matched_channels == self.solution.channel_mapping.len() { + if self.solution.decision != SyncRoundDecision::Failure { + self.solution.decision = SyncRoundDecision::Solution; + } } } } @@ -258,14 +286,22 @@ impl Consensus { let mut local_solution = Vec::with_capacity(self.ports.len()); for port in &self.ports { if let Some(mapping) = port.mapping { - let port_info = comp_ctx.get_port(port.id); - local_solution.push(SyncLocalSolutionEntry { - self_port_id: port.id, - peer_comp_id: port_info.peer_comp_id, - peer_port_id: port_info.peer_id, - mapping, - port_kind: port_info.kind, - }); + let port_info = comp_ctx.get_port(port.self_port_id); + let new_entry = match port_info.kind { + PortKind::Putter => SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{ + self_comp_id: comp_ctx.id, + self_port_id: port_info.self_id, + mapping + }), + PortKind::Getter => SyncLocalSolutionEntry::Getter(SyncSolutionGetterPort{ + self_comp_id: comp_ctx.id, + self_port_id: port_info.self_id, + peer_comp_id: port.peer_comp_id, + peer_port_id: port.peer_port_id, + mapping + }) + }; + local_solution.push(new_entry); } } @@ -296,7 +332,7 @@ impl Consensus { } else { for idx in 
0..comp_ctx.ports.len() { let comp_port_id = comp_ctx.ports[idx].self_id; - let cons_port_id = self.ports[idx].id; + let cons_port_id = self.ports[idx].self_port_id; if comp_port_id != cons_port_id { needs_setting_ports = true; break; @@ -308,7 +344,7 @@ impl Consensus { self.ports.clear(); self.ports.reserve(comp_ctx.ports.len()); for port in &comp_ctx.ports { - self.ports.push(PortAnnotation::new(port.self_id)) + self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id)) } } } @@ -319,7 +355,7 @@ impl Consensus { pub(crate) fn annotate_data_message(&mut self, comp_ctx: &CompCtx, port_info: &Port, content: ValueGroup) -> DataMessage { debug_assert_eq!(self.mode, Mode::SyncBusy); // can only send between sync start and sync end - debug_assert!(self.ports.iter().any(|v| v.id == port_info.self_id)); + debug_assert!(self.ports.iter().any(|v| v.self_port_id == port_info.self_id)); let data_header = self.create_data_header_and_update_mapping(port_info); let sync_header = self.create_sync_header(comp_ctx); @@ -332,7 +368,7 @@ impl Consensus { /// received. 
pub(crate) fn try_receive_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: &DataMessage) -> bool { debug_assert_eq!(self.mode, Mode::SyncBusy); - debug_assert!(self.ports.iter().any(|v| v.id == message.data_header.target_port)); + debug_assert!(self.ports.iter().any(|v| v.self_port_id == message.data_header.target_port)); // Make sure the expected mapping matches the currently stored mapping for (expected_id, expected_annotation) in &message.data_header.expected_mapping { @@ -343,7 +379,7 @@ impl Consensus { } // Expected mapping matches current mapping, so we will receive the message - self.set_annotation(message.data_header.target_port, message.data_header.new_mapping); + self.set_annotation(message.sync_header.sending_id, &message.data_header); // Handle the sync header embedded within the data message self.handle_sync_header(sched_ctx, comp_ctx, &message.sync_header); @@ -369,7 +405,6 @@ impl Consensus { }, SyncMessageContent::GlobalSolution => { debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution); // leader can only find global- if we submitted local solution - todo!("clear port mapping or something"); return SyncRoundDecision::Solution; }, SyncMessageContent::GlobalFailure => { @@ -410,7 +445,7 @@ impl Consensus { fn get_annotation(&self, port_id: PortId) -> Option { for annotation in self.ports.iter() { - if annotation.id == port_id { + if annotation.self_port_id == port_id { return annotation.mapping; } } @@ -419,10 +454,12 @@ impl Consensus { return None; } - fn set_annotation(&mut self, port_id: PortId, mapping: u32) { + fn set_annotation(&mut self, source_comp_id: CompId, data_header: &MessageDataHeader) { for annotation in self.ports.iter_mut() { - if annotation.id == port_id { - annotation.mapping = Some(mapping); + if annotation.self_port_id == data_header.target_port { + annotation.peer_comp_id = source_comp_id; + annotation.peer_port_id = data_header.source_port; + annotation.mapping = 
Some(data_header.new_mapping); } } } @@ -496,12 +533,19 @@ impl Consensus { SyncRoundDecision::Failure => false, }; - for (peer, _) in self.solution.solution.submissions_by.iter().copied() { - if peer == comp_ctx.id { - // Do not send to ourselves - continue; + let mut peers = Vec::with_capacity(self.solution.solution.channel_mapping.len()); // TODO: @Performance + + for channel in self.solution.solution.channel_mapping.iter() { + // On failure a channel may still be putter-only (getter never submitted), so use whichever ends are present. + let ends = [channel.putter.as_ref().map(|p| p.self_comp_id), channel.getter.as_ref().map(|g| g.self_comp_id), channel.getter.as_ref().map(|g| g.peer_comp_id)]; + for comp_id in ends.iter().flatten().copied() { + if comp_id != comp_ctx.id && !peers.contains(&comp_id) { + peers.push(comp_id); } + } + } + for peer in peers { let mut handle = sched_ctx.runtime.get_component_public(peer); let message = Message::Sync(SyncMessage{ sync_header: self.create_sync_header(comp_ctx), @@ -532,10 +576,10 @@ impl Consensus { let mut expected_mapping = Vec::with_capacity(self.ports.len()); let mut port_index = usize::MAX; for (index, port) in self.ports.iter().enumerate() { - if port.id == port_info.self_id { + if port.self_port_id == port_info.self_id { port_index = index; } - expected_mapping.push((port.id, port.mapping)); + expected_mapping.push((port.self_port_id, port.mapping)); } let new_mapping = self.take_mapping();