Files @ 15b9bb47abdc
Branch filter:

Location: CSY/reowolf/src/common.rs - annotation

15b9bb47abdc 6.9 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
mh
WIP on rewriting execution ctx to fix recv bug
06f259bf8031
4c9116f4253b
db17da820a3b
db17da820a3b
06f259bf8031
4976f1816f47
db17da820a3b
db17da820a3b
06f259bf8031
06f259bf8031
db17da820a3b
db17da820a3b
06f259bf8031
1bacc6467d19
06f259bf8031
db17da820a3b
db17da820a3b
44a98be4e4b4
06f259bf8031
06f259bf8031
06f259bf8031
06f259bf8031
db17da820a3b
06f259bf8031
a3c92705eeee
a3c92705eeee
a3c92705eeee
d76b1fe2648f
d76b1fe2648f
65390fb1cdbc
d76b1fe2648f
d76b1fe2648f
a3c92705eeee
03af3095927d
d76b1fe2648f
d76b1fe2648f
03af3095927d
d600dd299dde
8642f7a7bf01
65390fb1cdbc
a3c92705eeee
8642f7a7bf01
869d51fc1127
8642f7a7bf01
8642f7a7bf01
5c858bc28930
d76b1fe2648f
d76b1fe2648f
03af3095927d
d76b1fe2648f
d76b1fe2648f
d76b1fe2648f
03af3095927d
d600dd299dde
cc8030b35903
d76b1fe2648f
98aadfccbafd
98aadfccbafd
98aadfccbafd
98aadfccbafd
98aadfccbafd
98aadfccbafd
98aadfccbafd
98aadfccbafd
98aadfccbafd
98aadfccbafd
98aadfccbafd
d76b1fe2648f
9485a0862e90
3845071002b0
03af3095927d
d76b1fe2648f
d76b1fe2648f
d600dd299dde
03af3095927d
06f259bf8031
06f259bf8031
06f259bf8031
06f259bf8031
03af3095927d
d76b1fe2648f
d76b1fe2648f
d76b1fe2648f
d600dd299dde
41bf21c39c5b
41bf21c39c5b
41bf21c39c5b
41bf21c39c5b
d76b1fe2648f
06f259bf8031
db17da820a3b
06f259bf8031
06f259bf8031
06f259bf8031
06f259bf8031
06f259bf8031
db17da820a3b
06f259bf8031
06f259bf8031
44a98be4e4b4
44a98be4e4b4
44a98be4e4b4
06f259bf8031
9f8f7a65f90d
9f8f7a65f90d
06f259bf8031
a3c92705eeee
a3c92705eeee
a3c92705eeee
a3c92705eeee
a3c92705eeee
a3c92705eeee
a3c92705eeee
a3c92705eeee
a3c92705eeee
a3c92705eeee
330b9c117fa5
a3c92705eeee
a3c92705eeee
a3c92705eeee
a3c92705eeee
8642f7a7bf01
db17da820a3b
8642f7a7bf01
8642f7a7bf01
8642f7a7bf01
8642f7a7bf01
8642f7a7bf01
8642f7a7bf01
700221108e9f
700221108e9f
700221108e9f
700221108e9f
8642f7a7bf01
8642f7a7bf01
8642f7a7bf01
8642f7a7bf01
8642f7a7bf01
8642f7a7bf01
330b9c117fa5
330b9c117fa5
8642f7a7bf01
8642f7a7bf01
8642f7a7bf01
1e7064d79bdb
1e7064d79bdb
1e7064d79bdb
1e7064d79bdb
1e7064d79bdb
02eb59c6fd66
d76b1fe2648f
02eb59c6fd66
02eb59c6fd66
02eb59c6fd66
02eb59c6fd66
02eb59c6fd66
f52ad0660a73
02eb59c6fd66
d76b1fe2648f
02eb59c6fd66
02eb59c6fd66
02eb59c6fd66
d76b1fe2648f
02eb59c6fd66
02eb59c6fd66
02eb59c6fd66
d76b1fe2648f
d76b1fe2648f
d76b1fe2648f
d76b1fe2648f
d76b1fe2648f
f52ad0660a73
d76b1fe2648f
d76b1fe2648f
d76b1fe2648f
db17da820a3b
f52ad0660a73
d76b1fe2648f
f52ad0660a73
02eb59c6fd66
02eb59c6fd66
b8c262d37323
175721d796d7
b8c262d37323
b8c262d37323
b8c262d37323
b8c262d37323
b8c262d37323
b8c262d37323
b8c262d37323
b8c262d37323
175721d796d7
b8c262d37323
b8c262d37323
b8c262d37323
b8c262d37323
b8c262d37323
b8c262d37323
b8c262d37323
02eb59c6fd66
02eb59c6fd66
02eb59c6fd66
02eb59c6fd66
02eb59c6fd66
44a98be4e4b4
0a71d0af9edf
a3c92705eeee
a3c92705eeee
e7b7d53e6952
e7b7d53e6952
330b9c117fa5
8642f7a7bf01
a3c92705eeee
a3c92705eeee
06f259bf8031
06f259bf8031
bd16d99233dd
bd16d99233dd
d67249fd4593
bd16d99233dd
bd16d99233dd
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
cecf94fdb875
d67249fd4593
d67249fd4593
d67249fd4593
d67249fd4593
d67249fd4593
d67249fd4593
d67249fd4593
d67249fd4593
66513c4f112d
66513c4f112d
66513c4f112d
66513c4f112d
66513c4f112d
66513c4f112d
///////////////////// PRELUDE /////////////////////
pub(crate) use crate::protocol::{ComponentState, ProtocolDescription};
pub(crate) use crate::runtime::{error::AddComponentError, NonsyncProtoContext, SyncProtoContext};
pub(crate) use core::{
    cmp::Ordering,
    fmt::{Debug, Formatter},
    hash::Hash,
    ops::Range,
    time::Duration,
};
pub(crate) use maplit::hashmap;
pub(crate) use mio::{
    net::{TcpListener, TcpStream},
    Events, Interest, Poll, Token,
};
pub(crate) use std::{
    collections::{BTreeMap, HashMap, HashSet},
    io::{Read, Write},
    net::SocketAddr,
    sync::Arc,
    time::Instant,
};
pub(crate) use Polarity::*;

