Source code for mc3.utils.utils

# Copyright (c) 2015-2023 Patricio Cubillos and contributors.
# mc3 is open-source software under the MIT license (see LICENSE).

__all__ = [
    'ROOT',
    'parray',
    'saveascii',
    'loadascii',
    'savebin',
    'loadbin',
    'isfile',
    'burn',
    'default_parnames',
    'tex_parameters',
]

from decimal import Decimal
import os

import numpy as np

ROOT = os.path.realpath(os.path.dirname(__file__) + '/../..') + '/'


[docs]def parray(string): """ Convert a string containin a list of white-space-separated (and/or newline-separated) values into a numpy array """ if string == 'None': return None try: # If they can be converted into doubles, do it: return np.asarray(string.split(), np.double) except: # Else, return a string array: return string.split()
[docs]def saveascii(data, filename, precision=8): """ Write (numeric) data to ASCII file. Parameters ---------- data: 1D/2D numeric iterable (ndarray, list, tuple, or combination) Data to be stored in file. filename: String File where to store the arrlist. precision: Integer Maximum number of significant digits of values. Example ------- >>> import numpy as np >>> import mc3.utils as mu >>> a = np.arange(4) * np.pi >>> b = np.arange(4) >>> c = np.logspace(0, 12, 4) >>> outfile = 'delete.me' >>> mu.saveascii([a,b,c], outfile) >>> # This will produce this file: >>> with open(outfile) as f: >>> print(f.read()) 0 0 1 3.1415927 1 10000 6.2831853 2 1e+08 9.424778 3 1e+12 """ # Force it to be a 2D ndarray: data = np.array(data, ndmin=2).T # Save arrays to ASCII file: with open(filename, 'w') as f: for parvals in data: f.write(' '.join(f'{v:9.{precision:d}g}' for v in parvals) + '\n')
[docs]def loadascii(filename): """ Extract data from file and store in a 2D ndarray (or list of arrays if not square). Blank or comment lines are ignored. Parameters ---------- filename: String Name of file containing the data to read. Returns ------- array: 2D ndarray or list See parameters description. """ # Open and read the file: lines = [] for line in open(filename, 'r'): if not line.startswith('#') and line.strip() != '': lines.append(line) # Count number of lines: npars = len(lines) # Extract values: ncolumns = len(lines[0].split()) array = np.zeros((npars, ncolumns), np.double) for i, line in enumerate(lines): array[i] = line.strip().split() array = np.transpose(array) return array
[docs]def savebin(data, filename): """ Write data variables into a numpy npz file. Parameters ---------- data: List of data objects Data to be stored in file. Each array must have the same length. filename: String File where to store the arrlist. Note ---- This wrapper around np.savez() preserves the data type of list and tuple variables when the file is open with loadbin(). Example ------- >>> import mc3.utils as mu >>> import numpy as np >>> # Save list of data variables to file: >>> datafile = 'datafile.npz' >>> indata = [np.arange(4), 'one', np.ones((2,2)), True, [42], (42, 42)] >>> mu.savebin(indata, datafile) >>> # Now load the file: >>> outdata = mu.loadbin(datafile) >>> for data in outdata: >>> print(repr(data)) array([0, 1, 2, 3]) 'one' array([[ 1., 1.], [ 1., 1.]]) True [42] (42, 42) """ # Get the number of elements to determine the key's fmt: ndata = len(data) fmt = len(str(ndata)) key = [] for i, datum in enumerate(data): dkey = 'file{:{}d}'.format(i, fmt) # Encode in the key if a variable is a list or tuple: if isinstance(datum, list): dkey += '_list' elif isinstance(datum, tuple): dkey += '_tuple' elif isinstance(datum, str): dkey += '_str' elif isinstance(datum, bool): dkey += '_bool' key.append(dkey) # Use a dictionary so savez() include the keys for each item: d = dict(zip(key, data)) np.savez(filename, **d)
[docs]def loadbin(filename): """ Read a binary npz array, casting list and tuple variables into their original data types. Parameters ---------- filename: String Path to file containing the data to be read. Return ------ data: List List of objects stored in the file. Example ------- See example in savebin(). """ # Unpack data: npz = np.load(filename) data = [] for key, val in sorted(npz.items()): data.append(val[()]) # Check if val is a str, bool, list, or tuple: if '_' in key: exec('data[-1] = ' + key[key.find('_')+1:] + '(data[-1])') return data
[docs]def isfile(input, iname, log, dtype, unpack=True, not_none=False): """ Check if an input is a file name; if it is, read it. Genereate error messages if it is the case. Parameters ---------- input: Iterable or String The input variable. iname: String Input-variable name. log: File pointer If not None, print message to the given file pointer. dtype: String File data type, choose between 'bin' or 'ascii'. unpack: Bool If True, return the first element of a read file. not_none: Bool If True, throw an error if input is None. """ # Set the loading function depending on the data type: if dtype == 'bin': load = loadbin elif dtype == 'ascii': load = loadascii else: log.error( f"Invalid data type '{dtype}', must be either 'bin' or 'ascii'", ) # Check if the input is None, throw error if requested: if input is None: if not_none: log.error(f"'{iname}' is a required argument") return None # Check that it is an iterable: if not np.iterable(input): log.error(f'{iname} must be an iterable or a file name') # Check if it is a string, a string in a list, or an array: if isinstance(input, str): ifile = input elif isinstance(input[0], str): ifile = input[0] else: return input # It is a file name: if not os.path.isfile(ifile): log.error(f"{iname} file '{ifile}' not found") if unpack: # Unpack (remove outer dimension) if necessary return load(ifile)[0] return load(ifile)
[docs]def burn(Zdict=None, burnin=None, Z=None, zchain=None, sort=True): """ Return a posterior distribution removing the burnin initial iterations of each chain from the input distribution. Parameters ---------- Zdict: dict A dictionary (as in mc3's output) containing a posterior distribution (Z) and number of iterations to burn (burnin). burnin: Integer Number of iterations to remove from the start of each chain. If specified, it overrides value from Zdict. Z: 2D float ndarray Posterior distribution (of shape [nsamples,npars]) to consider if Zdict is None. zchain: 1D integer ndarray Chain indices for the samples in Z (used only of Zdict is None). sort: Bool If True, sort the outputs by chain index. Returns ------- posterior: 2D float ndarray Burned posterior distribution. zchain: 1D integer ndarray Burned zchain array. zmask: 1D integer ndarray Indices that transform Z into posterior. Examples -------- >>> import mc3.utils as mu >>> import numpy as np >>> # Mock a posterior-distribution output: >>> Z = np.expand_dims([0., 1, 10, 20, 30, 11, 31, 21, 12, 22, 32], axis=1) >>> zchain = np.array([-1, -1, 0, 1, 2, 0, 2, 1, 0, 1, 2]) >>> Zdict = {'posterior':Z, 'zchain':zchain, 'burnin':1} >>> # Simply apply burn() into the dict: >>> posterior, zchain, zmask = mu.burn(Zdict) >>> print(posterior[:,0]) [11. 12. 21. 22. 31. 32.] >>> print(zchain) [0 0 1 1 2 2] >>> print(zmask) [ 5 8 7 9 6 10] >>> # Samples were sorted by chain index, but one can prevent with: >>> posterior, zchain, zmask = mu.burn(Zdict, sort=False) >>> print(posterior[:,0]) [11. 31. 21. 12. 22. 32.] >>> # One can also override the burn-in samples: >>> posterior, zchain, zmask = mu.burn(Zdict, burnin=0) >>> print(posterior[:,0]) [10. 11. 12. 20. 21. 22. 30. 31. 32.] >>> # Or apply directly to arrays: >>> posterior, zchain, zmask = mu.burn(Z=Z, zchain=zchain, burnin=1) >>> print(posterior[:,0]) [11. 12. 21. 22. 31. 32.] """ if Zdict is None and (Z is None or zchain is None or burnin is None): raise ValueError( 'Need to input either Zdict or all three of burnin, Z, and zchain') if Zdict is not None: Z = Zdict['posterior'] zchain = Zdict['zchain'] if burnin is None: burnin = Zdict['burnin'] mask = np.zeros_like(zchain, bool) nchains = np.amax(zchain) + 1 for c in range(nchains): mask[np.where(zchain == c)[0][burnin:]] = True if sort: zsort = np.lexsort([zchain]) zmask = zsort[np.where(mask[zsort])] else: zmask = np.where(mask)[0] # Values accepted for posterior stats: posterior = Z[zmask] zchain = zchain[zmask] return posterior, zchain, zmask
[docs]def default_parnames(npars): """ Create an array of parameter names with sequential indices. Parameters ---------- npars: Integer Number of parameters. Results ------- 1D string ndarray of parameter names. """ namelen = len(str(npars))+1 return np.array([f'param{i+1:0{namelen}d}' for i in range(npars)])
[docs]def tex_parameters( values, low_bounds, high_bounds, names=None, significant_digits=2, ): r""" Parse parameter values and +/- confidence intervals as LaTex strings with desired number of significant digits. Parameters ---------- values: 1D iterable of floats Parameter estimate values (e.g., best fits or posterior medians). If a value is None or NaN report the range from low to high. low_bounds: 1D iterable of floats Lower boundary of the parameter credible intervals. high_bounds: 1D iterable of floats Upper boundary of the parameter credible intervals. names: 1D iterable of strings If not None, prepend to each output value the parameter name (including an equal sign in between). significant_digits: Integer How many significant digits to display. Returns ------- tex_values: 1D list of strings String representation of the estimated values as LaTeX text. Examples -------- >>> import mc3.utils as mu >>> values = [9.29185155e+02, -3.25725507e+00, 8.80628658e-01] >>> lo_bounds = [5.29185155e+02, -4.02435791e+00, 6.43578351e-01] >>> hi_bounds = [1.43406714e+03, -2.76718364e+00, 9.87000918e-01] >>> # Default behavior: >>> tex_vals = mu.tex_parameters(values, lo_bounds, hi_bounds) >>> for tex in tex_vals: >>> print(tex) $929.2^{+504.9}_{-400.0}$ $-3.26^{+0.49}_{-0.77}$ $0.88^{+0.11}_{-0.24}$ >>> # Custom significant digits: >>> tex_vals = mu.tex_parameters( >>> values, lo_bounds, hi_bounds, significant_digits=1, >>> ) >>> for tex in tex_vals: >>> print(tex) $929.2^{+504.9}_{-400.0}$ $-3.3^{+0.5}_{-0.8}$ $0.9^{+0.1}_{-0.2}$ >>> # Including the name of the parameters: >>> names = [ >>> r'$T_{\rm iso}$', r'$\log\,X_{\rm H2O}$', r'$\phi_{\rm patchy}$', >>> ] >>> tex_vals = mu.tex_parameters( >>> values, lo_bounds, hi_bounds, names, >>> ) >>> for tex in tex_vals: >>> print(tex) $T_{\rm iso} = 929.2^{+504.9}_{-400.0}$ $\log\,X_{\rm H2O} = -3.26^{+0.49}_{-0.77}$ $\phi_{\rm patchy} = 0.88^{+0.11}_{-0.24}$ """ npars = len(values) tex_values = [] for k in range(npars): value = values[k] if value is None or np.isnan(value): low = low_bounds[k] high = high_bounds[k] dec_place = Decimal(low-high).adjusted() dec = np.clip(significant_digits - 1 - dec_place, 1, 10) tex_value = f'[{low:.{dec}f}, {high:.{dec}f}]' else: low = low_bounds[k] - value high = high_bounds[k] - value decs_low = Decimal(low).adjusted() decs_high = Decimal(high).adjusted() dec_place = np.min((decs_low,decs_high)) dec = np.clip(significant_digits - 1 - dec_place, 1, 10) tex_value = f'{value:>.{dec}f}' tex_low = f'{low:+.{dec}f}' tex_high = f'{high:+.{dec}f}' tex_value += f'^{{{tex_high}}}_{{{tex_low}}}' # Override if parameter is fixed: if low == high: tex_value = f'{value}' # Prepend parameter name if needed, care for math-mode characters: if names is not None: pname = names[k].strip() if pname.startswith('$') and pname.endswith('$'): prefix = f'{pname[:-1]} = ' else: prefix = f'{pname}$ = ' else: prefix = '$' tex_value = f'{prefix}{tex_value}$' tex_values.append(tex_value) return tex_values