Changeset - f450ae18ef58
[Not reviewed]
0 2 0
MH - 4 years ago 2021-10-01 16:54:34
contact@maxhenger.nl
merge with rewrite of connector/scheduler
2 files changed with 202 insertions and 59 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
use std::collections::HashMap;
 

	
 
use super::messages::{Message, Inbox};
 

	
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::eval::{ValueGroup, Value};
 
use crate::protocol::eval::{ValueGroup, Value, Prompt};
 

	
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct PortIdLocal {
 
    pub id: u32,
 
}
 

	
 
impl PortIdLocal {
 
    pub fn new(id: u32) -> Self {
 
        Self{ id }
 
    }
 

	
 
    // TODO: Unsure about this, maybe remove, then also remove all struct
 
    //  instances where I call this
 
    pub fn new_invalid() -> Self {
 
        Self{ id: u32::MAX }
 
    }
 
}
 

	
 
/// Represents the identifier of a branch (the index within its container). An
 
/// ID of `0` generally means "no branch" (e.g. no parent, or a port did not
 
/// yet receive anything from any branch).
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct BranchId {
 
    pub index: u32,
 
}
 

	
 
impl BranchId {
 
    fn new_invalid() -> Self {
 
        Self{ index: 0 }
 
    }
 

	
 
    fn new(index: u32) -> Self {
 
        debug_assert!(index != 0);
 
        Self{ index }
 
    }
 

	
 
    #[inline]
 
    fn is_valid(&self) -> bool {
 
        return self.index != 0;
 
    }
 
}
 

	
 
#[derive(PartialEq, Eq)]
 
pub(crate) enum SpeculativeState {
 
    // Non-synchronous variants
 
    RunningNonSync,         // regular execution of code
 
    Error,                  // encountered a runtime error
 
    Finished,               // finished executing connector's code
 
@@ -94,99 +94,99 @@ impl Branch {
 
            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) ||
 
            (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        );
 

	
 
        Branch{
 
            index: BranchId::new(new_index),
 
            parent_index: parent_branch.index,
 
            code_state: parent_branch.code_state.clone(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            next_branch_in_queue: None,
 
            inbox: parent_branch.inbox.clone(),
 
            ports_delta: parent_branch.ports_delta.clone(),
 
        }
 
    }
 
}
 

	
 
#[derive(Clone)]
 
struct PortAssignment {
 
    is_assigned: bool,
 
    last_registered_branch_id: BranchId, // invalid branch ID implies not assigned yet
 
    num_times_fired: u32,
 
}
 

	
 
impl PortAssignment {
 
    fn new_unassigned() -> Self {
 
        Self{
 
            is_assigned: false,
 
            last_registered_branch_id: BranchId::new_invalid(),
 
            num_times_fired: 0,
 
        }
 
    }
 

	
 
    #[inline]
 
    fn mark_speculative(&mut self, num_times_fired: u32) {
 
        debug_assert!(!self.last_registered_branch_id.is_valid());
 
        self.is_assigned = true;
 
        self.num_times_fired = num_times_fired;
 
    }
 

	
 
    #[inline]
 
    fn mark_definitive(&mut self, branch_id: BranchId, num_times_fired: u32) {
 
        self.is_assigned = true;
 
        self.last_registered_branch_id = branch_id;
 
        self.num_times_fired = num_times_fired;
 
    }
 
}
 

	
 
#[derive(Clone, Eq)]
 
enum PortOwnershipDelta {
 
    TakeOwnership(PortIdLocal),
 
    GiveAwayOwnership(PortIdLocal),
 
struct PortOwnershipDelta {
 
    acquired: bool, // if false, then released ownership
 
    port_id: PortIdLocal,
 
}
 

	
 
enum PortOwnershipError {
 
    UsedInInteraction(PortIdLocal),
 
    AlreadyGivenAway(PortIdLocal)
 
}
 

	
 
/// As the name implies, this contains a description of the ports associated
 
/// with a connector.
 
/// TODO: Extend documentation
 
struct ConnectorPorts {
 
