# This file is part of pyunicorn.
# Copyright (C) 2008--2024 Jonathan F. Donges and pyunicorn authors
# URL: <https://www.pik-potsdam.de/members/donges/software-2/software>
# License: BSD (3-clause)
#
# Please acknowledge and cite the use of this software and its authors
# when results are used in publications or published elsewhere.
#
# You can use the following reference:
# J.F. Donges, J. Heitzig, B. Beronov, M. Wiedermann, J. Runge, Q.-Y. Feng,
# L. Tupikina, V. Stolbova, R.V. Donner, N. Marwan, H.A. Dijkstra,
# and J. Kurths, "Unified functional network and nonlinear time series analysis
# for complex systems science: The pyunicorn package"
"""
Provides classes for analyzing spatially embedded complex networks, handling
multivariate data and generating time series surrogates.
Written by Jakob Runge.
CMSI Method Reference: [Pompe2011]_
"""
# array object and fast numerics
import numpy
#
# Define class CouplingAnalysisPurePython
#
[docs]
class CouplingAnalysisPurePython:
"""
Contains methods to calculate coupling matrices from large arrays
of scalar time series.
Comprises linear and information theoretic measures, lagged
and directed (causal) couplings.
"""
#
# Definitions of internal methods
#
[docs]
def __init__(self, dataarray, only_tri=False, silence_level=0):
"""
Initialize an instance of CouplingAnalysisPurePython.
Possible choices for only_tri:
- "True" will calculate only the upper triangle of the coupling
matrix, excluding the diagonal, assuming symmetry (not for directed
measures)
- "False" will calculate the whole matrix (asymmetry somes from
different integration ranges)
:type dataarray: 4D, 3D or 2D Numpy array [time, index, index] or
[time, index]
:arg dataarray: The time series array with time in first dimension
:arg bool only_tri: Symmetric/asymmetric assumption on coupling matrix.
:arg int silence_level: The inverse level of verbosity of the object.
"""
# only_tri will calculate the upper triangle excluding the diagonal
# only. This assumes stationarity on the time series
self.only_tri = only_tri
# Set silence level
self.silence_level = silence_level
# Flatten observable anomaly array along lon/lat dimension to allow
# for more convinient indexing and transpose the whole array as this
# is faster in loops
if numpy.ndim(dataarray) == 4:
(self.total_time, n_lev, n_lat, n_lon) = dataarray.shape
self.N = n_lev * n_lat * n_lon
self.dataarray = dataarray.reshape(-1, self.N).T.copy()
if numpy.ndim(dataarray) == 3:
(self.total_time, n_lat, n_lon) = dataarray.shape
self.N = n_lat * n_lon
self.dataarray = dataarray.reshape(-1, self.N).T.copy()
elif numpy.ndim(dataarray) == 2:
(self.total_time, self.N) = dataarray.shape
self.dataarray = dataarray.T.copy()
else:
print("irregular array shape...")
self.dataarray = dataarray.T.copy()
# factorials below 10 in a list for permutation patterns
self.factorial = \
numpy.array([1, 1, 2, 6, 24, 120, 720, 5040, 40320, 362880])
self.patternized = False
self.has_fft = False
self.originalFFT = None
# lag_mode dict
self.lag_modi = {"all": 0, "sum": 1, "max": 2}
[docs]
def __str__(self):
"""
Return a string representation of the CouplingAnalysisPurePython
object.
"""
shape = self.dataarray.shape
return (f'CouplingAnalysisPurePython: {shape[0]} variables, '
f'{shape[1]} timesteps.')
#
# Define methods to calculate correlation strength and lags
#
#
# Routines for calculating Cross Correlation
#
[docs]
def cross_correlation(self, tau_max=0, lag_mode='all'):
"""
Returns the normalized cross correlation from all pairs of nodes from
a range of time lags.
The calculation ranges are shown below::
(-------------------------total_time--------------------------)
(---tau_max---)(---------corr_range------------)(---tau_max---)
CC is calculated about corr_range and with the other time series
shifted by tau
Possible choices for lag_mode:
- "all" will return the full function for all lags, possible large
memory need if only_tri is True, only the upper triangle contains the
values, the lower one is zeros
- "sum" will return the sum over positive and negative lags seperatly,
each inclunding tau=0 corrmat[0] is the positive sum, corrmat[1] the
negative sum
- "max" will return only the maximum coupling (in corrmat[0]) and its
lag (in corrmat[1])
:arg int tau_max: maximum lag in both directions, including last lag
:arg str lag_mode: the output mode
:rtype: 3D numpy array (float) [index, index, index]
:return: correlation matrix with different lag_mode choices
"""
# Normalize anomaly time series to zero mean and unit variance for all
# lags, array contains normalizations for all lags
corr_range = self.total_time - 2*tau_max
normalized_array = numpy.empty((2*tau_max + 1, self.N, corr_range),
dtype="float32")
for t in range(2*tau_max + 1):
# Remove mean value from time series at each vertex (grid point)
normalized_array[t] = self.dataarray[:, t:t+corr_range] - \
self.dataarray[:, t:t+corr_range].\
mean(axis=1).reshape(self.N, 1)
# Normalize the variance of anomalies to one
normalized_array[t] /= normalized_array[t].\
std(axis=1).reshape(self.N, 1)
# Correct for grid points with zero variance in their time series
normalized_array[t][numpy.isnan(normalized_array[t])] = 0
return self._calculate_cc(normalized_array, corr_range=corr_range,
tau_max=tau_max, lag_mode=lag_mode)
[docs]
def shuffled_surrogate_for_cc(self, fourier=False, tau_max=1,
lag_mode='all'):
"""
Returns a correlation matrix calculated with an independently shuffled
surrogate of the dataarray of length corr_range for all taus.
:arg int corr_range: length of sample
:arg int tau_max: maximum lag in both directions, including last lag
:arg str lag_mode: output mode
:rtype: 3D numpy array (float) [index, index, index]
:return: correlation matrix with different lag_mode choices
"""
corr_range = self.total_time - 2*tau_max
# Shuffle a copy of dataarray separatly for each node
array = numpy.copy(self.dataarray)
if fourier:
array = self.correlatedNoiseSurrogates(array)
else:
for i in range(self.N):
numpy.random.shuffle(array[i])
sample_array = numpy.zeros((1, self.N, corr_range), dtype="float32")
sample_array[0] = array[:, :corr_range]
sample_array[0] -= sample_array[0].mean(axis=1).reshape(self.N, 1)
sample_array[0] /= sample_array[0].std(axis=1).reshape(self.N, 1)
sample_array[0, numpy.isnan(sample_array[0])] = 0
res = self._calculate_cc(sample_array, corr_range=corr_range,
tau_max=0, lag_mode='all')
if lag_mode == 'all':
corrmat = numpy.repeat(res, 2*tau_max + 1, axis=0)
elif lag_mode == 'sum':
corrmat = numpy.array([abs(res[0]), abs(res[0])]) * (tau_max+1.)
elif lag_mode == 'max':
corrmat = numpy.array([abs(res[0]),
numpy.random.randint(-tau_max, tau_max+1,
(self.N, self.N))])
return corrmat
[docs]
def time_surrogate_for_cc(self, sample_range=100, tau_max=1,
lag_mode='all'):
"""
Returns a joint shuffled surrogate of the full dataarray of length
sample_range for all taus.
Used for time evolution analysis. First one initializes the
CouplingAnalysis class with the full dataarray and then this function
is called for every single surrogate.
:arg int sample_range: length of sample
:arg int tau_max: maximum lag in both directions, including last lag
:arg str lag_mode: output mode
:rtype: 3D numpy array (float) [index, index, index]
:return: correlation matrix with different lag_mode choices
"""
perm = numpy.random.permutation(
range(tau_max, self.total_time - tau_max))[:sample_range]
sample_array = numpy.empty((2*tau_max + 1, self.N, sample_range),
dtype="float32")
for t in range(2 * tau_max + 1):
tau = t - tau_max
sample_array[t] = self.dataarray[:, perm + tau]
sample_array[t] -= sample_array[t].mean(axis=1).reshape(self.N, 1)
sample_array[t] /= sample_array[t].std(axis=1).reshape(self.N, 1)
sample_array[t][numpy.isnan(sample_array[t])] = 0
return self._calculate_cc(sample_array, corr_range=sample_range,
tau_max=tau_max, lag_mode=lag_mode)
[docs]
def _calculate_cc(self, array, corr_range, tau_max, lag_mode):
"""
Returns the CC matrix.
:arg int tau_max: maximum lag in both directions, including last lag
:arg str lag_mode: output mode
:rtype: 3D numpy array (float) [index, index, index]
:return: correlation matrix with different lag_mode choices
## lag_mode dict
mode = self.lag_modi[lag_mode]
"""
# lag_mode dict
mode = self.lag_modi[lag_mode]
only_tri = int(self.only_tri)
if lag_mode == 'all':
corrmat = numpy.zeros((2*tau_max + 1, self.N, self.N),
dtype='float32')
elif lag_mode == 'sum':
corrmat = numpy.zeros((2, self.N, self.N), dtype='float32')
elif lag_mode == 'max':
corrmat = numpy.zeros((2, self.N, self.N), dtype='float32')
# loop over all node pairs, NOT symmetric due to time shifts!
for i in range(self.N-only_tri):
for j in range((i+1)*only_tri, self.N):
if mode == 2:
maxcross = 0.0
argmax = 0
# loop over taus INCLUDING the last tau value
for t in range(2*tau_max+1):
# here the actual cross correlation is calculated
crossij = (array[tau_max, i, :] * array[t, j, :]).mean()
# fill in values in matrix depending on lag_mode
if mode == 0:
corrmat[t, i, j] = crossij
elif mode == 1:
if t <= tau_max:
corrmat[1, i, j] += numpy.abs(crossij)
if t >= tau_max:
corrmat[0, i, j] += numpy.abs(crossij)
elif mode == 2:
# calculate max and argmax by comparing to previous
# value and storing max
if numpy.abs(crossij) > maxcross:
maxcross = numpy.abs(crossij)
argmax = t
if mode == 2:
corrmat[0, i, j] = maxcross
corrmat[1, i, j] = argmax - tau_max
if self.only_tri:
if lag_mode == 'all':
corrmat = corrmat + corrmat.transpose(0, 2, 1)[::-1]
elif lag_mode == 'sum':
corrmat[0] += corrmat[1].transpose()
corrmat[1] = corrmat[0].transpose()
elif lag_mode == 'max':
corrmat[0] += corrmat[0].transpose()
corrmat[1] -= corrmat[1].transpose()
return corrmat
#
# Routines for calculating Mutual Information with adaptive bins
#
[docs]
def shuffled_surrogate_for_mi(self, fourier=False, bins=16, tau_max=0,
lag_mode='all'):
"""
Returns a shuffled surrogate of normalized mutual information from all
pairs of nodes from a range of time lags.
:arg int bins: number of bins for estimating MI
:arg int tau_max: maximum lag in both directions, including last lag
:arg str lag_mode: output mode
:rtype: 3D numpy array (float) [index, index, index]
:return: correlation matrix with different lag_mode choices
"""
if bins < 255:
dtype = 'uint8'
else:
dtype = 'int16'
# Normalize anomaly time series to zero mean and unit variance for all
# lags, array contains normalizations for all lags
corr_range = self.total_time - 2*tau_max
# Shuffle a copy of dataarray seperatly for each node
array = numpy.copy(self.dataarray)
if fourier:
array = self.correlatedNoiseSurrogates(array)
else:
for i in range(self.N):
numpy.random.shuffle(array[i])
# get the bin quantile steps
bin_edge = numpy.ceil(corr_range/float(bins))
symbolic_array = numpy.empty((1, self.N, corr_range), dtype=dtype)
array = array[:, :corr_range]
# get the lower edges of the bins for every time series
edges = numpy.sort(array, axis=1)[:, ::bin_edge]
bins = edges.shape[1]
# This gives the symbolic time series
symbolic_array[0] = \
(array.reshape(self.N, corr_range, 1)
>= edges.reshape(self.N, 1, bins)).sum(axis=2) - 1
res = self._calculate_mi(symbolic_array, corr_range=corr_range,
bins=bins, tau_max=0, lag_mode='all')
if lag_mode == 'all':
corrmat = numpy.repeat(res, 2*tau_max + 1, axis=0)
elif lag_mode == 'sum':
corrmat = numpy.array([res[0], res[0]]) * (tau_max+1.)
elif lag_mode == 'max':
corrmat = numpy.array(
[res[0], numpy.random.randint(-tau_max, tau_max+1,
(self.N, self.N))])
return corrmat
[docs]
def time_surrogate_for_mi(self, bins=16, sample_range=100, tau_max=1,
lag_mode='all'):
"""
Returns a joint shuffled surrogate of the full dataarray of length
sample_range for all taus.
Used for time evolution analysis. First one initializes the
CouplingAnalysis class with the full dataarray and then this function
is called for every single surrogate.
:arg int sample_range: length of sample
:arg int bins: number of bins for estimating MI
:arg int tau_max: maximum lag in both directions, including last lag
:arg str lag_mode: output mode
:rtype: 3D numpy array (float) [index, index, index]
:return: correlation matrix with different lag_mode choices
"""
if bins < 255:
dtype = 'uint8'
else:
dtype = 'int16'
perm = numpy.random.permutation(
range(tau_max, self.total_time - tau_max))[:sample_range]
# get the bin quantile steps
bin_edge = numpy.ceil(sample_range/float(bins))
symbolic_array = numpy.empty((2*tau_max + 1, self.N, sample_range),
dtype=dtype)
for t in range(2*tau_max + 1):
tau = t - tau_max
array = self.dataarray[:, perm + tau]
# get the lower edges of the bins for every time series
edges = numpy.sort(array, axis=1)[:, ::bin_edge]
bins = edges.shape[1]
# This gives the symbolic time series
symbolic_array[t] = \
(array.reshape(self.N, sample_range, 1)
>= edges.reshape(self.N, 1, bins)).sum(axis=2) - 1
return self._calculate_mi(symbolic_array, corr_range=sample_range,
bins=bins, tau_max=tau_max,
lag_mode=lag_mode)
[docs]
def _calculate_mi(self, array, corr_range, bins, tau_max, lag_mode):
"""
Returns the mi matrix.
:arg int bins: number of bins for estimating MI
:arg int tau_max: maximum lag in both directions, including last lag
:arg str lag_mode: output mode
:rtype: 3D numpy array (float) [index, index, index]
:return: correlation matrix with different lag_mode choices
"""
# lag_mode dict
mode = self.lag_modi[lag_mode]
only_tri = int(self.only_tri)
# Initialize
hist2D = numpy.zeros((bins, bins), dtype="int32")
if lag_mode == 'all':
corrmat = numpy.zeros((2*tau_max + 1, self.N, self.N),
dtype='float32')
elif lag_mode == 'sum':
corrmat = numpy.zeros((2, self.N, self.N), dtype='float32')
elif lag_mode == 'max':
corrmat = numpy.zeros((2, self.N, self.N), dtype='float32')
# Precalculation of the log
gfunc = numpy.zeros(corr_range+1)
for t in range(1, corr_range + 1):
gfunc[t] = t*numpy.log(t)
# loop over all node pairs, NOT symmetric due to time shifts!
for i in range(self.N-only_tri):
for j in range((i+1)*only_tri, self.N):
if mode == 2:
maxcross = 0.0
argmax = 0
# loop over taus from -tau_max to tau_max INCLUDING the last
# tau value
for t in range(2*tau_max + 1):
tau = t - tau_max
# here the joint probability distribution is calculated
for k in range(corr_range):
indexi = array[tau_max, i, k]
indexj = array[t, j, k]
hist2D[indexi, indexj] += 1
# here the joint entropy is calculated by summing over all
# pattern combinations
jointent = 0.0
for m in range(bins):
for n in range(bins):
jointent -= gfunc[hist2D[m, n]]
hist2D[m, n] = 0
jointent /= float(corr_range)
jointent += numpy.log(float(corr_range))
# Mutual Information is...
mi = 0.0
mi = 2. * numpy.log(bins) - jointent
# norm the mi
mi /= numpy.log(bins)
# fill in values in matrix depending on lag_mode
if mode == 0:
corrmat[t, i, j] = mi
elif mode == 1:
if t <= tau_max:
corrmat[1, i, j] += mi
if t >= tau_max:
corrmat[0, i, j] += mi
elif mode == 2:
# calculate max and argmax by comparing to previous
# value and storing max
if mi > maxcross:
maxcross = mi
argmax = tau
if mode == 2:
corrmat[0, i, j] = maxcross
corrmat[1, i, j] = argmax
if self.only_tri:
if lag_mode == 'all':
corrmat = corrmat + corrmat.transpose(0, 2, 1)[::-1]
if lag_mode == 'sum':
corrmat[0] += corrmat[1].transpose()
corrmat[1] = corrmat[0].transpose()
elif lag_mode == 'max':
corrmat[0] += corrmat[0].transpose()
corrmat[1] -= corrmat[1].transpose()
return corrmat
#
# A subroutine for fourier surrogates (from J Donges)
#