Source code for quakemigrate.signal.pickers.gaussian

# -*- coding: utf-8 -*-
"""
The default seismic phase picking class - fits a 1-D Gaussian to the calculated onset
functions.

:copyright:
    2020–2024, QuakeMigrate developers.
:license:
    GNU General Public License, Version 3
    (https://www.gnu.org/licenses/gpl-3.0.html)

"""

import logging

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.optimize import curve_fit

from quakemigrate.plot.phase_picks import pick_summary
import quakemigrate.util as util
from .base import PhasePicker


[docs]class GaussianPicker(PhasePicker): """ This class details the default method of making phase picks shipped with QuakeMigrate, namely fitting a 1-D Gaussian function to the onset function for each station and phase. Attributes ---------- phase_picks : dict "GAU_P" : array-like Numpy array stack of Gaussian pick info (each as a dict) for P phase "GAU_S" : array-like Numpy array stack of Gaussian pick info (each as a dict) for S phase threshold_method : {"MAD", "percentile"} Which method to use to calculate the pick threshold; a percentile of the data outside the pick windows (e.g. 0.99 = 99th percentile) or a multiple of the Median Absolute Deviation of the signal outside the pick windows. Default uses the MAD method. percentile_pick_threshold : float, optional Picks will only be made if the onset function exceeds this percentile of the noise level (amplitude of onset function outside pick windows). (Default: 1.0) mad_pick_threshold : float, optional Picks will only be made if the onset function exceeds its median value plus this multiple of the MAD (calculated from the onset data outside the pick windows). (Default: 8) plot_picks : bool Toggle plotting of phase picks. write_seed_ids : bool Toggle writing the SEED id's of the traces that have contributed to a given phase pick within the .picks file. Default: False. Methods ------- pick_phases(event, lut, run) Picks phase arrival times for located events by fitting a 1-D Gaussian function to the P and/or S onset functions """ DEFAULT_GAUSSIAN_FIT = {"popt": 0, "xdata": 0, "xdata_dt": 0, "PickValue": -1} def __init__(self, onset=None, **kwargs): """Instantiate the GaussianPicker object.""" super().__init__(**kwargs) self.onset = onset # --- Get pick method and threshold --- self.threshold_method = kwargs.get("threshold_method", "MAD") if self.threshold_method == "percentile": self.percentile_pick_threshold = kwargs.get( "percentile_pick_threshold", 1.0 ) elif self.threshold_method == "MAD": self.mad_pick_threshold = kwargs.get("mad_pick_threshold", 8.0) else: raise util.InvalidPickThresholdMethodException # Handle deprecated `pick_threshold` if kwargs.get("pick_threshold"): self.pick_threshold = kwargs["pick_threshold"] self.plot_picks = kwargs.get("plot_picks", False) self.write_seed_ids = kwargs.get("write_seed_ids", False) if "fraction_tt" in kwargs.keys(): print( "FutureWarning: Fraction of traveltime argument moved to lookup tables." "\nIt remains possible to override the fraction of traveltime here, if " "required, to further\ntune the phase picker." ) self._fraction_tt = kwargs.get("fraction_tt") def __str__(self): """Returns a short summary string of the GaussianPicker.""" str_ = "\tPhase picking by fitting a 1-D Gaussian to onsets\n" if self.threshold_method == "percentile": str_ += f"\t\tPercentile threshold = {self.percentile_pick_threshold}\n" elif self.threshold_method == "MAD": str_ += f"\t\tMAD multiplier = {self.mad_pick_threshold}\n" if self._fraction_tt is not None: str_ += f"\t\tSearch window = {self._fraction_tt*100}% of traveltime\n" return str_
[docs] @util.timeit("info") def pick_phases(self, event, lut, run): """ Picks phase arrival times for located events. Parameters ---------- event : :class:`~quakemigrate.io.event.Event` object Light class encapsulating waveforms, coalescence information and location information for a given event. lut : :class:`~quakemigrate.lut.lut.LUT` object Contains the traveltime lookup tables for seismic phases, computed for some pre-defined velocity model. run : :class:`~quakemigrate.io.core.Run` object Light class encapsulating i/o path information for a given run. Returns ------- event : :class:`~quakemigrate.io.event.Event` object Event object provided to pick_phases(), but now with phase picks! picks : `pandas.DataFrame` DataFrame that contains the measured picks with columns: ["Name", "Phase", "ModelledTime", "PickTime", "PickError", "SNR"] Each row contains the phase pick from one station/phase. """ # Onsets are recalculated without logging _, onset_data = self.onset.calculate_onsets( event.data, timespan=4 * event.marginal_window ) if self._fraction_tt is None: fraction_tt = lut.fraction_tt else: fraction_tt = self._fraction_tt e_ijk = lut.index2coord(event.hypocentre, inverse=True)[0] # Pre-define pick DataFrame and fit params and pick windows dicts p_idx = np.arange(sum([len(v) for _, v in onset_data.onsets.items()])) columns = [ "Station", "Phase", "ModelledTime", "PickTime", "PickError", "SNR", "Residual", ] if self.write_seed_ids: columns = [columns[0], "SEED_ids", *columns[1:]] picks = pd.DataFrame(index=p_idx, columns=columns) gaussfits = {} pick_windows = {} idx = 0 for station, onsets in onset_data.onsets.items(): for phase, onset in onsets.items(): traveltime = lut.traveltime_to(phase, e_ijk, station)[0] pick_windows.setdefault(station, {}).update( { phase: self._determine_window( event, onset_data, traveltime, fraction_tt ) } ) n_samples = len(onset) self._distinguish_windows( pick_windows[station], list(onsets.keys()), n_samples ) for phase, onset in onsets.items(): # Find threshold from 'noise' part of onset pick_threshold = self._find_pick_threshold( onset, pick_windows[station], self.threshold_method ) logging.debug(f"\t\tPicking {phase} at {station}...") fit, *pick = self._fit_gaussian( onset, onset_data.sampling_rate, self.onset.gaussian_halfwidth(phase), onset_data.starttime, pick_threshold, pick_windows[station][phase], ) gaussfits.setdefault(station, {}).update({phase: fit}) traveltime = lut.traveltime_to(phase, e_ijk, station)[0] model_time = event.otime + traveltime if pick[0] == -1: residual = -1 else: residual = pick[0] - model_time if self.write_seed_ids: stream = onset_data.filtered_waveforms.select( station=station, channel=self.onset.channel_maps[phase], ) seed_ids = sorted(set([tr.id for tr in stream])) picks.iloc[idx] = [ station, seed_ids, phase, model_time, *pick, residual, ] else: picks.iloc[idx] = [station, phase, model_time, *pick, residual] idx += 1 event.add_picks(picks, gaussfits=gaussfits, pick_windows=pick_windows) self.write(run, event.uid, picks) if self.plot_picks: logging.info("\t\tPlotting picks...") for station, onsets in onset_data.onsets.items(): traveltimes = [ lut.traveltime_to(phase, e_ijk, station)[0] for phase in onsets.keys() ] self.plot(event, station, onset_data, picks, traveltimes, run) return event, picks
def _determine_window(self, event, onset_data, tt, fraction_tt): """ Determine phase pick window upper and lower bounds based on the event marginal window and a set percentage of the phase travel time. Parameters ---------- event : :class:`~quakemigrate.io.event.Event` object Light class to encapsulate information about an event, including origin time, location and waveform data. onset_data : :class:`~quakemigrate.signal.onsets.base.OnsetData` object Light class encapsulating data generated during onset calculation. tt : float Traveltime for the requested phase. fraction_tt : float Defines width of time window around expected phase arrival time in which to search for a phase pick as a function of the traveltime from the event location to that station -- should be an estimate of the uncertainty in the velocity model. Returns ------- lower_idx : int Index of lower bound for the phase pick window. arrival_idx : int Index of the modelled phase arrival time. upper_idx : int Index of upper bound for the phase pick window. """ arrival_idx = util.time2sample( event.otime + tt - onset_data.starttime, onset_data.sampling_rate ) # Add length of marginal window to this and convert to index samples = util.time2sample( tt * fraction_tt + event.marginal_window, onset_data.sampling_rate ) return [arrival_idx - samples, arrival_idx, arrival_idx + samples] def _distinguish_windows(self, windows, phases, samples): """ Ensure pick windows do not overlap - if they do, set the upper bound of window one and the lower bound of window two to be the midpoint index of the two modelled phase arrival times. Parameters ---------- windows : dict Dictionary of windows with phases as keys. phases : list of str Phases being migrated. samples : int Total number of samples in the onset function. """ # Handle first key first_idx = windows[phases[0]][0] windows[phases[0]][0] = 0 if first_idx < 0 else first_idx # Handle keys pairwise for p1, p2 in util.pairwise(phases): p1_window, p2_window = windows[p1], windows[p2] mid_idx = int((p1_window[1] + p2_window[1]) / 2) windows[p1][2] = min(mid_idx, p1_window[2]) windows[p2][0] = max(mid_idx, p2_window[0]) # Handle last key last_idx = windows[phases[-1]][2] windows[phases[-1]][2] = samples if last_idx > samples else last_idx def _find_pick_threshold(self, onset, windows, method): """ Determine a pick threshold from the onset data outside the pick windows. Parameters ---------- onset : `numpy.ndarray` of `numpy.double` Onset (characteristic) function. windows : list of int Indexes of the lower window bound, the phase arrival, and the upper window bound. method : {"percentile", "MAD"} Method used to calculate the pick threshold from the noise data. Return ------ pick_threshold : float The threshold calculated from the onset data outside the pick windows, according to the specified `method`. """ onset_noise = onset.copy() for _, window in windows.items(): onset_noise[window[0] : window[2]] = -1 # Remove data during pick windows, and data set to 1 (in onset function taper # pad windows) onset_noise = onset_noise[onset_noise > 1] if method == "percentile": pick_threshold = np.percentile( onset_noise, self.percentile_pick_threshold * 100 ) elif method == "MAD": med = np.median(onset_noise) mad = util.calculate_mad(onset_noise) pick_threshold = med + (mad * self.mad_pick_threshold) return pick_threshold def _fit_gaussian( self, onset, sampling_rate, halfwidth, starttime, pick_threshold, window ): """ Fit a Gaussian to the onset function in order to make a time pick with an associated uncertainty. Uses the amplitude and timing of the onset function peak and some knowledge of the onset function parameters (e.g. short-term average window length, for the :class:`~quakemigrate.signal.onsets.stalta.STALTAOnset`) to make an initial estimate of a gaussian fit to the onset function. Parameters ---------- onset : `numpy.ndarray` of `numpy.double` Onset function. sampling_rate : int Sampling rate of the onset function. halfwidth : float Initial estimate for the Gaussian half-width based on some function of the onset function parameters. starttime : `obspy.UTCDateTime` object Timestamp for first sample of the onset function. pick_threshold : float Value above which to threshold data based on noise. window : list of int, [start, arrival, end] Indices for the window start, modelled phase arrival, and window end. Returns ------- gaussian_fit : dictionary Gaussian fit parameters: {"popt": popt, "xdata": x_data, "xdata_dt": x_data_dt, "PickValue": max_onset, "PickThreshold": pick_threshold} max_onset : float Amplitude of Gaussian fit to onset function, i.e. the SNR. sigma : float Sigma of Gaussian fit to onset function, i.e. the pick uncertainty. mean : `obspy.UTCDateTime` Mean of Gaussian fit to onset function, i.e. the pick time. """ # Trim the onset function in the pick window onset_signal = onset[window[0] : window[2]] logging.debug(f"\t\t win_min: {window[0]}, win_max: {window[2]}") # Identify the peak in the windowed onset that exceeds this threshold # AND contains the maximum value in the window (i.e. the 'true' peak). try: peak_idxs = self._find_peak(onset_signal, pick_threshold) # add an extra sample either side for the curve fitting. This makes the # fitting more stable, and guarantees at least 3 samples --> avoids an # under-constrained optimisation (3 fitting params). padded_peak_idxs = [peak_idxs[0] - 1, peak_idxs[1] + 1] padded_peak_idxs = [window[0] + p for p in padded_peak_idxs] logging.debug( f"\t\t padded_peak_idxmin: {padded_peak_idxs[0]}," f" padded_peak_idxmax: {padded_peak_idxs[1]}" ) x_data = np.arange(*padded_peak_idxs) / sampling_rate y_data = onset[padded_peak_idxs[0] : padded_peak_idxs[1]] except util.NoOnsetPeak as e: logging.debug(e.msg) return self._pick_failure(pick_threshold) # Try to fit a 1-D Gaussian # Initial parameters (p0) are: # height = max value of onset function # mean = time of max value # sigma = `halfwidth` - determined from onset function parameters p0 = [ max(y_data), (padded_peak_idxs[0] + np.argmax(y_data)) / sampling_rate, halfwidth / sampling_rate, ] try: popt, _ = curve_fit(util.gaussian_1d, x_data, y_data, p0) except (ValueError, RuntimeError) as e: # curve_fit can fail for a number of reasons - primarily if the input data # contains nans or if the least-squares minimisation fails. A warning may # also be emitted to stdout if the covariance of the parameters could not # be estimated - this is suppressed by default in scan.py. logging.debug(f"\t\t Failed curve_fit:\n{e}\n\t\t Continuing...") return self._pick_failure(pick_threshold) except TypeError as e: logging.debug( "\t\t Failed curve_fit - too few input data?" f"{e}\n\t\t Continuing..." ) # Unpack results: # popt = [height, mean (seconds), sigma (seconds)] max_onset = popt[0] mean = starttime + float(popt[1]) sigma = np.absolute(popt[2]) # Check pick mean is within the pick window. if not window[0] < popt[1] * sampling_rate < window[2]: logging.debug("\t\t Pick mean out of bounds - continuing.") return self._pick_failure(pick_threshold) gaussian_fit = { "popt": popt, "xdata": x_data, "xdata_dt": np.array([starttime + x for x in x_data]), "PickValue": max_onset, "PickThreshold": pick_threshold, } return gaussian_fit, mean, sigma, max_onset def _pick_failure(self, pick_threshold): """ Short utility function to produce the default values when a pick cannot be made. Parameters ---------- pick_threshold : float Pick threshold value for onset data. Returns ------- gaussian_fit : dictionary The default Gaussian fit dictionary, with relevant pick threshold value. max_onset : int A default of -1 value to indicate failure. sigma : int A default of -1 value to indicate failure. mean : int A default of -1 value to indicate failure. """ gaussian_fit = self.DEFAULT_GAUSSIAN_FIT.copy() gaussian_fit["PickThreshold"] = pick_threshold mean = sigma = max_onset = -1 return gaussian_fit, mean, sigma, max_onset def _find_peak(self, windowed_onset, pick_threshold): """ Identify peaks, if any, within the windowed onset that exceed the specified threshold value. Of those peaks, this function seeks the one that contains the maximum value within the window, i.e. the 'true' peak - see the diagram below. v * * * * * * | * * * * | |---------------#-------#----| | * ** * | Parameters ---------- windowed_onset : `numpy.ndarray` of `numpy.double` The onset function within the picking window. pick_threshold : float Value above which to search for peaks in the onset data. Returns ------- true_peak_idx : [int, int] Start and end index values for the 'true' peak, with +1 added to the last index so that all of the values above the threshold are returned when slicing by index. Raises ------ util.NoOnsetPeak If no onset data, or only a single sample, exceeds the pick threshold. """ exceedence = np.where(windowed_onset > pick_threshold)[0] if len(exceedence) == 0: raise util.NoOnsetPeak(pick_threshold) # Identify all peaks - there are possibly multiple distinct periods of data that # exceed the threshold. The following command simply seeks non-consecutive index # values in the array of points that exceed the threshold and splits the array # at these points into 'peaks'. peaks = np.split(exceedence, np.where(np.diff(exceedence) != 1)[0] + 1) # Identify the peak that contains the true peak (maximum) true_maximum = np.argmax(windowed_onset) for i, peak in enumerate(peaks): if np.any(peak == true_maximum): break # Check if there is more than a single sample above the threshold if len(peaks[i]) < 2: raise util.NoOnsetPeak(pick_threshold) # Grab the peak and return the start/end index values. NOTE: + 1 is required so # that the last sample is included when slicing by index true_peak_idxs = [peaks[i][0], peaks[i][-1] + 1] return true_peak_idxs
[docs] @util.timeit() def plot(self, event, station, onset_data, picks_df, traveltimes, run): """ Plot figure showing the filtered traces for each data component and the onset functions calculated from them (P and/or S) for each station. The search window to make a phase pick is displayed, along with the dynamic pick threshold, the phase pick time and its uncertainty (if made) and the Gaussian fit to the onset function. Parameters ---------- event : :class:`~quakemigrate.io.event.Event` object Light class to encapsulate information about an event, including origin time, location and waveform data. station : str Station name. onset_data : :class:`~quakemigrate.signal.onsets.base.OnsetData` object Light class encapsulating data generated during onset calculation. picks_df : `pandas.DataFrame` object DataFrame that contains the measured picks with columns: ["Name", "Phase", "ModelledTime", "PickTime", "PickError", "SNR"] Each row contains the phase pick from one station/phase. traveltimes : list of float Modelled traveltimes from the event hypocentre to the station for each phase to be plotted. run : :class:`~quakemigrate.io.core.Run` object Light class encapsulating i/o path information for a given run. """ fpath = run.path / f"locate/{run.subname}/pick_plots/{event.uid}" fpath.mkdir(exist_ok=True, parents=True) onsets = onset_data.onsets[station] channel_maps = onset_data.channel_maps waveforms = onset_data.filtered_waveforms.select(station=station) # Check if any data available to plot if not bool(waveforms): return picks = picks_df[picks_df["Station"] == station].reset_index(drop=True) windows = event.picks["pick_windows"][station] # Call subroutine to plot phase pick figure fig = pick_summary( event, station, waveforms, picks, onsets, channel_maps, traveltimes, windows ) fstem = f"{event.uid}_{station}" file = (fpath / fstem).with_suffix(".pdf") plt.savefig(file) plt.close(fig)
@property def fraction_tt(self): """Handler for deprecated attribute 'fraction_tt'""" return self._fraction_tt @fraction_tt.setter def fraction_tt(self, value): print( "FutureWarning: Fraction of traveltime attribute has moved to lookup table." "\nOverriding..." ) self._fraction_tt = value @property def pick_threshold(self): """Handler for deprecated attribute 'pick_threshold'""" @pick_threshold.setter def pick_threshold(self, value): raise AttributeError( "The 'pick_threshold' attribute has been deprecated. Select a threshold " "method from 'percentile' or 'MAD', and see the docs for the syntax for " "the appropriate threshold." )