Files @ 8622657b70f9
Branch filter:

Location: CSY/reowolf/src/runtime2/component.rs - annotation

8622657b70f9 6.2 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
Add ctor/dtor tests to MPSC queue

Now also testing that resources are properly moved and/or bitwise
copied inside of the MPSC queue. To make this possible the Resource
testing struct was moved outside of the store::component::tests
module.
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
use crate::protocol::*;
use crate::protocol::eval::{
    PortId as EvalPortId, Prompt,
    ValueGroup, Value,
    EvalContinuation, EvalResult, EvalError
};

use super::runtime::*;
use super::scheduler::SchedulerCtx;
use super::communication::*;

/// Scheduling decision handed back by a component after it has run (see
/// `CompPDL::run`, which returns it wrapped in a `Result`).
pub enum CompScheduling {
    /// Returned by `CompPDL::run` when a `get` could be satisfied right away
    /// and the component can continue executing.
    Immediate,
    // NOTE(review): not produced anywhere in this file; presumably asks the
    // scheduler to queue the component again — confirm against the scheduler.
    Requeue,
    /// Returned by `CompPDL::run` when the component has to wait for a
    /// message; it is also the fall-through result of `run`.
    Sleep,
    // NOTE(review): not produced anywhere in this file; presumably signals
    // that the component is finished — confirm against the scheduler.
    Exit,
}

/// Per-component bookkeeping: the component's identifier, its ports, its
/// peers, and one message slot per port.
pub struct CompCtx {
    pub id: CompId,
    /// Indexed by the numeric value of a `PortId` (see `find_peer`).
    pub ports: Vec<Port>,
    /// Indexed by a port's `local_peer_index` (see `find_peer`).
    pub peers: Vec<Peer>,
    /// Indexed by the numeric value of a `PortId`; a slot whose `values` is
    /// empty is treated as "no message" by `take_message`.
    pub messages: Vec<ValueGroup>, // same size as "ports"
}

impl CompCtx {
    /// Removes and returns the message stored for `port_id`, if there is one.
    ///
    /// A slot whose `values` is empty counts as "no message" and yields
    /// `None` without being touched. Otherwise the stored group is moved out
    /// and a fresh `ValueGroup::new_stack(Vec::new())` is left in its place.
    ///
    /// Panics if `port_id.0` is not a valid index into `messages`.
    fn take_message(&mut self, port_id: PortId) -> Option<ValueGroup> {
        let slot = &mut self.messages[port_id.0 as usize];
        if slot.values.is_empty() {
            None
        } else {
            // Move the message out, leaving an empty group behind
            Some(std::mem::replace(slot, ValueGroup::new_stack(Vec::new())))
        }
    }

    /// Returns the peer associated with `port_id`, found through the port's
    /// `local_peer_index`.
    ///
    /// Panics if either the port index or the peer index is out of bounds.
    fn find_peer(&self, port_id: PortId) -> &Peer {
        let peer_index = self.ports[port_id.0 as usize].local_peer_index as usize;
        &self.peers[peer_index]
    }
}

/// Outcome of a statement that was handled outside of the evaluator. It is
/// stored in `ExecCtx` and consumed through that type's `RunContext` methods.
pub enum ExecStmt {
    /// A channel was created; the pair is handed out by `created_channel`.
    CreatedChannel((Value, Value)),
    /// A put was performed; makes `performed_put` return `true`.
    PerformedPut,
    /// A get was performed; the received group is handed out by
    /// `performed_get`.
    PerformedGet(ValueGroup),
    /// Nothing is pending.
    None,
}

impl ExecStmt {
    /// Moves the current value out, leaving `ExecStmt::None` behind.
    fn take(&mut self) -> ExecStmt {
        std::mem::replace(self, ExecStmt::None)
    }

    /// Returns `true` only for the `ExecStmt::None` variant.
    fn is_none(&self) -> bool {
        matches!(self, ExecStmt::None)
    }
}

/// Context passed to `Prompt::step` as its `RunContext` (see
/// `CompPDL::execute_prompt`). Holds at most one pending statement outcome.
pub struct ExecCtx {
    // Pending outcome; reset to `ExecStmt::None` whenever it is queried.
    stmt: ExecStmt,
}

// Every query below consumes the stored statement outcome: `stmt` is reset to
// `ExecStmt::None` by `take()` regardless of which variant was stored.
impl RunContext for ExecCtx {
    /// Reports whether a put has been performed. Panics (via `unreachable!`)
    /// if a different kind of outcome was pending.
    fn performed_put(&mut self, _port: EvalPortId) -> bool {
        match self.stmt.take() {
            ExecStmt::PerformedPut => true,
            ExecStmt::None => false,
            _ => unreachable!(),
        }
    }

    /// Hands out the value group of a performed get, or `None` if nothing is
    /// pending. Panics (via `unreachable!`) if a different kind of outcome
    /// was pending.
    fn performed_get(&mut self, _port: EvalPortId) -> Option<ValueGroup> {
        match self.stmt.take() {
            ExecStmt::PerformedGet(received) => Some(received),
            ExecStmt::None => None,
            _ => unreachable!(),
        }
    }

    /// Not implemented; always panics.
    fn fires(&mut self, _port: EvalPortId) -> Option<Value> {
        todo!("remove fires")
    }

    /// Not implemented; always panics.
    fn performed_fork(&mut self) -> Option<bool> {
        todo!("remove fork")
    }

