use crate::protocol::eval::{ValueGroup, Value};
use crate::runtime2::*;
use crate::runtime2::component::{CompCtx, CompId, PortInstruction};
use crate::runtime2::stdlib::internet::*;
use crate::runtime2::poll::*;

use super::component::{self, *};
use super::control_layer::*;
use super::consensus::*;

use std::io::ErrorKind as IoErrorKind;

enum SocketState {
    Connected(SocketTcpClient),
    Error,
}

impl SocketState {
    fn get_socket(&self) -> &SocketTcpClient {
        match self {
            SocketState::Connected(v) => v,
            SocketState::Error => unreachable!(),
        }
    }
}

/// States from the point of view of the component that is connecting to this
/// TCP component (i.e. from the point of view of attempting to interface with
/// a socket).
#[derive(PartialEq, Debug)]
enum SyncState {
    AwaitingCmd,
    Getting,
    Putting,
    FinishSync,
    FinishSyncThenQuit,
}

pub struct ComponentTcpClient {
    // Properties for the tcp socket
    socket_state: SocketState,
    sync_state: SyncState,
    poll_ticket: Option<PollTicket>, // registration ticket from the polling subsystem (the concrete ticket type name is an assumption)
    inbox_main: InboxMain,
    inbox_backup: InboxBackup,
    pdl_input_port_id: PortId,  // input from PDL, so transmitted over socket
    pdl_output_port_id: PortId, // output towards PDL, so received over socket
    input_union_send_tag_value: i64,
    input_union_receive_tag_value: i64,
    input_union_finish_tag_value: i64,
    input_union_shutdown_tag_value: i64,

    // Generic component state
    exec_state: CompExecState,
    control: ControlLayer,
    consensus: Consensus,

    // Temporary variables
    byte_buffer: Vec<u8>,
}

impl Component for ComponentTcpClient {
    fn on_creation(&mut self, id: CompId, sched_ctx: &SchedulerCtx) {
        // Retrieve type information for messages we're going to receive
        let pd = &sched_ctx.runtime.protocol;
        let cmd_type = pd.find_type(b"std.internet", b"Cmd")
            .expect("'Cmd' type in the 'std.internet' module");
        let cmd_type = cmd_type.as_union();

        self.input_union_send_tag_value = cmd_type.get_variant_tag_value(b"Send").unwrap();
        self.input_union_receive_tag_value = cmd_type.get_variant_tag_value(b"Receive").unwrap();
        self.input_union_finish_tag_value = cmd_type.get_variant_tag_value(b"Finish").unwrap();
        self.input_union_shutdown_tag_value = cmd_type.get_variant_tag_value(b"Shutdown").unwrap();

        // Register socket for async events
        if let SocketState::Connected(socket) = &self.socket_state {
            let self_handle = sched_ctx.runtime.get_component_public(id);
            let poll_ticket = sched_ctx.polling.register(socket, self_handle, true, true)
                .expect("registering tcp component");
            debug_assert!(self.poll_ticket.is_none());
            self.poll_ticket = Some(poll_ticket);
        }
    }

    fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx) {
        if let Some(poll_ticket) = self.poll_ticket.take() {
            sched_ctx.polling.unregister(poll_ticket)
                .expect("unregistering tcp component");
        }
    }

    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) {
        let slot = &mut self.inbox_main[0];
        if slot.is_none() {
            *slot = Some(message);
        } else {
            self.inbox_backup.push(message);
        }
    }

    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
        match message {
            Message::Data(message) => {
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
            },
            Message::Sync(message) => {
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
            },
            Message::Control(message) => {
                if let Err(location_and_message) = component::default_handle_control_message(
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
                    message, sched_ctx, comp_ctx
                ) {
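                    // Control handling failed; hand the error to the shared builtin-component error path.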
                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
                }
            },
            Message::Poll => {
                sched_ctx.info("Received polling event");
            },
        }
    }

    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
        sched_ctx.info(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state));

        match self.exec_state.mode {
            CompMode::BlockedSelect => {
                // Not possible: we never enter this state
                unreachable!();
            },
            CompMode::NonSync => {
                // When in non-sync mode
                match &mut self.socket_state {
                    SocketState::Connected(_socket) => {
                        if self.sync_state == SyncState::FinishSyncThenQuit {
                            // Previous request was to let the component shut down
                            self.exec_state.set_as_start_exit(ExitReason::Termination);
                        } else {
                            // Reset for a new request
                            self.sync_state = SyncState::AwaitingCmd;
                            component::default_handle_sync_start(
                                &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus
                            );
                        }

                        return CompScheduling::Immediate;
                    },
                    SocketState::Error => {
                        // Could potentially send an error message to the
                        // connected component.
                        self.exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
                        return CompScheduling::Immediate;
                    }
                }
            },
            CompMode::Sync => {
                // When in sync mode: wait for a command to come in
                match self.sync_state {
                    SyncState::AwaitingCmd => {
                        match component::default_attempt_get(
                            &mut self.exec_state, self.pdl_input_port_id, PortInstruction::NoSource,
                            &mut self.inbox_main, &mut self.inbox_backup,
                            sched_ctx, comp_ctx, &mut self.control, &mut self.consensus
                        ) {
                            GetResult::Received(message) => {
                                let (tag_value, embedded_heap_pos) = message.content.values[0].as_union();

                                if tag_value == self.input_union_send_tag_value {
                                    // Retrieve bytes from the message
                                    self.byte_buffer.clear();
                                    let union_content = &message.content.regions[embedded_heap_pos as usize];
                                    debug_assert_eq!(union_content.len(), 1);
                                    let array_heap_pos = union_content[0].as_array();
                                    let array_values = &message.content.regions[array_heap_pos as usize];
                                    self.byte_buffer.reserve(array_values.len());
                                    for value in array_values {
                                        self.byte_buffer.push(value.as_uint8());
                                    }
                                    self.sync_state = SyncState::Putting;
                                } else if tag_value == self.input_union_receive_tag_value {
                                    // Component requires a `recv`
                                    self.sync_state = SyncState::Getting;
                                } else if tag_value == self.input_union_finish_tag_value {
                                    // Component requires us to end the sync round
                                    self.sync_state = SyncState::FinishSync;
                                } else if tag_value == self.input_union_shutdown_tag_value {
                                    // Component wants to close the connection
                                    self.sync_state = SyncState::FinishSyncThenQuit;
                                } else {
                                    unreachable!("got tag_value {}", tag_value)
                                }

                                return CompScheduling::Immediate;
                            },
                            GetResult::NoMessage => {
                                return CompScheduling::Sleep;
                            },
                            GetResult::Error(location_and_message) => {
                                component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
                                return CompScheduling::Immediate;
                            }
                        }
                    },
                    SyncState::Putting => {
                        // We're supposed to send a user-supplied message fully
                        // over the socket. But we might end up blocking. In
                        // that case the component goes to sleep until it is
                        // polled.
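                        // Note: `send` may accept only part of the buffer, so the loop
                        // below drains the bytes that were written and retries until the
                        // buffer is empty or the call would block.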
                        let socket = self.socket_state.get_socket();
                        while !self.byte_buffer.is_empty() {
                            match socket.send(&self.byte_buffer) {
                                Ok(bytes_sent) => {
                                    self.byte_buffer.drain(..bytes_sent);
                                },
                                Err(err) => {
                                    if err.kind() == IoErrorKind::WouldBlock {
                                        return CompScheduling::Sleep; // wait until notified
                                    } else {
                                        todo!("handle socket.send error {:?}", err)
                                    }
                                }
                            }
                        }

                        // If here then we're done putting the data, we can
                        // finish the sync round
                        component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
                        return CompScheduling::Requeue;
                    },
                    SyncState::Getting => {
                        // We're going to try and receive a single message. If
                        // this causes us to end up blocking the component
                        // goes to sleep until it is polled.
                        const BUFFER_SIZE: usize = 1024; // TODO: Move to config

                        let socket = self.socket_state.get_socket();
                        self.byte_buffer.resize(BUFFER_SIZE, 0);
                        match socket.receive(&mut self.byte_buffer) {
                            Ok(num_received) => {
                                self.byte_buffer.resize(num_received, 0);
                                let message_content = self.bytes_to_data_message_content(&self.byte_buffer);
                                let send_result = component::default_send_data_message(
                                    &mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource,
                                    message_content, sched_ctx, &mut self.consensus, comp_ctx
                                );
                                if let Err(location_and_message) = send_result {
                                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
                                    return CompScheduling::Immediate;
                                } else {
                                    let scheduling = send_result.unwrap();
                                    self.sync_state = SyncState::AwaitingCmd;
                                    return scheduling;
                                }
                            },
                            Err(err) => {
                                if err.kind() == IoErrorKind::WouldBlock {
                                    return CompScheduling::Sleep; // wait until polled
                                } else {
                                    todo!("handle socket.receive error {:?}", err)
                                }
                            }
                        }
                    },
                    SyncState::FinishSync | SyncState::FinishSyncThenQuit => {
                        component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
                        return CompScheduling::Requeue;
                    },
                }
            },
            CompMode::BlockedGet => {
                // Entered when awaiting a new command
                debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd);
                return CompScheduling::Sleep;
            },
            CompMode::SyncEnd | CompMode::BlockedPut => return CompScheduling::Sleep,
            CompMode::StartExit => return component::default_handle_start_exit(
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
            ),
            CompMode::BusyExit => return component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx),
            CompMode::Exit => return component::default_handle_exit(&self.exec_state),
        }
    }
}

impl ComponentTcpClient {
    pub(crate) fn new(arguments: ValueGroup) -> Self {
        use std::net::{IpAddr, Ipv4Addr};

        debug_assert_eq!(arguments.values.len(), 4);

        // Parsing arguments
        let ip_heap_pos = arguments.values[0].as_array();
        let ip_elements = &arguments.regions[ip_heap_pos as usize];
        if ip_elements.len() != 4 {
            todo!("friendly error reporting: ip must contain 4 octets");
        }
        let ip_address = IpAddr::V4(Ipv4Addr::new(
            ip_elements[0].as_uint8(), ip_elements[1].as_uint8(),
            ip_elements[2].as_uint8(), ip_elements[3].as_uint8()
        ));
        let port = arguments.values[1].as_uint16();
        let input_port = component::port_id_from_eval(arguments.values[2].as_input());
        let output_port = component::port_id_from_eval(arguments.values[3].as_output());

        let socket = SocketTcpClient::new(ip_address, port);
        if let Err(socket) = socket {
            todo!("friendly error reporting: failed to open socket (reason: {:?})", socket);
        }

        return Self{
            socket_state: SocketState::Connected(socket.unwrap()),
            sync_state: SyncState::AwaitingCmd,
            poll_ticket: None,
            inbox_main: vec![None],
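            // Remaining fields start out as defaults; the union tag values (-1) are
            // resolved in `on_creation` once protocol type information is available.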
            inbox_backup: Vec::new(),
            input_union_send_tag_value: -1,
            input_union_receive_tag_value: -1,
            input_union_finish_tag_value: -1,
            input_union_shutdown_tag_value: -1,
            pdl_input_port_id: input_port,
            pdl_output_port_id: output_port,
            exec_state: CompExecState::new(),
            control: ControlLayer::default(),
            consensus: Consensus::new(),
            byte_buffer: Vec::new(),
        }
    }

    // Handles incoming data from the PDL side (hence, going into the socket)
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
        if self.exec_state.mode.is_in_sync_block() {
            self.consensus.handle_incoming_data_message(comp_ctx, &message);
        }

        match component::default_handle_incoming_data_message(
            &mut self.exec_state, &mut self.inbox_main, comp_ctx, message, sched_ctx, &mut self.control
        ) {
            IncomingData::PlacedInSlot => {},
            IncomingData::SlotFull(message) => {
                self.inbox_backup.push(message);
            }
        }
    }

    fn data_message_to_bytes(&self, message: DataMessage, bytes: &mut Vec<u8>) {
        debug_assert_eq!(message.data_header.target_port, self.pdl_input_port_id);
        debug_assert_eq!(message.content.values.len(), 1);

        if let Value::Array(array_pos) = message.content.values[0] {
            let region = &message.content.regions[array_pos as usize];
            bytes.reserve(region.len());
            for value in region {
                bytes.push(value.as_uint8());
            }
        } else {
            unreachable!();
        }
    }

    fn bytes_to_data_message_content(&self, buffer: &[u8]) -> ValueGroup {
        // Turn bytes into silly executor-style array
        let mut values = Vec::with_capacity(buffer.len());
        for byte in buffer.iter().copied() {
            values.push(Value::UInt8(byte));
        }

        // Put in a value group
        let mut value_group = ValueGroup::default();
        value_group.regions.push(values);
        value_group.values.push(Value::Array(0));
        return value_group;
    }
}
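// Note on the expected command protocol (derived from the tag lookups in
// `on_creation`): the PDL side drives this component by sending a
// `std.internet` `Cmd` union over `pdl_input_port_id`. The variants handled
// here are `Send` (carries an array of bytes that is written to the socket),
// `Receive` (requests a single socket read, whose bytes are sent back over
// `pdl_output_port_id`), `Finish` (ends the current sync round), and
// `Shutdown` (ends the round and then terminates the component). The exact
// PDL declaration lives in the `std.internet` module and is not reproduced
// here.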