diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index 3b9c9ca77e42b135a396a2965dbfcba32f8b81cb..e9e41ac593a5b19ee65ba52fb9d25647a1a9fe4c 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -1,6 +1,6 @@ -use crate::protocol::eval::{ValueGroup, Value, EvalError}; +use crate::protocol::eval::{ValueGroup, Value}; use crate::runtime2::*; -use crate::runtime2::component::{CompCtx, CompId}; +use crate::runtime2::component::{CompCtx, CompId, PortInstruction}; use crate::runtime2::stdlib::internet::*; use crate::runtime2::poll::*; @@ -104,13 +104,15 @@ impl Component for ComponentTcpClient { }, Message::Sync(message) => { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); - component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus); }, Message::Control(message) => { - component::default_handle_control_message( + if let Err(location_and_message) = component::default_handle_control_message( &mut self.exec_state, &mut self.control, &mut self.consensus, message, sched_ctx, comp_ctx - ); + ) { + component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); + } }, Message::Poll => { sched_ctx.log("Received polling event"); @@ -118,7 +120,7 @@ impl Component for ComponentTcpClient { } } - fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { + fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling { sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state)); match self.exec_state.mode { @@ -132,20 +134,20 @@ impl Component for ComponentTcpClient { SocketState::Connected(_socket) => { if self.sync_state == SyncState::FinishSyncThenQuit { // Previous request was to let the component shut down - self.exec_state.mode = CompMode::StartExit; + self.exec_state.set_as_start_exit(ExitReason::Termination); } else { // Reset for a new request self.sync_state = SyncState::AwaitingCmd; self.consensus.notify_sync_start(comp_ctx); self.exec_state.mode = CompMode::Sync; } - return Ok(CompScheduling::Immediate); + return CompScheduling::Immediate; }, SocketState::Error => { // Could potentially send an error message to the // connected component. - self.exec_state.mode = CompMode::StartExit; - return Ok(CompScheduling::Immediate); + self.exec_state.set_as_start_exit(ExitReason::ErrorNonSync); + return CompScheduling::Immediate; } } }, @@ -159,48 +161,54 @@ impl Component for ComponentTcpClient { // Check which command we're supposed to execute. let message = self.inbox_main.take().unwrap(); let target_port_id = message.data_header.target_port; - component::default_handle_received_data_message( - target_port_id, &mut self.inbox_main, &mut self.inbox_backup, + let receive_result = component::default_handle_received_data_message( + target_port_id, PortInstruction::NoSource, + &mut self.inbox_main, &mut self.inbox_backup, comp_ctx, sched_ctx, &mut self.control ); - let (tag_value, embedded_heap_pos) = message.content.values[0].as_union(); - if tag_value == self.input_union_send_tag_value { - // Retrieve bytes from the message - self.byte_buffer.clear(); - let union_content = &message.content.regions[embedded_heap_pos as usize]; - debug_assert_eq!(union_content.len(), 1); - let array_heap_pos = union_content[0].as_array(); - let array_values = &message.content.regions[array_heap_pos as usize]; - self.byte_buffer.reserve(array_values.len()); - for value in array_values { - self.byte_buffer.push(value.as_uint8()); - } - - self.sync_state = SyncState::Putting; - return Ok(CompScheduling::Immediate); - } else if tag_value == self.input_union_receive_tag_value { - // Component requires a `recv` - self.sync_state = SyncState::Getting; - return Ok(CompScheduling::Immediate); - } else if tag_value == self.input_union_finish_tag_value { - // Component requires us to end the sync round - self.sync_state = SyncState::FinishSync; - return Ok(CompScheduling::Immediate); - } else if tag_value == self.input_union_shutdown_tag_value { - // Component wants to close the connection - self.sync_state = SyncState::FinishSyncThenQuit; - return Ok(CompScheduling::Immediate); + if let Err(location_and_message) = receive_result { + component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); + return CompScheduling::Immediate; } else { - unreachable!("got tag_value {}", tag_value) + let (tag_value, embedded_heap_pos) = message.content.values[0].as_union(); + if tag_value == self.input_union_send_tag_value { + // Retrieve bytes from the message + self.byte_buffer.clear(); + let union_content = &message.content.regions[embedded_heap_pos as usize]; + debug_assert_eq!(union_content.len(), 1); + let array_heap_pos = union_content[0].as_array(); + let array_values = &message.content.regions[array_heap_pos as usize]; + self.byte_buffer.reserve(array_values.len()); + for value in array_values { + self.byte_buffer.push(value.as_uint8()); + } + + self.sync_state = SyncState::Putting; + return CompScheduling::Immediate; + } else if tag_value == self.input_union_receive_tag_value { + // Component requires a `recv` + self.sync_state = SyncState::Getting; + return CompScheduling::Immediate; + } else if tag_value == self.input_union_finish_tag_value { + // Component requires us to end the sync round + self.sync_state = SyncState::FinishSync; + return CompScheduling::Immediate; + } else if tag_value == self.input_union_shutdown_tag_value { + // Component wants to close the connection + self.sync_state = SyncState::FinishSyncThenQuit; + return CompScheduling::Immediate; + } else { + unreachable!("got tag_value {}", tag_value) + } } } else { todo!("handle sync failure due to message deadlock"); - return Ok(CompScheduling::Sleep); + return CompScheduling::Sleep; } } else { self.exec_state.set_as_blocked_get(self.pdl_input_port_id); - return Ok(CompScheduling::Sleep); + return CompScheduling::Sleep; } }, SyncState::Putting => { @@ -216,7 +224,7 @@ impl Component for ComponentTcpClient { }, Err(err) => { if err.kind() == IoErrorKind::WouldBlock { - return Ok(CompScheduling::Sleep); // wait until notified + return CompScheduling::Sleep; // wait until notified } else { todo!("handle socket.send error {:?}", err) } @@ -226,10 +234,10 @@ impl Component for ComponentTcpClient { // If here then we're done putting the data, we can // finish the sync round - let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); + let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx); self.exec_state.mode = CompMode::SyncEnd; - component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); - return Ok(CompScheduling::Immediate); + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus); + return CompScheduling::Immediate; }, SyncState::Getting => { // We're going to try and receive a single message. If @@ -243,13 +251,19 @@ impl Component for ComponentTcpClient { Ok(num_received) => { self.byte_buffer.resize(num_received, 0); let message_content = self.bytes_to_data_message_content(&self.byte_buffer); - let scheduling = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, message_content, sched_ctx, &mut self.consensus, comp_ctx); - self.sync_state = SyncState::AwaitingCmd; - return Ok(scheduling); + let send_result = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, message_content, sched_ctx, &mut self.consensus, comp_ctx); + if let Err(location_and_message) = send_result { + component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); + return CompScheduling::Immediate; + } else { + let scheduling = send_result.unwrap(); + self.sync_state = SyncState::AwaitingCmd; + return scheduling; + } }, Err(err) => { if err.kind() == IoErrorKind::WouldBlock { - return Ok(CompScheduling::Sleep); // wait until polled + return CompScheduling::Sleep; // wait until polled } else { todo!("handle socket.receive error {:?}", err) } @@ -257,26 +271,26 @@ impl Component for ComponentTcpClient { } }, SyncState::FinishSync | SyncState::FinishSyncThenQuit => { - let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); + let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx); self.exec_state.mode = CompMode::SyncEnd; - component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); - return Ok(CompScheduling::Requeue); + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus); + return CompScheduling::Requeue; }, } }, CompMode::BlockedGet => { // Entered when awaiting a new command debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd); - return Ok(CompScheduling::Sleep); + return CompScheduling::Sleep; }, CompMode::SyncEnd | CompMode::BlockedPut => - return Ok(CompScheduling::Sleep), + return CompScheduling::Sleep, CompMode::StartExit => - return Ok(component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx)), + return component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus), CompMode::BusyExit => - return Ok(component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx)), + return component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx), CompMode::Exit => - return Ok(component::default_handle_exit(&self.exec_state)), + return component::default_handle_exit(&self.exec_state), } } }