Setup:
Python: 3.7
pennylane: dev-0.5
tensorflow: 1.14 with CUDA: 10.0
GPU: GeForce GTX 950M
This is default_qubit_par.py
located in pennylane.plugins
. I have yet to implement variance, BasisState and sampling.
import warnings
import numpy as np
from scipy.linalg import eigh
from pennylane import Device
import tensorflow as tf
# tolerance for numerical errors
tolerance = 1e-10
# ========================================================
# utilities
# ========================================================
def tf_kron(a: tf.Tensor, b: tf.Tensor) -> tf.Tensor:
"""
Implementation of Kronecker product for tensorflow
Args:
a: tf.Tensor of size N x M
b: tf.Tensor of size P x K
Returns: tf.Tensor of size N * P x M * K
"""
a_shape = [a.shape[0].value, a.shape[1].value]
b_shape = [b.shape[0].value, b.shape[1].value]
return tf.reshape(tf.reshape(a, [a_shape[0], 1, a_shape[1], 1]) * tf.reshape(b, [1, b_shape[0], 1, b_shape[1]]),
[a_shape[0] * b_shape[0], a_shape[1] * b_shape[1]])
def spectral_decomposition(A):
r"""Spectral decomposition of a Hermitian matrix.
Args:
A (array): Hermitian matrix
Returns:
(vector[float], list[array[complex]]): (a, P): eigenvalues and hermitian projectors
such that :math:`A = \sum_k a_k P_k`.
"""
d, v = eigh(A)
P = []
for k in range(d.shape[0]):
temp = v[:, k]
P.append(np.outer(temp, temp.conj()))
return d, P
# ========================================================
# fixed gates
# ========================================================
I = tf.convert_to_tensor(np.eye(2), dtype=tf.complex128)
# Pauli matrices
X = tf.convert_to_tensor(np.array([[0, 1], [1, 0]]), dtype=tf.complex128) #: Pauli-X matrix
Y = tf.convert_to_tensor(np.array([[0, -1j], [1j, 0]]), dtype=tf.complex128) #: Pauli-Y matrix
Z = tf.convert_to_tensor(np.array([[1, 0], [0, -1]]), dtype=tf.complex128) #: Pauli-Z matrix
H = tf.convert_to_tensor(np.array([[1, 1], [1, -1]]) / np.sqrt(2), dtype=tf.complex128) #: Hadamard gate
# Two qubit gates
CNOT = tf.convert_to_tensor(np.array([[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 0, 1], [0, 0, 1, 0]]),
dtype=tf.complex128) #: CNOT gate
SWAP = tf.convert_to_tensor(np.array([[1, 0, 0, 0], [0, 0, 1, 0], [0, 1, 0, 0], [0, 0, 0, 1]]),
dtype=tf.complex128) #: SWAP gate
CZ = tf.convert_to_tensor(np.array([[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0], [0, 0, 0, -1]]),
dtype=tf.complex128) #: CZ gate
# ========================================================
# parametrized gates
# ========================================================
def Rphi(phi) -> tf.Tensor:
r"""One-qubit phase shift.
Args:
phi (float): phase shift angle
Returns:
array: unitary 2x2 phase shift matrix
"""
return tf.convert_to_tensor([[1, 0], [0, np.exp(1j * phi)]], dtype=tf.complex128)
def Rotx(theta) -> tf.Tensor:
r"""One-qubit rotation about the x axis.
Args:
theta (float): rotation angle
Returns:
array: unitary 2x2 rotation matrix :math:`e^{-i \sigma_x \theta/2}`
"""
return tf.math.cos(theta / 2) * I + 1j * tf.math.sin(-theta / 2) * X
def Roty(theta) -> tf.Tensor:
r"""One-qubit rotation about the y axis.
Args:
theta (float): rotation angle
Returns:
array: unitary 2x2 rotation matrix :math:`e^{-i \sigma_y \theta/2}`
"""
return tf.math.cos(theta / 2) * I + 1j * tf.math.sin(-theta / 2) * Y
def Rotz(theta) -> tf.Tensor:
r"""One-qubit rotation about the z axis.
Args:
theta (float): rotation angle
Returns:
array: unitary 2x2 rotation matrix :math:`e^{-i \sigma_z \theta/2}`
"""
return tf.math.cos(theta / 2) * I + 1j * tf.math.sin(-theta / 2) * Z
def Rot3(params) -> tf.Tensor:
r"""Arbitrary one-qubit rotation using three Euler angles.
Args:
a,b,c (float): rotation angles
Returns:
array: unitary 2x2 rotation matrix ``rz(c) @ ry(b) @ rz(a)``
"""
a = params[0]
b = params[1]
c = params[2]
return Rotz(c) @ (Roty(b) @ Rotz(a))
def CRotx(theta) -> tf.Tensor:
r"""Two-qubit controlled rotation about the x axis.
Args:
theta (float): rotation angle
Returns:
array: unitary 4x4 rotation matrix :math:`|0\rangle\langle 0|\otimes \mathbb{I}+|1\rangle\langle 1|\otimes R_x(\theta)`
"""
return tf_kron(tf.convert_to_tensor(np.array([[1, 0], [0, 0]]), dtype=tf.complex128), I) + \
tf_kron(tf.convert_to_tensor(np.array([[0, 0], [0, 1]]), dtype=tf.complex128), Rotx(theta))
def CRoty(theta) -> tf.Tensor:
r"""Two-qubit controlled rotation about the y axis.
Args:
theta (float): rotation angle
Returns:
array: unitary 4x4 rotation matrix :math:`|0\rangle\langle 0|\otimes \mathbb{I}+|1\rangle\langle 1|\otimes R_y(\theta)`
"""
return tf_kron(tf.convert_to_tensor(np.array([[1, 0], [0, 0]]), dtype=tf.complex128), I) + \
tf_kron(tf.convert_to_tensor(np.array([[0, 0], [0, 1]]), dtype=tf.complex128), Roty(theta))
def CRotz(theta) -> tf.Tensor:
r"""Two-qubit controlled rotation about the z axis.
Args:
theta (float): rotation angle
Returns:
array: unitary 4x4 rotation matrix :math:`|0\rangle\langle 0|\otimes \mathbb{I}+|1\rangle\langle 1|\otimes R_z(\theta)`
"""
return tf_kron(tf.convert_to_tensor(np.array([[1, 0], [0, 0]]), dtype=tf.complex128), I) + \
tf_kron(tf.convert_to_tensor(np.array([[0, 0], [0, 1]]), dtype=tf.complex128), Rotz(theta))
def CRot3(params) -> tf.Tensor:
r"""Arbitrary two-qubit controlled rotation using three Euler angles.
Args:
a,b,c (float): rotation angles
Returns:
array: unitary 4x4 rotation matrix :math:`|0\rangle\langle 0|\otimes \mathbb{I}+|1\rangle\langle 1|\otimes R(a,b,c)`
"""
return tf_kron(tf.convert_to_tensor(np.array([[1, 0], [0, 0]]), dtype=tf.complex128), I) + \
tf_kron(tf.convert_to_tensor(np.array([[0, 0], [0, 1]]), dtype=tf.complex128), Rot3(params))
# ========================================================
# Arbitrary states and operators
# ========================================================
def unitary(*args):
r"""Input validation for an arbitary unitary operation.
Args:
args (array): square unitary matrix
Returns:
array: square unitary matrix
"""
U = np.asarray(args[0])
if U.shape[0] != U.shape[1]:
raise ValueError("Operator must be a square matrix.")
if not np.allclose(U @ U.conj().T, np.identity(U.shape[0])):
raise ValueError("Operator must be unitary.")
return tf.convert_to_tensor(U, dtype=tf.complex128)
def hermitian(*args):
r"""Input validation for an arbitary Hermitian expectation.
Args:
args (array): square hermitian matrix
Returns:
array: square hermitian matrix
"""
A = np.asarray(args[0])
if A.shape[0] != A.shape[1]:
raise ValueError("Expectation must be a square matrix.")
if not np.allclose(A, A.conj().T):
raise ValueError("Expectation must be Hermitian.")
return tf.convert_to_tensor(A, dtype=tf.complex128)
# ========================================================
# device
# ========================================================
class DefaultQubitPar(Device):
"""Default qubit device for PennyLane.
Args:
wires (int): the number of modes to initialize the device in
shots (int): How many times the circuit should be evaluated (or sampled) to estimate
the expectation values. A value of 0 yields the exact result.
"""
name = 'Default qubit PennyLane plugin with tensorflow parallelization'
short_name = 'default.qubit_par'
pennylane_requires = '0.5'
version = '0.4.0'
author = 'Xanadu Inc.'
# Note: BasisState and QubitStateVector don't
# map to any particular function, as they modify
# the internal device state directly.
_operation_map = {
'BasisState': None,
'QubitStateVector': None,
'QubitUnitary': unitary,
'PauliX': X,
'PauliY': Y,
'PauliZ': Z,
'Hadamard': H,
'CNOT': CNOT,
'SWAP': SWAP,
'CZ': CZ,
'PhaseShift': Rphi,
'RX': Rotx,
'RY': Roty,
'RZ': Rotz,
'Rot': Rot3,
'CRX': CRotx,
'CRY': CRoty,
'CRZ': CRotz,
'CRot': CRot3
}
_observable_map = {
'PauliX': X,
'PauliY': Y,
'PauliZ': Z,
'Hadamard': H,
'Hermitian': hermitian,
'Identity': I
}
def __init__(self, wires, *, shots=0):
super().__init__(wires, shots)
self.eng = None
self._state = None
def pre_apply(self):
self.reset()
def apply(self, operation, wires, par):
assert isinstance(par, (
tf.Tensor, tf.Variable)), "Parameters must be TF Tensor or TF Variable, got {} instead".format(type(par))
par = tf.to_complex128(par)
if operation == 'QubitStateVector':
state = par
if len(state.shape) == 1 and state.shape[0] == 2 ** self.num_wires:
self._state = state
else:
raise ValueError('State vector must be of length 2**wires.')
if wires is not None and wires != [] and list(wires) != list(range(self.num_wires)):
raise ValueError(
"The default.qubit plugin can apply QubitStateVector only to all of the {} wires.".format(
self.num_wires))
return
if operation == 'BasisState':
raise NotImplementedError
# n = len(par[0])
# # get computational basis state number
# if n > self.num_wires or not (set(par[0]) == {0, 1} or set(par[0]) == {0} or set(par[0]) == {1}):
# raise ValueError(
# "BasisState parameter must be an array of 0 or 1 integers of length at most {}.".format(
# self.num_wires))
# if wires is not None and wires != [] and list(wires) != list(range(self.num_wires)):
# raise ValueError(
# "The default.qubit plugin can apply BasisState only to all of the {} wires.".format(self.num_wires))
#
# num = int(np.sum(np.array(par[0]) * 2 ** np.arange(n - 1, -1, -1)))
#
# self._state = np.zeros_like(self._state)
# self._state[num] = 1.
# self._state[num] = tf.convert_to_tensor(self._state[num], stype=tf.complex128)
# return
A = self._get_operator_matrix(operation, par)
self._state = self.mat_vec_product(A, self._state, wires)
def mat_vec_product(self, mat, vec, wires):
r"""Apply multiplication of a matrix to subsystems of the quantum state.
Args:
mat (array): matrix to multiply
vec (array): state vector to multiply
wires (Sequence[int]): target subsystems
Returns:
array: output vector after applying ``mat`` to input ``vec`` on specified subsystems
"""
# TODO: use multi-index vectors/matrices to represent states/gates internally
mat = tf.reshape(mat, [2] * len(wires) * 2)
vec = tf.reshape(vec, [2] * self.num_wires)
axes = (np.arange(len(wires), 2 * len(wires)), wires)
tdot = tf.tensordot(mat, vec, axes=axes)
# tensordot causes the axes given in `wires` to end up in the first positions
# of the resulting tensor. This corresponds to a (partial) transpose of
# the correct output state
# We'll need to invert this permutation to put the indices in the correct place
unused_idxs = [idx for idx in range(self.num_wires) if idx not in wires]
perm = wires + unused_idxs
inv_perm = np.argsort(perm) # argsort gives inverse permutation
state_multi_index = tf.transpose(tdot, inv_perm)
return tf.reshape(state_multi_index, [2 ** self.num_wires, ])
def expval(self, observable, wires, par):
if self.shots == 0:
# exact expectation value
A = self._get_operator_matrix(observable, par)
ev = self.ev(A, wires)
else:
# estimate the ev
ev = tf.reduce_mean(self.sample(observable, wires, par, self.shots))
return ev
def var(self, observable, wires, par):
return NotImplementedError
# if self.shots == 0:
# # exact expectation value
# A = self._get_operator_matrix(observable, par)
# var = self.ev(A @ A, wires) - self.ev(A, wires) ** 2
# else:
# # estimate the ev
# var = np.var(self.sample(observable, wires, par, self.shots))
#
# return var
def sample(self, observable, wires, par, n=None):
return NotImplementedError
# if n is None:
# n = self.shots
#
# if n == 0:
# raise ValueError("Calling sample with n = 0 is not possible.")
# if n < 0 or not isinstance(n, int):
# raise ValueError("The number of samples must be a positive integer.")
#
# A = self._get_operator_matrix(observable, par)
# a, P = spectral_decomposition(A)
#
# p = np.zeros(a.shape)
# for idx, Pi in enumerate(P):
# p[idx] = self.ev(Pi, wires)
#
# return np.random.choice(a, n, p=p)
def _get_operator_matrix(self, operation, par):
"""Get the operator matrix for a given operation or observable.
Args:
operation (str): name of the operation/observable
par (tuple[float]): parameter values
Returns:
array: matrix representation.
"""
A = {**self._operation_map, **self._observable_map}[operation]
if not callable(A):
return A
return A(par)
def ev(self, A, wires):
r"""Expectation value of observable on specified wires.
Args:
A (array[float]): the observable matrix as array
wires (Sequence[int]): target subsystems
Returns:
float: expectation value :math:`\expect{A} = \bra{\psi}A\ket{\psi}`
"""
As = self.mat_vec_product(A, self._state, wires)
# expectation = np.vdot(self._state, As)
expectation = tf.tensordot(tf.conj(self._state), As, axes=1)
# if tf.abs(tf.imag(expectation)) > tolerance:
# warnings.warn('Nonvanishing imaginary part {} in expectation value.'.format(expectation.imag),
# RuntimeWarning)
return tf.real(expectation)
def reset(self):
"""Reset the device"""
# init the state vector to |00..0>
self._state = np.zeros(2 ** self.num_wires, dtype=complex)
self._state[0] = 1
self._state = tf.convert_to_tensor(self._state, dtype=tf.complex128)
@property
def operations(self):
return set(self._operation_map.keys())
@property
def observables(self):
return set(self._observable_map.keys())
Example two qubit circuit that uses this Device:
import pennylane as qml
from pennylane import numpy as np
from pennylane.plugins.default_qubit_par import DefaultQubitPar
from pennylane.ops.qubit import QubitStateVector
import tensorflow as tf
# get the standard device and parallel device
dev = qml.device('default.qubit', wires=2)
par_dev = DefaultQubitPar(wires=2)
# simple test circuit
def circuit(var, state=None):
QubitStateVector(state, wires=[0,1])
qml.RX(var[0], wires=0)
qml.RY(var[1], wires=1)
qml.CNOT(wires=[0,1])
return qml.expval(qml.PauliZ(0))
# construct the circuit using the standard device QNode
circ = qml.QNode(circuit, device=dev)
circ.construct([(1, 2)], {'state': np.array([1,0,0,0])})
# graph inputs
inputs = tf.placeholder(tf.complex128, [None, 4], name='inputs')
def construct_graph_from_circuit(input_state):
"""
Pop operations from the queue and convert them into a TF graph using the default_dev_par class
Args:
input_state: data vector
Returns: expectation value of the quantum cicuit.
"""
# reset state
par_dev.pre_apply()
for op in circ.queue:
# encode data into qubit state vector
if op.name == 'QubitStateVector':
input_state /= tf.norm(input_state)
par_dev.apply(op.name, op.wires, input_state)
elif op.name == 'BasisState':
raise NotImplementedError
# gate parameters are TF vars
else:
par_dev.apply(op.name, op.wires, tf.Variable(initial_value=op.parameters, dtype=tf.float64))
# only works for single EV at the moment
measurement = circ.ev[0]
return par_dev.expval(measurement.name, measurement.wires, measurement.parameters)
# map function onto inputs in parallel.
outputs = tf.map_fn(construct_graph_from_circuit, inputs, parallel_iterations=120, dtype=tf.double)
# simple example loss function
loss = tf.abs(outputs)
# optimizer
opt = tf.train.GradientDescentOptimizer(learning_rate=0.01)
# Perform minimization with respect to the trainable variables.
train_step = opt.minimize(loss, var_list=[var for var in tf.trainable_variables()])
# Tensorflow session
sess = tf.Session(config=tf.ConfigProto(log_device_placement=False))
sess.run(tf.initialize_all_variables())
# random data
x = np.random.randn(10000, 4)
# use the standard PennyLane circuit so that we can compare
dev_circ = qml.QNode(circuit, device=dev)
# after one training step the parameters are updated so TF and PennyLane should not agree anymore
for i in range(2):
res, o, = sess.run([train_step, outputs, ], feed_dict={inputs: x,})
pennylane_out = np.array([dev_circ((1, 2), state=x[j] / np.linalg.norm(x[j])) for j in range(x.shape[0])])
print("At step {}, PennyLane and TF agree: {}".format(i, all(np.isclose(pennylane_out, o))))
I have checked a couple of things between the PennyLane and TF device:
- Output from some test circuits is the same (single and this double qubit example)
- Parameterized gate matrices are the same
- When running Tensorboard it shows that most of the computations are done on the GPU
Let me know what you think.