Hello,
I read the instructions for using different devices for simulating photonic operations. It seems that strawberryfields.tf is the fastest one for simulating large-scale problems with support for gradient backpropagation. Right now, I am trying to run a machine learning task with this device and have configured it to use the GPU. However, it does not make a difference in execution speed when compared to using CPUs only. My guess is that strawberryfields.tf does not support GPUs? Please correct me if I am wrong. As this task is comparatively small, it does not make sense to me to have such a slow running speed. If this is not the correct device and there are other devices that can do this job better, please let me know.
I attached the code below for your reference. Thank you in advance,
Best,
import itertools
import math
import os
import pdb

import numpy as np
import pennylane as qml
import tensorflow as tf
class PhotonicGeneratorTF(tf.keras.layers.Layer):
    """Trainable photonic (continuous-variable) circuit as a Keras layer.

    The circuit is simulated on the PennyLane ``strawberryfields.tf`` device
    (Fock representation truncated at ``cutoff_dim``), so its probabilities
    and expectation values are differentiable through TensorFlow.
    """

    def __init__(
        self,
        num_modes,
        cutoff_dim,
        non_gaussian,
        non_gaussian_gamma,
        circuit_depth,
        output_type="both",
        interface="tf",
        **kwargs,
    ):
        """
        Args:
            num_modes: number of photonic modes (wires), >= 1.
            cutoff_dim: Fock-space truncation per mode, >= 2.
            non_gaussian: None, "kerr", or "cubic" — optional non-Gaussian
                gate appended to every mode in each layer.
            non_gaussian_gamma: strength of the non-Gaussian gate.
            circuit_depth: number of repeated circuit layers, >= 1.
            output_type: which QNodes to build — "expval", "probs", or "both".
            interface: autodiff interface passed to the QNodes.

        Raises:
            ValueError: on any out-of-range argument.
        """
        super().__init__(**kwargs)
        if num_modes < 1:
            raise ValueError("num_modes must be >= 1")
        if cutoff_dim < 2:
            raise ValueError("cutoff_dim must be >= 2")
        if circuit_depth < 1:
            raise ValueError("circuit_depth must be >= 1")
        if non_gaussian is not None and non_gaussian not in ("kerr", "cubic"):
            raise ValueError("non_gaussian must be None, 'kerr', or 'cubic'")

        self.num_modes = int(num_modes)
        self.cutoff_dim = int(cutoff_dim)
        self.non_gaussian = non_gaussian
        self.non_gaussian_gamma = float(non_gaussian_gamma)
        self.circuit_depth = int(circuit_depth)
        self.output_type = output_type
        self.interface = interface

        # Strawberry Fields TF backend (Fock space truncated at cutoff_dim).
        self.dev = qml.device(
            "strawberryfields.tf",
            wires=self.num_modes,
            cutoff_dim=self.cutoff_dim,
        )

        # --- Learnable parameters ---
        params_per_mode = 5  # Displacement(2) + Squeezing(2) + Rotation(1)
        # Params are float32; the simulator may still output float64.
        self.params = tf.Variable(
            initial_value=tf.random.normal(
                [self.circuit_depth, self.num_modes, params_per_mode],
                dtype=tf.float32,
            ),
            trainable=True,
            name="params",
        )  # [L, M, 5]
        num_bs_pairs = self.num_modes - 1
        self.bs_params = tf.Variable(
            initial_value=tf.random.normal(
                [self.circuit_depth, num_bs_pairs, 2], dtype=tf.float32
            ),
            trainable=True,
            name="bs_params",
        )  # [L, M-1, 2]

        # --- Build QNodes ---
        self._build_qnodes(self.output_type)

    def _apply_layer(self, layer_params, layer_bs_params):
        """Queue one layer of gates on the active circuit.

        Args:
            layer_params: [M, 5] single-mode gate parameters.
            layer_bs_params: [M-1, 2] beamsplitter (theta, phi) pairs.
        """
        M = self.num_modes
        num_bs_pairs = M - 1
        # A) Single-mode Gaussian operations
        for mode in range(M):
            p = layer_params[mode]  # [5]
            qml.Displacement(p[0], p[1], wires=mode)
            qml.Squeezing(p[2], p[3], wires=mode)
            qml.Rotation(p[4], wires=mode)
        # B) Nearest-neighbor beamsplitters
        for i in range(num_bs_pairs):
            theta = layer_bs_params[i, 0]
            phi = layer_bs_params[i, 1]
            qml.Beamsplitter(theta, phi, wires=[i, i + 1])
        # C) Optional non-Gaussian op per mode
        if self.non_gaussian is None:
            return
        for mode in range(M):
            if self.non_gaussian == "kerr":
                qml.Kerr(self.non_gaussian_gamma, wires=mode)
            elif self.non_gaussian == "cubic":
                qml.CubicPhase(self.non_gaussian_gamma, wires=mode)

    def _build_qnodes(self, qnode_type="both"):
        """Build the requested QNodes and store them on the instance.

        Args:
            qnode_type:
                - "expval" : build only the expectation-value QNode (self._qnode_expval)
                - "probs"  : build only the probabilities QNode (self._qnode_probs)
                - "both"   : build both (default)

        Raises:
            ValueError: for an unrecognized qnode_type.
        """
        qnode_type = (qnode_type or "both").lower()
        if qnode_type not in ("expval", "probs", "both"):
            raise ValueError("qnode_type must be one of: 'expval', 'probs', 'both'")
        M = self.num_modes
        if qnode_type in ("expval", "both"):
            # FIX: honor the configured interface instead of hard-coding "tf".
            @qml.qnode(self.dev, interface=self.interface)
            def qnode_expval(params, bs_params, ops):
                for layer in range(self.circuit_depth):
                    self._apply_layer(params[layer], bs_params[layer])
                return qml.expval(ops)

            self._qnode_expval = qnode_expval
        if qnode_type in ("probs", "both"):
            @qml.qnode(self.dev, interface=self.interface)
            def qnode_probs(params, bs_params):
                for layer in range(self.circuit_depth):
                    self._apply_layer(params[layer], bs_params[layer])
                return qml.probs(wires=list(range(M)))

            self._qnode_probs = qnode_probs

    def observables_up_to_k_tensor(self, k):
        """Measure TensorN observables over all mode subsets of size 1..k.

        Args:
            k: maximum subset size (clipped to num_modes), >= 1.

        Returns:
            (combos, values): a list of wire tuples and a float32 tensor
            holding the matching expectation values.

        Raises:
            ValueError: if k < 1.
        """
        if k < 1:
            raise ValueError("k must be >= 1")
        M = self.num_modes
        k = min(k, M)
        combos = []
        vals = []
        for degree in range(1, k + 1):
            for comb in itertools.combinations(range(M), degree):
                ops = qml.TensorN(wires=list(comb))
                val = self._qnode_expval(self.params, self.bs_params, ops)
                val = tf.convert_to_tensor(val)
                # Collapse any extra dims and keep the scalar expectation.
                scalar = tf.reshape(val, [-1])[0]
                combos.append(tuple(comb))
                vals.append(tf.cast(scalar, tf.float32))
        if len(vals) == 0:
            values = tf.zeros([0], dtype=tf.float32)
        else:
            values = tf.stack(vals, axis=0)
        return combos, values

    def call(self, inputs=None, k=None, return_values=False, return_probs=False, training=False):
        """Run the circuit; return expectation features and/or Fock probabilities.

        Raises:
            ValueError: if neither output is requested, or k is missing.
            RuntimeError: if the probabilities QNode was never built.
        """
        if (not return_values) and (not return_probs):
            raise ValueError("At least one of return_values / return_probs must be True.")
        out_values = None
        out_probs = None
        if return_values:
            if k is None:
                raise ValueError("k must be provided when return_values=True.")
            _, out_values = self.observables_up_to_k_tensor(k)
        if return_probs:
            if not hasattr(self, "_qnode_probs"):
                raise RuntimeError(
                    "qnode_probs was not built. Call _build_qnodes(qnode_type='probs'|'both') "
                    "or initialize the model accordingly."
                )
            out_probs = self._qnode_probs(self.params, self.bs_params)
            out_probs = tf.convert_to_tensor(out_probs)  # keep simulator dtype (often float64)
        if return_values and return_probs:
            return out_values, out_probs
        if return_values:
            return out_values
        return out_probs
