diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index 710de32baa0fc6e0f1c11e39b7b6bf67bf87b200..3b9c9ca77e42b135a396a2965dbfcba32f8b81cb 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -33,6 +33,7 @@ enum SyncState { Getting, Putting, FinishSync, + FinishSyncThenQuit, } pub struct ComponentTcpClient { @@ -81,6 +82,13 @@ impl Component for ComponentTcpClient { } } + fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx) { + if let Some(poll_ticket) = self.poll_ticket.take() { + sched_ctx.polling.unregister(poll_ticket) + .expect("unregistering tcp component"); + } + } + fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) { if self.inbox_main.is_none() { self.inbox_main = Some(message); @@ -111,7 +119,7 @@ impl Component for ComponentTcpClient { } fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { - sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}", self.exec_state.mode)); + sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state)); match self.exec_state.mode { CompMode::BlockedSelect => { @@ -122,10 +130,16 @@ impl Component for ComponentTcpClient { // When in non-sync mode match &mut self.socket_state { SocketState::Connected(_socket) => { - // Always move into the sync-state - self.sync_state = SyncState::AwaitingCmd; - self.consensus.notify_sync_start(comp_ctx); - self.exec_state.mode = CompMode::Sync; + if self.sync_state == SyncState::FinishSyncThenQuit { + // Previous request was to let the component shut down + self.exec_state.mode = CompMode::StartExit; + } else { + // Reset for a new request + self.sync_state = SyncState::AwaitingCmd; + self.consensus.notify_sync_start(comp_ctx); + self.exec_state.mode = CompMode::Sync; + } + return Ok(CompScheduling::Immediate); }, SocketState::Error => { // Could potentially send an error message to the @@ -139,9 +153,17 @@ impl Component for ComponentTcpClient { // When in sync mode: wait for a command to come in match self.sync_state { SyncState::AwaitingCmd => { - if let Some(message) = self.inbox_backup.pop() { + if let Some(message) = &self.inbox_main { + self.consensus.handle_incoming_data_message(comp_ctx, &message); if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, &message) { // Check which command we're supposed to execute. + let message = self.inbox_main.take().unwrap(); + let target_port_id = message.data_header.target_port; + component::default_handle_received_data_message( + target_port_id, &mut self.inbox_main, &mut self.inbox_backup, + comp_ctx, sched_ctx, &mut self.control + ); + let (tag_value, embedded_heap_pos) = message.content.values[0].as_union(); if tag_value == self.input_union_send_tag_value { // Retrieve bytes from the message @@ -163,11 +185,14 @@ impl Component for ComponentTcpClient { return Ok(CompScheduling::Immediate); } else if tag_value == self.input_union_finish_tag_value { // Component requires us to end the sync round - let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); - component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + self.sync_state = SyncState::FinishSync; + return Ok(CompScheduling::Immediate); } else if tag_value == self.input_union_shutdown_tag_value { // Component wants to close the connection - todo!("implement clean shutdown, don't forget to unregister to poll ticket"); + self.sync_state = SyncState::FinishSyncThenQuit; + return Ok(CompScheduling::Immediate); + } else { + unreachable!("got tag_value {}", tag_value) } } else { todo!("handle sync failure due to message deadlock"); @@ -202,7 +227,9 @@ impl Component for ComponentTcpClient { // If here then we're done putting the data, we can // finish the sync round let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); + self.exec_state.mode = CompMode::SyncEnd; component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + return Ok(CompScheduling::Immediate); }, SyncState::Getting => { // We're going to try and receive a single message. If @@ -211,31 +238,30 @@ impl Component for ComponentTcpClient { const BUFFER_SIZE: usize = 1024; // TODO: Move to config let socket = self.socket_state.get_socket(); - debug_assert!(self.byte_buffer.is_empty()); self.byte_buffer.resize(BUFFER_SIZE, 0); match socket.receive(&mut self.byte_buffer) { Ok(num_received) => { self.byte_buffer.resize(num_received, 0); let message_content = self.bytes_to_data_message_content(&self.byte_buffer); let scheduling = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, message_content, sched_ctx, &mut self.consensus, comp_ctx); - self.sync_state = SyncState::FinishSync; + self.sync_state = SyncState::AwaitingCmd; return Ok(scheduling); }, Err(err) => { if err.kind() == IoErrorKind::WouldBlock { return Ok(CompScheduling::Sleep); // wait until polled } else { - todo!("handle socket.receive error {:?}", err); + todo!("handle socket.receive error {:?}", err) } } } }, - SyncState::FinishSync => { + SyncState::FinishSync | SyncState::FinishSyncThenQuit => { let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); self.exec_state.mode = CompMode::SyncEnd; component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); return Ok(CompScheduling::Requeue); - } + }, } }, CompMode::BlockedGet => { @@ -252,8 +278,6 @@ impl Component for ComponentTcpClient { CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)), } - - return Ok(CompScheduling::Immediate); } } @@ -280,7 +304,7 @@ impl ComponentTcpClient { let socket = SocketTcpClient::new(ip_address, port); if let Err(socket) = socket { - todo!("friendly error reporting: failed to open socket {:?}", socket); + todo!("friendly error reporting: failed to open socket (reason: {:?})", socket); } return Self{