Source code for quakemigrate.io.data

# -*- coding: utf-8 -*-
"""
Module for processing waveform files stored in a data archive.

:copyright:
    2020, QuakeMigrate developers.
:license:
    GNU General Public License, Version 3
    (https://www.gnu.org/licenses/gpl-3.0.html)

"""

from itertools import chain
import logging
import pathlib

import numpy as np
from obspy import read, Stream, Trace, UTCDateTime

import quakemigrate.util as util


[docs]class Archive: """ The Archive class handles the reading of archived waveform data. It is capable of handling any regular archive structure. Requests to read waveform data are served up as a `quakemigrate.data.WaveformData` object. Data will be checked for availability within the requested time period, and optionally resampled to meet a unified sampling rate. The raw data read from the archive will also be retained. If provided, a response inventory provided for the archive will be stored with the waveform data for response removal, if needed. Parameters ---------- archive_path : str Location of seismic data archive: e.g.: ./DATA_ARCHIVE. stations : `pandas.DataFrame` object Station information. Columns ["Latitude", "Longitude", "Elevation", "Name"] archive_format : str, optional Sets path type for different archive formats. kwargs : **dict See Archive Attributes for details. Attributes ---------- archive_path : `pathlib.Path` object Location of seismic data archive: e.g.: ./DATA_ARCHIVE. stations : `pandas.Series` object Series object containing station names. format : str File naming format of data archive. read_all_stations : bool, optional If True, read all stations in archive for that time period. Else, only read specified stations. resample : bool, optional If true, perform resampling of data which cannot be decimated directly to the desired sampling rate. response_inv : `obspy.Inventory` object, optional ObsPy response inventory for this waveform archive, containing response information for each channel of each station of each network. upfactor : int, optional Factor by which to upsample the data to enable it to be decimated to the desired sampling rate, e.g. 40Hz -> 50Hz requires upfactor = 5. Methods ------- path_structure(path_type="YEAR/JD/STATION") Set the file naming format of the data archive. read_waveform_data(starttime, endtime, sampling_rate) Read in all waveform data between two times, decimate / resample if required to reach desired sampling rate. Return all raw data as an obspy Stream object and processed data for specified stations as an array for use by QuakeScan to calculate onset functions for migration. """ def __init__(self, archive_path, stations, archive_format=None, **kwargs): """Instantiate the Archive object.""" self.archive_path = pathlib.Path(archive_path) self.stations = stations["Name"] if archive_format: channels = kwargs.get("channels", "*") self.path_structure(archive_format, channels) else: self.format = kwargs.get("format") self.read_all_stations = kwargs.get("read_all_stations", False) self.response_inv = kwargs.get("response_inv") self.resample = kwargs.get("resample", False) self.upfactor = kwargs.get("upfactor") def __str__(self): """Returns a short summary string of the Archive object.""" out = ("QuakeMigrate Archive object" f"\n\tArchive path\t:\t{self.archive_path}" f"\n\tPath structure\t:\t{self.format}" f"\n\tResampling\t:\t{self.resample}" "\n\tStations:") for station in self.stations: out += f"\n\t\t{station}" return out
[docs] def path_structure(self, archive_format="YEAR/JD/STATION", channels="*"): """ Define the path structure of the data archive. Parameters ---------- archive_format : str, optional Sets path type for different archive formats. channels : str, optional Channel codes to include. E.g. channels="[B,H]H*". (Default "*") Raises ------ ArchivePathStructureError If the archive_format specified by the user is not a valid option. """ if archive_format == "SeisComp3": self.format = ("{year}/*/{station}/"+channels+"/*.{station}.*.*.D." "{year}.{jday:03d}") elif archive_format == "YEAR/JD/*_STATION_*": self.format = "{year}/{jday:03d}/*_{station}_*" elif archive_format == "YEAR/JD/STATION": self.format = "{year}/{jday:03d}/{station}*" elif archive_format == "STATION.YEAR.JULIANDAY": self.format = "*{station}.*.{year}.{jday:03d}" elif archive_format == "/STATION/STATION.YearMonthDay": self.format = "{station}/{station}.{year}{month:02d}{day:02d}" elif archive_format == "YEAR_JD/STATION*": self.format = "{year}_{jday:03d}/{station}*" elif archive_format == "YEAR_JD/STATION_*": self.format = "{year}_{jday:03d}/{station}_*" else: raise util.ArchivePathStructureError(archive_format)
[docs] def read_waveform_data(self, starttime, endtime, sampling_rate, pre_pad=0., post_pad=0.): """ Read in the waveform data for all stations in the archive between two times and return station availability of the stations specified in the station file during this period. Decimate / resample (optional) this data if required to reach desired sampling rate. Output both processed data for stations in station file and all raw data in an obspy Stream object. By default, data with mismatched sampling rates will only be decimated. If necessary, and if the user specifies `resample = True` and an upfactor to upsample by `upfactor = int`, data can also be upsampled and then, if necessary, subsequently decimated to achieve the desired sampling rate. For example, for raw input data sampled at a mix of 40, 50 and 100 Hz, to achieve a unified sampling rate of 50 Hz, the user would have to specify an upfactor of 5; 40 Hz x 5 = 200 Hz, which can then be decimated to 50 Hz. NOTE: data will be detrended and a cosine taper applied before decimation, in order to avoid edge effects when applying the lowpass filter. Otherwise, data for migration will be added tp data.signal with no processing applied. Supports all formats currently supported by ObsPy, including: "MSEED" (default), "SAC", "SEGY", "GSE2" . Parameters ---------- starttime : `obspy.UTCDateTime` object, optional Timestamp from which to read waveform data. endtime : `obspy.UTCDateTime` object, optional Timestamp up to which to read waveform data. sampling_rate : int Desired sampling rate for data to be added to signal. This will be achieved by resampling the raw waveform data. By default, only decimation will be applied, but data can also be upsampled if specified by the user when creating the Archive object. pre_pad : float, optional Additional pre pad of data to cut based on user-defined pre_cut parameter. Defaults to none: pre_pad calculated in QuakeScan will be used (included in starttime). post_pad : float, optional Additional post pad of data to cut based on user-defined post_cut parameter. Defaults to none: post_pad calculated in QuakeScan will be used (included in endtime). Returns ------- data : :class:`~quakemigrate.io.data.WaveformData` object Object containing the archived data that satisfies the query. """ data = WaveformData(starttime=starttime, endtime=endtime, sampling_rate=sampling_rate, stations=self.stations, read_all_stations=self.read_all_stations, response_inv=self.response_inv, pre_pad=pre_pad, post_pad=post_pad) files = self._load_from_path(starttime - pre_pad, endtime + post_pad) st = Stream() try: first = next(files) files = chain([first], files) for file in files: file = str(file) try: read_start = starttime - pre_pad read_end = endtime + post_pad st += read(file, starttime=read_start, endtime=read_end) except TypeError: logging.info(f"File not compatible with ObsPy - {file}") continue # Merge all traces with contiguous data, or overlapping data which # exactly matches (== st._cleanup(); i.e. no clobber) st.merge(method=-1) # Make copy of raw waveforms to output if requested data.raw_waveforms = st.copy() # Re-populate st with only stations in station file, and only # data between start and end time needed for QuakeScan st_selected = Stream() for station in self.stations: st_selected += st.select(station=station) st = st_selected.copy() for tr in st: tr.trim(starttime=starttime, endtime=endtime) if not bool(tr): st.remove(tr) del st_selected # Remove stations which have gaps in at least one trace gaps = st.get_gaps() if gaps: stations = np.unique(np.array(gaps)[:, 1]).tolist() for station in stations: traces = st.select(station=station) for trace in traces: st.remove(trace) # Test if the stream is completely empty # (see __nonzero__ for obspy Stream object) if not bool(st): raise util.DataGapException # Pass stream to be processed and added to data.signal. This # processing includes resampling and determining the availability # of the desired stations. data.add_stream(st, self.resample, self.upfactor) except StopIteration: raise util.ArchiveEmptyException return data
def _load_from_path(self, starttime, endtime): """ Retrieves available files between two times. Parameters ---------- starttime : `obspy.UTCDateTime` object Timestamp from which to read waveform data. endtime : `obspy.UTCDateTime` object Timestamp up to which to read waveform data. Returns ------- files : generator Iterator object of available waveform data files. Raises ------ ArchiveFormatException If the Archive.format attribute has not been set. """ if self.format is None: raise util.ArchiveFormatException start_day = UTCDateTime(starttime.date) dy = 0 files = [] # Loop through time period by day adding files to list # NOTE! This assumes the archive structure is split into days. while start_day + (dy * 86400) <= endtime: now = starttime + (dy * 86400) if self.read_all_stations is True: file_format = self.format.format(year=now.year, month=now.month, day=now.day, jday=now.julday, station="*", dtime=now) files = chain(files, self.archive_path.glob(file_format)) else: for station in self.stations: file_format = self.format.format(year=now.year, month=now.month, day=now.day, jday=now.julday, station=station, dtime=now) files = chain(files, self.archive_path.glob(file_format)) dy += 1 return files
[docs]class WaveformData: """ The WaveformData class encapsulates the waveform data returned by an` Archive query. This includes the waveform data which has been pre-processed to a unified sampling rate, and checked for gaps, ready for use to calculate onset functions. Parameters ---------- starttime : `obspy.UTCDateTime` object Timestamp of first sample of waveform data. endtime : `obspy.UTCDateTime` object Timestamp of last sample of waveform data. sampling_rate : int Desired sampling rate of signal data. stations : `pandas.Series` object, optional Series object containing station names. read_all_stations : bool, optional If True, raw_waveforms contain all stations in archive for that time period. Else, only selected stations will be included. response_inv : `obspy.Inventory` object, optional ObsPy response inventory for this waveform archive, containing response information for each channel of each station of each network. pre_pad : float, optional Additional pre pad of data cut based on user-defined pre_cut parameter. post_pad : float, optional Additional post pad of data cut based on user-defined post_cut parameter. Attributes ---------- starttime : `obspy.UTCDateTime` object Timestamp of first sample of waveform data. endtime : `obspy.UTCDateTime` object Timestamp of last sample of waveform data. sampling_rate : int Sampling rate of signal data. stations : `pandas.Series` object Series object containing station names. read_all_stations : bool If True, raw_waveforms contain all stations in archive for that time period. Else, only selected stations will be included. raw_waveforms : `obspy.Stream` object Raw seismic data found and read in from the archive within the specified time period. This may be for all stations in the archive, or only those specified by the user. See `read_all_stations`. pre_pad : float Additional pre pad of data cut based on user-defined pre_cut parameter. post_pad : float Additional post pad of data cut based on user-defined post_cut parameter. signal : `numpy.ndarray`, shape(3, nstations, nsamples) 3-component seismic data at the desired sampling rate; only for desired stations, which have continuous data on all 3 components throughout the desired time period and where (if necessary) the data could be successfully resampled to the desired sampling rate. availability : `np.ndarray` of ints, shape(nstations) Array containing 0s (no data) or 1s (data), corresponding to whether data for each station met the requirements outlined in `signal` filtered_signal : `numpy.ndarray`, shape(3, nstations, nsamples) Filtered data originally from signal. Methods ------- add_stream(stream, resample, upfactor) Function to add data supplied in the form of an `obspy.Stream` object. get_wa_waveform(trace, **response_removal_params) Calculate the Wood-Anderson corrected waveform for a `obspy.Trace` object. times Utility function to generate the corresponding timestamps for the waveform and coalescence data. Raises ------ NotImplementedError If the user attempts to use the get_real_waveform() method. """ def __init__(self, starttime, endtime, sampling_rate, stations=None, response_inv=None, read_all_stations=False, pre_pad=0., post_pad=0.): """Instantiate the WaveformData object.""" self.starttime = starttime self.endtime = endtime self.sampling_rate = sampling_rate self.stations = stations self.response_inv = response_inv self.read_all_stations = read_all_stations self.pre_pad = pre_pad self.post_pad = post_pad self.raw_waveforms = None self.signal = None self.availability = None self.p_onset = None self.s_onset = None self.filtered_signal = None self.wa_waveforms = None
[docs] def add_stream(self, stream, resample, upfactor): """ Add signal data supplied in an `obspy.Stream` object. Perform resampling if necessary (decimation and/or upsampling), and determine availability of selected stations. Parameters: ----------- stream : `obspy.Stream` object Contains list of `obspy.Trace` objects containing the waveform data to add. resample : bool, optional If true, perform resampling of data which cannot be decimated directly to the desired sampling rate. upfactor : int, optional Factor by which to upsample the data to enable it to be decimated to the desired sampling rate, e.g. 40Hz -> 50Hz requires upfactor = 5. """ # Decimate and/or upsample stream if required to achieve the specified # sampling rate stream = self._resample(stream, resample, upfactor) # Combine the data into an array and determine station availability self.signal, self.availability = self._station_availability(stream)
[docs] def get_real_waveforms(self, tr, remove_full_response=False, velocity=True): """ Coming soon. """ raise NotImplementedError("Coming soon. Please contact the " "QuakeMigrate developers.")
[docs] def get_wa_waveform(self, tr, water_level, pre_filt, remove_full_response=False, velocity=False): """ Calculate simulated Wood Anderson displacement waveform for a Trace. Parameters ---------- tr : `obspy.Trace` object Trace containing the waveform to be corrected to a Wood-Anderson response water_level : float Water-level to be used in the instrument correction. pre_filt : tuple of floats, or None Filter corners describing filter to be applied to the trace before deconvolution. E.g. (0.05, 0.06, 30, 35) (in Hz) remove_full_response : bool, optional Remove all response stages, inc. FIR (st.remove_response()), not just poles-and-zero response stage. Default: False. velocity : bool, optional Output velocity waveform, instead of displacement. Default: False. Returns ------- tr : `obspy.Trace` object Trace corrected to Wood-Anderson response. Raises ------ AttributeError If no response inventory has been supplied. ResponseNotFoundError If the response information for a trace can't be found in the supplied response inventory. ResponseRemovalError If the deconvolution of the instrument response and simulation of the Wood-Anderson response is unsuccessful. NotImplementedError If the user selects velocity=True. """ if not self.response_inv: raise AttributeError("No response inventory provided!") # Copy the Trace before operating on it tr = tr.copy() tr.detrend('linear') if velocity is True: msg = ("Only displacement WA waveforms are currently available. " "Please contact the QuakeMigrate developers.") raise NotImplementedError(msg) if not remove_full_response: # Just remove the response encapsulated in the instrument transfer # function (stored as a PolesAndZeros response). NOTE: this does # not account for the effect of the digital FIR filters applied to # the recorded waveforms. However, due to this it is significantly # faster to compute. try: response = self.response_inv.get_response(tr.id, tr.stats.starttime) except Exception as e: raise util.ResponseNotFoundError(str(e), tr.id) # Get the instrument transfer function as a PAZ dictionary paz = response.get_paz() # if not velocity: paz.zeros.extend([0j]) paz_dict = {'poles': paz.poles, 'zeros': paz.zeros, 'gain': paz.normalization_factor, 'sensitivity': response.instrument_sensitivity.value} try: tr.simulate(paz_remove=paz_dict, pre_filt=pre_filt, water_level=water_level, taper=True, sacsim=True, # To replicate remove_response() pitsasim=False, # To replicate remove_response() paz_simulate=util.wa_response()) except ValueError as e: raise util.ResponseRemovalError(e, tr.id) else: # Use remove_response(), which removes the effect of _all_ response # stages, including the FIR stages. Considerably slower. try: tr.remove_response(inventory=self.response_inv, output='DISP', pre_filt=pre_filt, water_level=water_level, taper=True) tr.simulate(paz_simulate=util.wa_response()) except ValueError as e: raise util.ResponseRemovalError(e, tr.id) try: self.wa_waveforms.append(tr) except AttributeError: self.wa_waveforms = Stream() self.wa_waveforms.append(tr) return tr
[docs] def times(self, **kwargs): """ Utility function to generate timestamps between `data.starttime` and `data.endtime`, with a sample size of `data.sample_size` Returns ------- times : `numpy.ndarray`, shape(nsamples) Timestamps for the timeseries data. """ # Utilise the .times() method of `obspy.Trace` objects tr = Trace(header={"npts": self.signal.shape[-1], "sampling_rate": self.sampling_rate, "starttime": self.starttime}) return tr.times(**kwargs)
def _resample(self, stream, resample, upfactor): """ Resample the stream to the specified sampling rate. By default, this function will only perform decimation of the data. If necessary, and if the user specifies `resample = True` and an upfactor to upsample by `upfactor = int`, data can also be upsampled and then, if necessary, subsequently decimated to achieve the desired sampling rate. For example, for raw input data sampled at a mix of 40, 50 and 100 Hz, to achieve a unified sampling rate of 50 Hz, the user would have to specify an upfactor of 5; 40 Hz x 5 = 200 Hz, which can then be decimated to 50 Hz. NOTE: data will be detrended and a cosine taper applied before decimation, in order to avoid edge effects when applying the lowpass filter. Parameters ---------- stream : `obspy.Stream` object Contains list of `obspy.Trace` objects to be decimated / resampled. resample : bool If true, perform resampling of data which cannot be decimated directly to the desired sampling rate. upfactor : int or None Factor by which to upsample the data to enable it to be decimated to the desired sampling rate, e.g. 40Hz -> 50Hz requires upfactor = 5. Returns ------- stream : `obspy.Stream` object Contains list of resampled `obspy.Trace` objects at the chosen sampling rate `sr`. """ sr = self.sampling_rate for trace in stream: if sr != trace.stats.sampling_rate: if (trace.stats.sampling_rate % sr) == 0: stream.remove(trace) trace = util.decimate(trace, sr) stream += trace elif resample and upfactor is not None: # Check the upsampled sampling rate can be decimated to sr if int(trace.stats.sampling_rate * upfactor) % sr != 0: raise util.BadUpfactorException(trace) stream.remove(trace) trace = util.upsample(trace, upfactor) if trace.stats.sampling_rate != sr: trace = util.decimate(trace, sr) stream += trace else: logging.info("Mismatched sampling rates - cannot decimate " "data - to resample data, set .resample " "= True and choose a suitable upfactor") return stream def _station_availability(self, stream): """ Determine whether continuous data exists between two times for a given station. Parameters ---------- stream : `obspy.Stream` object Stream containing 3-component data for stations in station file. samples : int Number of samples expected in the signal. Returns ------- signal : `numpy.ndarray`, shape(3, nstations, nsamples) 3-component seismic data only for stations with continuous data on all 3 components throughout the desired time period. availability : `np.ndarray` of ints, shape(nstations) Array containing 0s (no data) or 1s (data). """ samples = util.time2sample(self.endtime - self.starttime, self.sampling_rate) + 1 availability = np.zeros(len(self.stations)).astype(int) signal = np.zeros((3, len(self.stations), int(samples))) for i, station in enumerate(self.stations): tmp_st = stream.select(station=station) if len(tmp_st) == 3: # Check traces are the correct number of samples and not filled # by a constant value (i.e. not flatlines) if (tmp_st[0].stats.npts == samples and tmp_st[1].stats.npts == samples and tmp_st[2].stats.npts == samples and tmp_st[0].data.max() != tmp_st[0].data.min() and tmp_st[1].data.max() != tmp_st[1].data.min() and tmp_st[2].data.max() != tmp_st[2].data.min()): # Defining the station as available availability[i] = 1 for tr in tmp_st: # Check channel name has 3 characters try: channel = tr.stats.channel[2] # Assign data to signal array by component if channel == "E" or channel == "2": signal[1, i, :] = tr.data elif channel == "N" or channel == "1": signal[0, i, :] = tr.data elif channel == "Z": signal[2, i, :] = tr.data else: raise util.ChannelNameException(tr) except IndexError: raise util.ChannelNameException(tr) # Check to see if no traces were continuously active during this period if not np.any(availability): raise util.DataGapException return signal, availability @property def sample_size(self): """Get the size of a sample (units: s).""" return 1 / self.sampling_rate