diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 2f6607005df909e7d0a6eb7849a81ec5f566af7b..5623f2f59e5c52e89029c2adcd4533c4400b492a 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,5 +1,6 @@ use std::sync::Arc; use std::sync::atomic::Ordering; +use crate::runtime2::ScheduledConnector; use super::{RuntimeInner, ConnectorId, ConnectorKey}; use super::port::{Port, PortState, PortIdLocal}; @@ -77,18 +78,18 @@ impl Scheduler { 'thread_loop: loop { // Retrieve a unit of work - println!("DEBUG [{}]: Waiting for work", scheduler_id); + self.debug("Waiting for work"); let connector_key = self.runtime.wait_for_work(); if connector_key.is_none() { // We should exit - println!("DEBUG [{}]: ... No more work, quitting", scheduler_id); + self.debug(" ... No more work, quitting"); break 'thread_loop; } // We have something to do let connector_key = connector_key.unwrap(); let connector_id = connector_key.downcast(); - println!("DEBUG [{}]: ... Got work, running {}", scheduler_id, connector_key.index); + self.debug_conn(connector_id, &format!(" ... Got work, running {}", connector_key.index)); let scheduled = self.runtime.get_component_private(&connector_key); @@ -99,9 +100,9 @@ impl Scheduler { // Check all the message that are in the shared inbox while let Some(message) = scheduled.public.inbox.take_message() { // Check for rerouting - println!("DEBUG [{}]: Handling message from {}:{}\n{:#?}", scheduler_id, message.sending_connector.0, message.receiving_port.index, message); + self.debug_conn(connector_id, &format!("Handling message from {}:{}\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message)); if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) { - self.send_message_and_wake_up_if_sleeping(other_connector_id, message); + self.runtime.send_message(other_connector_id, message); continue; } @@ -113,14 +114,14 @@ impl Scheduler { // Need to change port target let port = scheduled.context.get_port_mut(port_id); port.peer_connector = new_target_connector_id; + + // Note: for simplicity we program the scheduler to always finish + // running a connector with an empty outbox. If this ever changes + // then accepting the "port peer changed" message implies we need + // to change the recipient of the message in the outbox. debug_assert!(delta_state.outbox.is_empty()); // And respond with an Ack - // Note: after this code has been reached, we may not have any - // messages in the outbox that send to the port whose owning - // connector we just changed. This is because the `ack` will - // clear the rerouting entry of the `ack`-receiver. - // TODO: Question from Max from the past: what the hell did you mean? self.runtime.send_message( message.sending_connector, Message{ @@ -167,6 +168,7 @@ impl Scheduler { if scheduled.shutting_down { // Nothing to do. But we're stil waiting for all our pending // control messages to be answered. + self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks())); if scheduled.router.num_pending_acks() == 0 { // We're actually done, we can safely destroy the // currently running connector @@ -176,16 +178,16 @@ impl Scheduler { cur_schedule = ConnectorScheduling::NotNow; } } else { - println!("DEBUG [{}]: Running {} ...", scheduler_id, connector_key.index); + self.debug_conn(connector_id, "Running ..."); let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime }; let new_schedule = scheduled.connector.run( scheduler_ctx, &scheduled.context, &mut delta_state ); - println!("DEBUG [{}]: ... Finished running {}", scheduler_id, connector_key.index); + self.debug_conn(connector_id, "Finished running"); // Handle all of the output from the current run: messages to // send and connectors to instantiate. - self.handle_delta_state(&connector_key, &mut scheduled.context, &mut delta_state); + self.handle_delta_state(scheduled, connector_key.downcast(), &mut delta_state); cur_schedule = new_schedule; } @@ -204,58 +206,55 @@ impl Scheduler { // Need to sleep, note that we are the only ones which are // allows to set the sleeping state to `true`, and since // we're running it must currently be `false`. - debug_assert_eq!(scheduled.public.sleeping.load(Ordering::Acquire), false); - scheduled.public.sleeping.store(true, Ordering::Release); - - // We might have received a message in the meantime from a - // thread that did not see the sleeping flag set to `true`, - // so: - if !scheduled.public.inbox.is_empty() { - let should_reschedule_self = scheduled.public.sleeping - .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) - .is_ok(); - - if should_reschedule_self { - self.runtime.push_work(connector_key); - } - } + self.try_go_to_sleep(connector_key, scheduled); }, ConnectorScheduling::Exit => { // Prepare for exit. Set the shutdown flag and broadcast // messages to notify peers of closing channels scheduled.shutting_down = true; for port in &scheduled.context.ports { - let message = scheduled.router.prepare_closing_channel( - port.self_id, port.peer_id, - connector_id - ); - self.runtime.send_message(port.peer_connector, message); + if port.state != PortState::Closed { + let message = scheduled.router.prepare_closing_channel( + port.self_id, port.peer_id, + connector_id + ); + self.runtime.send_message(port.peer_connector, message); + } } + + if scheduled.router.num_pending_acks() == 0 { + self.runtime.destroy_component(connector_key); + continue 'thread_loop; + } + + self.try_go_to_sleep(connector_key, scheduled); } } } } - fn handle_delta_state(&mut self, connector_key: &ConnectorKey, context: &mut ConnectorCtx, delta_state: &mut RunDeltaState) { + fn handle_delta_state(&mut self, + cur_connector: &mut ScheduledConnector, connector_id: ConnectorId, + delta_state: &mut RunDeltaState + ) { // Handling any messages that were sent - let connector_id = connector_key.downcast(); - if !delta_state.outbox.is_empty() { for mut message in delta_state.outbox.drain(..) { // Based on the message contents, decide where the message // should be sent to. This might end up modifying the message. - let (peer_connector, peer_port) = match &mut message { + self.debug_conn(connector_id, &format!("Sending message\n --- {:?}", message)); + let (peer_connector, self_port, peer_port) = match &mut message { MessageContents::Data(contents) => { - let port = context.get_port(contents.sending_port); - (port.peer_connector, port.peer_id) + let port = cur_connector.context.get_port(contents.sending_port); + (port.peer_connector, contents.sending_port, port.peer_id) }, MessageContents::Sync(contents) => { let connector = contents.to_visit.pop().unwrap(); - (connector, PortIdLocal::new_invalid()) + (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid()) }, MessageContents::RequestCommit(contents)=> { let connector = contents.to_visit.pop().unwrap(); - (connector, PortIdLocal::new_invalid()) + (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid()) }, MessageContents::ConfirmCommit(contents) => { for to_visit in &contents.to_visit { @@ -266,7 +265,7 @@ impl Scheduler { }; self.runtime.send_message(*to_visit, message); } - (ConnectorId::new_invalid(), PortIdLocal::new_invalid()) + (ConnectorId::new_invalid(), PortIdLocal::new_invalid(), PortIdLocal::new_invalid()) }, MessageContents::Control(_) | MessageContents::Ping => { // Never generated by the user's code @@ -277,6 +276,17 @@ impl Scheduler { // TODO: Maybe clean this up, perhaps special case for // ConfirmCommit can be handled differently. if peer_connector.is_valid() { + if peer_port.is_valid() { + // Sending a message to a port, so the port may not be + // closed. + let port = cur_connector.context.get_port(self_port); + match port.state { + PortState::Open => {}, + PortState::Closed => { + todo!("Handling sending over a closed port"); + } + } + } let message = Message { sending_connector: connector_id, receiving_port: peer_port, @@ -289,15 +299,13 @@ impl Scheduler { if !delta_state.new_ports.is_empty() { for port in delta_state.new_ports.drain(..) { - context.ports.push(port); + cur_connector.context.ports.push(port); } } // Handling any new connectors that were scheduled // TODO: Pool outgoing messages to reduce atomic access if !delta_state.new_connectors.is_empty() { - let cur_connector = self.runtime.get_component_private(connector_key); - for new_connector in delta_state.new_connectors.drain(..) { // Add to global registry to obtain key let new_key = self.runtime.create_pdl_component(cur_connector, new_connector); @@ -307,7 +315,6 @@ impl Scheduler { // let the other end of the channel know that the port has // changed location. for port in &new_connector.context.ports { - cur_connector.pending_acks += 1; let reroute_message = cur_connector.router.prepare_reroute( port.self_id, port.peer_id, cur_connector.context.id, port.peer_connector, new_connector.context.id @@ -325,6 +332,37 @@ impl Scheduler { debug_assert!(delta_state.new_ports.is_empty()); debug_assert!(delta_state.new_connectors.is_empty()); } + + fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) { + debug_assert_eq!(connector_key.index, connector.context.id.0); + debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false); + + // This is the running connector, and only the running connector may + // decide it wants to sleep again. + connector.public.sleeping.store(true, Ordering::Release); + + // But do to reordering we might have received messages from peers who + // did not consider us sleeping. If so, then we wake ourselves again. + if !connector.public.inbox.is_empty() { + // Try to wake ourselves up + let should_wake_up_again = connector.public.sleeping + .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) + .is_ok(); + + if should_wake_up_again { + self.runtime.push_work(connector_key) + } + } + } + + // TODO: Remove, this is debugging stuff + fn debug(&self, message: &str) { + println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); + } + + fn debug_conn(&self, conn: ConnectorId, message: &str) { + println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message); + } } // ----------------------------------------------------------------------------- @@ -369,7 +407,7 @@ impl ControlMessageHandler { /// entry to match against the (hopefully) returned `Ack` message. pub fn prepare_closing_channel( &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal, - self_connector_id: connectorId + self_connector_id: ConnectorId ) -> Message { let id = self.take_id(); @@ -425,7 +463,7 @@ impl ControlMessageHandler { /// function returns the connector that should retrieve this message. pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option { for entry in &self.active { - if let ControlVariant::ChangedPort(entry) = entry { + if let ControlVariant::ChangedPort(entry) = &entry.variant { if entry.source_connector == sending_connector && entry.target_port == target_port { // Need to reroute this message