Changeset - f4d1c8c04de6
[Not reviewed]
0 6 0
MH - 4 years ago 2021-11-07 15:43:41
contact@maxhenger.nl
modified scheduler to use new ExecTree and Consensus
6 files changed with 108 insertions and 147 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector2.rs
Show inline comments
 
@@ -96,7 +96,7 @@ impl Connector for ConnectorPDL {
 
}
 

	
 
impl ConnectorPDL {
 
    pub fn new(initial: ComponentState, owned_ports: Vec<PortIdLocal>) -> Self {
 
    pub fn new(initial: ComponentState) -> Self {
 
        Self{
 
            tree: ExecTree::new(initial),
 
            consensus: Consensus::new(),
 
@@ -129,7 +129,7 @@ impl ConnectorPDL {
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.clone());
 
            self.consensus.notify_of_received_message(branch_id, &message.data_header);
 
            self.consensus.notify_of_received_message(branch_id, &message.data_header, &message.content);
 

	
 
            // And prepare the branch for running
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
@@ -303,8 +303,9 @@ impl ConnectorPDL {
 
                        definition_id, monomorph_idx, arguments
 
                    ),
 
                };
 
                let new_component = ConnectorPDL::new(new_state, comp_ctx.workspace_ports.clone());
 
                comp_ctx.push_component(new_component);
 
                let new_component = ConnectorPDL::new(new_state);
 
                comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone());
 
                comp_ctx.workspace_ports.clear();
 

	
 
                return ConnectorScheduling::Later;
 
            },
src/runtime2/consensus.rs
Show inline comments
 
@@ -165,6 +165,7 @@ impl Consensus {
 
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
 
        let data_header = DataHeader{
 
            expected_mapping: branch.port_mapping.clone(),
 
            sending_port: port_info.peer_id,
 
            target_port: port_info.peer_id,
 
            new_mapping: branch_id
 
        };
src/runtime2/inbox.rs
Show inline comments
 
@@ -17,6 +17,7 @@ use std::sync::Mutex;
 

	
 
use super::ConnectorId;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::inbox2::MessageFancy;
 
use super::connector::BranchId;
 
use super::port::PortIdLocal;
 

	
 
@@ -215,7 +216,7 @@ pub struct Message {
 
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
 
//  Should behave as a MPSC queue.
 
pub struct PublicInbox {
 
    messages: Mutex<VecDeque<Message>>,
 
    messages: Mutex<VecDeque<MessageFancy>>,
 
}
 

	
 
impl PublicInbox {
 
@@ -225,12 +226,12 @@ impl PublicInbox {
 
        }
 
    }
 

	
 
    pub fn insert_message(&self, message: Message) {
 
    pub fn insert_message(&self, message: MessageFancy) {
 
        let mut lock = self.messages.lock().unwrap();
 
        lock.push_back(message);
 
    }
 

	
 
    pub fn take_message(&self) -> Option<Message> {
 
    pub fn take_message(&self) -> Option<MessageFancy> {
 
        let mut lock = self.messages.lock().unwrap();
 
        return lock.pop_front();
 
    }
src/runtime2/inbox2.rs
Show inline comments
 
@@ -3,7 +3,9 @@ use crate::runtime2::branch::BranchId;
 
use crate::runtime2::ConnectorId;
 
use crate::runtime2::port::PortIdLocal;
 

	
 
#[derive(Copy, Clone)]
 
// TODO: Remove Debug derive from all types
 

	
 
#[derive(Debug, Copy, Clone)]
 
pub(crate) struct PortAnnotation {
 
    pub port_id: PortIdLocal,
 
    pub registered_id: Option<BranchId>,
 
@@ -11,32 +13,38 @@ pub(crate) struct PortAnnotation {
 
}
 

	
 
/// The header added by the synchronization algorithm to all.
 
#[derive(Debug, Clone)]
 
pub(crate) struct SyncHeader {
 
    pub sending_component_id: ConnectorId,
 
    pub highest_component_id: ConnectorId,
 
}
 

	
 
/// The header added to data messages
 
#[derive(Debug, Clone)]
 
pub(crate) struct DataHeader {
 
    pub expected_mapping: Vec<PortAnnotation>,
 
    pub sending_port: PortIdLocal,
 
    pub target_port: PortIdLocal,
 
    pub new_mapping: BranchId,
 
}
 

	
 
/// A data message is a message that is intended for the receiver's PDL code,
 
/// but will also be handled by the consensus algrorithm
 
#[derive(Debug, Clone)]
 
pub(crate) struct DataMessageFancy {
 
    pub sync_header: SyncHeader,
 
    pub data_header: DataHeader,
 
    pub content: ValueGroup,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum SyncContent {
 

	
 
}
 

	
 
/// A sync message is a message that is intended only for the consensus
 
/// algorithm.
 
#[derive(Debug)]
 
pub(crate) struct SyncMessageFancy {
 
    pub sync_header: SyncHeader,
 
    pub content: SyncContent,
 
@@ -44,11 +52,14 @@ pub(crate) struct SyncMessageFancy {
 

	
 
/// A control message is a message intended for the scheduler that is executing
 
/// a component.
 
#[derive(Debug)]
 
pub(crate) struct ControlMessageFancy {
 
    pub id: u32, // generic identifier, used to match request to response
 
    pub sending_component_id: ConnectorId,
 
    pub content: ControlContent,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum ControlContent {
 
    PortPeerChanged(PortIdLocal, ConnectorId),
 
    CloseChannel(PortIdLocal),
 
@@ -57,6 +68,7 @@ pub(crate) enum ControlContent {
 
}
 

	
 
/// Combination of data message and control messages.
 
#[derive(Debug)]
 
pub(crate) enum MessageFancy {
 
    Data(DataMessageFancy),
 
    Sync(SyncMessageFancy),
src/runtime2/mod.rs
Show inline comments
 
@@ -25,9 +25,10 @@ use crate::collections::RawVec;
 
use crate::ProtocolDescription;
 

	
 
use inbox::Message;
 
use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling};
 
use connector2::{ConnectorPDL, ConnectorPublic, ConnectorScheduling};
 
use scheduler::{Scheduler, ControlMessageHandler};
 
use native::{Connector, ConnectorApplication, ApplicationInterface};
 
use crate::runtime2::inbox2::MessageFancy;
 
use crate::runtime2::port::{Port, PortState};
 
use crate::runtime2::scheduler::{ComponentCtxFancy, SchedulerCtx};
 

	
 
@@ -244,7 +245,7 @@ impl RuntimeInner {
 

	
 
    /// Sends a message to a particular connector. If the connector happened to
 
    /// be sleeping then it will be scheduled for execution.
 
    pub(crate) fn send_message(&self, target_id: ConnectorId, message: Message) {
 
    pub(crate) fn send_message(&self, target_id: ConnectorId, message: MessageFancy) {
 
        let target = self.get_component_public(target_id);
 
        target.inbox.insert_message(message);
 

	
src/runtime2/scheduler.rs
Show inline comments
 
use std::collections::VecDeque;
 
use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 
use crate::runtime2::inbox2::{DataMessageFancy, MessageFancy};
 
use crate::runtime2::inbox2::ControlContent;
 

	
 
use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey, ConnectorVariant};
 
use super::port::{Port, PortState, PortIdLocal};
 
use super::native::Connector;
 
use super::connector::{BranchId, ConnectorPDL, ConnectorScheduling};
 
use super::inbox::{
 
    Message, MessageContents, ControlMessageVariant,
 
    DataMessage, ControlMessage, SolutionMessage, SyncMessage
 
};
 
use super::branch::{BranchId};
 
use super::connector2::{ConnectorPDL, ConnectorScheduling};
 
use super::inbox2::{MessageFancy, DataMessageFancy, SyncMessageFancy, ControlMessageFancy};
 

	
 
// Because it contains pointers we're going to do a copy by value on this one
 
#[derive(Clone, Copy)]
 
@@ -108,7 +106,7 @@ impl Scheduler {
 
                                connector_id
 
                            );
 
                            self.debug_conn(connector_id, &format!("Sending message [ exit ] \n --- {:?}", message));
 
                            self.runtime.send_message(port.peer_connector, message);
 
                            self.runtime.send_message(port.peer_connector, MessageFancy::Control(message));
 
                        }
 
                    }
 

	
 
@@ -140,10 +138,10 @@ impl Scheduler {
 
            // Handle special messages here, messages for the component
 
            // will be added to the inbox.
 
            self.debug_conn(connector_id, " ... Handling message myself");
 
            match message.contents {
 
                MessageContents::Control(content) => {
 
                    match content.content {
 
                        ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
            match message {
 
                MessageFancy::Control(message) => {
 
                    match message.content {
 
                        ControlContent::PortPeerChanged(port_id, new_target_connector_id) => {
 
                            // Need to change port target
 
                            let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap();
 
                            port.peer_connector = new_target_connector_id;
 
@@ -155,43 +153,34 @@ impl Scheduler {
 
                            debug_assert!(scheduled.ctx_fancy.outbox.is_empty());
 

	
 
                            // And respond with an Ack
 
                            let ack_message = Message{
 
                                sending_connector: connector_id,
 
                                receiving_port: PortIdLocal::new_invalid(),
 
                                contents: MessageContents::Control(ControlMessage{
 
                                    id: content.id,
 
                                    content: ControlMessageVariant::Ack,
 
                                }),
 
                            };
 
                            let ack_message = MessageFancy::Control(ControlMessageFancy{
 
                                id: content.id,
 
                                sending_component_id: connector_id,
 
                                content: ControlContent::Ack,
 
                            });
 
                            self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message));
 
                            self.runtime.send_message(message.sending_connector, ack_message);
 
                            self.runtime.send_message(message.sending_component_id, ack_message);
 
                        },
 
                        ControlMessageVariant::CloseChannel(port_id) => {
 
                        ControlContent::CloseChannel(port_id) => {
 
                            // Mark the port as being closed
 
                            let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap();
 
                            port.state = PortState::Closed;
 

	
 
                            // Send an Ack
 
                            let ack_message = Message{
 
                                sending_connector: connector_id,
 
                                receiving_port: PortIdLocal::new_invalid(),
 
                                contents: MessageContents::Control(ControlMessage{
 
                                    id: content.id,
 
                                    content: ControlMessageVariant::Ack,
 
                                }),
 
                            };
 
                            let ack_message = MessageFancy::Control(ControlMessageFancy{
 
                                id: content.id,
 
                                sending_component_id: connector_id,
 
                                content: ControlContent::Ack,
 
                            });
 
                            self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message));
 
                            self.runtime.send_message(message.sending_connector, ack_message);
 
                            self.runtime.send_message(message.sending_component_id, ack_message);
 
                        },
 
                        ControlMessageVariant::Ack => {
 
                        ControlContent::Ack => {
 
                            scheduled.router.handle_ack(content.id);
 
                        }
 
                        },
 
                        ControlContent::Ping => {},
 
                    }
 
                },
 
                MessageContents::Ping => {
 
                    // Pings are sent just to wake up a component, so
 
                    // nothing to do here.
 
                },
 
                _ => {
 
                    // All other cases have to be handled by the component
 
                    scheduled.ctx_fancy.inbox_messages.push(message);
 
@@ -203,87 +192,51 @@ impl Scheduler {
 
    /// Handles changes to the context that were made by the component. This is
 
    /// the way (due to Rust's borrowing rules) that we bubble up changes in the
 
    /// component's state that the scheduler needs to know about (e.g. a message
 
    /// that the component wants to send).
 
    /// that the component wants to send, a port that has been added).
 
    fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx_fancy.id;
 

	
 
        // Handling any messages that were sent
 
        while let Some(mut message) = scheduled.ctx_fancy.outbox.pop_front() {
 
            // Based on the message contents, decide where the message
 
            // should be sent to. This might end up modifying the message.
 
            self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message));
 
            let (peer_connector, self_port, peer_port) = match &mut message {
 
                MessageContents::Data(contents) => {
 
                    let port = scheduled.ctx_fancy.get_port_by_id(contents.sending_port).unwrap();
 
                    (port.peer_connector, contents.sending_port, port.peer_id)
 
                },
 
                MessageContents::Sync(contents) => {
 
                    let connector = contents.to_visit.pop().unwrap();
 
                    (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
 
                },
 
                MessageContents::RequestCommit(contents)=> {
 
                    let connector = contents.to_visit.pop().unwrap();
 
                    (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
 
                },
 
                MessageContents::ConfirmCommit(contents) => {
 
                    for to_visit in &contents.to_visit {
 
                        let message = Message{
 
                            sending_connector: scheduled.ctx_fancy.id,
 
                            receiving_port: PortIdLocal::new_invalid(),
 
                            contents: MessageContents::ConfirmCommit(contents.clone()),
 
                        };
 
                        self.runtime.send_message(*to_visit, message);
 

	
 
            let target_component_id = match &message {
 
                MessageFancy::Data(content) => {
 
                    // Data messages are always sent to a particular port, and
 
                    // may end up being rerouted.
 
                    let port_desc = scheduled.ctx_fancy.get_port_by_id(content.data_header.sending_port).unwrap();
 
                    debug_assert_eq!(port_desc.peer_id, content.data_header.target_port);
 

	
 
                    if port_desc.state == PortState::Closed {
 
                        todo!("handle sending over a closed port")
 
                    }
 
                    (ConnectorId::new_invalid(), PortIdLocal::new_invalid(), PortIdLocal::new_invalid())
 

	
 
                    port_desc.peer_connector
 
                },
 
                MessageContents::Control(_) | MessageContents::Ping => {
 
                    // Never generated by the user's code
 
                    unreachable!();
 
                MessageFancy::Sync(content) => {
 
                    // Sync messages are always sent to a particular component,
 
                    // the sender must make sure it actually wants to send to
 
                    // the specified component (and is not using an inconsistent
 
                    // component ID associated with a port).
 
                    content.sync_header.highest_component_id
 
                },
 
                MessageFancy::Control(_) => {
 
                    unreachable!("component sending control messages directly");
 
                }
 
            };
 

	
 
            // TODO: Maybe clean this up, perhaps special case for
 
            //  ConfirmCommit can be handled differently.
 
            if peer_connector.is_valid() {
 
                if peer_port.is_valid() {
 
                    // Sending a message to a port, so the port may not be
 
                    // closed.
 
                    let port = scheduled.ctx_fancy.get_port_by_id(self_port).unwrap();
 
                    match port.state {
 
                        PortState::Open => {},
 
                        PortState::Closed => {
 
                            todo!("Handling sending over a closed port");
 
                        }
 
                    }
 
                }
 
                let message = Message {
 
                    sending_connector: scheduled.ctx_fancy.id,
 
                    receiving_port: peer_port,
 
                    contents: message,
 
                };
 
                self.runtime.send_message(peer_connector, message);
 
            }
 
            self.runtime.send_message(target_component_id, message);
 
        }
 

	
 
        while let Some(state_change) = scheduled.ctx_fancy.state_changes.pop_front() {
 
            match state_change {
 
                ComponentStateChange::CreatedComponent(component) => {
 
                ComponentStateChange::CreatedComponent(component, initial_ports) => {
 
                    // Add the new connector to the global registry
 
                    let new_key = self.runtime.create_pdl_component(component, false);
 
                    let new_connector = self.runtime.get_component_private(&new_key);
 

	
 
                    // Transfer ports
 
                    // TODO: Clean this up the moment native components are somewhat
 
                    //  properly implemented. We need to know about the ports that
 
                    //  are "owned by the PDL code", and then make sure that the
 
                    //  context contains a description of those ports.
 
                    let ports = if let ConnectorVariant::UserDefined(connector) = &new_connector.connector {
 
                        &connector.ports.owned_ports
 
                    } else {
 
                        unreachable!();
 
                    };
 

	
 
                    for port_id in ports {
 
                    for port_id in initial_ports {
 
                        // Transfer messages associated with the transferred port
 
                        let mut message_idx = 0;
 
                        while message_idx < scheduled.ctx_fancy.inbox_messages.len() {
 
@@ -311,7 +264,7 @@ impl Scheduler {
 
                        );
 

	
 
                        self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message));
 
                        self.runtime.send_message(port.peer_connector, reroute_message);
 
                        self.runtime.send_message(port.peer_connector, MessageFancy::Control(reroute_message));
 
                    }
 

	
 
                    // Schedule new connector to run
 
@@ -386,7 +339,7 @@ impl Scheduler {
 
// -----------------------------------------------------------------------------
 

	
 
enum ComponentStateChange {
 
    CreatedComponent(ConnectorPDL),
 
    CreatedComponent(ConnectorPDL, Vec<PortIdLocal>),
 
    CreatedPort(Port),
 
    ChangedPort(ComponentPortChange),
 
}
 
@@ -412,20 +365,14 @@ pub(crate) struct ComponentCtxFancy {
 
    is_in_sync: bool,
 
    changed_in_sync: bool,
 
    outbox: VecDeque<MessageFancy>,
 
    state_changes: VecDeque<ComponentStateChange>
 
    state_changes: VecDeque<ComponentStateChange>,
 
    // Workspaces that may be used by components to (generally) prevent
 
    // allocations. Be a good scout and leave it empty after you've used it.
 
    // TODO: Move to scheduler ctx, this is the wrong place
 
    pub workspace_ports: Vec<PortIdLocal>,
 
    pub workspace_branches: Vec<BranchId>,
 
}
 

	
 
pub(crate) enum ReceivedMessage {
 
    Data((PortIdLocal, DataMessage)),
 
    Sync(SyncMessage),
 
    RequestCommit(SolutionMessage),
 
    ConfirmCommit(SolutionMessage),
 
}
 

	
 
impl ComponentCtxFancy {
 
    pub(crate) fn new_empty() -> Self {
 
        return Self{
 
@@ -437,14 +384,16 @@ impl ComponentCtxFancy {
 
            changed_in_sync: false,
 
            outbox: VecDeque::new(),
 
            state_changes: VecDeque::new(),
 
            workspace_ports: Vec::new(),
 
            workspace_branches: Vec::new(),
 
        };
 
    }
 

	
 
    /// Notify the runtime that the component has created a new component. May
 
    /// only be called outside of a sync block.
 
    pub(crate) fn push_component(&mut self, component: ConnectorPDL) {
 
    pub(crate) fn push_component(&mut self, component: ConnectorPDL, initial_ports: Vec<PortIdLocal>) {
 
        debug_assert!(!self.is_in_sync);
 
        self.state_changes.push_back(ComponentStateChange::CreatedComponent(component));
 
        self.state_changes.push_back(ComponentStateChange::CreatedComponent(component, initial_ports));
 
    }
 

	
 
    /// Notify the runtime that the component has created a new port. May only
 
@@ -520,19 +469,21 @@ impl ComponentCtxFancy {
 
        if !self.is_in_sync { return None; }
 
        if self.inbox_len_read == self.inbox_messages.len() { return None; }
 

	
 
        // We want to keep data messages in the inbox, because we need to check
 
        // them in the future. We don't want to keep sync messages around, we
 
        // should only handle them once. Control messages should never be in
 
        // here.
 
        let message = &self.inbox_messages[self.inbox_len_read];
 
        if let MessageContents::Data(contents) = &message.contents {
 
            self.inbox_len_read += 1;
 
            return Some(ReceivedMessage::Data((message.receiving_port, contents.clone())));
 
        } else {
 
            // Must be a sync/solution message
 
            let message = self.inbox_messages.remove(self.inbox_len_read);
 
            return match message.contents {
 
                MessageContents::Sync(v) => Some(ReceivedMessage::Sync(v)),
 
                MessageContents::RequestCommit(v) => Some(ReceivedMessage::RequestCommit(v)),
 
                MessageContents::ConfirmCommit(v) => Some(ReceivedMessage::ConfirmCommit(v)),
 
                _ => unreachable!(), // because we only put data/synclike messages in the inbox
 
            }
 
        match message {
 
            MessageFancy::Data(content) => {
 
                self.inbox_len_read += 1;
 
                return Some(MessageFancy::Data(content.clone()));
 
            },
 
            MessageFancy::Sync(_) => {
 
                let message = self.inbox_messages.remove(self.inbox_len_read);
 
                return Some(message);
 
            },
 
            MessageFancy::Control(_) => unreachable!("control message ended up in component inbox"),
 
        }
 
    }
 
}
 
@@ -616,7 +567,7 @@ impl ControlMessageHandler {
 
    pub fn prepare_closing_channel(
 
        &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal,
 
        self_connector_id: ConnectorId
 
    ) -> Message {
 
    ) -> ControlMessageFancy {
 
        let id = self.take_id();
 

	
 
        self.active.push(ControlEntry{
 
@@ -627,13 +578,10 @@ impl ControlMessageHandler {
 
            }),
 
        });
 

	
 
        return Message{
 
            sending_connector: self_connector_id,
 
            receiving_port: peer_port_id,
 
            contents: MessageContents::Control(ControlMessage{
 
                id,
 
                content: ControlMessageVariant::CloseChannel(peer_port_id),
 
            }),
 
        return ControlMessageFancy{
 
            id,
 
            sending_component_id: self_connector_id,
 
            content: ControlContent::CloseChannel(peer_port_id),
 
        };
 
    }
 

	
 
@@ -645,7 +593,7 @@ impl ControlMessageHandler {
 
        port_id: PortIdLocal, peer_port_id: PortIdLocal,
 
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
 
        new_owner_connector_id: ConnectorId
 
    ) -> Message {
 
    ) -> ControlMessageFancy {
 
        let id = self.take_id();
 

	
 
        self.active.push(ControlEntry{
 
@@ -657,13 +605,10 @@ impl ControlMessageHandler {
 
            }),
 
        });
 

	
 
        return Message{
 
            sending_connector: self_connector_id,
 
            receiving_port: peer_port_id,
 
            contents: MessageContents::Control(ControlMessage{
 
                id,
 
                content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id),
 
            })
 
        return ControlMessageFancy{
 
            id,
 
            sending_component_id: self_connector_id,
 
            content: ControlContent::PortPeerChanged(peer_port_id, new_owner_connector_id),
 
        };
 
    }
 

	
0 comments (0 inline, 0 general)