Files @ fb814548c7d5
Branch filter:

Location: CSY/reowolf/src/runtime2/component/component_internet.rs

fb814548c7d5 15.8 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
mh
Add tcp component to standard library
use crate::protocol::eval::{ValueGroup, Value, EvalError};
use crate::runtime2::*;
use crate::runtime2::component::{CompCtx, CompId};
use crate::runtime2::stdlib::internet::*;
use crate::runtime2::poll::*;

use super::component::{self, *};
use super::control_layer::*;
use super::consensus::*;

use std::io::ErrorKind as IoErrorKind;

/// State of the TCP socket owned by a `ComponentTcpClient`.
enum SocketState {
    /// A client socket is available; the payload is the socket itself.
    Connected(SocketTcpClient),
    /// No usable socket. `run` reacts to this state (in non-sync mode) by
    /// starting the component's exit sequence.
    Error,
}

impl SocketState {
    fn get_socket(&self) -> &SocketTcpClient {
        match self {
            SocketState::Connected(v) => v,
            SocketState::Error => unreachable!(),
        }
    }
}

/// States from the point of view of the component that is connecting to this
/// TCP component (i.e. from the point of view of attempting to interface with
/// a socket). `run` resets this to `AwaitingCmd` every time a new sync round
/// is started.
#[derive(PartialEq, Debug)]
enum SyncState {
    /// Waiting for the next command message from the PDL side.
    AwaitingCmd,
    /// A `Receive` command was accepted: bytes are being read from the socket.
    Getting,
    /// A `Send` command was accepted: the bytes copied out of the command are
    /// being written to the socket.
    Putting,
    /// Received bytes were handed to the output port; the next `run` ends the
    /// sync round.
    FinishSync,
}

/// Runtime component that exposes a TCP client socket to PDL code: commands
/// arrive on `pdl_input_port_id`, bytes read from the socket are sent out on
/// `pdl_output_port_id`.
pub struct ComponentTcpClient {
    // Properties for the tcp socket
    /// Socket (or the absence of one) this component operates on.
    socket_state: SocketState,
    /// Progress through the command handling of the current sync round.
    sync_state: SyncState,
    /// Ticket returned when the socket is registered for polling in
    /// `on_creation`; `None` before that.
    poll_ticket: Option<PollTicket>,
    /// Primary single-message slot; `adopt_message` fills it when it is empty.
    inbox_main: Option<DataMessage>,
    /// Messages that arrived while `inbox_main` was occupied. `run` pops its
    /// commands from this vector.
    inbox_backup: Vec<DataMessage>,
    pdl_input_port_id: PortId, // input from PDL, so transmitted over socket
    pdl_output_port_id: PortId, // output towards PDL, so received over socket
    /// Tag value of the `Send` variant of `std.internet.Cmd`; `-1` until
    /// `on_creation` resolves it.
    input_union_send_tag_value: i64,
    /// Tag value of the `Receive` variant of `std.internet.Cmd`; `-1` until
    /// `on_creation` resolves it.
    input_union_receive_tag_value: i64,
    /// Tag value of the `Finish` variant of `std.internet.Cmd`; `-1` until
    /// `on_creation` resolves it.
    input_union_finish_tag_value: i64,
    /// Tag value of the `Shutdown` variant of `std.internet.Cmd`; `-1` until
    /// `on_creation` resolves it.
    input_union_shutdown_tag_value: i64,
    // Generic component state
    /// Execution mode bookkeeping, handed to the shared `component::default_*`
    /// helpers.
    exec_state: CompExecState,
    /// Control-message state, handed to the shared `component::default_*`
    /// helpers.
    control: ControlLayer,
    /// Consensus state used to start and end sync rounds and to accept
    /// incoming data messages.
    consensus: Consensus,
    // Temporary variables
    /// Scratch buffer: holds the bytes of a `Send` command while they are
    /// written to the socket, and the bytes read from the socket while
    /// handling a `Receive` command.
    byte_buffer: Vec<u8>,
}

