Changeset - a99ae23c30ec
[Not reviewed]
0 5 0
mh - 4 years ago 2021-11-18 11:36:29
contact@maxhenger.nl
WIP on consensus error handling
5 files changed with 130 insertions and 9 deletions:
0 comments (0 inline, 0 general)
src/protocol/eval/executor.rs
Show inline comments
 
@@ -1046,4 +1046,22 @@ impl Prompt {
 

	
 
        return_value
 
    }
 

	
 
    /// Constructs an error at the current expression that lives at the top of
 
    /// the expression stack. Falls back to constructing an error at the current
 
    /// statement if there is no expression.
 
    pub(crate) fn new_error_at_expr(&self, modules: &[Module], heap: &Heap, error_message: String) -> EvalError {
 
        let last_frame = self.frames.last().unwrap();
 
        for instruction in last_frame.expr_stack.iter().rev() {
 
            if let ExprInstruction::EvalExpr(expression_id) = instruction {
 
                return EvalError::new_error_at_expr(
 
                    self, modules, heap, *expression_id, error_message
 
                );
 
            }
 
        }
 

	
 
        // If here then expression stack was empty (cannot have just rotate
 
        // instructions)
 
        panic!("attempted to construct evaluation error without any expressions to evaluate in frame");
 
    }
 
}
 
\ No newline at end of file
src/runtime2/connector.rs
Show inline comments
 
@@ -56,6 +56,14 @@ impl ConnectorPublic {
 
    }
 
}
 

	
 
#[derive(Debug, PartialEq, Eq, Copy)]
 
enum Mode {
 
    NonSync,    // running non-sync code
 
    Sync,       // running sync code (in potentially multiple branches)
 
    SyncError,  // encountered an unrecoverable error in sync mode
 
