Changeset - 179c81cc33cb
[Not reviewed]
MH - 3 years ago 2022-05-15 15:57:50
contact@maxhenger.nl
Implement error handling for socket components
1 file changed with 71 insertions and 33 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component_internet.rs
Show inline comments
 
@@ -16,20 +16,21 @@ use crate::protocol::{ProcedureDefinitionId, TypeId};
 
// -----------------------------------------------------------------------------
 
// ComponentTcpClient
 
// -----------------------------------------------------------------------------
 

	
 
enum ClientSocketState {
 
    Connected(SocketTcpClient),
 
    ErrorReported(String),
 
    Error,
 
}
 

	
 
impl ClientSocketState {
 
    fn get_socket(&self) -> &SocketTcpClient {
 
        match self {
 
            ClientSocketState::Connected(v) => v,
 
            ClientSocketState::Error => unreachable!(),
 
            ClientSocketState::ErrorReported(_) | ClientSocketState::Error => unreachable!(),
 
        }
 
    }
 
}
 

	
 
/// States from the point of view of the component that is connecting to this
 
/// TCP component (i.e. from the point of view of attempting to interface with
 
@@ -121,13 +122,13 @@ impl Component for ComponentTcpClient {
 
                    message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup
 
                ) {
 
                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Poll => {
 
                sched_ctx.info("Received polling event");
 
                sched_ctx.debug("Received polling event");
 
            },
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        sched_ctx.info(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state));
 
@@ -154,18 +155,23 @@ impl Component for ComponentTcpClient {
 
                            component::default_handle_sync_start(
 
                                &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus
 
                            );
 
                        }
 
                        return CompScheduling::Immediate;
 
                    },
 
                    ClientSocketState::Error => {
 
                        // Could potentially send an error message to the
 
                        // connected component.
 
                        self.exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
 
                    ClientSocketState::ErrorReported(message) => {
 
                        component::default_handle_error_for_builtin(
 
                            &mut self.exec_state, sched_ctx,
 
                            (PortInstruction::NoSource, format!("failed socket creation, reason: {}", message))
 
                        );
 
                        self.socket_state = ClientSocketState::Error;
 
                        return CompScheduling::Immediate;
 
                    }
 
                    ClientSocketState::Error => {
 
                        return CompScheduling::Sleep;
 
                    }
 
                }
 
            },
 
            CompMode::Sync => {
 
                // When in sync mode: wait for a command to come in
 
                match self.sync_state {
 
                    ClientSyncState::AwaitingCmd => {
 
@@ -225,13 +231,17 @@ impl Component for ComponentTcpClient {
 
                                    self.byte_buffer.drain(..bytes_sent);
 
                                },
 
                                Err(err) => {
 
                                    if err.kind() == IoErrorKind::WouldBlock {
 
                                        return CompScheduling::Sleep; // wait until notified
 
                                    } else {
 
                                        todo!("handle socket.send error {:?}", err)
 
                                        component::default_handle_error_for_builtin(
 
                                            &mut self.exec_state, sched_ctx,
 
                                            (PortInstruction::NoSource, format!("failed sending on socket, reason: {}", err))
 
                                        );
 
                                        return CompScheduling::Immediate;
 
                                    }
 
                                }
 
                            }
 
                        }
 

	
 
                        // If here then we're done putting the data, we can
 
@@ -266,13 +276,17 @@ impl Component for ComponentTcpClient {
 
                                }
 
                            },
 
                            Err(err) => {
 
                                if err.kind() == IoErrorKind::WouldBlock {
 
                                    return CompScheduling::Sleep; // wait until polled
 
                                } else {
 
                                    todo!("handle socket.receive error {:?}", err)
 
                                    component::default_handle_error_for_builtin(
 
                                        &mut self.exec_state, sched_ctx,
 
                                        (PortInstruction::NoSource, format!("failed receiving from socket, reason: {}", err))
 
                                    );
 
                                    return CompScheduling::Immediate;
 
                                }
 
                            }
 
                        }
 
                    },
 
                    ClientSyncState::FinishSync | ClientSyncState::FinishSyncThenQuit => {
 
                        component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 
@@ -296,42 +310,49 @@ impl Component for ComponentTcpClient {
 
        }
 
    }
 
}
 

	
 
impl ComponentTcpClient {
 
