Files @ c62d6f0cc48a
Branch filter:

Location: CSY/reowolf/src/runtime2/component/component_internet.rs - annotation

c62d6f0cc48a 5.5 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
mh
WIP on implementing (figuring out) tcp component
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
c62d6f0cc48a
use crate::protocol::eval::{ValueGroup, Value, EvalError};
use crate::runtime2::*;
use crate::runtime2::component::CompCtx;
use crate::runtime2::stdlib::internet::*;

use super::component::{self, *};
use super::control_layer::*;
use super::consensus::*;

/// Connection state of the TCP socket owned by `ComponentTcpClient`.
enum SocketState {
    /// The socket was opened successfully; carries the client handle.
    Connected(SocketTcpClient),
    /// The socket is unusable. When `run` observes this state in non-sync
    /// mode it moves the component into its exit sequence.
    Error,
}

/// Two-valued marker for what the component is doing during a sync round.
///
/// NOTE(review): nothing visible in this file reads or writes this type yet.
/// Presumably it distinguishes "receiving" from "sending" — confirm once the
/// sync handling is implemented.
enum SyncState {
    /// Marks the "get" side.
    Getting,
    /// Marks the "put" side.
    Putting,
}

/// Runtime component that couples a pair of PDL ports to a TCP client socket.
pub struct ComponentTcpClient {
    // Properties for the tcp socket
    /// Current state of the socket (connected, or in error).
    socket_state: SocketState,
    /// Data messages that arrived from the PDL side and have been queued
    /// (see `handle_incoming_data_message`).
    pending_recv: Vec<DataMessage>, // on the input port
    /// Port on which this component receives data from PDL code.
    pdl_input_port_id: PortId, // input from PDL, so transmitted over socket
    /// Port on which this component hands data to PDL code.
    pdl_output_port_id: PortId, // output towards PDL, so received over socket
    // Generic component state
    /// Execution mode bookkeeping; `run` dispatches on `exec_state.mode`.
    exec_state: CompExecState,
    /// Control-message state, passed to the default control/exit handlers.
    control: ControlLayer,
    /// Consensus state; receives the sync messages of this component.
    consensus: Consensus,
}

impl Component for ComponentTcpClient {
    /// Takes over a data message handed to this component. It is queued in
    /// exactly the same way as a data message arriving through
    /// `handle_message`.
    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) {
        self.handle_incoming_data_message(message);
    }

    /// Dispatches an incoming message by kind: data messages are queued, sync
    /// messages are fed to the consensus algorithm (and any resulting
    /// decision is applied), control messages go to the default handler.
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
        match message {
            Message::Data(message) => {
                self.handle_incoming_data_message(message);
            },
            Message::Sync(message) => {
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
                component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
            },
            Message::Control(message) => {
                component::default_handle_control_message(
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
                    message, sched_ctx, comp_ctx
                );
            }
        }
    }

    /// Performs one scheduling step, depending on the current execution mode,
    /// and returns how the scheduler should treat the component next.
    ///
    /// # Panics
    ///
    /// - In the `BlockedGet`/`BlockedSelect` modes, which this component is
    ///   never expected to enter.
    /// - In `NonSync` mode with a connected socket: that path is not
    ///   implemented yet.
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
        sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?})", self.exec_state.mode));

        match self.exec_state.mode {
            CompMode::BlockedGet | CompMode::BlockedSelect => {
                // impossible for this component. We always accept the input
                // values, and we never perform an explicit select.
                unreachable!();
            },
            CompMode::NonSync => {
                // When in non-sync mode
                match &mut self.socket_state {
                    SocketState::Connected(_socket) => {
                        // WIP: what to do with a connected socket outside of
                        // a sync round has not been worked out yet. Fail
                        // loudly rather than silently doing nothing.
                        todo!("tcp client: non-sync handling of a connected socket");
                    },
                    SocketState::Error => {
                        // A broken socket cannot be recovered from here:
                        // begin shutting the component down.
                        self.exec_state.mode = CompMode::StartExit;
                        return Ok(CompScheduling::Immediate);
                    }
                }
            },
            CompMode::Sync => {
                // NOTE(review): nothing is implemented for sync mode yet, so
                // this falls through and asks to be rescheduled immediately.
            },
            CompMode::SyncEnd | CompMode::BlockedPut =>
                return Ok(CompScheduling::Sleep),
            CompMode::StartExit =>
                return Ok(component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx)),
            CompMode::BusyExit =>
                return Ok(component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx)),
            CompMode::Exit =>
                return Ok(component::default_handle_exit(&self.exec_state)),
        }

        return Ok(CompScheduling::Immediate);
    }
}

