diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 518df447c90e662382a0aaa57b9afc856d5eada9..3f75f4875bc9236146b9d7e95b5b23d8769bf670 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -3,24 +3,8 @@ use std::collections::HashMap; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; -use crate::runtime2::inbox::{Inbox, Message}; - -#[derive(Clone, Copy, PartialEq, Eq)] -pub(crate) struct PortIdLocal { - pub id: u32, -} - -impl PortIdLocal { - pub fn new(id: u32) -> Self { - Self{ id } - } - - // TODO: Unsure about this, maybe remove, then also remove all struct - // instances where I call this - pub fn new_invalid() -> Self { - Self{ id: u32::MAX } - } -} +use crate::runtime2::inbox::{Inbox, OutboxMessage}; +use crate::runtime2::port::PortIdLocal; /// Represents the identifier of a branch (the index within its container). An /// ID of `0` generally means "no branch" (e.g. no parent, or a port did not @@ -67,7 +51,7 @@ pub(crate) struct Branch { sync_state: SpeculativeState, next_branch_in_queue: Option, // Message/port state - inbox: HashMap, // TODO: @temporary, remove together with fires() + inbox: HashMap, // TODO: @temporary, remove together with fires() ports_delta: Vec, } @@ -261,6 +245,23 @@ impl BranchQueue { } } +/// Public fields of the connector that can be freely shared between multiple +/// threads. Note that this is not enforced by the compiler. The global store +/// allows retrieving the entire `Connector` as a mutable reference by one +/// thread, and this `ConnectorPublic` by any number of threads. +pub(crate) struct ConnectorPublic { + pub inbox: Inbox, +} + +impl ConnectorPublic { + pub fn new() -> Self { + ConnectorPublic{ + inbox: Inbox::new(), + } + } +} + +// TODO: Maybe prevent false sharing by aligning `public` to next cache line. pub(crate) struct Connector { // State and properties of connector itself id: u32, @@ -271,8 +272,8 @@ pub(crate) struct Connector { sync_pending_get: BranchQueue, sync_finished: BranchQueue, // Port/message management - ports: ConnectorPorts, - inbox: Inbox, + pub ports: ConnectorPorts, + pub public: ConnectorPublic, } struct TempCtx {} @@ -307,7 +308,7 @@ impl Connector { sync_pending_get: BranchQueue::new(), sync_finished: BranchQueue::new(), ports: ConnectorPorts::new(owned_ports), - inbox: Inbox::new(), + public: ConnectorPublic::new(), } } @@ -441,7 +442,7 @@ impl Connector { }, RunResult::BranchPut(port_id, value_group) => { // Branch performed a `put` on a particualar port. - let local_port_id = PortIdLocal{ id: port_id.0.u32_suffix }; + let local_port_id = PortIdLocal{ index: port_id.0.u32_suffix }; let local_port_index = self.ports.get_port_index(local_port_id); if local_port_index.is_none() { todo!("handle case where port was received before (i.e. in ports_delta)") @@ -470,9 +471,8 @@ impl Connector { // Put in run results for thread to pick up and transfer to // the correct connector inbox. port_mapping.mark_definitive(branch.index, 1); - let message = Message{ + let message = OutboxMessage { sending_port: local_port_id, - receiving_port: PortIdLocal::new_invalid(), sender_prev_branch_id: BranchId::new_invalid(), sender_cur_branch_id: branch.index, message: value_group, @@ -709,7 +709,7 @@ impl Connector { pub(crate) struct RunDeltaState { // Variables that allow the thread running the connector to pick up global // state changes and try to apply them. - pub outbox: Vec, + pub outbox: Vec, pub new_connectors: Vec, // Workspaces pub ports: Vec,