Changeset - f4f12a71e2e2
[Not reviewed]
0 4 1
mh - 4 years ago 2021-09-27 11:13:10
contact@maxhenger.nl
WIP on runtime with error handling and multithreading
5 files changed with 781 insertions and 11 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
new file 100644
 
use std::collections::HashMap;
 

	
 
use super::messages::{Message, Inbox};
 

	
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::eval::{ValueGroup, Value};
 

	
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct PortIdLocal {
 
    pub id: u32,
 
}
 

	
 
impl PortIdLocal {
 
    pub fn new(id: u32) -> Self {
 
        Self{ id }
 
    }
 

	
 
    // TODO: Unsure about this, maybe remove, then also remove all struct
 
    //  instances where I call this
 
    pub fn new_invalid() -> Self {
 
        Self{ id: u32::MAX }
 
    }
 
}
 

	
 
/// Represents the identifier of a branch (the index within its container). An
 
/// ID of `0` generally means "no branch" (e.g. no parent, or a port did not
 
/// yet receive anything from any branch).
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct BranchId {
 
    pub index: u32,
 
}
 

	
 
impl BranchId {
 
    fn new_invalid() -> Self {
 
        Self{ index: 0 }
 
    }
 

	
 
    fn new(index: u32) -> Self {
 
        debug_assert!(index != 0);
 
        Self{ index }
 
    }
 

	
 
    #[inline]
 
    fn is_valid(&self) -> bool {
 
        return self.index != 0;
 
    }
 
}
 

	
 
#[derive(PartialEq, Eq)]
 
pub(crate) enum SpeculativeState {
 
    // Non-synchronous variants
 
    RunningNonSync,         // regular execution of code
 
    Error,                  // encountered a runtime error
 
    Finished,               // finished executing connector's code
 
    // Synchronous variants
 
    RunningInSync,          // running within a sync block
 
    HaltedAtBranchPoint,    // at a branching point (at a `get` call)
 
    ReachedSyncEnd,         // reached end of sync block, branch represents a local solution
 
    Inconsistent,           // branch can never represent a local solution, so halted
 
}
 

	
 
pub(crate) struct Branch {
 
    index: BranchId,
 
    parent_index: BranchId,
 
    // Code execution state
 
    code_state: ComponentState,
 
    sync_state: SpeculativeState,
 
    next_branch_in_queue: Option<u32>,
 
    // Message/port state
 
    inbox: HashMap<PortIdLocal, Message>, // TODO: @temporary, remove together with fires()
 
    ports_delta: Vec<PortOwnershipDelta>,
 
}
 

	
 
impl Branch {
 
    /// Constructs a non-sync branch. It is assumed that the code is at the
 
    /// first instruction
 
    fn new_initial_branch(component_state: ComponentState) -> Self {
 
        Branch{
 
            index: BranchId::new_invalid(),
 
            parent_index: BranchId::new_invalid(),
 
            code_state: component_state,
 
            sync_state: SpeculativeState::RunningNonSync,
 
            next_branch_in_queue: None,
 
            inbox: HashMap::new(),
 
            ports_delta: Vec::new(),
 
        }
 
    }
 

	
 
    /// Constructs a sync branch. The provided branch is assumed to be the
 
    /// parent of the new branch within the execution tree.
 
    fn new_sync_branching_from(new_index: u32, parent_branch: &Branch) -> Self {
 
        debug_assert!(
 
            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) ||
 
            (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        );
 

	
 
        Branch{
 
            index: BranchId::new(new_index),
 
            parent_index: parent_branch.index,
 
            code_state: parent_branch.code_state.clone(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            next_branch_in_queue: None,
 
            inbox: parent_branch.inbox.clone(),
 
            ports_delta: parent_branch.ports_delta.clone(),
 
        }
 
    }
 
}
 

	
 
#[derive(Clone)]
 
struct PortAssignment {
 
    is_assigned: bool,
 
    last_registered_branch_id: BranchId, // invalid branch ID implies not assigned yet
 
    num_times_fired: u32,
 
}
 

	
 
impl PortAssignment {
 
    fn new_unassigned() -> Self {
 
        Self{
 
            is_assigned: false,
 
            last_registered_branch_id: BranchId::new_invalid(),
 
            num_times_fired: 0,
 
        }
 
    }
 

	
 
