How to use pennylane-tensorflow model with noisy circuits and qml.sample()?

Hello, I’ve been working with pennylane and the tensorflow interface to build a hybrid model (with batched inputs). I’ve started with noiselless simulations with qml.probs() as “measurement” output and default.qubit device. This worked fine. Now I would like to test the model with the influence of noise. I would like to use default.mixed device and qml.sample() as measurement type. The resulting shot outputs I want to use to caculate the probability for each basis state. The problem is that tensorflow can’t work with the qml.sample output in the Qnode. I did write a wrappr around the qnode to calculate the probabilities directly as output of the Qnode. In this way forward pass is working correctly, but backpropagation with gradint calculation does not.

How can I build a working hybrid model with noise impact and qml.sample ?

System:
Python 3.11.4

Package infomation:
ipykernel
matplotlib==3.7.2
tensorflow==2.13.0
pennylane==0.31.0
pydot==1.4.2
graphviz==0.20.1
pandas==2.0.3
ipywidgets==8.0.7
wrds==3.1.6
swig==4.1.1
yfinance==0.2.26
stockstats==0.6.0
exchange_calendars==4.2.8
plotly==5.16.1
nbformat>=4.2.0

Hey @AnniZa! Welcome to the forum :rocket:

Can you share a minimal code example that replicates the issue you’re facing? That’ll help me try and figure out a solution for you :slight_smile:

Hi @isaacdevlugt,
thank you for the fast reply.

First I did change a small bug in pennylane 0.31.0 to work with tensorflow 2.13.0.
In the new tf version the function should_record_backprop() had changed the location from tensorflow\python\eager\tape.py to tensorflow\python\eager\record.py so I did change the calls in pennylane\math\utils.py to the new reference.
Is this fixed in newer versions of pennylane?

Now to the code:
1.) In the init method of my class: def init(self, shots, L) :

# Define the device for pennylane (circuit) (default.mixed -> mixed state simulator)
dev = qml.device('default.mixed', wires = n_qubits, shots = shots)
self.dev = dev
# Create the pennylane circuit
self.circ = Class.create_circuit(self,dev)                                             
# 2.) Build the keras model (tensorflow)
self.model = Class.create_model(self, L)

2.) create_circuit(self,dev) :

def create_circuit(self, dev):
      @qml.qnode(dev, diff_method="best", interface='tf', max_expansion=2)
      def quantum_circuit(inputs, angles):
          L = self.L
          n_qubits = self.n_qubits
          new_angles=Class.preprocess_angles(angles)
          new_inputs=Class.preprocess_inputs(inputs)
          #Apply the hadamard gates
          Class.hadamard(self)
          #Loop for each layer l
          for l in range(0,L):
                Class.layer1(self, new_angles[l])                   
                Class.layer2(self)                    
                Class.layer3(self, new_inputs[l])
                Class.depolarizing_noise(self, self.p)

          # Measurement (shots) and calculation of probabilities for basis states
          result = qml.sample(wires=range(n_qubits))
          # We need to flatten the tensor to [shot1,shot2,shot3...] for keras
          return result                     #returns the raw samples of the measurements (array for each qubit and shot) 

        # Wrap the qnode and retrieve reshaped ouput qnode 
        reshaped_circuit = Class.reshaped_qnode(quantum_circuit)
        return reshaped_circuit  

3.) My Wrapper to get a flattened tensor:

# Wrapper to get a new qnode with the correct dimension for the keras layer (batched)
    @staticmethod
    def reshaped_qnode(original_qnode):
        @wraps(original_qnode)
        def wrapper(*args, **kwargs):
            original_result = original_qnode(*args, **kwargs)
            reshaped_result = tf.reshape(original_result, [1,-1])       # here we need the first dimension to be the batch dim, the second returns a flattened tensor of the arays of measurement outcomes [shot1,shot2...]

4:) create_model(self, L) :

N=self.number_qubits
#Defining the keras layer
weight_shapes = {"angles": 3*N*L}  
specsdict = {"angles": {"initializer": tf.keras.initializers.RandomUniform(minval=-np.pi, maxval=np.pi, seed=self.seed)}} 
qlayer = qml.qnn.KerasLayer(self.circ, weight_shapes, input_dim = S, output_dim = self.shots*N, weight_specs = specsdict)                                                       
qlayer.output_dtype = tf.float32

