Is there a way to parallelize the same circuit for multiple input data?

Hi, I’m TMMeme.

I want to parallelize the circuit executions for local simulators.
I want to parallelize over data, i.e., different inputs to the same circuit.
What is currently the simplest way to achieve this? I plan to accelerate the parallel computations on NVIDIA GPUs.

I saw the docs for qml.QNodeCollection, but they mainly describe how to run multiple different quantum circuits in parallel for a single input.

Thanks for reading this far.

Hi @TM_MEME, thanks for reaching out.

Currently, support for batching with lightning.gpu works natively for observable calculations using the adjoint differentiation pipeline, but not for data-parallelism over input parameters.

We have had good success using both Dask (e.g. see dask-cuda) and Ray for this; the latter allowed us to perform data-parallelism over 128 GPUs as part of our circuit-cutting work.
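
For reference, here is a minimal sketch of the Ray pattern mentioned above. The device, wire count, and circuit are purely illustrative and not taken from the circuit-cutting work; on a GPU node you could swap in lightning.gpu and decorate with @ray.remote(num_gpus=1) to pin one GPU per task.

import ray
import pennylane as qml
from pennylane import numpy as np

ray.init()

@ray.remote
def run_circuit(x):
    # Each task builds its own device and QNode, then evaluates one input
    dev = qml.device("default.qubit", wires=1)

    @qml.qnode(dev)
    def circuit(x):
        qml.RY(x, wires=0)
        return qml.expval(qml.PauliZ(0))

    return circuit(x)

inputs = np.linspace(0, np.pi, 8)
results = ray.get([run_circuit.remote(x) for x in inputs])  # same circuit, different inputs
print(results)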

Is there a specific use-case you are aiming to build here? As qml.QNodeCollection is a deprecated PennyLane feature, it may not be ideal for your purpose.

Thank you for your kind comment, @mlxd.
I want to parallelize the quantum convolution. Currently I am performing the quantum convolutional operations sequentially, which takes time.

#imports
import pennylane as qml
from pennylane import numpy as np
from pennylane.templates import RandomLayers
import torch
import torch.nn as nn

#device
dev = qml.device("default.qubit", wires=10)

@qml.qnode(dev, interface='torch')
def qconv(inputs, weights): 
    N_phi = len(inputs)

    # Encoding of 'N_phi' classical input values
    for j in range(N_phi):
        qml.RY(np.pi * inputs[j], wires=j)

    # Random quantum circuit
    RandomLayers(weights, wires=range(N_phi))

    # Measurement producing 'N_phi' classical output values
    result = [qml.expval(qml.PauliZ(j)) for j in range(N_phi)]
    return result

#weight_shapes
N_layers = 1 
N_rand_params = 10 
weight_shapes = {"weights": (N_layers, N_rand_params)}

#TorchLayer
qlayer = qml.qnn.TorchLayer(qconv, weight_shapes)

#definition of Qonvlayer
class QonvLayer(nn.Module):
    def __init__(self):
        super(QonvLayer, self).__init__()
        self.qonv = qlayer

    def forward(self, data): 
        # Constants
        N = len(data[0][0]) 
        K = 10
        slide = 1
        N_base = 4

        # Output Size
        out_x = 3
        out_y = N_base * K 
        out_c = N-K+1 
        out = np.zeros((out_x, out_y, out_c))
        out_temp_temp = np.zeros((K*N_base, out_c))

        for x in range(out_x):
            batch_x = data[x:x+1][0][:]

            for y in range(N_base):

                out_temp = np.zeros((K, out_c))
                print("out_temp", out_temp, out_temp.shape)
                batch_xy = batch_x[y] 

                for c in range(0, out_c, slide):
                    batch_k = batch_xy[c:c+K]
                    q_results = self.qonv(batch_k) 
                    print("q_results", q_results)

                    for i in range(K): 
                        out_temp[i][c] = q_results[i]
                    print("out_temp:" , out_temp) 
                
                
                for i in range(K): 
                    for j in range(out_c): 
                        out_temp_temp[i+y*K][j] = out_temp[i][j]

            for i in range(K*N_base):
                for j in range(out_c):
                    out[x][i][j] = out_temp_temp[i][j]
        
        return out    

data = torch.randn([10, 4, 20])
 

# forward
model = QonvLayer()
print(model)

y = model(data)
print(y)

print("done")

Hi @TM_MEME, thank you for the input. Since this uses the Torch interface, it may be possible to explore Torch’s support for multi-GPU and distributed training (link). For the more complex case of GPUs spread over multiple nodes, I’d suggest reading up on the DistributedDataParallel (link) framework to gain data-parallelism over your inputs (in this case, replacing the input for-loop with an offload of batches per GPU). If this matches your needs, I think these ideas should help get the batching done in parallel over different GPU resources.
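
As a rough illustration of the single-node, multi-GPU route (not part of the original suggestion), the sketch below wraps a module with torch.nn.DataParallel, which splits each input batch across the visible GPUs. It assumes the module's forward pass works on torch tensors end to end; the QonvLayer above would need to return tensors rather than NumPy arrays for this to apply.

import torch
import torch.nn as nn

model = QonvLayer()  # the module defined earlier in this thread
if torch.cuda.device_count() > 1:
    # DataParallel replicates the module on each GPU and splits the batch along dim 0
    model = nn.DataParallel(model)
model = model.to("cuda" if torch.cuda.is_available() else "cpu")

data = torch.randn([10, 4, 20])
out = model(data)  # each GPU processes its own slice of the batch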

Feel free to let us know if this doesn’t suit your needs, and we can offer some additional suggestions.

Sorry for the delay in replying.
I’m sorry but this may not fit my needs.
Is there any way to copy a quantum circuit to multiple GPUs in one node and compute the quantum circuit in parallel?

Hi @TM_MEME

Do you mean to split a given state-vector computation across multiple GPUs (i), or to run separate state-vector computations with the same circuit, but different inputs (ii)?

If (i), this is not something we currently support with PennyLane Lightning-GPU, but we are planning it for a later release. However, this will likely not result in any speedup; rather, it will allow running larger circuits that do not fit onto a single GPU.

If (ii), you can enable batch mode for Lightning-GPU, which allows a given circuit using the adjoint differentiation method to spread its per-observable computations across multiple GPUs on a single node. This can be enabled by using @qml.qnode(dev, interface='torch', diff_method="adjoint") and turning on batch mode as documented in the Parallel adjoint differentiation support section of the documentation.

Note that this does not currently support multi-node communication, nor does it provide any advantage for circuits with fewer than roughly 20 qubits.
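
As a rough sketch of the batch mode described in (ii), and assuming lightning.gpu is installed, the batch_obs=True device option together with diff_method="adjoint" spreads the per-observable adjoint work across the GPUs on the node. The circuit below is illustrative only.

import pennylane as qml
from pennylane import numpy as np

n_wires = 22  # illustrative; as noted above, circuits below ~20 qubits see little benefit

# batch_obs=True enables the parallel adjoint differentiation mode documented for lightning.gpu
dev = qml.device("lightning.gpu", wires=n_wires, batch_obs=True)

@qml.qnode(dev, diff_method="adjoint")
def circuit(weights):
    for i in range(n_wires):
        qml.RY(weights[i], wires=i)
    for i in range(1, n_wires):
        qml.CNOT(wires=[i - 1, i])
    return [qml.expval(qml.PauliZ(i)) for i in range(n_wires)]

weights = np.array(np.random.random(n_wires), requires_grad=True)
print(qml.jacobian(circuit)(weights))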

Hi,

I am struggling with the same problem. I tried the following with Dask:

[screenshots of the Dask setup omitted]

Parallelising the expectation values of 5 different inputs to the same circuit works:

[screenshot of the parallel expectation values omitted]

However, this only covers computing the expectation values in parallel. Machine learning also requires computing the gradients of the parameters for optimization. I want to speed up the simulations, but I got stuck at the gradient computation:
Gradients : qml.grad(expval)(params)
ERROR :

  • Failed to deserialize

  • Cannot pickle ‘generator’ object

Can someone please help in this regard or point me towards some alternatives?

Thanks a lot!!!

Hi @Abhishek,

I’m not able to replicate your error.

I do get a different error, though, because you’re trying to take the gradient of expval; since this isn’t a real scalar-output function, it raises an error.

From what I can tell, your error seems more related to Dask or your environment than to PennyLane. If you look at the full error traceback, you may be able to see where the error originates. If you share some more details here, maybe someone can help you.

Hi @CatalinaAlbornoz ,

Thanks for your help. Following is the code.

dev = qml.device('lightning.qubit', wires=1)
@qml.qnode(dev, diff_method='adjoint')
def circuit(x, y):
    qml.RX(x, wires=0)
    qml.RY(y, wires=0)
    return qml.expval(qml.PauliZ(wires=0))

In the circuit above, consider that (RX) takes many different inputs whose computations are parallelized, while (RY) is a parameter.

def dask_cost(x, y):
    temp = []
    for i in range(5):
        val = dask.delayed(circuit)(x[i], y)
        temp.append(val)
    expvals = dask.compute(*temp)
    expvals = np.sum(np.array(expvals))
    return expvals

def normal_cost(x, y):
    temp = []
    for i in range(5):
        val = circuit(x[i], y)
        temp.append(val)
    expvals = np.sum(np.array(temp))
    return expvals
x = np.array(np.random.random(5))
y = 2.0
dask_cost(x, y)
>>> tensor(-1.89933823, requires_grad=True)
normal_cost(x, y)
>>> tensor(-1.89933823, requires_grad=True)

The gradient of the cost function with respect to the parameters is needed in a machine learning task to update them.

qml.grad(normal_cost, argnum=1)(x, y)
>>> -4.1501297391067

qml.grad(dask_cost, argnum=1)(x, y)
Error :

  1. Could not serialize object of type ArrayBox.
  2. Failed to deserialize,
  3. Cannot pickle ‘generator’ object

I guess this problem requires handling objects of type ArrayBox during Dask’s serialization and deserialization of operations. If anyone knows how to do that, please share your views here.

Also, does this use of parallel computing actually help speed up the simulations? Please share your views.

Thanks !!!

Hi @Abhishek,

Thank you for sharing your code. I don’t get any errors when I run it.

I suspect that you may have an old version of Dask, PennyLane or Python.

Usually having the latest Python and PennyLane versions is best. Using Python 3.8-3.11 and PennyLane 0.29.1 should work for you. I would recommend that you follow these steps. Note that where it says "name_of_your_environment" you can choose any name that you want:

Create a new conda environment with: conda create --name name_of_your_environment python=3.10
Activate the environment: conda activate name_of_your_environment

After this you can install the needed packages: python -m pip install pennylane dask

Please let me know if this works for you! If this doesn’t solve your issue please post the output of qml.about() and your full error traceback.

Hi @CatalinaAlbornoz, I followed your steps and got the same pickle error. I traced the error back to the autograd library.

qml.about()
Name: PennyLane
Version: 0.29.1
Summary: PennyLane is a Python quantum machine learning library by Xanadu Inc.
Home-page: https://github.com/XanaduAI/pennylane
Author: 
Author-email: 
License: Apache License 2.0
Location: C:\Users\xxxx\Anaconda3\envs\dummy\Lib\site-packages
Requires: appdirs, autograd, autoray, cachetools, networkx, numpy, pennylane-lightning, requests, retworkx, scipy, semantic-version, toml
Required-by: PennyLane-Lightning

Platform info:           Windows-10-10.0.19045-SP0
Python version:          3.11.0
Numpy version:           1.23.5
Scipy version:           1.10.1
Installed devices:
- default.gaussian (PennyLane-0.29.1)
- default.mixed (PennyLane-0.29.1)
- default.qubit (PennyLane-0.29.1)
- default.qubit.autograd (PennyLane-0.29.1)
- default.qubit.jax (PennyLane-0.29.1)
- default.qubit.tf (PennyLane-0.29.1)
- default.qubit.torch (PennyLane-0.29.1)
- default.qutrit (PennyLane-0.29.1)
- null.qubit (PennyLane-0.29.1)
- lightning.qubit (PennyLane-Lightning-0.29.0)

I am wondering how you are getting output without error. Can you please post your code and the output? I would really like to see it.

Hi @Abhishek,

I’m using your exact same code. I’m wondering whether you’re importing numpy properly. Remember that if you’re using the autograd interface you need to import numpy from PennyLane:

import pennylane as qml
from pennylane import numpy as np

There may also be an issue with the Python and/or numpy versions that you’re using, although this is less likely. I used Python 3.9 and numpy 1.22.

If the numpy import isn’t the issue, then I would suggest that you follow the steps I shared again, but with Python 3.10 (just in case there’s some strange unknown behaviour with Python 3.11), and please copy and paste your full error traceback here.

Hi @CatalinaAlbornoz ,

I only use PennyLane’s numpy (from pennylane import numpy as np) every time, as you showed. When I change the device from lightning.qubit to default.qubit and the diff_method from adjoint to best, it surprisingly gives some output, although the values differ:

qml.grad(normal_cost, argnum=1)(x, y)
>>-4.315619113490985
qml.grad(dask_cost, argnum=1)(x, y)
>>-0.8970631311618367

However, I followed your suggestion with Python 3.10. I have not changed the numpy version, as PennyLane installs its own numpy. The error occurs when I use lightning.qubit with the adjoint diff method.
Following is the output of qml.about():

Name: PennyLane
Version: 0.29.1
Summary: PennyLane is a Python quantum machine learning library by Xanadu Inc.
Home-page: https://github.com/XanaduAI/pennylane
Author: 
Author-email: 
License: Apache License 2.0
Location: c:\users\setty\anaconda3\envs\temp\lib\site-packages
Requires: appdirs, autograd, autoray, cachetools, networkx, numpy, pennylane-lightning, requests, retworkx, scipy, semantic-version, toml
Required-by: PennyLane-Lightning

Platform info:           Windows-10-10.0.19042-SP0
Python version:          3.10.9
Numpy version:           1.23.5
Scipy version:           1.10.1
Installed devices:
- default.gaussian (PennyLane-0.29.1)
- default.mixed (PennyLane-0.29.1)
- default.qubit (PennyLane-0.29.1)
- default.qubit.autograd (PennyLane-0.29.1)
- default.qubit.jax (PennyLane-0.29.1)
- default.qubit.tf (PennyLane-0.29.1)
- default.qubit.torch (PennyLane-0.29.1)
- default.qutrit (PennyLane-0.29.1)
- null.qubit (PennyLane-0.29.1)
- lightning.qubit (PennyLane-Lightning-0.29.0)

Error traceback,
qml.grad(dask_cost, argnum=1)(x, y)

2023-03-21 11:32:27,376 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\core.py", line 158, in loads
    return msgpack.loads(
  File "msgpack\_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\core.py", line 138, in _decode_default
    return merge_and_deserialize(
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\serialize.py", line 497, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\serialize.py", line 426, in deserialize
    return loads(header, frames)
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\serialize.py", line 180, in serialization_error_loads
    raise TypeError(msg)
TypeError: Could not serialize object of type ArrayBox
Traceback (most recent call last):
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
AttributeError: Can't pickle local object 'VJPNode.initialize_root.<locals>.<lambda>'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\pickle.py", line 68, in dumps
    pickler.dump(x)
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\pickle.py", line 24, in reducer_override
    if _always_use_pickle_for(obj):
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\pickle.py", line 45, in _always_use_pickle_for
    return isinstance(x, (str, bytes))
RecursionError: maximum recursion depth exceeded in __instancecheck__

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\serialize.py", line 347, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\serialize.py", line 71, in pickle_dumps
    frames[0] = pickle.dumps(
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'generator' object

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[47], line 1
----> 1 qml.grad(dask_cost, argnum=1)(x, y)

File ~\Anaconda3\envs\temp\lib\site-packages\pennylane\_grad.py:115, in grad.__call__(self, *args, **kwargs)
    112     self._forward = self._fun(*args, **kwargs)
    113     return ()
--> 115 grad_value, ans = grad_fn(*args, **kwargs)
    116 self._forward = ans
    118 return grad_value

File ~\Anaconda3\envs\temp\lib\site-packages\autograd\wrap_util.py:20, in unary_to_nary.<locals>.nary_operator.<locals>.nary_f(*args, **kwargs)
     18 else:
     19     x = tuple(args[i] for i in argnum)
---> 20 return unary_operator(unary_f, x, *nary_op_args, **nary_op_kwargs)

File ~\Anaconda3\envs\temp\lib\site-packages\pennylane\_grad.py:133, in grad._grad_with_forward(fun, x)
    127 @staticmethod
    128 @unary_to_nary
    129 def _grad_with_forward(fun, x):
    130     """This function is a replica of ``autograd.grad``, with the only
    131     difference being that it returns both the gradient *and* the forward pass
    132     value."""
--> 133     vjp, ans = _make_vjp(fun, x)
    135     if not vspace(ans).size == 1:
    136         raise TypeError(
    137             "Grad only applies to real scalar-output functions. "
    138             "Try jacobian, elementwise_grad or holomorphic_grad."
    139         )

File ~\Anaconda3\envs\temp\lib\site-packages\autograd\core.py:10, in make_vjp(fun, x)
      8 def make_vjp(fun, x):
      9     start_node = VJPNode.new_root()
---> 10     end_value, end_node =  trace(start_node, fun, x)
     11     if end_node is None:
     12         def vjp(g): return vspace(x).zeros()

File ~\Anaconda3\envs\temp\lib\site-packages\autograd\tracer.py:10, in trace(start_node, fun, x)
      8 with trace_stack.new_trace() as t:
      9     start_box = new_box(x, t, start_node)
---> 10     end_box = fun(start_box)
     11     if isbox(end_box) and end_box._trace == start_box._trace:
     12         return end_box._value, end_box._node

File ~\Anaconda3\envs\temp\lib\site-packages\autograd\wrap_util.py:15, in unary_to_nary.<locals>.nary_operator.<locals>.nary_f.<locals>.unary_f(x)
     13 else:
     14     subargs = subvals(args, zip(argnum, x))
---> 15 return fun(*subargs, **kwargs)

Cell In[43], line 6, in dask_cost(x, y)
      4     val = dask.delayed(circuit)(x[i], y)
      5     temp.append(val)
----> 6 expvals = dask.compute(*temp)
      7 expvals = np.sum(np.array(expvals))
      8 return expvals

File ~\Anaconda3\envs\temp\lib\site-packages\dask\base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    596     keys.append(x.__dask_keys__())
    597     postcomputes.append(x.__dask_postcompute__())
--> 599 results = schedule(dsk, keys, **kwargs)
    600 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\client.py:3168, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3166         should_rejoin = False
   3167 try:
-> 3168     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3169 finally:
   3170     for f in futures.values():

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\client.py:2328, in Client.gather(self, futures, errors, direct, asynchronous)
   2326 else:
   2327     local_worker = None
-> 2328 return self.sync(
   2329     self._gather,
   2330     futures,
   2331     errors=errors,
   2332     direct=direct,
   2333     local_worker=local_worker,
   2334     asynchronous=asynchronous,
   2335 )

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\utils.py:345, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    343     return future
    344 else:
--> 345     return sync(
    346         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    347     )

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\utils.py:412, in sync(loop, func, callback_timeout, *args, **kwargs)
    410 if error:
    411     typ, exc, tb = error
--> 412     raise exc.with_traceback(tb)
    413 else:
    414     return result

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\utils.py:385, in sync.<locals>.f()
    383         future = wait_for(future, callback_timeout)
    384     future = asyncio.ensure_future(future)
--> 385     result = yield future
    386 except Exception:
    387     error = sys.exc_info()

File ~\Anaconda3\envs\temp\lib\site-packages\tornado\gen.py:769, in Runner.run(self)
    766 exc_info = None
    768 try:
--> 769     value = future.result()
    770 except Exception:
    771     exc_info = sys.exc_info()

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\client.py:2220, in Client._gather(self, futures, errors, direct, local_worker)
   2218     else:
   2219         self._gather_future = future
-> 2220     response = await future
   2222 if response["status"] == "error":
   2223     log = logger.warning if errors == "raise" else logger.debug

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\client.py:2271, in Client._gather_remote(self, direct, local_worker)
   2268                 response["data"].update(data2)
   2270     else:  # ask scheduler to gather data for us
-> 2271         response = await retry_operation(self.scheduler.gather, keys=keys)
   2273 return response

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\utils_comm.py:434, in retry_operation(coro, operation, *args, **kwargs)
    428 retry_delay_min = parse_timedelta(
    429     dask.config.get("distributed.comm.retry.delay.min"), default="s"
    430 )
    431 retry_delay_max = parse_timedelta(
    432     dask.config.get("distributed.comm.retry.delay.max"), default="s"
    433 )
--> 434 return await retry(
    435     partial(coro, *args, **kwargs),
    436     count=retry_count,
    437     delay_min=retry_delay_min,
    438     delay_max=retry_delay_max,
    439     operation=operation,
    440 )

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\utils_comm.py:413, in retry(coro, count, delay_min, delay_max, jitter_fraction, retry_on_exceptions, operation)
    411             delay *= 1 + random.random() * jitter_fraction
    412         await asyncio.sleep(delay)
--> 413 return await coro()

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\core.py:1234, in PooledRPCCall.__getattr__.<locals>.send_recv_from_rpc(**kwargs)
   1232 prev_name, comm.name = comm.name, "ConnectionPool." + key
   1233 try:
-> 1234     return await send_recv(comm=comm, op=key, **kwargs)
   1235 finally:
   1236     self.pool.reuse(self.addr, comm)

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\core.py:993, in send_recv(comm, reply, serializers, deserializers, **kwargs)
    991 await comm.write(msg, serializers=serializers, on_error="raise")
    992 if reply:
--> 993     response = await comm.read(deserializers=deserializers)
    994 else:
    995     response = None

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\comm\tcp.py:254, in TCP.read(self, deserializers)
    251 try:
    252     frames = unpack_frames(frames)
--> 254     msg = await from_frames(
    255         frames,
    256         deserialize=self.deserialize,
    257         deserializers=deserializers,
    258         allow_offload=self.allow_offload,
    259     )
    260 except EOFError:
    261     # Frames possibly garbled or truncated by communication error
    262     self.abort()

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\comm\utils.py:100, in from_frames(frames, deserialize, deserializers, allow_offload)
     98     res = await offload(_from_frames)
     99 else:
--> 100     res = _from_frames()
    102 return res

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\comm\utils.py:83, in from_frames.<locals>._from_frames()
     81 def _from_frames():
     82     try:
---> 83         return protocol.loads(
     84             frames, deserialize=deserialize, deserializers=deserializers
     85         )
     86     except EOFError:
     87         if size > 1000:

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\core.py:158, in loads(frames, deserialize, deserializers)
    152                 raise ValueError(
    153                     "Unpickle on the Scheduler isn't allowed, set `distributed.scheduler.pickle=true`"
    154                 )
    156         return msgpack_decode_default(obj)
--> 158     return msgpack.loads(
    159         frames[0], object_hook=_decode_default, use_list=False, **msgpack_opts
    160     )
    162 except Exception:
    163     logger.critical("Failed to deserialize", exc_info=True)

File msgpack\_unpacker.pyx:194, in msgpack._cmsgpack.unpackb()

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\core.py:138, in loads.<locals>._decode_default(obj)
    136     if "compression" in sub_header:
    137         sub_frames = decompress(sub_header, sub_frames)
--> 138     return merge_and_deserialize(
    139         sub_header, sub_frames, deserializers=deserializers
    140     )
    141 else:
    142     return Serialized(sub_header, sub_frames)

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\serialize.py:497, in merge_and_deserialize(header, frames, deserializers)
    493             merged = bytearray().join(subframes)
    495         merged_frames.append(merged)
--> 497 return deserialize(header, merged_frames, deserializers=deserializers)

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\serialize.py:426, in deserialize(header, frames, deserializers)
    421     raise TypeError(
    422         "Data serialized with %s but only able to deserialize "
    423         "data with %s" % (name, str(list(deserializers)))
    424     )
    425 dumps, loads, wants_context = families[name]
--> 426 return loads(header, frames)

File ~\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\serialize.py:180, in serialization_error_loads(header, frames)
    178 def serialization_error_loads(header, frames):
    179     msg = "\n".join([codecs.decode(frame, "utf8") for frame in frames])
--> 180     raise TypeError(msg)

TypeError: Could not serialize object of type ArrayBox
Traceback (most recent call last):
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
AttributeError: Can't pickle local object 'VJPNode.initialize_root.<locals>.<lambda>'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\pickle.py", line 68, in dumps
    pickler.dump(x)
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\pickle.py", line 24, in reducer_override
    if _always_use_pickle_for(obj):
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\pickle.py", line 45, in _always_use_pickle_for
    return isinstance(x, (str, bytes))
RecursionError: maximum recursion depth exceeded in __instancecheck__

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\serialize.py", line 347, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\serialize.py", line 71, in pickle_dumps
    frames[0] = pickle.dumps(
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\distributed\protocol\pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "C:\Users\setty\Anaconda3\envs\temp\lib\site-packages\cloudpickle\cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'generator' object

I think you are right. If it’s working for you then it should work somehow; I am just missing something here.
Thanks for the help :smile:

Hi @Abhishek,

Thank you for sharing your full error traceback. From what I can tell, there’s probably a mismatch in your Python versions between your Dask client and the workers. (E.g., see the troubleshooting guide here.)

I recommend that you create a new environment and make sure that everything uses Python 3.8 or greater.

You can also try using Google Colab, where you will be able to run your code with no issues.

Please let me know if this solves your issue!

Hi @CatalinaAlbornoz ,

Thank you for sharing the trouble shooting link.

The versions for the client and the 4 workers that were created seem to match:

client.get_versions()

{'scheduler': {'host': {'python': '3.11.0.final.0',
   'python-bits': 64,
   'OS': 'Windows',
   'OS-release': '10',
   'machine': 'AMD64',
   'processor': 'Intel64 Family 6 Model 165 Stepping 5, GenuineIntel',
   'byteorder': 'little',
   'LC_ALL': 'None',
   'LANG': 'None'},
  'packages': {'python': '3.11.0.final.0',
   'dask': '2023.3.1',
   'distributed': '2023.3.1',
   'msgpack': '1.0.5',
   'cloudpickle': '2.2.1',
   'tornado': '6.2',
   'toolz': '0.12.0',
   'numpy': '1.23.5',
   'pandas': '1.5.3',
   'lz4': None}},
 'workers': {'tcp://127.0.0.1:60486': {'host': {'python': '3.11.0.final.0',
    'python-bits': 64,
    'OS': 'Windows',
    'OS-release': '10',
    'machine': 'AMD64',
    'processor': 'Intel64 Family 6 Model 165 Stepping 5, GenuineIntel',
    'byteorder': 'little',
    'LC_ALL': 'None',
    'LANG': 'None'},
   'packages': {'python': '3.11.0.final.0',
    'dask': '2023.3.1',
    'distributed': '2023.3.1',
    'msgpack': '1.0.5',
    'cloudpickle': '2.2.1',
    'tornado': '6.2',
    'toolz': '0.12.0',
    'numpy': '1.23.5',
    'pandas': '1.5.3',
    'lz4': None}},
  'tcp://127.0.0.1:60493': {'host': {'python': '3.11.0.final.0',
    'python-bits': 64,
    'OS': 'Windows',
    'OS-release': '10',
    'machine': 'AMD64',
    'processor': 'Intel64 Family 6 Model 165 Stepping 5, GenuineIntel',
    'byteorder': 'little',
    'LC_ALL': 'None',
    'LANG': 'None'},
   'packages': {'python': '3.11.0.final.0',
    'dask': '2023.3.1',
    'distributed': '2023.3.1',
    'msgpack': '1.0.5',
    'cloudpickle': '2.2.1',
    'tornado': '6.2',
    'toolz': '0.12.0',
    'numpy': '1.23.5',
    'pandas': '1.5.3',
    'lz4': None}},
  'tcp://127.0.0.1:60496': {'host': {'python': '3.11.0.final.0',
    'python-bits': 64,
    'OS': 'Windows',
    'OS-release': '10',
    'machine': 'AMD64',
    'processor': 'Intel64 Family 6 Model 165 Stepping 5, GenuineIntel',
    'byteorder': 'little',
    'LC_ALL': 'None',
    'LANG': 'None'},
   'packages': {'python': '3.11.0.final.0',
    'dask': '2023.3.1',
    'distributed': '2023.3.1',
    'msgpack': '1.0.5',
    'cloudpickle': '2.2.1',
    'tornado': '6.2',
    'toolz': '0.12.0',
    'numpy': '1.23.5',
    'pandas': '1.5.3',
    'lz4': None}},
  'tcp://127.0.0.1:60499': {'host': {'python': '3.11.0.final.0',
    'python-bits': 64,
    'OS': 'Windows',
    'OS-release': '10',
    'machine': 'AMD64',
    'processor': 'Intel64 Family 6 Model 165 Stepping 5, GenuineIntel',
    'byteorder': 'little',
    'LC_ALL': 'None',
    'LANG': 'None'},
   'packages': {'python': '3.11.0.final.0',
    'dask': '2023.3.1',
    'distributed': '2023.3.1',
    'msgpack': '1.0.5',
    'cloudpickle': '2.2.1',
    'tornado': '6.2',
    'toolz': '0.12.0',
    'numpy': '1.23.5',
    'pandas': '1.5.3',
    'lz4': None}}},
 'client': {'host': {'python': '3.11.0.final.0',
   'python-bits': 64,
   'OS': 'Windows',
   'OS-release': '10',
   'machine': 'AMD64',
   'processor': 'Intel64 Family 6 Model 165 Stepping 5, GenuineIntel',
   'byteorder': 'little',
   'LC_ALL': 'None',
   'LANG': 'None'},
  'packages': {'python': '3.11.0.final.0',
   'dask': '2023.3.1',
   'distributed': '2023.3.1',
   'msgpack': '1.0.5',
   'cloudpickle': '2.2.1',
   'tornado': '6.2',
   'toolz': '0.12.0',
   'numpy': '1.23.5',
   'pandas': '1.5.3',
   'lz4': None}}}

I created a new environment following your steps in the previous posts and posted the resulting error tracebacks and qml.about() output above. In each new environment the Python version was above 3.8, so I am not able to understand the version problem.

Also, I now tried Google Colab and saw the same error traceback. Google Colab has its own Python version, 3.9, and I have not changed anything. If it’s working for you, can you please share your solution code and its results? That would be great, so that I can use it to bypass the errors.

Thanks

Hi @CatalinaAlbornoz ,

Sorry, I found my mistake. I forgot to include the following lines in the code above.

import pennylane as qml
import matplotlib.pyplot as plt
from pennylane import numpy as np
import dask
from dask.distributed import Client, progress
client = Client(threads_per_worker=1, n_workers=4)
client

I tried without the client and got the output without error. But without the Client initialization, I don’t think the parallelization is actually happening, so running Dask behaves just like a normal computation and no error appears. Can you please add these lines to the beginning of your code and rerun it? While running the Dask lines, you can also check the status of the parallelism via the dashboard link generated by the client. Please let me know if this reproduces the error.
Thanks…

Hi @Abhishek!

Thank you for sharing these additional details. I can now reproduce your problem.

It seems that you will need to import pickle and use it within your code.

You might need to create a dask.distributed client, with pickle serializers, and use processes instead of threads.

You might also need to override the device’s native batch_execute function: create your own batch_execute method that uses pickle, and then replace the device’s method with it.

Once you have this you can use PennyLane as usual.

My colleague @mlxd or I might be able to provide a code example in the next few days.

Hi @Abhishek

Depending on your intentions, using Dask (or Ray) may not be straightforward, or it could be a single-line change. Handling the serialization of internal data structures with tracked gradients can be problematic, so as @CatalinaAlbornoz suggested, we often need to override the built-in behaviours of Dask or PennyLane, depending on the workload. In this case, it may be easier (and computationally cheaper) to use the built-in batching functionality of default.qubit over the input parameters to circuits, as this is immediately compatible with the ML interfaces and potentially lets you avoid a distributed execution framework altogether.

Also, if you are using 1 qubit per simulation, I’d suggest sticking with default.qubit overall, as lightning.qubit has some initial overheads and does not support such batching workloads. As another note, the adjoint differentiation pipeline relies on parallelisation in C++ to handle gradients over multiple observables, but it will not benefit directly from distribution unless the task is packaged appropriately.
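
To illustrate the built-in batching (parameter broadcasting) on default.qubit mentioned above, here is a minimal sketch based on the earlier one-qubit example; the cost function and values are illustrative. Passing a 1D batch of inputs to RX evaluates all of them in a single broadcasted execution, and gradients flow through the usual interfaces.

import pennylane as qml
from pennylane import numpy as np

dev = qml.device("default.qubit", wires=1)

@qml.qnode(dev)
def circuit(x, y):
    qml.RX(x, wires=0)  # x may be a 1D batch of inputs (parameter broadcasting)
    qml.RY(y, wires=0)
    return qml.expval(qml.PauliZ(0))

x = np.array(np.random.random(5), requires_grad=False)  # five inputs, one broadcasted call
y = np.array(2.0, requires_grad=True)

def cost(y):
    return np.sum(circuit(x, y))

print(qml.grad(cost)(y))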

Here is an example of using Dask to handle parameter-shift gradients, where a new circuit is created for each shifted parameter and the work can immediately take advantage of distribution through Dask. We redefine the batch_execute method, as mentioned by @CatalinaAlbornoz, change the serializer functionality to ensure the gradients are tracked correctly, and farm the work out to Dask workers. Hopefully this helps you with your workload:

import pennylane as qml
import dask
from dask.distributed.protocol import serialize, deserialize
from dask.distributed import Client
import pickle

def batch_execute_dask(circuits):
    # Override the device's native batch_execute function
    def run_circ(circ):
        "Create remote device and run the circuit"
        c = pickle.loads(circ)
        dev = qml.device("lightning.qubit", wires=wires)
        return qml.execute([c], dev, gradient_fn=None)

    results = []
    for circuit in circuits:
        print(f"Submitting circuit: {circuit}")
        results.append(client.submit(run_circ, pickle.dumps(circuit)))

    return client.gather(results)

if __name__ == "__main__":

    # create dask.distributed client, with pickle serializers, and use processes instead of threads
    client = Client(serializers=['pickle'], deserializers=['pickle'], processes=True)

    wires = 11
    dev = qml.device("lightning.qubit", wires=wires)

    params = qml.numpy.random.rand(wires)

    # replace the batch_execute method with the above
    dev.batch_execute = batch_execute_dask

    # The rest as normal
    @qml.qnode(dev, diff_method="parameter-shift")
    def circuit(x):
        for i in range(wires):
            qml.RX(x[i], i)
        for i in range(1, wires):
            qml.CNOT(wires=[i-1,i])
        return [qml.expval(qml.PauliZ(i)) for i in range(wires)]

    print(qml.jacobian(circuit)(params))

Hi @mlxd and @CatalinaAlbornoz ,

Thanks a lot for your effort in solving this problem. The idea was to see whether there is a speed-up in the simulation time. The single-qubit example above was just to keep the problem simple to understand; the real use cases are much larger and more time-consuming. I will try this solution, and once again thank you so much for the solution code :slight_smile:
