Source code for nabu.processing.fft_cuda

import os
import warnings
from multiprocessing import get_context
from multiprocessing.pool import Pool
import numpy as np
from ..utils import check_supported
from .fft_base import _BaseFFT, _BaseVKFFT

try:
    from pyvkfft.cuda import VkFFTApp as vk_cufft

    __has_vkfft__ = True
except (ImportError, OSError):
    __has_vkfft__ = False
    vk_cufft = None
from ..cuda.processing import CudaProcessing

Plan = None
cu_fft = None
cu_ifft = None
__has_skcuda__ = None


def init_skcuda():
    # This needs to be done here, because scikit-cuda creates a Cuda context at import,
    # which can mess things up in some cases.
    # Ugly solution to an ugly problem.
    global __has_skcuda__, Plan, cu_fft, cu_ifft
    try:
        from skcuda.fft import Plan
        from skcuda.fft import fft as cu_fft
        from skcuda.fft import ifft as cu_ifft

        __has_skcuda__ = True
    except ImportError:
        __has_skcuda__ = False

class SKCUFFT(_BaseFFT):
    implem = "skcuda"
    backend = "cuda"
    ProcessingCls = CudaProcessing

    def _configure_batched_transform(self):
        if __has_skcuda__ is None:
            init_skcuda()
        if not (__has_skcuda__):
            raise ImportError("Please install pycuda and scikit-cuda to use the CUDA back-end")

        self.cufft_batch_size = 1
        self.cufft_shape = self.shape
        self._cufft_plan_kwargs = {}

        if (self.axes is not None) and (len(self.axes) < len(self.shape)):
            # In the easiest case, the transform is computed along the fastest dimensions:
            #   - 1D transforms of lines of 2D data
            #   - 2D transforms of images of 3D data (stacked along slow dim)
            #   - 1D transforms of 3D data along fastest dim
            # Otherwise, we have to configure cuda "advanced memory layout".
            data_ndims = len(self.shape)

            if data_ndims == 2:
                n_y, n_x = self.shape
                along_fast_dim = self.axes[0] == 1
                self.cufft_shape = n_x if along_fast_dim else n_y
                self.cufft_batch_size = n_y if along_fast_dim else n_x
                if not (along_fast_dim):
                    # Batched vertical 1D FFT on 2D data needs advanced data layout
                    # http://docs.nvidia.com/cuda/cufft/#advanced-data-layout
                    self._cufft_plan_kwargs = {
                        "inembed": np.int32([0]),
                        "istride": n_x,
                        "idist": 1,
                        "onembed": np.int32([0]),
                        "ostride": n_x,
                        "odist": 1,
                    }

            if data_ndims == 3:
                # TODO/FIXME - the following work for C2C but not R2C ?!
                # fast_axes = [(1, 2), (2, 1), (2,)]
                fast_axes = [(2,)]
                if self.axes not in fast_axes:
                    raise NotImplementedError(
                        "With the CUDA backend, batched transform on 3D data is only supported along fastest dimensions"
                    )
                self.cufft_batch_size = self.shape[0]
                self.cufft_shape = self.shape[1:]
                if len(self.axes) == 1:
                    # 1D transform on 3D data: here only supported along fast dim, so batch_size is Nx*Ny
                    self.cufft_batch_size = np.prod(self.shape[:2])
                    self.cufft_shape = (self.shape[-1],)
                if len(self.cufft_shape) == 1:
                    self.cufft_shape = self.cufft_shape[0]

    def _configure_normalization(self, normalize):
        self.normalize = normalize
        if self.normalize == "ortho":
            # TODO
            raise NotImplementedError("Normalization mode 'ortho' is not implemented with CUDA backend yet.")
        self.cufft_scale_inverse = self.normalize == "rescale"

    def _compute_fft_plans(self):
        self.plan_forward = Plan(  # pylint: disable = E1102
            self.cufft_shape,
            self.dtype,
            self.dtype_out,
            batch=self.cufft_batch_size,
            stream=self.processing.stream,
            **self._cufft_plan_kwargs,
            # cufft extensible plan API is only supported after 0.5.1
            # (commit 65288d28ca0b93e1234133f8d460dc6becb65121)
            # but there is still no official 0.5.2
            # ~ auto_allocate=True  # cufft extensible plan API
        )
        self.plan_inverse = Plan(  # pylint: disable = E1102
            self.cufft_shape,  # not shape_out
            self.dtype_out,
            self.dtype,
            batch=self.cufft_batch_size,
            stream=self.processing.stream,
            **self._cufft_plan_kwargs,
            # cufft extensible plan API is only supported after 0.5.1
            # (commit 65288d28ca0b93e1234133f8d460dc6becb65121)
            # but there is still no official 0.5.2
            # ~ auto_allocate=True
        )

    def fft(self, array, output=None):
        if output is None:
            output = self.output_fft = self.processing.allocate_array(
                "output_fft", self.shape_out, dtype=self.dtype_out
            )
        cu_fft(array, output, self.plan_forward, scale=False)  # pylint: disable = E1102
        return output

    def ifft(self, array, output=None):
        if output is None:
            output = self.output_ifft = self.processing.allocate_array("output_ifft", self.shape, dtype=self.dtype)
        cu_ifft(  # pylint: disable = E1102
            array,
            output,
            self.plan_inverse,
            scale=self.cufft_scale_inverse,
        )
        return output
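

# Illustrative usage sketch (not part of the original module), assuming pycuda, scikit-cuda and
# a working Cuda device are available. The helper name and the "data" buffer label are
# hypothetical; allocate_array() is used the same way the class uses it internally and is
# assumed to hand back a pycuda GPUArray (hence .set()/.get()).
def _example_skcufft_roundtrip():
    fft = SKCUFFT((256, 256), np.float32)  # R2C transform of a 2D image
    img = np.random.rand(256, 256).astype(np.float32)
    d_img = fft.processing.allocate_array("data", fft.shape, dtype=np.float32)
    d_img.set(img)
    d_spectrum = fft.fft(d_img)  # forward transform, output shape is fft.shape_out
    d_back = fft.ifft(d_spectrum)  # inverse transform back to real space
    # With the (assumed) default "rescale" normalization, d_back should match img
    # up to floating-point precision.
    return d_back.get()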

class VKCUFFT(_BaseVKFFT):
    """
    Cuda FFT, using VKFFT backend
    """

    implem = "vkfft"
    backend = "cuda"
    ProcessingCls = CudaProcessing
    vkffs_cls = vk_cufft

    def _init_backend(self, backend_options):
        super()._init_backend(backend_options)
        self._vkfft_other_init_kwargs = {"stream": self.processing.stream}

def _has_vkfft(x):
    # should be run from within a Process
    try:
        from nabu.processing.fft_cuda import VKCUFFT, __has_vkfft__

        if not __has_vkfft__:
            return False
        vk = VKCUFFT((16,), "f")
        avail = True
    except (RuntimeError, OSError):
        avail = False
    return avail

def has_vkfft(safe=True):
    """
    Determine whether pyvkfft is available.

    For Cuda GPUs, vkfft relies on nvrtc, which supports a narrow range of Cuda devices.
    Unfortunately, it's not possible to determine whether vkfft is available before creating a Cuda context.
    So we create a process (from scratch, i.e. no fork), do the test within, and exit.

    This function cannot be tested from a notebook/console; a proper entry point has to be created
    (if __name__ == "__main__").
    """
    if not safe:
        return _has_vkfft(None)
    ctx = get_context("spawn")
    with Pool(1, context=ctx) as p:
        v = p.map(_has_vkfft, [1])[0]
    return v
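

# Illustrative sketch (not part of the original module): has_vkfft(safe=True) spawns a fresh
# process so that no Cuda context gets created in the caller, whereas safe=False runs the probe
# in-process and may leave a context behind. Because of the spawn, the safe check must be
# reachable from a real entry point (see the __main__ guard sketched at the end of this file).
# The helper name below is hypothetical.
def _example_probe_vkfft():
    return VKCUFFT if has_vkfft(safe=True) else SKCUFFT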

def _has_skfft(x):
    # should be run from within a Process
    try:
        from nabu.processing.fft_cuda import SKCUFFT

        sk = SKCUFFT((16,), "f")
        avail = True
    except (ImportError, RuntimeError, OSError):
        avail = False
    return avail

def has_skcuda(safe=True):
    """
    Determine whether scikit-cuda/CUFFT is available.

    Currently, scikit-cuda will create a Cuda context for Cublas, which can mess up the current
    execution, so the check is done in a separate (spawned) process.
    """
    if not safe:
        return _has_skfft(None)
    ctx = get_context("spawn")
    with Pool(1, context=ctx) as p:
        v = p.map(_has_skfft, [1])[0]
    return v
[docs] def get_fft_class(backend="skcuda"): backends = { "scikit-cuda": SKCUFFT, "skcuda": SKCUFFT, "cufft": SKCUFFT, "scikit": SKCUFFT, "vkfft": VKCUFFT, "pyvkfft": VKCUFFT, } def check_vkfft(asked_fft_cls): if asked_fft_cls is VKCUFFT: if has_vkfft(safe=True) is False: warnings.warn("Could not get VKFFT backend. Falling-back to scikit-cuda/CUFFT instead.", RuntimeWarning) return SKCUFFT return VKCUFFT return SKCUFFT def get_fft_cls(asked_fft_backend): asked_fft_backend = asked_fft_backend.lower() check_supported(asked_fft_backend, list(backends.keys()), "FFT backend name") asked_fft_cls = backends[asked_fft_backend] fft_cls = check_vkfft(asked_fft_cls) return fft_cls asked_fft_backend_env = os.environ.get("NABU_FFT_BACKEND", "") if asked_fft_backend_env != "": return get_fft_cls(asked_fft_backend_env) return get_fft_cls(backend)

def get_available_fft_implems():
    avail_implems = []
    if has_skcuda(safe=True):
        avail_implems.append("skcuda")
    if has_vkfft(safe=True):
        avail_implems.append("vkfft")
    return avail_implems
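

# Illustrative sketch (not part of the original module): a minimal entry point. Since
# has_vkfft()/has_skcuda() spawn a new process, they must be called from a script guarded by
# `if __name__ == "__main__":` rather than from an interactive console or notebook.
if __name__ == "__main__":
    print("Available Cuda FFT implementations:", get_available_fft_implems())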