Changeset - 432fcde4e554
[Not reviewed]
0 6 0
MH - 3 years ago 2022-05-15 14:41:30
contact@maxhenger.nl
Fix async component creation error in tcp listener
6 files changed with 70 insertions and 25 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -1282,25 +1282,24 @@ fn perform_send_message_with_ports_notify_peers(
 
            let sending_port_instruction = comp_ctx.get_port(sending_port_handle).last_instruction;
 
            return Err((
 
                sending_port_instruction,
 
                String::from("Cannot transmit one of the ports in this message, as that port is already transmitted")
 
            ));
 
        }
 

	
 
        // Set the flag for transmission
 
        transmit_port_info.state.set(PortStateFlag::Transmitted);
 

	
 
        // Block the peer of the port
 
        let message = control.create_port_transfer_message(unblock_put_control_id, comp_ctx.id, peer_port_id);
 
        println!("DEBUG: Port transfer message\nControl ID: {:?}\nMessage: {:?}", unblock_put_control_id, message);
 
        let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
        peer_info.handle.send_message_logged(sched_ctx, message, true);
 
    }
 

	
 
    // We've set up the protocol, once all the PPC's are blocked we are supposed
 
    // to transfer the message to the recipient. So store it temporarily
 
    exec_state.mode = CompMode::PutPortsBlockedAwaitingAcks;
 

	
 
    return Ok(());
 
}
 
@@ -1396,25 +1395,24 @@ fn default_handle_ack(
 

	
 
                // Note that the component is intentionally not
 
                // sleeping, so we just wake it up
 
                debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire));
 
                let key = unsafe { to_schedule.upgrade() };
 
                sched_ctx.runtime.enqueue_work(key);
 
                let _should_remove = handle.decrement_users();
 
                debug_assert!(_should_remove.is_none());
 
            },
 
            AckAction::UnblockPutWithPorts => {
 
                // Send the message (containing ports) stored in the component
 
                // execution state to the recipient
 
                println!("DEBUG: Unblocking put with ports");
 
                debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedAwaitingAcks);
 
                exec_state.mode = CompMode::PutPortsBlockedSendingPort;
 
                let port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
 

	
 
                // Little bit of a hack, we didn't really unblock the sending
 
                // port, but this will mesh nicely with waiting for the sending
 
                // port to become unblocked.
 
                default_handle_recently_unblocked_port(
 
                    exec_state, control, consensus, port_handle, sched_ctx,
 
                    comp_ctx, inbox_main, inbox_backup
 
                )?;
 
            },
src/runtime2/component/component_internet.rs
Show inline comments
 
@@ -290,32 +290,48 @@ impl Component for ComponentTcpClient {
 
            CompMode::StartExit =>
 
                return component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus),
 
            CompMode::BusyExit =>
 
                return component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx),
 
            CompMode::Exit =>
 
                return component::default_handle_exit(&self.exec_state),
 
        }
 
    }
 
}
 

	
 
impl ComponentTcpClient {
 
