From 1bc57ab68e0ee12f25d67e8b0ce6ebb603e2e3a0 2022-04-13 14:57:03 From: Max Henger Date: 2022-04-13 14:57:03 Subject: [PATCH] Merge branch 'feat-builtin-ip' into 'master' feat: Builtin internet component See merge request nl-cwi-csy/reowolf!6 --- diff --git a/Cargo.toml b/Cargo.toml index 9c3801265cc09b5ee9216ea9d148e4ce62661815..0111c5509eb738484982e085a3da446386f226d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,29 +8,14 @@ authors = [ ] edition = "2021" -[dependencies] -# convenience macros -maplit = "1.0.2" -derive_more = "0.99.2" - -# runtime -bincode = "1.3.1" -serde = { version = "1.0.114", features = ["derive"] } -getrandom = "0.1.14" # tiny crate. used to guess controller-id - -# network -mio = { version = "0.7.0", package = "mio", features = ["udp", "tcp", "os-poll"] } -socket2 = { version = "0.3.12", optional = true } +[features] +default=["internet"] +internet=["libc"] -# protocol -backtrace = "0.3" -lazy_static = "1.4.0" - -# ffi +[dependencies] -# socket ffi -libc = { version = "^0.2", optional = true } -os_socketaddr = { version = "0.1.0", optional = true } +libc = { version = "^0.2", optional = true } # raw sockets +mio = { version = "0.8", features = ["os-poll"] } # cross-platform IO notification queue # randomness rand = "0.8.4" diff --git a/bin-compiler/src/main.rs b/bin-compiler/src/main.rs index f9d1b46c4bb3f139e60139fc4f4394361433a731..1482ae752710211bac6dcaab87745d6bbad57d16 100644 --- a/bin-compiler/src/main.rs +++ b/bin-compiler/src/main.rs @@ -31,6 +31,13 @@ fn main() { .long("debug") .short('d') .help("enable debug logging") + ) + .arg( + Arg::new("stdlib") + .long("stdlib") + .short('s') + .help("standard library directory (overrides default)") + .takes_value(true) ); // Retrieve arguments and convert @@ -59,11 +66,15 @@ fn main() { let debug_enabled = app.is_present("debug"); + let standard_library_dir = app.value_of("stdlib") + .map(|v| v.to_string()); + // Add input files to file buffer let input_files = input_files.unwrap(); 
assert!(input_files.len() > 0); // because arg is required - let mut builder = rw::ProtocolDescriptionBuilder::new(); + let mut builder = rw::ProtocolDescriptionBuilder::new(standard_library_dir) + .expect("create protocol description builder"); let mut file_buffer = Vec::with_capacity(4096); for input_file in input_files { @@ -101,9 +112,17 @@ fn main() { println!("Success"); + // Start runtime + print!("Startup of runtime ... "); + let runtime = rw::runtime2::Runtime::new(num_threads, debug_enabled, protocol_description); + if let Err(err) = &runtime { + println!("FAILED\nbecause:\n{}", err); + } + println!("Success"); + // Make sure there is a nameless module with a main component print!("Creating main component ... "); - let runtime = rw::runtime2::Runtime::new(num_threads, debug_enabled, protocol_description); + let runtime = runtime.unwrap(); if let Err(err) = runtime.create_component(b"", b"main") { use rw::ComponentCreationError as CCE; let reason = match err { diff --git a/src/collections/scoped_buffer.rs b/src/collections/scoped_buffer.rs index da789984728f3c4796ea46d3715fc91395ab10fd..956d77b87b9fd6351d77e29b7bf54454b59683c0 100644 --- a/src/collections/scoped_buffer.rs +++ b/src/collections/scoped_buffer.rs @@ -176,7 +176,10 @@ impl std::ops::IndexMut for ScopedSection { } } -#[cfg(debug_assertions)] +// note: this `Drop` impl used to be debug-only, requiring the programmer to +// call `into_vec` or `forget`. But this is rather error prone. So we'll check +// in debug mode, but always truncate in release mode (even though this is a +// noop in most cases). 
impl Drop for ScopedSection { fn drop(&mut self) { let vec = unsafe{&mut *self.inner}; diff --git a/src/protocol/ast.rs b/src/protocol/ast.rs index e137985fa110c6e2cea9a0f83be7cedf79aef88d..3bcc0b525893c980586cd57246693feb65892865 100644 --- a/src/protocol/ast.rs +++ b/src/protocol/ast.rs @@ -242,7 +242,7 @@ pub struct Root { } impl Root { - pub fn get_definition_ident(&self, h: &Heap, id: &[u8]) -> Option { + pub fn get_definition_by_ident(&self, h: &Heap, id: &[u8]) -> Option { for &def in self.definitions.iter() { if h[def].identifier().value.as_bytes() == id { return Some(def); @@ -932,7 +932,6 @@ pub struct StructDefinition { pub this: StructDefinitionId, pub defined_in: RootId, // Symbol scanning - pub span: InputSpan, pub identifier: Identifier, pub poly_vars: Vec, // Parsing @@ -941,10 +940,10 @@ pub struct StructDefinition { impl StructDefinition { pub(crate) fn new_empty( - this: StructDefinitionId, defined_in: RootId, span: InputSpan, + this: StructDefinitionId, defined_in: RootId, identifier: Identifier, poly_vars: Vec ) -> Self { - Self{ this, defined_in, span, identifier, poly_vars, fields: Vec::new() } + Self{ this, defined_in, identifier, poly_vars, fields: Vec::new() } } } @@ -965,7 +964,6 @@ pub struct EnumDefinition { pub this: EnumDefinitionId, pub defined_in: RootId, // Symbol scanning - pub span: InputSpan, pub identifier: Identifier, pub poly_vars: Vec, // Parsing @@ -974,10 +972,10 @@ pub struct EnumDefinition { impl EnumDefinition { pub(crate) fn new_empty( - this: EnumDefinitionId, defined_in: RootId, span: InputSpan, + this: EnumDefinitionId, defined_in: RootId, identifier: Identifier, poly_vars: Vec ) -> Self { - Self{ this, defined_in, span, identifier, poly_vars, variants: Vec::new() } + Self{ this, defined_in, identifier, poly_vars, variants: Vec::new() } } } @@ -993,7 +991,6 @@ pub struct UnionDefinition { pub this: UnionDefinitionId, pub defined_in: RootId, // Phase 1: symbol scanning - pub span: InputSpan, pub identifier: 
Identifier, pub poly_vars: Vec, // Phase 2: parsing @@ -1002,10 +999,10 @@ pub struct UnionDefinition { impl UnionDefinition { pub(crate) fn new_empty( - this: UnionDefinitionId, defined_in: RootId, span: InputSpan, + this: UnionDefinitionId, defined_in: RootId, identifier: Identifier, poly_vars: Vec ) -> Self { - Self{ this, defined_in, span, identifier, poly_vars, variants: Vec::new() } + Self{ this, defined_in, identifier, poly_vars, variants: Vec::new() } } } @@ -1071,6 +1068,37 @@ impl ExpressionInfoVariant { } } +#[derive(Debug)] +pub enum ProcedureSource { + FuncUserDefined, + CompUserDefined, + // Builtin functions, available to user + FuncGet, + FuncPut, + FuncFires, + FuncCreate, + FuncLength, + FuncAssert, + FuncPrint, + // Builtin functions, not available to user + FuncSelectStart, + FuncSelectRegisterCasePort, + FuncSelectWait, + // Builtin components, available to user + CompRandomU32, // TODO: Remove, temporary thing + CompTcpClient, +} + +impl ProcedureSource { + pub(crate) fn is_builtin(&self) -> bool { + match self { + ProcedureSource::FuncUserDefined | ProcedureSource::CompUserDefined => false, + _ => true, + } + } +} + + /// Generic storage for functions, primitive components and composite /// components. // Note that we will have function definitions for builtin functions as well.
In @@ -1080,12 +1108,11 @@ pub struct ProcedureDefinition { pub this: ProcedureDefinitionId, pub defined_in: RootId, // Symbol scanning - pub builtin: bool, pub kind: ProcedureKind, - pub span: InputSpan, pub identifier: Identifier, pub poly_vars: Vec, // Parser + pub source: ProcedureSource, pub return_type: Option, // present on functions, not components pub parameters: Vec, pub scope: ScopeId, @@ -1096,14 +1123,13 @@ pub struct ProcedureDefinition { impl ProcedureDefinition { pub(crate) fn new_empty( - this: ProcedureDefinitionId, defined_in: RootId, span: InputSpan, + this: ProcedureDefinitionId, defined_in: RootId, kind: ProcedureKind, identifier: Identifier, poly_vars: Vec ) -> Self { Self { this, defined_in, - builtin: false, - span, kind, identifier, poly_vars, + source: ProcedureSource::FuncUserDefined, return_type: None, parameters: Vec::new(), scope: ScopeId::new_invalid(), @@ -1813,7 +1839,7 @@ pub struct CallExpression { #[derive(Debug, Clone, PartialEq, Eq)] pub enum Method { - // Builtin, accessible by programmer + // Builtin function, accessible by programmer Get, Put, Fires, @@ -1821,10 +1847,13 @@ pub enum Method { Length, Assert, Print, - // Builtin, not accessible by programmer + // Builtin function, not accessible by programmer SelectStart, // SelectStart(total_num_cases, total_num_ports) SelectRegisterCasePort, // SelectRegisterCasePort(case_index, port_index, port_id) SelectWait, // SelectWait() -> u32 + // Builtin component, + ComponentRandomU32, + ComponentTcpClient, // User-defined UserFunction, UserComponent, @@ -1835,6 +1864,7 @@ impl Method { use Method::*; match self { Get | Put | Fires | Create | Length | Assert | Print => true, + ComponentRandomU32 | ComponentTcpClient => true, _ => false, } } @@ -1866,6 +1896,7 @@ pub enum Literal { True, False, Character(char), + Bytestring(Vec), String(StringRef<'static>), Integer(LiteralInteger), Struct(LiteralStruct), diff --git a/src/protocol/ast_printer.rs b/src/protocol/ast_writer.rs 
similarity index 98% rename from src/protocol/ast_printer.rs rename to src/protocol/ast_writer.rs index 22a5db0376dda2d918d174df982d37d19cec4970..1b376a16f7f674291c2b55efa010ea1b25ff23d9 100644 --- a/src/protocol/ast_printer.rs +++ b/src/protocol/ast_writer.rs @@ -365,8 +365,12 @@ impl ASTWriter { self.write_variable(heap, *variable_id, indent3); } - self.kv(indent2).with_s_key("Body"); - self.write_stmt(heap, def.body.upcast(), indent3); + if def.source.is_builtin() { + self.kv(indent2).with_s_key("Body").with_s_val("Builtin"); + } else { + self.kv(indent2).with_s_key("Body"); + self.write_stmt(heap, def.body.upcast(), indent3); + } }, } } @@ -685,6 +689,11 @@ impl ASTWriter { Literal::True => { val.with_s_val("true"); }, Literal::False => { val.with_s_val("false"); }, Literal::Character(data) => { val.with_disp_val(data); }, + Literal::Bytestring(bytes) => { + // Bytestrings are ASCII, so just convert back + let string = String::from_utf8_lossy(bytes.as_slice()); + val.with_disp_val(&string); + }, Literal::String(data) => { // Stupid hack let string = String::from(data.as_str()); @@ -770,7 +779,7 @@ impl ASTWriter { self.kv(indent2).with_s_key("Method").with_debug_val(&expr.method); if !expr.procedure.is_invalid() { let definition = &heap[expr.procedure]; - self.kv(indent2).with_s_key("BuiltIn").with_disp_val(&definition.builtin); + self.kv(indent2).with_s_key("Source").with_debug_val(&definition.source); self.kv(indent2).with_s_key("Variant").with_debug_val(&definition.kind); self.kv(indent2).with_s_key("MethodName").with_identifier_val(&definition.identifier); self.kv(indent2).with_s_key("ParserType") diff --git a/src/protocol/eval/executor.rs b/src/protocol/eval/executor.rs index 2a77a1ebb0dcb357eb413cd626ea2b65a7abb639..51fd02d09163c89b2881c71691a6931795858990 100644 --- a/src/protocol/eval/executor.rs +++ b/src/protocol/eval/executor.rs @@ -135,7 +135,7 @@ impl Frame { // Here we only care about literals that have subexpressions match &expr.value { 
Literal::Null | Literal::True | Literal::False | - Literal::Character(_) | Literal::String(_) | + Literal::Character(_) | Literal::Bytestring(_) | Literal::String(_) | Literal::Integer(_) | Literal::Enum(_) => { // No subexpressions }, @@ -514,6 +514,16 @@ impl Prompt { Literal::True => Value::Bool(true), Literal::False => Value::Bool(false), Literal::Character(lit_value) => Value::Char(*lit_value), + Literal::Bytestring(lit_value) => { + let heap_pos = self.store.alloc_heap(); + let values = &mut self.store.heap_regions[heap_pos as usize].values; + debug_assert!(values.is_empty()); + values.reserve(lit_value.len()); + for byte in lit_value { + values.push(Value::UInt8(*byte)); + } + Value::Array(heap_pos) + } Literal::String(lit_value) => { let heap_pos = self.store.alloc_heap(); let values = &mut self.store.heap_regions[heap_pos as usize].values; @@ -718,6 +728,8 @@ impl Prompt { // Convert the runtime-variant of a string // into an actual string. let value = cur_frame.expr_values.pop_front().unwrap(); + let mut is_literal_string = value.get_heap_pos().is_some(); + let value = self.store.maybe_read_ref(&value); let value_heap_pos = value.as_string(); let elements = &self.store.heap_regions[value_heap_pos as usize].values; @@ -728,7 +740,10 @@ impl Prompt { // Drop the heap-allocated value from the // store - self.store.drop_heap_pos(value_heap_pos); + if is_literal_string { + self.store.drop_heap_pos(value_heap_pos); + } + println!("{}", message); }, Method::SelectStart => { @@ -755,11 +770,15 @@ impl Prompt { }, } }, + Method::ComponentRandomU32 | Method::ComponentTcpClient => { + debug_assert_eq!(heap[expr.procedure].parameters.len(), cur_frame.expr_values.len()); + debug_assert_eq!(heap[cur_frame.position].as_new().expression, expr.this); + }, Method::UserComponent => { // This is actually handled by the evaluation // of the statement. 
debug_assert_eq!(heap[expr.procedure].parameters.len(), cur_frame.expr_values.len()); - debug_assert_eq!(heap[cur_frame.position].as_new().expression, expr.this) + debug_assert_eq!(heap[cur_frame.position].as_new().expression, expr.this); }, Method::UserFunction => { // Push a new frame. Note that all expressions have diff --git a/src/protocol/eval/value.rs b/src/protocol/eval/value.rs index d8bf773b7bc74426a37fa54ad573c4c0d6d8bd00..c4c5060485d669df5943ff6af48090d89b0f604e 100644 --- a/src/protocol/eval/value.rs +++ b/src/protocol/eval/value.rs @@ -183,6 +183,7 @@ impl ValueGroup { regions: Vec::new(), } } + pub(crate) fn from_store(store: &Store, values: &[Value]) -> Self { let mut group = ValueGroup{ values: Vec::with_capacity(values.len()), @@ -197,6 +198,15 @@ impl ValueGroup { group } + /// Creates a clone of the value group, but leaves the memory inside of the + /// ValueGroup vectors allocated. + pub(crate) fn take(&mut self) -> ValueGroup { + let cloned = self.clone(); + self.values.clear(); + self.regions.clear(); + return cloned; + } + /// Transfers a provided value from a store into a local value with its /// heap allocations (if any) stored in the ValueGroup. Calling this /// function will not store the returned value in the `values` member. diff --git a/src/protocol/input_source.rs b/src/protocol/input_source.rs index f2cf10693ae87bc134610829d91344bcc1f221f1..c2c7cd164eea9c9e433e2ecd569b4c65de2ae4ee 100644 --- a/src/protocol/input_source.rs +++ b/src/protocol/input_source.rs @@ -170,6 +170,13 @@ impl InputSource { return lookup; } + /// Retrieves the column associated with a line. Calling this incurs a read + /// lock, so don't spam it in happy-path compiler code. 
+ pub(crate) fn get_column(&self, pos: InputPosition) -> u32 { + let line_start = self.lookup_line_start_offset(pos.line); + return pos.offset - line_start + 1; + } + /// Retrieves offset at which line starts (right after newline) fn lookup_line_start_offset(&self, line_number: u32) -> u32 { let lookup = self.get_lookup(); diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 45f880fbe64dec700ef72086e324877299969e5b..d0dd2453c06c9f49d1cce7f935c3c0f092dba0ce 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -5,12 +5,13 @@ mod parser; #[cfg(test)] mod tests; pub(crate) mod ast; -pub(crate) mod ast_printer; +pub(crate) mod ast_writer; +mod token_writer; use std::sync::Mutex; use crate::collections::{StringPool, StringRef}; -use crate::protocol::ast::*; +pub use crate::protocol::ast::*; use crate::protocol::eval::*; use crate::protocol::input_source::*; use crate::protocol::parser::*; @@ -51,7 +52,7 @@ pub enum ComponentCreationError { impl ProtocolDescription { pub fn parse(buffer: &[u8]) -> Result { let source = InputSource::new(String::new(), Vec::from(buffer)); - let mut parser = Parser::new(); + let mut parser = Parser::new(None)?; parser.feed(source).expect("failed to feed source"); if let Err(err) = parser.parse() { @@ -59,7 +60,6 @@ impl ProtocolDescription { return Err(format!("{}", err)) } - debug_assert_eq!(parser.modules.len(), 1, "only supporting one module here for now"); let modules: Vec = parser.modules.into_iter() .map(|module| Module{ source: module.source, @@ -87,7 +87,7 @@ impl ProtocolDescription { let module_root = module_root.unwrap(); let root = &self.heap[module_root]; - let definition_id = root.get_definition_ident(&self.heap, identifier); + let definition_id = root.get_definition_by_ident(&self.heap, identifier); if definition_id.is_none() { return Err(ComponentCreationError::DefinitionDoesntExist); } @@ -108,7 +108,7 @@ impl ProtocolDescription { // - check number of arguments by retrieving the one instantiated // 
monomorph let concrete_type = ConcreteType{ parts: vec![ConcreteTypePart::Component(ast_definition.this, 0)] }; - let procedure_type_id = self.types.get_procedure_monomorph_type_id(&definition_id, &concrete_type.parts).unwrap(); + let procedure_type_id = self.types.get_monomorph_type_id(&definition_id, &concrete_type.parts).unwrap(); let procedure_monomorph_index = self.types.get_monomorph(procedure_type_id).variant.as_procedure().monomorph_index; let monomorph_info = &ast_definition.monomorphs[procedure_monomorph_index as usize]; if monomorph_info.argument_types.len() != arguments.values.len() { @@ -130,6 +130,36 @@ impl ProtocolDescription { return Ok(Prompt::new(&self.types, &self.heap, ast_definition.this, procedure_type_id, arguments)); } + /// A somewhat temporary method. Can be used by components to lookup type + /// definitions by their name (to have their implementation somewhat + /// resistant to changes in the standard library) + pub(crate) fn find_type<'a>(&'a self, module_name: &[u8], type_name: &[u8]) -> Option> { + // Lookup type definition in module + let root_id = self.lookup_module_root(module_name)?; + let module = &self.heap[root_id]; + let definition_id = module.get_definition_by_ident(&self.heap, type_name)?; + let definition = &self.heap[definition_id]; + + // Make sure type is not polymorphic and is not a procedure + if !definition.poly_vars().is_empty() { + return None; + } + if definition.is_procedure() { + return None; + } + + // Lookup type in type table + let type_parts = [ConcreteTypePart::Instance(definition_id, 0)]; + let type_id = self.types.get_monomorph_type_id(&definition_id, &type_parts) + .expect("type ID for non-polymorphic type"); + let type_monomorph = self.types.get_monomorph(type_id); + + return Some(TypeInspector{ + heap: definition, + type_table: type_monomorph + }); + } + fn lookup_module_root(&self, module_name: &[u8]) -> Option { for module in self.modules.iter() { match &module.name { @@ -223,10 +253,9 @@ pub struct 
ProtocolDescriptionBuilder { } impl ProtocolDescriptionBuilder { - pub fn new() -> Self { - return Self{ - parser: Parser::new(), - } + pub fn new(std_lib_dir: Option) -> Result { + let mut parser = Parser::new(std_lib_dir)?; + return Ok(Self{ parser }) } pub fn add(&mut self, filename: String, buffer: Vec) -> Result<(), ParseError> { @@ -255,3 +284,30 @@ impl ProtocolDescriptionBuilder { }); } } + +pub struct TypeInspector<'a> { + heap: &'a Definition, + type_table: &'a MonoType, +} + +impl<'a> TypeInspector<'a> { + pub fn as_union(&'a self) -> UnionTypeInspector<'a> { + let heap = self.heap.as_union(); + let type_table = self.type_table.variant.as_union(); + return UnionTypeInspector{ heap, type_table }; + } +} + +pub struct UnionTypeInspector<'a> { + heap: &'a UnionDefinition, + type_table: &'a UnionMonomorph, +} + +impl UnionTypeInspector<'_> { + /// Retrieves union variant tag value. + pub fn get_variant_tag_value(&self, variant_name: &[u8]) -> Option { + let variant_index = self.heap.variants.iter() + .position(|v| v.identifier.value.as_bytes() == variant_name)?; + return Some(variant_index as i64); + } +} \ No newline at end of file diff --git a/src/protocol/parser/mod.rs b/src/protocol/parser/mod.rs index e55bf6b31adb04d40e19891ed39a219762f616dd..8f1500639951df5c37527325b710f7e2f3e88097 100644 --- a/src/protocol/parser/mod.rs +++ b/src/protocol/parser/mod.rs @@ -30,8 +30,12 @@ use type_table::*; use crate::protocol::ast::*; use crate::protocol::input_source::*; -use crate::protocol::ast_printer::ASTWriter; +use crate::protocol::ast_writer::ASTWriter; use crate::protocol::parser::type_table::PolymorphicVariable; +use crate::protocol::token_writer::TokenWriter; + +const REOWOLF_PATH_ENV: &'static str = "REOWOLF_ROOT"; // first lookup reowolf path +const REOWOLF_PATH_DIR: &'static str = "std"; // then try folder in current working directory #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] pub enum ModuleCompilationPhase { @@ -48,10 +52,10 @@ pub enum 
ModuleCompilationPhase { } pub struct Module { - // Buffers pub source: InputSource, pub tokens: TokenBuffer, - // Identifiers + pub is_compiler_file: bool, // TODO: @Hack for custom compiler-only types + pub add_to_global_namespace: bool, pub root_id: RootId, pub name: Option<(PragmaId, StringRef<'static>)>, pub version: Option<(PragmaId, i64)>, @@ -118,6 +122,7 @@ pub struct Parser { pub(crate) modules: Vec, pub(crate) symbol_table: SymbolTable, pub(crate) type_table: TypeTable, + pub(crate) global_module_index: usize, // contains globals, implicitly imported everywhere // Compiler passes, used as little state machine that keep their memory // around. pass_tokenizer: PassTokenizer, @@ -129,18 +134,21 @@ pub struct Parser { pass_rewriting: PassRewriting, pass_stack_size: PassStackSize, // Compiler options + pub write_tokens_to: Option, pub write_ast_to: Option, + pub std_lib_dir: Option, pub(crate) arch: TargetArch, } impl Parser { - pub fn new() -> Self { + pub fn new(std_lib_dir: Option) -> Result { let mut parser = Parser{ heap: Heap::new(), string_pool: StringPool::new(), modules: Vec::new(), symbol_table: SymbolTable::new(), type_table: TypeTable::new(), + global_module_index: 0, pass_tokenizer: PassTokenizer::new(), pass_symbols: PassSymbols::new(), pass_import: PassImport::new(), @@ -149,7 +157,9 @@ impl Parser { pass_typing: PassTyping::new(), pass_rewriting: PassRewriting::new(), pass_stack_size: PassStackSize::new(), + write_tokens_to: None, write_ast_to: None, + std_lib_dir, arch: TargetArch::new(), }; @@ -176,78 +186,17 @@ impl Parser { parser.arch.output_type_id = insert_builtin_type(&mut parser.type_table, vec![ConcreteTypePart::Output, ConcreteTypePart::Void], true, 8, 8); parser.arch.pointer_type_id = insert_builtin_type(&mut parser.type_table, vec![ConcreteTypePart::Pointer, ConcreteTypePart::Void], true, 8, 8); - // Insert builtin functions - fn quick_type(variants: &[ParserTypeVariant]) -> ParserType { - let mut t = ParserType{ elements: 
Vec::with_capacity(variants.len()), full_span: InputSpan::new() }; - for variant in variants { - t.elements.push(ParserTypeElement{ element_span: InputSpan::new(), variant: variant.clone() }); - } - t - } + // Parse standard library + parser.feed_standard_library()?; - use ParserTypeVariant as PTV; - insert_builtin_function(&mut parser, "get", &["T"], |id| ( - vec![ - ("input", quick_type(&[PTV::Input, PTV::PolymorphicArgument(id.upcast(), 0)])) - ], - quick_type(&[PTV::PolymorphicArgument(id.upcast(), 0)]) - )); - insert_builtin_function(&mut parser, "put", &["T"], |id| ( - vec![ - ("output", quick_type(&[PTV::Output, PTV::PolymorphicArgument(id.upcast(), 0)])), - ("value", quick_type(&[PTV::PolymorphicArgument(id.upcast(), 0)])), - ], - quick_type(&[PTV::Void]) - )); - insert_builtin_function(&mut parser, "fires", &["T"], |id| ( - vec![ - ("port", quick_type(&[PTV::InputOrOutput, PTV::PolymorphicArgument(id.upcast(), 0)])) - ], - quick_type(&[PTV::Bool]) - )); - insert_builtin_function(&mut parser, "create", &["T"], |id| ( - vec![ - ("length", quick_type(&[PTV::IntegerLike])) - ], - quick_type(&[PTV::ArrayLike, PTV::PolymorphicArgument(id.upcast(), 0)]) - )); - insert_builtin_function(&mut parser, "length", &["T"], |id| ( - vec![ - ("array", quick_type(&[PTV::ArrayLike, PTV::PolymorphicArgument(id.upcast(), 0)])) - ], - quick_type(&[PTV::UInt32]) // TODO: @PtrInt - )); - insert_builtin_function(&mut parser, "assert", &[], |_id| ( - vec![ - ("condition", quick_type(&[PTV::Bool])), - ], - quick_type(&[PTV::Void]) - )); - insert_builtin_function(&mut parser, "print", &[], |_id| ( - vec![ - ("message", quick_type(&[PTV::String])), - ], - quick_type(&[PTV::Void]) - )); - - parser + return Ok(parser) } - pub fn feed(&mut self, mut source: InputSource) -> Result<(), ParseError> { - let mut token_buffer = TokenBuffer::new(); - self.pass_tokenizer.tokenize(&mut source, &mut token_buffer)?; - - let module = Module{ - source, - tokens: token_buffer, - root_id: 
RootId::new_invalid(), - name: None, - version: None, - phase: ModuleCompilationPhase::Tokenized, - }; - self.modules.push(module); - - Ok(()) + /// Feeds a new InputSource to the parser, which will tokenize it and store + /// it internally for later parsing (when all modules are present). Returns + /// the index of the new module. + pub fn feed(&mut self, mut source: InputSource) -> Result { + return self.feed_internal(source, false, false); } pub fn parse(&mut self) -> Result<(), ParseError> { @@ -270,6 +219,12 @@ impl Parser { self.pass_definitions.parse(&mut self.modules, module_idx, &mut pass_ctx)?; } + if let Some(filename) = &self.write_tokens_to { + let mut writer = TokenWriter::new(); + let mut file = std::fs::File::create(std::path::Path::new(filename)).unwrap(); + writer.write(&mut file, &self.modules); + } + // Add every known type to the type table self.type_table.build_base_types(&mut self.modules, &mut pass_ctx)?; @@ -337,6 +292,103 @@ impl Parser { Ok(()) } + + /// Tries to find the standard library and add the files for parsing. 
+ fn feed_standard_library(&mut self) -> Result<(), String> { + use std::env; + use std::path::{Path, PathBuf}; + use std::fs; + + // Pair is (name, add_to_global_namespace) + const FILES: [(&'static str, bool); 3] = [ + ("std.global.pdl", true), + ("std.internet.pdl", false), + ("std.random.pdl", false), + ]; + + // Determine base directory + let (base_path, from_env) = if let Ok(path) = env::var(REOWOLF_PATH_ENV) { + // Path variable is set + (path, true) + } else { + let path = match self.std_lib_dir.take() { + Some(path) => path, + None => { + let mut path = String::with_capacity(REOWOLF_PATH_DIR.len() + 2); + path.push_str("./"); + path.push_str(REOWOLF_PATH_DIR); + path + } + }; + + (path, false) + }; + + // Make sure directory exists + let path = Path::new(&base_path); + if !path.exists() { + return Err(format!("std lib root directory '{}' does not exist", base_path)); + } + + // Try to load all standard library files. We might need a more unified + // way to do this in the future (i.e. 
a "std" package, containing all + // of the modules) + let mut file_path = PathBuf::new(); + let mut first_file = true; + + for (file, add_to_global_namespace) in FILES { + file_path.clear(); + file_path.push(path); + file_path.push(file); + + let source = fs::read(file_path.as_path()); + if let Err(err) = source { + return Err(format!( + "failed to read std lib file '{}' in root directory '{}', because: {}", + file, base_path, err + )); + } + + let source = source.unwrap(); + let input_source = InputSource::new(file.to_string(), source); + + let module_index = self.feed_internal(input_source, true, add_to_global_namespace); + if let Err(err) = module_index { + // A bit of a hack, but shouldn't really happen anyway: the + // compiler should ship with a decent standard library (at some + // point) + return Err(format!("{}", err)); + } + let module_index = module_index.unwrap(); + + if first_file { + self.global_module_index = module_index; + first_file = false; + } + } + + return Ok(()) + } + + fn feed_internal(&mut self, mut source: InputSource, is_compiler_file: bool, add_to_global_namespace: bool) -> Result { + let mut token_buffer = TokenBuffer::new(); + self.pass_tokenizer.tokenize(&mut source, &mut token_buffer)?; + + let module = Module{ + source, + tokens: token_buffer, + is_compiler_file, + add_to_global_namespace, + root_id: RootId::new_invalid(), + name: None, + version: None, + phase: ModuleCompilationPhase::Tokenized, + }; + let module_index = self.modules.len(); + self.modules.push(module); + + return Ok(module_index); + } } fn insert_builtin_type(type_table: &mut TypeTable, parts: Vec, has_poly_var: bool, size: usize, alignment: usize) -> TypeId { @@ -353,79 +405,4 @@ fn insert_builtin_type(type_table: &mut TypeTable, parts: Vec, }; return type_table.add_builtin_data_type(concrete_type, poly_var, size, alignment); -} - -// Note: args and return type need to be a function because we need to know the function ID. 
-fn insert_builtin_function (Vec<(&'static str, ParserType)>, ParserType)> ( - p: &mut Parser, func_name: &str, polymorphic: &[&str], arg_and_return_fn: T -) { - // Insert into AST (to get an ID), also prepare the polymorphic variables - // we need later for the type table - let mut ast_poly_vars = Vec::with_capacity(polymorphic.len()); - let mut type_poly_vars = Vec::with_capacity(polymorphic.len()); - for poly_var in polymorphic { - let identifier = Identifier{ span: InputSpan::new(), value: p.string_pool.intern(poly_var.as_bytes()) } ; - ast_poly_vars.push(identifier.clone()); - type_poly_vars.push(PolymorphicVariable{ identifier, is_in_use: false }); - } - - let func_ident_ref = p.string_pool.intern(func_name.as_bytes()); - let procedure_id = p.heap.alloc_procedure_definition(|this| ProcedureDefinition { - this, - defined_in: RootId::new_invalid(), - builtin: true, - kind: ProcedureKind::Function, - span: InputSpan::new(), - identifier: Identifier{ span: InputSpan::new(), value: func_ident_ref.clone() }, - poly_vars: ast_poly_vars, - return_type: None, - parameters: Vec::new(), - scope: ScopeId::new_invalid(), - body: BlockStatementId::new_invalid(), - monomorphs: Vec::new(), - }); - - // Modify AST with more information about the procedure - let (arguments, return_type) = arg_and_return_fn(procedure_id); - - let mut parameters = Vec::with_capacity(arguments.len()); - for (arg_name, arg_type) in arguments { - let identifier = Identifier{ span: InputSpan::new(), value: p.string_pool.intern(arg_name.as_bytes()) }; - let param_id = p.heap.alloc_variable(|this| Variable{ - this, - kind: VariableKind::Parameter, - parser_type: arg_type.clone(), - identifier, - relative_pos_in_parent: 0, - unique_id_in_scope: 0 - }); - parameters.push(param_id); - } - - let func = &mut p.heap[procedure_id]; - func.parameters = parameters; - func.return_type = Some(return_type); - - // Insert into symbol table - p.symbol_table.insert_symbol(SymbolScope::Global, Symbol{ - name: 
func_ident_ref, - variant: SymbolVariant::Definition(SymbolDefinition{ - defined_in_module: RootId::new_invalid(), - defined_in_scope: SymbolScope::Global, - definition_span: InputSpan::new(), - identifier_span: InputSpan::new(), - imported_at: None, - class: DefinitionClass::Function, - definition_id: procedure_id.upcast(), - }) - }).unwrap(); - - // Insert into type table - // let mut concrete_type = ConcreteType::default(); - // concrete_type.parts.push(ConcreteTypePart::Function(procedure_id, type_poly_vars.len() as u32)); - // - // for _ in 0..type_poly_vars.len() { - // concrete_type.parts.push(ConcreteTypePart::Void); // doesn't matter (I hope...) - // } - // p.type_table.add_builtin_procedure_type(concrete_type, &type_poly_vars); } \ No newline at end of file diff --git a/src/protocol/parser/pass_definitions.rs b/src/protocol/parser/pass_definitions.rs index 8ce576bcfa720aef0e2e4d9701150fb216f14b7a..f1daca6c3081615e0dee6cdde1a16065d2eaf752 100644 --- a/src/protocol/parser/pass_definitions.rs +++ b/src/protocol/parser/pass_definitions.rs @@ -43,35 +43,33 @@ impl PassDefinitions { pub(crate) fn parse(&mut self, modules: &mut [Module], module_idx: usize, ctx: &mut PassCtx) -> Result<(), ParseError> { let module = &modules[module_idx]; - let module_range = &module.tokens.ranges[0]; debug_assert_eq!(module.phase, ModuleCompilationPhase::ImportsResolved); - debug_assert_eq!(module_range.range_kind, TokenRangeKind::Module); - // Although we only need to parse the definitions, we want to go through - // code ranges as well such that we can throw errors if we get - // unexpected tokens at the module level of the source. 
- let mut range_idx = module_range.first_child_idx; - loop { - let range_idx_usize = range_idx as usize; - let cur_range = &module.tokens.ranges[range_idx_usize]; - - match cur_range.range_kind { - TokenRangeKind::Module => unreachable!(), // should not be reachable - TokenRangeKind::Pragma | TokenRangeKind::Import => { - // Already fully parsed, fall through and go to next range - }, - TokenRangeKind::Definition | TokenRangeKind::Code => { - // Visit range even if it is a "code" range to provide - // proper error messages. - self.visit_range(modules, module_idx, ctx, range_idx_usize)?; - }, + // We iterate through the entire document. If we find a marker that has + // been handled then we skip over it. It is important that we properly + // parse all other tokens in the document to ensure that we throw the + // correct kind of errors. + let num_tokens = module.tokens.tokens.len() as u32; + let num_markers = module.tokens.markers.len(); + + let mut marker_index = 0; + let mut first_token_index = 0; + while first_token_index < num_tokens { + // Seek ahead to the next marker that was already handled. 
+ let mut last_token_index = num_tokens; + let mut new_first_token_index = num_tokens; + while marker_index < num_markers { + let marker = &module.tokens.markers[marker_index]; + marker_index += 1; + if marker.handled { + last_token_index = marker.first_token; + new_first_token_index = marker.last_token; + break; + } } - if cur_range.next_sibling_idx == NO_SIBLING { - break; - } else { - range_idx = cur_range.next_sibling_idx; - } + self.visit_token_range(modules, module_idx, ctx, first_token_index, last_token_index)?; + first_token_index = new_first_token_index; } modules[module_idx].phase = ModuleCompilationPhase::DefinitionsParsed; @@ -79,15 +77,14 @@ impl PassDefinitions { Ok(()) } - fn visit_range( - &mut self, modules: &[Module], module_idx: usize, ctx: &mut PassCtx, range_idx: usize + fn visit_token_range( + &mut self, modules: &[Module], module_idx: usize, ctx: &mut PassCtx, + token_range_begin: u32, token_range_end: u32, ) -> Result<(), ParseError> { let module = &modules[module_idx]; - let cur_range = &module.tokens.ranges[range_idx]; - debug_assert!(cur_range.range_kind == TokenRangeKind::Definition || cur_range.range_kind == TokenRangeKind::Code); // Detect which definition we're parsing - let mut iter = module.tokens.iter_range(cur_range); + let mut iter = module.tokens.iter_range(token_range_begin, Some(token_range_end)); loop { let next = iter.next(); if next.is_none() { @@ -134,7 +131,7 @@ impl PassDefinitions { let start_pos = iter.last_valid_pos(); let parser_type = self.type_parser.consume_parser_type( iter, &ctx.heap, source, &ctx.symbols, poly_vars, definition_id, - module_scope, false, None + module_scope, false, false, None )?; let field = consume_ident_interned(source, iter, ctx)?; Ok(StructFieldDefinition{ @@ -221,7 +218,7 @@ impl PassDefinitions { let poly_vars = ctx.heap[definition_id].poly_vars(); self.type_parser.consume_parser_type( iter, &ctx.heap, source, &ctx.symbols, poly_vars, definition_id, - module_scope, false, None + 
module_scope, false, false, None ) }, &mut types_section, "an embedded type", Some(&mut close_pos) @@ -261,13 +258,15 @@ impl PassDefinitions { let definition_id = ctx.symbols.get_symbol_by_name_defined_in_scope(module_scope, ident_text) .unwrap().variant.as_definition().definition_id; self.cur_definition = definition_id; + let allow_compiler_types = module.is_compiler_file; consume_polymorphic_vars_spilled(&module.source, iter, ctx)?; // Parse function's argument list let mut parameter_section = self.variables.start_section(); consume_parameter_list( - &mut self.type_parser, &module.source, iter, ctx, &mut parameter_section, module_scope, definition_id + &mut self.type_parser, &module.source, iter, ctx, &mut parameter_section, + module_scope, definition_id, allow_compiler_types )?; let parameters = parameter_section.into_vec(); @@ -276,15 +275,16 @@ impl PassDefinitions { let poly_vars = ctx.heap[definition_id].poly_vars(); let parser_type = self.type_parser.consume_parser_type( iter, &ctx.heap, &module.source, &ctx.symbols, poly_vars, definition_id, - module_scope, false, None + module_scope, false, allow_compiler_types, None )?; - // Consume block and the definition's scope - let body_id = self.consume_block_statement(module, iter, ctx)?; + // Consume body + let (body_id, source) = self.consume_procedure_body(module, iter, ctx, definition_id, ProcedureKind::Function)?; let scope_id = ctx.heap.alloc_scope(|this| Scope::new(this, ScopeAssociation::Definition(definition_id))); // Assign everything in the preallocated AST node let function = ctx.heap[definition_id].as_procedure_mut(); + function.source = source; function.return_type = Some(parser_type); function.parameters = parameters; function.scope = scope_id; @@ -306,23 +306,27 @@ impl PassDefinitions { let definition_id = ctx.symbols.get_symbol_by_name_defined_in_scope(module_scope, ident_text) .unwrap().variant.as_definition().definition_id; self.cur_definition = definition_id; + let allow_compiler_types = 
module.is_compiler_file; consume_polymorphic_vars_spilled(&module.source, iter, ctx)?; // Parse component's argument list let mut parameter_section = self.variables.start_section(); consume_parameter_list( - &mut self.type_parser, &module.source, iter, ctx, &mut parameter_section, module_scope, definition_id + &mut self.type_parser, &module.source, iter, ctx, &mut parameter_section, + module_scope, definition_id, allow_compiler_types )?; let parameters = parameter_section.into_vec(); - // Consume block - let body_id = self.consume_block_statement(module, iter, ctx)?; + // Consume body + let procedure_kind = ctx.heap[definition_id].as_procedure().kind; + let (body_id, source) = self.consume_procedure_body(module, iter, ctx, definition_id, procedure_kind)?; let scope_id = ctx.heap.alloc_scope(|this| Scope::new(this, ScopeAssociation::Definition(definition_id))); // Assign everything in the AST node let component = ctx.heap[definition_id].as_procedure_mut(); debug_assert!(component.return_type.is_none()); + component.source = source; component.parameters = parameters; component.scope = scope_id; component.body = body_id; @@ -330,6 +334,70 @@ impl PassDefinitions { Ok(()) } + /// Consumes a procedure's body: either a user-defined procedure, which we + /// parse as normal, or a builtin function, where we'll make sure we expect + /// the particular builtin. + /// + /// We expect that the procedure's name is already stored in the + /// preallocated AST node. 
+ fn consume_procedure_body( + &mut self, module: &Module, iter: &mut TokenIter, ctx: &mut PassCtx, definition_id: DefinitionId, kind: ProcedureKind + ) -> Result<(BlockStatementId, ProcedureSource), ParseError> { + if iter.next() == Some(TokenKind::OpenCurly) && iter.peek() == Some(TokenKind::Pragma) { + // Consume the placeholder "{ #builtin }" tokens + iter.consume(); // opening curly brace + let (pragma, pragma_span) = consume_pragma(&module.source, iter)?; + if pragma != b"#builtin" { + return Err(ParseError::new_error_str_at_span( + &module.source, pragma_span, + "expected a '#builtin' pragma, or a function body" + )); + } + + if iter.next() != Some(TokenKind::CloseCurly) { + // Just to keep the compiler writers in line ;) + panic!("compiler error: when using the #builtin pragma, wrap it in curly braces"); + } + iter.consume(); + + // Retrieve module and procedure name + assert!(module.name.is_some(), "compiler error: builtin procedure body in unnamed module"); + let (_, module_name) = module.name.as_ref().unwrap(); + let module_name = module_name.as_str(); + + let definition = ctx.heap[definition_id].as_procedure(); + let procedure_name = definition.identifier.value.as_str(); + + let source = match (module_name, procedure_name) { + ("std.global", "get") => ProcedureSource::FuncGet, + ("std.global", "put") => ProcedureSource::FuncPut, + ("std.global", "fires") => ProcedureSource::FuncFires, + ("std.global", "create") => ProcedureSource::FuncCreate, + ("std.global", "length") => ProcedureSource::FuncLength, + ("std.global", "assert") => ProcedureSource::FuncAssert, + ("std.global", "print") => ProcedureSource::FuncPrint, + ("std.random", "random_u32") => ProcedureSource::CompRandomU32, + ("std.internet", "tcp_client") => ProcedureSource::CompTcpClient, + _ => panic!( + "compiler error: unknown builtin procedure '{}' in module '{}'", + procedure_name, module_name + ), + }; + + return Ok((BlockStatementId::new_invalid(), source)); + } else { + let body_id = 
self.consume_block_statement(module, iter, ctx)?; + let source = match kind { + ProcedureKind::Function => + ProcedureSource::FuncUserDefined, + ProcedureKind::Primitive | ProcedureKind::Composite => + ProcedureSource::CompUserDefined, + }; + + return Ok((body_id, source)) + } + } + /// Consumes a statement and returns a boolean indicating whether it was a /// block or not. fn consume_statement(&mut self, module: &Module, iter: &mut TokenIter, ctx: &mut PassCtx) -> Result { @@ -759,10 +827,8 @@ impl PassDefinitions { if let Expression::Call(expression) = expression { // Allow both components and functions, as it makes more sense to // check their correct use in the validation and linking pass - if expression.method == Method::UserComponent || expression.method == Method::UserFunction { - call_id = expression.this; - valid = true; - } + call_id = expression.this; + valid = true; } if !valid { @@ -797,7 +863,7 @@ impl PassDefinitions { let parser_type = self.type_parser.consume_parser_type( iter, &ctx.heap, &module.source, &ctx.symbols, poly_vars, definition_id, SymbolScope::Module(module.root_id), - true, Some(angle_start_pos) + true, false, Some(angle_start_pos) )?; (parser_type.elements, parser_type.full_span.end) @@ -893,7 +959,8 @@ impl PassDefinitions { let parser_type = self.type_parser.consume_parser_type( iter, &ctx.heap, &module.source, &ctx.symbols, poly_vars, - definition_id, SymbolScope::Definition(definition_id), true, None + definition_id, SymbolScope::Definition(definition_id), + true, false, None ); if let Ok(parser_type) = parser_type { @@ -1456,11 +1523,25 @@ impl PassDefinitions { } else if next == Some(TokenKind::Integer) { let (literal, span) = consume_integer_literal(&module.source, iter, &mut self.buffer)?; + ctx.heap.alloc_literal_expression(|this| LiteralExpression { + this, + span, + value: Literal::Integer(LiteralInteger { unsigned_value: literal, negated: false }), + parent: ExpressionParent::None, + type_index: -1, + }).upcast() + } else 
if next == Some(TokenKind::Bytestring) { + let span = consume_bytestring_literal(&module.source, iter, &mut self.buffer)?; + let mut bytes = Vec::with_capacity(self.buffer.len()); + for byte in self.buffer.as_bytes().iter().copied() { + bytes.push(byte); + } + ctx.heap.alloc_literal_expression(|this| LiteralExpression{ this, span, - value: Literal::Integer(LiteralInteger{ unsigned_value: literal, negated: false }), + value: Literal::Bytestring(bytes), parent: ExpressionParent::None, - type_index: -1, + type_index: -1 }).upcast() } else if next == Some(TokenKind::String) { let span = consume_string_literal(&module.source, iter, &mut self.buffer)?; @@ -1500,7 +1581,7 @@ impl PassDefinitions { let poly_vars = ctx.heap[self.cur_definition].poly_vars(); let parser_type = self.type_parser.consume_parser_type( iter, &ctx.heap, &module.source, &ctx.symbols, poly_vars, self.cur_definition, - symbol_scope, true, None + symbol_scope, true, false, None )?; debug_assert!(!parser_type.elements.is_empty()); match parser_type.elements[0].variant { @@ -1579,22 +1660,21 @@ impl PassDefinitions { }, Definition::Procedure(proc_def) => { // Check whether it is a builtin function + // TODO: Once we start generating bytecode this is unnecessary let procedure_id = proc_def.this; - let method = if proc_def.builtin { - match proc_def.identifier.value.as_bytes() { - KW_FUNC_GET => Method::Get, - KW_FUNC_PUT => Method::Put, - KW_FUNC_FIRES => Method::Fires, - KW_FUNC_CREATE => Method::Create, - KW_FUNC_LENGTH => Method::Length, - KW_FUNC_ASSERT => Method::Assert, - KW_FUNC_PRINT => Method::Print, - _ => unreachable!(), - } - } else if proc_def.kind == ProcedureKind::Function { - Method::UserFunction - } else { - Method::UserComponent + let method = match proc_def.source { + ProcedureSource::FuncUserDefined => Method::UserFunction, + ProcedureSource::CompUserDefined => Method::UserComponent, + ProcedureSource::FuncGet => Method::Get, + ProcedureSource::FuncPut => Method::Put, + 
ProcedureSource::FuncFires => Method::Fires, + ProcedureSource::FuncCreate => Method::Create, + ProcedureSource::FuncLength => Method::Length, + ProcedureSource::FuncAssert => Method::Assert, + ProcedureSource::FuncPrint => Method::Print, + ProcedureSource::CompRandomU32 => Method::ComponentRandomU32, + ProcedureSource::CompTcpClient => Method::ComponentTcpClient, + _ => todo!("other procedure sources"), }; // Function call: consume the arguments @@ -1668,7 +1748,7 @@ impl PassDefinitions { self.type_parser.consume_parser_type( iter, &ctx.heap, &module.source, &ctx.symbols, poly_vars, definition_id, SymbolScope::Module(module.root_id), - true, Some(angle_start_pos) + true, false, Some(angle_start_pos) )? } else { // Automatic casting with inferred target type @@ -1804,7 +1884,7 @@ fn consume_polymorphic_vars_spilled(source: &InputSource, iter: &mut TokenIter, fn consume_parameter_list( parser: &mut ParserTypeParser, source: &InputSource, iter: &mut TokenIter, ctx: &mut PassCtx, target: &mut ScopedSection, - scope: SymbolScope, definition_id: DefinitionId + scope: SymbolScope, definition_id: DefinitionId, allow_compiler_types: bool ) -> Result<(), ParseError> { consume_comma_separated( TokenKind::OpenParen, TokenKind::CloseParen, source, iter, ctx, @@ -1812,7 +1892,7 @@ fn consume_parameter_list( let poly_vars = ctx.heap[definition_id].poly_vars(); // Rust being rust, multiple lookups let parser_type = parser.consume_parser_type( iter, &ctx.heap, source, &ctx.symbols, poly_vars, definition_id, - scope, false, None + scope, false, allow_compiler_types, None )?; let identifier = consume_ident_interned(source, iter, ctx)?; let parameter_id = ctx.heap.alloc_variable(|this| Variable{ diff --git a/src/protocol/parser/pass_definitions_types.rs b/src/protocol/parser/pass_definitions_types.rs index 11603e2ffe11da4a97c02a6c51ee3186070a0b61..0f574fc99222e1b195e5f437c2970d4d362ad125 100644 --- a/src/protocol/parser/pass_definitions_types.rs +++ 
b/src/protocol/parser/pass_definitions_types.rs @@ -59,7 +59,8 @@ impl ParserTypeParser { &mut self, iter: &mut TokenIter, heap: &Heap, source: &InputSource, symbols: &SymbolTable, poly_vars: &[Identifier], wrapping_definition: DefinitionId, cur_scope: SymbolScope, - allow_inference: bool, inside_angular_bracket: Option, + allow_inference: bool, allow_compiler_types: bool, + inside_angular_bracket: Option, ) -> Result { // Prepare self.entries.clear(); @@ -71,9 +72,10 @@ impl ParserTypeParser { } let initial_state = match iter.next() { - Some(TokenKind::Ident) => { + Some(TokenKind::Ident) | Some(TokenKind::Pragma) => { let element = Self::consume_parser_type_element( - iter, source, heap, symbols, wrapping_definition, poly_vars, cur_scope, allow_inference + iter, source, heap, symbols, wrapping_definition, poly_vars, cur_scope, + allow_inference, allow_compiler_types )?; self.first_pos = element.element_span.begin; self.last_pos = element.element_span.end; @@ -154,7 +156,8 @@ impl ParserTypeParser { // Allowed tokens: ident ( match next { Some(TokenKind::Ident) => self.consume_type_idents( - source, heap, symbols, wrapping_definition, poly_vars, cur_scope, allow_inference, iter + source, heap, symbols, wrapping_definition, poly_vars, cur_scope, + allow_inference, allow_compiler_types, iter )?, Some(TokenKind::OpenParen) => self.consume_open_paren(iter), _ => return Err(ParseError::new_error_str_at_pos( @@ -168,7 +171,8 @@ impl ParserTypeParser { // We'll strip the nested tuple later in this function match next { Some(TokenKind::Ident) => self.consume_type_idents( - source, heap, symbols, wrapping_definition, poly_vars, cur_scope, allow_inference, iter + source, heap, symbols, wrapping_definition, poly_vars, cur_scope, + allow_inference, allow_compiler_types, iter )?, Some(TokenKind::OpenParen) => self.consume_open_paren(iter), Some(TokenKind::CloseParen) => self.consume_close_paren(source, iter)?, @@ -182,7 +186,8 @@ impl ParserTypeParser { // Allowed tokens: 
ident ( > >> ) match next { Some(TokenKind::Ident) => self.consume_type_idents( - source, heap, symbols, wrapping_definition, poly_vars, cur_scope, allow_inference, iter + source, heap, symbols, wrapping_definition, poly_vars, cur_scope, + allow_inference, allow_compiler_types, iter )?, Some(TokenKind::OpenParen) => self.consume_open_paren(iter), Some(TokenKind::CloseAngle) => self.consume_close_angle(source, iter)?, @@ -288,10 +293,12 @@ impl ParserTypeParser { fn consume_type_idents( &mut self, source: &InputSource, heap: &Heap, symbols: &SymbolTable, wrapping_definition: DefinitionId, poly_vars: &[Identifier], - cur_scope: SymbolScope, allow_inference: bool, iter: &mut TokenIter + cur_scope: SymbolScope, allow_inference: bool, allow_compiler_types: bool, + iter: &mut TokenIter ) -> Result<(), ParseError> { let element = Self::consume_parser_type_element( - iter, source, heap, symbols, wrapping_definition, poly_vars, cur_scope, allow_inference + iter, source, heap, symbols, wrapping_definition, poly_vars, cur_scope, + allow_inference, allow_compiler_types )?; let depth = self.cur_depth(); self.last_pos = element.element_span.end; @@ -428,11 +435,35 @@ impl ParserTypeParser { fn consume_parser_type_element( iter: &mut TokenIter, source: &InputSource, heap: &Heap, symbols: &SymbolTable, wrapping_definition: DefinitionId, poly_vars: &[Identifier], - mut scope: SymbolScope, allow_inference: bool, + mut scope: SymbolScope, allow_inference: bool, allow_compiler_types: bool, ) -> Result { use ParserTypeVariant as PTV; - let (mut type_text, mut type_span) = consume_any_ident(source, iter)?; + // Early check for special builtin types available to the compiler + if iter.next() == Some(TokenKind::Pragma) { + let (type_text, pragma_span) = consume_pragma(source, iter)?; + let variant = match type_text { + PRAGMA_TYPE_VOID => Some(PTV::Void), + PRAGMA_TYPE_PORTLIKE => Some(PTV::InputOrOutput), + PRAGMA_TYPE_INTEGERLIKE => Some(PTV::IntegerLike), + PRAGMA_TYPE_ARRAYLIKE => 
Some(PTV::ArrayLike), + _ => None, + }; + + if !allow_compiler_types || variant.is_none() { + return Err(ParseError::new_error_str_at_span( + source, pragma_span, "unexpected pragma in type" + )); + } + + return Ok(ParserTypeElement{ + variant: variant.unwrap(), + element_span: pragma_span, + }); + } + + // No special type, parse as normal + let (mut type_text, mut type_span) = consume_any_ident(source, iter)?; let variant = match type_text { KW_TYPE_MESSAGE => PTV::Message, KW_TYPE_BOOL => PTV::Bool, diff --git a/src/protocol/parser/pass_imports.rs b/src/protocol/parser/pass_imports.rs index 98c06ff2fa3f4c2d752e446b720efc40f160e2ca..7fe1d8c77451ed17ffe7335cbc31969f15b27d68 100644 --- a/src/protocol/parser/pass_imports.rs +++ b/src/protocol/parser/pass_imports.rs @@ -25,28 +25,23 @@ impl PassImport { } pub(crate) fn parse(&mut self, modules: &mut [Module], module_idx: usize, ctx: &mut PassCtx) -> Result<(), ParseError> { let module = &modules[module_idx]; - let module_range = &module.tokens.ranges[0]; debug_assert!(modules.iter().all(|m| m.phase >= ModuleCompilationPhase::SymbolsScanned)); debug_assert_eq!(module.phase, ModuleCompilationPhase::SymbolsScanned); - debug_assert_eq!(module_range.range_kind, TokenRangeKind::Module); - let mut range_idx = module_range.first_child_idx; - loop { - let range_idx_usize = range_idx as usize; - let cur_range = &module.tokens.ranges[range_idx_usize]; + let module_root_id = module.root_id; + let num_markers = module.tokens.markers.len(); - if cur_range.range_kind == TokenRangeKind::Import { - self.visit_import_range(modules, module_idx, ctx, range_idx_usize)?; - } - - if cur_range.next_sibling_idx == NO_SIBLING { - break; - } else { - range_idx = cur_range.next_sibling_idx; + for marker_index in 0..num_markers { + let marker = &modules[module_idx].tokens.markers[marker_index]; + match marker.kind { + TokenMarkerKind::Import => { + self.visit_import_marker(modules, module_idx, ctx, marker_index)?; + }, + 
TokenMarkerKind::Definition | TokenMarkerKind::Pragma => {}, } } - let root = &mut ctx.heap[module.root_id]; + let root = &mut ctx.heap[module_root_id]; root.imports.extend(self.imports.drain(..)); let module = &mut modules[module_idx]; @@ -55,14 +50,13 @@ impl PassImport { Ok(()) } - pub(crate) fn visit_import_range( - &mut self, modules: &[Module], module_idx: usize, ctx: &mut PassCtx, range_idx: usize + pub(crate) fn visit_import_marker( + &mut self, modules: &mut [Module], module_idx: usize, ctx: &mut PassCtx, marker_index: usize ) -> Result<(), ParseError> { let module = &modules[module_idx]; - let import_range = &module.tokens.ranges[range_idx]; - debug_assert_eq!(import_range.range_kind, TokenRangeKind::Import); + let marker = &module.tokens.markers[marker_index]; - let mut iter = module.tokens.iter_range(import_range); + let mut iter = module.tokens.iter_range(marker.first_token, None); // Consume "import" let (_import_ident, import_span) = @@ -315,6 +309,12 @@ impl PassImport { consume_token(&module.source, &mut iter, TokenKind::SemiColon)?; self.imports.push(import_id); + // Update the marker + let marker_last_token = iter.token_index(); + let marker = &mut modules[module_idx].tokens.markers[marker_index]; + marker.last_token = marker_last_token; + marker.handled = true; + Ok(()) } } diff --git a/src/protocol/parser/pass_rewriting.rs b/src/protocol/parser/pass_rewriting.rs index a9f869d38c905cdbbe880ecdbd4bc522a8bf5b82..82702bd1056e5706249f1636604aba452aeb820c 100644 --- a/src/protocol/parser/pass_rewriting.rs +++ b/src/protocol/parser/pass_rewriting.rs @@ -49,6 +49,10 @@ impl Visitor for PassRewriting { fn visit_procedure_definition(&mut self, ctx: &mut Ctx, id: ProcedureDefinitionId) -> VisitorResult { let definition = &ctx.heap[id]; + if definition.source.is_builtin() { + return Ok(()); + } + let body_id = definition.body; self.current_scope = definition.scope; self.current_procedure_id = id; diff --git a/src/protocol/parser/pass_symbols.rs 
b/src/protocol/parser/pass_symbols.rs index 995dfa15100c3f0d1e3f44779257ea150cbeb60e..2688658feb17f4d0e48c04846443c44f55b7a46c 100644 --- a/src/protocol/parser/pass_symbols.rs +++ b/src/protocol/parser/pass_symbols.rs @@ -45,11 +45,10 @@ impl PassSymbols { self.reset(); let module = &mut modules[module_idx]; - let module_range = &module.tokens.ranges[0]; + let add_to_global_namespace = module.add_to_global_namespace; debug_assert_eq!(module.phase, ModuleCompilationPhase::Tokenized); - debug_assert_eq!(module_range.range_kind, TokenRangeKind::Module); - debug_assert!(module.root_id.is_invalid()); // not set yet, + debug_assert!(module.root_id.is_invalid()); // not set yet // Preallocate root in the heap let root_id = ctx.heap.alloc_protocol_description(|this| { @@ -62,28 +61,21 @@ impl PassSymbols { }); module.root_id = root_id; - // Retrieve first range index, then make immutable borrow - let mut range_idx = module_range.first_child_idx; - - // Visit token ranges to detect definitions and pragmas - loop { + // Use pragma token markers to detects symbol definitions and pragmas + let num_markers = module.tokens.markers.len(); + for marker_index in 0..num_markers { let module = &modules[module_idx]; - let range_idx_usize = range_idx as usize; - let cur_range = &module.tokens.ranges[range_idx_usize]; - let next_sibling_idx = cur_range.next_sibling_idx; - let range_kind = cur_range.range_kind; + let marker = &module.tokens.markers[marker_index]; // Parse if it is a definition or a pragma - if range_kind == TokenRangeKind::Definition { - self.visit_definition_range(modules, module_idx, ctx, range_idx_usize)?; - } else if range_kind == TokenRangeKind::Pragma { - self.visit_pragma_range(modules, module_idx, ctx, range_idx_usize)?; - } - - if next_sibling_idx == NO_SIBLING { - break; - } else { - range_idx = next_sibling_idx; + match marker.kind { + TokenMarkerKind::Pragma => { + self.visit_pragma_marker(modules, module_idx, ctx, marker_index)?; + }, + 
TokenMarkerKind::Definition => { + self.visit_definition_marker(modules, module_idx, ctx, marker_index)?; + } + TokenMarkerKind::Import => {}, // we don't care yet } } @@ -97,6 +89,14 @@ impl PassSymbols { } } + if add_to_global_namespace { + debug_assert!(self.symbols.is_empty()); + ctx.symbols.get_all_symbols_defined_in_scope(module_scope, &mut self.symbols); + for symbol in self.symbols.drain(..) { + ctx.symbols.insert_symbol_in_global_scope(symbol); + } + } + // Modify the preallocated root let root = &mut ctx.heap[root_id]; root.pragmas.extend(self.pragmas.drain(..)); @@ -109,29 +109,27 @@ impl PassSymbols { Ok(()) } - fn visit_pragma_range(&mut self, modules: &mut [Module], module_idx: usize, ctx: &mut PassCtx, range_idx: usize) -> Result<(), ParseError> { + fn visit_pragma_marker(&mut self, modules: &mut [Module], module_idx: usize, ctx: &mut PassCtx, marker_index: usize) -> Result<(), ParseError> { let module = &mut modules[module_idx]; - let range = &module.tokens.ranges[range_idx]; - let mut iter = module.tokens.iter_range(range); + let marker = &module.tokens.markers[marker_index]; + let mut iter = module.tokens.iter_range(marker.first_token, None); // Consume pragma name - let (pragma_section, pragma_start, _) = consume_pragma(&module.source, &mut iter)?; + let (pragma_section, mut pragma_span) = consume_pragma(&module.source, &mut iter)?; // Consume pragma values if pragma_section == b"#module" { // Check if name is defined twice within the same file if self.has_pragma_module { - return Err(ParseError::new_error_str_at_pos(&module.source, pragma_start, "module name is defined twice")); + return Err(ParseError::new_error_str_at_span(&module.source, pragma_span, "module name is defined twice")); } - // Consume the domain-name + // Consume the domain-name, then record end of pragma let (module_name, module_span) = consume_domain_ident(&module.source, &mut iter)?; - if iter.next().is_some() { - return Err(ParseError::new_error_str_at_pos(&module.source, 
iter.last_valid_pos(), "expected end of #module pragma after module name")); - } + let marker_last_token = iter.token_index(); // Add to heap and symbol table - let pragma_span = InputSpan::from_positions(pragma_start, module_span.end); + pragma_span.end = module_span.end; let module_name = ctx.pool.intern(module_name); let pragma_id = ctx.heap.alloc_pragma(|this| Pragma::Module(PragmaModule{ this, @@ -153,42 +151,45 @@ impl PassSymbols { )); } + let marker = &mut module.tokens.markers[marker_index]; + marker.last_token = marker_last_token; + marker.handled = true; + module.name = Some((pragma_id, module_name)); self.has_pragma_module = true; } else if pragma_section == b"#version" { // Check if version is defined twice within the same file if self.has_pragma_version { - return Err(ParseError::new_error_str_at_pos(&module.source, pragma_start, "module version is defined twice")); + return Err(ParseError::new_error_str_at_span(&module.source, pragma_span, "module version is defined twice")); } // Consume the version pragma let (version, version_span) = consume_integer_literal(&module.source, &mut iter, &mut self.buffer)?; + let marker_last_token = iter.token_index(); + + pragma_span.end = version_span.end; let pragma_id = ctx.heap.alloc_pragma(|this| Pragma::Version(PragmaVersion{ this, - span: InputSpan::from_positions(pragma_start, version_span.end), + span: pragma_span, version, })); self.pragmas.push(pragma_id); + let marker = &mut module.tokens.markers[marker_index]; + marker.last_token = marker_last_token; + marker.handled = true; + module.version = Some((pragma_id, version as i64)); self.has_pragma_version = true; - } else { - // Custom pragma, maybe we support this in the future, but for now - // we don't. 
- return Err(ParseError::new_error_str_at_pos(&module.source, pragma_start, "illegal pragma name")); - } + } // else: custom pragma used for something else, will be handled later (or rejected with an error) Ok(()) } - fn visit_definition_range(&mut self, modules: &[Module], module_idx: usize, ctx: &mut PassCtx, range_idx: usize) -> Result<(), ParseError> { + fn visit_definition_marker(&mut self, modules: &[Module], module_idx: usize, ctx: &mut PassCtx, marker_index: usize) -> Result<(), ParseError> { let module = &modules[module_idx]; - let range = &module.tokens.ranges[range_idx]; - let definition_span = InputSpan::from_positions( - module.tokens.start_pos(range), - module.tokens.end_pos(range) - ); - let mut iter = module.tokens.iter_range(range); + let marker = &module.tokens.markers[marker_index]; + let mut iter = module.tokens.iter_range(marker.first_token, None); // First ident must be type of symbol let (kw_text, _) = consume_any_ident(&module.source, &mut iter).unwrap(); @@ -210,28 +211,28 @@ impl PassSymbols { match kw_text { KW_STRUCT => { let struct_def_id = ctx.heap.alloc_struct_definition(|this| { - StructDefinition::new_empty(this, module.root_id, definition_span, identifier, poly_vars) + StructDefinition::new_empty(this, module.root_id, identifier, poly_vars) }); definition_class = DefinitionClass::Struct; ast_definition_id = struct_def_id.upcast(); }, KW_ENUM => { let enum_def_id = ctx.heap.alloc_enum_definition(|this| { - EnumDefinition::new_empty(this, module.root_id, definition_span, identifier, poly_vars) + EnumDefinition::new_empty(this, module.root_id, identifier, poly_vars) }); definition_class = DefinitionClass::Enum; ast_definition_id = enum_def_id.upcast(); }, KW_UNION => { let union_def_id = ctx.heap.alloc_union_definition(|this| { - UnionDefinition::new_empty(this, module.root_id, definition_span, identifier, poly_vars) + UnionDefinition::new_empty(this, module.root_id, identifier, poly_vars) }); definition_class = 
DefinitionClass::Union; ast_definition_id = union_def_id.upcast() }, KW_FUNCTION => { let proc_def_id = ctx.heap.alloc_procedure_definition(|this| { - ProcedureDefinition::new_empty(this, module.root_id, definition_span, ProcedureKind::Function, identifier, poly_vars) + ProcedureDefinition::new_empty(this, module.root_id, ProcedureKind::Function, identifier, poly_vars) }); definition_class = DefinitionClass::Function; ast_definition_id = proc_def_id.upcast(); @@ -243,7 +244,7 @@ impl PassSymbols { ProcedureKind::Composite }; let proc_def_id = ctx.heap.alloc_procedure_definition(|this| { - ProcedureDefinition::new_empty(this, module.root_id, definition_span, procedure_kind, identifier, poly_vars) + ProcedureDefinition::new_empty(this, module.root_id, procedure_kind, identifier, poly_vars) }); definition_class = DefinitionClass::Component; ast_definition_id = proc_def_id.upcast(); @@ -256,7 +257,6 @@ impl PassSymbols { variant: SymbolVariant::Definition(SymbolDefinition{ defined_in_module: module.root_id, defined_in_scope: SymbolScope::Module(module.root_id), - definition_span, identifier_span: ident_span, imported_at: None, class: definition_class, diff --git a/src/protocol/parser/pass_tokenizer.rs b/src/protocol/parser/pass_tokenizer.rs index d41b6f65939a7267f7a38fb635d78b51fb634e1d..c611c9c4dc6b79e5d39c2f23c19a8742c7f8db3c 100644 --- a/src/protocol/parser/pass_tokenizer.rs +++ b/src/protocol/parser/pass_tokenizer.rs @@ -21,15 +21,12 @@ pub(crate) struct PassTokenizer { // unmatched opening braces, unmatched closing braces are detected // immediately. curly_stack: Vec, - // Points to an element in the `TokenBuffer.ranges` variable. 
- stack_idx: usize, } impl PassTokenizer { pub(crate) fn new() -> Self { Self{ curly_stack: Vec::with_capacity(32), - stack_idx: 0 } } @@ -37,23 +34,6 @@ impl PassTokenizer { // Assert source and buffer are at start debug_assert_eq!(source.pos().offset, 0); debug_assert!(target.tokens.is_empty()); - debug_assert!(target.ranges.is_empty()); - - // Set up for tokenization by pushing the first range onto the stack. - // This range may get transformed into the appropriate range kind later, - // see `push_range` and `pop_range`. - self.stack_idx = 0; - target.ranges.push(TokenRange{ - parent_idx: NO_RELATION, - range_kind: TokenRangeKind::Module, - curly_depth: 0, - start: 0, - end: 0, - num_child_ranges: 0, - first_child_idx: NO_RELATION, - last_child_idx: NO_RELATION, - next_sibling_idx: NO_RELATION, - }); // Main tokenization loop while let Some(c) = source.next() { @@ -61,35 +41,31 @@ impl PassTokenizer { if is_char_literal_start(c) { self.consume_char_literal(source, target)?; + } else if is_bytestring_literal_start(c, source) { + self.consume_bytestring_literal(source, target)?; } else if is_string_literal_start(c) { self.consume_string_literal(source, target)?; } else if is_identifier_start(c) { let ident = self.consume_identifier(source, target)?; - if demarks_definition(ident) { - self.push_range(target, TokenRangeKind::Definition, token_index); + if demarks_symbol(ident) { + self.emit_marker(target, TokenMarkerKind::Definition, token_index); } else if demarks_import(ident) { - self.push_range(target, TokenRangeKind::Import, token_index); + self.emit_marker(target, TokenMarkerKind::Import, token_index); } } else if is_integer_literal_start(c) { self.consume_number(source, target)?; } else if is_pragma_start_or_pound(c) { let was_pragma = self.consume_pragma_or_pound(c, source, target)?; if was_pragma { - self.push_range(target, TokenRangeKind::Pragma, token_index); + self.emit_marker(target, TokenMarkerKind::Pragma, token_index); } } else if 
self.is_line_comment_start(c, source) { self.consume_line_comment(source, target)?; } else if self.is_block_comment_start(c, source) { self.consume_block_comment(source, target)?; } else if is_whitespace(c) { - let contained_newline = self.consume_whitespace(source); - if contained_newline { - let range = &target.ranges[self.stack_idx]; - if range.range_kind == TokenRangeKind::Pragma { - self.pop_range(target, target.tokens.len() as u32); - } - } + self.consume_whitespace(source); } else { let was_punctuation = self.maybe_parse_punctuation(c, source, target)?; if let Some((token, token_pos)) = was_punctuation { @@ -105,20 +81,6 @@ impl PassTokenizer { } self.curly_stack.pop(); - - let range = &target.ranges[self.stack_idx]; - if range.range_kind == TokenRangeKind::Definition && range.curly_depth == self.curly_stack.len() as u32 { - self.pop_range(target, target.tokens.len() as u32); - } - - // Exit early if we have more closing curly braces than - // opening curly braces - } else if token == TokenKind::SemiColon { - // Check if this marks the end of an import - let range = &target.ranges[self.stack_idx]; - if range.range_kind == TokenRangeKind::Import { - self.pop_range(target, target.tokens.len() as u32); - } } } else { return Err(ParseError::new_error_str_at_pos( @@ -142,21 +104,6 @@ impl PassTokenizer { )); } - // Ranges that did not depend on curly braces may have missing tokens. - // So close all of the active tokens - while self.stack_idx != 0 { - self.pop_range(target, target.tokens.len() as u32); - } - - // And finally, we may have a token range at the end that doesn't belong - // to a range yet, so insert a "code" range if this is the case. 
- debug_assert_eq!(self.stack_idx, 0); - let last_registered_idx = target.ranges[0].end; - let last_token_idx = target.tokens.len() as u32; - if last_registered_idx != last_token_idx { - self.add_code_range(target, 0, last_registered_idx, last_token_idx, NO_RELATION); - } - Ok(()) } @@ -411,41 +358,21 @@ impl PassTokenizer { Ok(()) } - fn consume_string_literal(&mut self, source: &mut InputSource, target: &mut TokenBuffer) -> Result<(), ParseError> { + fn consume_bytestring_literal(&mut self, source: &mut InputSource, target: &mut TokenBuffer) -> Result<(), ParseError> { let begin_pos = source.pos(); - - // Consume the leading double quotes - debug_assert!(source.next().unwrap() == b'"'); + debug_assert!(source.next().unwrap() == b'b'); source.consume(); - let mut prev_char = b'"'; - while let Some(c) = source.next() { - if !c.is_ascii() { - return Err(ParseError::new_error_str_at_pos(source, source.pos(), "non-ASCII character in string literal")); - } - - source.consume(); - if c == b'"' && prev_char != b'\\' { - // Unescaped string terminator - prev_char = c; - break; - } - - if prev_char == b'\\' && c == b'\\' { - // Escaped backslash, set prev_char to bogus to not conflict - // with escaped-" and unterminated string literal detection. 
- prev_char = b'\0'; - } else { - prev_char = c; - } - } + let end_pos = self.consume_ascii_string(begin_pos, source)?; + target.tokens.push(Token::new(TokenKind::Bytestring, begin_pos)); + target.tokens.push(Token::new(TokenKind::SpanEnd, end_pos)); - if prev_char != b'"' { - // Unterminated string literal - return Err(ParseError::new_error_str_at_pos(source, begin_pos, "encountered unterminated string literal")); - } + Ok(()) + } - let end_pos = source.pos(); + fn consume_string_literal(&mut self, source: &mut InputSource, target: &mut TokenBuffer) -> Result<(), ParseError> { + let begin_pos = source.pos(); + let end_pos = self.consume_ascii_string(begin_pos, source)?; target.tokens.push(Token::new(TokenKind::String, begin_pos)); target.tokens.push(Token::new(TokenKind::SpanEnd, end_pos)); @@ -509,10 +436,9 @@ impl PassTokenizer { // Modify offset to not include the newline characters if cur_char == b'\n' { if prev_char == b'\r' { - end_pos.offset -= 2; - } else { end_pos.offset -= 1; } + // Consume final newline source.consume(); } else { @@ -604,6 +530,44 @@ impl PassTokenizer { Ok(()) } + // Consumes the ascii string (including leading and trailing quotation + // marks) and returns the input position *after* the last quotation mark (or + // an error, if something went wrong). + fn consume_ascii_string(&self, begin_pos: InputPosition, source: &mut InputSource) -> Result { + debug_assert!(source.next().unwrap() == b'"'); + source.consume(); + + let mut prev_char = b'"'; + while let Some(c) = source.next() { + if !c.is_ascii() { + return Err(ParseError::new_error_str_at_pos(source, source.pos(), "non-ASCII character in string literal")); + } + + source.consume(); + if c == b'"' && prev_char != b'\\' { + // Unescaped string terminator + prev_char = c; + break; + } + + if prev_char == b'\\' && c == b'\\' { + // Escaped backslash, set prev_char to bogus to not conflict + // with escaped-" and unterminated string literal detection. 
+ prev_char = b'\0'; + } else { + prev_char = c; + } + } + + if prev_char != b'"' { + // Unterminated string literal + return Err(ParseError::new_error_str_at_pos(source, begin_pos, "encountered unterminated string literal")); + } + + let end_pos = source.pos(); + return Ok(end_pos) + } + // Consumes whitespace and returns whether or not the whitespace contained // a newline. fn consume_whitespace(&self, source: &mut InputSource) -> bool { @@ -624,99 +588,22 @@ impl PassTokenizer { has_newline } - fn add_code_range( - &mut self, target: &mut TokenBuffer, parent_idx: i32, - code_start_idx: u32, code_end_idx: u32, next_sibling_idx: i32 - ) { - let new_range_idx = target.ranges.len() as i32; - let parent_range = &mut target.ranges[parent_idx as usize]; - debug_assert_ne!(parent_range.end, code_end_idx, "called push_code_range without a need to do so"); - - let sibling_idx = parent_range.last_child_idx; - - parent_range.last_child_idx = new_range_idx; - parent_range.end = code_end_idx; - parent_range.num_child_ranges += 1; - - let curly_depth = self.curly_stack.len() as u32; - target.ranges.push(TokenRange{ - parent_idx, - range_kind: TokenRangeKind::Code, - curly_depth, - start: code_start_idx, - end: code_end_idx, - num_child_ranges: 0, - first_child_idx: NO_RELATION, - last_child_idx: NO_RELATION, - next_sibling_idx, + fn emit_marker(&mut self, target: &mut TokenBuffer, kind: TokenMarkerKind, first_token: u32) { + debug_assert!( + target.markers + .last().map(|v| v.first_token < first_token) + .unwrap_or(true) + ); + + target.markers.push(TokenMarker{ + kind, + curly_depth: self.curly_stack.len() as u32, + first_token, + last_token: u32::MAX, + handled: false, }); - - // Fix up the sibling indices - if sibling_idx != NO_RELATION { - let sibling_range = &mut target.ranges[sibling_idx as usize]; - sibling_range.next_sibling_idx = new_range_idx; - } - } - - fn push_range(&mut self, target: &mut TokenBuffer, range_kind: TokenRangeKind, first_token_idx: u32) { - let 
new_range_idx = target.ranges.len() as i32; - let parent_idx = self.stack_idx as i32; - let parent_range = &mut target.ranges[self.stack_idx]; - - if parent_range.first_child_idx == NO_RELATION { - parent_range.first_child_idx = new_range_idx; - } - - let last_registered_idx = parent_range.end; - if last_registered_idx != first_token_idx { - self.add_code_range(target, parent_idx, last_registered_idx, first_token_idx, new_range_idx + 1); - } - - // Push the new range - self.stack_idx = target.ranges.len(); - let curly_depth = self.curly_stack.len() as u32; - target.ranges.push(TokenRange{ - parent_idx, - range_kind, - curly_depth, - start: first_token_idx, - end: first_token_idx, // modified when popped - num_child_ranges: 0, - first_child_idx: NO_RELATION, - last_child_idx: NO_RELATION, - next_sibling_idx: NO_RELATION - }) - } - - fn pop_range(&mut self, target: &mut TokenBuffer, end_token_idx: u32) { - let popped_idx = self.stack_idx as i32; - let popped_range = &mut target.ranges[self.stack_idx]; - debug_assert!(self.stack_idx != 0, "attempting to pop top-level range"); - - // Fix up the current range before going back to parent - popped_range.end = end_token_idx; - debug_assert_ne!(popped_range.start, end_token_idx); - - // Go back to parent and fix up its child pointers, but remember the - // last child, so we can link it to the newly popped range. 
- self.stack_idx = popped_range.parent_idx as usize; - let parent = &mut target.ranges[self.stack_idx]; - if parent.first_child_idx == NO_RELATION { - parent.first_child_idx = popped_idx; - } - let prev_sibling_idx = parent.last_child_idx; - parent.last_child_idx = popped_idx; - parent.end = end_token_idx; - parent.num_child_ranges += 1; - - // Fix up the sibling (if it exists) - if prev_sibling_idx != NO_RELATION { - let sibling = &mut target.ranges[prev_sibling_idx as usize]; - sibling.next_sibling_idx = popped_idx; - } } - fn check_ascii(&self, source: &InputSource) -> Result<(), ParseError> { match source.next() { Some(c) if !c.is_ascii() => { @@ -730,7 +617,7 @@ impl PassTokenizer { } // Helpers for characters -fn demarks_definition(ident: &[u8]) -> bool { +fn demarks_symbol(ident: &[u8]) -> bool { return ident == KW_STRUCT || ident == KW_ENUM || @@ -740,22 +627,32 @@ fn demarks_definition(ident: &[u8]) -> bool { ident == KW_COMPOSITE } +#[inline] fn demarks_import(ident: &[u8]) -> bool { return ident == KW_IMPORT; } +#[inline] fn is_whitespace(c: u8) -> bool { c.is_ascii_whitespace() } +#[inline] fn is_char_literal_start(c: u8) -> bool { return c == b'\''; } +#[inline] +fn is_bytestring_literal_start(c: u8, source: &InputSource) -> bool { + return c == b'b' && source.lookahead(1) == Some(b'"'); +} + +#[inline] fn is_string_literal_start(c: u8) -> bool { return c == b'"'; } +#[inline] fn is_pragma_start_or_pound(c: u8) -> bool { return c == b'#'; } @@ -775,6 +672,7 @@ fn is_identifier_remaining(c: u8) -> bool { c == b'_' } +#[inline] fn is_integer_literal_start(c: u8) -> bool { return c >= b'0' && c <= b'9'; } diff --git a/src/protocol/parser/pass_typing.rs b/src/protocol/parser/pass_typing.rs index e0a8f8cd4e6f8ef95c3c3668a7c97f4978515729..3d8c5da010bc001ef8f05a1879bab46bc697f8fe 100644 --- a/src/protocol/parser/pass_typing.rs +++ b/src/protocol/parser/pass_typing.rs @@ -64,6 +64,7 @@ const VOID_TEMPLATE: [InferenceTypePart; 1] = [ InferenceTypePart::Void ]; 
const MESSAGE_TEMPLATE: [InferenceTypePart; 2] = [ InferenceTypePart::Message, InferenceTypePart::UInt8 ]; const BOOL_TEMPLATE: [InferenceTypePart; 1] = [ InferenceTypePart::Bool ]; const CHARACTER_TEMPLATE: [InferenceTypePart; 1] = [ InferenceTypePart::Character ]; +const BYTEARRAY_TEMPLATE: [InferenceTypePart; 2] = [ InferenceTypePart::Array, InferenceTypePart::UInt8 ]; const STRING_TEMPLATE: [InferenceTypePart; 2] = [ InferenceTypePart::String, InferenceTypePart::Character ]; const NUMBERLIKE_TEMPLATE: [InferenceTypePart; 1] = [ InferenceTypePart::NumberLike ]; const INTEGERLIKE_TEMPLATE: [InferenceTypePart; 1] = [ InferenceTypePart::IntegerLike ]; @@ -1223,6 +1224,7 @@ impl PassTyping { self.procedure_id = id; self.procedure_kind = procedure_def.kind; let body_id = procedure_def.body; + let procedure_is_builtin = procedure_def.source.is_builtin(); debug_log!("{}", "-".repeat(50)); debug_log!("Visiting procedure: '{}' (id: {}, kind: {:?})", procedure_def.identifier.value.as_str(), id.0.index, procedure_def.kind); @@ -1245,7 +1247,11 @@ impl PassTyping { // Visit all of the expressions within the body self.parent_index = None; - return self.visit_block_stmt(ctx, body_id); + if !procedure_is_builtin { + return self.visit_block_stmt(ctx, body_id); + } else { + return Ok(()); + } } // Statements @@ -1721,6 +1727,10 @@ impl PassTyping { let node = &mut self.infer_nodes[self_index]; node.inference_rule = InferenceRule::MonoTemplate(InferenceRuleTemplate::new_forced(&CHARACTER_TEMPLATE)); }, + Literal::Bytestring(_) => { + let node = &mut self.infer_nodes[self_index]; + node.inference_rule = InferenceRule::MonoTemplate(InferenceRuleTemplate::new_forced(&BYTEARRAY_TEMPLATE)); + }, Literal::String(_) => { let node = &mut self.infer_nodes[self_index]; node.inference_rule = InferenceRule::MonoTemplate(InferenceRuleTemplate::new_forced(&STRING_TEMPLATE)); @@ -1871,6 +1881,7 @@ impl PassTyping { expr_ids.forget(); let argument_indices = expr_indices.into_vec(); + let node = 
&mut self.infer_nodes[self_index]; node.poly_data_index = extra_index; node.inference_rule = InferenceRule::CallExpr(InferenceRuleCallExpr{ @@ -2063,7 +2074,7 @@ impl PassTyping { ctx, infer_node.expr_id, &poly_data.poly_vars, first_part )?; - let (type_id, monomorph_index) = if let Some(type_id) = ctx.types.get_procedure_monomorph_type_id(&definition_id, &signature_type.parts) { + let (type_id, monomorph_index) = if let Some(type_id) = ctx.types.get_monomorph_type_id(&definition_id, &signature_type.parts) { // Procedure is already typechecked let monomorph_index = ctx.types.get_monomorph(type_id).variant.as_procedure().monomorph_index; (type_id, monomorph_index) @@ -2074,7 +2085,7 @@ impl PassTyping { procedure_to_check.monomorphs.push(ProcedureDefinitionMonomorph::new_invalid()); let type_id = ctx.types.reserve_procedure_monomorph_type_id(&definition_id, signature_type, monomorph_index); - if !procedure_to_check.builtin { + if !procedure_to_check.source.is_builtin() { // Only perform typechecking on the user-defined // procedures queue.push_back(ResolveQueueElement{ @@ -2141,7 +2152,10 @@ impl PassTyping { use InferenceRule as IR; let node = &self.infer_nodes[node_index]; - match &node.inference_rule { + debug_log!("Progressing inference node (node_index: {})", node_index); + debug_log!(" * Expression ID: {}", node.expr_id.index); + debug_log!(" * Expression type pre : {}", node.expr_type.display_name(&ctx.heap)); + let result = match &node.inference_rule { IR::Noop => unreachable!(), IR::MonoTemplate(_) => @@ -2178,7 +2192,10 @@ impl PassTyping { self.progress_inference_rule_call_expr(ctx, node_index), IR::VariableExpr(_) => self.progress_inference_rule_variable_expr(ctx, node_index), - } + }; + + debug_log!(" * Expression type post: {}", self.infer_nodes[node_index].expr_type.display_name(&ctx.heap)); + return result; } fn progress_inference_rule_mono_template(&mut self, ctx: &Ctx, node_index: InferNodeIndex) -> Result<(), ParseError> { @@ -2669,6 +2686,8 @@ 
impl PassTyping { if progress_literal_1 || progress_literal_2 { self.queue_node_parent(node_index); } poly_progress_section.forget(); + element_indices_section.forget(); + self.finish_polydata_constraint(node_index); return Ok(()); } @@ -2822,6 +2841,8 @@ impl PassTyping { let node_expr_id = node.expr_id; let rule = node.inference_rule.as_call_expr(); + debug_log!("Progressing call expression inference rule (node index {})", node_index); + let mut poly_progress_section = self.poly_progress_buffer.start_section(); let argument_node_indices = self.index_buffer.start_section_initialized(&rule.argument_indices); @@ -2829,21 +2850,29 @@ impl PassTyping { // out the polymorphic variables for (argument_index, argument_node_index) in argument_node_indices.iter_copied().enumerate() { let argument_expr_id = self.infer_nodes[argument_node_index].expr_id; + debug_log!(" * Argument {}: Provided by node index {}", argument_index, argument_node_index); + debug_log!(" * --- Pre: {}", self.infer_nodes[argument_node_index].expr_type.display_name(&ctx.heap)); let (_, progress_argument) = self.apply_polydata_equal2_constraint( ctx, node_index, argument_expr_id, "argument's", PolyDataTypeIndex::Associated(argument_index), 0, argument_node_index, 0, &mut poly_progress_section )?; + debug_log!(" * --- Post: {}", self.infer_nodes[argument_node_index].expr_type.display_name(&ctx.heap)); + debug_log!(" * --- Progression: {}", progress_argument); if progress_argument { self.queue_node(argument_node_index); } } // Same for the return type. 
+ debug_log!(" * Return type: Provided by node index {}", node_index); + debug_log!(" * --- Pre: {}", self.infer_nodes[node_index].expr_type.display_name(&ctx.heap)); let (_, progress_call_1) = self.apply_polydata_equal2_constraint( ctx, node_index, node_expr_id, "return", PolyDataTypeIndex::Returned, 0, node_index, 0, &mut poly_progress_section )?; + debug_log!(" * --- Post: {}", self.infer_nodes[node_index].expr_type.display_name(&ctx.heap)); + debug_log!(" * --- Progression: {}", progress_call_1); // We will now apply any progression in the polymorphic variable type // back to the arguments. diff --git a/src/protocol/parser/pass_validation_linking.rs b/src/protocol/parser/pass_validation_linking.rs index 5490af080f9b8eec88f98caa0ec7b0be6ad14862..ae2f403e976a52ca8b56971f1b892e21f74264a8 100644 --- a/src/protocol/parser/pass_validation_linking.rs +++ b/src/protocol/parser/pass_validation_linking.rs @@ -200,6 +200,7 @@ impl Visitor for PassValidationLinking { let definition = &ctx.heap[id]; let body_id = definition.body; + let definition_is_builtin = definition.source.is_builtin(); let section = self.variable_buffer.start_section_initialized(&definition.parameters); for variable_idx in 0..section.len() { let variable_id = section[variable_idx]; @@ -207,8 +208,11 @@ impl Visitor for PassValidationLinking { } section.forget(); - // Visit statements in function body - self.visit_block_stmt(ctx, body_id)?; + // Visit statements in function body, if present at all + if !definition_is_builtin { + self.visit_block_stmt(ctx, body_id)?; + } + self.pop_scope(old_scope); self.resolve_pending_control_flow_targets(ctx)?; @@ -898,7 +902,8 @@ impl Visitor for PassValidationLinking { match &mut literal_expr.value { Literal::Null | Literal::True | Literal::False | - Literal::Character(_) | Literal::String(_) | Literal::Integer(_) => { + Literal::Character(_) | Literal::Bytestring(_) | Literal::String(_) | + Literal::Integer(_) => { // Just the parent has to be set, done above }, 
Literal::Struct(literal) => { @@ -1156,6 +1161,10 @@ impl Visitor for PassValidationLinking { Method::SelectStart | Method::SelectRegisterCasePort | Method::SelectWait => unreachable!(), // not usable by programmer directly + Method::ComponentRandomU32 + | Method::ComponentTcpClient => { + expecting_wrapping_new_stmt = true; + }, Method::UserFunction => {} Method::UserComponent => { expecting_wrapping_new_stmt = true; diff --git a/src/protocol/parser/symbol_table.rs b/src/protocol/parser/symbol_table.rs index 0d64b4d7d5515fa4c0c530f8dc43dff3d74f36b6..088dc52d102199b6cf2e4b2f981cb4aebc30af6f 100644 --- a/src/protocol/parser/symbol_table.rs +++ b/src/protocol/parser/symbol_table.rs @@ -85,7 +85,6 @@ pub struct SymbolDefinition { // spans and module IDs pub defined_in_module: RootId, pub defined_in_scope: SymbolScope, - pub definition_span: InputSpan, // full span of definition pub identifier_span: InputSpan, // span of just the identifier // Location where the symbol is introduced in its scope pub imported_at: Option, @@ -231,6 +230,14 @@ impl SymbolTable { Ok(()) } + /// Insert a symbol in the global scope. Naturally there will be a + /// collision (as the symbol originates from a module), so we do *not* check + /// for this. + pub(crate) fn insert_symbol_in_global_scope(&mut self, symbol: Symbol) { + let scoped_symbols = self.scope_lookup.get_mut(&SymbolScope::Global).unwrap(); + scoped_symbols.symbols.push(symbol); + } + /// Retrieves a symbol by name by searching in a particular scope and that scope's parents. The /// returned symbol may both be imported as defined within any of the searched scopes. 
pub(crate) fn get_symbol_by_name( diff --git a/src/protocol/parser/token_parsing.rs b/src/protocol/parser/token_parsing.rs index 47f02e7f6c2367f467a348b4363af2e28a590014..28663793ff335f24a9db786c9514ece91f92536b 100644 --- a/src/protocol/parser/token_parsing.rs +++ b/src/protocol/parser/token_parsing.rs @@ -86,6 +86,15 @@ pub(crate) const KW_TYPE_CHAR: &'static [u8] = KW_TYPE_CHAR_STR.as_bytes(); pub(crate) const KW_TYPE_STRING: &'static [u8] = KW_TYPE_STRING_STR.as_bytes(); pub(crate) const KW_TYPE_INFERRED: &'static [u8] = KW_TYPE_INFERRED_STR.as_bytes(); +// Builtin pragma types +// Not usable by the programmer, but usable in the standard library. These hint +// at the fact that we need a different system (e.g. function overloading) +pub(crate) const PRAGMA_TYPE_VOID: &'static [u8] = b"#type_void"; +pub(crate) const PRAGMA_TYPE_PORTLIKE: &'static [u8] = b"#type_portlike"; +pub(crate) const PRAGMA_TYPE_INTEGERLIKE: &'static [u8] = b"#type_integerlike"; +pub(crate) const PRAGMA_TYPE_ARRAYLIKE: &'static [u8] = b"#type_arraylike"; + + /// A special trait for when consuming comma-separated things such that we can /// push them onto a `Vec` and onto a `ScopedSection`. As we monomorph for /// very specific comma-separated cases I don't expect polymorph bloat. @@ -381,28 +390,57 @@ pub(crate) fn consume_character_literal( return Err(ParseError::new_error_str_at_span(source, span, "too many characters in character literal")) } +/// Consumes a bytestring literal: a string interpreted as a byte array. See +/// `consume_string_literal` for further remarks. 
+pub(crate) fn consume_bytestring_literal( + source: &InputSource, iter: &mut TokenIter, buffer: &mut String +) -> Result { + // Retrieve string span, adjust to remove the leading "b" character + if Some(TokenKind::Bytestring) != iter.next() { + return Err(ParseError::new_error_str_at_pos(source, iter.last_valid_pos(), "expected a bytestring literal")); + } + + let span = iter.next_span(); + iter.consume(); + debug_assert_eq!(source.section_at_pos(span.begin, span.begin.with_offset(1)), b"b"); + + // Parse into buffer + let text_span = InputSpan::from_positions(span.begin.with_offset(1), span.end); + parse_escaped_string(source, text_span, buffer)?; + + return Ok(span); +} + /// Consumes a string literal. We currently support a limited number of /// backslash-escaped characters. Note that the result is stored in the /// buffer. pub(crate) fn consume_string_literal( source: &InputSource, iter: &mut TokenIter, buffer: &mut String ) -> Result { + // Retrieve string span from token stream if Some(TokenKind::String) != iter.next() { return Err(ParseError::new_error_str_at_pos(source, iter.last_valid_pos(), "expected a string literal")); } - buffer.clear(); let span = iter.next_span(); iter.consume(); - let text = source.section_at_span(span); + // Parse into buffer + parse_escaped_string(source, span, buffer)?; + + return Ok(span); +} + +fn parse_escaped_string(source: &InputSource, text_span: InputSpan, buffer: &mut String) -> Result<(), ParseError> { + let text = source.section_at_span(text_span); if !text.is_ascii() { - return Err(ParseError::new_error_str_at_span(source, span, "expected an ASCII string literal")); + return Err(ParseError::new_error_str_at_span(source, text_span, "expected an ASCII string literal")); } debug_assert_eq!(text[0], b'"'); // here as kind of a reminder: the span includes the bounding quotation marks debug_assert_eq!(text[text.len() - 1], b'"'); + buffer.clear(); buffer.reserve(text.len() - 2); let mut was_escape = false; @@ -410,9 +448,9 
@@ pub(crate) fn consume_string_literal( let cur = text[idx]; let is_escape = cur == b'\\'; if was_escape { - let to_push = parse_escaped_character(source, span, cur)?; + let to_push = parse_escaped_character(source, text_span, cur)?; buffer.push(to_push); - } else { + } else if !is_escape { buffer.push(cur as char); } @@ -425,9 +463,10 @@ pub(crate) fn consume_string_literal( debug_assert!(!was_escape); // because otherwise we couldn't have ended the string literal - Ok(span) + return Ok(()); } +#[inline] fn parse_escaped_character(source: &InputSource, literal_span: InputSpan, v: u8) -> Result { let result = match v { b'r' => '\r', @@ -449,13 +488,13 @@ fn parse_escaped_character(source: &InputSource, literal_span: InputSpan, v: u8) Ok(result) } -pub(crate) fn consume_pragma<'a>(source: &'a InputSource, iter: &mut TokenIter) -> Result<(&'a [u8], InputPosition, InputPosition), ParseError> { +pub(crate) fn consume_pragma<'a>(source: &'a InputSource, iter: &mut TokenIter) -> Result<(&'a [u8], InputSpan), ParseError> { if Some(TokenKind::Pragma) != iter.next() { return Err(ParseError::new_error_str_at_pos(source, iter.last_valid_pos(), "expected a pragma")); } - let (pragma_start, pragma_end) = iter.next_positions(); + let pragma_span = iter.next_span(); iter.consume(); - Ok((source.section_at_pos(pragma_start, pragma_end), pragma_start, pragma_end)) + Ok((source.section_at_span(pragma_span), pragma_span)) } pub(crate) fn has_ident(source: &InputSource, iter: &mut TokenIter, expected: &[u8]) -> bool { @@ -540,7 +579,6 @@ fn is_reserved_expression_keyword(text: &[u8]) -> bool { match text { KW_LET | KW_CAST | KW_LIT_TRUE | KW_LIT_FALSE | KW_LIT_NULL | - KW_FUNC_GET | KW_FUNC_PUT | KW_FUNC_FIRES | KW_FUNC_CREATE | KW_FUNC_ASSERT | KW_FUNC_LENGTH | KW_FUNC_PRINT => true, _ => false, } } @@ -603,15 +641,16 @@ pub(crate) fn construct_symbol_conflict_error( format!("the type '{}' imported here", symbol.name.as_str()), Some(import.as_symbols().span) ); - } else { - // This 
is a defined symbol. So this must mean that the - // error was caused by it being defined. - debug_assert_eq!(definition.defined_in_module, module.root_id); - + } else if definition.defined_in_module == module.root_id { + // This is a symbol defined in the same module return ( format!("the type '{}' defined here", symbol.name.as_str()), Some(definition.identifier_span) ) + } else { + // Not imported, not defined in the module, so must be + // a global + return (format!("the global '{}'", symbol.name.as_str()), None) } } } diff --git a/src/protocol/parser/tokens.rs b/src/protocol/parser/tokens.rs index ccf6982e268efdeaf5a532c9644a6803fd9d148a..2c1de3259841299875c763f62d3ab2655d576ea0 100644 --- a/src/protocol/parser/tokens.rs +++ b/src/protocol/parser/tokens.rs @@ -12,6 +12,7 @@ pub enum TokenKind { Ident, // regular identifier Pragma, // identifier with prefixed `#`, range includes `#` Integer, // integer literal + Bytestring, // string literal, interpreted as byte array, range includes 'b"' String, // string literal, range includes `"` Character, // character literal, range includes `'` LineComment, // line comment, range includes leading `//`, but not newline @@ -78,7 +79,7 @@ pub enum TokenKind { impl TokenKind { /// Returns true if the next expected token is the special `TokenKind::SpanEnd` token. This is /// the case for tokens of variable length (e.g. an identifier). 
- fn has_span_end(&self) -> bool { + pub(crate) fn has_span_end(&self) -> bool { return *self <= TokenKind::BlockComment } @@ -152,7 +153,8 @@ impl TokenKind { TK::ShiftLeftEquals => "<<=", TK::ShiftRightEquals => ">>=", // Lets keep these in explicitly for now, in case we want to add more symbols - TK::Ident | TK::Pragma | TK::Integer | TK::String | TK::Character | + TK::Ident | TK::Pragma | TK::Integer | + TK::Bytestring | TK::String | TK::Character | TK::LineComment | TK::BlockComment | TK::SpanEnd => unreachable!(), } } @@ -170,64 +172,48 @@ impl Token { } } -/// The kind of token ranges that are specially parsed by the tokenizer. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum TokenRangeKind { - Module, +#[derive(Debug, Clone, Copy)] +pub enum TokenMarkerKind { Pragma, Import, Definition, - Code, } -pub const NO_RELATION: i32 = -1; -pub const NO_SIBLING: i32 = NO_RELATION; - -/// A range of tokens with a specific meaning. Such a range is part of a tree -/// where each parent tree envelops all of its children. +/// A marker for a specific token. These are stored separately from the array of +/// tokens. These are used for initial symbol, module name, and import +/// discovery. #[derive(Debug)] -pub struct TokenRange { - // Index of parent in `TokenBuffer.ranges`, does not have a parent if the - // range kind is Module, in that case the parent index is -1. - pub parent_idx: i32, - pub range_kind: TokenRangeKind, +pub struct TokenMarker { + pub kind: TokenMarkerKind, pub curly_depth: u32, - // Offsets into `TokenBuffer.ranges`: the tokens belonging to this range. 
- pub start: u32, // first token (inclusive index) - pub end: u32, // last token (exclusive index) - // Child ranges - pub num_child_ranges: u32, // Number of subranges - pub first_child_idx: i32, // First subrange (or -1 if no subranges) - pub last_child_idx: i32, // Last subrange (or -1 if no subranges) - pub next_sibling_idx: i32, // Next subrange (or -1 if no next subrange) + // Indices into token buffer. The first token is inclusive and set upon + // tokenization, the last token is set at a later stage in parsing (e.g. + // at symbol discovery we may parse some of the `Pragma` tokens and set the + // last parsed token) + pub first_token: u32, + pub last_token: u32, + pub handled: bool, } pub struct TokenBuffer { pub tokens: Vec, - pub ranges: Vec, + pub markers: Vec, } impl TokenBuffer { pub(crate) fn new() -> Self { - Self{ tokens: Vec::new(), ranges: Vec::new() } - } - - pub(crate) fn iter_range<'a>(&'a self, range: &TokenRange) -> TokenIter<'a> { - TokenIter::new(self, range.start as usize, range.end as usize) - } - - pub(crate) fn start_pos(&self, range: &TokenRange) -> InputPosition { - self.tokens[range.start as usize].pos + return Self{ + tokens: Vec::new(), + markers: Vec::new(), + }; } - pub(crate) fn end_pos(&self, range: &TokenRange) -> InputPosition { - let last_token = &self.tokens[range.end as usize - 1]; - if last_token.kind == TokenKind::SpanEnd { - return last_token.pos - } else { - debug_assert!(!last_token.kind.has_span_end()); - return last_token.pos.with_offset(last_token.kind.num_characters()); - } + pub(crate) fn iter_range( + &self, inclusive_start: u32, exclusive_end: Option + ) -> TokenIter { + let exclusive_end = exclusive_end.unwrap_or(self.tokens.len() as u32) as usize; + debug_assert!(exclusive_end <= self.tokens.len()); + TokenIter::new(self, inclusive_start as usize, exclusive_end) } } @@ -337,6 +323,10 @@ impl<'a> TokenIter<'a> { } } + pub(crate) fn token_index(&self) -> u32 { + return self.cur as u32; + } + /// Saves the 
current iteration position, may be passed to `load` to return /// the iterator to a previous position. pub(crate) fn save(&self) -> (usize, usize) { diff --git a/src/protocol/parser/type_table.rs b/src/protocol/parser/type_table.rs index 97ee8c39d8258248411808abf16807bb44fa536f..2084a2474138f966eba4120d4ecf030ee1b506b1 100644 --- a/src/protocol/parser/type_table.rs +++ b/src/protocol/parser/type_table.rs @@ -683,10 +683,10 @@ impl TypeTable { self.definition_lookup.get(&definition_id) } - /// Returns the index into the monomorph type array if the procedure type + /// Returns the index into the monomorph type array if the provided type /// already has a (reserved) monomorph. #[inline] - pub(crate) fn get_procedure_monomorph_type_id(&self, definition_id: &DefinitionId, type_parts: &[ConcreteTypePart]) -> Option { + pub(crate) fn get_monomorph_type_id(&self, definition_id: &DefinitionId, type_parts: &[ConcreteTypePart]) -> Option { // Cannot use internal search key due to mutability issues. But this // method should end up being deprecated at some point anyway. debug_assert_eq!(get_concrete_type_definition(type_parts).unwrap(), *definition_id); @@ -969,7 +969,7 @@ impl TypeTable { // Check and construct return types and argument types. 
if let Some(return_type) = &definition.return_type { Self::check_member_parser_type( - modules, ctx, root_id, return_type, definition.builtin + modules, ctx, root_id, return_type, definition.source.is_builtin() )?; } @@ -977,7 +977,7 @@ impl TypeTable { for parameter_id in &definition.parameters { let parameter = &ctx.heap[*parameter_id]; Self::check_member_parser_type( - modules, ctx, root_id, &parameter.parser_type, definition.builtin + modules, ctx, root_id, &parameter.parser_type, definition.source.is_builtin() )?; arguments.push(ProcedureArgument{ @@ -2045,6 +2045,7 @@ impl TypeTable { // And now, we're actually, properly, done self.encountered_types.clear(); + self.size_alignment_stack.clear(); } /// Attempts to compute size/alignment for the provided type. Note that this diff --git a/src/protocol/tests/parser_literals.rs b/src/protocol/tests/parser_literals.rs index de343928abfb66f8f8b6cc0399badfa476c205b4..9560cbfe1a33d149a16f4d475688938e841f86ae 100644 --- a/src/protocol/tests/parser_literals.rs +++ b/src/protocol/tests/parser_literals.rs @@ -69,6 +69,47 @@ fn test_string_literals() { ").error(|e| { e.assert_msg_has(0, "non-ASCII character in string literal"); }); } +#[test] +fn test_bytestring_literals() { + Tester::new_single_source_expect_ok("valid", " + func test() -> u8[] { + auto v1 = b\"Hello, world!\"; + auto v2 = b\"\\t\\r\\n\\\\\"; // why hello there, confusing thing + auto v3 = b\"\"; + return b\"No way, dude!\"; + } + ").for_function("test", |f| { f + .for_variable("v1", |v| { v.assert_concrete_type("u8[]"); }) + .for_variable("v2", |v| { v.assert_concrete_type("u8[]"); }) + .for_variable("v3", |v| { v.assert_concrete_type("u8[]"); }); + }); + + Tester::new_single_source_expect_err("unterminated simple", " + func test() -> u8[] { return b\"'; } + ").error(|e| { e + .assert_num(1) + .assert_occurs_at(0, "b\"") + .assert_msg_has(0, "unterminated"); + }); + + Tester::new_single_source_expect_err("unterminated with preceding escaped", " + func test() ->
u8[] { return b\"\\\"; } + ").error(|e| { e + .assert_num(1) + .assert_occurs_at(0, "b\"\\") + .assert_msg_has(0, "unterminated"); + }); + + Tester::new_single_source_expect_err("invalid escaped character", " + func test() -> u8[] { return b\"\\y\"; } + ").error(|e| { e.assert_msg_has(0, "unsupported escape character 'y'"); }); + + // Note sure if this should always be in here... + Tester::new_single_source_expect_err("non-ASCII string", " + func test() -> u8[] { return b\"💧\"; } + ").error(|e| { e.assert_msg_has(0, "non-ASCII character in string literal"); }); +} + #[test] fn test_tuple_literals() { Tester::new_single_source_expect_ok("zero tuples", " diff --git a/src/protocol/tests/utils.rs b/src/protocol/tests/utils.rs index 7dff07dacf06403751b5379528373bca3cd94c92..8ee104fec54e1c6076614eabee40a92c987b1f5d 100644 --- a/src/protocol/tests/utils.rs +++ b/src/protocol/tests/utils.rs @@ -59,7 +59,8 @@ impl Tester { } pub(crate) fn compile(self) -> AstTesterResult { - let mut parser = Parser::new(); + let mut parser = Parser::new(None).unwrap(); + for source in self.sources.into_iter() { let source = source.into_bytes(); let input_source = InputSource::new(String::from(""), source); @@ -600,7 +601,8 @@ impl<'a> FunctionTester<'a> { // Find the first occurrence of the expression after the definition of // the function, we'll check that it is included in the body later. 
- let mut outer_match_idx = self.def.span.begin.offset as usize; + let body = &self.ctx.heap[self.def.body]; + let mut outer_match_idx = body.span.begin.offset as usize; while outer_match_idx < module.source.input.len() { if module.source.input[outer_match_idx..].starts_with(outer_match.as_bytes()) { break; @@ -702,7 +704,7 @@ impl<'a> FunctionTester<'a> { // Assuming the function is not polymorphic let definition_id = self.def.this; let func_type = [ConcreteTypePart::Function(definition_id, 0)]; - let mono_index = self.ctx.types.get_procedure_monomorph_type_id(&definition_id.upcast(), &func_type).unwrap(); + let mono_index = self.ctx.types.get_monomorph_type_id(&definition_id.upcast(), &func_type).unwrap(); let mut prompt = Prompt::new(&self.ctx.types, &self.ctx.heap, definition_id, mono_index, ValueGroup::new_stack(Vec::new())); let mut call_context = FakeRunContext{}; @@ -818,7 +820,7 @@ fn get_procedure_monomorph<'a>(heap: &Heap, types: &'a TypeTable, definition_id: [ConcreteTypePart::Component(ast_definition.this, 0)] }; - let mono_index = types.get_procedure_monomorph_type_id(&definition_id, &func_type).unwrap(); + let mono_index = types.get_monomorph_type_id(&definition_id, &func_type).unwrap(); let mono_data = types.get_monomorph(mono_index).variant.as_procedure(); mono_data diff --git a/src/protocol/token_writer.rs b/src/protocol/token_writer.rs new file mode 100644 index 0000000000000000000000000000000000000000..a6028e9b5a315217ffc3d38ff94bf4076b908393 --- /dev/null +++ b/src/protocol/token_writer.rs @@ -0,0 +1,102 @@ +#![allow(dead_code)] + +use std::fmt::{Write, Error as FmtError}; +use std::io::Write as IOWrite; + +use crate::protocol::input_source::{InputSource, InputSpan}; +use crate::protocol::parser::Module; +use crate::protocol::tokens::{Token, TokenKind, TokenMarker}; + +pub(crate) struct TokenWriter { + buffer: String, +} + +impl TokenWriter { + pub(crate) fn new() -> Self { + return Self{ + buffer: String::with_capacity(4096), + } + } + + 
pub(crate) fn write(&mut self, w: &mut W, modules: &[Module]) { + self.buffer.clear(); + for module in modules { + self.write_module_tokens(module); + } + + w.write_all(self.buffer.as_bytes()).expect("write tokens"); + } + + fn write_module_tokens(&mut self, module: &Module) { + self.write_dashed_indent(0); + + match &module.name { + Some(name) => writeln!(self.buffer, "Module: {}", name.1.as_str()).unwrap(), + None => self.buffer.push_str("Unnamed module\n"), + } + + self.write_marker_array(&module.tokens.markers, 1).expect("write markers"); + self.write_token_array(&module.source, &module.tokens.tokens, 1).expect("write tokens"); + } + + fn write_marker_array(&mut self, markers: &[TokenMarker], indent: u32) -> Result<(), FmtError> { + self.write_indent(indent); + writeln!(self.buffer, "Markers: [")?; + + let marker_indent = indent + 1; + for marker in markers { + self.write_indent(marker_indent); + writeln!(self.buffer, "{:?}", marker)?; + } + + self.write_indent(indent); + writeln!(self.buffer, "]")?; + + return Ok(()); + } + + fn write_token_array(&mut self, source: &InputSource, tokens: &[Token], indent: u32) -> Result<(), FmtError> { + self.write_indent(indent); + writeln!(self.buffer, "Tokens: [")?; + + let num_tokens = tokens.len(); + let token_indent = indent + 1; + for token_index in 0..num_tokens { + // Skip uninteresting tokens + let token = &tokens[token_index]; + if token.kind == TokenKind::SpanEnd { + continue; + } + + self.write_indent(token_indent); + write!(self.buffer, "{:?} (index {})", token.kind, token_index)?; + if token.kind.has_span_end() { + let token_start = token.pos; + let token_end = tokens[token_index + 1].pos; + let section = source.section_at_span(InputSpan::from_positions(token_start, token_end)); + writeln!(self.buffer, " text: {}", String::from_utf8_lossy(section))?; + } else { + self.buffer.push('\n'); + } + } + + self.write_indent(indent); + writeln!(self.buffer, "]")?; + + return Ok(()); + } + + fn write_dashed_indent(&mut 
self, indent: u32) { + for _ in 0..indent * 2 { + self.buffer.push(' '); + } + self.buffer.push('-'); + self.buffer.push(' '); + } + + fn write_indent(&mut self, indent: u32) { + for _ in 0..(indent + 1)*2 { + self.buffer.push(' '); + } + } +} \ No newline at end of file diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs index c092fc99311d9c12835221b9707fe1e20b6827be..cebb7b67d319eb1fb2106c9ac06857d5a575659f 100644 --- a/src/runtime2/communication.rs +++ b/src/runtime2/communication.rs @@ -17,11 +17,6 @@ impl PortId { } } -pub struct CompPortIds { - pub comp: CompId, - pub port: PortId, -} - #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum PortKind { Putter, @@ -202,6 +197,7 @@ pub enum Message { Data(DataMessage), Sync(SyncMessage), Control(ControlMessage), + Poll, } impl Message { @@ -213,6 +209,8 @@ impl Message { return v.target_port_id, Message::Sync(_) => return None, + Message::Poll => + return None, } } @@ -223,6 +221,7 @@ impl Message { Message::Control(v) => v.target_port_id = Some(port_id), Message::Sync(_) => unreachable!(), // should never be called for this message type + Message::Poll => unreachable!(), } } } diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs new file mode 100644 index 0000000000000000000000000000000000000000..a0cc811fcc5e0ee8cfbf52a3be015719545f2dbd --- /dev/null +++ b/src/runtime2/component/component.rs @@ -0,0 +1,533 @@ +use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId}; +use crate::protocol::*; +use crate::runtime2::*; +use crate::runtime2::communication::*; + +use super::{CompCtx, CompPDL, CompId}; +use super::component_context::*; +use super::component_random::*; +use super::component_internet::*; +use super::control_layer::*; +use super::consensus::*; + +pub enum CompScheduling { + Immediate, + Requeue, + Sleep, + Exit, +} + +/// Generic representation of a component (as viewed by a scheduler). 
+pub(crate) trait Component { + /// Called upon the creation of the component. Note that the scheduler + /// context is officially running another component (the component that is + /// creating the new component). + fn on_creation(&mut self, comp_id: CompId, sched_ctx: &SchedulerCtx); + + /// Called when a component crashes or wishes to exit. So is not called + /// right before destruction, other components may still hold a handle to + /// the component and send it messages! + fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx); + + /// Called if the component is created by another component and the messages + /// are being transferred between the two. + fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage); + + /// Called if the component receives a new message. The component is + /// responsible for deciding where that messages goes. + fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message); + + /// Called if the component's routine should be executed. The return value + /// can be used to indicate when the routine should be run again. + fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result; +} + +/// Representation of the generic operating mode of a component. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub(crate) enum CompMode { + NonSync, // not in sync mode + Sync, // in sync mode, can interact with other components + SyncEnd, // awaiting a solution, i.e. 
encountered the end of the sync block + BlockedGet, // blocked because we need to receive a message on a particular port + BlockedPut, // component is blocked because the port is blocked + BlockedSelect, // waiting on message to complete the select statement + StartExit, // temporary state: if encountered then we start the shutdown process + BusyExit, // temporary state: waiting for Acks for all the closed ports + Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 +} + +impl CompMode { + pub(crate) fn is_in_sync_block(&self) -> bool { + use CompMode::*; + + match self { + Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true, + NonSync | StartExit | BusyExit | Exit => false, + } + } +} + +/// Component execution state: the execution mode along with some descriptive +/// fields. Fields are public for ergonomic reasons, use member functions when +/// appropriate. +pub(crate) struct CompExecState { + pub mode: CompMode, + pub mode_port: PortId, // valid if blocked on a port (put/get) + pub mode_value: ValueGroup, // valid if blocked on a put +} + +impl CompExecState { + pub(crate) fn new() -> Self { + return Self{ + mode: CompMode::NonSync, + mode_port: PortId::new_invalid(), + mode_value: ValueGroup::default(), + } + } + + pub(crate) fn set_as_blocked_get(&mut self, port: PortId) { + self.mode = CompMode::BlockedGet; + self.mode_port = port; + debug_assert!(self.mode_value.values.is_empty()); + } + + pub(crate) fn is_blocked_on_get(&self, port: PortId) -> bool { + return + self.mode == CompMode::BlockedGet && + self.mode_port == port; + } + + pub(crate) fn set_as_blocked_put(&mut self, port: PortId, value: ValueGroup) { + self.mode = CompMode::BlockedPut; + self.mode_port = port; + self.mode_value = value; + } + + pub(crate) fn is_blocked_on_put(&self, port: PortId) -> bool { + return + self.mode == CompMode::BlockedPut && + self.mode_port == port; + } +} + +/// Creates a new component based on its definition. 
Meaning that if it is a +/// user-defined component then we set up the PDL code state. Otherwise we +/// construct a custom component. This does NOT take care of port and message +/// management. +pub(crate) fn create_component( + protocol: &ProtocolDescription, + definition_id: ProcedureDefinitionId, type_id: TypeId, + arguments: ValueGroup, num_ports: usize +) -> Box { + let definition = &protocol.heap[definition_id]; + debug_assert!(definition.kind == ProcedureKind::Primitive || definition.kind == ProcedureKind::Composite); + + if definition.source.is_builtin() { + // Builtin component + let component: Box = match definition.source { + ProcedureSource::CompRandomU32 => Box::new(ComponentRandomU32::new(arguments)), + ProcedureSource::CompTcpClient => Box::new(ComponentTcpClient::new(arguments)), + _ => unreachable!(), + }; + + return component; + } else { + // User-defined component + let prompt = Prompt::new( + &protocol.types, &protocol.heap, + definition_id, type_id, arguments + ); + let component = CompPDL::new(prompt, num_ports); + return Box::new(component); + } +} + +// ----------------------------------------------------------------------------- +// Generic component messaging utilities (for sending and receiving) +// ----------------------------------------------------------------------------- + +/// Default handling of sending a data message. In case the port is blocked then +/// the `ExecState` will become blocked as well. Note that +/// `default_handle_control_message` will ensure that the port becomes +/// unblocked if so instructed by the receiving component. The returned +/// scheduling value must be used. 
+#[must_use] +pub(crate) fn default_send_data_message( + exec_state: &mut CompExecState, transmitting_port_id: PortId, value: ValueGroup, + sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx +) -> CompScheduling { + debug_assert_eq!(exec_state.mode, CompMode::Sync); + + // TODO: Handle closed ports + let port_handle = comp_ctx.get_port_handle(transmitting_port_id); + let port_info = comp_ctx.get_port(port_handle); + debug_assert_eq!(port_info.kind, PortKind::Putter); + if port_info.state.is_blocked() { + // Port is blocked, so we cannot send + exec_state.set_as_blocked_put(transmitting_port_id, value); + + return CompScheduling::Sleep; + } else { + // Port is not blocked, so send to the peer + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); + peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(annotated_message), true); + + return CompScheduling::Immediate; + } +} + +pub(crate) enum IncomingData { + PlacedInSlot, + SlotFull(DataMessage), +} + +/// Default handling of receiving a data message. In case there is no room for +/// the message it is returned from this function. Note that this function is +/// different from PDL code performing a `get` on a port; this is the case where +/// the message first arrives at the component. +// NOTE: This is supposed to be a somewhat temporary implementation. It would be +// nicest if the sending component can figure out it cannot send any more data. 
+#[must_use] +pub(crate) fn default_handle_incoming_data_message( + exec_state: &mut CompExecState, port_value_slot: &mut Option, + comp_ctx: &mut CompCtx, incoming_message: DataMessage, + sched_ctx: &SchedulerCtx, control: &mut ControlLayer +) -> IncomingData { + let target_port_id = incoming_message.data_header.target_port; + + if port_value_slot.is_none() { + // We can put the value in the slot + *port_value_slot = Some(incoming_message); + + // Check if we're blocked on receiving this message. + dbg_code!({ + // Our port cannot have been blocked itself, because we're able to + // directly insert the message into its slot. + let port_handle = comp_ctx.get_port_handle(target_port_id); + assert!(!comp_ctx.get_port(port_handle).state.is_blocked()); + }); + + if exec_state.is_blocked_on_get(target_port_id) { + // Return to normal operation + exec_state.mode = CompMode::Sync; + exec_state.mode_port = PortId::new_invalid(); + debug_assert!(exec_state.mode_value.values.is_empty()); + } + + return IncomingData::PlacedInSlot + } else { + // Slot is already full, so if the port was previously opened, it will + // now become closed + let port_handle = comp_ctx.get_port_handle(target_port_id); + let port_info = comp_ctx.get_port_mut(port_handle); + debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked()); // i.e. not closed, but will go off if more states are added in the future + + if port_info.state == PortState::Open { + comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); + let (peer_handle, message) = + control.initiate_port_blocking(comp_ctx, port_handle); + let peer = comp_ctx.get_peer(peer_handle); + peer.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); + } + + return IncomingData::SlotFull(incoming_message) + } +} + +/// Default handling that has been received through a `get`. 
Will check if any +/// more messages are waiting, and if the corresponding port was blocked because +/// of full buffers (hence, will use the control layer to make sure the peer +/// will become unblocked). +pub(crate) fn default_handle_received_data_message( + targeted_port: PortId, slot: &mut Option, inbox_backup: &mut Vec, + comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer +) { + debug_assert!(slot.is_none()); // because we've just received from it + + // Check if there are any more messages in the backup buffer + let port_handle = comp_ctx.get_port_handle(targeted_port); + let port_info = comp_ctx.get_port(port_handle); + for message_index in 0..inbox_backup.len() { + let message = &inbox_backup[message_index]; + if message.data_header.target_port == targeted_port { + // One more message, place it in the slot + let message = inbox_backup.remove(message_index); + debug_assert!(port_info.state.is_blocked()); // since we're removing another message from the backup + *slot = Some(message); + + return; + } + } + + // Did not have any more messages, so if we were blocked, then we need to + // unblock the port now (and inform the peer of this unblocking) + if port_info.state == PortState::BlockedDueToFullBuffers { + comp_ctx.set_port_state(port_handle, PortState::Open); + let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle); + let peer_info = comp_ctx.get_peer(peer_handle); + peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); + } +} + +/// Handles control messages in the default way. Note that this function may +/// take a lot of actions in the name of the caller: pending messages may be +/// sent, ports may become blocked/unblocked, etc. So the execution +/// (`CompExecState`), control (`ControlLayer`) and consensus (`Consensus`) +/// state may all change. 
+pub(crate) fn default_handle_control_message( + exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus, + message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx +) { + match message.content { + ControlMessageContent::Ack => { + default_handle_ack(control, message.id, sched_ctx, comp_ctx); + }, + ControlMessageContent::BlockPort(port_id) => { + // One of our messages was accepted, but the port should be + // blocked. + let port_handle = comp_ctx.get_port_handle(port_id); + let port_info = comp_ctx.get_port(port_handle); + debug_assert_eq!(port_info.kind, PortKind::Putter); + if port_info.state == PortState::Open { + // only when open: we don't do this when closed, and we we don't do this if we're blocked due to peer changes + comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); + } + }, + ControlMessageContent::ClosePort(port_id) => { + // Request to close the port. We immediately comply and remove + // the component handle as well + let port_handle = comp_ctx.get_port_handle(port_id); + let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id; + let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); + + // One exception to sending an `Ack` is if we just closed the + // port ourselves, meaning that the `ClosePort` messages got + // sent to one another. 
+ if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) { + default_handle_ack(control, control_id, sched_ctx, comp_ctx); + } else { + default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); + comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed + comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed + } + }, + ControlMessageContent::UnblockPort(port_id) => { + // We were previously blocked (or already closed) + let port_handle = comp_ctx.get_port_handle(port_id); + let port_info = comp_ctx.get_port(port_handle); + debug_assert_eq!(port_info.kind, PortKind::Putter); + if port_info.state == PortState::BlockedDueToFullBuffers { + default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx); + } + }, + ControlMessageContent::PortPeerChangedBlock(port_id) => { + // The peer of our port has just changed. So we are asked to + // temporarily block the port (while our original recipient is + // potentially rerouting some of the in-flight messages) and + // Ack. Then we wait for the `unblock` call. 
+ debug_assert_eq!(message.target_port_id, Some(port_id)); + let port_handle = comp_ctx.get_port_handle(port_id); + comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange); + + let port_info = comp_ctx.get_port(port_handle); + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + + default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); + }, + ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => { + let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap()); + let port_info = comp_ctx.get_port(port_handle); + debug_assert!(port_info.state == PortState::BlockedDueToPeerChange); + let old_peer_id = port_info.peer_comp_id; + + comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false); + + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.peer_comp_id = new_comp_id; + port_info.peer_port_id = new_port_id; + comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None); + default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx); + } + } +} + +/// Handles a component initiating the exiting procedure, and closing all of its +/// ports. Should only be called once per component (which is ensured by +/// checking and modifying the mode in the execution state). 
+#[must_use] +pub(crate) fn default_handle_start_exit( + exec_state: &mut CompExecState, control: &mut ControlLayer, + sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx +) -> CompScheduling { + debug_assert_eq!(exec_state.mode, CompMode::StartExit); + sched_ctx.log("Component starting exit"); + exec_state.mode = CompMode::BusyExit; + + // Iterating by index to work around borrowing rules + for port_index in 0..comp_ctx.num_ports() { + let port = comp_ctx.get_port_by_index_mut(port_index); + if port.state == PortState::Closed { + // Already closed, or in the process of being closed + continue; + } + + // Mark as closed + let port_id = port.self_id; + port.state = PortState::Closed; + + // Notify peer of closing + let port_handle = comp_ctx.get_port_handle(port_id); + let (peer, message) = control.initiate_port_closing(port_handle, comp_ctx); + let peer_info = comp_ctx.get_peer(peer); + peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); + } + + return CompScheduling::Immediate; // to check if we can shut down immediately +} + +/// Handles a component waiting until all peers are notified that it is quitting +/// (i.e. after calling `default_handle_start_exit`). +#[must_use] +pub(crate) fn default_handle_busy_exit( + exec_state: &mut CompExecState, control: &ControlLayer, + sched_ctx: &SchedulerCtx +) -> CompScheduling { + debug_assert_eq!(exec_state.mode, CompMode::BusyExit); + if control.has_acks_remaining() { + sched_ctx.log("Component busy exiting, still has `Ack`s remaining"); + return CompScheduling::Sleep; + } else { + sched_ctx.log("Component busy exiting, now shutting down"); + exec_state.mode = CompMode::Exit; + return CompScheduling::Exit; + } +} + +/// Handles a potential synchronous round decision. If there was a decision then +/// the `Some(success)` value indicates whether the round succeeded or not. +/// Might also end up changing the `ExecState`. 
+pub(crate) fn default_handle_sync_decision( + exec_state: &mut CompExecState, decision: SyncRoundDecision, + consensus: &mut Consensus +) -> Option { + debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); + let success = match decision { + SyncRoundDecision::None => return None, + SyncRoundDecision::Solution => true, + SyncRoundDecision::Failure => false, + }; + + debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); + if success { + exec_state.mode = CompMode::NonSync; + consensus.notify_sync_decision(decision); + return Some(true); + } else { + exec_state.mode = CompMode::StartExit; + return Some(false); + } +} + + +#[inline] +pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling { + debug_assert_eq!(_exec_state.mode, CompMode::Exit); + return CompScheduling::Exit; +} + +// ----------------------------------------------------------------------------- +// Internal messaging/state utilities +// ----------------------------------------------------------------------------- + +/// Handles an `Ack` for the control layer. 
+fn default_handle_ack( + control: &mut ControlLayer, control_id: ControlId, + sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx +) { + // Since an `Ack` may cause another one, handle them in a loop + let mut to_ack = control_id; + loop { + let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx); + match action { + AckAction::SendMessage(target_comp, message) => { + // FIX @NoDirectHandle + let mut handle = sched_ctx.runtime.get_component_public(target_comp); + handle.send_message(&sched_ctx.runtime, Message::Control(message), true); + let _should_remove = handle.decrement_users(); + debug_assert!(_should_remove.is_none()); + }, + AckAction::ScheduleComponent(to_schedule) => { + // FIX @NoDirectHandle + let mut handle = sched_ctx.runtime.get_component_public(to_schedule); + + // Note that the component is intentionally not + // sleeping, so we just wake it up + debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire)); + let key = unsafe { to_schedule.upgrade() }; + sched_ctx.runtime.enqueue_work(key); + let _should_remove = handle.decrement_users(); + debug_assert!(_should_remove.is_none()); + }, + AckAction::None => {} + } + + match new_to_ack { + Some(new_to_ack) => to_ack = new_to_ack, + None => break, + } + } +} + +/// Little helper for sending the most common kind of `Ack` +fn default_send_ack( + causer_of_ack_id: ControlId, peer_handle: LocalPeerHandle, + sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx +) { + let peer_info = comp_ctx.get_peer(peer_handle); + peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(ControlMessage{ + id: causer_of_ack_id, + sender_comp_id: comp_ctx.id, + target_port_id: None, + content: ControlMessageContent::Ack + }), true); +} + +/// Handles the unblocking of a putter port. In case there is a pending message +/// on that port then it will be sent. 
+fn default_handle_unblock_put( + exec_state: &mut CompExecState, consensus: &mut Consensus, + port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, +) { + let port_info = comp_ctx.get_port_mut(port_handle); + let port_id = port_info.self_id; + debug_assert!(port_info.state.is_blocked()); + port_info.state = PortState::Open; + + if exec_state.is_blocked_on_put(port_id) { + // Annotate the message that we're going to send + let port_info = comp_ctx.get_port(port_handle); // for immutable access + debug_assert_eq!(port_info.kind, PortKind::Putter); + let to_send = exec_state.mode_value.take(); + let to_send = consensus.annotate_data_message(comp_ctx, port_info, to_send); + + // Retrieve peer to send the message + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(to_send), true); + + exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state. 
+ exec_state.mode_port = PortId::new_invalid(); + } +} + +#[inline] +pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId { + return PortId(port_id.id); +} + +#[inline] +pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId { + return EvalPortId{ id: port_id.0 }; +} diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs new file mode 100644 index 0000000000000000000000000000000000000000..3b9c9ca77e42b135a396a2965dbfcba32f8b81cb --- /dev/null +++ b/src/runtime2/component/component_internet.rs @@ -0,0 +1,374 @@ +use crate::protocol::eval::{ValueGroup, Value, EvalError}; +use crate::runtime2::*; +use crate::runtime2::component::{CompCtx, CompId}; +use crate::runtime2::stdlib::internet::*; +use crate::runtime2::poll::*; + +use super::component::{self, *}; +use super::control_layer::*; +use super::consensus::*; + +use std::io::ErrorKind as IoErrorKind; + +enum SocketState { + Connected(SocketTcpClient), + Error, +} + +impl SocketState { + fn get_socket(&self) -> &SocketTcpClient { + match self { + SocketState::Connected(v) => v, + SocketState::Error => unreachable!(), + } + } +} + +/// States from the point of view of the component that is connecting to this +/// TCP component (i.e. from the point of view of attempting to interface with +/// a socket). 
+#[derive(PartialEq, Debug)] +enum SyncState { + AwaitingCmd, + Getting, + Putting, + FinishSync, + FinishSyncThenQuit, +} + +pub struct ComponentTcpClient { + // Properties for the tcp socket + socket_state: SocketState, + sync_state: SyncState, + poll_ticket: Option, + inbox_main: Option, + inbox_backup: Vec, + pdl_input_port_id: PortId, // input from PDL, so transmitted over socket + pdl_output_port_id: PortId, // output towards PDL, so received over socket + input_union_send_tag_value: i64, + input_union_receive_tag_value: i64, + input_union_finish_tag_value: i64, + input_union_shutdown_tag_value: i64, + // Generic component state + exec_state: CompExecState, + control: ControlLayer, + consensus: Consensus, + // Temporary variables + byte_buffer: Vec, +} + +impl Component for ComponentTcpClient { + fn on_creation(&mut self, id: CompId, sched_ctx: &SchedulerCtx) { + // Retrieve type information for messages we're going to receive + let pd = &sched_ctx.runtime.protocol; + let cmd_type = pd.find_type(b"std.internet", b"Cmd") + .expect("'Cmd' type in the 'std.internet' module"); + let cmd_type = cmd_type + .as_union(); + + self.input_union_send_tag_value = cmd_type.get_variant_tag_value(b"Send").unwrap(); + self.input_union_receive_tag_value = cmd_type.get_variant_tag_value(b"Receive").unwrap(); + self.input_union_finish_tag_value = cmd_type.get_variant_tag_value(b"Finish").unwrap(); + self.input_union_shutdown_tag_value = cmd_type.get_variant_tag_value(b"Shutdown").unwrap(); + + // Register socket for async events + if let SocketState::Connected(socket) = &self.socket_state { + let self_handle = sched_ctx.runtime.get_component_public(id); + let poll_ticket = sched_ctx.polling.register(socket, self_handle, true, true) + .expect("registering tcp component"); + + debug_assert!(self.poll_ticket.is_none()); + self.poll_ticket = Some(poll_ticket); + } + } + + fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx) { + if let Some(poll_ticket) = self.poll_ticket.take() { + 
sched_ctx.polling.unregister(poll_ticket) + .expect("unregistering tcp component"); + } + } + + fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) { + if self.inbox_main.is_none() { + self.inbox_main = Some(message); + } else { + self.inbox_backup.push(message); + } + } + + fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { + match message { + Message::Data(message) => { + self.handle_incoming_data_message(sched_ctx, comp_ctx, message); + }, + Message::Sync(message) => { + let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); + component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + }, + Message::Control(message) => { + component::default_handle_control_message( + &mut self.exec_state, &mut self.control, &mut self.consensus, + message, sched_ctx, comp_ctx + ); + }, + Message::Poll => { + sched_ctx.log("Received polling event"); + }, + } + } + + fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { + sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state)); + + match self.exec_state.mode { + CompMode::BlockedSelect => { + // Not possible: we never enter this state + unreachable!(); + }, + CompMode::NonSync => { + // When in non-sync mode + match &mut self.socket_state { + SocketState::Connected(_socket) => { + if self.sync_state == SyncState::FinishSyncThenQuit { + // Previous request was to let the component shut down + self.exec_state.mode = CompMode::StartExit; + } else { + // Reset for a new request + self.sync_state = SyncState::AwaitingCmd; + self.consensus.notify_sync_start(comp_ctx); + self.exec_state.mode = CompMode::Sync; + } + return Ok(CompScheduling::Immediate); + }, + SocketState::Error => { + // Could potentially send an error message to the + // connected component. 
+ self.exec_state.mode = CompMode::StartExit; + return Ok(CompScheduling::Immediate); + } + } + }, + CompMode::Sync => { + // When in sync mode: wait for a command to come in + match self.sync_state { + SyncState::AwaitingCmd => { + if let Some(message) = &self.inbox_main { + self.consensus.handle_incoming_data_message(comp_ctx, &message); + if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, &message) { + // Check which command we're supposed to execute. + let message = self.inbox_main.take().unwrap(); + let target_port_id = message.data_header.target_port; + component::default_handle_received_data_message( + target_port_id, &mut self.inbox_main, &mut self.inbox_backup, + comp_ctx, sched_ctx, &mut self.control + ); + + let (tag_value, embedded_heap_pos) = message.content.values[0].as_union(); + if tag_value == self.input_union_send_tag_value { + // Retrieve bytes from the message + self.byte_buffer.clear(); + let union_content = &message.content.regions[embedded_heap_pos as usize]; + debug_assert_eq!(union_content.len(), 1); + let array_heap_pos = union_content[0].as_array(); + let array_values = &message.content.regions[array_heap_pos as usize]; + self.byte_buffer.reserve(array_values.len()); + for value in array_values { + self.byte_buffer.push(value.as_uint8()); + } + + self.sync_state = SyncState::Putting; + return Ok(CompScheduling::Immediate); + } else if tag_value == self.input_union_receive_tag_value { + // Component requires a `recv` + self.sync_state = SyncState::Getting; + return Ok(CompScheduling::Immediate); + } else if tag_value == self.input_union_finish_tag_value { + // Component requires us to end the sync round + self.sync_state = SyncState::FinishSync; + return Ok(CompScheduling::Immediate); + } else if tag_value == self.input_union_shutdown_tag_value { + // Component wants to close the connection + self.sync_state = SyncState::FinishSyncThenQuit; + return Ok(CompScheduling::Immediate); + } else { + unreachable!("got tag_value {}", 
tag_value) + } + } else { + todo!("handle sync failure due to message deadlock"); + return Ok(CompScheduling::Sleep); + } + } else { + self.exec_state.set_as_blocked_get(self.pdl_input_port_id); + return Ok(CompScheduling::Sleep); + } + }, + SyncState::Putting => { + // We're supposed to send a user-supplied message fully + // over the socket. But we might end up blocking. In + // that case the component goes to sleep until it is + // polled. + let socket = self.socket_state.get_socket(); + while !self.byte_buffer.is_empty() { + match socket.send(&self.byte_buffer) { + Ok(bytes_sent) => { + self.byte_buffer.drain(..bytes_sent); + }, + Err(err) => { + if err.kind() == IoErrorKind::WouldBlock { + return Ok(CompScheduling::Sleep); // wait until notified + } else { + todo!("handle socket.send error {:?}", err) + } + } + } + } + + // If here then we're done putting the data, we can + // finish the sync round + let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); + self.exec_state.mode = CompMode::SyncEnd; + component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + return Ok(CompScheduling::Immediate); + }, + SyncState::Getting => { + // We're going to try and receive a single message. If + // this causes us to end up blocking the component + // goes to sleep until it is polled. 
+ const BUFFER_SIZE: usize = 1024; // TODO: Move to config + + let socket = self.socket_state.get_socket(); + self.byte_buffer.resize(BUFFER_SIZE, 0); + match socket.receive(&mut self.byte_buffer) { + Ok(num_received) => { + self.byte_buffer.resize(num_received, 0); + let message_content = self.bytes_to_data_message_content(&self.byte_buffer); + let scheduling = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, message_content, sched_ctx, &mut self.consensus, comp_ctx); + self.sync_state = SyncState::AwaitingCmd; + return Ok(scheduling); + }, + Err(err) => { + if err.kind() == IoErrorKind::WouldBlock { + return Ok(CompScheduling::Sleep); // wait until polled + } else { + todo!("handle socket.receive error {:?}", err) + } + } + } + }, + SyncState::FinishSync | SyncState::FinishSyncThenQuit => { + let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); + self.exec_state.mode = CompMode::SyncEnd; + component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + return Ok(CompScheduling::Requeue); + }, + } + }, + CompMode::BlockedGet => { + // Entered when awaiting a new command + debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd); + return Ok(CompScheduling::Sleep); + }, + CompMode::SyncEnd | CompMode::BlockedPut => + return Ok(CompScheduling::Sleep), + CompMode::StartExit => + return Ok(component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx)), + CompMode::BusyExit => + return Ok(component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx)), + CompMode::Exit => + return Ok(component::default_handle_exit(&self.exec_state)), + } + } +} + +impl ComponentTcpClient { + pub(crate) fn new(arguments: ValueGroup) -> Self { + use std::net::{IpAddr, Ipv4Addr}; + + debug_assert_eq!(arguments.values.len(), 4); + + // Parsing arguments + let ip_heap_pos = arguments.values[0].as_array(); + let ip_elements = 
&arguments.regions[ip_heap_pos as usize]; + if ip_elements.len() != 4 { + todo!("friendly error reporting: ip contains 4 octects"); + } + let ip_address = IpAddr::V4(Ipv4Addr::new( + ip_elements[0].as_uint8(), ip_elements[1].as_uint8(), + ip_elements[2].as_uint8(), ip_elements[3].as_uint8() + )); + + let port = arguments.values[1].as_uint16(); + let input_port = component::port_id_from_eval(arguments.values[2].as_input()); + let output_port = component::port_id_from_eval(arguments.values[3].as_output()); + + let socket = SocketTcpClient::new(ip_address, port); + if let Err(socket) = socket { + todo!("friendly error reporting: failed to open socket (reason: {:?})", socket); + } + + return Self{ + socket_state: SocketState::Connected(socket.unwrap()), + sync_state: SyncState::AwaitingCmd, + poll_ticket: None, + inbox_main: None, + inbox_backup: Vec::new(), + input_union_send_tag_value: -1, + input_union_receive_tag_value: -1, + input_union_finish_tag_value: -1, + input_union_shutdown_tag_value: -1, + pdl_input_port_id: input_port, + pdl_output_port_id: output_port, + exec_state: CompExecState::new(), + control: ControlLayer::default(), + consensus: Consensus::new(), + byte_buffer: Vec::new(), + } + } + + // Handles incoming data from the PDL side (hence, going into the socket) + fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) { + if self.exec_state.mode.is_in_sync_block() { + self.consensus.handle_incoming_data_message(comp_ctx, &message); + } + + match component::default_handle_incoming_data_message( + &mut self.exec_state, &mut self.inbox_main, comp_ctx, message, sched_ctx, &mut self.control + ) { + IncomingData::PlacedInSlot => {}, + IncomingData::SlotFull(message) => { + self.inbox_backup.push(message); + } + } + } + + fn data_message_to_bytes(&self, message: DataMessage, bytes: &mut Vec) { + debug_assert_eq!(message.data_header.target_port, self.pdl_input_port_id); + 
debug_assert_eq!(message.content.values.len(), 1); + + if let Value::Array(array_pos) = message.content.values[0] { + let region = &message.content.regions[array_pos as usize]; + bytes.reserve(region.len()); + for value in region { + bytes.push(value.as_uint8()); + } + } else { + unreachable!(); + } + } + + fn bytes_to_data_message_content(&self, buffer: &[u8]) -> ValueGroup { + // Turn bytes into silly executor-style array + let mut values = Vec::with_capacity(buffer.len()); + for byte in buffer.iter().copied() { + values.push(Value::UInt8(byte)); + } + + // Put in a value group + let mut value_group = ValueGroup::default(); + value_group.regions.push(values); + value_group.values.push(Value::Array(0)); + + return value_group; + } +} \ No newline at end of file diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index a73582435ecaeecf86030c59d95f085e71cff9b1..dd3081ee1d12512335b33c706a690cfa21fc7612 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -7,20 +7,19 @@ use crate::protocol::eval::{ EvalContinuation, EvalResult, EvalError }; +use crate::runtime2::runtime::CompId; use crate::runtime2::scheduler::SchedulerCtx; use crate::runtime2::communication::*; +use super::component::{ + self, + CompExecState, Component, CompScheduling, CompMode, + port_id_from_eval, port_id_to_eval +}; use super::component_context::*; use super::control_layer::*; use super::consensus::Consensus; -pub enum CompScheduling { - Immediate, - Requeue, - Sleep, - Exit, -} - pub enum ExecStmt { CreatedChannel((Value, Value)), PerformedPut, @@ -90,30 +89,6 @@ impl RunContext for ExecCtx { } } -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub(crate) enum Mode { - NonSync, // not in sync mode - Sync, // in sync mode, can interact with other components - SyncEnd, // awaiting a solution, i.e. 
encountered the end of the sync block - BlockedGet, // blocked because we need to receive a message on a particular port - BlockedPut, // component is blocked because the port is blocked - BlockedSelect, // waiting on message to complete the select statement - StartExit, // temporary state: if encountered then we start the shutdown process - BusyExit, // temporary state: waiting for Acks for all the closed ports - Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 -} - -impl Mode { - fn is_in_sync_block(&self) -> bool { - use Mode::*; - - match self { - Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true, - NonSync | StartExit | BusyExit | Exit => false, - } - } -} - struct SelectCase { involved_ports: Vec, } @@ -232,10 +207,8 @@ impl SelectState { } pub(crate) struct CompPDL { - pub mode: Mode, - pub mode_port: PortId, // when blocked on a port - pub mode_value: ValueGroup, // when blocked on a put - select: SelectState, + pub exec_state: CompExecState, + select_state: SelectState, pub prompt: Prompt, pub control: ControlLayer, pub consensus: Consensus, @@ -249,36 +222,30 @@ pub(crate) struct CompPDL { pub inbox_backup: Vec, } -impl CompPDL { - pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self { - let mut inbox_main = Vec::new(); - inbox_main.reserve(num_ports); - for _ in 0..num_ports { - inbox_main.push(None); - } +impl Component for CompPDL { + fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) { + // Intentionally empty + } - return Self{ - mode: Mode::NonSync, - mode_port: PortId::new_invalid(), - mode_value: ValueGroup::default(), - select: SelectState::new(), - prompt: initial_state, - control: ControlLayer::default(), - consensus: Consensus::new(), - sync_counter: 0, - exec_ctx: ExecCtx{ - stmt: ExecStmt::None, - }, - inbox_main, - inbox_backup: Vec::new(), + fn on_shutdown(&mut self, _sched_ctx: &SchedulerCtx) { + // Intentionally empty + } + + fn adopt_message(&mut 
self, comp_ctx: &mut CompCtx, message: DataMessage) { + let port_handle = comp_ctx.get_port_handle(message.data_header.target_port); + let port_index = comp_ctx.get_port_index(port_handle); + if self.inbox_main[port_index].is_none() { + self.inbox_main[port_index] = Some(message); + } else { + self.inbox_backup.push(message); } } - pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) { - sched_ctx.log(&format!("handling message: {:#?}", message)); + fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) { + // sched_ctx.log(&format!("handling message: {:?}", message)); if let Some(new_target) = self.control.should_reroute(&mut message) { - let mut target = sched_ctx.runtime.get_component_public(new_target); - target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks + let mut target = sched_ctx.runtime.get_component_public(new_target); // TODO: @NoDirectHandle + target.send_message(&sched_ctx.runtime, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks let _should_remove = target.decrement_users(); debug_assert!(_should_remove.is_none()); return; @@ -289,47 +256,41 @@ impl CompPDL { self.handle_incoming_data_message(sched_ctx, comp_ctx, message); }, Message::Control(message) => { - self.handle_incoming_control_message(sched_ctx, comp_ctx, message); + component::default_handle_control_message( + &mut self.exec_state, &mut self.control, &mut self.consensus, + message, sched_ctx, comp_ctx + ); }, Message::Sync(message) => { self.handle_incoming_sync_message(sched_ctx, comp_ctx, message); + }, + Message::Poll => { + unreachable!(); // because we never register at the polling thread } } } - // ------------------------------------------------------------------------- - // Running component and handling changes in global component state - // 
------------------------------------------------------------------------- - - pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { + fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { use EvalContinuation as EC; - sched_ctx.log(&format!("Running component (mode: {:?})", self.mode)); + sched_ctx.log(&format!("Running component (mode: {:?})", self.exec_state.mode)); // Depending on the mode don't do anything at all, take some special // actions, or fall through and run the PDL code. - match self.mode { - Mode::NonSync | Mode::Sync | Mode::BlockedSelect => { + match self.exec_state.mode { + CompMode::NonSync | CompMode::Sync => { // continue and run PDL code }, - Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut => { + CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => { return Ok(CompScheduling::Sleep); } - Mode::StartExit => { - self.handle_component_exit(sched_ctx, comp_ctx); - return Ok(CompScheduling::Immediate); - }, - Mode::BusyExit => { - if self.control.has_acks_remaining() { - return Ok(CompScheduling::Sleep); - } else { - self.mode = Mode::Exit; - return Ok(CompScheduling::Exit); - } - }, - Mode::Exit => { - return Ok(CompScheduling::Exit); - } + CompMode::StartExit => return Ok(component::default_handle_start_exit( + &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx + )), + CompMode::BusyExit => return Ok(component::default_handle_busy_exit( + &mut self.exec_state, &self.control, sched_ctx + )), + CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)), } let run_result = self.execute_prompt(&sched_ctx)?; @@ -339,12 +300,12 @@ impl CompPDL { EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"), // Results that can be returned in sync mode EC::SyncBlockEnd => { - debug_assert_eq!(self.mode, Mode::Sync); + debug_assert_eq!(self.exec_state.mode, CompMode::Sync); 
self.handle_sync_end(sched_ctx, comp_ctx); return Ok(CompScheduling::Immediate); }, EC::BlockGet(port_id) => { - debug_assert_eq!(self.mode, Mode::Sync); + debug_assert_eq!(self.exec_state.mode, CompMode::Sync); debug_assert!(self.exec_ctx.stmt.is_none()); let port_id = port_id_from_eval(port_id); @@ -356,7 +317,10 @@ impl CompPDL { // Message was received. Make sure any blocked peers and // pending messages are handled. let message = self.inbox_main[port_index].take().unwrap(); - self.handle_received_data_message(sched_ctx, comp_ctx, port_handle); + component::default_handle_received_data_message( + port_id, &mut self.inbox_main[port_index], &mut self.inbox_backup, + comp_ctx, sched_ctx, &mut self.control + ); self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content); return Ok(CompScheduling::Immediate); @@ -366,68 +330,66 @@ impl CompPDL { } } else { // We need to wait - self.mode = Mode::BlockedGet; - self.mode_port = port_id; + self.exec_state.set_as_blocked_get(port_id); return Ok(CompScheduling::Sleep); } }, EC::Put(port_id, value) => { - debug_assert_eq!(self.mode, Mode::Sync); + debug_assert_eq!(self.exec_state.mode, CompMode::Sync); sched_ctx.log(&format!("Putting value {:?}", value)); - let port_id = port_id_from_eval(port_id); - let port_handle = comp_ctx.get_port_handle(port_id); - let port_info = comp_ctx.get_port(port_handle); - if port_info.state.is_blocked() { - self.mode = Mode::BlockedPut; - self.mode_port = port_id; - self.mode_value = value; - self.exec_ctx.stmt = ExecStmt::PerformedPut; // prepare for when we become unblocked - return Ok(CompScheduling::Sleep); - } else { - self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value); - self.exec_ctx.stmt = ExecStmt::PerformedPut; - return Ok(CompScheduling::Immediate); - } + + // Send the message + let target_port_id = port_id_from_eval(port_id); + let scheduling = component::default_send_data_message( + &mut self.exec_state, target_port_id, value, + sched_ctx, &mut 
self.consensus, comp_ctx + ); + + // When `run` is called again (potentially after becoming + // unblocked) we need to instruct the executor that we performed + // the `put` + self.exec_ctx.stmt = ExecStmt::PerformedPut; + return Ok(scheduling); }, EC::SelectStart(num_cases, _num_ports) => { - debug_assert_eq!(self.mode, Mode::Sync); - self.select.handle_select_start(num_cases); + debug_assert_eq!(self.exec_state.mode, CompMode::Sync); + self.select_state.handle_select_start(num_cases); return Ok(CompScheduling::Requeue); }, EC::SelectRegisterPort(case_index, port_index, port_id) => { - debug_assert_eq!(self.mode, Mode::Sync); + debug_assert_eq!(self.exec_state.mode, CompMode::Sync); let port_id = port_id_from_eval(port_id); - if let Err(_err) = self.select.register_select_case_port(comp_ctx, case_index, port_index, port_id) { + if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) { todo!("handle registering a port multiple times"); } return Ok(CompScheduling::Immediate); }, EC::SelectWait => { - debug_assert_eq!(self.mode, Mode::Sync); - let select_decision = self.select.handle_select_waiting_point(&self.inbox_main, comp_ctx); + debug_assert_eq!(self.exec_state.mode, CompMode::Sync); + let select_decision = self.select_state.handle_select_waiting_point(&self.inbox_main, comp_ctx); if let SelectDecision::Case(case_index) = select_decision { // Reached a conclusion, so we can continue immediately self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index); - self.mode = Mode::Sync; + self.exec_state.mode = CompMode::Sync; return Ok(CompScheduling::Immediate); } else { // No decision yet - self.mode = Mode::BlockedSelect; + self.exec_state.mode = CompMode::BlockedSelect; return Ok(CompScheduling::Sleep); } }, // Results that can be returned outside of sync mode EC::ComponentTerminated => { - self.mode = Mode::StartExit; // next call we'll take care of the exit + self.exec_state.mode = CompMode::StartExit; // 
next call we'll take care of the exit return Ok(CompScheduling::Immediate); }, EC::SyncBlockStart => { - debug_assert_eq!(self.mode, Mode::NonSync); + debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); self.handle_sync_start(sched_ctx, comp_ctx); return Ok(CompScheduling::Immediate); }, EC::NewComponent(definition_id, type_id, arguments) => { - debug_assert_eq!(self.mode, Mode::NonSync); + debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); self.create_component_and_transfer_ports( sched_ctx, comp_ctx, definition_id, type_id, arguments @@ -435,7 +397,7 @@ impl CompPDL { return Ok(CompScheduling::Requeue); }, EC::NewChannel => { - debug_assert_eq!(self.mode, Mode::NonSync); + debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); debug_assert!(self.exec_ctx.stmt.is_none()); let channel = comp_ctx.create_channel(); self.exec_ctx.stmt = ExecStmt::CreatedChannel(( @@ -448,6 +410,34 @@ impl CompPDL { } } } +} + +impl CompPDL { + pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self { + let mut inbox_main = Vec::new(); + inbox_main.reserve(num_ports); + for _ in 0..num_ports { + inbox_main.push(None); + } + + return Self{ + exec_state: CompExecState::new(), + select_state: SelectState::new(), + prompt: initial_state, + control: ControlLayer::default(), + consensus: Consensus::new(), + sync_counter: 0, + exec_ctx: ExecCtx{ + stmt: ExecStmt::None, + }, + inbox_main, + inbox_backup: Vec::new(), + } + } + + // ------------------------------------------------------------------------- + // Running component and handling changes in global component state + // ------------------------------------------------------------------------- fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult { let mut step_result = EvalContinuation::Stepping; @@ -466,11 +456,11 @@ impl CompPDL { self.consensus.notify_sync_start(comp_ctx); for message in self.inbox_main.iter() { if let Some(message) = message { - 
self.consensus.handle_new_data_message(comp_ctx, message); + self.consensus.handle_incoming_data_message(comp_ctx, message); } } - debug_assert_eq!(self.mode, Mode::NonSync); - self.mode = Mode::Sync; + debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); + self.exec_state.mode = CompMode::Sync; } /// Handles end of sync. The conclusion to the sync round might arise @@ -480,7 +470,7 @@ impl CompPDL { fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { sched_ctx.log("Component ending sync mode (now waiting for solution)"); let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); - self.mode = Mode::SyncEnd; + self.exec_state.mode = CompMode::SyncEnd; self.handle_sync_decision(sched_ctx, comp_ctx, decision); } @@ -488,30 +478,28 @@ impl CompPDL { /// the internal `Mode`, such that the next call to `run` can take the /// appropriate next steps. fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) { - sched_ctx.log(&format!("Handling sync decision: {:?} (in mode {:?})", decision, self.mode)); - let is_success = match decision { + sched_ctx.log(&format!("Handling sync decision: {:?} (in mode {:?})", decision, self.exec_state.mode)); + match decision { SyncRoundDecision::None => { // No decision yet return; }, - SyncRoundDecision::Solution => true, - SyncRoundDecision::Failure => false, - }; - - // If here then we've reached a decision - debug_assert_eq!(self.mode, Mode::SyncEnd); - if is_success { - self.mode = Mode::NonSync; - self.consensus.notify_sync_decision(decision); - } else { - self.mode = Mode::StartExit; + SyncRoundDecision::Solution => { + debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd); + self.exec_state.mode = CompMode::NonSync; + self.consensus.notify_sync_decision(decision); + }, + SyncRoundDecision::Failure => { + debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd); + self.exec_state.mode = CompMode::StartExit; + }, } } fn 
handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { sched_ctx.log("Component exiting"); - debug_assert_eq!(self.mode, Mode::StartExit); - self.mode = Mode::BusyExit; + debug_assert_eq!(self.exec_state.mode, CompMode::StartExit); + self.exec_state.mode = CompMode::BusyExit; // Doing this by index, then retrieving the handle is a bit rediculous, // but Rust is being Rust with its borrowing rules. @@ -530,7 +518,7 @@ impl CompPDL { let port_handle = comp_ctx.get_port_handle(port_id); let (peer, message) = self.control.initiate_port_closing(port_handle, comp_ctx); let peer_info = comp_ctx.get_peer(peer); - peer_info.handle.send_message(sched_ctx, Message::Control(message), true); + peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); } } @@ -538,180 +526,34 @@ impl CompPDL { // Handling messages // ------------------------------------------------------------------------- - fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_handle: LocalPortHandle, value: ValueGroup) { - let port_info = comp_ctx.get_port(source_port_handle); - let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); - let peer_info = comp_ctx.get_peer(peer_handle); - let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value); - peer_info.handle.send_message(sched_ctx, Message::Data(annotated_message), true); - } - /// Handles a message that came in through the public inbox. This function /// will handle putting it in the correct place, and potentially blocking /// the port in case too many messages are being received. 
fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) { - // Whatever we do, glean information from headers in message - if self.mode.is_in_sync_block() { - self.consensus.handle_new_data_message(comp_ctx, &message); - } - - // Check if we can insert it directly into the storage associated with - // the port - let target_port_id = message.data_header.target_port; - let port_handle = comp_ctx.get_port_handle(target_port_id); - let port_index = comp_ctx.get_port_index(port_handle); - if self.inbox_main[port_index].is_none() { - self.inbox_main[port_index] = Some(message); + use component::IncomingData; - // After direct insertion, check if this component's execution is - // blocked on receiving a message on that port - debug_assert!(!comp_ctx.get_port(port_handle).state.is_blocked()); // because we could insert directly - if self.mode == Mode::BlockedGet && self.mode_port == target_port_id { - // We were indeed blocked - self.mode = Mode::Sync; - self.mode_port = PortId::new_invalid(); - } else if self.mode == Mode::BlockedSelect { - let select_decision = self.select.handle_updated_inbox(&self.inbox_main, comp_ctx); - if let SelectDecision::Case(case_index) = select_decision { - self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index); - self.mode = Mode::Sync; - } - } - - return; - } - - // The direct inbox is full, so the port will become (or was already) blocked - let port_info = comp_ctx.get_port_mut(port_handle); - debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked()); - - if port_info.state == PortState::Open { - comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); - let (peer_handle, message) = - self.control.initiate_port_blocking(comp_ctx, port_handle); - - let peer = comp_ctx.get_peer(peer_handle); - peer.handle.send_message(sched_ctx, Message::Control(message), true); + // Whatever we do, glean information from headers in message + if 
self.exec_state.mode.is_in_sync_block() { + self.consensus.handle_incoming_data_message(comp_ctx, &message); } - // But we still need to remember the message, so: - self.inbox_backup.push(message); - } - - /// Handles when a message has been handed off from the inbox to the PDL - /// code. We check to see if there are more messages waiting and, if not, - /// then we handle the case where the port might have been blocked - /// previously. - fn handle_received_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) { + let port_handle = comp_ctx.get_port_handle(message.data_header.target_port); let port_index = comp_ctx.get_port_index(port_handle); - debug_assert!(self.inbox_main[port_index].is_none()); // this function should be called after the message is taken out - - // Check for any more messages - let port_info = comp_ctx.get_port(port_handle); - for message_index in 0..self.inbox_backup.len() { - let message = &self.inbox_backup[message_index]; - if message.data_header.target_port == port_info.self_id { - // One more message for this port - let message = self.inbox_backup.remove(message_index); - debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we had >1 message on the port - self.inbox_main[port_index] = Some(message); - - return; - } - } - - // Did not have any more messages. So if we were blocked, then we need - // to send the "unblock" message. 
- if port_info.state == PortState::BlockedDueToFullBuffers { - comp_ctx.set_port_state(port_handle, PortState::Open); - let (peer_handle, message) = self.control.cancel_port_blocking(comp_ctx, port_handle); - let peer_info = comp_ctx.get_peer(peer_handle); - peer_info.handle.send_message(sched_ctx, Message::Control(message), true); - } - } - - fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) { - // Little local utility to send an Ack - fn send_control_ack_message(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, causer_id: ControlId, peer_handle: LocalPeerHandle) { - let peer_info = comp_ctx.get_peer(peer_handle); - peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{ - id: causer_id, - sender_comp_id: comp_ctx.id, - target_port_id: None, - content: ControlMessageContent::Ack, - }), true); - } - - // Handle the content of the control message, and optionally Ack it - match message.content { - ControlMessageContent::Ack => { - self.handle_ack(sched_ctx, comp_ctx, message.id); - }, - ControlMessageContent::BlockPort(port_id) => { - // On of our messages was accepted, but the port should be - // blocked. - let port_handle = comp_ctx.get_port_handle(port_id); - let port_info = comp_ctx.get_port(port_handle); - debug_assert_eq!(port_info.kind, PortKind::Putter); - if port_info.state == PortState::Open { - // only when open: we don't do this when closed, and we we don't do this if we're blocked due to peer changes - comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); - } - }, - ControlMessageContent::ClosePort(port_id) => { - // Request to close the port. 
We immediately comply and remove - // the component handle as well - let port_handle = comp_ctx.get_port_handle(port_id); - let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id; - let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); - - // One exception to sending an `Ack` is if we just closed the - // port ourselves, meaning that the `ClosePort` messages got - // sent to one another. - if let Some(control_id) = self.control.has_close_port_entry(port_handle, comp_ctx) { - self.handle_ack(sched_ctx, comp_ctx, control_id); - } else { - send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle); - comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed - comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed - } - }, - ControlMessageContent::UnblockPort(port_id) => { - // We were previously blocked (or already closed) - let port_handle = comp_ctx.get_port_handle(port_id); - let port_info = comp_ctx.get_port(port_handle); - debug_assert_eq!(port_info.kind, PortKind::Putter); - if port_info.state == PortState::BlockedDueToFullBuffers { - self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle); + match component::default_handle_incoming_data_message( + &mut self.exec_state, &mut self.inbox_main[port_index], comp_ctx, message, + sched_ctx, &mut self.control + ) { + IncomingData::PlacedInSlot => { + if self.exec_state.mode == CompMode::BlockedSelect { + let select_decision = self.select_state.handle_updated_inbox(&self.inbox_main, comp_ctx); + if let SelectDecision::Case(case_index) = select_decision { + self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index); + self.exec_state.mode = CompMode::Sync; + } } }, - ControlMessageContent::PortPeerChangedBlock(port_id) => { - // The peer of our port has just changed. So we are asked to - // temporarily block the port (while our original recipient is - // potentially rerouting some of the in-flight messages) and - // Ack. 
Then we wait for the `unblock` call. - debug_assert_eq!(message.target_port_id, Some(port_id)); - let port_handle = comp_ctx.get_port_handle(port_id); - comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange); - - let port_info = comp_ctx.get_port(port_handle); - let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); - - send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle); - }, - ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => { - let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap()); - let port_info = comp_ctx.get_port(port_handle); - debug_assert!(port_info.state == PortState::BlockedDueToPeerChange); - let old_peer_id = port_info.peer_comp_id; - - comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false); - - let port_info = comp_ctx.get_port_mut(port_handle); - port_info.peer_comp_id = new_comp_id; - port_info.peer_port_id = new_port_id; - comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None); - self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle); + IncomingData::SlotFull(message) => { + self.inbox_backup.push(message); } } } @@ -721,67 +563,16 @@ impl CompPDL { self.handle_sync_decision(sched_ctx, comp_ctx, decision); } - /// Little helper that notifies the control layer of an `Ack`, and takes the - /// appropriate subsequent action - fn handle_ack(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, control_id: ControlId) { - let mut to_ack = control_id; - loop { - let (action, new_to_ack) = self.control.handle_ack(to_ack, sched_ctx, comp_ctx); - match action { - AckAction::SendMessage(target_comp, message) => { - // FIX @NoDirectHandle - let mut handle = sched_ctx.runtime.get_component_public(target_comp); - handle.send_message(sched_ctx, Message::Control(message), true); - let _should_remove = handle.decrement_users(); - debug_assert!(_should_remove.is_none()); - }, - AckAction::ScheduleComponent(to_schedule) => { - // 
FIX @NoDirectHandle - let mut handle = sched_ctx.runtime.get_component_public(to_schedule); - - // Note that the component is intentionally not - // sleeping, so we just wake it up - debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire)); - let key = unsafe{ to_schedule.upgrade() }; - sched_ctx.runtime.enqueue_work(key); - let _should_remove = handle.decrement_users(); - debug_assert!(_should_remove.is_none()); - }, - AckAction::None => {} - } - - match new_to_ack { - Some(new_to_ack) => to_ack = new_to_ack, - None => break, - } - } - } - // ------------------------------------------------------------------------- // Handling ports // ------------------------------------------------------------------------- - /// Unblocks a port, potentially continuing execution of the component, in - /// response to a message that told us to unblock a previously blocked - fn handle_unblock_port_instruction(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) { - let port_info = comp_ctx.get_port_mut(port_handle); - let port_id = port_info.self_id; - debug_assert!(port_info.state.is_blocked()); - port_info.state = PortState::Open; - - if self.mode == Mode::BlockedPut && port_id == self.mode_port { - // We were blocked on the port that just became unblocked, so - // send the message. - debug_assert_eq!(port_info.kind, PortKind::Putter); - let mut replacement = ValueGroup::default(); - std::mem::swap(&mut replacement, &mut self.mode_value); - self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, replacement); - - self.mode = Mode::Sync; - self.mode_port = PortId::new_invalid(); - } - } - + /// Creates a new component and transfers ports. Because of the stepwise + /// process in which memory is allocated, ports are transferred, messages + /// are exchanged, component lifecycle methods are called, etc. This + /// function facilitates a lot of implicit assumptions (e.g. 
when the + /// `Component::on_creation` method is called, the component is already + /// registered at the runtime). fn create_component_and_transfer_ports( &mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, @@ -793,15 +584,27 @@ impl CompPDL { created_handle: LocalPortHandle, created_id: PortId, } - let mut port_id_pairs = Vec::new(); + let mut opened_port_id_pairs = Vec::new(); + let mut closed_port_id_pairs = Vec::new(); let reservation = sched_ctx.runtime.start_create_pdl_component(); let mut created_ctx = CompCtx::new(&reservation); + let other_proc = &sched_ctx.runtime.protocol.heap[definition_id]; + let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition]; + + // dbg_code!({ + // sched_ctx.log(&format!( + // "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})", + // self_proc.identifier.value.as_str(), creator_ctx.id, + // other_proc.identifier.value.as_str(), reservation.id() + // )); + // }); + // Take all the ports ID that are in the `args` (and currently belong to // the creator component) and translate them into new IDs that are // associated with the component we're about to create - let mut arg_iter = ValueGroupIter::new(&mut arguments); + let mut arg_iter = ValueGroupPortIter::new(&mut arguments); while let Some(port_reference) = arg_iter.next() { // Create port entry for new component let creator_port_id = port_reference.id; @@ -814,12 +617,18 @@ impl CompPDL { let created_port = created_ctx.get_port(created_port_handle); let created_port_id = created_port.self_id; - port_id_pairs.push(PortPair{ + let port_id_pair = PortPair { creator_handle: creator_port_handle, creator_id: creator_port_id, created_handle: created_port_handle, created_id: created_port_id, - }); + }; + + if creator_port.state == PortState::Closed { + closed_port_id_pairs.push(port_id_pair) + } else { + opened_port_id_pairs.push(port_id_pair); + } // Modify value in arguments (bit dirty, but double vec in ValueGroup causes lifetime 
issues) let arg_value = if let Some(heap_pos) = port_reference.heap_pos { @@ -839,20 +648,20 @@ impl CompPDL { // the new component. let mut created_component_has_remote_peers = false; - for pair in port_id_pairs.iter() { + for pair in opened_port_id_pairs.iter() { let creator_port_info = creator_ctx.get_port(pair.creator_handle); let created_port_info = created_ctx.get_port_mut(pair.created_handle); if created_port_info.peer_comp_id == creator_ctx.id { // Port peer is owned by the creator as well - let created_peer_port_index = port_id_pairs + let created_peer_port_index = opened_port_id_pairs .iter() .position(|v| v.creator_id == creator_port_info.peer_port_id); match created_peer_port_index { Some(created_peer_port_index) => { // Peer port moved to the new component as well. So // adjust IDs appropriately. - let peer_pair = &port_id_pairs[created_peer_port_index]; + let peer_pair = &opened_port_id_pairs[created_peer_port_index]; created_port_info.peer_port_id = peer_pair.created_id; created_port_info.peer_comp_id = reservation.id(); todo!("either add 'self peer', or remove that idea from Ctx altogether") @@ -877,20 +686,16 @@ impl CompPDL { // actual component. Note that we initialize it as "not sleeping" as // its initial scheduling might be performed based on `Ack`s in response // to message exchanges between remote peers. 
- let prompt = Prompt::new( - &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap, - definition_id, type_id, arguments, - ); - let component = CompPDL::new(prompt, port_id_pairs.len()); + let total_num_ports = opened_port_id_pairs.len() + closed_port_id_pairs.len(); + let component = component::create_component(&sched_ctx.runtime.protocol, definition_id, type_id, arguments, total_num_ports); let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component( reservation, component, created_ctx, false, ); - let created_ctx = &component.ctx; + component.component.on_creation(created_key.downgrade(), sched_ctx); // Now modify the creator's ports: remove every transferred port and - // potentially remove the peer component. Here is also where we will - // transfer messages in the main inbox. - for pair in port_id_pairs.iter() { + // potentially remove the peer component. + for pair in opened_port_id_pairs.iter() { // Remove peer if appropriate let creator_port_info = creator_ctx.get_port(pair.creator_handle); let creator_port_index = creator_ctx.get_port_index(pair.creator_handle); @@ -899,12 +704,9 @@ impl CompPDL { creator_ctx.remove_port(pair.creator_handle); // Transfer any messages - let created_port_index = created_ctx.get_port_index(pair.created_handle); - let created_port_info = created_ctx.get_port(pair.created_handle); - debug_assert!(component.code.inbox_main[created_port_index].is_none()); if let Some(mut message) = self.inbox_main.remove(creator_port_index) { message.data_header.target_port = pair.created_id; - component.code.inbox_main[created_port_index] = Some(message); + component.component.adopt_message(&mut component.ctx, message) } let mut message_index = 0; @@ -914,27 +716,43 @@ impl CompPDL { // transfer message let mut message = self.inbox_backup.remove(message_index); message.data_header.target_port = pair.created_id; - component.code.inbox_backup.push(message); + component.component.adopt_message(&mut component.ctx, 
message); } else { message_index += 1; } } // Handle potential channel between creator and created component + let created_port_info = component.ctx.get_port(pair.created_handle); + if created_port_info.peer_comp_id == creator_ctx.id { let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id); let peer_port_info = creator_ctx.get_port_mut(peer_port_handle); - peer_port_info.peer_comp_id = created_ctx.id; + peer_port_info.peer_comp_id = component.ctx.id; peer_port_info.peer_port_id = created_port_info.self_id; - creator_ctx.add_peer(peer_port_handle, sched_ctx, created_ctx.id, None); + creator_ctx.add_peer(peer_port_handle, sched_ctx, component.ctx.id, None); } } - // By now all ports have been transferred. We'll now do any of the setup - // for rerouting/messaging + // Do the same for the closed ports + for pair in closed_port_id_pairs.iter() { + let port_index = creator_ctx.get_port_index(pair.creator_handle); + creator_ctx.remove_port(pair.creator_handle); + let _removed_message = self.inbox_main.remove(port_index); + + // In debug mode: since we've closed the port we shouldn't have any + // messages for that port. + debug_assert!(_removed_message.is_none()); + debug_assert!(!self.inbox_backup.iter().any(|v| v.data_header.target_port == pair.creator_id)); + } + + // By now all ports and messages have been transferred. If there are any + // peers that need to be notified about this new component, then we + // initiate the protocol that will notify everyone here. 
if created_component_has_remote_peers { + let created_ctx = &component.ctx; let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id); - for pair in port_id_pairs.iter() { + for pair in opened_port_id_pairs.iter() { let port_info = created_ctx.get_port(pair.created_handle); if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id { let message = self.control.add_reroute_entry( @@ -944,7 +762,7 @@ impl CompPDL { ); let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = created_ctx.get_peer(peer_handle); - peer_info.handle.send_message(sched_ctx, message, true); + peer_info.handle.send_message(&sched_ctx.runtime, message, true); } } } else { @@ -954,16 +772,6 @@ impl CompPDL { } } -#[inline] -fn port_id_from_eval(port_id: EvalPortId) -> PortId { - return PortId(port_id.id); -} - -#[inline] -fn port_id_to_eval(port_id: PortId) -> EvalPortId { - return EvalPortId{ id: port_id.0 }; -} - /// Recursively goes through the value group, attempting to find ports. /// Duplicates will only be added once. 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { @@ -1005,13 +813,13 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Ve } } -struct ValueGroupIter<'a> { +struct ValueGroupPortIter<'a> { group: &'a mut ValueGroup, heap_stack: Vec<(usize, usize)>, index: usize, } -impl<'a> ValueGroupIter<'a> { +impl<'a> ValueGroupPortIter<'a> { fn new(group: &'a mut ValueGroup) -> Self { return Self{ group, heap_stack: Vec::new(), index: 0 } } @@ -1023,7 +831,7 @@ struct ValueGroupPortRef { index: usize, } -impl<'a> Iterator for ValueGroupIter<'a> { +impl<'a> Iterator for ValueGroupPortIter<'a> { type Item = ValueGroupPortRef; fn next(&mut self) -> Option { diff --git a/src/runtime2/component/component_random.rs b/src/runtime2/component/component_random.rs new file mode 100644 index 0000000000000000000000000000000000000000..dce5d709a86cf6b0e73d63bef292113aa38f691c --- /dev/null +++ b/src/runtime2/component/component_random.rs @@ -0,0 +1,151 @@ +use rand::prelude as random; +use rand::RngCore; + +use crate::protocol::eval::{ValueGroup, Value, EvalError}; +use crate::runtime2::*; + +use super::*; +use super::component::{self, Component, CompExecState, CompScheduling, CompMode}; +use super::control_layer::*; +use super::consensus::*; + +/// TODO: Temporary component to figure out what to do with custom components. 
+/// This component sends random numbers between two u32 limits +pub struct ComponentRandomU32 { + // Properties for this specific component + output_port_id: PortId, + random_minimum: u32, + random_maximum: u32, + num_sends: u32, + max_num_sends: u32, + generator: random::ThreadRng, + // Generic state-tracking + exec_state: CompExecState, + did_perform_send: bool, // when in sync mode + control: ControlLayer, + consensus: Consensus, +} + +impl Component for ComponentRandomU32 { + fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) {} + + fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx) {} + + fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) { + // Impossible since this component does not have any input ports in its + // signature. + unreachable!(); + } + + fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { + match message { + Message::Data(_message) => unreachable!(), + Message::Sync(message) => { + let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); + component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + }, + Message::Control(message) => { + component::default_handle_control_message( + &mut self.exec_state, &mut self.control, &mut self.consensus, + message, sched_ctx, comp_ctx + ); + }, + Message::Poll => unreachable!(), + } + } + + fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { + sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode)); + + match self.exec_state.mode { + CompMode::BlockedGet | CompMode::BlockedSelect => { + // impossible for this component, no input ports and no select + // blocks + unreachable!(); + } + CompMode::NonSync => { + // If in non-sync mode then we check if the arguments make sense + // (at some point in the future, this is just a testing + // component). 
+ if self.random_minimum >= self.random_maximum { + // Could throw an evaluation error, but lets just panic + panic!("going to crash 'n burn your system now, please provide valid arguments"); + } + + if self.num_sends >= self.max_num_sends { + self.exec_state.mode = CompMode::StartExit; + } else { + sched_ctx.log("Entering sync mode"); + self.did_perform_send = false; + self.consensus.notify_sync_start(comp_ctx); + self.exec_state.mode = CompMode::Sync; + } + + return Ok(CompScheduling::Immediate); + }, + CompMode::Sync => { + // This component just sends a single message, then waits until + // consensus has been reached + if !self.did_perform_send { + sched_ctx.log("Sending random message"); + let mut random = self.generator.next_u32(); // raw value; no subtraction here (it would underflow), reduced into [minimum, maximum) below + let random_delta = self.random_maximum - self.random_minimum; + random %= random_delta; + random += self.random_minimum; + let value_group = ValueGroup::new_stack(vec![Value::UInt32(random)]); + + let scheduling = component::default_send_data_message( + &mut self.exec_state, self.output_port_id, value_group, + sched_ctx, &mut self.consensus, comp_ctx + ); + + // Blocked or not, we set `did_perform_send` to true. If + // blocked then the moment we become unblocked (and are back + // at the `Sync` mode) we have sent the message. 
+ self.did_perform_send = true; + self.num_sends += 1; + return Ok(scheduling) + } else { + // Message was sent, finish this sync round + sched_ctx.log("Waiting for consensus"); + self.exec_state.mode = CompMode::SyncEnd; + let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); + component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + return Ok(CompScheduling::Requeue); + } + }, + CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep), + CompMode::StartExit => return Ok(component::default_handle_start_exit( + &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx + )), + CompMode::BusyExit => return Ok(component::default_handle_busy_exit( + &mut self.exec_state, &self.control, sched_ctx + )), + CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)), + } + } +} + +impl ComponentRandomU32 { + pub(crate) fn new(arguments: ValueGroup) -> Self { + debug_assert_eq!(arguments.values.len(), 4); + debug_assert!(arguments.regions.is_empty()); + let port_id = component::port_id_from_eval(arguments.values[0].as_port_id()); + let minimum = arguments.values[1].as_uint32(); + let maximum = arguments.values[2].as_uint32(); + let num_sends = arguments.values[3].as_uint32(); + + return Self{ + output_port_id: port_id, + random_minimum: minimum, + random_maximum: maximum, + num_sends: 0, + max_num_sends: num_sends, + generator: random::thread_rng(), + exec_state: CompExecState::new(), + did_perform_send: false, + control: ControlLayer::default(), + consensus: Consensus::new(), + } + } +} \ No newline at end of file diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index b782ef0bf1d7ee20baaa589eb2b6aceaa3e1b2c6..f9f3ab3f1df69fe514ced318d8eb388adec96ea9 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -378,7 +378,10 @@ impl Consensus { /// Handles the arrival of a new data message (needs to be called 
for every /// new data message, even though it might not end up being received). This /// is used to determine peers of `get`ter ports. - pub(crate) fn handle_new_data_message(&mut self, comp_ctx: &CompCtx, message: &DataMessage) { + // TODO: The use of this function is rather ugly. Find a more robust + // scheme about owners of `get`ter ports not knowing about their peers. + // (also, figure out why this was written again, I forgot). + pub(crate) fn handle_incoming_data_message(&mut self, comp_ctx: &CompCtx, message: &DataMessage) { let target_handle = comp_ctx.get_port_handle(message.data_header.target_port); let target_index = comp_ctx.get_port_index(target_handle); let annotation = &mut self.ports[target_index]; @@ -504,7 +507,7 @@ impl Consensus { sync_header: self.create_sync_header(comp_ctx), content: SyncMessageContent::NotificationOfLeader, }; - peer.handle.send_message(sched_ctx, Message::Sync(message), true); + peer.handle.send_message(&sched_ctx.runtime, Message::Sync(message), true); } self.forward_partial_solution(sched_ctx, comp_ctx); @@ -516,7 +519,7 @@ impl Consensus { }; let peer_handle = comp_ctx.get_peer_handle(header.sending_id); let peer_info = comp_ctx.get_peer(peer_handle); - peer_info.handle.send_message(sched_ctx, Message::Sync(message), true); + peer_info.handle.send_message(&sched_ctx.runtime, Message::Sync(message), true); } // else: exactly equal } @@ -622,7 +625,7 @@ impl Consensus { sync_header: self.create_sync_header(comp_ctx), content: if is_success { SyncMessageContent::GlobalSolution } else { SyncMessageContent::GlobalFailure }, }); - handle.send_message(sched_ctx, message, true); + handle.send_message(&sched_ctx.runtime, message, true); let _should_remove = handle.decrement_users(); debug_assert!(_should_remove.is_none()); } @@ -631,7 +634,7 @@ impl Consensus { fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) { debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader 
let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id); - leader_info.send_message(sched_ctx, message, true); + leader_info.send_message(&sched_ctx.runtime, message, true); let should_remove = leader_info.decrement_users(); if let Some(key) = should_remove { sched_ctx.runtime.destroy_component(key); diff --git a/src/runtime2/component/mod.rs b/src/runtime2/component/mod.rs index 58b0ea3defc4d26d844fcc1bb86f53a6d84d98ff..e0c03a618ed4196164f14d4721d4cd67ac5da257 100644 --- a/src/runtime2/component/mod.rs +++ b/src/runtime2/component/mod.rs @@ -2,8 +2,12 @@ mod component_pdl; mod component_context; mod control_layer; mod consensus; +mod component; +mod component_random; +mod component_internet; -pub(crate) use component_pdl::{CompPDL, CompScheduling}; +pub(crate) use component::{Component, CompScheduling}; +pub(crate) use component_pdl::{CompPDL}; pub(crate) use component_context::CompCtx; pub(crate) use control_layer::{ControlId}; @@ -13,7 +17,7 @@ use super::runtime::*; /// If the component is sleeping, then that flag will be atomically set to /// false. If we're the ones that made that happen then we add it to the work /// queue. 
-pub(crate) fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, handle: &CompHandle) { +pub(crate) fn wake_up_if_sleeping(runtime: &RuntimeInner, comp_id: CompId, handle: &CompHandle) { use std::sync::atomic::Ordering; let should_wake_up = handle.sleeping @@ -22,6 +26,6 @@ pub(crate) fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, han if should_wake_up { let comp_key = unsafe{ comp_id.upgrade() }; - sched_ctx.runtime.enqueue_work(comp_key); + runtime.enqueue_work(comp_key); } } \ No newline at end of file diff --git a/src/runtime2/error.rs b/src/runtime2/error.rs new file mode 100644 index 0000000000000000000000000000000000000000..4982a3230d43bbb8dcfe4f2fc31913b84b98a46b --- /dev/null +++ b/src/runtime2/error.rs @@ -0,0 +1,70 @@ +use std::fmt::{Write, Debug, Display, Formatter as FmtFormatter, Result as FmtResult}; + +/// Represents an unrecoverable runtime error that is reported to the user (for +/// debugging purposes). Basically a human-readable message with its source +/// location. The error is chainable. +pub struct RtError { + file: &'static str, + line: u32, + message: String, + cause: Option<Box<RtError>>, +} + +impl RtError { + pub(crate) fn new(file: &'static str, line: u32, message: String) -> RtError { + return RtError { + file, line, message, cause: None, + } + } + + pub(crate) fn wrap(self, file: &'static str, line: u32, message: String) -> RtError { + return RtError { + file, line, message, cause: Some(Box::new(self)) + } + } +} + +impl Display for RtError { + fn fmt(&self, f: &mut FmtFormatter<'_>) -> FmtResult { + let mut error = self; + loop { + write!(f, "[{}:{}] {}", error.file, error.line, error.message)?; // print the current link of the chain, not always the outermost error + match &error.cause { + Some(cause) => { + writeln!(f, " ...")?; + error = cause.as_ref() + }, + None => { + return writeln!(f); // end of the cause chain: terminate the line and stop looping + }, + } + } + } +} + +impl Debug for RtError { + fn fmt(&self, f: &mut FmtFormatter<'_>) -> FmtResult { + return (self as &dyn Display).fmt(f); + } +} + +macro_rules! 
rt_error { + ($fmt:expr) => { + $crate::runtime2::error::RtError::new(file!(), line!(), $fmt.to_string()) + }; + ($fmt:expr, $($args:expr),*) => { + $crate::runtime2::error::RtError::new(file!(), line!(), format!($fmt, $($args),*)) + }; +} + +macro_rules! rt_error_try { + ($prev:expr, $($fmt_and_args:expr),*) => { + { + let result = $prev; + match result { + Ok(result) => result, + Err(result) => return Err(result.wrap(file!(), line!(), format!($($fmt_and_args),*))), + } + } + } +} \ No newline at end of file diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index 05881c7b85be8a3fd71f8e4d892c8d6c395064db..50fd6dc2813384d890c9d4fe875f745b6d872f48 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -1,8 +1,18 @@ +#[macro_use] mod error; mod store; mod runtime; mod component; mod communication; mod scheduler; +mod poll; +mod stdlib; #[cfg(test)] mod tests; -pub use runtime::Runtime; \ No newline at end of file +pub use runtime::Runtime; +pub(crate) use error::RtError; +pub(crate) use scheduler::SchedulerCtx; +pub(crate) use communication::{ + PortId, PortKind, PortState, + Message, ControlMessage, SyncMessage, DataMessage, + SyncRoundDecision +}; \ No newline at end of file diff --git a/src/runtime2/poll/mod.rs b/src/runtime2/poll/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..75cf8e9a90c00b6f9d785db8c8ce75221c69f0a3 --- /dev/null +++ b/src/runtime2/poll/mod.rs @@ -0,0 +1,324 @@ +use libc::{self, c_int}; + +use std::{io, ptr, time, thread}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::collections::HashMap; + +use crate::runtime2::RtError; +use crate::runtime2::runtime::{CompHandle, RuntimeInner}; +use crate::runtime2::store::queue_mpsc::*; + + +pub(crate) type FileDescriptor = c_int; + +pub(crate) trait AsFileDescriptor { + fn as_file_descriptor(&self) -> FileDescriptor; + +} + +#[derive(Copy, Clone)] +pub(crate) struct UserData(u64); + +// 
----------------------------------------------------------------------------- +// Poller +// ----------------------------------------------------------------------------- + +#[cfg(unix)] +pub(crate) struct Poller { + handle: c_int, +} + +// All of this is gleaned from the `mio` crate. +#[cfg(unix)] +impl Poller { + pub fn new() -> io::Result { + let handle = syscall_result(unsafe{ libc::epoll_create1(libc::EPOLL_CLOEXEC) })?; + + return Ok(Self{ + handle, + }) + } + + fn register(&self, fd: FileDescriptor, user: UserData, read: bool, write: bool) -> io::Result<()> { + let mut event = libc::epoll_event{ + events: Self::events_from_rw_flags(read, write), + u64: user.0, + }; + syscall_result(unsafe{ + libc::epoll_ctl(self.handle, libc::EPOLL_CTL_ADD, fd, &mut event) + })?; + + return Ok(()); + } + + fn unregister(&self, fd: FileDescriptor) -> io::Result<()> { + syscall_result(unsafe{ + libc::epoll_ctl(self.handle, libc::EPOLL_CTL_DEL, fd, ptr::null_mut()) + })?; + + return Ok(()); + } + + /// Performs `epoll_wait`, waiting for the provided timeout or until events + /// are reported. They are stored in the `events` variable (up to + /// `events.cap()` are reported, so ensure it is preallocated). + pub fn wait(&self, events: &mut Vec, timeout: time::Duration) -> io::Result<()> { + // See `mio` for the reason. 
Works around a linux bug + #[cfg(target_pointer_width = "32")] + const MAX_TIMEOUT: u128 = 1789569; + #[cfg(not(target_pointer_width = "32"))] + const MAX_TIMEOUT: u128 = c_int::MAX as u128; + + let timeout_millis = timeout.as_millis(); + let timeout_millis = if timeout_millis > MAX_TIMEOUT { + -1 // effectively infinite + } else { + timeout_millis as c_int + }; + + debug_assert!(events.is_empty()); + debug_assert!(events.capacity() > 0 && events.capacity() < i32::MAX as usize); + let num_events = syscall_result(unsafe{ + libc::epoll_wait(self.handle, events.as_mut_ptr(), events.capacity() as i32, timeout_millis) + })?; + + unsafe{ + debug_assert!(num_events >= 0); + events.set_len(num_events as usize); + } + + return Ok(()); + } + + fn events_from_rw_flags(read: bool, write: bool) -> u32 { + let mut events = libc::EPOLLET; + if read { + events |= libc::EPOLLIN | libc::EPOLLRDHUP; + } + if write { + events |= libc::EPOLLOUT; + } + + return events as u32; + } +} + +#[cfg(unix)] +impl Drop for Poller { + fn drop(&mut self) { + unsafe{ libc::close(self.handle); } + } +} + +#[inline] +fn syscall_result(result: c_int) -> io::Result { + if result < 0 { + return Err(io::Error::last_os_error()); + } else { + return Ok(result); + } +} + +#[cfg(not(unix))] +struct Poller { + // Not implemented for OS's other than unix +} + +// ----------------------------------------------------------------------------- +// Polling Thread +// ----------------------------------------------------------------------------- + +enum PollCmd { + Register(CompHandle, UserData), + Unregister(FileDescriptor, UserData), + Shutdown, +} + +pub struct PollingThread { + poller: Arc, + runtime: Arc, + queue: QueueDynMpsc, + logging_enabled: bool, +} + +impl PollingThread { + pub(crate) fn new(runtime: Arc, logging_enabled: bool) -> Result<(PollingThreadHandle, PollingClientFactory), RtError> { + let poller = Poller::new() + .map_err(|e| rt_error!("failed to create poller, because: {}", e))?; + let poller = 
Arc::new(poller); + let queue = QueueDynMpsc::new(64); + let queue_producers = queue.producer_factory(); + + let mut thread_data = PollingThread{ + poller: poller.clone(), + runtime: runtime.clone(), + queue, + logging_enabled, + }; + let thread_handle = thread::spawn(move || { thread_data.run() }); + + let thread_handle = PollingThreadHandle{ + queue: Some(queue_producers.producer()), + handle: Some(thread_handle), + }; + let client_factory = PollingClientFactory{ + poller, + generation_counter: Arc::new(AtomicU32::new(0)), + queue_factory: queue_producers, + }; + + return Ok((thread_handle, client_factory)); + } + + pub(crate) fn run(&mut self) { + use crate::runtime2::scheduler::SchedulerCtx; + use crate::runtime2::communication::Message; + + const NUM_EVENTS: usize = 256; + const EPOLL_DURATION: time::Duration = time::Duration::from_millis(250); + + // @performance: Lot of improvements possible here, a HashMap is likely + // a horrible way to do this. + let mut events = Vec::with_capacity(NUM_EVENTS); + let mut lookup = HashMap::with_capacity(64); + self.log("Starting polling thread"); + + loop { + // Retrieve events first (because the PollingClient will first + // push a command into the queue, and then register at epoll). + self.poller.wait(&mut events, EPOLL_DURATION).unwrap(); + + // Then handle everything in the command queue. 
+ while let Some(command) = self.queue.pop() { + match command { + PollCmd::Register(handle, user_data) => { + self.log(&format!("Registering component {:?} as {}", handle.id(), user_data.0)); + let key = Self::user_data_as_key(user_data); + debug_assert!(!lookup.contains_key(&key)); + lookup.insert(key, handle); + }, + PollCmd::Unregister(_file_descriptor, user_data) => { + let key = Self::user_data_as_key(user_data); + debug_assert!(lookup.contains_key(&key)); + let mut handle = lookup.remove(&key).unwrap(); + self.log(&format!("Unregistering component {:?} as {}", handle.id(), user_data.0)); + if let Some(key) = handle.decrement_users() { + self.runtime.destroy_component(key); + } + }, + PollCmd::Shutdown => { + // The contract is that all scheduler threads shutdown + // before the polling thread. This happens when all + // components are removed. + self.log("Received shutdown signal"); + debug_assert!(lookup.is_empty()); + return; + } + } + } + + // Now process all of the events. Because we might have had a + // `Register` command followed by an `Unregister` command (e.g. a + // component has died), we might get events that are not associated + // with an entry in the lookup. + for event in events.drain(..) 
{ + let key = event.u64; + if let Some(handle) = lookup.get(&key) { + let events = event.events; + self.log(&format!("Sending poll to {:?} (event: {:x})", handle.id(), events)); + handle.send_message(&self.runtime, Message::Poll, true); + } + } + } + } + + #[inline] + fn user_data_as_key(data: UserData) -> u64 { + return data.0; + } + + fn log(&self, message: &str) { + if self.logging_enabled { + println!("[polling] {}", message); + } + } +} + +// bit convoluted, but it works +pub(crate) struct PollingThreadHandle { + // requires Option, because: + queue: Option>, // destructor needs to be called + handle: Option>, // we need to call `join` +} + +impl PollingThreadHandle { + pub(crate) fn shutdown(&mut self) -> thread::Result<()> { + debug_assert!(self.handle.is_some(), "polling thread already destroyed"); + self.queue.take().unwrap().push(PollCmd::Shutdown); + return self.handle.take().unwrap().join(); + } +} + +impl Drop for PollingThreadHandle { + fn drop(&mut self) { + debug_assert!(self.queue.is_none() && self.handle.is_none()); + } +} + +// oh my god, now I'm writing factory objects. I'm not feeling too well +pub(crate) struct PollingClientFactory { + poller: Arc, + generation_counter: Arc, + queue_factory: QueueDynProducerFactory, +} + +impl PollingClientFactory { + pub(crate) fn client(&self) -> PollingClient { + return PollingClient{ + poller: self.poller.clone(), + generation_counter: self.generation_counter.clone(), + queue: self.queue_factory.producer(), + }; + } +} + +pub(crate) struct PollTicket(FileDescriptor, u64); + +/// A structure that allows the owner to register components at the polling +/// thread. Because of assumptions in the communication queue all of these +/// clients should be dropped before stopping the polling thread. 
+pub(crate) struct PollingClient { + poller: Arc, + generation_counter: Arc, + queue: QueueDynProducer, +} + +impl PollingClient { + pub(crate) fn register(&self, entity: &F, handle: CompHandle, read: bool, write: bool) -> Result { + let generation = self.generation_counter.fetch_add(1, Ordering::Relaxed); + let user_data = user_data_for_component(handle.id().0, generation); + self.queue.push(PollCmd::Register(handle, user_data)); + + let file_descriptor = entity.as_file_descriptor(); + self.poller.register(file_descriptor, user_data, read, write) + .map_err(|e| rt_error!("failed to register for polling, because: {}", e))?; + + return Ok(PollTicket(file_descriptor, user_data.0)); + } + + pub(crate) fn unregister(&self, ticket: PollTicket) -> Result<(), RtError> { + let file_descriptor = ticket.0; + let user_data = UserData(ticket.1); + self.queue.push(PollCmd::Unregister(file_descriptor, user_data)); + self.poller.unregister(file_descriptor) + .map_err(|e| rt_error!("failed to unregister polling, because: {}", e))?; + + return Ok(()); + } +} + +#[inline] +fn user_data_for_component(component_id: u32, generation: u32) -> UserData { + return UserData((generation as u64) << 32 | (component_id as u64)); +} \ No newline at end of file diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 5c6ec8c980e4c03212e62d4123cdac96b437f0a2..9da208a48a371a354cd6ec2d7aece2ff4a5fdf21 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -1,11 +1,14 @@ use std::sync::{Arc, Mutex, Condvar}; use std::sync::atomic::{AtomicU32, AtomicBool, Ordering}; +use std::thread; use std::collections::VecDeque; use crate::protocol::*; +use crate::runtime2::poll::{PollingThread, PollingThreadHandle}; +use crate::runtime2::RtError; use super::communication::Message; -use super::component::{wake_up_if_sleeping, CompPDL, CompCtx}; +use super::component::{Component, wake_up_if_sleeping, CompPDL, CompCtx}; use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, 
QueueDynProducer}; use super::scheduler::*; @@ -25,7 +28,7 @@ impl CompKey { } } -/// Generational ID of a component +/// Generational ID of a component. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct CompId(pub u32); @@ -53,11 +56,12 @@ impl CompReserved { } } -/// Private fields of a component, may only be modified by a single thread at -/// a time. +/// Representation of a runtime component. Contains the bookkeeping variables +/// for the schedulers, the publicly accessible fields, and the private fields +/// that should only be accessed by the thread running the component's routine. pub(crate) struct RuntimeComp { pub public: CompPublic, - pub code: CompPDL, + pub component: Box, pub ctx: CompCtx, pub inbox: QueueDynMpsc, pub exiting: bool, @@ -79,7 +83,7 @@ pub(crate) struct CompPublic { /// code to make sure this actually happens. pub(crate) struct CompHandle { target: *const CompPublic, - id: CompId, // TODO: @Remove after debugging + id: CompId, #[cfg(debug_assertions)] decremented: bool, } @@ -94,14 +98,17 @@ impl CompHandle { return handle; } - pub(crate) fn send_message(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) { - sched_ctx.log(&format!("Sending message to [c:{:03}, wakeup:{}]: {:?}", self.id.0, try_wake_up, message)); + pub(crate) fn send_message(&self, runtime: &RuntimeInner, message: Message, try_wake_up: bool) { self.inbox.push(message); if try_wake_up { - wake_up_if_sleeping(sched_ctx, self.id, self); + wake_up_if_sleeping(runtime, self.id, self); } } + pub(crate) fn id(&self) -> CompId { + return self.id; + } + fn increment_users(&self) { let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel); debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed @@ -154,13 +161,16 @@ impl Drop for CompHandle { pub struct Runtime { pub(crate) inner: Arc, - threads: Vec>, + scheduler_threads: Vec>, + polling_handle: PollingThreadHandle, } impl 
Runtime { // TODO: debug_logging should be removed at some point - pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Runtime { - assert!(num_threads > 0, "need a thread to perform work"); + pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Result { + if num_threads == 0 { + return Err(rt_error!("need at least one thread to create the runtime")); + } let runtime_inner = Arc::new(RuntimeInner { protocol: protocol_description, components: ComponentStore::new(128), @@ -168,21 +178,30 @@ impl Runtime { work_condvar: Condvar::new(), active_elements: AtomicU32::new(1), }); - let mut runtime = Runtime { - inner: runtime_inner, - threads: Vec::with_capacity(num_threads as usize), - }; + let (polling_handle, polling_clients) = rt_error_try!( + PollingThread::new(runtime_inner.clone(), debug_logging), + "failed to build polling thread" + ); + + let mut scheduler_threads = Vec::with_capacity(num_threads as usize); for thread_index in 0..num_threads { - let mut scheduler = Scheduler::new(runtime.inner.clone(), thread_index, debug_logging); - let thread_handle = std::thread::spawn(move || { + let mut scheduler = Scheduler::new( + runtime_inner.clone(), polling_clients.client(), + thread_index, debug_logging + ); + let thread_handle = thread::spawn(move || { scheduler.run(); }); - runtime.threads.push(thread_handle); + scheduler_threads.push(thread_handle); } - return runtime; + return Ok(Runtime{ + inner: runtime_inner, + scheduler_threads, + polling_handle, + }); } pub fn create_component(&self, module_name: &[u8], routine_name: &[u8]) -> Result<(), ComponentCreationError> { @@ -193,7 +212,8 @@ impl Runtime { )?; let reserved = self.inner.start_create_pdl_component(); let ctx = CompCtx::new(&reserved); - let (key, _) = self.inner.finish_create_pdl_component(reserved, CompPDL::new(prompt, 0), ctx, false); + let component = Box::new(CompPDL::new(prompt, 0)); + let (key, _) = 
self.inner.finish_create_pdl_component(reserved, component, ctx, false); self.inner.enqueue_work(key); return Ok(()) @@ -203,9 +223,11 @@ impl Runtime { impl Drop for Runtime { fn drop(&mut self) { self.inner.decrement_active_components(); - for handle in self.threads.drain(..) { + for handle in self.scheduler_threads.drain(..) { handle.join().expect("join scheduler thread"); } + + self.polling_handle.shutdown().expect("shutdown polling thread"); } } @@ -214,7 +236,7 @@ impl Drop for Runtime { pub(crate) struct RuntimeInner { pub protocol: ProtocolDescription, components: ComponentStore, - work_queue: Mutex>, + work_queue: Mutex>, // TODO: should be MPMC queue work_condvar: Condvar, active_elements: AtomicU32, // active components and APIs (i.e. component creators) } @@ -248,7 +270,7 @@ impl RuntimeInner { pub(crate) fn finish_create_pdl_component( &self, reserved: CompReserved, - component: CompPDL, mut context: CompCtx, initially_sleeping: bool, + component: Box, mut context: CompCtx, initially_sleeping: bool, ) -> (CompKey, &mut RuntimeComp) { let inbox_queue = QueueDynMpsc::new(16); let inbox_producer = inbox_queue.producer(); @@ -261,7 +283,7 @@ impl RuntimeInner { num_handles: AtomicU32::new(1), // the component itself acts like a handle inbox: inbox_producer, }, - code: component, + component, ctx: context, inbox: inbox_queue, exiting: false, @@ -284,12 +306,15 @@ impl RuntimeInner { return CompHandle::new(id, &component.public); } + /// Will remove a component and its memory from the runtime. May only be + /// called if the necessary conditions for destruction have been met. 
pub(crate) fn destroy_component(&self, key: CompKey) { dbg_code!({ let component = self.get_component(key); debug_assert!(component.exiting); debug_assert_eq!(component.public.num_handles.load(Ordering::Acquire), 0); }); + self.decrement_active_components(); self.components.destroy(key.0); } diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index e7359488f13d940ccc259ceabb1cb744c0b6de23..d3159c82aec89f2d63ea6b11a71917be3a285417 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,5 +1,6 @@ use std::sync::Arc; use std::sync::atomic::Ordering; +use crate::runtime2::poll::PollingClient; use super::component::*; use super::runtime::*; @@ -7,21 +8,24 @@ use super::runtime::*; /// Data associated with a scheduler thread pub(crate) struct Scheduler { runtime: Arc, + polling: PollingClient, scheduler_id: u32, debug_logging: bool, } pub(crate) struct SchedulerCtx<'a> { pub runtime: &'a RuntimeInner, + pub polling: &'a PollingClient, pub id: u32, pub comp: u32, pub logging_enabled: bool, } impl<'a> SchedulerCtx<'a> { - pub fn new(runtime: &'a RuntimeInner, id: u32, logging_enabled: bool) -> Self { + pub fn new(runtime: &'a RuntimeInner, polling: &'a PollingClient, id: u32, logging_enabled: bool) -> Self { return Self { runtime, + polling, id, comp: 0, logging_enabled, @@ -38,12 +42,12 @@ impl<'a> SchedulerCtx<'a> { impl Scheduler { // public interface to thread - pub fn new(runtime: Arc, scheduler_id: u32, debug_logging: bool) -> Self { - return Scheduler{ runtime, scheduler_id, debug_logging } + pub fn new(runtime: Arc, polling: PollingClient, scheduler_id: u32, debug_logging: bool) -> Self { + return Scheduler{ runtime, polling, scheduler_id, debug_logging } } pub fn run(&mut self) { - let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, self.scheduler_id, self.debug_logging); + let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, &self.polling, self.scheduler_id, self.debug_logging); 'run_loop: loop { // Wait until we 
have something to do (or need to quit) @@ -61,9 +65,9 @@ impl Scheduler { let mut new_scheduling = CompScheduling::Immediate; while let CompScheduling::Immediate = new_scheduling { while let Some(message) = component.inbox.pop() { - component.code.handle_message(&mut scheduler_ctx, &mut component.ctx, message); + component.component.handle_message(&mut scheduler_ctx, &mut component.ctx, message); } - new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error"); + new_scheduling = component.component.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error"); } // Handle the new scheduling @@ -71,7 +75,10 @@ impl Scheduler { CompScheduling::Immediate => unreachable!(), CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); }, CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); }, - CompScheduling::Exit => { self.mark_component_as_exiting(&scheduler_ctx, component); } + CompScheduling::Exit => { + component.component.on_shutdown(&scheduler_ctx); + self.mark_component_as_exiting(&scheduler_ctx, component); + } } } } diff --git a/src/runtime2/stdlib/internet.rs b/src/runtime2/stdlib/internet.rs new file mode 100644 index 0000000000000000000000000000000000000000..a3d150a9efea7e821564ec0521e37461918cb2dc --- /dev/null +++ b/src/runtime2/stdlib/internet.rs @@ -0,0 +1,320 @@ +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::mem::size_of; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; + +use libc::{ + c_int, + sockaddr_in, sockaddr_in6, in_addr, in6_addr, + socket, bind, listen, accept, connect, close, +}; +use mio::{event, Interest, Registry, Token}; + +use crate::runtime2::poll::{AsFileDescriptor, FileDescriptor}; + +#[derive(Debug)] +pub enum SocketError { + Opening, + Modifying, + Binding, + Listening, + Connecting, + Accepted, + Accepting, +} + +enum SocketState { + Opened, + Listening, +} + +/// TCP connection +pub struct SocketTcpClient { + 
socket_handle: libc::c_int, + is_blocking: bool, +} + +impl SocketTcpClient { + pub fn new(ip: IpAddr, port: u16) -> Result { + const BLOCKING: bool = false; + + let socket_handle = create_and_connect_socket( + libc::SOCK_STREAM, libc::IPPROTO_TCP, ip, port + )?; + if !set_socket_blocking(socket_handle, BLOCKING) { + unsafe{ libc::close(socket_handle); } + return Err(SocketError::Modifying); + } + + return Ok(SocketTcpClient{ + socket_handle, + is_blocking: BLOCKING, + }) + } + + pub fn send(&self, message: &[u8]) -> Result { + let result = unsafe{ + let message_pointer = message.as_ptr().cast(); + libc::send(self.socket_handle, message_pointer, message.len() as libc::size_t, 0) + }; + if result < 0 { + return Err(IoError::last_os_error()); + } + + return Ok(result as usize); + } + + /// Receives data from the TCP socket. Returns the number of bytes received. + /// More bytes may be present even though `used < buffer.len()`. + pub fn receive(&self, buffer: &mut [u8]) -> Result { + let result = unsafe { + let message_pointer = buffer.as_mut_ptr().cast(); + libc::recv(self.socket_handle, message_pointer, buffer.len(), 0) + }; + if result < 0 { + return Err(IoError::last_os_error()); + } + + return Ok(result as usize); + } +} + +impl Drop for SocketTcpClient { + fn drop(&mut self) { + debug_assert!(self.socket_handle >= 0); + unsafe{ close(self.socket_handle) }; + } +} + +impl AsFileDescriptor for SocketTcpClient { + fn as_file_descriptor(&self) -> FileDescriptor { + return self.socket_handle; + } +} + +/// Raw socket receiver.
Essentially a listener that accepts a single connection +struct SocketRawRx { + listen_handle: c_int, + accepted_handle: c_int, +} + +impl SocketRawRx { + pub fn new(ip: Option, port: u16) -> Result { + let ip = ip.unwrap_or(Ipv4Addr::UNSPECIFIED); // unspecified is the same as INADDR_ANY + let address = unsafe{ in_addr{ + s_addr: std::mem::transmute(ip.octets()), + }}; + let socket_address = sockaddr_in{ + sin_family: libc::AF_INET as libc::sa_family_t, + sin_port: htons(port), + sin_addr: address, + sin_zero: [0; 8], + }; + + unsafe { + let socket_handle = create_and_bind_socket(libc::SOCK_RAW, 0, IpAddr::V4(ip), port)?; + + let result = listen(socket_handle, 3); + if result < 0 { return Err(SocketError::Listening); } + + return Ok(SocketRawRx{ + listen_handle: socket_handle, + accepted_handle: -1, + }); + } + } + + // pub fn try_accept(&mut self, timeout_ms: u32) -> Result<(), SocketError> { + // if self.accepted_handle >= 0 { + // // Already accepted a connection + // return Err(SocketError::Accepted); + // } + // + // let mut socket_address = sockaddr_in{ + // sin_family: 0, + // sin_port: 0, + // sin_addr: in_addr{ s_addr: 0 }, + // sin_zero: [0; 8] + // }; + // let mut size = size_of::() as u32; + // unsafe { + // let result = accept(self.listen_handle, &mut socket_address as *mut _, &mut size as *mut _); + // if result < 0 { + // return Err(SocketError::Accepting); + // } + // } + // + // return Ok(()); + // } +} + +impl Drop for SocketRawRx { + fn drop(&mut self) { + if self.accepted_handle >= 0 { + unsafe { + close(self.accepted_handle); + } + } + + if self.listen_handle >= 0 { + unsafe { + close(self.listen_handle); + } + } + } +} + +// The following is essentially stolen from `mio`'s io_source.rs file. 
+#[cfg(unix)] +trait AsRawFileDescriptor { + fn as_raw_file_descriptor(&self) -> c_int; +} + +impl AsRawFileDescriptor for SocketTcpClient { + fn as_raw_file_descriptor(&self) -> c_int { + return self.socket_handle; + } +} + +/// Performs the `socket` and `bind` calls. +fn create_and_bind_socket(socket_type: libc::c_int, protocol: libc::c_int, ip: IpAddr, port: u16) -> Result { + let family = socket_family_from_ip(ip); + + unsafe { + let socket_handle = socket(family, socket_type, protocol); + if socket_handle < 0 { + return Err(SocketError::Opening); + } + + let result = match ip { + IpAddr::V4(ip) => { + let (socket_address, address_size) = create_sockaddr_in_v4(ip, port); + let socket_pointer = &socket_address as *const sockaddr_in; + bind(socket_handle, socket_pointer.cast(), address_size) + }, + IpAddr::V6(ip) => { + let (socket_address, address_size) = create_sockaddr_in_v6(ip, port); + let socket_pointer= &socket_address as *const sockaddr_in6; + bind(socket_handle, socket_pointer.cast(), address_size) + } + }; + if result < 0 { + close(socket_handle); + return Err(SocketError::Binding); + } + + return Ok(socket_handle); + } +} + +/// Performs the `socket` and `connect` calls +fn create_and_connect_socket(socket_type: libc::c_int, protocol: libc::c_int, ip: IpAddr, port: u16) -> Result { + let family = socket_family_from_ip(ip); + unsafe { + let socket_handle = socket(family, socket_type, protocol); + if socket_handle < 0 { + return Err(SocketError::Opening); + } + + let result = match ip { + IpAddr::V4(ip) => { + let (socket_address, address_size) = create_sockaddr_in_v4(ip, port); + let socket_pointer = &socket_address as *const sockaddr_in; + connect(socket_handle, socket_pointer.cast(), address_size) + }, + IpAddr::V6(ip) => { + let (socket_address, address_size) = create_sockaddr_in_v6(ip, port); + let socket_pointer= &socket_address as *const sockaddr_in6; + connect(socket_handle, socket_pointer.cast(), address_size) + } + }; + if result < 0 { + 
close(socket_handle); + return Err(SocketError::Connecting); + } + + return Ok(socket_handle); + } +} + +#[inline] +fn create_sockaddr_in_v4(ip: Ipv4Addr, port: u16) -> (sockaddr_in, libc::socklen_t) { + let address = unsafe{ + in_addr{ + s_addr: std::mem::transmute(ip.octets()) + } + }; + + let socket_address = sockaddr_in{ + sin_family: libc::AF_INET as libc::sa_family_t, + sin_port: htons(port), + sin_addr: address, + sin_zero: [0; 8] + }; + let address_size = size_of::(); + + return (socket_address, address_size as _); +} + +#[inline] +fn create_sockaddr_in_v6(ip: Ipv6Addr, port: u16) -> (sockaddr_in6, libc::socklen_t) { + // flow label is advised to be, according to RFC6437 a (somewhat + // secure) random number taken from a uniform distribution + let flow_info = rand::random(); + + let address = unsafe{ + in6_addr{ + s6_addr: ip.octets() + } + }; + + let socket_address = sockaddr_in6{ + sin6_family: libc::AF_INET6 as libc::sa_family_t, + sin6_port: htons(port), + sin6_flowinfo: flow_info, + sin6_addr: address, + sin6_scope_id: 0, // incorrect in case of loopback address + }; + let address_size = size_of::(); + + return (socket_address, address_size as _); +} + +#[inline] +fn set_socket_blocking(handle: libc::c_int, blocking: bool) -> bool { + if handle < 0 { + return false; + } + + unsafe{ + let mut flags = libc::fcntl(handle, libc::F_GETFL, 0); + if flags < 0 { + return false; + } + + if blocking { + flags &= !libc::O_NONBLOCK; + } else { + flags |= libc::O_NONBLOCK; + } + + let result = libc::fcntl(handle, libc::F_SETFL, flags); + if result < 0 { + return false; + } + } + + return true; +} + +#[inline] +fn socket_family_from_ip(ip: IpAddr) -> libc::c_int { + return match ip { + IpAddr::V4(_) => libc::AF_INET, + IpAddr::V6(_) => libc::AF_INET6, + }; +} + +#[inline] +fn htons(port: u16) -> u16 { + return port.to_be(); +} \ No newline at end of file diff --git a/src/runtime2/stdlib/mod.rs b/src/runtime2/stdlib/mod.rs new file mode 100644 index 
0000000000000000000000000000000000000000..e7dbc18c8a152be6852af5f9b3f5c5bf8741dc53 --- /dev/null +++ b/src/runtime2/stdlib/mod.rs @@ -0,0 +1 @@ +#[cfg(feature="internet")] pub(crate) mod internet; \ No newline at end of file diff --git a/src/runtime2/store/queue_mpsc.rs b/src/runtime2/store/queue_mpsc.rs index cbf75e8379809ec83a16aad961922690a74f3d29..308ac0a09d12b3025fd57230e269e60a770a9e18 100644 --- a/src/runtime2/store/queue_mpsc.rs +++ b/src/runtime2/store/queue_mpsc.rs @@ -5,7 +5,9 @@ use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard}; /// Multiple-producer single-consumer queue. Generally used in the publicly /// accessible fields of a component. The holder of this struct should be the -/// consumer. To retrieve access to the producer-side: call `producer()`. +/// consumer. To retrieve access to the producer-side: call `producer()`. In +/// case the queue is moved before one can call `producer()`, call +/// `producer_factory()`. This incurs a bit more overhead. /// /// This is a queue that will resize (indefinitely) if it becomes full, and will /// not shrink. So probably a temporary thing. @@ -75,7 +77,12 @@ impl QueueDynMpsc { #[inline] pub fn producer(&self) -> QueueDynProducer { - return QueueDynProducer::new(self); + return QueueDynProducer::new(self.inner.as_ref()); + } + + #[inline] + pub fn producer_factory(&self) -> QueueDynProducerFactory { + return QueueDynProducerFactory::new(self.inner.as_ref()); } /// Return `true` if a subsequent call to `pop` will return a value. Note @@ -144,14 +151,9 @@ pub struct QueueDynProducer { } impl QueueDynProducer { - fn new(consumer: &QueueDynMpsc) -> Self { - dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel)); - unsafe { - // If you only knew the power of the dark side! Obi-Wan never told - // you what happened to your father! 
- let queue: *const _ = std::mem::transmute(consumer.inner.as_ref()); - return Self{ queue }; - } + fn new(queue: &Shared) -> Self { + dbg_code!(queue.dbg.fetch_add(1, Ordering::AcqRel)); + return Self{ queue: queue as *const _ }; } pub fn push(&self, value: T) { @@ -268,9 +270,7 @@ impl Drop for QueueDynProducer { // producer end is `Send`, because in debug mode we make sure that there are no // more producers when the queue is destroyed. But is not sync, because that -// would circumvent our atomic counter shenanigans. Although, now that I think -// about it, we're rather likely to just drop a single "producer" into the -// public part of a component. +// would circumvent our atomic counter shenanigans. unsafe impl Send for QueueDynProducer{} #[inline] @@ -278,6 +278,30 @@ fn assert_correct_capacity(capacity: usize) { assert!(capacity.is_power_of_two() && capacity < (u32::MAX as usize) / 2); } +pub struct QueueDynProducerFactory { + queue: *const Shared +} + +impl QueueDynProducerFactory { + fn new(queue: &Shared) -> Self { + dbg_code!(queue.dbg.fetch_add(1, Ordering::AcqRel)); + return Self{ queue: queue as *const _ }; + } + + pub fn producer(&self) -> QueueDynProducer { + return QueueDynProducer::new(unsafe{ &*self.queue }); + } +} + +impl Drop for QueueDynProducerFactory { + fn drop(&mut self) { + dbg_code!({ + let queue = unsafe{ &*self.queue }; + queue.dbg.fetch_sub(1, Ordering::AcqRel); + }); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index a316a1561c391dd2207cc46779f1acfeb2834c14..fb47364e1adef4283a1a5f4961be67cd7bead285 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -9,7 +9,8 @@ fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: V ).expect("create prompt"); let reserved = rt.inner.start_create_pdl_component(); let ctx = CompCtx::new(&reserved); - let (key, _) = rt.inner.finish_create_pdl_component(reserved, CompPDL::new(prompt, 
0), ctx, false); + let component = Box::new(CompPDL::new(prompt, 0)); + let (key, _) = rt.inner.finish_create_pdl_component(reserved, component, ctx, false); rt.inner.enqueue_work(key); } @@ -23,7 +24,7 @@ fn test_component_creation() { auto b = 5 + a; } ").expect("compilation"); - let rt = Runtime::new(1, true, pd); + let rt = Runtime::new(1, true, pd).unwrap(); for _i in 0..20 { create_component(&rt, "", "nothing_at_all", no_args()); @@ -80,7 +81,7 @@ fn test_component_communication() { new sender(o_mrmm, 5, 5); new receiver(i_mrmm, 5, 5); }").expect("compilation"); - let rt = Runtime::new(3, true, pd); + let rt = Runtime::new(3, true, pd).unwrap(); create_component(&rt, "", "constructor", no_args()); } @@ -129,7 +130,7 @@ fn test_intermediate_messenger() { new constructor_template(); } ").expect("compilation"); - let rt = Runtime::new(3, true, pd); + let rt = Runtime::new(3, true, pd).unwrap(); create_component(&rt, "", "constructor", no_args()); } @@ -171,7 +172,7 @@ fn test_simple_select() { } composite constructor() { - auto num_sends = 15; + auto num_sends = 1; channel tx_a -> rx_a; channel tx_b -> rx_b; new sender(tx_a, num_sends); @@ -179,7 +180,7 @@ fn test_simple_select() { new sender(tx_b, num_sends); } ").expect("compilation"); - let rt = Runtime::new(3, false, pd); + let rt = Runtime::new(3, true, pd).unwrap(); create_component(&rt, "", "constructor", no_args()); } @@ -201,7 +202,7 @@ fn test_unguarded_select() { } } ").expect("compilation"); - let rt = Runtime::new(3, false, pd); + let rt = Runtime::new(3, false, pd).unwrap(); create_component(&rt, "", "constructor_outside_select", no_args()); create_component(&rt, "", "constructor_inside_select", no_args()); } @@ -217,6 +218,162 @@ fn test_empty_select() { } } ").expect("compilation"); - let rt = Runtime::new(3, false, pd); + let rt = Runtime::new(3, false, pd).unwrap(); create_component(&rt, "", "constructor", no_args()); +} + +#[test] +fn test_random_u32_temporary_thingo() { + let pd = 
ProtocolDescription::parse(b" + import std.random::random_u32; + + primitive random_taker(in generator, u32 num_values) { + auto i = 0; + while (i < num_values) { + sync { + auto a = get(generator); + } + i += 1; + } + } + + composite constructor() { + channel tx -> rx; + auto num_values = 25; + new random_u32(tx, 1, 100, num_values); + new random_taker(rx, num_values); + } + ").expect("compilation"); + let rt = Runtime::new(1, true, pd).unwrap(); + create_component(&rt, "", "constructor", no_args()); +} + +#[test] +fn test_tcp_socket_http_request() { + let _pd = ProtocolDescription::parse(b" + import std.internet::*; + + primitive requester(out cmd_tx, in data_rx) { + print(\"*** TCPSocket: Sending request\"); + sync { + put(cmd_tx, Cmd::Send(b\"GET / HTTP/1.1\\r\\n\\r\\n\")); + } + + print(\"*** TCPSocket: Receiving response\"); + auto buffer = {}; + auto done_receiving = false; + sync while (!done_receiving) { + put(cmd_tx, Cmd::Receive); + auto data = get(data_rx); + buffer @= data; + + // Completely crap detection of end-of-document. But here we go, we + // try to detect the trailing </html>. Proper way would be to parse + // for 'content-length' or 'content-encoding' + s32 index = 0; + s32 partial_length = cast(length(data) - 7); + while (index < partial_length) { + // No string conversion yet, so check byte buffer one byte at + // a time. + auto c1 = data[index]; + if (c1 == cast('<')) { + auto c2 = data[index + 1]; + auto c3 = data[index + 2]; + auto c4 = data[index + 3]; + auto c5 = data[index + 4]; + auto c6 = data[index + 5]; + auto c7 = data[index + 6]; + if ( // i.e. if (data[index..]
== '</html>' + c2 == cast('/') && c3 == cast('h') && c4 == cast('t') && + c5 == cast('m') && c6 == cast('l') && c7 == cast('>') + ) { + print(\"*** TCPSocket: Detected \"); + put(cmd_tx, Cmd::Finish); + done_receiving = true; + } + } + index += 1; + } + } + + print(\"*** TCPSocket: Requesting shutdown\"); + sync { + put(cmd_tx, Cmd::Shutdown); + } + } + + composite main() { + channel cmd_tx -> cmd_rx; + channel data_tx -> data_rx; + new tcp_client({142, 250, 179, 163}, 80, cmd_rx, data_tx); // port 80 of google + new requester(cmd_tx, data_rx); + } + ").expect("compilation"); + + // This test is disabled because it performs an HTTP request to google. + // let rt = Runtime::new(1, true, pd).unwrap(); + // create_component(&rt, "", "main", no_args()); +} + +#[test] +fn test_sending_receiving_union() { + let pd = ProtocolDescription::parse(b" + union Cmd { + Set(u8[]), + Get, + Shutdown, + } + + primitive database(in rx, out tx) { + auto stored = {}; + auto done = false; + while (!done) { + sync { + auto command = get(rx); + if (let Cmd::Set(bytes) = command) { + print(\"database: storing value\"); + stored = bytes; + } else if (let Cmd::Get = command) { + print(\"database: returning value\"); + put(tx, stored); + } else if (let Cmd::Shutdown = command) { + print(\"database: shutting down\"); + done = true; + } else while (true) print(\"impossible\"); // no other case possible + } + } + } + + primitive client(out tx, in rx, u32 num_rounds) { + auto round = 0; + while (round < num_rounds) { + auto set_value = b\"hello there\"; + print(\"client: putting a value\"); + sync put(tx, Cmd::Set(set_value)); + + auto retrieved = {}; + print(\"client: retrieving what was sent\"); + sync { + put(tx, Cmd::Get); + retrieved = get(rx); + } + + if (set_value != retrieved) while (true) print(\"wrong!\"); + + round += 1; + } + + sync put(tx, Cmd::Shutdown); + } + + composite main() { + auto num_rounds = 5; + channel cmd_tx -> cmd_rx; + channel data_tx -> data_rx; + new database(cmd_rx,
data_tx); + new client(cmd_tx, data_rx, num_rounds); + } + ").expect("compilation"); + let rt = Runtime::new(1, false, pd).unwrap(); + create_component(&rt, "", "main", no_args()); } \ No newline at end of file diff --git a/std/std.global.pdl b/std/std.global.pdl new file mode 100644 index 0000000000000000000000000000000000000000..6691b79689ad5b304289b4c9feff3de3011bf515 --- /dev/null +++ b/std/std.global.pdl @@ -0,0 +1,9 @@ +#module std.global + +func get(in input) -> T { #builtin } +func put(out output, T value) -> #type_void { #builtin } +func fires(#type_portlike port) -> bool { #builtin } +func create(#type_integerlike len) -> T[] { #builtin } +func length(#type_arraylike array) -> u32 { #builtin } +func assert(bool condition) -> #type_void { #builtin } +func print(string message) -> #type_void { #builtin } \ No newline at end of file diff --git a/std/std.internet.pdl b/std/std.internet.pdl new file mode 100644 index 0000000000000000000000000000000000000000..3e016622c4f8586d89ec371eb4e778dcc28f719e --- /dev/null +++ b/std/std.internet.pdl @@ -0,0 +1,12 @@ +#module std.internet + +union Cmd { + Send(u8[]), + Receive, + Finish, + Shutdown, +} + +primitive tcp_client(u8[] ip, u16 port, in cmds, out rx) { + #builtin +} diff --git a/std/std.random.pdl b/std/std.random.pdl new file mode 100644 index 0000000000000000000000000000000000000000..840e195232766dd8441b9ecb3ae89210e339739d --- /dev/null +++ b/std/std.random.pdl @@ -0,0 +1,3 @@ +#module std.random + +primitive random_u32(out generator, u32 min, u32 max, u32 num_sends) { #builtin }