# -*- coding: utf-8 -*-
"""
Module containing methods to measure Wood-Anderson corrected waveform amplitudes to be
used for local magnitude calculation.
:copyright:
2020–2023, QuakeMigrate developers.
:license:
GNU General Public License, Version 3
(https://www.gnu.org/licenses/gpl-3.0.html)
"""
import logging
import numpy as np
from obspy import UTCDateTime
from obspy.geodetics.base import gps2dist_azimuth
import pandas as pd
from scipy.signal import find_peaks, iirfilter, sosfreqz, hilbert
import quakemigrate.util as util
[docs]class Amplitude:
"""
Part of the QuakeMigrate LocalMag class; measures Wood-Anderson corrected waveform
amplitudes to be used for local magnitude calculation.
Simulates the Wood-Anderson waveforms using a user-supplied set of response removal
parameters, then measures the maximum peak-to-trough amplitude in time windows
around the P and S phase arrivals. These windows are calculated from the phase pick
times from the autopicker, if available, or from the modelled pick times. The length
of the S-wave signal window is calculated according to a user-specified
`signal_window` parameter.
The user may optionally specify a filter to apply to the waveforms before amplitudes
are measured, in order (for example) to reduce the impact of high-amplitude noise
associated with the oceanic microseisms on the measurement of low-amplitude
wavetrains associated with microseismic events. Note this will generally result in
an underestimate of the true earthquake waveform amplitude, even when the filter
gain is corrected for.
A measurement of the signal amplitude in a window preceding the P-wave arrival is
used to characterise the "noise" amplitude. This can be used to filter out null
observations, and to provide an estimate of the uncertainty on the max amplitude
measurements contributed by extraneous noise.
Attributes
----------
signal_window : float
Length of S-wave signal window, in addition to the time window associated with
the marginal_window and traveltime uncertainty. (Default 0 s)
noise_window : float
Length of the time window before the P-wave signal window in which to measure
the noise amplitude. (Default 5 s)
noise_measure : {"RMS", "STD", "ENV"}
Method by which to measure the noise amplitude; root-mean-quare, standard
deviation or average amplitude of the envelope of the signal. (Default "RMS")
loc_method : {"spline", "gaussian", "covariance"}
Which event location estimate to use. (Default "spline")
highpass_filter : bool
Whether to apply a high-pass filter before measuring amplitudes. (Default False)
highpass_freq : float
High-pass filter frequency. Required if highpass_filter is True.
bandpass_filter : bool
Whether to apply a band-pass filter before measuring amplitudes. (Default False)
bandpass_lowcut : float
Band-pass filter low-cut frequency. Required if bandpass_filter is True.
bandpass_highcut : float
Band-pass filter high-cut frequency. Required if bandpass_filter is True.
filter_corners : int
number of corners for the chosen filter. (Default 4)
prominence_multiplier : float
To set a prominence filter in the peak-finding algorithm. (Default 0. = off)
NOTE: not recommended for use in combination with a filter; filter gain
corrections can lead to spurious results. Please see the
`scipy.signal.find_peaks` documentation for further guidance.
Methods
-------
get_amplitudes(event, lut)
Raises
------
AttributeError
If both `highpass_filter` and `bandpass_filter` are selected, or if the user
selects to apply a filter but does not provide the relevant frequencies.
AttributeError
If response removal parameters are provided here instead of to the
:class:`~quakemigrate.io.data.Archive` object.
"""
def __init__(self, amplitude_params={}):
"""Instantiate the Amplitude object."""
# Amplitude measurement parameters
if "signal_window" not in amplitude_params.keys():
logging.warning("Warning: 'signal_window' not specified. Set to default: 0")
self.signal_window = amplitude_params.get("signal_window", 0.0)
self.noise_window = amplitude_params.get("noise_window", 5.0)
self.noise_measure = amplitude_params.get("noise_measure", "RMS")
self.prominence_multiplier = amplitude_params.get("prominence_multiplier", 0.0)
self.loc_method = amplitude_params.get("loc_method", "spline")
# Pre-processing parameters
self.highpass_filter = amplitude_params.get("highpass_filter", False)
if self.highpass_filter:
try:
self.highpass_freq = amplitude_params["highpass_freq"]
except KeyError as e:
raise AttributeError(f"Highpass filter frequency not specified! {e}")
self.bandpass_filter = amplitude_params.get("bandpass_filter", False)
if self.bandpass_filter:
try:
self.bandpass_lowcut = amplitude_params.get("bandpass_lowcut")
self.bandpass_highcut = amplitude_params.get("bandpass_highcut")
except KeyError as e:
raise AttributeError(f"Bandpass filter frequencies not specified! {e}")
self.filter_corners = amplitude_params.get("filter_corners", 4)
if self.highpass_filter and self.bandpass_filter:
raise AttributeError(
"Both bandpass filter *and* highpass filter selected! "
"Please choose one or the other."
)
# Handle deprecated response removal parameters
if any(
param in amplitude_params.keys()
for param in ["water_level", "pre_filt", "remove_full_response"]
):
raise AttributeError(
"The response removal parameters ('water_level', 'pre_filt', "
"'remove_full_response') have been moved to the Archive object. Please "
"specify them there as e.g. 'archive.water_level = 60.' or by "
"providing a dictionary of response_removal parameters - see the "
"template locate script for guidance."
)
def __str__(self):
"""Return short summary string of the Amplitude object."""
out = (
"\t Amplitude parameters:\n"
f"\t\tSignal window = {self.signal_window} s\n"
f"\t\tNoise window = {self.noise_window} s\n"
f"\t\tNoise measure = {self.noise_measure}\n"
f"\t\tLocation used = {self.loc_method}\n"
)
if self.prominence_multiplier != 0.0:
out += f"\t\tProminence multiplier = {self.prominence_multiplier}"
out += "\n"
if self.highpass_filter:
out += (
"\t\tHighpass filter: \n"
f"\t\t Filter frequency = {self.highpass_freq} Hz\n"
f"\t\t Filter corners = {self.filter_corners}\n"
)
elif self.bandpass_filter:
out += (
"\t\tBandpass filter: \n"
f"\t\t Lowcut frequency = {self.bandpass_lowcut} Hz\n"
f"\t\t Highcut frequency = {self.bandpass_highcut} Hz\n"
f"\t\t Filter corners = {self.filter_corners}\n"
)
return out
[docs] @util.timeit()
def get_amplitudes(self, event, lut):
"""
Measure phase amplitudes for an event.
Parameters
----------
event : :class:`~quakemigrate.io.event.Event` object
Light class encapsulating waveforms, coalescence information, picks and
location information for a given event.
lut : :class:`~quakemigrate.lut.lut.LUT` object
Contains the traveltime lookup tables for seismic phases, computed for some
pre-defined velocity model.
Returns
-------
amplitudes : `pandas.DataFrame` object
P- and S-wave amplitude measurements for each component of each station in
the look-up table.
Columns:
epi_dist : float
Epicentral distance between the station and the event hypocentre.
z_dist : float
Vertical distance between the station and the event hypocentre.
P_amp : float
Half maximum peak-to-trough amplitude in the P signal window. In
*millimetres*. Corrected for filter gain, if applicable.
P_freq : float
Approximate frequency of the maximum amplitude P-wave signal.
Calculated from the peak-to-trough time interval of the max
peak-to-trough amplitude.
P_time : `obspy.UTCDateTime` object
Approximate time of amplitude observation (halfway between peak and
trough times).
P_avg_amp : float
Average amplitude in the P signal window, measured by the same
method as the Noise_amp (see `noise_measure`) and corrected for the
same filter gain as `P_amp`. In *millimetres*.
P_filter_gain : float or NaN
Filter gain at `P_freq` - which has been corrected for in the P_amp
measurements - if a filter was applied prior to amplitude
measurement; Else NaN.
S_amp : float
As for P, but in the S wave signal window.
S_freq : float
As for P.
S_time : `obspy.UTCDateTime` object
As for P.
S_avg_amp : float
As for P.
S_filter_gain : float or NaN.
As for P.
Noise_amp : float
The average signal amplitude in the noise window. In *millimetres*.
See `noise_measure` parameter.
is_picked : bool
Whether at least one of the phase arrivals was picked by the
autopicker.
Index = Trace ID (see `obspy.Trace` object property 'id')
"""
# Initialise amplitudes DataFrame
amplitudes = pd.DataFrame(
columns=[
"id",
"epi_dist",
"z_dist",
"P_amp",
"P_freq",
"P_time",
"P_avg_amp",
"P_filter_gain",
"S_amp",
"S_freq",
"S_time",
"S_avg_amp",
"S_filter_gain",
"Noise_amp",
"is_picked",
]
)
# Get event hypocentre location
ev_loc = event.get_hypocentre(self.loc_method)
# Get traveltimes for all stations and phases: much quicker than
# doing this multiple times in the loop
event_ijk = lut.index2coord(ev_loc, inverse=True)[0]
try:
p_ttimes = lut.traveltime_to("P", event_ijk)
s_ttimes = lut.traveltime_to("S", event_ijk)
except KeyError:
raise util.LUTPhasesException(
"Both P and S traveltimes are required to measure phase "
"amplitudes for local magnitude calculation. Please create "
"a new lookup table with phases=['P', 'S']"
)
# Get start of earliest possible noise window and end of latest
# possible signal window
max_tt = lut.max_traveltime
pre_pad, post_pad = self.pad(event.marginal_window, max_tt, lut.fraction_tt)
tr_start = event.otime - pre_pad
tr_end = event.otime + post_pad
logging.debug(f"{tr_start}, {tr_end}, {event.otime}")
# Loop through stations in LUT, calculating amplitude info
for i, station_data in lut.station_data.iterrows():
station = station_data["Name"]
epi_dist, z_dist = self._get_distances(
ev_loc, station_data, lut.unit_conversion_factor
)
# Columns: tr_id, epicentral distance, vertical distance, P_amp,
# P_freq, P_time, P_noise_ratio, S_amp, S_freq, S_time,
# S_noise_ratio, Noise_amp, picked
amps_template = [
"",
epi_dist,
z_dist,
np.nan,
np.nan,
np.nan,
np.nan,
np.nan,
np.nan,
np.nan,
np.nan,
np.nan,
np.nan,
np.nan,
False,
]
# Read in raw waveforms -- work on a copy!!
st = event.data.raw_waveforms.select(station=station).copy()
# Trim to padding window to ensure taper does not encroach on the
# noise or signal window.
st.trim(starttime=tr_start, endtime=tr_end)
for j, comp in enumerate(["[E,2]", "[N,1]", "Z"]):
amps = amps_template.copy()
tr = st.select(component=comp)
# if trace is empty (no traces) or there is more than 1 (gaps),
# or data isn't continuous within the time window where data is
# needed to make the amplitude measurements, skip
if (
bool(tr)
and len(tr) == 1
and tr[0].stats.starttime < (tr_start + tr[0].stats.delta)
and tr[0].stats.endtime > (tr_end - tr[0].stats.delta)
):
tr = tr[0]
else:
amps[0] = f".{station}..{comp}"
amplitudes.loc[i * 3 + j] = amps
continue
amps[0] = tr.id
# Do response removal
try:
tr = event.data.get_wa_waveform(tr, velocity=False)
except (util.ResponseNotFoundError, util.ResponseRemovalError) as e:
logging.warning(e)
amplitudes.loc[i * 3 + j] = amps
continue
if self.bandpass_filter or self.highpass_filter:
filter_sos = self._filter_trace(tr)
else:
filter_sos = None
try:
windows, picked = self._get_amplitude_windows(
station, i, event, p_ttimes, s_ttimes, lut.fraction_tt
)
amps[14] = picked
except util.PickOrderException as e:
logging.warning(f"{e}")
amplitudes.loc[i * 3 + j] = amps
continue
amps = self._measure_signal_amps(
amps, tr, windows, self.noise_measure, filter_sos
)
noise_amp = self._measure_noise_amp(tr, windows, self.noise_measure)
amps[13] = noise_amp
# 3 rows per station; one for each component
amplitudes.loc[i * 3 + j] = amps
amplitudes = amplitudes.set_index("id")
return amplitudes
def _get_distances(self, ev_loc, station_data, unit_conversion_factor):
"""
Get epicentral and vertical distances between a station and an event hypocentre.
Parameters
----------
ev_loc : array-like
Event hypocentre location in geographic coordinate system.
station_data : `pandas.Series` object
Station information - keys: ["Name", "Latitude", "Longitude", "Elevation"].
unit_conversion_factor : float
A conversion factor based on the lookup table grid projection, used to
ensure the distances returned have units of kilometres.
Returns
-------
epi_dist : float
Epicentral distance between the station and the event hypocentre.
z_dist : float
Vertical distance between the station and the event hypocentre.
"""
# Get station location
stla, stlo, stel = station_data[["Latitude", "Longitude", "Elevation"]].values
# Get event location
evlo, evla, evdp = ev_loc
# Evaluate epicentral distance between station and event.
# gps2dist_azimuth returns distances in metres -- magnitudes
# calculation requires distances in kilometres.
epi_dist = gps2dist_azimuth(evla, evlo, stla, stlo)[0] / 1000
# Evaulate vertical distance between station and event. Convert to kilometres.
km_cf = 1000 / unit_conversion_factor
z_dist = (evdp - stel) / km_cf # NOTE: stel is actually depth.
return epi_dist, z_dist
def _filter_trace(self, tr):
"""
Apply a highpass or bandpass filter to the supplied Trace. Filtering is applied
in-place on the `obspy.Trace` object.
Parameters
----------
tr : `obspy.Trace` object
Trace to be filtered
Returns
-------
filter_sos : `numpy.ndarray`
Second-order sections representation of the applied filter.
"""
# Try to apply bandpass filter, unless the specified lowpass frequency is higher
# than the Nyquist. In this case, apply a highpass.
if self.bandpass_filter:
try:
filter_sos = self._bandpass_filter(tr)
except util.NyquistException as e:
logging.warning(f"\t{e} Applying a high-pass filter instead..")
filter_sos = self._highpass_filter(tr)
else:
filter_sos = self._highpass_filter(tr)
return filter_sos
def _bandpass_filter(self, tr):
"""
Apply a bandpass filter to the supplied `obspy.Trace` object; filter operation
is applied in-place.
Parameters
----------
tr : `obspy.Trace` object
Trace to be filtered.
Returns
-------
filter_sos : `numpy.ndarray`
Second-order sections representation of the applied filter.
Raises
------
NyquistException
If the high-cut filter specified for the bandpass filter is higher than the
Nyquist frequency of a trace.
"""
freqmin = self.bandpass_lowcut
freqmax = self.bandpass_highcut
corners = self.filter_corners
# Check specified freqmax is possible for this trace
f_nyquist = 0.5 * tr.stats.sampling_rate
low_f_crit = freqmin / f_nyquist
high_f_crit = freqmax / f_nyquist
if high_f_crit - 1.0 > -1e-6:
raise util.NyquistException(freqmax, f_nyquist, tr.id)
# Pre-process and apply filter
tr.detrend("linear")
tr.taper(0.05, "cosine")
tr.filter(
type="bandpass",
freqmin=freqmin,
freqmax=freqmax,
corners=corners,
zerophase=False,
)
# Generate filter coefficients for the bandpass filter we applied; this is how
# the filter is designed within ObsPy
filter_sos = iirfilter(
N=corners,
Wn=[low_f_crit, high_f_crit],
btype="bandpass",
ftype="butter",
output="sos",
)
return filter_sos
def _highpass_filter(self, tr):
"""
Apply a highpass filter to the supplied `obspy.Trace` object; filter
operation is applied in-place.
Parameters
----------
tr : `obspy.Trace` object
Trace to be filtered.
Returns
-------
filter_sos : `numpy.ndarray`
Second-order sections representation of the applied filter.
"""
# Check if we have been diverted here from trying to apply a bandpass filter.
# Else use specified params for highpass.
if self.bandpass_filter:
filt_freq = self.bandpass_lowcut
else:
filt_freq = self.highpass_freq
corners = self.filter_corners
f_nyquist = 0.5 * tr.stats.sampling_rate
f_crit = filt_freq / f_nyquist
# Pre-process and apply filter
tr.detrend("linear")
tr.taper(0.05, "cosine")
tr.filter(type="highpass", freq=filt_freq, corners=corners, zerophase=False)
# Generate filter coefficients for the highpass filter we applied; this is how
# the filter is designed within ObsPy
filter_sos = iirfilter(
N=corners, Wn=f_crit, btype="highpass", ftype="butter", output="sos"
)
return filter_sos
def _get_amplitude_windows(
self, station, i, event, p_ttimes, s_ttimes, fraction_tt
):
"""
Calculate the start and end time of the windows to measure the max P- and S-wave
amplitudes in. This is done on the basis of the pick times, the event marginal
window, the traveltime and uncertainty and the specified S-wave signal window.
P_window_start : P_pick - marginal_window - traveltime_uncertainty
P_window_end : equivalent to start, or S_pick time; whichever is
earlier
S_window_start : same as P
S_window_end : S_pick + signal_window + marginal_window +
traveltime_uncertainty
traveltime_uncertainty = traveltime * fraction_tt
(where fraction_tt is as specified for the lookup table).
Parameters
----------
station : str
Station name.
i : int
Iterator variable.
event : :class:`~quakemigrate.io.event.Event` object
Light class encapsulating signal, onset, pick and location information for a
given event.
p_ttimes : array-like
Array of interpolated P traveltimes to the requested grid position.
s_ttimes : array-like
Array of interpolated S traveltimes to the requested grid position.
fraction_tt : float
An estimate of the uncertainty in the velocity model as a function of the
traveltime.
Returns
-------
windows : array-like
[[P_window_start, P_window_end], [S_window_start, S_window_end]]
picked : bool
Whether at least one of the phases was picked by the autopicker.
Raises
------
PickOrderException
If the P pick for an event/station is later than the S pick.
"""
p_pick, s_pick, picked = self._get_picks(station, event)
for pick, phase in [[p_pick, "P"], [s_pick, "S"]]:
if not isinstance(pick, UTCDateTime):
if pick == "-1":
if phase == "P":
p_pick = event.otime + p_ttimes[i]
else:
s_pick = event.otime + s_ttimes[i]
# If there was no onset available for one phase, the pick for the other
# may not have been checked to ensure it was made before/after the
# modelled arrival time for the other phase. So use modelled arrival
# time for both phases.
elif pick == f"No {phase} onset":
logging.debug(
f"No onset available when picking {phase} on "
f"{station}. Using modelled arrival times."
)
p_pick = event.otime + p_ttimes[i]
s_pick = event.otime + s_ttimes[i]
break
# Check p_pick is before s_pick
try:
assert p_pick < s_pick
except AssertionError:
raise util.PickOrderException(event.uid, station, p_pick, s_pick)
# For P:
p_start = p_pick - event.marginal_window - p_ttimes[i] * fraction_tt
p_end = p_pick + event.marginal_window + p_ttimes[i] * fraction_tt
# For S:
s_start = s_pick - event.marginal_window - s_ttimes[i] * fraction_tt
s_end = (
s_pick
+ event.marginal_window
+ s_ttimes[i] * fraction_tt
+ self.signal_window
)
# Check for overlaps
if s_start < p_end:
mid_time = p_end + (s_start - p_end) / 2
windows = [[p_start, mid_time], [mid_time, s_end]]
elif s_start - p_end < self.signal_window:
windows = [[p_start, s_start], [s_start, s_end]]
else:
windows = [[p_start, p_end + self.signal_window], [s_start, s_end]]
return windows, picked
def _get_picks(self, station, event):
"""
Get picks from this station for this event. If no phase pick is found, -1 is
returned. If no picks at all are found, "No <phase> onset" is returned.
Parameters
----------
station : str
Station name.
event : :class:`~quakemigrate.io.event.Event` object
Light class encapsulating waveforms, coalescence information, picks and
location information for a given event.
Returns
-------
p_pick : `obspy.UTCDateTime` object, "-1" or "No_P_onset"
P pick time. Autopick time if available, otherwise -1. If no onset function
was available to the autopicker, "No_<phase>_onset" is returned.
s_pick : `obspy.UTCDateTime` object, "-1" or "No_S_onset"
As for P.
picked : bool
Whether at least one phase was picked by the auto-picker.
"""
picks = event.picks["df"]
picks = picks.loc[picks["Station"] == station]
picked = False
if len(picks) > 0:
try:
p_pick = picks.loc[picks["Phase"] == "P"]["PickTime"].iloc[0]
p_pick = UTCDateTime(str(p_pick))
picked = True
except IndexError:
p_pick = "No P onset"
except ValueError: # UTCDateTime("-1") -> ValueError
p_pick = "-1"
try:
s_pick = picks.loc[picks["Phase"] == "S"]["PickTime"].iloc[0]
s_pick = UTCDateTime(str(s_pick))
picked = True
except IndexError:
s_pick = "No S onset"
except ValueError: # UTCDateTime("-1") -> ValueError
s_pick = "-1"
else:
p_pick = s_pick = "-1"
return p_pick, s_pick, picked
def _measure_signal_amps(self, amps, tr, windows, method="RMS", filter_sos=None):
"""
Loop through the windows and measure the maximum half peak-to-peak amplitude,
the approximate frequency (derived from the p2p time) and the time at which it
occurs.
Performs a linear detrend across the window before measuring amplitudes.
NOTE: signal amplitudes are returned in *millimetres*. This may mean the
formulation of the local magnitude attenuation function being used needs to be
adjusted to match these units. All functions provided in QM are in millimetres.
Parameters
----------
amps : list
Amplitude information for this trace.
Columns = ["epi_dist", "z_dist", "P_amp", "P_freq", "P_time", "P_avg_amp",
"P_filter_gain", "S_amp", "S_freq", "S_time", "S_avg_amp",
"S_filter_gain", "Noise_amp", "is_picked"]
tr : `obspy.Trace` object
Trace from which to measure amplitudes.
windows : array-like
[[P_window_start, P_window_end], [S_window_start, S_window_end]]
method : {"RMS", "STD", "ENV"}, optional
The method by which to measure the average amplitude in the signal window:
root-mean-square, standard deviation or average amplitude of the envelope of
the signal. (Default "RMS")
filter_sos : `numpy.ndarray`, optional
Second-order sections representation of the filter applied to the trace (if
applicable).
Returns
-------
amps : list
Amplitude information for this trace.
Columns = ["epi_dist", "z_dist", "P_amp", "P_freq", "P_time", "P_avg_amp",
"P_filter_gain", "S_amp", "S_freq", "S_time", "S_avg_amp",
"S_filter_gain", "Noise_amp", "is_picked"]
With newly populated values:
P_amp : float
Half maximum peak-to-trough amplitude in the P signal window. In
*millimetres*. Corrected for filter gain, if applicable.
P_freq : float
Approximate frequency of the maximum amplitude P-wave signal.
Calculated from the peak-to-trough time interval of the max
peak-to-trough amplitude.
P_time : `obspy.UTCDateTime` object
Approximate time of amplitude observation (halfway between peak and
trough times).
P_avg_amp : float
Average amplitude in the P signal window, measured by the same
method as the Noise_amp (see `noise_measure`) and corrected for the
same filter gain as `P_amp`. In *millimetres*.
P_filter_gain : float
Filter gain at `P_freq`, which has been corrected for in the P_amp
measurements (if a filter was applied prior to amplitude
measurement).
S_amp : float
As for P, but in the S wave signal window.
S_freq : float
As for P.
S_time : `obspy.UTCDateTime` object
As for P.
S_avg_amp : float
As for P.
S_filter_gain : float
As for P.
"""
# Loop over windows, cut data and measure amplitude
for k, (start_time, end_time) in enumerate(windows):
window = tr.slice(start_time, end_time)
window.detrend("linear")
data = window.data
phase = ["P", "S"][k]
# if trace (window) is empty (no data points) or a flat line, do
# not make a measurement
if not bool(window) or data.max() == data.min():
logging.warning(
f"{phase} signal window doesn't contain any "
f"data for trace {window.id}"
)
continue
# Measure maximum half peak-to-trough amplitude
try:
half_amp, approx_freq, p2t_time = self._peak_to_trough_amplitude(window)
except util.PeakToTroughError as e:
logging.warning(
f"Amplitude measurement failed in {phase} "
f"signal window for trace {window.id}: {e.msg}"
)
continue
# Measure average amplitude
average_amp = self._average_amplitude(window, method)
# Correct for filter gain at approximate frequency of
# measured amplitude
filter_gain = None
if self.bandpass_filter or self.highpass_filter:
_, filter_gain = sosfreqz(
filter_sos, worN=[approx_freq], fs=tr.stats.sampling_rate
)
filter_gain = np.abs(filter_gain[0])
if not filter_gain:
logging.info(
"\t Warning: Invalid frequency ("
f"{approx_freq:.5g} Hz) for {phase}_amp "
f"measurement on:\n\t\t{tr}"
)
continue
else:
half_amp /= filter_gain
average_amp /= filter_gain
# Put in relevant columns for P / S amplitude, approx_freq, p2t_time
amps[3 + k * 5 : 8 + k * 5] = (
half_amp,
approx_freq,
p2t_time,
average_amp,
filter_gain,
)
return amps
def _peak_to_trough_amplitude(self, trace):
"""
Measure the maximum peak-to-trough amplitude for a given trace; additionally
output the approximate frequency of this signal (from the peak-to-trough time)
and the time at which it occurs.
NOTE: Returns *half* the maximum peak-to-trough amplitude, as this is what the
measurement of local magnitude is defined from.
NOTE: Units are *millimetres*.
Parameters
----------
trace : `obspy.Trace` object
Waveform for which to measure max peak-to-trough amplitude (corrected to
displacement in units of metres).
Returns
-------
half_amp : float
Half the value of maximum peak-to-trough amplitude, *in millimetres*.
Returns -1 if no measurement could be made.
approx_freq : float
Approximate frequency of the arrival, based on the half-period between the
maximum peak/trough. Returns -1 if no measurement could be made.
p2t_time : `obspy.UTCDateTime` object
Approximate time of amplitude observation (halfway between peak and trough
times.)
Raises
------
PeakToTroughError
If the measurement fails, due to no peaks or troughs being found or
consecutive peaks or troughs being found.
"""
prominence = self.prominence_multiplier * np.max(np.abs(trace.data))
peaks, _ = find_peaks(trace.data, prominence=prominence)
troughs, _ = find_peaks(-trace.data, prominence=prominence)
# Loop through possible orders of peaks and troughs to find the maximum
# peak-to-peak amplitude, and the time difference separating the peaks
full_amp = None
if len(peaks) == 0 or len(troughs) == 0:
raise util.PeakToTroughError("No peaks or troughs found!")
elif len(peaks) == 1 and len(troughs) == 1:
full_amp = np.abs(trace.data[peaks] - trace.data[troughs])[0]
pos = 0
elif len(peaks) == len(troughs):
if peaks[0] < troughs[0]:
a, b, c, d = peaks, troughs, peaks[1:], troughs[:-1]
else:
a, b, c, d = peaks, troughs, peaks[:-1], troughs[1:]
elif not np.abs(len(peaks) - len(troughs)) == 1:
# More than two peaks/troughs next to one another
raise util.PeakToTroughError("Consecutive peaks/troughs!")
elif len(peaks) > len(troughs):
try:
assert peaks[0] < troughs[0]
except AssertionError:
raise util.PeakToTroughError("Consecutive peaks/troughs!")
a, b, c, d = peaks[:-1], troughs, peaks[1:], troughs
elif len(peaks) < len(troughs):
try:
assert peaks[0] > troughs[0]
except AssertionError:
raise util.PeakToTroughError("Consecutive peaks/troughs!")
a, b, c, d = peaks, troughs[1:], peaks, troughs[:-1]
if not full_amp:
fp1 = np.abs(trace.data[a] - trace.data[b])
fp2 = np.abs(trace.data[c] - trace.data[d])
if np.max(fp1) >= np.max(fp2):
pos = np.argmax(fp1)
full_amp = np.max(fp1)
peaks, troughs = a, b
else:
pos = np.argmax(fp2)
full_amp = np.max(fp2)
peaks, troughs = c, d
peak_time = trace.times()[peaks[pos]]
trough_time = trace.times()[troughs[pos]]
p2t_time = trace.stats.starttime + peak_time + (trough_time - peak_time) / 2
# Peak-to-trough is half a period
approx_freq = 1.0 / (np.abs(peak_time - trough_time) * 2.0)
# Local magnitude is defined based on maximum zero-to-peak amplitude in
# *millimetres*
half_amp = full_amp * 1000 / 2
return half_amp, approx_freq, p2t_time
def _measure_noise_amp(self, tr, windows, method="RMS"):
"""
Make a measurement of the signal amplitude in a 'noise window' before the P
signal window. Several methods for making this measurement are available.
Performs a linear detrend across the window before measuring amplitudes.
NOTE: Returns the noise amplitude in millimetres: the chosen formulation of the
local magnitude attenuation function may have to be adjusted to match these
units.
Parameters
----------
tr : `obspy.Trace` object
Trace from which to measure the noise amplitude (corrected to displacement
in units of metres).
windows : array-like
[[P_window_start, P_window_end], [S_window_start, S_window_end]]
method : {"RMS", "STD", "ENV"}, optional
The method by which to measure the amplitude of the signal in the noise
window: root-mean-square, standard deviation or average amplitude of the
envelope of the signal. (Default "RMS")
Returns
-------
noise_amp : float
An estimate of the signal amplitude in the noise window. In millimetres. Not
corrected for filter gain.
"""
p_start = windows[0][0]
# Make a noise measurement in a window of length noise_window, ending at the
# start of the p_window
noise_start = p_start - self.noise_window
noise_end = p_start
noise = tr.slice(noise_start, noise_end)
if not bool(noise) or noise.data.max() == noise.data.min():
logging.warning(
f"Noise window doesn't contain any data for trace {noise.id}"
)
noise_amp = np.nan
else:
noise.detrend("linear")
noise_amp = self._average_amplitude(noise, method)
return noise_amp
def _average_amplitude(self, trace, method):
"""
Measure the average amplitude of a trace.
NOTE: returns amplitude in *millimetres*.
Parameters
----------
trace : `obspy.Trace` object
Trace from which to measure the amplitude (corrected to displacement in
units of metres).
method : {"RMS", "STD", "ENV"}
The method by which to measure the average amplitude of the signal:
root-mean-square, standard deviation or average amplitude of the envelope of
the signal. (Default "RMS").
Returns
-------
amp : float
Average amplitude of the trace (in millimetres).
Raises
------
NotImplementedError
For measurement methods other than {"RMS", "STD", "ENV"}.
"""
if method == "RMS":
amp = np.sqrt(np.mean(np.square(trace.data)))
elif method == "STD":
amp = np.std(trace.data)
elif method == "ENV":
amp = np.mean(np.abs(hilbert(trace.data)))
else:
raise NotImplementedError(
"Only 'RMS', 'STD' and 'ENV' are available currently. Please contact "
"the QuakeMigrate developers."
)
# Convert to *millimetres*
amp *= 1000.0
return amp
[docs] def pad(self, marginal_window, max_tt, fraction_tt):
"""
Calculate padding, including an allowance for the taper applied when filtering/
removing instrument response, to ensure the noise and signal window amplitude
measurements are not affected by the taper.
Parameters
----------
marginal_window : float
Half-width of window centred on the maximum coalescence time of the event
over which the 4-D coalescence function is marginalised. Used here as an
estimate of the origin time uncertainity when calculating the signal
windows.
max_tt : float
Maximum traveltime in the look-up table.
fraction_tt : float
An estimate of the uncertainty in the velocity model as a function of a
fraction of the traveltime. (Default 0.1 == 10%)
Returns
-------
pre_pad : float
Time window by which to pre-pad the data when reading from the waveform
archive.
post_pad : float
Time window by which to post-pad the data when reading from the waveform
archive.
"""
pre_pad = self.noise_window + marginal_window
logging.debug(f"Raw pre-pad: {pre_pad}")
post_pad = self.signal_window + max_tt * (1 + fraction_tt) + marginal_window
logging.debug(f"Raw post-pad: {post_pad}")
timespan = pre_pad + post_pad
pre_pad += np.ceil(timespan * 0.06)
post_pad += np.ceil(timespan * 0.06)
logging.debug(f"Final pre-pad: {pre_pad}, final post-pad: {post_pad}")
return pre_pad, post_pad