Source code for quakemigrate.util

# -*- coding: utf-8 -*-
"""
Module that supplies various utility functions and classes.

:copyright:
    2020, QuakeMigrate developers.
:license:
    GNU General Public License, Version 3
    (https://www.gnu.org/licenses/gpl-3.0.html)

"""

import logging
import sys
import time
from datetime import datetime
from functools import wraps

import matplotlib.ticker as ticker
import numpy as np
from obspy import Trace


log_spacer = "="*110


[docs]def make_directories(run, subdir=None): """ Make run directory, and optionally make subdirectories within it. Parameters ---------- run : pathlib Path object Location of parent output directory, named by run name. subdir : string, optional subdir to make beneath the run level. """ run.mkdir(exist_ok=True) if subdir: new_dir = run / subdir new_dir.mkdir(exist_ok=True, parents=True)
[docs]def gaussian_1d(x, a, b, c): """ Create a 1-dimensional Gaussian function. Parameters ---------- x : array-like Array of x values a : float / int Amplitude (height of Gaussian) b : float / int Mean (centre of Gaussian) c : float / int Sigma (width of Gaussian) Returns ------- f : function 1-dimensional Gaussian function """ f = a * np.exp(-1. * ((x - b) ** 2) / (2 * (c ** 2))) return f
[docs]def gaussian_3d(nx, ny, nz, sgm): """ Create a 3-dimensional Gaussian function. Parameters ---------- nx : array-like Array of x values ny : array-like Array of y values nz : array-like Array of z values sgm : float / int Sigma (width of gaussian in all directions) Returns ------- f : function 3-dimensional Gaussian function """ nx2 = (nx - 1) / 2 ny2 = (ny - 1) / 2 nz2 = (nz - 1) / 2 x = np.linspace(-nx2, nx2, nx) y = np.linspace(-ny2, ny2, ny) z = np.linspace(-nz2, nz2, nz) ix, iy, iz = np.meshgrid(x, y, z, indexing="ij") if np.isscalar(sgm): sgm = np.repeat(sgm, 3) sx, sy, sz = sgm f = np.exp(- (ix * ix) / (2 * sx * sx) - (iy * iy) / (2 * sy * sy) - (iz * iz) / (2 * sz * sz)) return f
[docs]def logger(logstem, log, loglevel="info"): """ Simple logger that will output to both a log file and stdout. Parameters ---------- logstem : str Filestem for log file. log : bool Toggle for logging - default is to only print information to stdout. If True, will also create a log file. loglevel : str, optional Toggle for logging level - default is to print only "info" messages to log. To print more detailed "debug" messages, set to "debug". """ level = logging.DEBUG if loglevel == "debug" else logging.INFO if log: now = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") logfile = logstem.parent / f"{logstem.name}_{now}" logfile.parent.mkdir(exist_ok=True, parents=True) handlers = [logging.FileHandler(str(logfile.with_suffix(".log"))), logging.StreamHandler(sys.stdout)] else: handlers = [logging.StreamHandler(sys.stdout)] logging.basicConfig(level=level, format="%(message)s", handlers=handlers)
# format="%(asctime)s [%(levelname)s] %(message)s",
[docs]def time2sample(time, sampling_rate): """ Utility function to convert from seconds and sampling rate to number of samples. Parameters ---------- time : float Time to convert sampling_rate : int Sampling rate of input data/sampling rate at which to compute the coalescence function. Returns ------- out : int Time that correpsonds to an integer number of samples at a specific sampling rate. """ return int(round(time*int(sampling_rate)))
[docs]class DateFormatter(ticker.Formatter): """ Extend the `matplotlib.ticker.Formatter` class to allow for millisecond precision when formatting a tick (in days since the epoch) with a `~datetime.datetime.strftime` format string. """ def __init__(self, fmt, precision=3): """ Parameters ---------- fmt : str `~datetime.datetime.strftime` format string. precision : int Degree of precision to which to report sub-second time intervals. """ from matplotlib.dates import num2date self.num2date = num2date self.fmt = fmt self.precision = precision def __call__(self, x, pos=0): if x == 0: raise ValueError("DateFormatter found a value of x=0, which is " "an illegal date; this usually occurs because " "you have not informed the axis that it is " "plotting dates, e.g., with ax.xaxis_date()") dt = self.num2date(x) ms = dt.strftime("%f")[:self.precision] return dt.strftime(self.fmt).format(ms=ms)
[docs]def trim2sample(time, sampling_rate): """ Utility function to ensure time padding results in a time that is an integer number of samples. Parameters ---------- time : float Time to trim. sampling_rate : int Sampling rate of input data/sampling rate at which to compute the coalescence function. Returns ------- out : int Time that correpsonds to an integer number of samples at a specific sampling rate. """ return int(np.ceil(time * sampling_rate) / sampling_rate * 1000) / 1000
[docs]def wa_response(convert='DIS2DIS', obspy_def=True): """ Generate a Wood Anderson response dictionary. Parameters ---------- convert : {'DIS2DIS', 'VEL2VEL', ‘VEL2DIS'} Type of output to convert between; determines the number of complex zeros used. obspy_def : bool, optional Use the ObsPy definition of the Wood Anderson response (Default). Otherwise, use the IRIS/SAC definition. Returns ------- WOODANDERSON : dict Poles, zeros, sensitivity and gain of the Wood-Anderson torsion seismograph. """ if obspy_def: # Create Wood-Anderson response - ObsPy values woodanderson = {"poles": [-6.283185 - 4.712j, -6.283185 + 4.712j], "zeros": [0j], "sensitivity": 2080, "gain": 1.} else: # Create Wood Anderson response - different to the ObsPy values # http://www.iris.washington.edu/pipermail/sac-help/2013-March/001430.html woodanderson = {"poles": [-5.49779 + 5.60886j, -5.49779 - 5.60886j], "zeros": [0j], "sensitivity": 2080, "gain": 1.} if convert in ('DIS2DIS', 'VEL2VEL'): # Add an extra zero to go from disp to disp or vel to vel. woodanderson['zeros'].extend([0j]) return woodanderson
[docs]def decimate(trace, sr): """ Decimate a trace to achieve the desired sampling rate, sr. NOTE: data will be detrended and a cosine taper applied before decimation, in order to avoid edge effects when applying the lowpass filter. Parameters: ----------- trace : `obspy.Trace` object Trace to be decimated. sr : int Output sampling rate. Returns: -------- trace : `obspy.Trace` object Decimated trace. """ # Work on a copy of the trace trace = trace.copy() # Detrend and apply cosine taper trace.detrend('linear') trace.detrend('demean') trace.taper(type='cosine', max_percentage=0.05) # Zero-phase lowpass filter at Nyquist frequency trace.filter("lowpass", freq=float(sr) / 2.000001, corners=2, zerophase=True) trace.decimate(factor=int(trace.stats.sampling_rate / sr), strict_length=False, no_filter=True) return trace
[docs]def upsample(trace, upfactor): """ Upsample a data stream by a given factor, prior to decimation. The upsampling is done using a linear interpolation. Parameters ---------- trace : `obspy.Trace` object Trace to be upsampled. upfactor : int Factor by which to upsample the data in trace. Returns ------- out : `obpsy.Trace` object Upsampled trace. """ data = trace.data dnew = np.zeros(len(data)*upfactor - (upfactor - 1)) dnew[::upfactor] = data for i in range(1, upfactor): dnew[i::upfactor] = float(i)/upfactor*data[1:] \ + float(upfactor - i)/upfactor*data[:-1] out = Trace() out.data = dnew out.stats = trace.stats out.stats.npts = len(out.data) out.stats.starttime = trace.stats.starttime out.stats.sampling_rate = int(upfactor * trace.stats.sampling_rate) return out
[docs]def timeit(*args_, **kwargs_): """Function wrapper that measures the time elapsed during its execution.""" def inner_function(func): @wraps(func) def wrapper(*args, **kwargs): ts = time.time() result = func(*args, **kwargs) msg = " "*21 + f"Elapsed time: {time.time() - ts:6f} seconds." try: if args_[0] == "info": logging.info(msg) except IndexError: logging.debug(msg) return result return wrapper return inner_function
[docs]class StationFileHeaderException(Exception): """Custom exception to handle incorrect header columns in station file""" def __init__(self): msg = ("StationFileHeaderException: incorrect station file header - " "use:\nLatitude, Longitude, Elevation, Name") super().__init__(msg)
[docs]class InvalidVelocityModelHeader(Exception): """Custom exception to handle incorrect header columns in station file""" def __init__(self, key): super().__init__(f"Must include at least '{key}' in header.")
[docs]class ArchiveFormatException(Exception): """Custom exception to handle case where Archive.format is not set.""" def __init__(self): msg = ("ArchiveFormatException: Archive format has not been set. Set " "when making the Archive object with the kwarg " "'archive_format=<path_structure>', or afterwards by using the " "command 'Archive.path_structure(<path_structure>)'. To set a " "custom format, use 'Archive.format = " "custom/archive_{year}_{jday}/{day:02d}.{station}_structure' ") super().__init__(msg)
[docs]class ArchivePathStructureError(Exception): """Custom exception to handle case where an invalid Archive path structure is selected.""" def __init__(self, archive_format): msg = ("ArchivePathStructureError: The archive path structure you have" f" selected: '{archive_format}' is not a valid option! See the " "documentation for quakemigrate.data.Archive.path_structure for" " a complete list, or specify a custom format with Archive.form" "at = custom/archive_{year}_{jday}/{day:02d}.{station}_structur" "e") super().__init__(msg)
[docs]class ArchiveEmptyException(Exception): """Custom exception to handle empty archive""" def __init__(self): msg = "ArchiveEmptyException: No data was available for this timestep." super().__init__(msg) # Additional message printed to log self.msg = ("\t\tNo files found in archive for this time period.")
[docs]class NoScanMseedDataException(Exception): """ Custom exception to handle case when no .scanmseed files can be found by read_coastream() """ def __init__(self): msg = "NoScanMseedDataException: No .scanmseed data found." super().__init__(msg)
[docs]class NoStationAvailabilityDataException(Exception): """ Custom exception to handle case when no .StationAvailability files can be found by read_availability() """ def __init__(self): msg = ("NoStationAvailabilityDataException: No .StationAvailability " "files found.") super().__init__(msg)
[docs]class DataGapException(Exception): """ Custom exception to handle case when all data has gaps for a given timestep """ def __init__(self): msg = ("DataGapException: All available data had gaps for this " "timestep.\n OR: no data present in the archive for the" "selected stations.") super().__init__(msg) # Additional message printed to log self.msg = ("\t\tAll available data for this time period contains gaps" "\n\t\tor data not available at start/end of time period")
[docs]class ChannelNameException(Exception): """ Custom exception to handle case when waveform data header has channel names which do not conform to the IRIS SEED standard. """ def __init__(self, trace): msg = ("ChannelNameException: Channel name header does not conform " "to\nthe IRIS SEED standard - 3 characters; ending in 'Z' for\n" "vertical and ending either 'E' & 'N' or '1' & '2' for\n" "horizontal components.\n Working on trace: {}".format(trace)) super().__init__(msg)
[docs]class BadUpfactorException(Exception): """ Custom exception to handle case when the chosen upfactor does not create a trace with a sampling rate that can be decimated to the target sampling rate """ def __init__(self, trace): msg = ("BadUpfactorException: chosen upfactor cannot be decimated to\n" "target sampling rate.\n Working on trace: {}".format(trace)) super().__init__(msg)
[docs]class OnsetTypeError(Exception): """ Custom exception to handle case when the onset object passed to QuakeScan is not of the default type defined in QuakeMigrate. """ def __init__(self): msg = ("OnsetTypeError: The Onset object you have created does not " "inherit from the required base class - see manual.") super().__init__(msg)
[docs]class PickerTypeError(Exception): """ Custom exception to handle case when the phase picker object passed to QuakeScan is not of the default type defined in QuakeMigrate. """ def __init__(self): msg = ("PickerTypeError: The PhasePicker object you have created does " "not inherit from the required base class - see manual.") super().__init__(msg)
[docs]class PickOrderException(Exception): """ Custom exception to handle the case when the pick for the P phase is later than the pick for the S phase. """ def __init__(self, event_uid, station, p_pick, s_pick): msg = ("PickOrderException: The P-phase arrival-time pick is later " "than the S-phase arrival pick! Something has gone wrong. " f"Event: {event_uid}, station: {station}, p_pick: {p_pick}, " f"s_pick: {s_pick}. There is probably a bug with the picker.") super().__init__(msg)
[docs]class MagsTypeError(Exception): """ Custom exception to handle case when an object has been provided to calculate magnitudes during locate, but it isn't supported. """ def __init__(self): msg = ("MagsTypeError: The Mags object you have specified is not " "supported: currently only " "`quakemigrate.signal.local_mag.LocalMag` - see manual.") super().__init__(msg)
[docs]class NoTriggerFilesFound(Exception): """ Custom exception to handle case when no trigger files are found during locate. This can occur for one of two reasons - an entirely invalid time period was used (i.e. one that does not overlap at all with a period of time for which there exists TriggeredEvents.csv files) or an invalid run name was provided. """ def __init__(self): msg = ("NoTriggerFilesFound: Double check you have supplied a valid " "run name and a time period for which you have run detect.") super().__init__(msg)
[docs]class ResponseNotFoundError(Exception): """ Custom exception to handle the case where the provided response inventory doesn't contain the response information for a trace. Parameters ---------- e : str Error message from ObsPy `Inventory.get_response()` tr_id : str ID string for the Trace for which the response cannot be found """ def __init__(self, e, tr_id): msg = (f"ResponseNotFoundError: {e} -- skipping {tr_id}") super().__init__(msg)
[docs]class ResponseRemovalError(Exception): """ Custom exception to handle the case where the response removal was not successful. Parameters ---------- e : str Error message from ObsPy `Trace.remove_response()` or `Trace.simulate()` tr_id : str ID string for the Trace for which the response cannot be removed """ def __init__(self, e, tr_id): msg = (f"ResponseRemovalError: {e} -- skipping {tr_id}") super().__init__(msg)
[docs]class NyquistException(Exception): """ Custom exception to handle the case where the specified filter has a lowpass corner above the signal Nyquist frequency. Parameters ---------- freqmax : float Specified lowpass frequency for filter f_nyquist : float Nyquist frequency for the relevant waveform data tr_id : str ID string for the Trace """ def __init__(self, freqmax, f_nyquist, tr_id): msg = (f" NyquistException: Selected bandpass_highcut {freqmax} " f"Hz is at or above the Nyquist frequency ({f_nyquist} Hz) " f"for trace {tr_id}. ") super().__init__(msg)
[docs]class PeakToTroughError(Exception): """ Custom exception to handle case when amplitude._peak_to_trough_amplitude encounters an anomalous set of peaks and troughs, so can't calculate an amplitude. """ def __init__(self, err): msg = (f"PeakToTroughError: {err}") super().__init__(msg) # Additional message printed to log self.msg = (err)
[docs]class TimeSpanException(Exception): """ Custom exception to handle case when the user has submitted a start time that is after the end time. """ def __init__(self): msg = ("TimeSpanException: The start time specified is after the end" " time.") super().__init__(msg)
[docs]class InvalidThresholdMethodException(Exception): """ Custom exception to handle case when the user has not selected a valid threshold method. """ def __init__(self): msg = ("InvalidThresholdMethodException: Only 'static' or 'dynamic' " "thresholds are supported.") super().__init__(msg)