impl Component for ComponentTcpClient {
    fn on_creation(&mut self, id: CompId, sched_ctx: &SchedulerCtx) {
        // Retrieve type information for messages we're going to receive
        let pd = &sched_ctx.runtime.protocol;
        let cmd_type = pd.find_type(b"std.internet", b"Cmd")
            .expect("'Cmd' type in the 'std.internet' module");
        let cmd_type = cmd_type
            .as_union();

        self.input_union_send_tag_value = cmd_type.get_variant_tag_value(b"Send").unwrap();
        self.input_union_receive_tag_value = cmd_type.get_variant_tag_value(b"Receive").unwrap();
        self.input_union_finish_tag_value = cmd_type.get_variant_tag_value(b"Finish").unwrap();
        self.input_union_shutdown_tag_value = cmd_type.get_variant_tag_value(b"Shutdown").unwrap();

        // Register socket for async events
        if let SocketState::Connected(socket) = &self.socket_state {
            let self_handle = sched_ctx.runtime.get_component_public(id);
            let poll_ticket = sched_ctx.polling.register(socket, self_handle, true, true)
                .expect("registering tcp component");

            debug_assert!(self.poll_ticket.is_none());
            self.poll_ticket = Some(poll_ticket);
        }
    }

    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) {
        if self.inbox_main.is_none() {
            self.inbox_main = Some(message);
        } else {
            self.inbox_backup.push(message);
        }
    }

    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
        match message {
            Message::Data(message) => {
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
            },
            Message::Sync(message) => {
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
                component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
            },
            Message::Control(message) => {
                component::default_handle_control_message(
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
                    message, sched_ctx, comp_ctx
                );
            },
            Message::Poll => {
                sched_ctx.log("Received polling event");
            },
        }
    }

    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
        sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}", self.exec_state.mode));

        match self.exec_state.mode {
            CompMode::BlockedSelect => {
                // Not possible: we never enter this state
                unreachable!();
            },
            CompMode::NonSync => {
                // When in non-sync mode
                match &mut self.socket_state {
                    SocketState::Connected(_socket) => {
                        // Always move into the sync-state
                        self.sync_state = SyncState::AwaitingCmd;
                        self.consensus.notify_sync_start(comp_ctx);
                        self.exec_state.mode = CompMode::Sync;
                    },
                    SocketState::Error => {
                        // Could potentially send an error message to the
                        // connected component.
                        self.exec_state.mode = CompMode::StartExit;
                        return Ok(CompScheduling::Immediate);
                    }
                }
            },
            CompMode::Sync => {
                // When in sync mode: wait for a command to come in
                match self.sync_state {
                    SyncState::AwaitingCmd => {
                        if let Some(message) = self.inbox_backup.pop() {
                            if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, &message) {
                                // Check which command we're supposed to execute.
                                let (tag_value, embedded_heap_pos) = message.content.values[0].as_union();
                                if tag_value == self.input_union_send_tag_value {
                                    // Retrieve bytes from the message
                                    self.byte_buffer.clear();
                                    let union_content = &message.content.regions[embedded_heap_pos as usize];
                                    debug_assert_eq!(union_content.len(), 1);
                                    let array_heap_pos = union_content[0].as_array();
                                    let array_values = &message.content.regions[array_heap_pos as usize];
                                    self.byte_buffer.reserve(array_values.len());
                                    for value in array_values {
                                        self.byte_buffer.push(value.as_uint8());
                                    }

                                    self.sync_state = SyncState::Putting;
                                    return Ok(CompScheduling::Immediate);
                                } else if tag_value == self.input_union_receive_tag_value {
                                    // Component requires a `recv`
                                    self.sync_state = SyncState::Getting;
                                    return Ok(CompScheduling::Immediate);
                                } else if tag_value == self.input_union_finish_tag_value {
                                    // Component requires us to end the sync round
                                    let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
                                    component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
                                } else if tag_value == self.input_union_shutdown_tag_value {
                                    // Component wants to close the connection
                                    todo!("implement clean shutdown, don't forget to unregister to poll ticket");
                                }
                            } else {
                                todo!("handle sync failure due to message deadlock");
                                return Ok(CompScheduling::Sleep);
                            }
                        } else {
                            self.exec_state.set_as_blocked_get(self.pdl_input_port_id);
                            return Ok(CompScheduling::Sleep);
                        }
                    },
                    SyncState::Putting => {
                        // We're supposed to send a user-supplied message fully
                        // over the socket. But we might end up blocking. In
                        // that case the component goes to sleep until it is
                        // polled.
                        let socket = self.socket_state.get_socket();
                        while !self.byte_buffer.is_empty() {
                            match socket.send(&self.byte_buffer) {
                                Ok(bytes_sent) => {
                                    self.byte_buffer.drain(..bytes_sent);
                                },
                                Err(err) => {
                                    if err.kind() == IoErrorKind::WouldBlock {
                                        return Ok(CompScheduling::Sleep); // wait until notified
                                    } else {
                                        todo!("handle socket.send error {:?}", err)
                                    }
                                }
                            }
                        }

                        // If here then we're done putting the data, we can
                        // finish the sync round
                        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
                        component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
                    },
                    SyncState::Getting => {
                        // We're going to try and receive a single message. If
                        // this causes us to end up blocking the component
                        // goes to sleep until it is polled.
                        const BUFFER_SIZE: usize = 1024; // TODO: Move to config

                        let socket = self.socket_state.get_socket();
                        debug_assert!(self.byte_buffer.is_empty());
                        self.byte_buffer.resize(BUFFER_SIZE, 0);
                        match socket.receive(&mut self.byte_buffer) {
                            Ok(num_received) => {
                                self.byte_buffer.resize(num_received, 0);
                                let message_content = self.bytes_to_data_message_content(&self.byte_buffer);
                                let scheduling = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, message_content, sched_ctx, &mut self.consensus, comp_ctx);
                                self.sync_state = SyncState::FinishSync;
                                return Ok(scheduling);
                            },
                            Err(err) => {
                                if err.kind() == IoErrorKind::WouldBlock {
                                    return Ok(CompScheduling::Sleep); // wait until polled
                                } else {
                                    todo!("handle socket.receive error {:?}", err);
                                }
                            }
                        }
                    },
                    SyncState::FinishSync => {
                        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
                        self.exec_state.mode = CompMode::SyncEnd;
                        component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
                        return Ok(CompScheduling::Requeue);
                    }
                }
            },
            CompMode::BlockedGet => {
                // Entered when awaiting a new command
                debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd);
                return Ok(CompScheduling::Sleep);
            },
            CompMode::SyncEnd | CompMode::BlockedPut =>
                return Ok(CompScheduling::Sleep),
            CompMode::StartExit =>
                return Ok(component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx)),
            CompMode::BusyExit =>
                return Ok(component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx)),
            CompMode::Exit =>
                return Ok(component::default_handle_exit(&self.exec_state)),
        }

        return Ok(CompScheduling::Immediate);
    }
}

