diff --git a/src/protocol/parser/token_parsing.rs b/src/protocol/parser/token_parsing.rs index 4de8f5c7636b1fc002d1ebaf7473666a274234b6..0142174de5614bf9ea6e795127785fac1f2fece8 100644 --- a/src/protocol/parser/token_parsing.rs +++ b/src/protocol/parser/token_parsing.rs @@ -549,8 +549,6 @@ fn is_reserved_expression_keyword(text: &[u8]) -> bool { match text { KW_LET | KW_CAST | KW_LIT_TRUE | KW_LIT_FALSE | KW_LIT_NULL | - // TODO: Remove this once global namespace errors work @nocommit - // KW_FUNC_GET | KW_FUNC_PUT | KW_FUNC_FIRES | KW_FUNC_CREATE | KW_FUNC_ASSERT | KW_FUNC_LENGTH | KW_FUNC_PRINT => true, _ => false, } } diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs index 1c6031a36aad186ed90c28b778f05526317e74b3..cebb7b67d319eb1fb2106c9ac06857d5a575659f 100644 --- a/src/runtime2/communication.rs +++ b/src/runtime2/communication.rs @@ -197,6 +197,7 @@ pub enum Message { Data(DataMessage), Sync(SyncMessage), Control(ControlMessage), + Poll, } impl Message { @@ -208,6 +209,8 @@ impl Message { return v.target_port_id, Message::Sync(_) => return None, + Message::Poll => + return None, } } @@ -218,6 +221,7 @@ impl Message { Message::Control(v) => v.target_port_id = Some(port_id), Message::Sync(_) => unreachable!(), // should never be called for this message type + Message::Poll => unreachable!(), } } } diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 245ad152d4210f97b46e8fe5aa0d762d77df67a7..135b41875ecc54b9a89055b13c4eae4e40856b2c 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -243,7 +243,7 @@ pub(crate) fn default_handle_start_exit( let port_handle = comp_ctx.get_port_handle(port_id); let (peer, message) = control.initiate_port_closing(port_handle, comp_ctx); let peer_info = comp_ctx.get_peer(peer); - peer_info.handle.send_message(sched_ctx, Message::Control(message), true); + peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); } return CompScheduling::Immediate; // to check if we can shut down immediately @@ -289,7 +289,7 @@ fn default_handle_ack( AckAction::SendMessage(target_comp, message) => { // FIX @NoDirectHandle let mut handle = sched_ctx.runtime.get_component_public(target_comp); - handle.send_message(sched_ctx, Message::Control(message), true); + handle.send_message(&sched_ctx.runtime, Message::Control(message), true); let _should_remove = handle.decrement_users(); debug_assert!(_should_remove.is_none()); }, @@ -321,7 +321,7 @@ fn default_send_ack( sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx ) { let peer_info = comp_ctx.get_peer(peer_handle); - peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{ + peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(ControlMessage{ id: causer_of_ack_id, sender_comp_id: comp_ctx.id, target_port_id: None, @@ -350,7 +350,7 @@ fn default_handle_unblock_put( // Retrieve peer to send the message let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); - peer_info.handle.send_message(sched_ctx, Message::Data(to_send), true); + peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(to_send), true); exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state. exec_state.mode_port = PortId::new_invalid(); diff --git a/src/runtime2/component/component_ip.rs b/src/runtime2/component/component_ip.rs index 7848835ed25cb158db34797fed309145e0ec0141..459110cf79312dbd28e6873288e07476d2bd83fc 100644 --- a/src/runtime2/component/component_ip.rs +++ b/src/runtime2/component/component_ip.rs @@ -45,7 +45,8 @@ impl Component for ComponentRandomU32 { &mut self.exec_state, &mut self.control, &mut self.consensus, message, sched_ctx, comp_ctx ); - } + }, + Message::Poll => unreachable!(), } } @@ -101,7 +102,7 @@ impl Component for ComponentRandomU32 { let message = self.consensus.annotate_data_message(comp_ctx, port_info, value_group); let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); - peer_info.handle.send_message(sched_ctx, Message::Data(message), true); + peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(message), true); // Remain in sync mode, but after `did_perform_send` was // set to true. diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 5a6b6e686dd7b5c95a4527c96c7c8e52dc121dc0..c48b57a00d32408e51cf6dabd5eff7600a550d0c 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -236,7 +236,7 @@ impl Component for CompPDL { sched_ctx.log(&format!("handling message: {:#?}", message)); if let Some(new_target) = self.control.should_reroute(&mut message) { let mut target = sched_ctx.runtime.get_component_public(new_target); - target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks + target.send_message(&sched_ctx.runtime, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks let _should_remove = target.decrement_users(); debug_assert!(_should_remove.is_none()); return; @@ -254,6 +254,9 @@ impl Component for CompPDL { }, Message::Sync(message) => { self.handle_incoming_sync_message(sched_ctx, comp_ctx, message); + }, + Message::Poll => { + unreachable!(); // because we never register at the polling thread } } } @@ -502,7 +505,7 @@ impl CompPDL { let port_handle = comp_ctx.get_port_handle(port_id); let (peer, message) = self.control.initiate_port_closing(port_handle, comp_ctx); let peer_info = comp_ctx.get_peer(peer); - peer_info.handle.send_message(sched_ctx, Message::Control(message), true); + peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); } } @@ -515,7 +518,7 @@ impl CompPDL { let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value); - peer_info.handle.send_message(sched_ctx, Message::Data(annotated_message), true); + peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(annotated_message), true); } /// Handles a message that came in through the public inbox. This function @@ -563,7 +566,7 @@ impl CompPDL { self.control.initiate_port_blocking(comp_ctx, port_handle); let peer = comp_ctx.get_peer(peer_handle); - peer.handle.send_message(sched_ctx, Message::Control(message), true); + peer.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); } // But we still need to remember the message, so: @@ -598,7 +601,7 @@ impl CompPDL { comp_ctx.set_port_state(port_handle, PortState::Open); let (peer_handle, message) = self.control.cancel_port_blocking(comp_ctx, port_handle); let peer_info = comp_ctx.get_peer(peer_handle); - peer_info.handle.send_message(sched_ctx, Message::Control(message), true); + peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); } } @@ -625,18 +628,19 @@ impl CompPDL { let mut opened_port_id_pairs = Vec::new(); let mut closed_port_id_pairs = Vec::new(); - // TODO: @Nocommit - let other_proc = &sched_ctx.runtime.protocol.heap[definition_id]; - let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition]; - let reservation = sched_ctx.runtime.start_create_pdl_component(); let mut created_ctx = CompCtx::new(&reservation); - println!( - "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})", - self_proc.identifier.value.as_str(), creator_ctx.id, - other_proc.identifier.value.as_str(), reservation.id() - ); + let other_proc = &sched_ctx.runtime.protocol.heap[definition_id]; + let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition]; + + dbg_code!({ + sched_ctx.log(&format!( + "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})", + self_proc.identifier.value.as_str(), creator_ctx.id, + other_proc.identifier.value.as_str(), reservation.id() + )); + }); // Take all the ports ID that are in the `args` (and currently belong to // the creator component) and translate them into new IDs that are @@ -798,7 +802,7 @@ impl CompPDL { ); let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = created_ctx.get_peer(peer_handle); - peer_info.handle.send_message(sched_ctx, message, true); + peer_info.handle.send_message(&sched_ctx.runtime, message, true); } } } else { diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index b782ef0bf1d7ee20baaa589eb2b6aceaa3e1b2c6..72e7c475fc94b9f82e2f4de4da0d1cc4cc6cafb8 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -504,7 +504,7 @@ impl Consensus { sync_header: self.create_sync_header(comp_ctx), content: SyncMessageContent::NotificationOfLeader, }; - peer.handle.send_message(sched_ctx, Message::Sync(message), true); + peer.handle.send_message(&sched_ctx.runtime, Message::Sync(message), true); } self.forward_partial_solution(sched_ctx, comp_ctx); @@ -516,7 +516,7 @@ impl Consensus { }; let peer_handle = comp_ctx.get_peer_handle(header.sending_id); let peer_info = comp_ctx.get_peer(peer_handle); - peer_info.handle.send_message(sched_ctx, Message::Sync(message), true); + peer_info.handle.send_message(&sched_ctx.runtime, Message::Sync(message), true); } // else: exactly equal } @@ -622,7 +622,7 @@ impl Consensus { sync_header: self.create_sync_header(comp_ctx), content: if is_success { SyncMessageContent::GlobalSolution } else { SyncMessageContent::GlobalFailure }, }); - handle.send_message(sched_ctx, message, true); + handle.send_message(&sched_ctx.runtime, message, true); let _should_remove = handle.decrement_users(); debug_assert!(_should_remove.is_none()); } @@ -631,7 +631,7 @@ impl Consensus { fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) { debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id); - leader_info.send_message(sched_ctx, message, true); + leader_info.send_message(&sched_ctx.runtime, message, true); let should_remove = leader_info.decrement_users(); if let Some(key) = should_remove { sched_ctx.runtime.destroy_component(key); diff --git a/src/runtime2/component/mod.rs b/src/runtime2/component/mod.rs index d5f0016f42f5f6714cb03641d63a8d7f1140acbb..71745de1d588b5c2479bd851d1c6e6ef99f05f24 100644 --- a/src/runtime2/component/mod.rs +++ b/src/runtime2/component/mod.rs @@ -16,7 +16,7 @@ use super::runtime::*; /// If the component is sleeping, then that flag will be atomically set to /// false. If we're the ones that made that happen then we add it to the work /// queue. -pub(crate) fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, handle: &CompHandle) { +pub(crate) fn wake_up_if_sleeping(runtime: &RuntimeInner, comp_id: CompId, handle: &CompHandle) { use std::sync::atomic::Ordering; let should_wake_up = handle.sleeping @@ -25,6 +25,6 @@ pub(crate) fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, han if should_wake_up { let comp_key = unsafe{ comp_id.upgrade() }; - sched_ctx.runtime.enqueue_work(comp_key); + runtime.enqueue_work(comp_key); } } \ No newline at end of file diff --git a/src/runtime2/error.rs b/src/runtime2/error.rs new file mode 100644 index 0000000000000000000000000000000000000000..eb765640db1b8aed849b581f5b402c03c20e6105 --- /dev/null +++ b/src/runtime2/error.rs @@ -0,0 +1,70 @@ +use std::fmt::{Write, Debug, Display, Formatter as FmtFormatter, Result as FmtResult}; + +/// Represents an unrecoverable runtime error that is reported to the user (for +/// debugging purposes). Basically a human-readable message with its source +/// location. The error is chainable. +pub struct RtError { + file: &'static str, + line: u32, + message: String, + cause: Option>, +} + +impl RtError { + pub(crate) fn new(file: &'static str, line: u32, message: String) -> RtError { + return RtError { + file, line, message, cause: None, + } + } + + pub(crate) fn wrap(self, file: &'static str, line: u32, message: String) -> RtError { + return RtError { + file, line, message, cause: Some(Box::new(self)) + } + } +} + +impl Display for RtError { + fn fmt(&self, f: &mut FmtFormatter<'_>) -> FmtResult { + let mut error = self; + loop { + write!(f, "[{}:{}] {}", self.file, self.line, self.message).unwrap(); + match &error.cause { + Some(cause) => { + writeln!(f, " ..."); + error = cause.as_ref() + }, + None => { + writeln!(f).unwrap(); + }, + } + } + } +} + +impl Debug for RtError { + fn fmt(&self, f: &mut FmtFormatter<'_>) -> FmtResult { + return (self as &dyn Display).fmt(f); + } +} + +macro_rules! rt_error { + ($fmt:expr) => { + $crate::runtime2::error::RtError::new(file!(), line!(), $fmt.to_string()) + }; + ($fmt:expr, $($args:expr),*) => { + $crate::runtime2::error::RtError::new(file!(), line!(), format!($fmt, $($args),*)) + }; +} + +macro_rules! rt_error_try { + ($prev:expr, $($fmt_and_args:expr),*) => { + { + let result = $prev; + match result { + Ok(result) => result, + Err(result) => return Err(result.wrap(file!(), line!(), format!($($fmt_and_args),*))), + } + } + } +} \ No newline at end of file diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index 4e213a77a4b315c88752ea843bfea22f4fecf53c..50fd6dc2813384d890c9d4fe875f745b6d872f48 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -1,3 +1,4 @@ +#[macro_use] mod error; mod store; mod runtime; mod component; @@ -8,6 +9,7 @@ mod stdlib; #[cfg(test)] mod tests; pub use runtime::Runtime; +pub(crate) use error::RtError; pub(crate) use scheduler::SchedulerCtx; pub(crate) use communication::{ PortId, PortKind, PortState, diff --git a/src/runtime2/poll/mod.rs b/src/runtime2/poll/mod.rs index e9240695432169e620a27f5b5a3f36e0efbbf397..e6589f21270927a7bf73f25f0360dfac2995632d 100644 --- a/src/runtime2/poll/mod.rs +++ b/src/runtime2/poll/mod.rs @@ -1,6 +1,14 @@ use libc::{self, c_int}; -use std::{io, ptr, time}; +use std::{io, ptr, time, thread}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::collections::HashMap; + +use crate::runtime2::RtError; +use crate::runtime2::runtime::{CompHandle, RuntimeInner}; +use crate::runtime2::store::queue_mpsc::*; + pub(crate) type FileDescriptor = c_int; @@ -8,40 +16,27 @@ pub(crate) trait AsFileDescriptor { fn as_file_descriptor(&self) -> FileDescriptor; } -pub(crate) struct UserData(u64); -#[inline] -pub(crate) fn register_polling( - poller: &Poller, entity: F, user: UserData, read: bool, write: bool -) -> io::Result<()> { - let file_descriptor = entity.as_file_descriptor(); - return poller.register(file_descriptor, user, read, write); -} +#[derive(Copy, Clone)] +pub(crate) struct UserData(u64); -#[inline] -pub(crate) fn unregister_polling( - poller: &Poller, entity: F -) -> io::Result<()> { - let file_descriptor = entity.as_file_descriptor(); - return poller.unregister(file_descriptor); -} +// ----------------------------------------------------------------------------- +// Poller +// ----------------------------------------------------------------------------- #[cfg(unix)] pub(crate) struct Poller { handle: c_int, - events: Vec } // All of this is gleaned from the `mio` crate. #[cfg(unix)] impl Poller { - pub fn new(event_capacity: usize) -> io::Result { - assert!(event_capacity < i32::MAX as usize); // because of argument to `epoll_wait`. + pub fn new() -> io::Result { let handle = syscall_result(unsafe{ libc::epoll_create1(libc::EPOLL_CLOEXEC) })?; return Ok(Self{ handle, - events: Vec::with_capacity(event_capacity), }) } @@ -65,25 +60,32 @@ impl Poller { return Ok(()); } - pub fn wait(&mut self, timeout: time::Duration) -> io::Result<()> { + /// Performs `epoll_wait`, waiting for the provided timeout or until events + /// are reported. They are stored in the `events` variable (up to + /// `events.cap()` are reported, so ensure it is preallocated). + pub fn wait(&self, events: &mut Vec, timeout: time::Duration) -> io::Result<()> { // See `mio` for the reason. Works around a linux bug #[cfg(target_pointer_width = "32")] const MAX_TIMEOUT: u128 = 1789569; #[cfg(not(target_pointer_width = "32"))] const MAX_TIMEOUT: u128 = c_int::MAX as u128; - let mut timeout_millis = timeout.as_millis(); - if timeout_millis > MAX_TIMEOUT { - timeout_millis = -1; // effectively infinite - } + let timeout_millis = timeout.as_millis(); + let timeout_millis = if timeout_millis > MAX_TIMEOUT { + -1 // effectively infinite + } else { + timeout_millis as c_int + }; + debug_assert!(events.is_empty()); + debug_assert!(events.capacity() > 0 && events.capacity() < i32::MAX as usize); let num_events = syscall_result(unsafe{ - libc::epoll_wait(self.handle, self.events.as_mut(), self.events.capacity() as i32, timeout_millis) + libc::epoll_wait(self.handle, events.as_mut_ptr(), events.capacity() as i32, timeout_millis) })?; unsafe{ debug_assert!(num_events >= 0); - self.events.set_len(num_events as usize); + events.set_len(num_events as usize); } return Ok(()); @@ -102,6 +104,13 @@ impl Poller { } } +#[cfg(unix)] +impl Drop for Poller { + fn drop(&mut self) { + unsafe{ libc::close(self.handle); } + } +} + #[inline] fn syscall_result(result: c_int) -> io::Result { if result < 0 { @@ -114,4 +123,203 @@ fn syscall_result(result: c_int) -> io::Result { #[cfg(not(unix))] struct Poller { +} + +// ----------------------------------------------------------------------------- +// Polling Thread +// ----------------------------------------------------------------------------- + +enum PollCmd { + Register(CompHandle, UserData), + Unregister(FileDescriptor, UserData), + Shutdown, +} + +/// Represents the data needed to build interfaces to the polling thread (which +/// should happen first) and to create the polling thread itself. +pub(crate) struct PollingThreadBuilder { + poller: Arc, + generation_counter: Arc, + queue: QueueDynMpsc, + runtime: Arc, + logging_enabled: bool, +} + +impl PollingThreadBuilder { + pub(crate) fn new(runtime: Arc, logging_enabled: bool) -> Result { + let poller = Poller::new() + .map_err(|e| rt_error!("failed to create poller, because: {}", e))?; + + return Ok(PollingThreadBuilder { + poller: Arc::new(poller), + generation_counter: Arc::new(AtomicU32::new(0)), + queue: QueueDynMpsc::new(64), + runtime, + logging_enabled, + }) + } + + pub(crate) fn client(&self) -> PollingClient { + return PollingClient{ + poller: self.poller.clone(), + generation_counter: self.generation_counter.clone(), + queue: self.queue.producer(), + } + } + + pub(crate) fn into_thread(self) -> (PollingThread, PollingThreadDestroyer) { + let destroyer = self.queue.producer(); + + return ( + PollingThread{ + poller: self.poller, + runtime: self.runtime, + queue: self.queue, + logging_enabled: self.logging_enabled, + }, + PollingThreadDestroyer::new(destroyer) + ); + } +} + +pub(crate) struct PollingThread { + poller: Arc, + runtime: Arc, + queue: QueueDynMpsc, + logging_enabled: bool, +} + +impl PollingThread { + pub(crate) fn run(&mut self) { + use crate::runtime2::scheduler::SchedulerCtx; + use crate::runtime2::communication::Message; + + const NUM_EVENTS: usize = 256; + const EPOLL_DURATION: time::Duration = time::Duration::from_millis(250); + + // @performance: Lot of improvements possible here, a HashMap is likely + // a horrible way to do this. + let mut events = Vec::with_capacity(NUM_EVENTS); + let mut lookup = HashMap::with_capacity(64); + self.log("Starting polling thread"); + + loop { + // Retrieve events first (because the PollingClient will first + // register at epoll, and then push a command into the queue). + self.poller.wait(&mut events, EPOLL_DURATION).unwrap(); + + // Then handle everything in the command queue. + while let Some(command) = self.queue.pop() { + match command { + PollCmd::Register(handle, user_data) => { + self.log(&format!("Registering component {:?} as {}", handle.id(), user_data.0)); + let key = Self::user_data_as_key(user_data); + debug_assert!(!lookup.contains_key(&key)); + lookup.insert(key, handle); + }, + PollCmd::Unregister(_file_descriptor, user_data) => { + let key = Self::user_data_as_key(user_data); + debug_assert!(lookup.contains_key(&key)); + let mut handle = lookup.remove(&key).unwrap(); + self.log(&format!("Unregistering component {:?} as {}", handle.id(), user_data.0)); + if let Some(key) = handle.decrement_users() { + self.runtime.destroy_component(key); + } + }, + PollCmd::Shutdown => { + // The contract is that all scheduler threads shutdown + // before the polling thread. This happens when all + // components are removed. + self.log("Received shutdown signal"); + debug_assert!(lookup.is_empty()); + return; + } + } + } + + // Now process all of the events. Because we might have had a + // `Register` command followed by an `Unregister` command (e.g. a + // component has died), we might get events that are not associated + // with an entry in the lookup. + for event in events.drain(..) { + let key = event.u64; + if let Some(handle) = lookup.get(&key) { + self.log(&format!("Sending poll to {:?} (event: {:x})", handle.id(), event.events)); + handle.send_message(&self.runtime, Message::Poll, true); + } + } + } + } + + #[inline] + fn user_data_as_key(data: UserData) -> u64 { + return data.0; + } + + fn log(&self, message: &str) { + if self.logging_enabled { + println!("[polling] {}", message); + } + } +} + +// bit convoluted, but it works +pub(crate) struct PollingThreadDestroyer { + queue: Option>, +} + +impl PollingThreadDestroyer { + fn new(queue: QueueDynProducer) -> Self { + return Self{ queue: Some(queue) }; + } + + pub(crate) fn initiate_destruction(&mut self) { + self.queue.take().unwrap().push(PollCmd::Shutdown); + } +} + +impl Drop for PollingThreadDestroyer { + fn drop(&mut self) { + debug_assert!(self.queue.is_none()); + } +} + +pub(crate) struct PollTicket(FileDescriptor, u64); + +/// A structure that allows the owner to register components at the polling +/// thread. Because of assumptions in the communication queue all of these +/// clients should be dropped before stopping the polling thread. +pub(crate) struct PollingClient { + poller: Arc, + generation_counter: Arc, + queue: QueueDynProducer, +} + +impl PollingClient { + fn register(&self, entity: F, handle: CompHandle, read: bool, write: bool) -> Result { + let generation = self.generation_counter.fetch_add(1, Ordering::Relaxed); + let user_data = user_data_for_component(handle.id().0, generation); + self.queue.push(PollCmd::Register(handle, user_data)); + + let file_descriptor = entity.as_file_descriptor(); + self.poller.register(file_descriptor, user_data, read, write) + .map_err(|e| rt_error!("failed to register for polling, because: {}", e))?; + + return Ok(PollTicket(file_descriptor, user_data.0)); + } + + fn unregister(&self, ticket: PollTicket) -> Result<(), RtError> { + let file_descriptor = ticket.0; + let user_data = UserData(ticket.1); + self.queue.push(PollCmd::Unregister(file_descriptor, user_data)); + self.poller.unregister(file_descriptor) + .map_err(|e| rt_error!("failed to unregister polling, because: {}", e))?; + + return Ok(()); + } +} + +#[inline] +fn user_data_for_component(component_id: u32, generation: u32) -> UserData { + return UserData((generation as u64) << 32 | (component_id as u64)); } \ No newline at end of file diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index d3e432ed679759a8d87bbe6cbcff6433e4a8660c..fb8df772666e3c76b344d84977f4ceeeef71cf2e 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -1,8 +1,11 @@ use std::sync::{Arc, Mutex, Condvar}; use std::sync::atomic::{AtomicU32, AtomicBool, Ordering}; +use std::thread; use std::collections::VecDeque; use crate::protocol::*; +use crate::runtime2::poll::{PollingThreadBuilder, PollingThreadDestroyer}; +use crate::runtime2::RtError; use super::communication::Message; use super::component::{Component, wake_up_if_sleeping, CompPDL, CompCtx}; @@ -25,7 +28,7 @@ impl CompKey { } } -/// Generational ID of a component +/// Generational ID of a component. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct CompId(pub u32); @@ -80,7 +83,7 @@ pub(crate) struct CompPublic { /// code to make sure this actually happens. pub(crate) struct CompHandle { target: *const CompPublic, - id: CompId, // TODO: @Remove after debugging + id: CompId, #[cfg(debug_assertions)] decremented: bool, } @@ -95,14 +98,17 @@ impl CompHandle { return handle; } - pub(crate) fn send_message(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) { - sched_ctx.log(&format!("Sending message to [c:{:03}, wakeup:{}]: {:?}", self.id.0, try_wake_up, message)); + pub(crate) fn send_message(&self, runtime: &RuntimeInner, message: Message, try_wake_up: bool) { self.inbox.push(message); if try_wake_up { - wake_up_if_sleeping(sched_ctx, self.id, self); + wake_up_if_sleeping(runtime, self.id, self); } } + pub(crate) fn id(&self) -> CompId { + return self.id; + } + fn increment_users(&self) { let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel); debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed @@ -155,13 +161,17 @@ impl Drop for CompHandle { pub struct Runtime { pub(crate) inner: Arc, - threads: Vec>, + scheduler_threads: Vec>, + polling_destroyer: PollingThreadDestroyer, + polling_thread: Option>, } impl Runtime { // TODO: debug_logging should be removed at some point - pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Runtime { - assert!(num_threads > 0, "need a thread to perform work"); + pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Result { + if num_threads == 0 { + return Err(rt_error!("need at least one thread to create the runtime")); + } let runtime_inner = Arc::new(RuntimeInner { protocol: protocol_description, components: ComponentStore::new(128), @@ -169,21 +179,36 @@ impl Runtime { work_condvar: Condvar::new(), active_elements: AtomicU32::new(1), }); - let mut runtime = Runtime { - inner: runtime_inner, - threads: Vec::with_capacity(num_threads as usize), - }; + let polling_builder = rt_error_try!( + PollingThreadBuilder::new(runtime_inner.clone(), debug_logging), + "failed to build polling thread" + ); + + let mut scheduler_threads = Vec::with_capacity(num_threads as usize); for thread_index in 0..num_threads { - let mut scheduler = Scheduler::new(runtime.inner.clone(), thread_index, debug_logging); - let thread_handle = std::thread::spawn(move || { + let mut scheduler = Scheduler::new( + runtime_inner.clone(), polling_builder.client(), + thread_index, debug_logging + ); + let thread_handle = thread::spawn(move || { scheduler.run(); }); - runtime.threads.push(thread_handle); + scheduler_threads.push(thread_handle); } - return runtime; + let (mut poller, polling_destroyer) = polling_builder.into_thread(); + let polling_thread = thread::spawn(move || { + poller.run(); + }); + + return Ok(Runtime{ + inner: runtime_inner, + scheduler_threads, + polling_destroyer, + polling_thread: Some(polling_thread), + }); } pub fn create_component(&self, module_name: &[u8], routine_name: &[u8]) -> Result<(), ComponentCreationError> { @@ -205,9 +230,12 @@ impl Runtime { impl Drop for Runtime { fn drop(&mut self) { self.inner.decrement_active_components(); - for handle in self.threads.drain(..) { + for handle in self.scheduler_threads.drain(..) { handle.join().expect("join scheduler thread"); } + + self.polling_destroyer.initiate_destruction(); + self.polling_thread.take().unwrap().join().expect("join polling thread"); } } diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 87bdb80e316c95be1016a1347b9f6c5aaaaba5fa..708d4c9ec6571ce57cc104abb50d84ae80f71920 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,5 +1,6 @@ use std::sync::Arc; use std::sync::atomic::Ordering; +use crate::runtime2::poll::PollingClient; use super::component::*; use super::runtime::*; @@ -7,21 +8,24 @@ use super::runtime::*; /// Data associated with a scheduler thread pub(crate) struct Scheduler { runtime: Arc, + polling: PollingClient, scheduler_id: u32, debug_logging: bool, } pub(crate) struct SchedulerCtx<'a> { pub runtime: &'a RuntimeInner, + pub polling: &'a PollingClient, pub id: u32, pub comp: u32, pub logging_enabled: bool, } impl<'a> SchedulerCtx<'a> { - pub fn new(runtime: &'a RuntimeInner, id: u32, logging_enabled: bool) -> Self { + pub fn new(runtime: &'a RuntimeInner, polling: &'a PollingClient, id: u32, logging_enabled: bool) -> Self { return Self { runtime, + polling, id, comp: 0, logging_enabled, @@ -38,12 +42,12 @@ impl<'a> SchedulerCtx<'a> { impl Scheduler { // public interface to thread - pub fn new(runtime: Arc, scheduler_id: u32, debug_logging: bool) -> Self { - return Scheduler{ runtime, scheduler_id, debug_logging } + pub fn new(runtime: Arc, polling: PollingClient, scheduler_id: u32, debug_logging: bool) -> Self { + return Scheduler{ runtime, polling, scheduler_id, debug_logging } } pub fn run(&mut self) { - let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, self.scheduler_id, self.debug_logging); + let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, &self.polling, self.scheduler_id, self.debug_logging); 'run_loop: loop { // Wait until we have something to do (or need to quit) diff --git a/src/runtime2/stdlib/internet.rs b/src/runtime2/stdlib/internet.rs index 3071b975015dea74e0ec67b71b91528cab76089b..7e5c0518b39a19c71977791222bff3dd54d9dbef 100644 --- a/src/runtime2/stdlib/internet.rs +++ b/src/runtime2/stdlib/internet.rs @@ -206,20 +206,6 @@ impl AsRawFileDescriptor for SocketTcpClient { } } -impl event::Source for T { - fn register(&mut self, registry: &Registry, token: Token, interests: Interest) -> std::io::Result<()> { - registry.selector().register() - } - - fn reregister(&mut self, registry: &Registry, token: Token, interests: Interest) -> std::io::Result<()> { - todo!() - } - - fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> { - todo!() - } -} - /// Performs the `socket` and `bind` calls. fn create_and_bind_socket(socket_type: libc::c_int, protocol: libc::c_int, ip: IpAddr, port: u16) -> Result { let family = socket_family_from_ip(ip); diff --git a/src/runtime2/store/queue_mpsc.rs b/src/runtime2/store/queue_mpsc.rs index cbf75e8379809ec83a16aad961922690a74f3d29..fa7b5a388a5bd855f597fc60bf94f6aea3d67a05 100644 --- a/src/runtime2/store/queue_mpsc.rs +++ b/src/runtime2/store/queue_mpsc.rs @@ -149,11 +149,13 @@ impl QueueDynProducer { unsafe { // If you only knew the power of the dark side! Obi-Wan never told // you what happened to your father! - let queue: *const _ = std::mem::transmute(consumer.inner.as_ref()); + let queue = consumer.inner.as_ref() as *const _; return Self{ queue }; } } + + pub fn push(&self, value: T) { let queue = unsafe{ &*self.queue }; diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index bd77094a301b7c01383b7da4faa7cb00425ddec9..e9b3ec0cf3d9df25958d38610dc9bbc125527af9 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -24,7 +24,7 @@ fn test_component_creation() { auto b = 5 + a; } ").expect("compilation"); - let rt = Runtime::new(1, true, pd); + let rt = Runtime::new(1, true, pd).unwrap(); for _i in 0..20 { create_component(&rt, "", "nothing_at_all", no_args()); @@ -81,7 +81,7 @@ fn test_component_communication() { new sender(o_mrmm, 5, 5); new receiver(i_mrmm, 5, 5); }").expect("compilation"); - let rt = Runtime::new(3, true, pd); + let rt = Runtime::new(3, true, pd).unwrap(); create_component(&rt, "", "constructor", no_args()); } @@ -130,7 +130,7 @@ fn test_intermediate_messenger() { new constructor_template(); } ").expect("compilation"); - let rt = Runtime::new(3, true, pd); + let rt = Runtime::new(3, true, pd).unwrap(); create_component(&rt, "", "constructor", no_args()); } @@ -172,7 +172,7 @@ fn test_simple_select() { } composite constructor() { - auto num_sends = 15; + auto num_sends = 1; channel tx_a -> rx_a; channel tx_b -> rx_b; new sender(tx_a, num_sends); @@ -180,7 +180,7 @@ fn test_simple_select() { new sender(tx_b, num_sends); } ").expect("compilation"); - let rt = Runtime::new(3, false, pd); + let rt = Runtime::new(3, true, pd).unwrap(); create_component(&rt, "", "constructor", no_args()); } @@ -202,7 +202,7 @@ fn test_unguarded_select() { } } ").expect("compilation"); - let rt = Runtime::new(3, false, pd); + let rt = Runtime::new(3, false, pd).unwrap(); create_component(&rt, "", "constructor_outside_select", no_args()); create_component(&rt, "", "constructor_inside_select", no_args()); } @@ -218,7 +218,7 @@ fn test_empty_select() { } } ").expect("compilation"); - let rt = Runtime::new(3, false, pd); + let rt = Runtime::new(3, false, pd).unwrap(); create_component(&rt, "", "constructor", no_args()); } @@ -244,6 +244,6 @@ fn test_random_u32_temporary_thingo() { new random_taker(rx, num_values); } ").expect("compilation"); - let rt = Runtime::new(1, true, pd); + let rt = Runtime::new(1, true, pd).unwrap(); create_component(&rt, "", "constructor", no_args()); } \ No newline at end of file