Changeset - ac804a4a3d70
[Not reviewed]
0 8 0
mh - 3 years ago 2022-04-26 10:54:14
contact@maxhenger.nl
More granularity in debug logging
8 files changed with 81 insertions and 47 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -260,7 +260,7 @@ pub(crate) fn default_send_data_message(
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
 
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(annotated_message), true);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 

	
 
        return Ok(CompScheduling::Immediate);
 
    }
 
@@ -319,7 +319,7 @@ pub(crate) fn default_handle_incoming_data_message(
 
            let (peer_handle, message) =
 
                control.initiate_port_blocking(comp_ctx, port_handle);
 
            let peer = comp_ctx.get_peer(peer_handle);
 
            peer.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
            peer.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
        }
 

	
 
        return IncomingData::SlotFull(incoming_message)
 
@@ -426,7 +426,7 @@ pub(crate) fn default_handle_received_data_message(
 
        comp_ctx.set_port_state(port_handle, PortState::Open);
 
        let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
    }
 

	
 
    return Ok(());
 
@@ -559,7 +559,7 @@ pub(crate) fn default_handle_sync_start(
 
    exec_state: &mut CompExecState, inbox_main: &mut InboxMainRef,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) {
 
    sched_ctx.log("Component starting sync mode");
 
    sched_ctx.info("Component starting sync mode");
 

	
 
    // If any messages are present for this sync round, set the appropriate flag
 
    // and notify the consensus handler of the present messages
 
@@ -585,7 +585,7 @@ pub(crate) fn default_handle_sync_end(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    consensus: &mut Consensus
 
) {
 
    sched_ctx.log("Component ending sync mode (but possibly waiting for a solution)");
 
    sched_ctx.info("Component ending sync mode (but possibly waiting for a solution)");
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 
    let decision = consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
    exec_state.mode = CompMode::SyncEnd;
 
@@ -601,7 +601,7 @@ pub(crate) fn default_handle_start_exit(
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
 
    sched_ctx.log(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason));
 
    sched_ctx.info(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason));
 
    exec_state.mode = CompMode::BusyExit;
 
    let exit_inside_sync = exec_state.exit_reason.is_in_sync();
 

	
 
@@ -628,7 +628,7 @@ pub(crate) fn default_handle_start_exit(
 
        let port_handle = comp_ctx.get_port_handle(port_id);
 
        let (peer, message) = control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx);
 
        let peer_info = comp_ctx.get_peer(peer);
 
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
    }
 

	
 
    return CompScheduling::Immediate; // to check if we can shut down immediately
 
@@ -643,10 +643,10 @@ pub(crate) fn default_handle_busy_exit(
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::BusyExit);
 
    if control.has_acks_remaining() {
 
        sched_ctx.log("Component busy exiting, still has `Ack`s remaining");
 
        sched_ctx.info("Component busy exiting, still has `Ack`s remaining");
 
        return CompScheduling::Sleep;
 
    } else {
 
        sched_ctx.log("Component busy exiting, now shutting down");
 
        sched_ctx.info("Component busy exiting, now shutting down");
 
        exec_state.mode = CompMode::Exit;
 
        return CompScheduling::Exit;
 
    }
 
@@ -679,7 +679,7 @@ pub(crate) fn default_handle_sync_decision(
 
        )
 
    );
 

	
 
    sched_ctx.log(&format!("Handling decision {:?} (in mode: {:?})", decision, exec_state.mode));
 
    sched_ctx.info(&format!("Handling decision {:?} (in mode: {:?})", decision, exec_state.mode));
 
    consensus.notify_sync_decision(decision);
 
    if success {
 
        // We cannot get a success message if the component has encountered an
 
@@ -747,7 +747,7 @@ fn default_handle_ack(
 
            AckAction::SendMessage(target_comp, message) => {
 
                // FIX @NoDirectHandle
 
                let mut handle = sched_ctx.runtime.get_component_public(target_comp);
 
                handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
                handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
                let _should_remove = handle.decrement_users();
 
                debug_assert!(_should_remove.is_none());
 
            },
 
@@ -779,7 +779,7 @@ fn default_send_ack(
 
    sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx
 
) {
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 
    peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(ControlMessage{
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{
 
        id: causer_of_ack_id,
 
        sender_comp_id: comp_ctx.id,
 
        target_port_id: None,
 
@@ -808,7 +808,7 @@ fn default_handle_unblock_put(
 
        // Retrieve peer to send the message
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(to_send), true);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Data(to_send), true);
 

	
 
        exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state.
 
        exec_state.mode_port = PortId::new_invalid();
src/runtime2/component/component_internet.rs
Show inline comments
 
@@ -116,13 +116,13 @@ impl Component for ComponentTcpClient {
 
                }
 
            },
 
            Message::Poll => {
 
                sched_ctx.log("Received polling event");
 
                sched_ctx.info("Received polling event");
 
            },
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state));
 
        sched_ctx.info(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedSelect => {
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -244,7 +244,7 @@ impl Component for CompPDL {
 
        // sched_ctx.log(&format!("handling message: {:?}", message));
 
        if let Some(new_target) = self.control.should_reroute(&mut message) {
 
            let mut target = sched_ctx.runtime.get_component_public(new_target); // TODO: @NoDirectHandle
 
            target.send_message(&sched_ctx.runtime, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            target.send_message_logged(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            let _should_remove = target.decrement_users();
 
            debug_assert!(_should_remove.is_none());
 
            return;
 
@@ -274,7 +274,7 @@ impl Component for CompPDL {
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        use EvalContinuation as EC;
 

	
 
        sched_ctx.log(&format!("Running component (mode: {:?})", self.exec_state.mode));
 
        sched_ctx.info(&format!("Running component (mode: {:?})", self.exec_state.mode));
 

	
 
        // Depending on the mode don't do anything at all, take some special
 
        // actions, or fall through and run the PDL code.
 
@@ -335,7 +335,7 @@ impl Component for CompPDL {
 
            },
 
            EC::Put(expr_id, port_id, value) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                sched_ctx.log(&format!("Putting value {:?}", value));
 
                sched_ctx.info(&format!("Putting value {:?}", value));
 

	
 
                // Send the message
 
                let target_port_id = port_id_from_eval(port_id);
 
@@ -764,7 +764,7 @@ impl CompPDL {
 
                    );
 
                    let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id);
 
                    let peer_info = created_ctx.get_peer(peer_handle);
 
                    peer_info.handle.send_message(&sched_ctx.runtime, message, true);
 
                    peer_info.handle.send_message_logged(sched_ctx, message, true);
 
                }
 
            }
 
        } else {
src/runtime2/component/component_random.rs
Show inline comments
 
@@ -61,7 +61,7 @@ impl Component for ComponentRandomU32 {
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));
 
        sched_ctx.info(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedGet | CompMode::BlockedSelect => {
 
@@ -81,7 +81,7 @@ impl Component for ComponentRandomU32 {
 
                if self.num_sends >= self.max_num_sends {
 
                    self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                } else {
 
                    sched_ctx.log("Entering sync mode");
 
                    sched_ctx.info("Entering sync mode");
 
                    self.did_perform_send = false;
 
                    component::default_handle_sync_start(
 
                        &mut self.exec_state, &mut [], sched_ctx, comp_ctx, &mut self.consensus
 
@@ -94,7 +94,7 @@ impl Component for ComponentRandomU32 {
 
                // This component just sends a single message, then waits until
 
                // consensus has been reached
 
                if !self.did_perform_send {
 
                    sched_ctx.log("Sending random message");
 
                    sched_ctx.info("Sending random message");
 
                    let mut random = self.generator.next_u32() - self.random_minimum;
 
                    let random_delta = self.random_maximum - self.random_minimum;
 
                    random %= random_delta;
src/runtime2/component/consensus.rs
Show inline comments
 
@@ -500,7 +500,7 @@ impl Consensus {
 
                    sync_header: self.create_sync_header(comp_ctx),
 
                    content: SyncMessageContent::NotificationOfLeader,
 
                };
 
                peer.handle.send_message(&sched_ctx.runtime, Message::Sync(message), true);
 
                peer.handle.send_message_logged(sched_ctx, Message::Sync(message), true);
 
            }
 

	
 
            self.forward_partial_solution(sched_ctx, comp_ctx);
 
@@ -512,7 +512,7 @@ impl Consensus {
 
            };
 
            let peer_handle = comp_ctx.get_peer_handle(header.sending_id);
 
            let peer_info = comp_ctx.get_peer(peer_handle);
 
            peer_info.handle.send_message(&sched_ctx.runtime, Message::Sync(message), true);
 
            peer_info.handle.send_message_logged(sched_ctx, Message::Sync(message), true);
 
        } // else: exactly equal
 
    }
 

	
 
@@ -624,7 +624,7 @@ impl Consensus {
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: if is_success { SyncMessageContent::GlobalSolution } else { SyncMessageContent::GlobalFailure },
 
            });
 
            handle.send_message(&sched_ctx.runtime, message, true);
 
            handle.send_message_logged(sched_ctx, message, true);
 
            let _should_remove = handle.decrement_users();
 
            debug_assert!(_should_remove.is_none());
 
        }
 
@@ -633,7 +633,7 @@ impl Consensus {
 
    fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) {
 
        debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader, // TODO: @NoDirectHandle
 
        let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
 
        leader_info.send_message(&sched_ctx.runtime, message, true);
 
        leader_info.send_message_logged(sched_ctx, message, true);
 
        let should_remove = leader_info.decrement_users();
 
        if let Some(key) = should_remove {
 
            sched_ctx.runtime.destroy_component(key);
src/runtime2/poll/mod.rs
Show inline comments
 
@@ -6,7 +6,7 @@ use std::sync::atomic::{AtomicU32, Ordering};
 
use std::collections::HashMap;
 

	
 
use crate::runtime2::RtError;
 
use crate::runtime2::runtime::{CompHandle, RuntimeInner};
 
use crate::runtime2::runtime::{CompHandle, RuntimeInner, LogLevel};
 
use crate::runtime2::store::queue_mpsc::*;
 

	
 

	
 
@@ -139,11 +139,11 @@ pub struct PollingThread {
 
    poller: Arc<Poller>,
 
    runtime: Arc<RuntimeInner>,
 
    queue: QueueDynMpsc<PollCmd>,
 
    logging_enabled: bool,
 
    log_level: LogLevel,
 
}
 

	
 
impl PollingThread {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>, logging_enabled: bool) -> Result<(PollingThreadHandle, PollingClientFactory), RtError> {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>, log_level: LogLevel) -> Result<(PollingThreadHandle, PollingClientFactory), RtError> {
 
        let poller = Poller::new()
 
            .map_err(|e| rt_error!("failed to create poller, because: {}", e))?;
 
        let poller = Arc::new(poller);
 
@@ -154,9 +154,14 @@ impl PollingThread {
 
            poller: poller.clone(),
 
            runtime: runtime.clone(),
 
            queue,
 
            logging_enabled,
 
            log_level,
 
        };
 
        let thread_handle = thread::spawn(move || { thread_data.run() });
 
        let thread_handle = thread::Builder::new()
 
            .name(String::from("poller"))
 
            .spawn(move || { thread_data.run() })
 
            .map_err(|reason|
 
                rt_error!("failed to start polling thread, because: {}", reason)
 
            )?;
 

	
 
        let thread_handle = PollingThreadHandle{
 
            queue: Some(queue_producers.producer()),
 
@@ -238,7 +243,7 @@ impl PollingThread {
 
    }
 

	
 
    fn log(&self, message: &str) {
 
        if self.logging_enabled {
 
        if self.log_level >= LogLevel::Info {
 
            println!("[polling] {}", message);
 
        }
 
    }
src/runtime2/runtime.rs
Show inline comments
 
@@ -12,6 +12,13 @@ use super::component::{Component, wake_up_if_sleeping, CompPDL, CompCtx};
 
use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer};
 
use super::scheduler::*;
 

	
 
#[derive(PartialOrd, PartialEq, Copy, Clone)]
 
pub enum LogLevel {
 
    None, // no logging
 
    Debug, // all logging (includes messages)
 
    Info, // rough logging
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Component
 
// -----------------------------------------------------------------------------
 
@@ -105,6 +112,12 @@ impl CompHandle {
 
        }
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn send_message_logged(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) {
 
        sched_ctx.debug(&format!("Sending message to comp:{} ... {:?}", self.id.0, message));
 
        self.send_message(&sched_ctx.runtime, message, try_wake_up);
 
    }
 

	
 
    pub(crate) fn id(&self) -> CompId {
 
        return self.id;
 
    }
 
@@ -167,7 +180,7 @@ pub struct Runtime {
 

	
 
impl Runtime {
 
    // TODO: debug_logging should be removed at some point
 
    pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Result<Runtime, RtError> {
 
    pub fn new(num_threads: u32, log_level: LogLevel, protocol_description: ProtocolDescription) -> Result<Runtime, RtError> {
 
        if num_threads == 0 {
 
            return Err(rt_error!("need at least one thread to create the runtime"));
 
        }
 
@@ -179,7 +192,7 @@ impl Runtime {
 
            active_elements: AtomicU32::new(1),
 
        });
 
        let (polling_handle, polling_clients) = rt_error_try!(
 
            PollingThread::new(runtime_inner.clone(), debug_logging),
 
            PollingThread::new(runtime_inner.clone(), log_level),
 
            "failed to build polling thread"
 
        );
 

	
 
@@ -188,11 +201,20 @@ impl Runtime {
 
        for thread_index in 0..num_threads {
 
            let mut scheduler = Scheduler::new(
 
                runtime_inner.clone(), polling_clients.client(),
 
                thread_index, debug_logging
 
                thread_index, log_level
 
            );
 
            let thread_handle = thread::spawn(move || {
 
                scheduler.run();
 
            });
 

	
 
            let thread_handle = thread::Builder::new()
 
                .name(format!("scheduler:{}", thread_index))
 
                .spawn(move || {
 
                    scheduler.run();
 
                })
 
                .map_err(|reason|
 
                    rt_error!(
 
                        "failed to spawn scheduler thread {}, because: {}",
 
                        thread_index, reason
 
                    )
 
                )?;
 

	
 
            scheduler_threads.push(thread_handle);
 
        }
src/runtime2/scheduler.rs
Show inline comments
 
@@ -10,7 +10,7 @@ pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
    polling: PollingClient,
 
    scheduler_id: u32,
 
    debug_logging: bool,
 
    log_level: LogLevel,
 
}
 

	
 
pub(crate) struct SchedulerCtx<'a> {
 
@@ -18,22 +18,29 @@ pub(crate) struct SchedulerCtx<'a> {
 
    pub polling: &'a PollingClient,
 
    pub id: u32,
 
    pub comp: u32,
 
    pub logging_enabled: bool,
 
    pub log_level: LogLevel,
 
}
 

	
 
impl<'a> SchedulerCtx<'a> {
 
    pub fn new(runtime: &'a RuntimeInner, polling: &'a PollingClient, id: u32, logging_enabled: bool) -> Self {
 
    pub fn new(runtime: &'a RuntimeInner, polling: &'a PollingClient, id: u32, log_level: LogLevel) -> Self {
 
        return Self {
 
            runtime,
 
            polling,
 
            id,
 
            comp: 0,
 
            logging_enabled,
 
            log_level,
 
        }
 
    }
 

	
 
    pub(crate) fn log(&self, text: &str) {
 
        if self.logging_enabled {
 
    pub(crate) fn debug(&self, text: &str) {
 
        // TODO: Probably not always use colour
 
        if self.log_level >= LogLevel::Debug {
 
            println!("[s:{:02}, c:{:03}] \x1b[0;34m{}\x1b[0m", self.id, self.comp, text);
 
        }
 
    }
 

	
 
    pub(crate) fn info(&self, text: &str) {
 
        if self.log_level >= LogLevel::Info {
 
            println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text);
 
        }
 
    }
 
@@ -47,12 +54,12 @@ impl<'a> SchedulerCtx<'a> {
 
impl Scheduler {
 
    // public interface to thread
 

	
 
    pub fn new(runtime: Arc<RuntimeInner>, polling: PollingClient, scheduler_id: u32, debug_logging: bool) -> Self {
 
        return Scheduler{ runtime, polling, scheduler_id, debug_logging }
 
    pub fn new(runtime: Arc<RuntimeInner>, polling: PollingClient, scheduler_id: u32, log_level: LogLevel) -> Self {
 
        return Scheduler{ runtime, polling, scheduler_id, log_level }
 
    }
 

	
 
    pub fn run(&mut self) {
 
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, &self.polling, self.scheduler_id, self.debug_logging);
 
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, &self.polling, self.scheduler_id, self.log_level);
 

	
 
        'run_loop: loop {
 
            // Wait until we have something to do (or need to quit)
0 comments (0 inline, 0 general)