Changeset - ce6bcc0a0c26
[Not reviewed]
0 7 0
Christopher Esterhuyse - 5 years ago 2020-02-06 11:32:43
christopheresterhuyse@gmail.com
more examples: protocol alternator, natives alternating
7 files changed with 256 insertions and 33 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -87,440 +87,446 @@ impl Controller {
 
            let true_ekeys = puts.keys().chain(gets.iter()).copied();
 
            let true_channel_ids = true_ekeys.clone().map(ekey_to_channel_id);
 
            predicate.batch_assign_nones(true_channel_ids, true);
 

	
 
            // assign FALSE for all in interface not assigned true
 
            predicate.batch_assign_nones(all_channel_ids.clone(), false);
 

	
 
            if branches.contains_key(&predicate) {
 
                // TODO what do I do with redundant predicates?
 
                unimplemented!(
 
                    "Having multiple batches with the same
 
                    predicate requires the support of oracle boolean variables"
 
                )
 
            }
 
            let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index };
 
            for (ekey, payload) in puts {
 
                log!(
 
                    &mut self.inner.logger,
 
                    "... ... Initial native put msg {:?} pred {:?} batch {:?}",
 
                    &payload,
 
                    &predicate,
 
                    sync_batch_index,
 
                );
 
                let msg =
 
                    CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload }
 
                        .into_msg(*round_index);
 
                endpoint_exts.get_mut(ekey).unwrap().endpoint.send(msg)?;
 
            }
 
            log!(
 
                &mut self.inner.logger,
 
                "... Initial native branch (batch index={} with pred {:?}",
 
                sync_batch_index,
 
                &predicate
 
            );
 
            if branch.to_get.is_empty() {
 
                self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut self.inner.logger,
 
                    SubtreeId::PolyN,
 
                    predicate.clone(),
 
                );
 
            }
 
            branches.insert(predicate, branch);
 
        }
 
        Ok(PolyN { ekeys, branches })
 
    }
 

	
 
    // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent.
 
    // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches.
 
    pub fn sync_round(
 
        &mut self,
 
        deadline: Instant,
 
        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
    ) -> Result<(), SyncErr> {
 
        // TODO! fuse handle_locals_return_decision and end_round_return_decision
 

	
 
        assert!(self.ephemeral.is_clear());
 

	
 
        log!(
 
            &mut self.inner.logger,
 
            "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~",
 
            self.inner.round_index
 
        );
 

	
 
        // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`).
 
        //    Some actors are dropped. some new actors are created.
 
        //    Ultimately, we have 0 Mono actors and a list of unnamed sync_actors
 
        log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.inner.mono_ps.len());
 
        self.ephemeral.poly_ps.clear();
 
        // let mut poly_ps: Vec<PolyP> = vec![];
 
        while let Some(mut mono_p) = self.inner.mono_ps.pop() {
 
            let mut m_ctx = MonoPContext {
 
                ekeys: &mut mono_p.ekeys,
 
                inner: &mut self.inner,
 
                // endpoint_exts: &mut self.endpoint_exts,
 
                // mono_ps: &mut self.mono_ps,
 
                // channel_id_stream: &mut self.channel_id_stream,
 
            };
 
            // cross boundary into crate::protocol
 
            let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description);
 
            log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker);
 
            match blocker {
 
                MonoBlocker::Inconsistent => return Err(SyncErr::Inconsistent),
 
                MonoBlocker::ComponentExit => drop(mono_p),
 
                MonoBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()),
 
            }
 
        }
 
        log!(
 
            &mut self.inner.logger,
 
            "Finished running all MonoPs! Have {} PolyPs waiting",
 
            self.ephemeral.poly_ps.len()
 
        );
 

	
 
        // 3. define the mapping from ekey -> actor
 
        //    this is needed during the event loop to determine which actor
 
        //    should receive the incoming message.
 
        //    TODO: store and update this mapping rather than rebuilding it each round.
 
        let ekey_to_holder: HashMap<Key, PolyId> = {
 
            use PolyId::*;
 
            let n = self.inner.mono_n.iter().flat_map(|m| m.ekeys.iter().map(move |&e| (e, N)));
 
            let p = self
 
                .ephemeral
 
                .poly_ps
 
                .iter()
 
                .enumerate()
 
                .flat_map(|(index, m)| m.ekeys.iter().map(move |&e| (e, P { index })));
 
            n.chain(p).collect()
 
        };
 
        log!(
 
            &mut self.inner.logger,
 
            "SET OF PolyPs and MonoPs final! ekey lookup map is {:?}",
 
            &ekey_to_holder
 
        );
 

	
 
        // 4. Create the solution storage. it tracks the solutions of "subtrees"
 
        //    of the controller in the overlay tree.
 
        self.ephemeral.solution_storage.reset({
 
            let n = self.inner.mono_n.iter().map(|_| SubtreeId::PolyN);
 
            let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index });
 
            let c = self
 
                .inner
 
                .family
 
                .children_ekeys
 
                .iter()
 
                .map(|&ekey| SubtreeId::ChildController { ekey });
 
            let subtree_id_iter = n.chain(m).chain(c);
 
            log!(
 
                &mut self.inner.logger,
 
                "Solution Storage has subtree Ids: {:?}",
 
                &subtree_id_iter.clone().collect::<Vec<_>>()
 
            );
 
            subtree_id_iter
 
        });
 

	
 
        // 5. kick off the synchronous round of the native actor if it exists
 

	
 
        log!(&mut self.inner.logger, "Kicking off native's synchronous round...");
 
        assert_eq!(sync_batches.is_some(), self.inner.mono_n.is_some()); // TODO better err
 
        self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches {
 
            // using if let because of nested ? operator
 
            // TODO check that there are 1+ branches or NO SOLUTION
 
            let poly_n = self.kick_off_native(sync_batches)?;
 
            log!(
 
                &mut self.inner.logger,
 
                "PolyN kicked off, and has branches with predicates... {:?}",
 
                poly_n.branches.keys().collect::<Vec<_>>()
 
            );
 
            Some(poly_n)
 
        } else {
 
            log!(&mut self.inner.logger, "NO NATIVE COMPONENT");
 
            None
 
        };
 

	
 
        // 6. Kick off the synchronous round of each protocol actor
 
        //    If just one actor becomes inconsistent now, there can be no solution!
 
        //    TODO distinguish between completed and not completed poly_p's?
 
        log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len());
 
        for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() {
 
            let my_subtree_id = SubtreeId::PolyP { index };
 
            let m_ctx = PolyPContext {
 
                my_subtree_id,
 
                inner: &mut self.inner,
 
                solution_storage: &mut self.ephemeral.solution_storage,
 
            };
 
            use SyncRunResult as Srr;
 
            let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?;
 
            log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker);
 
            match blocker {
 
                Srr::NoBranches => return Err(SyncErr::Inconsistent),
 
                Srr::AllBranchesComplete | Srr::BlockingForRecv => (),
 
            }
 
        }
 
        log!(&mut self.inner.logger, "All Poly machines have been kicked off!");
 

	
 
        // 7. `solution_storage` may have new solutions for this controller
 
        //    handle their discovery. LEADER => announce, otherwise => send to parent
 
        {
 
            let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::<Vec<_>>();
 
            log!(
 
                &mut self.inner.logger,
 
                "Got {} controller-local solutions before a single RECV: {:?}",
 
                peeked.len(),
 
                peeked
 
            );
 
        }
 
        if self.handle_locals_maybe_decide()? {
 
            return Ok(());
 
        }
 

	
 
        // 4. Receive incoming messages until the DECISION is made
 
        log!(&mut self.inner.logger, "No decision yet. Time to recv messages");
 
        self.undelay_all();
 
        'recv_loop: loop {
 
            let received = self.recv(deadline)?.ok_or(SyncErr::Timeout)?;
 
            let received = self.recv(deadline)?.ok_or_else(|| {
 
                log!(
 
                    &mut self.inner.logger,
 
                    ":( timing out. Solutions storage in state... {:#?}",
 
                    &self.ephemeral.solution_storage
 
                );
 
                SyncErr::Timeout
 
            })?;
 
            let current_content = match received.msg {
 
                Msg::SetupMsg(_) => {
 
                    log!(&mut self.inner.logger, "recvd message {:?} and its SETUP :(", &received);
 
                    // This occurs in the event the connector was malformed during connect()
 
                    return Err(SyncErr::UnexpectedSetupMsg);
 
                }
 
                Msg::CommMsg(CommMsg { round_index, .. })
 
                    if round_index < self.inner.round_index =>
 
                {
 
                    // Old message! Can safely discard
 
                    log!(&mut self.inner.logger, "recvd message {:?} and its OLD! :(", &received);
 
                    drop(received);
 
                    continue 'recv_loop;
 
                }
 
                Msg::CommMsg(CommMsg { round_index, .. })
 
                    if round_index > self.inner.round_index =>
 
                {
 
                    // Message from a next round. Keep for later!
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "ecvd message {:?} and its for later. DELAY! :(",
 
                        &received
 
                    );
 
                    self.delay(received);
 
                    continue 'recv_loop;
 
                }
 
                Msg::CommMsg(CommMsg { contents, round_index }) => {
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "recvd a round-appropriate CommMsg {:?}",
 
                        &contents
 
                    );
 
                    assert_eq!(round_index, self.inner.round_index);
 
                    contents
 
                }
 
            };
 
            match current_content {
 
                CommMsgContents::Elaborate { partial_oracle } => {
 
                    // Child controller submitted a subtree solution.
 
                    if !self.inner.family.children_ekeys.contains(&received.recipient) {
 
                        return Err(SyncErr::ElaborateFromNonChild);
 
                    }
 
                    let subtree_id = SubtreeId::ChildController { ekey: received.recipient };
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received elaboration from child for subtree {:?}: {:?}",
 
                        subtree_id,
 
                        &partial_oracle
 
                    );
 
                    self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut self.inner.logger,
 
                        subtree_id,
 
                        partial_oracle,
 
                    );
 

	
 
                    if self.handle_locals_maybe_decide()? {
 
                        return Ok(());
 
                    }
 
                }
 
                CommMsgContents::Announce { oracle } => {
 
                    if self.inner.family.parent_ekey != Some(received.recipient) {
 
                        return Err(SyncErr::AnnounceFromNonParent);
 
                    }
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received ANNOUNCEMENT from from parent {:?}: {:?}",
 
                        received.recipient,
 
                        &oracle
 
                    );
 
                    return self.end_round_with_decision(oracle);
 
                }
 
                CommMsgContents::SendPayload { payload_predicate, payload } => {
 
                    // message for some actor. Feed it to the appropriate actor
 
                    // and then give them another chance to run.
 
                    let subtree_id = ekey_to_holder.get(&received.recipient);
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}",
 
                        subtree_id,
 
                        &payload_predicate,
 
                        &payload
 
                    );
 
                    match subtree_id {
 
                        None => {
 
                            // this happens when a message is sent to a component that has exited.
 
                            // It's safe to drop this message;
 
                            // The sender branch will certainly not be part of the solution
 
                        }
 
                        Some(PolyId::N) => {
 
                            // Message for NativeMachine
 
                            self.ephemeral.poly_n.as_mut().unwrap().sync_recv(
 
                                received.recipient,
 
                                &mut self.inner.logger,
 
                                payload,
 
                                payload_predicate,
 
                                &mut self.ephemeral.solution_storage,
 
                            );
 
                            if self.handle_locals_maybe_decide()? {
 
                                return Ok(());
 
                            }
 
                        }
 
                        Some(PolyId::P { index }) => {
 
                            // Message for protocol actor
 
                            let channel_id = self
 
                                .inner
 
                                .endpoint_exts
 
                                .get(received.recipient)
 
                                .expect("UEHFU")
 
                                .info
 
                                .channel_id;
 
                            if payload_predicate.query(channel_id) != Some(true) {
 
                                // sender didn't preserve the invariant
 
                                return Err(SyncErr::PayloadPremiseExcludesTheChannel(channel_id));
 
                            }
 
                            let poly_p = &mut self.ephemeral.poly_ps[*index];
 

	
 
                            let m_ctx = PolyPContext {
 
                                my_subtree_id: SubtreeId::PolyP { index: *index },
 
                                inner: &mut self.inner,
 
                                solution_storage: &mut self.ephemeral.solution_storage,
 
                            };
 
                            use SyncRunResult as Srr;
 
                            let blocker = poly_p.poly_recv_run(
 
                                m_ctx,
 
                                &self.protocol_description,
 
                                received.recipient,
 
                                payload_predicate,
 
                                payload,
 
                            )?;
 
                            log!(
 
                                &mut self.inner.logger,
 
                                "... Fed the msg to PolyP {:?} and ran it to blocker {:?}",
 
                                subtree_id,
 
                                blocker
 
                            );
 
                            match blocker {
 
                                Srr::NoBranches => return Err(SyncErr::Inconsistent),
 
                                Srr::BlockingForRecv | Srr::AllBranchesComplete => {
 
                                    {
 
                                        let peeked = self
 
                                            .ephemeral
 
                                            .solution_storage
 
                                            .peek_new_locals()
 
                                            .collect::<Vec<_>>();
 
                                        log!(
 
                                            &mut self.inner.logger,
 
                                            "Got {} new controller-local solutions from RECV: {:?}",
 
                                            peeked.len(),
 
                                            peeked
 
                                        );
 
                                    }
 
                                    if self.handle_locals_maybe_decide()? {
 
                                        return Ok(());
 
                                    }
 
                                }
 
                            }
 
                        }
 
                    };
 
                }
 
            }
 
        }
 
    }
 
}
 
