diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index f91ec36c5879a3eab121b373a0bfa3acd8a0dd7e..fff2fa8e0b618d4bcec92ef1469161811efcc3eb 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -4,25 +4,21 @@ use std::sync::atomic::Ordering; use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; -use crate::ProtocolDescription; -use crate::runtime2::scheduler::ComponentCtxFancy; -use super::{ConnectorKey, ConnectorId, RuntimeInner, ConnectorCtx}; -use super::scheduler::SchedulerCtx; +use super::{ConnectorKey, ConnectorId, RuntimeInner}; +use super::scheduler::{SchedulerCtx, ComponentCtxFancy, ReceivedMessage}; use super::port::{Port, PortIdLocal, Channel, PortKind}; -use super::connector::{Branch, ConnectorScheduling, RunDeltaState, ConnectorPDL}; +use super::connector::{Branch, ConnectorScheduling, ConnectorPDL}; use super::connector::find_ports_in_value_group; use super::inbox::{Message, MessageContents}; /// Generic connector interface from the scheduler's point of view. pub(crate) trait Connector { - /// Handle a new message (preprocessed by the scheduler). You probably only - /// want to handle `Data`, `Sync`, and `Solution` messages. The others are - /// intended for the scheduler itself. - fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState); - /// Should run the connector's behaviour up until the next blocking point. - fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling; + /// One should generally request and handle new messages from the component + /// context. Then perform any logic the component has to do, and in the + /// process perhaps queue up some state changes using the same context. + fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling; } type SyncDone = Arc<(Mutex, Condvar)>; @@ -46,7 +42,10 @@ impl ConnectorApplication { let sync_done = Arc::new(( Mutex::new(false), Condvar::new() )); let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32))); - let connector = ConnectorApplication { sync_done: sync_done.clone(), job_queue: job_queue.clone() }; + let connector = ConnectorApplication { + sync_done: sync_done.clone(), + job_queue: job_queue.clone() + }; let interface = ApplicationInterface::new(sync_done, job_queue, runtime); return (connector, interface); @@ -54,36 +53,35 @@ impl ConnectorApplication { } impl Connector for ConnectorApplication { - fn handle_message(&mut self, message: Message, _ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) { - use MessageContents as MC; - - match message.contents { - MC::Data(_) => unreachable!("data message in API connector"), - MC::Sync(_) | MC::RequestCommit(_) | MC::ConfirmCommit(_) => { - // Handling sync in API - }, - MC::Control(_) => {}, - MC::Ping => {}, + fn run(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { + // Handle any incoming messages if we're participating in a round + while let Some(message) = comp_ctx.read_next_message() { + match message { + ReceivedMessage::Data(_) => todo!("data message in API connector"), + ReceivedMessage::Sync(_) | ReceivedMessage::RequestCommit(_) | ReceivedMessage::ConfirmCommit(_) => { + todo!("sync message in API connector"); + } + } } - } - fn run(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, _conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { - let mut queue = self.job_queue.lock().unwrap(); - while let Some(job) = queue.pop_front() { - match job { - ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => { - println!("DEBUG: API adopting ports"); - delta_state.new_ports.reserve(2); - delta_state.new_ports.push(endpoint_a); - delta_state.new_ports.push(endpoint_b); - } - ApplicationJob::NewConnector(connector) => { - println!("DEBUG: API creating connector"); - delta_state.new_connectors.push(connector); - }, - ApplicationJob::Shutdown => { - debug_assert!(queue.is_empty()); - return ConnectorScheduling::Exit; + // Handle requests coming from the API + { + let mut queue = self.job_queue.lock().unwrap(); + while let Some(job) = queue.pop_front() { + match job { + ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => { + println!("DEBUG: API adopting ports"); + comp_ctx.push_port(endpoint_a); + comp_ctx.push_port(endpoint_b); + } + ApplicationJob::NewConnector(connector) => { + println!("DEBUG: API creating connector"); + comp_ctx.push_component(connector); + }, + ApplicationJob::Shutdown => { + debug_assert!(queue.is_empty()); + return ConnectorScheduling::Exit; + } } } }