Changeset - b4a8c628800d
[Not reviewed]
0 5 0
MH - 4 years ago 2021-11-19 19:59:49
contact@maxhenger.nl
Refactored data messages
5 files changed with 30 insertions and 60 deletions:
0 comments (0 inline, 0 general)
src/runtime2/connector.rs
Show inline comments
 
@@ -33,13 +33,10 @@ use crate::{PortId, ProtocolDescription};
 
use crate::common::ComponentState;
 
use crate::protocol::eval::{EvalContinuation, EvalError, Prompt, Value, ValueGroup};
 
use crate::protocol::{RunContext, RunResult};
 
use crate::runtime2::branch::PreparedStatement;
 
use crate::runtime2::consensus::RoundConclusion;
 
use crate::runtime2::inbox::SyncPortMessage;
 

	
 
use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState};
 
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
 
use super::inbox::{DataMessage, DataContent, Message, SyncCompMessage, PublicInbox};
 
use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState, PreparedStatement};
 
use super::consensus::{Consensus, Consistency, RoundConclusion, find_ports_in_value_group};
 
use super::inbox::{DataMessage, Message, SyncCompMessage, SyncPortMessage, PublicInbox};
 
use super::native::Connector;
 
use super::port::{PortKind, PortIdLocal};
 
use super::scheduler::{ComponentCtx, SchedulerCtx};
 
@@ -228,7 +225,7 @@ impl ConnectorPDL {
 

	
 
            debug_assert!(receiving_branch.awaiting_port == message.data_header.target_port);
 
            receiving_branch.awaiting_port = PortIdLocal::new_invalid();
 
            receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone());
 
            receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx);
 

	
 
            // And prepare the branch for running
 
@@ -326,7 +323,7 @@ impl ConnectorPDL {
 
                        let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                        let branch = &mut self.tree[receiving_branch_id];
 
                        branch.awaiting_port = PortIdLocal::new_invalid();
 
                        branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone());
 
                        branch.prepared = PreparedStatement::PerformedGet(message.content.clone());
 

	
 
                        self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                        self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx);
 
