diff --git a/src/collections/sets.rs b/src/collections/sets.rs index 8f36093cb95fd90b450665ad0c501aff96abd1aa..d2ce409d7f0291e643c46b987804d5a8eb57679d 100644 --- a/src/collections/sets.rs +++ b/src/collections/sets.rs @@ -89,7 +89,7 @@ impl VecSet { } #[inline] - pub fn iter(&self) -> impl Iterator { + pub fn iter(&self) -> impl Iterator { return self.inner.iter(); } diff --git a/src/runtime2/branch.rs b/src/runtime2/branch.rs index cb605fd86e7df443c8f41bf660a1d411bebcd52c..8f156102d6828c75c8a77f8b7bdb5dc15b3f46c8 100644 --- a/src/runtime2/branch.rs +++ b/src/runtime2/branch.rs @@ -283,7 +283,7 @@ impl ExecTree { /// using the provided branch as the final sync result. pub fn end_sync(&mut self, branch_id: BranchId) { debug_assert!(self.is_in_sync()); - debug_assert!(self.iter_queue(QueueKind::FinishedSync).any(|v| v.id == branch_id)); + debug_assert!(self.iter_queue(QueueKind::FinishedSync, None).any(|v| v.id == branch_id)); // Swap indicated branch into the first position self.branches.swap(0, branch_id.index as usize); @@ -294,7 +294,10 @@ impl ExecTree { branch.id = BranchId::new_invalid(); branch.parent_id = BranchId::new_invalid(); branch.sync_state = SpeculativeState::RunningNonSync; + debug_assert!(!branch.awaiting_port.is_valid()); branch.next_in_queue = BranchId::new_invalid(); + branch.inbox.clear(); + debug_assert!(branch.prepared_channel.is_none()); // Clear out all the queues for queue_idx in 0..NUM_QUEUES { diff --git a/src/runtime2/connector2.rs b/src/runtime2/connector2.rs index 049eaa42781c38608098e5b5a1a454456a5f142f..265ef4f799deac03885e89be24fe072935123a7b 100644 --- a/src/runtime2/connector2.rs +++ b/src/runtime2/connector2.rs @@ -38,8 +38,7 @@ use crate::runtime2::port::PortKind; use super::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState}; use super::consensus::{Consensus, Consistency}; -use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy}; -use super::inbox::PublicInbox; +use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy, PublicInbox}; use super::native::Connector; use super::port::PortIdLocal; use super::scheduler::{ComponentCtxFancy, SchedulerCtx}; @@ -79,7 +78,7 @@ struct ConnectorRunContext<'a> { prepared_channel: Option<(Value, Value)>, } -impl RunContext for ConnectorRunContext{ +impl<'a> RunContext for ConnectorRunContext<'a>{ fn did_put(&mut self, port: PortId) -> bool { let port_id = PortIdLocal::new(port.0.u32_suffix); let annotation = self.consensus.get_annotation(self.branch_id, port_id); @@ -110,7 +109,10 @@ impl Connector for ConnectorPDL { self.handle_new_messages(comp_ctx); if self.tree.is_in_sync() { let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); - self.consensus.handle_new_finished_sync_branches(); + if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) { + todo!("call handler"); + } + return scheduling; } else { let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); @@ -132,8 +134,8 @@ impl ConnectorPDL { pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtxFancy) { while let Some(message) = ctx.read_next_message() { match message { - MessageFancy::Data(message) => handle_new_data_message(message, ctx), - MessageFancy::Sync(message) => handle_new_sync_message(message, ctx), + MessageFancy::Data(message) => self.handle_new_data_message(message, ctx), + MessageFancy::Sync(message) => self.handle_new_sync_message(message, ctx), MessageFancy::Control(_) => unreachable!("control message in component"), } } @@ -143,8 +145,7 @@ impl ConnectorPDL { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. debug_assert!(ctx.workspace_branches.is_empty()); - self.consensus.handle_received_sync_header(&message.sync_header, ctx); - self.consensus.handle_received_data_header(&self.tree, &message.data_header, &mut ctx.workspace_branches); + self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut ctx.workspace_branches); for branch_id in ctx.workspace_branches.drain(..) { // This branch can receive, so fork and given it the message @@ -161,8 +162,9 @@ impl ConnectorPDL { } pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) { - self.consensus.handle_received_sync_header(&message.sync_header, ctx); - self.consensus.handle_received_sync_message(message, ctx); + if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) { + + } } // --- Running code @@ -303,7 +305,7 @@ impl ConnectorPDL { debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync); let mut run_context = ConnectorRunContext{ - branch_id, + branch_id: branch.id, consensus: &self.consensus, received: &branch.inbox, scheduler: *sched_ctx, @@ -320,7 +322,7 @@ impl ConnectorPDL { RunResult::ComponentAtSyncStart => { let current_ports = comp_ctx.notify_sync_start(); let sync_branch_id = self.tree.start_sync(); - self.consensus.start_sync(current_ports); + self.consensus.start_sync(current_ports, comp_ctx); self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); return ConnectorScheduling::Immediate; @@ -361,4 +363,17 @@ impl ConnectorPDL { _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result), } } + + pub fn collapse_sync_to_solution_branch(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtxFancy) { + let mut fake_vec = Vec::new(); + self.tree.end_sync(solution_branch_id); + self.consensus.end_sync(solution_branch_id, &mut fake_vec); + + for port in fake_vec { + // TODO: Handle sent/received ports + debug_assert!(ctx.get_port_by_id(port).is_some()); + } + + ctx.notify_sync_end(&[]); + } } \ No newline at end of file diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 234cf968bd19d1c801185f1b1303a503a2af521d..b65ea87882def537dd7a4ad334e7c65a9a87f79c 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,14 +1,14 @@ -use std::path::Component; -use std::str::pattern::Pattern; use crate::collections::VecSet; use crate::protocol::eval::ValueGroup; -use crate::runtime2::branch::{BranchId, ExecTree, QueueKind}; -use crate::runtime2::ConnectorId; -use crate::runtime2::inbox2::{DataHeader, DataMessageFancy, MessageFancy, SyncContent, SyncHeader, SyncMessageFancy}; -use crate::runtime2::inbox::SyncMessage; -use crate::runtime2::port::{ChannelId, Port, PortIdLocal}; -use crate::runtime2::scheduler::ComponentCtxFancy; -use super::inbox2::PortAnnotation; + +use super::branch::{BranchId, ExecTree, QueueKind}; +use super::ConnectorId; +use super::port::{ChannelId, Port, PortIdLocal}; +use super::inbox2::{ + DataHeader, DataMessageFancy, MessageFancy, + SyncContent, SyncHeader, SyncMessageFancy, PortAnnotation +}; +use super::scheduler::ComponentCtxFancy; struct BranchAnnotation { port_mapping: Vec, @@ -22,8 +22,8 @@ pub(crate) struct LocalSolution { #[derive(Clone)] pub(crate) struct GlobalSolution { - branches: Vec<(ConnectorId, BranchId)>, - port_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info + component_branches: Vec<(ConnectorId, BranchId)>, + channel_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info } // ----------------------------------------------------------------------------- @@ -88,10 +88,12 @@ impl Consensus { /// Sets up the consensus algorithm for a new synchronous round. The /// provided ports should be the ports the component owns at the start of /// the sync round. - pub fn start_sync(&mut self, ports: &[Port]) { + pub fn start_sync(&mut self, ports: &[Port], ctx: &ComponentCtxFancy) { debug_assert!(!self.highest_connector_id.is_valid()); debug_assert!(self.branch_annotations.is_empty()); + debug_assert!(self.last_finished_handled.is_none()); debug_assert!(self.encountered_peers.is_empty()); + debug_assert!(self.solution_combiner.local.is_empty()); // We'll use the first "branch" (the non-sync one) to store our ports, // this allows cloning if we created a new branch. @@ -104,6 +106,9 @@ impl Consensus { }) .collect(), }); + + self.highest_connector_id = ctx.id; + } /// Notifies the consensus algorithm that a new branch has appeared. Must be @@ -172,7 +177,7 @@ impl Consensus { /// Generates sync messages for any branches that are at the end of the /// sync block. To find these branches, they should've been put in the /// "finished" queue in the execution tree. - pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtxFancy) { + pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtxFancy) -> Option { debug_assert!(self.is_in_sync()); let mut last_branch_id = self.last_finished_handled; @@ -194,12 +199,17 @@ impl Consensus { final_branch_id: branch.id, port_mapping: target_mapping, }; - self.send_or_store_local_solution(local_solution, ctx); + let solution_branch = self.send_or_store_local_solution(local_solution, ctx); + if solution_branch.is_some() { + // No need to continue iterating, we've found the solution + return solution_branch; + } last_branch_id = Some(branch.id); } self.last_finished_handled = last_branch_id; + return None; } pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec) { @@ -213,7 +223,12 @@ impl Consensus { final_ports.push(port.port_id); } - // Clear out internal storage + // Clear out internal storage to defaults + self.highest_connector_id = ConnectorId::new_invalid(); + self.branch_annotations.clear(); + self.last_finished_handled = None; + self.encountered_peers.clear(); + self.solution_combiner.clear(); } // --- Handling messages @@ -289,12 +304,11 @@ impl Consensus { SyncContent::LocalSolution(solution) => { // We might be the leader, or earlier messages caused us to not // be the leader anymore. - self.send_or_store_local_solution(solution, ctx); - return None; + return self.send_or_store_local_solution(solution, ctx); }, SyncContent::GlobalSolution(solution) => { // Take branch of interest and return it. - let (_, branch_id) = solution.branches.iter() + let (_, branch_id) = solution.component_branches.iter() .find(|(connector_id, _)| connector_id == ctx.id) .unwrap(); return Some(*branch_id); @@ -402,11 +416,18 @@ impl Consensus { } // else: exactly equal, so do nothing } - fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) { + fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) -> Option { if self.highest_connector_id == ctx.id { // We are the leader if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) { - for (connector_id, _) in global_solution.branches.iter().copied() { + let mut my_final_branch_id = BranchId::new_invalid(); + for (connector_id, branch_id) in global_solution.component_branches.iter().copied() { + if connector_id == ctx.id { + // This is our solution branch + my_final_branch_id = branch_id; + continue; + } + let message = SyncMessageFancy{ sync_header: self.create_sync_header(ctx), target_component_id: connector_id, @@ -414,6 +435,11 @@ impl Consensus { }; ctx.submit_message(MessageFancy::Sync(message)); } + + debug_assert!(my_final_branch_id.is_valid()); + return Some(my_final_branch_id); + } else { + return None; } } else { // Someone else is the leader @@ -423,6 +449,7 @@ impl Consensus { content: SyncContent::LocalSolution(solution), }; ctx.submit_message(MessageFancy::Sync(message)); + return None; } } @@ -454,7 +481,7 @@ impl Consensus { struct MatchedLocalSolution { final_branch_id: BranchId, - port_mapping: Vec<(ChannelId, BranchId)>, + channel_mapping: Vec<(ChannelId, BranchId)>, matches: Vec, } @@ -496,7 +523,7 @@ impl SolutionCombiner { let component_id = solution.component; let solution = MatchedLocalSolution{ final_branch_id: solution.final_branch_id, - port_mapping: solution.port_mapping, + channel_mapping: solution.port_mapping, matches: Vec::new(), }; @@ -529,7 +556,7 @@ impl SolutionCombiner { // in the stored solutions which other components are peers of the new // one. if new_component { - let cur_ports = &self.local[component_index].solutions[0].port_mapping; + let cur_ports = &self.local[component_index].solutions[0].channel_mapping; let mut component_peers = Vec::new(); // Find the matching components @@ -539,22 +566,22 @@ impl SolutionCombiner { continue; } - let mut matching_ports = Vec::new(); - for (cur_port_id, _) in cur_ports { - for (other_port_id, _) in &other_component.solutions[0].port_mapping { - if cur_port_id == other_port_id { + let mut matching_channels = Vec::new(); + for (cur_channel_id, _) in cur_ports { + for (other_channel_id, _) in &other_component.solutions[0].channel_mapping { + if cur_channel_id == other_channel_id { // We have a shared port - matching_ports.push(*port_id); + matching_channels.push(*cur_channel_id); } } } - if !matching_ports.is_empty() { + if !matching_channels.is_empty() { // We share some ports component_peers.push(ComponentPeer{ target_id: other_component.component, target_index: other_index, - involved_channels: matching_ports, + involved_channels: matching_channels, }); } } @@ -579,7 +606,7 @@ impl SolutionCombiner { num_ports_in_peers += existing_peer.involved_channels.len(); } - if num_ports_in_peers == other_component.solutions[0].port_mapping.len() { + if num_ports_in_peers == other_component.solutions[0].channel_mapping.len() { other_component.all_peers_present = true; } @@ -609,8 +636,8 @@ impl SolutionCombiner { // Check the port mappings between the pair of solutions. let mut all_matched = true; - 'mapping_check_loop: for (cur_port, cur_branch) in &cur_solution.port_mapping { - for (other_port, other_branch) in &other_solution.port_mapping { + 'mapping_check_loop: for (cur_port, cur_branch) in &cur_solution.channel_mapping { + for (other_port, other_branch) in &other_solution.channel_mapping { if cur_port == other_port { if cur_branch == other_branch { // Same port mapping, go to next port @@ -771,29 +798,29 @@ impl SolutionCombiner { // all (all components have their peers), and the exit condition of the // while loop: if we're here, then we have a global solution debug_assert_eq!(check_stack.len(), self.local.len()); - let mut global_solution = Vec::with_capacity(check_stack.len()); + let mut final_branches = Vec::with_capacity(check_stack.len()); for (component_index, solution_index) in check_stack.iter().copied() { let component = &self.local[component_index]; let solution = &component.solutions[solution_index]; - global_solution.push((component.component, solution.final_branch_id)); + final_branches.push((component.component, solution.final_branch_id)); } // Just debugging here, TODO: @remove - let mut total_num_ports = 0; + let mut total_num_channels = 0; for (component_index, _) in check_stack.iter().copied() { let component = &self.local[component_index]; - total_num_ports += component.solutions[0].port_mapping.len(); + total_num_channels += component.solutions[0].channel_mapping.len(); } - total_num_ports /= 2; - let mut final_mapping = Vec::with_capacity(total_num_ports); + total_num_channels /= 2; + let mut final_mapping = Vec::with_capacity(total_num_channels); let mut total_num_checked = 0; for (component_index, solution_index) in check_stack.iter().copied() { let component = &self.local[component_index]; let solution = &component.solutions[solution_index]; - for (channel_id, branch_id) in solution.port_mapping.iter().copied() { + for (channel_id, branch_id) in solution.channel_mapping.iter().copied() { match final_mapping.iter().find(|(v, _)| *v == channel_id) { Some((_, encountered_branch_id)) => { debug_assert_eq!(encountered_branch_id, branch_id); @@ -806,11 +833,11 @@ impl SolutionCombiner { } } - debug_assert_eq!(total_num_checked, total_num_ports); + debug_assert_eq!(total_num_checked, total_num_channels); return Some(GlobalSolution{ - branches: global_solution, - port_mapping: final_mapping, + component_branches: final_branches, + channel_mapping: final_mapping, }); } @@ -842,13 +869,17 @@ impl SolutionCombiner { solutions.push(LocalSolution{ component: component.component, final_branch_id: solution.final_branch_id, - port_mapping: solution.port_mapping, + port_mapping: solution.channel_mapping, }); } } return solutions; } + + fn clear(&mut self) { + self.local.clear(); + } } // ----------------------------------------------------------------------------- diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index e98a76f7fc9be937aa92ed6b03eb8907fdc9bc37..a26cf17373de3aab4a0c652c59288d1e4934cab1 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -208,36 +208,4 @@ pub struct Message { pub sending_connector: ConnectorId, pub receiving_port: PortIdLocal, // may be invalid (in case of messages targeted at the connector) pub contents: MessageContents, -} - -/// The public inbox of a connector. The thread running the connector that owns -/// this inbox may retrieved from it. Non-owning threads may only put new -/// messages inside of it. -// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads. -// Should behave as a MPSC queue. -pub struct PublicInbox { - messages: Mutex>, -} - -impl PublicInbox { - pub fn new() -> Self { - Self{ - messages: Mutex::new(VecDeque::new()), - } - } - - pub fn insert_message(&self, message: MessageFancy) { - let mut lock = self.messages.lock().unwrap(); - lock.push_back(message); - } - - pub fn take_message(&self) -> Option { - let mut lock = self.messages.lock().unwrap(); - return lock.pop_front(); - } - - pub fn is_empty(&self) -> bool { - let lock = self.messages.lock().unwrap(); - return lock.is_empty(); - } } \ No newline at end of file diff --git a/src/runtime2/inbox2.rs b/src/runtime2/inbox2.rs index 028cd41ff27205289250e30341fb4b44d8502201..9a8754e20b72a68b488bf80f25adf85c699ad4c8 100644 --- a/src/runtime2/inbox2.rs +++ b/src/runtime2/inbox2.rs @@ -1,3 +1,6 @@ +use std::sync::Mutex; +use std::collections::VecDeque; + use crate::protocol::eval::ValueGroup; use crate::runtime2::branch::BranchId; use crate::runtime2::ConnectorId; @@ -77,4 +80,36 @@ pub(crate) enum MessageFancy { Data(DataMessageFancy), Sync(SyncMessageFancy), Control(ControlMessageFancy), +} + +/// The public inbox of a connector. The thread running the connector that owns +/// this inbox may retrieved from it. Non-owning threads may only put new +/// messages inside of it. +// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads. +// Should behave as a MPSC queue. +pub struct PublicInbox { + messages: Mutex>, +} + +impl PublicInbox { + pub fn new() -> Self { + Self{ + messages: Mutex::new(VecDeque::new()), + } + } + + pub fn insert_message(&self, message: MessageFancy) { + let mut lock = self.messages.lock().unwrap(); + lock.push_back(message); + } + + pub fn take_message(&self) -> Option { + let mut lock = self.messages.lock().unwrap(); + return lock.pop_front(); + } + + pub fn is_empty(&self) -> bool { + let lock = self.messages.lock().unwrap(); + return lock.is_empty(); + } } \ No newline at end of file diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index ea8d309c8d8839233385c2fb61efec3fdcfa794a..a627f41c6d1ff9b47a00965c43b13e9cc8bc2b07 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -2,12 +2,12 @@ mod runtime; mod messages; -mod connector; +// mod connector; mod branch; mod native; mod port; mod scheduler; -mod inbox; +// mod inbox; mod consensus; mod inbox2; @@ -24,13 +24,11 @@ use std::thread::{self, JoinHandle}; use crate::collections::RawVec; use crate::ProtocolDescription; -use inbox::Message; use connector2::{ConnectorPDL, ConnectorPublic, ConnectorScheduling}; -use scheduler::{Scheduler, ControlMessageHandler}; +use scheduler::{Scheduler, ComponentCtxFancy, SchedulerCtx, ControlMessageHandler}; use native::{Connector, ConnectorApplication, ApplicationInterface}; -use crate::runtime2::inbox2::MessageFancy; -use crate::runtime2::port::{ChannelId, Port, PortState}; -use crate::runtime2::scheduler::{ComponentCtxFancy, SchedulerCtx}; +use inbox2::MessageFancy; +use port::{ChannelId, Port, PortState}; /// A kind of token that, once obtained, allows mutable access to a connector. /// We're trying to use move semantics as much as possible: the owner of this diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index fff2fa8e0b618d4bcec92ef1469161811efcc3eb..27d25194742f8f775b517ce22d62026124c0a626 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -6,11 +6,12 @@ use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; use super::{ConnectorKey, ConnectorId, RuntimeInner}; -use super::scheduler::{SchedulerCtx, ComponentCtxFancy, ReceivedMessage}; +use super::scheduler::{SchedulerCtx, ComponentCtxFancy}; use super::port::{Port, PortIdLocal, Channel, PortKind}; -use super::connector::{Branch, ConnectorScheduling, ConnectorPDL}; -use super::connector::find_ports_in_value_group; -use super::inbox::{Message, MessageContents}; +use super::branch::{Branch}; +use super::consensus::find_ports_in_value_group; +use super::connector2::{ConnectorScheduling, ConnectorPDL}; +use super::inbox2::{MessageFancy, ControlContent, ControlMessageFancy}; /// Generic connector interface from the scheduler's point of view. pub(crate) trait Connector { @@ -26,7 +27,7 @@ type JobQueue = Arc>>; enum ApplicationJob { NewChannel((Port, Port)), - NewConnector(ConnectorPDL), + NewConnector(ConnectorPDL, Vec), Shutdown, } @@ -57,10 +58,9 @@ impl Connector for ConnectorApplication { // Handle any incoming messages if we're participating in a round while let Some(message) = comp_ctx.read_next_message() { match message { - ReceivedMessage::Data(_) => todo!("data message in API connector"), - ReceivedMessage::Sync(_) | ReceivedMessage::RequestCommit(_) | ReceivedMessage::ConfirmCommit(_) => { - todo!("sync message in API connector"); - } + MessageFancy::Data(_) => todo!("data message in API connector"), + MessageFancy::Sync(_) => todo!("sync message in API connector"), + MessageFancy::Control(_) => todo!("impossible control message"), } } @@ -74,9 +74,9 @@ impl Connector for ConnectorApplication { comp_ctx.push_port(endpoint_a); comp_ctx.push_port(endpoint_b); } - ApplicationJob::NewConnector(connector) => { + ApplicationJob::NewConnector(connector, initial_ports) => { println!("DEBUG: API creating connector"); - comp_ctx.push_component(connector); + comp_ctx.push_component(connector, initial_ports); }, ApplicationJob::Shutdown => { debug_assert!(queue.is_empty()); @@ -139,26 +139,25 @@ impl ApplicationInterface { // asynchronously. let mut initial_ports = Vec::new(); find_ports_in_value_group(&arguments, &mut initial_ports); - for port_to_remove in &initial_ports { - match self.owned_ports.iter().position(|v| v == port_to_remove) { - Some(index_to_remove) => { - // We own the port, so continue - self.owned_ports.remove(index_to_remove); - }, - None => { - // We don't own the port - return Err(ComponentCreationError::UnownedPort); - } + for initial_port in &initial_ports { + if !self.owned_ports.iter().any(|v| v == initial_port) { + return Err(ComponentCreationError::UnownedPort); } } + // We own all ports, so remove them on this side + for initial_port in &initial_ports { + let position = self.owned_ports.iter().position(|v| *v == initial_port).unwrap(); + self.owned_ports.remove(position); + } + let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?; - let connector = ConnectorPDL::new(Branch::new_initial_branch(state), initial_ports); + let connector = ConnectorPDL::new(state); // Put on job queue { let mut queue = self.job_queue.lock().unwrap(); - queue.push_back(ApplicationJob::NewConnector(connector)); + queue.push_back(ApplicationJob::NewConnector(connector, initial_ports)); } self.wake_up_connector_with_ping(); @@ -187,11 +186,11 @@ impl ApplicationInterface { fn wake_up_connector_with_ping(&self) { let connector = self.runtime.get_component_public(self.connector_id); - connector.inbox.insert_message(Message{ - sending_connector: ConnectorId::new_invalid(), - receiving_port: PortIdLocal::new_invalid(), - contents: MessageContents::Ping, - }); + connector.inbox.insert_message(MessageFancy::Control(ControlMessageFancy{ + id: 0, + sending_component_id: self.connector_id, + content: ControlContent::Ack + })); let should_wake_up = connector.sleeping .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 50329ff5cfc43f4f43ca9f68633c9bdcfd98c638..60626e93624e74d23fdd3ea1d7cc795385b9295f 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -154,7 +154,7 @@ impl Scheduler { // And respond with an Ack let ack_message = MessageFancy::Control(ControlMessageFancy{ - id: content.id, + id: message.id, sending_component_id: connector_id, content: ControlContent::Ack, }); @@ -168,7 +168,7 @@ impl Scheduler { // Send an Ack let ack_message = MessageFancy::Control(ControlMessageFancy{ - id: content.id, + id: message.id, sending_component_id: connector_id, content: ControlContent::Ack, }); @@ -176,7 +176,7 @@ impl Scheduler { self.runtime.send_message(message.sending_component_id, ack_message); }, ControlContent::Ack => { - scheduled.router.handle_ack(content.id); + scheduled.router.handle_ack(message.id); }, ControlContent::Ping => {}, } @@ -506,7 +506,7 @@ impl<'a> Iterator for MessagesIter<'a> { if message.data_header.target_port == self.match_port_id { // Found a match self.next_index += 1; - return Some(data_message); + return Some(message); } } else { // Unreachable because: