From e771fee620aae7adaf24f461587e55f63bb0c869 2021-11-15 12:57:41 From: MH Date: 2021-11-15 12:57:41 Subject: [PATCH] Remove unnecessary layer of abstraction --- diff --git a/src/protocol/eval/executor.rs b/src/protocol/eval/executor.rs index 3b16522574521e6ec3bdb70b57ee34b22073a949..f1d6a665dc146700bdb0f38d1bd752a35c97946f 100644 --- a/src/protocol/eval/executor.rs +++ b/src/protocol/eval/executor.rs @@ -192,18 +192,22 @@ impl Frame { type EvalResult = Result; +#[derive(Debug)] pub enum EvalContinuation { + // Returned in both sync and non-sync modes Stepping, - Inconsistent, - Terminal, - SyncBlockStart, + // Returned only in sync mode + BranchInconsistent, SyncBlockEnd, - NewComponent(DefinitionId, i32, ValueGroup), - NewChannel, NewFork, BlockFires(PortId), BlockGet(PortId), - Put(PortId, Value), + Put(PortId, ValueGroup), + // Returned only in non-sync mode + ComponentTerminated, + SyncBlockStart, + NewComponent(DefinitionId, i32, ValueGroup), + NewChannel, } // Note: cloning is fine, methinks. cloning all values and the heap regions then @@ -286,7 +290,7 @@ impl Prompt { if heap[cur_frame.definition].is_function() { todo!("End of function without return, return an evaluation error"); } - return Ok(EvalContinuation::Terminal); + return Ok(EvalContinuation::ComponentTerminated); } debug_log!("Taking step in '{}'", heap[cur_frame.definition].identifier().value.as_str()); @@ -623,7 +627,8 @@ impl Prompt { cur_frame.expr_values.push_front(msg_value); cur_frame.expr_values.push_front(port_value); cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id)); - return Ok(EvalContinuation::Put(port_id, deref_msg_value)); + let value_group = ValueGroup::from_store(&self.store, &[deref_msg_value]); + return Ok(EvalContinuation::Put(port_id, value_group)); } }, Method::Fires => { @@ -692,7 +697,7 @@ impl Prompt { let value = cur_frame.expr_values.pop_front().unwrap(); let value = self.store.maybe_read_ref(&value).clone(); if !value.as_bool() { - return Ok(EvalContinuation::Inconsistent) + return Ok(EvalContinuation::BranchInconsistent) } }, Method::Print => { @@ -942,7 +947,7 @@ impl Prompt { debug_assert!(prev_stack_idx == -1); debug_assert!(self.store.stack.len() == 0); self.store.stack.push(return_value); - return Ok(EvalContinuation::Terminal); + return Ok(EvalContinuation::ComponentTerminated); } debug_assert!(prev_stack_idx >= 0); diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index fade25ccafb1caa91db843d4930698c93c605a4c..64db29608cbe90b1fc3e2601f345642dbea7b1be 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -166,7 +166,7 @@ impl ProtocolDescription { // entirety. Find some way to interface with the parameter's types. pub(crate) fn new_component_v2( &self, module_name: &[u8], identifier: &[u8], arguments: ValueGroup - ) -> Result { + ) -> Result { // Find the module in which the definition can be found let module_root = self.lookup_module_root(module_name); if module_root.is_none() { @@ -210,9 +210,7 @@ impl ProtocolDescription { // By now we're sure that all of the arguments are correct. So create // the connector. - return Ok(ComponentState{ - prompt: Prompt::new(&self.types, &self.heap, definition_id, 0, arguments), - }); + return Ok(Prompt::new(&self.types, &self.heap, definition_id, 0, arguments)); } fn lookup_module_root(&self, module_name: &[u8]) -> Option { @@ -323,8 +321,8 @@ impl ComponentState { Ok(continuation) => match continuation { // TODO: Probably want to remove this translation EC::Stepping => continue, - EC::Inconsistent => return RR::BranchInconsistent, - EC::Terminal => return RR::ComponentTerminated, + EC::BranchInconsistent => return RR::BranchInconsistent, + EC::ComponentTerminated => return RR::ComponentTerminated, EC::SyncBlockStart => return RR::ComponentAtSyncStart, EC::SyncBlockEnd => return RR::BranchAtSyncEnd, EC::NewComponent(definition_id, monomorph_idx, args) => @@ -335,8 +333,7 @@ impl ComponentState { return RR::BranchFork, EC::BlockFires(port_id) => return RR::BranchMissingPortState(port_id), EC::BlockGet(port_id) => return RR::BranchGet(port_id), - EC::Put(port_id, value) => { - let value_group = ValueGroup::from_store(&self.prompt.store, &[value]); + EC::Put(port_id, value_group) => { return RR::BranchPut(port_id, value_group); }, } @@ -362,8 +359,8 @@ impl ComponentState { }, Ok(cont) => match cont { EvalContinuation::Stepping => continue, - EvalContinuation::Inconsistent => return NonsyncBlocker::Inconsistent, - EvalContinuation::Terminal => return NonsyncBlocker::ComponentExit, + EvalContinuation::BranchInconsistent => return NonsyncBlocker::Inconsistent, + EvalContinuation::ComponentTerminated => return NonsyncBlocker::ComponentExit, EvalContinuation::SyncBlockStart => return NonsyncBlocker::SyncBlockStart, // Not possible to end sync block if never entered one EvalContinuation::SyncBlockEnd => unreachable!(), @@ -427,9 +424,9 @@ impl ComponentState { }, Ok(cont) => match cont { EvalContinuation::Stepping => continue, - EvalContinuation::Inconsistent => return SyncBlocker::Inconsistent, + EvalContinuation::BranchInconsistent => return SyncBlocker::Inconsistent, // First need to exit synchronous block before definition may end - EvalContinuation::Terminal => unreachable!(), + EvalContinuation::ComponentTerminated => unreachable!(), // No nested synchronous blocks EvalContinuation::SyncBlockStart => unreachable!(), EvalContinuation::SyncBlockEnd => return SyncBlocker::SyncBlockEnd, @@ -445,13 +442,15 @@ impl ComponentState { }, EvalContinuation::Put(port, message) => { let payload; - match message { + + // Extract bytes from `put` + match &message.values[0] { Value::Null => { return SyncBlocker::Inconsistent; }, Value::Message(heap_pos) => { // Create a copy of the payload - let values = &self.prompt.store.heap_regions[heap_pos as usize].values; + let values = &message.regions[*heap_pos as usize]; let mut bytes = Vec::with_capacity(values.len()); for value in values { bytes.push(value.as_uint8()); diff --git a/src/runtime2/branch.rs b/src/runtime2/branch.rs index 511dfc1597390034c4913bd4a68a0b566fd6df2e..af82ecc0c0e1e6c2f3604af3ca8672e599204a40 100644 --- a/src/runtime2/branch.rs +++ b/src/runtime2/branch.rs @@ -1,8 +1,7 @@ use std::collections::HashMap; use std::ops::{Index, IndexMut}; -use crate::protocol::ComponentState; -use crate::protocol::eval::{Value, ValueGroup}; +use crate::protocol::eval::{Prompt, Value, ValueGroup}; use super::port::PortIdLocal; @@ -88,7 +87,7 @@ pub(crate) struct Branch { pub id: BranchId, pub parent_id: BranchId, // Execution state - pub code_state: ComponentState, + pub code_state: Prompt, pub sync_state: SpeculativeState, pub awaiting_port: PortIdLocal, // only valid if in "awaiting message" queue. TODO: Maybe put in enum pub next_in_queue: BranchId, // used by `ExecTree`/`BranchQueue` @@ -103,7 +102,7 @@ impl BranchListItem for Branch { impl Branch { /// Creates a new non-speculative branch - pub(crate) fn new_non_sync(component_state: ComponentState) -> Self { + pub(crate) fn new_non_sync(component_state: Prompt) -> Self { Branch { id: BranchId::new_invalid(), parent_id: BranchId::new_invalid(), @@ -200,7 +199,7 @@ pub(crate) struct ExecTree { impl ExecTree { /// Constructs a new execution tree with a single non-sync branch. - pub fn new(component: ComponentState) -> Self { + pub fn new(component: Prompt) -> Self { return Self { branches: vec![Branch::new_non_sync(component)], queues: [BranchQueue::new(); 3] diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index a198f917218ce02aafdf8cd1a2e97c1115df25ca..d6ce463e4618165e4fb42b37230c366570ec0fbd 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -29,9 +29,9 @@ use std::collections::HashMap; use std::sync::atomic::AtomicBool; -use crate::PortId; +use crate::{PortId, ProtocolDescription}; use crate::common::ComponentState; -use crate::protocol::eval::{Prompt, Value, ValueGroup}; +use crate::protocol::eval::{EvalContinuation, EvalError, Prompt, Value, ValueGroup}; use crate::protocol::{RunContext, RunResult}; use crate::runtime2::branch::PreparedStatement; @@ -56,12 +56,13 @@ impl ConnectorPublic { } } -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug)] pub(crate) enum ConnectorScheduling { - Immediate, // Run again, immediately - Later, // Schedule for running, at some later point in time - NotNow, // Do not reschedule for running - Exit, // Connector has exited + Immediate, // Run again, immediately + Later, // Schedule for running, at some later point in time + NotNow, // Do not reschedule for running + Exit, // Connector has exited + Error(EvalError), // Connector has experienced a fatal error } pub(crate) struct ConnectorPDL { @@ -149,7 +150,7 @@ impl Connector for ConnectorPDL { } impl ConnectorPDL { - pub fn new(initial: ComponentState) -> Self { + pub fn new(initial: Prompt) -> Self { Self{ tree: ExecTree::new(initial), consensus: Consensus::new(), @@ -225,17 +226,22 @@ impl ConnectorPDL { consensus: &self.consensus, prepared: branch.prepared.take(), }; - let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); + + let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context); + if let Err(eval_error) = run_result { + return ConnectorScheduling::Error(eval_error); + } + let run_result = run_result.unwrap(); // Handle the returned result. Note that this match statement contains // explicit returns in case the run result requires that the component's // code is ran again immediately match run_result { - RunResult::BranchInconsistent => { + EvalContinuation::BranchInconsistent => { // Branch became inconsistent branch.sync_state = SpeculativeState::Inconsistent; }, - RunResult::BranchMissingPortState(port_id) => { + EvalContinuation::BlockFires(port_id) => { // Branch called `fires()` on a port that has not been used yet. let port_id = PortIdLocal::new(port_id.0.u32_suffix); @@ -259,7 +265,7 @@ impl ConnectorPDL { return ConnectorScheduling::Immediate; }, - RunResult::BranchGet(port_id) => { + EvalContinuation::BlockGet(port_id) => { // Branch performed a `get()` on a port that does not have a // received message on that port. let port_id = PortIdLocal::new(port_id.0.u32_suffix); @@ -293,7 +299,7 @@ impl ConnectorPDL { return ConnectorScheduling::Immediate; } } - RunResult::BranchAtSyncEnd => { + EvalContinuation::SyncBlockEnd => { let consistency = self.consensus.notify_of_finished_branch(branch_id); if consistency == Consistency::Valid { branch.sync_state = SpeculativeState::ReachedSyncEnd; @@ -302,7 +308,7 @@ impl ConnectorPDL { branch.sync_state = SpeculativeState::Inconsistent; } }, - RunResult::BranchFork => { + EvalContinuation::NewFork => { // Like the `NewChannel` result. This means we're setting up // a branch and putting a marker inside the RunContext for the // next time we run the PDL code @@ -317,7 +323,7 @@ impl ConnectorPDL { let right_branch = &mut self.tree[right_id]; right_branch.prepared = PreparedStatement::ForkedExecution(false); } - RunResult::BranchPut(port_id, content) => { + EvalContinuation::Put(port_id, content) => { // Branch is attempting to send data let port_id = PortIdLocal::new(port_id.0.u32_suffix); let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); @@ -353,15 +359,19 @@ impl ConnectorPDL { consensus: &self.consensus, prepared: branch.prepared.take(), }; - let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); + let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context); + if let Err(eval_error) = run_result { + return ConnectorScheduling::Error(eval_error); + } + let run_result = run_result.unwrap(); match run_result { - RunResult::ComponentTerminated => { + EvalContinuation::ComponentTerminated => { branch.sync_state = SpeculativeState::Finished; return ConnectorScheduling::Exit; }, - RunResult::ComponentAtSyncStart => { + EvalContinuation::SyncBlockStart => { comp_ctx.notify_sync_start(); let sync_branch_id = self.tree.start_sync(); debug_assert!(self.last_finished_handled.is_none()); @@ -371,27 +381,25 @@ impl ConnectorPDL { return ConnectorScheduling::Immediate; }, - RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { + EvalContinuation::NewComponent(definition_id, monomorph_idx, arguments) => { // Note: we're relinquishing ownership of ports. But because // we are in non-sync mode the scheduler will handle and check // port ownership transfer. debug_assert!(comp_ctx.workspace_ports.is_empty()); find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports); - let new_state = ComponentState { - prompt: Prompt::new( - &sched_ctx.runtime.protocol_description.types, - &sched_ctx.runtime.protocol_description.heap, - definition_id, monomorph_idx, arguments - ), - }; - let new_component = ConnectorPDL::new(new_state); + let new_prompt = Prompt::new( + &sched_ctx.runtime.protocol_description.types, + &sched_ctx.runtime.protocol_description.heap, + definition_id, monomorph_idx, arguments + ); + let new_component = ConnectorPDL::new(new_prompt); comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone()); comp_ctx.workspace_ports.clear(); return ConnectorScheduling::Later; }, - RunResult::NewChannel => { + EvalContinuation::NewChannel => { let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id); debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter); branch.prepared = PreparedStatement::CreatedChannel(( @@ -421,4 +429,18 @@ impl ConnectorPDL { ctx.notify_sync_end(&[]); self.last_finished_handled = None; } + + /// Runs the prompt repeatedly until some kind of execution-blocking + /// condition appears. + #[inline] + fn run_prompt(prompt: &mut Prompt, pd: &ProtocolDescription, ctx: &mut ConnectorRunContext) -> Result { + loop { + let result = prompt.step(&pd.types, &pd.heap, &pd.modules, ctx); + if let Ok(EvalContinuation::Stepping) = result { + continue; + } + + return result; + } + } } \ No newline at end of file diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index cb02d7df66e4af31130a3d9667dc1a8ebb237254..d3a3d9b736dae482f218da5772436b1c3c7ad8e4 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -423,8 +423,8 @@ impl ApplicationInterface { self.owned_ports.remove(position); } - let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?; - let connector = ConnectorPDL::new(state); + let prompt = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?; + let connector = ConnectorPDL::new(prompt); // Put on job queue { diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 092b65f090107f47fe0c48f9656a99c6f473e2e6..22299972b63d32556f9f855b82fd5f9e73066e32 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -48,7 +48,7 @@ impl Scheduler { // Keep running until we should no longer immediately schedule the // connector. let mut cur_schedule = ConnectorScheduling::Immediate; - while cur_schedule == ConnectorScheduling::Immediate { + while let ConnectorScheduling::Immediate = cur_schedule { self.handle_inbox_messages(scheduled); // Run the main behaviour of the connector, depending on its @@ -115,6 +115,11 @@ impl Scheduler { } self.try_go_to_sleep(connector_key, scheduled); + }, + ConnectorScheduling::Error(eval_error) => { + // Display error. Then exit + println!("Oh oh!\n{}", eval_error); + panic!("Abort!"); } } }