Changeset - f7da6a72fce7
[Not reviewed]
0 1 0
MH - 4 years ago 2021-09-21 22:27:51
contact@maxhenger.nl
move branch-related context into connector store
1 file changed with 29 insertions and 18 deletions:
0 comments (0 inline, 0 general)
src/runtime2/runtime.rs
Show inline comments
 
@@ -61,60 +61,79 @@ enum BranchState {
 
    RunningSync, // regular running speculative branch
 
    BranchPoint, // branch which ended up being a branching point
 
    ReachedEndSync, // branch that successfully reached the end-sync point, is a possible local solution
 
    Failed, // branch that became inconsistent
 
}
 

	
 
#[derive(Clone)]
 
struct BranchPortDesc {
 
    last_registered_index: Option<u32>, // if putter, then last sent branch ID, if getter, then last received branch ID
 
    num_times_fired: u32, // number of puts/gets on this port
 
}
 

	
 
struct BranchContext {
 
    just_called_did_put: bool,
 
    pending_channel: Option<(Value, Value)>,
 
}
 

	
 
struct BranchDesc {
 
    index: u32,
 
    parent_index: Option<u32>,
 
    code_state: ComponentState,
 
    branch_state: BranchState,
 
    owned_ports: Vec<u32>,
 
    message_inbox: HashMap<(PortId, u32), ValueGroup>, // from (port id, 1-based recv index) to received value
 
    port_mapping: HashMap<PortId, BranchPortDesc>,
 
    branch_context: BranchContext,
 
}
 

	
 
impl BranchDesc {
 
    /// Creates the first non-sync branch of a connector
 
    fn new_non_sync(component_state: ComponentState, owned_ports: Vec<u32>) -> Self {
 
        Self{
 
            index: 0,
 
            parent_index: None,
 
            code_state: component_state,
 
            branch_state: BranchState::RunningNonSync,
 
            owned_ports,
 
            message_inbox: HashMap::new(),
 
            port_mapping: HashMap::new(),
 
            branch_context: BranchContext{
 
                just_called_did_put: false,
 
                pending_channel: None,
 
            }
 
        }
 
    }
 

	
 
    /// Creates a sync branch based on the supplied branch. This supplied branch
 
    /// is the branching point for the new one, i.e. the parent in the branching
 
    /// tree.
 
    fn new_sync_from(index: u32, branch_state: &BranchDesc) -> Self {
 
        // We expect that the given branche's context is not halfway handling a
 
        // `put(...)` or a `channel x -> y` statement.
 
        debug_assert!(!branch_state.branch_context.just_called_did_put);
 
        debug_assert!(branch_state.branch_context.pending_channel.is_none());
 

	
 
        Self{
 
            index,
 
            parent_index: Some(branch_state.index),
 
            code_state: branch_state.code_state.clone(),
 
            branch_state: BranchState::RunningSync,
 
            owned_ports: branch_state.owned_ports.clone(),
 
            message_inbox: branch_state.message_inbox.clone(),
 
            port_mapping: branch_state.port_mapping.clone(),
 
            branch_context: BranchContext{
 
                just_called_did_put: false,
 
                pending_channel: None,
 
            }
 
        }
 
    }
 
}
 

	
 
#[derive(PartialEq, Eq)]
 
enum Scheduling {
 
    Immediate,
 
    Later,
 
    NotNow,
 
}
 

	
 
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
 
