diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs
new file mode 100644
index 0000000000000000000000000000000000000000..3b9c9ca77e42b135a396a2965dbfcba32f8b81cb
--- /dev/null
+++ b/src/runtime2/component/component_internet.rs
@@ -0,0 +1,374 @@
+use crate::protocol::eval::{ValueGroup, Value, EvalError};
+use crate::runtime2::*;
+use crate::runtime2::component::{CompCtx, CompId};
+use crate::runtime2::stdlib::internet::*;
+use crate::runtime2::poll::*;
+
+use super::component::{self, *};
+use super::control_layer::*;
+use super::consensus::*;
+
+use std::io::ErrorKind as IoErrorKind;
+
+enum SocketState {
+    Connected(SocketTcpClient),
+    Error,
+}
+
+impl SocketState {
+    fn get_socket(&self) -> &SocketTcpClient {
+        match self {
+            SocketState::Connected(v) => v,
+            SocketState::Error => unreachable!(),
+        }
+    }
+}
+
+/// States from the point of view of the component that is connecting to this
+/// TCP component (i.e. from the point of view of attempting to interface with
+/// a socket).
+#[derive(PartialEq, Debug)]
+enum SyncState {
+    AwaitingCmd,
+    Getting,
+    Putting,
+    FinishSync,
+    FinishSyncThenQuit,
+}
+
+pub struct ComponentTcpClient {
+    // Properties for the tcp socket
+    socket_state: SocketState,
+    sync_state: SyncState,
+    poll_ticket: Option<PollTicket>,
+    inbox_main: Option<DataMessage>,
+    inbox_backup: Vec<DataMessage>,
+    pdl_input_port_id: PortId, // input from PDL, so transmitted over socket
+    pdl_output_port_id: PortId, // output towards PDL, so received over socket
+    input_union_send_tag_value: i64,
+    input_union_receive_tag_value: i64,
+    input_union_finish_tag_value: i64,
+    input_union_shutdown_tag_value: i64,
+    // Generic component state
+    exec_state: CompExecState,
+    control: ControlLayer,
+    consensus: Consensus,
+    // Temporary variables
+    byte_buffer: Vec<u8>,
+}
+
+impl Component for ComponentTcpClient {
+    fn on_creation(&mut self, id: CompId, sched_ctx: &SchedulerCtx) {
+        // Retrieve type information for messages we're going to receive
+        let pd = &sched_ctx.runtime.protocol;
+        let cmd_type = pd.find_type(b"std.internet", b"Cmd")
+            .expect("'Cmd' type in the 'std.internet' module");
+        let cmd_type = cmd_type.as_union();
+
+        self.input_union_send_tag_value = cmd_type.get_variant_tag_value(b"Send").unwrap();
+        self.input_union_receive_tag_value = cmd_type.get_variant_tag_value(b"Receive").unwrap();
+        self.input_union_finish_tag_value = cmd_type.get_variant_tag_value(b"Finish").unwrap();
+        self.input_union_shutdown_tag_value = cmd_type.get_variant_tag_value(b"Shutdown").unwrap();
+
+        // Register socket for async events
+        if let SocketState::Connected(socket) = &self.socket_state {
+            let self_handle = sched_ctx.runtime.get_component_public(id);
+            let poll_ticket = sched_ctx.polling.register(socket, self_handle, true, true)
+                .expect("registering tcp component");
+
+            debug_assert!(self.poll_ticket.is_none());
+            self.poll_ticket = Some(poll_ticket);
+        }
+    }
+
+    fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx) {
+        if let Some(poll_ticket) = self.poll_ticket.take() {
+            sched_ctx.polling.unregister(poll_ticket)
+                .expect("unregistering tcp component");
+        }
+    }
+
+    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) {
+        if self.inbox_main.is_none() {
+            self.inbox_main = Some(message);
+        } else {
+            self.inbox_backup.push(message);
+        }
+    }
+
+    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
+        match message {
+            Message::Data(message) => {
+                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
+            },
+            Message::Sync(message) => {
+                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
+                component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
+            },
+            Message::Control(message) => {
+                component::default_handle_control_message(
+                    &mut self.exec_state, &mut self.control, &mut self.consensus,
+                    message, sched_ctx, comp_ctx
+                );
+            },
+            Message::Poll => {
+                sched_ctx.log("Received polling event");
+            },
+        }
+    }
+
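+    // Summary of a single sync round, as implemented by `run` below: in
+    // NonSync mode the component either starts a new round (resetting
+    // `sync_state` to AwaitingCmd) or, if the previous round ended in
+    // FinishSyncThenQuit, starts exiting. In AwaitingCmd a `Cmd` union
+    // received on the PDL input port selects the next state: Send -> Putting
+    // (write the supplied bytes to the socket), Receive -> Getting (read from
+    // the socket and forward the bytes to the PDL output port), Finish ->
+    // FinishSync, and Shutdown -> FinishSyncThenQuit. Putting, FinishSync and
+    // FinishSyncThenQuit end the round through the consensus layer; Getting
+    // returns to AwaitingCmd after forwarding a single message.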
+    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
+        sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state));
+
+        match self.exec_state.mode {
+            CompMode::BlockedSelect => {
+                // Not possible: we never enter this state
+                unreachable!();
+            },
+            CompMode::NonSync => {
+                // When in non-sync mode
+                match &mut self.socket_state {
+                    SocketState::Connected(_socket) => {
+                        if self.sync_state == SyncState::FinishSyncThenQuit {
+                            // Previous request was to let the component shut down
+                            self.exec_state.mode = CompMode::StartExit;
+                        } else {
+                            // Reset for a new request
+                            self.sync_state = SyncState::AwaitingCmd;
+                            self.consensus.notify_sync_start(comp_ctx);
+                            self.exec_state.mode = CompMode::Sync;
+                        }
+                        return Ok(CompScheduling::Immediate);
+                    },
+                    SocketState::Error => {
+                        // Could potentially send an error message to the
+                        // connected component.
+                        self.exec_state.mode = CompMode::StartExit;
+                        return Ok(CompScheduling::Immediate);
+                    }
+                }
+            },
+            CompMode::Sync => {
+                // When in sync mode: wait for a command to come in
+                match self.sync_state {
+                    SyncState::AwaitingCmd => {
+                        if let Some(message) = &self.inbox_main {
+                            self.consensus.handle_incoming_data_message(comp_ctx, &message);
+                            if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, &message) {
+                                // Check which command we're supposed to execute.
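+                                // The command arrives as a single `Cmd` union value:
+                                // values[0] holds the (tag, heap position) pair and, for
+                                // the Send variant, the referenced region holds one array
+                                // value whose elements are the bytes to transmit.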
+                                let message = self.inbox_main.take().unwrap();
+                                let target_port_id = message.data_header.target_port;
+                                component::default_handle_received_data_message(
+                                    target_port_id, &mut self.inbox_main, &mut self.inbox_backup,
+                                    comp_ctx, sched_ctx, &mut self.control
+                                );
+
+                                let (tag_value, embedded_heap_pos) = message.content.values[0].as_union();
+                                if tag_value == self.input_union_send_tag_value {
+                                    // Retrieve bytes from the message
+                                    self.byte_buffer.clear();
+                                    let union_content = &message.content.regions[embedded_heap_pos as usize];
+                                    debug_assert_eq!(union_content.len(), 1);
+                                    let array_heap_pos = union_content[0].as_array();
+                                    let array_values = &message.content.regions[array_heap_pos as usize];
+                                    self.byte_buffer.reserve(array_values.len());
+                                    for value in array_values {
+                                        self.byte_buffer.push(value.as_uint8());
+                                    }
+
+                                    self.sync_state = SyncState::Putting;
+                                    return Ok(CompScheduling::Immediate);
+                                } else if tag_value == self.input_union_receive_tag_value {
+                                    // Component requires a `recv`
+                                    self.sync_state = SyncState::Getting;
+                                    return Ok(CompScheduling::Immediate);
+                                } else if tag_value == self.input_union_finish_tag_value {
+                                    // Component requires us to end the sync round
+                                    self.sync_state = SyncState::FinishSync;
+                                    return Ok(CompScheduling::Immediate);
+                                } else if tag_value == self.input_union_shutdown_tag_value {
+                                    // Component wants to close the connection
+                                    self.sync_state = SyncState::FinishSyncThenQuit;
+                                    return Ok(CompScheduling::Immediate);
+                                } else {
+                                    unreachable!("got tag_value {}", tag_value)
+                                }
+                            } else {
+                                todo!("handle sync failure due to message deadlock");
+                                return Ok(CompScheduling::Sleep);
+                            }
+                        } else {
+                            self.exec_state.set_as_blocked_get(self.pdl_input_port_id);
+                            return Ok(CompScheduling::Sleep);
+                        }
+                    },
+                    SyncState::Putting => {
+                        // We're supposed to send a user-supplied message fully
+                        // over the socket. But we might end up blocking. In
+                        // that case the component goes to sleep until it is
+                        // polled.
+                        let socket = self.socket_state.get_socket();
+                        while !self.byte_buffer.is_empty() {
+                            match socket.send(&self.byte_buffer) {
+                                Ok(bytes_sent) => {
+                                    self.byte_buffer.drain(..bytes_sent);
+                                },
+                                Err(err) => {
+                                    if err.kind() == IoErrorKind::WouldBlock {
+                                        return Ok(CompScheduling::Sleep); // wait until notified
+                                    } else {
+                                        todo!("handle socket.send error {:?}", err)
+                                    }
+                                }
+                            }
+                        }
+
+                        // If here then we're done putting the data, we can
+                        // finish the sync round
+                        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
+                        self.exec_state.mode = CompMode::SyncEnd;
+                        component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
+                        return Ok(CompScheduling::Immediate);
+                    },
+                    SyncState::Getting => {
+                        // We're going to try and receive a single message. If
+                        // this causes us to end up blocking, the component
+                        // goes to sleep until it is polled.
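+                        // A single `receive` call reads at most BUFFER_SIZE bytes;
+                        // whatever it returns is forwarded to the PDL output port as
+                        // one message before we go back to awaiting the next command.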
+                        const BUFFER_SIZE: usize = 1024; // TODO: Move to config
+
+                        let socket = self.socket_state.get_socket();
+                        self.byte_buffer.resize(BUFFER_SIZE, 0);
+                        match socket.receive(&mut self.byte_buffer) {
+                            Ok(num_received) => {
+                                self.byte_buffer.resize(num_received, 0);
+                                let message_content = self.bytes_to_data_message_content(&self.byte_buffer);
+                                let scheduling = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, message_content, sched_ctx, &mut self.consensus, comp_ctx);
+                                self.sync_state = SyncState::AwaitingCmd;
+                                return Ok(scheduling);
+                            },
+                            Err(err) => {
+                                if err.kind() == IoErrorKind::WouldBlock {
+                                    return Ok(CompScheduling::Sleep); // wait until polled
+                                } else {
+                                    todo!("handle socket.receive error {:?}", err)
+                                }
+                            }
+                        }
+                    },
+                    SyncState::FinishSync | SyncState::FinishSyncThenQuit => {
+                        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
+                        self.exec_state.mode = CompMode::SyncEnd;
+                        component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
+                        return Ok(CompScheduling::Requeue);
+                    },
+                }
+            },
+            CompMode::BlockedGet => {
+                // Entered when awaiting a new command
+                debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd);
+                return Ok(CompScheduling::Sleep);
+            },
+            CompMode::SyncEnd | CompMode::BlockedPut =>
+                return Ok(CompScheduling::Sleep),
+            CompMode::StartExit =>
+                return Ok(component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx)),
+            CompMode::BusyExit =>
+                return Ok(component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx)),
+            CompMode::Exit =>
+                return Ok(component::default_handle_exit(&self.exec_state)),
+        }
+    }
+}
+
+impl ComponentTcpClient {
+    pub(crate) fn new(arguments: ValueGroup) -> Self {
+        use std::net::{IpAddr, Ipv4Addr};
+
+        debug_assert_eq!(arguments.values.len(), 4);
+
+        // Parsing arguments
+        let ip_heap_pos = arguments.values[0].as_array();
+        let ip_elements = &arguments.regions[ip_heap_pos as usize];
+        if ip_elements.len() != 4 {
+            todo!("friendly error reporting: ip must contain 4 octets");
+        }
+        let ip_address = IpAddr::V4(Ipv4Addr::new(
+            ip_elements[0].as_uint8(), ip_elements[1].as_uint8(),
+            ip_elements[2].as_uint8(), ip_elements[3].as_uint8()
+        ));
+
+        let port = arguments.values[1].as_uint16();
+        let input_port = component::port_id_from_eval(arguments.values[2].as_input());
+        let output_port = component::port_id_from_eval(arguments.values[3].as_output());
+
+        let socket = SocketTcpClient::new(ip_address, port);
+        if let Err(socket) = socket {
+            todo!("friendly error reporting: failed to open socket (reason: {:?})", socket);
+        }
+
+        return Self{
+            socket_state: SocketState::Connected(socket.unwrap()),
+            sync_state: SyncState::AwaitingCmd,
+            poll_ticket: None,
+            inbox_main: None,
+            inbox_backup: Vec::new(),
+            input_union_send_tag_value: -1,
+            input_union_receive_tag_value: -1,
+            input_union_finish_tag_value: -1,
+            input_union_shutdown_tag_value: -1,
+            pdl_input_port_id: input_port,
+            pdl_output_port_id: output_port,
+            exec_state: CompExecState::new(),
+            control: ControlLayer::default(),
+            consensus: Consensus::new(),
+            byte_buffer: Vec::new(),
        }
+    }
+
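+    // The PDL-facing protocol, as used in `run` above: the input port receives
+    // a single `Cmd` union value from the `std.internet` module (variants Send,
+    // Receive, Finish and Shutdown, whose tag values are resolved in
+    // `on_creation`); only the Send variant's payload, an array of u8, is
+    // inspected here. The output port carries plain u8 arrays built by
+    // `bytes_to_data_message_content`.
+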
+    // Handles incoming data from the PDL side (hence, going into the socket)
+    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
+        if self.exec_state.mode.is_in_sync_block() {
+            self.consensus.handle_incoming_data_message(comp_ctx, &message);
+        }
+
+        match component::default_handle_incoming_data_message(
+            &mut self.exec_state, &mut self.inbox_main, comp_ctx, message, sched_ctx, &mut self.control
+        ) {
+            IncomingData::PlacedInSlot => {},
+            IncomingData::SlotFull(message) => {
+                self.inbox_backup.push(message);
+            }
+        }
+    }
+
+    fn data_message_to_bytes(&self, message: DataMessage, bytes: &mut Vec<u8>) {
+        debug_assert_eq!(message.data_header.target_port, self.pdl_input_port_id);
+        debug_assert_eq!(message.content.values.len(), 1);
+
+        if let Value::Array(array_pos) = message.content.values[0] {
+            let region = &message.content.regions[array_pos as usize];
+            bytes.reserve(region.len());
+            for value in region {
+                bytes.push(value.as_uint8());
+            }
+        } else {
+            unreachable!();
+        }
+    }
+
+    fn bytes_to_data_message_content(&self, buffer: &[u8]) -> ValueGroup {
+        // Turn bytes into silly executor-style array
+        let mut values = Vec::with_capacity(buffer.len());
+        for byte in buffer.iter().copied() {
+            values.push(Value::UInt8(byte));
+        }
+
+        // Put in a value group
+        let mut value_group = ValueGroup::default();
+        value_group.regions.push(values);
+        value_group.values.push(Value::Array(0));
+
+        return value_group;
+    }
+}
\ No newline at end of file