Changeset - 9154f8a5674d
[Not reviewed]
Merge
0 7 0
Hans-Dieter Hiep - 5 years ago 2020-02-04 17:26:06
hdh@cwi.nl
Merge branch 'master' of github.com:sirkibsirkib/Reowolf
7 files changed with 131 insertions and 95 deletions:
0 comments (0 inline, 0 general)
src/macros.rs
Show inline comments
 
@@ -2,12 +2,18 @@ macro_rules! lockprintln {
 
    () => (print!("\n"));
 
    ($($arg:tt)*) => ({
 
        use std::io::Write;
 
        std::writeln!(std::io::stdout().lock(), $($arg)*).expect("LPRINTLN");
 
    })
 
}
 
macro_rules! log {
 
    ($logger:expr, $($arg:tt)*) => {{
 
        use std::fmt::Write;
 
        writeln!($logger, $($arg)*).unwrap();
 
    }};
 
}
 
macro_rules! assert_let {
 
    ($pat:pat = $expr:expr => $work:expr) => {
 
        if let $pat = $expr {
 
            $work
 
        } else {
 
            panic!("assert_let failed");
src/runtime/actors.rs
Show inline comments
 
@@ -53,25 +53,25 @@ impl PolyP {
 
        mut m_ctx: PolyPContext,
 
        protocol_description: &ProtocolD,
 
        mut to_run: Vec<(Predicate, BranchP)>,
 
    ) -> Result<SyncRunResult, EndpointErr> {
 
        use SyncRunResult as Srr;
 
        let cid = m_ctx.inner.channel_id_stream.controller_id;
 
        lockprintln!("{:?}: ~ Running branches for PolyP {:?}!", cid, m_ctx.my_subtree_id,);
 
        log!(&mut m_ctx.inner.logger, "~ Running branches for PolyP {:?}!", m_ctx.my_subtree_id,);
 
        while let Some((mut predicate, mut branch)) = to_run.pop() {
 
            let mut r_ctx = BranchPContext {
 
                m_ctx: m_ctx.reborrow(),
 
                ekeys: &self.ekeys,
 
                predicate: &predicate,
 
                inbox: &branch.inbox,
 
            };
 
            use PolyBlocker as Sb;
 
            let blocker = branch.state.sync_run(&mut r_ctx, protocol_description);
 
            lockprintln!(
 
                "{:?}: ~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}",
 
                cid,
 
            log!(
 
                &mut r_ctx.m_ctx.inner.logger,
 
                "~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}",
 
                r_ctx.m_ctx.my_subtree_id,
 
                &predicate,
 
                &blocker
 
            );
 
            match blocker {
 
                Sb::Inconsistent => {} // DROP
 
@@ -86,16 +86,16 @@ impl PolyP {
 
                        channel_id,
 
                        &branch.inbox,
 
                    );
 
                    if predicate.replace_assignment(channel_id, true) != Some(false) {
 
                        // don't rerun now. Rerun at next `sync_run`
 

	
 
                        lockprintln!("{:?}: ~ ... Delay {:?}", cid, m_ctx.my_subtree_id,);
 
                        log!(&mut m_ctx.inner.logger, "~ ... Delay {:?}", m_ctx.my_subtree_id,);
 
                        self.incomplete.insert(predicate, branch);
 
                    } else {
 
                        lockprintln!("{:?}: ~ ... Drop {:?}", cid, m_ctx.my_subtree_id,);
 
                        log!(&mut m_ctx.inner.logger, "~ ... Drop {:?}", m_ctx.my_subtree_id,);
 
                    }
 
                    // ELSE DROP
 
                }
 
                Sb::CouldntCheckFiring(ekey) => {
 
                    assert!(self.ekeys.contains(&ekey));
 
                    let channel_id =
 
@@ -108,42 +108,41 @@ impl PolyP {
 
                    }
 
                    assert!(predicate.replace_assignment(channel_id, true).is_none());
 
                    to_run.push((predicate, branch));
 
                    to_run.push((predicate_f, branch_f));
 
                }
 
                Sb::SyncBlockEnd => {
 
                    log!(
 
                        &mut m_ctx.inner.logger,
 
                        "~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}",
 
                        m_ctx.my_subtree_id,
 
                        &predicate,
 
                        &blocker
 
                    );
 
                    // come up with the predicate for this local solution
 
                    let lookup =
 
                        |&ekey| m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
                    let ekeys_channel_id_iter = self.ekeys.iter().map(lookup);
 
                    predicate.batch_assign_nones(ekeys_channel_id_iter, false);
 

	
 
                    lockprintln!(
 
                        "{:?}: ~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}",
 
                        cid,
 
                        m_ctx.my_subtree_id,
 
                        &predicate,
 
                        &blocker
 
                    );
 

	
 
                    // OK now check we really received all the messages we expected to
 
                    let num_fired = predicate.iter_matching(true).count();
 
                    let num_msgs =
 
                        branch.inbox.keys().chain(branch.outbox.keys()).map(lookup).count();
 
                    match num_fired.cmp(&num_msgs) {
 
                        Ordering::Less => unreachable!(),
 
                        Ordering::Greater => lockprintln!(
 
                            "{:?}: {:?} with pred {:?} finished but |inbox|+|outbox| < .",
 
                            cid,
 
                        Ordering::Greater => log!(
 
                            &mut m_ctx.inner.logger,
 
                            "{:?} with pred {:?} finished but |inbox|+|outbox| < .",
 
                            m_ctx.my_subtree_id,
 
                            &predicate,
 
                        ),
 
                        Ordering::Equal => {
 
                            lockprintln!(
 
                                "{:?}: {:?} with pred {:?} finished! Storing this solution locally.",
 
                                cid,
 
                            log!(
 
                                &mut m_ctx.inner.logger,
 
                                "{:?} with pred {:?} finished! Storing this solution locally.",
 
                                m_ctx.my_subtree_id,
 
                                &predicate,
 
                            );
 
                            m_ctx.solution_storage.submit_and_digest_subtree_solution(
 
                                m_ctx.my_subtree_id,
 
                                predicate.clone(),
 
@@ -284,13 +283,17 @@ impl PolyP {
 
                    }
 
                })
 
                .collect();
 
            std::mem::swap(&mut self.incomplete, &mut incomplete2);
 
            to_run
 
        };
 
        lockprintln!("{:?}: ... DONE FEEDING BRANCHES. {} branches to run!", cid, to_run.len(),);
 
        log!(
 
            &mut m_ctx.inner.logger,
 
            "... DONE FEEDING BRANCHES. {} branches to run!",
 
            to_run.len(),
 
        );
 
        self.poly_run_these_branches(m_ctx, protocol_description, to_run)
 
    }
 

	
 
    pub(crate) fn become_mono(
 
        mut self,
 
        decision: &Predicate,
src/runtime/communication.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::{actors::*, endpoint::*, errors::*, *};
 

	
 
impl Controller {
 
    fn end_round_with_decision(&mut self, decision: Predicate) -> Result<(), SyncErr> {
 
        let cid = self.inner.channel_id_stream.controller_id;
 
        lockprintln!("{:?}: ENDING ROUND WITH DECISION! {:?}", cid, &decision);
 
        log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision);
 

	
 
        let mut table_row = HashMap::default();
 
        self.inner.mono_n = self
 
            .ephemeral
 
            .poly_n
 
            .take()
 
            .map(|poly_n| poly_n.become_mono(&decision, &mut table_row));
 
        self.inner.mono_ps.extend(
 
            self.ephemeral.poly_ps.drain(..).map(|m| m.become_mono(&decision, &mut table_row)),
 
        );
 
        for (ekey, payload) in table_row {
 
            let channel_id = self.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
            lockprintln!("{:?}: VALUE {:?} => Message({:?})", cid, channel_id, payload);
 
            log!(&mut self.inner.logger, "VALUE {:?} => Message({:?})", channel_id, payload);
 
        }
 
        for channel_id in decision.iter_matching(false) {
 
            lockprintln!("{:?}: VALUE {:?} => *", cid, channel_id);
 
            log!(&mut self.inner.logger, "VALUE {:?} => *", channel_id);
 
        }
 
        let announcement =
 
            CommMsgContents::Announce { oracle: decision }.into_msg(self.inner.round_index);
 
        for &child_ekey in self.inner.family.children_ekeys.iter() {
 
            lockprintln!(
 
                "{:?}: Forwarding {:?} to child with ekey {:?}",
 
                cid,
 
            log!(
 
                &mut self.inner.logger,
 
                "Forwarding {:?} to child with ekey {:?}",
 
                &announcement,
 
                child_ekey
 
            );
 
            self.inner
 
                .endpoint_exts
 
                .get_mut(child_ekey)
 
@@ -42,30 +41,29 @@ impl Controller {
 
        self.ephemeral.clear();
 
        Ok(())
 
    }
 

	
 
    // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found
 
    fn handle_locals_maybe_decide(&mut self) -> Result<bool, SyncErr> {
 
        let cid = self.inner.channel_id_stream.controller_id;
 
        if let Some(parent_ekey) = self.inner.family.parent_ekey {
 
            // I have a parent -> I'm not the leader
 
            let parent_endpoint =
 
                &mut self.inner.endpoint_exts.get_mut(parent_ekey).expect("huu").endpoint;
 
            for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() {
 
                let msg =
 
                    CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index);
 
                lockprintln!("{:?}: Sending {:?} to parent {:?}", cid, &msg, parent_ekey);
 
                log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_ekey);
 
                parent_endpoint.send(msg)?;
 
            }
 
            Ok(false)
 
        } else {
 
            // I have no parent -> I'm the leader
 
            assert!(self.inner.family.parent_ekey.is_none());
 
            let maybe_decision = self.ephemeral.solution_storage.iter_new_local_make_old().next();
 
            Ok(if let Some(decision) = maybe_decision {
 
                lockprintln!("{:?}: DECIDE ON {:?} AS LEADER!", cid, &decision);
 
                log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision);
 
                self.end_round_with_decision(decision)?;
 
                true
 
            } else {
 
                false
 
            })
 
        }
 
@@ -129,42 +127,44 @@ impl Controller {
 
        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
 
    ) -> Result<(), SyncErr> {
 
        // TODO! fuse handle_locals_return_decision and end_round_return_decision
 

	
 
        assert!(self.ephemeral.is_clear());
 

	
 
        let cid = self.inner.channel_id_stream.controller_id;
 
        lockprintln!();
 
        lockprintln!("~~~~~~ {:?}: SYNC ROUND STARTS! ROUND={}", cid, self.inner.round_index);
 
        log!(
 
            &mut self.inner.logger,
 
            "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~",
 
            self.inner.round_index
 
        );
 

	
 
        // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`).
 
        //    Some actors are dropped. some new actors are created.
 
        //    Ultimately, we have 0 Mono actors and a list of unnamed sync_actors
 
        lockprintln!("{:?}: Got {} MonoP's to run!", cid, self.inner.mono_ps.len());
 
        log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.inner.mono_ps.len());
 
        self.ephemeral.poly_ps.clear();
 
        // let mut poly_ps: Vec<PolyP> = vec![];
 
        while let Some(mut mono_p) = self.inner.mono_ps.pop() {
 
            let mut m_ctx = MonoPContext {
 
                ekeys: &mut mono_p.ekeys,
 
                inner: &mut self.inner,
 
                // endpoint_exts: &mut self.endpoint_exts,
 
                // mono_ps: &mut self.mono_ps,
 
                // channel_id_stream: &mut self.channel_id_stream,
 
            };
 
            // cross boundary into crate::protocol
 
            let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description);
 
            lockprintln!("{:?}: ... MonoP's pre_sync_run got blocker {:?}", cid, &blocker);
 
            log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker);
 
            match blocker {
 
                MonoBlocker::Inconsistent => return Err(SyncErr::Inconsistent),
 
                MonoBlocker::ComponentExit => drop(mono_p),
 
                MonoBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()),
 
            }
 
        }
 
        lockprintln!(
 
            "{:?}: Finished running all MonoPs! Have {} PolyPs waiting",
 
            cid,
 
        log!(
 
            &mut self.inner.logger,
 
            "Finished running all MonoPs! Have {} PolyPs waiting",
 
            self.ephemeral.poly_ps.len()
 
        );
 

	
 
        // 3. define the mapping from ekey -> actor
 
        //    this is needed during the event loop to determine which actor
 
        //    should receive the incoming message.
 
@@ -177,15 +177,15 @@ impl Controller {
 
                .poly_ps
 
                .iter()
 
                .enumerate()
 
                .flat_map(|(index, m)| m.ekeys.iter().map(move |&e| (e, P { index })));
 
            n.chain(p).collect()
 
        };
 
        lockprintln!(
 
            "{:?}: SET OF PolyPs and MonoPs final! ekey lookup map is {:?}",
 
            cid,
 
        log!(
 
            &mut self.inner.logger,
 
            "SET OF PolyPs and MonoPs final! ekey lookup map is {:?}",
 
            &ekey_to_holder
 
        );
 

	
 
        // 4. Create the solution storage. it tracks the solutions of "subtrees"
 
        //    of the controller in the overlay tree.
 
        self.ephemeral.solution_storage.reset({
 
@@ -195,122 +195,126 @@ impl Controller {
 
                .inner
 
                .family
 
                .children_ekeys
 
                .iter()
 
                .map(|&ekey| SubtreeId::ChildController { ekey });
 
            let subtree_id_iter = n.chain(m).chain(c);
 
            lockprintln!(
 
                "{:?}: Solution Storage has subtree Ids: {:?}",
 
                cid,
 
            log!(
 
                &mut self.inner.logger,
 
                "Solution Storage has subtree Ids: {:?}",
 
                &subtree_id_iter.clone().collect::<Vec<_>>()
 
            );
 
            subtree_id_iter
 
        });
 

	
 
        // 5. kick off the synchronous round of the native actor if it exists
 

	
 
        lockprintln!("{:?}: Kicking off native's synchronous round...", cid);
 
        log!(&mut self.inner.logger, "Kicking off native's synchronous round...");
 
        assert_eq!(sync_batches.is_some(), self.inner.mono_n.is_some()); // TODO better err
 
        self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches {
 
            // using if let because of nested ? operator
 
            // TODO check that there are 1+ branches or NO SOLUTION
 
            let poly_n = self.kick_off_native(sync_batches)?;
 
            lockprintln!(
 
                "{:?}: PolyN kicked off, and has branches with predicates... {:?}",
 
                cid,
 
            log!(
 
                &mut self.inner.logger,
 
                "PolyN kicked off, and has branches with predicates... {:?}",
 
                poly_n.branches.keys().collect::<Vec<_>>()
 
            );
 
            Some(poly_n)
 
        } else {
 
            lockprintln!("{:?}: NO NATIVE COMPONENT", cid);
 
            log!(&mut self.inner.logger, "NO NATIVE COMPONENT");
 
            None
 
        };
 

	
 
        // 6. Kick off the synchronous round of each protocol actor
 
        //    If just one actor becomes inconsistent now, there can be no solution!
 
        //    TODO distinguish between completed and not completed poly_p's?
 
        lockprintln!("{:?}: Kicking off {} PolyP's.", cid, self.ephemeral.poly_ps.len());
 
        log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len());
 
        for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() {
 
            let my_subtree_id = SubtreeId::PolyP { index };
 
            let m_ctx = PolyPContext {
 
                my_subtree_id,
 
                inner: &mut self.inner,
 
                solution_storage: &mut self.ephemeral.solution_storage,
 
            };
 
            use SyncRunResult as Srr;
 
            let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?;
 
            lockprintln!("{:?}: ... PolyP's poly_run got blocker {:?}", cid, &blocker);
 
            log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker);
 
            match blocker {
 
                Srr::NoBranches => return Err(SyncErr::Inconsistent),
 
                Srr::AllBranchesComplete | Srr::BlockingForRecv => (),
 
            }
 
        }
 
        lockprintln!("{:?}: All Poly machines have been kicked off!", cid);
 
        log!(&mut self.inner.logger, "All Poly machines have been kicked off!");
 

	
 
        // 7. `solution_storage` may have new solutions for this controller
 
        //    handle their discovery. LEADER => announce, otherwise => send to parent
 
        {
 
            let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::<Vec<_>>();
 
            lockprintln!(
 
                "{:?}: Got {} controller-local solutions before a single RECV: {:?}",
 
                cid,
 
            log!(
 
                &mut self.inner.logger,
 
                "Got {} controller-local solutions before a single RECV: {:?}",
 
                peeked.len(),
 
                peeked
 
            );
 
        }
 
        if self.handle_locals_maybe_decide()? {
 
            return Ok(());
 
        }
 

	
 
        // 4. Receive incoming messages until the DECISION is made
 
        lockprintln!("{:?}: No decision yet. Time to recv messages", cid);
 
        log!(&mut self.inner.logger, "No decision yet. Time to recv messages");
 
        self.undelay_all();
 
        'recv_loop: loop {
 
            let received = self.recv(deadline)?.ok_or(SyncErr::Timeout)?;
 
            let current_content = match received.msg {
 
                Msg::SetupMsg(_) => {
 
                    lockprintln!("{:?}: recvd message {:?} and its SETUP :(", cid, &received);
 
                    log!(&mut self.inner.logger, "recvd message {:?} and its SETUP :(", &received);
 
                    // This occurs in the event the connector was malformed during connect()
 
                    return Err(SyncErr::UnexpectedSetupMsg);
 
                }
 
                Msg::CommMsg(CommMsg { round_index, .. })
 
                    if round_index < self.inner.round_index =>
 
                {
 
                    // Old message! Can safely discard
 
                    lockprintln!("{:?}: recvd message {:?} and its OLD! :(", cid, &received);
 
                    log!(&mut self.inner.logger, "recvd message {:?} and its OLD! :(", &received);
 
                    drop(received);
 
                    continue 'recv_loop;
 
                }
 
                Msg::CommMsg(CommMsg { round_index, .. })
 
                    if round_index > self.inner.round_index =>
 
                {
 
                    // Message from a next round. Keep for later!
 
                    lockprintln!(
 
                        "{:?}: recvd message {:?} and its for later. DELAY! :(",
 
                        cid,
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "ecvd message {:?} and its for later. DELAY! :(",
 
                        &received
 
                    );
 
                    self.delay(received);
 
                    continue 'recv_loop;
 
                }
 
                Msg::CommMsg(CommMsg { contents, round_index }) => {
 
                    lockprintln!("{:?}: recvd a round-appropriate CommMsg {:?}", cid, &contents);
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "recvd a round-appropriate CommMsg {:?}",
 
                        &contents
 
                    );
 
                    assert_eq!(round_index, self.inner.round_index);
 
                    contents
 
                }
 
            };
 
            match current_content {
 
                CommMsgContents::Elaborate { partial_oracle } => {
 
                    // Child controller submitted a subtree solution.
 
                    if !self.inner.family.children_ekeys.contains(&received.recipient) {
 
                        return Err(SyncErr::ElaborateFromNonChild);
 
                    }
 
                    let subtree_id = SubtreeId::ChildController { ekey: received.recipient };
 
                    lockprintln!(
 
                        "{:?}: Received elaboration from child for subtree {:?}: {:?}",
 
                        cid,
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received elaboration from child for subtree {:?}: {:?}",
 
                        subtree_id,
 
                        &partial_oracle
 
                    );
 
                    self.ephemeral
 
                        .solution_storage
 
                        .submit_and_digest_subtree_solution(subtree_id, partial_oracle);
 
@@ -320,27 +324,30 @@ impl Controller {
 
                    }
 
                }
 
                CommMsgContents::Announce { oracle } => {
 
                    if self.inner.family.parent_ekey != Some(received.recipient) {
 
                        return Err(SyncErr::AnnounceFromNonParent);
 
                    }
 
                    lockprintln!(
 
                        "{:?}: Received ANNOUNCEMENT from from parent {:?}: {:?}",
 
                        cid,
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received ANNOUNCEMENT from from parent {:?}: {:?}",
 
                        received.recipient,
 
                        &oracle
 
                    );
 
                    return self.end_round_with_decision(oracle);
 
                }
 
                CommMsgContents::SendPayload { payload_predicate, payload } => {
 
                    // message for some actor. Feed it to the appropriate actor
 
                    // and then give them another chance to run.
 
                    let subtree_id = ekey_to_holder.get(&received.recipient);
 
                    lockprintln!(
 
                        "{:?}: Received SendPayload for subtree {:?} with pred {:?} and payload {:?}",
 
                        cid, subtree_id, &payload_predicate, &payload
 
                    log!(
 
                        &mut self.inner.logger,
 
                        "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}",
 
                        subtree_id,
 
                        &payload_predicate,
 
                        &payload
 
                    );
 
                    match subtree_id {
 
                        None => {
 
                            // this happens when a message is sent to a component that has exited.
 
                            // It's safe to drop this message;
 
                            // The sender branch will certainly not be part of the solution
 
@@ -379,39 +386,41 @@ impl Controller {
 
                                m_ctx,
 
                                &self.protocol_description,
 
                                received.recipient,
 
                                payload_predicate,
 
                                payload,
 
                            )?;
 
                            lockprintln!(
 
                                "{:?}: ... Fed the msg to PolyP {:?} and ran it to blocker {:?}",
 
                                cid,
 
                            log!(
 
                                &mut self.inner.logger,
 
                                "... Fed the msg to PolyP {:?} and ran it to blocker {:?}",
 
                                subtree_id,
 
                                blocker
 
                            );
 
                            match blocker {
 
                                Srr::NoBranches => return Err(SyncErr::Inconsistent),
 
                                Srr::BlockingForRecv | Srr::AllBranchesComplete => {
 
                                    continue 'recv_loop
 
                                    {
 
                                        let peeked = self
 
                                            .ephemeral
 
                                            .solution_storage
 
                                            .peek_new_locals()
 
                                            .collect::<Vec<_>>();
 
                                        log!(
 
                                            &mut self.inner.logger,
 
                                            "Got {} new controller-local solutions from RECV: {:?}",
 
                                            peeked.len(),
 
                                            peeked
 
                                        );
 
                                    }
 
                                    if self.handle_locals_maybe_decide()? {
 
                                        return Ok(());
 
                                    }
 
                                }
 
                            }
 
                        }
 
                    };
 
                    {
 
                        let peeked =
 
                            self.ephemeral.solution_storage.peek_new_locals().collect::<Vec<_>>();
 
                        lockprintln!(
 
                            "{:?}: Got {} new controller-local solutions from RECV: {:?}",
 
                            cid,
 
                            peeked.len(),
 
                            peeked
 
                        );
 
                    }
 
                    if self.handle_locals_maybe_decide()? {
 
                        return Ok(());
 
                    }
 
                }
 
            }
 
        }
 
    }
 
}
 
impl ControllerEphemeral {
src/runtime/connector.rs
Show inline comments
 
@@ -89,12 +89,18 @@ impl Connector {
 
            native_interface,
 
            sync_batches: vec![Default::default()],
 
            controller,
 
        });
 
        Ok(())
 
    }
 
    pub fn get_mut_logger(&mut self) -> Option<&mut String> {
 
        match self {
 
            Connector::Connected(connected) => Some(&mut connected.controller.inner.logger),
 
            _ => None,
 
        }
 
    }
 

	
 
    pub fn put(&mut self, native_port_index: usize, payload: Payload) -> Result<(), PortOpErr> {
 
        use PortOpErr::*;
 
        let connected = match self {
 
            Connector::Connected(connected) => connected,
 
            _ => return Err(NotConnected),
src/runtime/mod.rs
Show inline comments
 
@@ -106,12 +106,13 @@ struct ControllerInner {
 
    channel_id_stream: ChannelIdStream,
 
    endpoint_exts: Arena<EndpointExt>,
 
    messenger_state: MessengerState,
 
    mono_n: Option<MonoN>,
 
    mono_ps: Vec<MonoP>,
 
    family: ControllerFamily,
 
    logger: String,
 
}
 

	
 
/// This structure has its state entirely reset between synchronous rounds
 
#[derive(Debug, Default)]
 
struct ControllerEphemeral {
 
    solution_storage: SolutionStorage,
src/runtime/setup.rs
Show inline comments
 
@@ -116,12 +116,13 @@ impl Controller {
 
            messenger_state,
 
            channel_id_stream,
 
            endpoint_exts,
 
            mono_ps: p_monos,
 
            mono_n: n_mono,
 
            round_index: 0,
 
            logger: String::default(),
 
        };
 
        let controller = Self { protocol_description, inner, ephemeral: Default::default() };
 
        Ok((controller, native_interface))
 
    }
 

	
 
    fn test_stream_connectivity(stream: &mut TcpStream) -> bool {
src/test/connector.rs
Show inline comments
 
@@ -13,13 +13,14 @@ use crate::runtime::errors::*;
 

	
 
#[test]
 
fn incremental() {
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = ["127.0.0.1:7010".parse().unwrap(), "127.0.0.1:7011".parse().unwrap()];
 
    let a = thread::spawn(move || {
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id: 0 });
 
        let controller_id = 0;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(
 
            b"primitive main(out a, out b) {
 
            synchronous {
 
                msg m = create(0);
 
                put(a, m);
 
            }
 
@@ -27,62 +28,69 @@ fn incremental() {
 
        )
 
        .unwrap();
 
        x.bind_port(0, PortBinding::Passive(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Passive(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
    });
 
    let b = thread::spawn(move || {
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id: 1 });
 
        let controller_id = 1;
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id });
 
        x.configure(
 
            b"primitive main(in a, in b) {
 
            synchronous {
 
                get(a);
 
            }
 
        }",
 
        )
 
        .unwrap();
 
        x.bind_port(0, PortBinding::Active(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Active(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        println!("\n---------\nLOG CID={}\n{}", controller_id, x.get_mut_logger().unwrap());
 
    });
 
    handle(a.join());
 
    handle(b.join());
 
}
 

	
 
#[test]
 
fn duo_positive() {
 
    let timeout = Duration::from_millis(1_500);
 
    let addrs = ["127.0.0.1:7012".parse().unwrap(), "127.0.0.1:7013".parse().unwrap()];
 
    let a = thread::spawn(move || {
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id: 0 });
 
        x.configure(b"
 
        x.configure(
 
            b"
 
        primitive main(out a, out b) {
 
            synchronous {}
 
            synchronous {}
 
            synchronous {
 
                msg m = create(0);
 
                put(a, m);
 
            }
 
            synchronous {
 
                msg m = create(0);
 
                put(b, m);
 
            }
 
        }").unwrap();
 
        }",
 
        )
 
        .unwrap();
 
        x.bind_port(0, PortBinding::Passive(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Passive(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
    });
 
    let b = thread::spawn(move || {
 
        let mut x = Connector::Unconfigured(Unconfigured { controller_id: 1 });
 
        x.configure(b"
 
        x.configure(
 
            b"
 
        primitive main(in a, in b) {
 
            while (true) {
 
                synchronous {
 
                    if (fires(a)) {
 
                        get(a);
 
                    }
 
@@ -90,13 +98,15 @@ fn duo_positive() {
 
                synchronous {
 
                    if (fires(b)) {
 
                        get(b);
 
                    }
 
                }
 
            }
 
        }").unwrap();
 
        }",
 
        )
 
        .unwrap();
 
        x.bind_port(0, PortBinding::Active(addrs[0])).unwrap();
 
        x.bind_port(1, PortBinding::Active(addrs[1])).unwrap();
 
        x.connect(timeout).unwrap();
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
 
        assert_eq!(0, x.sync(timeout).unwrap());
0 comments (0 inline, 0 general)