impl ControllerEphemeral {
 
    fn is_clear(&self) -> bool {
 
        self.solution_storage.is_clear()
 
            && self.poly_n.is_none()
 
            && self.poly_ps.is_empty()
 
            && self.ekey_to_holder.is_empty()
 
    }
 
    fn clear(&mut self) {
 
        self.solution_storage.clear();
 
        self.poly_n.take();
 
        self.poly_ps.clear();
 
        self.ekey_to_holder.clear();
 
    }
 
}
 
impl Into<PolyP> for MonoP {
 
    fn into(self) -> PolyP {
 
        PolyP {
 
            complete: Default::default(),
 
            incomplete: hashmap! {
 
                Predicate::new_trivial() =>
 
                BranchP {
 
                    state: self.state,
 
                    inbox: Default::default(),
 
                    outbox: Default::default(),
 
                }
 
            },
 
            ekeys: self.ekeys,
 
        }
 
    }
 
}
 

	
 
impl From<EndpointErr> for SyncErr {
 
    fn from(e: EndpointErr) -> SyncErr {
 
        SyncErr::EndpointErr(e)
 
    }
 
}
 

	
 
impl MonoContext for MonoPContext<'_> {
 
    type D = ProtocolD;
 
    type S = ProtocolS;
 
    fn new_component(&mut self, moved_ekeys: HashSet<Key>, init_state: Self::S) {
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_component with ekeys {:?}!",
 
            &moved_ekeys,
 
        );
 
        if moved_ekeys.is_subset(self.ekeys) {
 
            self.ekeys.retain(|x| !moved_ekeys.contains(x));
 
            self.inner.mono_ps.push(MonoP { state: init_state, ekeys: moved_ekeys });
 
        } else {
 
            panic!("MachineP attempting to move alien ekey!");
 
        }
 
    }
 
    fn new_channel(&mut self) -> [Key; 2] {
 
        let [a, b] = Endpoint::new_memory_pair();
 
        let channel_id = self.inner.channel_id_stream.next();
 
        let kp = self.inner.endpoint_exts.alloc(EndpointExt {
 
            info: EndpointInfo { polarity: Putter, channel_id },
 
            endpoint: a,
 
        });
 
        let kg = self.inner.endpoint_exts.alloc(EndpointExt {
 
            info: EndpointInfo { polarity: Putter, channel_id },
 
            endpoint: b,
 
        });
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_channel. returning ekeys {:?}!",
 
            [kp, kg],
 
        );
 
        [kp, kg]
 
    }
 
    fn new_random(&mut self) -> u64 {
 
        type Bytes8 = [u8; std::mem::size_of::<u64>()];
 
        let mut bytes = Bytes8::default();
 
        getrandom::getrandom(&mut bytes).unwrap();
 
        let val = unsafe { std::mem::transmute::<Bytes8, _>(bytes) };
 
        log!(
 
            &mut self.inner.logger,
 
            "!! MonoContext callback to new_random. returning val {:?}!",
 
            val,
 
        );
 
        val
 
    }
 
}
src/runtime/connector.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{errors::*, *};
 

	
 
