From b4a8c628800d12c57620644fb88151760a46e59a 2021-11-19 19:59:49 From: MH Date: 2021-11-19 19:59:49 Subject: [PATCH] Refactored data messages --- diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 0407db8aa68a774673bd42935987886773a78196..38250412efb892714887391c708e79c51977df4c 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -33,13 +33,10 @@ use crate::{PortId, ProtocolDescription}; use crate::common::ComponentState; use crate::protocol::eval::{EvalContinuation, EvalError, Prompt, Value, ValueGroup}; use crate::protocol::{RunContext, RunResult}; -use crate::runtime2::branch::PreparedStatement; -use crate::runtime2::consensus::RoundConclusion; -use crate::runtime2::inbox::SyncPortMessage; -use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState}; -use super::consensus::{Consensus, Consistency, find_ports_in_value_group}; -use super::inbox::{DataMessage, DataContent, Message, SyncCompMessage, PublicInbox}; +use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState, PreparedStatement}; +use super::consensus::{Consensus, Consistency, RoundConclusion, find_ports_in_value_group}; +use super::inbox::{DataMessage, Message, SyncCompMessage, SyncPortMessage, PublicInbox}; use super::native::Connector; use super::port::{PortKind, PortIdLocal}; use super::scheduler::{ComponentCtx, SchedulerCtx}; @@ -228,7 +225,7 @@ impl ConnectorPDL { debug_assert!(receiving_branch.awaiting_port == message.data_header.target_port); receiving_branch.awaiting_port = PortIdLocal::new_invalid(); - receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone()); + receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.clone()); self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx); // And prepare the branch for running @@ -326,7 +323,7 @@ impl ConnectorPDL { let receiving_branch_id = self.tree.fork_branch(branch_id); let branch = &mut self.tree[receiving_branch_id]; branch.awaiting_port = PortIdLocal::new_invalid(); - branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone()); + branch.prepared = PreparedStatement::PerformedGet(message.content.clone()); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx); @@ -370,7 +367,7 @@ impl ConnectorPDL { let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); if let Err(_) = comp_ctx.submit_message(Message::Data(DataMessage { sync_header, data_header, - content: DataContent::Message(content), + content, })) { // We don't own the port let pd = &sched_ctx.runtime.protocol_description; @@ -500,7 +497,6 @@ impl ConnectorPDL { return ConnectorScheduling::Immediate; } else { // No final branch, because we're supposed to exit! - panic!("TEMPTEMP: NOOOOOOOOO 1"); self.last_finished_handled = None; self.mode = Mode::Error; return ConnectorScheduling::Exit; diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 828dbe3f6c374a16fc5bbb171365482950c1b05b..bd8510e2c1bd789afe73cebd37e385d9acf2b32a 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,18 +1,15 @@ use crate::collections::VecSet; use crate::protocol::eval::ValueGroup; -use crate::runtime2::inbox::BranchMarker; -use crate::runtime2::scheduler::ComponentPortChange; use super::ConnectorId; use super::branch::BranchId; use super::port::{ChannelId, PortIdLocal}; use super::inbox::{ - Message, ChannelAnnotation, - DataMessage, DataContent, DataHeader, + Message, ChannelAnnotation, BranchMarker, DataMessage, DataHeader, SyncCompMessage, SyncCompContent, SyncPortMessage, SyncPortContent, SyncHeader, }; -use super::scheduler::ComponentCtx; +use super::scheduler::{ComponentCtx, ComponentPortChange}; struct BranchAnnotation { channel_mapping: Vec, @@ -265,19 +262,15 @@ impl Consensus { let channel_id = port_desc.channel_id; if !self.encountered_ports.contains(&self_port_id) { - ctx.submit_message(Message::Data(DataMessage { + ctx.submit_message(Message::SyncPort(SyncPortMessage { sync_header: SyncHeader{ sending_component_id: ctx.id, highest_component_id: self.highest_connector_id, sync_round: self.sync_round }, - data_header: DataHeader{ - expected_mapping: source_mapping.clone(), - sending_port: self_port_id, - target_port: peer_port_id, - new_mapping: BranchMarker::new_invalid(), - }, - content: DataContent::SilentPortNotification, + source_port: self_port_id, + target_port: peer_port_id, + content: SyncPortContent::SilentPortNotification, })); self.encountered_ports.push(self_port_id); } @@ -439,6 +432,11 @@ impl Consensus { debug_assert!(self.is_in_sync()); debug_assert!(ctx.get_port_by_id(message.target_port).is_some()); match message.content { + SyncPortContent::SilentPortNotification => { + // The point here is to let us become part of the sync round and + // take note of the leader in case all of our ports are silent. + self.encountered_ports.push(message.target_port); + } SyncPortContent::NotificationWave => { // Wave to discover everyone in the network, handling sync // header takes care of leader discovery, here we need to make @@ -483,7 +481,7 @@ impl Consensus { // Check for sent ports debug_assert!(self.workspace_ports.is_empty()); - find_ports_in_value_group(message.content.as_message().unwrap(), &mut self.workspace_ports); + find_ports_in_value_group(&message.content, &mut self.workspace_ports); if !self.workspace_ports.is_empty() { todo!("handle received ports"); self.workspace_ports.clear(); @@ -507,11 +505,6 @@ impl Consensus { } } - if let DataContent::SilentPortNotification = message.content { - // No port can receive a "silent" notification. - return false; - } - let annotation = &self.branch_annotations[branch_id.index as usize]; for expected in &message.data_header.expected_mapping { // If we own the port, then we have an entry in the diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index c489561f96788af0e87b555f3c16628822032599..f5bc869b78ce3cc081b673b240a2d3fb2847d24f 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -58,34 +58,13 @@ pub(crate) struct DataHeader { pub new_mapping: BranchMarker, } -// TODO: Very much on the fence about this. On one hand I thought making it a -// data message was neat because "silent port notification" should be rerouted -// like any other data message to determine the component ID of the receiver -// and to make it part of the leader election algorithm for the sync leader. -// However: it complicates logic quite a bit. Really it might be easier to -// create `Message::SyncAtComponent` and `Message::SyncAtPort` messages... -#[derive(Debug, Clone)] -pub(crate) enum DataContent { - SilentPortNotification, - Message(ValueGroup), -} - -impl DataContent { - pub(crate) fn as_message(&self) -> Option<&ValueGroup> { - match self { - DataContent::SilentPortNotification => None, - DataContent::Message(message) => Some(message), - } - } -} - /// A data message is a message that is intended for the receiver's PDL code, /// but will also be handled by the consensus algorithm #[derive(Debug, Clone)] pub(crate) struct DataMessage { pub sync_header: SyncHeader, pub data_header: DataHeader, - pub content: DataContent, + pub content: ValueGroup, } #[derive(Debug)] @@ -111,6 +90,7 @@ pub(crate) struct SyncCompMessage { #[derive(Debug)] pub(crate) enum SyncPortContent { + SilentPortNotification, NotificationWave, } @@ -153,10 +133,11 @@ impl Message { /// returns the port through which the message was sent. pub(crate) fn source_port(&self) -> Option { // Currently only data messages have a source port - if let Message::Data(message) = self { - return Some(message.data_header.sending_port); - } else { - return None; + match self { + Message::Data(message) => return Some(message.data_header.sending_port), + Message::SyncPort(message) => return Some(message.source_port), + Message::SyncComp(_) => return None, + Message::Control(_) => return None, } } } diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 92670a262990d897136c2a1317979d3d1848f258..0b8891c0696a36688f310327125984d11753f596 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -14,7 +14,7 @@ use super::port::{Port, PortIdLocal, Channel, PortKind}; use super::consensus::{Consensus, Consistency, find_ports_in_value_group}; use super::connector::{ConnectorScheduling, ConnectorPDL}; use super::inbox::{ - Message, DataContent, DataMessage, + Message, DataMessage, SyncCompMessage, SyncPortMessage, ControlContent, ControlMessage }; @@ -144,7 +144,7 @@ impl ConnectorApplication { self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; - receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); + receiving_branch.insert_message(message.data_header.target_port, message.content.clone()); self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx); // And prepare the branch for running @@ -199,7 +199,7 @@ impl ConnectorApplication { let message = Message::Data(DataMessage { sync_header, data_header, - content: DataContent::Message(content.clone()), + content: content.clone(), }); comp_ctx.submit_message(message); self.tree.push_into_queue(QueueKind::Runnable, branch_id); @@ -222,7 +222,7 @@ impl ConnectorApplication { debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len()); self.branch_extra.push(instruction_idx + 1); - branch.insert_message(port_id, message.content.as_message().unwrap().clone()); + branch.insert_message(port_id, message.content.clone()); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx); diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 9cb007655638c88dc662e2d132a2a396909659d7..cf8769e28a413656d2b1da5dbee0b9424f700e96 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -72,7 +72,7 @@ impl Scheduler { let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime }; let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx); self.debug_conn(connector_id, &format!("Finished running (new scheduling is {:?})", new_schedule)); - + // Handle all of the output from the current run: messages to // send and connectors to instantiate. self.handle_changes_in_context(scheduled);