Changeset - 32d91577e090
[Not reviewed]
0 10 0
MH - 4 years ago 2021-11-09 17:39:28
contact@maxhenger.nl
initial multithreaded runtime
10 files changed with 125 insertions and 44 deletions:
0 comments (0 inline, 0 general)
src/collections/raw_vec.rs
Show inline comments
 
@@ -70,7 +70,7 @@ impl<T: Sized> RawVec<T> {
 
            debug_assert!(additional > 0);
 
            let new_cap = self.len.checked_add(additional).unwrap();
 
            let new_cap = cmp::max(new_cap, self.cap * Self::GROWTH_RATE);
 
            
 

	
 
            let layout = Layout::array::<T>(new_cap)
 
                .map_err(|_| AllocError::CapacityOverflow)?;
 
            debug_assert_eq!(new_cap * Self::T_SIZE, layout.size());
 
@@ -83,7 +83,7 @@ impl<T: Sized> RawVec<T> {
 
                    let old_base = self.base as *mut u8;
 
                    let (old_size, old_layout) = self.current_layout();
 

	
 
                    ptr::copy_nonoverlapping(new_base, old_base, old_size);
 
                    ptr::copy_nonoverlapping(old_base, new_base, old_size);
 
                    dealloc(old_base, old_layout);
 
                }
 

	
src/collections/sets.rs
Show inline comments
 
@@ -93,6 +93,11 @@ impl<T: Eq> VecSet<T> {
 
        return self.inner.iter();
 
    }
 

	
 
    #[inline]
 
    pub fn contains(&self, item: &T) -> bool {
 
        return self.inner.contains(item);
 
    }
 

	
 
    #[inline]
 
    pub fn is_empty(&self) -> bool {
 
        self.inner.is_empty()
src/runtime2/branch.rs
Show inline comments
 
@@ -273,8 +273,9 @@ impl ExecTree {
 
    pub fn fork_branch(&mut self, parent_branch_id: BranchId) -> BranchId {
 
        debug_assert!(self.is_in_sync());
 
        let parent_branch = &self[parent_branch_id];
 
        let new_branch = Branch::new_sync(1, parent_branch);
 
        let new_branch = Branch::new_sync(self.branches.len() as u32, parent_branch);
 
        let new_branch_id = new_branch.id;
 
        self.branches.push(new_branch);
 

	
 
        return new_branch_id;
 
    }
src/runtime2/connector2.rs
Show inline comments
 
@@ -34,6 +34,7 @@ use crate::common::ComponentState;
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::protocol::{RunContext, RunResult};
 
use crate::runtime2::consensus::find_ports_in_value_group;
 
use crate::runtime2::inbox2::DataContent;
 
use crate::runtime2::port::PortKind;
 

	
 
use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState};
 
@@ -110,10 +111,11 @@ impl Connector for ConnectorPDL {
 
        if self.tree.is_in_sync() {
 
            let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 
            if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) {
 
                todo!("call handler");
 
                self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx);
 
                return ConnectorScheduling::Immediate;
 
            } else {
 
                return scheduling
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx);
 
            return scheduling;
 
@@ -154,8 +156,8 @@ impl ConnectorPDL {
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.clone());
 
            self.consensus.notify_of_received_message(branch_id, &message.data_header, &message.content);
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content);
 

	
 
            // And prepare the branch for running
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
@@ -164,7 +166,7 @@ impl ConnectorPDL {
 

	
 
    pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) {
 
        if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) {
 

	
 
            self.collapse_sync_to_solution_branch(solution_branch_id, ctx);
 
        }
 
    }
 

	
 
