Changeset - 842acacee86d
[Not reviewed]
0 5 0
Christopher Esterhuyse - 5 years ago 2020-06-23 12:03:21
christopher.esterhuyse@gmail.com
fleshing out native <-> native communication phase
5 files changed with 384 insertions and 56 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
use super::*;
 
use crate::common::*;
 

	
 
////////////////
 
struct BranchingNative {
 
    branches: HashMap<Predicate, NativeBranch>,
 
}
 
#[derive(Clone, Debug)]
 
struct NativeBranch {
 
    index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as Route -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<Route, usize>,
 
}
 
struct BranchingProtoComponent {
 
    ports: HashSet<PortId>,
 
    branches: HashMap<Predicate, ProtoComponentBranch>,
 
}
 
#[derive(Clone)]
 
struct ProtoComponentBranch {
 
    inbox: HashMap<PortId, Payload>,
 
    state: ComponentState,
 
}
 

	
 
////////////////
 
impl NonsyncProtoContext<'_> {
 
    pub fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
        // called by a PROTO COMPONENT. moves its own ports.
 
        // 1. sanity check: this component owns these ports
 
        log!(
 
            self.logger,
 
            "Component {:?} added new component with state {:?}, moving ports {:?}",
 
            self.proto_component_id,
 
            &state,
 
            &moved_ports
 
        );
 
        assert!(self.proto_component_ports.is_subset(&moved_ports));
 
        // 2. remove ports from old component & update port->route
 
        let new_id = self.id_manager.new_proto_component_id();
 
        for port in moved_ports.iter() {
 
            self.proto_component_ports.remove(port);
 
            self.port_info
 
                .routes
 
                .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id)));
 
        }
 
        // 3. create a new component
 
        self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports }));
 
    }
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the proto component
 
        let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()];
 
        self.proto_component_ports.insert(o);
 
        self.proto_component_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
        self.port_info.polarities.insert(o, Putter);
 
        self.port_info.polarities.insert(i, Getter);
 
        self.port_info.peers.insert(o, i);
 
        self.port_info.peers.insert(i, o);
 
        let route = Route::LocalComponent(LocalComponentId::Proto(self.proto_component_id));
 
        self.port_info.routes.insert(o, route);
 
        self.port_info.routes.insert(i, route);
 
        log!(
 
            self.logger,
 
            "Component {:?} port pair (out->in) {:?} -> {:?}",
 
            self.proto_component_id,
 
            o,
 
            i
 
        );
 
        [o, i]
 
    }
 
}
 
impl SyncProtoContext<'_> {
 
    pub fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        self.predicate.query(port)
 
        let var = self.port_info.firing_var_for(port);
 
        self.predicate.query(var)
 
    }
 
    pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        self.inbox.get(&port)
 
    }
 
}
 

	
 
impl Connector {
 
    pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError::*;
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { .. } => Err(NoPreviousRound),
 
