Files @ 9b5ea2f879a4
Branch filter:

Location: CSY/reowolf/src/runtime2/component.rs

9b5ea2f879a4 6.2 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
mh
Implement MPSC queue

A multiple-producer, single-consumer queue that acts
as the inbox for components.
use crate::protocol::*;
use crate::protocol::eval::{
    PortId as EvalPortId, Prompt,
    ValueGroup, Value,
    EvalContinuation, EvalResult, EvalError
};

use super::runtime::*;
use super::scheduler::SchedulerCtx;
use super::communication::*;

/// Scheduling decision a component hands back after it has been run.
///
/// Derives the same set of traits as the other fieldless enum in this file
/// (`Mode`) so values can be copied, compared and printed in diagnostics.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum CompScheduling {
    /// Returned by `CompPDL::run` when a blocking `get` found a message
    /// right away and execution can continue.
    Immediate,
    /// Not produced by the code in this file; presumably "put the component
    /// back in the run queue" — TODO confirm against the scheduler.
    Requeue,
    /// Returned by `CompPDL::run` when the component has to wait (and, for
    /// now, as the fallback result of every other continuation).
    Sleep,
    /// Not produced by the code in this file; presumably "the component is
    /// done and can be removed" — TODO confirm against the scheduler.
    Exit,
}

/// Per-component bookkeeping: the ports of a component, the peers those ports
/// refer to, and one message slot per port.
pub struct CompCtx {
    /// Presumably the identifier of the component this context belongs to;
    /// it is not read by the code in this file.
    pub id: CompId,
    /// Port descriptions, indexed by `PortId.0` (see `find_peer`).
    pub ports: Vec<Port>,
    /// Peer descriptions, indexed by a port's `local_peer_index`
    /// (see `find_peer`).
    pub peers: Vec<Peer>,
    /// Message slots, indexed by `PortId.0`. A slot whose `values` is empty
    /// is treated as "no message" by `take_message`.
    pub messages: Vec<ValueGroup>, // same size as "ports"
}

impl CompCtx {
    /// Removes and returns the message stored for `port_id`, if there is one.
    ///
    /// A slot with an empty `values` list counts as "nothing received" and
    /// yields `None`. Otherwise the stored group is moved out and the slot is
    /// reset to an empty group.
    ///
    /// Panics if `port_id.0` is not a valid index into `messages`.
    fn take_message(&mut self, port_id: PortId) -> Option<ValueGroup> {
        let slot = &mut self.messages[port_id.0 as usize];
        if slot.values.is_empty() {
            None
        } else {
            // Hand out the stored group and leave an empty one in its place.
            Some(std::mem::replace(slot, ValueGroup::new_stack(Vec::new())))
        }
    }

    /// Looks up the peer associated with `port_id` through the port's
    /// `local_peer_index`.
    ///
    /// Panics if either index is out of bounds.
    fn find_peer(&self, port_id: PortId) -> &Peer {
        let peer_index = self.ports[port_id.0 as usize].local_peer_index as usize;
        &self.peers[peer_index]
    }
}

/// Pending result of a runtime-side action, stored in `ExecCtx` and consumed
/// by the `RunContext` callbacks (`created_channel`, `performed_put`,
/// `performed_get`).
pub enum ExecStmt {
    /// Pair of values handed back by `created_channel`; presumably the two
    /// endpoints of the new channel — TODO confirm.
    CreatedChannel((Value, Value)),
    /// Makes `performed_put` report `true`.
    PerformedPut,
    /// Carries the message that `performed_get` hands back.
    PerformedGet(ValueGroup),
    /// Nothing pending: the callbacks answer `false` / `None`.
    None,
}

impl ExecStmt {
    /// Moves the current statement out, leaving `ExecStmt::None` behind.
    fn take(&mut self) -> ExecStmt {
        std::mem::replace(self, ExecStmt::None)
    }

    /// Returns `true` only when no statement result is pending.
    fn is_none(&self) -> bool {
        matches!(self, ExecStmt::None)
    }
}

/// Context passed to `Prompt::step` (see `CompPDL::execute_prompt`); the
/// `RunContext` implementation answers the evaluator's queries from it.
pub struct ExecCtx {
    // Pending statement result; each `RunContext` query takes it, which
    // resets it to `ExecStmt::None`.
    stmt: ExecStmt,
}

impl RunContext for ExecCtx {
    /// Reports whether a put has been performed.
    ///
    /// Consumes the pending statement: `PerformedPut` yields `true`, nothing
    /// pending yields `false`. Any other pending statement is treated as a
    /// bug and panics.
    fn performed_put(&mut self, _port: EvalPortId) -> bool {
        let pending = self.stmt.take();
        match pending {
            ExecStmt::PerformedPut => true,
            ExecStmt::None => false,
            _ => unreachable!(),
        }
    }

    /// Hands back the message of a performed get, if one is pending.
    ///
    /// Consumes the pending statement; a pending statement of any other
    /// kind is treated as a bug and panics.
    fn performed_get(&mut self, _port: EvalPortId) -> Option<ValueGroup> {
        let pending = self.stmt.take();
        match pending {
            ExecStmt::PerformedGet(message) => Some(message),
            ExecStmt::None => None,
            _ => unreachable!(),
        }
    }

    /// Not implemented; always panics.
    fn fires(&mut self, _port: EvalPortId) -> Option<Value> {
        todo!("remove fires")
    }

    /// Not implemented; always panics.
    fn performed_fork(&mut self) -> Option<bool> {
        todo!("remove fork")
    }

    /// Hands back the value pair of a created channel, if one is pending.
    ///
    /// Consumes the pending statement; a pending statement of any other
    /// kind is treated as a bug and panics.
    fn created_channel(&mut self) -> Option<(Value, Value)> {
        let pending = self.stmt.take();
        match pending {
            ExecStmt::CreatedChannel(endpoints) => Some(endpoints),
            ExecStmt::None => None,
            _ => unreachable!(),
        }
    }
}

/// Execution mode of a `CompPDL` component; `CompPDL::run` debug-asserts the
/// mode that each kind of continuation is expected in.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum Mode {
    /// Initial mode. Expected for `ComponentTerminated`, `SyncBlockStart`,
    /// `NewComponent` and `NewChannel` continuations.
    NonSync,
    /// Expected for `SyncBlockEnd`, `BlockGet` and `Put` continuations.
    Sync,
    /// Set by `CompPDL::run` when a `BlockGet` finds no message waiting;
    /// `mode_port` then holds the port being waited on.
    BlockedGet,
    /// Not set anywhere in this file; presumably the counterpart of
    /// `BlockedGet` for a put that cannot complete yet — TODO confirm.
    BlockedPut,
}

/// A component driven by an evaluator `Prompt`, together with the state
/// needed to resume it after it blocks.
pub(crate) struct CompPDL {
    /// Current execution mode; starts as `Mode::NonSync`.
    pub mode: Mode,
    /// Port being waited on. Initialised to `PortId::new_invalid()` and set
    /// by `run` when it switches to `Mode::BlockedGet`.
    pub mode_port: PortId, // when blocked on a port
    /// Initialised to `ValueGroup::default()`; not written elsewhere in this
    /// file. Presumably the value of a put that is still waiting to be sent.
    pub mode_value: ValueGroup, // when blocked on a put
    /// Evaluator state that `execute_prompt` steps.
    pub prompt: Prompt,
    /// Context handed to `Prompt::step` as its `RunContext`.
    pub exec_ctx: ExecCtx,
}

