Source code for firedrake.pointeval_utils

import loopy as lp
from firedrake.utils import IntType, as_cstr

from ufl import TensorProductCell
from finat.ufl import MixedElement
from ufl.corealg.map_dag import map_expr_dags
from ufl.algorithms import extract_arguments, extract_coefficients
from ufl.domain import extract_unique_domain

import gem

import tsfc
import tsfc.kernel_interface.firedrake_loopy as firedrake_interface
from tsfc.loopy import generate as generate_loopy
from tsfc.parameters import default_parameters

from firedrake import utils
from firedrake.petsc import PETSc


[docs] @PETSc.Log.EventDecorator() def compile_element(expression, coordinates, parameters=None): """Generates C code for point evaluations. :arg expression: UFL expression :arg coordinates: coordinate field :arg parameters: form compiler parameters :returns: C code as string """ if parameters is None: parameters = default_parameters() else: _ = default_parameters() _.update(parameters) parameters = _ # No arguments, please! if extract_arguments(expression): return ValueError("Cannot interpolate UFL expression with Arguments!") # Apply UFL preprocessing expression = tsfc.ufl_utils.preprocess_expression(expression, complex_mode=utils.complex_mode) # Collect required coefficients coefficient, = extract_coefficients(expression) # Point evaluation of mixed coefficients not supported here if type(coefficient.ufl_element()) == MixedElement: raise NotImplementedError("Cannot point evaluate mixed elements yet!") # Replace coordinates (if any) domain = extract_unique_domain(expression) assert extract_unique_domain(coordinates) == domain # Initialise kernel builder builder = firedrake_interface.KernelBuilderBase(utils.ScalarType) builder.domain_coordinate[domain] = coordinates builder._coefficient(coordinates, "x") x_arg = builder.generate_arg_from_expression(builder.coefficient_map[coordinates]) builder._coefficient(coefficient, "f") f_arg = builder.generate_arg_from_expression(builder.coefficient_map[coefficient]) # TODO: restore this for expression evaluation! # expression = ufl_utils.split_coefficients(expression, builder.coefficient_split) # Translate to GEM cell = domain.ufl_cell() dim = cell.topological_dimension() point = gem.Variable('X', (dim,)) point_arg = lp.GlobalArg("X", dtype=utils.ScalarType, shape=(dim,)) config = dict(interface=builder, ufl_cell=extract_unique_domain(coordinates).ufl_cell(), integral_type="cell", point_indices=(), point_expr=point, scalar_type=utils.ScalarType) # TODO: restore this for expression evaluation! # config["cellvolume"] = cellvolume_generator(extract_unique_domain(coordinates), coordinates, config) context = tsfc.fem.GemPointContext(**config) # Abs-simplification expression = tsfc.ufl_utils.simplify_abs(expression, utils.complex_mode) # Translate UFL -> GEM translator = tsfc.fem.Translator(context) result, = map_expr_dags(translator, [expression]) tensor_indices = () if expression.ufl_shape: tensor_indices = tuple(gem.Index() for s in expression.ufl_shape) return_variable = gem.Indexed(gem.Variable('R', expression.ufl_shape), tensor_indices) result_arg = lp.GlobalArg("R", dtype=utils.ScalarType, shape=expression.ufl_shape) result = gem.Indexed(result, tensor_indices) else: return_variable = gem.Indexed(gem.Variable('R', (1,)), (0,)) result_arg = lp.GlobalArg("R", dtype=utils.ScalarType, shape=(1,)) # Unroll max_extent = parameters["unroll_indexsum"] if max_extent: def predicate(index): return index.extent <= max_extent result, = gem.optimise.unroll_indexsum([result], predicate=predicate) # Translate GEM -> loopy result, = gem.impero_utils.preprocess_gem([result]) impero_c = gem.impero_utils.compile_gem([(return_variable, result)], tensor_indices) loopy_args = [result_arg, point_arg, x_arg, f_arg] loopy_kernel, _ = generate_loopy( impero_c, loopy_args, utils.ScalarType, kernel_name="evaluate_kernel", index_names={}) kernel_code = lp.generate_code_v2(loopy_kernel).device_code() # Fill the code template extruded = isinstance(cell, TensorProductCell) code = { "geometric_dimension": domain.geometric_dimension(), "layers_arg": ", int const *__restrict__ layers" if extruded else "", "layers": ", layers" if extruded else "", "extruded_define": "1" if extruded else "0", "IntType": as_cstr(IntType), "scalar_type": utils.ScalarType_c, } # if maps are the same, only need to pass one of them if coordinates.cell_node_map() == coefficient.cell_node_map(): code["wrapper_map_args"] = "%(IntType)s const *__restrict__ coords_map" % code code["map_args"] = "f->coords_map" else: code["wrapper_map_args"] = "%(IntType)s const *__restrict__ coords_map, %(IntType)s const *__restrict__ f_map" % code code["map_args"] = "f->coords_map, f->f_map" evaluate_template_c = """ static inline void wrap_evaluate(%(scalar_type)s* const result, %(scalar_type)s* const X, %(IntType)s const start, %(IntType)s const end%(layers_arg)s, %(scalar_type)s const *__restrict__ coords, %(scalar_type)s const *__restrict__ f, %(wrapper_map_args)s); int evaluate(struct Function *f, double *x, %(scalar_type)s *result) { /* The type definitions and arguments used here are defined as statics in pointquery_utils.py */ double found_ref_cell_dist_l1 = DBL_MAX; struct ReferenceCoords temp_reference_coords, found_reference_coords; int cells_ignore[1] = {-1}; %(IntType)s cell = locate_cell(f, x, %(geometric_dimension)d, &to_reference_coords, &to_reference_coords_xtr, &temp_reference_coords, &found_reference_coords, &found_ref_cell_dist_l1, 1, cells_ignore); if (cell == -1) { return -1; } if (!result) { return 0; } #if %(extruded_define)s int layers[2] = {0, 0}; int nlayers = f->n_layers; layers[1] = cell %% nlayers + 2; cell = cell / nlayers; #endif wrap_evaluate(result, found_reference_coords.X, cell, cell+1%(layers)s, f->coords, f->f, %(map_args)s); return 0; } """ return (evaluate_template_c % code) + kernel_code