Changeset - edb4c4be7e45
[Not reviewed]
0 9 0
MH - 4 years ago 2021-11-08 21:21:31
contact@maxhenger.nl
fixing compilation errors
9 files changed with 178 insertions and 129 deletions:
0 comments (0 inline, 0 general)
src/collections/sets.rs
Show inline comments
 
@@ -89,7 +89,7 @@ impl<T: Eq> VecSet<T> {
 
    }
 

	
 
    #[inline]
 
    pub fn iter(&self) -> impl Iterator<Item=T> {
 
    pub fn iter(&self) -> impl Iterator<Item=&T> {
 
        return self.inner.iter();
 
    }
 

	
src/runtime2/branch.rs
Show inline comments
 
@@ -283,7 +283,7 @@ impl ExecTree {
 
    /// using the provided branch as the final sync result.
 
    pub fn end_sync(&mut self, branch_id: BranchId) {
 
        debug_assert!(self.is_in_sync());
 
        debug_assert!(self.iter_queue(QueueKind::FinishedSync).any(|v| v.id == branch_id));
 
        debug_assert!(self.iter_queue(QueueKind::FinishedSync, None).any(|v| v.id == branch_id));
 

	
 
        // Swap indicated branch into the first position
 
        self.branches.swap(0, branch_id.index as usize);
 
@@ -294,7 +294,10 @@ impl ExecTree {
 
        branch.id = BranchId::new_invalid();
 
        branch.parent_id = BranchId::new_invalid();
 
        branch.sync_state = SpeculativeState::RunningNonSync;
 
        debug_assert!(!branch.awaiting_port.is_valid());
 
        branch.next_in_queue = BranchId::new_invalid();
 
        branch.inbox.clear();
 
        debug_assert!(branch.prepared_channel.is_none());
 

	
 
        // Clear out all the queues
 
        for queue_idx in 0..NUM_QUEUES {
src/runtime2/connector2.rs
Show inline comments
 
@@ -38,8 +38,7 @@ use crate::runtime2::port::PortKind;
 

	
 
use super::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState};
 
use super::consensus::{Consensus, Consistency};
 
use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy};
 
use super::inbox::PublicInbox;
 
use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy, PublicInbox};
 
use super::native::Connector;
 
use super::port::PortIdLocal;
 
use super::scheduler::{ComponentCtxFancy, SchedulerCtx};
 
@@ -79,7 +78,7 @@ struct ConnectorRunContext<'a> {
 
    prepared_channel: Option<(Value, Value)>,
 
}
 

	
 
impl RunContext for ConnectorRunContext{
 
impl<'a> RunContext for ConnectorRunContext<'a>{
 
    fn did_put(&mut self, port: PortId) -> bool {
 
        let port_id = PortIdLocal::new(port.0.u32_suffix);
 
        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
 
@@ -110,7 +109,10 @@ impl Connector for ConnectorPDL {
 
        self.handle_new_messages(comp_ctx);
 
        if self.tree.is_in_sync() {
 
            let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 
            self.consensus.handle_new_finished_sync_branches();
 
            if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) {
 
                todo!("call handler");
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx);
 
@@ -132,8 +134,8 @@ impl ConnectorPDL {
 
    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtxFancy) {
 
        while let Some(message) = ctx.read_next_message() {
 
            match message {
 
                MessageFancy::Data(message) => handle_new_data_message(message, ctx),
 
                MessageFancy::Sync(message) => handle_new_sync_message(message, ctx),
 
                MessageFancy::Data(message) => self.handle_new_data_message(message, ctx),
 
                MessageFancy::Sync(message) => self.handle_new_sync_message(message, ctx),
 
                MessageFancy::Control(_) => unreachable!("control message in component"),
 
            }
 
        }
 
@@ -143,8 +145,7 @@ impl ConnectorPDL {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        debug_assert!(ctx.workspace_branches.is_empty());
 
        self.consensus.handle_received_sync_header(&message.sync_header, ctx);
 
        self.consensus.handle_received_data_header(&self.tree, &message.data_header, &mut ctx.workspace_branches);
 
        self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut ctx.workspace_branches);
 

	
 
        for branch_id in ctx.workspace_branches.drain(..) {
 
            // This branch can receive, so fork and given it the message
 
@@ -161,8 +162,9 @@ impl ConnectorPDL {
 
    }
 

	
 
    pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) {
 
        self.consensus.handle_received_sync_header(&message.sync_header, ctx);
 
        self.consensus.handle_received_sync_message(message, ctx);
 
        if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) {
 

	
 
        }
 
    }
 

	
 
    // --- Running code
 
@@ -303,7 +305,7 @@ impl ConnectorPDL {
 
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
 

	
 
        let mut run_context = ConnectorRunContext{
 
            branch_id,
 
            branch_id: branch.id,
 
            consensus: &self.consensus,
 
            received: &branch.inbox,
 
            scheduler: *sched_ctx,
 
@@ -320,7 +322,7 @@ impl ConnectorPDL {
 
            RunResult::ComponentAtSyncStart => {
 
                let current_ports = comp_ctx.notify_sync_start();
 
                let sync_branch_id = self.tree.start_sync();
 
                self.consensus.start_sync(current_ports);
 
                self.consensus.start_sync(current_ports, comp_ctx);
 
                self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
 
@@ -361,4 +363,17 @@ impl ConnectorPDL {
 
            _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
 
        }
 
    }
 

	
 
    pub fn collapse_sync_to_solution_branch(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtxFancy) {
 
        let mut fake_vec = Vec::new();
 
        self.tree.end_sync(solution_branch_id);
 
        self.consensus.end_sync(solution_branch_id, &mut fake_vec);
 

	
 
        for port in fake_vec {
 
            // TODO: Handle sent/received ports
 
            debug_assert!(ctx.get_port_by_id(port).is_some());
 
        }
 

	
 
        ctx.notify_sync_end(&[]);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/consensus.rs
Show inline comments
 
use std::path::Component;
 
use std::str::pattern::Pattern;
 
use crate::collections::VecSet;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::branch::{BranchId, ExecTree, QueueKind};
 
use crate::runtime2::ConnectorId;
 
use crate::runtime2::inbox2::{DataHeader, DataMessageFancy, MessageFancy, SyncContent, SyncHeader, SyncMessageFancy};
 
use crate::runtime2::inbox::SyncMessage;
 
use crate::runtime2::port::{ChannelId, Port, PortIdLocal};
 
use crate::runtime2::scheduler::ComponentCtxFancy;
 
use super::inbox2::PortAnnotation;
 

	
 
use super::branch::{BranchId, ExecTree, QueueKind};
 
use super::ConnectorId;
 
use super::port::{ChannelId, Port, PortIdLocal};
 
use super::inbox2::{
 
    DataHeader, DataMessageFancy, MessageFancy,
 
    SyncContent, SyncHeader, SyncMessageFancy, PortAnnotation
 
};
 
use super::scheduler::ComponentCtxFancy;
 

	
 
struct BranchAnnotation {
 
    port_mapping: Vec<PortAnnotation>,
 
@@ -22,8 +22,8 @@ pub(crate) struct LocalSolution {
 

	
 
#[derive(Clone)]
 
pub(crate) struct GlobalSolution {
 
    branches: Vec<(ConnectorId, BranchId)>,
 
    port_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info
 
    component_branches: Vec<(ConnectorId, BranchId)>,
 
    channel_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info
 
}
 

	
 
// -----------------------------------------------------------------------------
 
@@ -88,10 +88,12 @@ impl Consensus {
 
    /// Sets up the consensus algorithm for a new synchronous round. The
 
    /// provided ports should be the ports the component owns at the start of
 
    /// the sync round.
 
    pub fn start_sync(&mut self, ports: &[Port]) {
 
    pub fn start_sync(&mut self, ports: &[Port], ctx: &ComponentCtxFancy) {
 
        debug_assert!(!self.highest_connector_id.is_valid());
 
        debug_assert!(self.branch_annotations.is_empty());
 
        debug_assert!(self.last_finished_handled.is_none());
 
        debug_assert!(self.encountered_peers.is_empty());
 
        debug_assert!(self.solution_combiner.local.is_empty());
 

	
 
        // We'll use the first "branch" (the non-sync one) to store our ports,
 
        // this allows cloning if we created a new branch.
 
@@ -104,6 +106,9 @@ impl Consensus {
 
                })
 
                .collect(),
 
        });
 

	
 
        self.highest_connector_id = ctx.id;
 

	
 
    }
 

	
 
    /// Notifies the consensus algorithm that a new branch has appeared. Must be
 
@@ -172,7 +177,7 @@ impl Consensus {
 
    /// Generates sync messages for any branches that are at the end of the
 
    /// sync block. To find these branches, they should've been put in the
 
    /// "finished" queue in the execution tree.
 
    pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtxFancy) {
 
    pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtxFancy) -> Option<BranchId> {
 
        debug_assert!(self.is_in_sync());
 

	
 
        let mut last_branch_id = self.last_finished_handled;
 
@@ -194,12 +199,17 @@ impl Consensus {
 
                final_branch_id: branch.id,
 
                port_mapping: target_mapping,
 
            };
 
            self.send_or_store_local_solution(local_solution, ctx);
 
            let solution_branch = self.send_or_store_local_solution(local_solution, ctx);
 
            if solution_branch.is_some() {
 
                // No need to continue iterating, we've found the solution
 
                return solution_branch;
 
            }
 

	
 
            last_branch_id = Some(branch.id);
 
        }
 

	
 
        self.last_finished_handled = last_branch_id;
 
        return None;
 
    }
 

	
 
    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<PortIdLocal>) {
 
@@ -213,7 +223,12 @@ impl Consensus {
 
            final_ports.push(port.port_id);
 
        }
 

	
 
        // Clear out internal storage
 
        // Clear out internal storage to defaults
 
        self.highest_connector_id = ConnectorId::new_invalid();
 
        self.branch_annotations.clear();
 
        self.last_finished_handled = None;
 
        self.encountered_peers.clear();
 
        self.solution_combiner.clear();
 
    }
 

	
 
    // --- Handling messages
 
@@ -289,12 +304,11 @@ impl Consensus {
 
            SyncContent::LocalSolution(solution) => {
 
                // We might be the leader, or earlier messages caused us to not
 
                // be the leader anymore.
 
                self.send_or_store_local_solution(solution, ctx);
 
                return None;
 
                return self.send_or_store_local_solution(solution, ctx);
 
            },
 
            SyncContent::GlobalSolution(solution) => {
 
                // Take branch of interest and return it.
 
                let (_, branch_id) = solution.branches.iter()
 
                let (_, branch_id) = solution.component_branches.iter()
 
                    .find(|(connector_id, _)| connector_id == ctx.id)
 
                    .unwrap();
 
                return Some(*branch_id);
 
@@ -402,11 +416,18 @@ impl Consensus {
 
        } // else: exactly equal, so do nothing
 
    }
 

	
 
    fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) {
 
    fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) -> Option<BranchId> {
 
        if self.highest_connector_id == ctx.id {
 
            // We are the leader
 
            if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) {
 
                for (connector_id, _) in global_solution.branches.iter().copied() {
 
                let mut my_final_branch_id = BranchId::new_invalid();
 
                for (connector_id, branch_id) in global_solution.component_branches.iter().copied() {
 
                    if connector_id == ctx.id {
 
                        // This is our solution branch
 
                        my_final_branch_id = branch_id;
 
                        continue;
 
                    }
 

	
 
                    let message = SyncMessageFancy{
 
                        sync_header: self.create_sync_header(ctx),
 
                        target_component_id: connector_id,
 
@@ -414,6 +435,11 @@ impl Consensus {
 
                    };
 
                    ctx.submit_message(MessageFancy::Sync(message));
 
                }
 

	
 
                debug_assert!(my_final_branch_id.is_valid());
 
                return Some(my_final_branch_id);
 
            } else {
 
                return None;
 
            }
 
        } else {
 
            // Someone else is the leader
 
@@ -423,6 +449,7 @@ impl Consensus {
 
                content: SyncContent::LocalSolution(solution),
 
            };
 
            ctx.submit_message(MessageFancy::Sync(message));
 
            return None;
 
        }
 
    }
 

	
 
@@ -454,7 +481,7 @@ impl Consensus {
 

	
 
struct MatchedLocalSolution {
 
    final_branch_id: BranchId,
 
    port_mapping: Vec<(ChannelId, BranchId)>,
 
    channel_mapping: Vec<(ChannelId, BranchId)>,
 
    matches: Vec<ComponentMatches>,
 
}
 

	
 
@@ -496,7 +523,7 @@ impl SolutionCombiner {
 
        let component_id = solution.component;
 
        let solution = MatchedLocalSolution{
 
            final_branch_id: solution.final_branch_id,
 
            port_mapping: solution.port_mapping,
 
            channel_mapping: solution.port_mapping,
 
            matches: Vec::new(),
 
        };
 

	
 
@@ -529,7 +556,7 @@ impl SolutionCombiner {
 
        // in the stored solutions which other components are peers of the new
 
        // one.
 
        if new_component {
 
            let cur_ports = &self.local[component_index].solutions[0].port_mapping;
 
            let cur_ports = &self.local[component_index].solutions[0].channel_mapping;
 
            let mut component_peers = Vec::new();
 

	
 
            // Find the matching components
 
@@ -539,22 +566,22 @@ impl SolutionCombiner {
 
                    continue;
 
                }
 

	
 
                let mut matching_ports = Vec::new();
 
                for (cur_port_id, _) in cur_ports {
 
                    for (other_port_id, _) in &other_component.solutions[0].port_mapping {
 
                        if cur_port_id == other_port_id {
 
                let mut matching_channels = Vec::new();
 
                for (cur_channel_id, _) in cur_ports {
 
                    for (other_channel_id, _) in &other_component.solutions[0].channel_mapping {
 
                        if cur_channel_id == other_channel_id {
 
                            // We have a shared port
 
                            matching_ports.push(*port_id);
 
                            matching_channels.push(*cur_channel_id);
 
                        }
 
                    }
 
                }
 

	
 
                if !matching_ports.is_empty() {
 
                if !matching_channels.is_empty() {
 
                    // We share some ports
 
                    component_peers.push(ComponentPeer{
 
                        target_id: other_component.component,
 
                        target_index: other_index,
 
                        involved_channels: matching_ports,
 
                        involved_channels: matching_channels,
 
                    });
 
                }
 
            }
 
@@ -579,7 +606,7 @@ impl SolutionCombiner {
 
                    num_ports_in_peers += existing_peer.involved_channels.len();
 
                }
 

	
 
                if num_ports_in_peers == other_component.solutions[0].port_mapping.len() {
 
                if num_ports_in_peers == other_component.solutions[0].channel_mapping.len() {
 
                    other_component.all_peers_present = true;
 
                }
 

	
 
@@ -609,8 +636,8 @@ impl SolutionCombiner {
 
                // Check the port mappings between the pair of solutions.
 
                let mut all_matched = true;
 

	
 
                'mapping_check_loop: for (cur_port, cur_branch) in &cur_solution.port_mapping {
 
                    for (other_port, other_branch) in &other_solution.port_mapping {
 
                'mapping_check_loop: for (cur_port, cur_branch) in &cur_solution.channel_mapping {
 
                    for (other_port, other_branch) in &other_solution.channel_mapping {
 
                        if cur_port == other_port {
 
                            if cur_branch == other_branch {
 
                                // Same port mapping, go to next port
 
@@ -771,29 +798,29 @@ impl SolutionCombiner {
 
        // all (all components have their peers), and the exit condition of the
 
        // while loop: if we're here, then we have a global solution
 
        debug_assert_eq!(check_stack.len(), self.local.len());
 
        let mut global_solution = Vec::with_capacity(check_stack.len());
 
        let mut final_branches = Vec::with_capacity(check_stack.len());
 
        for (component_index, solution_index) in check_stack.iter().copied() {
 
            let component = &self.local[component_index];
 
            let solution = &component.solutions[solution_index];
 
            global_solution.push((component.component, solution.final_branch_id));
 
            final_branches.push((component.component, solution.final_branch_id));
 
        }
 

	
 
        // Just debugging here, TODO: @remove
 
        let mut total_num_ports = 0;
 
        let mut total_num_channels = 0;
 
        for (component_index, _) in check_stack.iter().copied() {
 
            let component = &self.local[component_index];
 
            total_num_ports += component.solutions[0].port_mapping.len();
 
            total_num_channels += component.solutions[0].channel_mapping.len();
 
        }
 

	
 
        total_num_ports /= 2;
 
        let mut final_mapping = Vec::with_capacity(total_num_ports);
 
        total_num_channels /= 2;
 
        let mut final_mapping = Vec::with_capacity(total_num_channels);
 
        let mut total_num_checked = 0;
 

	
 
        for (component_index, solution_index) in check_stack.iter().copied() {
 
            let component = &self.local[component_index];
 
            let solution = &component.solutions[solution_index];
 

	
 
            for (channel_id, branch_id) in solution.port_mapping.iter().copied() {
 
            for (channel_id, branch_id) in solution.channel_mapping.iter().copied() {
 
                match final_mapping.iter().find(|(v, _)| *v == channel_id) {
 
                    Some((_, encountered_branch_id)) => {
 
                        debug_assert_eq!(encountered_branch_id, branch_id);
 
@@ -806,11 +833,11 @@ impl SolutionCombiner {
 
            }
 
        }
 

	
 
        debug_assert_eq!(total_num_checked, total_num_ports);
 
        debug_assert_eq!(total_num_checked, total_num_channels);
 

	
 
        return Some(GlobalSolution{
 
            branches: global_solution,
 
            port_mapping: final_mapping,
 
            component_branches: final_branches,
 
            channel_mapping: final_mapping,
 
        });
 
    }
 

	
 
@@ -842,13 +869,17 @@ impl SolutionCombiner {
 
                solutions.push(LocalSolution{
 
                    component: component.component,
 
                    final_branch_id: solution.final_branch_id,
 
                    port_mapping: solution.port_mapping,
 
                    port_mapping: solution.channel_mapping,
 
                });
 
            }
 
        }
 

	
 
        return solutions;
 
    }
 

	
 
    fn clear(&mut self) {
 
        self.local.clear();
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
src/runtime2/inbox.rs
Show inline comments
 
@@ -208,36 +208,4 @@ pub struct Message {
 
    pub sending_connector: ConnectorId,
 
    pub receiving_port: PortIdLocal, // may be invalid (in case of messages targeted at the connector)
 
    pub contents: MessageContents,
 
}
 

	
 
/// The public inbox of a connector. The thread running the connector that owns
 
/// this inbox may retrieved from it. Non-owning threads may only put new
 
/// messages inside of it.
 
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
 
//  Should behave as a MPSC queue.
 
pub struct PublicInbox {
 
    messages: Mutex<VecDeque<MessageFancy>>,
 
}
 

	
 
impl PublicInbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Mutex::new(VecDeque::new()),
 
        }
 
    }
 

	
 
    pub fn insert_message(&self, message: MessageFancy) {
 
        let mut lock = self.messages.lock().unwrap();
 
        lock.push_back(message);
 
    }
 

	
 
    pub fn take_message(&self) -> Option<MessageFancy> {
 
        let mut lock = self.messages.lock().unwrap();
 
        return lock.pop_front();
 
    }
 

	
 
    pub fn is_empty(&self) -> bool {
 
        let lock = self.messages.lock().unwrap();
 
        return lock.is_empty();
 
    }
 
}
 
\ No newline at end of file
src/runtime2/inbox2.rs
Show inline comments
 
use std::sync::Mutex;
 
use std::collections::VecDeque;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::branch::BranchId;
 
use crate::runtime2::ConnectorId;
 
@@ -77,4 +80,36 @@ pub(crate) enum MessageFancy {
 
    Data(DataMessageFancy),
 
    Sync(SyncMessageFancy),
 
    Control(ControlMessageFancy),
 
}
 

	
 
/// The public inbox of a connector. The thread running the connector that owns
 
/// this inbox may retrieved from it. Non-owning threads may only put new
 
/// messages inside of it.
 
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
 
//  Should behave as a MPSC queue.
 
pub struct PublicInbox {
 
    messages: Mutex<VecDeque<MessageFancy>>,
 
}
 

	
 
impl PublicInbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Mutex::new(VecDeque::new()),
 
        }
 
    }
 

	
 
    pub fn insert_message(&self, message: MessageFancy) {
 
        let mut lock = self.messages.lock().unwrap();
 
        lock.push_back(message);
 
    }
 

	
 
    pub fn take_message(&self) -> Option<MessageFancy> {
 
        let mut lock = self.messages.lock().unwrap();
 
        return lock.pop_front();
 
    }
 

	
 
    pub fn is_empty(&self) -> bool {
 
        let lock = self.messages.lock().unwrap();
 
        return lock.is_empty();
 
    }
 
}
 
\ No newline at end of file
src/runtime2/mod.rs
Show inline comments
 
@@ -2,12 +2,12 @@
 

	
 
mod runtime;
 
mod messages;
 
mod connector;
 
// mod connector;
 
mod branch;
 
mod native;
 
mod port;
 
mod scheduler;
 
mod inbox;
 
// mod inbox;
 
mod consensus;
 
mod inbox2;
 

	
 
@@ -24,13 +24,11 @@ use std::thread::{self, JoinHandle};
 
use crate::collections::RawVec;
 
use crate::ProtocolDescription;
 

	
 
use inbox::Message;
 
use connector2::{ConnectorPDL, ConnectorPublic, ConnectorScheduling};
 
use scheduler::{Scheduler, ControlMessageHandler};
 
use scheduler::{Scheduler, ComponentCtxFancy, SchedulerCtx, ControlMessageHandler};
 
use native::{Connector, ConnectorApplication, ApplicationInterface};
 
use crate::runtime2::inbox2::MessageFancy;
 
use crate::runtime2::port::{ChannelId, Port, PortState};
 
use crate::runtime2::scheduler::{ComponentCtxFancy, SchedulerCtx};
 
use inbox2::MessageFancy;
 
use port::{ChannelId, Port, PortState};
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
src/runtime2/native.rs
Show inline comments
 
@@ -6,11 +6,12 @@ use crate::protocol::ComponentCreationError;
 
use crate::protocol::eval::ValueGroup;
 

	
 
use super::{ConnectorKey, ConnectorId, RuntimeInner};
 
use super::scheduler::{SchedulerCtx, ComponentCtxFancy, ReceivedMessage};
 
use super::scheduler::{SchedulerCtx, ComponentCtxFancy};
 
use super::port::{Port, PortIdLocal, Channel, PortKind};
 
use super::connector::{Branch, ConnectorScheduling, ConnectorPDL};
 
use super::connector::find_ports_in_value_group;
 
use super::inbox::{Message, MessageContents};
 
use super::branch::{Branch};
 
use super::consensus::find_ports_in_value_group;
 
use super::connector2::{ConnectorScheduling, ConnectorPDL};
 
use super::inbox2::{MessageFancy, ControlContent, ControlMessageFancy};
 

	
 
/// Generic connector interface from the scheduler's point of view.
 
pub(crate) trait Connector {
 
@@ -26,7 +27,7 @@ type JobQueue = Arc<Mutex<VecDeque<ApplicationJob>>>;
 

	
 
enum ApplicationJob {
 
    NewChannel((Port, Port)),
 
    NewConnector(ConnectorPDL),
 
    NewConnector(ConnectorPDL, Vec<PortIdLocal>),
 
    Shutdown,
 
}
 

	
 
@@ -57,10 +58,9 @@ impl Connector for ConnectorApplication {
 
        // Handle any incoming messages if we're participating in a round
 
        while let Some(message) = comp_ctx.read_next_message() {
 
            match message {
 
                ReceivedMessage::Data(_) => todo!("data message in API connector"),
 
                ReceivedMessage::Sync(_) | ReceivedMessage::RequestCommit(_) | ReceivedMessage::ConfirmCommit(_) => {
 
                    todo!("sync message in API connector");
 
                }
 
                MessageFancy::Data(_) => todo!("data message in API connector"),
 
                MessageFancy::Sync(_)  => todo!("sync message in API connector"),
 
                MessageFancy::Control(_) => todo!("impossible control message"),
 
            }
 
        }
 

	
 
@@ -74,9 +74,9 @@ impl Connector for ConnectorApplication {
 
                        comp_ctx.push_port(endpoint_a);
 
                        comp_ctx.push_port(endpoint_b);
 
                    }
 
                    ApplicationJob::NewConnector(connector) => {
 
                    ApplicationJob::NewConnector(connector, initial_ports) => {
 
                        println!("DEBUG: API creating connector");
 
                        comp_ctx.push_component(connector);
 
                        comp_ctx.push_component(connector, initial_ports);
 
                    },
 
                    ApplicationJob::Shutdown => {
 
                        debug_assert!(queue.is_empty());
 
@@ -139,26 +139,25 @@ impl ApplicationInterface {
 
        // asynchronously.
 
        let mut initial_ports = Vec::new();
 
        find_ports_in_value_group(&arguments, &mut initial_ports);
 
        for port_to_remove in &initial_ports {
 
            match self.owned_ports.iter().position(|v| v == port_to_remove) {
 
                Some(index_to_remove) => {
 
                    // We own the port, so continue
 
                    self.owned_ports.remove(index_to_remove);
 
                },
 
                None => {
 
                    // We don't own the port
 
                    return Err(ComponentCreationError::UnownedPort);
 
                }
 
        for initial_port in &initial_ports {
 
            if !self.owned_ports.iter().any(|v| v == initial_port) {
 
                return Err(ComponentCreationError::UnownedPort);
 
            }
 
        }
 

	
 
        // We own all ports, so remove them on this side
 
        for initial_port in &initial_ports {
 
            let position = self.owned_ports.iter().position(|v| *v == initial_port).unwrap();
 
            self.owned_ports.remove(position);
 
        }
 

	
 
        let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?;
 
        let connector = ConnectorPDL::new(Branch::new_initial_branch(state), initial_ports);
 
        let connector = ConnectorPDL::new(state);
 

	
 
        // Put on job queue
 
        {
 
            let mut queue = self.job_queue.lock().unwrap();
 
            queue.push_back(ApplicationJob::NewConnector(connector));
 
            queue.push_back(ApplicationJob::NewConnector(connector, initial_ports));
 
        }
 

	
 
        self.wake_up_connector_with_ping();
 
@@ -187,11 +186,11 @@ impl ApplicationInterface {
 

	
 
    fn wake_up_connector_with_ping(&self) {
 
        let connector = self.runtime.get_component_public(self.connector_id);
 
        connector.inbox.insert_message(Message{
 
            sending_connector: ConnectorId::new_invalid(),
 
            receiving_port: PortIdLocal::new_invalid(),
 
            contents: MessageContents::Ping,
 
        });
 
        connector.inbox.insert_message(MessageFancy::Control(ControlMessageFancy{
 
            id: 0,
 
            sending_component_id: self.connector_id,
 
            content: ControlContent::Ack
 
        }));
 

	
 
        let should_wake_up = connector.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
src/runtime2/scheduler.rs
Show inline comments
 
@@ -154,7 +154,7 @@ impl Scheduler {
 

	
 
                            // And respond with an Ack
 
                            let ack_message = MessageFancy::Control(ControlMessageFancy{
 
                                id: content.id,
 
                                id: message.id,
 
                                sending_component_id: connector_id,
 
                                content: ControlContent::Ack,
 
                            });
 
@@ -168,7 +168,7 @@ impl Scheduler {
 

	
 
                            // Send an Ack
 
                            let ack_message = MessageFancy::Control(ControlMessageFancy{
 
                                id: content.id,
 
                                id: message.id,
 
                                sending_component_id: connector_id,
 
                                content: ControlContent::Ack,
 
                            });
 
@@ -176,7 +176,7 @@ impl Scheduler {
 
                            self.runtime.send_message(message.sending_component_id, ack_message);
 
                        },
 
                        ControlContent::Ack => {
 
                            scheduled.router.handle_ack(content.id);
 
                            scheduled.router.handle_ack(message.id);
 
                        },
 
                        ControlContent::Ping => {},
 
                    }
 
@@ -506,7 +506,7 @@ impl<'a> Iterator for MessagesIter<'a> {
 
                if message.data_header.target_port == self.match_port_id {
 
                    // Found a match
 
                    self.next_index += 1;
 
                    return Some(data_message);
 
                    return Some(message);
 
                }
 
            } else {
 
                // Unreachable because:
0 comments (0 inline, 0 general)