"""
20160104 Scott Havens
Collection of utility functions
"""
import numpy as np
import pandas as pd
from datetime import datetime
import pytz
import os
from smrf.utils import io
from shutil import copyfile
from .gitinfo import __gitVersion__, __gitPath__
from smrf import __version__, __core_config__
import random
import sys
from inicheck.checkers import CheckType
from inicheck.output import generate_config
from inicheck.utilities import mk_lst
import copy
import scipy.spatial.qhull as qhull
from scipy.interpolate.interpnd import CloughTocher2DInterpolator, LinearNDInterpolator
[docs]class CheckStation(CheckType):
"""
Custom check for ensuring our stations are always capitalized
"""
def __init__(self,**kwargs):
super(CheckStation,self).__init__(**kwargs)
[docs] def cast(self):
if self.value.lower() != 'none':
return self.value.upper()
else:
return self.value
[docs]def find_configs(directory):
"""
Searches through a directory and returns all the .ini fulll filenames.
Args:
directory: string path to directory.
Returns:
configs: list of paths pointing to the config file.
"""
configs = []
directory = os.path.abspath(os.path.expanduser(directory))
for f in os.listdir(directory):
if f.split('.')[-1] == 'ini':
configs.append(os.path.join(directory,f))
return configs
[docs]def handle_run_script_options(config_option):
"""
Handle function for dealing with args in the SMRF run script
Args:
config_option: string path to a directory or a specific config file.
Returns:
configFile:Full path to an existing config file.
"""
config_option = os.path.abspath(os.path.expanduser(config_option))
#User passes a directory
if os.path.isdir(config_option):
configs = find_configs(config_option)
if len(configs) > 1:
print("\nError: Multiple config files detected in {0} please ensure"
" only one is in the folder.\n".format(config_option))
sys.exit()
else:
configFile = configs[0]
else:
configFile = config_option
if not os.path.isfile(configFile):
print('\nError: Please provide a config file or a directory containing'
' one.\n')
sys.exit()
return configFile
[docs]def nan_helper(y):
"""
Helper to handle indices and logical indices of NaNs.
Example:
>>> # linear interpolation of NaNs
>>> nans, x= nan_helper(y)
>>> y[nans]= np.interp(x(nans), x(~nans), y[~nans])
Args:
y: 1d numpy array with possible NaNs
Returns:
tuple:
**nans** - logical indices of NaNs
**index** - a function, with signature
indices=index(logical_indices) to convert logical
indices of NaNs to 'equivalent' indices
"""
return np.isnan(y), lambda z: z.nonzero()[0]
[docs]def set_min_max(data, min_val, max_val):
"""
Ensure that the data is in the bounds of min and max
Args:
data: numpy array of data to be min/maxed
min_val: minimum threshold to trim data
max_val: Maximum threshold to trim data
Returns:
data: numpy array of data trimmed at min_val and max_val
"""
if max_val == None:
max_val = np.inf
if min_val == None:
min_val = -np.inf
ind = np.isnan(data)
data[data <= min_val] = min_val
data[data >= max_val] = max_val
data[ind] = np.nan
return data
[docs]def water_day(indate):
"""
Determine the decimal day in the water year
Args:
indate: datetime object
Returns:
tuple:
**dd** - decimal day from start of water year
**wy** - Water year
20160105 Scott Havens
"""
tp = indate.timetuple()
# create a test start of the water year
test_date = datetime(tp.tm_year, 10, 1, 0, 0, 0)
test_date = test_date.replace(tzinfo=pytz.timezone(indate.tzname()))
# check to see if it makes sense
if indate < test_date:
wy = tp.tm_year
else:
wy = tp.tm_year + 1
# actual water year start
wy_start = datetime(wy-1, 10, 1, 0, 0, 0)
wy_start = wy_start.replace(tzinfo=pytz.timezone(indate.tzname()))
# determine the decimal difference
d = indate - wy_start
dd = d.days + d.seconds/86400.0
return dd, wy
[docs]def is_leap_year(year):
return (year % 4 == 0 and year % 100 != 0) or year % 400 == 0
[docs]def getgitinfo():
"""
gitignored file that contains specific SMRF version and path
Returns:
str: git version from 'git describe'
"""
# return git describe if in git tracked SMRF
if len(__gitVersion__) > 1:
return __gitVersion__
# return overarching version if not in git tracked SMRF
else:
version = 'v'+__version__
return version
[docs]def check_station_colocation(metadata_csv=None,metadata=None):
"""
Takes in a data frame representing the metadata for the weather stations
as produced by :mod:`smrf.framework.model_framework.SMRF.loadData` and
check to see if any stations have the same location.
Args:
metadata_csv: CSV containing the metdata for weather stations
metadata: Pandas Dataframe containing the metdata for weather stations
Returns:
repeat_sta: list of station primary_id that are colocated
"""
if metadata_csv != None:
metadata = pd.read_csv(metadata_csv)
metadata.set_index('primary_id', inplace=True)
#Unique station locations
unique_x = list(metadata.xi.unique())
unique_y = list(metadata.yi.unique())
repeat_sta = []
#Cycle through all the positions look for multiple stations at a position
for x in unique_x:
for y in unique_y:
x_search = metadata['xi'] == x
y_search = metadata['yi'] == y
stations = metadata.index[x_search & y_search].tolist()
if len(stations) > 1:
repeat_sta.append(stations)
if len(repeat_sta) == 0:
repeat_sta = None
return repeat_sta
[docs]def get_config_doc_section_hdr():
"""
Returns the header dictionary for linking modules in smrf to the
documentation generated by inicheck auto doc functions
"""
hdr_dict = {}
dist_modules = ['air_temp', 'vapor_pressure', 'precip', 'wind', 'albedo',
'thermal','solar','soil_temp']
for d in dist_modules:
if d == 'precip':
sec = 'precipitation'
else:
sec = d
# If distributed module link api
intro = ("The {0} section controls all the available parameters that"
" effect the distribution of the {0} module, espcially the"
" associated models. For more detailed information please see"
" :mod:`smrf.distribute.{0}`").format(sec)
hdr_dict[d] = intro
return hdr_dict
[docs]def get_asc_stats(fp):
"""
Returns header of ascii dem file
"""
ts = {}
header = {}
ff = open(fp, 'r')
for idl, line in enumerate(ff):
tmp_line = line.strip().split()
header[tmp_line[0]] = tmp_line[1]
if idl >= 5:
break
ff.close()
ts['nx'] = int(header['ncols'])
ts['ny'] = int(header['nrows'])
ts['du'] = float(header['cellsize'])
ts['dv'] = float(header['cellsize'])
ts['u'] = float(header['yllcorner'])
ts['v'] = float(header['xllcorner'])
ts['x'] = ts['v'] + ts['dv']*np.arange(ts['nx'])
ts['y'] = ts['u'] + ts['du']*np.arange(ts['ny'])
return ts
[docs]def getqotw():
p = os.path.dirname(__core_config__)
q_f = os.path.abspath(os.path.join('{0}'.format(p),'.qotw'))
with open(q_f) as f:
qs = f.readlines()
f.close()
i = random.randrange(0,len(qs))
return qs[i]
[docs]def interp_weights(xy, uv,d=2):
"""
Find vertices and weights of LINEAR interpolation for gridded interp.
This routine follows the methods of scipy.interpolate.griddata as outlined
here:
https://stackoverflow.com/questions/20915502/speedup-scipy-griddata-for-multiple-interpolations-between-two-irregular-grids
This function finds the vertices and weights which is the most computationally
expensive part of the routine. The interpolateion can then be done quickly.
Args:
xy: n by 2 array of flattened meshgrid x and y coords of WindNinja grid
uv: n by 2 array of flattened meshgrid x and y coords of SMRF grid
d: dimensions of array (i.e. 2 for our purposes)
Returns:
vertices:
wts:
"""
tri = qhull.Delaunay(xy)
simplex = tri.find_simplex(uv)
vertices = np.take(tri.simplices, simplex, axis=0)
temp = np.take(tri.transform, simplex, axis=0)
delta = uv - temp[:, d]
bary = np.einsum('njk,nk->nj', temp[:, :d, :], delta)
return vertices, np.hstack((bary, 1 - bary.sum(axis=1, keepdims=True)))
[docs]def grid_interpolate(values, vtx, wts, shp, fill_value=np.nan):
"""
Broken out gridded interpolation from scipy.interpolate.griddata that takes
the vertices and wts from interp_weights function
Args:
values: flattened WindNinja wind speeds
vtx: vertices for interpolation
wts: weights for interpolation
shape: shape of SMRF grid
fill_value: value for extrapolated points
Returns:
ret: interpolated values
"""
ret = np.einsum('nj,nj->n', np.take(values, vtx), wts)
ret[np.any(wts < 0, axis=1)] = fill_value
ret = ret.reshape(shp[0], shp[1])
return ret
[docs]def grid_interpolate_deconstructed(tri, values, grid_points, method='linear'):
"""
Underlying methods from scipy grid_data broken out to pass in the tri
values returned from qhull.Delaunay. This is done to improve the speed
of using grid_data
Args:
tri: values returned from qhull.Delaunay
values: values at HRRR stations generally
grid_points: tuple of vectors for X,Y coords of grid stations
method: either linear or cubic
Returns:
result of interpolation to gridded points
"""
if method == 'cubic':
return CloughTocher2DInterpolator(tri, values)(grid_points)
elif method == 'linear':
return LinearNDInterpolator(tri, values)(grid_points)