Distributing azimuthal integration

Feedback on distributing azimuthal integration and on using dask.distributed

Presenter Notes

Outline

  1. Context and objectives
  2. Measuring time for AI and reading data
  3. Distributing azimuthal integration with distributed
  4. Feedback on distributed

Presenter Notes

1. Context and objectives

Presenter Notes

1. Context: ID15A XRD data deluge

Context

  • GPFS setup on ID15A --> one order of magnitude faster data acquisition
  • Each proposal ends up with (tens of) millions of frames
  • The beamline currently cannot cope with the data throughput
    • Several days to process the data from one proposal
    • By then, users are usually already gone

Example

ACL9011001d / ACL9011001d_0195 / scan0001 / <files>
   sample        dataset          scan
   1-10          10-200 (200)     1-20 (1)      (125 files, 25k frames)
  • 1 scan (25k frames): 4 mins @ 100 FPS (25 secs @ 1000 FPS)
  • 1 sample (200 datasets * 1 scan * 25k frames = 5M frames): 14h @ 100 FPS (1h30 @ 1000 FPS)

Presenter Notes

1. Objectives

  • Fast processing: get integrated data as soon as reasonably possible
    • Target: 1000-2000 FPS (vs. 100 now)
    • Start with stored data. Later: ODA?
  • Simple to use (crucial requirement expressed by ID15A)
  • Versatile software (to be used on ID15A, ID11, and later for XRF-CT)
    • ... while still accounting for beamline specifics (e.g. ID15A output file layout)
  • Maintained by ADA, following tomo-bridge development standards

In this presentation: AI = Azimuthal Integration
(and not Artificial Intelligence for once!)

Presenter Notes

2. Measuring time for AI and reading data

Presenter Notes

2. Involved steps

Overview of the involved steps:

  • Load data (from GPFS)
  • Perform Azimuthal Integration (AI) on frames
  • Save curves (on GPFS)

How to accelerate this?

  • Measure the individual steps, determine bottlenecks
  • Review parallelization/distribution strategies

Presenter Notes

2.1 - Azimuthal Integration (AI)

pyFAI is already optimized (multi-year effort)

  • Bare-metal languages (Cython, OpenCL)
  • Different algorithms & data structures

Some figures with the ID15A parameters

  • CPU: 40-80 FPS (see next slide for details)
  • GPU: 150-400 FPS with modern GPUs (V100, A100, A40)

ID15A parameters:

  • Pilatus CdTe 2M (1679x1475)
  • 2500 azimuthal bins
  • Poisson error propagation
  • Polarization factor 1.0
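
As a rough sketch (not the actual beamline code), these parameters map onto a pyFAI integrate1d call as follows; the PONI file name and the random test frame are placeholders:

import numpy
import pyFAI

ai = pyFAI.load("id15a.poni")  # geometry + Pilatus CdTe 2M detector definition
frame = numpy.random.poisson(10, size=(1679, 1475)).astype(numpy.int32)  # dummy frame

result = ai.integrate1d(
    frame,
    npt=2500,                          # 2500 azimuthal bins
    error_model="poisson",             # Poisson error propagation
    polarization_factor=1.0,
    method=("full", "csr", "opencl"),  # or e.g. ("full", "csr", "cython") on CPU
)
radial, intensity, sigma = result.radial, result.intensity, result.sigma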

Presenter Notes

2.1 - AI: detailed figures

GPU:

  • Usually 150-400 FPS
  • Can load many (~10) AI engines without significantly degrading performance

CPU

  • 40-80 FPS
  • No point using too many cores - L3 cache sharing and limits of OpenMP scaling?

Intel Xeon Gold 6248 (hpc5):

  • 10 threads: 55 FPS
  • 20 threads: 60 FPS
  • 40 threads: 80 FPS
  • Best FPS-per-thread ratio at 10 threads (one socket)

AMD EPYC 7543: 55 FPS (optimum at 4 threads only!)

Presenter Notes

2.1 - How to accelerate AI?

  • pyFAI already optimized
  • -> More "workers"
    • Scale up: several multi-threaded AI per CPU, several AI per GPU
    • Scale out: more machines

Presenter Notes

2.2 - Loading data

Loading 200 frames (one file) typically takes 0.5-2 s (0.99-4 GB/s, i.e. 100-400 FPS).

  • Data is LZ4-compressed
  • LZ4 decompression is the bottleneck (not network!)
  • Decompression uses hdf5plugin under the hood
    • ongoing work to optimize it
  • Current hdf5plugin benefits from multithreading up to a limit (detailed figures next slide)
    • AMD EPYC 💪
  • Thread placement has a crucial impact (power9 machines: use physical cores!)

Loading vs processing: where is the bottleneck?

  • CPU (nice): clearly AI (40-70 FPS); reading done at 150-200 FPS
  • GPU/power9: clearly reading (< 100 FPS), unless careful taskset
  • GPU/EPYC: AI (reading done at > 400 FPS, even with 4 threads)
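
As a minimal sketch of the load step, reading one LZ4-compressed file with h5py + hdf5plugin looks like this (file path and dataset name are hypothetical):

import time
import h5py
import hdf5plugin  # importing it registers the LZ4/Bitshuffle HDF5 filters

t0 = time.perf_counter()
with h5py.File("scan0001/pilatus_0000.h5", "r") as f:
    frames = f["/entry_0000/measurement/data"][()]  # decompression happens here
dt = time.perf_counter() - t0
print(f"{len(frames)} frames in {dt:.2f} s -> {frames.nbytes / dt / 1e9:.2f} GB/s")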

Presenter Notes

2.2 - Loading data: detailed figures

Time to read 200 frames (one file).

Power9

  • 64/128 cores: 1.0s (1.98 GB/s, i.e. 200 FPS)
  • taskset 0-3 or 0-7: 1.61s
  • taskset 0-15: 750 ms
  • taskset 0,4,8,12,16: 840 ms

EPYC 7543

  • 64/64 cores: 722 ms
  • taskset 0-3 or 0-7: 360 ms (!)
  • taskset 0-1: 500 ms

Xeon Gold 6248

  • 40 cores: 1.43s
  • taskset 0-9: 1.17s
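
For reference, the taskset runs above pin the reading process to a subset of cores; one way to obtain the same pinning from Python (Linux only) is:

import os

# Equivalent to launching the process with `taskset -c 0-7`:
# restrict the current process (pid 0) to cores 0-7.
os.sched_setaffinity(0, range(8))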

Presenter Notes

3. Distributing azimuthal integration with distributed

Presenter Notes

3. Distributing computations - strategies

  • Local machine vs many-nodes
  • Granularity
    • One or several images
    • One "scan" (25 000 images)
    • Several scans / one dataset

Importantly, spawning an azimuthal integrator takes several seconds (to tens of seconds)

  • Need to process many frames to make the AI instantiation worthwhile
    • This rules out parallelizing over (few) individual images
  • Ideally, spawn only once
    • Spawn n_workers integrators and feed them with stacks of images/datasets

Presenter Notes

3. Distributing computations - possible solutions

Python native multiprocessing

  • ✅ Simple
  • ❌ Adapted only to distribution on the local machine
  • ❌ IPC (if any) has to be done manually (e.g. multiprocessing.Queue)
  • ❌ Cannot (?) choose thread affinity

MPI

  • ✅ Adapted to both local and remote distribution
  • ❌ "Invasive" in the code, not so easy to use

Distributed

  • ✅ Adapted to both local and remote distribution (OARCluster, SLURMCluster, ...)
  • ✅ Support for communications and RPC
  • ✅ Python module hiding most of the boring stuff
  • ❌ Difficult to troubleshoot
  • ❌ Unstable when using many workers (> 30) on the same machine

Presenter Notes

3. A brief introduction to distributed

Architecture: Client → Scheduler → Workers

  • Client and scheduler live on the front-end; workers live on the compute nodes
  • Relies on the dask task graph
  • ... but also supports stateful computations with RPC "actors"
    • i.e. class instances living on a remote process/node

Presenter Notes

3. Hello-world example

import os
import socket

from distributed import Client, LocalCluster, wait
from dask_jobqueue import SLURMCluster

def say_hello():
    print("Hello, I'm", os.getpid(), "from", socket.gethostname())

cluster = SLURMCluster(**cluster_specs)  # or LocalCluster(); cluster_specs holds the SLURM job options
cluster.scale(n_workers)

client = Client(cluster)
future = client.submit(say_hello)  # a concurrent.futures-like object
print(future)
wait([future], timeout=timeout)  # blocks until the task is completed/failed

Presenter Notes

3. Using RPC with distributed

  • Actors are class instances living on a remote process or node.
  • Communication with the client is done via RPC (Remote Procedure Call)
  • This seems nice!
    • Spawn each integrator exactly once
    • Feed them with frame stacks, entire datasets, or scans... (granularity can be tuned; see the sketch below)
    • No exchange of data (only messages like "process dataset with this DataUrl")
    • Book-keeping is done by the scheduler
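
A minimal sketch of this pattern with a distributed actor (the Integrator class, PONI file and data path below are hypothetical):

from distributed import Client

class Integrator:
    """One pyFAI engine per worker, spawned once and reused."""

    def __init__(self, poni_file):
        import pyFAI
        self._engine = pyFAI.load(poni_file)  # expensive: done only once

    def process(self, data_url):
        # load the frames referenced by data_url, integrate them, save the curves
        ...

client = Client(cluster)  # cluster created as in the hello-world slide
actor = client.submit(Integrator, "id15a.poni", actor=True).result()  # remote instance
future = actor.process("/data/.../scan0001")  # RPC call, returns an ActorFuture
future.result()  # blocks until the remote call finishes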

Presenter Notes

3. Performance results - distributed

Multi-node, CPU-only (nice partition, 10 threads/worker)

  • 10 workers: 250 FPS
  • 20 workers: 540 FPS
  • 40 workers: 900 FPS

Local machine: p9-04 (power9 + 2 V100)

  • 16 workers (4T, 8/GPU): 800 FPS
  • 20 workers (2T or 4T, 10/GPU): 800-1000 FPS
  • 32 workers (4T, 16/GPU): 1000 FPS, but unstable scheduler/workers

Local machine: gpid16axni (AMD EPYC 7543 + 2 A100)

  • 4 workers: 512 FPS
  • 12 workers: 1260 FPS
  • 16 workers (4T): 1400 FPS
  • 20 workers (2T): 1500 FPS
  • 32 workers (2T): 1360 FPS, unstable

Presenter Notes

3. Performance results - multiprocessing

1 dataset per "worker", so granularity = scan !

p9-04 (power9 + 2 V100)

  • 16 workers (2T!): 810 FPS
  • 20 workers (2T): 1010 FPS
  • 32 workers (4T): 1080 FPS - stable!

gpid16axni (AMD EPYC 7543 + 2 A100)

  • 16 workers (4T): 1480 FPS
  • 20 workers (2T): 1540 FPS
  • 32 workers (2T): 1600 FPS - stable!

Notes:

  • dask.distributed has a surprisingly low overhead
  • power9 is 2x slower at decompressing LZ4 data
    • unless thread affinity is carefully chosen (cumbersome)

Presenter Notes

4. Feedback on distributed

Presenter Notes

4. Feedback on using distributed

The good

  • SLURMCluster, LocalCluster abstraction
  • The boring work is done for you (socket communication/RPC, book-keeping, SLURM monitoring...)
  • Performance: very low overhead w.r.t. multiprocessing
  • Scales nicely with the number of nodes
  • Can choose the number of threads per worker (i.e. process), with good affinity

Presenter Notes

4. Feedback on using distributed

The bad

  • Can't easily create a "worker using 1/8 GPU"
    • For SLURM, "1 job = 1 full GPU"
    • One "dask worker" would have to spawn multiple integrators by itself to "fill" the GPU
    • This contradicts the simplicity of distributed
  • Seems unstable with many (> 30) workers on the same machine

The ugly

  • Asynchronous debugging hell
    • Don't ever mix with Thread (will fight for a semaphore - same event loop?)
  • When it fails, it does so catastrophically

Presenter Notes

4. Things that can go wrong

When distributing computations on the cluster, many things can (and will) go wrong.

  • Filesystem not updated in time -> unable to create master file
  • Heterogeneous OpenCL installations, e.g.:
    • pocl instantly segfaults on x86 CPUs in the "nice" partition
    • pocl is the only working implementation on power9 machines
  • On worker failure (for whatever reason):
    • a "timeout" has to be caught on the client side
    • re-submit the tasks (see the sketch below)
  • Respawning computing resources!
  • ... let alone getting computing resources
    • nice partition often crowded with hundreds of jobs
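
A hedged sketch of the "catch a timeout on the client and re-submit" logic (the retry policy and names here are illustrative, not the production code):

def run_with_retry(client, func, *args, timeout=600, max_retries=2):
    """Submit a task and re-submit it if it times out or its worker dies."""
    last_exc = None
    for attempt in range(max_retries + 1):
        future = client.submit(func, *args, pure=False)  # pure=False: new key each time
        try:
            return future.result(timeout=timeout)
        except Exception as exc:  # e.g. TimeoutError, KilledWorker
            last_exc = exc
            print(f"attempt {attempt} failed: {exc!r}")
    raise RuntimeError("task failed after all retries") from last_exc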

Presenter Notes

4. On stateful computations and usage of RPC

Using the RPC approach with remote classes (distributed "actors"):

Advantages

  • Avoids spawning an azimuthal integrator each time
  • Allows tuning the granularity
    • The tasks given to workers can be "integrate 200 frames" or "integrate an entire scan"

Drawbacks

  • Poorly suited when computing resources can go down (SLURM!)
    • Detecting lost workers, respawning them and fixing the state is possible but messy

Presenter Notes