impl ComponentTcpClient {
    /// Builds a TCP client component from its four PDL arguments and opens
    /// the socket.
    ///
    /// `arguments.values` is read as:
    /// - `[0]`: array holding the four IPv4 octets (elements live in
    ///   `arguments.regions`),
    /// - `[1]`: the TCP port (`u16`),
    /// - `[2]`: the PDL input port,
    /// - `[3]`: the PDL output port.
    ///
    /// # Panics
    ///
    /// Error reporting is not implemented yet: an IP array that does not have
    /// exactly four elements, or a socket that fails to open, ends in `todo!`.
    pub(crate) fn new(arguments: ValueGroup) -> Self {
        use std::net::{IpAddr, Ipv4Addr};

        debug_assert_eq!(arguments.values.len(), 4);

        // Parsing arguments
        let ip_heap_pos = arguments.values[0].as_array();
        let ip_elements = &arguments.regions[ip_heap_pos as usize];
        if ip_elements.len() != 4 {
            todo!("friendly error reporting: ip contains 4 octects");
        }
        let ip_address = IpAddr::V4(Ipv4Addr::new(
            ip_elements[0].as_uint8(), ip_elements[1].as_uint8(),
            ip_elements[2].as_uint8(), ip_elements[3].as_uint8()
        ));

        let port = arguments.values[1].as_uint16();
        let input_port = component::port_id_from_eval(arguments.values[2].as_input());
        let output_port = component::port_id_from_eval(arguments.values[3].as_output());

        // Open the socket; a single `match` replaces the former
        // check-then-`unwrap` pair.
        let socket = match SocketTcpClient::new(ip_address, port) {
            Ok(socket) => socket,
            Err(error) => todo!("friendly error reporting: failed to open socket {:?}", error),
        };

        return Self{
            socket_state: SocketState::Connected(socket),
            // Field name must match the struct declaration (`pending_recv`).
            pending_recv: Vec::new(),
            pdl_input_port_id: input_port,
            pdl_output_port_id: output_port,
            exec_state: CompExecState::new(),
            control: ControlLayer::default(),
            consensus: Consensus::new(),
        };
    }

    /// Queues a data message that arrived from the PDL side (hence, one that
    /// is going into the socket). The message is stored as-is.
    fn handle_incoming_data_message(&mut self, message: DataMessage) {
        // Input message is an array of bytes (u8)
        self.pending_recv.push(message);
    }

    /// Appends the payload of `message` to `bytes`.
    ///
    /// The message is expected to carry exactly one value, an array whose
    /// elements are each read as a `u8`. Existing contents of `bytes` are
    /// kept; the new bytes are added at the end.
    ///
    /// # Panics
    ///
    /// Panics if the single value is not an array. In debug builds it also
    /// panics if the message does not target the PDL input port or does not
    /// hold exactly one value.
    fn data_message_to_bytes(&self, message: DataMessage, bytes: &mut Vec<u8>) {
        debug_assert_eq!(message.data_header.target_port, self.pdl_input_port_id);
        debug_assert_eq!(message.content.values.len(), 1);

        if let Value::Array(array_pos) = message.content.values[0] {
            let region = &message.content.regions[array_pos as usize];
            // `extend` reserves from the iterator's size hint, so no separate
            // `reserve` call is needed.
            bytes.extend(region.iter().map(|value| value.as_uint8()));
        } else {
            unreachable!();
        }
    }
}