Multiprocessing on GPU

In this tutorial, the idea is to parallelized the reading of several HDF5 files in different processes and to perform the azimuthal integration on the GPU together with the Bitshuffle-LZ decompression.

The size of the problem is 1000 files containing 1000 frames each of 4Mpix. That’s one million frames. Reduced data are 1000x1000x1000 in single precision float.

This is an extreme case where we will try how to use all resources of the computer, so the first thing to do is to determine the topology of the computer: * AC922 computer from IBM with power9 processors. * 2 processors, 2x16 cores. Two cores share the L3-cache * 2 GPUs which can be accessed either via POCL or via the nvidia driver (the later is broken).

Disclaimer: * We use multiprocess instead of the multiprocessing library to be able to demonstrate the precedure with the jupuyter notebook. * We use SharedArray to provide a natural way of sharing the result array in memory between processes.

[1]:
%matplotlib inline
# use `widget` for better user experience; `inline` is for documentation generation

#Topology of the computer:
!lstopo /tmp/topo.png
from IPython.display import Image
Image(filename='/tmp/topo.png')
Failed to open /tmp/topo.png for writing (File exists)
[1]:
../../../_images/usage_tutorial_Parallelization_MultiGPU_1_1.png
[2]:
import os
import glob
import concurrent
import time
import json
from matplotlib.pyplot import subplots
import multiprocess
from multiprocess import Process, Value, Array
if "mpctx" in globals():
    mpctx = multiprocess.get_context('spawn')
else:
    multiprocess.set_start_method('spawn')
    mpctx = multiprocess.get_context('spawn')
import numpy
import hdf5plugin
import h5py
import pyFAI
print("pyFAI version: ", pyFAI.version)
import SharedArray
from silx.opencl import ocl
from silx.opencl.codec.bitshuffle_lz4 import BitshuffleLz4
import inspect
import collections
Item = collections.namedtuple("Item", "index filename")
MAIN_PROCESS = multiprocess.parent_process() is None
print(ocl)
pyFAI version:  0.22.0-dev10
OpenCL devices:
[0] Portable Computing Language: (0,0) pthread-POWER9, altivec supported, (0,1) Tesla V100-SXM2-32GB, (0,2) Tesla V100-SXM2-32GB
[1] NVIDIA CUDA: (1,0) Tesla V100-SXM2-32GB, (1,1) Tesla V100-SXM2-32GB
[3]:
#This cell contains the parameters for all the processing
params = {
    "DEVICES": [[0,1],[0,2]],
    "NWORKERS": 4,
    "FRAME_PER_FILE": 1000,
    "NFILES" : 1000,
    "NBINS" : 1000,
    "DETECTOR":"Eiger_4M",
    "pathname" : "/tmp/big_%04d.h5",
    "pathmask" : "/tmp/big_????.h5",
    "dtype" : "float32",
    "SHARED_NAME" : "shm://multigpu",
    "array_shape" : [1000, 1000, 1000],
    }
with open("param.json", "w") as w: w.write(json.dumps(params, indent=2))
for k,v in params.items():
    globals()[k] = v
[4]:
def build_integrator(detector=DETECTOR):
    "Build an azimuthal integrator with a dummy geometry"
    geo = {"detector": detector,
           "wavelength": 1e-10,
           "rot3":0} #work around a bug https://github.com/silx-kit/pyFAI/pull/1749
    ai = pyFAI.load(geo)
    return ai
[5]:
# Generate a set of files
def generate_one_frame(ai, unit="q_nm^-1", dtype="uint32"):
    """Prepare a frame with little count so that it compresses well"""
    qmax = ai.array_from_unit(unit=unit).max()
    q = numpy.linspace(0, qmax, 100)
    img = ai.calcfrom1d(q, 100/(1+q*q))
    frame = numpy.random.poisson(img).astype(dtype)
    return frame

def generate_files(img):
    cmp = hdf5plugin.Bitshuffle()
    filename = pathname%0
    shape = img.shape
    with h5py.File(filename, "w") as h:
        ds = h.create_dataset("data", shape=(FRAME_PER_FILE,)+shape, chunks=(1,)+shape, dtype=img.dtype, **cmp)
        for i in range(FRAME_PER_FILE):
            ds[i] = img + i%500 #Each frame has a different value to prevent caching effects
    res = [filename]
    for i in range(1, NFILES):
        new_file = pathname%i
        os.link(filename,new_file)
        res.append(new_file)
    return res

