diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index 3b9c9ca77e42b135a396a2965dbfcba32f8b81cb..c6fa23e95c3e2a1622231c5f38d284391ae3dfc7 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -1,6 +1,6 @@ -use crate::protocol::eval::{ValueGroup, Value, EvalError}; +use crate::protocol::eval::{ValueGroup, Value}; use crate::runtime2::*; -use crate::runtime2::component::{CompCtx, CompId}; +use crate::runtime2::component::{CompCtx, CompId, PortInstruction}; use crate::runtime2::stdlib::internet::*; use crate::runtime2::poll::*; @@ -41,8 +41,8 @@ pub struct ComponentTcpClient { socket_state: SocketState, sync_state: SyncState, poll_ticket: Option, - inbox_main: Option, - inbox_backup: Vec, + inbox_main: InboxMain, + inbox_backup: InboxBackup, pdl_input_port_id: PortId, // input from PDL, so transmitted over socket pdl_output_port_id: PortId, // output towards PDL, so received over socket input_union_send_tag_value: i64, @@ -90,8 +90,9 @@ impl Component for ComponentTcpClient { } fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) { - if self.inbox_main.is_none() { - self.inbox_main = Some(message); + let slot = &mut self.inbox_main[0]; + if slot.is_none() { + *slot = Some(message); } else { self.inbox_backup.push(message); } @@ -104,22 +105,24 @@ impl Component for ComponentTcpClient { }, Message::Sync(message) => { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); - component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); }, Message::Control(message) => { - component::default_handle_control_message( + if let Err(location_and_message) = component::default_handle_control_message( &mut self.exec_state, &mut self.control, &mut self.consensus, message, sched_ctx, comp_ctx - ); + ) { + component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); + } }, Message::Poll => { - sched_ctx.log("Received polling event"); + sched_ctx.info("Received polling event"); }, } } - fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { - sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state)); + fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling { + sched_ctx.info(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state)); match self.exec_state.mode { CompMode::BlockedSelect => { @@ -132,20 +135,21 @@ impl Component for ComponentTcpClient { SocketState::Connected(_socket) => { if self.sync_state == SyncState::FinishSyncThenQuit { // Previous request was to let the component shut down - self.exec_state.mode = CompMode::StartExit; + self.exec_state.set_as_start_exit(ExitReason::Termination); } else { // Reset for a new request self.sync_state = SyncState::AwaitingCmd; - self.consensus.notify_sync_start(comp_ctx); - self.exec_state.mode = CompMode::Sync; + component::default_handle_sync_start( + &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus + ); } - return Ok(CompScheduling::Immediate); + return CompScheduling::Immediate; }, SocketState::Error => { // Could potentially send an error message to the // connected component. - self.exec_state.mode = CompMode::StartExit; - return Ok(CompScheduling::Immediate); + self.exec_state.set_as_start_exit(ExitReason::ErrorNonSync); + return CompScheduling::Immediate; } } }, @@ -153,17 +157,12 @@ impl Component for ComponentTcpClient { // When in sync mode: wait for a command to come in match self.sync_state { SyncState::AwaitingCmd => { - if let Some(message) = &self.inbox_main { - self.consensus.handle_incoming_data_message(comp_ctx, &message); - if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, &message) { - // Check which command we're supposed to execute. - let message = self.inbox_main.take().unwrap(); - let target_port_id = message.data_header.target_port; - component::default_handle_received_data_message( - target_port_id, &mut self.inbox_main, &mut self.inbox_backup, - comp_ctx, sched_ctx, &mut self.control - ); - + match component::default_attempt_get( + &mut self.exec_state, self.pdl_input_port_id, PortInstruction::NoSource, + &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx, + &mut self.control, &mut self.consensus + ) { + GetResult::Received(message) => { let (tag_value, embedded_heap_pos) = message.content.values[0].as_union(); if tag_value == self.input_union_send_tag_value { // Retrieve bytes from the message @@ -178,29 +177,28 @@ impl Component for ComponentTcpClient { } self.sync_state = SyncState::Putting; - return Ok(CompScheduling::Immediate); } else if tag_value == self.input_union_receive_tag_value { // Component requires a `recv` self.sync_state = SyncState::Getting; - return Ok(CompScheduling::Immediate); } else if tag_value == self.input_union_finish_tag_value { // Component requires us to end the sync round self.sync_state = SyncState::FinishSync; - return Ok(CompScheduling::Immediate); } else if tag_value == self.input_union_shutdown_tag_value { // Component wants to close the connection self.sync_state = SyncState::FinishSyncThenQuit; - return Ok(CompScheduling::Immediate); } else { unreachable!("got tag_value {}", tag_value) } - } else { - todo!("handle sync failure due to message deadlock"); - return Ok(CompScheduling::Sleep); + + return CompScheduling::Immediate; + }, + GetResult::NoMessage => { + return CompScheduling::Sleep; + }, + GetResult::Error(location_and_message) => { + component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); + return CompScheduling::Immediate; } - } else { - self.exec_state.set_as_blocked_get(self.pdl_input_port_id); - return Ok(CompScheduling::Sleep); } }, SyncState::Putting => { @@ -216,7 +214,7 @@ impl Component for ComponentTcpClient { }, Err(err) => { if err.kind() == IoErrorKind::WouldBlock { - return Ok(CompScheduling::Sleep); // wait until notified + return CompScheduling::Sleep; // wait until notified } else { todo!("handle socket.send error {:?}", err) } @@ -226,10 +224,8 @@ impl Component for ComponentTcpClient { // If here then we're done putting the data, we can // finish the sync round - let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); - self.exec_state.mode = CompMode::SyncEnd; - component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); - return Ok(CompScheduling::Immediate); + component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); + return CompScheduling::Requeue; }, SyncState::Getting => { // We're going to try and receive a single message. If @@ -243,13 +239,19 @@ impl Component for ComponentTcpClient { Ok(num_received) => { self.byte_buffer.resize(num_received, 0); let message_content = self.bytes_to_data_message_content(&self.byte_buffer); - let scheduling = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, message_content, sched_ctx, &mut self.consensus, comp_ctx); - self.sync_state = SyncState::AwaitingCmd; - return Ok(scheduling); + let send_result = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, message_content, sched_ctx, &mut self.consensus, comp_ctx); + if let Err(location_and_message) = send_result { + component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); + return CompScheduling::Immediate; + } else { + let scheduling = send_result.unwrap(); + self.sync_state = SyncState::AwaitingCmd; + return scheduling; + } }, Err(err) => { if err.kind() == IoErrorKind::WouldBlock { - return Ok(CompScheduling::Sleep); // wait until polled + return CompScheduling::Sleep; // wait until polled } else { todo!("handle socket.receive error {:?}", err) } @@ -257,26 +259,24 @@ impl Component for ComponentTcpClient { } }, SyncState::FinishSync | SyncState::FinishSyncThenQuit => { - let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); - self.exec_state.mode = CompMode::SyncEnd; - component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); - return Ok(CompScheduling::Requeue); + component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); + return CompScheduling::Requeue; }, } }, CompMode::BlockedGet => { // Entered when awaiting a new command debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd); - return Ok(CompScheduling::Sleep); + return CompScheduling::Sleep; }, CompMode::SyncEnd | CompMode::BlockedPut => - return Ok(CompScheduling::Sleep), + return CompScheduling::Sleep, CompMode::StartExit => - return Ok(component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx)), + return component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus), CompMode::BusyExit => - return Ok(component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx)), + return component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx), CompMode::Exit => - return Ok(component::default_handle_exit(&self.exec_state)), + return component::default_handle_exit(&self.exec_state), } } } @@ -311,7 +311,7 @@ impl ComponentTcpClient { socket_state: SocketState::Connected(socket.unwrap()), sync_state: SyncState::AwaitingCmd, poll_ticket: None, - inbox_main: None, + inbox_main: vec![None], inbox_backup: Vec::new(), input_union_send_tag_value: -1, input_union_receive_tag_value: -1,