diff --git a/src/protocol/ast.rs b/src/protocol/ast.rs index a0b2c384c1a9684c8d213a3c57621a4769c4200a..11be68c6ee6f77771ad26070ee6019a0be8c6bbb 100644 --- a/src/protocol/ast.rs +++ b/src/protocol/ast.rs @@ -242,7 +242,7 @@ pub struct Root { } impl Root { - pub fn get_definition_ident(&self, h: &Heap, id: &[u8]) -> Option { + pub fn get_definition_by_ident(&self, h: &Heap, id: &[u8]) -> Option { for &def in self.definitions.iter() { if h[def].identifier().value.as_bytes() == id { return Some(def); diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 2e46cc167c05753e9aee603c74506c57a3fd5c9b..febb0a5125eee904e53feefb132373f9e78e0c21 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -87,7 +87,7 @@ impl ProtocolDescription { let module_root = module_root.unwrap(); let root = &self.heap[module_root]; - let definition_id = root.get_definition_ident(&self.heap, identifier); + let definition_id = root.get_definition_by_ident(&self.heap, identifier); if definition_id.is_none() { return Err(ComponentCreationError::DefinitionDoesntExist); } @@ -108,7 +108,7 @@ impl ProtocolDescription { // - check number of arguments by retrieving the one instantiated // monomorph let concrete_type = ConcreteType{ parts: vec![ConcreteTypePart::Component(ast_definition.this, 0)] }; - let procedure_type_id = self.types.get_procedure_monomorph_type_id(&definition_id, &concrete_type.parts).unwrap(); + let procedure_type_id = self.types.get_monomorph_type_id(&definition_id, &concrete_type.parts).unwrap(); let procedure_monomorph_index = self.types.get_monomorph(procedure_type_id).variant.as_procedure().monomorph_index; let monomorph_info = &ast_definition.monomorphs[procedure_monomorph_index as usize]; if monomorph_info.argument_types.len() != arguments.values.len() { @@ -130,6 +130,36 @@ impl ProtocolDescription { return Ok(Prompt::new(&self.types, &self.heap, ast_definition.this, procedure_type_id, arguments)); } + /// A somewhat temporary method. Can be used by components to lookup type + /// definitions by their name (to have their implementation somewhat + /// resistant to changes in the standard library) + pub(crate) fn find_type(&self, module_name: &[u8], type_name: &[u8]) -> Option { + // Lookup type definition in module + let root_id = self.lookup_module_root(module_name)?; + let module = &self.heap[root_id]; + let definition_id = module.get_definition_by_ident(&self.heap, type_name)?; + let definition = &self.heap[definition_id]; + + // Make sure type is not polymorphic and is not a procedure + if !definition.poly_vars().is_empty() { + return None; + } + if definition.is_procedure() { + return None; + } + + // Lookup type in type table + let type_parts = [ConcreteTypePart::Instance(definition_id, 0)]; + let type_id = self.types.get_monomorph_type_id(&definition_id, &type_parts) + .expect("type ID for non-polymorphic type"); + let type_monomorph = self.types.get_monomorph(type_id); + + return Some(TypeInspector{ + heap: definition, + type_table: type_monomorph + }); + } + fn lookup_module_root(&self, module_name: &[u8]) -> Option { for module in self.modules.iter() { match &module.name { @@ -255,3 +285,30 @@ impl ProtocolDescriptionBuilder { }); } } + +pub struct TypeInspector<'a> { + heap: &'a Definition, + type_table: &'a MonoType, +} + +impl TypeInspector { + pub fn as_union(&self) -> UnionTypeInspector { + let heap = self.heap.as_union(); + let type_table = self.type_table.variant.as_union(); + return UnionTypeInspector{ heap, type_table }; + } +} + +pub struct UnionTypeInspector<'a> { + heap: &'a UnionDefinition, + type_table: &'a UnionMonomorph, +} + +impl UnionTypeInspector { + /// Retrieves union variant tag value. + pub fn get_variant_tag_value(&self, variant_name: &[u8]) -> Option { + let variant_index = self.heap.variants.iter() + .position(|v| v.identifier.value.as_bytes() == variant_name)?; + return Some(variant_index as i64); + } +} \ No newline at end of file diff --git a/src/protocol/parser/pass_typing.rs b/src/protocol/parser/pass_typing.rs index 99234f7eb2f1dec4b63694868262d7f8d24f1e9c..e3517585e4766375ee36dba416442a668fc97c73 100644 --- a/src/protocol/parser/pass_typing.rs +++ b/src/protocol/parser/pass_typing.rs @@ -2068,7 +2068,7 @@ impl PassTyping { ctx, infer_node.expr_id, &poly_data.poly_vars, first_part )?; - let (type_id, monomorph_index) = if let Some(type_id) = ctx.types.get_procedure_monomorph_type_id(&definition_id, &signature_type.parts) { + let (type_id, monomorph_index) = if let Some(type_id) = ctx.types.get_monomorph_type_id(&definition_id, &signature_type.parts) { // Procedure is already typechecked let monomorph_index = ctx.types.get_monomorph(type_id).variant.as_procedure().monomorph_index; (type_id, monomorph_index) diff --git a/src/protocol/parser/type_table.rs b/src/protocol/parser/type_table.rs index 9737204e3d853d5e6f9c567ccf42c8b6b4ebdb69..2084a2474138f966eba4120d4ecf030ee1b506b1 100644 --- a/src/protocol/parser/type_table.rs +++ b/src/protocol/parser/type_table.rs @@ -683,10 +683,10 @@ impl TypeTable { self.definition_lookup.get(&definition_id) } - /// Returns the index into the monomorph type array if the procedure type + /// Returns the index into the monomorph type array if the provided type /// already has a (reserved) monomorph. #[inline] - pub(crate) fn get_procedure_monomorph_type_id(&self, definition_id: &DefinitionId, type_parts: &[ConcreteTypePart]) -> Option { + pub(crate) fn get_monomorph_type_id(&self, definition_id: &DefinitionId, type_parts: &[ConcreteTypePart]) -> Option { // Cannot use internal search key due to mutability issues. But this // method should end up being deprecated at some point anyway. debug_assert_eq!(get_concrete_type_definition(type_parts).unwrap(), *definition_id); diff --git a/src/protocol/tests/utils.rs b/src/protocol/tests/utils.rs index d0694aa883873531183aedd7c23ba71dee352d16..6a321b67a212328b6ba181b77eb795a50b339bd0 100644 --- a/src/protocol/tests/utils.rs +++ b/src/protocol/tests/utils.rs @@ -703,7 +703,7 @@ impl<'a> FunctionTester<'a> { // Assuming the function is not polymorphic let definition_id = self.def.this; let func_type = [ConcreteTypePart::Function(definition_id, 0)]; - let mono_index = self.ctx.types.get_procedure_monomorph_type_id(&definition_id.upcast(), &func_type).unwrap(); + let mono_index = self.ctx.types.get_monomorph_type_id(&definition_id.upcast(), &func_type).unwrap(); let mut prompt = Prompt::new(&self.ctx.types, &self.ctx.heap, definition_id, mono_index, ValueGroup::new_stack(Vec::new())); let mut call_context = FakeRunContext{}; @@ -819,7 +819,7 @@ fn get_procedure_monomorph<'a>(heap: &Heap, types: &'a TypeTable, definition_id: [ConcreteTypePart::Component(ast_definition.this, 0)] }; - let mono_index = types.get_procedure_monomorph_type_id(&definition_id, &func_type).unwrap(); + let mono_index = types.get_monomorph_type_id(&definition_id, &func_type).unwrap(); let mono_data = types.get_monomorph(mono_index).variant.as_procedure(); mono_data diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 506699b8b80d90f0aa92e3164f83e04d1d606777..d50a9ef1a923719d29f2ab9e348e11c9e66117a9 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -18,6 +18,9 @@ pub enum CompScheduling { /// Generic representation of a component (as viewed by a scheduler). pub(crate) trait Component { + /// Called upon the creation of the component. + fn on_creation(&mut self, sched_ctx: &SchedulerCtx); + /// Called if the component is created by another component and the messages /// are being transferred between the two. fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage); diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index 89642f966501369bc59216c2aea94d3a4f2a8d7a..64cf8f35d58c40467a2a7d157570f43f64a5f2b4 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -7,12 +7,27 @@ use super::component::{self, *}; use super::control_layer::*; use super::consensus::*; +use std::io::ErrorKind as IoErrorKind; + enum SocketState { Connected(SocketTcpClient), Error, } +impl SocketState { + fn get_socket(&self) -> &SocketTcpClient { + match self { + SocketState::Connected(v) => v, + SocketState::Error => unreachable!(), + } + } +} + +/// States from the point of view of the component that is connecting to this +/// TCP component (i.e. from the point of view of attempting to interface with +/// a socket). enum SyncState { + AwaitingCmd, Getting, Putting } @@ -20,16 +35,33 @@ enum SyncState { pub struct ComponentTcpClient { // Properties for the tcp socket socket_state: SocketState, + sync_state: SyncState, pending_recv: Vec, // on the input port pdl_input_port_id: PortId, // input from PDL, so transmitted over socket pdl_output_port_id: PortId, // output towards PDL, so received over socket + input_union_send_tag_value: i64, + input_union_receive_tag_value: i64, + input_union_finish_tag_value: i64, // Generic component state exec_state: CompExecState, control: ControlLayer, consensus: Consensus, + // Temporary variables + byte_buffer: Vec, } impl Component for ComponentTcpClient { + fn on_creation(&mut self, sched_ctx: &SchedulerCtx) { + let pd = &sched_ctx.runtime.protocol; + let cmd_type = pd.find_type(b"std.internet", b"Cmd") + .expect("'Cmd' type in the 'std.internet' module") + .as_union(); + + self.input_union_send_tag_value = cmd_type.get_variant_tag_value(b"Send").unwrap(); + self.input_union_receive_tag_value = cmd_type.get_variant_tag_value(b"Receive").unwrap(); + self.input_union_finish_tag_value = cmd_type.get_variant_tag_value(b"Finish").unwrap(); + } + fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) { self.handle_incoming_data_message(message); } @@ -56,25 +88,127 @@ impl Component for ComponentTcpClient { sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}", self.exec_state.mode)); match self.exec_state.mode { - CompMode::BlockedGet | CompMode::BlockedSelect => { - // impossible for this component. We always accept the input - // values, and we never perform an explicit select. + CompMode::BlockedSelect => { + // Not possible: we never enter this state unreachable!(); }, CompMode::NonSync => { // When in non-sync mode match &mut self.socket_state { - SocketState::Connected(socket) => { - if self.pending_tx + SocketState::Connected(_socket) => { + // Always move into the sync-state + self.sync_state = SyncState::AwaitingCmd; + self.consensus.notify_sync_start(comp_ctx); + self.exec_state.mode = CompMode::Sync; }, SocketState::Error => { + // Could potentially send an error message to the + // connected component. self.exec_state.mode = CompMode::StartExit; return Ok(CompScheduling::Immediate); } } }, CompMode::Sync => { + // When in sync mode: wait for a command to come in + match self.sync_state { + SyncState::AwaitingCmd => { + if let Some(message) = self.pending_recv.pop() { + if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, &message) { + // Check which command we're supposed to execute. + let (tag_value, embedded_heap_pos) = message.content.values[0].as_union(); + if tag_value == self.input_union_send_tag_value { + // Retrieve bytes from the message + self.byte_buffer.clear(); + let union_content = &message.content.regions[embedded_heap_pos as usize]; + debug_assert_eq!(union_content.len(), 1); + let array_heap_pos = union_content[0].as_array(); + let array_values = &message.content.regions[array_heap_pos as usize]; + self.byte_buffer.reserve(array_values.len()); + for value in array_values { + self.byte_buffer.push(value.as_uint8()); + } + + self.sync_state = SyncState::Putting; + return Ok(CompScheduling::Immediate); + } else if tag_value == self.input_union_receive_tag_value { + // Component requires a `recv` + self.sync_state = SyncState::Getting; + return Ok(CompScheduling::Immediate); + } else if tag_value == self.input_union_finish_tag_value { + // Component requires us to end the sync round + let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); + component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + } + } else { + todo!("handle sync failure due to message deadlock"); + return Ok(CompScheduling::Sleep); + } + } else { + self.exec_state.set_as_blocked_get(self.pdl_input_port_id); + return Ok(CompScheduling::Sleep); + } + }, + SyncState::Putting => { + // We're supposed to send a user-supplied message fully + // over the socket. But we might end up blocking. In + // that case the component goes to sleep until it is + // polled. + let socket = self.socket_state.get_socket(); + while !self.byte_buffer.is_empty() { + match socket.send(&self.byte_buffer) { + Ok(bytes_sent) => { + self.byte_buffer.drain(..bytes_sent); + }, + Err(err) => { + if err.kind() == IoErrorKind::WouldBlock { + return Ok(CompScheduling::Sleep); // wait until notified + } else { + todo!("handle socket.send error {:?}", err) + } + } + } + } + + // If here then we're done putting the data, we can + // finish the sync round + let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); + component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + }, + SyncState::Getting => { + // We're going to try and receive a single message. If + // this causes us to end up blocking the component + // goes to sleep until it is polled. + const BUFFER_SIZE: usize = 1024; // TODO: Move to config + + let socket = self.socket_state.get_socket(); + debug_assert!(self.byte_buffer.is_empty()); + self.byte_buffer.resize(BUFFER_SIZE, 0); + match socket.receive(&mut self.byte_buffer) { + Ok(num_received) => { + self.byte_buffer.resize(num_received); + let message_content = self.bytes_to_data_message_content(&self.byte_buffer); + let port_handle = comp_ctx.get_port_handle(self.pdl_output_port_id); + let port_info = comp_ctx.get_port(port_handle); + let message = self.consensus.annotate_data_message(comp_ctx, port_info, message_content); + + }, + Err(err) => { + if err.kind() == IoErrorKind::WouldBlock { + return Ok(CompScheduling::Sleep); // wait until polled + } else { + todo!("handle socket.receive error {:?}", err); + } + } + } + }, + } + }, + CompMode::BlockedGet => { + // Entered when awaiting a new command + debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd); + if self. }, CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep), @@ -118,7 +252,9 @@ impl ComponentTcpClient { return Self{ socket_state: SocketState::Connected(socket.unwrap()), - pending_tx: Vec::new(), + input_union_send_tag_value: -1, + input_union_receive_tag_value: -1, + input_union_finish_tag_value: -1, pdl_input_port_id: input_port, pdl_output_port_id: output_port, exec_state: CompExecState::new(), @@ -131,7 +267,6 @@ impl ComponentTcpClient { fn handle_incoming_data_message(&mut self, message: DataMessage) { // Input message is an array of bytes (u8) self.pending_recv.push(message); - } fn data_message_to_bytes(&self, message: DataMessage, bytes: &mut Vec) { @@ -148,4 +283,19 @@ impl ComponentTcpClient { unreachable!(); } } + + fn bytes_to_data_message_content(&self, buffer: &[u8]) -> ValueGroup { + // Turn bytes into silly executor-style array + let mut values = Vec::with_capacity(buffer.len()); + for byte in buffer.iter().copied() { + values.push(Value::UInt8(byte)); + } + + // Put in a value group + let mut value_group = ValueGroup::default(); + value_group.regions.push(values); + value_group.values.push(Value::Array(0)); + + return value_group; + } } \ No newline at end of file diff --git a/src/runtime2/stdlib/internet.rs b/src/runtime2/stdlib/internet.rs index 94a37c19530842200f7cc6a7c7119372c035fd01..bb418c8ffb046d5ff9546bc8164832d58bdf2cae 100644 --- a/src/runtime2/stdlib/internet.rs +++ b/src/runtime2/stdlib/internet.rs @@ -49,13 +49,13 @@ impl SocketTcpClient { }) } - pub fn send(&self, message: &[u8]) -> Result { + pub fn send(&self, message: &[u8]) -> Result { let result = unsafe{ let message_pointer = message.as_ptr().cast(); libc::send(self.socket_handle, message_pointer, message.len() as libc::size_t, 0) }; if result < 0 { - return Err(()) + return Err(IoError::last_os_error()); } return Ok(result as usize); @@ -64,15 +64,6 @@ impl SocketTcpClient { /// Receives data from the TCP socket. Returns the number of bytes received. /// More bytes may be present even thought `used < buffer.len()`. pub fn receive(&self, buffer: &mut [u8]) -> Result { - if self.is_blocking { - return self.receive_blocking(buffer); - } else { - return self.receive_nonblocking(buffer); - } - } - - #[inline] - fn receive_blocking(&self, buffer: &mut [u8]) -> Result { let result = unsafe { let message_pointer = buffer.as_mut_ptr().cast(); libc::recv(self.socket_handle, message_pointer, buffer.len(), 0) @@ -83,37 +74,6 @@ impl SocketTcpClient { return Ok(result as usize); } - - #[inline] - fn receive_nonblocking(&self, buffer: &mut [u8]) -> Result { - unsafe { - let mut message_pointer = buffer.as_mut_ptr().cast(); - let mut remaining = buffer.len(); - - loop { - // Receive more data - let result = libc::recv(self.socket_handle, message_pointer, remaining, 0); - if result < 0 { - // Check reason - let os_error = IoError::last_os_error(); - if os_error.kind() == IoErrorKind::WouldBlock { - return Ok(buffer.len() - remaining); - } else { - return Err(os_error); - } - } - - // Modify pointer and remaining bytes - let received = result as usize; - message_pointer = message_pointer.add(received); - remaining -= received; - - if remaining == 0 { - return Ok(buffer.len()); - } - } - } - } } impl Drop for SocketTcpClient { @@ -355,22 +315,22 @@ mod tests { use std::net::*; use super::*; - #[test] - fn test_inet_thingo() { - const SIZE: usize = 1024; - - let s = SocketTcpClient::new(IpAddr::V4(Ipv4Addr::new(142, 250, 179, 163)), 80).expect("connect"); - s.send(b"GET / HTTP/1.1\r\n\r\n").expect("sending"); - let mut total = Vec::::new(); - let mut buffer = [0; SIZE]; - let mut received = SIZE; - - while received > 0 { - received = s.receive(&mut buffer).expect("receiving"); - println!("DEBUG: Received {} bytes", received); - total.extend_from_slice(&buffer[..received]); - } - let as_str = String::from_utf8_lossy(total.as_slice()); - println!("Yay! Got {} bytes:\n{}", as_str.len(), as_str); - } + // #[test] @nocommit Remove this + // fn test_inet_thingo() { + // const SIZE: usize = 1024; + // + // let s = SocketTcpClient::new(IpAddr::V4(Ipv4Addr::new(142, 250, 179, 163)), 80).expect("connect"); + // s.send(b"GET / HTTP/1.1\r\n\r\n").expect("sending"); + // let mut total = Vec::::new(); + // let mut buffer = [0; SIZE]; + // let mut received = SIZE; + // + // while received > 0 { + // received = s.receive(&mut buffer).expect("receiving"); + // println!("DEBUG: Received {} bytes", received); + // total.extend_from_slice(&buffer[..received]); + // } + // let as_str = String::from_utf8_lossy(total.as_slice()); + // println!("Yay! Got {} bytes:\n{}", as_str.len(), as_str); + // } } \ No newline at end of file diff --git a/std/std.internet.pdl b/std/std.internet.pdl index 2c7fc0601d3cb7ef441d6919876e6ee4d537d097..fa94ba6f198e11b59fa2ad744d3904bed4d79fc6 100644 --- a/std/std.internet.pdl +++ b/std/std.internet.pdl @@ -1,5 +1,11 @@ #module std.internet -primitive tcp_client(u8[] ip, u16 port, in tx, out rx) { +union Cmd { + Send(u8[]), + Receive, + Finish, +} + +primitive tcp_client(u8[] ip, u16 port, in cmds, out rx) { #builtin } \ No newline at end of file