pub(crate) trait IdParts {
    fn id_parts(self) -> (ConnectorId, U32Suffix);
}

/// Used by various distributed algorithms to identify connectors.
pub type ConnectorId = u32;

/// Used in conjunction with the `ConnectorId` type to create identifiers for ports and components
pub type U32Suffix = u32;
#[derive(Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd)]

/// Generalization of a port/component identifier
#[derive(serde::Serialize, serde::Deserialize)]
#[repr(C)]
pub struct Id {
    pub(crate) connector_id: ConnectorId,
    pub(crate) u32_suffix: U32Suffix,
}
#[derive(Clone, Debug, Default)]
pub struct U32Stream {
    next: u32,
}

/// Identifier of a component in a session
#[derive(Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize)]
pub struct ComponentId(Id); // PUB because it can be returned by errors

/// Identifier of a port in a session
#[derive(Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize)]
#[repr(transparent)]
pub struct PortId(pub(crate) Id);

impl PortId {
    // TODO: Remove concept of ComponentId and PortId in this file
    #[deprecated]
    pub fn new(port: u32) -> Self {
        return PortId(Id{
            connector_id: u32::MAX,
            u32_suffix: port,
        });
    }
}

/// A safely aliasable heap-allocated payload of message bytes
#[derive(Default, Eq, PartialEq, Clone, Ord, PartialOrd)]
pub struct Payload(pub Arc<Vec<u8>>);
#[derive(Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd)]

/// "Orientation" of a port, determining whether they can send or receive messages with `put` and `get` respectively.
#[repr(C)]
#[derive(serde::Serialize, serde::Deserialize)]
pub enum Polarity {
    Putter, // output port (from the perspective of the component)
    Getter, // input port (from the perspective of the component)
}
#[derive(Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd)]

/// "Orientation" of a transport-layer network endpoint, dictating how it's connection procedure should
/// be conducted. Corresponds with connect() / accept() familiar to TCP socket programming.
#[repr(C)]
pub enum EndpointPolarity {
    Active,  // calls connect()
    Passive, // calls bind() listen() accept()
}

