Changeset - a7abc31e051f
[Not reviewed]
0 2 0
Christopher Esterhuyse - 5 years ago 2020-10-06 17:18:52
christopher.esterhuyse@gmail.com
cleaned up some redundant logging macros
2 files changed with 0 insertions and 30 deletions:
0 comments (0 inline, 0 general)
src/macros.rs
Show inline comments
 
/*
 
Change the definition of these macros to control the logging level statically
 
*/
 

	
 
macro_rules! log {
 
    (@BENCH, $logger:expr, $($arg:tt)*) => {{
 
        // if let Some(w) = $logger.line_writer() {
 
        //     let _ = writeln!(w, $($arg)*);
 
        // }
 
    }};
 
    (@MARK, $logger:expr, $($arg:tt)*) => {{
 
        // if let Some(w) = $logger.line_writer() {
 
        //     let _ = writeln!(w, $($arg)*);
 
        // }
 
    }};
 
    (@ENDPT, $logger:expr, $($arg:tt)*) => {{
 
        // if let Some(w) = $logger.line_writer() {
 
        //     let _ = writeln!(w, $($arg)*);
 
        // }
 
    }};
 
    ($logger:expr, $($arg:tt)*) => {{
src/runtime/communication.rs
Show inline comments
 
@@ -245,21 +245,12 @@ impl Connector {
 
        timeout: Option<Duration>,
 
    ) -> Result<Option<RoundEndedNative>, SyncError> {
 
        //////////////////////////////////
 
        use SyncError as Se;
 
        //////////////////////////////////
 

	
 
        log!(@MARK, cu.logger(), "sync start {}", comm.round_index);
 
        log!(
 
            cu.logger(),
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
            &timeout,
 
            comm.round_index
 
        );
 
        log!(@BENCH, cu.logger(), "");
 

	
 
        // Create separate storages for ports and components stored in `cu`,
 
        // while kicking off the branching of components until the set of
 
        // components entering their synchronous block is finalized in `branching_proto_components`.
 
        // This is the last time cu's components and ports are accessed until the round is decided.
 
        let mut ips = cu.ips.clone();
 
        let mut branching_proto_components =
 
@@ -305,13 +296,12 @@ impl Connector {
 
        }
 
        log!(
 
            cu.logger(),
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 
        log!(@BENCH, cu.logger(), "");
 

	
 
        // Create temporary structures needed for the synchronous phase of the round
 
        let mut rctx = RoundCtx {
 
            ips, // already used previously, now moved into RoundCtx
 
            solution_storage: {
 
                let subtree_id_iter = {
 
@@ -339,13 +329,12 @@ impl Connector {
 
            },
 
            spec_var_stream: cu.ips.id_manager.new_spec_var_stream(),
 
            payload_inbox: Default::default(), // buffer for in-memory payloads to be handled
 
            deadline: timeout.map(|to| Instant::now() + to),
 
        };
 
        log!(cu.logger(), "Round context structure initialized");
 
        log!(@BENCH, cu.logger(), "");
 

	
 
        // Prepare the branching native component, involving the conversion
 
        // of its synchronous batches (user provided) into speculative branches eagerly.
 
        // As a side effect, send all PUTs with the appropriate predicates.
 
        // Afterwards, each native component's speculative branch finds a local
 
        // solution the moment it's received all the messages it's awaiting.
 
@@ -431,23 +420,20 @@ impl Connector {
 
        }
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 
        // Call to another big method; keep running this round
 
        // until a distributed decision is reached!
 
        log!(cu.logger(), "Searching for decision...");
 
        log!(@BENCH, cu.logger(), "");
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
            &mut branching_native,
 
            &mut branching_proto_components,
 
            &mut rctx,
 
        )?;
 
        log!(@MARK, cu.logger(), "got decision!");
 
        log!(cu.logger(), "Committing to decision {:?}!", &decision);
 
        log!(@BENCH, cu.logger(), "");
 
        comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.logger(), &decision)?;
 

	
 
        // propagate the decision to children
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Announce {
 
@@ -457,13 +443,12 @@ impl Connector {
 
        log!(
 
            cu.logger(),
 
            "Announcing decision {:?} through child endpoints {:?}",
 
            &msg,
 
            &comm.neighborhood.children
 
        );
 
        log!(@MARK, cu.logger(), "forwarding decision!");
 
        for &child in comm.neighborhood.children.iter() {
 
            comm.endpoint_manager.send_to_comms(child, &msg)?;
 
        }
 
        let ret = match decision {
 
            Decision::Failure => {
 
                // untouched port/component fields of `cu` are NOT overwritten.
 
@@ -490,13 +475,12 @@ impl Connector {
 
                // consume native
 
                let round_ok = branching_native.collapse_with(&mut *cu.logger(), &predicate);
 
                Ok(Some(round_ok))
 
            }
 
        };
 
        log!(cu.logger(), "Sync round ending! Cleaning up");
 
        log!(@BENCH, cu.logger(), "");
 
        ret
 
    }
 

	
 
    // Once the synchronous round has been started, this procedure
 
    // routs and handles payloads, receives control messages from neighboring connectors,
 
    // checks for timeout, and aggregates solutions until a distributed decision is reached.
 
@@ -508,13 +492,12 @@ impl Connector {
 
        comm: &mut ConnectorCommunication,
 
        branching_native: &mut BranchingNative,
 
        branching_proto_components: &mut HashMap<ComponentId, BranchingProtoComponent>,
 
        rctx: &mut RoundCtx,
 
    ) -> Result<Decision, UnrecoverableSyncError> {
 
        // The round is in progress, and now its just a matter of arriving at a decision.
 
        log!(@MARK, cu.logger(), "decide start");
 
        let mut already_requested_failure = false;
 
        if branching_native.branches.is_empty() {
 
            // An unsatisfiable native is the easiest way to detect failure
 
            log!(cu.logger(), "Native starts with no branches! Failure!");
 
            match comm.neighborhood.parent {
 
                Some(parent) => {
 
@@ -576,13 +559,12 @@ impl Connector {
 
        log!(cu.logger(), "Entering decision loop...");
 
        comm.endpoint_manager.undelay_all();
 
        'undecided: loop {
 
            // handle all buffered messages, sending them through endpoints / feeding them to components
 
            log!(cu.logger(), "Decision loop! have {} messages to recv", rctx.payload_inbox.len());
 
            while let Some((getter, send_payload_msg)) = rctx.getter_pop() {
 
                log!(@MARK, cu.logger(), "handling payload msg for getter {:?} of {:?}", getter, &send_payload_msg);
 
                let getter_info = rctx.ips.port_info.map.get(&getter).unwrap();
 
                let cid = getter_info.owner; // the id of the component owning `getter` port
 
                assert_eq!(Getter, getter_info.polarity); // sanity check
 
                log!(
 
                    cu.logger(),
 
                    "Routing msg {:?} to {:?} via {:?}",
 
@@ -600,13 +582,12 @@ impl Connector {
 
                        // UDP mediator messages are buffered until the end of the round,
 
                        // because they are still speculative
 
                        udp_endpoint_ext.outgoing_payloads.insert(predicate, payload);
 
                    }
 
                    Route::NetEndpoint { index } => {
 
                        // this is a message sent over the network as a control message
 
                        log!(@MARK, cu.logger(), "sending payload");
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::SendPayload(send_payload_msg),
 
                        });
 
                        // actually send the message now
 
                        comm.endpoint_manager.send_to_comms(index, &msg)?;
 
@@ -666,13 +647,12 @@ impl Connector {
 
            }
 
            // payload buffer is empty.
 
            // check if we have a solution yet
 
            log!(cu.logger(), "Check if we have any local decisions...");
 
            for solution in rctx.solution_storage.iter_new_local_make_old() {
 
                log!(cu.logger(), "New local decision with solution {:?}...", &solution);
 
                log!(@MARK, cu.logger(), "local solution");
 
                match comm.neighborhood.parent {
 
                    Some(parent) => {
 
                        // Always forward connector-local solutions to my parent
 
                        // AS they are moved from new->old in solution storage.
 
                        log!(cu.logger(), "Forwarding to my parent {:?}", parent);
 
                        let suggestion = Decision::Success(solution);
0 comments (0 inline, 0 general)