Changeset - 9ab53cf39342
[Not reviewed]
0 2 0
MH - 4 years ago 2021-11-29 11:30:44
contact@maxhenger.nl
Replace rerouting with lock-step confirmation
2 files changed with 123 insertions and 71 deletions:
0 comments (0 inline, 0 general)
src/runtime2/scheduler.rs
Show inline comments
 
@@ -206,9 +206,8 @@ impl Scheduler {
 
                        self.runtime.send_message(message.sending_component_id, ack_message);
 
                    },
 
                    ControlContent::Ack => {
 
                        if let Some((target_component, new_control_message)) = scheduled.router.handle_ack(connector_id, message.id) {
 
                            self.debug_conn(connector_id, &format!("Sending message to {:?} [ack ack] \n --- {:?}", target_component, new_control_message));
 
                            self.runtime.send_message(target_component, new_control_message);
 
                        if let Some(component_key) = scheduled.router.handle_ack(message.id) {
 
                            self.runtime.push_work(component_key);
 
                        };
 
                    },
 
                    ControlContent::Ping => {},
 
@@ -303,13 +302,14 @@ impl Scheduler {
 
        while let Some(state_change) = scheduled.ctx.state_changes.pop_front() {
 
            match state_change {
 
                ComponentStateChange::CreatedComponent(component, initial_ports) => {
 
                    // Creating a new component. The creator needs to relinquish
 
                    // ownership of the ports that are given to the new
 
                    // component. All data messages that were intended for that
 
                    // port also needs to be transferred.
 
                    let new_key = self.runtime.create_pdl_component(component, false);
 
                    let new_connector = self.runtime.get_component_private(&new_key);
 

	
 
                    // Creating a new component. Need to relinquish control of
 
                    // the ports.
 
                    let new_component_key = self.runtime.create_pdl_component(component, false);
 
                    let new_connector = self.runtime.get_component_private(&new_component_key);
 

	
 
                    // First pass: transfer ports and the associated messages,
 
                    // also count the number of ports that have peers
 
                    let mut num_peers = 0;
 
                    for port_id in initial_ports {
 
                        // Transfer messages associated with the transferred port
 
                        scheduled.ctx.inbox.transfer_messages_for_port(port_id, &mut new_connector.ctx.inbox);
 
@@ -321,23 +321,32 @@ impl Scheduler {
 
                        let port = scheduled.ctx.ports.remove(port_index);
 
                        new_connector.ctx.ports.push(port.clone());
 

	
 
                        // Notify the peer that the port has changed, but only
 
                        // if the port wasn't already closed (otherwise the peer
 
                        // is gone).
 
                        if port.state == PortState::Open {
 
                            let reroute_message = scheduled.router.prepare_reroute(
 
                                port.self_id, port.peer_id, scheduled.ctx.id,
 
                                port.peer_connector, new_connector.ctx.id,
 
                                &mut new_connector.router
 
                            );
 

	
 
                            self.debug_conn(connector_id, &format!("Sending message to {:?} [newcon]\n --- {:?}", port.peer_connector, reroute_message));
 
                            self.runtime.send_message(port.peer_connector, Message::Control(reroute_message));
 
                            num_peers += 1;
 
                        }
 
                    }
 

	
 
                    // Schedule new connector to run
 
                    self.runtime.push_work(new_key);
 
                    if num_peers == 0 {
 
                        // No peers to notify, so just schedule the component
 
                        self.runtime.push_work(new_component_key);
 
                    } else {
 
                        // Some peers to notify
 
                        let new_component_id = new_component_key.downcast();
 
                        let control_id = scheduled.router.prepare_new_component(new_component_key);
 
                        for port in new_connector.ctx.ports.iter() {
 
                            if port.state == PortState::Closed {
 
                                continue;
 
                            }
 

	
 
                            let control_message = scheduled.router.prepare_changed_port_peer(
 
                                control_id, scheduled.ctx.id,
 
                                port.peer_connector, port.peer_id,
 
                                new_component_id, port.self_id
 
                            );
 
                            self.debug_conn(connector_id, &format!("Sending message to {:?} [newcom]\n --- {:#?}", port.peer_connector, control_message));
 
                            self.runtime.send_message(port.peer_connector, Message::Control(control_message));
 
                        }
 
                    }
 
                },
 
                ComponentStateChange::CreatedPort(port) => {
 
                    scheduled.ctx.ports.push(port);
 
@@ -854,16 +863,32 @@ struct ControlEntry {
 
}
 

	
 
enum ControlVariant {
 
    NewComponent(ControlNewComponent),
 
    ChangedPort(ControlChangedPort),
 
    ClosedChannel(ControlClosedChannel),
 
    ReroutePending,
 
}
 

	
 
impl ControlVariant {
 
    fn as_new_component_mut(&mut self) -> &mut ControlNewComponent {
 
        match self {
 
            ControlVariant::NewComponent(v) => v,
 
            _ => unreachable!(),
 
        }
 
    }
 
}
 

	
 
/// Entry for a new component waiting for execution after all of its peers have
 
/// confirmed the `ControlChangedPort` messages.
 
struct ControlNewComponent {
 
    num_acks_pending: u32,          // if it hits 0, we schedule the component
 
    component_key: ConnectorKey,    // this is the component we schedule
 
}
 

	
 
struct ControlChangedPort {
 
    target_port: PortIdLocal,       // if send to this port, then reroute
 
    source_connector: ConnectorId,  // connector we expect messages from
 
    target_connector: ConnectorId,  // connector we need to reroute to
 
    id_of_ack_after_confirmation: u32, // control message ID we need to send to the target upon receiving an ack
 
    reroute_if_sent_to_this_port: PortIdLocal, // if sent to this port, then reroute
 
    source_connector: ConnectorId,             // connector we expect messages from
 
    target_connector: ConnectorId,             // connector we need to reroute to
 
    new_component_entry_id: u32,               // if Ack'd, we reduce the counter on this `ControlNewComponent` entry
 
}
 

	
 
struct ControlClosedChannel {
 
@@ -907,37 +932,49 @@ impl ControlMessageHandler {
 
        };
 
    }
 

	
 
    /// Prepares rerouting messages due to changed ownership of a port. The
 
    /// control message returned by this function must be sent to the
 
    /// transferred port's peer connector.
 
    pub fn prepare_reroute(
 
        &mut self,
 
        port_id: PortIdLocal, peer_port_id: PortIdLocal,
 
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
 
        new_owner_connector_id: ConnectorId, new_owner_ctrl_handler: &mut ControlMessageHandler,
 
    ) -> ControlMessage {
 
    /// Prepares a control entry for a new component. This returns the id of
 
    /// the entry for calls to `prepare_changed_port_peer`. Don't call this
 
    /// function if the component has no peers that need to be messaged.
 
    pub fn prepare_new_component(&mut self, component_key: ConnectorKey) -> u32 {
 
        let id = self.take_id();
 

	
 
        let new_owner_id = new_owner_ctrl_handler.take_id();
 
        self.active.push(ControlEntry{
 
            id,
 
            variant: ControlVariant::ChangedPort(ControlChangedPort{
 
                target_port: port_id,
 
                source_connector: peer_connector_id,
 
                target_connector: new_owner_connector_id,
 
                id_of_ack_after_confirmation: new_owner_id,
 
            variant: ControlVariant::NewComponent(ControlNewComponent{
 
                num_acks_pending: 0,
 
                component_key,
 
            }),
 
        });
 

	
 
        new_owner_ctrl_handler.active.push(ControlEntry{
 
            id: new_owner_id,
 
            variant: ControlVariant::ReroutePending,
 
        return id;
 
    }
 

	
 
    pub fn prepare_changed_port_peer(
 
        &mut self, new_component_entry_id: u32, creating_component_id: ConnectorId,
 
        changed_component_id: ConnectorId, changed_port_id: PortIdLocal,
 
        new_target_component_id: ConnectorId, new_target_port_id: PortIdLocal
 
    ) -> ControlMessage {
 
        // Add the peer-changed entry
 
        let change_port_entry_id = self.take_id();
 
        self.active.push(ControlEntry{
 
            id: change_port_entry_id,
 
            variant: ControlVariant::ChangedPort(ControlChangedPort{
 
                reroute_if_sent_to_this_port: new_target_port_id,
 
                source_connector: changed_component_id,
 
                target_connector: new_target_component_id,
 
                new_component_entry_id,
 
            })
 
        });
 

	
 
        return ControlMessage {
 
            id,
 
            sending_component_id: self_connector_id,
 
            content: ControlContent::PortPeerChanged(peer_port_id, new_owner_connector_id),
 
        // Increment counter on "new component" entry
 
        let position = self.position(new_component_entry_id).unwrap();
 
        let new_component_entry = &mut self.active[position];
 
        let new_component_entry = new_component_entry.variant.as_new_component_mut();
 
        new_component_entry.num_acks_pending += 1;
 

	
 
        return ControlMessage{
 
            id: change_port_entry_id,
 
            sending_component_id: creating_component_id,
 
            content: ControlContent::PortPeerChanged(changed_port_id, new_target_component_id),
 
        };
 
    }
 

	
 
@@ -946,7 +983,7 @@ impl ControlMessageHandler {
 
    pub fn should_reroute(&self, target_port: PortIdLocal) -> Option<ConnectorId> {
 
        for entry in &self.active {
 
            if let ControlVariant::ChangedPort(entry) = &entry.variant {
 
                if entry.target_port == target_port {
 
                if entry.reroute_if_sent_to_this_port == target_port {
 
                    // Need to reroute this message
 
                    return Some(entry.target_connector);
 
                }
 
@@ -958,26 +995,36 @@ impl ControlMessageHandler {
 

	
 
    /// Handles an Ack as an answer to a previously sent control message.
 
    /// Handling an Ack might spawn a new message that needs to be sent.
 
    pub fn handle_ack(&mut self, handler_component_id: ConnectorId, id: u32) -> Option<(ConnectorId, Message)> {
 
        let index = self.active.iter()
 
            .position(|v| v.id == id);
 
    pub fn handle_ack(&mut self, id: u32) -> Option<ConnectorKey> {
 
        let index = self.position(id);
 

	
 
        match index {
 
            Some(index) => {
 
                let removed = self.active.remove(index);
 
                match removed.variant {
 
                // Remove the entry. If `ChangedPort`, then retrieve associated
 
                // `NewComponent`. Otherwise: early exits
 
                let removed_entry = self.active.remove(index);
 
                let new_component_idx = match removed_entry.variant {
 
                    ControlVariant::ChangedPort(message) => {
 
                        return Some((
 
                            message.target_connector,
 
                            Message::Control(ControlMessage{
 
                                id: message.id_of_ack_after_confirmation,
 
                                sending_component_id: handler_component_id,
 
                                content: ControlContent::Ack
 
                            })
 
                        ));
 
                        self.position(message.new_component_entry_id).unwrap()
 
                    },
 
                    _ => return None,
 
                };
 

	
 
                // Decrement counter, if 0, then schedule component
 
                let new_component_entry = self.active[new_component_idx].variant.as_new_component_mut();
 
                new_component_entry.num_acks_pending -= 1;
 
                if new_component_entry.num_acks_pending != 0 {
 
                    return None;
 
                }
 

	
 
                // Return component key for scheduling
 
                let new_component_entry = self.active.remove(new_component_idx);
 
                let new_component_entry = match new_component_entry.variant {
 
                    ControlVariant::NewComponent(entry) => entry,
 
                    _ => unreachable!(),
 
                };
 

	
 
                return Some(new_component_entry.component_key);
 
            },
 
            None => {
 
                todo!("handling of nefarious ACKs");
 
@@ -1000,4 +1047,9 @@ impl ControlMessageHandler {
 

	
 
        return generated_id;
 
    }
 

	
 
    #[inline]
 
    fn position(&self, id: u32) -> Option<usize> {
 
        return self.active.iter().position(|v| v.id == id);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -11,13 +11,13 @@ use crate::protocol::eval::*;
 
use crate::runtime2::native::{ApplicationSyncAction};
 

	
 
// Generic testing constants, use when appropriate to simplify stress-testing
 
// pub(crate) const NUM_THREADS: u32 =  8;     // number of threads in runtime
 
// pub(crate) const NUM_INSTANCES: u32 = 250;  // number of test instances constructed
 
// pub(crate) const NUM_LOOPS: u32 = 10;       // number of loops within a single test (not used by all tests)
 
pub(crate) const NUM_THREADS: u32 =  8;     // number of threads in runtime
 
pub(crate) const NUM_INSTANCES: u32 = 1500; // number of test instances constructed
 
pub(crate) const NUM_LOOPS: u32 = 10;       // number of loops within a single test (not used by all tests)
 

	
 
pub(crate) const NUM_THREADS: u32 = 6;
 
pub(crate) const NUM_INSTANCES: u32 = 1;
 
pub(crate) const NUM_LOOPS: u32 = 15;
 
// pub(crate) const NUM_THREADS: u32 = 6;
 
// pub(crate) const NUM_INSTANCES: u32 = 1;
 
// pub(crate) const NUM_LOOPS: u32 = 15;
 

	
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
0 comments (0 inline, 0 general)