Source code for gusto.solvers.linear_solvers

This module provides abstract linear solver objects.

The linear solvers provided here are used for solving linear problems on mixed
finite element spaces.

from firedrake import (
    split, LinearVariationalProblem, Constant, LinearVariationalSolver,
    TestFunctions, TrialFunctions, TestFunction, TrialFunction, lhs,
    rhs, FacetNormal, div, dx, jump, avg, dS, dS_v, dS_h, ds_v, ds_t, ds_b,
    ds_tb, inner, action, dot, grad, Function, VectorSpaceBasis, cross,
    BrokenElement, FunctionSpace, MixedFunctionSpace, DirichletBC, as_vector
from firedrake.fml import Term, drop
from firedrake.petsc import flatten_parameters
from pyop2.profiling import timed_function, timed_region

from gusto.equations.active_tracers import TracerVariableType
from gusto.core.logging import (
    logger, DEBUG, logging_ksp_monitor_true_residual,
from gusto.core.labels import linearisation, time_derivative, hydrostatic
from gusto.equations import thermodynamics
from gusto.recovery.recovery_kernels import AverageWeightings, AverageKernel
from abc import ABCMeta, abstractmethod, abstractproperty

__all__ = ["BoussinesqSolver", "LinearTimesteppingSolver", "CompressibleSolver", "ThermalSWSolver", "MoistConvectiveSWSolver"]

class TimesteppingSolver(object, metaclass=ABCMeta):
    """Base class for timestepping linear solvers for Gusto."""

    def __init__(self, equations, alpha=0.5, tau_values=None,
                 solver_parameters=None, overwrite_solver_parameters=False):
            equations (:class:`PrognosticEquation`): the model's equation.
            alpha (float, optional): the semi-implicit off-centring factor.
                Defaults to 0.5. A value of 1 is fully-implicit.
            tau_values (dict, optional): contains the semi-implicit relaxation
                parameters. Defaults to None, in which case the value of alpha is used.
            solver_parameters (dict, optional): contains the options to be
                passed to the underlying :class:`LinearVariationalSolver`.
                Defaults to None.
            overwrite_solver_parameters (bool, optional): if True use only the
                `solver_parameters` that have been passed in. If False then
                update the default parameters with the `solver_parameters`
                passed in. Defaults to False.
        self.equations = equations
        self.dt = equations.domain.dt
        self.alpha = alpha
        self.tau_values = tau_values if tau_values is not None else {}

        if solver_parameters is not None:
            if not overwrite_solver_parameters:
                p = flatten_parameters(self.solver_parameters)
                solver_parameters = p
            self.solver_parameters = solver_parameters

        if logger.isEnabledFor(DEBUG):
            self.solver_parameters["ksp_monitor_true_residual"] = None

        # setup the solver

    def log_ksp_residuals(ksp):
        if logger.isEnabledFor(DEBUG):

    def solver_parameters(self):
        """Solver parameters for this solver"""

    def _setup_solver(self):

    def solve(self):

[docs] class CompressibleSolver(TimesteppingSolver): """ Timestepping linear solver object for the compressible Euler equations. This solves a linear problem for the compressible Euler equations in theta-exner formulation with prognostic variables u (velocity), rho (density) and theta (potential temperature). It follows the following strategy: (1) Analytically eliminate theta (introduces error near topography) (2a) Formulate the resulting mixed system for u and rho using a hybridized mixed method. This breaks continuity in the linear perturbations of u, and introduces a new unknown on the mesh interfaces approximating the average of the Exner pressure perturbations. These trace unknowns also act as Lagrange multipliers enforcing normal continuity of the "broken" u variable. (2b) Statically condense the block-sparse system into a single system for the Lagrange multipliers. This is the only globally coupled system requiring a linear solver. (2c) Using the computed trace variables, we locally recover the broken velocity and density perturbations. This is accomplished in two stages: (i): Recover rho locally using the multipliers. (ii): Recover "broken" u locally using rho and the multipliers. (2d) Project the "broken" velocity field into the HDiv-conforming space using local averaging. (3) Reconstruct theta """ solver_parameters = {'mat_type': 'matfree', 'ksp_type': 'preonly', 'pc_type': 'python', 'pc_python_type': 'firedrake.SCPC', 'pc_sc_eliminate_fields': '0, 1', # The reduced operator is not symmetric 'condensed_field': {'ksp_type': 'fgmres', 'ksp_rtol': 1.0e-8, 'ksp_atol': 1.0e-8, 'ksp_max_it': 100, 'pc_type': 'gamg', 'pc_gamg_sym_graph': None, 'mg_levels': {'ksp_type': 'gmres', 'ksp_max_it': 5, 'pc_type': 'bjacobi', 'sub_pc_type': 'ilu'}}} def __init__(self, equations, alpha=0.5, tau_values=None, quadrature_degree=None, solver_parameters=None, overwrite_solver_parameters=False): """ Args: equations (:class:`PrognosticEquation`): the model's equation. alpha (float, optional): the semi-implicit off-centring factor. Defaults to 0.5. A value of 1 is fully-implicit. tau_values (dict, optional): contains the semi-implicit relaxation parameters. Defaults to None, in which case the value of alpha is used. quadrature_degree (tuple, optional): a tuple (q_h, q_v) where q_h is the required quadrature degree in the horizontal direction and q_v is that in the vertical direction. Defaults to None. solver_parameters (dict, optional): contains the options to be passed to the underlying :class:`LinearVariationalSolver`. Defaults to None. overwrite_solver_parameters (bool, optional): if True use only the `solver_parameters` that have been passed in. If False then update the default parameters with the `solver_parameters` passed in. Defaults to False. """ self.equations = equations if quadrature_degree is not None: self.quadrature_degree = quadrature_degree else: dgspace = equations.domain.spaces("DG") if any(deg > 2 for deg in dgspace.ufl_element().degree()): logger.warning("default quadrature degree most likely not sufficient for this degree element") self.quadrature_degree = (5, 5) super().__init__(equations, alpha, tau_values, solver_parameters, overwrite_solver_parameters) @timed_function("Gusto:SolverSetup") def _setup_solver(self): equations = self.equations dt = self.dt # Set relaxation parameters. If an alternative has not been given, set # to semi-implicit off-centering factor beta_u_ = dt*self.tau_values.get("u", self.alpha) beta_t_ = dt*self.tau_values.get("theta", self.alpha) beta_r_ = dt*self.tau_values.get("rho", self.alpha) cp = equations.parameters.cp Vu = equations.domain.spaces("HDiv") Vu_broken = FunctionSpace(equations.domain.mesh, BrokenElement(Vu.ufl_element())) Vtheta = equations.domain.spaces("theta") Vrho = equations.domain.spaces("DG") # Store time-stepping coefficients as UFL Constants beta_u = Constant(beta_u_) beta_t = Constant(beta_t_) beta_r = Constant(beta_r_) beta_u_cp = Constant(beta_u * cp) h_deg = Vrho.ufl_element().degree()[0] v_deg = Vrho.ufl_element().degree()[1] Vtrace = FunctionSpace(equations.domain.mesh, "HDiv Trace", degree=(h_deg, v_deg)) # Split up the rhs vector (symbolically) self.xrhs = Function(self.equations.function_space) u_in, rho_in, theta_in = split(self.xrhs)[0:3] # Build the function space for "broken" u, rho, and pressure trace M = MixedFunctionSpace((Vu_broken, Vrho, Vtrace)) w, phi, dl = TestFunctions(M) u, rho, l0 = TrialFunctions(M) n = FacetNormal(equations.domain.mesh) # Get background fields _, rhobar, thetabar = split(equations.X_ref)[0:3] exnerbar = thermodynamics.exner_pressure(equations.parameters, rhobar, thetabar) exnerbar_rho = thermodynamics.dexner_drho(equations.parameters, rhobar, thetabar) exnerbar_theta = thermodynamics.dexner_dtheta(equations.parameters, rhobar, thetabar) # Analytical (approximate) elimination of theta k = equations.domain.k # Upward pointing unit vector theta = -dot(k, u)*dot(k, grad(thetabar))*beta_t + theta_in # Only include theta' (rather than exner') in the vertical # component of the gradient # The exner prime term (here, bars are for mean and no bars are # for linear perturbations) exner = exnerbar_theta*theta + exnerbar_rho*rho # Vertical projection def V(u): return k*inner(u, k) # hydrostatic projection h_project = lambda u: u - k*inner(u, k) # Specify degree for some terms as estimated degree is too large dxp = dx(degree=(self.quadrature_degree)) dS_vp = dS_v(degree=(self.quadrature_degree)) dS_hp = dS_h(degree=(self.quadrature_degree)) ds_vp = ds_v(degree=(self.quadrature_degree)) ds_tbp = (ds_t(degree=(self.quadrature_degree)) + ds_b(degree=(self.quadrature_degree))) # Add effect of density of water upon theta, using moisture reference profiles # TODO: Explore if this is the right thing to do for the linear problem if equations.active_tracers is not None: mr_t = Constant(0.0)*thetabar for tracer in equations.active_tracers: if tracer.chemical == 'H2O': if tracer.variable_type == TracerVariableType.mixing_ratio: idx = equations.field_names.index( mr_bar = split(equations.X_ref)[idx] mr_t += mr_bar else: raise NotImplementedError('Only mixing ratio tracers are implemented') theta_w = theta / (1 + mr_t) thetabar_w = thetabar / (1 + mr_t) else: theta_w = theta thetabar_w = thetabar _l0 = TrialFunction(Vtrace) _dl = TestFunction(Vtrace) a_tr = _dl('+')*_l0('+')*(dS_vp + dS_hp) + _dl*_l0*ds_vp + _dl*_l0*ds_tbp def L_tr(f): return _dl('+')*avg(f)*(dS_vp + dS_hp) + _dl*f*ds_vp + _dl*f*ds_tbp cg_ilu_parameters = {'ksp_type': 'cg', 'pc_type': 'bjacobi', 'sub_pc_type': 'ilu'} # Project field averages into functions on the trace space rhobar_avg = Function(Vtrace) exnerbar_avg = Function(Vtrace) rho_avg_prb = LinearVariationalProblem(a_tr, L_tr(rhobar), rhobar_avg) exner_avg_prb = LinearVariationalProblem(a_tr, L_tr(exnerbar), exnerbar_avg) self.rho_avg_solver = LinearVariationalSolver(rho_avg_prb, solver_parameters=cg_ilu_parameters, options_prefix='rhobar_avg_solver') self.exner_avg_solver = LinearVariationalSolver(exner_avg_prb, solver_parameters=cg_ilu_parameters, options_prefix='exnerbar_avg_solver') # "broken" u, rho, and trace system # NOTE: no ds_v integrals since equations are defined on # a periodic (or sphere) base mesh. if any([t.has_label(hydrostatic) for t in self.equations.residual]): u_mass = inner(w, (h_project(u) - u_in))*dx else: u_mass = inner(w, (u - u_in))*dx eqn = ( # momentum equation u_mass - beta_u_cp*div(theta_w*V(w))*exnerbar*dxp # following does nothing but is preserved in the comments # to remind us why (because V(w) is purely vertical). # + beta_cp*jump(theta_w*V(w), n=n)*exnerbar_avg('+')*dS_vp + beta_u_cp*jump(theta_w*V(w), n=n)*exnerbar_avg('+')*dS_hp + beta_u_cp*dot(theta_w*V(w), n)*exnerbar_avg*ds_tbp - beta_u_cp*div(thetabar_w*w)*exner*dxp # trace terms appearing after integrating momentum equation + beta_u_cp*jump(thetabar_w*w, n=n)*l0('+')*(dS_vp + dS_hp) + beta_u_cp*dot(thetabar_w*w, n)*l0*(ds_tbp + ds_vp) # mass continuity equation + (phi*(rho - rho_in) - beta_r*inner(grad(phi), u)*rhobar)*dx + beta_r*jump(phi*u, n=n)*rhobar_avg('+')*(dS_v + dS_h) # term added because u.n=0 is enforced weakly via the traces + beta_r*phi*dot(u, n)*rhobar_avg*(ds_tb + ds_v) # constraint equation to enforce continuity of the velocity # through the interior facets and weakly impose the no-slip # condition + dl('+')*jump(u, n=n)*(dS_vp + dS_hp) + dl*dot(u, n)*(ds_tbp + ds_vp) ) # TODO: can we get this term using FML? # contribution of the sponge term if hasattr(self.equations, "mu"): eqn += dt**inner(w, k)*inner(u, k)*dx if equations.parameters.Omega is not None: Omega = as_vector([0, 0, equations.parameters.Omega]) eqn += inner(w, cross(2*Omega, u))*dx aeqn = lhs(eqn) Leqn = rhs(eqn) # Function for the hybridized solutions self.urhol0 = Function(M) hybridized_prb = LinearVariationalProblem(aeqn, Leqn, self.urhol0) hybridized_solver = LinearVariationalSolver(hybridized_prb, solver_parameters=self.solver_parameters, options_prefix='ImplicitSolver') self.hybridized_solver = hybridized_solver # Project broken u into the HDiv space using facet averaging. # Weight function counting the dofs of the HDiv element: self._weight = Function(Vu) weight_kernel = AverageWeightings(Vu) weight_kernel.apply(self._weight) # Averaging kernel self._average_kernel = AverageKernel(Vu) # HDiv-conforming velocity self.u_hdiv = Function(Vu) # Reconstruction of theta theta = TrialFunction(Vtheta) gamma = TestFunction(Vtheta) self.theta = Function(Vtheta) theta_eqn = gamma*(theta - theta_in + dot(k, self.u_hdiv)*dot(k, grad(thetabar))*beta_t)*dx theta_problem = LinearVariationalProblem(lhs(theta_eqn), rhs(theta_eqn), self.theta) self.theta_solver = LinearVariationalSolver(theta_problem, solver_parameters=cg_ilu_parameters, options_prefix='thetabacksubstitution') # Store boundary conditions for the div-conforming velocity to apply # post-solve self.bcs = self.equations.bcs['u'] # Log residuals on hybridized solver self.log_ksp_residuals(self.hybridized_solver.snes.ksp) # Log residuals on the trace system too python_context = self.hybridized_solver.snes.ksp.pc.getPythonContext() attach_custom_monitor(python_context, logging_ksp_monitor_true_residual)
[docs] @timed_function("Gusto:LinearSolve") def solve(self, xrhs, dy): """ Solve the linear problem. Args: xrhs (:class:`Function`): the right-hand side field in the appropriate :class:`MixedFunctionSpace`. dy (:class:`Function`): the resulting field in the appropriate :class:`MixedFunctionSpace`. """ self.xrhs.assign(xrhs) # TODO: can we avoid computing these each time the solver is called? with timed_region("Gusto:HybridProjectRhobar"):'Compressible linear solver: rho average solve') self.rho_avg_solver.solve() with timed_region("Gusto:HybridProjectExnerbar"):'Compressible linear solver: Exner average solve') self.exner_avg_solver.solve() # Solve the hybridized system'Compressible linear solver: hybridized solve') self.hybridized_solver.solve() broken_u, rho1, _ = self.urhol0.subfunctions u1 = self.u_hdiv # Project broken_u into the HDiv space u1.assign(0.0) with timed_region("Gusto:HybridProjectHDiv"):'Compressible linear solver: restore continuity') self._average_kernel.apply(u1, self._weight, broken_u) # Reapply bcs to ensure they are satisfied for bc in self.bcs: bc.apply(u1) # Copy back into u and rho cpts of dy u, rho, theta = dy.subfunctions[0:3] u.assign(u1) rho.assign(rho1) # Reconstruct theta with timed_region("Gusto:ThetaRecon"):'Compressible linear solver: theta solve') self.theta_solver.solve() # Copy into theta cpt of dy theta.assign(self.theta)
[docs] class BoussinesqSolver(TimesteppingSolver): """ Linear solver object for the Boussinesq equations. This solves a linear problem for the Boussinesq equations with prognostic variables u (velocity), p (pressure) and b (buoyancy). It follows the following strategy: This solver follows the following strategy: (1) Analytically eliminate b (introduces error near topography) (2) Solve resulting system for (u,p) using a hybrid-mixed method (3) Reconstruct b """ solver_parameters = { 'ksp_type': 'preonly', 'mat_type': 'matfree', 'pc_type': 'python', 'pc_python_type': 'firedrake.HybridizationPC', 'hybridization': {'ksp_type': 'cg', 'pc_type': 'gamg', 'ksp_rtol': 1e-8, 'mg_levels': {'ksp_type': 'chebyshev', 'ksp_max_it': 2, 'pc_type': 'bjacobi', 'sub_pc_type': 'ilu'}} } @timed_function("Gusto:SolverSetup") def _setup_solver(self): equation = self.equations # just cutting down line length a bit dt = self.dt # Set relaxation parameters. If an alternative has not been given, set # to semi-implicit off-centering factor beta_u_ = dt*self.tau_values.get("u", self.alpha) beta_p_ = dt*self.tau_values.get("p", self.alpha) beta_b_ = dt*self.tau_values.get("b", self.alpha) Vu = equation.domain.spaces("HDiv") Vb = equation.domain.spaces("theta") Vp = equation.domain.spaces("DG") # Store time-stepping coefficients as UFL Constants beta_u = Constant(beta_u_) beta_p = Constant(beta_p_) beta_b = Constant(beta_b_) # Split up the rhs vector (symbolically) self.xrhs = Function(self.equations.function_space) u_in, p_in, b_in = split(self.xrhs) # Build the reduced function space for u,p M = MixedFunctionSpace((Vu, Vp)) w, phi = TestFunctions(M) u, p = TrialFunctions(M) # Get background fields bbar = split(equation.X_ref)[2] # Analytical (approximate) elimination of theta k = equation.domain.k # Upward pointing unit vector b = -dot(k, u)*dot(k, grad(bbar))*beta_b + b_in # vertical projection def V(u): return k*inner(u, k) eqn = ( inner(w, (u - u_in))*dx - beta_u*div(w)*p*dx - beta_u*inner(w, k)*b*dx ) if equation.compressible: cs = equation.parameters.cs eqn += phi * (p - p_in) * dx + beta_p * phi * cs**2 * div(u) * dx else: eqn += phi * div(u) * dx if hasattr(self.equations, "mu"): eqn += dt**inner(w, k)*inner(u, k)*dx if equation.parameters.Omega is not None: Omega = as_vector((0, 0, equation.parameter.Omega)) eqn += inner(w, cross(2*Omega, u))*dx aeqn = lhs(eqn) Leqn = rhs(eqn) # Place to put result of u p solver self.up = Function(M) # Boundary conditions (assumes extruded mesh) # BCs are declared for the plain velocity space. As we need them in # a mixed problem, we replicate the BCs but for subspace of M bcs = [DirichletBC(M.sub(0), bc.function_arg, bc.sub_domain) for bc in self.equations.bcs['u']] # Solver for u, p up_problem = LinearVariationalProblem(aeqn, Leqn, self.up, bcs=bcs) # Provide callback for the nullspace of the trace system def trace_nullsp(T): return VectorSpaceBasis(constant=True) appctx = {"trace_nullspace": trace_nullsp} self.up_solver = LinearVariationalSolver(up_problem, solver_parameters=self.solver_parameters, appctx=appctx) # Reconstruction of b b = TrialFunction(Vb) gamma = TestFunction(Vb) u, p = self.up.subfunctions self.b = Function(Vb) b_eqn = gamma*(b - b_in + dot(k, u)*dot(k, grad(bbar))*beta_b)*dx b_problem = LinearVariationalProblem(lhs(b_eqn), rhs(b_eqn), self.b) self.b_solver = LinearVariationalSolver(b_problem) # Log residuals on hybridized solver self.log_ksp_residuals(self.up_solver.snes.ksp)
[docs] @timed_function("Gusto:LinearSolve") def solve(self, xrhs, dy): """ Solve the linear problem. Args: xrhs (:class:`Function`): the right-hand side field in the appropriate :class:`MixedFunctionSpace`. dy (:class:`Function`): the resulting field in the appropriate :class:`MixedFunctionSpace`. """ self.xrhs.assign(xrhs) with timed_region("Gusto:VelocityPressureSolve"):'Boussinesq linear solver: mixed solve') self.up_solver.solve() u1, p1 = self.up.subfunctions u, p, b = dy.subfunctions u.assign(u1) p.assign(p1) with timed_region("Gusto:BuoyancyRecon"):'Boussinesq linear solver: buoyancy reconstruction') self.b_solver.solve() b.assign(self.b)
[docs] class ThermalSWSolver(TimesteppingSolver): """ Linear solver object for the thermal shallow water equations. This solves a linear problem for the thermal shallow water equations with prognostic variables u (velocity), D (depth) and b (buoyancy). It follows the following strategy: (1) Eliminate b (2) Solve the resulting system for (u, D) using a hybrid-mixed method (3) Reconstruct b """ solver_parameters = { 'ksp_type': 'preonly', 'mat_type': 'matfree', 'pc_type': 'python', 'pc_python_type': 'firedrake.HybridizationPC', 'hybridization': {'ksp_type': 'cg', 'pc_type': 'gamg', 'ksp_rtol': 1e-8, 'mg_levels': {'ksp_type': 'chebyshev', 'ksp_max_it': 2, 'pc_type': 'bjacobi', 'sub_pc_type': 'ilu'}} } @timed_function("Gusto:SolverSetup") def _setup_solver(self): equation = self.equations # just cutting down line length a bit dt = self.dt beta_u_ = dt*self.tau_values.get("u", self.alpha) beta_d_ = dt*self.tau_values.get("D", self.alpha) beta_b_ = dt*self.tau_values.get("b", self.alpha) Vu = equation.domain.spaces("HDiv") VD = equation.domain.spaces("DG") Vb = equation.domain.spaces("DG") # Check that the third field is buoyancy if not equation.field_names[2] == 'b': raise NotImplementedError("Field 'b' must exist to use the thermal linear solver in the SIQN scheme") # Store time-stepping coefficients as UFL Constants beta_u = Constant(beta_u_) beta_d = Constant(beta_d_) beta_b = Constant(beta_b_) # Split up the rhs vector self.xrhs = Function(self.equations.function_space) u_in = split(self.xrhs)[0] D_in = split(self.xrhs)[1] b_in = split(self.xrhs)[2] # Build the reduced function space for u, D M = MixedFunctionSpace((Vu, VD)) w, phi = TestFunctions(M) u, D = TrialFunctions(M) # Get background buoyancy and depth Dbar = split(equation.X_ref)[1] bbar = split(equation.X_ref)[2] # Approximate elimination of b b = -dot(u, grad(bbar))*beta_b + b_in n = FacetNormal(equation.domain.mesh) eqn = ( inner(w, (u - u_in)) * dx - beta_u * (D - Dbar) * div(w*bbar) * dx + beta_u * jump(w*bbar, n) * avg(D-Dbar) * dS - beta_u * 0.5 * Dbar * bbar * div(w) * dx - beta_u * 0.5 * Dbar * b * div(w) * dx - beta_u * 0.5 * bbar * div(w*(D-Dbar)) * dx + beta_u * 0.5 * jump((D-Dbar)*w, n) * avg(bbar) * dS + inner(phi, (D - D_in)) * dx + beta_d * phi * Dbar * div(u) * dx ) if 'coriolis' in equation.prescribed_fields._field_names: f = equation.prescribed_fields('coriolis') eqn += beta_u_ * f * inner(w, equation.domain.perp(u)) * dx aeqn = lhs(eqn) Leqn = rhs(eqn) # Place to put results of (u,D) solver self.uD = Function(M) # Boundary conditions bcs = [DirichletBC(M.sub(0), bc.function_arg, bc.sub_domain) for bc in self.equations.bcs['u']] # Solver for u, D uD_problem = LinearVariationalProblem(aeqn, Leqn, self.uD, bcs=bcs) # Provide callback for the nullspace of the trace system def trace_nullsp(T): return VectorSpaceBasis(constant=True) appctx = {"trace_nullspace": trace_nullsp} self.uD_solver = LinearVariationalSolver(uD_problem, solver_parameters=self.solver_parameters, appctx=appctx) # Reconstruction of b b = TrialFunction(Vb) gamma = TestFunction(Vb) u, D = self.uD.subfunctions self.b = Function(Vb) b_eqn = gamma*(b - b_in + inner(u, grad(bbar))*beta_b) * dx b_problem = LinearVariationalProblem(lhs(b_eqn), rhs(b_eqn), self.b) self.b_solver = LinearVariationalSolver(b_problem) # Log residuals on hybridized solver self.log_ksp_residuals(self.uD_solver.snes.ksp)
[docs] @timed_function("Gusto:LinearSolve") def solve(self, xrhs, dy): """ Solve the linear problem. Args: xrhs (:class:`Function`): the right-hand side field in the appropriate :class:`MixedFunctionSpace`. dy (:class:`Function`): the resulting field in the appropriate :class:`MixedFunctionSpace`. """ self.xrhs.assign(xrhs) # Check that the b reference profile has been set bbar = split(self.equations.X_ref)[2] b = dy.subfunctions[2] bbar_func = Function(b.function_space()).interpolate(bbar) if == 0 and == 0: logger.warning("The reference profile for b in the linear solver is zero. To set a non-zero profile add b to the set_reference_profiles argument.") with timed_region("Gusto:VelocityDepthSolve"):'Thermal linear solver: mixed solve') self.uD_solver.solve() u1, D1 = self.uD.subfunctions u = dy.subfunctions[0] D = dy.subfunctions[1] b = dy.subfunctions[2] u.assign(u1) D.assign(D1) with timed_region("Gusto:BuoyancyRecon"):'Thermal linear solver: buoyancy reconstruction') self.b_solver.solve() b.assign(self.b)
[docs] class LinearTimesteppingSolver(object): """ A general object for solving mixed finite element linear problems. This linear solver object is general and is designed for use with different equation sets, including with the non-linear shallow-water equations. It forms the linear problem from the equations using FML. The linear system is solved using a hybridised-mixed method. """ solver_parameters = { 'ksp_type': 'preonly', 'mat_type': 'matfree', 'pc_type': 'python', 'pc_python_type': 'firedrake.HybridizationPC', 'hybridization': {'ksp_type': 'cg', 'pc_type': 'gamg', 'ksp_rtol': 1e-8, 'mg_levels': {'ksp_type': 'chebyshev', 'ksp_max_it': 2, 'pc_type': 'bjacobi', 'sub_pc_type': 'ilu'}} } def __init__(self, equation, alpha): """ Args: equation (:class:`PrognosticEquation`): the model's equation object. alpha (float): the semi-implicit off-centring factor. A value of 1 is fully-implicit. """ residual = equation.residual.label_map( lambda t: t.has_label(linearisation), lambda t: Term(t.get(linearisation).form, t.labels), drop) dt = equation.domain.dt W = equation.function_space beta = dt*alpha # Split up the rhs vector (symbolically) self.xrhs = Function(W) aeqn = residual.label_map( lambda t: (t.has_label(time_derivative) and t.has_label(linearisation)), map_if_false=lambda t: beta*t) Leqn = residual.label_map( lambda t: (t.has_label(time_derivative) and t.has_label(linearisation)), map_if_false=drop) # Place to put result of solver self.dy = Function(W) # Solver bcs = [DirichletBC(W.sub(0), bc.function_arg, bc.sub_domain) for bc in equation.bcs['u']] problem = LinearVariationalProblem(aeqn.form, action(Leqn.form, self.xrhs), self.dy, bcs=bcs) self.solver = LinearVariationalSolver(problem, solver_parameters=self.solver_parameters, options_prefix='linear_solver')
[docs] @timed_function("Gusto:LinearSolve") def solve(self, xrhs, dy): """ Solve the linear problem. Args: xrhs (:class:`Function`): the right-hand side field in the appropriate :class:`MixedFunctionSpace`. dy (:class:`Function`): the resulting field in the appropriate :class:`MixedFunctionSpace`. """ self.xrhs.assign(xrhs) self.solver.solve() dy.assign(self.dy)
[docs] class MoistConvectiveSWSolver(TimesteppingSolver): """ Linear solver for the moist convective shallow water equations. This solves a linear problem for the shallow water equations with prognostic variables u (velocity) and D (depth). The linear system is solved using a hybridised-mixed method. """ solver_parameters = { 'ksp_type': 'preonly', 'mat_type': 'matfree', 'pc_type': 'python', 'pc_python_type': 'firedrake.HybridizationPC', 'hybridization': {'ksp_type': 'cg', 'pc_type': 'gamg', 'ksp_rtol': 1e-8, 'mg_levels': {'ksp_type': 'chebyshev', 'ksp_max_it': 2, 'pc_type': 'bjacobi', 'sub_pc_type': 'ilu'}} } @timed_function("Gusto:SolverSetup") def _setup_solver(self): equation = self.equations # just cutting down line length a bit dt = self.dt beta_u_ = dt*self.tau_values.get("u", self.alpha) beta_d_ = dt*self.tau_values.get("D", self.alpha) Vu = equation.domain.spaces("HDiv") VD = equation.domain.spaces("DG") # Store time-stepping coefficients as UFL Constants beta_u = Constant(beta_u_) beta_d = Constant(beta_d_) # Split up the rhs vector self.xrhs = Function(self.equations.function_space) u_in = split(self.xrhs)[0] D_in = split(self.xrhs)[1] # Build the reduced function space for u, D M = MixedFunctionSpace((Vu, VD)) w, phi = TestFunctions(M) u, D = TrialFunctions(M) # Get background depth Dbar = split(equation.X_ref)[1] g = equation.parameters.g eqn = ( inner(w, (u - u_in)) * dx - beta_u * (D - Dbar) * div(w*g) * dx + inner(phi, (D - D_in)) * dx + beta_d * phi * Dbar * div(u) * dx ) if 'coriolis' in equation.prescribed_fields._field_names: f = equation.prescribed_fields('coriolis') eqn += beta_u_ * f * inner(w, equation.domain.perp(u)) * dx aeqn = lhs(eqn) Leqn = rhs(eqn) # Place to put results of (u,D) solver self.uD = Function(M) # Boundary conditions bcs = [DirichletBC(M.sub(0), bc.function_arg, bc.sub_domain) for bc in self.equations.bcs['u']] # Solver for u, D uD_problem = LinearVariationalProblem(aeqn, Leqn, self.uD, bcs=bcs) # Provide callback for the nullspace of the trace system def trace_nullsp(T): return VectorSpaceBasis(constant=True) appctx = {"trace_nullspace": trace_nullsp} self.uD_solver = LinearVariationalSolver(uD_problem, solver_parameters=self.solver_parameters, appctx=appctx) # Log residuals on hybridized solver self.log_ksp_residuals(self.uD_solver.snes.ksp)
[docs] @timed_function("Gusto:LinearSolve") def solve(self, xrhs, dy): """ Solve the linear problem. Args: xrhs (:class:`Function`): the right-hand side field in the appropriate :class:`MixedFunctionSpace`. dy (:class:`Function`): the resulting field in the appropriate :class:`MixedFunctionSpace`. """ self.xrhs.assign(xrhs) with timed_region("Gusto:VelocityDepthSolve"):'Moist convective linear solver: mixed solve') self.uD_solver.solve() u1, D1 = self.uD.subfunctions u = dy.subfunctions[0] D = dy.subfunctions[1] u.assign(u1) D.assign(D1)