Changeset - 6f608e19fa8a
[Not reviewed]
0 5 0
MH - 4 years ago 2021-11-22 18:52:41
contact@maxhenger.nl
Fix part of the error-handling code
5 files changed with 19 insertions and 10 deletions:
0 comments (0 inline, 0 general)
src/runtime2/consensus.rs
Show inline comments
 
@@ -257,193 +257,196 @@ impl Consensus {
 

	
 
        for port in source_mapping {
 
            // Note: if the port is silent, and we've never communicated
 
            // over the port, then we need to do so now, to let the peer
 
            // component know about our sync leader state.
 
            let port_desc = ctx.get_port_by_channel_id(port.channel_id).unwrap();
 
            let self_port_id = port_desc.self_id;
 
            let peer_port_id = port_desc.peer_id;
 
            let channel_id = port_desc.channel_id;
 

	
 
            if !self.encountered_ports.contains(&self_port_id) {
 
                ctx.submit_message(Message::SyncPort(SyncPortMessage {
 
                    sync_header: SyncHeader{
 
                        sending_component_id: ctx.id,
 
                        highest_component_id: self.highest_connector_id,
 
                        sync_round: self.sync_round
 
                    },
 
                    source_port: self_port_id,
 
                    target_port: peer_port_id,
 
                    content: SyncPortContent::SilentPortNotification,
 
                }));
 
                self.encountered_ports.push(self_port_id);
 
            }
 

	
 
            target_mapping.push((
 
                channel_id,
 
                port.registered_id.unwrap_or(BranchMarker::new_invalid())
 
            ));
 
        }
 

	
 
        let local_solution = LocalSolution{
 
            component: ctx.id,
 
            final_branch_id: branch_id,
 
            port_mapping: target_mapping,
 
        };
 
        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalSolution(local_solution), ctx);
 
        return maybe_conclusion;
 
    }
 

	
 
    /// Notifies the consensus algorithm about the chosen branch to commit to
 
    /// memory (may be the invalid "start" branch)
 
    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<ComponentPortChange>) {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // TODO: Handle sending and receiving ports
 
        // Set final ports
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 

	
 
        // Clear out internal storage to defaults
 
        self.highest_connector_id = ConnectorId::new_invalid();
 
        self.branch_annotations.clear();
 
        self.branch_markers.clear();
 
        self.encountered_ports.clear();
 
        self.solution_combiner.clear();
 
        self.handled_wave = false;
 
        self.conclusion = None;
 
        self.ack_remaining = 0;
 

	
 
        // And modify persistent storage
 
        self.sync_round += 1;
 

	
 
        for peer in self.peers.iter_mut() {
 
            peer.encountered_this_round = false;
 
            peer.expected_sync_round += 1;
 
        }
 
    }
 

	
 
    // --- Handling messages
 

	
 
    /// Prepares a message for sending. Caller should have made sure that
 
    /// sending the message is consistent with the speculative state.
 
    pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtx) -> (SyncHeader, DataHeader) {
 
        debug_assert!(self.is_in_sync());
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
 

	
 
        if cfg!(debug_assertions) {
 
            // Check for consistent mapping
 
            let port = branch.channel_mapping.iter()
 
                .find(|v| v.channel_id == port_info.channel_id)
 
                .unwrap();
 
            debug_assert!(port.expected_firing == None || port.expected_firing == Some(true));
 
        }
 

	
 
        // Check for ports that are being sent
 
        debug_assert!(self.workspace_ports.is_empty());
 
        find_ports_in_value_group(content, &mut self.workspace_ports);
 
        if !self.workspace_ports.is_empty() {
 
            todo!("handle sending ports");
 
            self.workspace_ports.clear();
 
        }
 

	
 
        // Construct data header
 
        // TODO: Handle multiple firings. Right now we just assign the current
 
        //  branch to the `None` value because we know we can only send once.
 
        let data_header = DataHeader{
 
            expected_mapping: branch.channel_mapping.clone(),
 
            expected_mapping: branch.channel_mapping.iter()
 
                .filter(|v| v.registered_id.is_some() || v.channel_id == port_info.channel_id)
 
                .copied()
 
                .collect(),
 
            sending_port: port_info.self_id,
 
            target_port: port_info.peer_id,
 
            new_mapping: branch.cur_marker,
 
        };
 

	
 
        // Update port mapping
 
        for mapping in &mut branch.channel_mapping {
 
            if mapping.channel_id == port_info.channel_id {
 
                mapping.expected_firing = Some(true);
 
                mapping.registered_id = Some(branch.cur_marker);
 
            }
 
        }
 

	
 
        // Update branch marker
 
        let new_marker = BranchMarker::new(self.branch_markers.len() as u32);
 
        branch.cur_marker = new_marker;
 
        self.branch_markers.push(branch_id);
 

	
 
        self.encountered_ports.push(source_port_id);
 

	
 
        return (self.create_sync_header(ctx), data_header);
 
    }
 

	
 
    /// Handles a new data message by handling the sync header. The caller is
 
    /// responsible for checking for branches that might be able to receive
 
    /// the message.
 
    pub fn handle_new_data_message(&mut self, message: &DataMessage, ctx: &mut ComponentCtx) -> bool {
 
        let handled = self.handle_received_sync_header(&message.sync_header, ctx);
 
        if handled {
 
            self.encountered_ports.push(message.data_header.target_port);
 
        }
 
        return handled;
 
    }
 

	
 
    /// Handles a new sync message by handling the sync header and the contents
 
    /// of the message. Returns `Some` with the branch ID of the global solution
 
    /// if the sync solution has been found.
 
    pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if !self.handle_received_sync_header(&message.sync_header, ctx) {
 
            return None;
 
        }
 

	
 
        // And handle the contents
 
        debug_assert_eq!(message.target_component_id, ctx.id);
 

	
 
        match &message.content {
 
            SyncCompContent::LocalFailure |
 
            SyncCompContent::LocalSolution(_) |
 
            SyncCompContent::PartialSolution(_) |
 
            SyncCompContent::AckFailure |
 
            SyncCompContent::Presence(_, _) => {
 
                // Needs to be handled by the leader
 
                return self.send_to_leader_or_handle_as_leader(message.content, ctx);
 
            },
 
            SyncCompContent::GlobalSolution(solution) => {
 
                // Found a global solution
 
                debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
 
                let (_, branch_id) = solution.component_branches.iter()
 
                    .find(|(component_id, _)| *component_id == ctx.id)
 
                    .unwrap();
 
                return Some(RoundConclusion::Success(*branch_id));
 
            },
 
            SyncCompContent::GlobalFailure => {
 
                // Global failure of round, send Ack to leader
 
                debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
 
                let _result = self.send_to_leader_or_handle_as_leader(SyncCompContent::AckFailure, ctx);
 
                debug_assert!(_result.is_none());
 
                return Some(RoundConclusion::Failure);
 
            },
 
            SyncCompContent::Notification => {
 
                // We were just interested in the sync header we handled above
 
                return None;
 
            }
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
 
        if !self.handle_received_sync_header(&message.sync_header, ctx) {
 
            return;
 
        }
 

	
 
        debug_assert!(self.is_in_sync());
 
        debug_assert!(ctx.get_port_by_id(message.target_port).is_some());
 
        match message.content {
 
            SyncPortContent::SilentPortNotification => {
 
                // The point here is to let us become part of the sync round and
 
                // take note of the leader in case all of our ports are silent.
 
                self.encountered_ports.push(message.target_port);
 
            }
 
            SyncPortContent::NotificationWave => {
 
                // Wave to discover everyone in the network, handling sync
 
                // header takes care of leader discovery, here we need to make
 
                // sure we propagate the wave
 
                if self.handled_wave {
 
                    return;
 
                }
src/runtime2/mod.rs
Show inline comments
 
@@ -338,141 +338,141 @@ impl RuntimeInner {
 

	
 
    #[inline]
 
    fn signal_for_shutdown(&self) {
 
        debug_assert_eq!(self.active_interfaces.load(Ordering::Acquire), 0);
 
        debug_assert_eq!(self.active_connectors.load(Ordering::Acquire), 0);
 

	
 
        let _lock = self.connector_queue.lock().unwrap();
 
        let should_signal = self.should_exit
 
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_signal {
 
            self.scheduler_notifier.notify_all();
 
        }
 
    }
 
}
 

	
 
// TODO: Come back to this at some point
 
unsafe impl Send for RuntimeInner {}
 
unsafe impl Sync for RuntimeInner {}
 

	
 
// -----------------------------------------------------------------------------
 
// ConnectorStore
 
// -----------------------------------------------------------------------------
 

	
 
struct ConnectorStore {
 
    // Freelist storage of connectors. Storage should be pointer-stable as
 
    // someone might be mutating the vector while we're executing one of the
 
    // connectors.
 
    connectors: RawVec<*mut ScheduledConnector>,
 
    free: Vec<usize>,
 
}
 

	
 
impl ConnectorStore {
 
    fn with_capacity(capacity: usize) -> Self {
 
        Self {
 
            connectors: RawVec::with_capacity(capacity),
 
            free: Vec::with_capacity(capacity),
 
        }
 
    }
 

	
 
    /// Retrieves public part of connector - accessible by many threads at once.
 
    fn get_public(&self, id: ConnectorId) -> &'static ConnectorPublic {
 
        unsafe {
 
            debug_assert!(!self.free.contains(&(id.0 as usize)));
 
            let connector = self.connectors.get(id.0 as usize);
 
            debug_assert!(!connector.is_null());
 
            return &(**connector).public;
 
        }
 
    }
 

	
 
    /// Retrieves private part of connector - accessible by one thread at a
 
    /// time.
 
    fn get_private(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
 
        unsafe {
 
            debug_assert!(!self.free.contains(&(key.index as usize)));
 
            let connector = self.connectors.get_mut(key.index as usize);
 
            debug_assert!(!connector.is_null());
 
            return &mut (**connector);
 
        }
 
    }
 

	
 
    /// Creates a new connector. Caller should ensure ports are set up correctly
 
    /// and the connector is queued for execution if needed.
 
    fn create(&mut self, connector: ConnectorVariant, initially_sleeping: bool) -> ConnectorKey {
 
        let mut connector = ScheduledConnector {
 
            connector,
 
            ctx: ComponentCtx::new_empty(),
 
            public: ConnectorPublic::new(initially_sleeping),
 
            router: ControlMessageHandler::new(),
 
            shutting_down: false,
 
        };
 

	
 
        let index;
 
        let key;
 

	
 
        if self.free.is_empty() {
 
            // No free entries, allocate new entry
 
            index = self.connectors.len();
 
            key = ConnectorKey{ index: index as u32 };
 
            connector.ctx.id = key.downcast();
 

	
 
            let connector = Box::into_raw(Box::new(connector));
 
            self.connectors.push(connector);
 
        } else {
 
            // Free spot available
 
            index = self.free.pop().unwrap();
 
            key = ConnectorKey{ index: index as u32 };
 
            connector.ctx.id = key.downcast();
 

	
 
            unsafe {
 
                let target = self.connectors.get_mut(index);
 
                std::ptr::write(*target, connector);
 
            }
 
        }
 

	
 
        println!("DEBUG [ global store  ] Created component at {}", key.index);
 
        // println!("DEBUG [ global store  ] Created component at {}", key.index);
 
        return key;
 
    }
 

	
 
    /// Destroys a connector. Caller should make sure it is not scheduled for
 
    /// execution. Otherwise one experiences "bad stuff" (tm).
 
    fn destroy(&mut self, key: ConnectorKey) {
 
        unsafe {
 
            let target = self.connectors.get_mut(key.index as usize);
 
            std::ptr::drop_in_place(*target);
 
            // Note: but not deallocating!
 
        }
 

	
 
        println!("DEBUG [ global store  ] Destroyed component at {}", key.index);
 
        // println!("DEBUG [ global store  ] Destroyed component at {}", key.index);
 
        self.free.push(key.index as usize);
 
    }
 
}
 

	
 
impl Drop for ConnectorStore {
 
    fn drop(&mut self) {
 
        // Everything in the freelist already had its destructor called, so only
 
        // has to be deallocated
 
        for free_idx in self.free.iter().copied() {
 
            unsafe {
 
                let memory = self.connectors.get_mut(free_idx);
 
                let layout = std::alloc::Layout::for_value(&**memory);
 
                std::alloc::dealloc(*memory as *mut u8, layout);
 

	
 
                // mark as null for the remainder
 
                *memory = std::ptr::null_mut();
 
            }
 
        }
 

	
 
        // With the deallocated stuff marked as null, clear the remainder that
 
        // is not null
 
        for idx in 0..self.connectors.len() {
 
            unsafe {
 
                let memory = *self.connectors.get_mut(idx);
 
                if !memory.is_null() {
 
                    let _ = Box::from_raw(memory); // take care of deallocation, bit dirty, but meh
 
                }
 
            }
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/scheduler.rs
Show inline comments
 
@@ -115,193 +115,193 @@ impl Scheduler {
 
                            self.runtime.send_message(port.peer_connector, Message::Control(message));
 
                        }
 
                    }
 

	
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // All ports (if any) already closed
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    }
 

	
 
                    self.try_go_to_sleep(connector_key, scheduled);
 
                },
 
            }
 
        }
 
    }
 

	
 
    /// Receiving messages from the public inbox and handling them or storing
 
    /// them in the component's private inbox
 
    fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx.id;
 

	
 
        while let Some(message) = scheduled.public.inbox.take_message() {
 
            // Check if the message has to be rerouted because we have moved the
 
            // target port to another component.
 
            self.debug_conn(connector_id, &format!("Handling message\n --- {:#?}", message));
 
            if let Some(target_port) = Self::get_message_target_port(&message) {
 
                if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
 
                    self.debug_conn(connector_id, " ... Rerouting the message");
 
                    self.runtime.send_message(other_component_id, message);
 
                    continue;
 
                }
 
            }
 

	
 
            // If here, then we should handle the message
 
            self.debug_conn(connector_id, " ... Handling the message");
 

	
 
            match message {
 
                Message::Control(message) => {
 
                    match message.content {
 
                        ControlContent::PortPeerChanged(port_id, new_target_connector_id) => {
 
                            // Need to change port target
 
                            let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                            port.peer_connector = new_target_connector_id;
 

	
 
                            // Note: for simplicity we program the scheduler to always finish
 
                            // running a connector with an empty outbox. If this ever changes
 
                            // then accepting the "port peer changed" message implies we need
 
                            // to change the recipient of the message in the outbox.
 
                            debug_assert!(scheduled.ctx.outbox.is_empty());
 

	
 
                            // And respond with an Ack
 
                            let ack_message = Message::Control(ControlMessage {
 
                                id: message.id,
 
                                sending_component_id: connector_id,
 
                                content: ControlContent::Ack,
 
                            });
 
                            self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message));
 
                            self.runtime.send_message(message.sending_component_id, ack_message);
 
                        },
 
                        ControlContent::CloseChannel(port_id) => {
 
                            // Mark the port as being closed
 
                            let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                            port.state = PortState::Closed;
 

	
 
                            // Send an Ack
 
                            let ack_message = Message::Control(ControlMessage {
 
                                id: message.id,
 
                                sending_component_id: connector_id,
 
                                content: ControlContent::Ack,
 
                            });
 
                            self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message));
 
                            self.runtime.send_message(message.sending_component_id, ack_message);
 
                        },
 
                        ControlContent::Ack => {
 
                            scheduled.router.handle_ack(message.id);
 
                        },
 
                        ControlContent::Ping => {},
 
                    }
 
                },
 
                _ => {
 
                    // All other cases have to be handled by the component
 
                    scheduled.ctx.inbox_messages.push(message);
 
                }
 
            }
 
        }
 
    }
 

	
 
    /// Handles inbox messages while shutting down. This intends to handle the
 
    /// case where a component cleanly exited outside of a sync region, but a
 
    /// peer, before receiving the `CloseChannel` message, sent a message inside
 
    /// a sync region. This peer should be notified that its message is not
 
    /// received by a component in a sync region.
 
    fn handle_inbox_while_shutting_down(&mut self, scheduled: &mut ScheduledConnector) {
 
        // Note: we're not handling the public inbox, we're dealing with the
 
        // private one!
 
        debug_assert!(scheduled.shutting_down);
 
        while let Some(message) = scheduled.ctx.read_next_message() {
 
        while let Some(message) = scheduled.ctx.read_next_message_even_if_not_in_sync() {
 
            let target_port_and_round_number = match &message {
 
                Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)),
 
                Message::SyncComp(_) => None,
 
                Message::SyncPort(msg) => Some((msg.target_port, msg.sync_header.sync_round)),
 
                Message::SyncControl(_) => None,
 
                Message::Control(_) => None,
 
            };
 

	
 
            if let Some((target_port, sync_round)) = target_port_and_round_number {
 
                // This message is aimed at a port, but we're shutting down, so
 
                // notify the peer that its was not received properly.
 
                // (also: since we're shutting down, we're not in sync mode and
 
                // the context contains the definitive set of owned ports)
 
                let port = scheduled.ctx.get_port_by_id(target_port).unwrap();
 
                let message = SyncControlMessage{
 
                    in_response_to_sync_round: sync_round,
 
                    target_component_id: port.peer_connector,
 
                    content: SyncControlContent::ChannelIsClosed(port.peer_id),
 
                };
 
                self.debug_conn(scheduled.ctx.id, &format!("Sending message [shutdown]\n --- {:?}", message));
 
                self.runtime.send_message(port.peer_connector, Message::SyncControl(message));
 
            }
 
        }
 
    }
 

	
 
    /// Handles changes to the context that were made by the component. This is
 
    /// the way (due to Rust's borrowing rules) that we bubble up changes in the
 
    /// component's state that the scheduler needs to know about (e.g. a message
 
    /// that the component wants to send, a port that has been added).
 
    fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx.id;
 

	
 
        // Handling any messages that were sent
 
        while let Some(message) = scheduled.ctx.outbox.pop_front() {
 
            self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:#?}", message));
 

	
 
            let target_component_id = match &message {
 
                Message::Data(content) => {
 
                    // Data messages are always sent to a particular port, and
 
                    // may end up being rerouted.
 
                    let port_desc = scheduled.ctx.get_port_by_id(content.data_header.sending_port).unwrap();
 
                    debug_assert_eq!(port_desc.peer_id, content.data_header.target_port);
 

	
 
                    if port_desc.state == PortState::Closed {
 
                        todo!("handle sending over a closed port")
 
                    }
 

	
 
                    port_desc.peer_connector
 
                },
 
                Message::SyncComp(content) => {
 
                    // Sync messages are always sent to a particular component,
 
                    // the sender must make sure it actually wants to send to
 
                    // the specified component (and is not using an inconsistent
 
                    // component ID associated with a port).
 
                    content.target_component_id
 
                },
 
                Message::SyncPort(content) => {
 
                    let port_desc = scheduled.ctx.get_port_by_id(content.source_port).unwrap();
 
                    debug_assert_eq!(port_desc.peer_id, content.target_port);
 
                    if port_desc.state == PortState::Closed {
 
                        todo!("handle sending over a closed port")
 
                    }
 

	
 
                    port_desc.peer_connector
 
                },
 
                Message::SyncControl(_) => unreachable!("component sending 'SyncControl' messages directly"),
 
                Message::Control(_) => unreachable!("component sending 'Control' messages directly"),
 
            };
 

	
 
            self.runtime.send_message(target_component_id, message);
 
        }
 

	
 
        while let Some(state_change) = scheduled.ctx.state_changes.pop_front() {
 
            match state_change {
 
                ComponentStateChange::CreatedComponent(component, initial_ports) => {
 
                    // Creating a new component. The creator needs to relinquish
 
                    // ownership of the ports that are given to the new
 
                    // component. All data messages that were intended for that
 
                    // port also needs to be transferred.
 
                    let new_key = self.runtime.create_pdl_component(component, false);
 
                    let new_connector = self.runtime.get_component_private(&new_key);
 

	
 
                    for port_id in initial_ports {
 
                        // Transfer messages associated with the transferred port
 
                        let mut message_idx = 0;
 
                        while message_idx < scheduled.ctx.inbox_messages.len() {
 
                            let message = &scheduled.ctx.inbox_messages[message_idx];
 
                            if Self::get_message_target_port(message) == Some(port_id) {
 
                                // Need to transfer this message
 
                                // TODO: Revise messages, this is becoming messy and error-prone
 
                                let message = scheduled.ctx.inbox_messages.remove(message_idx);
 
                                if message_idx < scheduled.ctx.inbox_len_read {
 
                                    scheduled.ctx.inbox_len_read -= 1;
 
                                }
 
                                new_connector.ctx.inbox_messages.push(message);
 
                            } else {
 
@@ -312,366 +312,370 @@ impl Scheduler {
 
                        // Transfer the port itself
 
                        let port_index = scheduled.ctx.ports.iter()
 
                            .position(|v| v.self_id == port_id)
 
                            .unwrap();
 
                        let port = scheduled.ctx.ports.remove(port_index);
 
                        new_connector.ctx.ports.push(port.clone());
 

	
 
                        // Notify the peer that the port has changed
 
                        let reroute_message = scheduled.router.prepare_reroute(
 
                            port.self_id, port.peer_id, scheduled.ctx.id,
 
                            port.peer_connector, new_connector.ctx.id
 
                        );
 

	
 
                        self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message));
 
                        self.runtime.send_message(port.peer_connector, Message::Control(reroute_message));
 
                    }
 

	
 
                    // Schedule new connector to run
 
                    self.runtime.push_work(new_key);
 
                },
 
                ComponentStateChange::CreatedPort(port) => {
 
                    scheduled.ctx.ports.push(port);
 
                },
 
                ComponentStateChange::ChangedPort(port_change) => {
 
                    if port_change.is_acquired {
 
                        scheduled.ctx.ports.push(port_change.port);
 
                    } else {
 
                        let index = scheduled.ctx.ports
 
                            .iter()
 
                            .position(|v| v.self_id == port_change.port.self_id)
 
                            .unwrap();
 
                        scheduled.ctx.ports.remove(index);
 
                    }
 
                }
 
            }
 
        }
 

	
 
        // Finally, check if we just entered or just left a sync region
 
        if scheduled.ctx.changed_in_sync {
 
            if scheduled.ctx.is_in_sync {
 
                // Just entered sync region
 
            } else {
 
                // Just left sync region. So clear inbox up until the last
 
                // message that was read.
 
                scheduled.ctx.inbox_messages.drain(0..scheduled.ctx.inbox_len_read);
 
                scheduled.ctx.inbox_len_read = 0;
 
            }
 

	
 
            scheduled.ctx.changed_in_sync = false; // reset flag
 
        }
 
    }
 

	
 
    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
 
        debug_assert_eq!(connector_key.index, connector.ctx.id.0);
 
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);
 

	
 
        // This is the running connector, and only the running connector may
 
        // decide it wants to sleep again.
 
        connector.public.sleeping.store(true, Ordering::Release);
 

	
 
        // But due to reordering we might have received messages from peers who
 
        // did not consider us sleeping. If so, then we wake ourselves again.
 
        if !connector.public.inbox.is_empty() {
 
            // Try to wake ourselves up (needed because someone might be trying
 
            // the exact same atomic compare-and-swap at this point in time)
 
            let should_wake_up_again = connector.public.sleeping
 
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
                .is_ok();
 

	
 
            if should_wake_up_again {
 
                self.runtime.push_work(connector_key)
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn get_message_target_port(message: &Message) -> Option<PortIdLocal> {
 
        match message {
 
            Message::Data(data) => return Some(data.data_header.target_port),
 
            Message::SyncComp(_) => {},
 
            Message::SyncPort(content) => return Some(content.target_port),
 
            Message::SyncControl(_) => return None,
 
            Message::Control(control) => {
 
                match &control.content {
 
                    ControlContent::PortPeerChanged(port_id, _) => return Some(*port_id),
 
                    ControlContent::CloseChannel(port_id) => return Some(*port_id),
 
                    ControlContent::Ping | ControlContent::Ack => {},
 
                }
 
            },
 
        }
 

	
 
        return None
 
    }
 

	
 
    // TODO: Remove, this is debugging stuff
 
    fn debug(&self, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
        // println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
    }
 

	
 
    fn debug_conn(&self, conn: ConnectorId, message: &str) {
 
        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
 
        // println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ComponentCtx
 
// -----------------------------------------------------------------------------
 

	
 
enum ComponentStateChange {
 
    CreatedComponent(ConnectorPDL, Vec<PortIdLocal>),
 
    CreatedPort(Port),
 
    ChangedPort(ComponentPortChange),
 
}
 

	
 
#[derive(Clone)]
 
pub(crate) struct ComponentPortChange {
 
    pub is_acquired: bool, // otherwise: released
 
    pub port: Port,
 
}
 

	
 
/// The component context (better name may be invented). This was created
 
/// because part of the component's state is managed by the scheduler, and part
 
/// of it by the component itself. When the component starts a sync block or
 
/// exits a sync block the partially managed state by both component and
 
/// scheduler need to be exchanged.
 
pub(crate) struct ComponentCtx {
 
    // Mostly managed by the scheduler
 
    pub(crate) id: ConnectorId,
 
    ports: Vec<Port>,
 
    inbox_messages: Vec<Message>,
 
    inbox_len_read: usize,
 
    // Submitted by the component
 
    is_in_sync: bool,
 
    changed_in_sync: bool,
 
    outbox: VecDeque<Message>,
 
    state_changes: VecDeque<ComponentStateChange>,
 

	
 
    // Workspaces that may be used by components to (generally) prevent
 
    // allocations. Be a good scout and leave it empty after you've used it.
 
    // TODO: Move to scheduler ctx, this is the wrong place
 
    pub workspace_ports: Vec<PortIdLocal>,
 
    pub workspace_branches: Vec<BranchId>,
 
}
 

	
 
impl ComponentCtx {
 
    pub(crate) fn new_empty() -> Self {
 
        return Self{
 
            id: ConnectorId::new_invalid(),
 
            ports: Vec::new(),
 
            inbox_messages: Vec::new(),
 
            inbox_len_read: 0,
 
            is_in_sync: false,
 
            changed_in_sync: false,
 
            outbox: VecDeque::new(),
 
            state_changes: VecDeque::new(),
 
            workspace_ports: Vec::new(),
 
            workspace_branches: Vec::new(),
 
        };
 
    }
 

	
 
    /// Notify the runtime that the component has created a new component. May
 
    /// only be called outside of a sync block.
 
    pub(crate) fn push_component(&mut self, component: ConnectorPDL, initial_ports: Vec<PortIdLocal>) {
 
        debug_assert!(!self.is_in_sync);
 
        self.state_changes.push_back(ComponentStateChange::CreatedComponent(component, initial_ports));
 
    }
 

	
 
    /// Notify the runtime that the component has created a new port. May only
 
    /// be called outside of a sync block (for ports received during a sync
 
    /// block, pass them when calling `notify_sync_end`).
 
    pub(crate) fn push_port(&mut self, port: Port) {
 
        debug_assert!(!self.is_in_sync);
 
        self.state_changes.push_back(ComponentStateChange::CreatedPort(port))
 
    }
 

	
 
    /// Notify the runtime of an error. Note that this will not perform any
 
    /// special action beyond printing the error. The component is responsible
 
    /// for waiting until it is appropriate to shut down (i.e. being outside
 
    /// of a sync region) and returning the `Exit` scheduling code.
 
    pub(crate) fn push_error(&mut self, error: EvalError) {
 
        println!("ERROR: Component ({}) encountered a critical error:\n{}", self.id.0, error);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn get_ports(&self) -> &[Port] {
 
        return self.ports.as_slice();
 
    }
 

	
 
    pub(crate) fn get_port_by_id(&self, id: PortIdLocal) -> Option<&Port> {
 
        return self.ports.iter().find(|v| v.self_id == id);
 
    }
 

	
 
    pub(crate) fn get_port_by_channel_id(&self, id: ChannelId) -> Option<&Port> {
 
        return self.ports.iter().find(|v| v.channel_id == id);
 
    }
 

	
 
    fn get_port_mut_by_id(&mut self, id: PortIdLocal) -> Option<&mut Port> {
 
        return self.ports.iter_mut().find(|v| v.self_id == id);
 
    }
 

	
 
    /// Notify that component will enter a sync block. Note that after calling
 
    /// this function you must allow the scheduler to pick up the changes in the
 
    /// context by exiting your code-executing loop, and to continue executing
 
    /// code the next time the scheduler picks up the component.
 
    pub(crate) fn notify_sync_start(&mut self) {
 
        debug_assert!(!self.is_in_sync);
 

	
 
        self.is_in_sync = true;
 
        self.changed_in_sync = true;
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_in_sync(&self) -> bool {
 
        return self.is_in_sync;
 
    }
 

	
 
    /// Submit a message for the scheduler to send to the appropriate receiver.
 
    /// May only be called inside of a sync block.
 
    pub(crate) fn submit_message(&mut self, contents: Message) -> Result<(), ()> {
 
        debug_assert!(self.is_in_sync);
 
        if let Some(port_id) = contents.source_port() {
 
            let port_info = self.get_port_by_id(port_id);
 
            let is_valid = match port_info {
 
                Some(port_info) => {
 
                    port_info.state == PortState::Open
 
                },
 
                None => false,
 
            };
 
            if !is_valid {
 
                // We don't own the port
 
                println!(" ****** DEBUG ****** : Sending through closed port!!! {}", port_id.index);
 
                return Err(());
 
            }
 
        }
 

	
 
        self.outbox.push_back(contents);
 
        return Ok(());
 
    }
 

	
 
    /// Notify that component just finished a sync block. Like
 
    /// `notify_sync_start`: drop out of the `Component::Run` function.
 
    pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) {
 
        debug_assert!(self.is_in_sync);
 

	
 
        self.is_in_sync = false;
 
        self.changed_in_sync = true;
 

	
 
        self.state_changes.reserve(changed_ports.len());
 
        for changed_port in changed_ports {
 
            self.state_changes.push_back(ComponentStateChange::ChangedPort(changed_port.clone()));
 
        }
 
    }
 

	
 
    /// Retrieves messages matching a particular port and branch id. But only
 
    /// those messages that have been previously received with
 
    /// `read_next_message`.
 
    pub(crate) fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
 
        return MessagesIter {
 
            messages: &self.inbox_messages,
 
            next_index: 0,
 
            max_index: self.inbox_len_read,
 
            match_port_id
 
        };
 
    }
 

	
 
    /// Retrieves the next unread message from the inbox `None` if there are no
 
    /// (new) messages to read.
 
    // TODO: Fix the clone of the data message, entirely unnecessary
 
    pub(crate) fn read_next_message(&mut self) -> Option<Message> {
 
        if !self.is_in_sync { return None; }
 
        return self.read_next_message_even_if_not_in_sync();
 
    }
 

	
 
    pub(crate) fn read_next_message_even_if_not_in_sync(&mut self) -> Option<Message> {
 
        if self.inbox_len_read == self.inbox_messages.len() { return None; }
 

	
 
        // We want to keep data messages in the inbox, because we need to check
 
        // them in the future. We don't want to keep sync messages around, we
 
        // should only handle them once. Control messages should never be in
 
        // here.
 
        let message = &self.inbox_messages[self.inbox_len_read];
 
        match &message {
 
            Message::Data(content) => {
 
                // Keep message in inbox for later reading
 
                self.inbox_len_read += 1;
 
                return Some(Message::Data(content.clone()));
 
            },
 
            Message::SyncComp(_) | Message::SyncPort(_) | Message::SyncControl(_) => {
 
                // Remove message from inbox
 
                let message = self.inbox_messages.remove(self.inbox_len_read);
 
                return Some(message);
 
            },
 
            Message::Control(_) => unreachable!("control message ended up in component inbox"),
 
        }
 
    }
 
}
 

	
 
pub(crate) struct MessagesIter<'a> {
 
    messages: &'a [Message],
 
    next_index: usize,
 
    max_index: usize,
 
    match_port_id: PortIdLocal,
 
}
 

	
 
impl<'a> Iterator for MessagesIter<'a> {
 
    type Item = &'a DataMessage;
 

	
 
    fn next(&mut self) -> Option<Self::Item> {
 
        // Loop until match is found or at end of messages
 
        while self.next_index < self.max_index {
 
            let message = &self.messages[self.next_index];
 
            if let Message::Data(message) = &message {
 
                if message.data_header.target_port == self.match_port_id {
 
                    // Found a match
 
                    self.next_index += 1;
 
                    return Some(message);
 
                }
 
            } else {
 
                // Unreachable because:
 
                //  1. We only iterate over messages that were previously retrieved by `read_next_message`.
 
                //  2. Inbox does not contain control/ping messages.
 
                //  3. If `read_next_message` encounters anything else than a data message, it is removed from the inbox.
 
                unreachable!();
 
            }
 

	
 
            self.next_index += 1;
 
        }
 

	
 
        // No more messages
 
        return None;
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Control messages
 
// -----------------------------------------------------------------------------
 

	
 
struct ControlEntry {
 
    id: u32,
 
    variant: ControlVariant,
 
}
 

	
 
enum ControlVariant {
 
    ChangedPort(ControlChangedPort),
 
    ClosedChannel(ControlClosedChannel),
 
}
 

	
 
struct ControlChangedPort {
 
    target_port: PortIdLocal,       // if send to this port, then reroute
 
    source_connector: ConnectorId,  // connector we expect messages from
 
    target_connector: ConnectorId,  // connector we need to reroute to
 
}
 

	
 
struct ControlClosedChannel {
 
    source_port: PortIdLocal,
 
    target_port: PortIdLocal,
 
}
 

	
 
pub(crate) struct ControlMessageHandler {
 
    id_counter: u32,
 
    active: Vec<ControlEntry>,
 
}
 

	
 
impl ControlMessageHandler {
 
    pub fn new() -> Self {
 
        ControlMessageHandler {
 
            id_counter: 0,
 
            active: Vec::new(),
 
        }
 
    }
src/runtime2/tests/mod.rs
Show inline comments
 
mod network_shapes;
 
mod api_component;
 
mod speculation;
 
mod data_transmission;
 
mod sync_failure;
 

	
 
use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 
use crate::runtime2::native::{ApplicationSyncAction};
 

	
 
// Generic testing constants, use when appropriate to simplify stress-testing
 
// pub(crate) const NUM_THREADS: u32 = 3;     // number of threads in runtime
 
// pub(crate) const NUM_INSTANCES: u32 = 7;   // number of test instances constructed
 
// pub(crate) const NUM_LOOPS: u32 = 8;       // number of loops within a single test (not used by all tests)
 

	
 
pub(crate) const NUM_THREADS: u32 = 1;
 
pub(crate) const NUM_INSTANCES: u32 = 1;
 
pub(crate) const NUM_LOOPS: u32 = 1;
 
pub(crate) const NUM_INSTANCES: u32 = 5;
 
pub(crate) const NUM_LOOPS: u32 = 5;
 

	
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    return runtime;
 
}
 

	
 
fn run_test_in_runtime<F: Fn(&mut ApplicationInterface)>(pdl: &str, constructor: F) {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes())
 
        .expect("parse PDL");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    let mut api = runtime.create_interface();
 
    for _ in 0..NUM_INSTANCES {
 
        constructor(&mut api);
 
    }
 
}
 

	
 
pub(crate) struct TestTimer {
 
    name: &'static str,
 
    started: std::time::Instant
 
}
 

	
 
impl TestTimer {
 
    pub(crate) fn new(name: &'static str) -> Self {
 
        Self{ name, started: std::time::Instant::now() }
 
    }
 
}
 

	
 
impl Drop for TestTimer {
 
    fn drop(&mut self) {
 
        let delta = std::time::Instant::now() - self.started;
 
        let nanos = (delta.as_secs_f64() * 1_000_000.0) as u64;
 
        let millis = nanos / 1000;
 
        let nanos = nanos % 1000;
 
        println!("[{}] Took {:>4}.{:03} ms", self.name, millis, nanos);
 
    }
 
}
src/runtime2/tests/sync_failure.rs
Show inline comments
 
// sync_failure.rs
 
//
 
// Various tests to ensure that failing components fail in a consistent way.
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_local_sync_failure() {
 
    // If the component exits cleanly, then the runtime exits cleanly, and the
 
    // test will finish
 
    const CODE: &'static str = "
 
    primitive immediate_failure_inside_sync() {
 
        u32[] only_allows_index_0 = { 1 };
 
        while (true) sync { // note the infinite loop
 
            auto value = only_allows_index_0[1];
 
        }
 
    }
 

	
 
    primitive immediate_failure_outside_sync() {
 
        u32[] only_allows_index_0 = { 1 };
 
        auto never_gonna_get = only_allows_index_0[1];
 
        while (true) sync {}
 
    }
 
    ";
 

	
 
    // let thing = TestTimer::new("local_sync_failure");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "immediate_failure_outside_sync", ValueGroup::new_stack(Vec::new()))
 
            .expect("create component");
 

	
 
        api.create_connector("", "immediate_failure_inside_sync", ValueGroup::new_stack(Vec::new()))
 
            .expect("create component");
 
    })
 
}
 

	
 
#[test]
 
fn test_shared_sync_failure() {
 
    // Same as above. One of the components should fail, the other should follow
 
    // suit because it cannot complete a sync round.
 
    // suit because it cannot complete a sync round. We intentionally have an
 
    // infinite loop in the while condition because we need at least two loops
 
    // for the last error to get picked up.
 
    const CODE: &'static str = "
 
    enum Location { BeforeSync, AfterPut, AfterGet, AfterSync, Never }
 
    primitive failing_at_location(in<bool> input, out<bool> output, Location loc) {
 
        u32[] failure_array = {};
 
        while (true) {
 
            if (loc == Location::BeforeSync) failure_array[0];
 
            sync {
 
                put(output, true);
 
                if (loc == Location::AfterPut) failure_array[0];
 
                auto received = get(input);
 
                assert(received);
 
                if (loc == Location::AfterGet) failure_array[0];
 
            }
 
            if (loc == Location::AfterSync) failure_array[0];
 
        }
 
    }
 

	
 
    composite constructor(Location loc) {
 
        channel output_a -> input_a;
 
        channel output_b -> input_b;
 
        new failing_at_location(input_a, output_b, Location::Never);
 
        new failing_at_location(input_b, output_a, loc);
 
    }
 
    ";
 

	
 
    run_test_in_runtime(CODE, |api| {
 
        for variant in 0..1 {
 
        for variant in 0..4 { // all `Location` enum variants, except `Never`.
 
            // Create the channels
 
            api.create_connector("", "constructor", ValueGroup::new_stack(vec![
 
                Value::Enum(variant)
 
            ])).expect("create connector");
 
        }
 
    })
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)