impl ComponentTcpClient {
    /// Builds a TCP client component from its PDL arguments.
    ///
    /// `arguments.values` is read as, in order: an array holding the four
    /// octets of an IPv4 address, the port number, the input port and the
    /// output port. A socket to that address is opened right away; the union
    /// tag values start out as `-1` and are filled in by `on_creation`.
    ///
    /// # Panics
    /// Panics through `todo!` when the address array does not contain exactly
    /// four elements or when the socket cannot be opened.
    pub(crate) fn new(arguments: ValueGroup) -> Self {
        use std::net::{IpAddr, Ipv4Addr};

        debug_assert_eq!(arguments.values.len(), 4);

        // Decode the IPv4 address from its array of octets
        let ip_region_index = arguments.values[0].as_array() as usize;
        let octets = match &arguments.regions[ip_region_index][..] {
            [a, b, c, d] => [a.as_uint8(), b.as_uint8(), c.as_uint8(), d.as_uint8()],
            _ => todo!("friendly error reporting: ip contains 4 octects"),
        };
        let ip_address = IpAddr::V4(Ipv4Addr::from(octets));

        // Remaining arguments: port number and the two PDL ports
        let port = arguments.values[1].as_uint16();
        let pdl_input_port_id = component::port_id_from_eval(arguments.values[2].as_input());
        let pdl_output_port_id = component::port_id_from_eval(arguments.values[3].as_output());

        let socket = match SocketTcpClient::new(ip_address, port) {
            Ok(socket) => socket,
            Err(err) => todo!("friendly error reporting: failed to open socket {:?}", err),
        };

        Self {
            socket_state: SocketState::Connected(socket),
            sync_state: SyncState::AwaitingCmd,
            poll_ticket: None,
            inbox_main: None,
            inbox_backup: Vec::new(),
            pdl_input_port_id,
            pdl_output_port_id,
            input_union_send_tag_value: -1,
            input_union_receive_tag_value: -1,
            input_union_finish_tag_value: -1,
            input_union_shutdown_tag_value: -1,
            exec_state: CompExecState::new(),
            control: ControlLayer::default(),
            consensus: Consensus::new(),
            byte_buffer: Vec::new(),
        }
    }

    // Handles incoming data from the PDL side (hence, going into the socket).
    // While in a sync block the consensus layer is informed first; the message
    // is then offered to the main inbox slot and falls back to the backup
    // inbox when that slot is occupied.
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
        if self.exec_state.mode.is_in_sync_block() {
            self.consensus.handle_incoming_data_message(comp_ctx, &message);
        }

        let placement = component::default_handle_incoming_data_message(
            &mut self.exec_state, &mut self.inbox_main, comp_ctx, message, sched_ctx, &mut self.control
        );
        if let IncomingData::SlotFull(overflow) = placement {
            self.inbox_backup.push(overflow);
        }
    }

    // Appends the bytes carried by `message` to `bytes`. The message must hold
    // a single array value; any other shape is considered unreachable.
    fn data_message_to_bytes(&self, message: DataMessage, bytes: &mut Vec<u8>) {
        debug_assert_eq!(message.data_header.target_port, self.pdl_input_port_id);
        debug_assert_eq!(message.content.values.len(), 1);

        match message.content.values[0] {
            Value::Array(array_pos) => {
                let region = &message.content.regions[array_pos as usize];
                bytes.extend(region.iter().map(|value| value.as_uint8()));
            },
            _ => unreachable!(),
        }
    }

    // Wraps `buffer` in a value group: one region containing a `UInt8` value
    // per byte, and a single array value referring to that region.
    fn bytes_to_data_message_content(&self, buffer: &[u8]) -> ValueGroup {
        let byte_values: Vec<_> = buffer.iter().map(|&byte| Value::UInt8(byte)).collect();

        let mut group = ValueGroup::default();
        group.regions.push(byte_values);
        group.values.push(Value::Array(0));
        group
    }
}