diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index a198f917218ce02aafdf8cd1a2e97c1115df25ca..d6ce463e4618165e4fb42b37230c366570ec0fbd 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -29,9 +29,9 @@ use std::collections::HashMap; use std::sync::atomic::AtomicBool; -use crate::PortId; +use crate::{PortId, ProtocolDescription}; use crate::common::ComponentState; -use crate::protocol::eval::{Prompt, Value, ValueGroup}; +use crate::protocol::eval::{EvalContinuation, EvalError, Prompt, Value, ValueGroup}; use crate::protocol::{RunContext, RunResult}; use crate::runtime2::branch::PreparedStatement; @@ -56,12 +56,13 @@ impl ConnectorPublic { } } -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug)] pub(crate) enum ConnectorScheduling { - Immediate, // Run again, immediately - Later, // Schedule for running, at some later point in time - NotNow, // Do not reschedule for running - Exit, // Connector has exited + Immediate, // Run again, immediately + Later, // Schedule for running, at some later point in time + NotNow, // Do not reschedule for running + Exit, // Connector has exited + Error(EvalError), // Connector has experienced a fatal error } pub(crate) struct ConnectorPDL { @@ -149,7 +150,7 @@ impl Connector for ConnectorPDL { } impl ConnectorPDL { - pub fn new(initial: ComponentState) -> Self { + pub fn new(initial: Prompt) -> Self { Self{ tree: ExecTree::new(initial), consensus: Consensus::new(), @@ -225,17 +226,22 @@ impl ConnectorPDL { consensus: &self.consensus, prepared: branch.prepared.take(), }; - let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); + + let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context); + if let Err(eval_error) = run_result { + return ConnectorScheduling::Error(eval_error); + } + let run_result = run_result.unwrap(); // Handle the returned result. Note that this match statement contains // explicit returns in case the run result requires that the component's // code is ran again immediately match run_result { - RunResult::BranchInconsistent => { + EvalContinuation::BranchInconsistent => { // Branch became inconsistent branch.sync_state = SpeculativeState::Inconsistent; }, - RunResult::BranchMissingPortState(port_id) => { + EvalContinuation::BlockFires(port_id) => { // Branch called `fires()` on a port that has not been used yet. let port_id = PortIdLocal::new(port_id.0.u32_suffix); @@ -259,7 +265,7 @@ impl ConnectorPDL { return ConnectorScheduling::Immediate; }, - RunResult::BranchGet(port_id) => { + EvalContinuation::BlockGet(port_id) => { // Branch performed a `get()` on a port that does not have a // received message on that port. let port_id = PortIdLocal::new(port_id.0.u32_suffix); @@ -293,7 +299,7 @@ impl ConnectorPDL { return ConnectorScheduling::Immediate; } } - RunResult::BranchAtSyncEnd => { + EvalContinuation::SyncBlockEnd => { let consistency = self.consensus.notify_of_finished_branch(branch_id); if consistency == Consistency::Valid { branch.sync_state = SpeculativeState::ReachedSyncEnd; @@ -302,7 +308,7 @@ impl ConnectorPDL { branch.sync_state = SpeculativeState::Inconsistent; } }, - RunResult::BranchFork => { + EvalContinuation::NewFork => { // Like the `NewChannel` result. This means we're setting up // a branch and putting a marker inside the RunContext for the // next time we run the PDL code @@ -317,7 +323,7 @@ impl ConnectorPDL { let right_branch = &mut self.tree[right_id]; right_branch.prepared = PreparedStatement::ForkedExecution(false); } - RunResult::BranchPut(port_id, content) => { + EvalContinuation::Put(port_id, content) => { // Branch is attempting to send data let port_id = PortIdLocal::new(port_id.0.u32_suffix); let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); @@ -353,15 +359,19 @@ impl ConnectorPDL { consensus: &self.consensus, prepared: branch.prepared.take(), }; - let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); + let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context); + if let Err(eval_error) = run_result { + return ConnectorScheduling::Error(eval_error); + } + let run_result = run_result.unwrap(); match run_result { - RunResult::ComponentTerminated => { + EvalContinuation::ComponentTerminated => { branch.sync_state = SpeculativeState::Finished; return ConnectorScheduling::Exit; }, - RunResult::ComponentAtSyncStart => { + EvalContinuation::SyncBlockStart => { comp_ctx.notify_sync_start(); let sync_branch_id = self.tree.start_sync(); debug_assert!(self.last_finished_handled.is_none()); @@ -371,27 +381,25 @@ impl ConnectorPDL { return ConnectorScheduling::Immediate; }, - RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { + EvalContinuation::NewComponent(definition_id, monomorph_idx, arguments) => { // Note: we're relinquishing ownership of ports. But because // we are in non-sync mode the scheduler will handle and check // port ownership transfer. debug_assert!(comp_ctx.workspace_ports.is_empty()); find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports); - let new_state = ComponentState { - prompt: Prompt::new( - &sched_ctx.runtime.protocol_description.types, - &sched_ctx.runtime.protocol_description.heap, - definition_id, monomorph_idx, arguments - ), - }; - let new_component = ConnectorPDL::new(new_state); + let new_prompt = Prompt::new( + &sched_ctx.runtime.protocol_description.types, + &sched_ctx.runtime.protocol_description.heap, + definition_id, monomorph_idx, arguments + ); + let new_component = ConnectorPDL::new(new_prompt); comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone()); comp_ctx.workspace_ports.clear(); return ConnectorScheduling::Later; }, - RunResult::NewChannel => { + EvalContinuation::NewChannel => { let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id); debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter); branch.prepared = PreparedStatement::CreatedChannel(( @@ -421,4 +429,18 @@ impl ConnectorPDL { ctx.notify_sync_end(&[]); self.last_finished_handled = None; } + + /// Runs the prompt repeatedly until some kind of execution-blocking + /// condition appears. + #[inline] + fn run_prompt(prompt: &mut Prompt, pd: &ProtocolDescription, ctx: &mut ConnectorRunContext) -> Result { + loop { + let result = prompt.step(&pd.types, &pd.heap, &pd.modules, ctx); + if let Ok(EvalContinuation::Stepping) = result { + continue; + } + + return result; + } + } } \ No newline at end of file