Changeset - 26d47db4f922
[Not reviewed]
0 4 0
mh - 4 years ago 2021-10-19 12:29:52
contact@maxhenger.nl
WIP on second rewrite of port management
4 files changed with 15 insertions and 11 deletions:
0 comments (0 inline, 0 general)
src/runtime2/global_store.rs
Show inline comments
 
@@ -117,92 +117,93 @@ impl ConnectorStore {
 
    /// Retrieves the shared members of the connector.
 
    pub(crate) fn get_shared(&self, connector_id: ConnectorId) -> &'static ConnectorPublic {
 
        let lock = self.inner.read().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get(connector_id.0 as usize);
 
            debug_assert!(!connector.is_null());
 
            return &*connector.public;
 
        }
 
    }
 

	
 
    /// Retrieves a particular connector. Only the thread that pulled the
 
    /// associated key out of the execution queue should (be able to) call this.
 
    pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
 
        let lock = self.inner.read().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get_mut(key.index as usize);
 
            debug_assert!(!connector.is_null());
 
            return *connector as &mut _;
 
        }
 
    }
 

	
 
    /// Create a new connector, returning the key that can be used to retrieve
 
    /// and/or queue it.
 
    pub(crate) fn create(&self, created_by: &mut ScheduledConnector, connector: ConnectorVariant) -> ConnectorKey {
 
    /// and/or queue it. The caller must make sure that the constructed
 
    /// connector's code is initialized with the same ports as the ports in the
 
    /// `initial_ports` array.
 
    pub(crate) fn create(&self, created_by: &mut ScheduledConnector, connector: ConnectorVariant, initial_ports: Vec<Port>) -> ConnectorKey {
 
        // Creation of the connector in the global store, requires a lock
 
        {
 
            let lock = self.inner.write().unwrap();
 
            let connector = ScheduledConnector {
 
                connector,
 
                context: ConnectorCtx::new(self.port_counter.clone()),
 
                public: ConnectorPublic::new(),
 
                router: Router::new(),
 
            };
 

	
 
            let index;
 
            if lock.free.is_empty() {
 
                let connector = Box::into_raw(Box::new(connector));
 

	
 
                unsafe {
 
                    // Cheating a bit here. Anyway, move to heap, store in list
 
                    index = lock.connectors.len();
 
                    lock.connectors.push(connector);
 
                }
 
            } else {
 
                index = lock.free.pop().unwrap();
 

	
 
                unsafe {
 
                    let target = lock.connectors.get_mut(index);
 
                    debug_assert!(!target.is_null());
 
                    ptr::write(*target, connector);
 
                }
 
            }
 
        }
 

	
 
        // Setting of new connector's ID
 
        let key = ConnectorKey{ index: index as u32 };
 
        let new_connector = self.get_mut(&key);
 
        new_connector.context.id = key.downcast();
 

	
 
        // Transferring ownership of ports (and crashing if there is a
 
        // programmer's mistake in port management)
 
        match &new_connector.connector {
 
            ConnectorVariant::UserDefined(connector) => {
 
                for port_id in &connector.ports.owned_ports {
 
                    let mut port = created_by.context.remove_port(*port_id);
 
                    port.owning_connector = new_connector.context.id;
 
                    new_connector.context.add_port(port);
 
                }
 
            },
 
            ConnectorVariant::Native(_) => {}, // no initial ports (yet!)
 
        }
 

	
 
        return key;
 
    }
 

	
 
    pub(crate) fn destroy(&self, key: ConnectorKey) {
 
        let lock = self.inner.write().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get_mut(key.index as usize);
 
            ptr::drop_in_place(*connector);
 
            // Note: but not deallocating!
 
        }
 

	
 
        lock.free.push(key.index as usize);
 
    }
 
}
 

	
 
impl Drop for ConnectorStore {
 
    fn drop(&mut self) {
src/runtime2/native.rs
Show inline comments
 
@@ -59,76 +59,82 @@ impl Connector for ConnectorApplication {
 
        todo!("handling messages in ConnectorApplication (API for runtime)")
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop() {
 
            match job {
 
                ApplicationJob::NewConnector(connector) => {
 
                    delta_state.new_connectors.push(connector);
 
                }
 
            }
 
        }
 

	
 
        return ConnectorScheduling::NotNow;
 
    }
 
}
 

	
 
/// The interface to a `ApplicationConnector`. This allows setting up the
 
/// interactions the `ApplicationConnector` performs within a synchronous round.
 
pub struct ApplicationInterface {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
    runtime: Arc<RuntimeInner>,
 
    connector_id: ConnectorId,
 
    owned_ports: Vec<PortIdLocal>,
 
    owned_ports: Vec<Port>,
 
}
 

	
 
impl ApplicationInterface {
 
    pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner1>) -> Self {
 
        return Self{
 
            sync_done, job_queue, runtime,
 
            connector_id: ConnectorId::new_invalid(),
 
            owned_ports: Vec::new(),
 
        }
 
    }
 

	
 
    /// Creates a new channel.
 