#Defining the input
input = tf.keras.Input(shape=(S,))

#Defining the output
probability=qlayer(input)
probability= tf.cast(probability,dtype=tf.float32)

#Defining the keras model
model = tf.keras.Model(
inputs=[input],
       outputs=[probability])

return(model)

5.) Learning method:

def learn(self, batch_size):
    # Get the gradients from the update function (update())
    gradients = self.update(batch_size)
    model_gradients=gradients[1]          
    # Backpropagation: 
    self.optimizer_theta.apply_gradients(zip([gradients[0]], [self.model.trainable_variables[0]]))

    return(gradients)

6.) update(batch_size) :

@tf.function
def update(self, batch_size):
    with tf.GradientTape() as model_tape:
    memory_output = self.memory.sample_batch(batch_size)
    batch_states = memory_output[0]
    all_results  = self.forward_batch(self.model, batch_states)
    model_loss = loss_method(all_results)

    # Calculate gradients of model (list [grads theta, ...])
    model_gradients = model_tape.gradient(model_loss, self.model.trainable_variables)

    return(model_gradients) 

I hope that helps. So the issue is that if the return of the Qnode is qml.sample() it’s the return type “SampleMP” and tf can’t handle this type in the keras model. For this reason I tried to wrap it, but now I have issues with the calculation of the gradients in backpropagation.

Is there a way, so that I don’t need any workarounds as the wrapper?

Thanks for the help :slight_smile:

Hey @AnniZa, thank you for providing your code! I just want to be certain that I’m running the exact same thing that you are. Can you copy all of the code in one code block? Include every function definition, variable, etc., in one place :ok_hand:

Also, if you could show me the output of qml.about(), that would be awesome! :slight_smile:

Hello @isaacdevlugt , sorry that I needed some time.

My Code is quite complex and with other package dependencies so I created a simple version resulting in the same issue. And here is the output from qml.about():

Name: PennyLane
Version: 0.31.0
Summary: PennyLane is a Python quantum machine learning library by Xanadu Inc.
Home-page: https://github.com/PennyLaneAI/pennylane
Author: 
Author-email: 
License: Apache License 2.0
Location: ...
Requires: appdirs, autograd, autoray, cachetools, networkx, numpy, pennylane-lightning, requests, rustworkx, scipy, semantic-version, toml
Required-by: PennyLane-Lightning

Platform info:           Windows-10-10.0.19045-SP0
Python version:          3.11.4
Numpy version:           1.23.5
Scipy version:           1.10.0
Installed devices:
- default.gaussian (PennyLane-0.31.0)
- default.mixed (PennyLane-0.31.0)
- default.qubit (PennyLane-0.31.0)
- default.qubit.autograd (PennyLane-0.31.0)
- default.qubit.jax (PennyLane-0.31.0)
- default.qubit.tf (PennyLane-0.31.0)
- default.qubit.torch (PennyLane-0.31.0)
- default.qutrit (PennyLane-0.31.0)
- null.qubit (PennyLane-0.31.0)
- lightning.qubit (PennyLane-Lightning-0.31.0)

Code to reproduce:

# This code requires the following packages
import numpy as np
from numpy import pi

# Importing the frameworks of pennylane and tensorflow
import pennylane as qml
import tensorflow as tf