@@ -271,28 +290,25 @@ impl Runtime {
 
        let desc = self.connectors.get_mut(&connector_id).unwrap();
 
        debug_assert!(desc.in_sync);
 
        debug_assert!(!desc.spec_branches_active.is_empty());
 

	
 
        let branch_index = desc.spec_branches_active.pop_front().unwrap();
 
        let branch = &mut desc.branches[branch_index as usize];
 

	
 
        // Run this particular branch to a next blocking point
 
        // TODO: PERSISTENT RUN CTX
 
        let mut run_context = Context{
 
            inbox: &branch.message_inbox,
 
            port_mapping: &branch.port_mapping,
 
            connector_id,
 
            branch_id: Some(branch_index),
 
            just_called_did_put: false,
 
            pending_channel: None,
 
            branch_ctx: &mut branch.branch_context,
 
        };
 

	
 
        let run_result = branch.code_state.run(&mut run_context, &self.protocol);
 

	
 
        match run_result {
 
            RunResult::BranchInconsistent => {
 
                // Speculative branch became inconsistent. So we don't
 
                // run it again
 
                branch.branch_state = BranchState::Failed;
 
            },
 
            RunResult::BranchMissingPortState(port_id) => {
 
                // Branch called `fires()` on a port that did not have a
 
@@ -406,24 +422,25 @@ impl Runtime {
 
                // Mark the branch as being done
 
                branch.branch_state = BranchState::ReachedEndSync;
 
                desc.spec_branches_done.push(branch_index);
 
            },
 
            RunResult::BranchPut(port_id, value_group) => {
 
                debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                debug_assert_eq!(value_group.values.len(), 1); // can only send one value
 

	
 
                // Branch just performed a `put()`. Check if we have
 
                // assigned the port value and if so, if it is
 
                // consistent.
 
                let mut can_put = true;
 
                branch.branch_context.just_called_did_put = true;
 
                match branch.port_mapping.entry(port_id) {
 
                    Entry::Vacant(entry) => {
 
                        // No entry yet
 
                        entry.insert(BranchPortDesc{
 
                            last_registered_index: Some(branch.index),
 
                            num_times_fired: 1,
 
                        });
 
                    },
 
                    Entry::Occupied(mut entry) => {
 
                        // Pre-existing entry
 
                        let entry = entry.get_mut();
 
                        if entry.num_times_fired == 0 {
 
@@ -491,28 +508,25 @@ impl Runtime {
 
        // since we assume we're not running in sync-mode).
 
        // TODO: CONTINUE HERE, PERSEISTENT BRANCH CONTEXT
 
        let desc = self.connectors.get_mut(&connector_id).unwrap();
 
        debug_assert!(!desc.in_sync);
 
        debug_assert_eq!(desc.branches.len(), 1);
 

	
 
        let branch = &mut desc.branches[0];
 

	
 
        // Run this branch to its blocking point
 
        let mut run_context = Context{
 
            inbox: &branch.message_inbox,
 
            port_mapping: &branch.port_mapping,
 
            connector_id,
 
            branch_id: None,
 
            just_called_did_put: false,
 
            pending_channel: None,
 
            branch_ctx: &mut branch.branch_context,
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &self.protocol);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => return Scheduling::NotNow,
 
            RunResult::ComponentAtSyncStart => {
 
                // Prepare for sync execution
 
                Self::prepare_branch_for_sync(desc);
 
                return Scheduling::Immediate;
 
            },
 
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Find all references to ports in the provided arguments, the
 
@@ -533,27 +547,27 @@ impl Runtime {
 
                    port.owning_connector_id = Some(new_component_id)
 
                }
 

	
 
                // Finally push the new connector into the registry
 
                let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect();
 
                self.connectors.insert(new_component_id, ConnectorDesc::new(new_component_id, new_component_state, ports));
 
                self.connectors_active.push_back(new_component_id);
 

	
 
                return Scheduling::Immediate;
 
            },
 
            RunResult::NewChannel => {
 
                // Prepare channel
 
                debug_assert!(run_context.pending_channel.is_none());
 
                debug_assert!(run_context.branch_ctx.pending_channel.is_none());
 
                let (put_id, get_id) = Self::add_owned_channel(&mut self.ports, &mut self.port_counter, Some(connector_id));
 
                run_context.pending_channel = Some((
 
                run_context.branch_ctx.pending_channel = Some((
 
                    port_value_from_id(Some(connector_id), put_id, true),
 
                    port_value_from_id(Some(connector_id), get_id, false)
 
                ));
 

	
 
                return Scheduling::Immediate;
 
            },
 
            _ => unreachable!("got result '{:?}' from running component in non-sync mode", run_result),
 
        }
 
    }
 

	
 
    /// Puts all the messages that are currently in the outbox of a particular
 
    /// connector into the inbox of the receivers. If possible then branches
 
@@ -1019,56 +1033,53 @@ impl Runtime {
 
    }
 
}
 

	
 
/// Context accessible by the code while being executed by the runtime. When the
 
/// code is being executed by the runtime it sometimes needs to interact with 
 
/// the runtime. This is achieved by the "code throwing an error code", after 
 
/// which the runtime modifies the appropriate variables and continues executing
 
/// the code again. 
 
struct Context<'a> {
 
    // Temporary references to branch related storage
 
    inbox: &'a HashMap<(PortId, u32), ValueGroup>,
 
    port_mapping: &'a HashMap<PortId, BranchPortDesc>,
 
    // Properties of currently running connector/branch
 
    connector_id: u32,
 
    branch_id: Option<u32>,
 
    just_called_did_put: bool,
 
    // Resources ready to be retrieved by running code
 
    pending_channel: Option<(Value, Value)>, // (put, get) ports
 
    branch_ctx: &'a mut BranchContext,
 
}
 

	
 
impl<'a> crate::protocol::RunContext for Context<'a> {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        // Note that we want "did put" to return false if we have fired zero
 
        // times, because this implies we did a prevous
 
        return self.just_called_did_put
 
        let old_value = self.branch_ctx.just_called_did_put;
 
        self.branch_ctx.just_called_did_put = false;
 
        return old_value;
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        let inbox_key = (port, 1);
 
        match self.inbox.get(&inbox_key) {
 
            None => None,
 
            Some(value) => Some(value.clone()),
 
        }
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        match self.port_mapping.get(&port) {
 
            None => None,
 
            Some(port_info) => Some(Value::Bool(port_info.num_times_fired != 0)),
 
        }
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        self.pending_channel.take()
 
        self.branch_ctx.pending_channel.take()
 
    }
 
}
 

	
 
/// Recursively goes through the value group, attempting to find ports. 
 
/// Duplicates will only be added once.
 
fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortId>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                for prev_port in ports.iter() {
0 comments (0 inline, 0 general)