    pub(crate) fn new(arguments: ValueGroup) -> Self {
 
        debug_assert_eq!(arguments.values.len(), 4);
 
        // Two possible cases here: if the number of arguments is 3, then we
 
        // get: (socket_handle, input_port, output_port). If the number of
 
        // arguments is 4, then we get: (ip, port, input_port, output_port).
 
        assert!(arguments.values.len() == 3 || arguments.values.len() == 4);
 

	
 
        // Parsing arguments
 
        let (socket, input_port, output_port) = if arguments.values.len() == 3 {
 
            let socket_handle = arguments.values[0].as_sint32();
 
            let socket = SocketTcpClient::new_from_handle(socket_handle);
 

	
 
            let input_port = component::port_id_from_eval(arguments.values[1].as_input());
 
            let output_port = component::port_id_from_eval(arguments.values[2].as_output());
 

	
 
            (socket, input_port, output_port)
 
        } else {
 
            let (ip_address, port) = ip_addr_and_port_from_args(&arguments, 0, 1);
 
            let socket = SocketTcpClient::new(ip_address, port);
 

	
 
            let input_port = component::port_id_from_eval(arguments.values[2].as_input());
 
            let output_port = component::port_id_from_eval(arguments.values[3].as_output());
 

	
 
        let socket = SocketTcpClient::new(ip_address, port);
 
            (socket, input_port, output_port)
 
        };
 

	
 
        if let Err(socket) = socket {
 
            todo!("friendly error reporting: failed to open socket (reason: {:?})", socket);
 
        }
 

	
 
        return Self{
 
            socket_state: ClientSocketState::Connected(socket.unwrap()),
 
            sync_state: ClientSyncState::AwaitingCmd,
 
            poll_ticket: None,
 
            inbox_main: vec![None, None],
 
            inbox_backup: Vec::new(),
 
            input_union_send_tag_value: -1,
 
            input_union_receive_tag_value: -1,
 
@@ -407,25 +423,25 @@ enum ListenerSocketState {
 
}
 

	
 
impl ListenerSocketState {
 
    fn get_socket(&self) -> &SocketTcpListener {
 
        match self {
 
            ListenerSocketState::Connected(v) => return v,
 
            ListenerSocketState::Error => unreachable!(),
 
        }
 
    }
 
}
 

	
 
struct PendingComponent {
 
    client: SocketTcpClient,
 
    client: i32, // OS socket handle
 
    cmd_rx: PortId,
 
    data_tx: PortId,
 
}
 

	
 
enum ListenerSyncState {
 
    AwaitingCmd,
 
    AcceptCommandReceived, // just received `Accept` command
 
    AcceptChannelGenerated, // created channel, waiting to end the sync round
 
    AcceptGenerateComponent, // sync ended, back in non-sync, now generate component
 
    FinishSyncThenQuit,
 
}
 

	
 
@@ -447,24 +463,27 @@ pub struct ComponentTcpListener {
 
    output_struct_tx_index: usize,
 
    // Generic component state
 
    exec_state: CompExecState,
 
    control: ControlLayer,
 
    consensus: Consensus,
 
}
 

	
 
impl Component for ComponentTcpListener {
 
    fn on_creation(&mut self, id: CompId, sched_ctx: &SchedulerCtx) {
 
        // Retrieve type information for the message with ports we're going to send
 
        let pd = &sched_ctx.runtime.protocol;
 

	
 
        self.tcp_client_definition = sched_ctx.runtime.protocol.find_procedure(b"std.internet", b"tcp_client")
 
            .expect("'tcp_client' component in the 'std.internet' module");
 

	
 
        let cmd_type = pd.find_type(b"std.internet", b"ListenerCmd")
 
            .expect("'ListenerCmd' type in the 'std.internet' module");
 
        let cmd_type = cmd_type.as_union();
 

	
 
        self.input_union_accept_tag = cmd_type.get_variant_tag_value(b"Accept").unwrap();
 
        self.input_union_shutdown_tag = cmd_type.get_variant_tag_value(b"Shutdown").unwrap();
 

	
 
        let conn_type = pd.find_type(b"std.internet", b"TcpConnection")
 
            .expect("'TcpConnection' type in the 'std.internet' module");
 
        let conn_type = conn_type.as_struct();
 

	
 
        assert_eq!(conn_type.get_num_struct_fields(), 2);
 
@@ -532,33 +551,37 @@ impl Component for ComponentTcpListener {
 
                        match self.sync_state {
 
                            ListenerSyncState::AwaitingCmd => {
 
                                component::default_handle_sync_start(
 
                                    &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus
 
                                );
 
                            },
 
                            ListenerSyncState::AcceptCommandReceived |
 
                            ListenerSyncState::AcceptChannelGenerated => unreachable!(),
 
                            ListenerSyncState::AcceptGenerateComponent => {
 
                                // Now that we're outside the sync round, create the tcp client
 
                                // component
 
                                let pending = self.pending_component.take().unwrap();
 
                                let socket_component: Box<dyn Component> = Box::new(ComponentTcpClient::new_with_existing_connection(
 
                                    pending.client, pending.cmd_rx, pending.data_tx
 
                                ));
 
                                component::special_create_component(
 

	
 
                                let arguments = ValueGroup::new_stack(vec![
 
                                    Value::SInt32(pending.client),
 
                                    Value::Input(port_id_to_eval(pending.cmd_rx)),
 
                                    Value::Output(port_id_to_eval(pending.data_tx)),
 
                                ]);
 
                                component::default_start_create_component(
 
                                    &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control,
 
                                    &mut self.inbox_main, &mut self.inbox_backup, socket_component,
 
                                    vec![pending.cmd_rx, pending.data_tx]
 
                                    &mut self.inbox_main, &mut self.inbox_backup,
 
                                    self.tcp_client_definition.0, self.tcp_client_definition.1,
 
                                    arguments
 
                                );
 
                                self.sync_state = ListenerSyncState::AwaitingCmd; // superfluous, see ListenerSyncState.take()
 
                                self.sync_state = ListenerSyncState::AwaitingCmd;
 
                            },
 
                            ListenerSyncState::FinishSyncThenQuit => {
 
                                self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                            },
 
                        }
 

	
 
                        return CompScheduling::Immediate;
 
                    },
 
                    ListenerSocketState::Error => {
 
                        self.exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
 
                        return CompScheduling::Immediate;
 
                    }
 
@@ -587,28 +610,27 @@ impl Component for ComponentTcpListener {
 
                            GetResult::NoMessage => {
 
                                return CompScheduling::Sleep;
 
                            },
 
                            GetResult::Error(location_and_message) => {
 
                                component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                                return CompScheduling::Immediate;
 
                            }
 
                        }
 
                    },
 
                    ListenerSyncState::AcceptCommandReceived => {
 
                        let socket = self.socket_state.get_socket();
 
                        match socket.accept() {
 
                            Ok(client) => {
 
                            Ok(client_handle) => {
 
                                // Create the channels (and the inbox entries, to stay consistent
 
                                // with the expectations from the `component` module's functions)
 
                                let client = client.unwrap();
 
                                let cmd_channel = comp_ctx.create_channel();
 
                                let data_channel = comp_ctx.create_channel();
 

	
 
                                let port_ids = [
 
                                    cmd_channel.putter_id, cmd_channel.getter_id,
 
                                    data_channel.putter_id, data_channel.getter_id,
 
                                ];
 
                                for port_id in port_ids {
 
                                    let expected_port_index = self.inbox_main.len();
 
                                    let port_handle = comp_ctx.get_port_handle(port_id);
 
                                    self.inbox_main.push(None);
 
                                    self.consensus.notify_of_new_port(expected_port_index, port_handle, comp_ctx);
 
@@ -626,25 +648,25 @@ impl Component for ComponentTcpListener {
 
                                    sched_ctx, &mut self.consensus, &mut self.control, comp_ctx
 
                                ) {
 
                                    component::default_handle_error_for_builtin(
 
                                        &mut self.exec_state, sched_ctx, location_and_message
 
                                    );
 
                                }
 

	
 
                                // Prepare for finishing the consensus round, and once finished,
 
                                // create the tcp client component
 
                                self.sync_state = ListenerSyncState::AcceptChannelGenerated;
 
                                debug_assert!(self.pending_component.is_none());
 
                                self.pending_component = Some(PendingComponent{
 
                                    client,
 
                                    client: client_handle,
 
                                    cmd_rx: cmd_channel.getter_id,
 
                                    data_tx: data_channel.putter_id
 
                                });
 

	
 
                                return CompScheduling::Requeue;
 
                            },
 
                            Err(err) => {
 
                                if err.kind() == IoErrorKind::WouldBlock {
 
                                    return CompScheduling::Sleep;
 
                                } else {
 
                                    todo!("handle listener.accept error {:?}", err)
 
                                }
 
@@ -692,24 +714,25 @@ impl ComponentTcpListener {
 
            todo!("friendly error reporting: failed to open socket (reason: {:?})", socket);
 
        }
 

	
 
        return Self {
 
            socket_state: ListenerSocketState::Connected(socket.unwrap()),
 
            sync_state: ListenerSyncState::AwaitingCmd,
 
            pending_component: None,
 
            poll_ticket: None,
 
            inbox_main: vec![None, None],
 
            inbox_backup: InboxBackup::new(),
 
            pdl_input_port_id: input_port,
 
            pdl_output_port_id: output_port,
 
            tcp_client_definition: (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()),
 
            input_union_accept_tag: -1,
 
            input_union_shutdown_tag: -1,
 
            output_struct_tx_index: 0,
 
            output_struct_rx_index: 0,
 
            exec_state: CompExecState::new(),
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
        }
 
    }
 

	
 
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        if self.exec_state.mode.is_in_sync_block() {
src/runtime2/poll/mod.rs
Show inline comments
 
@@ -251,25 +251,25 @@ impl PollingThread {
 
                    handle.send_message(&self.runtime, Message::Poll, true);
 
                }
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn user_data_as_key(data: UserData) -> u64 {
 
        return data.0;
 
    }
 

	
 
    fn log(&self, message: &str) {
 
        if self.log_level >= LogLevel::Info {
 
        if LogLevel::Info >= self.log_level {
 
            println!("[polling] {}", message);
 
        }
 
    }
 
}
 

	
 
// bit convoluted, but it works
 
pub(crate) struct PollingThreadHandle {
 
    // requires Option, because:
 
    queue: Option<QueueDynProducer<PollCmd>>, // destructor needs to be called
 
    handle: Option<thread::JoinHandle<()>>, // we need to call `join`
 
}
 

	
src/runtime2/stdlib/internet.rs
Show inline comments
 
@@ -36,31 +36,32 @@ pub struct SocketTcpClient {
 

	
 
impl SocketTcpClient {
 
    pub fn new(ip: IpAddr, port: u16) -> Result<Self, SocketError> {
 

	
 
        let socket_handle = create_and_connect_socket(
 
            libc::SOCK_STREAM, libc::IPPROTO_TCP, ip, port
 
        )?;
 
        if !set_socket_blocking(socket_handle, SOCKET_BLOCKING) {
 
            unsafe{ libc::close(socket_handle); }
 
            return Err(SocketError::Modifying);
 
        }
 

	
 
        println!(" CREATE  [{:04}] client", socket_handle);
 
        return Ok(SocketTcpClient{
 
            socket_handle,
 
            is_blocking: SOCKET_BLOCKING,
 
        })
 
    }
 

	
 
    fn new_from_handle(socket_handle: libc::c_int) -> Result<Self, SocketError> {
 
    pub(crate) fn new_from_handle(socket_handle: libc::c_int) -> Result<Self, SocketError> {
 
        if !set_socket_blocking(socket_handle, SOCKET_BLOCKING) {
 
            unsafe{ libc::close(socket_handle); }
 
            return Err(SocketError::Modifying);
 
        }
 

	
 
        return Ok(SocketTcpClient{
 
            socket_handle,
 
            is_blocking: SOCKET_BLOCKING,
 
        })
 
    }
 

	
 
    pub fn send(&self, message: &[u8]) -> Result<usize, IoError> {
 
@@ -83,24 +84,25 @@ impl SocketTcpClient {
 
            libc::recv(self.socket_handle, message_pointer, buffer.len(), 0)
 
        };
 
        if result < 0 {
 
            return Err(IoError::last_os_error());
 
        }
 

	
 
        return Ok(result as usize);
 
    }
 
}
 

	
 
impl Drop for SocketTcpClient {
 
    fn drop(&mut self) {
 
        println!("DESTRUCT [{:04}] client", self.socket_handle);
 
        debug_assert!(self.socket_handle >= 0);
 
        unsafe{ close(self.socket_handle) };
 
    }
 
}
 

	
 
impl AsFileDescriptor for SocketTcpClient {
 
    fn as_file_descriptor(&self) -> FileDescriptor {
 
        return self.socket_handle;
 
    }
 
}
 

	
 
/// TCP listener. Yielding new connections
 
@@ -120,44 +122,48 @@ impl SocketTcpListener {
 
            return Err(SocketError::Modifying);
 
        }
 

	
 
        // Listen
 
        unsafe {
 
            let result = listen(socket_handle, libc::SOMAXCONN);
 
            if result < 0 {
 
                unsafe{ libc::close(socket_handle); }
 
                return Err(SocketError::Listening);
 
            }
 
        }
 

	
 

	
 
        println!(" CREATE  [{:04}] listener", socket_handle);
 
        return Ok(SocketTcpListener{
 
            socket_handle,
 
            is_blocking: SOCKET_BLOCKING,
 
        });
 
    }
 

	
 
    pub fn accept(&self) -> Result<Result<SocketTcpClient, SocketError>, IoError> {
 
    pub fn accept(&self) -> Result<libc::c_int, IoError> {
 
        let (mut address, mut address_size) = create_sockaddr_in_empty();
 
        let address_pointer = &mut address as *mut sockaddr_in;
 
        let socket_handle = unsafe { accept(self.socket_handle, address_pointer.cast(), &mut address_size) };
 
        if socket_handle < 0 {
 
            return Err(IoError::last_os_error());
 
        }
 

	
 
        return Ok(SocketTcpClient::new_from_handle(socket_handle));
 
        println!(" CREATE  [{:04}] client (from listener)", socket_handle);
 
        return Ok(socket_handle);
 
    }
 
}
 

	
 
impl Drop for SocketTcpListener {
 
    fn drop(&mut self) {
 
        println!("DESTRUCT [{:04}] listener", self.socket_handle);
 
        debug_assert!(self.socket_handle >= 0);
 
        unsafe{ close(self.socket_handle) };
 
    }
 
}
 

	
 
impl AsFileDescriptor for SocketTcpListener {
 
    fn as_file_descriptor(&self) -> FileDescriptor {
 
        return self.socket_handle;
 
    }
 
}
 

	
 
/// Raw socket receiver. Essentially a listener that accepts a single connection
src/runtime2/tests/internet.rs
Show inline comments
 
@@ -70,28 +70,28 @@ fn test_stdlib_file() {
 

	
 
        new fake_client(connection);
 
    }
 
    ", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_tcp_listener_and_client() {
 
    compile_and_create_component("
 
    import std.internet::*;
 

	
 
    func listen_port() -> u16 {
 
        return 2393;
 
        return 2392;
 
    }
 

	
 
    comp server(u32 num_connections) {
 
    comp server(u32 num_connections, in<()> shutdown) {
 
        // Start tcp listener
 
        channel listen_cmd_tx -> listen_cmd_rx;
 
        channel listen_conn_tx -> listen_conn_rx;
 
        new tcp_listener({}, listen_port(), listen_cmd_rx, listen_conn_tx);
 

	
 
        // Fake channels such that we can create a dummy connection variable
 
        channel client_cmd_tx -> unused_client_cmd_rx;
 
        channel unused_client_data_tx -> client_data_rx;
 
        auto new_connection = TcpConnection{
 
            tx: client_cmd_tx,
 
            rx: client_data_rx,
 
        };
 
@@ -104,49 +104,51 @@ fn test_tcp_listener_and_client() {
 
                put(listen_cmd_tx, ListenerCmd::Accept);
 
                new_connection = get(listen_conn_rx);
 
            }
 

	
 
            // We have a new connection, spawn an 'echoer' for it
 
            print(\"server: spawning an echo'ing component\");
 
            new echo_machine(new_connection);
 
            connection_counter += 1;
 
        }
 

	
 
        // Shut down the listener
 
        print(\"server: shutting down listener\");
 
        sync auto v = get(shutdown);
 
        sync put(listen_cmd_tx, ListenerCmd::Shutdown);
 
    }
 

	
 
    // Waits for a single TCP byte (to simplify potentially having to
 
    // concatenate requests) and echos it
 
    comp echo_machine(TcpConnection conn) {
 
        auto data_to_echo = {};
 

	
 
        // Wait for a message
 
        sync {
 
            print(\"echo: receiving data\");
 
            put(conn.tx, ClientCmd::Receive);
 
            data_to_echo = get(conn.rx);
 
            put(conn.tx, ClientCmd::Finish);
 
        }
 

	
 
        // Echo the message
 
        print(\"echo: sending back data\");
 
        sync put(conn.tx, ClientCmd::Send(data_to_echo));
 

	
 
        // Ask the tcp connection to shut down
 
        print(\"echo: shutting down\");
 
        sync put(conn.tx, ClientCmd::Shutdown);
 
    }
 

	
 
    comp echo_requester(u8 byte_to_send) {
 
    comp echo_requester(u8 byte_to_send, out<()> done) {
 
        channel cmd_tx -> cmd_rx;
 
        channel data_tx -> data_rx;
 
        new tcp_client({127, 0, 0, 1}, listen_port(), cmd_rx, data_tx);
 

	
 
        // Send the message
 
        print(\"requester: sending bytes\");
 
        sync put(cmd_tx, ClientCmd::Send({ byte_to_send }));
 

	
 
        // Receive the echo'd byte
 
        auto received_byte = byte_to_send + 1;
 
        sync {
 
            print(\"requester: receiving echo response\");
 
@@ -154,25 +156,41 @@ fn test_tcp_listener_and_client() {
 
            received_byte = get(data_rx)[0];
 
            put(cmd_tx, ClientCmd::Finish);
 
        }
 

	
 
        // Silly check, as always
 
        while (byte_to_send != received_byte) {
 
            print(\"requester: Oh no! The echo is an otherworldly distorter\");
 
        }
 

	
 
        // Shut down the TCP connection
 
        print(\"requester: shutting down TCP component\");
 
        sync put(cmd_tx, ClientCmd::Shutdown);
 
        sync put(done, ());
 
    }
 

	
 
    comp constructor() {
 
        auto num_connections = 1;
 
        new server(num_connections);
 
        channel shutdown_listener_tx -> shutdown_listener_rx;
 
        new server(num_connections, shutdown_listener_rx);
 

	
 
        auto connection_index = 0;
 
        auto all_done = {};
 
        while (connection_index < num_connections) {
 
            new echo_requester(cast(connection_index));
 
            channel done_tx -> done_rx;
 
            new echo_requester(cast(connection_index), done_tx);
 
            connection_index += 1;
 
            all_done @= {done_rx};
 
        }
 

	
 
        auto counter = 0;
 
        while (counter < length(all_done)) {
 
            print(\"constructor: waiting for requester to exit\");
 
            sync auto v = get(all_done[counter]);
 
            counter += 1;
 
        }
 

	
 
        print(\"constructor: instructing listener to exit\");
 
        sync put(shutdown_listener_tx, ());
 
    }
 
    ", "constructor", no_args());
 
}
 
\ No newline at end of file
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -5,25 +5,25 @@ use crate::runtime2::component::{CompCtx, CompPDL};
 

	
 
mod messaging;
 
mod error_handling;
 
mod transfer_ports;
 
mod internet;
 

	
 
const LOG_LEVEL: LogLevel = LogLevel::Debug;
 
const NUM_THREADS: u32 = 1;
 

	
 
pub(crate) fn compile_and_create_component(source: &str, routine_name: &str, args: ValueGroup) {
 
    let protocol = ProtocolDescription::parse(source.as_bytes())
 
        .expect("successful compilation");
 
    let runtime = Runtime::new(NUM_THREADS, LOG_LEVEL, protocol)
 
    let runtime = Runtime::new(NUM_THREADS, LogLevel::None, protocol)
 
        .expect("successful runtime startup");
 
    create_component(&runtime, "", routine_name, args);
 
}
 

	
 
pub(crate) fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) {
 
    let prompt = rt.inner.protocol.new_component(
 
        module_name.as_bytes(), routine_name.as_bytes(), args
 
    ).expect("create prompt");
 
    let reserved = rt.inner.start_create_component();
 
    let ctx = CompCtx::new(&reserved);
 
    let component = Box::new(CompPDL::new(prompt, 0));
 
    let (key, _) = rt.inner.finish_create_component(reserved, component, ctx, false);
0 comments (0 inline, 0 general)