diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 5769144ca08a4f4f213f7b21801117d11f3cf8da..e951d0d9c29ff1c2f208c3dccd1df388d9d9b54a 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -4,10 +4,11 @@ use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; +use crate::runtime2::scheduler::Scheduler; use super::ConnectorId; use super::native::Connector; -use super::scheduler::ConnectorCtx; +use super::scheduler::{SchedulerCtx, ConnectorCtx}; use super::inbox::{ PrivateInbox, PublicInbox, DataMessage, SyncMessage, SolutionMessage, Message, MessageContents, @@ -332,22 +333,53 @@ pub(crate) struct ConnectorPDL { pub ports: ConnectorPorts, } -struct TempCtx {} -impl RunContext for TempCtx { +struct ConnectorRunContext<'a> { + inbox: &'a PrivateInbox, + ports: &'a ConnectorPorts, + branch: &'a Branch, + scheduler: SchedulerCtx<'a>, +} + +impl<'a> RunContext for ConnectorRunContext<'a> { fn did_put(&mut self, port: PortId) -> bool { - todo!() + if self.branch.ports_delta.iter().any(|v| v.port_id.index == port.0.u32_suffix) { + // Either acquired or released, must be silent + return false; + } + + let port_index = self.ports.get_port_index(PortIdLocal::new(port.0.u32_suffix)).unwrap(); + let mapping = self.ports.get_port(self.branch.index.index, port_index); + return mapping.is_assigned; } fn get(&mut self, port: PortId) -> Option { - todo!() + let port_id = PortIdLocal::new(port.0.u32_suffix); + match self.branch.received.get(&port_id) { + Some(message) => Some(message.message.clone()), + None => None, + } } fn fires(&mut self, port: PortId) -> Option { - todo!() + let port_id = PortIdLocal::new(port.0.u32_suffix); + if self.branch.ports_delta.iter().any(|v| v.port_id == port_id) { + return None + } + + let port_index = self.ports.get_port_index(port_id).unwrap(); + let mapping = self.ports.get_port(self.branch.index.index, port_index); + + if mapping.is_assigned { + return Some(Value::Bool(mapping.num_times_fired != 0)); + } else { + return None; + } } fn get_channel(&mut self) -> Option<(Value, Value)> { - todo!() + let (getter, putter) = self.scheduler.runtime.create_channel(); + debug_assert_eq!(getter.kind, PortKind::Getter); + } } @@ -364,9 +396,9 @@ impl Connector for ConnectorPDL { } } - fn run(&mut self, pd: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + fn run(&mut self, sched_ctx: &SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { if self.in_sync { - let scheduling = self.run_in_speculative_mode(pd, ctx, delta_state); + let scheduling = self.run_in_speculative_mode(pd, conn_ctx, delta_state); // When in speculative mode we might have generated new sync // solutions, we need to turn them into proposed solutions here. @@ -388,9 +420,9 @@ impl Connector for ConnectorPDL { // Turn local solution into a message and send it along // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed - let solution_message = self.generate_initial_solution_for_branch(branch_id, ctx); + let solution_message = self.generate_initial_solution_for_branch(branch_id, conn_ctx); if let Some(valid_solution) = solution_message { - self.submit_sync_solution(valid_solution, ctx, delta_state); + self.submit_sync_solution(valid_solution, conn_ctx, delta_state); } else { // Branch is actually invalid, but we only just figured // it out. We need to mark it as invalid to prevent @@ -415,7 +447,7 @@ impl Connector for ConnectorPDL { return scheduling; } else { - let scheduling = self.run_in_deterministic_mode(pd, ctx, delta_state); + let scheduling = self.run_in_deterministic_mode(pd, conn_ctx, delta_state); return scheduling; } } @@ -694,7 +726,7 @@ impl ConnectorPDL { let branch = Self::pop_branch_from_queue(&mut self.branches, &mut self.sync_active); // Run the branch to the next blocking point - let mut run_context = TempCtx{}; + let mut run_context = ConnectorRunContext {}; let run_result = branch.code_state.run(&mut run_context, pd); // Match statement contains `return` statements only if the particular @@ -894,7 +926,7 @@ impl ConnectorPDL { } /// Runs the connector in non-synchronous mode. - pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_deterministic_mode(&mut self, sched_ctx: &SchedulerCtx, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(!self.in_sync); debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty()); debug_assert!(self.branches.len() == 1); @@ -902,7 +934,11 @@ impl ConnectorPDL { let branch = &mut self.branches[0]; debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync); - let mut run_context = TempCtx{}; + let mut run_context = ConnectorRunContext{ + inbox: &self.inbox, + ports: &self.ports, + branch: &Branch {} + }; let run_result = branch.code_state.run(&mut run_context, pd); match run_result { diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index b78e79ec1fbdf67be2196a15debcdffcf34f6ce9..76970d1e2f9c3558f0e82ab1b8cb222cae795f4d 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -281,7 +281,7 @@ impl PrivateInbox { /// could be received by a newly encountered `get` call in a connector's /// PDL code. pub(crate) fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter { - return InboxMessageIter{ + return InboxMessageIter { messages: &self.messages, next_index: 0, max_index: self.len_read, diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index 5238d2ca54a24635e0ad21cfc077131084f531f1..fd5a8c1f4a1ae39e5134f103a1bcb423e059a189 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -24,6 +24,7 @@ use inbox::Message; use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState}; use scheduler::{Scheduler, ConnectorCtx, Router}; use native::{Connector, ConnectorApplication, ApplicationInterface}; +use crate::runtime2::port::Port; /// A kind of token that, once obtained, allows mutable access to a connector. /// We're trying to use move semantics as much as possible: the owner of this @@ -213,6 +214,33 @@ impl RuntimeInner { self.scheduler_notifier.notify_one(); } + // --- Creating ports + + /// Creates a new port pair. Note that these are stored globally like the + /// connectors are. Ports stored by components belong to those components. + pub(crate) fn create_channel(&self) -> (Port, Port) { + use port::{PortIdLocal, PortKind}; + + let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst); + let putter_id = PortIdLocal::new(getter_id + 1); + let getter_id = PortIdLocal::new(getter_id); + + let getter_port = Port{ + self_id: getter_id, + peer_id: putter_id, + kind: PortKind::Getter, + peer_connector: self.connector_id, + }; + let putter_port = Port{ + self_id: putter_id, + peer_id: getter_id, + kind: PortKind::Putter, + peer_connector: self.connector_id, + }; + + return (getter_port, putter_port); + } + // --- Creating/retrieving/destroying components pub(crate) fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey { @@ -276,11 +304,13 @@ impl RuntimeInner { #[inline] pub(crate) fn increment_active_interfaces(&self) { let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst); + println!("DEBUG: Incremented active interfaces to {}", _old_num + 1); debug_assert_ne!(_old_num, 0); // once it hits 0, it stays zero } pub(crate) fn decrement_active_interfaces(&self) { let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst); + println!("DEBUG: Decremented active interfaces to {}", old_num - 1); debug_assert!(old_num > 0); if old_num == 1 { // such that active interfaces is now 0 let num_connectors = self.active_connectors.load(Ordering::Acquire); @@ -292,11 +322,13 @@ impl RuntimeInner { #[inline] fn increment_active_components(&self) { - self.active_connectors.fetch_add(1, Ordering::SeqCst); + let _old_num = self.active_connectors.fetch_add(1, Ordering::SeqCst); + println!("DEBUG: Incremented components to {}", _old_num + 1); } fn decrement_active_components(&self) { let old_num = self.active_connectors.fetch_sub(1, Ordering::SeqCst); + println!("DEBUG: Decremented components to {}", old_num - 1); debug_assert!(old_num > 0); if old_num == 0 { // such that we have no more active connectors (for now!) let num_interfaces = self.active_interfaces.load(Ordering::Acquire); diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 0e877c36449aab3519e0a4aba53bf24894f62b0f..5eab60d0def9ecf32d6275a1a1c52568d19bf5cf 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -7,6 +7,7 @@ use crate::protocol::eval::ValueGroup; use crate::ProtocolDescription; use super::{ConnectorKey, ConnectorId, RuntimeInner, ConnectorCtx}; +use super::scheduler::SchedulerCtx; use super::port::{Port, PortIdLocal, Channel, PortKind}; use super::connector::{Branch, ConnectorScheduling, RunDeltaState, ConnectorPDL}; use super::connector::find_ports_in_value_group; @@ -20,7 +21,7 @@ pub(crate) trait Connector { fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState); /// Should run the connector's behaviour up until the next blocking point. - fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling; + fn run(&mut self, sched_ctx: &SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling; } type SyncDone = Arc<(Mutex, Condvar)>; @@ -65,7 +66,7 @@ impl Connector for ConnectorApplication { } } - fn run(&mut self, _protocol_description: &ProtocolDescription, _ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + fn run(&mut self, _sched_ctx: &SchedulerCtx, _conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { let mut queue = self.job_queue.lock().unwrap(); while let Some(job) = queue.pop_front() { match job { @@ -111,26 +112,10 @@ impl ApplicationInterface { /// Creates a new channel. pub fn create_channel(&mut self) -> Channel { - // TODO: Duplicated logic in scheduler - let getter_id = self.runtime.port_counter.fetch_add(2, Ordering::SeqCst); - let putter_id = PortIdLocal::new(getter_id + 1); - let getter_id = PortIdLocal::new(getter_id); - - // Create ports and add a job such that they are transferred to the - // API component. (note that we do not send a ping, this is only - // necessary once we create a connector) - let getter_port = Port{ - self_id: getter_id, - peer_id: putter_id, - kind: PortKind::Getter, - peer_connector: self.connector_id, - }; - let putter_port = Port{ - self_id: putter_id, - peer_id: getter_id, - kind: PortKind::Putter, - peer_connector: self.connector_id, - }; + let (getter_port, putter_port) = self.runtime.create_channel(); + debug_assert_eq!(getter_port.kind, PortKind::Getter); + let getter_id = getter_port.self_id; + let putter_id = putter_port.self_id; { let mut lock = self.job_queue.lock().unwrap(); diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index f645cc0572f9149185f1a0db6403a2dab0253f9a..e9680d4ed5cb218f9167365de80b67554db3b3fc 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -53,6 +53,10 @@ impl ConnectorCtx { } } +pub(crate) struct SchedulerCtx<'a> { + pub(crate) runtime: &'a RuntimeInner +} + pub(crate) struct Scheduler { runtime: Arc, scheduler_id: u32,