@@ -239,16 +241,17 @@ impl ConnectorPDL {
 
                    // a message that targets this branch, so check now.
 
                    let mut any_branch_received = false;
 
                    for message in comp_ctx.get_read_data_messages(port_id) {
 
                        if self.consensus.branch_can_receive(branch_id, &message.data_header) {
 
                        if self.consensus.branch_can_receive(branch_id, &message.data_header, &message.content) {
 
                            // This branch can receive the message, so we do the
 
                            // fork-and-receive dance
 
                            let recv_branch_id = self.tree.fork_branch(branch_id);
 
                            let branch = &mut self.tree[recv_branch_id];
 
                            branch.insert_message(port_id, message.content.clone());
 
                            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                            let branch = &mut self.tree[receiving_branch_id];
 

	
 
                            branch.insert_message(port_id, message.content.as_message().unwrap().clone());
 

	
 
                            self.consensus.notify_of_new_branch(branch_id, recv_branch_id);
 
                            self.consensus.notify_of_received_message(recv_branch_id, &message.data_header, &message.content);
 
                            self.tree.push_into_queue(QueueKind::Runnable, recv_branch_id);
 
                            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                            self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content);
 
                            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                            any_branch_received = true;
 
                        }
 
@@ -267,7 +270,7 @@ impl ConnectorPDL {
 
                    branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                    self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
 
                } else if consistency == Consistency::Inconsistent {
 
                    branch.sync_state == SpeculativeState::Inconsistent;
 
                    branch.sync_state = SpeculativeState::Inconsistent;
 
                }
 
            },
 
            RunResult::BranchPut(port_id, content) => {
 
@@ -278,7 +281,8 @@ impl ConnectorPDL {
 
                    // `put()` is valid.
 
                    let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                    comp_ctx.submit_message(MessageFancy::Data(DataMessageFancy{
 
                        sync_header, data_header, content
 
                        sync_header, data_header,
 
                        content: DataContent::Message(content),
 
                    }));
 

	
 
                    self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
@@ -324,6 +328,7 @@ impl ConnectorPDL {
 
                comp_ctx.notify_sync_start();
 
                let sync_branch_id = self.tree.start_sync();
 
                self.consensus.start_sync(comp_ctx);
 
                self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
src/runtime2/consensus.rs
Show inline comments
 
use crate::collections::VecSet;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::inbox2::DataContent;
 

	
 
use super::branch::{BranchId, ExecTree, QueueKind};
 
use super::ConnectorId;
 
@@ -42,15 +43,18 @@ pub(crate) struct GlobalSolution {
 
// TODO: A lot of stuff should be batched. Like checking all the sync headers
 
//  and sending "I have a higher ID" messages.
 
pub(crate) struct Consensus {
 
    // --- State that is cleared after each round
 
    // Local component's state
 
    highest_connector_id: ConnectorId,
 
    branch_annotations: Vec<BranchAnnotation>,
 
    last_finished_handled: Option<BranchId>,
 
    // Gathered state (in case we are currently the leader of the distributed
 
    // consensus protocol)
 
    encountered_peers: VecSet<ConnectorId>,
 
    // Gathered state from communication
 
    encountered_peers: VecSet<ConnectorId>, // to determine when we should send "found a higher ID" messages.
 
    encountered_ports: VecSet<PortIdLocal>, // to determine if we should send "port remains silent" messages.
 
    solution_combiner: SolutionCombiner,
 
    // Workspaces
 
    // --- Persistent state
 
    // TODO: Tracking sync round numbers
 
    // --- Workspaces
 
    workspace_ports: Vec<PortIdLocal>,
 
}
 

	
 
@@ -67,6 +71,7 @@ impl Consensus {
 
            branch_annotations: Vec::new(),
 
            last_finished_handled: None,
 
            encountered_peers: VecSet::new(),
 
            encountered_ports: VecSet::new(),
 
            solution_combiner: SolutionCombiner::new(),
 
            workspace_ports: Vec::new(),
 
        }
 
@@ -188,9 +193,32 @@ impl Consensus {
 
            let mut target_mapping = Vec::with_capacity(source_mapping.len());
 

	
 
            for port in source_mapping {
 
                // Note: if the port is silent, and we've never communicated
 
                // over the port, then we need to do so now, to let the peer
 
                // component know about our sync leader state.
 
                let port_desc = ctx.get_port_by_id(port.port_id).unwrap();
 
                let peer_port_id = port_desc.peer_id;
 
                let channel_id = port_desc.channel_id;
 

	
 
                if !self.encountered_ports.contains(&port.port_id) {
 
                    ctx.submit_message(MessageFancy::Data(DataMessageFancy{
 
                        sync_header: SyncHeader{
 
                            sending_component_id: ctx.id,
 
                            highest_component_id: self.highest_connector_id,
 
                        },
 
                        data_header: DataHeader{
 
                            expected_mapping: source_mapping.clone(),
 
                            sending_port: port.port_id,
 
                            target_port: peer_port_id,
 
                            new_mapping: BranchId::new_invalid(),
 
                        },
 
                        content: DataContent::SilentPortNotification,
 
                    }));
 
                    self.encountered_ports.push(port.port_id);
 
                }
 

	
 
                target_mapping.push((
 
                    port_desc.channel_id,
 
                    channel_id,
 
                    port.registered_id.unwrap_or(BranchId::new_invalid())
 
                ));
 
            }
 
@@ -229,6 +257,7 @@ impl Consensus {
 
        self.branch_annotations.clear();
 
        self.last_finished_handled = None;
 
        self.encountered_peers.clear();
 
        self.encountered_ports.clear();
 
        self.solution_combiner.clear();
 
    }
 

	
 
@@ -238,11 +267,10 @@ impl Consensus {
 
    /// sending the message is consistent with the speculative state.
 
    pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtxFancy) -> (SyncHeader, DataHeader) {
 
        debug_assert!(self.is_in_sync());
 
        let sync_header = self.create_sync_header(ctx);
 

	
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 

	
 
        if cfg!(debug_assertions) {
 
            // Check for consistent mapping
 
            let port = branch.port_mapping.iter()
 
                .find(|v| v.port_id == source_port_id)
 
                .unwrap();
 
@@ -257,17 +285,19 @@ impl Consensus {
 
            self.workspace_ports.clear();
 
        }
 

	
 
        // Construct data header
 
        // TODO: Handle multiple firings. Right now we just assign the current
 
        //  branch to the `None` value because we know we can only send once.
 
        debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none());
 
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
 
        let data_header = DataHeader{
 
            expected_mapping: branch.port_mapping.clone(),
 
            sending_port: port_info.peer_id,
 
            sending_port: port_info.self_id,
 
            target_port: port_info.peer_id,
 
            new_mapping: branch_id
 
        };
 

	
 
        // Update port mapping
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == source_port_id {
 
                mapping.expected_firing = Some(true);
 
@@ -275,7 +305,9 @@ impl Consensus {
 
            }
 
        }
 

	
 
        return (sync_header, data_header);
 
        self.encountered_ports.push(source_port_id);
 

	
 
        return (self.create_sync_header(ctx), data_header);
 
    }
 

	
 
    /// Handles a new data message by handling the data and sync header, and
 
@@ -286,7 +318,7 @@ impl Consensus {
 
    /// 2. We return the branches that *can* receive the message, you still
 
    ///     have to explicitly call `notify_of_received_message`.
 
    pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessageFancy, ctx: &mut ComponentCtxFancy, target_ids: &mut Vec<BranchId>) {
 
        self.handle_received_data_header(exec_tree, &message.data_header, target_ids);
 
        self.handle_received_data_header(exec_tree, &message.data_header, &message.content, target_ids);
 
        self.handle_received_sync_header(&message.sync_header, ctx);
 
    }
 

	
 
@@ -318,8 +350,9 @@ impl Consensus {
 
        }
 
    }
 

	
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &ValueGroup) {
 
        debug_assert!(self.branch_can_receive(branch_id, data_header));
 
    pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &DataContent) {
 
        debug_assert!(self.branch_can_receive(branch_id, data_header, content));
 

	
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.port_mapping {
 
            if mapping.port_id == data_header.target_port {
 
@@ -328,7 +361,7 @@ impl Consensus {
 

	
 
                // Check for sent ports
 
                debug_assert!(self.workspace_ports.is_empty());
 
                find_ports_in_value_group(content, &mut self.workspace_ports);
 
                find_ports_in_value_group(content.as_message().unwrap(), &mut self.workspace_ports);
 
                if !self.workspace_ports.is_empty() {
 
                    todo!("handle received ports");
 
                    self.workspace_ports.clear();
 
@@ -345,7 +378,12 @@ impl Consensus {
 

	
 
    /// Matches the mapping between the branch and the data message. If they
 
    /// match then the branch can receive the message.
 
    pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool {
 
    pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader, content: &DataContent) -> bool {
 
        if let DataContent::SilentPortNotification = content {
 
            // No port can receive a "silent" notification.
 
            return false;
 
        }
 

	
 
        let annotation = &self.branch_annotations[branch_id.index as usize];
 
        for expected in &data_header.expected_mapping {
 
            // If we own the port, then we have an entry in the
 
@@ -369,12 +407,12 @@ impl Consensus {
 
    /// Checks data header and consults the stored port mapping and the
 
    /// execution tree to see which branches may receive the data message's
 
    /// contents.
 
    fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec<BranchId>) {
 
    fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, content: &DataContent, target_ids: &mut Vec<BranchId>) {
 
        for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) {
 
            if branch.awaiting_port == data_header.target_port {
 
                // Found a branch awaiting the message, but we need to make sure
 
                // the mapping is correct
 
                if self.branch_can_receive(branch.id, data_header) {
 
                if self.branch_can_receive(branch.id, data_header, content) {
 
                    target_ids.push(branch.id);
 
                }
 
            }
 
@@ -419,6 +457,8 @@ impl Consensus {
 
    }
 

	
 
    fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) -> Option<BranchId> {
 
        println!("DEBUG [....:.. conn:{:02}]: Storing local solution for component {}, branch {}", ctx.id.0, solution.component.0, solution.final_branch_id.index);
 

	
 
        if self.highest_connector_id == ctx.id {
 
            // We are the leader
 
            if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) {
src/runtime2/inbox2.rs
Show inline comments
 
@@ -32,13 +32,34 @@ pub(crate) struct DataHeader {
 
    pub new_mapping: BranchId,
 
}
 

	
 
// TODO: Very much on the fence about this. On one hand I thought making it a
 
//  data message was neat because "silent port notification" should be rerouted
 
//  like any other data message to determine the component ID of the receiver
 
//  and to make it part of the leader election algorithm for the sync leader.
 
//  However: it complicates logic quite a bit. Really it might be easier to
 
//  create `Message::SyncAtComponent` and `Message::SyncAtPort` messages...
 
#[derive(Debug, Clone)]
 
pub(crate) enum DataContent {
 
    SilentPortNotification,
 
    Message(ValueGroup),
 
}
 

	
 
impl DataContent {
 
    pub(crate) fn as_message(&self) -> Option<&ValueGroup> {
 
        match self {
 
            DataContent::SilentPortNotification => None,
 
            DataContent::Message(message) => Some(message),
 
        }
 
    }
 
}
 

	
 
/// A data message is a message that is intended for the receiver's PDL code,
 
/// but will also be handled by the consensus algorithm
 
#[derive(Debug, Clone)]
 
pub(crate) struct DataMessageFancy {
 
    pub sync_header: SyncHeader,
 
    pub data_header: DataHeader,
 
    pub content: ValueGroup,
 
    pub content: DataContent,
 
}
 

	
 
#[derive(Debug)]
src/runtime2/mod.rs
Show inline comments
 
@@ -389,6 +389,7 @@ impl ConnectorStore {
 
    /// Retrieves public part of connector - accessible by many threads at once.
 
    fn get_public(&self, id: ConnectorId) -> &'static ConnectorPublic {
 
        unsafe {
 
            debug_assert!(!self.free.contains(&(id.0 as usize)));
 
            let connector = self.connectors.get(id.0 as usize);
 
            debug_assert!(!connector.is_null());
 
            return &(**connector).public;
 
@@ -399,6 +400,7 @@ impl ConnectorStore {
 
    /// time.
 
    fn get_private(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
 
        unsafe {
 
            debug_assert!(!self.free.contains(&(key.index as usize)));
 
            let connector = self.connectors.get_mut(key.index as usize);
 
            debug_assert!(!connector.is_null());
 
            return &mut (**connector);
src/runtime2/native.rs
Show inline comments
 
@@ -188,7 +188,7 @@ impl ApplicationInterface {
 
        connector.inbox.insert_message(MessageFancy::Control(ControlMessageFancy{
 
            id: 0,
 
            sending_component_id: self.connector_id,
 
            content: ControlContent::Ack
 
            content: ControlContent::Ping,
 
        }));
 

	
 
        let should_wake_up = connector.sleeping
src/runtime2/scheduler.rs
Show inline comments
 
@@ -130,7 +130,7 @@ impl Scheduler {
 
            // Check if the message has to be rerouted because we have moved the
 
            // target port to another component.
 
            self.debug_conn(connector_id, &format!("Handling message\n --- {:?}", message));
 
            if let Some(target_port) = Self::get_data_message_target_port(&message) {
 
            if let Some(target_port) = Self::get_message_target_port(&message) {
 
                if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
 
                    self.debug_conn(connector_id, " ... Rerouting the message");
 
                    self.runtime.send_message(other_component_id, message);
 
@@ -221,7 +221,7 @@ impl Scheduler {
 
                    // the sender must make sure it actually wants to send to
 
                    // the specified component (and is not using an inconsistent
 
                    // component ID associated with a port).
 
                    content.sync_header.highest_component_id
 
                    content.target_component_id
 
                },
 
                MessageFancy::Control(_) => {
 
                    unreachable!("component sending control messages directly");
 
@@ -246,7 +246,7 @@ impl Scheduler {
 
                        let mut message_idx = 0;
 
                        while message_idx < scheduled.ctx_fancy.inbox_messages.len() {
 
                            let message = &scheduled.ctx_fancy.inbox_messages[message_idx];
 
                            if Self::get_data_message_target_port(message) == Some(port_id) {
 
                            if Self::get_message_target_port(message) == Some(port_id) {
 
                                // Need to transfer this message
 
                                let message = scheduled.ctx_fancy.inbox_messages.remove(message_idx);
 
                                new_connector.ctx_fancy.inbox_messages.push(message);
 
@@ -330,9 +330,17 @@ impl Scheduler {
 
    }
 

	
 
    #[inline]
 
    fn get_data_message_target_port(message: &MessageFancy) -> Option<PortIdLocal> {
 
        if let MessageFancy::Data(message) = message {
 
            return Some(message.data_header.target_port)
 
    fn get_message_target_port(message: &MessageFancy) -> Option<PortIdLocal> {
 
        match message {
 
            MessageFancy::Data(data) => return Some(data.data_header.target_port),
 
            MessageFancy::Sync(_) => {},
 
            MessageFancy::Control(control) => {
 
                match &control.content {
 
                    ControlContent::PortPeerChanged(port_id, _) => return Some(*port_id),
 
                    ControlContent::CloseChannel(port_id) => return Some(*port_id),
 
                    ControlContent::Ping | ControlContent::Ack => {},
 
                }
 
            },
 
        }
 

	
 
        return None
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -5,7 +5,7 @@ use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 

	
 
const NUM_THREADS: u32 = 1;     // number of threads in runtime
 
const NUM_THREADS: u32 = 10;  // number of threads in runtime
 
const NUM_INSTANCES: u32 = 10;  // number of test instances constructed
 
const NUM_LOOPS: u32 = 10;      // number of loops within a single test (not used by all tests)
 

	
 
@@ -50,7 +50,6 @@ fn test_put_and_get() {
 
                print(\"getting!\");
 
                auto result = get(receiver);
 
                assert(result);
 

	
 
            }
 
            index += 1;
 
        }
0 comments (0 inline, 0 general)