diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index c96f4d21929db539d7f77c8ff70c5cc614c847bc..d0c89f190b75a73f8a246bb8d4fc762a8c864d24 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -695,7 +695,7 @@ impl Connector { } // No errors! Time to modify `cu` // create a new component and identifier - let cu = &mut self.unphased; + let Connector { phased, unphased: cu } = self; let new_cid = cu.ips.id_manager.new_component_id(); cu.proto_components.insert(new_cid, cu.proto_description.new_component(identifier, ports)); // update the ownership of moved ports @@ -708,7 +708,16 @@ impl Connector { if let Some(set) = cu.ips.port_info.owned.get_mut(&cu.native_component_id) { set.retain(|x| !ports.contains(x)); } - cu.ips.port_info.owned.insert(new_cid, ports.iter().copied().collect()); + let moved_port_set: HashSet = ports.iter().copied().collect(); + if let ConnectorPhased::Communication(comm) = phased { + // Preserve invariant: batches only reason about native's ports. + // Remove batch puts/gets for moved ports. + for batch in comm.native_batches.iter_mut() { + batch.to_put.retain(|port, _| !moved_port_set.contains(port)); + batch.to_get.retain(|port| !moved_port_set.contains(port)); + } + } + cu.ips.port_info.owned.insert(new_cid, moved_port_set); Ok(()) } }