    pub fn create_channel(&mut self) -> Channel {
 
        // TODO: Duplicated logic in scheduler
 
        let getter_id = self.runtime.global_store.connectors.port_counter.fetch_add(2, Ordering::SeqCst);
 
        let putter_id = PortIdLocal::new(getter_id + 1);
 
        let getter_id = PortIdLocal::new(getter_id);
 

	
 
        self.ports.push(Port{
 
        self.owned_ports.push(Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            owning_connector: self.connector_id,
 
            peer_connector: self.connector_id,
 
        });
 

	
 
        return channel;
 
        self.owned_ports.push(Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            kind: PortKind::Putter,
 
            peer_connector: self.connector_id,
 
        });
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
    /// Creates a new connector. Note that it is not scheduled immediately, but
 
    /// depends on the `ApplicationConnector` to run, followed by the created
 
    /// connector being scheduled.
 
    // TODO: Optimize by yanking out scheduler logic for common use.
 
    pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> {
 
        // Retrieve ports and make sure that we own the ones that are currently
 
        // specified. This is also checked by the scheduler, but that is done
 
        // asynchronously.
 
        let mut initial_ports = Vec::new();
 
        find_ports_in_value_group(&arguments, &mut initial_ports);
 
        for port_to_remove in &initial_ports {
 
            match self.owned_ports.iter().position(|v| v == port_to_remove) {
 
                Some(index_to_remove) => {
 
                    // We own the port, so continue
 
                    self.owned_ports.remove(index_to_remove)
 
                },
 
                None => {
 
                    // We don't own the port
 
                    return Err(ComponentCreationError::UnownedPort);
 
                }
 
            }
 
        }
src/runtime2/port.rs
Show inline comments
 
@@ -12,35 +12,34 @@ impl PortIdLocal {
 

	
 
    // TODO: Unsure about this, maybe remove, then also remove all struct
 
    //  instances where I call this
 
    pub fn new_invalid() -> Self {
 
        Self{ index: u32::MAX }
 
    }
 

	
 
    pub fn is_valid(&self) -> bool {
 
        return self.index != u32::MAX;
 
    }
 
}
 

	
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
/// Represents a port inside of the runtime. May be without owner if it is
 
/// created by the application interfacing with the runtime, instead of being
 
/// created by a connector.
 
pub struct Port {
 
    pub self_id: PortIdLocal,
 
    pub peer_id: PortIdLocal,
 
    pub kind: PortKind,
 
    pub owning_connector: ConnectorId,
 
    pub peer_connector: ConnectorId, // might be temporarily inconsistent while peer port is sent around in non-sync phase.
 
}
 

	
 

	
 

	
 
// TODO: Turn port ID into its own type
 
pub struct Channel {
 
    pub putter_id: PortIdLocal, // can put on it, so from the connector's point of view, this is an output
 
    pub getter_id: PortIdLocal, // vice versa: can get on it, so an input for the connector
 
}
 
\ No newline at end of file
src/runtime2/scheduler.rs
Show inline comments
 
@@ -8,72 +8,70 @@ use crate::ProtocolDescription;
 
use crate::runtime2::global_store::ConnectorVariant;
 
use crate::runtime2::inbox::MessageContents;
 
use crate::runtime2::native::Connector;
 
use crate::runtime2::port::{Channel, PortKind, PortOwnership};
 

	
 
use super::RuntimeInner;
 
use super::port::{Port, PortIdLocal};
 
use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant};
 
use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 
use super::global_store::{ConnectorKey, ConnectorId, GlobalStore};
 

	
 
/// Contains fields that are mostly managed by the scheduler, but may be
 
/// accessed by the connector
 
pub(crate) struct ConnectorCtx {
 
    pub(crate) id: ConnectorId,
 
    port_counter: Arc<AtomicU32>,
 
    pub(crate) ports: Vec<Port>,
 
}
 

	
 
impl ConnectorCtx {
 
    pub(crate) fn new(port_counter: Arc<AtomicU32>) -> ConnectorCtx {
 
        Self{
 
            id: ConnectorId::new_invalid(),
 
            port_counter,
 
            ports: Vec::new(),
 
            ports: initial_ports,
 
        }
 
    }
 

	
 
    /// Creates a (putter, getter) port pair belonging to the same channel. The
 
    /// port will be implicitly owned by the connector.
 
    pub(crate) fn create_channel(&mut self) -> Channel {
 
        let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst);
 
        let putter_id = PortIdLocal::new(getter_id + 1);
 
        let getter_id = PortIdLocal::new(getter_id);
 

	
 
        self.ports.push(Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            owning_connector: self.id,
 
            peer_connector: self.id,
 
        });
 

	
 
        self.ports.push(Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            kind: PortKind::Putter,
 
            owning_connector: self.id,
 
            peer_connector: self.id,
 
        });
 

	
 
        return Channel{ getter_id, putter_id };
 
    }
 

	
 
    pub(crate) fn add_port(&mut self, port: Port) {
 
        debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id));
 
        self.ports.push(port);
 
    }
 

	
 
    pub(crate) fn remove_port(&mut self, id: PortIdLocal) -> Port {
 
        let index = self.port_id_to_index(id);
 
        return self.ports.remove(index);
 
    }
 

	
 
    pub(crate) fn get_port(&self, id: PortIdLocal) -> &Port {
 
        let index = self.port_id_to_index(id);
 
        return &self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_mut(&mut self, id: PortIdLocal) -> &mut Port {
 
        let index = self.port_id_to_index(id);
 
        return &mut self.ports[index];
0 comments (0 inline, 0 general)