    // Essentially a mapping from `port_index` to `port_id`.
 
    owned_ports: Vec<PortIdLocal>,
 
    // Contains P*B entries, where P is the number of ports and B is the number
 
    // of branches. One can find the appropriate mapping of port p at branch b
 
    // at linear index `b*P+p`.
 
    port_mapping: Vec<PortAssignment>
 
}
 

	
 
impl ConnectorPorts {
 
    /// Constructs the initial ports object. Assumes the presence of the
 
    /// non-sync branch at index 0. Will initialize all entries for the non-sync
 
    /// branch.
 
    fn new(owned_ports: Vec<PortIdLocal>) -> Self {
 
        let num_ports = owned_ports.len();
 
        let mut port_mapping = Vec::with_capacity(num_ports);
 
        for _ in 0..num_ports {
 
            port_mapping.push(PortAssignment::new_unassigned());
 
        }
 

	
 
        Self{ owned_ports, port_mapping }
 
    }
 

	
 
    /// Prepares the port mapping for a new branch. Assumes that there is no
 
    /// intermediate branch index that we have skipped.
 
    fn prepare_sync_branch(&mut self, parent_branch_idx: u32, new_branch_idx: u32) {
 
        let num_ports = self.owned_ports.len();
 
        let parent_base_idx = parent_branch_idx as usize * num_ports;
 
        let new_base_idx = new_branch_idx as usize * num_ports;
 

	
 
        debug_assert!(parent_branch_idx < new_branch_idx);
 
        debug_assert!(new_base_idx == self.port_mapping.len());
 

	
 
        self.port_mapping.reserve(num_ports);
 
        for offset in 0..num_ports {
 
            let parent_port = &self.port_mapping[parent_base_idx + offset];
 
            self.port_mapping.push(parent_port.clone());
 
        }
 
@@ -267,427 +267,511 @@ pub(crate) struct Connector {
 
    id: u32,
 
    in_sync: bool,
 
    // Branch management
 
    branches: Vec<Branch>, // first branch is always non-speculative one
 
    sync_active: BranchQueue,
 
    sync_pending_get: BranchQueue,
 
    sync_finished: BranchQueue,
 
    // Port/message management
 
    ports: ConnectorPorts,
 
    inbox: Inbox,
 
}
 

	
 
struct TempCtx {}
 
impl RunContext for TempCtx {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        todo!()
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        todo!()
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        todo!()
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        todo!()
 
    }
 
}
 

	
 
impl Connector {
 
    /// Constructs a representation of a connector. The assumption is that the
 
    /// initial branch is at the first instruction of the connector's code,
 
    /// hence is in a non-sync state.
 
    pub fn new(id: u32, initial_branch: Branch, owned_ports: Vec<PortIdLocal>) -> Self {
 
        Self{
 
            id,
 
            in_sync: false,
 
            branches: vec![initial_branch],
 
            sync_active: BranchQueue::new(),
 
            sync_pending_get: BranchQueue::new(),
 
            sync_finished: BranchQueue::new(),
 
            ports: ConnectorPorts::new(owned_ports),
 
            inbox: Inbox::new(),
 
        }
 
    }
 

	
 
    pub fn is_in_sync_mode(&self) -> bool {
 
        return self.in_sync;
 
    }
 

	
 
    /// Runs the connector in synchronous mode. Potential changes to the global
 
    /// system's state are added to the `RunDeltaState` object by the connector,
 
    /// where it is the caller's responsibility to immediately take care of
 
    /// those changes. The return value indicates when (and if) the connector
 
    /// needs to be scheduled again.
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(self.in_sync);
 
        debug_assert!(!self.sync_active.is_empty());
 

	
 
        let branch = Self::pop_branch(&mut self.branches, &mut self.sync_active);
 

	
 
        // Run the branch to the next blocking point
 
        let mut run_context = TempCtx{};
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 

	
 
        // Match statement contains `return` statements only if the particular
 
        // run result behind handled requires an immediate re-run of the
 
        // connector.
 
