diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index 9fec80daf7169dba1681825cdfe547e42714ec11..5656f68fddc3c037ed64027f389f582fb509a08d 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -330,7 +330,7 @@ impl ComponentTcpClient { pub(crate) fn new_with_existing_connection(socket: SocketTcpClient, input_port: PortId, output_port: PortId) -> Self { return Self{ - socket_state: ClientSocketState::Connected(socket);, + socket_state: ClientSocketState::Connected(socket), sync_state: ClientSyncState::AwaitingCmd, poll_ticket: None, inbox_main: vec![None, None], @@ -413,13 +413,33 @@ impl ListenerSocketState { } } -#[derive(PartialEq, Debug)] enum ListenerSyncState { AwaitingCmd, - Accept, + AcceptCommandReceived, + AcceptChannelGenerated{ client: SocketTcpClient, cmd_rx: PortId, data_tx: PortId }, FinishSyncThenQuit, } +impl ListenerSyncState { + // Bit of a hacky solution: keeps the listener sync state intact, except + // if it is `AcceptChannelGenerated`, that will be replaced with + // `AwaitingCmd`. The reason is to move the `client` out. + fn take(&mut self) -> ListenerSyncState { + use ListenerSyncState::*; + + match self { + AwaitingCmd => return AwaitingCmd, + AcceptCommandReceived => return AcceptCommandReceived, + FinishSyncThenQuit => return FinishSyncThenQuit, + AcceptChannelGenerated{ .. } => { + let mut swapped = ListenerSyncState::AwaitingCmd; + std::mem::swap(self, &mut swapped); + return swapped; + } + } + } +} + pub struct ComponentTcpListener { // Properties for the tcp socket socket_state: ListenerSocketState, @@ -516,14 +536,29 @@ impl Component for ComponentTcpListener { CompMode::NonSync => { match &self.socket_state { ListenerSocketState::Connected(_socket) => { - if self.sync_state == ListenerSyncState::FinishSyncThenQuit { - self.exec_state.set_as_start_exit(ExitReason::Termination); - } else { - self.sync_state = ListenerSyncState::AwaitingCmd; - component::default_handle_sync_start( - &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus - ); + match self.sync_state.take() { + ListenerSyncState::AwaitingCmd => { + component::default_handle_sync_start( + &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus + ); + }, + ListenerSyncState::AcceptCommandReceived => unreachable!(), + ListenerSyncState::AcceptChannelGenerated{ client, cmd_rx, data_tx } => { + // Now that we're outside the sync round, create the tcp client + // component + let socket_component: Box = Box::new(ComponentTcpClient::new_with_existing_connection(client, cmd_rx, data_tx)); + component::special_create_component( + &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control, + &mut self.inbox_main, &mut self.inbox_backup, socket_component, + vec![cmd_rx, data_tx] + ); + self.sync_state = ListenerSyncState::AwaitingCmd; // superfluous, see ListenerSyncState.take() + }, + ListenerSyncState::FinishSyncThenQuit => { + self.exec_state.set_as_start_exit(ExitReason::Termination); + }, } + return CompScheduling::Immediate; }, ListenerSocketState::Error => { @@ -543,7 +578,7 @@ impl Component for ComponentTcpListener { GetResult::Received(message) => { let (tag_value, _) = message.content.values[0].as_union(); if tag_value == self.input_union_accept_tag { - self.sync_state = ListenerSyncState::Accept; + self.sync_state = ListenerSyncState::AcceptCommandReceived; } else if tag_value == self.input_union_shutdown_tag { self.sync_state = ListenerSyncState::FinishSyncThenQuit; } else { @@ -561,33 +596,74 @@ impl Component for ComponentTcpListener { } } }, - ListenerSyncState::Accept => { + ListenerSyncState::AcceptCommandReceived => { let socket = self.socket_state.get_socket(); match socket.accept() { Ok(client) => { - // TODO: Continue here, somehow precreate component, but do the transfer correctly + // Create the channels (and the inbox entries, to stay consistent + // with the expectations from the `component` module's functions) let client = client.unwrap(); - component::default_start_create_component() - todo!("actually create the component") + let cmd_channel = comp_ctx.create_channel(); + let data_channel = comp_ctx.create_channel(); + + let port_ids = [ + cmd_channel.putter_id, cmd_channel.getter_id, + data_channel.putter_id, data_channel.getter_id, + ]; + for port_id in port_ids { + let expected_port_index = self.inbox_main.len(); + let port_handle = comp_ctx.get_port_handle(port_id); + self.inbox_main.push(None); + self.consensus.notify_of_new_port(expected_port_index, port_handle, comp_ctx); + } + + // Construct the message containing the appropriate ports that will + // be sent to the component commanding this listener. + let mut values = ValueGroup::new_stack(vec![ + Value::Unassigned, Value::Unassigned + ]); + values.values[self.output_struct_tx_index] = Value::Output(port_id_to_eval(cmd_channel.putter_id)); + values.values[self.output_struct_rx_index] = Value::Input(port_id_to_eval(data_channel.getter_id)); + if let Err(location_and_message) = component::default_send_data_message( + &mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, values, + sched_ctx, &mut self.consensus, &mut self.control, comp_ctx + ) { + component::default_handle_error_for_builtin( + &mut self.exec_state, sched_ctx, location_and_message + ); + } + + // And finish the consensus round + component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); + + // Enter a state such that when we leave the consensus round and + // go back to the nonsync state, that we will actually create the + // tcp client component. + self.sync_state = ListenerSyncState::AcceptChannelGenerated { + client, + cmd_rx: cmd_channel.getter_id, + data_tx: data_channel.putter_id + }; + + return CompScheduling::Requeue; }, Err(err) => { if err.kind() == IoErrorKind::WouldBlock { return CompScheduling::Sleep; } else { - todo!("handle listener.accept error {:?}", err); + todo!("handle listener.accept error {:?}", err) } } } }, + ListenerSyncState::AcceptChannelGenerated{ .. } => unreachable!(), ListenerSyncState::FinishSyncThenQuit => { component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); return CompScheduling::Requeue; } } - return CompScheduling::Sleep; }, CompMode::BlockedGet => { - debug_assert_eq!(self.sync_state, ListenerSyncState::AwaitingCmd); return CompScheduling::Sleep; }, CompMode::SyncEnd | CompMode::BlockedPut