diff --git a/src/protocol/ast.rs b/src/protocol/ast.rs index 11be68c6ee6f77771ad26070ee6019a0be8c6bbb..87ea2bb9e079afc72d907ac79c37d0a0b9b90ad2 100644 --- a/src/protocol/ast.rs +++ b/src/protocol/ast.rs @@ -1086,6 +1086,7 @@ pub enum ProcedureSource { FuncSelectWait, // Builtin components, available to user CompRandomU32, // TODO: Remove, temporary thing + CompTcpClient, } impl ProcedureSource { @@ -1852,6 +1853,7 @@ pub enum Method { SelectWait, // SelectWait() -> u32 // Builtin component, ComponentRandomU32, + ComponentTcpClient, // User-defined UserFunction, UserComponent, @@ -1862,7 +1864,7 @@ impl Method { use Method::*; match self { Get | Put | Fires | Create | Length | Assert | Print => true, - ComponentRandomU32 => true, + ComponentRandomU32 | ComponentTcpClient => true, _ => false, } } diff --git a/src/protocol/eval/executor.rs b/src/protocol/eval/executor.rs index ee304a55bd6a62074c4d83466a944773f3480ab6..55ee38824b1f59d398d9a2d3873e956cdf4c8c18 100644 --- a/src/protocol/eval/executor.rs +++ b/src/protocol/eval/executor.rs @@ -755,7 +755,7 @@ impl Prompt { }, } }, - Method::ComponentRandomU32 => { + Method::ComponentRandomU32 | Method::ComponentTcpClient => { debug_assert_eq!(heap[expr.procedure].parameters.len(), cur_frame.expr_values.len()); debug_assert_eq!(heap[cur_frame.position].as_new().expression, expr.this); }, diff --git a/src/protocol/parser/mod.rs b/src/protocol/parser/mod.rs index 3d4bcba9ff7d17d06a067db4fd631bfc28b8e594..1d2eee5bc1b2683818d37d490b6d745636b034f2 100644 --- a/src/protocol/parser/mod.rs +++ b/src/protocol/parser/mod.rs @@ -298,8 +298,9 @@ impl Parser { use std::fs; // Pair is (name, add_to_global_namespace) - const FILES: [(&'static str, bool); 2] = [ + const FILES: [(&'static str, bool); 3] = [ ("std.global.pdl", true), + ("std.internet.pdl", false), ("std.random.pdl", false), ]; diff --git a/src/protocol/parser/pass_definitions.rs b/src/protocol/parser/pass_definitions.rs index 17b71cecd9f6981c66bf39a4775d6a55a9f83c6d..e8c8b43705f8d4329dee6d197286f3467dec7a90 100644 --- a/src/protocol/parser/pass_definitions.rs +++ b/src/protocol/parser/pass_definitions.rs @@ -377,6 +377,7 @@ impl PassDefinitions { ("std.global", "assert") => ProcedureSource::FuncAssert, ("std.global", "print") => ProcedureSource::FuncPrint, ("std.random", "random_u32") => ProcedureSource::CompRandomU32, + ("std.internet", "tcp_client") => ProcedureSource::CompTcpClient, _ => panic!( "compiler error: unknown builtin procedure '{}' in module '{}'", procedure_name, module_name @@ -1658,6 +1659,7 @@ impl PassDefinitions { ProcedureSource::FuncAssert => Method::Assert, ProcedureSource::FuncPrint => Method::Print, ProcedureSource::CompRandomU32 => Method::ComponentRandomU32, + ProcedureSource::CompTcpClient => Method::ComponentTcpClient, _ => todo!("other procedure sources"), }; diff --git a/src/protocol/parser/pass_validation_linking.rs b/src/protocol/parser/pass_validation_linking.rs index 1a2b63139abc5a5e5174f9a7b0825ca370d506c7..5456872357ce071f7b357bd86b4a1440af4d63c9 100644 --- a/src/protocol/parser/pass_validation_linking.rs +++ b/src/protocol/parser/pass_validation_linking.rs @@ -1160,7 +1160,8 @@ impl Visitor for PassValidationLinking { Method::SelectStart | Method::SelectRegisterCasePort | Method::SelectWait => unreachable!(), // not usable by programmer directly - Method::ComponentRandomU32 => { + Method::ComponentRandomU32 + | Method::ComponentTcpClient => { expecting_wrapping_new_stmt = true; }, Method::UserFunction => {} diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index a18a96f7a77b3b69cd72e287478626649fb3b423..828421de9704ad6aa50d13a770b5ba0fe6984cbd 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -3,9 +3,10 @@ use crate::protocol::*; use crate::runtime2::*; use crate::runtime2::communication::*; -use super::{CompCtx, CompPDL}; +use super::{CompCtx, CompPDL, CompId}; use super::component_context::*; use super::component_random::*; +use super::component_internet::*; use super::control_layer::*; use super::consensus::*; @@ -19,7 +20,7 @@ pub enum CompScheduling { /// Generic representation of a component (as viewed by a scheduler). pub(crate) trait Component { /// Called upon the creation of the component. - fn on_creation(&mut self, sched_ctx: &SchedulerCtx); + fn on_creation(&mut self, comp_id: CompId, sched_ctx: &SchedulerCtx); /// Called if the component is created by another component and the messages /// are being transferred between the two. @@ -116,8 +117,9 @@ pub(crate) fn create_component( if definition.source.is_builtin() { // Builtin component - let component = match definition.source { + let component: Box = match definition.source { ProcedureSource::CompRandomU32 => Box::new(ComponentRandomU32::new(arguments)), + ProcedureSource::CompTcpClient => Box::new(ComponentTcpClient::new(arguments)), _ => unreachable!(), }; diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index 2873ce68b0775c786311f1ab12314c9fcb512d95..710de32baa0fc6e0f1c11e39b7b6bf67bf87b200 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -1,7 +1,8 @@ use crate::protocol::eval::{ValueGroup, Value, EvalError}; use crate::runtime2::*; -use crate::runtime2::component::CompCtx; +use crate::runtime2::component::{CompCtx, CompId}; use crate::runtime2::stdlib::internet::*; +use crate::runtime2::poll::*; use super::component::{self, *}; use super::control_layer::*; @@ -38,6 +39,7 @@ pub struct ComponentTcpClient { // Properties for the tcp socket socket_state: SocketState, sync_state: SyncState, + poll_ticket: Option, inbox_main: Option, inbox_backup: Vec, pdl_input_port_id: PortId, // input from PDL, so transmitted over socket @@ -45,6 +47,7 @@ pub struct ComponentTcpClient { input_union_send_tag_value: i64, input_union_receive_tag_value: i64, input_union_finish_tag_value: i64, + input_union_shutdown_tag_value: i64, // Generic component state exec_state: CompExecState, control: ControlLayer, @@ -54,7 +57,8 @@ pub struct ComponentTcpClient { } impl Component for ComponentTcpClient { - fn on_creation(&mut self, sched_ctx: &SchedulerCtx) { + fn on_creation(&mut self, id: CompId, sched_ctx: &SchedulerCtx) { + // Retrieve type information for messages we're going to receive let pd = &sched_ctx.runtime.protocol; let cmd_type = pd.find_type(b"std.internet", b"Cmd") .expect("'Cmd' type in the 'std.internet' module"); @@ -64,6 +68,17 @@ impl Component for ComponentTcpClient { self.input_union_send_tag_value = cmd_type.get_variant_tag_value(b"Send").unwrap(); self.input_union_receive_tag_value = cmd_type.get_variant_tag_value(b"Receive").unwrap(); self.input_union_finish_tag_value = cmd_type.get_variant_tag_value(b"Finish").unwrap(); + self.input_union_shutdown_tag_value = cmd_type.get_variant_tag_value(b"Shutdown").unwrap(); + + // Register socket for async events + if let SocketState::Connected(socket) = &self.socket_state { + let self_handle = sched_ctx.runtime.get_component_public(id); + let poll_ticket = sched_ctx.polling.register(socket, self_handle, true, true) + .expect("registering tcp component"); + + debug_assert!(self.poll_ticket.is_none()); + self.poll_ticket = Some(poll_ticket); + } } fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) { @@ -150,6 +165,9 @@ impl Component for ComponentTcpClient { // Component requires us to end the sync round let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + } else if tag_value == self.input_union_shutdown_tag_value { + // Component wants to close the connection + todo!("implement clean shutdown, don't forget to unregister to poll ticket"); } } else { todo!("handle sync failure due to message deadlock"); @@ -268,11 +286,13 @@ impl ComponentTcpClient { return Self{ socket_state: SocketState::Connected(socket.unwrap()), sync_state: SyncState::AwaitingCmd, + poll_ticket: None, inbox_main: None, inbox_backup: Vec::new(), input_union_send_tag_value: -1, input_union_receive_tag_value: -1, input_union_finish_tag_value: -1, + input_union_shutdown_tag_value: -1, pdl_input_port_id: input_port, pdl_output_port_id: output_port, exec_state: CompExecState::new(), diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index a18aaab3c7e49842356fd65651692255b7743501..338d05bde302f39e2d796e59f12173102be2b09d 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -7,6 +7,7 @@ use crate::protocol::eval::{ EvalContinuation, EvalResult, EvalError }; +use crate::runtime2::runtime::CompId; use crate::runtime2::scheduler::SchedulerCtx; use crate::runtime2::communication::*; @@ -222,7 +223,7 @@ pub(crate) struct CompPDL { } impl Component for CompPDL { - fn on_creation(&mut self, _sched_ctx: &SchedulerCtx) { + fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) { // Intentionally empty } @@ -591,6 +592,12 @@ impl CompPDL { // Handling ports // ------------------------------------------------------------------------- + /// Creates a new component and transfers ports. Because of the stepwise + /// process in which memory is allocated, ports are transferred, messages + /// are exchanged, component lifecycle methods are called, etc. This + /// function facilitates a lot of implicit assumptions (e.g. when the + /// `Component::on_creation` method is called, the component is already + /// registered at the runtime). fn create_component_and_transfer_ports( &mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, @@ -709,7 +716,7 @@ impl CompPDL { let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component( reservation, component, created_ctx, false, ); - component.component.on_creation(sched_ctx); + component.component.on_creation(created_key.downgrade(), sched_ctx); // Now modify the creator's ports: remove every transferred port and // potentially remove the peer component. diff --git a/src/runtime2/component/component_random.rs b/src/runtime2/component/component_random.rs index 2bba8b26583bd3d39d59adfd39f1b83ebb2b3857..1e68695ba53e3d240857ae240639883b216ea05b 100644 --- a/src/runtime2/component/component_random.rs +++ b/src/runtime2/component/component_random.rs @@ -27,7 +27,7 @@ pub struct ComponentRandomU32 { } impl Component for ComponentRandomU32 { - fn on_creation(&mut self, _sched_ctx: &SchedulerCtx) { + fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) { } fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) { diff --git a/src/runtime2/poll/mod.rs b/src/runtime2/poll/mod.rs index 9337ff6f957b0d4e6e92f0b79505b274be530583..75cf8e9a90c00b6f9d785db8c8ce75221c69f0a3 100644 --- a/src/runtime2/poll/mod.rs +++ b/src/runtime2/poll/mod.rs @@ -295,7 +295,7 @@ pub(crate) struct PollingClient { } impl PollingClient { - fn register(&self, entity: F, handle: CompHandle, read: bool, write: bool) -> Result { + pub(crate) fn register(&self, entity: &F, handle: CompHandle, read: bool, write: bool) -> Result { let generation = self.generation_counter.fetch_add(1, Ordering::Relaxed); let user_data = user_data_for_component(handle.id().0, generation); self.queue.push(PollCmd::Register(handle, user_data)); @@ -307,7 +307,7 @@ impl PollingClient { return Ok(PollTicket(file_descriptor, user_data.0)); } - fn unregister(&self, ticket: PollTicket) -> Result<(), RtError> { + pub(crate) fn unregister(&self, ticket: PollTicket) -> Result<(), RtError> { let file_descriptor = ticket.0; let user_data = UserData(ticket.1); self.queue.push(PollCmd::Unregister(file_descriptor, user_data)); diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 2c68998ea2f78728e3938f6aee435a313405c14c..80865946a82159b688217f614d29ba7a73311f6c 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -306,12 +306,15 @@ impl RuntimeInner { return CompHandle::new(id, &component.public); } + /// Will remove a component and its memory from the runtime. May only be + /// called if the necessary conditions for destruction have been met. pub(crate) fn destroy_component(&self, key: CompKey) { dbg_code!({ let component = self.get_component(key); debug_assert!(component.exiting); debug_assert_eq!(component.public.num_handles.load(Ordering::Acquire), 0); }); + self.decrement_active_components(); self.components.destroy(key.0); } diff --git a/src/runtime2/stdlib/internet.rs b/src/runtime2/stdlib/internet.rs index bb418c8ffb046d5ff9546bc8164832d58bdf2cae..a3d150a9efea7e821564ec0521e37461918cb2dc 100644 --- a/src/runtime2/stdlib/internet.rs +++ b/src/runtime2/stdlib/internet.rs @@ -9,6 +9,8 @@ use libc::{ }; use mio::{event, Interest, Registry, Token}; +use crate::runtime2::poll::{AsFileDescriptor, FileDescriptor}; + #[derive(Debug)] pub enum SocketError { Opening, @@ -83,6 +85,12 @@ impl Drop for SocketTcpClient { } } +impl AsFileDescriptor for SocketTcpClient { + fn as_file_descriptor(&self) -> FileDescriptor { + return self.socket_handle; + } +} + /// Raw socket receiver. Essentially a listener that accepts a single connection struct SocketRawRx { listen_handle: c_int, @@ -309,28 +317,4 @@ fn socket_family_from_ip(ip: IpAddr) -> libc::c_int { #[inline] fn htons(port: u16) -> u16 { return port.to_be(); -} - -mod tests { - use std::net::*; - use super::*; - - // #[test] @nocommit Remove this - // fn test_inet_thingo() { - // const SIZE: usize = 1024; - // - // let s = SocketTcpClient::new(IpAddr::V4(Ipv4Addr::new(142, 250, 179, 163)), 80).expect("connect"); - // s.send(b"GET / HTTP/1.1\r\n\r\n").expect("sending"); - // let mut total = Vec::::new(); - // let mut buffer = [0; SIZE]; - // let mut received = SIZE; - // - // while received > 0 { - // received = s.receive(&mut buffer).expect("receiving"); - // println!("DEBUG: Received {} bytes", received); - // total.extend_from_slice(&buffer[..received]); - // } - // let as_str = String::from_utf8_lossy(total.as_slice()); - // println!("Yay! Got {} bytes:\n{}", as_str.len(), as_str); - // } } \ No newline at end of file diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index e9b3ec0cf3d9df25958d38610dc9bbc125527af9..b795bc4b6387425f1b393d2029784e0561618d45 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -246,4 +246,4 @@ fn test_random_u32_temporary_thingo() { ").expect("compilation"); let rt = Runtime::new(1, true, pd).unwrap(); create_component(&rt, "", "constructor", no_args()); -} \ No newline at end of file +} diff --git a/std/std.internet.pdl b/std/std.internet.pdl index fa94ba6f198e11b59fa2ad744d3904bed4d79fc6..0d8d360f6e30a20342ad847e8797d2ccda555dc3 100644 --- a/std/std.internet.pdl +++ b/std/std.internet.pdl @@ -4,6 +4,7 @@ union Cmd { Send(u8[]), Receive, Finish, + Shutdown, } primitive tcp_client(u8[] ip, u16 port, in cmds, out rx) {