diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index cddd244fa3f52157eb6ff24e617dd061daf02408..e47f44029bfdc991f9b827a18734ad3971c3a7d9 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -285,6 +285,8 @@ pub(crate) fn default_handle_incoming_data_message( ) -> IncomingData { let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port); let port_index = comp_ctx.get_port_index(port_handle); + sched_ctx.log("DEBUG: Setting received_message_for_sync"); + comp_ctx.get_port_mut(port_handle).received_message_for_sync = true; let port_value_slot = &mut inbox_main[port_index]; let target_port_id = incoming_message.data_header.target_port; @@ -368,7 +370,8 @@ pub(crate) fn default_attempt_get( } } else { // We don't have a message waiting for us. - let port_info = comp_ctx.get_port(port_handle); + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.last_instruction = target_port_instruction; let port_is_closed = port_info.state == PortState::Closed; if port_is_closed { let peer_id = port_info.peer_comp_id; @@ -499,11 +502,13 @@ pub(crate) fn default_handle_control_message( // now it is in the `BlockedGet` state. let port_was_used = last_instruction != PortInstruction::None; + sched_ctx.log(&format!("DEBUG: last_instruction = {:?}, mode = {:?}, was_used = {}, has_had_message = {}", last_instruction, exec_state.mode, port_was_used, port_has_had_message)); + if exec_state.mode.is_in_sync_block() { let closed_during_sync_round = content.closed_in_sync_round && port_was_used; - // TODO: Finish this + let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message; - if closed_during_sync_round { + if closed_during_sync_round || closed_before_sync_round { return Err(( last_instruction, format!("Peer component (id:{}) shut down, so previous communication cannot have succeeded", peer_comp_id.0) @@ -618,7 +623,7 @@ pub(crate) fn default_handle_start_exit( // Iterating over ports by index to work around borrowing rules for port_index in 0..comp_ctx.num_ports() { let port = comp_ctx.get_port_by_index_mut(port_index); - if port.state == PortState::Closed { + if port.state == PortState::Closed || port.close_at_sync_end { // Already closed, or in the process of being closed continue; } diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index 8ce9cc65383a4572dec0b20619390671d6e85d25..b699a836149175797f2696598280be651d0389af 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -322,7 +322,7 @@ impl Consensus { self.mode = Mode::SyncAwaitingSolution; let local_solution = self.generate_local_solution(comp_ctx, false); - let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution); + let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution, false); return decision; } @@ -335,7 +335,7 @@ impl Consensus { self.mode = Mode::SyncAwaitingSolution; let local_solution = self.generate_local_solution(comp_ctx, true); - let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution); + let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution, true); return decision; } @@ -457,7 +457,7 @@ impl Consensus { return SyncRoundDecision::None; }, SyncMessageContent::LocalSolution(solution_generator_id, local_solution) => { - return self.handle_local_solution(sched_ctx, comp_ctx, solution_generator_id, local_solution); + return self.handle_local_solution(sched_ctx, comp_ctx, solution_generator_id, local_solution, false); }, SyncMessageContent::PartialSolution(partial_solution) => { return self.handle_partial_solution(sched_ctx, comp_ctx, partial_solution); @@ -551,12 +551,18 @@ impl Consensus { })); } - fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution) -> SyncRoundDecision { + fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution, fail_if_empty: bool) -> SyncRoundDecision { if self.highest_id == comp_ctx.id { // We are the leader self.solution.combine_with_local_solution(solution_sender_id, solution); - let round_decision = self.solution.get_decision(); + let mut round_decision = self.solution.get_decision(); if round_decision != SyncRoundDecision::None { + if fail_if_empty && self.solution.matched_channels == 0 { + // TODO: Not sure about this, bit of a hack. Situation is that a component + // cannot interact with other components, but it is in a sync round, and has + // failed that sync round. + round_decision = SyncRoundDecision::Failure; + } self.broadcast_decision(sched_ctx, comp_ctx, round_decision); } return round_decision;