diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 1c31694e3d45b6214a10052fbd1104db80d1058b..a17d66cc0b843cdbd460a5bc7518bd137b1a8efd 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,59 +1,15 @@ +use std::collections::VecDeque; use std::sync::Arc; use std::sync::atomic::Ordering; -use crate::runtime2::connector::{BranchId, ConnectorPDL}; -use crate::runtime2::inbox::{DataMessage, PrivateInbox}; -use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey}; +use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey, ConnectorVariant}; use super::port::{Port, PortState, PortIdLocal}; use super::native::Connector; -use super::connector::{ConnectorScheduling, RunDeltaState}; -use super::inbox::{Message, MessageContents, ControlMessageVariant, ControlMessage}; - -/// Contains fields that are mostly managed by the scheduler, but may be -/// accessed by the connector -pub(crate) struct ConnectorCtx { - pub(crate) id: ConnectorId, - pub(crate) ports: Vec, -} - -impl ConnectorCtx { - pub(crate) fn new() -> ConnectorCtx { - Self{ - id: ConnectorId::new_invalid(), - ports: Vec::new(), - } - } - - pub(crate) fn add_port(&mut self, port: Port) { - debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id)); - self.ports.push(port); - } - - pub(crate) fn remove_port(&mut self, id: PortIdLocal) -> Port { - let index = self.port_id_to_index(id); - return self.ports.remove(index); - } - - pub(crate) fn get_port(&self, id: PortIdLocal) -> &Port { - let index = self.port_id_to_index(id); - return &self.ports[index]; - } - - pub(crate) fn get_port_mut(&mut self, id: PortIdLocal) -> &mut Port { - let index = self.port_id_to_index(id); - return &mut self.ports[index]; - } - - fn port_id_to_index(&self, id: PortIdLocal) -> usize { - for (idx, port) in self.ports.iter().enumerate() { - if port.self_id == id { - return idx; - } - } - - panic!("port {:?}, not owned by connector", id); - } -} +use super::connector::{BranchId, ConnectorPDL, ConnectorScheduling}; +use super::inbox::{ + Message, MessageContents, ControlMessageVariant, + DataMessage, ControlMessage, SolutionMessage, SyncMessage +}; // Because it contains pointers we're going to do a copy by value on this one #[derive(Clone, Copy)] @@ -74,8 +30,6 @@ impl Scheduler { pub fn run(&mut self) { // Setup global storage and workspaces that are reused for every // connector that we run - let mut delta_state = RunDeltaState::new(); - 'thread_loop: loop { // Retrieve a unit of work self.debug("Waiting for work"); @@ -97,71 +51,7 @@ impl Scheduler { // connector. let mut cur_schedule = ConnectorScheduling::Immediate; while cur_schedule == ConnectorScheduling::Immediate { - // Check all the message that are in the shared inbox - while let Some(message) = scheduled.public.inbox.take_message() { - // Check for rerouting - self.debug_conn(connector_id, &format!("Handling message from conn({}) at port({})\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message)); - if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) { - self.debug_conn(connector_id, &format!(" ... Rerouting to connector {}", other_connector_id.0)); - self.runtime.send_message(other_connector_id, message); - continue; - } - - self.debug_conn(connector_id, " ... Handling message myself"); - // Check for messages that requires special action from the - // scheduler. 
- if let MessageContents::Control(content) = message.contents { - match content.content { - ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => { - // Need to change port target - let port = scheduled.context.get_port_mut(port_id); - port.peer_connector = new_target_connector_id; - - // Note: for simplicity we program the scheduler to always finish - // running a connector with an empty outbox. If this ever changes - // then accepting the "port peer changed" message implies we need - // to change the recipient of the message in the outbox. - debug_assert!(delta_state.outbox.is_empty()); - - // And respond with an Ack - let ack_message = Message{ - sending_connector: connector_id, - receiving_port: PortIdLocal::new_invalid(), - contents: MessageContents::Control(ControlMessage{ - id: content.id, - content: ControlMessageVariant::Ack, - }), - }; - self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message)); - self.runtime.send_message(message.sending_connector, ack_message); - }, - ControlMessageVariant::CloseChannel(port_id) => { - // Mark the port as being closed - let port = scheduled.context.get_port_mut(port_id); - port.state = PortState::Closed; - - // Send an Ack - let ack_message = Message{ - sending_connector: connector_id, - receiving_port: PortIdLocal::new_invalid(), - contents: MessageContents::Control(ControlMessage{ - id: content.id, - content: ControlMessageVariant::Ack, - }), - }; - self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message)); - self.runtime.send_message(message.sending_connector, ack_message); - - }, - ControlMessageVariant::Ack => { - scheduled.router.handle_ack(content.id); - } - } - } else { - // Let connector handle message - scheduled.connector.handle_message(message, &scheduled.context, &mut delta_state); - } - } + self.handle_inbox_messages(scheduled); // Run the main behaviour of the connector, depending on its // current state. @@ -180,14 +70,12 @@ impl Scheduler { } else { self.debug_conn(connector_id, "Running ..."); let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime }; - let new_schedule = scheduled.connector.run( - scheduler_ctx, &scheduled.context, &mut delta_state - ); + let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx_fancy); self.debug_conn(connector_id, "Finished running"); // Handle all of the output from the current run: messages to // send and connectors to instantiate. - self.handle_delta_state(scheduled, connector_key.downcast(), &mut delta_state); + self.handle_changes_in_context(scheduled); cur_schedule = new_schedule; } @@ -212,7 +100,7 @@ impl Scheduler { // Prepare for exit. Set the shutdown flag and broadcast // messages to notify peers of closing channels scheduled.shutting_down = true; - for port in &scheduled.context.ports { + for port in &scheduled.ctx_fancy.ports { if port.state != PortState::Closed { let message = scheduled.router.prepare_closing_channel( port.self_id, port.peer_id, @@ -234,119 +122,244 @@ impl Scheduler { } } - fn handle_delta_state(&mut self, - cur_connector: &mut ScheduledConnector, connector_id: ConnectorId, - delta_state: &mut RunDeltaState - ) { - // Handling any messages that were sent - if !delta_state.outbox.is_empty() { - for mut message in delta_state.outbox.drain(..) { - // Based on the message contents, decide where the message - // should be sent to. This might end up modifying the message. 
- self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message)); - let (peer_connector, self_port, peer_port) = match &mut message { - MessageContents::Data(contents) => { - let port = cur_connector.context.get_port(contents.sending_port); - (port.peer_connector, contents.sending_port, port.peer_id) - }, - MessageContents::Sync(contents) => { - let connector = contents.to_visit.pop().unwrap(); - (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid()) - }, - MessageContents::RequestCommit(contents)=> { - let connector = contents.to_visit.pop().unwrap(); - (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid()) - }, - MessageContents::ConfirmCommit(contents) => { - for to_visit in &contents.to_visit { - let message = Message{ + /// Receiving messages from the public inbox and handling them or storing + /// them in the component's private inbox + fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) { + let connector_id = scheduled.ctx_fancy.id; + + while let Some(message) = scheduled.public.inbox.take_message() { + // Check for rerouting + self.debug_conn(connector_id, &format!("Handling message from conn({}) at port({})\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message)); + if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) { + self.debug_conn(connector_id, &format!(" ... Rerouting to connector {}", other_connector_id.0)); + self.runtime.send_message(other_connector_id, message); + continue; + } + + // Handle special messages here, messages for the component + // will be added to the inbox. + self.debug_conn(connector_id, " ... Handling message myself"); + match message.contents { + MessageContents::Control(content) => { + match content.content { + ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => { + // Need to change port target + let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap(); + port.peer_connector = new_target_connector_id; + + // Note: for simplicity we program the scheduler to always finish + // running a connector with an empty outbox. If this ever changes + // then accepting the "port peer changed" message implies we need + // to change the recipient of the message in the outbox. 
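+                            // Schematically (a summary of the existing protocol, not
+                            // new behaviour): the component that moved the port sends
+                            // us ChangePortPeer(port, new_owner); we retarget our
+                            // port and reply with an Ack; until that Ack arrives the
+                            // sender's router keeps rerouting messages aimed at the
+                            // moved port (see `should_reroute` above and
+                            // `prepare_reroute` below).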
+ debug_assert!(scheduled.ctx_fancy.outbox.is_empty()); + + // And respond with an Ack + let ack_message = Message{ sending_connector: connector_id, receiving_port: PortIdLocal::new_invalid(), - contents: MessageContents::ConfirmCommit(contents.clone()), + contents: MessageContents::Control(ControlMessage{ + id: content.id, + content: ControlMessageVariant::Ack, + }), }; - self.runtime.send_message(*to_visit, message); + self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message)); + self.runtime.send_message(message.sending_connector, ack_message); + }, + ControlMessageVariant::CloseChannel(port_id) => { + // Mark the port as being closed + let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap(); + port.state = PortState::Closed; + + // Send an Ack + let ack_message = Message{ + sending_connector: connector_id, + receiving_port: PortIdLocal::new_invalid(), + contents: MessageContents::Control(ControlMessage{ + id: content.id, + content: ControlMessageVariant::Ack, + }), + }; + self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message)); + self.runtime.send_message(message.sending_connector, ack_message); + }, + ControlMessageVariant::Ack => { + scheduled.router.handle_ack(content.id); } - (ConnectorId::new_invalid(), PortIdLocal::new_invalid(), PortIdLocal::new_invalid()) - }, - MessageContents::Control(_) | MessageContents::Ping => { - // Never generated by the user's code - unreachable!(); } - }; + }, + MessageContents::Ping => { + // Pings are sent just to wake up a component, so + // nothing to do here. + }, + _ => { + // All other cases have to be handled by the component + scheduled.ctx_fancy.inbox_messages.push(message); + } + } + } + } - // TODO: Maybe clean this up, perhaps special case for - // ConfirmCommit can be handled differently. - if peer_connector.is_valid() { - if peer_port.is_valid() { - // Sending a message to a port, so the port may not be - // closed. - let port = cur_connector.context.get_port(self_port); - match port.state { - PortState::Open => {}, - PortState::Closed => { - todo!("Handling sending over a closed port"); - } + /// Handles changes to the context that were made by the component. This is + /// the way (due to Rust's borrowing rules) that we bubble up changes in the + /// component's state that the scheduler needs to know about (e.g. a message + /// that the component wants to send). + fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) { + let connector_id = scheduled.ctx_fancy.id; + + // Handling any messages that were sent + while let Some(mut message) = scheduled.ctx_fancy.outbox.pop_front() { + // Based on the message contents, decide where the message + // should be sent to. This might end up modifying the message. 
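+            // Concretely, the match below routes:
+            //   Data                 -> the peer connector of the sending port
+            //   Sync / RequestCommit -> the next connector popped off `to_visit`
+            //   ConfirmCommit        -> broadcast to every connector in `to_visit`
+            //   Control / Ping       -> unreachable, never produced by components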
+ self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message)); + let (peer_connector, self_port, peer_port) = match &mut message { + MessageContents::Data(contents) => { + let port = scheduled.ctx_fancy.get_port_by_id(contents.sending_port).unwrap(); + (port.peer_connector, contents.sending_port, port.peer_id) + }, + MessageContents::Sync(contents) => { + let connector = contents.to_visit.pop().unwrap(); + (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid()) + }, + MessageContents::RequestCommit(contents)=> { + let connector = contents.to_visit.pop().unwrap(); + (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid()) + }, + MessageContents::ConfirmCommit(contents) => { + for to_visit in &contents.to_visit { + let message = Message{ + sending_connector: scheduled.ctx_fancy.id, + receiving_port: PortIdLocal::new_invalid(), + contents: MessageContents::ConfirmCommit(contents.clone()), + }; + self.runtime.send_message(*to_visit, message); + } + (ConnectorId::new_invalid(), PortIdLocal::new_invalid(), PortIdLocal::new_invalid()) + }, + MessageContents::Control(_) | MessageContents::Ping => { + // Never generated by the user's code + unreachable!(); + } + }; + + // TODO: Maybe clean this up, perhaps special case for + // ConfirmCommit can be handled differently. + if peer_connector.is_valid() { + if peer_port.is_valid() { + // Sending a message to a port, so the port may not be + // closed. + let port = scheduled.ctx_fancy.get_port_by_id(self_port).unwrap(); + match port.state { + PortState::Open => {}, + PortState::Closed => { + todo!("Handling sending over a closed port"); } } - let message = Message { - sending_connector: connector_id, - receiving_port: peer_port, - contents: message, - }; - self.runtime.send_message(peer_connector, message); } + let message = Message { + sending_connector: scheduled.ctx_fancy.id, + receiving_port: peer_port, + contents: message, + }; + self.runtime.send_message(peer_connector, message); } } - if !delta_state.new_ports.is_empty() { - for port in delta_state.new_ports.drain(..) { - cur_connector.context.ports.push(port); - } - } + while let Some(state_change) = scheduled.ctx_fancy.state_changes.pop_front() { + match state_change { + ComponentStateChange::CreatedComponent(component) => { + // Add the new connector to the global registry + let new_key = self.runtime.create_pdl_component(component, false); + let new_connector = self.runtime.get_component_private(&new_key); + + // Transfer ports + // TODO: Clean this up the moment native components are somewhat + // properly implemented. We need to know about the ports that + // are "owned by the PDL code", and then make sure that the + // context contains a description of those ports. + let ports = if let ConnectorVariant::UserDefined(connector) = &new_connector.connector { + &connector.ports.owned_ports + } else { + unreachable!(); + }; - // Handling any new connectors that were scheduled - // TODO: Pool outgoing messages to reduce atomic access - if !delta_state.new_connectors.is_empty() { - for new_connector in delta_state.new_connectors.drain(..) { - // Add to global registry to obtain key - let new_key = self.runtime.create_pdl_component(cur_connector, new_connector); - let new_connector = self.runtime.get_component_private(&new_key); - - // Call above changed ownership of ports, but we still have to - // let the other end of the channel know that the port has - // changed location. 
- for port in &new_connector.context.ports { - let reroute_message = cur_connector.router.prepare_reroute( - port.self_id, port.peer_id, cur_connector.context.id, - port.peer_connector, new_connector.context.id - ); - - self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message)); - self.runtime.send_message(port.peer_connector, reroute_message); - } + for port_id in ports { + // Transfer messages associated with the transferred port + let mut message_idx = 0; + while message_idx < scheduled.ctx_fancy.inbox_messages.len() { + let message = &scheduled.ctx_fancy.inbox_messages[message_idx]; + if message.receiving_port == *port_id { + // Need to transfer this message + let taken_message = scheduled.ctx_fancy.inbox_messages.remove(message_idx); + new_connector.ctx_fancy.inbox_messages.push(taken_message); + } else { + message_idx += 1; + } + } - // Schedule new connector to run - self.runtime.push_work(new_key); + // Transfer the port itself + let port_index = scheduled.ctx_fancy.ports.iter() + .position(|v| v.self_id == *port_id) + .unwrap(); + let port = scheduled.ctx_fancy.ports.remove(port_index); + new_connector.ctx_fancy.ports.push(port.clone()); + + // Notify the peer that the port has changed + let reroute_message = scheduled.router.prepare_reroute( + port.self_id, port.peer_id, scheduled.ctx_fancy.id, + port.peer_connector, new_connector.ctx_fancy.id + ); + + self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message)); + self.runtime.send_message(port.peer_connector, reroute_message); + } + + // Schedule new connector to run + self.runtime.push_work(new_key); + }, + ComponentStateChange::CreatedPort(port) => { + scheduled.ctx_fancy.ports.push(port); + }, + ComponentStateChange::ChangedPort(port_change) => { + if port_change.is_acquired { + scheduled.ctx_fancy.ports.push(port_change.port); + } else { + let index = scheduled.ctx_fancy.ports + .iter() + .position(|v| v.self_id == port_change.port.self_id) + .unwrap(); + scheduled.ctx_fancy.ports.remove(index); + } + } } } - debug_assert!(delta_state.outbox.is_empty()); - debug_assert!(delta_state.new_ports.is_empty()); - debug_assert!(delta_state.new_connectors.is_empty()); + // Finally, check if we just entered or just left a sync region + if scheduled.ctx_fancy.changed_in_sync { + if scheduled.ctx_fancy.is_in_sync { + // Just entered sync region + } else { + // Just left sync region. So clear inbox + scheduled.ctx_fancy.inbox_messages.clear(); + scheduled.ctx_fancy.inbox_len_read = 0; + } + + scheduled.ctx_fancy.changed_in_sync = false; // reset flag + } } fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) { - debug_assert_eq!(connector_key.index, connector.context.id.0); + debug_assert_eq!(connector_key.index, connector.ctx_fancy.id.0); debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false); // This is the running connector, and only the running connector may // decide it wants to sleep again. connector.public.sleeping.store(true, Ordering::Release); - // But do to reordering we might have received messages from peers who + // But due to reordering we might have received messages from peers who // did not consider us sleeping. If so, then we wake ourselves again. 
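+        // The race, schematically (a summary of the logic below, not new
+        // behaviour):
+        //   us:   sleeping.store(true), then notice the non-empty inbox
+        //   peer: pushes a message, observes sleeping == true
+        //   both: compare_exchange(true, false) -- exactly one succeeds,
+        //         so the connector is rescheduled exactly once.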
if !connector.public.inbox.is_empty() { - // Try to wake ourselves up + // Try to wake ourselves up (needed because someone might be trying + // the exact same atomic compare-and-swap at this point in time) let should_wake_up_again = connector.public.sleeping .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) .is_ok(); @@ -378,14 +391,9 @@ enum ComponentStateChange { } #[derive(Clone)] -pub(crate) enum ComponentPortChange { - Acquired(Port), - Released(Port), -} - -struct InboxMessage { - target_port: PortIdLocal, - data: DataMessage, +pub(crate) struct ComponentPortChange { + pub is_acquired: bool, // otherwise: released + pub port: Port, } /// The component context (better name may be invented). This was created @@ -395,23 +403,43 @@ struct InboxMessage { /// scheduler need to be exchanged. pub(crate) struct ComponentCtxFancy { // Mostly managed by the scheduler - id: ConnectorId, + pub(crate) id: ConnectorId, ports: Vec, - inbox_messages: Vec, + inbox_messages: Vec, // never control or ping messages inbox_len_read: usize, // Submitted by the component is_in_sync: bool, changed_in_sync: bool, - outbox: Vec, - state_changes: Vec + outbox: VecDeque, + state_changes: VecDeque +} + +pub(crate) enum ReceivedMessage { + Data((PortIdLocal, DataMessage)), + Sync(SyncMessage), + RequestCommit(SolutionMessage), + ConfirmCommit(SolutionMessage), } impl ComponentCtxFancy { + pub(crate) fn new_empty() -> Self { + return Self{ + id: ConnectorId::new_invalid(), + ports: Vec::new(), + inbox_messages: Vec::new(), + inbox_len_read: 0, + is_in_sync: false, + changed_in_sync: false, + outbox: VecDeque::new(), + state_changes: VecDeque::new(), + }; + } + /// Notify the runtime that the component has created a new component. May /// only be called outside of a sync block. pub(crate) fn push_component(&mut self, component: ConnectorPDL) { debug_assert!(!self.is_in_sync); - self.state_changes.push(ComponentStateChange::CreatedComponent(component)); + self.state_changes.push_back(ComponentStateChange::CreatedComponent(component)); } /// Notify the runtime that the component has created a new port. May only @@ -419,10 +447,21 @@ impl ComponentCtxFancy { /// block, pass them when calling `notify_sync_end`). pub(crate) fn push_port(&mut self, port: Port) { debug_assert!(!self.is_in_sync); - self.state_changes.push(ComponentStateChange::CreatedPort(port)) + self.state_changes.push_back(ComponentStateChange::CreatedPort(port)) + } + + pub(crate) fn get_port_by_id(&self, id: PortIdLocal) -> Option<&Port> { + return self.ports.iter().find(|v| v.self_id == id); } - /// Notify that component will enter a sync block. + fn get_port_mut_by_id(&mut self, id: PortIdLocal) -> Option<&mut Port> { + return self.ports.iter_mut().find(|v| v.self_id == id); + } + + /// Notify that component will enter a sync block. Note that after calling + /// this function you must allow the scheduler to pick up the changes in + /// the context by exiting your `Component::run` function with an + /// appropriate scheduling value. pub(crate) fn notify_sync_start(&mut self) -> &[Port] { debug_assert!(!self.is_in_sync); @@ -431,14 +470,20 @@ impl ComponentCtxFancy { return &self.ports } + #[inline] + pub(crate) fn is_in_sync(&self) -> bool { + return self.is_in_sync; + } + /// Submit a message for the scheduler to send to the appropriate receiver. /// May only be called inside of a sync block. 
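+    ///
+    /// A hypothetical call-site sketch (component code, not part of this
+    /// patch):
+    ///
+    /// ```ignore
+    /// ctx.submit_message(MessageContents::Data(data_message));
+    /// ```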
    pub(crate) fn submit_message(&mut self, contents: MessageContents) {
        debug_assert!(self.is_in_sync);
-        self.outbox.push(contents);
+        self.outbox.push_back(contents);
    }
 
-    /// Notify that component just finished a sync block.
+    /// Notify that component just finished a sync block. Like
+    /// `notify_sync_start`, this requires returning from `Component::run`
+    /// afterwards so that the scheduler can pick up the changes.
    pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) {
        debug_assert!(self.is_in_sync);
 
@@ -447,26 +492,15 @@ impl ComponentCtxFancy {
        self.state_changes.reserve(changed_ports.len());
        for changed_port in changed_ports {
-            self.state_changes.push(ComponentStateChange::ChangedPort(changed_port.clone()));
+            self.state_changes.push_back(ComponentStateChange::ChangedPort(changed_port.clone()));
        }
    }
 
-    /// Inserts message into inbox. Generally only called by scheduler.
-    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, data: DataMessage) {
-        debug_assert!(!self.inbox_messages.iter().any(|v| {
-            v.target_port == target_port &&
-            v.data.sender_prev_branch_id == data.sender_prev_branch_id &&
-            v.data.sender_cur_branch_id == data.sender_cur_branch_id
-        }));
-
-        self.inbox_messages.push(InboxMessage{ target_port, data })
-    }
-
    /// Retrieves messages matching a particular port and branch id. But only
    /// those messages that have been previously received with
    /// `read_next_message`.
-    pub(crate) fn get_read_messages(&self, match_port_id: PortIdLocal, match_prev_branch_id: BranchId) -> MessagesIter {
-        return MessageIter {
+    pub(crate) fn get_read_data_messages(&self, match_port_id: PortIdLocal, match_prev_branch_id: BranchId) -> MessagesIter {
+        return MessagesIter {
            messages: &self.inbox_messages,
            next_index: 0,
            max_index: self.inbox_len_read,
@@ -476,47 +510,62 @@
    /// Retrieves the next unread message from the inbox. Returns `None` if
    /// there are no (new) messages to read.
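+    ///
+    /// Data messages remain in the inbox after being read (so that
+    /// `get_read_data_messages` can return them again); sync and solution
+    /// messages are removed from the inbox when they are returned.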
- pub(crate) fn read_next_message(&mut self) -> Option<(&PortIdLocal, &DataMessage)> { - if self.inbox_len_read == self.inbox_messages.len() { - return None; - } + // TODO: Fix the clone of the data message, entirely unnecessary + pub(crate) fn read_next_message(&mut self) -> Option { + if !self.is_in_sync { return None; } + if self.inbox_len_read == self.inbox_messages.len() { return None; } let message = &self.inbox_messages[self.inbox_len_read]; - self.inbox_len_read += 1; - return Some((&message.target_port, &message.data)) + if let MessageContents::Data(contents) = &message.contents { + self.inbox_len_read += 1; + return Some(ReceivedMessage::Data((message.receiving_port, contents.clone()))); + } else { + // Must be a sync/solution message + let message = self.inbox_messages.remove(self.inbox_len_read); + return match message.contents { + MessageContents::Sync(v) => Some(ReceivedMessage::Sync(v)), + MessageContents::RequestCommit(v) => Some(ReceivedMessage::RequestCommit(v)), + MessageContents::ConfirmCommit(v) => Some(ReceivedMessage::ConfirmCommit(v)), + _ => unreachable!(), // because we only put data/synclike messages in the inbox + } + } } } pub(crate) struct MessagesIter<'a> { - messages: &'a [InboxMessage], + messages: &'a [Message], next_index: usize, max_index: usize, match_port_id: PortIdLocal, match_prev_branch_id: BranchId, } -impl Iterator for MessagesIter { - type Item = DataMessage; +impl<'a> Iterator for MessagesIter<'a> { + type Item = &'a DataMessage; - fn next(&mut self) -> Option<&Self::Item> { + fn next(&mut self) -> Option { // Loop until match is found or at end of messages while self.next_index < self.max_index { let message = &self.messages[self.next_index]; - if message.target_port == self.match_port_id && message.data.sender_prev_branch_id == self.match_prev_branch_id { - // Found a match - break; + if let MessageContents::Data(data_message) = &message.contents { + if message.receiving_port == self.match_port_id && data_message.sender_prev_branch_id == self.match_prev_branch_id { + // Found a match + self.next_index += 1; + return Some(data_message); + } + } else { + // Unreachable because: + // 1. We only iterate over messages that were previously retrieved by `read_next_message`. + // 2. Inbox does not contain control/ping messages. + // 3. If `read_next_message` encounters anything else than a data message, it is removed from the inbox. + unreachable!(); } self.next_index += 1; } - if self.next_index == self.max_index { - return None; - } - - let message = &self.messages[self.next_index]; - self.next_index += 1; - return Some(&message.data); + // No more messages + return None; } }
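
Reviewer note: below is the intended call pattern for the new `ComponentCtxFancy`
API, as a minimal sketch. Only the `ctx` methods and the `ReceivedMessage` /
`ConnectorScheduling` types come from this patch; the surrounding function and
its control flow are hypothetical.

    // Sketch: one scheduling slice of a component driving a sync round.
    // Assumes this runs inside something like `Component::run`, which per the
    // doc comments above must return so the scheduler can apply ctx changes.
    fn run_sync_round_sketch(ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
        if !ctx.is_in_sync() {
            // Enter the sync region; the scheduler only observes this once we
            // return, so ask to be run again immediately.
            let _owned_ports = ctx.notify_sync_start();
            return ConnectorScheduling::Immediate;
        }

        // Drain newly arrived messages. Data messages stay in the inbox and
        // can be revisited via `get_read_data_messages`; sync/solution
        // messages are consumed by this call.
        while let Some(received) = ctx.read_next_message() {
            match received {
                ReceivedMessage::Data((_port_id, _data)) => {
                    // Fork speculative branches, possibly submit replies:
                    // ctx.submit_message(MessageContents::Data(..));
                },
                ReceivedMessage::Sync(_)
                | ReceivedMessage::RequestCommit(_)
                | ReceivedMessage::ConfirmCommit(_) => {
                    // Drive the consensus algorithm.
                },
            }
        }

        // Once a local solution has been committed, leave the sync region;
        // the scheduler then clears the inbox (see handle_changes_in_context).
        ctx.notify_sync_end(&[]);
        return ConnectorScheduling::Immediate;
    }

Because the outbox and state-change queues are now `VecDeque`s popped from the
front, the scheduler replays these effects in the order the component submitted
them.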