            ConnectorPhased::Communication { round_result, .. } => match round_result {
 
                Err(_) => Err(PreviousSyncFailed),
 
                Ok(None) => Err(NoPreviousRound),
 
                Ok(Some((_index, gotten))) => gotten.get(&port).ok_or(PortDidntGet),
 
            },
 
        }
 
    }
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        if !self.native_ports.contains(&port) {
 
            return Err(PortUnavailable);
 
        }
 
        if Putter != *self.port_info.polarities.get(&port).unwrap() {
 
            return Err(WrongPolarity);
 
        }
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication { native_batches, .. } => {
 
                let batch = native_batches.last_mut().unwrap();
 
                if batch.to_put.contains_key(&port) {
 
                    return Err(MultipleOpsOnPort);
 
                }
 
                batch.to_put.insert(port, payload);
 
                Ok(())
 
            }
 
        }
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, NextBatchError> {
 
        // returns index of new batch
 
        use NextBatchError::*;
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication { native_batches, .. } => {
 
                native_batches.push(Default::default());
 
                Ok(native_batches.len() - 1)
 
            }
 
        }
 
    }
 
    pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        if !self.native_ports.contains(&port) {
 
            return Err(PortUnavailable);
 
        }
 
        if Getter != *self.port_info.polarities.get(&port).unwrap() {
 
            return Err(WrongPolarity);
 
        }
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication { native_batches, .. } => {
 
                let batch = native_batches.last_mut().unwrap();
 
                if !batch.to_get.insert(port) {
 
                    return Err(MultipleOpsOnPort);
 
                }
 
                Ok(())
 
            }
 
        }
 
    }
 
    pub fn sync(&mut self, timeout: Duration) -> Result<usize, SyncError> {
 
        use SyncError::*;
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication {
 
                round_index,
 
                neighborhood,
 
                native_batches,
 
                endpoint_manager,
 
                round_result,
 
                ..
 
            } => {
 
                let _deadline = Instant::now() + timeout;
 
                let deadline = Instant::now() + timeout;
 
                let logger: &mut dyn Logger = &mut *self.logger;
 
                // 1. run all proto components to Nonsync blockers
 
                log!(
 
                    logger,
 
                    "~~~ SYNC called with timeout {:?}; starting round {}",
 
                    &timeout,
 
                    round_index
 
                );
 
                let mut branching_proto_components =
 
                    HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
                let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
                    self.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect();
 
                log!(logger, "Nonsync running {} proto components...", unrun_components.len());
 
                while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
                    // TODO coalesce fields
 
                    log!(
 
                        logger,
 
                        "Nonsync running proto component with ID {:?}. {} to go after this",
 
                        proto_component_id,
 
                        unrun_components.len()
 
                    );
 
                    let mut ctx = NonsyncProtoContext {
 
                        logger: &mut *logger,
 
                        port_info: &mut self.port_info,
 
                        id_manager: &mut self.id_manager,
 
                        proto_component_id,
 
                        unrun_components: &mut unrun_components,
 
                        proto_component_ports: &mut self
 
                            .proto_components
 
                            .get_mut(&proto_component_id)
 
                            .unwrap()
 
                            .ports,
 
                    };
 
                    let blocker = component.state.nonsync_run(&mut ctx, &self.proto_description);
 
                    log!(
 
                        logger,
 
                        "proto component {:?} ran to nonsync blocker {:?}",
 
                        proto_component_id,
 
                        &blocker
 
                    );
 
                    use NonsyncBlocker as B;
 
                    match blocker {
 
                        B::ComponentExit => drop(component),
 
                        B::Inconsistent => {
 
                            return Err(InconsistentProtoComponent(proto_component_id))
 
                        }
 
                        B::SyncBlockStart => {
 
                            branching_proto_components.insert(
 
                                proto_component_id,
 
                                BranchingProtoComponent::initial(component),
 
                            );
 
                        }
 
                    }
 
                }
 
                log!(
 
                    logger,
 
                    "All {} proto components are now done with Nonsync phase",
 
                    branching_proto_components.len(),
 
                );
 

	
 
                // NOTE: all msgs in outbox are of form (Putter, Payload)
 
                let mut payload_outbox: Vec<(PortId, SendPayloadMsg)> = vec![];
 
                // NOTE: all msgs in outbox are of form (Getter, Payload)
 
                let mut payloads_to_get: Vec<(PortId, SendPayloadMsg)> = vec![];
 

	
 
                // create the solution storage
 
                let mut solution_storage = {
 
                    let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native));
 
                    let c = self
 
                        .proto_components
 
                        .keys()
 
                        .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id)));
 
                    let e = (0..endpoint_manager.endpoint_exts.len())
 
                        .map(|index| Route::Endpoint { index });
 
                    SolutionStorage::new(n.chain(c).chain(e))
 
                };
 
                log!(logger, "Solution storage initialized");
 

	
 
                // 2. kick off the native
 
                log!(
 
                    logger,
 
                    "Translating {} native batches into branches...",
 
                    native_batches.len()
 
                );
 
                let mut branching_native = BranchingNative { branches: Default::default() };
 
                for (index, NativeBatch { to_get, to_put }) in native_batches.drain(..).enumerate()
 
                {
 
                    let predicate = {
 
                        let mut predicate = Predicate::default();
 
                        // assign trues
 
                        for &port in to_get.iter().chain(to_put.keys()) {
 
                            predicate.assigned.insert(port, true);
 
                            let var = self.port_info.firing_var_for(port);
 
                            predicate.assigned.insert(var, true);
 
                        }
 
                        // assign falses
 
                        for &port in self.native_ports.iter() {
 
                            predicate.assigned.entry(port).or_insert(false);
 
                            let var = self.port_info.firing_var_for(port);
 
                            predicate.assigned.entry(var).or_insert(false);
 
                        }
 
                        predicate
 
                    };
 
                    log!(logger, "Native branch {} has pred {:?}", index, &predicate);
 

	
 
                    // put all messages
 
                    for (port, payload) in to_put {
 
                        let msg = SendPayloadMsg { payload_predicate: predicate.clone(), payload };
 
                    for (putter, payload) in to_put {
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        log!(logger, "Native branch {} sending msg {:?}", index, &msg);
 
                        payload_outbox.push((port, msg));
 
                        // rely on invariant: sync batches respect port polarity
 
                        let getter = *self.port_info.peers.get(&putter).unwrap();
 
                        payloads_to_get.push((getter, msg));
 
                    }
 
                    if to_get.is_empty() {
 
                        log!(logger, "Native submitting trivial solution for index {}", index);
 
                        solution_storage.submit_and_digest_subtree_solution(
 
                            logger,
 
                            Route::LocalComponent(LocalComponentId::Native),
 
                            Predicate::default(),
 
                        );
 
                    }
 
                    let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
                    if let Some(existing) = branching_native.branches.insert(predicate, branch) {
 
                        // TODO
 
                        return Err(IndistinguishableBatches([index, existing.index]));
 
                    }
 
                }
 
                log!(logger, "Done translating native batches into branches");
 
                native_batches.push(Default::default());
 

	
 
                // run all proto components to their sync blocker
 
                log!(
 
                    logger,
 
                    "Running all {} proto components to their sync blocker...",
 
                    branching_proto_components.len()
 
                );
 
                for (proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
                    // run this component to sync blocker in-place
 
                    log!(
 
                        logger,
 
                        "Running proto component with id {:?} to blocker...",
 
                        proto_component_id
 
                    );
 
                    let blocked = &mut proto_component.branches;
 
                    let [unblocked_from, unblocked_to] = [
 
                        &mut HashMap::<Predicate, ProtoComponentBranch>::default(),
 
                        &mut Default::default(),
 
                    ];
 
                    // DRAIN-AND-POPULATE PATTERN: DRAINING unblocked into blocked while POPULATING unblocked
 
                    std::mem::swap(unblocked_from, blocked);
 
                    while !unblocked_from.is_empty() {
 
                        for (mut predicate, mut branch) in unblocked_from.drain() {
 
                            let mut ctx = SyncProtoContext {
 
                                logger,
 
                                predicate: &predicate,
 
                                port_info: &self.port_info,
 
                                proto_component_id: *proto_component_id,
 
                                inbox: &branch.inbox,
 
                            };
 
                            let blocker = branch.state.sync_run(&mut ctx, &self.proto_description);
 
                            log!(
 
                                logger,
 
                                "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                                proto_component_id,
 
                                &predicate,
 
                                &blocker,
 
                            );
 
                            use SyncBlocker as B;
 
                            match blocker {
 
                                B::Inconsistent => {
 
                                    // branch is inconsistent. throw it away
 
                                    drop((predicate, branch));
 
                                }
 
                                B::SyncBlockEnd => {
 
                                    // make concrete all variables
 
                                    for &port in proto_component.ports.iter() {
 
                                        predicate.assigned.entry(port).or_insert(false);
 
                                        let var = self.port_info.firing_var_for(port);
 
                                        predicate.assigned.entry(var).or_insert(false);
 
                                    }
 
                                    // submit solution for this component
 
                                    solution_storage.submit_and_digest_subtree_solution(
 
                                        logger,
 
                                        Route::LocalComponent(LocalComponentId::Proto(
 
                                            *proto_component_id,
 
                                        )),
 
                                        predicate.clone(),
 
                                    );
 
                                    // move to "blocked"
 
                                    blocked.insert(predicate, branch);
 
                                }
 
                                B::CouldntReadMsg(port) => {
 
                                    // move to "blocked"
 
                                    assert!(predicate.query(port).is_none());
 
                                    let var = self.port_info.firing_var_for(port);
 
                                    assert!(predicate.query(var).is_none());
 
                                    assert!(!branch.inbox.contains_key(&port));
 
                                    blocked.insert(predicate, branch);
 
                                }
 
                                B::CouldntCheckFiring(port) => {
 
                                    // sanity check
 
                                    assert!(predicate.query(port).is_none());
 
                                    let var = self.port_info.firing_var_for(port);
 
                                    assert!(predicate.query(var).is_none());
 
                                    // keep forks in "unblocked"
 
                                    unblocked_to.insert(
 
                                        predicate.clone().inserted(var, false),
 
                                        branch.clone(),
 
                                    );
 
                                    unblocked_to.insert(predicate.inserted(var, true), branch);
 
                                }
 
                                B::PutMsg(port, payload) => {
 
                                B::PutMsg(putter, payload) => {
 
                                    // sanity check
 
                                    assert_eq!(Some(&Putter), self.port_info.polarities.get(&port));
 
                                    assert_eq!(
 
                                        Some(&Putter),
 
                                        self.port_info.polarities.get(&putter)
 
                                    );
 
                                    // overwrite assignment
 
                                    let var = self.port_info.firing_var_for(port);
 
                                    let var = self.port_info.firing_var_for(putter);
 

	
 
                                    let was = predicate.assigned.insert(var, true);
 
                                    if was == Some(false) {
 
                                        log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, port, var);
 
                                        log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
 
                                        // discard forever
 
                                        drop((predicate, branch));
 
                                    } else {
 
                                        // keep in "unblocked"
 
                                        log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, port, var);
 
                                        payload_outbox.push((
 
                                            port,
 
                                        let getter = *self.port_info.peers.get(&putter).unwrap();
 
                                        log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                                        payloads_to_get.push((
 
                                            getter,
 
                                            SendPayloadMsg {
 
                                                payload_predicate: predicate.clone(),
 
                                                predicate: predicate.clone(),
 
                                                payload,
 
                                            },
 
                                        ));
 
                                        unblocked_to.insert(predicate, branch);
 
                                    }
 
                                }
 
                            }
 
                        }
 
                        std::mem::swap(unblocked_from, unblocked_to);
 
                    }
 
                }
 
                log!(logger, "All proto components are blocked");
 

	
 
                log!(logger, "Entering decision loop...");
 
                endpoint_manager.undelay_all();
 
                let decision = 'undecided: loop {
 
                    // check if we already have a solution
 
                    // drain payloads_to_get, sending them through endpoints / feeding them to components
 
                    while let Some((getter, send_payload_msg)) = payloads_to_get.pop() {
 
                        assert!(self.port_info.polarities.get(&getter).copied() == Some(Getter));
 
                        match self.port_info.routes.get(&getter).unwrap() {
 
                            Route::Endpoint { index } => {
 
                                let msg = Msg::CommMsg(CommMsg {
 
                                    round_index: *round_index,
 
                                    contents: CommMsgContents::SendPayload(send_payload_msg),
 
                                });
 
                                endpoint_manager.send_to(*index, &msg).unwrap();
 
                            }
 
                            Route::LocalComponent(LocalComponentId::Native) => branching_native
 
                                .feed_msg(
 
                                    logger,
 
                                    &self.port_info,
 
                                    &mut solution_storage,
 
                                    getter,
 
                                    send_payload_msg,
 
                                ),
 
                            Route::LocalComponent(LocalComponentId::Proto(proto_component_id)) => {
 
                                if let Some(branching_component) =
 
                                    branching_proto_components.get_mut(&proto_component_id)
 
                                {
 
                                    branching_component.feed_msg(
 
                                        logger,
 
                                        &self.port_info,
 
                                        &mut solution_storage,
 
                                        getter,
 
                                        send_payload_msg,
 
                                    )
 
                                }
 
                            }
 
                        }
 
                    }
 

	
 
                    // check if we have a solution yet
 
                    log!(logger, "Check if we have any local decisions...");
 
                    for solution in solution_storage.iter_new_local_make_old() {
 
                        log!(logger, "New local decision with solution {:?}...", &solution);
 
                        match neighborhood.parent {
 
                            Some(parent) => {
 
                                log!(logger, "Forwarding to my parent {:?}", parent);
 
                                let suggestion = Decision::Success(solution);
 
                                let msg = Msg::CommMsg(CommMsg {
 
                                    round_index: *round_index,
 
                                    contents: CommMsgContents::Elaborate {
 
                                        partial_oracle: solution,
 
                                    },
 
                                    contents: CommMsgContents::Suggest { suggestion },
 
                                });
 
                                endpoint_manager.send_to(parent, &msg).unwrap();
 
                            }
 
                            None => {
 
                                log!(logger, "No parent. Deciding on solution {:?}", &solution);
 
                                break 'undecided Decision::Success(solution);
 
                            }
 
                        }
 
                    }
 

	
 
                    // TODO send / recv messages
 
                    // stuck! make progress by receiving a msg
 
                    // try recv messages arriving through endpoints
 
                    log!(logger, "No decision yet. Let's recv an endpoint msg...");
 
                    {
 
                        let (endpoint_index, msg) =
 
                            endpoint_manager.try_recv_any(deadline).unwrap();
 
                        log!(logger, "Received from endpoint {} msg {:?}", endpoint_index, &msg);
 
                        let comm_msg_contents = match msg {
 
                            Msg::SetupMsg(..) => {
 
                                log!(logger, "Discarding setup message; that phase is over");
 
                                continue 'undecided;
 
                            }
 
                            Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(round_index) {
 
                                Ordering::Equal => comm_msg.contents,
 
                                Ordering::Less => {
 
                                    log!(
 
                                        logger,
 
                                        "We are in round {}, but msg is for round {}. Discard",
 
                                        comm_msg.round_index,
 
                                        round_index,
 
                                    );
 
                                    drop(comm_msg);
 
                                    continue 'undecided;
 
                                }
 
                                Ordering::Greater => {
 
                                    log!(
 
                                        logger,
 
                                        "We are in round {}, but msg is for round {}. Buffer",
 
                                        comm_msg.round_index,
 
                                        round_index,
 
                                    );
 
                                    endpoint_manager
 
                                        .delayed_messages
 
                                        .push((endpoint_index, Msg::CommMsg(comm_msg)));
 
                                    continue 'undecided;
 
                                }
 
                            },
 
                        };
 
                        match comm_msg_contents {
 
                            CommMsgContents::SendPayload(send_payload_msg) => {
 
                                let getter = endpoint_manager.endpoint_exts[endpoint_index]
 
                                    .getter_for_incoming;
 
                                assert!(self.port_info.polarities.get(&getter) == Some(&Getter));
 
                                log!(
 
                                    logger,
 
                                    "Msg routed to getter port {:?}. Buffer for recv loop",
 
                                    getter,
 
                                );
 
                                payloads_to_get.push((getter, send_payload_msg));
 
                            }
 
                            CommMsgContents::Suggest { suggestion } => {
 
                                // only accept this control msg through a child endpoint
 
                                if neighborhood.children.binary_search(&endpoint_index).is_ok() {
 
                                    match suggestion {
 
                                        Decision::Success(predicate) => {
 
                                            // child solution contributes to local solution
 
                                            log!(
 
                                                logger,
 
                                                "Child provided solution {:?}",
 
                                                &predicate
 
                                            );
 
                                            let route = Route::Endpoint { index: endpoint_index };
 
                                            solution_storage.submit_and_digest_subtree_solution(
 
                                                logger, route, predicate,
 
                                            );
 
                                        }
 
                                        Decision::Failure => match neighborhood.parent {
 
                                            None => {
 
                                                log!(
 
                                                    logger,
 
                                                    "As sink, I decide on my child's failure"
 
                                                );
 
                                                // I am the sink. Decide on failed
 
                                                break 'undecided Decision::Failure;
 
                                            }
 
                                            Some(parent) => {
 
                                                log!(logger, "Forwarding failure through my parent endpoint {:?}", parent);
 
                                                // I've got a parent. Forward the failure suggestion.
 
                                                let msg = Msg::CommMsg(CommMsg {
 
                                                    round_index: *round_index,
 
                                                    contents: CommMsgContents::Suggest {
 
                                                        suggestion,
 
                                                    },
 
                                                });
 
                                                endpoint_manager.send_to(parent, &msg).unwrap();
 
                                            }
 
                                        },
 
                                    }
 
                                } else {
 
                                    log!(logger, "Discarding suggestion {:?} from non-child endpoint idx {:?}", &suggestion, endpoint_index);
 
                                }
 
                            }
 
                            CommMsgContents::Announce { decision } => {
 
                                if Some(endpoint_index) == neighborhood.parent {
 
                                    // adopt this decision
 
                                    break 'undecided decision;
 
                                } else {
 
                                    log!(logger, "Discarding announcement {:?} from non-parent endpoint idx {:?}", &decision, endpoint_index);
 
                                }
 
                            }
 
                        }
 
                    }
 
                    log!(logger, "Endpoint msg recv done");
 
                };
 
                log!(logger, "Committing to decision {:?}!", &decision);
 

	
 
                // propagate the decision to children
 
                let msg = Msg::CommMsg(CommMsg {
 
                    round_index: *round_index,
 
                    contents: CommMsgContents::Announce { decision: decision.clone() },
 
                });
 
                log!(
 
                    logger,
 
                    "Announcing decision {:?} through child endpoints {:?}",
 
                    &msg,
 
                    &neighborhood.children
 
                );
 
                for &child in neighborhood.children.iter() {
 
                    endpoint_manager.send_to(child, &msg).unwrap();
 
                }
 

	
 
                *round_result = match decision {
 
                    Decision::Failure => Err(DistributedTimeout),
 
                    Decision::Success(predicate) => {
 
                        // commit changes to component states
 
                        self.proto_components.clear();
 
                        self.proto_components.extend(
 
                            branching_proto_components
 
                                .into_iter()
 
                                .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))),
 
                        );
 
                        Ok(Some(branching_native.collapse_with(&predicate)))
 
                    }
 
                };
 
                log!(logger, "Updated round_result to {:?}", round_result);
 

	
 
                let returning = round_result
 
                    .as_ref()
 
                    .map(|option| option.as_ref().unwrap().0)
 
                    .map_err(|sync_error| sync_error.clone());
 
                log!(logger, "Returning {:?}", &returning);
 
                returning
 
            }
 
        }
 
    }
 
}
 
