Changeset - 869d51fc1127
[Not reviewed]
0 6 0
Christopher Esterhuyse - 5 years ago 2020-07-02 14:37:05
christopher.esterhuyse@gmail.com
refactored predicate methods to be more intuitive
6 files changed with 128 insertions and 132 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -32,9 +32,10 @@ lazy_static = "1.4.0"
 

	
 
[lib]
 
# compile target: dynamically linked library using C ABI
 
crate-type = ["cdylib"]
 

	
 
[features]
 
default = ["ffi"]
 
default = ["ffi", "session_optimization"]
 
ffi = [] # no feature dependencies
 
endpoint_logging = [] # see src/macros where a conditional check include endpoint logging
 
\ No newline at end of file
 
endpoint_logging = [] # see src/macros.rs
 
session_optimization = [] # see src/runtime/setup.rs
 
\ No newline at end of file
src/common.rs
Show inline comments
 
@@ -38,13 +38,13 @@ pub struct ProtoComponentId(Id);
 
)]
 
#[repr(C)]
 
pub struct Id {
 
    pub(crate) connector_id: ConnectorId,
 
    pub(crate) u32_suffix: PortSuffix,
 
}
 
#[derive(Debug, Default)]
 
#[derive(Clone, Debug, Default)]
 
pub struct U32Stream {
 
    next: u32,
 
}
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
@@ -65,17 +65,12 @@ pub enum Polarity {
 
)]
 
#[repr(C)]
 
pub enum EndpointPolarity {
 
    Active,  // calls connect()
 
    Passive, // calls bind() listen() accept()
 
}
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub(crate) struct FiringVar(pub(crate) PortId);
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) enum NonsyncBlocker {
 
    Inconsistent,
 
    ComponentExit,
 
    SyncBlockStart,
 
}
 
@@ -162,17 +157,12 @@ impl From<Vec<u8>> for Payload {
 
}
 
impl Debug for PortId {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "ptID({}'{})", self.0.connector_id, self.0.u32_suffix)
 
    }
 
}
 
impl Debug for FiringVar {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "fvID({}'{})", (self.0).0.connector_id, (self.0).0.u32_suffix)
 
    }
 
}
 
impl Debug for ProtoComponentId {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "pcID({}'{})", self.0.connector_id, self.0.u32_suffix)
 
    }
 
}
 
impl Debug for Payload {
src/runtime/communication.rs
Show inline comments
 
@@ -153,12 +153,14 @@ impl Connector {
 
            cu.logger,
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
            &timeout,
 
            comm.round_index
 
        );
 

	
 
        let mut spec_var_stream = cu.id_manager.new_spec_var_stream();
 

	
 
        // 1. run all proto components to Nonsync blockers
 
        let mut branching_proto_components =
 
            HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
        let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
            cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect();
 
        log!(cu.logger, "Nonsync running {} proto components...", unrun_components.len());
 
@@ -221,29 +223,33 @@ impl Connector {
 
        // 2. kick off the native
 
        log!(
 
            cu.logger,
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
        );
 
        let native_branch_spec_var = spec_var_stream.next();
 
        log!(cu.logger, "Native branch spec var is {:?}", native_branch_spec_var);
 
        let mut branching_native = BranchingNative { branches: Default::default() };
 
        'native_branches: for (index, NativeBatch { to_get, to_put }) in
 
            comm.native_batches.drain(..).enumerate()
 
        'native_branches: for ((native_branch, index), branch_spec_val) in
 
            comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain())
 
