diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs
index 135b41875ecc54b9a89055b13c4eae4e40856b2c..506699b8b80d90f0aa92e3164f83e04d1d606777 100644
--- a/src/runtime2/component/component.rs
+++ b/src/runtime2/component/component.rs
@@ -5,7 +5,7 @@ use crate::runtime2::communication::*;
 
 use super::{CompCtx, CompPDL};
 use super::component_context::*;
-use super::component_ip::*;
+use super::component_random::*;
 use super::control_layer::*;
 use super::consensus::*;
 
@@ -266,6 +266,31 @@ pub(crate) fn default_handle_busy_exit(
     }
 }
 
+/// Handles a potential synchronous round decision. If there was a decision then
+/// the `Some(success)` value indicates whether the round succeeded or not.
+pub(crate) fn default_handle_sync_decision(
+    exec_state: &mut CompExecState, decision: SyncRoundDecision,
+    consensus: &mut Consensus
+) -> Option<bool> {
+    debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
+    let success = match decision {
+        SyncRoundDecision::None => return None,
+        SyncRoundDecision::Solution => true,
+        SyncRoundDecision::Failure => false,
+    };
+
+    if success {
+        exec_state.mode = CompMode::NonSync;
+        consensus.notify_sync_decision(decision);
+        return Some(true);
+    } else {
+        exec_state.mode = CompMode::StartExit;
+        return Some(false);
+    }
+}
+
 #[inline]
 pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
     debug_assert_eq!(_exec_state.mode, CompMode::Exit);
@@ -357,7 +382,6 @@ fn default_handle_unblock_put(
     }
 }
 
-
 #[inline]
 pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId {
     return PortId(port_id.id);
diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs
new file mode 100644
index 0000000000000000000000000000000000000000..89642f966501369bc59216c2aea94d3a4f2a8d7a
--- /dev/null
+++ b/src/runtime2/component/component_internet.rs
@@ -0,0 +1,151 @@
+use crate::protocol::eval::{ValueGroup, Value, EvalError};
+use crate::runtime2::*;
+use crate::runtime2::component::CompCtx;
+use crate::runtime2::stdlib::internet::*;
+
+use super::component::{self, *};
+use super::control_layer::*;
+use super::consensus::*;
+
+enum SocketState {
+    Connected(SocketTcpClient),
+    Error,
+}
+
+enum SyncState {
+    Getting,
+    Putting,
+}
+
+pub struct ComponentTcpClient {
+    // Properties for the tcp socket
+    socket_state: SocketState,
+    pending_recv: Vec<DataMessage>, // on the input port
+    pdl_input_port_id: PortId,  // input from PDL, so transmitted over socket
+    pdl_output_port_id: PortId, // output towards PDL, so received over socket
+    // Generic component state
+    exec_state: CompExecState,
+    control: ControlLayer,
+    consensus: Consensus,
+}
+
+impl Component for ComponentTcpClient {
+    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) {
+        self.handle_incoming_data_message(message);
+    }
+
+    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
+        match message {
+            Message::Data(message) => {
+                self.handle_incoming_data_message(message);
+            },
+            Message::Sync(message) => {
+                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
+                component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
+            },
+            Message::Control(message) => {
+                component::default_handle_control_message(
+                    &mut self.exec_state, &mut self.control, &mut self.consensus,
+                    message, sched_ctx, comp_ctx
+                );
+            }
+        }
+    }
+
+    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
+        sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?})", self.exec_state.mode));
+
+        match self.exec_state.mode {
+            CompMode::BlockedGet | CompMode::BlockedSelect => {
+                // impossible for this component. We always accept the input
+                // values, and we never perform an explicit select.
+                unreachable!();
+            },
+            CompMode::NonSync => {
+                // When in non-sync mode
+                match &mut self.socket_state {
+                    SocketState::Connected(_socket) => {
+                        if !self.pending_recv.is_empty() {
+                            todo!("transmit pending data over the socket");
+                        }
+                    },
+                    SocketState::Error => {
+                        self.exec_state.mode = CompMode::StartExit;
+                        return Ok(CompScheduling::Immediate);
+                    }
+                }
+            },
+            CompMode::Sync => {
+            },
+            CompMode::SyncEnd | CompMode::BlockedPut =>
+                return Ok(CompScheduling::Sleep),
+            CompMode::StartExit =>
+                return Ok(component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx)),
+            CompMode::BusyExit =>
+                return Ok(component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx)),
+            CompMode::Exit =>
+                return Ok(component::default_handle_exit(&self.exec_state)),
+        }
+
+        return Ok(CompScheduling::Immediate);
+    }
+}
+
+impl ComponentTcpClient {
+    pub(crate) fn new(arguments: ValueGroup) -> Self {
+        use std::net::{IpAddr, Ipv4Addr};
+
+        debug_assert_eq!(arguments.values.len(), 4);
+
+        // Parsing arguments
+        let ip_heap_pos = arguments.values[0].as_array();
+        let ip_elements = &arguments.regions[ip_heap_pos as usize];
+        if ip_elements.len() != 4 {
+            todo!("friendly error reporting: ip must contain 4 octets");
+        }
+        let ip_address = IpAddr::V4(Ipv4Addr::new(
+            ip_elements[0].as_uint8(), ip_elements[1].as_uint8(),
+            ip_elements[2].as_uint8(), ip_elements[3].as_uint8()
+        ));
+
+        let port = arguments.values[1].as_uint16();
+        let input_port = component::port_id_from_eval(arguments.values[2].as_input());
+        let output_port = component::port_id_from_eval(arguments.values[3].as_output());
+
+        let socket = SocketTcpClient::new(ip_address, port);
+        if let Err(socket) = socket {
+            todo!("friendly error reporting: failed to open socket {:?}", socket);
+        }
+
+        return Self{
+            socket_state: SocketState::Connected(socket.unwrap()),
+            pending_recv: Vec::new(),
+            pdl_input_port_id: input_port,
+            pdl_output_port_id: output_port,
+            exec_state: CompExecState::new(),
+            control: ControlLayer::default(),
+            consensus: Consensus::new(),
+        }
+    }
+
+    // Handles incoming data from the PDL side (hence, going into the socket)
+    fn handle_incoming_data_message(&mut self, message: DataMessage) {
+        // Input message is an array of bytes (u8)
+        self.pending_recv.push(message);
+    }
+
+    fn data_message_to_bytes(&self, message: DataMessage, bytes: &mut Vec<u8>) {
+        debug_assert_eq!(message.data_header.target_port, self.pdl_input_port_id);
+        debug_assert_eq!(message.content.values.len(), 1);
+
+        if let Value::Array(array_pos) = message.content.values[0] {
+            let region = &message.content.regions[array_pos as usize];
+            bytes.reserve(region.len());
+            for value in region {
+                bytes.push(value.as_uint8());
+            }
+        } else {
+            unreachable!();
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs
index c48b57a00d32408e51cf6dabd5eff7600a550d0c..fcd9a8fe33a342cbaed4babdfef2d85ad7512f25 100644
--- a/src/runtime2/component/component_pdl.rs
+++ b/src/runtime2/component/component_pdl.rs
@@ -235,7 +235,7 @@ impl Component for CompPDL {
     fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
         sched_ctx.log(&format!("handling message: {:#?}", message));
         if let Some(new_target) = self.control.should_reroute(&mut message) {
-            let mut target = sched_ctx.runtime.get_component_public(new_target);
+            let mut target = sched_ctx.runtime.get_component_public(new_target); // TODO: @NoDirectHandle
             target.send_message(&sched_ctx.runtime, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
             let _should_remove = target.decrement_users();
             debug_assert!(_should_remove.is_none());
diff --git a/src/runtime2/component/component_ip.rs b/src/runtime2/component/component_random.rs
similarity index 90%
rename from src/runtime2/component/component_ip.rs
rename to src/runtime2/component/component_random.rs
index 459110cf79312dbd28e6873288e07476d2bd83fc..56ffe495972a8f55b68e4927b38b07a945fa01cc 100644
--- a/src/runtime2/component/component_ip.rs
+++ b/src/runtime2/component/component_random.rs
@@ -38,7 +38,7 @@ impl Component for ComponentRandomU32 {
             Message::Data(_message) => unreachable!(),
             Message::Sync(message) => {
                 let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
-                self.handle_sync_decision(sched_ctx, comp_ctx, decision);
+                component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
             },
             Message::Control(message) => {
                 component::default_handle_control_message(
@@ -120,7 +120,7 @@ impl Component for ComponentRandomU32 {
                     sched_ctx.log("Waiting for consensus");
                     self.exec_state.mode = CompMode::SyncEnd;
                     let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
-                    self.handle_sync_decision(sched_ctx, comp_ctx, decision);
+                    component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
                     return Ok(CompScheduling::Requeue);
                 }
             },
@@ -158,20 +158,4 @@ impl ComponentRandomU32 {
             consensus: Consensus::new(),
         }
     }
-
-    fn handle_sync_decision(&mut self, _sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
-        let success = match decision {
-            SyncRoundDecision::None => return,
-            SyncRoundDecision::Solution => true,
-            SyncRoundDecision::Failure => false,
-        };
-
-        debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
-        if success {
-            self.exec_state.mode = CompMode::NonSync;
-            self.consensus.notify_sync_decision(decision);
-        } else {
-            self.exec_state.mode = CompMode::StartExit;
-        }
-    }
 }
\ No newline at end of file
diff --git a/src/runtime2/component/mod.rs b/src/runtime2/component/mod.rs
index 71745de1d588b5c2479bd851d1c6e6ef99f05f24..e0c03a618ed4196164f14d4721d4cd67ac5da257 100644
--- a/src/runtime2/component/mod.rs
+++ b/src/runtime2/component/mod.rs
@@ -3,7 +3,8 @@ mod component_context;
 mod control_layer;
 mod consensus;
 mod component;
-mod component_ip;
+mod component_random;
+mod component_internet;
 
 pub(crate) use component::{Component, CompScheduling};
 pub(crate) use component_pdl::{CompPDL};
diff --git a/src/runtime2/error.rs b/src/runtime2/error.rs
index eb765640db1b8aed849b581f5b402c03c20e6105..4982a3230d43bbb8dcfe4f2fc31913b84b98a46b 100644
--- a/src/runtime2/error.rs
+++ b/src/runtime2/error.rs
@@ -28,14 +28,14 @@ impl Display for RtError {
    fn fmt(&self, f: &mut FmtFormatter<'_>) -> FmtResult {
        let mut error = self;
        loop {
-           write!(f, "[{}:{}] {}", self.file, self.line, self.message).unwrap();
+           write!(f, "[{}:{}] {}", error.file, error.line, error.message)?;
            match &error.cause {
                Some(cause) => {
-                   writeln!(f, " ...");
+                   writeln!(f, " ...")?;
                    error = cause.as_ref()
                },
                None => {
-                   writeln!(f).unwrap();
+                   writeln!(f)?;
+                   return Ok(());
                },
            }
        }
diff --git
 a/src/runtime2/poll/mod.rs b/src/runtime2/poll/mod.rs
index e6589f21270927a7bf73f25f0360dfac2995632d..9337ff6f957b0d4e6e92f0b79505b274be530583 100644
--- a/src/runtime2/poll/mod.rs
+++ b/src/runtime2/poll/mod.rs
@@ -122,7 +122,7 @@ fn syscall_result(result: c_int) -> io::Result<c_int> {
 
 #[cfg(not(unix))]
 struct Poller {
-
+    // Not implemented for OSes other than unix
 }
 
 // -----------------------------------------------------------------------------
@@ -135,61 +135,42 @@ enum PollCmd {
     Shutdown,
 }
 
-/// Represents the data needed to build interfaces to the polling thread (which
-/// should happen first) and to create the polling thread itself.
-pub(crate) struct PollingThreadBuilder {
+pub struct PollingThread {
     poller: Arc<Poller>,
-    generation_counter: Arc<AtomicU32>,
-    queue: QueueDynMpsc<PollCmd>,
     runtime: Arc<RuntimeInner>,
+    queue: QueueDynMpsc<PollCmd>,
     logging_enabled: bool,
 }
 
-impl PollingThreadBuilder {
-    pub(crate) fn new(runtime: Arc<RuntimeInner>, logging_enabled: bool) -> Result<PollingThreadBuilder, RtError> {
+impl PollingThread {
+    pub(crate) fn new(runtime: Arc<RuntimeInner>, logging_enabled: bool) -> Result<(PollingThreadHandle, PollingClientFactory), RtError> {
         let poller = Poller::new()
             .map_err(|e| rt_error!("failed to create poller, because: {}", e))?;
-
-        return Ok(PollingThreadBuilder {
-            poller: Arc::new(poller),
-            generation_counter: Arc::new(AtomicU32::new(0)),
-            queue: QueueDynMpsc::new(64),
-            runtime,
+        let poller = Arc::new(poller);
+        let queue = QueueDynMpsc::new(64);
+        let queue_producers = queue.producer_factory();
+
+        let mut thread_data = PollingThread{
+            poller: poller.clone(),
+            runtime: runtime.clone(),
+            queue,
             logging_enabled,
-        })
-    }
+        };
+        let thread_handle = thread::spawn(move || { thread_data.run() });
 
-    pub(crate) fn client(&self) -> PollingClient {
-        return PollingClient{
-            poller: self.poller.clone(),
-            generation_counter: self.generation_counter.clone(),
-            queue: self.queue.producer(),
-        }
-    }
+        let thread_handle = PollingThreadHandle{
+            queue: Some(queue_producers.producer()),
+            handle: Some(thread_handle),
+        };
+        let client_factory = PollingClientFactory{
+            poller,
+            generation_counter: Arc::new(AtomicU32::new(0)),
+            queue_factory: queue_producers,
+        };
 
-    pub(crate) fn into_thread(self) -> (PollingThread, PollingThreadDestroyer) {
-        let destroyer = self.queue.producer();
-
-        return (
-            PollingThread{
-                poller: self.poller,
-                runtime: self.runtime,
-                queue: self.queue,
-                logging_enabled: self.logging_enabled,
-            },
-            PollingThreadDestroyer::new(destroyer)
-        );
+        return Ok((thread_handle, client_factory));
     }
-}
 
-pub(crate) struct PollingThread {
-    poller: Arc<Poller>,
-    runtime: Arc<RuntimeInner>,
-    queue: QueueDynMpsc<PollCmd>,
-    logging_enabled: bool,
-}
-
-impl PollingThread {
     pub(crate) fn run(&mut self) {
         use crate::runtime2::scheduler::SchedulerCtx;
         use crate::runtime2::communication::Message;
@@ -244,7 +225,8 @@ impl PollingThread {
             for event in events.drain(..) {
                 let key = event.u64;
                 if let Some(handle) = lookup.get(&key) {
-                    self.log(&format!("Sending poll to {:?} (event: {:x})", handle.id(), event.events));
+                    let events = event.events;
+                    self.log(&format!("Sending poll to {:?} (event: {:x})", handle.id(), events));
                     handle.send_message(&self.runtime, Message::Poll, true);
                 }
             }
@@ -264,23 +246,40 @@ impl PollingThread {
     }
 
 // bit convoluted, but it works
-pub(crate) struct PollingThreadDestroyer {
-    queue: Option<QueueDynProducer<PollCmd>>,
+pub(crate) struct PollingThreadHandle {
+    // requires Option, because:
+    queue: Option<QueueDynProducer<PollCmd>>, // destructor needs to be called
+    handle: Option<thread::JoinHandle<()>>, // we need to call `join`
 }
 
-impl PollingThreadDestroyer {
-    fn new(queue: QueueDynProducer<PollCmd>) -> Self {
-        return Self{ queue: Some(queue) };
-    }
-
-    pub(crate) fn initiate_destruction(&mut self) {
+impl PollingThreadHandle {
+    pub(crate) fn shutdown(&mut self) -> thread::Result<()> {
+        debug_assert!(self.handle.is_some(), "polling thread already destroyed");
         self.queue.take().unwrap().push(PollCmd::Shutdown);
+        return self.handle.take().unwrap().join();
     }
 }
 
-impl Drop for PollingThreadDestroyer {
+impl Drop for PollingThreadHandle {
     fn drop(&mut self) {
-        debug_assert!(self.queue.is_none());
+        debug_assert!(self.queue.is_none() && self.handle.is_none());
+    }
+}
+
+// oh my god, now I'm writing factory objects. I'm not feeling too well
+pub(crate) struct PollingClientFactory {
+    poller: Arc<Poller>,
+    generation_counter: Arc<AtomicU32>,
+    queue_factory: QueueDynProducerFactory<PollCmd>,
+}
+
+impl PollingClientFactory {
+    pub(crate) fn client(&self) -> PollingClient {
+        return PollingClient{
+            poller: self.poller.clone(),
+            generation_counter: self.generation_counter.clone(),
+            queue: self.queue_factory.producer(),
+        };
+    }
 }
diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs
index fb8df772666e3c76b344d84977f4ceeeef71cf2e..2c68998ea2f78728e3938f6aee435a313405c14c 100644
--- a/src/runtime2/runtime.rs
+++ b/src/runtime2/runtime.rs
@@ -4,7 +4,7 @@ use std::thread;
 use std::collections::VecDeque;
 
 use crate::protocol::*;
-use crate::runtime2::poll::{PollingThreadBuilder, PollingThreadDestroyer};
+use crate::runtime2::poll::{PollingThread, PollingThreadHandle};
 use crate::runtime2::RtError;
 
 use super::communication::Message;
@@ -162,8 +162,7 @@ impl Drop for CompHandle {
 pub struct Runtime {
     pub(crate) inner: Arc<RuntimeInner>,
     scheduler_threads: Vec<thread::JoinHandle<()>>,
-    polling_destroyer: PollingThreadDestroyer,
-    polling_thread: Option<thread::JoinHandle<()>>,
+    polling_handle: PollingThreadHandle,
 }
 
 impl Runtime {
@@ -179,8 +178,8 @@ impl Runtime {
             work_condvar: Condvar::new(),
             active_elements: AtomicU32::new(1),
         });
-        let polling_builder = rt_error_try!(
-            PollingThreadBuilder::new(runtime_inner.clone(), debug_logging),
+        let (polling_handle, polling_clients) = rt_error_try!(
+            PollingThread::new(runtime_inner.clone(), debug_logging),
            "failed to build polling thread"
         );
 
@@ -188,7 +187,7 @@ impl Runtime {
 
         for thread_index in 0..num_threads {
             let mut scheduler = Scheduler::new(
-                runtime_inner.clone(), polling_builder.client(),
+                runtime_inner.clone(), polling_clients.client(),
                 thread_index, debug_logging
             );
             let thread_handle = thread::spawn(move || {
@@ -198,16 +197,10 @@ impl Runtime {
             scheduler_threads.push(thread_handle);
         }
 
-        let (mut poller, polling_destroyer) = polling_builder.into_thread();
-        let polling_thread = thread::spawn(move || {
-            poller.run();
-        });
-
         return Ok(Runtime{
             inner: runtime_inner,
             scheduler_threads,
-            polling_destroyer,
-            polling_thread: Some(polling_thread),
+            polling_handle,
         });
     }
 
@@ -234,8 +227,7 @@ impl Drop for Runtime {
             handle.join().expect("join scheduler thread");
         }
 
-        self.polling_destroyer.initiate_destruction();
-        self.polling_thread.take().unwrap().join().expect("join polling thread");
+        self.polling_handle.shutdown().expect("shutdown polling thread");
     }
 }
diff --git a/src/runtime2/stdlib/internet.rs b/src/runtime2/stdlib/internet.rs
index 7e5c0518b39a19c71977791222bff3dd54d9dbef..94a37c19530842200f7cc6a7c7119372c035fd01 100644
--- a/src/runtime2/stdlib/internet.rs
+++ b/src/runtime2/stdlib/internet.rs
@@ -1,5 +1,6 @@
 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
 use std::mem::size_of;
+use std::io::{Error as IoError, ErrorKind as IoErrorKind};
 
 use libc::{
     c_int,
@@ -62,7 +63,7 @@ impl SocketTcpClient {
     /// Receives data from the TCP socket. Returns the number of bytes received.
     /// More bytes may be present even though `used < buffer.len()`.
-    pub fn receive(&self, buffer: &mut [u8]) -> Result<usize, ()> {
+    pub fn receive(&self, buffer: &mut [u8]) -> Result<usize, IoError> {
         if self.is_blocking {
             return self.receive_blocking(buffer);
         } else {
@@ -71,20 +72,20 @@ impl SocketTcpClient {
     }
 
     #[inline]
-    fn receive_blocking(&self, buffer: &mut [u8]) -> Result<usize, ()> {
+    fn receive_blocking(&self, buffer: &mut [u8]) -> Result<usize, IoError> {
         let result = unsafe {
             let message_pointer = buffer.as_mut_ptr().cast();
             libc::recv(self.socket_handle, message_pointer, buffer.len(), 0)
         };
         if result < 0 {
-            return Err(());
+            return Err(IoError::last_os_error());
         }
 
         return Ok(result as usize);
     }
 
     #[inline]
-    fn receive_nonblocking(&self, buffer: &mut [u8]) -> Result<usize, ()> {
+    fn receive_nonblocking(&self, buffer: &mut [u8]) -> Result<usize, IoError> {
         unsafe {
             let mut message_pointer = buffer.as_mut_ptr().cast();
             let mut remaining = buffer.len();
@@ -94,11 +95,11 @@ impl SocketTcpClient {
                 let result = libc::recv(self.socket_handle, message_pointer, remaining, 0);
                 if result < 0 {
                     // Check reason
-                    let errno = std::io::Error::last_os_error().raw_os_error().expect("os error after failed recv");
-                    if errno == libc::EWOULDBLOCK || errno == libc::EAGAIN {
+                    let os_error = IoError::last_os_error();
+                    if os_error.kind() == IoErrorKind::WouldBlock {
                         return Ok(buffer.len() - remaining);
                     } else {
-                        return Err(());
+                        return Err(os_error);
                     }
                 }
diff --git a/src/runtime2/stdlib/mod.rs b/src/runtime2/stdlib/mod.rs
index ee9cc7dbb4dce4e1c088c53c5f86c68ec97c23d3..e7dbc18c8a152be6852af5f9b3f5c5bf8741dc53 100644
--- a/src/runtime2/stdlib/mod.rs
+++ b/src/runtime2/stdlib/mod.rs
@@ -1 +1 @@
-#[cfg(feature="internet")] mod internet;
\ No newline at end of file
+#[cfg(feature="internet")] pub(crate) mod internet;
\ No newline at end of file
diff --git a/src/runtime2/store/queue_mpsc.rs b/src/runtime2/store/queue_mpsc.rs
index fa7b5a388a5bd855f597fc60bf94f6aea3d67a05..308ac0a09d12b3025fd57230e269e60a770a9e18 100644
--- a/src/runtime2/store/queue_mpsc.rs
+++ b/src/runtime2/store/queue_mpsc.rs
@@ -5,7 +5,9 @@ use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};
 
 /// Multiple-producer single-consumer queue. Generally used in the publicly
 /// accessible fields of a component. The holder of this struct should be the
-/// consumer. To retrieve access to the producer-side: call `producer()`.
+/// consumer. To retrieve access to the producer-side: call `producer()`. In
+/// case the queue is moved before one can call `producer()`, call
+/// `producer_factory()`. This incurs a bit more overhead.
 ///
 /// This is a queue that will resize (indefinitely) if it becomes full, and will
 /// not shrink. So probably a temporary thing.
@@ -75,7 +77,12 @@ impl<T> QueueDynMpsc<T> {
 
     #[inline]
     pub fn producer(&self) -> QueueDynProducer<T> {
-        return QueueDynProducer::new(self);
+        return QueueDynProducer::new(self.inner.as_ref());
+    }
+
+    #[inline]
+    pub fn producer_factory(&self) -> QueueDynProducerFactory<T> {
+        return QueueDynProducerFactory::new(self.inner.as_ref());
     }
 
     /// Return `true` if a subsequent call to `pop` will return a value. Note
@@ -144,18 +151,11 @@ pub struct QueueDynProducer<T> {
 }
 
 impl<T> QueueDynProducer<T> {
-    fn new(consumer: &QueueDynMpsc<T>) -> Self {
-        dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel));
-        unsafe {
-            // If you only knew the power of the dark side! Obi-Wan never told
-            // you what happened to your father!
-            let queue = consumer.inner.as_ref() as *const _;
-            return Self{ queue };
-        }
+    fn new(queue: &Shared<T>) -> Self {
+        dbg_code!(queue.dbg.fetch_add(1, Ordering::AcqRel));
+        return Self{ queue: queue as *const _ };
     }
-
-
     pub fn push(&self, value: T) {
         let queue = unsafe{ &*self.queue };
@@ -270,9 +270,7 @@ impl<T> Drop for QueueDynProducer<T> {
 
 // producer end is `Send`, because in debug mode we make sure that there are no
 // more producers when the queue is destroyed. But is not sync, because that
-// would circumvent our atomic counter shenanigans. Although, now that I think
-// about it, we're rather likely to just drop a single "producer" into the
-// public part of a component.
+// would circumvent our atomic counter shenanigans.
 unsafe impl<T> Send for QueueDynProducer<T>{}
 
 #[inline]
@@ -280,6 +278,30 @@ fn assert_correct_capacity(capacity: usize) {
     assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2);
 }
 
+pub struct QueueDynProducerFactory<T> {
+    queue: *const Shared<T>
+}
+
+impl<T> QueueDynProducerFactory<T> {
+    fn new(queue: &Shared<T>) -> Self {
+        dbg_code!(queue.dbg.fetch_add(1, Ordering::AcqRel));
+        return Self{ queue: queue as *const _ };
+    }
+
+    pub fn producer(&self) -> QueueDynProducer<T> {
+        return QueueDynProducer::new(unsafe{ &*self.queue });
+    }
+}
+
+impl<T> Drop for QueueDynProducerFactory<T> {
+    fn drop(&mut self) {
+        dbg_code!({
+            let queue = unsafe{ &*self.queue };
+            queue.dbg.fetch_sub(1, Ordering::AcqRel);
+        });
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/std/std.internet.pdl b/std/std.internet.pdl
index a60953f775bc7d181acf4e14b4a8ee8ceef3236a..2c7fc0601d3cb7ef441d6919876e6ee4d537d097 100644
--- a/std/std.internet.pdl
+++ b/std/std.internet.pdl
@@ -1,2 +1,5 @@
 #module std.internet
 
+primitive tcp_client(u8[] ip, u16 port, in tx, out rx) {
+    #builtin
+}
\ No newline at end of file
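Reviewer note on the `producer_factory()` API introduced in queue_mpsc.rs: the sketch below is illustrative only and is not part of the patch. It shows the intended usage pattern, where the consumer end of the queue is moved (for example, into a spawned thread) after the factory has been created, which mirrors how PollingThread::new uses it above. The `Cmd` type, the `spawn_worker` function, and the assumption that `pop()` returns an `Option` are stand-ins for illustration.

// Illustrative sketch, assuming the QueueDynMpsc API shown in the diff
// (`new`, `pop`, `producer_factory`) and a hypothetical command type `Cmd`.
enum Cmd { Work(u32), Shutdown }

fn spawn_worker() -> (std::thread::JoinHandle<()>, QueueDynProducerFactory<Cmd>) {
    let mut queue = QueueDynMpsc::<Cmd>::new(64);
    // Create the factory while we still own the consumer end directly; the
    // factory keeps pointing at the shared inner storage after the move below.
    let factory = queue.producer_factory();
    let worker = std::thread::spawn(move || {
        loop {
            match queue.pop() { // assumed to return Option<Cmd>
                Some(Cmd::Work(n)) => { let _ = n; /* handle the work item */ },
                Some(Cmd::Shutdown) => break,
                None => std::thread::yield_now(), // placeholder wakeup strategy
            }
        }
    });
    return (worker, factory);
}

Producers for other threads can then be handed out via `factory.producer()`, in the same way PollingClientFactory::client() builds one PollingClient per scheduler.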