impl CompPDL {
    /// Creates a component that starts in `Mode::NonSync`, with no port or
    /// value recorded for blocking and no pending statement result.
    pub(crate) fn new(initial_state: Prompt) -> Self {
        Self {
            mode: Mode::NonSync,
            mode_port: PortId::new_invalid(),
            mode_value: ValueGroup::default(),
            prompt: initial_state,
            exec_ctx: ExecCtx { stmt: ExecStmt::None },
        }
    }

    /// Runs the component until the evaluator yields something other than
    /// `Stepping`, then reacts to that continuation.
    ///
    /// Returns the scheduling decision for the component, or the evaluator's
    /// error if stepping failed. Only `BlockGet` with a message already
    /// waiting yields `CompScheduling::Immediate`; every other handled
    /// continuation currently yields `CompScheduling::Sleep`.
    ///
    /// Panics (`todo!`) on `BranchInconsistent`, `NewFork` and `BlockFires`.
    pub(crate) fn run(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
        use EvalContinuation as EC;

        let continuation = self.execute_prompt(sched_ctx)?;

        let scheduling = match continuation {
            // `execute_prompt` only returns once stepping has stopped
            EC::Stepping => unreachable!(),
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),

            // --- Continuations expected while in sync mode ---
            EC::SyncBlockEnd => {
                debug_assert_eq!(self.mode, Mode::Sync);
                self.handle_sync_end(sched_ctx, comp_ctx);
                CompScheduling::Sleep
            },
            EC::BlockGet(port_id) => {
                debug_assert_eq!(self.mode, Mode::Sync);

                let port_id = transform_port_id(port_id);
                match comp_ctx.take_message(port_id) {
                    Some(message) => {
                        // A message is already waiting: hand it to the
                        // evaluator and keep running
                        debug_assert!(self.exec_ctx.stmt.is_none());
                        self.exec_ctx.stmt = ExecStmt::PerformedGet(message);
                        CompScheduling::Immediate
                    },
                    None => {
                        // Nothing to receive yet: remember the port and wait
                        self.mode = Mode::BlockedGet;
                        self.mode_port = port_id;
                        CompScheduling::Sleep
                    },
                }
            },
            EC::Put(port_id, _value) => {
                debug_assert_eq!(self.mode, Mode::Sync);

                // The peer is looked up, but neither it nor the value is
                // used yet
                let port_id = transform_port_id(port_id);
                let _peer = comp_ctx.find_peer(port_id);
                CompScheduling::Sleep
            },

            // --- Continuations expected outside of sync mode ---
            EC::ComponentTerminated => {
                debug_assert_eq!(self.mode, Mode::NonSync);
                CompScheduling::Sleep
            },
            EC::SyncBlockStart => {
                debug_assert_eq!(self.mode, Mode::NonSync);
                self.handle_sync_start(sched_ctx, comp_ctx);
                CompScheduling::Sleep
            },
            EC::NewComponent(_definition_id, _monomorph_idx, _arguments) => {
                debug_assert_eq!(self.mode, Mode::NonSync);
                CompScheduling::Sleep
            },
            EC::NewChannel => {
                debug_assert_eq!(self.mode, Mode::NonSync);
                CompScheduling::Sleep
            },
        };

        Ok(scheduling)
    }

    /// Steps the prompt repeatedly and returns the first continuation that
    /// is not `Stepping`; a stepping error is propagated immediately.
    fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult {
        loop {
            let step_result = self.prompt.step(
                &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
                &sched_ctx.runtime.protocol.modules, &mut self.exec_ctx,
            )?;

            if !matches!(step_result, EvalContinuation::Stepping) {
                return Ok(step_result);
            }
        }
    }

    /// Called by `run` on `SyncBlockStart`. Currently a no-op.
    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {

    }

    /// Called by `run` on `SyncBlockEnd`. Currently a no-op.
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {

    }
}

/// Converts the evaluator's port identifier into the runtime's `PortId` by
/// wrapping its `id` field.
#[inline]
fn transform_port_id(port_id: EvalPortId) -> PortId {
    PortId(port_id.id)
}