Changeset - 98aadfccbafd
[Not reviewed]
0 7 0
MH - 4 years ago 2021-10-22 21:01:38
contact@maxhenger.nl
solving problem of connectors shutting down
7 files changed with 164 insertions and 73 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
@@ -54,6 +54,17 @@ pub struct ComponentId(Id); // PUB because it can be returned by errors
 
#[repr(transparent)]
 
pub struct PortId(pub(crate) Id);
 

	
 
impl PortId {
 
    // TODO: Remove concept of ComponentId and PortId in this file
 
    #[deprecated]
 
    pub fn new(port: u32) -> Self {
 
        return PortId(Id{
 
            connector_id: u32::MAX,
 
            u32_suffix: port,
 
        });
 
    }
 
}
 

	
 
/// A safely aliasable heap-allocated payload of message bytes
 
#[derive(Default, Eq, PartialEq, Clone, Ord, PartialOrd)]
 
pub struct Payload(pub Arc<Vec<u8>>);
src/runtime2/connector.rs
Show inline comments
 
@@ -59,6 +59,7 @@ pub(crate) struct Branch {
 
    parent_index: BranchId,
 
    // Code execution state
 
    code_state: ComponentState,
 
    prepared_channel: Option<(Value, Value)>,
 
    sync_state: SpeculativeState,
 
    next_branch_in_queue: Option<u32>,
 
    // Message/port state
 
@@ -74,6 +75,7 @@ impl Branch {
 
            index: BranchId::new_invalid(),
 
            parent_index: BranchId::new_invalid(),
 
            code_state: component_state,
 
            prepared_channel: None,
 
            sync_state: SpeculativeState::RunningNonSync,
 
            next_branch_in_queue: None,
 
            received: HashMap::new(),
 
@@ -88,11 +90,13 @@ impl Branch {
 
            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) ||
 
            (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        );
 
        debug_assert!(parent_branch.prepared_channel.is_none());
 

	
 
        Branch{
 
            index: BranchId::new(new_index),
 
            parent_index: parent_branch.index,
 
            code_state: parent_branch.code_state.clone(),
 
            prepared_channel: None,
 
            sync_state: SpeculativeState::RunningInSync,
 
            next_branch_in_queue: None,
 
            received: parent_branch.received.clone(),
 
@@ -268,6 +272,7 @@ impl ConnectorPorts {
 
        let branch_idx = branch_idx as usize;
 
        let num_ports = self.owned_ports.len();
 

	
 
        println!("port_idx = {}, branch_idx = {}, num_ports = {}, port_mapping.len() = {}", port_idx, branch_idx, num_ports, self.port_mapping.len());
 
        debug_assert!(port_idx < num_ports);
 
        debug_assert!((branch_idx + 1) * num_ports <= self.port_mapping.len());
 

	
 
@@ -333,28 +338,31 @@ pub(crate) struct ConnectorPDL {
 
    pub ports: ConnectorPorts,
 
}
 

	
 
// TODO: Remove this monstrosity
 
struct ConnectorRunContext<'a> {
 
    inbox: &'a PrivateInbox,
 
    branch_index: u32,
 
    ports: &'a ConnectorPorts,
 
    branch: &'a Branch,
 
    ports_delta: &'a Vec<PortOwnershipDelta>,
 
    received: &'a HashMap<PortIdLocal, DataMessage>,
 
    scheduler: SchedulerCtx<'a>,
 
    prepared_channel: Option<(Value, Value)>,
 
}
 

	
 
impl<'a> RunContext for ConnectorRunContext<'a> {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        if self.branch.ports_delta.iter().any(|v| v.port_id.index == port.0.u32_suffix) {
 
        if self.ports_delta.iter().any(|v| v.port_id.index == port.0.u32_suffix) {
 
            // Either acquired or released, must be silent
 
            return false;
 
        }
 

	
 
        let port_index = self.ports.get_port_index(PortIdLocal::new(port.0.u32_suffix)).unwrap();
 
        let mapping = self.ports.get_port(self.branch.index.index, port_index);
 
        let mapping = self.ports.get_port(self.branch_index, port_index);
 
        return mapping.is_assigned;
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        match self.branch.received.get(&port_id) {
 
        match self.received.get(&port_id) {
 
            Some(message) => Some(message.message.clone()),
 
            None => None,
 
        }
 
@@ -362,12 +370,12 @@ impl<'a> RunContext for ConnectorRunContext<'a> {
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        if self.branch.ports_delta.iter().any(|v| v.port_id == port_id) {
 
        if self.ports_delta.iter().any(|v| v.port_id == port_id) {
 
            return None
 
        }
 

	
 
        let port_index = self.ports.get_port_index(port_id).unwrap();
 
        let mapping = self.ports.get_port(self.branch.index.index, port_index);
 
        let mapping = self.ports.get_port(self.branch_index, port_index);
 

	
 
        if mapping.is_assigned {
 
            return Some(Value::Bool(mapping.num_times_fired != 0));
 
@@ -377,9 +385,7 @@ impl<'a> RunContext for ConnectorRunContext<'a> {
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        let (getter, putter) = self.scheduler.runtime.create_channel();
 
        debug_assert_eq!(getter.kind, PortKind::Getter);
 

	
 
        return self.prepared_channel.take();
 
    }
 
}
 

	
 
@@ -396,9 +402,9 @@ impl Connector for ConnectorPDL {
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        if self.in_sync {
 
            let scheduling = self.run_in_speculative_mode(pd, conn_ctx, delta_state);
 
            let scheduling = self.run_in_speculative_mode(sched_ctx, conn_ctx, delta_state);
 

	
 
            // When in speculative mode we might have generated new sync
 
            // solutions, we need to turn them into proposed solutions here.
 
@@ -447,7 +453,7 @@ impl Connector for ConnectorPDL {
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(pd, conn_ctx, delta_state);
 
            let scheduling = self.run_in_deterministic_mode(sched_ctx, conn_ctx, delta_state);
 
            return scheduling;
 
        }
 
    }
 
@@ -719,15 +725,26 @@ impl ConnectorPDL {
 
    /// where it is the caller's responsibility to immediately take care of
 
    /// those changes. The return value indicates when (and if) the connector
 
    /// needs to be scheduled again.
 
    pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(self.in_sync);
 
        debug_assert!(!self.sync_active.is_empty());
 

	
 
        if self.sync_active.is_empty() {
 
            return ConnectorScheduling::NotNow;
 
        }
 

	
 
        let branch = Self::pop_branch_from_queue(&mut self.branches, &mut self.sync_active);
 

	
 
        // Run the branch to the next blocking point
 
        let mut run_context = ConnectorRunContext {};
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 
        debug_assert!(branch.prepared_channel.is_none());
 
        let mut run_context = ConnectorRunContext {
 
            branch_index: branch.index.index,
 
            ports: &self.ports,
 
            ports_delta: &branch.ports_delta,
 
            scheduler: sched_ctx,
 
            prepared_channel: None,
 
            received: &branch.received,
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        // Match statement contains `return` statements only if the particular
 
        // run result behind handled requires an immediate re-run of the
 
@@ -908,6 +925,9 @@ impl ConnectorPDL {
 
                    results.ports.clear();
 

	
 
                    results.outbox.push(MessageContents::Data(message));
 

	
 
                    let branch_index = branch.index;
 
                    Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, branch_index);
 
                    return ConnectorScheduling::Immediate
 
                } else {
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
@@ -926,7 +946,7 @@ impl ConnectorPDL {
 
    }
 

	
 
    /// Runs the connector in non-synchronous mode.
 
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: &SchedulerCtx, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(!self.in_sync);
 
        debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
 
        debug_assert!(self.branches.len() == 1);
 
@@ -935,11 +955,14 @@ impl ConnectorPDL {
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = ConnectorRunContext{
 
            inbox: &self.inbox,
 
            branch_index: branch.index.index,
 
            ports: &self.ports,
 
            branch: &Branch {}
 
            ports_delta: &branch.ports_delta,
 
            scheduler: sched_ctx,
 
            prepared_channel: branch.prepared_channel.take(),
 
            received: &branch.received,
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 

	
 
        match run_result {
 
            RunResult::ComponentTerminated => {
 
@@ -953,6 +976,7 @@ impl ConnectorPDL {
 
                self.in_sync = true;
 
                let first_sync_branch = Branch::new_sync_branching_from(1, branch);
 
                let first_sync_branch_id = first_sync_branch.index;
 
                self.ports.prepare_sync_branch(0, 1);
 
                self.branches.push(first_sync_branch);
 
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch_id);
 

	
 
@@ -973,7 +997,11 @@ impl ConnectorPDL {
 

	
 
                // Add connector for later execution
 
                let new_connector_state = ComponentState {
 
                    prompt: Prompt::new(&pd.types, &pd.heap, definition_id, monomorph_idx, arguments)
 
                    prompt: Prompt::new(
 
                        &sched_ctx.runtime.protocol_description.types,
 
                        &sched_ctx.runtime.protocol_description.heap,
 
                        definition_id, monomorph_idx, arguments
 
                    )
 
                };
 
                let new_connector_ports = results.ports.clone(); // TODO: Do something with this
 
                let new_connector_branch = Branch::new_initial_branch(new_connector_state);
 
@@ -985,9 +1013,17 @@ impl ConnectorPDL {
 
            },
 
            RunResult::NewChannel => {
 
                // Need to prepare a new channel
 
                todo!("adding channels to some global context");
 
                let (getter, putter) = sched_ctx.runtime.create_channel(conn_ctx.id);
 
                debug_assert_eq!(getter.kind, PortKind::Getter);
 
                branch.prepared_channel = Some((
 
                    Value::Input(PortId::new(putter.self_id.index)),
 
                    Value::Output(PortId::new(getter.self_id.index))
 
                ));
 

	
 
                return ConnectorScheduling::Later;
 
                results.new_ports.push(putter);
 
                results.new_ports.push(getter);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
 
        }
 
@@ -1106,6 +1142,10 @@ impl ConnectorPDL {
 
    /// Releasing ownership of ports during a sync-session. Will provide an
 
    /// error if the port was already used during a sync block.
 
    fn release_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
 
        if port_ids.is_empty() {
 
            return Ok(())
 
        }
 

	
 
        todo!("unfinished: add port properties during final solution-commit msgs");
 
        debug_assert!(branch.index.is_valid()); // branch in sync mode
 

	
 
@@ -1157,6 +1197,10 @@ impl ConnectorPDL {
 

	
 
    /// Acquiring ports during a sync-session.
 
    fn acquire_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
 
        if port_ids.is_empty() {
 
            return Ok(())
 
        }
 

	
 
        todo!("unfinished: add port properties during final solution-commit msgs");
 
        debug_assert!(branch.index.is_valid()); // branch in sync mode
 

	
src/runtime2/mod.rs
Show inline comments
 
@@ -22,9 +22,10 @@ use crate::ProtocolDescription;
 

	
 
use inbox::Message;
 
use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 
use scheduler::{Scheduler, ConnectorCtx, Router};
 
use scheduler::{Scheduler, ConnectorCtx, ControlMessageHandler};
 
use native::{Connector, ConnectorApplication, ApplicationInterface};
 
use crate::runtime2::port::Port;
 
use crate::runtime2::scheduler::SchedulerCtx;
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
@@ -84,10 +85,10 @@ impl Connector for ConnectorVariant {
 
        }
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
    fn run(&mut self, scheduler_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state),
 
            ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state),
 
            ConnectorVariant::UserDefined(c) => c.run(scheduler_ctx, conn_ctx, delta_state),
 
            ConnectorVariant::Native(c) => c.run(scheduler_ctx, conn_ctx, delta_state),
 
        }
 
    }
 
}
 
@@ -96,7 +97,9 @@ pub(crate) struct ScheduledConnector {
 
    pub connector: ConnectorVariant, // access by connector
 
    pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector
 
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
 
    pub router: Router,
 
    pub router: ControlMessageHandler,
 
    pub shutting_down: bool,
 
    pub pending_acks: u32,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
@@ -214,11 +217,11 @@ impl RuntimeInner {
 
        self.scheduler_notifier.notify_one();
 
    }
 

	
 
    // --- Creating ports
 
    // --- Creating/using ports
 

	
 
    /// Creates a new port pair. Note that these are stored globally like the
 
    /// connectors are. Ports stored by components belong to those components.
 
    pub(crate) fn create_channel(&self) -> (Port, Port) {
 
    pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> (Port, Port) {
 
        use port::{PortIdLocal, PortKind};
 

	
 
        let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst);
 
@@ -229,18 +232,34 @@ impl RuntimeInner {
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            peer_connector: self.connector_id,
 
            peer_connector: creating_connector,
 
        };
 
        let putter_port = Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            kind: PortKind::Putter,
 
            peer_connector: self.connector_id,
 
            peer_connector: creating_connector,
 
        };
 

	
 
        return (getter_port, putter_port);
 
    }
 

	
 
    /// Sends a message to a particular connector. If the connector happened to
 
    /// be sleeping then it will be scheduled for execution.
 
    pub(crate) fn send_message(&self, target_id: ConnectorId, message: Message) {
 
        let target = self.get_component_public(target_id);
 
        target.inbox.insert_message(message);
 

	
 
        let should_wake_up = target.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_wake_up {
 
            let key = unsafe{ ConnectorKey::from_id(target_id) };
 
            self.push_work(key);
 
        }
 
    }
 

	
 
    // --- Creating/retrieving/destroying components
 

	
 
    pub(crate) fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey {
 
@@ -259,7 +278,7 @@ impl RuntimeInner {
 
        // Create as not sleeping, as we'll schedule it immediately
 
        let key = {
 
            let mut lock = self.connectors.write().unwrap();
 
            lock.create(ConnectorVariant::UserDefined(connector), true)
 
            lock.create(ConnectorVariant::UserDefined(connector), false)
 
        };
 

	
 
        // Transfer the ports
 
@@ -283,11 +302,13 @@ impl RuntimeInner {
 
        return key;
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn get_component_private(&self, connector_key: &ConnectorKey) -> &'static mut ScheduledConnector {
 
        let lock = self.connectors.read().unwrap();
 
        return lock.get_private(connector_key);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn get_component_public(&self, connector_id: ConnectorId) -> &'static ConnectorPublic {
 
        let lock = self.connectors.read().unwrap();
 
        return lock.get_public(connector_id);
 
@@ -406,7 +427,9 @@ impl ConnectorStore {
 
            connector,
 
            context: ConnectorCtx::new(),
 
            public: ConnectorPublic::new(initially_sleeping),
 
            router: Router::new(),
 
            router: ControlMessageHandler::new(),
 
            shutting_down: false,
 
            pending_acks: 0,
 
        };
 

	
 
        let index;
src/runtime2/native.rs
Show inline comments
 
@@ -21,7 +21,7 @@ pub(crate) trait Connector {
 
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState);
 

	
 
    /// Should run the connector's behaviour up until the next blocking point.
 
    fn run(&mut self, sched_ctx: &SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
    fn run(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
}
 

	
 
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
 
@@ -66,7 +66,7 @@ impl Connector for ConnectorApplication {
 
        }
 
    }
 

	
 
    fn run(&mut self, _sched_ctx: &SchedulerCtx, _conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
    fn run(&mut self, _sched_ctx: SchedulerCtx, _conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop_front() {
 
            match job {
 
@@ -112,7 +112,7 @@ impl ApplicationInterface {
 

	
 
    /// Creates a new channel.
 
    pub fn create_channel(&mut self) -> Channel {
 
        let (getter_port, putter_port) = self.runtime.create_channel();
 
        let (getter_port, putter_port) = self.runtime.create_channel(self.connector_id);
 
        debug_assert_eq!(getter_port.kind, PortKind::Getter);
 
        let getter_id = getter_port.self_id;
 
        let putter_id = putter_port.self_id;
src/runtime2/port.rs
Show inline comments
 
@@ -21,7 +21,7 @@ impl PortIdLocal {
 
    }
 
}
 

	
 
#[derive(Eq, PartialEq)]
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
src/runtime2/scheduler.rs
Show inline comments
 
@@ -53,6 +53,8 @@ impl ConnectorCtx {
 
    }
 
}
 

	
 
// Because it contains pointers we're going to do a copy by value on this one
 
#[derive(Clone, Copy)]
 
pub(crate) struct SchedulerCtx<'a> {
 
    pub(crate) runtime: &'a RuntimeInner
 
}
 
@@ -62,10 +64,6 @@ pub(crate) struct Scheduler {
 
    scheduler_id: u32,
 
}
 

	
 
// Thinking aloud: actual ports should be accessible by connector, but managed
 
// by the scheduler (to handle rerouting messages). We could just give a read-
 
// only context, instead of an extra call on the "Connector" trait.
 

	
 
impl Scheduler {
 
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
 
        return Self{ runtime, scheduler_id };
 
@@ -144,9 +142,12 @@ impl Scheduler {
 
                }
 

	
 
                // Actually run the connector
 
                println!("DEBUG [{}]: Running {} ...", scheduler_id, connector_key.index);
 
                let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 
                let new_schedule = scheduled.connector.run(
 
                    &self.runtime.protocol_description, &scheduled.context, &mut delta_state
 
                    scheduler_ctx, &scheduled.context, &mut delta_state
 
                );
 
                println!("DEBUG [{}]: ... Finished running {}", scheduler_id, connector_key.index);
 

	
 
                // Handle all of the output from the current run: messages to
 
                // send and connectors to instantiate.
 
@@ -185,9 +186,19 @@ impl Scheduler {
 
                    }
 
                },
 
                ConnectorScheduling::Exit => {
 
                    // TODO: Better way of doing this, when exiting then
 
                    //  connected components must know their channels are invalid
 
                    self.runtime.destroy_component(connector_key);
 
                    // Prepare for exit. Set the shutdown flag and broadcast
 
                    // messages to notify peers of closing channels
 
                    scheduled.shutting_down = true;
 
                    for port in &scheduled.context.ports {
 
                        self.runtime.send_message(port.peer_connector, Message{
 
                            sending_connector: connector_key.downcast(),
 
                            receiving_port: port.peer_id,
 
                            contents: MessageContents::Control(ControlMessage{
 
                                id: 0,
 
                                content: ControlMessageVariant::Ack
 
                            })
 
                        })
 
                    }
 
                }
 
            }
 
        }
 
@@ -221,7 +232,7 @@ impl Scheduler {
 
                                receiving_port: PortIdLocal::new_invalid(),
 
                                contents: MessageContents::ConfirmCommit(contents.clone()),
 
                            };
 
                            self.send_message_and_wake_up_if_sleeping(*to_visit, message);
 
                            self.runtime.send_message(*to_visit, message);
 
                        }
 
                        (ConnectorId::new_invalid(), PortIdLocal::new_invalid())
 
                    },
 
@@ -239,7 +250,7 @@ impl Scheduler {
 
                        receiving_port: peer_port,
 
                        contents: message,
 
                    };
 
                    self.send_message_and_wake_up_if_sleeping(peer_connector, message);
 
                    self.runtime.send_message(peer_connector, message);
 
                }
 
            }
 
        }
 
@@ -264,12 +275,13 @@ impl Scheduler {
 
                // let the other end of the channel know that the port has
 
                // changed location.
 
                for port in &new_connector.context.ports {
 
                    cur_connector.pending_acks += 1;
 
                    let reroute_message = cur_connector.router.prepare_reroute(
 
                        port.self_id, port.peer_id, cur_connector.context.id,
 
                        port.peer_connector, new_connector.context.id
 
                    );
 

	
 
                    self.send_message_and_wake_up_if_sleeping(port.peer_connector, reroute_message);
 
                    self.runtime.send_message(port.peer_connector, reroute_message);
 
                }
 

	
 
                // Schedule new connector to run
 
@@ -281,39 +293,40 @@ impl Scheduler {
 
        debug_assert!(delta_state.new_ports.is_empty());
 
        debug_assert!(delta_state.new_connectors.is_empty());
 
    }
 
}
 

	
 
    fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) {
 
        let connector = self.runtime.get_component_public(connector_id);
 
// -----------------------------------------------------------------------------
 
// Control messages
 
// -----------------------------------------------------------------------------
 

	
 
        connector.inbox.insert_message(message);
 
        let should_wake_up = connector.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 
struct ControlEntry {
 
    id: u32,
 
    variant: ControlVariant,
 
}
 

	
 
        if should_wake_up {
 
            let key = unsafe { ConnectorKey::from_id(connector_id) };
 
            self.runtime.push_work(key);
 
        }
 
    }
 
enum ControlVariant {
 
    ChangedPort(ControlChangedPort),
 
    ClosedChannel(ControlClosedChannel),
 
}
 

	
 
/// Represents a rerouting entry due to a moved port
 
// TODO: Optimize
 
struct ReroutedTraffic {
 
    id: u32,                        // ID of control message
 
    target_port: PortIdLocal,       // targeted port
 
struct ControlChangedPort {
 
    target_port: PortIdLocal,       // if send to this port, then reroute
 
    source_connector: ConnectorId,  // connector we expect messages from
 
    target_connector: ConnectorId,  // connector they should be rerouted to
 
    target_connector: ConnectorId,  // connector we need to reroute to
 
}
 

	
 
struct ControlClosedChannel {
 

	
 
}
 

	
 
pub(crate) struct Router {
 
pub(crate) struct ControlMessageHandler {
 
    id_counter: u32,
 
    active: Vec<ReroutedTraffic>,
 
    active: Vec<ControlEntry>,
 
}
 

	
 
impl Router {
 
impl ControlMessageHandler {
 
    pub fn new() -> Self {
 
        Router{
 
        ControlMessageHandler {
 
            id_counter: 0,
 
            active: Vec::new(),
 
        }
 
@@ -341,7 +354,7 @@ impl Router {
 

	
 
        return Message{
 
            sending_connector: self_connector_id,
 
            receiving_port: PortIdLocal::new_invalid(),
 
            receiving_port: peer_port_id,
 
            contents: MessageContents::Control(ControlMessage{
 
                id,
 
                content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id),
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -14,7 +14,7 @@ fn runtime_for(num_threads: u32, pdl: &str) -> Runtime {
 

	
 
#[test]
 
fn test_put_and_get() {
 
    let rt = runtime_for(4, "
 
    let rt = runtime_for(1, "
 
primitive putter(out<bool> sender, u32 loops) {
 
    u32 index = 0;
 
    while (index < loops) {
0 comments (0 inline, 0 general)