Source code for smrf.distribute.wind


import numpy as np
import pandas as pd
import logging
import os
from smrf.distribute import image_data
from smrf.utils import utils
import netCDF4 as nc
import pytz
import glob


[docs]class wind(image_data.image_data): """ The :mod:`~smrf.distribute.wind.wind` class allows for variable specific distributions that go beyond the base class. Estimating wind speed and direction is complex terrain can be difficult due to the interaction of the local topography with the wind. The methods described here follow the work developed by Winstral and Marks (2002) and Winstral et al. (2009) :cite:`Winstral&Marks:2002` :cite:`Winstral&al:2009` which parameterizes the terrain based on the upwind direction. The underlying method calulates the maximum upwind slope (maxus) within a search distance to determine if a cell is sheltered or exposed. See :mod:`smrf.utils.wind.model` for a more in depth description. A maxus file (library) is used to load the upwind direction and maxus values over the dem. The following steps are performed when estimating the wind speed: 1. Adjust measured wind speeds at the stations and determine the wind direction componenets 2. Distribute the flat wind speed 3. Distribute the wind direction components 4. Simulate the wind speeds based on the distribute flat wind, wind direction, and maxus values After the maxus is calculated for multiple wind directions over the entire DEM, the measured wind speed and direction can be distirbuted. The first step is to adjust the measured wind speeds to estimate the wind speed if the site were on a flat surface. The adjustment uses the maxus value at the station location and an enhancement factor for the site based on the sheltering of that site to wind. A special consideration is performed when the station is on a peak, as artificially high wind speeds can be calcualted. Therefore, if the station is on a peak, the minimum maxus value is choosen for all wind directions. The wind direction is also broken up into the u,v componenets. Next the flat wind speed, u wind direction component, and v wind direction compoenent are distributed using the underlying distribution methods. With the distributed flat wind speed and wind direction, the simulated wind speeds can be estimated. The distributed wind direction is binned into the upwind directions in the maxus library. This determines which maxus value to use for each pixel in the DEM. Each cell's maxus value is further enhanced for vegetation, with larger, more dense vegetation increasing the maxus value (more sheltering) and bare ground not enhancing the maxus value (exposed). With the adjusted maxus values, wind speed is estimated using the relationships in Winstral and Marks (2002) and Winstral et al. (2009) :cite:`Winstral&Marks:2002` :cite:`Winstral&al:2009` based on the distributed flat wind speed and each cell's maxus value. When gridded data is provided, the methods outlined above are not performed due to the unknown complexity of parameterizing the gridded dataset for using the maxus methods. Therefore, the wind speed and direction are distributed using the underlying distribution methods. Args: windConfig: The [wind] section of the configuration file Attributes: config: configuration from [vapor_pressure] section wind_speed: numpy matrix of the wind speed wind_direction: numpy matrix of the wind direction veg_type: numpy array for the veg type, from :py:attr:`smrf.data.loadTopo.topo.veg_type` _maxus_file: the location of the maxus NetCDF file maxus: the loaded library values from :py:attr:`_maxus_file` maxus_direction: the directions associated with the :py:attr:`maxus` values min: minimum value of wind is 0.447 max: maximum value of wind is 35 stations: stations to be used in alphabetical order output_variables: Dictionary of the variables held within class :mod:`!smrf.distribute.wind.wind` that specifies the ``units`` and ``long_name`` for creating the NetCDF output file. variable: 'wind' """ variable = 'wind' # these are variables that can be output output_variables = {'flatwind': { 'units': 'm/s', 'standard_name': 'flatwind_wind_speed', 'long_name': 'Simulated wind on a flat surface' }, 'wind_speed': { 'units': 'm/s', 'standard_name': 'wind_speed', 'long_name': 'Wind speed' }, 'wind_direction': { 'units': 'degrees', 'standard_name': 'wind_direction', 'long_name': 'Wind direction' } } # these are variables that are operate at the end only and do not need to # be written during main distribute loop post_process_variables = {} def __init__(self, windConfig, distribute_drifts, wholeConfig, tempDir=None): # extend the base class image_data.image_data.__init__(self, self.variable) self._logger = logging.getLogger(__name__) # check and assign the configuration self.getConfig(windConfig) if (tempDir is None) | (tempDir == 'WORKDIR'): tempDir = os.environ['WORKDIR'] self.tempDir = tempDir if windConfig['distribution'] == 'grid': self.gridded = True self.distribute_drifts = False # wind ninja parameters self.wind_ninja_dir = self.config['wind_ninja_dir'] self.wind_ninja_dxy = self.config['wind_ninja_dxy'] self.wind_ninja_pref = self.config['wind_ninja_pref'] if self.config['wind_ninja_tz'] is not None: self.wind_ninja_tz = pytz.timezone(self.config['wind_ninja_tz']) self.start_date = pd.to_datetime(wholeConfig['time']['start_date']) self.grid_data = wholeConfig['gridded']['data_type'] else: # open the maxus netCDF self._maxus_file = nc.Dataset(self.config['maxus_netcdf'], 'r') self.maxus = self._maxus_file.variables['maxus'][:] self.maxus_direction = self._maxus_file.variables['direction'][:] self._maxus_file.close() self._logger.debug('Read data from {}' .format(self.config['maxus_netcdf'])) # get the veg values matching = [s for s in self.config.keys() if "veg_" in s] v = {} for m in matching: ms = m.split('_') if type(self.config[m]) == list: v[ms[1]] = float(self.config[m][0]) else: v[ms[1]] = float(self.config[m]) self.veg = v # whether or not we will use this data to redistribute precip self.distribute_drifts = distribute_drifts self._logger.debug('Created distribute.wind')
[docs] def initialize(self, topo, data): """ Initialize the distribution, calls :mod:`smrf.distribute.image_data.image_data._initialize`. Checks for the enhancement factors for the stations and vegetation. Args: topo: :mod:`smrf.data.loadTopo.topo` instance contain topographic data and infomation data: data Pandas dataframe containing the station data, from :mod:`smrf.data.loadData` or :mod:`smrf.data.loadGrid` """ self._logger.debug('Initializing distribute.wind') if type(self.config['peak']) != list: self.config['peak'] = [self.config['peak']] self._initialize(topo, data.metadata) # meshgrid points self.X = topo.X self.Y = topo.Y if not self.gridded: self.veg_type = topo.veg_type # get the enhancements for the stations if 'enhancement' not in self.metadata.columns: self.metadata['enhancement'] = \ float(self.config['station_default']) for m in self.metadata.index: sta_e = m.lower() if sta_e in self.config: if type(self.config[sta_e]) == list: enhancement = self.config[sta_e][0] else: enhancement = self.config[sta_e] self.metadata.loc[m, 'enhancement'] = \ float(enhancement) else: self.flatwind = None if self.config['wind_ninja_dir'] is not None: # WindNinja output height in meters self.wind_height = float(self.config['wind_ninja_height']) # set roughness that was used in WindNinja simulation # WindNinja uses 0.01m for grass, 0.43 for shrubs, and 1.0 for forest self.wn_roughness = float(self.config['wind_ninja_roughness']) * \ np.ones_like(topo.dem) # get our effective veg surface roughness # to use in log law scaling of WindNinja data # using the relationship in # https://www.jstage.jst.go.jp/article/jmsj1965/53/1/53_1_96/_pdf self.veg_roughness = topo.veg_height / 7.39 # make sure roughness stays reasonable using bounds from # http://www.iawe.org/Proceedings/11ACWE/11ACWE-Cataldo3.pdf self.veg_roughness[self.veg_roughness < 0.01] = 0.01 self.veg_roughness[np.isnan(self.veg_roughness)] = 0.01 self.veg_roughness[self.veg_roughness > 1.6] = 1.6 # precalculate scale arrays so we don't do it every timestep self.ln_wind_scale = np.log((self.veg_roughness + self.wind_height) / self.veg_roughness) / \ np.log((self.wn_roughness + self.wind_height) / self.wn_roughness) ###### do this first to speedup the interpolation later #### # find vertices and weights to speedup interpolation fro ascii file fmt_d = '%Y%m%d' fp_vel = glob.glob(os.path.join(self.wind_ninja_dir, 'data{}'.format(self.start_date.strftime(fmt_d)), 'wind_ninja_data', '*_vel.asc'))[0] # get wind ninja topo stats ts2 = utils.get_asc_stats(fp_vel) xwn = ts2['x'][:] ywn = ts2['y'][:] XW, YW = np.meshgrid(xwn, ywn) xwint = XW.flatten() ywint = YW.flatten() xy=np.zeros([XW.shape[0]*XW.shape[1],2]) xy[:,1]=ywint xy[:,0]=xwint uv=np.zeros([self.X.shape[0]*self.X.shape[1],2]) uv[:,1]=self.Y.flatten() uv[:,0]=self.X.flatten() self.vtx, self.wts = utils.interp_weights(xy, uv,d=2) ###### end do this first to speedup the interpolation later #### if not self.distribute_drifts: # we have to pass these to precip, so make them none if we won't use them self.dir_round_cell = None self.cellmaxus = None
[docs] def distribute(self, data_speed, data_direction, t): """ Distribute given a Panda's dataframe for a single time step. Calls :mod:`smrf.distribute.image_data.image_data._distribute`. Follows the following steps for station measurements: 1. Adjust measured wind speeds at the stations and determine the wind direction componenets 2. Distribute the flat wind speed 3. Distribute the wind direction components 4. Simulate the wind speeds based on the distribute flat wind, wind direction, and maxus values Gridded interpolation distributes the given wind speed and direction. Args: data_speed: Pandas dataframe for single time step from wind_speed data_direction: Pandas dataframe for single time step from wind_direction t: time stamp """ self._logger.debug('{} Distributing wind_direction and wind_speed' .format(data_speed.name)) data_speed = data_speed[self.stations] data_direction = data_direction[self.stations] if self.gridded: # check if running with windninja if self.wind_ninja_dir is not None: wind_speed, wind_direction = self.convert_wind_ninja(t) self.wind_speed = wind_speed self.wind_direction = wind_direction else: self._distribute(data_speed, other_attribute='wind_speed') # wind direction components at the station self.u_direction = np.sin(data_direction * np.pi/180) # u self.v_direction = np.cos(data_direction * np.pi/180) # v # distribute u_direction and v_direction self._distribute(self.u_direction, other_attribute='u_direction_distributed') self._distribute(self.v_direction, other_attribute='v_direction_distributed') # combine u and v to azimuth az = np.arctan2(self.u_direction_distributed, self.v_direction_distributed)*180/np.pi az[az < 0] = az[az < 0] + 360 self.wind_direction = az # set min and max self.wind_speed = utils.set_min_max(self.wind_speed, self.min, self.max) else: # calculate the maxus at each site self.stationMaxus(data_speed, data_direction) # distribute the flatwind self._distribute(self.flatwind_point, other_attribute='flatwind') # distribute u_direction and v_direction self._distribute(self.u_direction, other_attribute='u_direction_distributed') self._distribute(self.v_direction, other_attribute='v_direction_distributed') # Calculate simulated wind speed at each cell from flatwind self.simulateWind(data_speed)
[docs] def distribute_thread(self, queue, data_speed, data_direction): """ Distribute the data using threading and queue. All data is provided and ``distribute_thread`` will go through each time step and call :mod:`smrf.distribute.wind.wind.distribute` then puts the distributed data into the queue for :py:attr:`wind_speed`. Args: queue: queue dictionary for all variables data: pandas dataframe for all data, indexed by date time """ for t in data_speed.index: self.distribute(data_speed.loc[t], data_direction.loc[t], t) queue['wind_speed'].put([t, self.wind_speed]) queue['wind_direction'].put([t, self.wind_direction]) # if not self.gridded: queue['flatwind'].put([t, self.flatwind]) queue['cellmaxus'].put([t,self.cellmaxus]) queue['dir_round_cell'].put([t,self.dir_round_cell])
[docs] def simulateWind(self, data_speed): """ Calculate the simulated wind speed at each cell from flatwind and the distributed directions. Each cell's maxus value is pulled from the maxus library based on the distributed wind direction. The cell's maxus is further adjusted based on the vegetation type and the factors provided in the [wind] section of the configuration file. Args: data_speed: Pandas dataframe for a single time step of wind speed to make the pixel locations same as the measured values """ # combine u and v to azimuth az = np.arctan2(self.u_direction_distributed, self.v_direction_distributed)*180/np.pi az[az < 0] = az[az < 0] + 360 dir_round_cell = np.ceil((az - self.nstep/2) / self.nstep) * self.nstep dir_round_cell[dir_round_cell < 0] = dir_round_cell[dir_round_cell < 0] + 360 dir_round_cell[dir_round_cell == -0] = 0 dir_round_cell[dir_round_cell == 360] = 0 cellmaxus = np.zeros(dir_round_cell.shape) cellwind = np.zeros(dir_round_cell.shape) dir_unique = np.unique(dir_round_cell) for d in dir_unique: # find all values for matching direction ind = dir_round_cell == d i = np.argwhere(self.maxus_direction == d)[0][0] cellmaxus[ind] = self.maxus[i][ind] # correct for veg dynamic_mask = np.ones(cellmaxus.shape) for k,v in self.veg.items(): # Adjust veg types that were specified by the user if k != 'default': ind = self.veg_type == int(k) dynamic_mask[ind] = 0 cellmaxus[ind] += v # Apply the veg default to those that weren't messed with if self.veg['default'] != 0: cellmaxus[dynamic_mask == 1] += self.veg['default'] # correct unreasonable values cellmaxus[cellmaxus > 32] = 32 cellmaxus[cellmaxus < -32] = -32 # determine wind factor = float(self.config['reduction_factor']) ind = cellmaxus < -30.10 cellwind[ind] = factor * self.flatwind[ind] * 4.211 ind = (cellmaxus > -30.10) & (cellmaxus < -21.3) c = np.abs(cellmaxus[ind]) cellwind[ind] = factor * self.flatwind[ind] * \ (1.756507 - 0.1678945 * c + 0.01927844 * np.power(c, 2) - 0.0003651592 * np.power(c, 3)) ind = (cellmaxus > -21.3) & (cellmaxus < 0) c = np.abs(cellmaxus[ind]) cellwind[ind] = factor * self.flatwind[ind] * \ (1.0 + 0.1031717 * c - 0.008003561 * np.power(c, 2) + 0.0003996581 * np.power(c, 3)) ind = cellmaxus > 30.10 cellwind[ind] = self.flatwind[ind] / 4.211 ind = (cellmaxus < 30.10) & (cellmaxus > 21.3) c = cellmaxus[ind] cellwind[ind] = self.flatwind[ind] / \ (1.756507 - 0.1678945 * c + 0.01927844 * np.power(c, 2) - 0.0003651592 * np.power(c, 3)) ind = (cellmaxus < 21.3) & (cellmaxus >= 0) c = cellmaxus[ind] cellwind[ind] = self.flatwind[ind] / \ (1.0 + 0.1031717 * c - 0.008003561 * np.power(c, 2) + 0.0003996581 * np.power(c, 3)) # Convert from 3m to 5m wind speed cellwind *= 1.07985 # preseve the measured values cellwind[self.metadata.yi, self.metadata.xi] = data_speed # check for NaN nans, x = utils.nan_helper(cellwind) if np.sum(nans) > 0: cellwind[nans] = np.interp(x(nans), x(~nans), cellwind[~nans]) self.wind_speed = utils.set_min_max(cellwind, self.min, self.max) self.wind_direction = az self.cellmaxus = cellmaxus self.dir_round_cell = dir_round_cell
[docs] def stationMaxus(self, data_speed, data_direction): """ Determine the maxus value at the station given the wind direction. Can specify the enhancemet for each station or use the default, along with whether or not the station is on a peak which will ensure that the station cannot be sheltered. The station enhancement and peak stations are specified in the [wind] section of the configuration file. Calculates the following for each station: * :py:attr:`flatwind` * :py:attr:`u_direction` * :py:attr:`v_direction` Args: data_speed: wind_speed data frame for single time step data_direction: wind_direction data frame for single time step """ # ---------------------------------------- # Get data and site maxus value flatwind = data_speed.copy() # number of bins that the maxus library was calculated for self.nbins = len(self.maxus_direction) self.nstep = 360/self.nbins for m in self.metadata.index: # pixel locations xi = self.metadata.loc[m, 'xi'] yi = self.metadata.loc[m, 'yi'] e = self.metadata.loc[m, 'enhancement'] # maxus value at the station if not pd.isnull(data_direction[m]): if m.upper() in self.config['peak']: val_maxus = np.min(self.maxus[:, yi, xi] + e) else: idx = int(np.ceil((data_direction[m] - self.nstep/2) / self.nstep) * self.nstep) if idx == 360: idx = 0 # special case when 360=0 ind = self.maxus_direction == idx val_maxus = self.maxus[ind, yi, xi] + e # correct unreasonable values if val_maxus > 35: val_maxus = 35 if val_maxus < -35: val_maxus = -35 ma = np.abs(val_maxus) # Lapse all measurements to flat terrain (i.e. maxus = 0) if (ma > 21.3 and ma < 30.0): expVal = 1.756507 - 0.1678945 * ma + \ 0.01927844 * np.power(ma, 2) - \ 0.0003651592 * np.power(ma, 3) elif (ma >= 30.0): expVal = 4.21 else: expVal = 1.0 + 0.1031717 * (ma) - \ 0.008003561 * np.power(ma, 2) + \ 0.0003996581 * np.power(ma, 3) if val_maxus > 0: flatwind.loc[m] = data_speed[m] * expVal else: flatwind.loc[m] = data_speed[m] / expVal else: flatwind.loc[m] = np.NaN self.flatwind_point = flatwind # wind direction components at the station self.u_direction = np.sin(data_direction * np.pi/180) # u self.v_direction = np.cos(data_direction * np.pi/180) # v
[docs] def convert_wind_ninja(self, t): """ Convert the WindNinja ascii grids back to the SMRF grids and into the SMRF data streamself. Args: t: datetime of timestep Returns: ws: wind speed numpy array wd: wind direction numpy array """ # fmt_wn = '%Y-%m-%d_%H%M' fmt_wn = '%m-%d-%Y_%H%M' fmt_d = '%Y%m%d' # get the ascii files that need converted # file example tuol_09-20-2018_1900_200m_vel.prj # find timestamp of WindNinja file t_file = t.astimezone(self.wind_ninja_tz) fp_vel = os.path.join(self.wind_ninja_dir, 'data{}'.format(t.strftime(fmt_d)), 'wind_ninja_data', '{}_{}_{:d}m_vel.asc'.format(self.wind_ninja_pref, t_file.strftime(fmt_wn), self.wind_ninja_dxy)) # make sure files exist if not os.path.isfile(fp_vel): raise ValueError('{} in windninja convert module does not exist!'.format(fp_vel)) data_vel = np.loadtxt(fp_vel, skiprows=6) data_vel_int = data_vel.flatten() # # interpolate to the SMRF grid from the WindNinja grid g_vel = utils.grid_interpolate(data_vel_int, self.vtx, self.wts, self.X.shape) # flip because it comes out upsidedown g_vel = np.flipud(g_vel) # log law scale g_vel = g_vel * self.ln_wind_scale # Don't get angle if not distributing drifts if self.distribute_drifts: fp_ang = os.path.join(self.wind_ninja_dir, 'data{}'.format(t.strftime(fmt_d)), 'wind_ninja_data', '{}_{}_{:d}m_ang.asc'.format(self.wind_ninja_pref, t_file.strftime(fmt_wn), self.wind_ninja_dxy)) if not os.path.isfile(fp_ang): raise ValueError('{} in windninja convert module does not exist!'.format(fp_ang)) data_ang = np.loadtxt(fp_ang, skiprows=6) data_ang_int = data_ang.flatten() g_ang = utils.grid_interpolate(data_ang_int, self.vtx, self.wts, self.X.shape) g_ang = np.flipud(g_ang) else: g_ang = None return g_vel, g_ang