Define a custom ExpectationMP not working

I am trying to define a my own ExpectationMP and get a different energy estimation, instead of taking all states measured I want to take the k-most significant states. I tried to define my own expectation class and it did not work.

def find_max_diff(X_hist):
    max_diff = 0
    saved_k = 0
    X_hist = np.sort(X_hist)[::-1]
    for k in range(len(X_hist) - 1):
        if X_hist[k] - X_hist[k + 1] >= max_diff:
            max_diff = X_hist[k] - X_hist[k + 1]
            saved_k = k
    return X_hist[saved_k], X_hist[saved_k + 1]

def get_top_k_probs(X_hist):
    x_k, x_k_next = find_max_diff(X_hist)
    n = 0
    top_k_hist = [0] * len(X_hist)
    for i,v in enumerate(X_hist):
        if v >= x_k:
            top_k_hist[i] = v
            n += v
    print(f'top_k_hist: {top_k_hist}')
    top_k_probs = [v / n for v in top_k_hist]
    print(f'top_k_probs: {top_k_probs}')
    return top_k_probs

def estimate_top_k_energy(H, X_hist):
    top_k_probs = get_top_k_probs(X_hist)
    return qml.math.dot(top_k_probs, qml.SparseHamiltonian(H.sparse_matrix(), cost_h.wires).eigvals().real)

class TopKEnergyEstimation(ExpectationMP):
    def __init__(self, op):
        super().__init__(op)

    def process_counts(self, counts, wire_order):
        X_hist = np.asarray(list(counts.values()))
        return estimate_top_k_energy(self.obs, X_hist)

cost_h, mixer_h = qml.qaoa.cost.maxcut(G)

def circuit_wrapper(dev):
    @qml.qnode(dev)
    def circuit(params):
        for w in dev.wires:
            qml.Hadamard(wires=w)
        # params[0] is gamma values for all layers
        # params[1] is beta values for all layers
        qml.layer(qaoa_layer, len(params[0]), params[0], params[1], cost_h=cost_h, mixer_h=mixer_h)
        return TopKEnergyEstimation(cost_h)

    return circuit

And, finally, make sure to include the versions of your packages. Specifically, show us the output of qml.about().

Platform info:           Windows-10-10.0.19045-SP0
Python version:          3.9.12
Numpy version:           1.23.0
Scipy version:           1.8.0
Installed devices:
- default.clifford (PennyLane-0.36.0)
- default.gaussian (PennyLane-0.36.0)
- default.mixed (PennyLane-0.36.0)
- default.qubit (PennyLane-0.36.0)
- default.qubit.autograd (PennyLane-0.36.0)
- default.qubit.jax (PennyLane-0.36.0)
- default.qubit.legacy (PennyLane-0.36.0)
- default.qubit.tf (PennyLane-0.36.0)
- default.qubit.torch (PennyLane-0.36.0)
- default.qutrit (PennyLane-0.36.0)
- default.qutrit.mixed (PennyLane-0.36.0)
- null.qubit (PennyLane-0.36.0)
- lightning.qubit (PennyLane-Lightning-0.36.0)
- braket.aws.qubit (amazon-braket-pennylane-plugin-1.13.1)
- braket.local.qubit (amazon-braket-pennylane-plugin-1.13.1)

Hi @yokkon ,

Could you please provide the following information? It can help us understand the problem and find possible solutions:

  1. A minimal (but self-contained) working example.
    This is the simplest version of the code that reproduces the problem. It should include all necessary imports, data, functions, etc., so that we can copy-paste the code and reproduce the problem. However it shouldn’t contain any unnecessary data, functions, etc., for example gates and functions that can be removed to simplify the code.

  2. The full error traceback

If you’re not sure about these two points then make sure to check out this video which can help you.

With the information you provided so far my best guess is that you need to modify process_counts in a different way. Here’s a Minimal Working example that you can use as a base.

import pennylane as qml
from pennylane.measurements import ExpectationMP
import numpy as np

class TopKEnergyEstimation(ExpectationMP):
    def __init__(self, op):
        super().__init__(op)

    def process_counts(self, counts, wire_order):
        with qml.QueuingManager.stop_recording():
            # Add a subprocess here to get only the counts that you want
            # Eg: counts=counts[0:2]
            probs = qml.probs(wires=self.wires).process_counts(counts=counts, wire_order=wire_order)
        return self._calculate_expectation(probs)

dev = qml.device('default.qubit',wires=3)

@qml.qnode(dev)
def circuit():
  for w in dev.wires:
            qml.Hadamard(wires=w)

  return TopKEnergyEstimation(qml.PauliZ(0))

