diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index e17481fd34027ccea8b01922c9bf820b8c76e02c..cb02d7df66e4af31130a3d9667dc1a8ebb237254 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -183,54 +183,46 @@ impl ConnectorApplication { match &cur_instruction { ApplicationSyncAction::Put(port_id, content) => { let port_id = *port_id; - let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); - if consistency == Consistency::Valid { - let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); - let message = Message::Data(DataMessage { - sync_header, - data_header, - content: DataContent::Message(content.clone()), - }); - comp_ctx.submit_message(message); - self.tree.push_into_queue(QueueKind::Runnable, branch_id); - return ConnectorScheduling::Immediate; - } else { - branch.sync_state = SpeculativeState::Inconsistent; - } + + let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); + let message = Message::Data(DataMessage { + sync_header, + data_header, + content: DataContent::Message(content.clone()), + }); + comp_ctx.submit_message(message); + self.tree.push_into_queue(QueueKind::Runnable, branch_id); + return ConnectorScheduling::Immediate; }, ApplicationSyncAction::Get(port_id) => { let port_id = *port_id; - let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); - if consistency == Consistency::Valid { - branch.sync_state = SpeculativeState::HaltedAtBranchPoint; - branch.awaiting_port = port_id; - self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); - - let mut any_message_received = false; - for message in comp_ctx.get_read_data_messages(port_id) { - if self.consensus.branch_can_receive(branch_id, &message) { - // This branch can receive the message, so we do the - // fork-and-receive dance - let receiving_branch_id = self.tree.fork_branch(branch_id); - let branch = &mut self.tree[receiving_branch_id]; - debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len()); - self.branch_extra.push(instruction_idx + 1); - - branch.insert_message(port_id, message.content.as_message().unwrap().clone()); - - self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); - self.consensus.notify_of_received_message(receiving_branch_id, &message); - self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); - - any_message_received = true; - } - } - if any_message_received { - return ConnectorScheduling::Immediate; + branch.sync_state = SpeculativeState::HaltedAtBranchPoint; + branch.awaiting_port = port_id; + self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); + + let mut any_message_received = false; + for message in comp_ctx.get_read_data_messages(port_id) { + if self.consensus.branch_can_receive(branch_id, &message) { + // This branch can receive the message, so we do the + // fork-and-receive dance + let receiving_branch_id = self.tree.fork_branch(branch_id); + let branch = &mut self.tree[receiving_branch_id]; + debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len()); + self.branch_extra.push(instruction_idx + 1); + + branch.insert_message(port_id, message.content.as_message().unwrap().clone()); + + self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); + self.consensus.notify_of_received_message(receiving_branch_id, &message); + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); + + any_message_received = true; } - } else { - branch.sync_state = SpeculativeState::Inconsistent; + } + + if any_message_received { + return ConnectorScheduling::Immediate; } } }