# This file is part of pyunicorn.
# Copyright (C) 2008--2024 Jonathan F. Donges and pyunicorn authors
# URL: <https://www.pik-potsdam.de/members/donges/software-2/software>
# License: BSD (3-clause)
#
# Please acknowledge and cite the use of this software and its authors
# when results are used in publications or published elsewhere.
#
# You can use the following reference:
# J.F. Donges, J. Heitzig, B. Beronov, M. Wiedermann, J. Runge, Q.-Y. Feng,
# L. Tupikina, V. Stolbova, R.V. Donner, N. Marwan, H.A. Dijkstra,
# and J. Kurths, "Unified functional network and nonlinear time series analysis
# for complex systems science: The pyunicorn package"
"""
Provides classes for the analysis of dynamical systems and time series based
on recurrence plots, including measures of recurrence quantification
analysis (RQA) and recurrence network analysis.
"""
# array object and fast numerics
import numpy as np
from ..core import InteractingNetworks
from ..core._ext.types import to_cy, ADJ, MASK, FIELD
from ._ext.numerics import _visibility_relations_missingvalues, \
_visibility_relations_no_missingvalues, _visibility_relations_horizontal, \
_retarded_local_clustering, _advanced_local_clustering
#
# Class definitions
#
[docs]
class VisibilityGraph(InteractingNetworks):
"""
Class VisibilityGraph for generating and analyzing visibility graphs of
time series.
Visibility graphs were initially applied for time series analysis by
[Lacasa2008]_.
"""
#
# Internal methods
#
[docs]
def __init__(self, time_series, timings=None, missing_values=False,
horizontal=False, silence_level=0):
"""
Missing values are handled as infinite values, effectively separating
the visibility graph into different disconnected components.
.. note::
Missing values have to be marked by the Numpy NaN flag!
:type time_series: 2D array (time, dimension)
:arg time_series: The time series to be analyzed, can be scalar or
multi-dimensional.
:arg str timings: Timings of the observations in :attr:`time_series`.
:arg bool missing_values: Toggle special treatment of missing values in
:attr:`time_series`.
:arg bool horizontal: Indicates whether a horizontal visibility
relation is used.
:arg number silence_level: Inverse level of verbosity of the object.
"""
# Set silence_level
self.silence_level = silence_level
"""The inverse level of verbosity of the object."""
# Set missing_values flag
self.missing_values = missing_values
"""Controls special treatment of missing values in
:attr:`time_series`."""
# Store time series
self.time_series = to_cy(time_series, FIELD)
"""The time series from which the visibility graph is constructed."""
if timings is not None:
timings = to_cy(timings, FIELD)
else:
timings = np.arange(len(time_series), dtype=FIELD)
# Store timings
self.timings = timings
"""The timimgs of the time series data points."""
# Get missing value indices
if self.missing_values:
self.missing_value_indices = np.isnan(self.time_series)
# Determine visibility relations
if not horizontal:
A = self.visibility_relations()
else:
A = self.visibility_relations_horizontal()
# Initialize Network object
InteractingNetworks.__init__(self, A, directed=False,
silence_level=silence_level)
[docs]
def __str__(self):
"""
Returns a string representation.
"""
return ("VisibilityGraph: "
f"time series shape {self.time_series.shape}.\n"
f"{InteractingNetworks.__str__(self)}")
#
# Visibility methods
#
[docs]
def visibility_relations(self):
"""
Returns visibility between all nodes of self.timeseries
:rtype: 2D array of MASK
"""
if self.silence_level <= 1:
print("Calculating visibility relations...")
# Prepare
x = self.time_series
t = self.timings
N = len(self.time_series)
A = np.zeros((N, N), dtype=MASK)
if self.missing_values:
mv_indices = self.missing_value_indices
_visibility_relations_missingvalues(x, t, N, A, mv_indices)
else:
_visibility_relations_no_missingvalues(x, t, N, A)
return A
# FIXME: There is no option for missing values
[docs]
def visibility_relations_horizontal(self):
"""
Returns horizontal visibility between all nodes of self.timeseries
:rtype: 2D array of MASK
"""
if self.silence_level <= 1:
print("Calculating horizontal visibility relations...")
# Prepare
x = self.time_series
N = len(self.time_series)
A = np.zeros((N, N), dtype=MASK)
_visibility_relations_horizontal(x, N, A)
return A
#
# Specific measures for visibility graphs
#
[docs]
def visibility(self, node1, node2):
"""
Returns the visibility between node 1 and 2 as boolean.
:arg int node1: node index of node 1
:arg int node2: node index of node 2
:rtype: bool
"""
return self.adjacency[node1, node2]
[docs]
def visibility_single(self, node):
"""
Returns the visibility between all nodes of self.time_series and node
as array of booleans.
:arg int node: node index of the node
:rtype: 1D array of bool
"""
return self.adjacency[node, :]
[docs]
def retarded_degree(self):
"""Return number of neighbors in the past of a node."""
# Prepare
retarded_degree = np.zeros(self.N)
A = self.adjacency
for i in range(self.N):
retarded_degree[i] = A[i, :i].sum()
return retarded_degree
[docs]
def advanced_degree(self):
"""Return number of neighbors in the future of a node."""
# Prepare
advanced_degree = np.zeros(self.N)
A = self.adjacency
for i in range(self.N):
advanced_degree[i] = A[i, i:].sum()
return advanced_degree
[docs]
def retarded_local_clustering(self):
"""
Return probability that two neighbors of a node in its past are
connected.
"""
# Prepare
retarded_clustering = np.zeros(self.N)
# Get full adjacency matrix
A = self.adjacency
# Get number of nodes
N = self.N
# Get left degree
retarded_degree = self.retarded_degree()
# Prepare normalization factor
norm = retarded_degree * (retarded_degree - 1) / 2.
_retarded_local_clustering(N, to_cy(A, ADJ), norm, retarded_clustering)
return retarded_clustering
[docs]
def advanced_local_clustering(self):
"""
Return probability that two neighbors of a node in its future are
connected.
"""
# Prepare
advanced_clustering = np.zeros(self.N)
# Get full adjacency matrix
A = self.adjacency
# Get number of nodes
N = self.N
# Get right degree
advanced_degree = self.advanced_degree()
# Prepare normalization factor
norm = advanced_degree * (advanced_degree - 1) / 2.
_advanced_local_clustering(N, to_cy(A, ADJ), norm, advanced_clustering)
return advanced_clustering
[docs]
def retarded_closeness(self):
"""Return average path length to nodes in the past of a node."""
# Prepare
retarded_closeness = np.zeros(self.N)
path_lengths = self.path_lengths()
for i in range(self.N):
retarded_closeness[i] = path_lengths[i, :i].mean() ** (-1)
return retarded_closeness
[docs]
def advanced_closeness(self):
"""Return average path length to nodes in the future of a node."""
# Prepare
advanced_closeness = np.zeros(self.N)
path_lengths = self.path_lengths()
for i in range(self.N):
advanced_closeness[i] = path_lengths[i, i+1:].mean() ** (-1)
return advanced_closeness
[docs]
def retarded_betweenness(self):
"""
Return betweenness of a node with respect to all pairs of nodes in its
past.
"""
# Prepare
retarded_betweenness = np.zeros(self.N)
for i in range(self.N):
retarded_indices = np.arange(i)
retarded_betweenness[i] = self.nsi_betweenness(
sources=retarded_indices, targets=retarded_indices)[i]
return retarded_betweenness
[docs]
def advanced_betweenness(self):
"""
Return betweenness of a node with respect to all pairs of nodes in its
future.
"""
# Prepare
advanced_betweenness = np.zeros(self.N)
for i in range(self.N):
advanced_indices = np.arange(i+1, self.N)
advanced_betweenness[i] = self.nsi_betweenness(
sources=advanced_indices, targets=advanced_indices)[i]
return advanced_betweenness
[docs]
def trans_betweenness(self):
"""
Return betweenness of a node with respect to all pairs of nodes
with one node the past and one node in the future, respectively.
"""
# Prepare
trans_betweenness = np.zeros(self.N)
for i in range(self.N):
retarded_indices = np.arange(i)
advanced_indices = np.arange(i+1, self.N)
trans_betweenness[i] = self.nsi_betweenness(
sources=retarded_indices, targets=advanced_indices)[i]
return trans_betweenness
#
# Measures corrected for boundary effects
#
[docs]
def boundary_corrected_degree(self):
"""Return a weighted degree corrected for trivial boundary effects."""
# Prepare
N_past = np.arange(self.N)
N_future = N_past[::-1]
cdegree = (self.retarded_degree() * N_past
+ self.advanced_degree() * N_future) / float(self.N - 1)
return cdegree
[docs]
def boundary_corrected_closeness(self):
"""
Return a weighted closeness corrected for trivial boundary effects.
"""
# Prepare
N_past = np.arange(self.N)
N_future = N_past[::-1]
ccloseness = (self.N - 1) * (self.retarded_closeness() / N_past
+ self.advanced_closeness() / N_future)
return ccloseness