Files
@ a99ae23c30ec
Branch filter:
Location: CSY/reowolf/src/runtime2/inbox.rs - annotation
a99ae23c30ec
5.1 KiB
application/rls-services+xml
WIP on consensus error handling
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 | a43d61913724 68411f4b8014 cf26538b25dc 8c5d438b0fa3 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 a43d61913724 8c5d438b0fa3 68411f4b8014 cf26538b25dc 68411f4b8014 68411f4b8014 68411f4b8014 088be7630245 68411f4b8014 58dfabd1be9f 58dfabd1be9f 088be7630245 088be7630245 088be7630245 088be7630245 088be7630245 088be7630245 088be7630245 088be7630245 088be7630245 088be7630245 088be7630245 088be7630245 088be7630245 088be7630245 088be7630245 088be7630245 088be7630245 088be7630245 088be7630245 088be7630245 088be7630245 088be7630245 68411f4b8014 1755ca411ca7 68411f4b8014 68411f4b8014 68411f4b8014 7662b8fb871d 58dfabd1be9f 58dfabd1be9f 68411f4b8014 1755ca411ca7 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 088be7630245 58dfabd1be9f 58dfabd1be9f 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 1755ca411ca7 68411f4b8014 68411f4b8014 68411f4b8014 c97c5d60bc61 c97c5d60bc61 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 c97c5d60bc61 c97c5d60bc61 68411f4b8014 c97c5d60bc61 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 c97c5d60bc61 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 58dfabd1be9f 58dfabd1be9f 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 8a530d2dc72f 8a530d2dc72f 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 daf15df0f8ca 68411f4b8014 68411f4b8014 daf15df0f8ca daf15df0f8ca 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 daf15df0f8ca daf15df0f8ca 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 b4ac681e0e7f b4ac681e0e7f a99ae23c30ec a99ae23c30ec a99ae23c30ec a99ae23c30ec a99ae23c30ec a99ae23c30ec a99ae23c30ec a99ae23c30ec a99ae23c30ec a99ae23c30ec a99ae23c30ec a99ae23c30ec a99ae23c30ec 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 68411f4b8014 cf26538b25dc | use std::sync::Mutex;
use std::collections::VecDeque;
use crate::protocol::eval::ValueGroup;
use super::ConnectorId;
use super::branch::BranchId;
use super::consensus::{GlobalSolution, LocalSolution};
use super::port::PortIdLocal;
// TODO: Remove Debug derive from all types
#[derive(Debug, Copy, Clone)]
pub(crate) struct PortAnnotation {
pub port_id: PortIdLocal,
pub registered_id: Option<BranchMarker>,
pub expected_firing: Option<bool>,
}
/// Marker for a branch in a port mapping. A marker is, like a branch ID, a
/// unique identifier for a branch, but differs in that a branch only has one
/// branch ID, but might have multiple associated markers (i.e. one branch
/// performing a `put` three times will generate three markers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct BranchMarker{
marker: u32,
}
impl BranchMarker {
#[inline]
pub(crate) fn new(marker: u32) -> Self {
debug_assert!(marker != 0);
return Self{ marker };
}
#[inline]
pub(crate) fn new_invalid() -> Self {
return Self{ marker: 0 }
}
}
/// The header added by the synchronization algorithm to all.
#[derive(Debug, Clone)]
pub(crate) struct SyncHeader {
pub sending_component_id: ConnectorId,
pub highest_component_id: ConnectorId,
pub sync_round: u32,
}
/// The header added to data messages
#[derive(Debug, Clone)]
pub(crate) struct DataHeader {
pub expected_mapping: Vec<PortAnnotation>,
pub sending_port: PortIdLocal,
pub target_port: PortIdLocal,
pub new_mapping: BranchMarker,
}
// TODO: Very much on the fence about this. On one hand I thought making it a
// data message was neat because "silent port notification" should be rerouted
// like any other data message to determine the component ID of the receiver
// and to make it part of the leader election algorithm for the sync leader.
// However: it complicates logic quite a bit. Really it might be easier to
// create `Message::SyncAtComponent` and `Message::SyncAtPort` messages...
#[derive(Debug, Clone)]
pub(crate) enum DataContent {
SilentPortNotification,
Message(ValueGroup),
}
impl DataContent {
pub(crate) fn as_message(&self) -> Option<&ValueGroup> {
match self {
DataContent::SilentPortNotification => None,
DataContent::Message(message) => Some(message),
}
}
}
/// A data message is a message that is intended for the receiver's PDL code,
/// but will also be handled by the consensus algorithm
#[derive(Debug, Clone)]
pub(crate) struct DataMessage {
pub sync_header: SyncHeader,
pub data_header: DataHeader,
pub content: DataContent,
}
#[derive(Debug)]
pub(crate) enum SyncContent {
LocalSolution(LocalSolution), // sending a local solution to the leader
GlobalSolution(GlobalSolution), // broadcasting to everyone
Notification, // just a notification (so purpose of message is to send the SyncHeader)
}
/// A sync message is a message that is intended only for the consensus
/// algorithm.
#[derive(Debug)]
pub(crate) struct SyncMessage {
pub sync_header: SyncHeader,
pub target_component_id: ConnectorId,
pub content: SyncContent,
}
/// A control message is a message intended for the scheduler that is executing
/// a component.
#[derive(Debug)]
pub(crate) struct ControlMessage {
pub id: u32, // generic identifier, used to match request to response
pub sending_component_id: ConnectorId,
pub content: ControlContent,
}
#[derive(Debug)]
pub(crate) enum ControlContent {
PortPeerChanged(PortIdLocal, ConnectorId),
CloseChannel(PortIdLocal),
Ack,
Ping,
}
/// Combination of data message and control messages.
#[derive(Debug)]
pub(crate) enum Message {
Data(DataMessage),
Sync(SyncMessage),
Control(ControlMessage),
}
impl Message {
/// If the message is sent through a particular channel, then this function
/// returns the port through which the message was sent.
pub(crate) fn source_port(&self) -> Option<PortIdLocal> {
// Currently only data messages have a source port
if let Message::Data(message) = self {
return Some(message.data_header.sending_port);
} else {
return None;
}
}
}
/// The public inbox of a connector. The thread running the connector that owns
/// this inbox may retrieved from it. Non-owning threads may only put new
/// messages inside of it.
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
// Should behave as a MPSC queue.
pub struct PublicInbox {
messages: Mutex<VecDeque<Message>>,
}
impl PublicInbox {
pub fn new() -> Self {
Self{
messages: Mutex::new(VecDeque::new()),
}
}
pub(crate) fn insert_message(&self, message: Message) {
let mut lock = self.messages.lock().unwrap();
lock.push_back(message);
}
pub(crate) fn take_message(&self) -> Option<Message> {
let mut lock = self.messages.lock().unwrap();
return lock.pop_front();
}
pub fn is_empty(&self) -> bool {
let lock = self.messages.lock().unwrap();
return lock.is_empty();
}
}
|