diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 4f7c23baab3a6d06e4d7f391d0574ab5b733da77..a198f917218ce02aafdf8cd1a2e97c1115df25ca 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -33,6 +33,7 @@ use crate::PortId; use crate::common::ComponentState; use crate::protocol::eval::{Prompt, Value, ValueGroup}; use crate::protocol::{RunContext, RunResult}; +use crate::runtime2::branch::PreparedStatement; use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState}; use super::consensus::{Consensus, Consistency, find_ports_in_value_group}; @@ -69,28 +70,28 @@ pub(crate) struct ConnectorPDL { last_finished_handled: Option, } +// TODO: Remove remaining fields once 'fires()' is removed from language. struct ConnectorRunContext<'a> { branch_id: BranchId, consensus: &'a Consensus, - received: &'a HashMap, - scheduler: SchedulerCtx<'a>, - prepared_channel: Option<(Value, Value)>, - prepared_fork: Option, + prepared: PreparedStatement, } impl<'a> RunContext for ConnectorRunContext<'a>{ - fn did_put(&mut self, port: PortId) -> bool { - let port_id = PortIdLocal::new(port.0.u32_suffix); - let annotation = self.consensus.get_annotation(self.branch_id, port_id); - return annotation.registered_id.is_some(); + fn performed_put(&mut self, _port: PortId) -> bool { + return match self.prepared.take() { + PreparedStatement::None => false, + PreparedStatement::PerformedPut => true, + taken => unreachable!("prepared statement is '{:?}' during 'performed_put()'", taken) + }; } - fn get(&mut self, port: PortId) -> Option { - let port_id = PortIdLocal::new(port.0.u32_suffix); - match self.received.get(&port_id) { - Some(data) => Some(data.clone()), - None => None, - } + fn performed_get(&mut self, _port: PortId) -> Option { + return match self.prepared.take() { + PreparedStatement::None => None, + PreparedStatement::PerformedGet(value) => Some(value), + taken => unreachable!("prepared statement is '{:?}' during 'performed_get()'", taken), + }; } fn fires(&mut self, port: PortId) -> Option { @@ -99,12 +100,20 @@ impl<'a> RunContext for ConnectorRunContext<'a>{ return annotation.expected_firing.map(|v| Value::Bool(v)); } - fn get_channel(&mut self) -> Option<(Value, Value)> { - return self.prepared_channel.take(); + fn created_channel(&mut self) -> Option<(Value, Value)> { + return match self.prepared.take() { + PreparedStatement::None => None, + PreparedStatement::CreatedChannel(ports) => Some(ports), + taken => unreachable!("prepared statement is '{:?}' during 'created_channel)_'", taken), + }; } - fn get_fork(&mut self) -> Option { - return self.prepared_fork.take(); + fn performed_fork(&mut self) -> Option { + return match self.prepared.take() { + PreparedStatement::None => None, + PreparedStatement::ForkedExecution(path) => Some(path), + taken => unreachable!("prepared statement is '{:?}' during 'performed_fork()'", taken), + }; } } @@ -181,7 +190,9 @@ impl ConnectorPDL { self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; - receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); + debug_assert!(receiving_branch.awaiting_port == message.data_header.target_port); + receiving_branch.awaiting_port = PortIdLocal::new_invalid(); + receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone()); self.consensus.notify_of_received_message(receiving_branch_id, &message); // And prepare the branch for running @@ -212,10 +223,7 @@ impl ConnectorPDL { let mut run_context = ConnectorRunContext{ branch_id, consensus: &self.consensus, - received: &branch.inbox, - scheduler: sched_ctx, - prepared_channel: branch.prepared_channel.take(), - prepared_fork: branch.prepared_fork.take(), + prepared: branch.prepared.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); @@ -251,43 +259,38 @@ impl ConnectorPDL { return ConnectorScheduling::Immediate; }, - RunResult::BranchMissingPortValue(port_id) => { + RunResult::BranchGet(port_id) => { // Branch performed a `get()` on a port that does not have a // received message on that port. let port_id = PortIdLocal::new(port_id.0.u32_suffix); - let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); - if consistency == Consistency::Valid { - // `get()` is valid, so mark the branch as awaiting a message - branch.sync_state = SpeculativeState::HaltedAtBranchPoint; - branch.awaiting_port = port_id; - self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); - - // Note: we only know that a branch is waiting on a message when - // it reaches the `get` call. But we might have already received - // a message that targets this branch, so check now. - let mut any_message_received = false; - for message in comp_ctx.get_read_data_messages(port_id) { - if self.consensus.branch_can_receive(branch_id, &message) { - // This branch can receive the message, so we do the - // fork-and-receive dance - let receiving_branch_id = self.tree.fork_branch(branch_id); - let branch = &mut self.tree[receiving_branch_id]; - - branch.insert_message(port_id, message.content.as_message().unwrap().clone()); - - self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); - self.consensus.notify_of_received_message(receiving_branch_id, &message); - self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); - - any_message_received = true; - } - } - if any_message_received { - return ConnectorScheduling::Immediate; + branch.sync_state = SpeculativeState::HaltedAtBranchPoint; + branch.awaiting_port = port_id; + self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); + + // Note: we only know that a branch is waiting on a message when + // it reaches the `get` call. But we might have already received + // a message that targets this branch, so check now. + let mut any_message_received = false; + for message in comp_ctx.get_read_data_messages(port_id) { + if self.consensus.branch_can_receive(branch_id, &message) { + // This branch can receive the message, so we do the + // fork-and-receive dance + let receiving_branch_id = self.tree.fork_branch(branch_id); + let branch = &mut self.tree[receiving_branch_id]; + branch.awaiting_port = PortIdLocal::new_invalid(); + branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone()); + + self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); + self.consensus.notify_of_received_message(receiving_branch_id, &message); + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); + + any_message_received = true; } - } else { - branch.sync_state = SpeculativeState::Inconsistent; + } + + if any_message_received { + return ConnectorScheduling::Immediate; } } RunResult::BranchAtSyncEnd => { @@ -310,27 +313,22 @@ impl ConnectorPDL { self.tree.push_into_queue(QueueKind::Runnable, right_id); let left_branch = &mut self.tree[left_id]; - left_branch.prepared_fork = Some(true); + left_branch.prepared = PreparedStatement::ForkedExecution(true); let right_branch = &mut self.tree[right_id]; - right_branch.prepared_fork = Some(false); + right_branch.prepared = PreparedStatement::ForkedExecution(false); } RunResult::BranchPut(port_id, content) => { // Branch is attempting to send data let port_id = PortIdLocal::new(port_id.0.u32_suffix); - let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); - if consistency == Consistency::Valid { - // `put()` is valid. - let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); - comp_ctx.submit_message(Message::Data(DataMessage { - sync_header, data_header, - content: DataContent::Message(content), - })); - - self.tree.push_into_queue(QueueKind::Runnable, branch_id); - return ConnectorScheduling::Immediate; - } else { - branch.sync_state = SpeculativeState::Inconsistent; - } + let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); + comp_ctx.submit_message(Message::Data(DataMessage { + sync_header, data_header, + content: DataContent::Message(content), + })); + + branch.prepared = PreparedStatement::PerformedPut; + self.tree.push_into_queue(QueueKind::Runnable, branch_id); + return ConnectorScheduling::Immediate; }, _ => unreachable!("unexpected run result {:?} in sync mode", run_result), } @@ -353,10 +351,7 @@ impl ConnectorPDL { let mut run_context = ConnectorRunContext{ branch_id: branch.id, consensus: &self.consensus, - received: &branch.inbox, - scheduler: sched_ctx, - prepared_channel: branch.prepared_channel.take(), - prepared_fork: branch.prepared_fork.take(), + prepared: branch.prepared.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); @@ -399,7 +394,7 @@ impl ConnectorPDL { RunResult::NewChannel => { let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id); debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter); - branch.prepared_channel = Some(( + branch.prepared = PreparedStatement::CreatedChannel(( Value::Output(PortId::new(putter.self_id.index)), Value::Input(PortId::new(getter.self_id.index)), ));