Changeset - 233468b578e8
[Not reviewed]
0 4 0
MH - 4 years ago 2021-11-19 21:03:06
contact@maxhenger.nl
WIP on handling messages intended for future sync block of failing component
4 files changed with 84 insertions and 24 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -66,13 +66,12 @@ enum Mode {
 
#[derive(Debug)]
 
pub(crate) enum ConnectorScheduling {
 
    Immediate,          // Run again, immediately
 
    Later,              // Schedule for running, at some later point in time
 
    NotNow,             // Do not reschedule for running
 
    Exit,               // Connector has exited
 
    Error(EvalError),   // Connector has experienced a fatal error
 
}
 

	
 
pub(crate) struct ConnectorPDL {
 
    mode: Mode,
 
    eval_error: Option<EvalError>,
 
    tree: ExecTree,
 
@@ -264,13 +263,22 @@ impl ConnectorPDL {
 
            consensus: &self.consensus,
 
            prepared: branch.prepared.take(),
 
        };
 

	
 
        let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context);
 
        if let Err(eval_error) = run_result {
 
            return ConnectorScheduling::Error(eval_error);
 
            self.eval_error = Some(eval_error);
 
            self.mode = Mode::SyncError;
 
            if let Some(conclusion) = self.consensus.notify_of_fatal_branch(branch_id, comp_ctx) {
 
                // We can exit immediately
 
                return self.enter_non_sync_mode(conclusion, comp_ctx);
 
            } else {
 
                // Current branch failed. But we may have other things that are
 
                // running.
 
                return ConnectorScheduling::Immediate;
 
            }
 
        }
 
        let run_result = run_result.unwrap();
 

	
 
        // Handle the returned result. Note that this match statement contains
 
        // explicit returns in case the run result requires that the component's
 
        // code is ran again immediately
 
@@ -405,13 +413,14 @@ impl ConnectorPDL {
 
            branch_id: branch.id,
 
            consensus: &self.consensus,
 
            prepared: branch.prepared.take(),
 
        };
 
        let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context);
 
        if let Err(eval_error) = run_result {
 
            return ConnectorScheduling::Error(eval_error);
 
            comp_ctx.push_error(eval_error);
 
            return ConnectorScheduling::Exit
 
        }
 
        let run_result = run_result.unwrap();
 

	
 
        match run_result {
 
            EvalContinuation::ComponentTerminated => {
 
                branch.sync_state = SpeculativeState::Finished;
 
@@ -470,20 +479,13 @@ impl ConnectorPDL {
 
    fn enter_non_sync_mode(&mut self, conclusion: RoundConclusion, ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncError);
 

	
 
        // Depending on local state decide what to do
 
        let final_branch_id = match conclusion {
 
            RoundConclusion::Success(branch_id) => Some(branch_id),
 
            RoundConclusion::Failure => if self.mode == Mode::SyncError {
 
                // We experienced an error, so exit now
 
                None
 
            } else {
 
                // We didn't experience an error, so retry
 
                // TODO: Decide what to do with sync errors
 
                Some(BranchId::new_invalid())
 
            }
 
            RoundConclusion::Failure => None,
 
        };
 

	
 
        if let Some(solution_branch_id) = final_branch_id {
 
            let mut fake_vec = Vec::new();
 
            self.tree.end_sync(solution_branch_id);
 
            self.consensus.end_sync(solution_branch_id, &mut fake_vec);
 
@@ -496,12 +498,15 @@ impl ConnectorPDL {
 

	
 
            return ConnectorScheduling::Immediate;
 
        } else {
 
            // No final branch, because we're supposed to exit!
 
            self.last_finished_handled = None;
 
            self.mode = Mode::Error;
 
            if let Some(eval_error) = self.eval_error.take() {
 
                ctx.push_error(eval_error);
 
            }
 
            return ConnectorScheduling::Exit;
 
        }
 
    }
 

	
 
    /// Runs the prompt repeatedly until some kind of execution-blocking
 
    /// condition appears.
src/runtime2/scheduler.rs
Show inline comments
 
@@ -56,12 +56,13 @@ impl Scheduler {
 
                // Run the main behaviour of the connector, depending on its
 
                // current state.
 
                if scheduled.shutting_down {
 
                    // Nothing to do. But we're stil waiting for all our pending
 
                    // control messages to be answered.
 
                    self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
 
                    self.handle_inbox_while_shutting_down(scheduled);
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // We're actually done, we can safely destroy the
 
                        // currently running connector
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    } else {
 
@@ -69,13 +70,13 @@ impl Scheduler {
 
                    }
 
                } else {
 
                    self.debug_conn(connector_id, "Running ...");
 
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 
                    let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx);
 
                    self.debug_conn(connector_id, &format!("Finished running (new scheduling is {:?})", new_schedule));
 
                    
 

	
 
                    // Handle all of the output from the current run: messages to
 
                    // send and connectors to instantiate.
 
                    self.handle_changes_in_context(scheduled);
 

	
 
                    cur_schedule = new_schedule;
 
                }
 
@@ -109,23 +110,19 @@ impl Scheduler {
 
                            self.debug_conn(connector_id, &format!("Sending message [ exit ] \n --- {:?}", message));
 
                            self.runtime.send_message(port.peer_connector, Message::Control(message));
 
                        }
 
                    }
 

	
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // All ports (if any) already closed
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    }
 

	
 
                    self.try_go_to_sleep(connector_key, scheduled);
 
                },
 
                ConnectorScheduling::Error(eval_error) => {
 
                    // Display error. Then exit
 
                    println!("Oh oh!\n{}", eval_error);
 
                    panic!("Abort!");
 
                }
 
            }
 
        }
 
    }
 

	
 
    /// Receiving messages from the public inbox and handling them or storing
 
    /// them in the component's private inbox
 