pub fn random_controller_id() -> ControllerId {
 
    type Bytes8 = [u8; std::mem::size_of::<ControllerId>()];
 
    let mut bytes = Bytes8::default();
 
    getrandom::getrandom(&mut bytes).unwrap();
 
    unsafe { std::mem::transmute::<Bytes8, ControllerId>(bytes) }
 
}
 

	
 
impl Default for Unconfigured {
 
    fn default() -> Self {
 
        let controller_id = random_controller_id();
 
        Self { controller_id }
 
    }
 
}
 
impl Default for Connector {
 
    fn default() -> Self {
 
        Self::Unconfigured(Unconfigured::default())
 
    }
 
}
 
impl Connector {
 
    /// Configure the Connector with the given Pdl description.
 
    pub fn configure(&mut self, pdl: &[u8], main_component: &[u8]) -> Result<(), ConfigErr> {
 
        use ConfigErr::*;
 
        let controller_id = match self {
 
            Connector::Configured(_) => return Err(AlreadyConfigured),
 
            Connector::Connected(_) => return Err(AlreadyConnected),
 
            Connector::Unconfigured(Unconfigured { controller_id }) => *controller_id,
 
        };
 
        let protocol_description = Arc::new(ProtocolD::parse(pdl).map_err(ParseErr)?);
 
        let polarities = protocol_description.component_polarities(main_component)?;
 
        let configured = Configured {
 
            controller_id,
 
            protocol_description,
 
            bindings: Default::default(),
 
            polarities,
 
            main_component: main_component.to_vec(),
 
        };
 
        *self = Connector::Configured(configured);
 
        Ok(())
 
    }
 

	
 
    /// Bind the (configured) connector's port corresponding to the
 
    pub fn bind_port(
 
        &mut self,
 
        proto_port_index: usize,
 
        binding: PortBinding,
 
    ) -> Result<(), PortBindErr> {
 
        use PortBindErr::*;
 
        match self {
 
            Connector::Unconfigured { .. } => Err(NotConfigured),
 
            Connector::Connected(_) => Err(AlreadyConnected),
 
            Connector::Configured(configured) => {
 
                if configured.polarities.len() <= proto_port_index {
 
                    return Err(IndexOutOfBounds);
 
                }
 
                configured.bindings.insert(proto_port_index, binding);
 
                Ok(())
 
            }
 
        }
 
    }
 
    pub fn connect(&mut self, timeout: Duration) -> Result<(), ConnectErr> {
 
        let deadline = Instant::now() + timeout;
 
        use ConnectErr::*;
 
        let configured = match self {
 
            Connector::Unconfigured { .. } => return Err(NotConfigured),
 
            Connector::Connected(_) => return Err(AlreadyConnected),
 
            Connector::Configured(configured) => configured,
 
        };
 
        // 1. Unwrap bindings or err
 
        let bound_proto_interface: Vec<(_, _)> = configured
 
            .polarities
 
            .iter()
 
            .copied()
 
            .enumerate()
 
            .map(|(native_index, polarity)| {
 
                let binding = configured
 
                    .bindings
 
                    .get(&native_index)
 
                    .copied()
 
                    .ok_or(PortNotBound { native_index })?;
 
                Ok((binding, polarity))
 
            })
 
            .collect::<Result<Vec<(_, _)>, ConnectErr>>()?;
 
        let (controller, native_interface) = Controller::connect(
 
            configured.controller_id,
 
            &configured.main_component,
 
            configured.protocol_description.clone(),
 
            &bound_proto_interface[..],
 
            deadline,
 
        )?;
 
        *self = Connector::Connected(Connected {
 
            native_interface,
 
            sync_batches: vec![Default::default()],
 
            controller,
 
        });
 
        Ok(())
 
    }
 
    pub fn get_mut_logger(&mut self) -> Option<&mut String> {
 
        match self {
 
            Connector::Connected(connected) => Some(&mut connected.controller.inner.logger),
 
            _ => None,
 
        }
 
    }
 

	
 
    pub fn put(&mut self, native_port_index: usize, payload: Payload) -> Result<(), PortOpErr> {
 
        use PortOpErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 
        let (ekey, native_polarity) =
 
            *connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?;
 
        if native_polarity != Putter {
 
            return Err(WrongPolarity);
 
        }
 
        let sync_batch = connected.sync_batches.iter_mut().last().unwrap();
 
        if sync_batch.puts.contains_key(&ekey) {
 
            return Err(DuplicateOperation);
 
        }
 
        sync_batch.puts.insert(ekey, payload);
 
        Ok(())
 
    }
 

	
 
    pub fn get(&mut self, native_port_index: usize) -> Result<(), PortOpErr> {
 
        use PortOpErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 
        let (ekey, native_polarity) =
 
            *connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?;
 
        if native_polarity != Getter {
 
            return Err(WrongPolarity);
 
        }
 
        let sync_batch = connected.sync_batches.iter_mut().last().unwrap();
 
        if sync_batch.gets.contains(&ekey) {
 
            return Err(DuplicateOperation);
 
        }
 
        sync_batch.gets.insert(ekey);
 
        Ok(())
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, ()> {
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(()),
 
        };
 
        connected.sync_batches.push(SyncBatch::default());
 
        Ok(connected.sync_batches.len() - 1)
 
    }
 

	
 
    pub fn sync(&mut self, timeout: Duration) -> Result<usize, SyncErr> {
 
        let deadline = Instant::now() + timeout;
 
        use SyncErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 

	
 
        // do the synchronous round!
 
        connected.controller.sync_round(deadline, Some(connected.sync_batches.drain(..)))?;
 
        connected.sync_batches.push(SyncBatch::default());
 

	
 
        let mono_n = connected.controller.inner.mono_n.as_mut().unwrap();
 
        let result = mono_n.result.as_mut().unwrap();
 
        Ok(result.0)
 
    }
 

	
 
    pub fn read_gotten(&self, native_port_index: usize) -> Result<&[u8], ReadGottenErr> {
 
        use ReadGottenErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
 
        };
 
        let &(key, polarity) =
 
            connected.native_interface.get(native_port_index).ok_or(IndexOutOfBounds)?;
 
        if polarity != Getter {
 
            return Err(WrongPolarity);
 
        }
 
        let mono_n = connected.controller.inner.mono_n.as_ref().expect("controller has no mono_n?");
 
        let result = mono_n.result.as_ref().ok_or(NoPreviousRound)?;
 
        let payload = result.1.get(&key).ok_or(DidntGet)?;
 
        let payload = result.1.get(&key).ok_or(DidNotGet)?;
 
        Ok(payload)
 
    }
 
}
src/runtime/errors.rs
Show inline comments
 
use crate::common::*;
 

	
 
#[derive(Debug)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum PortBindErr {
 
    AlreadyConnected,
 
    IndexOutOfBounds,
 
    NotConfigured,
 
    ParseErr,
 
    AlreadyConfigured,
 
}
 
#[derive(Debug)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum ReadGottenErr {
 
    NotConnected,
 
    IndexOutOfBounds,
 
    WrongPolarity,
 
    NoPreviousRound,
 
    DidntGet,
 
    DidNotGet,
 
}
 
#[derive(Debug)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum PortOpErr {
 
    IndexOutOfBounds,
 
    NotConnected,
 
    WrongPolarity,
 
    DuplicateOperation,
 
}
 
#[derive(Debug)]
 
#[derive(Debug, Clone, PartialEq, Eq)]
 
pub enum ConfigErr {
 
    AlreadyConnected,
 
    ParseErr(String),
 
    AlreadyConfigured,
 
    NoSuchComponent,
 
    NonPortTypeParameters,
 
}
 
#[derive(Debug, Clone)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum ConnectErr {
 
    PortNotBound { native_index: usize },
 
    NotConfigured,
 
    AlreadyConnected,
 
    MetaProtocolDeviation,
 
    Disconnected,
 
    PollInitFailed,
 
    MessengerRecvErr(MessengerRecvErr),
 
    Timeout,
 
    PollingFailed,
 
    PolarityMatched(SocketAddr),
 
    AcceptFailed(SocketAddr),
 
    PassiveConnectFailed(SocketAddr),
 
    BindFailed(SocketAddr),
 
}
 
