diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 419df819b60b039f6930dbfea3d6269311a82723..a79d9810109b7215372f2f19213c6546221829ec 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -61,6 +61,7 @@ pub(crate) struct Branch { code_state: ComponentState, prepared_channel: Option<(Value, Value)>, sync_state: SpeculativeState, + halted_at_port: PortIdLocal, // invalid if not halted next_branch_in_queue: Option, // Message/port state received: HashMap, // TODO: @temporary, remove together with fires() @@ -77,6 +78,7 @@ impl Branch { code_state: component_state, prepared_channel: None, sync_state: SpeculativeState::RunningNonSync, + halted_at_port: PortIdLocal::new_invalid(), next_branch_in_queue: None, received: HashMap::new(), ports_delta: Vec::new(), @@ -98,6 +100,7 @@ impl Branch { code_state: parent_branch.code_state.clone(), prepared_channel: None, sync_state: SpeculativeState::RunningInSync, + halted_at_port: PortIdLocal::new_invalid(), next_branch_in_queue: None, received: parent_branch.received.clone(), ports_delta: parent_branch.ports_delta.clone(), @@ -105,7 +108,13 @@ impl Branch { } fn commit_to_sync(&mut self) { - self.index = BranchId::new(0); + // Logically impossible conditions (because we have a finished branch + // we are going to commit to) + debug_assert!(self.prepared_channel.is_none()); + debug_assert!(!self.halted_at_port.is_valid()); + + // Reset other variables to their defaults + self.index = BranchId::new_invalid(); self.parent_index = BranchId::new_invalid(); self.sync_state = SpeculativeState::RunningNonSync; self.next_branch_in_queue = None; @@ -404,6 +413,38 @@ impl Connector for ConnectorPDL { fn run(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { if self.in_sync { + // Check for new messages we haven't seen before. If any of the + // pending branches can accept the message, do so. + while let Some((target_port_id, message)) = self.inbox.next_message() { + let mut branch_idx = self.sync_pending_get.first; + while branch_idx != 0 { + let branch = &self.branches[branch_idx as usize]; + let next_branch_idx = branch.next_branch_in_queue.unwrap_or(0); + + let target_port_index = self.ports.get_port_index(*target_port_id).unwrap(); + let port_mapping = self.ports.get_port(branch_idx, target_port_index); + + if branch.sync_state == SpeculativeState::HaltedAtBranchPoint && + branch.halted_at_port == *target_port_id && + port_mapping.last_registered_branch_id == message.sender_prev_branch_id { + // Branch may accept this mesage, so create a fork that + // contains this message in the inbox. + let new_branch_idx = self.branches.len() as u32; + let new_branch = Branch::new_sync_branching_from(new_branch_idx, branch); + + self.ports.prepare_sync_branch(branch_idx, new_branch_idx); + let mapping = self.ports.get_port_mut(branch_idx, target_port_index); + mapping.last_registered_branch_id = message.sender_cur_branch_id; + + let new_branch_id = BranchId::new(new_branch_idx); + self.branches.push(new_branch); + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id) + } + + branch_idx = next_branch_idx; + } + } + let scheduling = self.run_in_speculative_mode(sched_ctx, conn_ctx, delta_state); // When in speculative mode we might have generated new sync @@ -486,7 +527,6 @@ impl ConnectorPDL { // Handling connector messages // ------------------------------------------------------------------------- - #[inline] pub fn handle_data_message(&mut self, target_port: PortIdLocal, message: DataMessage) { self.inbox.insert_message(target_port, message); } @@ -819,6 +859,7 @@ impl ConnectorPDL { if is_valid_get { // Mark as a branching point for future messages branch.sync_state = SpeculativeState::HaltedAtBranchPoint; + branch.halted_at_port = local_port_id; let branch_id = branch.index; Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch_id);