Changeset - f450ae18ef58
[Not reviewed]
MH - 2021-10-01 16:54:34
contact@maxhenger.nl
merge with rewrite of connector/scheduler
2 files changed with 202 insertions and 59 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
 
@@ -4,7 +4,7 @@ use super::messages::{Message, Inbox};
 use crate::protocol::{ComponentState, RunContext, RunResult};
 use crate::{PortId, ProtocolDescription};
-use crate::protocol::eval::{ValueGroup, Value};
+use crate::protocol::eval::{ValueGroup, Value, Prompt};

 #[derive(Clone, Copy, PartialEq, Eq)]
 pub(crate) struct PortIdLocal {

@@ -139,9 +139,9 @@ impl PortAssignment {
 }

 #[derive(Clone, Eq)]
-enum PortOwnershipDelta {
-    TakeOwnership(PortIdLocal),
-    GiveAwayOwnership(PortIdLocal),
+struct PortOwnershipDelta {
+    acquired: bool, // if false, then released ownership
+    port_id: PortIdLocal,
 }

 enum PortOwnershipError {
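
The enum's two variants and the struct's `acquired` flag carry the same information; the payoff is that later code can test `delta.port_id` with a single comparison instead of destructuring both variants. A minimal sketch of the correspondence (the `OldDelta` name and the conversion are invented for illustration, with `PortIdLocal` reduced to a bare u32 wrapper):

#[derive(Clone, Copy, PartialEq, Eq)]
struct PortIdLocal { id: u32 }

// Invented name for the representation this changeset removes.
enum OldDelta {
    TakeOwnership(PortIdLocal),
    GiveAwayOwnership(PortIdLocal),
}

struct PortOwnershipDelta {
    acquired: bool, // if false, then released ownership
    port_id: PortIdLocal,
}

impl From<OldDelta> for PortOwnershipDelta {
    fn from(delta: OldDelta) -> Self {
        match delta {
            OldDelta::TakeOwnership(port_id) => PortOwnershipDelta{ acquired: true, port_id },
            OldDelta::GiveAwayOwnership(port_id) => PortOwnershipDelta{ acquired: false, port_id },
        }
    }
}

fn main() {
    let delta: PortOwnershipDelta = OldDelta::GiveAwayOwnership(PortIdLocal{ id: 1 }).into();
    assert!(!delta.acquired && delta.port_id.id == 1);
}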
 
@@ -312,6 +312,10 @@ impl Connector {
         }
     }

+    pub fn is_in_sync_mode(&self) -> bool {
+        return self.in_sync;
+    }
+
     /// Runs the connector in synchronous mode. Potential changes to the global
     /// system's state are added to the `RunDeltaState` object by the connector,
     /// where it is the caller's responsibility to immediately take care of

@@ -368,7 +372,11 @@ impl Connector {
                 // Branch performed a `get` on a port that has not yet received
                 // a value in its inbox.
                 let local_port_id = PortIdLocal::new(port_id.0.u32_suffix);
-                let local_port_index = self.ports.get_port_index(local_port_id).unwrap();
+                let local_port_index = self.ports.get_port_index(local_port_id);
+                if local_port_index.is_none() {
+                    todo!("deal with the case where the port is acquired");
+                }
+                let local_port_index = local_port_index.unwrap();
                 let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index);

                 // Check for port mapping assignment and, if present, if it is

@@ -435,7 +443,11 @@ impl Connector {
             RunResult::BranchPut(port_id, value_group) => {
                 // Branch performed a `put` on a particular port.
                 let local_port_id = PortIdLocal{ id: port_id.0.u32_suffix };
-                let local_port_index = self.ports.get_port_index(local_port_id).unwrap();
+                let local_port_index = self.ports.get_port_index(local_port_id);
+                if local_port_index.is_none() {
+                    todo!("handle case where port was received before (i.e. in ports_delta)")
+                }
+                let local_port_index = local_port_index.unwrap();

                 // Check the port mapping for consistency
                 // TODO: For now we can only put once, so that simplifies stuff

@@ -486,7 +498,7 @@ impl Connector {
     }

     /// Runs the connector in non-synchronous mode.
-    fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
+    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
         debug_assert!(!self.in_sync);
         debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
         debug_assert!(self.branches.len() == 1);

@@ -516,14 +528,36 @@ impl Connector {
             RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
                 // Construction of a new component. Find all references to ports
                 // inside of the arguments
-                let first_port_idx = results.ports.len();
+                debug_assert!(results.ports.is_empty());
                 find_ports_in_value_group(&arguments, &mut results.ports);

-                for port
-            }
-        }
-
-        ConnectorScheduling::NotNow // TODO: @Temp
+                if !results.ports.is_empty() {
+                    // Ports changing ownership
+                    if let Err(_) = Self::release_ports_during_non_sync(&mut self.ports, branch, &results.ports) {
+                        todo!("fatal error handling");
+                    }
+                }
+
+                // Add connector for later execution
+                let new_connector_state = ComponentState {
+                    prompt: Prompt::new(&pd.types, &pd.heap, definition_id, monomorph_idx, arguments)
+                };
+                let new_connector_ports = results.ports.clone(); // TODO: Do something with this
+                let new_connector_branch = Branch::new_initial_branch(new_connector_state);
+                let new_connector = Connector::new(0, new_connector_branch, new_connector_ports);
+
+                results.new_connectors.push(new_connector);
+
+                return ConnectorScheduling::Later;
+            },
+            RunResult::NewChannel => {
+                // Need to prepare a new channel
+                todo!("adding channels to some global context");
+
+                return ConnectorScheduling::Later;
+            },
             _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
         }
     }

     // -------------------------------------------------------------------------
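
For context, `find_ports_in_value_group` is expected to walk the argument values handed to the new component and collect every port they reference, so that ownership of those ports can be transferred to the new connector. The real `ValueGroup`/`Value` types live in crate::protocol::eval and are not shown in this changeset; the sketch below only illustrates that intent with an invented, simplified value type:

#[derive(Clone, Copy, PartialEq, Eq)]
struct PortIdLocal { id: u32 }

// Invented stand-in for the evaluator's value representation.
enum Value {
    Input(PortIdLocal),
    Output(PortIdLocal),
    Number(i64),
    Group(Vec<Value>),
}

// Recursively collect every port referenced by the argument values.
fn find_ports_in_values(values: &[Value], ports: &mut Vec<PortIdLocal>) {
    for value in values {
        match value {
            Value::Input(port_id) | Value::Output(port_id) => ports.push(*port_id),
            Value::Group(nested) => find_ports_in_values(nested, ports),
            Value::Number(_) => {},
        }
    }
}

fn main() {
    let args = vec![Value::Number(5), Value::Group(vec![Value::Input(PortIdLocal{ id: 3 })])];
    let mut ports = Vec::new();
    find_ports_in_values(&args, &mut ports);
    assert_eq!(ports.len(), 1);
    assert_eq!(ports[0].id, 3);
}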
 
@@ -572,79 +606,129 @@ impl Connector {
     // Helpers for local port management. Specifically for adopting/losing
     // ownership over ports

-    /// Marks the ports as being "given away" (e.g. by sending a message over a
-    /// channel, or by constructing a connector). Will return an error if the
-    /// connector doesn't own the port in the first place.
-    fn give_away_ports(ports: &mut ConnectorPorts, in_sync: bool, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
-        debug_assert!(in_sync == !branch.index.is_valid());
+    /// Releasing ownership of ports while in non-sync mode. This only occurs
+    /// while instantiating new connectors
+    fn release_ports_during_non_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
+        debug_assert!(!branch.index.is_valid()); // branch in non-sync mode
+
+        for port_id in port_ids {
+            // We must own the port, or something is wrong with our code
+            todo!("Set up some kind of message router");
+            debug_assert!(ports.get_port_index(*port_id).is_some());
+            ports.remove_port(*port_id);
+        }
+
+        return Ok(())
+    }
+
+    /// Releasing ownership of ports during a sync-session. Will provide an
+    /// error if the port was already used during a sync block.
+    fn release_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
+        debug_assert!(branch.index.is_valid()); // branch in sync mode

         for port_id in port_ids {
             match ports.get_port_index(*port_id) {
                 Some(port_index) => {
-                    // We (used to) own the port
+                    // We (used to) own the port. Make sure it is not given away
+                    // already and not used to put/get data.
                     let port_mapping = ports.get_port(branch.index.index, port_index);
-                    if port_mapping.is_assigned {
-                        // Port is used in some kind of interaction. Cannot both
-                        // give away the port and use it in an interaction
-                        return Err(PortOwnershipError::UsedInInteraction(*port_id))
+                    if port_mapping.is_assigned && port_mapping.num_times_fired != 0 {
+                        // Already used
+                        return Err(PortOwnershipError::UsedInInteraction(*port_id));
                     }

                     // Make sure it is not already given away
                     for delta in &branch.ports_delta {
-                        match delta {
-                            PortOwnershipDelta::TakeOwnership(_) => unreachable!(), // because we had a port mapping
-                            PortOwnershipDelta::GiveAwayOwnership(given_away_port_id) => {
-                                if port_id == given_away_port_id {
-                                    return Err(PortOwnershipError::AlreadyGivenAway(*port_id));
-                                }
-                            }
+                        if delta.port_id == port_id {
+                            // We cannot have acquired this port, because the
+                            // call to `ports.get_port_index` returned an index.
+                            debug_assert!(!delta.acquired);
+                            return Err(PortOwnershipError::AlreadyGivenAway(*port_id));
                         }
                     }

-                    // We're fine, the port will be given away. Note that if we
-                    // are not in sync mode, then we can simply remove the
-                    // ownership immediately.
-                    if in_sync {
-                        branch.ports_delta.push(PortOwnershipDelta::GiveAwayOwnership(*port_id));
-                    } else {
-
-                    }
+                    branch.ports_delta.push(PortOwnershipDelta{
+                        acquired: false,
+                        port_id: *port_id,
+                    });
                 },
                 None => {
                     // We did not yet own the port, so we must have received it
                     // this round, and we're going to give it away again.
-                    debug_assert!(branch.ports_delta.contains(&PortOwnershipDelta::TakeOwnership(*port_id)));
-                    let delta_to_find = PortOwnershipDelta::TakeOwnership(*port_id);
-                    for delta_idx in 0..branch.ports_delta.len() {
-                        if branch.ports_delta[delta_idx] == delta_to_find {
-                            branch.ports_delta.remove(delta_idx);
+                    // Not in port mapping, so we must have acquired it before,
+                    // remove the acquirement.
+                    let mut to_delete_index: isize = -1;
+                    for (delta_idx, delta) in branch.ports_delta.iter().enumerate() {
+                        if delta.port_id == *port_id {
+                            debug_assert!(delta.acquired);
+                            to_delete_index = delta_idx as isize;
+                            break;
                         }
                     }

                     // Note for programmers: the fact that the message that
                     // contains this port will end up at another connector will
                     // take care of its new ownership.
+                    debug_assert!(to_delete_index != -1);
+                    branch.ports_delta.remove(to_delete_index as usize);
                 }
             }
         }

-        return Ok(());
+        return Ok(())
     }

-    /// Adopt ownership of the ports
+    /// Acquiring ports during a sync-session.
+    fn acquire_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
+        debug_assert!(branch.index.is_valid()); // branch in sync mode
+
+        'port_loop: for port_id in port_ids {
+            for (delta_idx, delta) in branch.ports_delta.iter().enumerate() {
+                if delta.port_id == *port_id {
+                    if delta.acquired {
+                        // Somehow already received this port.
+                        // TODO: @security
+                        todo!("take care of nefarious peers");
+                    } else {
+                        // Sending ports to ourselves
+                        debug_assert!(ports.get_port_index(*port_id).is_some());
+                        branch.ports_delta.remove(delta_idx);
+                        continue 'port_loop;
+                    }
+                }
+            }
+
+            // If here then we can safely acquire the new port
+            branch.ports_delta.push(PortOwnershipDelta{
+                acquired: true,
+                port_id: *port_id,
+            });
+        }
+
+        return Ok(())
+    }
 }
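
The two sync-mode helpers are mirror images: releasing a port that was acquired earlier in the same round deletes the `acquired` entry instead of recording a release, and acquiring a port that this branch previously gave away to itself deletes the release entry. A standalone sketch of that cancellation rule, assuming the delta type from this changeset (the `record_delta` helper itself is invented):

#[derive(Clone, Copy, PartialEq, Eq)]
struct PortIdLocal { id: u32 }

struct PortOwnershipDelta {
    acquired: bool, // if false, then released ownership
    port_id: PortIdLocal,
}

// Hypothetical helper: record an ownership change, cancelling an earlier
// entry for the same port that went in the opposite direction.
fn record_delta(deltas: &mut Vec<PortOwnershipDelta>, acquired: bool, port_id: PortIdLocal) {
    if let Some(index) = deltas.iter().position(|delta| delta.port_id == port_id) {
        // Opposite entry from earlier in this sync round: the two cancel out.
        debug_assert!(deltas[index].acquired != acquired);
        deltas.remove(index);
    } else {
        deltas.push(PortOwnershipDelta{ acquired, port_id });
    }
}

fn main() {
    let mut deltas = Vec::new();
    let port = PortIdLocal{ id: 5 };
    record_delta(&mut deltas, true, port);  // acquired during this round ...
    record_delta(&mut deltas, false, port); // ... and given away again
    assert!(deltas.is_empty());             // net effect: nothing to report
}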
 

	
 
 /// A data structure passed to a connector whose code is being executed that is
 /// used to queue up various state changes that have to be applied after
 /// running, e.g. the messages that have to be transferred to other connectors.
 // TODO: Come up with a better name
-struct RunDeltaState {
-    outbox: Vec<Message>,
-    ports: Vec<PortIdLocal>,
+pub(crate) struct RunDeltaState {
+    // Variables that allow the thread running the connector to pick up global
+    // state changes and try to apply them.
+    pub outbox: Vec<Message>,
+    pub new_connectors: Vec<Connector>,
+    // Workspaces
+    pub ports: Vec<PortIdLocal>,
 }

 impl RunDeltaState {
     /// Constructs a new `RunDeltaState` object with the default amount of
     /// reserved memory
     pub fn new() -> Self {
         RunDeltaState{
             outbox: Vec::with_capacity(64),
+            new_connectors: Vec::new(),
             ports: Vec::with_capacity(64),
         }
     }
 }
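
`RunDeltaState` is a per-thread scratchpad: the connector only appends to it, and whoever called `run_in_*_mode` is expected to drain it before the next run. That caller-side step is still a TODO in scheduler.rs below; a hedged sketch of what the drain could look like, with simplified stand-in types and invented `route_message`/`register_and_queue` callbacks in place of the missing router and global-store plumbing:

// Simplified stand-ins; the real Message/Connector types come from the runtime.
struct Message;
struct Connector;

struct RunDeltaState {
    outbox: Vec<Message>,
    new_connectors: Vec<Connector>,
    ports: Vec<u32>, // workspace, reused between runs
}

// Invented caller-side drain: ship queued messages, register new connectors,
// and clear the workspace so its allocation is reused for the next run.
fn drain_after_run(
    delta: &mut RunDeltaState,
    mut route_message: impl FnMut(Message),
    mut register_and_queue: impl FnMut(Connector),
) {
    for message in delta.outbox.drain(..) {
        route_message(message);
    }
    for connector in delta.new_connectors.drain(..) {
        register_and_queue(connector);
    }
    delta.ports.clear();
}

fn main() {
    let mut delta = RunDeltaState{
        outbox: vec![Message],
        new_connectors: Vec::new(),
        ports: vec![1, 2, 3],
    };
    let mut sent = 0;
    drain_after_run(&mut delta, |_message| sent += 1, |_connector| {});
    assert_eq!(sent, 1);
    assert!(delta.outbox.is_empty() && delta.ports.is_empty());
}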
 

	
 
-enum ConnectorScheduling {
+pub(crate) enum ConnectorScheduling {
     Immediate,      // Run again, immediately
     Later,          // Schedule for running, at some later point in time
     NotNow,         // Do not reschedule for running
src/runtime2/scheduler.rs
 
 use std::sync::Condvar;
 use std::sync::Arc;
 use std::time::Duration;
 use std::thread;
 use crate::ProtocolDescription;

 use super::connector::{Connector, ConnectorScheduling, RunDeltaState};
 use super::global_store::GlobalStore;

-struct Scheduler<'g> {
-    global: &'g GlobalStore,
+struct Scheduler {
+    global: Arc<GlobalStore>,
+    code: Arc<ProtocolDescription>,
 }

-impl<'g> Scheduler<'g> {
-    pub fn new(store: &'g GlobalStore) {
+impl Scheduler {
+    pub fn new(global: Arc<GlobalStore>, code: Arc<ProtocolDescription>) -> Self {
+        Self{
+            global,
+            code,
+        }
+    }
+
+    pub fn run(&mut self) {
+        // Setup global storage and workspaces that are reused for every
+        // connector that we run
+        // TODO: @Memory, scheme for reducing allocations if excessive.
+        let mut delta_state = RunDeltaState::new();
+
+        loop {
+            // TODO: Check if we're supposed to exit
+
+            // Retrieve a unit of work
+            let connector_key = self.global.pop_key();
+            if connector_key.is_none() {
+                // TODO: @Performance, needs condition variable for waking up
+                thread::sleep(Duration::new(1, 0));
+                continue
+            }
+
+            // We have something to do
+            let connector_key = connector_key.unwrap();
+            let connector = self.global.get_connector(&connector_key);
+
+            let mut cur_schedule = ConnectorScheduling::Immediate;
+
+            while cur_schedule == ConnectorScheduling::Immediate {
+                let new_schedule;
+
+                if connector.is_in_sync_mode() {
+                    // In synchronous mode, so we can expect messages being sent,
+                    // but we never expect the creation of connectors
+                    new_schedule = connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state);
+                    debug_assert!(delta_state.new_connectors.is_empty());
+
+                    if !delta_state.outbox.is_empty() {}
+                } else {
+                    // In regular running mode (not in a sync block) we cannot send
+                    // messages but we can create new connectors
+                    new_schedule = connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state);
+                    debug_assert!(delta_state.outbox.is_empty());
+
+                    if !delta_state.new_connectors.is_empty() {
+                        // Push all connectors into the global state and queue them
+                        // for execution
+                    }
+                }
+
+                cur_schedule = new_schedule;
+            }
+        }
+    }
 }
\ No newline at end of file
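
The sleep-and-poll at the top of the loop is flagged with "TODO: @Performance, needs condition variable for waking up", and Condvar is already imported. A hedged sketch of what that wakeup could look like, using an invented WorkQueue in place of the global store's key queue (none of these names are part of this changeset):

use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Invented stand-in for the key queue inside the global store.
struct WorkQueue<T> {
    queue: Mutex<VecDeque<T>>,
    signal: Condvar,
}

impl<T> WorkQueue<T> {
    fn new() -> Self {
        WorkQueue{ queue: Mutex::new(VecDeque::new()), signal: Condvar::new() }
    }

    // Instead of sleeping and polling, block on the condition variable until
    // another thread pushes work and notifies us.
    fn pop_blocking(&self) -> T {
        let mut queue = self.queue.lock().unwrap();
        loop {
            if let Some(item) = queue.pop_front() {
                return item;
            }
            queue = self.signal.wait(queue).unwrap();
        }
    }

    fn push(&self, item: T) {
        self.queue.lock().unwrap().push_back(item);
        self.signal.notify_one();
    }
}

fn main() {
    let work = Arc::new(WorkQueue::new());
    let producer = Arc::clone(&work);
    let handle = thread::spawn(move || producer.push(42u32));
    assert_eq!(work.pop_blocking(), 42);
    handle.join().unwrap();
}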