#[derive(Debug, Clone)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum PollDeadlineErr {
 
    PollingFailed,
 
    Timeout,
 
}
 

	
 
#[derive(Debug, Clone)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum EndpointErr {
 
    Disconnected,
 
    MetaProtocolDeviation,
 
}
 

	
 
#[derive(Debug, Clone)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum SyncErr {
 
    NotConnected,
 
    MessengerRecvErr(MessengerRecvErr),
 
    Inconsistent,
 
    Timeout,
 
    ElaborateFromNonChild,
 
    AnnounceFromNonParent,
 
    PayloadPremiseExcludesTheChannel(ChannelId),
 
    UnexpectedSetupMsg,
 
    EndpointErr(EndpointErr),
 
    EvalErr(EvalErr),
 
}
 
#[derive(Debug, Clone)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum EvalErr {
 
    ComponentExitWhileBranching,
 
}
 
#[derive(Debug, Clone)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum MessengerRecvErr {
 
    PollingFailed,
 
    EndpointErr(EndpointErr),
 
}
 
impl From<MainComponentErr> for ConfigErr {
 
    fn from(e: MainComponentErr) -> Self {
 
        use ConfigErr as C;
 
        use MainComponentErr as M;
 
        match e {
 
            M::NoSuchComponent => C::NoSuchComponent,
 
            M::NonPortTypeParameters => C::NonPortTypeParameters,
 
        }
 
    }
 
}
src/runtime/mod.rs
Show inline comments
 
#[cfg(feature = "ffi")]
 
pub mod ffi;
 

	
 
mod actors;
 
pub(crate) mod communication;
 
pub(crate) mod connector;
 
pub(crate) mod endpoint;
 
pub mod errors;
 
mod predicate; // TODO later
 
mod serde;
 
pub(crate) mod setup;
 

	
 
pub(crate) type ProtocolD = crate::protocol::ProtocolDescriptionImpl;
 
pub(crate) type ProtocolS = crate::protocol::ComponentStateImpl;
 

	
 
use crate::common::*;
 
use actors::*;
 
use endpoint::*;
 
use errors::*;
 

	
 
#[derive(Debug, PartialEq)]
 
pub(crate) enum CommonSatResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
    New(Predicate),
 
    Nonexistant,
 
}
 

	
 
#[derive(Clone, Eq, PartialEq, Hash)]
 
pub(crate) struct Predicate {
 
    pub assigned: BTreeMap<ChannelId, bool>,
 
}
 

	
 
#[derive(Debug, Default)]
 
struct SyncBatch {
 
    puts: HashMap<Key, Payload>,
 
    gets: HashSet<Key>,
 
}
 

	
 
#[derive(Debug)]
 
pub enum Connector {
 
    Unconfigured(Unconfigured),
 
    Configured(Configured),
 
    Connected(Connected), // TODO consider boxing. currently takes up a lot of stack real estate
 
}
 
#[derive(Debug)]
 
pub struct Unconfigured {
 
    pub controller_id: ControllerId,
 
}
 
#[derive(Debug)]
 
pub struct Configured {
 
    controller_id: ControllerId,
 
    polarities: Vec<Polarity>,
 
    bindings: HashMap<usize, PortBinding>,
 
    protocol_description: Arc<ProtocolD>,
 
    main_component: Vec<u8>,
 
}
 
#[derive(Debug)]
 
pub struct Connected {
 
    native_interface: Vec<(Key, Polarity)>,
 
    sync_batches: Vec<SyncBatch>,
 
    controller: Controller,
 
}
 

	
 
#[derive(Debug, Copy, Clone)]
 
pub enum PortBinding {
 
    Native,
 
    Active(SocketAddr),
 
    Passive(SocketAddr),
 
}
 

	
 
#[derive(Debug)]
 
struct Arena<T> {
 
    storage: Vec<T>,
 
}
 

	
 
#[derive(Debug)]
 
struct ReceivedMsg {
 
    recipient: Key,
 
    msg: Msg,
 
}
 

	
 
#[derive(Debug)]
 
struct MessengerState {
 
    poll: Poll,
 
    events: Events,
 
    delayed: Vec<ReceivedMsg>,
 
    undelayed: Vec<ReceivedMsg>,
 
    polled_undrained: IndexSet<Key>,
 
}
 
#[derive(Debug)]
 
struct ChannelIdStream {
 
    controller_id: ControllerId,
 
    next_channel_index: ChannelIndex,
 
}
 

	
 
#[derive(Debug)]
 
struct Controller {
 
    protocol_description: Arc<ProtocolD>,
 
    inner: ControllerInner,
 
    ephemeral: ControllerEphemeral,
 
}
 
#[derive(Debug)]
 
struct ControllerInner {
 
    round_index: usize,
 
    channel_id_stream: ChannelIdStream,
 
    endpoint_exts: Arena<EndpointExt>,
 
    messenger_state: MessengerState,
 
    mono_n: Option<MonoN>,
 
    mono_ps: Vec<MonoP>,
 
    family: ControllerFamily,
 
    logger: String,
 
}
 

	
 
/// This structure has its state entirely reset between synchronous rounds
 
#[derive(Debug, Default)]
 
struct ControllerEphemeral {
 
    solution_storage: SolutionStorage,
 
    poly_n: Option<PolyN>,
 
    poly_ps: Vec<PolyP>,
 
    ekey_to_holder: HashMap<Key, PolyId>,
 
}
 

	
 
#[derive(Debug)]
 
struct ControllerFamily {
 
    parent_ekey: Option<Key>,
 
    children_ekeys: Vec<Key>,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum SyncRunResult {
 
    BlockingForRecv,
 
    AllBranchesComplete,
 
    NoBranches,
 
}
 

	
 
// Used to identify poly actors
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
enum PolyId {
 
    N,
 
    P { index: usize },
 
}
 

	
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 
pub(crate) enum SubtreeId {
 
    PolyN,
 
    PolyP { index: usize },
 
    ChildController { ekey: Key },
 
}
 

	
 
pub(crate) struct MonoPContext<'a> {
 
    inner: &'a mut ControllerInner,
 
    ekeys: &'a mut HashSet<Key>,
 
}
 
pub(crate) struct PolyPContext<'a> {
 
    my_subtree_id: SubtreeId,
 
    inner: &'a mut ControllerInner,
 
    solution_storage: &'a mut SolutionStorage,
 
}
 
impl PolyPContext<'_> {
 
    #[inline(always)]
 
    fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> {
 
        let Self { solution_storage, my_subtree_id, inner } = self;
 
        PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner }
 
    }
 
}
 
struct BranchPContext<'m, 'r> {
 
    m_ctx: PolyPContext<'m>,
 
    ekeys: &'r HashSet<Key>,
 
    predicate: &'r Predicate,
 
    inbox: &'r HashMap<Key, Payload>,
 
}
 

	
 
#[derive(Debug, Default)]
 
#[derive(Default)]
 
pub(crate) struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
 

	
 
trait Messengerlike {
 
    fn get_state_mut(&mut self) -> &mut MessengerState;
 
    fn get_endpoint_mut(&mut self, eekey: Key) -> &mut Endpoint;
 

	
 
    fn delay(&mut self, received: ReceivedMsg) {
 
        self.get_state_mut().delayed.push(received);
 
    }
 
    fn undelay_all(&mut self) {
 
        let MessengerState { delayed, undelayed, .. } = self.get_state_mut();
 
        undelayed.extend(delayed.drain(..))
 
    }
 

	
 
    fn send(&mut self, to: Key, msg: Msg) -> Result<(), EndpointErr> {
 
        self.get_endpoint_mut(to).send(msg)
 
    }
 

	
 
    // attempt to receive a message from one of the endpoints before the deadline
 
