diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index cd80e3b817f8818688e722288db1721673a55d6c..9a9ea3d99d1f8737734c2680d40beb4bbb6fd797 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -41,8 +41,8 @@ pub struct ComponentTcpClient { socket_state: SocketState, sync_state: SyncState, poll_ticket: Option, - inbox_main: Option, - inbox_backup: Vec, + inbox_main: InboxMain, + inbox_backup: InboxBackup, pdl_input_port_id: PortId, // input from PDL, so transmitted over socket pdl_output_port_id: PortId, // output towards PDL, so received over socket input_union_send_tag_value: i64, @@ -90,8 +90,9 @@ impl Component for ComponentTcpClient { } fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) { - if self.inbox_main.is_none() { - self.inbox_main = Some(message); + let slot = &mut self.inbox_main[0]; + if slot.is_none() { + *slot = Some(message); } else { self.inbox_backup.push(message); } @@ -138,8 +139,9 @@ impl Component for ComponentTcpClient { } else { // Reset for a new request self.sync_state = SyncState::AwaitingCmd; - self.consensus.notify_sync_start(comp_ctx); - self.exec_state.mode = CompMode::Sync; + component::default_handle_sync_start( + &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus + ); } return CompScheduling::Immediate; }, @@ -155,62 +157,48 @@ impl Component for ComponentTcpClient { // When in sync mode: wait for a command to come in match self.sync_state { SyncState::AwaitingCmd => { - if let Some(message) = &self.inbox_main { - self.consensus.handle_incoming_data_message(comp_ctx, &message); - if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, &message) { - // Check which command we're supposed to execute. - let message = self.inbox_main.take().unwrap(); - let target_port_id = message.data_header.target_port; - let receive_result = component::default_handle_received_data_message( - target_port_id, PortInstruction::NoSource, - &mut self.inbox_main, &mut self.inbox_backup, - comp_ctx, sched_ctx, &mut self.control - ); - - if let Err(location_and_message) = receive_result { - component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); - return CompScheduling::Immediate; - } else { - let (tag_value, embedded_heap_pos) = message.content.values[0].as_union(); - if tag_value == self.input_union_send_tag_value { - // Retrieve bytes from the message - self.byte_buffer.clear(); - let union_content = &message.content.regions[embedded_heap_pos as usize]; - debug_assert_eq!(union_content.len(), 1); - let array_heap_pos = union_content[0].as_array(); - let array_values = &message.content.regions[array_heap_pos as usize]; - self.byte_buffer.reserve(array_values.len()); - for value in array_values { - self.byte_buffer.push(value.as_uint8()); - } - - self.sync_state = SyncState::Putting; - return CompScheduling::Immediate; - } else if tag_value == self.input_union_receive_tag_value { - // Component requires a `recv` - self.sync_state = SyncState::Getting; - return CompScheduling::Immediate; - } else if tag_value == self.input_union_finish_tag_value { - // Component requires us to end the sync round - self.sync_state = SyncState::FinishSync; - return CompScheduling::Immediate; - } else if tag_value == self.input_union_shutdown_tag_value { - // Component wants to close the connection - self.sync_state = SyncState::FinishSyncThenQuit; - return CompScheduling::Immediate; - } else { - unreachable!("got tag_value {}", tag_value) + match component::default_attempt_get( + &mut self.exec_state, self.pdl_input_port_id, PortInstruction::NoSource, + &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx, + &mut self.control, &mut self.consensus + ) { + GetResult::Received(message) => { + let (tag_value, embedded_heap_pos) = message.content.values[0].as_union(); + if tag_value == self.input_union_send_tag_value { + // Retrieve bytes from the message + self.byte_buffer.clear(); + let union_content = &message.content.regions[embedded_heap_pos as usize]; + debug_assert_eq!(union_content.len(), 1); + let array_heap_pos = union_content[0].as_array(); + let array_values = &message.content.regions[array_heap_pos as usize]; + self.byte_buffer.reserve(array_values.len()); + for value in array_values { + self.byte_buffer.push(value.as_uint8()); } + + self.sync_state = SyncState::Putting; + } else if tag_value == self.input_union_receive_tag_value { + // Component requires a `recv` + self.sync_state = SyncState::Getting; + } else if tag_value == self.input_union_finish_tag_value { + // Component requires us to end the sync round + self.sync_state = SyncState::FinishSync; + } else if tag_value == self.input_union_shutdown_tag_value { + // Component wants to close the connection + self.sync_state = SyncState::FinishSyncThenQuit; + } else { + unreachable!("got tag_value {}", tag_value) } - } else { - todo!("handle sync failure due to message deadlock"); + + return CompScheduling::Immediate; + }, + GetResult::NoMessage => { return CompScheduling::Sleep; + }, + GetResult::Error(location_and_message) => { + component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); + return CompScheduling::Immediate; } - } else { - let port_handle = comp_ctx.get_port_handle(self.pdl_input_port_id); - comp_ctx.get_port_mut(port_handle).last_instruction = PortInstruction::NoSource; - self.exec_state.set_as_blocked_get(self.pdl_input_port_id); - return CompScheduling::Sleep; } }, SyncState::Putting => { @@ -236,10 +224,8 @@ impl Component for ComponentTcpClient { // If here then we're done putting the data, we can // finish the sync round - let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx); - self.exec_state.mode = CompMode::SyncEnd; - component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); - return CompScheduling::Immediate; + component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); + return CompScheduling::Requeue; }, SyncState::Getting => { // We're going to try and receive a single message. If @@ -273,9 +259,7 @@ impl Component for ComponentTcpClient { } }, SyncState::FinishSync | SyncState::FinishSyncThenQuit => { - let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx); - self.exec_state.mode = CompMode::SyncEnd; - component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); + component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); return CompScheduling::Requeue; }, } @@ -327,7 +311,7 @@ impl ComponentTcpClient { socket_state: SocketState::Connected(socket.unwrap()), sync_state: SyncState::AwaitingCmd, poll_ticket: None, - inbox_main: None, + inbox_main: vec![None], inbox_backup: Vec::new(), input_union_send_tag_value: -1, input_union_receive_tag_value: -1,