diff --git a/src/collections/mod.rs b/src/collections/mod.rs
index bdb02b45a4a07748023fa8cfcbd0183264d441ae..872f1834a59e38ea0060e22458248874ec3858cb 100644
--- a/src/collections/mod.rs
+++ b/src/collections/mod.rs
@@ -1,7 +1,6 @@
 mod string_pool;
 mod scoped_buffer;
 mod sets;
-mod mpmc_queue;
 mod raw_vec; // TODO: Finish this later, use alloc::alloc and alloc::Layout
@@ -10,5 +9,4 @@ mod raw_vec;
 pub(crate) use string_pool::{StringPool, StringRef};
 pub(crate) use scoped_buffer::{ScopedBuffer, ScopedSection};
 pub(crate) use sets::DequeSet;
-pub(crate) use mpmc_queue::MpmcQueue;
 pub(crate) use raw_vec::RawVec;
\ No newline at end of file
diff --git a/src/collections/mpmc_queue.rs b/src/collections/mpmc_queue.rs
deleted file mode 100644
index 41e21da3830e9feef648a4f20eb70e05716045fc..0000000000000000000000000000000000000000
--- a/src/collections/mpmc_queue.rs
+++ /dev/null
@@ -1,31 +0,0 @@
-use std::sync::Mutex;
-use std::collections::VecDeque;
-
-/// Generic multiple-producer, multiple-consumer queue. Current implementation
-/// has the required functionality, without all of the optimizations.
-/// TODO: @Optimize
-pub struct MpmcQueue<T> {
-    queue: Mutex<VecDeque<T>>,
-}
-
-impl<T> MpmcQueue<T> {
-    pub fn new() -> Self {
-        Self::with_capacity(0)
-    }
-
-    pub fn with_capacity(capacity: usize) -> Self {
-        Self{
-            queue: Mutex::new(VecDeque::with_capacity(capacity)),
-        }
-    }
-
-    pub fn push_back(&self, item: T) {
-        let mut queue = self.queue.lock().unwrap();
-        queue.push_back(item);
-    }
-
-    pub fn pop_front(&self) -> Option<T> {
-        let mut queue = self.queue.lock().unwrap();
-        return queue.pop_front();
-    }
-}
\ No newline at end of file
diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs
index 916e7cd1c78c2580cd51044c2041e7d67c88745c..e72928313e678585399b6e2ae593b7d25a7d14dd 100644
--- a/src/runtime2/connector.rs
+++ b/src/runtime2/connector.rs
@@ -1,20 +1,22 @@
 use std::collections::HashMap;
 use std::sync::atomic::AtomicBool;
-use crate::{PortId, ProtocolDescription};
+use crate::PortId;
 use crate::protocol::{ComponentState, RunContext, RunResult};
 use crate::protocol::eval::{Prompt, Value, ValueGroup};
-use crate::runtime2::scheduler::{ComponentCtxFancy, Scheduler};
 use super::ConnectorId;
 use super::native::Connector;
-use super::scheduler::{SchedulerCtx, ConnectorCtx};
+use super::scheduler::{
+    SchedulerCtx, ComponentCtxFancy, ComponentPortChange,
+    ReceivedMessage
+};
 use super::inbox::{
-    PrivateInbox, PublicInbox,
-    DataMessage, SyncMessage, SolutionMessage, Message, MessageContents,
+    PublicInbox,
+    DataMessage, SyncMessage, SolutionMessage, MessageContents,
     SyncBranchConstraint, SyncConnectorSolution
 };
-use super::port::{Port, PortKind, PortIdLocal};
+use super::port::{PortKind, PortIdLocal};
 
 /// Represents the identifier of a branch (the index within its container). An
 /// ID of `0` generally means "no branch" (e.g. no parent, or a port did not
@@ -65,7 +67,7 @@ pub(crate) struct Branch {
     next_branch_in_queue: Option<u32>,
     // Message/port state
     received: HashMap<PortIdLocal, DataMessage>, // TODO: @temporary, remove together with fires()
-    ports_delta: Vec<PortOwnershipDelta>,
+    ports_delta: Vec<ComponentPortChange>,
 }
 
 impl Branch {
@@ -154,12 +156,6 @@ impl PortAssignment {
     }
 }
 
-#[derive(Clone)]
-struct PortOwnershipDelta {
-    acquired: bool, // if false, then released ownership
-    port_id: PortIdLocal,
-}
-
 #[derive(Debug)]
 enum PortOwnershipError {
     UsedInInteraction(PortIdLocal),
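A note on the false-sharing TODO in the next hunk: `ConnectorPublic` holds atomics that other schedulers poll, so padding it out to a cache line keeps that traffic away from neighbouring fields. The patch does not do this; the following is only a sketch of what the TODO could become, assuming 64-byte cache lines (the `CachePadded` name is hypothetical; crossbeam's `crossbeam_utils::CachePadded` is a production equivalent).

```rust
// Illustrative only, not part of the patch: an aligned wrapper that keeps
// cross-thread atomics on their own cache line to prevent false sharing.
#[repr(align(64))] // typical cache-line size on x86-64
struct CachePadded<T>(T);

struct ExamplePublic {
    // other schedulers poll this flag; padding isolates the invalidations
    sleeping: CachePadded<std::sync::atomic::AtomicBool>,
}
```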
@@ -331,8 +327,6 @@ impl ConnectorPublic {
 // TODO: Maybe prevent false sharing by aligning `public` to next cache line.
 // TODO: Do this outside of the connector, create a wrapping struct
 pub(crate) struct ConnectorPDL {
-    // State and properties of connector itself
-    in_sync: bool,
     // Branch management
     branches: Vec<Branch>, // first branch is always non-speculative one
     sync_active: BranchQueue,
@@ -342,7 +336,6 @@ pub(crate) struct ConnectorPDL {
     cur_round: u32,
     // Port/message management
     pub committed_to: Option<(ConnectorId, u64)>,
-    pub inbox: PrivateInbox,
     pub ports: ConnectorPorts,
 }
 
@@ -350,7 +343,7 @@ pub(crate) struct ConnectorPDL {
 struct ConnectorRunContext<'a> {
     branch_index: u32,
     ports: &'a ConnectorPorts,
-    ports_delta: &'a Vec<PortOwnershipDelta>,
+    ports_delta: &'a Vec<ComponentPortChange>,
     received: &'a HashMap<PortIdLocal, DataMessage>,
     scheduler: SchedulerCtx<'a>,
     prepared_channel: Option<(Value, Value)>,
 }
 
 impl<'a> RunContext for ConnectorRunContext<'a> {
     fn did_put(&mut self, port: PortId) -> bool {
-        if self.ports_delta.iter().any(|v| v.port_id.index == port.0.u32_suffix) {
+        if self.ports_delta.iter().any(|v| v.port.self_id.index == port.0.u32_suffix) {
             // Either acquired or released, must be silent
             return false;
         }
@@ -378,7 +371,7 @@ impl<'a> RunContext for ConnectorRunContext<'a> {
     fn fires(&mut self, port: PortId) -> Option<Value> {
         let port_id = PortIdLocal::new(port.0.u32_suffix);
-        if self.ports_delta.iter().any(|v| v.port_id == port_id) {
+        if self.ports_delta.iter().any(|v| v.port.self_id == port_id) {
             return None
         }
@@ -398,53 +391,10 @@ impl<'a> RunContext for ConnectorRunContext<'a> {
 }
 
 impl Connector for ConnectorPDL {
-    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
-        use MessageContents as MC;
-
-        match message.contents {
-            MC::Data(content) => self.handle_data_message(message.receiving_port, content),
-            MC::Sync(content) => self.handle_sync_message(content, ctx, delta_state),
-            MC::RequestCommit(content) => self.handle_request_commit_message(content, ctx, delta_state),
-            MC::ConfirmCommit(content) => self.handle_confirm_commit_message(content, ctx, delta_state),
-            MC::Control(_) | MC::Ping => {},
-        }
-    }
-
-    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
-        if self.in_sync {
-            // Check for new messages we haven't seen before. If any of the
-            // pending branches can accept the message, do so.
-            while let Some((target_port_id, message)) = comp_ctx.read_next_message() {
-                let mut branch_idx = self.sync_pending_get.first;
-                while branch_idx != 0 {
-                    let branch = &self.branches[branch_idx as usize];
-                    let next_branch_idx = branch.next_branch_in_queue.unwrap_or(0);
-
-                    let target_port_index = self.ports.get_port_index(*target_port_id).unwrap();
-                    let port_mapping = self.ports.get_port(branch_idx, target_port_index);
-
-                    if branch.sync_state == SpeculativeState::HaltedAtBranchPoint &&
-                        branch.halted_at_port == *target_port_id &&
-                        port_mapping.last_registered_branch_id == message.sender_prev_branch_id {
-                        // Branch may accept this mesage, so create a fork that
-                        // contains this message in the inbox.
-                        let new_branch_idx = self.branches.len() as u32;
-                        let new_branch = Branch::new_sync_branching_from(new_branch_idx, branch);
-
-                        self.ports.prepare_sync_branch(branch_idx, new_branch_idx);
-                        let mapping = self.ports.get_port_mut(branch_idx, target_port_index);
-                        mapping.last_registered_branch_id = message.sender_cur_branch_id;
-
-                        let new_branch_id = BranchId::new(new_branch_idx);
-                        self.branches.push(new_branch);
-                        Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id)
-                    }
-
-                    branch_idx = next_branch_idx;
-                }
-            }
-
-            let scheduling = self.run_in_speculative_mode(sched_ctx, comp_ctx, conn_ctx, delta_state);
+    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
+        self.handle_new_messages(comp_ctx);
+        if comp_ctx.is_in_sync() {
+            let scheduling = self.run_in_speculative_mode(sched_ctx, comp_ctx);
 
             // When in speculative mode we might have generated new sync
             // solutions, we need to turn them into proposed solutions here.
@@ -466,9 +416,9 @@ impl Connector for ConnectorPDL {
 
                     // Turn local solution into a message and send it along
                     // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed
-                    let solution_message = self.generate_initial_solution_for_branch(branch_id, conn_ctx);
+                    let solution_message = self.generate_initial_solution_for_branch(branch_id, comp_ctx);
                     if let Some(valid_solution) = solution_message {
-                        self.submit_sync_solution(valid_solution, conn_ctx, delta_state);
+                        self.submit_sync_solution(valid_solution, comp_ctx);
                     } else {
                         // Branch is actually invalid, but we only just figured
                        // it out. We need to mark it as invalid to prevent
@@ -493,7 +443,7 @@ impl Connector for ConnectorPDL {
 
             return scheduling;
         } else {
-            let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx, conn_ctx, delta_state);
+            let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx);
             return scheduling;
         }
     }
@@ -505,7 +455,6 @@ impl ConnectorPDL {
     /// hence is in a non-sync state.
     pub fn new(initial_branch: Branch, owned_ports: Vec<PortIdLocal>) -> Self {
         Self{
-            in_sync: false,
             branches: vec![initial_branch],
             sync_active: BranchQueue::new(),
             sync_pending_get: BranchQueue::new(),
@@ -513,28 +462,71 @@ impl ConnectorPDL {
             sync_finished_last_handled: 0, // none at all
             cur_round: 0,
             committed_to: None,
-            inbox: PrivateInbox::new(),
             ports: ConnectorPorts::new(owned_ports),
         }
     }
 
-    pub fn is_in_sync_mode(&self) -> bool {
-        return self.in_sync;
-    }
-
     // -------------------------------------------------------------------------
     // Handling connector messages
     // -------------------------------------------------------------------------
 
-    pub fn handle_data_message(&mut self, target_port: PortIdLocal, message: DataMessage) {
-        // self.inbox.insert_message(target_port, message);
+    pub fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtxFancy) {
+        while let Some(message) = comp_ctx.read_next_message() {
+            match message {
+                ReceivedMessage::Data((target_port_id, contents)) => {
+                    self.handle_data_message(target_port_id, &contents);
+                },
+                ReceivedMessage::Sync(contents) => {
+                    self.handle_sync_message(contents, comp_ctx);
+                },
+                ReceivedMessage::RequestCommit(contents) => {
+                    self.handle_request_commit_message(contents, comp_ctx);
+                },
+                ReceivedMessage::ConfirmCommit(contents) => {
+                    self.handle_confirm_commit_message(contents, comp_ctx);
+                },
+            }
+        }
+    }
+
+    pub fn handle_data_message(&mut self, target_port_id: PortIdLocal, message: &DataMessage) {
+        // Go through all branches that are waiting for a message
+        let mut branch_idx = self.sync_pending_get.first;
+        while branch_idx != 0 {
+            let branch = &self.branches[branch_idx as usize];
+            let next_branch_idx = branch.next_branch_in_queue.unwrap_or(0);
+
+            let target_port_index = self.ports.get_port_index(target_port_id).unwrap();
+            let port_mapping = self.ports.get_port(branch_idx, target_port_index);
+
+            // Check if the branch may accept the message
+            if branch.sync_state == SpeculativeState::HaltedAtBranchPoint &&
+                branch.halted_at_port == target_port_id &&
+                port_mapping.last_registered_branch_id == message.sender_prev_branch_id
+            {
+                // Branch can accept. So fork it, and let the fork accept the
+                // message. The original branch stays waiting for new messages.
+                let new_branch_idx = self.branches.len() as u32;
+                let new_branch = Branch::new_sync_branching_from(new_branch_idx, branch);
+
+                self.ports.prepare_sync_branch(branch_idx, new_branch_idx);
+                let mapping = self.ports.get_port_mut(branch_idx, target_port_index);
+                mapping.last_registered_branch_id = message.sender_cur_branch_id;
+
+                let new_branch_id = BranchId::new(new_branch_idx);
+                self.branches.push(new_branch);
+                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id)
+            }
+
+            branch_idx = next_branch_idx;
+        }
    }
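Both the loop deleted from `run` and the new `handle_data_message` above traverse the same intrusive structure: `sync_pending_get.first` names a branch index, each branch stores `next_branch_in_queue`, and index 0 (the non-speculative branch) doubles as the list terminator. A self-contained model of that convention, with hypothetical names, not the runtime's actual types:

```rust
// Illustrative model of the queue convention used above: branches form an
// intrusive singly-linked list, and index 0 doubles as the "none" sentinel,
// so traversal stops once an index of 0 is reached.
struct QueueBranch { index: u32, next_branch_in_queue: Option<u32> }

fn for_each_queued(branches: &[QueueBranch], first: u32, mut f: impl FnMut(&QueueBranch)) {
    let mut idx = first;
    while idx != 0 {
        let branch = &branches[idx as usize];
        // Read the successor before handing the branch out, mirroring the
        // patch, which may move the branch to another queue inside the loop.
        let next = branch.next_branch_in_queue.unwrap_or(0);
        f(branch);
        idx = next;
    }
}
```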
 
     /// Accepts a synchronous message and combines it with the locally stored
     /// solution(s). Then queue new `Sync`/`Solution` messages when appropriate.
-    pub fn handle_sync_message(&mut self, message: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) {
-        debug_assert!(!message.to_visit.contains(&ctx.id)); // own ID already removed
-        debug_assert!(message.constraints.iter().any(|v| v.connector_id == ctx.id)); // we have constraints
+    pub fn handle_sync_message(&mut self, message: SyncMessage, comp_ctx: &mut ComponentCtxFancy) {
+        debug_assert!(!message.to_visit.contains(&comp_ctx.id)); // own ID already removed
+        debug_assert!(message.constraints.iter().any(|v| v.connector_id == comp_ctx.id)); // we have constraints
 
         // TODO: Optimize, use some kind of temp workspace vector
         let mut execution_path_branch_ids = Vec::new();
@@ -543,7 +535,7 @@ impl ConnectorPDL {
             // We have some solutions to match against
             let constraints_index = message.constraints
                 .iter()
-                .position(|v| v.connector_id == ctx.id)
+                .position(|v| v.connector_id == comp_ctx.id)
                 .unwrap();
             let constraints = &message.constraints[constraints_index].constraints;
             debug_assert!(!constraints.is_empty());
@@ -621,7 +613,7 @@ impl ConnectorPDL {
                 // - replace constraints with a local solution
                 new_solution.constraints.remove(constraints_index);
                 new_solution.local_solutions.push(SyncConnectorSolution{
-                    connector_id: ctx.id,
+                    connector_id: comp_ctx.id,
                     terminating_branch_id: BranchId::new(branch_index),
                     execution_branch_ids: execution_path_branch_ids.clone(),
                     final_port_mapping: new_solution_mapping,
@@ -633,7 +625,7 @@ impl ConnectorPDL {
                     let port_id = self.ports.get_port_id(port_index);
 
                     let (peer_connector_id, peer_port_id, peer_is_getter) = {
-                        let port = ctx.get_port(port_id);
+                        let port = comp_ctx.get_port_by_id(port_id).unwrap();
                         (port.peer_connector, port.peer_id, port.kind == PortKind::Putter)
                     };
@@ -658,7 +650,7 @@ impl ConnectorPDL {
                 // If here, then the newly generated solution is completely
                 // compatible.
                 let next_branch = branch.next_branch_in_queue;
-                self.submit_sync_solution(new_solution, ctx, results);
+                self.submit_sync_solution(new_solution, comp_ctx);
 
                 // Consider the next branch
                 if branch_index == self.sync_finished_last_handled {
@@ -672,7 +664,7 @@ impl ConnectorPDL {
         }
     }
 
-    fn handle_request_commit_message(&mut self, mut message: SolutionMessage, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
+    fn handle_request_commit_message(&mut self, mut message: SolutionMessage, comp_ctx: &mut ComponentCtxFancy) {
         let should_propagate_message = match &self.committed_to {
             Some((previous_origin, previous_comparison)) => {
                // Already committed to something. So will commit to this if it
@@ -695,22 +687,22 @@ impl ConnectorPDL {
             // TODO: Use temporary workspace
             let mut to_visit = Vec::with_capacity(message.local_solutions.len() - 1);
             for (connector_id, _) in &message.local_solutions {
-                if *connector_id != ctx.id {
+                if *connector_id != comp_ctx.id {
                     to_visit.push(*connector_id);
                 }
             }
             message.to_visit = to_visit;
 
-            self.handle_confirm_commit_message(message.clone(), ctx, delta_state);
-            delta_state.outbox.push(MessageContents::ConfirmCommit(message));
+            comp_ctx.submit_message(MessageContents::ConfirmCommit(message.clone()));
+            self.handle_confirm_commit_message(message, comp_ctx);
             } else {
                 // Not yet visited all of the connectors
-                delta_state.outbox.push(MessageContents::RequestCommit(message));
+                comp_ctx.submit_message(MessageContents::RequestCommit(message));
             }
         }
     }
 
-    fn handle_confirm_commit_message(&mut self, message: SolutionMessage, ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) {
+    fn handle_confirm_commit_message(&mut self, message: SolutionMessage, comp_ctx: &mut ComponentCtxFancy) {
         // Make sure this is the message we actually committed to. As long as
         // we're running on a single machine this is fine.
         // TODO: Take care of nefarious peers
@@ -722,16 +714,15 @@ impl ConnectorPDL {
         // Find the branch we're supposed to commit to
         let (_, branch_id) = message.local_solutions
             .iter()
-            .find(|(id, _)| *id == ctx.id)
+            .find(|(id, _)| *id == comp_ctx.id)
             .unwrap();
         let branch_id = *branch_id;
 
         // Commit to the branch. That is: move the solution branch to the first
         // of the connector's branches
-        self.in_sync = false;
         self.branches.swap(0, branch_id.index as usize);
         self.branches.truncate(1); // TODO: Or drain and do not deallocate?
-        let solution = &mut self.branches[0];
+        let solution_branch = &mut self.branches[0];
 
         // Clear all of the other sync-related variables
         self.sync_active.clear();
@@ -741,18 +732,20 @@ impl ConnectorPDL {
         self.cur_round += 1;
         self.committed_to = None;
 
-        self.inbox.clear();
         self.ports.commit_to_sync();
 
         // Add/remove any of the ports we lost during the sync phase
-        for port_delta in &solution.ports_delta {
-            if port_delta.acquired {
-                self.ports.add_port(port_delta.port_id);
+        // TODO: Probably might not need this with the port syncing
+        for port_delta in &solution_branch.ports_delta {
+            if port_delta.is_acquired {
+                self.ports.add_port(port_delta.port.self_id);
             } else {
-                self.ports.remove_port(port_delta.port_id);
+                self.ports.remove_port(port_delta.port.self_id);
             }
         }
-        solution.commit_to_sync();
+
+        comp_ctx.notify_sync_end(&solution_branch.ports_delta);
+        solution_branch.commit_to_sync();
     }
 
     // -------------------------------------------------------------------------
@@ -764,8 +757,8 @@ impl ConnectorPDL {
     /// where it is the caller's responsibility to immediately take care of
     /// those changes. The return value indicates when (and if) the connector
     /// needs to be scheduled again.
-    pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
-        debug_assert!(self.in_sync);
+    pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
+        debug_assert!(comp_ctx.is_in_sync());
 
         if self.sync_active.is_empty() {
             return ConnectorScheduling::NotNow;
@@ -864,7 +857,7 @@ impl ConnectorPDL {
                         // But if some messages can be immediately applied, do so
                         // now.
-                        let messages = comp_ctx.get_read_messages(local_port_id, port_mapping.last_registered_branch_id);
+                        let messages = comp_ctx.get_read_data_messages(local_port_id, port_mapping.last_registered_branch_id);
                         let mut did_have_messages = false;
 
                         for message in messages {
@@ -884,10 +877,9 @@ impl ConnectorPDL {
 
                             // If the message contains any ports then they will now
                             // be owned by the new branch
-                            debug_assert!(results.ports.is_empty());
-                            find_ports_in_value_group(&message.message, &mut results.ports);
-                            Self::acquire_ports_during_sync(&mut self.ports, &mut new_branch, &results.ports);
-                            results.ports.clear();
+                            let mut transferred_ports = Vec::new(); // TODO: Create workspace somewhere
+                            find_ports_in_value_group(&message.message, &mut transferred_ports);
+                            Self::acquire_ports_during_sync(&mut self.ports, &mut new_branch, &transferred_ports);
 
                             // Schedule the new branch
                             debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync);
@@ -959,12 +951,11 @@ impl ConnectorPDL {
 
                         // If the message contains any ports then we release our
                        // ownership over them in this branch
-                        debug_assert!(results.ports.is_empty());
-                        find_ports_in_value_group(&message.message, &mut results.ports);
-                        Self::release_ports_during_sync(&mut self.ports, branch, &results.ports).unwrap();
-                        results.ports.clear();
+                        let mut transferred_ports = Vec::new(); // TODO: Put in some temp workspace
+                        find_ports_in_value_group(&message.message, &mut transferred_ports);
+                        Self::release_ports_during_sync(&mut self.ports, branch, &transferred_ports).unwrap();
 
-                        results.outbox.push(MessageContents::Data(message));
+                        comp_ctx.submit_message(MessageContents::Data(message));
                         let branch_index = branch.index;
                         Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, branch_index);
@@ -986,8 +977,8 @@ impl ConnectorPDL {
     }
 
     /// Runs the connector in non-synchronous mode.
-    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
-        debug_assert!(!self.in_sync);
+    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
+        debug_assert!(!comp_ctx.is_in_sync());
         debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
         debug_assert!(self.branches.len() == 1);
@@ -1013,7 +1004,14 @@ impl ConnectorPDL {
             },
             RunResult::ComponentAtSyncStart => {
                 // Prepare for sync execution and reschedule immediately
-                self.in_sync = true;
+                // TODO: Not sure about this. I want a clear synchronization
+                //  point between scheduler/component view on the ports. But is
+                //  this the way to do it?
+                let current_ports = comp_ctx.notify_sync_start();
+                for port in current_ports {
+                    debug_assert!(self.ports.get_port_index(port.self_id).is_some());
+                }
+
                 let first_sync_branch = Branch::new_sync_branching_from(1, branch);
                 let first_sync_branch_id = first_sync_branch.index;
                 self.ports.prepare_sync_branch(0, 1);
@@ -1025,12 +1023,12 @@ impl ConnectorPDL {
             RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
                 // Construction of a new component. Find all references to ports
                 // inside of the arguments
-                debug_assert!(results.ports.is_empty());
-                find_ports_in_value_group(&arguments, &mut results.ports);
+                let mut transferred_ports = Vec::new();
+                find_ports_in_value_group(&arguments, &mut transferred_ports);
 
-                if !results.ports.is_empty() {
+                if !transferred_ports.is_empty() {
                     // Ports changing ownership
-                    if let Err(_) = Self::release_ports_during_non_sync(&mut self.ports, branch, &results.ports) {
+                    if let Err(_) = Self::release_ports_during_non_sync(&mut self.ports, branch, &transferred_ports) {
                         todo!("fatal error handling");
                     }
                 }
@@ -1043,25 +1041,25 @@ impl ConnectorPDL {
                         definition_id, monomorph_idx, arguments
                     )
                 };
-                let new_connector_ports = results.ports.clone(); // TODO: Do something with this
+                let new_connector_branch = Branch::new_initial_branch(new_connector_state);
-                let new_connector = ConnectorPDL::new(new_connector_branch, new_connector_ports);
+                let new_connector = ConnectorPDL::new(new_connector_branch, transferred_ports);
 
-                results.new_connectors.push(new_connector);
+                comp_ctx.push_component(new_connector);
 
                 return ConnectorScheduling::Later;
             },
             RunResult::NewChannel => {
                 // Need to prepare a new channel
-                let (getter, putter) = sched_ctx.runtime.create_channel(conn_ctx.id);
+                let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id);
                 debug_assert_eq!(getter.kind, PortKind::Getter);
                 branch.prepared_channel = Some((
                     Value::Input(PortId::new(putter.self_id.index)),
                     Value::Output(PortId::new(getter.self_id.index))
                 ));
 
-                results.new_ports.push(putter);
-                results.new_ports.push(getter);
+                comp_ctx.push_port(putter);
+                comp_ctx.push_port(getter);
 
                 return ConnectorScheduling::Immediate;
             },
@@ -1201,26 +1199,27 @@ impl ConnectorPDL {
                     }
 
                     for delta in &branch.ports_delta {
-                        if delta.port_id == *port_id {
+                        if delta.port.self_id == *port_id {
                             // We cannot have acquired this port, because the
                             // call to `ports.get_port_index` returned an index.
-                            debug_assert!(!delta.acquired);
+                            debug_assert!(!delta.is_acquired);
                             return Err(PortOwnershipError::AlreadyGivenAway(*port_id));
                         }
                     }
 
-                    branch.ports_delta.push(PortOwnershipDelta{
-                        acquired: false,
-                        port_id: *port_id,
-                    });
+                    // TODO: Obtain port description
+                    // branch.ports_delta.push(ComponentPortChange{
+                    //     is_acquired: false,
+                    //     port_id: *port_id,
+                    // });
                 },
                 None => {
                     // Not in port mapping, so we must have acquired it before,
                     // remove the acquirement.
                     let mut to_delete_index: isize = -1;
                     for (delta_idx, delta) in branch.ports_delta.iter().enumerate() {
-                        if delta.port_id == *port_id {
-                            debug_assert!(delta.acquired);
+                        if delta.port.self_id == *port_id {
+                            debug_assert!(delta.is_acquired);
                             to_delete_index = delta_idx as isize;
                             break;
                         }
@@ -1246,8 +1245,8 @@ impl ConnectorPDL {
 
         'port_loop: for port_id in port_ids {
             for (delta_idx, delta) in branch.ports_delta.iter().enumerate() {
-                if delta.port_id == *port_id {
-                    if delta.acquired {
+                if delta.port.self_id == *port_id {
+                    if delta.is_acquired {
                         // Somehow already received this port.
                         // TODO: @security
                         todo!("take care of nefarious peers");
@@ -1261,10 +1260,11 @@ impl ConnectorPDL {
             }
 
             // If here then we can safely acquire the new port
-            branch.ports_delta.push(PortOwnershipDelta{
-                acquired: true,
-                port_id: *port_id,
-            });
+            // TODO: Retrieve port info
+            // branch.ports_delta.push(PortOwnershipDelta{
+            //     acquired: true,
+            //     port_id: *port_id,
+            // });
         }
 
         return Ok(())
@@ -1276,8 +1276,8 @@ impl ConnectorPDL {
     /// Generates the initial solution for a finished sync branch. If initial
     /// local solution is valid, then the appropriate message is returned.
     /// Otherwise the initial solution is inconsistent.
-    fn generate_initial_solution_for_branch(&self, branch_id: BranchId, ctx: &ConnectorCtx) -> Option<SyncMessage> {
-        // Retrieve branchg
+    fn generate_initial_solution_for_branch(&self, branch_id: BranchId, comp_ctx: &ComponentCtxFancy) -> Option<SyncMessage> {
+        // Retrieve branch
         debug_assert!(branch_id.is_valid()); // because we're supposed to be in sync mode
         let branch = &self.branches[branch_id.index as usize];
         debug_assert_eq!(branch.sync_state, SpeculativeState::ReachedSyncEnd);
@@ -1301,7 +1301,7 @@ impl ConnectorPDL {
         }
 
         let initial_local_solution = SyncConnectorSolution{
-            connector_id: ctx.id,
+            connector_id: comp_ctx.id,
             terminating_branch_id: branch_id,
             execution_branch_ids: all_branch_ids,
             final_port_mapping: initial_solution_port_mapping,
@@ -1317,12 +1317,12 @@ impl ConnectorPDL {
             // sender and one for the receiver, ensuring it was not used.
             // TODO: This will fail if a port is passed around multiple times.
             //  maybe a special "passed along" entry in `ports_delta`.
-            if !sync_message.check_constraint(ctx.id, SyncBranchConstraint::SilentPort(port_delta.port_id)).unwrap() {
+            if !sync_message.check_constraint(comp_ctx.id, SyncBranchConstraint::SilentPort(port_delta.port.self_id)).unwrap() {
                 return None;
             }
 
             // Might need to check if we own the other side of the channel
-            let port = ctx.get_port(port_delta.port_id);
+            let port = comp_ctx.get_port_by_id(port_delta.port.self_id).unwrap();
             if !sync_message.add_or_check_constraint(port.peer_connector, SyncBranchConstraint::SilentPort(port.peer_id)).unwrap() {
                 return None;
             }
@@ -1332,7 +1332,7 @@ impl ConnectorPDL {
         for port_index in 0..self.ports.num_ports() {
             let port_id = self.ports.get_port_id(port_index);
             let port_mapping = self.ports.get_port(branch_id.index, port_index);
-            let port = ctx.get_port(port_id);
+            let port = comp_ctx.get_port_by_id(port_id).unwrap();
 
             let constraint = if port_mapping.is_assigned {
                 if port.kind == PortKind::Getter {
@@ -1352,7 +1352,7 @@ impl ConnectorPDL {
         return Some(sync_message);
     }
 
-    fn submit_sync_solution(&mut self, partial_solution: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) {
+    fn submit_sync_solution(&mut self, partial_solution: SyncMessage, comp_ctx: &mut ComponentCtxFancy) {
         if partial_solution.to_visit.is_empty() {
             // Solution is completely consistent. So ask everyone to commit
             // TODO: Maybe another package for random?
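The actual comparison against `(previous_origin, previous_comparison)` inside `handle_request_commit_message` falls between hunks above, so the following is only a guess at its shape: commit arbitration needs a total order on `(origin, comparison_number)` pairs so that every connector independently picks the same winning `RequestCommit`. One plausible rule, as a hedged sketch with stand-in types:

```rust
// Hedged sketch (the patch elides the real comparison): any global total
// order works, as long as all connectors apply the same one. Here, smaller
// (comparison_number, origin) wins lexicographically.
fn should_switch_commit(
    current: (u32 /* origin id */, u64 /* comparison number */),
    incoming: (u32, u64),
) -> bool {
    (incoming.1, incoming.0) < (current.1, current.0)
}
```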
@@ -1366,24 +1366,24 @@
 
             let mut full_solution = SolutionMessage{
                 comparison_number,
-                connector_origin: ctx.id,
+                connector_origin: comp_ctx.id,
                 local_solutions: Vec::with_capacity(num_local),
                 to_visit: Vec::with_capacity(num_local - 1),
             };
 
             for local_solution in &partial_solution.local_solutions {
                 full_solution.local_solutions.push((local_solution.connector_id, local_solution.terminating_branch_id));
-                if local_solution.connector_id != ctx.id {
+                if local_solution.connector_id != comp_ctx.id {
                     full_solution.to_visit.push(local_solution.connector_id);
                 }
             }
 
             debug_assert!(self.committed_to.is_none());
             self.committed_to = Some((full_solution.connector_origin, full_solution.comparison_number));
 
-            results.outbox.push(MessageContents::RequestCommit(full_solution));
+            comp_ctx.submit_message(MessageContents::RequestCommit(full_solution));
         } else {
             // Still have connectors to visit
-            results.outbox.push(MessageContents::Sync(partial_solution));
+            comp_ctx.submit_message(MessageContents::Sync(partial_solution));
         }
     }
 
@@ -1401,33 +1401,6 @@ impl ConnectorPDL {
     }
 }
 
-/// A data structure passed to a connector whose code is being executed that is
-/// used to queue up various state changes that have to be applied after
-/// running, e.g. the messages the have to be transferred to other connectors.
-// TODO: Come up with a better name
-pub(crate) struct RunDeltaState {
-    // Variables that allow the thread running the connector to pick up global
-    // state changes and try to apply them.
-    pub outbox: Vec<MessageContents>,
-    pub new_connectors: Vec<ConnectorPDL>,
-    pub new_ports: Vec<Port>,
-    // Workspaces
-    pub ports: Vec<PortIdLocal>,
-}
-
-impl RunDeltaState {
-    /// Constructs a new `RunDeltaState` object with the default amount of
-    /// reserved memory
-    pub fn new() -> Self {
-        RunDeltaState{
-            outbox: Vec::with_capacity(64),
-            new_connectors: Vec::new(),
-            new_ports: Vec::new(),
-            ports: Vec::with_capacity(64),
-        }
-    }
-}
-
 #[derive(Eq, PartialEq)]
 pub(crate) enum ConnectorScheduling {
     Immediate, // Run again, immediately
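The deleted `RunDeltaState` was an out-parameter that the scheduler drained after `run` returned; this patch replaces it with submission methods on `ComponentCtxFancy` (`submit_message`, `push_port`, `push_component`). A minimal, self-contained mock of that pattern, with illustrative names only:

```rust
// Not the runtime's API, just a model: the component records its effects in
// a context owned by the scheduler, and the scheduler drains them after the
// component's run function returns.
use std::collections::VecDeque;

struct MockCtx { outbox: VecDeque<String> }

impl MockCtx {
    fn submit_message(&mut self, msg: String) { self.outbox.push_back(msg); }
}

fn scheduler_turn(ctx: &mut MockCtx, run: impl FnOnce(&mut MockCtx)) {
    run(ctx); // component writes into ctx instead of returning a delta struct
    while let Some(msg) = ctx.outbox.pop_front() {
        println!("scheduler delivers: {msg}"); // stand-in for send_message
    }
}
```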
diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs
index 8b1a6fd9b7a04dca1fd591b7fff46e2127c7975a..6c7f03c300f50335fb7e8b048ba18604d8cfb2bd 100644
--- a/src/runtime2/inbox.rs
+++ b/src/runtime2/inbox.rs
@@ -239,107 +239,4 @@ impl PublicInbox {
         let lock = self.messages.lock().unwrap();
         return lock.is_empty();
     }
-}
-
-pub(crate) struct PrivateInbox {
-    // "Normal" messages, intended for a PDL protocol. These need to stick
-    // around during an entire sync-block (to handle `put`s for which the
-    // corresponding `get`s have not yet been reached).
-    messages: Vec<(PortIdLocal, DataMessage)>,
-    len_read: usize,
-}
-
-impl PrivateInbox {
-    pub fn new() -> Self {
-        Self{
-            messages: Vec::new(),
-            len_read: 0,
-        }
-    }
-
-    /// Will insert the message into the inbox. Only exception is when the tuple
-    /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
-    /// nothing is inserted..
-    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, message: DataMessage) {
-        for (existing_target_port, existing) in self.messages.iter() {
-            if existing.sender_prev_branch_id == message.sender_prev_branch_id &&
-                existing.sender_cur_branch_id == message.sender_cur_branch_id &&
-                *existing_target_port == target_port {
-                // Message was already received
-                return;
-            }
-        }
-
-        self.messages.push((target_port, message));
-    }
-
-    /// Retrieves all previously read messages that satisfy the provided
-    /// speculative conditions. Note that the inbox remains read-locked until
-    /// the returned iterator is dropped. Should only be called by the
-    /// inbox-reader (i.e. the thread executing a connector's PDL code).
-    ///
-    /// This function should only be used to check if already-received messages
-    /// could be received by a newly encountered `get` call in a connector's
-    /// PDL code.
-    pub(crate) fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
-        return InboxMessageIter {
-            messages: &self.messages,
-            next_index: 0,
-            max_index: self.len_read,
-            match_port_id: port_id,
-            match_prev_branch_id: prev_branch_id,
-        };
-    }
-
-    /// Retrieves the next unread message. Should only be called by the
-    /// inbox-reader.
-    pub(crate) fn next_message(&mut self) -> Option<(&PortIdLocal, &DataMessage)> {
-        if self.len_read == self.messages.len() {
-            return None;
-        }
-
-        let (target_port, message) = &self.messages[self.len_read];
-        self.len_read += 1;
-        return Some((target_port, message));
-    }
-
-    /// Simply empties the inbox
-    pub(crate) fn clear(&mut self) {
-        self.messages.clear();
-        self.len_read = 0;
-    }
-}
-
-/// Iterator over previously received messages in the inbox.
-pub(crate) struct InboxMessageIter<'i> {
-    messages: &'i Vec<(PortIdLocal, DataMessage)>,
-    next_index: usize,
-    max_index: usize,
-    match_port_id: PortIdLocal,
-    match_prev_branch_id: BranchId,
-}
-
-impl<'i> Iterator for InboxMessageIter<'i> {
-    type Item = &'i DataMessage;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        // Loop until match is found or at end of messages
-        while self.next_index < self.max_index {
-            let (target_port, cur_message) = &self.messages[self.next_index];
-            if *target_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id {
-                // Found a match
-                break;
-            }
-
-            self.next_index += 1;
-        }
-
-        if self.next_index == self.max_index {
-            return None;
-        }
-
-        let (_, message) = &self.messages[self.next_index];
-        self.next_index += 1;
-        return Some(message);
-    }
 }
\ No newline at end of file
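The deleted `InboxMessageIter` above is a hand-rolled filter over the already-read prefix of the inbox. For comparison, and not as part of the patch, the same behaviour can be expressed with standard iterator adapters over simplified stand-in types:

```rust
// Equivalent of the removed iterator, sketched with stand-in types: yield
// every already-read message on `port` whose sender's previous branch id
// matches `prev_branch`.
#[derive(Clone, Copy, PartialEq, Eq)]
struct PortId(u32);
#[derive(Clone, Copy, PartialEq, Eq)]
struct BranchId(u32);
struct Msg { prev_branch: BranchId }

fn matching<'a>(
    messages: &'a [(PortId, Msg)],
    len_read: usize, // only the read prefix participates, as in the original
    port: PortId,
    prev_branch: BranchId,
) -> impl Iterator<Item = &'a Msg> {
    messages[..len_read]
        .iter()
        .filter(move |(p, m)| *p == port && m.prev_branch == prev_branch)
        .map(|(_, m)| m)
}
```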
diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs
index 2acf1503e1dc5a26036832be06d56b3c97f74719..9b741ce49417d1f7933c8622bcac4f8c4c7ffe4a 100644
--- a/src/runtime2/mod.rs
+++ b/src/runtime2/mod.rs
@@ -21,11 +21,11 @@ use crate::collections::RawVec;
 use crate::ProtocolDescription;
 
 use inbox::Message;
-use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
-use scheduler::{Scheduler, ConnectorCtx, ControlMessageHandler};
+use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling};
+use scheduler::{Scheduler, ControlMessageHandler};
 use native::{Connector, ConnectorApplication, ApplicationInterface};
 use crate::runtime2::port::{Port, PortState};
-use crate::runtime2::scheduler::SchedulerCtx;
+use crate::runtime2::scheduler::{ComponentCtxFancy, SchedulerCtx};
 
 /// A kind of token that, once obtained, allows mutable access to a connector.
 /// We're trying to use move semantics as much as possible: the owner of this
@@ -78,24 +78,17 @@ pub(crate) enum ConnectorVariant {
 }
 
 impl Connector for ConnectorVariant {
-    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
+    fn run(&mut self, scheduler_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
         match self {
-            ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state),
-            ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state),
-        }
-    }
-
-    fn run(&mut self, scheduler_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
-        match self {
-            ConnectorVariant::UserDefined(c) => c.run(scheduler_ctx, conn_ctx, delta_state),
-            ConnectorVariant::Native(c) => c.run(scheduler_ctx, conn_ctx, delta_state),
+            ConnectorVariant::UserDefined(c) => c.run(scheduler_ctx, comp_ctx),
+            ConnectorVariant::Native(c) => c.run(scheduler_ctx, comp_ctx),
         }
     }
 }
 
 pub(crate) struct ScheduledConnector {
     pub connector: ConnectorVariant, // access by connector
-    pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector
+    pub ctx_fancy: ComponentCtxFancy,
     pub public: ConnectorPublic, // accessible by all schedulers and connectors
     pub router: ControlMessageHandler,
     pub shutting_down: bool,
@@ -263,7 +256,8 @@ impl RuntimeInner {
 
     // --- Creating/retrieving/destroying components
 
-    pub(crate) fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey {
+    /// Creates an initially sleeping application connector.
+    fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey {
         // Initialize as sleeping, as it will be scheduled by the programmer.
         let mut lock = self.connectors.write().unwrap();
         let key = lock.create(ConnectorVariant::Native(Box::new(component)), true);
@@ -272,35 +266,17 @@ impl RuntimeInner {
         return key;
     }
 
-    /// Creates a new PDL component. The caller MUST make sure to schedule the
-    /// connector.
-    // TODO: Nicer code, not forcing the caller to schedule, perhaps?
-    pub(crate) fn create_pdl_component(&self, created_by: &mut ScheduledConnector, connector: ConnectorPDL) -> ConnectorKey {
+    /// Creates a new PDL component. This function only creates the component.
+    /// If it is created initially awake, then the caller must add it to the
+    /// work queue. Other aspects of correctness (e.g. setting up the initial
+    /// ports) are left to the caller as well!
+    pub(crate) fn create_pdl_component(&self, connector: ConnectorPDL, initially_sleeping: bool) -> ConnectorKey {
         // Create as not sleeping, as we'll schedule it immediately
         let key = {
             let mut lock = self.connectors.write().unwrap();
-            lock.create(ConnectorVariant::UserDefined(connector), false)
+            lock.create(ConnectorVariant::UserDefined(connector), initially_sleeping)
         };
 
-        // Transfer the ports
-        {
-            let lock = self.connectors.read().unwrap();
-            let created = lock.get_private(&key);
-
-            match &created.connector {
-                ConnectorVariant::UserDefined(connector) => {
-
-                    println!("DEBUG: The connector {} owns the ports: {:?}", key.index, connector.ports.owned_ports.iter().map(|v| v.index).collect::<Vec<_>>());
-                    for port_id in connector.ports.owned_ports.iter().copied() {
-                        println!("DEBUG: Transferring port {:?} from {} to {}", port_id, created_by.context.id.0, key.index);
-                        let port = created_by.context.remove_port(port_id);
-                        created.context.add_port(port);
-                    }
-                },
-                ConnectorVariant::Native(_) => unreachable!(),
-            }
-        }
-
         self.increment_active_components();
         return key;
     }
@@ -428,7 +404,7 @@ impl ConnectorStore {
     fn create(&mut self, connector: ConnectorVariant, initially_sleeping: bool) -> ConnectorKey {
         let mut connector = ScheduledConnector {
             connector,
-            context: ConnectorCtx::new(),
+            ctx_fancy: ComponentCtxFancy::new_empty(),
             public: ConnectorPublic::new(initially_sleeping),
             router: ControlMessageHandler::new(),
             shutting_down: false,
@@ -441,7 +417,7 @@ impl ConnectorStore {
             // No free entries, allocate new entry
             index = self.connectors.len();
             key = ConnectorKey{ index: index as u32 };
-            connector.context.id = key.downcast();
+            connector.ctx_fancy.id = key.downcast();
 
             let connector = Box::into_raw(Box::new(connector));
             self.connectors.push(connector);
@@ -449,7 +425,7 @@ impl ConnectorStore {
             // Free spot available
             index = self.free.pop().unwrap();
             key = ConnectorKey{ index: index as u32 };
-            connector.context.id = key.downcast();
+            connector.ctx_fancy.id = key.downcast();
 
             unsafe {
                 let target = self.connectors.get_mut(index);
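`ConnectorStore::create`, modified above, hands out slots from a `Vec` plus a free-list, with the slot index doubling as the connector's key/id. A minimal model of that allocation pattern (the real store keeps boxed connectors behind raw pointers; this sketch uses `Option` slots instead):

```rust
// Not the runtime's code: a simplified slab with a free-list, mirroring the
// create/reuse logic of ConnectorStore.
struct Slab<T> {
    items: Vec<Option<T>>,
    free: Vec<usize>,
}

impl<T> Slab<T> {
    fn create(&mut self, item: T) -> usize {
        match self.free.pop() {
            // Free spot available: reuse its index as the new key
            Some(index) => { self.items[index] = Some(item); index }
            // No free entries: allocate a new entry at the end
            None => { self.items.push(Some(item)); self.items.len() - 1 }
        }
    }

    fn destroy(&mut self, index: usize) {
        self.items[index] = None;
        self.free.push(index); // slot can be handed out again
    }
}
```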
diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs
index f91ec36c5879a3eab121b373a0bfa3acd8a0dd7e..fff2fa8e0b618d4bcec92ef1469161811efcc3eb 100644
--- a/src/runtime2/native.rs
+++ b/src/runtime2/native.rs
@@ -4,25 +4,21 @@ use std::sync::atomic::Ordering;
 
 use crate::protocol::ComponentCreationError;
 use crate::protocol::eval::ValueGroup;
-use crate::ProtocolDescription;
-use crate::runtime2::scheduler::ComponentCtxFancy;
 
-use super::{ConnectorKey, ConnectorId, RuntimeInner, ConnectorCtx};
-use super::scheduler::SchedulerCtx;
+use super::{ConnectorKey, ConnectorId, RuntimeInner};
+use super::scheduler::{SchedulerCtx, ComponentCtxFancy, ReceivedMessage};
 use super::port::{Port, PortIdLocal, Channel, PortKind};
-use super::connector::{Branch, ConnectorScheduling, RunDeltaState, ConnectorPDL};
+use super::connector::{Branch, ConnectorScheduling, ConnectorPDL};
 use super::connector::find_ports_in_value_group;
 use super::inbox::{Message, MessageContents};
 
 /// Generic connector interface from the scheduler's point of view.
 pub(crate) trait Connector {
-    /// Handle a new message (preprocessed by the scheduler). You probably only
-    /// want to handle `Data`, `Sync`, and `Solution` messages. The others are
-    /// intended for the scheduler itself.
-    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState);
-
-    /// Should run the connector's behaviour up until the next blocking point.
-    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
+    /// Runs the connector up until its next blocking point. Implementations
+    /// should generally request and handle new messages from the component
+    /// context, then perform whatever logic the component has to do, queueing
+    /// up any state changes through that same context.
+    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling;
 }
 
 type SyncDone = Arc<(Mutex<bool>, Condvar)>;
@@ -46,7 +42,10 @@ impl ConnectorApplication {
         let sync_done = Arc::new(( Mutex::new(false), Condvar::new() ));
         let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32)));
 
-        let connector = ConnectorApplication { sync_done: sync_done.clone(), job_queue: job_queue.clone() };
+        let connector = ConnectorApplication {
+            sync_done: sync_done.clone(),
+            job_queue: job_queue.clone()
+        };
         let interface = ApplicationInterface::new(sync_done, job_queue, runtime);
 
         return (connector, interface);
@@ -54,36 +53,35 @@
 }
 
 impl Connector for ConnectorApplication {
-    fn handle_message(&mut self, message: Message, _ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) {
-        use MessageContents as MC;
-
-        match message.contents {
-            MC::Data(_) => unreachable!("data message in API connector"),
-            MC::Sync(_) | MC::RequestCommit(_) | MC::ConfirmCommit(_) => {
-                // Handling sync in API
-            },
-            MC::Control(_) => {},
-            MC::Ping => {},
+    fn run(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
+        // Handle any incoming messages if we're participating in a round
+        while let Some(message) = comp_ctx.read_next_message() {
+            match message {
+                ReceivedMessage::Data(_) => todo!("data message in API connector"),
+                ReceivedMessage::Sync(_) | ReceivedMessage::RequestCommit(_) | ReceivedMessage::ConfirmCommit(_) => {
+                    todo!("sync message in API connector");
+                }
+            }
         }
-    }
 
-    fn run(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, _conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
-        let mut queue = self.job_queue.lock().unwrap();
-        while let Some(job) = queue.pop_front() {
-            match job {
-                ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
-                    println!("DEBUG: API adopting ports");
-                    delta_state.new_ports.reserve(2);
-                    delta_state.new_ports.push(endpoint_a);
-                    delta_state.new_ports.push(endpoint_b);
-                }
-                ApplicationJob::NewConnector(connector) => {
-                    println!("DEBUG: API creating connector");
-                    delta_state.new_connectors.push(connector);
-                },
-                ApplicationJob::Shutdown => {
-                    debug_assert!(queue.is_empty());
-                    return ConnectorScheduling::Exit;
+        // Handle requests coming from the API
+        {
+            let mut queue = self.job_queue.lock().unwrap();
+            while let Some(job) = queue.pop_front() {
+                match job {
+                    ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
+                        println!("DEBUG: API adopting ports");
+                        comp_ctx.push_port(endpoint_a);
+                        comp_ctx.push_port(endpoint_b);
+                    }
+                    ApplicationJob::NewConnector(connector) => {
+                        println!("DEBUG: API creating connector");
+                        comp_ctx.push_component(connector);
+                    },
+                    ApplicationJob::Shutdown => {
+                        debug_assert!(queue.is_empty());
+                        return ConnectorScheduling::Exit;
+                    }
                 }
             }
         }
diff --git a/src/runtime2/port.rs b/src/runtime2/port.rs
index b94443dc568a7606001e3f6477046d188e499d05..579c7ee61b07d1f5566dcba57a6942f8c807e7a6 100644
--- a/src/runtime2/port.rs
+++ b/src/runtime2/port.rs
@@ -21,13 +21,13 @@ impl PortIdLocal {
     }
 }
 
-#[derive(Debug, Eq, PartialEq)]
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
 pub enum PortKind {
     Putter,
     Getter,
 }
 
-#[derive(Debug, Eq, PartialEq)]
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
 pub enum PortState {
     Open,
     Closed,
diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs
index 1c31694e3d45b6214a10052fbd1104db80d1058b..a17d66cc0b843cdbd460a5bc7518bd137b1a8efd 100644
--- a/src/runtime2/scheduler.rs
+++ b/src/runtime2/scheduler.rs
@@ -1,59 +1,15 @@
+use std::collections::VecDeque;
 use std::sync::Arc;
 use std::sync::atomic::Ordering;
 
-use crate::runtime2::connector::{BranchId, ConnectorPDL};
-use crate::runtime2::inbox::{DataMessage, PrivateInbox};
-
-use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey};
+use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey, ConnectorVariant};
 use super::port::{Port, PortState, PortIdLocal};
 use super::native::Connector;
-use super::connector::{ConnectorScheduling, RunDeltaState};
-use super::inbox::{Message, MessageContents, ControlMessageVariant, ControlMessage};
-
-/// Contains fields that are mostly managed by the scheduler, but may be
-/// accessed by the connector
-pub(crate) struct ConnectorCtx {
-    pub(crate) id: ConnectorId,
-    pub(crate) ports: Vec<Port>,
-}
-
-impl ConnectorCtx {
-    pub(crate) fn new() -> ConnectorCtx {
-        Self{
-            id: ConnectorId::new_invalid(),
-            ports: Vec::new(),
-        }
-    }
-
-    pub(crate) fn add_port(&mut self, port: Port) {
-        debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id));
-        self.ports.push(port);
-    }
-
-    pub(crate) fn remove_port(&mut self, id: PortIdLocal) -> Port {
-        let index = self.port_id_to_index(id);
-        return self.ports.remove(index);
-    }
-
-    pub(crate) fn get_port(&self, id: PortIdLocal) -> &Port {
-        let index = self.port_id_to_index(id);
-        return &self.ports[index];
-    }
-
-    pub(crate) fn get_port_mut(&mut self, id: PortIdLocal) -> &mut Port {
-        let index = self.port_id_to_index(id);
-        return &mut self.ports[index];
-    }
-
-    fn port_id_to_index(&self, id: PortIdLocal) -> usize {
-        for (idx, port) in self.ports.iter().enumerate() {
-            if port.self_id == id {
-                return idx;
-            }
-        }
-
-        panic!("port {:?}, not owned by connector", id);
-    }
-}
+use super::connector::{BranchId, ConnectorPDL, ConnectorScheduling};
+use super::inbox::{
+    Message, MessageContents, ControlMessageVariant,
+    DataMessage, ControlMessage, SolutionMessage, SyncMessage
+};
 
 // Because it contains pointers we're going to do a copy by value on this one
 #[derive(Clone, Copy)]
@@ -74,8 +30,6 @@ impl Scheduler {
     pub fn run(&mut self) {
         // Setup global storage and workspaces that are reused for every
         // connector that we run
-        let mut delta_state = RunDeltaState::new();
-
         'thread_loop: loop {
             // Retrieve a unit of work
             self.debug("Waiting for work");
@@ -97,71 +51,7 @@
             // connector.
             let mut cur_schedule = ConnectorScheduling::Immediate;
             while cur_schedule == ConnectorScheduling::Immediate {
-                // Check all the message that are in the shared inbox
-                while let Some(message) = scheduled.public.inbox.take_message() {
-                    // Check for rerouting
-                    self.debug_conn(connector_id, &format!("Handling message from conn({}) at port({})\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message));
-                    if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
-                        self.debug_conn(connector_id, &format!(" ... Rerouting to connector {}", other_connector_id.0));
-                        self.runtime.send_message(other_connector_id, message);
-                        continue;
-                    }
-
-                    self.debug_conn(connector_id, " ... Handling message myself");
-                    // Check for messages that requires special action from the
-                    // scheduler.
-                    if let MessageContents::Control(content) = message.contents {
-                        match content.content {
-                            ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
-                                // Need to change port target
-                                let port = scheduled.context.get_port_mut(port_id);
-                                port.peer_connector = new_target_connector_id;
-
-                                // Note: for simplicity we program the scheduler to always finish
-                                // running a connector with an empty outbox. If this ever changes
-                                // then accepting the "port peer changed" message implies we need
-                                // to change the recipient of the message in the outbox.
-                                debug_assert!(delta_state.outbox.is_empty());
-
-                                // And respond with an Ack
-                                let ack_message = Message{
-                                    sending_connector: connector_id,
-                                    receiving_port: PortIdLocal::new_invalid(),
-                                    contents: MessageContents::Control(ControlMessage{
-                                        id: content.id,
-                                        content: ControlMessageVariant::Ack,
-                                    }),
-                                };
-                                self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message));
-                                self.runtime.send_message(message.sending_connector, ack_message);
-                            },
-                            ControlMessageVariant::CloseChannel(port_id) => {
-                                // Mark the port as being closed
-                                let port = scheduled.context.get_port_mut(port_id);
-                                port.state = PortState::Closed;
-
-                                // Send an Ack
-                                let ack_message = Message{
-                                    sending_connector: connector_id,
-                                    receiving_port: PortIdLocal::new_invalid(),
-                                    contents: MessageContents::Control(ControlMessage{
-                                        id: content.id,
-                                        content: ControlMessageVariant::Ack,
-                                    }),
-                                };
-                                self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message));
-                                self.runtime.send_message(message.sending_connector, ack_message);
-
-                            },
-                            ControlMessageVariant::Ack => {
-                                scheduled.router.handle_ack(content.id);
-                            }
-                        }
-                    } else {
-                        // Let connector handle message
-                        scheduled.connector.handle_message(message, &scheduled.context, &mut delta_state);
-                    }
-                }
+                self.handle_inbox_messages(scheduled);
 
                 // Run the main behaviour of the connector, depending on its
                 // current state.
@@ -180,14 +70,12 @@
                 } else {
                     self.debug_conn(connector_id, "Running ...");
                     let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
-                    let new_schedule = scheduled.connector.run(
-                        scheduler_ctx, &scheduled.context, &mut delta_state
-                    );
+                    let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx_fancy);
                    self.debug_conn(connector_id, "Finished running");
 
                     // Handle all of the output from the current run: messages to
                     // send and connectors to instantiate.
-                    self.handle_delta_state(scheduled, connector_key.downcast(), &mut delta_state);
+                    self.handle_changes_in_context(scheduled);
 
                     cur_schedule = new_schedule;
                 }
@@ -212,7 +100,7 @@
                     // Prepare for exit. Set the shutdown flag and broadcast
                     // messages to notify peers of closing channels
                     scheduled.shutting_down = true;
-                    for port in &scheduled.context.ports {
+                    for port in &scheduled.ctx_fancy.ports {
                         if port.state != PortState::Closed {
                             let message = scheduled.router.prepare_closing_channel(
                                 port.self_id, port.peer_id,
@@ -234,119 +122,244 @@ impl Scheduler {
         }
     }
 
-    fn handle_delta_state(&mut self,
-        cur_connector: &mut ScheduledConnector, connector_id: ConnectorId,
-        delta_state: &mut RunDeltaState
-    ) {
-        // Handling any messages that were sent
-        if !delta_state.outbox.is_empty() {
-            for mut message in delta_state.outbox.drain(..) {
-                // Based on the message contents, decide where the message
-                // should be sent to. This might end up modifying the message.
-                self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message));
-                let (peer_connector, self_port, peer_port) = match &mut message {
-                    MessageContents::Data(contents) => {
-                        let port = cur_connector.context.get_port(contents.sending_port);
-                        (port.peer_connector, contents.sending_port, port.peer_id)
-                    },
-                    MessageContents::Sync(contents) => {
-                        let connector = contents.to_visit.pop().unwrap();
-                        (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
-                    },
-                    MessageContents::RequestCommit(contents)=> {
-                        let connector = contents.to_visit.pop().unwrap();
-                        (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
-                    },
-                    MessageContents::ConfirmCommit(contents) => {
-                        for to_visit in &contents.to_visit {
-                            let message = Message{
+    /// Receives messages from the public inbox, either handling them directly
+    /// or storing them in the component's private inbox.
+    fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) {
+        let connector_id = scheduled.ctx_fancy.id;
+
+        while let Some(message) = scheduled.public.inbox.take_message() {
+            // Check for rerouting
+            self.debug_conn(connector_id, &format!("Handling message from conn({}) at port({})\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message));
+            if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
+                self.debug_conn(connector_id, &format!(" ... Rerouting to connector {}", other_connector_id.0));
+                self.runtime.send_message(other_connector_id, message);
+                continue;
+            }
+
+            // Handle special messages here, messages for the component
+            // will be added to the inbox.
+            self.debug_conn(connector_id, " ... Handling message myself");
+            match message.contents {
+                MessageContents::Control(content) => {
+                    match content.content {
+                        ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
+                            // Need to change port target
+                            let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap();
+                            port.peer_connector = new_target_connector_id;
+
+                            // Note: for simplicity we program the scheduler to always finish
+                            // running a connector with an empty outbox. If this ever changes
+                            // then accepting the "port peer changed" message implies we need
+                            // to change the recipient of the message in the outbox.
+                            debug_assert!(scheduled.ctx_fancy.outbox.is_empty());
+
+                            // And respond with an Ack
+                            let ack_message = Message{
                                 sending_connector: connector_id,
                                 receiving_port: PortIdLocal::new_invalid(),
-                                contents: MessageContents::ConfirmCommit(contents.clone()),
+                                contents: MessageContents::Control(ControlMessage{
+                                    id: content.id,
+                                    content: ControlMessageVariant::Ack,
+                                }),
                             };
-                            self.runtime.send_message(*to_visit, message);
+                            self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message));
+                            self.runtime.send_message(message.sending_connector, ack_message);
+                        },
+                        ControlMessageVariant::CloseChannel(port_id) => {
+                            // Mark the port as being closed
+                            let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap();
+                            port.state = PortState::Closed;
+
+                            // Send an Ack
+                            let ack_message = Message{
+                                sending_connector: connector_id,
+                                receiving_port: PortIdLocal::new_invalid(),
+                                contents: MessageContents::Control(ControlMessage{
+                                    id: content.id,
+                                    content: ControlMessageVariant::Ack,
+                                }),
+                            };
+                            self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message));
+                            self.runtime.send_message(message.sending_connector, ack_message);
+                        },
+                        ControlMessageVariant::Ack => {
+                            scheduled.router.handle_ack(content.id);
                         }
-                        (ConnectorId::new_invalid(), PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
-                    },
-                    MessageContents::Control(_) | MessageContents::Ping => {
-                        // Never generated by the user's code
-                        unreachable!();
                     }
-                };
+                },
+                MessageContents::Ping => {
+                    // Pings are sent just to wake up a component, so
+                    // nothing to do here.
+                },
+                _ => {
+                    // All other cases have to be handled by the component
+                    scheduled.ctx_fancy.inbox_messages.push(message);
+                }
+            }
+        }
+    }
 
+    /// Handles changes to the context that were made by the component. This is
+    /// the way (due to Rust's borrowing rules) that we bubble up changes in the
+    /// component's state that the scheduler needs to know about (e.g. a message
+    /// that the component wants to send).
+    fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) {
+        let connector_id = scheduled.ctx_fancy.id;
+
+        // Handling any messages that were sent
+        while let Some(mut message) = scheduled.ctx_fancy.outbox.pop_front() {
+            // Based on the message contents, decide where the message
+            // should be sent to. This might end up modifying the message.
+            self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message));
+            let (peer_connector, self_port, peer_port) = match &mut message {
+                MessageContents::Data(contents) => {
+                    let port = scheduled.ctx_fancy.get_port_by_id(contents.sending_port).unwrap();
+                    (port.peer_connector, contents.sending_port, port.peer_id)
+                },
+                MessageContents::Sync(contents) => {
+                    let connector = contents.to_visit.pop().unwrap();
+                    (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
+                },
+                MessageContents::RequestCommit(contents)=> {
+                    let connector = contents.to_visit.pop().unwrap();
+                    (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
+                },
+                MessageContents::ConfirmCommit(contents) => {
+                    for to_visit in &contents.to_visit {
+                        let message = Message{
+                            sending_connector: scheduled.ctx_fancy.id,
+                            receiving_port: PortIdLocal::new_invalid(),
+                            contents: MessageContents::ConfirmCommit(contents.clone()),
+                        };
+                        self.runtime.send_message(*to_visit, message);
+                    }
+                    (ConnectorId::new_invalid(), PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
+                },
+                MessageContents::Control(_) | MessageContents::Ping => {
+                    // Never generated by the user's code
+                    unreachable!();
+                }
+            };
+
+            // TODO: Maybe clean this up, perhaps special case for
+            //  ConfirmCommit can be handled differently.
+            if peer_connector.is_valid() {
+                if peer_port.is_valid() {
+                    // Sending a message to a port, so the port may not be
+                    // closed.
+                    let port = scheduled.ctx_fancy.get_port_by_id(self_port).unwrap();
+                    match port.state {
+                        PortState::Open => {},
+                        PortState::Closed => {
+                            todo!("Handling sending over a closed port");
                         }
                     }
-                let message = Message {
-                    sending_connector: connector_id,
-                    receiving_port: peer_port,
-                    contents: message,
-                };
-                self.runtime.send_message(peer_connector, message);
                 }
+                let message = Message {
+                    sending_connector: scheduled.ctx_fancy.id,
+                    receiving_port: peer_port,
+                    contents: message,
+                };
+                self.runtime.send_message(peer_connector, message);
             }
         }
 
-        if !delta_state.new_ports.is_empty() {
-            for port in delta_state.new_ports.drain(..) {
-                cur_connector.context.ports.push(port);
-            }
-        }
+        while let Some(state_change) = scheduled.ctx_fancy.state_changes.pop_front() {
+            match state_change {
+                ComponentStateChange::CreatedComponent(component) => {
+                    // Add the new connector to the global registry
+                    let new_key = self.runtime.create_pdl_component(component, false);
+                    let new_connector = self.runtime.get_component_private(&new_key);
+
+                    // Transfer ports
+                    // TODO: Clean this up the moment native components are somewhat
+                    //  properly implemented. We need to know about the ports that
+                    //  are "owned by the PDL code", and then make sure that the
+                    //  context contains a description of those ports.
+                    let ports = if let ConnectorVariant::UserDefined(connector) = &new_connector.connector {
+                        &connector.ports.owned_ports
+                    } else {
+                        unreachable!();
+                    };
 
-        // Handling any new connectors that were scheduled
-        // TODO: Pool outgoing messages to reduce atomic access
-        if !delta_state.new_connectors.is_empty() {
-            for new_connector in delta_state.new_connectors.drain(..) {
-                // Add to global registry to obtain key
-                let new_key = self.runtime.create_pdl_component(cur_connector, new_connector);
-                let new_connector = self.runtime.get_component_private(&new_key);
-
-                // Call above changed ownership of ports, but we still have to
-                // let the other end of the channel know that the port has
-                // changed location.
-                for port in &new_connector.context.ports {
-                    let reroute_message = cur_connector.router.prepare_reroute(
-                        port.self_id, port.peer_id, cur_connector.context.id,
-                        port.peer_connector, new_connector.context.id
-                    );
-
-                    self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message));
-                    self.runtime.send_message(port.peer_connector, reroute_message);
-                }
+                    for port_id in ports {
+                        // Transfer messages associated with the transferred port
+                        let mut message_idx = 0;
+                        while message_idx < scheduled.ctx_fancy.inbox_messages.len() {
+                            let message = &scheduled.ctx_fancy.inbox_messages[message_idx];
+                            if message.receiving_port == *port_id {
+                                // Need to transfer this message
+                                let taken_message = scheduled.ctx_fancy.inbox_messages.remove(message_idx);
+                                new_connector.ctx_fancy.inbox_messages.push(taken_message);
+                            } else {
+                                message_idx += 1;
+                            }
+                        }

-                // Schedule new connector to run
-                self.runtime.push_work(new_key);
+                        // Transfer the port itself
+                        let port_index = scheduled.ctx_fancy.ports.iter()
+                            .position(|v| v.self_id == *port_id)
+                            .unwrap();
+                        let port = scheduled.ctx_fancy.ports.remove(port_index);
+                        new_connector.ctx_fancy.ports.push(port.clone());
+
+                        // Notify the peer that the port has changed
+                        let reroute_message = scheduled.router.prepare_reroute(
+                            port.self_id, port.peer_id, scheduled.ctx_fancy.id,
+                            port.peer_connector, new_connector.ctx_fancy.id
+                        );
+
+                        self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message));
+                        self.runtime.send_message(port.peer_connector, reroute_message);
+                    }
+
+                    // Schedule new connector to run
+                    self.runtime.push_work(new_key);
+                },
+                ComponentStateChange::CreatedPort(port) => {
+                    scheduled.ctx_fancy.ports.push(port);
+                },
+                ComponentStateChange::ChangedPort(port_change) => {
+                    if port_change.is_acquired {
+                        scheduled.ctx_fancy.ports.push(port_change.port);
+                    } else {
+                        let index = scheduled.ctx_fancy.ports
+                            .iter()
+                            .position(|v| v.self_id == port_change.port.self_id)
+                            .unwrap();
+                        scheduled.ctx_fancy.ports.remove(index);
+                    }
+                }
             }
         }

-        debug_assert!(delta_state.outbox.is_empty());
-        debug_assert!(delta_state.new_ports.is_empty());
-        debug_assert!(delta_state.new_connectors.is_empty());
+        // Finally, check if we just entered or just left a sync region
+        if scheduled.ctx_fancy.changed_in_sync {
+            if scheduled.ctx_fancy.is_in_sync {
+                // Just entered sync region
+            } else {
+                // Just left sync region. So clear inbox
+                scheduled.ctx_fancy.inbox_messages.clear();
+                scheduled.ctx_fancy.inbox_len_read = 0;
+            }
+
+            scheduled.ctx_fancy.changed_in_sync = false; // reset flag
+        }
     }
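`handle_changes_in_context` is one half of a queue-based handoff: the component only mutates its own `ComponentCtxFancy`, and the scheduler drains the queued changes after the component yields. A sketch of that pattern reduced to its core, with stand-in types (`Ctx`, `Change`) rather than the runtime's real ones:

    use std::collections::VecDeque;

    enum Change { CreatedPort(u32) }

    struct Ctx { state_changes: VecDeque<Change> }

    impl Ctx {
        // Component side: queue the change, never touch scheduler state directly.
        fn push_port(&mut self, port: u32) {
            self.state_changes.push_back(Change::CreatedPort(port));
        }
    }

    // Scheduler side: drain everything the component queued during its run.
    fn handle_changes(ctx: &mut Ctx, owned_ports: &mut Vec<u32>) {
        while let Some(change) = ctx.state_changes.pop_front() {
            match change {
                Change::CreatedPort(port) => owned_ports.push(port),
            }
        }
    }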
    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
-        debug_assert_eq!(connector_key.index, connector.context.id.0);
+        debug_assert_eq!(connector_key.index, connector.ctx_fancy.id.0);
         debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);

         // This is the running connector, and only the running connector may
         // decide it wants to sleep again.
         connector.public.sleeping.store(true, Ordering::Release);

-        // But do to reordering we might have received messages from peers who
+        // But due to reordering we might have received messages from peers who
         // did not consider us sleeping. If so, then we wake ourselves again.
         if !connector.public.inbox.is_empty() {
-            // Try to wake ourselves up
+            // Try to wake ourselves up (needed because someone might be trying
+            // the exact same atomic compare-and-swap at this point in time)
             let should_wake_up_again = connector.public.sleeping
                 .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
                 .is_ok();
@@ -378,14 +391,9 @@ enum ComponentStateChange {
 }

 #[derive(Clone)]
-pub(crate) enum ComponentPortChange {
-    Acquired(Port),
-    Released(Port),
-}
-
-struct InboxMessage {
-    target_port: PortIdLocal,
-    data: DataMessage,
+pub(crate) struct ComponentPortChange {
+    pub is_acquired: bool, // otherwise: released
+    pub port: Port,
 }

 /// The component context (better name may be invented). This was created
@@ -395,23 +403,43 @@ struct InboxMessage {
 /// scheduler need to be exchanged.
 pub(crate) struct ComponentCtxFancy {
     // Mostly managed by the scheduler
-    id: ConnectorId,
+    pub(crate) id: ConnectorId,
     ports: Vec<Port>,
-    inbox_messages: Vec<InboxMessage>,
+    inbox_messages: Vec<Message>, // never control or ping messages
     inbox_len_read: usize,
     // Submitted by the component
     is_in_sync: bool,
     changed_in_sync: bool,
-    outbox: Vec<MessageContents>,
-    state_changes: Vec<ComponentStateChange>
+    outbox: VecDeque<MessageContents>,
+    state_changes: VecDeque<ComponentStateChange>
+}
+
+pub(crate) enum ReceivedMessage {
+    Data((PortIdLocal, DataMessage)),
+    Sync(SyncMessage),
+    RequestCommit(SolutionMessage),
+    ConfirmCommit(SolutionMessage),
 }

 impl ComponentCtxFancy {
+    pub(crate) fn new_empty() -> Self {
+        return Self{
+            id: ConnectorId::new_invalid(),
+            ports: Vec::new(),
+            inbox_messages: Vec::new(),
+            inbox_len_read: 0,
+            is_in_sync: false,
+            changed_in_sync: false,
+            outbox: VecDeque::new(),
+            state_changes: VecDeque::new(),
+        };
+    }
+
     /// Notify the runtime that the component has created a new component. May
     /// only be called outside of a sync block.
     pub(crate) fn push_component(&mut self, component: ConnectorPDL) {
         debug_assert!(!self.is_in_sync);
-        self.state_changes.push(ComponentStateChange::CreatedComponent(component));
+        self.state_changes.push_back(ComponentStateChange::CreatedComponent(component));
     }

     /// Notify the runtime that the component has created a new port. May only
@@ -419,10 +447,21 @@ impl ComponentCtxFancy {
     /// block, pass them when calling `notify_sync_end`).
     pub(crate) fn push_port(&mut self, port: Port) {
         debug_assert!(!self.is_in_sync);
-        self.state_changes.push(ComponentStateChange::CreatedPort(port))
+        self.state_changes.push_back(ComponentStateChange::CreatedPort(port))
+    }
+
+    pub(crate) fn get_port_by_id(&self, id: PortIdLocal) -> Option<&Port> {
+        return self.ports.iter().find(|v| v.self_id == id);
     }

-    /// Notify that component will enter a sync block.
+    fn get_port_mut_by_id(&mut self, id: PortIdLocal) -> Option<&mut Port> {
+        return self.ports.iter_mut().find(|v| v.self_id == id);
+    }
+
+    /// Notify that component will enter a sync block. Note that after calling
+    /// this function you must allow the scheduler to pick up the changes in
+    /// the context by exiting your `Component::run` function with an
+    /// appropriate scheduling value.
     pub(crate) fn notify_sync_start(&mut self) -> &[Port] {
         debug_assert!(!self.is_in_sync);

@@ -431,14 +470,20 @@
         return &self.ports
     }

+    #[inline]
+    pub(crate) fn is_in_sync(&self) -> bool {
+        return self.is_in_sync;
+    }
+
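Taken together, `is_in_sync`, `notify_sync_start`, `read_next_message` and `notify_sync_end` imply a calling convention for components. The sketch below only illustrates that call order; `run_one_sync_round` is a hypothetical crate-internal helper (these items are `pub(crate)`), not an API this patch adds.

    // Hypothetical component-side driver showing the intended call order of
    // the context API above. Not part of this patch.
    fn run_one_sync_round(ctx: &mut ComponentCtxFancy) {
        if !ctx.is_in_sync() {
            // Enter the sync block, then yield so the scheduler observes it.
            let _ports = ctx.notify_sync_start();
            return;
        }

        // Inside the sync block: drain whatever the scheduler moved into the inbox.
        while let Some(received) = ctx.read_next_message() {
            match received {
                ReceivedMessage::Data((_port, _data)) => { /* fork speculative branches */ },
                ReceivedMessage::Sync(_) => { /* merge into the sync state */ },
                ReceivedMessage::RequestCommit(_) => { /* vote on the proposed solution */ },
                ReceivedMessage::ConfirmCommit(_) => { /* commit the solution */ },
            }
        }

        // Outgoing messages go through `submit_message` (see below). Once a
        // solution is committed, leave the sync block and yield again.
        ctx.notify_sync_end(&[]);
    }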
     /// Submit a message for the scheduler to send to the appropriate receiver.
     /// May only be called inside of a sync block.
     pub(crate) fn submit_message(&mut self, contents: MessageContents) {
         debug_assert!(self.is_in_sync);
-        self.outbox.push(contents);
+        self.outbox.push_back(contents);
     }

-    /// Notify that component just finished a sync block.
+    /// Notify that component just finished a sync block. Like
+    /// `notify_sync_start`: drop out of the `Component::run` function.
     pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) {
         debug_assert!(self.is_in_sync);
@@ -447,26 +492,15 @@
         self.state_changes.reserve(changed_ports.len());
         for changed_port in changed_ports {
-            self.state_changes.push(ComponentStateChange::ChangedPort(changed_port.clone()));
+            self.state_changes.push_back(ComponentStateChange::ChangedPort(changed_port.clone()));
         }
     }

-    /// Inserts message into inbox. Generally only called by scheduler.
-    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, data: DataMessage) {
-        debug_assert!(!self.inbox_messages.iter().any(|v| {
-            v.target_port == target_port &&
-            v.data.sender_prev_branch_id == data.sender_prev_branch_id &&
-            v.data.sender_cur_branch_id == data.sender_cur_branch_id
-        }));
-
-        self.inbox_messages.push(InboxMessage{ target_port, data })
-    }
-
     /// Retrieves messages matching a particular port and branch id. But only
     /// those messages that have been previously received with
     /// `read_next_message`.
-    pub(crate) fn get_read_messages(&self, match_port_id: PortIdLocal, match_prev_branch_id: BranchId) -> MessagesIter {
-        return MessageIter {
+    pub(crate) fn get_read_data_messages(&self, match_port_id: PortIdLocal, match_prev_branch_id: BranchId) -> MessagesIter {
+        return MessagesIter {
             messages: &self.inbox_messages,
             next_index: 0,
             max_index: self.inbox_len_read,
@@ -476,47 +510,62 @@
     /// Retrieves the next unread message from the inbox, `None` if there are
     /// no (new) messages to read.
-    pub(crate) fn read_next_message(&mut self) -> Option<(&PortIdLocal, &DataMessage)> {
-        if self.inbox_len_read == self.inbox_messages.len() {
-            return None;
-        }
+    // TODO: Fix the clone of the data message, entirely unnecessary
+    pub(crate) fn read_next_message(&mut self) -> Option<ReceivedMessage> {
+        if !self.is_in_sync { return None; }
+        if self.inbox_len_read == self.inbox_messages.len() { return None; }

         let message = &self.inbox_messages[self.inbox_len_read];
-        self.inbox_len_read += 1;
-
-        return Some((&message.target_port, &message.data))
+        if let MessageContents::Data(contents) = &message.contents {
+            self.inbox_len_read += 1;
+            return Some(ReceivedMessage::Data((message.receiving_port, contents.clone())));
+        } else {
+            // Must be a sync/solution message
+            let message = self.inbox_messages.remove(self.inbox_len_read);
+            return match message.contents {
+                MessageContents::Sync(v) => Some(ReceivedMessage::Sync(v)),
+                MessageContents::RequestCommit(v) => Some(ReceivedMessage::RequestCommit(v)),
+                MessageContents::ConfirmCommit(v) => Some(ReceivedMessage::ConfirmCommit(v)),
+                _ => unreachable!(), // because we only put data/synclike messages in the inbox
+            }
+        }
     }
 }

 pub(crate) struct MessagesIter<'a> {
-    messages: &'a [InboxMessage],
+    messages: &'a [Message],
     next_index: usize,
     max_index: usize,
     match_port_id: PortIdLocal,
     match_prev_branch_id: BranchId,
 }

-impl Iterator for MessagesIter {
-    type Item = DataMessage;
+impl<'a> Iterator for MessagesIter<'a> {
+    type Item = &'a DataMessage;

-    fn next(&mut self) -> Option<&Self::Item> {
+    fn next(&mut self) -> Option<Self::Item> {
         // Loop until match is found or at end of messages
         while self.next_index < self.max_index {
             let message = &self.messages[self.next_index];
-            if message.target_port == self.match_port_id && message.data.sender_prev_branch_id == self.match_prev_branch_id {
-                // Found a match
-                break;
+            if let MessageContents::Data(data_message) = &message.contents {
+                if message.receiving_port == self.match_port_id && data_message.sender_prev_branch_id == self.match_prev_branch_id {
+                    // Found a match
+                    self.next_index += 1;
+                    return Some(data_message);
+                }
+            } else {
+                // Unreachable because:
+                // 1. We only iterate over messages that were previously retrieved by `read_next_message`.
+                // 2. The inbox does not contain control/ping messages.
+                // 3. If `read_next_message` encounters anything other than a data message, it is removed from the inbox.
+                unreachable!();
             }

             self.next_index += 1;
         }

-        if self.next_index == self.max_index {
-            return None;
-        }
-
-        let message = &self.messages[self.next_index];
-        self.next_index += 1;
-        return Some(&message.data);
+        // No more messages
+        return None;
     }
 }
diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs
index 04c61793c086384b81c5694d4a90c18e7a4912dc..5bd0ecb8d3f47dc32127f1ffacd0c8c213d02949 100644
--- a/src/runtime2/tests/mod.rs
+++ b/src/runtime2/tests/mod.rs
@@ -6,8 +6,8 @@ use crate::common::Id;
 use crate::protocol::eval::*;

 const NUM_THREADS: u32 = 4;    // number of threads in runtime
-const NUM_INSTANCES: u32 = 10; // number of test instances constructed
-const NUM_LOOPS: u32 = 1;      // number of loops within a single test (not used by all tests)
+const NUM_INSTANCES: u32 = 10; // number of test instances constructed
+const NUM_LOOPS: u32 = 10;     // number of loops within a single test (not used by all tests)

 fn create_runtime(pdl: &str) -> Runtime {
     let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
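A closing note on the inbox API reworked above: `read_next_message` advances a read cursor, and `get_read_data_messages` only ever iterates the already-read prefix (`max_index = inbox_len_read`), so a branch can never observe a message the component has not yet seen. A standalone model of that invariant, with stand-in `Msg`/`Inbox` types rather than the runtime's real ones:

    #[derive(Debug)]
    struct Msg { port: u32, prev_branch: u32 }

    struct Inbox { messages: Vec<Msg>, len_read: usize }

    impl Inbox {
        // Mirrors `read_next_message`: hand out one unread message, move the cursor.
        fn read_next(&mut self) -> Option<&Msg> {
            if self.len_read == self.messages.len() { return None; }
            self.len_read += 1;
            Some(&self.messages[self.len_read - 1])
        }

        // Mirrors `get_read_data_messages`: only the read prefix is searched.
        fn matching(&self, port: u32, prev_branch: u32) -> impl Iterator<Item = &Msg> + '_ {
            self.messages[..self.len_read].iter()
                .filter(move |m| m.port == port && m.prev_branch == prev_branch)
        }
    }

    fn main() {
        let mut inbox = Inbox{
            messages: vec![Msg{ port: 1, prev_branch: 0 }, Msg{ port: 2, prev_branch: 0 }],
            len_read: 0,
        };
        assert_eq!(inbox.matching(1, 0).count(), 0); // nothing read yet
        while inbox.read_next().is_some() {}         // mark everything as read
        assert_eq!(inbox.matching(1, 0).count(), 1); // now visible to branches
    }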