Source code for pyirf.binning

"""
Utility functions for binning
"""

import numpy as np
from scipy.interpolate import interp1d
import astropy.units as u
from astropy.table import QTable

from .compat import COPY_IF_NEEDED


#: Index returned by `calculate_bin_indices` for underflown values
UNDERFLOW_INDEX = np.iinfo(np.int64).min
#: Index returned by `calculate_bin_indices` for overflown values
OVERFLOW_INDEX = np.iinfo(np.int64).max


[docs] def bin_center(edges): return 0.5 * (edges[:-1] + edges[1:])
[docs] def join_bin_lo_hi(bin_lo, bin_hi): """ Function joins bins into lo and hi part, e.g. [0, 1, 2] and [1, 2, 4] into [0, 1, 2, 4] It works on multidimentional arrays as long as the binning is in the last axis Parameters ---------- bin_lo: np.array or u.Quantity Lo bin edges array bin_hi: np.array or u.Quantity Hi bin edges array Returns ------- bins: np.array of u.Quantity The joint bins """ if np.allclose(bin_lo[..., 1:], bin_hi[..., :-1], rtol=1.0e-5): last_axis = len(bin_lo.shape) - 1 bins = np.concatenate((bin_lo, bin_hi[..., -1:]), axis=last_axis) return bins else: raise ValueError("Not matching bin edges")
[docs] def split_bin_lo_hi(bins): """ Inverted function to join_bin_hi_lo, e.g. it splits [0, 1, 2, 4] into [0, 1, 2] and [1, 2, 4] Parameters ---------- bins: np.array of u.Quantity The joint bins Returns ------- bin_lo: np.array or u.Quantity Lo bin edges array bin_hi: np.array or u.Quantity Hi bin edges array """ bin_lo = bins[..., :-1] bin_hi = bins[..., 1:] return bin_lo, bin_hi
[docs] def add_overflow_bins(bins, positive=True): """ Add under and overflow bins to a bin array. Parameters ---------- bins: np.array or u.Quantity Bin edges array positive: bool If True, the underflow array will start at 0, if not at ``-np.inf`` """ lower = 0 if positive else -np.inf upper = np.inf if hasattr(bins, "unit"): lower *= bins.unit upper *= bins.unit if bins[0] > lower: bins = np.append(lower, bins) if bins[-1] < upper: bins = np.append(bins, upper) return bins
[docs] @u.quantity_input(e_min=u.TeV, e_max=u.TeV) def create_bins_per_decade(e_min, e_max, bins_per_decade=5): """ Create a bin array with bins equally spaced in logarithmic energy with ``bins_per_decade`` bins per decade. Parameters ---------- e_min: u.Quantity[energy] Minimum energy, inclusive e_max: u.Quantity[energy] Maximum energy, non-inclusive If the endpoint exactly matches the ``n_bins_per_decade`` requirement, it will be included. n_bins_per_decade: int number of bins per decade Returns ------- bins: u.Quantity[energy] The created bin array, will have units of e_min """ unit = e_min.unit log_lower = np.log10(e_min.to_value(unit)) log_upper = np.log10(e_max.to_value(unit)) step = 1 / bins_per_decade # include endpoint if reasonably close eps = step / 10000 bins = 10 ** np.arange(log_lower, log_upper + eps, step) return u.Quantity(bins, e_min.unit, copy=COPY_IF_NEEDED)
[docs] def calculate_bin_indices(data, bins): """ Calculate bin indices of inidividula entries of the given data array using the supplied binning. Underflow will be indicated by `UNDERFLOW_INDEX` and overflow by `OVERFLOW_INDEX`. If the bins already include underflow / overflow bins, e.g. `bins[0] = -np.inf` and `bins[-1] = np.inf`, using the result of this function will always be a valid index into the resulting histogram. Parameters ---------- data: ``~np.ndarray`` or ``~astropy.units.Quantity`` Array with the data bins: ``~np.ndarray`` or ``~astropy.units.Quantity`` Array or Quantity of bin edges. Must have the same unit as ``data`` if a Quantity. Returns ------- bin_index: np.ndarray[int] Indices of the histogram bin the values in data belong to. Under- and overflown values will have values of `UNDERFLOW_INDEX` and `OVERFLOW_INDEX` respectively. valid: np.ndarray[bool] Boolean mask indicating if a given value belongs into one of the defined bins. False indicates that an entry fell into the over- or underflow bins. """ if hasattr(data, "unit"): if not hasattr(bins, "unit"): raise TypeError(f"If ``data`` is a Quantity, so must ``bins``, got {bins}") unit = data.unit data = data.to_value(unit) bins = bins.to_value(unit) n_bins = len(bins) - 1 idx = np.digitize(data, bins) - 1 underflow = idx < 0 overflow = idx >= n_bins idx[underflow] = UNDERFLOW_INDEX idx[overflow] = OVERFLOW_INDEX valid = ~underflow & ~overflow return idx, valid
[docs] def create_histogram_table(events, bins, key="reco_energy"): """ Histogram a variable from events data into an astropy table. Parameters ---------- events : ``astropy.QTable`` Astropy Table object containing the reconstructed events information. bins: ``~np.ndarray`` or ``~astropy.units.Quantity`` Array or Quantity of bin edges. It must have the same units as ``data`` if a Quantity. key : ``string`` Variable to histogram from the events table. Returns ------- hist: ``astropy.QTable`` Astropy table containg the histogram. """ hist = QTable() hist[key + "_low"] = bins[:-1] hist[key + "_high"] = bins[1:] hist[key + "_center"] = 0.5 * (hist[key + "_low"] + hist[key + "_high"]) hist["n"], _ = np.histogram(events[key], bins) # also calculate weighted number of events if "weight" in events.colnames: hist["n_weighted"], _ = np.histogram( events[key], bins, weights=events["weight"] ) else: hist["n_weighted"] = hist["n"] # create counts per particle type, only works if there is at least 1 event if "particle_type" in events.colnames and len(events) > 0: by_particle = events.group_by("particle_type") for group_key, group in zip(by_particle.groups.keys, by_particle.groups): particle = group_key["particle_type"] hist["n_" + particle], _ = np.histogram(group[key], bins) # also calculate weighted number of events col = "n_" + particle if "weight" in events.colnames: hist[col + "_weighted"], _ = np.histogram( group[key], bins, weights=group["weight"] ) else: hist[col + "_weighted"] = hist[col] return hist
[docs] def resample_histogram1d(data, old_edges, new_edges, axis=0): """ Rebinning of a histogram by interpolation along a given axis. Parameters ---------- data : ``numpy.ndarray`` or ``astropy.units.Quantity`` Histogram. old_edges : ``numpy.array`` or ``astropy.units.Quantity`` Binning used to calculate ``data``. ``len(old_edges) - 1`` needs to equal the length of ``data`` along interpolation axis (``axis``). If quantity, needs to be compatible to ``new_edges``. new_edges : ``numpy.array`` or ``astropy.units.Quantity`` Binning of new histogram. If quantity, needs to be compatible to ``old_edges``. axis : int Interpolation axis. Returns ------- ``numpy.ndarray`` or ``astropy.units.Quantity`` Interpolated histogram with dimension according to ``data`` and ``new_edges``. If ``data`` is a quantity, this has the same unit. """ data_unit = None if isinstance(data, u.Quantity): data_unit = data.unit data = data.to_value(data_unit) over_underflow_bin_width = old_edges[-2] - old_edges[1] old_edges = u.Quantity( np.nan_to_num( old_edges, posinf=old_edges[-2] + over_underflow_bin_width, neginf=old_edges[1] - over_underflow_bin_width, ) ) new_edges = u.Quantity(np.nan_to_num(new_edges)) old_edges = old_edges.to(new_edges.unit) cumsum = np.insert(data.cumsum(axis=axis), 0, 0, axis=axis) norm = data.sum(axis=axis, keepdims=True) norm[norm == 0] = 1 cumsum /= norm f_integral = interp1d( old_edges, cumsum, bounds_error=False, fill_value=(0, 1), kind="quadratic", axis=axis, ) values = np.diff(f_integral(new_edges), axis=axis) * norm if data_unit: values = u.Quantity(values, unit=data_unit) return values