diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs new file mode 100644 index 0000000000000000000000000000000000000000..89642f966501369bc59216c2aea94d3a4f2a8d7a --- /dev/null +++ b/src/runtime2/component/component_internet.rs @@ -0,0 +1,151 @@ +use crate::protocol::eval::{ValueGroup, Value, EvalError}; +use crate::runtime2::*; +use crate::runtime2::component::CompCtx; +use crate::runtime2::stdlib::internet::*; + +use super::component::{self, *}; +use super::control_layer::*; +use super::consensus::*; + +enum SocketState { + Connected(SocketTcpClient), + Error, +} + +enum SyncState { + Getting, + Putting +} + +pub struct ComponentTcpClient { + // Properties for the tcp socket + socket_state: SocketState, + pending_recv: Vec, // on the input port + pdl_input_port_id: PortId, // input from PDL, so transmitted over socket + pdl_output_port_id: PortId, // output towards PDL, so received over socket + // Generic component state + exec_state: CompExecState, + control: ControlLayer, + consensus: Consensus, +} + +impl Component for ComponentTcpClient { + fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) { + self.handle_incoming_data_message(message); + } + + fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { + match mesage { + Message::Data(message) => { + self.handle_incoming_data_message(message); + }, + Message::Sync(message) => { + let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); + component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + }, + Message::Control(message) => { + component::default_handle_control_message( + &mut self.exec_state, &mut self.control, &mut self.consensus, + message, sched_ctx, comp_ctx + ); + } + } + } + + fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { + sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}", self.exec_state.mode)); + + match self.exec_state.mode { + CompMode::BlockedGet | CompMode::BlockedSelect => { + // impossible for this component. We always accept the input + // values, and we never perform an explicit select. + unreachable!(); + }, + CompMode::NonSync => { + // When in non-sync mode + match &mut self.socket_state { + SocketState::Connected(socket) => { + if self.pending_tx + }, + SocketState::Error => { + self.exec_state.mode = CompMode::StartExit; + return Ok(CompScheduling::Immediate); + } + } + }, + CompMode::Sync => { + + }, + CompMode::SyncEnd | CompMode::BlockedPut => + return Ok(CompScheduling::Sleep), + CompMode::StartExit => + return Ok(component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx)), + CompMode::BusyExit => + return Ok(component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx)), + CompMode::Exit => + return Ok(component::default_handle_exit(&self.exec_state)), + } + + return Ok(CompScheduling::Immediate); + } +} + +impl ComponentTcpClient { + pub(crate) fn new(arguments: ValueGroup) -> Self { + use std::net::{IpAddr, Ipv4Addr}; + + debug_assert_eq!(arguments.values.len(), 4); + + // Parsing arguments + let ip_heap_pos = arguments.values[0].as_array(); + let ip_elements = &arguments.regions[ip_heap_pos as usize]; + if ip_elements.len() != 4 { + todo!("friendly error reporting: ip contains 4 octects"); + } + let ip_address = IpAddr::V4(Ipv4Addr::new( + ip_elements[0].as_uint8(), ip_elements[1].as_uint8(), + ip_elements[2].as_uint8(), ip_elements[3].as_uint8() + )); + + let port = arguments.values[1].as_uint16(); + let input_port = component::port_id_from_eval(arguments.values[2].as_input()); + let output_port = component::port_id_from_eval(arguments.values[3].as_output()); + + let socket = SocketTcpClient::new(ip_address, port); + if let Err(socket) = socket { + todo!("friendly error reporting: failed to open socket {:?}", socket); + } + + return Self{ + socket_state: SocketState::Connected(socket.unwrap()), + pending_tx: Vec::new(), + pdl_input_port_id: input_port, + pdl_output_port_id: output_port, + exec_state: CompExecState::new(), + control: ControlLayer::default(), + consensus: Consensus::new(), + } + } + + // Handles incoming data from the PDL side (hence, going into the socket) + fn handle_incoming_data_message(&mut self, message: DataMessage) { + // Input message is an array of bytes (u8) + self.pending_recv.push(message); + + } + + fn data_message_to_bytes(&self, message: DataMessage, bytes: &mut Vec) { + debug_assert_eq!(message.data_header.target_port, self.pdl_input_port_id); + debug_assert_eq!(message.content.values.len(), 1); + + if let Value::Array(array_pos) = message.content.values[0] { + let region = &message.content.regions[array_pos as usize]; + bytes.reserve(region.len()); + for value in region { + bytes.push(value.as_uint8()); + } + } else { + unreachable!(); + } + } +} \ No newline at end of file