Changeset - 1e0c33498fac
[Not reviewed]
0 2 0
MH - 3 years ago 2022-04-22 13:02:36
contact@maxhenger.nl
Test and fix pre-sync ClosePort case
2 files changed with 20 insertions and 9 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -276,24 +276,26 @@ pub(crate) enum IncomingData {
 
/// different from PDL code performing a `get` on a port; this is the case where
 
/// the message first arrives at the component.
 
// NOTE: This is supposed to be a somewhat temporary implementation. It would be
 
//  nicest if the sending component can figure out it cannot send any more data.
 
#[must_use]
 
pub(crate) fn default_handle_incoming_data_message(
 
    exec_state: &mut CompExecState, inbox_main: &mut InboxMain,
 
    comp_ctx: &mut CompCtx, incoming_message: DataMessage,
 
    sched_ctx: &SchedulerCtx, control: &mut ControlLayer
 
) -> IncomingData {
 
    let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    sched_ctx.log("DEBUG: Setting received_message_for_sync");
 
    comp_ctx.get_port_mut(port_handle).received_message_for_sync = true;
 
    let port_value_slot = &mut inbox_main[port_index];
 
    let target_port_id = incoming_message.data_header.target_port;
 

	
 
    if port_value_slot.is_none() {
 
        // We can put the value in the slot
 
        *port_value_slot = Some(incoming_message);
 

	
 
        // Check if we're blocked on receiving this message.
 
        dbg_code!({
 
            // Our port cannot have been blocked itself, because we're able to
 
            // directly insert the message into its slot.
 
            assert!(!comp_ctx.get_port(port_handle).state.is_blocked());
 
@@ -359,25 +361,26 @@ pub(crate) fn default_attempt_get(
 
                Err(location_and_message) => return GetResult::Error(location_and_message)
 
            }
 
        } else {
 
            // We're not allowed to receive this message. This means that the
 
            // receiver is attempting to receive something out of order with
 
            // respect to the sender.
 
            return GetResult::Error((target_port_instruction, String::from(
 
                "Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s"
 
            )));
 
        }
 
    } else {
 
        // We don't have a message waiting for us.
 
        let port_info = comp_ctx.get_port(port_handle);
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        port_info.last_instruction = target_port_instruction;
 
        let port_is_closed = port_info.state == PortState::Closed;
 
        if port_is_closed {
 
            let peer_id = port_info.peer_comp_id;
 
            return GetResult::Error((
 
                target_port_instruction,
 
                format!("Cannot get from this port, as the peer component (id:{}) shut down", peer_id.0)
 
            ));
 
        }
 

	
 
        // No error ocurred, so enter the BlockedGet state
 
        exec_state.set_as_blocked_get(target_port);
 
        return GetResult::NoMessage;
 
@@ -490,29 +493,31 @@ pub(crate) fn default_handle_control_message(
 
                comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed
 

	
 
                // Handle any possible error conditions (which boil down to: the
 
                // port has been used, but the peer has died). If not in sync
 
                // mode then we close the port immediately.
 

	
 
                // Note that `port_was_used` does not mean that any messages
 
                // were actually received. It might also mean that e.g. the
 
                // component attempted a `get`, but there were no messages, so
 
                // now it is in the `BlockedGet` state.
 
                let port_was_used = last_instruction != PortInstruction::None;
 

	
 
                sched_ctx.log(&format!("DEBUG: last_instruction = {:?}, mode = {:?}, was_used = {}, has_had_message = {}", last_instruction, exec_state.mode, port_was_used, port_has_had_message));
 

	
 
                if exec_state.mode.is_in_sync_block() {
 
                    let closed_during_sync_round = content.closed_in_sync_round && port_was_used;
 
                    // TODO: Finish this
 
                    let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message;
 

	
 
                    if closed_during_sync_round {
 
                    if closed_during_sync_round || closed_before_sync_round {
 
                        return Err((
 
                            last_instruction,
 
                            format!("Peer component (id:{}) shut down, so previous communication cannot have succeeded", peer_comp_id.0)
 
                        ));
 
                    }
 
                } else {
 
                    comp_ctx.set_port_state(port_handle, PortState::Closed);
 
                }
 
            }
 
        },
 
        ControlMessageContent::UnblockPort(port_id) => {
 
            // We were previously blocked (or already closed)
 
@@ -609,25 +614,25 @@ pub(crate) fn default_handle_start_exit(
 
    let exit_inside_sync = exec_state.exit_reason.is_in_sync();
 

	
 
    // If exiting while inside sync mode, report to the leader of the current
 
    // round that we've failed.
 
    if exit_inside_sync {
 
        let decision = consensus.notify_sync_end_failure(sched_ctx, comp_ctx);
 
        default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus);
 
    }
 

	
 
    // Iterating over ports by index to work around borrowing rules
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port = comp_ctx.get_port_by_index_mut(port_index);
 
        if port.state == PortState::Closed {
 
        if port.state == PortState::Closed || port.close_at_sync_end {
 
            // Already closed, or in the process of being closed
 
            continue;
 
        }
 

	
 
        // Mark as closed
 
        let port_id = port.self_id;
 
        port.state = PortState::Closed;
 

	
 
        // Notify peer of closing
 
        let port_handle = comp_ctx.get_port_handle(port_id);
 
        let (peer, message) = control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx);
 
        let peer_info = comp_ctx.get_peer(peer);
src/runtime2/component/consensus.rs
Show inline comments
 
@@ -313,38 +313,38 @@ impl Consensus {
 
            }
 
        }
 
    }
 

	
 
    /// Notifies the consensus management that the PDL code has reached the end
 
    /// of a sync block. A local solution will be submitted, after which we wait
 
    /// until the participants in the round (hopefully) reach a conclusion.
 
    pub(crate) fn notify_sync_end_success(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        self.mode = Mode::SyncAwaitingSolution;
 

	
 
        let local_solution = self.generate_local_solution(comp_ctx, false);
 
        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution);
 
        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution, false);
 
        return decision;
 
    }
 

	
 
    /// Notifies the consensus management that the component has encountered a
 
    /// critical error during the synchronous round. Hence we should report that
 
    /// we've failed and wait until all the participants have been notified of
 
    /// the error.
 
    pub(crate) fn notify_sync_end_failure(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        self.mode = Mode::SyncAwaitingSolution;
 

	
 
        let local_solution = self.generate_local_solution(comp_ctx, true);
 
        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution);
 
        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution, true);
 
        return decision;
 
    }
 

	
 
    /// Notifies that a decision has been reached. Note that the caller should
 
    /// still take the appropriate actions based on the decision it is supplying
 
    /// to the consensus layer.
 
    pub(crate) fn notify_sync_decision(&mut self, _decision: SyncRoundDecision) {
 
        // Reset everything for the next round
 
        debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
 
        self.mode = Mode::NonSync;
 
        self.round_index = self.round_index.wrapping_add(1);
 

	
 
@@ -448,25 +448,25 @@ impl Consensus {
 

	
 
    /// Receives the sync message and updates the consensus state appropriately.
 
    pub(crate) fn receive_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> SyncRoundDecision {
 
        // Whatever happens: handle the sync header (possibly changing the
 
        // currently registered leader)
 
        self.handle_sync_header(sched_ctx, comp_ctx, &message.sync_header);
 

	
 
        match message.content {
 
            SyncMessageContent::NotificationOfLeader => {
 
                return SyncRoundDecision::None;
 
            },
 
            SyncMessageContent::LocalSolution(solution_generator_id, local_solution) => {
 
                return self.handle_local_solution(sched_ctx, comp_ctx, solution_generator_id, local_solution);
 
                return self.handle_local_solution(sched_ctx, comp_ctx, solution_generator_id, local_solution, false);
 
            },
 
            SyncMessageContent::PartialSolution(partial_solution) => {
 
                return self.handle_partial_solution(sched_ctx, comp_ctx, partial_solution);
 
            },
 
            SyncMessageContent::GlobalSolution => {
 
                debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution); // leader can only find global- if we submitted local solution
 
                return SyncRoundDecision::Solution;
 
            },
 
            SyncMessageContent::GlobalFailure => {
 
                debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
 
                return SyncRoundDecision::Failure;
 
            }
 
@@ -542,30 +542,36 @@ impl Consensus {
 
        if !self.solution.has_contributions() {
 
            return;
 
        }
 

	
 
        // Swap the container with the partial solution and then send it along
 
        let partial_solution = self.solution.take_partial_solution();
 
        self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(SyncMessage{
 
            sync_header: self.create_sync_header(comp_ctx),
 
            content: SyncMessageContent::PartialSolution(partial_solution),
 
        }));
 
    }
 

	
 
    fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution) -> SyncRoundDecision {
 
    fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution, fail_if_empty: bool) -> SyncRoundDecision {
 
        if self.highest_id == comp_ctx.id {
 
            // We are the leader
 
            self.solution.combine_with_local_solution(solution_sender_id, solution);
 
            let round_decision = self.solution.get_decision();
 
            let mut round_decision = self.solution.get_decision();
 
            if round_decision != SyncRoundDecision::None {
 
                if fail_if_empty && self.solution.matched_channels == 0 {
 
                    // TODO: Not sure about this, bit of a hack. Situation is that a component
 
                    //  cannot interact with other components, but it is in a sync round, and has
 
                    //  failed that sync round.
 
                    round_decision = SyncRoundDecision::Failure;
 
                }
 
                self.broadcast_decision(sched_ctx, comp_ctx, round_decision);
 
            }
 
            return round_decision;
 
        } else {
 
            // Forward the solution
 
            let message = SyncMessage{
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: SyncMessageContent::LocalSolution(solution_sender_id, solution),
 
            };
 
            self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(message));
 
            return SyncRoundDecision::None;
 
        }
0 comments (0 inline, 0 general)