        {
 
            let NativeBatch { to_get, to_put } = native_branch;
 
            let predicate = {
 
                let mut predicate = Predicate::default();
 
                let mut predicate =
 
                    Predicate::default().inserted(native_branch_spec_var, branch_spec_val);
 
                // assign trues for ports that fire
 
                let firing_ports: HashSet<PortId> =
 
                    to_get.iter().chain(to_put.keys()).copied().collect();
 
                for &port in to_get.iter().chain(to_put.keys()) {
 
                    let var = cu.port_info.firing_var_for(port);
 
                    predicate.assigned.insert(var, true);
 
                    let var = cu.port_info.spec_var_for(port);
 
                    predicate.assigned.insert(var, SpecVal::FIRING);
 
                }
 
                // assign falses for silent ports
 
                for &port in cu.native_ports.difference(&firing_ports) {
 
                    let var = cu.port_info.firing_var_for(port);
 
                    if let Some(true) = predicate.assigned.insert(var, false) {
 
                    let var = cu.port_info.spec_var_for(port);
 
                    if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) {
 
                        log!(cu.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
 
                        continue 'native_branches;
 
                    }
 
                }
 
                predicate
 
            };
 
@@ -266,14 +272,15 @@ impl Connector {
 
                    &mut *cu.logger,
 
                    Route::LocalComponent(ComponentId::Native),
 
                    predicate.clone(),
 
                );
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if let Some(existing) = branching_native.branches.insert(predicate, branch) {
 
                return Err(Se::IndistinguishableBatches([index, existing.index]));
 
            if let Some(_existing) = branching_native.branches.insert(predicate, branch) {
 
                unreachable!()
 
                // return Err(Se::IndistinguishableBatches([index, existing.index]));
 
            }
 
        }
 
        // restore the invariant
 
        comm.native_batches.push(Default::default());
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
@@ -642,13 +649,13 @@ impl BranchingNative {
 
        let mut draining = HashMap::default();
 
        let finished = &mut self.branches;
 
        std::mem::swap(&mut draining, finished);
 
        for (predicate, mut branch) in draining.drain() {
 
            log!(cu.logger, "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            // check if this branch expects to receive it
 
            let var = cu.port_info.firing_var_for(getter);
 
            let var = cu.port_info.spec_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                branch.to_get.remove(&getter);
 
                if branch.to_get.is_empty() {
 
                    log!(
 
@@ -662,40 +669,40 @@ impl BranchingNative {
 
                        &mut *cu.logger,
 
                        route,
 
                        predicate.clone(),
 
                    );
 
                }
 
            };
 
            if predicate.query(var) != Some(true) {
 
            if predicate.query(var) != Some(SpecVal::FIRING) {
 
                // optimization. Don't bother trying this branch
 
                log!(
 
                    cu.logger,
 
                    "skipping branch with {:?} that doesn't want the message (fastpath)",
 
                    &predicate
 
                );
 
                finished.insert(predicate, branch);
 
                continue;
 
            }
 
            use CommonSatResult as Csr;
 
            match predicate.common_satisfier(&send_payload_msg.predicate) {
 
                Csr::Nonexistant => {
 
            use AllMapperResult as Amr;
 
            match predicate.all_mapper(&send_payload_msg.predicate) {
 
                Amr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(
 
                        cu.logger,
 
                        "skipping branch with {:?} that doesn't want the message (slowpath)",
 
                        &predicate
 
                    );
 
                    finished.insert(predicate, branch);
 
                }
 
                Csr::Equivalent | Csr::FormerNotLatter => {
 
                Amr::Equivalent | Amr::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    feed_branch(&mut branch, &predicate);
 
                    log!(cu.logger, "branch pred covers it! Accept the msg");
 
                    finished.insert(predicate, branch);
 
                }
 
                Csr::LatterNotFormer => {
 
                Amr::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    log!(
 
                        cu.logger,
 
@@ -703,13 +710,13 @@ impl BranchingNative {
 
                        &predicate2,
 
                        &predicate
 
                    );
 
                    finished.insert(predicate, branch);
 
                    finished.insert(predicate2, branch2);
 
                }
 
                Csr::New(predicate2) => {
 
                Amr::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    log!(
 
                        cu.logger,
 
                        "new subsuming pred created {:?}. forking and feeding",
 
@@ -726,13 +733,13 @@ impl BranchingNative {
 
            logger,
 
            "Collapsing native with {} branch preds {:?}",
 
            self.branches.len(),
 
            self.branches.keys()
 
        );
 
        for (branch_predicate, branch) in self.branches {
 
            if branch.to_get.is_empty() && solution_predicate.satisfies(&branch_predicate) {
 
            if branch.to_get.is_empty() && solution_predicate.consistent_with(&branch_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                log!(logger, "Collapsed native has gotten {:?}", &gotten);
 
                return RoundOk { batch_index: index, gotten };
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
@@ -772,14 +779,14 @@ impl BranchingProtoComponent {
 
                    // branch is inconsistent. throw it away
 
                    drop((predicate, branch));
 
                }
 
                B::SyncBlockEnd => {
 
                    // make concrete all variables
 
                    for &port in ports.iter() {
 
                        let var = cu.port_info.firing_var_for(port);
 
                        predicate.assigned.entry(var).or_insert(false);
 
                        let var = cu.port_info.spec_var_for(port);
 
                        predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                    }
 
                    // submit solution for this component
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
                        Route::LocalComponent(ComponentId::Proto(proto_component_id)),
 
                        predicate.clone(),
 
@@ -792,25 +799,25 @@ impl BranchingProtoComponent {
 
                    // move to "blocked"
 
                    assert!(!branch.inbox.contains_key(&port));
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntCheckFiring(port) => {
 
                    // sanity check
 
                    let var = cu.port_info.firing_var_for(port);
 
                    let var = cu.port_info.spec_var_for(port);
 
                    assert!(predicate.query(var).is_none());
 
                    // keep forks in "unblocked"
 
                    drainer.add_input(predicate.clone().inserted(var, false), branch.clone());
 
                    drainer.add_input(predicate.inserted(var, true), branch);
 
                    drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone());
 
                    drainer.add_input(predicate.inserted(var, SpecVal::FIRING), branch);
 
                }
 
                B::PutMsg(putter, payload) => {
 
                    // sanity check
 
                    assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter));
 
                    // overwrite assignment
 
                    let var = cu.port_info.firing_var_for(putter);
 
                    let was = predicate.assigned.insert(var, true);
 
                    if was == Some(false) {
 
                    let var = cu.port_info.spec_var_for(putter);
 
                    let was = predicate.assigned.insert(var, SpecVal::FIRING);
 
                    if was == Some(SpecVal::SILENT) {
 
                        log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
 
                        // discard forever
 
                        drop((predicate, branch));
 
                    } else {
 
                        // keep in "unblocked"
 
                        log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
@@ -848,36 +855,36 @@ impl BranchingProtoComponent {
 
        for (predicate, mut branch) in branches.drain() {
 
            if branch.ended {
 
                log!(logger, "Skipping ended branch with {:?}", &predicate);
 
                blocked.insert(predicate, branch);
 
                continue;
 
            }
 
            use CommonSatResult as Csr;
 
            use AllMapperResult as Amr;
 
            log!(logger, "visiting branch with pred {:?}", &predicate);
 
            match predicate.common_satisfier(&send_payload_msg.predicate) {
 
                Csr::Nonexistant => {
 
            match predicate.all_mapper(&send_payload_msg.predicate) {
 
                Amr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(logger, "skipping branch");
 
                    blocked.insert(predicate, branch);
 
                }
 
                Csr::Equivalent | Csr::FormerNotLatter => {
 
                Amr::Equivalent | Amr::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    log!(logger, "feeding this branch without altering its predicate");
 
                    branch.feed_msg(getter, send_payload_msg.payload.clone());
 
                    unblocked.insert(predicate, branch);
 
                }
 
                Csr::LatterNotFormer => {
 
                Amr::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    log!(logger, "Forking this branch, giving it the predicate of the msg");
 
                    let mut branch2 = branch.clone();
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    blocked.insert(predicate, branch);
 
                    unblocked.insert(predicate2, branch2);
 
                }
 
                Csr::New(predicate2) => {
 
                Amr::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    log!(logger, "Forking this branch with new predicate {:?}", &predicate2);
 
                    let mut branch2 = branch.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    blocked.insert(predicate, branch);
 
                    unblocked.insert(predicate2, branch2);
 
@@ -901,13 +908,13 @@ impl BranchingProtoComponent {
 
        log!(cu.logger, "component settles down with branches: {:?}", branches.keys());
 
        Ok(())
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch.ended && branch_predicate.satisfies(solution_predicate) {
 
            if branch.ended && solution_predicate.consistent_with(&branch_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return ProtoComponent { state, ports };
 
            }
 
        }
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
@@ -1031,14 +1038,14 @@ impl PayloadMsgSender for Vec<(PortId, SendPayloadMsg)> {
 
            Err(SyncError::MalformedStateError(MalformedStateError::GetterUnknownFor { putter }))
 
        }
 
    }
 
}
 
impl SyncProtoContext<'_> {
 
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        let var = self.port_info.firing_var_for(port);
 
        self.predicate.query(var)
 
        let var = self.port_info.spec_var_for(port);
 
        self.predicate.query(var).map(SpecVal::is_firing)
 
    }
 
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        self.inbox.get(&port)
 
    }
 
}
 
impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> {
src/runtime/mod.rs
Show inline comments
 
@@ -42,12 +42,21 @@ pub(crate) struct NonsyncProtoContext<'a> {
 
pub(crate) struct SyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    predicate: &'a Predicate,
 
    port_info: &'a PortInfo,
 
    inbox: &'a HashMap<PortId, Payload>,
 
}
 

	
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
struct SpecVar(PortId);
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
struct SpecVal(u16);
 
#[derive(Debug)]
 
struct RoundOk {
 
    batch_index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
}
 
#[derive(Default)]
 
@@ -112,13 +121,13 @@ enum CommMsgContents {
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct SendPayloadMsg {
 
    predicate: Predicate,
 
    payload: Payload,
 
}
 
#[derive(Debug, PartialEq)]
 
enum CommonSatResult {
 
enum AllMapperResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
    New(Predicate),
 
    Nonexistant,
 
}
 
@@ -150,12 +159,17 @@ struct Neighborhood {
 
struct IdManager {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
    proto_component_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
struct SpecVarStream {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
struct EndpointManager {
 
    // invariants:
 
    // 1. endpoint N is registered READ | WRITE with poller
 
    // 2. Events is empty
 
    poll: Poll,
 
    events: Events,
 
@@ -191,13 +205,13 @@ struct ConnectorUnphased {
 
enum ConnectorPhased {
 
    Setup { endpoint_setups: Vec<(PortId, EndpointSetup)>, surplus_sockets: u16 },
 
    Communication(Box<ConnectorCommunication>),
 
}
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
struct Predicate {
 
    assigned: BTreeMap<FiringVar, bool>,
 
    assigned: BTreeMap<SpecVar, SpecVal>,
 
}
 
#[derive(Debug, Default)]
 
struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
@@ -229,27 +243,41 @@ impl<T: std::cmp::Ord> VecSet<T> {
 
    }
 
    fn pop(&mut self) -> Option<T> {
 
        self.vec.pop()
 
    }
 
}
 
impl PortInfo {
 
    fn firing_var_for(&self, port: PortId) -> FiringVar {
 
        FiringVar(match self.polarities.get(&port).unwrap() {
 
    fn spec_var_for(&self, port: PortId) -> SpecVar {
 
        SpecVar(match self.polarities.get(&port).unwrap() {
 
            Getter => port,
 
            Putter => *self.peers.get(&port).unwrap(),
 
        })
 
    }
 
}
 
impl SpecVarStream {
 
    fn next(&mut self) -> SpecVar {
 
        let phantom_port: PortId =
 
            Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }
 
                .into();
 
        SpecVar(phantom_port)
 
    }
 
}
 
impl IdManager {
 
    fn new(connector_id: ConnectorId) -> Self {
 
        Self {
 
            connector_id,
 
            port_suffix_stream: Default::default(),
 
            proto_component_suffix_stream: Default::default(),
 
        }
 
    }
 
    fn new_spec_var_stream(&self) -> SpecVarStream {
 
        SpecVarStream {
 
            connector_id: self.connector_id,
 
            port_suffix_stream: self.port_suffix_stream.clone(),
 
        }
 
    }
 
    fn new_port_id(&mut self) -> PortId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into()
 
    }
 
    fn new_proto_component_id(&mut self) -> ProtoComponentId {
 
        Id {
 
            connector_id: self.connector_id,
 
@@ -335,47 +363,38 @@ impl Connector {
 
        );
 
        Ok(())
 
    }
 
}
 
impl Predicate {
 
    #[inline]
 
    pub fn inserted(mut self, k: FiringVar, v: bool) -> Self {
 
    pub fn inserted(mut self, k: SpecVar, v: SpecVal) -> Self {
 
        self.assigned.insert(k, v);
 
        self
 
    }
 
    // returns true IFF self.unify would return Equivalent OR FormerNotLatter
 
    pub fn satisfies(&self, other: &Self) -> bool {
 
        let mut s_it = self.assigned.iter();
 
        let mut s = if let Some(s) = s_it.next() {
 
            s
 
        } else {
 
            return other.assigned.is_empty();
 
        };
 
        for (oid, ob) in other.assigned.iter() {
 
            while s.0 < oid {
 
                s = if let Some(s) = s_it.next() {
 
                    s
 
                } else {
 
                    return false;
 
                };
 
            }
 
            if s.0 > oid || s.1 != ob {
 
                return false;
 
    pub fn consistent_with(&self, other: &Self) -> bool {
 
        let [larger, smaller] =
 
            if self.assigned.len() > other.assigned.len() { [self, other] } else { [other, self] };
 

	
 
        for (var, val) in smaller.assigned.iter() {
 
            match larger.assigned.get(var) {
 
                Some(val2) if val2 != val => return false,
 
                _ => {}
 
            }
 
        }
 
        true
 
    }
 

	
 
    /// Given self and other, two predicates, return the most general Predicate possible, N
 
    /// such that n.satisfies(self) && n.satisfies(other).
 
    /// If none exists Nonexistant is returned.
 
    /// If the resulting predicate is equivlanet to self, other, or both,
 
    /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively.
 
    /// otherwise New(N) is returned.
 
    fn common_satisfier(&self, other: &Self) -> CommonSatResult {
 
        use CommonSatResult as Csr;
 
    fn all_mapper(&self, other: &Self) -> AllMapperResult {
 
        use AllMapperResult as Amr;
 
        // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
 
        let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
 
        let [mut s, mut o] = [s_it.next(), o_it.next()];
 
        // lists of assignments in self but not other and vice versa.
 
        let [mut s_not_o, mut o_not_s] = [vec![], vec![]];
 
        loop {
 
@@ -400,34 +419,34 @@ impl Predicate {
 
                        // s is missing this element
 
                        o_not_s.push((oid, ob));
 
                        o = o_it.next();
 
                    } else if sb != ob {
 
                        assert_eq!(sid, oid);
 
                        // both predicates assign the variable but differ on the value
 
                        return Csr::Nonexistant;
 
                        return Amr::Nonexistant;
 
                    } else {
 
                        // both predicates assign the variable to the same value
 
                        s = s_it.next();
 
                        o = o_it.next();
 
                    }
 
                }
 
            }
 
        }
 
        // Observed zero inconsistencies. A unified predicate exists...
 
        match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
            [true, true] => Csr::Equivalent,       // ... equivalent to both.
 
            [false, true] => Csr::FormerNotLatter, // ... equivalent to self.
 
            [true, false] => Csr::LatterNotFormer, // ... equivalent to other.
 
            [true, true] => Amr::Equivalent,       // ... equivalent to both.
 
            [false, true] => Amr::FormerNotLatter, // ... equivalent to self.
 
            [true, false] => Amr::LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
                let mut new = self.clone();
 
                for (&id, &b) in o_not_s {
 
                    new.assigned.insert(id, b);
 
                }
 
                Csr::New(new)
 
                Amr::New(new)
 
            }
 
        }
 
    }
 
    pub fn union_with(&self, other: &Self) -> Option<Self> {
 
        let mut res = self.clone();
 
        for (&channel_id, &assignment_1) in other.assigned.iter() {
 
@@ -435,40 +454,24 @@ impl Predicate {
 
                Some(assignment_2) if assignment_1 != assignment_2 => return None,
 
                _ => {}
 
            }
 
        }
 
        Some(res)
 
    }
 
    pub fn query(&self, var: FiringVar) -> Option<bool> {
 
    pub fn query(&self, var: SpecVar) -> Option<SpecVal> {
 
        self.assigned.get(&var).copied()
 
    }
 
}
 
impl<T: Debug + std::cmp::Ord> Debug for VecSet<T> {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_set().entries(self.vec.iter()).finish()
 
    }
 
}
 
impl Debug for Predicate {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        struct MySet<'a>(&'a Predicate, bool);
 
        impl Debug for MySet<'_> {
 
            fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
                let iter = self.0.assigned.iter().filter_map(|(port, &firing)| {
 
                    if firing == self.1 {
 
                        Some(port)
 
                    } else {
 
                        None
 
                    }
 
                });
 
                f.debug_set().entries(iter).finish()
 
            }
 
        }
 
        f.debug_struct("Predicate")
 
            .field("Trues", &MySet(self, true))
 
            .field("Falses", &MySet(self, false))
 
            .finish()
 
        f.debug_tuple("Predicate").field(&self.assigned).finish()
 
    }
 
}
 
impl serde::Serialize for SerdeProtocolDescription {
 
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
 
    where
 
        S: serde::Serializer,
 
@@ -483,43 +486,27 @@ impl<'de> serde::Deserialize<'de> for SerdeProtocolDescription {
 
        D: serde::Deserializer<'de>,
 
    {
 
        let inner: ProtocolDescription = ProtocolDescription::deserialize(deserializer)?;
 
        Ok(Self(Arc::new(inner)))
 
    }
 
}
 

	
 
#[test]
 
fn bincode_serde() {
 
    let mut b = Vec::with_capacity(64);
 
    use bincode::config::Options;
 
    let opt = bincode::config::DefaultOptions::default();
 
    opt.serialize_into(&mut b, &Decision::Failure).unwrap();
 
    println!("failure  {:x?}", b);
 
    b.clear();
 

	
 
    opt.serialize_into(&mut b, &CommMsgContents::Suggest { suggestion: Decision::Failure })
 
        .unwrap();
 
    println!("decision {:x?}", b);
 
    b.clear();
 

	
 
    opt.serialize_into(
 
        &mut b,
 
        &CommMsg {
 
            round_index: 4,
 
            contents: CommMsgContents::Suggest { suggestion: Decision::Failure },
 
        },
 
    )
 
    .unwrap();
 
    println!("commmsg  {:x?}", b);
 
    b.clear();
 

	
 
    opt.serialize_into(
 
        &mut b,
 
        &Msg::CommMsg(CommMsg {
 
            round_index: 4,
 
            contents: CommMsgContents::Suggest { suggestion: Decision::Failure },
 
        }),
 
    )
 
    .unwrap();
 
    println!("msg      {:x?}", b);
 
    b.clear();
 
impl Debug for SpecVar {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_tuple("vrID").field(&self.0).finish()
 
    }
 
}
 
impl SpecVal {
 
    const FIRING: Self = SpecVal(1);
 
    const SILENT: Self = SpecVal(0);
 
    fn is_firing(self) -> bool {
 
        self == Self::FIRING
 
        // all else treated as SILENT
 
    }
 
    fn iter_domain() -> impl Iterator<Item = Self> {
 
        (0..).map(SpecVal)
 
    }
 
}
 
impl Debug for SpecVal {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        self.0.fmt(f)
 
    }
 
}
src/runtime/setup.rs
Show inline comments
 
@@ -84,13 +84,15 @@ impl Connector {
 
                    round_index: 0,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    native_batches: vec![Default::default()],
 
                    round_result: Ok(None),
 
                };
 
                session_optimize(cu, &mut comm, deadline)?;
 
                if cfg!(feature = "session_optimization") {
 
                    session_optimize(cu, &mut comm, deadline)?;
 
                }
 
                log!(cu.logger, "connect() finished. setup phase complete");
 
                self.phased = ConnectorPhased::Communication(Box::new(comm));
 
                Ok(())
 
            }
 
        }
 
    }
src/runtime/tests.rs
Show inline comments
 
@@ -574,6 +574,15 @@ fn together() {
 
            c.sync(Some(Duration::from_millis(500))).unwrap();
 
            c.gotten(p5).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn native_batch_distinguish() {
 
    let test_log_path = Path::new("./logs/native_batch_distinguish");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.next_batch().unwrap();
 
    c.sync(Some(Duration::from_secs(3))).unwrap();
 
}
0 comments (0 inline, 0 general)