Geoprocessing SNODAS state variables

Reference: https://nsidc.org/sites/nsidc.org/files/G02158-V001-UserGuide_2.pdf

cold_content(translated_tif)

Compute cold content as a function of SWE and snow pack avg temp

Parameters:

Name Type Description Default
translated_tif dict

"product_code": { "filetype": str, Matching database acquirable "file": str, Converted file "datetime": str, Valid Time, ISO format with timezone "version": str Reference Time (forecast), ISO format with timezone }

required

Returns:

Type Description
dict

{ "filetype": str, Matching database acquirable "file": str, Converted file "datetime": str, Valid Time, ISO format with timezone "version": str Reference Time (forecast), ISO format with timezone }

Source code in cumulus_geoproc/geoprocess/snodas/__init__.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def cold_content(translated_tif):
    """Compute cold content as a function of SWE and snow pack avg temp

    Derives the cold content product (code 2072) from SWE (code 1034)
    and snow pack average temperature (code 1038) with gdal_calc, then
    converts the result to a validated Cloud Optimized GeoTIFF.

    Parameters
    ----------
    translated_tif : dict

        "product_code": {
            "filetype": str,         Matching database acquirable
            "file": str,             Converted file
            "datetime": str,         Valid Time, ISO format with timezone
            "version": str           Reference Time (forecast), ISO format with timezone
        }

    Returns
    -------
    dict | None
        {
            "filetype": str,         Matching database acquirable
            "file": str,             Converted file
            "datetime": str,         Valid Time, ISO format with timezone
            "version": str           Reference Time (forecast), ISO format with timezone
        }
        or None when a required input product is missing or processing fails
    """
    coldcontent_code = "2072"
    swe_code = "1034"
    avg_temp_sp = "1038"

    try:
        swe = translated_tif[swe_code]["file"]
        swe_dt = translated_tif[swe_code]["datetime"]
        avg_temp = translated_tif[avg_temp_sp]["file"]
    except KeyError as ex:
        # one of the required source products was not translated
        logger.warning(f"{type(ex).__name__}: {this}: {ex}")
        return None

    cold_content_filename = swe.replace(swe_code, coldcontent_code)
    # intermediate output; "9999" is a scratch product code
    temp_cold_content = swe.replace(swe_code, "9999")

    try:
        # --overwrite added for consistency with snow_melt_mm so a stale
        # scratch file never blocks the computation
        cgdal.gdal_calculate(
            "-A",
            swe,
            "-B",
            avg_temp,
            "--outfile",
            temp_cold_content,
            "--calc",
            "A.astype(numpy.float32) * 2114 * (B.astype(numpy.float32) - 273) / 333000",
            "--quiet",
            "--overwrite",
        )
        gdal.Translate(
            tif := cold_content_filename,
            temp_cold_content,
            format="COG",
            creationOptions=[
                "RESAMPLING=BILINEAR",
                "OVERVIEWS=IGNORE_EXISTING",
                "OVERVIEW_RESAMPLING=BILINEAR",
            ],
        )
        # validate COG
        if (validate := cgdal.validate_cog("-q", tif)) == 0:
            logger.debug(f"Validate COG = {validate}\t{tif} is a COG")
    except RuntimeError as ex:
        logger.debug(f"{type(ex).__name__}: {this}: {ex}")
        return None

    return {
        coldcontent_code: {
            "file": tif,
            "filetype": product_code[coldcontent_code]["product"],
            "datetime": swe_dt,
            "version": None,
        }
    }

no_data_value(dt)

No data value determined by time

Parameters:

Name Type Description Default
dt datetime

datetime object

required

Returns:

Type Description
str

raster no data value

Source code in cumulus_geoproc/geoprocess/snodas/__init__.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def no_data_value(dt: datetime) -> str:
    """No data value determined by time

    SNODAS rasters with a valid time before 2011-01-24 use 55537 as the
    no data value; rasters on or after that date use -9999.

    Parameters
    ----------
    dt : datetime
        datetime object (timezone aware, UTC)

    Returns
    -------
    str
        raster no data value as a string suitable for GDAL command-line
        style arguments
    """
    # cutover date when the SNODAS no data convention changed
    dt_nodata = datetime(2011, 1, 24, 0, 0, tzinfo=timezone.utc)
    return "55537" if dt < dt_nodata else "-9999"

snow_melt_mm(translated_tif)

Convert the translated snow melt runoff tif to millimeters

Parameters:

Name Type Description Default
translated_tif dict

"product_code": { "filetype": str, Matching database acquirable "file": str, Converted file "datetime": str, Valid Time, ISO format with timezone "version": str Reference Time (forecast), ISO format with timezone }

required

Returns:

Type Description
dict

{ "filetype": str, Matching database acquirable "file": str, Converted file "datetime": str, Valid Time, ISO format with timezone "version": str Reference Time (forecast), ISO format with timezone }