def generate_photon_count_feature_tf(cutoff, num_modes, basis=None, num_freqs=2, dtype=tf.float32):
    """Generate per-configuration photon-count features.

    Args:
        cutoff: Fock cutoff per mode; configurations enumerate range(cutoff).
        num_modes: number of modes M.
        basis: None for raw counts, or "Fourier" for sin/cos features.
        num_freqs: number of dyadic frequency bands F (Fourier basis only).
        dtype: dtype of the returned tensor.

    Returns:
        [N, M] raw counts (basis=None) or [N, M*2F] Fourier features,
        where N = cutoff ** num_modes.

    Raises:
        ValueError: for an unknown basis.
    """
    photon_count_strings = list(itertools.product(range(cutoff), repeat=num_modes))
    photon_count_tensors = tf.convert_to_tensor(photon_count_strings, dtype=dtype)  # [N, M]
    if basis is None:
        return photon_count_tensors
    if basis == "Fourier":
        # FIX: guard against division by zero when cutoff == 1
        # (all counts are 0, so normalization is a no-op anyway).
        denom = tf.cast(max(cutoff - 1, 1), dtype)
        x = photon_count_tensors / denom  # [N, M], values in [0, 1]
        freq_bands = tf.pow(tf.constant(2.0, dtype=dtype), tf.range(num_freqs, dtype=dtype))  # [F]
        x_expanded = x[..., tf.newaxis] * freq_bands  # [N, M, F]
        sin_feat = tf.sin(tf.constant(2.0 * math.pi, dtype=dtype) * x_expanded)
        cos_feat = tf.cos(tf.constant(2.0 * math.pi, dtype=dtype) * x_expanded)
        features = tf.concat([sin_feat, cos_feat], axis=-1)  # [N, M, 2F]
        features = tf.reshape(features, [tf.shape(features)[0], -1])  # [N, M*2F]
        return features
    raise ValueError(f"Unknown basis: {basis}")
