Source code for quakemigrate.signal.trigger

# -*- coding: utf-8 -*-
"""
Module to perform the trigger stage of QuakeMigrate.

:copyright:
    2020–2024, QuakeMigrate developers.
:license:
    GNU General Public License, Version 3
    (https://www.gnu.org/licenses/gpl-3.0.html)

"""

import logging
from datetime import time

import numpy as np
from obspy import UTCDateTime
import pandas as pd
from scipy.ndimage import gaussian_filter1d

from quakemigrate.io import Run, read_scanmseed, write_triggered_events
from quakemigrate.plot import trigger_summary
import quakemigrate.util as util


[docs]def chunks2trace(a, new_shape): """ Create a trace filled with chunks of the same value. Parameters: ----------- a : array-like Array of chunks. new_shape : tuple of ints (number of chunks, chunk_length). Returns: -------- b : array-like Single array of values contained in `a`. """ b = np.broadcast_to(a[:, None], new_shape) b = np.reshape(b, np.prod(new_shape)) return b
CANDIDATES_COLS = [ "EventNum", "CoaTime", "TRIG_COA", "COA_X", "COA_Y", "COA_Z", "MinTime", "MaxTime", "COA", "COA_NORM", ] REFINED_EVENTS_COLS = [ "EventID", "CoaTime", "TRIG_COA", "COA_X", "COA_Y", "COA_Z", "MinTime", "MaxTime", "COA", "COA_NORM", ]
[docs]class Trigger: """ QuakeMigrate triggering class. Triggers candidate earthquakes from the continuous maximum coalescence through time data output by the decimated detect scan, ready to be run through locate(). Parameters ---------- lut : :class:`~quakemigrate.lut.lut.LUT` object Contains the traveltime lookup tables for the selected seismic phases, computed for some pre-defined velocity model. run_path : str Points to the top level directory containing all input files, under which the specific run directory will be created. run_name : str Name of the current QuakeMigrate run. kwargs : **dict See Trigger Attributes for details. In addition to these: log : bool, optional Toggle for logging. If True, will output to stdout and generate a log file. Default is to only output to stdout. loglevel : {"info", "debug"}, optional Toggle to set the logging level: "debug" will print out additional diagnostic information to the log and stdout. (Default "info") trigger_name : str Optional name of a sub-run - useful when testing different trigger parameters, for example. Attributes ---------- mad_window_length : float, optional Length of window within which to calculate the Median Absolute Deviation, for the "mad" trigger threshold method. Default: 3600 seconds (1 hour). mad_multiplier : float, optional A scaling factor for the MAD output to determine the number of median absolute deviations above the median value of the coalescence trace to set the trigger threshold; for the "mad" trigger threshold method. Default: 8.0. marginal_window : float, optional Half-width of window centred on the maximum coalescence time. The 4-D coalescence functioned is marginalised over time across this window such that the earthquake location and associated uncertainty can be appropriately calculated. It should be an estimate of the time uncertainty in the earthquake origin time, which itself is some combination of the expected spatial uncertainty and uncertainty in the seismic velocity model used. Default: 2 seconds. min_event_interval : float, optional Minimum time interval between triggered events. Must be at least twice the marginal window. Default: 4 seconds. median_window_length : float, optional Length of window within which to calculate the median of the coalescence trace, for the "median_ratio" trigger threshold method. Default: 3600 seconds (1 hour). median_multiplier : float, optional A scaling factor by which to multiply the median of the coalescence trace to set the trigger threshold; for the "median ratio" trigger threshold method. Default: 1.2. normalise_coalescence : bool, optional If True, triggering is performed on the maximum coalescence normalised by the mean coalescence value in the 3-D grid. Default: False. pad : float, optional Additional time padding to ensure events close to the starttime/endtime are not cut off and missed. Default: 120 seconds. plot_trigger_summary : bool, optional Plot triggering through time for each batched segment. Default: True. run : :class:`~quakemigrate.io.core.Run` object Light class encapsulating i/o path information for a given run. static_threshold : float, optional Static threshold value above which to trigger candidate events. threshold_method : str, optional Toggle between a "static" threshold and a selection of dynamic threshold methods; either based on the Median Absolute Deviation ("mad") or a multiple of the median value of the coalescence trace ("median_ratio"). Default: "static". smooth_coa : bool, optional Whether to apply a gaussian smoothing to the coalescence trace before applying the trigger threshold to identify candidate events. Default: False smoothing_kernel_sigma : float, optional Sigma (standard deviation) of the Gaussian kernel to convolve with the coalescence trace, to be used with 'smooth_coa'. Default: 0.2 seconds. smoothing_kernel_width : float, optional Number of standard deviations at which to truncate the Gaussian kernel. See `~scipy.ndimage.gaussian_filter1d` for more information. To be used with 'smooth_coa'. Default: 4.0. xy_files : str, optional Path to comma-separated value file (.csv) containing a series of coordinate files to plot. Columns: ["File", "Color", "Linewidth", "Linestyle"], where "File" is the absolute path to the file containing the coordinates to be plotted. E.g: "/home/user/volcano_outlines.csv,black,0.5,-". Each .csv coordinate file should contain coordinates only, with columns: ["Longitude", "Latitude"]. E.g.: "-17.5,64.8". Lines pre-pended with ``#`` will be treated as a comment - this can be used to include references. See the Volcanotectonic_Iceland example XY_files for a template.\n .. note:: Do not include a header line in either file. plot_all_stns : bool, optional If true, plot all stations used for detect. Otherwise, only plot stations which for which some data was available during the trigger time window. NOTE: if no station availability data is found, all stations in the LUT will be plotted. (Default: True) write_event_time_windows : bool, optional If true, write out the MinTime and MaxTime of the trigger window for each triggered event (corresponding to the minimum event interval plus the window during which the coalescence value is above the trigger threshold). This option is mainly a convenience for post-hoc plotting. Methods ------- trigger(starttime, endtime, region=None, interactive_plot=False) Trigger candidate earthquakes from decimated detect scan results. Raises ------ ValueError If `min_event_interval` < 2 * `marginal_window`. InvalidTriggerThresholdMethodException If an invalid threshold method is passed in by the user. TimeSpanException If the user supplies a starttime that is after the endtime. """ def __init__(self, lut, run_path, run_name, **kwargs): """Instantiate the Trigger object.""" self.lut = lut # --- Organise i/o and logging --- self.run = Run( run_path, run_name, kwargs.get("trigger_name", ""), "trigger", loglevel=kwargs.get("loglevel", "info"), ) self.run.logger(kwargs.get("log", False)) # --- Grab Trigger parameters or set defaults --- self.threshold_method = kwargs.get("threshold_method", "static") self.static_threshold = kwargs.get("static_threshold", 1.5) self.mad_window_length = kwargs.get("mad_window_length", 3600.0) self.mad_multiplier = kwargs.get("mad_multiplier", 8.0) self.median_window_length = kwargs.get("median_window_length", 3600.0) self.median_multiplier = kwargs.get("median_multiplier", 1.2) self.marginal_window = kwargs.get("marginal_window", 2.0) self.min_event_interval = kwargs.get("min_event_interval", 4.0) if kwargs.get("minimum_repeat"): self.minimum_repeat = kwargs.get("minimum_repeat") self.normalise_coalescence = kwargs.get("normalise_coalescence", False) self.pad = kwargs.get("pad", 120.0) self.smooth_coa = kwargs.get("smooth_coa", False) self.smoothing_kernel_sigma = kwargs.get("smoothing_kernel_sigma", 0.2) self.smoothing_kernel_width = kwargs.get("smoothing_kernel_width", 4.0) # --- Plotting toggles and parameters --- self.plot_trigger_summary = kwargs.get("plot_trigger_summary", True) self.xy_files = kwargs.get("xy_files") self.plot_all_stns = kwargs.get("plot_all_stns", True) # --- Trigger io parameters --- self.write_event_time_windows = kwargs.get("write_event_time_windows", False) def __str__(self): """Return short summary string of the Trigger object.""" out = ( "\tTrigger parameters:\n" f"\t\tPre/post pad = {self.pad} s\n" f"\t\tMarginal window = {self.marginal_window} s\n" f"\t\tMinimum event interval = {self.min_event_interval} s\n\n" f"\t\tTriggering from the " ) out += "normalised " if self.normalise_coalescence else "" out += "maximum coalescence trace.\n\n" out += f"\t\tTrigger threshold method: {self.threshold_method}\n" if self.threshold_method == "static": out += f"\t\tStatic threshold = {self.static_threshold}\n\n" elif self.threshold_method == "mad": out += ( f"\t\tMAD Window = {self.mad_window_length}\n" f"\t\tMAD Multiplier = {self.mad_multiplier}\n\n" ) elif self.threshold_method == "median_ratio": out += ( f"\t\tMedian Window = {self.median_window_length}\n" f"\t\tMedian Multiplier = {self.median_multiplier}\n\n" ) if self.smooth_coa: out += ( "\t\tApplying gaussian smoothing to the coalescence trace.\n" f"\t\tGaussian kernel sigma = {self.smoothing_kernel_sigma} s\n" f"\t\tGaussian kernel truncated at {self.smoothing_kernel_width} " f"standard deviations.\n" ) return out
[docs] def trigger(self, starttime, endtime, region=None, interactive_plot=False): """ Trigger candidate earthquakes from decimated scan data. Parameters ---------- starttime : str Timestamp from which to trigger events. endtime : str Timestamp up to which to trigger events. region : list of floats, optional Only retain triggered events located within this region. Format is: [Xmin, Ymin, Zmin, Xmax, Ymax, Zmax] As longitude / latitude / depth (units corresponding to the lookup table grid projection; in positive-down frame). interactive_plot : bool, optional Toggles whether to produce an interactive plot. Default: False. Raises ------ TimeSpanException If `starttime` is after `endtime`. """ starttime, endtime = UTCDateTime(starttime), UTCDateTime(endtime) if starttime > endtime: raise util.TimeSpanException logging.info(util.log_spacer) logging.info("\tTRIGGER - Triggering events from .scanmseed") logging.info(util.log_spacer) logging.info(f"\n\tTriggering events from {starttime} to {endtime}\n") logging.info(self) logging.info(util.log_spacer) batchstart = starttime while batchstart < endtime: next_day = UTCDateTime(batchstart.date) + 86400 batchend = next_day if next_day <= endtime else endtime self._trigger_batch(batchstart, batchend, region, interactive_plot) batchstart = next_day logging.info(util.log_spacer)
def _trigger_batch(self, batchstart, batchend, region, interactive_plot): """ Wraps all of the methods used in sequence to determine triggers. Parameters ---------- batchstart : `obspy.UTCDateTime` object Timestamp from which to trigger events. batchend : `obspy.UTCDateTime` object Timestamp up to which to trigger events. region : list of floats Only retain triggered events located within this region. Format is: [Xmin, Ymin, Zmin, Xmax, Ymax, Zmax] As longitude / latitude / depth (units corresponding to the lookup table grid projection; in positive-down frame). interactive_plot : bool Toggles whether to produce an interactive plot. Default: False. """ logging.info("\tReading in .scanmseed...") data, stats = read_scanmseed( self.run, batchstart, batchend, self.pad, self.lut.unit_conversion_factor ) # Check if the batch extends to midnight; if so, reduce 'batchend' by one sample # to ensure consistent treatment of multi-day runs (midnight = next day, so not # included). if batchend.time == time(0, 0): batchend -= stats.delta if self.smooth_coa: data = self._smooth_coa(data, stats.sampling_rate) logging.info("\n\tTriggering events...") trigger_on = "COA_N" if self.normalise_coalescence else "COA" threshold = self._get_threshold(data[trigger_on], stats.sampling_rate) candidate_events = self._identify_candidates(data, trigger_on, threshold) if candidate_events.empty: logging.info( "\tNo events triggered at this threshold - try a lower detection " "threshold." ) events = candidate_events discarded = candidate_events else: refined_events = self._refine_candidates(candidate_events) logging.debug(refined_events) events = self._filter_events(refined_events, batchstart, batchend, region) logging.debug(events) discarded = refined_events[ ~refined_events.index.isin(events.index) ].dropna() logging.debug(discarded) logging.info( f"\n\t\t{len(events)} event(s) triggered within the specified region " f"between {batchstart} \n\t\tand {batchend}" ) logging.info("\n\tWriting triggered events to file...") write_triggered_events( self.run, events, batchstart, self.write_event_time_windows ) if self.plot_trigger_summary: logging.info("\n\tPlotting trigger summary...") trigger_summary( events, batchstart, batchend, self.run, self.marginal_window, self.min_event_interval, threshold, self._threshold_method_string(), self.normalise_coalescence, self.lut, data, region, discarded, interactive=interactive_plot, xy_files=self.xy_files, plot_all_stns=self.plot_all_stns, ) def _threshold_method_string(self): """Threshold parameter string for trigger summary plot.""" if self.threshold_method == "static": threshold_string = f"{self.static_threshold} (static)" elif self.threshold_method == "mad": threshold_string = ( f"MAD ({self.mad_window_length} s / {self.mad_multiplier}x)" ) elif self.threshold_method == "median_ratio": threshold_string = ( f"Median Ratio ({self.median_window_length} s / " f"{self.median_multiplier}x)" ) return threshold_string def _smooth_coa(self, data, sampling_rate): """Apply a Gaussian smoothing to the coalescence trace.""" # Convert kernel sigma from time to samples st_dev = self.smoothing_kernel_sigma * sampling_rate logging.info("\n\tApplying smoothing...") data.loc[:, "COA"] = gaussian_filter1d( data["COA"], st_dev, truncate=self.smoothing_kernel_width ) data.loc[:, "COA_N"] = gaussian_filter1d( data["COA_N"], st_dev, truncate=self.smoothing_kernel_width ) return data @util.timeit() def _get_threshold(self, scandata, sampling_rate): """ Determine the threshold to use when triggering candidate events. Parameters ---------- scandata : `pandas.Series` object (Normalised) coalescence values for which to calculate the threshold. sampling_rate : int Number of samples per second of the coalescence scan data. Returns ------- threshold : `numpy.ndarray` object Array of threshold values. """ if self.threshold_method in ["mad", "median_ratio"]: # Split the coalescence trace into window_length chunks breaks = np.arange(len(scandata)) if self.threshold_method == "mad": window_length = self.mad_window_length else: window_length = self.median_window_length breaks = breaks[breaks % int(window_length * sampling_rate) == 0][1:] chunks = np.split(scandata.values, breaks) # Calculate the median values median_values = np.asarray([np.median(chunk) for chunk in chunks]) median_trace = chunks2trace(median_values, (len(chunks), len(chunks[0]))) median_trace = median_trace[: len(scandata)] if self.threshold_method == "mad": # If MAD, also calculate the MAD values mad_values = np.asarray([util.calculate_mad(chunk) for chunk in chunks]) mad_trace = chunks2trace(mad_values, (len(chunks), len(chunks[0]))) mad_trace = mad_trace[: len(scandata)] # Set the dynamic threshold threshold = median_trace + (mad_trace * self.mad_multiplier) else: # Set the dynamic threshold threshold = median_trace * self.median_multiplier else: # Set static threshold threshold = np.zeros_like(scandata) + self.static_threshold return threshold @util.timeit() def _identify_candidates(self, scandata, trigger_on, threshold): """ Identify distinct periods of time for which the maximum (normalised) coalescence trace exceeds the chosen threshold. Parameters ---------- scandata : `pandas.DataFrame` object Data output by detect() -- decimated scan. Columns: ["DT", "COA", "COA_N", "X", "Y", "Z"] - X/Y/Z as lon/lat/ z-units corresponding to the units of the lookup table grid projection. trigger_on : str Specifies the maximum coalescence data on which to trigger events. threshold : `numpy.ndarray` object Array of threshold values. Returns ------- triggers : `pandas.DataFrame` object Candidate events exceeding some threshold. Columns: ["EventNum", "CoaTime", "TRIG_COA", "COA_X", "COA_Y", "COA_Z", "MinTime", "MaxTime", "COA", "COA_NORM"] """ # Switch between user-facing minimum event interval definition (minimum interval # between event triggers) and internal definition (extra buffer on top of # marginal window within which events cannot overlap) min_event_interval = self.min_event_interval - self.marginal_window thresholded = scandata[scandata[trigger_on] >= threshold] r = np.arange(len(thresholded)) candidates = [d for _, d in thresholded.groupby(thresholded.index - r)] triggers = pd.DataFrame(columns=CANDIDATES_COLS) for i, candidate in enumerate(candidates): # Assign peak index based on the "COA" timeseries, irrespective of whether # the user chose to use "COA_N" for trigger; this ensures a closer # comparison with the origin time determination in locate(), which is fixed # to using the COA timeseries. peak = candidate.loc[candidate["COA"].idxmax()] # If first sample above threshold is within the marginal window if (peak["DT"] - candidate["DT"].iloc[0]) < self.marginal_window: min_dt = peak["DT"] - self.min_event_interval # Otherwise just subtract the minimum event interval else: min_dt = candidate["DT"].iloc[0] - min_event_interval # If last sample above threshold is within the marginal window if (candidate["DT"].iloc[-1] - peak["DT"]) < self.marginal_window: max_dt = peak["DT"] + self.min_event_interval # Otherwise just add the minimum event interval else: max_dt = candidate["DT"].iloc[-1] + min_event_interval trigger = pd.Series( [ i, peak["DT"], peak[trigger_on], peak["X"], peak["Y"], peak["Z"], min_dt, max_dt, peak["COA"], peak["COA_N"], ], index=CANDIDATES_COLS, ) triggers = pd.concat( [ triggers if not triggers.empty else None, trigger.to_frame().T.convert_dtypes(), ], ignore_index=True, ) return triggers @util.timeit() def _refine_candidates(self, candidate_events): """ Merge candidate events for which the marginal windows overlap with the minimum inter-event time. Parameters ---------- candidate_events : `pandas.DataFrame` object Candidate events corresponding to periods of time in which the coalescence signal exceeds some threshold. Columns: ["EventNum", "CoaTime", "TRIG_COA", "COA_X", "COA_Y", "COA_Z", "MinTime", "MaxTime", "COA", "COA_NORM"] Returns ------- events : `pandas.DataFrame` object Merged events with some minimum inter-event spacing in time. Columns: ["EventID", "CoaTime", "TRIG_COA", "COA_X", "COA_Y", "COA_Z", "MinTime", "MaxTime", "COA", "COA_NORM"]. """ # Iterate pairwise (event1, event2) over the candidate events to identify # overlaps between: # - event1 marginal window and event2 minimum window position # - event2 marginal window and event1 maximum window position event_count = 1 for i, event1 in candidate_events.iterrows(): candidate_events.loc[i, "EventNum"] = event_count if i + 1 == len(candidate_events): continue event2 = candidate_events.iloc[i + 1] if all( [ event1["MaxTime"] < event2["CoaTime"] - self.marginal_window, event2["MinTime"] > event1["CoaTime"] + self.marginal_window, ] ): event_count += 1 # Split into DataFrames by event number merged_candidates = [ d for _, d in candidate_events.groupby(candidate_events["EventNum"]) ] # Update the min/max window times and build final event DataFrame refined_events = pd.DataFrame(columns=REFINED_EVENTS_COLS) for i, candidate in enumerate(merged_candidates): logging.debug(f"\t Triggered event {i+1} of {len(merged_candidates)}") event = candidate.loc[candidate["TRIG_COA"].idxmax()].copy() event["MinTime"] = candidate["MinTime"].min() event["MaxTime"] = candidate["MaxTime"].max() # Add unique identifier event_uid = str(event["CoaTime"]) for char_ in ["-", ":", ".", " ", "Z", "T"]: event_uid = event_uid.replace(char_, "") event_uid = event_uid[:17].ljust(17, "0") event["EventID"] = event_uid refined_events = pd.concat( [ refined_events if not refined_events.empty else None, event.to_frame().T.convert_dtypes(), ], ignore_index=True, ) return refined_events @util.timeit() def _filter_events(self, events, starttime, endtime, region): """ Remove events within the padding time and/or within a specific geographical region. Also adds a unique event identifier based on the coalescence time. Parameters ---------- events : `pandas.DataFrame` object Refined set of events to be filtered. Columns: ["EventID", "CoaTime", "TRIG_COA", "COA_X", "COA_Y", "COA_Z", "MinTime", "MaxTime", "COA", "COA_NORM"]. starttime : `obspy.UTCDateTime` object Timestamp from which to trigger events. endtime : `obspy.UTCDateTime` object Timestamp up to which to trigger events. region : list of floats Only retain triggered events located within this region. Format is: [Xmin, Ymin, Zmin, Xmax, Ymax, Zmax] As longitude / latitude / depth (units corresponding to the lookup table grid projection; in positive-down frame). Returns ------- events : `pandas.DataFrame` object Final set of triggered events. Columns: ["EventID", "CoaTime", "TRIG_COA", "COA_X", "COA_Y", "COA_Z", "MinTime", "MaxTime", "COA", "COA_NORM"]. """ # Remove events which occur in the pre-pad and post-pad: events = events.loc[ (events["CoaTime"] >= starttime) & (events["CoaTime"] <= endtime), : ].copy() if region is not None: events = events.loc[ (events["COA_X"] >= region[0]) & (events["COA_Y"] >= region[1]) & (events["COA_Z"] >= region[2]) & (events["COA_X"] <= region[3]) & (events["COA_Y"] <= region[4]) & (events["COA_Z"] <= region[5]), :, ].copy() return events @property def min_event_interval(self): """Get and set the minimum event interval.""" return self._min_event_interval @min_event_interval.setter def min_event_interval(self, value): if value < 2 * self.marginal_window: raise ValueError("\tMinimum event interval must be >= 2 * marginal window.") else: self._min_event_interval = value @property def threshold_method(self): """Get and set the threshold method.""" return self._threshold_method @threshold_method.setter def threshold_method(self, value): if value in ["static", "mad", "median_ratio"]: self._threshold_method = value elif value == "dynamic": # DEPRECATED print( "FutureWarning: This threshold method has been renamed - continuing.\n" "To remove this message, change:\n\t'dynamic' -> 'mad'" ) self._threshold_method = "mad" else: raise util.InvalidTriggerThresholdMethodException # --- Deprecation/Future handling --- @property def minimum_repeat(self): """Handler for deprecated attribute name 'minimum_repeat'.""" return self._min_event_interval @minimum_repeat.setter def minimum_repeat(self, value): if value < 2 * self.marginal_window: raise ValueError("\tMinimum repeat must be >= 2 * marginal window.") print( "FutureWarning: Parameter name has changed - continuing.\n" "To remove this message, change:\n" "\t'minimum_repeat' -> 'min_event_interval'" ) self._min_event_interval = value