"""A module for hydrology-related functionality.
Most functions are used in the `swimpy.output` module but they are placed here
to enable reuse outside of a SWIM project."""
import warnings
import numpy as np
import pandas as pd
def NSE(obs, sim):
    """Nash-Sutcliff-Efficiency.

    Arguments
    ---------
    obs, sim : same-length 1D array or pandas.Series
    """
    # Mask NaNs in obs; converting the boolean mask to a list enforces that
    # obs and sim have the same length (indexing raises if they differ).
    mask = np.isfinite(obs).tolist()
    o, s = obs[mask], sim[mask]
    # squared errors of the simulation vs. the mean-of-obs benchmark
    err_sim = (o - s) ** 2
    err_mean = (o - o.mean()) ** 2
    return 1 - err_sim.sum() / err_mean.sum()
def logNSE(obs, sim):
    '''Calculate log Nash-Sutcliffe-Efficiency through the NSE function

    Arguments
    ---------
    obs, sim : same-length 1D array or pandas.Series
    '''
    # NSE of the log-transformed series emphasises low-flow fit
    return NSE(np.log(obs), np.log(sim))
def mNSE(obs, sim):
    """Modified NSE weighted by Qobs to emphasise high/flood Q according to:
    Y. Hundecha, A. Bardossy (2004)

    Arguments
    ---------
    obs, sim : same-length 1D array or pandas.Series
    """
    # Mask NaNs in obs; the list conversion enforces equal lengths.
    mask = np.isfinite(obs).tolist()
    o, s = obs[mask], sim[mask]
    # squared errors weighted by the observed discharge
    werr_sim = o * (o - s) ** 2
    werr_mean = o * (o - o.mean()) ** 2
    return 1 - werr_sim.sum() / werr_mean.sum()
def pbias(obs, sim):
    """Calculate water balance of obs and sim in percent.

    Arguments
    ---------
    obs, sim : same-length 1D array or pandas.Series
    """
    # exclude NaNs in obs from both sums; list mask enforces equal lengths
    mask = np.isfinite(obs).tolist()
    return 100 * (sim[mask].sum() / obs[mask].sum() - 1)
def q_to_runoff(q, area, freq='d'):
    '''Convert discharge volume [m^3s^-1] to height [mm].

    Arguments
    ---------
    q : scalar | array | pd.Series | pd.DataFrame
        Discharge in m3s-1.
    area : scalar
        Drainage area in km2.
    freq : str
        Frequency of q, any of d, m, a (daily, monthly, annual). Derived from
        the pandas period/datetime index if a pandas object with a set
        frequency is parsed; otherwise this argument is used.

    Returns
    -------
    <same type as q> : Runoff height in mm.
    '''
    # seconds per period; months/years approximated by mean lengths
    periods = {'D': 24*60**2, 'M': 24*60**2*30, 'A': 24*60**2*365.24}
    try:
        # first letter of the pandas frequency string, e.g. 'D', 'M', 'A-DEC'
        time = q.index.freqstr[0].upper()
    except (AttributeError, TypeError):
        # AttributeError: q has no pandas index. TypeError: index exists but
        # has no set frequency (freqstr is None) -- fall back to freq argument.
        time = freq.upper()
    qm = q*periods[time]  # volume per period per m2 -> height in m
    am = area*10**6  # area in sq meter
    qmm = (qm/am)*10**3  # height in mm
    return qmm
