Source code for quakemigrate.signal.trigger

# -*- coding: utf-8 -*-
"""
Module to perform the trigger stage of QuakeMigrate.

:copyright:
    2020, QuakeMigrate developers.
:license:
    GNU General Public License, Version 3
    (https://www.gnu.org/licenses/gpl-3.0.html)

"""

import logging

import numpy as np
from obspy import UTCDateTime
import pandas as pd

from quakemigrate.io import Run, read_scanmseed, write_triggered_events
from quakemigrate.plot import trigger_summary
import quakemigrate.util as util


[docs]def calculate_mad(x, scale=1.4826): """ Calculates the Median Absolute Deviation (MAD) of the input array x. Parameters ---------- x : array-like Coalescence array in. scale : float, optional A scaling factor for the MAD output to make the calculated MAD factor a consistent estimation of the standard deviation of the distribution. Returns ------- scaled_mad : array-like Array of scaled mean absolute deviation values for the input array, x, scaled to provide an estimation of the standard deviation of the distribution. """ x = np.asarray(x) if not x.size: return np.nan if np.isnan(np.sum(x)): return np.nan # Calculate median and mad values: med = np.apply_over_axes(np.median, x, 0) mad = np.median(np.abs(x - med), axis=0) return scale * mad
[docs]def chunks2trace(a, new_shape): """ Create a trace filled with chunks of the same value. Parameters: ----------- a : array-like Array of chunks. new_shape : tuple of ints (number of chunks, chunk_length) Returns: -------- b : array-like Single array of values contained in `a`. """ b = np.broadcast_to(a[:, None], new_shape) b = np.reshape(b, np.product(new_shape)) return b
CANDIDATES_COLS = ["EventNum", "CoaTime", "TRIG_COA", "COA_X", "COA_Y", "COA_Z", "MinTime", "MaxTime", "COA", "COA_NORM"] REFINED_EVENTS_COLS = ["EventID", "CoaTime", "TRIG_COA", "COA_X", "COA_Y", "COA_Z", "MinTime", "MaxTime", "COA", "COA_NORM"]
[docs]class Trigger: """ QuakeMigrate triggering class. Triggers candidate earthquakes from the maximum coalescence through time data output by the decimated detect scan, ready to be run through locate(). Parameters ---------- lut : :class:`~quakemigrate.lut.LUT` object Contains the traveltime lookup tables for P- and S-phases, computed for some pre-defined velocity model. run_path : str Points to the top level directory containing all input files, under which the specific run directory will be created. run_name : str Name of the current QuakeMigrate run. kwargs : **dict See Trigger Attributes for details. In addition to these: log : bool, optional Toggle for logging. If True, will output to stdout and generate a log file. Default is to only output to stdout. loglevel : {"info", "debug"}, optional Toggle to set the logging level: "debug" will print out additional diagnostic information to the log and stdout. (Default "info") trigger_name : str Optional name of a sub-run - useful when testing different trigger parameters, for example. Attributes ---------- mad_window_length : float, optional Length of window within which to calculate the Median Average Deviation. Default: 3600 seconds (1 hour). mad_multiplier : float, optional A scaling factor for the MAD output to make the calculated MAD factor a consistent estimation of the standard deviation of the distribution. Default: 1.4826, which is the appropriate scaling factor for a normal distribution. marginal_window : float, optional Time window over which to marginalise the coalescence, making it solely a function of the spatial dimensions. This should be an estimate of the time error, as derived from an estimate of the spatial error and error in the velocity model. Default: 2 seconds. min_event_interval : float, optional Minimum time interval between triggered events. Must be at least twice the marginal window. Default: 4 seconds. normalise_coalescence : bool, optional If True, triggering is performed on the maximum coalescence normalised by the mean coalescence value in the 3-D grid. Default: False. pad : float, optional Additional time padding to ensure events close to the starttime/endtime are not cut off and missed. Default: 120 seconds. run : :class:`~quakemigrate.io.Run` object Light class encapsulating i/o path information for a given run. static_threshold : float, optional Static threshold value above which to trigger candidate events. threshold_method : str, optional Toggle between a "static" threshold and a "dynamic" threshold, based on the Median Average Deviation. Default: "static". xy_files : str, optional Path to comma-separated value file (.csv) containing a series of coordinate files to plot. Columns: ["File", "Color", "Linewidth", "Linestyle"], where "File" is the absolute path to the file containing the coordinates to be plotted. E.g: "/home/user/volcano_outlines.csv,black,0.5,-". Each .csv coordinate file should contain coordinates only, with columns: ["Longitude", "Latitude"]. E.g.: "-17.5,64.8". Lines pre-pended with ``#`` will be treated as a comment - this can be used to include references. See the Volcanotectonic_Iceland example XY_files for a template.\n .. note:: Do not include a header line in either file. Methods ------- trigger(starttime, endtime, region=None, savefig=True) Trigger candidate earthquakes from decimated detect scan results. Raises ------ ValueError If `min_event_interval` < 2 * `marginal_window`. InvalidThresholdMethodException If an invalid threshold method is passed in by the user. TimeSpanException If the user supplies a starttime that is after the endtime. """ def __init__(self, lut, run_path, run_name, **kwargs): """Instantiate the Trigger object.""" self.lut = lut # --- Organise i/o and logging --- self.run = Run(run_path, run_name, kwargs.get("trigger_name", ""), "trigger", loglevel=kwargs.get("loglevel", "info")) self.run.logger(kwargs.get("log", False)) # --- Grab Trigger parameters or set defaults --- self.threshold_method = kwargs.get("threshold_method", "static") if self.threshold_method == "static": self.static_threshold = kwargs.get("static_threshold", 1.5) elif self.threshold_method == "dynamic": self.mad_window_length = kwargs.get("mad_window_length", 3600.) self.mad_multiplier = kwargs.get("mad_multiplier", 1.) else: raise util.InvalidThresholdMethodException self.marginal_window = kwargs.get("marginal_window", 2.) self.min_event_interval = kwargs.get("min_event_interval", 4.) self.minimum_repeat = kwargs.get("minimum_repeat", 4.) self.normalise_coalescence = kwargs.get("normalise_coalescence", False) self.pad = kwargs.get("pad", 120.) # --- Plotting parameters --- self.xy_files = kwargs.get("xy_files") def __str__(self): """Return short summary string of the Trigger object.""" out = ("\tTrigger parameters:\n" f"\t\tPre/post pad = {self.pad} s\n" f"\t\tMarginal window = {self.marginal_window} s\n" f"\t\tMinimum event interval = {self.min_event_interval} s\n\n" f"\t\tTriggering from the ") out += "normalised " if self.normalise_coalescence else "" out += "maximum coalescence trace.\n\n" out += f"\t\tTrigger threshold method: {self.threshold_method}\n" if self.threshold_method == "static": out += f"\t\tStatic threshold = {self.static_threshold}\n" elif self.threshold_method == "dynamic": out += (f"\t\tMAD Window = {self.mad_window_length}\n" f"\t\tMAD Multiplier = {self.mad_multiplier}\n") return out
[docs] def trigger(self, starttime, endtime, region=None, savefig=True): """ Trigger candidate earthquakes from decimated scan data. Parameters ---------- starttime : str Timestamp from which to trigger. endtime : str Timestamp up to which to trigger. region : list of floats, optional Only write triggered events within this region to the triggered events csv file (for use in locate.) Format is: [Xmin, Ymin, Zmin, Xmax, Ymax, Zmax] Units are longitude / latitude / metres (in positive-down frame). savefig : bool, optional Save triggered events figure (default) or open interactive view. Raises ------ TimeSpanException If `starttime` is after `endtime`. """ starttime, endtime = UTCDateTime(starttime), UTCDateTime(endtime) if starttime > endtime: raise util.TimeSpanException logging.info(util.log_spacer) logging.info("\tTRIGGER - Triggering events from .scanmseed") logging.info(util.log_spacer) logging.info(f"\n\tTriggering events from {starttime} to {endtime}\n") logging.info(self) logging.info(util.log_spacer) batchstart = starttime while batchstart < endtime: next_day = UTCDateTime(batchstart.date) + 86400 batchend = next_day if next_day <= endtime else endtime self._trigger_batch(batchstart, batchend, region, savefig) batchstart = next_day logging.info(util.log_spacer)
def _trigger_batch(self, batchstart, batchend, region, savefig): """ Wraps all of the methods used in sequence to determine triggers. Parameters ---------- batchstart : `obspy.UTCDateTime` object Timestamp from which to trigger. batchend : `obspy.UTCDateTime` object Timestamp up to which to trigger. region : list of floats Only write triggered events within this region to the triggered events csv file (for use in locate.) Format is: [Xmin, Ymin, Zmin, Xmax, Ymax, Zmax] Units are longitude / latitude / metres (in positive-down frame). savefig : bool Save triggered events figure (default) or open interactive view. """ logging.info("\tReading in .scanmseed...") data, stats = read_scanmseed(self.run, batchstart, batchend, self.pad, self.lut.unit_conversion_factor) logging.info("\n\tTriggering events...") trigger_on = "COA_N" if self.normalise_coalescence else "COA" threshold = self._get_threshold(data[trigger_on], stats.sampling_rate) candidate_events = self._identify_candidates(data, trigger_on, threshold) if candidate_events.empty: logging.info("\tNo events triggered at this threshold - try a " "lower detection threshold.") events = candidate_events else: refined_events = self._refine_candidates(candidate_events) events = self._filter_events(refined_events, batchstart, batchend, region) discarded = refined_events[~refined_events.isin(events)].dropna() logging.info(f"\n\t\t{len(events)} event(s) triggered within the " f"specified region between {batchstart} \n\t\tand " f"{batchend}") logging.info("\n\tWriting triggered events to file...") write_triggered_events(self.run, events, batchstart) logging.info("\n\tPlotting trigger summary...") trigger_summary(events, batchstart, batchend, self.run, self.marginal_window, self.min_event_interval, threshold, self.normalise_coalescence, self.lut, data, region, savefig, discarded, xy_files=self.xy_files) @util.timeit() def _get_threshold(self, scandata, sampling_rate): """ Determine the threshold to use when triggering candidate events. Parameters ---------- scandata : `pandas.Series` object (Normalised) coalescence values for which to calculate the threshold. sampling_rate : int Number of samples per second of the coalescence scan data. Returns ------- threshold : `numpy.ndarray` object Array of threshold values. """ if self.threshold_method == "dynamic": # Split the data in window_length chunks breaks = np.arange(len(scandata)) breaks = breaks[breaks % int(self.mad_window_length * sampling_rate) == 0][1:] chunks = np.split(scandata.values, breaks) # Calculate the mad and median values mad_values = np.asarray([calculate_mad(chunk) for chunk in chunks]) median_values = np.asarray([np.median(chunk) for chunk in chunks]) mad_trace = chunks2trace(mad_values, (len(chunks), len(chunks[0]))) median_trace = chunks2trace(median_values, (len(chunks), len(chunks[0]))) mad_trace = mad_trace[:len(scandata)] median_trace = median_trace[:len(scandata)] # Set the dynamic threshold threshold = median_trace + (mad_trace * self.mad_multiplier) else: # Set static threshold threshold = np.zeros_like(scandata) + self.static_threshold return threshold @util.timeit() def _identify_candidates(self, scandata, trigger_on, threshold): """ Identify distinct periods of time for which the maximum (normalised) coalescence trace exceeds the chosen threshold. Parameters ---------- scandata : `pandas.DataFrame` object Data output by detect() -- decimated scan. Columns: ["DT", "COA", "COA_N", "X", "Y", "Z"] - X/Y/Z as lon/lat/m trigger_on : str Specifies the maximum coalescence data on which to trigger events. threshold : `numpy.ndarray` object Array of threshold values. Returns ------- triggers : `pandas.DataFrame` object Candidate events exceeding some threshold. Columns: ["EventNum", "CoaTime", "TRIG_COA", "COA_X", "COA_Y", "COA_Z", "MinTime", "MaxTime", "COA", "COA_NORM"] """ # Switch between user-facing minimum event interval definition (minimum # interval between event triggers) and internal definition (extra # buffer on top of marginal window within which events cannot overlap) min_event_interval = self.min_event_interval - self.marginal_window thresholded = scandata[scandata[trigger_on] >= threshold] r = np.arange(len(thresholded)) candidates = [d for _, d in thresholded.groupby(thresholded.index - r)] triggers = pd.DataFrame(columns=CANDIDATES_COLS) for i, candidate in enumerate(candidates): peak = candidate.loc[candidate[trigger_on].idxmax()] # If first sample above threshold is within the marginal window if (peak["DT"] - candidate["DT"].iloc[0]) < self.marginal_window: min_dt = peak["DT"] - self.min_event_interval # Otherwise just subtract the minimum event interval else: min_dt = candidate["DT"].iloc[0] - min_event_interval # If last sample above threshold is within the marginal window if (candidate["DT"].iloc[-1] - peak["DT"]) < self.marginal_window: max_dt = peak["DT"] + self.min_event_interval # Otherwise just add the minimum event interval else: max_dt = candidate["DT"].iloc[-1] + min_event_interval trigger = pd.Series([i, peak["DT"], peak[trigger_on], peak["X"], peak["Y"], peak["Z"], min_dt, max_dt, peak["COA"], peak["COA_N"]], index=CANDIDATES_COLS) triggers = triggers.append(trigger, ignore_index=True) return triggers @util.timeit() def _refine_candidates(self, candidate_events): """ Merge candidate events for which the marginal windows overlap with the minimum inter-event time. Parameters ---------- candidate_events : `pandas.DataFrame` object Candidate events corresponding to periods of time in which the coalescence signal exceeds some threshold. Columns: ["EventNum", "CoaTime", "TRIG_COA", "COA_X", "COA_Y", "COA_Z", "MinTime", "MaxTime", "COA", "COA_NORM"] Returns ------- events : `pandas.DataFrame` object Merged events with some minimum inter-event spacing in time. Columns: ["EventID", "CoaTime", "TRIG_COA", "COA_X", "COA_Y", "COA_Z", "MinTime", "MaxTime", "COA", "COA_NORM"]. """ # Iterate pairwise (event1, event2) over the candidate events to # identify overlaps between: # - event1 marginal window and event2 minimum window position # - event2 marginal window and event1 maximum window position event_count = 1 for i, event1 in candidate_events.iterrows(): candidate_events.loc[i, "EventNum"] = event_count if i + 1 == len(candidate_events): continue event2 = candidate_events.iloc[i+1] if all([event1["MaxTime"] < \ event2["CoaTime"] - self.marginal_window, event2["MinTime"] > \ event1["CoaTime"] + self.marginal_window]): event_count += 1 # Split into DataFrames by event number merged_candidates = [d for _, d in candidate_events.groupby( candidate_events["EventNum"])] # Update the min/max window times and build final event DataFrame refined_events = pd.DataFrame(columns=REFINED_EVENTS_COLS) for i, candidate in enumerate(merged_candidates): logging.debug(f"\t Triggered event {i+1} of " f"{len(merged_candidates)}") event = candidate.loc[candidate["TRIG_COA"].idxmax()].copy() event["MinTime"] = candidate["MinTime"].min() event["MaxTime"] = candidate["MaxTime"].max() # Add unique identifier event_uid = str(event["CoaTime"]) for char_ in ["-", ":", ".", " ", "Z", "T"]: event_uid = event_uid.replace(char_, "") event_uid = event_uid[:17].ljust(17, "0") event["EventID"] = event_uid refined_events = refined_events.append(event, ignore_index=True) return refined_events @util.timeit() def _filter_events(self, events, starttime, endtime, region): """ Remove events within the padding time and/or within a specific geographical region. Also adds a unique event identifier based on the coalescence time. Parameters ---------- events : `pandas.DataFrame` object Refined set of events to be filtered. Columns: ["EventID", "CoaTime", "TRIG_COA", "COA_X", "COA_Y", "COA_Z", "MinTime", "MaxTime", "COA", "COA_NORM"]. starttime : `obspy.UTCDateTime` object Timestamp from which to trigger. endtime : `obspy.UTCDateTime` object Timestamp up to which to trigger. region : list Only write triggered events within this region to the triggered events csv file (for use in locate.) Format is: [Xmin, Ymin, Zmin, Xmax, Ymax, Zmax] Units are longitude / latitude / metres (elevation; up is positive) Returns ------- events : `pandas.DataFrame` object Final set of triggered events. Columns: ["EventID", "CoaTime", "TRIG_COA", "COA_X", "COA_Y", "COA_Z", "MinTime", "MaxTime", "COA", "COA_NORM"]. """ # Remove events which occur in the pre-pad and post-pad: events = events.loc[(events["CoaTime"] >= starttime) & (events["CoaTime"] < endtime), :].copy() if region is not None: events = events.loc[(events["COA_X"] >= region[0]) & (events["COA_Y"] >= region[1]) & (events["COA_Z"] >= region[2]) & (events["COA_X"] <= region[3]) & (events["COA_Y"] <= region[4]) & (events["COA_Z"] <= region[5]), :].copy() return events @property def min_event_interval(self): """Get and set the minimum event interval.""" return self._min_event_interval @min_event_interval.setter def min_event_interval(self, value): if value < 2 * self.marginal_window: msg = "\tMinimum event interval must be >= 2 * marginal window." raise ValueError(msg) else: self._min_event_interval = value # --- Deprecation/Future handling --- @property def minimum_repeat(self): """Handler for deprecated attribute name 'minimum_repeat'.""" return self._min_event_interval @minimum_repeat.setter def minimum_repeat(self, value): if value < 2 * self.marginal_window: msg = "\tMinimum repeat must be >= 2 * marginal window." raise ValueError(msg) print("FutureWarning: Parameter name has changed - continuing.") print("To remove this message, change:") print("\t'minimum_repeat' -> 'min_event_interval'") self._min_event_interval = value