I am running the code below, an improved version of the one given on the Demos page, and the training results are as desired.
Now I want to run inference on a single data sample. I have tried a lot, but it is not working; my current attempt is at the end of the code. Can someone help?
The code is given here:
# Data in CSV format is already loaded into the `df` variable.
import jax
import jax.numpy as jnp
import matplotlib.pyplot as plt
import numpy as np
import optax
import pennylane as qml
import pennylane.numpy as pnp
import seaborn as sns
from sklearn import datasets

sns.set()
seed = 0
rng = np.random.default_rng(seed=seed)
def convolutional_layer(weights, wires, skip_first_layer=True):
    """Adds a convolutional layer to a circuit.

    Args:
        weights (np.array): 1D array with 15 weights of the parametrized gates.
        wires (list[int]): Wires the convolutional layer acts on.
        skip_first_layer (bool): Skips the first two U3 gates of a layer.
    """
    n_wires = len(wires)
    assert n_wires >= 3, "this circuit is too small!"

    for p in [0, 1]:
        for indx, w in enumerate(wires):
            if indx % 2 == p and indx < n_wires - 1:
                if indx % 2 == 0 and not skip_first_layer:
                    qml.U3(*weights[:3], wires=[w])
                    qml.U3(*weights[3:6], wires=[wires[indx + 1]])
                qml.IsingXX(weights[6], wires=[w, wires[indx + 1]])
                qml.IsingYY(weights[7], wires=[w, wires[indx + 1]])
                qml.IsingZZ(weights[8], wires=[w, wires[indx + 1]])
                qml.U3(*weights[9:12], wires=[w])
                qml.U3(*weights[12:], wires=[wires[indx + 1]])
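
# --- Optional sanity check (my addition, not part of the demo) ---
# Draw one convolutional layer on 4 wires to inspect the gate pattern;
# `conv_demo` and the 15 random weights are purely illustrative.
@qml.qnode(qml.device("default.qubit", wires=4))
def conv_demo(w):
    convolutional_layer(w, wires=[0, 1, 2, 3], skip_first_layer=False)
    return qml.probs(wires=0)

print(qml.draw(conv_demo)(np.random.rand(15)))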
def pooling_layer(weights, wires):
    """Adds a pooling layer to a circuit.

    Args:
        weights (np.array): Array with the weights of the conditional U3 gate.
        wires (list[int]): List of wires to apply the pooling layer on.
    """
    n_wires = len(wires)
    assert len(wires) >= 2, "this circuit is too small!"

    for indx, w in enumerate(wires):
        if indx % 2 == 1 and indx < n_wires:
            m_outcome = qml.measure(w)
            qml.cond(m_outcome, qml.U3)(*weights, wires=wires[indx - 1])
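
# --- Optional sanity check (my addition) ---
# The pooling layer measures the odd wires mid-circuit and applies a
# conditional U3 to their even neighbours; drawing it on 4 wires shows this.
@qml.qnode(qml.device("default.qubit", wires=4))
def pool_demo(w):
    pooling_layer(w, wires=[0, 1, 2, 3])
    return qml.probs(wires=[0, 2])

print(qml.draw(pool_demo)(np.random.rand(3)))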
def conv_and_pooling(kernel_weights, n_wires, skip_first_layer=True):
    """Apply both the convolutional and pooling layer."""
    convolutional_layer(kernel_weights[:15], n_wires, skip_first_layer=skip_first_layer)
    pooling_layer(kernel_weights[15:], n_wires)
def dense_layer(weights, wires):
    """Apply an arbitrary unitary gate to a specified set of wires."""
    # qml.ArbitraryUnitary expects 4**n - 1 parameters for n wires.
    qml.ArbitraryUnitary(weights, wires)
num_wires = 6
device = qml.device("default.qubit", wires=num_wires)

@qml.qnode(device, interface="jax")
def conv_net(weights, last_layer_weights, features):
    """Define the QCNN circuit.

    Args:
        weights (np.array): Parameters of the convolution and pool layers.
        last_layer_weights (np.array): Parameters of the last dense layer.
        features (np.array): Input data to be embedded using AmplitudeEmbedding.
    """
    layers = weights.shape[1]
    wires = list(range(num_wires))

    # inputs the state input_state
    qml.AmplitudeEmbedding(features=features, wires=wires, pad_with=0.5)
    qml.Barrier(wires=wires, only_visual=True)

    # adds convolutional and pooling layers
    for j in range(layers):
        conv_and_pooling(weights[:, j], wires, skip_first_layer=(not j == 0))
        wires = wires[::2]
        qml.Barrier(wires=wires, only_visual=True)

    assert last_layer_weights.size == 4 ** (len(wires)) - 1, (
        "The size of the last layer weights vector is incorrect!"
        f" \n Expected {4 ** (len(wires)) - 1}, Given {last_layer_weights.size}"
    )
    dense_layer(last_layer_weights, wires)
    return qml.probs(wires=(0))
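
# Quick end-to-end check (my addition): one normalized random vector through
# the untrained QCNN should return a 2-entry probability vector from wire 0.
demo_weights = jnp.asarray(np.random.rand(18, 2))
demo_last = jnp.asarray(np.random.rand(4 ** 2 - 1))
demo_features = np.random.rand(2 ** num_wires)
demo_features = demo_features / np.linalg.norm(demo_features)  # unit norm for AmplitudeEmbedding
print(conv_net(demo_weights, demo_last, demo_features))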
fig, ax = qml.draw_mpl(conv_net)(
    np.random.rand(18, 2), np.random.rand(4 ** 2 - 1), np.random.rand(2 ** num_wires)
)

# Base path for saving the figure in high resolution
output_path_base = '/content/drive/MyDrive/Colab Notebooks/6-QCNN-2'

# Save in different formats
# fig.savefig(f'{output_path_base}.jpg', format='jpg', dpi=800)
# fig.savefig(f'{output_path_base}.png', format='png', dpi=800)
# fig.savefig(f'{output_path_base}.eps', format='eps', dpi=800)
# fig.savefig(f'{output_path_base}.svg', format='svg', dpi=800)
plt.show()
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.utils import shuffle

# Extract features and labels
features = df.iloc[:, :-1].values  # all columns except the last one
labels = df.iloc[:, -1].values     # last column (binary class: 0 or 1)

# Count occurrences of each class
unique, counts = np.unique(labels, return_counts=True)
class_distribution = dict(zip(unique, counts))
print("Class distribution:", class_distribution)

# Shuffle the data and labels
features, labels = shuffle(features, labels, random_state=42)

# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=42)

# Standardize the data
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Print the shapes and types of the datasets
print("Training data shape:", X_train_scaled.shape)
print("Testing data shape:", X_test_scaled.shape)
print("Training labels shape:", y_train.shape)
print("Testing labels shape:", y_test.shape)
print("Training data type:", type(X_train_scaled))
print("Testing data type:", type(X_test_scaled))
print("Training label type:", type(y_train))
print("Testing label type:", type(y_test))
def load_digits_data(num_train, num_test, rng):
    """Return training and testing data of the digits dataset."""
    digits = datasets.load_digits()
    features, labels = digits.data, digits.target

    # only use the first two classes
    features = features[np.where((labels == 0) | (labels == 1))]
    labels = labels[np.where((labels == 0) | (labels == 1))]

    # normalize data
    features = features / np.linalg.norm(features, axis=1).reshape((-1, 1))

    # subsample train and test split
    train_indices = rng.choice(len(labels), num_train, replace=False)
    test_indices = rng.choice(
        np.setdiff1d(range(len(labels)), train_indices), num_test, replace=False
    )

    x_train, y_train = features[train_indices], labels[train_indices]
    x_test, y_test = features[test_indices], labels[test_indices]

    return (
        jnp.asarray(x_train),
        jnp.asarray(y_train),
        jnp.asarray(x_test),
        jnp.asarray(y_test),
    )
@jax.jit
def compute_out(weights, weights_last, features, labels):
    """Computes the output of the corresponding label in the QCNN."""
    cost = lambda weights, weights_last, feature, label: conv_net(weights, weights_last, feature)[
        label
    ]
    return jax.vmap(cost, in_axes=(None, None, 0, 0), out_axes=0)(
        weights, weights_last, features, labels
    )
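
# Single-sample counterpart of compute_out (a sketch, not in the original):
# no vmap is needed for one sample, just one forward pass through conv_net.
def single_out(weights, weights_last, feature, label):
    return conv_net(weights, weights_last, feature)[label]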
def compute_accuracy(weights, weights_last, features, labels):
    """Computes the accuracy over the provided features and labels."""
    out = compute_out(weights, weights_last, features, labels)
    return jnp.sum(out > 0.5) / len(out)

def compute_cost(weights, weights_last, features, labels):
    """Computes the cost over the provided features and labels."""
    out = compute_out(weights, weights_last, features, labels)
    return 1.0 - jnp.sum(out) / len(labels)
def init_weights():
    """Initializes random weights for the QCNN model."""
    weights = pnp.random.normal(loc=0, scale=1, size=(18, 2), requires_grad=True)
    weights_last = pnp.random.normal(loc=0, scale=1, size=4 ** 2 - 1, requires_grad=True)
    return jnp.array(weights), jnp.array(weights_last)
value_and_grad = jax.jit(jax.value_and_grad(compute_cost, argnums=[0, 1]))
def train_qcnn(n_train, n_test, n_epochs):
    """
    Args:
        n_train (int): number of training examples
        n_test (int): number of test examples
        n_epochs (int): number of training epochs

    Returns:
        dict: n_train,
        steps,
        train_cost_epochs,
        train_acc_epochs,
        test_cost_epochs,
        test_acc_epochs
    """
    # load data
    x_train, y_train, x_test, y_test = load_digits_data(n_train, n_test, rng)

    # init weights and optimizer
    weights, weights_last = init_weights()

    # learning rate decay
    cosine_decay_scheduler = optax.cosine_decay_schedule(0.1, decay_steps=n_epochs, alpha=0.95)
    optimizer = optax.adam(learning_rate=cosine_decay_scheduler)
    opt_state = optimizer.init((weights, weights_last))

    # data containers
    train_cost_epochs, test_cost_epochs, train_acc_epochs, test_acc_epochs = [], [], [], []

    for step in range(n_epochs):
        # Training step with (adam) optimizer
        train_cost, grad_circuit = value_and_grad(weights, weights_last, x_train, y_train)
        updates, opt_state = optimizer.update(grad_circuit, opt_state)
        weights, weights_last = optax.apply_updates((weights, weights_last), updates)
        train_cost_epochs.append(train_cost)

        # compute accuracy on training data
        train_acc = compute_accuracy(weights, weights_last, x_train, y_train)
        train_acc_epochs.append(train_acc)

        # compute accuracy and cost on testing data
        test_out = compute_out(weights, weights_last, x_test, y_test)
        test_acc = jnp.sum(test_out > 0.5) / len(test_out)
        test_acc_epochs.append(test_acc)
        test_cost = 1.0 - jnp.sum(test_out) / len(test_out)
        test_cost_epochs.append(test_cost)

    return dict(
        n_train=[n_train] * n_epochs,
        step=np.arange(1, n_epochs + 1, dtype=int),
        train_cost=train_cost_epochs,
        train_acc=train_acc_epochs,
        test_cost=test_cost_epochs,
        test_acc=test_acc_epochs,
    ), weights, weights_last
import time

# Record the start time
start_time = time.time()

n_test = 100
n_epochs = 100
n_reps = 100

def run_iterations(n_train):
    results_df = pd.DataFrame(
        columns=["train_acc", "train_cost", "test_acc", "test_cost", "step", "n_train"]
    )

    for _ in range(n_reps):
        results, weights, weights_last = train_qcnn(n_train=n_train, n_test=n_test, n_epochs=n_epochs)
        results_df = pd.concat(
            [results_df, pd.DataFrame.from_dict(results)], axis=0, ignore_index=True
        )

    return results_df, weights, weights_last

# run training for multiple sizes
train_sizes = [2, 160]
results_df, weights, weights_last = run_iterations(n_train=2)
for n_train in train_sizes[1:]:
    new_results_df, new_weights, new_weights_last = run_iterations(n_train=n_train)
    results_df = pd.concat([results_df, new_results_df], axis=0, ignore_index=True)
    weights, weights_last = new_weights, new_weights_last  # update weights and weights_last
    print('Running size =', n_train, f"Elapsed Time: {(time.time() - start_time)} seconds")
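
# --- Single-sample inference: my current attempt (a minimal sketch) ---
# Assumes `weights`/`weights_last` from the last run_iterations call are the
# trained parameters; `sample` and `pred` are my own names. The model above is
# trained on the digits data, so a held-out digits sample is used here.
x_tr, y_tr, x_te, y_te = load_digits_data(2, 10, rng)
sample = x_te[0]  # one normalized 64-dimensional feature vector

probs = conv_net(weights, weights_last, sample)  # probabilities measured on wire 0
pred = int(jnp.argmax(probs))
print("probs:", probs, "predicted class:", pred, "true label:", int(y_te[0]))

# For a CSV row, it would first be scaled and embedded, e.g.:
# probs = conv_net(weights, weights_last, to_state(scaler.transform(row.reshape(1, -1))[0]))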