"""
This file provides a coordinate object, dependent on the mesh.
Coordinate fields are stored in specified VectorFunctionSpaces.
"""
from gusto.core.coord_transforms import lonlatr_from_xyz, rotated_lonlatr_coords
from gusto.core.logging import logger
from firedrake import SpatialCoordinate, Function
import numpy as np
import pandas as pd
[docs]
class Coordinates(object):
"""
An object for holding and setting up coordinate fields.
"""
def __init__(self, mesh, on_sphere=False, rotated_pole=None, radius=None):
"""
Args:
mesh (:class:`Mesh`): the model's domain object.
on_sphere (bool, optional): whether the domain is on the surface of
a sphere. If False, the domain is assumed to be Cartesian.
Defaults to False.
rotated_pole (tuple, optional): a tuple of floats (lon, lat) of the
location to use as the north pole in a spherical coordinate
system. These are expressed in the original coordinate system.
The longitude and latitude must be expressed in radians.
Defaults to None. This is unused for non-spherical domains.
radius (float, optional): the radius of a spherical domain. Defaults
to None. This is unused for non-spherical domains.
"""
self.mesh = mesh
# -------------------------------------------------------------------- #
# Set up spatial coordinate
# -------------------------------------------------------------------- #
if on_sphere:
xyz = SpatialCoordinate(mesh)
if rotated_pole is not None:
lon, lat, r = rotated_lonlatr_coords(xyz, rotated_pole)
else:
lon, lat, r = lonlatr_from_xyz(xyz[0], xyz[1], xyz[2])
if mesh.extruded:
self.coords = (lon, lat, r-radius)
self.coords_name = ['lon', 'lat', 'h']
else:
self.coords = (lon, lat)
self.coords_name = ['lon', 'lat']
else:
self.coords = SpatialCoordinate(mesh)
if mesh.geometric_dimension() == 1:
self.coords_name = ['x']
elif mesh.geometric_dimension() == 2 and mesh.extruded:
self.coords_name = ['x', 'z']
elif mesh.geometric_dimension() == 2:
self.coords_name = ['x', 'y']
elif mesh.geometric_dimension() == 3:
self.coords_name = ['x', 'y', 'z']
else:
raise ValueError('Cannot work out coordinates of domain')
# -------------------------------------------------------------------- #
# Store chi field
# -------------------------------------------------------------------- #
self.chi_coords = {} # Dict of natural coords by space
self.global_chi_coords = {} # Dict of whole coords stored on first proc
self.parallel_array_lims = {} # Dict of array lengths for each proc
[docs]
def register_space(self, domain, space_name):
"""
Computes the coordinate fields at the DoFs of a function space, which
are subsequently used for outputting.
As proper parallel outputting is not yet implemented, the array of
coordinate data is entirely broadcast to the first processor.
Args:
space_name (str): the name of the function space to be registered
with the :class:`Coordinates` object.
Raises:
NotImplementedError: only scalar-valued spaces are implemented.
"""
comm = self.mesh.comm
comm_size = comm.Get_size()
my_rank = comm.Get_rank()
topological_dimension = self.mesh.topological_dimension()
if space_name in self.chi_coords.keys():
logger.warning(f'Coords for {space_name} space have already been computed')
return None
# Loop through spaces
space = domain.spaces(space_name)
# Use the appropriate scalar function space if the space is vector
if np.prod(space.ufl_element().value_shape) > 1:
# TODO: get scalar space, and only compute coordinates if necessary
logger.warning(f'Space {space_name} has more than one dimension, '
+ 'and coordinates used for netCDF output have not '
+ 'yet been implemented for this.')
return None
self.chi_coords[space_name] = []
# Now set up
for i in range(topological_dimension):
self.chi_coords[space_name].append(Function(space).interpolate(self.coords[i]))
# -------------------------------------------------------------------- #
# Code for settings up coordinates for parallel-serial IO
# -------------------------------------------------------------------- #
len_coords = space.dim()
my_num_dofs = len(self.chi_coords[space_name][0].dat.data_ro[:])
if my_rank != 0:
# Do not make full coordinate array
self.global_chi_coords[space_name] = None
self.parallel_array_lims[space_name] = None
# Find number of DoFs on this processor
comm.send(my_num_dofs, dest=0)
else:
# First processor has one large array of the global chi data
self.global_chi_coords[space_name] = np.zeros((topological_dimension, len_coords))
# Store the limits inside this array telling us how data is partitioned
self.parallel_array_lims[space_name] = np.zeros((comm_size, 2), dtype=int)
# First processor has the first bit of data
self.parallel_array_lims[space_name][my_rank][0] = 0
self.parallel_array_lims[space_name][my_rank][1] = my_num_dofs - 1
# Receive number of DoFs on other processors
for procid in range(1, comm_size):
other_num_dofs = comm.recv(source=procid)
self.parallel_array_lims[space_name][procid][0] = self.parallel_array_lims[space_name][procid-1][1] + 1
self.parallel_array_lims[space_name][procid][1] = self.parallel_array_lims[space_name][procid][0] + other_num_dofs - 1
# Now move coordinates to first processor
for i in range(topological_dimension):
if my_rank != 0:
# Send information to rank 0
my_tag = comm_size*i + my_rank
comm.send(self.chi_coords[space_name][i].dat.data_ro[:], dest=0, tag=my_tag)
else:
# Rank 0 -- the receiver
(low_lim, up_lim) = self.parallel_array_lims[space_name][my_rank][:]
self.global_chi_coords[space_name][i][low_lim:up_lim+1] = self.chi_coords[space_name][i].dat.data_ro[:]
# Receive coords from each processor and put them into array
for procid in range(1, comm_size):
my_tag = comm_size*i + procid
new_coords = comm.recv(source=procid, tag=my_tag)
(low_lim, up_lim) = self.parallel_array_lims[space_name][procid][:]
self.global_chi_coords[space_name][i, low_lim:up_lim+1] = new_coords
[docs]
def get_column_data(self, field, domain):
"""
Reshapes a field's data into columns.
Args:
field (:class:`Function`): the field whose data needs sorting.
domain (:class:`Domain`): the domain used to register coordinates
if this hasn't already been done.
Returns:
tuple of :class:`numpy.ndarray`: a 2D array of data, arranged in
columns, and the data pairing the indices of the data with the
ordered column data.
"""
space_name = field.function_space().name
if space_name not in self.chi_coords.keys():
self.register_space(domain, space_name)
coords = self.chi_coords[space_name]
data_is_3d = (len(coords) == 3)
coords_X = coords[0].dat.data
coords_Y = coords[1].dat.data if data_is_3d else None
coords_Z = coords[-1].dat.data
# ------------------------------------------------------------------------ #
# Round data to ensure sorting in dataframe is OK
# ------------------------------------------------------------------------ #
# Work out digits to round to, based on number of points and range of coords
num_points = np.size(coords_X)
data_range = np.max(coords_X) - np.min(coords_X)
if data_range > np.finfo(type(data_range)).tiny:
digits = int(np.floor(-np.log10(data_range / num_points)) + 3)
coords_X = coords_X.round(digits)
if data_is_3d:
data_range = np.max(coords_Y) - np.min(coords_Y)
if data_range > np.finfo(type(data_range)).tiny:
# Only round if there is already some range
digits = int(np.floor(-np.log10(data_range / num_points)) + 3)
coords_Y = coords_Y.round(digits)
# -------------------------------------------------------------------- #
# Make data frame
# -------------------------------------------------------------------- #
data_dict = {'field': field.dat.data, 'X': coords_X, 'Z': coords_Z,
'index': range(len(field.dat.data))}
if coords_Y is not None:
data_dict['Y'] = coords_Y
# Put everything into a pandas dataframe
data = pd.DataFrame(data_dict)
# Sort array by X and Y coordinates
if data_is_3d:
data = data.sort_values(by=['X', 'Y', 'Z'])
first_X, first_Y = data['X'].values[0], data['Y'].values[0]
first_point = data[(np.isclose(data['X'], first_X))
& (np.isclose(data['Y'], first_Y))]
else:
data = data.sort_values(by=['X', 'Z'])
first_X = data['X'].values[0]
first_point = data[np.isclose(data['X'], first_X)]
# Number of levels should correspond to the number of points with the first
# coordinate values
num_levels = len(first_point)
assert len(data) % num_levels == 0, 'Unable to nicely divide data into levels'
# -------------------------------------------------------------------- #
# Create new arrays to store structured data
# -------------------------------------------------------------------- #
num_hori_points = int(len(data) / num_levels)
field_data = np.zeros((num_hori_points, num_levels))
coords_X = np.zeros((num_hori_points, num_levels))
if data_is_3d:
coords_Y = np.zeros((num_hori_points, num_levels))
coords_Z = np.zeros((num_hori_points, num_levels))
index_data = np.zeros((num_hori_points, num_levels), dtype=int)
# -------------------------------------------------------------------- #
# Fill arrays, on the basis of the dataframe already being sorted
# -------------------------------------------------------------------- #
for lev_idx in range(num_levels):
data_slice = slice(lev_idx, num_hori_points*num_levels+lev_idx, num_levels)
field_data[:, lev_idx] = data['field'].values[data_slice]
coords_X[:, lev_idx] = data['X'].values[data_slice]
if data_is_3d:
coords_Y[:, lev_idx] = data['Y'].values[data_slice]
coords_Z[:, lev_idx] = data['Z'].values[data_slice]
index_data[:, lev_idx] = data['index'].values[data_slice]
return field_data, index_data
[docs]
def set_field_from_column_data(self, field, columnwise_data, index_data):
"""
Fills in field data from some columnwise data.
Args:
field (:class:`Function`): the field whose data shall be filled.
columnwise_data (:class:`numpy.ndarray`): the field data arranged
into columns, to be written into the field.
index_data (:class:`numpy.ndarray`): the indices of the original
field data, arranged like the columnwise data.
Returns:
:class:`Function`: the updated field.
"""
_, num_levels = np.shape(columnwise_data)
for lev_idx in range(num_levels):
field.dat.data[index_data[:, lev_idx]] = columnwise_data[:, lev_idx]
return field