diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index bafdf0dc3198bdff60ba18befa21bf461c0d7edc..3bc9c94477458743706ed7f8c32e622a41ef3bd6 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -9,7 +9,11 @@ use super::port::{Port, PortState, PortIdLocal}; use super::native::Connector; use super::branch::{BranchId}; use super::connector::{ConnectorPDL, ConnectorScheduling}; -use super::inbox::{Message, DataMessage, ControlMessage, ControlContent}; +use super::inbox::{ + Message, DataMessage, SyncHeader, + ControlMessage, ControlContent, + SyncControlMessage, SyncControlContent, +}; // Because it contains pointers we're going to do a copy by value on this one #[derive(Clone, Copy)] @@ -203,9 +207,29 @@ impl Scheduler { fn handle_inbox_while_shutting_down(&mut self, scheduled: &mut ScheduledConnector) { // Note: we're not handling the public inbox, we're dealing with the // private one! + debug_assert!(scheduled.shutting_down); while let Some(message) = scheduled.ctx.read_next_message() { - if let Some(target_port) = Self::get_message_target_port(&message) { - todo!("handle this, send back 'my thing is closed yo'") + let target_port_and_round_number = match &message { + Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)), + Message::SyncComp(_) => None, + Message::SyncPort(msg) => Some((msg.target_port, msg.sync_header.sync_round)), + Message::SyncControl(_) => None, + Message::Control(_) => None, + }; + + if let Some((target_port, sync_round)) = target_port_and_round_number { + // This message is aimed at a port, but we're shutting down, so + // notify the peer that its was not received properly. + // (also: since we're shutting down, we're not in sync mode and + // the context contains the definitive set of owned ports) + let port = scheduled.ctx.get_port_by_id(target_port).unwrap(); + let message = SyncControlMessage{ + in_response_to_sync_round: sync_round, + target_component_id: port.peer_connector, + content: SyncControlContent::ChannelIsClosed(port.peer_id), + }; + self.debug_conn(scheduled.ctx.id, &format!("Sending message [shutdown]\n --- {:?}", message)); + self.runtime.send_message(port.peer_connector, Message::SyncControl(message)); } } } @@ -249,10 +273,9 @@ impl Scheduler { } port_desc.peer_connector - } - Message::Control(_) => { - unreachable!("component sending control messages directly"); - } + }, + Message::SyncControl(_) => unreachable!("component sending 'SyncControl' messages directly"), + Message::Control(_) => unreachable!("component sending 'Control' messages directly"), }; self.runtime.send_message(target_component_id, message); @@ -367,6 +390,7 @@ impl Scheduler { Message::Data(data) => return Some(data.data_header.target_port), Message::SyncComp(_) => {}, Message::SyncPort(content) => return Some(content.target_port), + Message::SyncControl(_) => return None, Message::Control(control) => { match &control.content { ControlContent::PortPeerChanged(port_id, _) => return Some(*port_id), @@ -381,11 +405,11 @@ impl Scheduler { // TODO: Remove, this is debugging stuff fn debug(&self, message: &str) { - // println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); + println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); } fn debug_conn(&self, conn: ConnectorId, message: &str) { - // println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message); + println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message); } } @@ -506,8 +530,16 @@ impl ComponentCtx { pub(crate) fn submit_message(&mut self, contents: Message) -> Result<(), ()> { debug_assert!(self.is_in_sync); if let Some(port_id) = contents.source_port() { - if self.get_port_by_id(port_id).is_none() { + let port_info = self.get_port_by_id(port_id); + let is_valid = match port_info { + Some(port_info) => { + port_info.state == PortState::Open + }, + None => false, + }; + if !is_valid { // We don't own the port + println!(" ****** DEBUG ****** : Sending through closed port!!! {}", port_id.index); return Err(()); } } @@ -554,19 +586,17 @@ impl ComponentCtx { // should only handle them once. Control messages should never be in // here. let message = &self.inbox_messages[self.inbox_len_read]; - match message { + match &message { Message::Data(content) => { + // Keep message in inbox for later reading self.inbox_len_read += 1; return Some(Message::Data(content.clone())); }, - Message::SyncComp(_) => { + Message::SyncComp(_) | Message::SyncPort(_) | Message::SyncControl(_) => { + // Remove message from inbox let message = self.inbox_messages.remove(self.inbox_len_read); return Some(message); }, - Message::SyncPort(_) => { - let message = self.inbox_messages.remove(self.inbox_len_read); - return Some(message); - } Message::Control(_) => unreachable!("control message ended up in component inbox"), } }