Source code for pesummary.gw.waveform

# Licensed under an MIT style license -- see LICENSE.md

import numpy as np
import lalsimulation as lalsim
from lalsimulation import (
    SimInspiralGetSpinFreqFromApproximant, SIM_INSPIRAL_SPINS_CASEBYCASE,
    SIM_INSPIRAL_SPINS_FLOW
)
from pesummary.utils.utils import iterator, logger
from pesummary.utils.exceptions import EvolveSpinError

__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"]


def _get_spin_freq_from_approximant(approximant):
    """Determine whether the reference frequency is the starting frequency
    for a given approximant string.

    Parameters
    ----------
    approximant: str
        Name of the approximant you wish to check
    """
    try:
        # default to using LAL code
        approx = getattr(lalsim, approximant)
        return SimInspiralGetSpinFreqFromApproximant(approx)
    except AttributeError:
        from lalsimulation import (
            SIM_INSPIRAL_SPINS_NONPRECESSING, SIM_INSPIRAL_SPINS_F_REF
        )
        # check to see if approximant is in gwsignal
        from lalsimulation.gwsignal.models import gwsignal_get_waveform_generator
        approx = gwsignal_get_waveform_generator(approximant)
        meta = approx.metadata
        if meta["type"] == "aligned_spin":
            return SIM_INSPIRAL_SPINS_NONPRECESSING
        elif meta["type"] == "precessing_spin":
            if meta["f_ref_spin"]:
                return SIM_INSPIRAL_SPINS_F_REF
            return SIM_INSPIRAL_SPINS_FLOW
        raise EvolveSpinError(
            "Unable to evolve spins as '{}' does not have a set frequency "
            "at which the spins are defined".format(approximant)
        )


def _get_start_freq_from_approximant(approximant, f_low, f_ref):
    """Determine the starting frequency to use when evolving the spins for
    a given approximant string.

    Parameters
    ----------
    approximant: str
        Name of the approximant you wish to check
    f_low: float
        Low frequency used when generating the posterior samples
    f_ref: float
        Reference frequency used when generating the posterior samples
    """
    try:
        spinfreq_enum = _get_spin_freq_from_approximant(approximant)
    except ValueError:  # raised when approximant is not in gwsignal
        raise EvolveSpinError(
            "Unable to evolve spins as '{}' is unknown to lalsimulation "
            "and gwsignal".format(approximant)
        )
    if spinfreq_enum == SIM_INSPIRAL_SPINS_CASEBYCASE:
        _msg = (
            "Unable to evolve spins as '{}' does not have a set frequency "
            "at which the spins are defined".format(approximant)
        )
        logger.warning(_msg)
        raise EvolveSpinError(_msg)
    return float(np.where(
        np.array(spinfreq_enum == SIM_INSPIRAL_SPINS_FLOW), f_low, f_ref
    ))


def _check_approximant_from_string(approximant):
    """Check to see if the approximant is known to lalsimulation and/or
    gwsignal

    Parameters
    ----------
    approximant: str
        approximant you wish to check
    """
    if hasattr(lalsim, approximant):
        return True
    else:
        from lalsimulation.gwsignal.models import gwsignal_get_waveform_generator
        try:
            _ = gwsignal_get_waveform_generator(approximant)
        except (ValueError, NameError):
            return False
        return True


def _lal_approximant_from_string(approximant):
    """Return the LAL approximant number given an approximant string

    Parameters
    ----------
    approximant: str
        approximant you wish to convert
    """
    return lalsim.GetApproximantFromString(approximant)


def _insert_mode_array(modes, LAL_parameters=None):
    """Add a mode array to a LAL dictionary

    Parameters
    ----------
    modes: 2d list
        2d list of modes you wish to add to a LAL dictionary. Must be of the
        form [[l1, m1], [l2, m2]]
    LAL_parameters: LALDict, optional
        An existing LAL dictionary to add mode array to. If not provided, a new
        LAL dictionary is created. Default None.
    """
    if LAL_parameters is None:
        import lal
        LAL_parameters = lal.CreateDict()
    _mode_array = lalsim.SimInspiralCreateModeArray()
    for l, m in modes:
        lalsim.SimInspiralModeArrayActivateMode(_mode_array, l, m)
    lalsim.SimInspiralWaveformParamsInsertModeArray(LAL_parameters, _mode_array)
    return LAL_parameters


