Changeset - 4a6883c04294
[Not reviewed]
0 2 0
mh - 3 years ago 2022-04-25 14:37:21
contact@maxhenger.nl
Fix bug related to checking for closed port
2 files changed with 17 insertions and 64 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -285,7 +285,6 @@ pub(crate) fn default_handle_incoming_data_message(
 
) -> IncomingData {
 
    let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    sched_ctx.log("DEBUG: Setting received_message_for_sync");
 
    comp_ctx.get_port_mut(port_handle).received_message_for_sync = true;
 
    let port_value_slot = &mut inbox_main[port_index];
 
    let target_port_id = incoming_message.data_header.target_port;
 
@@ -345,6 +344,18 @@ pub(crate) fn default_attempt_get(
 
    let port_handle = comp_ctx.get_port_handle(target_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 

	
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = target_port_instruction;
 

	
 
    let port_is_closed = port_info.state == PortState::Closed;
 
    if port_is_closed {
 
        let peer_id = port_info.peer_comp_id;
 
        return GetResult::Error((
 
            target_port_instruction,
 
            format!("Cannot get from this port, as the peer component (id:{}) closed the port", peer_id.0)
 
        ));
 
    }
 

	
 
    if let Some(message) = &inbox_main[port_index] {
 
        if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
            // We're allowed to receive this message
 
@@ -369,19 +380,8 @@ pub(crate) fn default_attempt_get(
 
            )));
 
        }
 
    } else {
 
        // We don't have a message waiting for us.
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        port_info.last_instruction = target_port_instruction;
 
        let port_is_closed = port_info.state == PortState::Closed;
 
        if port_is_closed {
 
            let peer_id = port_info.peer_comp_id;
 
            return GetResult::Error((
 
                target_port_instruction,
 
                format!("Cannot get from this port, as the peer component (id:{}) shut down", peer_id.0)
 
            ));
 
        }
 

	
 
        // No error ocurred, so enter the BlockedGet state
 
        // We don't have a message waiting for us and the port is not blocked.
 
        // So enter the BlockedGet state
 
        exec_state.set_as_blocked_get(target_port);
 
        return GetResult::NoMessage;
 
    }
 
@@ -403,14 +403,8 @@ pub(crate) fn default_handle_received_data_message(
 

	
 
    // Modify last-known location where port instruction was retrieved
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = port_instruction;
 

	
 
    if port_info.state == PortState::Closed {
 
        return Err((
 
            port_info.last_instruction,
 
            format!("Cannot 'get' because the channel is closed"))
 
        );
 
    }
 
    debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller
 
    debug_assert_ne!(port_info.state, PortState::Closed); // checked by caller
 

	
 
    // Check if there are any more messages in the backup buffer
 
    let port_info = comp_ctx.get_port(port_handle);
 
@@ -502,8 +496,6 @@ pub(crate) fn default_handle_control_message(
 
                // now it is in the `BlockedGet` state.
 
                let port_was_used = last_instruction != PortInstruction::None;
 

	
 
                sched_ctx.log(&format!("DEBUG: last_instruction = {:?}, mode = {:?}, was_used = {}, has_had_message = {}", last_instruction, exec_state.mode, port_was_used, port_has_had_message));
 

	
 
                if exec_state.mode.is_in_sync_block() {
 
                    let closed_during_sync_round = content.closed_in_sync_round && port_was_used;
 
                    let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message;
 
@@ -609,7 +601,7 @@ pub(crate) fn default_handle_start_exit(
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
 
    sched_ctx.log("Component starting exit");
 
    sched_ctx.log(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason));
 
    exec_state.mode = CompMode::BusyExit;
 
    let exit_inside_sync = exec_state.exit_reason.is_in_sync();
 

	
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -481,45 +481,6 @@ impl CompPDL {
 
        return Ok(step_result)
 
    }
 

	
 
    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component starting sync mode");
 
        self.consensus.notify_sync_start(comp_ctx);
 
        for message in self.inbox_main.iter() {
 
            if let Some(message) = message {
 
                self.consensus.handle_incoming_data_message(comp_ctx, message);
 
            }
 
        }
 
        debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
        self.exec_state.mode = CompMode::Sync;
 
    }
 

	
 
    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log(&format!("Component exiting (reason: {:?}", self.exec_state.exit_reason));
 
        debug_assert_eq!(self.exec_state.mode, CompMode::StartExit);
 
        self.exec_state.mode = CompMode::BusyExit;
 
        let exit_inside_sync = self.exec_state.exit_reason.is_in_sync();
 

	
 
        // Doing this by index, then retrieving the handle is a bit rediculous,
 
        // but Rust is being Rust with its borrowing rules.
 
        for port_index in 0..comp_ctx.num_ports() {
 
            let port = comp_ctx.get_port_by_index_mut(port_index);
 
            if port.state == PortState::Closed {
 
                // Already closed, or in the process of being closed
 
                continue;
 
            }
 

	
 
            // Mark as closed
 
            let port_id = port.self_id;
 
            port.state = PortState::Closed;
 

	
 
            // Notify peer of closing
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            let (peer, message) = self.control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx);
 
            let peer_info = comp_ctx.get_peer(peer);
 
            peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling messages
 
    // -------------------------------------------------------------------------
0 comments (0 inline, 0 general)