@@ -370,7 +367,7 @@ impl ConnectorPDL {
 
                let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                if let Err(_) = comp_ctx.submit_message(Message::Data(DataMessage {
 
                    sync_header, data_header,
 
                    content: DataContent::Message(content),
 
                    content,
 
                })) {
 
                    // We don't own the port
 
                    let pd = &sched_ctx.runtime.protocol_description;
 
@@ -500,7 +497,6 @@ impl ConnectorPDL {
 
            return ConnectorScheduling::Immediate;
 
        } else {
 
            // No final branch, because we're supposed to exit!
 
            panic!("TEMPTEMP: NOOOOOOOOO 1");
 
            self.last_finished_handled = None;
 
            self.mode = Mode::Error;
 
            return ConnectorScheduling::Exit;
src/runtime2/consensus.rs
Show inline comments
 
use crate::collections::VecSet;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::inbox::BranchMarker;
 
use crate::runtime2::scheduler::ComponentPortChange;
 

	
 
use super::ConnectorId;
 
use super::branch::BranchId;
 
use super::port::{ChannelId, PortIdLocal};
 
use super::inbox::{
 
    Message, ChannelAnnotation,
 
    DataMessage, DataContent, DataHeader,
 
    Message, ChannelAnnotation, BranchMarker, DataMessage, DataHeader,
 
    SyncCompMessage, SyncCompContent, SyncPortMessage, SyncPortContent, SyncHeader,
 
};
 
use super::scheduler::ComponentCtx;
 
use super::scheduler::{ComponentCtx, ComponentPortChange};
 

	
 
struct BranchAnnotation {
 
    channel_mapping: Vec<ChannelAnnotation>,
 
@@ -265,19 +262,15 @@ impl Consensus {
 
            let channel_id = port_desc.channel_id;
 

	
 
            if !self.encountered_ports.contains(&self_port_id) {
 
                ctx.submit_message(Message::Data(DataMessage {
 
                ctx.submit_message(Message::SyncPort(SyncPortMessage {
 
                    sync_header: SyncHeader{
 
                        sending_component_id: ctx.id,
 
                        highest_component_id: self.highest_connector_id,
 
                        sync_round: self.sync_round
 
                    },
 
                    data_header: DataHeader{
 
                        expected_mapping: source_mapping.clone(),
 
                        sending_port: self_port_id,
 
                        target_port: peer_port_id,
 
                        new_mapping: BranchMarker::new_invalid(),
 
                    },
 
                    content: DataContent::SilentPortNotification,
 
                    source_port: self_port_id,
 
                    target_port: peer_port_id,
 
                    content: SyncPortContent::SilentPortNotification,
 
                }));
 
                self.encountered_ports.push(self_port_id);
 
            }
 
@@ -439,6 +432,11 @@ impl Consensus {
 
        debug_assert!(self.is_in_sync());
 
        debug_assert!(ctx.get_port_by_id(message.target_port).is_some());
 
        match message.content {
 
            SyncPortContent::SilentPortNotification => {
 
                // The point here is to let us become part of the sync round and
 
                // take note of the leader in case all of our ports are silent.
 
                self.encountered_ports.push(message.target_port);
 
            }
 
            SyncPortContent::NotificationWave => {
 
                // Wave to discover everyone in the network, handling sync
 
                // header takes care of leader discovery, here we need to make
 
@@ -483,7 +481,7 @@ impl Consensus {
 

	
 
                // Check for sent ports
 
                debug_assert!(self.workspace_ports.is_empty());
 
                find_ports_in_value_group(message.content.as_message().unwrap(), &mut self.workspace_ports);
 
                find_ports_in_value_group(&message.content, &mut self.workspace_ports);
 
                if !self.workspace_ports.is_empty() {
 
                    todo!("handle received ports");
 
                    self.workspace_ports.clear();
 
@@ -507,11 +505,6 @@ impl Consensus {
 
            }
 
        }
 

	
 
        if let DataContent::SilentPortNotification = message.content {
 
            // No port can receive a "silent" notification.
 
            return false;
 
        }
 

	
 
        let annotation = &self.branch_annotations[branch_id.index as usize];
 
        for expected in &message.data_header.expected_mapping {
 
            // If we own the port, then we have an entry in the
src/runtime2/inbox.rs
Show inline comments
 
@@ -58,34 +58,13 @@ pub(crate) struct DataHeader {
 
    pub new_mapping: BranchMarker,
 
}
 

	
 
// TODO: Very much on the fence about this. On one hand I thought making it a
 
//  data message was neat because "silent port notification" should be rerouted
 
//  like any other data message to determine the component ID of the receiver
 
//  and to make it part of the leader election algorithm for the sync leader.
 
//  However: it complicates logic quite a bit. Really it might be easier to
 
//  create `Message::SyncAtComponent` and `Message::SyncAtPort` messages...
 
#[derive(Debug, Clone)]
 
pub(crate) enum DataContent {
 
    SilentPortNotification,
 
    Message(ValueGroup),
 
}
 

	
 
impl DataContent {
 
    pub(crate) fn as_message(&self) -> Option<&ValueGroup> {
 
        match self {
 
            DataContent::SilentPortNotification => None,
 
            DataContent::Message(message) => Some(message),
 
        }
 
    }
 
}
 

	
 
/// A data message is a message that is intended for the receiver's PDL code,
 
/// but will also be handled by the consensus algorithm
 
#[derive(Debug, Clone)]
 
pub(crate) struct DataMessage {
 
    pub sync_header: SyncHeader,
 
    pub data_header: DataHeader,
 
    pub content: DataContent,
 
    pub content: ValueGroup,
 
}
 

	
 
#[derive(Debug)]
 
@@ -111,6 +90,7 @@ pub(crate) struct SyncCompMessage {
 

	
 
#[derive(Debug)]
 
pub(crate) enum SyncPortContent {
 
    SilentPortNotification,
 
    NotificationWave,
 
}
 

	
 
@@ -153,10 +133,11 @@ impl Message {
 
    /// returns the port through which the message was sent.
 
    pub(crate) fn source_port(&self) -> Option<PortIdLocal> {
 
        // Currently only data messages have a source port
 
        if let Message::Data(message) = self {
 
            return Some(message.data_header.sending_port);
 
        } else {
 
            return None;
 
        match self {
 
            Message::Data(message) => return Some(message.data_header.sending_port),
 
            Message::SyncPort(message) => return Some(message.source_port),
 
            Message::SyncComp(_) => return None,
 
            Message::Control(_) => return None,
 
        }
 
    }
 
}
src/runtime2/native.rs
Show inline comments
 
@@ -14,7 +14,7 @@ use super::port::{Port, PortIdLocal, Channel, PortKind};
 
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
 
use super::connector::{ConnectorScheduling, ConnectorPDL};
 
use super::inbox::{
 
    Message, DataContent, DataMessage,
 
    Message, DataMessage,
 
    SyncCompMessage, SyncPortMessage,
 
    ControlContent, ControlMessage
 
};
 
@@ -144,7 +144,7 @@ impl ConnectorApplication {
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone());
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx);
 

	
 
            // And prepare the branch for running
 
@@ -199,7 +199,7 @@ impl ConnectorApplication {
 
                    let message = Message::Data(DataMessage {
 
                        sync_header,
 
                        data_header,
 
                        content: DataContent::Message(content.clone()),
 
                        content: content.clone(),
 
                    });
 
                    comp_ctx.submit_message(message);
 
                    self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
@@ -222,7 +222,7 @@ impl ConnectorApplication {
 
                            debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len());
 
                            self.branch_extra.push(instruction_idx + 1);
 

	
 
                            branch.insert_message(port_id, message.content.as_message().unwrap().clone());
 
                            branch.insert_message(port_id, message.content.clone());
 

	
 
                            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                            self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx);
src/runtime2/scheduler.rs
Show inline comments
 
@@ -72,7 +72,7 @@ impl Scheduler {
 
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 
                    let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx);
 
                    self.debug_conn(connector_id, &format!("Finished running (new scheduling is {:?})", new_schedule));
 

	
 
                    
 
                    // Handle all of the output from the current run: messages to
 
                    // send and connectors to instantiate.
 
                    self.handle_changes_in_context(scheduled);
0 comments (0 inline, 0 general)