diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index c6e919b4198fcbf0372ed9f3d74f12421fcf010f..943e149661563948bd38169e4e07b8c0ddd74a11 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -145,7 +145,10 @@ impl ConnectorPDL { // there is one that can receive this message. debug_assert!(ctx.workspace_branches.is_empty()); let mut branches = Vec::new(); // TODO: @Remove - self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches); + if !self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches) { + // Old message, so drop it + return; + } for branch_id in branches.drain(..) { // This branch can receive, so fork and given it the message @@ -154,7 +157,7 @@ impl ConnectorPDL { let receiving_branch = &mut self.tree[receiving_branch_id]; receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); - self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content); + self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); // And prepare the branch for running self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); @@ -238,7 +241,7 @@ impl ConnectorPDL { // a message that targets this branch, so check now. let mut any_branch_received = false; for message in comp_ctx.get_read_data_messages(port_id) { - if self.consensus.branch_can_receive(branch_id, &message.data_header, &message.content) { + if self.consensus.branch_can_receive(branch_id, &message.sync_header, &message.data_header, &message.content) { // This branch can receive the message, so we do the // fork-and-receive dance let receiving_branch_id = self.tree.fork_branch(branch_id); @@ -247,7 +250,7 @@ impl ConnectorPDL { branch.insert_message(port_id, message.content.as_message().unwrap().clone()); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); - self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content); + self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); any_branch_received = true;