From b57a35f6c5e11923ed0758d4071a274b587c628e Mon Sep 17 00:00:00 2001 From: wrongkindofdoctor <20195932+wrongkindofdoctor@users.noreply.github.com> Date: Fri, 20 Dec 2024 11:39:55 -0500 Subject: [PATCH 1/9] move call_build function to base class and define a class parser attribute that is modified for each child class at initialization --- tools/catalog_builder/catalog_builder.py | 51 ++++++------------------ 1 file changed, 12 insertions(+), 39 deletions(-) diff --git a/tools/catalog_builder/catalog_builder.py b/tools/catalog_builder/catalog_builder.py index 1c7c1098e..4acd02fb7 100644 --- a/tools/catalog_builder/catalog_builder.py +++ b/tools/catalog_builder/catalog_builder.py @@ -386,6 +386,7 @@ def __init__(self): self.variable_col_name = "variable_id" self.path_col_name = "path" self.cb = None + self.file_parse_method = "" def cat_builder(self, data_paths: list, exclude_patterns=None, @@ -406,6 +407,14 @@ def cat_builder(self, data_paths: list, extension='.nc' # extension of target file ) + def call_build(self, file_parse_method=None): + if file_parse_method is None: + file_parse_method = self.file_parse_method + # see https://github.com/ncar-xdev/ecgtools/blob/main/ecgtools/parsers/cmip6.py + # for more parsing methods + self.cb = self.cb.build(parsing_func=file_parse_method) + print('Build complete') + def call_save(self, output_dir: str, output_filename: str ): @@ -434,14 +443,7 @@ class CatalogCMIP(CatalogBase): def __init__(self): super().__init__() - - def call_build(self, file_parse_method=None): - if file_parse_method is None: - file_parse_method = parse_cmip6 - # see https://github.com/ncar-xdev/ecgtools/blob/main/ecgtools/parsers/cmip6.py - # for more parsing methods - self.cb = self.cb.build(parsing_func=file_parse_method) - print('Build complete') + self.file_parse_method = parse_cmip6 @catalog_class.maker @@ -458,15 +460,7 @@ def __init__(self): 'member_id', 'realm' ] - def call_build(self, - file_parse_method=None): - - if file_parse_method is None: - file_parse_method = parse_gfdl_pp_ts - # see https://github.com/ncar-xdev/ecgtools/blob/main/ecgtools/parsers/cmip6.py - # for more parsing methods - self.cb = self.cb.build(parsing_func=file_parse_method) - print('Build complete') + self.file_parse_method = parse_gfdl_pp_ts @catalog_class.maker @@ -477,28 +471,7 @@ class CatalogCESM(CatalogBase): """ def __init__(self): super().__init__() - self.groupby_attrs = [ - 'component', - 'stream', - 'case', - 'frequency' - ] - - self.xarray_aggregations = [ - {'type': 'union', 'attribute_name': 'variable_id'}, - { - 'type': 'join_existing', - 'attribute_name': 'date', - 'options': {'dim': 'time', 'coords': 'minimal', 'compat': 'override'} - } - ] - - def call_build(self, file_parse_method=None): - if file_parse_method is None: - file_parse_method = parse_cesm_timeseries - # see https://github.com/ncar-xdev/ecgtools/blob/main/ecgtools/parsers/cesm.py - # for more parsing methods - self.cb = self.cb.build(parsing_func=file_parse_method) + self.file_parse_method = parse_cesm_timeseries def load_config(config): From eaa0d63b731bb7e01b102f0612b030e65ade0bfb Mon Sep 17 00:00:00 2001 From: wrongkindofdoctor <20195932+wrongkindofdoctor@users.noreply.github.com> Date: Fri, 20 Dec 2024 16:48:49 -0500 Subject: [PATCH 2/9] move parsers and json utilities to a separate module --- tools/catalog_builder/catalog_builder.py | 308 +--------------------- tools/catalog_builder/parsers.py | 313 +++++++++++++++++++++++ 2 files changed, 316 insertions(+), 305 deletions(-) create mode 100644 
tools/catalog_builder/parsers.py diff --git a/tools/catalog_builder/catalog_builder.py b/tools/catalog_builder/catalog_builder.py index 4acd02fb7..57b63b170 100644 --- a/tools/catalog_builder/catalog_builder.py +++ b/tools/catalog_builder/catalog_builder.py @@ -15,19 +15,13 @@ import click import intake import os -import io -import json -import pathlib import sys import time -import traceback import typing -import collections -import xarray as xr import yaml +import parsers from datetime import timedelta from ecgtools import Builder -from ecgtools.builder import INVALID_ASSET, TRACEBACK from ecgtools.parsers.cmip import parse_cmip6 from ecgtools.parsers.cesm import parse_cesm_timeseries import logging @@ -65,302 +59,6 @@ def __getitem__(self, n): # instantiate the class maker catalog_class = ClassMaker() -def strip_comments(str_: str, delimiter=None): - """Remove comments from *str_*. Comments are taken to start with an - arbitrary *delimiter* and run to the end of the line. - """ - # would be better to use shlex, but that doesn't support multi-character - # comment delimiters like '//' - escaped_quote_placeholder = '\v' # no one uses vertical tab - - if not delimiter: - return str_ - lines = str_.splitlines() - for i in range(len(lines)): - # get rid of lines starting with delimiter - if lines[i].startswith(delimiter): - lines[i] = '' - continue - # handle delimiters midway through a line: - # If delimiter appears quoted in a string, don't want to treat it as - # a comment. So for each occurrence of delimiter, count number of - # "s to its left and only truncate when that's an even number. - # First we get rid of -escaped single "s. - replaced_line = lines[i].replace('\\\"', escaped_quote_placeholder) - line_parts = replaced_line.split(delimiter) - quote_counts = [s.count('"') for s in line_parts] - j = 1 - while sum(quote_counts[:j]) % 2 != 0: - if j >= len(quote_counts): - raise ValueError(f"Couldn't parse line {i+1} of string.") - j += 1 - replaced_line = delimiter.join(line_parts[:j]) - lines[i] = replaced_line.replace(escaped_quote_placeholder, '\\\"') - # make lookup table of correct line numbers, taking into account lines we - # dropped - line_nos = [i for i, s in enumerate(lines) if (s and not s.isspace())] - # join lines, stripping blank lines - new_str = '\n'.join([s for s in lines if (s and not s.isspace())]) - return new_str, line_nos -def parse_json(str_: str): - """Parse JSONC (JSON with ``//``-comments) string *str_* into a Python object. - Comments are discarded. Wraps standard library :py:func:`json.loads`. - - Syntax errors in the input (:py:class:`~json.JSONDecodeError`) are passed - through from the Python standard library parser. We correct the line numbers - mentioned in the errors to refer to the original file (i.e., with comments.) - """ - def _pos_from_lc(lineno, colno, str_): - # fix line number, since we stripped commented-out lines. JSONDecodeError - # computes line/col no. in error message from character position in string. - lines = str_.splitlines() - return (colno - 1) + sum((len(line) + 1) for line in lines[:lineno]) - - (strip_str, line_nos) = strip_comments(str_, delimiter='//') - try: - parsed_json = json.loads(strip_str, - object_pairs_hook=collections.OrderedDict) - except json.JSONDecodeError as exc: - # fix reported line number, since we stripped commented-out lines. 
- assert exc.lineno <= len(line_nos) - raise json.JSONDecodeError( - msg=exc.msg, doc=str_, - pos=_pos_from_lc(line_nos[exc.lineno-1], exc.colno, str_) - ) - except UnicodeDecodeError as exc: - raise json.JSONDecodeError( - msg=f"parse_json received UnicodeDecodeError:\n{exc}", - doc=strip_str, pos=0 - ) - - return parsed_json -def read_json(file_path: str, log=_log) -> dict: - """Reads a struct from a JSONC file at *file_path*. - """ - log.debug('Reading file %s', file_path) - try: - with io.open(file_path, 'r', encoding='utf-8') as file_: - str_ = file_.read() - except Exception as exc: - # something more serious than missing file - _log.critical("Caught exception when trying to read %s: %r", file_path, exc) - exit(1) - return parse_json(str_) - -freq_opts = ['mon', - 'day', - 'daily', - '6hr', - '3hr', - '1hr', - 'subhr', - 'annual', - 'year'] - -# custom parser for GFDL am5 data that uses fieldlist metadata and the DRS to populate -# required catalog fields -def parse_gfdl_am5_data(file_name: str): - - file = pathlib.Path(file_name) # uncomment when ready to run - - num_dir_parts = len(file.parts) # file name index = num_parts 1 - # isolate file from rest of path - stem = file.stem - # split the file name into components based on - # assume am5 file name format is {realm}.{time_range}.[variable_id}.nc - split = stem.split('.') - num_file_parts = len(split) - realm = split[0] - cell_methods = "" - cell_measures = "" - time_range = split[1] - start_time = time_range.split('-')[0] - end_time = time_range.split('-')[1] - variable_id = split[2] - source_type = "" - member_id = "" - experiment_id = "" - source_id = "" - chunk_freq = file.parts[num_dir_parts - 2] # e.g, 1yr, 5yr - variant_label = "" - grid_label = "" - table_id = "" - assoc_files = "" - activity_id = "GFDL" - institution_id = "" - long_name = "" - standard_name = "" - units = "" - output_frequency = "" - file_freq = file.parts[num_dir_parts - 3] - - for f in freq_opts: - if f in file_freq: - output_frequency = f - break - if 'daily' in output_frequency: - output_frequency = 'day' - elif 'monthly' in output_frequency: - output_frequency = 'mon' - - # read metadata from the appropriate fieldlist - if 'cmip' in realm.lower(): - gfdl_fieldlist = os.path.join(ROOT_DIR, 'data/fieldlist_CMIP.jsonc') - else: - gfdl_fieldlist = os.path.join(ROOT_DIR, 'data/fieldlist_GFDL.jsonc') - try: - gfdl_info = read_json(gfdl_fieldlist, log=_log) - except IOError: - print("Unable to open file", gfdl_fieldlist) - sys.exit(1) - - if hasattr(gfdl_info['variables'], variable_id): - var_metadata = gfdl_info['variables'].get(variable_id) - else: - raise KeyError(f'{variable_id} not found in {gfdl_fieldlist}') - - if hasattr(var_metadata, 'standard_name'): - standard_name = var_metadata.standard_name - if hasattr(var_metadata, 'long_name'): - long_name = var_metadata.long_name - if hasattr(var_metadata, 'units'): - units = var_metadata.units - - try: - info = { - 'activity_id': activity_id, - 'assoc_files': assoc_files, - 'institution_id': institution_id, - 'member_id': member_id, - 'realm': realm, - 'variable_id': variable_id, - 'table_id': table_id, - 'source_id': source_id, - 'source_type': source_type, - 'cell_methods': cell_methods, - 'cell_measures': cell_measures, - 'experiment_id': experiment_id, - 'variant_label': variant_label, - 'grid_label': grid_label, - 'units': units, - 'time_range': time_range, - 'start_time': start_time, - 'end_time': end_time, - 'chunk_freq': chunk_freq, - 'standard_name': standard_name, - 'long_name': long_name, - 
'frequency': output_frequency, - 'file_name': stem, - 'path': str(file) - } - - return info - - except Exception as exc: - print(exc) - return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()} - - -# custom parser for pp data stored on GFDL archive filesystem -# assumed DRS of [root_dir]/pp/[realm]/[analysis type (e.g, 'ts')]/[frequency]/[chunk size (e.g., 1yr, 5yr)] - -def parse_gfdl_pp_ts(file_name: str): - # files = sorted(glob.glob(os.path.join(file_name,'*.nc'))) # debug comment when ready to run - # file = pathlib.Path(files[0]) # debug comment when ready to run - file = pathlib.Path(file_name) # uncomment when ready to run - num_parts = len(file.parts) # file name index = num_parts 1 - # isolate file from rest of path - stem = file.stem - # split the file name into components based on _ - split = stem.split('.') - realm = split[0] - cell_methods = "" - cell_measures = "" - time_range = split[1] - start_time = time_range.split('-')[0] - end_time = time_range.split('-')[1] - variable_id = split[2] - source_type = "" - member_id = "" - experiment_id = "" - source_id = "" - chunk_freq = file.parts[num_parts - 2] # e.g, 1yr, 5yr - variant_label = "" - grid_label = "" - table_id = "" - assoc_files = "" - activity_id = "GFDL" - institution_id = "" - - output_frequency = "" - file_freq = file.parts[num_parts - 3] - for f in freq_opts: - if f in file_freq: - output_frequency = f - break - if 'daily' in output_frequency: - output_frequency = 'day' - elif 'monthly' in output_frequency: - output_frequency = 'mon' - - try: - # call to xr.open_dataset required by ecgtoos.builder.Builder - with xr.open_dataset(file, chunks={}, decode_times=False) as ds: - variable_list = [var for var in ds if 'standard_name' in ds[var].attrs or 'long_name' in ds[var].attrs] - if variable_id not in variable_list: - print(f'Asset variable {variable_id} not found in {file}') - exit(1) - standard_name = "" - long_name = "" - if 'standard_name' in ds[variable_id].attrs: - standard_name = ds[variable_id].attrs['standard_name'] - standard_name.replace("", "_") - if 'long_name' in ds[variable_id].attrs: - long_name = ds[variable_id].attrs['long_name'] - if len(long_name) == 0 and len(standard_name) == 0: - print('Asset variable does not contain a standard_name or long_name attribute') - exit(1) - - if 'cell_methods' in ds[variable_id].attrs: - cell_methods = ds[variable_id].attrs['cell_methods'] - if 'cell_measures' in ds[variable_id].attrs: - cell_measures = ds[variable_id].attrs['cell_measures'] - - units = ds[variable_id].attrs['units'] - info = { - 'activity_id': activity_id, - 'assoc_files': assoc_files, - 'institution_id': institution_id, - 'member_id': member_id, - 'realm': realm, - 'variable_id': variable_id, - 'table_id': table_id, - 'source_id': source_id, - 'source_type': source_type, - 'cell_methods': cell_methods, - 'cell_measures': cell_measures, - 'experiment_id': experiment_id, - 'variant_label': variant_label, - 'grid_label': grid_label, - 'units': units, - 'time_range': time_range, - 'start_time': start_time, - 'end_time': end_time, - 'chunk_freq': chunk_freq, - 'standard_name': standard_name, - 'long_name': long_name, - 'frequency': output_frequency, - 'file_name': stem, - 'path': str(file) - } - - return info - - except Exception as exc: - print(exc) - return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()} - - class CatalogBase(object): """Catalog base class\n """ @@ -460,7 +158,7 @@ def __init__(self): 'member_id', 'realm' ] - self.file_parse_method = parse_gfdl_pp_ts + 
self.file_parse_method = parsers.parse_gfdl_pp_ts @catalog_class.maker @@ -515,7 +213,7 @@ def main(config: str): file_parse_method = None if conf['dataset_id'] is not None: if 'am5' in conf['dataset_id'].lower(): - file_parse_method = parse_gfdl_am5_data + file_parse_method = parsers.parse_gfdl_am5_data # build the catalog print('Building the catalog') diff --git a/tools/catalog_builder/parsers.py b/tools/catalog_builder/parsers.py new file mode 100644 index 000000000..675cf9cb9 --- /dev/null +++ b/tools/catalog_builder/parsers.py @@ -0,0 +1,313 @@ +import xarray as xr +import traceback +import pathlib +import os +import io +import sys +import json +import logging +import collections +from ecgtools.builder import INVALID_ASSET, TRACEBACK + + +# Define a log object for debugging +_log = logging.getLogger(__name__) + +ROOT_DIR = os.path.dirname(os.path.realpath(__file__)).split('/tools/catalog_builder')[0] + +freq_opts = ['mon', + 'day', + 'daily', + '6hr', + '3hr', + '1hr', + 'subhr', + 'annual', + 'year'] + + +def strip_comments(str_: str, delimiter=None): + """ Remove comments from *str_*. Comments are taken to start with an + arbitrary *delimiter* and run to the end of the line. + """ + # would be better to use shlex, but that doesn't support multi-character + # comment delimiters like '//' + escaped_quote_placeholder = '\v' # no one uses vertical tab + + if not delimiter: + return str_ + lines = str_.splitlines() + for i in range(len(lines)): + # get rid of lines starting with delimiter + if lines[i].startswith(delimiter): + lines[i] = '' + continue + # handle delimiters midway through a line: + # If delimiter appears quoted in a string, don't want to treat it as + # a comment. So for each occurrence of delimiter, count number of + # "s to its left and only truncate when that's an even number. + # First we get rid of -escaped single "s. + replaced_line = lines[i].replace('\\\"', escaped_quote_placeholder) + line_parts = replaced_line.split(delimiter) + quote_counts = [s.count('"') for s in line_parts] + j = 1 + while sum(quote_counts[:j]) % 2 != 0: + if j >= len(quote_counts): + raise ValueError(f"Couldn't parse line {i+1} of string.") + j += 1 + replaced_line = delimiter.join(line_parts[:j]) + lines[i] = replaced_line.replace(escaped_quote_placeholder, '\\\"') + # make lookup table of correct line numbers, taking into account lines we + # dropped + line_nos = [i for i, s in enumerate(lines) if (s and not s.isspace())] + # join lines, stripping blank lines + new_str = '\n'.join([s for s in lines if (s and not s.isspace())]) + return new_str, line_nos +def parse_json(str_: str): + """Parse JSONC (JSON with ``//``-comments) string *str_* into a Python object. + Comments are discarded. Wraps standard library :py:func:`json.loads`. + + Syntax errors in the input (:py:class:`~json.JSONDecodeError`) are passed + through from the Python standard library parser. We correct the line numbers + mentioned in the errors to refer to the original file (i.e., with comments.) + """ + def _pos_from_lc(lineno, colno, str_): + # fix line number, since we stripped commented-out lines. JSONDecodeError + # computes line/col no. in error message from character position in string. 
+ lines = str_.splitlines() + return (colno - 1) + sum((len(line) + 1) for line in lines[:lineno]) + + (strip_str, line_nos) = strip_comments(str_, delimiter='//') + try: + parsed_json = json.loads(strip_str, + object_pairs_hook=collections.OrderedDict) + except json.JSONDecodeError as exc: + # fix reported line number, since we stripped commented-out lines. + assert exc.lineno <= len(line_nos) + raise json.JSONDecodeError( + msg=exc.msg, doc=str_, + pos=_pos_from_lc(line_nos[exc.lineno-1], exc.colno, str_) + ) + except UnicodeDecodeError as exc: + raise json.JSONDecodeError( + msg=f"parse_json received UnicodeDecodeError:\n{exc}", + doc=strip_str, pos=0 + ) + + return parsed_json +def read_json(file_path: str, log=_log) -> dict: + """Reads a struct from a JSONC file at *file_path*. + """ + log.debug('Reading file %s', file_path) + try: + with io.open(file_path, 'r', encoding='utf-8') as file_: + str_ = file_.read() + except Exception as exc: + # something more serious than missing file + _log.critical("Caught exception when trying to read %s: %r", file_path, exc) + exit(1) + return parse_json(str_) + + +# custom parser for GFDL am5 data that uses fieldlist metadata and the DRS to populate +# required catalog fields +def parse_gfdl_am5_data(file_name: str): + + file = pathlib.Path(file_name) # uncomment when ready to run + + num_dir_parts = len(file.parts) # file name index = num_parts 1 + # isolate file from rest of path + stem = file.stem + # split the file name into components based on + # assume am5 file name format is {realm}.{time_range}.[variable_id}.nc + split = stem.split('.') + num_file_parts = len(split) + realm = split[0] + cell_methods = "" + cell_measures = "" + time_range = split[1] + start_time = time_range.split('-')[0] + end_time = time_range.split('-')[1] + variable_id = split[2] + source_type = "" + member_id = "" + experiment_id = "" + source_id = "" + chunk_freq = file.parts[num_dir_parts - 2] # e.g, 1yr, 5yr + variant_label = "" + grid_label = "" + table_id = "" + assoc_files = "" + activity_id = "GFDL" + institution_id = "" + long_name = "" + standard_name = "" + units = "" + output_frequency = "" + file_freq = file.parts[num_dir_parts - 3] + + for f in freq_opts: + if f in file_freq: + output_frequency = f + break + if 'daily' in output_frequency: + output_frequency = 'day' + elif 'monthly' in output_frequency: + output_frequency = 'mon' + + # read metadata from the appropriate fieldlist + if 'cmip' in realm.lower(): + gfdl_fieldlist = os.path.join(ROOT_DIR, 'data/fieldlist_CMIP.jsonc') + else: + gfdl_fieldlist = os.path.join(ROOT_DIR, 'data/fieldlist_GFDL.jsonc') + try: + gfdl_info = read_json(gfdl_fieldlist, log=_log) + except IOError: + print("Unable to open file", gfdl_fieldlist) + sys.exit(1) + + if hasattr(gfdl_info['variables'], variable_id): + var_metadata = gfdl_info['variables'].get(variable_id) + else: + raise KeyError(f'{variable_id} not found in {gfdl_fieldlist}') + + if hasattr(var_metadata, 'standard_name'): + standard_name = var_metadata.standard_name + if hasattr(var_metadata, 'long_name'): + long_name = var_metadata.long_name + if hasattr(var_metadata, 'units'): + units = var_metadata.units + + try: + info = { + 'activity_id': activity_id, + 'assoc_files': assoc_files, + 'institution_id': institution_id, + 'member_id': member_id, + 'realm': realm, + 'variable_id': variable_id, + 'table_id': table_id, + 'source_id': source_id, + 'source_type': source_type, + 'cell_methods': cell_methods, + 'cell_measures': cell_measures, + 'experiment_id': 
experiment_id, + 'variant_label': variant_label, + 'grid_label': grid_label, + 'units': units, + 'time_range': time_range, + 'start_time': start_time, + 'end_time': end_time, + 'chunk_freq': chunk_freq, + 'standard_name': standard_name, + 'long_name': long_name, + 'frequency': output_frequency, + 'file_name': stem, + 'path': str(file) + } + + return info + + except Exception as exc: + print(exc) + return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()} + + +# custom parser for pp data stored on GFDL archive filesystem +# assumed DRS of [root_dir]/pp/[realm]/[analysis type (e.g, 'ts')]/[frequency]/[chunk size (e.g., 1yr, 5yr)] + +def parse_gfdl_pp_ts(file_name: str): + # files = sorted(glob.glob(os.path.join(file_name,'*.nc'))) # debug comment when ready to run + # file = pathlib.Path(files[0]) # debug comment when ready to run + file = pathlib.Path(file_name) # uncomment when ready to run + num_parts = len(file.parts) # file name index = num_parts 1 + # isolate file from rest of path + stem = file.stem + # split the file name into components based on _ + split = stem.split('.') + realm = split[0] + cell_methods = "" + cell_measures = "" + time_range = split[1] + start_time = time_range.split('-')[0] + end_time = time_range.split('-')[1] + variable_id = split[2] + source_type = "" + member_id = "" + experiment_id = "" + source_id = "" + chunk_freq = file.parts[num_parts - 2] # e.g, 1yr, 5yr + variant_label = "" + grid_label = "" + table_id = "" + assoc_files = "" + activity_id = "GFDL" + institution_id = "" + + output_frequency = "" + file_freq = file.parts[num_parts - 3] + for f in freq_opts: + if f in file_freq: + output_frequency = f + break + if 'daily' in output_frequency: + output_frequency = 'day' + elif 'monthly' in output_frequency: + output_frequency = 'mon' + + try: + # call to xr.open_dataset required by ecgtoos.builder.Builder + with xr.open_dataset(file, chunks={}, decode_times=False) as ds: + variable_list = [var for var in ds if 'standard_name' in ds[var].attrs or 'long_name' in ds[var].attrs] + if variable_id not in variable_list: + print(f'Asset variable {variable_id} not found in {file}') + exit(1) + standard_name = "" + long_name = "" + if 'standard_name' in ds[variable_id].attrs: + standard_name = ds[variable_id].attrs['standard_name'] + standard_name.replace("", "_") + if 'long_name' in ds[variable_id].attrs: + long_name = ds[variable_id].attrs['long_name'] + if len(long_name) == 0 and len(standard_name) == 0: + print('Asset variable does not contain a standard_name or long_name attribute') + exit(1) + + if 'cell_methods' in ds[variable_id].attrs: + cell_methods = ds[variable_id].attrs['cell_methods'] + if 'cell_measures' in ds[variable_id].attrs: + cell_measures = ds[variable_id].attrs['cell_measures'] + + units = ds[variable_id].attrs['units'] + info = { + 'activity_id': activity_id, + 'assoc_files': assoc_files, + 'institution_id': institution_id, + 'member_id': member_id, + 'realm': realm, + 'variable_id': variable_id, + 'table_id': table_id, + 'source_id': source_id, + 'source_type': source_type, + 'cell_methods': cell_methods, + 'cell_measures': cell_measures, + 'experiment_id': experiment_id, + 'variant_label': variant_label, + 'grid_label': grid_label, + 'units': units, + 'time_range': time_range, + 'start_time': start_time, + 'end_time': end_time, + 'chunk_freq': chunk_freq, + 'standard_name': standard_name, + 'long_name': long_name, + 'frequency': output_frequency, + 'file_name': stem, + 'path': str(file) + } + + return info + + except Exception as 
exc: + print(exc) + return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()} From 07cef4b1e86e3473b29297f536af57218d0c6c76 Mon Sep 17 00:00:00 2001 From: wrongkindofdoctor <20195932+wrongkindofdoctor@users.noreply.github.com> Date: Mon, 23 Dec 2024 12:15:12 -0500 Subject: [PATCH 3/9] refine gfdl parsers --- tools/catalog_builder/parsers.py | 189 +++++++++++-------------------- 1 file changed, 67 insertions(+), 122 deletions(-) diff --git a/tools/catalog_builder/parsers.py b/tools/catalog_builder/parsers.py index 675cf9cb9..9de9714dc 100644 --- a/tools/catalog_builder/parsers.py +++ b/tools/catalog_builder/parsers.py @@ -7,8 +7,11 @@ import json import logging import collections + +from dask.array.core import chunks_from_arrays from ecgtools.builder import INVALID_ASSET, TRACEBACK +from src.cmip6 import variant_label_regex # Define a log object for debugging _log = logging.getLogger(__name__) @@ -25,6 +28,30 @@ 'annual', 'year'] +catalog_keys = [ + 'activity_id', + 'assoc_files', + 'institution_id', + 'member_id', + 'realm', + 'variable_id', + 'table_id', + 'source_id', + 'source_type', + 'cell_methods', + 'cell_measures', + 'experiment_id', + 'variant_label', + 'grid_label', + 'units', + 'time_range', + 'chunk_freq', + 'standard_name', + 'long_name', + 'frequency', + 'file_name', + 'path' + ] def strip_comments(str_: str, delimiter=None): """ Remove comments from *str_*. Comments are taken to start with an @@ -109,6 +136,10 @@ def read_json(file_path: str, log=_log) -> dict: return parse_json(str_) +catalog_info = dict() +for k in catalog_keys: + catalog_info[k] = "" + # custom parser for GFDL am5 data that uses fieldlist metadata and the DRS to populate # required catalog fields def parse_gfdl_am5_data(file_name: str): @@ -122,41 +153,25 @@ def parse_gfdl_am5_data(file_name: str): # assume am5 file name format is {realm}.{time_range}.[variable_id}.nc split = stem.split('.') num_file_parts = len(split) - realm = split[0] - cell_methods = "" - cell_measures = "" - time_range = split[1] - start_time = time_range.split('-')[0] - end_time = time_range.split('-')[1] - variable_id = split[2] - source_type = "" - member_id = "" - experiment_id = "" - source_id = "" - chunk_freq = file.parts[num_dir_parts - 2] # e.g, 1yr, 5yr - variant_label = "" - grid_label = "" - table_id = "" - assoc_files = "" - activity_id = "GFDL" - institution_id = "" - long_name = "" - standard_name = "" - units = "" - output_frequency = "" + catalog_info.update({"realm": split[0]}) + catalog_info.update({"time_range": split[1]}) + catalog_info.update({"variable_id": split[2]}) + catalog_info.update({"chunk_freq": file.parts[num_dir_parts - 2]}) + catalog_info.update({"activity_id": "GFDL"}) + catalog_info.update({"institution_id": "GFDL"}) file_freq = file.parts[num_dir_parts - 3] for f in freq_opts: if f in file_freq: - output_frequency = f + catalog_info.update({"frequency": f}) break - if 'daily' in output_frequency: - output_frequency = 'day' - elif 'monthly' in output_frequency: - output_frequency = 'mon' + if 'daily' in file_freq: + catalog_info.update({"frequency": "day"}) + elif 'monthly' in file_freq: + catalog_info.update({"frequency": "mon"}) # read metadata from the appropriate fieldlist - if 'cmip' in realm.lower(): + if 'cmip' in catalog_info['realm'].lower(): gfdl_fieldlist = os.path.join(ROOT_DIR, 'data/fieldlist_CMIP.jsonc') else: gfdl_fieldlist = os.path.join(ROOT_DIR, 'data/fieldlist_GFDL.jsonc') @@ -166,51 +181,19 @@ def parse_gfdl_am5_data(file_name: str): print("Unable to open file", 
gfdl_fieldlist) sys.exit(1) - if hasattr(gfdl_info['variables'], variable_id): - var_metadata = gfdl_info['variables'].get(variable_id) + if hasattr(gfdl_info['variables'], catalog_info['variable_id']): + var_metadata = gfdl_info['variables'].get(catalog_info['variable_id']) else: - raise KeyError(f'{variable_id} not found in {gfdl_fieldlist}') + raise KeyError(f'{catalog_info['variable_id']} not found in {gfdl_fieldlist}') if hasattr(var_metadata, 'standard_name'): - standard_name = var_metadata.standard_name + catalog_info.update({'standard_name': var_metadata.standard_name}) if hasattr(var_metadata, 'long_name'): - long_name = var_metadata.long_name + catalog_info.update({'long_name': var_metadata.long_name}) if hasattr(var_metadata, 'units'): - units = var_metadata.units + catalog_info.update({'units': var_metadata.units}) - try: - info = { - 'activity_id': activity_id, - 'assoc_files': assoc_files, - 'institution_id': institution_id, - 'member_id': member_id, - 'realm': realm, - 'variable_id': variable_id, - 'table_id': table_id, - 'source_id': source_id, - 'source_type': source_type, - 'cell_methods': cell_methods, - 'cell_measures': cell_measures, - 'experiment_id': experiment_id, - 'variant_label': variant_label, - 'grid_label': grid_label, - 'units': units, - 'time_range': time_range, - 'start_time': start_time, - 'end_time': end_time, - 'chunk_freq': chunk_freq, - 'standard_name': standard_name, - 'long_name': long_name, - 'frequency': output_frequency, - 'file_name': stem, - 'path': str(file) - } - - return info - - except Exception as exc: - print(exc) - return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()} + return catalog_info # custom parser for pp data stored on GFDL archive filesystem @@ -226,35 +209,24 @@ def parse_gfdl_pp_ts(file_name: str): # split the file name into components based on _ split = stem.split('.') realm = split[0] - cell_methods = "" - cell_measures = "" time_range = split[1] - start_time = time_range.split('-')[0] - end_time = time_range.split('-')[1] variable_id = split[2] - source_type = "" - member_id = "" - experiment_id = "" - source_id = "" chunk_freq = file.parts[num_parts - 2] # e.g, 1yr, 5yr - variant_label = "" - grid_label = "" - table_id = "" - assoc_files = "" - activity_id = "GFDL" - institution_id = "" - output_frequency = "" + catalog_info.update({"variable_id": variable_id}) + catalog_info.update({"chunk_freq": chunk_freq}) + catalog_info.update({"realm": realm}) + catalog_info.update({"time_range": time_range}) + file_freq = file.parts[num_parts - 3] for f in freq_opts: if f in file_freq: - output_frequency = f + catalog_info.update({"frequency": f}) break - if 'daily' in output_frequency: - output_frequency = 'day' - elif 'monthly' in output_frequency: - output_frequency = 'mon' - + if 'daily' in file_freq: + catalog_info.update({"frequency": "day"}) + elif 'monthly' in file_freq: + catalog_info.update({"frequency": "mon"}) try: # call to xr.open_dataset required by ecgtoos.builder.Builder with xr.open_dataset(file, chunks={}, decode_times=False) as ds: @@ -262,51 +234,24 @@ def parse_gfdl_pp_ts(file_name: str): if variable_id not in variable_list: print(f'Asset variable {variable_id} not found in {file}') exit(1) - standard_name = "" - long_name = "" if 'standard_name' in ds[variable_id].attrs: standard_name = ds[variable_id].attrs['standard_name'] standard_name.replace("", "_") + catalog_info.update({"standard_name": ds[variable_id].attrs['standard_name']}) if 'long_name' in ds[variable_id].attrs: - long_name = 
ds[variable_id].attrs['long_name'] - if len(long_name) == 0 and len(standard_name) == 0: + catalog_info.update({"long_name": ds[variable_id].attrs['long_name']}) + if len(ds[variable_id].attrs['long_name']) == 0 and len(standard_name) == 0: print('Asset variable does not contain a standard_name or long_name attribute') exit(1) if 'cell_methods' in ds[variable_id].attrs: - cell_methods = ds[variable_id].attrs['cell_methods'] + catalog_info.update({"cell_methods": ds[variable_id].attrs['cell_methods']}) if 'cell_measures' in ds[variable_id].attrs: - cell_measures = ds[variable_id].attrs['cell_measures'] + catalog_info.update({"cell_measures": ds[variable_id].attrs['cell_measures']}) - units = ds[variable_id].attrs['units'] - info = { - 'activity_id': activity_id, - 'assoc_files': assoc_files, - 'institution_id': institution_id, - 'member_id': member_id, - 'realm': realm, - 'variable_id': variable_id, - 'table_id': table_id, - 'source_id': source_id, - 'source_type': source_type, - 'cell_methods': cell_methods, - 'cell_measures': cell_measures, - 'experiment_id': experiment_id, - 'variant_label': variant_label, - 'grid_label': grid_label, - 'units': units, - 'time_range': time_range, - 'start_time': start_time, - 'end_time': end_time, - 'chunk_freq': chunk_freq, - 'standard_name': standard_name, - 'long_name': long_name, - 'frequency': output_frequency, - 'file_name': stem, - 'path': str(file) - } + catalog_info.update({"units": [variable_id].attrs['units']}) - return info + return catalog_info except Exception as exc: print(exc) From cba1d829e5e5e657fe4596b6bfdebb3eb89673f5 Mon Sep 17 00:00:00 2001 From: wrongkindofdoctor <20195932+wrongkindofdoctor@users.noreply.github.com> Date: Mon, 23 Dec 2024 15:53:55 -0500 Subject: [PATCH 4/9] add function to get metadata from file attributes --- tools/catalog_builder/parsers.py | 97 ++++++++++++++++++++++++-------- 1 file changed, 73 insertions(+), 24 deletions(-) diff --git a/tools/catalog_builder/parsers.py b/tools/catalog_builder/parsers.py index 9de9714dc..36300c0e0 100644 --- a/tools/catalog_builder/parsers.py +++ b/tools/catalog_builder/parsers.py @@ -140,6 +140,22 @@ def read_json(file_path: str, log=_log) -> dict: for k in catalog_keys: catalog_info[k] = "" + +def parse_nc_file(file_path: pathlib.Path, catalog_dict: dict) -> dict: + # call to xr.open_dataset required by ecgtools.builder.Builder + with xr.open_dataset(file_path, chunks={}, decode_times=False) as ds: + variable_list = [var for var in ds if 'standard_name' in ds[var].attrs or 'long_name' in ds[var].attrs] + for var in variable_list: + for attr in catalog_keys: + if attr in ds[var].attrs: + catalog_info.update({attr: ds[var].attrs[attr]}) + if len(ds[var].attrs['long_name']) == 0 and len(ds[var].attrs['long_name']) == 0: + print('Asset variable does not contain a standard_name or long_name attribute') + exit(1) + + return catalog_info + + # custom parser for GFDL am5 data that uses fieldlist metadata and the DRS to populate # required catalog fields def parse_gfdl_am5_data(file_name: str): @@ -228,31 +244,64 @@ def parse_gfdl_pp_ts(file_name: str): elif 'monthly' in file_freq: catalog_info.update({"frequency": "mon"}) try: - # call to xr.open_dataset required by ecgtoos.builder.Builder - with xr.open_dataset(file, chunks={}, decode_times=False) as ds: - variable_list = [var for var in ds if 'standard_name' in ds[var].attrs or 'long_name' in ds[var].attrs] - if variable_id not in variable_list: - print(f'Asset variable {variable_id} not found in {file}') - exit(1) - if 
'standard_name' in ds[variable_id].attrs: - standard_name = ds[variable_id].attrs['standard_name'] - standard_name.replace("", "_") - catalog_info.update({"standard_name": ds[variable_id].attrs['standard_name']}) - if 'long_name' in ds[variable_id].attrs: - catalog_info.update({"long_name": ds[variable_id].attrs['long_name']}) - if len(ds[variable_id].attrs['long_name']) == 0 and len(standard_name) == 0: - print('Asset variable does not contain a standard_name or long_name attribute') - exit(1) - - if 'cell_methods' in ds[variable_id].attrs: - catalog_info.update({"cell_methods": ds[variable_id].attrs['cell_methods']}) - if 'cell_measures' in ds[variable_id].attrs: - catalog_info.update({"cell_measures": ds[variable_id].attrs['cell_measures']}) - - catalog_info.update({"units": [variable_id].attrs['units']}) - - return catalog_info + # populate information from file metadata + parse_nc_file(file, catalog_info) + except Exception as exc: + print(exc) + return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()} + +# custom parser for GFDL am5 data that uses fieldlist metadata and the DRS to populate +# required catalog fields + +def parse_cesm(file_name: str): + file = pathlib.Path(file_name) + + num_dir_parts = len(file.parts) # file name index = num_parts 1 + # isolate file from rest of path + stem = file.stem + # split the file name into components based on + # assume am5 file name format is {realm}.{time_range}.[variable_id}.nc + split = stem.split('.') + num_file_parts = len(split) + catalog_info.update({"realm": split[0]}) + catalog_info.update({"time_range": split[1]}) + catalog_info.update({"variable_id": split[2]}) + catalog_info.update({"activity_id": "CESM"}) + catalog_info.update({"institution_id": "NCAR"}) + file_freq = file.parts[num_dir_parts - 3] + + for f in freq_opts: + if f in file_freq: + catalog_info.update({"frequency": f}) + break + if 'daily' in file_freq: + catalog_info.update({"frequency": "day"}) + elif 'monthly' in file_freq: + catalog_info.update({"frequency": "mon"}) + + # read metadata from the appropriate fieldlist + cesm_fieldlist = os.path.join(ROOT_DIR, 'data/fieldlist_CESM.jsonc') + try: + cesm_info = read_json(cesm_fieldlist, log=_log) + except IOError: + print("Unable to open file", cesm_fieldlist) + sys.exit(1) + + if hasattr(cesm_info['variables'], catalog_info['variable_id']): + var_metadata = cesm_info['variables'].get(catalog_info['variable_id']) + else: + raise KeyError(f'{catalog_info['variable_id']} not found in {cesm_fieldlist}') + if hasattr(var_metadata, 'standard_name'): + catalog_info.update({'standard_name': var_metadata.standard_name}) + if hasattr(var_metadata, 'long_name'): + catalog_info.update({'long_name': var_metadata.long_name}) + if hasattr(var_metadata, 'units'): + catalog_info.update({'units': var_metadata.units}) + + try: + # populate information from file metadata + parse_nc_file(file, catalog_info) except Exception as exc: print(exc) return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()} From 471f2cc24d05af0c467dc1c4266e9f981256177d Mon Sep 17 00:00:00 2001 From: wrongkindofdoctor <20195932+wrongkindofdoctor@users.noreply.github.com> Date: Thu, 26 Dec 2024 16:35:33 -0500 Subject: [PATCH 5/9] add debugging statements to catalog_builder make reference dict definition a function refine the catalog population procedure need to define time_range --- tools/catalog_builder/catalog_builder.py | 14 +++- tools/catalog_builder/parsers.py | 92 +++++++++++------------- 2 files changed, 55 insertions(+), 51 deletions(-) diff 
--git a/tools/catalog_builder/catalog_builder.py b/tools/catalog_builder/catalog_builder.py index 57b63b170..15443e66c 100644 --- a/tools/catalog_builder/catalog_builder.py +++ b/tools/catalog_builder/catalog_builder.py @@ -108,6 +108,7 @@ def cat_builder(self, data_paths: list, def call_build(self, file_parse_method=None): if file_parse_method is None: file_parse_method = self.file_parse_method + print(file_parse_method) # see https://github.com/ncar-xdev/ecgtools/blob/main/ecgtools/parsers/cmip6.py # for more parsing methods self.cb = self.cb.build(parsing_func=file_parse_method) @@ -169,7 +170,15 @@ class CatalogCESM(CatalogBase): """ def __init__(self): super().__init__() - self.file_parse_method = parse_cesm_timeseries + self.groupby_attrs = [ + 'activity_id', + 'institution_id', + 'experiment_id', + 'frequency', + 'member_id', + 'realm' + ] + self.file_parse_method = parsers.parse_cesm def load_config(config): @@ -188,6 +197,7 @@ def main(config: str): conf = load_config(config) for p in conf['data_root_dirs']: try: + print(p) os.path.isdir(p) except FileNotFoundError: print("{p} not found. Check data_root_dirs for typos.") @@ -225,7 +235,7 @@ def main(config: str): print("Time to build catalog:", timedelta(seconds=end_time - start_time)) # save the catalog - print('Saving catalog to', conf['output_dir'],'/',conf['output_filename'] + ".csv") + print('Saving catalog to',conf['output_dir'],'/',conf['output_filename'] + ".csv") cat_obj.call_save(output_dir=conf['output_dir'], output_filename=conf['output_filename'] diff --git a/tools/catalog_builder/parsers.py b/tools/catalog_builder/parsers.py index 36300c0e0..29fff2a79 100644 --- a/tools/catalog_builder/parsers.py +++ b/tools/catalog_builder/parsers.py @@ -11,7 +11,6 @@ from dask.array.core import chunks_from_arrays from ecgtools.builder import INVALID_ASSET, TRACEBACK -from src.cmip6 import variant_label_regex # Define a log object for debugging _log = logging.getLogger(__name__) @@ -136,30 +135,36 @@ def read_json(file_path: str, log=_log) -> dict: return parse_json(str_) -catalog_info = dict() -for k in catalog_keys: - catalog_info[k] = "" - - -def parse_nc_file(file_path: pathlib.Path, catalog_dict: dict) -> dict: +def parse_nc_file(file_path: pathlib.Path, catalog_info: dict) -> dict: # call to xr.open_dataset required by ecgtools.builder.Builder - with xr.open_dataset(file_path, chunks={}, decode_times=False) as ds: - variable_list = [var for var in ds if 'standard_name' in ds[var].attrs or 'long_name' in ds[var].attrs] + exclude_vars = ('time', 'time_bnds', 'date') + with xr.open_dataset(file_path, chunks={}, decode_times=False, engine="netcdf4") as ds: + variable_list = [var for var in ds if 'standard_name' in ds[var].attrs + or 'long_name' in ds[var].attrs and + var not in ds.coords and + var.lower() not in exclude_vars] for var in variable_list: + if len(ds[var].attrs['long_name']) == 0 and len(ds[var].attrs['long_name']) == 0: + print('Asset variable does not contain a standard_name or long_name attribute') + exit(1) for attr in catalog_keys: if attr in ds[var].attrs: catalog_info.update({attr: ds[var].attrs[attr]}) - if len(ds[var].attrs['long_name']) == 0 and len(ds[var].attrs['long_name']) == 0: - print('Asset variable does not contain a standard_name or long_name attribute') - exit(1) + if catalog_info['variable_id'] == "": + catalog_info.update({'variable_id': var}) return catalog_info +def setup_catalog() -> dict(): + catalog_info = dict() + for k in catalog_keys: + catalog_info[k] = "" + return catalog_info # 
custom parser for GFDL am5 data that uses fieldlist metadata and the DRS to populate # required catalog fields def parse_gfdl_am5_data(file_name: str): - + catalog_info = setup_catalog() file = pathlib.Path(file_name) # uncomment when ready to run num_dir_parts = len(file.parts) # file name index = num_parts 1 @@ -168,7 +173,6 @@ def parse_gfdl_am5_data(file_name: str): # split the file name into components based on # assume am5 file name format is {realm}.{time_range}.[variable_id}.nc split = stem.split('.') - num_file_parts = len(split) catalog_info.update({"realm": split[0]}) catalog_info.update({"time_range": split[1]}) catalog_info.update({"variable_id": split[2]}) @@ -191,6 +195,7 @@ def parse_gfdl_am5_data(file_name: str): gfdl_fieldlist = os.path.join(ROOT_DIR, 'data/fieldlist_CMIP.jsonc') else: gfdl_fieldlist = os.path.join(ROOT_DIR, 'data/fieldlist_GFDL.jsonc') + try: gfdl_info = read_json(gfdl_fieldlist, log=_log) except IOError: @@ -201,21 +206,26 @@ def parse_gfdl_am5_data(file_name: str): var_metadata = gfdl_info['variables'].get(catalog_info['variable_id']) else: raise KeyError(f'{catalog_info['variable_id']} not found in {gfdl_fieldlist}') - if hasattr(var_metadata, 'standard_name'): catalog_info.update({'standard_name': var_metadata.standard_name}) if hasattr(var_metadata, 'long_name'): catalog_info.update({'long_name': var_metadata.long_name}) if hasattr(var_metadata, 'units'): catalog_info.update({'units': var_metadata.units}) + try: + # populate information from file metadata + parse_nc_file(file, catalog_info) + except Exception as exc: + print(exc) + return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()} - return catalog_info # custom parser for pp data stored on GFDL archive filesystem # assumed DRS of [root_dir]/pp/[realm]/[analysis type (e.g, 'ts')]/[frequency]/[chunk size (e.g., 1yr, 5yr)] def parse_gfdl_pp_ts(file_name: str): + catalog_info = setup_catalog() # files = sorted(glob.glob(os.path.join(file_name,'*.nc'))) # debug comment when ready to run # file = pathlib.Path(files[0]) # debug comment when ready to run file = pathlib.Path(file_name) # uncomment when ready to run @@ -254,31 +264,27 @@ def parse_gfdl_pp_ts(file_name: str): # required catalog fields def parse_cesm(file_name: str): + catalog_info = setup_catalog() file = pathlib.Path(file_name) - + catalog_info.update({"path": file_name}) num_dir_parts = len(file.parts) # file name index = num_parts 1 # isolate file from rest of path stem = file.stem # split the file name into components based on # assume am5 file name format is {realm}.{time_range}.[variable_id}.nc split = stem.split('.') - num_file_parts = len(split) - catalog_info.update({"realm": split[0]}) - catalog_info.update({"time_range": split[1]}) - catalog_info.update({"variable_id": split[2]}) + for s in split: + if any(freq_opts) == s: + catalog_info.update({"frequency": s}) + break catalog_info.update({"activity_id": "CESM"}) catalog_info.update({"institution_id": "NCAR"}) - file_freq = file.parts[num_dir_parts - 3] - - for f in freq_opts: - if f in file_freq: - catalog_info.update({"frequency": f}) - break - if 'daily' in file_freq: - catalog_info.update({"frequency": "day"}) - elif 'monthly' in file_freq: - catalog_info.update({"frequency": "mon"}) - + try: + # populate information from file metadata + new_catalog = parse_nc_file(file, catalog_info) + except Exception as exc: + print(exc) + return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()} # read metadata from the appropriate fieldlist cesm_fieldlist = 
os.path.join(ROOT_DIR, 'data/fieldlist_CESM.jsonc') try: @@ -287,21 +293,9 @@ def parse_cesm(file_name: str): print("Unable to open file", cesm_fieldlist) sys.exit(1) - if hasattr(cesm_info['variables'], catalog_info['variable_id']): - var_metadata = cesm_info['variables'].get(catalog_info['variable_id']) - else: - raise KeyError(f'{catalog_info['variable_id']} not found in {cesm_fieldlist}') + if new_catalog['standard_name'] == "" and hasattr(cesm_info['variables'], new_catalog['variable_id']): + var_metadata = cesm_info['variables'].get(new_catalog['variable_id']) + if hasattr(var_metadata, 'standard_name') : + new_catalog.update({'standard_name': var_metadata.standard_name}) - if hasattr(var_metadata, 'standard_name'): - catalog_info.update({'standard_name': var_metadata.standard_name}) - if hasattr(var_metadata, 'long_name'): - catalog_info.update({'long_name': var_metadata.long_name}) - if hasattr(var_metadata, 'units'): - catalog_info.update({'units': var_metadata.units}) - - try: - # populate information from file metadata - parse_nc_file(file, catalog_info) - except Exception as exc: - print(exc) - return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()} + return new_catalog \ No newline at end of file From 0b0bd1b357e6fad3b0bde050065451e084bcb24b Mon Sep 17 00:00:00 2001 From: wrongkindofdoctor <20195932+wrongkindofdoctor@users.noreply.github.com> Date: Fri, 27 Dec 2024 11:56:20 -0500 Subject: [PATCH 6/9] fix cesm parser frequency def add procedure to generate time_range to parsers add procedure to fix units in catalog --- tools/catalog_builder/catalog_builder.py | 3 +- tools/catalog_builder/parsers.py | 48 ++++++++++++++++-------- 2 files changed, 33 insertions(+), 18 deletions(-) diff --git a/tools/catalog_builder/catalog_builder.py b/tools/catalog_builder/catalog_builder.py index 15443e66c..e8368cef1 100644 --- a/tools/catalog_builder/catalog_builder.py +++ b/tools/catalog_builder/catalog_builder.py @@ -100,9 +100,8 @@ def cat_builder(self, data_paths: list, depth=dir_depth, exclude_patterns=exclude_patterns, # Exclude the following directories include_patterns=include_patterns, - joblib_parallel_kwargs={'n_jobs': nthreads}, # Number of jobs to execute - + joblib_parallel_kwargs={'n_jobs': nthreads} # Number of jobs to execute - # should be equal to # threads you are using - extension='.nc' # extension of target file ) def call_build(self, file_parse_method=None): diff --git a/tools/catalog_builder/parsers.py b/tools/catalog_builder/parsers.py index 29fff2a79..addd09827 100644 --- a/tools/catalog_builder/parsers.py +++ b/tools/catalog_builder/parsers.py @@ -7,6 +7,8 @@ import json import logging import collections +import netCDF4 as nc +import cftime from dask.array.core import chunks_from_arrays from ecgtools.builder import INVALID_ASSET, TRACEBACK @@ -71,7 +73,7 @@ def strip_comments(str_: str, delimiter=None): # handle delimiters midway through a line: # If delimiter appears quoted in a string, don't want to treat it as # a comment. So for each occurrence of delimiter, count number of - # "s to its left and only truncate when that's an even number. + # 's to its left and only truncate when that's an even number. # First we get rid of -escaped single "s. 
replaced_line = lines[i].replace('\\\"', escaped_quote_placeholder) line_parts = replaced_line.split(delimiter) @@ -137,12 +139,25 @@ def read_json(file_path: str, log=_log) -> dict: def parse_nc_file(file_path: pathlib.Path, catalog_info: dict) -> dict: # call to xr.open_dataset required by ecgtools.builder.Builder - exclude_vars = ('time', 'time_bnds', 'date') + exclude_vars = ('time', 'time_bnds', 'date', 'hyam', 'hybm') with xr.open_dataset(file_path, chunks={}, decode_times=False, engine="netcdf4") as ds: variable_list = [var for var in ds if 'standard_name' in ds[var].attrs or 'long_name' in ds[var].attrs and var not in ds.coords and - var.lower() not in exclude_vars] + var not in exclude_vars] + # append time range + if 'time' in ds.coords: + time_var = ds.coords['time'] + calendar = None + if 'calendar' in time_var.attrs: + calendar = time_var.attrs['calendar'] + if calendar == 'no_leap': + calendar = 'noleap' + start_time = cftime.num2date(time_var.values[0], time_var.attrs['units'], calendar=calendar) + end_time = cftime.num2date(time_var.values[-1], time_var.attrs['units']) + time_range = start_time.strftime("%Y%m%d:%H%M%S") + '-' + end_time.strftime("%Y%m%d:%H%M%S") + catalog_info.update({'time_range': time_range}) + for var in variable_list: if len(ds[var].attrs['long_name']) == 0 and len(ds[var].attrs['long_name']) == 0: print('Asset variable does not contain a standard_name or long_name attribute') @@ -155,7 +170,7 @@ def parse_nc_file(file_path: pathlib.Path, catalog_info: dict) -> dict: return catalog_info -def setup_catalog() -> dict(): +def setup_catalog() -> dict: catalog_info = dict() for k in catalog_keys: catalog_info[k] = "" @@ -270,13 +285,7 @@ def parse_cesm(file_name: str): num_dir_parts = len(file.parts) # file name index = num_parts 1 # isolate file from rest of path stem = file.stem - # split the file name into components based on - # assume am5 file name format is {realm}.{time_range}.[variable_id}.nc - split = stem.split('.') - for s in split: - if any(freq_opts) == s: - catalog_info.update({"frequency": s}) - break + # split the file name into components catalog_info.update({"activity_id": "CESM"}) catalog_info.update({"institution_id": "NCAR"}) try: @@ -293,9 +302,16 @@ def parse_cesm(file_name: str): print("Unable to open file", cesm_fieldlist) sys.exit(1) - if new_catalog['standard_name'] == "" and hasattr(cesm_info['variables'], new_catalog['variable_id']): - var_metadata = cesm_info['variables'].get(new_catalog['variable_id']) - if hasattr(var_metadata, 'standard_name') : - new_catalog.update({'standard_name': var_metadata.standard_name}) - + units = new_catalog.get('units') + new_catalog.update({'units': units.replace('/s',' s-1').replace('/m2', ' m-2')}) + var_metadata = cesm_info['variables'].get(new_catalog['variable_id'], None) + if var_metadata is not None: + if var_metadata.get('standard_name', None) is not None: + new_catalog.update({'standard_name': var_metadata['standard_name']}) + if var_metadata.get('realm', None) is not None : + new_catalog.update({'realm': var_metadata['realm']}) + for p in file.parts: + if p in freq_opts: + new_catalog.update({"frequency": p}) + break return new_catalog \ No newline at end of file From 257b2a56917fdaf60718c4e20342863619be4b90 Mon Sep 17 00:00:00 2001 From: wrongkindofdoctor <20195932+wrongkindofdoctor@users.noreply.github.com> Date: Fri, 27 Dec 2024 11:56:49 -0500 Subject: [PATCH 7/9] add entries to CESM and GFDL fieldlists --- data/fieldlist_CESM.jsonc | 92 +++++++++++++++++++++++++++++++++++++-- 
data/fieldlist_GFDL.jsonc | 4 +- 2 files changed, 90 insertions(+), 6 deletions(-) diff --git a/data/fieldlist_CESM.jsonc b/data/fieldlist_CESM.jsonc index 7e500ee3c..f5d426581 100644 --- a/data/fieldlist_CESM.jsonc +++ b/data/fieldlist_CESM.jsonc @@ -37,12 +37,26 @@ }, "lev": { "standard_name": "atmosphere_hybrid_sigma_pressure_coordinate", - "units": "level", // equivalent to '1' + "units": "level", + // equivalent to '1' "positive": "down", "axis": "Z" }, + "hyam": { + "standard_name": "hybrid A coefficient at layer midpoints", + "long_name": "hybrid A coefficient at layer midpoints", + "units": "1", + "axis": "Z" + + }, + "hybm": { + "standard_name": "hybrid B coefficient at layer midpoints", + "long_name": "hybrid B coefficient at layer midpoints", + "units": "1", + "axis": "Z" + }, "z_t": { - "standard_name": "depth from surface to midpoint of layer", + "standard_name": "depth_from_surface_to_midpoint_of_layer", "units": "centimeters", "positive": "down", "axis": "Z" @@ -61,6 +75,27 @@ "scalar_coord_templates": {"plev": "U{value}"}, "ndim": 4 }, + "U250": { + "standard_name": "eastward_wind", + "long_name": "Zonal wind at 250 mbar pressure surface", + "realm": "atmos", + "units": "m s-1", + "ndim": 3 + }, + "U200": { + "standard_name": "eastward_wind", + "long_name": "Zonal wind at 200 mbar pressure surface", + "realm": "atmos", + "units": "m s-1", + "ndim": 3 + }, + "U850": { + "standard_name": "eastward_wind", + "long_name": "Zonal wind at 850 mbar pressure surface", + "realm": "atmos", + "units": "m s-1", + "ndim": 3 + }, "V": { "standard_name": "northward_wind", "realm":"atmos", @@ -68,6 +103,27 @@ "scalar_coord_templates": {"plev": "V{value}"}, "ndim": 4 }, + "V250": { + "standard_name": "northward_wind", + "long_name": "Meridional wind at 250 mbar pressure surface", + "realm":"atmos", + "units": "m s-1", + "ndim": 3 + }, + "V200": { + "standard_name": "northward_wind", + "long_name": "Meridional wind at 200 mbar pressure surface", + "realm":"atmos", + "units": "m s-1", + "ndim": 3 + }, + "V850": { + "standard_name": "northward_wind", + "long_name": "Meridional wind at 850 mbar pressure surface", + "realm":"atmos", + "units": "m s-1", + "ndim": 3 + }, "Z3": { "standard_name": "geopotential_height", "units": "m", @@ -78,10 +134,23 @@ }, "Z500": { "standard_name": "geopotential_height", - "long_name": "geopotential height at 500 hPa", + "long_name": "geopotential height at 500 mbar pressure surface", + "realm": "atmos", + "units": "m", + "ndim": 3 + }, + "Z850": { + "standard_name": "geopotential_height", + "long_name": "geopotential height at 850 mbar pressure surface", + "realm": "atmos", + "units": "m", + "ndim": 3 + }, + "Z250": { + "standard_name": "geopotential_height", + "long_name": "geopotential height at 250 mbar pressure surface", "realm": "atmos", "units": "m", - // note: 4d name is 'Z3' but Z500 = height at 500 mb, etc. 
"ndim": 3 }, "Q": { @@ -96,6 +165,13 @@ "units": "Pa s-1", "scalar_coord_templates": {"plev": "OMEGA{value}"}, "ndim": 4 + }, + "OMEGA500": { + "standard_name": "lagrangian_tendency_of_air_pressure", + "long_name": "Vertical velocity at 500 mbar pressure surface", + "realm": "atmos", + "units": "Pa s-1", + "ndim": 3 }, "TS": { "standard_name": "surface_temperature", @@ -349,6 +425,13 @@ "realm": "atmos", "units": "K", "ndim": 4 + }, + "T250": { + "standard_name": "air_temperature", + "long_name": "air temperature at 250 mbar pressure surface", + "realm": "atmos", + "units": "K", + "ndim": 3 }, // prw: Column Water Vapor (precipitable water vapor), units = mm (or kg/m^2) "prw": { @@ -358,6 +441,7 @@ "units": "kg m-2", "ndim": 3 } + // Variables for SM_ET_coupling module // "mrsos": { // "standard_name": "mass_content_of_water_in_soil_layer", diff --git a/data/fieldlist_GFDL.jsonc b/data/fieldlist_GFDL.jsonc index 16c225a38..82ef5dfb0 100644 --- a/data/fieldlist_GFDL.jsonc +++ b/data/fieldlist_GFDL.jsonc @@ -625,14 +625,14 @@ "ndim": 3 }, "zg500": { - "standard_name": "", + "standard_name": "geopotential_height", "long_name": "Geopotential Height at 500 hPa", "realm": "atmos", "units": "m", "ndim": 3 }, "zg": { - "standard_name": "", + "standard_name": "geopotential_heihgt", "long_name": "Geopotential Height", "realm": "atmos", "units": "m", From 5e32919051ea510c7089366076c2fdf4ee94d218 Mon Sep 17 00:00:00 2001 From: wrongkindofdoctor <20195932+wrongkindofdoctor@users.noreply.github.com> Date: Fri, 27 Dec 2024 15:32:53 -0500 Subject: [PATCH 8/9] clean up cesm parser include file name parts in cesm parser frequency search --- tools/catalog_builder/parsers.py | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/tools/catalog_builder/parsers.py b/tools/catalog_builder/parsers.py index addd09827..258257a99 100644 --- a/tools/catalog_builder/parsers.py +++ b/tools/catalog_builder/parsers.py @@ -7,10 +7,7 @@ import json import logging import collections -import netCDF4 as nc import cftime - -from dask.array.core import chunks_from_arrays from ecgtools.builder import INVALID_ASSET, TRACEBACK @@ -238,7 +235,6 @@ def parse_gfdl_am5_data(file_name: str): # custom parser for pp data stored on GFDL archive filesystem # assumed DRS of [root_dir]/pp/[realm]/[analysis type (e.g, 'ts')]/[frequency]/[chunk size (e.g., 1yr, 5yr)] - def parse_gfdl_pp_ts(file_name: str): catalog_info = setup_catalog() # files = sorted(glob.glob(os.path.join(file_name,'*.nc'))) # debug comment when ready to run @@ -275,19 +271,21 @@ def parse_gfdl_pp_ts(file_name: str): print(exc) return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()} -# custom parser for GFDL am5 data that uses fieldlist metadata and the DRS to populate -# required catalog fields - +# custom parser for CESM data that uses fieldlist metadata and the DRS to populate +# required catalog fields. 
Bas def parse_cesm(file_name: str): catalog_info = setup_catalog() - file = pathlib.Path(file_name) catalog_info.update({"path": file_name}) - num_dir_parts = len(file.parts) # file name index = num_parts 1 - # isolate file from rest of path - stem = file.stem - # split the file name into components catalog_info.update({"activity_id": "CESM"}) catalog_info.update({"institution_id": "NCAR"}) + # split the file path and name into parts + file = pathlib.Path(file_name) + stem_parts = file.stem.split('.') + # search file and path for output frequency + for p in list(file.parts) + stem_parts: + if p in freq_opts: + catalog_info.update({"frequency": p}) + break try: # populate information from file metadata new_catalog = parse_nc_file(file, catalog_info) @@ -310,8 +308,5 @@ def parse_cesm(file_name: str): new_catalog.update({'standard_name': var_metadata['standard_name']}) if var_metadata.get('realm', None) is not None : new_catalog.update({'realm': var_metadata['realm']}) - for p in file.parts: - if p in freq_opts: - new_catalog.update({"frequency": p}) - break - return new_catalog \ No newline at end of file + + return new_catalog From 2c58f478b4dd2c074aabfcf86a239fe5f037a3c6 Mon Sep 17 00:00:00 2001 From: wrongkindofdoctor <20195932+wrongkindofdoctor@users.noreply.github.com> Date: Fri, 27 Dec 2024 15:46:34 -0500 Subject: [PATCH 9/9] clean up print statements --- tools/catalog_builder/catalog_builder.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tools/catalog_builder/catalog_builder.py b/tools/catalog_builder/catalog_builder.py index e8368cef1..0a9d037b6 100644 --- a/tools/catalog_builder/catalog_builder.py +++ b/tools/catalog_builder/catalog_builder.py @@ -107,7 +107,6 @@ def cat_builder(self, data_paths: list, def call_build(self, file_parse_method=None): if file_parse_method is None: file_parse_method = self.file_parse_method - print(file_parse_method) # see https://github.com/ncar-xdev/ecgtools/blob/main/ecgtools/parsers/cmip6.py # for more parsing methods self.cb = self.cb.build(parsing_func=file_parse_method) @@ -196,7 +195,6 @@ def main(config: str): conf = load_config(config) for p in conf['data_root_dirs']: try: - print(p) os.path.isdir(p) except FileNotFoundError: print("{p} not found. Check data_root_dirs for typos.")