Source code for gusto.solvers.linear_solvers

"""
This module provides abstract linear solver objects.

The linear solvers provided here are used for solving linear problems on mixed
finite element spaces.
"""

from firedrake import (
    split, LinearVariationalProblem, Constant, LinearVariationalSolver,
    TestFunctions, TrialFunctions, TestFunction, TrialFunction, lhs,
    rhs, FacetNormal, div, dx, jump, avg, dS, dS_v, dS_h, ds_v, ds_t, ds_b,
    ds_tb, inner, action, dot, grad, Function, VectorSpaceBasis, cross,
    BrokenElement, FunctionSpace, MixedFunctionSpace, DirichletBC, as_vector,
    assemble, conditional
)
from firedrake.fml import Term, drop
from firedrake.petsc import flatten_parameters
from firedrake.__future__ import interpolate
from pyop2.profiling import timed_function, timed_region

from gusto.equations.active_tracers import TracerVariableType
from gusto.core.logging import (
    logger, DEBUG, logging_ksp_monitor_true_residual,
    attach_custom_monitor
)
from gusto.core.labels import linearisation, time_derivative, hydrostatic
from gusto.equations import thermodynamics
from gusto.recovery.recovery_kernels import AverageWeightings, AverageKernel
from abc import ABCMeta, abstractmethod, abstractproperty


__all__ = ["BoussinesqSolver", "LinearTimesteppingSolver", "CompressibleSolver",
           "ThermalSWSolver", "MoistConvectiveSWSolver"]


class TimesteppingSolver(object, metaclass=ABCMeta):
    """Base class for timestepping linear solvers for Gusto."""

    def __init__(self, equations, alpha=0.5, tau_values=None,
                 solver_parameters=None, overwrite_solver_parameters=False):
        """
        Args:
            equations (:class:`PrognosticEquation`): the model's equation.
            alpha (float, optional): the semi-implicit off-centring factor.
                Defaults to 0.5. A value of 1 is fully-implicit.
            tau_values (dict, optional): contains the semi-implicit relaxation
                parameters. Defaults to None, in which case the value of alpha is used.
            solver_parameters (dict, optional): contains the options to be
                passed to the underlying :class:`LinearVariationalSolver`.
                Defaults to None.
            overwrite_solver_parameters (bool, optional): if True use only the
                `solver_parameters` that have been passed in. If False then
                update the default parameters with the `solver_parameters`
                passed in. Defaults to False.
        """
        self.equations = equations
        self.dt = equations.domain.dt
        self.alpha = Constant(alpha)
        self.tau_values = tau_values if tau_values is not None else {}

        if solver_parameters is not None:
            if not overwrite_solver_parameters:
                p = flatten_parameters(self.solver_parameters)
                p.update(flatten_parameters(solver_parameters))
                solver_parameters = p
            self.solver_parameters = solver_parameters

        if logger.isEnabledFor(DEBUG):
            self.solver_parameters["ksp_monitor_true_residual"] = None

        # setup the solver
        self._setup_solver()

    @staticmethod
    def log_ksp_residuals(ksp):
        if logger.isEnabledFor(DEBUG):
            ksp.setMonitor(logging_ksp_monitor_true_residual)

    @abstractproperty
    def solver_parameters(self):
        """Solver parameters for this solver"""
        pass

    @abstractmethod
    def _setup_solver(self):
        pass

    @abstractmethod
    def update_reference_profiles(self):
        """
        Update the solver when the reference profiles have changed.

        This typically includes forcing any Jacobians that depend on
        the reference profiles to be reassembled, and recalculating
        any values derived from the reference values.
        """
        pass

    @abstractmethod
    def solve(self):
        pass


