# This file is part of pyunicorn.
# Copyright (C) 2008--2024 Jonathan F. Donges and pyunicorn authors
# URL: <https://www.pik-potsdam.de/members/donges/software-2/software>
# License: BSD (3-clause)
#
# Please acknowledge and cite the use of this software and its authors
# when results are used in publications or published elsewhere.
#
# You can use the following reference:
# J.F. Donges, J. Heitzig, B. Beronov, M. Wiedermann, J. Runge, Q.-Y. Feng,
# L. Tupikina, V. Stolbova, R.V. Donner, N. Marwan, H.A. Dijkstra,
# and J. Kurths, "Unified functional network and nonlinear time series analysis
# for complex systems science: The pyunicorn package"
"""
Provides classes for the analysis of dynamical systems and time series based
on recurrence plots, including measures of recurrence quantification
analysis (RQA) and recurrence network analysis.
"""
from math import factorial
from typing import Tuple
from collections.abc import Hashable
import numpy as np
from numpy.typing import NDArray
from ..core.cache import Cached
from ..core._ext.types import to_cy, NODE, LAG, FIELD, DFIELD
from ._ext.numerics import _embed_time_series, _manhattan_distance_matrix_rp, \
_euclidean_distance_matrix_rp, _supremum_distance_matrix_rp, \
_set_adaptive_neighborhood_size, _bootstrap_distance_matrix_manhattan, \
_bootstrap_distance_matrix_euclidean, \
_bootstrap_distance_matrix_supremum, \
_diagline_dist_missingvalues, _diagline_dist, \
_diagline_dist_sequential_missingvalues, _diagline_dist_sequential, \
_vertline_dist_missingvalues, _vertline_dist, \
_vertline_dist_sequential_missingvalues, _vertline_dist_sequential, \
_rejection_sampling, _white_vertline_dist, _twins_r, _twin_surrogates_r
[docs]
class RecurrencePlot(Cached):
"""
Class RecurrencePlot for generating and quantitatively analyzing
:index:`recurrence plots <single: recurrence plot>`.
The RecurrencePlot class supports the construction of recurrence plots
from multi-dimensional time series, optionally using embedding. Currently,
manhattan, euclidean and supremum norms are provided for measuring
distances in phase space.
Methods for calculating commonly used measures of :index:`recurrence
quantification analysis <pair: RQA; recurrence plot>` (RQA) are provided,
e.g., determinism, maximum diagonal line length and laminarity. The
definitions of these measures together with a review of the theory and
applications of recurrence plots are given in [Marwan2007]_.
**Examples:**
- Create an instance of RecurrencePlot with a fixed recurrence threshold
and without embedding::
RecurrencePlot(time_series, threshold=0.1)
- Create an instance of RecurrencePlot with a fixed recurrence threshold
in units of STD and without embedding::
RecurrencePlot(time_series, threshold_std=0.03)
- Create an instance of RecurrencePlot at a fixed (global) recurrence rate
and using time delay embedding::
RecurrencePlot(time_series, dim=3, tau=2,
recurrence_rate=0.05).recurrence_rate()
"""
#
# Internal methods
#
[docs]
def __init__(self, time_series: NDArray, metric: str = "supremum",
normalize: bool = False, missing_values: bool = False,
sparse_rqa: bool = False, silence_level: int = 0,
**kwargs):
"""
Initialize an instance of RecurrencePlot.
Either recurrence threshold ``threshold``/``threshold_std``, recurrence
rate ``recurrence_rate`` or local recurrence rate
``local_recurrence_rate`` have to be given as keyword arguments.
Embedding is only supported for scalar time series. If embedding
dimension ``dim`` and delay ``tau`` are **both** given as keyword
arguments, embedding is applied. Multidimensional time series are
processed as is by default.
:attention: The sparse_rqa feature is experimental and currently only
works for fixed threshold and the supremum metric.
:type time_series: 2D array (time, dimension)
:arg time_series: The time series to be analyzed, can be scalar or
multi-dimensional.
:arg str metric: The metric for measuring distances in phase space
("manhattan", "euclidean", "supremum").
:arg bool normalize: Decide whether to normalize the time series to
zero mean and unit standard deviation.
:arg bool missing_values: Toggle special treatment of missing values in
:attr:`.RecurrencePlot.time_series`.
:arg bool sparse_rqa: Toggles sequential RQA computation using less
memory for use with long time series.
:arg bool skip_recurrence: Skip calculation of recurrence matrix within
RP class (e.g. when overloading respective methods in child class)
:arg int silence_level: Inverse level of verbosity of the object.
:arg number threshold: The recurrence threshold keyword for generating
the recurrence plot using a fixed threshold.
:arg number threshold_std: The recurrence threshold keyword for
generating the recurrence plot using a fixed threshold in units of
the time series' STD.
:arg number recurrence_rate: The recurrence rate keyword for generating
the recurrence plot using a fixed recurrence rate.
:arg number local_recurrence_rate: The local recurrence rate keyword
for generating the recurrence plot using a fixed local recurrence
rate (same number of recurrences for each state vector).
:arg number adaptive_neighborhood_size: The adaptive neighborhood size
parameter for generating recurrence plots based on the algorithm in
[Xu2008]_.
:arg number dim: The embedding dimension.
:arg number tau: The embedding delay.
"""
# Set silence_level
self.silence_level = silence_level
"""The inverse level of verbosity of the object."""
# Set missing_values flag
self.missing_values = missing_values
"""Controls special treatment of missing values in
:attr:`.RecurrencePlot.time_series`."""
# Set sparse RQA flag
self.sparse_rqa = sparse_rqa
"""Controls sequential calculation of RQA measures."""
# Store time series
self.time_series = to_cy(time_series, FIELD)
"""The time series from which the recurrence plot is constructed."""
# Reshape time series
self.time_series.shape = (self.time_series.shape[0], -1)
# Store type of metric
self._known_metrics = ("manhattan", "euclidean", "supremum")
assert metric in self._known_metrics
self.metric = metric
"""The metric used for measuring distances in phase space."""
# minimal denominator for numerical stability
self._epsilon = 1e-08
# Normalize time series
if normalize:
self.normalize_time_series(self.time_series)
# Get embedding dimension and delay from **kwargs
self.dim = kwargs.get("dim")
self.tau = kwargs.get("tau")
self.N: int = 0
"""The number of state vectors (number of lines and rows) of the RP."""
self.R = None
"""The recurrence matrix."""
self._mut_embedding: int = 0
if (self.dim is not None) and (self.tau is not None):
# Embed the time series
self.embedding = self.embed_time_series(
self.time_series, self.dim, self.tau)
else:
self.embedding = self.time_series
# Get missing value indices
if self.missing_values:
self.missing_value_indices = \
np.isnan(self.embedding).sum(axis=1) != 0
# Get threshold or recurrence rate from **kwargs, construct recurrence
# plot accordingly
self.threshold = kwargs.get("threshold")
self.threshold_std = kwargs.get("threshold_std")
# Make sure not to overwrite the method recurrence_rate()
recurrence_rate = kwargs.get("recurrence_rate")
self.local_recurrence_rate = kwargs.get("local_recurrence_rate")
self.adaptive_neighborhood_size = \
kwargs.get("adaptive_neighborhood_size")
# Precompute recurrence matrix only if sequential RQA is switched off,
# and not calling from child class with respective overriding methods.
skip_recurrence = kwargs.get("skip_recurrence")
if not sparse_rqa and not skip_recurrence:
if self.threshold is not None:
# Calculate the recurrence matrix R using the radius of
# neighborhood threshold
RecurrencePlot.set_fixed_threshold(self, self.threshold)
elif self.threshold_std is not None:
# Calculate the recurrence matrix R using the radius of
# neighborhood threshold in units of the time series' STD
RecurrencePlot.set_fixed_threshold_std(
self, self.threshold_std)
elif recurrence_rate is not None:
# Calculate the recurrence matrix R using a fixed recurrence
# rate
RecurrencePlot.set_fixed_recurrence_rate(
self, recurrence_rate)
elif self.local_recurrence_rate is not None:
# Calculate the recurrence matrix R using a fixed local
# recurrence rate
RecurrencePlot.set_fixed_local_recurrence_rate(
self, self.local_recurrence_rate)
elif self.adaptive_neighborhood_size is not None:
# Calculate the recurrence matrix R using the adaptive
# neighborhood size algorithm in [Xu2008]_
RecurrencePlot.set_adaptive_neighborhood_size(
self, self.adaptive_neighborhood_size)
else:
raise NameError("Please give either threshold or \
recurrence_rate to construct the recurrence \
plot!")
[docs]
def __cache_state__(self) -> Tuple[Hashable, ...]:
return (self._mut_embedding,)
[docs]
def __str__(self):
"""
Returns a string representation.
"""
return ("RecurrencePlot: "
f"time series shape {self.time_series.shape}.\n"
f"Embedding dimension {self.dim if self.dim else 0}\n"
f"Threshold {self.threshold}, {self.metric} metric")
@property
def embedding(self) -> np.ndarray:
"""
The embedded time series / phase space trajectory
(time, embedding dimension).
"""
return self._embedding
@embedding.setter
def embedding(self, embedding: np.ndarray):
self._embedding = to_cy(embedding, DFIELD)
self.N = self._embedding.shape[0]
self._mut_embedding += 1
#
# Service methods
#
[docs]
def recurrence_matrix(self):
"""
Return the current recurrence matrix :math:`R`.
:rtype: 2D square Numpy array
:return: the current recurrence matrix :math:`R`.
"""
if not self.sparse_rqa:
return self.R
else:
print("Exception: Sequential RQA mode is enabled. "
"Recurrence matrix is not stored in memory.")
return None
[docs]
def distance_matrix(self, metric: str):
"""
Return phase space distance matrix :math:`D` according to the chosen
metric.
:arg str metric: The metric for measuring distances in phase space
("manhattan", "euclidean", "supremum").
:rtype: 2D square array
:return: the phase space distance matrix :math:`D`
"""
assert metric in self._known_metrics, f"unknown metric: {metric}"
return getattr(RecurrencePlot, f"{metric}_distance_matrix")(self)
#
# Time series handling methods
#
[docs]
@staticmethod
def normalize_time_series(time_series):
"""
:index:`Normalize <pair: normalize; time series>` each component of a
time series **in place**.
Works also for complex valued time series.
.. note::
Modifies the given array in place!
:type time_series: 2D array (time, dimension)
:arg time_series: The time series to be normalized.
"""
# Get number of components of time_series
dim = time_series.shape[1]
mean = time_series.mean(axis=0)
std = time_series.std(axis=0)
# Normalize all components separately
for i in range(dim):
time_series[:, i] -= mean[i]
if std[i] != 0:
time_series[:, i] /= std[i]
# Heitzig:
[docs]
@staticmethod
def legendre_coordinates(x, dim=3, t=None, p=None, tau_w="est"):
"""
Return a phase space trajectory reconstructed using orthogonal
polynomial filters.
The reconstructed state vector components are the zero-th to (dim-1)-th
derivatives of the (possibly irregularly spaced) time series x
as estimated by folding with the orthogonal polynomial filters that
correspond to the sequence of measurement time points t.
This is a generalization for irregularly spaced time series
of the "Legendre coordinates" introduced in Gibson et al. (1992).
:arg array-like x: Time series values
:arg int dim: Dimension > 0 of reconstructed phase space. Default: 3
:type t: array-like or None
:arg t: Optional array of measurement time points corresponding to the
values in x. Default: [0,...,x.size-1]
:type p: int > 0 or None
:arg p: No. of past and future time points to use for the estimation.
Default: dim or determined by tau_w if given
:type tau_w: float > 0 or "est" or None
:arg tau_w: Optional (average) window width to use in determining p
when p = None. Following Gibson et al. (1992), this should be about
sqrt(3/< x**2 >) * std(x), or about a quarter period. If "est",
this is estimated iteratively, starting with
4 * (max(t)-min(t)) / (N-1) and estimating x' from that.
:rtype: 2D array [observation index, dimension index]
:return: Estimated derivatives. Rows are reconstructed state vectors.
"""
x = np.array(x).flatten()
N = x.size
# time points:
if t is None:
t = np.arange(N)
if p is None:
if tau_w == "est":
tau_w = 4 * (t.max() - t.min()) / (N-1)
for i in range(5):
y0 = RecurrencePlot.legendre_coordinates(x, dim=2, t=t,
tau_w=tau_w)
tau_w = np.sqrt(3*x.var()/(y0[:, 1]**2).mean())
print("tau_w set to", tau_w)
if tau_w is None:
p = dim
else:
p = 1
while (t[2*p+1:] - t[:-(2*p+1)]).mean() < tau_w and p < N/4:
p += 1
print("p set to", p)
m = 2*p + 1
N1 = N - m + 1
# time differences:
dt = np.zeros((N1, m))
for i in range(N1):
dt[i, :] = t[i:i+m] - t[i+p]
# filter weights
# = recursively computed values of orthogonal polynomials:
r = np.zeros((N1, dim, m))
for j in range(dim):
r[:, j, :] = dt**j - (
r[:, :j, :] * ((dt**j).reshape((N1, 1, m)) * r[:, :j, :]).sum(
axis=2).reshape((N1, j, 1))).sum(axis=1)
r[:, j, :] /= np.sqrt((r[:, j, :]**2).sum(axis=1)).reshape((N1, 1))
for j in range(dim):
r[:, j, :] *= factorial(j) / \
(r[:, j, :] * dt**j).sum(axis=1).reshape((-1, 1))
# reconstructed state vectors = filtered time series values:
y = np.zeros((N1, dim))
for i in range(N1):
y[i, :] = (r[i, :, :]*x[i:i+m].reshape((1, m))).sum(axis=1)
return y
[docs]
@staticmethod
def embed_time_series(time_series, dim, tau):
"""
Return a time series' delay embedding.
Returns a Numpy array containing a delay embedding of the time series
using embedding dimension dim and time delay tau.
:type time_series: 1D array
:arg time_series: The scalar time series to be embedded.
:arg int dim: The embedding dimension.
:arg int tau: The embedding delay.
:rtype: 2D array (time, dimension)
:return: the embedded phase space trajectory.
"""
# Make sure that dim and tau are Python integers
dim = int(dim)
tau = int(tau)
time_series = to_cy(time_series, FIELD)
if time_series.ndim > 1:
time_series = time_series.squeeze(axis=-1)
assert time_series.ndim == 1
n_time = time_series.shape[0]
embedding = np.empty((n_time - (dim - 1) * tau, dim), dtype=FIELD)
_embed_time_series(n_time, dim, tau, time_series, embedding)
return embedding
[docs]
def permutation_entropy(self, normalize=True):
"""
Returns the permutation entropy for an embedded time series.
An embedding of 3 <= embedding dimension <= 7 is recommended for this
method.
Reference: [Bandt2002]_
:rtype: double
:return: Permutation entropy of the embedded time series
"""
if self.dim is None or self.tau is None:
raise ValueError("Permutation Entropy only works for "
"one-dimensional embedded time series!")
# Calculate order from embedding
temp = self.embedding.argsort(axis=1)
ranks = temp.argsort(axis=1)
# Calculate probability distribution
P = np.unique(ranks, return_counts=True, axis=0)[1]
P = P / ranks.shape[0]
entropy = abs(-(P*np.log2(P)).sum())
if normalize:
entropy = entropy / np.log2(factorial(self.dim))
return entropy
[docs]
def complexity_entropy(self):
"""
Returns the complexity entropy for each dimension of the time series.
Reference: [Ribeiro2011]_
:rtype: double
:return: Complexity entropy of the embedded time series
"""
if self.dim is None or self.tau is None:
raise ValueError("Complexity Entropy only works for "
"one-dimensional embedded time series!")
# Calculate ranks from embedding
temp = self.embedding.argsort(axis=1)
ranks = temp.argsort(axis=1)
# Calculate probability distribution
P = np.unique(ranks, return_counts=True, axis=0)[1]
P = P / ranks.shape[0]
# Calculate factorial of embedding dimension
d_fac = factorial(self.dim)
# Calculate permutation entropy
S_P = abs(-(P*np.log2(P)).sum())
# Initialize unit probability distribution
P_e = np.ones(d_fac) / d_fac
# Calculate permutation entropy of unit probability distribution
S_Pe = abs(-(P_e*np.log2(P_e)).sum())
# Calculate combined probability distribution
P_comb = np.zeros(d_fac)
P_comb[:len(P)] = P
P_comb = (P_comb + P_e) / 2
# Calculate permutation entropy of combinded probability distribution
S_PPe = abs(-(P_comb*np.log2(P_comb)).sum())
# Calculation of maximum Q split into several steps for
# readability
Q_max = (d_fac + 1) / d_fac * np.log2(d_fac + 1)
Q_max = Q_max - 2*np.log2(2*d_fac) + np.log2(d_fac)
Q_max = - Q_max / 2
Q = (S_PPe - S_P/2 - S_Pe/2) / Q_max
complexity_entropy = Q * S_P / np.log2(d_fac)
return complexity_entropy
#
# Calculate recurrence plot
#
[docs]
@Cached.method(name="the manhattan distance matrix")
def manhattan_distance_matrix(self):
"""
Return the manhattan distance matrix from an embedding of a time
series.
:rtype: 2D square array
:return: the manhattan distance matrix.
"""
(n_time, dim) = self.embedding.shape
return _manhattan_distance_matrix_rp(n_time, dim, self.embedding)
[docs]
@Cached.method(name="the euclidean distance matrix")
def euclidean_distance_matrix(self):
"""
Return the euclidean distance matrix from an embedding of a time
series.
:rtype: 2D square array
:return: the euclidean distance matrix.
"""
(n_time, dim) = self.embedding.shape
return _euclidean_distance_matrix_rp(n_time, dim, self.embedding)
[docs]
@Cached.method(name="the supremum distance matrix")
def supremum_distance_matrix(self):
"""
Return the supremum distance matrix from an embedding of a time series.
:rtype: 2D square Numpy array
:return: the supremum distance matrix.
"""
(n_time, dim) = self.embedding.shape
return _supremum_distance_matrix_rp(n_time, dim, self.embedding)
[docs]
def set_fixed_threshold(self, threshold):
"""
Set the recurrence plot to a fixed threshold.
Modifies / sets the class variables :attr:`R` and :attr:`N`
accordingly.
:arg number threshold: The recurrence threshold.
"""
if self.silence_level <= 1:
print("Calculating recurrence plot at fixed threshold...")
distance = RecurrencePlot.distance_matrix(self, self.metric)
n_time = distance.shape[0]
recurrence = np.zeros((n_time, n_time), dtype="int8")
recurrence[distance < threshold] = 1
if self.missing_values:
# Write missing value lines and rows to recurrence matrix
# NaN flag is not supported by int8 data format -> use 0 here
recurrence[self.missing_value_indices, :] = 0
recurrence[:, self.missing_value_indices] = 0
self.R = recurrence
[docs]
def set_fixed_threshold_std(self, threshold_std):
"""
Set the recurrence plot to a fixed threshold in units of the standard
deviation of the time series.
Calculates the absolute threshold and calls
:meth:`set_fixed_threshold`.
:arg number threshold_std: The recurrence threshold in units of the
standard deviation of the time series.
"""
if self.silence_level <= 1:
print("Calculating recurrence plot at fixed threshold in units of "
"time series STD...")
threshold = threshold_std * self.time_series.std()
RecurrencePlot.set_fixed_threshold(self, threshold)
[docs]
def set_fixed_recurrence_rate(self, recurrence_rate):
"""
Set the recurrence plot to a fixed recurrence rate.
Modifies / sets the class variables :attr:`R` and :attr:`N`
accordingly.
:arg number recurrence_rate: The recurrence rate.
"""
if self.silence_level <= 1:
print("Calculating recurrence plot at fixed recurrence rate...")
distance = RecurrencePlot.distance_matrix(self, self.metric)
n_time = distance.shape[0]
threshold = self.threshold_from_recurrence_rate(distance,
recurrence_rate)
recurrence = np.zeros((n_time, n_time), dtype="int8")
recurrence[distance < threshold] = 1
self.R = recurrence
[docs]
def set_fixed_local_recurrence_rate(self, local_recurrence_rate):
"""
Set the recurrence plot to a fixed local recurrence rate.
This results in a fixed number of recurrences for each state vector,
i.e., all state vectors have the same number of recurrences. Modifies
/ sets the class variables :attr:`R` and :attr:`N` accordingly.
.. note::
The resulting recurrence matrix :math:`R` is generally asymmetric!
:arg number local_recurrence_rate: The local recurrence rate.
"""
if self.silence_level <= 1:
print("Calculating recurrence plot at fixed "
"local recurrence rate...")
distance = RecurrencePlot.distance_matrix(self, self.metric)
n_time = distance.shape[0]
recurrence = np.zeros((n_time, n_time), dtype="int8")
for i in range(n_time):
# Get threshold for state vector i to obtain fixed local
# recurrence rate
local_threshold = self.threshold_from_recurrence_rate(
distance[i, :], local_recurrence_rate)
# Thresholding the distance matrix for column i
recurrence[i, distance[i, :] < local_threshold] = 1
self.R = recurrence
[docs]
def set_adaptive_neighborhood_size(self, adaptive_neighborhood_size,
order=None):
"""
Construct recurrence plot using the :index:`adaptive neighborhood
size <single: adaptive neighborhood size; recurrence plot>` algorithm
introduced in [Xu2008]_.
The exact algorithm was deduced from private correspondence with the
authors, as the description given in the above mentioned is not correct
or at least ambiguous.
Modifies / sets the class variables :attr:`R` and :attr:`N`
accordingly.
:arg number adaptive_neighborhood_size: The number of adaptive nearest
neighbors (recurrences) assigned to each state vector.
:type order: 1D array of int32
:arg order: The indices of state vectors in the order desired for
processing by the algorithm. The standard order is :math:`1,...,N`.
"""
if self.silence_level <= 1:
print("Calculating recurrence plot using the "
"adaptive neighborhood size algorithm...")
distance = RecurrencePlot.distance_matrix(self, self.metric)
# Get indices that would sort the distance matrix.
# sorted_neighbors[i,j] contains the index of the jth nearest neighbor
# of i. Sorting order is very important here!
sorted_neighbors = to_cy(distance.argsort(axis=1), NODE)
n_time = distance.shape[0]
recurrence = np.zeros((n_time, n_time), dtype=LAG)
# Set processing order of state vectors
if order is None:
order = np.arange(n_time, dtype=NODE)
else:
order = to_cy(order, NODE)
_set_adaptive_neighborhood_size(n_time, adaptive_neighborhood_size,
sorted_neighbors, order, recurrence)
self.R = recurrence
[docs]
@staticmethod
def threshold_from_recurrence_rate(distance, recurrence_rate: float):
"""
Return the threshold for recurrence plot construction given the
recurrence rate.
Be aware, that the returned threshold can only approximately give the
desired recurrence rate. The accuracy depends on the distribution of
values in the given distance matrix :math:`D`.
:type distance: 2D square array.
:arg distance: The phase space distance matrix :math:`D`.
:arg number recurrence_rate: The desired recurrence rate.
:return number: the recurrence threshold corresponding to the desired
recurrence rate.
"""
# Flatten and sort distance matrix
flat_distance = distance.flatten()
flat_distance.sort()
# Get threshold
assert 0 <= recurrence_rate <= 1
N = len(flat_distance)
threshold = flat_distance[int(recurrence_rate * (N - 1))]
# Clean up
del flat_distance
return threshold
[docs]
@staticmethod
def threshold_from_recurrence_rate_fast(distance, recurrence_rate,
rr_precision=0.001):
"""
Return the threshold for recurrence plot construction given the
recurrence rate.
The threshold yielding a given recurrence_rate is approximated using a
randomly selected rr_precision percent of the distance matrix' entries.
Hence, the expected accuracy is lower than that achieved by using
:index:`threshold_from_recurrence_rate`.
:type distance: 2D square array.
:arg distance: The phase space distance matrix :math:`D`.
:arg number recurrence_rate: The desired recurrence rate.
:arg number rr_precision: The desired precision of recurrence rate
estimation.
:return number: the recurrence threshold corresponding to the desired
recurrence rate.
"""
# Get number of distances to be randomly chosen
n_samples = int(rr_precision * distance.size)
# Get number of phase space points
n_time = distance.shape[0]
# vectorized version
i = np.random.randint(n_time, size=n_samples)
j = np.random.randint(n_time, size=n_samples)
samples = distance[i, j]
# Sort and get threshold
samples.sort()
threshold = samples[int(recurrence_rate * n_samples)]
return threshold
[docs]
@staticmethod
def bootstrap_distance_matrix(embedding, metric, M):
"""
Return bootstrap samples from distance matrix.
:type embedding: 2D array (time, embedding dimension)
:arg embedding: The phase space trajectory.
:arg str metric: The metric for measuring distances in phase space
("manhattan", "euclidean", "supremum").
:arg int M: Number of bootstrap samples
:rtype: 1D array ("float32")
:return: the bootstrap samples from distance matrix.
"""
# Prepare
M = int(M)
embedding = to_cy(embedding, DFIELD)
distances = np.zeros(M, dtype=DFIELD)
(n_time, dim) = embedding.shape
if metric == "manhattan":
_bootstrap_distance_matrix_manhattan(n_time, dim, embedding,
distances, M)
elif metric == "euclidean":
_bootstrap_distance_matrix_euclidean(n_time, dim, embedding,
distances, M)
elif metric == "supremum":
_bootstrap_distance_matrix_supremum(n_time, dim, embedding,
distances, M)
return distances
#
# Recurrence quantification analysis (RQA)
#
[docs]
def rqa_summary(self, l_min=2, v_min=2):
"""
Return a selection of RQA measures.
The selection consists of the recurrence rate :math:`RR`, the
determinism :math:`DET`, the average diagonal line length :math:`L` and
the laminarity :math:`LAM`.
:arg int l_min: The minimum diagonal line length.
:arg int v_min: The minimum vertical line length.
:rtype: Python dictionary
:return: a selection of RQA measures.
"""
RR = self.recurrence_rate()
DET = self.determinism(l_min)
L = self.average_diaglength(l_min)
LAM = self.laminarity(v_min)
return {"RR": RR, "DET": DET, "L": L, "LAM": LAM}
[docs]
def recurrence_rate(self):
"""
Return the :index:`recurrence rate` :math:`RR`.
RR gives the percentage of black dots in the recurrence plot.
:return number: the recurrence rate :math:`RR`.
"""
N = self.N
if not self.sparse_rqa:
R = self.recurrence_matrix()
RR = R.sum() / N ** 2
elif self.metric == "supremum":
RR = (self.vertline_dist() * np.arange(1, N + 1)).sum() / N ** 2
else:
raise NotImplementedError(
"Sequential RQA is currently only available for "
"fixed threshold and the supremum metric.")
return RR
[docs]
def recurrence_probability(self, lag=0):
"""
Return the recurrence probability. This is the probability, that
the trajectory is recurrent after 'lag' time steps.
Contributed by Jan H. Feldhoff.
:return number: the recurrence probability
"""
R = self.recurrence_matrix()
N = self.N
SUM = np.sum(np.diag(R, lag))
return SUM / float(N-lag)
#
# RQA measures based on black diagonal lines
#
[docs]
@Cached.method(attrs=(
"metric", "threshold", "missing_values", "sparse_rqa"))
def diagline_dist(self):
"""
Return the :index:`frequency distribution of diagonal line lengths
<triple: frequency distribution; diagonal; line length>`
:math:`P(l-1)`.
Note that entry :math:`P(l-1)` contains the number of
:index:`diagonal lines <pair: diagonal; lines>` of length :math:`l`.
Thus, :math:`P(0)` counts lines of length :math:`1`,
:math:`P(1)` counts lines of length :math:`2`, asf.
The main diagonal is not counted,
hence :math:`P(N)` will always be :math:`0`.
.. note::
Experimental handling of missing values. Diagonal lines
touching lines and blocks of missing entries in the
recurrence matrix are not counted.
:rtype: 1D array (int32)
:return: the frequency distribution of diagonal line lengths
:math:`P(l-1)`.
"""
# Prepare
n_time = self.N
diagline = np.zeros(n_time, dtype=NODE)
if not self.sparse_rqa:
# Get recurrence matrix
recmat = self.recurrence_matrix()
if self.missing_values:
mv_indices = self.missing_value_indices
_diagline_dist_missingvalues(
n_time, diagline, recmat, mv_indices)
else:
_diagline_dist(n_time, diagline, recmat)
# Calculations for sequential RQA
elif self.metric == "supremum" and self.threshold is not None:
# Get embedding
embedding = self.embedding
# Get time series dimension
dim = embedding.shape[1]
# Get threshold
eps = float(self.threshold)
if self.missing_values:
mv_indices = self.missing_value_indices
_diagline_dist_sequential_missingvalues(
n_time, diagline, mv_indices, embedding, eps, dim)
else:
_diagline_dist_sequential(
n_time, diagline, embedding, eps, dim)
else:
raise NotImplementedError(
"Sequential RQA is currently only available for "
"fixed threshold and the supremum metric.")
# Function just runs over the upper triangular matrix
return 2 * diagline
[docs]
@staticmethod
def rejection_sampling(dist, M):
"""
Rejection sampling of discrete frequency distribution.
Use simple rejection sampling algorithm for computing a resampled
version of a given frequency distribution with discrete support.
:type dist: 1D array (integer)
:arg dist: discrete frequency distribution
:arg int M: number of resamplings
:rtype: 1D array (integer)
:return: the resampled frequency distribution.
"""
# Get number of support points
N = len(dist)
# Prepare
resampled_dist = np.zeros(N, dtype=NODE)
# Prescribed distribution
dist = to_cy(dist, DFIELD)
# Normalize distribution
dist /= dist.sum()
_rejection_sampling(dist, resampled_dist, N, M)
return resampled_dist
[docs]
def resample_diagline_dist(self, M):
"""
Return resampled frequency distribution of diagonal lines.
The resampled frequency distribution can be used for obtaining
confidence bounds on diagonal line based RQA measures. This is
described in detail in [Schinkel2009]_.
Concerning the choice of the number of resamplings, Schinkel et al.
write: "The number of resamplings is not generally agreed upon but
common guidelines suggest values between 800 and 1500."
:arg int M: number of resamplings
:rtype: 1D array (integer)
:return: the resampled frequency distribution of diagonal lines.
"""
# Get original distribution of diagonal lines
diagline = self.diagline_dist()
# Get maximal diagonal line length
L_max = self.max_diaglength()
# Get resampled distribution
if L_max == 0:
resampled_dist = diagline
else:
resampled_dist = np.zeros(len(diagline), dtype=NODE)
resampled_dist[:L_max] = RecurrencePlot.\
rejection_sampling(diagline[:L_max], M)
return resampled_dist
[docs]
def max_diaglength(self):
"""
Return diagonal line-based RQA measure :index:`maximum diagonal line
length <triple: maximum; diagonal; line length>` :math:`L_max`.
:math:`L_max` is defined as the maximal length of a diagonal line in
the recurrence matrix.
:return number: the maximal diagonal line length :math:`L_max`.
"""
return 1 + np.nonzero(self.diagline_dist())[0].max(initial=-1)
[docs]
def determinism(self, l_min=2, resampled_dist=None):
"""
Return diagonal line-based RQA measure :index:`determinism <pair: RQA;
determinism>` :math:`DET`.
:math:`DET` is defined as the ratio of recurrence points that form
diagonal structures (of at least length :math:`l_min`) to all
recurrence points.
:arg number l_min: The minimum diagonal line length.
:type resampled_dist: 1D array (integer)
:arg resampled_dist: resampled frequency distribution of diagonal lines
:return number: the determinism :math:`DET`.
"""
# Use resampled distribution of diagonal lines if provided
diagline = (self.diagline_dist() if resampled_dist is None
else resampled_dist)
n_time = self.N
# Number of recurrence points that form diagonal structures (of at
# least length l_min)
partial_sum = np.arange(l_min, n_time+1) @ diagline[l_min-1:]
# Number of all recurrence points that form diagonal lines (except
# the main diagonal)
full_sum = np.arange(1, n_time+1) @ diagline
return partial_sum / float(full_sum + self._epsilon)
[docs]
def average_diaglength(self, l_min=2, resampled_dist=None):
"""
Return diagonal line-based RQA measure :index:`average diagonal line
length <triple: average; diagonal; line length>` :math:`L`.
:math:`L` is defined as the average length of diagonal lines (of at
least length :math:`l_min`).
:arg number l_min: The minimum diagonal line length.
:type resampled_dist: 1D array (integer)
:arg resampled_dist: resampled frequency distribution of diagonal lines
:return number: the average diagonal line length :math:`L`.
"""
# Use resampled distribution of diagonal lines if provided
diagline = (self.diagline_dist() if resampled_dist is None
else resampled_dist)
n_time = self.N
# Number of recurrence points that form diagonal structures (of at
# least length l_min)
partial_sum = np.arange(l_min, n_time+1) @ diagline[l_min-1:]
# Total number of diagonal lines of at least length l_min
number_diagline = diagline[l_min-1:].sum()
return partial_sum / float(number_diagline + self._epsilon)
[docs]
def diag_entropy(self, l_min=2, resampled_dist=None):
"""
Return diagonal line-based RQA measure :index:`diagonal line entropy
<pair: diagonal; line entropy>` :math:`ENTR`.
:math:`ENTR` is defined as the entropy of the probability to find a
diagonal line of exactly length l in the RP - reflects the complexity
of the RP with respect to diagonal lines.
:arg number l_min: The minimal diagonal line length.
:type resampled_dist: 1D array (integer)
:arg resampled_dist: resampled frequency distribution of diagonal lines
:return number: the diagonal line-based entropy :math:`ENTR`.
"""
# Use resampled distribution of diagonal lines if provided
diagline = (self.diagline_dist() if resampled_dist is None
else resampled_dist)
# Creates a reduced array of the values (not 0) of the diagonal line
# length (langer than l_min)
diagline = diagline[l_min-1:]
diagline = np.extract(diagline != 0, diagline)
# Normalized array of the number of all diagonal lines = probability
# of diagonal line length
diagnorm = diagline / float(diagline.sum() + self._epsilon)
return -(diagnorm * np.log(diagnorm)).sum()
#
# RQA measures based on black vertical lines
#
[docs]
@Cached.method(attrs=(
"metric", "threshold", "missing_values", "sparse_rqa"))
def vertline_dist(self):
"""
Return the :index:`frequency distribution of vertical line lengths
<triple: frequency distribution; vertical; line length>`
:math:`P(v-1)`.
Note that entry :math:`P(v-1)` contains the number of
:index:`vertical lines <pair: vertical; lines>` of length :math:`v`.
Thus, :math:`P(0)` counts lines of length :math:`1`,
:math:`P(1)` counts lines of length :math:`2`, asf.
:rtype: 1D array (int32)
:return: the frequency distribution of vertical line lengths
:math:`P(v-1)`.
"""
# Prepare
n_time = self.N
vertline = np.zeros(n_time, dtype=NODE)
if not self.sparse_rqa:
# Get recurrence matrix
recmat = self.recurrence_matrix()
if self.missing_values:
mv_indices = self.missing_value_indices
_vertline_dist_missingvalues(
n_time, vertline, recmat, mv_indices)
else:
_vertline_dist(n_time, vertline, recmat)
# Calculations for sequential RQA
elif self.metric == "supremum" and self.threshold is not None:
# Get embedding
embedding = self.embedding
# Get time series dimension
dim = embedding.shape[1]
# Get threshold
eps = float(self.threshold)
if self.missing_values:
mv_indices = self.missing_value_indices
_vertline_dist_sequential_missingvalues(
n_time, vertline, mv_indices, embedding, eps, dim)
else:
_vertline_dist_sequential(
n_time, vertline, embedding, eps, dim)
else:
raise NotImplementedError(
"Sequential RQA is currently only available for "
"fixed threshold and the supremum metric.")
# Function covers the whole recurrence matrix
return vertline
[docs]
def resample_vertline_dist(self, M):
"""
Return resampled frequency distribution of vertical lines.
The resampled frequency distribution can be used for obtaining
confidence bounds on vertical line based RQA measures. This is
described in detail in [Schinkel2009]_.
Concerning the choice of the number of resamplings, Schinkel et al.
write: "The number of resamplings is not generally agreed upon but
common guidelines suggest values between 800 and 1500."
:arg int M: number of resamplings
:rtype: 1D array (integer)
:return: the resampled frequency distribution of vertical lines.
"""
# Get original distribution of vertical lines
vertline = self.vertline_dist()
# Get maximal vertical line length
L_max = self.max_vertlength()
# Get resampled distribution
if L_max == 0:
resampled_dist = vertline
else:
resampled_dist = np.zeros(len(vertline), dtype=NODE)
resampled_dist[:L_max] = RecurrencePlot.\
rejection_sampling(vertline[:L_max], M)
return resampled_dist
[docs]
def max_vertlength(self):
"""
Return vertical line-based RQA measure :index:`maximal vertical line
length <triple: maximum; vertical; line length>` :math:`V_max`.
:math:`V_max` is defined as the maximal length of a vertical line of
the recurrence matrix.
:return number: the maximal vertical line length :math:`V_max`.
"""
return 1 + np.nonzero(self.vertline_dist())[0].max(initial=-1)
[docs]
def laminarity(self, v_min=2, resampled_dist=None):
"""
Return vertical line-based RQA measure :index:`laminarity` :math:`LAM`.
:math:`LAM` is defined as the ratio of recurrence points that form
vertical structures (of at least length :math:`v_min`) to all
recurrence points.
:arg number v_min: The minimal vertical line length.
:type resampled_dist: 1D array (integer)
:arg resampled_dist: resampled frequency distribution of vertical lines
:return number: the laminarity :math:`LAM`.
"""
# Use resampled distribution of vertical lines if provided
vertline = (self.vertline_dist() if resampled_dist is None
else resampled_dist)
n_time = self.N
# Number of recurrence points that form vertical structures (of at
# least length v_min)
partial_sum = np.arange(v_min, n_time+1) @ vertline[v_min-1:]
# Number of all recurrence points that form vertical lines
full_sum = np.arange(1, n_time+1) @ vertline
return partial_sum / float(full_sum + self._epsilon)
[docs]
def average_vertlength(self, v_min=2, resampled_dist=None):
"""
Return vertical line-based RQA measure :index:`average vertical line
length <triple: average; vertical; line length>` :math:`TT`.
:math:`TT` is defined as the average vertical line length (of at least
length :math:`v_min`) and is also called :index:`trapping time`
:math:`TT`.
:arg number v_min: The minimal vertical line length.
:type resampled_dist: 1D array (integer)
:arg resampled_dist: resampled frequency distribution of vertical lines
:return number: the trapping time :math:`TT`.
"""
# Use resampled distribution of vertical lines if provided
vertline = (self.vertline_dist() if resampled_dist is None
else resampled_dist)
n_time = self.N
# Number of recurrence points that form vertical structures (of at
# least length v_min)
partial_sum = np.arange(v_min, n_time+1) @ vertline[v_min-1:]
# Total number of vertical lines of at least length v_min
number_vertline = vertline[v_min-1:].sum()
return partial_sum / (float(number_vertline) + self._epsilon)
[docs]
def trapping_time(self, v_min=2, resampled_dist=None):
"""
Alias for :meth:`average_vertlength` (see description there).
"""
return self.average_vertlength(v_min, resampled_dist)
[docs]
def vert_entropy(self, v_min=2, resampled_dist=None):
"""
Return vertical line-based RQA measure :index:`vertical line entropy
<pair: vertical; line entropy>`.
It is defined as the entropy of the probability to find a vertical line
of exactly length l in the RP - reflects the complexity of the RP with
respect to vertical lines.
:arg int v_min: The minimal vertical line length.
:type resampled_dist: 1D array (integer)
:arg resampled_dist: resampled frequency distribution of vertical lines
:return number: the vertical line-based entropy.
"""
# Use resampled distribution of vertical lines if provided
if resampled_dist is None:
vertline = self.vertline_dist()
else:
vertline = resampled_dist
# Creates a reduced array of the values (not 0) of the vertical line
# length (langer than v_min)
vertline = vertline[v_min-1:]
vertline = np.extract(vertline != 0, vertline)
# Normalized array of the number of all vertical lines = probability
# of vertical line length
vertline_normed = vertline / float(vertline.sum() + self._epsilon)
return -(vertline_normed * np.log(vertline_normed)).sum()
#
# RQA measures based on white vertical lines
#
[docs]
def white_vertline_dist(self):
"""
Return the :index:`frequency distribution of white vertical line
lengths <triple: frequency distribution; white vertical; line length>`
:math:`P(w-1)`.
Note that entry :math:`P(w-1)` contains the number of
:index:`white vertical lines <pair: white vertical; lines>` of length
:math:`w`. Thus, :math:`P(0)` counts lines of length :math:`1`,
:math:`P(1)` counts lines of length :math:`2`, asf.
The length of a white vertical line in a recurrence plot corresponds to
the time the system takes to return close to an earlier state.
:rtype: 1D array (int32)
:return: the frequency distribution of white vertical line lengths
:math:`P(w-1)`.
"""
R = self.recurrence_matrix()
n_time = self.N
white_vertline = np.zeros(n_time, dtype=NODE)
_white_vertline_dist(n_time, white_vertline, R)
# Function covers the whole recurrence matrix
return white_vertline
[docs]
def max_white_vertlength(self):
"""
Return white vertical line-based RQA measure :index:`maximal white
vertical line length <triple: maximum; white vertical; line length>`.
It is defined as the maximal length of a white vertical line of
the recurrence matrix and corresponds to the maximum recurrence time
occuring in the time series.
:return number: the maximal white vertical line length.
"""
return 1 + np.nonzero(self.white_vertline_dist())[0].max(initial=-1)
[docs]
def average_white_vertlength(self, w_min=1):
"""
Return white vertical line-based RQA measure :index:`average white
vertical line length <triple: average; white vertical; line length>`.
It is defined as the average white vertical line length (of at least
length :math:`w_min`) and is also called :index:`mean recurrence time`.
Reference: [Ngamga2007]_.
:arg number w_min: The minimal white vertical line length.
:return number: the mean recurrence time.
"""
white_vertline = self.white_vertline_dist()
n_time = self.N
# Number of recurrence points that form white vertical structures
# (of at least length w_min)
partial_sum = np.arange(w_min, n_time+1) @ white_vertline[w_min-1:]
# Total number of white vertical lines of at least length v_min
number_white_vertline = white_vertline[w_min-1:].sum()
return partial_sum / float(number_white_vertline + self._epsilon)
[docs]
def mean_recurrence_time(self, w_min=1):
"""
Alias for :meth:`average_white_vertlength` (see description there).
"""
return self.average_white_vertlength(w_min)
[docs]
def white_vert_entropy(self, w_min=1):
"""
Return white vertical line-based RQA measure :index:`white vertical
line entropy <pair: white vertical; line entropy>`.
It is defined as the entropy of the probability to find a white
vertical line of exactly length l in the RP - reflects the complexity
of the RP with respect to white vertical lines (recurrence times).
:arg int w_min: Minimal white vertical line length (recurrence time).
:return number: the white vertical line-based entropy.
"""
# Creates a reduced array of the values (not 0) of the vertical line
# length (langer than v_min)
white_vertline = self.white_vertline_dist()
white_vertline = white_vertline[w_min-1:]
white_vertline = np.extract(white_vertline != 0, white_vertline)
# Normalized array of the number of all vertical lines = probability
# of vertical line length
white_vertline_normed = white_vertline / float(
white_vertline.sum() + self._epsilon)
return -(white_vertline_normed * np.log(white_vertline_normed)).sum()
#
# Methods for recurrence-based surrogates
#
[docs]
def twins(self, min_dist=7):
"""
Return list of the :index:`twins <pair: twins; recurrence plot>` of
each state vector based on the recurrence matrix.
Two state vectors are said to be twins if they share the same
recurrences, i.e., if the corresponding rows or columns in the
recurrence plot are identical.
References: [Thiel2006]_, [Marwan2007]_.
:arg number min_dist: The minimum temporal distance for twins.
:return [[number]]: the list of twins for each state vector in the time
series.
"""
if self.silence_level <= 1:
print("Finding twins based on recurrence matrix...")
# Initialize
twins = []
N = self.N
# Get current recurrence matrix
R = self.recurrence_matrix()
# Get number of neighbors for each state vector
nR = R.sum(axis=0)
_twins_r(min_dist, N, R, nR, twins)
return twins
[docs]
def twin_surrogates(self, n_surrogates=1, min_dist=7):
"""
Generate surrogates based on the current (embedded) time series
:attr:`embedding` using the :index:`twin surrogate` method.
The twins surrogates have the same dimensionality as the (embedded)
trajectory used for constructing the recurrence plot. If scalar
surrogate time series are desired, any component of the twin surrogate
trajectory may be isolated.
Twin surrogates share linear and nonlinear properties with the original
time series, since they correspond to realizations of trajectories of
the same dynamical systems with different initial conditions.
References: [Thiel2006]_ [*], [Marwan2007]_.
:arg number min_dist: The minimum temporal distance for twins.
:arg int n_surrogates: The number of twin surrogate trajectories to be
returned.
:rtype: 3D array (surrogate number, time, dimension)
:return: the twin surrogate trajectories.
"""
# The algorithm proceeds in two steps:
# 1. Use the algorithm proposed in [*] to find twins
# 2. Reconstruct one-dimensional twin surrogate time series
if self.silence_level <= 1:
print("Generating twin surrogates...")
# Collect
N = self.N
embedding = self.embedding
dim = embedding.shape[1]
twins = self.twins(min_dist)
return _twin_surrogates_r(n_surrogates, N, dim, twins, embedding)