impl BranchingNative {
 
    fn feed_msg(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        port_info: &PortInfo,
 
        solution_storage: &mut SolutionStorage,
 
        getter: PortId,
 
        send_payload_msg: SendPayloadMsg,
 
    ) {
 
        assert!(port_info.polarities.get(&getter).copied() == Some(Getter));
 
        println!("BEFORE {:#?}", &self.branches);
 
        let mut draining = HashMap::default();
 
        let finished = &mut self.branches;
 
        std::mem::swap(&mut draining, finished);
 
        for (predicate, mut branch) in draining.drain() {
 
            // check if this branch expects to receive it
 
            let var = port_info.firing_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                branch.to_get.remove(&getter);
 
                if branch.to_get.is_empty() {
 
                    let route = Route::LocalComponent(LocalComponentId::Native);
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        logger,
 
                        route,
 
                        predicate.clone(),
 
                    );
 
                }
 
            };
 
            if predicate.query(var) != Some(true) {
 
                // optimization. Don't bother trying this branch
 
                finished.insert(predicate, branch);
 
                continue;
 
            }
 
            use CommonSatResult as Csr;
 
            match predicate.common_satisfier(&send_payload_msg.predicate) {
 
                Csr::Equivalent | Csr::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    feed_branch(&mut branch, &predicate);
 
                    finished.insert(predicate, branch);
 
                }
 
                Csr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    finished.insert(predicate, branch);
 
                }
 
                Csr::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate
 
                    let mut branch2 = branch.clone();
 
                    // original branch untouched
 
                    finished.insert(predicate, branch);
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    finished.insert(predicate2, branch2);
 
                }
 
                Csr::New(new_predicate) => {
 
                    // fork branch, give fork the message and the new predicate
 
                    let mut branch2 = branch.clone();
 
                    // original branch untouched
 
                    finished.insert(predicate, branch);
 
                    feed_branch(&mut branch2, &new_predicate);
 
                    finished.insert(new_predicate, branch2);
 
                }
 
            }
 
        }
 
        println!("AFTER {:#?}", &self.branches);
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> (usize, HashMap<PortId, Payload>) {
 
        for (branch_predicate, branch) in self.branches {
 
            if branch_predicate.satisfies(solution_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                return (index, gotten);
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
impl BranchingProtoComponent {
 
    fn feed_msg(
 
        &mut self,
 
        _logger: &mut dyn Logger,
 
        _port_info: &PortInfo,
 
        _solution_storage: &mut SolutionStorage,
 
        _getter: PortId,
 
        _send_payload_msg: SendPayloadMsg,
 
    ) {
 
        todo!()
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch_predicate.satisfies(solution_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return ProtoComponent { state, ports };
 
            }
 
        }
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
 
        let branch = ProtoComponentBranch { inbox: Default::default(), state };
 
        Self { ports, branches: hashmap! { Predicate::default() => branch  } }
 
    }
 
}
 
impl SolutionStorage {
 
    fn new(routes: impl Iterator<Item = Route>) -> Self {
 
        let mut subtree_id_to_index: HashMap<Route, usize> = Default::default();
 
        let mut subtree_solutions = vec![];
 
        for key in routes {
 
            subtree_id_to_index.insert(key, subtree_solutions.len());
 
            subtree_solutions.push(Default::default())
 
        }
 
        Self {
 
            subtree_solutions,
 
            subtree_id_to_index,
 
            old_local: Default::default(),
 
            new_local: Default::default(),
 
        }
 
    }
 
    fn is_clear(&self) -> bool {
 
        self.subtree_id_to_index.is_empty()
 
            && self.subtree_solutions.is_empty()
 
            && self.old_local.is_empty()
 
            && self.new_local.is_empty()
 
    }
 
    fn clear(&mut self) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
    }
 
    pub(crate) fn reset(&mut self, subtree_ids: impl Iterator<Item = Route>) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
        for key in subtree_ids {
 
            self.subtree_id_to_index.insert(key, self.subtree_solutions.len());
 
            self.subtree_solutions.push(Default::default())
 
        }
 
    }
 

	
 
    pub(crate) fn peek_new_locals(&self) -> impl Iterator<Item = &Predicate> + '_ {
 
        self.new_local.iter()
 
    }
 

	
 
    pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
        let Self { old_local, new_local, .. } = self;
 
        new_local.drain().map(move |local| {
 
            old_local.insert(local.clone());
 
            local
 
        })
 
    }
 

	
 
    pub(crate) fn submit_and_digest_subtree_solution(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        subtree_id: Route,
 
        predicate: Predicate,
 
    ) {
 
        log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate);
 
        let index = self.subtree_id_to_index[&subtree_id];
 
        let left = 0..index;
 
        let right = (index + 1)..self.subtree_solutions.len();
 

	
 
        let Self { subtree_solutions, new_local, old_local, .. } = self;
 
        let was_new = subtree_solutions[index].insert(predicate.clone());
 
        if was_new {
 
            let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
            Self::elaborate_into_new_local_rec(
 
                logger,
 
                predicate,
 
                set_visitor,
 
                old_local,
 
                new_local,
 
            );
 
        }
 
    }
 

	
 
    fn elaborate_into_new_local_rec<'a, 'b>(
 
        logger: &mut dyn Logger,
 
        partial: Predicate,
 
        mut set_visitor: impl Iterator<Item = &'b HashSet<Predicate>> + Clone,
 
        old_local: &'b HashSet<Predicate>,
 
        new_local: &'a mut HashSet<Predicate>,
 
    ) {
 
        if let Some(set) = set_visitor.next() {
 
            // incomplete solution. keep traversing
 
            for pred in set.iter() {
 
                if let Some(elaborated) = pred.union_with(&partial) {
 
                    Self::elaborate_into_new_local_rec(
 
                        logger,
 
                        elaborated,
 
                        set_visitor.clone(),
 
                        old_local,
 
                        new_local,
 
                    )
 
                }
 
            }
 
        } else {
 
            // recursive stop condition. `partial` is a local subtree solution
 
            if !old_local.contains(&partial) {
 
                // ... and it hasn't been found before
 
                log!(logger, "storing NEW LOCAL SOLUTION {:?}", &partial);
 
                new_local.insert(partial);
 
            }
 
        }
 
    }
 
}
 

	
 