Source code in cumulus_geoproc/geoprocess/snodas/__init__.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def snow_melt_mm(translated_tif: dict):
    """Convert translated snow melt runoff to millimeters

    Reads the translated snow melt tif (code 1044), rescales its values
    to millimeters with gdal_calc, and writes the result as a new
    product (code 3333) converted to a Cloud Optimized GeoTIFF.

    Parameters
    ----------
    translated_tif : dict

        "product_code": {
            "filetype": str,         Matching database acquirable
            "file": str,             Converted file
            "datetime": str,         Valid Time, ISO format with timezone
            "version": str           Reference Time (forecast), ISO format with timezone
        }

    Returns
    -------
    dict
        {
            "filetype": str,         Matching database acquirable
            "file": str,             Converted file
            "datetime": str,         Valid Time, ISO format with timezone
            "version": str           Reference Time (forecast), ISO format with timezone
        }
    """
    snowmelt_code = "1044"
    snowmelt_code_mm = "3333"

    try:
        source_entry = translated_tif[snowmelt_code]
        source_tif = source_entry["file"]
        valid_dt = source_entry["datetime"]
    except KeyError as ex:
        logger.warning(f"{type(ex).__name__}: {this}: {ex}")
        return

    final_tif = source_tif.replace(snowmelt_code, snowmelt_code_mm)
    scratch_tif = source_tif.replace(snowmelt_code, "9999")

    # convert snow melt runoff as meters / 100_000 to mm
    # 100_000 is the scale factor getting values to meters
    try:
        cgdal.gdal_calculate(
            "-A",
            source_tif,
            "--outfile",
            scratch_tif,
            "--calc",
            "A.astype(numpy.float32) / 100_000 * 1000",
            "--quiet",
            "--overwrite",
        )
        tif = final_tif
        gdal.Translate(
            tif,
            scratch_tif,
            format="COG",
            creationOptions=[
                "RESAMPLING=BILINEAR",
                "OVERVIEWS=IGNORE_EXISTING",
                "OVERVIEW_RESAMPLING=BILINEAR",
            ],
        )
        # validate COG
        validate = cgdal.validate_cog("-q", tif)
        if validate == 0:
            logger.debug(f"Validate COG = {validate}\t{tif} is a COG")
    except RuntimeError as ex:
        logger.debug(f"{type(ex).__name__}: {this}: {ex}")
        return None

    return {
        snowmelt_code_mm: {
            "file": tif,
            "filetype": product_code[snowmelt_code_mm]["product"],
            "datetime": valid_dt,
            "version": None,
        }
    }

SNODAS interpolation

is_lakefix(dt, code)

Determine if needing 'lakefix'

Parameters:

Name Type Description Default
dt datetime

datetime object

required
code str

parameter code related to SNODAS parameters

required

Returns:

Type Description
bool

True | False if needing 'lakefix'

Source code in cumulus_geoproc/geoprocess/snodas/interpolate.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def is_lakefix(dt: datetime, code: str) -> bool:
    """Determine if needing 'lakefix'

    The 'lakefix' masking applies only to parameter codes 1034 and 1036
    for valid times between 2014-10-09 and 2019-10-10 (inclusive).

    Parameters
    ----------
    dt : datetime
        datetime object (timezone aware, UTC)
    code : str
        parameter code related to SNODAS parameters

    Returns
    -------
    bool
        True | False if needing 'lakefix'
    """
    codes = ("1034", "1036")
    dt_after = datetime(2014, 10, 9, 0, 0, tzinfo=timezone.utc)
    dt_before = datetime(2019, 10, 10, 0, 0, tzinfo=timezone.utc)
    # return the boolean expression directly instead of if/else True/False
    return dt_after <= dt <= dt_before and code in codes

snodas(cfg, dst) async

Main method building asyncio tasks

Parameters:

Name Type Description Default
cfg namedtuple

namedtuple with SQS message as attributes

required
dst str

FQPN to temporary directory

required

Returns:

Type Description
List[dict[str, str]]

List of dictionary objects with attributes needed to upload to S3

