Source code for quakemigrate.core.lib

# -*- coding: utf-8 -*-
"""
Bindings for the QuakeMigrate C libraries.

:copyright:
    2020–2024, QuakeMigrate developers.
:license:
    GNU General Public License, Version 3
    (https://www.gnu.org/licenses/gpl-3.0.html)

"""

from __future__ import annotations

import logging

import numpy as np
import numpy.ctypeslib as clib

from quakemigrate.core.libnames import _load_cdll
import quakemigrate.util as util


qmlib = _load_cdll("qmlib")

# Make datatype aliases and build custom datatypes
c_int32 = clib.ctypes.c_int32
c_int64 = clib.ctypes.c_int64
c_dPt = clib.ndpointer(dtype=np.double, flags="C_CONTIGUOUS")
c_i32Pt = clib.ndpointer(dtype=np.int32, flags="C_CONTIGUOUS")
c_i64Pt = clib.ndpointer(dtype=np.int64, flags="C_CONTIGUOUS")

stalta_header_t = np.dtype(
    [("n", c_int32), ("nsta", c_int32), ("nlta", c_int32)], align=True
)
stalta_header_pt = clib.ndpointer(stalta_header_t, flags="C_CONTIGUOUS")

qmlib.migrate.argtypes = [
    c_dPt,
    c_i32Pt,
    c_dPt,
    c_int32,
    c_int32,
    c_int32,
    c_int32,
    c_int32,
    c_int64,
    c_int64,
]


