Changeset - a4ae7002404f
[Not reviewed]
0 5 0
MH - 3 years ago 2022-04-21 16:55:59
contact@maxhenger.nl
Handle post-ClosePort errors
5 files changed with 74 insertions and 15 deletions:
0 comments (0 inline, 0 general)
src/protocol/eval/executor.rs
Show inline comments
 
@@ -206,7 +206,7 @@ pub enum EvalContinuation {
 
    BlockGet(ExpressionId, PortId),
 
    Put(ExpressionId, PortId, ValueGroup),
 
    SelectStart(u32, u32), // (num_cases, num_ports_total)
 
    SelectRegisterPort(u32, u32, PortId), // (case_index, port_index_in_case, port_id)
 
    SelectRegisterPort(ExpressionId, u32, u32, PortId), // (call_expr_id, case_index, port_index_in_case, port_id)
 
    SelectWait, // wait until select can continue
 
    // Returned only in non-sync mode
 
    ComponentTerminated,
 
@@ -757,7 +757,7 @@ impl Prompt {
 
                                    let port_index = self.store.maybe_read_ref(&cur_frame.expr_values.pop_front().unwrap()).as_uint32();
 
                                    let port_value = self.store.maybe_read_ref(&cur_frame.expr_values.pop_front().unwrap()).as_port_id();
 

	
 
                                    return Ok(EvalContinuation::SelectRegisterPort(case_index, port_index, port_value));
 
                                    return Ok(EvalContinuation::SelectRegisterPort(expr_id, case_index, port_index, port_value));
 
                                },
 
                                Method::SelectWait => {
 
                                    match ctx.performed_select_wait() {
src/protocol/parser/pass_rewriting.rs
Show inline comments
 
@@ -217,7 +217,7 @@ impl Visitor for PassRewriting {
 
                num_ports_expression_id.upcast()
 
            ];
 

	
 
            let call_expression_id = create_ast_call_expr(ctx, self.current_procedure_id, Method::SelectStart, &mut self.expression_buffer, arguments);
 
            let call_expression_id = create_ast_call_expr(ctx, InputSpan::new(), self.current_procedure_id, Method::SelectStart, &mut self.expression_buffer, arguments);
 
            let call_statement_id = create_ast_expression_stmt(ctx, call_expression_id.upcast());
 

	
 
            transformed_stmts.push(call_statement_id.upcast());
 
@@ -233,6 +233,9 @@ impl Visitor for PassRewriting {
 

	
 
                for case_port_index in 0..case_num_ports {
 
                    // Arguments to runtime call
 
                    let original_get_call_id = case.involved_ports[case_port_index].0;
 
                    let original_get_call_span = ctx.heap[original_get_call_id].full_span;
 

	
 
                    let (port_variable_id, port_variable_type) = locals[total_port_index]; // so far this variable contains the temporary variables for the port expressions
 
                    let case_index_expr_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_index as u64, ctx.arch.uint32_type_id);
 
                    let port_index_expr_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_port_index as u64, ctx.arch.uint32_type_id);
 
@@ -244,7 +247,7 @@ impl Visitor for PassRewriting {
 
                    ];
 

	
 
                    // Create runtime call, then store it
 
                    let runtime_call_expr_id = create_ast_call_expr(ctx, self.current_procedure_id, Method::SelectRegisterCasePort, &mut self.expression_buffer, runtime_call_arguments);
 
                    let runtime_call_expr_id = create_ast_call_expr(ctx, original_get_call_span, self.current_procedure_id, Method::SelectRegisterCasePort, &mut self.expression_buffer, runtime_call_arguments);
 
                    let runtime_call_stmt_id = create_ast_expression_stmt(ctx, runtime_call_expr_id.upcast());
 

	
 
                    transformed_stmts.push(runtime_call_stmt_id.upcast());
 
@@ -261,7 +264,7 @@ impl Visitor for PassRewriting {
 
        locals.push((select_variable_id, select_variable_type));
 

	
 
        {
 
            let runtime_call_expr_id = create_ast_call_expr(ctx, self.current_procedure_id, Method::SelectWait, &mut self.expression_buffer, Vec::new());
 
            let runtime_call_expr_id = create_ast_call_expr(ctx, InputSpan::new(), self.current_procedure_id, Method::SelectWait, &mut self.expression_buffer, Vec::new());
 
            let variable_stmt_id = create_ast_variable_declaration_stmt(ctx, self.current_procedure_id, select_variable_id, select_variable_type, runtime_call_expr_id.upcast());
 
            transformed_stmts.push(variable_stmt_id.upcast().upcast());
 
        }
 
@@ -364,7 +367,12 @@ fn create_ast_variable_expr(ctx: &mut Ctx, containing_procedure_id: ProcedureDef
 
    });
 
}
 

	
 
fn create_ast_call_expr(ctx: &mut Ctx, containing_procedure_id: ProcedureDefinitionId, method: Method, buffer: &mut ScopedBuffer<ExpressionId>, arguments: Vec<ExpressionId>) -> CallExpressionId {
 
/// Creates an AST call expression. The provided expression span is useful for
 
/// the cases where we perform compiler-builtin function calls that, when they
 
/// fail, should provide an error pointing at a specific point in the source
 
/// code. The `containing_procedure_id` is the procedure whose instructions are
 
/// going to contain this new call expression.
 
fn create_ast_call_expr(ctx: &mut Ctx, span: InputSpan, containing_procedure_id: ProcedureDefinitionId, method: Method, buffer: &mut ScopedBuffer<ExpressionId>, arguments: Vec<ExpressionId>) -> CallExpressionId {
 
    let call_type_id = match method {
 
        Method::SelectStart => ctx.arch.void_type_id,
 
        Method::SelectRegisterCasePort => ctx.arch.void_type_id,
 
@@ -377,7 +385,7 @@ fn create_ast_call_expr(ctx: &mut Ctx, containing_procedure_id: ProcedureDefinit
 
    let call_expression_id = ctx.heap.alloc_call_expression(|this| CallExpression{
 
        func_span: InputSpan::new(),
 
        this,
 
        full_span: InputSpan::new(),
 
        full_span: span,
 
        parser_type: ParserType{
 
            elements: Vec::new(),
 
            full_span: InputSpan::new(),
src/runtime2/component/component.rs
Show inline comments
 
@@ -398,6 +398,7 @@ pub(crate) fn default_handle_control_message(
 
            // port (in case of error messages)
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_comp_id = message.sender_comp_id;
 
            port_info.close_at_sync_end = true; // might be redundant (we might set it closed now)
 

	
 
            let port_info = comp_ctx.get_port(port_handle);
 
            let peer_comp_id = port_info.peer_comp_id;
 
@@ -425,7 +426,6 @@ pub(crate) fn default_handle_control_message(
 
                // that if we have a successful sync round, followed by the peer
 
                // closing the port, that we don't consider the sync round to
 
                // have failed by mistake.
 
                let error_due_to_port_use =  
 
                if content.closed_in_sync_round && exec_state.mode.is_in_sync_block() && port_was_used {
 
                    return Err((
 
                        last_instruction,
 
@@ -546,7 +546,7 @@ pub(crate) fn default_handle_busy_exit(
 
/// 2. The component has encountered an error during a sync round and is
 
///     exiting, hence is waiting for a "Failure" message from the leader.
 
pub(crate) fn default_handle_sync_decision(
 
    sched_ctx: &SchedulerCtx, exec_state: &mut CompExecState,
 
    sched_ctx: &SchedulerCtx, exec_state: &mut CompExecState, comp_ctx: &mut CompCtx,
 
    decision: SyncRoundDecision, consensus: &mut Consensus
 
) -> Option<bool> {
 
    let success = match decision {
 
@@ -568,6 +568,12 @@ pub(crate) fn default_handle_sync_decision(
 
    if success {
 
        // We cannot get a success message if the component has encountered an
 
        // error.
 
        for port_index in 0..comp_ctx.num_ports() {
 
            let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
            if port_info.close_at_sync_end {
 
                port_info.state = PortState::Closed;
 
            }
 
        }
 
        debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
 
        exec_state.mode = CompMode::NonSync;
 
        return Some(true);
src/runtime2/component/component_context.rs
Show inline comments
 
@@ -19,7 +19,8 @@ pub struct Port {
 
    pub peer_port_id: PortId, // eventually consistent
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    pub last_instruction: PortInstruction,
 
    pub last_instruction: PortInstruction, // used during sync round in case port ends up being closed for error reporting
 
    pub close_at_sync_end: bool, // used during sync round when receiving a `ClosePort(not_in_sync_round)` message
 
    #[cfg(debug_assertions)] pub(crate) associated_with_peer: bool,
 
}
 

	
 
@@ -70,6 +71,7 @@ impl CompCtx {
 
            state: PortState::Open,
 
            peer_comp_id: self.id,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            #[cfg(debug_assertions)] associated_with_peer: false,
 
        });
 
        self.ports.push(Port{
 
@@ -79,6 +81,7 @@ impl CompCtx {
 
            state: PortState::Open,
 
            peer_comp_id: self.id,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            #[cfg(debug_assertions)] associated_with_peer: false,
 
        });
 

	
 
@@ -91,6 +94,7 @@ impl CompCtx {
 
        self.ports.push(Port{
 
            self_id, peer_comp_id, peer_port_id, kind, state,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            #[cfg(debug_assertions)] associated_with_peer: false,
 
        });
 
        return LocalPortHandle(self_id);
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -318,7 +318,10 @@ impl Component for CompPDL {
 

	
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                comp_ctx.get_port_mut(port_handle).last_instruction = PortInstruction::SourceLocation(expr_id);
 

	
 
                let port_info = comp_ctx.get_port_mut(port_handle);
 
                port_info.last_instruction = PortInstruction::SourceLocation(expr_id);
 
                let port_is_closed = port_info.state == PortState::Closed;
 

	
 
                let port_index = comp_ctx.get_port_index(port_handle);
 
                if let Some(message) = &self.inbox_main[port_index] {
 
@@ -340,9 +343,23 @@ impl Component for CompPDL {
 
                            return CompScheduling::Immediate;
 
                        }
 
                    } else {
 
                        todo!("handle sync failure due to message deadlock");
 
                        let protocol = &sched_ctx.runtime.protocol;
 
                        self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr(
 
                            &self.prompt, &protocol.modules, &protocol.heap, expr_id,
 
                            String::from("Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s")
 
                        ));
 
                        return CompScheduling::Sleep;
 
                    }
 
                } else if port_is_closed {
 
                    // No messages, but getting makes no sense as the port is
 
                    // closed.
 
                    let peer_id = comp_ctx.get_port(port_handle).peer_comp_id;
 
                    let protocol = &sched_ctx.runtime.protocol;
 
                    self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr(
 
                        &self.prompt, &protocol.modules, &protocol.heap, expr_id,
 
                        format!("Cannot get from this port, as the peer component (id:{}) shut down", peer_id.0)
 
                    )));
 
                    return CompScheduling::Immediate;
 
                } else {
 
                    // We need to wait
 
                    self.exec_state.set_as_blocked_get(port_id);
 
@@ -377,12 +394,36 @@ impl Component for CompPDL {
 
                self.select_state.handle_select_start(num_cases);
 
                return CompScheduling::Requeue;
 
            },
 
            EC::SelectRegisterPort(case_index, port_index, port_id) => {
 
            EC::SelectRegisterPort(expr_id, case_index, port_index, port_id) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 

	
 
                // Note: we register the "last_instruction" here already. This
 
                // way if we get a `ClosePort` message, the condition to fail
 
                // the synchronous round is satisfied.
 
                let port_info = comp_ctx.get_port_mut(port_handle);
 
                port_info.last_instruction = PortInstruction::SourceLocation(expr_id);
 
                let port_is_closed = port_info.state == PortState::Closed;
 

	
 
                // Register port as part of select guard
 
                if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) {
 
                    todo!("handle registering a port multiple times");
 
                    // Failure occurs if a port is used twice in the same guard
 
                    let protocol = &sched_ctx.runtime.protocol;
 
                    self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr(
 
                        &self.prompt, &protocol.modules, &protocol.heap, expr_id,
 
                        String::from("Cannot have the one port appear in the same guard twice")
 
                    )));
 
                } else if port_is_closed {
 
                    // Port is closed
 
                    let peer_id = comp_ctx.get_port(port_handle).peer_comp_id;
 
                    let protocol = &sched_ctx.runtime.protocol;
 
                    self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr(
 
                        &self.prompt, &protocol.modules, &protocol.heap, expr_id,
 
                        format!("Cannot register port, as the peer component (id:{}) has shut down", peer_id.0)
 
                    )));
 
                }
 

	
 
                return CompScheduling::Immediate;
 
            },
 
            EC::SelectWait => {
 
@@ -492,7 +533,7 @@ impl CompPDL {
 
        sched_ctx.log("Component ending sync mode (now waiting for solution)");
 
        let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
        self.exec_state.mode = CompMode::SyncEnd;
 
        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
    }
 

	
 
    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
0 comments (0 inline, 0 general)