Source code for silx.gui.plot.items.scatter

# /*##########################################################################
#
# Copyright (c) 2017-2024 European Synchrotron Radiation Facility
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ###########################################################################*/
"""This module provides the :class:`Scatter` item of the :class:`Plot`.
"""

__authors__ = ["T. Vincent", "P. Knobel"]
__license__ = "MIT"
__date__ = "29/03/2017"


from collections import namedtuple
import logging
import threading
import numpy
from matplotlib.tri import LinearTriInterpolator, Triangulation

from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, CancelledError

from ....utils.proxy import docstring
from ....math.combo import min_max
from ....math.histogram import Histogramnd
from ....utils.weakref import WeakList
from .core import PointsBase, ColormapMixIn, ScatterVisualizationMixIn
from .axis import Axis
from ._pick import PickingResult
from silx._utils import NP_OPTIONAL_COPY


# Module-level logger, named after this module's import path
_logger = logging.getLogger(__name__)


class _GreedyThreadPoolExecutor(ThreadPoolExecutor):
    """:class:`ThreadPoolExecutor` providing a :meth:`submit_greedy` method.

    Futures submitted through :meth:`submit_greedy` are remembered per queue
    identifier so that a later submission to the same queue can cancel them.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Futures submitted so far, grouped by queue identifier
        self.__futures = defaultdict(WeakList)
        # Guards the cancel-then-submit sequence of submit_greedy
        self.__lock = threading.RLock()

    def submit_greedy(self, queue, fn, *args, **kwargs):
        """Submit a task after cancelling the tasks still pending in `queue`.

        Works like :meth:`submit`, except that every not-yet-done future
        previously submitted with the same `queue` identifier is cancelled
        first.

        :param queue: Identifier of the queue. This must be hashable.
        :param callable fn: The callable to call with provided extra arguments
        :return: Future corresponding to this task
        :rtype: concurrent.futures.Future
        """
        with self.__lock:
            # Drop the queue's bookkeeping and cancel what has not completed
            for pending in self.__futures.pop(queue, ()):
                if pending.done():
                    continue
                pending.cancel()

            submitted = super().submit(fn, *args, **kwargs)
            self.__futures[queue].append(submitted)
            return submitted


# Functions to guess grid shape from coordinates


def _get_z_line_length(array):
    """Return length of line if array is a Z-like 2D regular grid.

    :param numpy.ndarray array: The 1D array of coordinates to check
    :return: 0 if no line length could be found,
        else the number of element per line.
    :rtype: int
    """
    sign = numpy.sign(numpy.diff(array))
    if len(sign) == 0 or sign[0] == 0:  # We don't handle that
        return 0
    # Check this way to account for 0 sign (i.e., diff == 0)
    beginnings = numpy.where(sign == -sign[0])[0] + 1
    if len(beginnings) == 0:
        return 0
    length = beginnings[0]
    if numpy.all(numpy.equal(numpy.diff(beginnings), length)):
        return length
    return 0


def _guess_z_grid_shape(x, y):
    """Guess the shape of a Z-like regular grid from (x, y) coordinates.

    The grid might contain more elements than x and y,
    as the last line might be partly filled.

    X is tried first as the fast dimension, then Y.

    :param numpy.ndarray x: X coordinates of the points
    :param numpy.ndarray y: Y coordinates of the points
    :returns: (order, (height, width)) of the regular grid,
        or None if could not guess one.
        'order' is 'row' if X (i.e., column) is the fast dimension, else 'column'.
    :rtype: Union[Tuple[str,Tuple[int,int]],None]
    """
    line_length = _get_z_line_length(x)
    if line_length != 0:
        # X is the fast dimension: lines are rows of `line_length` elements
        nb_rows = int(numpy.ceil(len(x) / line_length))
        return "row", (nb_rows, line_length)

    line_length = _get_z_line_length(y)
    if line_length != 0:
        # Y is the fast dimension: lines are columns of `line_length` elements
        nb_columns = int(numpy.ceil(len(y) / line_length))
        return "column", (line_length, nb_columns)

    return None


def is_monotonic(array):
    """Tell whether an array is monotonic and in which direction.

    The input is flattened before being checked. Increasing is tested
    first, so an array with no variation is reported as increasing.

    :param numpy.ndarray array: 1D array-like container.
    :returns: 1 if array is monotonically increasing,
       -1 if array is monotonically decreasing,
       0 if array is not monotonic
    :rtype: int
    """
    steps = numpy.diff(numpy.ravel(array))
    # Silence warnings raised when comparing invalid (e.g., NaN) values
    with numpy.errstate(invalid="ignore"):
        if (steps >= 0).all():
            return 1
        if (steps <= 0).all():
            return -1
    return 0


def _guess_grid(x, y):
    """Guess a regular grid from the points.

    A Z-like 2D grid is tried first. If none is found, the points are
    considered as a single line, provided X or Y is monotonic.

    Result convention is (x, y)

    :param numpy.ndarray x: X coordinates of the points
    :param numpy.ndarray y: Y coordinates of the points
    :returns: (order, (height, width)),
        order is 'row' or 'column'.
        None if neither a grid nor a line could be guessed.
    :rtype: Union[Tuple[str,Tuple[int,int]],None]
    """
    x, y = numpy.ravel(x), numpy.ravel(y)

    grid = _guess_z_grid_shape(x, y)
    if grid is not None:
        return grid

    # Cannot guess a regular grid: let's assume it's a single line
    x_monotonic = is_monotonic(x)
    y_monotonic = is_monotonic(y)
    if not (x_monotonic or y_monotonic):
        return None  # Cannot guess a line from the points

    x_min, x_max = min_max(x)
    y_min, y_max = min_max(y)

    # Line along X when only x is monotonic, or when both are and X varies
    # at least as much as Y; otherwise line along Y
    along_x = not y_monotonic or x_max - x_min >= y_max - y_min
    shape = (1, len(x)) if along_x else (len(y), 1)

    # 'row' or 'column' doesn't matter for a single line
    return "row", shape


def _quadrilateral_grid_coords(points):
    """Compute an irregular grid of quadrilaterals from a set of points

    The input points are expected to lie on a grid.

    :param numpy.ndarray points:
       3D data set of 2D input coordinates (height, width, 2)
       height and width must be at least 2.
    :return: 3D dataset of 2D coordinates of the grid (height+1, width+1, 2)
    """
    assert points.ndim == 3
    assert points.shape[0] >= 2
    assert points.shape[1] >= 2
    assert points.shape[2] == 2

    dim0, dim1 = points.shape[:2]
    grid_points = numpy.zeros((dim0 + 1, dim1 + 1, 2), dtype=numpy.float64)

    # Compute inner points as mean of 4 neighbours
    neighbour_view = numpy.lib.stride_tricks.as_strided(
        points,
        shape=(dim0 - 1, dim1 - 1, 2, 2, points.shape[2]),
        strides=points.strides[:2] + points.strides[:2] + points.strides[-1:],
        writeable=False,
    )
    inner_points = numpy.mean(neighbour_view, axis=(2, 3))
    grid_points[1:-1, 1:-1] = inner_points

    # Compute 'vertical' sides
    # Alternative: grid_points[1:-1, [0, -1]] = points[:-1, [0, -1]] + points[1:, [0, -1]] - inner_points[:, [0, -1]]
    grid_points[1:-1, [0, -1], 0] = (
        points[:-1, [0, -1], 0] + points[1:, [0, -1], 0] - inner_points[:, [0, -1], 0]
    )
    grid_points[1:-1, [0, -1], 1] = inner_points[:, [0, -1], 1]

    # Compute 'horizontal' sides
    grid_points[[0, -1], 1:-1, 0] = inner_points[[0, -1], :, 0]
    grid_points[[0, -1], 1:-1, 1] = (
        points[[0, -1], :-1, 1] + points[[0, -1], 1:, 1] - inner_points[[0, -1], :, 1]
    )

    # Compute corners
    d0, d1 = [0, 0, -1, -1], [0, -1, -1, 0]
    grid_points[d0, d1] = 2 * points[d0, d1] - inner_points[d0, d1]
    return grid_points


def _quadrilateral_grid_as_triangles(points):
    """Returns the points and indices to make a grid of quadrilaterals

    Each input point gets one quadrilateral described by 4 consecutive
    corners in the returned coordinates, and drawn as 2 triangles sharing
    the 2nd and 3rd corners.

    :param numpy.ndarray points:
        3D array of points (height, width, 2)
    :return: triangle corners (4 * N, 2), triangle indices (2 * N, 3)
        With N = height * width, the number of input points
    """
    nbpoints = numpy.prod(points.shape[:2])

    grid = _quadrilateral_grid_coords(points)

    # Corners of each quadrilateral, interleaved 4 by 4
    coords = numpy.empty((4 * nbpoints, 2), dtype=grid.dtype)
    quad_corners = (grid[:-1, :-1], grid[1:, :-1], grid[:-1, 1:], grid[1:, 1:])
    for offset, corner in enumerate(quad_corners):
        coords[offset::4] = corner.reshape(-1, 2)

    # Index of the first corner of each quadrilateral
    first_corner = numpy.arange(0, 4 * nbpoints, 4)

    indices = numpy.empty((2 * nbpoints, 3), dtype=numpy.uint32)
    for column in range(3):
        # Even rows: triangle (0, 1, 2); odd rows: triangle (1, 2, 3)
        indices[::2, column] = first_corner + column
        indices[1::2, column] = first_corner + column + 1

    return coords, indices


_RegularGridInfo = namedtuple(
    "_RegularGridInfo", ["bounds", "origin", "scale", "shape", "order"]
)


_HistogramInfo = namedtuple(
    "_HistogramInfo", ["mean", "count", "sum", "origin", "scale", "shape"]
)


class Scatter(PointsBase, ColormapMixIn, ScatterVisualizationMixIn):
    """Description of a scatter"""

    _DEFAULT_SELECTABLE = True
    """Default selectable state for scatter plots"""

    _SUPPORTED_SCATTER_VISUALIZATION = (
        ScatterVisualizationMixIn.Visualization.POINTS,
        ScatterVisualizationMixIn.Visualization.SOLID,
        ScatterVisualizationMixIn.Visualization.REGULAR_GRID,
        ScatterVisualizationMixIn.Visualization.IRREGULAR_GRID,
        ScatterVisualizationMixIn.Visualization.BINNED_STATISTIC,
    )
    """Overrides supported Visualizations"""

    def __init__(self):
        PointsBase.__init__(self)
        ColormapMixIn.__init__(self)
        ScatterVisualizationMixIn.__init__(self)
        self._value = ()
        self.__alpha = None
        # Cache Delaunay triangulation future object
        self.__triangulationFuture = None
        # Cache interpolator future object
        self.__interpolatorFuture = None
        self.__executor = None

        # Cache triangles: x, y, indices
        self.__cacheTriangles = None, None, None

        # Cache regular grid and histogram info
        self.__cacheRegularGridInfo = None
        self.__cacheHistogramInfo = None

    def _updateColormappedData(self):
        """Update the colormapped data, to be called when changed

        In binned statistic mode the colormapped data is the selected
        statistic of the histogram (or None without histogram),
        else it is the values of the scatter.
        """
        if self.getVisualization() is self.Visualization.BINNED_STATISTIC:
            histoInfo = self.__getHistogramInfo()
            if histoInfo is None:
                data = None
            else:
                # The statistic function name is also the _HistogramInfo field
                data = getattr(
                    histoInfo,
                    self.getVisualizationParameter(
                        self.VisualizationParameter.BINNED_STATISTIC_FUNCTION
                    ),
                )
        else:
            data = self.getValueData(copy=False)
        self._setColormappedData(data, copy=False)

    @docstring(ScatterVisualizationMixIn)
    def setVisualization(self, mode):
        previous = self.getVisualization()
        if not super().setVisualization(mode):
            return False

        # Compare the mode that is now stored rather than the raw argument:
        # an identity test on `mode` fails if the caller did not pass the
        # enum member itself.
        # NOTE(review): presumably the parent accepts enum values as well as
        # members, like setVisualizationParameter does - confirm.
        current = self.getVisualization()
        binned = self.Visualization.BINNED_STATISTIC
        if (current is binned) != (previous is binned):
            # Switching to/from binned statistic changes the colormapped data
            self._updateColormappedData()
        return True

    @docstring(ScatterVisualizationMixIn)
    def setVisualizationParameter(self, parameter, value):
        parameter = self.VisualizationParameter.from_value(parameter)

        if not super().setVisualizationParameter(parameter, value):
            return False

        if parameter in (
            self.VisualizationParameter.GRID_BOUNDS,
            self.VisualizationParameter.GRID_MAJOR_ORDER,
            self.VisualizationParameter.GRID_SHAPE,
        ):
            self.__cacheRegularGridInfo = None

        if parameter in (
            self.VisualizationParameter.BINNED_STATISTIC_SHAPE,
            self.VisualizationParameter.BINNED_STATISTIC_FUNCTION,
            self.VisualizationParameter.DATA_BOUNDS_HINT,
        ):
            if parameter in (
                self.VisualizationParameter.BINNED_STATISTIC_SHAPE,
                self.VisualizationParameter.DATA_BOUNDS_HINT,
            ):
                self.__cacheHistogramInfo = None  # Clean-up cache
            if self.getVisualization() is self.Visualization.BINNED_STATISTIC:
                self._updateColormappedData()
        return True

    @docstring(ScatterVisualizationMixIn)
    def getCurrentVisualizationParameter(self, parameter):
        value = self.getVisualizationParameter(parameter)
        if (
            parameter is self.VisualizationParameter.DATA_BOUNDS_HINT
            or value is not None
        ):
            return value  # Value has been set, return it

        # Value is not set: return the one computed from the data, if any
        elif parameter is self.VisualizationParameter.GRID_BOUNDS:
            grid = self.__getRegularGridInfo()
            return None if grid is None else grid.bounds

        elif parameter is self.VisualizationParameter.GRID_MAJOR_ORDER:
            grid = self.__getRegularGridInfo()
            return None if grid is None else grid.order

        elif parameter is self.VisualizationParameter.GRID_SHAPE:
            grid = self.__getRegularGridInfo()
            return None if grid is None else grid.shape

        elif parameter is self.VisualizationParameter.BINNED_STATISTIC_SHAPE:
            info = self.__getHistogramInfo()
            return None if info is None else info.shape

        else:
            raise NotImplementedError()

    def __getRegularGridInfo(self):
        """Get grid info

        The result is computed once and cached.
        Grid parameters that are not set are guessed from the data.

        :returns: The grid description or None if no grid could be guessed
        :rtype: Union[_RegularGridInfo,None]
        """
        if self.__cacheRegularGridInfo is None:
            shape = self.getVisualizationParameter(
                self.VisualizationParameter.GRID_SHAPE
            )
            order = self.getVisualizationParameter(
                self.VisualizationParameter.GRID_MAJOR_ORDER
            )
            if shape is None or order is None:
                guess = _guess_grid(
                    self.getXData(copy=False), self.getYData(copy=False)
                )
                if guess is None:
                    _logger.warning(
                        "Cannot guess a grid: Cannot display as regular grid image"
                    )
                    return None
                # Only use the guess for the parameters that are not set
                if shape is None:
                    shape = guess[1]
                if order is None:
                    order = guess[0]

            nbpoints = len(self.getXData(copy=False))
            if nbpoints > shape[0] * shape[1]:
                # More data points that provided grid shape: enlarge grid
                _logger.warning(
                    "More data points than provided grid shape size: extends grid"
                )
                dim0, dim1 = shape
                if order == "row":  # keep dim1, enlarge dim0
                    dim0 = nbpoints // dim1 + (1 if nbpoints % dim1 else 0)
                else:  # keep dim0, enlarge dim1
                    dim1 = nbpoints // dim0 + (1 if nbpoints % dim0 else 0)
                shape = dim0, dim1

            bounds = self.getVisualizationParameter(
                self.VisualizationParameter.GRID_BOUNDS
            )
            if bounds is None:
                x, y = self.getXData(copy=False), self.getYData(copy=False)
                # The grid begins at the extremum closest to the first point
                min_, max_ = min_max(x)
                xRange = (min_, max_) if (x[0] - min_) < (max_ - x[0]) else (max_, min_)
                min_, max_ = min_max(y)
                yRange = (min_, max_) if (y[0] - min_) < (max_ - y[0]) else (max_, min_)
                bounds = (xRange[0], yRange[0]), (xRange[1], yRange[1])

            begin, end = bounds
            scale = (
                (end[0] - begin[0]) / max(1, shape[1] - 1),
                (end[1] - begin[1]) / max(1, shape[0] - 1),
            )
            # Avoid a null scale: fallback to the other axis' scale or to 1
            if scale[0] == 0 and scale[1] == 0:
                scale = 1.0, 1.0
            elif scale[0] == 0:
                scale = scale[1], scale[1]
            elif scale[1] == 0:
                scale = scale[0], scale[0]

            # Shift by half a pixel from the first bound
            origin = begin[0] - 0.5 * scale[0], begin[1] - 0.5 * scale[1]

            self.__cacheRegularGridInfo = _RegularGridInfo(
                bounds=bounds, origin=origin, scale=scale, shape=shape, order=order
            )

        return self.__cacheRegularGridInfo

    def __getHistogramInfo(self):
        """Get histogram info

        The result is computed once and cached.

        :returns: The histogram description or None if there is no data
        :rtype: Union[_HistogramInfo,None]
        """
        if self.__cacheHistogramInfo is None:
            shape = self.getVisualizationParameter(
                self.VisualizationParameter.BINNED_STATISTIC_SHAPE
            )
            if shape is None:
                shape = 100, 100  # TODO compute auto shape

            x, y, values = self.getData(copy=False)[:3]
            if len(x) == 0:  # No histogram
                return None

            # Convert non floating point data to float64
            if not numpy.issubdtype(x.dtype, numpy.floating):
                x = x.astype(numpy.float64)
            if not numpy.issubdtype(y.dtype, numpy.floating):
                y = y.astype(numpy.float64)
            if not numpy.issubdtype(values.dtype, numpy.floating):
                values = values.astype(numpy.float64)

            # Ranges are given as (y range, x range)
            ranges = (tuple(min_max(y, finite=True)), tuple(min_max(x, finite=True)))
            rangesHint = self.getVisualizationParameter(
                self.VisualizationParameter.DATA_BOUNDS_HINT
            )
            if rangesHint is not None:
                # Extend data ranges so that they include the hint
                ranges = tuple(
                    (min(dataMin, hintMin), max(dataMax, hintMax))
                    for (dataMin, dataMax), (hintMin, hintMax) in zip(
                        ranges, rangesHint
                    )
                )

            points = numpy.transpose(numpy.array((y, x)))
            counts, sums, bin_edges = Histogramnd(
                points, histo_range=ranges, n_bins=shape, weights=values
            )
            yEdges, xEdges = bin_edges
            origin = xEdges[0], yEdges[0]
            scale = (
                (xEdges[-1] - xEdges[0]) / (len(xEdges) - 1),
                (yEdges[-1] - yEdges[0]) / (len(yEdges) - 1),
            )

            # Silence division warnings for bins where counts is 0
            with numpy.errstate(divide="ignore", invalid="ignore"):
                histo = sums / counts

            self.__cacheHistogramInfo = _HistogramInfo(
                mean=histo,
                count=counts,
                sum=sums,
                origin=origin,
                scale=scale,
                shape=shape,
            )

        return self.__cacheHistogramInfo

    def __applyColormapToData(self):
        """Compute colors by applying colormap to values.

        If per-point alpha is set, the last channel of the colors is
        multiplied by it.

        :returns: Array of RGBA colors
        """
        cmap = self.getColormap()
        rgbacolors = cmap.applyToData(self)

        if self.__alpha is not None:
            rgbacolors[:, -1] = (rgbacolors[:, -1] * self.__alpha).astype(numpy.uint8)
        return rgbacolors

    def _addBackendRenderer(self, backend):
        """Update backend renderer

        :param backend: The plot backend to add the renderer to
        :returns: What the backend returned for the added primitive,
            or None if nothing can be displayed
        """
        # Filter-out values <= 0
        xFiltered, yFiltered, valueFiltered, xerror, yerror = self.getData(
            copy=False, displayed=True
        )

        # Remove not finite numbers (this includes filtered out x, y <= 0)
        mask = numpy.logical_and(numpy.isfinite(xFiltered), numpy.isfinite(yFiltered))
        xFiltered = xFiltered[mask]
        yFiltered = yFiltered[mask]

        if len(xFiltered) == 0:
            return None  # No data to display, do not add renderer to backend

        visualization = self.getVisualization()

        if visualization is self.Visualization.BINNED_STATISTIC:
            plot = self.getPlot()
            if (
                plot is None
                or plot.getXAxis().getScale() != Axis.LINEAR
                or plot.getYAxis().getScale() != Axis.LINEAR
            ):
                # Those visualizations are not available with log scaled axes
                return None

            histoInfo = self.__getHistogramInfo()
            if histoInfo is None:
                return None
            data = getattr(
                histoInfo,
                self.getVisualizationParameter(
                    self.VisualizationParameter.BINNED_STATISTIC_FUNCTION
                ),
            )

            return backend.addImage(
                data=data,
                origin=histoInfo.origin,
                scale=histoInfo.scale,
                colormap=self.getColormap(),
                alpha=self.getAlpha(),
            )

        elif visualization is self.Visualization.POINTS:
            rgbacolors = self.__applyColormapToData()
            return backend.addCurve(
                xFiltered,
                yFiltered,
                color=rgbacolors[mask],
                gapcolor=None,
                symbol=self.getSymbol(),
                linewidth=0,
                linestyle="",
                yaxis="left",
                xerror=xerror,
                yerror=yerror,
                fill=False,
                alpha=self.getAlpha(),
                symbolsize=self.getSymbolSize(),
                baseline=None,
            )

        else:
            plot = self.getPlot()
            if (
                plot is None
                or plot.getXAxis().getScale() != Axis.LINEAR
                or plot.getYAxis().getScale() != Axis.LINEAR
            ):
                # Those visualizations are not available with log scaled axes
                return None

            if visualization is self.Visualization.SOLID:
                try:
                    triangulation = self._getTriangulationFuture().result()
                except (RuntimeError, ValueError):
                    _logger.warning(
                        "Cannot get a triangulation: Cannot display as solid surface"
                    )
                    return None
                else:
                    rgbacolors = self.__applyColormapToData()
                    triangles = triangulation.triangles.astype(numpy.int32)
                    return backend.addTriangles(
                        xFiltered,
                        yFiltered,
                        triangles,
                        color=rgbacolors[mask],
                        alpha=self.getAlpha(),
                    )

            elif visualization is self.Visualization.REGULAR_GRID:
                gridInfo = self.__getRegularGridInfo()
                if gridInfo is None:
                    return None

                dim0, dim1 = gridInfo.shape
                if gridInfo.order == "column":  # transposition needed
                    dim0, dim1 = dim1, dim0

                values = self.getValueData(copy=False)
                if self.__alpha is None and len(values) == dim0 * dim1:
                    image = values.reshape(dim0, dim1)
                else:
                    # The points do not fill the whole image
                    if self.__alpha is None and numpy.issubdtype(
                        values.dtype, numpy.floating
                    ):
                        image = numpy.empty(dim0 * dim1, dtype=values.dtype)
                        image[: len(values)] = values
                        image[len(values) :] = float("nan")  # Transparent pixels
                        image.shape = dim0, dim1
                    else:  # Per value alpha or no NaN, so convert to RGBA
                        rgbacolors = self.__applyColormapToData()
                        image = numpy.empty((dim0 * dim1, 4), dtype=numpy.uint8)
                        image[: len(rgbacolors)] = rgbacolors
                        image[len(rgbacolors) :] = (0, 0, 0, 0)  # Transparent pixels
                        image.shape = dim0, dim1, 4

                if gridInfo.order == "column":
                    if image.ndim == 2:
                        image = numpy.transpose(image)
                    else:
                        image = numpy.transpose(image, axes=(1, 0, 2))

                if image.ndim == 2:
                    colormap = self.getColormap()
                    if colormap.isAutoscale():
                        # Avoid backend to compute autoscale: use item cache
                        colormap = colormap.copy()
                        colormap.setVRange(*colormap.getColormapRange(self))
                else:
                    colormap = None  # RGBA image: no colormap

                return backend.addImage(
                    data=image,
                    origin=gridInfo.origin,
                    scale=gridInfo.scale,
                    colormap=colormap,
                    alpha=self.getAlpha(),
                )

            elif visualization is self.Visualization.IRREGULAR_GRID:
                gridInfo = self.__getRegularGridInfo()
                if gridInfo is None:
                    return None

                shape = gridInfo.shape
                if shape is None:  # No shape, no display
                    return None

                rgbacolors = self.__applyColormapToData()

                nbpoints = len(xFiltered)
                if nbpoints == 1:
                    # single point, render as a square points
                    return backend.addCurve(
                        xFiltered,
                        yFiltered,
                        color=rgbacolors[mask],
                        gapcolor=None,
                        symbol="s",
                        linewidth=0,
                        linestyle="",
                        yaxis="left",
                        xerror=None,
                        yerror=None,
                        fill=False,
                        alpha=self.getAlpha(),
                        symbolsize=7,
                        baseline=None,
                    )

                # Make shape include all points
                gridOrder = gridInfo.order
                if nbpoints != numpy.prod(shape):
                    if gridOrder == "row":
                        shape = int(numpy.ceil(nbpoints / shape[1])), shape[1]
                    else:  # column-major order
                        shape = shape[0], int(numpy.ceil(nbpoints / shape[0]))

                if shape[0] < 2 or shape[1] < 2:
                    # Single line, at least 2 points
                    points = numpy.zeros((2, nbpoints, 3), dtype=numpy.float64)
                    # Use row/column major depending on shape, not on info value
                    gridOrder = "row" if shape[0] == 1 else "column"

                    if gridOrder == "row":
                        points[0, :, 0] = xFiltered
                        points[0, :, 1] = yFiltered
                    else:  # column-major order
                        points[0, :, 0] = yFiltered
                        points[0, :, 1] = xFiltered

                    # Add a second line that will be clipped in the end
                    points[1, :-1] = points[0, :-1] + numpy.cross(
                        points[0, 1:] - points[0, :-1], (0.0, 0.0, 1.0)
                    )
                    points[1, -1] = points[0, -1] + numpy.cross(
                        points[0, -1] - points[0, -2], (0.0, 0.0, 1.0)
                    )

                    points = points[:, :, :2]

                elif gridOrder == "row":  # row-major order
                    if nbpoints != numpy.prod(shape):
                        points = numpy.empty(
                            (numpy.prod(shape), 2), dtype=numpy.float64
                        )
                        points[:nbpoints, 0] = xFiltered
                        points[:nbpoints, 1] = yFiltered
                        # Index of last element of last fully filled row
                        index = (nbpoints // shape[1]) * shape[1]
                        points[nbpoints:, 0] = xFiltered[
                            index - (numpy.prod(shape) - nbpoints) : index
                        ]
                        points[nbpoints:, 1] = yFiltered[-1]
                    else:
                        points = numpy.transpose((xFiltered, yFiltered))
                    points.shape = shape[0], shape[1], 2

                else:  # column-major order
                    if nbpoints != numpy.prod(shape):
                        points = numpy.empty(
                            (numpy.prod(shape), 2), dtype=numpy.float64
                        )
                        points[:nbpoints, 0] = yFiltered
                        points[:nbpoints, 1] = xFiltered
                        # Index of last element of last fully filled column
                        index = (nbpoints // shape[0]) * shape[0]
                        points[nbpoints:, 0] = yFiltered[
                            index - (numpy.prod(shape) - nbpoints) : index
                        ]
                        points[nbpoints:, 1] = xFiltered[-1]
                    else:
                        points = numpy.transpose((yFiltered, xFiltered))
                    points.shape = shape[1], shape[0], 2

                coords, indices = _quadrilateral_grid_as_triangles(points)

                # Remove unused extra triangles
                coords = coords[: 4 * nbpoints]
                indices = indices[: 2 * nbpoints]

                if gridOrder == "row":
                    x, y = coords[:, 0], coords[:, 1]
                else:  # column-major order
                    y, x = coords[:, 0], coords[:, 1]

                rgbacolors = rgbacolors[mask]  # Filter-out not finite points
                # Same color for the 4 corners of each quadrilateral
                gridcolors = numpy.empty(
                    (4 * nbpoints, rgbacolors.shape[-1]), dtype=rgbacolors.dtype
                )
                for first in range(4):
                    gridcolors[first::4] = rgbacolors[:nbpoints]

                return backend.addTriangles(
                    x, y, indices, color=gridcolors, alpha=self.getAlpha()
                )

            else:
                _logger.error("Unhandled visualization %s", visualization)
                return None

    @docstring(PointsBase)
    def pick(self, x, y):
        result = super().pick(x, y)
        if result is None:
            return None

        visualization = self.getVisualization()

        if visualization is self.Visualization.IRREGULAR_GRID:
            # Specific handling of picking for the irregular grid mode:
            # each point is rendered with 4 corners
            index = result.getIndices(copy=False)[0] // 4
            result = PickingResult(self, (index,))

        elif visualization is self.Visualization.REGULAR_GRID:
            # Specific handling of picking for the regular grid mode
            picked = result.getIndices(copy=False)
            # Same guard as in binned statistic mode below
            if picked is None or len(picked) == 0 or len(picked[0]) == 0:
                return None
            row, column = picked[0][0], picked[1][0]

            gridInfo = self.__getRegularGridInfo()
            if gridInfo is None:
                return None

            if gridInfo.order == "row":
                index = row * gridInfo.shape[1] + column
            else:
                index = row + column * gridInfo.shape[0]
            # OK as long as not log scale
            if index >= len(self.getXData(copy=False)):
                return None  # Image can be larger than scatter

            result = PickingResult(self, (index,))

        elif visualization is self.Visualization.BINNED_STATISTIC:
            picked = result.getIndices(copy=False)
            if picked is None or len(picked) == 0 or len(picked[0]) == 0:
                return None
            row, col = picked[0][0], picked[1][0]
            histoInfo = self.__getHistogramInfo()
            if histoInfo is None:
                return None
            sx, sy = histoInfo.scale
            ox, oy = histoInfo.origin
            xdata = self.getXData(copy=False)
            ydata = self.getYData(copy=False)
            # Indices of all the data points lying inside the picked bin
            indices = numpy.nonzero(
                numpy.logical_and(
                    numpy.logical_and(
                        xdata >= ox + sx * col, xdata < ox + sx * (col + 1)
                    ),
                    numpy.logical_and(
                        ydata >= oy + sy * row, ydata < oy + sy * (row + 1)
                    ),
                )
            )[0]
            result = None if len(indices) == 0 else PickingResult(self, indices)

        return result

    def __getExecutor(self):
        """Returns async greedy executor

        The executor is created on first use.

        :rtype: _GreedyThreadPoolExecutor
        """
        if self.__executor is None:
            self.__executor = _GreedyThreadPoolExecutor(max_workers=2)
        return self.__executor

    def _getTriangulationFuture(self):
        """Returns a :class:`Future` which result is the Triangulation object.

        A new computation is submitted if there is no cached future
        or if it was cancelled.

        :rtype: concurrent.futures.Future
        """
        if self.__triangulationFuture is None or self.__triangulationFuture.cancelled():
            # Need to init a new delaunay
            x, y = self.getData(copy=False)[:2]
            # Remove not finite points
            mask = numpy.logical_and(numpy.isfinite(x), numpy.isfinite(y))

            self.__triangulationFuture = self.__getExecutor().submit_greedy(
                "Triangulation", Triangulation, x[mask], y[mask]
            )
        return self.__triangulationFuture

    @staticmethod
    def __initInterpolator(triangulationFuture, values):
        """Returns an interpolator for the given data points

        :param concurrent.futures.Future triangulationFuture:
            Future object which result is a Triangulation object
        :param numpy.ndarray values: The data value of valid points.
        :returns: The interpolator, or None if the triangulation
            failed or was cancelled
        :rtype: Union[callable,None]
        """
        # Wait for Triangulation to complete
        try:
            triangulation = triangulationFuture.result()
        except (RuntimeError, ValueError):
            return None  # triangulation failed
        except CancelledError:
            return None

        return LinearTriInterpolator(triangulation, values)

    def _getInterpolatorFuture(self):
        """Returns a :class:`Future` which result is the interpolator.

        The interpolator is a callable taking an array Nx2 of points
        as a single argument.
        The :class:`Future` result is None in case the interpolator cannot
        be initialized.

        :rtype: concurrent.futures.Future
        """
        if self.__interpolatorFuture is None or self.__interpolatorFuture.cancelled():
            # Need to init a new interpolator
            x, y, values = self.getData(copy=False)[:3]
            # Remove values of not finite points
            mask = numpy.logical_and(numpy.isfinite(x), numpy.isfinite(y))
            values = values[mask]

            self.__interpolatorFuture = self.__getExecutor().submit_greedy(
                "interpolator",
                self.__initInterpolator,
                self._getTriangulationFuture(),
                values,
            )
        return self.__interpolatorFuture

    def _filterData(self, xPositive: bool, yPositive: bool):
        """Filter out errors<0 and values with x or y <= 0 on log axes

        Values of the clipped points are replaced by NaN in a float64 copy.

        :param bool xPositive: True to filter arrays according to X coords.
        :param bool yPositive: True to filter arrays according to Y coords.
        :return: The filtered arrays or unchanged object if not filtering needed
        :rtype: (x, y, value, xerror, yerror)
        """
        # overloaded from PointsBase to filter also value.
        value = self.getValueData(copy=False)

        if xPositive or yPositive:
            clipped = self._getClippingBoolArray(xPositive, yPositive)

            if numpy.any(clipped):
                # copy to keep original array and convert to float
                value = numpy.array(value, copy=True, dtype=numpy.float64)
                value[clipped] = numpy.nan

        x, y, xerror, yerror = PointsBase._filterData(self, xPositive, yPositive)

        return x, y, value, xerror, yerror

    def getValueData(self, copy=True):
        """Returns the value assigned to the scatter data points.

        :param copy: True (Default) to get a copy,
                     False to use internal representation (do not modify!)
        :rtype: numpy.ndarray
        """
        return numpy.array(self._value, copy=copy or NP_OPTIONAL_COPY)

    def getAlphaData(self, copy=True):
        """Returns the alpha (transparency) assigned to the scatter data points.

        :param copy: True (Default) to get a copy,
                     False to use internal representation (do not modify!)
        :rtype: numpy.ndarray
        """
        return numpy.array(self.__alpha, copy=copy or NP_OPTIONAL_COPY)

    def getData(self, copy=True, displayed=False):
        """Returns the x, y coordinates and the value of the data points

        :param copy: True (Default) to get a copy,
                     False to use internal representation (do not modify!)
        :param bool displayed: True to only get curve points that are displayed
                               in the plot. Default: False.
                               Note: If plot has log scale, negative points
                               are not displayed.
        :returns: (x, y, value, xerror, yerror)
        :rtype: 5-tuple of numpy.ndarray
        """
        if displayed:
            data = self._getCachedData()
            if data is not None:
                assert len(data) == 5
                return data

        return (
            self.getXData(copy),
            self.getYData(copy),
            self.getValueData(copy),
            self.getXErrorData(copy),
            self.getYErrorData(copy),
        )

    # reimplemented from PointsBase to handle `value`
    def setData(self, x, y, value, xerror=None, yerror=None, alpha=None, copy=True):
        """Set the data of the scatter.

        :param numpy.ndarray x: The data corresponding to the x coordinates.
        :param numpy.ndarray y: The data corresponding to the y coordinates.
        :param numpy.ndarray value: The data corresponding to the value of
                                    the data points.
                                    It must be 1D with the same length as x.
                                    Complex values are converted to their
                                    absolute value.
        :param xerror: Values with the uncertainties on the x values
        :type xerror: A float, or a numpy.ndarray of float32.
                      If it is an array, it can either be a 1D array of
                      same length as the data or a 2D array with 2 rows
                      of same length as the data: row 0 for lower errors,
                      row 1 for upper errors.
        :param yerror: Values with the uncertainties on the y values
        :type yerror: A float, or a numpy.ndarray of float32. See xerror.
        :param alpha: Values with the transparency (between 0 and 1),
                      or None (default) for no per-point transparency.
                      Values outside of [0, 1] are clipped.
        :type alpha: A 1D array-like of float of same length as x
        :param bool copy: True make a copy of the data (default),
                          False to use provided arrays.
        """
        value = numpy.array(value, copy=copy or NP_OPTIONAL_COPY)
        assert value.ndim == 1
        assert len(x) == len(value)

        # Convert complex data
        if numpy.iscomplexobj(value):
            _logger.warning("Converting value data to absolute value to plot it.")
            value = numpy.absolute(value)

        # Reset triangulation and interpolator
        if self.__triangulationFuture is not None:
            self.__triangulationFuture.cancel()
            self.__triangulationFuture = None
        if self.__interpolatorFuture is not None:
            self.__interpolatorFuture.cancel()
            self.__interpolatorFuture = None

        # Data changed, this needs update
        self.__cacheRegularGridInfo = None
        self.__cacheHistogramInfo = None

        self._value = value

        if alpha is not None:
            # Make sure alpha is an array of float in [0, 1]
            alpha = numpy.array(alpha, copy=copy or NP_OPTIONAL_COPY)
            assert alpha.ndim == 1
            assert len(x) == len(alpha)
            if alpha.dtype.kind != "f":
                alpha = alpha.astype(numpy.float32)
            if numpy.any(numpy.logical_or(alpha < 0.0, alpha > 1.0)):
                alpha = numpy.clip(alpha, 0.0, 1.0)
        self.__alpha = alpha

        # set x, y, xerror, yerror

        # call self._updated + plot._invalidateDataRange()
        PointsBase.setData(self, x, y, xerror, yerror, copy)

        self._updateColormappedData()