diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index da3e802b0e86c33d99ff65caf1f22ac56a3cd263..36b11b4505fa8a4dbeebcf94fb7fd23b140b799f 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -112,7 +112,7 @@ impl Scheduler { port.self_id, port.peer_id, connector_id ); - self.debug_conn(connector_id, &format!("Sending message [ exit ] \n --- {:?}", message)); + self.debug_conn(connector_id, &format!("Sending message to {:?} [ exit ] \n --- {:?}", port.peer_connector, message)); self.runtime.send_message(port.peer_connector, Message::Control(message)); } } @@ -151,6 +151,20 @@ impl Scheduler { self.runtime.send_message(other_component_id, message); continue; } + + match scheduled.ctx.get_port_by_id(target_port) { + Some(port_info) => { + if port_info.state == PortState::Closed { + // We're no longer supposed to receive messages + // (rerouted message arrived much later!) + continue + } + }, + None => { + // Apparently we no longer have a handle to the port + continue; + } + } } // If here, then we should handle the message @@ -174,7 +188,7 @@ impl Scheduler { sending_component_id: connector_id, content: ControlContent::Ack, }); - self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message)); + self.debug_conn(connector_id, &format!("Sending message to {:?} [pp ack]\n --- {:?}", message.sending_component_id, ack_message)); self.runtime.send_message(message.sending_component_id, ack_message); }, ControlContent::CloseChannel(port_id) => { @@ -188,12 +202,12 @@ impl Scheduler { sending_component_id: connector_id, content: ControlContent::Ack, }); - self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message)); + self.debug_conn(connector_id, &format!("Sending message to {:?} [cc ack] \n --- {:?}", message.sending_component_id, ack_message)); self.runtime.send_message(message.sending_component_id, ack_message); }, ControlContent::Ack => { if let Some((target_component, new_control_message)) = scheduled.router.handle_ack(connector_id, message.id) { - self.debug_conn(connector_id, &format!("Sending message [ack ack] \n --- {:?}", new_control_message)); + self.debug_conn(connector_id, &format!("Sending message to {:?} [ack ack] \n --- {:?}", target_component, new_control_message)); self.runtime.send_message(target_component, new_control_message); }; }, @@ -228,13 +242,15 @@ impl Scheduler { // (also: since we're shutting down, we're not in sync mode and // the context contains the definitive set of owned ports) let port = scheduled.ctx.get_port_by_id(target_port).unwrap(); - let message = SyncControlMessage{ - in_response_to_sync_round: sync_round, - target_component_id: port.peer_connector, - content: SyncControlContent::ChannelIsClosed(port.peer_id), - }; - self.debug_conn(scheduled.ctx.id, &format!("Sending message [shutdown]\n --- {:?}", message)); - self.runtime.send_message(port.peer_connector, Message::SyncControl(message)); + if port.state == PortState::Open { + let message = SyncControlMessage { + in_response_to_sync_round: sync_round, + target_component_id: port.peer_connector, + content: SyncControlContent::ChannelIsClosed(port.peer_id), + }; + self.debug_conn(scheduled.ctx.id, &format!("Sending message to {:?} [shutdown]\n --- {:?}", port.peer_connector, message)); + self.runtime.send_message(port.peer_connector, Message::SyncControl(message)); + } } } @@ -247,8 +263,6 @@ impl Scheduler { // Handling any messages that were sent while let Some(message) = scheduled.ctx.outbox.pop_front() { - self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:#?}", message)); - let target_component_id = match &message { Message::Data(content) => { // Data messages are always sent to a particular port, and @@ -282,6 +296,7 @@ impl Scheduler { Message::Control(_) => unreachable!("component sending 'Control' messages directly"), }; + self.debug_conn(connector_id, &format!("Sending message to {:?} [outbox] \n --- {:#?}", target_component_id, message)); self.runtime.send_message(target_component_id, message); } @@ -316,7 +331,7 @@ impl Scheduler { &mut new_connector.router ); - self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message)); + self.debug_conn(connector_id, &format!("Sending message to {:?} [newcon]\n --- {:?}", port.peer_connector, reroute_message)); self.runtime.send_message(port.peer_connector, Message::Control(reroute_message)); } } @@ -521,6 +536,19 @@ impl ComponentCtx { return Ok(()); } + /// Removes messages in the outbox using a match + pub(crate) fn remove_messages bool>(&mut self, remove_if_fn: F) { + let mut idx = 0; + while idx < self.outbox.len() { + let should_remove = remove_if_fn(&self.outbox[idx]); + if should_remove { + self.outbox.remove(idx); + } else { + idx += 1; + } + } + } + /// Notify that component just finished a sync block. Like /// `notify_sync_start`: drop out of the `Component::Run` function. pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) { @@ -620,6 +648,7 @@ struct Inbox { next_delay_idx: u32, start_read_idx: u32, next_read_idx: u32, + last_read_idx: u32, generation: u32, } @@ -637,6 +666,7 @@ impl Inbox { next_delay_idx: 0, start_read_idx: 0, next_read_idx: 0, + last_read_idx: 0, generation: 0, } }