Source code for gusto.diagnostics.diagnostics

"""Common diagnostic fields."""


from firedrake import (assemble, dot, dx, Function, sqrt, TestFunction,
                       TrialFunction, Constant, grad, inner, FacetNormal,
                       LinearVariationalProblem, LinearVariationalSolver,
                       ds_b, ds_v, ds_t, dS_h, dS_v, ds, dS, div, avg, pi,
                       TensorFunctionSpace, SpatialCoordinate, as_vector,
                       Projector, Interpolator, FunctionSpace, FiniteElement,
                       TensorProductElement)
from firedrake.assign import Assigner
from ufl.domain import extract_unique_domain

from abc import ABCMeta, abstractmethod, abstractproperty
from gusto.core.coord_transforms import rotated_lonlatr_vectors
from gusto.core.logging import logger
from gusto.core.kernels import MinKernel, MaxKernel
import numpy as np

__all__ = ["Diagnostics", "DiagnosticField", "CourantNumber", "Gradient",
           "XComponent", "YComponent", "ZComponent", "MeridionalComponent",
           "ZonalComponent", "RadialComponent", "Energy", "KineticEnergy",
           "Sum", "Difference", "SteadyStateError", "Perturbation",
           "Divergence", "TracerDensity", "IterativeDiagnosticField"]