    Error,      // encountered an error in non-sync mode (or finished handling the sync mode error).
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum ConnectorScheduling {
 
    Immediate,          // Run again, immediately
 
@@ -66,6 +74,8 @@ pub(crate) enum ConnectorScheduling {
 
}
 

	
 
pub(crate) struct ConnectorPDL {
 
    mode: Mode,
 
    eval_error: Option<EvalError>,
 
    tree: ExecTree,
 
    consensus: Consensus,
 
    last_finished_handled: Option<BranchId>,
 
@@ -121,7 +131,8 @@ impl<'a> RunContext for ConnectorRunContext<'a>{
 
impl Connector for ConnectorPDL {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        self.handle_new_messages(comp_ctx);
 
        if self.tree.is_in_sync() {
 
        match self.mode {
 
            Mode::Sync => {
 
                // Run in sync mode
 
                let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 

	
 
@@ -131,10 +142,9 @@ impl Connector for ConnectorPDL {
 
                    iter_id = self.tree.get_queue_next(branch_id);
 
                    self.last_finished_handled = Some(branch_id);
 

	
 

	
 
                    if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
 
                        // Actually found a solution
 
                    self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx);
 
                        self.enter_non_sync_mode(solution_branch_id, comp_ctx);
 
                        return ConnectorScheduling::Immediate;
 
                    }
 

	
 
@@ -142,9 +152,20 @@ impl Connector for ConnectorPDL {
 
                }
 

	
 
                return scheduling;
 
        } else {
 
            },
 
            Mode::NonSync => {
 
                let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx);
 
                return scheduling;
 
            },
 
            Mode::SyncError => {
 
                todo!("write");
 
                return ConnectorScheduling::Exit;
 
            },
 
            Mode::Error => {
 
                // This shouldn't really be called. Because when we reach exit
 
                // mode the scheduler should not run the component anymore
 
                unreachable!("called component run() during error-mode");
 
            },
 
        }
 
    }
 
}
 
@@ -152,6 +173,8 @@ impl Connector for ConnectorPDL {
 
impl ConnectorPDL {
 
    pub fn new(initial: Prompt) -> Self {
 
        Self{
 
            mode: Mode::NonSync,
 
            eval_error: None,
 
            tree: ExecTree::new(initial),
 
            consensus: Consensus::new(),
 
            last_finished_handled: None,
 
@@ -203,7 +226,7 @@ impl ConnectorPDL {
 

	
 
    pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) {
 
        if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) {
 
            self.collapse_sync_to_solution_branch(solution_branch_id, ctx);
 
            self.enter_non_sync_mode(solution_branch_id, ctx);
 
        }
 
    }
 

	
 
@@ -327,10 +350,18 @@ impl ConnectorPDL {
 
                // Branch is attempting to send data
 
                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                comp_ctx.submit_message(Message::Data(DataMessage {
 
                if let Err(_) = comp_ctx.submit_message(Message::Data(DataMessage {
 
                    sync_header, data_header,
 
                    content: DataContent::Message(content),
 
                }));
 
                })) {
 
                    // We don't own the port
 
                    let pd = &sched_ctx.runtime.protocol_description;
 
                    let eval_error = branch.code_state.new_error_at_expr(
 
                        &pd.modules, &pd.heap,
 
                        String::from("attempted to 'put' on port that is no longer owned")
 
                    );
 
                    self.mode = Mode::SyncError(eval_error);
 
                }
 

	
 
                branch.prepared = PreparedStatement::PerformedPut;
 
                self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
@@ -416,7 +447,12 @@ impl ConnectorPDL {
 
        }
 
    }
 

	
 
    pub fn collapse_sync_to_solution_branch(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtx) {
 
    /// Helper that moves the component's state back into non-sync mode, using
 
    /// the provided solution branch ID as the branch that should be comitted to
 
    /// memory.
 
    fn enter_non_sync_mode(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtx) {
 
        debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncError);
 

	
 
        let mut fake_vec = Vec::new();
 
        self.tree.end_sync(solution_branch_id);
 
        self.consensus.end_sync(solution_branch_id, &mut fake_vec);
 
@@ -428,6 +464,23 @@ impl ConnectorPDL {
 

	
 
        ctx.notify_sync_end(&[]);
 
        self.last_finished_handled = None;
 
        self.eval_error = None; // in case we came from the SyncError mode
 
        self.mode = Mode::NonSync;
 
    }
 

	
 
    /// Helper that moves the component's state into sync-error mode, sending
 
    /// the appropriate errors where needed. The branch that caused the fatal
 
    /// error and the evaluation error should be provided.
 
    fn enter_sync_error_mode(&mut self, failing_branch_id: BranchId, ctx: &mut ComponentCtx, eval_error: EvalError) {
 
        debug_assert!(self.mode == Mode::Sync);
 
        debug_assert!(self.eval_error.is_none());
 
        self.mode = Mode::SyncError;
 
        self.eval_error = Some(eval_error);
 

	
 
        let failing_branch = &mut self.tree[failing_branch_id];
 
        failing_branch.sync_state = SpeculativeState::Error;
 

	
 

	
 
    }
 

	
 
    /// Runs the prompt repeatedly until some kind of execution-blocking
src/runtime2/consensus.rs
Show inline comments
 
@@ -144,6 +144,25 @@ impl Consensus {
 
        self.branch_markers.push(new_branch_id);
 
    }
 

	
 
    /// Notifies the consensus algorithm that a particular branch has
 
    /// encountered an unrecoverable error. If the return value is `false`, then
 
    /// the caller can enter a "normal" exit mode instead of the special "sync"
 
    /// exit mode.
 
    pub fn notify_of_fatal_branch(&mut self, failed_branch_id: BranchId) -> bool {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // Check for trivial case, where branch has not yet communicated within
 
        // the consensus algorithm
 
        let branch = &self.branch_annotations[failed_branch_id.index as usize];
 
        if branch.port_mapping.iter().all(|v| v.registered_id.is_none()) {
 
            return false;
 
        }
 

	
 
        // Branch has communicated. Since we need to discover the entire
 

	
 
        return true;
 
    }
 

	
 
    /// Notifies the consensus algorithm that a branch has reached the end of
 
    /// the sync block. A final check for consistency will be performed that the
 
    /// caller has to handle. Note that
src/runtime2/inbox.rs
Show inline comments
 
@@ -127,6 +127,19 @@ pub(crate) enum Message {
 
    Control(ControlMessage),
 
}
 

	
 
impl Message {
 
    /// If the message is sent through a particular channel, then this function
 
    /// returns the port through which the message was sent.
 
    pub(crate) fn source_port(&self) -> Option<PortIdLocal> {
 
        // Currently only data messages have a source port
 
        if let Message::Data(message) = self {
 
            return Some(message.data_header.sending_port);
 
        } else {
 
            return None;
 
        }
 
    }
 
}
 

	
 
/// The public inbox of a connector. The thread running the connector that owns
 
/// this inbox may retrieved from it. Non-owning threads may only put new
 
/// messages inside of it.
src/runtime2/scheduler.rs
Show inline comments
 
use std::collections::VecDeque;
 
use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 
use crate::protocol::eval::EvalError;
 

	
 
use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey};
 
use super::port::{Port, PortState, PortIdLocal};
 
@@ -393,6 +394,7 @@ pub(crate) struct ComponentCtx {
 
    changed_in_sync: bool,
 
    outbox: VecDeque<Message>,
 
    state_changes: VecDeque<ComponentStateChange>,
 

	
 
    // Workspaces that may be used by components to (generally) prevent
 
    // allocations. Be a good scout and leave it empty after you've used it.
 
    // TODO: Move to scheduler ctx, this is the wrong place
 
@@ -431,6 +433,14 @@ impl ComponentCtx {
 
        self.state_changes.push_back(ComponentStateChange::CreatedPort(port))
 
    }
 

	
 
    /// Notify the runtime of an error. Note that this will not perform any
 
    /// special action beyond printing the error. The component is responsible
 
    /// for waiting until it is appropriate to shut down (i.e. being outside
 
    /// of a sync region) and returning the `Exit` scheduling code.
 
    pub(crate) fn push_error(&mut self, error: EvalError) {
 

	
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn get_ports(&self) -> &[Port] {
 
        return self.ports.as_slice();
 
@@ -462,9 +472,17 @@ impl ComponentCtx {
 

	
 
    /// Submit a message for the scheduler to send to the appropriate receiver.
 
    /// May only be called inside of a sync block.
 
    pub(crate) fn submit_message(&mut self, contents: Message) {
 
    pub(crate) fn submit_message(&mut self, contents: Message) -> Result<(), ()> {
 
        debug_assert!(self.is_in_sync);
 
        if let Some(port_id) = contents.source_port() {
 
            if self.get_port_by_id(port_id).is_none() {
 
                // We don't own the port
 
                return Err(());
 
            }
 
        }
 

	
 
        self.outbox.push_back(contents);
 
        return Ok(());
 
    }
 

	
 
    /// Notify that component just finished a sync block. Like
0 comments (0 inline, 0 general)