Changeset - 328f04b6612f
[Not reviewed]
0 7 0
MH - 4 years ago 2021-11-09 12:42:43
contact@maxhenger.nl
Initial pass of fixing compiler errors
7 files changed with 77 insertions and 57 deletions:
0 comments (0 inline, 0 general)
src/runtime2/branch.rs
Show inline comments
 
@@ -3,7 +3,7 @@ use std::ops::{Index, IndexMut};
 

	
 
use crate::protocol::ComponentState;
 
use crate::protocol::eval::{Value, ValueGroup};
 
use crate::runtime2::port::{Port, PortIdLocal};
 
use crate::runtime2::port::PortIdLocal;
 

	
 
/// Generic branch ID. A component will always have one branch: the
 
/// non-speculative branch. This branch has ID 0. Hence in a speculative context
 
@@ -78,14 +78,14 @@ impl Branch {
 
    /// parent of the new branch within the execution tree.
 
    fn new_sync(new_index: u32, parent_branch: &Branch) -> Self {
 
        debug_assert!(
 
            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) ||
 
                (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        );
 
            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) ||
 
            (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
 
        ); // forking from non-sync, or forking from a branching point
 
        debug_assert!(parent_branch.prepared_channel.is_none());
 

	
 
        Branch {
 
            id: BranchId::new(new_index),
 
            parent_id: parent_branch.index,
 
            parent_id: parent_branch.id,
 
            code_state: parent_branch.code_state.clone(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            awaiting_port: parent_branch.awaiting_port,
 
@@ -130,7 +130,7 @@ impl BranchQueue {
 

	
 
const NUM_QUEUES: usize = 3;
 

	
 
#[derive(PartialEq, Eq)]
 
#[derive(Debug, PartialEq, Eq)]
 
pub(crate) enum QueueKind {
 
    Runnable,
 
    AwaitingMessage,
 
@@ -160,7 +160,7 @@ pub(crate) struct ExecTree {
 
    // All branches. the `parent_id` field in each branch implies the shape of
 
    // the tree. Branches are index stable throughout a sync round.
 
    pub branches: Vec<Branch>,
 
    pub queues: [BranchQueue; NUM_QUEUES]
 
    queues: [BranchQueue; NUM_QUEUES]
 
}
 

	
 
impl ExecTree {
 
@@ -235,7 +235,7 @@ impl ExecTree {
 
                branch.next_in_queue.index as usize
 
            },
 
            None => {
 
                queue.first as usize
 
                queue.first.index as usize
 
            }
 
        };
 

	
 
@@ -322,7 +322,7 @@ impl IndexMut<BranchId> for ExecTree {
 
    }
 
}
 

	
 
pub struct BranchQueueIter<'a> {
 
pub(crate) struct BranchQueueIter<'a> {
 
    branches: &'a [Branch],
 
    index: usize,
 
}
 
@@ -342,7 +342,7 @@ impl<'a> Iterator for BranchQueueIter<'a> {
 
    }
 
}
 

	
 
pub struct BranchParentIter<'a> {
 
pub(crate) struct BranchParentIter<'a> {
 
    branches: &'a [Branch],
 
    index: usize,
 
}
src/runtime2/connector2.rs
Show inline comments
 
@@ -36,7 +36,7 @@ use crate::protocol::{RunContext, RunResult};
 
use crate::runtime2::consensus::find_ports_in_value_group;
 
use crate::runtime2::port::PortKind;
 

	
 
use super::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState};
 
use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState};
 
use super::consensus::{Consensus, Consistency};
 
use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy, PublicInbox};
 
use super::native::Connector;
 