// impl ControllerEphemeral {
 
//     fn is_clear(&self) -> bool {
 
//         self.solution_storage.is_clear()
 
//             && self.poly_n.is_none()
 
//             && self.poly_ps.is_empty()
 
//             && self.mono_ps.is_empty()
 
//             && self.port_to_holder.is_empty()
 
//     }
 
//     fn clear(&mut self) {
 
//         self.solution_storage.clear();
 
//         self.poly_n.take();
 
//         self.poly_ps.clear();
 
//         self.port_to_holder.clear();
 
//     }
 
// }
 
// impl Into<PolyP> for MonoP {
 
//     fn into(self) -> PolyP {
 
//         PolyP {
 
//             complete: Default::default(),
 
//             incomplete: hashmap! {
 
//                 Predicate::new_trivial() =>
 
//                 BranchP {
 
//                     state: self.state,
 
//                     inbox: Default::default(),
 
//                     outbox: Default::default(),
 
//                     blocking_on: None,
 
//                 }
 
//             },
 
//             ports: self.ports,
 
//         }
 
//     }
 
// }
 

	
 
// impl From<EndpointError> for SyncError {
 
//     fn from(e: EndpointError) -> SyncError {
 
//         SyncError::EndpointError(e)
 
//     }
 
// }
 

	
 
// impl ProtoSyncContext<'_> {
 
//     fn new_component(&mut self, moved_ports: HashSet<PortId>, init_state: Self::S) {
 
//         todo!()
 
//     }
 
//     fn new_channel(&mut self) -> [PortId; 2] {
 
//         todo!()
 
//     }
 
// }
 

	
 
// impl PolyContext for BranchPContext<'_, '_> {
 
//     type D = ProtocolD;
 

	
 
//     fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
//         assert!(self.ports.contains(&port));
 
//         let channel_id = self.m_ctx.endpoint_exts.get(port).unwrap().info.channel_id;
 
//         let val = self.predicate.query(channel_id);
 
//         log!(
 
//             &mut self.m_ctx.logger,
 
//             "!! PolyContext callback to is_firing by {:?}! returning {:?}",
 
//             self.m_ctx.my_subtree_id,
 
//             val,
 
//         );
 
//         val
 
//     }
 
//     fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
//         assert!(self.ports.contains(&port));
 
//         let val = self.inbox.get(&port);
 
//         log!(
 
//             &mut self.m_ctx.logger,
 
//             "!! PolyContext callback to read_msg by {:?}! returning {:?}",
 
//             self.m_ctx.my_subtree_id,
 
//             val,
src/runtime/error.rs
Show inline comments
 
use crate::common::*;
 

	
 
#[derive(Debug)]
 
pub enum EndpointError {
 
    MalformedMessage,
 
    BrokenEndpoint,
 
}
 
#[derive(Debug)]
 
pub enum TryRecyAnyError {
 
    Timeout,
 
    PollFailed,
 
    EndpointError { error: EndpointError, index: usize },
 
    BrokenEndpoint(usize),
 
}
 
#[derive(Debug, Clone)]
 
pub enum SyncError {
 
    Timeout,
 
    NotConnected,
 
    InconsistentProtoComponent(ProtoComponentId),
 
    IndistinguishableBatches([usize; 2]),
 
    DistributedTimeout,
 
}
 
#[derive(Debug)]
 
pub enum PortOpError {
 
    WrongPolarity,
 
    NotConnected,
 
    MultipleOpsOnPort,
 