[docs]@util.timeit() def migrate( onsets: np.ndarray[np.double], traveltimes: np.ndarray[int], first_idx: int, last_idx: int, available: int, threads: int, ) -> np.ndarray[np.double]: """ Computes 4-D coalescence map by migrating seismic phase onset functions. Parameters ---------- onsets: Onset functions for each seismic phase, shape(nonsets, nsamples). traveltimes: Grids of seismic phase traveltimes, converted to an integer multiple of the sampling rate, shape(nx, ny, nz, nonsets). first_idx: Index of first sample in array from which to scan. last_idx: Index of last sample in array up to which to scan. available: Number of available onset functions. threads: Number of threads with which to perform the scan. Returns ------- map4d: 4-D coalescence map, shape(nx, ny, nz, nsamples). Raises ------ ValueError If mismatch between number of onset functions and traveltime lookup tables. Expect both to be equal to the no. stations * no. phases. ValueError If the number of samples in the onset functions is less than the number of samples array is smaller than map4d[0, 0, 0, :]. """ # By taking the log of the onsets, we can calculate the geometric mean as an # arithmetic mean (we then exponentiate within the C function to return the # correct coalescence value). Clip as a safety check to prevent trying to take # log(0). onsets = np.clip(onsets, 0.01, np.inf) onsets = np.log(onsets) *grid_dimensions, n_luts = traveltimes.shape n_onsets, t_samples = onsets.shape logging.debug(f"(n_onsets, t_samples) : ({n_onsets}, {t_samples})") n_samples = t_samples - first_idx - last_idx logging.debug(f"n_samples : {n_samples}") map4d = np.zeros(tuple(grid_dimensions) + (n_samples,), dtype=np.double) logging.debug(f"map4d shape : {map4d.shape}") n_nodes = np.prod(grid_dimensions) if not n_luts == n_onsets: raise ValueError( f"Mismatch between number of stations for data and LUT, {n_onsets}:{n_luts}" ) if onsets.size < n_samples + first_idx: raise ValueError("Data array smaller than coalescence array.") qmlib.migrate( onsets, traveltimes, map4d, c_int32(first_idx), c_int32(last_idx), c_int32(n_samples), c_int32(n_onsets), c_int32(available), c_int64(n_nodes), c_int64(threads), ) return map4d
qmlib.find_max_coa.argtypes = [c_dPt, c_dPt, c_dPt, c_i64Pt, c_int32, c_int64, c_int64]
[docs]@util.timeit() def find_max_coa( map4d: np.ndarray[np.double], threads: int ) -> tuple[np.ndarray[np.double], np.ndarray[np.double], np.ndarray[int]]: """ Finds time series of the maximum coalescence/normalised coalescence in the 3-D volume, and the corresponding grid indices. Parameters ---------- map4d: 4-D coalescence map, shape(nx, ny, nz, nsamples). threads: Number of threads with which to perform the scan. Returns ------- max_coa: Time series of the maximum coalescence value in the 3-D volume. max_norm_coa: Times series of the maximum normalised coalescence value in the 3-D volume. max_coa_idx: Time series of the flattened grid indices corresponding to the maximum coalescence value in the 3-D volume. """ *grid_dimensions, n_samples = map4d.shape n_nodes = np.prod(grid_dimensions) max_coa = np.zeros(n_samples, dtype=np.double) max_norm_coa = np.zeros(n_samples, dtype=np.double) max_coa_idx = np.zeros(n_samples, dtype=np.int64) qmlib.find_max_coa( map4d, max_coa, max_norm_coa, max_coa_idx, c_int32(n_samples), c_int64(n_nodes), c_int64(threads), ) return max_coa, max_norm_coa, max_coa_idx
qmlib.overlapping_sta_lta.argtypes = [c_dPt, stalta_header_pt, c_dPt]
[docs]def overlapping_sta_lta( signal: np.ndarray[float], nsta: int, nlta: int ) -> np.ndarray[np.double]: """ Compute the STA/LTA onset function with overlapping windows. The return value is allocated to the last sample of the STA window. |--- STA ---| |------------------------- LTA -------------------------| ^ Value assigned here Parameters ---------- signal: Pre-processed waveform data to be processed into an onset function. nsta: Number of samples in the short-term average window. nlta: Number of samples in the long-term average window. Returns ------- onset: Overlapping STA/LTA onset function. """ # Build header structure and ensure signal data is contiguous in memory head = np.empty(1, dtype=stalta_header_t) head[:] = (len(signal), nsta, nlta) signal = np.ascontiguousarray(signal, dtype=np.double) onset = np.ones(len(signal), dtype=np.double) qmlib.overlapping_sta_lta(signal, head, onset) return onset
qmlib.centred_sta_lta.argtypes = [c_dPt, stalta_header_pt, c_dPt]
[docs]def centred_sta_lta( signal: np.ndarray[float], nsta: int, nlta: int ) -> np.ndarray[np.double]: """ Compute the STA/LTA onset function with consecutive windows. The return value is allocated to the last sample of the LTA window. |--- STA ---| |---------------------- LTA ----------------------| ^ Value assigned here Parameters ---------- signal: Pre-processed waveform data to be processed into an onset function. nsta: Number of samples in the short-term average window. nlta: Number of samples in the long-term average window. Returns ------- onset: Centred STA/LTA onset function. """ # Build header structure and ensure signal data is contiguous in memory head = np.empty(1, dtype=stalta_header_t) head[:] = (len(signal), nsta, nlta) signal = np.ascontiguousarray(signal, dtype=np.double) onset = np.ones(len(signal), dtype=np.double) qmlib.centred_sta_lta(signal, head, onset) return onset
qmlib.recursive_sta_lta.argtypes = [c_dPt, stalta_header_pt, c_dPt]
[docs]def recursive_sta_lta( signal: np.ndarray[float], nsta: int, nlta: int ) -> np.ndarray[np.double]: """ Compute the STA/LTA onset function with consecutive windows using a recursive method (minimises memory costs). Reproduces exactly the centred STA/LTA onset. |--- STA ---| |---------------------- LTA ----------------------| ^ Value assigned here Parameters ---------- signal: Pre-processed waveform data to be processed into an onset function. nsta: Number of samples in the short-term average window. nlta: Number of samples in the long-term average window. Returns ------- onset: Recursive (centred) STA/LTA onset function. """ # Build header structure and ensure signal data is contiguous in memory head = np.empty(1, dtype=stalta_header_t) head[:] = (len(signal), nsta, nlta) signal = np.ascontiguousarray(signal, dtype=np.double) onset = np.zeros(len(signal), dtype=np.double) qmlib.recursive_sta_lta(signal, head, onset) return onset