diff --git a/src/runtime2/poll/mod.rs b/src/runtime2/poll/mod.rs index e6589f21270927a7bf73f25f0360dfac2995632d..9337ff6f957b0d4e6e92f0b79505b274be530583 100644 --- a/src/runtime2/poll/mod.rs +++ b/src/runtime2/poll/mod.rs @@ -122,7 +122,7 @@ fn syscall_result(result: c_int) -> io::Result { #[cfg(not(unix))] struct Poller { - + // Not implemented for OS's other than unix } // ----------------------------------------------------------------------------- @@ -135,61 +135,42 @@ enum PollCmd { Shutdown, } -/// Represents the data needed to build interfaces to the polling thread (which -/// should happen first) and to create the polling thread itself. -pub(crate) struct PollingThreadBuilder { +pub struct PollingThread { poller: Arc, - generation_counter: Arc, - queue: QueueDynMpsc, runtime: Arc, + queue: QueueDynMpsc, logging_enabled: bool, } -impl PollingThreadBuilder { - pub(crate) fn new(runtime: Arc, logging_enabled: bool) -> Result { +impl PollingThread { + pub(crate) fn new(runtime: Arc, logging_enabled: bool) -> Result<(PollingThreadHandle, PollingClientFactory), RtError> { let poller = Poller::new() .map_err(|e| rt_error!("failed to create poller, because: {}", e))?; - - return Ok(PollingThreadBuilder { - poller: Arc::new(poller), - generation_counter: Arc::new(AtomicU32::new(0)), - queue: QueueDynMpsc::new(64), - runtime, + let poller = Arc::new(poller); + let queue = QueueDynMpsc::new(64); + let queue_producers = queue.producer_factory(); + + let mut thread_data = PollingThread{ + poller: poller.clone(), + runtime: runtime.clone(), + queue, logging_enabled, - }) - } + }; + let thread_handle = thread::spawn(move || { thread_data.run() }); - pub(crate) fn client(&self) -> PollingClient { - return PollingClient{ - poller: self.poller.clone(), - generation_counter: self.generation_counter.clone(), - queue: self.queue.producer(), - } - } + let thread_handle = PollingThreadHandle{ + queue: Some(queue_producers.producer()), + handle: Some(thread_handle), + }; + let client_factory = PollingClientFactory{ + poller, + generation_counter: Arc::new(AtomicU32::new(0)), + queue_factory: queue_producers, + }; - pub(crate) fn into_thread(self) -> (PollingThread, PollingThreadDestroyer) { - let destroyer = self.queue.producer(); - - return ( - PollingThread{ - poller: self.poller, - runtime: self.runtime, - queue: self.queue, - logging_enabled: self.logging_enabled, - }, - PollingThreadDestroyer::new(destroyer) - ); + return Ok((thread_handle, client_factory)); } -} -pub(crate) struct PollingThread { - poller: Arc, - runtime: Arc, - queue: QueueDynMpsc, - logging_enabled: bool, -} - -impl PollingThread { pub(crate) fn run(&mut self) { use crate::runtime2::scheduler::SchedulerCtx; use crate::runtime2::communication::Message; @@ -244,7 +225,8 @@ impl PollingThread { for event in events.drain(..) { let key = event.u64; if let Some(handle) = lookup.get(&key) { - self.log(&format!("Sending poll to {:?} (event: {:x})", handle.id(), event.events)); + let events = event.events; + self.log(&format!("Sending poll to {:?} (event: {:x})", handle.id(), events)); handle.send_message(&self.runtime, Message::Poll, true); } } @@ -264,23 +246,40 @@ impl PollingThread { } // bit convoluted, but it works -pub(crate) struct PollingThreadDestroyer { - queue: Option>, +pub(crate) struct PollingThreadHandle { + // requires Option, because: + queue: Option>, // destructor needs to be called + handle: Option>, // we need to call `join` } -impl PollingThreadDestroyer { - fn new(queue: QueueDynProducer) -> Self { - return Self{ queue: Some(queue) }; - } - - pub(crate) fn initiate_destruction(&mut self) { +impl PollingThreadHandle { + pub(crate) fn shutdown(&mut self) -> thread::Result<()> { + debug_assert!(self.handle.is_some(), "polling thread already destroyed"); self.queue.take().unwrap().push(PollCmd::Shutdown); + return self.handle.take().unwrap().join(); } } -impl Drop for PollingThreadDestroyer { +impl Drop for PollingThreadHandle { fn drop(&mut self) { - debug_assert!(self.queue.is_none()); + debug_assert!(self.queue.is_none() && self.handle.is_none()); + } +} + +// oh my god, now I'm writing factory objects. I'm not feeling too well +pub(crate) struct PollingClientFactory { + poller: Arc, + generation_counter: Arc, + queue_factory: QueueDynProducerFactory, +} + +impl PollingClientFactory { + pub(crate) fn client(&self) -> PollingClient { + return PollingClient{ + poller: self.poller.clone(), + generation_counter: self.generation_counter.clone(), + queue: self.queue_factory.producer(), + }; } }