diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 3f75f4875bc9236146b9d7e95b5b23d8769bf670..b6c1c4b3d0221c6223fe21e9be93bfde25cd7a9e 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -1,9 +1,10 @@ use std::collections::HashMap; +use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; -use crate::runtime2::inbox::{Inbox, OutboxMessage}; +use crate::runtime2::inbox::{PrivateInbox, PublicInbox, OutgoingMessage}; use crate::runtime2::port::PortIdLocal; /// Represents the identifier of a branch (the index within its container). An @@ -51,14 +52,14 @@ pub(crate) struct Branch { sync_state: SpeculativeState, next_branch_in_queue: Option, // Message/port state - inbox: HashMap, // TODO: @temporary, remove together with fires() + inbox: HashMap, // TODO: @temporary, remove together with fires() ports_delta: Vec, } impl Branch { /// Constructs a non-sync branch. It is assumed that the code is at the /// first instruction - fn new_initial_branch(component_state: ComponentState) -> Self { + pub(crate) fn new_initial_branch(component_state: ComponentState) -> Self { Branch{ index: BranchId::new_invalid(), parent_index: BranchId::new_invalid(), @@ -135,13 +136,13 @@ enum PortOwnershipError { /// As the name implies, this contains a description of the ports associated /// with a connector. /// TODO: Extend documentation -struct ConnectorPorts { +pub(crate) struct ConnectorPorts { // Essentially a mapping from `port_index` to `port_id`. - owned_ports: Vec, + pub owned_ports: Vec, // Contains P*B entries, where P is the number of ports and B is the number // of branches. One can find the appropriate mapping of port p at branch b // at linear index `b*P+p`. - port_mapping: Vec + pub port_mapping: Vec } impl ConnectorPorts { @@ -246,22 +247,23 @@ impl BranchQueue { } /// Public fields of the connector that can be freely shared between multiple -/// threads. Note that this is not enforced by the compiler. The global store -/// allows retrieving the entire `Connector` as a mutable reference by one -/// thread, and this `ConnectorPublic` by any number of threads. +/// threads. pub(crate) struct ConnectorPublic { - pub inbox: Inbox, + pub inbox: PublicInbox, + pub sleeping: AtomicBool, } impl ConnectorPublic { pub fn new() -> Self { ConnectorPublic{ - inbox: Inbox::new(), + inbox: PublicInbox::new(), + sleeping: AtomicBool::new(false), } } } // TODO: Maybe prevent false sharing by aligning `public` to next cache line. +// TODO: Do this outside of the connector, create a wrapping struct pub(crate) struct Connector { // State and properties of connector itself id: u32, @@ -272,8 +274,8 @@ pub(crate) struct Connector { sync_pending_get: BranchQueue, sync_finished: BranchQueue, // Port/message management + pub inbox: PrivateInbox, pub ports: ConnectorPorts, - pub public: ConnectorPublic, } struct TempCtx {} @@ -307,8 +309,8 @@ impl Connector { sync_active: BranchQueue::new(), sync_pending_get: BranchQueue::new(), sync_finished: BranchQueue::new(), + inbox: PrivateInbox::new(), ports: ConnectorPorts::new(owned_ports), - public: ConnectorPublic::new(), } } @@ -471,7 +473,7 @@ impl Connector { // Put in run results for thread to pick up and transfer to // the correct connector inbox. port_mapping.mark_definitive(branch.index, 1); - let message = OutboxMessage { + let message = OutgoingMessage { sending_port: local_port_id, sender_prev_branch_id: BranchId::new_invalid(), sender_cur_branch_id: branch.index, @@ -709,7 +711,7 @@ impl Connector { pub(crate) struct RunDeltaState { // Variables that allow the thread running the connector to pick up global // state changes and try to apply them. - pub outbox: Vec, + pub outbox: Vec, pub new_connectors: Vec, // Workspaces pub ports: Vec, @@ -736,7 +738,7 @@ pub(crate) enum ConnectorScheduling { /// Recursively goes through the value group, attempting to find ports. /// Duplicates will only be added once. -fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { +pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { // Helper to check a value for a port and recurse if needed. fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { match value {