counts = {"000": 100, "100": 100}

wire_order = qml.wires.Wires((0, 1, 2))

res = qml.expval(qml.Z(0)).process_counts(counts, wire_order)
print(res)

I look forward to seeing your answers!

Hey Catalina,
Thank you for the quick response.
There is no error shown but the new TopKEnergyEstimation(ExpectationMP) measurement process I created is not being called.
I tried the solution you have shown here but it also does not work - I mean that instead of the TopKEnergyEstimation being called the regular expectation is returned from running the circuit.

When you run the circuit it calls some procedure of the return value which is a measurement process, I don’t know which method is being called but it is not process_counts so the question is how can I use the circuits such that it will call process_counts on the TopKEnergyEstimation?

Hi @CatalinaAlbornoz,

I think I found a solution. let me know what you think about what I did and whether it is a good practice?

from typing import Sequence, Tuple, Union
from pennylane.wires import Wires

class TopKEnergyEstimation(SampleMeasurement):
    def __init__(self, op):
        super().__init__(op)

    def process_samples(
        self,
        samples: Sequence[complex],
        wire_order: Wires,
        shot_range: Tuple[int] = None,
        bin_size: int = None,
    ):
        with qml.QueuingManager.stop_recording():
            counts = qml.counts(wires=self.wires).process_samples(samples, wire_order=wire_order)
            counts = self._counts_to_topk_counts(counts)
            probs = qml.probs(wires=self.wires).process_counts(counts=counts, wire_order=wire_order)
        return self._calculate_expectation(probs)

    def process_counts(self, counts, wire_order):
        with qml.QueuingManager.stop_recording():
            counts = self._counts_to_topk_counts(counts)
            probs = qml.probs(wires=self.wires).process_counts(counts=counts, wire_order=wire_order)
        
        return self._calculate_expectation(probs)
    
    def _calculate_expectation(self, probabilities):
        """
        Calculate the of expectation set of probabilities.

        Args:
            probabilities (array): the probabilities of collapsing to eigen states
        """
        eigvals = qml.math.asarray(self.eigvals(), dtype="float64")
        return qml.math.dot(probabilities, eigvals)

    def _counts_to_topk_counts(self, counts):
        """
        Find only the Top-K most sampled states measured

        Args:
            counts (dict): the counts of the sampled states
        """
        sorted_counts = sorted(counts.items(), key=lambda x:x[1], reverse=True)
        top_k = self._find_max_diff(sorted_counts)
        topk_counts = {sorted_counts[k][0]:sorted_counts[k][1] for k in range(top_k + 1)}
        print(f'topk_counts: {topk_counts}')
        return topk_counts

    def _find_max_diff(self, sorted_counts):
        max_diff = 0
        saved_k = 0
        for k in range(len(sorted_counts) - 1):
            if sorted_counts[k][1] - sorted_counts[k + 1][1] >= max_diff:
                max_diff = sorted_counts[k][1] - sorted_counts[k + 1][1]
                saved_k = k
        return saved_k

Hi @yokkon ,

You’re right my code wasn’t using the new function.

In your code can you please post the code where you use this class? In the code you shared I can only see how you’re defining the class but not using it. I’m also not very clear on what your main goal is. Would you be able to explain it in words so that I can see if there’s maybe an easier solution?

For example, are you using an optimization routine on a circuit and need the output to be a single expectation value?

Thanks!

I am trying to replace the qml.expval with a different measurement process that returns the energy estimation from only the top-k most sampled states, instead of the whole spectrum of states that were measured by the device.
Eventually I use the returned measurement as objective function for a gradient descent optimizer that uses autograd.

@dataclass
class MaxCutGraph:
    G: nx.Graph

def get_qaoa_topk_circuit_wrapper_according_to_graph(graph):
    cost_h, mixer_h = qml.qaoa.cost.maxcut(graph.G)
    nodes = graph.G.nodes

    def circuit_wrapper(dev):
        @qml.qnode(dev)
        def circuit(params):
            for w in nodes:
                qml.Hadamard(wires=w)
            # params[0] is gamma values for all layers
            # params[1] is beta values for all layers
            assert(len(params[0]) == len(params[1]))
            qml.layer(qaoa_layer, len(params[0]), params[0], params[1], cost_h=cost_h, mixer_h=mixer_h)
            return TopKEnergyEstimation(cost_h)

        return circuit
    return circuit_wrapper

Thanks for the details @yokkon !

Let me see how to solve this. I’ll get back to you in the next couple of days.