class ModelTF(tf.keras.Model):
    """Full model (photonic probs -> optional features -> MLP) in TensorFlow."""

    def __init__(
        self,
        num_freqs=3,
        basis="Fourier",
        hidden_dims=(64, 64),
        normalize_probs=True,
        probability_fourier_degree=0,
        use_photon_count_features=True,
        num_modes=5,
        cutoff_dim=5,
        non_gaussian=None,
        non_gaussian_gamma=0.1,
        circuit_depth=1,
        out_dim=2,
        **kwargs,
    ):
        """
        Args:
            num_freqs: frequency bands for the photon-count Fourier features.
            basis: feature basis forwarded to generate_photon_count_feature_tf.
            hidden_dims: widths of the hidden Dense layers.
            normalize_probs: renormalize the truncated probability vector.
            probability_fourier_degree: 0 for raw probs, else F sin/cos bands.
            use_photon_count_features: concatenate photon-count features.
            num_modes / cutoff_dim / non_gaussian / non_gaussian_gamma /
            circuit_depth: forwarded to PhotonicGeneratorTF.
            out_dim: output dimension of the final Dense layer, > 0.

        Raises:
            ValueError: on invalid out_dim, fourier degree, or hidden dims.
        """
        super().__init__(**kwargs)
        if out_dim <= 0:
            raise ValueError("out_dim must be a positive integer.")
        if probability_fourier_degree is None:
            probability_fourier_degree = 0
        probability_fourier_degree = int(probability_fourier_degree)
        if probability_fourier_degree < 0:
            raise ValueError("probability_fourier_degree must be >= 0")

        self.num_freqs = int(num_freqs)
        self.basis = basis
        self.out_dim = int(out_dim)
        self.hidden_dims = [int(h) for h in hidden_dims]
        self.normalize_probs = bool(normalize_probs)
        self.num_modes = int(num_modes)
        self.cutoff_dim = int(cutoff_dim)
        self.non_gaussian = non_gaussian
        self.non_gaussian_gamma = float(non_gaussian_gamma)
        self.circuit_depth = int(circuit_depth)
        self.probability_fourier_degree = probability_fourier_degree
        self.use_photon_count_features = bool(use_photon_count_features)

        self.photonic_layer = PhotonicGeneratorTF(
            num_modes=self.num_modes,
            cutoff_dim=self.cutoff_dim,
            non_gaussian=self.non_gaussian,
            non_gaussian_gamma=self.non_gaussian_gamma,
            circuit_depth=self.circuit_depth,
            output_type="probs",
            interface="tf",
            name="photonic_layer",
        )

        if self.use_photon_count_features:
            # Precompute per-configuration features once; they are constant.
            feats = generate_photon_count_feature_tf(
                cutoff=self.photonic_layer.cutoff_dim,
                num_modes=self.photonic_layer.num_modes,
                basis=self.basis,
                num_freqs=self.num_freqs,
                dtype=tf.float32,
            )
            self.features = tf.Variable(feats, trainable=False, name="features")
        else:
            self.features = tf.Variable(tf.zeros([0, 0], dtype=tf.float32), trainable=False, name="features")

        # NOTE: no input_dim bookkeeping needed — Dense layers infer their
        # input size on the first call.
        self.mlp_layers = []
        for h in self.hidden_dims:
            if h <= 0:
                raise ValueError("All hidden_dims must be positive integers.")
            self.mlp_layers.append(tf.keras.layers.Dense(h, activation="relu"))
        self.mlp_layers.append(tf.keras.layers.Dense(self.out_dim, activation=None))

    def call(self, inputs=None, truncated_set: int = None, training=False):
        """Forward pass over the first `truncated_set` Fock configurations.

        Args:
            inputs: unused (the model is generative).
            truncated_set: number T of leading configurations to keep, > 0.
            training: standard Keras training flag.

        Returns:
            [T, out_dim] prediction tensor.

        Raises:
            ValueError: on missing/invalid truncated_set or T > N.
        """
        if truncated_set is None:
            raise ValueError("truncated_set must be provided.")
        T = int(truncated_set)
        if T <= 0:
            raise ValueError("truncated_set must be a positive integer.")

        probs = self.photonic_layer(None, return_probs=True, training=training)  # [N] or [1, N]
        probs = tf.convert_to_tensor(probs)
        probs = tf.reshape(probs, [-1])  # flatten to 1D [N]
        probs = tf.cast(probs, tf.float32)  # cast once here
        N = int(probs.shape[0])  # correct in eager + typical traced cases
        if T > N:
            raise ValueError(f"truncated_set ({T}) cannot exceed total configurations ({N}).")

        probs_truncated = tf.expand_dims(probs[:T], axis=1)  # [T, 1]
        if self.normalize_probs:
            prob_sum = tf.reduce_sum(probs_truncated)
            eps = tf.constant(1e-12, dtype=probs_truncated.dtype)
            # Equivalent to the original tf.cond: fail loudly if the truncated
            # mass is numerically zero, otherwise renormalize.
            tf.debugging.assert_greater(prob_sum, eps)
            probs_truncated = probs_truncated / prob_sum

        if self.probability_fourier_degree > 0:
            F = self.probability_fourier_degree
            freq_bands = tf.pow(tf.constant(2.0, dtype=tf.float32), tf.range(F, dtype=tf.float32))  # [F]
            x_expanded = probs_truncated[..., tf.newaxis] * freq_bands  # [T, 1, F]
            two_pi = tf.constant(2.0 * math.pi, dtype=tf.float32)
            sin_feat = tf.sin(two_pi * x_expanded)
            cos_feat = tf.cos(two_pi * x_expanded)
            probs_feat = tf.concat([sin_feat, cos_feat], axis=-1)  # [T, 1, 2F]
            probs_feat = tf.reshape(probs_feat, [T, -1])  # [T, 2F]
        else:
            probs_feat = probs_truncated  # [T, 1]

        if self.use_photon_count_features:
            feats = tf.cast(self.features[:T], probs_feat.dtype)
            feature_in = tf.concat([probs_feat, feats], axis=1)
        else:
            feature_in = probs_feat

        x = feature_in
        for layer in self.mlp_layers:
            x = layer(x, training=training)
        return x
