"""Implements time discretisations with multiple time levels."""
from abc import abstractproperty
from firedrake import (Function, NonlinearVariationalProblem,
NonlinearVariationalSolver)
from firedrake.fml import replace_subject, all_terms, drop
from gusto.core.configuration import EmbeddedDGOptions, RecoveryOptions
from gusto.core.labels import time_derivative
from gusto.time_discretisation.time_discretisation import TimeDiscretisation
__all__ = ["BDF2", "Leapfrog", "AdamsMoulton", "AdamsBashforth"]
class MultilevelTimeDiscretisation(TimeDiscretisation):
"""Base class for multi-level timesteppers"""
def __init__(self, domain, field_name=None, solver_parameters=None,
limiter=None, options=None):
"""
Args:
domain (:class:`Domain`): the model's domain object, containing the
mesh and the compatible function spaces.
field_name (str, optional): name of the field to be evolved.
Defaults to None.
solver_parameters (dict, optional): dictionary of parameters to
pass to the underlying solver. Defaults to None.
limiter (:class:`Limiter` object, optional): a limiter to apply to
the evolving field to enforce monotonicity. Defaults to None.
options (:class:`AdvectionOptions`, optional): an object containing
options to either be passed to the spatial discretisation, or
to control the "wrapper" methods, such as Embedded DG or a
recovery method. Defaults to None.
"""
if isinstance(options, (EmbeddedDGOptions, RecoveryOptions)):
raise NotImplementedError("Only SUPG advection options have been implemented for this time discretisation")
super().__init__(domain=domain, field_name=field_name,
solver_parameters=solver_parameters,
limiter=limiter, options=options)
self.initial_timesteps = 0
@abstractproperty
def nlevels(self):
pass
def setup(self, equation, apply_bcs=True, *active_labels):
super().setup(equation=equation, apply_bcs=apply_bcs, *active_labels)
for n in range(self.nlevels, 1, -1):
setattr(self, "xnm%i" % (n-1), Function(self.fs))
[docs]
class BDF2(MultilevelTimeDiscretisation):
"""
Implements the implicit multistep BDF2 timestepping method.
The BDF2 timestepping method for operator F is written as: \n
y^(n+1) = (4/3)*y^n - (1/3)*y^(n-1) + (2/3)*dt*F[y^(n+1)] \n
"""
@property
def nlevels(self):
return 2
@property
def lhs0(self):
"""Set up the discretisation's left hand side (the time derivative)."""
l = self.residual.label_map(
all_terms,
map_if_true=replace_subject(self.x_out, old_idx=self.idx))
l = l.label_map(lambda t: t.has_label(time_derivative),
map_if_false=lambda t: self.dt*t)
return l.form
@property
def rhs0(self):
"""Set up the time discretisation's right hand side for inital BDF step."""
r = self.residual.label_map(
lambda t: t.has_label(time_derivative),
map_if_true=replace_subject(self.x1, old_idx=self.idx),
map_if_false=drop)
return r.form
@property
def lhs(self):
"""Set up the discretisation's left hand side (the time derivative)."""
l = self.residual.label_map(
all_terms,
map_if_true=replace_subject(self.x_out, old_idx=self.idx))
l = l.label_map(lambda t: t.has_label(time_derivative),
map_if_false=lambda t: (2/3)*self.dt*t)
return l.form
@property
def rhs(self):
"""Set up the time discretisation's right hand side for BDF2 steps."""
xn = self.residual.label_map(
lambda t: t.has_label(time_derivative),
map_if_true=replace_subject(self.x1, old_idx=self.idx),
map_if_false=drop)
xnm1 = self.residual.label_map(
lambda t: t.has_label(time_derivative),
map_if_true=replace_subject(self.xnm1, old_idx=self.idx),
map_if_false=drop)
r = (4/3.) * xn - (1/3.) * xnm1
return r.form
@property
def solver0(self):
"""Set up the problem and the solver for initial BDF step."""
# setup solver using lhs and rhs defined in derived class
problem = NonlinearVariationalProblem(self.lhs0-self.rhs0, self.x_out, bcs=self.bcs)
solver_name = self.field_name+self.__class__.__name__+"0"
return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters,
options_prefix=solver_name)
@property
def solver(self):
"""Set up the problem and the solver for BDF2 steps."""
# setup solver using lhs and rhs defined in derived class
problem = NonlinearVariationalProblem(self.lhs-self.rhs, self.x_out, bcs=self.bcs)
solver_name = self.field_name+self.__class__.__name__
return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters,
options_prefix=solver_name)
[docs]
def apply(self, x_out, *x_in):
"""
Apply the time discretisation to advance one whole time step.
Args:
x_out (:class:`Function`): the output field to be computed.
x_in (:class:`Function`): the input field(s).
"""
if self.initial_timesteps < self.nlevels-1:
self.initial_timesteps += 1
solver = self.solver0
else:
solver = self.solver
self.xnm1.assign(x_in[0])
self.x1.assign(x_in[1])
solver.solve()
x_out.assign(self.x_out)
[docs]
class Leapfrog(MultilevelTimeDiscretisation):
"""
Implements the multistep Leapfrog timestepping method.
The Leapfrog timestepping method for operator F is written as: \n
y^(n+1) = y^(n-1) + 2*dt*F[y^n]
"""
@property
def nlevels(self):
return 2
@property
def rhs0(self):
"""Set up the discretisation's right hand side for initial forward euler step."""
r = self.residual.label_map(
all_terms,
map_if_true=replace_subject(self.x1, old_idx=self.idx))
r = r.label_map(lambda t: t.has_label(time_derivative),
map_if_false=lambda t: -self.dt*t)
return r.form
@property
def lhs(self):
"""Set up the discretisation's left hand side (the time derivative)."""
return super(Leapfrog, self).lhs
@property
def rhs(self):
"""Set up the discretisation's right hand side for leapfrog steps."""
r = self.residual.label_map(
lambda t: t.has_label(time_derivative),
map_if_false=replace_subject(self.x1, old_idx=self.idx))
r = r.label_map(lambda t: t.has_label(time_derivative),
map_if_true=replace_subject(self.xnm1, old_idx=self.idx),
map_if_false=lambda t: -2.0*self.dt*t)
return r.form
@property
def solver0(self):
"""Set up the problem and the solver for initial forward euler step."""
# setup solver using lhs and rhs defined in derived class
problem = NonlinearVariationalProblem(self.lhs-self.rhs0, self.x_out, bcs=self.bcs)
solver_name = self.field_name+self.__class__.__name__+"0"
return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters,
options_prefix=solver_name)
@property
def solver(self):
"""Set up the problem and the solver for leapfrog steps."""
# setup solver using lhs and rhs defined in derived class
problem = NonlinearVariationalProblem(self.lhs-self.rhs, self.x_out, bcs=self.bcs)
solver_name = self.field_name+self.__class__.__name__
return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters,
options_prefix=solver_name)
[docs]
def apply(self, x_out, *x_in):
"""
Apply the time discretisation to advance one whole time step.
Args:
x_out (:class:`Function`): the output field to be computed.
x_in (:class:`Function`): the input field(s).
"""
if self.initial_timesteps < self.nlevels-1:
self.initial_timesteps += 1
solver = self.solver0
else:
solver = self.solver
self.xnm1.assign(x_in[0])
self.x1.assign(x_in[1])
solver.solve()
x_out.assign(self.x_out)
[docs]
class AdamsBashforth(MultilevelTimeDiscretisation):
"""
Implements the explicit multistep Adams-Bashforth timestepping
method of general order up to 5.
The general AB timestepping method for operator F is written as: \n
y^(n+1) = y^n + dt*(b_0*F[y^(n)] + b_1*F[y^(n-1)] + b_2*F[y^(n-2)] + b_3*F[y^(n-3)] + b_4*F[y^(n-4)])
"""
def __init__(self, domain, order, field_name=None,
solver_parameters=None, options=None):
"""
Args:
domain (:class:`Domain`): the model's domain object, containing the
mesh and the compatible function spaces.
field_name (str, optional): name of the field to be evolved.
Defaults to None.
order (float, optional): order of scheme
solver_parameters (dict, optional): dictionary of parameters to
pass to the underlying solver. Defaults to None.
options (:class:`AdvectionOptions`, optional): an object containing
options to either be passed to the spatial discretisation, or
to control the "wrapper" methods, such as Embedded DG or a
recovery method. Defaults to None.
Raises:
ValueError: if order is not provided, or is in incorrect range.
"""
if (order > 5 or order < 1):
raise ValueError("Adams-Bashforth of order greater than 5 not implemented")
if isinstance(options, (EmbeddedDGOptions, RecoveryOptions)):
raise NotImplementedError("Only SUPG advection options have been implemented for this time discretisation")
super().__init__(domain, field_name,
solver_parameters=solver_parameters,
options=options)
self.order = order
[docs]
def setup(self, equation, apply_bcs=True, *active_labels):
super().setup(equation=equation, apply_bcs=apply_bcs,
*active_labels)
self.x = [Function(self.fs) for i in range(self.nlevels)]
if (self.order == 1):
self.b = [1.0]
elif (self.order == 2):
self.b = [-(1.0/2.0), (3.0/2.0)]
elif (self.order == 3):
self.b = [(5.0)/(12.0), -(16.0)/(12.0), (23.0)/(12.0)]
elif (self.order == 4):
self.b = [-(9.0)/(24.0), (37.0)/(24.0), -(59.0)/(24.0), (55.0)/(24.0)]
elif (self.order == 5):
self.b = [(251.0)/(720.0), -(1274.0)/(720.0), (2616.0)/(720.0),
-(2774.0)/(720.0), (2901.0)/(720.0)]
@property
def nlevels(self):
return self.order
@property
def rhs0(self):
"""Set up the discretisation's right hand side for initial forward euler step."""
r = self.residual.label_map(
all_terms,
map_if_true=replace_subject(self.x[-1], old_idx=self.idx))
r = r.label_map(lambda t: t.has_label(time_derivative),
map_if_false=lambda t: -self.dt*t)
return r.form
@property
def lhs(self):
"""Set up the discretisation's left hand side (the time derivative)."""
return super(AdamsBashforth, self).lhs
@property
def rhs(self):
"""Set up the discretisation's right hand side for Adams Bashforth steps."""
r = self.residual.label_map(all_terms,
map_if_true=replace_subject(self.x[-1], old_idx=self.idx))
r = r.label_map(lambda t: t.has_label(time_derivative),
map_if_false=lambda t: -self.b[-1]*self.dt*t)
for n in range(self.nlevels-1):
rtemp = self.residual.label_map(lambda t: t.has_label(time_derivative),
map_if_true=drop,
map_if_false=replace_subject(self.x[n], old_idx=self.idx))
rtemp = rtemp.label_map(lambda t: t.has_label(time_derivative),
map_if_false=lambda t: -self.dt*self.b[n]*t)
r += rtemp
return r.form
@property
def solver0(self):
"""Set up the problem and the solverfor initial forward euler step."""
# setup solver using lhs and rhs defined in derived class
problem = NonlinearVariationalProblem(self.lhs-self.rhs0, self.x_out, bcs=self.bcs)
solver_name = self.field_name+self.__class__.__name__+"0"
return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters,
options_prefix=solver_name)
@property
def solver(self):
"""Set up the problem and the solver for Adams Bashforth steps."""
# setup solver using lhs and rhs defined in derived class
problem = NonlinearVariationalProblem(self.lhs-self.rhs, self.x_out, bcs=self.bcs)
solver_name = self.field_name+self.__class__.__name__
return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters,
options_prefix=solver_name)
[docs]
def apply(self, x_out, *x_in):
"""
Apply the time discretisation to advance one whole time step.
Args:
x_out (:class:`Function`): the output field to be computed.
x_in (:class:`Function`): the input field(s).
"""
if self.initial_timesteps < self.nlevels-1:
self.initial_timesteps += 1
solver = self.solver0
else:
solver = self.solver
for n in range(self.nlevels):
self.x[n].assign(x_in[n])
solver.solve()
x_out.assign(self.x_out)
[docs]
class AdamsMoulton(MultilevelTimeDiscretisation):
"""
Implements the implicit multistep Adams-Moulton
timestepping method of general order up to 5
The general AM timestepping method for operator F is written as \n
y^(n+1) = y^n + dt*(b_0*F[y^(n+1)] + b_1*F[y^(n)] + b_2*F[y^(n-1)] + b_3*F[y^(n-2)])
"""
def __init__(self, domain, order, field_name=None,
solver_parameters=None, options=None):
"""
Args:
domain (:class:`Domain`): the model's domain object, containing the
mesh and the compatible function spaces.
field_name (str, optional): name of the field to be evolved.
Defaults to None.
order (float, optional): order of scheme
solver_parameters (dict, optional): dictionary of parameters to
pass to the underlying solver. Defaults to None.
options (:class:`AdvectionOptions`, optional): an object containing
options to either be passed to the spatial discretisation, or
to control the "wrapper" methods, such as Embedded DG or a
recovery method. Defaults to None.
Raises:
ValueError: if order is not provided, or is in incorrect range.
"""
if (order > 4 or order < 1):
raise ValueError("Adams-Moulton of order greater than 5 not implemented")
if isinstance(options, (EmbeddedDGOptions, RecoveryOptions)):
raise NotImplementedError("Only SUPG advection options have been implemented for this time discretisation")
if not solver_parameters:
solver_parameters = {'ksp_type': 'gmres',
'pc_type': 'bjacobi',
'sub_pc_type': 'ilu'}
super().__init__(domain, field_name,
solver_parameters=solver_parameters,
options=options)
self.order = order
[docs]
def setup(self, equation, apply_bcs=True, *active_labels):
super().setup(equation=equation, apply_bcs=apply_bcs, *active_labels)
self.x = [Function(self.fs) for i in range(self.nlevels)]
if (self.order == 1):
self.bl = (1.0/2.0)
self.br = [(1.0/2.0)]
elif (self.order == 2):
self.bl = (5.0/12.0)
self.br = [-(1.0/12.0), (8.0/12.0)]
elif (self.order == 3):
self.bl = (9.0/24.0)
self.br = [(1.0/24.0), -(5.0/24.0), (19.0/24.0)]
elif (self.order == 4):
self.bl = (251.0/720.0)
self.br = [-(19.0/720.0), (106.0/720.0), -(254.0/720.0), (646.0/720.0)]
@property
def nlevels(self):
return self.order
@property
def rhs0(self):
"""
Set up the discretisation's right hand side for initial trapezoidal
step.
"""
r = self.residual.label_map(
all_terms,
map_if_true=replace_subject(self.x[-1], old_idx=self.idx))
r = r.label_map(lambda t: t.has_label(time_derivative),
map_if_false=lambda t: -0.5*self.dt*t)
return r.form
@property
def lhs0(self):
"""
Set up the time discretisation's right hand side for initial
trapezoidal step.
"""
l = self.residual.label_map(
all_terms,
map_if_true=replace_subject(self.x_out, old_idx=self.idx))
l = l.label_map(lambda t: t.has_label(time_derivative),
map_if_false=lambda t: 0.5*self.dt*t)
return l.form
@property
def lhs(self):
"""Set up the time discretisation's right hand side for Adams Moulton steps."""
l = self.residual.label_map(
all_terms,
map_if_true=replace_subject(self.x_out, old_idx=self.idx))
l = l.label_map(lambda t: t.has_label(time_derivative),
map_if_false=lambda t: self.bl*self.dt*t)
return l.form
@property
def rhs(self):
"""Set up the discretisation's right hand side for Adams Moulton steps."""
r = self.residual.label_map(all_terms,
map_if_true=replace_subject(self.x[-1], old_idx=self.idx))
r = r.label_map(lambda t: t.has_label(time_derivative),
map_if_false=lambda t: -self.br[-1]*self.dt*t)
for n in range(self.nlevels-1):
rtemp = self.residual.label_map(lambda t: t.has_label(time_derivative),
map_if_true=drop,
map_if_false=replace_subject(self.x[n], old_idx=self.idx))
rtemp = rtemp.label_map(lambda t: t.has_label(time_derivative),
map_if_false=lambda t: -self.dt*self.br[n]*t)
r += rtemp
return r.form
@property
def solver0(self):
"""Set up the problem and the solver for initial trapezoidal step."""
# setup solver using lhs and rhs defined in derived class
problem = NonlinearVariationalProblem(self.lhs0-self.rhs0, self.x_out, bcs=self.bcs)
solver_name = self.field_name+self.__class__.__name__+"0"
return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters,
options_prefix=solver_name)
@property
def solver(self):
"""Set up the problem and the solver for Adams Moulton steps."""
# setup solver using lhs and rhs defined in derived class
problem = NonlinearVariationalProblem(self.lhs-self.rhs, self.x_out, bcs=self.bcs)
solver_name = self.field_name+self.__class__.__name__
return NonlinearVariationalSolver(problem, solver_parameters=self.solver_parameters,
options_prefix=solver_name)
[docs]
def apply(self, x_out, *x_in):
"""
Apply the time discretisation to advance one whole time step.
Args:
x_out (:class:`Function`): the output field to be computed.
x_in (:class:`Function`): the input field(s).
"""
if self.initial_timesteps < self.nlevels-1:
self.initial_timesteps += 1
solver = self.solver0
else:
solver = self.solver
for n in range(self.nlevels):
self.x[n].assign(x_in[n])
solver.solve()
x_out.assign(self.x_out)