Changeset - 60057e2acf9e
[Not reviewed]
0 4 0
MH - 3 years ago 2022-04-19 18:16:33
contact@maxhenger.nl
WIP on error-handling implementation
4 files changed with 70 insertions and 11 deletions:
0 comments (0 inline, 0 general)
src/runtime2/communication.rs
Show inline comments
 
@@ -176,11 +176,17 @@ pub enum ControlMessageContent {
 
    Ack,
 
    BlockPort(PortId),
 
    UnblockPort(PortId),
 
    ClosePort(PortId),
 
    ClosePort(ControlMessageClosePort),
 
    PortPeerChangedBlock(PortId),
 
    PortPeerChangedUnblock(PortId, CompId),
 
}
 

	
 
#[derive(Debug)]
 
pub struct ControlMessageClosePort {
 
    pub port_to_close: PortId, // ID of the receiving port
 
    pub closed_in_sync_round: bool, // needed to ensure correct handling of errors
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Messages (generic)
 
// -----------------------------------------------------------------------------
src/runtime2/component/component.rs
Show inline comments
 
@@ -155,18 +155,22 @@ pub(crate) fn create_component(
 
pub(crate) fn default_send_data_message(
 
    exec_state: &mut CompExecState, transmitting_port_id: PortId, value: ValueGroup,
 
    sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx
 
) -> CompScheduling {
 
) -> Result<CompScheduling, String> { // @nocommit: Something better than Err(String)
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 

	
 
    // TODO: Handle closed ports
 
    let port_handle = comp_ctx.get_port_handle(transmitting_port_id);
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert_eq!(port_info.kind, PortKind::Putter);
 
    if port_info.state.is_blocked() {
 

	
 
    if port_info.state == PortState::Closed {
 
        // Note: normally peer is eventually consistent, but if it has shut down
 
        // then we can be sure it is consistent (I think?)
 
        return Err(format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0))
 
    } else if port_info.state.is_blocked() {
 
        // Port is blocked, so we cannot send
 
        exec_state.set_as_blocked_put(transmitting_port_id, value);
 

	
 
        return CompScheduling::Sleep;
 
        return Ok(CompScheduling::Sleep);
 
    } else {
 
        // Port is not blocked, so send to the peer
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
@@ -174,7 +178,7 @@ pub(crate) fn default_send_data_message(
 
        let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
 
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(annotated_message), true);
 

	
 
        return CompScheduling::Immediate;
 
        return Ok(CompScheduling::Immediate);
 
    }
 
}
 

	
 
@@ -279,7 +283,7 @@ pub(crate) fn default_handle_received_data_message(
 
pub(crate) fn default_handle_control_message(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
 
    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
) {
 
) -> Result<(), String> { // @nocommit, use something else than Err(String)
 
    match message.content {
 
        ControlMessageContent::Ack => {
 
            default_handle_ack(control, message.id, sched_ctx, comp_ctx);
 
@@ -295,22 +299,55 @@ pub(crate) fn default_handle_control_message(
 
                comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
 
            }
 
        },
 
        ControlMessageContent::ClosePort(port_id) => {
 
        ControlMessageContent::ClosePort(content) => {
 
            // Request to close the port. We immediately comply and remove
 
            // the component handle as well
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id;
 
            let port_handle = comp_ctx.get_port_handle(content.port_id);
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            let peer_comp_id = port_info.peer_comp_id;
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
            // We're closing the port, so we will always update the peer of the
 
            // port (in case of error messages)
 
            port_info.peer_comp_id = message.sender_comp_id;
 

	
 
            // One exception to sending an `Ack` is if we just closed the
 
            // port ourselves, meaning that the `ClosePort` messages got
 
            // sent to one another.
 
            if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
 
                // The two components (sender and this component) are closing
 
                // the channel at the same time.
 
                default_handle_ack(control, control_id, sched_ctx, comp_ctx);
 
            } else {
 
                // Respond to the message
 
                let last_instruction = port_info.last_instruction;
 
                let port_was_used = last_instruction != PortInstruction::None;
 
                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
                comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed
 
                comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed
 

	
 
                // Make sure that we've not reached an error condition. Note
 
                // that if this condition is not met, then we don't error out
 
                // now, but we may error out in the next sync block when we
 
                // try to `put`/`get` on the port. This condition makes sure
 
                // that if we have a successful sync round, followed by the peer
 
                // closing the port, that we don't consider the sync round to
 
                // have failed by mistake.
 
                if content.closed_in_sync_round && exec_state.mode.is_in_sync_block() && port_was_used {
 
                    let error_message = match last_instruction {
 
                        PortInstruction::None => unreachable!(), // port was used
 
                        PortInstruction::NoSource => format!(
 
                            "Peer component (id:{}) shut down, so operation on port cannot have succeeded",
 
                            message.sender_comp_id.0
 
                        ),
 
                        PortInstruction::SourceLocation(source_location) => format!(
 
                            "Peer component (id:{}) shut down, so this operation cannot have succeeded",
 
                            message.sender_comp_id.0
 
                        ),
 
                    };
 

	
 
                    return Err(error_message);
 
                }
 
            }
 
        },
 
        ControlMessageContent::UnblockPort(port_id) => {
 
@@ -351,6 +388,8 @@ pub(crate) fn default_handle_control_message(
 
            default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
        }
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles a component initiating the exiting procedure, and closing all of its
src/runtime2/component/component_context.rs
Show inline comments
 
@@ -2,6 +2,16 @@ use crate::runtime2::scheduler::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 

	
 
use crate::protocol::ExpressionId;
 

	
 
/// Helper struct to remember when the last operation on the port took place
 
#[derive(Debug, PartialEq, Copy, Clone)]
 
pub enum PortInstruction {
 
    None,
 
    NoSource,
 
    SourceLocation(ExpressionId),
 
}
 

	
 
#[derive(Debug)]
 
pub struct Port {
 
    pub self_id: PortId,
 
@@ -9,6 +19,7 @@ pub struct Port {
 
    pub peer_port_id: PortId, // eventually consistent
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    pub last_instruction: PortInstruction,
 
    #[cfg(debug_assertions)] pub(crate) associated_with_peer: bool,
 
}
 

	
 
@@ -58,6 +69,7 @@ impl CompCtx {
 
            kind: PortKind::Putter,
 
            state: PortState::Open,
 
            peer_comp_id: self.id,
 
            last_instruction: PortInstruction::None,
 
            #[cfg(debug_assertions)] associated_with_peer: false,
 
        });
 
        self.ports.push(Port{
 
@@ -66,6 +78,7 @@ impl CompCtx {
 
            kind: PortKind::Getter,
 
            state: PortState::Open,
 
            peer_comp_id: self.id,
 
            last_instruction: PortInstruction::None,
 
            #[cfg(debug_assertions)] associated_with_peer: false,
 
        });
 

	
 
@@ -77,6 +90,7 @@ impl CompCtx {
 
        let self_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id, peer_comp_id, peer_port_id, kind, state,
 
            last_instruction: PortInstruction::None,
 
            #[cfg(debug_assertions)] associated_with_peer: false,
 
        });
 
        return LocalPortHandle(self_id);
src/runtime2/component/consensus.rs
Show inline comments
 
@@ -632,7 +632,7 @@ impl Consensus {
 
    }
 

	
 
    fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) {
 
        debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader
 
        debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader, // TODO: @NoDirectHandle
 
        let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
 
        leader_info.send_message(&sched_ctx.runtime, message, true);
 
        let should_remove = leader_info.decrement_users();
0 comments (0 inline, 0 general)