Source code for smrf.spatial.kriging

import numpy as np
import pandas as pd
from pykrige.ok import OrdinaryKriging


[docs]class KRIGE: ''' Kriging class based on the pykrige package ''' def __init__(self, mx, my, mz, GridX, GridY, GridZ, config): """ Args: mx: x locations for the points my: y locations for the points GridX: x locations in grid to interpolate over GridY: y locations in grid to interpolate over power: power of the inverse distance weighting """ # Measurement point locations self.mx = mx self.my = my self.mz = mz self.nsta = len(mx) # grid information self.GridX = GridX self.GridY = GridY self.GridZ = GridZ self.ngrid = GridX.shape[0] * GridX.shape[1] # data information self.data = None self.nan_val = [] self.config = config # kriging parameters for pykrige self.variogram_model = self.config['krig_variogram_model'] # 'linear' self.variogram_parameters = None self.variogram_function = None # np.min([np.round(len(mx)/2), 6]) self.nlags = self.config['krig_nlags'] self.weight = self.config['krig_weight'] self.anisotropy_scaling = self.config['krig_anisotropy_scaling'] # 1.0 self.anisotropy_angle = self.config['krig_anisotropy_angle'] # 0.0 self.verbose = False self.enable_plotting = False self.enable_statistics = False # not in the pypi release of PyKrige self.coordinates_type = self.config['krig_coordinates_type'] # pykrige execution self.backend = 'vectorized' self.n_closest_points = None
[docs] def calculate(self, data): """ Estimate the variogram, calculate the model, then apply to the grid Arg: data: numpy array same length as m* config: configuration for dk Returns: v: Z-values of specified grid or at thespecified set of points. If style was specified as 'masked', zvalues will be a numpy masked array. sigmasq: Variance at specified grid points or at the specified set of points. If style was specified as 'masked', sigmasq will be a numpy masked array. """ nan_val = pd.isnull(data) if self.config['detrend']: d = self.detrendData(data) else: d = data.copy() OK = OrdinaryKriging(self.mx[~nan_val], self.my[~nan_val], d[~nan_val], variogram_model=self.variogram_model, variogram_parameters=self.variogram_parameters, variogram_function=self.variogram_function, nlags=self.nlags, weight=self.weight, anisotropy_scaling=self.anisotropy_scaling, anisotropy_angle=self.anisotropy_angle, verbose=self.verbose, enable_plotting=self.enable_plotting, enable_statistics=self.enable_statistics) v, ss1 = OK.execute('grid', self.GridX[0, :], self.GridY[:, 0], backend=self.backend, n_closest_points=self.n_closest_points) if self.config['detrend']: # retrend the residuals v = self.retrendData(v) return v, ss1
[docs] def detrendData(self, data, flag=0, zeros=None): ''' Detrend the data in val using the heights zmeas Args: data: is the same size at mx,my flag: - 1 for positive, -1 for negative, 0 for any trend imposed Returns: data minus the elevation trend ''' # calculate the trend on any real data nan_val = np.isnan(data) pv = np.polyfit(self.mz[~nan_val], data[~nan_val], 1) # apply trend constraints if flag == 1 and pv[0] < 0: pv = np.array([0, 0]) elif (flag == -1 and pv[0] > 0): pv = np.array([0, 0]) self.pv = pv # detrend the data el_trend = self.mz * pv[0] + pv[1] if zeros is not None: data[zeros] = self.zeroVal return data - el_trend
[docs] def retrendData(self, idw): ''' Retrend the IDW values ''' # retrend the data return idw + self.pv[0]*self.GridZ + self.pv[1]