Source code for silx.opencl.convolution

#!/usr/bin/env python
# /*##########################################################################
#
# Copyright (c) 2019-2023 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
"""Module for convolution on CPU/GPU."""

__authors__ = ["P. Paleo"]
__license__ = "MIT"
__date__ = "01/08/2019"

import numpy as np
from .common import pyopencl as cl
import pyopencl.array as parray
from .processing import OpenclProcessing, EventDescription
from .utils import ConvolutionInfos


class Convolution(OpenclProcessing):
    """
    A class for performing convolution on CPU/GPU with OpenCL.
    """

    def __init__(
        self,
        shape,
        kernel,
        axes=None,
        mode=None,
        ctx=None,
        devicetype="all",
        platformid=None,
        deviceid=None,
        profile=False,
        extra_options=None,
    ):
        """Constructor of OpenCL Convolution.

        :param shape: shape of the array.
        :param kernel: convolution kernel (1D, 2D or 3D).
        :param axes: axes along which the convolution is performed,
            for batched convolutions.
        :param mode: Boundary handling mode. Available modes are:

            - "reflect": cba|abcd|dcb
            - "nearest": aaa|abcd|ddd
            - "wrap": bcd|abcd|abc
            - "constant": 000|abcd|000

            Default is "reflect".
        :param ctx: actual working context, left to None for automatic
            initialization from device type or platformid/deviceid
        :param devicetype: type of device, can be "CPU", "GPU", "ACC" or "ALL"
        :param platformid: integer with the platform identifier, as given by
            clinfo
        :param deviceid: integer with the device identifier, as given by
            clinfo
        :param profile: switch on profiling to be able to profile at the
            kernel level; stores profiling elements (makes the code slightly
            slower)
        :param extra_options: Advanced options (dict). Current options are:

            - "allocate_input_array": True
            - "allocate_output_array": True
            - "allocate_tmp_array": True
            - "dont_use_textures": False
        """
        OpenclProcessing.__init__(
            self,
            ctx=ctx,
            devicetype=devicetype,
            platformid=platformid,
            deviceid=deviceid,
            profile=profile,
        )
        self._configure_extra_options(extra_options)
        self._determine_use_case(shape, kernel, axes)
        self._allocate_memory(mode)
        self._init_kernels()

    def _configure_extra_options(self, extra_options):
        self.extra_options = {
            "allocate_input_array": True,
            "allocate_output_array": True,
            "allocate_tmp_array": True,
            "dont_use_textures": False,
        }
        extra_opts = extra_options or {}
        self.extra_options.update(extra_opts)
        self.use_textures = not (self.extra_options["dont_use_textures"])
        self.use_textures &= self.check_textures_availability()

    def _get_dimensions(self, shape, kernel):
        self.shape = shape
        self.data_ndim = self._check_dimensions(shape=shape, name="Data")
        self.kernel_ndim = self._check_dimensions(arr=kernel, name="Kernel")
        Nx = shape[-1]
        if self.data_ndim >= 2:
            Ny = shape[-2]
        else:
            Ny = 1
        if self.data_ndim >= 3:
            Nz = shape[-3]
        else:
            Nz = 1
        self.Nx = np.int32(Nx)
        self.Ny = np.int32(Ny)
        self.Nz = np.int32(Nz)

    def _determine_use_case(self, shape, kernel, axes):
        """
        Determine the convolution use case from the input/kernel shapes and axes.
        """
        self._get_dimensions(shape, kernel)
        if self.kernel_ndim > self.data_ndim:
            raise ValueError("Kernel dimensions cannot exceed data dimensions")
        data_ndim = self.data_ndim
        kernel_ndim = self.kernel_ndim
        self.kernel = kernel.astype("f")

        convol_infos = ConvolutionInfos()
        k = (data_ndim, kernel_ndim)
        if k not in convol_infos.use_cases:
            raise ValueError(
                "Cannot find a use case for data ndim = %d and kernel ndim = %d"
                % (data_ndim, kernel_ndim)
            )
        possible_use_cases = convol_infos.use_cases[k]

        self.use_case_name = None
        for uc_name, uc_params in possible_use_cases.items():
            if axes in convol_infos.allowed_axes[uc_name]:
                self.use_case_name = uc_name
                self.use_case_desc = uc_params["name"]
                self.use_case_kernels = uc_params["kernels"].copy()
        if self.use_case_name is None:
            raise ValueError(
                "Cannot find a use case for data ndim = %d, kernel ndim = %d and axes=%s"
                % (data_ndim, kernel_ndim, str(axes))
            )
        # TODO implement this use case
        if self.use_case_name == "batched_separable_2D_1D_3D":
            raise NotImplementedError(
                "The use case %s is not implemented" % self.use_case_name
            )
        self.axes = axes
        # Replace "axes=None" with an actual value (except for ND-ND)
        allowed_axes = convol_infos.allowed_axes[self.use_case_name]
        if len(allowed_axes) > 1:
            # The default choice might impact performance
            self.axes = allowed_axes[0] or allowed_axes[1]
        self.separable = self.use_case_name.startswith("separable")
        self.batched = self.use_case_name.startswith("batched")
        # Update kernel names when using textures
        if self.use_textures:
            for i, kern_name in enumerate(self.use_case_kernels):
                self.use_case_kernels[i] = kern_name + "_tex"

    def _allocate_memory(self, mode):
        self.mode = mode or "reflect"
        option_array_names = {
            "allocate_input_array": "data_in",
            "allocate_output_array": "data_out",
            "allocate_tmp_array": "data_tmp",
        }
        # Non-separable transforms do not need a tmp array
        if not (self.separable):
            self.extra_options["allocate_tmp_array"] = False
        # Allocate arrays
        for option_name, array_name in option_array_names.items():
            if self.extra_options[option_name]:
                value = parray.empty(self.queue, self.shape, np.float32)
                value.fill(np.float32(0.0))
            else:
                value = None
            setattr(self, array_name, value)

        if isinstance(self.kernel, np.ndarray):
            self.d_kernel = parray.to_device(self.queue, self.kernel)
        else:
            if not (isinstance(self.kernel, parray.Array)):
                raise ValueError("kernel must be either numpy array or pyopencl array")
            self.d_kernel = self.kernel
        self._old_input_ref = None
        self._old_output_ref = None
        if self.use_textures:
            self._allocate_textures()
        self._c_modes_mapping = {
            "periodic": 2,
            "wrap": 2,
            "nearest": 1,
            "replicate": 1,
            "reflect": 0,
            "constant": 3,
        }
        mp = self._c_modes_mapping
        if self.mode.lower() not in mp:
            raise ValueError(
                "Mode %s is not available. Available modes are: %s"
                % (self.mode, str(mp.keys()))
            )
        # TODO
        if not (self.use_textures) and self.mode.lower() == "constant":
            raise NotImplementedError(
                "mode='constant' is not implemented without textures yet"
            )
        self._c_conv_mode = mp[self.mode]

    def _allocate_textures(self):
        self.data_in_tex = self.allocate_texture(self.shape)
        self.d_kernel_tex = self.allocate_texture(self.kernel.shape)
        self.transfer_to_texture(self.d_kernel, self.d_kernel_tex)

    def _init_kernels(self):
        if self.kernel_ndim > 1:
            if np.abs(np.diff(self.kernel.shape)).max() > 0:
                raise NotImplementedError(
                    "Non-separable convolution with non-square kernels is not implemented yet"
                )
        compile_options = ["-DUSED_CONV_MODE=%d" % self._c_conv_mode]
        if self.use_textures:
            kernel_files = ["convolution_textures.cl"]
            compile_options.extend(
                [
                    "-DIMAGE_DIMS=%d" % self.data_ndim,
                    "-DFILTER_DIMS=%d" % self.kernel_ndim,
                ]
            )
            d_kernel_ref = self.d_kernel_tex
        else:
            kernel_files = ["convolution.cl"]
            d_kernel_ref = self.d_kernel.data
        self.compile_kernels(kernel_files=kernel_files, compile_options=compile_options)
        self.ndrange = self.shape[::-1]
        self.wg = None
        kernel_args = [
            self.queue,
            self.ndrange,
            self.wg,
            None,
            None,
            d_kernel_ref,
            np.int32(self.kernel.shape[0]),
            self.Nx,
            self.Ny,
            self.Nz,
        ]
        if self.kernel_ndim == 2:
            kernel_args.insert(6, np.int32(self.kernel.shape[1]))
        if self.kernel_ndim == 3:
            kernel_args.insert(6, np.int32(self.kernel.shape[2]))
            kernel_args.insert(7, np.int32(self.kernel.shape[1]))
        self.kernel_args = tuple(kernel_args)
        # If self.data_tmp is allocated, separable transforms can be performed
        # by a series of batched transforms, without any copy, by swapping refs.
        self.swap_pattern = None
        if self.separable:
            if self.data_tmp is not None:
                self.swap_pattern = {
                    2: [("data_in", "data_tmp"), ("data_tmp", "data_out")],
                    3: [
                        ("data_in", "data_out"),
                        ("data_out", "data_tmp"),
                        ("data_tmp", "data_out"),
                    ],
                }
            else:
                # TODO
                raise NotImplementedError("For now, data_tmp has to be allocated")

    def _get_swapped_arrays(self, i):
        """
        Get the input and output arrays to use when using a "swap pattern".
        Swapping references makes it possible to avoid copies between the
        temporary array and the output.
        For example, a separable 2D->1D convolution on 2D data reads:

            data_tmp = convol(data_input, kernel, axis=1)  # step i=0
            data_out = convol(data_tmp, kernel, axis=0)    # step i=1

        :param i: current step number of the separable convolution
        """
        if self.use_textures:
            # A copy is needed when using textures, as data_out is a Buffer
            if i > 0:
                self.transfer_to_texture(self.data_out, self.data_in_tex)
            return self.data_in_tex, self.data_out
        n_batchs = len(self.axes)
        in_ref, out_ref = self.swap_pattern[n_batchs][i]
        d_in = getattr(self, in_ref)
        d_out = getattr(self, out_ref)
        return d_in, d_out

    def _configure_kernel_args(self, opencl_kernel_args, input_ref, output_ref):
        # TODO more elegant
        if isinstance(input_ref, parray.Array):
            input_ref = input_ref.data
        if isinstance(output_ref, parray.Array):
            output_ref = output_ref.data
        if input_ref is not None or output_ref is not None:
            opencl_kernel_args = list(opencl_kernel_args)
            if input_ref is not None:
                opencl_kernel_args[3] = input_ref
            if output_ref is not None:
                opencl_kernel_args[4] = output_ref
            opencl_kernel_args = tuple(opencl_kernel_args)
        return opencl_kernel_args

    @staticmethod
    def _check_dimensions(arr=None, shape=None, name="", dim_min=1, dim_max=3):
        if shape is not None:
            ndim = len(shape)
        elif arr is not None:
            ndim = arr.ndim
        else:
            raise ValueError("Please provide either arr= or shape=")
        if ndim < dim_min or ndim > dim_max:
            raise ValueError(
                "%s dimensions should be between %d and %d" % (name, dim_min, dim_max)
            )
        return ndim

    def _check_array(self, arr):
        # TODO allow cl.Buffer
        if not (isinstance(arr, parray.Array) or isinstance(arr, np.ndarray)):
            raise TypeError("Expected either pyopencl.array.Array or numpy.ndarray")
        # TODO composition with ImageProcessing/cast
        if arr.dtype != np.float32:
            raise TypeError("Data must be float32")
        if arr.shape != self.shape:
            raise ValueError("Expected data shape = %s" % str(self.shape))

    def _set_arrays(self, array, output=None):
        # When using textures: copy
        if self.use_textures:
            self.transfer_to_texture(array, self.data_in_tex)
            data_in_ref = self.data_in_tex
        else:
            # Otherwise: copy H->D or update references.
            if isinstance(array, np.ndarray):
                self.data_in[:] = array[:]
            else:
                self._old_input_ref = self.data_in
                self.data_in = array
            data_in_ref = self.data_in
        if output is not None:
            if not (isinstance(output, np.ndarray)):
                self._old_output_ref = self.data_out
                self.data_out = output
        # Update OpenCL kernel arguments with new array references
        self.kernel_args = self._configure_kernel_args(
            self.kernel_args, data_in_ref, self.data_out
        )

    def _separable_convolution(self):
        assert len(self.axes) == len(self.use_case_kernels)
        # Separable: one kernel call per data dimension
        for i, axis in enumerate(self.axes):
            in_ref, out_ref = self._get_swapped_arrays(i)
            self._batched_convolution(axis, input_ref=in_ref, output_ref=out_ref)

    def _batched_convolution(self, axis, input_ref=None, output_ref=None):
        # Batched: one kernel call in total
        opencl_kernel = self.kernels.get_kernel(self.use_case_kernels[axis])
        opencl_kernel_args = self._configure_kernel_args(
            self.kernel_args, input_ref, output_ref
        )
        ev = opencl_kernel(*opencl_kernel_args)
        if self.profile:
            self.events.append(EventDescription("batched convolution", ev))

    def _nd_convolution(self):
        assert len(self.use_case_kernels) == 1
        opencl_kernel = self.kernels.get_kernel(self.use_case_kernels[0])
        ev = opencl_kernel(*self.kernel_args)
        if self.profile:
            self.events.append(EventDescription("ND convolution", ev))

    def _recover_arrays_references(self):
        if self._old_input_ref is not None:
            self.data_in = self._old_input_ref
            self._old_input_ref = None
        if self._old_output_ref is not None:
            self.data_out = self._old_output_ref
            self._old_output_ref = None
        self.kernel_args = self._configure_kernel_args(
            self.kernel_args, self.data_in, self.data_out
        )

    def _get_output(self, output):
        if output is None:
            res = self.data_out.get()
        else:
            res = output
            if isinstance(output, np.ndarray):
                output[:] = self.data_out[:]
        self._recover_arrays_references()
        return res

    def convolve(self, array, output=None):
        """
        Convolve an array with the class kernel.

        :param array: Input array. Can be numpy.ndarray or pyopencl.array.Array.
        :param output: Output array. Can be numpy.ndarray or pyopencl.array.Array.
        """
        self._check_array(array)
        self._set_arrays(array, output=output)
        if self.axes is not None:
            if self.separable:
                self._separable_convolution()
            elif self.batched:
                assert len(self.axes) == 1
                self._batched_convolution(self.axes[0])
            # else: ND-ND convolution
        else:
            # ND-ND convolution
            self._nd_convolution()
        res = self._get_output(output)
        return res

    __call__ = convolve
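
A minimal usage sketch (not part of the module above): it assumes a working pyopencl installation with at least one OpenCL device, and builds the Gaussian kernel by hand so the snippet stays self-contained. With 2D data, a 1D kernel and the default axes=None, the class selects a separable use case and filters the image along both axes.

import numpy as np
from silx.opencl.convolution import Convolution

# Hand-built, normalized 1D Gaussian kernel (sigma=2, radius=8)
x = np.arange(-8, 9)
gaussian = np.exp(-x ** 2 / 8.0).astype(np.float32)
gaussian /= gaussian.sum()

image = np.random.rand(512, 512).astype(np.float32)  # data must be float32

conv = Convolution(image.shape, gaussian, mode="reflect")
result = conv(image)  # __call__ is an alias of convolve()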
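
The boundary diagrams in the constructor docstring can be made concrete with numpy.pad: its modes "symmetric", "edge", "wrap" and "constant" reproduce exactly the cba|abcd|dcb, aaa|abcd|ddd, bcd|abcd|abc and 000|abcd|000 patterns. This mapping is an illustration, not part of the silx API.

import numpy as np

a = np.array([1, 2, 3, 4])  # stands for "abcd"
print(np.pad(a, 3, mode="symmetric"))  # reflect : [3 2 1 1 2 3 4 4 3 2]
print(np.pad(a, 3, mode="edge"))       # nearest : [1 1 1 1 2 3 4 4 4 4]
print(np.pad(a, 3, mode="wrap"))       # wrap    : [2 3 4 1 2 3 4 1 2 3]
print(np.pad(a, 3, mode="constant"))   # constant: [0 0 0 1 2 3 4 0 0 0]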
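
Continuing the usage sketch above, the separable path can be cross-checked on the CPU with two 1D passes of scipy.ndimage.convolve1d (assuming SciPy is available). Its default mode "reflect" repeats the edge sample, matching the cba|abcd|dcb diagram, and a symmetric kernel sidesteps convolution-versus-correlation flipping conventions; agreement is expected only up to floating-point tolerance.

from scipy.ndimage import convolve1d

reference = convolve1d(image, gaussian, axis=1, mode="reflect")      # step i=0
reference = convolve1d(reference, gaussian, axis=0, mode="reflect")  # step i=1
assert np.allclose(result, reference, atol=1e-4)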