Source code for gusto.time_discretisation.multi_level_schemes

"""Implements time discretisations with multiple time levels."""

from abc import abstractproperty

from firedrake import (Function, NonlinearVariationalProblem,
                       NonlinearVariationalSolver)
from firedrake.fml import replace_subject, all_terms, drop
from gusto.core.configuration import EmbeddedDGOptions, RecoveryOptions
from gusto.core.labels import time_derivative, source_label
from gusto.time_discretisation.time_discretisation import TimeDiscretisation


__all__ = ["BDF2", "Leapfrog", "AdamsMoulton", "AdamsBashforth"]


class MultilevelTimeDiscretisation(TimeDiscretisation):
    """Base class for multi-level timesteppers"""

    def __init__(self, domain, field_name=None, solver_parameters=None,
                 limiter=None, options=None, augmentation=None):
        """
        Args:
            domain (:class:`Domain`): the model's domain object, containing the
                mesh and the compatible function spaces.
            field_name (str, optional): name of the field to be evolved.
                Defaults to None.
            solver_parameters (dict, optional): dictionary of parameters to
                pass to the underlying solver. Defaults to None.
            limiter (:class:`Limiter` object, optional): a limiter to apply to
                the evolving field to enforce monotonicity. Defaults to None.
            options (:class:`AdvectionOptions`, optional): an object containing
                options to either be passed to the spatial discretisation, or
                to control the "wrapper" methods, such as Embedded DG or a
                recovery method. Defaults to None.
            augmentation (:class:`Augmentation`): allows the equation solved in
                this time discretisation to be augmented, for instances with
                extra terms of another auxiliary variable. Defaults to None.
        """
        if isinstance(options, (EmbeddedDGOptions, RecoveryOptions)):
            raise NotImplementedError("Only SUPG advection options have been implemented for this time discretisation")
        super().__init__(domain=domain, field_name=field_name,
                         solver_parameters=solver_parameters,
                         limiter=limiter, options=options,
                         augmentation=augmentation)
        self.initial_timesteps = 0

    @abstractproperty
    def nlevels(self):
        pass

    def setup(self, equation, apply_bcs=True, *active_labels):
        super().setup(equation=equation, apply_bcs=apply_bcs, *active_labels)
        for n in range(self.nlevels, 1, -1):
            setattr(self, "xnm%i" % (n-1), Function(self.fs))

        # Check that we do not have source terms
        for t in self.residual:
            if (t.has_label(source_label)):
                raise NotImplementedError("Source terms have not been implemented with multilevel schemes")