# Create a set of files with dummy data in them:
if len(glob.glob(pathmask)) == NFILES:
    input_files = glob.glob(pathmask)
    input_files.sort()
else:
    for f in glob.glob(pathmask):
        os.remove(f)
    input_files = generate_files(generate_one_frame(build_integrator(DETECTOR)))
input_files[:5]
[5]:
['/tmp/big_0000.h5',
 '/tmp/big_0001.h5',
 '/tmp/big_0002.h5',
 '/tmp/big_0003.h5',
 '/tmp/big_0004.h5']
[6]:
#This is allows to create and destroy shared numpy arrays

def create_shared_array(shape, dtype="float32", name=SHARED_NAME, create=False):
    if create:
        ary = SharedArray.create(name, shape, dtype=dtype)
    else:
        ary = SharedArray.attach(name)
    return ary

def release_shared(name=SHARED_NAME):
    if MAIN_PROCESS:
        SharedArray.delete(name)

result_array = create_shared_array(array_shape, dtype, SHARED_NAME, create=True)
[7]:
def worker(rank, queue, shm_name, counter):
    """Function representing one worker, used in a pool of worker.

    :param rank: integer, index of the worker.
    :param queue: input queue, expects Item with index and name of the file to process
    :param shm_name: name of the output shared memory to put integrated intensities
    :param counter: decremented when quits
    :return: nothing, used in a process.
    """
    def new_engine(engine, wg):
        "Change workgroup size of an engine"
        return engine.__class__((engine._data, engine._indices, engine._indptr),
                              engine.size, empty=engine.empty, unit=engine.unit,
                              bin_centers=engine.bin_centers, azim_centers = engine.azim_centers,
                              ctx=engine.ctx, block_size=wg)
    #imports:
    import pyFAI
    import numpy
    import SharedArray
    from silx.opencl.codec.bitshuffle_lz4 import BitshuffleLz4
    import h5py
    import json
    import sys
    #load parameters:
    for k,v in json.load(open("param.json")).items():
        globals()[k] = v
    #Start up the integrator:
    ai = pyFAI.load({"detector": DETECTOR,
                     "wavelength": 1e-10,
                     "rot3":0})
    blank = numpy.zeros(ai.detector.shape, dtype="uint32")
    method = ("full", "csr", "opencl", DEVICES[rank%len(DEVICES)])
    res = ai.integrate1d(blank, NBINS, method=method)
    omega = ai.solidAngleArray()
    engine = ai.engines[res.method].engine

    omega_crc = engine.on_device["solidangle"]
    engine = new_engine(engine, 512)

    gpu_decompressor = BitshuffleLz4(2000000, blank.size, dtype=blank.dtype, ctx=engine.ctx)
    gpu_decompressor.block_size = 128
    result_array = SharedArray.attach(SHARED_NAME)
    with counter.get_lock():
        counter.value += 1
    #Worker is ready !
    while True:
        item = queue.get()
        index = item.index
        if index<0:
            with counter.get_lock():
                counter.value -= 1
            return
        with h5py.File(item.filename, "r") as h5:
            ds = h5["data"]
            for i in range(ds.id.get_num_chunks()):
                filter_mask, chunk = ds.id.read_direct_chunk(ds.id.get_chunk_info(i).chunk_offset)
                if filter_mask == 0:
                    # print(f"{rank}: process frame #{i}")
                    dec = gpu_decompressor(chunk)
                    intensity = engine.integrate_ng(dec, solidangle=omega, solidangle_checksum=omega_crc).intensity
                    result_array[index, i,:] = intensity.astype(dtype)
[8]:
def build_pool(nbprocess, queue, shm_name, counter):
    """Build a pool of processes with workers, and starts them"""
    pool = []
    for i in range(nbprocess):
        process = Process(target=worker, name=f"worker_{i:02d}", args=(i, queue, shm_name, counter))
        process.start()
        pool.append(process)
    while counter.value<nbprocess:
        time.sleep(1)
    return pool

