diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 943e149661563948bd38169e4e07b8c0ddd74a11..a198f917218ce02aafdf8cd1a2e97c1115df25ca 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -33,6 +33,7 @@ use crate::PortId; use crate::common::ComponentState; use crate::protocol::eval::{Prompt, Value, ValueGroup}; use crate::protocol::{RunContext, RunResult}; +use crate::runtime2::branch::PreparedStatement; use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState}; use super::consensus::{Consensus, Consistency, find_ports_in_value_group}; @@ -55,7 +56,7 @@ impl ConnectorPublic { } } -#[derive(Eq, PartialEq)] +#[derive(Debug, Eq, PartialEq)] pub(crate) enum ConnectorScheduling { Immediate, // Run again, immediately Later, // Schedule for running, at some later point in time @@ -66,29 +67,31 @@ pub(crate) enum ConnectorScheduling { pub(crate) struct ConnectorPDL { tree: ExecTree, consensus: Consensus, + last_finished_handled: Option, } +// TODO: Remove remaining fields once 'fires()' is removed from language. struct ConnectorRunContext<'a> { branch_id: BranchId, consensus: &'a Consensus, - received: &'a HashMap, - scheduler: SchedulerCtx<'a>, - prepared_channel: Option<(Value, Value)>, + prepared: PreparedStatement, } impl<'a> RunContext for ConnectorRunContext<'a>{ - fn did_put(&mut self, port: PortId) -> bool { - let port_id = PortIdLocal::new(port.0.u32_suffix); - let annotation = self.consensus.get_annotation(self.branch_id, port_id); - return annotation.registered_id.is_some(); + fn performed_put(&mut self, _port: PortId) -> bool { + return match self.prepared.take() { + PreparedStatement::None => false, + PreparedStatement::PerformedPut => true, + taken => unreachable!("prepared statement is '{:?}' during 'performed_put()'", taken) + }; } - fn get(&mut self, port: PortId) -> Option { - let port_id = PortIdLocal::new(port.0.u32_suffix); - match self.received.get(&port_id) { - Some(data) => Some(data.clone()), - None => None, - } + fn performed_get(&mut self, _port: PortId) -> Option { + return match self.prepared.take() { + PreparedStatement::None => None, + PreparedStatement::PerformedGet(value) => Some(value), + taken => unreachable!("prepared statement is '{:?}' during 'performed_get()'", taken), + }; } fn fires(&mut self, port: PortId) -> Option { @@ -97,8 +100,20 @@ impl<'a> RunContext for ConnectorRunContext<'a>{ return annotation.expected_firing.map(|v| Value::Bool(v)); } - fn get_channel(&mut self) -> Option<(Value, Value)> { - return self.prepared_channel.take(); + fn created_channel(&mut self) -> Option<(Value, Value)> { + return match self.prepared.take() { + PreparedStatement::None => None, + PreparedStatement::CreatedChannel(ports) => Some(ports), + taken => unreachable!("prepared statement is '{:?}' during 'created_channel)_'", taken), + }; + } + + fn performed_fork(&mut self) -> Option { + return match self.prepared.take() { + PreparedStatement::None => None, + PreparedStatement::ForkedExecution(path) => Some(path), + taken => unreachable!("prepared statement is '{:?}' during 'performed_fork()'", taken), + }; } } @@ -106,13 +121,26 @@ impl Connector for ConnectorPDL { fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { self.handle_new_messages(comp_ctx); if self.tree.is_in_sync() { + // Run in sync mode let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); - if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) { - self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx); - return ConnectorScheduling::Immediate; - } else { - return scheduling + + // Handle any new finished branches + let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync)); + while let Some(branch_id) = iter_id { + iter_id = self.tree.get_queue_next(branch_id); + self.last_finished_handled = Some(branch_id); + + + if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) { + // Actually found a solution + self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx); + return ConnectorScheduling::Immediate; + } + + self.last_finished_handled = Some(branch_id); } + + return scheduling; } else { let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); return scheduling; @@ -125,6 +153,7 @@ impl ConnectorPDL { Self{ tree: ExecTree::new(initial), consensus: Consensus::new(), + last_finished_handled: None, } } @@ -143,21 +172,28 @@ impl ConnectorPDL { pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. - debug_assert!(ctx.workspace_branches.is_empty()); - let mut branches = Vec::new(); // TODO: @Remove - if !self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches) { + if !self.consensus.handle_new_data_message(&message, ctx) { // Old message, so drop it return; } - for branch_id in branches.drain(..) { + let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage); + while let Some(branch_id) = iter_id { + iter_id = self.tree.get_queue_next(branch_id); + + let branch = &self.tree[branch_id]; + if branch.awaiting_port != message.data_header.target_port { continue; } + if !self.consensus.branch_can_receive(branch_id, &message) { continue; } + // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; - receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); - self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); + debug_assert!(receiving_branch.awaiting_port == message.data_header.target_port); + receiving_branch.awaiting_port = PortIdLocal::new_invalid(); + receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone()); + self.consensus.notify_of_received_message(receiving_branch_id, &message); // And prepare the branch for running self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); @@ -187,9 +223,7 @@ impl ConnectorPDL { let mut run_context = ConnectorRunContext{ branch_id, consensus: &self.consensus, - received: &branch.inbox, - scheduler: sched_ctx, - prepared_channel: branch.prepared_channel.take(), + prepared: branch.prepared.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); @@ -225,43 +259,38 @@ impl ConnectorPDL { return ConnectorScheduling::Immediate; }, - RunResult::BranchMissingPortValue(port_id) => { + RunResult::BranchGet(port_id) => { // Branch performed a `get()` on a port that does not have a // received message on that port. let port_id = PortIdLocal::new(port_id.0.u32_suffix); - let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); - if consistency == Consistency::Valid { - // `get()` is valid, so mark the branch as awaiting a message - branch.sync_state = SpeculativeState::HaltedAtBranchPoint; - branch.awaiting_port = port_id; - self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); - - // Note: we only know that a branch is waiting on a message when - // it reaches the `get` call. But we might have already received - // a message that targets this branch, so check now. - let mut any_branch_received = false; - for message in comp_ctx.get_read_data_messages(port_id) { - if self.consensus.branch_can_receive(branch_id, &message.sync_header, &message.data_header, &message.content) { - // This branch can receive the message, so we do the - // fork-and-receive dance - let receiving_branch_id = self.tree.fork_branch(branch_id); - let branch = &mut self.tree[receiving_branch_id]; - - branch.insert_message(port_id, message.content.as_message().unwrap().clone()); - - self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); - self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); - self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); - - any_branch_received = true; - } - } - if any_branch_received { - return ConnectorScheduling::Immediate; + branch.sync_state = SpeculativeState::HaltedAtBranchPoint; + branch.awaiting_port = port_id; + self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); + + // Note: we only know that a branch is waiting on a message when + // it reaches the `get` call. But we might have already received + // a message that targets this branch, so check now. + let mut any_message_received = false; + for message in comp_ctx.get_read_data_messages(port_id) { + if self.consensus.branch_can_receive(branch_id, &message) { + // This branch can receive the message, so we do the + // fork-and-receive dance + let receiving_branch_id = self.tree.fork_branch(branch_id); + let branch = &mut self.tree[receiving_branch_id]; + branch.awaiting_port = PortIdLocal::new_invalid(); + branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone()); + + self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); + self.consensus.notify_of_received_message(receiving_branch_id, &message); + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); + + any_message_received = true; } - } else { - branch.sync_state = SpeculativeState::Inconsistent; + } + + if any_message_received { + return ConnectorScheduling::Immediate; } } RunResult::BranchAtSyncEnd => { @@ -269,27 +298,37 @@ impl ConnectorPDL { if consistency == Consistency::Valid { branch.sync_state = SpeculativeState::ReachedSyncEnd; self.tree.push_into_queue(QueueKind::FinishedSync, branch_id); - } else if consistency == Consistency::Inconsistent { + } else { branch.sync_state = SpeculativeState::Inconsistent; } }, + RunResult::BranchFork => { + // Like the `NewChannel` result. This means we're setting up + // a branch and putting a marker inside the RunContext for the + // next time we run the PDL code + let left_id = branch_id; + let right_id = self.tree.fork_branch(left_id); + self.consensus.notify_of_new_branch(left_id, right_id); + self.tree.push_into_queue(QueueKind::Runnable, left_id); + self.tree.push_into_queue(QueueKind::Runnable, right_id); + + let left_branch = &mut self.tree[left_id]; + left_branch.prepared = PreparedStatement::ForkedExecution(true); + let right_branch = &mut self.tree[right_id]; + right_branch.prepared = PreparedStatement::ForkedExecution(false); + } RunResult::BranchPut(port_id, content) => { // Branch is attempting to send data let port_id = PortIdLocal::new(port_id.0.u32_suffix); - let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); - if consistency == Consistency::Valid { - // `put()` is valid. - let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); - comp_ctx.submit_message(Message::Data(DataMessage { - sync_header, data_header, - content: DataContent::Message(content), - })); - - self.tree.push_into_queue(QueueKind::Runnable, branch_id); - return ConnectorScheduling::Immediate; - } else { - branch.sync_state = SpeculativeState::Inconsistent; - } + let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); + comp_ctx.submit_message(Message::Data(DataMessage { + sync_header, data_header, + content: DataContent::Message(content), + })); + + branch.prepared = PreparedStatement::PerformedPut; + self.tree.push_into_queue(QueueKind::Runnable, branch_id); + return ConnectorScheduling::Immediate; }, _ => unreachable!("unexpected run result {:?} in sync mode", run_result), } @@ -312,9 +351,7 @@ impl ConnectorPDL { let mut run_context = ConnectorRunContext{ branch_id: branch.id, consensus: &self.consensus, - received: &branch.inbox, - scheduler: sched_ctx, - prepared_channel: branch.prepared_channel.take(), + prepared: branch.prepared.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); @@ -327,6 +364,7 @@ impl ConnectorPDL { RunResult::ComponentAtSyncStart => { comp_ctx.notify_sync_start(); let sync_branch_id = self.tree.start_sync(); + debug_assert!(self.last_finished_handled.is_none()); self.consensus.start_sync(comp_ctx); self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id); self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); @@ -356,7 +394,7 @@ impl ConnectorPDL { RunResult::NewChannel => { let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id); debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter); - branch.prepared_channel = Some(( + branch.prepared = PreparedStatement::CreatedChannel(( Value::Output(PortId::new(putter.self_id.index)), Value::Input(PortId::new(getter.self_id.index)), )); @@ -381,5 +419,6 @@ impl ConnectorPDL { } ctx.notify_sync_end(&[]); + self.last_finished_handled = None; } } \ No newline at end of file