Source code for pyunicorn.eventseries.event_series

# This file is part of pyunicorn.
# Copyright (C) 2008--2024 Jonathan F. Donges and pyunicorn authors
# URL: <https://www.pik-potsdam.de/members/donges/software-2/software>
# License: BSD (3-clause)
#
# Please acknowledge and cite the use of this software and its authors
# when results are used in publications or published elsewhere.
#
# You can use the following reference:
# J.F. Donges, J. Heitzig, B. Beronov, M. Wiedermann, J. Runge, Q.-Y. Feng,
# L. Tupikina, V. Stolbova, R.V. Donner, N. Marwan, H.A. Dijkstra,
# and J. Kurths, "Unified functional network and nonlinear time series analysis
# for complex systems science: The pyunicorn package"

"""
Provides class for event series analysis, namely event synchronization (ES) and
event coincidence analysis (ECA). In addition, a method for the generation of
binary event series from continuous time series data is included.
When instantiating a class, data must either be passed as an event
matrix (for details see below) or as a continuous time series. Using the class,
an ES or ECA matrix can be calculated to generate a climate network using the
EventSeriesClimateNetwork class. Both ES and ECA may be called without
instantiating an object of the class.
Significance levels are provided using analytic calculations using Poisson
point processes as a null model (for ECA only) or a Monte Carlo approach.
"""

from typing import Tuple
from collections.abc import Hashable

import warnings

import numpy as np
from scipy import stats

from ..core.cache import Cached


