diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index 5656f68fddc3c037ed64027f389f582fb509a08d..f57eb821a93f58b66d8c991e5add2d4b4bbf1540 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -8,8 +8,10 @@ use super::component::{self, *}; use super::control_layer::*; use super::consensus::*; + use std::io::ErrorKind as IoErrorKind; use std::net::{IpAddr, Ipv4Addr}; +use crate::protocol::{ProcedureDefinitionId, TypeId}; // ----------------------------------------------------------------------------- // ComponentTcpClient @@ -413,43 +415,32 @@ impl ListenerSocketState { } } +struct PendingComponent { + client: SocketTcpClient, + cmd_rx: PortId, + data_tx: PortId, +} + enum ListenerSyncState { AwaitingCmd, - AcceptCommandReceived, - AcceptChannelGenerated{ client: SocketTcpClient, cmd_rx: PortId, data_tx: PortId }, + AcceptCommandReceived, // just received `Accept` command + AcceptChannelGenerated, // created channel, waiting to end the sync round + AcceptGenerateComponent, // sync ended, back in non-sync, now generate component FinishSyncThenQuit, } -impl ListenerSyncState { - // Bit of a hacky solution: keeps the listener sync state intact, except - // if it is `AcceptChannelGenerated`, that will be replaced with - // `AwaitingCmd`. The reason is to move the `client` out. - fn take(&mut self) -> ListenerSyncState { - use ListenerSyncState::*; - - match self { - AwaitingCmd => return AwaitingCmd, - AcceptCommandReceived => return AcceptCommandReceived, - FinishSyncThenQuit => return FinishSyncThenQuit, - AcceptChannelGenerated{ .. } => { - let mut swapped = ListenerSyncState::AwaitingCmd; - std::mem::swap(self, &mut swapped); - return swapped; - } - } - } -} - pub struct ComponentTcpListener { // Properties for the tcp socket socket_state: ListenerSocketState, sync_state: ListenerSyncState, + pending_component: Option, poll_ticket: Option, inbox_main: InboxMain, inbox_backup: InboxBackup, pdl_input_port_id: PortId, // input port, receives commands pdl_output_port_id: PortId, // output port, sends connections - // Information about union tags + // Type information extracted from protocol + tcp_client_definition: (ProcedureDefinitionId, TypeId), input_union_accept_tag: i64, input_union_shutdown_tag: i64, output_struct_rx_index: usize, @@ -503,7 +494,9 @@ impl Component for ComponentTcpListener { fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { match message { - Message::Data(_message) => unreachable!(), + Message::Data(message) => { + self.handle_incoming_data_message(sched_ctx, comp_ctx, message); + }, Message::Sync(message) => { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); @@ -523,7 +516,7 @@ impl Component for ComponentTcpListener { } fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling { - sched_ctx.info(&format!("Running comonent ComponentTcpListener (mode: {:?})", self.exec_state.mode)); + sched_ctx.info(&format!("Running component ComponentTcpListener (mode: {:?})", self.exec_state.mode)); match self.exec_state.mode { CompMode::BlockedSelect @@ -536,21 +529,25 @@ impl Component for ComponentTcpListener { CompMode::NonSync => { match &self.socket_state { ListenerSocketState::Connected(_socket) => { - match self.sync_state.take() { + match self.sync_state { ListenerSyncState::AwaitingCmd => { component::default_handle_sync_start( &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus ); }, - ListenerSyncState::AcceptCommandReceived => unreachable!(), - ListenerSyncState::AcceptChannelGenerated{ client, cmd_rx, data_tx } => { + ListenerSyncState::AcceptCommandReceived | + ListenerSyncState::AcceptChannelGenerated => unreachable!(), + ListenerSyncState::AcceptGenerateComponent => { // Now that we're outside the sync round, create the tcp client // component - let socket_component: Box = Box::new(ComponentTcpClient::new_with_existing_connection(client, cmd_rx, data_tx)); + let pending = self.pending_component.take().unwrap(); + let socket_component: Box = Box::new(ComponentTcpClient::new_with_existing_connection( + pending.client, pending.cmd_rx, pending.data_tx + )); component::special_create_component( &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control, &mut self.inbox_main, &mut self.inbox_backup, socket_component, - vec![cmd_rx, data_tx] + vec![pending.cmd_rx, pending.data_tx] ); self.sync_state = ListenerSyncState::AwaitingCmd; // superfluous, see ListenerSyncState.take() }, @@ -619,11 +616,11 @@ impl Component for ComponentTcpListener { // Construct the message containing the appropriate ports that will // be sent to the component commanding this listener. - let mut values = ValueGroup::new_stack(vec![ - Value::Unassigned, Value::Unassigned - ]); - values.values[self.output_struct_tx_index] = Value::Output(port_id_to_eval(cmd_channel.putter_id)); - values.values[self.output_struct_rx_index] = Value::Input(port_id_to_eval(data_channel.getter_id)); + let mut values = ValueGroup::new_stack(Vec::with_capacity(1)); + values.values.push(Value::Struct(0)); + values.regions.push(vec![Value::Unassigned, Value::Unassigned]); + values.regions[0][self.output_struct_tx_index] = Value::Output(port_id_to_eval(cmd_channel.putter_id)); + values.regions[0][self.output_struct_rx_index] = Value::Input(port_id_to_eval(data_channel.getter_id)); if let Err(location_and_message) = component::default_send_data_message( &mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, values, sched_ctx, &mut self.consensus, &mut self.control, comp_ctx @@ -633,17 +630,15 @@ impl Component for ComponentTcpListener { ); } - // And finish the consensus round - component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); - - // Enter a state such that when we leave the consensus round and - // go back to the nonsync state, that we will actually create the - // tcp client component. - self.sync_state = ListenerSyncState::AcceptChannelGenerated { + // Prepare for finishing the consensus round, and once finished, + // create the tcp client component + self.sync_state = ListenerSyncState::AcceptChannelGenerated; + debug_assert!(self.pending_component.is_none()); + self.pending_component = Some(PendingComponent{ client, cmd_rx: cmd_channel.getter_id, data_tx: data_channel.putter_id - }; + }); return CompScheduling::Requeue; }, @@ -656,11 +651,16 @@ impl Component for ComponentTcpListener { } } }, - ListenerSyncState::AcceptChannelGenerated{ .. } => unreachable!(), - ListenerSyncState::FinishSyncThenQuit => { + ListenerSyncState::AcceptChannelGenerated => { component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); + self.sync_state = ListenerSyncState::AcceptGenerateComponent; return CompScheduling::Requeue; } + ListenerSyncState::FinishSyncThenQuit => { + component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); + return CompScheduling::Requeue; + }, + ListenerSyncState::AcceptGenerateComponent => unreachable!(), } }, CompMode::BlockedGet => { @@ -695,6 +695,7 @@ impl ComponentTcpListener { return Self { socket_state: ListenerSocketState::Connected(socket.unwrap()), sync_state: ListenerSyncState::AwaitingCmd, + pending_component: None, poll_ticket: None, inbox_main: vec![None, None], inbox_backup: InboxBackup::new(), @@ -709,6 +710,21 @@ impl ComponentTcpListener { consensus: Consensus::new(), } } + + fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) { + if self.exec_state.mode.is_in_sync_block() { + self.consensus.handle_incoming_data_message(comp_ctx, &message); + } + + match component::default_handle_incoming_data_message( + &mut self.exec_state, &mut self.inbox_main, comp_ctx, message, sched_ctx, &mut self.control + ) { + IncomingData::PlacedInSlot => {}, + IncomingData::SlotFull(message) => { + self.inbox_backup.push(message); + } + } + } } fn ip_addr_and_port_from_args(