diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index 4d508f2545e458c5747fd9fdd2eed7fe9d269b92..d40466dfd2406d124b09c13d4c5f91e6f5743b8a 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -8,18 +8,26 @@ use super::component::{self, *}; use super::control_layer::*; use super::consensus::*; + use std::io::ErrorKind as IoErrorKind; +use std::net::{IpAddr, Ipv4Addr}; +use crate::protocol::{ProcedureDefinitionId, TypeId}; + +// ----------------------------------------------------------------------------- +// ComponentTcpClient +// ----------------------------------------------------------------------------- -enum SocketState { +enum ClientSocketState { Connected(SocketTcpClient), + ErrorReported(String), Error, } -impl SocketState { +impl ClientSocketState { fn get_socket(&self) -> &SocketTcpClient { match self { - SocketState::Connected(v) => v, - SocketState::Error => unreachable!(), + ClientSocketState::Connected(v) => v, + ClientSocketState::ErrorReported(_) | ClientSocketState::Error => unreachable!(), } } } @@ -28,7 +36,7 @@ impl SocketState { /// TCP component (i.e. from the point of view of attempting to interface with /// a socket). #[derive(PartialEq, Debug)] -enum SyncState { +enum ClientSyncState { AwaitingCmd, Getting, Putting, @@ -38,13 +46,14 @@ enum SyncState { pub struct ComponentTcpClient { // Properties for the tcp socket - socket_state: SocketState, - sync_state: SyncState, + socket_state: ClientSocketState, + sync_state: ClientSyncState, poll_ticket: Option, inbox_main: InboxMain, inbox_backup: InboxBackup, pdl_input_port_id: PortId, // input from PDL, so transmitted over socket pdl_output_port_id: PortId, // output towards PDL, so received over socket + // Information about union tags, extracted from PDL input_union_send_tag_value: i64, input_union_receive_tag_value: i64, input_union_finish_tag_value: i64, @@ -61,8 +70,8 @@ impl Component for ComponentTcpClient { fn on_creation(&mut self, id: CompId, sched_ctx: &SchedulerCtx) { // Retrieve type information for messages we're going to receive let pd = &sched_ctx.runtime.protocol; - let cmd_type = pd.find_type(b"std.internet", b"Cmd") - .expect("'Cmd' type in the 'std.internet' module"); + let cmd_type = pd.find_type(b"std.internet", b"ClientCmd") + .expect("'ClientCmd' type in the 'std.internet' module"); let cmd_type = cmd_type .as_union(); @@ -72,10 +81,10 @@ impl Component for ComponentTcpClient { self.input_union_shutdown_tag_value = cmd_type.get_variant_tag_value(b"Shutdown").unwrap(); // Register socket for async events - if let SocketState::Connected(socket) = &self.socket_state { + if let ClientSocketState::Connected(socket) = &self.socket_state { let self_handle = sched_ctx.runtime.get_component_public(id); let poll_ticket = sched_ctx.polling.register(socket, self_handle, true, true) - .expect("registering tcp component"); + .expect("registering tcp client"); debug_assert!(self.poll_ticket.is_none()); self.poll_ticket = Some(poll_ticket); @@ -116,7 +125,7 @@ impl Component for ComponentTcpClient { } }, Message::Poll => { - sched_ctx.info("Received polling event"); + sched_ctx.debug("Received polling event"); }, } } @@ -135,32 +144,37 @@ impl Component for ComponentTcpClient { }, CompMode::NonSync => { // When in non-sync mode - match &mut self.socket_state { - SocketState::Connected(_socket) => { - if self.sync_state == SyncState::FinishSyncThenQuit { + match &self.socket_state { + ClientSocketState::Connected(_socket) => { + if self.sync_state == ClientSyncState::FinishSyncThenQuit { // Previous request was to let the component shut down self.exec_state.set_as_start_exit(ExitReason::Termination); } else { // Reset for a new request - self.sync_state = SyncState::AwaitingCmd; + self.sync_state = ClientSyncState::AwaitingCmd; component::default_handle_sync_start( &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus ); } return CompScheduling::Immediate; }, - SocketState::Error => { - // Could potentially send an error message to the - // connected component. - self.exec_state.set_as_start_exit(ExitReason::ErrorNonSync); + ClientSocketState::ErrorReported(message) => { + component::default_handle_error_for_builtin( + &mut self.exec_state, sched_ctx, + (PortInstruction::NoSource, format!("failed socket creation, reason: {}", message)) + ); + self.socket_state = ClientSocketState::Error; return CompScheduling::Immediate; } + ClientSocketState::Error => { + return CompScheduling::Sleep; + } } }, CompMode::Sync => { // When in sync mode: wait for a command to come in match self.sync_state { - SyncState::AwaitingCmd => { + ClientSyncState::AwaitingCmd => { match component::default_attempt_get( &mut self.exec_state, self.pdl_input_port_id, PortInstruction::NoSource, &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx, @@ -180,16 +194,16 @@ impl Component for ComponentTcpClient { self.byte_buffer.push(value.as_uint8()); } - self.sync_state = SyncState::Putting; + self.sync_state = ClientSyncState::Putting; } else if tag_value == self.input_union_receive_tag_value { // Component requires a `recv` - self.sync_state = SyncState::Getting; + self.sync_state = ClientSyncState::Getting; } else if tag_value == self.input_union_finish_tag_value { // Component requires us to end the sync round - self.sync_state = SyncState::FinishSync; + self.sync_state = ClientSyncState::FinishSync; } else if tag_value == self.input_union_shutdown_tag_value { // Component wants to close the connection - self.sync_state = SyncState::FinishSyncThenQuit; + self.sync_state = ClientSyncState::FinishSyncThenQuit; } else { unreachable!("got tag_value {}", tag_value) } @@ -205,7 +219,7 @@ impl Component for ComponentTcpClient { } } }, - SyncState::Putting => { + ClientSyncState::Putting => { // We're supposed to send a user-supplied message fully // over the socket. But we might end up blocking. In // that case the component goes to sleep until it is @@ -220,7 +234,11 @@ impl Component for ComponentTcpClient { if err.kind() == IoErrorKind::WouldBlock { return CompScheduling::Sleep; // wait until notified } else { - todo!("handle socket.send error {:?}", err) + component::default_handle_error_for_builtin( + &mut self.exec_state, sched_ctx, + (PortInstruction::NoSource, format!("failed sending on socket, reason: {}", err)) + ); + return CompScheduling::Immediate; } } } @@ -231,7 +249,7 @@ impl Component for ComponentTcpClient { component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); return CompScheduling::Requeue; }, - SyncState::Getting => { + ClientSyncState::Getting => { // We're going to try and receive a single message. If // this causes us to end up blocking the component // goes to sleep until it is polled. @@ -253,7 +271,7 @@ impl Component for ComponentTcpClient { return CompScheduling::Immediate; } else { let scheduling = send_result.unwrap(); - self.sync_state = SyncState::AwaitingCmd; + self.sync_state = ClientSyncState::AwaitingCmd; return scheduling; } }, @@ -261,12 +279,16 @@ impl Component for ComponentTcpClient { if err.kind() == IoErrorKind::WouldBlock { return CompScheduling::Sleep; // wait until polled } else { - todo!("handle socket.receive error {:?}", err) + component::default_handle_error_for_builtin( + &mut self.exec_state, sched_ctx, + (PortInstruction::NoSource, format!("failed receiving from socket, reason: {}", err)) + ); + return CompScheduling::Immediate; } } } }, - SyncState::FinishSync | SyncState::FinishSyncThenQuit => { + ClientSyncState::FinishSync | ClientSyncState::FinishSyncThenQuit => { component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); return CompScheduling::Requeue; }, @@ -274,7 +296,7 @@ impl Component for ComponentTcpClient { }, CompMode::BlockedGet => { // Entered when awaiting a new command - debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd); + debug_assert_eq!(self.sync_state, ClientSyncState::AwaitingCmd); return CompScheduling::Sleep; }, CompMode::SyncEnd | CompMode::BlockedPut => @@ -291,35 +313,66 @@ impl Component for ComponentTcpClient { impl ComponentTcpClient { pub(crate) fn new(arguments: ValueGroup) -> Self { - use std::net::{IpAddr, Ipv4Addr}; + fn client_socket_state_from_result(result: Result) -> ClientSocketState { + match result { + Ok(socket) => ClientSocketState::Connected(socket), + Err(error) => ClientSocketState::ErrorReported(format!("Failed to create socket, reason: {:?}", error)), + } + } - debug_assert_eq!(arguments.values.len(), 4); + // Two possible cases here: if the number of arguments is 3, then we + // get: (socket_handle, input_port, output_port). If the number of + // arguments is 4, then we get: (ip, port, input_port, output_port). + assert!(arguments.values.len() == 3 || arguments.values.len() == 4); // Parsing arguments - let ip_heap_pos = arguments.values[0].as_array(); - let ip_elements = &arguments.regions[ip_heap_pos as usize]; - if ip_elements.len() != 4 { - todo!("friendly error reporting: ip contains 4 octects"); - } - let ip_address = IpAddr::V4(Ipv4Addr::new( - ip_elements[0].as_uint8(), ip_elements[1].as_uint8(), - ip_elements[2].as_uint8(), ip_elements[3].as_uint8() - )); + let (socket_state, input_port, output_port) = if arguments.values.len() == 3 { + let socket_handle = arguments.values[0].as_sint32(); + let socket = SocketTcpClient::new_from_handle(socket_handle); + let socket_state = client_socket_state_from_result(socket); - let port = arguments.values[1].as_uint16(); - let input_port = component::port_id_from_eval(arguments.values[2].as_input()); - let output_port = component::port_id_from_eval(arguments.values[3].as_output()); + let input_port = component::port_id_from_eval(arguments.values[1].as_input()); + let output_port = component::port_id_from_eval(arguments.values[2].as_output()); + + (socket_state, input_port, output_port) + } else { + let input_port = component::port_id_from_eval(arguments.values[2].as_input()); + let output_port = component::port_id_from_eval(arguments.values[3].as_output()); - let socket = SocketTcpClient::new(ip_address, port); - if let Err(socket) = socket { - todo!("friendly error reporting: failed to open socket (reason: {:?})", socket); + let ip_and_port = ip_addr_and_port_from_args(&arguments, 0, 1); + let socket_state = match ip_and_port { + Ok((ip_address, port)) => client_socket_state_from_result(SocketTcpClient::new(ip_address, port)), + Err(message) => ClientSocketState::ErrorReported(message), + }; + + (socket_state, input_port, output_port) + }; + + return Self{ + socket_state, + sync_state: ClientSyncState::AwaitingCmd, + poll_ticket: None, + inbox_main: vec![None, None], + inbox_backup: Vec::new(), + input_union_send_tag_value: -1, + input_union_receive_tag_value: -1, + input_union_finish_tag_value: -1, + input_union_shutdown_tag_value: -1, + pdl_input_port_id: input_port, + pdl_output_port_id: output_port, + exec_state: CompExecState::new(), + control: ControlLayer::default(), + consensus: Consensus::new(), + byte_buffer: Vec::new(), } + } + pub(crate) fn new_with_existing_connection(socket: SocketTcpClient, input_port: PortId, output_port: PortId) -> Self { return Self{ - socket_state: SocketState::Connected(socket.unwrap()), - sync_state: SyncState::AwaitingCmd, + socket_state: ClientSocketState::Connected(socket), + sync_state: ClientSyncState::AwaitingCmd, poll_ticket: None, - inbox_main: vec![None], + inbox_main: vec![None, None], inbox_backup: Vec::new(), input_union_send_tag_value: -1, input_union_receive_tag_value: -1, @@ -379,4 +432,383 @@ impl ComponentTcpClient { return value_group; } -} \ No newline at end of file +} + +// ----------------------------------------------------------------------------- +// ComponentTcpListener +// ----------------------------------------------------------------------------- + +enum ListenerSocketState { + Connected(SocketTcpListener), + ErrorReported(String), + Error, +} + +impl ListenerSocketState { + fn get_socket(&self) -> &SocketTcpListener { + match self { + ListenerSocketState::Connected(v) => return v, + ListenerSocketState::ErrorReported(_) | ListenerSocketState::Error => unreachable!(), + } + } +} + +struct PendingComponent { + client: i32, // OS socket handle + cmd_rx: PortId, + data_tx: PortId, +} + +enum ListenerSyncState { + AwaitingCmd, + AcceptCommandReceived, // just received `Accept` command + AcceptChannelGenerated, // created channel, waiting to end the sync round + AcceptGenerateComponent, // sync ended, back in non-sync, now generate component + FinishSyncThenQuit, +} + +pub struct ComponentTcpListener { + // Properties for the tcp socket + socket_state: ListenerSocketState, + sync_state: ListenerSyncState, + pending_component: Option, + poll_ticket: Option, + inbox_main: InboxMain, + inbox_backup: InboxBackup, + pdl_input_port_id: PortId, // input port, receives commands + pdl_output_port_id: PortId, // output port, sends connections + // Type information extracted from protocol + tcp_client_definition: (ProcedureDefinitionId, TypeId), + input_union_accept_tag: i64, + input_union_shutdown_tag: i64, + output_struct_rx_index: usize, + output_struct_tx_index: usize, + // Generic component state + exec_state: CompExecState, + control: ControlLayer, + consensus: Consensus, +} + +impl Component for ComponentTcpListener { + fn on_creation(&mut self, id: CompId, sched_ctx: &SchedulerCtx) { + // Retrieve type information for the message with ports we're going to send + let pd = &sched_ctx.runtime.protocol; + + self.tcp_client_definition = sched_ctx.runtime.protocol.find_procedure(b"std.internet", b"tcp_client") + .expect("'tcp_client' component in the 'std.internet' module"); + + let cmd_type = pd.find_type(b"std.internet", b"ListenerCmd") + .expect("'ListenerCmd' type in the 'std.internet' module"); + let cmd_type = cmd_type.as_union(); + + self.input_union_accept_tag = cmd_type.get_variant_tag_value(b"Accept").unwrap(); + self.input_union_shutdown_tag = cmd_type.get_variant_tag_value(b"Shutdown").unwrap(); + + let conn_type = pd.find_type(b"std.internet", b"TcpConnection") + .expect("'TcpConnection' type in the 'std.internet' module"); + let conn_type = conn_type.as_struct(); + + assert_eq!(conn_type.get_num_struct_fields(), 2); + self.output_struct_rx_index = conn_type.get_struct_field_index(b"rx").unwrap(); + self.output_struct_tx_index = conn_type.get_struct_field_index(b"tx").unwrap(); + + // Register socket for async events + if let ListenerSocketState::Connected(socket) = &self.socket_state { + let self_handle = sched_ctx.runtime.get_component_public(id); + let poll_ticket = sched_ctx.polling.register(socket, self_handle, true, false) + .expect("registering tcp listener"); + + debug_assert!(self.poll_ticket.is_none()); + self.poll_ticket = Some(poll_ticket); + } + } + + fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx) { + if let Some(poll_ticket) = self.poll_ticket.take() { + sched_ctx.polling.unregister(poll_ticket); + } + } + + fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) { + unreachable!(); + } + + fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { + match message { + Message::Data(message) => { + self.handle_incoming_data_message(sched_ctx, comp_ctx, message); + }, + Message::Sync(message) => { + let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); + }, + Message::Control(message) => { + if let Err(location_and_message) = component::default_handle_control_message( + &mut self.exec_state, &mut self.control, &mut self.consensus, + message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup + ) { + component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); + } + }, + Message::Poll => { + sched_ctx.debug("Received polling event"); + }, + } + } + + fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling { + sched_ctx.info(&format!("Running component ComponentTcpListener (mode: {:?})", self.exec_state.mode)); + + match self.exec_state.mode { + CompMode::BlockedSelect + => unreachable!(), + CompMode::PutPortsBlockedTransferredPorts | + CompMode::PutPortsBlockedAwaitingAcks | + CompMode::PutPortsBlockedSendingPort | + CompMode::NewComponentBlocked + => return CompScheduling::Sleep, + CompMode::NonSync => { + match &self.socket_state { + ListenerSocketState::Connected(_socket) => { + match self.sync_state { + ListenerSyncState::AwaitingCmd => { + component::default_handle_sync_start( + &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus + ); + }, + ListenerSyncState::AcceptCommandReceived | + ListenerSyncState::AcceptChannelGenerated => unreachable!(), + ListenerSyncState::AcceptGenerateComponent => { + // Now that we're outside the sync round, create the tcp client + // component + let pending = self.pending_component.take().unwrap(); + + let arguments = ValueGroup::new_stack(vec![ + Value::SInt32(pending.client), + Value::Input(port_id_to_eval(pending.cmd_rx)), + Value::Output(port_id_to_eval(pending.data_tx)), + ]); + component::default_start_create_component( + &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control, + &mut self.inbox_main, &mut self.inbox_backup, + self.tcp_client_definition.0, self.tcp_client_definition.1, + arguments + ); + self.sync_state = ListenerSyncState::AwaitingCmd; + }, + ListenerSyncState::FinishSyncThenQuit => { + self.exec_state.set_as_start_exit(ExitReason::Termination); + }, + } + + return CompScheduling::Immediate; + }, + ListenerSocketState::ErrorReported(message) => { + component::default_handle_error_for_builtin( + &mut self.exec_state, sched_ctx, + (PortInstruction::NoSource, message.clone()) + ); + self.socket_state = ListenerSocketState::Error; + return CompScheduling::Immediate; + } + ListenerSocketState::Error => { + return CompScheduling::Sleep; + } + } + }, + CompMode::Sync => { + match self.sync_state { + ListenerSyncState::AwaitingCmd => { + match component::default_attempt_get( + &mut self.exec_state, self.pdl_input_port_id, PortInstruction::NoSource, + &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx, + &mut self.control, &mut self.consensus + ) { + GetResult::Received(message) => { + let (tag_value, _) = message.content.values[0].as_union(); + if tag_value == self.input_union_accept_tag { + self.sync_state = ListenerSyncState::AcceptCommandReceived; + } else if tag_value == self.input_union_shutdown_tag { + self.sync_state = ListenerSyncState::FinishSyncThenQuit; + } else { + unreachable!("got tag_value {}", tag_value); + } + + return CompScheduling::Immediate; + }, + GetResult::NoMessage => { + return CompScheduling::Sleep; + }, + GetResult::Error(location_and_message) => { + component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); + return CompScheduling::Immediate; + } + } + }, + ListenerSyncState::AcceptCommandReceived => { + let socket = self.socket_state.get_socket(); + match socket.accept() { + Ok(client_handle) => { + // Create the channels (and the inbox entries, to stay consistent + // with the expectations from the `component` module's functions) + let cmd_channel = comp_ctx.create_channel(); + let data_channel = comp_ctx.create_channel(); + + let port_ids = [ + cmd_channel.putter_id, cmd_channel.getter_id, + data_channel.putter_id, data_channel.getter_id, + ]; + for port_id in port_ids { + let expected_port_index = self.inbox_main.len(); + let port_handle = comp_ctx.get_port_handle(port_id); + self.inbox_main.push(None); + self.consensus.notify_of_new_port(expected_port_index, port_handle, comp_ctx); + } + + // Construct the message containing the appropriate ports that will + // be sent to the component commanding this listener. + let mut values = ValueGroup::new_stack(Vec::with_capacity(1)); + values.values.push(Value::Struct(0)); + values.regions.push(vec![Value::Unassigned, Value::Unassigned]); + values.regions[0][self.output_struct_tx_index] = Value::Output(port_id_to_eval(cmd_channel.putter_id)); + values.regions[0][self.output_struct_rx_index] = Value::Input(port_id_to_eval(data_channel.getter_id)); + if let Err(location_and_message) = component::default_send_data_message( + &mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, values, + sched_ctx, &mut self.consensus, &mut self.control, comp_ctx + ) { + component::default_handle_error_for_builtin( + &mut self.exec_state, sched_ctx, location_and_message + ); + } + + // Prepare for finishing the consensus round, and once finished, + // create the tcp client component + self.sync_state = ListenerSyncState::AcceptChannelGenerated; + debug_assert!(self.pending_component.is_none()); + self.pending_component = Some(PendingComponent{ + client: client_handle, + cmd_rx: cmd_channel.getter_id, + data_tx: data_channel.putter_id + }); + + return CompScheduling::Requeue; + }, + Err(err) => { + if err.kind() == IoErrorKind::WouldBlock { + return CompScheduling::Sleep; + } else { + component::default_handle_error_for_builtin( + &mut self.exec_state, sched_ctx, + (PortInstruction::NoSource, format!("failed to listen on socket, reason: {}", err)) + ); + return CompScheduling::Immediate; + } + } + } + }, + ListenerSyncState::AcceptChannelGenerated => { + component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); + self.sync_state = ListenerSyncState::AcceptGenerateComponent; + return CompScheduling::Requeue; + } + ListenerSyncState::FinishSyncThenQuit => { + component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); + return CompScheduling::Requeue; + }, + ListenerSyncState::AcceptGenerateComponent => unreachable!(), + } + }, + CompMode::BlockedGet => { + return CompScheduling::Sleep; + }, + CompMode::SyncEnd | CompMode::BlockedPut + => return CompScheduling::Sleep, + CompMode::StartExit => + return component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus), + CompMode::BusyExit => + return component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx), + CompMode::Exit => + return component::default_handle_exit(&self.exec_state), + } + } +} + +impl ComponentTcpListener { + pub(crate) fn new(arguments: ValueGroup) -> Self { + debug_assert_eq!(arguments.values.len(), 4); + + // Parsing arguments + let input_port = component::port_id_from_eval(arguments.values[2].as_input()); + let output_port = component::port_id_from_eval(arguments.values[3].as_output()); + + let socket_state = match ip_addr_and_port_from_args(&arguments, 0, 1) { + Ok((ip_address, port)) => { + let socket = SocketTcpListener::new(ip_address, port); + match socket { + Ok(socket) => ListenerSocketState::Connected(socket), + Err(err) => ListenerSocketState::ErrorReported(format!("failed to create listener socket, reason: {:?}", err), ) + } + }, + Err(message) => ListenerSocketState::ErrorReported(message), + }; + + return Self { + socket_state, + sync_state: ListenerSyncState::AwaitingCmd, + pending_component: None, + poll_ticket: None, + inbox_main: vec![None, None], + inbox_backup: InboxBackup::new(), + pdl_input_port_id: input_port, + pdl_output_port_id: output_port, + tcp_client_definition: (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()), + input_union_accept_tag: -1, + input_union_shutdown_tag: -1, + output_struct_tx_index: 0, + output_struct_rx_index: 0, + exec_state: CompExecState::new(), + control: ControlLayer::default(), + consensus: Consensus::new(), + } + } + + fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) { + if self.exec_state.mode.is_in_sync_block() { + self.consensus.handle_incoming_data_message(comp_ctx, &message); + } + + match component::default_handle_incoming_data_message( + &mut self.exec_state, &mut self.inbox_main, comp_ctx, message, sched_ctx, &mut self.control + ) { + IncomingData::PlacedInSlot => {}, + IncomingData::SlotFull(message) => { + self.inbox_backup.push(message); + } + } + } +} + +fn ip_addr_and_port_from_args( + arguments: &ValueGroup, ip_index: usize, port_index: usize +) -> Result<(IpAddr, u16), String> { + debug_assert!(ip_index < arguments.values.len()); + debug_assert!(port_index < arguments.values.len()); + + // Parsing IP address + let ip_heap_pos = arguments.values[0].as_array(); + let ip_elements = &arguments.regions[ip_heap_pos as usize]; + + let ip_address = match ip_elements.len() { + 0 => IpAddr::V4(Ipv4Addr::UNSPECIFIED), + 4 => IpAddr::V4(Ipv4Addr::new( + ip_elements[0].as_uint8(), ip_elements[1].as_uint8(), + ip_elements[2].as_uint8(), ip_elements[3].as_uint8() + )), + _ => return Err(format!("Expected 0 or 4 elements in the IP address, got {}", ip_elements.len())), + }; + + let port = arguments.values[port_index].as_uint16(); + + return Ok((ip_address, port)); +} +