Source code for odl.core.operator.tensor_ops

# Copyright 2014-2020 The ODL contributors
#
# This file is part of ODL.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

"""Operators defined for tensor fields."""


from numbers import Integral
from typing import Optional, Iterable
from dataclasses import dataclass

import numpy as np

from odl.core.util.npy_compat import AVOID_UNNECESSARY_COPY

from odl.core.operator.operator import Operator, AdapterOperator
from odl.core.operator.default_ops import IdentityOperator
from odl.core.operator.pspace_ops import DiagonalOperator
from odl.core.set import ComplexNumbers, RealNumbers
from odl.core.set.space import LinearSpace
from odl.core.space import ProductSpace, tensor_space
from odl.core.space.base_tensors import TensorSpace, Tensor
from odl.core.space.weightings.weighting import ArrayWeighting
from odl.core.util import dtype_repr, indent, signature_string
from odl.core.array_API_support import (ArrayBackend, lookup_array_backend,
                                        abs as odl_abs, maximum, pow, sqrt,
                                        multiply, get_array_and_backend,
                                        can_cast, odl_all_equal)

from odl.core.sparse import is_sparse, get_sparse_matrix_impl, lookup_sparse_format

__all__ = ('PointwiseNorm', 'PointwiseInner', 'PointwiseSum', 'MatrixOperator',
           'SamplingOperator', 'WeightedSumSamplingOperator',
           'FlatteningOperator', 'DeviceChange', 'ArrayBackendChange')

_SUPPORTED_DIFF_METHODS = ('central', 'forward', 'backward')

class _ImplChangeOperator(Operator):
    """An operator that is mathematically the identity, but whose domain and codomain
    differ in what backend or device they use for their arrays.
    This class is not intended for direct use (it is tedious having to explicitly
    define the domain and range as identical spaces with different implementation).
    Instead, use `ArrayBackendChange` and `DeviceChange`, which only require specifying
    the actual change that needs to happen. Both are automatically converted to
    `_ImplChangeOperator` when used in an operator pipeline.
    """

    def __init__(self, domain, range):
        """Create an operator tying two equivalent spaces with different storage together.

        Parameters
        ----------
        domain, range : `TensorSpace`
            Spaces of vectors. They must be identical save for the backend (`impl`)
            or the device.
        """
        assert(domain.shape == range.shape)
        assert(domain.impl == range.impl or domain.device == range.device)
        super().__init__(domain, range=range, linear=True)

    def _call(self, x):
        """Copy data to the intended backend."""
        if self.range.impl != self.domain.impl:
            return x.to_impl(self.range.impl)
        elif self.range.device != self.domain.device:
            return x.to_device(self.range.device)
        else:
            return x

    @property
    def inverse(self):
        """Operator that copies data back to the original backend."""
        return _ImplChangeOperator(domain=self.range, range=self.domain)

    @property
    def adjoint(self):
        """Adjoint is the same as inverse, as backend change is mathematically
        the identity."""
        return self.inverse

    def norm(self, estimate=False, **kwargs):
        """Return the operator norm of this operator. This is 1, as the
        operator is mathematically the identity."""
        return 1

    def __repr__(self):
        """Represent the operator by its domain and the impl of the range."""
        return f"{self.__class__.__name__}(domain={repr(self.domain)}, range_impl={repr(self.range.impl)})"

    def __str__(self):
        return f"{self.__class__.__name__}(domain={str(self.domain)}, range_impl={str(self.range.impl)})"

