Source code for gusto.recovery.recovery

"""
Operators to recover fields from a discontinuous to a continuous function space.

The recovery operators provided in this module are used to restore continuity
in a discontinuous field, or to reconstruct a higher-order field from a lower-
order field, which can be used to improve the accuracy of lowest-order spatial
discretisations.
"""
from enum import Enum

import ufl
from firedrake import (BrokenElement, Constant, DirichletBC, FiniteElement,
                       Function, FunctionSpace, Interpolator, Projector,
                       SpatialCoordinate, TensorProductElement,
                       VectorFunctionSpace, as_vector, function, interval,
                       VectorElement)
from gusto.recovery import Averager
from .recovery_kernels import (BoundaryRecoveryExtruded, BoundaryRecoveryHCurl,
                               BoundaryGaussianElimination)


__all__ = ["BoundaryMethod", "BoundaryRecoverer", "Recoverer"]


[docs] class BoundaryMethod(Enum): """ Method for correcting the recovery at the domain boundaries. An enumerator object encoding methods for correcting boundary recovery: extruded: which corrects a scalar field on an extruded mesh at the top and bottom boundaries. \n hcurl: this corrects the recovery of a HDiv field into a HCurl space at the top and bottom boundaries of an extruded mesh. \n taylor: uses a Taylor expansion to correct the field at all the boundaries of the domain. Should only be used in Cartesian domains. """ extruded = 0 hcurl = 1 taylor = 2
[docs] class BoundaryRecoverer(object): """ Corrects values in domain boundary cells that have been recovered. An object that performs a `recovery` process at the domain boundaries that has second-order accuracy. This is necessary because the :class:`Averager` object does not recover a field with sufficient accuracy at the boundaries. The strategy is to expand the function at the boundary using a Taylor expansion. The quickest way to perform this is by using the analytic solution and a parloop. This is only implemented to recover to the CG1 function space. """ def __init__(self, x_inout, method=BoundaryMethod.extruded, eff_coords=None): """ Args: v_CG1 (:class:`Function`): the continuous function after the first recovery is performed. Should be in a first-order continuous :class:`FunctionSpace`. This is already correct on the interior of the domain. It will be returned with corrected values. method (:class:`BoundaryMethod`, optional): enumerator specifying the method to use. Defaults to `BoundaryMethod.extruded`. eff_coords (:class:`Function`, optional): the effective coordinates corresponding to the initial recovery process. Should be in the :class:`VectorFunctionSpace` corresponding to the space of the `v_DG1` variable. This must be provided for the Taylor expansion boundary method. Defaults to None. Raises: ValueError: if the `v_CG1` field is in a space that is not CG1 when using the Taylor boundary method. ValueError: if the `v_DG1` field is in a space that is not the DG1 equispaced space when using the Taylor boundary method. ValueError: if the effective coordinates are not provided when using the Taylor expansion boundary method. ValueError: using the extruded or hcurl boundary methods with a non-extruded mesh. """ self.x_inout = x_inout self.method = method self.eff_coords = eff_coords V_inout = x_inout.function_space() mesh = V_inout.mesh() # -------------------------------------------------------------------- # # Checks # -------------------------------------------------------------------- # if self.method == BoundaryMethod.taylor: CG1 = FunctionSpace(mesh, "CG", 1) if x_inout.function_space() != CG1: raise ValueError("This boundary recovery method requires v1 to be in CG1.") if eff_coords is None: raise ValueError('Need eff_coords field for Taylor expansion boundary method') elif self.method in [BoundaryMethod.extruded, BoundaryMethod.hcurl]: # check that mesh is valid -- must be an extruded mesh if not V_inout.extruded: raise ValueError('The extruded boundary method only works on extruded meshes') else: raise ValueError("Boundary method should be a Boundary Method Enum object.") # -------------------------------------------------------------------- # # Initalisation for different boundary methods # -------------------------------------------------------------------- # if self.method == BoundaryMethod.extruded: # create field to temporarily hold values self.x_tmp = Function(V_inout) self.kernel = BoundaryRecoveryExtruded(V_inout) elif self.method == BoundaryMethod.hcurl: # create field to temporarily hold values self.x_tmp = Function(V_inout) self.kernel = BoundaryRecoveryHCurl(V_inout) elif self.method == BoundaryMethod.taylor: # Create DG1 space ----------------------------------------------- # if V_inout.extruded: cell = mesh._base_mesh.ufl_cell().cellname() DG1_hori_elt = FiniteElement("DG", cell, 1, variant="equispaced") DG1_vert_elt = FiniteElement("DG", interval, 1, variant="equispaced") DG1_element = TensorProductElement(DG1_hori_elt, DG1_vert_elt) else: cell = mesh.ufl_cell().cellname() DG1_element = FiniteElement("DG", cell, 1, variant="equispaced") vec_DG1 = VectorFunctionSpace(mesh, DG1_element) # Create coordinates --------------------------------------------- # coords = SpatialCoordinate(mesh) self.act_coords = Function(vec_DG1).project(coords) # actual coordinates self.eff_coords = eff_coords # effective coordinates self.on_exterior = find_domain_boundaries(mesh) self.num_ext = find_domain_boundaries(mesh) # Make operators used in process --------------------------------- # V_broken = FunctionSpace(mesh, BrokenElement(V_inout.ufl_element())) self.x_DG1_wrong = Function(V_broken) self.x_DG1_correct = Function(V_broken) self.interpolator = Interpolator(self.x_inout, self.x_DG1_wrong) self.averager = Averager(self.x_DG1_correct, self.x_inout) self.kernel = BoundaryGaussianElimination(V_broken)
[docs] def apply(self): """Applies the boundary recovery process.""" if self.method == BoundaryMethod.taylor: self.interpolator.interpolate() self.kernel.apply(self.x_DG1_wrong, self.x_DG1_correct, self.act_coords, self.eff_coords, self.num_ext) self.averager.project() else: self.x_tmp.assign(self.x_inout) self.kernel.apply(self.x_inout, self.x_tmp)
[docs] class Recoverer(object): """ Recovers a field from a low-order space to a higher-order space. An object that 'recovers' a field from a low-order space (e.g. DG0) into a higher-order space (e.g. CG1). This first interpolates or projects the field into the broken (fully-discontinuous) form of the target higher-order space, then uses the :class:`Averager` to restore continuity. This may not be accurate at domain boundaries, so if specified, the field will then be corrected using the :class:`BoundaryRecoverer`. """ def __init__(self, x_in, x_out, method='interpolate', boundary_method=None): """ Args: x_in (:class:`Function`): the field or :class:`ufl.Expr` to project. For instance this could be in the DG0 space. x_out (:class:`Function`): to field to put the result in. This could for instance lie in the CG1 space. method (str, optional): method for obtaining the field in the broken space. Must be 'interpolate' or 'project'. Defaults to 'interpolate'. boundary_method (:class:`BoundaryMethod`, optional): enumerator specifying the boundary method to use. Defaults to None. Raises: ValueError: the `VDG` argument must be specified if the `boundary_method` is not None. """ # check if v_in is valid if not isinstance(x_in, (ufl.core.expr.Expr, function.Function)): raise ValueError("Can only recover UFL expression or Functions not '%s'" % type(x_in)) self.x_out = x_out V_out = x_out.function_space() mesh = V_out.mesh() rec_elt = V_out.ufl_element() # -------------------------------------------------------------------- # # Set up broken space # -------------------------------------------------------------------- # self.vector_function_space = isinstance(rec_elt, VectorElement) if self.vector_function_space: # VectorElement has to be on the outside # so first need to get underlying finite element brok_elt = VectorElement(BrokenElement(rec_elt.sub_elements[0])) else: # Otherwise we can immediately get broken element brok_elt = BrokenElement(rec_elt) V_brok = FunctionSpace(mesh, brok_elt) # -------------------------------------------------------------------- # # Set up interpolation / projection # -------------------------------------------------------------------- # x_brok = Function(V_brok) self.method = method if method == 'interpolate': self.broken_op = Interpolator(x_in, x_brok) elif method == 'project': self.broken_op = Projector(x_in, x_brok) else: raise ValueError(f'Valid methods are "interpolate" or "project", not {method}') self.averager = Averager(x_brok, self.x_out) # -------------------------------------------------------------------- # # Set up boundary recovery # -------------------------------------------------------------------- # self.boundary_method = boundary_method # check boundary method options are valid if boundary_method is not None: if boundary_method not in [BoundaryMethod.extruded, BoundaryMethod.taylor, BoundaryMethod.hcurl]: raise TypeError("Boundary method must be a BoundaryMethod Enum object.") # now specify things that we'll need if we are doing boundary recovery if boundary_method == BoundaryMethod.extruded: # check dimensions if self.vector_function_space: raise ValueError('This method only works for scalar functions.') self.boundary_recoverer = BoundaryRecoverer(self.x_out, method=BoundaryMethod.extruded) elif boundary_method == BoundaryMethod.hcurl: self.boundary_recoverer = BoundaryRecoverer(self.x_out, method=BoundaryMethod.hcurl) else: eff_coords = find_eff_coords(x_in.function_space()) # For scalar functions, just set up boundary recoverer and return field into x_brok if not self.vector_function_space: self.boundary_recoverer = BoundaryRecoverer(self.x_out, method=BoundaryMethod.taylor, eff_coords=eff_coords) else: # Must set up scalar functions for each component CG1 = FunctionSpace(mesh, "CG", 1) # now, break the problem down into components x_out_scalars = [] self.boundary_recoverers = [] self.interpolate_to_scalars = [] self.extra_averagers = [] # the boundary recoverer needs to be done on a scalar fields # so need to extract component and restore it after the boundary recovery is done for i in range(V_out.value_size): x_out_scalars.append(Function(CG1)) self.interpolate_to_scalars.append(Interpolator(self.x_out[i], x_out_scalars[i])) self.boundary_recoverers.append(BoundaryRecoverer(x_out_scalars[i], method=BoundaryMethod.taylor, eff_coords=eff_coords[i])) self.interpolate_to_vector = Interpolator(as_vector(x_out_scalars), self.x_out)
[docs] def project(self): """Perform the whole recovery step.""" # Initial averaging step self.broken_op.project() if self.method == 'project' else self.broken_op.interpolate() self.averager.project() # Boundary recovery if self.boundary_method is not None: # For vector elements, treat each component separately if self.vector_function_space: for (interpolate_to_scalar, boundary_recoverer) \ in zip(self.interpolate_to_scalars, self.boundary_recoverers): interpolate_to_scalar.interpolate() # Correct at boundaries boundary_recoverer.apply() # Combine the components to obtain the vector field self.interpolate_to_vector.interpolate() else: # Extrapolate at boundaries self.boundary_recoverer.apply() return self.x_out
def find_eff_coords(V0): """ Find the effective coordinates corresponding to a recovery process. Takes a function in a space `V0` and returns the effective coordinates, in an equispaced vector DG1 space, of a recovery into a CG1 field. This is for use with the :class:`BoundaryRecoverer`, as it facilitates the Gaussian elimination used to get second-order recovery at boundaries. If `V0` is a vector function space, this returns an array of coordinates for each component. Geocentric Cartesian coordinates are returned. Args: V0 (:class:`FunctionSpace`): the function space of the original field before the recovery process. """ mesh = V0.mesh() if V0.extruded: cell = mesh._base_mesh.ufl_cell().cellname() DG1_hori_elt = FiniteElement("DG", cell, 1, variant="equispaced") DG1_vert_elt = FiniteElement("DG", interval, 1, variant="equispaced") DG1_element = TensorProductElement(DG1_hori_elt, DG1_vert_elt) else: cell = mesh.ufl_cell().cellname() DG1_element = FiniteElement("DG", cell, 1, variant="equispaced") vec_CG1 = VectorFunctionSpace(mesh, "CG", 1) vec_DG1 = VectorFunctionSpace(mesh, DG1_element) x = SpatialCoordinate(mesh) if isinstance(V0.ufl_element(), VectorElement) or V0.ufl_element().value_size > 1: eff_coords_list = [] V0_coords_list = [] # treat this separately for each component for i in range(V0.ufl_element().value_size): # fill an d-dimensional list with i-th coordinate x_list = [x[i] for j in range(V0.ufl_element().value_size)] # the i-th element in V0_coords_list is a vector with all components the i-th coord ith_V0_coords = Function(V0).project(as_vector(x_list)) V0_coords_list.append(ith_V0_coords) for i in range(V0.ufl_element().value_size): # slice through V0_coords_list to obtain the coords of the DOFs for that component x_list = [V0_coords[i] for V0_coords in V0_coords_list] # average these to find effective coords in CG1 V0_coords_in_DG1 = Function(vec_DG1).interpolate(as_vector(x_list)) eff_coords_in_CG1 = Function(vec_CG1) eff_coords_averager = Averager(V0_coords_in_DG1, eff_coords_in_CG1) eff_coords_averager.project() # obtain these in DG1 eff_coords_in_DG1 = Function(vec_DG1).interpolate(eff_coords_in_CG1) eff_coords_list.append(correct_eff_coords(eff_coords_in_DG1)) return eff_coords_list else: # find the coordinates at DOFs in V0 vec_V0 = VectorFunctionSpace(mesh, V0.ufl_element()) V0_coords = Function(vec_V0).project(x) # average these to find effective coords in CG1 V0_coords_in_DG1 = Function(vec_DG1).interpolate(V0_coords) eff_coords_in_CG1 = Function(vec_CG1) eff_coords_averager = Averager(V0_coords_in_DG1, eff_coords_in_CG1) eff_coords_averager.project() # obtain these in DG1 eff_coords_in_DG1 = Function(vec_DG1).interpolate(eff_coords_in_CG1) return correct_eff_coords(eff_coords_in_DG1) def correct_eff_coords(eff_coords): """ Corrects the effective coordinates. This corrects the effective coordinates that have been calculated by simply averaging, as they may which will not be correct for periodic meshes. Args: eff_coords (:class:`Function`): the effective coordinates field in the vector equispaced DG1 :class:`FunctionSpace`. """ mesh = eff_coords.function_space().mesh() vec_CG1 = VectorFunctionSpace(mesh, "CG", 1) if vec_CG1.extruded: cell = mesh._base_mesh.ufl_cell().cellname() DG1_hori_elt = FiniteElement("DG", cell, 1, variant="equispaced") DG1_vert_elt = FiniteElement("DG", interval, 1, variant="equispaced") DG1_element = TensorProductElement(DG1_hori_elt, DG1_vert_elt) else: cell = mesh.ufl_cell().cellname() DG1_element = FiniteElement("DG", cell, 1, variant="equispaced") vec_DG1 = VectorFunctionSpace(mesh, DG1_element) x = SpatialCoordinate(mesh) if eff_coords.function_space() != vec_DG1: raise ValueError('eff_coords needs to be in the vector DG1 space') # obtain different coords in DG1 DG1_coords = Function(vec_DG1).interpolate(x) CG1_coords_from_DG1 = Function(vec_CG1) averager = Averager(DG1_coords, CG1_coords_from_DG1) averager.project() DG1_coords_from_averaged_CG1 = Function(vec_DG1).interpolate(CG1_coords_from_DG1) DG1_coords_diff = Function(vec_DG1).interpolate(DG1_coords - DG1_coords_from_averaged_CG1) # interpolate coordinates, adjusting those different coordinates adjusted_coords = Function(vec_DG1) adjusted_coords.interpolate(eff_coords + DG1_coords_diff) return adjusted_coords def find_domain_boundaries(mesh): """ Find the cells on the domain boundaries. Makes a field in the scalar DG0 :class:`FunctionSpace`, whose values are 0 everywhere except for in cells on the boundary of the domain, where the values are 1.0. This allows boundary cells to be identified. Args: mesh (:class:`Mesh`): the mesh. """ DG0 = FunctionSpace(mesh, "DG", 0) CG1 = FunctionSpace(mesh, "CG", 1) on_exterior_DG0 = Function(DG0) on_exterior_CG1 = Function(CG1) # we get values in CG1 initially as DG0 will not work for triangular elements bc_codes = ['on_boundary', 'top', 'bottom'] bcs = [DirichletBC(CG1, Constant(1.0), bc_code) for bc_code in bc_codes] for bc in bcs: try: bc.apply(on_exterior_CG1) except ValueError: pass on_exterior_DG0.interpolate(on_exterior_CG1) return on_exterior_DG0