[docs] class Diagnostics(object): """ Stores all diagnostic fields, and controls global diagnostics computation. This object stores the diagnostic fields to be output, and the computation of global values from them (such as global maxima or norms). """ available_diagnostics = ["min", "max", "rms", "l2", "total"] def __init__(self, *fields): """ Args: *fields: list of :class:`Function` objects of fields to be output. """ self.fields = list(fields)
[docs] def register(self, *fields): """ Registers diagnostic fields for outputting. Args: *fields: list of :class:`Function` objects of fields to be output. """ fset = set(self.fields) for f in fields: if f not in fset: self.fields.append(f)
[docs] @staticmethod def min(f): """ Finds the global minimum DoF value of a field. Args: f (:class:`Function`): field to compute diagnostic for. """ min_kernel = MinKernel() return min_kernel.apply(f)
[docs] @staticmethod def max(f): """ Finds the global maximum DoF value of a field. Args: f (:class:`Function`): field to compute diagnostic for. """ max_kernel = MaxKernel() return max_kernel.apply(f)
[docs] @staticmethod def rms(f): """ Calculates the root-mean-square of a field. Args: f (:class:`Function`): field to compute diagnostic for. """ area = assemble(1*dx(domain=extract_unique_domain(f))) return sqrt(assemble(inner(f, f)*dx)/area)
[docs] @staticmethod def l2(f): """ Calculates the L2 norm of a field. Args: f (:class:`Function`): field to compute diagnostic for. """ return sqrt(assemble(inner(f, f)*dx))
[docs] @staticmethod def total(f): """ Calculates the total of a field. Only applicable for fields with scalar-values. Args: f (:class:`Function`): field to compute diagnostic for. """ if len(f.ufl_shape) == 0: return assemble(f * dx) else: pass
[docs] class DiagnosticField(object, metaclass=ABCMeta): """Base object to represent diagnostic fields for outputting.""" def __init__(self, space=None, method='interpolate', required_fields=()): """ Args: space (:class:`FunctionSpace`, optional): the function space to evaluate the diagnostic field in. Defaults to None, in which case a default space will be chosen for this diagnostic. method (str, optional): a string specifying the method of evaluation for this diagnostic. Valid options are 'interpolate', 'project', 'assign' and 'solve'. Defaults to 'interpolate'. required_fields (tuple, optional): tuple of names of the fields that are required for the computation of this diagnostic field. Defaults to (). """ assert method in ['interpolate', 'project', 'solve', 'assign'], \ f'Invalid evaluation method {self.method} for diagnostic {self.name}' self._initialised = False self.required_fields = required_fields self.space = space self.method = method self.expr = None self.to_dump = True # Property to allow graceful failures if solve method not valid if not hasattr(self, "solve_implemented"): self.solve_implemented = False if method == 'solve' and not self.solve_implemented: raise NotImplementedError(f'Solve method has not been implemented for diagnostic {self.name}') @abstractproperty def name(self): """The name of this diagnostic field""" pass
[docs] @abstractmethod def setup(self, domain, state_fields, space=None): """ Sets up the :class:`Function` for the diagnostic field. Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. space (:class:`FunctionSpace`, optional): the function space for the diagnostic field to be computed in. Defaults to None, in which case the space will be DG0. """ if not self._initialised: if self.space is None: if space is None: if not hasattr(domain.spaces, "DG0"): space = domain.spaces.create_space("DG0", "DG", 0) else: space = domain.spaces("DG0") self.space = space else: space = self.space # Add space to domain assert space.name is not None, \ f'Diagnostics {self.name} is using a function space which does not have a name' if not hasattr(domain.spaces, space.name): domain.spaces.add_space(space.name, space) self.field = state_fields(self.name, space=space, dump=self.to_dump, pick_up=False) if self.method != 'solve': assert self.expr is not None, \ f"The expression for diagnostic {self.name} has not been specified" # Solve method must be declared in diagnostic's own setup routine if self.method == 'interpolate': self.evaluator = Interpolator(self.expr, self.field) elif self.method == 'project': self.evaluator = Projector(self.expr, self.field) elif self.method == 'assign': self.evaluator = Assigner(self.field, self.expr) self._initialised = True
[docs] def compute(self): """Compute the diagnostic field from the current state.""" logger.debug(f'Computing diagnostic {self.name} with {self.method} method') if self.method == 'interpolate': self.evaluator.interpolate() elif self.method == 'assign': self.evaluator.assign() elif self.method == 'project': self.evaluator.project() elif self.method == 'solve': self.evaluator.solve()
def __call__(self): """Return the diagnostic field computed from the current state.""" self.compute() return self.field
[docs] class IterativeDiagnosticField(DiagnosticField): """ Iterative evaluation of a diagnostic expression for diagnostics with no explicit definition but an implicit definition. This uses a form of damped Picard iteration. """ def __init__(self, space=None, method='interpolate', required_fields=(), num_iterations=3, gamma=0.8): """ Args: space (:class:`FunctionSpace`, optional): the function space to evaluate the diagnostic field in. Defaults to None, in which case a default space will be chosen for this diagnostic. method (str, optional): a string specifying the method of evaluation for this diagnostic. Valid options are 'interpolate', 'project', 'assign' and 'solve'. Defaults to 'interpolate'. required_fields (tuple, optional): tuple of names of the fields that are required for the computation of this diagnostic field. Defaults to (). num_iterations (integer, optional): number of times to iteratively evaluate the expression. Defaults to 3. gamma (float, optional): weight given to previous guess, which is used to avoid numerical instabilities. """ super().__init__(space=space, method=method, required_fields=required_fields) self.num_iterations = num_iterations self.gamma = gamma
[docs] def setup(self, domain, state_fields, space=None): """ Sets up the :class:`Function` for the diagnostic field. Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. space (:class:`FunctionSpace`, optional): the function space for the diagnostic field to be computed in. Defaults to None, in which case the space will be DG0. """ # NB: child classes of this need to set up: # (a) self.implicit_expr: the implicit UFL expression for the diagnostic # which must depend on self.field # (b) self.first_guess: the UFL expression for the first guess if not self._initialised: if self.space is None: if space is None: if not hasattr(domain.spaces, "DG0"): space = domain.spaces.create_space("DG0", "DG", 0) else: space = domain.spaces("DG0") self.space = space else: space = self.space # Add space to domain assert space.name is not None, \ f'Diagnostics {self.name} is using a function space which does not have a name' if not hasattr(domain.spaces, space.name): domain.spaces.add_space(space.name, space) self.field = state_fields(self.name, space=space, dump=self.to_dump, pick_up=False) self.expr = (1.0 - self.gamma)*self.field + self.gamma*self.implicit_expr(domain, state_fields) self.first_guess = self.set_first_guess(domain, state_fields) if self.method != 'solve': assert self.expr is not None, \ f"The expression for diagnostic {self.name} has not been specified" # Solve method must be declared in diagnostic's own setup routine if self.method == 'interpolate': self.evaluator = Interpolator(self.expr, self.field) elif self.method == 'project': self.evaluator = Projector(self.expr, self.field) elif self.method == 'assign': self.evaluator = Assigner(self.field, self.expr) self._initialised = True
@property @abstractmethod def implicit_expr(self, domain, state_fields): """ The implicit UFL expression for the diagnostic, which should depend on self.field Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. """ pass @property @abstractmethod def set_first_guess(self, domain, state_fields): """ The first guess of the diagnostic Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. """ pass
[docs] def compute(self): """Compute the diagnostic field from the current state.""" # Set first guess self.field.interpolate(self.first_guess) # Iterate for _ in range(self.num_iterations): super().compute()
[docs] class CourantNumber(DiagnosticField): """Dimensionless Courant number diagnostic field.""" name = "CourantNumber" def __init__(self, velocity='u', component='whole', name=None, to_dump=True, space=None, method='interpolate', required_fields=()): """ Args: velocity (str or :class:`ufl.Expr`, optional): the velocity field to take the Courant number of. Can be a string referring to an existing field, or an expression. If it is an expression, the name argument is required. Defaults to 'u'. component (str, optional): the component of the velocity to use for calculating the Courant number. Valid values are "whole", "horizontal" or "vertical". Defaults to "whole". name (str, optional): the name to append to "CourantNumber" to form the name of this diagnostic. This argument must be provided if the velocity is an expression (rather than a string). Defaults to None. to_dump (bool, optional): whether this diagnostic should be dumped. Defaults to True. space (:class:`FunctionSpace`, optional): the function space to evaluate the diagnostic field in. Defaults to None, in which case a default space will be chosen for this diagnostic. method (str, optional): a string specifying the method of evaluation for this diagnostic. Valid options are 'interpolate', 'project', 'assign' and 'solve'. Defaults to 'interpolate'. required_fields (tuple, optional): tuple of names of the fields that are required for the computation of this diagnostic field. Defaults to (). """ if component not in ["whole", "horizontal", "vertical"]: raise ValueError(f'component arg {component} not valid. Allowed ' + 'values are "whole", "horizontal" and "vertical"') self.component = component # Work out whether to take Courant number from field or expression if type(velocity) is str: # Default name should just be CourantNumber if velocity == 'u': self.name = 'CourantNumber' elif name is None: self.name = 'CourantNumber_'+velocity else: self.name = 'CourantNumber_'+name if component != 'whole': self.name += '_'+component else: if name is None: raise ValueError('CourantNumber diagnostic: if provided ' + 'velocity is an expression then the name ' + 'argument must be provided') self.name = 'CourantNumber_'+name self.velocity = velocity super().__init__(space=space, method=method, required_fields=required_fields) # Done after super init to ensure that it is not always set to True self.to_dump = to_dump
[docs] def setup(self, domain, state_fields): """ Sets up the :class:`Function` for the diagnostic field. Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. """ V = FunctionSpace(domain.mesh, "DG", 0) test = TestFunction(V) cell_volume = Function(V) self.cell_flux = Function(V) # Calculate cell volumes One = Function(V).assign(1) assemble(One*test*dx, tensor=cell_volume) # Get the velocity that is being used if type(self.velocity) is str: u = state_fields(self.velocity) else: u = self.velocity # Determine the component of the velocity if self.component == "whole": u_expr = u elif self.component == "vertical": u_expr = dot(u, domain.k)*domain.k elif self.component == "horizontal": u_expr = u - dot(u, domain.k)*domain.k # Work out which facet integrals to use if domain.mesh.extruded: dS_calc = dS_v + dS_h ds_calc = ds_v + ds_t + ds_b else: dS_calc = dS ds_calc = ds # Set up form for DG flux n = FacetNormal(domain.mesh) # Check if the velocity is a vector or scalar field if u.ufl_shape == (): un = 0.5*(-u_expr * n[0] + abs(-u_expr * n[0])) else: un = 0.5*(inner(-u_expr, n) + abs(inner(-u_expr, n))) self.cell_flux_form = 2*avg(un*test)*dS_calc + un*test*ds_calc # Final Courant number expression self.expr = self.cell_flux * domain.dt / cell_volume super().setup(domain, state_fields)
[docs] def compute(self): """Compute the diagnostic field from the current state.""" assemble(self.cell_flux_form, tensor=self.cell_flux) super().compute()
[docs] class Gradient(DiagnosticField): """Diagnostic for computing the gradient of fields.""" def __init__(self, name, space=None, method='solve'): """ Args: name (str): name of the field to compute the gradient of. space (:class:`FunctionSpace`, optional): the function space to evaluate the diagnostic field in. Defaults to None, in which case a default space will be chosen for this diagnostic. method (str, optional): a string specifying the method of evaluation for this diagnostic. Valid options are 'interpolate', 'project', 'assign' and 'solve'. Defaults to 'solve'. """ self.fname = name self.solve_implemented = True super().__init__(space=space, method=method, required_fields=(name,)) @property def name(self): """Gives the name of this diagnostic field.""" return self.fname+"_gradient"
[docs] def setup(self, domain, state_fields): """ Sets up the :class:`Function` for the diagnostic field. Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. """ f = state_fields(self.fname) mesh_dim = domain.mesh.geometric_dimension() try: field_dim = state_fields(self.fname).ufl_shape[0] except IndexError: field_dim = 1 shape = (mesh_dim, ) * field_dim space = TensorFunctionSpace(domain.mesh, "CG", 1, shape=shape, name=f'Tensor{field_dim}_CG1') if self.method != 'solve': self.expr = grad(f) super().setup(domain, state_fields, space=space) # Set up problem now that self.field has been set up if self.method == 'solve': test = TestFunction(space) trial = TrialFunction(space) n = FacetNormal(domain.mesh) a = inner(test, trial)*dx L = -inner(div(test), f)*dx if space.extruded: L += dot(dot(test, n), f)*(ds_t + ds_b) prob = LinearVariationalProblem(a, L, self.field) self.evaluator = LinearVariationalSolver(prob)
[docs] class Divergence(DiagnosticField): """Diagnostic for computing the divergence of vector-valued fields.""" def __init__(self, name='u', space=None, method='interpolate'): """ Args: name (str, optional): name of the field to compute the gradient of. Defaults to 'u', in which case this takes the divergence of the wind field. space (:class:`FunctionSpace`, optional): the function space to evaluate the diagnostic field in. Defaults to None, in which case the default space is the domain's DG space. method (str, optional): a string specifying the method of evaluation for this diagnostic. Valid options are 'interpolate', 'project', 'assign' and 'solve'. Defaults to 'interpolate'. """ self.fname = name super().__init__(space=space, method=method, required_fields=(self.fname,)) @property def name(self): """Gives the name of this diagnostic field.""" return self.fname+"_divergence"
[docs] def setup(self, domain, state_fields): """ Sets up the :class:`Function` for the diagnostic field. Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. """ f = state_fields(self.fname) self.expr = div(f) space = domain.spaces("DG") super().setup(domain, state_fields, space=space)
class VectorComponent(DiagnosticField): """Base diagnostic for orthogonal components of vector-valued fields.""" def __init__(self, name, space=None, method='interpolate'): """ Args: name (str): name of the field to compute the component of. space (:class:`FunctionSpace`, optional): the function space to evaluate the diagnostic field in. Defaults to None, in which case the default space is the domain's DG space. method (str, optional): a string specifying the method of evaluation for this diagnostic. Valid options are 'interpolate', 'project', 'assign' and 'solve'. Defaults to 'interpolate'. """ self.fname = name super().__init__(space=space, method=method, required_fields=(name,)) def setup(self, domain, state_fields, unit_vector): """ Sets up the :class:`Function` for the diagnostic field. Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. unit_vector (:class:`ufl.Expr`): the unit vector to extract the component for. This assumes an orthogonal coordinate system. """ f = state_fields(self.fname) self.expr = dot(f, unit_vector) super().setup(domain, state_fields)
[docs] class XComponent(VectorComponent): """The geocentric Cartesian x-component of a vector-valued field.""" @property def name(self): """Gives the name of this diagnostic field.""" return self.fname+"_x"
[docs] def setup(self, domain, state_fields): """ Sets up the :class:`Function` for the diagnostic field. Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. """ dim = domain.mesh.topological_dimension() e_x = as_vector([Constant(1.0)]+[Constant(0.0)]*(dim-1)) super().setup(domain, state_fields, e_x)
[docs] class YComponent(VectorComponent): """The geocentric Cartesian y-component of a vector-valued field.""" @property def name(self): """Gives the name of this diagnostic field.""" return self.fname+"_y"
[docs] def setup(self, domain, state_fields): """ Sets up the :class:`Function` for the diagnostic field. Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. """ assert domain.metadata['domain_type'] not in ['interval', 'vertical_slice'], \ f'Y-component diagnostic cannot be used with domain {domain.metadata["domain_type"]}' dim = domain.mesh.topological_dimension() e_y = as_vector([Constant(0.0), Constant(1.0)]+[Constant(0.0)]*(dim-2)) super().setup(domain, state_fields, e_y)
[docs] class ZComponent(VectorComponent): """The geocentric Cartesian z-component of a vector-valued field.""" @property def name(self): """Gives the name of this diagnostic field.""" return self.fname+"_z"
[docs] def setup(self, domain, state_fields): """ Sets up the :class:`Function` for the diagnostic field. Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. """ assert domain.metadata['domain_type'] not in ['interval', 'plane'], \ f'Z-component diagnostic cannot be used with domain {domain.metadata["domain_type"]}' dim = domain.mesh.topological_dimension() e_x = as_vector([Constant(0.0)]*(dim-1)+[Constant(1.0)]) super().setup(domain, state_fields, e_x)
class SphericalComponent(VectorComponent): """Base diagnostic for computing spherical-polar components of fields.""" def __init__(self, name, rotated_pole=None, space=None, method='interpolate'): """ Args: name (str): name of the field to compute the component of. rotated_pole (tuple of floats, optional): a tuple of floats (lon, lat) of the new pole, in the original coordinate system. The longitude and latitude must be expressed in radians. Defaults to None, corresponding to a pole of (0, pi/2). space (:class:`FunctionSpace`, optional): the function space to evaluate the diagnostic field in. Defaults to None, in which case the default space is the domain's DG space. method (str, optional): a string specifying the method of evaluation for this diagnostic. Valid options are 'interpolate', 'project', 'assign' and 'solve'. Defaults to 'interpolate'. """ self.rotated_pole = (0.0, pi/2) if rotated_pole is None else rotated_pole super().__init__(name=name, space=space, method=method) def _check_args(self, domain, field): """ Checks the validity of the domain and field for taking the spherical component diagnostic. Args: domain (:class:`Domain`): the model's domain object. field (:class:`Function`): the field to take the component of. """ # check geometric dimension is 3D if domain.mesh.geometric_dimension() != 3: raise ValueError('Spherical components only work when the geometric dimension is 3!') if np.prod(field.ufl_shape) != 3: raise ValueError('Components can only be found of a vector function space in 3D.')
[docs] class MeridionalComponent(SphericalComponent): """The meridional component of a vector-valued field.""" @property def name(self): """Gives the name of this diagnostic field.""" return self.fname+"_meridional"
[docs] def setup(self, domain, state_fields): """ Sets up the :class:`Function` for the diagnostic field. Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. """ f = state_fields(self.fname) self._check_args(domain, f) xyz = SpatialCoordinate(domain.mesh) _, e_lat, _ = rotated_lonlatr_vectors(xyz, self.rotated_pole) super().setup(domain, state_fields, e_lat)
[docs] class ZonalComponent(SphericalComponent): """The zonal component of a vector-valued field.""" @property def name(self): """Gives the name of this diagnostic field.""" return self.fname+"_zonal"
[docs] def setup(self, domain, state_fields): """ Sets up the :class:`Function` for the diagnostic field. Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. """ f = state_fields(self.fname) self._check_args(domain, f) xyz = SpatialCoordinate(domain.mesh) e_lon, _, _ = rotated_lonlatr_vectors(xyz, self.rotated_pole) super().setup(domain, state_fields, e_lon)
[docs] class RadialComponent(SphericalComponent): """The radial component of a vector-valued field.""" @property def name(self): """Gives the name of this diagnostic field.""" return self.fname+"_radial"
[docs] def setup(self, domain, state_fields): """ Sets up the :class:`Function` for the diagnostic field. Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. """ f = state_fields(self.fname) self._check_args(domain, f) xyz = SpatialCoordinate(domain.mesh) _, _, e_r = rotated_lonlatr_vectors(xyz, self.rotated_pole) super().setup(domain, state_fields, e_r)
# TODO: unify all energy diagnostics -- should be based on equation
[docs] class Energy(DiagnosticField): """Base diagnostic field for computing energy density fields."""
[docs] def kinetic(self, u, factor=None): """ Computes a kinetic energy term. Args: u (:class:`ufl.Expr`): the velocity variable. factor (:class:`ufl.Expr`, optional): factor to multiply the term by (e.g. a density variable). Defaults to None. Returns: :class:`ufl.Expr`: the kinetic energy term """ if factor is not None: energy = 0.5*factor*dot(u, u) else: energy = 0.5*dot(u, u) return energy
[docs] class KineticEnergy(Energy): """Diagnostic kinetic energy density.""" name = "KineticEnergy" def __init__(self, space=None, method='interpolate'): """ Args: space (:class:`FunctionSpace`, optional): the function space to evaluate the diagnostic field in. Defaults to None, in which case a default space will be chosen for this diagnostic. method (str, optional): a string specifying the method of evaluation for this diagnostic. Valid options are 'interpolate', 'project', 'assign' and 'solve'. Defaults to 'interpolate'. """ super().__init__(space=space, method=method, required_fields=("u"))
[docs] def setup(self, domain, state_fields): """ Sets up the :class:`Function` for the diagnostic field. Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. """ u = state_fields("u") self.expr = self.kinetic(u) super().setup(domain, state_fields)
[docs] class Sum(DiagnosticField): """Base diagnostic for computing the sum of two fields.""" def __init__(self, field_name1, field_name2): """ Args: field_name1 (str): the name of one field to be added. field_name2 (str): the name of the other field to be added. """ super().__init__(method='assign', required_fields=(field_name1, field_name2)) self.field_name1 = field_name1 self.field_name2 = field_name2 @property def name(self): """Gives the name of this diagnostic field.""" return self.field_name1+"_plus_"+self.field_name2
[docs] def setup(self, domain, state_fields): """ Sets up the :class:`Function` for the diagnostic field. Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. """ field1 = state_fields(self.field_name1) field2 = state_fields(self.field_name2) space = field1.function_space() self.expr = field1 + field2 super().setup(domain, state_fields, space=space)
[docs] class Difference(DiagnosticField): """Base diagnostic for calculating the difference between two fields.""" def __init__(self, field_name1, field_name2): """ Args: field_name1 (str): the name of the field to be subtracted from. field_name2 (str): the name of the field to be subtracted. """ super().__init__(method='assign', required_fields=(field_name1, field_name2)) self.field_name1 = field_name1 self.field_name2 = field_name2 @property def name(self): """Gives the name of this diagnostic field.""" return self.field_name1+"_minus_"+self.field_name2
[docs] def setup(self, domain, state_fields): """ Sets up the :class:`Function` for the diagnostic field. Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. """ field1 = state_fields(self.field_name1) field2 = state_fields(self.field_name2) self.expr = field1 - field2 space = field1.function_space() super().setup(domain, state_fields, space=space)
[docs] class SteadyStateError(Difference): """Base diagnostic for computing the steady-state error in a field.""" def __init__(self, name): """ Args: name (str): name of the field to take the steady-state error of. """ self.field_name1 = name self.field_name2 = name+'_init' DiagnosticField.__init__(self, method='assign', required_fields=(name, self.field_name2))
[docs] def setup(self, domain, state_fields): """ Sets up the :class:`Function` for the diagnostic field. Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. """ # Check if initial field already exists -- otherwise needs creating if not hasattr(state_fields, self.field_name2): field1 = state_fields(self.field_name1) field2 = state_fields(self.field_name2, space=field1.function_space(), pick_up=True, dump=False) # Attach state fields to self so that we can pick it up in compute self.state_fields = state_fields # The initial value for fields may not have already been set yet so we # postpone setting it until the compute method is called self.init_field_set = False else: field1 = state_fields(self.field_name1) field2 = state_fields(self.field_name2, space=field1.function_space(), pick_up=True, dump=False) # By default set this new field to the current value # This may be overwritten if picking up from a checkpoint field2.assign(field1) self.state_fields = state_fields self.init_field_set = True super().setup(domain, state_fields)
[docs] def compute(self): # The first time the compute method is called we set the initial field. # We do not want to do this if picking up from a checkpoint if not self.init_field_set: # Set initial field full_field = self.state_fields(self.field_name1) init_field = self.state_fields(self.field_name2) init_field.assign(full_field) self.init_field_set = True super().compute()
@property def name(self): """Gives the name of this diagnostic field.""" return self.field_name1+"_error"
[docs] class Perturbation(Difference): """Base diagnostic for computing perturbations from a reference profile.""" def __init__(self, name): """ Args: name (str): name of the field to take the perturbation of. """ self.field_name1 = name self.field_name2 = name+'_bar' DiagnosticField.__init__(self, method='assign', required_fields=(name, self.field_name2)) @property def name(self): """Gives the name of this diagnostic field.""" return self.field_name1+"_perturbation"
[docs] def setup(self, domain, state_fields): """ Sets up the :class:`Function` for the diagnostic field. Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. """ # Check if initial field already exists -- otherwise needs creating if not hasattr(state_fields, self.field_name2): field1 = state_fields(self.field_name1) _ = state_fields(self.field_name2, space=field1.function_space(), pick_up=True, dump=False) super().setup(domain, state_fields)
[docs] class TracerDensity(DiagnosticField): """Diagnostic for computing the density of a tracer. This is computed as the product of a mixing ratio and dry density""" @property def name(self): """Gives the name of this diagnostic field. This records the mixing ratio and density names, in case multiple tracer densities are used.""" return "TracerDensity_"+self.mixing_ratio_name+'_'+self.density_name def __init__(self, mixing_ratio_name, density_name, space=None, method='interpolate'): """ Args: mixing_ratio_name (str): the name of the tracer mixing ratio variable density_name (str): the name of the tracer density variable space (:class:`FunctionSpace`, optional): the function space to evaluate the diagnostic field in. Defaults to None, in which case a new space will be constructed for this diagnostic. This space will have enough a high enough degree to accurately compute the product of the mixing ratio and density. method (str, optional): a string specifying the method of evaluation for this diagnostic. Valid options are 'interpolate', 'project' and 'assign'. Defaults to 'interpolate'. """ super().__init__(space=space, method=method, required_fields=(mixing_ratio_name, density_name)) self.mixing_ratio_name = mixing_ratio_name self.density_name = density_name
[docs] def setup(self, domain, state_fields): """ Sets up the :class:`Function` for the diagnostic field. Args: domain (:class:`Domain`): the model's domain object. state_fields (:class:`StateFields`): the model's field container. """ m_X = state_fields(self.mixing_ratio_name) rho_d = state_fields(self.density_name) self.expr = m_X*rho_d if self.space is None: # Construct a space for the diagnostic that has enough # degrees to accurately capture the tracer density. This # will be the sum of the degrees of the individual mixing ratio # and density function spaces. m_X_space = m_X.function_space() rho_d_space = rho_d.function_space() if domain.spaces.extruded_mesh: # Extract the base horizontal and vertical elements # for the mixing ratio and density. m_X_horiz = m_X_space.ufl_element().sub_elements[0] m_X_vert = m_X_space.ufl_element().sub_elements[1] rho_d_horiz = rho_d_space.ufl_element().sub_elements[0] rho_d_vert = rho_d_space.ufl_element().sub_elements[1] horiz_degree = m_X_horiz.degree() + rho_d_horiz.degree() vert_degree = m_X_vert.degree() + rho_d_vert.degree() cell = domain.mesh._base_mesh.ufl_cell().cellname() horiz_elt = FiniteElement('DG', cell, horiz_degree) vert_elt = FiniteElement('DG', cell, vert_degree) elt = TensorProductElement(horiz_elt, vert_elt) else: m_X_degree = m_X_space.ufl_element().degree() rho_d_degree = rho_d_space.ufl_element().degree() degree = m_X_degree + rho_d_degree cell = domain.mesh.ufl_cell().cellname() elt = FiniteElement('DG', cell, degree) tracer_density_space = FunctionSpace(domain.mesh, elt, name='tracer_density_space') super().setup(domain, state_fields, space=tracer_density_space) else: super().setup(domain, state_fields)