Changeset - 5664651bdca4
[Not reviewed]
MH - 4 years ago 2021-11-11 11:13:02
contact@maxhenger.nl
small fix to reduce number of compiler warnings
3 files changed with 35 insertions and 35 deletions:
0 comments (0 inline, 0 general)
src/lib.rs
Show inline comments
 
#[macro_use]
 
mod macros;
 

	
 
mod common;
 
mod protocol;
 
mod runtime;
 
mod runtime2;
 
pub mod runtime2;
 
mod collections;
 

	
 
pub use common::{ConnectorId, EndpointPolarity, Payload, Polarity, PortId};
 
pub use protocol::ProtocolDescription;
 
pub use runtime::{error, Connector, DummyLogger, FileLogger, VecLogger};
 

	
 
// TODO: Remove when not benchmarking
 
pub use protocol::input_source::InputSource;
 
pub use protocol::ast::Heap;
 

	
 
#[cfg(feature = "ffi")]
 
pub mod ffi;
src/runtime2/mod.rs
Show inline comments
 
@@ -78,25 +78,25 @@ pub(crate) enum ConnectorVariant {
 

	
 
impl Connector for ConnectorVariant {
 
    fn run(&mut self, scheduler_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        match self {
 
            ConnectorVariant::UserDefined(c) => c.run(scheduler_ctx, comp_ctx),
 
            ConnectorVariant::Native(c) => c.run(scheduler_ctx, comp_ctx),
 
        }
 
    }
 
}
 

	
 
pub(crate) struct ScheduledConnector {
 
    pub connector: ConnectorVariant, // access by connector
 
    pub ctx_fancy: ComponentCtx,
 
    pub ctx: ComponentCtx,
 
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
 
    pub router: ControlMessageHandler,
 
    pub shutting_down: bool,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Runtime
 
// -----------------------------------------------------------------------------
 

	
 
/// Externally facing runtime.
 
pub struct Runtime {
 
    inner: Arc<RuntimeInner>,
 
@@ -393,46 +393,46 @@ impl ConnectorStore {
 
            debug_assert!(!self.free.contains(&(key.index as usize)));
 
            let connector = self.connectors.get_mut(key.index as usize);
 
            debug_assert!(!connector.is_null());
 
            return &mut (**connector);
 
        }
 
    }
 

	
 
    /// Creates a new connector. Caller should ensure ports are set up correctly
 
    /// and the connector is queued for execution if needed.
 
    fn create(&mut self, connector: ConnectorVariant, initially_sleeping: bool) -> ConnectorKey {
 
        let mut connector = ScheduledConnector {
 
            connector,
 
            ctx_fancy: ComponentCtx::new_empty(),
 
            ctx: ComponentCtx::new_empty(),
 
            public: ConnectorPublic::new(initially_sleeping),
 
            router: ControlMessageHandler::new(),
 
            shutting_down: false,
 
        };
 

	
 
        let index;
 
        let key;
 

	
 
        if self.free.is_empty() {
 
            // No free entries, allocate new entry
 
            index = self.connectors.len();
 
            key = ConnectorKey{ index: index as u32 };
 
            connector.ctx_fancy.id = key.downcast();
 
            connector.ctx.id = key.downcast();
 

	
 
            let connector = Box::into_raw(Box::new(connector));
 
            self.connectors.push(connector);
 
        } else {
 
            // Free spot available
 
            index = self.free.pop().unwrap();
 
            key = ConnectorKey{ index: index as u32 };
 
            connector.ctx_fancy.id = key.downcast();
 
            connector.ctx.id = key.downcast();
 

	
 
            unsafe {
 
                let target = self.connectors.get_mut(index);
 
                std::ptr::write(*target, connector);
 
            }
 
        }
 

	
 
        return key;
 
    }
 

	
 
    /// Destroys a connector. Caller should make sure it is not scheduled for
 
    /// execution. Otherwise one experiences "bad stuff" (tm).
src/runtime2/scheduler.rs
Show inline comments
 
@@ -59,25 +59,25 @@ impl Scheduler {
 
                    self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // We're actually done, we can safely destroy the
 
                        // currently running connector
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    } else {
 
                        cur_schedule = ConnectorScheduling::NotNow;
 
                    }
 
                } else {
 
                    self.debug_conn(connector_id, "Running ...");
 
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 
                    let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx_fancy);
 
                    let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx);
 
                    self.debug_conn(connector_id, "Finished running");
 

	
 
                    // Handle all of the output from the current run: messages to
 
                    // send and connectors to instantiate.
 
                    self.handle_changes_in_context(scheduled);
 

	
 
                    cur_schedule = new_schedule;
 
                }
 
            }
 

	
 
            // If here then the connector does not require immediate execution.
 
            // So enqueue it if requested, and otherwise put it in a sleeping
 
