import functools
import operator
import numpy as np
from pyadjoint.tape import annotate_tape
from pyop2.utils import cached_property
import pytools
import finat.ufl
from ufl.algorithms import extract_coefficients
from ufl.constantvalue import as_ufl
from ufl.corealg.map_dag import map_expr_dag
from ufl.corealg.multifunction import MultiFunction
from ufl.domain import extract_unique_domain
from firedrake.constant import Constant
from firedrake.function import Function
from firedrake.petsc import PETSc
from firedrake.utils import ScalarType, split_by
from firedrake.vector import Vector
def _isconstant(expr):
return isinstance(expr, Constant) or \
(isinstance(expr, Function) and expr.ufl_element().family() == "Real")
def _isfunction(expr):
return isinstance(expr, Function) and expr.ufl_element().family() != "Real"
[docs]
class CoefficientCollector(MultiFunction):
"""Multifunction used for converting an expression into a weighted sum of coefficients.
Calling ``map_expr_dag(CoefficientCollector(), expr)`` will return a tuple whose entries
are of the form ``(coefficient, weight)``. Expressions that cannot be expressed as a
weighted sum will raise an exception.
Note: As well as being simple weighted sums (e.g. ``u.assign(2*v1 + 3*v2)``), one can
also assign constant expressions of the appropriate shape (e.g. ``u.assign(1.0)`` or
``u.assign(2*v + 3)``). Therefore the returned tuple must be split since ``coefficient``
may be either a :class:`firedrake.constant.Constant` or :class:`firedrake.function.Function`.
"""
[docs]
def product(self, o, a, b):
scalars, vectors = split_by(self._is_scalar_equiv, [a, b])
# Case 1: scalar * scalar
if len(scalars) == 2:
# Compress the first argument (arbitrary)
scalar, vector = scalars
# Case 2: scalar * vector
elif len(scalars) == 1:
scalar, = scalars
vector, = vectors
# Case 3: vector * vector (invalid)
else:
raise ValueError("Expressions containing the product of two vector-valued "
"subexpressions cannot be used for assignment. Consider using "
"interpolate instead.")
scaling = self._as_scalar(scalar)
return tuple((coeff, weight*scaling) for coeff, weight in vector)
[docs]
def division(self, o, a, b):
# Division is only valid if b (the divisor) is a scalar
if self._is_scalar_equiv(b):
divisor = self._as_scalar(b)
return tuple((coeff, weight/divisor) for coeff, weight in a)
else:
raise ValueError("Expressions involving division by a vector-valued subexpression "
"cannot be used for assignment. Consider using interpolate instead.")
[docs]
def sum(self, o, a, b):
# Note: a and b are tuples of (coefficient, weight) so addition is concatenation
return a + b
[docs]
def power(self, o, a, b):
# Only valid if a and b are scalars
return ((Constant(self._as_scalar(a) ** self._as_scalar(b)), 1),)
[docs]
def abs(self, o, a):
# Only valid if a is a scalar
return ((Constant(abs(self._as_scalar(a))), 1),)
def _scalar(self, o):
return ((Constant(o), 1),)
int_value = _scalar
float_value = _scalar
complex_value = _scalar
zero = _scalar
[docs]
def multi_index(self, o):
pass
[docs]
def indexed(self, o, a, _):
return a
[docs]
def component_tensor(self, o, a, _):
return a
[docs]
def coefficient(self, o):
return ((o, 1),)
[docs]
def constant_value(self, o):
return ((o, 1),)
[docs]
def expr(self, o, *operands):
raise NotImplementedError(f"Handler not defined for {type(o)}")
def _is_scalar_equiv(self, weighted_coefficients):
"""Return ``True`` if the sequence of ``(coefficient, weight)`` can be compressed to
a single scalar value.
This is only true when all coefficients are :class:`firedrake.Constant` or
are :class:`firedrake.Function` and ``c.ufl_element().family() == "Real"``
in both cases ``c.dat.dim`` must have shape ``(1,)``.
"""
return all(_isconstant(c) and c.dat.dim == (1,) for (c, _) in weighted_coefficients)
def _as_scalar(self, weighted_coefficients):
"""Compress a sequence of ``(coefficient, weight)`` tuples to a single scalar value.
This is necessary because we do not know a priori whether a :class:`firedrake.Constant`
is going to be used as a scale factor (e.g. ``u.assign(Constant(2)*v)``), or as a
constant to be added (e.g. ``u.assign(2*v + Constant(3))``). Therefore we only
compress to a scalar when we know it is required (e.g. inside a product with a
:class:`~.firedrake.function.Function`).
"""
return pytools.one(
functools.reduce(operator.add, (c.dat.data_ro*w for c, w in weighted_coefficients))
)
[docs]
class Assigner:
"""Class performing pointwise assignment of an expression to a :class:`firedrake.function.Function`.
:param assignee: The :class:`~.firedrake.function.Function` being assigned to.
:param expression: The :class:`ufl.core.expr.Expr` to evaluate.
:param subset: Optional subset (:class:`pyop2.types.set.Subset`) to apply the assignment over.
"""
symbol = "="
_coefficient_collector = CoefficientCollector()
def __init__(self, assignee, expression, subset=None):
if isinstance(expression, Vector):
expression = expression.function
expression = as_ufl(expression)
for coeff in extract_coefficients(expression):
if isinstance(coeff, Function) and coeff.ufl_element().family() != "Real":
if coeff.ufl_element() != assignee.ufl_element():
raise ValueError("All functions in the expression must have the same "
"element as the assignee")
if extract_unique_domain(coeff) != extract_unique_domain(assignee):
raise ValueError("All functions in the expression must use the same "
"mesh as the assignee")
if (subset and type(assignee.ufl_element()) == finat.ufl.MixedElement
and any(el.family() == "Real"
for el in assignee.ufl_element().sub_elements)):
raise ValueError("Subset is not a valid argument for assigning to a mixed "
"element including a real element")
self._assignee = assignee
self._expression = expression
self._subset = subset
def __str__(self):
return f"{self._assignee} {self.symbol} {self._expression}"
def __repr__(self):
return f"{self.__class__.__name__}({self._assignee!r}, {self._expression!r})"
[docs]
@PETSc.Log.EventDecorator()
def assign(self):
"""Perform the assignment."""
if annotate_tape():
raise NotImplementedError(
"Taping with explicit Assigner objects is not supported yet. "
"Use Function.assign instead."
)
# To minimize communication during assignment we perform a number of tricks:
# * If we are not assigning to a subset then we can always write to the
# halo. The validity of the original assignee dat halo does not matter
# since we are overwriting it entirely.
# * We can also write to the halo if we are assigning to a subset provided
# that the assignee halo is not dirty to start with.
# * If we are assigning to a subset where the assignee dat has a dirty halo,
# then we should only write to the owned values. There is no point in
# writing to the halo since a full halo exchange is still required.
# * If any of the functions in the expression do not have valid halos then
# we only write to the owned values in the assignee. Otherwise we might
# end up doing a lot of halo exchanges for the expression just to avoid
# a single halo exchange for the assignee.
# * If we do write to the halo then the resulting halo will never be dirty.
func_halos_valid = all(f.dat.halo_valid for f in self._functions)
assign_to_halos = (
func_halos_valid and (not self._subset or self._assignee.dat.halo_valid))
if assign_to_halos:
subset_indices = self._subset.indices if self._subset else ...
data_ro = operator.attrgetter("data_ro_with_halos")
else:
subset_indices = self._subset.owned_indices if self._subset else ...
data_ro = operator.attrgetter("data_ro")
# If mixed, loop over individual components
for lhs_dat, *func_dats in zip(self._assignee.dat.split,
*(f.dat.split for f in self._functions)):
func_data = np.array([data_ro(f)[subset_indices] for f in func_dats])
rvalue = self._compute_rvalue(func_data)
self._assign_single_dat(lhs_dat, subset_indices, rvalue, assign_to_halos)
# if we have bothered writing to halo it naturally must not be dirty
if assign_to_halos:
self._assignee.dat.halo_valid = True
@cached_property
def _constants(self):
return tuple(c for (c, _) in self._weighted_coefficients if _isconstant(c))
@cached_property
def _constant_weights(self):
return tuple(w for (c, w) in self._weighted_coefficients if _isconstant(c))
@cached_property
def _functions(self):
return tuple(c for (c, _) in self._weighted_coefficients if _isfunction(c))
@cached_property
def _function_weights(self):
return tuple(w for (c, w) in self._weighted_coefficients if _isfunction(c))
def _assign_single_dat(self, lhs_dat, indices, rvalue, assign_to_halos):
if assign_to_halos:
lhs_dat.data_wo_with_halos[indices] = rvalue
else:
lhs_dat.data_wo[indices] = rvalue
def _compute_rvalue(self, func_data):
# There are two components to the rvalue: weighted functions (in the same function space),
# and constants (e.g. u.assign(2*v + 3)).
func_rvalue = (func_data.T @ self._function_weights).T
const_data = np.array([c.dat.data_ro for c in self._constants], dtype=ScalarType)
const_rvalue = const_data.T @ self._constant_weights
return func_rvalue + const_rvalue
@cached_property
def _weighted_coefficients(self):
# TODO: It would be nice to stash this on the expression so we can avoid extra
# traversals for non-persistent Assigner objects, but expressions do not currently
# have caches attached to them.
return map_expr_dag(self._coefficient_collector, self._expression)
[docs]
class IAddAssigner(Assigner):
"""Assigner class for ``firedrake.function.Function.__iadd__``."""
symbol = "+="
def _assign_single_dat(self, lhs, indices, rvalue, assign_to_halos):
if assign_to_halos:
lhs.data_with_halos[indices] += rvalue
else:
lhs.data[indices] += rvalue
[docs]
class ISubAssigner(Assigner):
"""Assigner class for ``firedrake.function.Function.__isub__``."""
symbol = "-="
def _assign_single_dat(self, lhs, indices, rvalue, assign_to_halos):
if assign_to_halos:
lhs.data_with_halos[indices] -= rvalue
else:
lhs.data[indices] -= rvalue
[docs]
class IMulAssigner(Assigner):
"""Assigner class for ``firedrake.function.Function.__imul__``."""
symbol = "*="
def _assign_single_dat(self, lhs, indices, rvalue, assign_to_halos):
if self._functions:
raise ValueError("Only multiplication by scalars is supported")
if assign_to_halos:
lhs.data_with_halos[indices] *= rvalue
else:
lhs.data[indices] *= rvalue
[docs]
class IDivAssigner(Assigner):
"""Assigner class for ``firedrake.function.Function.__itruediv__``."""
symbol = "/="
def _assign_single_dat(self, lhs, indices, rvalue, assign_to_halos):
if self._functions:
raise ValueError("Only division by scalars is supported")
if assign_to_halos:
lhs.data_with_halos[indices] /= rvalue
else:
lhs.data[indices] /= rvalue