use crate::protocol::eval::{ValueGroup, Value, EvalError}; use crate::runtime2::*; use crate::runtime2::component::CompCtx; use crate::runtime2::stdlib::internet::*; use super::component::{self, *}; use super::control_layer::*; use super::consensus::*; enum SocketState { Connected(SocketTcpClient), Error, } enum SyncState { Getting, Putting } pub struct ComponentTcpClient { // Properties for the tcp socket socket_state: SocketState, pending_recv: Vec, // on the input port pdl_input_port_id: PortId, // input from PDL, so transmitted over socket pdl_output_port_id: PortId, // output towards PDL, so received over socket // Generic component state exec_state: CompExecState, control: ControlLayer, consensus: Consensus, } impl Component for ComponentTcpClient { fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) { self.handle_incoming_data_message(message); } fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { match mesage { Message::Data(message) => { self.handle_incoming_data_message(message); }, Message::Sync(message) => { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); }, Message::Control(message) => { component::default_handle_control_message( &mut self.exec_state, &mut self.control, &mut self.consensus, message, sched_ctx, comp_ctx ); } } } fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}", self.exec_state.mode)); match self.exec_state.mode { CompMode::BlockedGet | CompMode::BlockedSelect => { // impossible for this component. We always accept the input // values, and we never perform an explicit select. unreachable!(); }, CompMode::NonSync => { // When in non-sync mode match &mut self.socket_state { SocketState::Connected(socket) => { if self.pending_tx }, SocketState::Error => { self.exec_state.mode = CompMode::StartExit; return Ok(CompScheduling::Immediate); } } }, CompMode::Sync => { }, CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep), CompMode::StartExit => return Ok(component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx)), CompMode::BusyExit => return Ok(component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx)), CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)), } return Ok(CompScheduling::Immediate); } } impl ComponentTcpClient { pub(crate) fn new(arguments: ValueGroup) -> Self { use std::net::{IpAddr, Ipv4Addr}; debug_assert_eq!(arguments.values.len(), 4); // Parsing arguments let ip_heap_pos = arguments.values[0].as_array(); let ip_elements = &arguments.regions[ip_heap_pos as usize]; if ip_elements.len() != 4 { todo!("friendly error reporting: ip contains 4 octects"); } let ip_address = IpAddr::V4(Ipv4Addr::new( ip_elements[0].as_uint8(), ip_elements[1].as_uint8(), ip_elements[2].as_uint8(), ip_elements[3].as_uint8() )); let port = arguments.values[1].as_uint16(); let input_port = component::port_id_from_eval(arguments.values[2].as_input()); let output_port = component::port_id_from_eval(arguments.values[3].as_output()); let socket = SocketTcpClient::new(ip_address, port); if let Err(socket) = socket { todo!("friendly error reporting: failed to open socket {:?}", socket); } return Self{ socket_state: SocketState::Connected(socket.unwrap()), pending_tx: Vec::new(), pdl_input_port_id: input_port, pdl_output_port_id: output_port, exec_state: CompExecState::new(), control: ControlLayer::default(), consensus: Consensus::new(), } } // Handles incoming data from the PDL side (hence, going into the socket) fn handle_incoming_data_message(&mut self, message: DataMessage) { // Input message is an array of bytes (u8) self.pending_recv.push(message); } fn data_message_to_bytes(&self, message: DataMessage, bytes: &mut Vec) { debug_assert_eq!(message.data_header.target_port, self.pdl_input_port_id); debug_assert_eq!(message.content.values.len(), 1); if let Value::Array(array_pos) = message.content.values[0] { let region = &message.content.regions[array_pos as usize]; bytes.reserve(region.len()); for value in region { bytes.push(value.as_uint8()); } } else { unreachable!(); } } }