Source code for gusto.time_discretisation.multi_level_schemes

"""Implements time discretisations with multiple time levels."""

from abc import abstractproperty

from firedrake import (Function, NonlinearVariationalProblem,
                       NonlinearVariationalSolver)
from firedrake.fml import replace_subject, all_terms, drop
from gusto.core.configuration import EmbeddedDGOptions, RecoveryOptions
from gusto.core.labels import time_derivative
from gusto.time_discretisation.time_discretisation import TimeDiscretisation


__all__ = ["BDF2", "Leapfrog", "AdamsMoulton", "AdamsBashforth"]


class MultilevelTimeDiscretisation(TimeDiscretisation):
    """Base class for multi-level timesteppers"""

    def __init__(self, domain, field_name=None, solver_parameters=None,
                 limiter=None, options=None):
        """
        Args:
            domain (:class:`Domain`): the model's domain object, containing the
                mesh and the compatible function spaces.
            field_name (str, optional): name of the field to be evolved.
                Defaults to None.
            solver_parameters (dict, optional): dictionary of parameters to
                pass to the underlying solver. Defaults to None.
            limiter (:class:`Limiter` object, optional): a limiter to apply to
                the evolving field to enforce monotonicity. Defaults to None.
            options (:class:`AdvectionOptions`, optional): an object containing
                options to either be passed to the spatial discretisation, or
                to control the "wrapper" methods, such as Embedded DG or a
                recovery method. Defaults to None.
        """
        if isinstance(options, (EmbeddedDGOptions, RecoveryOptions)):
            raise NotImplementedError("Only SUPG advection options have been implemented for this time discretisation")
        super().__init__(domain=domain, field_name=field_name,
                         solver_parameters=solver_parameters,
                         limiter=limiter, options=options)
        self.initial_timesteps = 0

    @abstractproperty
    def nlevels(self):
        pass

    def setup(self, equation, apply_bcs=True, *active_labels):
        super().setup(equation=equation, apply_bcs=apply_bcs, *active_labels)
        for n in range(self.nlevels, 1, -1):
            setattr(self, "xnm%i" % (n-1), Function(self.fs))


