Changeset - ab8066658334
[Not reviewed]
0 4 0
mh - 4 years ago 2021-10-25 11:39:31
contact@maxhenger.nl
WIP on shutting down connectors
4 files changed with 124 insertions and 40 deletions:
0 comments (0 inline, 0 general)
src/runtime2/inbox.rs
Show inline comments
 
@@ -187,6 +187,7 @@ pub struct ControlMessage {
 
#[derive(Debug, Clone)]
 
pub enum ControlMessageVariant {
 
    ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port
 
    CloseChannel(PortIdLocal), // close the port associated with this
 
    Ack, // acknowledgement of previous control message, matching occurs through control message ID.
 
}
 

	
src/runtime2/mod.rs
Show inline comments
 
@@ -99,7 +99,6 @@ pub(crate) struct ScheduledConnector {
 
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
 
    pub router: ControlMessageHandler,
 
    pub shutting_down: bool,
 
    pub pending_acks: u32,
 
}
 

	
 
// -----------------------------------------------------------------------------
src/runtime2/port.rs
Show inline comments
 
@@ -27,14 +27,22 @@ pub enum PortKind {
 
    Getter,
 
}
 

	
 
/// Represents a port inside of the runtime. May be without owner if it is
 
/// created by the application interfacing with the runtime, instead of being
 
/// created by a connector.
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum PortState {
 
    Open,
 
    Closed,
 
}
 

	
 
/// Represents a port inside of the runtime. This is generally the local view of
 
/// a connector on its port, which may not be consistent with the rest of the
 
/// global system (e.g. its peer was moved to a new connector, or the peer might
 
/// have died in the meantime, so it is no longer usable).
 
pub struct Port {
 
    pub self_id: PortIdLocal,
 
    pub peer_id: PortIdLocal,
 
    pub kind: PortKind,
 
    pub peer_connector: ConnectorId, // might be temporarily inconsistent while peer port is sent around in non-sync phase.
 
    pub state: PortState,
 
    pub peer_connector: ConnectorId, // might be temporarily inconsistent while peer port is sent around in non-sync phase
 
}
 

	
 

	
src/runtime2/scheduler.rs
Show inline comments
 
@@ -2,7 +2,7 @@ use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 

	
 
use super::{RuntimeInner, ConnectorId, ConnectorKey};
 
use super::port::{Port, PortIdLocal};
 
use super::port::{Port, PortState, PortIdLocal};
 
use super::native::Connector;
 
use super::connector::{ConnectorScheduling, RunDeltaState};
 
use super::inbox::{Message, MessageContents, ControlMessageVariant, ControlMessage};
 
@@ -87,6 +87,7 @@ impl Scheduler {
 

	
 
            // We have something to do
 
            let connector_key = connector_key.unwrap();
 
            let connector_id = connector_key.downcast();
 
            println!("DEBUG [{}]: ... Got work, running {}", scheduler_id, connector_key.index);
 

	
 
            let scheduled = self.runtime.get_component_private(&connector_key);
 
@@ -119,10 +120,11 @@ impl Scheduler {
 
                                // messages in the outbox that send to the port whose owning
 
                                // connector we just changed. This is because the `ack` will
 
                                // clear the rerouting entry of the `ack`-receiver.
 
                                self.send_message_and_wake_up_if_sleeping(
 
                                // TODO: Question from Max from the past: what the hell did you mean?
 
                                self.runtime.send_message(
 
                                    message.sending_connector,
 
                                    Message{
 
                                        sending_connector: connector_key.downcast(),
 
                                        sending_connector: connector_id,
 
                                        receiving_port: PortIdLocal::new_invalid(),
 
                                        contents: MessageContents::Control(ControlMessage{
 
                                            id: content.id,
 
@@ -131,6 +133,25 @@ impl Scheduler {
 
                                    }
 
                                );
 
                            },
 
                            ControlMessageVariant::CloseChannel(port_id) => {
 
                                // Mark the port as being closed
 
                                let port = scheduled.context.get_port_mut(port_id);
 
                                port.state = PortState::Closed;
 

	
 
                                // Send an Ack
 
                                self.runtime.send_message(
 
                                    message.sending_connector,
 
                                    Message{
 
                                        sending_connector: connector_id,
 
                                        receiving_port: PortIdLocal::new_invalid(),
 
                                        contents: MessageContents::Control(ControlMessage{
 
                                            id: content.id,
 
                                            content: ControlMessageVariant::Ack,
 
                                        }),
 
                                    }
 
                                );
 

	
 
                            },
 
                            ControlMessageVariant::Ack => {
 
                                scheduled.router.handle_ack(content.id);
 
                            }
 
@@ -141,19 +162,33 @@ impl Scheduler {
 
                    }
 
                }
 

	
 
                // Actually run the connector
 
                println!("DEBUG [{}]: Running {} ...", scheduler_id, connector_key.index);
 
                let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 
                let new_schedule = scheduled.connector.run(
 
                    scheduler_ctx, &scheduled.context, &mut delta_state
 
                );
 
                println!("DEBUG [{}]: ... Finished running {}", scheduler_id, connector_key.index);
 
                // Run the main behaviour of the connector, depending on its
 
                // current state.
 
                if scheduled.shutting_down {
 
                    // Nothing to do. But we're stil waiting for all our pending
 
                    // control messages to be answered.
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // We're actually done, we can safely destroy the
 
                        // currently running connector
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    } else {
 
                        cur_schedule = ConnectorScheduling::NotNow;
 
                    }
 
                } else {
 
                    println!("DEBUG [{}]: Running {} ...", scheduler_id, connector_key.index);
 
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 
                    let new_schedule = scheduled.connector.run(
 
                        scheduler_ctx, &scheduled.context, &mut delta_state
 
                    );
 
                    println!("DEBUG [{}]: ... Finished running {}", scheduler_id, connector_key.index);
 

	
 
                // Handle all of the output from the current run: messages to
 
                // send and connectors to instantiate.
 
                self.handle_delta_state(&connector_key, &mut scheduled.context, &mut delta_state);
 
                    // Handle all of the output from the current run: messages to
 
                    // send and connectors to instantiate.
 
                    self.handle_delta_state(&connector_key, &mut scheduled.context, &mut delta_state);
 

	
 
                cur_schedule = new_schedule;
 
                    cur_schedule = new_schedule;
 
                }
 
            }
 

	
 
            // If here then the connector does not require immediate execution.
 
@@ -190,14 +225,11 @@ impl Scheduler {
 
                    // messages to notify peers of closing channels
 
                    scheduled.shutting_down = true;
 
                    for port in &scheduled.context.ports {
 
                        self.runtime.send_message(port.peer_connector, Message{
 
                            sending_connector: connector_key.downcast(),
 
                            receiving_port: port.peer_id,
 
                            contents: MessageContents::Control(ControlMessage{
 
                                id: 0,
 
                                content: ControlMessageVariant::Ack
 
                            })
 
                        })
 
                        let message = scheduled.router.prepare_closing_channel(
 
                            port.self_id, port.peer_id,
 
                            connector_id
 
                        );
 
                        self.runtime.send_message(port.peer_connector, message);
 
                    }
 
                }
 
            }
 
@@ -316,7 +348,8 @@ struct ControlChangedPort {
 
}
 

	
 
struct ControlClosedChannel {
 

	
 
    source_port: PortIdLocal,
 
    target_port: PortIdLocal,
 
}
 

	
 
pub(crate) struct ControlMessageHandler {
 
@@ -332,6 +365,32 @@ impl ControlMessageHandler {
 
        }
 
    }
 

	
 
    /// Prepares a message indicating that a channel has closed, we keep a local
 
    /// entry to match against the (hopefully) returned `Ack` message.
 
    pub fn prepare_closing_channel(
 
        &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal,
 
        self_connector_id: connectorId
 
    ) -> Message {
 
        let id = self.take_id();
 

	
 
        self.active.push(ControlEntry{
 
            id,
 
            variant: ControlVariant::ClosedChannel(ControlClosedChannel{
 
                source_port: self_port_id,
 
                target_port: peer_port_id,
 
            }),
 
        });
 

	
 
        return Message{
 
            sending_connector: self_connector_id,
 
            receiving_port: peer_port_id,
 
            contents: MessageContents::Control(ControlMessage{
 
                id,
 
                content: ControlMessageVariant::CloseChannel(peer_port_id),
 
            }),
 
        };
 
    }
 

	
 
    /// Prepares rerouting messages due to changed ownership of a port. The
 
    /// control message returned by this function must be sent to the
 
    /// transferred port's peer connector.
 
@@ -341,15 +400,15 @@ impl ControlMessageHandler {
 
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
 
        new_owner_connector_id: ConnectorId
 
    ) -> Message {
 
        let id = self.id_counter;
 
        let (new_id_counter, _) = self.id_counter.overflowing_add(1);
 
        self.id_counter = new_id_counter;
 
        let id = self.take_id();
 

	
 
        self.active.push(ReroutedTraffic{
 
        self.active.push(ControlEntry{
 
            id,
 
            target_port: port_id,
 
            source_connector: peer_connector_id,
 
            target_connector: new_owner_connector_id,
 
            variant: ControlVariant::ChangedPort(ControlChangedPort{
 
                target_port: port_id,
 
                source_connector: peer_connector_id,
 
                target_connector: new_owner_connector_id,
 
            }),
 
        });
 

	
 
        return Message{
 
@@ -365,11 +424,13 @@ impl ControlMessageHandler {
 
    /// Returns true if the supplied message should be rerouted. If so then this
 
    /// function returns the connector that should retrieve this message.
 
    pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option<ConnectorId> {
 
        for reroute in &self.active {
 
            if reroute.source_connector == sending_connector &&
 
                reroute.target_port == target_port {
 
                // Need to reroute this message
 
                return Some(reroute.target_connector);
 
        for entry in &self.active {
 
            if let ControlVariant::ChangedPort(entry) = entry {
 
                if entry.source_connector == sending_connector &&
 
                    entry.target_port == target_port {
 
                    // Need to reroute this message
 
                    return Some(entry.target_connector);
 
                }
 
            }
 
        }
 

	
 
@@ -386,4 +447,19 @@ impl ControlMessageHandler {
 
            None => { todo!("handling of nefarious ACKs"); },
 
        }
 
    }
 

	
 
    /// Retrieves the number of responses we still expect to receive from our
 
    /// peers
 
    #[inline]
 
    pub fn num_pending_acks(&self) -> usize {
 
        return self.active.len();
 
    }
 

	
 
    fn take_id(&mut self) -> u32 {
 
        let generated_id = self.id_counter;
 
        let (new_id, _) = self.id_counter.overflowing_add(1);
 
        self.id_counter = new_id;
 

	
 
        return generated_id;
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)