diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 916e7cd1c78c2580cd51044c2041e7d67c88745c..e72928313e678585399b6e2ae593b7d25a7d14dd 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -1,20 +1,22 @@ use std::collections::HashMap; use std::sync::atomic::AtomicBool; -use crate::{PortId, ProtocolDescription}; +use crate::PortId; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; -use crate::runtime2::scheduler::{ComponentCtxFancy, Scheduler}; use super::ConnectorId; use super::native::Connector; -use super::scheduler::{SchedulerCtx, ConnectorCtx}; +use super::scheduler::{ + SchedulerCtx, ComponentCtxFancy, ComponentPortChange, + ReceivedMessage +}; use super::inbox::{ - PrivateInbox, PublicInbox, - DataMessage, SyncMessage, SolutionMessage, Message, MessageContents, + PublicInbox, + DataMessage, SyncMessage, SolutionMessage, MessageContents, SyncBranchConstraint, SyncConnectorSolution }; -use super::port::{Port, PortKind, PortIdLocal}; +use super::port::{PortKind, PortIdLocal}; /// Represents the identifier of a branch (the index within its container). An /// ID of `0` generally means "no branch" (e.g. no parent, or a port did not @@ -65,7 +67,7 @@ pub(crate) struct Branch { next_branch_in_queue: Option, // Message/port state received: HashMap, // TODO: @temporary, remove together with fires() - ports_delta: Vec, + ports_delta: Vec, } impl Branch { @@ -154,12 +156,6 @@ impl PortAssignment { } } -#[derive(Clone)] -struct PortOwnershipDelta { - acquired: bool, // if false, then released ownership - port_id: PortIdLocal, -} - #[derive(Debug)] enum PortOwnershipError { UsedInInteraction(PortIdLocal), @@ -331,8 +327,6 @@ impl ConnectorPublic { // TODO: Maybe prevent false sharing by aligning `public` to next cache line. // TODO: Do this outside of the connector, create a wrapping struct pub(crate) struct ConnectorPDL { - // State and properties of connector itself - in_sync: bool, // Branch management branches: Vec, // first branch is always non-speculative one sync_active: BranchQueue, @@ -342,7 +336,6 @@ pub(crate) struct ConnectorPDL { cur_round: u32, // Port/message management pub committed_to: Option<(ConnectorId, u64)>, - pub inbox: PrivateInbox, pub ports: ConnectorPorts, } @@ -350,7 +343,7 @@ pub(crate) struct ConnectorPDL { struct ConnectorRunContext<'a> { branch_index: u32, ports: &'a ConnectorPorts, - ports_delta: &'a Vec, + ports_delta: &'a Vec, received: &'a HashMap, scheduler: SchedulerCtx<'a>, prepared_channel: Option<(Value, Value)>, @@ -358,7 +351,7 @@ struct ConnectorRunContext<'a> { impl<'a> RunContext for ConnectorRunContext<'a> { fn did_put(&mut self, port: PortId) -> bool { - if self.ports_delta.iter().any(|v| v.port_id.index == port.0.u32_suffix) { + if self.ports_delta.iter().any(|v| v.port.self_id.index == port.0.u32_suffix) { // Either acquired or released, must be silent return false; } @@ -378,7 +371,7 @@ impl<'a> RunContext for ConnectorRunContext<'a> { fn fires(&mut self, port: PortId) -> Option { let port_id = PortIdLocal::new(port.0.u32_suffix); - if self.ports_delta.iter().any(|v| v.port_id == port_id) { + if self.ports_delta.iter().any(|v| v.port.self_id == port_id) { return None } @@ -398,53 +391,10 @@ impl<'a> RunContext for ConnectorRunContext<'a> { } impl Connector for ConnectorPDL { - fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { - use MessageContents as MC; - - match message.contents { - MC::Data(content) => self.handle_data_message(message.receiving_port, content), - MC::Sync(content) => self.handle_sync_message(content, ctx, delta_state), - MC::RequestCommit(content) => self.handle_request_commit_message(content, ctx, delta_state), - MC::ConfirmCommit(content) => self.handle_confirm_commit_message(content, ctx, delta_state), - MC::Control(_) | MC::Ping => {}, - } - } - - fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { - if self.in_sync { - // Check for new messages we haven't seen before. If any of the - // pending branches can accept the message, do so. - while let Some((target_port_id, message)) = comp_ctx.read_next_message() { - let mut branch_idx = self.sync_pending_get.first; - while branch_idx != 0 { - let branch = &self.branches[branch_idx as usize]; - let next_branch_idx = branch.next_branch_in_queue.unwrap_or(0); - - let target_port_index = self.ports.get_port_index(*target_port_id).unwrap(); - let port_mapping = self.ports.get_port(branch_idx, target_port_index); - - if branch.sync_state == SpeculativeState::HaltedAtBranchPoint && - branch.halted_at_port == *target_port_id && - port_mapping.last_registered_branch_id == message.sender_prev_branch_id { - // Branch may accept this mesage, so create a fork that - // contains this message in the inbox. - let new_branch_idx = self.branches.len() as u32; - let new_branch = Branch::new_sync_branching_from(new_branch_idx, branch); - - self.ports.prepare_sync_branch(branch_idx, new_branch_idx); - let mapping = self.ports.get_port_mut(branch_idx, target_port_index); - mapping.last_registered_branch_id = message.sender_cur_branch_id; - - let new_branch_id = BranchId::new(new_branch_idx); - self.branches.push(new_branch); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id) - } - - branch_idx = next_branch_idx; - } - } - - let scheduling = self.run_in_speculative_mode(sched_ctx, comp_ctx, conn_ctx, delta_state); + fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { + self.handle_new_messages(comp_ctx); + if comp_ctx.is_in_sync() { + let scheduling = self.run_in_speculative_mode(sched_ctx, comp_ctx); // When in speculative mode we might have generated new sync // solutions, we need to turn them into proposed solutions here. @@ -466,9 +416,9 @@ impl Connector for ConnectorPDL { // Turn local solution into a message and send it along // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed - let solution_message = self.generate_initial_solution_for_branch(branch_id, conn_ctx); + let solution_message = self.generate_initial_solution_for_branch(branch_id, comp_ctx); if let Some(valid_solution) = solution_message { - self.submit_sync_solution(valid_solution, conn_ctx, delta_state); + self.submit_sync_solution(valid_solution, comp_ctx); } else { // Branch is actually invalid, but we only just figured // it out. We need to mark it as invalid to prevent @@ -493,7 +443,7 @@ impl Connector for ConnectorPDL { return scheduling; } else { - let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx, conn_ctx, delta_state); + let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); return scheduling; } } @@ -505,7 +455,6 @@ impl ConnectorPDL { /// hence is in a non-sync state. pub fn new(initial_branch: Branch, owned_ports: Vec) -> Self { Self{ - in_sync: false, branches: vec![initial_branch], sync_active: BranchQueue::new(), sync_pending_get: BranchQueue::new(), @@ -513,28 +462,71 @@ impl ConnectorPDL { sync_finished_last_handled: 0, // none at all cur_round: 0, committed_to: None, - inbox: PrivateInbox::new(), ports: ConnectorPorts::new(owned_ports), } } - pub fn is_in_sync_mode(&self) -> bool { - return self.in_sync; - } - // ------------------------------------------------------------------------- // Handling connector messages // ------------------------------------------------------------------------- - pub fn handle_data_message(&mut self, target_port: PortIdLocal, message: DataMessage) { - // self.inbox.insert_message(target_port, message); + pub fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtxFancy) { + while let Some(message) = comp_ctx.read_next_message() { + match message { + ReceivedMessage::Data((target_port_id, contents)) => { + self.handle_data_message(target_port_id, &contents); + }, + ReceivedMessage::Sync(contents) => { + self.handle_sync_message(contents, comp_ctx); + }, + ReceivedMessage::RequestCommit(contents) => { + self.handle_request_commit_message(contents, comp_ctx); + }, + ReceivedMessage::ConfirmCommit(contents) => { + self.handle_confirm_commit_message(contents, comp_ctx); + }, + } + } + } + + pub fn handle_data_message(&mut self, target_port_id: PortIdLocal, message: &DataMessage) { + // Go through all branches that are waiting for a message + let mut branch_idx = self.sync_pending_get.first; + while branch_idx != 0 { + let branch = &self.branches[branch_idx as usize]; + let next_branch_idx = branch.next_branch_in_queue.unwrap_or(0); + + let target_port_index = self.ports.get_port_index(target_port_id).unwrap(); + let port_mapping = self.ports.get_port(branch_idx, target_port_index); + + // Check if the branch may accept the message + if branch.sync_state == SpeculativeState::HaltedAtBranchPoint && + branch.halted_at_port == target_port_id && + port_mapping.last_registered_branch_id == message.sender_prev_branch_id + { + // Branch can accept. So fork it, and let the fork accept the + // message. The original branch stays waiting for new messages. + let new_branch_idx = self.branches.len() as u32; + let new_branch = Branch::new_sync_branching_from(new_branch_idx, branch); + + self.ports.prepare_sync_branch(branch_idx, new_branch_idx); + let mapping = self.ports.get_port_mut(branch_idx, target_port_index); + mapping.last_registered_branch_id = message.sender_cur_branch_id; + + let new_branch_id = BranchId::new(new_branch_idx); + self.branches.push(new_branch); + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id) + } + + branch_idx = next_branch_idx; + } } /// Accepts a synchronous message and combines it with the locally stored /// solution(s). Then queue new `Sync`/`Solution` messages when appropriate. - pub fn handle_sync_message(&mut self, message: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) { - debug_assert!(!message.to_visit.contains(&ctx.id)); // own ID already removed - debug_assert!(message.constraints.iter().any(|v| v.connector_id == ctx.id)); // we have constraints + pub fn handle_sync_message(&mut self, message: SyncMessage, comp_ctx: &mut ComponentCtxFancy) { + debug_assert!(!message.to_visit.contains(&comp_ctx.id)); // own ID already removed + debug_assert!(message.constraints.iter().any(|v| v.connector_id == comp_ctx.id)); // we have constraints // TODO: Optimize, use some kind of temp workspace vector let mut execution_path_branch_ids = Vec::new(); @@ -543,7 +535,7 @@ impl ConnectorPDL { // We have some solutions to match against let constraints_index = message.constraints .iter() - .position(|v| v.connector_id == ctx.id) + .position(|v| v.connector_id == comp_ctx.id) .unwrap(); let constraints = &message.constraints[constraints_index].constraints; debug_assert!(!constraints.is_empty()); @@ -621,7 +613,7 @@ impl ConnectorPDL { // - replace constraints with a local solution new_solution.constraints.remove(constraints_index); new_solution.local_solutions.push(SyncConnectorSolution{ - connector_id: ctx.id, + connector_id: comp_ctx.id, terminating_branch_id: BranchId::new(branch_index), execution_branch_ids: execution_path_branch_ids.clone(), final_port_mapping: new_solution_mapping, @@ -633,7 +625,7 @@ impl ConnectorPDL { let port_id = self.ports.get_port_id(port_index); let (peer_connector_id, peer_port_id, peer_is_getter) = { - let port = ctx.get_port(port_id); + let port = comp_ctx.get_port_by_id(port_id).unwrap(); (port.peer_connector, port.peer_id, port.kind == PortKind::Putter) }; @@ -658,7 +650,7 @@ impl ConnectorPDL { // If here, then the newly generated solution is completely // compatible. let next_branch = branch.next_branch_in_queue; - self.submit_sync_solution(new_solution, ctx, results); + self.submit_sync_solution(new_solution, comp_ctx); // Consider the next branch if branch_index == self.sync_finished_last_handled { @@ -672,7 +664,7 @@ impl ConnectorPDL { } } - fn handle_request_commit_message(&mut self, mut message: SolutionMessage, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { + fn handle_request_commit_message(&mut self, mut message: SolutionMessage, comp_ctx: &mut ComponentCtxFancy) { let should_propagate_message = match &self.committed_to { Some((previous_origin, previous_comparison)) => { // Already committed to something. So will commit to this if it @@ -695,22 +687,22 @@ impl ConnectorPDL { // TODO: Use temporary workspace let mut to_visit = Vec::with_capacity(message.local_solutions.len() - 1); for (connector_id, _) in &message.local_solutions { - if *connector_id != ctx.id { + if *connector_id != comp_ctx.id { to_visit.push(*connector_id); } } message.to_visit = to_visit; - self.handle_confirm_commit_message(message.clone(), ctx, delta_state); - delta_state.outbox.push(MessageContents::ConfirmCommit(message)); + comp_ctx.submit_message(MessageContents::ConfirmCommit(message.clone())); + self.handle_confirm_commit_message(message, comp_ctx); } else { // Not yet visited all of the connectors - delta_state.outbox.push(MessageContents::RequestCommit(message)); + comp_ctx.submit_message(MessageContents::RequestCommit(message)); } } } - fn handle_confirm_commit_message(&mut self, message: SolutionMessage, ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) { + fn handle_confirm_commit_message(&mut self, message: SolutionMessage, comp_ctx: &mut ComponentCtxFancy) { // Make sure this is the message we actually committed to. As long as // we're running on a single machine this is fine. // TODO: Take care of nefarious peers @@ -722,16 +714,15 @@ impl ConnectorPDL { // Find the branch we're supposed to commit to let (_, branch_id) = message.local_solutions .iter() - .find(|(id, _)| *id == ctx.id) + .find(|(id, _)| *id == comp_ctx.id) .unwrap(); let branch_id = *branch_id; // Commit to the branch. That is: move the solution branch to the first // of the connector's branches - self.in_sync = false; self.branches.swap(0, branch_id.index as usize); self.branches.truncate(1); // TODO: Or drain and do not deallocate? - let solution = &mut self.branches[0]; + let solution_branch = &mut self.branches[0]; // Clear all of the other sync-related variables self.sync_active.clear(); @@ -741,18 +732,20 @@ impl ConnectorPDL { self.cur_round += 1; self.committed_to = None; - self.inbox.clear(); self.ports.commit_to_sync(); // Add/remove any of the ports we lost during the sync phase - for port_delta in &solution.ports_delta { - if port_delta.acquired { - self.ports.add_port(port_delta.port_id); + // TODO: Probably might not need this with the port syncing + for port_delta in &solution_branch.ports_delta { + if port_delta.is_acquired { + self.ports.add_port(port_delta.port.self_id); } else { - self.ports.remove_port(port_delta.port_id); + self.ports.remove_port(port_delta.port.self_id); } } - solution.commit_to_sync(); + + comp_ctx.notify_sync_end(&solution_branch.ports_delta); + solution_branch.commit_to_sync(); } // ------------------------------------------------------------------------- @@ -764,8 +757,8 @@ impl ConnectorPDL { /// where it is the caller's responsibility to immediately take care of /// those changes. The return value indicates when (and if) the connector /// needs to be scheduled again. - pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { - debug_assert!(self.in_sync); + pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { + debug_assert!(comp_ctx.is_in_sync()); if self.sync_active.is_empty() { return ConnectorScheduling::NotNow; @@ -864,7 +857,7 @@ impl ConnectorPDL { // But if some messages can be immediately applied, do so // now. - let messages = comp_ctx.get_read_messages(local_port_id, port_mapping.last_registered_branch_id); + let messages = comp_ctx.get_read_data_messages(local_port_id, port_mapping.last_registered_branch_id); let mut did_have_messages = false; for message in messages { @@ -884,10 +877,9 @@ impl ConnectorPDL { // If the message contains any ports then they will now // be owned by the new branch - debug_assert!(results.ports.is_empty()); - find_ports_in_value_group(&message.message, &mut results.ports); - Self::acquire_ports_during_sync(&mut self.ports, &mut new_branch, &results.ports); - results.ports.clear(); + let mut transferred_ports = Vec::new(); // TODO: Create workspace somewhere + find_ports_in_value_group(&message.message, &mut transferred_ports); + Self::acquire_ports_during_sync(&mut self.ports, &mut new_branch, &transferred_ports); // Schedule the new branch debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync); @@ -959,12 +951,11 @@ impl ConnectorPDL { // If the message contains any ports then we release our // ownership over them in this branch - debug_assert!(results.ports.is_empty()); - find_ports_in_value_group(&message.message, &mut results.ports); - Self::release_ports_during_sync(&mut self.ports, branch, &results.ports).unwrap(); - results.ports.clear(); + let mut transferred_ports = Vec::new(); // TODO: Put in some temp workspace + find_ports_in_value_group(&message.message, &mut transferred_ports); + Self::release_ports_during_sync(&mut self.ports, branch, &transferred_ports).unwrap(); - results.outbox.push(MessageContents::Data(message)); + comp_ctx.submit_message(MessageContents::Data(message)); let branch_index = branch.index; Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, branch_index); @@ -986,8 +977,8 @@ impl ConnectorPDL { } /// Runs the connector in non-synchronous mode. - pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { - debug_assert!(!self.in_sync); + pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { + debug_assert!(!comp_ctx.is_in_sync()); debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty()); debug_assert!(self.branches.len() == 1); @@ -1013,7 +1004,14 @@ impl ConnectorPDL { }, RunResult::ComponentAtSyncStart => { // Prepare for sync execution and reschedule immediately - self.in_sync = true; + // TODO: Not sure about this. I want a clear synchronization + // point between scheduler/component view on the ports. But is + // this the way to do it? + let current_ports = comp_ctx.notify_sync_start(); + for port in current_ports { + debug_assert!(self.ports.get_port_index(port.self_id).is_some()); + } + let first_sync_branch = Branch::new_sync_branching_from(1, branch); let first_sync_branch_id = first_sync_branch.index; self.ports.prepare_sync_branch(0, 1); @@ -1025,12 +1023,12 @@ impl ConnectorPDL { RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { // Construction of a new component. Find all references to ports // inside of the arguments - debug_assert!(results.ports.is_empty()); - find_ports_in_value_group(&arguments, &mut results.ports); + let mut transferred_ports = Vec::new(); + find_ports_in_value_group(&arguments, &mut transferred_ports); - if !results.ports.is_empty() { + if !transferred_ports.is_empty() { // Ports changing ownership - if let Err(_) = Self::release_ports_during_non_sync(&mut self.ports, branch, &results.ports) { + if let Err(_) = Self::release_ports_during_non_sync(&mut self.ports, branch, &transferred_ports) { todo!("fatal error handling"); } } @@ -1043,25 +1041,25 @@ impl ConnectorPDL { definition_id, monomorph_idx, arguments ) }; - let new_connector_ports = results.ports.clone(); // TODO: Do something with this + let new_connector_branch = Branch::new_initial_branch(new_connector_state); - let new_connector = ConnectorPDL::new(new_connector_branch, new_connector_ports); + let new_connector = ConnectorPDL::new(new_connector_branch, transferred_ports); - results.new_connectors.push(new_connector); + comp_ctx.push_component(new_connector); return ConnectorScheduling::Later; }, RunResult::NewChannel => { // Need to prepare a new channel - let (getter, putter) = sched_ctx.runtime.create_channel(conn_ctx.id); + let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id); debug_assert_eq!(getter.kind, PortKind::Getter); branch.prepared_channel = Some(( Value::Input(PortId::new(putter.self_id.index)), Value::Output(PortId::new(getter.self_id.index)) )); - results.new_ports.push(putter); - results.new_ports.push(getter); + comp_ctx.push_port(putter); + comp_ctx.push_port(getter); return ConnectorScheduling::Immediate; }, @@ -1201,26 +1199,27 @@ impl ConnectorPDL { } for delta in &branch.ports_delta { - if delta.port_id == *port_id { + if delta.port.self_id == *port_id { // We cannot have acquired this port, because the // call to `ports.get_port_index` returned an index. - debug_assert!(!delta.acquired); + debug_assert!(!delta.is_acquired); return Err(PortOwnershipError::AlreadyGivenAway(*port_id)); } } - branch.ports_delta.push(PortOwnershipDelta{ - acquired: false, - port_id: *port_id, - }); + // TODO: Obtain port description + // branch.ports_delta.push(ComponentPortChange{ + // is_acquired: false, + // port_id: *port_id, + // }); }, None => { // Not in port mapping, so we must have acquired it before, // remove the acquirement. let mut to_delete_index: isize = -1; for (delta_idx, delta) in branch.ports_delta.iter().enumerate() { - if delta.port_id == *port_id { - debug_assert!(delta.acquired); + if delta.port.self_id == *port_id { + debug_assert!(delta.is_acquired); to_delete_index = delta_idx as isize; break; } @@ -1246,8 +1245,8 @@ impl ConnectorPDL { 'port_loop: for port_id in port_ids { for (delta_idx, delta) in branch.ports_delta.iter().enumerate() { - if delta.port_id == *port_id { - if delta.acquired { + if delta.port.self_id == *port_id { + if delta.is_acquired { // Somehow already received this port. // TODO: @security todo!("take care of nefarious peers"); @@ -1261,10 +1260,11 @@ impl ConnectorPDL { } // If here then we can safely acquire the new port - branch.ports_delta.push(PortOwnershipDelta{ - acquired: true, - port_id: *port_id, - }); + // TODO: Retrieve port infor + // branch.ports_delta.push(PortOwnershipDelta{ + // acquired: true, + // port_id: *port_id, + // }); } return Ok(()) @@ -1276,8 +1276,8 @@ impl ConnectorPDL { /// Generates the initial solution for a finished sync branch. If initial /// local solution is valid, then the appropriate message is returned. /// Otherwise the initial solution is inconsistent. - fn generate_initial_solution_for_branch(&self, branch_id: BranchId, ctx: &ConnectorCtx) -> Option { - // Retrieve branchg + fn generate_initial_solution_for_branch(&self, branch_id: BranchId, comp_ctx: &ComponentCtxFancy) -> Option { + // Retrieve branchh debug_assert!(branch_id.is_valid()); // because we're supposed to be in sync mode let branch = &self.branches[branch_id.index as usize]; debug_assert_eq!(branch.sync_state, SpeculativeState::ReachedSyncEnd); @@ -1301,7 +1301,7 @@ impl ConnectorPDL { } let initial_local_solution = SyncConnectorSolution{ - connector_id: ctx.id, + connector_id: comp_ctx.id, terminating_branch_id: branch_id, execution_branch_ids: all_branch_ids, final_port_mapping: initial_solution_port_mapping, @@ -1317,12 +1317,12 @@ impl ConnectorPDL { // sender and one for the receiver, ensuring it was not used. // TODO: This will fail if a port is passed around multiple times. // maybe a special "passed along" entry in `ports_delta`. - if !sync_message.check_constraint(ctx.id, SyncBranchConstraint::SilentPort(port_delta.port_id)).unwrap() { + if !sync_message.check_constraint(comp_ctx.id, SyncBranchConstraint::SilentPort(port_delta.port.self_id)).unwrap() { return None; } // Might need to check if we own the other side of the channel - let port = ctx.get_port(port_delta.port_id); + let port = comp_ctx.get_port_by_id(port_delta.port.self_id).unwrap(); if !sync_message.add_or_check_constraint(port.peer_connector, SyncBranchConstraint::SilentPort(port.peer_id)).unwrap() { return None; } @@ -1332,7 +1332,7 @@ impl ConnectorPDL { for port_index in 0..self.ports.num_ports() { let port_id = self.ports.get_port_id(port_index); let port_mapping = self.ports.get_port(branch_id.index, port_index); - let port = ctx.get_port(port_id); + let port = comp_ctx.get_port_by_id(port_id).unwrap(); let constraint = if port_mapping.is_assigned { if port.kind == PortKind::Getter { @@ -1352,7 +1352,7 @@ impl ConnectorPDL { return Some(sync_message); } - fn submit_sync_solution(&mut self, partial_solution: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) { + fn submit_sync_solution(&mut self, partial_solution: SyncMessage, comp_ctx: &mut ComponentCtxFancy) { if partial_solution.to_visit.is_empty() { // Solution is completely consistent. So ask everyone to commit // TODO: Maybe another package for random? @@ -1366,24 +1366,24 @@ impl ConnectorPDL { let mut full_solution = SolutionMessage{ comparison_number, - connector_origin: ctx.id, + connector_origin: comp_ctx.id, local_solutions: Vec::with_capacity(num_local), to_visit: Vec::with_capacity(num_local - 1), }; for local_solution in &partial_solution.local_solutions { full_solution.local_solutions.push((local_solution.connector_id, local_solution.terminating_branch_id)); - if local_solution.connector_id != ctx.id { + if local_solution.connector_id != comp_ctx.id { full_solution.to_visit.push(local_solution.connector_id); } } debug_assert!(self.committed_to.is_none()); self.committed_to = Some((full_solution.connector_origin, full_solution.comparison_number)); - results.outbox.push(MessageContents::RequestCommit(full_solution)); + comp_ctx.submit_message(MessageContents::RequestCommit(full_solution)); } else { // Still have connectors to visit - results.outbox.push(MessageContents::Sync(partial_solution)); + comp_ctx.submit_message(MessageContents::Sync(partial_solution)); } } @@ -1401,33 +1401,6 @@ impl ConnectorPDL { } } -/// A data structure passed to a connector whose code is being executed that is -/// used to queue up various state changes that have to be applied after -/// running, e.g. the messages the have to be transferred to other connectors. -// TODO: Come up with a better name -pub(crate) struct RunDeltaState { - // Variables that allow the thread running the connector to pick up global - // state changes and try to apply them. - pub outbox: Vec, - pub new_connectors: Vec, - pub new_ports: Vec, - // Workspaces - pub ports: Vec, -} - -impl RunDeltaState { - /// Constructs a new `RunDeltaState` object with the default amount of - /// reserved memory - pub fn new() -> Self { - RunDeltaState{ - outbox: Vec::with_capacity(64), - new_connectors: Vec::new(), - new_ports: Vec::new(), - ports: Vec::with_capacity(64), - } - } -} - #[derive(Eq, PartialEq)] pub(crate) enum ConnectorScheduling { Immediate, // Run again, immediately