[docs] class BDF2(MultilevelTimeDiscretisation): """ Implements the implicit multistep BDF2 timestepping method. The BDF2 timestepping method for operator F is written as: \n y^(n+1) = (4/3)*y^n - (1/3)*y^(n-1) + (2/3)*dt*F[y^(n+1)] \n """ @property def nlevels(self): return 2 @property def lhs0(self): """Set up the discretisation's left hand side (the time derivative).""" l = self.residual.label_map( all_terms, map_if_true=replace_subject(self.x_out, old_idx=self.idx)) l = l.label_map(lambda t: t.has_label(time_derivative), map_if_false=lambda t: self.dt*t) return l.form @property def rhs0(self): """Set up the time discretisation's right hand side for inital BDF step.""" r = self.residual.label_map( lambda t: t.has_label(time_derivative), map_if_true=replace_subject(self.x1, old_idx=self.idx), map_if_false=drop) return r.form @property def lhs(self): """Set up the discretisation's left hand side (the time derivative).""" l = self.residual.label_map( all_terms, map_if_true=replace_subject(self.x_out, old_idx=self.idx)) l = l.label_map(lambda t: t.has_label(time_derivative), map_if_false=lambda t: (2/3)*self.dt*t) return l.form @property def rhs(self): """Set up the time discretisation's right hand side for BDF2 steps.""" xn = self.residual.label_map( lambda t: t.has_label(time_derivative), map_if_true=replace_subject(self.x1, old_idx=self.idx), map_if_false=drop) xnm1 = self.residual.label_map( lambda t: t.has_label(time_derivative), map_if_true=replace_subject(self.xnm1, old_idx=self.idx), map_if_false=drop) r = (4/3.) * xn - (1/3.) * xnm1 return r.form @property def solver0(self): """Set up the problem and the solver for initial BDF step.""" # setup solver using lhs and rhs defined in derived class problem = NonlinearVariationalProblem(self.lhs0-self.rhs0, self.x_out, bcs=self.bcs) solver_name = self.field_name+self.__class__.__name__+"0" return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters, options_prefix=solver_name) @property def solver(self): """Set up the problem and the solver for BDF2 steps.""" # setup solver using lhs and rhs defined in derived class problem = NonlinearVariationalProblem(self.lhs-self.rhs, self.x_out, bcs=self.bcs) solver_name = self.field_name+self.__class__.__name__ return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters, options_prefix=solver_name)
[docs] def apply(self, x_out, *x_in): """ Apply the time discretisation to advance one whole time step. Args: x_out (:class:`Function`): the output field to be computed. x_in (:class:`Function`): the input field(s). """ if self.initial_timesteps < self.nlevels-1: self.initial_timesteps += 1 solver = self.solver0 else: solver = self.solver self.xnm1.assign(x_in[0]) self.x1.assign(x_in[1]) solver.solve() x_out.assign(self.x_out)
[docs] class Leapfrog(MultilevelTimeDiscretisation): """ Implements the multistep Leapfrog timestepping method. The Leapfrog timestepping method for operator F is written as: \n y^(n+1) = y^(n-1) + 2*dt*F[y^n] """ @property def nlevels(self): return 2 @property def rhs0(self): """Set up the discretisation's right hand side for initial forward euler step.""" r = self.residual.label_map( all_terms, map_if_true=replace_subject(self.x1, old_idx=self.idx)) r = r.label_map(lambda t: t.has_label(time_derivative), map_if_false=lambda t: -self.dt*t) return r.form @property def lhs(self): """Set up the discretisation's left hand side (the time derivative).""" return super(Leapfrog, self).lhs @property def rhs(self): """Set up the discretisation's right hand side for leapfrog steps.""" r = self.residual.label_map( lambda t: t.has_label(time_derivative), map_if_false=replace_subject(self.x1, old_idx=self.idx)) r = r.label_map(lambda t: t.has_label(time_derivative), map_if_true=replace_subject(self.xnm1, old_idx=self.idx), map_if_false=lambda t: -2.0*self.dt*t) return r.form @property def solver0(self): """Set up the problem and the solver for initial forward euler step.""" # setup solver using lhs and rhs defined in derived class problem = NonlinearVariationalProblem(self.lhs-self.rhs0, self.x_out, bcs=self.bcs) solver_name = self.field_name+self.__class__.__name__+"0" return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters, options_prefix=solver_name) @property def solver(self): """Set up the problem and the solver for leapfrog steps.""" # setup solver using lhs and rhs defined in derived class problem = NonlinearVariationalProblem(self.lhs-self.rhs, self.x_out, bcs=self.bcs) solver_name = self.field_name+self.__class__.__name__ return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters, options_prefix=solver_name)
[docs] def apply(self, x_out, *x_in): """ Apply the time discretisation to advance one whole time step. Args: x_out (:class:`Function`): the output field to be computed. x_in (:class:`Function`): the input field(s). """ if self.initial_timesteps < self.nlevels-1: self.initial_timesteps += 1 solver = self.solver0 else: solver = self.solver self.xnm1.assign(x_in[0]) self.x1.assign(x_in[1]) solver.solve() x_out.assign(self.x_out)
[docs] class AdamsBashforth(MultilevelTimeDiscretisation): """ Implements the explicit multistep Adams-Bashforth timestepping method of general order up to 5. The general AB timestepping method for operator F is written as: \n y^(n+1) = y^n + dt*(b_0*F[y^(n)] + b_1*F[y^(n-1)] + b_2*F[y^(n-2)] + b_3*F[y^(n-3)] + b_4*F[y^(n-4)]) """ def __init__(self, domain, order, field_name=None, solver_parameters=None, options=None): """ Args: domain (:class:`Domain`): the model's domain object, containing the mesh and the compatible function spaces. field_name (str, optional): name of the field to be evolved. Defaults to None. order (float, optional): order of scheme solver_parameters (dict, optional): dictionary of parameters to pass to the underlying solver. Defaults to None. options (:class:`AdvectionOptions`, optional): an object containing options to either be passed to the spatial discretisation, or to control the "wrapper" methods, such as Embedded DG or a recovery method. Defaults to None. Raises: ValueError: if order is not provided, or is in incorrect range. """ if (order > 5 or order < 1): raise ValueError("Adams-Bashforth of order greater than 5 not implemented") if isinstance(options, (EmbeddedDGOptions, RecoveryOptions)): raise NotImplementedError("Only SUPG advection options have been implemented for this time discretisation") super().__init__(domain, field_name, solver_parameters=solver_parameters, options=options) self.order = order
[docs] def setup(self, equation, apply_bcs=True, *active_labels): super().setup(equation=equation, apply_bcs=apply_bcs, *active_labels) self.x = [Function(self.fs) for i in range(self.nlevels)] if (self.order == 1): self.b = [1.0] elif (self.order == 2): self.b = [-(1.0/2.0), (3.0/2.0)] elif (self.order == 3): self.b = [(5.0)/(12.0), -(16.0)/(12.0), (23.0)/(12.0)] elif (self.order == 4): self.b = [-(9.0)/(24.0), (37.0)/(24.0), -(59.0)/(24.0), (55.0)/(24.0)] elif (self.order == 5): self.b = [(251.0)/(720.0), -(1274.0)/(720.0), (2616.0)/(720.0), -(2774.0)/(720.0), (2901.0)/(720.0)]
@property def nlevels(self): return self.order @property def rhs0(self): """Set up the discretisation's right hand side for initial forward euler step.""" r = self.residual.label_map( all_terms, map_if_true=replace_subject(self.x[-1], old_idx=self.idx)) r = r.label_map(lambda t: t.has_label(time_derivative), map_if_false=lambda t: -self.dt*t) return r.form @property def lhs(self): """Set up the discretisation's left hand side (the time derivative).""" return super(AdamsBashforth, self).lhs @property def rhs(self): """Set up the discretisation's right hand side for Adams Bashforth steps.""" r = self.residual.label_map(all_terms, map_if_true=replace_subject(self.x[-1], old_idx=self.idx)) r = r.label_map(lambda t: t.has_label(time_derivative), map_if_false=lambda t: -self.b[-1]*self.dt*t) for n in range(self.nlevels-1): rtemp = self.residual.label_map(lambda t: t.has_label(time_derivative), map_if_true=drop, map_if_false=replace_subject(self.x[n], old_idx=self.idx)) rtemp = rtemp.label_map(lambda t: t.has_label(time_derivative), map_if_false=lambda t: -self.dt*self.b[n]*t) r += rtemp return r.form @property def solver0(self): """Set up the problem and the solverfor initial forward euler step.""" # setup solver using lhs and rhs defined in derived class problem = NonlinearVariationalProblem(self.lhs-self.rhs0, self.x_out, bcs=self.bcs) solver_name = self.field_name+self.__class__.__name__+"0" return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters, options_prefix=solver_name) @property def solver(self): """Set up the problem and the solver for Adams Bashforth steps.""" # setup solver using lhs and rhs defined in derived class problem = NonlinearVariationalProblem(self.lhs-self.rhs, self.x_out, bcs=self.bcs) solver_name = self.field_name+self.__class__.__name__ return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters, options_prefix=solver_name)
[docs] def apply(self, x_out, *x_in): """ Apply the time discretisation to advance one whole time step. Args: x_out (:class:`Function`): the output field to be computed. x_in (:class:`Function`): the input field(s). """ if self.initial_timesteps < self.nlevels-1: self.initial_timesteps += 1 solver = self.solver0 else: solver = self.solver for n in range(self.nlevels): self.x[n].assign(x_in[n]) solver.solve() x_out.assign(self.x_out)
[docs] class AdamsMoulton(MultilevelTimeDiscretisation): """ Implements the implicit multistep Adams-Moulton timestepping method of general order up to 5 The general AM timestepping method for operator F is written as \n y^(n+1) = y^n + dt*(b_0*F[y^(n+1)] + b_1*F[y^(n)] + b_2*F[y^(n-1)] + b_3*F[y^(n-2)]) """ def __init__(self, domain, order, field_name=None, solver_parameters=None, options=None): """ Args: domain (:class:`Domain`): the model's domain object, containing the mesh and the compatible function spaces. field_name (str, optional): name of the field to be evolved. Defaults to None. order (float, optional): order of scheme solver_parameters (dict, optional): dictionary of parameters to pass to the underlying solver. Defaults to None. options (:class:`AdvectionOptions`, optional): an object containing options to either be passed to the spatial discretisation, or to control the "wrapper" methods, such as Embedded DG or a recovery method. Defaults to None. Raises: ValueError: if order is not provided, or is in incorrect range. """ if (order > 4 or order < 1): raise ValueError("Adams-Moulton of order greater than 5 not implemented") if isinstance(options, (EmbeddedDGOptions, RecoveryOptions)): raise NotImplementedError("Only SUPG advection options have been implemented for this time discretisation") if not solver_parameters: solver_parameters = {'ksp_type': 'gmres', 'pc_type': 'bjacobi', 'sub_pc_type': 'ilu'} super().__init__(domain, field_name, solver_parameters=solver_parameters, options=options) self.order = order
[docs] def setup(self, equation, apply_bcs=True, *active_labels): super().setup(equation=equation, apply_bcs=apply_bcs, *active_labels) self.x = [Function(self.fs) for i in range(self.nlevels)] if (self.order == 1): self.bl = (1.0/2.0) self.br = [(1.0/2.0)] elif (self.order == 2): self.bl = (5.0/12.0) self.br = [-(1.0/12.0), (8.0/12.0)] elif (self.order == 3): self.bl = (9.0/24.0) self.br = [(1.0/24.0), -(5.0/24.0), (19.0/24.0)] elif (self.order == 4): self.bl = (251.0/720.0) self.br = [-(19.0/720.0), (106.0/720.0), -(254.0/720.0), (646.0/720.0)]
@property def nlevels(self): return self.order @property def rhs0(self): """ Set up the discretisation's right hand side for initial trapezoidal step. """ r = self.residual.label_map( all_terms, map_if_true=replace_subject(self.x[-1], old_idx=self.idx)) r = r.label_map(lambda t: t.has_label(time_derivative), map_if_false=lambda t: -0.5*self.dt*t) return r.form @property def lhs0(self): """ Set up the time discretisation's right hand side for initial trapezoidal step. """ l = self.residual.label_map( all_terms, map_if_true=replace_subject(self.x_out, old_idx=self.idx)) l = l.label_map(lambda t: t.has_label(time_derivative), map_if_false=lambda t: 0.5*self.dt*t) return l.form @property def lhs(self): """Set up the time discretisation's right hand side for Adams Moulton steps.""" l = self.residual.label_map( all_terms, map_if_true=replace_subject(self.x_out, old_idx=self.idx)) l = l.label_map(lambda t: t.has_label(time_derivative), map_if_false=lambda t: self.bl*self.dt*t) return l.form @property def rhs(self): """Set up the discretisation's right hand side for Adams Moulton steps.""" r = self.residual.label_map(all_terms, map_if_true=replace_subject(self.x[-1], old_idx=self.idx)) r = r.label_map(lambda t: t.has_label(time_derivative), map_if_false=lambda t: -self.br[-1]*self.dt*t) for n in range(self.nlevels-1): rtemp = self.residual.label_map(lambda t: t.has_label(time_derivative), map_if_true=drop, map_if_false=replace_subject(self.x[n], old_idx=self.idx)) rtemp = rtemp.label_map(lambda t: t.has_label(time_derivative), map_if_false=lambda t: -self.dt*self.br[n]*t) r += rtemp return r.form @property def solver0(self): """Set up the problem and the solver for initial trapezoidal step.""" # setup solver using lhs and rhs defined in derived class problem = NonlinearVariationalProblem(self.lhs0-self.rhs0, self.x_out, bcs=self.bcs) solver_name = self.field_name+self.__class__.__name__+"0" return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters, options_prefix=solver_name) @property def solver(self): """Set up the problem and the solver for Adams Moulton steps.""" # setup solver using lhs and rhs defined in derived class problem = NonlinearVariationalProblem(self.lhs-self.rhs, self.x_out, bcs=self.bcs) solver_name = self.field_name+self.__class__.__name__ return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters, options_prefix=solver_name)
[docs] def apply(self, x_out, *x_in): """ Apply the time discretisation to advance one whole time step. Args: x_out (:class:`Function`): the output field to be computed. x_in (:class:`Function`): the input field(s). """ if self.initial_timesteps < self.nlevels-1: self.initial_timesteps += 1 solver = self.solver0 else: solver = self.solver for n in range(self.nlevels): self.x[n].assign(x_in[n]) solver.solve() x_out.assign(self.x_out)