Source code for quakemigrate.io.data

# -*- coding: utf-8 -*-
"""
Module for processing waveform files stored in a data archive.

:copyright:
    2020–2026, QuakeMigrate developers.
:license:
    GNU General Public License, Version 3
    (https://www.gnu.org/licenses/gpl-3.0.html)

"""

from itertools import chain
import logging
import pathlib

from obspy import read, Stream, UTCDateTime

import quakemigrate.util as util


[docs]class Archive: """ The Archive class handles the reading of archived waveform data. It is capable of handling any regular archive structure. Requests to read waveform data are served up as a :class:`~quakemigrate.io.data.WaveformData` object. If provided, a response inventory for the archive will be stored with the waveform data for response removal, if needed (e.g. for local magnitude calculation, or to output real cut waveforms). By default, data with mismatched sampling rates will only be decimated. If necessary, and if the user specifies `resample = True` and an upfactor to upsample by `upfactor = int` for the waveform archive, data can also be upsampled and then, if necessary, subsequently decimated to achieve the desired sampling rate. For example, for raw input data sampled at a mix of 40, 50 and 100 Hz, to achieve a unified sampling rate of 50 Hz, the user would have to specify an upfactor of 5; 40 Hz x 5 = 200 Hz, which can then be decimated to 50 Hz - see :func:`~quakemigrate.util.resample`. Parameters ---------- archive_path : str Location of seismic data archive: e.g.: "./DATA_ARCHIVE". stations : `pandas.DataFrame` object Station information. Columns ["Latitude", "Longitude", "Elevation", "Name"]. See :func:`~quakemigrate.io.core.read_stations` archive_format : str, optional Sets directory structure and file naming format for different archive formats. See :func:`~quakemigrate.io.data.Archive.path_structure` kwargs : **dict See Archive Attributes for details. Attributes ---------- archive_path : `pathlib.Path` object Location of seismic data archive: e.g.: ./DATA_ARCHIVE. stations : `pandas.Series` object Series object containing station names. format : str Directory structure and file naming format of data archive. read_all_stations : bool, optional If True, read all stations in archive for that time period. Else, only read specified stations. resample : bool, optional If true, perform resampling of data which cannot be decimated directly to the desired sampling rate. See :func:`~quakemigrate.util.resample` response_inv : `obspy.Inventory` object, optional ObsPy response inventory for this waveform archive, containing response information for each channel of each station of each network. pre_filt : tuple of floats Pre-filter to apply during the instrument response removal. E.g. (0.03, 0.05, 30., 35.) - all in Hz. (Default None) water_level : float Water level to use in instrument response removal. (Default 60.) remove_full_response : bool Whether to remove the full response (including the effect of digital FIR filters) or just the instrument transform function (as defined by the PolesZeros Response Stage). Significantly slower. (Default False) upfactor : int, optional Factor by which to upsample the data to enable it to be decimated to the desired sampling rate, e.g. 40Hz -> 50Hz requires upfactor = 5. See :func:`~quakemigrate.util.resample` interpolate : bool, optional If data is timestamped "off-sample" (i.e. a non-integer number of samples after midnight), whether to interpolate the data to apply the necessary correction. Default behaviour is to just alter the metadata, resulting in a sub-sample timing offset. See :func:`~quakemigrate.util.shift_to_sample`. ignore_network_code : bool, optional If True, replace all network codes in the waveform archive with a dummy value. Note this may cause issues if station codes are repeated, with SEED-ID's only distinguished by their differing network codes. dummy_network_code : str, optional Provides the option to specify the dummy network code applied to the waveform archive, if `ignore_network_code` is set to True. ignore_location_code : bool, optional If True, replace all location codes in the waveform archive with a blank string. Methods ------- path_structure(archive_format, channels="*") Set the directory structure and file naming format of the data archive. read_waveform_data(starttime, endtime) Read in waveform data between two times. """ def __init__(self, archive_path, stations, archive_format=None, **kwargs): """Instantiate the Archive object.""" self.archive_path = pathlib.Path(archive_path) self.stations = stations["Name"] if archive_format: channels = kwargs.get("channels", "*") self.path_structure(archive_format, channels) else: self.format = kwargs.get("format") self.read_all_stations = kwargs.get("read_all_stations", False) # Resampling parameters self.resample = kwargs.get("resample", False) self.upfactor = kwargs.get("upfactor") self.interpolate = kwargs.get("interpolate", False) # SEED ID params self.ignore_network_code = kwargs.get("ignore_network_code", False) self.dummy_network_code = kwargs.get("dummy_network_code", "XX") self.ignore_location_code = kwargs.get("ignore_location_code", False) # Response removal parameters self.response_inv = kwargs.get("response_inv") response_removal_params = kwargs.get("response_removal_params", {}) if self.response_inv and "water_level" not in response_removal_params.keys(): print( # Logger not yet spun up "Warning: 'water level' for instrument correction not " "specified. Set to default: 60" ) self.water_level = response_removal_params.get("water_level", 60.0) self.pre_filt = response_removal_params.get("pre_filt") self.remove_full_response = response_removal_params.get( "remove_full_response", False ) def __str__(self, response_only=False): """ Returns a short summary string of the Archive object. Parameters ---------- response_only : bool, optional Whether to just output the a string describing the instrument response parameters. Returns ------- out : str Summary string. """ if self.response_inv: response_str = ( "\tResponse removal parameters:\n" f"\t\tWater level = {self.water_level}\n" ) if self.pre_filt is not None: response_str += f"\t\tPre-filter = {self.pre_filt} Hz\n" response_str += ( "\t\tRemove full response (inc. FIR stages) = " f"{self.remove_full_response}\n" ) else: response_str = "\tNo instrument response inventory provided!\n" if not response_only: out = ( "QuakeMigrate Archive object" f"\n\tArchive path\t:\t{self.archive_path}" f"\n\tPath structure\t:\t{self.format}" f"\n\tResampling\t:\t{self.resample}" ) if self.upfactor: out += f"\n\tUpfactor\t:\t{self.upfactor}" out += "\n\tStations:" for station in self.stations: out += f"\n\t\t{station}" out += f"\n{response_str}" else: out = response_str return out
[docs] def path_structure(self, archive_format="YEAR/JD/STATION", channels="*"): """ Define the directory structure and file naming format of the data archive. Parameters ---------- archive_format : str, optional Directory structure and file naming format of the data archive. This may be the name of a generic archive format (e.g. SeisComp3), or one of a selection of additional formats built into QuakeMigrate. channels : str, optional Channel codes to include. E.g. channels="[B,H]H*". (Default "*") Raises ------ ArchivePathStructureError If the `archive_format` specified by the user is not a valid option. """ if archive_format == "SeisComp3": self.format = ( "{year}/*/{station}/" + channels + "/*.{station}.*.*.D." "{year}.{jday:03d}" ) elif archive_format == "YEAR/JD/*_STATION_*": self.format = "{year}/{jday:03d}/*_{station}_*" elif archive_format == "YEAR/JD/STATION": self.format = "{year}/{jday:03d}/{station}*" elif archive_format == "STATION.YEAR.JULIANDAY": self.format = "*{station}.*.{year}.{jday:03d}" elif archive_format == "/STATION/STATION.YearMonthDay": self.format = "{station}/{station}.{year}{month:02d}{day:02d}" elif archive_format == "YEAR_JD/STATION*": self.format = "{year}_{jday:03d}/{station}*" elif archive_format == "YEAR_JD/STATION_*": self.format = "{year}_{jday:03d}/{station}_*" else: raise util.ArchivePathStructureError(archive_format)
[docs] def read_waveform_data(self, starttime, endtime, pre_pad=0.0, post_pad=0.0): """ Read in waveform data from the archive between two times. Supports all formats currently supported by ObsPy, including: "MSEED", "SAC", "SEGY", "GSE2". Optionally, read data with some pre- and post-pad, and for all stations in the archive - this will be stored in `data.raw_waveforms`, while `data.waveforms` will contain only data for selected stations between `starttime` and `endtime`. Parameters ---------- starttime : `obspy.UTCDateTime` object Timestamp from which to read waveform data. endtime : `obspy.UTCDateTime` object Timestamp up to which to read waveform data. pre_pad : float, optional Additional pre pad of data to read. Defaults to 0. post_pad : float, optional Additional post pad of data to read. Defaults to 0. Returns ------- data : :class:`~quakemigrate.io.data.WaveformData` object Object containing the waveform data read from the archive that satisfies the query. Raises ------ ArchiveEmptyException If no data files are found in the archive for this day(s). DataAvailabilityException If no data is found in the archive for the specified stations within the specified time window. """ # Ensure pre-pad and post-pad are not negative. pre_pad = max(0.0, pre_pad) post_pad = max(0.0, post_pad) data = WaveformData( starttime=starttime, endtime=endtime, stations=self.stations, read_all_stations=self.read_all_stations, resample=self.resample, upfactor=self.upfactor, response_inv=self.response_inv, water_level=self.water_level, pre_filt=self.pre_filt, remove_full_response=self.remove_full_response, pre_pad=pre_pad, post_pad=post_pad, ) files = self._load_from_path(starttime - pre_pad, endtime + post_pad) st = Stream() try: first = next(files) files = chain([first], files) for file in files: file = str(file) try: read_start = starttime - pre_pad read_end = endtime + post_pad st += read( file, starttime=read_start, endtime=read_end, nearest_sample=True, ) except TypeError: logging.info(f"File not compatible with ObsPy - {file}") continue # If network code is to be ignored, set all channels to network code 'XX' if self.ignore_network_code: for tr in st: tr.stats.network = self.dummy_network_code # If location code is to be ignored, set all channels to location code '' if self.ignore_location_code: for tr in st: tr.stats.location = "" # Merge waveforms channel-by-channel with no-clobber merge st = util.merge_stream(st) # Make copy of raw waveforms to output if requested data.raw_waveforms = st.copy() # Ensure data is timestamped "on-sample" (i.e. an integer number of samples # after midnight). Otherwise the data will be implicitly shifted when it is # used to calculate the onset function / migrated. st = util.shift_to_sample(st, interpolate=self.interpolate) if self.read_all_stations: # Re-populate st with only stations in station file st_selected = Stream() for station in self.stations: st_selected += st.select(station=station) st = st_selected.copy() del st_selected if pre_pad != 0.0 or post_pad != 0.0: # Trim data between start and end time for tr in st: tr.trim(starttime=starttime, endtime=endtime, nearest_sample=True) if not bool(tr): st.remove(tr) # Test if the stream is completely empty # (see __nonzero__ for `obspy.Stream` object) if not bool(st): raise util.DataGapException # Add cleaned stream to `waveforms` data.waveforms = st except StopIteration: raise util.ArchiveEmptyException return data
def _load_from_path(self, starttime, endtime): """ Retrieves available files between two times. Parameters ---------- starttime : `obspy.UTCDateTime` object Timestamp from which to read waveform data. endtime : `obspy.UTCDateTime` object Timestamp up to which to read waveform data. Returns ------- files : generator Iterator object of available waveform data files. Raises ------ ArchiveFormatException If the Archive.format attribute has not been set. """ if self.format is None: raise util.ArchiveFormatException # Loop through time period by day adding files to list # NOTE! This assumes the archive structure is split into days. files = [] loadstart = UTCDateTime(starttime.date) while loadstart <= endtime: temp_format = self.format.format( year=loadstart.year, month=loadstart.month, day=loadstart.day, jday=loadstart.julday, station="{station}", dtime=loadstart, ) if self.read_all_stations is True: file_format = temp_format.format(station="*") file_format = file_format.replace("**", "*") files = chain(files, self.archive_path.glob(file_format)) else: for station in self.stations: file_format = temp_format.format(station=station) files = chain(files, self.archive_path.glob(file_format)) loadstart += 86400 return files @property def dummy_network_code(self): """ Dummy network code is a 2 character string to replace the (possibly mixed) network codes in the input archive. """ return self._dummy_network_code @dummy_network_code.setter def dummy_network_code(self, value): """Setter for dummy network code.""" if isinstance(value, str) and len(value) == 2: self._dummy_network_code = value else: raise ValueError( f"`dummy_network_code` must be a 2 character string, not {value}." )
[docs]class WaveformData: """ The WaveformData class encapsulates the waveform data returned by an Archive query. It also provides a number of utility functions. These include removing instrument response and checking data availability against a flexible set of data quality criteria. Parameters ---------- starttime : `obspy.UTCDateTime` object Timestamp of first sample of waveform data requested from the archive. endtime : `obspy.UTCDateTime` object Timestamp of last sample of waveform data requested from the archive. stations : `pandas.Series` object, optional Series object containing station names. read_all_stations : bool, optional If True, `raw_waveforms` contain all stations in archive for that time period. Else, only selected stations will be included. resample : bool, optional If true, allow resampling of data which cannot be decimated directly to the desired sampling rate. See :func:`~quakemigrate.util.resample` Default: False upfactor : int, optional Factor by which to upsample the data to enable it to be decimated to the desired sampling rate, e.g. 40Hz -> 50Hz requires upfactor = 5. See :func:`~quakemigrate.util.resample` response_inv : `obspy.Inventory` object, optional ObsPy response inventory for this waveform data, containing response information for each channel of each station of each network. pre_filt : tuple of floats Pre-filter to apply during the instrument response removal. E.g. (0.03, 0.05, 30., 35.) - all in Hz. (Default None) water_level : float Water level to use in instrument response removal. (Default 60.) remove_full_response : bool Whether to remove the full response (including the effect of digital FIR filters) or just the instrument transform function (as defined by the PolesZeros Response Stage). Significantly slower. (Default False) pre_pad : float, optional Additional pre pad of data included in `raw_waveforms`. post_pad : float, optional Additional post pad of data included in `raw_waveforms`. Attributes ---------- starttime : `obspy.UTCDateTime` object Timestamp of first sample of waveform data requested from the archive. endtime : `obspy.UTCDateTime` object Timestamp of last sample of waveform data requested from the archive. stations : `pandas.Series` object Series object containing station names. read_all_stations : bool If True, `raw_waveforms` contain all stations in archive for that time period. Else, only selected stations will be included. raw_waveforms : `obspy.Stream` object Raw seismic data read in from the archive. This may be for all stations in the archive, or only those specified by the user. See `read_all_stations`. It may also cover the time period between `starttime` and `endtime`, or feature an additional pre- and post-pad. See `pre_pad` and `post_pad`. waveforms : `obspy.Stream` object Seismic data read in from the archive for the specified list of stations, between `starttime` and `endtime`. pre_pad : float Additional pre pad of data included in `raw_waveforms`. post_pad : float Additional post pad of data included in `raw_waveforms`. Methods ------- check_availability(stream, **data_quality_params) Check data availability against a set of data quality criteria. get_wa_waveform(trace, **response_removal_params) Calculate the Wood-Anderson corrected waveform for a `obspy.Trace` object. Raises ------ NotImplementedError If the user attempts to use the get_real_waveform() method. """ def __init__( self, starttime, endtime, stations=None, response_inv=None, water_level=60.0, pre_filt=None, remove_full_response=False, read_all_stations=False, resample=False, upfactor=None, pre_pad=0.0, post_pad=0.0, ): """Instantiate the WaveformData object.""" self.starttime = starttime self.endtime = endtime self.stations = stations self.response_inv = response_inv self.water_level = water_level self.pre_filt = pre_filt self.remove_full_response = remove_full_response self.read_all_stations = read_all_stations self.resample = resample self.upfactor = upfactor self.pre_pad = pre_pad self.post_pad = post_pad self.raw_waveforms = None self.waveforms = Stream() self.wa_waveforms = None self.real_waveforms = None
[docs] def check_availability( self, st, all_channels=False, n_channels=None, allow_gaps=False, full_timespan=True, check_sampling_rate=False, sampling_rate=None, check_start_end_times=False, ): """ Check waveform availability against data quality criteria. There are a number of hard-coded checks: for whether any data is present; for whether the data is a flatline (all samples have the same value); and for whether the data contains overlaps. There are a selection of additional optional checks which can be specified according to the onset function / user preference. Parameters ---------- st : `obspy.Stream` object Stream containing the waveform data to check against the availability criteria. all_channels : bool, optional Whether all supplied channels (distinguished by SEED id) need to meet the availability criteria to mark the data as 'available'. n_channels : int, optional If `all_channels=True`, this argument is required (in order to specify the number of channels expected to be present). allow_gaps : bool, optional Whether to allow gaps. full_timespan : bool, optional Whether to ensure the data covers the entire timespan requested; note that this implicitly requires that there be no gaps. Checks the number of samples in the trace, not the start and end times; for that see `check_start_end_times`. check_sampling_rate : bool, optional Check that all channels are at the desired sampling rate. sampling_rate : float, optional If `check_sampling_rate=True`, this argument is required to specify the sampling rate that the data should be at. check_start_end_times : bool, optional A stricter alternative to `full_timespan`; checks that the first and last sample of the trace have exactly the requested timestamps. Returns ------- available : int 0 if data doesn't meet the availability requirements; 1 if it does. availability : dict Dict of {tr_id : available} for each unique SEED ID in the input stream (available is again 0 or 1). Raises ------ TypeError If the user specifies `all_channels=True` but does not specify `n_channels`. """ availability = {} available = 0 timespan = self.endtime - self.starttime # Check if any channels in stream if bool(st): # Loop through channels with unique SEED id's for tr_id in sorted(set([tr.id for tr in st])): st_id = st.select(id=tr_id) availability[tr_id] = 0 # Check it's not flatlined if any(tr.data.max() == tr.data.min() for tr in st_id): continue # Check for overlaps overlaps = st_id.get_gaps(max_gap=-0.000001) if len(overlaps) != 0: continue # Check for gaps (if requested) if not allow_gaps: gaps = st_id.get_gaps() # Overlaps already dealt with if len(gaps) != 0: continue # Check sampling rate if check_sampling_rate: if not sampling_rate: raise TypeError( "Please specify sampling_rate if you wish to check all " "channels are at the correct sampling rate." ) if any(tr.stats.sampling_rate != sampling_rate for tr in st_id): continue # Check data covers full timespan (if requested) - this # strictly checks the *timespan*, so uses the trace sampling # rate as provided. To check that as well, use # `check_sampling_rate=True` and specify a sampling rate. if full_timespan: n_samples = int(round(timespan * st_id[0].stats.sampling_rate + 1)) if len(st_id) > 1: continue elif st_id[0].stats.npts < n_samples: logging.debug("Trace has too few samples.") logging.debug( f"(n_samples, trace_npts) : ({n_samples}, {st_id[0].stats.npts})" ) continue # Check start and end times of trace are exactly correct if check_start_end_times: if len(st_id) > 1: continue elif ( st_id[0].stats.starttime != self.starttime or st_id[0].stats.endtime != self.endtime ): continue # If passed all tests, set availability to 1 availability[tr_id] = 1 # Return availability based on "all_channels" setting if all(ava == 1 for ava in availability.values()): if all_channels: # If all_channels requested, must also check that the # expected number of channels are present if not n_channels: raise TypeError( "Please specify n_channels if you wish to check all " "channels meet the availability criteria." ) elif len(availability) == n_channels: available = 1 else: available = 1 elif not all_channels and any(ava == 1 for ava in availability.values()): available = 1 return available, availability
[docs] def get_real_waveform(self, tr, velocity=True): """ Calculate the real waveform for a Trace by removing the instrument response. Parameters ---------- tr : `obspy.Trace` object Trace containing the waveform for which to remove the instrument response. velocity : bool, optional Output velocity waveform (as opposed to displacement). Default: True. Returns ------- tr : `obspy.Trace` object Trace with instrument response removed. Raises ------ AttributeError If no response inventory has been supplied. ResponseNotFoundError If the response information for a trace can't be found in the supplied response inventory. ResponseRemovalError If the deconvolution of the instrument response is unsuccessful. """ if not self.response_inv: raise AttributeError("No response inventory provided!") # Copy the Trace before operating on it tr = tr.copy() tr.detrend("linear") if not self.remove_full_response: # Just remove the response encapsulated in the instrument transfer function # (stored as a PolesAndZeros response). NOTE: this does not account for the # effect of the digital FIR filters applied to the recorded waveforms. # However, due to this it is significantly faster to compute. try: response = self.response_inv.get_response(tr.id, tr.stats.starttime) except Exception as e: raise util.ResponseNotFoundError(str(e), tr.id) # Get the instrument transfer function as a PAZ dictionary paz = response.get_paz() if not velocity: paz.zeros.extend([0j]) paz_dict = { "poles": paz.poles, "zeros": paz.zeros, "gain": paz.normalization_factor, "sensitivity": response.instrument_sensitivity.value, } try: tr.simulate( paz_remove=paz_dict, pre_filt=self.pre_filt, water_level=self.water_level, taper=True, sacsim=True, # To replicate remove_response() pitsasim=False, # To replicate remove_response() ) except ValueError as e: raise util.ResponseRemovalError(e, tr.id) else: # Use remove_response(), which removes the effect of _all_ response stages, # including the FIR stages. Considerably slower. output = "VEL" if velocity else "DISP" try: tr.remove_response( inventory=self.response_inv, output=output, pre_filt=self.pre_filt, water_level=self.water_level, taper=True, ) except ValueError as e: raise util.ResponseRemovalError(e, tr.id) try: self.real_waveforms.append(tr.copy()) except AttributeError: self.real_waveforms = Stream() self.real_waveforms.append(tr.copy()) return tr
[docs] def get_wa_waveform(self, tr, velocity=False): """ Calculate simulated Wood Anderson displacement waveform for a Trace. Parameters ---------- tr : `obspy.Trace` object Trace containing the waveform to be corrected to a Wood-Anderson response. velocity : bool, optional Output velocity waveform, instead of displacement. Default: False. NOTE: all attenuation functions provided within the QM local_mags module are calculated for displacement seismograms. Returns ------- tr : `obspy.Trace` object Trace corrected to Wood-Anderson response. """ # Copy the Trace before operating on it tr = tr.copy() tr.detrend("linear") # Remove instrument response tr = self.get_real_waveform(tr, velocity) # Simulate Wood-Anderson response tr.simulate( paz_simulate=util.wa_response(obspy_def=True), pre_filt=self.pre_filt, water_level=self.water_level, taper=True, sacsim=True, # To replicate remove_response() pitsasim=False, # To replicate remove_response() ) try: self.wa_waveforms.append(tr.copy()) except AttributeError: self.wa_waveforms = Stream() self.wa_waveforms.append(tr.copy()) return tr