    #[inline]
 
    fn mark_speculative(&mut self, num_times_fired: u32) {
 
        debug_assert!(!self.last_registered_branch_id.is_valid());
 
        self.is_assigned = true;
 
        self.num_times_fired = num_times_fired;
 
    }
 

	
 
    #[inline]
 
    fn mark_definitive(&mut self, branch_id: BranchId, num_times_fired: u32) {
 
        self.is_assigned = true;
 
        self.last_registered_branch_id = branch_id;
 
        self.num_times_fired = num_times_fired;
 
    }
 
}
 

	
 
#[derive(Clone, Eq)]
 
enum PortOwnershipDelta {
 
    TakeOwnership(PortIdLocal),
 
    GiveAwayOwnership(PortIdLocal),
 
}
 

	
 
enum PortOwnershipError {
 
    UsedInInteraction(PortIdLocal),
 
    AlreadyGivenAway(PortIdLocal)
 
}
 

	
 
/// As the name implies, this contains a description of the ports associated
 
/// with a connector.
 
/// TODO: Extend documentation
 
struct ConnectorPorts {
 
    // Essentially a mapping from `port_index` to `port_id`.
 
    owned_ports: Vec<PortIdLocal>,
 
    // Contains P*B entries, where P is the number of ports and B is the number
 
    // of branches. One can find the appropriate mapping of port p at branch b
 
    // at linear index `b*P+p`.
 
    port_mapping: Vec<PortAssignment>
 
}
 

	
 
impl ConnectorPorts {
 
    /// Constructs the initial ports object. Assumes the presence of the
 
    /// non-sync branch at index 0. Will initialize all entries for the non-sync
 
    /// branch.
 
    fn new(owned_ports: Vec<PortIdLocal>) -> Self {
 
        let num_ports = owned_ports.len();
 
        let mut port_mapping = Vec::with_capacity(num_ports);
 
        for _ in 0..num_ports {
 
            port_mapping.push(PortAssignment::new_unassigned());
 
        }
 

	
 
        Self{ owned_ports, port_mapping }
 
    }
 

	
 
    /// Prepares the port mapping for a new branch. Assumes that there is no
 
    /// intermediate branch index that we have skipped.
 
    fn prepare_sync_branch(&mut self, parent_branch_idx: u32, new_branch_idx: u32) {
 
        let num_ports = self.owned_ports.len();
 
        let parent_base_idx = parent_branch_idx as usize * num_ports;
 
        let new_base_idx = new_branch_idx as usize * num_ports;
 

	
 
        debug_assert!(parent_branch_idx < new_branch_idx);
 
        debug_assert!(new_base_idx == self.port_mapping.len());
 

	
 
        self.port_mapping.reserve(num_ports);
 
        for offset in 0..num_ports {
 
            let parent_port = &self.port_mapping[parent_base_idx + offset];
 
            self.port_mapping.push(parent_port.clone());
 
        }
 
    }
 

	
 
    /// Removes a particular port from the connector. May only be done if the
 
    /// connector is in non-sync mode
 
    fn remove_port(&mut self, port_id: PortIdLocal) {
 
        debug_assert!(self.port_mapping.len() == self.owned_ports.len()); // in non-sync mode
 
        let port_index = self.get_port_index(port_id).unwrap();
 
        self.owned_ports.remove(port_index);
 
        self.port_mapping.remove(port_index);
 
    }
 

	
 
    /// Retrieves the index associated with a port id. Note that the port might
 
    /// not exist (yet) if a speculative branch has just received the port.
 
    /// TODO: But then again, one cannot use that port, right?
 
    #[inline]
 
    fn get_port_index(&self, port_id: PortIdLocal) -> Option<usize> {
 
        for (idx, port) in self.owned_ports.iter().enumerate() {
 
            if port == port_id {
 
                return Some(idx)
 
            }
 
        }
 

	
 
        return None
 
    }
 

	
 
    #[inline]
 
    fn get_port(&self, branch_idx: u32, port_idx: usize) -> &PortAssignment {
 
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
 
        return &self.port_mapping[mapped_idx];
 
    }
 

	
 
    #[inline]
 