    PortUnavailable,
 
}
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum GottenError {
 
    NoPreviousRound,
 
    PortDidntGet,
 
    PreviousSyncFailed,
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum NextBatchError {
 
    NotConnected,
 
}
src/runtime/mod.rs
Show inline comments
 
mod communication;
 
pub mod error;
 
mod setup2;
 

	
 
#[cfg(test)]
 
mod my_tests;
 

	
 
use crate::common::*;
 
use error::*;
 

	
 
#[derive(
 
    Debug, Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub struct FiringVar(PortId);
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
 
pub enum LocalComponentId {
 
    Native,
 
    Proto(ProtoComponentId),
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
 
pub enum Route {
 
    LocalComponent(LocalComponentId),
 
    Endpoint { index: usize },
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
}
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub enum Decision {
 
    Failure,
 
    Success(Predicate),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub enum Msg {
 
    SetupMsg(SetupMsg),
 
    CommMsg(CommMsg),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub enum SetupMsg {
 
    MyPortInfo(MyPortInfo),
 
    LeaderEcho { maybe_leader: ControllerId },
 
    LeaderAnnounce { leader: ControllerId },
 
    YouAreMyParent,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct CommMsg {
 
    pub round_index: usize,
 
    pub contents: CommMsgContents,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub enum CommMsgContents {
 
    SendPayload(SendPayloadMsg),
 
    Elaborate { partial_oracle: Predicate }, // SINKWARD
 
    Failure,                                 // SINKWARD
 
    Suggest { suggestion: Decision }, // SINKWARD
 
    Announce { decision: Decision },  // SINKAWAYS
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct SendPayloadMsg {
 
    payload_predicate: Predicate,
 
    predicate: Predicate,
 
    payload: Payload,
 
}
 
#[derive(Debug, PartialEq)]
 
pub enum CommonSatResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
    New(Predicate),
 
    Nonexistant,
 
}
 
pub struct Endpoint {
 
    inbox: Vec<u8>,
 
    stream: TcpStream,
 
}
 
#[derive(Debug, Clone)]
 
pub struct ProtoComponent {
 
    state: ComponentState,
 
    ports: HashSet<PortId>,
 
}
 
pub trait Logger: Debug {
 
    fn line_writer(&mut self) -> &mut dyn std::fmt::Write;
 
    fn dump_log(&self, w: &mut dyn std::io::Write);
 
}
 
#[derive(Debug, Clone)]
 
pub struct EndpointSetup {
 
    pub sock_addr: SocketAddr,
 
    pub is_active: bool,
 
}
 
#[derive(Debug)]
 
pub struct EndpointExt {
 
    endpoint: Endpoint,
 
    inp_for_emerging_msgs: PortId,
 
    getter_for_incoming: PortId,
 
}
 
#[derive(Debug)]
 
pub struct Neighborhood {
 
    parent: Option<usize>,
 
    children: Vec<usize>, // ordered, deduplicated
 
}
 
#[derive(Debug)]
 
pub struct MemInMsg {
 
    inp: PortId,
 
    msg: Payload,
 
}
 
#[derive(Debug)]
 
pub struct IdManager {
 
    controller_id: ControllerId,
 
    port_suffix_stream: U32Stream,
 
    proto_component_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
pub struct EndpointManager {
 
    // invariants:
 
    // 1. endpoint N is registered READ | WRITE with poller
 
    // 2. Events is empty
 
    poll: Poll,
 
    events: Events,
 
    polled_undrained: IndexSet<usize>,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    endpoint_exts: Vec<EndpointExt>,
 
}
 
#[derive(Debug, Default)]
 
pub struct PortInfo {
 
    polarities: HashMap<PortId, Polarity>,
 
    peers: HashMap<PortId, PortId>,
 
    routes: HashMap<PortId, Route>,
 
}
 
#[derive(Debug)]
 
pub struct Connector {
 
    proto_description: Arc<ProtocolDescription>,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
    logger: Box<dyn Logger>,
 
    id_manager: IdManager,
 
    native_ports: HashSet<PortId>,
 
    port_info: PortInfo,
 
    phased: ConnectorPhased,
 
}
 
#[derive(Debug)]
 
pub enum ConnectorPhased {
 
    Setup {
 
        endpoint_setups: Vec<(PortId, EndpointSetup)>,
 
        surplus_sockets: u16,
 
    },
 
    Communication {
 
        round_index: usize,
 
        endpoint_manager: EndpointManager,
 
        neighborhood: Neighborhood,
 
        mem_inbox: Vec<MemInMsg>,
 
        native_batches: Vec<NativeBatch>,
 
        round_result: Result<Option<(usize, HashMap<PortId, Payload>)>, SyncError>,
 
    },
 
}
 
#[derive(Debug)]
 
pub struct StringLogger(ControllerId, String);
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub struct Predicate {
 
    pub assigned: BTreeMap<PortId, bool>,
 
    pub assigned: BTreeMap<FiringVar, bool>,
 
}
 
#[derive(Debug, Default)]
 
pub struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
pub struct MonitoredReader<R: Read> {
 
    bytes: usize,
 
    r: R,
 
}
 
pub struct NonsyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    proto_component_id: ProtoComponentId,
 
    port_info: &'a mut PortInfo,
 
    id_manager: &'a mut IdManager,
 
    proto_component_ports: &'a mut HashSet<PortId>,
 
    unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>,
 
}
 
pub struct SyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    predicate: &'a Predicate,
 
    proto_component_id: ProtoComponentId,
 
    port_info: &'a PortInfo,
 
    inbox: &'a HashMap<PortId, Payload>,
 
}
 

	
 
// pub struct MonoPContext<'a> {
 
//     inner: &'a mut ControllerInner,
 
//     ports: &'a mut HashSet<PortId>,
 
//     mono_ps: &'a mut Vec<MonoP>,
 
// }
 
// pub struct PolyPContext<'a> {
 
//     my_subtree_id: SubtreeId,
 
//     inner: &'a mut Connector,
 
//     solution_storage: &'a mut SolutionStorage,
 
// }
 
// impl PolyPContext<'_> {
 
//     #[inline(always)]
 
//     fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> {
 
//         let Self { solution_storage, my_subtree_id, inner } = self;
 
//         PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner }
 
//     }
 
// }
 
// struct BranchPContext<'m, 'r> {
 
//     m_ctx: PolyPContext<'m>,
 
//     ports: &'r HashSet<PortId>,
 
//     predicate: &'r Predicate,
 
//     inbox: &'r HashMap<PortId, Payload>,
 
// }
 

	
 
// #[derive(Debug)]
 
// pub enum SyncRunResult {
 
//     BlockingForRecv,
 
//     AllBranchesComplete,
 
//     NoBranches,
 
// }
 
// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
// pub enum PolyId {
 
//     N,
 
//     P { index: usize },
 
// }
 

	
 
// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
// pub enum SubtreeId {
 
//     PolyN,
 
//     PolyP { index: usize },
 
//     ChildController { port: PortId },
 
// }
 
// #[derive(Debug)]
 
// pub struct NativeBranch {
 
//     gotten: HashMap<PortId, Payload>,
 
//     to_get: HashSet<PortId>,
 
// }
 

	
 
////////////////
 
impl PortInfo {
 
    fn firing_var_for(&self, port: PortId) -> PortId {
 
        match self.polarities.get(&port).unwrap() {
 
    fn firing_var_for(&self, port: PortId) -> FiringVar {
 
        FiringVar(match self.polarities.get(&port).unwrap() {
 
            Getter => port,
 
            Putter => *self.peers.get(&port).unwrap(),
 
        }
 
        })
 
    }
 
}
 
impl IdManager {
 
    fn new(controller_id: ControllerId) -> Self {
 
        Self {
 
            controller_id,
 
            port_suffix_stream: Default::default(),
 
            proto_component_suffix_stream: Default::default(),
 
        }
 
    }
 
    fn new_port_id(&mut self) -> PortId {
 
        Id { controller_id: self.controller_id, u32_suffix: self.port_suffix_stream.next() }.into()
 
    }
 
    fn new_proto_component_id(&mut self) -> ProtoComponentId {
 
        Id {
 
            controller_id: self.controller_id,
 
            u32_suffix: self.proto_component_suffix_stream.next(),
 
        }
 
        .into()
 
    }
 
}
 
impl Connector {
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the native
 
        let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()];
 
        self.native_ports.insert(o);
 
        self.native_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
        self.port_info.polarities.insert(o, Putter);
 
        self.port_info.polarities.insert(i, Getter);
 
        self.port_info.peers.insert(o, i);
 
        self.port_info.peers.insert(i, o);
 
        let route = Route::LocalComponent(LocalComponentId::Native);
 
        self.port_info.routes.insert(o, route);
 
        self.port_info.routes.insert(i, route);
 
        log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 
}
 
impl EndpointManager {
 
    fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> {
 
        self.endpoint_exts[index].endpoint.send(msg)
 
    }
 
    fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> {
 
        use TryRecyAnyError::*;
 
        // 1. try messages already buffered
 
        if let Some(x) = self.undelayed_messages.pop() {
 
            return Ok(x);
 
        }
 
        loop {
 
            // 2. try read a message from an endpoint that raised an event with poll() but wasn't drained
 
            while let Some(index) = self.polled_undrained.pop() {
 
                let endpoint = &mut self.endpoint_exts[index].endpoint;
 
                if let Some(msg) =
 
                    endpoint.try_recv().map_err(|error| EndpointError { error, index })?
 
                {
 
                    if !endpoint.inbox.is_empty() {
 
                        // there may be another message waiting!
 
                        self.polled_undrained.insert(index);
 
                    }
 
                    return Ok((index, msg));
 
                }
 
            }
 
            // 3. No message yet. Do we have enough time to poll?
 
            let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?;
 
            self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?;
 
            for event in self.events.iter() {
 
                let Token(index) = event.token();
 
                self.polled_undrained.insert(index);
 
            }
 
            self.events.clear();
 
        }
 
    }
 
    fn undelay_all(&mut self) {
 
        if self.undelayed_messages.is_empty() {
 
            // fast path
 
            std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages);
 
            return;
 
        }
 
        // slow path
 
        self.undelayed_messages.extend(self.delayed_messages.drain(..));
 
    }
 
}
 
impl Debug for Endpoint {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_struct("Endpoint").field("inbox", &self.inbox).finish()
 
    }
 
}
 
impl<R: Read> From<R> for MonitoredReader<R> {
 
    fn from(r: R) -> Self {
 
        Self { r, bytes: 0 }
 
    }
 
}
 
impl<R: Read> MonitoredReader<R> {
 
    pub fn bytes_read(&self) -> usize {
 
        self.bytes
 
    }
 
}
 
impl<R: Read> Read for MonitoredReader<R> {
 
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
 
        let n = self.r.read(buf)?;
 
        self.bytes += n;
 
        Ok(n)
 
    }
 
}
 
impl Into<Msg> for SetupMsg {
 
    fn into(self) -> Msg {
 
        Msg::SetupMsg(self)
 
    }
 
}
 
impl StringLogger {
 
    pub fn new(controller_id: ControllerId) -> Self {
 
        Self(controller_id, String::default())
 
    }
 
}
 
impl Logger for StringLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::fmt::Write {
 
        use std::fmt::Write;
 
        let _ = write!(&mut self.1, "\nCID({}): ", self.0);
 
        self
 
    }
 
    fn dump_log(&self, w: &mut dyn std::io::Write) {
 
        let _ = w.write(self.1.as_bytes());
 
    }
 
}
 
impl std::fmt::Write for StringLogger {
 
    fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> {
 
        self.1.write_str(s)
 
    }
 
}
 
impl Endpoint {
 
    fn try_recv<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, EndpointError> {
 
        use EndpointError::*;
 
        // populate inbox as much as possible
 
        'read_loop: loop {
 
            match self.stream.read_to_end(&mut self.inbox) {
 
                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break 'read_loop,
 
                Ok(0) => break 'read_loop,
 
                Ok(_) => (),
 
                Err(_e) => return Err(BrokenEndpoint),
 
            }
 
        }
 
        let mut monitored = MonitoredReader::from(&self.inbox[..]);
 
        match bincode::deserialize_from(&mut monitored) {
 
            Ok(msg) => {
 
                let msg_size = monitored.bytes_read();
 
                self.inbox.drain(0..(msg_size.try_into().unwrap()));
 
                Ok(Some(msg))
 
            }
 
            Err(e) => match *e {
 
                bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
 
                    Ok(None)
 
                }
 
                _ => Err(MalformedMessage),
 
                // println!("SERDE ERRKIND {:?}", e);
 
                // Err(MalformedMessage)
 
            },
 
        }
 
    }
 
    fn send<T: serde::ser::Serialize>(&mut self, msg: &T) -> Result<(), ()> {
 
        bincode::serialize_into(&mut self.stream, msg).map_err(drop)
 
    }
 
}
 