    fn recv(&mut self, deadline: Instant) -> Result<Option<ReceivedMsg>, MessengerRecvErr> {
 
        // try get something buffered
 
        if let Some(x) = self.get_state_mut().undelayed.pop() {
 
            return Ok(Some(x));
 
        }
 

	
 
        loop {
 
            // polled_undrained may not be empty
 
            while let Some(eekey) = self.get_state_mut().polled_undrained.pop() {
 
                if let Some(msg) = self.get_endpoint_mut(eekey).recv()? {
 
                    // this endpoint MAY still have messages! check again in future
 
                    self.get_state_mut().polled_undrained.insert(eekey);
 
                    return Ok(Some(ReceivedMsg { recipient: eekey, msg }));
 
                }
 
            }
 

	
 
            let state = self.get_state_mut();
 
            match state.poll_events(deadline) {
 
                Ok(()) => {
 
                    for e in state.events.iter() {
 
                        state.polled_undrained.insert(Key::from_token(e.token()));
 
                    }
 
                }
 
                Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed),
 
                Err(PollDeadlineErr::Timeout) => return Ok(None),
 
            }
 
        }
 
    }
 
}
 

	
 
/////////////////////////////////
 

	
 
impl Debug for SolutionStorage {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 
        f.pad("Solutions: [")?;
 
        for (subtree_id, &index) in self.subtree_id_to_index.iter() {
 
            let sols = &self.subtree_solutions[index];
 
            f.write_fmt(format_args!("{:?} => {:?}, ", subtree_id, sols))?;
 
        }
 
        f.pad("]")
 
    }
 
}
 
impl From<EvalErr> for SyncErr {
 
    fn from(e: EvalErr) -> SyncErr {
 
        SyncErr::EvalErr(e)
 
    }
 
}
 
impl From<MessengerRecvErr> for SyncErr {
 
    fn from(e: MessengerRecvErr) -> SyncErr {
 
        SyncErr::MessengerRecvErr(e)
 
    }
 
}
 
impl From<MessengerRecvErr> for ConnectErr {
 
    fn from(e: MessengerRecvErr) -> ConnectErr {
 
        ConnectErr::MessengerRecvErr(e)
 
    }
 
}
 
impl From<EndpointErr> for MessengerRecvErr {
 
    fn from(e: EndpointErr) -> MessengerRecvErr {
 
        MessengerRecvErr::EndpointErr(e)
 
    }
 
}
 
impl<T> Default for Arena<T> {
 
    fn default() -> Self {
 
        Self { storage: vec![] }
 
    }
 
}
 
impl<T> Arena<T> {
 
    pub fn alloc(&mut self, t: T) -> Key {
 
        self.storage.push(t);
 
        Key::from_raw(self.storage.len() as u64 - 1)
 
    }
 
    pub fn get(&self, key: Key) -> Option<&T> {
 
        self.storage.get(key.to_raw() as usize)
 
    }
 
    pub fn get_mut(&mut self, key: Key) -> Option<&mut T> {
 
        self.storage.get_mut(key.to_raw() as usize)
 
    }
 
    pub fn type_convert<X>(self, f: impl FnMut((Key, T)) -> X) -> Arena<X> {
 
        Arena { storage: self.keyspace().zip(self.storage.into_iter()).map(f).collect() }
 
    }
 
    pub fn iter(&self) -> impl Iterator<Item = (Key, &T)> {
 
        self.keyspace().zip(self.storage.iter())
 
    }
 
    pub fn len(&self) -> usize {
 
        self.storage.len()
 
    }
 
    pub fn keyspace(&self) -> impl Iterator<Item = Key> {
 
        (0..(self.storage.len() as u64)).map(Key::from_raw)
 
    }
 
}
 

	
 
impl ChannelIdStream {
 
    fn new(controller_id: ControllerId) -> Self {
 
        Self { controller_id, next_channel_index: 0 }
 
    }
 
    fn next(&mut self) -> ChannelId {
 
        self.next_channel_index += 1;
 
        ChannelId { controller_id: self.controller_id, channel_index: self.next_channel_index - 1 }
 
    }
 
}
 

	
 
impl MessengerState {
 
    // does NOT guarantee that events is non-empty
 
    fn poll_events(&mut self, deadline: Instant) -> Result<(), PollDeadlineErr> {
 
        use PollDeadlineErr::*;
 
        self.events.clear();
 
        let poll_timeout = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?;
 
        self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?;
 
        Ok(())
 
    }
 
}
 
impl From<PollDeadlineErr> for ConnectErr {
 
    fn from(e: PollDeadlineErr) -> ConnectErr {
 
        match e {
 
            PollDeadlineErr::Timeout => ConnectErr::Timeout,
 
            PollDeadlineErr::PollingFailed => ConnectErr::PollingFailed,
 
        }
 
    }
 
}
 

	
 
impl std::ops::Not for Polarity {
 
    type Output = Self;
 
    fn not(self) -> Self::Output {
 
        use Polarity::*;
 
        match self {
 
            Putter => Getter,
 
            Getter => Putter,
 
        }
 
    }
 
}
 

	
 
impl Predicate {
 
    // returns true IFF self.unify would return Equivalent OR FormerNotLatter
 
    pub fn satisfies(&self, other: &Self) -> bool {
 
        let mut s_it = self.assigned.iter();
 
        let mut s = if let Some(s) = s_it.next() {
 
            s
 
        } else {
 
            return other.assigned.is_empty();
 
        };
 
        for (oid, ob) in other.assigned.iter() {
 
            while s.0 < oid {
 
                s = if let Some(s) = s_it.next() {
 
                    s
 
                } else {
 
                    return false;
 
                };
 
            }
 
            if s.0 > oid || s.1 != ob {
 
                return false;
 
            }
 
        }
 
        true
 
    }
 

	
 
    /// Given self and other, two predicates, return the most general Predicate possible, N
 
    /// such that n.satisfies(self) && n.satisfies(other).
 
    /// If none exists Nonexistant is returned.
 
    /// If the resulting predicate is equivlanet to self, other, or both,
 
    /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively.
 
    /// otherwise New(N) is returned.
 
