# This file is part of pyunicorn.
# Copyright (C) 2008--2024 Jonathan F. Donges and pyunicorn authors
# URL: <https://www.pik-potsdam.de/members/donges/software-2/software>
# License: BSD (3-clause)
#
# Please acknowledge and cite the use of this software and its authors
# when results are used in publications or published elsewhere.
#
# You can use the following reference:
# J.F. Donges, J. Heitzig, B. Beronov, M. Wiedermann, J. Runge, Q.-Y. Feng,
# L. Tupikina, V. Stolbova, R.V. Donner, N. Marwan, H.A. Dijkstra,
# and J. Kurths, "Unified functional network and nonlinear time series analysis
# for complex systems science: The pyunicorn package"
"""
Provides classes for analyzing spatially embedded complex networks, handling
multivariate data and generating time series surrogates.
"""
# array object and fast numerics
import numpy as np
from numpy import random
# easy progress bar handling
from tqdm import trange
from ..core._ext.types import to_cy, ADJ, DEGREE, FIELD, DFIELD
from ._ext.numerics import _embed_time_series_array, _recurrence_plot, \
_twins_s, _twin_surrogates_s, _test_pearson_correlation, \
_test_mutual_information
#
# Define class Surrogates
#
[docs]
class Surrogates:
"""
Encapsulates structures and methods related to surrogate time series.
Provides data structures and methods to generate surrogate data sets from a
set of time series and to evaluate the significance of various correlation
measures using these surrogates.
More information on time series surrogates can be found in [Schreiber2000]_
and [Kantz2006]_.
"""
#
# Define internal methods
#
[docs]
def __init__(self, original_data, silence_level=1):
"""
Initialize an instance of Surrogates.
.. note::
The order of array dimensions is different from the standard of
``core``. Here it is [index, time] for reasons of computational
speed!
:type original_data: 2D array [index, time]
:arg original_data: The original time series for surrogate generation.
:arg int silence_level: The inverse level of verbosity of the object.
"""
if silence_level <= 1:
print("Generated an instance of the Surrogates class.")
# Set class variables
self.original_data = original_data
"""The original time series for surrogate generation."""
self.silence_level = silence_level
"""(string) - The inverse level of verbosity of the object."""
# Set flags
self._normalized = False
self._fft_cached = False
self._twins_cached = False
# Cache
self._twins = None
self._original_data_fft = None
[docs]
def __str__(self):
"""
Returns a string representation.
"""
return f"Surrogates: time series shape {self.original_data.shape}."
[docs]
def clear_cache(self):
"""Clean up cache."""
try:
del self._original_data_fft
del self._twins
except AttributeError:
pass
#
# Methods for testing purposes
#
[docs]
@staticmethod
def SmallTestData():
"""
Return Surrogates instance representing test a data set of 6 time
series.
:rtype: Surrogates instance
:return: a Surrogates instance for testing purposes.
"""
# Create time series
ts = np.zeros((6, 200))
for i in range(6):
ts[i, :] = np.sin(np.arange(200)*np.pi/15. + i*np.pi/2.) + \
np.sin(np.arange(200) * np.pi / 30.)
return Surrogates(original_data=ts, silence_level=2)
#
# Define methods to normalize and analyze the data
#
[docs]
@staticmethod
def normalize_time_series_array(time_series_array):
"""
:index:`Normalize <pair: normalize; time series array>` an array of
time series to zero mean and unit variance individually for each
individual time series.
**Modifies the given array in place!**
**Examples:**
>>> ts = Surrogates.SmallTestData().original_data
>>> Surrogates.SmallTestData().normalize_time_series_array(ts)
>>> r(ts.mean(axis=1))
array([ 0., 0., 0., 0., 0., 0.])
>>> r(ts.std(axis=1))
array([ 1., 1., 1., 1., 1., 1.])
:type time_series_array: 2D array [index, time]
:arg time_series_array: The time series array to be normalized.
"""
mean = time_series_array.mean(axis=1)
std = time_series_array.std(axis=1)
for i in range(time_series_array.shape[0]):
# Remove mean value from time series at each node (grid point)
time_series_array[i, :] -= mean[i]
# Normalize the standard deviation of anomalies to one
if std[i] != 0:
time_series_array[i, :] /= std[i]
[docs]
def embed_time_series_array(self, time_series_array, dimension, delay):
"""
Return a :index:`delay embedding` of all time series.
.. note::
Only works for scalar time series!
**Example:**
>>> ts = Surrogates.SmallTestData().original_data
>>> Surrogates.SmallTestData().embed_time_series_array(
... time_series_array=ts, dimension=3, delay=2)[0,:6,:]
array([[ 0. , 0.61464833, 1.14988147],
[ 0.31244015, 0.89680225, 1.3660254 ],
[ 0.61464833, 1.14988147, 1.53884177],
[ 0.89680225, 1.3660254 , 1.6636525 ],
[ 1.14988147, 1.53884177, 1.73766672],
[ 1.3660254 , 1.6636525 , 1.76007351]])
:type time_series_array: 2D array [index, time]
:arg time_series_array: The time series array to be normalized.
:arg int dimension: The embedding dimension.
:arg int delay: The embedding delay.
:rtype: 3D array [index, time, dimension]
:return: the embedded time series.
"""
if self.silence_level <= 1:
print(f"Embedding all time series in dimension {dimension} "
f"and with lag {delay} ...")
(N, n_time) = time_series_array.shape
embedding = np.empty(
(N, n_time - (dimension - 1)*delay, dimension), dtype=DFIELD)
_embed_time_series_array(
N, n_time, dimension, delay,
to_cy(time_series_array, DFIELD), embedding)
return embedding
# FIXME: I(wb) included the line
# dimension = embedding.shape[1]
# whose missing caused an error. I can't guarantee if it is correct.
[docs]
def recurrence_plot(self, embedding, threshold):
"""
Return the :index:`recurrence plot <pair: recurrence plot; time
series>` from an embedding of a time series.
Uses supremum norm.
:type embedding: 2D array [time, dimension]
:arg embedding: The embedded time series.
:arg float threshold: The recurrence threshold.
:rtype: 2D array [time, time]
:return: the recurrence matrix.
"""
if self.silence_level <= 1:
print("Calculating the recurrence plot...")
n_time = embedding.shape[0]
dimension = embedding.shape[1]
R = np.ones((n_time, n_time), dtype=ADJ)
_recurrence_plot(n_time, dimension, threshold,
to_cy(embedding, FIELD), R)
return R
# FIXME: I(wb) included the line
# dimension = embedding_array.shape[2]
# whose missing caused an error. I can't guarantee if it is correct.
[docs]
def twins(self, embedding_array, threshold, min_dist=7):
"""
Return list of the :index:`twins <pair: twins; surrogates>` of each
state vector for all time series.
Two state vectors are said to be twins if they share the same
recurrences, i.e., if the corresponding rows or columns in the
recurrence plot are identical.
References: [Thiel2006]_, [Marwan2007]_.
:type embedding_array: 3D array [index, time, dimension]
:arg embedding_array: The embedded time series array.
:arg float threshold: The recurrence threshold.
:arg number min_dist: The minimum temporal distance for twins.
:rtype: [[number]]
:return: the list of twins for each state vector in the time series.
"""
if self.silence_level <= 1:
print("Finding twins...")
N = embedding_array.shape[0]
n_time = embedding_array.shape[1]
dimension = embedding_array.shape[2]
twins = []
# Initialize the R matrix with ones
R = np.empty((n_time, n_time), dtype=ADJ)
# Initialize array to store the number of neighbors for each sample
nR = np.empty(n_time, dtype=DEGREE)
_twins_s(N, n_time, dimension, threshold, min_dist,
to_cy(embedding_array, DFIELD), R, nR, twins)
return twins
#
# Define methods to generate sets of surrogate time series
#
[docs]
def white_noise_surrogates(self, original_data):
"""
Return a shuffled copy of a time series array.
Each time series is shuffled individually. The surrogates correspond to
realizations of white noise consistent with the :attr:`original_data`
time series' amplitude distribution.
**Example** (Distributions of white noise surrogates should the same as
for the original data):
>>> ts = Surrogates.SmallTestData().original_data
>>> surrogates = Surrogates.\
SmallTestData().white_noise_surrogates(ts)
>>> np.allclose(np.histogram(ts[0,:])[0],
... np.histogram(surrogates[0,:])[0])
True
:type original_data: 2D array [index, time]
:arg original_data: The original time series.
:rtype: 2D array [index, time]
:return: The surrogate time series.
"""
if self.silence_level <= 1:
print("Generating white noise surrogates by random shuffling...")
# Generate reference to shuffle function
shuffle = random.shuffle
surrogates = original_data.copy()
for i in range(surrogates.shape[0]):
shuffle(surrogates[i, :])
return surrogates
[docs]
def AAFT_surrogates(self, original_data):
"""
Return surrogates using the amplitude adjusted Fourier transform
method.
Reference: [Schreiber2000]_
:type original_data: 2D array [index, time]
:arg original_data: The original time series.
:rtype: 2D array [index, time]
:return: The surrogate time series.
"""
# Create sorted Gaussian reference series
gaussian = random.randn(original_data.shape[0], original_data.shape[1])
gaussian.sort(axis=1)
# Rescale data to Gaussian distribution
ranks = original_data.argsort(axis=1).argsort(axis=1)
rescaled_data = np.zeros(original_data.shape)
for i in range(original_data.shape[0]):
rescaled_data[i, :] = gaussian[i, ranks[i, :]]
# Phase randomize rescaled data
phase_randomized_data = \
self.correlated_noise_surrogates(rescaled_data)
# Rescale back to amplitude distribution of original data
sorted_original = original_data.copy()
sorted_original.sort(axis=1)
ranks = phase_randomized_data.argsort(axis=1).argsort(axis=1)
for i in range(original_data.shape[0]):
rescaled_data[i, :] = sorted_original[i, ranks[i, :]]
return rescaled_data
[docs]
def refined_AAFT_surrogates(self, original_data, n_iterations,
output="true_amplitudes"):
"""
Return surrogates using the iteratively refined amplitude adjusted
Fourier transform method.
A set of AAFT surrogates (:meth:`AAFT_surrogates`) is iteratively
refined to produce a closer match of both amplitude distribution and
power spectrum of surrogate and original data.
Reference: [Schreiber2000]_
:type original_data: 2D array [index, time]
:arg original_data: The original time series.
:arg int n_iterations: Number of iterations / refinement steps
:arg str output: Type of surrogate to return. "true_amplitudes":
surrogates with correct amplitude distribution, "true_spectrum":
surrogates with correct power spectrum, "both": return both outputs
of the algorithm.
:rtype: 2D array [index, time]
:return: The surrogate time series.
"""
# Get size of dimensions
n_time = original_data.shape[1]
# Get Fourier transform of original data with caching
if self._fft_cached:
fourier_transform = self._original_data_fft
else:
fourier_transform = np.fft.rfft(original_data, axis=1)
self._original_data_fft = fourier_transform
self._fft_cached = True
# Get Fourier amplitudes
original_fourier_amps = np.abs(fourier_transform)
# Get sorted copy of original data
sorted_original = original_data.copy()
sorted_original.sort(axis=1)
# Get starting point / initial conditions for R surrogates
# (see [Schreiber2000]_)
R = self.AAFT_surrogates(original_data)
# Start iteration
for i in range(n_iterations):
# Get Fourier phases of R surrogate
r_fft = np.fft.rfft(R, axis=1)
r_phases = r_fft / np.abs(r_fft)
# Transform back, replacing the actual amplitudes by the desired
# ones, but keeping the phases exp(iψ(i)
s = np.fft.irfft(original_fourier_amps * r_phases, n=n_time,
axis=1)
# Rescale to desired amplitude distribution
ranks = s.argsort(axis=1).argsort(axis=1)
for j in range(original_data.shape[0]):
R[j, :] = sorted_original[j, ranks[j, :]]
if output == "true_amplitudes":
return R
elif output == "true_spectrum":
return s
elif output == "both":
return (R, s)
else:
return (R, s)
[docs]
def twin_surrogates(self, original_data, dimension, delay, threshold,
min_dist=7):
"""
Return surrogates using the twin surrogate method.
Scalar twin surrogates are created by isolating the first component
(dimension) of the twin surrogate trajectories.
Twin surrogates share linear and nonlinear properties with the original
time series, since they correspond to realizations of trajectories of
the same dynamical systems with different initial conditions.
References: [Thiel2006]_ [*], [Marwan2007]_.
The twin lists of all time series are cached to facilitate a faster
generation of several surrogates for each time series. Hence,
:meth:`clear_cache` has to be called before generating twin surrogates
from a different set of time series!
:type original_data: 2D array [index, time]
:arg original_data: The original time series.
:arg int dimension: The embedding dimension.
:arg int delay: The embedding delay.
:arg float threshold: The recurrence threshold.
:arg number min_dist: The minimum temporal distance for twins.
:rtype: 2D array [index, time]
:return: the twin surrogates.
"""
# The algorithm proceeds in several steps:
# 1. Embed the original_data time series, using time delay embedding
# for simplicity. Use the same dimension and time delay delay for
# all time series for simplicity. Determine delay using time
# delayed mutual information and d using false nearest neighbors
# methods.
# 2. Use the algorithm proposed in [*] to find twins
# 3. Reconstruct one-dimensional twin surrogate time series
(N, n_time) = original_data.shape
n_time = n_time - (dimension-1)*delay
# Make sure that twins are calculated only once
if self._twins_cached:
twins = self._twins
else:
embedding = self.embed_time_series_array(original_data,
dimension, delay)
twins = self.twins(embedding, threshold, min_dist)
self._twins = twins
self._twins_cached = True
return _twin_surrogates_s(N, n_time, twins,
to_cy(original_data, DFIELD))
#
# Defines methods to generate correlation measure matrices based on
# original_data and surrogate data for significance testing.
#
[docs]
@staticmethod
def eval_fast_code(function, original_data, surrogates):
"""
Evaluate performance of fast and slow versions of algorithms.
Designed for evaluating fast and dirty C code against cleaner code
using Blitz arrays. Does some profiling and returns the total error
between the results.
:type function: Python function
:arg function: The function to be evaluated.
:type original_data: 2D array [index, time]
:arg original_data: The original time series.
:type surrogates: 2D array [index, time]
:arg surrogates: The surrogate time series.
:return float: The total squared difference between resulting matrices.
"""
# Some profiling
# profile.run("fastResult = function(original_data, surrogates,
# fast=True)")
# profile.run("slowResult = function(original_data, surrogates,
# fast=False)")
fast_result = function(original_data, surrogates, fast=True)
slow_result = function(original_data, surrogates, fast=False)
# Return error
return np.sqrt(((fast_result - slow_result)**2).sum())
[docs]
@staticmethod
def test_pearson_correlation(original_data, surrogates):
"""
Return a test matrix of the Pearson correlation coefficient (zero lag).
The test matrix's entry :math:`(i,j)` contains the Pearson correlation
coefficient between original time series i and surrogate time series j
at lag zero. The resulting matrix is useful for significance tests
based on the Pearson correlation matrix of the original data.
.. note::
Assumes, that original_data and surrogates are already normalized.
:type original_data: 2D array [index, time]
:arg original_data: The original time series.
:type surrogates: 2D array [index, time]
:arg surrogates: The surrogate time series.
:rtype: 2D array [index, index]
:return: the Pearson correlation test matrix.
"""
(N, n_time) = original_data.shape
return _test_pearson_correlation(to_cy(original_data, DFIELD),
to_cy(surrogates, DFIELD),
N, n_time)
#
# Define methods to perform significance tests on correlation measures
# based on surrogates.
#
[docs]
def original_distribution(self, test_function, original_data, n_bins=100):
"""
Return a normalized histogram of a similarity measure matrix.
The absolute value of the similarity measure is used, since only the
degree of similarity was of interest originally.
:type test_function: Python function
:arg test_function: The function implementing the similarity measure.
:type original_data: 2D array [index, time]
:arg original_data: The original time series.
:arg int n_bins: The number of bins for estimating prob. distributions.
:rtype: tuple of 1D arrays ([bins],[bins])
:return: the similarity measure histogram and lower bin boundaries.
"""
if self.silence_level <= 1:
print("Estimating probability density distribution of "
"original_data ...")
# Normalize original_data time series to zero mean and unit variance
if not self._normalized:
self.normalize_time_series_array(original_data)
self._normalized = True
correlation_measure = np.abs(test_function(original_data,
original_data))
(hist, lbb) = np.histogram(correlation_measure, n_bins, density=True)
# Normalize
hist /= hist.sum()
lbb = lbb[:-1]
return (hist, lbb)
[docs]
def test_threshold_significance(self, surrogate_function, test_function,
realizations=1, n_bins=100,
interval=(-1, 1)):
"""
Return a test distribution for a similarity measure.
Perform a significance test on the values of a correlation measure
based on original_data time series and surrogate data. Returns a
density estimate (histogram) of the absolute value of the correlation
measure over all realizations.
The resulting distribution of the values of similarity measure from
original and surrogate time series is of use for testing the
statistical significance of a selected threshold value for climate
network generation.
:type surrogate_function: Python function
:arg surrogate_function: The function implementing the surrogates.
:type test_function: Python function
:arg test_function: The function implementing the similarity measure.
:arg int realizations: The number of surrogates to be created for each
time series.
:arg int n_bins: The number of bins for estimating probability
distribution of test similarity measure.
:type interval: (float, float)
:arg interval: The range over which to estimate similarity measure
distribution.
:rtype: tuple of 1D arrays ([bins],[bins])
:return: similarity measure test histogram and lower bin boundaries.
"""
if self.silence_level <= 1:
print(f"Starting significance test based on {realizations} "
"realizations of surrogates...")
original_data = self.original_data
self._fft_cached = False
self._twins_cached = False
# Create reference to np.histogram function
numpy_hist = np.histogram
# Normalize original_data time series to zero mean and unit variance
if not self._normalized:
self.normalize_time_series_array(original_data)
self._normalized = True
# Initialize density estimate
density_estimate = np.zeros(n_bins)
for _ in trange(realizations, disable=self.silence_level > 2):
# Get the surrogate
# Mean and variance are conserved by all surrogates
surrogates = surrogate_function(original_data)
# Get the correlation measure test matrix
correlation_measure_test = np.abs(test_function(original_data,
surrogates))
# Test if correlation measure values are outside range
if correlation_measure_test.min() < interval[0]:
print("Warning! Correlation measure value left of range.")
if correlation_measure_test.max() > interval[1]:
print("Warning! Correlation measure value right of range.")
# Estimate density of current realization
(hist, lbb) = numpy_hist(correlation_measure_test, n_bins,
interval, normed=True)
# Add to density estimate over all realizations
density_estimate += hist
# Clean up (should be done automatically by Python,
# but you never know...)
del surrogates, correlation_measure_test
# Normalize density estimate
density_estimate /= density_estimate.sum()
lbb = lbb[:-1]
return (density_estimate, lbb)