Changeset - 27ba74fbac9f
[Not reviewed]
0 3 0
Christopher Esterhuyse - 5 years ago 2020-08-28 11:58:15
christopher.esterhuyse@gmail.com
made the (static) logging more modular in preparation for benchmarking
3 files changed with 31 insertions and 21 deletions:
0 comments (0 inline, 0 general)
src/macros.rs
Show inline comments
 
macro_rules! endptlog {
 
    ($logger:expr, $($arg:tt)*) => {{
 
    	if cfg!(feature = "endpoint_logging") {
 
	        if let Some(w) = $logger.line_writer() {
 
	        	let _ = writeln!(w, $($arg)*);
 
	        }
 
	    }
 
    }};
 
}
 
/*
 
Change the definition of these macros to control the logging level statically
 
*/
 

	
 
macro_rules! log {
 
    (@ENDPT, $logger:expr, $($arg:tt)*) => {{
 
        // ignore
 
    }};
 
    (@COMM_NB, $logger:expr, $($arg:tt)*) => {{
 
        if let Some(w) = $logger.line_writer() {
 
            let _ = writeln!(w, $($arg)*);
 
        }
 
    }};
 
    ($logger:expr, $($arg:tt)*) => {{
 
    	if let Some(w) = $logger.line_writer() {
 
        	let _ = writeln!(w, $($arg)*);
 
    	}
 
        if let Some(w) = $logger.line_writer() {
 
            let _ = writeln!(w, $($arg)*);
 
        }
 
    }};
 
}
src/runtime/communication.rs
Show inline comments
 