Source code in cumulus_geoproc/geoprocess/snodas/interpolate.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
async def snodas(cfg: namedtuple, dst: str):
    """Main method building asyncio tasks

    Downloads each SNODAS product COG from S3 and schedules a
    snodas_interp_task for it, then collects the non-None results.

    Parameters
    ----------
    cfg : namedtuple
        namedtuple with SQS message as attributes; attributes read here
        are datetime (YYYYMMDD string), max_distance, and bucket
    dst : str
        FQPN to temporary directory

    Returns
    -------
    List[dict[str, str]]
        List of dictionary objects with attributes needed to upload to S3
    """
    # products that are not interpolated; their max distance is forced
    # to 0 below so snodas_interp_task skips fillnodata for them
    no_interp = (
        "2072",
        "3333",
        "1038",
    )
    tasks = []
    # valid time is the message date at 06:00 UTC
    dt = (
        datetime.strptime(cfg.datetime, "%Y%m%d")
        .replace(hour=6)
        .replace(tzinfo=timezone.utc)
    )
    nodata_value = no_data_value(dt)
    # determine spatial coverage; date dependent
    _sc = (
        "us"
        if (
            datetime(2003, 1, 1, tzinfo=timezone.utc)
            <= dt
            < datetime(2010, 2, 17, tzinfo=timezone.utc)
        )
        else "zz"
    )
    logger.debug(f"Spatial coverage is {_sc}")

    for code in ("1034", "1036", "1038", "3333", "2072"):
        # NOTE(review): Template.substitute is called with the template
        # string as its first argument; if Template were string.Template
        # this would need Template(...).substitute(...) — confirm Template
        # is a project helper that accepts the template as an argument
        filename = Template.substitute(
            product_code[code]["file_template"], SC=_sc, YMD=cfg.datetime
        )
        product = product_code[code]["product"] + "-interpolated"
        key = (
            CUMULUS_PRODUCTS_BASEKEY
            + "/"
            + product_code[code]["product"]
            + "/"
            + filename
        )
        lakefix = is_lakefix(
            dt,
            code,
        )

        max_dist = 0 if code in no_interp else cfg.max_distance

        # schedule an interpolation task only when the download succeeded
        if download_file := boto.s3_download_file(cfg.bucket, key, dst=dst):
            tasks.append(
                asyncio.create_task(
                    snodas_interp_task(
                        download_file,
                        product,
                        dt,
                        max_dist,
                        nodata_value,
                        lakefix,
                    )
                )
            )

    # return the list of objects that are not None
    return_objs = []
    for task in tasks:
        result = await task
        if result is not None:
            return_objs.append(result)

    return return_objs

snodas_interp_task(filepath, product, dt, max_dist, nodata, lakefix=False) async

SNODAS interpolation task method used with asyncio

Parameters:

Name Type Description Default
filepath str

FQPN to processed SNODAS COG

required
product str

Cumulus product name

required
dt datetime

datetime object

required
max_dist str

maximum distance in pixel for gdal_fillnodata

required
nodata str

raster no data value

required
lakefix bool, optional

determine if product needs 'lakefix', by default False

False

Returns:

Type Description
dict[str, str] | None

Dictionary of attributes needed to upload to S3 or None

Source code in cumulus_geoproc/geoprocess/snodas/interpolate.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
async def snodas_interp_task(
    filepath: str,
    product: str,
    dt: datetime,
    max_dist: str,
    nodata: str,
    lakefix: bool = False,
):
    """SNODAS interpolation task method used with asyncio

    Optionally applies the 'lakefix' no data masking, fills no data
    cells with gdal_fillnodata, and converts the result to a validated
    Cloud Optimized GeoTIFF.

    Parameters
    ----------
    filepath : str
        FQPN to processed SNODAS COG
    product : str
        Cumulus product name
    dt : datetime
        datetime object
    max_dist : str
        maximum distance in pixel for gdal_fillnodata; 0 skips filling
    nodata : str
        raster no data value
    lakefix : bool, optional
        determine if product needs 'lakefix', by default False

    Returns
    -------
    dict[str, str] | None
        Dictionary of attributes needed to upload to S3 or None
    """
    try:
        dst, filename = os.path.split(filepath)

        if lakefix:
            # get the no data masking raster packaged with this module
            masking_raster = pkg_resources.resource_filename(
                __package__, "data/no_data_areas_swe_20140201.tif"
            )
            lakefix_tif = os.path.join(
                dst,
                file_extension(filename, suffix=".tiff"),
            )
            # where the source is zero AND the mask is no data, write the
            # no data value; otherwise keep the source value
            cgdal.gdal_calculate(
                "-A",
                filepath,
                "-B",
                masking_raster,
                "--outfile",
                lakefix_tif,
                "--calc",
                f"numpy.where((A == 0) & (B == {nodata}), {nodata}, A)",
                "--NoDataValue",
                nodata,
                "--quiet",
            )
            filepath = lakefix_tif

        # -of GTiff required here because COG has no Create(); therefore GTiff
        # driver used with creationOptions to be COG
        fill_tif = os.path.join(
            dst, file_extension(filename, suffix="-interpolated.tiff")
        )

        # fillnodata to GTiff; skip when max_dist is 0 or the tool fails,
        # falling back to the unfilled raster
        if max_dist == 0 or (
            cgdal.gdal_fillnodataval(
                filepath,
                fill_tif,
                "-q",
                "-md",
                str(max_dist),
            )
            != 0
        ):
            logger.info("gdal_fillnodata.py not executed")
            fill_tif = filepath

        # convert to COG
        gdal.Translate(
            tif := os.path.join(
                dst, file_extension(filename, suffix="-interpolated.tif")
            ),
            fill_tif,
            format="COG",
            creationOptions=[
                "RESAMPLING=BILINEAR",
                "OVERVIEWS=IGNORE_EXISTING",
                "OVERVIEW_RESAMPLING=BILINEAR",
            ],
        )
        # validate COG
        if (validate := cgdal.validate_cog("-q", tif)) == 0:
            logger.debug(f"Validate COG = {validate}\t{tif} is a COG")

        return {
            "file": tif,
            "filetype": product,
            "datetime": dt.isoformat(),
            "version": None,
        }

    except Exception as ex:
        # (RuntimeError, KeyError, Exception) collapsed to Exception,
        # which already covers both; log the failure and report no result
        logger.error(f"{type(ex).__name__}: {this}: {ex}")
        return None