@@ -145,9 +145,10 @@ impl ConnectorPDL {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        debug_assert!(ctx.workspace_branches.is_empty());
 
        self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut ctx.workspace_branches);
 
        let mut branches = Vec::new(); // TODO: @Remove
 
        self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches);
 

	
 
        for branch_id in ctx.workspace_branches.drain(..) {
 
        for branch_id in branches.drain(..) {
 
            // This branch can receive, so fork and given it the message
 
            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
@@ -185,7 +186,7 @@ impl ConnectorPDL {
 
            branch_id,
 
            consensus: &self.consensus,
 
            received: &branch.inbox,
 
            scheduler: *sched_ctx,
 
            scheduler: sched_ctx,
 
            prepared_channel: branch.prepared_channel.take(),
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 
@@ -308,7 +309,7 @@ impl ConnectorPDL {
 
            branch_id: branch.id,
 
            consensus: &self.consensus,
 
            received: &branch.inbox,
 
            scheduler: *sched_ctx,
 
            scheduler: sched_ctx,
 
            prepared_channel: branch.prepared_channel.take(),
 
        };
 
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
 
@@ -320,9 +321,9 @@ impl ConnectorPDL {
 
                return ConnectorScheduling::Exit;
 
            },
 
            RunResult::ComponentAtSyncStart => {
 
                let current_ports = comp_ctx.notify_sync_start();
 
                comp_ctx.notify_sync_start();
 
                let sync_branch_id = self.tree.start_sync();
 
                self.consensus.start_sync(current_ports, comp_ctx);
 
                self.consensus.start_sync(comp_ctx);
 
                self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
src/runtime2/consensus.rs
Show inline comments
 
@@ -14,13 +14,14 @@ struct BranchAnnotation {
 
    port_mapping: Vec<PortAnnotation>,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) struct LocalSolution {
 
    component: ConnectorId,
 
    final_branch_id: BranchId,
 
    port_mapping: Vec<(ChannelId, BranchId)>,
 
}
 

	
 
#[derive(Clone)]
 
#[derive(Debug, Clone)]
 
pub(crate) struct GlobalSolution {
 
    component_branches: Vec<(ConnectorId, BranchId)>,
 
    channel_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info
 
@@ -53,7 +54,7 @@ pub(crate) struct Consensus {
 
    workspace_ports: Vec<PortIdLocal>,
 
}
 

	
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub(crate) enum Consistency {
 
    Valid,
 
    Inconsistent,
 
@@ -88,7 +89,7 @@ impl Consensus {
 
    /// Sets up the consensus algorithm for a new synchronous round. The
 
    /// provided ports should be the ports the component owns at the start of
 
    /// the sync round.
 
    pub fn start_sync(&mut self, ports: &[Port], ctx: &ComponentCtxFancy) {
 
    pub fn start_sync(&mut self, ctx: &ComponentCtxFancy) {
 
        debug_assert!(!self.highest_connector_id.is_valid());
 
        debug_assert!(self.branch_annotations.is_empty());
 
        debug_assert!(self.last_finished_handled.is_none());
 
@@ -98,7 +99,7 @@ impl Consensus {
 
        // We'll use the first "branch" (the non-sync one) to store our ports,
 
        // this allows cloning if we created a new branch.
 
        self.branch_annotations.push(BranchAnnotation{
 
            port_mapping: ports.iter()
 
            port_mapping: ctx.get_ports().iter()
 
                .map(|v| PortAnnotation{
 
                    port_id: v.self_id,
 
                    registered_id: None,
 
@@ -237,6 +238,8 @@ impl Consensus {
 
    /// sending the message is consistent with the speculative state.
 
    pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtxFancy) -> (SyncHeader, DataHeader) {
 
        debug_assert!(self.is_in_sync());
 
        let sync_header = self.create_sync_header(ctx);
 

	
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 

	
 
        if cfg!(debug_assertions) {
 
@@ -246,7 +249,7 @@ impl Consensus {
 
            debug_assert!(port.expected_firing == None || port.expected_firing == Some(true));
 
        }
 

	
 
        // Check for ports that are begin sent
 
        // Check for ports that are being sent
 
        debug_assert!(self.workspace_ports.is_empty());
 
        find_ports_in_value_group(content, &mut self.workspace_ports);
 
        if !self.workspace_ports.is_empty() {
 
@@ -257,7 +260,6 @@ impl Consensus {
 
        // TODO: Handle multiple firings. Right now we just assign the current
 
        //  branch to the `None` value because we know we can only send once.
 
        debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none());
 
        let sync_header = self.create_sync_header(ctx);
 
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
 
        let data_header = DataHeader{
 
            expected_mapping: branch.port_mapping.clone(),
 
@@ -309,7 +311,7 @@ impl Consensus {
 
            SyncContent::GlobalSolution(solution) => {
 
                // Take branch of interest and return it.
 
                let (_, branch_id) = solution.component_branches.iter()
 
                    .find(|(connector_id, _)| connector_id == ctx.id)
 
                    .find(|(connector_id, _)| *connector_id == ctx.id)
 
                    .unwrap();
 
                return Some(*branch_id);
 
            }
 
@@ -389,14 +391,14 @@ impl Consensus {
 
            // messages. We should also let all of our peers know
 
            self.highest_connector_id = sync_header.highest_component_id;
 
            for encountered_id in self.encountered_peers.iter() {
 
                if encountered_id == sync_header.sending_component_id {
 
                if *encountered_id == sync_header.sending_component_id {
 
                    // Don't need to send it to this one
 
                    continue
 
                }
 

	
 
                let message = SyncMessageFancy{
 
                    sync_header: self.create_sync_header(ctx),
 
                    target_component_id: encountered_id,
 
                    target_component_id: *encountered_id,
 
                    content: SyncContent::Notification,
 
                };
 
                ctx.submit_message(MessageFancy::Sync(message));
 
@@ -587,7 +589,7 @@ impl SolutionCombiner {
 
            }
 

	
 
            let mut num_ports_in_peers = 0;
 
            for peer in component_peers {
 
            for peer in &component_peers {
 
                num_ports_in_peers += peer.involved_channels.len();
 
            }
 

	
 
@@ -823,7 +825,7 @@ impl SolutionCombiner {
 
            for (channel_id, branch_id) in solution.channel_mapping.iter().copied() {
 
                match final_mapping.iter().find(|(v, _)| *v == channel_id) {
 
                    Some((_, encountered_branch_id)) => {
 
                        debug_assert_eq!(encountered_branch_id, branch_id);
 
                        debug_assert_eq!(*encountered_branch_id, branch_id);
 
                        total_num_checked += 1;
 
                    },
 
                    None => {
src/runtime2/inbox2.rs
Show inline comments
 
@@ -98,12 +98,12 @@ impl PublicInbox {
 
        }
 
    }
 

	
 
    pub fn insert_message(&self, message: MessageFancy) {
 
    pub(crate) fn insert_message(&self, message: MessageFancy) {
 
        let mut lock = self.messages.lock().unwrap();
 
        lock.push_back(message);
 
    }
 

	
 
    pub fn take_message(&self) -> Option<MessageFancy> {
 
    pub(crate) fn take_message(&self) -> Option<MessageFancy> {
 
        let mut lock = self.messages.lock().unwrap();
 
        return lock.pop_front();
 
    }
src/runtime2/mod.rs
Show inline comments
 
@@ -55,7 +55,7 @@ impl ConnectorKey {
 

	
 
/// A kind of token that allows shared access to a connector. Multiple threads
 
/// may hold this
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
 
pub struct ConnectorId(pub u32);
 

	
 
impl ConnectorId {
src/runtime2/native.rs
Show inline comments
 
@@ -8,7 +8,6 @@ use crate::protocol::eval::ValueGroup;
 
use super::{ConnectorKey, ConnectorId, RuntimeInner};
 
use super::scheduler::{SchedulerCtx, ComponentCtxFancy};
 
use super::port::{Port, PortIdLocal, Channel, PortKind};
 
use super::branch::{Branch};
 
use super::consensus::find_ports_in_value_group;
 
use super::connector2::{ConnectorScheduling, ConnectorPDL};
 
use super::inbox2::{MessageFancy, ControlContent, ControlMessageFancy};
 
@@ -147,7 +146,7 @@ impl ApplicationInterface {
 

	
 
        // We own all ports, so remove them on this side
 
        for initial_port in &initial_ports {
 
            let position = self.owned_ports.iter().position(|v| *v == initial_port).unwrap();
 
            let position = self.owned_ports.iter().position(|v| v == initial_port).unwrap();
 
            self.owned_ports.remove(position);
 
        }
 

	
src/runtime2/scheduler.rs
Show inline comments
 
@@ -3,12 +3,12 @@ use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 
use crate::runtime2::inbox2::ControlContent;
 

	
 
use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey, ConnectorVariant};
 
use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey};
 
use super::port::{Port, PortState, PortIdLocal};
 
use super::native::Connector;
 
use super::branch::{BranchId};
 
use super::connector2::{ConnectorPDL, ConnectorScheduling};
 
use super::inbox2::{MessageFancy, DataMessageFancy, SyncMessageFancy, ControlMessageFancy};
 
use super::inbox2::{MessageFancy, DataMessageFancy, ControlMessageFancy};
 

	
 
// Because it contains pointers we're going to do a copy by value on this one
 
#[derive(Clone, Copy)]
 
@@ -127,17 +127,20 @@ impl Scheduler {
 
        let connector_id = scheduled.ctx_fancy.id;
 

	
 
        while let Some(message) = scheduled.public.inbox.take_message() {
 
            // Check for rerouting
 
            self.debug_conn(connector_id, &format!("Handling message from conn({}) at port({})\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message));
 
            if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) {
 
                self.debug_conn(connector_id, &format!(" ... Rerouting to connector {}", other_connector_id.0));
 
                self.runtime.send_message(other_connector_id, message);
 
                continue;
 
            // Check if the message has to be rerouted because we have moved the
 
            // target port to another component.
 
            self.debug_conn(connector_id, &format!("Handling message\n --- {:?}", message));
 
            if let Some(target_port) = Self::get_data_message_target_port(&message) {
 
                if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
 
                    self.debug_conn(connector_id, " ... Rerouting the message");
 
                    self.runtime.send_message(other_component_id, message);
 
                    continue;
 
                }
 
            }
 

	
 
            // Handle special messages here, messages for the component
 
            // will be added to the inbox.
 
            self.debug_conn(connector_id, " ... Handling message myself");
 
            // If here, then we should handle the message
 
            self.debug_conn(connector_id, " ... Handling the message");
 

	
 
            match message {
 
                MessageFancy::Control(message) => {
 
                    match message.content {
 
@@ -197,7 +200,7 @@ impl Scheduler {
 
        let connector_id = scheduled.ctx_fancy.id;
 

	
 
        // Handling any messages that were sent
 
        while let Some(mut message) = scheduled.ctx_fancy.outbox.pop_front() {
 
        while let Some(message) = scheduled.ctx_fancy.outbox.pop_front() {
 
            self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message));
 

	
 
            let target_component_id = match &message {
 
@@ -231,20 +234,22 @@ impl Scheduler {
 
        while let Some(state_change) = scheduled.ctx_fancy.state_changes.pop_front() {
 
            match state_change {
 
                ComponentStateChange::CreatedComponent(component, initial_ports) => {
 
                    // Add the new connector to the global registry
 
                    // Creating a new component. The creator needs to relinquish
 
                    // ownership of the ports that are given to the new
 
                    // component. All data messages that were intended for that
 
                    // port also needs to be transferred.
 
                    let new_key = self.runtime.create_pdl_component(component, false);
 
                    let new_connector = self.runtime.get_component_private(&new_key);
 

	
 
                    // Transfer ports
 
                    for port_id in initial_ports {
 
                        // Transfer messages associated with the transferred port
 
                        let mut message_idx = 0;
 
                        while message_idx < scheduled.ctx_fancy.inbox_messages.len() {
 
                            let message = &scheduled.ctx_fancy.inbox_messages[message_idx];
 
                            if message.receiving_port == *port_id {
 
                            if Self::get_data_message_target_port(message) == Some(port_id) {
 
                                // Need to transfer this message
 
                                let taken_message = scheduled.ctx_fancy.inbox_messages.remove(message_idx);
 
                                new_connector.ctx_fancy.inbox_messages.push(taken_message);
 
                                let message = scheduled.ctx_fancy.inbox_messages.remove(message_idx);
 
                                new_connector.ctx_fancy.inbox_messages.push(message);
 
                            } else {
 
                                message_idx += 1;
 
                            }
 
@@ -252,7 +257,7 @@ impl Scheduler {
 

	
 
                        // Transfer the port itself
 
                        let port_index = scheduled.ctx_fancy.ports.iter()
 
                            .position(|v| v.self_id == *port_id)
 
                            .position(|v| v.self_id == port_id)
 
                            .unwrap();
 
                        let port = scheduled.ctx_fancy.ports.remove(port_index);
 
                        new_connector.ctx_fancy.ports.push(port.clone());
 
@@ -324,6 +329,15 @@ impl Scheduler {
 
        }
 
    }
 

	
 
    #[inline]
 
    fn get_data_message_target_port(message: &MessageFancy) -> Option<PortIdLocal> {
 
        if let MessageFancy::Data(message) = message {
 
            return Some(message.data_header.target_port)
 
        }
 

	
 
        return None
 
    }
 

	
 
    // TODO: Remove, this is debugging stuff
 
    fn debug(&self, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
@@ -404,6 +418,11 @@ impl ComponentCtxFancy {
 
        self.state_changes.push_back(ComponentStateChange::CreatedPort(port))
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn get_ports(&self) -> &[Port] {
 
        return self.ports.as_slice();
 
    }
 

	
 
    pub(crate) fn get_port_by_id(&self, id: PortIdLocal) -> Option<&Port> {
 
        return self.ports.iter().find(|v| v.self_id == id);
 
    }
 
@@ -413,15 +432,14 @@ impl ComponentCtxFancy {
 
    }
 

	
 
    /// Notify that component will enter a sync block. Note that after calling
 
    /// this function you must allow the scheduler to pick up the changes in
 
    /// the context by exiting your `Component::run` function with an
 
    /// appropriate scheduling value.
 
    pub(crate) fn notify_sync_start(&mut self) -> &[Port] {
 
    /// this function you must allow the scheduler to pick up the changes in the
 
    /// context by exiting your code-executing loop, and to continue executing
 
    /// code the next time the scheduler picks up the component.
 
    pub(crate) fn notify_sync_start(&mut self) {
 
        debug_assert!(!self.is_in_sync);
 

	
 
        self.is_in_sync = true;
 
        self.changed_in_sync = true;
 
        return &self.ports
 
    }
 

	
 
    #[inline]
 
@@ -614,7 +632,7 @@ impl ControlMessageHandler {
 

	
 
    /// Returns true if the supplied message should be rerouted. If so then this
 
    /// function returns the connector that should retrieve this message.
 
    pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option<ConnectorId> {
 
    pub fn should_reroute(&self, target_port: PortIdLocal) -> Option<ConnectorId> {
 
        for entry in &self.active {
 
            if let ControlVariant::ChangedPort(entry) = &entry.variant {
 
                if entry.target_port == target_port {
0 comments (0 inline, 0 general)