Changeset - ce39b1540ff5
[Not reviewed]
0 5 0
mh - 4 years ago 2021-10-22 13:49:49
contact@maxhenger.nl
WIP on port fixing
5 files changed with 96 insertions and 39 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -4,10 +4,11 @@ use std::sync::atomic::AtomicBool;
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::scheduler::Scheduler;
 

	
 
use super::ConnectorId;
 
use super::native::Connector;
 
use super::scheduler::ConnectorCtx;
 
use super::scheduler::{SchedulerCtx, ConnectorCtx};
 
use super::inbox::{
 
    PrivateInbox, PublicInbox,
 
    DataMessage, SyncMessage, SolutionMessage, Message, MessageContents,
 
@@ -332,22 +333,53 @@ pub(crate) struct ConnectorPDL {
 
    pub ports: ConnectorPorts,
 
}
 

	
 
struct TempCtx {}
 
impl RunContext for TempCtx {
 
struct ConnectorRunContext<'a> {
 
    inbox: &'a PrivateInbox,
 
    ports: &'a ConnectorPorts,
 
    branch: &'a Branch,
 
    scheduler: SchedulerCtx<'a>,
 
}
 

	
 
impl<'a> RunContext for ConnectorRunContext<'a> {
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        todo!()
 
        if self.branch.ports_delta.iter().any(|v| v.port_id.index == port.0.u32_suffix) {
 
            // Either acquired or released, must be silent
 
            return false;
 
        }
 

	
 
        let port_index = self.ports.get_port_index(PortIdLocal::new(port.0.u32_suffix)).unwrap();
 
        let mapping = self.ports.get_port(self.branch.index.index, port_index);
 
        return mapping.is_assigned;
 
    }
 

	
 
    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
 
        todo!()
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        match self.branch.received.get(&port_id) {
 
            Some(message) => Some(message.message.clone()),
 
            None => None,
 
        }
 
    }
 

	
 
    fn fires(&mut self, port: PortId) -> Option<Value> {
 
        todo!()
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        if self.branch.ports_delta.iter().any(|v| v.port_id == port_id) {
 
            return None
 
        }
 

	
 
        let port_index = self.ports.get_port_index(port_id).unwrap();
 
        let mapping = self.ports.get_port(self.branch.index.index, port_index);
 

	
 
        if mapping.is_assigned {
 
            return Some(Value::Bool(mapping.num_times_fired != 0));
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
    fn get_channel(&mut self) -> Option<(Value, Value)> {
 
        todo!()
 
        let (getter, putter) = self.scheduler.runtime.create_channel();
 
        debug_assert_eq!(getter.kind, PortKind::Getter);
 

	
 
    }
 
}
 

	
 
@@ -364,9 +396,9 @@ impl Connector for ConnectorPDL {
 
        }
 
    }
 

	
 
    fn run(&mut self, pd: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
    fn run(&mut self, sched_ctx: &SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        if self.in_sync {
 
            let scheduling = self.run_in_speculative_mode(pd, ctx, delta_state);
 
            let scheduling = self.run_in_speculative_mode(pd, conn_ctx, delta_state);
 

	
 
            // When in speculative mode we might have generated new sync
 
            // solutions, we need to turn them into proposed solutions here.
 
@@ -388,9 +420,9 @@ impl Connector for ConnectorPDL {
 

	
 
                    // Turn local solution into a message and send it along
 
                    // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed
 
                    let solution_message = self.generate_initial_solution_for_branch(branch_id, ctx);
 
                    let solution_message = self.generate_initial_solution_for_branch(branch_id, conn_ctx);
 
                    if let Some(valid_solution) = solution_message {
 
                        self.submit_sync_solution(valid_solution, ctx, delta_state);
 
                        self.submit_sync_solution(valid_solution, conn_ctx, delta_state);
 
                    } else {
 
                        // Branch is actually invalid, but we only just figured
 
                        // it out. We need to mark it as invalid to prevent
 
@@ -415,7 +447,7 @@ impl Connector for ConnectorPDL {
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(pd, ctx, delta_state);
 
            let scheduling = self.run_in_deterministic_mode(pd, conn_ctx, delta_state);
 
            return scheduling;
 
        }
 
    }
 
@@ -694,7 +726,7 @@ impl ConnectorPDL {
 
        let branch = Self::pop_branch_from_queue(&mut self.branches, &mut self.sync_active);
 

	
 
        // Run the branch to the next blocking point
 
        let mut run_context = TempCtx{};
 
        let mut run_context = ConnectorRunContext {};
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 

	
 
        // Match statement contains `return` statements only if the particular
 
@@ -894,7 +926,7 @@ impl ConnectorPDL {
 
    }
 

	
 
    /// Runs the connector in non-synchronous mode.
 
    pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: &SchedulerCtx, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        debug_assert!(!self.in_sync);
 
        debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
 
        debug_assert!(self.branches.len() == 1);
 
@@ -902,7 +934,11 @@ impl ConnectorPDL {
 
        let branch = &mut self.branches[0];
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = TempCtx{};
 
        let mut run_context = ConnectorRunContext{
 
            inbox: &self.inbox,
 
            ports: &self.ports,
 
            branch: &Branch {}
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, pd);
 

	
 
        match run_result {
src/runtime2/inbox.rs
Show inline comments
 
@@ -281,7 +281,7 @@ impl PrivateInbox {
 
    /// could be received by a newly encountered `get` call in a connector's
 
    /// PDL code.
 
    pub(crate) fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
 
        return InboxMessageIter{
 
        return InboxMessageIter {
 
            messages: &self.messages,
 
            next_index: 0,
 
            max_index: self.len_read,
src/runtime2/mod.rs
Show inline comments
 
@@ -24,6 +24,7 @@ use inbox::Message;
 
use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 
use scheduler::{Scheduler, ConnectorCtx, Router};
 
use native::{Connector, ConnectorApplication, ApplicationInterface};
 
use crate::runtime2::port::Port;
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
@@ -213,6 +214,33 @@ impl RuntimeInner {
 
        self.scheduler_notifier.notify_one();
 
    }
 

	
 
    // --- Creating ports
 

	
 
    /// Creates a new port pair. Note that these are stored globally like the
 
    /// connectors are. Ports stored by components belong to those components.
 
    pub(crate) fn create_channel(&self) -> (Port, Port) {
 
        use port::{PortIdLocal, PortKind};
 

	
 
        let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst);
 
        let putter_id = PortIdLocal::new(getter_id + 1);
 
        let getter_id = PortIdLocal::new(getter_id);
 

	
 
        let getter_port = Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            peer_connector: self.connector_id,
 
        };
 
        let putter_port = Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            kind: PortKind::Putter,
 
            peer_connector: self.connector_id,
 
        };
 

	
 
        return (getter_port, putter_port);
 
    }
 

	
 
    // --- Creating/retrieving/destroying components
 

	
 
    pub(crate) fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey {
 
@@ -276,11 +304,13 @@ impl RuntimeInner {
 
    #[inline]
 
    pub(crate) fn increment_active_interfaces(&self) {
 
        let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst);
 
        println!("DEBUG: Incremented active interfaces to {}", _old_num + 1);
 
        debug_assert_ne!(_old_num, 0); // once it hits 0, it stays zero
 
    }
 

	
 
    pub(crate) fn decrement_active_interfaces(&self) {
 
        let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst);
 
        println!("DEBUG: Decremented active interfaces to {}", old_num - 1);
 
        debug_assert!(old_num > 0);
 
        if old_num == 1 { // such that active interfaces is now 0
 
            let num_connectors = self.active_connectors.load(Ordering::Acquire);
 
@@ -292,11 +322,13 @@ impl RuntimeInner {
 

	
 
    #[inline]
 
    fn increment_active_components(&self) {
 
        self.active_connectors.fetch_add(1, Ordering::SeqCst);
 
        let _old_num = self.active_connectors.fetch_add(1, Ordering::SeqCst);
 
        println!("DEBUG: Incremented components to {}", _old_num + 1);
 
    }
 

	
 
    fn decrement_active_components(&self) {
 
        let old_num = self.active_connectors.fetch_sub(1, Ordering::SeqCst);
 
        println!("DEBUG: Decremented components to {}", old_num - 1);
 
        debug_assert!(old_num > 0);
 
        if old_num == 0 { // such that we have no more active connectors (for now!)
 
            let num_interfaces = self.active_interfaces.load(Ordering::Acquire);
src/runtime2/native.rs
Show inline comments
 
@@ -7,6 +7,7 @@ use crate::protocol::eval::ValueGroup;
 
use crate::ProtocolDescription;
 

	
 
use super::{ConnectorKey, ConnectorId, RuntimeInner, ConnectorCtx};
 
use super::scheduler::SchedulerCtx;
 
use super::port::{Port, PortIdLocal, Channel, PortKind};
 
use super::connector::{Branch, ConnectorScheduling, RunDeltaState, ConnectorPDL};
 
use super::connector::find_ports_in_value_group;
 
@@ -20,7 +21,7 @@ pub(crate) trait Connector {
 
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState);
 

	
 
    /// Should run the connector's behaviour up until the next blocking point.
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
    fn run(&mut self, sched_ctx: &SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
}
 

	
 
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
 
@@ -65,7 +66,7 @@ impl Connector for ConnectorApplication {
 
        }
 
    }
 

	
 
    fn run(&mut self, _protocol_description: &ProtocolDescription, _ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
    fn run(&mut self, _sched_ctx: &SchedulerCtx, _conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop_front() {
 
            match job {
 
@@ -111,26 +112,10 @@ impl ApplicationInterface {
 

	
 
    /// Creates a new channel.
 
    pub fn create_channel(&mut self) -> Channel {
 
        // TODO: Duplicated logic in scheduler
 
        let getter_id = self.runtime.port_counter.fetch_add(2, Ordering::SeqCst);
 
        let putter_id = PortIdLocal::new(getter_id + 1);
 
        let getter_id = PortIdLocal::new(getter_id);
 

	
 
        // Create ports and add a job such that they are transferred to the
 
        // API component. (note that we do not send a ping, this is only
 
        // necessary once we create a connector)
 
        let getter_port = Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            peer_connector: self.connector_id,
 
        };
 
        let putter_port = Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            kind: PortKind::Putter,
 
            peer_connector: self.connector_id,
 
        };
 
        let (getter_port, putter_port) = self.runtime.create_channel();
 
        debug_assert_eq!(getter_port.kind, PortKind::Getter);
 
        let getter_id = getter_port.self_id;
 
        let putter_id = putter_port.self_id;
 

	
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
src/runtime2/scheduler.rs
Show inline comments
 
@@ -53,6 +53,10 @@ impl ConnectorCtx {
 
    }
 
}
 

	
 
pub(crate) struct SchedulerCtx<'a> {
 
    pub(crate) runtime: &'a RuntimeInner
 
}
 

	
 
pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
    scheduler_id: u32,
0 comments (0 inline, 0 general)