Source code for quakemigrate.io.scanmseed

# -*- coding: utf-8 -*-
"""
Module to handle input/output of .scanmseed files.

:copyright:
    2020–2024, QuakeMigrate developers.
:license:
    GNU General Public License, Version 3
    (https://www.gnu.org/licenses/gpl-3.0.html)

"""

import logging

import numpy as np
from obspy import read, Stream, Trace, UTCDateTime
from obspy.io.mseed import InternalMSEEDError
import pandas as pd

import quakemigrate.util as util


[docs]class ScanmSEED: """ Light class to encapsulate the data output by the detect stage of QuakeMigrate. This data is stored in an `obspy.Stream` object with the channels: ["COA", "COA_N", "X", "Y", "Z"]. Parameters ---------- run : :class:`~quakemigrate.io.core.Run` object Light class encapsulating i/o path information for a given run. continuous_write : bool Option to continuously write the .scanmseed file output by :func:`~quakemigrate.signal.scan.QuakeScan.detect()` at the end of every time step. Default behaviour is to write in day chunks where possible. sampling_rate : int Desired sampling rate of input data; sampling rate at which to compute the coalescence function. Default: 50 Hz. Attributes ---------- stream : `obspy.Stream` object Output of :func:`~quakemigrate.signal.scan.QuakeScan.detect()` stored in `obspy.Stream` object. The values have been multiplied by a factor to make use of more efficient compression. Channels: ["COA", "COA_N", "X", "Y", "Z"] written : bool Tracker for whether the data appended has been written recently. Methods ------- append(times, max_coa, max_coa_n, coord, map4d=None) Append the output of :func:`~quakemigrate.signal.scan.QuakeScan._compute()` to the coalescence stream. empty(starttime, timestep, i, msg) Create an set of empty arrays for a given timestep and append to the coalescence stream. write(write_start=None, write_end=None) Write the coalescence stream to a .scanmseed file. """ def __init__(self, run, continuous_write, sampling_rate): """Instantiate the ScanmSEED object.""" self.run = run self.continuous_write = continuous_write self.sampling_rate = sampling_rate self.written = False self.stream = Stream()
[docs] def append(self, starttime, max_coa, max_coa_n, coord, ucf): """ Append latest timestep of :func:`~quakemigrate.signal.scan.QuakeScan.detect()` output to `obspy.Stream` object. Multiply channels ["COA", "COA_N", "X", "Y", "Z"] by factors of ["1e5", "1e5", "1e6", "1e6", "1e3"] respectively, round and convert to int32 as this dramatically reduces memory usage, and allows the coastream data to be saved in mSEED format with STEIM2 compression. The multiplication factor is removed when the data is read back in. Parameters ---------- starttime : `obspy.UTCDateTime` object Timestamp of first sample of coalescence data. max_coa : `numpy.ndarray` of floats, shape(nsamples) Coalescence value through time. max_coa_n : `numpy.ndarray` of floats, shape(nsamples) Normalised coalescence value through time. coord : `numpy.ndarray` of floats, shape(nsamples) Location of maximum coalescence through time in input projection space. ucf : float A conversion factor based on the lookup table grid projection. Used to ensure the same level of precision (millimetre) is retained during compression, irrespective of the units of the grid projection. """ # Clip max value of COA to prevent int overflow max_coa[max_coa > 21474.0] = 21474.0 max_coa_n[max_coa_n > 21474.0] = 21474.0 meta = { "network": "NW", "npts": len(max_coa), "sampling_rate": self.sampling_rate, "starttime": starttime, } self.stream += Trace( data=self._data2int(max_coa, 1e5), header={**{"station": "COA"}, **meta}, ) self.stream += Trace( data=self._data2int(max_coa_n, 1e5), header={**{"station": "COA_N"}, **meta}, ) self.stream += Trace( data=self._data2int(coord[:, 0], 1e6), header={**{"station": "X"}, **meta} ) self.stream += Trace( data=self._data2int(coord[:, 1], 1e6), header={**{"station": "Y"}, **meta} ) self.stream += Trace( data=self._data2int(coord[:, 2], 1e3 * ucf), header={**{"station": "Z"}, **meta}, ) self.stream.merge(method=-1) # Write to file if passed day line self.written = False stats = self.stream[0].stats if stats.endtime == UTCDateTime(stats.starttime.date) + 86400 - stats.delta: self.write() self.stream = Stream() elif stats.starttime.julday != stats.endtime.julday: logging.debug("Timestep doesn't fall at midnight!") write_start = stats.starttime write_end = UTCDateTime(stats.endtime.date) - stats.delta self.write(write_start, write_end) self.stream.trim(starttime=write_end + stats.delta) # Set written to false, because there is residual data that has not yet been # written to file. self.written = False if self.continuous_write and not self.written: self.write()
[docs] def empty(self, starttime, timestep, i, msg, ucf): """ Create an empty set of arrays to write to .scanmseed; used where there is no data available to run :func:`~quakemigrate.signal.scan.QuakeScan._compute()`. Parameters ---------- starttime : `obspy.UTCDateTime` object Timestamp of first sample in the given timestep. timestep : float Length (in seconds) of timestep used in detect(). i : int The ith timestep of the continuous compute. msg : str Message to output to log giving details as to why this timestep is empty. ucf : float A conversion factor based on the lookup table grid projection. Used to ensure the same level of precision (millimetre) is retained during compression, irrespective of the units of the grid projection. """ logging.info(msg) starttime = starttime + timestep * i n = util.time2sample(timestep, self.sampling_rate) max_coa = max_coa_n = np.full(n, 0) coord = np.full((n, 3), 0) self.append(starttime, max_coa, max_coa_n, coord, ucf)
[docs] def write(self, write_start=None, write_end=None): """ Write a new .scanmseed file from an `obspy.Stream` object containing the data output from detect(). Note: values have been multiplied by a power of ten, rounded and converted to an int32 array so the data can be saved as mSEED with STEIM2 compression. This multiplication factor is removed when the data is read back in with read_scanmseed(). Parameters ---------- write_start : `obspy.UTCDateTime` object, optional Timestamp from which to write the coalescence stream to file. write_end : `obspy.UTCDateTime` object, optional Timestamp up to which to write the coalescence stream to file. """ fpath = self.run.path / "detect" / "scanmseed" fpath.mkdir(exist_ok=True, parents=True) if write_start is not None and write_end is not None: st = self.stream.slice(starttime=write_start, endtime=write_end) else: st = self.stream starttime = st[0].stats.starttime fstem = f"{starttime.year}_{starttime.julday:03d}" file = (fpath / fstem).with_suffix(".scanmseed") try: st.write(str(file), format="MSEED", encoding="STEIM2") except InternalMSEEDError as e: logging.debug( f"Cannot compress data: {e}\nUnable to compress data using STEIM2 - " "falling back on STEIM1." ) st.write(str(file), format="MSEED", encoding="STEIM1") self.written = True
def _data2int(self, data, factor): """ Utility function to convert data to ints before writing. Parameters ---------- data : `numpy.Array` Data stream to convert to integer values. factor : float Scaling factor used to control the number of decimal places saved. Returns ------- out : `numpy.Array`, int Original data stream multiplied by factor and converted to int. """ return np.round(data * factor).astype(np.int32)
[docs]@util.timeit() def read_scanmseed(run, starttime, endtime, pad, ucf): """ Read .scanmseed files between two time stamps. Files are labelled by year and Julian day. Parameters ---------- run : :class:`~quakemigrate.io.core.Run` object Light class encapsulating i/o path information for a given run. starttime : `obspy.UTCDateTime` object Timestamp from which to read the coalescence stream. endtime : `obspy.UTCDateTime` object Timestamp up to which to read the coalescence stream. pad : float Read in "pad" seconds of additional data on either end. ucf : float A conversion factor based on the lookup table grid projection. Used to ensure the same level of precision (millimetre) is retained during compression, irrespective of the units of the grid projection. Returns ------- data : `pandas.DataFrame` object Data output by detect() -- decimated scan. Columns: ["DT", "COA", "COA_N", "X", "Y", "Z"] - X/Y/Z as lon/lat/units where units is the user-selected units of the lookup table grid projection (either metres or kilometres). stats : `obspy.trace.Stats` object Container for additional header information for coalescence trace. Contains keys: network, station, channel, starttime, endtime, sampling_rate, delta, npts, calib, _format, mseed """ fpath = run.path / "detect" / "scanmseed" readstart, readend = starttime - pad, endtime + pad startday = UTCDateTime(readstart.date) dy = 0 scanmseed = Stream() # Loop through days trying to read .scanmseed files while startday + (dy * 86400) <= readend: now = readstart + (dy * 86400) fstem = f"{now.year}_{now.julday:03d}" file = (fpath / fstem).with_suffix(".scanmseed") try: scanmseed += read( str(file), starttime=readstart, endtime=readend, format="MSEED" ) except FileNotFoundError: logging.info(f"\n\t No .scanmseed file found for day {fstem}!") dy += 1 if not bool(scanmseed): raise util.NoScanMseedDataException scanmseed.merge(method=-1) stats = scanmseed.select(station="COA")[0].stats data = pd.DataFrame() data["DT"] = scanmseed[0].times(type="utcdatetime") data["COA"] = scanmseed.select(station="COA")[0].data / 1e5 data["COA_N"] = scanmseed.select(station="COA_N")[0].data / 1e5 data["X"] = scanmseed.select(station="X")[0].data / 1e6 data["Y"] = scanmseed.select(station="Y")[0].data / 1e6 data["Z"] = scanmseed.select(station="Z")[0].data / (1e3 * ucf) # Check if the data covers the entirety of the requested period if stats.starttime > starttime: logging.info( "\n\t Warning! .scanmseed starttime is later than trigger() starttime!" ) elif stats.starttime > readstart: logging.info("\t Warning! No .scanmseed data found for pre-pad!") if stats.endtime < endtime: logging.info("\n\t Warning! .scanmseed endtime is before trigger() endtime!") elif stats.endtime < readend: logging.info("\t Warning! No .scanmseed data found for post-pad!") logging.info(f"\t ...from {stats.starttime} - {stats.endtime}.") return data, stats