Source code for quakemigrate.signal.pickers.gaussian

# -*- coding: utf-8 -*-
"""
The default seismic phase picking class - fits a 1-D Gaussian to the calculated
onset functions.

:copyright:
    2020, QuakeMigrate developers.
:license:
    GNU General Public License, Version 3
    (https://www.gnu.org/licenses/gpl-3.0.html)

"""

import logging

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.optimize import curve_fit

from quakemigrate.plot.phase_picks import pick_summary
import quakemigrate.util as util
from .base import PhasePicker


[docs]class GaussianPicker(PhasePicker): """ This class details the default method of making phase picks shipped with QuakeMigrate, namely fitting a 1-D Gaussian function to the STA/LTA onset function trace for each station. Attributes ---------- phase_picks : dict "GAU_P" : array-like Numpy array stack of Gaussian pick info (each as a dict) for P phase "GAU_S" : array-like Numpy array stack of Gaussian pick info (each as a dict) for S phase pick_threshold : float (between 0 and 1) Picks will only be made if the onset function exceeds this percentile of the noise level (average amplitude of onset function outside pick windows). Recommended starting value: 1.0 plot_picks : bool Toggle plotting of phase picks. Methods ------- pick_phases(data, lut, event, event_uid, output) Picks phase arrival times for located earthquakes by fitting a 1-D Gaussian function to the P and S onset functions """ DEFAULT_GAUSSIAN_FIT = {"popt": 0, "xdata": 0, "xdata_dt": 0, "PickValue": -1} def __init__(self, onset=None, **kwargs): """Instantiate the GaussianPicker object.""" super().__init__(**kwargs) self.onset = onset self.pick_threshold = kwargs.get("pick_threshold", 1.0) self.marginal_window = kwargs.get("marginal_window", 1.0) self.sampling_rate = None self.plot_picks = kwargs.get("plot_picks", False) if "fraction_tt" in kwargs.keys(): print("FutureWarning: Fraction of traveltime argument moved to " "lookup tables.\nIt remains possible to override the " "fraction of traveltime here, if required, to further\ntune" "the phase picker.") self._fraction_tt = kwargs.get("fraction_tt") def __str__(self): """Returns a short summary string of the GaussianPicker.""" str_ = ("\tPhase picking by fitting a 1-D Gaussian to onsets\n" f"\t\tPick threshold = {self.pick_threshold}\n" f"\t\tMarginal window = {self.marginal_window} s\n") if self._fraction_tt is not None: str_ += (f"\t\tSearch window = {self._fraction_tt*100}% of " "traveltime\n") return str_
[docs] @util.timeit("info") def pick_phases(self, event, lut, run): """ Picks phase arrival times for located earthquakes. Parameters ---------- event : :class:`~quakemigrate.io.event.Event` object Contains pre-processed waveform data on which to perform picking, the event location, and a unique identifier. lut : :class:`~quakemigrate.lut.LUT` object Contains the traveltime lookup tables for seismic phases, computed for some pre-defined velocity model. run : :class:`~quakemigrate.io.Run` object Light class encapsulating i/o path information for a given run. Returns ------- event : :class:`~quakemigrate.io.event.Event` object Event object provided to pick_phases(), but now with phase picks! picks : `pandas.DataFrame` DataFrame that contains the measured picks with columns: ["Name", "Phase", "ModelledTime", "PickTime", "PickError", "SNR"] Each row contains the phase pick from one station/phase. """ # Onsets are recalculated without logging _ = self.onset.calculate_onsets(event.data, log=False) if self._fraction_tt is None: fraction_tt = lut.fraction_tt else: fraction_tt = self._fraction_tt e_ijk = lut.index2coord(event.hypocentre, inverse=True)[0] ptt = lut.traveltime_to("P", e_ijk) stt = lut.traveltime_to("S", e_ijk) # Pre-define pick DataFrame and fit params and pick windows dicts picks = pd.DataFrame(index=np.arange(0, 2 * len(event.data.p_onset)), columns=["Station", "Phase", "ModelledTime", "PickTime", "PickError", "SNR"]) gaussfits = {} pick_windows = {} for i, station in lut.station_data.iterrows(): gaussfits[station["Name"]] = {} pick_windows[station["Name"]] = {} for j, phase in enumerate(["P", "S"]): if phase == "P": onset = event.data.p_onset[i] model_time = event.otime + ptt[i] else: onset = event.data.s_onset[i] model_time = event.otime + stt[i] gau, max_onset, pick, pick_error, window = self._fit_gaussian( onset, phase, event.data.starttime, event.otime, ptt[i], stt[i], fraction_tt) gaussfits[station["Name"]][phase] = gau pick_windows[station["Name"]][phase] = window picks.iloc[2*i+j] = [station["Name"], phase, model_time, pick, pick_error, max_onset] event.add_picks(picks, gaussfits=gaussfits, pick_windows=pick_windows, pick_threshold=self.pick_threshold) self.write(run, event.uid, picks) if self.plot_picks: logging.info("\t\tPlotting picks...") self.plot(event, lut, picks, list(zip(ptt, stt)), run) return event, picks
def _fit_gaussian(self, onset, phase, starttime, otime, ptt, stt, fraction_tt): """ Fit a Gaussian to the onset function in order to make a time pick with an associated uncertainty. Uses the same STA/LTA onset (characteristic) function as is migrated through the grid to calculate the earthquake location. Uses knowledge of approximate pick index, the short-term average onset window and the signal sampling rate to make an initial estimate of a gaussian fit to the onset function. Parameters ---------- onset : array-like Onset (characteristic) function. phase : str Phase name ("P" or "S"). starttime : UTCDateTime object Start time of data (w_beg). p_arr : UTCDateTime object Time when P phase is expected to arrive based on best location. s_arr : UTCDateTime object Time when S phase is expected to arrive based on best location. ptt : UTCDateTime object Traveltime of P phase. stt : UTCDateTime object Traveltime of S phase. fraction_tt : float Defines width of time window around expected phase arrival time in which to search for a phase pick as a function of the traveltime from the event location to that station -- should be an estimate of the uncertainty in the velocity model. Returns ------- gaussian_fit : dictionary gaussian fit parameters: {"popt": popt, "xdata": x_data, "xdata_dt": x_data_dt, "PickValue": max_onset, "PickThreshold": threshold} max_onset : float amplitude of gaussian fit to onset function sigma : float sigma of gaussian fit to onset function mean : UTCDateTime mean of gaussian fit to onset function == pick time """ p_arr, s_arr = otime + ptt, otime + stt # Determine indices of P and S pick times pt_idx = int((p_arr - starttime) * self.sampling_rate) st_idx = int((s_arr - starttime) * self.sampling_rate) # Determine P and S pick window upper and lower bounds based on # (P-S)/2 -- either this or the next window definition will be # used depending on which is wider. pmin_idx = int(pt_idx - (st_idx - pt_idx) / 2) pmax_idx = int(pt_idx + (st_idx - pt_idx) / 2) smin_idx = int(st_idx - (st_idx - pt_idx) / 2) smax_idx = int(st_idx + (st_idx - pt_idx) / 2) # Check if index falls outside length of onset function; if so set # window to start/end at start/end of data. for idx in [pmin_idx, pmax_idx, smin_idx, smax_idx]: if idx < 0: idx = 0 if idx > len(onset): idx = len(onset) # Defining the bounds to search for the event over # Determine P and S pick window upper and lower bounds based on # set percentage of total travel time, plus marginal window # window based on self.fraction_tt of P/S travel time ptt *= fraction_tt stt *= fraction_tt # Add length of marginal window to this. Convert to index. P_idxmin_new = int(pt_idx - int((self.marginal_window + ptt) * self.sampling_rate)) P_idxmax_new = int(pt_idx + int((self.marginal_window + ptt) * self.sampling_rate)) S_idxmin_new = int(st_idx - int((self.marginal_window + stt) * self.sampling_rate)) S_idxmax_new = int(st_idx + int((self.marginal_window + stt) * self.sampling_rate)) # Setting so the search region can't be bigger than (P-S)/2: # compare the two window definitions; if (P-S)/2 window is # smaller then use this (to avoid picking the wrong phase). P_idxmin = np.max([pmin_idx, P_idxmin_new]) P_idxmax = np.min([pmax_idx, P_idxmax_new]) S_idxmin = np.max([smin_idx, S_idxmin_new]) S_idxmax = np.min([smax_idx, S_idxmax_new]) # Setting parameters depending on the phase if phase == "P": win_min = P_idxmin win_max = P_idxmax if phase == "S": win_min = S_idxmin win_max = S_idxmax logging.debug(f"Win_min = {win_min} ; Win_max = {win_max}") # Find index of maximum value of onset function in the appropriate # pick window max_onset = np.argmax(onset[win_min:win_max]) + win_min # Trim the onset function in the pick window onset_trim = onset[win_min:win_max] # Only keep the onset function outside the pick windows to # calculate the pick threshold onset_threshold = onset.copy() onset_threshold[P_idxmin:P_idxmax] = -1 onset_threshold[S_idxmin:S_idxmax] = -1 onset_threshold = onset_threshold[onset_threshold > -1] # Calculate the pick threshold: either user-specified percentile of # data outside pick windows, or 88th percentile within the relevant # pick window (whichever is bigger). threshold = np.percentile(onset_threshold, self.pick_threshold * 100) threshold_window = np.percentile(onset_trim, 88) threshold = np.max([threshold, threshold_window]) # Remove data within the pick window that is lower than the threshold tmp = (onset_trim - threshold).any() > 0 # If there is any data that meets this requirement... if onset[max_onset] >= threshold and tmp: exceedence = np.where((onset_trim - threshold) > 0)[0] exceedence_dist = np.zeros(len(exceedence)) # Really faffy process to identify the period of data which is # above the threshold around the highest value of the onset # function. d = 1 e = 0 while e < len(exceedence_dist) - 1: if e == len(exceedence_dist): exceedence_dist[e] = d else: if exceedence[e + 1] == exceedence[e] + 1: exceedence_dist[e] = d else: exceedence_dist[e] = d d += 1 e += 1 # Find the indices for this period of data tmp = exceedence_dist[np.argmax(onset_trim[exceedence])] tmp = np.where(exceedence_dist == tmp) # Add one data point below the threshold at each end of this period gau_idxmin = exceedence[tmp][0] + win_min - 1 gau_idxmax = exceedence[tmp][-1] + win_min + 2 logging.debug(f"gau_idxmin = {gau_idxmin} ; " f"gau_idxmax = {gau_idxmax}") # Initial guess for gaussian half-width based on onset function # STA window length data_half_range = self.onset.gaussian_halfwidth(phase) # Select data to fit the gaussian to x_data = np.arange(gau_idxmin, gau_idxmax, dtype=float) x_data = x_data / self.sampling_rate y_data = onset[gau_idxmin:gau_idxmax] # Convert indices to times x_data_dt = np.array([]) for _, x in enumerate(x_data): x_data_dt = np.hstack([x_data_dt, starttime + x]) # Try to fit a 1-D Gaussian. try: # Initial parameters are: # height = max value of onset function # mean = time of max value # sigma = data half-range (calculated above) p0 = [np.max(y_data), float(gau_idxmin + np.argmax(y_data)) / self.sampling_rate, data_half_range / self.sampling_rate] # Do the fit popt, _ = curve_fit(util.gaussian_1d, x_data, y_data, p0) # Results: # popt = [height, mean (seconds), sigma (seconds)] max_onset = popt[0] # Convert mean (pick time) to time mean = starttime + float(popt[1]) sigma = np.absolute(popt[2]) # Check pick mean is within the pick window. if not win_min < popt[1] * self.sampling_rate < win_max: gaussian_fit = self.DEFAULT_GAUSSIAN_FIT gaussian_fit["PickThreshold"] = threshold sigma = -1 mean = -1 max_onset = -1 else: gaussian_fit = {"popt": popt, "xdata": x_data, "xdata_dt": x_data_dt, "PickValue": max_onset, "PickThreshold": threshold} # If curve_fit fails. Will also spit error message to stdout, # though this can be suppressed - see warnings.filterwarnings() except (ValueError, RuntimeError): gaussian_fit = self.DEFAULT_GAUSSIAN_FIT gaussian_fit["PickThreshold"] = threshold sigma = -1 mean = -1 max_onset = -1 # If onset function does not exceed threshold in pick window else: gaussian_fit = self.DEFAULT_GAUSSIAN_FIT gaussian_fit["PickThreshold"] = threshold sigma = -1 mean = -1 max_onset = -1 return gaussian_fit, max_onset, mean, sigma, [win_min, win_max]
[docs] @util.timeit() def plot(self, event, lut, picks, ttimes, run): """ Plot figure showing the filtered traces for each data component and the characteristic functions calculated from them (P and S) for each station. The search window to make a phase pick is displayed, along with the dynamic pick threshold (defined as a percentile of the background noise level), the phase pick time and its uncertainty (if made) and the Gaussian fit to the characteristic function. Parameters ---------- event_uid : str, optional Earthquake UID string; for subdirectory naming within directory {run_path}/traces/ """ fpath = run.path / "locate" / run.subname / "pick_plots" / f"{event.uid}" fpath.mkdir(exist_ok=True, parents=True) # Generate plottable timestamps for data times = event.data.times(type="matplotlib") for i, station in lut.station_data["Name"].iteritems(): signal = event.data.filtered_signal[:, i, :] onsets = [event.data.p_onset[i, :], event.data.s_onset[i, :]] stpicks = picks[picks["Station"] == station].reset_index(drop=True) window = event.picks["pick_windows"][station] # Check if any data available to plot if not signal.any(): continue # Call subroutine to plot basic phase pick figure fig = pick_summary(event, station, signal, stpicks, onsets, ttimes[i], window) # --- Gaussian fits --- axes = fig.axes for j, (ax, ph) in enumerate(zip(axes[3:5], ["P", "S"])): gau = event.picks["gaussfits"][station][ph] win = window[ph] # Plot threshold thresh = gau["PickThreshold"] norm = max(onsets[j][win[0]:win[1]+1]) ax.axhline(thresh / norm, label="Pick threshold") axes[5].text(0.05+j*0.5, 0.25, f"Threshold: {thresh:5.3f}", ha="left", va="center", fontsize=18) # Check pick has been made if not gau["PickValue"] == -1: yy = util.gaussian_1d(gau["xdata"], gau["popt"][0], gau["popt"][1], gau["popt"][2]) dt = [x.datetime for x in gau["xdata_dt"]] ax.plot(dt, yy / norm) # --- Picking windows --- for j, ax in enumerate(axes[:5]): win = window["P"] if j % 3 == 0 else window["S"] clr = "#F03B20" if j % 3 == 0 else "#3182BD" ax.fill_betweenx([-1.1, 1.1], times[win[0]], times[win[1]], alpha=0.2, color=clr, label="Picking window") for ax in axes[3:5]: ax.legend(fontsize=14) fstem = f"{event.uid}_{station}" file = (fpath / fstem).with_suffix(".pdf") plt.savefig(file) plt.close(fig)
@property def fraction_tt(self): """Handler for deprecated attribute 'fraction_tt'""" return self._fraction_tt @fraction_tt.setter def fraction_tt(self, value): print("FutureWarning: Fraction of traveltime attribute has moved to " "lookup table.\n Overriding...") self._fraction_tt = value