From 328f04b6612f722c9f33aca1eacfdb668eff82a8 2021-11-09 12:42:43 From: MH Date: 2021-11-09 12:42:43 Subject: [PATCH] Initial pass of fixing compiler errors --- diff --git a/src/runtime2/branch.rs b/src/runtime2/branch.rs index 8f156102d6828c75c8a77f8b7bdb5dc15b3f46c8..297585f7909df4eb0857c86295c3e2782ff1e78c 100644 --- a/src/runtime2/branch.rs +++ b/src/runtime2/branch.rs @@ -3,7 +3,7 @@ use std::ops::{Index, IndexMut}; use crate::protocol::ComponentState; use crate::protocol::eval::{Value, ValueGroup}; -use crate::runtime2::port::{Port, PortIdLocal}; +use crate::runtime2::port::PortIdLocal; /// Generic branch ID. A component will always have one branch: the /// non-speculative branch. This branch has ID 0. Hence in a speculative context @@ -78,14 +78,14 @@ impl Branch { /// parent of the new branch within the execution tree. fn new_sync(new_index: u32, parent_branch: &Branch) -> Self { debug_assert!( - (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) || - (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint) - ); + (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) || + (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint) + ); // forking from non-sync, or forking from a branching point debug_assert!(parent_branch.prepared_channel.is_none()); Branch { id: BranchId::new(new_index), - parent_id: parent_branch.index, + parent_id: parent_branch.id, code_state: parent_branch.code_state.clone(), sync_state: SpeculativeState::RunningInSync, awaiting_port: parent_branch.awaiting_port, @@ -130,7 +130,7 @@ impl BranchQueue { const NUM_QUEUES: usize = 3; -#[derive(PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq)] pub(crate) enum QueueKind { Runnable, AwaitingMessage, @@ -160,7 +160,7 @@ pub(crate) struct ExecTree { // All branches. the `parent_id` field in each branch implies the shape of // the tree. 
Branches are index stable throughout a sync round. pub branches: Vec, - pub queues: [BranchQueue; NUM_QUEUES] + queues: [BranchQueue; NUM_QUEUES] } impl ExecTree { @@ -235,7 +235,7 @@ impl ExecTree { branch.next_in_queue.index as usize }, None => { - queue.first as usize + queue.first.index as usize } }; @@ -322,7 +322,7 @@ impl IndexMut for ExecTree { } } -pub struct BranchQueueIter<'a> { +pub(crate) struct BranchQueueIter<'a> { branches: &'a [Branch], index: usize, } @@ -342,7 +342,7 @@ impl<'a> Iterator for BranchQueueIter<'a> { } } -pub struct BranchParentIter<'a> { +pub(crate) struct BranchParentIter<'a> { branches: &'a [Branch], index: usize, } diff --git a/src/runtime2/connector2.rs b/src/runtime2/connector2.rs index 265ef4f799deac03885e89be24fe072935123a7b..b716d3fa20090f606e03cd887197bcb116e49753 100644 --- a/src/runtime2/connector2.rs +++ b/src/runtime2/connector2.rs @@ -36,7 +36,7 @@ use crate::protocol::{RunContext, RunResult}; use crate::runtime2::consensus::find_ports_in_value_group; use crate::runtime2::port::PortKind; -use super::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState}; +use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState}; use super::consensus::{Consensus, Consistency}; use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy, PublicInbox}; use super::native::Connector; @@ -145,9 +145,10 @@ impl ConnectorPDL { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. debug_assert!(ctx.workspace_branches.is_empty()); - self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut ctx.workspace_branches); + let mut branches = Vec::new(); // TODO: @Remove + self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches); - for branch_id in ctx.workspace_branches.drain(..) { + for branch_id in branches.drain(..) 
{ // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); @@ -185,7 +186,7 @@ impl ConnectorPDL { branch_id, consensus: &self.consensus, received: &branch.inbox, - scheduler: *sched_ctx, + scheduler: sched_ctx, prepared_channel: branch.prepared_channel.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); @@ -308,7 +309,7 @@ impl ConnectorPDL { branch_id: branch.id, consensus: &self.consensus, received: &branch.inbox, - scheduler: *sched_ctx, + scheduler: sched_ctx, prepared_channel: branch.prepared_channel.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); @@ -320,9 +321,9 @@ impl ConnectorPDL { return ConnectorScheduling::Exit; }, RunResult::ComponentAtSyncStart => { - let current_ports = comp_ctx.notify_sync_start(); + comp_ctx.notify_sync_start(); let sync_branch_id = self.tree.start_sync(); - self.consensus.start_sync(current_ports, comp_ctx); + self.consensus.start_sync(comp_ctx); self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); return ConnectorScheduling::Immediate; diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index b65ea87882def537dd7a4ad334e7c65a9a87f79c..1affea56d580a3dcf628f13a2f71142a82e4525c 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -14,13 +14,14 @@ struct BranchAnnotation { port_mapping: Vec, } +#[derive(Debug)] pub(crate) struct LocalSolution { component: ConnectorId, final_branch_id: BranchId, port_mapping: Vec<(ChannelId, BranchId)>, } -#[derive(Clone)] +#[derive(Debug, Clone)] pub(crate) struct GlobalSolution { component_branches: Vec<(ConnectorId, BranchId)>, channel_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info @@ -53,7 +54,7 @@ pub(crate) struct Consensus { workspace_ports: Vec, } -#[derive(Clone, 
Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum Consistency { Valid, Inconsistent, @@ -88,7 +89,7 @@ impl Consensus { /// Sets up the consensus algorithm for a new synchronous round. The /// provided ports should be the ports the component owns at the start of /// the sync round. - pub fn start_sync(&mut self, ports: &[Port], ctx: &ComponentCtxFancy) { + pub fn start_sync(&mut self, ctx: &ComponentCtxFancy) { debug_assert!(!self.highest_connector_id.is_valid()); debug_assert!(self.branch_annotations.is_empty()); debug_assert!(self.last_finished_handled.is_none()); @@ -98,7 +99,7 @@ impl Consensus { // We'll use the first "branch" (the non-sync one) to store our ports, // this allows cloning if we created a new branch. self.branch_annotations.push(BranchAnnotation{ - port_mapping: ports.iter() + port_mapping: ctx.get_ports().iter() .map(|v| PortAnnotation{ port_id: v.self_id, registered_id: None, @@ -237,6 +238,8 @@ impl Consensus { /// sending the message is consistent with the speculative state. pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtxFancy) -> (SyncHeader, DataHeader) { debug_assert!(self.is_in_sync()); + let sync_header = self.create_sync_header(ctx); + let branch = &mut self.branch_annotations[branch_id.index as usize]; if cfg!(debug_assertions) { @@ -246,7 +249,7 @@ impl Consensus { debug_assert!(port.expected_firing == None || port.expected_firing == Some(true)); } - // Check for ports that are begin sent + // Check for ports that are being sent debug_assert!(self.workspace_ports.is_empty()); find_ports_in_value_group(content, &mut self.workspace_ports); if !self.workspace_ports.is_empty() { @@ -257,7 +260,6 @@ impl Consensus { // TODO: Handle multiple firings. Right now we just assign the current // branch to the `None` value because we know we can only send once. 
debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none()); - let sync_header = self.create_sync_header(ctx); let port_info = ctx.get_port_by_id(source_port_id).unwrap(); let data_header = DataHeader{ expected_mapping: branch.port_mapping.clone(), @@ -309,7 +311,7 @@ impl Consensus { SyncContent::GlobalSolution(solution) => { // Take branch of interest and return it. let (_, branch_id) = solution.component_branches.iter() - .find(|(connector_id, _)| connector_id == ctx.id) + .find(|(connector_id, _)| *connector_id == ctx.id) .unwrap(); return Some(*branch_id); } @@ -389,14 +391,14 @@ impl Consensus { // messages. We should also let all of our peers know self.highest_connector_id = sync_header.highest_component_id; for encountered_id in self.encountered_peers.iter() { - if encountered_id == sync_header.sending_component_id { + if *encountered_id == sync_header.sending_component_id { // Don't need to send it to this one continue } let message = SyncMessageFancy{ sync_header: self.create_sync_header(ctx), - target_component_id: encountered_id, + target_component_id: *encountered_id, content: SyncContent::Notification, }; ctx.submit_message(MessageFancy::Sync(message)); @@ -587,7 +589,7 @@ impl SolutionCombiner { } let mut num_ports_in_peers = 0; - for peer in component_peers { + for peer in &component_peers { num_ports_in_peers += peer.involved_channels.len(); } @@ -823,7 +825,7 @@ impl SolutionCombiner { for (channel_id, branch_id) in solution.channel_mapping.iter().copied() { match final_mapping.iter().find(|(v, _)| *v == channel_id) { Some((_, encountered_branch_id)) => { - debug_assert_eq!(encountered_branch_id, branch_id); + debug_assert_eq!(*encountered_branch_id, branch_id); total_num_checked += 1; }, None => { diff --git a/src/runtime2/inbox2.rs b/src/runtime2/inbox2.rs index 9a8754e20b72a68b488bf80f25adf85c699ad4c8..a38bdd08b1ace47a29d4c6fcd2cddfa67261444c 100644 --- a/src/runtime2/inbox2.rs +++ 
b/src/runtime2/inbox2.rs @@ -98,12 +98,12 @@ impl PublicInbox { } } - pub fn insert_message(&self, message: MessageFancy) { + pub(crate) fn insert_message(&self, message: MessageFancy) { let mut lock = self.messages.lock().unwrap(); lock.push_back(message); } - pub fn take_message(&self) -> Option { + pub(crate) fn take_message(&self) -> Option { let mut lock = self.messages.lock().unwrap(); return lock.pop_front(); } diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index a627f41c6d1ff9b47a00965c43b13e9cc8bc2b07..800fdf37cab02cfb191f2ec5a12d1422e237f9ed 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -55,7 +55,7 @@ impl ConnectorKey { /// A kind of token that allows shared access to a connector. Multiple threads /// may hold this -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct ConnectorId(pub u32); impl ConnectorId { diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 27d25194742f8f775b517ce22d62026124c0a626..de2a0fd4a5cfd1528bb6210ade13015c83478af6 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -8,7 +8,6 @@ use crate::protocol::eval::ValueGroup; use super::{ConnectorKey, ConnectorId, RuntimeInner}; use super::scheduler::{SchedulerCtx, ComponentCtxFancy}; use super::port::{Port, PortIdLocal, Channel, PortKind}; -use super::branch::{Branch}; use super::consensus::find_ports_in_value_group; use super::connector2::{ConnectorScheduling, ConnectorPDL}; use super::inbox2::{MessageFancy, ControlContent, ControlMessageFancy}; @@ -147,7 +146,7 @@ impl ApplicationInterface { // We own all ports, so remove them on this side for initial_port in &initial_ports { - let position = self.owned_ports.iter().position(|v| *v == initial_port).unwrap(); + let position = self.owned_ports.iter().position(|v| v == initial_port).unwrap(); self.owned_ports.remove(position); } diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 
60626e93624e74d23fdd3ea1d7cc795385b9295f..9ed1ce2cf657e3b23eb5de8932b9253a740d1f03 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -3,12 +3,12 @@ use std::sync::Arc; use std::sync::atomic::Ordering; use crate::runtime2::inbox2::ControlContent; -use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey, ConnectorVariant}; +use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey}; use super::port::{Port, PortState, PortIdLocal}; use super::native::Connector; use super::branch::{BranchId}; use super::connector2::{ConnectorPDL, ConnectorScheduling}; -use super::inbox2::{MessageFancy, DataMessageFancy, SyncMessageFancy, ControlMessageFancy}; +use super::inbox2::{MessageFancy, DataMessageFancy, ControlMessageFancy}; // Because it contains pointers we're going to do a copy by value on this one #[derive(Clone, Copy)] @@ -127,17 +127,20 @@ impl Scheduler { let connector_id = scheduled.ctx_fancy.id; while let Some(message) = scheduled.public.inbox.take_message() { - // Check for rerouting - self.debug_conn(connector_id, &format!("Handling message from conn({}) at port({})\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message)); - if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) { - self.debug_conn(connector_id, &format!(" ... Rerouting to connector {}", other_connector_id.0)); - self.runtime.send_message(other_connector_id, message); - continue; + // Check if the message has to be rerouted because we have moved the + // target port to another component. + self.debug_conn(connector_id, &format!("Handling message\n --- {:?}", message)); + if let Some(target_port) = Self::get_data_message_target_port(&message) { + if let Some(other_component_id) = scheduled.router.should_reroute(target_port) { + self.debug_conn(connector_id, " ... 
Rerouting the message"); + self.runtime.send_message(other_component_id, message); + continue; + } } - // Handle special messages here, messages for the component - // will be added to the inbox. - self.debug_conn(connector_id, " ... Handling message myself"); + // If here, then we should handle the message + self.debug_conn(connector_id, " ... Handling the message"); + match message { MessageFancy::Control(message) => { match message.content { @@ -197,7 +200,7 @@ impl Scheduler { let connector_id = scheduled.ctx_fancy.id; // Handling any messages that were sent - while let Some(mut message) = scheduled.ctx_fancy.outbox.pop_front() { + while let Some(message) = scheduled.ctx_fancy.outbox.pop_front() { self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message)); let target_component_id = match &message { @@ -231,20 +234,22 @@ impl Scheduler { while let Some(state_change) = scheduled.ctx_fancy.state_changes.pop_front() { match state_change { ComponentStateChange::CreatedComponent(component, initial_ports) => { - // Add the new connector to the global registry + // Creating a new component. The creator needs to relinquish + // ownership of the ports that are given to the new + // component. All data messages that were intended for that + // port also need to be transferred. 
let new_key = self.runtime.create_pdl_component(component, false); let new_connector = self.runtime.get_component_private(&new_key); - // Transfer ports for port_id in initial_ports { // Transfer messages associated with the transferred port let mut message_idx = 0; while message_idx < scheduled.ctx_fancy.inbox_messages.len() { let message = &scheduled.ctx_fancy.inbox_messages[message_idx]; - if message.receiving_port == *port_id { + if Self::get_data_message_target_port(message) == Some(port_id) { // Need to transfer this message - let taken_message = scheduled.ctx_fancy.inbox_messages.remove(message_idx); - new_connector.ctx_fancy.inbox_messages.push(taken_message); + let message = scheduled.ctx_fancy.inbox_messages.remove(message_idx); + new_connector.ctx_fancy.inbox_messages.push(message); } else { message_idx += 1; } @@ -252,7 +257,7 @@ impl Scheduler { // Transfer the port itself let port_index = scheduled.ctx_fancy.ports.iter() - .position(|v| v.self_id == *port_id) + .position(|v| v.self_id == port_id) .unwrap(); let port = scheduled.ctx_fancy.ports.remove(port_index); new_connector.ctx_fancy.ports.push(port.clone()); @@ -324,6 +329,15 @@ impl Scheduler { } } + #[inline] + fn get_data_message_target_port(message: &MessageFancy) -> Option { + if let MessageFancy::Data(message) = message { + return Some(message.data_header.target_port) + } + + return None + } + // TODO: Remove, this is debugging stuff fn debug(&self, message: &str) { println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); @@ -404,6 +418,11 @@ impl ComponentCtxFancy { self.state_changes.push_back(ComponentStateChange::CreatedPort(port)) } + #[inline] + pub(crate) fn get_ports(&self) -> &[Port] { + return self.ports.as_slice(); + } + pub(crate) fn get_port_by_id(&self, id: PortIdLocal) -> Option<&Port> { return self.ports.iter().find(|v| v.self_id == id); } @@ -413,15 +432,14 @@ impl ComponentCtxFancy { } /// Notify that component will enter a sync block. 
Note that after calling - /// this function you must allow the scheduler to pick up the changes in - /// the context by exiting your `Component::run` function with an - /// appropriate scheduling value. - pub(crate) fn notify_sync_start(&mut self) -> &[Port] { + /// this function you must allow the scheduler to pick up the changes in the + /// context by exiting your code-executing loop, and to continue executing + /// code the next time the scheduler picks up the component. + pub(crate) fn notify_sync_start(&mut self) { debug_assert!(!self.is_in_sync); self.is_in_sync = true; self.changed_in_sync = true; - return &self.ports } #[inline] @@ -614,7 +632,7 @@ impl ControlMessageHandler { /// Returns true if the supplied message should be rerouted. If so then this /// function returns the connector that should retrieve this message. - pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option { + pub fn should_reroute(&self, target_port: PortIdLocal) -> Option { for entry in &self.active { if let ControlVariant::ChangedPort(entry) = &entry.variant { if entry.target_port == target_port {