# -*- coding: utf-8 -*-
"""
Module to perform core QuakeMigrate functions: detect() and locate().
:copyright:
2020–2024, QuakeMigrate developers.
:license:
GNU General Public License, Version 3
(https://www.gnu.org/licenses/gpl-3.0.html)
"""
import logging
import warnings
from datetime import time
import numpy as np
from obspy import UTCDateTime
import pandas as pd
from scipy.interpolate import Rbf
from scipy.signal import fftconvolve
import quakemigrate.util as util
from quakemigrate.core import find_max_coa, migrate
from quakemigrate.io import (
Event,
Run,
ScanmSEED,
read_triggered_events,
write_availability,
write_cut_waveforms,
write_coalescence,
)
from quakemigrate.plot.event import event_summary
from .onsets import Onset
from .pickers import GaussianPicker, PhasePicker
from .local_mag import LocalMag
# Filter warnings
warnings.filterwarnings(
"ignore", message=("Covariance of the parameters could not be estimated")
)
[docs]class QuakeScan:
"""
QuakeMigrate scanning class.
Provides an interface for the wrapped compiled C functions, used to perform the
continuous scan (detect) or refined event migrations (locate).
Parameters
----------
archive : :class:`~quakemigrate.io.data.Archive` object
Details the structure and location of a data archive and provides methods for
reading data from file.
lut : :class:`~quakemigrate.lut.lut.LUT` object
Contains the traveltime lookup tables for seismic phases, computed for some
pre-defined velocity model.
onset : :class:`~quakemigrate.signal.onsets.base.Onset` object
Provides callback methods for calculation of onset functions.
run_path : str
Points to the top level directory containing all input files, under which the
specific run directory will be created.
run_name : str
Name of the current QuakeMigrate run.
kwargs : **dict
See QuakeScan Attributes for details. In addition to these:
Attributes
----------
continuous_scanmseed_write : bool
Option to continuously write the .scanmseed file output by detect() at the end
of every time step. Default behaviour is to write in day chunks where possible.
Default: False.
cut_waveform_format : str, optional
File format used when writing waveform data. We support any format also
supported by ObsPy - "MSEED" (default), "SAC", "SEGY", "GSE2".
log : bool, optional
Toggle for logging. If True, will output to stdout and generate a log file.
Default is to only output to stdout.
loglevel : {"info", "debug"}, optional
Toggle to set the logging level: "debug" will print out additional diagnostic
information to the log and stdout. (Default "info")
mags : :class:`~quakemigrate.signal.local_mag.local_mag.LocalMag` object, optional
Provides methods for calculating local magnitudes, performed during locate.
marginal_window : float, optional
Half-width of window centred on the maximum coalescence time. The 4-D
coalescence functioned is marginalised over time across this window such that
the earthquake location and associated uncertainty can be appropriately
calculated. It should be an estimate of the time uncertainty in the earthquake
origin time, which itself is some combination of the expected spatial
uncertainty and uncertainty in the seismic velocity model used.
Default: 2 seconds.
picker : :class:`~quakemigrate.signal.pickers.base.PhasePicker` object, optional
Provides callback methods for phase picking, performed during locate.
plot_all_stns : bool, optional
If true, plot all stations in the LUT. Otherwise, only plot stations which were
used for migration (i.e. omitting stations for which there was no data, or data
did not pass the specified quality checks). Default: True.
plot_event_summary : bool, optional
Plot event summary figure - see `quakemigrate.plot` for more details.
Default: True.
plot_event_video : bool, optional
Plot coalescence video for each located earthquake. Default: False.
post_pad : float
Additional amount of data to read in after the timestep, used to ensure the
correct coalescence is calculated at every sample.
pre_pad : float
Additional amount of data to read in before the timestep, used to ensure the
correct coalescence is calculated at every sample.
real_waveform_units : {"displacement", "velocity"}
Units to output real cut waveforms.
run : :class:`~quakemigrate.io.core.Run` object
Light class encapsulating i/o path information for a given run.
scan_rate : int, optional
Sampling rate at which the 4-D coalescence map will be calculated. Currently
fixed to be the same as the onset function sampling rate (not
user-configurable).
threads : int, optional
The number of threads for the C functions to use on the executing host.
Default: 1 thread.
timestep : float, optional
Length (in seconds) of timestep used in detect(). Note: total detect run
duration should be divisible by timestep. Increasing timestep will increase RAM
usage during detect, but will slightly speed up overall detect run.
Default: 120 seconds.
wa_waveform_units : {"displacement", "velocity"}
Units to output Wood-Anderson simulated cut waveforms.
write_cut_waveforms : bool, optional
Write raw cut waveforms for all data read from the archive for each event
located by locate(). See `~quakemigrate.io.data.Archive` parameter
`read_all_stations`. Default: False.
NOTE: this data has not been processed or quality-checked!
write_marginal_coalescence : bool, optional
Write the marginalised 3-D coalescence map to file (in .npy format).
Default: False.
write_coalescence : bool, optional
Write the raw 4-D coalescence map from locate to file (in .npy format).
Default: False.
write_real_waveforms : bool, optional
Write real cut waveforms for all data read from the archive for each event
located by locate(). See `~quakemigrate.io.data.Archive` parameter
`read_all_stations`. Default: False.
NOTE: the units of this data (displacement or velocity) are controlled by
`real_waveform_units`.
NOTE: this data has not been processed or quality-checked!
NOTE: no padding has been added to take into account the taper applied during
response removal.
write_wa_waveforms : bool, optional
Write Wood-Anderson simulated cut waveforms for all data read from the archive
for each event located by locate(). See `~quakemigrate.io.data.Archive`
parameter `read_all_stations`. Default: False.
NOTE: the units of this data (displacement or velocity) are controlled by
`wa_waveform_units`.
NOTE: this data has not been processed or quality-checked!
NOTE: no padding has been added to take into account the taper applied during
response removal.
xy_files : str, optional
Path to comma-separated value file (.csv) containing a series of coordinate
files to plot. Columns: ["File", "Color", "Linewidth", "Linestyle"], where
"File" is the absolute path to the file containing the coordinates to be
plotted. E.g: "/home/user/volcano_outlines.csv,black,0.5,-". Each .csv
coordinate file should contain coordinates only, with columns: ["Longitude",
"Latitude"]. E.g.: "-17.5,64.8". Lines pre-pended with ``#`` will be treated as
a comment - this can be used to include references. See the
Volcanotectonic_Iceland example XY_files for a template.\n
.. note:: Do not include a header line in either file.
+++ TO BE REMOVED TO ARCHIVE CLASS +++
pre_cut : float, optional
Specify how long before the event origin time to cut the waveform data from.
post_cut : float, optional
Specify how long after the event origin time to cut the waveform.
data to
+++ TO BE REMOVED TO ARCHIVE CLASS +++
Methods
-------
detect(starttime, endtime)
Core detection method -- compute decimated 3-D coalescence continuously
throughout entire time period; output as .scanmseed (in mSEED format).
locate(starttime, endtime) or locate(file)
Core locate method -- compute 3-D coalescence over short time window around
candidate earthquake triggered from continuous detect output; output location &
uncertainties (.event file), phase picks (.picks file), plus multiple optional
plots / data for further analysis and processing.
Raises
------
OnsetTypeError
If an object is passed in through the `onset` argument that is not derived from
the :class:`~quakemigrate.signal.onsets.base.Onset` base class.
PickerTypeError
If an object is passed in through the `picker` argument that is not derived from
the :class:`~quakemigrate.signal.pickers.base.PhasePicker` base class.
RuntimeError
If the user does not supply the locate function with valid arguments.
TimeSpanException
If the user supplies a starttime that is after the endtime.
"""
def __init__(self, archive, lut, onset, run_path, run_name, **kwargs):
"""Instantiate the QuakeScan object."""
self.archive = archive
self.lut = lut
if isinstance(onset, Onset):
self.onset = onset
else:
raise util.OnsetTypeError
self.onset.post_pad = lut.max_traveltime
self.pre_pad = 0.0
self.post_pad = 0.0
# --- Set up i/o ---
self.run = Run(
run_path,
run_name,
kwargs.get("run_subname", ""),
loglevel=kwargs.get("loglevel", "info"),
)
self.log = kwargs.get("log", False)
picker = kwargs.get("picker")
if picker is None:
self.picker = GaussianPicker(onset=onset)
elif isinstance(picker, PhasePicker):
self.picker = picker
else:
raise util.PickerTypeError
# --- Grab QuakeScan parameters or set defaults ---
# Parameters related specifically to Detect
self.timestep = kwargs.get("timestep", 120.0)
self.time_step = kwargs.get("time_step") # DEPRECATING
# Parameters related specifically to Locate
self.marginal_window = kwargs.get("marginal_window", 2.0)
# General QuakeScan parameters
self.threads = kwargs.get("threads", 1)
self.n_cores = kwargs.get("n_cores") # DEPRECATING
self.sampling_rate = kwargs.get("sampling_rate") # DEPRECATING
self.scan_rate = self.onset.sampling_rate
# Magnitudes
mags = kwargs.get("mags")
if mags is not None:
if not isinstance(mags, LocalMag):
raise util.MagsTypeError
self.mags = mags
# Plotting toggles and parameters
self.plot_event_summary = kwargs.get("plot_event_summary", True)
self.plot_all_stns = kwargs.get("plot_all_stns", True)
self.plot_event_video = kwargs.get("plot_event_video", False)
self.xy_files = kwargs.get("xy_files")
# File writing toggles
self.continuous_scanmseed_write = kwargs.get(
"continuous_scanmseed_write", False
)
self.write_cut_waveforms = kwargs.get("write_cut_waveforms", False)
self.write_real_waveforms = kwargs.get("write_real_waveforms", False)
self.real_waveform_units = kwargs.get("real_waveform_units", "displacement")
self.write_wa_waveforms = kwargs.get("write_wa_waveforms", False)
self.wa_waveform_units = kwargs.get("wa_waveform_units", "displacement")
self.cut_waveform_format = kwargs.get("cut_waveform_format", "MSEED")
self.write_marginal_coalescence = kwargs.get(
"write_marginal_coalescence", False
)
self.write_coalescence = kwargs.get("write_coalescence", False)
# +++ TO BE REMOVED TO ARCHIVE CLASS +++
self.pre_cut = None
self.post_cut = None
# +++ TO BE REMOVED TO ARCHIVE CLASS +++
def __str__(self):
"""Return short summary string of the QuakeScan object."""
out = (
"\tScan parameters:\n"
f"\t\tScan sampling rate = {self.scan_rate} Hz\n"
f"\t\tThread count = {self.threads}\n"
)
if self.run.stage == "detect":
out += f"\t\tTime step = {self.timestep} s\n"
elif self.run.stage == "locate":
out += f"\t\tMarginal window = {self.marginal_window} s\n"
return out
[docs] def detect(self, starttime, endtime):
"""
Scans through data calculating coalescence in a (decimated) 3-D grid by
continuously migrating onset functions.
Parameters
----------
starttime : str
Timestamp from which to run continuous scan.
endtime : str
Timestamp up to which to run continuous scan. Note: if the duration is not
divisible by the specified timestep, the endtime will be extended to
accommodate. If the endtime is set to midnight, then it will be automatically
adjusted to one sample prior.
"""
# Configure logging
self.run.stage = "detect"
self.run.logger(self.log)
starttime, endtime = UTCDateTime(starttime), UTCDateTime(endtime)
if starttime > endtime:
raise util.TimeSpanException
# Shift endtime one sample earlier if it is at midnight (not necessary for
# typical combinations of starttimes and timesteps, but here to cover edge
# cases)
if endtime.time == time(0, 0):
endtime -= 1 / self.scan_rate
# Number of steps to break run duration into
n_steps = int(np.ceil((endtime - starttime) / self.timestep))
# Check if chosen start & endtimes and timestep are compatible
calc_endtime = starttime + n_steps * self.timestep - 1 / self.scan_rate
if calc_endtime - endtime > 1 / self.scan_rate:
logging.info(
f"Warning: chosen run duration {endtime - starttime} s is not "
f"divisible by the specified timestep {self.timestep} s. Detect will "
f"instead compute up to {calc_endtime}\n"
)
logging.info(util.log_spacer)
logging.info("\tDETECT - Continuous coalescence scan")
logging.info(util.log_spacer)
logging.info(f"\n\tScanning from {starttime} to {calc_endtime}\n")
logging.info(self)
logging.info(self.onset)
logging.info(util.log_spacer)
self._continuous_compute(starttime, n_steps)
logging.info(util.log_spacer)
[docs] def locate(self, starttime=None, endtime=None, trigger_file=None):
"""
Re-computes the coalescence on an undecimated grid for a short time window
around each candidate earthquake triggered from the (decimated) continuous
detect scan. Calculates event location and uncertainties, makes phase arrival
picks, plus multiple optional plotting / data outputs for further analysis and
processing.
Parameters
----------
starttime : str, optional
Timestamp from which to include events in the locate scan.
endtime : str, optional
Timestamp up to which to include events in the locate scan. Note: if the
endtime is set to midnight, then only events during the previous day will
be included.
trigger_file : str, optional
File containing triggered events to be located.
"""
# Configure logging
self.run.stage = "locate"
self.run.logger(self.log)
if not (starttime is None and endtime is None):
starttime, endtime = UTCDateTime(starttime), UTCDateTime(endtime)
if starttime > endtime:
raise util.TimeSpanException
if trigger_file is None and starttime is None and endtime is None:
raise RuntimeError("Must supply an input argument.")
if (starttime is None) ^ (endtime is None):
raise RuntimeError("Must supply a starttime AND an endtime.")
logging.info(util.log_spacer)
logging.info("\tLOCATE - Determining event location and uncertainty")
logging.info(util.log_spacer)
if trigger_file is not None:
logging.info(f"\n\tLocating events in {trigger_file}")
else:
logging.info(f"\n\tLocating events from {starttime} to {endtime}\n")
logging.info(self)
logging.info(self.onset)
logging.info(self.picker)
if self.mags is not None:
logging.info(self.archive.__str__(response_only=True))
logging.info(self.mags)
logging.info(util.log_spacer)
if trigger_file is not None:
self._locate_events(trigger_file=trigger_file)
else:
self._locate_events(starttime=starttime, endtime=endtime)
logging.info(util.log_spacer)
def _continuous_compute(self, starttime, n_steps):
"""
Compute coalescence between two timestamps, divided into increments of
`timestep`. Outputs coalescence and station availability data to file.
Parameters
----------
starttime : `obspy.UTCDateTime` object
Timestamp from which to compute continuous coalescence.
n_steps : int
Number of timesteps (of length `timestep`) to compute.
"""
coalescence = ScanmSEED(
self.run, self.continuous_scanmseed_write, self.scan_rate
)
self.pre_pad, self.post_pad = self.onset.pad(self.timestep)
availability_cols = np.array(
[
[f"{stat}_{ph}" for stat in self.archive.stations]
for ph in self.onset.phases
]
).flatten()
availability = pd.DataFrame(index=range(n_steps), columns=availability_cols)
for i in range(n_steps):
w_beg = starttime + self.timestep * i - self.pre_pad
w_end = (
starttime + self.timestep * (i + 1) - 1 / self.scan_rate + self.post_pad
)
logging.debug(f" Processing : {w_beg}-{w_end} ".center(110, "~"))
logging.info(
(
f" Processing : {w_beg + self.pre_pad}-{w_end - self.post_pad} "
).center(110, "~")
)
try:
data = self.archive.read_waveform_data(w_beg, w_end)
time, max_coa, max_coa_n, coord, onset_data = self._compute(data)
logging.debug(f"1-D con shape : {max_coa.shape}")
coalescence.append(
time, max_coa, max_coa_n, coord, self.lut.unit_conversion_factor
)
availability.loc[i] = onset_data.availability
except (
util.ArchiveEmptyException,
util.DataGapException,
util.DataAvailabilityException,
) as e:
coalescence.empty(
starttime, self.timestep, i, e.msg, self.lut.unit_conversion_factor
)
availability.loc[i] = np.zeros(len(availability_cols), dtype=int)
availability.rename(
index={i: str(starttime + self.timestep * i)}, inplace=True
)
if not coalescence.written:
coalescence.write()
write_availability(self.run, availability)
def _locate_events(self, **kwargs):
"""
Loop through list of earthquakes read in from trigger results and re-compute
coalescence; output phase picks, event location and uncertainty, plus optional
plots and outputs.
Parameters
----------
kwargs : **dict
Can contain:
starttime : `obspy.UTCDateTime` object, optional
Timestamp from which to include events in the locate scan.
endtime : `obspy.UTCDateTime` object, optional
Timestamp up to which to include events in the locate scan.
trigger_file : str, optional
File containing triggered events to be located.
"""
triggered_events = read_triggered_events(self.run, **kwargs)
n_events = len(triggered_events.index)
self.pre_pad, self.post_pad = self.onset.pad(4 * self.marginal_window)
for i, triggered_event in triggered_events.iterrows():
event = Event(self.marginal_window, triggered_event)
w_beg = event.trigger_time - 2 * self.marginal_window - self.pre_pad
w_end = event.trigger_time + 2 * self.marginal_window + self.post_pad
logging.info(util.log_spacer)
logging.info(f"\tEVENT - {i+1} of {n_events} - {event.uid}")
logging.info(util.log_spacer)
try:
logging.info("\tReading waveform data...")
event.add_waveform_data(self._read_event_waveform_data(w_beg, w_end))
logging.info("\tComputing 4-D coalescence function...")
event.add_compute_output( # pylint: disable=E1120
*self._compute(event.data, event)
)
except (
util.ArchiveEmptyException,
util.DataGapException,
util.DataAvailabilityException,
) as e:
logging.info(e.msg)
continue
if self.write_coalescence:
logging.info("\tSaving full coalescence map...")
write_coalescence(self.run, event.map4d, event)
# --- Trim coalescence map to marginal window ---
if event.in_marginal_window():
event.trim2window()
else:
del event
continue
logging.info("\tDetermining event location and uncertainty...")
marginalised_coa_map = self._calculate_location(event)
if self.write_marginal_coalescence:
logging.info("\tSaving marginalised coalescence map...")
write_coalescence(
self.run, marginalised_coa_map, event, marginalised=True
)
logging.info("\tMaking phase picks...")
event, _ = self.picker.pick_phases(event, self.lut, self.run)
if self.mags is not None:
logging.info("\tCalculating magnitude...")
event, _ = self.mags.calc_magnitude(event, self.lut, self.run)
event.write(self.run, self.lut)
if self.plot_event_summary:
event_summary(
self.run,
event,
marginalised_coa_map,
self.lut,
xy_files=self.xy_files,
plot_all_stns=self.plot_all_stns,
)
if self.plot_event_video:
logging.info("Support for event videos coming soon.")
if self.write_cut_waveforms:
write_cut_waveforms(
self.run,
event,
self.cut_waveform_format,
pre_cut=self.pre_cut,
post_cut=self.post_cut,
)
if self.write_real_waveforms:
write_cut_waveforms(
self.run,
event,
self.cut_waveform_format,
pre_cut=self.pre_cut,
post_cut=self.post_cut,
waveform_type="real",
units=self.real_waveform_units,
)
if self.write_wa_waveforms:
write_cut_waveforms(
self.run,
event,
self.cut_waveform_format,
pre_cut=self.pre_cut,
post_cut=self.post_cut,
waveform_type="wa",
units=self.wa_waveform_units,
)
del event, marginalised_coa_map
logging.info(util.log_spacer)
@util.timeit("info")
def _compute(self, data, event=None):
"""
Compute 3-D coalescence between two time stamps.
Parameters
----------
data : :class:`~quakemigrate.io.data.WaveformData` object
Light class encapsulating data returned by an archive query.
Returns
-------
times : `numpy.ndarray` of `obspy.UTCDateTime` objects, shape(nsamples)
Timestamps for the coalescence data.
max_coa : `numpy.ndarray` of floats, shape(nsamples)
Coalescence value through time.
max_coa_n : `numpy.ndarray` of floats, shape(nsamples)
Normalised coalescence value through time.
coord : `numpy.ndarray` of floats, shape(nsamples)
Location of maximum coalescence through time in input projection space.
map4d : `numpy.ndarry`, shape(nx, ny, nz, nsamp), optional
4-D coalescence map.
"""
# --- Calculate continuous coalescence within 3-D volume ---
onsets, onset_data = self.onset.calculate_onsets(data)
try:
traveltimes = self.lut.serve_traveltimes(
onset_data.sampling_rate, onset_data.availability
)
except KeyError as e:
raise util.LUTPhasesException(
f"Attempting to migrate phases {onset_data.phases}; but traveltimes "
f"for {e} not found in the LUT. Please create a new lookup table with "
f"phases={onset_data.phases}"
)
# Here fsmp and lsmp are used to calculate the length of map4d from the shape of
# the onset functions --> need to use onset sampling_rate, not scan rate.
fsmp = util.time2sample(self.pre_pad, onset_data.sampling_rate)
lsmp = util.time2sample(self.post_pad, onset_data.sampling_rate)
avail = np.sum([value for _, value in onset_data.availability.items()])
map4d = migrate(onsets, traveltimes, fsmp, lsmp, avail, self.threads)
# --- Find continuous peak coalescence in 3-D volume ---
max_coa, max_coa_n, max_idx = find_max_coa(map4d, self.threads)
coord = self.lut.index2coord(max_idx, unravel=True)
if self.run.stage == "detect":
del map4d
time = data.starttime + self.pre_pad
return time, max_coa, max_coa_n, coord, onset_data
else:
times = event.mw_times(self.scan_rate)
return times, max_coa, max_coa_n, coord, map4d, onset_data
@util.timeit("info")
def _read_event_waveform_data(self, w_beg, w_end):
"""
Read waveform data for a triggered event.
Parameters
----------
w_beg : `obpsy.UTCDateTime` object
Timestamp from which to read waveform data.
w_end : `obspy.UTCDateTime` object
Timestamp up to which to read waveform data.
Returns
-------
data : :class:`~quakemigrate.io.data.WaveformData` object
Light class encapsulating data returned by an archive query.
"""
# Extra pre- and post-pad default to 0.
pre_pad = post_pad = 0.0
# If calculating magnitudes, read in padding required for amplitude
# measurements.
if self.mags:
pre_pad, post_pad = self.mags.amp.pad(
self.marginal_window, self.lut.max_traveltime, self.lut.fraction_tt
)
# If a specific pre / post cut has been requested by the user,
# check which is bigger.
if self.pre_cut:
pre_pad = max(pre_pad, self.pre_cut)
if self.post_cut:
post_pad = max(post_pad, self.post_cut)
# Trim the pre_pad and post_pad to avoid cutting more data than we need; only
# subtract 1*marginal_window so that if the event otime moves by this much (the
# maximum allowed) from the triggered event time, we still have the correct
# window of data to apply the pre_cut.
pre_pad = max(0.0, pre_pad - self.marginal_window - self.pre_pad)
post_pad = max(0.0, post_pad - self.marginal_window - self.post_pad)
logging.debug(f"{w_beg}, {w_end}, {pre_pad}, {post_pad}")
return self.archive.read_waveform_data(w_beg, w_end, pre_pad, post_pad)
@util.timeit("info")
def _calculate_location(self, event):
"""
Marginalise the 4-D coalescence grid and calculate a set of locations and
associated uncertainties by:
(1) calculating the covariance of the entire coalescence map;
(2) smoothing and fitting a 3-D Gaussian function and ..
(3) fitting a 3-D spline function ..
to a region around the maximum coalescence location in the
marginalised 3-D coalescence map.
Parameters
----------
event : :class:`~quakemigrate.io.event.Event` object
Light class encapsulating waveforms, coalescence information, picks and
location information for a given event.
Returns
-------
coa_map : array-like
Marginalised 3-D coalescence map.
"""
# --- Marginalise and normalise the coalescence grid ---
coa_map = np.sum(event.map4d, axis=-1)
coa_map = coa_map / np.nanmax(coa_map)
# --- Determine best-fitting interpolated spline location ---
event.add_spline_location(self._splineloc(np.copy(coa_map)))
# --- Determine best-fitting Gaussian location and uncertainty ---
smoothed_coa_map = self._gaufilt3d(np.copy(coa_map))
event.add_gaussian_location(*self._gaufit3d(smoothed_coa_map))
# --- Determine global covariance location and uncertainty ---
event.add_covariance_location(*self._covfit3d(np.copy(coa_map)))
return coa_map
@util.timeit()
def _splineloc(self, coa_map, win=5, upscale=10):
"""
Fit a 3-D spline function to a region around the maximum coalescence in the
marginalised coalescence map and interpolate by factor `upscale` to return a
sub-grid maximum coalescence location.
Parameters
----------
coa_map : array-like
Marginalised 3-D coalescence map.
win : int
Window of grid nodes (+/-(win-1)//2 in x, y and z) around max value in
coa_map to perform the fit over.
upscale : int
Upscaling factor to interpolate the fitted 3-D spline function by.
Returns
-------
location : array-like, [x, y, z]
Max coalescence location from spline interpolation.
"""
# Get shape of 3-D coalescence map
nx, ny, nz = coa_map.shape
n = np.array([nx, ny, nz])
# Find maximum coalescence location in grid
mx, my, mz = np.unravel_index(np.nanargmax(coa_map), coa_map.shape)
i = np.array([mx, my, mz])
# Determining window about maximum value and trimming coa grid
w2 = (win - 1) // 2
x1, y1, z1 = np.clip(i - w2, 0 * n, n)
x2, y2, z2 = np.clip(i + w2 + 1, 0 * n, n)
# If subgrid is not close to the edge
if (x2 - x1) == (y2 - y1) == (z2 - z1):
coa_map_trim = coa_map[x1:x2, y1:y2, z1:z2]
# Defining the original interpolation function
xo = np.linspace(0, coa_map_trim.shape[0] - 1, coa_map_trim.shape[0])
yo = np.linspace(0, coa_map_trim.shape[1] - 1, coa_map_trim.shape[1])
zo = np.linspace(0, coa_map_trim.shape[2] - 1, coa_map_trim.shape[2])
xog, yog, zog = np.meshgrid(xo, yo, zo)
interpgrid = Rbf(
xog.flatten(),
yog.flatten(),
zog.flatten(),
coa_map_trim.flatten(),
function="cubic",
)
# Creating the new interpolated grid
xx = np.linspace(
0, coa_map_trim.shape[0] - 1, (coa_map_trim.shape[0] - 1) * upscale + 1
)
yy = np.linspace(
0, coa_map_trim.shape[1] - 1, (coa_map_trim.shape[1] - 1) * upscale + 1
)
zz = np.linspace(
0, coa_map_trim.shape[2] - 1, (coa_map_trim.shape[2] - 1) * upscale + 1
)
xxg, yyg, zzg = np.meshgrid(xx, yy, zz)
# Interpolate spline function on new grid
coa_map_int = interpgrid(
xxg.flatten(), yyg.flatten(), zzg.flatten()
).reshape(xxg.shape)
# Calculate max coalescence location on interpolated grid
mxi, myi, mzi = np.unravel_index(
np.nanargmax(coa_map_int), coa_map_int.shape
)
mxi = mxi / upscale + x1
myi = myi / upscale + y1
mzi = mzi / upscale + z1
logging.debug(f"\t\tGridded loc: {mx} {my} {mz}")
logging.debug(f"\t\tSpline loc: {mxi} {myi} {mzi}")
# Run check that spline location is within grid-cell
if (abs(mx - mxi) > 1) or (abs(my - myi) > 1) or (abs(mz - mzi) > 1):
logging.debug(
"\tSpline warning: spline location outside grid "
"cell with maximum coalescence value"
)
location = self.lut.index2coord([[mxi, myi, mzi]])[0]
# Run check that spline location is within window
if (abs(mx - mxi) > w2) or (abs(my - myi) > w2) or (abs(mz - mzi) > w2):
logging.info(
"\t !!!! Spline error: location outside interpolation window !!!!"
)
logging.info("\t\t\tGridded Location returned")
location = self.lut.index2coord([[mx, my, mz]])[0]
else:
logging.info(
"\t !!!! Spline error: interpolation window crosses edge of grid !!!!"
)
logging.info("\t\t\tGridded Location returned")
location = self.lut.index2coord([[mx, my, mz]])[0]
return location
@util.timeit()
def _gaufit3d(self, coa_map, thresh=0.0, win=7):
"""
Fit a 3-D Gaussian function to a region around the maximum coalescence location
in the 3-D marginalised coalescence map: return expectation location and
associated uncertainty.
Parameters
----------
coa_map : array-like
Marginalised 3-D coalescence map.
thresh : float (between 0 and 1), optional
Cut-off threshold (percentile) to trim coa_map: only data above this
percentile will be retained.
win : int, optional
Window of grid nodes (+/-(win-1)//2 in x, y and z) around max value in
coa_map to perform the fit over.
Returns
-------
location : array-like, [x, y, z]
Expectation location from 3-D Gaussian fit.
uncertainty : array-like, [sx, sy, sz]
One sigma uncertainties on expectation location from 3-D Gaussian fit.
"""
# Get shape of 3-D coalescence map and max coalescence grid location
shape = coa_map.shape
ijk = np.unravel_index(np.nanargmax(coa_map), shape)
# Only use grid nodes above threshold value, and within the specified window
# around the coalescence peak
flag = np.logical_and(coa_map > thresh, self._mask3d(shape, ijk, win))
ix, iy, iz = np.where(flag)
# Subtract mean of entire 3-D coalescence map from the local grid window so it
# is better approximated by a Gaussian (which goes to zero at infinity)
coa_map = coa_map - np.nanmean(coa_map)
ls = [np.arange(n) for n in shape]
# Get ijk indices for points in the sub-grid
x, y, z = [L[idx] - i for L, idx, i in zip(ls, np.where(flag), ijk)]
X = np.c_[x * x, y * y, z * z, x * y, x * z, y * z, x, y, z, np.ones(len(ix))].T
Y = -np.log(np.clip(coa_map.astype(np.float64)[ix, iy, iz], 1e-300, np.inf))
X_inv = np.linalg.pinv(X)
P = np.matmul(Y, X_inv)
G = -np.array(
[2 * P[0], P[3], P[4], P[3], 2 * P[1], P[5], P[4], P[5], 2 * P[2]]
).reshape((3, 3))
H = np.array([P[6], P[7], P[8]])
loc = np.matmul(np.linalg.inv(G), H)
cx, cy, cz = loc
K = (
P[9]
- P[0] * cx**2
- P[1] * cy**2
- P[2] * cz**2
- P[3] * cx * cy
- P[4] * cx * cz
- P[5] * cy * cz
)
M = np.array(
[
P[0],
P[3] / 2,
P[4] / 2,
P[3] / 2,
P[1],
P[5] / 2,
P[4] / 2,
P[5] / 2,
P[2],
]
).reshape(3, 3)
egv, vec = np.linalg.eig(M)
sgm = np.sqrt(0.5 / np.clip(np.abs(egv), 1e-10, np.inf)) / 2
val = np.exp(-K)
csgm = np.sqrt(0.5 / np.clip(np.abs(M.diagonal()), 1e-10, np.inf))
# Convert back to whole-grid coordinates
gau_3d = [loc + ijk, vec, sgm, csgm, val]
# Convert grid location to XYZ / coordinates
location = [[gau_3d[0][0], gau_3d[0][1], gau_3d[0][2]]]
location = self.lut.index2coord(location)[0]
uncertainty = sgm * self.lut.node_spacing
return location, uncertainty
@util.timeit()
def _covfit3d(self, coa_map, thresh=0.90, win=None):
"""
Calculate the 3-D covariance of the marginalised coalescence map, filtered above
a percentile threshold `thresh`. Optionally can also perform the fit on a
sub-window of the grid around the maximum coalescence location.
Parameters
----------
coa_map : array-like
Marginalised 3-D coalescence map.
thresh : float (between 0 and 1), optional
Cut-off threshold (fractional percentile) to trim coa_map; only data above
this percentile will be retained.
win : int, optional
Window of grid nodes (+/-(win-1)//2 in x, y and z) around max value in
coa_map to perform the fit over.
Returns
-------
location : array-like, [x, y, z]
Expectation location from covariance fit.
uncertainty : array-like, [sx, sy, sz]
One sigma uncertainties on expectation location from covariance fit.
"""
# Get shape of 3-D coalescence map and max coalesence grid location
shape = coa_map.shape
ijk = np.unravel_index(np.nanargmax(coa_map), coa_map.shape)
# If window is specified, clip the grid to only look here.
if win:
flag = np.logical_and(coa_map > thresh, self._mask3d(shape, ijk, win))
else:
flag = np.where(coa_map > thresh, True, False)
# Treat the coalescence values in the grid as the sample weights
sw = coa_map.flatten()
sw[~flag.flatten()] = np.nan
ssw = np.nansum(sw)
# Get the x, y and z samples on which to perform the fit
nc = self.lut.node_count
ns = self.lut.node_spacing
grid = np.meshgrid(*[np.arange(n) for n in nc], indexing="ij")
xs, ys, zs = [g.flatten() * size for g, size in zip(grid, ns)]
# Expectation values:
xe, ye, ze = [np.nansum(sw * s) / ssw for s in [xs, ys, zs]]
# Covariance matrix:
cov_matrix = np.zeros((3, 3))
cov_matrix[0, 0] = np.nansum(sw * (xs - xe) ** 2) / ssw
cov_matrix[1, 1] = np.nansum(sw * (ys - ye) ** 2) / ssw
cov_matrix[2, 2] = np.nansum(sw * (zs - ze) ** 2) / ssw
cov_matrix[0, 1] = np.nansum(sw * (xs - xe) * (ys - ye)) / ssw
cov_matrix[1, 0] = cov_matrix[0, 1]
cov_matrix[0, 2] = np.nansum(sw * (xs - xe) * (zs - ze)) / ssw
cov_matrix[2, 0] = cov_matrix[0, 2]
cov_matrix[1, 2] = np.nansum(sw * (ys - ye) * (zs - ze)) / ssw
cov_matrix[2, 1] = cov_matrix[1, 2]
location_xyz = self.lut.ll_corner + np.array([xe, ye, ze])
location = self.lut.coord2grid(location_xyz, inverse=True)[0]
uncertainty = np.diag(np.sqrt(abs(cov_matrix)))
return location, uncertainty
@util.timeit()
def _gaufilt3d(self, map3d, sgm=0.8, shp=None):
"""
Smooth the 3-D marginalised coalescence map using a 3-D Gaussian function to
enable a better Gaussian fit to the data to be calculated.
Parameters
----------
map3d : array-like
Marginalised 3-D coalescence map.
sgm : float
Sigma value (in grid nodes) for the 3-D Gaussian filter function; bigger
sigma leads to more aggressive (long wavelength) smoothing.
shp : array-like, optional
Shape of volume.
Returns
-------
smoothed_map : array-like
Gaussian smoothed 3-D coalescence map.
"""
if shp is None:
shp = map3d.shape
# Construct 3-D Gaussian filter
flt = util.gaussian_3d(*shp, sgm)
# Convolve map_3d and 3-D Gaussian filter
smoothed_map = fftconvolve(map3d, flt, mode="same")
# Mirror and convolve again (to avoid "phase-shift")
smoothed_map = smoothed_map[::-1, ::-1, ::-1] / np.nanmax(smoothed_map)
smoothed_map = fftconvolve(smoothed_map, flt, mode="same")
# Final mirror and normalise
smoothed_map = smoothed_map[::-1, ::-1, ::-1] / np.nanmax(smoothed_map)
return smoothed_map
def _mask3d(self, n, i, window):
"""
Creates a mask that can be applied to a 3-D grid.
Parameters
----------
n : array-like, int
Shape of grid.
i : array-like, int
Location of node around which to mask.
window : int
Size of window around node to mask - window of grid nodes is
+/-(win-1)//2 in x, y and z.
Returns
-------
mask : array-like
Masking array.
"""
n = np.array(n)
i = np.array(i)
w2 = (window - 1) // 2
x1, y1, z1 = np.clip(i - w2, 0 * n, n)
x2, y2, z2 = np.clip(i + w2 + 1, 0 * n, n)
mask = np.zeros(n, dtype=bool)
mask[x1:x2, y1:y2, z1:z2] = True
return mask
# --- Deprecation/Future handling ---
@property
def scan_rate(self):
"""Get scan_rate"""
return self._scan_rate
@scan_rate.setter
def scan_rate(self, value):
if value is None:
return
elif value == self.onset.sampling_rate:
self._scan_rate = value
return
print(
"Warning: Parameter not yet user-configurable. Currently\n"
"the scan sampling rate must be the same as the onset sampling\n"
f"rate, which you have set to {self.scan_rate} Hz. Please\n"
"contact the QuakeMigrate developers for further info."
)
@property
def sampling_rate(self):
"""Get sampling_rate"""
return self.scan_rate
@sampling_rate.setter
def sampling_rate(self, value):
if value is None:
return
print(
"Warning: Parameter name has changed - continuing. Currently\n"
"the scan sampling rate must be the same as the onset sampling\n"
f"rate, which you have set to {self.scan_rate} Hz. Please\n"
"contact the QuakeMigrate developers for further info."
)
@property
def time_step(self):
"""Handler for deprecated attribute name 'time_step'"""
return self.timestep
@time_step.setter
def time_step(self, value):
if value is None:
return
print(
"FutureWarning: Parameter name has changed - continuing.\n"
"To remove this message, change:\n"
"\t'time_step' -> 'timestep'"
)
self.timestep = value
@property
def n_cores(self):
"""Handler for deprecated attribute name 'n_cores'"""
return self.threads
@n_cores.setter
def n_cores(self, value):
if value is None:
return
print(
"FutureWarning: Parameter name has changed - continuing.\n"
"To remove this message, change:\n"
"\t'n_cores' -> 'threads'"
)
self.threads = value