    fn get_port_mut(&mut self, branch_idx: u32, port_idx: usize) -> &mut PortAssignment {
 
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
 
        return &mut self.port_mapping(mapped_idx);
 
    }
 

	
 
    fn num_ports(&self) -> usize {
 
        return self.owned_ports.len();
 
    }
 

	
 

	
 
    // Function for internal use: retrieve index in flattened port mapping array
 
    // based on branch/port index.
 
    #[inline]
 
    fn mapped_index(&self, branch_idx: u32, port_idx: usize) -> usize {
 
        let branch_idx = branch_idx as usize;
 
        let num_ports = self.owned_ports.len();
 

	
 
        debug_assert!(port_idx < num_ports);
 
        debug_assert!((branch_idx + 1) * num_ports <= self.port_mapping.len());
 

	
 
        return branch_idx * num_ports + port_idx;
 
    }
 
}
 

	
 
struct BranchQueue {
 
    first: u32,
 
    last: u32,
 
}
 

	
 
impl BranchQueue {
 
    fn new() -> Self {
 
        Self{ first: 0, last: 0 }
 
    }
 

	
 
    fn is_empty(&self) -> bool {
 
        debug_assert!((self.first == 0) == (self.last == 0));
 
        return self.first == 0;
 
    }
 
}
 

	
 
pub(crate) struct Connector {
 
    // State and properties of connector itself
 
    id: u32,
 
    in_sync: bool,
 
    // Branch management
 
    branches: Vec<Branch>, // first branch is always non-speculative one
 
    sync_active: BranchQueue,
 
    sync_pending_get: BranchQueue,
 
    sync_finished: BranchQueue,
 
    // Port/message management
 
    ports: ConnectorPorts,
 
    inbox: Inbox,
 
}
 

	
 
struct TempCtx {}
 
impl RunContext for TempCtx {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        todo!()
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        todo!()
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        todo!()
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        todo!()
 
    }
 
}
 

	
 
impl Connector {
 
    /// Constructs a representation of a connector. The assumption is that the
 
    /// initial branch is at the first instruction of the connector's code,
 
    /// hence is in a non-sync state.
 
    pub fn new(id: u32, initial_branch: Branch, owned_ports: Vec<PortIdLocal>) -> Self {
 
        Self{
 
            id,
 
            in_sync: false,
 
            branches: vec![initial_branch],
 
            sync_active: BranchQueue::new(),
 
            sync_pending_get: BranchQueue::new(),
 
            sync_finished: BranchQueue::new(),
 
            ports: ConnectorPorts::new(owned_ports),
 
            inbox: Inbox::new(),
 
        }
 
    }
 

	
 
    /// Runs the connector in synchronous mode. Potential changes to the global
 
    /// system's state are added to the `RunDeltaState` object by the connector,
 
    /// where it is the caller's responsibility to immediately take care of
 
    /// those changes. The return value indicates when (and if) the connector
 
    /// needs to be scheduled again.
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(self.in_sync);
 
        debug_assert!(!self.sync_active.is_empty());
 

	
 
        let branch = Self::pop_branch(&mut self.branches, &mut self.sync_active);
 

	
 
        // Run the branch to the next blocking point
 
        let mut run_context = TempCtx{};
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 

	
 
        // Match statement contains `return` statements only if the particular
 
        // run result behind handled requires an immediate re-run of the
 
        // connector.
 
        match run_result {
 
            RunResult::BranchInconsistent => {
 
                // Speculative branch became inconsistent
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            },
 
            RunResult::BranchMissingPortState(port_id) => {
 
                // Branch called `fires()` on a port that does not yet have an
 
                // assigned speculative value. So we need to create those
 
                // branches
 
                let local_port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let local_port_index = self.ports.get_port_index(local_port_id).unwrap();
 

	
 
                debug_assert!(self.ports.owned_ports.contains(&local_port_id));
 
                let silent_branch = &*branch;
 

	
 
                // Create a copied branch who will have the port set to firing
 
                let firing_index = self.branches.len() as u32;
 
                let mut firing_branch = Branch::new_sync_branching_from(firing_index, silent_branch);
 
                self.ports.prepare_sync_branch(branch.index.index, firing_index);
 

	
 
                let firing_port = self.ports.get_port_mut(firing_index, local_port_index);
 
                firing_port.mark_speculative(1);
 

	
 
                // Assign the old branch a silent value
 
                let silent_port = self.ports.get_port_mut(silent_branch.index.index, local_port_index);
 
                silent_port.mark_speculative(0);
 

	
 
                // Run both branches again
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, silent_branch.index);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, firing_branch.index);
 
