Changeset - 179c81cc33cb
[Not reviewed]
MH - 3 years ago 2022-05-15 15:57:50
contact@maxhenger.nl
Implement error handling for socket components
1 file changed with 68 insertions and 30 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component_internet.rs
Show inline comments
 
use crate::protocol::eval::{ValueGroup, Value};
 
use crate::runtime2::*;
 
use crate::runtime2::component::{CompCtx, CompId, PortInstruction};
 
use crate::runtime2::stdlib::internet::*;
 
use crate::runtime2::poll::*;
 

	
 
use super::component::{self, *};
 
use super::control_layer::*;
 
use super::consensus::*;
 

	
 

	
 
use std::io::ErrorKind as IoErrorKind;
 
use std::net::{IpAddr, Ipv4Addr};
 
use crate::protocol::{ProcedureDefinitionId, TypeId};
 

	
 
// -----------------------------------------------------------------------------
 
// ComponentTcpClient
 
// -----------------------------------------------------------------------------
 

	
 
enum ClientSocketState {
 
    Connected(SocketTcpClient),
 
    ErrorReported(String),
 
    Error,
 
}
 

	
 
impl ClientSocketState {
 
    fn get_socket(&self) -> &SocketTcpClient {
 
        match self {
 
            ClientSocketState::Connected(v) => v,
 
            ClientSocketState::Error => unreachable!(),
 
            ClientSocketState::ErrorReported(_) | ClientSocketState::Error => unreachable!(),
 
        }
 
    }
 
}
 

	
 
/// States from the point of view of the component that is connecting to this
 
/// TCP component (i.e. from the point of view of attempting to interface with
 
/// a socket).
 
#[derive(PartialEq, Debug)]
 
enum ClientSyncState {
 
    AwaitingCmd,
 
    Getting,
 
    Putting,
 
    FinishSync,
 
    FinishSyncThenQuit,
 
}
 

	
 
pub struct ComponentTcpClient {
 
    // Properties for the tcp socket
 
    socket_state: ClientSocketState,
 
    sync_state: ClientSyncState,
 
    poll_ticket: Option<PollTicket>,
 
    inbox_main: InboxMain,
 
    inbox_backup: InboxBackup,
 
