Changeset - 842acacee86d
[Not reviewed]
0 5 0
Christopher Esterhuyse - 5 years ago 2020-06-23 12:03:21
christopher.esterhuyse@gmail.com
fleshing out native <-> native communication phase
5 files changed with 384 insertions and 56 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -5,6 +5,7 @@ use crate::common::*;
 
struct BranchingNative {
 
    branches: HashMap<Predicate, NativeBranch>,
 
}
 
#[derive(Clone, Debug)]
 
struct NativeBranch {
 
    index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
@@ -77,7 +78,8 @@ impl NonsyncProtoContext<'_> {
 
}
 
impl SyncProtoContext<'_> {
 
    pub fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        self.predicate.query(port)
 
        let var = self.port_info.firing_var_for(port);
 
        self.predicate.query(var)
 
    }
 
    pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        self.inbox.get(&port)
 
@@ -116,6 +118,36 @@ impl Connector {
 
            }
 
        }
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, NextBatchError> {
 
        // returns index of new batch
 
        use NextBatchError::*;
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication { native_batches, .. } => {
 
                native_batches.push(Default::default());
 
                Ok(native_batches.len() - 1)
 
            }
 
        }
 
    }
 
    pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        if !self.native_ports.contains(&port) {
 
            return Err(PortUnavailable);
 
        }
 
        if Getter != *self.port_info.polarities.get(&port).unwrap() {
 
            return Err(WrongPolarity);
 
        }
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication { native_batches, .. } => {
 
                let batch = native_batches.last_mut().unwrap();
 
                if !batch.to_get.insert(port) {
 
                    return Err(MultipleOpsOnPort);
 
                }
 
                Ok(())
 
            }
 
        }
 
    }
 
    pub fn sync(&mut self, timeout: Duration) -> Result<usize, SyncError> {
 
        use SyncError::*;
 
        match &mut self.phased {
 
@@ -128,7 +160,7 @@ impl Connector {
 
                round_result,
 
                ..
 
            } => {
 
                let _deadline = Instant::now() + timeout;
 
                let deadline = Instant::now() + timeout;
 
                let logger: &mut dyn Logger = &mut *self.logger;
 
                // 1. run all proto components to Nonsync blockers
 
                log!(
 
@@ -189,8 +221,8 @@ impl Connector {
 
                    branching_proto_components.len(),
 
                );
 

	
 
                // NOTE: all msgs in outbox are of form (Putter, Payload)
 
                let mut payload_outbox: Vec<(PortId, SendPayloadMsg)> = vec![];
 
                // NOTE: all msgs in outbox are of form (Getter, Payload)
 
                let mut payloads_to_get: Vec<(PortId, SendPayloadMsg)> = vec![];
 

	
 
                // create the solution storage
 
                let mut solution_storage = {
 
@@ -218,21 +250,25 @@ impl Connector {
 
                        let mut predicate = Predicate::default();
 
                        // assign trues
 
                        for &port in to_get.iter().chain(to_put.keys()) {
 
                            predicate.assigned.insert(port, true);
 
                            let var = self.port_info.firing_var_for(port);
 
                            predicate.assigned.insert(var, true);
 
                        }
 
                        // assign falses
 
                        for &port in self.native_ports.iter() {
 
                            predicate.assigned.entry(port).or_insert(false);
 
                            let var = self.port_info.firing_var_for(port);
 
                            predicate.assigned.entry(var).or_insert(false);
 
                        }
 
                        predicate
 
                    };
 
                    log!(logger, "Native branch {} has pred {:?}", index, &predicate);
 

	
 
                    // put all messages
 
                    for (port, payload) in to_put {
 
                        let msg = SendPayloadMsg { payload_predicate: predicate.clone(), payload };
 
                    for (putter, payload) in to_put {
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        log!(logger, "Native branch {} sending msg {:?}", index, &msg);
 
                        payload_outbox.push((port, msg));
 
                        // rely on invariant: sync batches respect port polarity
 
                        let getter = *self.port_info.peers.get(&putter).unwrap();
 
                        payloads_to_get.push((getter, msg));
 
                    }
 
                    if to_get.is_empty() {
 
                        log!(logger, "Native submitting trivial solution for index {}", index);
 
@@ -276,6 +312,7 @@ impl Connector {
 
                            let mut ctx = SyncProtoContext {
 
                                logger,
 
                                predicate: &predicate,
 
                                port_info: &self.port_info,
 
                                proto_component_id: *proto_component_id,
 
                                inbox: &branch.inbox,
 
                            };
 
@@ -296,7 +333,8 @@ impl Connector {
 
                                B::SyncBlockEnd => {
 
                                    // make concrete all variables
 
                                    for &port in proto_component.ports.iter() {
 
                                        predicate.assigned.entry(port).or_insert(false);
 
                                        let var = self.port_info.firing_var_for(port);
 
                                        predicate.assigned.entry(var).or_insert(false);
 
                                    }
 
                                    // submit solution for this component
 
                                    solution_storage.submit_and_digest_subtree_solution(
 
@@ -311,14 +349,15 @@ impl Connector {
 
                                }
 
                                B::CouldntReadMsg(port) => {
 
                                    // move to "blocked"
 
                                    assert!(predicate.query(port).is_none());
 
                                    let var = self.port_info.firing_var_for(port);
 
                                    assert!(predicate.query(var).is_none());
 
                                    assert!(!branch.inbox.contains_key(&port));
 
                                    blocked.insert(predicate, branch);
 
                                }
 
                                B::CouldntCheckFiring(port) => {
 
                                    // sanity check
 
                                    assert!(predicate.query(port).is_none());
 
                                    let var = self.port_info.firing_var_for(port);
 
                                    assert!(predicate.query(var).is_none());
 
                                    // keep forks in "unblocked"
 
                                    unblocked_to.insert(
 
                                        predicate.clone().inserted(var, false),
 
@@ -326,24 +365,28 @@ impl Connector {
 
                                    );
 
                                    unblocked_to.insert(predicate.inserted(var, true), branch);
 
                                }
 
                                B::PutMsg(port, payload) => {
 
                                B::PutMsg(putter, payload) => {
 
                                    // sanity check
 
                                    assert_eq!(Some(&Putter), self.port_info.polarities.get(&port));
 
                                    assert_eq!(
 
                                        Some(&Putter),
 
                                        self.port_info.polarities.get(&putter)
 
                                    );
 
                                    // overwrite assignment
 
                                    let var = self.port_info.firing_var_for(port);
 
                                    let var = self.port_info.firing_var_for(putter);
 

	
 
                                    let was = predicate.assigned.insert(var, true);
 
                                    if was == Some(false) {
 
                                        log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, port, var);
 
                                        log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
 
                                        // discard forever
 
                                        drop((predicate, branch));
 
                                    } else {
 
                                        // keep in "unblocked"
 
                                        log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, port, var);
 
                                        payload_outbox.push((
 
                                            port,
 
                                        let getter = *self.port_info.peers.get(&putter).unwrap();
 
                                        log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                                        payloads_to_get.push((
 
                                            getter,
 
                                            SendPayloadMsg {
 
                                                payload_predicate: predicate.clone(),
 
                                                predicate: predicate.clone(),
 
                                                payload,
 
                                            },
 
                                        ));
 
@@ -358,19 +401,54 @@ impl Connector {
 
                log!(logger, "All proto components are blocked");
 

	
 
                log!(logger, "Entering decision loop...");
 
                endpoint_manager.undelay_all();
 
                let decision = 'undecided: loop {
 
                    // check if we already have a solution
 
                    // drain payloads_to_get, sending them through endpoints / feeding them to components
 
                    while let Some((getter, send_payload_msg)) = payloads_to_get.pop() {
 
                        assert!(self.port_info.polarities.get(&getter).copied() == Some(Getter));
 
                        match self.port_info.routes.get(&getter).unwrap() {
 
                            Route::Endpoint { index } => {
 
                                let msg = Msg::CommMsg(CommMsg {
 
                                    round_index: *round_index,
 
                                    contents: CommMsgContents::SendPayload(send_payload_msg),
 
                                });
 
                                endpoint_manager.send_to(*index, &msg).unwrap();
 
                            }
 
                            Route::LocalComponent(LocalComponentId::Native) => branching_native
 
                                .feed_msg(
 
                                    logger,
 
                                    &self.port_info,
 
                                    &mut solution_storage,
 
                                    getter,
 
                                    send_payload_msg,
 
                                ),
 
                            Route::LocalComponent(LocalComponentId::Proto(proto_component_id)) => {
 
                                if let Some(branching_component) =
 
                                    branching_proto_components.get_mut(&proto_component_id)
 
                                {
 
                                    branching_component.feed_msg(
 
                                        logger,
 
                                        &self.port_info,
 
                                        &mut solution_storage,
 
                                        getter,
 
                                        send_payload_msg,
 
                                    )
 
                                }
 
                            }
 
                        }
 
                    }
 

	
 
                    // check if we have a solution yet
 
                    log!(logger, "Check if we have any local decisions...");
 
                    for solution in solution_storage.iter_new_local_make_old() {
 
                        log!(logger, "New local decision with solution {:?}...", &solution);
 
                        match neighborhood.parent {
 
                            Some(parent) => {
 
                                log!(logger, "Forwarding to my parent {:?}", parent);
 
                                let suggestion = Decision::Success(solution);
 
                                let msg = Msg::CommMsg(CommMsg {
 
                                    round_index: *round_index,
 
                                    contents: CommMsgContents::Elaborate {
 
                                        partial_oracle: solution,
 
                                    },
 
                                    contents: CommMsgContents::Suggest { suggestion },
 
                                });
 
                                endpoint_manager.send_to(parent, &msg).unwrap();
 
                            }
 
@@ -381,7 +459,109 @@ impl Connector {
 
                        }
 
                    }
 

	
 
                    // TODO send / recv messages
 
                    // stuck! make progress by receiving a msg
 
                    // try recv messages arriving through endpoints
 
                    log!(logger, "No decision yet. Let's recv an endpoint msg...");
 
                    {
 
                        let (endpoint_index, msg) =
 
                            endpoint_manager.try_recv_any(deadline).unwrap();
 
                        log!(logger, "Received from endpoint {} msg {:?}", endpoint_index, &msg);
 
                        let comm_msg_contents = match msg {
 
                            Msg::SetupMsg(..) => {
 
                                log!(logger, "Discarding setup message; that phase is over");
 
                                continue 'undecided;
 
                            }
 
                            Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(round_index) {
 
                                Ordering::Equal => comm_msg.contents,
 
                                Ordering::Less => {
 
                                    log!(
 
                                        logger,
 
                                        "We are in round {}, but msg is for round {}. Discard",
 
                                        comm_msg.round_index,
 
                                        round_index,
 
                                    );
 
                                    drop(comm_msg);
 
                                    continue 'undecided;
 
                                }
 
                                Ordering::Greater => {
 
                                    log!(
 
                                        logger,
 
                                        "We are in round {}, but msg is for round {}. Buffer",
 
                                        comm_msg.round_index,
 
                                        round_index,
 
                                    );
 
                                    endpoint_manager
 
                                        .delayed_messages
 
                                        .push((endpoint_index, Msg::CommMsg(comm_msg)));
 
                                    continue 'undecided;
 
                                }
 
                            },
 
                        };
 
                        match comm_msg_contents {
 
                            CommMsgContents::SendPayload(send_payload_msg) => {
 
                                let getter = endpoint_manager.endpoint_exts[endpoint_index]
 
                                    .getter_for_incoming;
 
                                assert!(self.port_info.polarities.get(&getter) == Some(&Getter));
 
                                log!(
 
                                    logger,
 
                                    "Msg routed to getter port {:?}. Buffer for recv loop",
 
                                    getter,
 
                                );
 
                                payloads_to_get.push((getter, send_payload_msg));
 
                            }
 
                            CommMsgContents::Suggest { suggestion } => {
 
                                // only accept this control msg through a child endpoint
 
                                if neighborhood.children.binary_search(&endpoint_index).is_ok() {
 
                                    match suggestion {
 
                                        Decision::Success(predicate) => {
 
                                            // child solution contributes to local solution
 
                                            log!(
 
                                                logger,
 
                                                "Child provided solution {:?}",
 
                                                &predicate
 
                                            );
 
                                            let route = Route::Endpoint { index: endpoint_index };
 
                                            solution_storage.submit_and_digest_subtree_solution(
 
                                                logger, route, predicate,
 
                                            );
 
                                        }
 
                                        Decision::Failure => match neighborhood.parent {
 
                                            None => {
 
                                                log!(
 
                                                    logger,
 
                                                    "As sink, I decide on my child's failure"
 
                                                );
 
                                                // I am the sink. Decide on failed
 
                                                break 'undecided Decision::Failure;
 
                                            }
 
                                            Some(parent) => {
 
                                                log!(logger, "Forwarding failure through my parent endpoint {:?}", parent);
 
                                                // I've got a parent. Forward the failure suggestion.
 
                                                let msg = Msg::CommMsg(CommMsg {
 
                                                    round_index: *round_index,
 
                                                    contents: CommMsgContents::Suggest {
 
                                                        suggestion,
 
                                                    },
 
                                                });
 
                                                endpoint_manager.send_to(parent, &msg).unwrap();
 
                                            }
 
                                        },
 
                                    }
 
                                } else {
 
                                    log!(logger, "Discarding suggestion {:?} from non-child endpoint idx {:?}", &suggestion, endpoint_index);
 
                                }
 
                            }
 
                            CommMsgContents::Announce { decision } => {
 
                                if Some(endpoint_index) == neighborhood.parent {
 
                                    // adopt this decision
 
                                    break 'undecided decision;
 
                                } else {
 
                                    log!(logger, "Discarding announcement {:?} from non-parent endpoint idx {:?}", &decision, endpoint_index);
 
                                }
 
                            }
 
                        }
 
                    }
 
                    log!(logger, "Endpoint msg recv done");
 
                };
 
                log!(logger, "Committing to decision {:?}!", &decision);
 

	
 
@@ -426,6 +606,72 @@ impl Connector {
 
    }
 
}
 
impl BranchingNative {
 
    fn feed_msg(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        port_info: &PortInfo,
 
        solution_storage: &mut SolutionStorage,
 
        getter: PortId,
 
        send_payload_msg: SendPayloadMsg,
 
    ) {
 
        assert!(port_info.polarities.get(&getter).copied() == Some(Getter));
 
        println!("BEFORE {:#?}", &self.branches);
 
        let mut draining = HashMap::default();
 
        let finished = &mut self.branches;
 
        std::mem::swap(&mut draining, finished);
 
        for (predicate, mut branch) in draining.drain() {
 
            // check if this branch expects to receive it
 
            let var = port_info.firing_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                branch.to_get.remove(&getter);
 
                if branch.to_get.is_empty() {
 
                    let route = Route::LocalComponent(LocalComponentId::Native);
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        logger,
 
                        route,
 
                        predicate.clone(),
 
                    );
 
                }
 
            };
 
            if predicate.query(var) != Some(true) {
 
                // optimization. Don't bother trying this branch
 
                finished.insert(predicate, branch);
 
                continue;
 
            }
 
            use CommonSatResult as Csr;
 
            match predicate.common_satisfier(&send_payload_msg.predicate) {
 
                Csr::Equivalent | Csr::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    feed_branch(&mut branch, &predicate);
 
                    finished.insert(predicate, branch);
 
                }
 
                Csr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    finished.insert(predicate, branch);
 
                }
 
                Csr::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate
 
                    let mut branch2 = branch.clone();
 
                    // original branch untouched
 
                    finished.insert(predicate, branch);
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    finished.insert(predicate2, branch2);
 
                }
 
                Csr::New(new_predicate) => {
 
                    // fork branch, give fork the message and the new predicate
 
                    let mut branch2 = branch.clone();
 
                    // original branch untouched
 
                    finished.insert(predicate, branch);
 
                    feed_branch(&mut branch2, &new_predicate);
 
                    finished.insert(new_predicate, branch2);
 
                }
 
            }
 
        }
 
        println!("AFTER {:#?}", &self.branches);
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> (usize, HashMap<PortId, Payload>) {
 
        for (branch_predicate, branch) in self.branches {
 
            if branch_predicate.satisfies(solution_predicate) {
 
@@ -437,6 +683,16 @@ impl BranchingNative {
 
    }
 
}
 
impl BranchingProtoComponent {
 
    fn feed_msg(
 
        &mut self,
 
        _logger: &mut dyn Logger,
 
        _port_info: &PortInfo,
 
        _solution_storage: &mut SolutionStorage,
 
        _getter: PortId,
 
        _send_payload_msg: SendPayloadMsg,
 
    ) {
 
        todo!()
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
        for (branch_predicate, branch) in branches {
src/runtime/error.rs
Show inline comments
 
@@ -33,3 +33,8 @@ pub enum GottenError {
 
    PortDidntGet,
 
    PreviousSyncFailed,
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum NextBatchError {
 
    NotConnected,
 
}
src/runtime/mod.rs
Show inline comments
 
@@ -8,6 +8,10 @@ mod my_tests;
 
use crate::common::*;
 
use error::*;
 

	
 
#[derive(
 
    Debug, Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub struct FiringVar(PortId);
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
 
pub enum LocalComponentId {
 
    Native,
 
@@ -48,13 +52,12 @@ pub struct CommMsg {
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub enum CommMsgContents {
 
    SendPayload(SendPayloadMsg),
 
    Elaborate { partial_oracle: Predicate }, // SINKWARD
 
    Failure,                                 // SINKWARD
 
    Suggest { suggestion: Decision }, // SINKWARD
 
    Announce { decision: Decision },  // SINKAWAYS
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct SendPayloadMsg {
 
    payload_predicate: Predicate,
 
    predicate: Predicate,
 
    payload: Payload,
 
}
 
#[derive(Debug, PartialEq)]
 
@@ -86,7 +89,7 @@ pub struct EndpointSetup {
 
#[derive(Debug)]
 
pub struct EndpointExt {
 
    endpoint: Endpoint,
 
    inp_for_emerging_msgs: PortId,
 
    getter_for_incoming: PortId,
 
}
 
#[derive(Debug)]
 
pub struct Neighborhood {
 
@@ -151,7 +154,7 @@ pub enum ConnectorPhased {
 
pub struct StringLogger(ControllerId, String);
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub struct Predicate {
 
    pub assigned: BTreeMap<PortId, bool>,
 
    pub assigned: BTreeMap<FiringVar, bool>,
 
}
 
#[derive(Debug, Default)]
 
pub struct NativeBatch {
 
@@ -175,6 +178,7 @@ pub struct SyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    predicate: &'a Predicate,
 
    proto_component_id: ProtoComponentId,
 
    port_info: &'a PortInfo,
 
    inbox: &'a HashMap<PortId, Payload>,
 
}
 

	
 
@@ -228,11 +232,11 @@ pub struct SyncProtoContext<'a> {
 

	
 
////////////////
 
impl PortInfo {
 
    fn firing_var_for(&self, port: PortId) -> PortId {
 
        match self.polarities.get(&port).unwrap() {
 
    fn firing_var_for(&self, port: PortId) -> FiringVar {
 
        FiringVar(match self.polarities.get(&port).unwrap() {
 
            Getter => port,
 
            Putter => *self.peers.get(&port).unwrap(),
 
        }
 
        })
 
    }
 
}
 
impl IdManager {
 
@@ -423,10 +427,9 @@ impl Connector {
 
//         f.pad("]")
 
//     }
 
// }
 

	
 
impl Predicate {
 
    #[inline]
 
    pub fn inserted(mut self, k: PortId, v: bool) -> Self {
 
    pub fn inserted(mut self, k: FiringVar, v: bool) -> Self {
 
        self.assigned.insert(k, v);
 
        self
 
    }
 
@@ -460,7 +463,7 @@ impl Predicate {
 
    /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively.
 
    /// otherwise New(N) is returned.
 
    pub fn common_satisfier(&self, other: &Self) -> CommonSatResult {
 
        use CommonSatResult::*;
 
        use CommonSatResult as Csr;
 
        // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
 
        let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
 
        let [mut s, mut o] = [s_it.next(), o_it.next()];
 
@@ -491,7 +494,7 @@ impl Predicate {
 
                    } else if sb != ob {
 
                        assert_eq!(sid, oid);
 
                        // both predicates assign the variable but differ on the value
 
                        return Nonexistant;
 
                        return Csr::Nonexistant;
 
                    } else {
 
                        // both predicates assign the variable to the same value
 
                        s = s_it.next();
 
@@ -502,9 +505,9 @@ impl Predicate {
 
        }
 
        // Observed zero inconsistencies. A unified predicate exists...
 
        match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
            [true, true] => Equivalent,       // ... equivalent to both.
 
            [false, true] => FormerNotLatter, // ... equivalent to self.
 
            [true, false] => LatterNotFormer, // ... equivalent to other.
 
            [true, true] => Csr::Equivalent,       // ... equivalent to both.
 
            [false, true] => Csr::FormerNotLatter, // ... equivalent to self.
 
            [true, false] => Csr::LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
@@ -512,22 +515,22 @@ impl Predicate {
 
                for (&id, &b) in o_not_s {
 
                    new.assigned.insert(id, b);
 
                }
 
                New(new)
 
                Csr::New(new)
 
            }
 
        }
 
    }
 

	
 
    pub fn iter_matching(&self, value: bool) -> impl Iterator<Item = PortId> + '_ {
 
        self.assigned
 
            .iter()
 
            .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None })
 
    }
 
    // pub fn iter_matching(&self, value: bool) -> impl Iterator<Item = FiringVar> + '_ {
 
    //     self.assigned
 
    //         .iter()
 
    //         .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None })
 
    // }
 

	
 
    pub fn batch_assign_nones(&mut self, channel_ids: impl Iterator<Item = PortId>, value: bool) {
 
        for channel_id in channel_ids {
 
            self.assigned.entry(channel_id).or_insert(value);
 
        }
 
    }
 
    // pub fn batch_assign_nones(&mut self, channel_ids: impl Iterator<Item = PortId>, value: bool) {
 
    //     for channel_id in channel_ids {
 
    //         self.assigned.entry(channel_id).or_insert(value);
 
    //     }
 
    // }
 
    // pub fn replace_assignment(&mut self, channel_id: PortId, value: bool) -> Option<bool> {
 
    //     self.assigned.insert(channel_id, value)
 
    // }
 
@@ -541,8 +544,8 @@ impl Predicate {
 
        }
 
        Some(res)
 
    }
 
    pub fn query(&self, x: PortId) -> Option<bool> {
 
        self.assigned.get(&x).copied()
 
    pub fn query(&self, var: FiringVar) -> Option<bool> {
 
        self.assigned.get(&var).copied()
 
    }
 
}
 
impl Debug for Predicate {
src/runtime/my_tests.rs
Show inline comments
 
@@ -145,3 +145,67 @@ fn connected_gotten_err_ungotten() {
 
    c.sync(Duration::from_secs(1)).unwrap();
 
    assert_eq!(reowolf::error::GottenError::PortDidntGet, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn native_polarity_checks() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    // fail...
 
    c.get(o).unwrap_err();
 
    c.put(i, (b"hi" as &[_]).into()).unwrap_err();
 
    // succeed..
 
    c.get(i).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
}
 

	
 
#[test]
 
fn native_multiple_gets() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.get(i).unwrap();
 
    c.get(i).unwrap_err();
 
}
 

	
 
#[test]
 
fn next_batch() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    c.next_batch().unwrap_err();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
}
 

	
 
#[test]
 
fn native_sync() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.get(i).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
    c.sync(Duration::from_secs(1)).unwrap();
 
}
 

	
 
#[test]
 
fn native_message_pass() {
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
            let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(Duration::from_secs(1)).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1);
 
            let p = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.put(p, (b"hello" as &[_]).into()).unwrap();
 
            c.sync(Duration::from_secs(1)).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
src/runtime/setup2.rs
Show inline comments
 
@@ -96,7 +96,7 @@ impl Connector {
 
                Err(())
 
            }
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                log!(self.logger, "~~~ CONNECT called with timeout {:?}", timeout);
 
                log!(self.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = Instant::now() + timeout;
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
@@ -266,12 +266,12 @@ fn new_endpoint_manager(
 
    }
 
    let endpoint_exts = todos
 
        .into_iter()
 
        .map(|Todo { todo_endpoint, recv_peer_port, .. }| EndpointExt {
 
        .map(|Todo { todo_endpoint, local_port, .. }| EndpointExt {
 
            endpoint: match todo_endpoint {
 
                TodoEndpoint::Endpoint(endpoint) => endpoint,
 
                TodoEndpoint::Listener(..) => unreachable!(),
 
            },
 
            inp_for_emerging_msgs: recv_peer_port.unwrap(),
 
            getter_for_incoming: local_port,
 
        })
 
        .collect();
 
    Ok(EndpointManager {
0 comments (0 inline, 0 general)