                self.branches.push(firing_branch);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            RunResult::BranchMissingPortValue(port_id) => {
 
                // Branch performed a `get` on a port that has not yet received
 
                // a value in its inbox.
 
                let local_port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let local_port_index = self.ports.get_port_index(local_port_id).unwrap();
 
                let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index);
 

	
 
                // Check for port mapping assignment and, if present, if it is
 
                // consistent
 
                let is_valid_get = if port_mapping.is_assigned {
 
                    assert!(port_mapping.num_times_fired <= 1); // temporary, until we get rid of `fires`
 
                    port_mapping.num_times_fired == 1
 
                } else {
 
                    // Not yet assigned
 
                    port_mapping.mark_speculative(1);
 
                    true
 
                };
 

	
 
                if is_valid_get {
 
                    // Mark as a branching point for future messages
 
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                    Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch.index);
 

	
 
                    // But if some messages can be immediately applied, do so
 
                    // now.
 
                    let messages = self.inbox.get_messages(local_port_id, port_mapping.last_registered_branch_id);
 
                    if !messages.is_empty() {
 
                        // TODO: If message contains ports, transfer ownership of port.
 
                        for message in messages {
 
                            // For each message, for the execution and feed it
 
                            // the provided message
 
                            let new_branch_index = self.branches.len() as u32;
 
                            let mut new_branch = Branch::new_sync_branching_from(new_branch_index, branch);
 
                            self.ports.prepare_sync_branch(branch.index.index, new_branch_index);
 

	
 
                            let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index);
 
                            port_mapping.last_registered_branch_id = message.sender_cur_branch_id;
 
                            debug_assert!(port_mapping.is_assigned && port_mapping.num_times_fired == 1);
 

	
 
                            new_branch.inbox.insert(local_port_id, message.clone());
 

	
 
                            // Schedule the new branch
 
                            debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync);
 
                            Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch.index);
 
                            self.branches.push(new_branch);
 
                        }
 

	
 
                        // Because we have new branches to run, schedule
 
                        // immediately
 
                        return ConnectorScheduling::Immediate;
 
                    }
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            RunResult::BranchAtSyncEnd => {
 
                // Branch is done, go through all of the ports that are not yet
 
                // assigned and modify them to be
 
                for port_idx in 0..self.ports.num_ports() {
 
                    let port_mapping = self.ports.get_port_mut(branch.index.index, port_idx);
 
                    if !port_mapping.is_assigned {
 
                        port_mapping.mark_speculative(0);
 
                    }
 
                }
 

	
 
                branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_finished, branch.index);
 
            },
 
            RunResult::BranchPut(port_id, value_group) => {
 
                // Branch performed a `put` on a particualar port.
 
                let local_port_id = PortIdLocal{ id: port_id.0.u32_suffix };
 
                let local_port_index = self.ports.get_port_index(local_port_id).unwrap();
 

	
 
                // Check the port mapping for consistency
 
                // TODO: For now we can only put once, so that simplifies stuff
 
                let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index);
 
                let is_valid_put = if port_mapping.is_assigned {
 
                    // Already assigned, so must be speculative and one time
 
                    // firing, otherwise we are `put`ing multiple times.
 
                    if port_mapping.last_registered_branch_id.is_valid() {
 
                        // Already did a `put`
 
                        todo!("handle error through RunDeltaState");
 
                    } else {
 
                        // Valid if speculatively firing
 
                        port_mapping.num_times_fired == 1
 
                    }
 
                } else {
 
                    // Not yet assigned, do so now
 
                    true
 
                };
 

	
 
                if is_valid_put {
 
                    // Put in run results for thread to pick up and transfer to
 
                    // the correct connector inbox.
 
                    port_mapping.mark_definitive(branch.index, 1);
 
                    let message = Message{
 
                        sending_port: local_port_id,
 
                        receiving_port: PortIdLocal::new_invalid(),
 
                        sender_prev_branch_id: BranchId::new_invalid(),
 
                        sender_cur_branch_id: branch.index,
 
                        message: value_group,
 
                    };
 

	
 
                    results.outbox.push(message);
 
                    return ConnectorScheduling::Immediate
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in sync mode", run_result),
 
        }
 

	
 
        // Not immediately scheduling, so schedule again if there are more
 
        // branches to run
 
        if self.sync_active.is_empty() {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    /// Runs the connector in non-synchronous mode.
 
    fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(!self.in_sync);
 
        debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
 
        debug_assert!(self.branches.len() == 1);
 

	
 
        let branch = &mut self.branches[0];
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = TempCtx{};
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => {
 
                // Need to wait until all children are terminated
 
                // TODO: Think about how to do this?
 
                branch.sync_state = SpeculativeState::Finished;
 
                return ConnectorScheduling::NotNow;
 
            },
 
            RunResult::ComponentAtSyncStart => {
 
                // Prepare for sync execution and reschedule immediately
 
                self.in_sync = true;
 
                let first_sync_branch = Branch::new_sync_branching_from(1, branch);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch.index);
 
                self.branches.push(first_sync_branch);
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Construction of a new component. Find all references to ports
 
                // inside of the arguments
 
                let first_port_idx = results.ports.len();
 
                find_ports_in_value_group(&arguments, &mut results.ports);
 

	
 
                for port
 
            }
 
        }
 

	
 
        ConnectorScheduling::NotNow // TODO: @Temp
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Internal helpers
 
    // -------------------------------------------------------------------------
 

	
 
    // Helpers for management of the branches and their internally stored
 
    // `next_branch_in_queue` and the `BranchQueue` objects. Essentially forming
 
    // linked lists inside of the vector of branches.
 

	
 
    #[inline]
 
    fn pop_branch(branches: &mut Vec<Branch>, queue: &mut BranchQueue) -> &mut Branch {
 
        debug_assert!(queue.first != 0);
 
        let branch = &mut branches[queue.first as usize];
 
        *queue.first = branch.next_branch_in_queue.unwrap_or(0);
 
        branch.next_branch_in_queue = None;
 

	
 
        if *queue.first == 0 {
 
            // No more entries in queue
 
            debug_assert_eq!(*queue.last, branch.index.index);
 
            *queue.last = 0;
 
        }
 

	
 
        return branch;
 
    }
 

	
 
    #[inline]
 
    fn push_branch_into_queue(branches: &mut Vec<Branch>, queue: &mut BranchQueue, to_push: BranchId) {
 
        debug_assert!(to_push.is_valid());
 
        let to_push = to_push.index;
 

	
 
        if *queue.last == 0 {
 
            // No branches in the queue at all
 
            debug_assert_eq!(*queue.first, 0);
 
            branches[to_push as usize].next_branch_in_queue = None;
 
            *queue.first = to_push;
 
            *queue.last = to_push;
 
        } else {
 
            // Pre-existing branch in the queue
 
            debug_assert_ne!(*queue.first, 0);
 
            branches[*queue.last as usize].next_branch_in_queue = Some(to_push);
 
            *queue.last = to_push;
 
        }
 
    }
 

	
 
    // Helpers for local port management. Specifically for adopting/losing
 
    // ownership over ports
 

	
 
    /// Marks the ports as being "given away" (e.g. by sending a message over a
 
    /// channel, or by constructing a connector). Will return an error if the
 
    /// connector doesn't own the port in the first place.
 
    fn give_away_ports(ports: &mut ConnectorPorts, in_sync: bool, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
 
        debug_assert!(in_sync == !branch.index.is_valid());
 

	
 
        for port_id in port_ids {
 
            match ports.get_port_index(*port_id) {
 
                Some(port_index) => {
 
                    // We (used to) own the port
 
                    let port_mapping = ports.get_port(branch.index.index, port_index);
 
                    if port_mapping.is_assigned {
 
                        // Port is used in some kind of interaction. Cannot both
 
                        // give away the port and use it in an interaction
 
                        return Err(PortOwnershipError::UsedInInteraction(*port_id))
 
                    }
 

	
 
                    // Make sure it is not already given away
 
                    for delta in &branch.ports_delta {
 
                        match delta {
 
                            PortOwnershipDelta::TakeOwnership(_) => unreachable!(), // because we had a port mapping
 
                            PortOwnershipDelta::GiveAwayOwnership(given_away_port_id) => {
 
                                if port_id == given_away_port_id {
 
                                    return Err(PortOwnershipError::AlreadyGivenAway(*port_id));
 
                                }
 
                            }
 
                        }
 
                    }
 

	
 
                    // We're fine, the port will be given away. Note that if we
 
                    // are not in sync mode, then we can simply remove the
 
                    // ownership immediately.
 
                    if in_sync {
 
                        branch.ports_delta.push(PortOwnershipDelta::GiveAwayOwnership(*port_id));
 
                    } else {
 

	
 
                    }
 
                },
 
                None => {
 
                    // We did not yet own the port, so we must have received it
 
                    // this round, and we're going to give it away again.
 
                    debug_assert!(branch.ports_delta.contains(&PortOwnershipDelta::TakeOwnership(*port_id)));
 
                    let delta_to_find = PortOwnershipDelta::TakeOwnership(*port_id);
 
                    for delta_idx in 0..branch.ports_delta.len() {
 
                        if branch.ports_delta[delta_idx] == delta_to_find {
 
                            branch.ports_delta.remove(delta_idx);
 
                            break;
 
                        }
 
                    }
 

	
 
                    // Note for programmers: the fact that the message that
 
                    // contains this port will end up at another connector will
 
                    // take care of its new ownership.
 
                }
 
            }
 
        }
 

	
 
        return Ok(());
 
    }
 

	
 
    /// Adopt ownership of the ports
 
}
 

	
 