[docs] class CompressibleSolver(TimesteppingSolver): """ Timestepping linear solver object for the compressible Euler equations. This solves a linear problem for the compressible Euler equations in theta-exner formulation with prognostic variables u (velocity), rho (density) and theta (potential temperature). It follows the following strategy: (1) Analytically eliminate theta (introduces error near topography) (2a) Formulate the resulting mixed system for u and rho using a hybridized mixed method. This breaks continuity in the linear perturbations of u, and introduces a new unknown on the mesh interfaces approximating the average of the Exner pressure perturbations. These trace unknowns also act as Lagrange multipliers enforcing normal continuity of the "broken" u variable. (2b) Statically condense the block-sparse system into a single system for the Lagrange multipliers. This is the only globally coupled system requiring a linear solver. (2c) Using the computed trace variables, we locally recover the broken velocity and density perturbations. This is accomplished in two stages: (i): Recover rho locally using the multipliers. (ii): Recover "broken" u locally using rho and the multipliers. (2d) Project the "broken" velocity field into the HDiv-conforming space using local averaging. (3) Reconstruct theta """ solver_parameters = {'mat_type': 'matfree', 'ksp_type': 'preonly', 'pc_type': 'python', 'pc_python_type': 'firedrake.SCPC', 'pc_sc_eliminate_fields': '0, 1', # The reduced operator is not symmetric 'condensed_field': {'ksp_type': 'fgmres', 'ksp_rtol': 1.0e-8, 'ksp_atol': 1.0e-8, 'ksp_max_it': 100, 'pc_type': 'gamg', 'pc_gamg_sym_graph': None, 'mg_levels': {'ksp_type': 'gmres', 'ksp_max_it': 5, 'pc_type': 'bjacobi', 'sub_pc_type': 'ilu'}}} def __init__(self, equations, alpha=0.5, tau_values=None, solver_parameters=None, overwrite_solver_parameters=False): """ Args: equations (:class:`PrognosticEquation`): the model's equation. alpha (float, optional): the semi-implicit off-centring factor. Defaults to 0.5. A value of 1 is fully-implicit. tau_values (dict, optional): contains the semi-implicit relaxation parameters. Defaults to None, in which case the value of alpha is used. solver_parameters (dict, optional): contains the options to be passed to the underlying :class:`LinearVariationalSolver`. Defaults to None. overwrite_solver_parameters (bool, optional): if True use only the `solver_parameters` that have been passed in. If False then update the default parameters with the `solver_parameters` passed in. Defaults to False. """ self.equations = equations self.quadrature_degree = equations.domain.max_quad_degree super().__init__(equations, alpha, tau_values, solver_parameters, overwrite_solver_parameters) @timed_function("Gusto:SolverSetup") def _setup_solver(self): equations = self.equations cp = equations.parameters.cp dt = self.dt # Set relaxation parameters. If an alternative has not been given, set # to semi-implicit off-centering factor beta_u = dt*self.tau_values.get("u", self.alpha) beta_t = dt*self.tau_values.get("theta", self.alpha) beta_r = dt*self.tau_values.get("rho", self.alpha) Vu = equations.domain.spaces("HDiv") Vu_broken = FunctionSpace(equations.domain.mesh, BrokenElement(Vu.ufl_element())) Vtheta = equations.domain.spaces("theta") Vrho = equations.domain.spaces("DG") h_deg = Vrho.ufl_element().degree()[0] v_deg = Vrho.ufl_element().degree()[1] Vtrace = FunctionSpace(equations.domain.mesh, "HDiv Trace", degree=(h_deg, v_deg)) # Split up the rhs vector (symbolically) self.xrhs = Function(self.equations.function_space) u_in, rho_in, theta_in = split(self.xrhs)[0:3] # Build the function space for "broken" u, rho, and pressure trace M = MixedFunctionSpace((Vu_broken, Vrho, Vtrace)) w, phi, dl = TestFunctions(M) u, rho, l0 = TrialFunctions(M) n = FacetNormal(equations.domain.mesh) # Get background fields _, rhobar, thetabar = split(equations.X_ref)[0:3] exnerbar = thermodynamics.exner_pressure(equations.parameters, rhobar, thetabar) exnerbar_rho = thermodynamics.dexner_drho(equations.parameters, rhobar, thetabar) exnerbar_theta = thermodynamics.dexner_dtheta(equations.parameters, rhobar, thetabar) # Analytical (approximate) elimination of theta k = equations.domain.k # Upward pointing unit vector theta = -dot(k, u)*dot(k, grad(thetabar))*beta_t + theta_in # Only include theta' (rather than exner') in the vertical # component of the gradient # The exner prime term (here, bars are for mean and no bars are # for linear perturbations) exner = exnerbar_theta*theta + exnerbar_rho*rho # Vertical projection def V(u): return k*inner(u, k) # hydrostatic projection h_project = lambda u: u - k*inner(u, k) # Specify degree for some terms as estimated degree is too large dx_qp = dx(degree=(equations.domain.max_quad_degree)) dS_v_qp = dS_v(degree=(equations.domain.max_quad_degree)) dS_h_qp = dS_h(degree=(equations.domain.max_quad_degree)) ds_v_qp = ds_v(degree=(equations.domain.max_quad_degree)) ds_tb_qp = (ds_t(degree=(equations.domain.max_quad_degree)) + ds_b(degree=(equations.domain.max_quad_degree))) # Add effect of density of water upon theta, using moisture reference profiles # TODO: Explore if this is the right thing to do for the linear problem if equations.active_tracers is not None: mr_t = Constant(0.0)*thetabar for tracer in equations.active_tracers: if tracer.chemical == 'H2O': if tracer.variable_type == TracerVariableType.mixing_ratio: idx = equations.field_names.index(tracer.name) mr_bar = split(equations.X_ref)[idx] mr_t += mr_bar else: raise NotImplementedError('Only mixing ratio tracers are implemented') theta_w = theta / (1 + mr_t) thetabar_w = thetabar / (1 + mr_t) else: theta_w = theta thetabar_w = thetabar _l0 = TrialFunction(Vtrace) _dl = TestFunction(Vtrace) a_tr = _dl('+')*_l0('+')*(dS_v_qp + dS_h_qp) + _dl*_l0*ds_v_qp + _dl*_l0*ds_tb_qp def L_tr(f): return _dl('+')*avg(f)*(dS_v_qp + dS_h_qp) + _dl*f*ds_v_qp + _dl*f*ds_tb_qp cg_ilu_parameters = {'ksp_type': 'cg', 'pc_type': 'bjacobi', 'sub_pc_type': 'ilu'} # Project field averages into functions on the trace space rhobar_avg = Function(Vtrace) exnerbar_avg = Function(Vtrace) rho_avg_prb = LinearVariationalProblem(a_tr, L_tr(rhobar), rhobar_avg, constant_jacobian=True) exner_avg_prb = LinearVariationalProblem(a_tr, L_tr(exnerbar), exnerbar_avg, constant_jacobian=True) self.rho_avg_solver = LinearVariationalSolver(rho_avg_prb, solver_parameters=cg_ilu_parameters, options_prefix='rhobar_avg_solver') self.exner_avg_solver = LinearVariationalSolver(exner_avg_prb, solver_parameters=cg_ilu_parameters, options_prefix='exnerbar_avg_solver') # "broken" u, rho, and trace system # NOTE: no ds_v integrals since equations are defined on # a periodic (or sphere) base mesh. if any([t.has_label(hydrostatic) for t in self.equations.residual]): u_mass = inner(w, (h_project(u) - u_in))*dx else: u_mass = inner(w, (u - u_in))*dx eqn = ( # momentum equation u_mass - beta_u*cp*div(theta_w*V(w))*exnerbar*dx_qp # following does nothing but is preserved in the comments # to remind us why (because V(w) is purely vertical). # + beta*cp*jump(theta_w*V(w), n=n)*exnerbar_avg('+')*dS_v_qp + beta_u*cp*jump(theta_w*V(w), n=n)*exnerbar_avg('+')*dS_h_qp + beta_u*cp*dot(theta_w*V(w), n)*exnerbar_avg*ds_tb_qp - beta_u*cp*div(thetabar_w*w)*exner*dx_qp # trace terms appearing after integrating momentum equation + beta_u*cp*jump(thetabar_w*w, n=n)*l0('+')*(dS_v_qp + dS_h_qp) + beta_u*cp*dot(thetabar_w*w, n)*l0*(ds_tb_qp + ds_v_qp) # mass continuity equation + (phi*(rho - rho_in) - beta_r*inner(grad(phi), u)*rhobar)*dx + beta_r*jump(phi*u, n=n)*rhobar_avg('+')*(dS_v + dS_h) # term added because u.n=0 is enforced weakly via the traces + beta_r*phi*dot(u, n)*rhobar_avg*(ds_tb + ds_v) # constraint equation to enforce continuity of the velocity # through the interior facets and weakly impose the no-slip # condition + dl('+')*jump(u, n=n)*(dS_v + dS_h) + dl*dot(u, n)*(ds_t + ds_b + ds_v) ) # TODO: can we get this term using FML? # contribution of the sponge term if hasattr(self.equations, "mu"): eqn += dt*self.equations.mu*inner(w, k)*inner(u, k)*dx_qp if equations.parameters.Omega is not None: Omega = as_vector([0, 0, equations.parameters.Omega]) eqn += inner(w, cross(2*Omega, u))*dx aeqn = lhs(eqn) Leqn = rhs(eqn) # Function for the hybridized solutions self.urhol0 = Function(M) hybridized_prb = LinearVariationalProblem(aeqn, Leqn, self.urhol0, constant_jacobian=True) hybridized_solver = LinearVariationalSolver(hybridized_prb, solver_parameters=self.solver_parameters, options_prefix='ImplicitSolver') self.hybridized_solver = hybridized_solver # Project broken u into the HDiv space using facet averaging. # Weight function counting the dofs of the HDiv element: self._weight = Function(Vu) weight_kernel = AverageWeightings(Vu) weight_kernel.apply(self._weight) # Averaging kernel self._average_kernel = AverageKernel(Vu) # HDiv-conforming velocity self.u_hdiv = Function(Vu) # Reconstruction of theta theta = TrialFunction(Vtheta) gamma = TestFunction(Vtheta) self.theta = Function(Vtheta) theta_eqn = gamma*(theta - theta_in + dot(k, self.u_hdiv)*dot(k, grad(thetabar))*beta_t)*dx theta_problem = LinearVariationalProblem(lhs(theta_eqn), rhs(theta_eqn), self.theta, constant_jacobian=True) self.theta_solver = LinearVariationalSolver(theta_problem, solver_parameters=cg_ilu_parameters, options_prefix='thetabacksubstitution') # Store boundary conditions for the div-conforming velocity to apply # post-solve self.bcs = self.equations.bcs['u'] # Log residuals on hybridized solver self.log_ksp_residuals(self.hybridized_solver.snes.ksp) # Log residuals on the trace system too python_context = self.hybridized_solver.snes.ksp.pc.getPythonContext() attach_custom_monitor(python_context, logging_ksp_monitor_true_residual)
[docs] @timed_function("Gusto:UpdateReferenceProfiles") def update_reference_profiles(self): with timed_region("Gusto:HybridProjectRhobar"): logger.info('Compressible linear solver: rho average solve') self.rho_avg_solver.solve() with timed_region("Gusto:HybridProjectExnerbar"): logger.info('Compressible linear solver: Exner average solve') self.exner_avg_solver.solve() # Because the left hand side of the hybridised problem depends # on the reference profile, the Jacobian matrix should change # when the reference profiles are updated. This call will tell # the hybridized_solver to reassemble the Jacobian next time # `solve` is called. self.hybridized_solver.invalidate_jacobian()
[docs] @timed_function("Gusto:LinearSolve") def solve(self, xrhs, dy): """ Solve the linear problem. Args: xrhs (:class:`Function`): the right-hand side field in the appropriate :class:`MixedFunctionSpace`. dy (:class:`Function`): the resulting field in the appropriate :class:`MixedFunctionSpace`. """ self.xrhs.assign(xrhs) # Solve the hybridized system logger.info('Compressible linear solver: hybridized solve') self.hybridized_solver.solve() broken_u, rho1, _ = self.urhol0.subfunctions u1 = self.u_hdiv # Project broken_u into the HDiv space u1.assign(0.0) with timed_region("Gusto:HybridProjectHDiv"): logger.info('Compressible linear solver: restore continuity') self._average_kernel.apply(u1, self._weight, broken_u) # Reapply bcs to ensure they are satisfied for bc in self.bcs: bc.apply(u1) # Copy back into u and rho cpts of dy u, rho, theta = dy.subfunctions[0:3] u.assign(u1) rho.assign(rho1) # Reconstruct theta with timed_region("Gusto:ThetaRecon"): logger.info('Compressible linear solver: theta solve') self.theta_solver.solve() # Copy into theta cpt of dy theta.assign(self.theta)
[docs] class BoussinesqSolver(TimesteppingSolver): """ Linear solver object for the Boussinesq equations. This solves a linear problem for the Boussinesq equations with prognostic variables u (velocity), p (pressure) and b (buoyancy). It follows the following strategy: This solver follows the following strategy: (1) Analytically eliminate b (introduces error near topography) (2) Solve resulting system for (u,p) using a hybrid-mixed method (3) Reconstruct b """ solver_parameters = { 'ksp_type': 'preonly', 'mat_type': 'matfree', 'pc_type': 'python', 'pc_python_type': 'firedrake.HybridizationPC', 'hybridization': {'ksp_type': 'cg', 'pc_type': 'gamg', 'ksp_rtol': 1e-8, 'mg_levels': {'ksp_type': 'chebyshev', 'ksp_max_it': 2, 'pc_type': 'bjacobi', 'sub_pc_type': 'ilu'}} } @timed_function("Gusto:SolverSetup") def _setup_solver(self): equation = self.equations # just cutting down line length a bit dt = self.dt # Set relaxation parameters. If an alternative has not been given, set # to semi-implicit off-centering factor beta_u = dt*self.tau_values.get("u", self.alpha) beta_p = dt*self.tau_values.get("p", self.alpha) beta_b = dt*self.tau_values.get("b", self.alpha) Vu = equation.domain.spaces("HDiv") Vb = equation.domain.spaces("theta") Vp = equation.domain.spaces("DG") # Split up the rhs vector (symbolically) self.xrhs = Function(self.equations.function_space) u_in, p_in, b_in = split(self.xrhs) # Build the reduced function space for u,p M = MixedFunctionSpace((Vu, Vp)) w, phi = TestFunctions(M) u, p = TrialFunctions(M) # Get background fields bbar = split(equation.X_ref)[2] # Analytical (approximate) elimination of theta k = equation.domain.k # Upward pointing unit vector b = -dot(k, u)*dot(k, grad(bbar))*beta_b + b_in # vertical projection def V(u): return k*inner(u, k) eqn = ( inner(w, (u - u_in))*dx - beta_u*div(w)*p*dx - beta_u*inner(w, k)*b*dx ) if equation.compressible: cs = equation.parameters.cs eqn += phi * (p - p_in) * dx + beta_p * phi * cs**2 * div(u) * dx else: eqn += phi * div(u) * dx if hasattr(self.equations, "mu"): eqn += dt*self.equations.mu*inner(w, k)*inner(u, k)*dx if equation.parameters.Omega is not None: Omega = as_vector((0, 0, equation.parameter.Omega)) eqn += inner(w, cross(2*Omega, u))*dx aeqn = lhs(eqn) Leqn = rhs(eqn) # Place to put result of u p solver self.up = Function(M) # Boundary conditions (assumes extruded mesh) # BCs are declared for the plain velocity space. As we need them in # a mixed problem, we replicate the BCs but for subspace of M bcs = [DirichletBC(M.sub(0), bc.function_arg, bc.sub_domain) for bc in self.equations.bcs['u']] # Solver for u, p up_problem = LinearVariationalProblem(aeqn, Leqn, self.up, bcs=bcs, constant_jacobian=True) # Provide callback for the nullspace of the trace system def trace_nullsp(T): return VectorSpaceBasis(constant=True) appctx = {"trace_nullspace": trace_nullsp} self.up_solver = LinearVariationalSolver(up_problem, solver_parameters=self.solver_parameters, appctx=appctx) # Reconstruction of b b = TrialFunction(Vb) gamma = TestFunction(Vb) u, p = self.up.subfunctions self.b = Function(Vb) b_eqn = gamma*(b - b_in + dot(k, u)*dot(k, grad(bbar))*beta_b)*dx b_problem = LinearVariationalProblem(lhs(b_eqn), rhs(b_eqn), self.b, constant_jacobian=True) self.b_solver = LinearVariationalSolver(b_problem) # Log residuals on hybridized solver self.log_ksp_residuals(self.up_solver.snes.ksp)
[docs] @timed_function("Gusto:UpdateReferenceProfiles") def update_reference_profiles(self): self.up_solver.invalidate_jacobian()
[docs] @timed_function("Gusto:LinearSolve") def solve(self, xrhs, dy): """ Solve the linear problem. Args: xrhs (:class:`Function`): the right-hand side field in the appropriate :class:`MixedFunctionSpace`. dy (:class:`Function`): the resulting field in the appropriate :class:`MixedFunctionSpace`. """ self.xrhs.assign(xrhs) with timed_region("Gusto:VelocityPressureSolve"): logger.info('Boussinesq linear solver: mixed solve') self.up_solver.solve() u1, p1 = self.up.subfunctions u, p, b = dy.subfunctions u.assign(u1) p.assign(p1) with timed_region("Gusto:BuoyancyRecon"): logger.info('Boussinesq linear solver: buoyancy reconstruction') self.b_solver.solve() b.assign(self.b)
[docs] class ThermalSWSolver(TimesteppingSolver): """Linear solver object for the thermal shallow water equations. This solves a linear problem for the thermal shallow water equations with prognostic variables u (velocity), D (depth) and either b (buoyancy) or b_e (equivalent buoyancy). It follows the following strategy: (1) Eliminate b (2) Solve the resulting system for (u, D) using a hybrid-mixed method (3) Reconstruct b """ solver_parameters = { 'ksp_type': 'preonly', 'mat_type': 'matfree', 'pc_type': 'python', 'pc_python_type': 'firedrake.HybridizationPC', 'hybridization': {'ksp_type': 'cg', 'pc_type': 'gamg', 'ksp_rtol': 1e-8, 'mg_levels': {'ksp_type': 'chebyshev', 'ksp_max_it': 2, 'pc_type': 'bjacobi', 'sub_pc_type': 'ilu'}} } @timed_function("Gusto:SolverSetup") def _setup_solver(self): equation = self.equations # just cutting down line length a bit equivalent_buoyancy = equation.equivalent_buoyancy dt = self.dt beta_u = dt*self.tau_values.get("u", self.alpha) beta_d = dt*self.tau_values.get("D", self.alpha) beta_b = dt*self.tau_values.get("b", self.alpha) Vu = equation.domain.spaces("HDiv") VD = equation.domain.spaces("DG") Vb = equation.domain.spaces("DG") # Check that the third field is buoyancy if not equation.field_names[2] == 'b' and not (equation.field_names[2] == 'b_e' and equivalent_buoyancy): raise NotImplementedError("Field 'b' or 'b_e' must exist to use the thermal linear solver in the SIQN scheme") # Split up the rhs vector self.xrhs = Function(equation.function_space) u_in, D_in, b_in = split(self.xrhs)[0:3] # Build the reduced function space for u, D M = MixedFunctionSpace((Vu, VD)) w, phi = TestFunctions(M) u, D = TrialFunctions(M) # Get background buoyancy and depth Dbar, bbar = split(equation.X_ref)[1:3] # Approximate elimination of b b = -dot(u, grad(bbar))*beta_b + b_in if equivalent_buoyancy: # compute q_v using q_sat to partition q_t into q_v and q_c self.q_sat_func = Function(VD) self.qvbar = Function(VD) qtbar = split(equation.X_ref)[3] # set up interpolators that use the X_ref values for D and b_e self.q_sat_expr_interpolate = interpolate( equation.compute_saturation(equation.X_ref), VD) self.q_v_interpolate = interpolate( conditional(qtbar < self.q_sat_func, qtbar, self.q_sat_func), VD) # bbar was be_bar and here we correct to become bbar bbar += equation.parameters.beta2 * self.qvbar n = FacetNormal(equation.domain.mesh) eqn = ( inner(w, (u - u_in)) * dx - beta_u * (D - Dbar) * div(w*bbar) * dx + beta_u * jump(w*bbar, n) * avg(D-Dbar) * dS - beta_u * 0.5 * Dbar * bbar * div(w) * dx - beta_u * 0.5 * Dbar * b * div(w) * dx - beta_u * 0.5 * bbar * div(w*(D-Dbar)) * dx + beta_u * 0.5 * jump((D-Dbar)*w, n) * avg(bbar) * dS + inner(phi, (D - D_in)) * dx + beta_d * phi * div(Dbar*u) * dx ) if 'coriolis' in equation.prescribed_fields._field_names: f = equation.prescribed_fields('coriolis') eqn += beta_u * f * inner(w, equation.domain.perp(u)) * dx aeqn = lhs(eqn) Leqn = rhs(eqn) # Place to put results of (u,D) solver self.uD = Function(M) # Boundary conditions bcs = [DirichletBC(M.sub(0), bc.function_arg, bc.sub_domain) for bc in self.equations.bcs['u']] # Solver for u, D uD_problem = LinearVariationalProblem(aeqn, Leqn, self.uD, bcs=bcs, constant_jacobian=True) # Provide callback for the nullspace of the trace system def trace_nullsp(T): return VectorSpaceBasis(constant=True) appctx = {"trace_nullspace": trace_nullsp} self.uD_solver = LinearVariationalSolver(uD_problem, solver_parameters=self.solver_parameters, appctx=appctx) # Reconstruction of b b = TrialFunction(Vb) gamma = TestFunction(Vb) u, D = self.uD.subfunctions self.b = Function(Vb) b_eqn = gamma*(b - b_in + inner(u, grad(bbar))*beta_b) * dx b_problem = LinearVariationalProblem(lhs(b_eqn), rhs(b_eqn), self.b, constant_jacobian=True) self.b_solver = LinearVariationalSolver(b_problem) # Log residuals on hybridized solver self.log_ksp_residuals(self.uD_solver.snes.ksp)
[docs] @timed_function("Gusto:UpdateReferenceProfiles") def update_reference_profiles(self): if self.equations.equivalent_buoyancy: self.q_sat_func.assign(assemble(self.q_sat_expr_interpolate)) self.qvbar.assign(assemble(self.q_v_interpolate))
[docs] @timed_function("Gusto:LinearSolve") def solve(self, xrhs, dy): """ Solve the linear problem. Args: xrhs (:class:`Function`): the right-hand side field in the appropriate :class:`MixedFunctionSpace`. dy (:class:`Function`): the resulting field in the appropriate :class:`MixedFunctionSpace`. """ self.xrhs.assign(xrhs) # Check that the b reference profile has been set bbar = split(self.equations.X_ref)[2] b = dy.subfunctions[2] bbar_func = Function(b.function_space()).interpolate(bbar) if bbar_func.dat.data.max() == 0 and bbar_func.dat.data.min() == 0: logger.warning("The reference profile for b in the linear solver is zero. To set a non-zero profile add b to the set_reference_profiles argument.") with timed_region("Gusto:VelocityDepthSolve"): logger.info('Thermal linear solver: mixed solve') self.uD_solver.solve() u1, D1 = self.uD.subfunctions u = dy.subfunctions[0] D = dy.subfunctions[1] b = dy.subfunctions[2] u.assign(u1) D.assign(D1) with timed_region("Gusto:BuoyancyRecon"): logger.info('Thermal linear solver: buoyancy reconstruction') self.b_solver.solve() b.assign(self.b)
[docs] class LinearTimesteppingSolver(object): """ A general object for solving mixed finite element linear problems. This linear solver object is general and is designed for use with different equation sets, including with the non-linear shallow-water equations. It forms the linear problem from the equations using FML. The linear system is solved using a hybridised-mixed method. """ solver_parameters = { 'ksp_type': 'preonly', 'mat_type': 'matfree', 'pc_type': 'python', 'pc_python_type': 'firedrake.HybridizationPC', 'hybridization': {'ksp_type': 'cg', 'pc_type': 'gamg', 'ksp_rtol': 1e-8, 'mg_levels': {'ksp_type': 'chebyshev', 'ksp_max_it': 2, 'pc_type': 'bjacobi', 'sub_pc_type': 'ilu'}} } def __init__(self, equation, alpha, reference_dependent=True): """ Args: equation (:class:`PrognosticEquation`): the model's equation object. alpha (float): the semi-implicit off-centring factor. A value of 1 is fully-implicit. reference_dependent: this indicates that the solver Jacobian should be rebuilt if the reference profiles have been updated. """ self.reference_dependent = reference_dependent residual = equation.residual.label_map( lambda t: t.has_label(linearisation), lambda t: Term(t.get(linearisation).form, t.labels), drop) self.alpha = Constant(alpha) dt = equation.domain.dt W = equation.function_space beta = dt*self.alpha # Split up the rhs vector (symbolically) self.xrhs = Function(W) aeqn = residual.label_map( lambda t: (t.has_label(time_derivative) and t.has_label(linearisation)), map_if_false=lambda t: beta*t) Leqn = residual.label_map( lambda t: (t.has_label(time_derivative) and t.has_label(linearisation)), map_if_false=drop) # Place to put result of solver self.dy = Function(W) # Solver bcs = [DirichletBC(W.sub(0), bc.function_arg, bc.sub_domain) for bc in equation.bcs['u']] problem = LinearVariationalProblem(aeqn.form, action(Leqn.form, self.xrhs), self.dy, bcs=bcs, constant_jacobian=True) self.solver = LinearVariationalSolver(problem, solver_parameters=self.solver_parameters, options_prefix='linear_solver')
[docs] @timed_function("Gusto:UpdateReferenceProfiles") def update_reference_profiles(self): if self.reference_dependent: self.solver.invalidate_jacobian()
[docs] @timed_function("Gusto:LinearSolve") def solve(self, xrhs, dy): """ Solve the linear problem. Args: xrhs (:class:`Function`): the right-hand side field in the appropriate :class:`MixedFunctionSpace`. dy (:class:`Function`): the resulting field in the appropriate :class:`MixedFunctionSpace`. """ self.xrhs.assign(xrhs) self.solver.solve() dy.assign(self.dy)
[docs] class MoistConvectiveSWSolver(TimesteppingSolver): """ Linear solver for the moist convective shallow water equations. This solves a linear problem for the shallow water equations with prognostic variables u (velocity) and D (depth). The linear system is solved using a hybridised-mixed method. """ solver_parameters = { 'ksp_type': 'preonly', 'mat_type': 'matfree', 'pc_type': 'python', 'pc_python_type': 'firedrake.HybridizationPC', 'hybridization': {'ksp_type': 'cg', 'pc_type': 'gamg', 'ksp_rtol': 1e-8, 'mg_levels': {'ksp_type': 'chebyshev', 'ksp_max_it': 2, 'pc_type': 'bjacobi', 'sub_pc_type': 'ilu'}} } @timed_function("Gusto:SolverSetup") def _setup_solver(self): equation = self.equations # just cutting down line length a bit dt = self.dt beta_u = dt*self.tau_values.get("u", self.alpha) beta_d = dt*self.tau_values.get("D", self.alpha) Vu = equation.domain.spaces("HDiv") VD = equation.domain.spaces("DG") # Split up the rhs vector self.xrhs = Function(self.equations.function_space) u_in = split(self.xrhs)[0] D_in = split(self.xrhs)[1] # Build the reduced function space for u, D M = MixedFunctionSpace((Vu, VD)) w, phi = TestFunctions(M) u, D = TrialFunctions(M) # Get background depth Dbar = split(equation.X_ref)[1] g = equation.parameters.g eqn = ( inner(w, (u - u_in)) * dx - beta_u * (D - Dbar) * div(w*g) * dx + inner(phi, (D - D_in)) * dx + beta_d * phi * div(Dbar*u) * dx ) if 'coriolis' in equation.prescribed_fields._field_names: f = equation.prescribed_fields('coriolis') eqn += beta_u * f * inner(w, equation.domain.perp(u)) * dx aeqn = lhs(eqn) Leqn = rhs(eqn) # Place to put results of (u,D) solver self.uD = Function(M) # Boundary conditions bcs = [DirichletBC(M.sub(0), bc.function_arg, bc.sub_domain) for bc in self.equations.bcs['u']] # Solver for u, D uD_problem = LinearVariationalProblem(aeqn, Leqn, self.uD, bcs=bcs, constant_jacobian=True) # Provide callback for the nullspace of the trace system def trace_nullsp(T): return VectorSpaceBasis(constant=True) appctx = {"trace_nullspace": trace_nullsp} self.uD_solver = LinearVariationalSolver(uD_problem, solver_parameters=self.solver_parameters, appctx=appctx) # Log residuals on hybridized solver self.log_ksp_residuals(self.uD_solver.snes.ksp)
[docs] @timed_function("Gusto:UpdateReferenceProfiles") def update_reference_profiles(self): self.uD_solver.invalidate_jacobian()
[docs] @timed_function("Gusto:LinearSolve") def solve(self, xrhs, dy): """ Solve the linear problem. Args: xrhs (:class:`Function`): the right-hand side field in the appropriate :class:`MixedFunctionSpace`. dy (:class:`Function`): the resulting field in the appropriate :class:`MixedFunctionSpace`. """ self.xrhs.assign(xrhs) with timed_region("Gusto:VelocityDepthSolve"): logger.info('Moist convective linear solver: mixed solve') self.uD_solver.solve() u1, D1 = self.uD.subfunctions u = dy.subfunctions[0] D = dy.subfunctions[1] u.assign(u1) D.assign(D1)