@@ -89,233 +89,233 @@ impl Scheduler {
 
                    self.runtime.push_work(connector_key);
 
                },
 
                ConnectorScheduling::NotNow => {
 
                    // Need to sleep, note that we are the only ones which are
 
                    // allows to set the sleeping state to `true`, and since
 
                    // we're running it must currently be `false`.
 
                    self.try_go_to_sleep(connector_key, scheduled);
 
                },
 
                ConnectorScheduling::Exit => {
 
                    // Prepare for exit. Set the shutdown flag and broadcast
 
                    // messages to notify peers of closing channels
 
                    scheduled.shutting_down = true;
 
                    for port in &scheduled.ctx_fancy.ports {
 
                    for port in &scheduled.ctx.ports {
 
                        if port.state != PortState::Closed {
 
                            let message = scheduled.router.prepare_closing_channel(
 
                                port.self_id, port.peer_id,
 
                                connector_id
 
                            );
 
                            self.debug_conn(connector_id, &format!("Sending message [ exit ] \n --- {:?}", message));
 
                            self.runtime.send_message(port.peer_connector, Message::Control(message));
 
                        }
 
                    }
 

	
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    }
 

	
 
                    self.try_go_to_sleep(connector_key, scheduled);
 
                }
 
            }
 
        }
 
    }
 

	
 
    /// Receiving messages from the public inbox and handling them or storing
 
    /// them in the component's private inbox
 
    fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx_fancy.id;
 
        let connector_id = scheduled.ctx.id;
 

	
 
        while let Some(message) = scheduled.public.inbox.take_message() {
 
            // Check if the message has to be rerouted because we have moved the
 
            // target port to another component.
 
            self.debug_conn(connector_id, &format!("Handling message\n --- {:?}", message));
 
            if let Some(target_port) = Self::get_message_target_port(&message) {
 
                if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
 
                    self.debug_conn(connector_id, " ... Rerouting the message");
 
                    self.runtime.send_message(other_component_id, message);
 
                    continue;
 
                }
 
            }
 

	
 
            // If here, then we should handle the message
 
            self.debug_conn(connector_id, " ... Handling the message");
 

	
 
            match message {
 
                Message::Control(message) => {
 
                    match message.content {
 
                        ControlContent::PortPeerChanged(port_id, new_target_connector_id) => {
 
                            // Need to change port target
 
                            let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap();
 
                            let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                            port.peer_connector = new_target_connector_id;
 

	
 
                            // Note: for simplicity we program the scheduler to always finish
 
                            // running a connector with an empty outbox. If this ever changes
 
                            // then accepting the "port peer changed" message implies we need
 
                            // to change the recipient of the message in the outbox.
 
                            debug_assert!(scheduled.ctx_fancy.outbox.is_empty());
 
                            debug_assert!(scheduled.ctx.outbox.is_empty());
 

	
 
                            // And respond with an Ack
 
                            let ack_message = Message::Control(ControlMessage {
 
                                id: message.id,
 
                                sending_component_id: connector_id,
 
                                content: ControlContent::Ack,
 
                            });
 
                            self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message));
 
                            self.runtime.send_message(message.sending_component_id, ack_message);
 
                        },
 
                        ControlContent::CloseChannel(port_id) => {
 
                            // Mark the port as being closed
 
                            let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap();
 
                            let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                            port.state = PortState::Closed;
 

	
 
                            // Send an Ack
 
                            let ack_message = Message::Control(ControlMessage {
 
                                id: message.id,
 
                                sending_component_id: connector_id,
 
                                content: ControlContent::Ack,
 
                            });
 
                            self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message));
 
                            self.runtime.send_message(message.sending_component_id, ack_message);
 
                        },
 
                        ControlContent::Ack => {
 
                            scheduled.router.handle_ack(message.id);
 
                        },
 
                        ControlContent::Ping => {},
 
                    }
 
                },
 
                _ => {
 
                    // All other cases have to be handled by the component
 
                    scheduled.ctx_fancy.inbox_messages.push(message);
 
                    scheduled.ctx.inbox_messages.push(message);
 
                }
 
            }
 
        }
 
    }
 

	
 
    /// Handles changes to the context that were made by the component. This is
 
    /// the way (due to Rust's borrowing rules) that we bubble up changes in the
 
    /// component's state that the scheduler needs to know about (e.g. a message
 
    /// that the component wants to send, a port that has been added).
 
    fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx_fancy.id;
 
        let connector_id = scheduled.ctx.id;
 

	
 
        // Handling any messages that were sent
 
        while let Some(message) = scheduled.ctx_fancy.outbox.pop_front() {
 
        while let Some(message) = scheduled.ctx.outbox.pop_front() {
 
            self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message));
 

	
 
            let target_component_id = match &message {
 
                Message::Data(content) => {
 
                    // Data messages are always sent to a particular port, and
 
                    // may end up being rerouted.
 
                    let port_desc = scheduled.ctx_fancy.get_port_by_id(content.data_header.sending_port).unwrap();
 
                    let port_desc = scheduled.ctx.get_port_by_id(content.data_header.sending_port).unwrap();
 
                    debug_assert_eq!(port_desc.peer_id, content.data_header.target_port);
 

	
 
                    if port_desc.state == PortState::Closed {
 
                        todo!("handle sending over a closed port")
 
                    }
 

	
 
                    port_desc.peer_connector
 
                },
 
                Message::Sync(content) => {
 
                    // Sync messages are always sent to a particular component,
 
                    // the sender must make sure it actually wants to send to
 
                    // the specified component (and is not using an inconsistent
 
                    // component ID associated with a port).
 
                    content.target_component_id
 
                },
 
                Message::Control(_) => {
 
                    unreachable!("component sending control messages directly");
 
                }
 
            };
 

	
 
            self.runtime.send_message(target_component_id, message);
 
        }
 

	
 
        while let Some(state_change) = scheduled.ctx_fancy.state_changes.pop_front() {
 
        while let Some(state_change) = scheduled.ctx.state_changes.pop_front() {
 
            match state_change {
 
                ComponentStateChange::CreatedComponent(component, initial_ports) => {
 
                    // Creating a new component. The creator needs to relinquish
 
                    // ownership of the ports that are given to the new
 
                    // component. All data messages that were intended for that
 
                    // port also needs to be transferred.
 
                    let new_key = self.runtime.create_pdl_component(component, false);
 
                    let new_connector = self.runtime.get_component_private(&new_key);
 

	
 
                    for port_id in initial_ports {
 
                        // Transfer messages associated with the transferred port
 
                        let mut message_idx = 0;
 
                        while message_idx < scheduled.ctx_fancy.inbox_messages.len() {
 
                            let message = &scheduled.ctx_fancy.inbox_messages[message_idx];
 
                        while message_idx < scheduled.ctx.inbox_messages.len() {
 
                            let message = &scheduled.ctx.inbox_messages[message_idx];
 
                            if Self::get_message_target_port(message) == Some(port_id) {
 
                                // Need to transfer this message
 
                                let message = scheduled.ctx_fancy.inbox_messages.remove(message_idx);
 
                                new_connector.ctx_fancy.inbox_messages.push(message);
 
                                let message = scheduled.ctx.inbox_messages.remove(message_idx);
 
                                new_connector.ctx.inbox_messages.push(message);
 
                            } else {
 
                                message_idx += 1;
 
                            }
 
                        }
 

	
 
                        // Transfer the port itself
 
                        let port_index = scheduled.ctx_fancy.ports.iter()
 
                        let port_index = scheduled.ctx.ports.iter()
 
                            .position(|v| v.self_id == port_id)
 
                            .unwrap();
 
                        let port = scheduled.ctx_fancy.ports.remove(port_index);
 
                        new_connector.ctx_fancy.ports.push(port.clone());
 
                        let port = scheduled.ctx.ports.remove(port_index);
 
                        new_connector.ctx.ports.push(port.clone());
 

	
 
                        // Notify the peer that the port has changed
 
                        let reroute_message = scheduled.router.prepare_reroute(
 
                            port.self_id, port.peer_id, scheduled.ctx_fancy.id,
 
                            port.peer_connector, new_connector.ctx_fancy.id
 
                            port.self_id, port.peer_id, scheduled.ctx.id,
 
                            port.peer_connector, new_connector.ctx.id
 
                        );
 

	
 
                        self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message));
 
                        self.runtime.send_message(port.peer_connector, Message::Control(reroute_message));
 
                    }
 

	
 
                    // Schedule new connector to run
 
                    self.runtime.push_work(new_key);
 
                },
 
                ComponentStateChange::CreatedPort(port) => {
 
                    scheduled.ctx_fancy.ports.push(port);
 
                    scheduled.ctx.ports.push(port);
 
                },
 
                ComponentStateChange::ChangedPort(port_change) => {
 
                    if port_change.is_acquired {
 
                        scheduled.ctx_fancy.ports.push(port_change.port);
 
                        scheduled.ctx.ports.push(port_change.port);
 
                    } else {
 
                        let index = scheduled.ctx_fancy.ports
 
                        let index = scheduled.ctx.ports
 
                            .iter()
 
                            .position(|v| v.self_id == port_change.port.self_id)
 
                            .unwrap();
 
                        scheduled.ctx_fancy.ports.remove(index);
 
                        scheduled.ctx.ports.remove(index);
 
                    }
 
                }
 
            }
 
        }
 

	
 
        // Finally, check if we just entered or just left a sync region
 
        if scheduled.ctx_fancy.changed_in_sync {
 
            if scheduled.ctx_fancy.is_in_sync {
 
        if scheduled.ctx.changed_in_sync {
 
            if scheduled.ctx.is_in_sync {
 
                // Just entered sync region
 
            } else {
 
                // Just left sync region. So clear inbox
 
                scheduled.ctx_fancy.inbox_messages.clear();
 
                scheduled.ctx_fancy.inbox_len_read = 0;
 
                scheduled.ctx.inbox_messages.clear();
 
                scheduled.ctx.inbox_len_read = 0;
 
            }
 

	
 
            scheduled.ctx_fancy.changed_in_sync = false; // reset flag
 
            scheduled.ctx.changed_in_sync = false; // reset flag
 
        }
 
    }
 

	
 
    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
 
        debug_assert_eq!(connector_key.index, connector.ctx_fancy.id.0);
 
        debug_assert_eq!(connector_key.index, connector.ctx.id.0);
 
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);
 

	
 
        // This is the running connector, and only the running connector may
 
        // decide it wants to sleep again.
 
        connector.public.sleeping.store(true, Ordering::Release);
 

	
 
        // But due to reordering we might have received messages from peers who
 
        // did not consider us sleeping. If so, then we wake ourselves again.
 
        if !connector.public.inbox.is_empty() {
 
            // Try to wake ourselves up (needed because someone might be trying
 
            // the exact same atomic compare-and-swap at this point in time)
 
            let should_wake_up_again = connector.public.sleeping
0 comments (0 inline, 0 general)