Source code for quakemigrate.signal.onsets.stalta

# -*- coding: utf-8 -*-
"""
The default onset function class - performs some pre-processing on raw seismic data and
calculates STA/LTA onset (characteristic) function.

:copyright:
    2020–2024, QuakeMigrate developers.
:license:
    GNU General Public License, Version 3
    (https://www.gnu.org/licenses/gpl-3.0.html)

"""

import logging

import numpy as np
from obspy import Stream
from scipy.signal import hilbert

import quakemigrate.util as util
from quakemigrate.core import overlapping_sta_lta, centred_sta_lta
from .base import Onset, OnsetData


[docs]def centred_sta_lta_py(signal, nsta, nlta): """ Calculates the ratio of the average of the signal in a short-term (signal) window to a preceding long-term (noise) window. STA/LTA value is assigned to the end of the LTA / one sample before the start of the STA. NOTE: signal must be non-negative. Parameters ---------- signal : array-like Transformed non-negative seismic waveform. nsta : int Number of samples in short-term window. nlta : int Number of samples in long-term window. Returns ------- sta / lta : array-like Short-term average / long-term average ratio of the signal amplitude, computed in adjacent STA/LTA windows. """ # Cumulative sum to calculate moving average sta = np.cumsum(signal) # Convert to float sta = np.require(sta, dtype=float) # Copy for LTA lta = sta.copy() # Compute the STA and the LTA sta[nsta:] = sta[nsta:] - sta[:-nsta] sta[nsta:-nsta] = sta[nsta * 2 :] sta /= nsta lta[nlta:] = lta[nlta:] - lta[:-nlta] lta /= nlta # Pad with ones (= null result) sta[: nlta - 1] = 1.0 lta[: nlta - 1] = 1.0 sta[-nsta:] = 1.0 lta[-nsta:] = 1.0 # Avoid division by zero by setting zero values to tiny float, giving an # STA/LTA of 1 (= null result) dtiny = np.finfo(float).tiny idx = lta < dtiny lta[idx] = dtiny sta[idx] = dtiny return sta / lta
[docs]def overlapping_sta_lta_py(signal, nsta, nlta): """ Computes the standard STA/LTA from a given input array `signal`. The length of the STA window is given by nsta (in samples), nlta is the length of the LTA window (in samples). STA window fully overlaps with the LTA window, and is positioned to the "right" i.e. the end of both of the windows is at the latest point in time; this is where the STA / LTA value is assigned. NOTE: signal must be non-negative. Parameters ---------- signal : `~numpy.ndarray` Transformed non-negative seismic waveform. nsta : int Length of short time average window in samples. nlta : int Length of long time average window in samples. Returns ------- sta/lta :`~numpy.ndarray` Short-term average / long-term average ratio of the signal amplitude, computed in overlapping STA/LTA windows. """ # Cumulative sum to calculate moving average sta = np.cumsum(signal) # Convert to float sta = np.require(sta, dtype=float) # Copy for LTA lta = sta.copy() # Compute the STA and the LTA sta[nsta:] = sta[nsta:] - sta[:-nsta] sta /= nsta lta[nlta:] = lta[nlta:] - lta[:-nlta] lta /= nlta # Pad with ones (= null result) sta[: nlta - 1] = 1.0 lta[: nlta - 1] = 1.0 # Avoid division by zero by setting zero values to tiny float, giving an # STA/LTA of 1 (= null result) dtiny = np.finfo(0.0).tiny idx = lta < dtiny lta[idx] = dtiny sta[idx] = dtiny return sta / lta
[docs]def pre_process(stream, sampling_rate, resample, upfactor, filter_, starttime, endtime): """ Resample raw seismic data, detrend and apply cosine taper and zero phase-shift Butterworth band-pass filter; all carried out using the built-in obspy functions. By default, data with mismatched sampling rates will only be decimated. If necessary, and if the user has specified `resample = True` and an `upfactor` to upsample by `upfactor = int` for the waveform archive, data can also be upsampled and then, if necessary, subsequently decimated to achieve the desired sampling rate. For example, for raw input data sampled at a mix of 40, 50 and 100 Hz, to achieve a unified sampling rate of 50 Hz, the user would have to specify an `upfactor` of 5; 40 Hz x 5 = 200 Hz, which can then be decimated to 50 Hz. NOTE: data will be detrended and a cosine taper applied before decimation, in order to avoid edge effects when applying the lowpass filter. See :func:`~quakemigrate.util.resample` Parameters ---------- stream : `obspy.Stream` object Waveform data to be pre-processed. sampling_rate : int Desired sampling rate for data to be used to calculate onset. This will be achieved by resampling the raw waveform data. By default, only decimation will be applied, but data can also be upsampled if specified by the user when creating the :class:`~quakemigrate.io.data.Archive` object. resample : bool, optional If true, perform resampling of data which cannot be decimated directly to the desired sampling rate. See :func:`~quakemigrate.util.resample` upfactor : int, optional Factor by which to upsample the data to enable it to be decimated to the desired sampling rate, e.g. 40Hz -> 50Hz requires upfactor = 5. See :func:`~quakemigrate.util.resample` filter_ : list Filter specifications, as [lowcut (Hz), highcut (Hz), order]. NOTE - two-pass filter effectively doubles the number of corners (order). Returns ------- filtered_waveforms : `obspy.Stream` object Pre-processed seismic data. Raises ------ NyquistException If the high-cut filter specified for the bandpass filter is higher than the Nyquist frequency of the `sampling_rate`. """ logging.debug(stream.__str__(extended=True)) logging.debug(f"Resample={resample}, Upfactor={upfactor}") # Resample the data here resampled_stream = util.resample( stream, sampling_rate, resample, upfactor, starttime, endtime ) # Grab filter info lowcut, highcut, order = filter_ # Check that the filter is compatible with the sampling rate if highcut >= 0.5 * sampling_rate: raise util.NyquistException(highcut, 0.5 * sampling_rate, "") # Detrend, apply cosine taper then apply zero-phase band-pass filter # Copy to not operate in-place on the input stream filtered_waveforms = resampled_stream.copy() filtered_waveforms.detrend("linear") filtered_waveforms.detrend("constant") filtered_waveforms.taper(type="cosine", max_percentage=0.05) filtered_waveforms.filter( type="bandpass", freqmin=lowcut, freqmax=highcut, corners=order, zerophase=True ) return filtered_waveforms
[docs]class STALTAOnset(Onset): """ QuakeMigrate default onset function class - uses the Short-Term Average to Long-Term Average ratio of the signal energy amplitude. Raw seismic data will be pre-processed, including re-sampling if necessary to reach the specified uniform sampling raate, checked against a user- specified set of data quality criteria, then used to calculate onset functions for each phase (using seismic channels as specified in `channel_maps`) by computing the STA/LTA of s^2. Attributes ---------- phases : list of str Which phases to calculate onset functions for. This will determine which phases are used for migration/picking. The selected phases must be present in the travel-time look-up table to be used for these purposes. bandpass_filters : dict of [float, float, int] Butterworth bandpass filter specification - keys are phases. [lowpass (Hz), highpass (Hz), corners*] *NOTE: two-pass filter effectively doubles the number of corners. channel_maps : dict of str Data component maps - keys are phases. These are passed into the :meth:`ObsPy.stream.select` method. channel_counts : dict of int Number of channels to be used to calculate the onset function for each phase. Keys are phases. sta_lta_windows : dict of [float, float] Short-term average (STA) and Long-term average (LTA) window lengths - keys are phases. [STA, LTA] (both in seconds) all_channels : bool If True, only calculate an onset function when all requested channels meet the availability criteria. Otherwise, if at least one channel is available (e.g. just the N component for the S phase) the onset function will be calculated from that/those. allow_gaps : bool If True, allow gappy data to be used to calculate the onset function. Gappy data will be detrended, tapered and filtered, then gaps padded with zeros. This should help mitigate the expected spikes as data goes on- and off-line, but will not eliminate it. Onset functions for periods with no data will be filled with ~ zeros (smallest possible float, to avoid divide by zero errors). NOTE: This feature is experimental and still under development. full_timespan : bool If False, allow data which doesn't cover the full timespan requested to be used for onset function calculation. This is a subtly different test to `allow_gaps`; data must be continuous within the timespan, but may not span the whole period. Data will be treated as described in `allow_gaps`. NOTE: This feature is experimental and still under development. position : str, optional Compute centred STA/LTA (STA window is preceded by LTA window; value is assigned to end of LTA window / start of STA window) or classic STA/LTA (STA window is within LTA window; value is assigned to end of STA & LTA windows). Default: "classic". Centred gives less phase-shifted (late) onset function, and is closer to a Gaussian approximation, but is far more sensitive to data with sharp offsets due to instrument failures. We recommend using classic for detect() and centred for locate() if your data quality allows it. This is the default behaviour; override by setting this variable. sampling_rate : int Desired sampling rate for input data, in Hz; sampling rate at which the onset functions will be computed. signal_transform : {"energy", "abs", "env", "env_squared"}, optional Transformation to apply to the signal before taking the STA/LTA, to ensure the signal is always positive: energy (signal^2), absolute value, envelope (absolute value of the analytic signal), or envelope^2 (analytic - arguably more correct - measure of the energy of the signal). Default: "energy" min_onset_value : float, optional Minimum value at which to clip the onset function. This is the equivalent to setting a minimum SNR filter for which observations to include. The appropriate value will depend on the signal and noise characteristics, and the `signal_transform` selected. Default: 0.4 NOTE: must be greater than 0.01 Methods ------- calculate_onsets Generate onset functions that represent seismic phase arrivals. gaussian_halfwidth Phase-appropriate Gaussian half-width estimate based on the short-term average window length. """ def __init__(self, **kwargs): """Instantiate the STALTAOnset object.""" super().__init__(**kwargs) # --- General parameters --- self.position = kwargs.get("position", "classic") self.use_python_backend = kwargs.get("use_python_backend", False) self.signal_transform = kwargs.get("signal_transform", "energy") self.min_onset_value = kwargs.get("min_onset_value", 0.4) if self.min_onset_value < 0.01: raise ValueError("The `min_onset_value` must be greater than 0.01") # --- Phase-specific parameters --- self.phases = kwargs.get("phases", ["P", "S"]) self.bandpass_filters = kwargs.get( "bandpass_filters", {"P": [2.0, 16.0, 2], "S": [2.0, 16.0, 2]} ) self.sta_lta_windows = kwargs.get( "sta_lta_windows", {"P": [0.2, 1.0], "S": [0.2, 1.0]} ) self.channel_maps = kwargs.get("channel_maps", {"P": "*Z", "S": "*[N,E,1,2]"}) self.channel_counts = kwargs.get("channel_counts", {"P": 1, "S": 2}) # --- Data quality parameters --- self.all_channels = kwargs.get("all_channels", False) self.allow_gaps = kwargs.get("allow_gaps", False) self.full_timespan = kwargs.get("full_timespan", True) # --- Deprecated --- self.onset_centred = kwargs.get("onset_centred") self.p_bp_filter = kwargs.get("p_bp_filter") self.s_bp_filter = kwargs.get("s_bp_filter") self.p_onset_win = kwargs.get("p_onset_win") self.s_onset_win = kwargs.get("s_onset_win") def __str__(self): """Return short summary string of the Onset object.""" out = ( f"\tOnset parameters - using the {self.position} STA/LTA onset" f"\n\t\tOnset function sampling rate = {self.sampling_rate} Hz" f"\n\t\tPhase(s) = {self.phases}\n" ) for phase, filt in self.bandpass_filters.items(): out += f"\n\t\t{phase} bandpass filter = {filt} (Hz, Hz, -)" out += "\n" for phase, windows in self.sta_lta_windows.items(): out += f"\n\t\t{phase} onset [STA, LTA] = {windows} (s, s)" out += "\n" return out
[docs] def calculate_onsets(self, data, timespan=None): """ Calculate onset functions for the requested stations and phases. Returns a stacked array of onset functions for the requested phases, and an :class:`~quakemigrate.signal.onsets.base.OnsetData` object containing all outputs from the onset function calculation: a dict of the onset functions, a Stream containing the pre-processed input waveforms, and a dict of availability info describing which of the requested onset functions could be calculated (depending on data availability and data quality checks). Parameters ---------- data : :class:`~quakemigrate.io.data.WaveformData` object Light class encapsulating data returned by an archive query. timespan : float or None, optional If the timespan for which the onsets are being generated is provided, this will be used to calculate the tapered window of data at the start and end of the onset function which should be disregarded. This is necessary to accurately set the pick threshold in GaussianPicker, for example. Returns ------- onsets : `numpy.ndarray` of float Stacked onset functions served up for migration, shape(nonsets, nsamples). onset_data : :class:`~quakemigrate.signal.onsets.base.OnsetData` object Light class encapsulating data generated during onset calculation. """ onsets = [] onsets_dict = {} filtered_waveforms = Stream() availability = {} # Loop through phases, pre-process data, and calculate onsets. for phase in self.phases: # Select traces based on channel map for this phase phase_waveforms = data.waveforms.select(channel=self.channel_maps[phase]) # Convert sta window, lta window lengths from seconds to samples. stw, ltw = self.sta_lta_windows[phase] stw = util.time2sample(stw, self.sampling_rate) + 1 ltw = util.time2sample(ltw, self.sampling_rate) + 1 # Pre-process the data. The ObsPy functions operate by trace, so # will not break on gappy data (we haven't checked availability # yet) filtered_phase_waveforms = pre_process( phase_waveforms, self.sampling_rate, data.resample, data.upfactor, self.bandpass_filters[phase], data.starttime, data.endtime, ) # Loop through stations, check data availability for this phase, # and store this info, filtered waveforms and calculated onsets for station in data.stations: waveforms = filtered_phase_waveforms.select(station=station) available, av_dict = data.check_availability( waveforms, all_channels=self.all_channels, n_channels=self.channel_counts[phase], allow_gaps=self.allow_gaps, full_timespan=self.full_timespan, check_sampling_rate=True, sampling_rate=self.sampling_rate, ) availability[f"{station}_{phase}"] = available # If no data available, skip if available == 0: logging.info(f"\t\tNo {phase} onset for {station}.") continue # Check that all channels met the availability critera. If # not, remove this channel from the stream. for key, available in av_dict.items(): if available == 0: to_remove = waveforms.select(id=key) [waveforms.remove(tr) for tr in to_remove] # Pad with tiny floats so onset will be the correct length. # Note: this will only have an effect if allow_gaps=True or # full_timespan=False. Otherwise, there will be no gaps to pad. if self.allow_gaps or not self.full_timespan: # Square root to avoid floating point errors when value # is squared to compute the energy trace tiny = np.sqrt(np.finfo(float).tiny) # Apply another taper to remove transients from filtering - # this is within the pre- and post-pad for continuous data waveforms.taper(type="cosine", max_percentage=0.05) # Fill gaps waveforms.merge(method=1, fill_value=tiny) # Pad start/end; delta of +/-0.00001 is to avoid # occasional obspy weirdness. `nearest_sample` is # appropriate as data is at uniform sampling rate with # off-sample data corrected by util.shift_to_sample() waveforms.trim( starttime=data.starttime - 0.00001, endtime=data.endtime + 0.00001, pad=True, fill_value=tiny, nearest_sample=False, ) # Calculate onset and add to WaveForm data object; add filtered # waveforms that have passed the availability check to # WaveformData.filtered_waveforms onsets_dict.setdefault(station, {}).update( {phase: self._onset(waveforms, stw, ltw, timespan)} ) onsets.append(onsets_dict[station][phase]) filtered_waveforms += waveforms logging.debug(filtered_waveforms.__str__(extended=True)) if sum(availability.values()) == 0: raise util.DataAvailabilityException onsets = np.stack(onsets, axis=0) onset_data = OnsetData( onsets_dict, self.phases, self.channel_maps, filtered_waveforms, availability, data.starttime, data.endtime, self.sampling_rate, ) return onsets, onset_data
def _onset(self, stream, stw, ltw, timespan): """ Generates an onset (characteristic) function. If there are multiple components, these are combined as the root-mean-square of the onset functions. Parameters ---------- stream : `obspy.Stream` object Stream containing the pre-processed data from which to calculate the onset function. stw : int Number of samples in the short-term window. ltw : int Number of samples in the long-term window. timespan : float or None If a timespan is provided it will be used to calculate the tapered window of data at the start and end of the onset function which should be disregarded. Returns ------- onset : `numpy.ndarray` of float STA/LTA onset function. """ if self.signal_transform == "energy": transformed_waveforms = [tr.data**2 for tr in stream] elif self.signal_transform == "abs": transformed_waveforms = [np.abs(tr.data) for tr in stream] elif self.signal_transform == "env": transformed_waveforms = [np.abs(hilbert(tr.data)) for tr in stream] elif self.signal_transform == "env_squared": transformed_waveforms = [np.abs(hilbert(tr.data)) ** 2 for tr in stream] if self.position == "centred": if self.use_python_backend: onset_fn = centred_sta_lta_py else: onset_fn = centred_sta_lta elif self.position == "classic": if self.use_python_backend: onset_fn = overlapping_sta_lta_py else: onset_fn = overlapping_sta_lta onsets = np.array( [onset_fn(waveform, stw, ltw) for waveform in transformed_waveforms] ) if timespan: onsets = self._trim_taper_pad(onsets, stw, ltw, timespan) # Combine onsets when using multiple components onset = np.sqrt(np.sum([onset**2 for onset in onsets], axis=0) / len(onsets)) onset = np.clip(onset, self.min_onset_value, np.inf) return onset def _trim_taper_pad(self, onsets, stw, ltw, timespan): """ Set the value of the tapered windows at the start and end of the onset function (plus one long-term window and one short-term window, respectively) to 1. Parameters ---------- onsets : `numpy.ndarray` of float STA/LTA onset function. stw : int Number of samples in the short-term window. ltw : int Number of samples in the long-term window. timespan : float Used to calculate the tapered window of data at the start and end of the onset function which should be disregarded. Returns ------- onsets : `numpy.ndarray` of float STA/LTA onset function, with the value in the tapered regions of data set to 1. """ # Calculate duration of taper pre- and post-pad and convert to samples pre_pad, _ = self.pad(timespan) # Taper pre- and post-pad are identical - just calculate one taper_pad = util.time2sample(pre_pad - self.pre_pad, self.sampling_rate) for onset in onsets: onset[: (taper_pad + ltw - 1)] = 1.0 onset[-(stw + taper_pad) :] = 1.0 return onsets
[docs] def gaussian_halfwidth(self, phase): """ Return the phase-appropriate Gaussian half-width estimate based on the short-term average window length. Parameters ---------- phase : {'P', 'S'} Seismic phase for which to serve the estimate. """ return self.sta_lta_windows[phase][0] * self.sampling_rate / 2
@property def pre_pad(self): """Pre-pad is determined as a function of the onset windows""" windows = self.sta_lta_windows pre_pad = max([windows[key][1] for key in windows.keys()]) + 3 * max( [windows[key][0] for key in windows.keys()] ) return pre_pad @pre_pad.setter def pre_pad(self, value): """Setter for pre-pad""" self._pre_pad = value @property def post_pad(self): """ Post-pad is determined as a function of the max traveltime in the grid and the onset windows """ return self._post_pad @post_pad.setter def post_pad(self, ttmax): """ Define post-pad as a function of the maximum travel-time between a station and a grid point plus the LTA (in case onset_centred is True) """ windows = self.sta_lta_windows lta_max = max([windows[key][1] for key in windows.keys()]) self._post_pad = np.ceil(ttmax + 2 * lta_max) # --- Deprecation/Future handling --- @property def onset_centred(self): """Handle deprecated onset_centred kwarg / attribute""" return self.position @onset_centred.setter def onset_centred(self, value): """ Handle deprecated onset_centred kwarg / attribute and print warning. """ if value is None: return print( "FutureWarning: Parameter name has changed - continuing.\n" "To remove this message, change:\n" "\t'onset_centred' -> 'position'" ) if value: self.position = "centred" else: self.position = "classic" @property def p_bp_filter(self): """Handle deprecated p_bp_filter kwarg / attribute""" return self.bandpass_filters["P"] @p_bp_filter.setter def p_bp_filter(self, value): """ Handle deprecated p_bp_filter kwarg / attribute and print warning. """ if value is None: return print( "FutureWarning: Parameter name has changed - continuing.\n" "To remove this message, refer to the documentation." ) self.bandpass_filters["P"] = value @property def s_bp_filter(self): """Handle deprecated s_bp_filter kwarg / attribute""" return self.bandpass_filters["S"] @s_bp_filter.setter def s_bp_filter(self, value): """ Handle deprecated s_bp_filter kwarg / attribute and print warning. """ if value is None: return print( "FutureWarning: Parameter name has changed - continuing.\n" "To remove this message, refer to the documentation." ) self.bandpass_filters["S"] = value @property def p_onset_win(self): """Handle deprecated p_onset_win kwarg / attribute""" return self.sta_lta_windows["P"] @p_onset_win.setter def p_onset_win(self, value): """ Handle deprecated p_onset_win kwarg / attribute and print warning. """ if value is None: return print( "FutureWarning: Parameter name has changed - continuing.\n" "To remove this message, refer to the documentation." ) self.sta_lta_windows["P"] = value @property def s_onset_win(self): """Handle deprecated s_onset_win kwarg / attribute""" return self.sta_lta_windows["S"] @s_onset_win.setter def s_onset_win(self, value): """ Handle deprecated s_onset_win kwarg / attribute and print warning. """ if value is None: return print( "FutureWarning: Parameter name has changed - continuing.\n" "To remove this message, refer to the documentation." ) self.sta_lta_windows["S"] = value
[docs]class CentredSTALTAOnset(STALTAOnset): """ QuakeMigrate default onset function class - uses a centred STA/LTA onset. NOTE: THIS CLASS HAS BEEN DEPRECATED AND WILL BE REMOVED IN A FUTURE UPDATE """ def __init__(self, **kwargs): """Instantiate CentredSTALTAOnset object.""" super().__init__(**kwargs) print( "FutureWarning: This class has been deprecated - continuing.\n" "To remove this message:\n" "\tCentredSTALTAOnset -> STALTAOnset\n" "\tAnd add keyword argument 'position=centred'\n" ) self.position = "centred"
[docs]class ClassicSTALTAOnset(STALTAOnset): """ QuakeMigrate default onset function class - uses a classic STA/LTA onset. NOTE: THIS CLASS HAS BEEN DEPRECATED AND WILL BE REMOVED IN A FUTURE UPDATE """ def __init__(self, **kwargs): """Instantiate ClassicSTALTAOnset object.""" super().__init__(**kwargs) print( "FutureWarning: This class has been deprecated - continuing.\n" "To remove this message:\n" "\tClassicSTALTAOnset -> STALTAOnset\n" "\tAnd add keyword argument 'position=classic'\n" ) self.position = "classic"