@@ -195,24 +195,25 @@ impl Connector {
 
    }
 
    // private function. mutates state but returns with round
 
    // result ASAP (allows for convenient error return with ?)
 
    fn connected_sync(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        timeout: Option<Duration>,
 
    ) -> Result<Option<RoundOk>, SyncError> {
 
        //////////////////////////////////
 
        use SyncError as Se;
 
        //////////////////////////////////
 
        log!(
 
            @COMM_NB,
 
            cu.inner.logger,
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
            &timeout,
 
            comm.round_index
 
        );
 

	
 
        // 1. run all proto components to Nonsync blockers
 
        // NOTE: original components are immutable until Decision::Success
 
        let mut branching_proto_components =
 
            HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
        let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
            cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect();
 
@@ -245,24 +246,25 @@ impl Connector {
 
            );
 
            use NonsyncBlocker as B;
 
            match blocker {
 
                B::ComponentExit => drop(component),
 
                B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)),
 
                B::SyncBlockStart => {
 
                    branching_proto_components
 
                        .insert(proto_component_id, BranchingProtoComponent::initial(component));
 
                }
 
            }
 
        }
 
        log!(
 
            @COMM_NB,
 
            cu.inner.logger,
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 

	
 
        // Create temp structures needed for the synchronous phase of the round
 
        let mut rctx = RoundCtx {
 
            solution_storage: {
 
                let n = std::iter::once(SubtreeId::LocalComponent(ComponentId::Native));
 
                let c = cu
 
                    .proto_components
 
                    .keys()
 
@@ -275,25 +277,25 @@ impl Connector {
 
                let subtree_id_iter = n.chain(c).chain(e);
 
                log!(
 
                    cu.inner.logger,
 
                    "Children in subtree are: {:?}",
 
                    subtree_id_iter.clone().collect::<Vec<_>>()
 
                );
 
                SolutionStorage::new(subtree_id_iter)
 
            },
 
            spec_var_stream: cu.inner.id_manager.new_spec_var_stream(),
 
            getter_buffer: Default::default(),
 
            deadline: timeout.map(|to| Instant::now() + to),
 
        };
 
        log!(cu.inner.logger, "Round context structure initialized");
 
        log!(@COMM_NB, cu.inner.logger, "Round context structure initialized");
 

	
 
        // Explore all native branches eagerly. Find solutions, buffer messages, etc.
 
        log!(
 
            cu.inner.logger,
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
        );
 
        let native_spec_var = rctx.spec_var_stream.next();
 
        log!(cu.inner.logger, "Native branch spec var is {:?}", native_spec_var);
 
        let mut branching_native = BranchingNative { branches: Default::default() };
 
        'native_branches: for ((native_branch, index), branch_spec_val) in
 
            comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain())
 
@@ -344,32 +346,33 @@ impl Connector {
 
                    SubtreeId::LocalComponent(ComponentId::Native),
 
                    predicate.clone(),
 
                );
 
            }
 
            if let Some(_) = branching_native.branches.insert(predicate, branch) {
 
                // thanks to the native_spec_var, each batch has a distinct predicate
 
                unreachable!()
 
            }
 
        }
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 
        // Call to another big method; keep running this round until a distributed decision is reached
 
        log!(@COMM_NB, cu.inner.logger, "Searching for decision...");
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
            &mut branching_native,
 
            &mut branching_proto_components,
 
            &mut rctx,
 
        )?;
 
        log!(cu.inner.logger, "Committing to decision {:?}!", &decision);
 
        log!(@COMM_NB, cu.inner.logger, "Committing to decision {:?}!", &decision);
 
        comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.inner.logger, &decision)?;
 

	
 
        // propagate the decision to children
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Announce {
 
                decision: decision.clone(),
 
            }),
 
        });
 
        log!(
 
            cu.inner.logger,
 
            "Announcing decision {:?} through child endpoints {:?}",
 
@@ -393,25 +396,25 @@ impl Connector {
 
                        .into_iter()
 
                        .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))),
 
                );
 
                log!(
 
                    cu.inner.logger,
 
                    "End round with (updated) component states {:?}",
 
                    cu.proto_components.keys()
 
                );
 
                // consume native
 
                Ok(Some(branching_native.collapse_with(&mut *cu.inner.logger, &predicate)))
 
            }
 
        };
 
        log!(cu.inner.logger, "Sync round ending! Cleaning up");
 
        log!(@COMM_NB, cu.inner.logger, "Sync round ending! Cleaning up");
 
        ret
 
    }
 

	
 
    fn sync_reach_decision(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        branching_native: &mut BranchingNative,
 
        branching_proto_components: &mut HashMap<ProtoComponentId, BranchingProtoComponent>,
 
        rctx: &mut RoundCtx,
 
    ) -> Result<Decision, UnrecoverableSyncError> {
 
        let mut already_requested_failure = false;
 
        if branching_native.branches.is_empty() {
src/runtime/endpoints.rs
Show inline comments
 
@@ -24,37 +24,39 @@ impl NetEndpoint {
 
        use NetEndpointError as Nee;
 
        // populate inbox as much as possible
 
        let before_len = self.inbox.len();
 
        'read_loop: loop {
 
            let res = self.stream.read_to_end(&mut self.inbox);
 
            match res {
 
                Err(e) if would_block(&e) => break 'read_loop,
 
                Ok(0) => break 'read_loop,
 
                Ok(_) => (),
 
                Err(_e) => return Err(Nee::BrokenNetEndpoint),
 
            }
 
        }
 
        endptlog!(
 
        log!(
 
            @ENDPT,
 
            logger,
 
            "Inbox bytes [{:x?}| {:x?}]",
 
            DenseDebugHex(&self.inbox[..before_len]),
 
            DenseDebugHex(&self.inbox[before_len..]),
 
        );
 
        let mut monitored = MonitoredReader::from(&self.inbox[..]);
 
        use bincode::config::Options;
 
        match Self::bincode_opts().deserialize_from(&mut monitored) {
 
            Ok(msg) => {
 
                let msg_size = monitored.bytes_read();
 
                self.inbox.drain(0..(msg_size.try_into().unwrap()));
 
                endptlog!(
 
                log!(
 
                    @ENDPT,
 
                    logger,
 
                    "Yielding msg. Inbox len {}-{}=={}: [{:?}]",
 
                    self.inbox.len() + msg_size,
 
                    msg_size,
 
                    self.inbox.len(),
 
                    DenseDebugHex(&self.inbox[..]),
 
                );
 
                Ok(Some(msg))
 
            }
 
            Err(e) => match *e {
 
                bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
 
                    Ok(None)
 
@@ -114,25 +116,25 @@ impl EndpointManager {
 
            ConnectError::NetEndpointSetupError(
 
                net_endpoint_store.endpoint_exts[trane.index]
 
                    .net_endpoint
 
                    .stream
 
                    .local_addr()
 
                    .unwrap(), // stream must already be connected
 
                trane.error,
 
            )
 
        }
 
        ///////////////////////////////////////////
 
        // try yield undelayed net message
 
        if let Some(tup) = self.undelayed_messages.pop() {
 
            endptlog!(logger, "RECV undelayed_msg {:?}", &tup);
 
            log!(@ENDPT, logger, "RECV undelayed_msg {:?}", &tup);
 
            return Ok(tup);
 
        }
 
        loop {
 
            // try recv from some polled undrained NET endpoint
 
            if let Some(tup) = self
 
                .try_recv_undrained_net(logger)
 
                .map_err(|trane| map_trane(trane, &self.net_endpoint_store))?
 
            {
 
                return Ok(tup);
 
            }
 
            // poll if time remains
 
            self.poll_and_populate(logger, deadline)?;
 
@@ -264,25 +266,25 @@ impl EndpointManager {
 
        }
 
    }
 
    fn try_recv_undrained_net(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<(usize, Msg)>, TryRecvAnyNetError> {
 
        while let Some(index) = self.net_endpoint_store.polled_undrained.pop() {
 
            let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
            if let Some(msg) = net_endpoint
 
                .try_recv(logger)
 
                .map_err(|error| TryRecvAnyNetError { error, index })?
 
            {
 
                endptlog!(logger, "RECV polled_undrained {:?}", &msg);
 
                log!(@ENDPT, logger, "RECV polled_undrained {:?}", &msg);
 
                if !net_endpoint.inbox.is_empty() {
 
                    // there may be another message waiting!
 
                    self.net_endpoint_store.polled_undrained.insert(index);
 
                }
 
                return Ok(Some((index, msg)));
 
            }
 
        }
 
        Ok(None)
 
    }
 
    fn poll_and_populate(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
@@ -295,35 +297,37 @@ impl EndpointManager {
 
        } else {
 
            None
 
        };
 
        // Yes we do! Poll with remaining time as poll deadline
 
        self.poll.poll(&mut self.events, remaining).map_err(|_| Pape::PollFailed)?;
 
        for event in self.events.iter() {
 
            match TokenTarget::from(event.token()) {
 
                TokenTarget::Waker => {
 
                    // Can ignore. Residual event from endpoint manager setup procedure
 
                }
 
                TokenTarget::NetEndpoint { index } => {
 
                    self.net_endpoint_store.polled_undrained.insert(index);
 
                    endptlog!(
 
                    log!(
 
                        @ENDPT,
 
                        logger,
 
                        "RECV poll event {:?} for NET endpoint index {:?}. undrained: {:?}",
 
                        &event,
 
                        index,
 
                        self.net_endpoint_store.polled_undrained.iter()
 
                    );
 
                }
 
                TokenTarget::UdpEndpoint { index } => {
 
                    self.udp_endpoint_store.polled_undrained.insert(index);
 
                    endptlog!(
 
                    log!(
 
                        @ENDPT,
 
                        logger,
 
                        "RECV poll event {:?} for UDP endpoint index {:?}. undrained: {:?}",
 
                        &event,
 
                        index,
 
                        self.udp_endpoint_store.polled_undrained.iter()
 
                    );
 
                }
 
            }
 
        }
 
        self.events.clear();
 
        Ok(())
 
    }
0 comments (0 inline, 0 general)