dask.distributed for tomography processing

Using dask.distributed to distribute tomography reconstruction computations at ESRF


Outline

  1. Tomography processing
  2. Why we use dask.distributed
  3. How we use dask.distributed
  4. Benefits and limitations


1. Tomography processing

A tomography scan acquires a collection of images: radios (radiographs)


1. Tomography processing

In parallel-beam geometry, line k of each radio is used to build sinogram k
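
This rearrangement is just an axis swap on the stack of radios. A minimal sketch, assuming the radios are stacked in a NumPy array of shape (n_angles, n_rows, n_cols) (illustrative shapes, not the actual code):

    import numpy as np

    # Illustrative only: radios stacked along the rotation-angle axis
    n_angles, n_rows, n_cols = 500, 256, 320
    radios = np.zeros((n_angles, n_rows, n_cols), dtype="f4")

    # Sinogram k gathers line k of every radio
    k = 100
    sinogram_k = radios[:, k, :]           # shape (n_angles, n_cols)

    # All sinograms at once: swap the angle and row axes
    sinograms = np.swapaxes(radios, 0, 1)  # shape (n_rows, n_angles, n_cols)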


1. Tomography processing

Radios are chunked so that each "compute worker" handles a series of sinograms

Worker k handles chunk k
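
A sketch of this mapping, assuming contiguous blocks of detector rows (the numbers are illustrative):

    # Worker k handles chunk k: a contiguous block of detector rows,
    # hence a contiguous block of sinograms
    n_rows, n_workers = 2048, 8
    chunk_size = n_rows // n_workers

    for k in range(n_workers):
        start, end = k * chunk_size, (k + 1) * chunk_size
        print(f"worker {k} reconstructs sinograms {start} to {end - 1}")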


2. Why we use dask.distributed

Project management reasons:

  • Focus developments on our core expertise: tomography processing
  • Do not re-invent our own computation distribution system: it would be an additional liability and maintenance burden
  • Use an off-the-shelf solution with a community focused on scientific computing
  • In the code, decouple the distribution of computations from the actual processing (harder to achieve with "invasive" solutions like MPI)


2. Why we use dask.distributed

Technical reasons:

  • Parallel-beam geometry --> "obvious" way of distributing work
  • No need for shared memory (in our case)
  • "Move computing resources to data, not the other way around"
  • Many nice features of distributed: resilience, smart scheduling, dashboard, Actors/RPC, ...
  • dask_jobqueue offers an abstraction on top of SLURM/OAR/SGE... ESRF uses two batch schedulers!


3. How we use dask.distributed

We do not use dask itself (arrays, dataframes, task graphs), but dask.distributed and dask_jobqueue.

The new tomography processing software can run the computations in two modes:

  • "local": computations are distributed on the local machine
  • "jobqueue": computations are distributed over SLURM or OAR


3. How we use dask.distributed

In both cases, the process is the same. A "master process" will:

  1. Ingest the processing tasks and options (via a config file or dictionary)
  2. Spawn a "cluster" of workers
  3. Query the workers for their actual computing resources
  4. Build a list of tasks (basically: "reconstruct this chunk")
  5. Distribute the tasks
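
A condensed sketch of these five steps in "local" mode (reconstruct_chunk is a hypothetical stand-in for the actual processing):

    from distributed import Client, LocalCluster

    def reconstruct_chunk(sub_region):
        # Stand-in for the actual tomography reconstruction of one chunk
        return sub_region

    # 1. Options would normally come from a config file or dictionary
    options = {"n_workers": 4, "n_rows": 2048}

    # 2. Spawn a "cluster" of workers ("jobqueue" mode would use dask_jobqueue)
    cluster = LocalCluster(n_workers=options["n_workers"], threads_per_worker=1)
    client = Client(cluster)

    # 3. Query the workers for their actual resources (memory, threads, ...)
    workers = client.scheduler_info()["workers"]

    # 4. Build the list of tasks: one block of sinogram rows per chunk
    step = options["n_rows"] // len(workers)
    tasks = [(start, start + step) for start in range(0, options["n_rows"], step)]

    # 5. Distribute the tasks ("reconstruct this chunk")
    futures = [client.submit(reconstruct_chunk, task) for task in tasks]
    results = client.gather(futures)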


4. Benefits and limitations

Benefits

  • Spawning workers and submitting tasks is roughly the same in "local" and "jobqueue" modes
  • Client/scheduler model: steps 2, 3 and 4 above allow for flexibility
  • Asynchronous calls (submit() returns a future) and callbacks
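
Continuing the earlier sketch (same client and hypothetical reconstruct_chunk), submit() returns immediately with a Future, and callbacks can be attached to it:

    def on_chunk_done(future):
        # Called by distributed when the task finishes (or fails)
        print(future.key, future.status)

    future = client.submit(reconstruct_chunk, (0, 256))
    future.add_done_callback(on_chunk_done)  # asynchronous notification
    result = future.result()                 # or block until completion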


4. Benefits and limitations

Limitations

  • Handling of CUDA contexts (see next slide)
  • RPC: "Actors" are not as full-featured as the stateless submit() (workaround on next slide)
  • Heterogeneous computations
    • Alleviated in "local" mode by using SpecCluster instead of LocalCluster. But no dask_jobqueue equivalent (?)
    • Workaround for "jobqueue" mode: spawn two (client, cluster) pairs and one "meta-client"?
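
A sketch of heterogeneous workers with SpecCluster in "local" mode (the worker names and resource labels are illustrative):

    from distributed import Client, Scheduler, SpecCluster, Worker

    # Two kinds of workers: one "sees" a GPU, the other is CPU-only
    workers = {
        "gpu-worker": {"cls": Worker, "options": {"nthreads": 1, "resources": {"GPU": 1}}},
        "cpu-worker": {"cls": Worker, "options": {"nthreads": 4}},
    }
    cluster = SpecCluster(
        workers=workers,
        scheduler={"cls": Scheduler, "options": {"dashboard_address": ":8787"}},
    )
    client = Client(cluster)

    # GPU tasks can then be routed with resource annotations, e.g.
    # client.submit(gpu_task, chunk, resources={"GPU": 1})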


4. Benefits and limitations

In the context of (off-line) tomography reconstruction, the critical resource is GPU memory (more than computing power).

The standard way of using distributed is to submit stateless jobs. However, for some reason, CUDA contexts are not properly handled between the Nanny and the Worker:

  • The CUDA contexts have to be manually push()-ed and pop()-ed
  • The cuFFT context does not seem to be cleaned up after job completion
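
A sketch of the required bookkeeping with pycuda (illustrative; the real code is more involved):

    import pycuda.driver as cuda

    cuda.init()
    ctx = cuda.Device(0).make_context()  # creates *and pushes* a context
    ctx.pop()                            # keep it inactive until needed

    def process_chunk_on_gpu(chunk):
        ctx.push()                       # make the context current in this thread
        try:
            ...                          # GPU processing of the chunk
        finally:
            ctx.pop()                    # otherwise the next task misbehaves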

Solutions:

  • Completely destroy the CUDA context and manually release GPU memory after each chunk is processed (inelegant and error-prone)
  • Use stateful computations: Actors


4. Benefits and limitations

How to have a "full-featured" Actor:

    from distributed import get_worker

    # Spawn one stateful "WorkerProcess" actor per dask worker; the actor keeps
    # its state (notably the CUDA context) alive across chunks.
    for worker_name, worker_conf in self.workers.items():
        fut = self.client.submit(
            WorkerProcess,
            ...,                    # constructor arguments (elided)
            actor=True,
            workers=[worker_name],  # pin the actor to this worker
        )
        self.actors[worker_name] = fut.result()

    # Submit the tasks as regular, stateless calls; each call fetches the
    # actor living on the worker it runs on and delegates the work to it.
    for task in self.tasks:
        f = self.client.submit(actor_process_chunk, *task)
        self.futures.append(f)


    def actor_process_chunk(sub_region, chunk_id):
        # Executed on a dask worker: grab the (single) actor hosted there
        # and let it process the chunk
        myself = get_worker()
        my_actors = list(myself.actors.keys())
        chunk_processor = myself.actors[my_actors[0]]
        chunk_processor.process_chunk(sub_region=sub_region)
