diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 09528f8b6221097aba72eccbf3283e326cdf00cc..461fc510cb5480691d3578b2a811defe5880ef73 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -260,7 +260,7 @@ pub(crate) fn default_send_data_message( let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); - peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(annotated_message), true); + peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); return Ok(CompScheduling::Immediate); } @@ -319,7 +319,7 @@ pub(crate) fn default_handle_incoming_data_message( let (peer_handle, message) = control.initiate_port_blocking(comp_ctx, port_handle); let peer = comp_ctx.get_peer(peer_handle); - peer.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); + peer.handle.send_message_logged(sched_ctx, Message::Control(message), true); } return IncomingData::SlotFull(incoming_message) @@ -426,7 +426,7 @@ pub(crate) fn default_handle_received_data_message( comp_ctx.set_port_state(port_handle, PortState::Open); let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle); let peer_info = comp_ctx.get_peer(peer_handle); - peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); + peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true); } return Ok(()); @@ -559,7 +559,7 @@ pub(crate) fn default_handle_sync_start( exec_state: &mut CompExecState, inbox_main: &mut InboxMainRef, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus ) { - sched_ctx.log("Component starting sync mode"); + sched_ctx.info("Component starting sync mode"); // If any messages are present for this sync round, set the appropriate flag // and notify 
the consensus handler of the present messages @@ -585,7 +585,7 @@ pub(crate) fn default_handle_sync_end( exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus ) { - sched_ctx.log("Component ending sync mode (but possibly waiting for a solution)"); + sched_ctx.info("Component ending sync mode (but possibly waiting for a solution)"); debug_assert_eq!(exec_state.mode, CompMode::Sync); let decision = consensus.notify_sync_end_success(sched_ctx, comp_ctx); exec_state.mode = CompMode::SyncEnd; @@ -601,7 +601,7 @@ pub(crate) fn default_handle_start_exit( sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus ) -> CompScheduling { debug_assert_eq!(exec_state.mode, CompMode::StartExit); - sched_ctx.log(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason)); + sched_ctx.info(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason)); exec_state.mode = CompMode::BusyExit; let exit_inside_sync = exec_state.exit_reason.is_in_sync(); @@ -628,7 +628,7 @@ pub(crate) fn default_handle_start_exit( let port_handle = comp_ctx.get_port_handle(port_id); let (peer, message) = control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx); let peer_info = comp_ctx.get_peer(peer); - peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); + peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true); } return CompScheduling::Immediate; // to check if we can shut down immediately @@ -643,10 +643,10 @@ pub(crate) fn default_handle_busy_exit( ) -> CompScheduling { debug_assert_eq!(exec_state.mode, CompMode::BusyExit); if control.has_acks_remaining() { - sched_ctx.log("Component busy exiting, still has `Ack`s remaining"); + sched_ctx.info("Component busy exiting, still has `Ack`s remaining"); return CompScheduling::Sleep; } else { - sched_ctx.log("Component busy exiting, now shutting down"); + sched_ctx.info("Component busy 
exiting, now shutting down"); exec_state.mode = CompMode::Exit; return CompScheduling::Exit; } @@ -679,7 +679,7 @@ pub(crate) fn default_handle_sync_decision( ) ); - sched_ctx.log(&format!("Handling decision {:?} (in mode: {:?})", decision, exec_state.mode)); + sched_ctx.info(&format!("Handling decision {:?} (in mode: {:?})", decision, exec_state.mode)); consensus.notify_sync_decision(decision); if success { // We cannot get a success message if the component has encountered an @@ -747,7 +747,7 @@ fn default_handle_ack( AckAction::SendMessage(target_comp, message) => { // FIX @NoDirectHandle let mut handle = sched_ctx.runtime.get_component_public(target_comp); - handle.send_message(&sched_ctx.runtime, Message::Control(message), true); + handle.send_message_logged(sched_ctx, Message::Control(message), true); let _should_remove = handle.decrement_users(); debug_assert!(_should_remove.is_none()); }, @@ -779,7 +779,7 @@ fn default_send_ack( sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx ) { let peer_info = comp_ctx.get_peer(peer_handle); - peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(ControlMessage{ + peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{ id: causer_of_ack_id, sender_comp_id: comp_ctx.id, target_port_id: None, @@ -808,7 +808,7 @@ fn default_handle_unblock_put( // Retrieve peer to send the message let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); - peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(to_send), true); + peer_info.handle.send_message_logged(sched_ctx, Message::Data(to_send), true); exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state. 
exec_state.mode_port = PortId::new_invalid(); diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index 9a9ea3d99d1f8737734c2680d40beb4bbb6fd797..c6fa23e95c3e2a1622231c5f38d284391ae3dfc7 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -116,13 +116,13 @@ impl Component for ComponentTcpClient { } }, Message::Poll => { - sched_ctx.log("Received polling event"); + sched_ctx.info("Received polling event"); }, } } fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling { - sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state)); + sched_ctx.info(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state)); match self.exec_state.mode { CompMode::BlockedSelect => { diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 123fb16ec9f429712a6b766f9f0c5364fb7a09a4..c125b9dbe01c19ce69428411d420711a9c634d0d 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -244,7 +244,7 @@ impl Component for CompPDL { // sched_ctx.log(&format!("handling message: {:?}", message)); if let Some(new_target) = self.control.should_reroute(&mut message) { let mut target = sched_ctx.runtime.get_component_public(new_target); // TODO: @NoDirectHandle - target.send_message(&sched_ctx.runtime, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks + target.send_message_logged(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks let _should_remove = target.decrement_users(); debug_assert!(_should_remove.is_none()); return; @@ -274,7 +274,7 @@ impl Component for CompPDL { fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> 
CompScheduling { use EvalContinuation as EC; - sched_ctx.log(&format!("Running component (mode: {:?})", self.exec_state.mode)); + sched_ctx.info(&format!("Running component (mode: {:?})", self.exec_state.mode)); // Depending on the mode don't do anything at all, take some special // actions, or fall through and run the PDL code. @@ -335,7 +335,7 @@ impl Component for CompPDL { }, EC::Put(expr_id, port_id, value) => { debug_assert_eq!(self.exec_state.mode, CompMode::Sync); - sched_ctx.log(&format!("Putting value {:?}", value)); + sched_ctx.info(&format!("Putting value {:?}", value)); // Send the message let target_port_id = port_id_from_eval(port_id); @@ -764,7 +764,7 @@ impl CompPDL { ); let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = created_ctx.get_peer(peer_handle); - peer_info.handle.send_message(&sched_ctx.runtime, message, true); + peer_info.handle.send_message_logged(sched_ctx, message, true); } } } else { diff --git a/src/runtime2/component/component_random.rs b/src/runtime2/component/component_random.rs index d493f00a294c4ad9d274aca600a7120f8a1126e3..b48f13b5b8e17d7f777e420aae1eeb5c34f8a324 100644 --- a/src/runtime2/component/component_random.rs +++ b/src/runtime2/component/component_random.rs @@ -61,7 +61,7 @@ impl Component for ComponentRandomU32 { } fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling { - sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode)); + sched_ctx.info(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode)); match self.exec_state.mode { CompMode::BlockedGet | CompMode::BlockedSelect => { @@ -81,7 +81,7 @@ impl Component for ComponentRandomU32 { if self.num_sends >= self.max_num_sends { self.exec_state.set_as_start_exit(ExitReason::Termination); } else { - sched_ctx.log("Entering sync mode"); + sched_ctx.info("Entering sync mode"); self.did_perform_send = false; 
component::default_handle_sync_start( &mut self.exec_state, &mut [], sched_ctx, comp_ctx, &mut self.consensus @@ -94,7 +94,7 @@ impl Component for ComponentRandomU32 { // This component just sends a single message, then waits until // consensus has been reached if !self.did_perform_send { - sched_ctx.log("Sending random message"); + sched_ctx.info("Sending random message"); let mut random = self.generator.next_u32() - self.random_minimum; let random_delta = self.random_maximum - self.random_minimum; random %= random_delta; diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index b699a836149175797f2696598280be651d0389af..764963a2e79f5b41c55d91bbf8ffe7123ad18ae4 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -500,7 +500,7 @@ impl Consensus { sync_header: self.create_sync_header(comp_ctx), content: SyncMessageContent::NotificationOfLeader, }; - peer.handle.send_message(&sched_ctx.runtime, Message::Sync(message), true); + peer.handle.send_message_logged(sched_ctx, Message::Sync(message), true); } self.forward_partial_solution(sched_ctx, comp_ctx); @@ -512,7 +512,7 @@ impl Consensus { }; let peer_handle = comp_ctx.get_peer_handle(header.sending_id); let peer_info = comp_ctx.get_peer(peer_handle); - peer_info.handle.send_message(&sched_ctx.runtime, Message::Sync(message), true); + peer_info.handle.send_message_logged(sched_ctx, Message::Sync(message), true); } // else: exactly equal } @@ -624,7 +624,7 @@ impl Consensus { sync_header: self.create_sync_header(comp_ctx), content: if is_success { SyncMessageContent::GlobalSolution } else { SyncMessageContent::GlobalFailure }, }); - handle.send_message(&sched_ctx.runtime, message, true); + handle.send_message_logged(sched_ctx, message, true); let _should_remove = handle.decrement_users(); debug_assert!(_should_remove.is_none()); } @@ -633,7 +633,7 @@ impl Consensus { fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, 
message: Message) { debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader, // TODO: @NoDirectHandle let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id); - leader_info.send_message(&sched_ctx.runtime, message, true); + leader_info.send_message_logged(sched_ctx, message, true); let should_remove = leader_info.decrement_users(); if let Some(key) = should_remove { sched_ctx.runtime.destroy_component(key); diff --git a/src/runtime2/poll/mod.rs b/src/runtime2/poll/mod.rs index 541a3536be2429a8d61424d696731668c305774c..2fc67d21192fb736ee7a12b58581c0f86bc42aaf 100644 --- a/src/runtime2/poll/mod.rs +++ b/src/runtime2/poll/mod.rs @@ -6,7 +6,7 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::collections::HashMap; use crate::runtime2::RtError; -use crate::runtime2::runtime::{CompHandle, RuntimeInner}; +use crate::runtime2::runtime::{CompHandle, RuntimeInner, LogLevel}; use crate::runtime2::store::queue_mpsc::*; @@ -139,11 +139,11 @@ pub struct PollingThread { poller: Arc, runtime: Arc, queue: QueueDynMpsc, - logging_enabled: bool, + log_level: LogLevel, } impl PollingThread { - pub(crate) fn new(runtime: Arc, logging_enabled: bool) -> Result<(PollingThreadHandle, PollingClientFactory), RtError> { + pub(crate) fn new(runtime: Arc, log_level: LogLevel) -> Result<(PollingThreadHandle, PollingClientFactory), RtError> { let poller = Poller::new() .map_err(|e| rt_error!("failed to create poller, because: {}", e))?; let poller = Arc::new(poller); @@ -154,9 +154,14 @@ impl PollingThread { poller: poller.clone(), runtime: runtime.clone(), queue, - logging_enabled, + log_level, }; - let thread_handle = thread::spawn(move || { thread_data.run() }); + let thread_handle = thread::Builder::new() + .name(String::from("poller")) + .spawn(move || { thread_data.run() }) + .map_err(|reason| + rt_error!("failed to start polling thread, because: {}", reason) + )?; let thread_handle = PollingThreadHandle{ queue: Some(queue_producers.producer()), 
@@ -238,7 +243,7 @@ impl PollingThread { } fn log(&self, message: &str) { - if self.logging_enabled { + if self.log_level >= LogLevel::Info { println!("[polling] {}", message); } } diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 9da208a48a371a354cd6ec2d7aece2ff4a5fdf21..55d417635e1d733cd1ece961abb76d58d41b0921 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -12,6 +12,13 @@ use super::component::{Component, wake_up_if_sleeping, CompPDL, CompCtx}; use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer}; use super::scheduler::*; +#[derive(PartialOrd, PartialEq, Copy, Clone)] +pub enum LogLevel { + None, // no logging + Info, // rough logging + Debug, // all logging (includes messages); declared last because levels are compared via the derived PartialOrd +} + // ----------------------------------------------------------------------------- // Component // ----------------------------------------------------------------------------- @@ -105,6 +112,12 @@ impl CompHandle { } } + #[inline] + pub(crate) fn send_message_logged(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) { + sched_ctx.debug(&format!("Sending message to comp:{} ... 
{:?}", self.id.0, message)); + self.send_message(&sched_ctx.runtime, message, try_wake_up); + } + pub(crate) fn id(&self) -> CompId { return self.id; } @@ -167,7 +180,7 @@ pub struct Runtime { impl Runtime { // TODO: debug_logging should be removed at some point - pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Result { + pub fn new(num_threads: u32, log_level: LogLevel, protocol_description: ProtocolDescription) -> Result { if num_threads == 0 { return Err(rt_error!("need at least one thread to create the runtime")); } @@ -179,7 +192,7 @@ impl Runtime { active_elements: AtomicU32::new(1), }); let (polling_handle, polling_clients) = rt_error_try!( - PollingThread::new(runtime_inner.clone(), debug_logging), + PollingThread::new(runtime_inner.clone(), log_level), "failed to build polling thread" ); @@ -188,11 +201,20 @@ impl Runtime { for thread_index in 0..num_threads { let mut scheduler = Scheduler::new( runtime_inner.clone(), polling_clients.client(), - thread_index, debug_logging + thread_index, log_level ); - let thread_handle = thread::spawn(move || { - scheduler.run(); - }); + + let thread_handle = thread::Builder::new() + .name(format!("scheduler:{}", thread_index)) + .spawn(move || { + scheduler.run(); + }) + .map_err(|reason| + rt_error!( + "failed to spawn scheduler thread {}, because: {}", + thread_index, reason + ) + )?; scheduler_threads.push(thread_handle); } diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 806afa09d70cd7d809d1b06eac90437ad16928a9..f901b0056d7863cb7be09f9e2fe541017fdbad89 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -10,7 +10,7 @@ pub(crate) struct Scheduler { runtime: Arc, polling: PollingClient, scheduler_id: u32, - debug_logging: bool, + log_level: LogLevel, } pub(crate) struct SchedulerCtx<'a> { @@ -18,22 +18,29 @@ pub(crate) struct SchedulerCtx<'a> { pub polling: &'a PollingClient, pub id: u32, pub comp: u32, - pub logging_enabled: 
bool, + pub log_level: LogLevel, } impl<'a> SchedulerCtx<'a> { - pub fn new(runtime: &'a RuntimeInner, polling: &'a PollingClient, id: u32, logging_enabled: bool) -> Self { + pub fn new(runtime: &'a RuntimeInner, polling: &'a PollingClient, id: u32, log_level: LogLevel) -> Self { return Self { runtime, polling, id, comp: 0, - logging_enabled, + log_level, } } - pub(crate) fn log(&self, text: &str) { - if self.logging_enabled { + pub(crate) fn debug(&self, text: &str) { + // TODO: Probably not always use colour + if self.log_level >= LogLevel::Debug { + println!("[s:{:02}, c:{:03}] \x1b[0;34m{}\x1b[0m", self.id, self.comp, text); + } + } + + pub(crate) fn info(&self, text: &str) { + if self.log_level >= LogLevel::Info { println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text); } } @@ -47,12 +54,12 @@ impl<'a> SchedulerCtx<'a> { impl Scheduler { // public interface to thread - pub fn new(runtime: Arc, polling: PollingClient, scheduler_id: u32, debug_logging: bool) -> Self { - return Scheduler{ runtime, polling, scheduler_id, debug_logging } + pub fn new(runtime: Arc, polling: PollingClient, scheduler_id: u32, log_level: LogLevel) -> Self { + return Scheduler{ runtime, polling, scheduler_id, log_level } } pub fn run(&mut self) { - let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, &self.polling, self.scheduler_id, self.debug_logging); + let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, &self.polling, self.scheduler_id, self.log_level); 'run_loop: loop { // Wait until we have something to do (or need to quit)