    pub(crate) fn new(arguments: ValueGroup) -> Self {
 
        fn client_socket_state_from_result(result: Result<SocketTcpClient, SocketError>) -> ClientSocketState {
 
            match result {
 
                Ok(socket) => ClientSocketState::Connected(socket),
 
                Err(error) => ClientSocketState::ErrorReported(format!("Failed to create socket, reason: {:?}", error)),
 
            }
 
        }
 

	
 
        // Two possible cases here: if the number of arguments is 3, then we
 
        // get: (socket_handle, input_port, output_port). If the number of
 
        // arguments is 4, then we get: (ip, port, input_port, output_port).
 
        assert!(arguments.values.len() == 3 || arguments.values.len() == 4);
 

	
 
        // Parsing arguments
 
        let (socket, input_port, output_port) = if arguments.values.len() == 3 {
 
        let (socket_state, input_port, output_port) = if arguments.values.len() == 3 {
 
            let socket_handle = arguments.values[0].as_sint32();
 
            let socket = SocketTcpClient::new_from_handle(socket_handle);
 
            let socket_state = client_socket_state_from_result(socket);
 

	
 
            let input_port = component::port_id_from_eval(arguments.values[1].as_input());
 
            let output_port = component::port_id_from_eval(arguments.values[2].as_output());
 

	
 
            (socket, input_port, output_port)
 
            (socket_state, input_port, output_port)
 
        } else {
 
            let (ip_address, port) = ip_addr_and_port_from_args(&arguments, 0, 1);
 
            let socket = SocketTcpClient::new(ip_address, port);
 

	
 
            let input_port = component::port_id_from_eval(arguments.values[2].as_input());
 
            let output_port = component::port_id_from_eval(arguments.values[3].as_output());
 

	
 
            (socket, input_port, output_port)
 
        };
 
            let ip_and_port = ip_addr_and_port_from_args(&arguments, 0, 1);
 
            let socket_state = match ip_and_port {
 
                Ok((ip_address, port)) => client_socket_state_from_result(SocketTcpClient::new(ip_address, port)),
 
                Err(message) => ClientSocketState::ErrorReported(message),
 
            };
 

	
 
        if let Err(socket) = socket {
 
            todo!("friendly error reporting: failed to open socket (reason: {:?})", socket);
 
        }
 
            (socket_state, input_port, output_port)
 
        };
 

	
 
        return Self{
 
            socket_state: ClientSocketState::Connected(socket.unwrap()),
 
            socket_state,
 
            sync_state: ClientSyncState::AwaitingCmd,
 
            poll_ticket: None,
 
            inbox_main: vec![None, None],
 
            inbox_backup: Vec::new(),
 
            input_union_send_tag_value: -1,
 
            input_union_receive_tag_value: -1,
 
@@ -416,20 +437,21 @@ impl ComponentTcpClient {
 
// -----------------------------------------------------------------------------
 
// ComponentTcpListener
 
// -----------------------------------------------------------------------------
 

	
 
enum ListenerSocketState {
 
    Connected(SocketTcpListener),
 
    ErrorReported(String),
 
    Error,
 
}
 

	
 
impl ListenerSocketState {
 
    fn get_socket(&self) -> &SocketTcpListener {
 
        match self {
 
            ListenerSocketState::Connected(v) => return v,
 
            ListenerSocketState::Error => unreachable!(),
 
            ListenerSocketState::ErrorReported(_) | ListenerSocketState::Error => unreachable!(),
 
        }
 
    }
 
}
 

	
 
struct PendingComponent {
 
    client: i32, // OS socket handle
 
@@ -526,13 +548,13 @@ impl Component for ComponentTcpListener {
 
                    message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup
 
                ) {
 
                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Poll => {
 
                sched_ctx.info("Received polling event");
 
                sched_ctx.debug("Received polling event");
 
            },
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        sched_ctx.info(&format!("Running component ComponentTcpListener (mode: {:?})", self.exec_state.mode));
 
@@ -578,16 +600,23 @@ impl Component for ComponentTcpListener {
 
                                self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                            },
 
                        }
 

	
 
                        return CompScheduling::Immediate;
 
                    },
 
                    ListenerSocketState::Error => {
 
                        self.exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
 
                    ListenerSocketState::ErrorReported(message) => {
 
                        component::default_handle_error_for_builtin(
 
                            &mut self.exec_state, sched_ctx,
 
                            (PortInstruction::NoSource, message.clone())
 
                        );
 
                        self.socket_state = ListenerSocketState::Error;
 
                        return CompScheduling::Immediate;
 
                    }
 
                    ListenerSocketState::Error => {
 
                        return CompScheduling::Sleep;
 
                    }
 
                }
 
            },
 
            CompMode::Sync => {
 
                match self.sync_state {
 
                    ListenerSyncState::AwaitingCmd => {
 
                        match component::default_attempt_get(
 
@@ -665,13 +694,17 @@ impl Component for ComponentTcpListener {
 
                                return CompScheduling::Requeue;
 
                            },
 
                            Err(err) => {
 
                                if err.kind() == IoErrorKind::WouldBlock {
 
                                    return CompScheduling::Sleep;
 
                                } else {
 
                                    todo!("handle listener.accept error {:?}", err)
 
                                    component::default_handle_error_for_builtin(
 
                                        &mut self.exec_state, sched_ctx,
 
                                        (PortInstruction::NoSource, format!("failed to listen on socket, reason: {}", err))
 
                                    );
 
                                    return CompScheduling::Immediate;
 
                                }
 
                            }
 
                        }
 
                    },
 
                    ListenerSyncState::AcceptChannelGenerated => {
 
                        component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 
@@ -702,23 +735,28 @@ impl Component for ComponentTcpListener {
 

	
 
impl ComponentTcpListener {
 
    pub(crate) fn new(arguments: ValueGroup) -> Self {
 
        debug_assert_eq!(arguments.values.len(), 4);
 

	
 
        // Parsing arguments
 
        let (ip_address, port) = ip_addr_and_port_from_args(&arguments, 0, 1);
 
        let input_port = component::port_id_from_eval(arguments.values[2].as_input());
 
        let output_port = component::port_id_from_eval(arguments.values[3].as_output());
 

	
 
        let socket = SocketTcpListener::new(ip_address, port);
 
        if let Err(socket) = socket {
 
            todo!("friendly error reporting: failed to open socket (reason: {:?})", socket);
 
        }
 
        let socket_state = match ip_addr_and_port_from_args(&arguments, 0, 1) {
 
            Ok((ip_address, port)) => {
 
                let socket = SocketTcpListener::new(ip_address, port);
 
                match socket {
 
                    Ok(socket) => ListenerSocketState::Connected(socket),
 
                    Err(err) => ListenerSocketState::ErrorReported(format!("failed to create listener socket, reason: {:?}", err), )
 
                }
 
            },
 
            Err(message) => ListenerSocketState::ErrorReported(message),
 
        };
 

	
 
        return Self {
 
            socket_state: ListenerSocketState::Connected(socket.unwrap()),
 
            socket_state,
 
            sync_state: ListenerSyncState::AwaitingCmd,
 
            pending_component: None,
 
            poll_ticket: None,
 
            inbox_main: vec![None, None],
 
            inbox_backup: InboxBackup::new(),
 
            pdl_input_port_id: input_port,
 
@@ -749,13 +787,13 @@ impl ComponentTcpListener {
 
        }
 
    }
 
}
 

	
 
fn ip_addr_and_port_from_args(
 
    arguments: &ValueGroup, ip_index: usize, port_index: usize
 
) -> (IpAddr, u16) {
 
) -> Result<(IpAddr, u16), String> {
 
    debug_assert!(ip_index < arguments.values.len());
 
    debug_assert!(port_index < arguments.values.len());
 

	
 
    // Parsing IP address
 
    let ip_heap_pos = arguments.values[0].as_array();
 
    let ip_elements = &arguments.regions[ip_heap_pos as usize];
 
@@ -763,14 +801,14 @@ fn ip_addr_and_port_from_args(
 
    let ip_address = match ip_elements.len() {
 
        0 => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
 
        4 => IpAddr::V4(Ipv4Addr::new(
 
            ip_elements[0].as_uint8(), ip_elements[1].as_uint8(),
 
            ip_elements[2].as_uint8(), ip_elements[3].as_uint8()
 
        )),
 
        _ => todo!("friendly error reporting: ip should contain 4 octets (or 0 for unspecified)")
 
        _ => return Err(format!("Expected 0 or 4 elements in the IP address, got {}", ip_elements.len())),
 
    };
 

	
 
    let port = arguments.values[port_index].as_uint16();
 

	
 
    return (ip_address, port);
 
    return Ok((ip_address, port));
 
}
 

	
0 comments (0 inline, 0 general)