Changeset - e8655e939407
[Not reviewed]
0 3 0
Christopher Esterhuyse - 5 years ago 2020-03-06 10:56:51
christopher.esterhuyse@gmail.com
cleaned up
3 files changed with 17 insertions and 27 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
@@ -243,83 +243,93 @@ impl Controller {
 
            let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?;
 
            log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker);
 
            match blocker {
 
                Srr::NoBranches => return Err(SyncErr::Inconsistent),
 
                Srr::AllBranchesComplete | Srr::BlockingForRecv => (),
 
            }
 
        }
 
        log!(&mut self.inner.logger, "All Poly machines have been kicked off!");
 

	
 
        // 7. `solution_storage` may have new solutions for this controller
 
        //    handle their discovery. LEADER => announce, otherwise => send to parent
 
        {
 
            let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::<Vec<_>>();
 
            log!(
 
                &mut self.inner.logger,
 
                "Got {} controller-local solutions before a single RECV: {:?}",
 
                peeked.len(),
 
                peeked
 
            );
 
        }
 
        if self.handle_locals_maybe_decide()? {
 
            return Ok(());
 
        }
 

	
 
        // 4. Receive incoming messages until the DECISION is made
 
        // 4. Receive incoming messages until the DECISION is made OR some unrecoverable error
 
        log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages");
 
        self.undelay_all();
 
        'recv_loop: loop {
 
            log!(&mut self.inner.logger, "`POLLING` with deadline {:?}...", deadline);
 
            let received = match deadline {
 
                None => {
 
                    // we have personally timed out. perform a "long" poll.
 
                    self.recv(Instant::now() + Duration::from_secs(10))?.expect("DRIED UP")
 
                }
 
                Some(d) => match self.recv(d)? {
 
                    // we have not yet timed out. performed a time-limited poll
 
                    Some(received) => received,
 
                    None => {
 
                        // timed out! send a FAILURE message to the sink,
 
                        // and henceforth don't time out on polling.
 
                        deadline = None;
 
                        match self.inner.family.parent_ekey {
 
                            None => {
 
                                // I am the sink! announce failure and return.
 
                                return self.end_round_with_decision(Decision::Failure);
 
                            }
 
                            Some(parent_ekey) => {
 
                                // I am not the sink! send a failure message.
 
                                let announcement = Msg::CommMsg(CommMsg {
 
                                    round_index: self.inner.round_index,
 
                                    contents: CommMsgContents::Failure,
 
                                });
 
                                log!(
 
                                    &mut self.inner.logger,
 
                                    "Forwarding {:?} to parent with ekey {:?}",
 
                                    &announcement,
 
                                    parent_ekey
 
                                );
 
                                self.inner
 
                                    .endpoint_exts
 
                                    .get_mut(parent_ekey)
 
                                    .expect("ss")
 
                                    .endpoint
 
                                    .send(announcement.clone())?;
 
                                continue; // poll some more
 
                            }
 
                            None => return self.end_round_with_decision(Decision::Failure),
 
                        }
 
                        continue;
 
                    }
 
                },
 
                None => self.recv(Instant::now() + Duration::from_secs(2))?.expect("DRIED UP"),
 
            };
 
            log!(&mut self.inner.logger, "::: message {:?}...", &received);
 
            let current_content = match received.msg {
 
                Msg::SetupMsg(s) => {
 
                    // This occurs in the event the connector was malformed during connect()
 
                    println!("WASNT EXPECTING {:?}", s);
 
                    return Err(SyncErr::UnexpectedSetupMsg);
 
                }
 
                Msg::CommMsg(CommMsg { round_index, .. })
 
                    if round_index < self.inner.round_index =>
 
                {
 
                    // Old message! Can safely discard
 
                    log!(&mut self.inner.logger, "...and its OLD! :(");
 
                    drop(received);
 
                    continue 'recv_loop;
 
                }
 
                Msg::CommMsg(CommMsg { round_index, .. })
 
                    if round_index > self.inner.round_index =>
 
                {
 
                    // Message from a next round. Keep for later!
 
                    log!(&mut self.inner.logger, "... DELAY! :(");
 
                    self.delay(received);
 
                    continue 'recv_loop;
 
                }
 
@@ -677,30 +687,24 @@ impl PolyContext for BranchPContext<'_, '_> {
 
    fn is_firing(&mut self, ekey: Key) -> Option<bool> {
 
        assert!(self.ekeys.contains(&ekey));
 
        let channel_id = self.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id;
 
        let val = self.predicate.query(channel_id);
 
        log!(
 
            &mut self.m_ctx.inner.logger,
 
            "!! PolyContext callback to is_firing by {:?}! returning {:?}",
 
            self.m_ctx.my_subtree_id,
 
            val,
 
        );
 
        val
 
    }
 
    fn read_msg(&mut self, ekey: Key) -> Option<&Payload> {
 
        assert!(self.ekeys.contains(&ekey));
 
        let val = self.inbox.get(&ekey);
 
        log!(
 
            &mut self.m_ctx.inner.logger,
 
            "!! PolyContext callback to read_msg by {:?}! returning {:?}",
 
            self.m_ctx.my_subtree_id,
 
            val,
 
        );
 
        val
 
    }
 
}
 

	
 
/*
 
invariant: Controller.inner has stable MonoN/P states for which it will start the
 

	
 

	
 
*/
src/runtime/mod.rs
Show inline comments
 
@@ -74,69 +74,54 @@ pub enum PortBinding {
 
struct Arena<T> {
 
    storage: Vec<T>,
 
}
 

	
 
#[derive(Debug)]
 
struct ReceivedMsg {
 
    recipient: Key,
 
    msg: Msg,
 
}
 

	
 
#[derive(Debug)]
 
struct MessengerState {
 
    poll: Poll,
 
    events: Events,
 
    delayed: Vec<ReceivedMsg>,
 
    undelayed: Vec<ReceivedMsg>,
 
    polled_undrained: IndexSet<Key>,
 
}
 
#[derive(Debug)]
 
struct ChannelIdStream {
 
    controller_id: ControllerId,
 
    next_channel_index: ChannelIndex,
 
}
 

	
 
#[derive(Debug)]
 
enum RoundHistory {
 
    Consistent {
 
        decision: Predicate,
 
        native_component: (MonoN, PolyN),
 
        protocol_components: Box<[(MonoP, PolyP)]>,
 
    },
 
    Inconsistent {
 
        error: SyncErr,
 
        subtree_solutions: SolutionStorage,
 
        native_component: PolyN,
 
        protocol_components: Box<[PolyP]>,
 
    },
 
}
 

	
 
#[derive(Debug)]
 
struct Controller {
 
    protocol_description: Arc<ProtocolD>,
 
    inner: ControllerInner,
 
    ephemeral: ControllerEphemeral,
 
    round_histories: Vec<RoundHistory>,
 
    unrecoverable_error: Option<SyncErr>, // prevents future calls to Sync
 
}
 
#[derive(Debug)]
 
struct ControllerInner {
 
    round_index: usize,
 
    channel_id_stream: ChannelIdStream,
 
    endpoint_exts: Arena<EndpointExt>,
 
    messenger_state: MessengerState,
 
    mono_n: MonoN,       // state at next round start
 
    mono_ps: Vec<MonoP>, // state at next round start
 
    family: ControllerFamily,
 
    logger: String,
 
}
 

	
 
/// This structure has its state entirely reset between synchronous rounds
 
#[derive(Debug, Default)]
 
struct ControllerEphemeral {
 
    solution_storage: SolutionStorage,
 
    poly_n: Option<PolyN>,
 
    poly_ps: Vec<PolyP>,
 
    mono_ps: Vec<MonoP>,
 
    ekey_to_holder: HashMap<Key, PolyId>,
 
}
 

	
 
#[derive(Debug)]
src/runtime/setup.rs
Show inline comments
 
@@ -110,49 +110,50 @@ impl Controller {
 
        let family = Self::setup_sink_tree_family(
 
            major,
 
            &mut logger,
 
            &mut endpoint_exts,
 
            &mut messenger_state,
 
            ekeys_network,
 
            deadline,
 
        )?;
 

	
 
        log!(&mut logger, "CONNECT PHASE END! ~");
 
        let inner = ControllerInner {
 
            family,
 
            messenger_state,
 
            channel_id_stream,
 
            endpoint_exts,
 
            mono_ps: p_monos,
 
            mono_n: n_mono,
 
            round_index: 0,
 
            logger,
 
        };
 
        let controller = Self {
 
            protocol_description,
 
            inner,
 
            ephemeral: Default::default(),
 
            round_histories: vec![],
 
            // round_histories: vec![],
 
            unrecoverable_error: None,
 
        };
 
        Ok((controller, native_interface))
 
    }
 

	
 
    fn test_stream_connectivity(stream: &mut TcpStream) -> bool {
 
        use std::io::Write;
 
        stream.write(&[]).is_ok()
 
    }
 

	
 
    // inserts
 
    fn finish_endpoint_ext_todos(
 
        major: ControllerId,
 
        logger: &mut String,
 
        mut endpoint_ext_todos: Arena<EndpointExtTodo>,
 
        deadline: Instant,
 
    ) -> Result<(MessengerState, Arena<EndpointExt>), ConnectErr> {
 
        use {ConnectErr::*, EndpointExtTodo::*};
 

	
 
        // 1. define and setup a poller and event loop
 
        let edge = PollOpt::edge();
 
        let [ready_r, ready_w] = [Ready::readable(), Ready::writable()];
 
        let mut ms = MessengerState {
 
            poll: Poll::new().map_err(|_| PollInitFailed)?,
 
            events: Events::with_capacity(endpoint_ext_todos.len()),
0 comments (0 inline, 0 general)