Files @ c62d6f0cc48a
Branch filter:

Location: CSY/reowolf/src/runtime2/component/component_internet.rs

c62d6f0cc48a 5.5 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
mh
WIP on implementing (figuring out) tcp component
use crate::protocol::eval::{ValueGroup, Value, EvalError};
use crate::runtime2::*;
use crate::runtime2::component::CompCtx;
use crate::runtime2::stdlib::internet::*;

use super::component::{self, *};
use super::control_layer::*;
use super::consensus::*;

/// State of the TCP socket owned by a `ComponentTcpClient`.
enum SocketState {
    /// Holds the client socket handle produced by `SocketTcpClient::new`.
    Connected(SocketTcpClient),
    /// The socket is not usable. When `run` observes this state in non-sync
    /// mode it switches the component to `CompMode::StartExit`.
    Error,
}

/// Which operation the component is performing inside a sync round.
///
/// NOTE(review): this enum is not referenced anywhere in the visible part of
/// the file yet; presumably it distinguishes receiving from the socket
/// (`Getting`) from sending over it (`Putting`) — confirm once it is wired up.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SyncState {
    Getting,
    Putting,
}

/// Builtin component that couples a pair of PDL ports to a TCP client socket.
///
/// Data arriving on the PDL input port is meant to be transmitted over the
/// socket, and data received over the socket is meant to be emitted on the PDL
/// output port (see the per-field comments below).
pub struct ComponentTcpClient {
    // Properties for the tcp socket
    socket_state: SocketState,
    // Data messages that arrived on the PDL input port and have been queued
    // by `handle_incoming_data_message`.
    pending_recv: Vec<DataMessage>, // on the input port
    pdl_input_port_id: PortId, // input from PDL, so transmitted over socket
    pdl_output_port_id: PortId, // output towards PDL, so received over socket
    // Generic component state: the execution mode, the control-message layer
    // and the consensus (sync) layer, as handed to the `component::default_*`
    // helpers.
    exec_state: CompExecState,
    control: ControlLayer,
    consensus: Consensus,
}

impl Component for ComponentTcpClient {
    /// Accepts a data message handed over to this component and queues it in
    /// the same way as a data message received through `handle_message`.
    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) {
        self.handle_incoming_data_message(message);
    }

    /// Dispatches an incoming message by kind:
    ///
    /// - data messages are queued through `handle_incoming_data_message`,
    /// - sync messages are given to the consensus layer, whose decision is
    ///   then applied with the default sync-decision handler,
    /// - control messages are forwarded to the default control handler.
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
        match message {
            Message::Data(message) => {
                self.handle_incoming_data_message(message);
            },
            Message::Sync(message) => {
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
                component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
            },
            Message::Control(message) => {
                component::default_handle_control_message(
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
                    message, sched_ctx, comp_ctx
                );
            },
        }
    }

    /// Runs one scheduling step of the component, driven by the current
    /// execution mode, and returns how the scheduler should treat the
    /// component next.
    ///
    /// # Panics
    ///
    /// - In `BlockedGet`/`BlockedSelect` mode, which this component is never
    ///   expected to enter.
    /// - In `NonSync` mode with a connected socket: that path is not
    ///   implemented yet and is marked with `todo!`.
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
        sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?})", self.exec_state.mode));

        match self.exec_state.mode {
            CompMode::BlockedGet | CompMode::BlockedSelect => {
                // Impossible for this component: input values are always
                // accepted, and an explicit select is never performed.
                unreachable!();
            },
            CompMode::NonSync => {
                match &mut self.socket_state {
                    SocketState::Connected(_socket) => {
                        // The original code stopped mid-statement here, so the
                        // intended behaviour is unknown. Fail loudly rather
                        // than silently spin.
                        todo!("handle queued messages while the socket is connected");
                    },
                    SocketState::Error => {
                        // A broken socket cannot be recovered: begin exiting.
                        self.exec_state.mode = CompMode::StartExit;
                        return Ok(CompScheduling::Immediate);
                    },
                }
            },
            CompMode::Sync => {
                // Nothing is done in sync mode yet; control falls through to
                // the immediate reschedule below.
            },
            CompMode::SyncEnd | CompMode::BlockedPut =>
                return Ok(CompScheduling::Sleep),
            CompMode::StartExit =>
                return Ok(component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx)),
            CompMode::BusyExit =>
                return Ok(component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx)),
            CompMode::Exit =>
                return Ok(component::default_handle_exit(&self.exec_state)),
        }

        Ok(CompScheduling::Immediate)
    }
}

impl ComponentTcpClient {
    /// Constructs the component from its four PDL arguments, in order:
    ///
    /// 0. the IPv4 address as an array of four `u8` octets,
    /// 1. the TCP port as a `u16`,
    /// 2. the PDL input port,
    /// 3. the PDL output port.
    ///
    /// # Panics
    ///
    /// Error reporting is not implemented yet: an address that does not have
    /// exactly four elements, or a failure to create the socket, hits a
    /// `todo!`.
    pub(crate) fn new(arguments: ValueGroup) -> Self {
        use std::net::{IpAddr, Ipv4Addr};

        debug_assert_eq!(arguments.values.len(), 4);

        // The address argument refers to a region of the value group that
        // holds the individual octets.
        let ip_heap_pos = arguments.values[0].as_array();
        let ip_elements = &arguments.regions[ip_heap_pos as usize];
        if ip_elements.len() != 4 {
            todo!("friendly error reporting: ip contains 4 octects");
        }
        let ip_address = IpAddr::V4(Ipv4Addr::new(
            ip_elements[0].as_uint8(), ip_elements[1].as_uint8(),
            ip_elements[2].as_uint8(), ip_elements[3].as_uint8()
        ));

        let port = arguments.values[1].as_uint16();
        let input_port = component::port_id_from_eval(arguments.values[2].as_input());
        let output_port = component::port_id_from_eval(arguments.values[3].as_output());

        // A single match replaces the earlier "check for Err, then unwrap".
        let socket = match SocketTcpClient::new(ip_address, port) {
            Ok(socket) => socket,
            Err(error) => todo!("friendly error reporting: failed to open socket {:?}", error),
        };

        Self {
            socket_state: SocketState::Connected(socket),
            pending_recv: Vec::new(),
            pdl_input_port_id: input_port,
            pdl_output_port_id: output_port,
            exec_state: CompExecState::new(),
            control: ControlLayer::default(),
            consensus: Consensus::new(),
        }
    }

    /// Queues a data message that arrived from the PDL side (and is hence
    /// destined for the socket). The message content is expected to be an
    /// array of bytes (`u8`).
    fn handle_incoming_data_message(&mut self, message: DataMessage) {
        self.pending_recv.push(message);
    }

    /// Appends the byte payload of `message` to `bytes`.
    ///
    /// The message must target the PDL input port and carry exactly one
    /// value (both checked in debug builds only), and that value must be an
    /// array; its elements are read as `u8`.
    ///
    /// # Panics
    ///
    /// Panics if the single value is not an array.
    fn data_message_to_bytes(&self, message: DataMessage, bytes: &mut Vec<u8>) {
        debug_assert_eq!(message.data_header.target_port, self.pdl_input_port_id);
        debug_assert_eq!(message.content.values.len(), 1);

        if let Value::Array(array_pos) = message.content.values[0] {
            let region = &message.content.regions[array_pos as usize];
            // `extend` reserves from the iterator's size hint, so no separate
            // `reserve` call is needed.
            bytes.extend(region.iter().map(|value| value.as_uint8()));
        } else {
            unreachable!();
        }
    }
}