Hi! Here is a simplified version of my code. I really cannot make it shorter
. I have tried running it in my computer and it works. The structure of the code is the following:
- Imports.
- Setup of the Hamiltonian I use for testing.
- Utility functions: to define measurement schemes, compute classical shadows based on those schemes, and estimate the expectation value of a given Hamiltonian.
- Device configuration.
- Plotting results.
If you use verbose = True
in the function shadow_grouping_estimation_rmsd
, you will see the time it takes for some steps, and you will be able to verify that calculate_classical_shadow
doesn’t scale well with larger measurement budgets. Note also that the function get_measurement_scheme
also takes some time, but you should not worry about it, since it will typically be done once for a given Hamiltonian and then loaded from a file.
Also, when I used qml.QuantumTape
, the code was definitely faster, but the RMSDs did not decrease with the number of shots, which makes no sense! Maybe I was using them wrongly… It seemed as if using qml.QuantumTape
did not properly construct the classical shadow…
The code is commented for clarity, but if anything’s unclear or you have further questions, feel free to ask!
### Imports
import pennylane as qml
from pennylane import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
from datetime import datetime
### Set up of the Hamiltonian
C1 = qml.PauliX(1) @ qml.PauliZ(2) + qml.PauliX(0) + qml.PauliX(0) @ qml.PauliZ(2)
C2 = qml.PauliX(0) @ qml.PauliZ(1) + qml.PauliZ(1)
C3 = qml.PauliX(0) @ qml.PauliY(1) @ qml.PauliX(2) + qml.PauliX(0) @ qml.PauliX(2)
terms = [*C1, *C2, *C3]
np.random.seed(66)
n_qubits = 3
coeffs = np.round(np.random.rand(len(terms)), 2)
H = qml.Hamiltonian(coeffs, terms, grouping_type="qwc")
### Important Functions
def format_pauli_string(op, n_qubits):
"""Format Pauli string as integers. E.g. X0 @ Z1 @ I2 -> [1, 3, 0]
Args:
op (qml.PauliX @ qml.PauliZ @ qml.Identity): Pauli string to be formatted.
n_qubits (int): number of qubits in the system.
Returns:
np.array: array of integers representing the Pauli string.
"""
pauli_map = {
qml.PauliX: 1,
qml.PauliY: 2,
qml.PauliZ: 3,
qml.Identity: 0,
}
pauli_array = np.zeros(n_qubits, dtype=int)
# check if term is product of Paulis
if isinstance(op, qml.ops.Prod):
for sub_op in op:
wire = sub_op.wires[0]
pauli_array[wire] = pauli_map.get(type(sub_op), 0)
# elif isinstance(op, qml.Identity):
# pass # already all zeros
# check if term is single Pauli operator
else:
wire = op.wires[0]
pauli_array[wire] = pauli_map.get(type(op), 0)
return pauli_array
def are_pauli_words_qwc(obs1, obs2):
"""Check qubit-wise commutativity between two Pauli strings in integer format.
Args:
pauli_a (np.ndarray): Array of integers representing Pauli string A.
pauli_b (np.ndarray): Array of integers representing Pauli string B.
Returns:
bool: True if A and B are QWC, False otherwise.
"""
for a, b in zip(obs1, obs2):
if a == 0 or b == 0:
continue # identity always commutes
elif a != b:
return False
return True
def format_hamiltonian(H):
"""Format Hamiltonian as a list of Pauli strings in integer format.
Args:
H (qml.Hamiltonian): Hamiltonian to be formatted.
n_qubits (int): number of qubits in the system.
Returns:
tuple (n_qubits, formatted Hamiltonian): Formatted Hamiltonian is an array representing the Hamiltonian. Each row is a Pauli string.
"""
n_qubits = len(H.wires)
ham = np.zeros((len(H.ops), n_qubits), dtype=int)
for i in range(len(H.ops)):
ham[i] = format_pauli_string(H.ops[i], n_qubits)
return H.coeffs, ham
def get_measurement_scheme(Ham, n_shots, return_weights=False, return_compat_obs=False):
"""
Function to obtain the measurement scheme for the Hamiltonian H.
The measurement scheme is obtained by first ordering the terms in H
by their weight and then iteratively replacing the Identity
operators in the measurement setting with the terms of the Hamiltonian
that are compatible with the measurement setting.
Args:
Ham (tuple): (coeffs, Pauli strings as int arrays), output of format_hamiltonian().
n_shots (int): Number of measurement settings (rounds) to generate.
return_weights (bool): If True, return weight history for each term.
return_compat_obs (bool): If True, return observables compatible with each setting.
Returns:
meas_scheme: Array of integer-encoded measurement settings.
Optionally: weights_tracker, obs_compat.
"""
coeffs, H_ops = Ham
n_terms, n_qubits = H_ops.shape
now = datetime.now()
meas_scheme = np.zeros((n_shots, n_qubits), dtype=int) # Measurement settings
weights_tracker = np.zeros((n_shots, n_terms)) if return_weights else None
N = np.ones(n_terms, dtype=int) # QWC counts per observable (start at 1 to avoid division by 0)
for i in range(n_shots):
# Efficient weight calculation using closed formula
weights = np.abs(coeffs) * (np.sqrt(N + 1) - np.sqrt(N)) / np.sqrt(N * (N + 1))
if return_weights:
weights_tracker[i] = weights
# Sort terms by descending weight
sorted_indices = np.argsort(weights)[::-1]
sorted_terms = H_ops[sorted_indices]
Q = np.zeros(n_qubits, dtype=int) # Start with identity on all qubits
n_changes = 0
# Fill in Q by trying to add high-weight compatible terms
for term in sorted_terms:
# Check if we have already filled all qubits
if n_changes >= n_qubits:
break
if are_pauli_words_qwc(Q, term):
# Substitute Pauli operators into available identity slots
for j in range(n_qubits):
# if Q is identity on qubit j and the term is non-identity on qubit j, replace Q_j with term_j
if Q[j] == 0 and term[j] != 0:
Q[j] = term[j]
n_changes += 1
if n_changes >= n_qubits:
break
# Store the new measurement setting
meas_scheme[i] = Q
# Update QWC compatibility counts for each term
for t in range(n_terms):
if are_pauli_words_qwc(H_ops[t], Q):
N[t] += 1
# Optional: build list of compatible measurement settings per observable
# Get, for each term in H, the indexes in meas_scheme of the settings that QWC with that observable
obs_compat = None
if return_compat_obs:
obs_compat = np.empty(n_terms, dtype=object)
for t in range(n_terms):
compat_idxs = np.array(
[i for i, Q in enumerate(meas_scheme) if are_pauli_words_qwc(H_ops[t], Q)]
)
obs_compat[t] = compat_idxs
finish = datetime.now()
print(f"Shots: {n_shots} \t Time for measurement scheme: {(finish - now).total_seconds()} s")
# Return according to flags
if return_weights:
if return_compat_obs:
return meas_scheme, weights_tracker, obs_compat
else:
return meas_scheme, weights_tracker
elif return_compat_obs:
return meas_scheme, obs_compat
else:
return meas_scheme
def calculate_classical_shadow(circuit_template, meas_scheme):
"""
Function to calculate the classical shadow of a quantum circuit.
Args:
circuit_template (function): circuit that prepares the quantum state.
meas_scheme (list): measurement scheme.
shadow_size (int): number of shots to be used in the measurement scheme.
n_qubits (int): number of qubits in the circuit.
Returns:
array: classical shadow of the quantum circuit. (outcomes, recipes)
"""
# use mapping to convert Pauli operators to integers
pauli_map = {0: qml.Identity, 1: qml.PauliX, 2: qml.PauliY, 3: qml.PauliZ}
shadow_size, n_qubits = meas_scheme.shape
outcomes = np.zeros((shadow_size, n_qubits), dtype=int)
# for each measurement setting in the measurement scheme, calculate the expectation value of the Pauli operators provided by the measurement setting
for snapshot in range(shadow_size):
obs = [pauli_map[int(meas_scheme[snapshot, i])](i) for i in range(n_qubits)]
outcomes[snapshot] = circuit_template(obs)
return (outcomes, meas_scheme)
def estimate_hamiltonian(Ham, shadow, obs_compat, non_idle_wires):
"""Function to estimate the expectation value of a Hamiltonian given its Pauli decomposition and a classical shadow obtained from a measurement scheme.
The expectation value is calculated using formula (12) from the original paper.
Args:
H (qml.Hamiltonian): Hamiltonian to estimate
shadow (tuple): classical shadow obtained from a measurement scheme.
Returns:
float: estimation of expected value of the Hamiltonian.
"""
coeffs, H = Ham
outcomes = shadow[0] # shape: (n_shots, n_qubits)
estimates = np.zeros(len(H)) # Store one estimate per term in the Hamiltonian
for i, (compat, wires) in enumerate(zip(obs_compat, non_idle_wires)):
# Skip if no compatible measurements were found for this observable
if len(compat) == 0:
continue
# Extract the subset of outcomes for compatible measurement settings and only on the relevant qubits (non-identity positions)
compat_outcomes = outcomes[compat][:, wires]
# Estimate the expected value of O^(i) as the mean of the product of outcome bits across qubits
estimates[i] = np.mean(np.prod(compat_outcomes, axis=1))
# Return the full Hamiltonian expectation value
return np.dot(coeffs, estimates)
def shadow_grouping_estimation_rmsd(Ham, qnode, number_of_runs, n_shotss, ideal_H, write_to_file=True, folder_name="Data", offset=0, verbose=False, k=10):
coeffs, H = format_hamiltonian(Ham)
n_terms, n_qubits = H.shape
# distances = np.zeros((number_of_runs, len(n_shotss)))
rmsds = []
# rmsds_std_devs = np.zeros(len(n_shotss))
# Precompute non-idle wires: for each observable O^(i) in the Hamiltonian decomposition, obtain the wires on which O^(i)_j != Id(wire=j)
non_idle_wires = [np.nonzero(H[i])[0] for i in range(n_terms)]
# Compute full measurement scheme and compatibility map for max shots.
# For each term in H, return which measurement settings (their indices in complete_meas_scheme) QWC with it
max_shots = n_shotss[-1]
complete_meas_scheme, complete_obs_compat = get_measurement_scheme((coeffs, H), max_shots, return_compat_obs=True)
# Populate the table with data
if verbose:
print(f"{'n_shots':20} {'time/run (s)':20} {'total_time (s)':25}")
for i, n_shots in enumerate(n_shotss):
meas_scheme = complete_meas_scheme[:n_shotss[i]]
obs_compat = [[idx for idx in compat if idx < n_shots] for compat in complete_obs_compat]
# get Paulis for each measurement setting in the measurement scheme
start_time = datetime.now()
# estimates of H for each run
for _ in range(number_of_runs):
shadow_time = datetime.now()
# calculate classical shadow using the measurement scheme
shadow = calculate_classical_shadow(qnode, meas_scheme)
if verbose:
print(f"\t Shadow {_:<20} {(datetime.now() - shadow_time).total_seconds():<20.4f}")
# estimate the (exp_val of the) Hamiltonian by estimating (the exp_val of) each observable O^(i) in the decomposition of the Hamiltonian
estimate_time = datetime.now()
sg_estimate = estimate_hamiltonian((coeffs, H), shadow, obs_compat, non_idle_wires) + offset
if verbose:
print(f"\t Estimate {_:<20} {(datetime.now() - estimate_time).total_seconds():<20.4f}")
rmsds.append(rmsd(sg_estimate, ideal_H))
total_time = (datetime.now() - start_time).total_seconds()
# time_per_run = total_time / number_of_runs
if verbose:
print(f"{n_shots:<20} {total_time / number_of_runs:<20.4f} {total_time:<25.4f}")
rmsds_arr = np.array(rmsds).reshape(len(n_shotss), number_of_runs)
means, std_dev = np.mean(rmsds_arr, axis=1), np.std(rmsds_arr, axis=1)
if write_to_file:
data = np.column_stack((n_shotss, means, std_dev))
# os.makedirs(folder_name, exist_ok=True)
np.savetxt(f"{folder_name}/rmsds_{n_shotss[0]}_{n_shotss[-1]}_opt.csv", data, delimiter=",", header="shots, rmsd, std_dev", comments='')
return means, std_dev
def rmsd(x, y):
"""root mean square difference"""
return np.sqrt(np.mean((x - y)**2))
### Device configuration
dev_ideal = qml.device("default.qubit", wires=n_qubits, shots=None)
dev = qml.device("lightning.qubit", wires=n_qubits, shots=1)
# circuit template to prepare the state
x = np.arange(2*n_qubits, dtype="float64")
def circuit():
for i in range(n_qubits):
qml.RY(x[i], wires=i)
for i in range(n_qubits-1):
qml.CNOT(wires=[i, i+1])
for i in range(n_qubits):
qml.RY(x[i+n_qubits], wires=i)
# ideal device to calculate the exact expectation value of the Hamiltonian
@qml.qnode(dev_ideal)
def qnode_ideal():
circuit()
return qml.expval(H)
ideal_H = qnode_ideal()
print("Ideal H =", np.round(ideal_H, 5))
# circuit to prepare state
@qml.qnode(dev)
def qnode(observables):
circuit()
return [qml.expval(obs) for obs in observables]
### Run algorithm and plot results
n_shotss = (
list(range(10, 110, 10)) +
list(range(200, 1100, 100)) +
list(range(2000, 11000, 1000))
)
number_of_runs = 10
means, std_dev = shadow_grouping_estimation_rmsd(H, qnode, number_of_runs, n_shotss, ideal_H, write_to_file=False, folder_name="Data", offset=0, verbose=True)
plt.rcParams['font.size'] = 14
plt.rcParams['axes.labelsize'] = 16
plt.rcParams['axes.titlesize'] = 18
plt.rcParams['xtick.labelsize'] = 14
plt.rcParams['ytick.labelsize'] = 14
# Create figure
fig, ax = plt.subplots(figsize=(8,6))
# Plot with error bars
ax.errorbar(n_shotss, means, yerr=std_dev, fmt='o-', color="teal", ecolor='lightgray', elinewidth=2, capsize=4, markersize=5)
# Titles and labels
ax.set_title(f"RMSD for different number of shots ({number_of_runs} runs for each)", pad=15)
ax.set_xlabel("Shots", labelpad=10)
ax.set_ylabel("RMSD", labelpad=10)
ax.set_xscale("log")
# Tight layout
plt.tight_layout()
# Show plot
plt.show()