impl Connector {
 
    pub fn get_logger(&self) -> &dyn Logger {
 
        &*self.logger
 
    }
 
    pub fn print_state(&self) {
 
        let stdout = std::io::stdout();
 
        let mut lock = stdout.lock();
 
        writeln!(
 
            lock,
 
            "--- Connector with ControllerId={:?}.\n::LOG_DUMP:\n",
 
            self.id_manager.controller_id
 
        )
 
        .unwrap();
 
        self.get_logger().dump_log(&mut lock);
 
        writeln!(lock, "\n\nDEBUG_PRINT:\n{:#?}\n", self).unwrap();
 
    }
 
}
 
// impl Debug for SolutionStorage {
 
//     fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
//         f.pad("Solutions: [")?;
 
//         for (subtree_id, &index) in self.subtree_id_to_index.iter() {
 
//             let sols = &self.subtree_solutions[index];
 
//             f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?;
 
//         }
 
//         f.pad("]")
 
//     }
 
// }
 

	
 
impl Predicate {
 
    #[inline]
 
    pub fn inserted(mut self, k: PortId, v: bool) -> Self {
 
    pub fn inserted(mut self, k: FiringVar, v: bool) -> Self {
 
        self.assigned.insert(k, v);
 
        self
 
    }
 
    // returns true IFF self.unify would return Equivalent OR FormerNotLatter
 
    pub fn satisfies(&self, other: &Self) -> bool {
 
        let mut s_it = self.assigned.iter();
 
        let mut s = if let Some(s) = s_it.next() {
 
            s
 
        } else {
 
            return other.assigned.is_empty();
 
        };
 
        for (oid, ob) in other.assigned.iter() {
 
            while s.0 < oid {
 
                s = if let Some(s) = s_it.next() {
 
                    s
 
                } else {
 
                    return false;
 
                };
 
            }
 
            if s.0 > oid || s.1 != ob {
 
                return false;
 
            }
 
        }
 
        true
 
    }
 

	
 
    /// Given self and other, two predicates, return the most general Predicate possible, N
 
    /// such that n.satisfies(self) && n.satisfies(other).
 
    /// If none exists Nonexistant is returned.
 
    /// If the resulting predicate is equivlanet to self, other, or both,
 
    /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively.
 
    /// otherwise New(N) is returned.
 
    pub fn common_satisfier(&self, other: &Self) -> CommonSatResult {
 
        use CommonSatResult::*;
 
        use CommonSatResult as Csr;
 
        // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
 
        let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
 
        let [mut s, mut o] = [s_it.next(), o_it.next()];
 
        // lists of assignments in self but not other and vice versa.
 
        let [mut s_not_o, mut o_not_s] = [vec![], vec![]];
 
        loop {
 
            match [s, o] {
 
                [None, None] => break,
 
                [None, Some(x)] => {
 
                    o_not_s.push(x);
 
                    o_not_s.extend(o_it);
 
                    break;
 
                }
 
                [Some(x), None] => {
 
                    s_not_o.push(x);
 
                    s_not_o.extend(s_it);
 
                    break;
 
                }
 
                [Some((sid, sb)), Some((oid, ob))] => {
 
                    if sid < oid {
 
                        // o is missing this element
 
                        s_not_o.push((sid, sb));
 
                        s = s_it.next();
 
                    } else if sid > oid {
 
                        // s is missing this element
 
                        o_not_s.push((oid, ob));
 
                        o = o_it.next();
 
                    } else if sb != ob {
 
                        assert_eq!(sid, oid);
 
                        // both predicates assign the variable but differ on the value
 
                        return Nonexistant;
 
                        return Csr::Nonexistant;
 
                    } else {
 
                        // both predicates assign the variable to the same value
 
                        s = s_it.next();
 
                        o = o_it.next();
 
                    }
 
                }
 
            }
 
        }
 
        // Observed zero inconsistencies. A unified predicate exists...
 
        match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
            [true, true] => Equivalent,       // ... equivalent to both.
 
            [false, true] => FormerNotLatter, // ... equivalent to self.
 
            [true, false] => LatterNotFormer, // ... equivalent to other.
 
            [true, true] => Csr::Equivalent,       // ... equivalent to both.
 
            [false, true] => Csr::FormerNotLatter, // ... equivalent to self.
 
            [true, false] => Csr::LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
                let mut new = self.clone();
 
                for (&id, &b) in o_not_s {
 
                    new.assigned.insert(id, b);
 
                }
 
                New(new)
 
                Csr::New(new)
 
            }
 
        }
 
    }
 

	
 
    pub fn iter_matching(&self, value: bool) -> impl Iterator<Item = PortId> + '_ {
 
        self.assigned
 
            .iter()
 
            .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None })
 
    }
 
    // pub fn iter_matching(&self, value: bool) -> impl Iterator<Item = FiringVar> + '_ {
 
    //     self.assigned
 
    //         .iter()
 
    //         .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None })
 
    // }
 

	
 
    pub fn batch_assign_nones(&mut self, channel_ids: impl Iterator<Item = PortId>, value: bool) {
 
        for channel_id in channel_ids {
 
            self.assigned.entry(channel_id).or_insert(value);
 
        }
 
    }
 
    // pub fn batch_assign_nones(&mut self, channel_ids: impl Iterator<Item = PortId>, value: bool) {
 
    //     for channel_id in channel_ids {
 
    //         self.assigned.entry(channel_id).or_insert(value);
 
    //     }
 
    // }
 
    // pub fn replace_assignment(&mut self, channel_id: PortId, value: bool) -> Option<bool> {
 
    //     self.assigned.insert(channel_id, value)
 
    // }
 
    pub fn union_with(&self, other: &Self) -> Option<Self> {
 
        let mut res = self.clone();
 
        for (&channel_id, &assignment_1) in other.assigned.iter() {
 
            match res.assigned.insert(channel_id, assignment_1) {
 
                Some(assignment_2) if assignment_1 != assignment_2 => return None,
 
                _ => {}
 
            }
 
        }
 
        Some(res)
 
    }
 
    pub fn query(&self, x: PortId) -> Option<bool> {
 
        self.assigned.get(&x).copied()
 
    pub fn query(&self, var: FiringVar) -> Option<bool> {
 
        self.assigned.get(&var).copied()
 
    }
 
}
 
impl Debug for Predicate {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        struct MySet<'a>(&'a Predicate, bool);
 
        impl Debug for MySet<'_> {
 
            fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
                let iter = self.0.assigned.iter().filter_map(|(port, &firing)| {
 
                    if firing == self.1 {
 
                        Some(port)
 
                    } else {
 
                        None
 
                    }
 
                });
 
                f.debug_set().entries(iter).finish()
 
            }
 
        }
 
        f.debug_struct("Predicate")
 
            .field("Trues", &MySet(self, true))
 
            .field("Falses", &MySet(self, false))
 
            .finish()
 
    }
 
}
src/runtime/my_tests.rs
Show inline comments
 
use crate as reowolf;
 
use crossbeam_utils::thread::scope;
 
use reowolf::{Connector, EndpointSetup, Polarity::*, ProtocolDescription};
 
use std::net::SocketAddr;
 
use std::{sync::Arc, time::Duration};
 

	
 
fn next_test_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(5_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()
 
}
 

	
 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<ProtocolDescription> =
 
        { Arc::new(reowolf::ProtocolDescription::parse(b"").unwrap()) };
 
}
 

	
 
#[test]
 
fn simple_connector() {
 
    let c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    println!("{:#?}", c);
 
}
 

	
 
#[test]
 
fn new_port_pair() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, _] = c.new_port_pair();
 
    let [_, _] = c.new_port_pair();
 
    println!("{:#?}", c);
 
}
 

	
 
#[test]
 
fn new_sync() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [o, i] = c.new_port_pair();
 
    c.add_component(b"sync", &[i, o]).unwrap();
 
    println!("{:#?}", c);
 
}
 

	
 
#[test]
 
fn new_net_port() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let sock_addr = next_test_addr();
 
    let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
    let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
    println!("{:#?}", c);
 
}
 

	
 
#[test]
 
fn trivial_connect() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    println!("{:#?}", c);
 
}
 

	
 
#[test]
 
