Changeset - 5613956b43b0
Christopher Esterhuyse <christopher.esterhuyse@gmail.com> - 2020-09-17 18:16:41
refactor underway (safe state). TODO: immutable connector state until round succeeds
5 files changed with 250 insertions and 231 deletions:
src/macros.rs
 
@@ -8,17 +8,17 @@ macro_rules! log {
 
        //     let _ = writeln!(w, $($arg)*);
 
        // }
 
    }};
 
    (@MARK, $logger:expr, $($arg:tt)*) => {{
 
        if let Some(w) = $logger.line_writer() {
 
            let _ = writeln!(w, $($arg)*);
 
        }
 
    }};
 
    (@ENDPT, $logger:expr, $($arg:tt)*) => {{
 
        // ignore
 
    }};
 
    ($logger:expr, $($arg:tt)*) => {{
 
        // if let Some(w) = $logger.line_writer() {
 
        //     let _ = writeln!(w, $($arg)*);
 
        // }
 
        if let Some(w) = $logger.line_writer() {
 
            let _ = writeln!(w, $($arg)*);
 
        }
 
    }};
 
}
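
All surviving arms of this macro reduce to the same guarded write. A minimal standalone restatement of the default arm, with a hypothetical in-memory `BufLogger` standing in for the crate's own logger types:

```rust
use std::fmt::Debug;
use std::io::Write;

pub trait Logger: Debug + Send + Sync {
    fn line_writer(&mut self) -> Option<&mut dyn Write>;
}

// Restatement of the default arm: write one line iff a writer is exposed.
macro_rules! log {
    ($logger:expr, $($arg:tt)*) => {{
        if let Some(w) = $logger.line_writer() {
            let _ = writeln!(w, $($arg)*);
        }
    }};
}

// Hypothetical in-memory logger, for the demo only.
#[derive(Debug, Default)]
struct BufLogger(Vec<u8>);
impl Logger for BufLogger {
    fn line_writer(&mut self) -> Option<&mut dyn Write> {
        Some(&mut self.0)
    }
}

fn main() {
    let mut logger = BufLogger::default();
    log!(logger, "~~~ SYNC called; starting round {}", 0);
    assert!(String::from_utf8(logger.0).unwrap().contains("round 0"));
}
```
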
src/runtime/communication.rs
 
@@ -28,25 +28,24 @@ struct NativeBranch {
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
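
The comment above describes a common map/vec split. A simplified sketch of why the pair is "friendlier to iteration": point lookups hash once through `subtree_id_to_index`, while bulk traversal walks the dense `Vec` with no hashing at all (the types here are stand-ins, not the crate's own):

```rust
use std::collections::{HashMap, HashSet};

type SubtreeId = u32;    // stand-in for the real SubtreeId enum
type Predicate = String; // stand-in for the real Predicate

struct Storage {
    subtree_solutions: Vec<HashSet<Predicate>>,
    subtree_id_to_index: HashMap<SubtreeId, usize>,
}

impl Storage {
    // Point lookups hash once through the map...
    fn solutions_for(&self, id: SubtreeId) -> &HashSet<Predicate> {
        &self.subtree_solutions[self.subtree_id_to_index[&id]]
    }
    // ...while iteration walks the dense Vec directly.
    fn total(&self) -> usize {
        self.subtree_solutions.iter().map(HashSet::len).sum()
    }
}

fn main() {
    let s = Storage {
        subtree_solutions: vec![HashSet::new()],
        subtree_id_to_index: [(7, 0)].into_iter().collect(),
    };
    assert_eq!(s.solutions_for(7).len(), 0);
    assert_eq!(s.total(), 0);
}
```
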
 
#[derive(Debug)]
 
struct BranchingProtoComponent {
 
    ports: HashSet<PortId>,
 
    branches: HashMap<Predicate, ProtoComponentBranch>,
 
}
 
#[derive(Debug, Clone)]
 
struct ProtoComponentBranch {
 
    state: ComponentState,
 
    inner: ProtoComponentBranchInner,
 
    ended: bool,
 
}
 
struct CyclicDrainer<'a, K: Eq + Hash, V> {
 
    input: &'a mut HashMap<K, V>,
 
    inner: CyclicDrainInner<'a, K, V>,
 
}
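
A standalone sketch of the drain-with-feedback idea behind `CyclicDrainer` (simplified: the real type also threads a swap map to satisfy the borrow checker; `Drained` and `cyclic_drain` are illustrative names, not the crate's API):

```rust
use std::collections::HashMap;
use std::hash::Hash;

// What the visitor may decide per drained entry.
enum Drained<K, V> {
    Requeue(K, V), // new work discovered: feed back into the input map
    Block(K, V),   // cannot progress now: park in the output map
    Discard,       // inconsistent: drop entirely
}

// Drain `input` to a fixed point; visiting one entry may enqueue more.
// Termination is the visitor's responsibility (entries eventually block).
fn cyclic_drain<K: Eq + Hash + Clone, V>(
    input: &mut HashMap<K, V>,
    output: &mut HashMap<K, V>,
    mut visit: impl FnMut(K, V) -> Drained<K, V>,
) {
    while let Some(k) = input.keys().next().cloned() {
        let v = input.remove(&k).unwrap();
        match visit(k, v) {
            Drained::Requeue(k, v) => {
                input.insert(k, v);
            }
            Drained::Block(k, v) => {
                output.insert(k, v);
            }
            Drained::Discard => {}
        }
    }
}

fn main() {
    let mut input: HashMap<u32, u32> = [(1, 10)].into_iter().collect();
    let mut output = HashMap::new();
    cyclic_drain(&mut input, &mut output, |k, v| {
        if v > 0 { Drained::Requeue(k, v - 1) } else { Drained::Block(k, v) }
    });
    assert_eq!(output[&1], 0); // requeued ten times, then blocked
}
```
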
 
@@ -126,31 +125,30 @@ impl Connector {
 
        // returns index of new batch
 
        let comm = self.get_comm_mut().ok_or(WrongStateError)?;
 
        comm.native_batches.push(Default::default());
 
        Ok(comm.native_batches.len() - 1)
 
    }
 
    fn port_op_access(
 
        &mut self,
 
        port: PortId,
 
        expect_polarity: Polarity,
 
    ) -> Result<&mut NativeBatch, PortOpError> {
 
        use PortOpError as Poe;
 
        let Self { unphased: cu, phased } = self;
 
-        if !cu.inner.native_ports.contains(&port) {
+        let info = cu.inner.current_state.port_info.get(&port).ok_or(Poe::UnknownPolarity)?;
+        if info.owner != cu.inner.native_component_id {
             return Err(Poe::PortUnavailable);
         }
-        match cu.inner.port_info.polarities.get(&port) {
-            Some(p) if *p == expect_polarity => {}
-            Some(_) => return Err(Poe::WrongPolarity),
-            None => return Err(Poe::UnknownPolarity),
+        if info.polarity != expect_polarity {
+            return Err(Poe::WrongPolarity);
         }
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(Poe::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                let batch = comm.native_batches.last_mut().unwrap(); // length >= 1 is invariant
 
                Ok(batch)
 
            }
 
        }
 
    }
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError as Poe;
 
        let batch = self.port_op_access(port, Putter)?;
 
@@ -208,49 +206,44 @@ impl Connector {
 
        log!(
 
            cu.inner.logger,
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
            &timeout,
 
            comm.round_index
 
        );
 
        log!(@BENCH, cu.inner.logger, "");
 

	
 
        // 1. run all proto components to Nonsync blockers
 
        // iterate
 
        let mut branching_proto_components =
 
            HashMap::<ComponentId, BranchingProtoComponent>::default();
 
-        let mut unrun_components: Vec<(ComponentId, ProtoComponent)> = cu
+        let mut unrun_components: Vec<(ComponentId, ComponentState)> = cu
 
            .proto_components
 
            .iter()
 
            .map(|(&proto_id, proto)| (proto_id, proto.clone()))
 
            .collect();
 
        log!(cu.inner.logger, "Nonsync running {} proto components...", unrun_components.len());
 
        // drains unrun_components, and populates branching_proto_components.
 
        while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
            log!(
 
                cu.inner.logger,
 
                "Nonsync running proto component with ID {:?}. {} to go after this",
 
                proto_component_id,
 
                unrun_components.len()
 
            );
 
            let mut ctx = NonsyncProtoContext {
 
                cu_inner: &mut cu.inner,
 
                proto_component_id,
 
                unrun_components: &mut unrun_components,
 
                proto_component_ports: &mut cu
 
                    .proto_components
 
                    .get_mut(&proto_component_id)
 
                    .unwrap() // unrun_components' keys originate from proto_components
 
                    .ports,
 
            };
 
-            let blocker = component.state.nonsync_run(&mut ctx, &cu.proto_description);
+            let blocker = component.nonsync_run(&mut ctx, &cu.proto_description);
 
            log!(
 
                cu.inner.logger,
 
                "proto component {:?} ran to nonsync blocker {:?}",
 
                proto_component_id,
 
                &blocker
 
            );
 
            use NonsyncBlocker as B;
 
            match blocker {
 
                B::ComponentExit => drop(component),
 
                B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)),
 
                B::SyncBlockStart => {
 
                    branching_proto_components
 
@@ -274,80 +267,96 @@ impl Connector {
 
                    .neighborhood
 
                    .children
 
                    .iter()
 
                    .map(|&index| SubtreeId::NetEndpoint { index });
 
                let subtree_id_iter = n.chain(c).chain(e);
 
                log!(
 
                    cu.inner.logger,
 
                    "Children in subtree are: {:?}",
 
                    subtree_id_iter.clone().collect::<Vec<_>>()
 
                );
 
                SolutionStorage::new(subtree_id_iter)
 
            },
 
-            spec_var_stream: cu.inner.id_manager.new_spec_var_stream(),
+            spec_var_stream: cu.inner.current_state.id_manager.new_spec_var_stream(),
 
            getter_buffer: Default::default(),
 
            deadline: timeout.map(|to| Instant::now() + to),
 
        };
 
        log!(cu.inner.logger, "Round context structure initialized");
 
        log!(@BENCH, cu.inner.logger, "");
 

	
 
        // Explore all native branches eagerly. Find solutions, buffer messages, etc.
 
        log!(
 
            cu.inner.logger,
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
        );
 
        let native_spec_var = rctx.spec_var_stream.next();
 
        log!(cu.inner.logger, "Native branch spec var is {:?}", native_spec_var);
 
        let mut branching_native = BranchingNative { branches: Default::default() };
 
        'native_branches: for ((native_branch, index), branch_spec_val) in
 
            comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain())
 
        {
 
            let NativeBatch { to_get, to_put } = native_branch;
 
            let predicate = {
 
                let mut predicate = Predicate::default();
 
-                // assign trues for ports that fire
-                let firing_ports: HashSet<PortId> =
-                    to_get.iter().chain(to_put.keys()).copied().collect();
-                for &port in to_get.iter().chain(to_put.keys()) {
-                    let var = cu.inner.port_info.spec_var_for(port);
+                // all firing ports have SpecVal::FIRING
+                let firing_iter = to_get.iter().chain(to_put.keys()).copied();
+                log!(
+                    cu.inner.logger,
+                    "New native with firing ports {:?}",
+                    firing_iter.clone().collect::<Vec<_>>()
+                );
+                let firing_ports: HashSet<PortId> = firing_iter.clone().collect();
+                for port in firing_iter {
+                    let var = cu.inner.current_state.spec_var_for(port);
                     predicate.assigned.insert(var, SpecVal::FIRING);
                 }
-                // assign falses for all silent (not firing) ports
-                for &port in cu.inner.native_ports.difference(&firing_ports) {
-                    let var = cu.inner.port_info.spec_var_for(port);
+                // all silent ports have SpecVal::SILENT
+                for (port, port_info) in cu.inner.current_state.port_info.iter() {
+                    if port_info.owner != cu.inner.native_component_id {
+                        // not my port
+                        continue;
+                    }
+                    if firing_ports.contains(port) {
+                        // this one is FIRING
+                        continue;
+                    }
+                    let var = cu.inner.current_state.spec_var_for(*port);
 
                    if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) {
 
                        log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
 
                        continue 'native_branches;
 
                    }
 
                }
 
                // this branch is consistent. distinguish it with a unique var:val mapping and proceed
 
                predicate.inserted(native_spec_var, branch_spec_val)
 
            };
 
            log!(
 
                cu.inner.logger,
 
                "Native branch index={:?} has consistent {:?}",
 
                index,
 
                &predicate
 
            );
 
            // send all outgoing messages (by buffering them)
 
            for (putter, payload) in to_put {
 
                let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                log!(
 
                    cu.inner.logger,
 
                    "Native branch {} sending msg {:?} with putter {:?}",
 
                    index,
 
                    &msg,
 
                    putter
 
                );
 
                // sanity check
 
                assert_eq!(Putter, cu.inner.current_state.port_info.get(&putter).unwrap().polarity);
 
                rctx.getter_buffer.putter_add(cu, putter, msg);
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if branch.is_ended() {
 
                log!(
 
                    cu.inner.logger,
 
                    "Native submitting solution for batch {} with {:?}",
 
                    index,
 
                    &predicate
 
                );
 
                rctx.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut *cu.inner.logger,
 
@@ -449,38 +458,32 @@ impl Connector {
 
        }
 
        let mut pcb_temps_owner = <[HashMap<Predicate, ProtoComponentBranch>; 3]>::default();
 
        let mut pcb_temps = MapTempsGuard(&mut pcb_temps_owner);
 
        let mut bn_temp_owner = <HashMap<Predicate, NativeBranch>>::default();
 

	
 
        // run all proto components to their sync blocker
 
        log!(
 
            cu.inner.logger,
 
            "Running all {} proto components to their sync blocker...",
 
            branching_proto_components.len()
 
        );
 
        for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
-            let BranchingProtoComponent { ports, branches } = proto_component;
+            let BranchingProtoComponent { branches } = proto_component;
 
            // must reborrow to constrain the lifetime of pcb_temps to inside the loop
 
            let (swap, pcb_temps) = pcb_temps.reborrow().split_first_mut();
 
            let (blocked, _pcb_temps) = pcb_temps.split_first_mut();
 
            // initially, no components have .ended==true
 
            // drain from branches --> blocked
 
            let cd = CyclicDrainer::new(branches, swap.0, blocked.0);
 
-            BranchingProtoComponent::drain_branches_to_blocked(
-                cd,
-                cu,
-                rctx,
-                proto_component_id,
-                ports,
-            )?;
+            BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?;
 
            // swap the blocked branches back
 
            std::mem::swap(blocked.0, branches);
 
            if branches.is_empty() {
 
                log!(cu.inner.logger, "{:?} has become inconsistent!", proto_component_id);
 
                if let Some(parent) = comm.neighborhood.parent {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.inner.logger, "Already requested failure");
 
                    }
 
                } else {
 
                    log!(cu.inner.logger, "As the leader, deciding on timeout");
 
@@ -491,63 +494,63 @@ impl Connector {
 
        log!(cu.inner.logger, "All proto components are blocked");
 

	
 
        log!(cu.inner.logger, "Entering decision loop...");
 
        comm.endpoint_manager.undelay_all();
 
        'undecided: loop {
 
            // drain payloads_to_get, sending them through endpoints / feeding them to components
 
            log!(
 
                cu.inner.logger,
 
                "Decision loop! have {} messages to recv",
 
                rctx.getter_buffer.len()
 
            );
 
            while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() {
 
-                log!(@MARK, cu.inner.logger, "handling payload msg");
-                assert!(cu.inner.port_info.polarities.get(&getter).copied() == Some(Getter));
-                let route = cu.inner.port_info.routes.get(&getter);
+                log!(@MARK, cu.inner.logger, "handling payload msg for getter {:?} of {:?}", getter, &send_payload_msg);
+                let getter_info = cu.inner.current_state.port_info.get(&getter).unwrap();
+                let cid = getter_info.owner;
+                assert_eq!(Getter, getter_info.polarity);
 
                log!(
 
                    cu.inner.logger,
 
                    "Routing msg {:?} to {:?} via {:?}",
 
                    &send_payload_msg,
 
                    getter,
 
-                    &route
+                    &getter_info.route
 
                );
 
-                match route {
-                    None => log!(cu.inner.logger, "Delivery failed. Physical route unmapped!"),
-                    Some(Route::UdpEndpoint { index }) => {
+                match getter_info.route {
+                    Route::UdpEndpoint { index } => {
                         let udp_endpoint_ext =
-                            &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[*index];
+                            &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[index];
 
                        let SendPayloadMsg { predicate, payload } = send_payload_msg;
 
                        log!(cu.inner.logger, "Delivering to udp endpoint index={}", index);
 
                        udp_endpoint_ext.outgoing_payloads.insert(predicate, payload);
 
                    }
 
-                    Some(Route::NetEndpoint { index }) => {
+                    Route::NetEndpoint { index } => {
 
                        log!(@MARK, cu.inner.logger, "sending payload");
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::SendPayload(send_payload_msg),
 
                        });
 
-                        comm.endpoint_manager.send_to_comms(*index, &msg)?;
+                        comm.endpoint_manager.send_to_comms(index, &msg)?;
 
                    }
 
-                    Some(Route::LocalComponent(cid)) if *cid == cu.inner.native_component_id => {
+                    Route::LocalComponent if cid == cu.inner.native_component_id => {
 
                        branching_native.feed_msg(
 
                            cu,
 
                            rctx,
 
                            getter,
 
                            &send_payload_msg,
 
                            MapTempGuard::new(&mut bn_temp_owner),
 
                        )
 
                    }
 
-                    Some(Route::LocalComponent(cid)) => {
-                        if let Some(branching_component) = branching_proto_components.get_mut(cid) {
-                            let cid = *cid;
+                    Route::LocalComponent => {
+                        if let Some(branching_component) = branching_proto_components.get_mut(&cid)
+                        {
 
                            branching_component.feed_msg(
 
                                cu,
 
                                rctx,
 
                                cid,
 
                                getter,
 
                                &send_payload_msg,
 
                                pcb_temps.reborrow(),
 
                            )?;
 
                            if branching_component.branches.is_empty() {
 
                                log!(cu.inner.logger, "{:?} has become inconsistent!", cid);
 
                                if let Some(parent) = comm.neighborhood.parent {
 
                                    if already_requested_failure.replace_with_true() {
 
@@ -557,25 +560,25 @@ impl Connector {
 
                                    }
 
                                } else {
 
                                    log!(cu.inner.logger, "As the leader, deciding on timeout");
 
                                    return Ok(Decision::Failure);
 
                                }
 
                            }
 
                        } else {
 
                            log!(
 
                                cu.inner.logger,
 
                                "Delivery to getter {:?} msg {:?} failed because {:?} isn't here",
 
                                getter,
 
                                &send_payload_msg,
 
-                                proto_component_id
+                                cid
 
                            );
 
                        }
 
                    }
 
                }
 
            }
 

	
 
            // check if we have a solution yet
 
            log!(cu.inner.logger, "Check if we have any local decisions...");
 
            for solution in rctx.solution_storage.iter_new_local_make_old() {
 
                log!(cu.inner.logger, "New local decision with solution {:?}...", &solution);
 
                log!(@MARK, cu.inner.logger, "local solution");
 
                match comm.neighborhood.parent {
 
@@ -596,25 +599,25 @@ impl Connector {
 
                    }
 
                }
 
            }
 

	
 
            // stuck! make progress by receiving a msg
 
            // try recv messages arriving through endpoints
 
            log!(cu.inner.logger, "No decision yet. Let's recv an endpoint msg...");
 
            {
 
                let (net_index, comm_ctrl_msg): (usize, CommCtrlMsg) = match comm
 
                    .endpoint_manager
 
                    .try_recv_any_comms(
 
                    &mut *cu.inner.logger,
 
-                    &cu.inner.port_info,
+                    &cu.inner.current_state,
 
                    rctx,
 
                    comm.round_index,
 
                )? {
 
                    CommRecvOk::NewControlMsg { net_index, msg } => (net_index, msg),
 
                    CommRecvOk::NewPayloadMsgs => continue 'undecided,
 
                    CommRecvOk::TimeoutWithoutNew => {
 
                        log!(cu.inner.logger, "Reached user-defined deadling without decision...");
 
                        if let Some(parent) = comm.neighborhood.parent {
 
                            if already_requested_failure.replace_with_true() {
 
                                Self::request_failure(cu, comm, parent)?
 
                            } else {
 
                                log!(cu.inner.logger, "Already requested failure");
 
@@ -716,32 +719,32 @@ impl NativeBranch {
 
    }
 
}
 
impl BranchingNative {
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        round_ctx: &mut RoundCtx,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
        bn_temp: MapTempGuard<'_, Predicate, NativeBranch>,
 
    ) {
 
        log!(cu.inner.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
-        assert!(cu.inner.port_info.polarities.get(&getter).copied() == Some(Getter));
+        assert_eq!(Getter, cu.inner.current_state.port_info.get(&getter).unwrap().polarity);
 
        let mut draining = bn_temp;
 
        let finished = &mut self.branches;
 
        std::mem::swap(draining.0, finished);
 
        for (predicate, mut branch) in draining.drain() {
 
            log!(cu.inner.logger, "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            // check if this branch expects to receive it
 
-            let var = cu.inner.port_info.spec_var_for(getter);
+            let var = cu.inner.current_state.spec_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                branch.to_get.remove(&getter);
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                if branch.is_ended() {
 
                    log!(
 
                        cu.inner.logger,
 
                        "new native solution with {:?} is_ended() with gotten {:?}",
 
                        &predicate,
 
                        &branch.gotten
 
                    );
 
                    let subtree_id = SubtreeId::LocalComponent(cu.inner.native_component_id);
 
@@ -862,25 +865,24 @@ impl BranchingNative {
 
                return RoundOk { batch_index: index, gotten };
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
impl BranchingProtoComponent {
 
    fn drain_branches_to_blocked(
 
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
 
        cu: &mut ConnectorUnphased,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ComponentId,
 
-        ports: &HashSet<PortId>,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        cd.cyclic_drain(|mut predicate, mut branch, mut drainer| {
 
            let mut ctx = SyncProtoContext {
 
                cu_inner: &mut cu.inner,
 
                predicate: &predicate,
 
                branch_inner: &mut branch.inner,
 
            };
 
            let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description);
 
            log!(
 
                cu.inner.logger,
 
                "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                proto_component_id,
 
@@ -897,109 +899,102 @@ impl BranchingProtoComponent {
 
                        let mut branch_n = branch.clone();
 
                        branch_n.inner.untaken_choice = Some(val.0);
 
                        drainer.add_input(pred, branch_n);
 
                    }
 
                }
 
                B::CouldntReadMsg(port) => {
 
                    // move to "blocked"
 
                    assert!(!branch.inner.inbox.contains_key(&port));
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntCheckFiring(port) => {
 
                    // sanity check
 
-                    let var = cu.inner.port_info.spec_var_for(port);
+                    let var = cu.inner.current_state.spec_var_for(port);
 
                    assert!(predicate.query(var).is_none());
 
                    // keep forks in "unblocked"
 
                    drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone());
 
                    drainer.add_input(predicate.inserted(var, SpecVal::FIRING), branch);
 
                }
 
                B::PutMsg(putter, payload) => {
 
                    // sanity check
 
-                    assert_eq!(Some(&Putter), cu.inner.port_info.polarities.get(&putter));
+                    assert_eq!(Putter, cu.inner.current_state.port_info.get(&putter).unwrap().polarity);
                     // overwrite assignment
-                    let var = cu.inner.port_info.spec_var_for(putter);
+                    let var = cu.inner.current_state.spec_var_for(putter);
 
                    let was = predicate.assigned.insert(var, SpecVal::FIRING);
 
                    if was == Some(SpecVal::SILENT) {
 
                        log!(cu.inner.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!",
 
                            proto_component_id, putter, var);
 
                        // discard forever
 
                        drop((predicate, branch));
 
                    } else {
 
                        // keep in "unblocked"
 
                        branch.inner.did_put_or_get.insert(putter);
 
                        log!(cu.inner.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})",
 
                            proto_component_id, &payload, putter, var);
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        rctx.getter_buffer.putter_add(cu, putter, msg);
 
                        drainer.add_input(predicate, branch);
 
                    }
 
                }
 
                B::SyncBlockEnd => {
 
                    // make concrete all variables
 
-                    for port in ports.iter() {
-                        let var = cu.inner.port_info.spec_var_for(*port);
+                    for (port, port_info) in cu.inner.current_state.port_info.iter() {
+                        if port_info.owner != proto_component_id {
+                            continue;
+                        }
+                        let var = cu.inner.current_state.spec_var_for(*port);
 
                        let should_have_fired = branch.inner.did_put_or_get.contains(port);
 
                        let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                        let did_fire = val == SpecVal::FIRING;
 
                        if did_fire != should_have_fired {
 
                            log!(cu.inner.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}"
 
                            log!(cu.inner.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}",
 
                                port, var, val, did_fire, should_have_fired);
 
                            // IMPLICIT inconsistency
 
                            drop((predicate, branch));
 
                            return Ok(());
 
                        }
 
                    }
 
                    // submit solution for this component
 
                    let subtree_id = SubtreeId::LocalComponent(proto_component_id);
 
                    rctx.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.inner.logger,
 
                        subtree_id,
 
                        predicate.clone(),
 
                    );
 
                    branch.ended = true;
 
                    // move to "blocked"
 
                    drainer.add_output(predicate, branch);
 
                }
 
            }
 
            Ok(())
 
        })
 
    }
 
    // fn branch_merge_func(
 
    //     mut a: ProtoComponentBranch,
 
    //     b: &mut ProtoComponentBranch,
 
    // ) -> ProtoComponentBranch {
 
    //     if b.ended && !a.ended {
 
    //         a.ended = true;
 
    //         std::mem::swap(&mut a, b);
 
    //     }
 
    //     a
 
    // }
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ComponentId,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
        pcb_temps: MapTempsGuard<'_, Predicate, ProtoComponentBranch>,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        let logger = &mut *cu.inner.logger;
 
        log!(
 
            logger,
 
            "feeding proto component {:?} getter {:?} {:?}",
 
            proto_component_id,
 
            getter,
 
            &send_payload_msg
 
        );
 
-        let BranchingProtoComponent { branches, ports } = self;
+        let BranchingProtoComponent { branches } = self;
 
        let (mut unblocked, pcb_temps) = pcb_temps.split_first_mut();
 
        let (mut blocked, pcb_temps) = pcb_temps.split_first_mut();
 
        // partition drain from branches -> {unblocked, blocked}
 
        log!(logger, "visiting {} blocked branches...", branches.len());
 
        for (predicate, mut branch) in branches.drain() {
 
            if branch.ended {
 
                log!(logger, "Skipping ended branch with {:?}", &predicate);
 
                Self::insert_branch_merging(&mut blocked, predicate, branch);
 
                continue;
 
            }
 
            use AssignmentUnionResult as Aur;
 
            log!(logger, "visiting branch with pred {:?}", &predicate);
 
@@ -1029,31 +1024,25 @@ impl BranchingProtoComponent {
 
                    log!(logger, "Forking this branch with new predicate {:?}", &predicate2);
 
                    let mut branch2 = branch.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    Self::insert_branch_merging(&mut blocked, predicate, branch);
 
                    Self::insert_branch_merging(&mut unblocked, predicate2, branch2);
 
                }
 
            }
 
        }
 
        log!(logger, "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len());
 
        // drain from unblocked --> blocked
 
        let (swap, _pcb_temps) = pcb_temps.split_first_mut();
 
        let cd = CyclicDrainer::new(unblocked.0, swap.0, blocked.0);
 
-        BranchingProtoComponent::drain_branches_to_blocked(
-            cd,
-            cu,
-            rctx,
-            proto_component_id,
-            ports,
-        )?;
+        BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?;
 
        // swap the blocked branches back
 
        std::mem::swap(blocked.0, branches);
 
        log!(cu.inner.logger, "component settles down with branches: {:?}", branches.keys());
 
        Ok(())
 
    }
 
    fn insert_branch_merging(
 
        branches: &mut HashMap<Predicate, ProtoComponentBranch>,
 
        predicate: Predicate,
 
        mut branch: ProtoComponentBranch,
 
    ) {
 
        let e = branches.entry(predicate);
 
        use std::collections::hash_map::Entry;
 
@@ -1064,37 +1053,37 @@ impl BranchingProtoComponent {
 
            }
 
            Entry::Occupied(mut eo) => {
 
                // Oh dear, there is already a branch with this predicate.
 
                // Rather than choosing either branch, we MERGE them.
 
                // This means keeping the existing one in-place, and giving it the UNION of the inboxes
 
                let old = eo.get_mut();
 
                for (k, v) in branch.inner.inbox.drain() {
 
                    old.inner.inbox.insert(k, v);
 
                }
 
            }
 
        }
 
    }
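
In isolation, the merge rule implemented above: on a predicate collision the resident branch is kept in place and absorbs the incoming branch's inbox. A sketch with simplified stand-in types:

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

type Predicate = u64; // stand-in key; the real one is a var->val map
#[derive(Default)]
struct Branch {
    inbox: HashMap<u32, String>, // PortId -> Payload, simplified
}

fn insert_branch_merging(
    branches: &mut HashMap<Predicate, Branch>,
    predicate: Predicate,
    mut branch: Branch,
) {
    match branches.entry(predicate) {
        // No collision: move the branch in as-is.
        Entry::Vacant(ev) => {
            ev.insert(branch);
        }
        // Collision: keep the resident branch and give it the union
        // of both inboxes (incoming entries win on overlapping ports).
        Entry::Occupied(mut eo) => {
            let old = eo.get_mut();
            for (k, v) in branch.inbox.drain() {
                old.inbox.insert(k, v);
            }
        }
    }
}

fn main() {
    let mut m = HashMap::new();
    m.insert(1, Branch { inbox: [(0, "a".into())].into_iter().collect() });
    insert_branch_merging(&mut m, 1, Branch { inbox: [(2, "b".into())].into_iter().collect() });
    assert_eq!(m[&1].inbox.len(), 2); // merged, not replaced
}
```
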
 
-    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
-        let BranchingProtoComponent { ports, branches } = self;
+    fn collapse_with(self, solution_predicate: &Predicate) -> ComponentState {
+        let BranchingProtoComponent { branches } = self;
         for (branch_predicate, branch) in branches {
             if branch.ended && branch_predicate.assigns_subset(solution_predicate) {
                 let ProtoComponentBranch { state, .. } = branch;
-                return ProtoComponent { state, ports };
+                return state;
 
            }
 
        }
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
-    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
+    fn initial(state: ComponentState) -> Self {
         let branch = ProtoComponentBranch { state, inner: Default::default(), ended: false };
-        Self { ports, branches: hashmap! { Predicate::default() => branch } }
+        Self { branches: hashmap! { Predicate::default() => branch } }
 
    }
 
}
 
impl SolutionStorage {
 
    fn new(subtree_ids: impl Iterator<Item = SubtreeId>) -> Self {
 
        let mut subtree_id_to_index: HashMap<SubtreeId, usize> = Default::default();
 
        let mut subtree_solutions = vec![];
 
        for id in subtree_ids {
 
            subtree_id_to_index.insert(id, subtree_solutions.len());
 
            subtree_solutions.push(Default::default())
 
        }
 
        Self {
 
            subtree_solutions,
 
@@ -1188,35 +1177,36 @@ impl SolutionStorage {
 
}
 
impl GetterBuffer {
 
    fn len(&self) -> usize {
 
        self.getters_and_sends.len()
 
    }
 
    fn pop(&mut self) -> Option<(PortId, SendPayloadMsg)> {
 
        self.getters_and_sends.pop()
 
    }
 
    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.getters_and_sends.push((getter, msg));
 
    }
 
    fn putter_add(&mut self, cu: &mut ConnectorUnphased, putter: PortId, msg: SendPayloadMsg) {
 
-        if let Some(&getter) = cu.inner.port_info.peers.get(&putter) {
+        if let Some(getter) = cu.inner.current_state.port_info.get(&putter).unwrap().peer {
 
            log!(cu.inner.logger, "Putter add (putter:{:?} => getter:{:?})", putter, getter);
 
            self.getter_add(getter, msg);
 
        } else {
 
            log!(cu.inner.logger, "Putter {:?} has no known peer!", putter);
 
            panic!("Putter {:?} has no known peer!");
 
        }
 
    }
 
}
 
impl SyncProtoContext<'_> {
 
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
-        let var = self.cu_inner.port_info.spec_var_for(port);
+        let var = self.cu_inner.current_state.spec_var_for(port);
 
        self.predicate.query(var).map(SpecVal::is_firing)
 
    }
 
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        self.branch_inner.did_put_or_get.insert(port);
 
        self.branch_inner.inbox.get(&port)
 
    }
 
    pub(crate) fn take_choice(&mut self) -> Option<u16> {
 
        self.branch_inner.untaken_choice.take()
 
    }
 
}
 
impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> {
 
    fn add_input(&mut self, k: K, v: V) {
 
@@ -1228,49 +1218,63 @@ impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> {
 
}
 
impl NonsyncProtoContext<'_> {
 
    pub fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
        // called by a PROTO COMPONENT. moves its own ports.
 
        // 1. sanity check: this component owns these ports
 
        log!(
 
            self.cu_inner.logger,
 
            "Component {:?} added new component with state {:?}, moving ports {:?}",
 
            self.proto_component_id,
 
            &state,
 
            &moved_ports
 
        );
 
        println!("MOVED PORTS {:#?}. got {:#?}", &moved_ports, &self.proto_component_ports);
 
        assert!(self.proto_component_ports.is_superset(&moved_ports));
 
        // 2. remove ports from old component & update port->route
 
        let new_id = self.cu_inner.id_manager.new_component_id();
 
        println!("MOVED PORTS {:#?}", &moved_ports);
 
        // sanity check
 
        for port in moved_ports.iter() {
 
            assert_eq!(
 
                self.proto_component_id,
 
                self.cu_inner.current_state.port_info.get(port).unwrap().owner
 
            );
 
        }
 
        // 2. create new component
 
        let new_cid = self.cu_inner.current_state.id_manager.new_component_id();
 
        self.unrun_components.push((new_cid, state));
 
        // 3. update ownership of moved ports
 
        for port in moved_ports.iter() {
 
            self.proto_component_ports.remove(port);
 
            self.cu_inner.port_info.routes.insert(*port, Route::LocalComponent(new_id));
 
            self.cu_inner.current_state.port_info.get_mut(port).unwrap().owner = new_cid;
 
        }
 
        // 3. create a new component
 
        self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports }));
 
    }
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the proto component
 
-        let [o, i] =
-            [self.cu_inner.id_manager.new_port_id(), self.cu_inner.id_manager.new_port_id()];
-        self.proto_component_ports.insert(o);
-        self.proto_component_ports.insert(i);
-        // {polarity, peer, route} known. {} unknown.
-        self.cu_inner.port_info.polarities.insert(o, Putter);
-        self.cu_inner.port_info.polarities.insert(i, Getter);
-        self.cu_inner.port_info.peers.insert(o, i);
-        self.cu_inner.port_info.peers.insert(i, o);
-        let route = Route::LocalComponent(self.proto_component_id);
-        self.cu_inner.port_info.routes.insert(o, route);
-        self.cu_inner.port_info.routes.insert(i, route);
+        let mut new_cid_fn = || self.cu_inner.current_state.id_manager.new_port_id();
+        let [o, i] = [new_cid_fn(), new_cid_fn()];
+        self.cu_inner.current_state.port_info.insert(
+            o,
+            PortInfo {
+                route: Route::LocalComponent,
+                peer: Some(i),
+                polarity: Putter,
+                owner: self.proto_component_id,
+            },
+        );
+        self.cu_inner.current_state.port_info.insert(
+            i,
+            PortInfo {
+                route: Route::LocalComponent,
+                peer: Some(o),
+                polarity: Getter,
+                owner: self.proto_component_id,
+            },
+        );
 
        log!(
 
            self.cu_inner.logger,
 
            "Component {:?} port pair (out->in) {:?} -> {:?}",
 
            self.proto_component_id,
 
            o,
 
            i
 
        );
 
        [o, i]
 
    }
 
}
 
impl ProtoComponentBranch {
 
    fn feed_msg(&mut self, getter: PortId, payload: Payload) {
src/runtime/endpoints.rs
 
@@ -143,25 +143,25 @@ impl EndpointManager {
 
        }
 
    }
 

	
 
    // drops all Setup messages,
 
    // buffers all future round messages,
 
    // drops all previous round messages,
 
    // enqueues all current round SendPayload messages using round_ctx.getter_add
 
    // returns the first comm_ctrl_msg encountered
 
    // only polls until SOME message is enqueued
 
    pub(super) fn try_recv_any_comms(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
-        port_info: &PortInfo,
+        current_state: &CurrentState,
 
        round_ctx: &mut impl RoundCtxTrait,
 
        round_index: usize,
 
    ) -> Result<CommRecvOk, UnrecoverableSyncError> {
 
        ///////////////////////////////////////////
 
        impl EndpointManager {
 
            fn handle_msg(
 
                &mut self,
 
                logger: &mut dyn Logger,
 
                round_ctx: &mut impl RoundCtxTrait,
 
                net_index: usize,
 
                msg: Msg,
 
                round_index: usize,
 
@@ -234,25 +234,25 @@ impl EndpointManager {
 
                    return Ok(CommRecvOk::NewControlMsg { net_index, msg });
 
                }
 
            }
 
            // try receive a udp message
 
            let recv_buffer = self.udp_in_buffer.as_mut_slice();
 
            while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() {
 
                let ee = &mut self.udp_endpoint_store.endpoint_exts[index];
 
                if let Some(bytes_written) = ee.sock.recv(recv_buffer).ok() {
 
                    // I received a payload!
 
                    self.udp_endpoint_store.polled_undrained.insert(index);
 
                    if !ee.received_this_round {
 
                        let payload = Payload::from(&recv_buffer[..bytes_written]);
 
-                        let port_spec_var = port_info.spec_var_for(ee.getter_for_incoming);
+                        let port_spec_var = current_state.spec_var_for(ee.getter_for_incoming);
 
                        let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING);
 
                        round_ctx.getter_add(
 
                            ee.getter_for_incoming,
 
                            SendPayloadMsg { payload, predicate },
 
                        );
 
                        some_message_enqueued = true;
 
                        ee.received_this_round = true;
 
                    } else {
 
                        // lose the message!
 
                    }
 
                }
 
            }
src/runtime/mod.rs
 
@@ -20,28 +20,32 @@ pub struct Connector {
 
    unphased: ConnectorUnphased,
 
    phased: ConnectorPhased,
 
}
 
pub trait Logger: Debug + Send + Sync {
 
    fn line_writer(&mut self) -> Option<&mut dyn std::io::Write>;
 
}
 
#[derive(Debug)]
 
pub struct VecLogger(ConnectorId, Vec<u8>);
 
#[derive(Debug)]
 
pub struct DummyLogger;
 
#[derive(Debug)]
 
pub struct FileLogger(ConnectorId, std::fs::File);
 
#[derive(Debug)]
 
struct CurrentState {
 
    port_info: HashMap<PortId, PortInfo>,
 
    id_manager: IdManager,
 
}
 
pub(crate) struct NonsyncProtoContext<'a> {
 
    cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds
 
    proto_component_ports: &'a mut HashSet<PortId>, // sub-structure of component
 
-    unrun_components: &'a mut Vec<(ComponentId, ProtoComponent)>, // lives for Nonsync phase
+    unrun_components: &'a mut Vec<(ComponentId, ComponentState)>, // lives for Nonsync phase
 
    proto_component_id: ComponentId,          // KEY in id->component map
 
}
 
pub(crate) struct SyncProtoContext<'a> {
 
    cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds
 
    branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch
 
    predicate: &'a Predicate,                 // KEY in pred->branch map
 
}
 
#[derive(Default, Debug, Clone)]
 
struct ProtoComponentBranchInner {
 
    untaken_choice: Option<u16>,
 
    did_put_or_get: HashSet<PortId>,
 
    inbox: HashMap<PortId, Payload>,
 
@@ -57,63 +61,64 @@ struct SpecVal(u16);
 
#[derive(Debug)]
 
struct RoundOk {
 
    batch_index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
}
 
#[derive(Default)]
 
struct VecSet<T: std::cmp::Ord> {
 
    // invariant: ordered, deduplicated
 
    vec: Vec<T>,
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
enum Route {
 
-    LocalComponent(ComponentId),
+    LocalComponent,
 
    NetEndpoint { index: usize },
 
    UdpEndpoint { index: usize },
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
enum SubtreeId {
 
    LocalComponent(ComponentId),
 
    NetEndpoint { index: usize },
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
    owner: ComponentId,
 
}
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
enum Decision {
 
    Failure,
 
    Success(Predicate),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum Msg {
 
    SetupMsg(SetupMsg),
 
    CommMsg(CommMsg),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum SetupMsg {
 
    MyPortInfo(MyPortInfo),
 
    LeaderWave { wave_leader: ConnectorId },
 
    LeaderAnnounce { tree_leader: ConnectorId },
 
    YouAreMyParent,
 
    SessionGather { unoptimized_map: HashMap<ConnectorId, SessionInfo> },
 
    SessionScatter { optimized_map: HashMap<ConnectorId, SessionInfo> },
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct SessionInfo {
 
    serde_proto_description: SerdeProtocolDescription,
 
    port_info: PortInfo,
 
    port_info: HashMap<PortId, PortInfo>,
 
    endpoint_incoming_to_getter: Vec<PortId>,
 
    proto_components: HashMap<ComponentId, ProtoComponent>,
 
    proto_components: HashMap<ComponentId, ComponentState>,
 
}
 
#[derive(Debug, Clone)]
 
struct SerdeProtocolDescription(Arc<ProtocolDescription>);
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct CommMsg {
 
    round_index: usize,
 
    contents: CommMsgContents,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum CommMsgContents {
 
    SendPayload(SendPayloadMsg),
 
    CommCtrl(CommCtrlMsg),
 
@@ -131,29 +136,24 @@ struct SendPayloadMsg {
 
#[derive(Debug, PartialEq)]
 
enum AssignmentUnionResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
    New(Predicate),
 
    Nonexistant,
 
}
 
struct NetEndpoint {
 
    inbox: Vec<u8>,
 
    stream: TcpStream,
 
}
 
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-struct ProtoComponent {
-    state: ComponentState,
-    ports: HashSet<PortId>,
-}
 
#[derive(Debug, Clone)]
 
struct NetEndpointSetup {
 
    getter_for_incoming: PortId,
 
    sock_addr: SocketAddr,
 
    endpoint_polarity: EndpointPolarity,
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct UdpEndpointSetup {
 
    getter_for_incoming: PortId,
 
    local_addr: SocketAddr,
 
    peer_addr: SocketAddr,
 
@@ -198,51 +198,50 @@ struct EndpointManager {
 
    events: Events,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    net_endpoint_store: EndpointStore<NetEndpointExt>,
 
    udp_endpoint_store: EndpointStore<UdpEndpointExt>,
 
    udp_in_buffer: UdpInBuffer,
 
}
 
#[derive(Debug)]
 
struct EndpointStore<T> {
 
    endpoint_exts: Vec<T>,
 
    polled_undrained: VecSet<usize>,
 
}
 
-#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct PortInfo {
 
-    owners: HashMap<PortId, ComponentId>,
-    polarities: HashMap<PortId, Polarity>,
-    peers: HashMap<PortId, PortId>,
-    routes: HashMap<PortId, Route>,
+    owner: ComponentId,
+    peer: Option<PortId>,
+    polarity: Polarity,
+    route: Route,
 
}
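
The shape of this refactor, restated with simplified stand-in types: four parallel maps keyed by `PortId` become one map of per-port records, so a single lookup yields everything known about a port.

```rust
use std::collections::HashMap;

type PortId = u32; // stand-ins; the real ids are richer types
type ComponentId = u32;
#[derive(Debug, Clone, Copy, PartialEq)]
enum Polarity { Putter, Getter }
#[derive(Debug, Clone, Copy)]
#[allow(dead_code)]
enum Route { LocalComponent, NetEndpoint { index: usize } }

// Before: four parallel maps, one hash lookup per fact about a port.
#[allow(dead_code)]
struct OldPortInfo {
    owners: HashMap<PortId, ComponentId>,
    polarities: HashMap<PortId, Polarity>,
    peers: HashMap<PortId, PortId>,
    routes: HashMap<PortId, Route>,
}

// After: one record per port, stored as HashMap<PortId, PortInfo>.
#[allow(dead_code)]
struct PortInfo {
    owner: ComponentId,
    peer: Option<PortId>, // peer may be unknown mid-setup
    polarity: Polarity,
    route: Route,
}

// One lookup now yields owner, peer, polarity, and route together.
fn is_getter(map: &HashMap<PortId, PortInfo>, port: PortId) -> Option<bool> {
    map.get(&port).map(|info| info.polarity == Polarity::Getter)
}

fn main() {
    let mut map = HashMap::new();
    map.insert(0, PortInfo {
        owner: 1,
        peer: Some(2),
        polarity: Polarity::Getter,
        route: Route::LocalComponent,
    });
    assert_eq!(is_getter(&map, 0), Some(true));
}
```
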
 

	
 
#[derive(Debug)]
 
struct ConnectorCommunication {
 
    round_index: usize,
 
    endpoint_manager: EndpointManager,
 
    neighborhood: Neighborhood,
 
    native_batches: Vec<NativeBatch>,
 
    round_result: Result<Option<RoundOk>, SyncError>,
 
}
 
#[derive(Debug)]
 
struct ConnectorUnphased {
 
    proto_description: Arc<ProtocolDescription>,
 
    proto_components: HashMap<ComponentId, ProtoComponent>,
 
    proto_components: HashMap<ComponentId, ComponentState>,
 
    inner: ConnectorUnphasedInner,
 
}
 
#[derive(Debug)]
 
struct ConnectorUnphasedInner {
 
    logger: Box<dyn Logger>,
 
-    id_manager: IdManager,
-    native_ports: HashSet<PortId>,
-    port_info: PortInfo,
+    current_state: CurrentState,
 
    native_component_id: ComponentId,
 
}
 
#[derive(Debug)]
 
struct ConnectorSetup {
 
    net_endpoint_setups: Vec<NetEndpointSetup>,
 
    udp_endpoint_setups: Vec<UdpEndpointSetup>,
 
}
 
#[derive(Debug)]
 
enum ConnectorPhased {
 
    Setup(Box<ConnectorSetup>),
 
    Communication(Box<ConnectorCommunication>),
 
}
 
@@ -316,29 +315,30 @@ impl<T: std::cmp::Ord> VecSet<T> {
 
                self.vec.insert(index, element);
 
                true
 
            }
 
        }
 
    }
 
    fn iter(&self) -> std::slice::Iter<T> {
 
        self.vec.iter()
 
    }
 
    fn pop(&mut self) -> Option<T> {
 
        self.vec.pop()
 
    }
 
}
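
For context, a minimal `VecSet` restating the "ordered, deduplicated" invariant; the `binary_search` in `insert` is an assumption consistent with the insertion branch shown above, not a verbatim copy of the crate's method:

```rust
#[derive(Default)]
struct VecSet<T: Ord> {
    vec: Vec<T>, // invariant: sorted ascending, no duplicates
}

impl<T: Ord> VecSet<T> {
    // Binary search keeps the invariant; returns false on duplicates.
    fn insert(&mut self, element: T) -> bool {
        match self.vec.binary_search(&element) {
            Ok(_) => false,
            Err(index) => {
                self.vec.insert(index, element);
                true
            }
        }
    }
    // Pops the greatest element (last in sorted order).
    fn pop(&mut self) -> Option<T> {
        self.vec.pop()
    }
}

fn main() {
    let mut s = VecSet::default();
    assert!(s.insert(2) && s.insert(1) && !s.insert(2));
    assert_eq!(s.pop(), Some(2));
}
```
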
 
-impl PortInfo {
+impl CurrentState {
     fn spec_var_for(&self, port: PortId) -> SpecVar {
-        SpecVar(match self.polarities.get(&port).unwrap() {
+        let info = self.port_info.get(&port).unwrap();
+        SpecVar(match info.polarity {
             Getter => port,
-            Putter => *self.peers.get(&port).unwrap(),
+            Putter => info.peer.unwrap(),
 
        })
 
    }
 
}
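
What `spec_var_for` guarantees: both ends of a channel resolve to one speculative variable, the getter-side port id, so a putter's assignment and its peer getter's assignment can never land on two different predicate keys. A standalone sketch with simplified types:

```rust
use std::collections::HashMap;

type PortId = u32; // simplified stand-in
#[derive(Clone, Copy)]
enum Polarity { Putter, Getter }
struct PortInfo { polarity: Polarity, peer: Option<PortId> }

#[derive(Debug, PartialEq)]
struct SpecVar(PortId);

// Both ends of a channel name the same variable: the getter's port id.
fn spec_var_for(port_info: &HashMap<PortId, PortInfo>, port: PortId) -> SpecVar {
    let info = &port_info[&port];
    SpecVar(match info.polarity {
        Polarity::Getter => port,
        Polarity::Putter => info.peer.expect("putter with unknown peer"),
    })
}

fn main() {
    let mut m = HashMap::new();
    m.insert(0, PortInfo { polarity: Polarity::Putter, peer: Some(1) });
    m.insert(1, PortInfo { polarity: Polarity::Getter, peer: Some(0) });
    assert_eq!(spec_var_for(&m, 0), spec_var_for(&m, 1)); // both -> SpecVar(1)
}
```
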
 
impl SpecVarStream {
 
    fn next(&mut self) -> SpecVar {
 
        let phantom_port: PortId =
 
            Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }
 
                .into();
 
        SpecVar(phantom_port)
 
    }
 
}
 
impl IdManager {
 
@@ -394,73 +394,80 @@ impl Connector {
 
        }
 
    }
 
    pub fn swap_logger(&mut self, mut new_logger: Box<dyn Logger>) -> Box<dyn Logger> {
 
        std::mem::swap(&mut self.unphased.inner.logger, &mut new_logger);
 
        new_logger
 
    }
 
    pub fn get_logger(&mut self) -> &mut dyn Logger {
 
        &mut *self.unphased.inner.logger
 
    }
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        let cu = &mut self.unphased;
 
        // adds two new associated ports, related to each other, and exposed to the native
 
-        let [o, i] = [cu.inner.id_manager.new_port_id(), cu.inner.id_manager.new_port_id()];
-        cu.inner.native_ports.insert(o);
-        cu.inner.native_ports.insert(i);
-        // {polarity, peer, route} known. {} unknown.
-        cu.inner.port_info.polarities.insert(o, Putter);
-        cu.inner.port_info.polarities.insert(i, Getter);
-        cu.inner.port_info.peers.insert(o, i);
-        cu.inner.port_info.peers.insert(i, o);
-        let route = Route::LocalComponent(cu.inner.native_component_id);
-        cu.inner.port_info.routes.insert(o, route);
-        cu.inner.port_info.routes.insert(i, route);
+        let mut new_cid = || cu.inner.current_state.id_manager.new_port_id();
+        let [o, i] = [new_cid(), new_cid()];
+        cu.inner.current_state.port_info.insert(
+            o,
+            PortInfo {
+                route: Route::LocalComponent,
+                peer: Some(i),
+                owner: cu.inner.native_component_id,
+                polarity: Putter,
+            },
+        );
+        cu.inner.current_state.port_info.insert(
+            i,
+            PortInfo {
+                route: Route::LocalComponent,
+                peer: Some(o),
+                owner: cu.inner.native_component_id,
+                polarity: Getter,
+            },
+        );
 
        log!(cu.inner.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 
    pub fn add_component(
 
        &mut self,
 
        identifier: &[u8],
 
        ports: &[PortId],
 
    ) -> Result<(), AddComponentError> {
 
        // called by the USER. moves ports owned by the NATIVE
 
        use AddComponentError as Ace;
 
        // 1. check if this is OK
 
        let cu = &mut self.unphased;
 
-        let polarities = cu.proto_description.component_polarities(identifier)?;
-        if polarities.len() != ports.len() {
-            return Err(Ace::WrongNumberOfParamaters { expected: polarities.len() });
+        let expected_polarities = cu.proto_description.component_polarities(identifier)?;
+        if expected_polarities.len() != ports.len() {
+            return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() });
 
        }
 
-        for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) {
-            if !cu.inner.native_ports.contains(port) {
-                return Err(Ace::UnknownPort(*port));
+        for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) {
+            let info = cu.inner.current_state.port_info.get(&port).ok_or(Ace::UnknownPort(port))?;
+            if info.owner != cu.inner.native_component_id {
+                return Err(Ace::UnknownPort(port));
 
            }
 
-            if expected_polarity != *cu.inner.port_info.polarities.get(port).unwrap() {
-                return Err(Ace::WrongPortPolarity { port: *port, expected_polarity });
+            if info.polarity != expected_polarity {
+                return Err(Ace::WrongPortPolarity { port, expected_polarity });
 
            }
 
        }
 
-        // 3. remove ports from old component & update port->route
-        let new_cid = cu.inner.id_manager.new_component_id();
+        // 2. add new component
+        let new_cid = cu.inner.current_state.id_manager.new_component_id();
 
+        cu.proto_components
+            .insert(new_cid, cu.proto_description.new_main_component(identifier, ports));
+        // 3. update port ownership
         for port in ports.iter() {
-            cu.inner.port_info.routes.insert(*port, Route::LocalComponent(new_cid));
+            match cu.inner.current_state.port_info.get_mut(port) {
+                Some(port_info) => port_info.owner = new_cid,
+                None => unreachable!(),
+            }
         }
-        cu.inner.native_ports.retain(|port| !ports.contains(port));
-        // 4. add new component
-        cu.proto_components.insert(
-            new_cid,
-            ProtoComponent {
-                state: cu.proto_description.new_main_component(identifier, ports),
-                ports: ports.iter().copied().collect(),
-            },
-        );
 
        Ok(())
 
    }
 
}
 
impl Predicate {
 
    #[inline]
 
    pub fn singleton(k: SpecVar, v: SpecVal) -> Self {
 
        Self::default().inserted(k, v)
 
    }
 
    #[inline]
 
    pub fn inserted(mut self, k: SpecVar, v: SpecVal) -> Self {
 
        self.assigned.insert(k, v);
 
        self
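
A usage sketch for the two `Predicate` constructors above; the `BTreeMap` for `assigned` is an assumption for illustration, not necessarily the crate's own choice of map:

```rust
use std::collections::BTreeMap;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct SpecVar(u32); // simplified: the real one wraps a PortId
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct SpecVal(u16); // e.g. FIRING / SILENT plus branch-distinguishing values

#[derive(Default, Debug)]
struct Predicate {
    assigned: BTreeMap<SpecVar, SpecVal>,
}

impl Predicate {
    fn singleton(k: SpecVar, v: SpecVal) -> Self {
        Self::default().inserted(k, v)
    }
    // Builder style: consume and return self, so construction chains.
    fn inserted(mut self, k: SpecVar, v: SpecVal) -> Self {
        self.assigned.insert(k, v);
        self
    }
}

fn main() {
    let firing = SpecVal(1);
    let silent = SpecVal(0);
    // "the var for port 0 fires, the var for port 1 stays silent"
    let pred = Predicate::singleton(SpecVar(0), firing).inserted(SpecVar(1), silent);
    assert_eq!(pred.assigned.len(), 2);
}
```
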
src/runtime/setup.rs
 
use crate::common::*;
 
use crate::runtime::*;
 

	
 
impl Connector {
 
    pub fn new(
 
        mut logger: Box<dyn Logger>,
 
        proto_description: Arc<ProtocolDescription>,
 
        connector_id: ConnectorId,
 
    ) -> Self {
 
        log!(&mut *logger, "Created with connector_id {:?}", connector_id);
 
        let mut id_manager = IdManager::new(connector_id);
 
+        let native_component_id = id_manager.new_component_id();
 
        Self {
 
            unphased: ConnectorUnphased {
 
                proto_description,
 
                proto_components: Default::default(),
 
                inner: ConnectorUnphasedInner {
 
                    logger,
 
-                    native_component_id: id_manager.new_component_id(),
-                    id_manager,
-                    native_ports: Default::default(),
-                    port_info: Default::default(),
+                    native_component_id,
+                    current_state: CurrentState { id_manager, port_info: Default::default() },
 
                },
 
            },
 
            phased: ConnectorPhased::Setup(Box::new(ConnectorSetup {
 
                net_endpoint_setups: Default::default(),
 
                udp_endpoint_setups: Default::default(),
 
            })),
 
        }
 
    }
 
    /// Conceptually, this returning [p0, g1] is sugar for:
 
    /// 1. create port pair [p0, g0]
 
    /// 2. create port pair [p1, g1]
 
    /// 3. create udp component with interface of moved ports [p1, g0]
 
    /// 4. return [p0, g1]
 
    pub fn new_udp_mediator_component(
 
        &mut self,
 
        local_addr: SocketAddr,
 
        peer_addr: SocketAddr,
 
    ) -> Result<[PortId; 2], WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let udp_index = setup.udp_endpoint_setups.len();
 
                let mut npid = || cu.inner.id_manager.new_port_id();
 
                let udp_cid = cu.inner.current_state.id_manager.new_component_id();
 
                let mut npid = || cu.inner.current_state.id_manager.new_port_id();
 
                let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()];
 
-                cu.inner.native_ports.insert(nin);
 
-                cu.inner.native_ports.insert(nout);
 
-                cu.inner.port_info.polarities.insert(nin, Getter);
 
-                cu.inner.port_info.polarities.insert(nout, Putter);
 
-                cu.inner.port_info.polarities.insert(uin, Getter);
 
-                cu.inner.port_info.polarities.insert(uout, Putter);
 
-                cu.inner.port_info.peers.insert(nin, uout);
 
-                cu.inner.port_info.peers.insert(nout, uin);
 
-                cu.inner.port_info.peers.insert(uin, nout);
 
-                cu.inner.port_info.peers.insert(uout, nin);
 
-                cu.inner
 
-                    .port_info
 
-                    .routes
 
-                    .insert(nin, Route::LocalComponent(cu.inner.native_component_id));
 
-                cu.inner
 
-                    .port_info
 
-                    .routes
 
-                    .insert(nout, Route::LocalComponent(cu.inner.native_component_id));
 
-                cu.inner.port_info.routes.insert(uin, Route::UdpEndpoint { index: udp_index });
 
-                cu.inner.port_info.routes.insert(uout, Route::UdpEndpoint { index: udp_index });
 

	
 
+                cu.inner.current_state.port_info.insert(
 
+                    nin,
 
+                    PortInfo {
 
+                        route: Route::LocalComponent,
 
+                        polarity: Getter,
 
+                        peer: Some(uout),
 
+                        owner: cu.inner.native_component_id,
 
+                    },
 
+                );
 
+                cu.inner.current_state.port_info.insert(
 
+                    nout,
 
+                    PortInfo {
 
+                        route: Route::LocalComponent,
 
+                        polarity: Putter,
 
+                        peer: Some(uin),
 
+                        owner: cu.inner.native_component_id,
 
+                    },
 
+                );
 
+                cu.inner.current_state.port_info.insert(
 
+                    uin,
 
+                    PortInfo {
 
+                        route: Route::UdpEndpoint { index: udp_index },
 
+                        polarity: Getter,
 
+                        peer: Some(nout), // uin pairs with nout, per the peers map removed above
 
+                        owner: udp_cid,
 
+                    },
 
+                );
 
+                cu.inner.current_state.port_info.insert(
 
+                    uout,
 
+                    PortInfo {
 
+                        route: Route::UdpEndpoint { index: udp_index },
 
+                        polarity: Putter,
 
+                        peer: Some(nin), // uout pairs with nin, per the peers map removed above
 
+                        owner: udp_cid,
 
+                    },
 
+                );
 
                setup.udp_endpoint_setups.push(UdpEndpointSetup {
 
                    local_addr,
 
                    peer_addr,
 
                    getter_for_incoming: nin,
 
                });
 
                Ok([nout, nin])
 
            }
 
        }
 
    }
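 
Illustrative use of the sugar in the doc comment above; the addresses are placeholders:
 
    let [to_udp, from_udp] = connector.new_udp_mediator_component(
        "127.0.0.1:8000".parse().unwrap(), // local_addr: where datagrams arrive
        "127.0.0.1:9000".parse().unwrap(), // peer_addr: where payloads are sent
    )?;
    // to_udp is p0 (Putter), from_udp is g1 (Getter) in the doc comment's terms.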
 
    pub fn new_net_port(
 
        &mut self,
 
        polarity: Polarity,
 
        sock_addr: SocketAddr,
 
        endpoint_polarity: EndpointPolarity,
 
    ) -> Result<PortId, WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
-                let local_port = cu.inner.id_manager.new_port_id();
 
-                cu.inner.native_ports.insert(local_port);
 
-                // {polarity, route} known. {peer} unknown.
 
-                cu.inner.port_info.polarities.insert(local_port, polarity);
 
-                cu.inner
 
-                    .port_info
 
-                    .routes
 
-                    .insert(local_port, Route::LocalComponent(cu.inner.native_component_id));
 
+                let new_pid = cu.inner.current_state.id_manager.new_port_id();
 
+                cu.inner.current_state.port_info.insert(
 
+                    new_pid,
 
+                    PortInfo {
 
+                        route: Route::LocalComponent,
 
+                        peer: None,
 
+                        owner: cu.inner.native_component_id,
 
+                        polarity,
 
+                    },
 
+                );
 
                log!(
 
                    cu.inner.logger,
 
                    "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}",
 
-                    local_port,
 
+                    new_pid,
 
                    polarity,
 
                    &sock_addr,
 
                    endpoint_polarity
 
                );
 
                setup.net_endpoint_setups.push(NetEndpointSetup {
 
                    sock_addr,
 
                    endpoint_polarity,
 
-                    getter_for_incoming: local_port,
 
+                    getter_for_incoming: new_pid,
 
                });
 
-                Ok(local_port)
 
+                Ok(new_pid)
 
            }
 
        }
 
    }
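 
A sketch of how two processes would pair ports over one address; the handshake in new_endpoint_manager (below) rejects equal polarities with PortPeerPolarityMismatch. The EndpointPolarity variant names Active/Passive are assumed here:
 
    let addr: SocketAddr = "127.0.0.1:7000".parse().unwrap(); // placeholder
    // process A listens for the connection...
    let putter = a.new_net_port(Putter, addr, EndpointPolarity::Passive)?;
    // ...process B dials it; note the opposite port polarity.
    let getter = b.new_net_port(Getter, addr, EndpointPolarity::Active)?;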
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError as Ce;
 
        let Self { unphased: cu, phased } = self;
 
        match &phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(cu.inner.logger, "Call to connecting in connected state");
 
                Err(Ce::AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup(setup) => {
 
                log!(cu.inner.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
                    &mut *cu.inner.logger,
 
                    &setup.net_endpoint_setups,
 
                    &setup.udp_endpoint_setups,
 
                    &mut cu.inner.port_info,
 
                    &mut cu.inner.current_state.port_info,
 
                    &deadline,
 
                )?;
 
                log!(
 
                    cu.inner.logger,
 
                    "Successfully connected {} endpoints",
 
                    endpoint_manager.net_endpoint_store.endpoint_exts.len()
 
                    "Successfully connected {} endpoints. info now {:#?} {:#?}",
 
                    endpoint_manager.net_endpoint_store.endpoint_exts.len(),
 
                    &cu.inner.current_state.port_info,
 
                    &endpoint_manager,
 
                );
 
                // leader election and tree construction
 
                let neighborhood = init_neighborhood(
 
-                    cu.inner.id_manager.connector_id,
 
+                    cu.inner.current_state.id_manager.connector_id,
 
                    &mut *cu.inner.logger,
 
                    &mut endpoint_manager,
 
                    &deadline,
 
                )?;
 
                log!(cu.inner.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                let mut comm = ConnectorCommunication {
 
                    round_index: 0,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    native_batches: vec![Default::default()],
 
                    round_result: Ok(None),
 
                };
 
                if cfg!(feature = "session_optimization") {
 
                    session_optimize(cu, &mut comm, &deadline)?;
 
                }
 
                log!(cu.inner.logger, "connect() finished. setup phase complete");
 
-                self.phased = ConnectorPhased::Communication(Box::new(comm));
 
+                *phased = ConnectorPhased::Communication(Box::new(comm));
 
                Ok(())
 
            }
 
        }
 
    }
 
}
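 
The setup phase then ends in one call (a sketch; error handling elided):
 
    // Endpoint setup, leader election, optional session optimization,
    // then the Setup -> Communication phase flip shown above.
    connector.connect(Some(Duration::from_secs(5)))?; // None means no deadline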
 
fn new_endpoint_manager(
 
    logger: &mut dyn Logger,
 
    net_endpoint_setups: &[NetEndpointSetup],
 
    udp_endpoint_setups: &[UdpEndpointSetup],
 
    port_info: &mut PortInfo,
 
    port_info: &mut HashMap<PortId, PortInfo>,
 
    deadline: &Option<Instant>,
 
) -> Result<EndpointManager, ConnectError> {
 
    ////////////////////////////////////////////
 
    use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
 
    use ConnectError as Ce;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    const WAKER_PERIOD: Duration = Duration::from_millis(300);
 
    struct WakerState {
 
        continue_signal: AtomicBool,
 
        waker: mio::Waker,
 
    }
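 
These two fields suggest the usual mio waker idiom: a helper thread periodically wakes the poller until told to stop, so deadline checks cannot block forever. A sketch of such a driver, assuming mio 0.7's Waker API (the actual loop sits outside the shown hunks):
 
    use std::sync::Arc;
    use std::sync::atomic::Ordering::SeqCst;
 
    fn spawn_waker_thread(ws: Arc<WakerState>, period: std::time::Duration) {
        std::thread::spawn(move || {
            while ws.continue_signal.load(SeqCst) {
                std::thread::sleep(period);
                let _ = ws.waker.wake(); // interrupts mio::Poll::poll
            }
        });
    }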
 
    impl WakerState {
 
@@ -384,32 +405,32 @@ fn new_endpoint_manager(
 
                        // event wasn't ERROR
 
                        if net_connect_retry_later.contains(&index) {
 
                            // spurious wakeup. already scheduled to retry connect later
 
                            continue;
 
                        }
 
                        if !setup_incomplete.contains(&token_target) {
 
                            // spurious wakeup. this endpoint has already been completed!
 
                            if event.is_readable() {
 
                                net_polled_undrained.insert(index);
 
                            }
 
                            continue;
 
                        }
 
-                        let local_polarity = *port_info
 
-                            .polarities
 
-                            .get(&net_todo.endpoint_setup.getter_for_incoming)
 
+                        let local_info = port_info
 
+                            .get_mut(&net_todo.endpoint_setup.getter_for_incoming)
 
                            .unwrap();
 
                        if event.is_writable() && !net_todo.sent_local_port {
 
                            // can write and didn't send setup msg yet? Do so!
 
                            let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
-                                polarity: local_polarity,
 
+                                owner: local_info.owner,
 
+                                polarity: local_info.polarity,
 
                                port: net_todo.endpoint_setup.getter_for_incoming,
 
                            }));
 
                            net_endpoint
 
                                .send(&msg)
 
                                .map_err(|e| {
 
                                    Ce::NetEndpointSetupError(
 
                                        net_endpoint.stream.local_addr().unwrap(),
 
                                        e,
 
                                    )
 
                                })
 
                                .unwrap();
 
                            log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
@@ -426,54 +447,39 @@ fn new_endpoint_manager(
 
                            if maybe_msg.is_some() && !net_endpoint.inbox.is_empty() {
 
                                net_polled_undrained.insert(index);
 
                            }
 
                            match maybe_msg {
 
                                None => {} // msg deserialization incomplete
 
                                Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
 
                                    log!(
 
                                        logger,
 
                                        "endpoint[{}] got peer info {:?}",
 
                                        index,
 
                                        peer_info
 
                                    );
 
-                                    if peer_info.polarity == local_polarity {
 
+                                    if peer_info.polarity == local_info.polarity {
 
                                        return Err(ConnectError::PortPeerPolarityMismatch(
 
                                            net_todo.endpoint_setup.getter_for_incoming,
 
                                        ));
 
                                    }
 
                                    net_todo.recv_peer_port = Some(peer_info.port);
 
                                    // 1. finally learned the peer of this port!
 
-                                    port_info.peers.insert(
 
-                                        net_todo.endpoint_setup.getter_for_incoming,
 
-                                        peer_info.port,
 
-                                    );
 
+                                    local_info.peer = Some(peer_info.port);
 
                                    // 2. learned the info of this peer port
 
-                                    port_info.polarities.insert(peer_info.port, peer_info.polarity);
 
-                                    port_info.peers.insert(
 
-                                        peer_info.port,
 
-                                        net_todo.endpoint_setup.getter_for_incoming,
 
-                                    );
 
-                                    if let Some(route) = port_info.routes.get(&peer_info.port) {
 
-                                        // check just for logging purposes
 
-                                        log!(
 
-                                            logger,
 
-                                            "Special case! Route to peer {:?} already known to be {:?}. Leave untouched",
 
-                                            peer_info.port,
 
-                                            route
 
-                                        );
 
-                                    }
 
-                                    port_info
 
-                                        .routes
 
-                                        .entry(peer_info.port)
 
-                                        .or_insert(Route::NetEndpoint { index });
 
+                                    port_info.entry(peer_info.port).or_insert(PortInfo {
 
+                                        peer: Some(net_todo.endpoint_setup.getter_for_incoming),
 
+                                        polarity: peer_info.polarity,
 
+                                        owner: peer_info.owner,
 
+                                        route: Route::NetEndpoint { index },
 
+                                    });
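                                    // Both directions are now fixed: our getter's `peer` is the
                                    // remote port, and the remote port has a local PortInfo entry
                                    // routed to this net endpoint (unless one already existed).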
 
                                }
 
                                Some(inappropriate_msg) => {
 
                                    log!(
 
                                        logger,
 
                                        "delaying msg {:?} during channel setup phase",
 
                                        inappropriate_msg
 
                                    );
 
                                    delayed_messages.push((index, inappropriate_msg));
 
                                }
 
                            }
 
                        }
 
                        // is the setup for this net_endpoint now complete?
 
@@ -796,36 +802,36 @@ fn session_optimize(
 
            msg @ Msg::CommMsg(..) => {
 
                log!(cu.inner.logger, "delaying msg {:?} during session optimization", msg);
 
                comm.endpoint_manager.delayed_messages.push((recv_index, msg));
 
            }
 
        }
 
    }
 
    log!(
 
        cu.inner.logger,
 
        "Gathered all children's maps. ConnectorId set is... {:?}",
 
        unoptimized_map.keys()
 
    );
 
    let my_session_info = SessionInfo {
 
-        port_info: cu.inner.port_info.clone(),
 
+        port_info: cu.inner.current_state.port_info.clone(),
 
        proto_components: cu.proto_components.clone(),
 
        serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()),
 
        endpoint_incoming_to_getter: comm
 
            .endpoint_manager
 
            .net_endpoint_store
 
            .endpoint_exts
 
            .iter()
 
            .map(|ee| ee.getter_for_incoming)
 
            .collect(),
 
    };
 
-    unoptimized_map.insert(cu.inner.id_manager.connector_id, my_session_info);
 
+    unoptimized_map.insert(cu.inner.current_state.id_manager.connector_id, my_session_info);
 
    log!(
 
        cu.inner.logger,
 
        "Inserting my own info. Unoptimized subtree map is {:?}",
 
        &unoptimized_map
 
    );
 

	
 
    // acquire the optimized info...
 
    let optimized_map = if let Some(parent) = comm.neighborhood.parent {
 
        // ... as a message from my parent
 
        log!(cu.inner.logger, "Forwarding gathered info to parent {:?}", parent);
 
        let msg = S(Sm::SessionGather { unoptimized_map });
 
        comm.endpoint_manager.send_to_setup(parent, &msg)?;
 
@@ -862,26 +868,28 @@ fn session_optimize(
 
    } else {
 
        // by computing it myself
 
        log!(cu.inner.logger, "I am the leader! I will optimize this session");
 
        leader_session_map_optimize(&mut *cu.inner.logger, unoptimized_map)?
 
    };
 
    log!(
 
        cu.inner.logger,
 
        "Optimized info map is {:?}. Sending to children {:?}",
 
        &optimized_map,
 
        comm.neighborhood.children.iter()
 
    );
 
    log!(cu.inner.logger, "All session info dumped!: {:#?}", &optimized_map);
 
-    let optimized_info =
 
-        optimized_map.get(&cu.inner.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone();
 
+    let optimized_info = optimized_map
 
+        .get(&cu.inner.current_state.id_manager.connector_id)
 
+        .expect("HEY NO INFO FOR ME?")
 
+        .clone();
 
    let msg = S(Sm::SessionScatter { optimized_map });
 
    for &child in comm.neighborhood.children.iter() {
 
        comm.endpoint_manager.send_to_setup(child, &msg)?;
 
    }
 
    apply_optimizations(cu, comm, optimized_info)?;
 
    log!(cu.inner.logger, "Session optimizations applied");
 
    Ok(())
 
}
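 
session_optimize is a plain gather-up/scatter-down pass over the spanning tree that init_neighborhood built. A self-contained sketch of the same shape, with mpsc channels standing in for the endpoint manager and String for SessionInfo (all names illustrative):
 
    use std::collections::HashMap;
    use std::sync::mpsc::{Receiver, Sender};
 
    fn session_round(
        my_id: u32,
        my_info: String,
        children: Vec<(Receiver<HashMap<u32, String>>, Sender<HashMap<u32, String>>)>,
        parent: Option<(Sender<HashMap<u32, String>>, Receiver<HashMap<u32, String>>)>,
    ) -> HashMap<u32, String> {
        // gather: merge every child's subtree map, then add my own entry
        let mut map = HashMap::new();
        for (rx_up, _) in &children {
            map.extend(rx_up.recv().unwrap());
        }
        map.insert(my_id, my_info);
        // the root transforms the merged map; everyone else forwards it up
        // and waits for the result to come back down
        let optimized = match &parent {
            Some((tx_up, rx_down)) => {
                tx_up.send(map).unwrap();
                rx_down.recv().unwrap()
            }
            None => map, // leader: stands in for leader_session_map_optimize
        };
        // scatter: every child receives the optimized map unchanged
        for (_, tx_down) in &children {
            tx_down.send(optimized.clone()).unwrap();
        }
        optimized
    }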
 
fn leader_session_map_optimize(
 
    logger: &mut dyn Logger,
 
    unoptimized_map: HashMap<ConnectorId, SessionInfo>,
 
) -> Result<HashMap<ConnectorId, SessionInfo>, ConnectError> {
 
@@ -892,25 +900,25 @@ fn leader_session_map_optimize(
 
fn apply_optimizations(
 
    cu: &mut ConnectorUnphased,
 
    comm: &mut ConnectorCommunication,
 
    session_info: SessionInfo,
 
) -> Result<(), ConnectError> {
 
    let SessionInfo {
 
        proto_components,
 
        port_info,
 
        serde_proto_description,
 
        endpoint_incoming_to_getter,
 
    } = session_info;
 
    // TODO some info which should be read-only can be mutated with the current scheme
 
-    cu.inner.port_info = port_info;
 
+    cu.inner.current_state.port_info = port_info;
 
    cu.proto_components = proto_components;
 
    cu.proto_description = serde_proto_description.0;
 
    for (ee, getter) in comm
 
        .endpoint_manager
 
        .net_endpoint_store
 
        .endpoint_exts
 
        .iter_mut()
 
        .zip(endpoint_incoming_to_getter)
 
    {
 
        ee.getter_for_incoming = getter;
 
    }
 
    Ok(())
 
}