Changeset - 5664651bdca4
[Not reviewed]
MH - 4 years ago 2021-11-11 11:13:02
contact@maxhenger.nl
small fix to reduce number of compiler warnings
3 files changed with 35 insertions and 35 deletions:
0 comments (0 inline, 0 general)
src/lib.rs
Show inline comments
 
@@ -4,7 +4,7 @@ mod macros;
 
mod common;
 
mod protocol;
 
mod runtime;
 
mod runtime2;
 
pub mod runtime2;
 
mod collections;
 

	
 
pub use common::{ConnectorId, EndpointPolarity, Payload, Polarity, PortId};
src/runtime2/mod.rs
Show inline comments
 
@@ -87,7 +87,7 @@ impl Connector for ConnectorVariant {
 

	
 
pub(crate) struct ScheduledConnector {
 
    pub connector: ConnectorVariant, // access by connector
 
    pub ctx_fancy: ComponentCtx,
 
    pub ctx: ComponentCtx,
 
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
 
    pub router: ControlMessageHandler,
 
    pub shutting_down: bool,
 
@@ -402,7 +402,7 @@ impl ConnectorStore {
 
    fn create(&mut self, connector: ConnectorVariant, initially_sleeping: bool) -> ConnectorKey {
 
        let mut connector = ScheduledConnector {
 
            connector,
 
            ctx_fancy: ComponentCtx::new_empty(),
 
            ctx: ComponentCtx::new_empty(),
 
            public: ConnectorPublic::new(initially_sleeping),
 
            router: ControlMessageHandler::new(),
 
            shutting_down: false,
 
@@ -415,7 +415,7 @@ impl ConnectorStore {
 
            // No free entries, allocate new entry
 
            index = self.connectors.len();
 
            key = ConnectorKey{ index: index as u32 };
 
            connector.ctx_fancy.id = key.downcast();
 
            connector.ctx.id = key.downcast();
 

	
 
            let connector = Box::into_raw(Box::new(connector));
 
            self.connectors.push(connector);
 
@@ -423,7 +423,7 @@ impl ConnectorStore {
 
            // Free spot available
 
            index = self.free.pop().unwrap();
 
            key = ConnectorKey{ index: index as u32 };
 
            connector.ctx_fancy.id = key.downcast();
 
            connector.ctx.id = key.downcast();
 

	
 
            unsafe {
 
                let target = self.connectors.get_mut(index);
src/runtime2/scheduler.rs
Show inline comments
 
@@ -68,7 +68,7 @@ impl Scheduler {
 
                } else {
 
                    self.debug_conn(connector_id, "Running ...");
 
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 
                    let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx_fancy);
 
                    let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx);
 
                    self.debug_conn(connector_id, "Finished running");
 

	
 
                    // Handle all of the output from the current run: messages to
 
@@ -98,7 +98,7 @@ impl Scheduler {
 
                    // Prepare for exit. Set the shutdown flag and broadcast
 
                    // messages to notify peers of closing channels
 
                    scheduled.shutting_down = true;
 
                    for port in &scheduled.ctx_fancy.ports {
 
                    for port in &scheduled.ctx.ports {
 
                        if port.state != PortState::Closed {
 
                            let message = scheduled.router.prepare_closing_channel(
 
                                port.self_id, port.peer_id,
 
@@ -123,7 +123,7 @@ impl Scheduler {
 
    /// Receiving messages from the public inbox and handling them or storing
 
    /// them in the component's private inbox
 
    fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx_fancy.id;
 
        let connector_id = scheduled.ctx.id;
 

	
 
        while let Some(message) = scheduled.public.inbox.take_message() {
 
            // Check if the message has to be rerouted because we have moved the
 
@@ -145,14 +145,14 @@ impl Scheduler {
 
                    match message.content {
 
                        ControlContent::PortPeerChanged(port_id, new_target_connector_id) => {
 
                            // Need to change port target
 
                            let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap();
 
                            let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                            port.peer_connector = new_target_connector_id;
 

	
 
                            // Note: for simplicity we program the scheduler to always finish
 
                            // running a connector with an empty outbox. If this ever changes
 
                            // then accepting the "port peer changed" message implies we need
 
                            // to change the recipient of the message in the outbox.
 
                            debug_assert!(scheduled.ctx_fancy.outbox.is_empty());
 
                            debug_assert!(scheduled.ctx.outbox.is_empty());
 

	
 
                            // And respond with an Ack
 
                            let ack_message = Message::Control(ControlMessage {
 
@@ -165,7 +165,7 @@ impl Scheduler {
 
                        },
 
                        ControlContent::CloseChannel(port_id) => {
 
                            // Mark the port as being closed
 
                            let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap();
 
                            let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                            port.state = PortState::Closed;
 

	
 
                            // Send an Ack
 
@@ -185,7 +185,7 @@ impl Scheduler {
 
                },
 
                _ => {
 
                    // All other cases have to be handled by the component
 
                    scheduled.ctx_fancy.inbox_messages.push(message);
 
                    scheduled.ctx.inbox_messages.push(message);
 
                }
 
            }
 
        }
 
@@ -196,17 +196,17 @@ impl Scheduler {
 
    /// component's state that the scheduler needs to know about (e.g. a message
 
    /// that the component wants to send, a port that has been added).
 
    fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx_fancy.id;
 
        let connector_id = scheduled.ctx.id;
 

	
 
        // Handling any messages that were sent
 
        while let Some(message) = scheduled.ctx_fancy.outbox.pop_front() {
 
        while let Some(message) = scheduled.ctx.outbox.pop_front() {
 
            self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message));
 

	
 
            let target_component_id = match &message {
 
                Message::Data(content) => {
 
                    // Data messages are always sent to a particular port, and
 
                    // may end up being rerouted.
 
                    let port_desc = scheduled.ctx_fancy.get_port_by_id(content.data_header.sending_port).unwrap();
 
                    let port_desc = scheduled.ctx.get_port_by_id(content.data_header.sending_port).unwrap();
 
                    debug_assert_eq!(port_desc.peer_id, content.data_header.target_port);
 

	
 
                    if port_desc.state == PortState::Closed {
 
@@ -230,7 +230,7 @@ impl Scheduler {
 
            self.runtime.send_message(target_component_id, message);
 
        }
 

	
 
        while let Some(state_change) = scheduled.ctx_fancy.state_changes.pop_front() {
 
        while let Some(state_change) = scheduled.ctx.state_changes.pop_front() {
 
            match state_change {
 
                ComponentStateChange::CreatedComponent(component, initial_ports) => {
 
                    // Creating a new component. The creator needs to relinquish
 
@@ -243,28 +243,28 @@ impl Scheduler {
 
                    for port_id in initial_ports {
 
                        // Transfer messages associated with the transferred port
 
                        let mut message_idx = 0;
 
                        while message_idx < scheduled.ctx_fancy.inbox_messages.len() {
 
                            let message = &scheduled.ctx_fancy.inbox_messages[message_idx];
 
                        while message_idx < scheduled.ctx.inbox_messages.len() {
 
                            let message = &scheduled.ctx.inbox_messages[message_idx];
 
                            if Self::get_message_target_port(message) == Some(port_id) {
 
                                // Need to transfer this message
 
                                let message = scheduled.ctx_fancy.inbox_messages.remove(message_idx);
 
                                new_connector.ctx_fancy.inbox_messages.push(message);
 
                                let message = scheduled.ctx.inbox_messages.remove(message_idx);
 
                                new_connector.ctx.inbox_messages.push(message);
 
                            } else {
 
                                message_idx += 1;
 
                            }
 
                        }
 

	
 
                        // Transfer the port itself
 
                        let port_index = scheduled.ctx_fancy.ports.iter()
 
                        let port_index = scheduled.ctx.ports.iter()
 
                            .position(|v| v.self_id == port_id)
 
                            .unwrap();
 
                        let port = scheduled.ctx_fancy.ports.remove(port_index);
 
                        new_connector.ctx_fancy.ports.push(port.clone());
 
                        let port = scheduled.ctx.ports.remove(port_index);
 
                        new_connector.ctx.ports.push(port.clone());
 

	
 
                        // Notify the peer that the port has changed
 
                        let reroute_message = scheduled.router.prepare_reroute(
 
                            port.self_id, port.peer_id, scheduled.ctx_fancy.id,
 
                            port.peer_connector, new_connector.ctx_fancy.id
 
                            port.self_id, port.peer_id, scheduled.ctx.id,
 
                            port.peer_connector, new_connector.ctx.id
 
                        );
 

	
 
                        self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message));
 
@@ -275,38 +275,38 @@ impl Scheduler {
 
                    self.runtime.push_work(new_key);
 
                },
 
                ComponentStateChange::CreatedPort(port) => {
 
                    scheduled.ctx_fancy.ports.push(port);
 
                    scheduled.ctx.ports.push(port);
 
                },
 
                ComponentStateChange::ChangedPort(port_change) => {
 
                    if port_change.is_acquired {
 
                        scheduled.ctx_fancy.ports.push(port_change.port);
 
                        scheduled.ctx.ports.push(port_change.port);
 
                    } else {
 
                        let index = scheduled.ctx_fancy.ports
 
                        let index = scheduled.ctx.ports
 
                            .iter()
 
                            .position(|v| v.self_id == port_change.port.self_id)
 
                            .unwrap();
 
                        scheduled.ctx_fancy.ports.remove(index);
 
                        scheduled.ctx.ports.remove(index);
 
                    }
 
                }
 
            }
 
        }
 

	
 
        // Finally, check if we just entered or just left a sync region
 
        if scheduled.ctx_fancy.changed_in_sync {
 
            if scheduled.ctx_fancy.is_in_sync {
 
        if scheduled.ctx.changed_in_sync {
 
            if scheduled.ctx.is_in_sync {
 
                // Just entered sync region
 
            } else {
 
                // Just left sync region. So clear inbox
 
                scheduled.ctx_fancy.inbox_messages.clear();
 
                scheduled.ctx_fancy.inbox_len_read = 0;
 
                scheduled.ctx.inbox_messages.clear();
 
                scheduled.ctx.inbox_len_read = 0;
 
            }
 

	
 
            scheduled.ctx_fancy.changed_in_sync = false; // reset flag
 
            scheduled.ctx.changed_in_sync = false; // reset flag
 
        }
 
    }
 

	
 
    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
 
        debug_assert_eq!(connector_key.index, connector.ctx_fancy.id.0);
 
        debug_assert_eq!(connector_key.index, connector.ctx.id.0);
 
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);
 

	
 
        // This is the running connector, and only the running connector may
0 comments (0 inline, 0 general)