import contextlib
from dateutil.tz import tzlocal
import numpy as np
import os
import pandas as pd
import sys
import warnings
from datetime import date, datetime
from scipy import stats
def date_range(df, start, end):
    """Extract a certain date range from a DataFrame.

    The index must contain the dates, and the index must be sorted.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame indexed by sorted timestamps.
    start : pandas.Timestamp or None
        Inclusive start of the range; None means from the first row.
    end : pandas.Timestamp or None
        Inclusive end of the range; None means through the last row.

    Returns
    -------
    pandas.DataFrame restricted to [start, end].
    """
    # TODO: is this needed? Do normal pandas operation, timestamp
    # checking is not really needed (and limits the formats that can
    # be used, pandas can take more than pd.Timestamp)
    # Move this function to utils
    # Deal with pandas timestamp compatibility
    if start is not None:
        assert isinstance(start, pd.Timestamp), "start not given in timestamp format"
    else:
        start = df.index[0]
    if end is not None:
        assert isinstance(end, pd.Timestamp), "end not given in timestamp format"
    else:
        end = df.index[-1]
    # .loc slicing on a sorted DatetimeIndex is inclusive of both ends.
    df_new = df.loc[start:end]
    return df_new
# Module-global timezone settings used by to_datetime()/df_normalize().
# NOTE(review): both SYSTEM_TZ and TZ are assigned a tzlocal() default and
# then immediately overwritten with the hard-coded 'Europe/Helsinki'; the
# tzlocal() assignments are dead — confirm which default is intended.
#SYSTEM_TZ = tzlocal() # the operating system timezone - for sqlite output compat
SYSTEM_TZ = 'Europe/Helsinki'
TZ = tzlocal()
TZ = 'Europe/Helsinki'
def set_tz(tz):
    """Globally set the preferred local timezone.

    Parameters
    ----------
    tz : str or tzinfo
        Timezone passed to pandas tz_convert/tz_localize elsewhere in
        this module (e.g. 'Europe/Helsinki').
    """
    global TZ
    TZ = tz
@contextlib.contextmanager
def tmp_timezone(new_tz):
    """Temporarily override the global timezone for a block.

    This is used as a context manager::

        with tmp_timezone('Europe/Berlin'):
            ....

    Note: this overrides the global timezone. In the future, there will
    be a way to handle timezones as non-global variables, which should
    be preferred.
    """
    global TZ
    old_tz = TZ
    TZ = new_tz
    try:
        yield
    finally:
        # Restore the previous timezone even if the with-body raised;
        # the original version leaked the override on exceptions.
        TZ = old_tz
# Paths (next to this module) for the sqlite extension-functions source
# file and the shared library compiled from it by install_extensions().
SQLITE3_EXTENSIONS_BASENAME = os.path.join(os.path.dirname(__file__), 'sqlite-extension-functions.c')
SQLITE3_EXTENSIONS_FILENAME = os.path.join(os.path.dirname(__file__), 'sqlite-extension-functions.so')
def install_extensions():
    """Automatically install sqlite extension functions.

    Downloads extension-functions.c from sqlite.org (if not already
    present), verifies its sha256 digest, and compiles it with gcc into
    a shared library next to this module.
    Only works on Linux for now, improvements welcome.
    """
    import hashlib
    import subprocess
    if not os.path.exists(SQLITE3_EXTENSIONS_BASENAME):
        import urllib.request
        extension_url = 'https://sqlite.org/contrib/download/extension-functions.c?get=25'
        urllib.request.urlretrieve(extension_url, SQLITE3_EXTENSIONS_BASENAME)
    expected_digest = '991b40fe8b2799edc215f7260b890f14a833512c9d9896aa080891330ffe4052'
    # Use a context manager so the file handle is closed deterministically
    # (the original leaked it).
    with open(SQLITE3_EXTENSIONS_BASENAME, 'rb') as f:
        actual_digest = hashlib.sha256(f.read()).hexdigest()
    if actual_digest != expected_digest:
        print("sqlite-extension-functions.c has wrong sha256 hash", file=sys.stderr)
        # Do NOT compile an unverified download — the original continued
        # here, which would build and load untrusted code.
        return
    # Argument-list subprocess call (no shell string interpolation).
    subprocess.run(
        ['gcc', '-lm', '-shared', '-fPIC',
         'sqlite-extension-functions.c', '-o', 'sqlite-extension-functions.so'],
        cwd=os.path.dirname(__file__))
    print("Sqlite extension successfully compiled.")
