Source code for smrf.spatial.dk.dk

"""
2016-02-22 Scott Havens

Distributed forcing data over a grid using detrended kriging
"""

import numpy as np
from . import detrended_kriging
import logging
import pandas as pd


[docs]class DK: """ Detrended kriging class """ def __init__(self, mx, my, mz, GridX, GridY, GridZ, config): """ Args: mx: x locations for the points my: y locations for the points mz: z locations for the points GridX: x locations in grid to interpolate over GridY: y locations in grid to interpolate over GridZ: z locations in grid to interpolate over """ # measurement point locations self.mx = mx self.my = my self.mz = mz self.nsta = len(mx) # grid information self.GridX = GridX self.GridY = GridY self.GridZ = GridZ self.ngrid = GridX.shape[0] * GridX.shape[1] # data information self.data = None self.nan_val = [] # initialize some things self.weights = [] self.dgrid = [] self.ad = [] self.config = config # calculate the distances # self.calculateDistances() # calculate the weights # self.calculateWeights() self._logger = logging.getLogger(__name__)
[docs] def calculate(self, data): """ Calcluate the deternded kriging for the data and config Arg: data: numpy array same length as m* config: configuration for dk Returns: v: returns the distributed and calculated value """ nan_val = pd.isnull(data) # only calcualte if the stations involved have changed # if np.sum(np.array_equal(nan_val,self.nan_val)) != len(self.mx): if not np.array_equal(nan_val, self.nan_val): self.nan_val = nan_val nsta = np.sum(~nan_val) self._logger.debug('''Recalculating detrended kriging weights for {} stations ...'''.format(nsta)) self.calculateWeights() # now calculate the trend and the residuals self.detrendData(data) # distribute the risduals r = np.nansum(self.weights * self.residuals, 2) # retrend the residuals v = self.retrendData(r) # # create a plot for the DOCS # fs = 16 # fw = 'bold' # xi = np.array([500, 3000]) # yi = self.pv[0]*xi + self.pv[1] # fig = plt.figure(figsize=(24,9)) # # extent = (np.min(self.GridX), np.max(self.GridX), np.min(self.GridY), np.max(self.GridY)) # # elevational trend # ax0 = plt.subplot(1,3,1) # plt.plot(self.mz, data, 'o', xi, yi, 'k--') # plt.text(600, 2.0, 'Slope: %f' % self.pv[0], fontsize=fs, fontweight=fw) # plt.xlabel('Elevation [m]', fontsize=fs, fontweight=fw) # plt.ylabel('Air Temperature [C]', fontsize=fs, fontweight=fw) # ax0.set_ylim(0, 3.0) # # ax1 = plt.subplot(1,3,2) # im1 = ax1.imshow(r, aspect='equal',extent=extent) # plt.plot(self.mx, self.my, 'o') # plt.title('Distributed Residuals', fontsize=fs, fontweight=fw) # cbar = plt.colorbar(im1, orientation="horizontal") # cbar.ax.tick_params(labelsize=fs-2) # plt.tick_params( # axis='both', # changes apply to the x-axis # which='both', # both major and minor ticks are affected # bottom='off', # ticks along the bottom edge are off # top='off', # ticks along the top edge are off # left='off', # right='off', # labelleft='off', # labelbottom='off') # labels along the bottom edge are off # ax1.set_xlim(extent[0], extent[1]) # ax1.set_ylim(extent[2], extent[3]) # # # # retrended # ax2 = plt.subplot(133) # im2 = ax2.imshow(v, aspect='equal',extent=extent) # plt.plot(self.mx, self.my, 'o') # plt.title('Retrended by Elevation', fontsize=fs, fontweight=fw) # cbar = plt.colorbar(im2, orientation="horizontal") # cbar.ax.tick_params(labelsize=fs-2) # plt.tick_params( # axis='both', # changes apply to the x-axis # which='both', # both major and minor ticks are affected # bottom='off', # ticks along the bottom edge are off # top='off', # ticks along the top edge are off # left='off', # right='off', # labelleft='off', # labelbottom='off') # labels along the bottom edge are off # ax2.set_xlim(extent[0], extent[1]) # ax2.set_ylim(extent[2], extent[3]) # # for item in ([ax0.xaxis.label, ax0.yaxis.label] + # ax0.get_xticklabels() + ax0.get_yticklabels()): # item.set_fontsize(fs) # item.set_fontweight(fw) # # plt.tight_layout() # plt.show() return v
[docs] def calculateWeights(self): """ Calculate the weights given those stations with nan values for data """ nsta = np.sum(~self.nan_val) mx = self.mx[~self.nan_val] my = self.my[~self.nan_val] mz = self.mz[~self.nan_val] # calculate the distances between stations ad = np.zeros((nsta, nsta)) for i in range(nsta): ad[i, i] = 0 for j in range(i+1, nsta): ad[i, j] = np.sqrt((mx[i] - mx[j])**2 + (my[i] - my[j])**2) ad[j, i] = np.sqrt((mx[i] - mx[j])**2 + (my[i] - my[j])**2) self.ad = ad # calculate the distances from the grid to the station dgrid = np.zeros((self.ngrid, nsta)) Xa = self.GridX.ravel() Ya = self.GridY.ravel() for i in range(nsta): dgrid[:, i] = np.sqrt((Xa - mx[i])**2 + (Ya - my[i])**2) self.dgrid = dgrid # calculate the weights wg = np.zeros_like(dgrid) detrended_kriging.call_grid(self.ad, self.dgrid, mz.astype(np.double), wg, self.config['dk_nthreads']) # reshape the weights self.weights = np.zeros((self.GridX.shape[0], self.GridX.shape[1], nsta)) for v in range(nsta): self.weights[:, :, v] = wg[:, v].reshape(self.GridX.shape)
[docs] def detrendData(self, data): """ Detrend the data in val using the heights zmeas data - is the same size at mx,my flag - 1 for positive, -1 for negative, 0 for any trend imposed """ # calculate the trend on any real data if self.config['regression_method'] == 1: pv = np.polyfit(self.mz[~self.nan_val], data[~self.nan_val], 1) # apply trend constraints if self.config['slope'] == 1 and pv[0] < 0: pv = np.array([0, 0]) elif (self.config['slope'] == -1 and pv[0] > 0): pv = np.array([0, 0]) self.pv = pv # detrend the data el_trend = self.mz[~self.nan_val] * pv[0] + pv[1] self.residuals = data[~self.nan_val] - el_trend
[docs] def retrendData(self, r): """ Retrend the residual values """ # retrend the data return r + self.pv[0]*self.GridZ + self.pv[1]