diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index 89642f966501369bc59216c2aea94d3a4f2a8d7a..64cf8f35d58c40467a2a7d157570f43f64a5f2b4 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -7,12 +7,27 @@ use super::component::{self, *}; use super::control_layer::*; use super::consensus::*; +use std::io::ErrorKind as IoErrorKind; + enum SocketState { Connected(SocketTcpClient), Error, } +impl SocketState { + fn get_socket(&self) -> &SocketTcpClient { + match self { + SocketState::Connected(v) => v, + SocketState::Error => unreachable!(), + } + } +} + +/// States from the point of view of the component that is connecting to this +/// TCP component (i.e. from the point of view of attempting to interface with +/// a socket). enum SyncState { + AwaitingCmd, Getting, Putting } @@ -20,16 +35,33 @@ enum SyncState { pub struct ComponentTcpClient { // Properties for the tcp socket socket_state: SocketState, + sync_state: SyncState, pending_recv: Vec, // on the input port pdl_input_port_id: PortId, // input from PDL, so transmitted over socket pdl_output_port_id: PortId, // output towards PDL, so received over socket + input_union_send_tag_value: i64, + input_union_receive_tag_value: i64, + input_union_finish_tag_value: i64, // Generic component state exec_state: CompExecState, control: ControlLayer, consensus: Consensus, + // Temporary variables + byte_buffer: Vec, } impl Component for ComponentTcpClient { + fn on_creation(&mut self, sched_ctx: &SchedulerCtx) { + let pd = &sched_ctx.runtime.protocol; + let cmd_type = pd.find_type(b"std.internet", b"Cmd") + .expect("'Cmd' type in the 'std.internet' module") + .as_union(); + + self.input_union_send_tag_value = cmd_type.get_variant_tag_value(b"Send").unwrap(); + self.input_union_receive_tag_value = cmd_type.get_variant_tag_value(b"Receive").unwrap(); + self.input_union_finish_tag_value = cmd_type.get_variant_tag_value(b"Finish").unwrap(); + } + fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) { self.handle_incoming_data_message(message); } @@ -56,25 +88,127 @@ impl Component for ComponentTcpClient { sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}", self.exec_state.mode)); match self.exec_state.mode { - CompMode::BlockedGet | CompMode::BlockedSelect => { - // impossible for this component. We always accept the input - // values, and we never perform an explicit select. + CompMode::BlockedSelect => { + // Not possible: we never enter this state unreachable!(); }, CompMode::NonSync => { // When in non-sync mode match &mut self.socket_state { - SocketState::Connected(socket) => { - if self.pending_tx + SocketState::Connected(_socket) => { + // Always move into the sync-state + self.sync_state = SyncState::AwaitingCmd; + self.consensus.notify_sync_start(comp_ctx); + self.exec_state.mode = CompMode::Sync; }, SocketState::Error => { + // Could potentially send an error message to the + // connected component. self.exec_state.mode = CompMode::StartExit; return Ok(CompScheduling::Immediate); } } }, CompMode::Sync => { + // When in sync mode: wait for a command to come in + match self.sync_state { + SyncState::AwaitingCmd => { + if let Some(message) = self.pending_recv.pop() { + if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, &message) { + // Check which command we're supposed to execute. + let (tag_value, embedded_heap_pos) = message.content.values[0].as_union(); + if tag_value == self.input_union_send_tag_value { + // Retrieve bytes from the message + self.byte_buffer.clear(); + let union_content = &message.content.regions[embedded_heap_pos as usize]; + debug_assert_eq!(union_content.len(), 1); + let array_heap_pos = union_content[0].as_array(); + let array_values = &message.content.regions[array_heap_pos as usize]; + self.byte_buffer.reserve(array_values.len()); + for value in array_values { + self.byte_buffer.push(value.as_uint8()); + } + + self.sync_state = SyncState::Putting; + return Ok(CompScheduling::Immediate); + } else if tag_value == self.input_union_receive_tag_value { + // Component requires a `recv` + self.sync_state = SyncState::Getting; + return Ok(CompScheduling::Immediate); + } else if tag_value == self.input_union_finish_tag_value { + // Component requires us to end the sync round + let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); + component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + } + } else { + todo!("handle sync failure due to message deadlock"); + return Ok(CompScheduling::Sleep); + } + } else { + self.exec_state.set_as_blocked_get(self.pdl_input_port_id); + return Ok(CompScheduling::Sleep); + } + }, + SyncState::Putting => { + // We're supposed to send a user-supplied message fully + // over the socket. But we might end up blocking. In + // that case the component goes to sleep until it is + // polled. + let socket = self.socket_state.get_socket(); + while !self.byte_buffer.is_empty() { + match socket.send(&self.byte_buffer) { + Ok(bytes_sent) => { + self.byte_buffer.drain(..bytes_sent); + }, + Err(err) => { + if err.kind() == IoErrorKind::WouldBlock { + return Ok(CompScheduling::Sleep); // wait until notified + } else { + todo!("handle socket.send error {:?}", err) + } + } + } + } + + // If here then we're done putting the data, we can + // finish the sync round + let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); + component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + }, + SyncState::Getting => { + // We're going to try and receive a single message. If + // this causes us to end up blocking the component + // goes to sleep until it is polled. + const BUFFER_SIZE: usize = 1024; // TODO: Move to config + + let socket = self.socket_state.get_socket(); + debug_assert!(self.byte_buffer.is_empty()); + self.byte_buffer.resize(BUFFER_SIZE, 0); + match socket.receive(&mut self.byte_buffer) { + Ok(num_received) => { + self.byte_buffer.resize(num_received); + let message_content = self.bytes_to_data_message_content(&self.byte_buffer); + let port_handle = comp_ctx.get_port_handle(self.pdl_output_port_id); + let port_info = comp_ctx.get_port(port_handle); + let message = self.consensus.annotate_data_message(comp_ctx, port_info, message_content); + + }, + Err(err) => { + if err.kind() == IoErrorKind::WouldBlock { + return Ok(CompScheduling::Sleep); // wait until polled + } else { + todo!("handle socket.receive error {:?}", err); + } + } + } + }, + } + }, + CompMode::BlockedGet => { + // Entered when awaiting a new command + debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd); + if self. }, CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep), @@ -118,7 +252,9 @@ impl ComponentTcpClient { return Self{ socket_state: SocketState::Connected(socket.unwrap()), - pending_tx: Vec::new(), + input_union_send_tag_value: -1, + input_union_receive_tag_value: -1, + input_union_finish_tag_value: -1, pdl_input_port_id: input_port, pdl_output_port_id: output_port, exec_state: CompExecState::new(), @@ -131,7 +267,6 @@ impl ComponentTcpClient { fn handle_incoming_data_message(&mut self, message: DataMessage) { // Input message is an array of bytes (u8) self.pending_recv.push(message); - } fn data_message_to_bytes(&self, message: DataMessage, bytes: &mut Vec) { @@ -148,4 +283,19 @@ impl ComponentTcpClient { unreachable!(); } } + + fn bytes_to_data_message_content(&self, buffer: &[u8]) -> ValueGroup { + // Turn bytes into silly executor-style array + let mut values = Vec::with_capacity(buffer.len()); + for byte in buffer.iter().copied() { + values.push(Value::UInt8(byte)); + } + + // Put in a value group + let mut value_group = ValueGroup::default(); + value_group.regions.push(values); + value_group.values.push(Value::Array(0)); + + return value_group; + } } \ No newline at end of file