Changeset - 842acacee86d
[Not reviewed]
0 5 0
Christopher Esterhuyse - 5 years ago 2020-06-23 12:03:21
christopher.esterhuyse@gmail.com
fleshing out native <-> native communication phase
5 files changed with 385 insertions and 57 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -2,12 +2,13 @@ use super::*;
 
use crate::common::*;
 

	
 
////////////////
 
struct BranchingNative {
 
    branches: HashMap<Predicate, NativeBranch>,
 
}
 
#[derive(Clone, Debug)]
 
struct NativeBranch {
 
    index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
@@ -74,13 +75,14 @@ impl NonsyncProtoContext<'_> {
 
        );
 
        [o, i]
 
    }
 
}
 
impl SyncProtoContext<'_> {
 
    pub fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        self.predicate.query(port)
 
        let var = self.port_info.firing_var_for(port);
 
        self.predicate.query(var)
 
    }
 
    pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        self.inbox.get(&port)
 
    }
 
}
 

	
 
@@ -113,25 +115,55 @@ impl Connector {
 
                }
 
                batch.to_put.insert(port, payload);
 
                Ok(())
 
            }
 
        }
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, NextBatchError> {
 
        // returns index of new batch
 
        use NextBatchError::*;
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication { native_batches, .. } => {
 
                native_batches.push(Default::default());
 
                Ok(native_batches.len() - 1)
 
            }
 
        }
 
    }
 
    pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        if !self.native_ports.contains(&port) {
 
            return Err(PortUnavailable);
 
        }
 
        if Getter != *self.port_info.polarities.get(&port).unwrap() {
 
            return Err(WrongPolarity);
 
        }
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication { native_batches, .. } => {
 
                let batch = native_batches.last_mut().unwrap();
 
                if !batch.to_get.insert(port) {
 
                    return Err(MultipleOpsOnPort);
 
                }
 
                Ok(())
 
            }
 
        }
 
    }
 
    pub fn sync(&mut self, timeout: Duration) -> Result<usize, SyncError> {
 
        use SyncError::*;
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication {
 
                round_index,
 
                neighborhood,
 
                native_batches,
 
                endpoint_manager,
 
                round_result,
 
                ..
 
            } => {
 
                let _deadline = Instant::now() + timeout;
 
                let deadline = Instant::now() + timeout;
 
                let logger: &mut dyn Logger = &mut *self.logger;
 
                // 1. run all proto components to Nonsync blockers
 
                log!(
 
                    logger,
 
                    "~~~ SYNC called with timeout {:?}; starting round {}",
 
                    &timeout,
 
@@ -186,14 +218,14 @@ impl Connector {
 
                log!(
 
                    logger,
 
                    "All {} proto components are now done with Nonsync phase",
 
                    branching_proto_components.len(),
 
                );
 

	
 
                // NOTE: all msgs in outbox are of form (Putter, Payload)
 
                let mut payload_outbox: Vec<(PortId, SendPayloadMsg)> = vec![];
 
                // NOTE: all msgs in outbox are of form (Getter, Payload)
 
                let mut payloads_to_get: Vec<(PortId, SendPayloadMsg)> = vec![];
 

	
 
                // create the solution storage
 
                let mut solution_storage = {
 
                    let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native));
 
                    let c = self
 
                        .proto_components
 
@@ -215,27 +247,31 @@ impl Connector {
 
                for (index, NativeBatch { to_get, to_put }) in native_batches.drain(..).enumerate()
 
                {
 
                    let predicate = {
 
                        let mut predicate = Predicate::default();
 
                        // assign trues
 
                        for &port in to_get.iter().chain(to_put.keys()) {
 
                            predicate.assigned.insert(port, true);
 
                            let var = self.port_info.firing_var_for(port);
 
                            predicate.assigned.insert(var, true);
 
                        }
 
                        // assign falses
 
                        for &port in self.native_ports.iter() {
 
                            predicate.assigned.entry(port).or_insert(false);
 
                            let var = self.port_info.firing_var_for(port);
 
                            predicate.assigned.entry(var).or_insert(false);
 
                        }
 
                        predicate
 
                    };
 
                    log!(logger, "Native branch {} has pred {:?}", index, &predicate);
 

	
 
                    // put all messages
 
                    for (port, payload) in to_put {
 
                        let msg = SendPayloadMsg { payload_predicate: predicate.clone(), payload };
 
                    for (putter, payload) in to_put {
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        log!(logger, "Native branch {} sending msg {:?}", index, &msg);
 
                        payload_outbox.push((port, msg));
 
                        // rely on invariant: sync batches respect port polarity
 
                        let getter = *self.port_info.peers.get(&putter).unwrap();
 
                        payloads_to_get.push((getter, msg));
 
                    }
 
                    if to_get.is_empty() {
 
                        log!(logger, "Native submitting trivial solution for index {}", index);
 
                        solution_storage.submit_and_digest_subtree_solution(
 
                            logger,
 
                            Route::LocalComponent(LocalComponentId::Native),
 
@@ -273,12 +309,13 @@ impl Connector {
 
                    std::mem::swap(unblocked_from, blocked);
 
                    while !unblocked_from.is_empty() {
 
                        for (mut predicate, mut branch) in unblocked_from.drain() {
 
                            let mut ctx = SyncProtoContext {
 
                                logger,
 
                                predicate: &predicate,
 
                                port_info: &self.port_info,
 
                                proto_component_id: *proto_component_id,
 
                                inbox: &branch.inbox,
 
                            };
 
                            let blocker = branch.state.sync_run(&mut ctx, &self.proto_description);
 
                            log!(
 
                                logger,
 
@@ -293,13 +330,14 @@ impl Connector {
 
                                    // branch is inconsistent. throw it away
 
                                    drop((predicate, branch));
 
                                }
 
                                B::SyncBlockEnd => {
 
                                    // make concrete all variables
 
                                    for &port in proto_component.ports.iter() {
 
                                        predicate.assigned.entry(port).or_insert(false);
 
                                        let var = self.port_info.firing_var_for(port);
 
                                        predicate.assigned.entry(var).or_insert(false);
 
                                    }
 
                                    // submit solution for this component
 
                                    solution_storage.submit_and_digest_subtree_solution(
 
                                        logger,
 
                                        Route::LocalComponent(LocalComponentId::Proto(
 
                                            *proto_component_id,
 
@@ -308,45 +346,50 @@ impl Connector {
 
                                    );
 
                                    // move to "blocked"
 
                                    blocked.insert(predicate, branch);
 
                                }
 
                                B::CouldntReadMsg(port) => {
 
                                    // move to "blocked"
 
                                    assert!(predicate.query(port).is_none());
 
                                    let var = self.port_info.firing_var_for(port);
 
                                    assert!(predicate.query(var).is_none());
 
                                    assert!(!branch.inbox.contains_key(&port));
 
                                    blocked.insert(predicate, branch);
 
                                }
 
                                B::CouldntCheckFiring(port) => {
 
                                    // sanity check
 
                                    assert!(predicate.query(port).is_none());
 
                                    let var = self.port_info.firing_var_for(port);
 
                                    assert!(predicate.query(var).is_none());
 
                                    // keep forks in "unblocked"
 
                                    unblocked_to.insert(
 
                                        predicate.clone().inserted(var, false),
 
                                        branch.clone(),
 
                                    );
 
                                    unblocked_to.insert(predicate.inserted(var, true), branch);
 
                                }
 
                                B::PutMsg(port, payload) => {
 
                                B::PutMsg(putter, payload) => {
 
                                    // sanity check
 
                                    assert_eq!(Some(&Putter), self.port_info.polarities.get(&port));
 
                                    assert_eq!(
 
                                        Some(&Putter),
 
                                        self.port_info.polarities.get(&putter)
 
                                    );
 
                                    // overwrite assignment
 
                                    let var = self.port_info.firing_var_for(port);
 
                                    let var = self.port_info.firing_var_for(putter);
 

	
 
                                    let was = predicate.assigned.insert(var, true);
 
                                    if was == Some(false) {
 
                                        log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, port, var);
 
                                        log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
 
                                        // discard forever
 
                                        drop((predicate, branch));
 
                                    } else {
 
                                        // keep in "unblocked"
 
                                        log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, port, var);
 
                                        payload_outbox.push((
 
                                            port,
 
                                        let getter = *self.port_info.peers.get(&putter).unwrap();
 
                                        log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                                        payloads_to_get.push((
 
                                            getter,
 
                                            SendPayloadMsg {
 
                                                payload_predicate: predicate.clone(),
 
                                                predicate: predicate.clone(),
 
                                                payload,
 
                                            },
 
                                        ));
 
                                        unblocked_to.insert(predicate, branch);
 
                                    }
 
                                }
 
@@ -355,36 +398,173 @@ impl Connector {
 
                        std::mem::swap(unblocked_from, unblocked_to);
 
                    }
 
                }
 
                log!(logger, "All proto components are blocked");
 

	
 
                log!(logger, "Entering decision loop...");
 
                endpoint_manager.undelay_all();
 
                let decision = 'undecided: loop {
 
                    // check if we already have a solution
 
                    // drain payloads_to_get, sending them through endpoints / feeding them to components
 
                    while let Some((getter, send_payload_msg)) = payloads_to_get.pop() {
 
                        assert!(self.port_info.polarities.get(&getter).copied() == Some(Getter));
 
                        match self.port_info.routes.get(&getter).unwrap() {
 
                            Route::Endpoint { index } => {
 
                                let msg = Msg::CommMsg(CommMsg {
 
                                    round_index: *round_index,
 
                                    contents: CommMsgContents::SendPayload(send_payload_msg),
 
                                });
 
                                endpoint_manager.send_to(*index, &msg).unwrap();
 
                            }
 
                            Route::LocalComponent(LocalComponentId::Native) => branching_native
 
                                .feed_msg(
 
                                    logger,
 
                                    &self.port_info,
 
                                    &mut solution_storage,
 
                                    getter,
 
                                    send_payload_msg,
 
                                ),
 
                            Route::LocalComponent(LocalComponentId::Proto(proto_component_id)) => {
 
                                if let Some(branching_component) =
 
                                    branching_proto_components.get_mut(&proto_component_id)
 
                                {
 
                                    branching_component.feed_msg(
 
                                        logger,
 
                                        &self.port_info,
 
                                        &mut solution_storage,
 
                                        getter,
 
                                        send_payload_msg,
 
                                    )
 
                                }
 
                            }
 
                        }
 
                    }
 

	
 
                    // check if we have a solution yet
 
                    log!(logger, "Check if we have any local decisions...");
 
                    for solution in solution_storage.iter_new_local_make_old() {
 
                        log!(logger, "New local decision with solution {:?}...", &solution);
 
                        match neighborhood.parent {
 
                            Some(parent) => {
 
                                log!(logger, "Forwarding to my parent {:?}", parent);
 
                                let suggestion = Decision::Success(solution);
 
                                let msg = Msg::CommMsg(CommMsg {
 
                                    round_index: *round_index,
 
                                    contents: CommMsgContents::Elaborate {
 
                                        partial_oracle: solution,
 
                                    },
 
                                    contents: CommMsgContents::Suggest { suggestion },
 
                                });
 
                                endpoint_manager.send_to(parent, &msg).unwrap();
 
                            }
 
                            None => {
 
                                log!(logger, "No parent. Deciding on solution {:?}", &solution);
 
                                break 'undecided Decision::Success(solution);
 
                            }
 
                        }
 
                    }
 

	
 
                    // TODO send / recv messages
 
                    // stuck! make progress by receiving a msg
 
                    // try recv messages arriving through endpoints
 
                    log!(logger, "No decision yet. Let's recv an endpoint msg...");
 
                    {
 
                        let (endpoint_index, msg) =
 
                            endpoint_manager.try_recv_any(deadline).unwrap();
 
                        log!(logger, "Received from endpoint {} msg {:?}", endpoint_index, &msg);
 
                        let comm_msg_contents = match msg {
 
                            Msg::SetupMsg(..) => {
 
                                log!(logger, "Discarding setup message; that phase is over");
 
                                continue 'undecided;
 
                            }
 
                            Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(round_index) {
 
                                Ordering::Equal => comm_msg.contents,
 
                                Ordering::Less => {
 
                                    log!(
 
                                        logger,
 
                                        "We are in round {}, but msg is for round {}. Discard",
 
                                        comm_msg.round_index,
 
                                        round_index,
 
                                    );
 
                                    drop(comm_msg);
 
                                    continue 'undecided;
 
                                }
 
                                Ordering::Greater => {
 
                                    log!(
 
                                        logger,
 
                                        "We are in round {}, but msg is for round {}. Buffer",
 
                                        comm_msg.round_index,
 
                                        round_index,
 
                                    );
 
                                    endpoint_manager
 
                                        .delayed_messages
 
                                        .push((endpoint_index, Msg::CommMsg(comm_msg)));
 
                                    continue 'undecided;
 
                                }
 
                            },
 
                        };
 
                        match comm_msg_contents {
 
                            CommMsgContents::SendPayload(send_payload_msg) => {
 
                                let getter = endpoint_manager.endpoint_exts[endpoint_index]
 
                                    .getter_for_incoming;
 
                                assert!(self.port_info.polarities.get(&getter) == Some(&Getter));
 
                                log!(
 
                                    logger,
 
                                    "Msg routed to getter port {:?}. Buffer for recv loop",
 
                                    getter,
 
                                );
 
                                payloads_to_get.push((getter, send_payload_msg));
 
                            }
 
                            CommMsgContents::Suggest { suggestion } => {
 
                                // only accept this control msg through a child endpoint
 
                                if neighborhood.children.binary_search(&endpoint_index).is_ok() {
 
                                    match suggestion {
 
                                        Decision::Success(predicate) => {
 
                                            // child solution contributes to local solution
 
                                            log!(
 
                                                logger,
 
                                                "Child provided solution {:?}",
 
                                                &predicate
 
                                            );
 
                                            let route = Route::Endpoint { index: endpoint_index };
 
                                            solution_storage.submit_and_digest_subtree_solution(
 
                                                logger, route, predicate,
 
                                            );
 
                                        }
 
                                        Decision::Failure => match neighborhood.parent {
 
                                            None => {
 
                                                log!(
 
                                                    logger,
 
                                                    "As sink, I decide on my child's failure"
 
                                                );
 
                                                // I am the sink. Decide on failed
 
                                                break 'undecided Decision::Failure;
 
                                            }
 
                                            Some(parent) => {
 
                                                log!(logger, "Forwarding failure through my parent endpoint {:?}", parent);
 
                                                // I've got a parent. Forward the failure suggestion.
 
                                                let msg = Msg::CommMsg(CommMsg {
 
                                                    round_index: *round_index,
 
                                                    contents: CommMsgContents::Suggest {
 
                                                        suggestion,
 
                                                    },
 
                                                });
 
                                                endpoint_manager.send_to(parent, &msg).unwrap();
 
                                            }
 
                                        },
 
                                    }
 
                                } else {
 
                                    log!(logger, "Discarding suggestion {:?} from non-child endpoint idx {:?}", &suggestion, endpoint_index);
 
                                }
 
                            }
 
                            CommMsgContents::Announce { decision } => {
 
                                if Some(endpoint_index) == neighborhood.parent {
 
                                    // adopt this decision
 
                                    break 'undecided decision;
 
                                } else {
 
                                    log!(logger, "Discarding announcement {:?} from non-parent endpoint idx {:?}", &decision, endpoint_index);
 
                                }
 
                            }
 
                        }
 
                    }
 
                    log!(logger, "Endpoint msg recv done");
 
                };
 
                log!(logger, "Committing to decision {:?}!", &decision);
 

	
 
                // propagate the decision to children
 
                let msg = Msg::CommMsg(CommMsg {
 
                    round_index: *round_index,
 
@@ -423,23 +603,99 @@ impl Connector {
 
                returning
 
            }
 
        }
 
    }
 
}
 
impl BranchingNative {
 
    fn feed_msg(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        port_info: &PortInfo,
 
        solution_storage: &mut SolutionStorage,
 
        getter: PortId,
 
        send_payload_msg: SendPayloadMsg,
 
    ) {
 
        assert!(port_info.polarities.get(&getter).copied() == Some(Getter));
 
        println!("BEFORE {:#?}", &self.branches);
 
        let mut draining = HashMap::default();
 
        let finished = &mut self.branches;
 
        std::mem::swap(&mut draining, finished);
 
        for (predicate, mut branch) in draining.drain() {
 
            // check if this branch expects to receive it
 
            let var = port_info.firing_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                branch.to_get.remove(&getter);
 
                if branch.to_get.is_empty() {
 
                    let route = Route::LocalComponent(LocalComponentId::Native);
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        logger,
 
                        route,
 
                        predicate.clone(),
 
                    );
 
                }
 
            };
 
            if predicate.query(var) != Some(true) {
 
                // optimization. Don't bother trying this branch
 
                finished.insert(predicate, branch);
 
                continue;
 
            }
 
            use CommonSatResult as Csr;
 
            match predicate.common_satisfier(&send_payload_msg.predicate) {
 
                Csr::Equivalent | Csr::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    feed_branch(&mut branch, &predicate);
 
                    finished.insert(predicate, branch);
 
                }
 
                Csr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    finished.insert(predicate, branch);
 
                }
 
                Csr::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate
 
                    let mut branch2 = branch.clone();
 
                    // original branch untouched
 
                    finished.insert(predicate, branch);
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    finished.insert(predicate2, branch2);
 
                }
 
                Csr::New(new_predicate) => {
 
                    // fork branch, give fork the message and the new predicate
 
                    let mut branch2 = branch.clone();
 
                    // original branch untouched
 
                    finished.insert(predicate, branch);
 
                    feed_branch(&mut branch2, &new_predicate);
 
                    finished.insert(new_predicate, branch2);
 
                }
 
            }
 
        }
 
        println!("AFTER {:#?}", &self.branches);
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> (usize, HashMap<PortId, Payload>) {
 
        for (branch_predicate, branch) in self.branches {
 
            if branch_predicate.satisfies(solution_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                return (index, gotten);
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
impl BranchingProtoComponent {
 
    fn feed_msg(
 
        &mut self,
 
        _logger: &mut dyn Logger,
 
        _port_info: &PortInfo,
 
        _solution_storage: &mut SolutionStorage,
 
        _getter: PortId,
 
        _send_payload_msg: SendPayloadMsg,
 
    ) {
 
        todo!()
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch_predicate.satisfies(solution_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return ProtoComponent { state, ports };
src/runtime/error.rs
Show inline comments
 
@@ -30,6 +30,11 @@ pub enum PortOpError {
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum GottenError {
 
    NoPreviousRound,
 
    PortDidntGet,
 
    PreviousSyncFailed,
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum NextBatchError {
 
    NotConnected,
 
}
src/runtime/mod.rs
Show inline comments
 
@@ -5,12 +5,16 @@ mod setup2;
 
#[cfg(test)]
 
mod my_tests;
 

	
 
use crate::common::*;
 
use error::*;
 

	
 
#[derive(
 
    Debug, Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub struct FiringVar(PortId);
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
 
pub enum LocalComponentId {
 
    Native,
 
    Proto(ProtoComponentId),
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
 
@@ -45,19 +49,18 @@ pub struct CommMsg {
 
    pub round_index: usize,
 
    pub contents: CommMsgContents,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub enum CommMsgContents {
 
    SendPayload(SendPayloadMsg),
 
    Elaborate { partial_oracle: Predicate }, // SINKWARD
 
    Failure,                                 // SINKWARD
 
    Announce { decision: Decision },         // SINKAWAYS
 
    Suggest { suggestion: Decision }, // SINKWARD
 
    Announce { decision: Decision },  // SINKAWAYS
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct SendPayloadMsg {
 
    payload_predicate: Predicate,
 
    predicate: Predicate,
 
    payload: Payload,
 
}
 
#[derive(Debug, PartialEq)]
 
pub enum CommonSatResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
@@ -83,13 +86,13 @@ pub struct EndpointSetup {
 
    pub sock_addr: SocketAddr,
 
    pub is_active: bool,
 
}
 
#[derive(Debug)]
 
pub struct EndpointExt {
 
    endpoint: Endpoint,
 
    inp_for_emerging_msgs: PortId,
 
    getter_for_incoming: PortId,
 
}
 
#[derive(Debug)]
 
pub struct Neighborhood {
 
    parent: Option<usize>,
 
    children: Vec<usize>, // ordered, deduplicated
 
}
 
@@ -148,13 +151,13 @@ pub enum ConnectorPhased {
 
    },
 
}
 
#[derive(Debug)]
 
pub struct StringLogger(ControllerId, String);
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub struct Predicate {
 
    pub assigned: BTreeMap<PortId, bool>,
 
    pub assigned: BTreeMap<FiringVar, bool>,
 
}
 
#[derive(Debug, Default)]
 
pub struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
@@ -172,12 +175,13 @@ pub struct NonsyncProtoContext<'a> {
 
    unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>,
 
}
 
pub struct SyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    predicate: &'a Predicate,
 
    proto_component_id: ProtoComponentId,
 
    port_info: &'a PortInfo,
 
    inbox: &'a HashMap<PortId, Payload>,
 
}
 

	
 
// pub struct MonoPContext<'a> {
 
//     inner: &'a mut ControllerInner,
 
//     ports: &'a mut HashSet<PortId>,
 
@@ -225,17 +229,17 @@ pub struct SyncProtoContext<'a> {
 
//     gotten: HashMap<PortId, Payload>,
 
//     to_get: HashSet<PortId>,
 
// }
 

	
 
////////////////
 
impl PortInfo {
 
    fn firing_var_for(&self, port: PortId) -> PortId {
 
        match self.polarities.get(&port).unwrap() {
 
    fn firing_var_for(&self, port: PortId) -> FiringVar {
 
        FiringVar(match self.polarities.get(&port).unwrap() {
 
            Getter => port,
 
            Putter => *self.peers.get(&port).unwrap(),
 
        }
 
        })
 
    }
 
}
 
impl IdManager {
 
    fn new(controller_id: ControllerId) -> Self {
 
        Self {
 
            controller_id,
 
@@ -420,16 +424,15 @@ impl Connector {
 
//             let sols = &self.subtree_solutions[index];
 
//             f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?;
 
//         }
 
//         f.pad("]")
 
//     }
 
// }
 

	
 
impl Predicate {
 
    #[inline]
 
    pub fn inserted(mut self, k: PortId, v: bool) -> Self {
 
    pub fn inserted(mut self, k: FiringVar, v: bool) -> Self {
 
        self.assigned.insert(k, v);
 
        self
 
    }
 
    // returns true IFF self.unify would return Equivalent OR FormerNotLatter
 
    pub fn satisfies(&self, other: &Self) -> bool {
 
        let mut s_it = self.assigned.iter();
 
@@ -457,13 +460,13 @@ impl Predicate {
 
    /// such that n.satisfies(self) && n.satisfies(other).
 
    /// If none exists Nonexistant is returned.
 
    /// If the resulting predicate is equivlanet to self, other, or both,
 
    /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively.
 
    /// otherwise New(N) is returned.
 
    pub fn common_satisfier(&self, other: &Self) -> CommonSatResult {
 
        use CommonSatResult::*;
 
        use CommonSatResult as Csr;
 
        // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
 
        let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
 
        let [mut s, mut o] = [s_it.next(), o_it.next()];
 
        // lists of assignments in self but not other and vice versa.
 
        let [mut s_not_o, mut o_not_s] = [vec![], vec![]];
 
        loop {
 
@@ -488,49 +491,49 @@ impl Predicate {
 
                        // s is missing this element
 
                        o_not_s.push((oid, ob));
 
                        o = o_it.next();
 
                    } else if sb != ob {
 
                        assert_eq!(sid, oid);
 
                        // both predicates assign the variable but differ on the value
 
                        return Nonexistant;
 
                        return Csr::Nonexistant;
 
                    } else {
 
                        // both predicates assign the variable to the same value
 
                        s = s_it.next();
 
                        o = o_it.next();
 
                    }
 
                }
 
            }
 
        }
 
        // Observed zero inconsistencies. A unified predicate exists...
 
        match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
            [true, true] => Equivalent,       // ... equivalent to both.
 
            [false, true] => FormerNotLatter, // ... equivalent to self.
 
            [true, false] => LatterNotFormer, // ... equivalent to other.
 
            [true, true] => Csr::Equivalent,       // ... equivalent to both.
 
            [false, true] => Csr::FormerNotLatter, // ... equivalent to self.
 
            [true, false] => Csr::LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
                let mut new = self.clone();
 
                for (&id, &b) in o_not_s {
 
                    new.assigned.insert(id, b);
 
                }
 
                New(new)
 
                Csr::New(new)
 
            }
 
        }
 
    }
 

	
 
    pub fn iter_matching(&self, value: bool) -> impl Iterator<Item = PortId> + '_ {
 
        self.assigned
 
            .iter()
 
            .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None })
 
    }
 
    // pub fn iter_matching(&self, value: bool) -> impl Iterator<Item = FiringVar> + '_ {
 
    //     self.assigned
 
    //         .iter()
 
    //         .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None })
 
    // }
 

	
 
    pub fn batch_assign_nones(&mut self, channel_ids: impl Iterator<Item = PortId>, value: bool) {
 
        for channel_id in channel_ids {
 
            self.assigned.entry(channel_id).or_insert(value);
 
        }
 
    }
 
    // pub fn batch_assign_nones(&mut self, channel_ids: impl Iterator<Item = PortId>, value: bool) {
 
    //     for channel_id in channel_ids {
 
    //         self.assigned.entry(channel_id).or_insert(value);
 
    //     }
 
    // }
 
    // pub fn replace_assignment(&mut self, channel_id: PortId, value: bool) -> Option<bool> {
 
    //     self.assigned.insert(channel_id, value)
 
    // }
 
    pub fn union_with(&self, other: &Self) -> Option<Self> {
 
        let mut res = self.clone();
 
        for (&channel_id, &assignment_1) in other.assigned.iter() {
 
@@ -538,14 +541,14 @@ impl Predicate {
 
                Some(assignment_2) if assignment_1 != assignment_2 => return None,
 
                _ => {}
 
            }
 
        }
 
        Some(res)
 
    }
 
    pub fn query(&self, x: PortId) -> Option<bool> {
 
        self.assigned.get(&x).copied()
 
    pub fn query(&self, var: FiringVar) -> Option<bool> {
 
        self.assigned.get(&var).copied()
 
    }
 
}
 
impl Debug for Predicate {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        struct MySet<'a>(&'a Predicate, bool);
 
        impl Debug for MySet<'_> {
src/runtime/my_tests.rs
Show inline comments
 
@@ -142,6 +142,70 @@ fn connected_gotten_err_ungotten() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.sync(Duration::from_secs(1)).unwrap();
 
    assert_eq!(reowolf::error::GottenError::PortDidntGet, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn native_polarity_checks() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    // fail...
 
    c.get(o).unwrap_err();
 
    c.put(i, (b"hi" as &[_]).into()).unwrap_err();
 
    // succeed..
 
    c.get(i).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
}
 

	
 
#[test]
 
fn native_multiple_gets() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.get(i).unwrap();
 
    c.get(i).unwrap_err();
 
}
 

	
 
#[test]
 
fn next_batch() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    c.next_batch().unwrap_err();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
}
 

	
 
#[test]
 
fn native_sync() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.get(i).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
    c.sync(Duration::from_secs(1)).unwrap();
 
}
 

	
 
#[test]
 
fn native_message_pass() {
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
            let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(Duration::from_secs(1)).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1);
 
            let p = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.put(p, (b"hello" as &[_]).into()).unwrap();
 
            c.sync(Duration::from_secs(1)).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
src/runtime/setup2.rs
Show inline comments
 
@@ -93,13 +93,13 @@ impl Connector {
 
        match &mut self.phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(self.logger, "Call to connecting in connected state");
 
                Err(())
 
            }
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                log!(self.logger, "~~~ CONNECT called with timeout {:?}", timeout);
 
                log!(self.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = Instant::now() + timeout;
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
                    &mut *self.logger,
 
                    endpoint_setups,
 
                    &mut self.port_info,
 
@@ -263,18 +263,18 @@ fn new_endpoint_manager(
 
            }
 
        }
 
        events.clear();
 
    }
 
    let endpoint_exts = todos
 
        .into_iter()
 
        .map(|Todo { todo_endpoint, recv_peer_port, .. }| EndpointExt {
 
        .map(|Todo { todo_endpoint, local_port, .. }| EndpointExt {
 
            endpoint: match todo_endpoint {
 
                TodoEndpoint::Endpoint(endpoint) => endpoint,
 
                TodoEndpoint::Listener(..) => unreachable!(),
 
            },
 
            inp_for_emerging_msgs: recv_peer_port.unwrap(),
 
            getter_for_incoming: local_port,
 
        })
 
        .collect();
 
    Ok(EndpointManager {
 
        poll,
 
        events,
 
        polled_undrained,
0 comments (0 inline, 0 general)