Changeset - bf5b20db8be4
[Not reviewed]
0 3 0
MH - 3 years ago 2022-05-19 22:47:13
contact@maxhenger.nl
Finish tcp client/server example
3 files changed with 207 insertions and 0 deletions:
0 comments (0 inline, 0 general)
bin-compiler/src/main.rs
Show inline comments
 
@@ -103,12 +103,13 @@ fn main() {
 
            println!("FAILED (to read file)\nbecause:\n{}", err);
 
            return;
 
        }
 

	
 
        if let Err(err) = builder.add(input_file.to_string(), file_buffer.clone()) {
 
            println!("FAILED (to tokenize file)\nbecause:\n{}", err);
 
            return;
 
        }
 

	
 
        println!("Success");
 
    }
 

	
 
    // Compile the program
src/runtime2/stdlib/internet.rs
Show inline comments
 
@@ -116,12 +116,16 @@ impl SocketTcpListener {
 
            libc::SOCK_STREAM, libc::IPPROTO_TCP, ip, port
 
        )?;
 
        if !set_socket_blocking(socket_handle, SOCKET_BLOCKING) {
 
            unsafe{ libc::close(socket_handle); }
 
            return Err(SocketError::Modifying);
 
        }
 
        if !set_socket_reuse_address(socket_handle) {
 
            unsafe{ libc::close(socket_handle); }
 
            return Err(SocketError::Modifying);
 
        }
 

	
 
        // Listen
 
        unsafe {
 
            let result = listen(socket_handle, libc::SOMAXCONN);
 
            if result < 0 {
 
                unsafe{ libc::close(socket_handle); }
 
@@ -385,12 +389,41 @@ fn set_socket_blocking(handle: libc::c_int, blocking: bool) -> bool {
 
        }
 
    }
 

	
 
    return true;
 
}
 

	
 
#[inline]
 
fn set_socket_reuse_address(handle: libc::c_int) -> bool {
 
    if handle < 0 {
 
        return false;
 
    }
 

	
 
    unsafe {
 
        let enable: libc::c_int = 1;
 
        let enable_ptr: *const _ = &enable;
 
        let result = libc::setsockopt(
 
            handle, libc::SOL_SOCKET, libc::SO_REUSEADDR,
 
            enable_ptr.cast(), size_of::<libc::c_int>() as libc::socklen_t
 
        );
 
        if result < 0 {
 
            return false;
 
        }
 

	
 
        let result = libc::setsockopt(
 
            handle, libc::SOL_SOCKET, libc::SO_REUSEPORT,
 
            enable_ptr.cast(), size_of::<libc::c_int>() as libc::socklen_t
 
        );
 
        if result < 0 {
 
            return false;
 
        }
 
    }
 

	
 
    return true;
 
}
 

	
 
#[inline]
 