Metadata parser

to_dictionary(src)

ASCII input from SNODAS metadata to a dictionary

all keys are pseudo-slugified

Parameters:

Name Type Description Default
src str

SNODAS metadata as .txt

required

Returns:

Type Description
dict | None

dictionary of metadata parameters and their values or None

Source code in cumulus_geoproc/geoprocess/snodas/metaparse.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def to_dictionary(src: str):
    """ASCII input from SNODAS metadata to a dictionary

    all keys are pseudo-slugified (spaces and hyphens replaced with
    underscores, letters lowercased)

    Parameters
    ----------
    src : str
        SNODAS metadata as .txt

    Returns
    -------
    dict | None
        dictionary of metadata parameters and their values or None
    """
    try:
        with open(src, "r") as fh:
            config_dict = {}
            # iterate the file object directly instead of readlines()
            for line in fh:
                parts = line.split(":")
                # skip lines that are not "key: value" pairs
                if len(parts) < 2:
                    continue
                k, v = parts[:2]
                k = k.replace(" ", "_").replace("-", "_").lower()
                v = v.strip()
                # try make the string an int or float if a number
                try:
                    v = int(v) if v.isdigit() else float(v)
                except ValueError:
                    # not numeric; keep the stripped string
                    pass
                config_dict[k] = v
    except FileNotFoundError:
        return None

    return config_dict

to_namedtuple(src, name='Metadata')

Use to_dictionary to generate a namedtuple

Parameters:

Name Type Description Default
src str

SNODAS metadata as .txt

required
name str, optional

collections.namedtuple name, by default "Metadata"

'Metadata'

Returns:

Type Description
collections.namedtuple

namedtuple

Source code in cumulus_geoproc/geoprocess/snodas/metaparse.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def to_namedtuple(src: str, name: str = "Metadata"):
    """Build a namedtuple from SNODAS metadata via to_dictionary

    Parameters
    ----------
    src : str
        SNODAS metadata as .txt
    name : str, optional
        collections.namedtuple name, by default "Metadata"

    Returns
    -------
    collections.namedtuple
        namedtuple of the parsed metadata, or None when parsing failed
    """
    metadata = to_dictionary(src)
    if not isinstance(metadata, dict):
        return None
    # field names come from the dictionary keys, values fill by keyword
    return namedtuple(name, metadata.keys())(**metadata)

write_hdr(src, /, columns, rows)

Write a header file (hdr)

Parameters:

Name Type Description Default
src str

SNODAS metadata as .txt

required
columns int

number of columns from the metadata text file

required
rows int

number of rows from the metadata text file

required

Returns:

Type Description
str

FQPN to the written hdr file | None

Source code in cumulus_geoproc/geoprocess/snodas/metaparse.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def write_hdr(src: str, /, columns: int, rows: int):
    """Write an ENVI header file (hdr) beside the SNODAS metadata file

    Parameters
    ----------
    src : str
        SNODAS metadata as .txt; the hdr path is src with the .txt
        extension replaced by .hdr
    columns: int
        number of columns from the metadata text file
    rows: int
        number of rows from the metadata text file

    Returns
    -------
    str | None
        FQPN to the written hdr file, or None when src is not a .txt
        file or the hdr could not be written
    """
    # guard clause: only metadata .txt files get a companion hdr
    if not src.endswith(".txt"):
        return None
    hdr_file = src.replace(".txt", ".hdr")
    try:
        with open(hdr_file, "w") as fh:
            # dedent removes the common indentation and lstrip drops the
            # leading newline of the triple-quoted literal
            hdr = dedent(
                f"""
                ENVI
                samples = {columns}
                lines = {rows}
                bands = 1
                header offset = 0
                file type = ENVI Standard
                data type = 2
                interleave = bsq
                byte order = 1
                """
            )
            fh.write(hdr.lstrip())
    except OSError:
        # best-effort: an unwritable destination yields None
        return None
    return hdr_file