#!/usr/bin/env python
#
# Project: Sift implementation in Python + OpenCL
# https://github.com/silx-kit/silx
#
# Copyright (C) 2013-2024 European Synchrotron Radiation Facility, Grenoble, France
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
"""
Contains classes for image alignment on a reference images.
"""
__authors__ = ["Jérôme Kieffer", "Pierre Paleo"]
__contact__ = "jerome.kieffer@esrf.eu"
__license__ = "MIT"
__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
__date__ = "06/05/2020"
__status__ = "production"
import os
import gc
from threading import Semaphore
import numpy
from ..common import ocl, pyopencl, kernel_workgroup_size
from ..processing import OpenclProcessing
from ..utils import calc_size, get_opencl_code
from .utils import matching_correction
import logging
logger = logging.getLogger(__name__)
from .match import MatchPlan
from .plan import SiftPlan
try:
import feature
except ImportError:
feature = None
def arrow_start(kplist):
# x_ref = kplist.x
# y_ref = kplist.y
angle_ref = kplist.angle
scale_ref = kplist.scale
x_ref2 = kplist.x + scale_ref * numpy.cos(angle_ref)
y_ref2 = kplist.y + scale_ref * numpy.sin(angle_ref)
return x_ref2, y_ref2
def transform_pts(matrix, offset, x, y):
nx = -offset[1] + y * matrix[1, 0] + x * matrix[1, 1]
ny = -offset[0] + x * matrix[0, 1] + y * matrix[0, 0]
return nx, ny
[docs]
class LinearAlign(OpenclProcessing):
"""Align images on a reference image based on an afine transformation (bi-linear + offset)"""
kernel_files = {"transform": 128}
def __init__(
self,
image,
mask=None,
extra=0,
init_sigma=None,
ctx=None,
devicetype="all",
platformid=None,
deviceid=None,
block_size=None,
profile=False,
):
"""
Constructor of the class
:param image: reference image on which other image should be aligned
:param mask: masked out region of the image
:param devicetype: Kind of preferred devce
:param profile:collect profiling information ?
:param device: 2-tuple of integer. see clinfo
:param max_workgroup_size: limit the workgroup size
:param ROI: Region of interest: to be implemented
:param extra: extra space around the image, can be an integer, or a 2 tuple in YX convention: TODO!
:param init_sigma: blurring width, you should have good reasons to modify the 1.6 default value...
"""
OpenclProcessing.__init__(
self,
ctx=ctx,
devicetype=devicetype,
platformid=platformid,
deviceid=deviceid,
block_size=block_size,
profile=profile,
)
self.ref = numpy.ascontiguousarray(image)
self.cl_mem = {}
self.shape = image.shape
if len(self.shape) == 3:
self.RGB = True
self.shape = self.shape[:2]
elif len(self.shape) == 2:
self.RGB = False
else:
raise RuntimeError(
"Unable to process image of shape %s"
% (
tuple(
self.shape,
)
)
)
if "__len__" not in dir(extra):
self.extra = (int(extra), int(extra))
else:
self.extra = extra[:2]
self.outshape = tuple(i + 2 * j for i, j in zip(self.shape, self.extra))
self.mask = mask
self.sift = SiftPlan(
template=image,
ctx=self.ctx,
profile=self.profile,
block_size=self.block_size,
init_sigma=init_sigma,
)
self.ref_kp = self.sift.keypoints(image)
# TODO: move to SIFT
if self.mask is not None:
kpx = numpy.round(self.ref_kp.x).astype(numpy.int32)
kpy = numpy.round(self.ref_kp.y).astype(numpy.int32)
masked = self.mask[(kpy, kpx)].astype(bool)
logger.warning(
"Reducing keypoint list from %i to %i because of the ROI"
% (self.ref_kp.size, masked.sum())
)
self.ref_kp = self.ref_kp[masked]
self.match = MatchPlan(
ctx=self.ctx, profile=self.profile, block_size=self.block_size
)
# Allocate reference keypoints on the GPU within match context:
self.cl_mem["ref_kp_gpu"] = pyopencl.array.to_device(
self.match.queue, self.ref_kp
)
# TODO optimize match so that the keypoint2 can be optional
self.fill_value = 0
self.wg = {}
self.compile_kernels()
self._allocate_buffers()
self.sem = Semaphore()
self.relative_transfo = None
def __del__(self):
"""
Destructor: release all buffers
"""
self._free_kernels()
self._free_buffers()
self.queue = None
self.ctx = None
gc.collect()
def _allocate_buffers(self):
"""
All buffers are allocated here
"""
if self.RGB:
self.cl_mem["input"] = pyopencl.array.empty(
self.queue, shape=self.shape + (3,), dtype=numpy.uint8
)
self.cl_mem["output"] = pyopencl.array.empty(
self.queue, shape=self.outshape + (3,), dtype=numpy.uint8
)
else:
self.cl_mem["input"] = pyopencl.array.empty(
self.queue, shape=self.shape, dtype=numpy.float32
)
self.cl_mem["output"] = pyopencl.array.empty(
self.queue, shape=self.outshape, dtype=numpy.float32
)
self.cl_mem["matrix"] = pyopencl.array.empty(
self.queue, shape=(2, 2), dtype=numpy.float32
)
self.cl_mem["offset"] = pyopencl.array.empty(
self.queue, shape=(1, 2), dtype=numpy.float32
)
def _free_buffers(self):
"""
free all memory allocated on the device
"""
for buffer_name in list(self.cl_mem.keys()):
if self.cl_mem[buffer_name] is not None:
try:
buffer = self.cl_mem.pop(buffer_name)
del buffer
except pyopencl.LogicError:
logger.error("Error while freeing buffer %s" % buffer_name)
[docs]
def compile_kernels(self):
"""
Call the OpenCL compiler
"""
kernel_src = [os.path.join("sift", kernel) for kernel in self.kernel_files]
OpenclProcessing.compile_kernels(self, kernel_src)
bs = min(
self.kernel_files["transform"],
min(self.check_workgroup_size(kn) for kn in self.kernels.get_kernels()),
)
if self.block_size is not None:
bs = min(self.block_size, bs)
device = self.ctx.devices[0]
if bs > 32:
wgm = (32, bs // 32)
if bs >= 256:
wgr = (4, bs // 32, 8)
else:
wgr = (4, 8, bs // 32)
elif bs > 4:
wgm = (bs // 4, 4)
wgr = (4, bs // 4, 1)
else:
wgm = (bs, 1)
wgr = (bs, 1, 1)
size_per_dim = device.max_work_item_sizes
self.wg["transform_RGB"] = tuple(min(i, j) for i, j in zip(wgr, size_per_dim))
self.wg["transform"] = tuple(min(i, j) for i, j in zip(wgm, size_per_dim))
def _free_kernels(self):
"""
free all kernels
"""
self.program = None
[docs]
def align(
self,
img,
shift_only=False,
return_all=False,
double_check=False,
relative=False,
orsa=False,
):
"""
Align image on reference image
:param img: numpy array containing the image to align to reference
:param return_all: return in addition ot the image, keypoints, matching keypoints, and transformations as a dict
:param relative: update reference keypoints with those from current image to perform relative alignment
:return: aligned image, or all informations, or None if no matching keypoints
"""
logger.debug("ref_keypoints: %s" % self.ref_kp.size)
if self.RGB:
data = numpy.ascontiguousarray(img, numpy.uint8)
else:
data = numpy.ascontiguousarray(img, numpy.float32)
with self.sem:
cpy = pyopencl.enqueue_copy(self.queue, self.cl_mem["input"].data, data)
if self.profile:
self.events.append(("Copy H->D", cpy))
cpy.wait()
kp = self.sift.keypoints(self.cl_mem["input"])
# print("ref %s img %s" % (self.cl_mem["ref_kp_gpu"].shape, kp.shape))
logger.debug("mod image keypoints: %s" % kp.size)
raw_matching = self.match.match(
self.cl_mem["ref_kp_gpu"], kp, raw_results=True
)
# print(raw_matching.max(axis=0))
matching = numpy.recarray(
shape=raw_matching.shape, dtype=MatchPlan.dtype_kp
)
len_match = raw_matching.shape[0]
if len_match == 0:
logger.warning("No matching keypoints")
return None
matching[:, 0] = self.ref_kp[raw_matching[:, 0]]
matching[:, 1] = kp[raw_matching[:, 1]]
if orsa:
if feature:
matching = feature.sift_orsa(matching, self.shape, 1)
else:
logger.warning("feature is not available. No ORSA filtering")
if (len_match < 3 * 6) or (shift_only): # 3 points per DOF
if shift_only:
logger.debug("Shift Only mode: Common keypoints: %s" % len_match)
else:
logger.warning("Shift Only mode: Common keypoints: %s" % len_match)
dx = matching[:, 1].x - matching[:, 0].x
dy = matching[:, 1].y - matching[:, 0].y
matrix = numpy.identity(2, dtype=numpy.float32)
offset = numpy.array(
[+numpy.median(dy), +numpy.median(dx)], numpy.float32
)
else:
logger.debug("Common keypoints: %s" % len_match)
transform_matrix = matching_correction(matching)
offset = numpy.array(
[transform_matrix[5], transform_matrix[2]], dtype=numpy.float32
)
matrix = numpy.empty((2, 2), dtype=numpy.float32)
matrix[0, 0], matrix[0, 1] = transform_matrix[4, 0], transform_matrix[3, 0]
matrix[1, 0], matrix[1, 1] = transform_matrix[1, 0], transform_matrix[0, 0]
if double_check and (
len_match >= 3 * 6
): # and abs(matrix - numpy.identity(2)).max() > 0.1:
logger.warning("Validating keypoints, %s,%s" % (matrix, offset))
dx = matching[:, 1].x - matching[:, 0].x
dy = matching[:, 1].y - matching[:, 0].y
dangle = matching[:, 1].angle - matching[:, 0].angle
dscale = numpy.log(matching[:, 1].scale / matching[:, 0].scale)
distance = numpy.sqrt(dx * dx + dy * dy)
outlayer = numpy.zeros(distance.shape, numpy.int8)
outlayer += abs((distance - distance.mean()) / distance.std()) > 4
outlayer += abs((dangle - dangle.mean()) / dangle.std()) > 4
outlayer += abs((dscale - dscale.mean()) / dscale.std()) > 4
outlayersum = outlayer.sum()
if outlayersum > 0 and not numpy.isinf(outlayersum):
matching2 = matching[outlayer == 0]
transform_matrix = matching_correction(matching2)
offset = numpy.array(
[transform_matrix[5], transform_matrix[2]], dtype=numpy.float32
)
matrix = numpy.empty((2, 2), dtype=numpy.float32)
matrix[0, 0], matrix[0, 1] = (
transform_matrix[4, 0],
transform_matrix[3, 0],
)
matrix[1, 0], matrix[1, 1] = (
transform_matrix[1, 0],
transform_matrix[0, 0],
)
if relative: # update stable part to perform a relative alignment
self.ref_kp = kp
if self.mask is not None:
kpx = numpy.round(self.ref_kp.x).astype(numpy.int32)
kpy = numpy.round(self.ref_kp.y).astype(numpy.int32)
masked = self.mask[(kpy, kpx)].astype(bool)
logger.warning(
"Reducing keypoint list from %i to %i because of the ROI"
% (self.ref_kp.size, masked.sum())
)
self.ref_kp = self.ref_kp[masked]
self.cl_mem["ref_kp_gpu"] = pyopencl.array.to_device(
self.match.queue, self.ref_kp
)
transfo = numpy.zeros((3, 3), dtype=numpy.float64)
transfo[:2, :2] = matrix
transfo[0, 2] = offset[0]
transfo[1, 2] = offset[1]
transfo[2, 2] = 1
if self.relative_transfo is None:
self.relative_transfo = transfo
else:
self.relative_transfo = numpy.dot(transfo, self.relative_transfo)
matrix = numpy.ascontiguousarray(
self.relative_transfo[:2, :2], dtype=numpy.float32
)
offset = numpy.ascontiguousarray(
self.relative_transfo[:2, 2], dtype=numpy.float32
)
cpy1 = pyopencl.enqueue_copy(self.queue, self.cl_mem["matrix"].data, matrix)
cpy2 = pyopencl.enqueue_copy(self.queue, self.cl_mem["offset"].data, offset)
if self.profile:
self.events += [("Copy matrix", cpy1), ("Copy offset", cpy2)]
if self.RGB:
shape = (4, self.shape[1], self.shape[0])
kname = "transform_RGB"
else:
shape = self.shape[1], self.shape[0]
kname = "transform"
transform = self.kernels.get_kernel(kname)
ev = transform(
self.queue,
calc_size(shape, self.wg[kname]),
self.wg[kname],
self.cl_mem["input"].data,
self.cl_mem["output"].data,
self.cl_mem["matrix"].data,
self.cl_mem["offset"].data,
numpy.int32(self.shape[1]),
numpy.int32(self.shape[0]),
numpy.int32(self.outshape[1]),
numpy.int32(self.outshape[0]),
self.sift.cl_mem["min"].get()[0],
numpy.int32(1),
)
if self.profile:
self.events += [(kname, ev)]
result = self.cl_mem["output"].get()
if return_all:
# corr = numpy.dot(matrix, numpy.vstack((matching[:, 1].y, matching[:, 1].x))).T - \
# offset.T - numpy.vstack((matching[:, 0].y, matching[:, 0].x)).T
corr = (
numpy.dot(matrix, numpy.vstack((matching[:, 0].y, matching[:, 0].x))).T
+ offset.T
- numpy.vstack((matching[:, 1].y, matching[:, 1].x)).T
)
rms = numpy.sqrt((corr * corr).sum(axis=-1).mean())
# Todo: calculate the RMS of deplacement and return it:
return {
"result": result,
"keypoint": kp,
"matching": matching,
"offset": offset,
"matrix": matrix,
"rms": rms,
}
return result
__call__ = align
[docs]
def log_profile(self):
"""
If we are in debugging mode, prints out all timing for every single OpenCL call
"""
t = 0.0
# orient = 0.0
# descr = 0.0
if self.profile:
for e in self.events:
if "__len__" in dir(e) and len(e) >= 2:
et = 1e-6 * (e[1].profile.end - e[1].profile.start)
print("%50s:\t%.3fms" % (e[0], et))
t += et