Source code for smrf.framework.model_framework

The module :mod:`~smrf.framework.model_framework` contains functions and
classes that act as a major wrapper to the underlying packages and modules
contained with SMRF. A class instance of
:class:`~smrf.framework.model_framework.SMRF` is initialized with a
configuration file indicating where data is located, what variables to
distribute and how, where to output the distributed data, or run as a threaded
application. See the help on the configuration file to learn more about how to
control the actions of :class:`~smrf.framework.model_framework.SMRF`.

    The following examples shows the most generic method of running SMRF. These
    commands will generate all the forcing data required to run iSnobal.  A
    complete example can be found in

    >>> import smrf
    >>> s = smrf.framework.SMRF(configFile) # initialize SMRF
    >>> s.loadTopo() # load topo data
    >>> s.initializeDistribution() # initialize the distribution
    >>> s.initializeOutput() # initialize the outputs if desired
    >>> s.loadData() # load weather data  and station metadata
    >>> s.distributeData() # distribute


import logging
import os
import sys
import coloredlogs
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import pytz
from smrf import data, distribute, output, __core_config__, __recipes__
from smrf.envphys import radiation
from smrf.utils import queue, io
from smrf.utils.utils import backup_input, getqotw, check_station_colocation
from threading import Thread
import shutil
from import get_user_config, check_config
from inicheck.config import UserConfig
from inicheck.output import print_config_report,generate_config, print_recipe_summary
from inicheck.utilities import pcfg, get_relative_to_cfg

