diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index 881a7f6fc3400aded9a2d379dc1f08f2f2212613..8ce9cc65383a4572dec0b20619390671d6e85d25 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -226,7 +226,11 @@ impl SolutionCombiner { let putter = channel.putter.as_ref().unwrap(); let getter = channel.getter.as_ref().unwrap(); - return Some(putter.mapping == getter.mapping); + return Some( + !putter.failed && + !getter.failed && + putter.mapping == getter.mapping + ); } /// Determines the global solution if all components have contributed their @@ -313,34 +317,24 @@ impl Consensus { /// Notifies the consensus management that the PDL code has reached the end /// of a sync block. A local solution will be submitted, after which we wait /// until the participants in the round (hopefully) reach a conclusion. - pub(crate) fn notify_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision { + pub(crate) fn notify_sync_end_success(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision { debug_assert_eq!(self.mode, Mode::SyncBusy); self.mode = Mode::SyncAwaitingSolution; - // Submit our port mapping as a solution - let mut local_solution = Vec::with_capacity(self.ports.len()); - for port in &self.ports { - if let Some(mapping) = port.mapping { - let port_handle = comp_ctx.get_port_handle(port.self_port_id); - let port_info = comp_ctx.get_port(port_handle); - let new_entry = match port_info.kind { - PortKind::Putter => SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{ - self_comp_id: comp_ctx.id, - self_port_id: port_info.self_id, - mapping - }), - PortKind::Getter => SyncLocalSolutionEntry::Getter(SyncSolutionGetterPort{ - self_comp_id: comp_ctx.id, - self_port_id: port_info.self_id, - peer_comp_id: port.peer_comp_id, - peer_port_id: port.peer_port_id, - mapping - }) - }; - local_solution.push(new_entry); - } - } + let local_solution = self.generate_local_solution(comp_ctx, false); + let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution); + return decision; + } + + /// Notifies the consensus management that the component has encountered a + /// critical error during the synchronous round. Hence we should report that + /// we've failed and wait until all the participants have been notified of + /// the error. + pub(crate) fn notify_sync_end_failure(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision { + debug_assert_eq!(self.mode, Mode::SyncBusy); + self.mode = Mode::SyncAwaitingSolution; + let local_solution = self.generate_local_solution(comp_ctx, true); let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution); return decision; } @@ -380,7 +374,6 @@ impl Consensus { /// is used to determine peers of `get`ter ports. // TODO: The use of this function is rather ugly. Find a more robust // scheme about owners of `get`ter ports not knowing about their peers. - // (also, figure out why this was written again, I forgot). pub(crate) fn handle_incoming_data_message(&mut self, comp_ctx: &CompCtx, message: &DataMessage) { let target_handle = comp_ctx.get_port_handle(message.data_header.target_port); let target_index = comp_ctx.get_port_index(target_handle); @@ -642,9 +635,38 @@ impl Consensus { } // ------------------------------------------------------------------------- - // Creating message headers + // Small utilities // ------------------------------------------------------------------------- + fn generate_local_solution(&self, comp_ctx: &CompCtx, failed: bool) -> SyncLocalSolution { + let mut local_solution = Vec::with_capacity(self.ports.len()); + for port in &self.ports { + if let Some(mapping) = port.mapping { + let port_handle = comp_ctx.get_port_handle(port.self_port_id); + let port_info = comp_ctx.get_port(port_handle); + let new_entry = match port_info.kind { + PortKind::Putter => SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{ + self_comp_id: comp_ctx.id, + self_port_id: port_info.self_id, + mapping, + failed + }), + PortKind::Getter => SyncLocalSolutionEntry::Getter(SyncSolutionGetterPort{ + self_comp_id: comp_ctx.id, + self_port_id: port_info.self_id, + peer_comp_id: port.peer_comp_id, + peer_port_id: port.peer_port_id, + mapping, + failed + }) + }; + local_solution.push(new_entry); + } + } + + return local_solution; + } + fn create_data_header_and_update_mapping(&mut self, port_info: &Port) -> MessageDataHeader { let mut expected_mapping = Vec::with_capacity(self.ports.len()); let mut port_index = usize::MAX;