[docs] class EventSeries(Cached):
[docs] def __init__(self, data, timestamps=None, taumax=np.inf, lag=0.0, threshold_method=None, threshold_values=None, threshold_types=None): """ Initialize an instance of EventSeries. Input data must be a 2D numpy array with time as the first axis and variables as the second axis. Event data is stored as an eventmatrix. Format of eventmatrix: An eventmatrix is a 2D numpy array with the first dimension covering the timesteps and the second dimensions covering the variables. Each variable at a specific timestep is either '1' if an event occured or '0' if it did not, e.g. for 3 variables with 10 timesteps the eventmatrix could look like array([[0, 1, 0], [1, 0, 1], [0, 0, 0], [1, 0, 1], [0, 1, 0], [0, 0, 0], [1, 0, 0], [0, 0, 1], [0, 1, 0], [0, 0, 0]]) If input data is not provided as an eventmatrix, the constructor tries to generate one using the make_event_matrix method. Default keyword arguments are used in this case. :type data: 2D Numpy array [time, variables] :arg data: Event series array or array of non-binary variable values :type timestamps: 1D Numpy array :arg timestamps: Time points of events of data. If not provided, integer values are used :type taumax: float :arg taumax: maximum time difference between two events to be considered synchronous. Caution: For ES, the default is np.inf because of the intrinsic dynamic coincidence interval in ES. For ECA, taumax is a parameter of the method that needs to be defined! :type lag: float :arg lag: extra time lag between the event series :type threshold_method: str 'quantile' or 'value' or 1D numpy array or str 'quantile' or 'value' :arg threshold_method: specifies the method for generating a binary event matrix from an array of continuous time series. Default: None :type threshold_values: 1D Numpy array or float :arg threshold_values: quantile or real number determining threshold for each variable. Default: None. :type threshold_types: str 'above' or 'below' or 1D list of strings 'above' or 'below' :arg threshold_types: Determines for each variable if event is below or above threshold """ if threshold_method is None: # Check if data contains only binary values if len(np.unique(data)) != 2 or not ( np.unique(data) == np.array([0, 1])).all(): raise IOError("Event matrix not in correct format") # Save event matrix self.__T = data.shape[0] self.__N = data.shape[1] self.__eventmatrix = data else: # If data is not in eventmatrix format, use method # make_event_matrix to transform continuous time series to a binary # time series # Allow for wrong axis, i.e. first axis variables and second axis # time if time series have the same length if isinstance(data, np.ndarray): if data.shape[1] > data.shape[0]: data = np.swapaxes(data, 0, 1) self.__eventmatrix = \ self.make_event_matrix(data, threshold_method=threshold_method, threshold_values=threshold_values, threshold_types=threshold_types) self.__T = self.__eventmatrix.shape[0] self.__N = self.__eventmatrix.shape[1] else: raise IOError('Input data is not in event matrix format!') # If no timestamps are given, use integer array indices as timestamps if timestamps is not None: if timestamps.shape[0] != self.__T: raise IOError("Timestamps array has not the same length as" " event matrix!") self.__timestamps = timestamps else: self.__timestamps = np.linspace(0.0, self.__T - 1, self.__T) self.__taumax = float(taumax) self.__lag = float(lag) # save number of events NrOfEvs = np.array(np.sum(self.__eventmatrix, axis=0), dtype=int) self.__nrofevents = NrOfEvs # Dictionary of symmetrization functions for later use self.symmetrization_options = { 'directed': EventSeries._symmetrization_directed, 'symmetric': EventSeries._symmetrization_symmetric, 'antisym': EventSeries._symmetrization_antisym, 'mean': EventSeries._symmetrization_mean, 'max': EventSeries._symmetrization_max, 'min': EventSeries._symmetrization_min }
[docs] def __cache_state__(self) -> Tuple[Hashable, ...]: # The following attributes are assumed immutable: # (__eventmatrix, __timestamps, __taumax, __lag) return ()
[docs] def __str__(self): """ Return a string representation of the EventSeries object. """ return (f"EventSeries: {self.__N} variables, " f"{self.__T} timesteps, taumax: {self.__taumax:.1f}, " f"lag: {self.__lag:.1f}")
def get_event_matrix(self): return self.__eventmatrix
[docs] @staticmethod def _symmetrization_directed(matrix): """ Helper function for symmetrization options :type matrix: 2D numpy array :arg matrix: pairwise ECA/ES scores of data :rtype: 2D numpy array :return: original matrix """ return matrix
[docs] @staticmethod def _symmetrization_symmetric(matrix): """ Helper function for symmetrization options :type matrix: 2D numpy array :arg matrix: pairwise ECA/ES scores of data :rtype: 2D numpy array :return: symmetrized matrix """ return matrix + matrix.T
[docs] @staticmethod def _symmetrization_antisym(matrix): """ Helper function for symmetrization options :type matrix: 2D numpy array :arg matrix: pairwise ECA/ES scores of data :rtype: 2D numpy array :return: antisymmetrized matrix """ return matrix - matrix.T
[docs] @staticmethod def _symmetrization_mean(matrix): """ Helper function for symmetrization options :type matrix: 2D numpy array :arg matrix: pairwise ECA/ES scores of data :rtype: 2D numpy array :return: symmetrized matrix using element-wise mean of matrix and transposed matrix """ return np.mean([matrix, matrix.T], axis=0)
[docs] @staticmethod def _symmetrization_max(matrix): """ Helper function for symmetrization options :type matrix: 2D numpy array :arg matrix: pairwise ECA/ES scores of data :rtype: 2D numpy array :return: symmetrized matrix using element-wise maximum of matrix and transposed matrix """ return np.maximum(matrix, matrix.T)
[docs] @staticmethod def _symmetrization_min(matrix): """ Helper function for symmetrization options :type matrix: 2D numpy array :arg matrix: pairwise ECA/ES scores of data :rtype: 2D numpy array :return: symmetrized matrix using element-wise minimum of matrix and transposed matrix """ return np.minimum(matrix, matrix.T)
[docs] @staticmethod def make_event_matrix(data, threshold_method='quantile', threshold_values=None, threshold_types=None): """ Create a binary event matrix from continuous time series data. Data format is eventmatrix, i.e. a 2D numpy array with first dimension covering time and second dimension covering the values of the variables. :type data: 2D numpy array :arg data: Continuous input data :type threshold_method: str 'quantile' or 'value' or 1D numpy array of strings 'quantile' or 'value' :arg threshold_method: specifies the method for generating a binary event matrix from an array of continuous time series. Default: 'quantile' :type threshold_values: 1D Numpy array or float :arg threshold_values: quantile or real number determining threshold for each variable. Default: None. :type threshold_types: str 'above' or 'below' or 1D list of strings 'above' or 'below' :arg threshold_types: Determines for each variable if event is below or above threshold :rtype: 2D numpy array :return: eventmatrix """ # Check correct format of event matrix if not np.all([len(i) == len(data[0]) for i in data]): warnings.warn("Data does not contain equal number of events") data_axswap = np.swapaxes(data, 0, 1) thresholds = np.zeros(data.shape[1]) # Check if inserted keyword arguments are correct and create parameter # arrays in case only single keywords are used for data with more than # one variable threshold_method = np.array(threshold_method) if threshold_method.shape == (data.shape[1],): if not np.all([i in ['quantile', 'value'] for i in threshold_method]): raise IOError("'threshold_method' must be either 'quantile' or" " 'value' or a 1D array-like object with" " entries 'quantile' or 'value' for each" " variable!") elif not threshold_method.shape: if threshold_method in ['quantile', 'value']: threshold_method = np.array([threshold_method] * data.shape[1]) else: raise IOError("'threshold_method' must be either 'quantile' or" " 'value' or a 1D array-like object with entries" " 'quantile' or 'value' for each variable!") else: raise IOError("'threshold_method' must be either 'quantile' or " "'value' or a 1D array-like object with entries " "'quantile' or 'value' for each variable!") if threshold_values is not None: threshold_values = np.array(threshold_values) if threshold_values.shape == (data.shape[1],): if not np.all([isinstance(i, (float, int)) for i in threshold_values]): raise IOError("'threshold_values' must be either float/int" " or 1D array-like object of float/int for " " each variable!") elif not threshold_values.shape: if isinstance(threshold_values.item(), (int, float)): threshold_values = \ np.array([threshold_values] * data.shape[1]) else: raise IOError("'threshold_values' must be either float/int" " or 1D array-like object of float/int for " "each variable!") else: raise IOError("'threshold_values' must be either float/int or " "1D array-like object of float/int for each " "variable!") else: threshold_values = np.array([None] * data.shape[1]) warnings.warn("No 'threshold_values' given. Median is used by " "default!") if threshold_types is not None: threshold_types = np.array(threshold_types) if threshold_types.shape == (data.shape[1],): if not np.all([i in ['above', 'below'] for i in threshold_types]): raise IOError("'threshold_types' must be either 'above' or" " 'below' or a 1D array-like object with " "entries 'above' or 'below' for each " "variable!") elif not threshold_types.shape: if threshold_types in ['above', 'below']: threshold_types = \ np.array([threshold_types] * data.shape[1]) else: raise IOError("'threshold_types' must be either 'above' or" " 'below' or a 1D array-like object with " "entries 'above' or 'below' for each " "variable!") else: raise IOError("'threshold_types' must be either 'above' or " "'below' or a 1D array-like object with entries " "'above' or 'below' for each variable!") else: threshold_types = np.array([None] * data.shape[1]) warnings.warn("No 'threshold_types' given. If 'threshold_values' " ">= median, 'above' is used by default!") # Go through threshold_method, threshold_value and threshold_type # for each variable and check if input parameters are valid # In case of missing input parameters, try to set default values for i in range(data.shape[1]): if threshold_method[i] == 'quantile': # Check if threshold quantile is between zero and one if threshold_values[i] is not None: if threshold_values[i] > 1.0 or threshold_values[i] < 0.0: raise ValueError("Threshold_value for threshold_method" " 'quantile' must lie between 0.0 and" " 1.0!") # If threshold values are not given, use the median else: threshold_values[i] = 0.5 # Compute threshold value according to quantile thresholds[i] = \ np.quantile(data_axswap[i], threshold_values[i]) # If no threshold_types is given, check if threshold value is # larger or equal median, then 'above' if threshold_types[i] is None: if threshold_values[i] >= 0.5: threshold_types[i] = 'above' else: threshold_types[i] = 'below' if threshold_method[i] == 'value': if threshold_values[i] is None: thresholds[i] = np.median(data_axswap[i]) else: # Check if given threshold values lie within data range if np.max(data_axswap[i]) < threshold_values[i] or \ np.min(data_axswap[i]) > threshold_values[i]: raise IOError("Threshold_value for threshold_method " "'value' must lie within variable " "range!") thresholds[i] = threshold_values[i] if threshold_types[i] is None: if thresholds[i] >= np.median(data_axswap[i]): threshold_types[i] = 'above' else: threshold_types[i] = 'below' # Other methods for thresholding can be easily added here eventmatrix = np.zeros((data.shape[0], data.shape[1])) * (-1) # Iterate through all variables of the data and create event matrix # according to specified methods for t in range(data.shape[0]): for i in range(data.shape[1]): if threshold_types[i] == 'above': if data[t][i] > thresholds[i]: eventmatrix[t][i] = 1 else: eventmatrix[t][i] = 0 elif threshold_types[i] == 'below': if data[t][i] < thresholds[i]: eventmatrix[t][i] = 1 else: eventmatrix[t][i] = 0 return eventmatrix
[docs] @staticmethod def event_synchronization(eventseriesx, eventseriesy, ts1=None, ts2=None, taumax=np.inf, lag=0.0): """ Calculates the directed event synchronization from two event series X and Y using the algorithm described in [Quiroga2002]_, [Odenweller2020]_ :type eventseriesx: 1D Numpy array :arg eventseriesx: Event series containing '0's and '1's :type eventseriesy: 1D Numpy array :arg eventseriesy: Event series containing '0's and '1's :type ts1: 1D Numpy array :arg ts1: Event time array containing time points when events of event series 1 occur, not obligatory :type ts2: 1D Numpy array :arg ts2: Event time array containing time points when events of event series 2 occur, not obligatory :type taumax: float :arg taumax: maximum distance of two events to be counted as synchronous :type lag: float :arg lag: delay between the two event series, the second event series is shifted by the value of lag :rtype: list :return: [Event synchronization XY, Event synchronization YX] """ # Get time indices (type boolean or simple '0's and '1's) # Careful here with datatype, int16 allows for maximum time index 32767 # Get time indices if ts1 is None: ex = np.array(np.where(eventseriesx), dtype='int16') else: ex = np.array([ts1[eventseriesx == 1]], dtype='float') if ts2 is None: ey = np.array(np.where(eventseriesy), dtype='int16') else: ey = np.array([ts2[eventseriesy == 1]], dtype='float') ey = ey + lag lx = ex.shape[1] ly = ey.shape[1] if lx == 0 or ly == 0: # Division by zero in output return np.nan, np.nan if lx in [1, 2] or ly in [1, 2]: # Too few events to calculate return 0., 0. # Array of distances dstxy2 = 2 * (np.repeat(ex[:, 1:-1].T, ly - 2, axis=1) - np.repeat(ey[:, 1:-1], lx - 2, axis=0)) # Dynamical delay diffx = np.diff(ex) diffy = np.diff(ey) diffxmin = np.minimum(diffx[:, 1:], diffx[:, :-1]) diffymin = np.minimum(diffy[:, 1:], diffy[:, :-1]) tau2 = np.minimum(np.repeat(diffxmin.T, ly - 2, axis=1), np.repeat(diffymin, lx - 2, axis=0)) tau2 = np.minimum(tau2, 2 * taumax) # Count equal time events and synchronised events eqtime = dstxy2.size - np.count_nonzero(dstxy2) # Calculate boolean matrices of coincidences Axy = (dstxy2 > 0) * (dstxy2 <= tau2) Ayx = (dstxy2 < 0) * (dstxy2 >= -tau2) # Loop over coincidences and determine number of double counts # by checking at least one event of the pair is also coincided # in other direction countxydouble = countyxdouble = 0 for i, j in np.transpose(np.where(Axy)): countxydouble += np.any(Ayx[i, :]) or np.any(Ayx[:, j]) for i, j in np.transpose(np.where(Ayx)): countyxdouble += np.any(Axy[i, :]) or np.any(Axy[:, j]) # Calculate counting quantities and subtract half of double countings countxy = np.sum(Axy) + 0.5 * eqtime - 0.5 * countxydouble countyx = np.sum(Ayx) + 0.5 * eqtime - 0.5 * countyxdouble norm = np.sqrt((lx - 2) * (ly - 2)) return countxy / norm, countyx / norm
[docs] @staticmethod def event_coincidence_analysis(eventseriesx, eventseriesy, taumax, ts1=None, ts2=None, lag=0.0): """ Event coincidence analysis: Returns the precursor and trigger coincidence rates of two event series X and Y following the algorithm described in [Odenweller2020]_. :type eventseriesx: 1D Numpy array :arg eventseriesx: Event series containing '0's and '1's :type eventseriesy: 1D Numpy array :arg eventseriesy: Event series containing '0's and '1's :type ts1: 1D Numpy array :arg ts1: Event time array containing time points when events of event series 1 occur, not obligatory :type ts2: 1D Numpy array :arg ts2: Event time array containing time points when events of event series 2 occur, not obligatory :type taumax: float :arg taumax: coincidence interval width :type lag: int :arg lag: lag parameter :rtype: list :return: [Precursor coincidence rate XY, Trigger coincidence rate XY, Precursor coincidence rate YX, Trigger coincidence rate YX] """ # Get time indices if ts1 is None: e1 = np.where(eventseriesx)[0] else: e1 = ts1[eventseriesx == 1] if ts2 is None: e2 = np.where(eventseriesy)[0] else: e2 = ts2[eventseriesy == 1] # Count events that cannot be coincided due to lag and delT if not (lag == 0 and taumax == 0): n11 = len(e1[e1 <= e1[0] + lag + taumax]) # Start of es1 n12 = len(e1[e1 >= (e1[-1] - lag - taumax)]) # End of es1 n21 = len(e2[e2 <= e2[0] + lag + taumax]) # Start of es2 n22 = len(e2[e2 >= (e2[-1] - lag - taumax)]) # End of es2 else: n11, n12, n21, n22 = 0, 0, 0, 0 # Instantaneous coincidence # Number of events l1 = len(e1) l2 = len(e2) # Array of all interevent distances dst = (np.array([e1] * l2).T - np.array([e2] * l1)) # Count coincidences with array slicing prec12 = np.count_nonzero( np.any(((dst - lag >= 0) * (dst - lag <= taumax))[n11:, :], axis=1)) trig12 = np.count_nonzero( np.any(((dst - lag >= 0) * (dst - lag <= taumax)) [:, :dst.shape[1] - n22], axis=0)) prec21 = np.count_nonzero(np.any(((-dst - lag >= 0) * (-dst - lag <= taumax))[:, n21:], axis=0)) trig21 = np.count_nonzero( np.any(((-dst - lag >= 0) * (-dst - lag <= taumax)) [:dst.shape[0] - n12, :], axis=1)) # Normalisation and output return (np.float32(prec12) / (l1 - n11), np.float32(trig12) / (l2 - n22), np.float32(prec21) / (l2 - n21), np.float32(trig21) / (l1 - n12))
[docs] def _eca_coincidence_rate(self, eventseriesx, eventseriesy, window_type='symmetric', ts1=None, ts2=None): """ Event coincidence analysis: Returns the coincidence rates of two event series for both directions :type eventseriesx: 1D Numpy array :arg eventseriesx: Event series containing '0's and '1's :type eventseriesy: 1D Numpy array :arg eventseriesy: Event series containing '0's and '1's :type ts1: 1D Numpy array :arg ts1: Event time array containing time points when events of event series 1 occur, not obligatory :type ts2: 1D Numpy array :arg ts2: Event time array containing time points when events of event series 2 occur, not obligatory :type window_type: str {'retarded', 'advanced', 'symmetric'} :arg window_type: Only for ECA. Determines if precursor coincidence rate ('advanced'), trigger coincidence rate ('retarded') or a general coincidence rate with the symmetric interval [-taumax, taumax] are computed ('symmetric'). Default: 'symmetric' :rtype: list :return: Precursor coincidence rates [XY, YX] """ # Get time indices if ts1 is None: e1 = np.where(eventseriesx)[0] else: e1 = ts1[eventseriesx == 1] if ts2 is None: e2 = np.where(eventseriesy)[0] else: e2 = ts2[eventseriesy == 1] lag = self.__lag taumax = self.__taumax # Number of events l1 = len(e1) l2 = len(e2) # Array of all interevent distances dst = (np.array([e1] * l2).T - np.array([e2] * l1)) if window_type == 'advanced': deltaT1 = 0.0 deltaT2 = taumax # Count events that cannot be coincided due to lag and deltaT if not (lag == 0 and taumax == 0): n11 = len(e1[e1 <= (e1[0] + lag + deltaT2)]) # Start of es1 n21 = len(e2[e2 <= (e2[0] + lag + deltaT2)]) # Start of es2 n12, n22 = 0, 0 else: n11, n12, n21, n22 = 0, 0, 0, 0 # Instantaneous coincidence # Count coincidences with array slicing coincidence12 = np.count_nonzero( np.any(((dst - lag >= deltaT1) * (dst - lag <= deltaT2)) [n11:, :], axis=1)) coincidence21 = np.count_nonzero( np.any(((-dst - lag >= deltaT1) * (-dst - lag <= deltaT2)) [:, n21:], axis=0)) elif window_type == 'retarded': deltaT1 = 0.0 deltaT2 = taumax # Count events that cannot be coincided due to lag and delT if not (lag == 0 and taumax == 0): n11 = 0 # Start of es1 n12 = len(e1[e1 >= (e1[-1] - lag - deltaT2)]) # End of es1 n21 = 0 # Start of es2 n22 = len(e2[e2 >= (e2[-1] - lag - deltaT2)]) # End of es2 else: n11, n12, n21, n22 = 0, 0, 0, 0 # Instantaneous coincidence # Count coincidences with array slicing coincidence12 = np.count_nonzero( np.any(((dst - lag >= deltaT1) * (dst - lag <= deltaT2)) [:, :dst.shape[1] - n22], axis=0)) coincidence21 = np.count_nonzero( np.any(((-dst - lag >= deltaT1) * (-dst - lag <= deltaT2)) [:dst.shape[0] - n12, :], axis=1)) return ((np.float32(coincidence12) / (l2 - n22), np.float32(coincidence21) / (l1 - n12))) elif window_type == 'symmetric': deltaT1, deltaT2 = -taumax, taumax # Count events that cannot be coincided due to lag and delT if not (lag == 0 and taumax == 0): n11 = len(e1[e1 <= (e1[0] + lag + deltaT2)]) # Start of es1 n12 = len(e1[e1 >= (e1[-1] - lag + deltaT1)]) # End of es1 n21 = len(e2[e2 <= (e2[0] + lag + deltaT2)]) # Start of es2 n22 = len(e2[e2 >= (e2[-1] - lag + deltaT1)]) # End of es2 else: n11, n12, n21, n22 = 0, 0, 0, 0 # Instantaneous coincidence # Count coincidences with array slicing coincidence12 = np.count_nonzero( np.any(((dst - lag >= deltaT1) * (dst - lag <= deltaT2)) [n11:dst.shape[0]-n12, :], axis=1)) coincidence21 = np.count_nonzero( np.any(((-dst - lag >= deltaT1) * (-dst - lag <= deltaT2)) [:, n21:dst.shape[1]-n22], axis=0)) else: raise IOError("Parameter 'window_type' must be 'advanced'," " 'retarded' or 'symmetric'!") # Normalisation and output return (np.float32(coincidence12) / (l1 - n11 - n12), np.float32(coincidence21) / (l2 - n21 - n22))
[docs] def event_series_analysis(self, method='ES', symmetrization='directed', window_type='symmetric'): """ Returns the NxN matrix of the chosen event series measure where N is the number of variables. The entry [i, j] denotes the event synchronization or event coincidence analysis from variable j to variable i. According to the 'symmetrization' parameter the event series measure matrix is symmetrized or not. The event coincidence rate of ECA is calculated according to the formula: r(Y|X, DeltaT1, DeltaT2, tau) = 1/N_X sum_{i=1}^{N_X} Theta[sum{j=1}^{N_Y} 1_[DeltaT1, DeltaT2] (t_i^X - (t_j^Y + tau))], where X is the first input event series, Y the second input event series, N_X the number of events in X, DeltaT1 and DeltaT2 the given coincidence interval boundaries, tau the lag between X and Y, Theta the Heaviside function and 1 the indicator function. :type method: str 'ES' or 'ECA' :arg method: determines if ES or ECA should be used :type symmetrization: str {'directed', 'symmetric', 'antisym', 'mean', 'max', 'min'} for ES, str {'directed', 'mean', 'max', 'min'} for ECA :arg symmetrization: determines if and which symmetrisation method should be used for the ES/ECA score matrix :type window_type: str {'retarded', 'advanced', 'symmetric'} :arg window_type: Only for ECA. Determines if precursor coincidence rate ('advanced'), trigger coincidence rate ('retarded') or a general coincidence rate with the symmetric interval [-taumax, taumax] are computed ('symmetric'). Default: 'symmetric' :rtype: 2D numpy array :return: pairwise event synchronization or pairwise coincidence rates symmetrized according to input parameter 'symmetrization' """ if method not in ['ES', 'ECA']: raise IOError("'method' parameter must be 'ECA' or 'ES'!") directedESMatrix = [] if method == 'ES': if symmetrization not in ['directed', 'symmetric', 'antisym', 'mean', 'max', 'min']: raise IOError("'symmetrization' parameter must be 'directed', " "'symmetric', 'antisym', 'mean', 'max' or" "'min' for event synchronization!") directedESMatrix = self._ndim_event_synchronization() elif method == 'ECA': if self.__taumax is np.inf: raise ValueError("'delta' must be a finite time window to" " determine event coincidence!") if symmetrization not in ['directed', 'mean', 'max', 'min']: raise IOError("'symmetrization' parameter must be 'directed', " "'mean', 'max' or 'min' for event" "coincidence analysis!") if window_type not in ['retarded', 'advanced', 'symmetric']: raise IOError("'window_type' must be 'retarded'," "'advanced' or 'symmetric'!") directedESMatrix = \ self._ndim_event_coincidence_analysis(window_type=window_type) # Use symmetrization functions for symmetrization and return result return self.symmetrization_options[symmetrization](directedESMatrix)
[docs] @Cached.method() def _ndim_event_synchronization(self): """ Compute NxN event synchronization matrix [i,j] with event synchronization from j to i without symmetrization. :rtype: NxN numpy array where N is the number of variables of the eventmatrix :return: event synchronization matrix """ # Get instance variables eventmatrix = self.__eventmatrix timestamps = self.__timestamps lag = self.__lag taumax = self.__taumax directed = np.zeros((self.__N, self.__N)) for i in range(0, self.__N): for j in range(i + 1, self.__N): directed[i, j], directed[j, i] = \ self.event_synchronization(eventmatrix[:, i], eventmatrix[:, j], ts1=timestamps, ts2=timestamps, taumax=taumax, lag=lag) return directed
[docs] def _ndim_event_coincidence_analysis(self, window_type='symmetric'): """ Computes NxN event coincidence matrix of event coincidence rate :type window_type: str {'retarded', 'advanced', 'symmetric'} :arg window_type: Only for ECA. Determines if precursor coincidence rate ('advanced'), trigger coincidence rate ('retarded') or a general coincidence rate with the symmetric interval [-taumax, taumax] are computed ('symmetric'). Default: 'symmetric' :rtype: NxN numpy array where N is the number of variables of the eventmatrix :return: event coincidence matrix """ eventmatrix = self.__eventmatrix directed = np.zeros((self.__N, self.__N)) timestamps = self.__timestamps for i in range(0, self.__N): for j in range(i + 1, self.__N): directed[i, j], directed[j, i] = \ self._eca_coincidence_rate(eventmatrix[:, i], eventmatrix[:, j], window_type=window_type, ts1=timestamps, ts2=timestamps) return directed
[docs] def _empirical_percentiles(self, method=None, n_surr=1000, symmetrization='directed', window_type='symmetric'): """ Compute p-values of event synchronisation (ES) and event coincidence analysis (ECA) using a Monte-Carlo approach. Surrogates are obtained by shuffling the event series. ES/ECA scores of the surrogate event series are computed and p-values are the empirical percentiles of the original event series compared to the ES/ECA scores of the surrogates. :type method: str 'ES' or 'ECA' :arg method: determines if ES or ECA should be used :type n_surr: int :arg n_surr: number of surrogates for Monte-Carlo method :type symmetrization: str {'directed', 'symmetric', 'antisym', 'mean', 'max', 'min'} for ES, str {'directed', 'mean', 'max', 'min'} for ECA :arg symmetrization: determines if and which symmetrisation method should be used for the ES/ECA score matrix :type window_type: str {'retarded', 'advanced', 'symmetric'} :arg window_type: Only for ECA. Determines if precursor coincidence rate ('advanced'), trigger coincidence rate ('retarded') or a general coincidence rate with the symmetric interval [-taumax, taumax] are computed ('symmetric'). Default: 'symmetric' :rtype: 2D numpy array :return: p-values of the ES/ECA scores for all """ # Get instance variables lag = self.__lag deltaT = self.__taumax event_series_result = \ self.event_series_analysis(method=method, symmetrization=symmetrization, window_type=window_type) surrogates = np.zeros((n_surr, self.__N, self.__N)) shuffled_es = self.__eventmatrix.copy() # For each surrogate, shuffle each event series and perform ES/ECA # analysis for n in range(n_surr): for i in range(self.__N): np.random.shuffle(shuffled_es[:, i]) if method == 'ES': for i in range(0, self.__N): for j in range(i + 1, self.__N): surrogates[n, i, j], surrogates[n, j, i] = \ self.event_synchronization(shuffled_es[:, i], shuffled_es[:, j], taumax=deltaT, lag=lag) elif method == 'ECA': for i in range(0, self.__N): for j in range(i + 1, self.__N): surrogates[n, i, j], surrogates[n, j, i] = \ self._eca_coincidence_rate(shuffled_es[:, i], shuffled_es[:, j], window_type=window_type) # Symmetrize according to symmetry keyword argument surrogates[n, :, :] = \ self.symmetrization_options[ symmetrization](surrogates[n, :, :]) # Calculate significance level via strict empirical percentiles for # each event series pair empirical_percentiles = np.zeros((self.__N, self.__N)) for i in range(self.__N): for j in range(self.__N): empirical_percentiles[i, j] = \ stats.percentileofscore(surrogates[:, i, j], event_series_result[i][j], kind='strict') / 100 return empirical_percentiles
[docs] def event_analysis_significance(self, method=None, surrogate='shuffle', n_surr=1000, symmetrization='directed', window_type='symmetric'): """ Returns significance levels (1 - p-values) for event synchronisation (ES) and event coincidence analysis (ECA). For ECA, there is an analytic option providing significance levels based on independent Poisson processes. The 'shuffle' option uses a Monte-Carlo approach, calculating ES or ECA scores for surrogate event time series obtained by shuffling the original event time series. The significance levels are the empirical percentiles of the ES/ECA scores of the original event series compared with the scores of the surrogate data. :type method: str 'ES' or 'ECA' :arg method: determines if ES or ECA should be used :type surrogate: str 'analytic' or 'shuffle' :arg surrogate: determines if p-values should be calculated using a Monte-Carlo method or (only for ECA) an analytic Poisson process null model :type n_surr: int :arg n_surr: number of surrogates for Monte-Carlo method :type symmetrization: str {'directed', 'symmetric', 'antisym', 'mean', 'max', 'min'} for ES, str {'directed', 'mean', 'max', 'min'} for ECA :arg symmetrization: determines if and which symmetrisation method should be used for the ES/ECA score matrix :type window_type: str {'retarded', 'advanced', 'symmetric'} :arg window_type: Only for ECA. Determines if precursor coincidence rate ('advanced'), trigger coincidence rate ('retarded') or a general coincidence rate with the symmetric interval [-taumax, taumax] are computed ('symmetric'). Default: 'symmetric' :rtype: 2D numpy array :return: significance levels of the ES/ECA scores for all pairs of event series in event matrix """ if method not in ['ES', 'ECA']: raise IOError("'method' parameter must be 'ECA' or 'ES'!") if surrogate not in ['analytic', 'shuffle']: raise IOError("'surrogate' parameter must be 'analytic' or " "'shuffle'!") # Get instance variables deltaT = self.__taumax lag = self.__lag if method == 'ECA': if symmetrization not in ['directed', 'mean', 'max', 'min']: raise IOError("'symmetrization' parameter must be 'directed', " "'mean', 'max' or 'min' for event" "coincidence analysis!") if window_type not in ['retarded', 'advanced', 'symmetric']: raise IOError("'window_type' must be 'retarded'," "'advanced' or 'symmetric'!") if surrogate == 'analytic': if symmetrization != 'directed': raise IOError("'symmetrization' parameter should be" "'directed' for analytical calculation of" "significance levels!") if window_type not in ['retarded', 'advanced']: raise IOError("'window_type' parameter must be 'retarded'" " or 'advanced' for analytical computation" " of significance levels!") # Compute ECA scores of stored event matrix directedECAMatrix = \ self._ndim_event_coincidence_analysis( window_type=window_type) significance_levels = np.zeros((self.__N, self.__N)) NEvents = self.__nrofevents if window_type == 'advanced': for i in range(self.__N): for j in range(i + 1, self.__N): # Compute Poisson probability p p = deltaT / (float(self.__timestamps[-1]) - lag) # Compute number of precursor coincidences 2->1 K12 = int(directedECAMatrix[i][j] * NEvents[i]) # Compute probability of at least K12 precursor # events pvalue = 0.0 for K_star in range(K12, NEvents[i] + 1): pvalue += \ stats.binom.pmf(K_star, NEvents[i], 1.0 - pow(1.0 - p, NEvents[j])) significance_levels[i][j] = 1.0 - pvalue # Compute number of precursor coincidence 1->2 K21 = int(directedECAMatrix[j][i] * NEvents[j]) # Compute probability of at least K21 precursor # events pvalue = 0.0 for K_star in range(K21, NEvents[j] + 1): pvalue += \ stats.binom.pmf(K_star, NEvents[j], 1.0 - pow(1.0 - p, NEvents[i])) significance_levels[j][i] = 1.0 - pvalue return significance_levels # If window_type != 'advanced', it must be 'retarded' else: for i in range(self.__N): for j in range(i + 1, self.__N): p = deltaT / (float(self.__timestamps[-1]) - lag) # Compute probability of at least K12 trigger # events # Compute number of trigger coincidence 2->1 K12 = int(directedECAMatrix[i][j] * NEvents[j]) # Compute Poisson probability p pvalue = 0.0 for K_star in range(K12, NEvents[j]): pvalue += \ stats.binom.pmf(K_star, NEvents[j], 1.0 - pow(1.0 - p, NEvents[i])) significance_levels[i][j] = 1.0 - pvalue # Compute number of trigger coincidence 1->2 K21 = int(directedECAMatrix[j][i] * NEvents[i]) # Compute probability of at least K21 trigger # events pvalue = 0.0 for K_star in range(K21, NEvents[i]): pvalue += \ stats.binom.pmf(K_star, NEvents[i], 1.0 - pow(1.0 - p, NEvents[j])) significance_levels[j][i] = 1.0 - pvalue return significance_levels # If surrogate is not 'analytic', it must be 'shuffle' else: return \ self._empirical_percentiles(method='ECA', n_surr=n_surr, symmetrization=symmetrization, window_type=window_type) elif method == 'ES': if surrogate != 'shuffle': raise IOError("Analytical calculation of significance level is" " only possible for event coincidence analysis!") if symmetrization not in ['directed', 'symmetric', 'antisym', 'mean', 'max', 'min']: raise IOError("'symmetrization' parameter must be 'directed', " "'symmetric', 'antisym', 'mean', 'max' or" "'min' for event synchronization!") return \ self._empirical_percentiles(method='ES', n_surr=n_surr, symmetrization=symmetrization) else: return None