def _waveform_args(samples, f_ref=20., ind=0, longAscNodes=0., eccentricity=0.):
    """Arguments to be passed to waveform generation

    Parameters
    ----------
    f_ref: float, optional
        reference frequency to use when converting spherical spins to
        cartesian spins
    ind: int, optional
        index for the sample you wish to plot
    longAscNodes: float, optional
        longitude of ascending nodes, degenerate with the polarization
        angle. Default 0.
    eccentricity: float, optional
        eccentricity at reference frequency. Default 0.
    """
    from lal import MSUN_SI, PC_SI

    key = list(samples.keys())[0]
    if isinstance(samples[key], (list, np.ndarray)):
        _samples = {key: value[ind] for key, value in samples.items()}
    else:
        _samples = samples.copy()
    required = [
        "mass_1", "mass_2", "luminosity_distance"
    ]
    if not all(param in _samples.keys() for param in required):
        raise ValueError(
            "Unable to generate a waveform. Please add samples for "
            + ", ".join(required)
        )
    waveform_args = [
        _samples["mass_1"] * MSUN_SI, _samples["mass_2"] * MSUN_SI
    ]
    spin_angles = [
        "theta_jn", "phi_jl", "tilt_1", "tilt_2", "phi_12", "a_1", "a_2",
        "phase"
    ]
    spin_angles_condition = all(
        spin in _samples.keys() for spin in spin_angles
    )
    cartesian_spins = [
        "spin_1x", "spin_1y", "spin_1z", "spin_2x", "spin_2y", "spin_2z"
    ]
    cartesian_spins_condition = any(
        spin in _samples.keys() for spin in cartesian_spins
    )
    if spin_angles_condition and not cartesian_spins_condition:
        from pesummary.gw.conversions import component_spins
        data = component_spins(
            _samples["theta_jn"], _samples["phi_jl"], _samples["tilt_1"],
            _samples["tilt_2"], _samples["phi_12"], _samples["a_1"],
            _samples["a_2"], _samples["mass_1"], _samples["mass_2"],
            f_ref, _samples["phase"]
        )
        iota, spin_1x, spin_1y, spin_1z, spin_2x, spin_2y, spin_2z = data.T
        spins = [spin_1x, spin_1y, spin_1z, spin_2x, spin_2y, spin_2z]
    else:
        iota = _samples["iota"]
        spins = [
            _samples[param] if param in _samples.keys() else 0. for param in
            ["spin_1x", "spin_1y", "spin_1z", "spin_2x", "spin_2y", "spin_2z"]
        ]
    _zero_spins = np.isclose(spins, 0.)
    if sum(_zero_spins):
        spins = np.array(spins)
        spins[_zero_spins] = 0.
        spins = list(spins)
    waveform_args += spins
    phase = _samples["phase"] if "phase" in _samples.keys() else 0.
    waveform_args += [
        _samples["luminosity_distance"] * PC_SI * 10**6, iota, phase
    ]
    waveform_args += [longAscNodes, eccentricity, 0.]
    return waveform_args, _samples


