Hello @isaacdevlugt, sorry that I needed some time.
My code is quite complex and has other package dependencies, so I created a simplified version that reproduces the same issue. Here is the output from qml.about():
Name: PennyLane
Version: 0.31.0
Summary: PennyLane is a Python quantum machine learning library by Xanadu Inc.
Home-page: https://github.com/PennyLaneAI/pennylane
Author:
Author-email:
License: Apache License 2.0
Location: ...
Requires: appdirs, autograd, autoray, cachetools, networkx, numpy, pennylane-lightning, requests, rustworkx, scipy, semantic-version, toml
Required-by: PennyLane-Lightning
Platform info: Windows-10-10.0.19045-SP0
Python version: 3.11.4
Numpy version: 1.23.5
Scipy version: 1.10.0
Installed devices:
- default.gaussian (PennyLane-0.31.0)
- default.mixed (PennyLane-0.31.0)
- default.qubit (PennyLane-0.31.0)
- default.qubit.autograd (PennyLane-0.31.0)
- default.qubit.jax (PennyLane-0.31.0)
- default.qubit.tf (PennyLane-0.31.0)
- default.qubit.torch (PennyLane-0.31.0)
- default.qutrit (PennyLane-0.31.0)
- null.qubit (PennyLane-0.31.0)
- lightning.qubit (PennyLane-Lightning-0.31.0)
Code to reproduce:
# This code requires the following packages
import numpy as np
# Import PennyLane and TensorFlow
import pennylane as qml
import tensorflow as tf
class NoisyModel(object):
"""Test Model"""
    def __init__(self, learning_rate_theta=0.1, n_qubits=2, n_inputs=8, shots=1, p_noise=0):
# Amount of noise (p=0:no noise, p=3/4: fully depolarizing channel, p=1: uniform Pauli error channel)
self.p = p_noise # Depolarizing channel
self.shots = shots
        self.optimizer_theta = tf.keras.optimizers.Adam(learning_rate=learning_rate_theta, amsgrad=True)
# Number of qubits in the circuit
self.n_qubits = n_qubits
self.n_inputs = n_inputs
# Device
        dev = qml.device('default.mixed', wires=n_qubits, shots=shots)
self.dev = dev
        # Create the PennyLane circuit
        self.circ = self.create_circuit(dev)
        # Build the Keras model (TensorFlow)
        self.model = self.create_model()
# Memory
memory_size = int(1e6)
self.memory = self.Memory(self.n_inputs, np.power(2,self.n_qubits), memory_size)
def create_circuit(self, dev):
@qml.qnode(dev, diff_method="best", interface='tf', max_expansion=2)
def quantum_circuit(inputs, angles):
            self.var_layer(angles)
            # self.input_layer(inputs)
            self.depolarizing_noise(self.p)
result = qml.sample(wires=range(self.n_qubits))
return result
return quantum_circuit
def create_model(self):
weight_shapes = {"angles": 3*self.n_qubits}
specsdict = {"angles": {"initializer": tf.keras.initializers.RandomUniform(minval=-np.pi, maxval=np.pi)}}
        qlayer = qml.qnn.KerasLayer(self.circ, weight_shapes, input_dim=self.n_inputs, output_dim=self.shots*self.n_qubits, weight_specs=specsdict)
        model_input = tf.keras.Input(shape=(self.n_inputs,))
        prob = qlayer(model_input)
        model = tf.keras.Model(
            inputs=[model_input],
            outputs=[prob],
            name="noisy_model",
        )
        return model
    def var_layer(self, angles):
        # Trainable parameters: one RX, RY and RZ rotation per qubit
        j = 0
        for i in range(self.n_qubits):
            qml.RX(angles[j], wires=i)
            qml.RY(angles[j + 1], wires=i)
            qml.RZ(angles[j + 2], wires=i)
            j = j + 3
    def depolarizing_noise(self, p):
        # p defines the amount of noise in each layer
for i in range(self.n_qubits):
qml.DepolarizingChannel(p, wires=i)
def input_layer(self, inputs):
rotation_gates = [qml.RX, qml.RY, qml.RZ]
# inputs.shape: (batch_size, n_features)
batch_size, n_features = inputs.shape
for batch_index in range(batch_size):
for feature_index in range(n_features):
gate = rotation_gates[feature_index // self.n_qubits % len(rotation_gates)]
wire = feature_index % self.n_qubits
gate(inputs[batch_index, feature_index], wires=wire)
def sample(self, model, inputbatch):
outputs = model(inputbatch)
outputs = self.calculate_prob_from_shots(outputs)
probs = outputs/tf.reduce_sum(outputs, axis=1, keepdims=True)
return probs
    def forward(self, model, inputbatch):
        # Sample probabilities from the model
        probs = self.sample(model, inputbatch)
        return probs
    def calculate_prob_from_shots(self, outcome):
        # Count how often each computational basis state appears in the samples
        reshaped_results = tf.reshape(outcome, (-1, self.shots, self.n_qubits))
        # Basis states are hard-coded here for n_qubits = 2
        comp_basis_states = tf.constant([[0, 0], [0, 1], [1, 0], [1, 1]], dtype=tf.float32)
        num_reshaped_results = len(reshaped_results)
        all_results = tf.TensorArray(dtype=tf.float32, size=num_reshaped_results)
# Loop over batches
for j in tf.range(num_reshaped_results):
batch_outcome = reshaped_results[j]
batch_outcome = tf.cast(batch_outcome, dtype=tf.float32)
            num_comp_basis_states = len(comp_basis_states)
            result_probs = tf.TensorArray(dtype=tf.float32, size=num_comp_basis_states)
            # Check which basis states occurred and get their counts
for i in tf.range(num_comp_basis_states):
basis_state = comp_basis_states[i]
equaltens = tf.math.equal(batch_outcome, basis_state)
matches = tf.reduce_all(equaltens, axis=1)
# Count how often the state appears
count = tf.reduce_sum(tf.cast(matches, tf.int32))
# check if no match
                if tf.math.equal(count, 0):
prob = tf.constant(0.0, dtype=tf.float32)
else:
# Get the count value for this basis state and calc probability
tfcount = tf.cast(count,tf.float32)
tfshots = tf.constant(self.shots, dtype=tf.float32)
prob = tfcount/tfshots
# Add the probability for the basis state. The policy tensor will have the same order of basis states as given in comp_basis_states
result_probs = result_probs.write(i,prob)
all_results = all_results.write(j,result_probs.stack())
return all_results.stack()
    def learn(self, batch_size):
        # Get the gradients from the update function
        gradients = self.update(batch_size)
        model_gradients = gradients[0]  # gradient for the trainable angles
        # Apply the gradients
        self.optimizer_theta.apply_gradients(zip([model_gradients[0]], [self.model.trainable_variables[0]]))
        return gradients
@tf.function
def update(self, batch_size):
with tf.GradientTape() as model_tape:
buffer_output = self.memory.sample_batch(batch_size)
inputs = buffer_output['input']
# Get prob from model for batch of inputs
all_new_probs = self.forward(self.model, inputs[0])
loss = -tf.reduce_mean(all_new_probs)
model_gradients = model_tape.gradient(loss, self.model.trainable_variables)
        return model_gradients
class Memory:
def __init__(self, input_dim, probs_dim, size):
self.inputs_buf = np.zeros([size, input_dim], dtype=np.float32)
self.probs_buf = np.zeros([size, probs_dim], dtype=np.float32)
self.ptr, self.size, self.max_size = 0, 0, size
def store(self, inputs, probs):
self.inputs_buf[self.ptr] = inputs
self.probs_buf[self.ptr] = probs
self.ptr = (self.ptr+1) % self.max_size
self.size = min(self.size+1, self.max_size)
def sample_batch(self, batch_size=32):
idxs = np.random.randint(0, self.size, size=batch_size)
return dict(input=self.inputs_buf[idxs],
probs=self.probs_buf[idxs])
def train_loop(noisy_model, train_rounds, batch_size, batchinput):
for rounds in range(train_rounds):
        probs = noisy_model.forward(noisy_model.model, batchinput[0])
noisy_model.memory.store(batchinput[0], probs)
noisy_model.learn(batch_size)
# Script to run:
testmodel = NoisyModel()
#print(testmodel)
#print(testmodel.model)
inputs = np.random.random(size=(1, testmodel.n_inputs))
#print(inputs)
#print(inputs.shape[0])
train_loop(testmodel, 1, 1, inputs)
# Or to test model directly:
#prob = testmodel.model(inputs)
#print(prob)
The problem is that some internal computation does not seem to fit the Keras model: it either complains about a missing required argument, or the reshape step fails. I think the model's input and output dimensions should be correct for qml.sample() with this number of shots. If I reshape the QNode result in a wrapper, like reshaped_result = tf.reshape(original_result, [1, -1]), the forward pass works, but the gradient calculation does not.
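For reference, the wrapper I mean looks roughly like this (a minimal sketch; wrapped_circuit is just an illustrative name, and quantum_circuit stands for the QNode returned by create_circuit above):

def wrapped_circuit(inputs, angles):
    # Call the original QNode, then flatten its sample output into a single row
    original_result = quantum_circuit(inputs, angles)
    reshaped_result = tf.reshape(original_result, [1, -1])
    return reshaped_result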
Thank you very much for your help. I've been struggling with this for quite some time.