def runoff_coefficient(q, p, area):
    '''Calculate runoff coefficient (runoff height / precipitation).

    Arguments
    ---------
    q : array | pd.Series | pd.DataFrame
        Discharge Q[m3s-1]. If objects without PeriodIndex parsed, p is assumed
        to be annual sum and q annual mean discharge.
    p : <same as q>
        Precipitaton [mm]. Should have same frequency as q.
    area : scalar
        Drainage area in km2.

    Returns
    -------
    <same type as q> : Dimensionless runoff coefficient.
    '''
    # derive the frequency letter from the index; fall back to annual when q
    # has no pandas index (AttributeError) or no set frequency (freqstr is
    # None -> TypeError). Passing a string keeps q_to_runoff's freq.upper()
    # fallback working (the previous code handed it a DateOffset object).
    try:
        f = q.index.freqstr[0]
    except (AttributeError, TypeError):
        f = 'a'
    # convert Q [m3/s] to runoff height [mm] over the drainage area
    qtime = q_to_runoff(q, area, freq=f)
    rc = qtime / p
    return rc
def flow_duration(series, nbins=100):
    '''Cummulative flow duration from a pandas (discharge) series.

    Arguments
    ---------
    series : 1D-array
        Discharge series, nans will be removed.
    nbins : int
        How many regular bins to return.

    Returns
    -------
    pd.Series : Discharge value exceeded X percent (index) of time.
    '''
    clean = series[np.isfinite(series)]
    density, edges = np.histogram(clean, bins=nbins, density=True)
    # probability mass of each bin = density * bin width
    mass = density * np.diff(edges)
    # exceedance probability: cumulative mass from the highest bin downwards
    exceedance = np.cumsum(mass[::-1])
    # lower bin edges, highest first (drops the last edge)
    lower_edges = edges[-2::-1]
    return pd.Series(lower_edges, index=exceedance * 100)
def peak_over_threshold(q, percentile=1, threshold=None, maxgap=None):
    '''An efficient method to identify all peaks over a threshold (POT).

    Arguments
    ---------
    q : pd.Series
        The discharge series, preferable with a datetime/period index. If index
        is not datetime/period, daily frequency is assumed to calculate the
        recurrence interval (years).
    percentile : number
        The percentile threshold of q., e.g. 1 means Q1.
    threshold : number, optional
        Absolute threshold to use for peak identification.
    maxgap : int, optional
        Largest gap between two threshold exceedance periods to count as single
        flood event. Number of timesteps. If not given, every exceedance is
        counted as individual flood event.

    Returns
    -------
    pd.DataFrame :
        Peak discharge ordered dataframe with order index and peak q, length,
        peak date and recurrence columns.
    '''
    def find_steps(bools):
        # Label contiguous runs with an integer that is constant within each
        # flood run: diff of previous-minus-current gives +1 on the first
        # step after a run ends (and -1 at a run start, which is removed).
        # at each day before flood start 1, at each last day a -1
        flgi = np.append(0, (bools[:-1].astype(int) - bools[1:].astype(int)))
        # remove -1
        flgi[flgi < 0] = 0
        # make steps
        return flgi.cumsum()
    # get flood groups index
    # NOTE(review): `threshold or ...` means an explicit threshold of 0
    # falls back to the percentile — presumably intended; confirm.
    thresh = threshold or q.dropna().quantile(1 - percentile / 100.)
    iflood = (q > thresh).values
    flgisteps = find_steps(iflood)
    # pretend gaps smaller than maxgap are also floods
    if maxgap:
        assert type(maxgap) == int
        # length of each dry spell, grouped by its step label
        gaps = q[~iflood].groupby(flgisteps[~iflood]).count()
        # the leading spell before the first flood must never be merged
        gaps[0] = maxgap
        # mark all timesteps belonging to short dry spells as flood, then
        # relabel so merged events share one group id
        iflood[gaps[flgisteps] < maxgap] = True
        flgisteps = find_steps(iflood)
    # get floods
    qfl = q[iflood]
    qpotgrp = qfl.groupby(flgisteps[iflood])
    # one row per flood event: peak discharge, duration and dates
    qpot = qpotgrp.max().to_frame('q')
    qpot['length'] = qpotgrp.count()
    qpot['start_date'] = qpotgrp.apply(lambda df: df.index[0])
    qpot['peak_date'] = qpotgrp.idxmax()
    qpot['end_date'] = qpotgrp.apply(lambda df: df.index[-1])
    # rank events by peak discharge, largest first (index 1 = largest)
    qpot.sort_values('q', ascending=False, inplace=True)
    qpot.index = np.arange(1, len(qpot) + 1)
    # record length in years; without a datetime index assume daily steps
    nyears = (q.index.year[-1] - q.index.year[0] + 1
              if hasattr(q.index, 'year') else len(q.index) / 365.25)
    # empirical recurrence interval: n years / rank
    qpot['recurrence'] = float(nyears) / qpot.index
    return qpot