    pdl_input_port_id: PortId, // input from PDL, so transmitted over socket
 
@@ -103,87 +104,92 @@ impl Component for ComponentTcpClient {
 
            *slot = Some(message);
 
        } else {
 
            self.inbox_backup.push(message);
 
        }
 
    }
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Sync(message) => {
 
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
            },
 
            Message::Control(message) => {
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup
 
                ) {
 
                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Poll => {
 
                sched_ctx.info("Received polling event");
 
                sched_ctx.debug("Received polling event");
 
            },
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        sched_ctx.info(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedSelect |
 
            CompMode::PutPortsBlockedTransferredPorts |
 
            CompMode::PutPortsBlockedAwaitingAcks |
 
            CompMode::PutPortsBlockedSendingPort |
 
            CompMode::NewComponentBlocked => {
 
                // Not possible: we never enter this state
 
                unreachable!();
 
            },
 
            CompMode::NonSync => {
 
                // When in non-sync mode
 
                match &self.socket_state {
 
                    ClientSocketState::Connected(_socket) => {
 
                        if self.sync_state == ClientSyncState::FinishSyncThenQuit {
 
                            // Previous request was to let the component shut down
 
                            self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                        } else {
 
                            // Reset for a new request
 
                            self.sync_state = ClientSyncState::AwaitingCmd;
 
                            component::default_handle_sync_start(
 
                                &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus
 
                            );
 
                        }
 
                        return CompScheduling::Immediate;
 
                    },
 
                    ClientSocketState::Error => {
 
                        // Could potentially send an error message to the
 
                        // connected component.
 
                        self.exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
 
                    ClientSocketState::ErrorReported(message) => {
 
                        component::default_handle_error_for_builtin(
 
                            &mut self.exec_state, sched_ctx,
 
                            (PortInstruction::NoSource, format!("failed socket creation, reason: {}", message))
 
                        );
 
                        self.socket_state = ClientSocketState::Error;
 
                        return CompScheduling::Immediate;
 
                    }
 
                    ClientSocketState::Error => {
 
                        return CompScheduling::Sleep;
 
                    }
 
                }
 
            },
 
            CompMode::Sync => {
 
                // When in sync mode: wait for a command to come in
 
                match self.sync_state {
 
                    ClientSyncState::AwaitingCmd => {
 
                        match component::default_attempt_get(
 
                            &mut self.exec_state, self.pdl_input_port_id, PortInstruction::NoSource,
 
                            &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx,
 
                            &mut self.control, &mut self.consensus
 
                        ) {
 
                            GetResult::Received(message) => {
 
                                let (tag_value, embedded_heap_pos) = message.content.values[0].as_union();
 
                                if tag_value == self.input_union_send_tag_value {
 
                                    // Retrieve bytes from the message
 
                                    self.byte_buffer.clear();
 
                                    let union_content = &message.content.regions[embedded_heap_pos as usize];
 
                                    debug_assert_eq!(union_content.len(), 1);
 
                                    let array_heap_pos = union_content[0].as_array();
 
                                    let array_values = &message.content.regions[array_heap_pos as usize];
 
                                    self.byte_buffer.reserve(array_values.len());
 
                                    for value in array_values {
 
                                        self.byte_buffer.push(value.as_uint8());
 
                                    }
 
@@ -207,149 +213,164 @@ impl Component for ComponentTcpClient {
 
                            GetResult::NoMessage => {
 
                                return CompScheduling::Sleep;
 
                            },
 
                            GetResult::Error(location_and_message) => {
 
                                component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                                return CompScheduling::Immediate;
 
                            }
 
                        }
 
                    },
 
                    ClientSyncState::Putting => {
 
                        // We're supposed to send a user-supplied message fully
 
                        // over the socket. But we might end up blocking. In
 
                        // that case the component goes to sleep until it is
 
                        // polled.
 
                        let socket = self.socket_state.get_socket();
 
                        while !self.byte_buffer.is_empty() {
 
                            match socket.send(&self.byte_buffer) {
 
                                Ok(bytes_sent) => {
 
                                    self.byte_buffer.drain(..bytes_sent);
 
                                },
 
                                Err(err) => {
 
                                    if err.kind() == IoErrorKind::WouldBlock {
 
                                        return CompScheduling::Sleep; // wait until notified
 
                                    } else {
 
                                        todo!("handle socket.send error {:?}", err)
 
                                        component::default_handle_error_for_builtin(
 
                                            &mut self.exec_state, sched_ctx,
 
                                            (PortInstruction::NoSource, format!("failed sending on socket, reason: {}", err))
 
                                        );
 
                                        return CompScheduling::Immediate;
 
                                    }
 
                                }
 
                            }
 
                        }
 

	
 
                        // If here then we're done putting the data, we can
 
                        // finish the sync round
 
                        component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 
                        return CompScheduling::Requeue;
 
                    },
 
                    ClientSyncState::Getting => {
 
                        // We're going to try and receive a single message. If
 
                        // this causes us to end up blocking the component
 
                        // goes to sleep until it is polled.
 
                        const BUFFER_SIZE: usize = 1024; // TODO: Move to config
 

	
 
                        let socket = self.socket_state.get_socket();
 
                        self.byte_buffer.resize(BUFFER_SIZE, 0);
 
                        match socket.receive(&mut self.byte_buffer) {
 
                            Ok(num_received) => {
 
                                self.byte_buffer.resize(num_received, 0);
 
                                let message_content = self.bytes_to_data_message_content(&self.byte_buffer);
 
                                let send_result = component::default_send_data_message(
 
                                    &mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource,
 
                                    message_content, sched_ctx, &mut self.consensus, &mut self.control, comp_ctx
 
                                );
 

	
 
                                if let Err(location_and_message) = send_result {
 
                                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                                    return CompScheduling::Immediate;
 
                                } else {
 
                                    let scheduling = send_result.unwrap();
 
                                    self.sync_state = ClientSyncState::AwaitingCmd;
 
                                    return scheduling;
 
                                }
 
                            },
 
                            Err(err) => {
 
                                if err.kind() == IoErrorKind::WouldBlock {
 
                                    return CompScheduling::Sleep; // wait until polled
 
                                } else {
 
                                    todo!("handle socket.receive error {:?}", err)
 
                                    component::default_handle_error_for_builtin(
 
                                        &mut self.exec_state, sched_ctx,
 
                                        (PortInstruction::NoSource, format!("failed receiving from socket, reason: {}", err))
 
                                    );
 
                                    return CompScheduling::Immediate;
 
                                }
 
                            }
 
                        }
 
                    },
 
                    ClientSyncState::FinishSync | ClientSyncState::FinishSyncThenQuit => {
 
                        component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 
                        return CompScheduling::Requeue;
 
                    },
 
                }
 
            },
 
            CompMode::BlockedGet => {
 
                // Entered when awaiting a new command
 
                debug_assert_eq!(self.sync_state, ClientSyncState::AwaitingCmd);
 
                return CompScheduling::Sleep;
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedPut =>
 
                return CompScheduling::Sleep,
 
            CompMode::StartExit =>
 
                return component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus),
 
            CompMode::BusyExit =>
 
                return component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx),
 
            CompMode::Exit =>
 
                return component::default_handle_exit(&self.exec_state),
 
        }
 
    }
 
}
 

	
 
impl ComponentTcpClient {
 
    pub(crate) fn new(arguments: ValueGroup) -> Self {
 
        fn client_socket_state_from_result(result: Result<SocketTcpClient, SocketError>) -> ClientSocketState {
 
            match result {
 
                Ok(socket) => ClientSocketState::Connected(socket),
 
                Err(error) => ClientSocketState::ErrorReported(format!("Failed to create socket, reason: {:?}", error)),
 
            }
 
        }
 

	
 
        // Two possible cases here: if the number of arguments is 3, then we
 
        // get: (socket_handle, input_port, output_port). If the number of
 
        // arguments is 4, then we get: (ip, port, input_port, output_port).
 
        assert!(arguments.values.len() == 3 || arguments.values.len() == 4);
 

	
 
        // Parsing arguments
 
        let (socket, input_port, output_port) = if arguments.values.len() == 3 {
 
        let (socket_state, input_port, output_port) = if arguments.values.len() == 3 {
 
            let socket_handle = arguments.values[0].as_sint32();
 
            let socket = SocketTcpClient::new_from_handle(socket_handle);
 
            let socket_state = client_socket_state_from_result(socket);
 

	
 
            let input_port = component::port_id_from_eval(arguments.values[1].as_input());
 
            let output_port = component::port_id_from_eval(arguments.values[2].as_output());
 

	
 
            (socket, input_port, output_port)
 
            (socket_state, input_port, output_port)
 
        } else {
 
            let (ip_address, port) = ip_addr_and_port_from_args(&arguments, 0, 1);
 
            let socket = SocketTcpClient::new(ip_address, port);
 

	
 
            let input_port = component::port_id_from_eval(arguments.values[2].as_input());
 
            let output_port = component::port_id_from_eval(arguments.values[3].as_output());
 

	
 
            (socket, input_port, output_port)
 
            let ip_and_port = ip_addr_and_port_from_args(&arguments, 0, 1);
 
            let socket_state = match ip_and_port {
 
                Ok((ip_address, port)) => client_socket_state_from_result(SocketTcpClient::new(ip_address, port)),
 
                Err(message) => ClientSocketState::ErrorReported(message),
 
            };
 

	
 
        if let Err(socket) = socket {
 
            todo!("friendly error reporting: failed to open socket (reason: {:?})", socket);
 
        }
 
            (socket_state, input_port, output_port)
 
        };
 

	
 
        return Self{
 
            socket_state: ClientSocketState::Connected(socket.unwrap()),
 
            socket_state,
 
            sync_state: ClientSyncState::AwaitingCmd,
 
            poll_ticket: None,
 
            inbox_main: vec![None, None],
 
            inbox_backup: Vec::new(),
 
            input_union_send_tag_value: -1,
 
            input_union_receive_tag_value: -1,
 
            input_union_finish_tag_value: -1,
 
            input_union_shutdown_tag_value: -1,
 
            pdl_input_port_id: input_port,
 
            pdl_output_port_id: output_port,
 
            exec_state: CompExecState::new(),
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
            byte_buffer: Vec::new(),
 
        }
 
    }
 

	
 
    pub(crate) fn new_with_existing_connection(socket: SocketTcpClient, input_port: PortId, output_port: PortId) -> Self {
 
        return Self{
 
            socket_state: ClientSocketState::Connected(socket),
 
            sync_state: ClientSyncState::AwaitingCmd,
 
            poll_ticket: None,
 
            inbox_main: vec![None, None],
 
            inbox_backup: Vec::new(),
 
@@ -398,56 +419,57 @@ impl ComponentTcpClient {
 
    }
 

	
 
    fn bytes_to_data_message_content(&self, buffer: &[u8]) -> ValueGroup {
 
        // Turn bytes into silly executor-style array
 
        let mut values = Vec::with_capacity(buffer.len());
 
        for byte in buffer.iter().copied() {
 
            values.push(Value::UInt8(byte));
 
        }
 

	
 
        // Put in a value group
 
        let mut value_group = ValueGroup::default();
 
        value_group.regions.push(values);
 
        value_group.values.push(Value::Array(0));
 

	
 
        return value_group;
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ComponentTcpListener
 
// -----------------------------------------------------------------------------
 

	
 
enum ListenerSocketState {
 
    Connected(SocketTcpListener),
 
    ErrorReported(String),
 
    Error,
 
}
 

	
 
impl ListenerSocketState {
 
    fn get_socket(&self) -> &SocketTcpListener {
 
        match self {
 
            ListenerSocketState::Connected(v) => return v,
 
            ListenerSocketState::Error => unreachable!(),
 
            ListenerSocketState::ErrorReported(_) | ListenerSocketState::Error => unreachable!(),
 
        }
 
    }
 
}
 

	
 
struct PendingComponent {
 
    client: i32, // OS socket handle
 
    cmd_rx: PortId,
 
    data_tx: PortId,
 
}
 

	
 
enum ListenerSyncState {
 
    AwaitingCmd,
 
    AcceptCommandReceived, // just received `Accept` command
 
    AcceptChannelGenerated, // created channel, waiting to end the sync round
 
    AcceptGenerateComponent, // sync ended, back in non-sync, now generate component
 
    FinishSyncThenQuit,
 
}
 

	
 
pub struct ComponentTcpListener {
 
    // Properties for the tcp socket
 
    socket_state: ListenerSocketState,
 
    sync_state: ListenerSyncState,
 
    pending_component: Option<PendingComponent>,
 
    poll_ticket: Option<PollTicket>,
 
@@ -508,49 +530,49 @@ impl Component for ComponentTcpListener {
 
    }
 

	
 
    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) {
 
        unreachable!();
 
    }
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Sync(message) => {
 
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
            },
 
            Message::Control(message) => {
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup
 
                ) {
 
                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Poll => {
 
                sched_ctx.info("Received polling event");
 
                sched_ctx.debug("Received polling event");
 
            },
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        sched_ctx.info(&format!("Running component ComponentTcpListener (mode: {:?})", self.exec_state.mode));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedSelect
 
                => unreachable!(),
 
            CompMode::PutPortsBlockedTransferredPorts |
 
            CompMode::PutPortsBlockedAwaitingAcks |
 
            CompMode::PutPortsBlockedSendingPort |
 
            CompMode::NewComponentBlocked
 
                => return CompScheduling::Sleep,
 
            CompMode::NonSync => {
 
                match &self.socket_state {
 
                    ListenerSocketState::Connected(_socket) => {
 
                        match self.sync_state {
 
                            ListenerSyncState::AwaitingCmd => {
 
                                component::default_handle_sync_start(
 
                                    &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus
 
                                );
 
                            },
 
@@ -560,52 +582,59 @@ impl Component for ComponentTcpListener {
 
                                // Now that we're outside the sync round, create the tcp client
 
                                // component
 
                                let pending = self.pending_component.take().unwrap();
 

	
 
                                let arguments = ValueGroup::new_stack(vec![
 
                                    Value::SInt32(pending.client),
 
                                    Value::Input(port_id_to_eval(pending.cmd_rx)),
 
                                    Value::Output(port_id_to_eval(pending.data_tx)),
 
                                ]);
 
                                component::default_start_create_component(
 
                                    &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control,
 
                                    &mut self.inbox_main, &mut self.inbox_backup,
 
                                    self.tcp_client_definition.0, self.tcp_client_definition.1,
 
                                    arguments
 
                                );
 
                                self.sync_state = ListenerSyncState::AwaitingCmd;
 
                            },
 
                            ListenerSyncState::FinishSyncThenQuit => {
 
                                self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                            },
 
                        }
 

	
 
                        return CompScheduling::Immediate;
 
                    },
 
                    ListenerSocketState::Error => {
 
                        self.exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
 
                    ListenerSocketState::ErrorReported(message) => {
 
                        component::default_handle_error_for_builtin(
 
                            &mut self.exec_state, sched_ctx,
 
                            (PortInstruction::NoSource, message.clone())
 
                        );
 
                        self.socket_state = ListenerSocketState::Error;
 
                        return CompScheduling::Immediate;
 
                    }
 
                    ListenerSocketState::Error => {
 
                        return CompScheduling::Sleep;
 
                    }
 
                }
 
            },
 
            CompMode::Sync => {
 
                match self.sync_state {
 
                    ListenerSyncState::AwaitingCmd => {
 
                        match component::default_attempt_get(
 
                            &mut self.exec_state, self.pdl_input_port_id, PortInstruction::NoSource,
 
                            &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx,
 
                            &mut self.control, &mut self.consensus
 
                        ) {
 
                            GetResult::Received(message) => {
 
                                let (tag_value, _) = message.content.values[0].as_union();
 
                                if tag_value == self.input_union_accept_tag {
 
                                    self.sync_state = ListenerSyncState::AcceptCommandReceived;
 
                                } else if tag_value == self.input_union_shutdown_tag {
 
                                    self.sync_state = ListenerSyncState::FinishSyncThenQuit;
 
                                } else {
 
                                    unreachable!("got tag_value {}", tag_value);
 
                                }
 

	
 
                                return CompScheduling::Immediate;
 
                            },
 
                            GetResult::NoMessage => {
 
                                return CompScheduling::Sleep;
 
@@ -647,130 +676,139 @@ impl Component for ComponentTcpListener {
 
                                    &mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, values,
 
                                    sched_ctx, &mut self.consensus, &mut self.control, comp_ctx
 
                                ) {
 
                                    component::default_handle_error_for_builtin(
 
                                        &mut self.exec_state, sched_ctx, location_and_message
 
                                    );
 
                                }
 

	
 
                                // Prepare for finishing the consensus round, and once finished,
 
                                // create the tcp client component
 
                                self.sync_state = ListenerSyncState::AcceptChannelGenerated;
 
                                debug_assert!(self.pending_component.is_none());
 
                                self.pending_component = Some(PendingComponent{
 
                                    client: client_handle,
 
                                    cmd_rx: cmd_channel.getter_id,
 
                                    data_tx: data_channel.putter_id
 
                                });
 

	
 
                                return CompScheduling::Requeue;
 
                            },
 
                            Err(err) => {
 
                                if err.kind() == IoErrorKind::WouldBlock {
 
                                    return CompScheduling::Sleep;
 
                                } else {
 
                                    todo!("handle listener.accept error {:?}", err)
 
                                    component::default_handle_error_for_builtin(
 
                                        &mut self.exec_state, sched_ctx,
 
                                        (PortInstruction::NoSource, format!("failed to listen on socket, reason: {}", err))
 
                                    );
 
                                    return CompScheduling::Immediate;
 
                                }
 
                            }
 
                        }
 
                    },
 
                    ListenerSyncState::AcceptChannelGenerated => {
 
                        component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 
                        self.sync_state = ListenerSyncState::AcceptGenerateComponent;
 
                        return CompScheduling::Requeue;
 
                    }
 
                    ListenerSyncState::FinishSyncThenQuit => {
 
                        component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 
                        return CompScheduling::Requeue;
 
                    },
 
                    ListenerSyncState::AcceptGenerateComponent => unreachable!(),
 
                }
 
            },
 
            CompMode::BlockedGet => {
 
                return CompScheduling::Sleep;
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedPut
 
                => return CompScheduling::Sleep,
 
            CompMode::StartExit =>
 
                return component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus),
 
            CompMode::BusyExit =>
 
                return component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx),
 
            CompMode::Exit =>
 
                return component::default_handle_exit(&self.exec_state),
 
        }
 
    }
 
}
 

	
 
impl ComponentTcpListener {
 
    pub(crate) fn new(arguments: ValueGroup) -> Self {
 
        debug_assert_eq!(arguments.values.len(), 4);
 

	
 
        // Parsing arguments
 
        let (ip_address, port) = ip_addr_and_port_from_args(&arguments, 0, 1);
 
        let input_port = component::port_id_from_eval(arguments.values[2].as_input());
 
        let output_port = component::port_id_from_eval(arguments.values[3].as_output());
 

	
 
        let socket_state = match ip_addr_and_port_from_args(&arguments, 0, 1) {
 
            Ok((ip_address, port)) => {
 
                let socket = SocketTcpListener::new(ip_address, port);
 
        if let Err(socket) = socket {
 
            todo!("friendly error reporting: failed to open socket (reason: {:?})", socket);
 
                match socket {
 
                    Ok(socket) => ListenerSocketState::Connected(socket),
 
                    Err(err) => ListenerSocketState::ErrorReported(format!("failed to create listener socket, reason: {:?}", err), )
 
                }
 
            },
 
            Err(message) => ListenerSocketState::ErrorReported(message),
 
        };
 

	
 
        return Self {
 
            socket_state: ListenerSocketState::Connected(socket.unwrap()),
 
            socket_state,
 
            sync_state: ListenerSyncState::AwaitingCmd,
 
            pending_component: None,
 
            poll_ticket: None,
 
            inbox_main: vec![None, None],
 
            inbox_backup: InboxBackup::new(),
 
            pdl_input_port_id: input_port,
 
            pdl_output_port_id: output_port,
 
            tcp_client_definition: (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()),
 
            input_union_accept_tag: -1,
 
            input_union_shutdown_tag: -1,
 
            output_struct_tx_index: 0,
 
            output_struct_rx_index: 0,
 
            exec_state: CompExecState::new(),
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
        }
 
    }
 

	
 
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        if self.exec_state.mode.is_in_sync_block() {
 
            self.consensus.handle_incoming_data_message(comp_ctx, &message);
 
        }
 

	
 
        match component::default_handle_incoming_data_message(
 
            &mut self.exec_state, &mut self.inbox_main, comp_ctx, message, sched_ctx, &mut self.control
 
        ) {
 
            IncomingData::PlacedInSlot => {},
 
            IncomingData::SlotFull(message) => {
 
                self.inbox_backup.push(message);
 
            }
 
        }
 
    }
 
}
 

	
 
fn ip_addr_and_port_from_args(
 
    arguments: &ValueGroup, ip_index: usize, port_index: usize
 
) -> (IpAddr, u16) {
 
) -> Result<(IpAddr, u16), String> {
 
    debug_assert!(ip_index < arguments.values.len());
 
    debug_assert!(port_index < arguments.values.len());
 

	
 
    // Parsing IP address
 
    let ip_heap_pos = arguments.values[0].as_array();
 
    let ip_elements = &arguments.regions[ip_heap_pos as usize];
 

	
 
    let ip_address = match ip_elements.len() {
 
        0 => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
 
        4 => IpAddr::V4(Ipv4Addr::new(
 
            ip_elements[0].as_uint8(), ip_elements[1].as_uint8(),
 
            ip_elements[2].as_uint8(), ip_elements[3].as_uint8()
 
        )),
 
        _ => todo!("friendly error reporting: ip should contain 4 octets (or 0 for unspecified)")
 
        _ => return Err(format!("Expected 0 or 4 elements in the IP address, got {}", ip_elements.len())),
 
    };
 

	
 
    let port = arguments.values[port_index].as_uint16();
 

	
 
    return (ip_address, port);
 
    return Ok((ip_address, port));
 
}
 

	
0 comments (0 inline, 0 general)