#[derive(Debug, Clone)]
pub(crate) enum NonsyncBlocker {
    Inconsistent,
    ComponentExit,
    SyncBlockStart,
}
#[derive(Debug, Clone)]
pub(crate) enum SyncBlocker {
    Inconsistent,
    SyncBlockEnd,
    CouldntReadMsg(PortId),
    CouldntCheckFiring(PortId),
    PutMsg(PortId, Payload),
}
pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]);
pub(crate) struct DebuggableIter<I: Iterator<Item = T> + Clone, T: Debug>(pub(crate) I);
///////////////////// IMPL /////////////////////
impl IdParts for Id {
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
        (self.connector_id, self.u32_suffix)
    }
}
impl IdParts for PortId {
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
        self.0.id_parts()
    }
}
impl IdParts for ComponentId {
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
        self.0.id_parts()
    }
}
impl U32Stream {
    pub(crate) fn next(&mut self) -> u32 {
        if self.next == u32::MAX {
            panic!("NO NEXT!")
        }
        self.next += 1;
        self.next - 1
    }
    pub(crate) fn n_skipped(mut self, n: u32) -> Self {
        self.next = self.next.saturating_add(n);
        self
    }
}
impl From<Id> for PortId {
    fn from(id: Id) -> PortId {
        Self(id)
    }
}
impl From<Id> for ComponentId {
    fn from(id: Id) -> Self {
        Self(id)
    }
}
impl From<&[u8]> for Payload {
    fn from(s: &[u8]) -> Payload {
        Payload(Arc::new(s.to_vec()))
    }
}
impl Payload {
    /// Create a new payload of uninitialized bytes with the given length.
    pub fn new(len: usize) -> Payload {
        let mut v = Vec::with_capacity(len);
        unsafe {
            v.set_len(len);
        }
        Payload(Arc::new(v))
    }
    /// Returns the length of the payload's byte sequence
    pub fn len(&self) -> usize {
        self.0.len()
    }
    /// Allows shared reading of the payload's contents
    pub fn as_slice(&self) -> &[u8] {
        &self.0
    }

    /// Allows mutation of the payload's contents.
    /// Results in a deep copy in the event this payload is aliased.
    pub fn as_mut_vec(&mut self) -> &mut Vec<u8> {
        Arc::make_mut(&mut self.0)
    }

    /// Modifies this payload, concatenating the given immutable payload's contents.
    /// Results in a deep copy in the event this payload is aliased.
    pub fn concatenate_with(&mut self, other: &Self) {
        let bytes = other.as_slice().iter().copied();
        let me = self.as_mut_vec();
        me.extend(bytes);
    }
}
impl serde::Serialize for Payload {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let inner: &Vec<u8> = &self.0;
        inner.serialize(serializer)
    }
}
impl<'de> serde::Deserialize<'de> for Payload {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let inner: Vec<u8> = Vec::deserialize(deserializer)?;
        Ok(Self(Arc::new(inner)))
    }
}
impl From<Vec<u8>> for Payload {
    fn from(s: Vec<u8>) -> Self {
        Self(s.into())
    }
}
impl Debug for PortId {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        let (a, b) = self.id_parts();
        write!(f, "pid{}_{}", a, b)
    }
}
impl Debug for ComponentId {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        let (a, b) = self.id_parts();
        write!(f, "cid{}_{}", a, b)
    }
}
impl Debug for Payload {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        write!(f, "Payload[{:?}]", DenseDebugHex(self.as_slice()))
    }
}
impl std::ops::Not for Polarity {
    type Output = Self;
    fn not(self) -> Self::Output {
        use Polarity::*;
        match self {
            Putter => Getter,
            Getter => Putter,
        }
    }
}
impl Debug for DenseDebugHex<'_> {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        for b in self.0 {
            write!(f, "{:02X?}", b)?;
        }
        Ok(())
    }
}

impl<I: Iterator<Item = T> + Clone, T: Debug> Debug for DebuggableIter<I, T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
        f.debug_list().entries(self.0.clone()).finish()
    }
}