diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 22e82638e9e0b1d5468aae9428b89ec9c63248ec..42546271a1f6f01dfc042f0e78615f0446a12171 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -1,18 +1,16 @@ use std::collections::HashMap; -use std::ops::Deref; use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; -use crate::runtime2::global_store::ConnectorKey; -use crate::runtime2::inbox::{MessageContents, OutgoingMessage, SolutionMessage}; +use crate::runtime2::inbox::{MessageContents, SolutionMessage}; use crate::runtime2::native::Connector; -use crate::runtime2::port::PortKind; +use crate::runtime2::port::{Port, PortKind}; use crate::runtime2::scheduler::ConnectorCtx; use super::global_store::ConnectorId; use super::inbox::{ - PrivateInbox, PublicInbox, OutgoingDataMessage, DataMessage, SyncMessage, + PrivateInbox, PublicInbox, DataMessage, SyncMessage, SyncBranchConstraint, SyncConnectorSolution }; use super::port::PortIdLocal; @@ -41,7 +39,7 @@ impl BranchId { } } -#[derive(PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq)] pub(crate) enum SpeculativeState { // Non-synchronous variants RunningNonSync, // regular execution of code @@ -141,7 +139,7 @@ impl PortAssignment { } } -#[derive(Clone, Eq)] +#[derive(Clone)] struct PortOwnershipDelta { acquired: bool, // if false, then released ownership port_id: PortIdLocal, @@ -227,7 +225,7 @@ impl ConnectorPorts { #[inline] fn get_port_index(&self, port_id: PortIdLocal) -> Option { for (idx, port) in self.owned_ports.iter().enumerate() { - if port == port_id { + if *port == port_id { return Some(idx) } } @@ -250,7 +248,7 @@ impl ConnectorPorts { #[inline] fn get_port_mut(&mut self, branch_idx: u32, port_idx: usize) -> &mut PortAssignment { let mapped_idx = self.mapped_index(branch_idx, port_idx); - return &mut self.port_mapping(mapped_idx); + return &mut self.port_mapping[mapped_idx]; } #[inline] @@ -323,7 +321,7 @@ pub(crate) struct ConnectorPDL { sync_active: BranchQueue, sync_pending_get: BranchQueue, sync_finished: BranchQueue, - sync_finished_last_handled: u32, + sync_finished_last_handled: u32, // TODO: Change to BranchId? cur_round: u32, // Port/message management pub committed_to: Option<(ConnectorId, u64)>, @@ -365,7 +363,7 @@ impl Connector for ConnectorPDL { fn run(&mut self, pd: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { if self.in_sync { - let scheduling = self.run_in_speculative_mode(pd, ctx, results); + let scheduling = self.run_in_speculative_mode(pd, ctx, delta_state); // When in speculative mode we might have generated new sync // solutions, we need to turn them into proposed solutions here. @@ -389,13 +387,13 @@ impl Connector for ConnectorPDL { // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed let solution_message = self.generate_initial_solution_for_branch(branch_id, ctx); if let Some(valid_solution) = solution_message { - self.submit_sync_solution(valid_solution, ctx, results); + self.submit_sync_solution(valid_solution, ctx, delta_state); } else { // Branch is actually invalid, but we only just figured // it out. We need to mark it as invalid to prevent // future use Self::remove_branch_from_queue(&mut self.branches, &mut self.sync_finished, branch_id); - if branch_id == self.sync_finished_last_handled { + if branch_id.index == self.sync_finished_last_handled { self.sync_finished_last_handled = self.sync_finished.last; } @@ -414,7 +412,7 @@ impl Connector for ConnectorPDL { return scheduling; } else { - let scheduling = self.run_in_deterministic_mode(pd, ctx, results); + let scheduling = self.run_in_deterministic_mode(pd, ctx, delta_state); return scheduling; } } @@ -456,7 +454,7 @@ impl ConnectorPDL { /// solution(s). Then queue new `Sync`/`Solution` messages when appropriate. pub fn handle_sync_message(&mut self, message: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) { debug_assert!(!message.to_visit.contains(&ctx.id)); // own ID already removed - debug_assert!(message.constraints.iter().any(|v| v.connector_id == self.id)); // we have constraints + debug_assert!(message.constraints.iter().any(|v| v.connector_id == ctx.id)); // we have constraints // TODO: Optimize, use some kind of temp workspace vector let mut execution_path_branch_ids = Vec::new(); @@ -513,7 +511,7 @@ impl ConnectorPDL { let port_index = port_index.unwrap(); let mapping = self.ports.get_port(branch_index, port_index); - if mapping.last_registered_branch_id != expected_branch_id { + if mapping.last_registered_branch_id != *expected_branch_id { // Not the expected interaction on this port, constraint not satisfied continue 'branch_loop; } @@ -571,9 +569,9 @@ impl ConnectorPDL { }; match new_solution.add_or_check_constraint(peer_connector_id, constraint) { - None => continue 'branch_loop, - Some(false) => continue 'branch_loop, - Some(true) => {}, + Err(_) => continue 'branch_loop, + Ok(false) => continue 'branch_loop, + Ok(true) => {}, } } @@ -599,7 +597,7 @@ impl ConnectorPDL { // Already committed to something. So will commit to this if it // takes precedence over the current solution message.comparison_number > *previous_comparison || - (message.comparison_number == *previous_comparison && message.connector_origin.0 > previous_comparison.0) + (message.comparison_number == *previous_comparison && message.connector_origin.0 > previous_origin.0) }, None => { // Not yet committed to a solution, so commit to this one @@ -844,7 +842,7 @@ impl ConnectorPDL { // Put in run results for thread to pick up and transfer to // the correct connector inbox. port_mapping.mark_definitive(branch.index, 1); - let message = OutgoingDataMessage { + let message = DataMessage{ sending_port: local_port_id, sender_prev_branch_id: BranchId::new_invalid(), sender_cur_branch_id: branch.index, @@ -858,7 +856,7 @@ impl ConnectorPDL { Self::release_ports_during_sync(&mut self.ports, branch, &results.ports); results.ports.clear(); - results.outbox.push(OutgoingMessage::Data(message)); + results.outbox.push(MessageContents::Data(message)); return ConnectorScheduling::Immediate } else { branch.sync_state = SpeculativeState::Inconsistent; @@ -948,16 +946,16 @@ impl ConnectorPDL { // linked lists inside of the vector of branches. /// Pops from front of linked-list branch queue. - fn pop_branch_from_queue(branches: &mut Vec, queue: &mut BranchQueue) -> &mut Branch { + fn pop_branch_from_queue<'a>(branches: &'a mut Vec, queue: &mut BranchQueue) -> &'a mut Branch { debug_assert!(queue.first != 0); let branch = &mut branches[queue.first as usize]; - *queue.first = branch.next_branch_in_queue.unwrap_or(0); + queue.first = branch.next_branch_in_queue.unwrap_or(0); branch.next_branch_in_queue = None; - if *queue.first == 0 { + if queue.first == 0 { // No more entries in queue - debug_assert_eq!(*queue.last, branch.index.index); - *queue.last = 0; + debug_assert_eq!(queue.last, branch.index.index); + queue.last = 0; } return branch; @@ -970,17 +968,17 @@ impl ConnectorPDL { debug_assert!(to_push.is_valid()); let to_push = to_push.index; - if *queue.last == 0 { + if queue.last == 0 { // No branches in the queue at all - debug_assert_eq!(*queue.first, 0); + debug_assert_eq!(queue.first, 0); branches[to_push as usize].next_branch_in_queue = None; - *queue.first = to_push; - *queue.last = to_push; + queue.first = to_push; + queue.last = to_push; } else { // Pre-existing branch in the queue - debug_assert_ne!(*queue.first, 0); - branches[*queue.last as usize].next_branch_in_queue = Some(to_push); - *queue.last = to_push; + debug_assert_ne!(queue.first, 0); + branches[queue.last as usize].next_branch_in_queue = Some(to_push); + queue.last = to_push; } } @@ -1050,6 +1048,7 @@ impl ConnectorPDL { /// Releasing ownership of ports during a sync-session. Will provide an /// error if the port was already used during a sync block. fn release_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { + todo!("unfinished: add port properties during final solution-commit msgs"); debug_assert!(branch.index.is_valid()); // branch in sync mode for port_id in port_ids { @@ -1064,7 +1063,7 @@ impl ConnectorPDL { } for delta in &branch.ports_delta { - if delta.port_id == port_id { + if delta.port_id == *port_id { // We cannot have acquired this port, because the // call to `ports.get_port_index` returned an index. debug_assert!(!delta.acquired); @@ -1100,6 +1099,7 @@ impl ConnectorPDL { /// Acquiring ports during a sync-session. fn acquire_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { + todo!("unfinished: add port properties during final solution-commit msgs"); debug_assert!(branch.index.is_valid()); // branch in sync mode 'port_loop: for port_id in port_ids { @@ -1175,7 +1175,7 @@ impl ConnectorPDL { // sender and one for the receiver, ensuring it was not used. // TODO: This will fail if a port is passed around multiple times. // maybe a special "passed along" entry in `ports_delta`. - if !sync_message.check_constraint(ctx.id, SyncBranchConstraint::SilentPort(port_delta.port_id)) { + if !sync_message.check_constraint(ctx.id, SyncBranchConstraint::SilentPort(port_delta.port_id)).unwrap() { return None; } @@ -1202,7 +1202,7 @@ impl ConnectorPDL { SyncBranchConstraint::SilentPort(port.peer_id) }; - if !sync_message.add_or_check_constraint(peer_connector_id, constraint).unwrap() { + if !sync_message.add_or_check_constraint(port.peer_connector, constraint).unwrap() { return None; } } @@ -1268,6 +1268,7 @@ pub(crate) struct RunDeltaState { // state changes and try to apply them. pub outbox: Vec, pub new_connectors: Vec, + pub new_ports: Vec, // Workspaces pub ports: Vec, } @@ -1279,11 +1280,13 @@ impl RunDeltaState { RunDeltaState{ outbox: Vec::with_capacity(64), new_connectors: Vec::new(), + new_ports: Vec::new(), ports: Vec::with_capacity(64), } } } +#[derive(Eq, PartialEq)] pub(crate) enum ConnectorScheduling { Immediate, // Run again, immediately Later, // Schedule for running, at some later point in time @@ -1300,7 +1303,7 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Ve // This is an actual port let cur_port = PortIdLocal::new(port_id.0.u32_suffix); for prev_port in ports.iter() { - if prev_port == cur_port { + if *prev_port == cur_port { // Already added return; }