def make_synthetic_targets_tf(T, out_dim=2, seed=0, noise_std=0.01, dtype=tf.float32):
    """Build smooth synthetic regression targets of shape [T, out_dim].

    Targets are a tanh of a random linear map over a small polynomial +
    sinusoidal basis of t in [0, 1], plus optional Gaussian noise.

    Args:
        T: number of samples, > 0.
        out_dim: target dimensionality, > 0.
        seed: seed for the (stateless) weight generator, so the underlying
            function is reproducible across calls.
        noise_std: stddev of additive noise; 0 or falsy disables it.
        dtype: dtype of the returned tensor.

    Raises:
        ValueError: if T or out_dim is not positive.
    """
    T = int(T)
    out_dim = int(out_dim)
    if T <= 0:
        raise ValueError("T must be positive.")
    if out_dim <= 0:
        raise ValueError("out_dim must be positive.")
    t = tf.linspace(tf.constant(0.0, dtype=dtype), tf.constant(1.0, dtype=dtype), T)
    t = tf.reshape(t, [T, 1])
    two_pi = tf.constant(2.0 * math.pi, dtype=dtype)
    four_pi = tf.constant(4.0 * math.pi, dtype=dtype)
    basis = tf.concat(
        [t, t**2, tf.sin(two_pi * t), tf.cos(two_pi * t), tf.sin(four_pi * t), tf.cos(four_pi * t)],
        axis=1,
    )
    # Seeded generator keeps W and b reproducible; the noise below is not.
    g = tf.random.Generator.from_seed(int(seed))
    W = g.normal(shape=[int(basis.shape[1]), out_dim], dtype=dtype)
    b = g.normal(shape=[out_dim], dtype=dtype)
    y = tf.tanh(tf.matmul(basis, W) + b)
    if noise_std and noise_std > 0:
        y = y + tf.constant(float(noise_std), dtype=dtype) * tf.random.normal(tf.shape(y), dtype=dtype)
    return y