        match run_result {
 
            RunResult::BranchInconsistent => {
 
                // Speculative branch became inconsistent
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            },
 
            RunResult::BranchMissingPortState(port_id) => {
 
                // Branch called `fires()` on a port that does not yet have an
 
                // assigned speculative value. So we need to create those
 
                // branches
 
                let local_port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let local_port_index = self.ports.get_port_index(local_port_id).unwrap();
 

	
 
                debug_assert!(self.ports.owned_ports.contains(&local_port_id));
 
                let silent_branch = &*branch;
 

	
 
                // Create a copied branch who will have the port set to firing
 
                let firing_index = self.branches.len() as u32;
 
                let mut firing_branch = Branch::new_sync_branching_from(firing_index, silent_branch);
 
                self.ports.prepare_sync_branch(branch.index.index, firing_index);
 

	
 
                let firing_port = self.ports.get_port_mut(firing_index, local_port_index);
 
                firing_port.mark_speculative(1);
 

	
 
                // Assign the old branch a silent value
 
                let silent_port = self.ports.get_port_mut(silent_branch.index.index, local_port_index);
 
                silent_port.mark_speculative(0);
 

	
 
                // Run both branches again
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, silent_branch.index);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, firing_branch.index);
 
                self.branches.push(firing_branch);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            RunResult::BranchMissingPortValue(port_id) => {
 
                // Branch performed a `get` on a port that has not yet received
 
                // a value in its inbox.
 
                let local_port_id = PortIdLocal::new(port_id.0.u32_suffix);
 
                let local_port_index = self.ports.get_port_index(local_port_id).unwrap();
 
                let local_port_index = self.ports.get_port_index(local_port_id);
 
                if local_port_index.is_none() {
 
                    todo!("deal with the case where the port is acquired");
 
                }
 
                let local_port_index = local_port_index.unwrap();
 
                let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index);
 

	
 
                // Check for port mapping assignment and, if present, if it is
 
                // consistent
 
                let is_valid_get = if port_mapping.is_assigned {
 
                    assert!(port_mapping.num_times_fired <= 1); // temporary, until we get rid of `fires`
 
                    port_mapping.num_times_fired == 1
 
                } else {
 
                    // Not yet assigned
 
                    port_mapping.mark_speculative(1);
 
                    true
 
                };
 

	
 
                if is_valid_get {
 
                    // Mark as a branching point for future messages
 
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                    Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch.index);
 

	
 
                    // But if some messages can be immediately applied, do so
 
                    // now.
 
                    let messages = self.inbox.get_messages(local_port_id, port_mapping.last_registered_branch_id);
 
                    if !messages.is_empty() {
 
                        // TODO: If message contains ports, transfer ownership of port.
 
                        for message in messages {
 
                            // For each message, for the execution and feed it
 
                            // the provided message
 
                            let new_branch_index = self.branches.len() as u32;
 
                            let mut new_branch = Branch::new_sync_branching_from(new_branch_index, branch);
 
                            self.ports.prepare_sync_branch(branch.index.index, new_branch_index);
 

	
 
                            let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index);
 
                            port_mapping.last_registered_branch_id = message.sender_cur_branch_id;
 
                            debug_assert!(port_mapping.is_assigned && port_mapping.num_times_fired == 1);
 

	
 
                            new_branch.inbox.insert(local_port_id, message.clone());
 

	
 
                            // Schedule the new branch
 
                            debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync);
 
                            Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch.index);
 
                            self.branches.push(new_branch);
 
                        }
 

	
 
                        // Because we have new branches to run, schedule
 
                        // immediately
 