[docs] class BDF2(MultilevelTimeDiscretisation): """ Implements the implicit multistep BDF2 timestepping method. The BDF2 timestepping method for operator F is written as: \n y^(n+1) = (4/3)*y^n - (1/3)*y^(n-1) + (2/3)*dt*F[y^(n+1)] \n """ @property def nlevels(self): return 2 @property def res0(self): """Set up the discretisation's residual for initial BDF step.""" residual = self.residual.label_map( all_terms, map_if_true=replace_subject(self.x_out, old_idx=self.idx) ) residual = residual.label_map( lambda t: t.has_label(time_derivative), map_if_false=lambda t: self.dt * t ) r = self.residual.label_map( lambda t: t.has_label(time_derivative), map_if_true=replace_subject(self.x1, old_idx=self.idx), map_if_false=drop ) residual -= r return residual.form @property def res(self): """Set up the discretisation's residual.""" residual = self.residual.label_map( all_terms, map_if_true=replace_subject(self.x_out, old_idx=self.idx) ) residual = residual.label_map( lambda t: t.has_label(time_derivative), map_if_false=lambda t: (2 / 3) * self.dt * t ) xn = self.residual.label_map( lambda t: t.has_label(time_derivative), map_if_true=replace_subject(self.x1, old_idx=self.idx), map_if_false=drop ) xnm1 = self.residual.label_map( lambda t: t.has_label(time_derivative), map_if_true=replace_subject(self.xnm1, old_idx=self.idx), map_if_false=drop ) residual -= (4 / 3) * xn - (1 / 3) * xnm1 return residual.form @property def solver0(self): """Set up the problem and the solver for initial BDF step.""" # setup solver using residual (res) defined in derived class problem = NonlinearVariationalProblem(self.res0, self.x_out, bcs=self.bcs) solver_name = self.field_name+self.__class__.__name__+"0" return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters, options_prefix=solver_name) @property def solver(self): """Set up the problem and the solver for BDF2 steps.""" # setup solver using residual (res) defined in derived class problem = NonlinearVariationalProblem(self.res, self.x_out, bcs=self.bcs) solver_name = self.field_name+self.__class__.__name__ return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters, options_prefix=solver_name)
[docs] def apply(self, x_out, *x_in): """ Apply the time discretisation to advance one whole time step. Args: x_out (:class:`Function`): the output field to be computed. x_in (:class:`Function`): the input field(s). """ if self.initial_timesteps < self.nlevels-1: self.initial_timesteps += 1 solver = self.solver0 else: solver = self.solver self.xnm1.assign(x_in[0]) self.x1.assign(x_in[1]) # Set initial solver guess self.x_out.assign(x_in[1]) solver.solve() x_out.assign(self.x_out)
[docs] class Leapfrog(MultilevelTimeDiscretisation): """ Implements the multistep Leapfrog timestepping method. The Leapfrog timestepping method for operator F is written as: \n y^(n+1) = y^(n-1) + 2*dt*F[y^n] """ @property def nlevels(self): return 2 @property def res0(self): """Set up the discretisation's residual for initial forward euler step.""" residual = self.residual.label_map( lambda t: t.has_label(time_derivative), map_if_true=replace_subject(self.x_out, old_idx=self.idx), map_if_false=drop ) r = self.residual.label_map( all_terms, map_if_true=replace_subject(self.x1, old_idx=self.idx) ) r = r.label_map( lambda t: t.has_label(time_derivative), map_if_false=lambda t: -self.dt * t ) residual -= r return residual.form @property def rhs(self): """Set up the discretisation's residual for leapfrog steps.""" residual = self.residual.label_map( lambda t: t.has_label(time_derivative), map_if_true=replace_subject(self.x_out, old_idx=self.idx), map_if_false=drop ) r = self.residual.label_map( lambda t: t.has_label(time_derivative), map_if_false=replace_subject(self.x1, old_idx=self.idx) ) r = r.label_map( lambda t: t.has_label(time_derivative), map_if_true=replace_subject(self.xnm1, old_idx=self.idx), map_if_false=lambda t: -2.0 * self.dt * t ) residual -= r return residual.form @property def solver0(self): """Set up the problem and the solver for initial forward euler step.""" # setup solver using residual (res) defined in derived class problem = NonlinearVariationalProblem(self.res0, self.x_out, bcs=self.bcs) solver_name = self.field_name+self.__class__.__name__+"0" return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters, options_prefix=solver_name) @property def solver(self): """Set up the problem and the solver for leapfrog steps.""" # setup solver using residual (res) defined in derived class problem = NonlinearVariationalProblem(self.res, self.x_out, bcs=self.bcs) solver_name = self.field_name+self.__class__.__name__ return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters, options_prefix=solver_name)
[docs] def apply(self, x_out, *x_in): """ Apply the time discretisation to advance one whole time step. Args: x_out (:class:`Function`): the output field to be computed. x_in (:class:`Function`): the input field(s). """ if self.initial_timesteps < self.nlevels-1: self.initial_timesteps += 1 solver = self.solver0 else: solver = self.solver self.xnm1.assign(x_in[0]) self.x1.assign(x_in[1]) # Set initial solver guess self.x_out.assign(x_in[1]) solver.solve() x_out.assign(self.x_out)
[docs] class AdamsBashforth(MultilevelTimeDiscretisation): """ Implements the explicit multistep Adams-Bashforth timestepping method of general order up to 5. The general AB timestepping method for operator F is written as: \n y^(n+1) = y^n + dt*(b_0*F[y^(n)] + b_1*F[y^(n-1)] + b_2*F[y^(n-2)] + b_3*F[y^(n-3)] + b_4*F[y^(n-4)]) """ def __init__(self, domain, order, field_name=None, solver_parameters=None, options=None, augmentation=None): """ Args: domain (:class:`Domain`): the model's domain object, containing the mesh and the compatible function spaces. field_name (str, optional): name of the field to be evolved. Defaults to None. order (float, optional): order of scheme solver_parameters (dict, optional): dictionary of parameters to pass to the underlying solver. Defaults to None. options (:class:`AdvectionOptions`, optional): an object containing options to either be passed to the spatial discretisation, or to control the "wrapper" methods, such as Embedded DG or a recovery method. Defaults to None. augmentation (:class:`Augmentation`): allows the equation solved in this time discretisation to be augmented, for instances with extra terms of another auxiliary variable. Defaults to None. Raises: ValueError: if order is not provided, or is in incorrect range. """ if (order > 5 or order < 1): raise ValueError("Adams-Bashforth of order greater than 5 not implemented") if isinstance(options, (EmbeddedDGOptions, RecoveryOptions)): raise NotImplementedError("Only SUPG advection options have been implemented for this time discretisation") super().__init__(domain, field_name, solver_parameters=solver_parameters, options=options, augmentation=augmentation) self.order = order
[docs] def setup(self, equation, apply_bcs=True, *active_labels): super().setup(equation=equation, apply_bcs=apply_bcs, *active_labels) self.x = [Function(self.fs) for i in range(self.nlevels)] if (self.order == 1): self.b = [1.0] elif (self.order == 2): self.b = [-(1.0/2.0), (3.0/2.0)] elif (self.order == 3): self.b = [(5.0)/(12.0), -(16.0)/(12.0), (23.0)/(12.0)] elif (self.order == 4): self.b = [-(9.0)/(24.0), (37.0)/(24.0), -(59.0)/(24.0), (55.0)/(24.0)] elif (self.order == 5): self.b = [(251.0)/(720.0), -(1274.0)/(720.0), (2616.0)/(720.0), -(2774.0)/(720.0), (2901.0)/(720.0)]
@property def nlevels(self): return self.order @property def res0(self): """Set up the discretisation's residual for initial forward euler step.""" residual = self.residual.label_map( lambda t: t.has_label(time_derivative), map_if_true=replace_subject(self.x_out, old_idx=self.idx), map_if_false=drop ) r = self.residual.label_map( all_terms, map_if_true=replace_subject(self.x[-1], old_idx=self.idx) ) r = r.label_map( lambda t: t.has_label(time_derivative), map_if_false=lambda t: -self.dt * t ) residual -= r return residual.form @property def res(self): """Set up the discretisation's residual for Adams Bashforth steps.""" residual = self.residual.label_map( lambda t: t.has_label(time_derivative), map_if_true=replace_subject(self.x_out, old_idx=self.idx), map_if_false=drop ) r = self.residual.label_map( all_terms, map_if_true=replace_subject(self.x[-1], old_idx=self.idx) ) residual -= r.label_map( lambda t: t.has_label(time_derivative), map_if_false=lambda t: -self.b[-1] * self.dt * t ) for n in range(self.nlevels - 1): rtemp = self.residual.label_map( lambda t: t.has_label(time_derivative), map_if_true=drop, map_if_false=replace_subject(self.x[n], old_idx=self.idx) ) rtemp = rtemp.label_map( lambda t: t.has_label(time_derivative), map_if_false=lambda t: -self.dt * self.b[n] * t ) residual -= rtemp return residual.form @property def solver0(self): """Set up the problem and the solverfor initial forward euler step.""" # setup solver using residual (res) defined in derived class problem = NonlinearVariationalProblem(self.res0, self.x_out, bcs=self.bcs) solver_name = self.field_name+self.__class__.__name__+"0" return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters, options_prefix=solver_name) @property def solver(self): """Set up the problem and the solver for Adams Bashforth steps.""" # setup solver using residual (res) defined in derived class problem = NonlinearVariationalProblem(self.res, self.x_out, bcs=self.bcs) solver_name = self.field_name+self.__class__.__name__ return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters, options_prefix=solver_name)
[docs] def apply(self, x_out, *x_in): """ Apply the time discretisation to advance one whole time step. Args: x_out (:class:`Function`): the output field to be computed. x_in (:class:`Function`): the input field(s). """ if self.initial_timesteps < self.nlevels-1: self.initial_timesteps += 1 solver = self.solver0 else: solver = self.solver for n in range(self.nlevels): self.x[n].assign(x_in[n]) # Set initial solver guess self.x_out.assign(x_in[-1]) solver.solve() x_out.assign(self.x_out)
[docs] class AdamsMoulton(MultilevelTimeDiscretisation): """ Implements the implicit multistep Adams-Moulton timestepping method of general order up to 5 The general AM timestepping method for operator F is written as \n y^(n+1) = y^n + dt*(b_0*F[y^(n+1)] + b_1*F[y^(n)] + b_2*F[y^(n-1)] + b_3*F[y^(n-2)]) """ def __init__(self, domain, order, field_name=None, solver_parameters=None, options=None, augmentation=None): """ Args: domain (:class:`Domain`): the model's domain object, containing the mesh and the compatible function spaces. field_name (str, optional): name of the field to be evolved. Defaults to None. order (float, optional): order of scheme solver_parameters (dict, optional): dictionary of parameters to pass to the underlying solver. Defaults to None. options (:class:`AdvectionOptions`, optional): an object containing options to either be passed to the spatial discretisation, or to control the "wrapper" methods, such as Embedded DG or a recovery method. Defaults to None. augmentation (:class:`Augmentation`): allows the equation solved in this time discretisation to be augmented, for instances with extra terms of another auxiliary variable. Defaults to None. Raises: ValueError: if order is not provided, or is in incorrect range. """ if (order > 4 or order < 1): raise ValueError("Adams-Moulton of order greater than 5 not implemented") if isinstance(options, (EmbeddedDGOptions, RecoveryOptions)): raise NotImplementedError("Only SUPG advection options have been implemented for this time discretisation") if not solver_parameters: solver_parameters = {'ksp_type': 'gmres', 'pc_type': 'bjacobi', 'sub_pc_type': 'ilu'} super().__init__(domain, field_name, solver_parameters=solver_parameters, options=options, augmentation=augmentation) self.order = order
[docs] def setup(self, equation, apply_bcs=True, *active_labels): super().setup(equation=equation, apply_bcs=apply_bcs, *active_labels) self.x = [Function(self.fs) for i in range(self.nlevels)] if (self.order == 1): self.bl = (1.0/2.0) self.br = [(1.0/2.0)] elif (self.order == 2): self.bl = (5.0/12.0) self.br = [-(1.0/12.0), (8.0/12.0)] elif (self.order == 3): self.bl = (9.0/24.0) self.br = [(1.0/24.0), -(5.0/24.0), (19.0/24.0)] elif (self.order == 4): self.bl = (251.0/720.0) self.br = [-(19.0/720.0), (106.0/720.0), -(254.0/720.0), (646.0/720.0)]
@property def nlevels(self): return self.order @property def res0(self): """ Set up the discretisation's residual for initial trapezoidal step. """ residual = self.residual.label_map( all_terms, map_if_true=replace_subject(self.x_out, old_idx=self.idx) ) residual = residual.label_map( lambda t: t.has_label(time_derivative), map_if_false=lambda t: 0.5 * self.dt * t ) r = self.residual.label_map( all_terms, map_if_true=replace_subject(self.x[-1], old_idx=self.idx) ) r = r.label_map( lambda t: t.has_label(time_derivative), map_if_false=lambda t: -0.5 * self.dt * t ) residual -= r return residual.form @property def res(self): """Set up the time discretisation's residual for Adams Moulton steps.""" residual = self.residual.label_map( all_terms, map_if_true=replace_subject(self.x_out, old_idx=self.idx) ) residual = residual.label_map( lambda t: t.has_label(time_derivative), map_if_false=lambda t: self.bl * self.dt * t ) r = self.residual.label_map( all_terms, map_if_true=replace_subject(self.x[-1], old_idx=self.idx) ) residual -= r.label_map( lambda t: t.has_label(time_derivative), map_if_false=lambda t: -self.br[-1] * self.dt * t ) for n in range(self.nlevels - 1): rtemp = self.residual.label_map( lambda t: t.has_label(time_derivative), map_if_true=drop, map_if_false=replace_subject(self.x[n], old_idx=self.idx) ) rtemp = rtemp.label_map( lambda t: t.has_label(time_derivative), map_if_false=lambda t: -self.dt * self.br[n] * t ) residual -= rtemp return residual.form @property def solver0(self): """Set up the problem and the solver for initial trapezoidal step.""" # setup solver using residual (res) defined in derived class problem = NonlinearVariationalProblem(self.res0, self.x_out, bcs=self.bcs) solver_name = self.field_name+self.__class__.__name__+"0" return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters, options_prefix=solver_name) @property def solver(self): """Set up the problem and the solver for Adams Moulton steps.""" # setup solver using residual (res) defined in derived class problem = NonlinearVariationalProblem(self.res, self.x_out, bcs=self.bcs) solver_name = self.field_name+self.__class__.__name__ return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters, options_prefix=solver_name)
[docs] def apply(self, x_out, *x_in): """ Apply the time discretisation to advance one whole time step. Args: x_out (:class:`Function`): the output field to be computed. x_in (:class:`Function`): the input field(s). """ if self.initial_timesteps < self.nlevels-1: self.initial_timesteps += 1 solver = self.solver0 else: solver = self.solver for n in range(self.nlevels): self.x[n].assign(x_in[n]) # Set initial solver guess self.x_out.assign(x_in[-1]) solver.solve() x_out.assign(self.x_out)