Changeset - b20c3a55156d
Christopher Esterhuyse - 2020-06-24 15:44:18
christopher.esterhuyse@gmail.com
dummy scatter/gather wave in
6 files changed with 222 insertions and 78 deletions:
src/lib.rs
 
#[macro_use]
 
mod macros;
 

	
 
mod common;
 
mod protocol;
 
mod runtime;
 

	
 
// #[cfg(test)]
 
// mod test;
 

	
 
pub use common::{ConnectorId, EndpointPolarity, Polarity, PortId};
 
pub use protocol::ProtocolDescription;
 
pub use runtime::{error, Connector, FileLogger, VecLogger};
 
pub use runtime::{error, Connector, DummyLogger, FileLogger, VecLogger};
 

	
 
// #[cfg(feature = "ffi")]
 
// pub use runtime::ffi;
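
For orientation, a minimal sketch of the exported API above, modeled on the constructor calls in src/runtime/tests.rs further down (the empty protocol source b"" is exactly what that test file parses; the id and socket count are arbitrary illustration values):

use reowolf::{Connector, DummyLogger, ProtocolDescription};
use std::sync::Arc;

fn main() {
    // An empty protocol description parses successfully, as in the test suite.
    let proto = Arc::new(ProtocolDescription::parse(b"").unwrap());
    // DummyLogger (newly re-exported here) discards all log output.
    let connector_id = 0; // arbitrary id, for illustration
    let surplus_sockets = 0;
    let _c = Connector::new(Box::new(DummyLogger), proto, connector_id, surplus_sockets);
}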
src/runtime/communication.rs
 
@@ -16,48 +16,51 @@ struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as Route -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<Route, usize>,
 
}
 
#[derive(Debug)]
 
struct BranchingProtoComponent {
 
    ports: HashSet<PortId>,
 
    branches: HashMap<Predicate, ProtoComponentBranch>,
 
}
 
#[derive(Debug, Clone)]
 
struct ProtoComponentBranch {
 
    inbox: HashMap<PortId, Payload>,
 
    state: ComponentState,
 
}
 
struct CyclicDrainer<'a, K: Eq + Hash, V> {
 
    input: &'a mut HashMap<K, V>,
 
    inner: CyclicDrainInner<'a, K, V>,
 
}
 
struct CyclicDrainInner<'a, K: Eq + Hash, V> {
 
    swap: &'a mut HashMap<K, V>,
 
    output: &'a mut HashMap<K, V>,
 
}
 
trait PayloadMsgSender {
 
    fn send(&mut self, port_info: &PortInfo, putter: &PortId, msg: SendPayloadMsg);
 
}
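
CyclicDrainer and CyclicDrainInner above implement a work-list: input holds branches still to process, swap collects branches that must be revisited in the next pass, and output collects finished ones. A simplified, self-contained model of that loop (the real cylic_drain method is defined elsewhere in the crate; this callback shape is inferred from its call sites below):

use std::collections::HashMap;
use std::hash::Hash;

// Drain `input`; the callback routes each entry either back into `swap`
// (process again next pass) or into `output` (done). Passes repeat until
// no work is re-queued.
fn cyclic_drain<K: Eq + Hash, V>(
    input: &mut HashMap<K, V>,
    swap: &mut HashMap<K, V>,
    output: &mut HashMap<K, V>,
    mut f: impl FnMut(K, V, &mut HashMap<K, V>, &mut HashMap<K, V>),
) {
    while !input.is_empty() {
        for (k, v) in input.drain() {
            f(k, v, swap, output);
        }
        std::mem::swap(input, swap); // re-queued work becomes next pass's input
    }
}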
 

	
 
////////////////
 
impl Connector {
 
    pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError::*;
 
        let Self { phased, .. } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NoPreviousRound),
 
            ConnectorPhased::Communication(ConnectorCommunication { round_result, .. }) => {
 
                match round_result {
 
                    Err(_) => Err(PreviousSyncFailed),
 
                    Ok(None) => Err(NoPreviousRound),
 
                    Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(PortDidntGet),
 
                }
 
            }
 
        }
 
    }
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        let Self { unphased, phased } = self;
 
        if !unphased.native_ports.contains(&port) {
 
            return Err(PortUnavailable);
 
        }
 
        if Putter != *unphased.port_info.polarities.get(&port).unwrap() {
 
@@ -101,64 +104,65 @@ impl Connector {
 
            ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => {
 
                let batch = native_batches.last_mut().unwrap();
 
                if !batch.to_get.insert(port) {
 
                    return Err(MultipleOpsOnPort);
 
                }
 
                Ok(())
 
            }
 
        }
 
    }
 
    // entrypoint for caller. overwrites round result enum, and returns what happened
 
    pub fn sync(&mut self, timeout: Option<Duration>) -> Result<usize, SyncError> {
 
        let Self { unphased, phased } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                comm.round_result = Self::connected_sync(unphased, comm, timeout);
 
                match &comm.round_result {
 
                    Ok(None) => unreachable!(),
 
                    Ok(Some(ok_result)) => Ok(ok_result.batch_index),
 
                    Err(sync_error) => Err(sync_error.clone()),
 
                }
 
            }
 
        }
 
    }
 

	
 
    // TODO make cu immutable
 
    // private function. mutates state but returns with round
 
    // result ASAP (allows for convenient error return with ?)
 
    fn connected_sync(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        timeout: Option<Duration>,
 
    ) -> Result<Option<RoundOk>, SyncError> {
 
        use SyncError as Se;
 
        let mut deadline = timeout.map(|to| Instant::now() + to);
 
        // 1. run all proto components to Nonsync blockers
 
        log!(
 
            cu.logger,
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
            &timeout,
 
            comm.round_index
 
        );
 

	
 
        // 1. run all proto components to Nonsync blockers
 
        let mut branching_proto_components =
 
            HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
        let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
            cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect();
 
        log!(cu.logger, "Nonsync running {} proto components...", unrun_components.len());
 
        while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
            // TODO coalesce fields
 
            log!(
 
                cu.logger,
 
                "Nonsync running proto component with ID {:?}. {} to go after this",
 
                proto_component_id,
 
                unrun_components.len()
 
            );
 
            let mut ctx = NonsyncProtoContext {
 
                logger: &mut *cu.logger,
 
                port_info: &mut cu.port_info,
 
                id_manager: &mut cu.id_manager,
 
                proto_component_id,
 
                unrun_components: &mut unrun_components,
 
                proto_component_ports: &mut cu
 
                    .proto_components
 
                    .get_mut(&proto_component_id)
 
                    .unwrap()
 
                    .ports,
 
@@ -241,49 +245,48 @@ impl Connector {
 
                    &predicate
 
                );
 
                solution_storage.submit_and_digest_subtree_solution(
 
                    &mut *cu.logger,
 
                    Route::LocalComponent(LocalComponentId::Native),
 
                    predicate.clone(),
 
                );
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if let Some(existing) = branching_native.branches.insert(predicate, branch) {
 
                // TODO
 
                return Err(Se::IndistinguishableBatches([index, existing.index]));
 
            }
 
        }
 
        log!(cu.logger, "Done translating native batches into branches");
 
        comm.native_batches.push(Default::default());
 

	
 
        // run all proto components to their sync blocker
 
        log!(
 
            cu.logger,
 
            "Running all {} proto components to their sync blocker...",
 
            branching_proto_components.len()
 
        );
 
        for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
            let ConnectorUnphased { port_info, proto_description, .. } = cu;
 
            let BranchingProtoComponent { ports, branches } = proto_component;
 
            let mut swap = HashMap::default();
 
            let mut blocked = HashMap::default();
 
            // drain from branches --> blocked
 
            let cd = CyclicDrainer::new(branches, &mut swap, &mut blocked);
 
            BranchingProtoComponent::drain_branches_to_blocked(
 
                cd,
 
                cu,
 
                &mut solution_storage,
 
                &mut payloads_to_get,
 
                proto_component_id,
 
                ports,
 
            );
 
            // swap the blocked branches back
 
            std::mem::swap(&mut blocked, branches);
 
        }
 
        log!(cu.logger, "All proto components are blocked");
 

	
 
        log!(cu.logger, "Entering decision loop...");
 
        comm.endpoint_manager.undelay_all();
 
        let decision = 'undecided: loop {
 
            // drain payloads_to_get, sending them through endpoints / feeding them to components
 
            while let Some((getter, send_payload_msg)) = payloads_to_get.pop() {
 
                assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
@@ -494,49 +497,48 @@ impl Connector {
 
        for &child in comm.neighborhood.children.iter() {
 
            comm.endpoint_manager.send_to(child, &msg).unwrap();
 
        }
 

	
 
        match decision {
 
            Decision::Failure => Err(Se::RoundFailure),
 
            Decision::Success(predicate) => {
 
                // commit changes to component states
 
                cu.proto_components.clear();
 
                cu.proto_components.extend(
 
                    branching_proto_components
 
                        .into_iter()
 
                        .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))),
 
                );
 
                Ok(Some(branching_native.collapse_with(&predicate)))
 
            }
 
        }
 
    }
 
}
 
impl BranchingNative {
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        // payloads_to_get: &mut Vec<(PortId, CommMsgContents)>,
 
        getter: PortId,
 
        send_payload_msg: SendPayloadMsg,
 
    ) {
 
        log!(cu.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
        assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
        let mut draining = HashMap::default();
 
        let finished = &mut self.branches;
 
        std::mem::swap(&mut draining, finished);
 
        for (predicate, mut branch) in draining.drain() {
 
            log!(cu.logger, "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            // check if this branch expects to receive it
 
            let var = cu.port_info.firing_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                branch.to_get.remove(&getter);
 
                if branch.to_get.is_empty() {
 
                    let route = Route::LocalComponent(LocalComponentId::Native);
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
                        route,
 
                        predicate.clone(),
 
                    );
 
                }
 
@@ -596,49 +598,49 @@ impl BranchingNative {
 
                }
 
            }
 
        }
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> RoundOk {
 
        for (branch_predicate, branch) in self.branches {
 
            if solution_predicate.satisfies(&branch_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                return RoundOk { batch_index: index, gotten };
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 

	
 
// |putter, m| {
 
//     let getter = *cu.port_info.peers.get(&putter).unwrap();
 
//     payloads_to_get.push((getter, m));
 
// },
 
impl BranchingProtoComponent {
 
    fn drain_branches_to_blocked(
 
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        payloads_to_get: &mut Vec<(PortId, SendPayloadMsg)>,
 
        payload_msg_sender: &mut impl PayloadMsgSender,
 
        proto_component_id: ProtoComponentId,
 
        ports: &HashSet<PortId>,
 
    ) {
 
        cd.cylic_drain(|mut predicate, mut branch, mut drainer| {
 
            let mut ctx = SyncProtoContext {
 
                    logger: &mut *cu.logger,
 
                    predicate: &predicate,
 
                    port_info: &cu.port_info,
 
                    inbox: &branch.inbox,
 
                };
 
                let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description);
 
                log!(
 
                    cu.logger,
 
                    "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                    proto_component_id,
 
                    &predicate,
 
                    &blocker,
 
                );
 
                use SyncBlocker as B;
 
                match blocker {
 
                    B::Inconsistent => {
 
                        // branch is inconsistent. throw it away
 
                        drop((predicate, branch));
 
                    }
 
@@ -662,63 +664,62 @@ impl BranchingProtoComponent {
 
                        assert!(!branch.inbox.contains_key(&port));
 
                        drainer.add_output(predicate, branch);
 
                    }
 
                    B::CouldntCheckFiring(port) => {
 
                        // sanity check
 
                        let var = cu.port_info.firing_var_for(port);
 
                        assert!(predicate.query(var).is_none());
 
                        // keep forks in "unblocked"
 
                        drainer.add_input(predicate.clone().inserted(var, false), branch.clone());
 
                        drainer.add_input(predicate.inserted(var, true), branch);
 
                    }
 
                    B::PutMsg(putter, payload) => {
 
                        // sanity check
 
                        assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter));
 
                        // overwrite assignment
 
                        let var = cu.port_info.firing_var_for(putter);
 
                        let was = predicate.assigned.insert(var, true);
 
                        if was == Some(false) {
 
                            log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
 
                            // discard forever
 
                            drop((predicate, branch));
 
                        } else {
 
                            // keep in "unblocked"
 
                            log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                            let getter = *cu.port_info.peers.get(&putter).unwrap();
 
                            let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
    payloads_to_get.push((getter, msg));
 
                            payload_msg_sender.send(&cu.port_info, &putter, msg);
 
                            drainer.add_input(predicate, branch);
 
                        }
 
                    }
 
                }
 
        });
 
    }
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        proto_component_id: ProtoComponentId,
 
        payloads_to_get: &mut Vec<(PortId, SendPayloadMsg)>,
 
        payload_msg_sender: &mut impl PayloadMsgSender,
 
        getter: PortId,
 
        send_payload_msg: SendPayloadMsg,
 
    ) {
 
        let logger = &mut *cu.logger;
 
        log!(
 
            logger,
 
            "feeding proto component {:?} getter {:?} {:?}",
 
            proto_component_id,
 
            getter,
 
            &send_payload_msg
 
        );
 
        let BranchingProtoComponent { branches, ports } = self;
 
        let mut unblocked = HashMap::default();
 
        let mut blocked = HashMap::default();
 
        // partition drain from branches -> {unblocked, blocked}
 
        log!(logger, "visiting {} blocked branches...", branches.len());
 
        for (predicate, mut branch) in branches.drain() {
 
            use CommonSatResult as Csr;
 
            log!(logger, "visiting branch with pred {:?}", &predicate);
 
            match predicate.common_satisfier(&send_payload_msg.predicate) {
 
                Csr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(logger, "skipping branch");
 
                    blocked.insert(predicate, branch);
 
@@ -735,49 +736,49 @@ impl BranchingProtoComponent {
 
                    let mut branch2 = branch.clone();
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    blocked.insert(predicate, branch);
 
                    unblocked.insert(predicate2, branch2);
 
                }
 
                Csr::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    log!(logger, "Forking this branch with new predicate {:?}", &predicate2);
 
                    let mut branch2 = branch.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    blocked.insert(predicate, branch);
 
                    unblocked.insert(predicate2, branch2);
 
                }
 
            }
 
        }
 
        log!(logger, "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len());
 
        // drain from unblocked --> blocked
 
        let mut swap = HashMap::default();
 
        let cd = CyclicDrainer::new(&mut unblocked, &mut swap, &mut blocked);
 
        BranchingProtoComponent::drain_branches_to_blocked(
 
            cd,
 
            cu,
 
            solution_storage,
 
            payloads_to_get,
 
            payload_msg_sender,
 
            proto_component_id,
 
            ports,
 
        );
 
        // swap the blocked branches back
 
        std::mem::swap(&mut blocked, branches);
 
        log!(cu.logger, "component settles down with branches: {:?}", branches.keys());
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch_predicate.satisfies(solution_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return ProtoComponent { state, ports };
 
            }
 
        }
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
 
        let branch = ProtoComponentBranch { inbox: Default::default(), state };
 
        Self { ports, branches: hashmap! { Predicate::default() => branch  } }
 
    }
 
}
 
impl SolutionStorage {
 
    fn new(routes: impl Iterator<Item = Route>) -> Self {
 
@@ -785,49 +786,49 @@ impl SolutionStorage {
 
        let mut subtree_solutions = vec![];
 
        for key in routes {
 
            subtree_id_to_index.insert(key, subtree_solutions.len());
 
            subtree_solutions.push(Default::default())
 
        }
 
        Self {
 
            subtree_solutions,
 
            subtree_id_to_index,
 
            old_local: Default::default(),
 
            new_local: Default::default(),
 
        }
 
    }
 
    fn is_clear(&self) -> bool {
 
        self.subtree_id_to_index.is_empty()
 
            && self.subtree_solutions.is_empty()
 
            && self.old_local.is_empty()
 
            && self.new_local.is_empty()
 
    }
 
    fn clear(&mut self) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
    }
 
    pub(crate) fn reset(&mut self, subtree_ids: impl Iterator<Item = Route>) {
 
    fn reset(&mut self, subtree_ids: impl Iterator<Item = Route>) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
        for key in subtree_ids {
 
            self.subtree_id_to_index.insert(key, self.subtree_solutions.len());
 
            self.subtree_solutions.push(Default::default())
 
        }
 
    }
 
    // pub(crate) fn peek_new_locals(&self) -> impl Iterator<Item = &Predicate> + '_ {
 
    //     self.new_local.iter()
 
    // }
 
    pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
        let Self { old_local, new_local, .. } = self;
 
        new_local.drain().map(move |local| {
 
            old_local.insert(local.clone());
 
            local
 
        })
 
    }
 
    pub(crate) fn submit_and_digest_subtree_solution(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        subtree_id: Route,
 
        predicate: Predicate,
 
@@ -859,54 +860,60 @@ impl SolutionStorage {
 
    ) {
 
        if let Some(set) = set_visitor.next() {
 
            // incomplete solution. keep traversing
 
            for pred in set.iter() {
 
                if let Some(elaborated) = pred.union_with(&partial) {
 
                    Self::elaborate_into_new_local_rec(
 
                        logger,
 
                        elaborated,
 
                        set_visitor.clone(),
 
                        old_local,
 
                        new_local,
 
                    )
 
                }
 
            }
 
        } else {
 
            // recursive stop condition. `partial` is a local subtree solution
 
            if !old_local.contains(&partial) {
 
                // ... and it hasn't been found before
 
                log!(logger, "storing NEW LOCAL SOLUTION {:?}", &partial);
 
                new_local.insert(partial);
 
            }
 
        }
 
    }
 
}
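
elaborate_into_new_local_rec above walks the cross product of the per-subtree solution sets, keeping only predicate unions that remain consistent; a union that survives every set is a new local solution. A simplified model over plain BTreeMap "predicates" (union_with here is a stand-in for the real Predicate::union_with, assumed to fail on conflicting assignments):

use std::collections::BTreeMap;

type Pred = BTreeMap<&'static str, bool>;

// Union two assignments; None if they disagree on any variable.
fn union_with(a: &Pred, b: &Pred) -> Option<Pred> {
    let mut out = a.clone();
    for (&k, &v) in b {
        if *out.entry(k).or_insert(v) != v {
            return None; // conflicting assignment: no common solution
        }
    }
    Some(out)
}

// Mirror of the recursion above: one consistent pick from every set.
fn elaborate(mut sets: std::slice::Iter<'_, Vec<Pred>>, partial: Pred, complete: &mut Vec<Pred>) {
    if let Some(set) = sets.next() {
        for pred in set {
            if let Some(elaborated) = union_with(pred, &partial) {
                elaborate(sets.clone(), elaborated, complete);
            }
        }
    } else {
        complete.push(partial); // every set contributed; this union is complete
    }
}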
 
impl PayloadMsgSender for Vec<(PortId, SendPayloadMsg)> {
 
    fn send(&mut self, port_info: &PortInfo, putter: &PortId, msg: SendPayloadMsg) {
 
        let getter = *port_info.peers.get(putter).unwrap();
 
        self.push((getter, msg));
 
    }
 
}
 
impl SyncProtoContext<'_> {
 
    pub fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        let var = self.port_info.firing_var_for(port);
 
        self.predicate.query(var)
 
    }
 
    pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        self.inbox.get(&port)
 
    }
 
}
 
impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> {
 
    fn add_input(&mut self, k: K, v: V) {
 
        self.swap.insert(k, v);
 
    }
 
    fn add_output(&mut self, k: K, v: V) {
 
        self.output.insert(k, v);
 
    }
 
}
 
impl NonsyncProtoContext<'_> {
 
    pub fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
        // called by a PROTO COMPONENT. moves its own ports.
 
        // 1. sanity check: this component owns these ports
 
        log!(
 
            self.logger,
 
            "Component {:?} added new component with state {:?}, moving ports {:?}",
 
            self.proto_component_id,
 
            &state,
 
            &moved_ports
 
        );
 
        assert!(self.proto_component_ports.is_subset(&moved_ports));
 
        // 2. remove ports from old component & update port->route
src/runtime/endpoints.rs
 
use super::*;
 

	
 
struct MonitoredReader<R: Read> {
 
    bytes: usize,
 
    r: R,
 
}
 
#[derive(Debug)]
 
enum TryRecyAnyError {
 
    Timeout,
 
    PollFailed,
 
    EndpointError { error: EndpointError, index: usize },
 
}
 

	
 
/////////////////////
 

	
 
fn would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
 
impl Endpoint {
 
    pub fn try_recv<T: serde::de::DeserializeOwned>(
 
    pub(super) fn try_recv<T: serde::de::DeserializeOwned>(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<T>, EndpointError> {
 
        use EndpointError::*;
 
        // populate inbox as much as possible
 
        'read_loop: loop {
 
            let res = self.stream.read_to_end(&mut self.inbox);
 
            endptlog!(logger, "Stream read to end {:?}", &res);
 
            match res {
 
                Err(e) if would_block(&e) => break 'read_loop,
 
                Ok(0) => break 'read_loop,
 
                Ok(_) => (),
 
                Err(_e) => return Err(BrokenEndpoint),
 
            }
 
        }
 
        let mut monitored = MonitoredReader::from(&self.inbox[..]);
 
        match bincode::deserialize_from(&mut monitored) {
 
            Ok(msg) => {
 
                let msg_size = monitored.bytes_read();
 
                self.inbox.drain(0..(msg_size.try_into().unwrap()));
 
                Ok(Some(msg))
 
            }
 
            Err(e) => match *e {
 
                bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
 
                    Ok(None)
 
                }
 
                _ => Err(MalformedMessage),
 
                // println!("SERDE ERRKIND {:?}", e);
 
                // Err(MalformedMessage)
 
            },
 
        }
 
    }
 
    pub fn send<T: serde::ser::Serialize>(&mut self, msg: &T) -> Result<(), EndpointError> {
 
    pub(super) fn send<T: serde::ser::Serialize>(&mut self, msg: &T) -> Result<(), EndpointError> {
 
        bincode::serialize_into(&mut self.stream, msg).map_err(|_| EndpointError::BrokenEndpoint)
 
    }
 
}
 

	
 
impl EndpointManager {
 
    pub fn index_iter(&self) -> Range<usize> {
 
    pub(super) fn index_iter(&self) -> Range<usize> {
 
        0..self.num_endpoints()
 
    }
 
    pub fn num_endpoints(&self) -> usize {
 
    pub(super) fn num_endpoints(&self) -> usize {
 
        self.endpoint_exts.len()
 
    }
 
    pub fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
 
    pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
 
        let endpoint = &mut self.endpoint_exts[index].endpoint;
 
        endpoint.send(msg).map_err(|err| {
 
            ConnectError::EndpointSetupError(endpoint.stream.local_addr().unwrap(), err)
 
        })
 
    }
 
    pub fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), EndpointError> {
 
    pub(super) fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), EndpointError> {
 
        self.endpoint_exts[index].endpoint.send(msg)
 
    }
 
    pub fn try_recv_any_comms(
 
    pub(super) fn try_recv_any_comms(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: Option<Instant>,
 
    ) -> Result<Option<(usize, Msg)>, SyncError> {
 
        use {SyncError as Se, TryRecyAnyError as Trae};
 
        match self.try_recv_any(logger, deadline) {
 
            Ok(tup) => Ok(Some(tup)),
 
            Err(Trae::Timeout) => Ok(None),
 
            Err(Trae::PollFailed) => Err(Se::PollFailed),
 
            Err(Trae::EndpointError { error: _, index }) => Err(Se::BrokenEndpoint(index)),
 
        }
 
    }
 
    pub fn try_recv_any_setup(
 
    pub(super) fn try_recv_any_setup(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: Option<Instant>,
 
    ) -> Result<(usize, Msg), ConnectError> {
 
        use {ConnectError as Ce, TryRecyAnyError as Trae};
 
        self.try_recv_any(logger, deadline).map_err(|err| match err {
 
            Trae::Timeout => Ce::Timeout,
 
            Trae::PollFailed => Ce::PollFailed,
 
            Trae::EndpointError { error, index } => Ce::EndpointSetupError(
 
                self.endpoint_exts[index].endpoint.stream.local_addr().unwrap(),
 
                error,
 
            ),
 
        })
 
    }
 
    fn try_recv_any(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: Option<Instant>,
 
    ) -> Result<(usize, Msg), TryRecyAnyError> {
 
        use TryRecyAnyError as Trea;
 
        // 1. try messages already buffered
 
        if let Some(x) = self.undelayed_messages.pop() {
 
            endptlog!(logger, "RECV undelayed_msg {:?}", &x);
 
            return Ok(x);
 
        }
 
        loop {
 
            // 2. try read a message from an endpoint that raised an event with poll() but wasn't drained
 
            while let Some(index) = self.polled_undrained.pop() {
 
                let endpoint = &mut self.endpoint_exts[index].endpoint;
 
                if let Some(msg) = endpoint
 
                    .try_recv(logger)
 
                    .map_err(|error| Trea::EndpointError { error, index })?
 
                {
 
                    endptlog!(logger, "RECV polled_undrained {:?}", &msg);
 
                    // if !endpoint.inbox.is_empty() {
 
                    // there may be another message waiting!
 
                    self.polled_undrained.insert(index);
 
                    // }
 
                    if !endpoint.inbox.is_empty() {
 
                        // there may be another message waiting!
 
                        self.polled_undrained.insert(index);
 
                    }
 
                    return Ok((index, msg));
 
                }
 
            }
 
            // 3. No message yet. Do we have enough time to poll?
 
            let remaining = if let Some(deadline) = deadline {
 
                Some(deadline.checked_duration_since(Instant::now()).ok_or(Trea::Timeout)?)
 
            } else {
 
                None
 
            };
 
            self.poll.poll(&mut self.events, remaining).map_err(|_| Trea::PollFailed)?;
 
            for event in self.events.iter() {
 
                let Token(index) = event.token();
 
                self.polled_undrained.insert(index);
 
                endptlog!(
 
                    logger,
 
                    "RECV poll event {:?} for endpoint index {:?}. undrained: {:?}",
 
                    &event,
 
                    index,
 
                    self.polled_undrained.iter()
 
                );
 
                if event.is_error() {
 
                    return Err(Trea::EndpointError {
 
                        error: EndpointError::BrokenEndpoint,
 
                        index,
 
                    });
 
                }
 
            }
 
            self.events.clear();
 
        }
 
    }
 
    pub fn undelay_all(&mut self) {
 
    pub(super) fn undelay_all(&mut self) {
 
        if self.undelayed_messages.is_empty() {
 
            // fast path
 
            std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages);
 
            return;
 
        }
 
        // slow path
 
        self.undelayed_messages.extend(self.delayed_messages.drain(..));
 
    }
 
}
 
impl Debug for Endpoint {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_struct("Endpoint").field("inbox", &self.inbox).finish()
 
    }
 
}
 
impl<R: Read> From<R> for MonitoredReader<R> {
 
    fn from(r: R) -> Self {
 
        Self { r, bytes: 0 }
 
    }
 
}
 
impl<R: Read> MonitoredReader<R> {
 
    pub fn bytes_read(&self) -> usize {
 
    pub(super) fn bytes_read(&self) -> usize {
 
        self.bytes
 
    }
 
}
 
impl<R: Read> Read for MonitoredReader<R> {
 
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
 
        let n = self.r.read(buf)?;
 
        self.bytes += n;
 
        Ok(n)
 
    }
 
}
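
MonitoredReader exists so try_recv (above) can learn how many bytes bincode actually consumed from the inbox: a complete message is drained off the front of the buffer, while an UnexpectedEof from bincode means only a partial message has arrived and the bytes must be kept for the next read. A reduced sketch of that framing step, reusing the MonitoredReader defined above (RecvError is a stand-in for the real error type):

#[derive(Debug)]
enum RecvError {
    Malformed,
}

// Try to pop one bincode-encoded message off the front of `inbox`.
// Ok(None) means the buffer holds only a message prefix so far.
fn pop_msg<T: serde::de::DeserializeOwned>(inbox: &mut Vec<u8>) -> Result<Option<T>, RecvError> {
    let mut monitored = MonitoredReader::from(&inbox[..]);
    match bincode::deserialize_from(&mut monitored) {
        Ok(msg) => {
            let consumed = monitored.bytes_read();
            inbox.drain(0..consumed); // discard exactly the bytes this message used
            Ok(Some(msg))
        }
        Err(e) => match *e {
            bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => Ok(None),
            _ => Err(RecvError::Malformed),
        },
    }
}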
 

	
 
impl Into<Msg> for SetupMsg {
 
    fn into(self) -> Msg {
 
        Msg::SetupMsg(self)
 
    }
 
}
src/runtime/mod.rs
 
@@ -30,49 +30,55 @@ pub enum Route {
 
    LocalComponent(LocalComponentId),
 
    Endpoint { index: usize },
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
}
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub enum Decision {
 
    Failure,
 
    Success(Predicate),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub enum Msg {
 
    SetupMsg(SetupMsg),
 
    CommMsg(CommMsg),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub enum SetupMsg {
 
    MyPortInfo(MyPortInfo),
 
    LeaderWave { wave_leader: ConnectorId },
 
    LeaderAnnounce { tree_leader: ConnectorId },
 
    YouAreMyParent,
 
    SessionGather { unoptimized_map: HashMap<ConnectorId, SessionInfo> },
 
    SessionScatter { optimized_map: HashMap<ConnectorId, SessionInfo> },
 
}
 

	
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct SessionInfo {}
 

	
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct CommMsg {
 
    pub round_index: usize,
 
    pub contents: CommMsgContents,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub enum CommMsgContents {
 
    SendPayload(SendPayloadMsg),
 
    Suggest { suggestion: Decision }, // SINKWARD
 
    Announce { decision: Decision },  // SINKAWAYS
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct SendPayloadMsg {
 
    predicate: Predicate,
 
    payload: Payload,
 
}
 
#[derive(Debug, PartialEq)]
 
pub enum CommonSatResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
    New(Predicate),
 
    Nonexistant,
 
}
 
@@ -171,49 +177,49 @@ pub struct Predicate {
 
    pub assigned: BTreeMap<FiringVar, bool>,
 
}
 
#[derive(Debug, Default)]
 
pub struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
pub struct NonsyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    proto_component_id: ProtoComponentId,
 
    port_info: &'a mut PortInfo,
 
    id_manager: &'a mut IdManager,
 
    proto_component_ports: &'a mut HashSet<PortId>,
 
    unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>,
 
}
 
pub struct SyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    predicate: &'a Predicate,
 
    port_info: &'a PortInfo,
 
    inbox: &'a HashMap<PortId, Payload>,
 
}
 
////////////////
 
impl<T: std::cmp::Ord> VecSet<T> {
 
    fn iter(&self) -> impl Iterator<Item = &T> {
 
    fn iter(&self) -> std::slice::Iter<T> {
 
        self.vec.iter()
 
    }
 
    fn contains(&self, element: &T) -> bool {
 
        self.vec.binary_search(element).is_ok()
 
    }
 
    fn new(mut vec: Vec<T>) -> Self {
 
        vec.sort();
 
        vec.dedup();
 
        Self { vec }
 
    }
 
}
 
impl PortInfo {
 
    fn firing_var_for(&self, port: PortId) -> FiringVar {
 
        FiringVar(match self.polarities.get(&port).unwrap() {
 
            Getter => port,
 
            Putter => *self.peers.get(&port).unwrap(),
 
        })
 
    }
 
}
 
impl IdManager {
 
    fn new(connector_id: ConnectorId) -> Self {
 
        Self {
 
            connector_id,
 
            port_suffix_stream: Default::default(),
src/runtime/setup.rs
 
use crate::common::*;
 
use crate::runtime::*;
 
use std::io::ErrorKind::WouldBlock;
 

	
 
impl Connector {
 
    pub fn new_simple(
 
        proto_description: Arc<ProtocolDescription>,
 
        connector_id: ConnectorId,
 
    ) -> Self {
 
        let logger = Box::new(DummyLogger);
 
        // let logger = Box::new(DummyLogger);
 
        let surplus_sockets = 2;
 
        Self::new(logger, proto_description, connector_id, surplus_sockets)
 
    }
 
    pub fn new(
 
        mut logger: Box<dyn Logger>,
 
        proto_description: Arc<ProtocolDescription>,
 
        connector_id: ConnectorId,
 
        surplus_sockets: u16,
 
    ) -> Self {
 
        log!(&mut *logger, "Created with connector_id {:?}", connector_id);
 
        Self {
 
            unphased: ConnectorUnphased {
 
                proto_description,
 
                proto_components: Default::default(),
 
                logger,
 
                id_manager: IdManager::new(connector_id),
 
                native_ports: Default::default(),
 
                port_info: Default::default(),
 
            },
 
            phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets },
 
        }
 
    }
 
    pub fn new_net_port(
 
        &mut self,
 
        polarity: Polarity,
 
        sock_addr: SocketAddr,
 
        endpoint_polarity: EndpointPolarity,
 
@@ -40,87 +31,88 @@ impl Connector {
 
        let Self { unphased: up, phased } = self;
 
        match phased {
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                let endpoint_setup = EndpointSetup { sock_addr, endpoint_polarity };
 
                let p = up.id_manager.new_port_id();
 
                up.native_ports.insert(p);
 
                // {polarity, route} known. {peer} unknown.
 
                up.port_info.polarities.insert(p, polarity);
 
                up.port_info.routes.insert(p, Route::LocalComponent(LocalComponentId::Native));
 
                log!(
 
                    up.logger,
 
                    "Added net port {:?} with polarity {:?} and endpoint setup {:?} ",
 
                    p,
 
                    polarity,
 
                    &endpoint_setup
 
                );
 
                endpoint_setups.push((p, endpoint_setup));
 
                Ok(p)
 
            }
 
            ConnectorPhased::Communication { .. } => Err(()),
 
        }
 
    }
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError::*;
 
        let Self { unphased: up, phased } = self;
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(up.logger, "Call to connecting in connected state");
 
                log!(cu.logger, "Call to connecting in connected state");
 
                Err(AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                log!(up.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
                    &mut *up.logger,
 
                    &mut *cu.logger,
 
                    endpoint_setups,
 
                    &mut up.port_info,
 
                    &mut cu.port_info,
 
                    deadline,
 
                )?;
 
                log!(
 
                    up.logger,
 
                    cu.logger,
 
                    "Successfully connected {} endpoints",
 
                    endpoint_manager.endpoint_exts.len()
 
                );
 
                // leader election and tree construction
 
                let neighborhood = init_neighborhood(
 
                    up.id_manager.connector_id,
 
                    &mut *up.logger,
 
                    cu.id_manager.connector_id,
 
                    &mut *cu.logger,
 
                    &mut endpoint_manager,
 
                    deadline,
 
                )?;
 
                log!(up.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                log!(up.logger, "connect() finished. setup phase complete");
 
                // TODO session optimization goes here
 
                self.phased = ConnectorPhased::Communication(ConnectorCommunication {
 
                log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                let mut comm = ConnectorCommunication {
 
                    round_index: 0,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    mem_inbox: Default::default(),
 
                    native_batches: vec![Default::default()],
 
                    round_result: Ok(None),
 
                });
 
                };
 
                session_optimize(cu, &mut comm, deadline)?;
 
                log!(cu.logger, "connect() finished. setup phase complete");
 
                self.phased = ConnectorPhased::Communication(comm);
 
                Ok(())
 
            }
 
        }
 
    }
 
}
 

	
 
fn new_endpoint_manager(
 
    logger: &mut dyn Logger,
 
    endpoint_setups: &[(PortId, EndpointSetup)],
 
    port_info: &mut PortInfo,
 
    deadline: Option<Instant>,
 
) -> Result<EndpointManager, ConnectError> {
 
    ////////////////////////////////////////////
 
    use ConnectError::*;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
        local_port: PortId,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 
    enum TodoEndpoint {
 
        Listener(TcpListener),
 
        Endpoint(Endpoint),
 
@@ -276,50 +268,50 @@ fn new_endpoint_manager(
 
                        .unwrap();
 
                    endpoint
 
                }
 
                TodoEndpoint::Listener(..) => unreachable!(),
 
            },
 
            getter_for_incoming: local_port,
 
        })
 
        .collect();
 
    Ok(EndpointManager {
 
        poll,
 
        events,
 
        polled_undrained,
 
        undelayed_messages: delayed_messages, // no longer delayed
 
        delayed_messages: Default::default(),
 
        endpoint_exts,
 
    })
 
}
 

	
 
fn init_neighborhood(
 
    connector_id: ConnectorId,
 
    logger: &mut dyn Logger,
 
    em: &mut EndpointManager,
 
    deadline: Option<Instant>,
 
) -> Result<Neighborhood, ConnectError> {
 
    use {ConnectError::*, Msg::SetupMsg as S, SetupMsg::*};
 
    ////////////////////////////////
 
    use {ConnectError::*, Msg::SetupMsg as S, SetupMsg::*};
 
    #[derive(Debug)]
 
    struct WaveState {
 
        parent: Option<usize>,
 
        leader: ConnectorId,
 
    }
 
    fn do_wave(
 
        em: &mut EndpointManager,
 
        awaiting: &mut HashSet<usize>,
 
        ws: &WaveState,
 
    ) -> Result<(), ConnectError> {
 
        awaiting.clear();
 
        let msg = S(LeaderWave { wave_leader: ws.leader });
 
        for index in em.index_iter() {
 
            if Some(index) != ws.parent {
 
                em.send_to_setup(index, &msg)?;
 
                awaiting.insert(index);
 
            }
 
        }
 
        Ok(())
 
    }
 
    ///////////////////////
 
    /*
 
    Conceptually, we have two distinct distributed algorithms back-to-back
 
    1. Leader election using echo algorithm with extinction.
 
@@ -395,89 +387,232 @@ fn init_neighborhood(
 
                                logger,
 
                                "Wave reply from index {:?} for leader {:?}. Now awaiting {} replies",
 
                                recv_index,
 
                                best_wave.leader,
 
                                awaiting.len()
 
                            );
 
                            if awaiting.is_empty() {
 
                                if let Some(parent) = best_wave.parent {
 
                                    log!(
 
                                        logger,
 
                                        "Sub-wave done! replying to parent {:?} msg {:?}",
 
                                        parent,
 
                                        &msg
 
                                    );
 
                                    em.send_to_setup(parent, &msg)?;
 
                                } else {
 
                                    let election_result: WaveState = best_wave;
 
                                    log!(logger, "Election won! Result {:?}", &election_result);
 
                                    break 'election election_result;
 
                                }
 
                            }
 
                        }
 
                    }
 
                }
 
                S(YouAreMyParent) | S(MyPortInfo(_)) => unreachable!(),
 
                comm_msg @ Msg::CommMsg { .. } => {
 
                    log!(logger, "delaying msg {:?} during election algorithm", comm_msg);
 
                    em.delayed_messages.push((recv_index, comm_msg));
 
                msg @ S(YouAreMyParent) | msg @ S(MyPortInfo(_)) => {
 
                    log!(logger, "Endpont {:?} sent unexpected msg! {:?}", recv_index, &msg);
 
                    return Err(SetupAlgMisbehavior);
 
                }
 
                msg @ S(SessionScatter { .. })
 
                | msg @ S(SessionGather { .. })
 
                | msg @ Msg::CommMsg { .. } => {
 
                    log!(logger, "delaying msg {:?} during election algorithm", msg);
 
                    em.delayed_messages.push((recv_index, msg));
 
                }
 
            }
 
        }
 
    };
 

	
 
    // starting algorithm 2. Send a message to every neighbor
 
    log!(logger, "Starting tree construction. Step 1: send one msg per neighbor");
 
    awaiting.clear();
 
    for index in em.index_iter() {
 
        if Some(index) == election_result.parent {
 
            em.send_to_setup(index, &S(YouAreMyParent))?;
 
        } else {
 
            awaiting.insert(index);
 
            em.send_to_setup(index, &S(LeaderAnnounce { tree_leader: election_result.leader }))?;
 
        }
 
    }
 
    let mut children = vec![];
 
    em.undelay_all();
 
    while !awaiting.is_empty() {
 
        log!(logger, "Tree construction_loop loop. awaiting {:?}...", awaiting.iter());
 
        let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?;
 
        log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
        match msg {
 
            S(LeaderWave { .. }) => { /* old message */ }
 
            S(LeaderAnnounce { .. }) => {
 
                // not a child
 
                log!(
 
                    logger,
 
                    "Got reply from non-child index {:?}. Children: {:?}",
 
                    recv_index,
 
                    children.iter()
 
                );
 
                if !awaiting.remove(&recv_index) {
 
                    return Err(SetupAlgMisbehavior);
 
                }
 
            }
 
            S(YouAreMyParent) => {
 
                if !awaiting.remove(&recv_index) {
 
                    log!(
 
                        logger,
 
                        "Got reply from child index {:?}. Children before... {:?}",
 
                        recv_index,
 
                        children.iter()
 
                    );
 
                    return Err(SetupAlgMisbehavior);
 
                }
 
                children.push(recv_index);
 
            }
 
            S(MyPortInfo(_)) => unreachable!(),
 
            comm_msg @ Msg::CommMsg { .. } => {
 
                log!(logger, "delaying msg {:?} during election algorithm", comm_msg);
 
                em.delayed_messages.push((recv_index, comm_msg));
 
            msg @ S(MyPortInfo(_)) | msg @ S(LeaderWave { .. }) => {
 
                log!(logger, "discarding old message {:?} during election", msg);
 
            }
 
            msg @ S(SessionScatter { .. })
 
            | msg @ S(SessionGather { .. })
 
            | msg @ Msg::CommMsg { .. } => {
 
                log!(logger, "delaying msg {:?} during election", msg);
 
                em.delayed_messages.push((recv_index, msg));
 
            }
 
        }
 
    }
 
    children.shrink_to_fit();
 
    let neighborhood =
 
        Neighborhood { parent: election_result.parent, children: VecSet::new(children) };
 
    log!(logger, "Neighborhood constructed {:?}", &neighborhood);
 
    Ok(neighborhood)
 
}
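
The comment above names phase 1: leader election via the echo algorithm with extinction. Each connector floods a wave claiming itself leader, adopts and re-floods any better wave it hears, and wins once its own wave has echoed back from every neighbor. A compressed sketch of one receive step, reusing WaveState and do_wave from above (the tie-break shown, lower ConnectorId wins, is an assumption; the real comparison sits in the folded part of this hunk):

// One LeaderWave receive step (hypothetical ordering: lower id wins).
fn on_leader_wave(
    em: &mut EndpointManager,
    awaiting: &mut HashSet<usize>,
    best_wave: &mut WaveState,
    recv_index: usize,
    wave_leader: ConnectorId,
) -> Result<(), ConnectError> {
    if wave_leader < best_wave.leader {
        // Better wave: adopt it, remember who told us, and re-flood.
        // The old wave goes extinct simply by never being forwarded again.
        *best_wave = WaveState { parent: Some(recv_index), leader: wave_leader };
        do_wave(em, awaiting, best_wave)?;
    } else if wave_leader == best_wave.leader {
        // Echo of our current wave: one fewer reply to await. When `awaiting`
        // empties, reply to the parent (or win, if we have none), as above.
        awaiting.remove(&recv_index);
    }
    // else: wave from a worse leader; drop it (extinction).
    Ok(())
}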
 

	
 
fn session_optimize(
 
    cu: &mut ConnectorUnphased,
 
    comm: &mut ConnectorCommunication,
 
    deadline: Option<Instant>,
 
) -> Result<(), ConnectError> {
 
    ////////////////////////////////////////
 
    use {ConnectError::*, Msg::SetupMsg as S, SetupMsg::*};
 
    ////////////////////////////////////////
 
    log!(cu.logger, "Beginning session optimization");
 
    // populate session_info_map from a message per child
 
    let mut unoptimized_map: HashMap<ConnectorId, SessionInfo> = Default::default();
 
    let mut awaiting: HashSet<usize> = comm.neighborhood.children.iter().copied().collect();
 
    comm.endpoint_manager.undelay_all();
 
    while !awaiting.is_empty() {
 
        log!(
 
            cu.logger,
 
            "Session gather loop. awaiting info from children {:?}...",
 
            awaiting.iter()
 
        );
 
        let (recv_index, msg) =
 
            comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?;
 
        log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
        match msg {
 
            S(SessionGather { unoptimized_map: child_unoptimized_map }) => {
 
                if !awaiting.remove(&recv_index) {
 
                    log!(
 
                        cu.logger,
 
                        "Wasn't expecting session info from {:?}. Got {:?}",
 
                        recv_index,
 
                        &child_unoptimized_map
 
                    );
 
                    return Err(SetupAlgMisbehavior);
 
                }
 
                unoptimized_map.extend(child_unoptimized_map.into_iter());
 
            }
 
            msg @ S(YouAreMyParent)
 
            | msg @ S(MyPortInfo(..))
 
            | msg @ S(LeaderAnnounce { .. })
 
            | msg @ S(LeaderWave { .. }) => {
 
                log!(cu.logger, "discarding old message {:?} during election", msg);
 
            }
 
            msg @ S(SessionScatter { .. }) => {
 
                log!(
 
                    cu.logger,
 
                    "Endpoint {:?} sent unexpected scatter! {:?} I've not contributed yet!",
 
                    recv_index,
 
                    &msg
 
                );
 
                return Err(SetupAlgMisbehavior);
 
            }
 
            msg @ Msg::CommMsg(..) => {
 
                log!(cu.logger, "delaying msg {:?} during session optimization", msg);
 
                comm.endpoint_manager.delayed_messages.push((recv_index, msg));
 
            }
 
        }
 
    }
 
    log!(
 
        cu.logger,
 
        "Gathered all children's maps. ConnectorId set is... {:?}",
 
        unoptimized_map.keys()
 
    );
 
    let my_session_info = SessionInfo {};
 
    unoptimized_map.insert(cu.id_manager.connector_id, my_session_info);
 
    log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map);
 

	
 
    // acquire the optimized info...
 
    let optimized_map = if let Some(parent) = comm.neighborhood.parent {
 
        // ... as a message from my parent
 
        log!(cu.logger, "Forwarding gathered info to parent {:?}", parent);
 
        let msg = S(SessionGather { unoptimized_map });
 
        comm.endpoint_manager.send_to_setup(parent, &msg)?;
 
        'scatter_loop: loop {
 
            log!(
 
                cu.logger,
 
                "Session scatter recv loop. awaiting info from children {:?}...",
 
                awaiting.iter()
 
            );
 
            let (recv_index, msg) =
 
                comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?;
 
            log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
            match msg {
 
                S(SessionScatter { optimized_map }) => {
 
                    if recv_index != parent {
 
                        log!(cu.logger, "I expected the scatter from my parent only!");
 
                        return Err(SetupAlgMisbehavior);
 
                    }
 
                    break 'scatter_loop optimized_map;
 
                }
 
                msg @ Msg::CommMsg { .. } => {
 
                    log!(cu.logger, "delaying msg {:?} during scatter recv", msg);
 
                    comm.endpoint_manager.delayed_messages.push((recv_index, msg));
 
                }
 
                msg @ S(SessionGather { .. })
 
                | msg @ S(YouAreMyParent)
 
                | msg @ S(MyPortInfo(..))
 
                | msg @ S(LeaderAnnounce { .. })
 
                | msg @ S(LeaderWave { .. }) => {
 
                    log!(cu.logger, "discarding old message {:?} during election", msg);
 
                }
 
            }
 
        }
 
    } else {
 
        // by computing it myself
 
        log!(cu.logger, "I am the leader! I will optimize this session");
 
        leader_session_map_optimize(unoptimized_map)?
 
    };
 
    log!(
 
        cu.logger,
 
        "Optimized info map is {:?}. Sending to children {:?}",
 
        &optimized_map,
 
        comm.neighborhood.children.iter()
 
    );
 
    let optimized_info =
 
        optimized_map.get(&cu.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone();
 
    let msg = S(SessionScatter { optimized_map });
 
    for &child in comm.neighborhood.children.iter() {
 
        comm.endpoint_manager.send_to_setup(child, &msg)?;
 
    }
 
    apply_optimizations(cu, comm, optimized_info)?;
 
    log!(cu.logger, "Session optimization complete");
 
    Ok(())
 
}
 
fn leader_session_map_optimize(
 
    unoptimized_map: HashMap<ConnectorId, SessionInfo>,
 
) -> Result<HashMap<ConnectorId, SessionInfo>, ConnectError> {
 
    Ok(unoptimized_map)
 
}
 
fn apply_optimizations(
 
    _cu: &mut ConnectorUnphased,
 
    _comm: &mut ConnectorCommunication,
 
    _session_info: SessionInfo,
 
) -> Result<(), ConnectError> {
 
    Ok(())
 
}
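
session_optimize above is the "dummy scatter/gather wave" named in the commit message. A non-normative outline of its control flow:

// 1. gather:  recv SessionGather { unoptimized_map } from each child; merge the maps
// 2. upward:  non-root: send the merged map to the parent, await its SessionScatter
//             root:     optimized_map = leader_session_map_optimize(map)  // identity, for now
// 3. scatter: send SessionScatter { optimized_map } to every child
// 4. apply:   apply_optimizations(cu, comm, my_entry)  // currently a no-op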
src/runtime/tests.rs
 
@@ -13,49 +13,49 @@ fn next_test_addr() -> SocketAddr {
 
        net::{Ipv4Addr, SocketAddrV4},
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(5_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()
 
}
 

	
 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<ProtocolDescription> =
 
        { Arc::new(reowolf::ProtocolDescription::parse(b"").unwrap()) };
 
}
 
fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector {
 
    let _ = std::fs::create_dir(dir_path); // we will check failure soon
 
    let path = dir_path.join(format!("cid_{:?}.txt", connector_id));
 
    let file = File::create(path).unwrap();
 
    let file_logger = Box::new(FileLogger::new(connector_id, file));
 
    Connector::new(file_logger, MINIMAL_PROTO.clone(), connector_id, 8)
 
}
 

	
 
//////////////////////////////////////////
 

	
 
#[test]
 
fn basic_connector() {
 
    Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    Connector::new(Box::new(DummyLogger), MINIMAL_PROTO.clone(), 0, 0);
 
}
 

	
 
#[test]
 
fn basic_logged_connector() {
 
    let test_log_path = Path::new("./logs/basic_logged_connector");
 
    file_logged_connector(0, test_log_path);
 
}
 

	
 
#[test]
 
fn new_port_pair() {
 
    let test_log_path = Path::new("./logs/new_port_pair");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, _] = c.new_port_pair();
 
    let [_, _] = c.new_port_pair();
 
}
 

	
 
#[test]
 
fn new_sync() {
 
    let test_log_path = Path::new("./logs/new_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.add_component(b"sync", &[i, o]).unwrap();
 
}
 

	
 
@@ -65,51 +65,51 @@ fn new_net_port() {
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let sock_addr = next_test_addr();
 
    let _ = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
}
 

	
 
#[test]
 
fn trivial_connect() {
 
    let test_log_path = Path::new("./logs/trivial_connect");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
}
 

	
 
#[test]
 
fn single_node_connect() {
 
    let sock_addr = next_test_addr();
 
    let test_log_path = Path::new("./logs/single_node_connect");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _ = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
}
 

	
 
#[test]
 
fn multithreaded_connect() {
 
fn minimal_net_connect() {
 
    let sock_addr = next_test_addr();
 
    let test_log_path = Path::new("./logs/multithreaded_connect");
 
    let test_log_path = Path::new("./logs/minimal_net_connect");
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let _ = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addr, Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn put_no_sync() {
 
    let test_log_path = Path::new("./logs/put_no_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
}
 

	
 
@@ -257,108 +257,110 @@ fn connector_pair_nondet() {
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.next_batch().unwrap();
 
            c.get(g).unwrap();
 
            assert_eq!(1, c.sync(Some(Duration::from_secs(1))).unwrap());
 
            c.gotten(g).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p = c.new_net_port(Putter, sock_addr, Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p, (b"hello" as &[_]).into()).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn cannot_use_moved_ports() {
 
    let test_log_path = Path::new("./logs/cannot_use_moved_ports"); /*
 
                                                                    native p|-->|g sync
 
                                                                    */
 
    /*
 
    native p|-->|g sync
 
    */
 
    let test_log_path = Path::new("./logs/cannot_use_moved_ports");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p, g] = c.new_port_pair();
 
    c.add_component(b"sync", &[g, p]).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(p, (b"hello" as &[_]).into()).unwrap_err();
 
    c.get(g).unwrap_err();
 
}
 

	
 
#[test]
 
fn sync_sync() {
 
    let test_log_path = Path::new("./logs/sync_sync"); /*
 
                                                       native p0|-->|g0 sync
 
                                                              g1|<--|p1
 
                                                       */
 
    /*
 
    native p0|-->|g0 sync
 
           g1|<--|p1
 
    */
 
    let test_log_path = Path::new("./logs/sync_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"sync", &[g0, p1]).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(p0, (b"hello" as &[_]).into()).unwrap();
 
    c.get(g1).unwrap();
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
    c.gotten(g1).unwrap();
 
}
 

	
 
#[test]
 
fn double_net_connect() {
 
    let test_log_path = Path::new("./logs/double_net_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [_p, _g] = [
 
                c.new_net_port(Putter, sock_addrs[0], Active).unwrap(),
 
                c.new_net_port(Getter, sock_addrs[1], Active).unwrap(),
 
            ];
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let [_g, _p] = [
 
                c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(),
 
                c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(),
 
            ];
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn distributed_msg_bounce() {
 
    let test_log_path = Path::new("./logs/distributed_msg_bounce");
 
    /*
 
    native[0] | sync 0.p|-->|1.p native[1]
 
                     0.g|<--|1.g
 
    */
 
    let test_log_path = Path::new("./logs/distributed_msg_bounce");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            /*
 
            native | sync p|-->
 
                   |      g|<--
 
            */
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p, g] = [
 
                c.new_net_port(Putter, sock_addrs[0], Active).unwrap(),
 
                c.new_net_port(Getter, sock_addrs[1], Active).unwrap(),
 
            ];
 
            c.add_component(b"sync", &[g, p]).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            /*
 
            native p|-->
 
                   g|<--
 
            */
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let [g, p] = [
 
                c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(),