diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs
index f9f3ab3f1df69fe514ced318d8eb388adec96ea9..764963a2e79f5b41c55d91bbf8ffe7123ad18ae4 100644
--- a/src/runtime2/component/consensus.rs
+++ b/src/runtime2/component/consensus.rs
@@ -226,7 +226,11 @@ impl SolutionCombiner {
         let putter = channel.putter.as_ref().unwrap();
         let getter = channel.getter.as_ref().unwrap();
 
-        return Some(putter.mapping == getter.mapping);
+        return Some(
+            !putter.failed &&
+            !getter.failed &&
+            putter.mapping == getter.mapping
+        );
     }
 
     /// Determines the global solution if all components have contributed their
@@ -313,35 +317,25 @@ impl Consensus {
     /// Notifies the consensus management that the PDL code has reached the end
     /// of a sync block. A local solution will be submitted, after which we wait
     /// until the participants in the round (hopefully) reach a conclusion.
-    pub(crate) fn notify_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
+    pub(crate) fn notify_sync_end_success(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
         debug_assert_eq!(self.mode, Mode::SyncBusy);
         self.mode = Mode::SyncAwaitingSolution;
 
-        // Submit our port mapping as a solution
-        let mut local_solution = Vec::with_capacity(self.ports.len());
-        for port in &self.ports {
-            if let Some(mapping) = port.mapping {
-                let port_handle = comp_ctx.get_port_handle(port.self_port_id);
-                let port_info = comp_ctx.get_port(port_handle);
-                let new_entry = match port_info.kind {
-                    PortKind::Putter => SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{
-                        self_comp_id: comp_ctx.id,
-                        self_port_id: port_info.self_id,
-                        mapping
-                    }),
-                    PortKind::Getter => SyncLocalSolutionEntry::Getter(SyncSolutionGetterPort{
-                        self_comp_id: comp_ctx.id,
-                        self_port_id: port_info.self_id,
-                        peer_comp_id: port.peer_comp_id,
-                        peer_port_id: port.peer_port_id,
-                        mapping
-                    })
-                };
-                local_solution.push(new_entry);
-            }
-        }
+        let local_solution = self.generate_local_solution(comp_ctx, false);
+        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution, false);
+        return decision;
+    }
+
+    /// Notifies the consensus management that the component has encountered a
+    /// critical error during the synchronous round. Hence we should report that
+    /// we've failed and wait until all the participants have been notified of
+    /// the error.
+    pub(crate) fn notify_sync_end_failure(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
+        debug_assert_eq!(self.mode, Mode::SyncBusy);
+        self.mode = Mode::SyncAwaitingSolution;
 
-        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution);
+        let local_solution = self.generate_local_solution(comp_ctx, true);
+        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution, true);
         return decision;
     }
 
@@ -380,7 +374,6 @@
     /// is used to determine peers of `get`ter ports.
     // TODO: The use of this function is rather ugly. Find a more robust
     //  scheme about owners of `get`ter ports not knowing about their peers.
-    //  (also, figure out why this was written again, I forgot).
     pub(crate) fn handle_incoming_data_message(&mut self, comp_ctx: &CompCtx, message: &DataMessage) {
         let target_handle = comp_ctx.get_port_handle(message.data_header.target_port);
         let target_index = comp_ctx.get_port_index(target_handle);
@@ -464,7 +457,7 @@ impl Consensus {
                 return SyncRoundDecision::None;
             },
             SyncMessageContent::LocalSolution(solution_generator_id, local_solution) => {
-                return self.handle_local_solution(sched_ctx, comp_ctx, solution_generator_id, local_solution);
+                return self.handle_local_solution(sched_ctx, comp_ctx, solution_generator_id, local_solution, false);
             },
             SyncMessageContent::PartialSolution(partial_solution) => {
                 return self.handle_partial_solution(sched_ctx, comp_ctx, partial_solution);
@@ -507,7 +500,7 @@ impl Consensus {
                 sync_header: self.create_sync_header(comp_ctx),
                 content: SyncMessageContent::NotificationOfLeader,
             };
-            peer.handle.send_message(&sched_ctx.runtime, Message::Sync(message), true);
+            peer.handle.send_message_logged(sched_ctx, Message::Sync(message), true);
         }
 
         self.forward_partial_solution(sched_ctx, comp_ctx);
@@ -519,7 +512,7 @@
             };
             let peer_handle = comp_ctx.get_peer_handle(header.sending_id);
             let peer_info = comp_ctx.get_peer(peer_handle);
-            peer_info.handle.send_message(&sched_ctx.runtime, Message::Sync(message), true);
+            peer_info.handle.send_message_logged(sched_ctx, Message::Sync(message), true);
         } // else: exactly equal
     }
 
@@ -558,12 +551,18 @@ impl Consensus {
         }));
     }
 
-    fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution) -> SyncRoundDecision {
+    fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution, fail_if_empty: bool) -> SyncRoundDecision {
         if self.highest_id == comp_ctx.id {
             // We are the leader
             self.solution.combine_with_local_solution(solution_sender_id, solution);
-            let round_decision = self.solution.get_decision();
+            let mut round_decision = self.solution.get_decision();
             if round_decision != SyncRoundDecision::None {
+                if fail_if_empty && self.solution.matched_channels == 0 {
+                    // TODO: Not sure about this, bit of a hack. Situation is that a component
+                    //  cannot interact with other components, but it is in a sync round, and has
+                    //  failed that sync round.
+                    round_decision = SyncRoundDecision::Failure;
+                }
                 self.broadcast_decision(sched_ctx, comp_ctx, round_decision);
             }
             return round_decision;
@@ -625,16 +624,16 @@ impl Consensus {
                 sync_header: self.create_sync_header(comp_ctx),
                 content: if is_success { SyncMessageContent::GlobalSolution } else { SyncMessageContent::GlobalFailure },
            });
-            handle.send_message(&sched_ctx.runtime, message, true);
+            handle.send_message_logged(sched_ctx, message, true);
             let _should_remove = handle.decrement_users();
             debug_assert!(_should_remove.is_none());
         }
     }
 
     fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) {
-        debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader
+        debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader, // TODO: @NoDirectHandle
         let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
-        leader_info.send_message(&sched_ctx.runtime, message, true);
+        leader_info.send_message_logged(sched_ctx, message, true);
         let should_remove = leader_info.decrement_users();
         if let Some(key) = should_remove {
             sched_ctx.runtime.destroy_component(key);
@@ -642,9 +641,38 @@ impl Consensus {
     }
 
     // -------------------------------------------------------------------------
-    // Creating message headers
+    // Small utilities
    // -------------------------------------------------------------------------
 
+    fn generate_local_solution(&self, comp_ctx: &CompCtx, failed: bool) -> SyncLocalSolution {
+        let mut local_solution = Vec::with_capacity(self.ports.len());
+        for port in &self.ports {
+            if let Some(mapping) = port.mapping {
+                let port_handle = comp_ctx.get_port_handle(port.self_port_id);
+                let port_info = comp_ctx.get_port(port_handle);
+                let new_entry = match port_info.kind {
+                    PortKind::Putter => SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{
+                        self_comp_id: comp_ctx.id,
+                        self_port_id: port_info.self_id,
+                        mapping,
+                        failed
+                    }),
+                    PortKind::Getter => SyncLocalSolutionEntry::Getter(SyncSolutionGetterPort{
+                        self_comp_id: comp_ctx.id,
+                        self_port_id: port_info.self_id,
+                        peer_comp_id: port.peer_comp_id,
+                        peer_port_id: port.peer_port_id,
+                        mapping,
+                        failed
+                    })
+                };
+                local_solution.push(new_entry);
+            }
+        }
+
+        return local_solution;
+    }
+
     fn create_data_header_and_update_mapping(&mut self, port_info: &Port) -> MessageDataHeader {
         let mut expected_mapping = Vec::with_capacity(self.ports.len());
         let mut port_index = usize::MAX;
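
Reviewer note: a minimal sketch of how the two new entry points are meant to
be driven at the end of a sync block. The `end_sync_block` wrapper and its
`had_critical_error` flag are hypothetical illustrations, not part of this
patch; only `notify_sync_end_success`, `notify_sync_end_failure` and
`SyncRoundDecision` come from the code above.

    // Hypothetical caller (not in this patch), showing the intended use of
    // the split API: when its sync block ends, a component reports success
    // or failure, then awaits the round decision produced by the leader.
    fn end_sync_block(
        consensus: &mut Consensus,
        sched_ctx: &SchedulerCtx,
        comp_ctx: &CompCtx,
        had_critical_error: bool, // assumed to be set by the PDL executor
    ) -> SyncRoundDecision {
        if had_critical_error {
            // Submits a local solution whose entries carry `failed == true`,
            // steering the leader towards SyncRoundDecision::Failure.
            consensus.notify_sync_end_failure(sched_ctx, comp_ctx)
        } else {
            // Submits the regular port mapping, which the leader combines
            // with the other components' local solutions.
            consensus.notify_sync_end_success(sched_ctx, comp_ctx)
        }
    }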