/// A data structure passed to a connector whose code is being executed that is
 
/// used to queue up various state changes that have to be applied after
 
/// running, e.g. the messages the have to be transferred to other connectors.
 
// TODO: Come up with a better name
 
struct RunDeltaState {
 
    outbox: Vec<Message>,
 
    ports: Vec<PortIdLocal>,
 
}
 

	
 
enum ConnectorScheduling {
 
    Immediate,      // Run again, immediately
 
    Later,          // Schedule for running, at some later point in time
 
    NotNow,         // Do not reschedule for running
 
}
 

	
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortIdLocal>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortIdLocal::new(port_id.0.u32_suffix);
 
                for prev_port in ports.iter() {
 
                    if prev_port == cur_port {
 
                        // Already added
 
                        return;
 
                    }
 
                }
 

	
 
                ports.push(cur_port);
 
            },
 
            Value::Array(heap_pos) |
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for embedded_value in heap_region {
 
                    find_port_in_value(group, embedded_value, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 

	
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for value in &value_group.values {
 
        find_port_in_value(value_group, value, ports);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/messages.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::collections::hash_map::Entry;
 
use std::cmp::Ordering;
 

	
 
use super::connector::{PortIdLocal, BranchId};
 
use crate::PortId;
 
use crate::common::Id;
 
use crate::protocol::*;
 
@@ -17,11 +19,81 @@ pub struct BufferedMessage {
 
    pub(crate) message: ValueGroup,
 
}
 

	
 
/// An action performed on a port. Unsure about this
 
#[derive(PartialEq, Eq, Hash)]
 
struct PortAction {
 
    port_id: u32,
 
    prev_branch_id: Option<u32>,
 
#[derive(Clone)]
 
pub struct Message {
 
    pub sending_port: PortIdLocal,
 
    pub receiving_port: PortIdLocal,
 
    pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
 
    pub sender_cur_branch_id: BranchId, // always valid
 
    pub message: ValueGroup,
 
}
 

	
 
pub struct Inbox {
 
    messages: Vec<Message>
 
}
 

	
 
impl Inbox {
 
    pub fn new() -> Self {
 
        Self{ messages: Vec::new() }
 
    }
 

	
 
    /// Will insert the message into the inbox. Only exception is when the tuple
 
    /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
 
    /// nothing is inserted..
 
    pub fn insert_message(&mut self, message: Message) {
 
        match self.messages.binary_search_by(|a| Self::compare_messages(a, &message)) {
 
            Ok(_) => {} // message already exists
 
            Err(idx) => self.messages.insert(idx, message)
 
        }
 
    }
 

	
 
    /// Retrieves all messages for the provided conditions
 
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] {
 
        // Seek the first message with the appropriate port ID and branch ID
 
        let num_messages = self.messages.len();
 

	
 
        for first_idx in 0..num_messages {
 
            let msg = &self.messages[first_idx];
 
            if msg.receiving_port == port_id && msg.sender_prev_branch_id == prev_branch_id {
 
                // Found a match, seek ahead until the condition is no longer true
 
                let mut last_idx = first_idx + 1;
 
                while last_idx < num_messages {
 
                    let msg = &self.messages[last_idx];
 
                    if msg.receiving_port != port_id || msg.sender_prev_branch_id != prev_branch_id {
 
                        // No longer matching
 
                        break;
 
                    }
 
                    last_idx += 1;
 
                }
 

	
 
                // Return all the matching messages
 
                return &self.messages[first_idx..last_idx];
 
            } else if msg.receiving_port.id > port_id.id {
 
                // Because messages are ordered, this implies we couldn't find
 
                // any message
 
                break;
 
            }
 
        }
 

	
 
        return &self.messages[0..0];
 
    }
 

	
 
    /// Simply empties the inbox
 
    pub fn clear(&mut self) {
 
        self.messages.clear();
 
    }
 

	
 
    // Ordering by, consecutively, a) receiving port, b) prev branch id, c) cur
 
    // branch id.
 
    fn compare_messages(a: &Message, b: &Message) -> Ordering {
 
        let mut ord = a.receiving_port.id.cmp(&b.receiving_port.id);
 
        if ord != Ordering::Equal { return ord; }
 

	
 
        ord = a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index);
 
        if ord != Ordering::Equal { return ord; }
 

	
 
        return a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index);
 
    }
 
}
 

	
 
/// A connector's global inbox. Any received message ends up here. This is
 
@@ -33,6 +105,15 @@ pub struct ConnectorInbox {
 
    messages: HashMap<PortAction, Vec<BufferedMessage>>
 
}
 

	
 

	
 
/// An action performed on a port. Unsure about this
 
#[derive(PartialEq, Eq, Hash)]
 
struct PortAction {
 
    port_id: u32,
 
    prev_branch_id: Option<u32>,
 
}
 

	
 
// TODO: @remove
 
impl ConnectorInbox {
 
    pub fn new() -> Self {
 
        Self {
src/runtime2/mod.rs
Show inline comments
 
mod runtime;
 
mod messages;
 
mod connector;
 

	
 
#[cfg(test)] mod tests;
src/runtime2/runtime.rs
Show inline comments
 
@@ -359,8 +359,6 @@ impl Runtime {
 
                debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix));
 
                let mut insert_in_pending_receive = false;
 

	
 
                println!("DEBUG: Connector {} performing get on {:#?}", connector_id, port_id);
 

	
 
                match branch.port_mapping.entry(port_id) {
 
                    Entry::Vacant(entry) => {
 
                        // No entry yet, so force to firing
 
@@ -387,8 +385,6 @@ impl Runtime {
 
                    }
 
                }
 

	
 
                println!("DEBUG: insert = {}, port mapping is now {:#?}", insert_in_pending_receive, &branch.port_mapping);
 

	
 
                if insert_in_pending_receive {
 
                    // Perform the insert
 
                    match desc.spec_branches_pending_receive.entry(port_id) {
 
@@ -447,7 +443,6 @@ impl Runtime {
 
                // Branch just performed a `put()`. Check if we have
 
                // assigned the port value and if so, if it is
 
                // consistent.
 
                println!("DEBUG: Connector {} performing put on {:#?}", connector_id, port_id);
 
                let mut can_put = true;
 
                branch.branch_context.just_called_did_put = true;
 
                match branch.port_mapping.entry(port_id) {
 
@@ -477,7 +472,6 @@ impl Runtime {
 
                        }
 
                    }
 
                }
 
                println!("DEBUG: can_put = {}, port mapping is now {:#?}", can_put, &branch.port_mapping);
 

	
 
                if can_put {
 
                    // Actually put the message in the outbox
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -21,6 +21,7 @@ fn test_single_message() {
 
            assert!(false);
 
        }
 
    }
 

	
 
    fn run_putter_getter(code: &[u8]) {
 
        // Compile code
 
        let pd = ProtocolDescription::parse(code)
0 comments (0 inline, 0 general)