From 088be76302456bb92d73df734fb835197a71c4c2 2021-11-14 15:34:10 From: mh Date: 2021-11-14 15:34:10 Subject: [PATCH] Implement multi-put and multi-get --- diff --git a/src/protocol/eval/executor.rs b/src/protocol/eval/executor.rs index aded401d18af5b9c3253fe43f2183f73167db1b4..3b16522574521e6ec3bdb70b57ee34b22073a949 100644 --- a/src/protocol/eval/executor.rs +++ b/src/protocol/eval/executor.rs @@ -585,7 +585,7 @@ impl Prompt { unreachable!("executor calling 'get' on value {:?}", value) }; - match ctx.get(port_id) { + match ctx.performed_get(port_id) { Some(result) => { // We have the result. Merge the `ValueGroup` with the // stack/heap storage. @@ -614,11 +614,12 @@ impl Prompt { let msg_value = cur_frame.expr_values.pop_front().unwrap(); let deref_msg_value = self.store.maybe_read_ref(&msg_value).clone(); - if ctx.did_put(port_id) { + if ctx.performed_put(port_id) { // We're fine, deallocate in case the expression value stack // held an owned value self.store.drop_value(msg_value.get_heap_pos()); } else { + // Prepare to execute again cur_frame.expr_values.push_front(msg_value); cur_frame.expr_values.push_front(port_value); cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id)); @@ -811,7 +812,7 @@ impl Prompt { LocalStatement::Channel(stmt) => { // Need to create a new channel by requesting it from // the runtime. - match ctx.get_channel() { + match ctx.created_channel() { None => { // No channel is pending. 
So request one Ok(EvalContinuation::NewChannel) @@ -893,7 +894,7 @@ impl Prompt { cur_frame.position = stmt.left_body.upcast(); } else { // Need to fork - if let Some(go_left) = ctx.get_fork() { + if let Some(go_left) = ctx.performed_fork() { // Runtime has created a fork if go_left { cur_frame.position = stmt.left_body.upcast(); diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 44988824b0ca822a44d4640b61b480ee6eb19ba5..fade25ccafb1caa91db843d4930698c93c605a4c 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -284,11 +284,11 @@ impl ProtocolDescription { // TODO: @temp Should just become a concrete thing that is passed in pub trait RunContext { - fn did_put(&mut self, port: PortId) -> bool; - fn get(&mut self, port: PortId) -> Option; // None if still waiting on message + fn performed_put(&mut self, port: PortId) -> bool; + fn performed_get(&mut self, port: PortId) -> Option; // None if still waiting on message fn fires(&mut self, port: PortId) -> Option; // None if not yet branched - fn get_fork(&mut self) -> Option; // None if not yet forked - fn get_channel(&mut self) -> Option<(Value, Value)>; // None if not yet prepared + fn performed_fork(&mut self) -> Option; // None if not yet forked + fn created_channel(&mut self) -> Option<(Value, Value)>; // None if not yet prepared } #[derive(Debug)] @@ -301,7 +301,7 @@ pub enum RunResult { // Can only occur inside sync blocks BranchInconsistent, // branch has inconsistent behaviour BranchMissingPortState(PortId), // branch doesn't know about port firing - BranchMissingPortValue(PortId), // branch hasn't received message on input port yet + BranchGet(PortId), // branch hasn't received message on input port yet BranchAtSyncEnd, BranchFork, BranchPut(PortId, ValueGroup), @@ -334,7 +334,7 @@ impl ComponentState { EC::NewFork => return RR::BranchFork, EC::BlockFires(port_id) => return RR::BranchMissingPortState(port_id), - EC::BlockGet(port_id) => return RR::BranchMissingPortValue(port_id), + 
EC::BlockGet(port_id) => return RR::BranchGet(port_id), EC::Put(port_id, value) => { let value_group = ValueGroup::from_store(&self.prompt.store, &[value]); return RR::BranchPut(port_id, value_group); @@ -469,7 +469,7 @@ impl ComponentState { } impl RunContext for EvalContext<'_> { - fn did_put(&mut self, port: PortId) -> bool { + fn performed_put(&mut self, port: PortId) -> bool { match self { EvalContext::None => unreachable!(), EvalContext::Nonsync(_) => unreachable!(), @@ -479,7 +479,7 @@ impl RunContext for EvalContext<'_> { } } - fn get(&mut self, port: PortId) -> Option { + fn performed_get(&mut self, port: PortId) -> Option { match self { EvalContext::None => unreachable!(), EvalContext::Nonsync(_) => unreachable!(), @@ -518,7 +518,7 @@ impl RunContext for EvalContext<'_> { } } - fn get_channel(&mut self) -> Option<(Value, Value)> { + fn created_channel(&mut self) -> Option<(Value, Value)> { match self { EvalContext::None => unreachable!(), EvalContext::Nonsync(context) => { @@ -531,7 +531,7 @@ impl RunContext for EvalContext<'_> { } } - fn get_fork(&mut self) -> Option { + fn performed_fork(&mut self) -> Option { // Never actually used in the old runtime return None; } diff --git a/src/runtime2/branch.rs b/src/runtime2/branch.rs index a790ff23e52fc9ae7f37e70509474ac156234248..511dfc1597390034c4913bd4a68a0b566fd6df2e 100644 --- a/src/runtime2/branch.rs +++ b/src/runtime2/branch.rs @@ -52,6 +52,35 @@ pub(crate) enum SpeculativeState { Inconsistent, // branch can never represent a local solution, so halted } +#[derive(Debug)] +pub(crate) enum PreparedStatement { + CreatedChannel((Value, Value)), + ForkedExecution(bool), + PerformedPut, + PerformedGet(ValueGroup), + None, +} + +impl PreparedStatement { + pub(crate) fn is_none(&self) -> bool { + if let PreparedStatement::None = self { + return true; + } else { + return false; + } + } + + pub(crate) fn take(&mut self) -> PreparedStatement { + if let PreparedStatement::None = self { + return 
PreparedStatement::None; + } else { + let mut replacement = PreparedStatement::None; + std::mem::swap(self, &mut replacement); + return replacement; + } + } +} + /// The execution state of a branch. This envelops the PDL code and the /// execution state. And derived from that: if we're ready to keep running the /// code, or if we're halted for some reason (e.g. waiting for a message). @@ -63,9 +92,7 @@ pub(crate) struct Branch { pub sync_state: SpeculativeState, pub awaiting_port: PortIdLocal, // only valid if in "awaiting message" queue. TODO: Maybe put in enum pub next_in_queue: BranchId, // used by `ExecTree`/`BranchQueue` - pub inbox: HashMap, // TODO: Remove, currently only valid in single-get/put mode - pub prepared_channel: Option<(Value, Value)>, // TODO: Maybe remove? - pub prepared_fork: Option, // TODO: See above + pub prepared: PreparedStatement, } impl BranchListItem for Branch { @@ -84,9 +111,7 @@ impl Branch { sync_state: SpeculativeState::RunningNonSync, awaiting_port: PortIdLocal::new_invalid(), next_in_queue: BranchId::new_invalid(), - inbox: HashMap::new(), - prepared_channel: None, - prepared_fork: None, + prepared: PreparedStatement::None, } } @@ -97,8 +122,7 @@ impl Branch { // (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_id.is_valid()) || // (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint) // ); // forking from non-sync, or forking from a branching point - debug_assert!(parent_branch.prepared_channel.is_none()); - debug_assert!(parent_branch.prepared_fork.is_none()); + debug_assert!(parent_branch.prepared.is_none()); Branch { id: BranchId::new(new_index), @@ -107,20 +131,9 @@ impl Branch { sync_state: SpeculativeState::RunningInSync, awaiting_port: parent_branch.awaiting_port, next_in_queue: BranchId::new_invalid(), - inbox: parent_branch.inbox.clone(), - prepared_channel: None, - prepared_fork: None, + prepared: PreparedStatement::None, } } - - /// Inserts a message into the branch 
for retrieval by a corresponding - /// `get(port)` call. - pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) { - debug_assert!(target_port.is_valid()); - debug_assert!(self.awaiting_port == target_port); - self.awaiting_port = PortIdLocal::new_invalid(); - self.inbox.insert(target_port, contents); - } } /// Queue of branches. Just a little helper. @@ -295,8 +308,7 @@ impl ExecTree { branch.sync_state = SpeculativeState::RunningNonSync; debug_assert!(!branch.awaiting_port.is_valid()); branch.next_in_queue = BranchId::new_invalid(); - branch.inbox.clear(); - debug_assert!(branch.prepared_channel.is_none()); + debug_assert!(branch.prepared.is_none()); // Clear out all the queues for queue_idx in 0..NUM_QUEUES { diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 4f7c23baab3a6d06e4d7f391d0574ab5b733da77..a198f917218ce02aafdf8cd1a2e97c1115df25ca 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -33,6 +33,7 @@ use crate::PortId; use crate::common::ComponentState; use crate::protocol::eval::{Prompt, Value, ValueGroup}; use crate::protocol::{RunContext, RunResult}; +use crate::runtime2::branch::PreparedStatement; use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState}; use super::consensus::{Consensus, Consistency, find_ports_in_value_group}; @@ -69,28 +70,28 @@ pub(crate) struct ConnectorPDL { last_finished_handled: Option, } +// TODO: Remove remaining fields once 'fires()' is removed from language. 
struct ConnectorRunContext<'a> { branch_id: BranchId, consensus: &'a Consensus, - received: &'a HashMap, - scheduler: SchedulerCtx<'a>, - prepared_channel: Option<(Value, Value)>, - prepared_fork: Option, + prepared: PreparedStatement, } impl<'a> RunContext for ConnectorRunContext<'a>{ - fn did_put(&mut self, port: PortId) -> bool { - let port_id = PortIdLocal::new(port.0.u32_suffix); - let annotation = self.consensus.get_annotation(self.branch_id, port_id); - return annotation.registered_id.is_some(); + fn performed_put(&mut self, _port: PortId) -> bool { + return match self.prepared.take() { + PreparedStatement::None => false, + PreparedStatement::PerformedPut => true, + taken => unreachable!("prepared statement is '{:?}' during 'performed_put()'", taken) + }; } - fn get(&mut self, port: PortId) -> Option { - let port_id = PortIdLocal::new(port.0.u32_suffix); - match self.received.get(&port_id) { - Some(data) => Some(data.clone()), - None => None, - } + fn performed_get(&mut self, _port: PortId) -> Option { + return match self.prepared.take() { + PreparedStatement::None => None, + PreparedStatement::PerformedGet(value) => Some(value), + taken => unreachable!("prepared statement is '{:?}' during 'performed_get()'", taken), + }; } fn fires(&mut self, port: PortId) -> Option { @@ -99,12 +100,20 @@ impl<'a> RunContext for ConnectorRunContext<'a>{ return annotation.expected_firing.map(|v| Value::Bool(v)); } - fn get_channel(&mut self) -> Option<(Value, Value)> { - return self.prepared_channel.take(); + fn created_channel(&mut self) -> Option<(Value, Value)> { + return match self.prepared.take() { + PreparedStatement::None => None, + PreparedStatement::CreatedChannel(ports) => Some(ports), + taken => unreachable!("prepared statement is '{:?}' during 'created_channel)_'", taken), + }; } - fn get_fork(&mut self) -> Option { - return self.prepared_fork.take(); + fn performed_fork(&mut self) -> Option { + return match self.prepared.take() { + PreparedStatement::None => 
None, + PreparedStatement::ForkedExecution(path) => Some(path), + taken => unreachable!("prepared statement is '{:?}' during 'performed_fork()'", taken), + }; } } @@ -181,7 +190,9 @@ impl ConnectorPDL { self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; - receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); + debug_assert!(receiving_branch.awaiting_port == message.data_header.target_port); + receiving_branch.awaiting_port = PortIdLocal::new_invalid(); + receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone()); self.consensus.notify_of_received_message(receiving_branch_id, &message); // And prepare the branch for running @@ -212,10 +223,7 @@ impl ConnectorPDL { let mut run_context = ConnectorRunContext{ branch_id, consensus: &self.consensus, - received: &branch.inbox, - scheduler: sched_ctx, - prepared_channel: branch.prepared_channel.take(), - prepared_fork: branch.prepared_fork.take(), + prepared: branch.prepared.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); @@ -251,43 +259,38 @@ impl ConnectorPDL { return ConnectorScheduling::Immediate; }, - RunResult::BranchMissingPortValue(port_id) => { + RunResult::BranchGet(port_id) => { // Branch performed a `get()` on a port that does not have a // received message on that port. 
let port_id = PortIdLocal::new(port_id.0.u32_suffix); - let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); - if consistency == Consistency::Valid { - // `get()` is valid, so mark the branch as awaiting a message - branch.sync_state = SpeculativeState::HaltedAtBranchPoint; - branch.awaiting_port = port_id; - self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); - - // Note: we only know that a branch is waiting on a message when - // it reaches the `get` call. But we might have already received - // a message that targets this branch, so check now. - let mut any_message_received = false; - for message in comp_ctx.get_read_data_messages(port_id) { - if self.consensus.branch_can_receive(branch_id, &message) { - // This branch can receive the message, so we do the - // fork-and-receive dance - let receiving_branch_id = self.tree.fork_branch(branch_id); - let branch = &mut self.tree[receiving_branch_id]; - - branch.insert_message(port_id, message.content.as_message().unwrap().clone()); - - self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); - self.consensus.notify_of_received_message(receiving_branch_id, &message); - self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); - - any_message_received = true; - } - } - if any_message_received { - return ConnectorScheduling::Immediate; + branch.sync_state = SpeculativeState::HaltedAtBranchPoint; + branch.awaiting_port = port_id; + self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); + + // Note: we only know that a branch is waiting on a message when + // it reaches the `get` call. But we might have already received + // a message that targets this branch, so check now. 
+ let mut any_message_received = false; + for message in comp_ctx.get_read_data_messages(port_id) { + if self.consensus.branch_can_receive(branch_id, &message) { + // This branch can receive the message, so we do the + // fork-and-receive dance + let receiving_branch_id = self.tree.fork_branch(branch_id); + let branch = &mut self.tree[receiving_branch_id]; + branch.awaiting_port = PortIdLocal::new_invalid(); + branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone()); + + self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); + self.consensus.notify_of_received_message(receiving_branch_id, &message); + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); + + any_message_received = true; } - } else { - branch.sync_state = SpeculativeState::Inconsistent; + } + + if any_message_received { + return ConnectorScheduling::Immediate; } } RunResult::BranchAtSyncEnd => { @@ -310,27 +313,22 @@ impl ConnectorPDL { self.tree.push_into_queue(QueueKind::Runnable, right_id); let left_branch = &mut self.tree[left_id]; - left_branch.prepared_fork = Some(true); + left_branch.prepared = PreparedStatement::ForkedExecution(true); let right_branch = &mut self.tree[right_id]; - right_branch.prepared_fork = Some(false); + right_branch.prepared = PreparedStatement::ForkedExecution(false); } RunResult::BranchPut(port_id, content) => { // Branch is attempting to send data let port_id = PortIdLocal::new(port_id.0.u32_suffix); - let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); - if consistency == Consistency::Valid { - // `put()` is valid. 
- let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); - comp_ctx.submit_message(Message::Data(DataMessage { - sync_header, data_header, - content: DataContent::Message(content), - })); - - self.tree.push_into_queue(QueueKind::Runnable, branch_id); - return ConnectorScheduling::Immediate; - } else { - branch.sync_state = SpeculativeState::Inconsistent; - } + let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); + comp_ctx.submit_message(Message::Data(DataMessage { + sync_header, data_header, + content: DataContent::Message(content), + })); + + branch.prepared = PreparedStatement::PerformedPut; + self.tree.push_into_queue(QueueKind::Runnable, branch_id); + return ConnectorScheduling::Immediate; }, _ => unreachable!("unexpected run result {:?} in sync mode", run_result), } @@ -353,10 +351,7 @@ impl ConnectorPDL { let mut run_context = ConnectorRunContext{ branch_id: branch.id, consensus: &self.consensus, - received: &branch.inbox, - scheduler: sched_ctx, - prepared_channel: branch.prepared_channel.take(), - prepared_fork: branch.prepared_fork.take(), + prepared: branch.prepared.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); @@ -399,7 +394,7 @@ impl ConnectorPDL { RunResult::NewChannel => { let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id); debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter); - branch.prepared_channel = Some(( + branch.prepared = PreparedStatement::CreatedChannel(( Value::Output(PortId::new(putter.self_id.index)), Value::Input(PortId::new(getter.self_id.index)), )); diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 91764ee546a1731534007775d44156f10b8958f7..daea21188bf15d535f184b8faa1c08a3af865ae8 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,6 +1,7 @@ use 
crate::collections::VecSet; use crate::protocol::eval::ValueGroup; +use crate::runtime2::inbox::BranchMarker; use super::ConnectorId; use super::branch::BranchId; @@ -14,19 +15,20 @@ use super::scheduler::ComponentCtx; struct BranchAnnotation { port_mapping: Vec, + cur_marker: BranchMarker, } #[derive(Debug)] pub(crate) struct LocalSolution { component: ConnectorId, final_branch_id: BranchId, - port_mapping: Vec<(ChannelId, BranchId)>, + port_mapping: Vec<(ChannelId, BranchMarker)>, } #[derive(Debug, Clone)] pub(crate) struct GlobalSolution { component_branches: Vec<(ConnectorId, BranchId)>, - channel_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info + channel_mapping: Vec<(ChannelId, BranchMarker)>, // TODO: This can go, is debugging info } // ----------------------------------------------------------------------------- @@ -54,7 +56,8 @@ pub(crate) struct Consensus { // --- State that is cleared after each round // Local component's state highest_connector_id: ConnectorId, - branch_annotations: Vec, + branch_annotations: Vec, // index is branch ID + branch_markers: Vec, // index is branch marker, maps to branch // Gathered state from communication encountered_ports: VecSet, // to determine if we should send "port remains silent" messages. solution_combiner: SolutionCombiner, @@ -76,6 +79,7 @@ impl Consensus { return Self { highest_connector_id: ConnectorId::new_invalid(), branch_annotations: Vec::new(), + branch_markers: Vec::new(), encountered_ports: VecSet::new(), solution_combiner: SolutionCombiner::new(), peers: Vec::new(), @@ -116,7 +120,9 @@ impl Consensus { expected_firing: None, }) .collect(), + cur_marker: BranchMarker::new_invalid(), }); + self.branch_markers.push(BranchId::new_invalid()); self.highest_connector_id = ctx.id; @@ -129,10 +135,13 @@ impl Consensus { // index is the length in `branch_annotations`. 
debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize); let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize]; + let new_marker = BranchMarker::new(self.branch_markers.len() as u32); let new_branch_annotations = BranchAnnotation{ port_mapping: parent_branch_annotations.port_mapping.clone(), + cur_marker: new_marker, }; self.branch_annotations.push(new_branch_annotations); + self.branch_markers.push(new_branch_id); } /// Notifies the consensus algorithm that a branch has reached the end of @@ -214,7 +223,7 @@ impl Consensus { expected_mapping: source_mapping.clone(), sending_port: port.port_id, target_port: peer_port_id, - new_mapping: BranchId::new_invalid(), + new_mapping: BranchMarker::new_invalid(), }, content: DataContent::SilentPortNotification, })); @@ -223,7 +232,7 @@ impl Consensus { target_mapping.push(( channel_id, - port.registered_id.unwrap_or(BranchId::new_invalid()) + port.registered_id.unwrap_or(BranchMarker::new_invalid()) )); } @@ -290,23 +299,27 @@ impl Consensus { // Construct data header // TODO: Handle multiple firings. Right now we just assign the current // branch to the `None` value because we know we can only send once. 
- debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none()); let port_info = ctx.get_port_by_id(source_port_id).unwrap(); let data_header = DataHeader{ expected_mapping: branch.port_mapping.clone(), sending_port: port_info.self_id, target_port: port_info.peer_id, - new_mapping: branch_id + new_mapping: branch.cur_marker, }; // Update port mapping for mapping in &mut branch.port_mapping { if mapping.port_id == source_port_id { mapping.expected_firing = Some(true); - mapping.registered_id = Some(branch_id); + mapping.registered_id = Some(branch.cur_marker); } } + // Update branch marker + let new_marker = BranchMarker::new(self.branch_markers.len() as u32); + branch.cur_marker = new_marker; + self.branch_markers.push(branch_id); + self.encountered_ports.push(source_port_id); return (self.create_sync_header(ctx), data_header); @@ -546,7 +559,7 @@ impl Consensus { #[derive(Debug)] struct MatchedLocalSolution { final_branch_id: BranchId, - channel_mapping: Vec<(ChannelId, BranchId)>, + channel_mapping: Vec<(ChannelId, BranchMarker)>, matches: Vec, } diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index f1175db78a17bb428cdee71c3bf0cc6c8d43210a..91db3a3711b6eecb1bfb71020328bbc54fad2f25 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -13,10 +13,32 @@ use super::port::PortIdLocal; #[derive(Debug, Copy, Clone)] pub(crate) struct PortAnnotation { pub port_id: PortIdLocal, - pub registered_id: Option, + pub registered_id: Option, pub expected_firing: Option, } +/// Marker for a branch in a port mapping. A marker is, like a branch ID, a +/// unique identifier for a branch, but differs in that a branch only has one +/// branch ID, but might have multiple associated markers (i.e. one branch +/// performing a `put` three times will generate three markers). 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) struct BranchMarker{ + marker: u32, +} + +impl BranchMarker { + #[inline] + pub(crate) fn new(marker: u32) -> Self { + debug_assert!(marker != 0); + return Self{ marker }; + } + + #[inline] + pub(crate) fn new_invalid() -> Self { + return Self{ marker: 0 } + } +} + /// The header added by the synchronization algorithm to all. #[derive(Debug, Clone)] pub(crate) struct SyncHeader { @@ -31,7 +53,7 @@ pub(crate) struct DataHeader { pub expected_mapping: Vec, pub sending_port: PortIdLocal, pub target_port: PortIdLocal, - pub new_mapping: BranchId, + pub new_mapping: BranchMarker, } // TODO: Very much on the fence about this. On one hand I thought making it a diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index e17481fd34027ccea8b01922c9bf820b8c76e02c..cb02d7df66e4af31130a3d9667dc1a8ebb237254 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -183,54 +183,46 @@ impl ConnectorApplication { match &cur_instruction { ApplicationSyncAction::Put(port_id, content) => { let port_id = *port_id; - let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); - if consistency == Consistency::Valid { - let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); - let message = Message::Data(DataMessage { - sync_header, - data_header, - content: DataContent::Message(content.clone()), - }); - comp_ctx.submit_message(message); - self.tree.push_into_queue(QueueKind::Runnable, branch_id); - return ConnectorScheduling::Immediate; - } else { - branch.sync_state = SpeculativeState::Inconsistent; - } + + let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); + let message = Message::Data(DataMessage { + sync_header, + data_header, + content: DataContent::Message(content.clone()), + }); + comp_ctx.submit_message(message); + self.tree.push_into_queue(QueueKind::Runnable, 
branch_id); + return ConnectorScheduling::Immediate; }, ApplicationSyncAction::Get(port_id) => { let port_id = *port_id; - let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); - if consistency == Consistency::Valid { - branch.sync_state = SpeculativeState::HaltedAtBranchPoint; - branch.awaiting_port = port_id; - self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); - - let mut any_message_received = false; - for message in comp_ctx.get_read_data_messages(port_id) { - if self.consensus.branch_can_receive(branch_id, &message) { - // This branch can receive the message, so we do the - // fork-and-receive dance - let receiving_branch_id = self.tree.fork_branch(branch_id); - let branch = &mut self.tree[receiving_branch_id]; - debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len()); - self.branch_extra.push(instruction_idx + 1); - - branch.insert_message(port_id, message.content.as_message().unwrap().clone()); - - self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); - self.consensus.notify_of_received_message(receiving_branch_id, &message); - self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); - - any_message_received = true; - } - } - if any_message_received { - return ConnectorScheduling::Immediate; + branch.sync_state = SpeculativeState::HaltedAtBranchPoint; + branch.awaiting_port = port_id; + self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); + + let mut any_message_received = false; + for message in comp_ctx.get_read_data_messages(port_id) { + if self.consensus.branch_can_receive(branch_id, &message) { + // This branch can receive the message, so we do the + // fork-and-receive dance + let receiving_branch_id = self.tree.fork_branch(branch_id); + let branch = &mut self.tree[receiving_branch_id]; + debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len()); + self.branch_extra.push(instruction_idx + 1); + + 
branch.insert_message(port_id, message.content.as_message().unwrap().clone()); + + self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); + self.consensus.notify_of_received_message(receiving_branch_id, &message); + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); + + any_message_received = true; } - } else { - branch.sync_state = SpeculativeState::Inconsistent; + } + + if any_message_received { + return ConnectorScheduling::Immediate; } } } diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index bb3ec93c94100031abfca83c71057e88ebd8512a..092b65f090107f47fe0c48f9656a99c6f473e2e6 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -348,11 +348,11 @@ impl Scheduler { // TODO: Remove, this is debugging stuff fn debug(&self, message: &str) { - println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); + // println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); } fn debug_conn(&self, conn: ConnectorId, message: &str) { - println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message); + // println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message); } } diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index f206f105f1c8485276b6d69fd891a5337d2a413a..3c2ce3c1a3b75f637a20b7534adf698a2395cefd 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -9,12 +9,10 @@ use crate::common::Id; use crate::protocol::eval::*; use crate::runtime2::native::{ApplicationSyncAction}; -// - // Generic testing constants, use when appropriate to simplify stress-testing -pub(crate) const NUM_THREADS: u32 = 1; // number of threads in runtime -pub(crate) const NUM_INSTANCES: u32 = 1; // number of test instances constructed -pub(crate) const NUM_LOOPS: u32 = 1; // number of loops within a single test (not used by all tests) +pub(crate) const NUM_THREADS: u32 = 3; // number of threads in runtime +pub(crate) const 
NUM_INSTANCES: u32 = 7; // number of test instances constructed +pub(crate) const NUM_LOOPS: u32 = 8; // number of loops within a single test (not used by all tests) fn create_runtime(pdl: &str) -> Runtime { let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");