# -*- coding: utf-8 -*-
"""
Module to produce traveltime lookup tables defined on a Cartesian grid.
:copyright:
2020, QuakeMigrate developers.
:license:
GNU General Public License, Version 3
(https://www.gnu.org/licenses/gpl-3.0.html)
"""
import copy
import pathlib
import pickle
import matplotlib.pyplot as plt
import numpy as np
import pyproj
from mpl_toolkits.axes_grid1.anchored_artists import AnchoredSizeBar
from scipy.interpolate import RegularGridInterpolator
[docs]class Grid3D:
"""
A grid object represents a collection of points in a 3-D Cartesian space
that can be used to produce regularised traveltime lookup tables that
sample the continuous traveltime space for each station in a seismic
network.
This class also provides the series of transformations required to move
between the input projection, the grid projection and the grid index
coordinate spaces.
The size and shape specifications of the grid are defined by providing the
(input projection) coordinates for the lower-left and upper-right corners,
a node spacing and the projections (defined using pyproj) of the input and
grid spaces.
Attributes
----------
coord_proj : `pyproj.Proj` object
Input coordinate space projection.
grid_corners : array-like, shape (8, 3)
Positions of the corners of the grid in the grid coordinate space.
grid_proj : `pyproj.Proj` object
Grid space projection.
grid_xyz : array-like, shape (3, nx, ny, nz)
Positions of the grid nodes in the grid coordinate space. The shape of
each element of the list is defined by the number of nodes in each
dimension.
ll_corner : array-like, [float, float, float]
Location of the lower-left corner of the grid in the grid
projection. Should also contain the minimum depth in the grid.
node_count : array-like, [int, int, int]
Number of nodes in each dimension of the grid. This is calculated by
finding the number of nodes with a given node spacing that fit between
the lower-left and upper-right corners. This value is rounded up if the
number of nodes returned is non-integer, to ensure the requested area
is included in the grid.
node_spacing : array-like, [float, float, float]
Distance between nodes in each dimension of the grid.
precision : list of float
An appropriate number of decimal places for distances as a function of
the node spacing and coordinate projection.
unit_conversion_factor : float
A conversion factor based on the grid projection, used to convert
between units of metres and kilometres.
unit_name : str
Shorthand string for the units of the grid projection.
ur_corner : array-like, [float, float, float]
Location of the upper-right corner of the grid in the grid
projection. Should also contain the maximum depth in the grid.
Methods
-------
coord2grid(value, inverse=False, clip=False)
Provides a transformation between the input projection and grid
coordinate spaces.
decimate(df, inplace=False)
Downsamples the traveltime lookup tables by some decimation factor.
index2coord(value, inverse=False, unravel=False, clip=False)
Provides a transformation between grid indices (can be a flattened
index or an [i, j, k] position) and the input projection coordinate
space.
index2grid(value, inverse=False, unravel=False)
Provides a transformation between grid indices (can be a flattened
index or an [i, j, k] position) and the grid coordinate space.
"""
def __init__(self, ll_corner, ur_corner, node_spacing, grid_proj,
coord_proj):
"""Instantiate the Grid3D object."""
self.grid_proj = grid_proj
self.coord_proj = coord_proj
# Transform the geographical grid corners into grid coordinates
self.ll_corner = self.coord2grid(ll_corner)[0]
self.ur_corner = self.coord2grid(ur_corner)[0]
# Calculate the grid dimensions and the number of nodes required
grid_dims = self.ur_corner - self.ll_corner
self.node_spacing = node_spacing
self.node_count = np.ceil(grid_dims / self.node_spacing) + 1
[docs] def decimate(self, df, inplace=False):
"""
Resample the traveltime lookup tables by decimation by some factor.
Parameters
----------
df : array-like [int, int, int]
Decimation factor in each dimension.
inplace : bool, optional
Perform the operation on the lookup table object or a copy.
Returns
-------
grid : Grid3D object (optional)
Returns a Grid3D object with decimated traveltime lookup tables.
"""
df = np.array(df, dtype=np.int)
new_node_count = 1 + (self.node_count - 1) // df
c1 = (self.node_count - df * (new_node_count - 1) - 1) // 2
if inplace:
grid = self
else:
grid = copy.deepcopy(self)
grid.node_count = new_node_count
grid.node_spacing = self.node_spacing * df
for station, map_ in grid.traveltimes.items():
for phase, ttimes in map_.items():
grid[station][phase] = ttimes[c1[0]::df[0],
c1[1]::df[1],
c1[2]::df[2]]
if not inplace:
return grid
[docs] def index2grid(self, value, inverse=False, unravel=False):
"""
Convert between grid indices and grid coordinate space.
Parameters
----------
value : array-like
Array (of arrays) containing the grid indices (grid coordinates)
to be transformed. Can be an array of flattened indices.
inverse : bool, optionale
Reverses the direction of the transform.
Default indices -> grid coordinates.
unravel : bool, optional
Convert a flat index or array of flat indices into a tuple of
coordinate arrays.
Returns
-------
out : array-like
Returns an array of arrays of the transformed values.
"""
if unravel:
value = np.column_stack(np.unravel_index(value, self.node_count))
else:
value = np.array(value)
if inverse:
out = np.rint((value - self.ll_corner) / self.node_spacing)
out = np.vstack(out.astype(int))
else:
out = np.vstack(self.ll_corner + (value * self.node_spacing))
# Handle cases where only a single ijk index is requested
if out.shape[1] == 1:
out = out.T
return out
[docs] def coord2grid(self, value, inverse=False):
"""
Convert between input coordinate space and grid coordinate space.
Parameters
----------
value : array-like
Array (of arrays) containing the coordinate locations to be
transformed. Each sub-array should describe a single point in the
3-D input space.
inverse : bool, optional
Reverses the direction of the transform.
Default input coordinates -> grid coordinates
Returns
-------
out : array-like
Returns an array of arrays of the transformed values.
"""
v1, v2, v3 = np.array(value).T
if inverse:
inproj, outproj = self.grid_proj, self.coord_proj
else:
inproj, outproj = self.coord_proj, self.grid_proj
return np.column_stack(pyproj.transform(inproj, outproj, v1, v2, v3))
[docs] def index2coord(self, value, inverse=False, unravel=False):
"""
Convert between grid indices and input coordinate space.
This is a utility function that wraps the other two defined transforms.
Parameters
----------
value : array-like
Array (of arrays) containing the grid indices (grid coordinates)
to be transformed. Can be an array of flattened indices.
inverse : bool, optional
Reverses the direction of the transform.
Default indices -> input projection coordinates.
unravel : bool, optional
Convert a flat index or array of flat indices into a tuple of
coordinate arrays.
Returns
-------
out : array-like
Returns an array of arrays of the transformed values.
"""
if inverse:
value = self.coord2grid(value)
out = self.index2grid(value, inverse=True)
else:
value = self.index2grid(value, unravel=unravel)
out = self.coord2grid(value, inverse=True)
return out
@property
def node_count(self):
"""Get and set the number of nodes in each dimension of the grid."""
try:
return self._node_count
except AttributeError:
print("FutureWarning: The internal data structure of LUT has "
"changed.\nTo remove this warning you will need to convert "
"your lookup table to the new-style\nusing "
"`quakemigrate.lut.update_lut`.")
return self._cell_count
@node_count.setter
def node_count(self, value):
value = np.array(value, dtype="int32")
assert (np.all(value > 0)), "Node count must be greater than [0]"
self._node_count = value
@property
def node_spacing(self):
"""Get and set the spacing of nodes in each dimension of the grid."""
try:
return self._node_spacing
except AttributeError:
print("FutureWarning: The internal data structure of LUT has "
"changed.\nTo remove this warning you will need to convert "
"your lookup table to the new-style\nusing "
"`quakemigrate.lut.update_lut`.")
return self._cell_size
@node_spacing.setter
def node_spacing(self, value):
value = np.array(value, dtype="float64")
if value.size == 1:
value = np.repeat(value, 3)
else:
assert (value.shape == (3,)), "Node spacing must be an nx3 array."
assert (np.all(value > 0)), "Node spacing must be greater than [0]"
self._node_spacing = value
@property
def grid_corners(self):
"""Get the xyz positions of the nodes on the corners of the grid."""
c = self.node_count - 1
i, j, k = np.meshgrid([0, c[0]], [0, c[1]], [0, c[2]], indexing="ij")
return self.index2grid(np.c_[i.flatten(), j.flatten(), k.flatten()])
[docs] def get_grid_extent(self, cells=False):
"""
Get the minimum/maximum extent of each dimension of the grid.
The default returns the grid extent as the convex hull of the grid
nodes. It is useful, for visualisation purposes, to also be able to
determine the grid extent as the convex hull of a grid of cells centred
on the grid nodes.
Parameters
----------
cells : bool, optional
Specifies the grid mode (nodes / cells) for which to calculate the
extent.
Returns
-------
extent : array-like
Pair of arrays representing two corners for the grid.
"""
ll, ur = self.grid_corners[0], self.grid_corners[-1]
if cells is True:
ll -= self.node_spacing / 2
ur += self.node_spacing / 2
return self.coord2grid([ll, ur], inverse=True)
grid_extent = property(get_grid_extent)
@property
def grid_xyz(self):
"""Get the xyz positions of all of the nodes in the grid."""
nc = self.node_count
ijk = np.meshgrid(*[np.arange(n) for n in nc], indexing="ij")
xyz = self.index2grid(np.column_stack([dim.flatten() for dim in ijk]))
return [xyz[:, dim].reshape(nc) for dim in range(3)]
@property
def precision(self):
"""
Get appropriate number of decimal places as a function of the
node spacing and coordinate projection.
"""
return [-int(np.format_float_scientific(axis).split("e")[1]) for axis
in np.subtract(*self.index2coord([[0, 0, 0], [1, 1, 1]]))]
@property
def unit_conversion_factor(self):
"""Expose unit_conversion_factor of the grid projection."""
return self.grid_proj.crs.axis_info[0].unit_conversion_factor
@property
def unit_name(self):
"""Expose unit_name of the grid_projection and return shorthand."""
unit_name = self.grid_proj.crs.axis_info[0].unit_name
return "km" if unit_name == "kilometre" else "m"
# --- Deprecation handling ---
@property
def cell_count(self):
"""Handler for deprecated attribute name 'cell_count'"""
return self.node_count
@cell_count.setter
def cell_count(self, value):
if value is None:
return
print("FutureWarning: Parameter name has changed - continuing.")
print("To remove this message, change:")
print("\t'cell_count' -> 'node_count'")
self.node_count = value
@property
def cell_size(self):
"""Handler for deprecated attribute name 'cell_size'"""
return self.node_spacing
@cell_size.setter
def cell_size(self, value):
if value is None:
return
print("FutureWarning: Parameter name has changed - continuing.")
print("To remove this message, change:")
print("\t'cell_size' -> 'node_spacing'")
self.node_spacing = value
[docs]class LUT(Grid3D):
"""
A lookup table (LUT) object is a simple data structure that is used to
store a series of regularised tables that, for each seismic station in a
network, store the traveltimes to every point in the 3-D volume. These
lookup tables are pre-computed to reduce the computational cost of the
back-projection method.
This class provides utility functions that can be used to serve up or query
these pre-computed lookup tables.
This object is-a Grid3D.
Attributes
----------
fraction_tt : float
An estimate of the uncertainty in the velocity model as a function of
a fraction of the traveltime. (Default 0.1 == 10%)
max_traveltime : float
The maximum traveltime between any station and a point in the grid.
phases : list of str
Seismic phases for which there are traveltime lookup tables available.
stations_xyz : array-like, shape (n, 3)
Positions of the stations in the grid coordinate space.
traveltimes : dict
A dictionary containing the traveltime lookup tables. The structure of
this dictionary is:
traveltimes
- "<Station1-ID>"
- "<PHASE>"
- "<PHASE>"
- "<Station2-ID"
- "<PHASE>"
- "<PHASE>"
etc
velocity_model : `~pandas.DataFrame` object
Contains the input velocity model specification.
Methods
-------
serve_traveltimes(sampling_rate)
Serve up the traveltime lookup tables.
traveltime_to(phase, ijk)
Query traveltimes to a grid location (in terms of indices) for a
particular phase.
save(filename)
Dumps the current state of the lookup table object to a pickle file.
load(filename)
Restore the state of the saved LUT object from a pickle file.
plot(fig, gs, slices=None, hypocentre=None, station_clr="k")
Plot cross-sections of the LUT with station locations. Optionally plot
slices through a coalescence volume.
"""
def __init__(self, fraction_tt=0.1, lut_file=None, **grid_spec):
"""Instantiate the LUT object."""
if grid_spec:
super().__init__(**grid_spec)
self.fraction_tt = fraction_tt
self.traveltimes = {}
self.phases = []
self.velocity_model = ""
else:
self.fraction_tt = fraction_tt
self.phases = ["P", "S"] # Handle old lookup tables
if lut_file is not None:
self.load(lut_file)
def __str__(self):
"""Return short summary string of the lookup table object."""
ll, *_, ur = self.coord2grid(self.grid_corners, inverse=True)
out = ("QuakeMigrate traveltime lookup table\nGrid parameters"
"\n\tLower-left corner : {lat1:10.5f}\u00b0N "
"{lon1:10.5f}\u00b0E {dep1:10.3f} {unit_name:s}"
"\n\tUpper-right corner : {lat2:10.5f}\u00b0N "
"{lon2:10.5f}\u00b0E {dep2:10.3f} {unit_name:s}"
f"\n\tNumber of nodes : {self.node_count}"
f"\n\tNode spacing : {self.node_spacing} {self.unit_name}"
"\n\n")
out = out.format(lat1=ll[0], lon1=ll[1], dep1=ll[2],
lat2=ur[0], lon2=ur[1], dep2=ur[2],
unit_name=self.unit_name)
out += ("\tVelocity model:\n"
"\t{}".format(str(self.velocity_model).replace("\n", "\n\t")))
return out
[docs] def serve_traveltimes(self, sampling_rate):
"""
Serve up the traveltime lookup tables.
The traveltimes are multiplied by the scan sampling rate and converted
to integers.
Parameters
----------
sampling_rate : int
Samples per second used in the scan run.
Returns
-------
traveltimes : `numpy.ndarray` of `numpy.int`
Stacked traveltime lookup tables for all seismic phases, stacked
along the station axis, with shape(nx, ny, nz, nstations)
"""
traveltimes = self._serve_traveltimes(self.phases)
return np.rint(traveltimes * sampling_rate).astype(np.int32)
[docs] def traveltime_to(self, phase, ijk):
"""
Serve up the traveltimes to a grid location for a particular phase.
Parameters
----------
phase : str
The seismic phase to lookup.
ijk : array-like
Grid indices for which to serve traveltime.
Returns
-------
traveltimes : array-like
Array of interpolated traveltimes to the requested grid position.
"""
grid = tuple([np.arange(nc) for nc in self.node_count])
traveltimes = self._serve_traveltimes([phase])
interpolator = RegularGridInterpolator(grid, traveltimes,
bounds_error=False,
fill_value=None)
return interpolator(ijk)[0]
def _serve_traveltimes(self, phases):
"""Utility function to serve up traveltimes for a list of phases."""
traveltimes = []
for phase in phases:
for station in self.station_data["Name"].values:
try:
traveltimes.append(self[station][phase])
except KeyError:
traveltimes.append(self[station][f"TIME_{phase}"])
return np.stack(traveltimes, axis=-1)
[docs] def save(self, filename):
"""
Dump the current state of the lookup table object to a pickle file.
Parameters
----------
filename : str
Path to location to save pickled lookup table.
"""
# Ensure the output path exists
pathlib.Path(filename).parent.mkdir(parents=True, exist_ok=True)
with open(filename, "wb") as f:
pickle.dump(self.__dict__, f, 4)
[docs] def load(self, filename):
"""
Read the contents of a pickle file and restore state of the lookup
table object.
Parameters
----------
filename : str
Path to pickle file to load.
"""
print("FutureWarning: This method of reading lookup tables has been"
"deprecated.\nTo remove this warning:\n"
"\tUse 'quakemigrate.io.read_lut(lut_file=/path/to/file'")
with open(filename, "rb") as f:
self.__dict__.update(pickle.load(f))
if hasattr(self, "maps"):
print("FutureWarning: The internal data structure of LUT has "
"changed.\nTo remove this warning you will need to convert "
"your lookup table to the new-style\nusing "
"`quakemigrate.lut.update_lut`.")
[docs] def plot(self, fig, gs, slices=None, hypocentre=None, station_clr="k"):
"""
Plot the lookup table for a particular station.
Parameters
----------
fig : `~matplotlib.Figure` object
Canvas on which LUT is plotted.
gs : tuple(int, int)
Grid specification for the plot.
slices : array of arrays, optional
Slices through a coalescence volume to plot.
hypocentre : array of floats
Event hypocentre - will add cross-hair to plot.
station_clr : str, optional
Plot the stations with a particular colour.
"""
xy = plt.subplot2grid(gs, (2, 0), colspan=5, rowspan=5, fig=fig)
xz = plt.subplot2grid(gs, (7, 0), colspan=5, rowspan=2, fig=fig)
yz = plt.subplot2grid(gs, (2, 5), colspan=2, rowspan=5, fig=fig)
xz.get_shared_x_axes().join(xy, xz)
yz.get_shared_y_axes().join(xy, yz)
# --- Set aspect ratio ---
# Aspect is defined such that a circle will be stretched so that its
# height is aspect times the width.
cells_extent = self.get_grid_extent(cells=True)
extent = abs(cells_extent[1] - cells_extent[0])
# NOTE: no fenceposts here, because we want the size of the grid as
# cells
grid_size = self.node_spacing * self.node_count
aspect = (extent[0] * grid_size[1]) / (extent[1] * grid_size[0])
xy.set_aspect(aspect=aspect)
bounds = np.stack(cells_extent, axis=-1)
for i, j, ax in [(0, 1, xy), (0, 2, xz), (2, 1, yz)]:
gminx, gmaxx = bounds[i]
gminy, gmaxy = bounds[j]
ax.set_xlim([gminx, gmaxx])
ax.set_ylim([gminy, gmaxy])
# --- Plot crosshair for event hypocentre ---
if hypocentre is not None:
ax.axvline(x=hypocentre[i], ls="--", lw=1.5, c="white")
ax.axhline(y=hypocentre[j], ls="--", lw=1.5, c="white")
# --- Plot slices through coalescence volume ---
if slices is None:
continue
slice_ = slices[i + j - 1]
nx, ny = [dim + 1 for dim in slice_.shape]
grid1, grid2 = np.mgrid[gminx:gmaxx:nx*1j, gminy:gmaxy:ny*1j]
sc = ax.pcolormesh(grid1, grid2, slice_, edgecolors="face")
if i + j - 1 != 0:
continue
# --- Add colourbar ---
cax = plt.subplot2grid(gs, (7, 5), colspan=2, rowspan=2, fig=fig)
cax.set_axis_off()
cb = fig.colorbar(sc, ax=cax, orientation="horizontal",
fraction=0.8, aspect=8)
cb.ax.set_xlabel("Normalised coalescence\nvalue", rotation=0,
fontsize=14)
# --- Plot stations ---
xy.scatter(self.station_data.Longitude.values,
self.station_data.Latitude.values,
s=15, marker="^", zorder=20, c=station_clr)
xz.scatter(self.station_data.Longitude.values,
self.station_data.Elevation.values,
s=15, marker="^", zorder=20, c=station_clr)
yz.scatter(self.station_data.Elevation.values,
self.station_data.Latitude.values,
s=15, marker="<", zorder=20, c=station_clr)
for i, row in self.station_data.iterrows():
xy.annotate(row["Name"], [row.Longitude, row.Latitude], zorder=20,
c=station_clr, clip_on=True)
# --- Add scalebar ---
num_cells = np.ceil(self.node_count[0] / 10)
length = num_cells * self.node_spacing[0]
size = extent[0] * length / grid_size[0]
scalebar = AnchoredSizeBar(xy.transData, size=size,
label=f"{length} {self.unit_name}",
loc="lower right", pad=0.5, sep=5,
frameon=False, color=station_clr)
xy.add_artist(scalebar)
# --- Axes labelling ---
xy.tick_params(which="both", left=True, right=True, top=True,
bottom=True, labelleft=True, labeltop=True,
labelright=False, labelbottom=False)
xy.set_ylabel("Latitude (deg)", fontsize=14)
xy.yaxis.set_label_position("left")
xz.invert_yaxis()
xz.tick_params(which="both", left=True, right=True, top=True,
bottom=True, labelleft=True, labeltop=False,
labelright=False, labelbottom=True)
xz.set_xlabel("Longitude (deg)", fontsize=14)
xz.set_ylabel(f"Depth ({self.unit_name})", fontsize=14)
xz.yaxis.set_label_position("left")
yz.tick_params(which="both", left=True, right=True, top=True,
bottom=True, labelleft=False, labeltop=True,
labelright=True, labelbottom=True)
yz.set_xlabel(f"Depth ({self.unit_name})", fontsize=14)
yz.xaxis.set_label_position("bottom")
@property
def max_extent(self):
"""Get the minimum/maximum geographical extent of the stations/grid."""
stat_min, stat_max = self.station_extent
grid_min, grid_max = self.get_grid_extent(cells=True)
min_extent = [min(a, b) for a, b in zip(stat_min, grid_min)]
max_extent = [max(a, b) for a, b in zip(stat_max, grid_max)]
diff = abs(np.subtract(max_extent, min_extent))
min_extent = np.subtract(min_extent, 0.05*diff)
max_extent = np.add(max_extent, 0.05 * diff)
return np.array([min_extent, max_extent])
@property
def max_traveltime(self):
"""Get the maximum traveltime from any station across the grid."""
return np.max(self._serve_traveltimes(self.phases))
@property
def station_extent(self):
"""Get the minimum/maximum extent of the seismic network."""
coordinates = self.station_data[["Longitude", "Latitude", "Elevation"]]
return [[f(dim) for dim in coordinates.values.T] for f in (min, max)]
@property
def stations_xyz(self):
"""Get station locations in the grid space [X, Y, Z]."""
coordinates = self.station_data[["Longitude", "Latitude", "Elevation"]]
return self.coord2grid(coordinates.values)
def __add__(self, other):
"""
Define behaviour for the rich addition operator, "+".
Two lookup tables which have identical grid definitions (as per "==")
can be combined by adding the traveltime lookup tables from
other.traveltimes for which the station key is not already in
self.traveltimes.
Parameters
----------
other : :class:`~quakemigrate.lut.LUT` object
LUT with traveltime lookup tables to add to self.
"""
if not isinstance(other, LUT):
print("Addition not defined for non-LUT object.")
return self
else:
if self == other:
self.traveltimes.update(other.traveltimes)
return self
else:
print("Grid definitions do not match - cannot combine.")
def __eq__(self, other):
"""
Define behaviour for the rich equality operator, "==".
Two lookup tables are defined to be equal if their grid definitions are
identical - corners, node spacing, projections.
Parameters
----------
other : :class:`~quakemigrate.lut.LUT` object
LUT with which to test equality with self.
"""
# Test if other isinstance of LUT
if not isinstance(other, LUT):
print("Equality of LUT with non-LUT object is undefined.")
return False
else:
# Test equality of grid corners
eq_corners = (self.grid_corners == other.grid_corners).all()
# Test equality of node spacings
eq_sizes = (self.node_spacing == other.node_spacing).all()
# Test equality of projections
eq_projections = (self.grid_proj == other.grid_proj
and self.coord_proj == other.coord_proj)
return eq_corners and eq_sizes and eq_projections
def __getitem__(self, key):
"""
Provide a method to directly access traveltime tables by station key
without having to go through the traveltimes dictionary.
Parameters
----------
key : str
Station ID for which to search.
Returns
-------
station_traveltimes : dict
Traveltime lookup table for key (station), if key exists.
"""
try:
return self.traveltimes[key]
except AttributeError:
return self.maps[key]
except KeyError:
print(f"No traveltime lookup table available for '{key}'.")