def uninstall_extensions():
    """Uninstall any installed extensions.

    Removes the compiled sqlite extension shared library, if present.
    """
    path = SQLITE3_EXTENSIONS_FILENAME
    if os.path.exists(path):
        os.unlink(path)
#TODO: rename to data.py
def df_normalize(df, tz=None, old_tz=None):
    """Normalize a df (from sql) before presenting it to the user.

    This sets the dataframe index to the time values, and converts times
    to pandas.Timestamp:s. Modifies the data frame inplace.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with either a 'time' column (epoch seconds, converted via
        to_datetime) or 'day' and 'hour' columns.
    tz : str or tzinfo, optional
        Target timezone.  Defaults to the module-global TZ, with a
        DeprecationWarning urging callers to pass it explicitly.
    old_tz : str or tzinfo, optional
        Timezone the 'day'/'hour' values are currently expressed in
        (e.g. sqlite already converted them to localtime); when given,
        the index is localized to old_tz and then converted to tz.
    """
    if tz is None:
        warnings.warn(DeprecationWarning("From now on, you should explicitely specify timezone with e.g. tz='Europe/Helsinki'. Specify as part of the reading function."))
        tz = TZ
    if 'time' in df:
        # 'time' holds epoch seconds; to_datetime() yields tz-aware Timestamps.
        df.index = to_datetime(df['time'])
        df.index.name = None
        # Keep a copy of the timestamps as a regular column, too.
        df['datetime'] = df.index
    elif 'day' in df and 'hour' in df:
        # Build naive 'YYYY-MM-DD H:00' timestamps from the day/hour pair.
        index = df[['day', 'hour']].apply(lambda row: pd.Timestamp('%s %s:00'%(row['day'], row['hour'])), axis=1)
        if old_tz is not None:
            # old_tz is given - e.g. sqlite already converts it to localtime
            index = index.dt.tz_localize(old_tz).dt.tz_convert(tz)
        else:
            index = index.dt.tz_localize(tz)
        df.index = index
        df.index.name = None
    # NOTE(review): a frame with neither 'time' nor 'day'/'hour' columns is
    # silently left untouched — confirm this is intentional.
def to_datetime(value):
    """Convert epoch-seconds values to tz-aware pandas datetimes.

    Interprets ``value`` as seconds since the epoch (UTC) and converts
    the result to the module-global timezone TZ.  Accepts anything
    pd.to_datetime accepts; a Series input yields a Series, anything
    else yields an index/scalar result.
    """
    converted = pd.to_datetime(value, unit='s', utc=True)
    if isinstance(converted, pd.Series):
        converted = converted.dt.tz_convert(TZ)
    else:
        converted = converted.tz_convert(TZ)
    return converted
