Files
@ 328f04b6612f
Branch filter:
Location: CSY/reowolf/src/runtime2/inbox2.rs - annotation
328f04b6612f
3.3 KiB
application/rls-services+xml
Initial pass of fixing compiler errors
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 | edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 ce98be9707a6 ce98be9707a6 ce98be9707a6 ecc47971d535 ce98be9707a6 ce98be9707a6 f4d1c8c04de6 f4d1c8c04de6 f4d1c8c04de6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 f4d1c8c04de6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 f4d1c8c04de6 ce98be9707a6 ce98be9707a6 f4d1c8c04de6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 d98661a2215e f4d1c8c04de6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 f4d1c8c04de6 ce98be9707a6 649f3bb14317 ecc47971d535 ecc47971d535 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 f4d1c8c04de6 ce98be9707a6 ce98be9707a6 d98661a2215e ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 f4d1c8c04de6 ce98be9707a6 ce98be9707a6 f4d1c8c04de6 ce98be9707a6 ce98be9707a6 ce98be9707a6 f4d1c8c04de6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 f4d1c8c04de6 ce98be9707a6 ce98be9707a6 ce98be9707a6 ce98be9707a6 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 328f04b6612f edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 328f04b6612f edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 edb4c4be7e45 ce98be9707a6 | use std::sync::Mutex;
use std::collections::VecDeque;
use crate::protocol::eval::ValueGroup;
use crate::runtime2::branch::BranchId;
use crate::runtime2::ConnectorId;
use crate::runtime2::consensus::{GlobalSolution, LocalSolution};
use crate::runtime2::port::PortIdLocal;
// TODO: Remove Debug derive from all types
/// Annotation attached to a single port. A list of these makes up the
/// `expected_mapping` carried in a `DataHeader`.
#[derive(Debug, Copy, Clone)]
pub(crate) struct PortAnnotation {
/// The port this annotation refers to.
pub port_id: PortIdLocal,
// NOTE(review): presumably the branch that is registered for this port, with
// `None` meaning that no branch has been registered yet — confirm against the
// consensus code.
pub registered_id: Option<BranchId>,
// NOTE(review): presumably whether the port is expected to fire, with `None`
// meaning "not yet decided" — confirm against the consensus code.
pub expected_firing: Option<bool>,
}
/// The header added by the synchronization algorithm to all data and sync
/// messages (it is embedded in both `DataMessageFancy` and
/// `SyncMessageFancy`).
#[derive(Debug, Clone)]
pub(crate) struct SyncHeader {
/// Identifier of the component that sent the message.
pub sending_component_id: ConnectorId,
// NOTE(review): presumably the highest component ID the sender knows about
// (e.g. used to determine the leader of the sync round) — confirm against the
// consensus code.
pub highest_component_id: ConnectorId,
}
/// The header added to data messages. It travels next to the `SyncHeader`
/// inside a `DataMessageFancy`.
#[derive(Debug, Clone)]
pub(crate) struct DataHeader {
// NOTE(review): presumably the port mapping the sender assumed when it sent
// this message, one annotation per port — confirm against the consensus code.
pub expected_mapping: Vec<PortAnnotation>,
/// Port through which the message was sent.
pub sending_port: PortIdLocal,
/// Port for which the message is intended.
pub target_port: PortIdLocal,
// NOTE(review): presumably the branch that the sending port is mapped to as a
// result of this message — confirm against the consensus code.
pub new_mapping: BranchId,
}
/// A data message is a message that is intended for the receiver's PDL code,
/// but will also be handled by the consensus algorithm. It therefore carries
/// both a `SyncHeader` and a `DataHeader` in addition to its payload.
#[derive(Debug, Clone)]
pub(crate) struct DataMessageFancy {
/// Header added by the synchronization algorithm.
pub sync_header: SyncHeader,
/// Header that is specific to data messages.
pub data_header: DataHeader,
/// The values that are being transferred.
pub content: ValueGroup,
}
/// The payload of a `SyncMessageFancy`: either a solution (local or global),
/// or nothing at all when only the accompanying `SyncHeader` matters.
#[derive(Debug)]
pub(crate) enum SyncContent {
LocalSolution(LocalSolution), // sending a local solution to the leader
GlobalSolution(GlobalSolution), // broadcasting to everyone
Notification, // just a notification (so purpose of message is to send the SyncHeader)
}
/// A sync message is a message that is intended only for the consensus
/// algorithm. Unlike a data message it is addressed to a component rather
/// than to a port.
#[derive(Debug)]
pub(crate) struct SyncMessageFancy {
/// Header added by the synchronization algorithm.
pub sync_header: SyncHeader,
/// Identifier of the component that should receive this message.
pub target_component_id: ConnectorId,
/// What is being communicated, see `SyncContent`.
pub content: SyncContent,
}
/// A control message is a message intended for the scheduler that is executing
/// a component. It does not carry a `SyncHeader`.
#[derive(Debug)]
pub(crate) struct ControlMessageFancy {
pub id: u32, // generic identifier, used to match request to response
/// Identifier of the component that sent the message.
pub sending_component_id: ConnectorId,
/// The kind of control message, see `ControlContent`.
pub content: ControlContent,
}
/// The different kinds of control messages that may be sent in a
/// `ControlMessageFancy`.
#[derive(Debug)]
pub(crate) enum ControlContent {
// NOTE(review): presumably (port whose peer changed, connector that now owns
// the peer port) — confirm against the scheduler code.
PortPeerChanged(PortIdLocal, ConnectorId),
// NOTE(review): presumably requests that the channel attached to the given
// port is closed — confirm against the scheduler code.
CloseChannel(PortIdLocal),
// NOTE(review): presumably the response to an earlier control message that
// carried the same `id` — confirm against the scheduler code.
Ack,
// NOTE(review): purpose not visible in this file — confirm against the
// scheduler code.
Ping,
}
/// Combination of data, sync and control messages. This is the type of
/// message that is stored in a `PublicInbox`.
#[derive(Debug)]
pub(crate) enum MessageFancy {
/// Message for the receiver's PDL code (also seen by the consensus algorithm).
Data(DataMessageFancy),
/// Message for the consensus algorithm only.
Sync(SyncMessageFancy),
/// Message for the scheduler executing the component.
Control(ControlMessageFancy),
}
/// The public inbox of a connector. The thread running the connector that owns
/// this inbox may retrieve messages from it. Non-owning threads may only put
/// new messages inside of it.
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
// Should behave as a MPSC queue.
pub struct PublicInbox {
// Queue of pending messages: new messages are pushed at the back and taken
// from the front. The mutex makes the queue usable through a shared reference.
messages: Mutex<VecDeque<MessageFancy>>,
}
impl PublicInbox {
    /// Creates an inbox that does not contain any messages.
    pub fn new() -> Self {
        Self {
            messages: Mutex::new(VecDeque::new()),
        }
    }

    /// Appends `message` to the back of the inbox.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned, i.e. if another thread
    /// panicked while it was holding the lock.
    pub(crate) fn insert_message(&self, message: MessageFancy) {
        self.messages.lock().unwrap().push_back(message);
    }

    /// Removes and returns the message at the front of the inbox (the one
    /// that was inserted first), or `None` if the inbox is empty.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(crate) fn take_message(&self) -> Option<MessageFancy> {
        self.messages.lock().unwrap().pop_front()
    }

    /// Returns `true` if the inbox held no messages at the moment the lock
    /// was taken. The lock is released before this function returns, so the
    /// answer may already be outdated if other threads insert concurrently.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn is_empty(&self) -> bool {
        self.messages.lock().unwrap().is_empty()
    }
}

/// `PublicInbox::default()` is equivalent to `PublicInbox::new()`: a type
/// with a public zero-argument constructor should also be constructible
/// through `Default`.
impl Default for PublicInbox {
    fn default() -> Self {
        Self::new()
    }
}
|