Source code for pyirf.cut_optimization

from itertools import product
import numpy as np
from astropy.table import QTable
import astropy.units as u
from tqdm.auto import tqdm
import operator


from .cuts import evaluate_binned_cut_by_index, calculate_percentile_cut, evaluate_binned_cut
from .sensitivity import calculate_sensitivity, estimate_background
from .binning import create_histogram_table, bin_center, calculate_bin_indices


__all__ = [
    "optimize_gh_cut",
    "optimize_cuts",
]


[docs] def optimize_cuts( signal, background, reco_energy_bins, multiplicity_cuts, gh_cut_efficiencies, theta_cut_efficiencies, fov_offset_min=0 * u.deg, fov_offset_max=1 * u.deg, alpha=1.0, theta_min_value=0.02 * u.deg, theta_max_value=0.3 * u.deg, progress=True, **kwargs ): """ Optimize the gamma/hadronnes, theta and multiplicity cut in every energy bin of reconstructed energy for best sensitivity. Parameters ---------- signal: astropy.table.QTable event list of simulated signal events. Required columns are `theta`, `reco_energy`, 'weight', `gh_score` No directional (theta) or gamma/hadron cut should already be applied. background: astropy.table.QTable event list of simulated background events. Required columns are `reco_source_fov_offset`, `reco_energy`, 'weight', `gh_score`. No directional (theta) or gamma/hadron cut should already be applied. reco_energy_bins: astropy.units.Quantity[energy] Bins in reconstructed energy to use for sensitivity computation multiplicity_cuts: np.ndarray[int, ndim=1] Values to scan for minimum telescope multiplicity gh_cut_efficiencies: np.ndarray[float, ndim=1] The gamma/hadron separation cut efficiencies to scan for best sensitivity. theta_cut_efficiencies: astropy.table.QTable The theta cut efficiencies to scan for best sensitivity. fov_offset_min: astropy.units.Quantity[angle] Minimum distance from the fov center for background events to be taken into account fov_offset_max: astropy.units.Quantity[angle] Maximum distance from the fov center for background events to be taken into account alpha: float Size ratio of off region / on region. Will be used to scale the background rate. theta_min_value : u.Quantity[angle] minimum theta cut value theta_max_value : u.Quantity[angle] maximum theta cut value progress: bool If True, show a progress bar during cut optimization **kwargs are passed to ``calculate_sensitivity`` """ gh_cut_efficiencies = np.asanyarray(gh_cut_efficiencies) gh_cut_percentiles = 100 * (1 - gh_cut_efficiencies) fill_value = signal['gh_score'].max() sensitivities = [] cut_indices = [] n_theta_cuts = len(theta_cut_efficiencies) n_gh_cuts = len(gh_cut_efficiencies) n_cuts = len(multiplicity_cuts) * n_theta_cuts * n_gh_cuts signal_bin_index, signal_valid = calculate_bin_indices( signal['reco_energy'], reco_energy_bins, ) background_bin_index, background_valid = calculate_bin_indices( background['reco_energy'], reco_energy_bins, ) gh_cut_grid = [] theta_cut_grid = [] with tqdm(total=n_cuts, disable=not progress) as bar: for multiplicity_index, multiplicity_cut in enumerate(multiplicity_cuts): signal_mask_multiplicity = signal['multiplicity'] >= multiplicity_cut background_mask_multiplicity = background["multiplicity"] >= multiplicity_cut gh_cuts = calculate_percentile_cut( signal['gh_score'][signal_mask_multiplicity], signal['reco_energy'][signal_mask_multiplicity], bins=reco_energy_bins, fill_value=fill_value, percentile=gh_cut_percentiles, ) gh_cut_grid.append(gh_cuts) theta_cuts = calculate_percentile_cut( signal['theta'][signal_mask_multiplicity], signal['reco_energy'][signal_mask_multiplicity], bins=reco_energy_bins, fill_value=theta_max_value, min_value=theta_min_value, max_value=theta_max_value, percentile=100 * theta_cut_efficiencies, ) theta_cut_grid.append(theta_cuts) for gh_index, theta_index in product(range(n_gh_cuts), range(n_theta_cuts)): # apply the current cuts theta_cut = theta_cuts.copy() theta_cut["cut"] = theta_cuts["cut"][:, theta_index] signal_mask_theta = evaluate_binned_cut_by_index( signal["theta"], signal_bin_index, signal_valid, theta_cut, operator.le, ) gh_cut = gh_cuts.copy() gh_cut["cut"] = gh_cuts["cut"][:, gh_index] signal_mask_gh = evaluate_binned_cut_by_index( signal["gh_score"], signal_bin_index, signal_valid, gh_cut, operator.ge, ) signal_selected = signal_mask_gh & signal_mask_theta & signal_mask_multiplicity background_mask_gh = evaluate_binned_cut_by_index( background["gh_score"], background_bin_index, background_valid, gh_cut, operator.ge, ) background_selected = background_mask_gh & background_mask_multiplicity # create the histograms signal_hist = create_histogram_table( signal[signal_selected], reco_energy_bins, "reco_energy" ) background_hist = estimate_background( events=background[background_selected], reco_energy_bins=reco_energy_bins, theta_cuts=theta_cut, alpha=alpha, fov_offset_min=fov_offset_min, fov_offset_max=fov_offset_max, ) sensitivity = calculate_sensitivity( signal_hist, background_hist, alpha=alpha, **kwargs, ) cut_indices.append((multiplicity_index, theta_index, gh_index)) sensitivities.append(sensitivity) bar.update(1) best_gh_cut = QTable() best_gh_cut["low"] = reco_energy_bins[0:-1] best_gh_cut["center"] = bin_center(reco_energy_bins) best_gh_cut["high"] = reco_energy_bins[1:] best_gh_cut["cut"] = np.nan best_gh_cut["efficiency"] = np.nan best_multiplicity_cut = QTable() best_multiplicity_cut["low"] = reco_energy_bins[0:-1] best_multiplicity_cut["center"] = bin_center(reco_energy_bins) best_multiplicity_cut["high"] = reco_energy_bins[1:] best_multiplicity_cut["cut"] = np.nan best_theta_cut = QTable() best_theta_cut["low"] = reco_energy_bins[0:-1] best_theta_cut["center"] = bin_center(reco_energy_bins) best_theta_cut["high"] = reco_energy_bins[1:] best_theta_cut["cut"] = np.nan best_theta_cut["cut"].unit = theta_cuts["cut"].unit best_theta_cut["efficiency"] = np.nan best_sensitivity = sensitivities[0].copy() for bin_id in range(len(reco_energy_bins) - 1): sensitivities_bin = [s["relative_sensitivity"][bin_id] for s in sensitivities] if not np.all(np.isnan(sensitivities_bin)): # nanargmin won't return the index of nan entries best = np.nanargmin(sensitivities_bin) else: # if all are invalid, just use the first one best = 0 multiplicity_index, theta_index, gh_index = cut_indices[best] best_sensitivity[bin_id] = sensitivities[best][bin_id] best_gh_cut["cut"][bin_id] = gh_cut_grid[multiplicity_index]["cut"][bin_id][gh_index] best_multiplicity_cut["cut"][bin_id] = multiplicity_cuts[multiplicity_index] best_theta_cut["cut"][bin_id] = theta_cut_grid[multiplicity_index]["cut"][bin_id][theta_index] best_gh_cut["efficiency"][bin_id] = gh_cut_efficiencies[gh_index] best_theta_cut["efficiency"][bin_id] = theta_cut_efficiencies[theta_index] return best_sensitivity, best_multiplicity_cut, best_theta_cut, best_gh_cut
[docs] def optimize_gh_cut( signal, background, reco_energy_bins, gh_cut_efficiencies, theta_cuts, op=operator.ge, fov_offset_min=0 * u.deg, fov_offset_max=1 * u.deg, alpha=1.0, progress=True, **kwargs ): """ This procedure is EventDisplay-like, since it only applies a pre-computed theta cut and then optimizes only the gamma/hadron separation cut. Parameters ---------- signal: astropy.table.QTable event list of simulated signal events. Required columns are `theta`, `reco_energy`, 'weight', `gh_score` No directional (theta) or gamma/hadron cut should already be applied. background: astropy.table.QTable event list of simulated background events. Required columns are `reco_source_fov_offset`, `reco_energy`, 'weight', `gh_score`. No directional (theta) or gamma/hadron cut should already be applied. reco_energy_bins: astropy.units.Quantity[energy] Bins in reconstructed energy to use for sensitivity computation gh_cut_efficiencies: np.ndarray[float, ndim=1] The cut efficiencies to scan for best sensitivity. op: comparison function with signature f(a, b) -> bool The comparison function to use for the gamma hadron score. Returning true means an event passes the cut, so is not discarded. E.g. for gammaness-like score, use `operator.ge` (>=) and for a hadroness-like score use `operator.le` (<=). fov_offset_min: astropy.units.Quantity[angle] Minimum distance from the fov center for background events to be taken into account fov_offset_max: astropy.units.Quantity[angle] Maximum distance from the fov center for background events to be taken into account alpha: float Size ratio of off region / on region. Will be used to scale the background rate. progress: bool If True, show a progress bar during cut optimization **kwargs are passed to ``calculate_sensitivity`` """ # we apply each cut for all reco_energy_bins globally, calculate the # sensitivity and then lookup the best sensitivity for each # bin independently signal_selected_theta = evaluate_binned_cut( signal['theta'], signal['reco_energy'], theta_cuts, op=operator.le, ) sensitivities = [] # calculate necessary percentile needed for # ``calculate_percentile_cut`` with the correct efficiency. # Depends on the operator, since we need to invert the # efficiency if we compare using >=, since percentile is # defines as <=. gh_cut_efficiencies = np.asanyarray(gh_cut_efficiencies) if op(-1, 1): # if operator behaves like "<=", "<" etc: percentiles = 100 * gh_cut_efficiencies fill_value = signal['gh_score'].min() else: # operator behaves like ">=", ">" percentiles = 100 * (1 - gh_cut_efficiencies) fill_value = signal['gh_score'].max() gh_cuts = calculate_percentile_cut( signal['gh_score'], signal['reco_energy'], bins=reco_energy_bins, fill_value=fill_value, percentile=percentiles, ) for col in tqdm(range(len(gh_cut_efficiencies)), disable=not progress): gh_cut = gh_cuts.copy() gh_cut["cut"] = gh_cuts["cut"][:, col] # apply the current cut signal_selected = evaluate_binned_cut( signal["gh_score"], signal["reco_energy"], gh_cut, op, ) & signal_selected_theta background_selected = evaluate_binned_cut( background["gh_score"], background["reco_energy"], gh_cut, op, ) # create the histograms signal_hist = create_histogram_table( signal[signal_selected], reco_energy_bins, "reco_energy" ) background_hist = estimate_background( events=background[background_selected], reco_energy_bins=reco_energy_bins, theta_cuts=theta_cuts, alpha=alpha, fov_offset_min=fov_offset_min, fov_offset_max=fov_offset_max, ) sensitivity = calculate_sensitivity( signal_hist, background_hist, alpha=alpha, **kwargs, ) sensitivities.append(sensitivity) best_cut_table = QTable() best_cut_table["low"] = reco_energy_bins[0:-1] best_cut_table["center"] = bin_center(reco_energy_bins) best_cut_table["high"] = reco_energy_bins[1:] best_cut_table["cut"] = np.nan best_cut_table["efficiency"] = np.nan best_sensitivity = sensitivities[0].copy() for bin_id in range(len(reco_energy_bins) - 1): sensitivities_bin = [s["relative_sensitivity"][bin_id] for s in sensitivities] if not np.all(np.isnan(sensitivities_bin)): # nanargmin won't return the index of nan entries best = np.nanargmin(sensitivities_bin) else: # if all are invalid, just use the first one best = 0 best_sensitivity[bin_id] = sensitivities[best][bin_id] best_cut_table["cut"][bin_id] = gh_cuts["cut"][bin_id][best] best_cut_table["efficiency"][bin_id] = gh_cut_efficiencies[best] return best_sensitivity, best_cut_table