def occurrence(series, bin_width=720, grouping_width=3600):
    """Number of distinct 12-minute bins with data, per hour.

    This reproduces the logic of the "occurrence" database function,
    without needing the database: each hour is divided into bins of
    ``bin_width`` seconds and, for every hour, we count how many
    distinct bins contain at least one event.

    Parameters
    ----------
    series : pandas.Series or pandas.Index
        Event timestamps.  Datetime-like values are used as-is; numeric
        values are interpreted as epoch seconds.
    bin_width : int
        Bin width in seconds (default 720 = 12 minutes).  Must divide
        grouping_width evenly.
    grouping_width : int
        Grouping window in seconds.  Only the default 3600 (one hour)
        is currently supported.

    Returns
    -------
    pandas.DataFrame with a timestamp index (start of each hour) and an
    'occurrence' column; 'day' and 'hour' remain as regular columns.

    TODO: use the grouping_width option.
    """
    if grouping_width != 3600:
        raise ValueError("Changing grouping_width is not currently supported.")
    if grouping_width % bin_width != 0:
        raise ValueError("grouping_width must be a multiple of bin_width")
    if not isinstance(series, (pd.Series, pd.Index)):
        raise ValueError("The input to niimpy.util.occurrence must be a "
                         "pandas Series or Index, not a DataFrame. "
                         "(your input type was: %s)"%type(series))
    if not np.issubdtype(series.dtype.base, np.datetime64):
        # BUG FIX: the converted values were previously assigned to a
        # throwaway variable and discarded, so epoch-seconds input later
        # crashed on the .dt accessor.  Keep the conversion.
        series = pd.to_datetime(series, unit='s')
    df = pd.DataFrame({"time": series})
    df['day'] = df['time'].dt.strftime("%Y-%m-%d")
    df['hour'] = df['time'].dt.hour
    # Bin index within the hour, from the minute component.
    df['bin'] = df['time'].dt.minute // (bin_width//60)
    gb1 = df.groupby(by=["day", "hour", "bin"], as_index=False).size() # everything in the same bin goes to one row
    gb2 = gb1.groupby(by=["day", "hour"]).size().to_frame() # count number of bins in an hour
    gb2.rename({0:"occurrence"}, axis=1, inplace=True)
    # Following handles cases where there are not enough rows... TODO what is the right thing to do here?
    gb2.reset_index(inplace=True)
    gb2.index = gb2.loc[:, ['day', 'hour']].apply(lambda row: pd.Timestamp('%s %s:00'%(row['day'], row['hour'])), axis=1)
    return gb2
def aggregate(df, freq, method_numerical='mean', method_categorical='first', groups=None, **resample_kwargs):
    """ Grouping and resampling the data. This function performs separated resampling
    for different types of columns: numerical and categorical.

    Parameters
    ----------
    df : pandas Dataframe
        Dataframe to resample
    freq : string
        Frequency to resample the data. Requires the dataframe to have datetime-like index.
    method_numerical : str
        Resampling method for numerical columns. Possible values:
        'sum', 'mean', 'median'. Default value is 'mean'.
    method_categorical : str
        Resampling method for categorical columns. Possible values: 'first', 'mode', 'last'.
    groups : list
        Columns used for groupby operation. Defaults to ['user'].
    resample_kwargs : dict
        keywords to pass pandas resampling function

    Returns
    -------
    An aggregated and resampled multi-index dataframe.
    """
    # Avoid a mutable default argument; None stands in for ['user'].
    if groups is None:
        groups = ['user']
    assert method_numerical in ['mean', 'sum', 'median'], \
        'Cannot recognize sampling method. Possible values: "mean", "sum", "median".'
    assert method_categorical in ['first', 'mode', 'last']

    # Resample numerical columns -> sub_df1.  Restrict to numeric columns
    # (plus the group keys) up front: modern pandas raises TypeError when
    # mean/median hit object columns instead of silently dropping them.
    num_cols = df.select_dtypes(include='number').columns.to_list()
    groupby = df[list(set(num_cols) | set(groups))].groupby(groups)
    if method_numerical == 'sum':
        sub_df1 = groupby.resample(freq, **resample_kwargs).sum()
    elif method_numerical == 'mean':
        sub_df1 = groupby.resample(freq, **resample_kwargs).mean()
    else:  # 'median' -- the assert above guarantees no other value
        sub_df1 = groupby.resample(freq, **resample_kwargs).median()

    # Resample categorical (object-dtype) columns -> sub_df2
    cat_cols = df.select_dtypes(include=['object']).columns.to_list()
    cat_cols.extend(groups)
    cat_cols = list(set(cat_cols))
    groupby = df[cat_cols].groupby(groups)
    if method_categorical == 'first':
        sub_df2 = groupby.resample(freq, **resample_kwargs).first()
    elif method_categorical == 'last':
        sub_df2 = groupby.resample(freq, **resample_kwargs).last()
    elif method_categorical == 'mode':
        sub_df2 = groupby.resample(freq, **resample_kwargs).agg(lambda x: tuple(stats.mode(x)[0]))

    # Merge sub_df1 and sub_df2, dropping any leftover group-key columns.
    sub_df1 = sub_df1.drop(groups, axis=1, errors='ignore')
    sub_df2 = sub_df2.drop(groups, axis=1, errors='ignore')
    final_df = sub_df1.join(sub_df2)
    return final_df