    /// Hands out the pair belonging to a created channel, or `None` if
    /// nothing is pending. Panics (via `unreachable!`) if a different kind of
    /// outcome was pending.
    fn created_channel(&mut self) -> Option<(Value, Value)> {
        match self.stmt.take() {
            ExecStmt::CreatedChannel(endpoints) => Some(endpoints),
            ExecStmt::None => None,
            _ => unreachable!(),
        }
    }
}

/// Execution mode of a `CompPDL`, checked by the debug assertions in
/// `CompPDL::run`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum Mode {
    /// Initial mode set by `CompPDL::new`; expected by `run` for
    /// continuations that occur outside of sync mode.
    NonSync,
    /// Expected by `run` for continuations that occur in sync mode
    /// (`SyncBlockEnd`, `BlockGet`, `Put`).
    Sync,
    /// Set by `run` when a get found no message and the component must wait.
    BlockedGet,
    // NOTE(review): never assigned in this file; presumably the put
    // counterpart of `BlockedGet` — confirm once put handling is implemented.
    BlockedPut,
}

/// State of a component that is driven by a `Prompt`: the current mode, the
/// data recorded while blocked, the prompt itself and the context handed to
/// the prompt on every step.
pub(crate) struct CompPDL {
    pub mode: Mode,
    pub mode_port: PortId, // when blocked on a port
    pub mode_value: ValueGroup, // when blocked on a put
    pub prompt: Prompt,
    pub exec_ctx: ExecCtx,
}

impl CompPDL {
    /// Creates a component in `Mode::NonSync` with an invalid blocking port,
    /// a default blocking value and no pending statement outcome.
    pub(crate) fn new(initial_state: Prompt) -> Self {
        let exec_ctx = ExecCtx{ stmt: ExecStmt::None };
        Self{
            mode: Mode::NonSync,
            mode_port: PortId::new_invalid(),
            mode_value: ValueGroup::default(),
            prompt: initial_state,
            exec_ctx,
        }
    }

    /// Steps the prompt until it yields a continuation other than `Stepping`
    /// and reacts to that continuation.
    ///
    /// Evaluation errors from the prompt are propagated to the caller. A get
    /// on a port with a pending message stores the message for the prompt and
    /// returns `CompScheduling::Immediate`; a get without a pending message
    /// switches to `Mode::BlockedGet` and returns `CompScheduling::Sleep`.
    /// All other handled continuations currently fall through to
    /// `CompScheduling::Sleep`.
    pub(crate) fn run(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
        use EvalContinuation as EC;

        let continuation = self.execute_prompt(sched_ctx)?;

        match continuation {
            // `execute_prompt` keeps going for as long as this is returned
            EC::Stepping => unreachable!(),
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),

            // Continuations expected while in sync mode
            EC::SyncBlockEnd => {
                debug_assert_eq!(self.mode, Mode::Sync);
                self.handle_sync_end(sched_ctx, comp_ctx);
            },
            EC::BlockGet(port_id) => {
                debug_assert_eq!(self.mode, Mode::Sync);

                let port_id = transform_port_id(port_id);
                match comp_ctx.take_message(port_id) {
                    Some(message) => {
                        // A message is already waiting: hand it to the prompt
                        // and keep running
                        debug_assert!(self.exec_ctx.stmt.is_none());
                        self.exec_ctx.stmt = ExecStmt::PerformedGet(message);
                        return Ok(CompScheduling::Immediate);
                    },
                    None => {
                        // Nothing to receive yet: remember the port and wait
                        self.mode = Mode::BlockedGet;
                        self.mode_port = port_id;
                        return Ok(CompScheduling::Sleep);
                    },
                }
            },
            EC::Put(port_id, value) => {
                debug_assert_eq!(self.mode, Mode::Sync);

                // Sending is not implemented yet; only the peer is looked up
                let port_id = transform_port_id(port_id);
                let peer = comp_ctx.find_peer(port_id);
            },

            // Continuations expected outside of sync mode
            EC::ComponentTerminated => {
                debug_assert_eq!(self.mode, Mode::NonSync);
            },
            EC::SyncBlockStart => {
                debug_assert_eq!(self.mode, Mode::NonSync);
                self.handle_sync_start(sched_ctx, comp_ctx);
            },
            EC::NewComponent(definition_id, monomorph_idx, arguments) => {
                debug_assert_eq!(self.mode, Mode::NonSync);
            },
            EC::NewChannel => {
                debug_assert_eq!(self.mode, Mode::NonSync);
            }
        }

        Ok(CompScheduling::Sleep)
    }

    /// Steps the prompt at least once and keeps stepping while it reports
    /// `EvalContinuation::Stepping`. Returns the first other continuation, or
    /// the first evaluation error.
    fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult {
        loop {
            let protocol = &sched_ctx.runtime.protocol;
            let outcome = self.prompt.step(
                &protocol.types, &protocol.heap,
                &protocol.modules, &mut self.exec_ctx,
            )?;

            match outcome {
                EvalContinuation::Stepping => continue,
                finished => return Ok(finished),
            }
        }
    }

    /// Called by `run` when the prompt enters a sync block. Currently does
    /// nothing.
    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
    }

    /// Called by `run` when the prompt reaches the end of a sync block.
    /// Currently does nothing.
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
    }
}

/// Converts the evaluator's port identifier into a `PortId` by wrapping its
/// `id` field.
#[inline]
fn transform_port_id(port_id: EvalPortId) -> PortId {
    PortId(port_id.id)
}