fn socket_family_from_ip(ip: IpAddr) -> libc::c_int {
 
    return match ip {
 
        IpAddr::V4(_) => libc::AF_INET,
 
        IpAddr::V6(_) => libc::AF_INET6,
 
    };
testdata/examples/04_native_components.pdl
Show inline comments
 
// We'll start by important the standard library that defines the builtin
 
// components that support a TCP listener and a TCP client.
 

	
 
import std.internet::*;
 

	
 
// We'll define a little utility used through this document that is called to
 
// retrieve the port we're going to listen on.
 

	
 
func listen_port() -> u16 {
 
    return 2392;
 
}
 

	
 
// Next we define our server. The server accepts (for the case of this example)
 
// a number of connections until it will stop listening. At that point it will
 
// wait until it receives a signal that allows it to shut down.
 

	
 
comp server(u32 num_connections, in<()> shutdown) {
 
    // Here we set up the channels for commands, going to the listener
 
    // component, and the channel that sends new connections back to us.
 
    channel listen_cmd_tx -> listen_cmd_rx;
 
    channel listen_conn_tx -> listen_conn_rx;
 

	
 
    // And we create the tcp_listener, imported from the standard library, here.
 
    new tcp_listener({}, listen_port(), listen_cmd_rx, listen_conn_tx);
 

	
 
    // Here we set up a variable that will hold our received connections
 
    channel client_cmd_tx -> unused_client_cmd_rx;
 
    channel unused_client_data_tx -> client_data_rx;
 
    auto new_connection = TcpConnection{
 
        tx: client_cmd_tx,
 
        rx: client_data_rx,
 
    };
 

	
 
    auto connection_counter = 0;
 
    while (connection_counter < num_connections) {
 
        // We wait until we receive a new connection
 
        print("server: waiting for an accepted connection");
 
        sync {
 
            // The way the standard library is currently written, we need to
 
            // send the `tcp_listener` component the command that it should
 
            // listen to for the next connection. This is only one way in which
 
            // the standard library could be written. We could also write it
 
            // such a way such that a separate component buffers new incoming
 
            // connections, such that we only have to `get` from that separate
 
            // component.
 
            //
 
            // Note that when we get such a new connection, (see the
 
            // TcpConnection struct in the standard library), the peers of the
 
            // two ports are already hooked up to a `tcp_client` component, also
 
            // defined in the standard library.
 
            put(listen_cmd_tx, ListenerCmd::Accept);
 
            new_connection = get(listen_conn_rx);
 
        }
 

	
 
        // In any case, now that the code is here, the synchronous round that
 
        // governed receiving the new connection has completed. And so we send
 
        // that connection off to a handler component. In this case we have the
 
        // `echo_machine` component, defined in this file as well.
 
        print("server: spawning an echo'ing component");
 
        new echo_machine(new_connection);
 
        connection_counter += 1;
 
    }
 

	
 
    // When all of the desired connections have been handled, we first await a
 
    // shutdown signal from another component.
 
    print("server: awaiting shutdown signal");
 
    sync auto v = get(shutdown);
 

	
 
    // And once we have received that signal, we'll instruct the listener
 
    // component to shut down.
 
    print("server: shutting down listener");
 
    sync put(listen_cmd_tx, ListenerCmd::Shutdown);
 
}
 

	
 
// This is the component that is spawned by the server component to handle new
 
// connections. All it does is wait for a single incoming TCP packet, where it
 
// expects a single byte of data, and then echo that back to the peer.
 

	
 
comp echo_machine(TcpConnection conn) {
 
    auto data_to_echo = {};
 

	
 
    // Here is where we receive a message from a peer ...
 
    sync {
 
        print("echo: receiving data");
 
        put(conn.tx, ClientCmd::Receive);
 
        data_to_echo = get(conn.rx);
 
        put(conn.tx, ClientCmd::Finish);
 
    }
 

	
 
    // ... and send it right back to our peer.
 
    print("echo: sending back data");
 
    sync put(conn.tx, ClientCmd::Send(data_to_echo));
 

	
 
    // And we ask the `tcp_client` to shut down neatly.
 
    print("echo: shutting down");
 
    sync put(conn.tx, ClientCmd::Shutdown);
 
}
 

	
 
// Here is the component that we will instantiate to connect to the `server`
 
// component above (more specifically, to the `tcp_listener` component
 
// instantiated by the `server`). This is the component that will ask the
 
// `echo_machine` component to echo a byte of data.
 

	
 
comp echo_requester(u8 byte_to_send, out<()> done) {
 
    // We instantiate the `tcp_client` from the standard library. This will
 
    // perform the "connect" call to the `tcp_listener`.
 
    channel cmd_tx -> cmd_rx;
 
    channel data_tx -> data_rx;
 
    new tcp_client({127, 0, 0, 1}, listen_port(), cmd_rx, data_tx);
 

	
 
    // And once we are connected, we send the single byte to the other side.
 
    print("requester: sending bytes");
 
    sync put(cmd_tx, ClientCmd::Send({ byte_to_send }));
 

	
 
    // This sent byte will arrive at the `echo_machine`, which will send it
 
    // right back to us. So here is where we wait for that byte to arrive.
 
    auto received_byte = byte_to_send + 1;
 
    sync {
 
        print("requester: receiving echo response");
 
        put(cmd_tx, ClientCmd::Receive);
 
        received_byte = get(data_rx)[0];
 
        put(cmd_tx, ClientCmd::Finish);
 
    }
 

	
 
    // We make sure that we got back what we sent
 
    while (byte_to_send != received_byte) {
 
        print("requester: Oh no! we got back a byte different from the one we sent");
 
    }
 

	
 
    // And we shut down the TCP connection
 
    print("requester: shutting down TCP component");
 
    sync put(cmd_tx, ClientCmd::Shutdown);
 

	
 
    // And finally we send a signal to another component (the `main` component)
 
    // to let it know we have finished our little protocol.
 
    sync put(done, ());
 
}
 

	
 
// And here the entry point for our program
 

	
 
comp main() {
 
    // Some settings for the example
 
    auto num_connections = 12;
 

	
 
    // We create a new channel that allows us to shut down our server component.
 
    // That channel being created, we can instantiate the server component.
 
    channel shutdown_listener_tx -> shutdown_listener_rx;
 
    new server(num_connections, shutdown_listener_rx);
 

	
 
    // Here we create all the requesters that will ask their peer to echo back
 
    // a particular byte.
 
    auto connection_index = 0;
 
    auto all_done = {};
 
    while (connection_index < num_connections) {
 
        channel done_tx -> done_rx;
 
        new echo_requester(cast(connection_index), done_tx);
 
        connection_index += 1;
 
        all_done @= {done_rx};
 
    }
 

	
 
    // Here our program starts to shut down. First we'll wait until all of our
 
    // requesting components have gotten back the byte they're expecting.
 
    auto counter = 0;
 
    while (counter < length(all_done)) {
 
        print("constructor: waiting for requester to exit");
 
        sync auto v = get(all_done[counter]);
 
        counter += 1;
 
    }
 

	
 
    // And we shut down our server.
 
    print("constructor: instructing listener to exit");
 
    sync put(shutdown_listener_tx, ());
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)