Source code for quakemigrate.export.to_mfast

# -*- coding: utf-8 -*-
"""
This module provides parsers to generate SAC waveform files from an ObsPy Catalog, with
headers correctly populated for MFAST.

:copyright:
    2020–2023, QuakeMigrate developers.
:license:
    GNU General Public License, Version 3
    (https://www.gnu.org/licenses/gpl-3.0.html)

"""

import pathlib

from obspy import read
from obspy.core import AttribDict
from obspy.geodetics import gps2dist_azimuth


cmpaz = {"N": 0, "Z": 0, "E": 90}
cmpinc = {"N": 90, "Z": 0, "E": 90}


[docs]def sac_mfast(event, stations, output_path, units, filename=None): """ Function to create the SAC file. Parameters ---------- event : `ObsPy.Event` object Contains information about the origin time and a list of associated picks. stations : `pandas.DataFrame` object DataFrame containing station information. output_path : str Location to save the SAC file. units : {"km", "m"} Grid projection coordinates for QM LUT (determines units of depths and uncertainties in the .event files). filename : str, optional Name of SAC file - defaults to "eventid/eventid.station.{comp}". """ # Read in the mSEED file containing stream = read(event.extra.cut_waveforms_file.value) # Set distance conversion factor (from units of QM LUT projection units). if units == "km": factor = 1 elif units == "m": factor = 1e3 else: raise AttributeError(f"units must be 'km' or 'm'; not {units}") # Create general SAC header AttribDict event_header = AttribDict() origin = event.preferred_origin() event_header.evla = origin.latitude event_header.evlo = origin.longitude # Obspy Event object already has all units converted to metres event_header.evdp = origin.depth / 1000.0 # converted to km eventid = str(event.resource_id) if filename is None: filename = eventid + ".{}.{}" else: filename = filename + ".{}.{}" output_path = pathlib.Path(output_path) / eventid output_path.mkdir(parents=True, exist_ok=True) # Loop over the available stations and get the pick information for _, station in stations.iterrows(): st = stream.select(station=station.Name) station_header = AttribDict() station_header.stla = station.Latitude station_header.stlo = station.Longitude station_header.stel = station.Elevation / factor # convert to m # Calculate the distance and azimuth between event and station dist, az, _ = gps2dist_azimuth( event_header.evla, event_header.evlo, station.Latitude, station.Longitude ) station_header.dist = dist / 1000.0 # convert m to km station_header.az = az # Get relevant picks here picks = [] for pick in event.picks: if pick.waveform_id.station_code == station.Name: picks.append(pick) if not picks: # If no phase picks for this station, continue continue reference = st[0].stats.starttime origin_time = origin.time - reference p_pick = s_pick = 0 for pick in picks: if pick.phase_hint == "P": p_pick = pick.time - reference elif pick.phase_hint == "S": s_pick = pick.time - reference if s_pick == 0: continue # Set pick error (think about good bounds?) kt5 = 1 pick_header = AttribDict() pick_header.t0 = s_pick pick_header.kt5 = str(kt5) pick_header.kt0 = str(kt5) pick_header.o = origin_time if p_pick != 0: pick_header.a = p_pick for comp in ["Z", "N", "E"]: tr = st.select(channel=f"*{comp}")[0] # Write out to SAC file, then read in again to fill header name = filename.format(station.Name, comp.lower()) name = str(output_path / name) tr.write(name, format="SAC") tr = read(name)[0] sac_header = AttribDict() sac_header.cmpaz = str(cmpaz[comp]) sac_header.cmpinc = str(cmpinc[comp]) sac_header.kcmpnm = f"HH{comp}" sac_header.update(event_header) sac_header.update(station_header) sac_header.update(pick_header) tr.stats.sac.update(sac_header) tr.write(name, format="SAC")