Changeset - 27ba74fbac9f
[Not reviewed]
0 3 0
Christopher Esterhuyse - 5 years ago 2020-08-28 11:58:15
christopher.esterhuyse@gmail.com
made the (static) logging more modular in preparation for benchmarking
3 files changed with 25 insertions and 15 deletions:
0 comments (0 inline, 0 general)
src/macros.rs
Show inline comments
 
macro_rules! endptlog {
 
    ($logger:expr, $($arg:tt)*) => {{
 
    	if cfg!(feature = "endpoint_logging") {
 
/*
 
Change the definition of these macros to control the logging level statically
 
*/
 

	
 
macro_rules! log {
 
    (@ENDPT, $logger:expr, $($arg:tt)*) => {{
 
        // ignore
 
    }};
 
    (@COMM_NB, $logger:expr, $($arg:tt)*) => {{
 
        if let Some(w) = $logger.line_writer() {
 
            let _ = writeln!(w, $($arg)*);
 
        }
 
	    }
 
    }};
 
}
 
macro_rules! log {
 
    ($logger:expr, $($arg:tt)*) => {{
 
        if let Some(w) = $logger.line_writer() {
 
            let _ = writeln!(w, $($arg)*);
 
        }
 
    }};
 
}
src/runtime/communication.rs
Show inline comments
 
@@ -201,12 +201,13 @@ impl Connector {
 
        timeout: Option<Duration>,
 
    ) -> Result<Option<RoundOk>, SyncError> {
 
        //////////////////////////////////
 
        use SyncError as Se;
 
        //////////////////////////////////
 
        log!(
 
            @COMM_NB,
 
            cu.inner.logger,
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
            &timeout,
 
            comm.round_index
 
        );
 

	
 
@@ -251,12 +252,13 @@ impl Connector {
 
                    branching_proto_components
 
                        .insert(proto_component_id, BranchingProtoComponent::initial(component));
 
                }
 
            }
 
        }
 
        log!(
 
            @COMM_NB,
 
            cu.inner.logger,
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 

	
 
        // Create temp structures needed for the synchronous phase of the round
 
@@ -281,13 +283,13 @@ impl Connector {
 
                SolutionStorage::new(subtree_id_iter)
 
            },
 
            spec_var_stream: cu.inner.id_manager.new_spec_var_stream(),
 
            getter_buffer: Default::default(),
 
            deadline: timeout.map(|to| Instant::now() + to),
 
        };
 
        log!(cu.inner.logger, "Round context structure initialized");
 
        log!(@COMM_NB, cu.inner.logger, "Round context structure initialized");
 

	
 
        // Explore all native branches eagerly. Find solutions, buffer messages, etc.
 
        log!(
 
            cu.inner.logger,
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
@@ -350,20 +352,21 @@ impl Connector {
 
                unreachable!()
 
            }
 
        }
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 
        // Call to another big method; keep running this round until a distributed decision is reached
 
        log!(@COMM_NB, cu.inner.logger, "Searching for decision...");
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
            &mut branching_native,
 
            &mut branching_proto_components,
 
            &mut rctx,
 
        )?;
 
        log!(cu.inner.logger, "Committing to decision {:?}!", &decision);
 
        log!(@COMM_NB, cu.inner.logger, "Committing to decision {:?}!", &decision);
 
        comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.inner.logger, &decision)?;
 

	
 
        // propagate the decision to children
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Announce {
 
@@ -399,13 +402,13 @@ impl Connector {
 
                    cu.proto_components.keys()
 
                );
 
                // consume native
 
                Ok(Some(branching_native.collapse_with(&mut *cu.inner.logger, &predicate)))
 
            }
 
        };
 
        log!(cu.inner.logger, "Sync round ending! Cleaning up");
 
        log!(@COMM_NB, cu.inner.logger, "Sync round ending! Cleaning up");
 
        ret
 
    }
 

	
 
    fn sync_reach_decision(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
src/runtime/endpoints.rs
Show inline comments
 
@@ -30,25 +30,27 @@ impl NetEndpoint {
 
                Err(e) if would_block(&e) => break 'read_loop,
 
                Ok(0) => break 'read_loop,
 
                Ok(_) => (),
 
                Err(_e) => return Err(Nee::BrokenNetEndpoint),
 
            }
 
        }
 
        endptlog!(
 
        log!(
 
            @ENDPT,
 
            logger,
 
            "Inbox bytes [{:x?}| {:x?}]",
 
            DenseDebugHex(&self.inbox[..before_len]),
 
            DenseDebugHex(&self.inbox[before_len..]),
 
        );
 
        let mut monitored = MonitoredReader::from(&self.inbox[..]);
 
        use bincode::config::Options;
 
        match Self::bincode_opts().deserialize_from(&mut monitored) {
 
            Ok(msg) => {
 
                let msg_size = monitored.bytes_read();
 
                self.inbox.drain(0..(msg_size.try_into().unwrap()));
 
                endptlog!(
 
                log!(
 
                    @ENDPT,
 
                    logger,
 
                    "Yielding msg. Inbox len {}-{}=={}: [{:?}]",
 
                    self.inbox.len() + msg_size,
 
                    msg_size,
 
                    self.inbox.len(),
 
                    DenseDebugHex(&self.inbox[..]),
 
@@ -120,13 +122,13 @@ impl EndpointManager {
 
                trane.error,
 
            )
 
        }
 
        ///////////////////////////////////////////
 
        // try yield undelayed net message
 
        if let Some(tup) = self.undelayed_messages.pop() {
 
            endptlog!(logger, "RECV undelayed_msg {:?}", &tup);
 
            log!(@ENDPT, logger, "RECV undelayed_msg {:?}", &tup);
 
            return Ok(tup);
 
        }
 
        loop {
 
            // try recv from some polled undrained NET endpoint
 
            if let Some(tup) = self
 
                .try_recv_undrained_net(logger)
 
@@ -270,13 +272,13 @@ impl EndpointManager {
 
        while let Some(index) = self.net_endpoint_store.polled_undrained.pop() {
 
            let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
            if let Some(msg) = net_endpoint
 
                .try_recv(logger)
 
                .map_err(|error| TryRecvAnyNetError { error, index })?
 
            {
 
                endptlog!(logger, "RECV polled_undrained {:?}", &msg);
 
                log!(@ENDPT, logger, "RECV polled_undrained {:?}", &msg);
 
                if !net_endpoint.inbox.is_empty() {
 
                    // there may be another message waiting!
 
                    self.net_endpoint_store.polled_undrained.insert(index);
 
                }
 
                return Ok(Some((index, msg)));
 
            }
 
@@ -301,23 +303,25 @@ impl EndpointManager {
 
            match TokenTarget::from(event.token()) {
 
                TokenTarget::Waker => {
 
                    // Can ignore. Residual event from endpoint manager setup procedure
 
                }
 
                TokenTarget::NetEndpoint { index } => {
 
                    self.net_endpoint_store.polled_undrained.insert(index);
 
                    endptlog!(
 
                    log!(
 
                        @ENDPT,
 
                        logger,
 
                        "RECV poll event {:?} for NET endpoint index {:?}. undrained: {:?}",
 
                        &event,
 
                        index,
 
                        self.net_endpoint_store.polled_undrained.iter()
 
                    );
 
                }
 
                TokenTarget::UdpEndpoint { index } => {
 
                    self.udp_endpoint_store.polled_undrained.insert(index);
 
                    endptlog!(
 
                    log!(
 
                        @ENDPT,
 
                        logger,
 
                        "RECV poll event {:?} for UDP endpoint index {:?}. undrained: {:?}",
 
                        &event,
 
                        index,
 
                        self.udp_endpoint_store.polled_undrained.iter()
 
                    );
0 comments (0 inline, 0 general)