Merge pull request #232 from pabloitu/brier_evaluations
Implementation of the Brier Score and its consistency test.
Serra314 authored Feb 25, 2025
2 parents 7f151aa + a50d1e3 commit ecaec64
Showing 2 changed files with 238 additions and 1 deletion.
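
For reference, the score computed by _brier_score_ndarray in the new module can be written as (notation follows the module docstring; p_i is introduced here only for brevity):

Q(\Lambda, \Sigma) = -\frac{2}{N}\sum_{i=1}^{N}\bigl(p_i - \mathbf{1}[\Sigma_i > 0]\bigr)^2, \qquad p_i = 1 - e^{-\Lambda_i}

where Lambda_i is the forecasted rate in cell i, Sigma the gridded catalog, N the number of spatial-magnitude cells, and p_i the Poisson probability of at least one event in cell i. The consistency test compares the observed score against scores of catalogs simulated from the forecast with the same number of active cells.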
178 changes: 178 additions & 0 deletions csep/core/brier_evaluations.py
@@ -0,0 +1,178 @@
import numpy
import numpy as np
from scipy.stats import poisson

from csep.models import EvaluationResult
from csep.core.exceptions import CSEPCatalogException


def _brier_score_ndarray(forecast, observations):
    """ Computes the Brier (binary) score for spatial-magnitude cells
    using the formula:
    Q(Lambda, Sigma) = 1/N sum_{i=1}^N (Lambda_i - Ind(Sigma_i > 0))^2
    where Lambda_i is the Poisson probability of observing at least one event
    in cell i given the forecast rate, Sigma is the observed catalog, N the
    number of spatial-magnitude cells and Ind is the indicator function, which
    is 1 if Sigma_i > 0 and 0 otherwise. The returned value is -2 * Q, so
    larger (less negative) values indicate better agreement with the
    observations.
    Args:
        forecast: 2d array of forecasted rates
        observations: 2d array of observed counts
    Returns:
        brier: float, Brier score
    """

    # Poisson probability of at least one event per cell
    prob_success = 1 - poisson.cdf(0, forecast)
    brier_cell = np.square(prob_success.ravel() - (observations.ravel() > 0))
    brier = -2 * brier_cell.sum()

    # average over the total number of spatial-magnitude cells
    for n_dim in observations.shape:
        brier /= n_dim
    return brier


def _simulate_catalog(sim_cells, sampling_weights, random_numbers=None):
    """
    Simulates a catalog by sampling from the sampling_weights array.
    Identical to binomial_evaluations._simulate_catalog
    Args:
        sim_cells: number of events (active cells) to place in the simulated
            catalog
        sampling_weights: cumulative forecast rates normalized to [0, 1],
            sorted by construction
        random_numbers: optional array of pre-drawn uniform random numbers
    Returns:
        sim_fore: array with the simulated counts per cell
    """
    sim_fore = numpy.zeros(sampling_weights.shape)

    if random_numbers is None:
        # Reset simulation array to zero, but don't reallocate
        sim_fore.fill(0)
        num_active_cells = 0
        # draw uniform numbers until sim_cells distinct cells are active
        while num_active_cells < sim_cells:
            random_num = numpy.random.uniform(0, 1)
            loc = numpy.searchsorted(sampling_weights, random_num,
                                     side='right')
            if sim_fore[loc] == 0:
                sim_fore[loc] = 1
                num_active_cells += 1
    else:
        # Find insertion points using binary search inserting
        # to satisfy a[i-1] <= v < a[i]
        pnts = numpy.searchsorted(sampling_weights, random_numbers,
                                  side='right')
        # Create simulated catalog by adding to the original locations
        numpy.add.at(sim_fore, pnts, 1)

    assert sim_fore.sum() == sim_cells, "simulated the wrong number of events!"
    return sim_fore


def _brier_score_test(forecast_data, observed_data, num_simulations=1000,
                      random_numbers=None, seed=None, verbose=True):
    """ Computes the Brier consistency test conditional on the number of
    active spatial-magnitude cells (cells with at least one event) in the
    observed catalog
    Args:
        forecast_data: 2d array of forecasted rates for spatial_magnitude cells
        observed_data: 2d array of a catalog resampled to spatial_magnitude
            cells
        num_simulations: number of synthetic catalog simulations
        random_numbers: numpy array of random numbers to use for simulation
        seed: seed for random number generator
        verbose: print status updates
    Returns:
        qs: float, quantile score of the observed Brier score
        obs_brier: float, Brier score of the observed catalog
        simulated_brier: list, Brier scores of the simulated catalogs
    """
    # Array-masking of non-positive rates, as in the likelihood tests:
    forecast_data = numpy.ma.masked_where(forecast_data <= 0.0, forecast_data)

    # set seed for reproducibility of the simulations
    if seed is not None:
        numpy.random.seed(seed)

    # used to determine where simulated earthquakes should
    # be placed; by definition of cumsum these are sorted
    sampling_weights = (numpy.cumsum(forecast_data.ravel()) /
                        numpy.sum(forecast_data))

    # data structures to store results
    simulated_brier = []
    n_active_cells = len(numpy.unique(numpy.nonzero(observed_data.ravel())))
    num_cells_to_simulate = int(n_active_cells)

    # main simulation step in this loop
    for idx in range(num_simulations):
        if random_numbers is None:
            sim_fore = _simulate_catalog(num_cells_to_simulate,
                                         sampling_weights)
        else:
            sim_fore = _simulate_catalog(num_cells_to_simulate,
                                         sampling_weights,
                                         random_numbers=random_numbers[idx, :])

        # compute Brier score
        current_brier = _brier_score_ndarray(forecast_data.data, sim_fore)

        # append to list of simulated Brier scores
        simulated_brier.append(current_brier)

        # just be verbose
        if verbose:
            if (idx + 1) % 100 == 0:
                print(f'... {idx + 1} catalogs simulated.')

    obs_brier = _brier_score_ndarray(forecast_data.data, observed_data)
    # quantile score: fraction of simulated scores not exceeding the observed
    qs = numpy.sum(simulated_brier <= obs_brier) / num_simulations

    # float, float, list
    return qs, obs_brier, simulated_brier


def brier_score_test(gridded_forecast,
                     observed_catalog,
                     num_simulations=1000,
                     seed=None,
                     random_numbers=None,
                     verbose=False):
    """
    Performs the Brier conditional test on a Gridded Forecast using an
    Observed Catalog. The test is conditioned on the observations: synthetic
    catalogs are sampled from the forecast with the same number of active
    cells as the observed catalog, which eliminates the strong impact that
    differences in the number distribution have on the test.
    """

    # grid catalog onto spatial grid
    try:
        _ = observed_catalog.region.magnitudes
    except CSEPCatalogException:
        observed_catalog.region = gridded_forecast.region
    gridded_catalog_data = observed_catalog.spatial_magnitude_counts()

    # compute the Brier test on the gridded catalog data and forecast
    qs, obs_brier, simulated_brier = _brier_score_test(
        gridded_forecast.data,
        gridded_catalog_data,
        num_simulations=num_simulations,
        seed=seed,
        random_numbers=random_numbers,
        verbose=verbose
    )

    # populate result data structure
    result = EvaluationResult()
    result.test_distribution = simulated_brier
    result.name = 'Brier score-Test'
    result.observed_statistic = obs_brier
    result.quantile = qs
    result.sim_name = gridded_forecast.name
    result.obs_name = observed_catalog.name
    result.status = 'normal'
    result.min_mw = numpy.min(gridded_forecast.magnitudes)

    return result
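
The private helpers can be exercised directly on plain numpy arrays; below is a minimal sketch with made-up toy numbers (mirroring the unit tests in the next file), not part of the committed code:

import numpy as np
import csep.core.brier_evaluations as brier

# Toy forecast rates and observed counts on a 2 x 3 spatial-magnitude grid
forecast = np.array([[0.3, 0.2, 0.1], [0.2, 0.1, 0.1]])
observed = np.array([[0, 1, 2], [1, 1, 0]])

# Brier score of the observations under the forecast (closer to 0 is better)
obs_score = brier._brier_score_ndarray(forecast, observed)

# Conditional consistency test: quantile score, observed score, simulated scores
qs, obs_brier, sim_brier = brier._brier_score_test(forecast, observed,
                                                   num_simulations=100,
                                                   seed=1)
print(obs_score, qs)

For a real Gridded Forecast and Observed Catalog, the public entry point brier_score_test above grids the catalog via spatial_magnitude_counts() and wraps these numbers into an EvaluationResult (test_distribution, observed_statistic, quantile).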

61 changes: 60 additions & 1 deletion tests/test_evaluations.py
@@ -1,10 +1,12 @@
import os
import numpy
import numpy as np
import scipy
import unittest

import csep.core.poisson_evaluations as poisson
import csep.core.binomial_evaluations as binary

import csep.core.brier_evaluations as brier
def get_datadir():
    root_dir = os.path.dirname(os.path.abspath(__file__))
    data_dir = os.path.join(root_dir, 'artifacts', 'Comcat')
@@ -115,5 +117,62 @@ def test_binomial_likelihood(self):
        numpy.testing.assert_allclose(simulated_ll[0], -7.921741654647629)


class TestPoissonBrier(unittest.TestCase):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.seed = 1
        self.forecast_data = numpy.array([[0.3, 0.2, 0.1], [0.2, 0.1, 0.1]])
        self.observed_data = numpy.array([[0, 1, 2], [1, 1, 0]])

    def test_brier_score_calculation(self):

        # 1 bin
        rate = 1
        prob = 1 - scipy.stats.poisson.pmf(0, rate)
        brier_score_hit = -2 * (prob - 1)**2
        brier_score = brier._brier_score_ndarray(numpy.array([[rate]]),
                                                 numpy.array([[1]]))
        numpy.testing.assert_allclose(brier_score_hit, brier_score)

        brier_score_nohit = -2 * prob**2
        brier_score = brier._brier_score_ndarray(numpy.array([[rate]]),
                                                 numpy.array([[0]]))
        numpy.testing.assert_allclose(brier_score_nohit, brier_score)

        # 2 bins
        rate = np.array([1, 0.5])
        hits = np.array([0, 1])
        prob = 1 - scipy.stats.poisson.pmf(0, rate)
        brier_score_2bins = (-2 * (prob - hits)**2).sum() / len(rate)
        brier_score = brier._brier_score_ndarray(np.array(rate),
                                                 np.array([hits]))
        numpy.testing.assert_allclose(brier_score_2bins, brier_score)

        hits = np.array([0, 0])
        brier_score_2bins = (-2 * (prob - hits)**2).sum() / len(rate)
        brier_score = brier._brier_score_ndarray(np.array(rate),
                                                 np.array([hits]))
        numpy.testing.assert_allclose(brier_score_2bins, brier_score)

    def test_brier_test(self):

        expected_sim = numpy.array([1, 1, 1, 1, 0, 0])
        qs, obs_brier, simulated_brier = brier._brier_score_test(
            self.forecast_data,
            self.observed_data,
            num_simulations=1,
            seed=self.seed,
            verbose=True)

        probs = 1 - scipy.stats.poisson.pmf(0, self.forecast_data.ravel())
        sim_brier_analytic = ((-2 * (probs - expected_sim)**2).sum() /
                              len(probs))
        numpy.testing.assert_allclose(simulated_brier[0], sim_brier_analytic)
        obs_brier_analytic = (
            (-2 * (probs - self.observed_data.ravel().astype(bool))**2).sum() /
            len(probs))
        numpy.testing.assert_allclose(obs_brier, obs_brier_analytic)


if __name__ == '__main__':
    unittest.main()
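
For reference, the single-bin case in test_brier_score_calculation can be checked by hand: with rate = 1 the probability of at least one event is 1 - exp(-1) ≈ 0.632, giving -2 * (0.632 - 1)^2 ≈ -0.271 for a hit and -2 * 0.632^2 ≈ -0.799 for a miss, so a hit scores closer to zero, as expected. A short check of those numbers (a sketch, not part of the committed tests):

import numpy as np
import csep.core.brier_evaluations as brier

# rate = 1: Poisson probability of at least one event is 1 - exp(-1), about 0.632
print(brier._brier_score_ndarray(np.array([[1.0]]), np.array([[1]])))  # hit:  approx. -0.271
print(brier._brier_score_ndarray(np.array([[1.0]]), np.array([[0]])))  # miss: approx. -0.799

The whole class can be run in isolation with a standard runner, e.g. python -m pytest tests/test_evaluations.py::TestPoissonBrier -v, if pytest is available.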
