diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index 2873ce68b0775c786311f1ab12314c9fcb512d95..710de32baa0fc6e0f1c11e39b7b6bf67bf87b200 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -1,7 +1,8 @@ use crate::protocol::eval::{ValueGroup, Value, EvalError}; use crate::runtime2::*; -use crate::runtime2::component::CompCtx; +use crate::runtime2::component::{CompCtx, CompId}; use crate::runtime2::stdlib::internet::*; +use crate::runtime2::poll::*; use super::component::{self, *}; use super::control_layer::*; @@ -38,6 +39,7 @@ pub struct ComponentTcpClient { // Properties for the tcp socket socket_state: SocketState, sync_state: SyncState, + poll_ticket: Option, inbox_main: Option, inbox_backup: Vec, pdl_input_port_id: PortId, // input from PDL, so transmitted over socket @@ -45,6 +47,7 @@ pub struct ComponentTcpClient { input_union_send_tag_value: i64, input_union_receive_tag_value: i64, input_union_finish_tag_value: i64, + input_union_shutdown_tag_value: i64, // Generic component state exec_state: CompExecState, control: ControlLayer, @@ -54,7 +57,8 @@ pub struct ComponentTcpClient { } impl Component for ComponentTcpClient { - fn on_creation(&mut self, sched_ctx: &SchedulerCtx) { + fn on_creation(&mut self, id: CompId, sched_ctx: &SchedulerCtx) { + // Retrieve type information for messages we're going to receive let pd = &sched_ctx.runtime.protocol; let cmd_type = pd.find_type(b"std.internet", b"Cmd") .expect("'Cmd' type in the 'std.internet' module"); @@ -64,6 +68,17 @@ impl Component for ComponentTcpClient { self.input_union_send_tag_value = cmd_type.get_variant_tag_value(b"Send").unwrap(); self.input_union_receive_tag_value = cmd_type.get_variant_tag_value(b"Receive").unwrap(); self.input_union_finish_tag_value = cmd_type.get_variant_tag_value(b"Finish").unwrap(); + self.input_union_shutdown_tag_value = cmd_type.get_variant_tag_value(b"Shutdown").unwrap(); + + // Register socket for async events + if let SocketState::Connected(socket) = &self.socket_state { + let self_handle = sched_ctx.runtime.get_component_public(id); + let poll_ticket = sched_ctx.polling.register(socket, self_handle, true, true) + .expect("registering tcp component"); + + debug_assert!(self.poll_ticket.is_none()); + self.poll_ticket = Some(poll_ticket); + } } fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) { @@ -150,6 +165,9 @@ impl Component for ComponentTcpClient { // Component requires us to end the sync round let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + } else if tag_value == self.input_union_shutdown_tag_value { + // Component wants to close the connection + todo!("implement clean shutdown, don't forget to unregister to poll ticket"); } } else { todo!("handle sync failure due to message deadlock"); @@ -268,11 +286,13 @@ impl ComponentTcpClient { return Self{ socket_state: SocketState::Connected(socket.unwrap()), sync_state: SyncState::AwaitingCmd, + poll_ticket: None, inbox_main: None, inbox_backup: Vec::new(), input_union_send_tag_value: -1, input_union_receive_tag_value: -1, input_union_finish_tag_value: -1, + input_union_shutdown_tag_value: -1, pdl_input_port_id: input_port, pdl_output_port_id: output_port, exec_state: CompExecState::new(),