def load_gaussians_tf(file_path, dtype=None):
    """Load a ``.npy`` descriptor array as a TensorFlow tensor.

    Args:
        file_path: path to an existing ``.npy`` file.
        dtype: dtype of the returned tensor; defaults to tf.float32.
            (Kept out of the signature so importing this module does not
            touch TensorFlow at definition time.)

    Returns:
        tf.Tensor with the file contents cast to ``dtype``.

    Raises:
        FileNotFoundError: if ``file_path`` does not exist.
        ValueError: if the file is not a ``.npy`` file.
    """
    # `os` must be imported at module level (it was missing originally).
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"{file_path} not found.")
    if file_path.endswith(".npy"):
        if dtype is None:
            dtype = tf.float32
        arr = np.load(file_path)
        return tf.convert_to_tensor(arr, dtype=dtype)
    raise ValueError("For TensorFlow version, please provide a .npy file (descriptors.npy).")
def train_demo_tf():
    """Train ModelTF on synthetic targets and report the final MSE (demo)."""
    tf.keras.backend.set_floatx("float32")
    # ---- Torch-like device selection ----
    # NOTE(review): placing the Fock simulation under /GPU:0 may not speed it
    # up — the strawberryfields.tf backend's benefit from GPUs is not
    # established here; profile before assuming GPU execution.
    device_str = "/GPU:0" if tf.config.list_logical_devices("GPU") else "/CPU:0"
    print("Using device:", device_str)
    with tf.device(device_str):
        num_modes = 5
        cutoff_dim = 5
        N = cutoff_dim ** num_modes  # total photon-count configurations
        truncated_set = 1000
        if truncated_set > N:
            print("The error is here")
            print(f"Error: truncated_set ({truncated_set}) cannot exceed total configurations ({N}).")
            raise ValueError(f"truncated_set ({truncated_set}) cannot exceed total configurations ({N}).")

        y_true = make_synthetic_targets_tf(truncated_set, out_dim=2, seed=0, noise_std=0.01)
        out_dim = int(y_true.shape[-1])

        model = ModelTF(
            use_photon_count_features=True,
            num_freqs=3,
            basis="Fourier",
            hidden_dims=(64, 64, 64),
            normalize_probs=True,
            probability_fourier_degree=4,
            num_modes=num_modes,
            cutoff_dim=cutoff_dim,
            non_gaussian=None,
            non_gaussian_gamma=0.1,
            circuit_depth=2,
            out_dim=out_dim,
            name="full_model_tf",
        )

        optimizer = tf.keras.optimizers.Adam(learning_rate=2e-2)
        loss_fn = tf.keras.losses.MeanSquaredError()
        steps = 200
        for step in range(steps):
            with tf.GradientTape() as tape:
                y_pred = model(None, truncated_set=truncated_set, training=True)
                loss = loss_fn(y_true, y_pred)
            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
            if (step + 1) % 20 == 0 or step == 0:
                print(f"step {step+1:4d} | loss {float(loss.numpy()):.6f}")

        y_pred = model(None, truncated_set=truncated_set, training=False)
        mse = tf.reduce_mean(tf.square(y_pred - y_true))
        print("\nFinal MSE:", float(mse.numpy()))
# FIX: the guard used single-underscore `_name_`/`_main_` and curly quotes,
# so it was a syntax error and the demo never ran.
if __name__ == "__main__":
    train_demo_tf()