[docs]class SMRF(): """ SMRF - Spatial Modeling for Resources Framework Args: configFile (str): path to configuration file. Returns: SMRF class instance. Attributes: start_date: start_date read from configFile end_date: end_date read from configFile date_time: Numpy array of date_time objects between start_date and end_date config: Configuration file read in as dictionary distribute: Dictionary the contains all the desired variables to distribute and is initialized in :func:`~smrf.framework.model_framework.initializeDistirbution` """ # These are the modules that the user can modify and use different methods modules = ['air_temp', 'albedo', 'precip', 'soil_temp', 'solar', 'thermal', 'vapor_pressure', 'wind'] # These are the variables that will be queued thread_variables = ['cosz', 'azimuth', 'illum_ang', 'air_temp', 'dew_point', 'vapor_pressure', 'wind_speed', 'precip', 'percent_snow', 'snow_density', 'last_storm_day_basin', 'storm_days', 'precip_temp', 'clear_vis_beam', 'clear_vis_diffuse', 'clear_ir_beam', 'clear_ir_diffuse', 'albedo_vis', 'albedo_ir', 'net_solar', 'cloud_factor', 'thermal', 'output', 'veg_ir_beam','veg_ir_diffuse', 'veg_vis_beam', 'veg_vis_diffuse', 'cloud_ir_beam', 'cloud_ir_diffuse', 'cloud_vis_beam', 'cloud_vis_diffuse', 'thermal_clear', 'wind_direction'] def __init__(self, config, external_logger=None): """ Initialize the model, read config file, start and end date, and logging """ # read the config file and store if isinstance(config, str): if not os.path.isfile(config): raise Exception('Configuration file does not exist --> {}' .format(config)) configFile = config try: #Read in the original users config ucfg = get_user_config(config, modules = 'smrf') except UnicodeDecodeError as e: print(e) raise Exception(('The configuration file is not encoded in ' 'UTF-8, please change and retry')) elif isinstance(config, UserConfig): ucfg = config configFile = config.filename else: raise Exception('Config passed to SMRF is neither file name nor UserConfig instance') # start logging if external_logger == None: if 'log_level' in ucfg.cfg['logging']: loglevel = ucfg.cfg['logging']['log_level'].upper() else: loglevel = 'INFO' numeric_level = getattr(logging, loglevel, None) if not isinstance(numeric_level, int): raise ValueError('Invalid log level: %s' % loglevel) # setup the logging logfile = None if ucfg.cfg['logging']['log_file']!= None: logfile = ucfg.cfg['logging']['log_file'] if not os.path.isabs(logfile): logfile = os.path.abspath(os.path.join( os.path.dirname(configFile), ucfg.cfg['logging']['log_file'])) if not os.path.isdir(os.path.dirname(logfile)): os.makedirs(os.path.dirname(logfile)) if not os.path.isfile(logfile): with open(logfile,'w+') as f: f.close() fmt = '%(levelname)s:%(name)s:%(message)s' if logfile is not None: logging.basicConfig(filename=logfile, level=numeric_level, filemode='w+', format=fmt) else: logging.basicConfig(level=numeric_level) coloredlogs.install(level=numeric_level, fmt=fmt) self._loglevel = numeric_level self._logger = logging.getLogger(__name__) else: self._logger = external_logger # add the title title = self.title(2) for line in title: out = get_relative_to_cfg(ucfg.cfg['output']['out_location'],ucfg.filename) #Make the tmp and output directories if they do not exist makeable_dirs = [out,os.path.join(out,'tmp')] for path in makeable_dirs: if not os.path.isdir(path): try:"Directory does not exist, \nCreating {0}".format(path)) os.makedirs(path) except OSError as e: raise e self.temp_dir = path # Check the user config file for errors and report issues if any"Checking config file for issues...") warnings, errors = check_config(ucfg) print_config_report(warnings, errors, logger = self._logger) self.config = ucfg.cfg self.ucfg = ucfg # Exit SMRF if config file has errors if len(errors) > 0: self._logger.error("Errors in the config file. See configuration" " status report above.") sys.exit() # Write the config file to the output dir no matter where the project is fname = 'config.ini' full_config_out = self.config['output']['out_location'] full_config_out = os.path.abspath(os.path.join( os.path.dirname(configFile), full_config_out,fname))"Writing config file with full options.") generate_config(self.ucfg,full_config_out) # After writing update the paths to be full abs paths. self.config = self.ucfg.update_config_paths() # process the system variables for k,v in self.config['system'].items(): setattr(self,k,v) os.environ['WORKDIR'] = self.temp_dir # Get the time sectionutils self.start_date = pd.to_datetime(self.config['time']['start_date']) self.end_date = pd.to_datetime(self.config['time']['end_date']) # Check to see if user specified a real end time. if self.start_date > self.end_date: raise ValueError("start_date cannot be larger than end_date.") # Get the timesetps correctly in the time zone d = data.mysql_data.date_range(self.start_date, self.end_date, timedelta(minutes=int(self.config['time']['time_step']))) tzinfo = pytz.timezone(self.config['time']['time_zone']) self.date_time = [di.replace(tzinfo=tzinfo) for di in d] self.time_steps = len(self.date_time) # need to align date time if self.config['albedo']['start_decay'] is not None: self.config['albedo']['start_decay'] = self.config['albedo']['start_decay'].replace(tzinfo=tzinfo) self.config['albedo']['end_decay'] = self.config['albedo']['end_decay'].replace(tzinfo=tzinfo) # if a gridded dataset will be used self.gridded = False if 'gridded' in self.config: self.gridded = True self.forecast_flag = self.config['gridded']['forecast_flag'] self.n_forecast_hours = self.config['gridded']['n_forecast_hours'] # hours from start of day self.day_hour = self.start_date - pd.to_datetime(d[0].strftime("%Y%m%d")) self.day_hour = int(self.day_hour / np.timedelta64(1, 'h')) if ((self.start_date > and not self.gridded) or (self.end_date > and not self.gridded)): raise ValueError("A date set in the future can only be used with" " WRF generated data!") self.distribute = {} if self.config['logging']['qotw']: # Initialize the distribute dict'Started SMRF --> %s' %'Model start --> %s' % self.start_date)'Model end --> %s' % self.end_date)'Number of time steps --> %i' % self.time_steps) def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): """ Provide some logging info about when SMRF was closed """ # clean up the WORKDIR if hasattr(self, 'topo'): if self.topo.stoporad_in_file is not None: if os.path.isfile(self.topo.stoporad_in_file): os.remove(self.topo.stoporad_in_file) if hasattr(self, 'distribute'): if 'solar' in self.distribute.keys(): if os.path.isfile(self.distribute['solar'].vis_file): os.remove(self.distribute['solar'].vis_file) if os.path.isfile(self.distribute['solar'].ir_file): os.remove(self.distribute['solar'].ir_file) if hasattr(self,'temp_dir'): if os.path.isdir(self.temp_dir): shutil.rmtree(self.temp_dir)'SMRF closed --> %s' %
[docs] def loadTopo(self, calcInput=True): """ Load the information from the configFile in the ['topo'] section. See :func:`` for full description. """ # load the topo self.topo = data.loadTopo.topo(self.config['topo'], calcInput, tempDir=self.temp_dir)
[docs] def initializeDistribution(self): """ This initializes the distirbution classes based on the configFile sections for each variable. :func:`~smrf.framework.model_framework.SMRF.initializeDistribution` will initialize the variables within the :func:`smrf.distribute` package and insert into a dictionary 'distribute' with variable names as the keys. Variables that are intialized are: * :func:`Air temperature <smrf.distribute.air_temp.ta>` * :func:`Vapor pressure <smrf.distribute.vapor_pressure.vp>` * :func:`Wind speed and direction <smrf.distribute.wind.wind>` * :func:`Precipitation <smrf.distribute.precipitation.ppt>` * :func:`Albedo <smrf.distribute.albedo.albedo>` * :func:`Solar radiation <>` * :func:`Thermal radiation <>` * :func:`Soil Temperature <smrf.distribute.soil_temp.ts>` """ # 1. Air temperature self.distribute['air_temp'] = \ distribute.air_temp.ta(self.config['air_temp']) # 2. Vapor pressure self.distribute['vapor_pressure'] = \ distribute.vapor_pressure.vp(self.config['vapor_pressure'], self.config['precip']['precip_temp_method']) # 3. Wind self.distribute['wind'] = \ distribute.wind.wind(self.config['wind'], self.config['precip']['distribute_drifts'], self.config, tempDir=self.temp_dir) # 4. Precipitation self.distribute['precip'] = \ distribute.precipitation.ppt(self.config['precip'], self.start_date, self.config['time']['time_step']) # 5. Albedo self.distribute['albedo'] = \ distribute.albedo.albedo(self.config['albedo']) # 6. Solar radiation self.distribute['solar'] = \['solar'], self.distribute['albedo'].config, self.topo.stoporad_in_file, self.temp_dir) # 7. thermal radiation self.distribute['thermal'] = \['thermal']) # 8. soil temperature self.distribute['soil_temp'] = \ distribute.soil_temp.ts(self.config['soil_temp'])
[docs] def loadData(self): """ Load the measurement point data for distributing to the DEM, must be called after the distributions are initialized. Currently, data can be loaded from three different sources: * :func:`CSV files <>` * :func:`MySQL database <>` * :func:`Gridded data source (WRF) <>` After loading, :func:`~smrf.framework.mode_framework.SMRF.loadData` will call :func:`smrf.framework.model_framework.find_pixel_location` to determine the pixel locations of the point measurements and filter the data to the desired stations if CSV files are used. """ # get the start date and end date requested flag = True if 'csv' in self.config: = data.loadData.wxdata(self.config['csv'], self.start_date, self.end_date, time_zone=self.config['time']['time_zone'], stations=self.config['stations'], dataType='csv') elif 'mysql' in self.config: = data.loadData.wxdata(self.config['mysql'], self.start_date, self.end_date, time_zone=self.config['time']['time_zone'], stations=self.config['stations'], dataType='mysql') #Add stations to stations in config for input backup self.config['stations']['stations'] = \ elif 'gridded' in self.config: flag = False = data.loadGrid.grid(self.config['gridded'], self.topo, self.start_date, self.end_date, time_zone=self.config['time']['time_zone'], dataType=self.config['gridded']['data_type'], tempDir=self.temp_dir, forecast_flag=self.forecast_flag, day_hour=self.day_hour, n_forecast_hours=self.n_forecast_hours) # set the stations in the distribute try: for key in self.distribute.keys(): setattr(self.distribute[key], 'stations', except Exception as e: self._logger.warning("Distribution not initialized, grid stations" "could not be set.") self._logger.exception(e) else: raise KeyError('Could not determine where station data is located') # determine the locations of the stations on the grid while maintaining reverse compatibility #New DB uses utm_x utm_y instead of X,Y try:['xi'] = \ row: find_pixel_location(row, self.topo.x, 'utm_x'), axis=1)['yi'] = \ row: find_pixel_location(row, self.topo.y, 'utm_y'), axis=1) #Old DB has X and Y except:['xi'] = \ row: find_pixel_location(row, self.topo.x, 'X'), axis=1)['yi'] = \ row: find_pixel_location(row, self.topo.y, 'Y'), axis=1) # pre filter the data to only the desired stations if flag: try: for key in self.distribute.keys(): if key in # Pull out the loaded data d = getattr(, key) # Check to find the matching stations match = d.columns.isin(self.distribute[key].stations) sta_match = d.columns[match] # Update the dataframe and the distribute stations self.distribute[key].stations = sta_match.tolist() setattr(, key, d[sta_match]) if hasattr(, 'cloud_factor'): d = getattr(, 'cloud_factor') setattr(, 'cloud_factor', d[self.distribute['solar'].stations]) except Exception as e: self._logger.warning("Distribution not initialized, data not " "filtered to desired stations in variable {}".format(key)) self._logger.error(e) #Check all sections for stations that are colocated for key in self.distribute.keys(): if key in if self.distribute[key].stations != None: if self.config['stations']['check_colocation']: # Confirm out stations all have a unique position for each section colocated = check_station_colocation([self.distribute[key].stations]) # Stations are co-located, throw error if colocated != None: self._logger.error("Stations in the {0} section are colocated.\n{1}".format(key,','.join(colocated[0]))) sys.exit() #Does the user want to create a CSV copy of the station data used. if self.config["output"]['input_backup'] == True:'Backing up input data...') backup_input(, self.ucfg)
[docs] def distributeData(self): """ Wrapper for various distribute methods. If threading was set in configFile, then :func:`~smrf.framework.model_framework.SMRF.distributeData_threaded` will be called. Default will call :func:`~smrf.framework.model_framework.SMRF.distributeData_single`. """ if self.threading: self.distributeData_threaded() else: self.distributeData_single()
[docs] def distributeData_single(self): """ Distribute the measurement point data for all variables in serial. Each variable is initialized first using the :func:`` instance and the metadata loaded from :func:`~smrf.framework.model_framework.SMRF.loadData`. The function distributes over each time step, all the variables below. Steps performed: 1. Sun angle for the time step 2. Illumination angle 3. Air temperature 4. Vapor pressure 5. Wind direction and speed 6. Precipitation 7. Solar radiation 8. Thermal radiation 9. Soil temperature 10. Output time step if needed """ # ------------------------------------- # Initialize the distibution for v in self.distribute: self.distribute[v].initialize(self.topo, # ------------------------------------- # Distribute the data for output_count, t in enumerate(self.date_time): # wait here for the model to catch up if needed startTime ='Distributing time step %s' % t) # 0.1 sun angle for time step cosz, azimuth = radiation.sunang(t.astimezone(pytz.utc), self.topo.topoConfig['basin_lat'], self.topo.topoConfig['basin_lon'], zone=0, slope=0, aspect=0) # 0.2 illumination angle illum_ang = None if cosz > 0: illum_ang = radiation.shade(self.topo.slope, self.topo.aspect, azimuth, cosz) # 1. Air temperature self.distribute['air_temp'].distribute([t]) # 2. Vapor pressure self.distribute['vapor_pressure'].distribute([t], self.distribute['air_temp'].air_temp) # 3. Wind_speed and wind_direction self.distribute['wind'].distribute([t],[t], t) #self, data, dpt, time, wind, temp, mask=None # 4. Precipitation self.distribute['precip'].distribute([t], self.distribute['vapor_pressure'].dew_point, self.distribute['vapor_pressure'].precip_temp, self.distribute['air_temp'].air_temp, t,[t],[t], self.distribute['wind'].wind_direction, self.distribute['wind'].dir_round_cell, self.distribute['wind'].wind_speed, self.distribute['wind'].cellmaxus) # 5. Albedo self.distribute['albedo'].distribute(t, illum_ang, self.distribute['precip'].storm_days) # 6. Solar self.distribute['solar'].distribute([t], illum_ang, cosz, azimuth, self.distribute['precip'].last_storm_day_basin, self.distribute['albedo'].albedo_vis, self.distribute['albedo'].albedo_ir) # 7. thermal radiation if self.distribute['thermal'].gridded and \ self.config['gridded']['data_type'] != 'hrrr': self.distribute['thermal'].distribute_thermal([t], self.distribute['air_temp'].air_temp) else: self.distribute['thermal'].distribute(t, self.distribute['air_temp'].air_temp, self.distribute['vapor_pressure'].vapor_pressure, self.distribute['vapor_pressure'].dew_point, self.distribute['solar'].cloud_factor) # 8. Soil temperature self.distribute['soil_temp'].distribute() # 9. output at the frequency and the last time step self.output(t) telapsed = - startTime self._logger.debug('{0:.2f} seconds for time step' .format(telapsed.total_seconds())) self.forcing_data = 1
[docs] def distributeData_threaded(self): """ Distribute the measurement point data for all variables using threading and queues. Each variable is initialized first using the :func:`` instance and the metadata loaded from :func:`~smrf.framework.model_framework.SMRF.loadData`. A :func:`DateQueue <smrf.utils.queue.DateQueue_Threading>` is initialized for :attr:`all threading variables <smrf.framework.model_framework.SMRF.thread_variables>`. Each variable in :func:`smrf.distribute` is passed all the required point data at once using the distribute_thread function. The distribute_thread function iterates over :attr:`~smrf.framework.model_framework.SMRF.date_time` and places the distributed values into the :func:`DateQueue <smrf.utils.queue.DateQueue_Threading>`. """ #Create threads for distribution t,q = self.create_distributed_threads() # output thread t.append(queue.QueueOutput(q, self.date_time, self.out_func, self.config['output']['frequency'], self.topo.nx, self.topo.ny)) # the cleaner t.append(queue.QueueCleaner(self.date_time, q)) # start all the threads for i in range(len(t)): t[i].start() # Wait for the end for i in range(len(t)): t[i].join() self._logger.debug('DONE!!!!')
[docs] def create_distributed_threads(self): """ Creates the threads for a distributed run in smrf. Designed for smrf runs in memory Returns t: list of threads for distirbution q: queue """ # ------------------------------------- # Initialize the distibutions for v in self.distribute: self.distribute[v].initialize(self.topo, # ------------------------------------- # Create Queues for all the variables q = {} t = [] self.thread_variables += ['storm_total'] if self.distribute['precip'].nasde_model == 'marks2017': self.thread_variables += ['storm_id'] #Add threaded variables on the fly if self.distribute['thermal'].correct_cloud: self.thread_variables += ['thermal_cloud'] if self.distribute['thermal'].correct_veg: self.thread_variables += ['thermal_veg'] # add some variables to thread_variables based on what we're doing self.thread_variables += ['flatwind'] self.thread_variables += ['cellmaxus', 'dir_round_cell'] for v in self.thread_variables: q[v] = queue.DateQueue_Threading(self.max_values, self.time_out) # ------------------------------------- # Distribute the data # 0.1 sun angle for time step t.append(Thread(target=radiation.sunang_thread, name='sun_angle', args=(q, self.date_time, self.topo.topoConfig['basin_lat'], self.topo.topoConfig['basin_lon'], 0, 0, 0))) # 0.2 illumination angle t.append(Thread(target=radiation.shade_thread, name='illum_angle', args=(q, self.date_time, self.topo.slope, self.topo.aspect))) # 1. Air temperature t.append(Thread(target=self.distribute['air_temp'].distribute_thread, name='air_temp', args=(q, # 2. Vapor pressure t.append(Thread(target=self.distribute['vapor_pressure'].distribute_thread, name='vapor_pressure', args=(q, # 3. Wind_speed and wind_direction t.append(Thread(target=self.distribute['wind'].distribute_thread, name='wind', args=(q,, # 4. Precipitation t.append(Thread(target=self.distribute['precip'].distribute_thread, name='precipitation', args=(q,, self.date_time, self.topo.mask))) # 5. Albedo t.append(Thread(target=self.distribute['albedo'].distribute_thread, name='albedo', args=(q, self.date_time))) # 6.1 Clear sky visible t.append(Thread(target=self.distribute['solar'].distribute_thread_clear, name='clear_vis', args=(q,, 'clear_vis'))) # 6.2 Clear sky ir t.append(Thread(target=self.distribute['solar'].distribute_thread_clear, name='clear_ir', args=(q,, 'clear_ir'))) # 6.3 Net radiation t.append(Thread(target=self.distribute['solar'].distribute_thread, name='solar', args=(q, # 7. thermal radiation if self.distribute['thermal'].gridded: t.append(Thread(target=self.distribute['thermal'].distribute_thermal_thread, name='thermal', args=(q, else: t.append(Thread(target=self.distribute['thermal'].distribute_thread, name='thermal', args=(q, self.date_time))) return t,q
[docs] def initializeOutput(self): """ Initialize the output files based on the configFile section ['output']. Currently only :func:`NetCDF files <smrf.output.output_netcdf>` is supported. """ if self.config['output']['frequency'] is not None: # check the out location pth = os.path.abspath(os.path.expanduser( self.config['output']['out_location'])) if 'out_location' not in self.config['output']: raise Exception('''out_location must be specified for variable outputs''') elif self.config['output']['out_location'] == 'WORKDIR': pth = os.environ['WORKDIR'] elif not os.path.isdir(pth): os.makedirs(pth) self.config['output']['out_location'] = pth # frequency of outputs self.config['output']['frequency'] = \ int(self.config['output']['frequency']) # determine the variables to be output'{} variables will be output' .format(self.config['output']['variables'])) output_variables = self.config['output']['variables'] # output_variables = list(map(str.strip, output_variables)) if not isinstance(output_variables, list): output_variables = output_variables.split(',') # determine which variables belong where variable_list = {} for v in output_variables: for m in self.modules: if m in self.distribute.keys(): if v in self.distribute[m].output_variables.keys(): # if there is a key in the config file, # then change the output file name if v in self.config['output'].keys(): fname = os.path.join(self.config['output']['out_location'], self.config['output'][v]) else: fname = os.path.join(self.config['output']['out_location'], v) d = {'variable': v, 'module': m, 'out_location':fname, 'info': self.distribute[m].output_variables[v]} variable_list[v] = d # determine what type of file to output if self.config['output']['file_type'].lower() == 'netcdf': self.out_func = output.output_netcdf(variable_list, self.topo, self.config['time'], self.config['output']) elif self.config['output']['file_type'].lower() == 'hru': self.out_func = output.output_hru(variable_list, self.topo, self.date_time, self.config['output']) else: raise Exception('Could not determine type of file for output') # is there a function to apply? self.out_func.func = None if 'func' in self.config['output']: self.out_func.func = self.config['output']['func'] else:'No variables will be output') self.output_variables = None
[docs] def output(self, current_time_step, module=None, out_var=None): """ Output the forcing data or model outputs for the current_time_step. Args: current_time_step (date_time): the current time step datetime object module - var_name - """ output_count = self.date_time.index(current_time_step) # Only output according to the user specified value, # or if it is the end. if (output_count % self.config['output']['frequency'] == 0) or \ (output_count == len(self.date_time)): # User is attempting to output single variable if module is not None and out_var is not None: # add only one variable to the output list and preceed as normal var_vals = [self.out_func.variable_list[out_var]] # Incomplete request elif module is not None or out_var is not None: raise ValueError("Function requires an output module and" " variable name when outputting a specific" " variables") else: # Output all the variables var_vals = self.out_func.variable_list.values() # Get the output variables then pass to the function for v in var_vals: # get the data desired data = getattr(self.distribute[v['module']], v['variable']) if data is None: data = np.zeros((self.topo.ny, self.topo.nx)) # output the time step self._logger.debug("Outputting {0}".format(v['module'])) self.out_func.output(v['variable'], data, current_time_step)
[docs] def post_process(self): """ Execute all the post processors """ for k in self.distribute.keys(): self.distribute[k].post_processor(self)
[docs] def title(self, option): """ A little title to go at the top of the logger file """ if option == 1: title = [" .----------------. .----------------. .----------------. .----------------.", " | .--------------. || .--------------. || .--------------. || .--------------. |", " | | _______ | || | ____ ____ | || | _______ | || | _________ | |", " | | / ___ | | || ||_ \ / _|| || | |_ __ \ | || | |_ ___ | | |", " | | | (__ \_| | || | | \/ | | || | | |__) | | || | | |_ \_| | |", " | | '.___`-. | || | | |\ /| | | || | | __ / | || | | _| | |", " | | |`\____) | | || | _| |_\/_| |_ | || | _| | \ \_ | || | _| |_ | |", " | | |_______.' | || ||_____||_____|| || | |____| |___| | || | |_____| | |", " | | | || | | || | | || | | |", " | '--------------' || '--------------' || '--------------' || '--------------' |", " '----------------' '----------------' '----------------' '----------------' ", " "] elif option == 2: title = [" SSSSSSSSSSSSSSS MMMMMMMM MMMMMMMM RRRRRRRRRRRRRRRRR FFFFFFFFFFFFFFFFFFFFFF", " SS:::::::::::::::S M:::::::M M:::::::M R::::::::::::::::R F::::::::::::::::::::F", " S:::::SSSSSS::::::S M::::::::M M::::::::M R::::::RRRRRR:::::R F::::::::::::::::::::F", " S:::::S SSSSSSS M:::::::::M M:::::::::M RR:::::R R:::::R FF::::::FFFFFFFFF::::F", " S:::::S M::::::::::M M::::::::::M R::::R R:::::R F:::::F FFFFFF", " S:::::S M:::::::::::M M:::::::::::M R::::R R:::::R F:::::F", " S::::SSSS M:::::::M::::M M::::M:::::::M R::::RRRRRR:::::R F::::::FFFFFFFFFF", " SS::::::SSSSS M::::::M M::::M M::::M M::::::M R:::::::::::::RR F:::::::::::::::F", " SSS::::::::SS M::::::M M::::M::::M M::::::M R::::RRRRRR:::::R F:::::::::::::::F", " SSSSSS::::S M::::::M M:::::::M M::::::M R::::R R:::::R F::::::FFFFFFFFFF", " S:::::S M::::::M M:::::M M::::::M R::::R R:::::R F:::::F", " S:::::S M::::::M MMMMM M::::::M R::::R R:::::R F:::::F", " SSSSSSS S:::::S M::::::M M::::::M RR:::::R R:::::R FF:::::::FF", " S::::::SSSSSS:::::S M::::::M M::::::M R::::::R R:::::R F::::::::FF", " S:::::::::::::::SS M::::::M M::::::M R::::::R R:::::R F::::::::FF", " SSSSSSSSSSSSSSS MMMMMMMM MMMMMMMM RRRRRRRR RRRRRRR FFFFFFFFFFF", " "] return title
[docs]def run_smrf(config): """ Function that runs smrf how it should be operate for full runs. Args: config: string path to the config file or inicheck UserConfig instance """ start = # initialize with SMRF(config) as s: # load topo data s.loadTopo() # initialize the distribution s.initializeDistribution() # initialize the outputs if desired s.initializeOutput() # load weather data and station metadata s.loadData() # distribute s.distributeData() # post process if necessary s.post_process() - start)
[docs]def can_i_run_smrf(config): """ Function that wraps run_smrf in try, except for testing purposes Args: config: string path to the config file or inicheck UserConfig instance """ try: run_smrf(config) return True except Exception as e: print(e) return False
[docs]def find_pixel_location(row, vec, a): """ Find the index of the stations X/Y location in the model domain Args: row (pandas.DataFrame): metadata rows vec (nparray): Array of X or Y locations in domain a (str): Column in DataFrame to pull data from (i.e. 'X') Returns: Pixel value in vec where row[a] is located """ return np.argmin(np.abs(vec - row[a]))