Source code for nestcheck.dummy_data

#!/usr/bin/env python
"""
Create dummy nested sampling run data for testing.
"""

import numpy as np
import nestcheck.ns_run_utils
import nestcheck.write_polychord_output


[docs]def get_dummy_thread(nsamples, **kwargs): """Generate dummy data for a single nested sampling thread. Log-likelihood values of points are generated from a uniform distribution in (0, 1), sorted, scaled by logl_range and shifted by logl_start (if it is not -np.inf). Theta values of each point are each generated from a uniform distribution in (0, 1). Parameters ---------- nsamples: int Number of samples in thread. ndim: int, optional Number of dimensions. seed: int, optional If not False, the seed is set with np.random.seed(seed). logl_start: float, optional logl at which thread starts. logl_range: float, optional Scale factor applied to logl values. """ seed = kwargs.pop('seed', False) ndim = kwargs.pop('ndim', 2) logl_start = kwargs.pop('logl_start', -np.inf) logl_range = kwargs.pop('logl_range', 1) if kwargs: raise TypeError('Unexpected **kwargs: {0}'.format(kwargs)) if seed is not False: np.random.seed(seed) thread = {'logl': np.sort(np.random.random(nsamples)) * logl_range, 'nlive_array': np.full(nsamples, 1.), 'theta': np.random.random((nsamples, ndim)), 'thread_labels': np.zeros(nsamples).astype(int)} if logl_start != -np.inf: thread['logl'] += logl_start thread['thread_min_max'] = np.asarray([[logl_start, thread['logl'][-1]]]) return thread
[docs]def get_dummy_run(nthread, nsamples, **kwargs): """Generate dummy data for a nested sampling run. Log-likelihood values of points are generated from a uniform distribution in (0, 1), sorted, scaled by logl_range and shifted by logl_start (if it is not -np.inf). Theta values of each point are each generated from a uniform distribution in (0, 1). Parameters ---------- nthreads: int Number of threads in the run. nsamples: int Number of samples in thread. ndim: int, optional Number of dimensions. seed: int, optional If not False, the seed is set with np.random.seed(seed). logl_start: float, optional logl at which thread starts. logl_range: float, optional Scale factor applied to logl values. """ seed = kwargs.pop('seed', False) ndim = kwargs.pop('ndim', 2) logl_start = kwargs.pop('logl_start', -np.inf) logl_range = kwargs.pop('logl_range', 1) if kwargs: raise TypeError('Unexpected **kwargs: {0}'.format(kwargs)) threads = [] # set seed before generating any threads and do not reset for each thread if seed is not False: np.random.seed(seed) threads = [] for _ in range(nthread): threads.append(get_dummy_thread( nsamples, ndim=ndim, seed=False, logl_start=logl_start, logl_range=logl_range)) # Sort threads in order of starting logl so labels match labels that would # have been given processing a dead points array. N.B. this only works when # all threads have same start_logl threads = sorted(threads, key=lambda th: th['logl'][0]) for i, _ in enumerate(threads): threads[i]['thread_labels'] = np.full(nsamples, i) # Use combine_ns_runs rather than combine threads as this relabels the # threads according to their order return nestcheck.ns_run_utils.combine_threads(threads)
[docs]def get_dummy_dynamic_run(nsamples, **kwargs): """Generate dummy data for a dynamic nested sampling run. Loglikelihood values of points are generated from a uniform distribution in (0, 1), sorted, scaled by logl_range and shifted by logl_start (if it is not -np.inf). Theta values of each point are each generated from a uniform distribution in (0, 1). Parameters ---------- nsamples: int Number of samples in thread. nthread_init: int Number of threads in the inital run (starting at logl=-np.inf). nthread_dyn: int Number of threads in the inital run (starting at randomly chosen points in the initial run). ndim: int, optional Number of dimensions. seed: int, optional If not False, the seed is set with np.random.seed(seed). logl_start: float, optional logl at which thread starts. logl_range: float, optional Scale factor applied to logl values. """ seed = kwargs.pop('seed', False) ndim = kwargs.pop('ndim', 2) nthread_init = kwargs.pop('nthread_init', 2) nthread_dyn = kwargs.pop('nthread_dyn', 3) logl_range = kwargs.pop('logl_range', 1) if kwargs: raise TypeError('Unexpected **kwargs: {0}'.format(kwargs)) init = get_dummy_run(nthread_init, nsamples, ndim=ndim, seed=seed, logl_start=-np.inf, logl_range=logl_range) dyn_starts = list(np.random.choice( init['logl'], nthread_dyn, replace=True)) threads = nestcheck.ns_run_utils.get_run_threads(init) # Seed must be False here so it is not set again for each thread threads += [get_dummy_thread( nsamples, ndim=ndim, seed=False, logl_start=start, logl_range=logl_range) for start in dyn_starts] # make sure the threads have unique labels and combine them for i, _ in enumerate(threads): threads[i]['thread_labels'] = np.full(nsamples, i) run = nestcheck.ns_run_utils.combine_threads(threads) # To make sure the thread labelling is same way it would when # processing a dead points file, tranform into dead points samples = nestcheck.write_polychord_output.run_dead_birth_array(run) return nestcheck.data_processing.process_samples_array(samples)