diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index fff2fa8e0b618d4bcec92ef1469161811efcc3eb..27d25194742f8f775b517ce22d62026124c0a626 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -6,11 +6,12 @@ use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; use super::{ConnectorKey, ConnectorId, RuntimeInner}; -use super::scheduler::{SchedulerCtx, ComponentCtxFancy, ReceivedMessage}; +use super::scheduler::{SchedulerCtx, ComponentCtxFancy}; use super::port::{Port, PortIdLocal, Channel, PortKind}; -use super::connector::{Branch, ConnectorScheduling, ConnectorPDL}; -use super::connector::find_ports_in_value_group; -use super::inbox::{Message, MessageContents}; +use super::branch::{Branch}; +use super::consensus::find_ports_in_value_group; +use super::connector2::{ConnectorScheduling, ConnectorPDL}; +use super::inbox2::{MessageFancy, ControlContent, ControlMessageFancy}; /// Generic connector interface from the scheduler's point of view. pub(crate) trait Connector { @@ -26,7 +27,7 @@ type JobQueue = Arc>>; enum ApplicationJob { NewChannel((Port, Port)), - NewConnector(ConnectorPDL), + NewConnector(ConnectorPDL, Vec), Shutdown, } @@ -57,10 +58,9 @@ impl Connector for ConnectorApplication { // Handle any incoming messages if we're participating in a round while let Some(message) = comp_ctx.read_next_message() { match message { - ReceivedMessage::Data(_) => todo!("data message in API connector"), - ReceivedMessage::Sync(_) | ReceivedMessage::RequestCommit(_) | ReceivedMessage::ConfirmCommit(_) => { - todo!("sync message in API connector"); - } + MessageFancy::Data(_) => todo!("data message in API connector"), + MessageFancy::Sync(_) => todo!("sync message in API connector"), + MessageFancy::Control(_) => todo!("impossible control message"), } } @@ -74,9 +74,9 @@ impl Connector for ConnectorApplication { comp_ctx.push_port(endpoint_a); comp_ctx.push_port(endpoint_b); } - ApplicationJob::NewConnector(connector) => { + ApplicationJob::NewConnector(connector, initial_ports) => { println!("DEBUG: API creating connector"); - comp_ctx.push_component(connector); + comp_ctx.push_component(connector, initial_ports); }, ApplicationJob::Shutdown => { debug_assert!(queue.is_empty()); @@ -139,26 +139,25 @@ impl ApplicationInterface { // asynchronously. let mut initial_ports = Vec::new(); find_ports_in_value_group(&arguments, &mut initial_ports); - for port_to_remove in &initial_ports { - match self.owned_ports.iter().position(|v| v == port_to_remove) { - Some(index_to_remove) => { - // We own the port, so continue - self.owned_ports.remove(index_to_remove); - }, - None => { - // We don't own the port - return Err(ComponentCreationError::UnownedPort); - } + for initial_port in &initial_ports { + if !self.owned_ports.iter().any(|v| v == initial_port) { + return Err(ComponentCreationError::UnownedPort); } } + // We own all ports, so remove them on this side + for initial_port in &initial_ports { + let position = self.owned_ports.iter().position(|v| *v == initial_port).unwrap(); + self.owned_ports.remove(position); + } + let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?; - let connector = ConnectorPDL::new(Branch::new_initial_branch(state), initial_ports); + let connector = ConnectorPDL::new(state); // Put on job queue { let mut queue = self.job_queue.lock().unwrap(); - queue.push_back(ApplicationJob::NewConnector(connector)); + queue.push_back(ApplicationJob::NewConnector(connector, initial_ports)); } self.wake_up_connector_with_ping(); @@ -187,11 +186,11 @@ impl ApplicationInterface { fn wake_up_connector_with_ping(&self) { let connector = self.runtime.get_component_public(self.connector_id); - connector.inbox.insert_message(Message{ - sending_connector: ConnectorId::new_invalid(), - receiving_port: PortIdLocal::new_invalid(), - contents: MessageContents::Ping, - }); + connector.inbox.insert_message(MessageFancy::Control(ControlMessageFancy{ + id: 0, + sending_component_id: self.connector_id, + content: ControlContent::Ack + })); let should_wake_up = connector.sleeping .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)