class NoisyModel(object):
    """Test Model"""
    
    def __init__(self, learning_rate_theta=0.1, n_qubits =2, n_inputs=8, shots = 1, p_noise = 0):                                          
        # Amount of noise (p=0:no noise, p=3/4: fully depolarizing channel, p=1: uniform Pauli error channel)
        self.p = p_noise                  # Depolarizing channel   
        self.shots = shots 
        self.optimizer_theta = tf.keras.optimizers.Adam(learning_rate = learning_rate_theta, amsgrad=True)
        # Number of qubits in the circuit
        self.n_qubits = n_qubits
        self.n_inputs = n_inputs

        # Device
        dev = qml.device('default.mixed', wires = n_qubits, shots = shots)
        self.dev = dev

        # Create the pennylane circuit
        self.circ = NoisyModel.create_circuit(self,dev) 

        # Build the keras model (tensorflow)
        self.model = NoisyModel.create_model(self)

        # Memory
        memory_size = int(1e6)
        self.memory = self.Memory(self.n_inputs, np.power(2,self.n_qubits), memory_size)

    def create_circuit(self, dev):
        @qml.qnode(dev, diff_method="best", interface='tf', max_expansion=2)
        def quantum_circuit(inputs, angles):
            NoisyModel.var_layer(self, angles)                                       
            #NoisyModel.input_layer(self, inputs)
            NoisyModel.depolarizing_noise(self, self.p)   

            result = qml.sample(wires=range(self.n_qubits))
            return result
        return quantum_circuit
    
    def create_model(self):
        weight_shapes = {"angles": 3*self.n_qubits}         
        specsdict = {"angles": {"initializer": tf.keras.initializers.RandomUniform(minval=-np.pi, maxval=np.pi)}}         
        qlayer = qml.qnn.KerasLayer(self.circ, weight_shapes, input_dim = self.n_inputs, output_dim = self.shots*self.n_qubits, weight_specs = specsdict)                                                       
    
        input = tf.keras.Input(shape=(self.n_inputs,))
        prob=qlayer(input)

        model = tf.keras.Model(
            inputs=[input],
            outputs=[prob],
            name="noisy_model"
        )
        return(model)
    
    def var_layer(self,angles):                                      #trainable parameters (angles)
        j=0
        for i in range(0,self.n_qubits):
            qml.RX(angles[j], wires=i)
            qml.RY(angles[j+1], wires=i)
            qml.RZ(angles[j+2], wires=i)
            j=j+3

    def depolarizing_noise(self,p):
        # p defines the ammount of noise in each layer (p)
        for i in range(self.n_qubits):
            qml.DepolarizingChannel(p, wires=i)  

    def input_layer(self, inputs):
        rotation_gates = [qml.RX, qml.RY, qml.RZ]
    
        # inputs.shape: (batch_size, n_features)
        batch_size, n_features = inputs.shape

        for batch_index in range(batch_size):
            for feature_index in range(n_features):
                gate = rotation_gates[feature_index // self.n_qubits % len(rotation_gates)]
                wire = feature_index % self.n_qubits
                gate(inputs[batch_index, feature_index], wires=wire)

    def sample(self, model, inputbatch):
        outputs = model(inputbatch)
        outputs = self.calculate_prob_from_shots(outputs)                
        probs = outputs/tf.reduce_sum(outputs, axis=1, keepdims=True)                                        
        return probs                      
    
    def forward(self,model,inputbatch):                                      
        #sample probabilities
        probs = self.sample(model, inputbatch)
        return(probs)
    
    def calculate_prob_from_shots(self, outcome):
        # Count the basis states in a dictionary
        reshaped_results = tf.reshape(outcome, (-1, self.shots, self.n_qubits))
        comp_basis_states = tf.constant([[0,0],[0,1],[1,0],[1,1]], dtype = tf.int64)
        comp_basis_states = tf.cast(comp_basis_states, dtype=tf.float32)

        num_reshaped_results = len(reshaped_results)
        all_results = tf.TensorArray(dtype=tf.float32, size = num_reshaped_results)
    
        # Loop over batches
        for j in tf.range(num_reshaped_results):
            batch_outcome = reshaped_results[j]
            batch_outcome = tf.cast(batch_outcome, dtype=tf.float32)
            num_comp_basis_states = len(comp_basis_states)
            result_probs = tf.TensorArray(dtype=tf.float32, size = num_comp_basis_states)

            # Check which basis states occured and get their counts
            for i in tf.range(num_comp_basis_states):
                basis_state = comp_basis_states[i]
                equaltens = tf.math.equal(batch_outcome, basis_state)
                matches = tf.reduce_all(equaltens, axis=1)

                # Count how often the state appears
                count = tf.reduce_sum(tf.cast(matches, tf.int32))
               
                # check if no match
                if tf.math.equal(count,0):
                    prob = tf.constant(0.0, dtype=tf.float32)
                else:
                    # Get the count value for this basis state and calc probability
                    tfcount = tf.cast(count,tf.float32)
                    tfshots = tf.constant(self.shots, dtype=tf.float32)
                    prob = tfcount/tfshots
                # Add the probability for the basis state. The policy tensor will have the same order of basis states as given in comp_basis_states
                result_probs = result_probs.write(i,prob)

            all_results = all_results.write(j,result_probs.stack())   
        
        return all_results.stack()  
    
    def learn(self, batch_size):

        # Get the gradients and loss from the update function
        gradients = self.update(batch_size)
        model_gradients=gradients[0]           #get gradients as list : [gradients array thetas]         

        # Apply the gradients
        self.optimizer_theta.apply_gradients(zip([model_gradients[0]], [self.model.trainable_variables[0]]))

        return(gradients)
    
    @tf.function
    def update(self, batch_size):
        
        with tf.GradientTape() as model_tape:
            buffer_output = self.memory.sample_batch(batch_size)
            inputs = buffer_output['input']
            # Get prob from model for batch of inputs
            all_new_probs  = self.forward(self.model, inputs[0])
            loss = -tf.reduce_mean(all_new_probs)

        model_gradients = model_tape.gradient(loss, self.model.trainable_variables)

        return(model_gradients)

    class Memory:
        def __init__(self, input_dim, probs_dim, size):
            self.inputs_buf = np.zeros([size, input_dim], dtype=np.float32)
            self.probs_buf = np.zeros([size, probs_dim], dtype=np.float32)
            self.ptr, self.size, self.max_size = 0, 0, size

        def store(self, inputs, probs):
            self.inputs_buf[self.ptr] = inputs
            self.probs_buf[self.ptr] = probs
            self.ptr = (self.ptr+1) % self.max_size
            self.size = min(self.size+1, self.max_size)

        def sample_batch(self, batch_size=32):
            idxs = np.random.randint(0, self.size, size=batch_size)
            return dict(input=self.inputs_buf[idxs],
                        probs=self.probs_buf[idxs])

def train_loop(noisy_model, train_rounds, batch_size, batchinput):
    for rounds in range(train_rounds):
        probs = noisy_model.forward(noisy_model.model,batchinput[0])
        noisy_model.memory.store(batchinput[0], probs)
        noisy_model.learn(batch_size)
    

# Script to run:
        
testmodel = NoisyModel()
#print(testmodel)
#print(testmodel.model)
inputs = np.random.random(size = (1,testmodel.n_inputs))
#print(inputs)
#print(inputs.shape[0]) 
train_loop(testmodel, 1, 1, inputs)   

# Or to test model directly:
#prob = testmodel.model(inputs)
#print(prob)

The problem is, that some internal calculation seem to not fit with the keras model. Either it has a missing required argument or reshape method does not work. I think the model input dimensions and output dimensions should be correct for the qml.sample() with the number of shots. If I reshape the result of the qnode in a wrapper like this: reshaped_result = tf.reshape(original_result, [1,-1]) it works for forward pass, but not for the gradients calculation.

Thank you really much for your help. I’m struggling with this for quite some time.

Hi @AnniZa,
Thank you for your question.
Just to be sure, is it not possible for you to upgrade to PennyLane version 0.34 which is the latest version?
I am seeing that the reshape issue might still exist in that version but it may be easier to find a fix.

Hi @AnniZa,

I hope you’ve found a solution to your problem. If you haven’t already then maybe the code below will help you. It uses ‘probs’ instead of ‘sample’ but from the description of your problem it seems like it’s what you’re looking for. Note that the code doesn’t really lead to good training. But it shows no errors so you can adapt it according to your needs and it can help you troubleshoot.

import pennylane as qml
import tensorflow as tf
tf.get_logger().setLevel('ERROR') # add this to ignore warnings


tf.keras.backend.set_floatx('float64') # add this to avoid issues

# Create a dataset
n_qubits = 2
n_datapoints = 3
feature_dim = 2 # feature_dim<=n_qubits
data = tf.zeros((n_datapoints, feature_dim))
labels = tf.constant([[1,0,0,0] for i in data]) # Change this if the number of qubits changes

# Create your device and QNode
dev = qml.device('default.mixed', wires=n_qubits, shots=None)

@qml.qnode(dev)
def quantum_circuit(inputs, weights):
    qml.AngleEmbedding(inputs, wires=range(feature_dim), rotation="X")
    for i in range(n_qubits):
        qml.RY(weights[i], wires=i)
    return qml.probs()

# Draw circuit
weights = tf.random.uniform(shape=[n_qubits])
print(qml.draw(quantum_circuit, decimals=1)(data, weights))

# The code below is largely based on this demo https://pennylane.ai/qml/demos/tutorial_qnn_module_tf/

# Create your Keras quantum layer
weight_shapes = {"weights": (n_qubits)}
qlayer = qml.qnn.KerasLayer(quantum_circuit, weight_shapes, output_dim=2**n_qubits)

# Create your classical layers and your model
inputs = tf.keras.Input(shape=(feature_dim,))
clayer_1 = tf.keras.layers.Dense(n_qubits)
clayer_2 = tf.keras.layers.Dense(2**n_qubits, activation="softmax")
model = tf.keras.models.Sequential([clayer_1, qlayer, clayer_2])

# Train the model
opt = tf.keras.optimizers.SGD(learning_rate=0.2)
model.compile(opt, loss="mae", metrics=["accuracy"])
model.fit(data, labels, batch_size=n_datapoints)

I hope this helps!

Hello @CatalinaAlbornoz ,

thank you for sharing the code. unfortunately I only had time yesterday to test it with the latest version from pennylane, but it occurs the same issue.
My code with qml.probs() works also fine, but I need to look at noisy simulations and thus that is not really an option for me. Is there a recommended way to use noisy simulations in pennylane with a hybrid (tensorflow) setup ? Furthermore I need each probability of the basis states as result, so qml.expval() is also not really what I’m looking for.

Hi @AnniZa,

I’m sorry for the delay in my response.

One hack that you could try is using qml.expval() and setting shots=1. This will be equivalent to taking a sample. Does this work for you?

Hi @CatalinaAlbornoz,
I also had the same idea to use qml.expval. I also upgraded to the tensorflow version 2.14, because it looks like there are some issues in versions <2.14 with py_func and symbolic tensors.

Unfortunately I get another error now:

    ValueError: Exception encountered when calling layer 'keras_layer' (type KerasLayer).
    
    in user code:
    
        File "...\Lib\site-packages\pennylane\qnn\keras.py", line 383, in call  *
            results = self._evaluate_qnode(inputs)
        File "...\Lib\site-packages\pennylane\qnn\keras.py", line 411, in _evaluate_qnode  *
            res = [tf.reshape(r, (tf.shape(x)[0], tf.reduce_prod(r.shape[1:]))) for r in res]
    
        ValueError: Cannot convert a partially known TensorShape <unknown> to a Tensor.
    
    Call arguments received by layer 'keras_layer' (type KerasLayer):
      • inputs=tf.Tensor(shape=(40, 8), dtype=float32)

In forward pass everything works fine, but when the tensors (x) are symbolic for backpropagation the tf.shape(x) results in this error. I used here
‘observable’: ‘[expval(Identity(wires=[0]) @ Identity(wires=[1])), expval(Identity(wires=[0]) @ PauliZ(wires=[1])), expval(PauliZ(wires=[0]) @ Identity(wires=[1])), expval(PauliZ(wires=[0]) @ PauliZ(wires=[1]))]’

and shots=1

Any idea how to fix this ?

Thanks for the help :).

Hey @AnniZa,

Apologies for the delay here! Someone from the PennyLane team will be here to help shorty :raised_hands:

Mind posting the full traceback? It would help me figure out what part of pennylane is generating the problem.

Hi @AnniZa :wave: , just following up on Christina’s message. Would you be able to share the full traceback with us? It would help us dig into what’s causing the problem you are experiencing :eyes: .