diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index b9980cbe237784ea95c05c76c986e651322ba3b3..7da2a179ad5808552f3503de09e651a7d5badf20 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -186,7 +186,12 @@ impl Consensus { target_port: port_info.peer_id, content: SyncPortContent::NotificationWave, }; - ctx.submit_message(Message::SyncPort(message)); + + // Note: submitting the message might fail. But we're attempting to + // handle the error anyway. + // TODO: Think about this a second time: how do we make sure the + // entire network will fail if we reach this condition + let _unused = ctx.submit_message(Message::SyncPort(message)); } return maybe_conclusion; @@ -265,7 +270,7 @@ impl Consensus { let channel_id = port_desc.channel_id; if !self.encountered_ports.contains(&self_port_id) { - ctx.submit_message(Message::SyncPort(SyncPortMessage { + let message = SyncPortMessage { sync_header: SyncHeader{ sending_component_id: ctx.id, highest_component_id: self.highest_connector_id, @@ -274,8 +279,17 @@ impl Consensus { source_port: self_port_id, target_port: peer_port_id, content: SyncPortContent::SilentPortNotification, - })); - self.encountered_ports.push(self_port_id); + }; + match ctx.submit_message(Message::SyncPort(message)) { + Ok(_) => { + self.encountered_ports.push(self_port_id); + }, + Err(_) => { + // Seems like we were done with this branch, but one of + // the silent ports (in scope) is actually closed + return self.notify_of_fatal_branch(branch_id, ctx); + } + } } target_mapping.push(( @@ -469,7 +483,9 @@ impl Consensus { target_port: port_desc.peer_id, content: SyncPortContent::NotificationWave, }; - ctx.submit_message(Message::SyncPort(message)).unwrap(); + // As with the other SyncPort where we throw away the + // result: we're dealing with an error here anyway + let _unused = ctx.submit_message(Message::SyncPort(message)); } } } @@ -566,7 +582,7 @@ impl Consensus { target_component_id: peer.id, content: SyncCompContent::Notification, }; - ctx.submit_message(Message::SyncComp(message)); + ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel } // But also send our locally combined solution @@ -579,7 +595,7 @@ impl Consensus { target_component_id: sync_header.sending_component_id, content: SyncCompContent::Notification }; - ctx.submit_message(Message::SyncComp(message)); + ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel } // else: exactly equal, so do nothing return true; @@ -665,7 +681,7 @@ impl Consensus { target_component_id: self.highest_connector_id, content, }; - ctx.submit_message(Message::SyncComp(message)); + ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel } return None; @@ -690,7 +706,7 @@ impl Consensus { target_component_id: connector_id, content: SyncCompContent::GlobalSolution(global_solution.clone()), }; - ctx.submit_message(Message::SyncComp(message)); + ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel } debug_assert!(my_final_branch_id.is_valid()); @@ -716,7 +732,7 @@ impl Consensus { target_component_id: presence.added_by, content: SyncCompContent::GlobalFailure, }; - ctx.submit_message(Message::SyncComp(message)); + ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel } } } @@ -748,7 +764,7 @@ impl Consensus { target_component_id: self.highest_connector_id, content: SyncCompContent::PartialSolution(partial_solution), }; - ctx.submit_message(Message::SyncComp(message)); + ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel } } }