4. How-to Guides

TODO: provide worked examples of specific tasks.

4.1. Creating your own model

Create a new subclass of pypfilt.model.Model and implement the following methods:

  • field_types(), which defines the structure of the particle state vector;
  • can_smooth(), which identifies the particle state vector elements that can be smoothed by the particle filter;
  • init(), which initialises the particle state vectors at the start of a simulation; and
  • update(), which updates the particle state vectors at each time-step.

Note

If your model class contains any internal variables that are not stored in the particle state vectors, you should also implement the resume_from_cache() method so that these variables can be properly initialised when beginning a simulation from a saved state.

It is preferable to store all necessary model variables in the particle state vectors and avoid this complication.

For example, shown below is the code for the Gaussian random walk model provided in the pypfilt.examples.simple module:

class GaussianWalk(pypfilt.Model):
    """A Gaussian random walk."""

    def init(self, ctx, vec):
        vec['x'] = ctx.data['prior']['x']

    def field_types(self, ctx):
        return [('x', np.dtype(float))]

    def update(self, ctx, t, dt, is_fs, prev, curr):
        """Perform a single time-step."""
        rnd = ctx.component['random']['model']
        curr['x'] = prev['x'] + rnd.uniform(size=curr.shape)

    def can_smooth(self):
        return {}

4.2. Creating your own observation model

Inherit from the pypfilt.obs.Univariate class to create observation models that are based on Scipy probability distributions.

For example, shown below is the code for the Gaussian random walk observation model provided in the pypfilt.examples.simple module:

class GaussianObs(pypfilt.obs.Univariate):
    """A Gaussian observation model for the GaussianWalk model."""
    def __init__(self, obs_unit, settings):
        super().__init__(obs_unit)
        self.sdev = settings['parameters']['sdev']

    def distribution(self, ctx, snapshot, ixs=None):
        expected = snapshot.vec['state_vec']['x']
        if ixs is not None:
            expected = expected[ixs]
        return scipy.stats.norm(loc=expected, scale=self.sdev)

Note

You can override any of the pypfilt.obs.Obs methods. This can be useful if you want to support incomplete observations (override log_llhd()), or need to support custom file formats when loading observations (override from_file()).

4.3. Defining prior distributions

4.4. Reading prior samples from external data files

Prior samples can be read from external data files. This allows you to use arbitrarily-complex model prior distributions, which can incorporate features such as correlations between model parameters.

Samples can be read from space-delimited text files using pypfilt.io.read_table():

[prior]
x = { external = true, table = "input-file.ssv", column = "x" }

Samples can also be read from HDF5 datasets, by specifying the file name, dataset path, and column name:

[prior]
x.external = true
x.hdf5 = "prior-samples.hdf5"
x.dataset = "data/prior-samples"
x.column = "x"

4.5. Provide models with lookup tables

4.6. Provide observation models with lookup tables

An observation model may allow some of its parameters to be defined in lookup tables, rather than as fixed values. This allows parameters to differ between particles and to vary over time.

Note

The observation model must support using lookup tables for parameter values. The example shown here uses the epifx.obs.PopnCounts observation model, which allows the observation probability to be defined in a lookup table.

To use a lookup table for an observation model parameter, the scenario must:

  • Define the lookup table by giving it a name (in this example, "pr_obs") and identifying the data file to use (in this example, "pr-obs.ssv");
  • Notify pypfilt that each particle should be associated with a column from this lookup table by setting sample_values = true (see the example below); and
  • Notify the observation model that it should use this lookup table for the observation probability, by providing it as an argument to the observation model’s constructor (in this example, the argument name is "pr_obs_lookup").

These steps are illustrated in the following TOML excerpt, which shows only the relevant lines:

[scenario.test.lookup_tables]
pr_obs = { file = "pr-obs.ssv", sample_values = true }

[scenario.test.observations.cases]
model = "epifx.obs.PopnCounts"
pr_obs_lookup = "pr_obs"

4.7. Using lookup tables in your own models

Time-varying inputs can be provided in a lookup table, and retrieved by the simulation model when updating the particle state vectors at each time-step. The example below shows how to retrieve the current value(s) for a parameter alpha from the lookup table called “alpha_table”.

def update(self, ctx, step_date, dt, is_fs, prev, curr):
    """Perform a single time-step.""""
    # Retrieve the current value(s) for the parameter alpha.
    alpha = ctx.component['lookup']['alpha_table'].lookup(time)
    # NOTE: now update the particle state vectors.
    pass

This table must be defined in the scenario definition:

[lookup_tables]
alpha_table = "alpha-values.ssv"

Note

You may want to avoid hard-coding the table name, and make the table name a model setting.

The scenario definition would need to include this new setting:

[model]
alpha_lookup = "some_table_name"

And the simulation model would use this setting in its update() method:

def update(self, ctx, step_date, dt, is_fs, prev, curr):
    """Perform a single time-step.""""
    # Retrieve the current value(s) for the parameter alpha.
    table_name = ctx.settings['model']['alpha_lookup']
    alpha = ctx.component['lookup'][table_name].lookup(time)
    # NOTE: now update the particle state vectors.
    pass

4.8. Using lookup tables in your own observation models

4.9. Defining and using parameters

4.10. Particle filter: resampling

4.11. Particle filter: post-regularisation

4.12. Creating summary tables

4.13. Creating summary monitors

4.14. Load summary tables as Pandas data frames

You can load summary tables with pypfilt.io.load_dataset(), which returns these tables as NumPy structured arrays. This can easily be extended to return summary tables as Pandas data frames:

import h5py
import pandas as pd
import pypfilt

def load_dataframe(source, dataset, time_scale):
    """
    Load a pypfilt summary table and convert it into a Pandas dataframe.

    :param source: The path to the HDF5 file.
    :param dataset: The HDF5 path to the summary table.
    :param time_scale: The simulation time scale.
    """
    with h5py.File(source, 'r') as f:
        table = pypfilt.io.load_dataset(time_scale, f[dataset])

    return pd.DataFrame(table)

You can then use this function to read summary tables from simulation output files:

output_file = 'output.hdf5'
dataset = '/tables/forecasts'
time_scale = pypfilt.Datetime()
forecasts = load_dataframe(output_file, dataset, time_scale)

4.15. Implement a continuous-time Markov chain (CTMC) model

The pypfilt.examples.sir.SirCtmc model implementation is shown below.

class SirCtmc(Model):
    """
    A continuous-time Markov chain implementation of the SIR model.

    The model settings must include the following keys:

    * ``population_size``: The number of individuals in the population.
    """

    def field_types(self, ctx):
        """
        Define the state vector structure.
        """
        return [
            # Model state variables.
            ('S', np.int_), ('I', np.int_), ('R', np.int_),
            # Model parameters.
            ('R0', np.float_), ('gamma', np.float_),
            # Next event details.
            ('next_event', np.int_), ('next_time', np.float_),
        ]

    def can_smooth(self):
        """
        The fields that can be smoothed by the post-regularisation filter.
        """
        # Return the continuous model parameters.
        return {'R0', 'gamma'}

    def init(self, ctx, vec):
        """
        Initialise the state vectors.
        """
        # Initialise the model state variables.
        population = ctx.settings['model']['population_size']
        vec['S'] = population - 1
        vec['I'] = 1
        vec['R'] = 0

        # Initialise the model parameters.
        prior = ctx.data['prior']
        vec['R0'] = prior['R0']
        vec['gamma'] = prior['gamma']

        # Select the first event for event particle.
        vec['next_time'] = 0
        vec['next_event'] = 0
        self.select_next_event(ctx, vec, stop_time=0)

    def update(self, ctx, time, dt, is_forecast, prev, curr):
        """
        Update the state vectors to account for all events that occur up to,
        and including, ``time``.
        """
        # Copy the state vectors, and update the current state.
        curr[:] = prev[:]

        # Simulate events for active particles.
        active = self.active_particles(curr, time)
        while any(active):
            # Simulate infection events.
            infections = np.logical_and(active, curr['next_event'] == 0)
            curr['S'][infections] -= 1
            curr['I'][infections] += 1

            # Simulate recovery events.
            recoveries = np.logical_and(active, curr['next_event'] == 1)
            curr['I'][recoveries] -= 1
            curr['R'][recoveries] += 1

            # Identifying which particles are still active.
            self.select_next_event(ctx, curr, stop_time=time)
            active = self.active_particles(curr, time)

    def active_particles(self, vec, stop_time):
        """
        Return a Boolean array that identifies the particles whose most recent
        event occurred no later than ``stop_time``.
        """
        return np.logical_and(
            vec['next_time'] <= stop_time,
            vec['I'] > 0,
        )

    def select_next_event(self, ctx, vec, stop_time):
        """
        Calculate the next event time and event type for each active particle.
        """
        active = self.active_particles(vec, stop_time)
        if not any(active):
            return

        # Extract state variables and parameters for the active particles.
        S = vec['S'][active]
        I = vec['I'][active]
        R0 = vec['R0'][active]
        gamma = vec['gamma'][active]
        N = ctx.settings['model']['population_size']

        # Calculate the mean rate of infection and recovery events.
        s_to_i_rate = R0 * gamma * S * I / (N - 1)
        i_to_r_rate = gamma * I
        rate_sum = s_to_i_rate + i_to_r_rate

        # Select the time of the next event.
        rng = ctx.component['random']['model']
        dt = - np.log(rng.random(S.shape)) / rate_sum
        vec['next_time'][active] += dt

        # Select the event type: False for infection and True for recovery.
        threshold = rng.random(S.shape) * rate_sum
        recovery_event = threshold > s_to_i_rate
        vec['next_event'][active] = recovery_event.astype(np.int_)

4.16. Implement a discrete-time Markov chain (CTMC) model

The pypfilt.examples.sir.SirDtmc model implementation is shown below.

class SirDtmc(Model):
    """
    A discrete-time Markov chain implementation of the SIR model.

    The model settings must include the following keys:

    * ``population_size``: The number of individuals in the population.
    """

    def field_types(self, ctx):
        """
        Define the state vector structure.
        """
        return [
            # Model state variables.
            ('S', np.int_), ('I', np.int_), ('R', np.int_),
            # Model parameters.
            ('R0', np.float_), ('gamma', np.float_),
        ]

    def can_smooth(self):
        """
        The fields that can be smoothed by the post-regularisation filter.
        """
        # Return the continuous model parameters.
        return {'R0', 'gamma'}

    def init(self, ctx, vec):
        """
        Initialise the state vectors.
        """
        # Initialise the model state variables.
        population = ctx.settings['model']['population_size']
        vec['S'] = population - 1
        vec['I'] = 1
        vec['R'] = 0

        # Initialise the model parameters.
        prior = ctx.data['prior']
        vec['R0'] = prior['R0']
        vec['gamma'] = prior['gamma']

    def update(self, ctx, time, dt, is_forecast, prev, curr):
        """
        Update the state vectors.
        """
        rng = ctx.component['random']['model']
        beta = prev['R0'] * prev['gamma']
        denom = ctx.settings['model']['population_size'] - 1

        # Calculate the rate at which *an individual* leaves S.
        s_out_rate = dt * beta * prev['I'] / denom
        # Select the number of infections.
        s_out = rng.binomial(prev['S'], - np.expm1(- s_out_rate))

        # Calculate the rate at which *an individual* leaves I.
        i_out_rate = dt * prev['gamma']
        # Select the number of recoveries.
        i_out = rng.binomial(prev['I'], - np.expm1(- i_out_rate))

        # Update the state variables.
        curr['S'] = prev['S'] - s_out
        curr['I'] = prev['I'] + s_out - i_out
        curr['R'] = prev['R'] + i_out

        # Copy the model parameters.
        curr['R0'] = prev['R0']
        curr['gamma'] = prev['gamma']

4.17. Implement an ordinary differential equation (ODE) model

The pypfilt.examples.sir.SirOdeEuler model implementation is shown below. For simplicity, it uses the forward Euler method.

class SirOdeEuler(Model):
    """
    An ordinary differential equation implementation of the SIR model, which
    uses the forward Euler method.

    The model settings must include the following keys:

    * ``population_size``: The number of individuals in the population.
    """

    def field_types(self, ctx):
        """
        Define the state vector structure.
        """
        return [
            # Model state variables.
            ('S', np.float_), ('I', np.float_), ('R', np.float_),
            # Model parameters.
            ('R0', np.float_), ('gamma', np.float_),
        ]

    def can_smooth(self):
        """
        The fields that can be smoothed by the post-regularisation filter.
        """
        # Return the continuous model parameters.
        return {'R0', 'gamma'}

    def init(self, ctx, vec):
        """
        Initialise the state vectors.
        """
        # Initialise the model state variables.
        population = ctx.settings['model']['population_size']
        vec['S'] = population - 1
        vec['I'] = 1
        vec['R'] = 0

        # Initialise the model parameters.
        prior = ctx.data['prior']
        vec['R0'] = prior['R0']
        vec['gamma'] = prior['gamma']

    def update(self, ctx, time, dt, is_forecast, prev, curr):
        """
        Update the state vectors.
        """
        # Calculate the flow rates out of S and I.
        beta = prev['R0'] * prev['gamma']
        N = ctx.settings['model']['population_size']
        s_out = dt * beta * prev['I'] * prev['S'] / N
        i_out = dt * prev['gamma'] * prev['I']

        # Update the state variables.
        curr['S'] = prev['S'] - s_out
        curr['I'] = prev['I'] + s_out - i_out
        curr['R'] = prev['R'] + i_out

        # Copy the model parameters.
        curr['R0'] = prev['R0']
        curr['gamma'] = prev['gamma']

The pypfilt.examples.sir.SirOdeRk model implementation is shown below. It uses a SciPy initial value problem (IVP) solver for ODEs.

Note

The SirOdeRk model converts the structured state vector array into a 1-dimensional float array for use with the solver. The right-hand side function (d_dt) converts this 1-dimensional array into a structured array for convenience, and then converts the output array back into a 1-dimensional float array.

Note

The SirOdeRk model requires fewer time-steps (time.steps_per_unit) than the SirOde model, because the IVP solver can divide each time-step into as many smaller steps as required.

class SirOdeRk(Model):
    """
    An ordinary differential equation implementation of the SIR model, which
    uses the explicit Runge-Kutta method of order 5(4).

    The model settings must include the following keys:

    * ``population_size``: The number of individuals in the population.
    """

    def __init__(self):
        # Record the state vector data type.
        fields = self.field_types(ctx=None)
        self.dtype = np.dtype(fields)
        self.num_fields = len(fields)

    def field_types(self, ctx):
        """
        Define the state vector structure.
        """
        return [
            # Model state variables.
            ('S', np.float_), ('I', np.float_), ('R', np.float_),
            # Model parameters.
            ('R0', np.float_), ('gamma', np.float_),
        ]

    def can_smooth(self):
        """
        The fields that can be smoothed by the post-regularisation filter.
        """
        # Return the continuous model parameters.
        return {'R0', 'gamma'}

    def init(self, ctx, vec):
        """
        Initialise the state vectors.
        """
        # Initialise the model state variables.
        population = ctx.settings['model']['population_size']
        vec['S'] = population - 1
        vec['I'] = 1
        vec['R'] = 0

        # Initialise the model parameters.
        prior = ctx.data['prior']
        vec['R0'] = prior['R0']
        vec['gamma'] = prior['gamma']

    def d_dt(self, t, xt, population):
        """
        The right-hand side of the system.
        """
        xt = xt.view(self.dtype)
        s_out = xt['R0'] * xt['gamma'] * xt['I'] * xt['S'] / population
        i_out = xt['gamma'] * xt['I']
        d_dt = np.zeros(xt.shape, dtype=xt.dtype)
        d_dt['S'] = - s_out
        d_dt['I'] = s_out - i_out
        d_dt['R'] = i_out
        return d_dt.view((np.float_, self.num_fields)).reshape(-1)

    def update(self, ctx, time, dt, is_forecast, prev, curr):
        """
        Update the state vectors.
        """
        population = ctx.settings['model']['population_size']
        prev_xt = prev.view((np.float_, self.num_fields)).reshape(-1)
        result = scipy.integrate.solve_ivp(
            self.d_dt, (time - dt, time), prev_xt, args=(population,),
            method='RK45')
        curr_xt = result.y[:, -1]
        curr[:] = curr_xt.view(self.dtype)

4.18. Implement a stochastic differential equation (SDE) model

The pypfilt.examples.sir.SirSde model implementation is shown below. It uses the Euler-Maruyama method.

class SirSde(Model):
    """
    A stochastic differential equation implementation of the SIR model.

    The model settings must include the following keys:

    * ``population_size``: The number of individuals in the population.
    """

    def field_types(self, ctx):
        """
        Define the state vector structure.
        """
        return [
            # Model state variables.
            ('S', np.float_), ('I', np.float_), ('R', np.float_),
            # Model parameters.
            ('R0', np.float_), ('gamma', np.float_),
        ]

    def can_smooth(self):
        """
        The fields that can be smoothed by the post-regularisation filter.
        """
        # Return the continuous model parameters.
        return {'R0', 'gamma'}

    def init(self, ctx, vec):
        """
        Initialise the state vectors.
        """
        # Initialise the model state variables.
        population = ctx.settings['model']['population_size']
        vec['S'] = population - 1
        vec['I'] = 1
        vec['R'] = 0

        # Initialise the model parameters.
        prior = ctx.data['prior']
        vec['R0'] = prior['R0']
        vec['gamma'] = prior['gamma']

    def update(self, ctx, time, dt, is_forecast, prev, curr):
        """
        Update the state vectors.
        """
        rng = ctx.component['random']['model']
        beta = prev['R0'] * prev['gamma']
        N = ctx.settings['model']['population_size']
        size = prev.shape

        # Calculate the mean flows out of S and I.
        s_mean = dt * beta * prev['I'] * prev['S'] / N
        i_mean = dt * prev['gamma'] * prev['I']
        # Sample the stochastic term for each flow.
        s_stoch = np.sqrt(s_mean) * rng.normal(size=size)
        i_stoch = np.sqrt(i_mean) * rng.normal(size=size)
        # Calculate the stochastic flows out of S and I, ensuring that all
        # compartments remain non-negative.
        s_out = np.clip(s_mean + s_stoch, a_min=0, a_max=prev['S'])
        i_out = np.clip(i_mean + i_stoch, a_min=0, a_max=prev['I'])

        # Update the state variables.
        curr['S'] = prev['S'] - s_out
        curr['I'] = prev['I'] + s_out - i_out
        curr['R'] = prev['R'] + i_out

        # Copy the model parameters.
        curr['R0'] = prev['R0']
        curr['gamma'] = prev['gamma']

4.19. Implement an observation model that relies on past state

The pypfilt.examples.sir.SirObs model implementation is shown below. The expected value is derived from the decrease in susceptible individuals over the observation period \(\Delta\).

class SirObs(Univariate):
    r"""
    A binomial observation model for the example SIR models.

    .. math::

       \mathcal{L}(y_t \mid x_t) &\sim B(n, p)

       n &= S(t-\Delta) - S(t)

    :param obs_unit: A descriptive name for the data.
    :param settings: The observation model settings dictionary.

    The settings dictionary should contain the following keys:

    * ``observation_period``: The observation period :math:`\Delta`.

    For example, for daily observations that capture 80% of new infections:

    .. code-block:: toml

       [observations.cases]
       model = "pypfilt.examples.sir.SirObs"
       observation_period = 1
       parameters.p = 0.8
       descriptor.format.p = "0.2f"
       descriptor.name.p = "p"
    """

    def __init__(self, obs_unit, settings):
        super().__init__(obs_unit)
        self.period = settings['observation_period']

    def new_infections(self, ctx, snapshot):
        r"""
        Return the number of new infections :math:`S(t-\Delta) - S(t)` that
        occurred during the observation period :math:`\Delta` for each
        particle.
        """
        prev = snapshot.back_n_units(self.period)
        new_infs = prev['state_vec']['S'] - snapshot.vec['state_vec']['S']
        # Round continuous values to the nearest integer.
        if not np.issubdtype(new_infs.dtype, np.int_):
            new_infs = new_infs.round().astype(np.int_)
        if np.any(new_infs < 0):
            raise ValueError('Negative number of new infections')
        return new_infs

    def distribution(self, ctx, snapshot, ixs=None):
        """
        Return the observation distribution for each particle.
        """
        prob = ctx.settings['observations'][self.unit]['parameters']['p']
        infections = self.new_infections(ctx, snapshot)
        if ixs is not None:
            infections = infections[ixs]
        return scipy.stats.binom(n=infections, p=prob)

4.20. Record only a subset of simulation model time-steps

For simulation models that require very small time-steps, it may be desirable to avoid recording the particle states at each time-step. This can be achieved by using a large time-step and dividing each time-step into a number of “mini-steps” that will not be recorded, by applying the pypfilt.model.ministeps() decorator to the simulation model’s update() method.

This decorator can be used in two ways.

  1. Providing a default number of mini-steps, which can be overridden by the scenario settings:

    class TestModelPredef(pypfilt.model.Model):
        """
        A simple monotonic deterministic process, where the number of mini-steps
        is predefined by the decorator.
        """
    
        def field_types(self, ctx):
            return [('x', np.float_)]
    
        def can_smooth(self):
            return set()
    
        def init(self, ctx, vec):
            vec['x'] = ctx.data['prior']['x']
    
        @pypfilt.model.ministeps(50)
        def update(self, ctx, step_date, dt, is_fs, prev, curr):
            rnd = ctx.component['random']['model']
            curr['x'] = prev['x'] + dt
            logger = logging.getLogger(__name__)
            logger.debug('Incrementing x by {}'.format(dt))
    
  2. Requiring the number of mini-steps to be defined in the simulation settings, by providing no default number of mini-steps:

    class TestModelSetting(pypfilt.model.Model):
        """
        A simple monotonic deterministic process, where the number of mini-steps
        must be defined in the scenario settings.
        """
    
        def field_types(self, ctx):
            return [('x', np.float_)]
    
        def can_smooth(self):
            return set()
    
        def init(self, ctx, vec):
            vec['x'] = ctx.data['prior']['x']
    
        @pypfilt.model.ministeps()
        def update(self, ctx, step_date, dt, is_fs, prev, curr):
            rnd = ctx.component['random']['model']
            curr['x'] = prev['x'] + dt
            logger = logging.getLogger(__name__)
            logger.debug('Incrementing x by {}'.format(dt))
    

In both cases, the number of mini-steps can be specified in the scenario definition:

[time]
mini_steps_per_step = 100