Source code for pyunicorn.funcnet.coupling_analysis

# This file is part of pyunicorn.
# Copyright (C) 2008--2024 Jonathan F. Donges and pyunicorn authors
# URL: <https://www.pik-potsdam.de/members/donges/software-2/software>
# License: BSD (3-clause)
#
# Please acknowledge and cite the use of this software and its authors
# when results are used in publications or published elsewhere.
#
# You can use the following reference:
# J.F. Donges, J. Heitzig, B. Beronov, M. Wiedermann, J. Runge, Q.-Y. Feng,
# L. Tupikina, V. Stolbova, R.V. Donner, N. Marwan, H.A. Dijkstra,
# and J. Kurths, "Unified functional network and nonlinear time series analysis
# for complex systems science: The pyunicorn package"

"""
Provides classes for analyzing spatially embedded complex networks, handling
multivariate data.
Written by Jakob Runge.
"""

import numpy                        # array object and fast numerics
from scipy import special, linalg   # special math functions

# import mpi                          # parallelized computations

from ..core._ext.types import to_cy, LAG, FIELD, \
    INT16TYPE, INT32TYPE, INT64TYPE
from ._ext.numerics import _symmetrize_by_absmax, _cross_correlation_max, \
    _cross_correlation_all, _get_nearest_neighbors


#
#  Define class Coupling Analysis
#

