Hello everyone,
I am working on a research project where I train 12 hybrid classical-quantum neural network configurations (varying from 2 to 5 qubits and 2 to 4 layers), each across 3 random seeds. I am logging the gradient norms of every layer (pre-encoder, quantum, post-decoder) so that I can later analyse training behaviour, specifically gradient efficiency and expressibility.
My first version of the code had a bug in the gradient-calculation part, but it used ~50% of the GPU. After fixing that section, training now runs mostly on the CPU: GPU utilisation sits at barely 20%, which is roughly what the card consumes at idle.
I have attached the code below. I would really appreciate a quick reply on how to fix this, as the journal I aim to publish in has a deadline in 1-1.5 months. Further improvements/enhancements are welcome!
My system has a Ryzen 7 CPU and an RTX 3060 laptop GPU with 6 GB of VRAM.
The code is organised into the following parts:
- data loader (I cannot provide the exact data as it is part of ongoing research, but it has 19 features and 1000 samples)
- imports
- model definition + training pipeline (a callable function driven by the settings below)
- how the function is called
- output of qml.about()
- Data loader
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import TensorDataset, DataLoader, Subset
from sklearn.preprocessing import StandardScaler
# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# Feature and target columns
feature_columns = [ ... ]
target_column = 'Bin' # 'Bin_value' is not used here; add if needed
def prepare_data(approach_3_df, sample_size, batch_size=64):
# Combine features and target
data = approach_3_df[feature_columns + [target_column]]
# Split into train, val, test
train_df, temp_df = train_test_split(data, test_size=0.4, random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42)
# Scale only the features (copy the splits first to avoid pandas' SettingWithCopyWarning)
train_df, val_df, test_df = train_df.copy(), val_df.copy(), test_df.copy()
scaler = StandardScaler()
train_df.loc[:, feature_columns] = scaler.fit_transform(train_df[feature_columns])
val_df.loc[:, feature_columns] = scaler.transform(val_df[feature_columns])
test_df.loc[:, feature_columns] = scaler.transform(test_df[feature_columns])
# Separate features and targets
train_features = train_df[feature_columns]
val_features = val_df[feature_columns]
test_features = test_df[feature_columns]
train_labels = torch.tensor(train_df[target_column].values, dtype=torch.float32, device=device).unsqueeze(1)
val_labels = torch.tensor(val_df[target_column].values, dtype=torch.float32, device=device).unsqueeze(1)
test_labels = torch.tensor(test_df[target_column].values, dtype=torch.float32, device=device).unsqueeze(1)
# Convert features to tensors
xtrain = torch.tensor(train_features.values, dtype=torch.float32, device=device)
xval = torch.tensor(val_features.values, dtype=torch.float32, device=device)
xtest = torch.tensor(test_features.values, dtype=torch.float32, device=device)
# Create TensorDatasets
train_dataset = TensorDataset(xtrain, train_labels)
val_dataset = TensorDataset(xval, val_labels)
test_dataset = TensorDataset(xtest, test_labels)
# Create DataLoader instances
gpu_loader_config = {
    # the tensors above already live on `device`, so pinned memory and CPU
    # worker processes must stay disabled (both require CPU-resident tensors)
    "pin_memory": False,
    "num_workers": 0,
    "persistent_workers": False,
}
# Limit each split to at most `sample_size` samples
train_subset = Subset(train_dataset, range(min(sample_size, len(train_dataset))))
val_subset = Subset(val_dataset, range(min(sample_size, len(val_dataset))))
test_subset = Subset(test_dataset, range(min(sample_size, len(test_dataset))))
train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True, **gpu_loader_config)
val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False, **gpu_loader_config)
test_loader = DataLoader(test_subset, batch_size=batch_size, shuffle=False, **gpu_loader_config)
return train_loader, val_loader, test_loader
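A minimal usage sketch (hypothetical, since the real dataframe cannot be shared; `df` stands in for it):
train_loader, val_loader, test_loader = prepare_data(df, sample_size=1000, batch_size=64)
xb, yb = next(iter(train_loader))
print(xb.shape, yb.shape, xb.device)  # expected: torch.Size([64, 19]) torch.Size([64, 1]) cuda:0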
- Imports
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm
import pennylane as qml
import math
import numpy as np
import matplotlib.pyplot as plt
import time
import random
import os
import json
import csv
import collections
- Model definition + training pipeline
def run():
# =====================
# 2. GPU Configuration
# =====================
if USE_GPU and not torch.cuda.is_available():
raise RuntimeError("CUDA is not available. This script requires a GPU.")
device = torch.device("cuda" if USE_GPU else "cpu")
print(f"Using device: {device}")
print(f"PyTorch CUDA version: {torch.version.cuda}")
# PennyLane GPU-enabled Lightning device
dev = qml.device("lightning.gpu", wires=NUM_QUBITS, batch_obs=True)
# =====================
# 3. Quantum Circuit
# =====================
# Quantum circuit (evaluated for a single input sample per call)
@qml.qnode(dev, interface="torch", diff_method='parameter-shift')
def quantum_circuit(inputs, weights):
for i in range(NUM_QUBITS):
qml.RY(inputs[i], wires=i)
for layer in range(NUM_LAYERS):
for i in range(NUM_QUBITS):
qml.RZ(weights[layer, i], wires=i)
qml.RX(weights[layer, i], wires=i)
for i in range(NUM_QUBITS):
qml.CNOT(wires=[i, (i + 1) % NUM_QUBITS])
return [qml.expval(qml.PauliZ(i)) for i in range(NUM_QUBITS)]
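# Optional sanity check (a sketch, commented out so it does not run in training):
# qml.draw renders the ansatz for one dummy sample to verify the circuit structure.
#   x_demo = torch.zeros(NUM_QUBITS)
#   w_demo = torch.zeros(NUM_LAYERS, NUM_QUBITS)
#   print(qml.draw(quantum_circuit)(x_demo, w_demo))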
class QuantumLayer(nn.Module):
def __init__(self):
super().__init__()
self.weights = nn.Parameter(0.01 * torch.randn(NUM_LAYERS, NUM_QUBITS, device=device))
def forward(self, x):
# x shape: (batch_size, NUM_QUBITS)
batch_results = []
for i in range(x.shape[0]):
result = quantum_circuit(x[i], self.weights)
result_tensor = torch.stack(result) if isinstance(result, list) else result
batch_results.append(result_tensor)
return torch.stack(batch_results, dim=0) # shape: (batch_size, NUM_QUBITS)
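# Note: an alternative worth trying (not used in this run) is qml.qnn.TorchLayer,
# which registers the weights and handles batched inputs without the Python loop above:
#   weight_shapes = {"weights": (NUM_LAYERS, NUM_QUBITS)}
#   qlayer = qml.qnn.TorchLayer(quantum_circuit, weight_shapes)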
# Pre-autoencoder
class PreAutoencoder(nn.Module):
def __init__(self):
super().__init__()
self.encoder = nn.Sequential(
nn.Linear(INPUT_DIM, HIDDEN_DIM),
nn.ReLU(),
nn.Linear(HIDDEN_DIM, NUM_QUBITS),
nn.Tanh(),
)
def forward(self, x):
x = x.to(device)
# Apply the encoder to the input data
encoded = self.encoder(x) # shape: (batch_size, NUM_QUBITS)
return encoded
# Post-autoencoder
class PostAutoencoder(nn.Module):
def __init__(self):
super().__init__()
self.decoder = nn.Sequential(
nn.Linear(NUM_QUBITS, POST_HIDDEN_DIM),
nn.ReLU(),
nn.Linear(POST_HIDDEN_DIM, FINAL_OUTPUT_DIM),
)
def forward(self, x):
x = x.to(device)
return self.decoder(x)
# Full hybrid model
class HybridQNN(nn.Module):
def __init__(self, pre_encoder, quantum_layer, post_decoder):
super().__init__()
self.pre_encoder = pre_encoder
self.quantum_layer = quantum_layer
self.post_decoder = post_decoder
def forward(self, x):
encoded = self.pre_encoder(x) # shape: (batch_size, NUM_QUBITS)
scaled = encoded * (math.pi / 2) # scale to [−π/2, π/2]
quantum_out = self.quantum_layer(scaled) # shape: (batch_size, NUM_QUBITS)
return self.post_decoder(quantum_out)
# Instantiate model components
pre_ae = PreAutoencoder().to(device)
quantum_layer = QuantumLayer().to(device)
post_ae = PostAutoencoder().to(device)
model = HybridQNN(pre_ae, quantum_layer, post_ae).to(device)
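# Quick shape check (a sketch with a dummy batch of 4 random samples):
#   dummy = torch.randn(4, INPUT_DIM, device=device)
#   print(model(dummy).shape)  # expected: torch.Size([4, FINAL_OUTPUT_DIM])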
# ===========================
# 5. Training Loop Setup
# ===========================
train_loader, val_loader, test_loader = prepare_data(approach_3_df=DATA, sample_size=SAMPLE_SIZE, batch_size=BATCH_SIZE)
# =====================
# 6. Optimizer Setup
# =====================
optimizer = optim.AdamW([
{'params': model.pre_encoder.parameters(), 'lr': LR_PRE_ENCODER, 'weight_decay': WEIGHT_DECAY},
{'params': model.quantum_layer.parameters(), 'lr': LR_QUANTUM},
{'params': model.post_decoder.parameters(), 'lr': LR_POST_DECODER, 'weight_decay': WEIGHT_DECAY}
])
# =====================
# 7. Learning Rate Scheduler
# =====================
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
optimizer,
mode='min',
factor=LR_SCHEDULER_FACTOR,
patience=LR_SCHEDULER_PATIENCE,
verbose=True
)
# =====================
# 8. Mixed Precision Support
# =====================
scaler = torch.amp.GradScaler(device.type, enabled=(device.type == "cuda"))
# =====================
# 9. Loss Function
# =====================
criterion = nn.MSELoss()
# =====================
# 10. Training Loop (Enhanced)
# =====================
training_logs = []
best_val_loss = float('inf')
epochs_no_improve = 0
best_epoch = 0
train_losses = []
val_losses = []
learning_rates = []
gradient_norms_total = []
start_time = time.time()
for epoch in tqdm(range(NUM_EPOCHS), desc="Training Progress", unit="epoch"):
model.train()
train_loss = 0.0
epoch_grad_per_layer_sum = collections.OrderedDict()
num_batches = 0
for batch_idx, (inputs, labels) in enumerate(train_loader, 1):
inputs, labels = inputs.to(device), labels.to(device)
optimizer.zero_grad()
with torch.autocast(device_type=device.type, dtype=torch.float16, enabled=(device.type == "cuda")):
outputs = model(inputs)
loss = criterion(outputs, labels)
scaler.scale(loss).backward()
scaler.unscale_(optimizer)
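# At this point the grads are unscaled, so the norms recorded below are true
# fp32 gradient norms rather than loss-scaled values.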
batch_grad_norms = collections.OrderedDict()
for name, param in model.named_parameters():
if param.grad is not None:
norm_val = param.grad.norm().item()
#print(f"Gradient norm for {name}: {norm_val:.4f}")
batch_grad_norms[name] = norm_val
if name not in epoch_grad_per_layer_sum:
epoch_grad_per_layer_sum[name] = 0.0
epoch_grad_per_layer_sum[name] += norm_val
max_norm_value = 10.0
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm_value)
scaler.step(optimizer)
scaler.update()
train_loss += loss.item() * inputs.size(0)
num_batches += 1
train_loss /= len(train_loader.dataset)
epoch_grad_per_layer = {k: v / num_batches for k, v in epoch_grad_per_layer_sum.items()}
# Total gradient norm recorded for the epoch: the L2 norm over all parameter
# grads, i.e. sqrt of the sum of squared per-parameter norms, taken from the
# last batch (grads persist after the loop)
total_norm = 0.0
for param in model.parameters():
if param.grad is not None:
param_norm = param.grad.data.norm(2)
total_norm += param_norm.item() ** 2
total_norm = total_norm ** 0.5
gradient_norms_total.append(total_norm)
# Example to print quantum layer gradients
#print("\nQuantum layer weights gradients after backward:")
#print(quantum_layer.weights.grad)
# === Validation ===
model.eval()
val_loss = 0.0
with torch.no_grad():
for inputs, labels in val_loader:
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs)
loss = criterion(outputs, labels)
val_loss += loss.item() * inputs.size(0)
val_loss /= len(val_loader.dataset)
scheduler.step(val_loss)
learning_rates.append(optimizer.param_groups[0]['lr'])
train_losses.append(train_loss)
val_losses.append(val_loss)
# Save logs for this epoch (before the early-stop break, so the last epoch is logged too)
training_logs.append({
    "epoch": epoch + 1,
    "train_loss": train_loss,
    "val_loss": val_loss,
    "gradient_norm_total": total_norm,
    "gradient_norm_layerwise": epoch_grad_per_layer,
    "learning_rate": optimizer.param_groups[0]["lr"]
})
if val_loss < best_val_loss:
    best_val_loss = val_loss
    best_epoch = epoch + 1
    torch.save(model.state_dict(), SAVE_MODEL_PATH)
    epochs_no_improve = 0
else:
    epochs_no_improve += 1
    if epochs_no_improve >= EARLY_STOP_PATIENCE:
        print(f"Early stopping triggered at epoch {epoch + 1}")
        break
training_time = (time.time() - start_time) / 60 # minutes
# ===========================
# Final Metrics Logging
# ===========================
final_train_loss = train_losses[-1]
final_val_loss = val_losses[-1]
generalization_gap = final_val_loss - final_train_loss
num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
# ===========================
# Save Final Predictions
# ===========================
model.eval()
final_predictions = []
final_labels = []
with torch.no_grad():
for inputs, labels in test_loader:
inputs = inputs.to(device)
labels = labels.to(device)
outputs = model(inputs)
final_predictions.append(outputs.cpu().numpy())
final_labels.append(labels.cpu().numpy())
# Convert to numpy arrays for easy saving
final_predictions = np.concatenate(final_predictions, axis=0)
final_labels = np.concatenate(final_labels, axis=0)
# Save the predictions and labels to CSV
log_dir = f'{DATA_NAME}/{DATA_NAME}_{NUM_SAMPLES}/logs'
os.makedirs(log_dir, exist_ok=True)  # create the full directory the files below are written into
np.savetxt(f'{log_dir}/{model_name}_predictions.csv', final_predictions, delimiter=',')
np.savetxt(f'{log_dir}/{model_name}_labels.csv', final_labels, delimiter=',')
# ===========================
# Save Training Logs to JSON and CSV
# ===========================
os.makedirs("logs", exist_ok=True)
# Save training logs to JSON
with open(f"{DATA_NAME}/{DATA_NAME}_{NUM_SAMPLES}/logs/{model_name}_logs.json", "w") as f:
json.dump(training_logs, f, indent=4)
# Save summary to CSV
run_summary = {
"model_id": model_name,
"architecture": " ",
"qubits": NUM_QUBITS,
"depth": NUM_LAYERS,
"batch_size": BATCH_SIZE,
"sample_size": NUM_SAMPLES,
"random_seed": RANDOM_SEED,
"train_loss_final": final_train_loss,
"val_loss_final": final_val_loss,
"best_val_loss": best_val_loss,
"best_epoch": best_epoch,
"final_grad_norm": training_logs[-1]["gradient_norm_total"],
"generalization_gap": generalization_gap,
"training_time_minutes": training_time,
"num_params": num_params
}
csv_path = f"{log_dir}/{model_name}_run_summaries.csv"
write_headers = not os.path.exists(csv_path)
with open(csv_path, "a", newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=run_summary.keys())
if write_headers:
writer.writeheader()
writer.writerow(run_summary)
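For the later analysis, a sketch of reading one run's JSON log back and plotting the layer-wise gradient norms per epoch (the file path is hypothetical; use the one written above):
import json
import matplotlib.pyplot as plt
with open("logs/model_logs.json") as f:  # hypothetical path
    logs = json.load(f)
epochs = [entry["epoch"] for entry in logs]
for layer in logs[0]["gradient_norm_layerwise"]:
    plt.plot(epochs, [entry["gradient_norm_layerwise"][layer] for entry in logs], label=layer)
plt.xlabel("epoch")
plt.ylabel("mean gradient norm")
plt.yscale("log")
plt.legend(fontsize=6)
plt.show()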
- Calling the function
seeds = [42, 52, 62]  # random seeds
qubit_range = [2, 3, 4, 5]
layers_range = [2, 3, 4]
data_sample_size = [1000]
c = 0  # counter to track the model configuration
# Dataset loop with (name, dataframe); `data` is the research dataframe loaded earlier
datasets = [("data_name", data)]
for sample_size in data_sample_size:
for num_qubit in qubit_range:
for layers in layers_range:
c += 1
for DATA_NAME, DATA in datasets:
for rs in seeds:
# General settings
SAMPLE_SIZE = sample_size
USE_GPU = True
BATCH_SIZE = 32
INPUT_DIM = 19
HIDDEN_DIM = 8
QUANTUM_OUTPUT_DIM = 2
POST_HIDDEN_DIM = 40
FINAL_OUTPUT_DIM = 1
NUM_SAMPLES = SAMPLE_SIZE
# Quantum settings
NUM_QUBITS = num_qubit
NUM_LAYERS = layers
RANDOM_SEED = rs
DIFF_METHOD = "adjoint"  # note: the QNode above hardcodes diff_method='parameter-shift', so this setting is currently unused
# Reproducibility
torch.manual_seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
random.seed(RANDOM_SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(RANDOM_SEED)
# Model name
model_name = f"{DATA_NAME}_{sample_size}_{c}_{rs}"
# Print configuration
print(f"Running {model_name} | Dataset: {DATA_NAME} | "
f"Sample Size: {SAMPLE_SIZE} | Qubits: {NUM_QUBITS} | "
f"Layers: {NUM_LAYERS} | Seed: {RANDOM_SEED}")
# Training settings
NUM_EPOCHS = 50
EARLY_STOP_PATIENCE = 5
LR_PRE_ENCODER = 0.001
LR_QUANTUM = 0.0001
LR_POST_DECODER = 0.001
WEIGHT_DECAY = 0.01
LR_SCHEDULER_FACTOR = 0.5
LR_SCHEDULER_PATIENCE = 3
SAVE_MODEL_PATH = f"{model_name}.pth"
# Run training
run()
print(f"\nTotal runs: {c}")
- Output of qml.about() (I am using CUDA 12.4 on the RTX 3060 laptop GPU)
Name: PennyLane
Version: 0.41.0
Summary: PennyLane is a cross-platform Python library for quantum computing, quantum machine learning, and quantum chemistry. Train a quantum computer the same way as a neural network.
Home-page: https://github.com/PennyLaneAI/pennylane
Author:
Author-email:
License: Apache License 2.0
Location: /home/tiwashri/miniconda3/envs/qgpu/lib/python3.10/site-packages
Requires: appdirs, autograd, autoray, cachetools, diastatic-malt, networkx, numpy, packaging, pennylane-lightning, requests, rustworkx, scipy, tomlkit, typing-extensions
Required-by: PennyLane_Lightning, PennyLane_Lightning_GPU
Platform info: Linux-6.11.0-26-generic-x86_64-with-glibc2.39
Python version: 3.10.17
Numpy version: 1.26.4
Scipy version: 1.15.2
Installed devices:
- lightning.qubit (PennyLane_Lightning-0.41.1)
- lightning.gpu (PennyLane_Lightning_GPU-0.41.1)
- default.clifford (PennyLane-0.41.0)
- default.gaussian (PennyLane-0.41.0)
- default.mixed (PennyLane-0.41.0)
- default.qubit (PennyLane-0.41.0)
- default.qutrit (PennyLane-0.41.0)
- default.qutrit.mixed (PennyLane-0.41.0)
- default.tensor (PennyLane-0.41.0)
- null.qubit (PennyLane-0.41.0)
- reference.qubit (PennyLane-0.41.0)