Changeset - c04f7fea1a62
[Not reviewed]
0 7 0
mh - 3 years ago 2022-01-16 13:40:10
contact@maxhenger.nl
WIP: Fixing compile errors
7 files changed with 54 insertions and 52 deletions:
0 comments (0 inline, 0 general)
src/runtime2/communication.rs
Show inline comments
 
@@ -2,7 +2,7 @@ use crate::protocol::eval::*;
 
use super::runtime::*;
 
use super::component::*;
 

	
 
#[derive(Copy, Clone)]
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub struct PortId(pub u32);
 

	
 

	
 
@@ -64,7 +64,7 @@ pub struct MessageDataHeader {
 
}
 

	
 
pub struct ControlMessage {
 
    pub id: ControlId,
 
    pub(crate) id: ControlId,
 
    pub sender_comp_id: CompId,
 
    pub target_port_id: Option<PortId>,
 
    pub content: ControlMessageContent,
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -5,7 +5,6 @@ use crate::protocol::eval::{
 
    EvalContinuation, EvalResult, EvalError
 
};
 

	
 
use crate::runtime2::store::QueueDynMpsc;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::scheduler::SchedulerCtx;
 
use crate::runtime2::communication::*;
 
@@ -209,7 +208,13 @@ pub(crate) struct CompPDL {
 
}
 

	
 
impl CompPDL {
 