[docs] class CouplingAnalysis: """ Contains methods to calculate coupling matrices from large arrays of scalar time series. Comprises linear and information-theoretic measures, lagged and directed couplings. """ # # Definitions of internal methods #
[docs] def __init__(self, data, silence_level=0): """ Initialize an instance of CouplingAnalysis from data array. :type data: multidimensional numpy array :arg data: The time series array with time in first dimension. :type silence_level: int >= 0 :arg silence_level: The higher, the less progress info is output. """ self.silence_level = silence_level """(int>=0) higher -> less progress info""" # Flatten array along spatial dimensions to allow # for more convinient indexing self.n_time = data.shape[0] self.data = data.reshape(self.n_time, -1) self.N = self.data.shape[1] # precalculation of p*log(p) needed for entropies self.plogp = None
[docs] def __str__(self): """Return a string representation of the CouplingAnalysis object.""" return (f'CouplingAnalysis: {self.N} variables, ' f'{self.n_time} timesteps.')
[docs] @staticmethod def test_data(): """ Return example test data as discussed in pyunicorn description paper. """ numpy.random.seed(42) noise = numpy.random.randn(1000, 4) data = noise for t in range(2, 1000): data[t, 0] = 0.8 * data[t-1, 0] + noise[t, 0] data[t, 1] = 0.8 * data[t-1, 1] + 0.5 * data[t-2, 0] + noise[t, 1] data[t, 2] = 0.7 * data[t-1, 0] + noise[t, 2] data[t, 3] = 0.7 * data[t-2, 0] + noise[t, 3] return data
[docs] def symmetrize_by_absmax(self, similarity_matrix, lag_matrix): """ Returns symmetrized similarity matrix. Computes the largest absolute value for each pair (i,j) and (j,i) and returns the in-place changed matrices of measures and lags. A negative lag for an entry (i,j) in the lag_matrix then indicates a 'direction' j --> i regarding the peak of the lag function, and vice versa for a positive lag. **Example:** >>> coup_ana = CouplingAnalysis(CouplingAnalysis.test_data()) >>> similarity_matrix, lag_matrix = coup_ana.cross_correlation( ... tau_max=2) >>> r((similarity_matrix, lag_matrix)) (array([[ 1. , 0.698 , 0.7788, 0.7535], [ 0.4848, 1. , 0.4507, 0.52 ], [ 0.6219, 0.5704, 1. , 0.5996], [ 0.4833, 0.5503, 0.5002, 1. ]]), array([[0, 2, 1, 2], [0, 0, 0, 0], [0, 2, 0, 1], [0, 2, 0, 0]])) >>> r(coup_ana.symmetrize_by_absmax(similarity_matrix, lag_matrix)) (array([[ 1. , 0.698 , 0.7788, 0.7535], [ 0.698 , 1. , 0.5704, 0.5503], [ 0.7788, 0.5704, 1. , 0.5996], [ 0.7535, 0.5503, 0.5996, 1. ]]), array([[ 0, 2, 1, 2], [-2, 0, -2, -2], [-1, 2, 0, 1], [-2, 2, -1, 0]])) :type similarity_matrix: array-like [float] :arg similarity_matrix: array-like [node, node] matrix of similarity estimates :type lag_matrix: array-like [int>=0] :arg lag_matrix: array-like [node, node] matrix of lags :rtype: tuple of arrays :returns: the value at the absolute maximum and the (pos or neg) lag. """ return _symmetrize_by_absmax(to_cy(similarity_matrix, FIELD), to_cy(lag_matrix, LAG), self.N)
# # Define methods to estimate similarity measures #
[docs] def cross_correlation(self, tau_max=0, lag_mode='max'): r""" Return cross correlation between all pairs of nodes. Two lag-modes are available (default: lag_mode='max'): lag_mode = 'all': Return 3-dimensional array of lagged cross correlations between all pairs of nodes. An entry :math:`(i, j, \tau)` corresponds to :math:`\rho(X^i_t-\tau, X^j_t)` for positive lags tau, i.e., the direction i --> j for :math:`\tau \ne 0`. lag_mode = 'max': Return matrix of absolute maxima and corresponding lags of lagged cross correlation (CC) between all pairs of nodes. Returns two usually asymmetric matrices of CC values and lags: In each matrix, an entry :math:`(i, j)` corresponds to the (positive or negative) value and lag, respectively, at absolute maximum of :math:`\rho(X^i_t-\tau, X^j_t)` for positive lags tau, i.e., the direction i --> j for :math:`\tau > 0`. The matrices are, thus, asymmetric. The function :meth:`.symmetrize_by_absmax` can be used to obtain a symmetric matrix. **Example:** >>> coup_ana = CouplingAnalysis(CouplingAnalysis.test_data()) >>> similarity_matrix, lag_matrix = coup_ana.cross_correlation( ... tau_max=5, lag_mode='max') >>> r((similarity_matrix, lag_matrix)) (array([[ 1. , 0.757 , 0.779 , 0.7536], [ 0.4847, 1. , 0.4502, 0.5197], [ 0.6219, 0.5844, 1. , 0.5992], [ 0.4827, 0.5509, 0.4996, 1. ]]), array([[0, 4, 1, 2], [0, 0, 0, 0], [0, 3, 0, 1], [0, 2, 0, 0]])) :type tau_max: int [int>=0] :arg tau_max: maximum lag of cross correlation lag function. :type lag_mode: str [('max'|'all')] :arg lag_mode: lag-mode of cross correlations to return. :rtype: 3D-array or tuple of matrices :returns: all-lag array or matrices of value and lag at the absolute maximum. """ data = self.data T, N = data.shape # Sanity checks if not isinstance(data, numpy.ndarray): raise TypeError(f"data is of type {type(data)}, " "must be numpy.ndarray") if N > T: print(f"Warning: data.shape = {data.shape}," " is it of shape (observations, variables) ?") assert numpy.isnan(data).sum() == 0, "NaNs in the data" assert tau_max >= 0, f"{tau_max =}" assert lag_mode in ['max', 'all'], f"{lag_mode =}" # Normalize time series to zero mean and unit variance for all lags corr_range = T - tau_max array = numpy.empty((tau_max + 1, N, corr_range), dtype=FIELD) for t in range(tau_max + 1): # Remove mean value from time series at each node array[t] = ( data[t:t+corr_range, :] - data[t:t+corr_range, :].mean(axis=0).reshape(1, N)).T # Normalize the variance of anomalies to one array[t] /= array[t].std(axis=1).reshape(N, 1) # Correct for nodes with zero variance in their time series array[t][numpy.isnan(array[t])] = 0 if lag_mode == 'max': return _cross_correlation_max( to_cy(array, FIELD), N, tau_max, corr_range) elif lag_mode == 'all': return _cross_correlation_all( to_cy(array, FIELD), N, tau_max, corr_range) else: return None
[docs] def mutual_information(self, tau_max=0, estimator='knn', knn=10, bins=6, lag_mode='max'): r""" Return mutual information (MI) between all pairs of nodes. Three estimators are available: estimator = 'knn' (Recommended): Based on k-nearest-neighbors [Kraskov2004]_, version 1 in their paper. Larger k have smaller variance, but larger (typically negative) bias, and vice versa. estimator = 'binning': Binning estimator based on equal-quantile binning. estimator = 'gauss': Captures only linear part of association. Essentially estimates a transformed partial correlation. Two lag-modes are available (default: lag_mode='max'): lag_mode = 'all': Return 3-dimensional array of lagged MI between all pairs of nodes. An entry :math:`(i, j, \tau)` corresponds to :math:`I(X^i_t-\tau, X^j_t)` for positive lags tau, i.e., the direction i --> j for :math:`\tau \ne 0`. lag_mode = 'max': Return matrix of absolute maxima and corresponding lags of lagged MI between all pairs of nodes. Returns two usually asymmetric matrices of MI values and lags: In each matrix, an entry :math:`(i, j)` corresponds to the value and lag, respectively, at absolute maximum of :math:`I(X^i_t-\tau, X^j_t)` for positive lags tau, i.e., the direction i --> j for :math:`\tau > 0`. The matrices are, thus, asymmetric. The function :meth:`.symmetrize_by_absmax` can be used to obtain a symmetric matrix. Reference: [Kraskov2004]_ **Example:** >>> coup_ana = CouplingAnalysis(CouplingAnalysis.test_data()) >>> similarity_matrix, lag_matrix = coup_ana.mutual_information( ... tau_max=5, knn=10, estimator='knn') >>> r(similarity_matrix) array([[ 4.6505, 0.4387, 0.4652, 0.4126], [ 0.147 , 4.6505, 0.1065, 0.1639], [ 0.2483, 0.2126, 4.6505, 0.2204], [ 0.1209, 0.199 , 0.1453, 4.6505]]) >>> lag_matrix array([[0, 4, 1, 2], [0, 0, 0, 0], [0, 2, 0, 1], [0, 2, 0, 0]], dtype=int8) :type tau_max: int [int>=0] :arg tau_max: maximum lag of MI lag function. :type knn: int [int>=1] :arg knn: nearest-neighbor MI estimation parameter. (default: 10) :type bins: int [int>=2] :arg bins: binning MI estimation parameter. (default: 6) :type estimator: str [('knn'|'binning'|'gauss')] :arg estimator: MI estimator. (default: 'knn') :type lag_mode: str [('max'|'all')] :arg lag_mode: lag-mode of MI to return. :rtype: 3D-array or tuple of matrices :returns: all-lag array or matrices of value and lag at the absolute maximum. """ data = self.data T, N = data.shape # Sanity checks if not isinstance(data, numpy.ndarray): raise TypeError(f"data is of type {type(data)}, " "must be numpy.ndarray") if N > T: print(f"Warning: data.shape = {data.shape}," " is it of shape (observations, variables) ?") if T < 500: print(f"Warning: T = {T} ," " unreliable estimation using MI estimator") assert numpy.isnan(data).sum() == 0, "NaNs in the data" assert tau_max >= 0, f"{tau_max =}" if estimator == 'knn': assert 1 <= knn <= T/2., f"{knn =}" if lag_mode == 'max': similarity_matrix = numpy.ones((N, N), dtype=FIELD) lag_matrix = numpy.zeros((N, N), dtype=LAG) elif lag_mode == 'all': lagfuncs = numpy.zeros((N, N, tau_max+1), dtype=FIELD) if estimator == 'binning': self.plogp = self.create_plogp(T) for i in range(N): for j in range(N): maximum = 0. lag_at_max = 0 for tau in range(tau_max + 1): X = [(i, -tau)] Y = [(j, 0)] Z = [] XYZ = X + Y + Z dim = len(XYZ) max_lag = tau_max array = numpy.zeros((dim, T - max_lag)) for d, node in enumerate(XYZ): var, lag = node array[d, :] = data[max_lag + lag: T + lag, var] if estimator == 'knn': xyz = numpy.array([0, 1]) k_xz, k_yz, k_z = self.get_nearest_neighbors( array=array, xyz=xyz, k=knn, standardize=True) ixy_z = (special.digamma(knn) + (- special.digamma(k_xz) - special.digamma(k_yz) + special.digamma(k_z)).mean()) elif estimator == 'binning': symb_array = self._quantile_bin_array(array, bins=bins) # High-dimensional Histogram hist = self.bincount_hist(symb_array) # Entropies by use of vectorized function plogp hxyz = (-(self.plogp(hist)).sum() + self.plogp(T))/float(T) hxz = (-(self.plogp(hist.sum(axis=1))).sum() + self.plogp(T))/float(T) hyz = (-(self.plogp(hist.sum(axis=0))).sum() + self.plogp(T))/float(T) hz = (-(self.plogp(hist.sum(axis=0).sum(axis=0))).sum() + self.plogp(T))/float(T) ixy_z = hxz + hyz - hz - hxyz elif estimator == 'gauss': # Standardize array -= array.mean(axis=1).reshape(dim, 1) array /= array.std(axis=1).reshape(dim, 1) if numpy.isnan(array).sum() != 0: raise ValueError("nans after standardizing, \ possibly constant array!") x = array[0, :] y = array[1, :] ixy_z = self._par_corr_to_cmi( numpy.dot(x, y) / numpy.sqrt( numpy.dot(x, x) * numpy.dot(y, y))) if lag_mode == 'max': if ixy_z > maximum: maximum = ixy_z lag_at_max = tau elif lag_mode == 'all': lagfuncs[i, j, tau] = ixy_z if lag_mode == 'max': similarity_matrix[i, j] = maximum lag_matrix[i, j] = lag_at_max if lag_mode == 'max': return similarity_matrix, lag_matrix elif lag_mode == 'all': return lagfuncs else: return None
[docs] def information_transfer(self, tau_max=0, estimator='knn', knn=10, past=1, cond_mode='ity', lag_mode='max'): r""" Return bivariate information transfer between all pairs of nodes. Two condition modes of information transfer are available as described in [Runge2012b]_. Information transfer to Y (ITY): .. math:: I(X^i_t-\tau, X^j_t | X^j_t-1, ...,X^j_t-past) Momentary information transfer (MIT): .. math:: I(X^i_t-\tau, X^j_t | X^j_t-1, ...,X^j_t-past, X^i_t-\tau-1, ...,X^j_t-\tau-past) Two estimators are available: estimator = 'knn' (Recommended): Based on k-nearest-neighbors [Kraskov2004]_, version 1 in their paper. Larger k have smaller variance, but larger (typically negative) bias, and vice versa. estimator = 'gauss': Captures only linear part of association. Essentially estimates a transformed partial correlation. Two lag-modes are available (default: lag_mode='max'): lag_mode = 'all': Return 3-dimensional array of lag-functions between all pairs of nodes. An entry :math:`(i, j, \tau)` corresponds to :math:`I(X^i_t-\tau, X^j_t | ...)` for positive lags tau, i.e., the direction i --> j for :math:`\tau \ne 0`. lag_mode = 'max': Return matrix of absolute maxima and corresponding lags of lag-functions between all pairs of nodes. Returns two usually asymmetric matrices of values and lags: In each matrix, an entry :math:`(i, j)` corresponds to the value and lag, respectively, at absolute maximum of :math:`I(X^i_t-\tau, X^j_t | ...)` for positive lags tau, i.e., the direction i --> j for :math:`\tau > 0`. The matrices are, thus, asymmetric. The function :meth:`.symmetrize_by_absmax` can be used to obtain a symmetric matrix. **Example:** >>> coup_ana = CouplingAnalysis(CouplingAnalysis.test_data()) >>> similarity_matrix, lag_matrix = coup_ana.information_transfer( ... tau_max=5, estimator='knn', knn=10) >>> r((similarity_matrix, lag_matrix)) (array([[ 0. , 0.1544, 0.3261, 0.3047], [ 0.0218, 0. , 0.0394, 0.0976], [ 0.0134, 0.0663, 0. , 0.1502], [ 0.0066, 0.0694, 0.0401, 0. ]]), array([[0, 2, 1, 2], [5, 0, 0, 0], [5, 1, 0, 1], [5, 0, 0, 0]])) :type tau_max: int [int>=0] :arg tau_max: maximum lag of ITY lag function. :type past: int [int>=1] :arg past: maximum lag of past history. :type knn: int [int>=1] :arg knn: nearest-neighbor ITY estimation parameter. (default: 10) :type bins: int [int>=2] :arg bins: binning ITY estimation parameter. (default: 6) :type estimator: str [('knn'|'gauss')] :arg estimator: ITY estimator. (default: 'knn') :type cond_mode: str [('ity'|'mit')] :arg cond_mode: condition mode. (default: 'ity') :type lag_mode: str [('max'|'all')] :arg lag_mode: lag-mode of ITY to return. :rtype: 3D-array or tuple of matrices :returns: all-lag array or matrices of value and lag at the absolute maximum. """ data = self.data T, N = data.shape # Sanity checks if not isinstance(data, numpy.ndarray): raise TypeError(f"data is of type {type(data)}," " must be numpy.ndarray") if N > T: print(f"Warning: data.shape = {data.shape}," " is it of shape (observations, variables) ?") if estimator == 'knn' and T < 500: print(f"Warning: T = {T} ," " unreliable estimation using knn-estimator") if numpy.isnan(data).sum() != 0: raise ValueError("NaNs in the data") if tau_max < 0: raise ValueError(f"tau_max = {tau_max}, but 0 <= tau_max") if estimator == 'knn': if knn > T/2. or knn < 1: raise ValueError(f"knn = {knn}, should be between 1 and T/2") if lag_mode == 'max': similarity_matrix = numpy.ones((N, N), dtype=FIELD) lag_matrix = numpy.zeros((N, N), dtype=LAG) elif lag_mode == 'all': lagfuncs = numpy.zeros((N, N, tau_max+1), dtype=FIELD) for i in range(N): for j in range(N): maximum = 0. lag_at_max = 0 for tau in range(tau_max + 1): X = [(i, -tau)] Y = [(j, 0)] if cond_mode == 'ity': Z = [(j, -p) for p in range(1, past + 1)] elif cond_mode == 'mit': Z = [(j, -p) for p in range(1, past + 1)] Z += [(i, -tau - p) for p in range(1, past + 1)] XYZ = X + Y + Z dim = len(XYZ) max_lag = tau_max + past array = numpy.zeros((dim, T - max_lag)) for d, node in enumerate(XYZ): var, lag = node array[d, :] = data[max_lag + lag: T + lag, var] if estimator == 'knn': xyz = numpy.array([0, 1]) k_xz, k_yz, k_z = self.get_nearest_neighbors( array=array, xyz=xyz, k=knn, standardize=True) ixy_z = (special.digamma(knn) + (- special.digamma(k_xz) - special.digamma(k_yz) + special.digamma(k_z)).mean()) elif estimator == 'gauss': if numpy.isnan(array).sum() != 0: raise ValueError("nans in the array!") # Standardize array -= array.mean(axis=1).reshape(dim, 1) array /= array.std(axis=1).reshape(dim, 1) if numpy.isnan(array).sum() != 0: raise ValueError("nans after standardizing, \ possibly constant array!") x = array[0, :] y = array[1, :] if len(array) > 2: confounds = array[2:, :] ortho_confounds = linalg.qr(confounds.T.copy(), mode='economic')[0].T x -= numpy.dot(numpy.dot(ortho_confounds, x), ortho_confounds) y -= numpy.dot(numpy.dot(ortho_confounds, y), ortho_confounds) ixy_z = self._par_corr_to_cmi( numpy.dot(x, y) / numpy.sqrt( numpy.dot(x, x) * numpy.dot(y, y))) if lag_mode == 'max': if ixy_z > maximum: maximum = ixy_z lag_at_max = tau elif lag_mode == 'all': lagfuncs[i, j, tau] = ixy_z if lag_mode == 'max': similarity_matrix[i, j] = maximum lag_matrix[i, j] = lag_at_max if lag_mode == 'max': similarity_matrix[range(N), range(N)] = 0. elif lag_mode == 'all': lagfuncs[range(N), range(N), 0.] = 0. if lag_mode == 'max': return similarity_matrix, lag_matrix elif lag_mode == 'all': return lagfuncs else: return None
# # Define helper methods #
[docs] @staticmethod def _par_corr_to_cmi(par_corr): """ Transformation of partial correlation to conditional mutual information scale using the (multivariate) Gaussian assumption. :type par_corr: float or array :arg par_corr: partial correlation :rtype: float :returns: transformed partial correlation. """ return -0.5*numpy.log(1. - par_corr**2)
[docs] @staticmethod def get_nearest_neighbors(array, xyz, k, standardize=True): """ Returns nearest-neighbors for conditional mutual information estimator. Reference: [Kraskov2004]_ :type array: array (float) :arg array: data array. :type xyz: array [int(0|1|2)] :arg xyz: identifier of X, Y, Z in CMI :type k: int [int>=1] :arg k: nearest-neighbor MI estimation parameter. :type standardize: bool :arg standardize: standardize array before estimation. (default: True) :rtype: tuple of arrays :returns: nearest neighbors for each sample point. """ dim, T = array.shape if standardize: # Standardize array = array.astype(FIELD) array -= array.mean(axis=1).reshape(dim, 1) array /= array.std(axis=1).reshape(dim, 1) # If the time series is constant, return nan rather than raising # Exception if numpy.isnan(array).sum() != 0: raise ValueError("nans after standardizing, possibly \ constant array!") # Add noise to destroy ties... array += 1E-10 * numpy.random.rand(dim, T) dim_x = int(numpy.where(xyz == 0)[0][-1] + 1) dim_y = int(numpy.where(xyz == 1)[0][-1] + 1 - dim_x) # dim_z = maxdim - dim_x - dim_y return _get_nearest_neighbors( to_cy(array, FIELD), dim, T, dim_x, dim_y, k)
[docs] @staticmethod def _quantile_bin_array(array, bins=6): """ Returns symbolified array with aequi-quantile binning. This partition results in a uniform distribution of the marginals. :type array: array :arg array: data :type bins: int :arg bins: number of bins :rtype: array :returns: converted data """ dim, T = array.shape # get the bin quantile steps bin_edge = numpy.ceil(T/float(bins)) symb_array = numpy.zeros((dim, T), dtype=INT32TYPE) # get the lower edges of the bins for every time series edges = numpy.sort(array, axis=1)[:, ::bin_edge] bins = edges.shape[1] # This gives the symbolic time series symb_array = (array.reshape(dim, T, 1) >= edges.reshape(dim, 1, bins)).sum(axis=2) - 1 return symb_array
[docs] @staticmethod def bincount_hist(symb_array): """ Computes histogram from symbolic array. :type symb_array: array of integers :arg symb_array: symbolic data :rtype: array :returns: (unnormalized) histogram """ base = int(symb_array.max() + 1) D, T = symb_array.shape # Needed because numpy.bincount cannot process longs assert isinstance(base**D, int) assert base**D*16./8./1024.**3 < 3., ( 'Dimension exceeds 3 GB of necessary memory ' '(change this code line if you got more...)') assert D*base**D < 2**65, ( f'base = {base}, D = {D}: Histogram failed:' ' dimension D*base**D exceeds int64 data type') flathist = numpy.zeros((base**D), dtype=INT16TYPE) multisymb = numpy.zeros(T, dtype=INT64TYPE) for i in range(D): multisymb += symb_array[i, :]*base**i result = numpy.bincount(multisymb) flathist[:len(result)] += result return flathist.reshape(tuple( [base, base] + [base for i in range(D-2)])).T
[docs] @staticmethod def create_plogp(T): """ Precalculation of p*log(p) needed for entropies. :type T: int :arg T: sample length :rtype: array :returns: p*log(p) array from p=1 to p=T """ gfunc = numpy.zeros(T+1) gfunc[1:] = numpy.arange(1, T+1, 1)*numpy.log(numpy.arange(1, T+1, 1)) return numpy.vectorize(lambda t: gfunc[t])