diff --git a/Cargo.toml b/Cargo.toml index 7e7090b3c3a0324c9fa86cc906390a012002fe1e..9c3801265cc09b5ee9216ea9d148e4ce62661815 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ lazy_static = "1.4.0" libc = { version = "^0.2", optional = true } os_socketaddr = { version = "0.1.0", optional = true } -[dev-dependencies] +# randomness rand = "0.8.4" rand_pcg = "0.3.1" diff --git a/docs/runtime/sync.md b/docs/runtime/sync.md new file mode 100644 index 0000000000000000000000000000000000000000..ac3c9fcbe4b40abdf95617b092af5a2f8a1ccd1d --- /dev/null +++ b/docs/runtime/sync.md @@ -0,0 +1,3 @@ +# Synchronous Communication + +## \ No newline at end of file diff --git a/src/common.rs b/src/common.rs deleted file mode 100644 index 08587130bf657da1b637756355705f7d25c40ad5..0000000000000000000000000000000000000000 --- a/src/common.rs +++ /dev/null @@ -1,245 +0,0 @@ -///////////////////// PRELUDE ///////////////////// -pub(crate) use crate::protocol::{ComponentState, ProtocolDescription}; -pub(crate) use crate::runtime::{error::AddComponentError, NonsyncProtoContext, SyncProtoContext}; -pub(crate) use core::{ - cmp::Ordering, - fmt::{Debug, Formatter}, - hash::Hash, - ops::Range, - time::Duration, -}; -pub(crate) use maplit::hashmap; -pub(crate) use mio::{ - net::{TcpListener, TcpStream}, - Events, Interest, Poll, Token, -}; -pub(crate) use std::{ - collections::{BTreeMap, HashMap, HashSet}, - io::{Read, Write}, - net::SocketAddr, - sync::Arc, - time::Instant, -}; -pub(crate) use Polarity::*; - -pub(crate) trait IdParts { - fn id_parts(self) -> (ConnectorId, U32Suffix); -} - -/// Used by various distributed algorithms to identify connectors. -pub type ConnectorId = u32; - -/// Used in conjunction with the `ConnectorId` type to create identifiers for ports and components -pub type U32Suffix = u32; -#[derive(Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd)] - -/// Generalization of a port/component identifier -#[derive(serde::Serialize, serde::Deserialize)] -#[repr(C)] -pub struct Id { - pub(crate) connector_id: ConnectorId, - pub(crate) u32_suffix: U32Suffix, -} -#[derive(Clone, Debug, Default)] -pub struct U32Stream { - next: u32, -} - -/// Identifier of a component in a session -#[derive(Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize)] -pub struct ComponentId(Id); // PUB because it can be returned by errors - -/// Identifier of a port in a session -#[derive(Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize)] -#[repr(transparent)] -pub struct PortId(pub(crate) Id); - -impl PortId { - // TODO: Remove concept of ComponentId and PortId in this file - #[deprecated] - pub fn new(port: u32) -> Self { - return PortId(Id{ - connector_id: u32::MAX, - u32_suffix: port, - }); - } -} - -/// A safely aliasable heap-allocated payload of message bytes -#[derive(Default, Eq, PartialEq, Clone, Ord, PartialOrd)] -pub struct Payload(pub Arc>); -#[derive(Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd)] - -/// "Orientation" of a port, determining whether they can send or receive messages with `put` and `get` respectively. -#[repr(C)] -#[derive(serde::Serialize, serde::Deserialize)] -pub enum Polarity { - Putter, // output port (from the perspective of the component) - Getter, // input port (from the perspective of the component) -} -#[derive(Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd)] - -/// "Orientation" of a transport-layer network endpoint, dictating how it's connection procedure should -/// be conducted. Corresponds with connect() / accept() familiar to TCP socket programming. -#[repr(C)] -pub enum EndpointPolarity { - Active, // calls connect() - Passive, // calls bind() listen() accept() -} - -#[derive(Debug, Clone)] -pub(crate) enum NonsyncBlocker { - Inconsistent, - ComponentExit, - SyncBlockStart, -} -#[derive(Debug, Clone)] -pub(crate) enum SyncBlocker { - Inconsistent, - SyncBlockEnd, - CouldntReadMsg(PortId), - CouldntCheckFiring(PortId), - PutMsg(PortId, Payload), -} -pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]); -pub(crate) struct DebuggableIter + Clone, T: Debug>(pub(crate) I); -///////////////////// IMPL ///////////////////// -impl IdParts for Id { - fn id_parts(self) -> (ConnectorId, U32Suffix) { - (self.connector_id, self.u32_suffix) - } -} -impl IdParts for PortId { - fn id_parts(self) -> (ConnectorId, U32Suffix) { - self.0.id_parts() - } -} -impl IdParts for ComponentId { - fn id_parts(self) -> (ConnectorId, U32Suffix) { - self.0.id_parts() - } -} -impl U32Stream { - pub(crate) fn next(&mut self) -> u32 { - if self.next == u32::MAX { - panic!("NO NEXT!") - } - self.next += 1; - self.next - 1 - } - pub(crate) fn n_skipped(mut self, n: u32) -> Self { - self.next = self.next.saturating_add(n); - self - } -} -impl From for PortId { - fn from(id: Id) -> PortId { - Self(id) - } -} -impl From for ComponentId { - fn from(id: Id) -> Self { - Self(id) - } -} -impl From<&[u8]> for Payload { - fn from(s: &[u8]) -> Payload { - Payload(Arc::new(s.to_vec())) - } -} -impl Payload { - /// Create a new payload of uninitialized bytes with the given length. - pub fn new(len: usize) -> Payload { - let mut v = Vec::with_capacity(len); - unsafe { - v.set_len(len); - } - Payload(Arc::new(v)) - } - /// Returns the length of the payload's byte sequence - pub fn len(&self) -> usize { - self.0.len() - } - /// Allows shared reading of the payload's contents - pub fn as_slice(&self) -> &[u8] { - &self.0 - } - - /// Allows mutation of the payload's contents. - /// Results in a deep copy in the event this payload is aliased. - pub fn as_mut_vec(&mut self) -> &mut Vec { - Arc::make_mut(&mut self.0) - } - - /// Modifies this payload, concatenating the given immutable payload's contents. - /// Results in a deep copy in the event this payload is aliased. - pub fn concatenate_with(&mut self, other: &Self) { - let bytes = other.as_slice().iter().copied(); - let me = self.as_mut_vec(); - me.extend(bytes); - } -} -impl serde::Serialize for Payload { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let inner: &Vec = &self.0; - inner.serialize(serializer) - } -} -impl<'de> serde::Deserialize<'de> for Payload { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - let inner: Vec = Vec::deserialize(deserializer)?; - Ok(Self(Arc::new(inner))) - } -} -impl From> for Payload { - fn from(s: Vec) -> Self { - Self(s.into()) - } -} -impl Debug for PortId { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - let (a, b) = self.id_parts(); - write!(f, "pid{}_{}", a, b) - } -} -impl Debug for ComponentId { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - let (a, b) = self.id_parts(); - write!(f, "cid{}_{}", a, b) - } -} -impl Debug for Payload { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "Payload[{:?}]", DenseDebugHex(self.as_slice())) - } -} -impl std::ops::Not for Polarity { - type Output = Self; - fn not(self) -> Self::Output { - use Polarity::*; - match self { - Putter => Getter, - Getter => Putter, - } - } -} -impl Debug for DenseDebugHex<'_> { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - for b in self.0 { - write!(f, "{:02X?}", b)?; - } - Ok(()) - } -} - -impl + Clone, T: Debug> Debug for DebuggableIter { - fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { - f.debug_list().entries(self.0.clone()).finish() - } -} diff --git a/src/lib.rs b/src/lib.rs index fa75b6de1590bf488509d07a6a803de321356ece..69d61ed758a89489632c27ddfd5f90426d5d4fcc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,5 +6,6 @@ mod protocol; pub mod runtime; pub mod runtime2; mod collections; +mod random; pub use protocol::{ProtocolDescription, ProtocolDescriptionBuilder, ComponentCreationError}; \ No newline at end of file diff --git a/src/protocol/eval/executor.rs b/src/protocol/eval/executor.rs index 25782476114ff1097fca333d48ad04a7afb1c099..45f0140c2ddca9f96eebf5ad99202de5d8489cb2 100644 --- a/src/protocol/eval/executor.rs +++ b/src/protocol/eval/executor.rs @@ -736,25 +736,25 @@ impl Prompt { Method::SelectStart => { let num_cases = self.store.maybe_read_ref(&cur_frame.expr_values.pop_front().unwrap()).as_uint32(); let num_ports = self.store.maybe_read_ref(&cur_frame.expr_values.pop_front().unwrap()).as_uint32(); - if !ctx.select_start(num_cases, num_ports) { - return Ok(EvalContinuation::SelectStart(num_cases, num_ports)) - } + + return Ok(EvalContinuation::SelectStart(num_cases, num_ports)); }, Method::SelectRegisterCasePort => { let case_index = self.store.maybe_read_ref(&cur_frame.expr_values.pop_front().unwrap()).as_uint32(); let port_index = self.store.maybe_read_ref(&cur_frame.expr_values.pop_front().unwrap()).as_uint32(); let port_value = self.store.maybe_read_ref(&cur_frame.expr_values.pop_front().unwrap()).as_port_id(); - if !ctx.performed_select_start() { - return Ok(EvalContinuation::SelectRegisterPort(case_index, port_index, port_value)); - } + return Ok(EvalContinuation::SelectRegisterPort(case_index, port_index, port_value)); }, Method::SelectWait => { match ctx.performed_select_wait() { Some(select_index) => { cur_frame.expr_values.push_back(Value::UInt32(select_index)); }, - None => return Ok(EvalContinuation::SelectWait), + None => { + cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr.this.upcast())); + return Ok(EvalContinuation::SelectWait) + }, } }, Method::UserComponent => { diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 8a479bec378d74d907e86d322134bde9c0e794a0..45f880fbe64dec700ef72086e324877299969e5b 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -215,8 +215,6 @@ pub trait RunContext { fn fires(&mut self, port: PortId) -> Option; // None if not yet branched fn performed_fork(&mut self) -> Option; // None if not yet forked fn created_channel(&mut self) -> Option<(Value, Value)>; // None if not yet prepared - fn performed_select_start(&mut self) -> bool; // true if performed - fn performed_select_register_port(&mut self) -> bool; // true if registered fn performed_select_wait(&mut self) -> Option; // None if not yet notified runtime of select blocker } diff --git a/src/protocol/parser/pass_rewriting.rs b/src/protocol/parser/pass_rewriting.rs index d4c57df2fb4049f5cad3aa38e938bcff0139d40d..99d2abe78fd4ca490f99cdf3503ca94e6799d2f2 100644 --- a/src/protocol/parser/pass_rewriting.rs +++ b/src/protocol/parser/pass_rewriting.rs @@ -115,7 +115,7 @@ impl Visitor for PassRewriting { ctx: &mut Ctx, containing_procedure_id: ProcedureDefinitionId, select_id: SelectStatementId, case_index: usize, select_var_id: VariableId, select_var_type_id: TypeIdReference - ) -> (IfStatementId, EndIfStatementId) { + ) -> (IfStatementId, EndIfStatementId, ScopeId) { // Retrieve statement IDs associated with case let case = &ctx.heap[select_id].cases[case_index]; let case_guard_id = case.guard; @@ -133,7 +133,7 @@ impl Visitor for PassRewriting { // Link up body statement to end-if set_ast_statement_next(ctx, case_body_id, end_if_stmt_id.upcast()); - return (if_stmt_id, end_if_stmt_id) + return (if_stmt_id, end_if_stmt_id, case_scope_id); } // Precreate the block that will end up containing all of the @@ -206,8 +206,8 @@ impl Visitor for PassRewriting { // Create the call that indicates the start of the select block { - let num_cases_expression_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, total_num_cases as u64); - let num_ports_expression_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, total_num_ports as u64); + let num_cases_expression_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, total_num_cases as u64, ctx.arch.uint32_type_id); + let num_ports_expression_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, total_num_ports as u64, ctx.arch.uint32_type_id); let arguments = vec![ num_cases_expression_id.upcast(), num_ports_expression_id.upcast() @@ -230,8 +230,8 @@ impl Visitor for PassRewriting { for case_port_index in 0..case_num_ports { // Arguments to runtime call let (port_variable_id, port_variable_type) = locals[total_port_index]; // so far this variable contains the temporary variables for the port expressions - let case_index_expr_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_index as u64); - let port_index_expr_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_port_index as u64); + let case_index_expr_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_index as u64, ctx.arch.uint32_type_id); + let port_index_expr_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_port_index as u64, ctx.arch.uint32_type_id); let port_variable_expr_id = create_ast_variable_expr(ctx, self.current_procedure_id, port_variable_id, port_variable_type); let runtime_call_arguments = vec![ case_index_expr_id.upcast(), @@ -267,18 +267,24 @@ impl Visitor for PassRewriting { // Now we transform each of the select block case's guard and code into // a chained if-else statement. + let mut relative_pos = transformed_stmts.len() as i32; if total_num_cases > 0 { - let (if_stmt_id, end_if_stmt_id) = transform_select_case_code(ctx, self.current_procedure_id, id, 0, select_variable_id, select_variable_type); + let (if_stmt_id, end_if_stmt_id, scope_id) = transform_select_case_code(ctx, self.current_procedure_id, id, 0, select_variable_id, select_variable_type); + link_existing_child_to_new_parent_scope(ctx, &mut self.scope_buffer, outer_scope_id, scope_id, relative_pos); let first_end_if_stmt = &mut ctx.heap[end_if_stmt_id]; first_end_if_stmt.next = outer_end_block_id.upcast(); let mut last_if_stmt_id = if_stmt_id; let mut last_end_if_stmt_id = end_if_stmt_id; + let mut last_parent_scope_id = outer_scope_id; + let mut last_relative_pos = transformed_stmts.len() as i32 + 1; transformed_stmts.push(last_if_stmt_id.upcast()); for case_index in 1..total_num_cases { - let (if_stmt_id, end_if_stmt_id) = transform_select_case_code(ctx, self.current_procedure_id, id, case_index, select_variable_id, select_variable_type); + let (if_stmt_id, end_if_stmt_id, scope_id) = transform_select_case_code(ctx, self.current_procedure_id, id, case_index, select_variable_id, select_variable_type); let false_case_scope_id = ctx.heap.alloc_scope(|this| Scope::new(this, ScopeAssociation::If(last_if_stmt_id, false))); + link_existing_child_to_new_parent_scope(ctx, &mut self.scope_buffer, false_case_scope_id, scope_id, 0); + link_new_child_to_existing_parent_scope(ctx, &mut self.scope_buffer, last_parent_scope_id, false_case_scope_id, last_relative_pos); set_ast_if_statement_false_body(ctx, last_if_stmt_id, last_end_if_stmt_id, IfStatementCase{ body: if_stmt_id.upcast(), scope: false_case_scope_id }); let end_if_stmt = &mut ctx.heap[end_if_stmt_id]; @@ -286,11 +292,13 @@ impl Visitor for PassRewriting { last_if_stmt_id = if_stmt_id; last_end_if_stmt_id = end_if_stmt_id; + last_parent_scope_id = false_case_scope_id; + last_relative_pos = 0; } } // Final steps: set the statements of the replacement block statement, - // and link all of those statements together + // link all of those statements together, and update the scopes. let first_stmt_id = transformed_stmts[0]; let mut last_stmt_id = transformed_stmts[0]; for stmt_id in transformed_stmts.iter().skip(1).copied() { @@ -380,8 +388,8 @@ fn create_ast_call_expr(ctx: &mut Ctx, containing_procedure_id: ProcedureDefinit return call_expression_id; } -fn create_ast_literal_integer_expr(ctx: &mut Ctx, containing_procedure_id: ProcedureDefinitionId, unsigned_value: u64) -> LiteralExpressionId { - let literal_type_index = add_new_procedure_expression_type(ctx, containing_procedure_id, TypeIdReference::DirectTypeId(ctx.arch.uint64_type_id)); +fn create_ast_literal_integer_expr(ctx: &mut Ctx, containing_procedure_id: ProcedureDefinitionId, unsigned_value: u64, type_id: TypeId) -> LiteralExpressionId { + let literal_type_index = add_new_procedure_expression_type(ctx, containing_procedure_id, TypeIdReference::DirectTypeId(type_id)); return ctx.heap.alloc_literal_expression(|this| LiteralExpression{ this, span: InputSpan::new(), @@ -399,7 +407,7 @@ fn create_ast_equality_comparison_expr( variable_id: VariableId, variable_type: TypeIdReference, value: u64 ) -> BinaryExpressionId { let var_expr_id = create_ast_variable_expr(ctx, containing_procedure_id, variable_id, variable_type); - let int_expr_id = create_ast_literal_integer_expr(ctx, containing_procedure_id, value); + let int_expr_id = create_ast_literal_integer_expr(ctx, containing_procedure_id, value, ctx.arch.uint32_type_id); let cmp_type_index = add_new_procedure_expression_type(ctx, containing_procedure_id, TypeIdReference::DirectTypeId(ctx.arch.bool_type_id)); let cmp_expr_id = ctx.heap.alloc_binary_expression(|this| BinaryExpression{ this, @@ -440,6 +448,7 @@ fn create_ast_variable_declaration_stmt( ) -> MemoryStatementId { // Create the assignment expression, assigning the initial value to the variable let variable_expr_id = create_ast_variable_expr(ctx, containing_procedure_id, variable_id, variable_type); + let void_type_index = add_new_procedure_expression_type(ctx, containing_procedure_id, TypeIdReference::DirectTypeId(ctx.arch.void_type_id)); let assignment_expr_id = ctx.heap.alloc_assignment_expression(|this| AssignmentExpression{ this, operator_span: InputSpan::new(), @@ -448,7 +457,7 @@ fn create_ast_variable_declaration_stmt( operation: AssignmentOperator::Set, right: initial_value_expr_id, parent: ExpressionParent::None, - type_index: -1, + type_index: void_type_index, }); // Create the memory statement @@ -623,9 +632,9 @@ fn link_existing_child_to_new_parent_scope(ctx: &mut Ctx, scope_buffer: &mut Sco /// Will add a child scope to a parent scope using the relative position hint. fn add_child_scope_to_parent(ctx: &mut Ctx, scope_buffer: &mut ScopedBuffer, parent_scope_id: ScopeId, child_scope_id: ScopeId, relative_pos_hint: i32) { - let child_scope = &ctx.heap[child_scope_id]; + let parent_scope = &ctx.heap[parent_scope_id]; - let existing_scope_ids = scope_buffer.start_section_initialized(&child_scope.nested); + let existing_scope_ids = scope_buffer.start_section_initialized(&parent_scope.nested); let mut insert_pos = existing_scope_ids.len(); for index in 0..existing_scope_ids.len() { let existing_scope_id = existing_scope_ids[index]; @@ -665,4 +674,4 @@ fn add_new_procedure_expression_type(ctx: &mut Ctx, procedure_id: ProcedureDefin } return type_index as i32; -} +} \ No newline at end of file diff --git a/src/protocol/parser/pass_typing.rs b/src/protocol/parser/pass_typing.rs index 984befe76a8648c5093a3bab15390769ffaadf36..e0a8f8cd4e6f8ef95c3c3668a7c97f4978515729 100644 --- a/src/protocol/parser/pass_typing.rs +++ b/src/protocol/parser/pass_typing.rs @@ -2109,11 +2109,6 @@ impl PassTyping { monomorph.argument_types.push(type_id) } - println!("DEBUG: For procedure {} with polyargs {:#?}", ctx.heap[self.procedure_id].identifier.value.as_str(), self.poly_vars); - for infer_node in self.infer_nodes.iter() { - println!("DEBUG: [{:?}] has type: {}", infer_node.expr_id, infer_node.expr_type.display_name(&ctx.heap)); - } - // Determine if we have already assigned type indices to the expressions // before (the indices that, for a monomorph, can retrieve the type of // the expression). diff --git a/src/protocol/parser/type_table.rs b/src/protocol/parser/type_table.rs index 3d3375ba0a9b3ee3ddfc9738dab4dcd001754640..916ebf1f17a37369911fd8167247ac17c60a032f 100644 --- a/src/protocol/parser/type_table.rs +++ b/src/protocol/parser/type_table.rs @@ -469,8 +469,8 @@ impl PartialEq for MonoSearchKey { while self_index < self.parts.len() && other_index < other.parts.len() { // Retrieve part and flags - let (self_bits, _) = self.parts[self_index]; - let (other_bits, _) = other.parts[other_index]; + let (_self_bits, _) = self.parts[self_index]; + let (_other_bits, _) = other.parts[other_index]; let self_in_use = true; // (self_bits & Self::KEY_IN_USE) != 0; @Deduplication let other_in_use = true; // (other_bits & Self::KEY_IN_USE) != 0; @Deduplication @@ -1523,6 +1523,8 @@ impl TypeTable { _ => unreachable!(), }; let base_type = &self.mono_types[base_type_id.0 as usize]; + let base_type_size = base_type.size; + let base_type_alignment = base_type.alignment; let type_id = TypeId(self.mono_types.len() as i64); Self::set_search_key_to_type(&mut self.mono_search_key, &self.definition_lookup, &concrete_type.parts); @@ -1530,8 +1532,8 @@ impl TypeTable { self.mono_types.push(MonoType{ type_id, concrete_type, - size: base_type.size, - alignment: base_type.alignment, + size: base_type_size, + alignment: base_type_alignment, variant: MonoTypeVariant::Builtin }); diff --git a/src/protocol/tests/utils.rs b/src/protocol/tests/utils.rs index 74ae7df0032656594ca7bc8ced624f23bc6967f9..7dff07dacf06403751b5379528373bca3cd94c92 100644 --- a/src/protocol/tests/utils.rs +++ b/src/protocol/tests/utils.rs @@ -1272,7 +1272,5 @@ impl RunContext for FakeRunContext { fn fires(&mut self, _port: PortId) -> Option { unreachable!() } fn performed_fork(&mut self) -> Option { unreachable!() } fn created_channel(&mut self) -> Option<(Value, Value)> { unreachable!() } - fn performed_select_start(&mut self) -> bool { unreachable!() } - fn performed_select_register_port(&mut self) -> bool { unreachable!() } fn performed_select_wait(&mut self) -> Option { unreachable!() } } \ No newline at end of file diff --git a/src/random.rs b/src/random.rs new file mode 100644 index 0000000000000000000000000000000000000000..46fceaca3874a7b8080cf6b3ce8dc85d38568927 --- /dev/null +++ b/src/random.rs @@ -0,0 +1,39 @@ +/** + * random.rs + * + * Simple wrapper over a random number generator. Put here so that we can have + * a feature flag for particular forms of randomness. For now we'll use pseudo- + * randomness since that will help debugging. + */ + +use rand::{RngCore, SeedableRng}; +use rand_pcg; + +pub(crate) struct Random { + rng: rand_pcg::Lcg64Xsh32, +} + +impl Random { + pub(crate) fn new() -> Self { + use std::time::SystemTime; + + let now = SystemTime::now(); + let elapsed = match now.duration_since(SystemTime::UNIX_EPOCH) { + Ok(elapsed) => elapsed, + Err(err) => err.duration(), + }; + + let elapsed = elapsed.as_nanos(); + let seed = elapsed.to_le_bytes(); + + return Self::new_seeded(seed); + } + + pub(crate) fn new_seeded(seed: [u8; 16]) -> Self { + return Self{ rng: rand_pcg::Pcg32::from_seed(seed) } + } + + pub(crate) fn get_u64(&mut self) -> u64 { + return self.rng.next_u64(); + } +} \ No newline at end of file diff --git a/src/runtime/connector.rs b/src/runtime/connector.rs index 9809dd8ced586b0b5480b1f2278753cb527cc2ee..2aabac130545b88c7fb292d41e42d6a7a12b7493 100644 --- a/src/runtime/connector.rs +++ b/src/runtime/connector.rs @@ -123,8 +123,6 @@ impl<'a> RunContext for ConnectorRunContext<'a>{ }; } - fn performed_select_start(&mut self) -> bool { unreachable!() } - fn performed_select_register_port(&mut self) -> bool { unreachable!() } fn performed_select_wait(&mut self) -> Option { unreachable!() } } diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs index 8ed701cd2cf1427747cb35dfab60dcf2b9fd4093..ecd0823e7b5b49bb3004d6273b0adc5c11692fbd 100644 --- a/src/runtime2/component/component_context.rs +++ b/src/runtime2/component/component_context.rs @@ -28,7 +28,7 @@ pub struct CompCtx { port_id_counter: u32, } -#[derive(Copy, Clone)] +#[derive(Copy, Clone, PartialEq, Eq)] pub struct LocalPortHandle(PortId); #[derive(Copy, Clone)] diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 011ce9823aeb18f74da57befad141de61da4ef0d..14fd2fb0dd6670acda349042897d5b70743c1c47 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -1,3 +1,4 @@ +use crate::random::Random; use crate::protocol::*; use crate::protocol::ast::ProcedureDefinitionId; use crate::protocol::eval::{ @@ -24,8 +25,6 @@ pub enum ExecStmt { CreatedChannel((Value, Value)), PerformedPut, PerformedGet(ValueGroup), - PerformedSelectStart, - PerformedSelectRegister, PerformedSelectWait(u32), None, } @@ -82,27 +81,11 @@ impl RunContext for ExecCtx { } } - fn performed_select_start(&mut self) -> bool { - match self.stmt.take() { - ExecStmt::None => return false, - ExecStmt::PerformedSelectStart => return true, - _ => unreachable!(), - } - } - - fn performed_select_register_port(&mut self) -> bool { - match self.stmt.take() { - ExecStmt::None => return false, - ExecStmt::PerformedSelectRegister => return true, - _ => unreachable!(), - } - } - fn performed_select_wait(&mut self) -> Option { match self.stmt.take() { ExecStmt::None => return None, ExecStmt::PerformedSelectWait(selected_case) => Some(selected_case), - _ => unreachable!(), + _v => unreachable!(), } } } @@ -112,17 +95,136 @@ pub(crate) enum Mode { NonSync, // not in sync mode Sync, // in sync mode, can interact with other components SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block - BlockedGet, - BlockedPut, + BlockedGet, // blocked because we need to receive a message on a particular port + BlockedPut, // component is blocked because the port is blocked + BlockedSelect, // waiting on message to complete the select statement StartExit, // temporary state: if encountered then we start the shutdown process BusyExit, // temporary state: waiting for Acks for all the closed ports Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 } +struct SelectCase { + involved_ports: Vec, +} + +// TODO: @Optimize, flatten cases into single array, have index-pointers to next case +struct SelectState { + cases: Vec, + next_case: u32, + num_cases: u32, + random: Random, + candidates_workspace: Vec, +} + +enum SelectDecision { + None, + Case(u32), // contains case index, should be passed along to PDL code +} + +type InboxMain = Vec>; + +impl SelectState { + fn new() -> Self { + return Self{ + cases: Vec::new(), + next_case: 0, + num_cases: 0, + random: Random::new(), + candidates_workspace: Vec::new(), + } + } + + fn handle_select_start(&mut self, num_cases: u32) { + self.cases.clear(); + self.next_case = 0; + self.num_cases = num_cases; + } + + /// Register a port as belonging to a particular case. As for correctness of + /// PDL code one cannot register the same port twice, this function might + /// return an error + fn register_select_case_port(&mut self, comp_ctx: &CompCtx, case_index: u32, _port_index: u32, port_id: PortId) -> Result<(), PortId> { + // Retrieve case and port handle + self.ensure_at_case(case_index); + let cur_case = &mut self.cases[case_index as usize]; + let port_handle = comp_ctx.get_port_handle(port_id); + debug_assert_eq!(cur_case.involved_ports.len(), _port_index as usize); + + // Make sure port wasn't added before, we disallow having the same port + // in the same select guard twice. + if cur_case.involved_ports.contains(&port_handle) { + return Err(port_id); + } + + cur_case.involved_ports.push(port_handle); + return Ok(()); + } + + /// Notification that all ports have been registered and we should now wait + /// until the appropriate messages have come in. + fn handle_select_waiting_point(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision { + if self.num_cases != self.next_case { + // This happens when there are >=1 select cases written at the end + // of the select block. + self.ensure_at_case(self.num_cases - 1); + } + + return self.has_decision(inbox, comp_ctx); + } + + fn handle_updated_inbox(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision { + return self.has_decision(inbox, comp_ctx); + } + + /// Internal helper, pushes empty cases inbetween last case and provided new + /// case index. + fn ensure_at_case(&mut self, new_case_index: u32) { + // Push an empty case for all intermediate cases that were not + // registered with a port. + debug_assert!(new_case_index >= self.next_case && new_case_index < self.num_cases); + for _ in self.next_case..new_case_index + 1 { + self.cases.push(SelectCase{ involved_ports: Vec::new() }); + } + self.next_case = new_case_index + 1; + } + + /// Checks if a decision can be reached + fn has_decision(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision { + self.candidates_workspace.clear(); + if self.cases.is_empty() { + // If there are no cases then we can immediately reach a "bogus + // decision". + return SelectDecision::Case(0); + } + + // Need to check for valid case + 'case_loop: for (case_index, case) in self.cases.iter().enumerate() { + for port_handle in case.involved_ports.iter().copied() { + let port_index = comp_ctx.get_port_index(port_handle); + if inbox[port_index].is_none() { + // Condition not satisfied + continue 'case_loop; + } + } + + // If here then the case guard is satisfied + self.candidates_workspace.push(case_index); + } + + if self.candidates_workspace.is_empty() { + return SelectDecision::None; + } else { + let candidate_index = self.random.get_u64() as usize % self.candidates_workspace.len(); + return SelectDecision::Case(self.candidates_workspace[candidate_index] as u32); + } + } +} + pub(crate) struct CompPDL { pub mode: Mode, pub mode_port: PortId, // when blocked on a port pub mode_value: ValueGroup, // when blocked on a put + select: SelectState, pub prompt: Prompt, pub control: ControlLayer, pub consensus: Consensus, @@ -132,7 +234,7 @@ pub(crate) struct CompPDL { // reserved per port. // Should be same length as the number of ports. Corresponding indices imply // message is intended for that port. - pub inbox_main: Vec>, + pub inbox_main: InboxMain, pub inbox_backup: Vec, } @@ -148,6 +250,7 @@ impl CompPDL { mode: Mode::NonSync, mode_port: PortId::new_invalid(), mode_value: ValueGroup::default(), + select: SelectState::new(), prompt: initial_state, control: ControlLayer::default(), consensus: Consensus::new(), @@ -195,7 +298,9 @@ impl CompPDL { // Depending on the mode don't do anything at all, take some special // actions, or fall through and run the PDL code. match self.mode { - Mode::NonSync | Mode::Sync => {}, + Mode::NonSync | Mode::Sync | Mode::BlockedSelect => { + // continue and run PDL code + }, Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut => { return Ok(CompScheduling::Sleep); } @@ -257,6 +362,7 @@ impl CompPDL { }, EC::Put(port_id, value) => { debug_assert_eq!(self.mode, Mode::Sync); + sched_ctx.log(&format!("Putting value {:?}", value)); let port_id = port_id_from_eval(port_id); let port_handle = comp_ctx.get_port_handle(port_id); let port_info = comp_ctx.get_port(port_handle); @@ -272,18 +378,32 @@ impl CompPDL { return Ok(CompScheduling::Immediate); } }, - EC::SelectStart(num_cases, num_ports) => { + EC::SelectStart(num_cases, _num_ports) => { debug_assert_eq!(self.mode, Mode::Sync); - todo!("finish handling select start") + self.select.handle_select_start(num_cases); + return Ok(CompScheduling::Requeue); }, EC::SelectRegisterPort(case_index, port_index, port_id) => { debug_assert_eq!(self.mode, Mode::Sync); - todo!("finish handling register port") + let port_id = port_id_from_eval(port_id); + if let Err(_err) = self.select.register_select_case_port(comp_ctx, case_index, port_index, port_id) { + todo!("handle registering a port multiple times"); + } + return Ok(CompScheduling::Immediate); }, EC::SelectWait => { debug_assert_eq!(self.mode, Mode::Sync); - self.handle_select_wait(sched_ctx, comp_ctx); - todo!("finish handling select wait") + let select_decision = self.select.handle_select_waiting_point(&self.inbox_main, comp_ctx); + if let SelectDecision::Case(case_index) = select_decision { + // Reached a conclusion, so we can continue immediately + self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index); + self.mode = Mode::Sync; + return Ok(CompScheduling::Immediate); + } else { + // No decision yet + self.mode = Mode::BlockedSelect; + return Ok(CompScheduling::Sleep); + } }, // Results that can be returned outside of sync mode EC::ComponentTerminated => { @@ -372,13 +492,6 @@ impl CompPDL { } } - /// Handles the moment where the PDL code has notified the runtime of all - /// the ports it is waiting on. - fn handle_select_wait(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { - sched_ctx.log("Component waiting for select conclusion"); - - } - fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { sched_ctx.log("Component exiting"); debug_assert_eq!(self.mode, Mode::StartExit); @@ -436,6 +549,12 @@ impl CompPDL { // We were indeed blocked self.mode = Mode::Sync; self.mode_port = PortId::new_invalid(); + } else if self.mode == Mode::BlockedSelect { + let select_decision = self.select.handle_updated_inbox(&self.inbox_main, comp_ctx); + if let SelectDecision::Case(case_index) = select_decision { + self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index); + self.mode = Mode::Sync; + } } return; diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index b2bdfc0e49dedcc60515ed2e41aa32c3c5f874cb..666ecf3c637103d1c7646b01252373c244637536 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -424,6 +424,19 @@ impl Consensus { self.highest_id = header.highest_id; for peer in comp_ctx.iter_peers() { if peer.id == header.sending_id { + continue; // do not send to sender: it has the higher ID + } + + // also: only send if we received a message in this round + let mut performed_communication = false; // TODO: Revise, temporary fix + for port in self.ports.iter() { + if port.peer_comp_id == peer.id && port.mapping.is_some() { + performed_communication = true; + break; + } + } + + if !performed_communication { continue; } diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index ed151bc3431831455d0c16588626da6b1c9778a9..823efa16f04108cbd8061cfaf249914b7bde1a60 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -89,6 +89,7 @@ fn test_simple_select() { let pd = ProtocolDescription::parse(b" func infinite_assert(T val, T expected) -> () { while (val != expected) { print(\"nope!\"); } + return (); } primitive receiver(in in_a, in in_b, u32 num_sends) { @@ -98,13 +99,13 @@ fn test_simple_select() { sync select { auto v = get(in_a) -> { print(\"got something from A\"); - infinite_assert(v, num_from_a); + auto _ = infinite_assert(v, num_from_a); num_from_a += 1; } auto v = get(in_b) -> { print(\"got something from B\"); - infinite_assert(v, num_from_b); - num_from_b +=1; + auto _ = infinite_assert(v, num_from_b); + num_from_b += 1; } } } @@ -121,7 +122,7 @@ fn test_simple_select() { } composite constructor() { - auto num_sends = 3; + auto num_sends = 15; channel tx_a -> rx_a; channel tx_b -> rx_b; new sender(tx_a, num_sends); @@ -129,6 +130,6 @@ fn test_simple_select() { new sender(tx_b, num_sends); } ").expect("compilation"); - let rt = Runtime::new(1, true, pd); + let rt = Runtime::new(3, false, pd); create_component(&rt, "", "constructor", no_args()); } \ No newline at end of file