use crate::protocol::eval::{ValueGroup, Value, EvalError}; use crate::runtime2::*; use crate::runtime2::component::CompCtx; use crate::runtime2::stdlib::internet::*; use super::component::{self, *}; use super::control_layer::*; use super::consensus::*; use std::io::ErrorKind as IoErrorKind; enum SocketState { Connected(SocketTcpClient), Error, } impl SocketState { fn get_socket(&self) -> &SocketTcpClient { match self { SocketState::Connected(v) => v, SocketState::Error => unreachable!(), } } } /// States from the point of view of the component that is connecting to this /// TCP component (i.e. from the point of view of attempting to interface with /// a socket). enum SyncState { AwaitingCmd, Getting, Putting } pub struct ComponentTcpClient { // Properties for the tcp socket socket_state: SocketState, sync_state: SyncState, pending_recv: Vec, // on the input port pdl_input_port_id: PortId, // input from PDL, so transmitted over socket pdl_output_port_id: PortId, // output towards PDL, so received over socket input_union_send_tag_value: i64, input_union_receive_tag_value: i64, input_union_finish_tag_value: i64, // Generic component state exec_state: CompExecState, control: ControlLayer, consensus: Consensus, // Temporary variables byte_buffer: Vec, } impl Component for ComponentTcpClient { fn on_creation(&mut self, sched_ctx: &SchedulerCtx) { let pd = &sched_ctx.runtime.protocol; let cmd_type = pd.find_type(b"std.internet", b"Cmd") .expect("'Cmd' type in the 'std.internet' module") .as_union(); self.input_union_send_tag_value = cmd_type.get_variant_tag_value(b"Send").unwrap(); self.input_union_receive_tag_value = cmd_type.get_variant_tag_value(b"Receive").unwrap(); self.input_union_finish_tag_value = cmd_type.get_variant_tag_value(b"Finish").unwrap(); } fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) { self.handle_incoming_data_message(message); } fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { match mesage { Message::Data(message) => { self.handle_incoming_data_message(message); }, Message::Sync(message) => { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); }, Message::Control(message) => { component::default_handle_control_message( &mut self.exec_state, &mut self.control, &mut self.consensus, message, sched_ctx, comp_ctx ); } } } fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}", self.exec_state.mode)); match self.exec_state.mode { CompMode::BlockedSelect => { // Not possible: we never enter this state unreachable!(); }, CompMode::NonSync => { // When in non-sync mode match &mut self.socket_state { SocketState::Connected(_socket) => { // Always move into the sync-state self.sync_state = SyncState::AwaitingCmd; self.consensus.notify_sync_start(comp_ctx); self.exec_state.mode = CompMode::Sync; }, SocketState::Error => { // Could potentially send an error message to the // connected component. self.exec_state.mode = CompMode::StartExit; return Ok(CompScheduling::Immediate); } } }, CompMode::Sync => { // When in sync mode: wait for a command to come in match self.sync_state { SyncState::AwaitingCmd => { if let Some(message) = self.pending_recv.pop() { if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, &message) { // Check which command we're supposed to execute. let (tag_value, embedded_heap_pos) = message.content.values[0].as_union(); if tag_value == self.input_union_send_tag_value { // Retrieve bytes from the message self.byte_buffer.clear(); let union_content = &message.content.regions[embedded_heap_pos as usize]; debug_assert_eq!(union_content.len(), 1); let array_heap_pos = union_content[0].as_array(); let array_values = &message.content.regions[array_heap_pos as usize]; self.byte_buffer.reserve(array_values.len()); for value in array_values { self.byte_buffer.push(value.as_uint8()); } self.sync_state = SyncState::Putting; return Ok(CompScheduling::Immediate); } else if tag_value == self.input_union_receive_tag_value { // Component requires a `recv` self.sync_state = SyncState::Getting; return Ok(CompScheduling::Immediate); } else if tag_value == self.input_union_finish_tag_value { // Component requires us to end the sync round let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); } } else { todo!("handle sync failure due to message deadlock"); return Ok(CompScheduling::Sleep); } } else { self.exec_state.set_as_blocked_get(self.pdl_input_port_id); return Ok(CompScheduling::Sleep); } }, SyncState::Putting => { // We're supposed to send a user-supplied message fully // over the socket. But we might end up blocking. In // that case the component goes to sleep until it is // polled. let socket = self.socket_state.get_socket(); while !self.byte_buffer.is_empty() { match socket.send(&self.byte_buffer) { Ok(bytes_sent) => { self.byte_buffer.drain(..bytes_sent); }, Err(err) => { if err.kind() == IoErrorKind::WouldBlock { return Ok(CompScheduling::Sleep); // wait until notified } else { todo!("handle socket.send error {:?}", err) } } } } // If here then we're done putting the data, we can // finish the sync round let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); }, SyncState::Getting => { // We're going to try and receive a single message. If // this causes us to end up blocking the component // goes to sleep until it is polled. const BUFFER_SIZE: usize = 1024; // TODO: Move to config let socket = self.socket_state.get_socket(); debug_assert!(self.byte_buffer.is_empty()); self.byte_buffer.resize(BUFFER_SIZE, 0); match socket.receive(&mut self.byte_buffer) { Ok(num_received) => { self.byte_buffer.resize(num_received); let message_content = self.bytes_to_data_message_content(&self.byte_buffer); let port_handle = comp_ctx.get_port_handle(self.pdl_output_port_id); let port_info = comp_ctx.get_port(port_handle); let message = self.consensus.annotate_data_message(comp_ctx, port_info, message_content); }, Err(err) => { if err.kind() == IoErrorKind::WouldBlock { return Ok(CompScheduling::Sleep); // wait until polled } else { todo!("handle socket.receive error {:?}", err); } } } }, } }, CompMode::BlockedGet => { // Entered when awaiting a new command debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd); if self. }, CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep), CompMode::StartExit => return Ok(component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx)), CompMode::BusyExit => return Ok(component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx)), CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)), } return Ok(CompScheduling::Immediate); } } impl ComponentTcpClient { pub(crate) fn new(arguments: ValueGroup) -> Self { use std::net::{IpAddr, Ipv4Addr}; debug_assert_eq!(arguments.values.len(), 4); // Parsing arguments let ip_heap_pos = arguments.values[0].as_array(); let ip_elements = &arguments.regions[ip_heap_pos as usize]; if ip_elements.len() != 4 { todo!("friendly error reporting: ip contains 4 octects"); } let ip_address = IpAddr::V4(Ipv4Addr::new( ip_elements[0].as_uint8(), ip_elements[1].as_uint8(), ip_elements[2].as_uint8(), ip_elements[3].as_uint8() )); let port = arguments.values[1].as_uint16(); let input_port = component::port_id_from_eval(arguments.values[2].as_input()); let output_port = component::port_id_from_eval(arguments.values[3].as_output()); let socket = SocketTcpClient::new(ip_address, port); if let Err(socket) = socket { todo!("friendly error reporting: failed to open socket {:?}", socket); } return Self{ socket_state: SocketState::Connected(socket.unwrap()), input_union_send_tag_value: -1, input_union_receive_tag_value: -1, input_union_finish_tag_value: -1, pdl_input_port_id: input_port, pdl_output_port_id: output_port, exec_state: CompExecState::new(), control: ControlLayer::default(), consensus: Consensus::new(), } } // Handles incoming data from the PDL side (hence, going into the socket) fn handle_incoming_data_message(&mut self, message: DataMessage) { // Input message is an array of bytes (u8) self.pending_recv.push(message); } fn data_message_to_bytes(&self, message: DataMessage, bytes: &mut Vec) { debug_assert_eq!(message.data_header.target_port, self.pdl_input_port_id); debug_assert_eq!(message.content.values.len(), 1); if let Value::Array(array_pos) = message.content.values[0] { let region = &message.content.regions[array_pos as usize]; bytes.reserve(region.len()); for value in region { bytes.push(value.as_uint8()); } } else { unreachable!(); } } fn bytes_to_data_message_content(&self, buffer: &[u8]) -> ValueGroup { // Turn bytes into silly executor-style array let mut values = Vec::with_capacity(buffer.len()); for byte in buffer.iter().copied() { values.push(Value::UInt8(byte)); } // Put in a value group let mut value_group = ValueGroup::default(); value_group.regions.push(values); value_group.values.push(Value::Array(0)); return value_group; } }