fn single_node_connect() {
 
    let sock_addr = next_test_addr();
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
    let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
    let res = c.connect(Duration::from_secs(1));
 
    println!("{:#?}", c);
 
    c.get_logger().dump_log(&mut std::io::stdout().lock());
 
    res.unwrap();
 
}
 

	
 
#[test]
 
fn multithreaded_connect() {
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
            let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.print_state();
 
        });
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1);
 
            let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.print_state();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn put_no_sync() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
}
 

	
 
#[test]
 
fn wrong_polarity_bad() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.put(i, (b"hi" as &[_]).into()).unwrap_err();
 
}
 

	
 
#[test]
 
fn dup_put_bad() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap_err();
 
}
 

	
 
#[test]
 
fn trivial_sync() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.sync(Duration::from_secs(1)).unwrap();
 
    c.print_state();
 
}
 

	
 
#[test]
 
fn unconnected_gotten_err() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, i] = c.new_port_pair();
 
    assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn connected_gotten_err_no_round() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn connected_gotten_err_ungotten() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.sync(Duration::from_secs(1)).unwrap();
 
    assert_eq!(reowolf::error::GottenError::PortDidntGet, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn native_polarity_checks() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    // fail...
 
    c.get(o).unwrap_err();
 
    c.put(i, (b"hi" as &[_]).into()).unwrap_err();
 
    // succeed..
 
    c.get(i).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
}
 

	
 
#[test]
 
fn native_multiple_gets() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.get(i).unwrap();
 
    c.get(i).unwrap_err();
 
}
 

	
 
#[test]
 
fn next_batch() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    c.next_batch().unwrap_err();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
}
 

	
 
#[test]
 
fn native_sync() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.get(i).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
    c.sync(Duration::from_secs(1)).unwrap();
 
}
 

	
 
#[test]
 
fn native_message_pass() {
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
            let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(Duration::from_secs(1)).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1);
 
            let p = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.put(p, (b"hello" as &[_]).into()).unwrap();
 
            c.sync(Duration::from_secs(1)).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
src/runtime/setup2.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::*;
 

	
 
struct LogicalChannelInfo {
 
    local_port: PortId,
 
    peer_port: PortId,
 
    local_polarity: Polarity,
 
    endpoint_index: usize,
 
}
 
///////////////
 
impl Connector {
 
    pub fn new_simple(
 
        proto_description: Arc<ProtocolDescription>,
 
        controller_id: ControllerId,
 
    ) -> Self {
 
        let logger = Box::new(StringLogger::new(controller_id));
 
        let surplus_sockets = 8;
 
        Self::new(logger, proto_description, controller_id, surplus_sockets)
 
    }
 
    pub fn new(
 
        logger: Box<dyn Logger>,
 
        proto_description: Arc<ProtocolDescription>,
 
        controller_id: ControllerId,
 
        surplus_sockets: u16,
 
    ) -> Self {
 
        Self {
 
            proto_description,
 
            proto_components: Default::default(),
 
            logger,
 
            id_manager: IdManager::new(controller_id),
 
            native_ports: Default::default(),
 
            port_info: Default::default(),
 
            phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets },
 
        }
 
    }
 
    pub fn new_net_port(
 
        &mut self,
 
        polarity: Polarity,
 
        endpoint_setup: EndpointSetup,
 
    ) -> Result<PortId, ()> {
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                let p = self.id_manager.new_port_id();
 
                self.native_ports.insert(p);
 
                // {polarity, route} known. {peer} unknown.
 
                self.port_info.polarities.insert(p, polarity);
 
                self.port_info.routes.insert(p, Route::LocalComponent(LocalComponentId::Native));
 
                log!(self.logger, "Added net port {:?} with info {:?} ", p, &endpoint_setup);
 
                endpoint_setups.push((p, endpoint_setup));
 
                Ok(p)
 
            }
 
            ConnectorPhased::Communication { .. } => Err(()),
 
        }
 
    }
 
    pub fn add_component(
 
        &mut self,
 
        identifier: &[u8],
 
        ports: &[PortId],
 
    ) -> Result<(), AddComponentError> {
 
        // called by the USER. moves ports owned by the NATIVE
 
        use AddComponentError::*;
 
        // 1. check if this is OK
 
        let polarities = self.proto_description.component_polarities(identifier)?;
 
        if polarities.len() != ports.len() {
 
            return Err(WrongNumberOfParamaters { expected: polarities.len() });
 
        }
 
        for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) {
 
            if !self.native_ports.contains(port) {
 
                return Err(UnknownPort(*port));
 
            }
 
            if expected_polarity != *self.port_info.polarities.get(port).unwrap() {
 
                return Err(WrongPortPolarity { port: *port, expected_polarity });
 
            }
 
        }
 
        // 3. remove ports from old component & update port->route
 
        let new_id = self.id_manager.new_proto_component_id();
 
        for port in ports.iter() {
 
            self.port_info
 
                .routes
 
                .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id)));
 
        }
 
        // 4. add new component
 
        self.proto_components.insert(
 
            new_id,
 
            ProtoComponent {
 
                state: self.proto_description.new_main_component(identifier, ports),
 
                ports: ports.iter().copied().collect(),
 
            },
 
        );
 
        Ok(())
 
    }
 
    pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> {
 
        match &mut self.phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(self.logger, "Call to connecting in connected state");
 
                Err(())
 
            }
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                log!(self.logger, "~~~ CONNECT called with timeout {:?}", timeout);
 
                log!(self.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = Instant::now() + timeout;
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
                    &mut *self.logger,
 
                    endpoint_setups,
 
                    &mut self.port_info,
 
                    deadline,
 
                )?;
 
                log!(
 
                    self.logger,
 
                    "Successfully connected {} endpoints",
 
                    endpoint_manager.endpoint_exts.len()
 
                );
 
                // leader election and tree construction
 
                let neighborhood = init_neighborhood(
 
                    self.id_manager.controller_id,
 
                    &mut *self.logger,
 
                    &mut endpoint_manager,
 
                    deadline,
 
                )?;
 
                log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                // TODO session optimization goes here
 
                self.phased = ConnectorPhased::Communication {
 
                    round_index: 0,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    mem_inbox: Default::default(),
 
                    native_batches: vec![Default::default()],
 
                    round_result: Ok(None),
 
                };
 
                Ok(())
 
            }
 
        }
 
    }
 
}
 

	
 