[docs]def antenna_response(samples, ifo): """ """ import importlib mod = importlib.import_module("pesummary.gw.plots.plot") func = getattr(mod, "__antenna_response") antenna = func( ifo, samples["ra"], samples["dec"], samples["psi"], samples["geocent_time"] ) return antenna
def _project_waveform(ifo, hp, hc, ra, dec, psi, time): """Project a waveform onto a given detector Parameters ---------- ifo: str name of the detector you wish to project the waveform onto hp: np.ndarray plus gravitational wave polarization hc: np.ndarray cross gravitational wave polarization ra: float right ascension to be passed to antenna response function dec: float declination to be passed to antenna response function psi: float polarization to be passed to antenna response function time: float time to be passed to antenna response function """ samples = { "ra": ra, "dec": dec, "psi": psi, "geocent_time": time } antenna = antenna_response(samples, ifo) ht = hp * antenna[0] + hc * antenna[1] return ht
[docs]def fd_waveform( samples, approximant, delta_f, f_low, f_high, f_ref=20., project=None, ind=0, longAscNodes=0., eccentricity=0., LAL_parameters=None, mode_array=None, pycbc=False, flen=None ): """Generate a gravitational wave in the frequency domain Parameters ---------- approximant: str name of the approximant to use when generating the waveform delta_f: float spacing between frequency samples f_low: float frequency to start evaluating the waveform f_high: float frequency to stop evaluating the waveform f_ref: float, optional reference frequency project: str, optional name of the detector to project the waveform onto. If None, the plus and cross polarizations are returned. Default None ind: int, optional index for the sample you wish to plot longAscNodes: float, optional longitude of ascending nodes, degenerate with the polarization angle. Default 0. eccentricity: float, optional eccentricity at reference frequency. Default 0. LAL_parameters: LALDict, optional LAL dictionary containing accessory parameters. Default None mode_array: 2d list 2d list of modes you wish to include in waveform. Must be of the form [[l1, m1], [l2, m2]] pycbc: Bool, optional return a the waveform as a pycbc.frequencyseries.FrequencySeries object flen: int Length of the frequency series in samples. Default is None. Only used when pycbc=True """ from gwpy.frequencyseries import FrequencySeries waveform_args, _samples = _waveform_args( samples, f_ref=f_ref, ind=ind, longAscNodes=longAscNodes, eccentricity=eccentricity ) approx = _lal_approximant_from_string(approximant) if mode_array is not None: LAL_parameters = _insert_mode_array( mode_array, LAL_parameters=LAL_parameters ) hp, hc = lalsim.SimInspiralChooseFDWaveform( *waveform_args, delta_f, f_low, f_high, f_ref, LAL_parameters, approx ) hp = FrequencySeries(hp.data.data, df=hp.deltaF, f0=0.) hc = FrequencySeries(hc.data.data, df=hc.deltaF, f0=0.) if pycbc: hp, hc = hp.to_pycbc(), hc.to_pycbc() if flen is not None: hp.resize(flen) hc.resize(flen) if project is None: return {"h_plus": hp, "h_cross": hc} ht = _project_waveform( project, hp, hc, _samples["ra"], _samples["dec"], _samples["psi"], _samples["geocent_time"] ) return ht
def _wrapper_for_td_waveform(args): """Wrapper function for td_waveform for a pool of workers Parameters ---------- args: tuple All args passed to td_waveform """ return td_waveform(*args)
[docs]def td_waveform( samples, approximant, delta_t, f_low, f_ref=20., project=None, ind=0, longAscNodes=0., eccentricity=0., LAL_parameters=None, mode_array=None, pycbc=False, level=None, multi_process=1 ): """Generate a gravitational wave in the time domain Parameters ---------- approximant: str name of the approximant to use when generating the waveform delta_t: float spacing between frequency samples f_low: float frequency to start evaluating the waveform f_ref: float, optional reference frequency project: str, optional name of the detector to project the waveform onto. If None, the plus and cross polarizations are returned. Default None ind: int, optional index for the sample you wish to plot longAscNodes: float, optional longitude of ascending nodes, degenerate with the polarization angle. Default 0. eccentricity: float, optional eccentricity at reference frequency. Default 0. LAL_parameters: LALDict, optional LAL dictionary containing accessory parameters. Default None mode_array: 2d list 2d list of modes you wish to include in waveform. Must be of the form [[l1, m1], [l2, m2]] pycbc: Bool, optional return a the waveform as a pycbc.timeseries.TimeSeries object level: list, optional the symmetric confidence interval of the time domain waveform. Level must be greater than 0 and less than 1 multi_process: int, optional number of cores to run on when generating waveforms. Only used when level is not None """ approx = _lal_approximant_from_string(approximant) if mode_array is not None: LAL_parameters = _insert_mode_array( mode_array, LAL_parameters=LAL_parameters ) if level is not None: import multiprocessing from pesummary.core.plots.interpolate import Bounded_interp1d td_waveform_list = [] _key = list(samples.keys())[0] N = len(samples[_key]) with multiprocessing.Pool(multi_process) as pool: args = np.array([ [samples] * N, [approximant] * N, [delta_t] * N, [f_low] * N, [f_ref] * N, [project] * N, np.arange(N), [longAscNodes] * N, [eccentricity] * N, [LAL_parameters] * N, [mode_array] * N, [pycbc] * N, [None] * N ], dtype="object").T td_waveform_list = list( iterator( pool.imap(_wrapper_for_td_waveform, args), tqdm=True, logger=logger, total=N, desc="Generating waveforms" ) ) td_waveform_array = np.array(td_waveform_list, dtype=object) _level = (1 + np.array(level)) / 2 if project is None: mint = np.min( [ np.min([_.times[0].value for _ in waveform.values()]) for waveform in td_waveform_array ] ) maxt = np.max( [ np.max([_.times[-1].value for _ in waveform.values()]) for waveform in td_waveform_array ] ) new_t = np.arange(mint, maxt, delta_t) td_waveform_array = { polarization: np.array( [ Bounded_interp1d( np.array(waveform[polarization].times, dtype=np.float64), waveform[polarization], xlow=mint, xhigh=maxt )(new_t) for waveform in td_waveform_array ] ) for polarization in ["h_plus", "h_cross"] } else: mint = np.min([_.times[0].value for _ in td_waveform_array]) maxt = np.max([_.times[-1].value for _ in td_waveform_array]) new_t = np.arange(mint, maxt, delta_t) td_waveform_array = { "h_t": [ Bounded_interp1d( np.array(waveform.times, dtype=np.float64), waveform, xlow=mint, xhigh=maxt )(new_t) for waveform in td_waveform_array ] } upper = { polarization: np.percentile( td_waveform_array[polarization], _level * 100, axis=0 ) for polarization in td_waveform_array.keys() } lower = { polarization: np.percentile( td_waveform_array[polarization], (1 - _level) * 100, axis=0 ) for polarization in td_waveform_array.keys() } if len(upper) == 1: upper = upper["h_t"] lower = lower["h_t"] waveform_args, _samples = _waveform_args( samples, ind=ind, longAscNodes=longAscNodes, eccentricity=eccentricity, f_ref=f_ref ) waveform = _td_waveform( waveform_args, approx, delta_t, f_low, f_ref, LAL_parameters, _samples, pycbc=pycbc, project=project ) if level is not None: return waveform, upper, lower, new_t return waveform
def _td_waveform( waveform_args, approximant, delta_t, f_low, f_ref, LAL_parameters, samples, pycbc=False, project=None ): """Generate a gravitational wave in the time domain Parameters ---------- waveform_args: tuple args to pass to lalsimulation.SimInspiralChooseTDWaveform approximant: str lalsimulation approximant number to use when generating a waveform delta_t: float spacing between time samples f_low: float frequency to start evaluating the waveform f_ref: float, optional reference frequency LAL_parameters: LALDict LAL dictionary containing accessory parameters. Default None samples: dict dictionary of posterior samples to use when projecting the waveform onto a given detector pycbc: Bool, optional return a the waveform as a pycbc.timeseries.TimeSeries object project: str, optional name of the detector to project the waveform onto. If None, the plus and cross polarizations are returned. Default None """ from gwpy.timeseries import TimeSeries from astropy.units import Quantity hp, hc = lalsim.SimInspiralChooseTDWaveform( *waveform_args, delta_t, f_low, f_ref, LAL_parameters, approximant ) hp = TimeSeries(hp.data.data, dt=hp.deltaT, t0=hp.epoch) hc = TimeSeries(hc.data.data, dt=hc.deltaT, t0=hc.epoch) if pycbc: hp, hc = hp.to_pycbc(), hc.to_pycbc() if project is None: return {"h_plus": hp, "h_cross": hc} ht = _project_waveform( project, hp, hc, samples["ra"], samples["dec"], samples["psi"], samples["geocent_time"] ) if "{}_time".format(project) not in samples.keys(): from pesummary.gw.conversions import time_in_each_ifo try: _detector_time = time_in_each_ifo( project, samples["ra"], samples["dec"], samples["geocent_time"] ) except Exception: logger.warning( "Unable to calculate samples for '{}_time' using the provided " "posterior samples. Unable to shift merger to merger time in " "the detector".format(project) ) return ht else: _detector_time = samples["{}_time".format(project)] ht.times = ( Quantity(ht.times, unit="s") + Quantity(_detector_time, unit="s") ) return ht