                        return ConnectorScheduling::Immediate;
 
                    }
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            RunResult::BranchAtSyncEnd => {
 
                // Branch is done, go through all of the ports that are not yet
 
                // assigned and modify them to be
 
                for port_idx in 0..self.ports.num_ports() {
 
                    let port_mapping = self.ports.get_port_mut(branch.index.index, port_idx);
 
                    if !port_mapping.is_assigned {
 
                        port_mapping.mark_speculative(0);
 
                    }
 
                }
 

	
 
                branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_finished, branch.index);
 
            },
 
            RunResult::BranchPut(port_id, value_group) => {
 
                // Branch performed a `put` on a particualar port.
 
                let local_port_id = PortIdLocal{ id: port_id.0.u32_suffix };
 
                let local_port_index = self.ports.get_port_index(local_port_id).unwrap();
 
                let local_port_index = self.ports.get_port_index(local_port_id);
 
                if local_port_index.is_none() {
 
                    todo!("handle case where port was received before (i.e. in ports_delta)")
 
                }
 
                let local_port_index = local_port_index.unwrap();
 

	
 
                // Check the port mapping for consistency
 
                // TODO: For now we can only put once, so that simplifies stuff
 
                let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index);
 
                let is_valid_put = if port_mapping.is_assigned {
 
                    // Already assigned, so must be speculative and one time
 
                    // firing, otherwise we are `put`ing multiple times.
 
                    if port_mapping.last_registered_branch_id.is_valid() {
 
                        // Already did a `put`
 
                        todo!("handle error through RunDeltaState");
 
                    } else {
 
                        // Valid if speculatively firing
 
                        port_mapping.num_times_fired == 1
 
                    }
 
                } else {
 
                    // Not yet assigned, do so now
 
                    true
 
                };
 

	
 
                if is_valid_put {
 
                    // Put in run results for thread to pick up and transfer to
 
                    // the correct connector inbox.
 
                    port_mapping.mark_definitive(branch.index, 1);
 
                    let message = Message{
 
                        sending_port: local_port_id,
 
                        receiving_port: PortIdLocal::new_invalid(),
 
                        sender_prev_branch_id: BranchId::new_invalid(),
 
                        sender_cur_branch_id: branch.index,
 
                        message: value_group,
 
                    };
 

	
 
                    results.outbox.push(message);
 
                    return ConnectorScheduling::Immediate
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in sync mode", run_result),
 
        }
 

	
 
        // Not immediately scheduling, so schedule again if there are more
 
        // branches to run
 
        if self.sync_active.is_empty() {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    /// Runs the connector in non-synchronous mode.
 
    fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(!self.in_sync);
 
        debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
 
        debug_assert!(self.branches.len() == 1);
 

	
 
        let branch = &mut self.branches[0];
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = TempCtx{};
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => {
 
                // Need to wait until all children are terminated
 
                // TODO: Think about how to do this?
 
                branch.sync_state = SpeculativeState::Finished;
 
                return ConnectorScheduling::NotNow;
 
            },
 
            RunResult::ComponentAtSyncStart => {
 
                // Prepare for sync execution and reschedule immediately
 
                self.in_sync = true;
 
                let first_sync_branch = Branch::new_sync_branching_from(1, branch);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch.index);
 
                self.branches.push(first_sync_branch);
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                // Construction of a new component. Find all references to ports
 
                // inside of the arguments
 
                let first_port_idx = results.ports.len();
 
                debug_assert!(results.ports.is_empty());
 
                find_ports_in_value_group(&arguments, &mut results.ports);
 

	
 
                for port
 
            }
 
        }
 
                if !results.ports.is_empty() {
 
                    // Ports changing ownership
 
                    if let Err(_) = Self::release_ports_during_non_sync(&mut self.ports, branch, &results.ports) {
 
                        todo!("fatal error handling");
 
                    }
 
                }
 

	
 
        ConnectorScheduling::NotNow // TODO: @Temp
 
                // Add connector for later execution
 
                let new_connector_state = ComponentState {
 
                    prompt: Prompt::new(&pd.types, &pd.heap, definition_id, monomorph_idx, arguments)
 
                };
 
                let new_connector_ports = results.ports.clone(); // TODO: Do something with this
 
                let new_connector_branch = Branch::new_initial_branch(new_connector_state);
 
                let new_connector = Connector::new(0, new_connector_branch, new_connector_ports);
 

	
 
                results.new_connectors.push(new_connector);
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
            RunResult::NewChannel => {
 
                // Need to prepare a new channel
 
                todo!("adding channels to some global context");
 

	
 
                return ConnectorScheduling::Later;
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Internal helpers
 
    // -------------------------------------------------------------------------
 

	
 
    // Helpers for management of the branches and their internally stored
 
    // `next_branch_in_queue` and the `BranchQueue` objects. Essentially forming
 
    // linked lists inside of the vector of branches.
 

	
 
    #[inline]
 
    fn pop_branch(branches: &mut Vec<Branch>, queue: &mut BranchQueue) -> &mut Branch {
 
        debug_assert!(queue.first != 0);
 
        let branch = &mut branches[queue.first as usize];
 
        *queue.first = branch.next_branch_in_queue.unwrap_or(0);
 
        branch.next_branch_in_queue = None;
 

	
 
        if *queue.first == 0 {
 
            // No more entries in queue
 
            debug_assert_eq!(*queue.last, branch.index.index);
 
            *queue.last = 0;
 
        }
 

	
 
        return branch;
 
    }
 

	
 
    #[inline]
 
    fn push_branch_into_queue(branches: &mut Vec<Branch>, queue: &mut BranchQueue, to_push: BranchId) {
 
        debug_assert!(to_push.is_valid());
 
        let to_push = to_push.index;
 

	
 
        if *queue.last == 0 {
 
            // No branches in the queue at all
 
            debug_assert_eq!(*queue.first, 0);
 
            branches[to_push as usize].next_branch_in_queue = None;
 
            *queue.first = to_push;
 
            *queue.last = to_push;
 
        } else {
 
            // Pre-existing branch in the queue
 
            debug_assert_ne!(*queue.first, 0);
 
            branches[*queue.last as usize].next_branch_in_queue = Some(to_push);
 
            *queue.last = to_push;
 
        }
 
    }
 

	
 
    // Helpers for local port management. Specifically for adopting/losing
 
    // ownership over ports
 

	
 
    /// Marks the ports as being "given away" (e.g. by sending a message over a
 
    /// channel, or by constructing a connector). Will return an error if the
 
    /// connector doesn't own the port in the first place.
 
    fn give_away_ports(ports: &mut ConnectorPorts, in_sync: bool, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
 
        debug_assert!(in_sync == !branch.index.is_valid());
 
    /// Releasing ownership of ports while in non-sync mode. This only occurs
 
    /// while instantiating new connectors
 
    fn release_ports_during_non_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
 
        debug_assert!(!branch.index.is_valid()); // branch in non-sync mode
 

	
 
        for port_id in port_ids {
 
            // We must own the port, or something is wrong with our code
 
            todo!("Set up some kind of message router");
 
            debug_assert!(ports.get_port_index(*port_id).is_some());
 
            ports.remove_port(*port_id);
 
        }
 

	
 
        return Ok(())
 
    }
 

	
 
    /// Releasing ownership of ports during a sync-session. Will provide an
 
    /// error if the port was already used during a sync block.
 
    fn release_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
 
        debug_assert!(branch.index.is_valid()); // branch in sync mode
 

	
 
        for port_id in port_ids {
 
            match ports.get_port_index(*port_id) {
 
                Some(port_index) => {
 
                    // We (used to) own the port
 
                    // We (used to) own the port. Make sure it is not given away
 
                    // already and not used to put/get data.
 
                    let port_mapping = ports.get_port(branch.index.index, port_index);
 
                    if port_mapping.is_assigned {
 
                        // Port is used in some kind of interaction. Cannot both
 
                        // give away the port and use it in an interaction
 
                        return Err(PortOwnershipError::UsedInInteraction(*port_id))
 
                    if port_mapping.is_assigned && port_mapping.num_times_fired != 0 {
 
                        // Already used
 
                        return Err(PortOwnershipError::UsedInInteraction(*port_id));
 
                    }
 

	
 
                    // Make sure it is not already given away
 
                    for delta in &branch.ports_delta {
 
                        match delta {
 
                            PortOwnershipDelta::TakeOwnership(_) => unreachable!(), // because we had a port mapping
 
                            PortOwnershipDelta::GiveAwayOwnership(given_away_port_id) => {
 
                                if port_id == given_away_port_id {
 
                                    return Err(PortOwnershipError::AlreadyGivenAway(*port_id));
 
                                }
 
                            }
 
                        if delta.port_id == port_id {
 
                            // We cannot have acquired this port, because the
 
                            // call to `ports.get_port_index` returned an index.
 
                            debug_assert!(!delta.acquired);
 
                            return Err(PortOwnershipError::AlreadyGivenAway(*port_id));
 
                        }
 
                    }
 

	
 
                    // We're fine, the port will be given away. Note that if we
 
                    // are not in sync mode, then we can simply remove the
 
                    // ownership immediately.
 
                    if in_sync {
 
                        branch.ports_delta.push(PortOwnershipDelta::GiveAwayOwnership(*port_id));
 
                    } else {
 

	
 
                    }
 
                    branch.ports_delta.push(PortOwnershipDelta{
 
                        acquired: false,
 
                        port_id: *port_id,
 
                    });
 
                },
 
                None => {
 
                    // We did not yet own the port, so we must have received it
 
                    // this round, and we're going to give it away again.
 
                    debug_assert!(branch.ports_delta.contains(&PortOwnershipDelta::TakeOwnership(*port_id)));
 
                    let delta_to_find = PortOwnershipDelta::TakeOwnership(*port_id);
 
                    for delta_idx in 0..branch.ports_delta.len() {
 
                        if branch.ports_delta[delta_idx] == delta_to_find {
 
                            branch.ports_delta.remove(delta_idx);
 
                    // Not in port mapping, so we must have acquired it before,
 
                    // remove the acquirement.
 
                    let mut to_delete_index: isize = -1;
 
                    for (delta_idx, delta) in branch.ports_delta.iter().enumerate() {
 
                        if delta.port_id == *port_id {
 
                            debug_assert!(delta.acquired);
 
                            to_delete_index = delta_idx as isize;
 
                            break;
 
                        }
 
                    }
 

	
 
                    // Note for programmers: the fact that the message that
 
                    // contains this port will end up at another connector will
 
                    // take care of its new ownership.
 
                    debug_assert!(to_delete_index != -1);
 
                    branch.ports_delta.remove(to_delete_index as usize);
 
                }
 
            }
 
        }
 

	
 
        return Ok(());
 
        return Ok(())
 
    }
 

	
 
    /// Adopt ownership of the ports
 
    /// Acquiring ports during a sync-session.
 
    fn acquire_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
 
        debug_assert!(branch.index.is_valid()); // branch in sync mode
 

	
 
        'port_loop: for port_id in port_ids {
 
            for (delta_idx, delta) in branch.ports_delta.iter().enumerate() {
 
                if delta.port_id == *port_id {
 
                    if delta.acquired {
 
                        // Somehow already received this port.
 
                        // TODO: @security
 
                        todo!("take care of nefarious peers");
 
                    } else {
 
                        // Sending ports to ourselves
 
                        debug_assert!(ports.get_port_index(*port_id).is_some());
 
                        branch.ports_delta.remove(delta_idx);
 
                        continue 'port_loop;
 
                    }
 
                }
 
            }
 

	
 
            // If here then we can safely acquire the new port
 
            branch.ports_delta.push(PortOwnershipDelta{
 
                acquired: true,
 
                port_id: *port_id,
 
            });
 
        }
 

	
 
        return Ok(())
 
    }
 
}
 

	
 
/// A data structure passed to a connector whose code is being executed that is
 
/// used to queue up various state changes that have to be applied after
 
/// running, e.g. the messages the have to be transferred to other connectors.
 
// TODO: Come up with a better name
 
struct RunDeltaState {
 
    outbox: Vec<Message>,
 
    ports: Vec<PortIdLocal>,
 
pub(crate) struct RunDeltaState {
 
    // Variables that allow the thread running the connector to pick up global
 
    // state changes and try to apply them.
 
    pub outbox: Vec<Message>,
 
    pub new_connectors: Vec<Connector>,
 
    // Workspaces
 
    pub ports: Vec<PortIdLocal>,
 
}
 

	
 
impl RunDeltaState {
 
    /// Constructs a new `RunDeltaState` object with the default amount of
 
    /// reserved memory
 
    pub fn new() -> Self {
 
        RunDeltaState{
 
            outbox: Vec::with_capacity(64),
 
            new_connectors: Vec::new(),
 
            ports: Vec::with_capacity(64),
 
        }
 
    }
 
}
 

	
 
enum ConnectorScheduling {
 
pub(crate) enum ConnectorScheduling {
 
    Immediate,      // Run again, immediately
 
    Later,          // Schedule for running, at some later point in time
 
    NotNow,         // Do not reschedule for running
 
}
 

	
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortIdLocal>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortIdLocal::new(port_id.0.u32_suffix);
 
                for prev_port in ports.iter() {
 
                    if prev_port == cur_port {
 
                        // Already added
 
                        return;
 
                    }
 
                }
 

	
 
                ports.push(cur_port);
 
            },
 
            Value::Array(heap_pos) |
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for embedded_value in heap_region {
 
                    find_port_in_value(group, embedded_value, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 

	
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for value in &value_group.values {
 
        find_port_in_value(value_group, value, ports);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/scheduler.rs
Show inline comments
 
use std::sync::Condvar;
 
use std::sync::Arc;
 
use std::time::Duration;
 
use std::thread;
 
use crate::ProtocolDescription;
 

	
 
use super::connector::{Connector, ConnectorScheduling, RunDeltaState};
 
use super::global_store::GlobalStore;
 

	
 
struct Scheduler<'g> {
 
    global: &'g GlobalStore,
 
struct Scheduler {
 
    global: Arc<GlobalStore>,
 
    code: Arc<ProtocolDescription>,
 
}
 

	
 
impl<'g> Scheduler<'g> {
 
    pub fn new(store: &'g GlobalStore) {
 
impl Scheduler {
 
    pub fn new(global: Arc<GlobalStore>, code: Arc<ProtocolDescription>) -> Self {
 
        Self{
 
            global,
 
            code,
 
        }
 
    }
 

	
 
    pub fn run(&mut self) {
 
        // Setup global storage and workspaces that are reused for every
 
        // connector that we run
 
        // TODO: @Memory, scheme for reducing allocations if excessive.
 
        let mut delta_state = RunDeltaState::new()
 

	
 
        loop {
 
            // TODO: Check if we're supposed to exit
 

	
 
            // Retrieve a unit of work
 
            let connector_key = self.global.pop_key();
 
            if connector_key.is_none() {
 
                // TODO: @Performance, needs condition variable for waking up
 
                thread::sleep(Duration::new(1, 0));
 
                continue
 
            }
 

	
 
            // We have something to do
 
            let connector_key = connector_key.unwrap();
 
            let connector = self.global.get_connector(&connector_key);
 

	
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 

	
 
            while cur_schedule == ConnectorScheduling::Immediate {
 
                let new_schedule;
 

	
 
                if connector.is_in_sync_mode() {
 
                    // In synchronous mode, so we can expect messages being sent,
 
                    // but we never expect the creation of connectors
 
                    new_schedule = connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state);
 
                    debug_assert!(delta_state.new_connectors.is_empty());
 

	
 
                    if !delta_state.outbox.is_empty() {}
 
                } else {
 
                    // In regular running mode (not in a sync block) we cannot send
 
                    // messages but we can create new connectors
 
                    new_schedule = connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state);
 
                    debug_assert!(delta_state.outbox.is_empty());
 

	
 
                    if !delta_state.new_connectors.is_empty() {
 
                        // Push all connectors into the global state and queue them
 
                        // for execution
 

	
 
                    }
 
                }
 

	
 
                cur_schedule = new_schedule;
 
            }
 
        }
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)