Scenario modelling#
Note
If you are not familiar with pypfilt, you may want to begin with the Getting Started tutorial.
What is scenario modelling?#
Scenario modelling involves predicting future outcomes that would occur if specific circumstances (“scenarios”) were met. Key differences between scenario modelling and forecasting include:
Scenario modelling produces conditional predictions — each scenario represents a plausible, but hypothetical, set of circumstances. In contrast, forecasts characterise the expected future based on current circumstances.
Scenario modelling produces long-term predictions — often spanning months or years. In contrast, forecasts typically involve shorter horizons of days to weeks.
See also
See Coronavirus Disease Model to Inform Transmission-Reducing Measures and Health System Preparedness, Australia (Moss et al., 2020) for an example of scenario modelling that informed Australia’s emergency response to COVID-19.
Scenario modelling with pypfilt#
Scenario modelling is straightforward to implement in pypfilt. The key is to use different model prior distributions to characterise each scenario.
Implement a simulation model (see, e.g., the tutorial example).
Define a different prior distribution for each scenario.
Provide no observations, so that the results only reflect the choice of prior distribution.
Use observation models to simulate observations, if desired.
Generate results for each scenario with
pypfilt.fit()
. Alternatively, you can usepypfilt.forecast()
to generate forecasts from \(t = 0\) for each scenario.
Deterministic parameter sampling#
The Latin Hypercube sampler
returns deterministic parameter samples, based on the order in which the distributions are defined.
This allows for pair-wise comparisons between particles in different scenarios.
For example, you may wish to predict the impact of different interventions on a system. To do so, you could define a “baseline” scenario (no interventions) and one or more “intervention” scenarios, and create a summary table that records relevant outputs for each particle. This would allow you to compare output distributions between each intervention scenario and the baseline scenario.
However, with deterministic parameter sampling, you can also obtain the distribution of the relative impact of each intervention, by calculating the difference in output between (a) each particle in an intervention scenario; and (b) each corresponding particle in the baseline scenario.
We will now present an example that demonstrates how to do this.
Interventions in an SIRV model#
We will use an SIRV ODE model
to compare the impact of different interventions on an infectious disease epidemic.
This model divides the population into four compartments: Susceptible \(S(t)\), Infectious \(I(t)\), Recovered \(R(t)\), and Vaccinated \(V(t)\).
Note
The simulation model and interventions presented here are intended for illustrative purposes, rather than characterising real-world applications of scenario modelling.
Simulation model#
The model equations are:
The model supports two types of intervention.
Reducing transmission by a proportion \(e_R\) once daily incidence exceeds a threshold \(i_R\), representing non-pharmaceutical interventions such as wearing masks and social distancing measures:
\[\begin{split}\beta(t) &= [1 - e_R(t)] \cdot R_0 \cdot \gamma \\ e_R(t) &= \begin{cases} 0 & \text{if daily incidence } < i_R \\ e_R & \text{otherwise} \\ \end{cases}\end{split}\]Reducing the susceptible population at some rate \(v\) beginning at time \(t_V\), representing a vaccination program with a vaccine that confers immediate, complete, and permanent protection.
\[\begin{split}v(t) &= \begin{cases} 0 & \text{if } t < t_V \\ v & \text{otherwise} \\ \end{cases}\end{split}\]
Note
In the model implementation (below) we have used descriptive variable names for the intervention parameters, rather than the symbols listed in the equations above.
The SIRV model implementation.
class SirvOde(Model):
"""
An ordinary differential equation implementation of the SIRV model, using
the forward Euler method, which supports two types of interventions:
* Reducing ``R0`` by some fraction (``R0_reduction``) once daily incidence
reaches some threshold (``R0_inc_threshold``); and
* Moving individuals from ``S`` to ``V`` at some rate (``Vaccine_rate``),
beginning at some time (``Vaccine_start``).
The model settings must include the following keys:
* ``population_size``: The number of individuals in the population.
"""
def field_types(self, ctx):
"""
Define the state vector structure.
"""
return [
# Model state variables.
('S', np.float64),
('I', np.float64),
('R', np.float64),
('V', np.float64),
# Model parameters.
('R0', np.float64),
('gamma', np.float64),
# Intervention parameters: reducing R0.
('R0_reduction', np.float64),
('R0_inc_threshold', np.float64),
('R0_effect', bool),
# Intervention parameters: vaccination.
('Vaccine_rate', np.float64),
('Vaccine_start', np.float64),
]
def can_smooth(self):
"""
The fields that can be smoothed by the post-regularisation filter.
"""
# Return the continuous model parameters.
return {'R0', 'gamma'}
def init(self, ctx, vec):
"""
Initialise the state vectors.
"""
# Initialise the model state variables.
population = ctx.settings['model']['population_size']
vec['S'] = population - 1
vec['I'] = 1
vec['R'] = 0
vec['V'] = 0
# This records whether a reduction in R0 has already occurred.
vec['R0_effect'] = False
# Initialise the model parameters.
prior = ctx.data['prior']
for param, samples in prior.items():
vec[param] = samples
def update(self, ctx, time_step, is_forecast, prev, curr):
"""
Update the state vectors.
"""
# Define vaccination rates for each particle.
mask_vacc = (prev['Vaccine_rate'] > 0) & (
prev['Vaccine_start'] <= time_step.start
)
if np.any(mask_vacc):
vacc = mask_vacc.astype(int) * prev['Vaccine_rate']
else:
vacc = 0
# Check for particles where a reduction in R0 should now be imposed.
past = ctx.component['history'].snapshot(ctx).back_n_units(1)
daily_incidence = past['state_vec']['S'] - prev['S']
mask_decr = (prev['R0_reduction'] > 0) & (
prev['R0_inc_threshold'] <= daily_incidence
)
# Define reductions (if any) in R0 for each particle.
mask_R0_effect = mask_decr | prev['R0_effect']
if np.any(mask_R0_effect):
R0_scale = 1 - mask_R0_effect.astype(int) * prev['R0_reduction']
else:
R0_scale = 1
# Calculate the flow rates out of S and I.
beta = R0_scale * prev['R0'] * prev['gamma']
N = ctx.settings['model']['population_size']
s_out = time_step.dt * beta * prev['I'] * prev['S'] / N
i_out = time_step.dt * prev['gamma'] * prev['I']
v_in = time_step.dt * np.minimum(vacc, prev['S'] - s_out)
# Ensure all flows are non-negative.
s_out = s_out.clip(min=0)
i_out = i_out.clip(min=0)
v_in = v_in.clip(min=0)
# Update the state variables.
curr['S'] = (prev['S'] - s_out - v_in).clip(min=0)
curr['I'] = prev['I'] + s_out - i_out
curr['R'] = prev['R'] + i_out
curr['V'] = prev['V'] + v_in
# Copy the model parameters.
curr['R0'] = prev['R0']
curr['gamma'] = prev['gamma']
curr['R0_reduction'] = prev['R0_reduction']
curr['R0_inc_threshold'] = prev['R0_inc_threshold']
curr['R0_effect'] = mask_R0_effect
curr['Vaccine_rate'] = prev['Vaccine_rate']
curr['Vaccine_start'] = prev['Vaccine_start']
Observing daily incidence#
We need to calculate daily incidence in order to reduce transmission at the appropriate time. However, daily incidence is also a useful measure for characterising each particle trajectory and calculating quantities such as the peak size, peak time, and epidemic duration.
We can define an observation model for daily incidence, and record the simulated observations in a SimulatedObs
summary table.
For this purpose, we may not want to introduce any observation error, and so we can define a “perfect” observation model (i.e., zero variance); see the highlighted line in the observation model implementation below.
Observation model for daily incidence.
class Incidence(Univariate):
r"""
A perfect observation model for incidence in the SIRV model.
:param obs_unit: A descriptive name for the data.
:param settings: The observation model settings dictionary.
The settings dictionary should contain the following keys:
* ``observation_period``: The observation period :math:`\Delta`.
For example:
.. code-block:: toml
[observations.cases]
model = "pypfilt.examples.sir.SirvCumIncidence"
observation_period = 1
"""
def new_infections(self, ctx, snapshot):
r"""
Return the number of new infections :math:`S(t-\Delta) - S(t)` that
occurred during the observation period :math:`\Delta` for each
particle.
"""
period = self.settings['observation_period']
prev = snapshot.back_n_units_state_vec(period)
cum_infs_curr = snapshot.state_vec['I'] + snapshot.state_vec['R']
cum_infs_past = prev['I'] + prev['R']
incidence = (cum_infs_curr - cum_infs_past).clip(min=0)
return incidence
def distribution(self, ctx, snapshot):
"""
Return the observation distribution for each particle.
"""
infections = self.new_infections(ctx, snapshot)
return scipy.stats.norm(loc=infections, scale=0)
Measuring impact#
We define the impact of each intervention as the number of infections that were prevented, relative to the baseline (no intervention) scenario. We can measure this by recording the epidemic final size (i.e., the total number of infections) for each particle. This is simply the value of \(R(t)\) at the end of the simulation. We can then calculate by comparing the epidemic final sizes between each intervention scenario and the baseline scenario.
Summary table for epidemic final size.
class FinalSize(Table):
"""
Calculate the final epidemic size for each particle.
"""
def field_types(self, ctx, obs_list, name):
return [
time_field('fs_time'),
('weight', np.float64),
('final_size', np.float64),
]
def n_rows(self, ctx, forecasting):
return ctx.particle_count()
def add_rows(self, ctx, fs_time, window, insert_fn):
pass
def finished(self, ctx, fs_time, window, insert_fn):
final_snapshot = window[-1]
final_sizes = final_snapshot.state_vec['R']
for ix, final_size in enumerate(final_sizes):
weight = final_snapshot.weights[ix]
insert_fn((fs_time, weight, final_size))
Defining scenarios#
We define three scenarios:
"baseline"
: no interventions, the reference against which to calculate intervention impact;"Reduce R0"
: reduce transmission by 33% once incidence reaches 100 infections per day."Vaccination"
: beginning on day 28, vaccinate up to 1000 susceptible people per day.
For all scenarios, we use the same prior distributions for the pathogen parameters:
The intervention parameters differ between scenarios:
We run each scenario for 80 days, and record summary tables for daily incidence ("daily_inc"
) and epidemic final size ("final_sizes"
).
See the scenario definitions below for complete details.
The SIRV scenario definitions.
[components]
model = "pypfilt.examples.sirv.SirvOde"
time = "pypfilt.Scalar"
sampler = "pypfilt.sampler.LatinHypercube"
summary = "pypfilt.summary.HDF5"
[time]
start = 0.0
until = 80.0
steps_per_unit = 12
summaries_per_unit = 12
[model]
population_size = 10000
[prior]
R0 = { name = "uniform", args.loc = 2, args.scale = 1 }
gamma = { name = "uniform", args.loc = 0.25, args.scale = 0.1 }
R0_reduction = { name = "constant", args.value = 0 }
R0_inc_threshold = { name = "constant", args.value = 0 }
Vaccine_rate = { name = "constant", args.value = 0 }
Vaccine_start = { name = "constant", args.value = 0 }
[summary.tables]
daily_inc.component = "pypfilt.summary.SimulatedObs"
daily_inc.observation_unit = "daily_incidence"
final_sizes.component = "pypfilt.examples.sirv.FinalSize"
[filter]
particles = 500
prng_seed = 2001
history_window = -1
[observations.daily_incidence]
model = "pypfilt.examples.sirv.Incidence"
observation_period = 1
[scenario.Baseline]
[scenario."Reduce R0"]
prior.R0_reduction = { name = "constant", args.value = 0.33 }
prior.R0_inc_threshold = { name = "constant", args.value = 100 }
[scenario.Vaccination]
prior.Vaccine_rate = { name = "constant", args.value = 1000 }
prior.Vaccine_start = { name = "constant", args.value = 28 }
Running scenario simulations#
The scenario definitions are provided by the sirv_toml_data()
function.
Function that runs each of the SIRV scenarios.
def run_sirv_scenarios():
"""
Run the baseline SIRV scenario and each intervention scenario.
This returns a tuple that contains (a) a dictionary of simulation contexts
for each scenario; and (b) a dictionary of the results for each scenario
(:class:`pypfilt.pfilter.Result`).
"""
source = io.StringIO(pypfilt.examples.sirv.sirv_toml_data())
# Construct simulation contexts for each SIRV scenario.
contexts = {
instance.scenario_id: instance.build_context()
for instance in pypfilt.load_instances(source)
}
# Collect the simulation results for each scenario.
results = {
name: pypfilt.fit(ctx, filename=None).estimation
for name, ctx in contexts.items()
}
return contexts, results
Note that we record each simulation context, in addition to the simulation results. This allows us to inspect the parameter values sampled from the prior distributions for each scenario, so that we can confirm that the pathogen parameters — \(\gamma\) and \(R_0\) — are identical for each scenario, and that the intervention parameters differ between scenarios.
Function that inspects the parameter values for each scenario.
def verify_prior_samples(contexts):
"""
Verify that the parameter values sampled from the prior distributions for
each scenario are either identical (e.g., pathogen parameters) or differ
(e.g., intervention parameters).
"""
baseline_prior = contexts['Baseline'].data['prior']
reduce_R0_prior = contexts['Reduce R0'].data['prior']
vaccination_prior = contexts['Vaccination'].data['prior']
# Verify that samples for baseline parameters are identical.
baseline_parameters = ['R0', 'gamma']
for parameter in baseline_parameters:
baseline_vals = baseline_prior[parameter]
reduce_R0_vals = reduce_R0_prior[parameter]
vaccination_vals = vaccination_prior[parameter]
assert np.array_equal(baseline_vals, reduce_R0_vals)
assert np.array_equal(baseline_vals, vaccination_vals)
# Verify that vaccine parameter samples are different only for the
# vaccination scenario.
vacc_parameters = [
param for param in baseline_prior if param.startswith('Vaccine_')
]
for parameter in vacc_parameters:
baseline_vals = baseline_prior[parameter]
reduce_R0_vals = reduce_R0_prior[parameter]
vaccination_vals = vaccination_prior[parameter]
assert not np.array_equal(baseline_vals, vaccination_vals)
assert np.array_equal(baseline_vals, reduce_R0_vals)
# Verify that R0 parameter samples are different only for the R0 reduction
# scenario.
R0_parameters = [
param for param in baseline_prior if param.startswith('R0_')
]
for parameter in R0_parameters:
baseline_vals = baseline_prior[parameter]
reduce_R0_vals = reduce_R0_prior[parameter]
vaccination_vals = vaccination_prior[parameter]
assert not np.array_equal(baseline_vals, reduce_R0_vals)
assert np.array_equal(baseline_vals, vaccination_vals)
Simulation results#
We can plot the daily incidence curves, and observe the following features:
As expected, the baseline epidemics span rapid outbreaks with large peaks, and slower outbreaks with lower peaks;
Reducing transmission noticeably slows the epidemics and reduces the peak height; and
Vaccination occurs quite late, and only has a noticeable impact on the slowest epidemics.
Function that plots daily incidence curves.
def plot_epi_curves(results, png_file):
"""
Plot the ensemble of daily incidence curves for each scenario.
"""
fig, axs = plt.subplots(
layout='constrained', nrows=len(results), sharex=True, sharey=True
)
# Plot the epidemic curves for each scenario in separate subplots.
for ix, (scenario_id, result) in enumerate(results.items()):
ax = axs[ix]
daily_inc = result.tables['daily_inc']
time = np.unique(daily_inc['time'])
inc_curves = daily_inc['value'].reshape((len(time), -1))
ax.plot(time, inc_curves, alpha=0.02, color='black')
scenario_label = get_scenario_label(scenario_id)
ax.set_title(f'Scenario: {scenario_label}')
# Add the x-axis label to the bottom subplot.
if ix == len(results) - 1:
ax.set_xlabel('Time (days)')
# Add the y-axis label to the middle subplot.
if ix == 1:
ax.set_ylabel('Daily incidence')
fig.savefig(png_file, dpi=300, **png_kwargs())

Daily incidence curves for each scenario.#
We can plot the distribution of epidemic final sizes , and observe the following features:
Reducing transmission substantially reduces the number of infections; and
Vaccination is less consistent in its impact, sometimes having little effect, but also capable of greater reductions in final size than the reducing transmission intervention.
Function that retrieves the epidemic final sizes.
def get_final_sizes(results):
"""
Return the median and 95% credible interval for final size in each
scenario.
"""
ys = []
y_mins = []
y_maxs = []
probs = [0.5, 0.025, 0.975]
for result in results.values():
final_sizes = result.tables['final_sizes']['final_size']
weights = result.tables['final_sizes']['weight']
quantiles = pypfilt.stats.qtl_wt(final_sizes, weights, probs)
ys.append(quantiles[0])
y_mins.append(quantiles[1])
y_maxs.append(quantiles[2])
return (np.array(ys), np.array(y_mins), np.array(y_maxs))

Epidemic final size distributions, reported as medians (points) and 95% credible intervals (lines).#
For each intervention, we can calculate the number of prevented infections separately for each particle, and plot the distribution of prevented infections. The results are consistent with those shown in the previous figure.
Important
We can calculate numbers of prevented infections for each particle, because we have identical samples for \(R_0\) and \(\gamma\) in each scenario. Without this guarantee, it would not be sensible to calculate the difference in epidemic final size between individual particles in different scenarios, and attribute this difference to the intervention alone.
Function that calculates numbers of prevented infections.
def get_prevented_infections(results, relative_to):
"""
Calculate the number of infections that were prevented in each scenario,
relative to the specified baseline scenario (``relative_to``).
This returns a dictionary that maps intervention scenario names to arrays
of prevented infection counts for each particle.
"""
baseline_sizes = results[relative_to].tables['final_sizes']['final_size']
prevented_infections = {}
for scenario, result in results.items():
# Ignore the baseline scenario.
if scenario == relative_to:
continue
# Calculate the number of prevented infections for each particle,
# relative to the corresponding particle in the baseline scenario.
final_sizes = result.tables['final_sizes']['final_size']
num_prevented = baseline_sizes - final_sizes
prevented_infections[scenario] = num_prevented
return prevented_infections

The distribution of prevented infections (relative to the baseline scenario), reported as medians (points) and 95% credible intervals (lines).#
Finally, we can plot the correlations between prevented infections and pathogen parameters, and observe the following features:
The impact of reducing transmission is, unsurprisingly, strongly correlated with \(R_0\) and barely correlated with \(\gamma\);
The impact of vaccination is correlated with \(R_0\), and also with \(\gamma\), because larger values of \(\gamma\) produce shorter epidemics, generating more infections prior to the start of vaccination; and
The daily force of infection \(\beta = R_0 \cdot \gamma\) is more strongly correlated with the impact of vaccination than with the impact of reducing transmission, because the impact of vaccination strongly depends on the number of infections that have occurred prior to the start of vaccination.
Function that plots rank correlations.
def plot_prevented_infection_correlations(contexts, results, png_file):
"""
Plot Spearman correlation coefficients between the numbers of prevented
infections in each scenario, and model parameters that characterise the
pathogen (i.e., parameters that are not related to the interventions).
"""
prevented_infs = get_prevented_infections(results, relative_to='Baseline')
samples = {
'R0': contexts['Baseline'].data['prior']['R0'],
'gamma': contexts['Baseline'].data['prior']['gamma'],
}
samples['beta'] = samples['R0'] * samples['gamma']
corrs = np.zeros((len(samples), len(prevented_infs)))
for param_ix, values in enumerate(samples.values()):
for scenario_ix, num_prevented in enumerate(prevented_infs.values()):
coeff = scipy.stats.spearmanr(num_prevented, values).statistic
corrs[param_ix, scenario_ix] = coeff
fig, ax = plt.subplots(layout='constrained', figsize=[4.8, 4.8])
ax.imshow(corrs, vmin=-1, vmax=1)
for x in range(len(prevented_infs)):
for y in range(len(samples)):
ax.text(
x,
y,
f'{corrs[y, x]: 0.3f}',
ha='center',
va='center',
color='white',
fontsize='x-large',
)
ax.set_title('Correlation with infections prevented')
ax.set_xlabel('Scenario')
ax.set_ylabel('Parameter')
labels = [get_scenario_label(key) for key in prevented_infs.keys()]
ax.set_xticks(np.arange(len(prevented_infs)), labels=labels)
ax.set_yticks(
np.arange(len(samples)),
labels=['$R_0$', r'$\gamma$', r'$\beta$'],
fontsize='large',
)
fig.savefig(png_file, dpi=300, **png_kwargs())

Spearman rank correlation coefficients between (a) the number of prevented infections (relative to the baseline scenario); and (b) model parameters that characterise the pathogen.#
We can also display these correlations as a bar plot, which may be more useful for visual comparisons between individual parameters and/or scenarios.
Function that plots rank correlation bars.
def plot_prevented_infection_correlations_bars(contexts, results, png_file):
"""
Plot Spearman correlation coefficients between the numbers of prevented
infections in each scenario, and model parameters that characterise the
pathogen (i.e., parameters that are not related to the interventions).
"""
prevented_infs = get_prevented_infections(results, relative_to='Baseline')
samples = {
'R0': contexts['Baseline'].data['prior']['R0'],
'gamma': contexts['Baseline'].data['prior']['gamma'],
}
samples['beta'] = samples['R0'] * samples['gamma']
corrs = np.zeros((len(samples), len(prevented_infs)))
for param_ix, values in enumerate(samples.values()):
for scenario_ix, num_prevented in enumerate(prevented_infs.values()):
coeff = scipy.stats.spearmanr(num_prevented, values).statistic
corrs[param_ix, scenario_ix] = coeff
palette = mpl.colormaps['Pastel2']
bar_width = 0.3
fig, ax = plt.subplots(layout='constrained', figsize=[4.8, 4.8])
for ix, scenario in enumerate(prevented_infs):
# Plot the correlation coefficients for this scenario as bars.
corrcoefs = corrs[:, ix]
bars = ax.barh(
np.arange(len(samples)) + ix * bar_width,
width=corrcoefs,
height=bar_width,
color=palette(ix),
label=get_scenario_label(scenario),
)
# NOTE: we position labels differently for small and large values.
# Labels for large values are placed within the bars, while labels for
# small values are placed adjacent to the bars.
large_corrcoefs = [
f'{corrcoef:0.3f}' if abs(corrcoef) > 0.2 else ''
for corrcoef in corrcoefs
]
ax.bar_label(
bars,
labels=large_corrcoefs,
padding=-40,
)
small_corrcoefs = [
f'{corrcoef:0.3f}' if abs(corrcoef) < 0.2 else ''
for corrcoef in corrcoefs
]
ax.bar_label(
bars,
labels=small_corrcoefs,
padding=8,
)
# Ensure that the x-axis spans [-1, 1] by plotting invisible points (this
# adds some padding to the axis limits).
ax.scatter(x=[-1, 1], y=[0, 0], alpha=0)
ax.set_xticks([-1, -0.5, 0, 0.5, 1.0])
# Display parameter names on the left-hand side of the plot.
ax.set_yticks(
np.arange(len(samples)) + 0.5 * bar_width,
labels=['$R_0$', r'$\gamma$', r'$\beta$'],
fontsize='large',
)
axis_linewidth = mpl.rcParams['axes.linewidth']
ax.axvline(x=0, color='black', linewidth=axis_linewidth)
ax.legend(loc='best')
ax.set_xlabel('Rank correlation with infections prevented')
fig.savefig(png_file, dpi=300, **png_kwargs())

Spearman rank correlation coefficients between (a) the number of prevented infections (relative to the baseline scenario); and (b) model parameters that characterise the pathogen.#
Further details#
The code to run these simulations and plot the results can be found in tests/test_sirv.py
.
The contents of this file are provided below.
All code used to generate the results and figures.
tests/test_sirv.py
.#"""Test cases for the SIRV model in ``pypfilt.examples.sirv``."""
import io
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
from pathlib import Path
import scipy.stats
import pypfilt
import pypfilt.examples.sirv
import pypfilt.stats
def test_sirv_scenarios():
# Run the baseline scenario and each intervention scenario.
contexts, results = run_sirv_scenarios()
# Check that parameter values sampled from the prior distributions meet
# our expectations about being identical or different between scenarios.
verify_prior_samples(contexts)
output_dir = Path('doc').resolve() / 'how-to'
# Plot daily incidence curves for each scenario.
plot_epi_curves(results, output_dir / 'scenario-sirv-epi-curves.png')
# Plot the distribution of epidemic final sizes for each scenario.
plot_final_sizes(results, output_dir / 'scenario-sirv-final-sizes.png')
# Plot the distribution of prevented infections for each scenario.
plot_prevented_infections(
results, output_dir / 'scenario-sirv-prevented-infections.png'
)
# Plot the correlation between prevented infections and model parameters.
plot_prevented_infection_correlations(
contexts, results, output_dir / 'scenario-sirv-correlations.png'
)
plot_prevented_infection_correlations_bars(
contexts, results, output_dir / 'scenario-sirv-correlations-bars.png'
)
def run_sirv_scenarios():
"""
Run the baseline SIRV scenario and each intervention scenario.
This returns a tuple that contains (a) a dictionary of simulation contexts
for each scenario; and (b) a dictionary of the results for each scenario
(:class:`pypfilt.pfilter.Result`).
"""
source = io.StringIO(pypfilt.examples.sirv.sirv_toml_data())
# Construct simulation contexts for each SIRV scenario.
contexts = {
instance.scenario_id: instance.build_context()
for instance in pypfilt.load_instances(source)
}
# Collect the simulation results for each scenario.
results = {
name: pypfilt.fit(ctx, filename=None).estimation
for name, ctx in contexts.items()
}
return contexts, results
def verify_prior_samples(contexts):
"""
Verify that the parameter values sampled from the prior distributions for
each scenario are either identical (e.g., pathogen parameters) or differ
(e.g., intervention parameters).
"""
baseline_prior = contexts['Baseline'].data['prior']
reduce_R0_prior = contexts['Reduce R0'].data['prior']
vaccination_prior = contexts['Vaccination'].data['prior']
# Verify that samples for baseline parameters are identical.
baseline_parameters = ['R0', 'gamma']
for parameter in baseline_parameters:
baseline_vals = baseline_prior[parameter]
reduce_R0_vals = reduce_R0_prior[parameter]
vaccination_vals = vaccination_prior[parameter]
assert np.array_equal(baseline_vals, reduce_R0_vals)
assert np.array_equal(baseline_vals, vaccination_vals)
# Verify that vaccine parameter samples are different only for the
# vaccination scenario.
vacc_parameters = [
param for param in baseline_prior if param.startswith('Vaccine_')
]
for parameter in vacc_parameters:
baseline_vals = baseline_prior[parameter]
reduce_R0_vals = reduce_R0_prior[parameter]
vaccination_vals = vaccination_prior[parameter]
assert not np.array_equal(baseline_vals, vaccination_vals)
assert np.array_equal(baseline_vals, reduce_R0_vals)
# Verify that R0 parameter samples are different only for the R0 reduction
# scenario.
R0_parameters = [
param for param in baseline_prior if param.startswith('R0_')
]
for parameter in R0_parameters:
baseline_vals = baseline_prior[parameter]
reduce_R0_vals = reduce_R0_prior[parameter]
vaccination_vals = vaccination_prior[parameter]
assert not np.array_equal(baseline_vals, reduce_R0_vals)
assert np.array_equal(baseline_vals, vaccination_vals)
def get_scenario_label(scenario_id):
"""
Use maths formatting for "R0" in plot axis labels.
"""
return scenario_id.replace('R0', '$R_0$')
def plot_epi_curves(results, png_file):
"""
Plot the ensemble of daily incidence curves for each scenario.
"""
fig, axs = plt.subplots(
layout='constrained', nrows=len(results), sharex=True, sharey=True
)
# Plot the epidemic curves for each scenario in separate subplots.
for ix, (scenario_id, result) in enumerate(results.items()):
ax = axs[ix]
daily_inc = result.tables['daily_inc']
time = np.unique(daily_inc['time'])
inc_curves = daily_inc['value'].reshape((len(time), -1))
ax.plot(time, inc_curves, alpha=0.02, color='black')
scenario_label = get_scenario_label(scenario_id)
ax.set_title(f'Scenario: {scenario_label}')
# Add the x-axis label to the bottom subplot.
if ix == len(results) - 1:
ax.set_xlabel('Time (days)')
# Add the y-axis label to the middle subplot.
if ix == 1:
ax.set_ylabel('Daily incidence')
fig.savefig(png_file, dpi=300, **png_kwargs())
def plot_final_sizes(results, png_file):
"""
Plot the median and 95% credible interval for final size in each scenario.
"""
ys, y_mins, y_maxs = get_final_sizes(results)
fig, ax = plt.subplots(layout='constrained')
# Plot horizontal lines for the baseline median and credible interval.
ax.axhline(ys[0], linestyle='--', color='grey', linewidth=1)
ax.axhline(y_mins[0], linestyle='--', color='grey', linewidth=1)
ax.axhline(y_maxs[0], linestyle='--', color='grey', linewidth=1)
# Plot the median and credible interval for each scenario.
y_errs = np.array([ys - y_mins, y_maxs - ys])
labels = [get_scenario_label(key) for key in results.keys()]
ax.errorbar(labels, ys, yerr=y_errs, fmt='o')
ax.set_xlabel('Scenario')
ax.set_ylabel('Total infections')
ax.set_ylim(bottom=0)
fig.savefig(png_file, dpi=300, **png_kwargs())
def get_final_sizes(results):
"""
Return the median and 95% credible interval for final size in each
scenario.
"""
ys = []
y_mins = []
y_maxs = []
probs = [0.5, 0.025, 0.975]
for result in results.values():
final_sizes = result.tables['final_sizes']['final_size']
weights = result.tables['final_sizes']['weight']
quantiles = pypfilt.stats.qtl_wt(final_sizes, weights, probs)
ys.append(quantiles[0])
y_mins.append(quantiles[1])
y_maxs.append(quantiles[2])
return (np.array(ys), np.array(y_mins), np.array(y_maxs))
def get_prevented_infections(results, relative_to):
"""
Calculate the number of infections that were prevented in each scenario,
relative to the specified baseline scenario (``relative_to``).
This returns a dictionary that maps intervention scenario names to arrays
of prevented infection counts for each particle.
"""
baseline_sizes = results[relative_to].tables['final_sizes']['final_size']
prevented_infections = {}
for scenario, result in results.items():
# Ignore the baseline scenario.
if scenario == relative_to:
continue
# Calculate the number of prevented infections for each particle,
# relative to the corresponding particle in the baseline scenario.
final_sizes = result.tables['final_sizes']['final_size']
num_prevented = baseline_sizes - final_sizes
prevented_infections[scenario] = num_prevented
return prevented_infections
def plot_prevented_infections(results, png_file):
"""
Plot the median and 95% credible interval for prevented infections in each
intervention scenario.
"""
prevented_infs = get_prevented_infections(results, relative_to='Baseline')
ys = []
y_mins = []
y_maxs = []
probs = [0.5, 0.025, 0.975]
for num_prevented in prevented_infs.values():
# NOTE: we know that each particle has the same weight, because we are
# not conditioning on any observations.
weights = np.ones(num_prevented.shape)
quantiles = pypfilt.stats.qtl_wt(num_prevented, weights, probs)
ys.append(quantiles[0])
y_mins.append(quantiles[1])
y_maxs.append(quantiles[2])
ys = np.array(ys)
y_mins = np.array(y_mins)
y_maxs = np.array(y_maxs)
fig, ax = plt.subplots(layout='constrained')
# Plot the median and credible interval for each scenario.
y_errs = np.array([ys - y_mins, y_maxs - ys])
labels = [get_scenario_label(key) for key in prevented_infs.keys()]
ax.errorbar(labels, ys, yerr=y_errs, fmt='o')
ax.set_xlabel('Scenario')
# Expand the x-axis limits to avoid plotting the two sets of results
# at the extreme left and extreme right of the plot.
padding = 0.5
ax.set_xlim(left=0 - padding, right=len(prevented_infs) - 1 + padding)
ax.set_ylabel('Prevented infections')
ax.set_ylim(bottom=0)
fig.savefig(png_file, dpi=300, **png_kwargs())
def plot_prevented_infection_correlations(contexts, results, png_file):
"""
Plot Spearman correlation coefficients between the numbers of prevented
infections in each scenario, and model parameters that characterise the
pathogen (i.e., parameters that are not related to the interventions).
"""
prevented_infs = get_prevented_infections(results, relative_to='Baseline')
samples = {
'R0': contexts['Baseline'].data['prior']['R0'],
'gamma': contexts['Baseline'].data['prior']['gamma'],
}
samples['beta'] = samples['R0'] * samples['gamma']
corrs = np.zeros((len(samples), len(prevented_infs)))
for param_ix, values in enumerate(samples.values()):
for scenario_ix, num_prevented in enumerate(prevented_infs.values()):
coeff = scipy.stats.spearmanr(num_prevented, values).statistic
corrs[param_ix, scenario_ix] = coeff
fig, ax = plt.subplots(layout='constrained', figsize=[4.8, 4.8])
ax.imshow(corrs, vmin=-1, vmax=1)
for x in range(len(prevented_infs)):
for y in range(len(samples)):
ax.text(
x,
y,
f'{corrs[y, x]: 0.3f}',
ha='center',
va='center',
color='white',
fontsize='x-large',
)
ax.set_title('Correlation with infections prevented')
ax.set_xlabel('Scenario')
ax.set_ylabel('Parameter')
labels = [get_scenario_label(key) for key in prevented_infs.keys()]
ax.set_xticks(np.arange(len(prevented_infs)), labels=labels)
ax.set_yticks(
np.arange(len(samples)),
labels=['$R_0$', r'$\gamma$', r'$\beta$'],
fontsize='large',
)
fig.savefig(png_file, dpi=300, **png_kwargs())
def plot_prevented_infection_correlations_bars(contexts, results, png_file):
"""
Plot Spearman correlation coefficients between the numbers of prevented
infections in each scenario, and model parameters that characterise the
pathogen (i.e., parameters that are not related to the interventions).
"""
prevented_infs = get_prevented_infections(results, relative_to='Baseline')
samples = {
'R0': contexts['Baseline'].data['prior']['R0'],
'gamma': contexts['Baseline'].data['prior']['gamma'],
}
samples['beta'] = samples['R0'] * samples['gamma']
corrs = np.zeros((len(samples), len(prevented_infs)))
for param_ix, values in enumerate(samples.values()):
for scenario_ix, num_prevented in enumerate(prevented_infs.values()):
coeff = scipy.stats.spearmanr(num_prevented, values).statistic
corrs[param_ix, scenario_ix] = coeff
palette = mpl.colormaps['Pastel2']
bar_width = 0.3
fig, ax = plt.subplots(layout='constrained', figsize=[4.8, 4.8])
for ix, scenario in enumerate(prevented_infs):
# Plot the correlation coefficients for this scenario as bars.
corrcoefs = corrs[:, ix]
bars = ax.barh(
np.arange(len(samples)) + ix * bar_width,
width=corrcoefs,
height=bar_width,
color=palette(ix),
label=get_scenario_label(scenario),
)
# NOTE: we position labels differently for small and large values.
# Labels for large values are placed within the bars, while labels for
# small values are placed adjacent to the bars.
large_corrcoefs = [
f'{corrcoef:0.3f}' if abs(corrcoef) > 0.2 else ''
for corrcoef in corrcoefs
]
ax.bar_label(
bars,
labels=large_corrcoefs,
padding=-40,
)
small_corrcoefs = [
f'{corrcoef:0.3f}' if abs(corrcoef) < 0.2 else ''
for corrcoef in corrcoefs
]
ax.bar_label(
bars,
labels=small_corrcoefs,
padding=8,
)
# Ensure that the x-axis spans [-1, 1] by plotting invisible points (this
# adds some padding to the axis limits).
ax.scatter(x=[-1, 1], y=[0, 0], alpha=0)
ax.set_xticks([-1, -0.5, 0, 0.5, 1.0])
# Display parameter names on the left-hand side of the plot.
ax.set_yticks(
np.arange(len(samples)) + 0.5 * bar_width,
labels=['$R_0$', r'$\gamma$', r'$\beta$'],
fontsize='large',
)
axis_linewidth = mpl.rcParams['axes.linewidth']
ax.axvline(x=0, color='black', linewidth=axis_linewidth)
ax.legend(loc='best')
ax.set_xlabel('Rank correlation with infections prevented')
fig.savefig(png_file, dpi=300, **png_kwargs())
def png_kwargs():
"""
Return a dictionary of keyword arguments for ``Figure.savefig`` that avoid
storing metadata in PNG images, to ensure that they are reproducible.
"""
return {'pil_kwargs': {'exif': None, 'pnginfo': None}}