diff --git a/src/protocol/ast.rs b/src/protocol/ast.rs index fce70b5b8456ac9fe9a3a23c835a7130a1b5fc62..c119e5ed087671302601906a25c5e0eff565789d 100644 --- a/src/protocol/ast.rs +++ b/src/protocol/ast.rs @@ -1086,6 +1086,7 @@ pub enum ProcedureSource { // Builtin components, available to user CompRandomU32, // TODO: Remove, temporary thing CompTcpClient, + CompTcpListener, } impl ProcedureSource { @@ -1852,6 +1853,7 @@ pub enum Method { // Builtin component, ComponentRandomU32, ComponentTcpClient, + ComponentTcpListener, // User-defined UserFunction, UserComponent, @@ -1862,7 +1864,7 @@ impl Method { use Method::*; match self { Get | Put | Fires | Create | Length | Assert | Print => true, - ComponentRandomU32 | ComponentTcpClient => true, + ComponentRandomU32 | ComponentTcpClient | ComponentTcpListener => true, _ => false, } } diff --git a/src/protocol/eval/executor.rs b/src/protocol/eval/executor.rs index 3507bdbbb6d77b6868015cda70c68f49b98b634f..982ee2dd826b4b13a850774b7ce1b20b302ce158 100644 --- a/src/protocol/eval/executor.rs +++ b/src/protocol/eval/executor.rs @@ -770,7 +770,7 @@ impl Prompt { }, } }, - Method::ComponentRandomU32 | Method::ComponentTcpClient => { + Method::ComponentRandomU32 | Method::ComponentTcpClient | Method::ComponentTcpListener => { debug_assert_eq!(heap[expr.procedure].parameters.len(), cur_frame.expr_values.len()); debug_assert_eq!(heap[cur_frame.position].as_new().expression, expr.this); }, diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index bde521997b2e5106181ba3f188786982d15f7c09..f628e7c21f64a040f7dea4b0fe7c027f60cb40bc 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -160,6 +160,32 @@ impl ProtocolDescription { }); } + /// Again a somewhat temporary method. Can be used by components to look up + /// the definition of a particular procedure. Intended use is to find the + /// DefinitionId/TypeId of builtin components. + pub(crate) fn find_procedure(&self, module_name: &[u8], proc_name: &[u8]) -> Option<(ProcedureDefinitionId, TypeId)> { + // Lookup type definition in module + let root_id = self.lookup_module_root(module_name)?; + let module = &self.heap[root_id]; + let definition_id = module.get_definition_by_ident(&self.heap, proc_name)?; + let definition = &self.heap[definition_id]; + + // Make sure the procedure is not polymorphic + if !definition.poly_vars().is_empty() { + return None; + } + if !definition.is_procedure() { + return None; + } + + // Lookup in type table + let definition = definition.as_procedure(); + let type_parts = [ConcreteTypePart::Component(definition.this, 0)]; + let type_id = self.types.get_monomorph_type_id(&definition.this.upcast(), &type_parts) + .expect("type ID for non-polymorphic procedure"); + return Some((definition.this, type_id)); + } + fn lookup_module_root(&self, module_name: &[u8]) -> Option { for module in self.modules.iter() { match &module.name { diff --git a/src/protocol/parser/pass_definitions.rs b/src/protocol/parser/pass_definitions.rs index 74ceba8ad07b0c8073a3f030854c00d27b93894a..23c7b2ab8be45b30fdeb1ad82d654cbc0f803963 100644 --- a/src/protocol/parser/pass_definitions.rs +++ b/src/protocol/parser/pass_definitions.rs @@ -378,6 +378,7 @@ impl PassDefinitions { ("std.global", "print") => ProcedureSource::FuncPrint, ("std.random", "random_u32") => ProcedureSource::CompRandomU32, ("std.internet", "tcp_client") => ProcedureSource::CompTcpClient, + ("std.internet", "tcp_listener") => ProcedureSource::CompTcpListener, _ => panic!( "compiler error: unknown builtin procedure '{}' in module '{}'", procedure_name, module_name @@ -1680,6 +1681,7 @@ impl PassDefinitions { ProcedureSource::FuncPrint => Method::Print, ProcedureSource::CompRandomU32 => Method::ComponentRandomU32, ProcedureSource::CompTcpClient => Method::ComponentTcpClient, + ProcedureSource::CompTcpListener => Method::ComponentTcpListener, _ => todo!("other procedure sources"), }; diff --git a/src/protocol/parser/pass_validation_linking.rs b/src/protocol/parser/pass_validation_linking.rs index 79cd33511254851af711a18539f2a37dcd5c5aac..724ed5f0f244e47449660d821740d0d23faa2c78 100644 --- a/src/protocol/parser/pass_validation_linking.rs +++ b/src/protocol/parser/pass_validation_linking.rs @@ -1158,7 +1158,8 @@ impl Visitor for PassValidationLinking { | Method::SelectRegisterCasePort | Method::SelectWait => unreachable!(), // not usable by programmer directly Method::ComponentRandomU32 - | Method::ComponentTcpClient => { + | Method::ComponentTcpClient + | Method::ComponentTcpListener => { expecting_wrapping_new_stmt = true; }, Method::UserFunction => {} diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index b0dcb7df048c83f40ec96f54b1d9ab7b7e817eb9..05afe71f5e22a5443118a142d3c7f0b74c469540 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -241,6 +241,7 @@ pub(crate) fn create_component( let component: Box = match definition.source { ProcedureSource::CompRandomU32 => Box::new(ComponentRandomU32::new(arguments)), ProcedureSource::CompTcpClient => Box::new(ComponentTcpClient::new(arguments)), + ProcedureSource::CompTcpListener => Box::new(ComponentTcpListener::new(arguments)), _ => unreachable!(), }; diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index 5656f68fddc3c037ed64027f389f582fb509a08d..f57eb821a93f58b66d8c991e5add2d4b4bbf1540 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -8,8 +8,10 @@ use super::component::{self, *}; use super::control_layer::*; use super::consensus::*; + use std::io::ErrorKind as IoErrorKind; use std::net::{IpAddr, Ipv4Addr}; +use crate::protocol::{ProcedureDefinitionId, TypeId}; // ----------------------------------------------------------------------------- // ComponentTcpClient @@ -413,43 +415,32 @@ impl ListenerSocketState { } } +struct PendingComponent { + client: SocketTcpClient, + cmd_rx: PortId, + data_tx: PortId, +} + enum ListenerSyncState { AwaitingCmd, - AcceptCommandReceived, - AcceptChannelGenerated{ client: SocketTcpClient, cmd_rx: PortId, data_tx: PortId }, + AcceptCommandReceived, // just received `Accept` command + AcceptChannelGenerated, // created channel, waiting to end the sync round + AcceptGenerateComponent, // sync ended, back in non-sync, now generate component FinishSyncThenQuit, } -impl ListenerSyncState { - // Bit of a hacky solution: keeps the listener sync state intact, except - // if it is `AcceptChannelGenerated`, that will be replaced with - // `AwaitingCmd`. The reason is to move the `client` out. - fn take(&mut self) -> ListenerSyncState { - use ListenerSyncState::*; - - match self { - AwaitingCmd => return AwaitingCmd, - AcceptCommandReceived => return AcceptCommandReceived, - FinishSyncThenQuit => return FinishSyncThenQuit, - AcceptChannelGenerated{ .. } => { - let mut swapped = ListenerSyncState::AwaitingCmd; - std::mem::swap(self, &mut swapped); - return swapped; - } - } - } -} - pub struct ComponentTcpListener { // Properties for the tcp socket socket_state: ListenerSocketState, sync_state: ListenerSyncState, + pending_component: Option, poll_ticket: Option, inbox_main: InboxMain, inbox_backup: InboxBackup, pdl_input_port_id: PortId, // input port, receives commands pdl_output_port_id: PortId, // output port, sends connections - // Information about union tags + // Type information extracted from protocol + tcp_client_definition: (ProcedureDefinitionId, TypeId), input_union_accept_tag: i64, input_union_shutdown_tag: i64, output_struct_rx_index: usize, @@ -503,7 +494,9 @@ impl Component for ComponentTcpListener { fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { match message { - Message::Data(_message) => unreachable!(), + Message::Data(message) => { + self.handle_incoming_data_message(sched_ctx, comp_ctx, message); + }, Message::Sync(message) => { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); @@ -523,7 +516,7 @@ impl Component for ComponentTcpListener { } fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling { - sched_ctx.info(&format!("Running comonent ComponentTcpListener (mode: {:?})", self.exec_state.mode)); + sched_ctx.info(&format!("Running component ComponentTcpListener (mode: {:?})", self.exec_state.mode)); match self.exec_state.mode { CompMode::BlockedSelect @@ -536,21 +529,25 @@ impl Component for ComponentTcpListener { CompMode::NonSync => { match &self.socket_state { ListenerSocketState::Connected(_socket) => { - match self.sync_state.take() { + match self.sync_state { ListenerSyncState::AwaitingCmd => { component::default_handle_sync_start( &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus ); }, - ListenerSyncState::AcceptCommandReceived => unreachable!(), - ListenerSyncState::AcceptChannelGenerated{ client, cmd_rx, data_tx } => { + ListenerSyncState::AcceptCommandReceived | + ListenerSyncState::AcceptChannelGenerated => unreachable!(), + ListenerSyncState::AcceptGenerateComponent => { // Now that we're outside the sync round, create the tcp client // component - let socket_component: Box = Box::new(ComponentTcpClient::new_with_existing_connection(client, cmd_rx, data_tx)); + let pending = self.pending_component.take().unwrap(); + let socket_component: Box = Box::new(ComponentTcpClient::new_with_existing_connection( + pending.client, pending.cmd_rx, pending.data_tx + )); component::special_create_component( &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control, &mut self.inbox_main, &mut self.inbox_backup, socket_component, - vec![cmd_rx, data_tx] + vec![pending.cmd_rx, pending.data_tx] ); self.sync_state = ListenerSyncState::AwaitingCmd; // superfluous, see ListenerSyncState.take() }, @@ -619,11 +616,11 @@ impl Component for ComponentTcpListener { // Construct the message containing the appropriate ports that will // be sent to the component commanding this listener. - let mut values = ValueGroup::new_stack(vec![ - Value::Unassigned, Value::Unassigned - ]); - values.values[self.output_struct_tx_index] = Value::Output(port_id_to_eval(cmd_channel.putter_id)); - values.values[self.output_struct_rx_index] = Value::Input(port_id_to_eval(data_channel.getter_id)); + let mut values = ValueGroup::new_stack(Vec::with_capacity(1)); + values.values.push(Value::Struct(0)); + values.regions.push(vec![Value::Unassigned, Value::Unassigned]); + values.regions[0][self.output_struct_tx_index] = Value::Output(port_id_to_eval(cmd_channel.putter_id)); + values.regions[0][self.output_struct_rx_index] = Value::Input(port_id_to_eval(data_channel.getter_id)); if let Err(location_and_message) = component::default_send_data_message( &mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, values, sched_ctx, &mut self.consensus, &mut self.control, comp_ctx @@ -633,17 +630,15 @@ impl Component for ComponentTcpListener { ); } - // And finish the consensus round - component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); - - // Enter a state such that when we leave the consensus round and - // go back to the nonsync state, that we will actually create the - // tcp client component. - self.sync_state = ListenerSyncState::AcceptChannelGenerated { + // Prepare for finishing the consensus round, and once finished, + // create the tcp client component + self.sync_state = ListenerSyncState::AcceptChannelGenerated; + debug_assert!(self.pending_component.is_none()); + self.pending_component = Some(PendingComponent{ client, cmd_rx: cmd_channel.getter_id, data_tx: data_channel.putter_id - }; + }); return CompScheduling::Requeue; }, @@ -656,11 +651,16 @@ impl Component for ComponentTcpListener { } } }, - ListenerSyncState::AcceptChannelGenerated{ .. } => unreachable!(), - ListenerSyncState::FinishSyncThenQuit => { + ListenerSyncState::AcceptChannelGenerated => { component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); + self.sync_state = ListenerSyncState::AcceptGenerateComponent; return CompScheduling::Requeue; } + ListenerSyncState::FinishSyncThenQuit => { + component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); + return CompScheduling::Requeue; + }, + ListenerSyncState::AcceptGenerateComponent => unreachable!(), } }, CompMode::BlockedGet => { @@ -695,6 +695,7 @@ impl ComponentTcpListener { return Self { socket_state: ListenerSocketState::Connected(socket.unwrap()), sync_state: ListenerSyncState::AwaitingCmd, + pending_component: None, poll_ticket: None, inbox_main: vec![None, None], inbox_backup: InboxBackup::new(), @@ -709,6 +710,21 @@ impl ComponentTcpListener { consensus: Consensus::new(), } } + + fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) { + if self.exec_state.mode.is_in_sync_block() { + self.consensus.handle_incoming_data_message(comp_ctx, &message); + } + + match component::default_handle_incoming_data_message( + &mut self.exec_state, &mut self.inbox_main, comp_ctx, message, sched_ctx, &mut self.control + ) { + IncomingData::PlacedInSlot => {}, + IncomingData::SlotFull(message) => { + self.inbox_backup.push(message); + } + } + } } fn ip_addr_and_port_from_args( diff --git a/src/runtime2/tests/internet.rs b/src/runtime2/tests/internet.rs index 246726d62cd982b04b85ee5f814e15aff5203926..4859ca49ef963f54db2e9dbf0140f0707c4fe145 100644 --- a/src/runtime2/tests/internet.rs +++ b/src/runtime2/tests/internet.rs @@ -71,4 +71,108 @@ fn test_stdlib_file() { new fake_client(connection); } ", "constructor", no_args()); +} + +#[test] +fn test_tcp_listener_and_client() { + compile_and_create_component(" + import std.internet::*; + + func listen_port() -> u16 { + return 2393; + } + + comp server(u32 num_connections) { + // Start tcp listener + channel listen_cmd_tx -> listen_cmd_rx; + channel listen_conn_tx -> listen_conn_rx; + new tcp_listener({}, listen_port(), listen_cmd_rx, listen_conn_tx); + + // Fake channels such that we can create a dummy connection variable + channel client_cmd_tx -> unused_client_cmd_rx; + channel unused_client_data_tx -> client_data_rx; + auto new_connection = TcpConnection{ + tx: client_cmd_tx, + rx: client_data_rx, + }; + + auto connection_counter = 0; + while (connection_counter < num_connections) { + // Wait until we get a connection + print(\"server: waiting for an accepted connection\"); + sync { + put(listen_cmd_tx, ListenerCmd::Accept); + new_connection = get(listen_conn_rx); + } + + // We have a new connection, spawn an 'echoer' for it + print(\"server: spawning an echo'ing component\"); + new echo_machine(new_connection); + connection_counter += 1; + } + + // Shut down the listener + print(\"server: shutting down listener\"); + } + + // Waits for a single TCP byte (to simplify potentially having to + // concatenate requests) and echos it + comp echo_machine(TcpConnection conn) { + auto data_to_echo = {}; + + // Wait for a message + sync { + print(\"echo: receiving data\"); + put(conn.tx, ClientCmd::Receive); + data_to_echo = get(conn.rx); + put(conn.tx, ClientCmd::Finish); + } + + // Echo the message + print(\"echo: sending back data\"); + sync put(conn.tx, ClientCmd::Send(data_to_echo)); + + // Ask the tcp connection to shut down + print(\"echo: shutting down\"); + sync put(conn.tx, ClientCmd::Shutdown); + } + + comp echo_requester(u8 byte_to_send) { + channel cmd_tx -> cmd_rx; + channel data_tx -> data_rx; + new tcp_client({127, 0, 0, 1}, listen_port(), cmd_rx, data_tx); + + // Send the message + print(\"requester: sending bytes\"); + sync put(cmd_tx, ClientCmd::Send({ byte_to_send })); + + // Receive the echo'd byte + auto received_byte = byte_to_send + 1; + sync { + print(\"requester: receiving echo response\"); + put(cmd_tx, ClientCmd::Receive); + received_byte = get(data_rx)[0]; + put(cmd_tx, ClientCmd::Finish); + } + + // Silly check, as always + while (byte_to_send != received_byte) { + print(\"requester: Oh no! The echo is an otherworldly distorter\"); + } + + // Shut down the TCP connection + print(\"requester: shutting down TCP component\"); + sync put(cmd_tx, ClientCmd::Shutdown); + } + + comp constructor() { + auto num_connections = 1; + new server(num_connections); + + auto connection_index = 0; + while (connection_index < num_connections) { + new echo_requester(cast(connection_index)); + } + } + ", "constructor", no_args()); } \ No newline at end of file diff --git a/std/std.internet.pdl b/std/std.internet.pdl index 39a900e563f33ffb9fe2e72d1ab6b94bef02b5e9..6d5d355696eba8591b4f7fb5d8ce21971eccc4a9 100644 --- a/std/std.internet.pdl +++ b/std/std.internet.pdl @@ -21,6 +21,6 @@ struct TcpConnection { in rx, } -/* comp tcp_listener(u8[] ip, u16 port, in cmds, out rx) { +comp tcp_listener(u8[] ip, u16 port, in cmds, out rx) { #builtin -} */ \ No newline at end of file +} \ No newline at end of file