Source code for firedrake.slate.static_condensation.la_utils

from collections import namedtuple
from pyop2.utils import as_tuple

from firedrake.formmanipulation import split_form
from firedrake.parameters import parameters
from firedrake.slate.slate import DiagonalTensor, Tensor, AssembledVector
from firedrake.petsc import PETSc


LAContext = namedtuple("LAContext",
                       ["lhs", "rhs", "field_idx"])
LAContext.__doc__ = """\
Context information for systems of equations after
applying algebraic transformation via Slate-supported
operations. This object provides the symbolic expressions
for the transformed linear system of equations.

:param lhs: The resulting expression for the transformed
            left-hand side matrix.
:param rhs: The resulting expression for the transformed
            right-hand side vector.
:param field_idx: An integer or iterable of integers
                  (if the system is mixed) denoting
                  which field(s) the resulting solution
                  is defined on.
"""


[docs] def condense_and_forward_eliminate(A, b, elim_fields, prefix, pc): """Returns Slate expressions for the operator and right-hand side vector after eliminating specified unknowns. :arg A: a `slate.Tensor` corresponding to the mixed UFL operator. :arg b: a `firedrake.Function` corresponding to the right-hand side. :arg elim_fields: a `tuple` of indices denoting which fields to eliminate. :arg prefix: an option prefix for the condensed field. :arg pc: a Preconditioner instance. :returns: a tuple of `LAContext` and `SchurComplementBuilder` """ if not isinstance(A, Tensor): raise ValueError("Left-hand operator must be a Slate Tensor") # Ensures field indices are in increasing order id_0, id_1 = elim_fields[0], elim_fields[-1] elim_fields = list(as_tuple(elim_fields)) elim_fields.sort() all_fields = list(range(len(A.arg_function_spaces[0]))) condensed_fields = list(set(all_fields) - set(elim_fields)) condensed_fields.sort() _A = A.blocks _b = AssembledVector(b).blocks # NOTE: Does not support non-contiguous field elimination e_idx0 = elim_fields[0] e_idx1 = elim_fields[-1] f_idx0 = condensed_fields[0] f_idx1 = condensed_fields[-1] # Finite element systems where static condensation # is possible have the general form: # # | A_ee A_ef || x_e | | b_e | # | || | = | | # | A_fe A_ff || x_f | | b_f | # # where subscript `e` denotes the coupling with fields # that will be eliminated, and `f` denotes the condensed # fields. outer_id_0 = slice(e_idx0, e_idx1 + 1) outer_id_1 = slice(f_idx0, f_idx1 + 1) Aff = _A[outer_id_1, outer_id_1] Aef = _A[outer_id_0, outer_id_1] Afe = _A[outer_id_1, outer_id_0] Aee = _A[outer_id_0, outer_id_0] bf = _b[outer_id_1] be = _b[outer_id_0] # The reduced operator and right-hand side are: # S = A_ff - A_fe * A_ee.inv * A_ef # r = b_f - A_fe * A_ee.inv * b_e # TODO check the vidx and pidx indices are correct schur_builder = SchurComplementBuilder(prefix, Aee, Afe, Aef, pc, id_0, id_1, non_zero_saddle_mat=Aff) r, S = schur_builder.build_schur(be, non_zero_saddle_rhs=bf) field_idx = [idx for idx in range(f_idx0, f_idx1)] return LAContext(lhs=S, rhs=r, field_idx=field_idx), schur_builder
[docs] def backward_solve(A, b, x, schur_builder, reconstruct_fields): """Returns a sequence of linear algebra contexts containing Slate expressions for backwards substitution. :arg A: a `slate.Tensor` corresponding to the mixed UFL operator. :arg b: a `firedrake.Function` corresponding to the right-hand side. :arg x: a `firedrake.Function` corresponding to the solution. :arg schur_builder: a `SchurComplementBuilder` :arg reconstruct_fields: a `tuple` of indices denoting which fields to reconstruct. :returns: a list of `LAContext` for the reconstruction """ if not isinstance(A, Tensor): raise ValueError("Left-hand operator must be a Slate Tensor") all_fields = list(range(len(A.arg_function_spaces[0]))) nfields = len(all_fields) reconstruct_fields = as_tuple(reconstruct_fields) _b = b.subfunctions _x = x.subfunctions # Ordering matters systems = [] # Reconstruct one unknown from one determined field: # # | A_ee A_ef || x_e | | b_e | # | || | = | | # | A_fe A_ff || x_f | | b_f | # # where x_f is known from a previous computation. # Returns the system: # # A_ee x_e = b_e - A_ef * x_f. if nfields == 2: id_e, = reconstruct_fields id_f, = [idx for idx in all_fields if idx != id_e] A_eeinv = schur_builder.A00_inv_hat A_ef = schur_builder.KT b_e = AssembledVector(_b[id_e]) x_f = AssembledVector(_x[id_f]) r_e = b_e - A_ef * x_f local_system = LAContext(lhs=A_eeinv, rhs=r_e, field_idx=(id_e,)) systems.append(local_system) # Reconstruct two unknowns from one determined field: # # | A_e0e0 A_e0e1 A_e0f || x_e0 | | b_e0 | # | A_e1e0 A_e1e1 A_e1f || x_e1 | = | b_e1 | # | A_fe0 A_fe1 A_ff || x_f | | b_f | # # where x_f is the known field. Returns two systems to be # solved in order (determined from the reverse order of indices # e0 and e1): # # Solve for e1 first (obtained from eliminating x_e0): # # S_e1 x_e1 = r_e1 # # where # # S_e1 = A_e1e1 - A_e1e0 * A_e0e0.inv * A_e0e1, and # r_e1 = b_e1 - A_e1e0 * A_e0e0.inv * b_e0 # - (A_e1f - A_e1e0 * A_e0e0.inv * A_e0f) * x_f, # # And then solve for x_e0 given x_f and x_e1: # # A_e0e0 x_e0 = b_e0 - A_e0e1 * x_e1 - A_e0f * x_f. elif nfields == 3: if len(reconstruct_fields) != nfields - 1: raise NotImplementedError("Implemented for 1 determined field") # Order of reconstruction doesn't need to be in order # of increasing indices id_e0, id_e1 = reconstruct_fields id_f, = [idx for idx in all_fields if idx not in reconstruct_fields] _, A_e0e1, A_e1e0, _ = schur_builder.list_split_mixed_ops A_e0f, A_e1f = schur_builder.list_split_trace_ops_transpose A_e0e0inv = schur_builder.A00_inv_hat x_e1 = AssembledVector(_x[id_e1]) x_f = AssembledVector(_x[id_f]) b_e0 = AssembledVector(_b[id_e0]) b_e1 = AssembledVector(_b[id_e1]) # Solve for e1 Sf = A_e1f - A_e1e0 * A_e0e0inv * A_e0f r_e1 = b_e1 - A_e1e0 * A_e0e0inv * b_e0 - Sf * x_f Sinv = schur_builder.inner_S_inv_hat systems.append(LAContext(lhs=Sinv, rhs=r_e1, field_idx=(id_e1,))) # Solve for e0 r_e0 = b_e0 - A_e0e1 * x_e1 - A_e0f * x_f systems.append(LAContext(lhs=A_e0e0inv, rhs=r_e0, field_idx=(id_e0,))) else: msg = "Not implemented for systems with %s fields" % nfields raise NotImplementedError(msg) return systems
[docs] class SchurComplementBuilder(object): """A Slate-based Schur complement expression builder. The expression is used in the trace system solve and parts of it in the reconstruction calls of the other two variables of the hybridised system. How the Schur complement if constructed, and in particular how the local inverse of the mixed matrix is built, is controlled with PETSc options. All corresponding PETSc options start with ``hybridization_localsolve``. The following option sets are valid together with the usual set of hybridisation options: .. code-block:: text {'localsolve': {'ksp_type': 'preonly', 'pc_type': 'fieldsplit', 'pc_fieldsplit_type': 'schur'}} A Schur complement is requested for the mixed matrix inverse which appears inside the Schur complement of the trace system solve. The Schur complements are then nested. For details see defition of :meth:`build_schur`. No fieldsplit options are set so all local inverses are calculated explicitly. .. code-block:: text 'localsolve': {'ksp_type': 'preonly', 'pc_type': 'fieldsplit', 'pc_fieldsplit_type': 'schur', 'fieldsplit_1': {'ksp_type': 'default', 'pc_type': 'python', 'pc_python_type': __name__ + '.DGLaplacian'}} The inverse of the Schur complement inside the Schur decomposition of the mixed matrix inverse is approximated by a default solver (LU in the matrix-explicit case) which is preconditioned by a user-defined operator, e.g. a DG Laplacian, see :meth:`build_inner_S_inv`. So :math:`P_S * S * x = P_S * b`. .. code-block:: text 'localsolve': {'ksp_type': 'preonly', 'pc_type': 'fieldsplit', 'pc_fieldsplit_type': 'schur', 'fieldsplit_1': {'ksp_type': 'default', 'pc_type': 'python', 'pc_python_type': __name__ + '.DGLaplacian', 'aux_ksp_type': 'preonly'} 'aux_pc_type': 'jacobi'}}}} The inverse of the Schur complement inside the Schur decomposition of the mixed matrix inverse is approximated by a default solver (LU in the matrix-explicit case) which is preconditioned by a user-defined operator, e.g. a DG Laplacian. The inverse of the preconditioning matrix is approximated through the inverse of only the diagonal of the provided operator, see :meth:`build_Sapprox_inv`. So :math:`diag(P_S).inv * S * x = diag(P_S).inv * b`. .. code-block:: text 'localsolve': {'ksp_type': 'preonly', 'pc_type': 'fieldsplit', 'pc_fieldsplit_type': 'schur', 'fieldsplit_0': {'ksp_type': 'default', 'pc_type': 'jacobi'} The inverse of the :math:`A_{00}` block of the mixed matrix is approximated by a default solver (LU in the matrix-explicit case) which is preconditioned by the diagonal matrix of :math:`A_{00}, see :meth:`build_A00_inv`. So :math:`diag(A_{00}).inv * A_{00} * x = diag(A_{00}).inv * b`. .. code-block:: text 'localsolve': {'ksp_type': 'preonly', 'pc_type': 'fieldsplit', 'pc_fieldsplit_type': 'None', 'fieldsplit_0': ... 'fieldsplit_1': ... All the options for ``fieldsplit_`` are still valid if ``'pc_fieldsplit_type': 'None'.`` In this case the mixed matrix inverse which appears inside the Schur complement of the trace system solve is calculated explicitly, but the local inverses of :math:`A_{00}` and the Schur complement in the reconstructions calls are still treated according to the options in ``fieldsplit_``. """ def __init__(self, prefix, Atilde, K, KT, pc, vidx, pidx, non_zero_saddle_mat=None): # set options, operators and order of sub-operators self.Atilde = Atilde self.K = K self.KT = KT self.vidx = vidx self.pidx = pidx # prefixes self.prefix = prefix + "localsolve_" self._retrieve_options(pc) self.non_zero_saddle_mat = non_zero_saddle_mat # Check if Atilde is mixed all_fields = list(range(len(Atilde.arg_function_spaces[0]))) self.nfields = len(all_fields) self._split_mixed_operator() # build all inverses self.A00_inv_hat = self.build_A00_inv() if self.nfields > 1: self.schur_approx = self.retrieve_user_S_approx(pc, self.schur_approx) if self.schur_approx else None self.inner_S = self.build_inner_S() self.inner_S_approx_inv_hat = self.build_Sapprox_inv() self.inner_S_inv_hat = self.build_inner_S_inv() def _split_mixed_operator(self): split_mixed_op = dict(split_form(self.Atilde.form)) id0, id1 = (self.vidx, self.pidx) A00 = Tensor(split_mixed_op[(id0, id0)]) self.list_split_mixed_ops = [A00, None, None, None] if self.nfields > 1: A01 = Tensor(split_mixed_op[(id0, id1)]) A10 = Tensor(split_mixed_op[(id1, id0)]) A11 = Tensor(split_mixed_op[(id1, id1)]) self.list_split_mixed_ops = [A00, A01, A10, A11] split_trace_op = dict(split_form(self.K.form)) K0 = Tensor(split_trace_op[(0, id0)]) K1 = Tensor(split_trace_op[(0, id1)]) self.list_split_trace_ops = [K0, K1] split_trace_op_transpose = dict(split_form(self.KT.form)) K0 = Tensor(split_trace_op_transpose[(id0, 0)]) K1 = Tensor(split_trace_op_transpose[(id1, 0)]) self.list_split_trace_ops_transpose = [K0, K1] def _check_options(self, valid): default = object() opts = PETSc.Options(self.prefix) for key, supported in valid: value = opts.getString(key, default=default) if value is not default and value not in supported: raise ValueError(f"Unsupported value ({value}) for '{self.prefix + key}'. " f"Should be one of {supported}") def _retrieve_options(self, pc): get_option = lambda key: PETSc.Options(self.prefix).getString(key, default="") # Get options for Schur complement decomposition self._check_options([("ksp_type", {"preonly"}), ("pc_type", {"fieldsplit"}), ("pc_fieldsplit_type", {"schur"})]) self.nested = (get_option("ksp_type") == "preonly" and get_option("pc_type") == "fieldsplit" and get_option("pc_fieldsplit_type") == "schur") # Get preconditioning options for A00 fs0, fs1 = ("fieldsplit_"+str(idx) for idx in (self.vidx, self.pidx)) self._check_options([(fs0+"ksp_type", {"preonly", "default"}), (fs0+"pc_type", {"jacobi"})]) self.preonly_A00 = get_option(fs0+"_ksp_type") == "preonly" self.jacobi_A00 = get_option(fs0+"_pc_type") == "jacobi" # Get preconditioning options for the Schur complement self._check_options([(fs1+"ksp_type", {"preonly", "default"}), (fs1+"pc_type", {"jacobi", "python"})]) self.preonly_S = get_option(fs1+"_ksp_type") == "preonly" self.jacobi_S = get_option(fs1+"_pc_type") == "jacobi" # Get user supplied operator and its options self.schur_approx = (get_option(fs1+"_pc_python_type") if get_option(fs1+"_pc_type") == "python" else False) self._check_options([(fs1+"aux_ksp_type", {"preonly", "default"}), (fs1+"aux_pc_type", {"jacobi"})]) self.preonly_Shat = get_option(fs1+"_aux_ksp_type") == "preonly" self.jacobi_Shat = get_option(fs1+"_aux_pc_type") == "jacobi" if self.jacobi_Shat or self.jacobi_A00: assert parameters["slate_compiler"]["optimise"], ("Local systems should only get preconditioned with " "a preconditioning matrix if the Slate optimiser replaces " "inverses by solves.")
[docs] def build_inner_S(self): """Build the inner Schur complement.""" _, A01, A10, A11 = self.list_split_mixed_ops return A11 - A10 * self.A00_inv_hat * A01
[docs] def inv(self, A, P, prec, preonly=False): """ Calculates the inverse of an operator A. The inverse is potentially approximated through a solve which is potentially preconditioned with the preconditioner P if prec is True. The inverse of A may be just approximated with the inverse of P if prec and replace. """ return (P if prec and preonly else (P*A).inv * P if prec else A.inv)
[docs] def build_inner_S_inv(self): """ Calculates the inverse of the schur complement. The inverse is potentially approximated through a solve which is potentially preconditioned with the preconditioner P. """ A = self.inner_S P = self.inner_S_approx_inv_hat prec = bool(self.schur_approx) or self.jacobi_S return self.inv(A, P, prec, self.preonly_S)
[docs] def build_Sapprox_inv(self): """ Calculates the inverse of preconditioner to the Schur complement, which can be either the schur complement approximation provided by the user or jacobi. The inverse is potentially approximated through a solve which is potentially preconditioned with jacobi. """ prec = (bool(self.schur_approx) and self.jacobi_Shat) or self.jacobi_S A = self.schur_approx if self.schur_approx else self.inner_S P = DiagonalTensor(A).inv preonly = self.preonly_Shat if self.schur_approx else True return self.inv(A, P, prec, preonly)
[docs] def build_A00_inv(self): """ Calculates the inverse of :math:`A_{00}`, the (0,0)-block of the mixed matrix Atilde. The inverse is potentially approximated through a solve which is potentially preconditioned with jacobi. """ A, _, _, _ = self.list_split_mixed_ops P = DiagonalTensor(A).inv return self.inv(A, P, self.jacobi_A00, self.preonly_A00)
[docs] def retrieve_user_S_approx(self, pc, usercode): """Retrieve a user-defined :class:firedrake.preconditioners.AuxiliaryOperator from the PETSc Options, which is an approximation to the Schur complement and its inverse is used to precondition the local solve in the reconstruction calls (e.g.). """ _, _, _, A11 = self.list_split_mixed_ops test, trial = A11.arguments() if usercode != "": (modname, funname) = usercode.rsplit('.', 1) mod = __import__(modname) fun = getattr(mod, funname) if isinstance(fun, type): fun = fun() return Tensor(fun.form(pc, test, trial)[0]) else: return None
[docs] def build_schur(self, rhs, non_zero_saddle_rhs=None): """The Schur complement in the operators of the trace solve contains the inverse on a mixed system. Users may want this inverse to be treated with another Schur complement. Let the mixed matrix Atilde be called A here. Then, if a nested schur complement is requested, the inverse of Atilde is rewritten with help of a a Schur decomposition as follows. .. code-block:: text A.inv=[[I, -A00.inv * A01] * [[A00.inv, 0 ] * [[I, 0] [0, I ]] [0, S.inv]] [-A10* A00.inv, I]] -------------------- ----------------- ------------------ block1 block2 block3 with the (inner) schur complement S = A11 - A10 * A00.inv * A01 """ if self.nested: _, A01, A10, _ = self.list_split_mixed_ops K0, K1 = self.list_split_trace_ops KT0, KT1 = self.list_split_trace_ops_transpose R = [rhs.blocks[self.vidx], rhs.blocks[self.pidx]] # K * block1 K_Ainv_block1 = [K0, -K0 * self.A00_inv_hat * A01 + K1] # K * block1 * block2 K_Ainv_block2 = [K_Ainv_block1[0] * self.A00_inv_hat, K_Ainv_block1[1] * self.inner_S_inv_hat] # K * block1 * block2 * block3 K_Ainv_block3 = [K_Ainv_block2[0] - K_Ainv_block2[1] * A10 * self.A00_inv_hat, K_Ainv_block2[1]] # K * block1 * block2 * block3 * broken residual schur_rhs = (K_Ainv_block3[0] * R[0] + K_Ainv_block3[1] * R[1]) # K * block1 * block2 * block3 * K.T schur_comp = K_Ainv_block3[0] * KT0 + K_Ainv_block3[1] * KT1 else: P = DiagonalTensor(self.Atilde).inv Atildeinv = self.inv(self.Atilde, P, self.jacobi_A00, self.preonly_A00) schur_rhs = self.K * Atildeinv * rhs schur_comp = self.K * Atildeinv * self.KT if self.non_zero_saddle_mat or non_zero_saddle_rhs: assert self.non_zero_saddle_mat and non_zero_saddle_rhs, "The problem is not a saddle point system and you missed to pass either A11 or the corresponding part in the rhs." # problem is not a saddle point problem schur_rhs = non_zero_saddle_rhs - schur_rhs schur_comp = self.non_zero_saddle_mat - schur_comp return schur_rhs, schur_comp