    pub fn common_satisfier(&self, other: &Self) -> CommonSatResult {
 
        use CommonSatResult::*;
 
        // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
 
        let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
 
        let [mut s, mut o] = [s_it.next(), o_it.next()];
 
        // lists of assignments in self but not other and vice versa.
 
        let [mut s_not_o, mut o_not_s] = [vec![], vec![]];
 
        loop {
 
            match [s, o] {
 
                [None, None] => break,
 
                [None, Some(x)] => {
 
                    o_not_s.push(x);
 
                    o_not_s.extend(o_it);
 
                    break;
 
                }
 
                [Some(x), None] => {
 
                    s_not_o.push(x);
 
                    s_not_o.extend(s_it);
 
                    break;
 
                }
 
                [Some((sid, sb)), Some((oid, ob))] => {
 
                    if sid < oid {
 
                        // o is missing this element
 
                        s_not_o.push((sid, sb));
 
                        s = s_it.next();
 
                    } else if sid > oid {
 
                        // s is missing this element
 
                        o_not_s.push((sid, sb));
 
                        o = o_it.next();
 
                    } else if sb != ob {
 
                        assert_eq!(sid, oid);
 
                        // both predicates assign the variable but differ on the value
 
                        return Nonexistant;
 
                    } else {
 
                        // both predicates assign the variable to the same value
 
                        s = s_it.next();
 
                        o = o_it.next();
 
                    }
 
                }
 
            }
 
        }
 
        // Observed zero inconsistencies. A unified predicate exists...
 
        match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
            [true, true] => Equivalent,       // ... equivalent to both.
 
            [false, true] => FormerNotLatter, // ... equivalent to self.
 
            [true, false] => LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
                let mut predicate = self.clone();
 
                for (&id, &b) in o_not_s {
 
                    predicate.assigned.insert(id, b);
 
                }
 
                New(predicate)
 
            }
 
        }
 
    }
 

	
 
    pub fn iter_matching(&self, value: bool) -> impl Iterator<Item = ChannelId> + '_ {
 
        self.assigned
 
            .iter()
 
            .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None })
 
    }
 

	
 
    pub fn batch_assign_nones(
 
        &mut self,
 
        channel_ids: impl Iterator<Item = ChannelId>,
 
        value: bool,
 
    ) {
 
        for channel_id in channel_ids {
 
            self.assigned.entry(channel_id).or_insert(value);
 
        }
src/runtime/setup.rs
Show inline comments
 
@@ -213,240 +213,241 @@ impl Controller {
 
                        if !Self::test_stream_connectivity(stream) {
 
                            return Err(PassiveConnectFailed(*addr));
 
                        }
 
                        ms.poll.reregister(stream, token, ready_r, edge).expect("52");
 
                        let mut res = Ok(());
 
                        take_mut::take(entry, |e| {
 
                            assert_let![PassiveConnecting { info, stream, .. } = e => {
 
                                let mut endpoint = Endpoint::from_fresh_stream(stream);
 
                                let msg = Msg::SetupMsg(SetupMsg::ChannelSetup { info });
 
                                res = endpoint.send(msg);
 
                                Finished(EndpointExt { info, endpoint })
 
                            }]
 
                        });
 
                        res?;
 
                        log!(logger, "{:03?} ... end PassiveConnecting", major);
 
                        assert!(to_finish.remove(&ekey));
 
                    }
 
                    ActiveConnecting { addr, stream, .. } => {
 
                        log!(logger, "{:03?} start ActiveConnecting...", major);
 
                        assert!(event.readiness().is_writable());
 
                        if Self::test_stream_connectivity(stream) {
 
                            // connect successful
 
                            log!(logger, "CONNECT SUCCESS");
 
                            ms.poll.reregister(stream, token, ready_r, edge).expect("52");
 
                            take_mut::take(entry, |e| {
 
                                assert_let![ActiveConnecting { stream, polarity, addr } = e => {
 
                                    let endpoint = Endpoint::from_fresh_stream(stream);
 
                                    ActiveRecving { endpoint, polarity, addr }
 
                                }]
 
                            });
 
                            log!(logger, ".. ok");
 
                        } else {
 
                            // connect failure. retry!
 
                            log!(logger, "CONNECT FAIL");
 
                            ms.poll.deregister(stream).expect("wt");
 
                            std::thread::sleep(Duration::from_millis(backoff_millis));
 
                            backoff_millis = ((backoff_millis as f32) * 1.2) as u64 + 3;
 
                            let mut new_stream = TcpStream::connect(addr).unwrap();
 
                            ms.poll.register(&new_stream, token, ready_w, edge).expect("PAC 3");
 
                            std::mem::swap(stream, &mut new_stream);
 
                        }
 
                        log!(logger, "{:03?} ... end ActiveConnecting", major);
 
                    }
 
                    ActiveRecving { addr, polarity, endpoint } => {
 
                        log!(logger, "{:03?} start ActiveRecving...", major);
 
                        assert!(event.readiness().is_readable());
 
                        'recv_loop: while let Some(msg) = endpoint.recv()? {
 
                            if let Msg::SetupMsg(SetupMsg::ChannelSetup { info }) = msg {
 
                                if info.polarity == *polarity {
 
                                    return Err(PolarityMatched(*addr));
 
                                }
 
                                take_mut::take(entry, |e| {
 
                                    assert_let![ActiveRecving { polarity, endpoint, .. } = e => {
 
                                        let info = EndpointInfo { polarity, channel_id: info.channel_id };
 
                                        Finished(EndpointExt { info, endpoint })
 
                                    }]
 
                                });
 
                                ms.polled_undrained.insert(ekey);
 
                                assert!(to_finish.remove(&ekey));
 
                                break 'recv_loop;
 
                            } else {
 
                                ms.delayed.push(ReceivedMsg { recipient: ekey, msg });
 
                            }
 
                        }
 
                        log!(logger, "{:03?} ... end ActiveRecving", major);
 
                    }
 
                }
 
            }
 
        }
 
        for ekey in polled_undrained_later {
 
            ms.polled_undrained.insert(ekey);
 
        }
 
        let endpoint_exts = endpoint_ext_todos.type_convert(|(_, todo)| match todo {
 
            Finished(endpoint_ext) => endpoint_ext,
 
            _ => unreachable!(),
 
        });
 
        Ok((ms, endpoint_exts))
 
    }
 

	
 
    fn setup_sink_tree_family(
 
        major: ControllerId,
 
        logger: &mut String,
 
        endpoint_exts: &mut Arena<EndpointExt>,
 
        messenger_state: &mut MessengerState,
 
        neighbors: Vec<Key>,
 
        deadline: Instant,
 
    ) -> Result<ControllerFamily, ConnectErr> {
 
        use {ConnectErr::*, Msg::SetupMsg as S, SetupMsg::*};
 

	
 
        log!(logger, "neighbors {:?}", &neighbors);
 

	
 
        let mut messenger = (messenger_state, endpoint_exts);
 
        impl Messengerlike for (&mut MessengerState, &mut Arena<EndpointExt>) {
 
            fn get_state_mut(&mut self) -> &mut MessengerState {
 
                self.0
 
            }
 
            fn get_endpoint_mut(&mut self, ekey: Key) -> &mut Endpoint {
 
                &mut self.1.get_mut(ekey).expect("OUT OF BOUNDS").endpoint
 
            }
 
        }
 

	
 
        // 1. broadcast my ID as the first echo. await reply from all in net_keylist
 
        let echo = S(LeaderEcho { maybe_leader: major });
 
        let mut awaiting = IndexSet::with_capacity(neighbors.len());
 
        for &n in neighbors.iter() {
 
            log!(logger, "{:?}'s initial echo to {:?}, {:?}", major, n, &echo);
 
            messenger.send(n, echo.clone())?;
 
            awaiting.insert(n);
 
        }
 

	
 
        // 2. Receive incoming replies. whenever a higher-id echo arrives,
 
        //    adopt it as leader, sender as parent, and reset the await set.
 
        let mut parent: Option<Key> = None;
 
        let mut my_leader = major;
 
        messenger.undelay_all();
 
        'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
            let ReceivedMsg { recipient, msg } = messenger.recv(deadline)?.ok_or(Timeout)?;
 
            log!(logger, "{:?} GOT {:?} {:?}", major, &recipient, &msg);
 
            match msg {
 
                S(LeaderAnnounce { leader }) => {
 
                    // someone else completed the echo and became leader first!
 
                    // the sender is my parent
 
                    parent = Some(recipient);
 
                    my_leader = leader;
 
                    awaiting.clear();
 
                    break 'echo_loop;
 
                }
 
                S(LeaderEcho { maybe_leader }) => {
 
                    use Ordering::*;
 
                    match maybe_leader.cmp(&my_leader) {
 
                        Less => { /* ignore */ }
 
                        Equal => {
 
                            awaiting.remove(&recipient);
 
                            if awaiting.is_empty() {
 
                                if let Some(p) = parent {
 
                                    // return the echo to my parent
 
                                    messenger.send(p, S(LeaderEcho { maybe_leader }))?;
 
                                } else {
 
                                    // DECIDE!
 
                                    break 'echo_loop;
 
                                }
 
                            }
 
                        }
 
                        Greater => {
 
                            // join new echo
 
                            log!(logger, "{:?} setting leader to {:?}", major, recipient);
 
                            parent = Some(recipient);
 
                            my_leader = maybe_leader;
 
                            let echo = S(LeaderEcho { maybe_leader: my_leader });
 
                            awaiting.clear();
 
                            if neighbors.len() == 1 {
 
                                // immediately reply to parent
 
                                log!(
 
                                    logger,
 
                                    "{:?} replying echo to parent {:?} immediately",
 
                                    major,
 
                                    recipient
 
                                );
 
                                messenger.send(recipient, echo.clone())?;
 
                            } else {
 
                                for &n in neighbors.iter() {
 
                                    if n != recipient {
 
                                        log!(
 
                                            logger,
 
                                            "{:?} repeating echo {:?} to {:?}",
 
                                            major,
 
                                            &echo,
 
                                            n
 
                                        );
 
                                        messenger.send(n, echo.clone())?;
 
                                        awaiting.insert(n);
 
                                    }
 
                                }
 
                            }
 
                        }
 
                    }
 
                }
 
                msg => messenger.delay(ReceivedMsg { recipient, msg }),
 
            }
 
        }
 
        match parent {
 
            None => assert_eq!(
 
                my_leader, major,
 
                "I've got no parent, but I consider {:?} the leader?",
 
                my_leader
 
            ),
 
            Some(parent) => assert_ne!(
 
                my_leader, major,
 
                "I have {:?} as parent, but I consider myself ({:?}) the leader?",
 
                parent, major
 
            ),
 
        }
 
        log!(logger, "{:?} DONE WITH ECHO", major);
 

	
 
        log!(logger, "{:?} DONE WITH ECHO! Leader has cid={:?}", major, my_leader);
 

	
 
        // 3. broadcast leader announcement (except to parent: confirm they are your parent)
 
        //    in this loop, every node sends 1 message to each neighbor
 
        let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
 
        for &k in neighbors.iter() {
 
            let msg =
 
                if Some(k) == parent { S(YouAreMyParent) } else { msg_for_non_parents.clone() };
 
            log!(logger, "{:?} ANNOUNCING to {:?} {:?}", major, k, &msg);
 
            messenger.send(k, msg)?;
 
        }
 

	
 
        // await 1 message from all non-parents
 
        for &n in neighbors.iter() {
 
            if Some(n) != parent {
 
                awaiting.insert(n);
 
            }
 
        }
 
        let mut children = Vec::default();
 
        messenger.undelay_all();
 
        while !awaiting.is_empty() {
 
            let ReceivedMsg { recipient, msg } = messenger.recv(deadline)?.ok_or(Timeout)?;
 
            match msg {
 
                S(YouAreMyParent) => {
 
                    assert!(awaiting.remove(&recipient));
 
                    children.push(recipient);
 
                }
 
                S(SetupMsg::LeaderAnnounce { leader }) => {
 
                    assert!(awaiting.remove(&recipient));
 
                    assert!(leader == my_leader);
 
                    assert!(Some(recipient) != parent);
 
                    // they wouldn't send me this if they considered me their parent
 
                }
 
                _ => messenger.delay(ReceivedMsg { recipient, msg }),
 
            }
 
        }
 
        Ok(ControllerFamily { parent_ekey: parent, children_ekeys: children })
 
    }
 
}
 

	
 
