Source code for gusto.physics.boundary_and_turbulence

"""
Objects to describe physics parametrisations for the boundary layer, such as
drag and turbulence."""

from firedrake import (
    Interpolator, conditional, Function, dx, sqrt, dot, Constant, grad,
    TestFunctions, split, inner, TestFunction, exp, avg, outer, FacetNormal,
    SpatialCoordinate, dS_v, NonlinearVariationalProblem,
    NonlinearVariationalSolver
)
from firedrake.fml import subject
from gusto.core.configuration import BoundaryLayerParameters
from gusto.recovery import Recoverer, BoundaryMethod
from gusto.equations import CompressibleEulerEquations
from gusto.core.labels import prognostic
from gusto.core.logging import logger
from gusto.equations import thermodynamics
from gusto.physics.physics_parametrisation import PhysicsParametrisation

__all__ = ["SurfaceFluxes", "WindDrag", "StaticAdjustment",
           "SuppressVerticalWind", "BoundaryLayerMixing"]


[docs] class SurfaceFluxes(PhysicsParametrisation): """ Prescribed surface temperature and moisture fluxes, to be used in aquaplanet simulations as Sea Surface Temperature fluxes. This formulation is taken from the DCMIP (2016) test case document. These can be used either with an in-built implicit formulation, or with a generic time discretisation. Written to assume that dry density is unchanged by the parametrisation. """ def __init__(self, equation, T_surface_expr, vapour_name=None, implicit_formulation=False, parameters=None): """ Args: equation (:class:`PrognosticEquationSet`): the model's equation. T_surface_expr (:class:`ufl.Expr`): the surface temperature. vapour_name (str, optional): name of the water vapour variable. Defaults to None, in which case no moisture fluxes are applied. implicit_formulation (bool, optional): whether the scheme is already put into a Backwards Euler formulation (which allows this scheme to actually be used with a Forwards Euler or other explicit time discretisation). Otherwise, this is formulated more generally and can be used with any time stepper. Defaults to False. parameters (:class:`BoundaryLayerParameters`): configuration object giving the parameters for boundary and surface level schemes. Defaults to None, in which case default values are used. """ # -------------------------------------------------------------------- # # Check arguments and generic initialisation # -------------------------------------------------------------------- # if not isinstance(equation, CompressibleEulerEquations): raise ValueError("Surface fluxes can only be used with Compressible Euler equations") if vapour_name is not None: if vapour_name not in equation.field_names: raise ValueError(f"Field {vapour_name} does not exist in the equation set") if parameters is None: parameters = BoundaryLayerParameters() label_name = 'surface_flux' super().__init__(equation, label_name, parameters=None) self.implicit_formulation = implicit_formulation self.X = Function(equation.X.function_space()) self.dt = Constant(0.0) # -------------------------------------------------------------------- # # Extract prognostic variables # -------------------------------------------------------------------- # u_idx = equation.field_names.index('u') T_idx = equation.field_names.index('theta') rho_idx = equation.field_names.index('rho') if vapour_name is not None: m_v_idx = equation.field_names.index(vapour_name) X = self.X tests = TestFunctions(X.function_space()) if implicit_formulation else equation.tests u = split(X)[u_idx] rho = split(X)[rho_idx] theta_vd = split(X)[T_idx] test_theta = tests[T_idx] if vapour_name is not None: m_v = split(X)[m_v_idx] test_m_v = tests[m_v_idx] else: m_v = None if implicit_formulation: # Need to evaluate rho at theta-points boundary_method = BoundaryMethod.extruded if equation.domain.vertical_degree == 0 else None rho_averaged = Function(equation.function_space.sub(T_idx)) self.rho_recoverer = Recoverer(rho, rho_averaged, boundary_method=boundary_method) exner = thermodynamics.exner_pressure(equation.parameters, rho_averaged, theta_vd) else: # Exner is more general expression exner = thermodynamics.exner_pressure(equation.parameters, rho, theta_vd) # Alternative variables T = thermodynamics.T(equation.parameters, theta_vd, exner, r_v=m_v) p = thermodynamics.p(equation.parameters, exner) # -------------------------------------------------------------------- # # Expressions for surface fluxes # -------------------------------------------------------------------- # z = equation.domain.height_above_surface z_a = parameters.height_surface_layer surface_expr = conditional(z < z_a, Constant(1.0), Constant(0.0)) u_hori = u - equation.domain.k*dot(u, equation.domain.k) u_hori_mag = sqrt(dot(u_hori, u_hori)) C_H = parameters.coeff_heat C_E = parameters.coeff_evap # Implicit formulation ----------------------------------------------- # # For use with ForwardEuler only, as implicit solution is hand-written if implicit_formulation: self.source_interpolators = [] # First specify T_np1 expression Vtheta = equation.spaces[T_idx] T_np1_expr = ((T + C_H*u_hori_mag*T_surface_expr*self.dt/z_a) / (1 + C_H*u_hori_mag*self.dt/z_a)) # If moist formulation, determine next vapour value if vapour_name is not None: source_mv = Function(Vtheta) mv_sat = thermodynamics.r_sat(equation.parameters, T, p) mv_np1_expr = ((m_v + C_E*u_hori_mag*mv_sat*self.dt/z_a) / (1 + C_E*u_hori_mag*self.dt/z_a)) dmv_expr = surface_expr * (mv_np1_expr - m_v) / self.dt source_mv_expr = test_m_v * source_mv * dx self.source_interpolators.append(Interpolator(dmv_expr, source_mv)) equation.residual -= self.label(subject(prognostic(source_mv_expr, vapour_name), X), self.evaluate) # Moisture needs including in theta_vd expression # NB: still using old pressure here, which implies constant p? epsilon = equation.parameters.R_d / equation.parameters.R_v theta_np1_expr = (thermodynamics.theta(equation.parameters, T_np1_expr, p) * (1 + mv_np1_expr / epsilon)) else: theta_np1_expr = thermodynamics.theta(equation.parameters, T_np1_expr, p) source_theta_vd = Function(Vtheta) dtheta_vd_expr = surface_expr * (theta_np1_expr - theta_vd) / self.dt source_theta_expr = test_theta * source_theta_vd * dx self.source_interpolators.append(Interpolator(dtheta_vd_expr, source_theta_vd)) equation.residual -= self.label(subject(prognostic(source_theta_expr, 'theta'), X), self.evaluate) # General formulation ------------------------------------------------ # else: # Construct underlying expressions kappa = equation.parameters.kappa dT_dt = surface_expr * C_H * u_hori_mag * (T_surface_expr - T) / z_a if vapour_name is not None: mv_sat = thermodynamics.r_sat(equation.parameters, T, p) dmv_dt = surface_expr * C_E * u_hori_mag * (mv_sat - m_v) / z_a source_mv_expr = test_m_v * dmv_dt * dx equation.residual -= self.label( prognostic(subject(source_mv_expr, X), vapour_name), self.evaluate) # Theta expression depends on dmv_dt epsilon = equation.parameters.R_d / equation.parameters.R_v dtheta_vd_dt = (dT_dt * ((1 + m_v / epsilon) / exner - kappa * theta_vd / (rho * T)) + dmv_dt * (T / (epsilon * exner) - kappa * theta_vd / (epsilon + m_v))) else: dtheta_vd_dt = dT_dt * (1 / exner - kappa * theta_vd / (rho * T)) dx_reduced = dx(degree=4) source_theta_expr = test_theta * dtheta_vd_dt * dx_reduced equation.residual -= self.label( subject(prognostic(source_theta_expr, 'theta'), X), self.evaluate)
[docs] def evaluate(self, x_in, dt): """ Evaluates the source term generated by the physics. This does nothing if the implicit formulation is not used. Args: x_in: (:class: 'Function'): the (mixed) field to be evolved. dt: (:class: 'Constant'): the timestep, which can be the time interval for the scheme. """ logger.info(f'Evaluating physics parametrisation {self.label.label}') if self.implicit_formulation: self.X.assign(x_in) self.dt.assign(dt) self.rho_recoverer.project() for source_interpolator in self.source_interpolators: source_interpolator.interpolate()
[docs] class WindDrag(PhysicsParametrisation): """ A simple surface wind drag scheme. This formulation is taken from the DCMIP (2016) test case document. These can be used either with an in-built implicit formulation, or with a generic time discretisation. """ def __init__(self, equation, implicit_formulation=False, parameters=None): """ Args: equation (:class:`PrognosticEquationSet`): the model's equation. implicit_formulation (bool, optional): whether the scheme is already put into a Backwards Euler formulation (which allows this scheme to actually be used with a Forwards Euler or other explicit time discretisation). Otherwise, this is formulated more generally and can be used with any time stepper. Defaults to False. parameters (:class:`BoundaryLayerParameters`): configuration object giving the parameters for boundary and surface level schemes. Defaults to None, in which case default values are used. """ # -------------------------------------------------------------------- # # Check arguments and generic initialisation # -------------------------------------------------------------------- # if not isinstance(equation, CompressibleEulerEquations): raise ValueError("Wind drag can only be used with Compressible Euler equations") if parameters is None: parameters = BoundaryLayerParameters() label_name = 'wind_drag' super().__init__(equation, label_name, parameters=None) k = equation.domain.k self.implicit_formulation = implicit_formulation self.X = Function(equation.X.function_space()) self.dt = Constant(0.0) # -------------------------------------------------------------------- # # Extract prognostic variables # -------------------------------------------------------------------- # u_idx = equation.field_names.index('u') X = self.X tests = TestFunctions(X.function_space()) if implicit_formulation else equation.tests test = tests[u_idx] u = split(X)[u_idx] u_hori = u - k*dot(u, k) u_hori_mag = sqrt(dot(u_hori, u_hori)) # -------------------------------------------------------------------- # # Expressions for wind drag # -------------------------------------------------------------------- # z = equation.domain.height_above_surface z_a = parameters.height_surface_layer surface_expr = conditional(z < z_a, Constant(1.0), Constant(0.0)) C_D0 = parameters.coeff_drag_0 C_D1 = parameters.coeff_drag_1 C_D2 = parameters.coeff_drag_2 C_D = conditional(u_hori_mag < 20.0, C_D0 + C_D1*u_hori_mag, C_D2) # Implicit formulation ----------------------------------------------- # # For use with ForwardEuler only, as implicit solution is hand-written if implicit_formulation: # First specify T_np1 expression Vu = equation.spaces[u_idx] source_u = Function(Vu) u_np1_expr = u_hori / (1 + C_D*u_hori_mag*self.dt/z_a) du_expr = surface_expr * (u_np1_expr - u_hori) / self.dt # TODO: introduce reduced projector test_Vu = TestFunction(Vu) dx_reduced = dx(degree=4) proj_eqn = inner(test_Vu, source_u - du_expr)*dx_reduced proj_prob = NonlinearVariationalProblem(proj_eqn, source_u) self.source_projector = NonlinearVariationalSolver(proj_prob) source_expr = inner(test, source_u - k*dot(source_u, k)) * dx equation.residual -= self.label(subject(prognostic(source_expr, 'u'), X), self.evaluate) # General formulation ------------------------------------------------ # else: # Construct underlying expressions du_dt = -surface_expr * C_D * u_hori_mag * u_hori / z_a dx_reduced = dx(degree=4) source_expr = inner(test, du_dt) * dx_reduced equation.residual -= self.label(subject(prognostic(source_expr, 'u'), X), self.evaluate)
[docs] def evaluate(self, x_in, dt): """ Evaluates the source term generated by the physics. This does nothing if the implicit formulation is not used. Args: x_in: (:class: 'Function'): the (mixed) field to be evolved. dt: (:class: 'Constant'): the timestep, which can be the time interval for the scheme. """ logger.info(f'Evaluating physics parametrisation {self.label.label}') if self.implicit_formulation: self.X.assign(x_in) self.dt.assign(dt) self.source_projector.solve()
[docs] class StaticAdjustment(PhysicsParametrisation): """ A scheme to provide static adjustment, by sorting the potential temperature values in a column so that they are increasing with height. """ def __init__(self, equation, theta_variable='theta_vd'): """ Args: equation (:class:`PrognosticEquationSet`): the model's equation. theta_variable (str, optional): which theta variable to sort the profile for. Valid options are "theta" or "theta_vd". Defaults to "theta_vd". """ self.explicit_only = True from functools import partial # -------------------------------------------------------------------- # # Check arguments and generic initialisation # -------------------------------------------------------------------- # if not isinstance(equation, CompressibleEulerEquations): raise ValueError("Static adjustment can only be used with Compressible Euler equations") if theta_variable not in ['theta', 'theta_vd']: raise ValueError('Static adjustment: theta variable ' + f'{theta_variable} not valid') label_name = 'static_adjustment' super().__init__(equation, label_name, parameters=equation.parameters) self.X = Function(equation.X.function_space()) self.dt = Constant(0.0) # -------------------------------------------------------------------- # # Extract prognostic variables # -------------------------------------------------------------------- # theta_idx = equation.field_names.index('theta') Vt = equation.spaces[theta_idx] self.theta_to_sort = Function(Vt) sorted_theta = Function(Vt) theta = self.X.subfunctions[theta_idx] if theta_variable == 'theta' and 'water_vapour' in equation.field_names: Rv = equation.parameters.R_v Rd = equation.parameters.R_d mv_idx = equation.field_names.index('water_vapour') mv = self.X.subfunctions[mv_idx] self.get_theta_variable = Interpolator(theta / (1 + mv*Rv/Rd), self.theta_to_sort) self.set_theta_variable = Interpolator(self.theta_to_sort * (1 + mv*Rv/Rd), sorted_theta) else: self.get_theta_variable = Interpolator(theta, self.theta_to_sort) self.set_theta_variable = Interpolator(self.theta_to_sort, sorted_theta) # -------------------------------------------------------------------- # # Set up routines to reshape data # -------------------------------------------------------------------- # domain = equation.domain self.get_column_data = partial(domain.coords.get_column_data, field=self.theta_to_sort, domain=domain) self.set_column_data = domain.coords.set_field_from_column_data # -------------------------------------------------------------------- # # Set source term # -------------------------------------------------------------------- # tests = TestFunctions(self.X.function_space()) test = tests[theta_idx] source_expr = inner(test, sorted_theta - theta) / self.dt * dx equation.residual -= self.label(subject(prognostic(source_expr, 'theta'), equation.X), self.evaluate)
[docs] def evaluate(self, x_in, dt): """ Evaluates the source term generated by the physics. This does nothing if the implicit formulation is not used. Args: x_in: (:class: 'Function'): the (mixed) field to be evolved. dt: (:class: 'Constant'): the timestep, which can be the time interval for the scheme. """ logger.info(f'Evaluating physics parametrisation {self.label.label}') self.X.assign(x_in) self.dt.assign(dt) self.get_theta_variable.interpolate() theta_column_data, index_data = self.get_column_data() for col in range(theta_column_data.shape[0]): theta_column_data[col].sort() self.set_column_data(self.theta_to_sort, theta_column_data, index_data) self.set_theta_variable.interpolate()
[docs] class SuppressVerticalWind(PhysicsParametrisation): """ Suppresses the model's vertical wind, reducing it to zero. This is used for instance in a model's spin up period. """ def __init__(self, equation, spin_up_period): """ Args: equation (:class:`PrognosticEquationSet`): the model's equation. spin_up_period (`ufl.Constant`): this parametrisation is applied while the time is less than this -- corresponding to a spin up period. """ self.explicit_only = True # -------------------------------------------------------------------- # # Check arguments and generic initialisation # -------------------------------------------------------------------- # domain = equation.domain if not domain.mesh.extruded: raise RuntimeError("Suppress vertical wind can only be used with " + "extruded meshes") label_name = 'suppress_vertical_wind' super().__init__(equation, label_name, parameters=equation.parameters) self.X = Function(equation.X.function_space()) self.dt = Constant(0.0) self.t = domain.t self.spin_up_period = Constant(spin_up_period) self.strength = Constant(1.0) self.spin_up_done = False # -------------------------------------------------------------------- # # Extract prognostic variables # -------------------------------------------------------------------- # u_idx = equation.field_names.index('u') wind = self.X.subfunctions[u_idx] # -------------------------------------------------------------------- # # Set source term # -------------------------------------------------------------------- # tests = TestFunctions(self.X.function_space()) test = tests[u_idx] # The sink should be just the value of the current vertical wind source_expr = -self.strength * inner(test, domain.k*dot(domain.k, wind)) / self.dt * dx equation.residual -= self.label(subject(prognostic(source_expr, 'u'), equation.X), self.evaluate)
[docs] def evaluate(self, x_in, dt): """ Evaluates the source term generated by the physics. This does nothing if the implicit formulation is not used. Args: x_in: (:class: 'Function'): the (mixed) field to be evolved. dt: (:class: 'Constant'): the timestep, which can be the time interval for the scheme. """ if float(self.t) < float(self.spin_up_period): logger.info(f'Evaluating physics parametrisation {self.label.label}') self.X.assign(x_in) self.dt.assign(dt) elif not self.spin_up_done: self.strength.assign(0.0) self.spin_up_done = True
[docs] class BoundaryLayerMixing(PhysicsParametrisation): """ A simple boundary layer mixing scheme. This acts like a vertical diffusion, using an interior penalty method. """ def __init__(self, equation, field_name, parameters=None): """ Args: equation (:class:`PrognosticEquationSet`): the model's equation. field_name (str): name of the field to apply the diffusion to. ibp (:class:`IntegrateByParts`, optional): how many times to integrate the term by parts. Defaults to IntegrateByParts.ONCE. parameters (:class:`BoundaryLayerParameters`): configuration object giving the parameters for boundary and surface level schemes. Defaults to None, in which case default values are used. """ # -------------------------------------------------------------------- # # Check arguments and generic initialisation # -------------------------------------------------------------------- # if not isinstance(equation, CompressibleEulerEquations): raise ValueError("Boundary layer mixing can only be used with Compressible Euler equations") if field_name not in equation.field_names: raise ValueError(f'field {field_name} not found in equation') if parameters is None: parameters = BoundaryLayerParameters() label_name = f'boundary_layer_mixing_{field_name}' super().__init__(equation, label_name, parameters=None) self.X = Function(equation.X.function_space()) # -------------------------------------------------------------------- # # Extract prognostic variables # -------------------------------------------------------------------- # u_idx = equation.field_names.index('u') T_idx = equation.field_names.index('theta') rho_idx = equation.field_names.index('rho') u = split(self.X)[u_idx] rho = split(self.X)[rho_idx] theta_vd = split(self.X)[T_idx] boundary_method = BoundaryMethod.extruded if equation.domain.vertical_degree == 0 else None rho_averaged = Function(equation.function_space.sub(T_idx)) self.rho_recoverer = Recoverer(rho, rho_averaged, boundary_method=boundary_method) exner = thermodynamics.exner_pressure(equation.parameters, rho_averaged, theta_vd) # Alternative variables p = thermodynamics.p(equation.parameters, exner) p_top = Constant(85000.) p_strato = Constant(10000.) # -------------------------------------------------------------------- # # Expressions for diffusivity coefficients # -------------------------------------------------------------------- # z_a = parameters.height_surface_layer domain = equation.domain u_hori = u - domain.k*dot(u, domain.k) u_hori_mag = sqrt(dot(u_hori, u_hori)) if field_name == 'u': C_D0 = parameters.coeff_drag_0 C_D1 = parameters.coeff_drag_1 C_D2 = parameters.coeff_drag_2 C_D = conditional(u_hori_mag < 20.0, C_D0 + C_D1*u_hori_mag, C_D2) K = conditional(p > p_top, C_D*u_hori_mag*z_a, C_D*u_hori_mag*z_a*exp(-((p_top - p)/p_strato)**2)) else: C_E = parameters.coeff_evap K = conditional(p > p_top, C_E*u_hori_mag*z_a, C_E*u_hori_mag*z_a*exp(-((p_top - p)/p_strato)**2)) # -------------------------------------------------------------------- # # Make source expression # -------------------------------------------------------------------- # dx_reduced = dx(degree=4) dS_v_reduced = dS_v(degree=4) # Need to be careful with order of operations here, to correctly index # when the field is vector-valued d_dz = lambda q: outer(domain.k, dot(grad(q), domain.k)) n = FacetNormal(domain.mesh) # Work out vertical height xyz = SpatialCoordinate(domain.mesh) Vr = domain.spaces('L2') dz = Function(Vr) dz.interpolate(dot(domain.k, d_dz(dot(domain.k, xyz)))) mu = parameters.mu X = self.X tests = equation.tests idx = equation.field_names.index(field_name) test = tests[idx] field = X.subfunctions[idx] if field_name == 'u': # Horizontal diffusion only field = field - domain.k*dot(field, domain.k) # Interior penalty discretisation of vertical diffusion source_expr = ( # Volume term rho_averaged*K*inner(d_dz(test/rho_averaged), d_dz(field))*dx_reduced # Interior penalty surface term - 2*inner(avg(outer(K*field, n)), avg(d_dz(test)))*dS_v_reduced - 2*inner(avg(outer(test, n)), avg(d_dz(K*field)))*dS_v_reduced + 4*mu*avg(dz)*inner(avg(outer(K*field, n)), avg(outer(test, n)))*dS_v_reduced ) equation.residual += self.label( subject(prognostic(source_expr, field_name), X), self.evaluate)
[docs] def evaluate(self, x_in, dt): """ Evaluates the source term generated by the physics. This only recovers the density field. Args: x_in: (:class: 'Function'): the (mixed) field to be evolved. dt: (:class: 'Constant'): the timestep, which can be the time interval for the scheme. """ logger.info(f'Evaluating physics parametrisation {self.label.label}') self.X.assign(x_in) self.rho_recoverer.project()