def end_pool(pool, queue):
    """Ends all processes from a pool by sending them a "kill-pill"""
    for process in pool:
        queue.put(Item(-1, None))

# Build the pool of workers
queue = mpctx.Queue()
counter = mpctx.Value("i", 0)
pool = build_pool(NWORKERS, queue, SHARED_NAME, counter)
pool
[8]:
[<Process name='worker_00' pid=226160 parent=225643 started>,
 <Process name='worker_01' pid=226161 parent=225643 started>,
 <Process name='worker_02' pid=226162 parent=225643 started>,
 <Process name='worker_03' pid=226226 parent=225643 started>]
[9]:
start_time = time.perf_counter()
for idx, fn in enumerate(input_files):
    queue.put(Item(idx, fn))
end_pool(pool, queue)
counter.value
[9]:
4
[10]:
while counter.value:
    time.sleep(1)
run_time = time.perf_counter()-start_time
print(f"run-time: {run_time:.1f}s")
run-time: 677.946s
[11]:
result_array
[11]:
array([[[9.79569473e+01, 9.78041000e+01, 9.82191238e+01, ...,
         5.50363541e-01, 5.63470900e-01, 8.26274157e-01],
        [9.89569473e+01, 9.88041077e+01, 9.92191238e+01, ...,
         1.62667990e+00, 1.63994408e+00, 1.90288854e+00],
        [9.99569473e+01, 9.98041077e+01, 1.00219124e+02, ...,
         2.70299625e+00, 2.71641731e+00, 2.97950292e+00],
        ...,
        [5.93956970e+02, 5.93804199e+02, 5.94219360e+02, ...,
         5.34403198e+02, 5.34494141e+02, 5.34827026e+02],
        [5.94956970e+02, 5.94804199e+02, 5.95219299e+02, ...,
         5.35479553e+02, 5.35570618e+02, 5.35903625e+02],
        [5.95956970e+02, 5.95804199e+02, 5.96219360e+02, ...,
         5.36555908e+02, 5.36647095e+02, 5.36980225e+02]],

       [[9.79569473e+01, 9.78041000e+01, 9.82191238e+01, ...,
         5.50363541e-01, 5.63470900e-01, 8.26274157e-01],
        [9.89569473e+01, 9.88041077e+01, 9.92191238e+01, ...,
         1.62667990e+00, 1.63994408e+00, 1.90288854e+00],
        [9.99569473e+01, 9.98041077e+01, 1.00219124e+02, ...,
         2.70299625e+00, 2.71641731e+00, 2.97950292e+00],
        ...,
        [5.93956970e+02, 5.93804199e+02, 5.94219360e+02, ...,
         5.34403198e+02, 5.34494141e+02, 5.34827026e+02],
        [5.94956970e+02, 5.94804199e+02, 5.95219299e+02, ...,
         5.35479553e+02, 5.35570618e+02, 5.35903625e+02],
        [5.95956970e+02, 5.95804199e+02, 5.96219360e+02, ...,
         5.36555908e+02, 5.36647095e+02, 5.36980225e+02]],

       [[9.79569473e+01, 9.78041000e+01, 9.82191238e+01, ...,
         5.50363541e-01, 5.63470900e-01, 8.26274157e-01],
        [9.89569473e+01, 9.88041077e+01, 9.92191238e+01, ...,
         1.62667990e+00, 1.63994408e+00, 1.90288854e+00],
        [9.99569473e+01, 9.98041077e+01, 1.00219124e+02, ...,
         2.70299625e+00, 2.71641731e+00, 2.97950292e+00],
        ...,
        [5.93956970e+02, 5.93804199e+02, 5.94219360e+02, ...,
         5.34403198e+02, 5.34494141e+02, 5.34827026e+02],
        [5.94956970e+02, 5.94804199e+02, 5.95219299e+02, ...,
         5.35479553e+02, 5.35570618e+02, 5.35903625e+02],
        [5.95956970e+02, 5.95804199e+02, 5.96219360e+02, ...,
         5.36555908e+02, 5.36647095e+02, 5.36980225e+02]],

       ...,

       [[5.96956970e+02, 5.96804199e+02, 5.97219299e+02, ...,
         5.37632202e+02, 5.37723572e+02, 5.38056824e+02],
        [9.79569473e+01, 9.78041000e+01, 9.82191238e+01, ...,
         5.50363541e-01, 5.63470900e-01, 8.26274157e-01],
        [9.89569473e+01, 9.88041077e+01, 9.92191238e+01, ...,
         1.62667990e+00, 1.63994408e+00, 1.90288854e+00],
        ...,
        [5.93956970e+02, 5.93804199e+02, 5.94219360e+02, ...,
         5.34403198e+02, 5.34494141e+02, 5.34827026e+02],
        [5.94956970e+02, 5.94804199e+02, 5.95219299e+02, ...,
         5.35479553e+02, 5.35570618e+02, 5.35903625e+02],
        [5.95956970e+02, 5.95804199e+02, 5.96219360e+02, ...,
         5.36555908e+02, 5.36647095e+02, 5.36980225e+02]],

       [[5.96956970e+02, 5.96804199e+02, 5.97219299e+02, ...,
         5.37632202e+02, 5.37723572e+02, 5.38056824e+02],
        [9.79569473e+01, 9.78041000e+01, 9.82191238e+01, ...,
         5.50363541e-01, 5.63470900e-01, 8.26274157e-01],
        [9.89569473e+01, 9.88041077e+01, 9.92191238e+01, ...,
         1.62667990e+00, 1.63994408e+00, 1.90288854e+00],
        ...,
        [5.93956970e+02, 5.93804199e+02, 5.94219360e+02, ...,
         5.34403198e+02, 5.34494141e+02, 5.34827026e+02],
        [5.94956970e+02, 5.94804199e+02, 5.95219299e+02, ...,
         5.35479553e+02, 5.35570618e+02, 5.35903625e+02],
        [5.95956970e+02, 5.95804199e+02, 5.96219360e+02, ...,
         5.36555908e+02, 5.36647095e+02, 5.36980225e+02]],

       [[5.96956970e+02, 5.96804199e+02, 5.97219299e+02, ...,
         5.37632202e+02, 5.37723572e+02, 5.38056824e+02],
        [9.79569473e+01, 9.78041000e+01, 9.82191238e+01, ...,
         5.50363541e-01, 5.63470900e-01, 8.26274157e-01],
        [9.89569473e+01, 9.88041077e+01, 9.92191238e+01, ...,
         1.62667990e+00, 1.63994408e+00, 1.90288854e+00],
        ...,
        [5.93956970e+02, 5.93804199e+02, 5.94219360e+02, ...,
         5.34403198e+02, 5.34494141e+02, 5.34827026e+02],
        [5.94956970e+02, 5.94804199e+02, 5.95219299e+02, ...,
         5.35479553e+02, 5.35570618e+02, 5.35903625e+02],
        [5.95956970e+02, 5.95804199e+02, 5.96219360e+02, ...,
         5.36555908e+02, 5.36647095e+02, 5.36980225e+02]]], dtype=float32)
[12]:
fig, ax = subplots()
ax.imshow(result_array[:,:,5])
[12]:
<matplotlib.image.AxesImage at 0x73caf4c6f070>
../../../_images/usage_tutorial_Parallelization_MultiGPU_12_1.png
[13]:
print(f"Performances: {NFILES*FRAME_PER_FILE/run_time:.3f} fps")
Performances: 1475.043 fps
[14]:
release_shared(SHARED_NAME)

Conclusion

It is possible to reach 1500 frames per second on a dual-GPU computer. In this configuration, a million of Eiger4M frames are reduced in something close to 10mn.

Energy considerations for the reduction of one million frames: * Each of the 2 GPU was consuming 140 W (over the 300W TDP of one GPU) over 700s which represents 200 kJ (55.5 Wh) for the data reduction. * The equivalent processing using only a 6-core workstation takes half a day (40682 s) and consumes 1561 kJ (433 Wh). * Processing on GPUs is thus close to 8x more efficient than processing on CPU only.

Other considerations: * Here reading from disk is fast enough, no need to put more than 2 processes per GPU. If reading is much slower, more can be beneficial. * Since the output array is in shared memory, one needs to have enough RAM for hosting it.