# -*- coding: utf-8 -*-
"""
Module that supplies various utility functions and classes.
:copyright:
2020–2023, QuakeMigrate developers.
:license:
GNU General Public License, Version 3
(https://www.gnu.org/licenses/gpl-3.0.html)
"""
import logging
import sys
import time
import warnings
from datetime import datetime
from functools import wraps
from itertools import tee
import matplotlib.ticker as ticker
import numpy as np
from obspy import Trace, Stream
log_spacer = "=" * 110
[docs]def make_directories(run, subdir=None):
"""
Make run directory, and optionally make subdirectories within it.
Parameters
----------
run : `pathlib.Path` object
Location of parent output directory, named by run name.
subdir : str, optional
subdir to make beneath the run level.
"""
run.mkdir(exist_ok=True)
if subdir:
new_dir = run / subdir
new_dir.mkdir(exist_ok=True, parents=True)
[docs]def gaussian_1d(x, a, b, c):
"""
Create a 1-dimensional Gaussian function.
Parameters
----------
x : array-like
Array of x values.
a : float / int
Amplitude (height of Gaussian).
b : float / int
Mean (centre of Gaussian).
c : float / int
Sigma (width of Gaussian).
Returns
-------
f : function
1-dimensional Gaussian function
"""
f = a * np.exp(-1.0 * ((x - b) ** 2) / (2 * (c**2)))
return f
[docs]def gaussian_3d(nx, ny, nz, sgm):
"""
Create a 3-dimensional Gaussian function.
Parameters
----------
nx : array-like
Array of x values.
ny : array-like
Array of y values.
nz : array-like
Array of z values.
sgm : float / int
Sigma (width of gaussian in all directions).
Returns
-------
f : function
3-dimensional Gaussian function
"""
nx2 = (nx - 1) / 2
ny2 = (ny - 1) / 2
nz2 = (nz - 1) / 2
x = np.linspace(-nx2, nx2, nx)
y = np.linspace(-ny2, ny2, ny)
z = np.linspace(-nz2, nz2, nz)
ix, iy, iz = np.meshgrid(x, y, z, indexing="ij")
if np.isscalar(sgm):
sgm = np.repeat(sgm, 3)
sx, sy, sz = sgm
f = np.exp(
-(ix * ix) / (2 * sx * sx)
- (iy * iy) / (2 * sy * sy)
- (iz * iz) / (2 * sz * sz)
)
return f
[docs]def logger(logstem, log, loglevel="info"):
"""
Simple logger that will output to both a log file and stdout.
Parameters
----------
logstem : str
Filestem for log file.
log : bool
Toggle for logging - default is to only print information to stdout.
If True, will also create a log file.
loglevel : str, optional
Toggle for logging level - default is to print only "info" messages to log.
To print more detailed "debug" messages, set to "debug".
"""
level = logging.DEBUG if loglevel == "debug" else logging.INFO
if log:
now = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
logfile = logstem.parent / f"{logstem.name}_{now}"
logfile.parent.mkdir(exist_ok=True, parents=True)
handlers = [
logging.FileHandler(str(logfile.with_suffix(".log"))),
logging.StreamHandler(sys.stdout),
]
else:
handlers = [logging.StreamHandler(sys.stdout)]
logging.basicConfig(level=level, format="%(message)s", handlers=handlers)
[docs]def time2sample(time, sampling_rate):
"""
Utility function to convert from seconds and sampling rate to number of samples.
Parameters
----------
time : float
Time to convert.
sampling_rate : int
Sampling rate of input data/sampling rate at which to compute the coalescence
function.
Returns
-------
out : int
Time that correpsonds to an integer number of samples at a specific sampling
rate.
"""
return int(round(time * int(sampling_rate)))
[docs]def calculate_mad(x, scale=1.4826):
"""
Calculates the Median Absolute Deviation (MAD) of the input array x.
Parameters
----------
x : array-like
Input data.
scale : float, optional
A scaling factor for the MAD output to make the calculated MAD factor a
consistent estimation of the standard deviation of the distribution.
Returns
-------
scaled_mad : array-like
Array of scaled mean absolute deviation values for the input array, x, scaled
to provide an estimation of the standard deviation of the distribution.
"""
x = np.asarray(x)
if not x.size:
return np.nan
if np.isnan(np.sum(x)):
return np.nan
# Calculate median and mad values:
med = np.apply_over_axes(np.median, x, 0)
mad = np.median(np.abs(x - med), axis=0)
return scale * mad
[docs]def trim2sample(time, sampling_rate):
"""
Utility function to ensure time padding results in a time that is an integer number
of samples.
Parameters
----------
time : float
Time to trim.
sampling_rate : int
Sampling rate of input data/sampling rate at which to compute the coalescence
function.
Returns
-------
out : int
Time that correpsonds to an integer number of samples at a specific sampling
rate.
"""
return int(np.ceil(time * sampling_rate) / sampling_rate * 1000) / 1000
[docs]def wa_response(convert="DIS2DIS", obspy_def=True):
"""
Generate a Wood Anderson response dictionary.
Parameters
----------
convert : {'DIS2DIS', 'VEL2VEL', 'VEL2DIS'}
Type of output to convert between; determines the number of complex zeros used.
obspy_def : bool, optional
Use the ObsPy definition of the Wood Anderson response (Default).
Otherwise, use the IRIS/SAC definition.
Returns
-------
WOODANDERSON : dict
Poles, zeros, sensitivity and gain of the Wood-Anderson torsion seismograph.
"""
if obspy_def:
# Create Wood-Anderson response - ObsPy values
woodanderson = {
"poles": [-6.283185 - 4.712j, -6.283185 + 4.712j],
"zeros": [0j],
"sensitivity": 2080,
"gain": 1.0,
}
else:
# Create Wood Anderson response - different to the ObsPy values
# http://www.iris.washington.edu/pipermail/sac-help/2013-March/001430.html
woodanderson = {
"poles": [-5.49779 + 5.60886j, -5.49779 - 5.60886j],
"zeros": [0j],
"sensitivity": 2080,
"gain": 1.0,
}
if convert in ("DIS2DIS", "VEL2VEL"):
# Add an extra zero to go from disp to disp or vel to vel.
woodanderson["zeros"].extend([0j])
return woodanderson
[docs]def shift_to_sample(stream, interpolate=False):
"""
Check whether any data in an `obspy.Stream` object is "off-sample" - i.e. the data
timestamps are *not* an integer number of samples after midnight. If so, shift data
to be "on-sample".
This can either be done by shifting the timestamps by a sub-sample time interval, or
interpolating the trace to the "on-sample" timestamps. The latter has the benefit
that it will not affect the timing of the data, but will require additional
computation time and some inevitable edge effects - though for onset calculation
these should be contained within the pad windows. If you are using a sampling
rate < 10 Hz, contact the QuakeMigrate developers.
Parameters
----------
stream : `obspy.Stream` object
Contains list of `obspy.Trace` objects for which to check the timing.
interpolate : bool, optional
Whether to interpolate the data to correct the "off-sample" timing. Otherwise,
the metadata will simply be altered to shift the timestamps "on-sample"; this
will lead to a sub-sample timing offset.
Returns
-------
stream : `obspy.Stream` object
Waveform data with all timestamps "on-sample".
"""
# work on a copy
stream = stream.copy()
for tr in stream:
# Check if microsecond is divisible by sampling rate; only guaranteed to work
# for sampling rates of 1 Hz or less
delta = tr.stats.starttime.microsecond % (1e6 / tr.stats.sampling_rate)
if delta == 0:
if tr.stats.sampling_rate < 1.0:
logging.warning(
f"Trace\n\t{tr}\nhas a sampling rate less than 1 Hz, so off-sample "
"data might not be corrected!"
)
continue
else:
# Calculate time shift to closest "on-sample" timestamp
time_shift = (
round(delta / 1e6 * tr.stats.sampling_rate) / tr.stats.sampling_rate
- delta / 1e6
)
if not interpolate:
logging.info(
f"Trace\n\t{tr}\nhas off-sample data. Applying "
f"{time_shift:+f} s shift to timing."
)
tr.stats.starttime += time_shift
logging.debug(f"Shifted trace: {tr}")
else:
logging.info(
f"Trace\n\t{tr}\nhas off-sample data. "
f"Interpolating to apply a {time_shift:+f} s "
"shift to timing."
)
# Interpolate can only map between values contained within the original
# array. For negative time shift, shift by one sample so new starttime
# is within original array, and add constant value pad after
# interpolation.
new_starttime = tr.stats.starttime + time_shift
if time_shift < 0.0:
new_starttime += tr.stats.delta
tr.interpolate(
sampling_rate=tr.stats.sampling_rate,
method="lanczos",
a=20,
starttime=new_starttime,
)
# Add constant-value pad at end if time_shift is positive, (last sample
# is dropped when interpolating for positive time shifts), else at
# start. If adding at start, also adjust start time.
if time_shift > 0.0:
tr.data = np.append(tr.data, tr.data[-1])
else:
tr.data = np.append(tr.data[0], tr.data)
tr.stats.starttime -= tr.stats.delta
logging.debug(f"Interpolated tr:\n\t{tr}")
return stream
[docs]def resample(stream, sampling_rate, resample, upfactor, starttime, endtime):
"""
Resample data in an `obspy.Stream` object to the specified sampling rate.
By default, this function will only perform decimation of the data. If necessary,
and if the user specifies `resample = True` and an upfactor to upsample by
`upfactor = int`, data can also be upsampled and then, if necessary, subsequently
decimated to achieve the desired sampling rate.
For example, for raw input data sampled at a mix of 40, 50 and 100 Hz, to achieve a
unified sampling rate of 50 Hz, the user would have to specify an upfactor of 5;
40 Hz x 5 = 200 Hz, which can then be decimated to 50 Hz.
NOTE: assumes any data with off-sample timing has been corrected with
:func:`~quakemigrate.util.shift_to_sample`. If not, the resulting traces may not all
contain the correct number of samples.
NOTE: data will be detrended and a cosine taper applied before decimation, in order
to avoid edge effects when applying the lowpass filter.
Parameters
----------
stream : `obspy.Stream` object
Contains list of `obspy.Trace` objects to be decimated / resampled.
resample : bool
If true, perform resampling of data which cannot be decimated directly to the
desired sampling rate.
upfactor : int or None
Factor by which to upsample the data to enable it to be decimated to the desired
sampling rate, e.g. 40Hz -> 50Hz requires upfactor = 5.
Returns
-------
stream : `obspy.Stream` object
Contains list of resampled `obspy.Trace` objects at the chosen sampling rate
`sr`.
"""
# Work on a copy of the stream
stream = stream.copy()
for trace in stream:
trace_sampling_rate = trace.stats.sampling_rate
if sampling_rate != trace_sampling_rate:
if (trace_sampling_rate % sampling_rate) == 0:
stream.remove(trace)
trace = decimate(trace, sampling_rate)
stream += trace
elif resample and upfactor is not None:
# Check the upsampled sampling rate can be decimated to sr
if int(trace_sampling_rate * upfactor) % sampling_rate != 0:
raise BadUpfactorException(trace)
stream.remove(trace)
trace = upsample(trace, upfactor, starttime, endtime)
if trace_sampling_rate != sampling_rate:
trace = decimate(trace, sampling_rate)
stream += trace
else:
logging.info(
f"Mismatched sampling rates - cannot decimate data from\n\t{trace}"
"\n...to resample data, set resample = True and choose a suitable "
"upfactor"
)
# Trim as a general safety net. NOTE: here we are using 'nearest_sample=False', as
# all data in the stream should now be at the desired sampling rate, and with any
# off-sample data having had its timing shifted.
stream.trim(
starttime=starttime - 0.00001, endtime=endtime + 0.00001, nearest_sample=False
)
return stream
[docs]def decimate(trace, sampling_rate):
"""
Decimate a trace to achieve the desired sampling rate, sr.
NOTE: data will be detrended and a cosine taper applied before decimation, in order
to avoid edge effects when applying the lowpass filter before decimating.
Parameters:
-----------
trace : `obspy.Trace` object
Trace to be decimated.
sampling_rate : int
Output sampling rate.
Returns:
--------
trace : `obspy.Trace` object
Decimated trace.
"""
# Work on a copy of the trace
trace = trace.copy()
# Detrend and apply cosine taper
trace.detrend("linear")
trace.detrend("demean")
trace.taper(type="cosine", max_percentage=0.05)
# Zero-phase Butterworth-lowpass filter at Nyquist frequency
trace.filter(
"lowpass", freq=float(sampling_rate) / 2.000001, corners=2, zerophase=True
)
trace.decimate(
factor=int(trace.stats.sampling_rate / sampling_rate),
strict_length=False,
no_filter=True,
)
return trace
[docs]def upsample(trace, upfactor, starttime, endtime):
"""
Upsample a data stream by a given factor, prior to decimation. The upsampling is
carried out by linear interpolation.
NOTE: assumes any data with off-sample timing has been corrected with
:func:`~quakemigrate.util.shift_to_sample`. If not, the resulting traces may not all
contain the correct number of samples (and desired start and end times).
Parameters
----------
trace : `obspy.Trace` object
Trace to be upsampled.
upfactor : int
Factor by which to upsample the data in trace.
Returns
-------
out : `obpsy.Trace` object
Upsampled trace.
"""
data = trace.data
# Fenceposts
dnew = np.zeros((len(data) - 1) * upfactor + 1)
dnew[::upfactor] = data
for i in range(1, upfactor):
dnew[i::upfactor] = (
float(i) / upfactor * data[1:] + float(upfactor - i) / upfactor * data[:-1]
)
# Check if start needs pad - if so pad with constant value (start value of original
# trace). Use inequality here to only apply padding to data at the start and end of
# the requested time window; not for other traces floating in the middle (in the
# case that there are gaps).
if 0.0 < trace.stats.starttime - starttime < trace.stats.delta:
logging.debug(f"Mismatched starttimes: {trace.stats.starttime}, {starttime}")
# Calculate how many additional samples are needed
start_pad = np.round(
(trace.stats.starttime - starttime) * trace.stats.sampling_rate * upfactor
)
logging.debug(f"Start pad = {start_pad}")
# Add padding data (constant value)
start_fill = np.full(int(start_pad), trace.data[0], dtype=int)
dnew = np.append(start_fill, dnew)
# Calculate new starttime of trace
new_starttime = trace.stats.starttime - start_pad / (
trace.stats.sampling_rate * upfactor
)
logging.debug(f"New starttime = {new_starttime}")
else:
new_starttime = trace.stats.starttime
# Ditto for end of trace
if 0.0 < endtime - trace.stats.endtime < trace.stats.delta:
logging.debug(f"Mismatched endtimes: {trace.stats.endtime}, {endtime}")
# Calculate how many additional samples are needed
end_pad = np.round(
(endtime - trace.stats.endtime) * trace.stats.sampling_rate * upfactor
)
logging.debug(f"End pad = {end_pad}")
# Add padding data (constant value)
end_fill = np.full(int(end_pad), trace.data[-1], dtype=int)
dnew = np.append(dnew, end_fill)
out = Trace()
out.data = dnew
out.stats = trace.stats.copy()
out.stats.npts = len(out.data)
out.stats.starttime = new_starttime
out.stats.sampling_rate = int(upfactor * trace.stats.sampling_rate)
logging.debug(f"Raw upsampled trace:\n\t{out}")
# Trim to remove additional padding left from reading with nearest_sample=True at a
# variety of sampling rates. NOTE: here we are using nearest_sample=False, as all
# data in the stream should now be at a *multiple* of the desired sampling rate, and
# with any off-sample data having had its timing shifted.
out.trim(
starttime=starttime - 0.00001, endtime=endtime + 0.00001, nearest_sample=False
)
logging.debug(f"Trimmed upsampled trace:\n\t{out}")
return out
[docs]def merge_stream(stream):
"""
Merge all traces with contiguous data, or overlapping data which exactly matches
(== st._cleanup(); i.e. no clobber). Apply this on a channel by channel basis so
that if any individual merge fails then only that channel will be omitted.
Parameters
----------
stream : `obspy.Stream` object
Stream to be merged.
Returns
-------
stream_merged : `obpsy.Stream` object
Merged Stream.
"""
# Work on a copy
stream = stream.copy()
seed_ids = set([trace.id for trace in stream])
stream_merged = Stream()
with warnings.catch_warnings():
warnings.filterwarnings("error")
for seed_id in seed_ids:
try:
stream_merged += stream.select(id=seed_id).merge(method=-1)
except UserWarning as error_message:
logging.info(f"\t\t{error_message}")
logging.info(f"\t\t{stream.select(id=seed_id)}")
logging.info("\t\tThis channel will not be used for onset calculation.")
return stream_merged
[docs]def pairwise(iterable):
"""Utility to iterate over an iterable pairwise."""
a, b = tee(iterable)
next(b, None)
return zip(a, b)
[docs]def timeit(*args_, **kwargs_):
"""Function wrapper that measures the time elapsed during its execution."""
def inner_function(func):
@wraps(func)
def wrapper(*args, **kwargs):
ts = time.time()
result = func(*args, **kwargs)
msg = " " * 21 + f"Elapsed time: {time.time() - ts:6f} seconds."
try:
if args_[0] == "info":
logging.info(msg)
except IndexError:
logging.debug(msg)
return result
return wrapper
return inner_function
[docs]def get_phase_component_strings(channel_maps):
"""
Get regex strings to select the correct waveform components to plot on each panel
of the pick summary plot, given the custom channel mapping specified by the user.
P phase components are assembled into a single list. S phase components labelled
with letters (e.g. "E", "N") are treated separately to those labelled numerically
(e.g. "1", "2"), to enable a data containing a mix of the above to be plotted. If
more than two different component labels are associated with the S phase, then only
the first two alphabetical and/or numeric labels are retained (sorted
alphabetically/numerically), in order to assure compatibility with our current (not
that flexible) plotting setup. If the use of this function is expanded beyond that
of a plotting utility, this might need to be revised.
Currently all P-phase components are retained; this means if more than one
component is specified for the P onset, the waveforms will be overlain.
Parameters
----------
channel_maps : dict of str
Data component maps - keys are phases. (e.g. {"P": "Z"})
Returns
-------
p_str : str
Regex string (e.g. "[Z]") for selecting data from an `~obspy.Stream` object
by component.
s_str_1 : str
Regex string (e.g. "[N,1]") for selecting data from an `~obspy.Stream` object
by component.
s_str_2 : str
Regex string (e.g. "[E,2]") for selecting data from an `~obspy.Stream` object
by component.
"""
p_comps = list(channel_maps["P"].strip("*").strip("[").strip("]"))[::2]
s_comps = list(channel_maps["S"].strip("*").strip("[").strip("]"))[::2]
p_str, s_str_1, s_str_2 = "", "", ""
for p_comp in p_comps:
p_str += f"{p_comp},"
s_comps_alpha = [
s_comps[i] for i in np.where([not x.isnumeric() for x in s_comps])[0]
]
s_comps_numeric = [
s_comps[i] for i in np.where([x.isnumeric() for x in s_comps])[0]
]
if s_comps_alpha and s_comps_numeric:
if len(s_comps_alpha) > 2 or len(s_comps_numeric) > 2:
logging.info(
"More than two pairs of S-phase components found in "
"channel maps. Only using first two for plotting!"
)
for i, (s_alpha, s_numeric) in enumerate(zip(s_comps_alpha, s_comps_numeric)):
if i == 0:
s_str_1 += f"{s_alpha},{s_numeric},"
elif i == 1:
s_str_2 += f"{s_alpha},{s_numeric},"
elif i > 1:
continue
else:
for s_comps_list in [s_comps_alpha, s_comps_numeric]:
if s_comps_list:
s_str_1 += f"{s_comps_list[0]},"
if len(s_comps_list) > 1:
s_str_2 += f"{s_comps_list[1]},"
if len(s_comps_list) > 2:
logging.info(
"More than two alphabetical or numeric S-phase components"
" found in channel maps. Only using first two for plotting!"
)
p_str = f"[{p_str.rstrip(',')}]"
s_str_1 = f"[{s_str_1.rstrip(',')}]"
s_str_2 = f"[{s_str_2.rstrip(',')}]"
return p_str, s_str_1, s_str_2
[docs]class ArchivePathStructureError(Exception):
"""
Custom exception to handle case where an invalid Archive path structure is selected.
"""
def __init__(self, archive_format):
super().__init__(
f"The archive path structure you have selected: '{archive_format}' is not "
"a valid option! See the documentation for "
"'quakemigrate.data.Archive.path_structure' for a complete list, or specify"
" a custom format with 'Archive.format = "
"custom/archive_{year}_{jday}/{day:02d}.{station}_structure'."
)
[docs]class ArchiveEmptyException(Exception):
"""Custom exception to handle empty archive."""
def __init__(self):
super().__init__("No data was available for this timestep.")
# Additional message printed to log
self.msg = "\t\tNo files found in archive for this time period."
[docs]class NoScanMseedDataException(Exception):
"""
Custom exception to handle case when no .scanmseed files can be found by
read_coastream().
"""
def __init__(self):
super().__init__("No .scanmseed data found.")
[docs]class NoStationAvailabilityDataException(Exception):
"""
Custom exception to handle case when no .StationAvailability files can be found by
read_availability().
"""
def __init__(self):
super().__init__("No .StationAvailability files found.")
[docs]class DataAvailabilityException(Exception):
"""
Custom exception to handle case when all data for the selected stations did not pass
the data quality criteria specified by the user.
"""
def __init__(self):
super().__init__(
"All data for this timestep did not pass the specified data quality "
"criteria."
)
# Additional message printed to log
self.msg = (
"\t\tAll data for this timestep failed to pass the"
"\n\t\tspecified data quality criteria. This includes the"
"\n\t\tpresence of gaps or overlaps, or the data not"
"\n\t\tspanning the full time window."
)
[docs]class DataGapException(Exception):
"""
Custom exception to handle case when no data is found for the selected stations for
a given timestep.
"""
def __init__(self):
super().__init__(
"No data present in the archive for theselected stations for this time "
"window."
)
# Additional message printed to log
self.msg = (
"\t\tNo data for the selected stations was found in the"
"\n\t\tarchive for this time window."
)
[docs]class ChannelNameException(Exception):
"""
Custom exception to handle case when waveform data header has channel names which do
not conform to the IRIS SEED standard.
"""
def __init__(self, trace):
super().__init__(
"Channel name header does not conform to\nthe IRIS SEED standard - 3 "
"characters; ending in 'Z' for\nvertical and ending either 'E' & 'N' or "
f"'1' & '2' for\nhorizontal components.\n Working on trace: {trace}"
)
[docs]class NoOnsetPeak(Exception):
"""
Custom exception to handle case when no values in the onset function exceed the
threshold used for picking.
"""
def __init__(self, pick_threshold):
self.msg = (
"\t\t No onset signal exceeding pick threshold "
f"({pick_threshold:5.3f}) - continuing."
)
super().__init__(self.msg)
[docs]class BadUpfactorException(Exception):
"""
Custom exception to handle case when the chosen upfactor does not create a trace
with a sampling rate that can be decimated to the target sampling rate.
"""
def __init__(self, trace):
super().__init__(
"Chosen upfactor cannot be decimated to\ntarget sampling rate."
f"\n Working on trace: {trace}"
)
[docs]class OnsetTypeError(Exception):
"""
Custom exception to handle case when the onset object passed to QuakeScan is not of
the default type defined in QuakeMigrate.
"""
def __init__(self):
super().__init__(
"The Onset object you have created does not inherit from the required base "
"class - see manual."
)
[docs]class PickerTypeError(Exception):
"""
Custom exception to handle case when the phase picker object passed to QuakeScan is
not of the default type defined in QuakeMigrate.
"""
def __init__(self):
super().__init__(
"The PhasePicker object you have created does not inherit from the "
"required base class - see manual."
)
[docs]class LUTPhasesException(Exception):
"""
Custom exception to handle the case when the look-up table does not contain the
traveltimes for the phases necessary for a given function.
"""
def __init__(self, message):
super().__init__(message)
[docs]class PickOrderException(Exception):
"""
Custom exception to handle the case when the pick for the P phase is later than the
pick for the S phase.
"""
def __init__(self, event_uid, station, p_pick, s_pick):
super().__init__(
"The P-phase arrival-time pick is later than the S-phase arrival pick! "
f"Something has gone wrong.\nEvent: {event_uid}, station: {station}, "
f"p_pick: {p_pick}, s_pick: {s_pick}. There is probably a bug with the "
"picker."
)
[docs]class MagsTypeError(Exception):
"""
Custom exception to handle case when an object has been provided to calculate
magnitudes during locate, but it isn't supported.
"""
def __init__(self):
super().__init__(
"The Mags object you have specified is not supported: currently only "
"`quakemigrate.signal.local_mag.LocalMag` - see manual."
)
[docs]class NoTriggerFilesFound(Exception):
"""
Custom exception to handle case when no trigger files are found during locate. This
can occur for one of two reasons - an entirely invalid time period was used (i.e.
one that does not overlap at all with a period of time for which there exists
TriggeredEvents.csv files) or an invalid run name was provided.
"""
def __init__(self):
super().__init__(
"Double check you have supplied a valid run name and a time period for "
"which you have run detect."
)
[docs]class ResponseNotFoundError(Exception):
"""
Custom exception to handle the case where the provided response inventory doesn't
contain the response information for a trace.
Parameters
----------
e : str
Error message from ObsPy `Inventory.get_response()`.
tr_id : str
ID string for the Trace for which the response cannot be found.
"""
def __init__(self, e, tr_id):
super().__init__(f"{e} -- skipping {tr_id}")
[docs]class ResponseRemovalError(Exception):
"""
Custom exception to handle the case where the response removal was not successful.
Parameters
----------
e : str
Error message from ObsPy `Trace.remove_response()` or `Trace.simulate()`.
tr_id : str
ID string for the Trace for which the response cannot be removed.
"""
def __init__(self, e, tr_id):
super().__init__(f"{e} -- skipping {tr_id}")
[docs]class NyquistException(Exception):
"""
Custom exception to handle the case where the specified filter has a lowpass corner
above the signal Nyquist frequency.
Parameters
----------
freqmax : float
Specified lowpass frequency for filter.
f_nyquist : float
Nyquist frequency for the relevant waveform data.
tr_id : str
ID string for the Trace.
"""
def __init__(self, freqmax, f_nyquist, tr_id):
super().__init__(
f" Selected bandpass_highcut {freqmax} Hz is at or above the Nyquist "
f"frequency ({f_nyquist} Hz) for trace {tr_id}. "
)
[docs]class PeakToTroughError(Exception):
"""
Custom exception to handle case when amplitude._peak_to_trough_amplitude encounters
an anomalous set of peaks and troughs, so can't calculate an amplitude.
"""
def __init__(self, err):
super().__init__(err)
# Additional message printed to log
self.msg = err
[docs]class TimeSpanException(Exception):
"""
Custom exception to handle case when the user has submitted a start time that is
after the end time.
"""
def __init__(self):
super().__init__("The start time specified is after the end time.")
[docs]class InvalidTriggerThresholdMethodException(Exception):
"""
Custom exception to handle case when the user has not selected a valid trigger
threshold method.
"""
def __init__(self):
super().__init__(
"Only 'static', 'mad' or 'median_ratio' thresholds are " "supported."
)
[docs]class InvalidPickThresholdMethodException(Exception):
"""
Custom exception to handle case when the user has not selected a valid pick
threshold method.
"""
def __init__(self):
super().__init__("Only 'percentile' or 'MAD' thresholds are supported.")