    pub(crate) fn new(initial_state: Prompt) -> Self {
 
    pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self {
 
        let mut inbox_main = Vec::new();
 
        inbox_main.reserve(num_ports);
 
        for _ in 0..num_ports {
 
            inbox_main.push(None);
 
        }
 

	
 
        return Self{
 
            mode: Mode::NonSync,
 
            mode_port: PortId::new_invalid(),
 
@@ -221,15 +226,11 @@ impl CompPDL {
 
            exec_ctx: ExecCtx{
 
                stmt: ExecStmt::None,
 
            },
 
            inbox_main: Vec::new(),
 
            inbox_main,
 
            inbox_backup: Vec::new(),
 
        }
 
    }
 

	
 
    pub(crate) fn handle_setup(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        self.inbox.resize(comp_ctx.ports.len(), None);
 
    }
 

	
 
    pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
        if let Some(new_target) = self.control.should_reroute(&message) {
 
            let target = sched_ctx.runtime.get_component_public(new_target);
 
@@ -243,7 +244,7 @@ impl CompPDL {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
                self.handle_incoming_control_message(sched_ctx, comp_Ctx, message);
 
                self.handle_incoming_control_message(sched_ctx, comp_ctx, message);
 
            },
 
        }
 
    }
 
@@ -265,7 +266,7 @@ impl CompPDL {
 
            EC::BlockGet(port_id) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 

	
 
                let port_id = transform_port_id(port_id);
 
                let port_id = port_id_from_eval(port_id);
 
                if let Some(message) = comp_ctx.take_message(port_id) {
 
                    // We can immediately receive and continue
 
                    debug_assert!(self.exec_ctx.stmt.is_none());
 
@@ -310,7 +311,7 @@ impl CompPDL {
 
                    &protocol.types, &protocol.heap,
 
                    definition_id, monomorph_idx, arguments
 
                );
 
                self.create_component_and_transfer_ports(sched_ctx, comp_ctx, prompt, &workspace_ports);
 
                self.create_component_and_transfer_ports(sched_ctx, comp_ctx, prompt, &ports);
 
                return Ok(CompScheduling::Requeue);
 
            },
 
            EC::NewChannel => {
 
@@ -379,7 +380,7 @@ impl CompPDL {
 
            // After direct insertion, check if this component's execution is 
 
            // blocked on receiving a message on that port
 
            debug_assert_ne!(comp_ctx.ports[port_index].state, PortState::Blocked); // because we could insert directly
 
            if self.mode == Mode::BlockedGet && self.mode_port == message.data_header.target_port {
 
            if self.mode == Mode::BlockedGet && self.mode_port == target_port_id {
 
                // We were indeed blocked
 
                self.mode = Mode::Sync;
 
                self.mode_port = PortId::new_invalid();
 
@@ -391,11 +392,12 @@ impl CompPDL {
 
        // The direct inbox is full, so the port will become (or was already) blocked
 
        let port_info = &mut comp_ctx.ports[port_index];
 
        debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
 
        let _peer_comp_id = port_info.peer_comp_id;
 

	
 
        if port_info.state == PortState::Open {
 
            let (target_comp_id, block_message) =
 
                self.control.mark_port_blocked(target_port_id, comp_ctx);
 
            debug_assert_eq!(port_info.peer_comp_id, target_comp_id);
 
            debug_assert_eq!(_peer_comp_id, target_comp_id);
 

	
 
            let peer = comp_ctx.get_peer(target_comp_id);
 
            peer.handle.inbox.push(Message::Control(block_message));
 
@@ -447,9 +449,10 @@ impl CompPDL {
 
                // the component handle as well
 
                let port_index = comp_ctx.get_port_index(port_id).unwrap();
 
                let port_info = &mut comp_ctx.ports[port_index];
 
                let peer_comp_id = port_info.peer_comp_id;
 
                port_info.state = PortState::Closed;
 

	
 
                let peer_index = comp_ctx.get_peer_index(port_info.peer_comp_id).unwrap();
 
                let peer_index = comp_ctx.get_peer_index(peer_comp_id).unwrap();
 
                let peer_info = &mut comp_ctx.peers[peer_index];
 
                peer_info.num_associated_ports -= 1;
 
                if peer_info.num_associated_ports == 0 {
 
@@ -477,7 +480,7 @@ impl CompPDL {
 
                // temporarily block the port (while our original recipient is
 
                // potentially rerouting some of the in-flight messages) and
 
                // Ack. Then we wait for the `unblock` call.
 
                debug_assert_eq!(message.target_port_id, port_id);
 
                debug_assert_eq!(message.target_port_id, Some(port_id));
 
                let port_info = comp_ctx.get_port_mut(port_id);
 
                debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked);
 
                if port_info.state == PortState::Open {
 
@@ -485,7 +488,7 @@ impl CompPDL {
 
                }
 
            },
 
            ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => {
 
                debug_assert_eq!(message.target_port_id, port_id);
 
                debug_assert_eq!(message.target_port_id, Some(port_id));
 
                let port_info = comp_ctx.get_port_mut(port_id);
 
                debug_assert!(port_info.state == PortState::Blocked);
 
                port_info.peer_comp_id = new_comp_id;
 
@@ -512,7 +515,7 @@ impl CompPDL {
 
    }
 

	
 
    fn create_component_and_transfer_ports(&mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, prompt: Prompt, ports: &[PortId]) {
 
        let component = CompPDL::new(prompt);
 
        let component = CompPDL::new(prompt, ports.len());
 
        let (comp_key, component) = sched_ctx.runtime.create_pdl_component(component, true);
 
        let created_ctx = &mut component.ctx;
 

	
 
@@ -525,7 +528,8 @@ impl CompPDL {
 
            debug_assert_ne!(port_info.state, PortState::Blocked);
 
            if port_info.peer_comp_id == creator_ctx.id {
 
                // We own the peer port. So retrieve it and modify the peer directly
 
                let port_info = creator_ctx.get_port_mut(port_info.peer_id);
 
                let peer_port_id = port_info.peer_id;
 
                let port_info = creator_ctx.get_port_mut(peer_port_id);
 
                port_info.peer_comp_id = created_ctx.id;
 
            } else {
 
                // We don't own the port, so send the appropriate messages and
 
@@ -540,7 +544,7 @@ impl CompPDL {
 
            }
 

	
 
            // Transfer port and create temporary reroute entry
 
            let (mut port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id);
 
            let (port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id);
 
            if port_info.state == PortState::Blocked {
 
                todo!("Think about this when you're not tired!");
 
            }
 
@@ -568,8 +572,6 @@ impl CompPDL {
 
    /// returned. If returned then the caller is responsible for decrementing
 
    /// the atomic counters of the peer component's handle.
 
    fn remove_port_from_component(comp_ctx: &mut CompCtx, port_id: PortId) -> (Port, Option<Peer>) {
 
        use std::sync::atomic::Ordering;
 

	
 
        let port_index = comp_ctx.get_port_index(port_id).unwrap();
 
        let port_info = comp_ctx.ports.remove(port_index);
 

	
 
@@ -650,8 +652,6 @@ fn port_id_to_eval(port_id: PortId) -> EvalPortId {
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    use crate::protocol::eval::Value;
 

	
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortId>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
src/runtime2/component/consensus.rs
Show inline comments
 
use crate::protocol::eval::*;
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::communication::*;
 

	
 
use super::component_pdl::*;
 
@@ -52,7 +52,7 @@ impl Consensus {
 
    pub(crate) fn transfer_ports(&mut self, comp_ctx: &CompCtx) {
 
        let mut needs_setting_ports = false;
 
        if comp_ctx.ports.len() != self.ports.len() {
 
            ports_same = true;
 
            needs_setting_ports = true;
 
        } else {
 
            for idx in 0..comp_ctx.ports.len() {
 
                let comp_port_id = comp_ctx.ports[idx].self_id;
src/runtime2/component/control_layer.rs
Show inline comments
 
@@ -102,10 +102,10 @@ impl ControlLayer {
 

	
 
                return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack);
 
            },
 
            ControlContent::ScheduleComponent(content) => {
 
            ControlContent::ScheduleComponent(to_schedule) => {
 
                // If all change-of-peers are `Ack`d, then we're ready to
 
                // schedule the component!
 
                return AckAction::ScheduleComponent(content.to_schedule);
 
                return AckAction::ScheduleComponent(*to_schedule);
 
            },
 
            ControlContent::BlockedPort(_) => unreachable!(),
 
            ControlContent::ClosedPort(port_id) => {
 
@@ -143,9 +143,7 @@ impl ControlLayer {
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 0, // incremented by calls to `add_reroute_entry`
 
            content: ControlContent::ScheduleComponent(ContentScheduleComponent{
 
                to_schedule: to_schedule_id
 
            }),
 
            content: ControlContent::ScheduleComponent(to_schedule_id),
 
        });
 

	
 
        return entry_id;
 
@@ -201,11 +199,13 @@ impl ControlLayer {
 

	
 
    pub(crate) fn mark_port_closed<'a>(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> {
 
        let port = comp_ctx.get_port_mut(port_id);
 
        let peer_port_id = port.peer_id;
 
        let peer_comp_id = port.peer_comp_id;
 
        debug_assert!(port.state == PortState::Open || port.state == PortState::Blocked);
 

	
 
        port.state = PortState::Closed;
 

	
 
        if port.peer_comp_id == comp_ctx.id {
 
        if peer_comp_id == comp_ctx.id {
 
            // We own the other end of the channel as well
 
            return None;
 
        }
 
@@ -218,12 +218,12 @@ impl ControlLayer {
 
        });
 

	
 
        return Some((
 
            port.peer_comp_id,
 
            peer_comp_id,
 
            ControlMessage{
 
                id: entry_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(port.peer_id),
 
                content: ControlMessageContent::ClosePort(port.peer_id),
 
                target_port_id: Some(peer_port_id),
 
                content: ControlMessageContent::ClosePort(peer_port_id),
 
            }
 
        ));
 
    }
 
@@ -233,24 +233,24 @@ impl ControlLayer {
 
        //  be renamed. Lets see where the code ends up being
 
        let entry_id = self.take_id();
 
        let port_info = comp_ctx.get_port_mut(port_id);
 
        let peer_port_id = port_info.peer_id;
 
        let peer_comp_id = port_info.peer_comp_id;
 
        debug_assert_eq!(port_info.state, PortState::Open); // prevent unforeseen issues
 
        port_info.state = PortState::Blocked;
 

	
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 0,
 
            content: ControlContent::BlockedPort(ContentBlockedPort{
 
                blocked_port: port_id,
 
            }),
 
            content: ControlContent::BlockedPort(port_id),
 
        });
 

	
 
        return (
 
            port_info.peer_comp_id,
 
            peer_comp_id,
 
            ControlMessage{
 
                id: entry_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(port_info.peer_id),
 
                content: ControlMessageContent::BlockPort(port_info.peer_id),
 
                target_port_id: Some(peer_port_id),
 
                content: ControlMessageContent::BlockPort(peer_port_id),
 
            }
 
        );
 
    }
 
@@ -258,10 +258,10 @@ impl ControlLayer {
 
    pub(crate) fn mark_port_unblocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) {
 
        // Find the entry that contains the blocking entry for the port
 
        let mut entry_index = usize::MAX;
 
        let mut entry_id = ControlId::MAX;
 
        let mut entry_id = ControlId::new_invalid();
 
        for (index, entry) in self.entries.iter().enumerate() {
 
            if let ControlContent::BlockedPort(block_entry) = &entry.content {
 
                if block_entry.blocked_port == port_id {
 
            if let ControlContent::BlockedPort(blocked_port) = &entry.content {
 
                if *blocked_port == port_id {
 
                    entry_index = index;
 
                    entry_id = entry.id;
 
                    break;
 
@@ -270,16 +270,18 @@ impl ControlLayer {
 
        }
 

	
 
        let port_info = comp_ctx.get_port_mut(port_id);
 
        let peer_port_id = port_info.peer_id;
 
        let peer_comp_id = port_info.peer_comp_id;
 
        debug_assert_eq!(port_info.state, PortState::Blocked);
 
        port_info.state = PortState::Open;
 

	
 
        return (
 
            port_info.peer_comp_id,
 
            peer_comp_id,
 
            ControlMessage{
 
                id: entry_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(port_info.peer_id),
 
                content: ControlMessageContent::UnblockPort(port_info.peer_id),
 
                target_port_id: Some(peer_port_id),
 
                content: ControlMessageContent::UnblockPort(peer_port_id),
 
            }
 
        )
 
    }
src/runtime2/scheduler.rs
Show inline comments
 
@@ -46,7 +46,7 @@ impl Scheduler {
 
            // be re-executed immediately.
 
            let mut new_scheduling = CompScheduling::Immediate;
 
            while let CompScheduling::Immediate = new_scheduling {
 
                new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.private.ctx).expect("TODO: Handle error");
 
                new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error");
 
            }
 

	
 
            // Handle the new scheduling
 
@@ -62,7 +62,7 @@ impl Scheduler {
 
    // local utilities
 

	
 
    fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
 
        debug_assert_eq!(key.downgrade(), component.private.ctx.id); // make sure component matches key
 
        debug_assert_eq!(key.downgrade(), component.ctx.id); // make sure component matches key
 
        debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping
 

	
 
        component.public.sleeping.store(true, Ordering::Release);
 
@@ -80,7 +80,7 @@ impl Scheduler {
 
    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
 
        for port_index in 0..component.ctx.ports.len() {
 
            let port_info = &component.ctx.ports[port_index];
 
            if let Some((peer_id, message)) = component.code.control.mark_port_closed(port_info.id, comp_ctx) {
 
            if let Some((peer_id, message)) = component.code.control.mark_port_closed(port_info.self_id, &mut component.ctx) {
 
                let peer_info = component.ctx.get_peer(peer_id);
 
                peer_info.handle.inbox.push(Message::Control(message));
 

	
src/runtime2/store/component.rs
Show inline comments
 
@@ -37,7 +37,7 @@
 
 */
 

	
 
use std::mem::transmute;
 
use std::alloc::{alloc, dealloc, Layout};
 
use std::alloc::{dealloc, Layout};
 
use std::ptr;
 
use std::sync::atomic::{AtomicUsize, Ordering};
 

	
src/runtime2/store/unfair_se_lock.rs
Show inline comments
 
@@ -78,7 +78,7 @@ impl<T> UnfairSeLock<T> {
 
                    // to wait until the reader count is at 0.
 
                    shared = new_shared;
 
                    if shared != EXCLUSIVE_BIT {
 
                        shared = self.wait_until_not_shared(shared);
 
                        self.wait_until_not_shared(shared);
 
                    }
 

	
 
                    return UnfairSeLockExclusiveGuard::new(self);
0 comments (0 inline, 0 general)