Changeset - bf4c0ee5ba65
[Not reviewed]
0 7 0
mh - 3 years ago 2022-01-25 16:46:08
contact@maxhenger.nl
WIP: Changed component creation logic once more
7 files changed with 389 insertions and 108 deletions:
0 comments (0 inline, 0 general)
src/runtime2/communication.rs
Show inline comments
 
@@ -20,25 +20,25 @@ impl PortId {
 
pub struct Peer {
 
    pub id: CompId,
 
    pub num_associated_ports: u32,
 
    pub(crate) handle: CompHandle,
 
}
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub enum PortState {
 
    Open,
 
    Blocked,
 
    Closed,
 
}
 

	
 
pub struct Port {
 
    pub self_id: PortId,
 
    pub peer_id: PortId,
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    pub peer_comp_id: CompId,
 
@@ -180,15 +180,25 @@ pub enum Message {
 

	
 
impl Message {
 
    pub(crate) fn target_port(&self) -> Option<PortId> {
 
        match self {
 
            Message::Data(v) =>
 
                return Some(v.data_header.target_port),
 
            Message::Control(v) =>
 
                return v.target_port_id,
 
            Message::Sync(_) =>
 
                return None,
 
        }
 
    }
 

	
 
    pub(crate) fn modify_target_port(&mut self, port_id: PortId) {
 
        match self {
 
            Message::Data(v) =>
 
                v.data_header.target_port = port_id,
 
            Message::Control(v) =>
 
                v.target_port_id = Some(port_id),
 
            Message::Sync(_) => unreachable!(), // should never be called for this message type
 
        }
 
    }
 
}
 

	
 

	
src/runtime2/component/component_pdl.rs
Show inline comments
 
use crate::protocol::*;
 
use crate::protocol::ast::DefinitionId;
 
use crate::protocol::eval::{
 
    PortId as EvalPortId, Prompt,
 
    ValueGroup, Value,
 
    EvalContinuation, EvalResult, EvalError
 
};
 

	
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::scheduler::SchedulerCtx;
 
use crate::runtime2::communication::*;
 

	
 
use super::*;
 
use super::control_layer::*;
 
@@ -19,63 +20,122 @@ pub enum CompScheduling {
 
    Sleep,
 
    Exit,
 
}
 

	
 
pub struct CompCtx {
 
    pub id: CompId,
 
    pub ports: Vec<Port>,
 
    pub peers: Vec<Peer>,
 
    pub messages: Vec<Option<DataMessage>>, // same size as "ports"
 
    pub port_id_counter: u32,
 
}
 

	
 
impl Default for CompCtx {
 
    fn default() -> Self {
 
impl CompCtx {
 
    pub(crate) fn new(reservation: &CompReserved) -> Self {
 
        return Self{
 
            id: CompId(0),
 
            id: reservation.id(),
 
            ports: Vec::new(),
 
            peers: Vec::new(),
 
            messages: Vec::new(),
 
            port_id_counter: 0,
 
        }
 
    }
 
}
 

	
 
struct MessageView<'a> {
 
    index: usize,
 
    pub message: &'a DataMessage,
 
}
 

	
 
impl CompCtx {
 
    /// Creates a new channel that is fully owned by the component associated
 
    /// with this context.
 
    fn create_channel(&mut self) -> Channel {
 
        let putter_id = PortId(self.take_port_id());
 
        let getter_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id: putter_id,
 
            peer_id: getter_id,
 
            kind: PortKind::Putter,
 
            state: PortState::Open,
 
            peer_comp_id: self.id,
 
        });
 
        self.ports.push(Port{
 
            self_id: getter_id,
 
            peer_id: putter_id,
 
            kind: PortKind::Getter,
 
            state: PortState::Closed,
 
            peer_comp_id: self.id,
 
        });
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
    /// Adopts a port transferred by another component. Essentially copies all
 
    /// port data but creates a new ID. Caller should ensure that the other
 
    /// endpoint becomes aware of this ID.
 
    fn adopt_port(&mut self, to_transfer: &Port) -> &mut Port {
 
        let port_id = PortId(self.take_port_id());
 
        let port_index = self.ports.len();
 
        self.ports.push(Port{
 
            self_id: port_id,
 
            peer_id: to_transfer.peer_id,
 
            kind: to_transfer.kind,
 
            state: to_transfer.state,
 
            peer_comp_id: to_transfer.peer_comp_id,
 
        });
 
        return &mut self.ports[port_index];
 
    }
 

	
 
    /// Adds a peer (or increments the "associated port" counter). Hence caller
 
    /// must make sure that this makes sense.
 
    fn add_peer(&mut self, sched_ctx: &SchedulerCtx, peer_id: CompId, peer_handle: Option<&CompHandle>) {
 
        match self.get_peer_index(peer_id) {
 
            Some(peer_index) => {
 
                let peer_info = &mut self.peers[peer_index];
 
                peer_info.num_associated_ports += 1;
 
            },
 
            None => {
 
                let handle = if let Some(handle) = peer_handle {
 
                    handle.clone()
 
                } else {
 
                    sched_ctx.runtime.get_component_public(peer_id)
 
                };
 

	
 
                self.peers.push(Peer{
 
                    id: peer_id,
 
                    num_associated_ports: 1,
 
                    handle,
 
                })
 
            }
 
        }
 
    }
 

	
 
    /// Removes a peer (or decrements the "associated port" counter). If there
 
    /// are no more references to the peer then the handle will be destroyed.
 
    fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, peer_id: CompId) {
 
        let peer_index = self.get_peer_index(peer_id).unwrap();
 
        let peer_info = &mut self.peers[peer_index];
 
        peer_info.num_associated_ports -= 1;
 

	
 
        if peer_info.num_associated_ports == 0 {
 
            let mut peer = self.peers.remove(peer_index);
 
            let should_remove = peer.handle.decrement_users();
 
            if should_remove {
 
                let key = unsafe{ peer.id.upgrade() };
 
                sched_ctx.runtime.destroy_component(key);
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn get_port(&self, port_id: PortId) -> &Port {
 
        let index = self.get_port_index(port_id).unwrap();
 
        return &self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_mut(&mut self, port_id: PortId) -> &mut Port {
 
        let index = self.get_port_index(port_id).unwrap();
 
        return &mut self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_index(&self, port_id: PortId) -> Option<usize> {
 
        for (index, port) in self.ports.iter().enumerate() {
 
@@ -228,27 +288,27 @@ impl CompPDL {
 
            prompt: initial_state,
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
            sync_counter: 0,
 
            exec_ctx: ExecCtx{
 
                stmt: ExecStmt::None,
 
            },
 
            inbox_main,
 
            inbox_backup: Vec::new(),
 
        }
 
    }
 

	
 
    pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
    pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
 
        sched_ctx.log(&format!("handling message: {:?}", message));
 
        if let Some(new_target) = self.control.should_reroute(&message) {
 
        if let Some(new_target) = self.control.should_reroute(&mut message) {
 
            let mut target = sched_ctx.runtime.get_component_public(new_target);
 
            target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            let _should_remove = target.decrement_users();
 
            debug_assert!(!_should_remove);
 
            return;
 
        }
 

	
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
@@ -324,33 +384,28 @@ impl CompPDL {
 
            // Results that can be returned outside of sync mode
 
            EC::ComponentTerminated => {
 
                self.handle_component_exit(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Exit);
 
            },
 
            EC::SyncBlockStart => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                self.handle_sync_start(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 

	
 
                let mut ports = Vec::new(); // TODO: Optimize
 
                let protocol = &sched_ctx.runtime.protocol;
 
                find_ports_in_value_group(&arguments, &mut ports);
 
                let prompt = Prompt::new(
 
                    &protocol.types, &protocol.heap,
 
                self.create_component_and_transfer_ports2(
 
                    sched_ctx, comp_ctx,
 
                    definition_id, monomorph_idx, arguments
 
                );
 
                self.create_component_and_transfer_ports(sched_ctx, comp_ctx, prompt, &ports);
 
                return Ok(CompScheduling::Requeue);
 
            },
 
            EC::NewChannel => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 
                let channel = comp_ctx.create_channel();
 
                self.exec_ctx.stmt = ExecStmt::CreatedChannel((
 
                    Value::Output(port_id_to_eval(channel.putter_id)),
 
                    Value::Input(port_id_to_eval(channel.getter_id))
 
                ));
 
                self.inbox_main.push(None);
 
                self.inbox_main.push(None);
 
@@ -626,95 +681,152 @@ impl CompPDL {
 
            // We were blocked on the port that just became unblocked, so
 
            // send the message.
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            let mut replacement = ValueGroup::default();
 
            std::mem::swap(&mut replacement, &mut self.mode_value);
 
            self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_id, replacement);
 

	
 
            self.mode = Mode::Sync;
 
            self.mode_port = PortId::new_invalid();
 
        }
 
    }
 

	
 
    fn create_component_and_transfer_ports(&mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, prompt: Prompt, ports: &[PortId]) {
 
        let component = CompPDL::new(prompt, ports.len());
 
        let (comp_key, component) = sched_ctx.runtime.create_pdl_component(component, true);
 
        let created_ctx = &mut component.ctx;
 

	
 
        let mut has_reroute_entry = false;
 
        let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
 

	
 
        for port_id in ports.iter().copied() {
 
            // Create temporary reroute entry if the peer is another component
 
            let port_info = creator_ctx.get_port(port_id);
 
            debug_assert_ne!(port_info.state, PortState::Blocked);
 
            if port_info.peer_comp_id == creator_ctx.id {
 
                // We own the peer port. So retrieve it and modify the peer directly
 
                let peer_port_id = port_info.peer_id;
 
                let port_info = creator_ctx.get_port_mut(peer_port_id);
 
                port_info.peer_comp_id = created_ctx.id;
 
                Self::add_peer_associated_port_to_component(sched_ctx, creator_ctx, created_ctx.id);
 
    fn create_component_and_transfer_ports2(
 
        &mut self,
 
        sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx,
 
        definition_id: DefinitionId, monomorph_index: i32, mut arguments: ValueGroup
 
    ) {
 
        struct PortPair{ creator: PortId, created: PortId }
 
        let mut port_id_pairs = Vec::new();
 

	
 
        let reservation = sched_ctx.runtime.start_create_pdl_component();
 
        let mut created_ctx = CompCtx::new(&reservation);
 

	
 
        // Take all the ports ID that are in the `args` (and currently belong to
 
        // the creator component) and translate them into new IDs that are
 
        // associated with the component we're about to create
 
        let mut arg_iter = ValueGroupIter::new(&mut arguments);
 
        while let Some(port_reference) = arg_iter.next() {
 
            // Create port entry for new component
 
            let creator_port_id = port_reference.id;
 
            let creator_port = creator_ctx.get_port(creator_port_id);
 
            let created_port = created_ctx.adopt_port(creator_port);
 
            let created_port_id = created_port.self_id;
 

	
 
            port_id_pairs.push(PortPair{
 
                creator: creator_port_id,
 
                created: created_port_id,
 
            });
 

	
 
            // Modify value in arguments (bit dirty, but double vec in ValueGroup causes lifetime issues)
 
            let arg_value = if let Some(heap_pos) = port_reference.heap_pos {
 
                &mut arg_iter.group.regions[heap_pos][port_reference.index]
 
            } else {
 
                // We don't own the peer port, so send the appropriate messages
 
                // to the peer component and notify the control layer
 
                has_reroute_entry = true;
 
                let message = self.control.add_reroute_entry(
 
                    creator_ctx.id, port_info.peer_id, port_info.peer_comp_id,
 
                    port_info.self_id, created_ctx.id, schedule_entry_id
 
                );
 
                let peer_info = creator_ctx.get_peer(port_info.peer_comp_id);
 
                peer_info.handle.send_message(sched_ctx, message, true);
 
                &mut arg_iter.group.values[port_reference.index]
 
            };
 
            match arg_value {
 
                Value::Input(id) => *id = port_id_to_eval(created_port_id),
 
                Value::Output(id) => *id = port_id_to_eval(created_port_id),
 
                _ => unreachable!(),
 
            }
 
        }
 

	
 
            // Take out any potential messages for the peer
 
            let creator_port_index = creator_ctx.get_port_index(port_id).unwrap();
 
            let port_main_message = self.inbox_main[creator_port_index].take();
 
        // For each transferred port pair set their peer components to the
 
        // correct values. This will only change the values for the ports of
 
        // the new component.
 
        let mut created_component_has_remote_peers = false;
 

	
 
        for pair in port_id_pairs.iter() {
 
            let creator_port_info = creator_ctx.get_port(pair.creator);
 
            let created_port_info = created_ctx.get_port_mut(pair.created);
 

	
 
            if created_port_info.peer_comp_id == creator_ctx.id {
 
                // Port peer is owned by the creator as well
 
                let created_peer_port_index = port_id_pairs
 
                    .iter()
 
                    .position(|v| v.creator == creator_port_info.peer_id);
 
                match created_peer_port_index {
 
                    Some(created_peer_port_index) => {
 
                        // Peer port moved to the new component as well
 
                        let peer_pair = &port_id_pairs[created_peer_port_index];
 
                        created_port_info.peer_id = peer_pair.created;
 
                        created_port_info.peer_comp_id = reservation.id();
 
                    },
 
                    None => {
 
                        // Peer port remains with creator component.
 
                        created_port_info.peer_comp_id = creator_ctx.id;
 
                        created_ctx.add_peer(sched_ctx, creator_ctx.id, None);
 
                    }
 
                }
 
            } else {
 
                // Peer is a different component
 
                let peer_info = creator_ctx.get_peer(created_port_info.peer_comp_id);
 
                created_ctx.add_peer(sched_ctx, peer_info.id, Some(&peer_info.handle));
 
                created_component_has_remote_peers = true;
 
            }
 
        }
 

	
 
            // Transfer port and create temporary reroute entry
 
            let (port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id);
 
            if port_info.state == PortState::Blocked {
 
                todo!("Think about this when you're not tired!");
 
        // We'll now actually turn our reservation for a new component into an
 
        // actual component. Note that we initialize it as "not sleeping" as
 
        // its initial scheduling might be performed based on `Ack`s in response
 
        // to message exchanges between remote peers.
 
        let prompt = Prompt::new(
 
            &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
 
            definition_id, monomorph_index, arguments,
 
        );
 
        let component = CompPDL::new(prompt, port_id_pairs.len());
 
        let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component(
 
            reservation, component, created_ctx, false,
 
        );
 
        let created_ctx = &component.ctx;
 

	
 
        // Now modify the creator's ports: remove every transferred port and
 
        // potentially remove the peer component
 
        for pair in port_id_pairs.iter() {
 
            let creator_port_index = creator_ctx.get_port_index(pair.creator).unwrap();
 
            let creator_port_info = creator_ctx.ports.remove(creator_port_index);
 
            if creator_port_info.peer_comp_id != creator_ctx.id {
 
                creator_ctx.remove_peer(sched_ctx, creator_port_info.peer_comp_id);
 
            }
 
            Self::add_port_to_component(sched_ctx, created_ctx, port_info);
 

	
 
            // Transfer the taken messages
 
            let created_port_index = created_ctx.get_port_index(port_id).unwrap();
 
            component.code.inbox_main[created_port_index] = port_main_message;
 
            let mut message_index = 0;
 
            while message_index < self.inbox_backup.len() {
 
                if self.inbox_backup[message_index].data_header.target_port == port_id {
 
                    // Move this message
 
                    let message = self.inbox_backup.remove(message_index);
 
                    component.code.inbox_backup.push(message);
 
                } else {
 
                    message_index += 1;
 
                }
 

	
 
            let created_port_info = created_ctx.get_port(pair.created);
 
            if created_port_info.peer_comp_id == creator_ctx.id {
 
                // This is the cause where the creator obtains a reference
 
                // to the created component
 
                let peer_port_info = creator_ctx.get_port_mut(created_port_info.peer_id);
 
                peer_port_info.peer_comp_id = created_ctx.id;
 
                creator_ctx.add_peer(sched_ctx, created_ctx.id, None);
 
            }
 
        }
 

	
 
            // Maybe remove peer from the creator
 
            if let Some(mut peer_info) = peer_info {
 
                let remove_from_runtime = peer_info.handle.decrement_users();
 
                if remove_from_runtime {
 
                    let removed_comp_key = unsafe{ peer_info.id.upgrade() };
 
                    sched_ctx.runtime.destroy_component(removed_comp_key);
 
        // By now all ports have been transferred. We'll now do any of the setup
 
        // for rerouting/messaging
 
        if created_component_has_remote_peers {
 
            let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
 
            for pair in port_id_pairs.iter() {
 
                let port_info = created_ctx.get_port(pair.created);
 
                if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id {
 
                    let message = self.control.add_reroute_entry(
 
                        creator_ctx.id, port_info.peer_id, port_info.peer_comp_id,
 
                        pair.creator, pair.created, created_ctx.id,
 
                        schedule_entry_id
 
                    );
 
                    let peer_info = created_ctx.get_peer(port_info.peer_comp_id);
 
                    peer_info.handle.send_message(sched_ctx, message, true);
 
                }
 
            }
 
        } else {
 
            // Peer can be scheduled immediately
 
            sched_ctx.runtime.enqueue_work(created_key);
 
        }
 

	
 
        if !has_reroute_entry {
 
            // We can schedule the component immediately
 
            self.control.remove_schedule_entry(schedule_entry_id);
 
            component.public.sleeping.store(false, std::sync::atomic::Ordering::Release);
 
            sched_ctx.runtime.enqueue_work(comp_key);
 
        } // else: wait for the `Ack`s, they will trigger the scheduling of the component
 
    }
 

	
 
    /// Removes a port from a component. Also decrements the port counter in
 
    /// the peer component's entry. If that hits 0 then it will be removed and
 
    /// returned. If returned then the caller is responsible for decrementing
 
    /// the atomic counters of the peer component's handle.
 
    fn remove_port_from_component(comp_ctx: &mut CompCtx, port_id: PortId) -> (Port, Option<Peer>) {
 
        let port_index = comp_ctx.get_port_index(port_id).unwrap();
 
        let port_info = comp_ctx.ports.remove(port_index);
 

	
 
        // If the component owns the peer, then we don't have to decrement the
 
        // number of peers (because we don't have an entry for ourselves)
 
@@ -726,34 +838,24 @@ impl CompPDL {
 
        let peer_info = &mut comp_ctx.peers[peer_index];
 
        peer_info.num_associated_ports -= 1;
 

	
 
        // Check if we still have other ports referencing this peer
 
        if peer_info.num_associated_ports != 0 {
 
            return (port_info, None);
 
        }
 

	
 
        let peer_info = comp_ctx.peers.remove(peer_index);
 
        return (port_info, Some(peer_info));
 
    }
 

	
 
    /// Adds a port to the component context. The peer (or its counter) will be
 
    /// updated accordingly.
 
    fn add_port_to_component(sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_info: Port) {
 
        // Add the port info
 
        let peer_comp_id = port_info.peer_comp_id;
 
        debug_assert!(!comp_ctx.ports.iter().any(|v| v.self_id == port_info.self_id));
 
        comp_ctx.ports.push(port_info);
 
        Self::add_peer_associated_port_to_component(sched_ctx, comp_ctx, peer_comp_id);
 
    }
 

	
 
    /// Only adds/updates a peer for a given port. This function assumes (but
 
    /// does not check!) that the port was not considered to belong to that peer
 
    /// before calling this function.
 
    fn add_peer_associated_port_to_component(sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, peer_id: CompId) {
 
        match comp_ctx.get_peer_index(peer_id) {
 
            Some(peer_index) => {
 
                let peer_info = &mut comp_ctx.peers[peer_index];
 
                peer_info.num_associated_ports += 1;
 
            },
 
            None => {
 
                let handle = sched_ctx.runtime.get_component_public(peer_id);
 
                comp_ctx.peers.push(Peer{
 
@@ -825,13 +927,91 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Ve
 
                    find_port_in_value(group, embedded_value, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 

	
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for value in &value_group.values {
 
        find_port_in_value(value_group, value, ports);
 
    }
 
}
 

	
 
struct ValueGroupIter<'a> {
 
    group: &'a mut ValueGroup,
 
    heap_stack: Vec<(usize, usize)>,
 
    index: usize,
 
}
 

	
 
impl<'a> ValueGroupIter<'a> {
 
    fn new(group: &'a mut ValueGroup) -> Self {
 
        return Self{ group, heap_stack: Vec::new(), index: 0 }
 
    }
 
}
 

	
 
struct ValueGroupPortRef {
 
    id: PortId,
 
    heap_pos: Option<usize>, // otherwise: on stack
 
    index: usize,
 
}
 

	
 
impl<'a> Iterator for ValueGroupIter<'a> {
 
    type Item = ValueGroupPortRef;
 

	
 
    fn next(&mut self) -> Option<Self::Item> {
 
        // Enter loop that keeps iterating until a port is found
 
        loop {
 
            if let Some(pos) = self.heap_stack.last() {
 
                let (heap_pos, region_index) = *pos;
 
                if region_index >= self.group.regions[heap_pos].len() {
 
                    self.heap_stack.pop();
 
                    continue;
 
                }
 

	
 
                let value = &self.group.regions[heap_pos][region_index];
 
                self.heap_stack.last_mut().unwrap().1 += 1;
 

	
 
                match value {
 
                    Value::Input(id) | Value::Output(id) => {
 
                        let id = PortId(id.id);
 
                        return Some(ValueGroupPortRef{
 
                            id,
 
                            heap_pos: Some(heap_pos),
 
                            index: region_index,
 
                        });
 
                    },
 
                    _ => {},
 
                }
 

	
 
                if let Some(heap_pos) = value.get_heap_pos() {
 
                    self.heap_stack.push((heap_pos as usize, 0));
 
                }
 
            } else {
 
                if self.index >= self.group.values.len() {
 
                    return None;
 
                }
 

	
 
                let value = &mut self.group.values[self.index];
 
                self.index += 1;
 

	
 
                match value {
 
                    Value::Input(id) | Value::Output(id) => {
 
                        let id = PortId(id.id);
 
                        return Some(ValueGroupPortRef{
 
                            id,
 
                            heap_pos: None,
 
                            index: self.index - 1
 
                        });
 
                    },
 
                    _ => {},
 
                }
 

	
 
                // Not a port, check if we need to enter a heap region
 
                if let Some(heap_pos) = value.get_heap_pos() {
 
                    self.heap_stack.push((heap_pos as usize, 0));
 
                } // else: just consider the next value
 
            }
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/control_layer.rs
Show inline comments
 
@@ -20,56 +20,58 @@ struct ControlEntry {
 
}
 

	
 
enum ControlContent {
 
    PeerChange(ContentPeerChange),
 
    ScheduleComponent(CompId),
 
    BlockedPort(PortId),
 
    ClosedPort(PortId),
 
}
 

	
 
struct ContentPeerChange {
 
    source_port: PortId,
 
    source_comp: CompId,
 
    target_port: PortId,
 
    old_target_port: PortId,
 
    new_target_port: PortId,
 
    new_target_comp: CompId,
 
    schedule_entry_id: ControlId,
 
}
 

	
 
pub(crate) enum AckAction {
 
    None,
 
    SendMessageAndAck(CompId, ControlMessage, ControlId),
 
    ScheduleComponent(CompId),
 
}
 

	
 
/// Handling/sending control messages.
 
pub(crate) struct ControlLayer {
 
    id_counter: ControlId,
 
    entries: Vec<ControlEntry>,
 
}
 

	
 
impl ControlLayer {
 
    pub(crate) fn should_reroute(&self, message: &Message) -> Option<CompId> {
 
    pub(crate) fn should_reroute(&self, message: &mut Message) -> Option<CompId> {
 
        // Safety note: rerouting should occur during the time when we're
 
        // notifying a peer of a new component. During this period that
 
        // component hasn't been executed yet, so cannot have died yet.
 
        // FIX @NoDirectHandle
 
        let target_port = message.target_port();
 
        if target_port.is_none() {
 
            return None;
 
        }
 

	
 
        let target_port = target_port.unwrap();
 
        for entry in &self.entries {
 
            if let ControlContent::PeerChange(entry) = &entry.content {
 
                if entry.target_port == target_port {
 
                if entry.old_target_port == target_port {
 
                    message.modify_target_port(entry.new_target_port);
 
                    return Some(entry.new_target_comp);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> AckAction {
 
        let entry_index = self.get_entry_index(entry_id).unwrap();
 
        let entry = &mut self.entries[entry_index];
 
        debug_assert!(entry.ack_countdown > 0);
 
@@ -82,25 +84,25 @@ impl ControlLayer {
 
        // All `Ack`s received, take action based on the kind of entry
 
        match &entry.content {
 
            ControlContent::PeerChange(content) => {
 
                // If change of peer is ack'd. Then we are certain we have
 
                // rerouted all of the messages, and the sender's port can now
 
                // be unblocked again.
 
                let target_comp_id = content.source_comp;
 
                let message_to_send = ControlMessage{
 
                    id: ControlId::new_invalid(),
 
                    sender_comp_id: comp_ctx.id,
 
                    target_port_id: Some(content.source_port),
 
                    content: ControlMessageContent::PortPeerChangedUnblock(
 
                        content.source_port,
 
                        content.new_target_port,
 
                        content.new_target_comp
 
                    )
 
                };
 
                let to_ack = content.schedule_entry_id;
 
                self.entries.remove(entry_index);
 

	
 
                return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack);
 
            },
 
            ControlContent::ScheduleComponent(to_schedule) => {
 
                // If all change-of-peers are `Ack`d, then we're ready to
 
                // schedule the component!
 
                return AckAction::ScheduleComponent(*to_schedule);
 
@@ -150,35 +152,36 @@ impl ControlLayer {
 
    /// Removes a schedule entry. Only used if the caller preemptively called
 
    /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`,
 
    /// hence the `ack_countdown` in the scheduling entry is at 0.
 
    pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) {
 
        let index = self.get_entry_index(schedule_entry_id).unwrap();
 
        debug_assert_eq!(self.entries[index].ack_countdown, 0);
 
        self.entries.remove(index);
 
    }
 

	
 
    pub(crate) fn add_reroute_entry(
 
        &mut self, creator_comp_id: CompId,
 
        source_port_id: PortId, source_comp_id: CompId,
 
        target_port_id: PortId, new_comp_id: CompId,
 
        old_target_port_id: PortId, new_target_port_id: PortId, new_comp_id: CompId,
 
        schedule_entry_id: ControlId,
 
    ) -> Message {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 1,
 
            content: ControlContent::PeerChange(ContentPeerChange{
 
                source_port: source_port_id,
 
                source_comp: source_comp_id,
 
                target_port: target_port_id,
 
                old_target_port: old_target_port_id,
 
                new_target_port: new_target_port_id,
 
                new_target_comp: new_comp_id,
 
                schedule_entry_id,
 
            }),
 
        });
 

	
 
        // increment counter on schedule entry
 
        for entry in &mut self.entries {
 
            if entry.id == schedule_entry_id {
 
                entry.ack_countdown += 1;
 
                break;
 
            }
 
        }
src/runtime2/runtime.rs
Show inline comments
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
 
use std::collections::VecDeque;
 

	
 
use crate::protocol::*;
 
use crate::runtime2::component::wake_up_if_sleeping;
 

	
 
use super::communication::Message;
 
use super::component::{CompCtx, CompPDL};
 
use super::store::{ComponentStore, QueueDynMpsc, QueueDynProducer};
 
use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer};
 
use super::scheduler::*;
 

	
 
// -----------------------------------------------------------------------------
 
// Component
 
// -----------------------------------------------------------------------------
 

	
 
/// Key to a component. Type system somewhat ensures that there can only be one
 
/// of these. Only with a key one may retrieve privately-accessible memory for
 
/// a component. Practically just a generational index, like `CompId` is.
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) struct CompKey(pub u32);
 

	
 
@@ -34,24 +34,35 @@ impl CompId {
 
    pub(crate) fn new_invalid() -> CompId {
 
        return CompId(u32::MAX);
 
    }
 

	
 
    /// Upgrade component ID to component key. Unsafe because the caller needs
 
    /// to make sure that only one component key can exist at a time (to ensure
 
    /// a component can only be scheduled/executed by one thread).
 
    pub(crate) unsafe fn upgrade(&self) -> CompKey {
 
        return CompKey(self.0);
 
    }
 
}
 

	
 
/// Handle to a component that is being created.
 
pub(crate) struct CompReserved {
 
    reservation: ComponentReservation,
 
}
 

	
 
impl CompReserved {
 
    pub(crate) fn id(&self) -> CompId {
 
        return CompId(self.reservation.index)
 
    }
 
}
 

	
 
/// Private fields of a component, may only be modified by a single thread at
 
/// a time.
 
pub(crate) struct RuntimeComp {
 
    pub public: CompPublic,
 
    pub code: CompPDL,
 
    pub ctx: CompCtx,
 
    pub inbox: QueueDynMpsc<Message>
 
}
 

	
 
/// Should contain everything that is accessible in a thread-safe manner
 
// TODO: Do something about the `num_handles` thing. This needs to be a bit more
 
//  "foolproof" to lighten the mental burden of using the `num_handles`
 
@@ -199,38 +210,70 @@ impl RuntimeInner {
 
        // We have work, or the schedulers should exit.
 
        return lock.pop_front();
 
    }
 

	
 
    pub(crate) fn enqueue_work(&self, key: CompKey) {
 
        let mut lock = self.work_queue.lock().unwrap();
 
        lock.push_back(key);
 
        self.work_condvar.notify_one();
 
    }
 

	
 
    // Creating/destroying components
 

	
 
    pub(crate) fn start_create_pdl_component(&self) -> CompReserved {
 
        self.increment_active_components();
 
        let reservation = self.components.reserve();
 
        return CompReserved{ reservation };
 
    }
 

	
 
    pub(crate) fn finish_create_pdl_component(
 
        &self, reserved: CompReserved,
 
        component: CompPDL, mut context: CompCtx, initially_sleeping: bool,
 
    ) -> (CompKey, &mut RuntimeComp) {
 
        let inbox_queue = QueueDynMpsc::new(16);
 
        let inbox_producer = inbox_queue.producer();
 

	
 
        let _id = reserved.id();
 
        context.id = reserved.id();
 
        let component = RuntimeComp {
 
            public: CompPublic{
 
                sleeping: AtomicBool::new(initially_sleeping),
 
                num_handles: AtomicU32::new(1), // the component itself acts like a handle
 
                inbox: inbox_producer,
 
            },
 
            code: component,
 
            ctx: context,
 
            inbox: inbox_queue,
 
        };
 

	
 
        let index = self.components.submit(reserved.reservation, component);
 
        debug_assert_eq!(index, _id.0);
 
        let component = self.components.get_mut(index);
 

	
 
        return (CompKey(index), component);
 
    }
 

	
 
    /// Creates a new component. Note that the public part will be properly
 
    /// initialized, but the private fields (e.g. owned ports, peers, etc.)
 
    /// are not.
 
    pub(crate) fn create_pdl_component(&self, comp: CompPDL, initially_sleeping: bool) -> (CompKey, &mut RuntimeComp) {
 
    /// initialized, but not all private fields are.
 
    pub(crate) fn create_pdl_component(&self, comp: CompPDL, ctx: CompCtx, initially_sleeping: bool) -> (CompKey, &mut RuntimeComp) {
 
        let inbox_queue = QueueDynMpsc::new(16);
 
        let inbox_producer = inbox_queue.producer();
 
        let comp = RuntimeComp{
 
            public: CompPublic{
 
                sleeping: AtomicBool::new(initially_sleeping),
 
                num_handles: AtomicU32::new(1), // the component itself acts like a handle
 
                inbox: inbox_producer,
 
            },
 
            code: comp,
 
            ctx: CompCtx::default(),
 
            ctx,
 
            inbox: inbox_queue,
 
        };
 

	
 
        let index = self.components.create(comp);
 

	
 
        // TODO: just do a reserve_index followed by a consume_index or something
 
        self.increment_active_components();
 
        let component = self.components.get_mut(index);
 
        component.ctx.id = CompId(index);
 

	
 
        return (CompKey(index), component);
 
    }
src/runtime2/store/component.rs
Show inline comments
 
@@ -34,44 +34,71 @@
 
 *  - A lot of concurrent operations are not necessary: we may move some of the
 
 *    access to the global concurrent datastructure by an initial access to some
 
 *    kind of thread-local datastructure.
 
 */
 

	
 
use std::mem::transmute;
 
use std::alloc::{dealloc, Layout};
 
use std::ptr;
 
use std::sync::atomic::{AtomicUsize, Ordering};
 

	
 
use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};
 

	
 
/// Generic store of components. Essentially a resizable freelist (implemented
 
/// as a ringbuffer) combined with an array of actual elements.
 
pub struct ComponentStore<T: Sized> {
 
    inner: UnfairSeLock<Inner<T>>,
 
    read_head: AtomicUsize,
 
    write_head: AtomicUsize,
 
    limit_head: AtomicUsize,
 
}
 

	
 
unsafe impl<T: Sized> Send for ComponentStore<T>{}
 
unsafe impl<T: Sized> Sync for ComponentStore<T>{}
 

	
 
/// Contents of the `ComponentStore` that require a shared/exclusive locking
 
/// mechanism for consistency.
 
struct Inner<T: Sized> {
 
    freelist: Vec<u32>,
 
    data: Vec<*mut T>,
 
    size: usize,
 
    compare_mask: usize,
 
    index_mask: usize,
 
}
 

	
 
type InnerShared<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;
 

	
 
/// Reservation of a slot in the component store. Corresponds to the case where
 
/// an index has been taken from the freelist, but the element has not yet been
 
/// initialized
 
pub struct ComponentReservation {
 
    pub(crate) index: u32,
 
    #[cfg(debug_assertions)] submitted: bool,
 
}
 

	
 
impl ComponentReservation {
 
    fn new(index: u32) -> Self {
 
        return Self{
 
            index,
 
            #[cfg(debug_assertions)] submitted: false,
 
        }
 
    }
 
}
 

	
 
impl Drop for ComponentReservation {
 
    fn drop(&mut self) {
 
        debug_assert!(self.submitted);
 
    }
 
}
 

	
 
impl<T: Sized> ComponentStore<T> {
 
    pub fn new(initial_size: usize) -> Self {
 
        Self::assert_valid_size(initial_size);
 

	
 
        // Fill initial freelist and preallocate data array
 
        let mut initial_freelist = Vec::with_capacity(initial_size);
 
        for idx in 0..initial_size {
 
            initial_freelist.push(idx as u32)
 
        }
 

	
 
        let mut initial_data = Vec::new();
 
        initial_data.resize(initial_size, ptr::null_mut());
 
@@ -87,28 +114,41 @@ impl<T: Sized> ComponentStore<T> {
 
            }),
 
            read_head: AtomicUsize::new(0),
 
            write_head: AtomicUsize::new(initial_size),
 
            limit_head: AtomicUsize::new(initial_size),
 
        };
 
    }
 

	
 
    /// Creates a new element initialized to the provided `value`. This returns
 
    /// the index at which the element can be retrieved.
 
    pub fn create(&self, value: T) -> u32 {
 
        let lock = self.inner.lock_shared();
 
        let (lock, index) = self.pop_freelist_index(lock);
 
        self.initialize_at_index(lock, index, value);
 
        Self::initialize_at_index(lock, index, value);
 
        return index;
 
    }
 

	
 
    pub fn reserve(&self) -> ComponentReservation {
 
        let lock = self.inner.lock_shared();
 
        let (lock, index) = self.pop_freelist_index(lock);
 
        return ComponentReservation::new(index);
 
    }
 

	
 
    pub fn submit(&self, mut reservation: ComponentReservation, value: T) -> u32 {
 
        dbg_code!({ reservation.submitted = true; });
 
        let lock = self.inner.lock_shared();
 
        Self::initialize_at_index(lock, reservation.index, value);
 
        return reservation.index;
 
    }
 

	
 
    /// Destroys an element at the provided `index`. The caller must make sure
 
    /// that it does not use any previously received references to the data at
 
    /// this index, and that no more calls to `get` are performed using this
 
    /// index. This is allowed again if the index has been reacquired using
 
    /// `create`.
 
    pub fn destroy(&self, index: u32) {
 
        let lock = self.inner.lock_shared();
 
        self.destruct_at_index(&lock, index);
 
        self.push_freelist_index(&lock, index);
 
    }
 

	
 
    /// Retrieves an element by reference
 
@@ -154,25 +194,25 @@ impl<T: Sized> ComponentStore<T> {
 
                    // We need to try again
 
                    read_index = actual_read_index;
 
                    continue 'attempt_read;
 
                }
 

	
 
                // If here then we performed the read
 
                return (shared_lock, preemptive_read);
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn initialize_at_index(&self, read_lock: InnerShared<T>, index: u32, value: T) {
 
    fn initialize_at_index(read_lock: InnerShared<T>, index: u32, value: T) {
 
        let mut target_ptr = read_lock.data[index as usize];
 

	
 
        unsafe {
 
            if target_ptr.is_null() {
 
                let layout = Layout::for_value(&value);
 
                target_ptr = std::alloc::alloc(layout).cast();
 
                let rewrite: *mut *mut T = transmute(read_lock.data.as_ptr());
 
                *rewrite.add(index as usize) = target_ptr;
 
            }
 

	
 
            std::ptr::write(target_ptr, value);
 
        }
src/runtime2/store/mod.rs
Show inline comments
 
#[macro_use]
 
#[cfg(test)]
 
mod tests;
 

	
 
pub mod unfair_se_lock;
 
pub mod component;
 
pub mod queue_mpsc;
 

	
 
pub(crate) use component::ComponentStore;
 
pub(crate) use component::{ComponentStore, ComponentReservation};
 
pub(crate) use queue_mpsc::{QueueDynMpsc, QueueDynProducer};
 
\ No newline at end of file
src/runtime2/tests/mod.rs
Show inline comments
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::component::CompPDL;
 
use crate::runtime2::component::{CompCtx, CompPDL};
 

	
 
fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) {
 
    let prompt = rt.inner.protocol.new_component(
 
        module_name.as_bytes(), routine_name.as_bytes(), args
 
    ).expect("create prompt");
 
    let reserved = rt.inner.start_create_pdl_component();
 
    let ctx = CompCtx::new(&reserved);
 
    let (key, _) = rt.inner.finish_create_pdl_component(reserved, CompPDL::new(prompt, 0), ctx, false);
 
    rt.inner.enqueue_work(key);
 
}
 

	
 
fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) }
 

	
 
#[test]
 
fn test_component_creation() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive nothing_at_all() {
 
        s32 a = 5;
 
        auto b = 5 + a;
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, pd);
 

	
 
    for i in 0..20 {
 
        let prompt = rt.inner.protocol.new_component(b"", b"nothing_at_all", ValueGroup::new_stack(Vec::new()))
 
            .expect("component creation");
 
        let comp = CompPDL::new(prompt, 0);
 
        let (key, _) = rt.inner.create_pdl_component(comp, false);
 
        rt.inner.enqueue_work(key);
 
        create_component(&rt, "", "nothing_at_all", no_args());
 
    }
 
}
 

	
 
#[test]
 
fn test_component_communication() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive sender(out<u32> o) {
 
        print(\"sender\");
 
        sync put(o, 1);
 
    }
 
    primitive receiver(in<u32> i) {
 
        print(\"receiver\");
 
@@ -35,17 +43,14 @@ fn test_component_communication() {
 
    }
 
    composite constructor() {
 
        channel o -> i;
 
        print(\"creating sender\");
 
        new sender(o);
 
        print(\"creating receiver\");
 
        new receiver(i);
 
        print(\"done\");
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, pd);
 

	
 
    let prompt = rt.inner.protocol.new_component(b"", b"constructor", ValueGroup::new_stack(Vec::new()))
 
        .expect("creation");
 
    let (key, _) = rt.inner.create_pdl_component(CompPDL::new(prompt, 0), false);
 
    rt.inner.enqueue_work(key);
 
    create_component(&rt, "", "constructor", no_args());
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)