@@ -195,12 +192,27 @@ impl Scheduler {
 
                    scheduled.ctx.inbox_messages.push(message);
 
                }
 
            }
 
        }
 
    }
 

	
 
    /// Handles inbox messages while shutting down. This intends to handle the
 
    /// case where a component cleanly exited outside of a sync region, but a
 
    /// peer, before receiving the `CloseChannel` message, sent a message inside
 
    /// a sync region. This peer should be notified that its message is not
 
    /// received by a component in a sync region.
 
    fn handle_inbox_while_shutting_down(&mut self, scheduled: &mut ScheduledConnector) {
 
        // Note: we're not handling the public inbox, we're dealing with the
 
        // private one!
 
        while let Some(message) = scheduled.ctx.read_next_message() {
 
            if let Some(target_port) = Self::get_message_target_port(&message) {
 
                todo!("handle this, send back 'my thing is closed yo'")
 
            }
 
        }
 
    }
 

	
 
    /// Handles changes to the context that were made by the component. This is
 
    /// the way (due to Rust's borrowing rules) that we bubble up changes in the
 
    /// component's state that the scheduler needs to know about (e.g. a message
 
    /// that the component wants to send, a port that has been added).
 
    fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx.id;
 
@@ -450,13 +462,13 @@ impl ComponentCtx {
 

	
 
    /// Notify the runtime of an error. Note that this will not perform any
 
    /// special action beyond printing the error. The component is responsible
 
    /// for waiting until it is appropriate to shut down (i.e. being outside
 
    /// of a sync region) and returning the `Exit` scheduling code.
 
    pub(crate) fn push_error(&mut self, error: EvalError) {
 

	
 
        println!("ERROR: Component ({}) encountered a critical error:\n{}", self.id.0, error);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn get_ports(&self) -> &[Port] {
 
        return self.ports.as_slice();
 
    }
src/runtime2/tests/basics.rs
Show inline comments
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_doing_nothing() {
 
    // If this thing does not get into an infinite loop, (hence: the runtime
 
    // exits), then the test works
 
    const CODE: &'static str ="
 
    primitive silent_willy(u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            sync { index += 1; }
 
        }
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("doing_nothing");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "silent_willy", ValueGroup::new_stack(vec![
 
            Value::UInt32(NUM_LOOPS),
 
        ])).expect("create component");
 
    });
 
}
 

	
 
#[test]
 
fn test_local_sync_failure() {
 
    // If the component exits cleanly, then the runtime exits cleanly, and the
 
    // test will finish
 
    const CODE: &'static str = "
 
    primitive immediate_failure() {
 
        u32[] only_allows_index_0 = { 1 };
 
        while (true) sync { // note the infinite loop
 
            auto value = only_allows_index_0[1];
 
        }
 
    }
 
    ";
 

	
 
    let thing = TestTimer::new("immediate_local_failure");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "immediate_failure", ValueGroup::new_stack(Vec::new()))
 
            .expect("create component");
 
    })
 
}
 

	
 

	
 

	
 
#[test]
 
fn test_single_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive putter(out<bool> sender, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -7,19 +7,19 @@ use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 
use crate::runtime2::native::{ApplicationSyncAction};
 

	
 
// Generic testing constants, use when appropriate to simplify stress-testing
 
pub(crate) const NUM_THREADS: u32 = 3;     // number of threads in runtime
 
pub(crate) const NUM_INSTANCES: u32 = 7;   // number of test instances constructed
 
pub(crate) const NUM_LOOPS: u32 = 8;       // number of loops within a single test (not used by all tests)
 
// pub(crate) const NUM_THREADS: u32 = 3;     // number of threads in runtime
 
// pub(crate) const NUM_INSTANCES: u32 = 7;   // number of test instances constructed
 
// pub(crate) const NUM_LOOPS: u32 = 8;       // number of loops within a single test (not used by all tests)
 

	
 
// pub(crate) const NUM_THREADS: u32 = 1;
 
// pub(crate) const NUM_INSTANCES: u32 = 1;
 
// pub(crate) const NUM_LOOPS: u32 = 1;
 
pub(crate) const NUM_THREADS: u32 = 1;
 
pub(crate) const NUM_INSTANCES: u32 = 1;
 
pub(crate) const NUM_LOOPS: u32 = 1;
 

	
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
0 comments (0 inline, 0 general)