def gumbel_recurrence(q, recurrence):
    """Deprecated! Use dist_recurrence(..., dist='gumbel_r')."""
    warnings.warn(gumbel_recurrence.__doc__, DeprecationWarning)
    # scipy's distribution is named gumbel_r; the previous 'gumble_r' typo
    # made dist_recurrence's validity assertion fail on every call
    return dist_recurrence(q, recurrence, dist='gumbel_r')
def dist_recurrence(q, recurrence, dist='genextreme', shape=None):
    """Estimate values of given recurrence via any scipy distribution.

    Requires the `scipy` package to be installed.

    Arguments
    ---------
    q : 1D-array-like
        Observed values of distribution without NaNs. E.g. annual max Q.
    recurrence : 1D-array-like
        Recurrence intervals for which to return values for. Must be > 1.
    dist : str
        Any valid scipy distribution function. Common flood distributions are:
        genextreme, gumbel_r, weibull_min, lognorm, gamma. Refer to:
        https://docs.scipy.org/doc/scipy/reference/stats.html
    shape : float | None
        Fit distribution with fixed shape parameter or not if None. Only valid
        if dist actually has a shape parameter.

    Returns
    -------
    pd.Series : Estimated values indexed by recurrence interval.
    """
    from scipy import stats
    assert hasattr(stats, dist), '%s is not a valid scipy distribution.' % dist
    df = getattr(stats, dist)
    kw = {}
    # `shape is not None` rather than truthiness: a fixed shape of 0.0 is a
    # valid value (e.g. the Gumbel limit of genextreme) and must not be
    # silently dropped
    if shape is not None and df.shapes:
        kw['fix_'+df.shapes] = shape
    # Estimate distribution parameters
    fits = df.fit(q, **kw)
    # asarray so plain lists/tuples work in the arithmetic below
    recurrence = np.asarray(recurrence)
    # quantiles of non-exceedance probability
    r = 1 - 1./recurrence
    # calculate recurrence/probability values via the percent point function
    ppf = df.ppf(r, *fits)
    return pd.Series(ppf, index=recurrence)
def hydrological_year_index(series, doy=None):
    """Return the series with an hydrological year index.

    Arguments
    ---------
    series : pd.Series
        Discharge time series with DateTime/Period index.
    doy : int, optional
        Day of year. If not given, the day with the lowest mean Q is used.
    """
    assert isinstance(series, pd.Series), \
        'series needs to be a pandas.Series instance.'
    assert hasattr(series.index, 'dayofyear'), \
        'Series index needs to be a DatetimeIndex or PeriodIndex.'
    if not doy:
        # start the hydrological year on the day of year with the lowest mean
        doy = series.groupby(series.index.dayofyear).mean().loc[:365].idxmin()
        print('Hydrological year starts on julian day %s' % doy)
    # days elapsed since the start of the hydrological year
    offset_days = series.index.dayofyear.values - doy
    hyd_years = series.index.year.values
    before_start = offset_days < 0
    # days before the start day belong to the previous hydrological year
    hyd_years = np.where(before_start, hyd_years - 1, hyd_years)
    offset_days = np.where(before_start, offset_days + 365, offset_days)
    new_index = pd.MultiIndex.from_arrays([hyd_years, offset_days],
                                          names=['year', 'day'])
    return pd.Series(series.values, index=new_index)