Source code for smrf.ipw.ipw

"""
Created on Apr 7, 2015

Adapted from Roger Lew (rogerlew.gmail.com)isnobal.py

@author: Scott Havens
"""

from glob import glob
import os
import sys
import numpy as np
from math import ceil


in_db__vars = tuple('I_lw T_a e_a u T_g S_n'.split())
out_em__vars = tuple('R_n H L_v_E G M delta_Q E_s '
                     'melt ro_predict cc_s'.split())
out_snow__vars = tuple('z_s rho m_s h2o T_s_0 T_s_l T_s z_s_l h2o_sat'.split())


def _unpackindx(L):
    return int(L.split()[2])


def _unpackint(L):
    return int(L.split('=')[1].strip())


def _unpackfloat(L):
    return float(L.split('=')[1].strip())


def _unpackstr(L):
    return L.split('=')[1].strip()


def _decode(s):
    return s.decode('utf-8')


[docs]def map_fn(x, float_min, float_max, int_min, int_max): return np.round(((x - float_min) * (int_max - int_min)) / (float_max - float_min))
# _unpackindx = lambda L: int(L.split()[2]) # _unpackint = lambda L: int(L.split('=')[1].strip()) # _unpackfloat = lambda L: float(L.split('=')[1].strip()) # _unpackstr = lambda L: L.split('=')[1].strip() # _decode = lambda s: s.decode('utf-8') # for reading binary files PY_VERSION = 2 if sys.version_info > (3, 0): PY_VERSION = 3
[docs]def readline3(fid): """ Read a line from a binary file in Python3 and decode """ return fid.readline().decode('utf-8')
[docs]class Band: """ Represents a raster band of geospatial data """ def __init__(self, nlines, nsamps): # Using classes instead of dicts makes things faster # even if though they are more verbose to implement # width, height self.nlines, self.nsamps = nlines, nsamps # basic_image self.name = None self.bytes = None self.fmt = None self.frmt = None self.bits = None self.annot = None self.history = [] # geo self.bline = None self.bsamp = None self.dline = None self.dsamp = None self.geounits = None self.coord_sys_ID = None self.geotransform = None self.x = None self.y = None # lq self.int_min = 0 self.int_max = 255 self.float_min = 0.0 self.float_max = 1.0 self.units = None self.transform = lambda x: (1.0 - 0.0) * (x / 255) + 0 self.data = None def _parse_geo_readline(self, L0, L1, L2, L3, L4, L5): """ Get the geo header information from readline() """ bline = self.bline = _unpackfloat(L0) bsamp = self.bsamp = _unpackfloat(L1) dline = self.dline = _unpackfloat(L2) dsamp = self.dsamp = _unpackfloat(L3) self.geounits = _unpackstr(L4) self.coord_sys_ID = _unpackstr(L5) self.geotransform = [bsamp - dsamp / 2.0, dsamp, 0.0, bline - dline / 2.0, 0.0, dline] self.y = bline + np.arange(self.nlines)*dline self.x = bsamp + np.arange(self.nsamps)*dsamp def _parse_geo(self, L0, L1, L2, L3, L4, L5): """ Get the geo header information given values """ bline = self.bline = L0 bsamp = self.bsamp = L1 dline = self.dline = L2 dsamp = self.dsamp = L3 self.geounits = L4 self.coord_sys_ID = L5 self.geotransform = [bsamp - dsamp / 2.0, dsamp, 0.0, bline - dline / 2.0, 0.0, dline] def _parse_lq(self, L0, L1): """ Pulls the scale values from line0 and line1 builds a function for transforming integer values to floats """ [[int_min, float_min], [int_max, float_max]] = [L0.split(), L1.split()] int_min, float_min, int_max, float_max = \ map(float, [int_min, float_min, int_max, float_max]) self.transform = \ lambda x: (float_max - float_min) * (x / int_max) + float_min self.int_min, self.int_max = int_min, int_max self.float_min, self.float_max = float_min, float_max def __str__(self): return '''\ nlines (height): {0.nlines} nsamps (width): {0.nsamps} bytes: {0.bytes} transform: {0.int_min} -> {0.int_max} {0.float_min} -> {0.float_max} geo: {0.bline} {0.bsamp} {0.dline} {0.dsamp} '''.format(self)
[docs]class IPW: """ Represents a IPW file container """ def __init__(self, fname=None, epsg=32611): """ IPW(fname[, rescale=True]) Parameters ---------- fname : string path to the IPW container if "in.", "em.", or "snow." are in fname band names will be pulled from the cooresponding global varlist: in_db__vars out_em__vars out_snow__vars rescale : bool (default = True) Specifies whether data should be rescaled to singles based on the lq map or whether they should remain uint8/16 epsg : int (default = 32611) North America Centric Cheat Sheet ----------------------------------------------- UTM Zone 10 Northern Hemisphere (WGS 84) 32610 UTM Zone 11 Northern Hemisphere (WGS 84) 32611 UTM Zone 12 Northern Hemisphere (WGS 84) 32612 UTM Zone 13 Northern Hemisphere (WGS 84) 32613 UTM Zone 14 Northern Hemisphere (WGS 84) 32614 UTM Zone 15 Northern Hemisphere (WGS 84) 32615 UTM Zone 16 Northern Hemisphere (WGS 84) 32616 UTM Zone 17 Northern Hemisphere (WGS 84) 32617 UTM Zone 18 Northern Hemisphere (WGS 84) 32618 UTM Zone 19 Northern Hemisphere (WGS 84) 32619 """ global in_db__vars, out_em__vars, out_snow__vars # this should just be stored as an attribute # it produces alot of book-keeping otherwise self.epsg = epsg # read a file or create an empty object if fname is not None: if PY_VERSION == 2: self.read(fname) else: self.read3(fname) else: self.fname = None self.bands = [] self.bip = None self.byteorder = [0, 1, 2, 3] self.nlines = None self.nsamps = None self.nbands = None self.geohdr = None
[docs] def read(self, fname): """ Read the IPW file into the various bands Turns the data into a numpy array of float32 """ # read the data to a list of lines fid = open(fname, 'rb') readline = fid.readline # dots make things slow (and ugly) tell = fid.tell bands = [] byteorder = None nlines = None nsamps = None nbands = None geo = None # size of the file we are reading in bytes st_size = os.fstat(fid.fileno()).st_size while 1: # while 1 is faster than while True line = _decode(readline()) # fail safe, haven't needed it, but if this is running # on a server not a bad idea to have it here if tell() == st_size: raise Exception('Unexpectedly reached EOF') if '!<header> basic_image_i' in line: byteorder = map(int, readline().split('=')[1].strip()) nlines = _unpackint(readline()) nsamps = _unpackint(readline()) nbands = _unpackint(readline()) # initialize the band instances in the bands list bands = [Band(nlines, nsamps) for j in range(nbands)] elif '!<header> basic_image' in line: indx = _unpackindx(line) # band number bytes = bands[indx].bytes = _unpackint(readline()) # bands[indx].frmt = ('uint8', 'uint16')[bytes == 2] bands[indx].frmt = 'uint' + str(bytes*8) bands[indx].bits = _unpackint(readline()) # see if there is any other information last_pos = fid.tell() line = readline() while line[0] != '!': key = line.split('=')[0].strip() val = line.split('=')[1].strip() if key == 'annot': bands[indx].annot = val elif key == 'history': bands[indx].history.append(val) elif key == 'fmt': bands[indx].fmt = val last_pos = fid.tell() line = readline() if len(line) == 0: break # set pointer back one line fid.seek(last_pos) elif '!<header> geo' in line: indx = _unpackindx(line) bands[indx]._parse_geo_readline( *[readline() for i in range(6)]) geo = 1 elif '!<header> lq' in line: indx = _unpackindx(line) line1 = fid.readline() if 'units' in line1: bands[indx].units = _unpackstr(line1) bands[indx]._parse_lq(_unpackstr(readline()), _unpackstr(readline())) else: bands[indx]._parse_lq(_unpackstr(line1), _unpackstr(readline())) if '\f' in line: # feed form character separates the break # image header from the binary data # attempt to assign names to the bands assert nbands == len(bands) if 'in.' in fname: varlist = in_db__vars elif 'em.' in fname: varlist = out_em__vars elif 'snow.' in fname: varlist = out_snow__vars else: varlist = ['band%02i' % i for i in range(nbands)] assert len(varlist) >= nbands for b, name in zip(bands, varlist[:nbands]): b.name = name # Unpack the binary data using numpy.fromfile # because we have been reading line by line fid is at the # first data byte, we will read it all as one big chunk # and then put it into the appropriate bands # # np.types allow you to define heterogenous arrays of mixed # types and reference them with keys, this helps us out here dt = np.dtype([(b.name, b.frmt) for b in bands]) bip = sum([b.bytes for b in bands]) # bytes-in-pixel required_bytes = bip * nlines * nsamps assert (st_size - tell()) >= required_bytes # this is way faster than looping with struct.unpack # struct.unpack also starts assuming there are pad bytes # when format strings with different types are supplied data = np.fromfile(fid, dt, count=nlines*nsamps) # Separate into bands data = data.reshape(nlines, nsamps) for b in bands: b.data = np.array(b.transform(data[b.name]), dtype=np.float32) # else: # b.data = np.array(data[b.name], # dtype=np.dtype(b.frmt)) # clean things up self.fname = fname # self.rescale = rescale # self.name_dict = dict(zip(varlist, range(nbands))) self.bands = bands self.bip = bip self.byteorder = byteorder self.nlines = nlines self.nsamps = nsamps self.nbands = nbands self.geohdr = geo fid.close()
[docs] def read3(self, fname): """ Read the IPW file into the various bands Turns the data into a numpy array of float32 This is meant for Python3 which has different ways of ready binary files than Python2 """ # read the data to a list of lines fid = open(fname, 'rb') bands = [] byteorder = None nlines = None nsamps = None nbands = None geo = None # size of the file we are reading in bytes st_size = os.fstat(fid.fileno()).st_size while 1: # while 1 is faster than while True line = readline3(fid) # fail safe, haven't needed it, but if this is running # on a server not a bad idea to have it here if fid.tell() == st_size: raise Exception('Unexpectedly reached EOF') if '!<header> basic_image_i' in line: byteorder = list(map(int, readline3(fid).split('=')[1].strip())) nlines = _unpackint(readline3(fid)) nsamps = _unpackint(readline3(fid)) nbands = _unpackint(readline3(fid)) # initialize the band instances in the bands list bands = [Band(nlines, nsamps) for j in range(nbands)] elif '!<header> basic_image' in line: indx = _unpackindx(line) # band number bytes = bands[indx].bytes = _unpackint(readline3(fid)) # bands[indx].frmt = ('uint8', 'uint16')[bytes == 2] bands[indx].frmt = 'uint' + str(bytes*8) bands[indx].bits = _unpackint(readline3(fid)) # see if there is any other information last_pos = fid.tell() line = readline3(fid) while line[0] != '!': key = line.split('=')[0].strip() val = line.split('=')[1].strip() if key == 'annot': bands[indx].annot = val elif key == 'history': bands[indx].history.append(val) elif key == 'fmt': bands[indx].fmt = val last_pos = fid.tell() line = readline3(fid) if len(line) == 0: break # set pointer back one line fid.seek(last_pos) elif '!<header> geo' in line: indx = _unpackindx(line) bands[indx]._parse_geo_readline( *[readline3(fid) for i in range(6)]) geo = 1 elif '!<header> lq' in line: indx = _unpackindx(line) line1 = readline3(fid) if 'units' in line1: bands[indx].units = _unpackstr(line1) bands[indx]._parse_lq(_unpackstr(readline3(fid)), _unpackstr(readline3(fid))) else: bands[indx]._parse_lq(_unpackstr(line1), _unpackstr(readline3(fid))) if '\f' in line: # feed form character separates the break # image header from the binary data # attempt to assign names to the bands assert nbands == len(bands) if 'in.' in fname: varlist = in_db__vars elif 'em.' in fname: varlist = out_em__vars elif 'snow.' in fname: varlist = out_snow__vars else: varlist = ['band%02i' % i for i in range(nbands)] assert len(varlist) >= nbands for b, name in zip(bands, varlist[:nbands]): b.name = name # Unpack the binary data using numpy.fromfile # because we have been reading line by line fid is at the # first data byte, we will read it all as one big chunk # and then put it into the appropriate bands # # np.types allow you to define heterogenous arrays of mixed # types and reference them with keys, this helps us out here dt = np.dtype([(b.name, b.frmt) for b in bands]) bip = sum([b.bytes for b in bands]) # bytes-in-pixel required_bytes = bip * nlines * nsamps assert (st_size - fid.tell()) >= required_bytes # this is way faster than looping with struct.unpack # struct.unpack also starts assuming there are pad bytes # when format strings with different types are supplied data = np.fromfile(fid, dt, count=nlines*nsamps) # Separate into bands data = data.reshape(nlines, nsamps) for b in bands: b.data = np.array(b.transform(data[b.name]), dtype=np.float32) # else: # b.data = np.array(data[b.name], # dtype=np.dtype(b.frmt)) # clean things up self.fname = fname # self.rescale = rescale # self.name_dict = dict(zip(varlist, range(nbands))) self.bands = bands self.bip = bip self.byteorder = byteorder self.nlines = nlines self.nsamps = nsamps self.nbands = nbands self.geohdr = geo fid.close()
[docs] def write(self, fileName, nbits=8): """ Write the IPW data to file Args: fileName - file to write to nbits - number of bits for each band default=8 """ # set the bits, bytes, and int_max for each band # have to convert nbits one to float then back at the end # for ceil to work for i, b in enumerate(self.bands): self.bands[i].name = 'band%02i' % i self.bands[i].bits = nbits bytes = self.bands[i].bytes = int(ceil(float(nbits)/8)) self.bands[i].frmt = 'uint' + str(bytes*8) self.bands[i].int_min = 0 self.bands[i].int_max = 2**nbits - 1 float_max = np.amax(self.bands[i].data) float_min = np.amin(self.bands[i].data) # correct if the min and max are the same if float_max == float_min: float_max += 1 self.bands[i].float_max = float_max self.bands[i].float_min = float_min # prepare the headers last_line = "!<header> image -1 $Revision: 1.5 $" with open(fileName, 'w') as f: # write the global variables for l in self._write_basic_image_i(): f.write(l + '\n') # write any basic_image headers for l in self._write_basic_image(): f.write(l + '\n') # write the geo headers, if there if self.geohdr is not None: for l in self._write_geo_hdr(): f.write(l + '\n') # write the linear quantization (lq) headers for l in self._write_lq_hdr(): f.write(l + '\n') # if other headers are required, then put them here # write the last header line to the file f.write(last_line + '\f\n') # Convert the data to a structured array write the binary data data = self._convert_float_to_int(nbits) # for i in range(data.shape[0]): # data[i,:,:].tofile(f) data.tofile(f) # close the file f.close() return None
def _write_basic_image_i(self): """ Create the first 5 lines of the IPW file for byteorder, nlines, nsamps, and nbands """ firstLine = "!<header> basic_image_i -1 $Revision: 1.11 $" # byteorder from array to string byteorder = ''.join(str(x) for x in self.byteorder) firstLines = [firstLine, "byteorder = {0} ".format(byteorder), "nlines = {0} ".format(self.nlines), "nsamps = {0} ".format(self.nsamps), "nbands = {0} ".format(self.nbands)] return firstLines def _write_basic_image(self): """ Create the basic_image headers for each band self.name = None self.bytes = None self.fmt = None self.bits = None self.annot = None self.history = [] """ lines = [] for i, band in enumerate(self.bands): lines += ["!<header> basic_image {0} $Revision: 1.11 $".format(i), "bytes = {0} ".format(band.bytes), "bits = {0} ".format(band.bits)] if band.fmt is not None: lines += ["fmt = {0} ".format(band.fmt)] if band.annot is not None: lines += ['annot = {0} '.format(band.annot)] if band.history: for l in band.history: lines += ["history = {0} ".format(l)] return lines def _write_lq_hdr(self): """ Create the linear quantization (lq) headers """ lines = [] # build the linear quantization (lq) headers for i, b in enumerate(self.bands): int_min = int(b.int_min) int_max = int(b.int_max) # IPW writes integer floats without a dec point, # so remove if necessary float_min = int_match(b.float_min) float_max = int_match(b.float_max) # determine if there are units if b.units is not None: lines += ["!<header> lq {0} $Revision: 1.6 $".format(i), "units = {0} ".format(b.units), "map = {0} {1} ".format(int_min, float_min), "map = {0} {1} ".format(int_max, float_max)] else: lines += ["!<header> lq {0} $Revision: 1.6 $".format(i), "map = {0} {1} ".format(int_min, float_min), "map = {0} {1} ".format(int_max, float_max)] return lines def _write_geo_hdr(self): """ Create the geo headers """ lines = [] # build the geographic header for i, b in enumerate(self.bands): bline = int(b.bline) bsamp = int(b.bsamp) dline = int(b.dline) dsamp = int(b.dsamp) units = b.geounits coord_sys_ID = b.coord_sys_ID lines += ["!<header> geo {0} $Revision: 1.7 $".format(i), "bline = {0} ".format(bline), "bsamp = {0} ".format(bsamp), "dline = {0} ".format(dline), "dsamp = {0} ".format(dsamp), "units = {0} ".format(units), "coord_sys_ID = {0} ".format(coord_sys_ID)] return lines def _convert_float_to_int(self, nbits): """ Convert the data floating point data to mapped integer """ # define a structure array dt = np.dtype([(b.name, b.frmt) for b in self.bands]) data_int = np.zeros((self.nlines, self.nsamps,), dt) # data_int = [] for b in self.bands: data_int[b.name] = map_fn(b.data, b.float_min, b.float_max, 0, b.int_max) return data_int def _bits_to_bytes(self, nbits): """ Convert bits to equiavalent bytes """ return int(ceil(float(nbits)/8)) def __getitem__(self, key): return self.bands[self.name_dict[key]]
[docs] def new_band(self, data): """ Create a new band in IPW that is placed at the end if bands already exist. """ # get the size of data nlines, nsamps = data.shape # check if there are other bands if self.nbands is not None: assert nlines == self.nlines assert nsamps == self.nsamps else: # self.byteorder = [0,1,2,3] self.nlines = nlines self.nsamps = nsamps # create a new empty band band = Band(nlines, nsamps) # add the data band.data = data # add to the end of IPW bands self.bands.append(band) # adjust the number of band value self.nbands = len(self.bands)
[docs] def add_geo_hdr(self, coordinates, d, units, csys, band='all'): """ Add geo header information to the band Args: coordinates: [u, v] The coordinates of image line 0 and sample 0 in csys are u and v, respectively. d: [dx, dy] The distances between adjacent image lines and samples in csys are du and dv, respectively. units: u, v, du, and dv are specified in units (e.g., "meters", "km"). csys: The geodetic coordinate system identifier is csys (e.g., "UTM", "Lambert"). See the manual for mkproj for a list of standard names for coordinate systems. band (optional): [0,1,4,...] The "geo" header will be applied only to the specified bands (default: all bands). if 'all' for band number then do all, or a subset from array? # geo self.bline = None self.bsamp = None self.dline = None self.dsamp = None self.geounits = None self.coord_sys_ID = None self.geotransform = None mkgeoh -o coord,coord -d incr,incr -u units -c csys [-b band,...] [-f] [image] """ # Check the inputs if len(coordinates) != 2: raise SyntaxError('Coordinates must have two values [u, v]') if len(d) != 2: raise SyntaxError('d must have two values [du, dv]') if band == 'all': band = range(0, self.nbands) # all bands elif type(band) == int: band = [band] # single band # loop through each band and add the header for bidx in band: self.bands[bidx]._parse_geo(coordinates[0], coordinates[1], d[0], d[1], units, csys) self.geohdr = 1
# def _parse_geo(self, L0, L1, L2, L3, L4, L5): # """ # sets attributes and builds GDAL ordered # geotransform list # """ # bline = self.bline = _unpackfloat(L0) # bsamp = self.bsamp = _unpackfloat(L1) # dline = self.dline = _unpackfloat(L2) # dsamp = self.dsamp = _unpackfloat(L3) # self.geounits = _unpackstr(L4) # self.coord_sys_ID = _unpackstr(L5) # self.geotransform = [bsamp - dsamp / 2.0, # dsamp, 0.0, # bline - dline / 2.0, # 0.0, dline] def __str__(self): s = ['''\ IPW({0.fname}) --------------------------------------------------------- byteorder: {0.byteorder} nlines: {0.nlines} nsamps: {0.nsamps} nbands: {0.nbands} '''.format(self)] for i, b in enumerate(self.bands): s.append('\nBand %i\n' % i) s.append(str(b)) return ''.join(s)
[docs]def int_match(val): """ (Devel team is unsure if this is the case. This description is our interpretation of pre-existing code that was re-written to be up-to-date with deprecations. We are confident it behaves the same as it did before.) IPW writes in the header a integer represented as a float in python without the decimal. So sometimes we have to proactively check and pass a int to the header function. e.g. float_min == 1.00, IPW writes 1 Args: val: an potential int represented as float. Returns: result: the val is casted as int if val is equivalent to itself casted as an int """ int_val = int(val) return int_val if val == int_val else val
def _packgrp(root, grp, wc, varlist, nbands=None): fns = glob(wc) assert len(fns) > 0 ipw0 = IPW(fns[0]) shape = (ipw0.nlines * ipw0.nsamps, len(fns), (nbands, ipw0.nbands)[nbands is None]) data = np.zeros(shape, np.float32) for i, fn in enumerate(fns): ipw = IPW(fn) for j, b in enumerate(ipw.bands): assert varlist[j] == b.name data[:, i, j] = b.data.flatten() root.create_group(grp) for i, key in enumerate(varlist): root[grp].create_dataset(key, data=data[:, :, i])