impl Messengerlike for Controller {
 
    fn get_state_mut(&mut self) -> &mut MessengerState {
 
        &mut self.inner.messenger_state
 
    }
 
    fn get_endpoint_mut(&mut self, ekey: Key) -> &mut Endpoint {
 
        &mut self.inner.endpoint_exts.get_mut(ekey).expect("OUT OF BOUNDS").endpoint
 
    }
 
}
src/test/connector.rs
Show inline comments
 
extern crate test_generator;
 

	
 
use super::*;
 

	
 
use std::thread;
 

	
 
use crate::common::*;
 
use crate::runtime::{errors::*, PortBinding::*, *};
 

	
 
// using a static AtomicU16, shared between all tests in the binary,
 
// allocate and return a socketaddr of the form 127.0.0.1:X where X in 7000..
 
fn next_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(7_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port).into()
 
}
 

	
 
#[test]
 
fn incremental() {
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    static PDL: &[u8] = b"";
 
    let handles = vec![
 
        thread::spawn(move || {
 
            let controller_id = 0;
 
            let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
            x.configure(
 
                b"primitive main(out a, out b) {
 
                    synchronous {
 
                        msg m = create(0);
 
                        put(a, m);
 
                    }
 
                }",
 
                b"main",
 
            )
 
            .unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Passive(addrs[1])).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
            println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
        }),
 
        thread::spawn(move || {
 
            let controller_id = 1;
 
            let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
            x.configure(
 
                b"primitive main(in a, in b) {
 
                    synchronous {
 
                        get(a);
 
                    }
 
                }",
 
                b"main",
 
            )
 
            .unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Active(addrs[1])).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
            println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
        }),
 
    ];
 
    for h in handles {
 
        handle(h.join())
 
    }
 
}
 

	
 
#[test]
 
