diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index decdae7dadc7d079f18c0af714a1f380dd820798..9fec80daf7169dba1681825cdfe547e42714ec11 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -9,6 +9,7 @@ use super::control_layer::*; use super::consensus::*; use std::io::ErrorKind as IoErrorKind; +use std::net::{IpAddr, Ipv4Addr}; // ----------------------------------------------------------------------------- // ComponentTcpClient @@ -66,8 +67,8 @@ impl Component for ComponentTcpClient { fn on_creation(&mut self, id: CompId, sched_ctx: &SchedulerCtx) { // Retrieve type information for messages we're going to receive let pd = &sched_ctx.runtime.protocol; - let cmd_type = pd.find_type(b"std.internet", b"Cmd") - .expect("'Cmd' type in the 'std.internet' module"); + let cmd_type = pd.find_type(b"std.internet", b"ClientCmd") + .expect("'ClientCmd' type in the 'std.internet' module"); let cmd_type = cmd_type .as_union(); @@ -80,7 +81,7 @@ impl Component for ComponentTcpClient { if let ClientSocketState::Connected(socket) = &self.socket_state { let self_handle = sched_ctx.runtime.get_component_public(id); let poll_ticket = sched_ctx.polling.register(socket, self_handle, true, true) - .expect("registering tcp component"); + .expect("registering tcp client"); debug_assert!(self.poll_ticket.is_none()); self.poll_ticket = Some(poll_ticket); @@ -140,7 +141,7 @@ impl Component for ComponentTcpClient { }, CompMode::NonSync => { // When in non-sync mode - match &mut self.socket_state { + match &self.socket_state { ClientSocketState::Connected(_socket) => { if self.sync_state == ClientSyncState::FinishSyncThenQuit { // Previous request was to let the component shut down @@ -296,22 +297,10 @@ impl Component for ComponentTcpClient { impl ComponentTcpClient { pub(crate) fn new(arguments: ValueGroup) -> Self { - use std::net::{IpAddr, Ipv4Addr}; - debug_assert_eq!(arguments.values.len(), 4); // Parsing arguments - let ip_heap_pos = arguments.values[0].as_array(); - let ip_elements = &arguments.regions[ip_heap_pos as usize]; - if ip_elements.len() != 4 { - todo!("friendly error reporting: ip contains 4 octects"); - } - let ip_address = IpAddr::V4(Ipv4Addr::new( - ip_elements[0].as_uint8(), ip_elements[1].as_uint8(), - ip_elements[2].as_uint8(), ip_elements[3].as_uint8() - )); - - let port = arguments.values[1].as_uint16(); + let (ip_address, port) = ip_addr_and_port_from_args(&arguments, 0, 1); let input_port = component::port_id_from_eval(arguments.values[2].as_input()); let output_port = component::port_id_from_eval(arguments.values[3].as_output()); @@ -324,7 +313,27 @@ impl ComponentTcpClient { socket_state: ClientSocketState::Connected(socket.unwrap()), sync_state: ClientSyncState::AwaitingCmd, poll_ticket: None, - inbox_main: vec![None], + inbox_main: vec![None, None], + inbox_backup: Vec::new(), + input_union_send_tag_value: -1, + input_union_receive_tag_value: -1, + input_union_finish_tag_value: -1, + input_union_shutdown_tag_value: -1, + pdl_input_port_id: input_port, + pdl_output_port_id: output_port, + exec_state: CompExecState::new(), + control: ControlLayer::default(), + consensus: Consensus::new(), + byte_buffer: Vec::new(), + } + } + + pub(crate) fn new_with_existing_connection(socket: SocketTcpClient, input_port: PortId, output_port: PortId) -> Self { + return Self{ + socket_state: ClientSocketState::Connected(socket);, + sync_state: ClientSyncState::AwaitingCmd, + poll_ticket: None, + inbox_main: vec![None, None], inbox_backup: Vec::new(), input_union_send_tag_value: -1, input_union_receive_tag_value: -1, @@ -395,15 +404,258 @@ enum ListenerSocketState { Error, } +impl ListenerSocketState { + fn get_socket(&self) -> &SocketTcpListener { + match self { + ListenerSocketState::Connected(v) => return v, + ListenerSocketState::Error => unreachable!(), + } + } +} + +#[derive(PartialEq, Debug)] +enum ListenerSyncState { + AwaitingCmd, + Accept, + FinishSyncThenQuit, +} + pub struct ComponentTcpListener { // Properties for the tcp socket socket_state: ListenerSocketState, + sync_state: ListenerSyncState, poll_ticket: Option, inbox_main: InboxMain, inbox_backup: InboxBackup, - pdl_output_port_id: PortId, // output port, sends + pdl_input_port_id: PortId, // input port, receives commands + pdl_output_port_id: PortId, // output port, sends connections // Information about union tags - output_struct_rx_index: i64, - output_struct_tx_index: i64, + input_union_accept_tag: i64, + input_union_shutdown_tag: i64, + output_struct_rx_index: usize, + output_struct_tx_index: usize, // Generic component state -} \ No newline at end of file + exec_state: CompExecState, + control: ControlLayer, + consensus: Consensus, +} + +impl Component for ComponentTcpListener { + fn on_creation(&mut self, id: CompId, sched_ctx: &SchedulerCtx) { + // Retrieve type information for the message with ports we're going to send + let pd = &sched_ctx.runtime.protocol; + + let cmd_type = pd.find_type(b"std.internet", b"ListenerCmd") + .expect("'ListenerCmd' type in the 'std.internet' module"); + let cmd_type = cmd_type.as_union(); + + self.input_union_accept_tag = cmd_type.get_variant_tag_value(b"Accept").unwrap(); + self.input_union_shutdown_tag = cmd_type.get_variant_tag_value(b"Shutdown").unwrap(); + + let conn_type = pd.find_type(b"std.internet", b"TcpConnection") + .expect("'TcpConnection' type in the 'std.internet' module"); + let conn_type = conn_type.as_struct(); + + assert_eq!(conn_type.get_num_struct_fields(), 2); + self.output_struct_rx_index = conn_type.get_struct_field_index(b"rx").unwrap(); + self.output_struct_tx_index = conn_type.get_struct_field_index(b"tx").unwrap(); + + // Register socket for async events + if let ListenerSocketState::Connected(socket) = &self.socket_state { + let self_handle = sched_ctx.runtime.get_component_public(id); + let poll_ticket = sched_ctx.polling.register(socket, self_handle, true, false) + .expect("registering tcp listener"); + + debug_assert!(self.poll_ticket.is_none()); + self.poll_ticket = Some(poll_ticket); + } + } + + fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx) { + if let Some(poll_ticket) = self.poll_ticket.take() { + sched_ctx.polling.unregister(poll_ticket); + } + } + + fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) { + unreachable!(); + } + + fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { + match message { + Message::Data(_message) => unreachable!(), + Message::Sync(message) => { + let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); + }, + Message::Control(message) => { + if let Err(location_and_message) = component::default_handle_control_message( + &mut self.exec_state, &mut self.control, &mut self.consensus, + message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup + ) { + component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); + } + }, + Message::Poll => { + sched_ctx.info("Received polling event"); + }, + } + } + + fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling { + sched_ctx.info(&format!("Running comonent ComponentTcpListener (mode: {:?})", self.exec_state.mode)); + + match self.exec_state.mode { + CompMode::BlockedSelect + => unreachable!(), + CompMode::PutPortsBlockedTransferredPorts | + CompMode::PutPortsBlockedAwaitingAcks | + CompMode::PutPortsBlockedSendingPort | + CompMode::NewComponentBlocked + => return CompScheduling::Sleep, + CompMode::NonSync => { + match &self.socket_state { + ListenerSocketState::Connected(_socket) => { + if self.sync_state == ListenerSyncState::FinishSyncThenQuit { + self.exec_state.set_as_start_exit(ExitReason::Termination); + } else { + self.sync_state = ListenerSyncState::AwaitingCmd; + component::default_handle_sync_start( + &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus + ); + } + return CompScheduling::Immediate; + }, + ListenerSocketState::Error => { + self.exec_state.set_as_start_exit(ExitReason::ErrorNonSync); + return CompScheduling::Immediate; + } + } + }, + CompMode::Sync => { + match self.sync_state { + ListenerSyncState::AwaitingCmd => { + match component::default_attempt_get( + &mut self.exec_state, self.pdl_input_port_id, PortInstruction::NoSource, + &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx, + &mut self.control, &mut self.consensus + ) { + GetResult::Received(message) => { + let (tag_value, _) = message.content.values[0].as_union(); + if tag_value == self.input_union_accept_tag { + self.sync_state = ListenerSyncState::Accept; + } else if tag_value == self.input_union_shutdown_tag { + self.sync_state = ListenerSyncState::FinishSyncThenQuit; + } else { + unreachable!("got tag_value {}", tag_value); + } + + return CompScheduling::Immediate; + }, + GetResult::NoMessage => { + return CompScheduling::Sleep; + }, + GetResult::Error(location_and_message) => { + component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); + return CompScheduling::Immediate; + } + } + }, + ListenerSyncState::Accept => { + let socket = self.socket_state.get_socket(); + match socket.accept() { + Ok(client) => { + // TODO: Continue here, somehow precreate component, but do the transfer correctly + let client = client.unwrap(); + component::default_start_create_component() + todo!("actually create the component") + }, + Err(err) => { + if err.kind() == IoErrorKind::WouldBlock { + return CompScheduling::Sleep; + } else { + todo!("handle listener.accept error {:?}", err); + } + } + } + }, + ListenerSyncState::FinishSyncThenQuit => { + component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); + return CompScheduling::Requeue; + } + } + return CompScheduling::Sleep; + }, + CompMode::BlockedGet => { + debug_assert_eq!(self.sync_state, ListenerSyncState::AwaitingCmd); + return CompScheduling::Sleep; + }, + CompMode::SyncEnd | CompMode::BlockedPut + => return CompScheduling::Sleep, + CompMode::StartExit => + return component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus), + CompMode::BusyExit => + return component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx), + CompMode::Exit => + return component::default_handle_exit(&self.exec_state), + } + } +} + +impl ComponentTcpListener { + pub(crate) fn new(arguments: ValueGroup) -> Self { + debug_assert_eq!(arguments.values.len(), 4); + + // Parsing arguments + let (ip_address, port) = ip_addr_and_port_from_args(&arguments, 0, 1); + let input_port = component::port_id_from_eval(arguments.values[2].as_input()); + let output_port = component::port_id_from_eval(arguments.values[3].as_output()); + + let socket = SocketTcpListener::new(ip_address, port); + if let Err(socket) = socket { + todo!("friendly error reporting: failed to open socket (reason: {:?})", socket); + } + + return Self { + socket_state: ListenerSocketState::Connected(socket.unwrap()), + sync_state: ListenerSyncState::AwaitingCmd, + poll_ticket: None, + inbox_main: vec![None, None], + inbox_backup: InboxBackup::new(), + pdl_input_port_id: input_port, + pdl_output_port_id: output_port, + input_union_accept_tag: -1, + input_union_shutdown_tag: -1, + output_struct_tx_index: 0, + output_struct_rx_index: 0, + exec_state: CompExecState::new(), + control: ControlLayer::default(), + consensus: Consensus::new(), + } + } +} + +fn ip_addr_and_port_from_args( + arguments: &ValueGroup, ip_index: usize, port_index: usize +) -> (IpAddr, u16) { + debug_assert!(ip_index < arguments.values.len()); + debug_assert!(port_index < arguments.values.len()); + + // Parsing IP address + let ip_heap_pos = arguments.values[0].as_array(); + let ip_elements = &arguments.regions[ip_heap_pos as usize]; + + let ip_address = match ip_elements.len() { + 0 => IpAddr::V4(Ipv4Addr::UNSPECIFIED), + 4 => IpAddr::V4(Ipv4Addr::new( + ip_elements[0].as_uint8(), ip_elements[1].as_uint8(), + ip_elements[2].as_uint8(), ip_elements[3].as_uint8() + )), + _ => todo!("friendly error reporting: ip should contain 4 octets (or 0 for unspecified)") + }; + + let port = arguments.values[port_index].as_uint16(); + + return (ip_address, port); +} +