Changeset - bf4c0ee5ba65
mh <contact@maxhenger.nl> - 2022-01-25 16:46:08
WIP: Changed component creation logic once more
7 files changed with 389 insertions and 108 deletions:
src/runtime2/communication.rs
@@ -29,7 +29,7 @@ pub enum PortKind {
     Getter,
 }
 
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum PortState {
     Open,
     Blocked,
@@ -189,6 +189,16 @@ impl Message {
                 return None,
         }
     }
 
+    pub(crate) fn modify_target_port(&mut self, port_id: PortId) {
+        match self {
+            Message::Data(v) =>
+                v.data_header.target_port = port_id,
+            Message::Control(v) =>
+                v.target_port_id = Some(port_id),
+            Message::Sync(_) => unreachable!(), // should never be called for this message type
+        }
+    }
 }
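The new Message::modify_target_port is what the control layer calls when a message arrives for a port that has just been handed to a newly created component: the reroute entry supplies the new owner and the new port ID, and the message is rewritten before being forwarded (see the should_reroute change in control_layer.rs further down). A minimal, self-contained sketch of that rewrite, using simplified stand-in types rather than the crate's real Message/PortId:

// Stand-in types for illustration only; the real definitions live in
// src/runtime2/communication.rs and carry far more information.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PortId(u32);

#[derive(Debug)]
enum Message {
    Data { target_port: PortId, payload: u64 },
    Control { target_port: Option<PortId> },
}

impl Message {
    // Mirrors the intent of Message::modify_target_port above: point the
    // message at the port ID owned by the component it is rerouted to.
    fn modify_target_port(&mut self, port_id: PortId) {
        match self {
            Message::Data { target_port, .. } => *target_port = port_id,
            Message::Control { target_port } => *target_port = Some(port_id),
        }
    }
}

fn main() {
    let mut msg = Message::Data { target_port: PortId(3), payload: 42 };
    // The control layer looks up a reroute entry and supplies the new ID.
    msg.modify_target_port(PortId(17));
    println!("{:?}", msg);
}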
 

	
 

	
src/runtime2/component/component_pdl.rs
 use crate::protocol::*;
+use crate::protocol::ast::DefinitionId;
 use crate::protocol::eval::{
     PortId as EvalPortId, Prompt,
     ValueGroup, Value,
@@ -28,10 +29,10 @@ pub struct CompCtx {
     pub port_id_counter: u32,
 }
 
-impl Default for CompCtx {
-    fn default() -> Self {
+impl CompCtx {
+    pub(crate) fn new(reservation: &CompReserved) -> Self {
         return Self{
-            id: CompId(0),
+            id: reservation.id(),
             ports: Vec::new(),
             peers: Vec::new(),
             messages: Vec::new(),
@@ -46,6 +47,8 @@ struct MessageView<'a> {
 }
 
 impl CompCtx {
+    /// Creates a new channel that is fully owned by the component associated
+    /// with this context.
     fn create_channel(&mut self) -> Channel {
         let putter_id = PortId(self.take_port_id());
         let getter_id = PortId(self.take_port_id());
@@ -67,6 +70,63 @@ impl CompCtx {
         return Channel{ putter_id, getter_id };
     }
 
+    /// Adopts a port transferred by another component. Essentially copies all
+    /// port data but creates a new ID. Caller should ensure that the other
+    /// endpoint becomes aware of this ID.
+    fn adopt_port(&mut self, to_transfer: &Port) -> &mut Port {
+        let port_id = PortId(self.take_port_id());
+        let port_index = self.ports.len();
+        self.ports.push(Port{
+            self_id: port_id,
+            peer_id: to_transfer.peer_id,
+            kind: to_transfer.kind,
+            state: to_transfer.state,
+            peer_comp_id: to_transfer.peer_comp_id,
+        });
+        return &mut self.ports[port_index];
+    }
+
+    /// Adds a peer (or increments the "associated port" counter). Hence caller
+    /// must make sure that this makes sense.
+    fn add_peer(&mut self, sched_ctx: &SchedulerCtx, peer_id: CompId, peer_handle: Option<&CompHandle>) {
+        match self.get_peer_index(peer_id) {
+            Some(peer_index) => {
+                let peer_info = &mut self.peers[peer_index];
+                peer_info.num_associated_ports += 1;
+            },
+            None => {
+                let handle = if let Some(handle) = peer_handle {
+                    handle.clone()
+                } else {
+                    sched_ctx.runtime.get_component_public(peer_id)
+                };
+
+                self.peers.push(Peer{
+                    id: peer_id,
+                    num_associated_ports: 1,
+                    handle,
+                })
+            }
+        }
+    }
+
+    /// Removes a peer (or decrements the "associated port" counter). If there
+    /// are no more references to the peer then the handle will be destroyed.
+    fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, peer_id: CompId) {
+        let peer_index = self.get_peer_index(peer_id).unwrap();
+        let peer_info = &mut self.peers[peer_index];
+        peer_info.num_associated_ports -= 1;
+
+        if peer_info.num_associated_ports == 0 {
+            let mut peer = self.peers.remove(peer_index);
+            let should_remove = peer.handle.decrement_users();
+            if should_remove {
+                let key = unsafe{ peer.id.upgrade() };
+                sched_ctx.runtime.destroy_component(key);
+            }
+        }
+    }
+
     pub(crate) fn get_port(&self, port_id: PortId) -> &Port {
         let index = self.get_port_index(port_id).unwrap();
         return &self.ports[index];
@@ -237,9 +297,9 @@ impl CompPDL {
         }
     }
 
-    pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
+    pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
         sched_ctx.log(&format!("handling message: {:?}", message));
-        if let Some(new_target) = self.control.should_reroute(&message) {
+        if let Some(new_target) = self.control.should_reroute(&mut message) {
             let mut target = sched_ctx.runtime.get_component_public(new_target);
             target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
             let _should_remove = target.decrement_users();
@@ -333,15 +393,10 @@ impl CompPDL {
             },
             EC::NewComponent(definition_id, monomorph_idx, arguments) => {
                 debug_assert_eq!(self.mode, Mode::NonSync);
 
-                let mut ports = Vec::new(); // TODO: Optimize
-                let protocol = &sched_ctx.runtime.protocol;
-                find_ports_in_value_group(&arguments, &mut ports);
-                let prompt = Prompt::new(
-                    &protocol.types, &protocol.heap,
-                    definition_id, monomorph_idx, arguments
-                );
-                self.create_component_and_transfer_ports(sched_ctx, comp_ctx, prompt, &ports);
+                self.create_component_and_transfer_ports2(
+                    sched_ctx, comp_ctx,
+                    definition_id, monomorph_idx, arguments
+                );
                 return Ok(CompScheduling::Requeue);
             },
             EC::NewChannel => {
@@ -635,77 +690,134 @@ impl CompPDL {
         }
     }
 
-    fn create_component_and_transfer_ports(&mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, prompt: Prompt, ports: &[PortId]) {
-        let component = CompPDL::new(prompt, ports.len());
-        let (comp_key, component) = sched_ctx.runtime.create_pdl_component(component, true);
-        let created_ctx = &mut component.ctx;
-
-        let mut has_reroute_entry = false;
-        let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
-
-        for port_id in ports.iter().copied() {
-            // Create temporary reroute entry if the peer is another component
-            let port_info = creator_ctx.get_port(port_id);
-            debug_assert_ne!(port_info.state, PortState::Blocked);
-            if port_info.peer_comp_id == creator_ctx.id {
-                // We own the peer port. So retrieve it and modify the peer directly
-                let peer_port_id = port_info.peer_id;
-                let port_info = creator_ctx.get_port_mut(peer_port_id);
-                port_info.peer_comp_id = created_ctx.id;
-                Self::add_peer_associated_port_to_component(sched_ctx, creator_ctx, created_ctx.id);
+    fn create_component_and_transfer_ports2(
+        &mut self,
+        sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx,
+        definition_id: DefinitionId, monomorph_index: i32, mut arguments: ValueGroup
+    ) {
+        struct PortPair{ creator: PortId, created: PortId }
+        let mut port_id_pairs = Vec::new();
+
+        let reservation = sched_ctx.runtime.start_create_pdl_component();
+        let mut created_ctx = CompCtx::new(&reservation);
+
+        // Take all the ports ID that are in the `args` (and currently belong to
+        // the creator component) and translate them into new IDs that are
+        // associated with the component we're about to create
+        let mut arg_iter = ValueGroupIter::new(&mut arguments);
+        while let Some(port_reference) = arg_iter.next() {
+            // Create port entry for new component
+            let creator_port_id = port_reference.id;
+            let creator_port = creator_ctx.get_port(creator_port_id);
+            let created_port = created_ctx.adopt_port(creator_port);
+            let created_port_id = created_port.self_id;
+
+            port_id_pairs.push(PortPair{
+                creator: creator_port_id,
+                created: created_port_id,
+            });
+
+            // Modify value in arguments (bit dirty, but double vec in ValueGroup causes lifetime issues)
+            let arg_value = if let Some(heap_pos) = port_reference.heap_pos {
+                &mut arg_iter.group.regions[heap_pos][port_reference.index]
             } else {
-                // We don't own the peer port, so send the appropriate messages
-                // to the peer component and notify the control layer
-                has_reroute_entry = true;
-                let message = self.control.add_reroute_entry(
-                    creator_ctx.id, port_info.peer_id, port_info.peer_comp_id,
-                    port_info.self_id, created_ctx.id, schedule_entry_id
-                );
-                let peer_info = creator_ctx.get_peer(port_info.peer_comp_id);
-                peer_info.handle.send_message(sched_ctx, message, true);
+                &mut arg_iter.group.values[port_reference.index]
+            };
+            match arg_value {
+                Value::Input(id) => *id = port_id_to_eval(created_port_id),
+                Value::Output(id) => *id = port_id_to_eval(created_port_id),
+                _ => unreachable!(),
             }
         }
 
-            // Take out any potential messages for the peer
-            let creator_port_index = creator_ctx.get_port_index(port_id).unwrap();
-            let port_main_message = self.inbox_main[creator_port_index].take();
+        // For each transferred port pair set their peer components to the
+        // correct values. This will only change the values for the ports of
+        // the new component.
+        let mut created_component_has_remote_peers = false;
+
+        for pair in port_id_pairs.iter() {
+            let creator_port_info = creator_ctx.get_port(pair.creator);
+            let created_port_info = created_ctx.get_port_mut(pair.created);
+
+            if created_port_info.peer_comp_id == creator_ctx.id {
+                // Port peer is owned by the creator as well
+                let created_peer_port_index = port_id_pairs
+                    .iter()
+                    .position(|v| v.creator == creator_port_info.peer_id);
+                match created_peer_port_index {
+                    Some(created_peer_port_index) => {
+                        // Peer port moved to the new component as well
+                        let peer_pair = &port_id_pairs[created_peer_port_index];
+                        created_port_info.peer_id = peer_pair.created;
+                        created_port_info.peer_comp_id = reservation.id();
+                    },
+                    None => {
+                        // Peer port remains with creator component.
+                        created_port_info.peer_comp_id = creator_ctx.id;
+                        created_ctx.add_peer(sched_ctx, creator_ctx.id, None);
+                    }
+                }
+            } else {
+                // Peer is a different component
+                let peer_info = creator_ctx.get_peer(created_port_info.peer_comp_id);
+                created_ctx.add_peer(sched_ctx, peer_info.id, Some(&peer_info.handle));
+                created_component_has_remote_peers = true;
+            }
+        }
+
-            // Transfer port and create temporary reroute entry
-            let (port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id);
-            if port_info.state == PortState::Blocked {
-                todo!("Think about this when you're not tired!");
+        // We'll now actually turn our reservation for a new component into an
+        // actual component. Note that we initialize it as "not sleeping" as
+        // its initial scheduling might be performed based on `Ack`s in response
+        // to message exchanges between remote peers.
+        let prompt = Prompt::new(
+            &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
+            definition_id, monomorph_index, arguments,
+        );
+        let component = CompPDL::new(prompt, port_id_pairs.len());
+        let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component(
+            reservation, component, created_ctx, false,
+        );
+        let created_ctx = &component.ctx;
+
+        // Now modify the creator's ports: remove every transferred port and
+        // potentially remove the peer component
+        for pair in port_id_pairs.iter() {
+            let creator_port_index = creator_ctx.get_port_index(pair.creator).unwrap();
+            let creator_port_info = creator_ctx.ports.remove(creator_port_index);
+            if creator_port_info.peer_comp_id != creator_ctx.id {
+                creator_ctx.remove_peer(sched_ctx, creator_port_info.peer_comp_id);
             }
-            Self::add_port_to_component(sched_ctx, created_ctx, port_info);
-
-            // Transfer the taken messages
-            let created_port_index = created_ctx.get_port_index(port_id).unwrap();
-            component.code.inbox_main[created_port_index] = port_main_message;
-            let mut message_index = 0;
-            while message_index < self.inbox_backup.len() {
-                if self.inbox_backup[message_index].data_header.target_port == port_id {
-                    // Move this message
-                    let message = self.inbox_backup.remove(message_index);
-                    component.code.inbox_backup.push(message);
-                } else {
-                    message_index += 1;
-                }
 
+            let created_port_info = created_ctx.get_port(pair.created);
+            if created_port_info.peer_comp_id == creator_ctx.id {
+                // This is the cause where the creator obtains a reference
+                // to the created component
+                let peer_port_info = creator_ctx.get_port_mut(created_port_info.peer_id);
+                peer_port_info.peer_comp_id = created_ctx.id;
+                creator_ctx.add_peer(sched_ctx, created_ctx.id, None);
            }
        }
 
-            // Maybe remove peer from the creator
-            if let Some(mut peer_info) = peer_info {
-                let remove_from_runtime = peer_info.handle.decrement_users();
-                if remove_from_runtime {
-                    let removed_comp_key = unsafe{ peer_info.id.upgrade() };
-                    sched_ctx.runtime.destroy_component(removed_comp_key);
+        // By now all ports have been transferred. We'll now do any of the setup
+        // for rerouting/messaging
+        if created_component_has_remote_peers {
+            let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
+            for pair in port_id_pairs.iter() {
+                let port_info = created_ctx.get_port(pair.created);
+                if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id {
+                    let message = self.control.add_reroute_entry(
+                        creator_ctx.id, port_info.peer_id, port_info.peer_comp_id,
+                        pair.creator, pair.created, created_ctx.id,
+                        schedule_entry_id
+                    );
+                    let peer_info = created_ctx.get_peer(port_info.peer_comp_id);
+                    peer_info.handle.send_message(sched_ctx, message, true);
+                }
+            }
+        } else {
+            // Peer can be scheduled immediately
+            sched_ctx.runtime.enqueue_work(created_key);
+        }
 
-        if !has_reroute_entry {
-            // We can schedule the component immediately
-            self.control.remove_schedule_entry(schedule_entry_id);
-            component.public.sleeping.store(false, std::sync::atomic::Ordering::Release);
-            sched_ctx.runtime.enqueue_work(comp_key);
-        } // else: wait for the `Ack`s, they will trigger the scheduling of the component
     }
 
     /// Removes a port from a component. Also decrements the port counter in
@@ -735,16 +847,6 @@ impl CompPDL {
         return (port_info, Some(peer_info));
     }
 
-    /// Adds a port to the component context. The peer (or its counter) will be
-    /// updated accordingly.
-    fn add_port_to_component(sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_info: Port) {
-        // Add the port info
-        let peer_comp_id = port_info.peer_comp_id;
-        debug_assert!(!comp_ctx.ports.iter().any(|v| v.self_id == port_info.self_id));
-        comp_ctx.ports.push(port_info);
-        Self::add_peer_associated_port_to_component(sched_ctx, comp_ctx, peer_comp_id);
-    }
-
     /// Only adds/updates a peer for a given port. This function assumes (but
     /// does not check!) that the port was not considered to belong to that peer
     /// before calling this function.
@@ -834,4 +936,82 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Ve
     for value in &value_group.values {
         find_port_in_value(value_group, value, ports);
     }
 }
+
+struct ValueGroupIter<'a> {
+    group: &'a mut ValueGroup,
+    heap_stack: Vec<(usize, usize)>,
+    index: usize,
+}
+
+impl<'a> ValueGroupIter<'a> {
+    fn new(group: &'a mut ValueGroup) -> Self {
+        return Self{ group, heap_stack: Vec::new(), index: 0 }
+    }
+}
+
+struct ValueGroupPortRef {
+    id: PortId,
+    heap_pos: Option<usize>, // otherwise: on stack
+    index: usize,
+}
+
+impl<'a> Iterator for ValueGroupIter<'a> {
+    type Item = ValueGroupPortRef;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // Enter loop that keeps iterating until a port is found
+        loop {
+            if let Some(pos) = self.heap_stack.last() {
+                let (heap_pos, region_index) = *pos;
+                if region_index >= self.group.regions[heap_pos].len() {
+                    self.heap_stack.pop();
+                    continue;
+                }
+
+                let value = &self.group.regions[heap_pos][region_index];
+                self.heap_stack.last_mut().unwrap().1 += 1;
+
+                match value {
+                    Value::Input(id) | Value::Output(id) => {
+                        let id = PortId(id.id);
+                        return Some(ValueGroupPortRef{
+                            id,
+                            heap_pos: Some(heap_pos),
+                            index: region_index,
+                        });
+                    },
+                    _ => {},
+                }
+
+                if let Some(heap_pos) = value.get_heap_pos() {
+                    self.heap_stack.push((heap_pos as usize, 0));
+                }
+            } else {
+                if self.index >= self.group.values.len() {
+                    return None;
+                }
+
+                let value = &mut self.group.values[self.index];
+                self.index += 1;
+
+                match value {
+                    Value::Input(id) | Value::Output(id) => {
+                        let id = PortId(id.id);
+                        return Some(ValueGroupPortRef{
+                            id,
+                            heap_pos: None,
+                            index: self.index - 1
+                        });
+                    },
+                    _ => {},
+                }
+
+                // Not a port, check if we need to enter a heap region
+                if let Some(heap_pos) = value.get_heap_pos() {
+                    self.heap_stack.push((heap_pos as usize, 0));
+                } // else: just consider the next value
+            }
+        }
+    }
+}
\ No newline at end of file
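create_component_and_transfer_ports2 walks the argument ValueGroup with the ValueGroupIter added above: an explicit (heap region, cursor) stack lets it visit ports nested inside heap regions and report where each port sits (stack slot, or heap region plus index) so the ID can be rewritten in place. A minimal, self-contained sketch of the same traversal idea, with simplified stand-in types instead of the real Value/ValueGroup:

// Stand-in types; the real Value/ValueGroup live in the protocol eval module.
#[derive(Debug, Clone, Copy)]
struct PortId(u32);

#[derive(Debug)]
enum Value {
    Unsigned(u64),
    Input(PortId),
    Output(PortId),
    Heap(usize), // index of a nested heap region
}

struct ValueGroup {
    values: Vec<Value>,       // stack values
    regions: Vec<Vec<Value>>, // heap regions referenced by Value::Heap
}

// Where a port was found: either on the stack or inside a heap region.
#[derive(Debug)]
struct PortRef {
    heap_pos: Option<usize>,
    index: usize,
    id: PortId,
}

fn find_ports(group: &ValueGroup) -> Vec<PortRef> {
    let mut found = Vec::new();
    let mut heap_stack: Vec<(usize, usize)> = Vec::new(); // (region, cursor)
    let mut index = 0;

    loop {
        // Take the next value from the topmost heap region, else from the stack.
        let (value, heap_pos, value_index) = if let Some(&(region, cursor)) = heap_stack.last() {
            if cursor >= group.regions[region].len() {
                heap_stack.pop(); // region exhausted
                continue;
            }
            heap_stack.last_mut().unwrap().1 += 1;
            (&group.regions[region][cursor], Some(region), cursor)
        } else {
            if index >= group.values.len() {
                break;
            }
            index += 1;
            (&group.values[index - 1], None, index - 1)
        };

        match value {
            Value::Input(id) | Value::Output(id) => {
                found.push(PortRef { heap_pos, index: value_index, id: *id });
            }
            Value::Heap(region) => heap_stack.push((*region, 0)), // descend
            _ => {}
        }
    }
    found
}

fn main() {
    let group = ValueGroup {
        values: vec![Value::Unsigned(1), Value::Input(PortId(4)), Value::Heap(0)],
        regions: vec![vec![Value::Output(PortId(5)), Value::Unsigned(2)]],
    };
    println!("{:?}", find_ports(&group)); // ports at stack index 1 and region 0, index 0
}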
src/runtime2/component/control_layer.rs
@@ -29,7 +29,8 @@ enum ControlContent {
 struct ContentPeerChange {
     source_port: PortId,
     source_comp: CompId,
-    target_port: PortId,
+    old_target_port: PortId,
+    new_target_port: PortId,
     new_target_comp: CompId,
     schedule_entry_id: ControlId,
 }
@@ -47,7 +48,7 @@ pub(crate) struct ControlLayer {
 }
 
 impl ControlLayer {
-    pub(crate) fn should_reroute(&self, message: &Message) -> Option<CompId> {
+    pub(crate) fn should_reroute(&self, message: &mut Message) -> Option<CompId> {
         // Safety note: rerouting should occur during the time when we're
         // notifying a peer of a new component. During this period that
         // component hasn't been executed yet, so cannot have died yet.
@@ -60,7 +61,8 @@ impl ControlLayer {
         let target_port = target_port.unwrap();
         for entry in &self.entries {
             if let ControlContent::PeerChange(entry) = &entry.content {
-                if entry.target_port == target_port {
+                if entry.old_target_port == target_port {
+                    message.modify_target_port(entry.new_target_port);
                     return Some(entry.new_target_comp);
                 }
             }
@@ -91,7 +93,7 @@ impl ControlLayer {
                     sender_comp_id: comp_ctx.id,
                     target_port_id: Some(content.source_port),
                     content: ControlMessageContent::PortPeerChangedUnblock(
-                        content.source_port,
+                        content.new_target_port,
                         content.new_target_comp
                     )
                 };
@@ -159,7 +161,7 @@ impl ControlLayer {
     pub(crate) fn add_reroute_entry(
         &mut self, creator_comp_id: CompId,
         source_port_id: PortId, source_comp_id: CompId,
-        target_port_id: PortId, new_comp_id: CompId,
+        old_target_port_id: PortId, new_target_port_id: PortId, new_comp_id: CompId,
         schedule_entry_id: ControlId,
     ) -> Message {
         let entry_id = self.take_id();
@@ -169,7 +171,8 @@ impl ControlLayer {
             content: ControlContent::PeerChange(ContentPeerChange{
                 source_port: source_port_id,
                 source_comp: source_comp_id,
-                target_port: target_port_id,
+                old_target_port: old_target_port_id,
+                new_target_port: new_target_port_id,
                 new_target_comp: new_comp_id,
                 schedule_entry_id,
             }),
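ContentPeerChange now records both old_target_port (the ID the sending peer still uses) and new_target_port (the ID the created component assigned), so should_reroute can match on the former, rewrite the message to the latter, and report the component to forward to. A small, self-contained sketch of that lookup with stand-in types (not the real ControlLayer):

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PortId(u32);
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CompId(u32);

// Stand-in for the fields of ContentPeerChange that matter for rerouting.
struct RerouteEntry {
    old_target_port: PortId, // ID the sending peer still knows about
    new_target_port: PortId, // ID assigned by the newly created component
    new_target_comp: CompId, // component the message must be delivered to
}

// Stand-in for ControlLayer::should_reroute: match on the old port ID,
// rewrite the message's target in place, and return the new destination.
fn should_reroute(entries: &[RerouteEntry], message_target: &mut PortId) -> Option<CompId> {
    for entry in entries {
        if entry.old_target_port == *message_target {
            *message_target = entry.new_target_port;
            return Some(entry.new_target_comp);
        }
    }
    None
}

fn main() {
    let entries = [RerouteEntry {
        old_target_port: PortId(2),
        new_target_port: PortId(9),
        new_target_comp: CompId(5),
    }];
    let mut target = PortId(2);
    assert_eq!(should_reroute(&entries, &mut target), Some(CompId(5)));
    assert_eq!(target, PortId(9));
}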
src/runtime2/runtime.rs
@@ -7,7 +7,7 @@ use crate::runtime2::component::wake_up_if_sleeping;
 
 use super::communication::Message;
 use super::component::{CompCtx, CompPDL};
-use super::store::{ComponentStore, QueueDynMpsc, QueueDynProducer};
+use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer};
 use super::scheduler::*;
 
 // -----------------------------------------------------------------------------
@@ -43,6 +43,17 @@ impl CompId {
     }
 }
 
+/// Handle to a component that is being created.
+pub(crate) struct CompReserved {
+    reservation: ComponentReservation,
+}
+
+impl CompReserved {
+    pub(crate) fn id(&self) -> CompId {
+        return CompId(self.reservation.index)
+    }
+}
+
 /// Private fields of a component, may only be modified by a single thread at
 /// a time.
 pub(crate) struct RuntimeComp {
@@ -208,10 +219,42 @@ impl RuntimeInner {
 
     // Creating/destroying components
 
+    pub(crate) fn start_create_pdl_component(&self) -> CompReserved {
+        self.increment_active_components();
+        let reservation = self.components.reserve();
+        return CompReserved{ reservation };
+    }
+
+    pub(crate) fn finish_create_pdl_component(
+        &self, reserved: CompReserved,
+        component: CompPDL, mut context: CompCtx, initially_sleeping: bool,
+    ) -> (CompKey, &mut RuntimeComp) {
+        let inbox_queue = QueueDynMpsc::new(16);
+        let inbox_producer = inbox_queue.producer();
+
+        let _id = reserved.id();
+        context.id = reserved.id();
+        let component = RuntimeComp {
+            public: CompPublic{
+                sleeping: AtomicBool::new(initially_sleeping),
+                num_handles: AtomicU32::new(1), // the component itself acts like a handle
+                inbox: inbox_producer,
+            },
+            code: component,
+            ctx: context,
+            inbox: inbox_queue,
+        };
+
+        let index = self.components.submit(reserved.reservation, component);
+        debug_assert_eq!(index, _id.0);
+        let component = self.components.get_mut(index);
+
+        return (CompKey(index), component);
+    }
+
     /// Creates a new component. Note that the public part will be properly
-    /// initialized, but the private fields (e.g. owned ports, peers, etc.)
-    /// are not.
-    pub(crate) fn create_pdl_component(&self, comp: CompPDL, initially_sleeping: bool) -> (CompKey, &mut RuntimeComp) {
+    /// initialized, but not all private fields are.
+    pub(crate) fn create_pdl_component(&self, comp: CompPDL, ctx: CompCtx, initially_sleeping: bool) -> (CompKey, &mut RuntimeComp) {
         let inbox_queue = QueueDynMpsc::new(16);
         let inbox_producer = inbox_queue.producer();
         let comp = RuntimeComp{
@@ -221,7 +264,7 @@ impl RuntimeInner {
                 inbox: inbox_producer,
             },
             code: comp,
-            ctx: CompCtx::default(),
+            ctx,
             inbox: inbox_queue,
         };
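Creation is now two-phase: start_create_pdl_component reserves a slot so the new component's CompId is known up front, the creator builds the CompCtx (and adopts ports) against that ID, and finish_create_pdl_component submits the finished RuntimeComp into the reserved slot. A toy, standalone sketch of why the ID must exist before the object does (illustrative types only, not the runtime's API):

// A slot index is handed out first; the object stored later embeds that index.
struct Slab<T> {
    slots: Vec<Option<T>>,
}

struct Reservation {
    index: usize,
}

impl<T> Slab<T> {
    fn new() -> Self {
        Slab { slots: Vec::new() }
    }

    // Phase 1: hand out an index without storing anything yet.
    fn reserve(&mut self) -> Reservation {
        self.slots.push(None);
        Reservation { index: self.slots.len() - 1 }
    }

    // Phase 2: fill the reserved slot.
    fn submit(&mut self, reservation: Reservation, value: T) -> usize {
        let index = reservation.index;
        self.slots[index] = Some(value);
        index
    }
}

struct Component {
    ctx_id: usize, // the component's context stores its own ID
}

fn main() {
    let mut components = Slab::new();
    let reserved = components.reserve();
    // The context can reference the component's ID before the component is
    // inserted; ports adopted by the new component are tagged with it too.
    let ctx_id = reserved.index;
    let index = components.submit(reserved, Component { ctx_id });
    assert_eq!(components.slots[index].as_ref().unwrap().ctx_id, index);
}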
 

	
src/runtime2/store/component.rs
@@ -43,6 +43,8 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 
 use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};
 
+/// Generic store of components. Essentially a resizable freelist (implemented
+/// as a ringbuffer) combined with an array of actual elements.
 pub struct ComponentStore<T: Sized> {
     inner: UnfairSeLock<Inner<T>>,
     read_head: AtomicUsize,
@@ -53,6 +55,8 @@ pub struct ComponentStore<T: Sized> {
 unsafe impl<T: Sized> Send for ComponentStore<T>{}
 unsafe impl<T: Sized> Sync for ComponentStore<T>{}
 
+/// Contents of the `ComponentStore` that require a shared/exclusive locking
+/// mechanism for consistency.
 struct Inner<T: Sized> {
     freelist: Vec<u32>,
     data: Vec<*mut T>,
@@ -63,6 +67,29 @@ struct Inner<T: Sized> {
 
 type InnerShared<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;
 
+/// Reservation of a slot in the component store. Corresponds to the case where
+/// an index has been taken from the freelist, but the element has not yet been
+/// initialized
+pub struct ComponentReservation {
+    pub(crate) index: u32,
+    #[cfg(debug_assertions)] submitted: bool,
+}
+
+impl ComponentReservation {
+    fn new(index: u32) -> Self {
+        return Self{
+            index,
+            #[cfg(debug_assertions)] submitted: false,
+        }
+    }
+}
+
+impl Drop for ComponentReservation {
+    fn drop(&mut self) {
+        debug_assert!(self.submitted);
+    }
+}
+
 impl<T: Sized> ComponentStore<T> {
     pub fn new(initial_size: usize) -> Self {
         Self::assert_valid_size(initial_size);
@@ -96,10 +123,23 @@ impl<T: Sized> ComponentStore<T> {
     pub fn create(&self, value: T) -> u32 {
         let lock = self.inner.lock_shared();
         let (lock, index) = self.pop_freelist_index(lock);
-        self.initialize_at_index(lock, index, value);
+        Self::initialize_at_index(lock, index, value);
         return index;
     }
 
+    pub fn reserve(&self) -> ComponentReservation {
+        let lock = self.inner.lock_shared();
+        let (lock, index) = self.pop_freelist_index(lock);
+        return ComponentReservation::new(index);
+    }
+
+    pub fn submit(&self, mut reservation: ComponentReservation, value: T) -> u32 {
+        dbg_code!({ reservation.submitted = true; });
+        let lock = self.inner.lock_shared();
+        Self::initialize_at_index(lock, reservation.index, value);
+        return reservation.index;
+    }
+
     /// Destroys an element at the provided `index`. The caller must make sure
     /// that it does not use any previously received references to the data at
     /// this index, and that no more calls to `get` are performed using this
@@ -163,7 +203,7 @@ impl<T: Sized> ComponentStore<T> {
     }
 
     #[inline]
-    fn initialize_at_index(&self, read_lock: InnerShared<T>, index: u32, value: T) {
+    fn initialize_at_index(read_lock: InnerShared<T>, index: u32, value: T) {
         let mut target_ptr = read_lock.data[index as usize];
 
         unsafe {
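ComponentStore::reserve pops a freelist index without initializing the element, and submit initializes it later; in debug builds the ComponentReservation remembers whether it was submitted and asserts so on drop, catching leaked reservations. A simplified, single-threaded sketch of that guard (the real store is lock-protected, resizable, and stores raw pointers):

// Simplified reservation guard; the debug-only flag mirrors the
// #[cfg(debug_assertions)] submitted field above.
struct Reservation {
    index: u32,
    #[cfg(debug_assertions)]
    submitted: bool,
}

impl Reservation {
    fn new(index: u32) -> Self {
        Reservation { index, #[cfg(debug_assertions)] submitted: false }
    }
}

impl Drop for Reservation {
    fn drop(&mut self) {
        // Dropping a reservation without submitting it would leak a slot.
        #[cfg(debug_assertions)]
        debug_assert!(self.submitted, "reservation dropped without submit");
    }
}

struct Store<T> {
    freelist: Vec<u32>,
    data: Vec<Option<T>>,
}

impl<T> Store<T> {
    fn new(size: usize) -> Self {
        Store {
            freelist: (0..size as u32).rev().collect(),
            data: (0..size).map(|_| None).collect(),
        }
    }

    fn reserve(&mut self) -> Reservation {
        let index = self.freelist.pop().expect("store full (this sketch does not resize)");
        Reservation::new(index)
    }

    fn submit(&mut self, mut reservation: Reservation, value: T) -> u32 {
        #[cfg(debug_assertions)]
        {
            reservation.submitted = true;
        }
        let index = reservation.index;
        self.data[index as usize] = Some(value);
        index
    }
}

fn main() {
    let mut store: Store<&'static str> = Store::new(4);
    let reservation = store.reserve();
    let index = store.submit(reservation, "component");
    assert_eq!(store.data[index as usize], Some("component"));
}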
src/runtime2/store/mod.rs
@@ -6,5 +6,5 @@ pub mod unfair_se_lock;
 pub mod component;
 pub mod queue_mpsc;
 
-pub(crate) use component::ComponentStore;
+pub(crate) use component::{ComponentStore, ComponentReservation};
 pub(crate) use queue_mpsc::{QueueDynMpsc, QueueDynProducer};
\ No newline at end of file
src/runtime2/tests/mod.rs
 use crate::protocol::*;
 use crate::protocol::eval::*;
 use crate::runtime2::runtime::*;
-use crate::runtime2::component::CompPDL;
+use crate::runtime2::component::{CompCtx, CompPDL};
 
+fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) {
+    let prompt = rt.inner.protocol.new_component(
+        module_name.as_bytes(), routine_name.as_bytes(), args
+    ).expect("create prompt");
+    let reserved = rt.inner.start_create_pdl_component();
+    let ctx = CompCtx::new(&reserved);
+    let (key, _) = rt.inner.finish_create_pdl_component(reserved, CompPDL::new(prompt, 0), ctx, false);
+    rt.inner.enqueue_work(key);
+}
+
+fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) }
+
 #[test]
 fn test_component_creation() {
@@ -14,11 +26,7 @@ fn test_component_creation() {
     let rt = Runtime::new(1, pd);
 
     for i in 0..20 {
-        let prompt = rt.inner.protocol.new_component(b"", b"nothing_at_all", ValueGroup::new_stack(Vec::new()))
-            .expect("component creation");
-        let comp = CompPDL::new(prompt, 0);
-        let (key, _) = rt.inner.create_pdl_component(comp, false);
-        rt.inner.enqueue_work(key);
+        create_component(&rt, "", "nothing_at_all", no_args());
     }
 }
 
@@ -44,8 +52,5 @@ fn test_component_communication() {
     ").expect("compilation");
     let rt = Runtime::new(1, pd);
 
-    let prompt = rt.inner.protocol.new_component(b"", b"constructor", ValueGroup::new_stack(Vec::new()))
-        .expect("creation");
-    let (key, _) = rt.inner.create_pdl_component(CompPDL::new(prompt, 0), false);
-    rt.inner.enqueue_work(key);
+    create_component(&rt, "", "constructor", no_args());
 }
\ No newline at end of file