fn duo_positive() {
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    let a = thread::spawn(move || {
 
        let controller_id = 0;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(
 
            b"primitive main(out a, out b) {
 
                synchronous {}
 
                synchronous {}
 
                synchronous {
 
                    msg m = create(0);
 
                    put(a, m);
 
                }
 
                synchronous {
 
                    msg m = create(0);
 
                    put(b, m);
 
                }
 
            }",
 
            b"main",
 
        )
 
        .unwrap();
 
        x.bind_port(0, Passive(addrs[0])).unwrap();
 
        x.bind_port(1, Passive(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
    });
 
    let b = thread::spawn(move || {
 
        let controller_id = 1;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(
 
            b"primitive main(in a, in b) {
 
                while (true) {
 
                    synchronous {
 
                        if (fires(a)) {
 
                            get(a);
 
                        }
 
                    }
 
                    synchronous {
 
                        if (fires(b)) {
 
                            get(b);
 
                        }
 
                    }
 
                }
 
            }",
 
            b"main",
 
        )
 
        .unwrap();
 
        x.bind_port(0, Active(addrs[0])).unwrap();
 
        x.bind_port(1, Active(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
    });
 
    handle(a.join());
 
    handle(b.join());
 
}
 

	
 
#[test]
 
fn duo_negative() {
 
    let timeout = Duration::from_millis(500);
 
    let addrs = [next_addr(), next_addr()];
 
    let a = thread::spawn(move || {
 
        let controller_id = 0;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(
 
            b"primitive main(out a, out b) {
 
                synchronous {}
 
                synchronous {
 
                    msg m = create(0);
 
                    put(a, m); // fires a on second round
 
                }
 
            }",
 
            b"main",
 
        )
 
        .unwrap();
 
        x.bind_port(0, Passive(addrs[0])).unwrap();
 
        x.bind_port(1, Passive(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        let r = x.sync(timeout);
 
        println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
        match r {
 
            Err(SyncErr::Timeout) => {}
 
            x => unreachable!("{:?}", x),
 
        }
 
    });
 
    let b = thread::spawn(move || {
 
        let controller_id = 1;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(
 
            b"primitive main(in a, in b) {
 
                while (true) {
 
                    synchronous {
 
                        if (fires(a)) {
 
                            get(a);
 
                        }
 
                    }
 
                    synchronous {
 
                        if (fires(b)) { // never fire a on even round
 
                            get(b);
 
                        }
 
                    }
 
                }
 
            }",
 
            b"main",
 
        )
 
        .unwrap();
 
        x.bind_port(0, Active(addrs[0])).unwrap();
 
        x.bind_port(1, Active(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        let r = x.sync(timeout);
 
        println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
        match r {
 
            Err(SyncErr::Timeout) => {}
 
            x => unreachable!("{:?}", x),
 
        }
 
    });
 
    handle(a.join());
 
    handle(b.join());
 
}
 

	
 
static FORWARD: &[u8] = b"
 
primitive forward(in i, out o) {
 
    while(true) synchronous {
 
        put(o, get(i));
 
    }
 
}";
 

	
 
#[test]
 
fn connect_natives() {
 
    static CHAIN: &[u8] = b"
 
    primitive main(in i, out o) {
 
        while(true) synchronous {}
 
    }";
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    do_all(&[
 
        &|x| {
 
            x.configure(CHAIN, b"main").unwrap();
 
            x.configure(FORWARD, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
        },
 
        &|x| {
 
            x.configure(CHAIN, b"main").unwrap();
 
            x.configure(FORWARD, b"forward").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
        },
 
    ]);
 
}
 

	
 
#[test]
 
fn forward() {
 
    static FORWARD: &[u8] = b"
 
    primitive main(in i, out o) {
 
        while(true) synchronous {
 
            put(o, get(i));
 
        }
 
    }";
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    do_all(&[
 
        //
 
        &|x| {
 
            x.configure(FORWARD, b"main").unwrap();
 
            x.configure(FORWARD, b"forward").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            let msg = b"HELLO!".to_vec();
 
            x.put(0, msg).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
        },
 
        &|x| {
 
            x.configure(FORWARD, b"main").unwrap();
 
            x.configure(FORWARD, b"forward").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            let expect = b"HELLO!".to_vec();
 
            x.get(0).unwrap();
 
            assert_eq!(0, x.sync(timeout).unwrap());
 
            assert_eq!(expect, x.read_gotten(0).unwrap());
 
        },
 
    ]);
 
}
 

	
 
static SYNC: &[u8] = b"
 
primitive sync(in i, out o) {
 
    while(true) synchronous {
 
        if (fires(i)) put(o, get(i));
 
    }
 
}";
 
#[test]
 
fn native_alt() {
 
    /*
 
    Alice -->sync--A|P-->sync--> Bob
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr()];
 
    const N: usize = 3;
 
    do_all(&[
 
        //
 
        &|x| {
 
            x.configure(SYNC, b"sync").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Active(addrs[0])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            let msg = b"HI".to_vec();
 
            for _i in 0..N {
 
                // round _i*2: batches: [0=>*]
 
                assert_eq!(0, x.sync(timeout).unwrap());
 

	
 
                // round _i*2+1: batches: [0=>HI]
 
                x.put(0, msg.clone()).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            x.configure(SYNC, b"sync").unwrap();
 
            x.bind_port(0, Passive(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            let expect = b"HI".to_vec();
 
            for _i in 0..(2 * N) {
 
                // round _i batches:[0=>*, 0=>HI]
 
                x.next_batch().unwrap();
 
                x.get(0).unwrap();
 
                match x.sync(timeout).unwrap() {
 
                    0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)),
 
                    1 => assert_eq!(Ok(&expect[..]), x.read_gotten(0)),
 
                    _ => unreachable!(),
 
                }
 
            }
 
        },
 
    ]);
 
}
 

	
 
static ALTERNATOR_2: &[u8] = b"
 
primitive alternator_2(in i, out a, out b) {
 
    while(true) {
 
        synchronous { put(a, get(i)); }
 
        synchronous { put(b, get(i)); } 
 
    }
 
}";
 

	
 
#[test]
 
fn alternator_2() {
 
    /*                    /--|-->A
 
    Sender -->alternator_2
 
                          \--|-->B
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 5;
 
    do_all(&[
 
        //
 
        &|x| {
 
            // Sender
 
            x.configure(ALTERNATOR_2, b"alternator_2").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.bind_port(2, Passive(addrs[1])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                for _ in 0..2 {
 
                    x.put(0, b"hey".to_vec()).unwrap();
 
                    assert_eq!(0, x.sync(timeout).unwrap());
 
                }
 
            }
 
        },
 
        &|x| {
 
            // A
 
            x.configure(SYNC, b"sync").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            let expecting: &[u8] = b"hey";
 

	
 
            for _ in 0..N {
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout)); // GET ONE
 
                assert_eq!(Ok(expecting), x.read_gotten(0));
 

	
 
                // silent round
 
                assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE
 
                assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0));
 
            }
 
        },
 
        &|x| {
 
            // B
 
            x.configure(SYNC, b"sync").unwrap();
 
            x.bind_port(0, Active(addrs[1])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 
            let expecting: &[u8] = b"hey";
 

	
 
            for _ in 0..N {
 
                // silent round
 
                assert_eq!(Ok(0), x.sync(timeout)); // MISS ONE
 
                assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0));
 

	
 
                // get msg round
 
                x.get(0).unwrap();
 
                assert_eq!(Ok(0), x.sync(timeout)); // GET ONE
 
                assert_eq!(Ok(expecting), x.read_gotten(0));
 
            }
 
        },
 
    ]);
 
}
 

	
 
static PARITY_ROUTER: &[u8] = b"
 
primitive parity_router(in i, out odd, out even) {
 
    while(true) synchronous {
 
        msg m = get(i);
 
        if (m[0]%2==0) {
 
            put(even, m);
 
        } else {
 
            put(odd, m);
 
        }
 
    }
 
}";
 

	
 
#[test]
 
// THIS DOES NOT YET WORK. TODOS are hit
 
fn parity_router() {
 
    /*                    /--|-->Getsodd
 
    Sender -->parity_router
 
                          \--|-->Getseven
 
    */
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = [next_addr(), next_addr()];
 
    const N: usize = 1;
 
    do_all(&[
 
        //
 
        &|x| {
 
            // Sender
 
            x.configure(PARITY_ROUTER, b"parity_router").unwrap();
 
            x.bind_port(0, Native).unwrap();
 
            x.bind_port(1, Passive(addrs[0])).unwrap();
 
            x.bind_port(2, Passive(addrs[1])).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for i in 0..N {
 
                let msg = vec![i as u8]; // messages [0], [1], [2], ...
 
                x.put(0, msg).unwrap();
 
                assert_eq!(0, x.sync(timeout).unwrap());
 
            }
 
        },
 
        &|x| {
 
            // Getsodd
 
            x.configure(FORWARD, b"forward").unwrap();
 
            x.bind_port(0, Active(addrs[0])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                // round _i batches:[0=>*, 0=>?]
 
                x.next_batch().unwrap();
 
                x.get(0).unwrap();
 
                match x.sync(timeout).unwrap() {
 
                    0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)),
 
                    1 => {
 
                        let msg = x.read_gotten(0).unwrap();
 
                        assert!(msg[0] % 2 == 1); // assert msg is odd
 
                    }
 
                    _ => unreachable!(),
 
                }
 
            }
 
        },
 
        &|x| {
 
            // Getseven
 
            x.configure(FORWARD, b"forward").unwrap();
 
            x.bind_port(0, Active(addrs[1])).unwrap();
 
            x.bind_port(1, Native).unwrap();
 
            x.connect(timeout).unwrap();
 

	
 
            for _ in 0..N {
 
                // round _i batches:[0=>*, 0=>?]
 
                x.next_batch().unwrap();
 
                x.get(0).unwrap();
 
                match x.sync(timeout).unwrap() {
 
                    0 => assert_eq!(Err(ReadGottenErr::DidNotGet), x.read_gotten(0)),
 
                    1 => {
 
                        let msg = x.read_gotten(0).unwrap();
 
                        assert!(msg[0] % 2 == 0); // assert msg is even
 
                    }
 
                    _ => unreachable!(),
 
                }
 
            }
 
        },
 
    ]);
 
}
src/test/mod.rs
Show inline comments
 
use crate::common::ControllerId;
 
use crate::runtime::Connector;
 
use crate::runtime::Unconfigured;
 
use core::fmt::Debug;
 

	
 
mod connector;
 
mod setup;
 

	
 
struct Panicked(Box<dyn std::any::Any>);
 
impl Debug for Panicked {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 
        if let Some(str_slice) = self.0.downcast_ref::<&'static str>() {
 
            f.pad(str_slice)
 
        } else if let Some(string) = self.0.downcast_ref::<String>() {
 
            f.pad(string)
 
        } else {
 
            f.pad("Box<Any>")
 
        }
 
    }
 
}
 
fn handle(result: Result<(), Box<(dyn std::any::Any + Send + 'static)>>) {
 
    if let Err(x) = result {
 
        panic!("Worker panicked: {:?}", Panicked(x))
 
    }
 
}
 

	
 
fn do_all(i: &[&(dyn Fn(&mut Connector) + Sync)]) {
 
    let cid_iter = 0..(i.len() as ControllerId);
 
    let mut connectors = cid_iter
 
        .clone()
 
        .map(|controller_id| Connector::Unconfigured(Unconfigured { controller_id }))
 
        .collect::<Vec<_>>();
 

	
 
    let mut results = vec![];
 
    crossbeam_utils::thread::scope(|s| {
 
        let handles: Vec<_> = i
 
            .iter()
 
            .zip(connectors.iter_mut())
 
            .map(|(func, connector)| s.spawn(move |_| func(connector)))
 
            .collect();
 
        for h in handles {
 
            results.push(h.join());
 
        }
 
    })
 
    .unwrap();
 

	
 
    let mut failures = false;
 

	
 
    for ((controller_id, connector), res) in
 
        cid_iter.zip(connectors.iter_mut()).zip(results.into_iter())
 
    {
 
        println!("====================\n CID {:?} ...", controller_id);
 
        println!("\n\n====================\n CID {:?} ...", controller_id);
 
        match connector.get_mut_logger() {
 
            Some(logger) => println!("{}", logger),
 
            None => println!("<No Log>"),
 
        }
 
        match res {
 
            Ok(()) => println!("CID {:?} OK!", controller_id),
 
            Err(e) => {
 
                failures = true;
 
                println!("CI {:?} PANIC! {:?}", controller_id, Panicked(e));
 
            }
 
        };
 
    }
 
    if failures {
 
        panic!("FAILURES!");
 
    }
 
}
0 comments (0 inline, 0 general)