fn new_endpoint_manager(
 
    logger: &mut dyn Logger,
 
    endpoint_setups: &[(PortId, EndpointSetup)],
 
    port_info: &mut PortInfo,
 
    deadline: Instant,
 
) -> Result<EndpointManager, ()> {
 
    ////////////////////////////////////////////
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
        local_port: PortId,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 
    enum TodoEndpoint {
 
        Listener(TcpListener),
 
        Endpoint(Endpoint),
 
    }
 
    fn init_todo(
 
        token: Token,
 
        local_port: PortId,
 
        endpoint_setup: &EndpointSetup,
 
        poll: &mut Poll,
 
    ) -> Result<Todo, ()> {
 
        let todo_endpoint = if endpoint_setup.is_active {
 
            let mut stream = TcpStream::connect(endpoint_setup.sock_addr).map_err(drop)?;
 
            poll.registry().register(&mut stream, token, BOTH).unwrap();
 
            TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] })
 
        } else {
 
            let mut listener = TcpListener::bind(endpoint_setup.sock_addr).map_err(drop)?;
 
            poll.registry().register(&mut listener, token, BOTH).unwrap();
 
            TodoEndpoint::Listener(listener)
 
        };
 
        Ok(Todo { todo_endpoint, local_port, sent_local_port: false, recv_peer_port: None })
 
    };
 
    ////////////////////////////////////////////
 

	
 
    // 1. Start to construct EndpointManager
 
    let mut poll = Poll::new().map_err(drop)?;
 
    let mut events = Events::with_capacity(64);
 
    let mut polled_undrained = IndexSet::<usize>::default();
 
    let mut delayed_messages = vec![];
 

	
 
    // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
 
    let mut todos = endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
            init_todo(Token(index), *local_port, endpoint_setup, &mut poll)
 
        })
 
        .collect::<Result<Vec<Todo>, _>>()?;
 

	
 
    // 3. Using poll to drive progress:
 
    //    - accept an incoming connection for each TcpListener (turning them into endpoints too)
 
    //    - for each endpoint, send the local PortId
 
    //    - for each endpoint, recv the peer's PortId, and
 
    let mut setup_incomplete: HashSet<usize> = (0..todos.len()).collect();
 
    while !setup_incomplete.is_empty() {
 
        let remaining = deadline.checked_duration_since(Instant::now()).ok_or(())?;
 
        poll.poll(&mut events, Some(remaining)).map_err(drop)?;
 
        for event in events.iter() {
 
            let token = event.token();
 
            let Token(index) = token;
 
            let todo: &mut Todo = &mut todos[index];
 
            if let TodoEndpoint::Listener(listener) = &mut todo.todo_endpoint {
 
                let (mut stream, peer_addr) = listener.accept().map_err(drop)?;
 
                poll.registry().deregister(listener).unwrap();
 
                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                log!(logger, "Endpoint[{}] accepted a connection from {:?}", index, peer_addr);
 
                let endpoint = Endpoint { stream, inbox: vec![] };
 
                todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
 
            }
 
            match todo {
 
                Todo {
 
                    todo_endpoint: TodoEndpoint::Endpoint(endpoint),
 
                    local_port,
 
                    sent_local_port,
 
                    recv_peer_port,
 
                    ..
 
                } => {
 
                    if !setup_incomplete.contains(&index) {
 
                        continue;
 
                    }
 
                    let local_polarity = *port_info.polarities.get(local_port).unwrap();
 
                    if event.is_writable() && !*sent_local_port {
 
                        let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
                            polarity: local_polarity,
 
                            port: *local_port,
 
                        }));
 
                        endpoint.send(&msg)?;
 
                        log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
                        *sent_local_port = true;
 
                    }
 
                    if event.is_readable() && recv_peer_port.is_none() {
 
                        let maybe_msg = endpoint.try_recv().map_err(drop)?;
 
                        if maybe_msg.is_some() && !endpoint.inbox.is_empty() {
 
                            polled_undrained.insert(index);
 
                        }
 
                        match maybe_msg {
 
                            None => {} // msg deserialization incomplete
 
                            Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
 
                                log!(logger, "endpoint[{}] got peer info {:?}", index, peer_info);
 
                                assert!(peer_info.polarity != local_polarity);
 
                                *recv_peer_port = Some(peer_info.port);
 
                                // 1. finally learned the peer of this port!
 
                                port_info.peers.insert(*local_port, peer_info.port);
 
                                // 2. learned the info of this peer port
 
                                port_info.polarities.insert(peer_info.port, peer_info.polarity);
 
                                port_info.peers.insert(peer_info.port, *local_port);
 
                                port_info.routes.insert(peer_info.port, Route::Endpoint { index });
 
                            }
 
                            Some(inappropriate_msg) => {
 
                                log!(
 
                                    logger,
 
                                    "delaying msg {:?} during channel setup phase",
 
                                    inappropriate_msg
 
                                );
 
                                delayed_messages.push((index, inappropriate_msg));
 
                            }
 
                        }
 
                    }
 
                    if *sent_local_port && recv_peer_port.is_some() {
 
                        setup_incomplete.remove(&index);
 
                        log!(logger, "endpoint[{}] is finished!", index);
 
                    }
 
                }
 
                Todo { todo_endpoint: TodoEndpoint::Listener(_), .. } => unreachable!(),
 
            }
 
        }
 
        events.clear();
 
    }
 
    let endpoint_exts = todos
 
        .into_iter()
 
        .map(|Todo { todo_endpoint, recv_peer_port, .. }| EndpointExt {
 
        .map(|Todo { todo_endpoint, local_port, .. }| EndpointExt {
 
            endpoint: match todo_endpoint {
 
                TodoEndpoint::Endpoint(endpoint) => endpoint,
 
                TodoEndpoint::Listener(..) => unreachable!(),
 
            },
 
            inp_for_emerging_msgs: recv_peer_port.unwrap(),
 
            getter_for_incoming: local_port,
 
        })
 
        .collect();
 
    Ok(EndpointManager {
 
        poll,
 
        events,
 
        polled_undrained,
 
        undelayed_messages: delayed_messages, // no longer delayed
 
        delayed_messages: Default::default(),
 
        endpoint_exts,
 
    })
 
}
 

	
 
fn init_neighborhood(
 
    controller_id: ControllerId,
 
    logger: &mut dyn Logger,
 
    em: &mut EndpointManager,
 
    deadline: Instant,
 
) -> Result<Neighborhood, ()> {
 
    use {Msg::SetupMsg as S, SetupMsg::*};
 
    log!(logger, "beginning neighborhood construction");
 
    // 1. broadcast my ID as the first echo. await reply from all neighbors
 
    let echo = S(LeaderEcho { maybe_leader: controller_id });
 
    let mut awaiting = HashSet::with_capacity(em.endpoint_exts.len());
 
    for (index, ee) in em.endpoint_exts.iter_mut().enumerate() {
 
        log!(logger, "{:?}'s initial echo to {:?}, {:?}", controller_id, index, &echo);
 
        ee.endpoint.send(&echo)?;
 
        awaiting.insert(index);
 
    }
 

	
 
    // 2. Receive incoming replies. whenever a higher-id echo arrives,
 
    //    adopt it as leader, sender as parent, and reset the await set.
 
    let mut parent: Option<usize> = None;
 
    let mut my_leader = controller_id;
 
    em.undelay_all();
 
    'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
 
        log!(logger, "GOT from index {:?} msg {:?}", &index, &msg);
 
        match msg {
 
            S(LeaderAnnounce { leader }) => {
 
                // someone else completed the echo and became leader first!
 
                // the sender is my parent
 
                parent = Some(index);
 
                my_leader = leader;
 
                awaiting.clear();
 
                break 'echo_loop;
 
            }
 
            S(LeaderEcho { maybe_leader }) => {
 
                use Ordering::*;
 
                match maybe_leader.cmp(&my_leader) {
 
                    Less => { /* ignore this wave */ }
 
                    Equal => {
 
                        awaiting.remove(&index);
 
                        if awaiting.is_empty() {
 
                            if let Some(p) = parent {
 
                                // return the echo to my parent
 
                                em.send_to(p, &S(LeaderEcho { maybe_leader }))?;
 
                            } else {
 
                                // wave completed!
 
                                break 'echo_loop;
 
                            }
 
                        }
 
                    }
 
                    Greater => {
 
                        // join new echo
 
                        log!(logger, "Setting leader to index {:?}", index);
 
                        parent = Some(index);
 
                        my_leader = maybe_leader;
 
                        let echo = S(LeaderEcho { maybe_leader: my_leader });
 
                        awaiting.clear();
 
                        if em.endpoint_exts.len() == 1 {
 
                            // immediately reply to parent
 
                            log!(logger, "replying echo to parent {:?} immediately", index);
 
                            em.send_to(index, &echo)?;
 
                        } else {
 
                            for (index2, ee) in em.endpoint_exts.iter_mut().enumerate() {
 
                                if index2 == index {
 
                                    // don't propagate echo to my parent
 
                                    continue;
 
                                }
 
                                log!(logger, "repeating echo {:?} to {:?}", &echo, index2);
 
                                ee.endpoint.send(&echo)?;
 
                                awaiting.insert(index2);
 
                            }
 
                        }
 
                    }
 
                }
 
            }
 
            inappropriate_msg => {
 
                log!(logger, "delaying msg {:?} during echo phase", inappropriate_msg);
 
                em.delayed_messages.push((index, inappropriate_msg))
 
            }
 
        }
 
    }
 
    match parent {
 
        None => assert_eq!(
 
            my_leader, controller_id,
 
            "I've got no parent, but I consider {:?} the leader?",
 
            my_leader
 
        ),
 
        Some(parent) => assert_ne!(
 
            my_leader, controller_id,
 
            "I have {:?} as parent, but I consider myself ({:?}) the leader?",
 
            parent, controller_id
 
        ),
 
    }
 
    log!(logger, "DONE WITH ECHO! Leader has cid={:?}", my_leader);
 

	
 
    // 3. broadcast leader announcement (except to parent: confirm they are your parent)
 
    //    in this loop, every node sends 1 message to each neighbor
 
    //    await 1 message from all non-parents.
 
    let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
 
    for (index, ee) in em.endpoint_exts.iter_mut().enumerate() {
 
        let msg = if Some(index) == parent {
 
            &S(YouAreMyParent)
 
        } else {
 
            awaiting.insert(index);
 
            &msg_for_non_parents
 
        };
 
        log!(logger, "ANNOUNCING to {:?} {:?}", index, msg);
 
        ee.endpoint.send(msg)?;
 
    }
 
    let mut children = Vec::default();
 
    em.undelay_all();
 
    while !awaiting.is_empty() {
 
        log!(logger, "awaiting {:?}", &awaiting);
 
        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
 
        match msg {
 
            S(YouAreMyParent) => {
 
                assert!(awaiting.remove(&index));
 
                children.push(index);
 
            }
 
            S(LeaderAnnounce { leader }) => {
 
                assert!(awaiting.remove(&index));
 
                assert!(leader == my_leader);
 
                assert!(Some(index) != parent);
 
                // they wouldn't send me this if they considered me their parent
 
            }
 
            inappropriate_msg => {
 
                log!(logger, "delaying msg {:?} during echo-reply phase", inappropriate_msg);
 
                em.delayed_messages.push((index, inappropriate_msg));
 
            }
 
        }
 
    }
 
    children.sort();
 
    children.dedup();
 
    Ok(Neighborhood { parent, children })
 
}
0 comments (0 inline, 0 general)