Source code for firedrake.solving_utils

from itertools import chain

import numpy
import ufl

from pyop2 import op2
from firedrake import dmhooks
from firedrake.function import Function
from firedrake.cofunction import Cofunction
from firedrake.exceptions import ConvergenceError
from firedrake.petsc import PETSc, DEFAULT_KSP_PARAMETERS
from firedrake.formmanipulation import ExtractSubBlock
from firedrake.ufl_expr import action
from firedrake.utils import cached_property
from firedrake.logging import warning


def _make_reasons(reasons):
    return {getattr(reasons, r): r
            for r in dir(reasons) if not r.startswith('_')}


KSPReasons = _make_reasons(PETSc.KSP.ConvergedReason())
SNESReasons = _make_reasons(PETSc.SNES.ConvergedReason())


[docs] def set_defaults(solver_parameters, arguments, *, ksp_defaults=None, snes_defaults=None): """Set defaults for solver parameters. :arg solver_parameters: dict of user solver parameters to override/extend defaults :arg arguments: arguments for the bilinear form (need to know if we have a Real block). :arg ksp_defaults: Default KSP parameters. :arg snes_defaults: Default SNES parameters.""" if ksp_defaults is None: ksp_defaults = {} if snes_defaults is None: snes_defaults = {} if solver_parameters: # User configured something, try and set sensible direct solve # defaults for missing bits. parameters = solver_parameters.copy() for k, v in snes_defaults.items(): parameters.setdefault(k, v) keys = frozenset(parameters.keys()) ksp_defaults = ksp_defaults.copy() skip = set() if "pc_type" in keys: # Might reasonably expect to get petsc defaults skip.update({"pc_factor_mat_solver_type", "ksp_type"}) if parameters.get("mat_type") in {"matfree", "nest"}: # Non-LU defaults. ksp_defaults["ksp_type"] = "gmres" ksp_defaults["pc_type"] = "jacobi" for k, v in ksp_defaults.items(): if k not in skip: parameters.setdefault(k, v) return parameters # OK, we're in full defaults mode now. parameters = dict(chain.from_iterable( d.items() for d in (ksp_defaults, snes_defaults))) if any(V.ufl_element().family() == "Real" for a in arguments for V in a.function_space()): test, trial = arguments if test.function_space() != trial.function_space(): # Don't know what to do here. How did it happen? raise ValueError("Can't generate defaults for non-square problems with real blocks") fields = [] reals = [] for i, V_ in enumerate(test.function_space()): if V_.ufl_element().family() == "Real": reals.append(i) else: fields.append(i) if len(fields) == 0: # Just reals, GMRES opts = {"ksp_type": "gmres", "pc_type": "none"} parameters.update(opts) else: warning("Real block detected, generating Schur complement elimination PC") # Full Schur complement eliminating onto Real blocks. opts = {"mat_type": "matfree", "ksp_type": "fgmres", "pc_type": "fieldsplit", "pc_fieldsplit_type": "schur", "pc_fieldsplit_schur_fact_type": "full", "pc_fieldsplit_0_fields": ",".join(map(str, fields)), "pc_fieldsplit_1_fields": ",".join(map(str, reals)), "fieldsplit_0": { "ksp_type": "preonly", "pc_type": "python", "pc_python_type": "firedrake.AssembledPC", "assembled": DEFAULT_KSP_PARAMETERS, }, "fieldsplit_1": { "ksp_type": "gmres", "pc_type": "none", }} parameters.update(opts) return parameters else: # We can assemble an AIJ matrix, factor it with a sparse # direct method. return parameters
[docs] def check_snes_convergence(snes): r = snes.getConvergedReason() try: reason = SNESReasons[r] inner = False except KeyError: r = snes.getKSP().getConvergedReason() try: inner = True reason = KSPReasons[r] except KeyError: reason = "unknown reason (petsc4py enum incomplete?), try with -snes_converged_reason and -ksp_converged_reason" if r < 0: if inner: msg = "Inner linear solve failed to converge after %d iterations with reason: %s" % \ (snes.getKSP().getIterationNumber(), reason) else: msg = reason raise ConvergenceError(r"""Nonlinear solve failed to converge after %d nonlinear iterations. Reason: %s""" % (snes.getIterationNumber(), msg))
class _SNESContext(object): r""" Context holding information for SNES callbacks. :arg problem: a :class:`NonlinearVariationalProblem`. :arg mat_type: Indicates whether the Jacobian is assembled monolithically ('aij'), as a block sparse matrix ('nest') or matrix-free (as :class:`~.ImplicitMatrix`, 'matfree'). :arg pmat_type: Indicates whether the preconditioner (if present) is assembled monolithically ('aij'), as a block sparse matrix ('nest') or matrix-free (as :class:`~.ImplicitMatrix`, 'matfree'). :arg appctx: Any extra information used in the assembler. For the matrix-free case this will contain the Newton state in ``"state"``. :arg pre_jacobian_callback: User-defined function called immediately before Jacobian assembly :arg post_jacobian_callback: User-defined function called immediately after Jacobian assembly :arg pre_function_callback: User-defined function called immediately before residual assembly :arg post_function_callback: User-defined function called immediately after residual assembly :arg options_prefix: The options prefix of the SNES. :arg transfer_manager: Object that can transfer functions between levels, typically a :class:`~.TransferManager` :arg pre_apply_bcs: If `False`, the problem is linearised around the initial guess before imposing the boundary conditions. The idea here is that the SNES holds a shell DM which contains this object as "user context". When the SNES calls back to the user form_function code, we pull the DM out of the SNES and then get the context (which is one of these objects) to find the Firedrake level information. """ @PETSc.Log.EventDecorator() def __init__(self, problem, mat_type, pmat_type, appctx=None, pre_jacobian_callback=None, pre_function_callback=None, post_jacobian_callback=None, post_function_callback=None, options_prefix=None, transfer_manager=None, pre_apply_bcs=True): from firedrake.assemble import get_assembler if pmat_type is None: pmat_type = mat_type self.mat_type = mat_type self.pmat_type = pmat_type self.options_prefix = options_prefix self.pre_apply_bcs = pre_apply_bcs matfree = mat_type == 'matfree' pmatfree = pmat_type == 'matfree' self._problem = problem self._pre_jacobian_callback = pre_jacobian_callback self._pre_function_callback = pre_function_callback self._post_jacobian_callback = post_jacobian_callback self._post_function_callback = post_function_callback self.fcp = problem.form_compiler_parameters # Function to hold current guess self._x = problem.u_restrict if appctx is None: appctx = {} # A split context will already get the full state. # TODO, a better way of doing this. # Now we don't have a temporary state inside the snes # context we could just require the user to pass in the # full state on the outside. appctx.setdefault("state", self._x) appctx.setdefault("form_compiler_parameters", self.fcp) self.appctx = appctx self.matfree = matfree self.pmatfree = pmatfree self.F = problem.F self.J = problem.J # For Jp to equal J, bc.Jp must equal bc.J for all EquationBC objects. Jp_eq_J = problem.Jp is None and all(bc.Jp_eq_J for bc in problem.bcs) if mat_type != pmat_type or not Jp_eq_J: # Need separate pmat if either Jp is different or we want # a different pmat type to the mat type. if problem.Jp is None: self.Jp = self.J else: self.Jp = problem.Jp else: # pmat_type == mat_type and Jp_eq_J self.Jp = None self.bcs_F = tuple(bc.extract_form('F') for bc in problem.bcs) self.bcs_J = tuple(bc.extract_form('J') for bc in problem.bcs) self.bcs_Jp = tuple(bc.extract_form('Jp') for bc in problem.bcs) self._bc_residual = None if not pre_apply_bcs: # Delayed lifting of DirichletBCs self._bc_residual = Function(self._x.function_space()) if problem.is_linear: # Drop existing lifting term from the resiudal assert isinstance(self.F, ufl.BaseForm) self.F = ufl.replace(self.F, {self._x: ufl.zero(self._x.ufl_shape)}) self.F -= action(self.J, self._bc_residual) self._assemble_residual = get_assembler(self.F, bcs=self.bcs_F, form_compiler_parameters=self.fcp, zero_bc_nodes=pre_apply_bcs, ).assemble self._jacobian_assembled = False self._splits = {} self._coarse = None self._fine = None self._nullspace = None self._nullspace_T = None self._near_nullspace = None self._transfer_manager = transfer_manager @property def transfer_manager(self): """This allows the transfer manager to be set from options, e.g. solver_parameters = {"ksp_type": "cg", "pc_type": "mg", "mg_transfer_manager": __name__ + ".manager"} The value for "mg_transfer_manager" can either be a specific instantiated object, or a function or class name. In the latter case it will be invoked with no arguments to instantiate the object. If "snes_type": "fas" is used, the relevant option is "fas_transfer_manager", with the same semantics. """ if self._transfer_manager is None: opts = PETSc.Options() prefix = self.options_prefix or "" if opts.hasName(prefix + "mg_transfer_manager"): managername = opts[prefix + "mg_transfer_manager"] elif opts.hasName(prefix + "fas_transfer_manager"): managername = opts[prefix + "fas_transfer_manager"] else: managername = None if managername is None: from firedrake import TransferManager transfer = TransferManager(use_averaging=True) else: (modname, objname) = managername.rsplit('.', 1) mod = __import__(modname) obj = getattr(mod, objname) if isinstance(obj, type): transfer = obj() else: transfer = obj self._transfer_manager = transfer return self._transfer_manager @transfer_manager.setter def transfer_manager(self, manager): if self._transfer_manager is not None: raise ValueError("Must set transfer manager before first use.") self._transfer_manager = manager def set_function(self, snes): r"""Set the residual evaluation function""" with self._F.dat.vec_wo as v: snes.setFunction(self.form_function, v) def set_jacobian(self, snes): snes.setJacobian(self.form_jacobian, J=self._jac.petscmat, P=self._pjac.petscmat) def set_nullspace(self, nullspace, ises=None, transpose=False, near=False): if nullspace is None: return nullspace._apply(self._jac, transpose=transpose, near=near) if self.Jp is not None: nullspace._apply(self._pjac, transpose=transpose, near=near) if ises is not None: nullspace._apply(ises, transpose=transpose, near=near) @PETSc.Log.EventDecorator() def split(self, fields): from firedrake import replace, as_vector, split from firedrake import NonlinearVariationalProblem as NLVP from firedrake.bcs import DirichletBC, EquationBC fields = tuple(tuple(f) for f in fields) splits = self._splits.get(tuple(fields)) if splits is not None: return splits splits = [] problem = self._problem splitter = ExtractSubBlock() for field in fields: F = splitter.split(problem.F, argument_indices=(field, )) J = splitter.split(problem.J, argument_indices=(field, field)) us = problem.u_restrict.subfunctions V = F.arguments()[0].function_space() # Exposition: # We are going to make a new solution Function on the sub # mixed space defined by the relevant fields. # But the form may refer to the rest of the solution # anyway. # So we pull it apart and will make a new function on the # subspace that shares data. pieces = [us[i].dat for i in field] if len(pieces) == 1: val, = pieces subu = Function(V, val=val) subsplit = (subu, ) else: val = op2.MixedDat(pieces) subu = Function(V, val=val) # Split it apart to shove in the form. subsplit = split(subu) vec = [] for i, u in enumerate(us): if i in field: # If this is a field we're keeping, get it from # the new function. Otherwise just point to the # old data. u = subsplit[field.index(i)] if u.ufl_shape == (): vec.append(u) else: vec.extend(u[idx] for idx in numpy.ndindex(u.ufl_shape)) # So now we have a new representation for the solution # vector in the old problem. For the fields we're going # to solve for, it points to a new Function (which wraps # the original pieces). For the rest, it points to the # pieces from the original Function. # IOW, we've reinterpreted our original mixed solution # function as being made up of some spaces we're still # solving for, and some spaces that have just become # coefficients in the new form. u = as_vector(vec) F = replace(F, {problem.u_restrict: u}) J = replace(J, {problem.u_restrict: u}) if problem.Jp is not None: Jp = splitter.split(problem.Jp, argument_indices=(field, field)) Jp = replace(Jp, {problem.u_restrict: u}) else: Jp = None bcs = [] for bc in problem.bcs: if isinstance(bc, DirichletBC): bc_temp = bc.reconstruct(field=field, V=V, g=bc.function_arg, sub_domain=bc.sub_domain) elif isinstance(bc, EquationBC): bc_temp = bc.reconstruct(V, subu, u, field, problem.is_linear) if bc_temp is not None: bcs.append(bc_temp) new_problem = NLVP(F, subu, bcs=bcs, J=J, Jp=Jp, is_linear=problem.is_linear, form_compiler_parameters=problem.form_compiler_parameters) new_problem._constant_jacobian = problem._constant_jacobian splits.append(type(self)(new_problem, mat_type=self.mat_type, pmat_type=self.pmat_type, appctx=self.appctx, transfer_manager=self.transfer_manager, pre_apply_bcs=self.pre_apply_bcs)) return self._splits.setdefault(tuple(fields), splits) @staticmethod def form_function(snes, X, F): r"""Form the residual for this problem :arg snes: a PETSc SNES object :arg X: the current guess (a Vec) :arg F: the residual at X (a Vec) """ dm = snes.getDM() ctx = dmhooks.get_appctx(dm) # X may not be the same vector as the vec behind self._x, so # copy guess in from X. with ctx._x.dat.vec_wo as v: X.copy(v) if ctx._pre_function_callback is not None: ctx._pre_function_callback(X) if not ctx.pre_apply_bcs: # Compute DirichletBC residual for bc in ctx.bcs_F: bc.apply(ctx._bc_residual, u=ctx._x) ctx._assemble_residual(tensor=ctx._F, current_state=ctx._x) if ctx._post_function_callback is not None: with ctx._F.dat.vec as F_: ctx._post_function_callback(X, F_) # F may not be the same vector as self._F, so copy # residual out to F. with ctx._F.dat.vec_ro as v: v.copy(F) @staticmethod def form_jacobian(snes, X, J, P): r"""Form the Jacobian for this problem :arg snes: a PETSc SNES object :arg X: the current guess (a Vec) :arg J: the Jacobian (a Mat) :arg P: the preconditioner matrix (a Mat) """ dm = snes.getDM() ctx = dmhooks.get_appctx(dm) problem = ctx._problem assert J.handle == ctx._jac.petscmat.handle if problem._constant_jacobian and ctx._jacobian_assembled: # Don't need to do any work with a constant jacobian # that's already assembled return ctx._jacobian_assembled = True # X may not be the same vector as the vec behind self._x, so # copy guess in from X. with ctx._x.dat.vec_wo as v: X.copy(v) if ctx._pre_jacobian_callback is not None: ctx._pre_jacobian_callback(X) ctx._assemble_jac(ctx._jac) if ctx._post_jacobian_callback is not None: ctx._post_jacobian_callback(X, J) if ctx.Jp is not None: assert P.handle == ctx._pjac.petscmat.handle ctx._assemble_pjac(ctx._pjac) ises = problem.J.arguments()[0].function_space()._ises ctx.set_nullspace(ctx._nullspace, ises, transpose=False, near=False) ctx.set_nullspace(ctx._nullspace_T, ises, transpose=True, near=False) ctx.set_nullspace(ctx._near_nullspace, ises, transpose=False, near=True) @staticmethod def compute_operators(ksp, J, P): r"""Form the Jacobian for this problem :arg ksp: a PETSc KSP object :arg J: the Jacobian (a Mat) :arg P: the preconditioner matrix (a Mat) """ dm = ksp.getDM() ctx = dmhooks.get_appctx(dm) problem = ctx._problem assert J.handle == ctx._jac.petscmat.handle if problem._constant_jacobian and ctx._jacobian_assembled: # Don't need to do any work with a constant jacobian # that's already assembled return ctx._jacobian_assembled = True fine = ctx._fine if fine is not None: manager = dmhooks.get_transfer_manager(fine._x.function_space().dm) manager.inject(fine._x, ctx._x) if ctx.pre_apply_bcs: for bc in problem.dirichlet_bcs(): bc.apply(ctx._x) ctx._assemble_jac(ctx._jac) if ctx.Jp is not None: assert P.handle == ctx._pjac.petscmat.handle ctx._assemble_pjac(ctx._pjac) @cached_property def _assembler_jac(self): from firedrake.assemble import get_assembler return get_assembler(self.J, bcs=self.bcs_J, form_compiler_parameters=self.fcp, mat_type=self.mat_type, options_prefix=self.options_prefix, appctx=self.appctx) @cached_property def _jac(self): return self._assembler_jac.allocate() @cached_property def _assemble_jac(self): return self._assembler_jac.assemble @cached_property def is_mixed(self): return self._jac.block_shape != (1, 1) @cached_property def _assembler_pjac(self): from firedrake.assemble import get_assembler if self.mat_type != self.pmat_type or self._problem.Jp is not None: return get_assembler(self.Jp, bcs=self.bcs_Jp, form_compiler_parameters=self.fcp, mat_type=self.pmat_type, options_prefix=self.options_prefix, appctx=self.appctx) else: return self._assembler_jac @cached_property def _pjac(self): if self.mat_type != self.pmat_type or self._problem.Jp is not None: return self._assembler_pjac.allocate() else: return self._jac @cached_property def _assemble_pjac(self): return self._assembler_pjac.assemble @cached_property def _F(self): return Cofunction(self.F.arguments()[0].function_space().dual())