Source code for nabu.processing.convolution_cuda

from os.path import dirname
import numpy as np
from ..utils import updiv, get_cuda_srcfile
from ..cuda.utils import __has_pycuda__
from ..misc.utils import ConvolutionInfos
from ..cuda.processing import CudaProcessing

if __has_pycuda__:
    from pycuda.compiler import SourceModule


[docs] class Convolution: """ A class for performing convolution on GPU with CUDA, but without using textures (unlike for example in ``silx.opencl.convolution``) """ def __init__(self, shape, kernel, axes=None, mode=None, extra_options=None, cuda_options=None): """ Constructor of Cuda Convolution. Parameters ----------- shape: tuple Shape of the array. kernel: array-like Convolution kernel (1D, 2D or 3D). axes: tuple, optional Axes along which the convolution is performed, for batched convolutions. mode: str, optional Boundary handling mode. Available modes are: - "reflect": cba|abcd|dcb - "nearest": aaa|abcd|ddd - "wrap": bcd|abcd|abc - "constant": 000|abcd|000 Default is "reflect". extra_options: dict, optional Advanced options (dict). Current options are: - "allocate_input_array": True - "allocate_output_array": True - "allocate_tmp_array": True - "sourcemodule_kwargs": {} - "batch_along_flat_dims": True """ self.cuda = CudaProcessing(**(cuda_options or {})) self._configure_extra_options(extra_options) self._determine_use_case(shape, kernel, axes) self._allocate_memory(mode) self._init_kernels() def _configure_extra_options(self, extra_options): self.extra_options = { "allocate_input_array": True, "allocate_output_array": True, "allocate_tmp_array": True, "sourcemodule_kwargs": {}, "batch_along_flat_dims": True, } extra_opts = extra_options or {} self.extra_options.update(extra_opts) self.sourcemodule_kwargs = self.extra_options["sourcemodule_kwargs"] def _get_dimensions(self, shape, kernel): self.shape = shape self.data_ndim = self._check_dimensions(shape=shape, name="Data") self.kernel_ndim = self._check_dimensions(arr=kernel, name="Kernel") Nx = shape[-1] if self.data_ndim >= 2: Ny = shape[-2] else: Ny = 1 if self.data_ndim >= 3: Nz = shape[-3] else: Nz = 1 self.Nx = np.int32(Nx) self.Ny = np.int32(Ny) self.Nz = np.int32(Nz) def _determine_use_case(self, shape, kernel, axes): """ Determine the convolution use case from the input/kernel shape, and axes. """ self._get_dimensions(shape, kernel) if self.kernel_ndim > self.data_ndim: raise ValueError("Kernel dimensions cannot exceed data dimensions") data_ndim = self.data_ndim kernel_ndim = self.kernel_ndim self.kernel = kernel.astype("f") convol_infos = ConvolutionInfos() k = (data_ndim, kernel_ndim) if k not in convol_infos.use_cases: raise ValueError( "Cannot find a use case for data ndim = %d and kernel ndim = %d" % (data_ndim, kernel_ndim) ) possible_use_cases = convol_infos.use_cases[k] # If some dimensions are "flat", make a batched convolution along them # Ex. data_dim = (1, Nx) -> batched 1D convolution if self.extra_options["batch_along_flat_dims"] and (1 in self.shape): axes = tuple([curr_dim for numels, curr_dim in zip(self.shape, range(len(self.shape))) if numels != 1]) # self.use_case_name = None for uc_name, uc_params in possible_use_cases.items(): if axes in convol_infos.allowed_axes[uc_name]: self.use_case_name = uc_name self.use_case_desc = uc_params["name"] self.use_case_kernels = uc_params["kernels"].copy() if self.use_case_name is None: raise ValueError( "Cannot find a use case for data ndim = %d, kernel ndim = %d and axes=%s" % (data_ndim, kernel_ndim, str(axes)) ) # TODO implement this use case if self.use_case_name == "batched_separable_2D_1D_3D": raise NotImplementedError("The use case %s is not implemented" % self.use_case_name) # self.axes = axes # Replace "axes=None" with an actual value (except for ND-ND) allowed_axes = convol_infos.allowed_axes[self.use_case_name] if len(allowed_axes) > 1: # The default choice might impact perfs self.axes = allowed_axes[0] or allowed_axes[1] self.separable = self.use_case_name.startswith("separable") self.batched = self.use_case_name.startswith("batched") def _allocate_memory(self, mode): self.mode = mode or "reflect" # The current implementation does not support kernel size bigger than data size, # except for mode="nearest" for i, dim_size in enumerate(self.shape): if min(self.kernel.shape) > dim_size and i in self.axes: print( "Warning: kernel support is too large for data dimension %d (%d). Forcing convolution mode to 'nearest'" % (i, dim_size) ) self.mode = "nearest" # option_array_names = { "allocate_input_array": "data_in", "allocate_output_array": "data_out", "allocate_tmp_array": "data_tmp", } # Nonseparable transforms do not need tmp array if not (self.separable): self.extra_options["allocate_tmp_array"] = False # Allocate arrays for option_name, array_name in option_array_names.items(): if self.extra_options[option_name]: value = self.cuda.allocate_array("value", self.shape, np.float32) else: value = None setattr(self, array_name, value) if isinstance(self.kernel, np.ndarray): self.d_kernel = self.cuda.to_device("d_kernel", self.kernel) else: if not (isinstance(self.kernel, self.cuda.array_class)): raise ValueError("kernel must be either numpy array or pycuda array") self.d_kernel = self.kernel self._old_input_ref = None self._old_output_ref = None self._c_modes_mapping = { "periodic": 2, "wrap": 2, "nearest": 1, "replicate": 1, "reflect": 0, "constant": 3, } mp = self._c_modes_mapping if self.mode.lower() not in mp: raise ValueError( """ Mode %s is not available. Available modes are: %s """ % (self.mode, str(mp.keys())) ) if self.mode.lower() == "constant": raise NotImplementedError("mode='constant' is not implemented yet") self._c_conv_mode = mp[self.mode] def _init_kernels(self): if self.kernel_ndim > 1: if np.abs(np.diff(self.kernel.shape)).max() > 0: raise NotImplementedError("Non-separable convolution with non-square kernels is not implemented yet") # Compile source module compile_options = [str("-DUSED_CONV_MODE=%d" % self._c_conv_mode)] fname = get_cuda_srcfile("convolution.cu") nabu_cuda_dir = dirname(fname) include_dirs = [nabu_cuda_dir] self.sourcemodule_kwargs["options"] = compile_options self.sourcemodule_kwargs["include_dirs"] = include_dirs with open(fname) as fid: cuda_src = fid.read() self._module = SourceModule(cuda_src, **self.sourcemodule_kwargs) # Blocks, grid self._block_size = {1: (32, 1, 1), 2: (32, 32, 1), 3: (16, 8, 8)}[self.data_ndim] # TODO tune self._n_blocks = tuple([int(updiv(a, b)) for a, b in zip(self.shape[::-1], self._block_size)]) # Prepare cuda kernel calls self._cudakernel_signature = { 1: "PPPiiii", 2: "PPPiiiii", 3: "PPPiiiiii", }[self.kernel_ndim] self.cuda_kernels = {} for axis, kern_name in enumerate(self.use_case_kernels): self.cuda_kernels[axis] = self._module.get_function(kern_name) self.cuda_kernels[axis].prepare(self._cudakernel_signature) # Cuda kernel arguments kernel_args = [ self._n_blocks, self._block_size, None, None, self.d_kernel.gpudata, np.int32(self.kernel.shape[0]), self.Nx, self.Ny, self.Nz, ] if self.kernel_ndim == 2: kernel_args.insert(5, np.int32(self.kernel.shape[1])) if self.kernel_ndim == 3: kernel_args.insert(5, np.int32(self.kernel.shape[2])) kernel_args.insert(6, np.int32(self.kernel.shape[1])) self.kernel_args = tuple(kernel_args) # If self.data_tmp is allocated, separable transforms can be performed # by a series of batched transforms, without any copy, by swapping refs. self.swap_pattern = None if self.separable: if self.data_tmp is not None: self.swap_pattern = { 2: [("data_in", "data_tmp"), ("data_tmp", "data_out")], 3: [ ("data_in", "data_out"), ("data_out", "data_tmp"), ("data_tmp", "data_out"), ], } else: raise NotImplementedError("For now, data_tmp has to be allocated") def _get_swapped_arrays(self, i): """ Get the input and output arrays to use when using a "swap pattern". Swapping refs enables to avoid copies between temp. array and output. For example, a separable 2D->1D convolution on 2D data reads: data_tmp = convol(data_input, kernel, axis=1) # step i=0 data_out = convol(data_tmp, kernel, axis=0) # step i=1 :param i: current step number of the separable convolution """ n_batchs = len(self.axes) in_ref, out_ref = self.swap_pattern[n_batchs][i] d_in = getattr(self, in_ref) d_out = getattr(self, out_ref) return d_in, d_out def _configure_kernel_args(self, cuda_kernel_args, input_ref, output_ref): # TODO more elegant if isinstance(input_ref, self.cuda.array_class): input_ref = input_ref.gpudata if isinstance(output_ref, self.cuda.array_class): output_ref = output_ref.gpudata if input_ref is not None or output_ref is not None: cuda_kernel_args = list(cuda_kernel_args) if input_ref is not None: cuda_kernel_args[2] = input_ref if output_ref is not None: cuda_kernel_args[3] = output_ref cuda_kernel_args = tuple(cuda_kernel_args) return cuda_kernel_args @staticmethod def _check_dimensions(arr=None, shape=None, name="", dim_min=1, dim_max=3): if shape is not None: ndim = len(shape) elif arr is not None: ndim = arr.ndim else: raise ValueError("Please provide either arr= or shape=") if ndim < dim_min or ndim > dim_max: raise ValueError("%s dimensions should be between %d and %d" % (name, dim_min, dim_max)) return ndim def _check_array(self, arr): if not (isinstance(arr, self.cuda.array_class) or isinstance(arr, np.ndarray)): raise TypeError("Expected either pycuda.gpuarray or numpy.ndarray") if arr.dtype != np.float32: raise TypeError("Data must be float32") if arr.shape != self.shape: raise ValueError("Expected data shape = %s" % str(self.shape)) def _set_arrays(self, array, output=None): # Either copy H->D or update references. if isinstance(array, np.ndarray): self.data_in[:] = array[:] else: self._old_input_ref = self.data_in self.data_in = array data_in_ref = self.data_in if output is not None: if not (isinstance(output, np.ndarray)): self._old_output_ref = self.data_out self.data_out = output # Update Cuda kernel arguments with new array references self.kernel_args = self._configure_kernel_args(self.kernel_args, data_in_ref, self.data_out) def _separable_convolution(self): assert len(self.axes) == len(self.use_case_kernels) # Separable: one kernel call per data dimension for i, axis in enumerate(self.axes): in_ref, out_ref = self._get_swapped_arrays(i) self._batched_convolution(axis, input_ref=in_ref, output_ref=out_ref) def _batched_convolution(self, axis, input_ref=None, output_ref=None): # Batched: one kernel call in total cuda_kernel = self.cuda_kernels[axis] cuda_kernel_args = self._configure_kernel_args(self.kernel_args, input_ref, output_ref) ev = cuda_kernel.prepared_call(*cuda_kernel_args) def _nd_convolution(self): assert len(self.use_case_kernels) == 1 cuda_kernel = self._module.get_function(self.use_case_kernels[0]) ev = cuda_kernel.prepared_call(*self.kernel_args) def _recover_arrays_references(self): if self._old_input_ref is not None: self.data_in = self._old_input_ref self._old_input_ref = None if self._old_output_ref is not None: self.data_out = self._old_output_ref self._old_output_ref = None self.kernel_args = self._configure_kernel_args(self.kernel_args, self.data_in, self.data_out) def _get_output(self, output): if output is None: res = self.data_out.get() else: res = output if isinstance(output, np.ndarray): output[:] = self.data_out[:] self._recover_arrays_references() return res
[docs] def convolve(self, array, output=None): """ Convolve an array with the class kernel. :param array: Input array. Can be numpy.ndarray or pycuda.gpuarray.GPUArray. :param output: Output array. Can be numpy.ndarray or pycuda.gpuarray.GPUArray. """ self._check_array(array) self._set_arrays(array, output=output) if self.axes is not None: if self.separable: self._separable_convolution() elif self.batched: assert len(self.axes) == 1 self._batched_convolution(self.axes[0]) # else: ND-ND convol else: # ND-ND convol self._nd_convolution() res = self._get_output(output) return res
__call__ = convolve