Files
@ edb4c4be7e45
Branch filter:
Location: CSY/reowolf/src/runtime2/inbox2.rs
edb4c4be7e45
3.3 KiB
application/rls-services+xml
fixing compilation errors
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 | use std::sync::Mutex;
use std::collections::VecDeque;
use crate::protocol::eval::ValueGroup;
use crate::runtime2::branch::BranchId;
use crate::runtime2::ConnectorId;
use crate::runtime2::consensus::{GlobalSolution, LocalSolution};
use crate::runtime2::port::PortIdLocal;
// TODO: Remove Debug derive from all types
/// Annotation describing a single local port. A list of these is carried in
/// `DataHeader::expected_mapping`.
#[derive(Debug, Copy, Clone)]
pub(crate) struct PortAnnotation {
/// The local port this annotation refers to.
pub port_id: PortIdLocal,
/// Branch registered for this port, if any.
// NOTE(review): presumably `None` means no branch has been associated with
// the port yet -- confirm against the consensus code.
pub registered_id: Option<BranchId>,
/// Whether the port is expected to fire.
// NOTE(review): presumably `None` means "not yet known" -- confirm.
pub expected_firing: Option<bool>,
}
/// The header added by the synchronization algorithm to all data and sync
/// messages (see `DataMessageFancy` and `SyncMessageFancy`).
#[derive(Debug, Clone)]
pub(crate) struct SyncHeader {
/// ID of the component that sent the message carrying this header.
pub sending_component_id: ConnectorId,
/// A component ID tracked by the synchronization algorithm.
// NOTE(review): presumably the highest component ID the sender knows of in
// the current sync round (e.g. for picking a leader) -- confirm.
pub highest_component_id: ConnectorId,
}
/// The header added to data messages (see `DataMessageFancy`).
#[derive(Debug, Clone)]
pub(crate) struct DataHeader {
/// Per-port annotations sent along with the message.
// NOTE(review): presumably the port mapping the sender assumed when it sent
// the message -- confirm against the consensus code.
pub expected_mapping: Vec<PortAnnotation>,
/// Local ID of the port the message was sent from.
pub sending_port: PortIdLocal,
/// Local ID of the port the message is addressed to.
pub target_port: PortIdLocal,
/// Branch ID associated with this message.
// NOTE(review): presumably the sender's branch that becomes the new mapping
// for `sending_port` -- confirm.
pub new_mapping: BranchId,
}
/// A data message is a message that is intended for the receiver's PDL code,
/// but will also be handled by the consensus algorithm.
#[derive(Debug, Clone)]
pub(crate) struct DataMessageFancy {
/// Header added by the synchronization algorithm.
pub sync_header: SyncHeader,
/// Header specific to data messages (ports and branch annotations).
pub data_header: DataHeader,
/// The values being transferred.
pub content: ValueGroup,
}
/// Payload of a `SyncMessageFancy`.
#[derive(Debug)]
pub(crate) enum SyncContent {
LocalSolution(LocalSolution), // sending a local solution to the leader
GlobalSolution(GlobalSolution), // broadcasting to everyone
Notification, // just a notification (the sole purpose of the message is to deliver the SyncHeader)
}
/// A sync message is a message that is intended only for the consensus
/// algorithm.
#[derive(Debug)]
pub(crate) struct SyncMessageFancy {
/// Header added by the synchronization algorithm.
pub sync_header: SyncHeader,
/// ID of the component this message is addressed to.
pub target_component_id: ConnectorId,
/// What the message carries (local solution, global solution, or nothing).
pub content: SyncContent,
}
/// A control message is a message intended for the scheduler that is executing
/// a component.
#[derive(Debug)]
pub(crate) struct ControlMessageFancy {
pub id: u32, // generic identifier, used to match request to response
/// ID of the component that sent this control message.
pub sending_component_id: ConnectorId,
/// The specific control request or response.
pub content: ControlContent,
}
/// Payload of a `ControlMessageFancy`.
#[derive(Debug)]
pub(crate) enum ControlContent {
// NOTE(review): presumably (port whose peer changed, ID of the new peer
// component) -- confirm against the scheduler.
PortPeerChanged(PortIdLocal, ConnectorId),
// NOTE(review): presumably requests closing the channel that the given port
// belongs to -- confirm.
CloseChannel(PortIdLocal),
// NOTE(review): presumably the response matched to an earlier request via
// `ControlMessageFancy::id` -- confirm.
Ack,
// NOTE(review): semantics not visible in this file -- see the scheduler.
Ping,
}
/// Combination of data, sync and control messages. This is the element type
/// stored in a `PublicInbox`.
#[derive(Debug)]
pub(crate) enum MessageFancy {
/// Message for the receiver's PDL code (also seen by the consensus algorithm).
Data(DataMessageFancy),
/// Message intended only for the consensus algorithm.
Sync(SyncMessageFancy),
/// Message intended for the scheduler executing the component.
Control(ControlMessageFancy),
}
/// The public inbox of a connector. The thread running the connector that owns
/// this inbox may retrieve messages from it. Non-owning threads may only put
/// new messages inside of it.
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
// Should behave as a MPSC queue.
pub struct PublicInbox {
/// Queued messages behind a mutex: new messages are pushed at the back and
/// taken from the front.
messages: Mutex<VecDeque<MessageFancy>>,
}
impl PublicInbox {
    /// Creates an inbox that holds no messages.
    pub fn new() -> Self {
        let messages = Mutex::new(VecDeque::new());
        Self { messages }
    }

    /// Appends `message` to the back of the queue.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn insert_message(&self, message: MessageFancy) {
        self.messages.lock().unwrap().push_back(message);
    }

    /// Removes and returns the oldest queued message, or `None` if the queue
    /// is currently empty.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn take_message(&self) -> Option<MessageFancy> {
        let mut queue = self.messages.lock().unwrap();
        queue.pop_front()
    }

    /// Returns `true` if no messages are queued at the moment of the call.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn is_empty(&self) -> bool {
        let queue = self.messages.lock().unwrap();
        queue.is_empty()
    }
}
|