[docs] @dataclass(repr=True) class ProductSpaceOverindexingException(ValueError): space: LinearSpace subspace_index: int | list[int] def __str__(self): return repr(self)
class DeviceChange(AdapterOperator): """A pseudo-operator that copies arrays from one computational device to another. This is useful as an adapter in a pipeline of operators that need to use different devices for some reason. Note that it is usually more efficient to implement your whole pipeline on a single device, if possible. Further reading: :ref:`adapteroperatorguide` """
[docs] def __init__(self, domain_device: str, range_device: str, subspace_index: int | list[int] =[]): """Create an operator tying two equivalent spaces with different storage together. Parameters ---------- domain_device, range_device : `str` Device specifiers such as `'cpu'` or `'cuda:0'`. Which ones are supported depends on the backend and hardware. subspace_index: int or sequence of ints If the domain is a compound space (i.e. `ProductSpace`), you may wish to only change the device of one of the constituent spaces. This can be done with this index into the product space structure. If a list is provided, this is understood as recursive indexing into a nested product space. """ self._domain_device = domain_device self._range_device = range_device # TODO refactor logic for `subspace_index` to avoid duplication between # `DeviceChange` and `ArrayBackendChange` if isinstance(subspace_index, int): self._subspace_index = [subspace_index] elif (isinstance(subspace_index, Iterable) and all(isinstance(i,int) for i in subspace_index)): self._subspace_index = list(subspace_index) else: raise TypeError( f"`subspace_index` must be `int` or list of ints; got {type(subspace_index)}")
def _subspace_index_exception(self, space): return ProductSpaceOverindexingException( space=space, subspace_index=self._subspace_index) def _infer_op_from_domain(self, domain: LinearSpace) -> Operator: if isinstance(domain, ProductSpace): if self._subspace_index: subchanger = DeviceChange(domain_device=self._domain_device, range_device=self._range_device, subspace_index=self._subspace_index[1:]) try: return DiagonalOperator(*[subchanger._infer_op_from_domain(p) if i==self._subspace_index[0] else IdentityOperator(p) for i,p in enumerate(domain.spaces)]) except ProductSpaceOverindexingException as e: raise self._subspace_index_exception(domain) from e else: return DiagonalOperator(*[self._infer_op_from_domain(p) for p in domain.spaces]) elif not isinstance(domain, TensorSpace): raise TypeError(f"Device change is only defined on `TensorSpace` or `ProductSpace`.") elif domain.device != self._domain_device: raise ValueError(f"Expected {self._domain_device}, got {domain.device=}") elif len(self._subspace_index) > 0: raise self._subspace_index_exception(domain) else: return _ImplChangeOperator(domain=domain, range=domain.to_device(self._range_device)) def _infer_op_from_range(self, range: LinearSpace) -> Operator: index_exception = ProductSpaceOverindexingException( space=range, subspace_index=self._subspace_index) if isinstance(range, ProductSpace): if self._subspace_index: subchanger = DeviceChange(domain_device=self._domain_device, range_device=self._range_device, subspace_index=self._subspace_index[1:]) try: return DiagonalOperator(*[subchanger._infer_op_from_range(p) if i==self._subspace_index[0] else IdentityOperator(p) for i,p in enumerate(range.spaces)]) except ProductSpaceOverindexingException as e: raise self._subspace_index_exception(range) from e else: return DiagonalOperator(*[self._infer_op_from_range(p) for p in range.spaces]) elif not isinstance(range, TensorSpace): raise TypeError(f"Device change is only defined on `TensorSpace` or `ProductSpace`.") elif range.device != self._range_device: raise ValueError(f"Expected {self._range_device}, got {range.device=}") elif len(self._subspace_index) > 0: raise self._subspace_index_exception(range) else: return _ImplChangeOperator(domain=range.to_device(self._domain_device), range=range) def __repr__(self): """Represent the operator by its domain and the device of the range.""" return f"{self.__class__.__name__}(domain_device={repr(self._domain_device)}, range_device={repr(self._range_device)})" def __str__(self): return f"{self.__class__.__name__}(domain_device={str(self._domain_device)}, range_device={str(self._range_device)})" class ArrayBackendChange(AdapterOperator): """A pseudo-operator that transfers arrays from one backend to another. Both backends must support the same device (this can mean you first need to use `DeviceChange` to transfer to `'cpu'`, which should be supported by all backends). Further reading: :ref:`adapteroperatorguide` """
[docs] def __init__(self, domain_impl: str, range_impl: str, subspace_index: int | list[int] =[]): """Create an operator tying two equivalent spaces with different storage together. Parameters ---------- domain_impl, range_impl : `str` Backend specifiers such as `'numpy'` or `'pytorch'`. Which ones are supported depends on the installed packages. """ self._domain_impl = domain_impl self._range_impl = range_impl if isinstance(subspace_index, int): self._subspace_index = [subspace_index] elif (isinstance(subspace_index, Iterable) and all(isinstance(i,int) for i in subspace_index)): self._subspace_index = list(subspace_index) else: raise TypeError( f"`subspace_index` must be `int` or list of ints; got {type(subspace_index)}")
def _subspace_index_exception(self, space): return ProductSpaceOverindexingException( space=space, subspace_index=self._subspace_index) def _infer_op_from_domain(self, domain: LinearSpace) -> Operator: if isinstance(domain, ProductSpace): if self._subspace_index: subchanger = ArrayBackendChange(domain_impl=self._domain_impl, range_impl=self._range_impl, subspace_index=self._subspace_index[1:]) try: return DiagonalOperator(*[subchanger._infer_op_from_domain(p) if i==self._subspace_index[0] else IdentityOperator(p) for i,p in enumerate(domain.spaces)]) except ProductSpaceOverindexingException as e: raise self._subspace_index_exception(range) from e else: return DiagonalOperator(*[self._infer_op_from_domain(p) for p in domain.spaces]) elif not isinstance(domain, TensorSpace): raise TypeError(f"Backend change is only defined on `TensorSpace` or `ProductSpace`.") elif domain.impl != self._domain_impl: raise ValueError(f"Expected {self._domain_impl}, got {domain.impl=}") elif len(self._subspace_index) > 0: raise self._subspace_index_exception(domain) else: return _ImplChangeOperator(domain=domain, range=domain.to_impl(self._range_impl)) def _infer_op_from_range(self, range: LinearSpace) -> Operator: if isinstance(range, ProductSpace): if self._subspace_index: subchanger = ArrayBackendChange(domain_impl=self._domain_impl, range_impl=self._range_impl, subspace_index=self._subspace_index[1:]) try: return DiagonalOperator(*[subchanger._infer_op_from_range(p) if i==self._subspace_index[0] else IdentityOperator(p) for i,p in enumerate(range.spaces)]) except ProductSpaceOverindexingException as e: raise self._subspace_index_exception(range) from e else: return DiagonalOperator(*[self._infer_op_from_range(p) for p in range.spaces]) elif not isinstance(range, TensorSpace): raise TypeError(f"Backend change is only defined on `TensorSpace` or `ProductSpace`.") elif range.impl != self._range_impl: raise ValueError(f"Expected {self._range_impl}, got {range.impl=}") elif len(self._subspace_index) > 0: raise self._subspace_index_exception(range) return _ImplChangeOperator(domain=range.to_impl(self._domain_impl), range=range)
[docs] def norm(self, estimate=False, **kwargs): """Return the operator norm of this operator. This is 1, as the operator is mathematically the identity.""" return 1
def __repr__(self): """Represent the operator by its domain and the device of the range.""" return f"{self.__class__.__name__}(domain_impl={repr(self._domain_impl)}, range_impl={repr(self._range_impl)})" def __str__(self): return f"{self.__class__.__name__}(domain_impl={str(self._domain_impl)}, range_impl={str(self._range_impl)})"
[docs] class PointwiseTensorFieldOperator(Operator): """Abstract operator for point-wise tensor field manipulations. A point-wise operator acts on a space of vector or tensor fields, i.e. a power space ``X^d`` of a discretized function space ``X``. Its range is the power space ``X^k`` with a possibly different number ``k`` of components. For ``k == 1``, the base space ``X`` can be used instead. For example, if ``X`` is a `DiscretizedSpace` space, then ``ProductSpace(X, d)`` is a valid domain for any positive integer ``d``. It is also possible to have tensor fields over tensor fields, i.e. ``ProductSpace(ProductSpace(X, n), m)``. .. note:: It is allowed that ``domain``, ``range`` and ``base_space`` use different ``dtype``. Correctness for, e.g., real-to-complex mappings is not guaranteed in that case. See Also -------- odl.core.space.pspace.ProductSpace """
[docs] def __init__(self, domain, range, base_space, linear=False): """Initialize a new instance. Parameters ---------- domain, range : {`ProductSpace`, `LinearSpace`} Spaces of vector fields between which the operator maps. They have to be either power spaces of the same base space ``X`` (up to ``dtype``), or the base space itself. Empty product spaces are not allowed. base_space : `LinearSpace` The base space ``X``. linear : bool, optional If ``True``, assume that the operator is linear. """ if not is_compatible_space(domain, base_space): raise ValueError( f"`domain` {domain} is not compatible with `base_space` {base_space}" ) if not is_compatible_space(range, base_space): raise ValueError( f"`range` {range} is not compatible with `base_space` {base_space}" ) super().__init__(domain=domain, range=range, linear=linear) self.__base_space = base_space
@property def base_space(self): """Base space ``X`` of this operator's domain and range.""" return self.__base_space
class PointwiseNorm(PointwiseTensorFieldOperator): """Take the point-wise norm of a vector field. This operator computes the (weighted) p-norm in each point of a vector field, thus producing a scalar-valued function. It implements the formulas :: ||F(x)|| = [ sum_j( w_j * |F_j(x)|^p ) ]^(1/p) for ``p`` finite and :: ||F(x)|| = max_j( w_j * |F_j(x)| ) for ``p = inf``, where ``F`` is a vector field. This implies that the `Operator.domain` is a power space of a discretized function space. For example, if ``X`` is a `DiscretizedSpace` space, then ``ProductSpace(X, d)`` is a valid domain for any positive integer ``d``. """
[docs] def __init__(self, vfspace, exponent=None, weighting=None): """Initialize a new instance. Parameters ---------- vfspace : `ProductSpace` Space of vector fields on which the operator acts. It has to be a product space of identical spaces, i.e. a power space. exponent : non-zero float, optional Exponent of the norm in each point. Values between 0 and 1 are currently not supported due to numerical instability. Default: ``vfspace.exponent`` weighting : `array-like` or positive float, optional Weighting array or constant for the norm. If an array is given, its length must be equal to ``len(domain)``, and all entries must be positive. By default, the weights are is taken from ``domain.weighting``. Note that this excludes unusual weightings with custom inner product, norm or dist. Examples -------- We make a tiny vector field space in 2D and create the standard point-wise norm operator on that space. The operator maps a vector field to a scalar function: >>> spc = odl.uniform_discr([-1, -1], [1, 1], (1, 2)) >>> vfspace = odl.ProductSpace(spc, 2) >>> pw_norm = odl.PointwiseNorm(vfspace) >>> pw_norm.range == spc True Now we can calculate the 2-norm in each point: >>> x = vfspace.element([[[1, -4]], ... [[0, 3]]]) >>> print(pw_norm(x)) [[ 1., 5.]] We can change the exponent either in the vector field space or in the operator directly: >>> vfspace = odl.ProductSpace(spc, 2, exponent=1) >>> pw_norm = PointwiseNorm(vfspace) >>> print(pw_norm(x)) [[ 1., 7.]] >>> vfspace = odl.ProductSpace(spc, 2) >>> pw_norm = PointwiseNorm(vfspace, exponent=1) >>> print(pw_norm(x)) [[ 1., 7.]] """ if not isinstance(vfspace, ProductSpace): raise TypeError(f"`vfspace` {vfspace} is not a ProductSpace instance") super().__init__( domain=vfspace, range=vfspace[0].real_space, base_space=vfspace[0], linear=False, ) # Need to check for product space shape once higher order tensors # are implemented if exponent is None: if self.domain.exponent is None: raise ValueError(f"cannot determine `exponent` from {self.domain}") self._exponent = self.domain.exponent elif exponent < 1: raise ValueError("`exponent` smaller than 1 not allowed") else: self._exponent = float(exponent) # Handle weighting, including sanity checks if weighting is None: # TODO: find a more robust way of getting the weights as an array if hasattr(self.domain.weighting, 'array'): self.__weights = self.domain.weighting.array elif hasattr(self.domain.weighting, 'const'): self.__weights = [ self.domain.weighting.const * self.domain[i].one() for i in range(len(vfspace)) ] else: raise ValueError( f"weighting scheme {self.domain.weighting} of the domain does not define a weighting array or constant" ) self.__is_weighted = False else: ### This is a bad situation: although we worked hard to get an elegant weighting, the PointwiseNorm just yanks all of that down the drain by reimplementing the norm operation and the input sanitisation just for a ProductSpace. ### EV reimplemented these two functionnalities but moving forward, this should be coerced into abiding to our new API if (isinstance(weighting, list) and all([isinstance(w, Tensor) for w in weighting])): self.__weights = weighting self.__is_weighted = all(odl_all_equal(w, 1) for w in weighting) else: if isinstance(weighting, (int, float)): weighting = [weighting for _ in range(len(self.domain))] weighted_flag = [] for i in range(len(self.domain)): if weighting[i] <= 0: raise ValueError( f"weighting array weighting contains invalid entry {weighting[i]}" ) if weighting[i] in [1, 1.0]: weighted_flag.append(False) else: weighted_flag.append(True) self.__is_weighted = bool(any(weighted_flag)) weighting = [ self.domain[i].tspace.broadcast_to(weighting[i]) for i in range(len(self.domain)) ] self.__weights = [] for i in range(len(self.domain)): self.__weights.append(self.domain[i].element(weighting[i]))
@property def exponent(self): """Exponent ``p`` of this norm.""" return self._exponent @property def weights(self): """Weighting array of this operator.""" return self.__weights @property def is_weighted(self): """``True`` if weighting is not 1 or all ones.""" return self.__is_weighted
[docs] def _call(self, f, out): """Implement ``self(f, out)``.""" if self.exponent == 1.0: self._call_vecfield_1(f, out) elif self.exponent == float('inf'): self._call_vecfield_inf(f, out) else: self._call_vecfield_p(f, out)
def _call_vecfield_1(self, vf, out): """Implement ``self(vf, out)`` for exponent 1.""" odl_abs(vf[0], out=out) if self.is_weighted: out *= self.weights[0] if len(self.domain) == 1: return tmp = self.range.element() for fi, wi in zip(vf[1:], self.weights[1:]): odl_abs(fi, out=tmp) if self.is_weighted: tmp *= wi out += tmp def _call_vecfield_inf(self, vf, out): """Implement ``self(vf, out)`` for exponent ``inf``.""" odl_abs(vf[0], out=out) if self.is_weighted: out *= self.weights[0] if len(self.domain) == 1: return tmp = self.range.element() for vfi, wi in zip(vf[1:], self.weights[1:]): odl_abs(vfi, out=tmp) if self.is_weighted: tmp *= wi maximum(out, tmp, out=out) def _call_vecfield_p(self, vf, out): """Implement ``self(vf, out)`` for exponent 1 < p < ``inf``.""" # Optimization for 1 component - just absolute value (maybe weighted) if len(self.domain) == 1: odl_abs(vf[0], out=out) if self.is_weighted: out *= self.weights[0] ** (1 / self.exponent) return # Initialize out, avoiding one copy self._abs_pow(vf[0], out=out, p=self.exponent) if self.is_weighted: out *= self.weights[0] tmp = self.range.element() for fi, wi in zip(vf[1:], self.weights[1:]): self._abs_pow(fi, out=tmp, p=self.exponent) if self.is_weighted: tmp *= wi out += tmp self._abs_pow(out, out=out, p=(1 / self.exponent)) def _abs_pow(self, fi, out, p): """Compute |F_i(x)|^p point-wise and write to ``out``.""" # Optimization for very common cases if p == 0.5: odl_abs(fi, out=out) sqrt(out, out=out) elif p == 2.0 and self.base_space.field == RealNumbers(): multiply(fi, fi, out=out) else: odl_abs(fi, out=out) pow(out, p, out=out)
[docs] def derivative(self, vf): """Derivative of the point-wise norm operator at ``vf``. The derivative at ``F`` of the point-wise norm operator ``N`` with finite exponent ``p`` and weights ``w`` is the pointwise inner product with the vector field :: x --> N(F)(x)^(1-p) * [ F_j(x) * |F_j(x)|^(p-2) ]_j Note that this is not well-defined for ``F = 0``. If ``p < 2``, any zero component will result in a singularity. Parameters ---------- vf : `domain` `element-like` Vector field ``F`` at which to evaluate the derivative. Returns ------- deriv : `PointwiseInner` Derivative operator at the given point ``vf``. Raises ------ NotImplementedError * if the vector field space is complex, since the derivative is not linear in that case * if the exponent is ``inf`` """ if self.domain.field == ComplexNumbers(): raise NotImplementedError("operator not Frechet-differentiable " "on a complex space") if self.exponent == float('inf'): raise NotImplementedError("operator not Frechet-differentiable " "for exponent = inf") vf = self.domain.element(vf) vf_pwnorm_fac = self(vf) if self.exponent != 2: # optimize away most common case. vf_pwnorm_fac **= (self.exponent - 1) inner_vf = vf.copy() for gi in inner_vf: gi *= pow(odl_abs(gi), self.exponent - 2) if self.exponent >= 2: # Any component that is zero is not divided with nz = (vf_pwnorm_fac.asarray() != 0) gi[nz] /= vf_pwnorm_fac[nz] else: # For exponents < 2 there will be a singularity if any # component is zero. This results in inf or nan. See the # documentation for further details. gi /= vf_pwnorm_fac return PointwiseInner(self.domain, inner_vf, weighting=self.weights)
[docs] class PointwiseInnerBase(PointwiseTensorFieldOperator): """Base class for `PointwiseInner` and `PointwiseInnerAdjoint`. Implemented to allow code reuse between the classes. """
[docs] def __init__(self, adjoint, vfspace, vecfield, weighting=None): """Initialize a new instance. All parameters are given according to the specifics of the "usual" operator. The ``adjoint`` parameter is used to control conversions for the inverse transform. Parameters ---------- adjoint : bool ``True`` if the operator should be the adjoint, ``False`` otherwise. vfspace : `ProductSpace` Space of vector fields on which the operator acts. It has to be a product space of identical spaces, i.e. a power space. vecfield : ``vfspace`` `element-like` Vector field with which to calculate the point-wise inner product of an input vector field weighting : `array-like` or float, optional Weighting array or constant for the norm. If an array is given, its length must be equal to ``len(domain)``. By default, the weights are is taken from ``domain.weighting``. Note that this excludes unusual weightings with custom inner product, norm or dist. """ if not isinstance(vfspace, ProductSpace): raise TypeError(f"`vfspace` {vfspace} is not a ProductSpace instance") if adjoint: super().__init__( domain=vfspace[0], range=vfspace, base_space=vfspace[0], linear=True ) else: super().__init__( domain=vfspace, range=vfspace[0], base_space=vfspace[0], linear=True ) # Bail out if the space is complex but we cannot take the complex # conjugate. if (vfspace.field == ComplexNumbers() and not hasattr(self.base_space.element_type, 'conj')): raise NotImplementedError( f"base space element type {self.base_space.element_type} does not implement conj() method required for complex inner products" ) self._vecfield = vfspace.element(vecfield) # Handle weighting, including sanity checks if weighting is None: self.__is_weighted = False if hasattr(vfspace.weighting, 'array'): self.__weights = vfspace.weighting.array elif hasattr(vfspace.weighting, 'const'): # Casting the constant to an array of constants is just bad self.__weights = [vfspace.weighting.const *vfspace[i].one() for i in range(len(vfspace))] else: raise ValueError( f"weighting scheme {vfspace.weighting} of the domain does not define a weighting array or constant" ) else: # Check if the input has already been sanitised, i.e is it an odl.Tensor if (isinstance(weighting, list) and all(isinstance(w, Tensor) for w in weighting)): self.__weights = weighting self.__is_weighted = all(odl_all_equal(w, 1) for w in weighting) # these are required to provide an array-API compatible weighting parsing. else: if isinstance(weighting, (int, float)): weighting = [weighting for i in range(len(vfspace))] weighted_flag = [] for i in range(len(vfspace)): if weighting[i] <= 0: raise ValueError( f"weighting array weighting contains invalid entry {weighting[i]}" ) if weighting[i] in [1, 1.0]: weighted_flag.append(False) else: weighted_flag.append(True) self.__is_weighted = bool(any(weighted_flag)) weighting = [ vfspace[i].tspace.broadcast_to(weighting[i]) for i in range(len(vfspace)) ] self.__weights = [] for i in range(len(vfspace)): self.weights.append(vfspace[i].element(weighting[i]))
@property def vecfield(self): """Fixed vector field ``G`` of this inner product.""" return self._vecfield @property def weights(self): """Weighting array of this operator.""" return self.__weights @property def is_weighted(self): """``True`` if weighting is not 1 or all ones.""" return self.__is_weighted @property def adjoint(self): """Adjoint operator.""" raise NotImplementedError("abstract method")
class PointwiseInner(PointwiseInnerBase): """Take the point-wise inner product with a given vector field. This operator takes the (weighted) inner product :: <F(x), G(x)> = sum_j ( w_j * F_j(x) * conj(G_j(x)) ) for a given vector field ``G``, where ``F`` is the vector field acting as a variable to this operator. This implies that the `Operator.domain` is a power space of a discretized function space. For example, if ``X`` is a `DiscretizedSpace` space, then ``ProductSpace(X, d)`` is a valid domain for any positive integer ``d``. """
[docs] def __init__(self, vfspace, vecfield, weighting=None): """Initialize a new instance. Parameters ---------- vfspace : `ProductSpace` Space of vector fields on which the operator acts. It has to be a product space of identical spaces, i.e. a power space. vecfield : ``vfspace`` `element-like` Vector field with which to calculate the point-wise inner product of an input vector field weighting : `array-like` or float, optional Weighting array or constant for the norm. If an array is given, its length must be equal to ``len(domain)``, and all entries must be positive. By default, the weights are is taken from ``domain.weighting``. Note that this excludes unusual weightings with custom inner product, norm or dist. Examples -------- We make a tiny vector field space in 2D and create the point-wise inner product operator with a fixed vector field. The operator maps a vector field to a scalar function: >>> spc = odl.uniform_discr([-1, -1], [1, 1], (1, 2)) >>> vfspace = odl.ProductSpace(spc, 2) >>> fixed_vf = np.array([[[0, 1]], ... [[1, -1]]]) >>> pw_inner = PointwiseInner(vfspace, fixed_vf) >>> pw_inner.range == spc True Now we can calculate the inner product in each point: >>> x = vfspace.element([[[1, -4]], ... [[0, 3]]]) >>> print(pw_inner(x)) [[ 0., -7.]] """ super().__init__( adjoint=False, vfspace=vfspace, vecfield=vecfield, weighting=weighting )
@property def vecfield(self): """Fixed vector field ``G`` of this inner product.""" return self._vecfield
[docs] def _call(self, vf, out): """Implement ``self(vf, out)``.""" if self.domain.field == ComplexNumbers(): vf[0].multiply(self._vecfield[0].conj(), out=out) else: vf[0].multiply(self._vecfield[0], out=out) if self.is_weighted: out *= self.weights[0] if len(self.domain) == 1: return tmp = self.range.element() for vfi, gi, wi in zip(vf[1:], self.vecfield[1:], self.weights[1:]): if self.domain.field == ComplexNumbers(): vfi.multiply(gi.conj(), out=tmp) else: vfi.multiply(gi, out=tmp) if self.is_weighted: tmp *= wi out += tmp
@property def adjoint(self): """Adjoint of this operator. Returns ------- adjoint : `PointwiseInnerAdjoint` """ return PointwiseInnerAdjoint( sspace=self.base_space, vecfield=self.vecfield, vfspace=self.domain, weighting=self.weights)
[docs] class PointwiseInnerAdjoint(PointwiseInnerBase): """Adjoint of the point-wise inner product operator. The adjoint of the inner product operator is a mapping :: A^* : X --> X^d If the vector field space ``X^d`` is weighted by a vector ``v``, the adjoint, applied to a function ``h`` from ``X`` is the vector field :: x --> h(x) * (w / v) * G(x), where ``G`` and ``w`` are the vector field and weighting from the inner product operator, resp., and all multiplications are understood component-wise. """
[docs] def __init__(self, sspace, vecfield, vfspace=None, weighting=None): """Initialize a new instance. Parameters ---------- sspace : `LinearSpace` "Scalar" space on which the operator acts vecfield : range `element-like` Vector field of the point-wise inner product operator vfspace : `ProductSpace`, optional Space of vector fields to which the operator maps. It must be a power space with ``sspace`` as base space. This option is intended to enforce an operator range with a certain weighting. Default: ``ProductSpace(space, len(vecfield), weighting=weighting)`` weighting : `array-like` or float, optional Weighting array or constant of the inner product operator. If an array is given, its length must be equal to ``len(vecfield)``. By default, the weights are is taken from ``range.weighting`` if applicable. Note that this excludes unusual weightings with custom inner product, norm or dist. """ if vfspace is None: vfspace = ProductSpace(sspace, len(vecfield), weighting=weighting) else: if not isinstance(vfspace, ProductSpace): raise TypeError(f"`vfspace` {vfspace} is not a ProductSpace instance") if vfspace[0] != sspace: raise ValueError( f"base space of the range is different from the given scalar space ({vfspace[0]} != {sspace})" ) super().__init__( adjoint=True, vfspace=vfspace, vecfield=vecfield, weighting=weighting ) # Get weighting from range if hasattr(self.range.weighting, 'array'): ### The tolist() is an ugly tweak to recover a list from the pspace weighting.array which is stored in numpy self.__ran_weights = vfspace.element(self.range.weighting.array.tolist()) elif hasattr(self.range.weighting, 'const'): # Casting the constant to an array of constants is just bad self.__ran_weights = [self.range.weighting.const *self.range[i].one() for i in range(len(self.range))] else: raise ValueError( f"weighting scheme {self.range.weighting} of the range does not define a weighting array or constant" )
[docs] def _call(self, f, out): """Implement ``self(vf, out)``.""" for vfi, oi, ran_wi, dom_wi in zip(self.vecfield, out, self.__ran_weights, self.weights): vfi.multiply(f, out=oi) # Removed the optimisation here, it would require casting ran_wi as odl.TensorSpaceElement # if not isclose(ran_wi, dom_wi).all(): oi *= dom_wi / ran_wi
@property def adjoint(self): """Adjoint of this operator. Returns ------- adjoint : `PointwiseInner` """ return PointwiseInner(vfspace=self.range, vecfield=self.vecfield, weighting=self.weights)
# TODO: Make this an optimized operator on its own. class PointwiseSum(PointwiseInner): """Take the point-wise sum of a vector field. This operator takes the (weighted) sum :: sum(F(x)) = [ sum_j( w_j * F_j(x) ) ] where ``F`` is a vector field. This implies that the `Operator.domain` is a power space of a discretized function space. For example, if ``X`` is a `DiscretizedSpace` space, then ``ProductSpace(X, d)`` is a valid domain for any positive integer ``d``. """
[docs] def __init__(self, vfspace, weighting=None): """Initialize a new instance. Parameters ---------- vfspace : `ProductSpace` Space of vector fields on which the operator acts. It has to be a product space of identical spaces, i.e. a power space. weighting : `array-like` or float, optional Weighting array or constant for the sum. If an array is given, its length must be equal to ``len(domain)``. By default, the weights are is taken from ``domain.weighting``. Note that this excludes unusual weightings with custom inner product, norm or dist. Examples -------- We make a tiny vector field space in 2D and create the standard point-wise sum operator on that space. The operator maps a vector field to a scalar function: >>> spc = odl.uniform_discr([-1, -1], [1, 1], (1, 2)) >>> vfspace = odl.ProductSpace(spc, 2) >>> pw_sum = PointwiseSum(vfspace) >>> pw_sum.range == spc True Now we can calculate the sum in each point: >>> x = vfspace.element([[[1, -4]], ... [[0, 3]]]) >>> print(pw_sum(x)) [[ 1., -1.]] """ if not isinstance(vfspace, ProductSpace): raise TypeError(f"`vfspace` {vfspace} is not a ProductSpace instance") ones = vfspace.one() super().__init__(vfspace, vecfield=ones, weighting=weighting)
class MatrixOperator(Operator): """A matrix acting as a linear operator. This operator uses a matrix to represent an operator, and get its adjoint and inverse by doing computations on the matrix. This is in general a rather slow and memory-inefficient approach, and users are recommended to use other alternatives if possible. """
[docs] def __init__(self, matrix, domain=None, range=None, impl: Optional[str]=None, device: Optional[str]=None, axis=0): r"""Initialize a new instance. Parameters ---------- matrix : `array-like` or `scipy.sparse.base.spmatrix` 2-dimensional array representing the linear operator. For Scipy sparse matrices only tensor spaces with ``ndim == 1`` are allowed as ``domain``. The matrix is copied to `impl`/`device`, if these are specified (only once, when the operator is initialized). If a plain Python list is supplied, it will first be converted to a NumPy array. domain : `TensorSpace`, optional Space of elements on which the operator can act. Its ``dtype`` must be castable to ``range.dtype``. For the default ``None``, a space with 1 axis and size ``matrix.shape[1]`` is used, together with the matrix' data type. range : `TensorSpace`, optional Space of elements on to which the operator maps. Its ``shape`` and ``dtype`` attributes must match those of the result of the multiplication. For the default ``None``, the range is inferred from ``matrix``, ``domain`` and ``axis``. impl : `ArrayBackend`-identifying `str`, optional Which backend to use for the low-level matrix multiplication. If not explicitly provided, it will be inferred in the following order of preference, depending on what is available: 1. from `domain` 2. from `range` 3. from `matrix` device : `str`, optional On which device to store the matrix. Same defaulting logic as for `impl`. axis : int, optional Sum over this axis of an input tensor in the multiplication. Examples -------- By default, ``domain`` and ``range`` are spaces of with one axis: >>> m = np.ones((3, 4)) >>> op = MatrixOperator(m) >>> op.domain rn(4) >>> op.range rn(3) >>> op([1, 2, 3, 4]) rn(3).element([ 10., 10., 10.]) For multi-dimensional arrays (tensors), the summation (contraction) can be performed along a specific axis. In this example, the number of matrix rows (4) must match the domain shape entry in the given axis: >>> dom = odl.rn((5, 4, 4)) # can use axis=1 or axis=2 >>> op = MatrixOperator(m, domain=dom, axis=1) >>> op(dom.one()).shape (5, 3, 4) >>> op = MatrixOperator(m, domain=dom, axis=2) >>> op(dom.one()).shape (5, 4, 3) The operator also works on `uniform_discr` type spaces. Note, however, that the ``weighting`` of the domain is propagated to the range by default, in order to keep the correspondence between adjoint and transposed matrix: >>> space = odl.uniform_discr(0, 1, 4) >>> op = MatrixOperator(m, domain=space) >>> op(space.one()) rn(3, weighting=0.25).element([ 4., 4., 4.]) >>> np.array_equal(op.adjoint.matrix, m.T) True Notes ----- For a matrix :math:`A \in \mathbb{F}^{n \times m}`, the operation on a tensor :math:`T \in \mathbb{F}^{n_1 \times \dots \times n_d}` is defined as the summation .. math:: (A \cdot T)_{i_1, \dots, i_k, \dots, i_d} = \sum_{j=1}^m A_{i_k j} T_{i_1, \dots, j, \dots, i_d}. It produces a new tensor :math:`A \cdot T \in \mathbb{F}^{ n_1 \times \dots \times n \times \dots \times n_d}`. """ def infer_backend_from(default_backend): if impl is not None: self.__array_backend = lookup_array_backend(impl) else: assert isinstance(default_backend, ArrayBackend) self.__array_backend = default_backend def infer_device_from(default_device): self.__device = default_device if device is None else device self._sparse_format = lookup_sparse_format(matrix) if domain is not None: infer_backend_from(domain.array_backend) infer_device_from(domain.device) elif range is not None: infer_backend_from(range.array_backend) infer_device_from(range.device) elif self.is_sparse: if self._sparse_format.impl == 'scipy': infer_backend_from(lookup_array_backend('numpy')) infer_device_from('cpu') elif self._sparse_format.impl == 'pytorch': infer_backend_from(lookup_array_backend('pytorch')) infer_device_from(matrix.device) else: raise ValueError elif isinstance(matrix, (list, tuple)): infer_backend_from(lookup_array_backend('numpy')) infer_device_from('cpu') else: infer_backend_from(get_array_and_backend(matrix)[1]) infer_device_from(matrix.device) self.__arr_ns = self.array_backend.array_namespace if self.is_sparse: if self._sparse_format.impl == 'scipy': if self.array_backend.impl != 'numpy': raise TypeError(f"SciPy sparse matrices can only be used with NumPy on CPU, not {self.array_backend.impl}.") if self.device != 'cpu': raise TypeError(f"SciPy sparse matrices can only be used with NumPy on CPU, not {device}.") elif self._sparse_format.impl == 'pytorch': if self.array_backend.impl != 'pytorch': raise TypeError(f"PyTorch sparse matrices can only be used with Pytorch, not {self.array_backend.impl}.") self.__matrix = matrix elif isinstance(matrix, Tensor): self.__matrix = matrix.data self.__matrix = self.__arr_ns.asarray( matrix.data, device=self.__device, copy=AVOID_UNNECESSARY_COPY ) while len(self.__matrix.shape) < 2: self.__matrix = self.__matrix[None] else: self.__matrix = self.__arr_ns.asarray( matrix, device=self.__device, copy=AVOID_UNNECESSARY_COPY ) while len(self.__matrix.shape) < 2: self.__matrix = self.__matrix[None] self.__axis, axis_in = int(axis), axis if self.axis != axis_in: raise ValueError(f"`axis` must be integer, got {axis_in}") if self.matrix.ndim != 2: raise ValueError(f"`matrix` has {self.matrix.ndim} axes instead of 2") # Infer or check domain if domain is None: dtype = self.array_backend.identifier_of_dtype(self.matrix.dtype) domain = tensor_space((self.matrix.shape[1],), dtype=dtype, impl = self.array_backend.impl, device = self.device ) else: if not isinstance(domain, TensorSpace): raise TypeError( f"`domain` must be a `TensorSpace` instance, got {domain}" ) if self.is_sparse and domain.ndim > 1: raise ValueError("`domain.ndim` > 1 unsupported for " "scipy sparse matrices") if domain.shape[axis] != self.matrix.shape[1]: raise ValueError( f"`domain.shape[axis]` not equal to `matrix.shape[1]` ({domain.shape[axis]} != {self.matrix.shape[1]})" ) range_shape = list(domain.shape) range_shape[self.axis] = self.matrix.shape[0] if range is None: # Infer range range_dtype = self.__arr_ns.result_type( self.matrix.dtype, domain.dtype) range_dtype = self.array_backend.identifier_of_dtype(range_dtype) if (range_shape != domain.shape and isinstance(domain.weighting, ArrayWeighting)): # Cannot propagate weighting due to size mismatch. weighting = None else: weighting = domain.weighting range = tensor_space(range_shape, impl = self.array_backend.impl, device=self.device, dtype=range_dtype, weighting=weighting, exponent=domain.exponent) else: # Check consistency of range if not isinstance(range, TensorSpace): raise TypeError( f"`range` must be a `TensorSpace` instance, got {range}" ) if range.shape != tuple(range_shape): raise ValueError( f"expected `range.shape` = {tuple(range_shape)}, got {range.shape}" ) # Check compatibility of data types result_dtype = self.__arr_ns.result_type(domain.dtype, self.matrix.dtype) if not can_cast(self.__arr_ns, result_dtype, range.dtype): raise ValueError( f"result data type {dtype_repr(result_dtype)} cannot be safely cast to range data type {dtype_repr(range.dtype)}" ) super().__init__(domain, range, linear=True)
@property def is_sparse(self) -> bool: """Whether the underlying matrix is stored in a format that optimized for mostly-zero entries. Note that this does not necessarily say anything about how sparse the matrix actually is.""" return self._sparse_format is not None @property def matrix(self): """Matrix representing this operator.""" return self.__matrix @property def array_backend(self): """Backend on which to carry out the BLAS matmul operation. Note that this does not necessarily have to be the same as either the range or domain of the operator, but by default it will be chosen such. If a different backend and/or device is used, the operator will always copy data to `self.array_backend` before carrying out the matrix multiplication, then copy the result to `self.range.array_backend`. Such copies should generally be avoided as they can be slow, but they can sometimes be justified if memory is scarce on one of the devices. """ return self.__array_backend @property def device(self): """Computational device on which to carry out the BLAS operation. See remarks on `array_backend`.""" return self.__device @property def axis(self): """Axis of domain elements over which is summed.""" return self.__axis @property def adjoint(self): """Adjoint operator represented by the adjoint matrix. Returns ------- adjoint : `MatrixOperator` """ return MatrixOperator(self.matrix.conj().T, domain=self.range, range=self.domain, axis=self.axis) @property def inverse(self): """Inverse operator represented by the inverse matrix. Taking the inverse causes sparse matrices to become dense and is generally very heavy computationally since the matrix is inverted numerically (an O(n^3) operation). It is recommended to instead use one of the solvers available in the ``odl.solvers`` package. Returns ------- inverse : `MatrixOperator` """ if self.is_sparse: matrix = self._sparse_format.to_dense(self.matrix) else: matrix = self.matrix return MatrixOperator(self.__arr_ns.linalg.inv(matrix), domain=self.range, range=self.domain, axis=self.axis, impl=self.domain.impl, device=self.domain.device)
[docs] def _call(self, x): """Return ``self(x[, out])``.""" if self.is_sparse: out = self._sparse_format.matmul_spmatrix_with_vector(self.matrix, x.data) else: dot = self.__arr_ns.tensordot(self.matrix, x.data, axes=([1], [self.axis])) # New axis ends up as first, need to swap it to its place out = self.__arr_ns.moveaxis(dot, 0, self.axis) return out
def __repr__(self): """Return ``repr(self)``.""" # Matrix printing itself in an executable way (for dense matrix) if self.is_sparse or self.array_backend.impl != 'numpy': matrix_str = repr(self.matrix) else: matrix_str = np.array2string(self.matrix, separator=', ') posargs = [matrix_str] # Optional arguments with defaults, inferred from the matrix range_shape = list(self.domain.shape) range_shape[self.axis] = self.matrix.shape[0] try: default_domain = tensor_space(self.matrix.shape[1], impl=self.array_backend.impl, dtype=self.matrix.dtype) except (ValueError, TypeError): default_domain = None try: default_range = tensor_space(range_shape, impl=self.array_backend.impl, dtype=self.matrix.dtype) except (ValueError, TypeError): default_range = None optargs = [ ('domain', self.domain, default_domain), ('range', self.range, default_range), ('axis', self.axis, 0) ] inner_str = signature_string( posargs, optargs, sep=[", ", ", ", ",\n"], mod=[["!s"], ["!r", "!r", ""]] ) return f"{self.__class__.__name__}(\n{indent(inner_str)}\n)" def __str__(self): """Return ``str(self)``.""" return repr(self) def _normalize_sampling_points(sampling_points, ndim): """Normalize points to an ndim-long list of linear index arrays. This helper converts sampling indices for `SamplingOperator` from integers or array-like objects to a list of length ``ndim``, where each entry is a `numpy.ndarray` with ``dtype=int``. The function also checks if all arrays have equal lengths, and that they fulfill ``array.ndim=1`` (or ``size=0`` for if ``ndim == 0``). The result of this normalization is intended to be used for indexing an ``ndim``-dimensional array at ``sampling_points`` via NumPy fancy indexing, i.e., ``result = ndim_array[sampling_points]``. """ sampling_points_in = sampling_points if ndim == 0: sampling_points = [np.array(sampling_points, dtype=int, copy=AVOID_UNNECESSARY_COPY)] if sampling_points[0].size != 0: raise ValueError("`sampling_points` must be empty for 0-dim. `domain`") elif ndim == 1: if isinstance(sampling_points, Integral): sampling_points = (sampling_points,) sampling_points = np.array(sampling_points, dtype=int, copy=AVOID_UNNECESSARY_COPY, ndmin=1) # Handle possible list of length one if sampling_points.ndim == 2 and sampling_points.shape[0] == 1: sampling_points = sampling_points[0] sampling_points = [sampling_points] if sampling_points[0].ndim > 1: raise ValueError(f"expected 1D index (array), got {sampling_points_in}") else: try: iter(sampling_points) except TypeError as exc: raise TypeError("`sampling_points` must be a sequence " "for domain with ndim > 1") from exc else: if np.ndim(sampling_points) == 1: sampling_points = [np.array(p, dtype=int) for p in sampling_points] else: sampling_points = [ np.array(pts, dtype=int, copy=AVOID_UNNECESSARY_COPY, ndmin=1) for pts in sampling_points] if any(pts.ndim != 1 for pts in sampling_points): raise ValueError( f"index arrays in `sampling_points` must be 1D, got {sampling_points_in}" ) return sampling_points class SamplingOperator(Operator): """Operator that samples coefficients. The operator is defined by :: SamplingOperator(f) == c * f[sampling_points] with the weight ``c`` being determined by the variant. By choosing ``c = 1``, this operator approximates point evaluations or inner products with Dirac deltas, see option ``variant='point_eval'``. By choosing ``c = cell_volume``, it approximates the integration of ``f`` over the indexed cells, see option ``variant='integrate'``. """
[docs] def __init__(self, domain, sampling_points, variant='point_eval'): """Initialize a new instance. Parameters ---------- domain : `TensorSpace` Set of elements on which this operator acts. sampling_points : 1D `array-like` or sequence of 1D array-likes Indices that determine the sampling points. In n dimensions, it should be a sequence of n arrays, where each member array is of equal length N. The indexed positions are ``(arr1[i], arr2[i], ..., arrn[i])``, in total N points. If ``domain`` is one-dimensional, a single array-like can be used. Likewise, a single point can be given as integer in 1D, and as a array-like sequence in nD. variant : {'point_eval', 'integrate'}, optional For ``'point_eval'`` this operator performs the sampling by evaluation the function at the sampling points. The ``'integrate'`` variant approximates integration by multiplying point evaluation with the cell volume. Examples -------- Sampling in 1d can be done with a single index (an int) or a sequence of such: >>> space = odl.uniform_discr(0, 1, 4) >>> op = odl.SamplingOperator(space, sampling_points=1) >>> x = space.element([1, 2, 3, 4]) >>> op(x) rn(1).element([ 2.]) >>> op = odl.SamplingOperator(space, sampling_points=[1, 2, 1]) >>> op(x) rn(3).element([ 2., 3., 2.]) There are two variants ``'point_eval'`` (default) and ``'integrate'``, where the latter scales values by the cell volume to approximate the integral over the cells of the points: >>> op = odl.SamplingOperator(space, sampling_points=[1, 2, 1], ... variant='integrate') >>> space.cell_volume # the scaling constant 0.25 >>> op(x) rn(3).element([ 0.5 , 0.75, 0.5 ]) In higher dimensions, a sequence of index array-likes must be given, or a single sequence for a single point: >>> space = odl.uniform_discr([0, 0], [1, 1], (2, 3)) >>> # Sample at the index (0, 2) >>> op = odl.SamplingOperator(space, sampling_points=[0, 2]) >>> x = space.element([[1, 2, 3], ... [4, 5, 6]]) >>> op(x) rn(1).element([ 3.]) >>> sampling_points = [[0, 1, 1], # indices (0, 2), (1, 1), (1, 0) ... [2, 1, 0]] >>> op = odl.SamplingOperator(space, sampling_points) >>> op(x) rn(3).element([ 3., 5., 4.]) """ if not isinstance(domain, TensorSpace): raise TypeError(f"`domain` must be a `TensorSpace` instance, got {domain}") self.__sampling_points = _normalize_sampling_points(sampling_points, domain.ndim) # Flatten indices during init for faster indexing later indices_flat = np.ravel_multi_index(self.sampling_points, dims=domain.shape) if np.isscalar(indices_flat): self._indices_flat = np.array([indices_flat], dtype=int) else: self._indices_flat = indices_flat self.__variant = str(variant).lower() if self.variant not in ("point_eval", "integrate"): raise ValueError(f"`variant` {variant} not understood") # Propagating the impl and device of the range ran = tensor_space( self.sampling_points[0].size, dtype=domain.dtype, impl=domain.impl, device=domain.device, ) super().__init__(domain, ran, linear=True)
@property def variant(self): """Weighting scheme for the sampling operator.""" return self.__variant @property def sampling_points(self): """Indices where to sample the function.""" return self.__sampling_points
[docs] def _call(self, x): """Return values at indices, possibly weighted.""" out = x.asarray().ravel()[self._indices_flat] if self.variant == 'point_eval': weights = 1.0 elif self.variant == 'integrate': weights = getattr(self.domain, 'cell_volume', 1.0) else: raise RuntimeError(f"bad variant {self.variant}") if weights != 1.0: out *= weights return out
@property def adjoint(self): """Adjoint of the sampling operator, a `WeightedSumSamplingOperator`. If each sampling point occurs only once, the adjoint consists in inserting the given values into the output at the sampling points. Duplicate sampling points are weighted with their multiplicity. Examples -------- >>> space = odl.uniform_discr([-1, -1], [1, 1], shape=(2, 3)) >>> sampling_points = [[0, 1, 1, 0], ... [0, 1, 2, 0]] >>> op = odl.SamplingOperator(space, sampling_points) >>> x = space.element([[1, 2, 3], ... [4, 5, 6]]) >>> np.abs(op.adjoint(op(x)).inner(x) - op(x).inner(op(x))) < 1e-10 True The ``'integrate'`` variant adjoint puts ones at the indices in ``sampling_points``, multiplied by their multiplicity: >>> op = odl.SamplingOperator(space, sampling_points, ... variant='integrate') >>> op.adjoint(op.range.one()) # (0, 0) occurs twice uniform_discr([-1., -1.], [ 1., 1.], (2, 3)).element( [[ 2., 0., 0.], [ 0., 1., 1.]] ) >>> np.abs(op.adjoint(op(x)).inner(x) - op(x).inner(op(x))) < 1e-10 True """ if self.variant == 'point_eval': variant = 'dirac' elif self.variant == 'integrate': variant = 'char_fun' else: raise RuntimeError(f"bad variant {self.variant}") return WeightedSumSamplingOperator(self.domain, self.sampling_points, variant) def __repr__(self): """Return ``repr(self)``.""" posargs = [self.domain, self.sampling_points] optargs = [('variant', self.variant, 'point_eval')] sig_str = signature_string(posargs, optargs, mod=['!r', ''], sep=[',\n', '', ',\n']) return f"{self.__class__.__name__}(\n{indent(sig_str)}\n)" def __str__(self): """Return ``str(self)``.""" return repr(self) class WeightedSumSamplingOperator(Operator): r"""Operator computing the sum of coefficients at sampling locations. This operator is the adjoint of `SamplingOperator`. Notes ----- The weighted sum sampling operator for a sequence :math:`I = (i_n)_{n=1}^N` of indices (possibly with duplicates) is given by .. math:: W_I(g)(x) = \sum_{i \in I} d_i(x) g_i, where :math:`g \in \mathbb{F}^N` is the value vector, and :math:`d_i` is either a Dirac delta or a characteristic function of the cell centered around the point indexed by :math:`i`. """
[docs] def __init__(self, range, sampling_points, variant='char_fun'): """Initialize a new instance. Parameters ---------- range : `TensorSpace` Set of elements into which this operator maps. sampling_points : 1D `array-like` or sequence of 1D array-likes Indices that determine the sampling points. In n dimensions, it should be a sequence of n arrays, where each member array is of equal length N. The indexed positions are ``(arr1[i], arr2[i], ..., arrn[i])``, in total N points. If ``range`` is one-dimensional, a single array-like can be used. Likewise, a single point can be given as integer in 1D, and as a array-like sequence in nD. variant : {'char_fun', 'dirac'}, optional This option determines which function to sum over. Examples -------- In 1d, a single index (an int) or a sequence of such can be used for indexing. >>> space = odl.uniform_discr(0, 1, 4) >>> op = odl.WeightedSumSamplingOperator(space, sampling_points=1) >>> op.domain rn(1) >>> x = op.domain.element([1]) >>> # Put value 1 at index 1 >>> op(x) uniform_discr(0.0, 1.0, 4).element([ 0., 1., 0., 0.]) >>> op = odl.WeightedSumSamplingOperator(space, ... sampling_points=[1, 2, 1]) >>> op.domain rn(3) >>> x = op.domain.element([1, 0.5, 0.25]) >>> # Index 1 occurs twice and gets two contributions (1 and 0.25) >>> op(x) uniform_discr(0.0, 1.0, 4).element([ 0. , 1.25, 0.5 , 0. ]) The ``'dirac'`` variant scales the values by the reciprocal cell volume of the operator range: >>> op = odl.WeightedSumSamplingOperator( ... space, sampling_points=[1, 2, 1], variant='dirac') >>> x = op.domain.element([1, 0.5, 0.25]) >>> 1 / op.range.cell_volume # the scaling constant 4.0 >>> op(x) uniform_discr(0.0, 1.0, 4).element([ 0., 5., 2., 0.]) In higher dimensions, a sequence of index array-likes must be given, or a single sequence for a single point: >>> space = odl.uniform_discr([0, 0], [1, 1], (2, 3)) >>> # Sample at the index (0, 2) >>> op = odl.WeightedSumSamplingOperator(space, ... sampling_points=[0, 2]) >>> x = op.domain.element([1]) >>> # Insert the value 1 at index (0, 2) >>> op(x) uniform_discr([ 0., 0.], [ 1., 1.], (2, 3)).element( [[ 0., 0., 1.], [ 0., 0., 0.]] ) >>> sampling_points = [[0, 1], # indices (0, 2) and (1, 1) ... [2, 1]] >>> op = odl.WeightedSumSamplingOperator(space, sampling_points) >>> x = op.domain.element([1, 2]) >>> op(x) uniform_discr([ 0., 0.], [ 1., 1.], (2, 3)).element( [[ 0., 0., 1.], [ 0., 2., 0.]] ) """ if not isinstance(range, TensorSpace): raise TypeError(f"`range` must be a `TensorSpace` instance, got {range}") self.__sampling_points = _normalize_sampling_points(sampling_points, range.ndim) # Convert a list of index arrays to linear index array indices_flat = np.ravel_multi_index(self.sampling_points, dims=range.shape) if np.isscalar(indices_flat): indices_flat = np.array([indices_flat], dtype=int) else: indices_flat = np.array(indices_flat, dtype=int) ### Always converting the indices to the right data type self._indices_flat = range.array_backend.array_constructor( indices_flat, dtype=int, device=range.device ) self.__variant = str(variant).lower() if self.variant not in ('dirac', 'char_fun'): raise ValueError(f"`variant` {variant} not understood") # Recording the namespace for bincount self.namespace = range.array_backend.array_namespace # Propagating the impl and device of the range domain = tensor_space( self.sampling_points[0].size, dtype=range.dtype, impl=range.impl, device=range.device, ) super().__init__(domain, range, linear=True)
@property def variant(self): """Weighting scheme for the operator.""" return self.__variant @property def sampling_points(self): """Indices where to sample the function.""" return self.__sampling_points
[docs] def _call(self, x): """Sum all values if indices are given multiple times.""" y = self.namespace.bincount(self._indices_flat, weights=x.data, minlength=self.range.size) out = y.reshape(self.range.shape) if self.variant == 'dirac': weights = getattr(self.range, 'cell_volume', 1.0) elif self.variant == 'char_fun': weights = 1.0 else: raise RuntimeError(f'The variant "{self.variant}" is not yet supported') if weights != 1.0: out /= weights return out
@property def adjoint(self): """Adjoint of this operator, a `SamplingOperator`. The ``'char_fun'`` variant of this operator corresponds to the ``'integrate'`` sampling operator, and ``'dirac'`` corresponds to ``'point_eval'``. Examples -------- >>> space = odl.uniform_discr([-1, -1], [1, 1], shape=(2, 3)) >>> # Point (0, 0) occurs twice >>> sampling_points = [[0, 1, 1, 0], ... [0, 1, 2, 0]] >>> op = odl.WeightedSumSamplingOperator(space, sampling_points, ... variant='dirac') >>> y = op.range.element([[1, 2, 3], ... [4, 5, 6]]) >>> op.adjoint(y) rn(4).element([ 1., 5., 6., 1.]) >>> x = op.domain.element([1, 2, 3, 4]) >>> np.abs(op.adjoint(op(x)).inner(x) - op(x).inner(op(x))) < 1e-10 True >>> op = odl.WeightedSumSamplingOperator(space, sampling_points, ... variant='char_fun') >>> np.abs(op.adjoint(op(x)).inner(x) - op(x).inner(op(x))) < 1e-10 True """ if self.variant == 'dirac': variant = 'point_eval' elif self.variant == 'char_fun': variant = 'integrate' else: raise RuntimeError(f'The variant "{self.variant}" is not yet supported') return SamplingOperator(self.range, self.sampling_points, variant) def __repr__(self): """Return ``repr(self)``.""" posargs = [self.range, self.sampling_points] optargs = [('variant', self.variant, 'char_fun')] sig_str = signature_string(posargs, optargs, mod=['!r', ''], sep=[',\n', '', ',\n']) return f"{self.__class__.__name__}(\n{indent(sig_str)}\n)" def __str__(self): """Return ``str(self)``.""" return repr(self) class FlatteningOperator(Operator): """Operator that reshapes the object as a column vector. The operation performed by this operator is :: FlatteningOperator(x) == ravel(x) The range of this operator is always a `TensorSpace`, i.e., even if the domain is a discrete function space. """
[docs] def __init__(self, domain): """Initialize a new instance. Parameters ---------- domain : `TensorSpace` Set of elements on which this operator acts. Examples -------- >>> space = odl.uniform_discr([-1, -1], [1, 1], shape=(2, 3)) >>> op = odl.FlatteningOperator(space) >>> op.range rn(6) >>> x = space.element([[1, 2, 3], ... [4, 5, 6]]) >>> op(x) rn(6).element([ 1., 2., 3., 4., 5., 6.]) """ if not isinstance(domain, TensorSpace): raise TypeError(f"`domain` must be a `TensorSpace` instance, got {domain}") range = tensor_space(domain.size, dtype=domain.dtype) super().__init__(domain, range, linear=True)
[docs] def _call(self, x): """Flatten ``x``.""" return self.range.element(x.data.reshape([self.range.shape[0]]))
@property def adjoint(self): """Adjoint of the flattening, a scaled version of the `inverse`. Examples -------- >>> space = odl.uniform_discr([-1, -1], [1, 1], shape=(2, 4)) >>> op = odl.FlatteningOperator(space) >>> y = op.range.element([1, 2, 3, 4, 5, 6, 7, 8]) >>> 1 / space.cell_volume # the scaling factor 2.0 >>> op.adjoint(y) uniform_discr([-1., -1.], [ 1., 1.], (2, 4)).element( [[ 2., 4., 6., 8.], [ 10., 12., 14., 16.]] ) >>> x = space.element([[1, 2, 3, 4], ... [5, 6, 7, 8]]) >>> np.abs(op.adjoint(op(x)).inner(x) - op(x).inner(op(x))) < 1e-10 True """ scaling = getattr(self.domain, 'cell_volume', 1.0) return 1 / scaling * self.inverse @property def inverse(self): """Operator that reshapes to original shape. Examples -------- >>> space = odl.uniform_discr([-1, -1], [1, 1], shape=(2, 4)) >>> op = odl.FlatteningOperator(space) >>> y = op.range.element([1, 2, 3, 4, 5, 6, 7, 8]) >>> op.inverse(y) uniform_discr([-1., -1.], [ 1., 1.], (2, 4)).element( [[ 1., 2., 3., 4.], [ 5., 6., 7., 8.]] ) >>> op(op.inverse(y)) == y True """ op = self scaling = getattr(self.domain, 'cell_volume', 1.0) class FlatteningOperatorInverse(Operator): """Inverse of `FlatteningOperator`. This operator reshapes a flat vector back to original shape:: FlatteningOperatorInverse(x) == reshape(x, orig_shape) """ def __init__(self): """Initialize a new instance.""" super().__init__(op.range, op.domain, linear=True) def _call(self, x): """Reshape ``x`` back to n-dim. shape.""" return np.reshape(x.asarray(), self.range.shape) @property def adjoint(self): """Adjoint of this operator, a scaled `FlatteningOperator`.""" return scaling * op @property def inverse(self): """Inverse of this operator.""" return op def __repr__(self): """Return ``repr(self)``.""" return f"{op}.inverse" def __str__(self): """Return ``str(self)``.""" return repr(self) return FlatteningOperatorInverse() def __repr__(self): """Return ``repr(self)``.""" return f"{self.__class__.__name__}({self.domain})" def __str__(self): """Return ``str(self)``.""" return repr(self)
[docs] def is_compatible_space(space, base_space): """Check compatibility of a (power) space with a base space. Compatibility here means that the spaces are equal or ``space`` is a non-empty power space of ``base_space`` up to different data types. Parameters ---------- space, base_space : `LinearSpace` Spaces to check for compatibility. ``base_space`` cannot be a `ProductSpace`. Returns ------- is_compatible : bool ``True`` if - ``space == base_space`` or - ``space.astype(base_space.dtype) == base_space``, provided that these properties exist, or - ``space`` is a power space of nonzero length and one of the three situations applies to ``space[0]`` (recursively). Otherwise ``False``. Examples -------- Scalar spaces: >>> base = odl.rn(2) >>> is_compatible_space(odl.rn(2), base) True >>> is_compatible_space(odl.rn(3), base) False >>> is_compatible_space(odl.rn(2, dtype='float32'), base) True Power spaces: >>> is_compatible_space(odl.rn(2) ** 2, base) True >>> is_compatible_space(odl.rn(2) * odl.rn(3), base) # no power space False >>> is_compatible_space(odl.rn(2, dtype='float32') ** 2, base) True """ if isinstance(base_space, ProductSpace): return False if isinstance(space, ProductSpace): if not space.is_power_space: return False elif len(space) == 0: return False else: return is_compatible_space(space[0], base_space) else: if hasattr(space, 'astype') and hasattr(base_space, 'dtype'): # TODO: maybe only the shape should play a role? comp_space = space.astype(base_space.dtype) else: comp_space = space return comp_space == base_space
if __name__ == '__main__': from odl.core.util.testutils import run_doctests run_doctests()