diff --git a/src/pypromice/qc/github_data_issues.py b/src/pypromice/qc/github_data_issues.py index 8d963727..26fbd218 100644 --- a/src/pypromice/qc/github_data_issues.py +++ b/src/pypromice/qc/github_data_issues.py @@ -36,7 +36,7 @@ def flagNAN(ds_in, ds : xr.Dataset Level 0 data with flagged data ''' - ds = ds_in.copy() + ds = ds_in.copy(deep=True) df = None df = _getDF(flag_url + ds.attrs["station_id"] + ".csv", @@ -71,7 +71,7 @@ def flagNAN(ds_in, for v in varlist: if v in list(ds.keys()): - logger.info(f'---> flagging {t0} {t1} {v}') + logger.info(f'---> flagging {v} between {t0} and {t1}') ds[v] = ds[v].where((ds['time'] < t0) | (ds['time'] > t1)) else: logger.info(f'---> could not flag {v} not in dataset') @@ -99,7 +99,7 @@ def adjustTime(ds, ds : xr.Dataset Level 0 data with flagged data ''' - ds_out = ds.copy() + ds_out = ds.copy(deep=True) adj_info=None adj_info = _getDF(adj_url + ds.attrs["station_id"] + ".csv", @@ -165,7 +165,7 @@ def adjustData(ds, ds : xr.Dataset Level 0 data with flagged data ''' - ds_out = ds.copy() + ds_out = ds.copy(deep=True) adj_info=None adj_info = _getDF(adj_url + ds.attrs["station_id"] + ".csv", os.path.join(adj_dir, ds.attrs["station_id"] + ".csv"), @@ -176,13 +176,11 @@ def adjustData(ds, # removing potential time shifts from the adjustment list adj_info = adj_info.loc[adj_info.adjust_function != "time_shift", :] - # if t1 is left empty, then adjustment is applied until the end of the file - adj_info.loc[adj_info.t0.isnull(), "t0"] = ds_out.time.values[0] - adj_info.loc[adj_info.t1.isnull(), "t1"] = ds_out.time.values[-1] - # making all timestamps timezone naive (compatibility with xarray) - adj_info.t0 = pd.to_datetime(adj_info.t0).dt.tz_localize(None) - adj_info.t1 = pd.to_datetime(adj_info.t1).dt.tz_localize(None) - + # making sure that t0 and t1 columns are object dtype then replaceing nan with None + adj_info[['t0','t1']] = adj_info[['t0','t1']].astype(object) + adj_info.loc[adj_info.t1.isnull()|(adj_info.t1==''), "t1"] = None + adj_info.loc[adj_info.t0.isnull()|(adj_info.t0==''), "t0"] = None + # if "*" is in the variable name then we interpret it as regex selec = adj_info['variable'].str.contains('\*') & (adj_info['variable'] != "*") for ind in adj_info.loc[selec, :].index: @@ -217,88 +215,92 @@ def adjustData(ds, adj_info.loc[var].adjust_function, adj_info.loc[var].adjust_value, ): - if (t0 > pd.to_datetime(ds_out.time.values[-1])) | (t1 < pd.to_datetime(ds_out.time.values[0])): + # making all timestamps timezone naive (compatibility with xarray) + if isinstance(t0, str): + t0 = pd.to_datetime(t0, utc=True).tz_localize(None) + if isinstance(t1, str): + t1 = pd.to_datetime(t1, utc=True).tz_localize(None) + + index_slice = dict(time=slice(t0, t1)) + + if len(ds_out[var].loc[index_slice].time.time) == 0: + logger.info("Time range does not intersect with dataset") continue - logger.info(f'---> {t0} {t1} {var} {func} {val}') + + logger.info(f'---> adjusting {var} between {t0} and {t1} ({func} {val})') + if func == "add": - ds_out[var].loc[dict(time=slice(t0, t1))] = ds_out[var].loc[dict(time=slice(t0, t1))].values + val + ds_out[var].loc[index_slice] = ds_out[var].loc[index_slice].values + val # flagging adjusted values # if var + "_adj_flag" not in ds_out.columns: # ds_out[var + "_adj_flag"] = 0 - # msk = ds_out[var].loc[dict(time=slice(t0, t1))])].notnull() - # ind = ds_out[var].loc[dict(time=slice(t0, t1))])].loc[msk].time + # msk = ds_out[var].loc[index_slice])].notnull() + # ind = ds_out[var].loc[index_slice])].loc[msk].time # ds_out.loc[ind, var + "_adj_flag"] = 1 if func == "multiply": - ds_out[var].loc[dict(time=slice(t0, t1))] = ds_out[var].loc[dict(time=slice(t0, t1))].values * val + ds_out[var].loc[index_slice] = ds_out[var].loc[index_slice].values * val if "DW" in var: - ds_out[var].loc[dict(time=slice(t0, t1))] = ds_out[var].loc[dict(time=slice(t0, t1))] % 360 + ds_out[var].loc[index_slice] = ds_out[var].loc[index_slice] % 360 # flagging adjusted values # if var + "_adj_flag" not in ds_out.columns: # ds_out[var + "_adj_flag"] = 0 - # msk = ds_out[var].loc[dict(time=slice(t0, t1))].notnull() - # ind = ds_out[var].loc[dict(time=slice(t0, t1))].loc[msk].time + # msk = ds_out[var].loc[index_slice].notnull() + # ind = ds_out[var].loc[index_slice].loc[msk].time # ds_out.loc[ind, var + "_adj_flag"] = 1 if func == "min_filter": - tmp = ds_out[var].loc[dict(time=slice(t0, t1))].values + tmp = ds_out[var].loc[index_slice].values tmp[tmp < val] = np.nan if func == "max_filter": - tmp = ds_out[var].loc[dict(time=slice(t0, t1))].values + tmp = ds_out[var].loc[index_slice].values tmp[tmp > val] = np.nan - ds_out[var].loc[dict(time=slice(t0, t1))] = tmp + ds_out[var].loc[index_slice] = tmp if func == "upper_perc_filter": - tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy() - df_w = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").quantile(1 - val / 100) - df_w = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").var() + tmp = ds_out[var].loc[index_slice].copy() + df_w = ds_out[var].loc[index_slice].resample(time="14D").quantile(1 - val / 100) + df_w = ds_out[var].loc[index_slice].resample(time="14D").var() for m_start, m_end in zip(df_w.time[:-2], df_w.time[1:]): msk = (tmp.time >= m_start) & (tmp.time < m_end) values_month = tmp.loc[msk].values values_month[values_month < df_w.loc[m_start]] = np.nan tmp.loc[msk] = values_month - ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values + ds_out[var].loc[index_slice] = tmp.values if func == "biweekly_upper_range_filter": - tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy() - df_max = ds_out[var].loc[dict(time=slice(t0, t1))].resample("14D").max() - for m_start, m_end in zip(df_max.time[:-2], df_max.time[1:]): - msk = (tmp.time >= m_start) & (tmp.time < m_end) - lim = df_max.loc[m_start] - val - values_month = tmp.loc[msk].values - values_month[values_month < lim] = np.nan - tmp.loc[msk] = values_month - # remaining samples following outside of the last 2 weeks window - msk = tmp.time >= m_end - lim = df_max.loc[m_start] - val - values_month = tmp.loc[msk].values - values_month[values_month < lim] = np.nan - tmp.loc[msk] = values_month - # updating original pandas - ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values + df_max = ( + ds_out[var].loc[index_slice].copy(deep=True) + .resample(time="14D",offset='7D').max() + .sel(time=ds_out[var].loc[index_slice].time.values, method='ffill') + ) + df_max['time'] = ds_out[var].loc[index_slice].time + # updating original pandas + ds_out[var].loc[index_slice] = ds_out[var].loc[index_slice].where(ds_out[var].loc[index_slice] > df_max-val) + if func == "hampel_filter": - tmp = ds_out[var].loc[dict(time=slice(t0, t1))] + tmp = ds_out[var].loc[index_slice] tmp = _hampel(tmp, k=7 * 24, t0=val) - ds_out[var].loc[dict(time=slice(t0, t1))] = tmp.values + ds_out[var].loc[index_slice] = tmp.values if func == "grad_filter": - tmp = ds_out[var].loc[dict(time=slice(t0, t1))].copy() - msk = ds_out[var].loc[dict(time=slice(t0, t1))].copy().diff() + tmp = ds_out[var].loc[index_slice].copy() + msk = ds_out[var].loc[index_slice].copy().diff() tmp[np.roll(msk.abs() > val, -1)] = np.nan - ds_out[var].loc[dict(time=slice(t0, t1))] = tmp + ds_out[var].loc[index_slice] = tmp if "swap_with_" in func: var2 = func[10:] - val_var = ds_out[var].loc[dict(time=slice(t0, t1))].values.copy() - val_var2 = ds_out[var2].loc[dict(time=slice(t0, t1))].values.copy() - ds_out[var2].loc[dict(time=slice(t0, t1))] = val_var - ds_out[var].loc[dict(time=slice(t0, t1))] = val_var2 + val_var = ds_out[var].loc[index_slice].values.copy() + val_var2 = ds_out[var2].loc[index_slice].values.copy() + ds_out[var2].loc[index_slice] = val_var + ds_out[var].loc[index_slice] = val_var2 if func == "rotate": - ds_out[var].loc[dict(time=slice(t0, t1))] = (ds_out[var].loc[dict(time=slice(t0, t1))].values + val) % 360 + ds_out[var].loc[index_slice] = (ds_out[var].loc[index_slice].values + val) % 360 return ds_out