diff --git a/eurocropsml/acquisition/clipper.py b/eurocropsml/acquisition/clipper.py index 50129f5..aa6e8ff 100644 --- a/eurocropsml/acquisition/clipper.py +++ b/eurocropsml/acquisition/clipper.py @@ -84,6 +84,7 @@ def _get_arguments( output_dir: Directory to get the list of .SAFE files from and to store the argument list. local_dir: Local directory where the .SAFE files were copied to. + month: Month that is being processed. Returns: - List of tuples of arguments for clipping raster tiles. @@ -95,7 +96,7 @@ def _get_arguments( parcel_id_name: str = cast(str, config.parcel_id_name) bands: list[str] = cast(list[str], config.bands) - clipping_path = output_dir.joinpath("clipper").joinpath(f'{month}') + clipping_path = output_dir.joinpath("clipper").joinpath(f"{month}") clipping_path.mkdir(exist_ok=True, parents=True) if clipping_path.joinpath("args.pkg").exists(): @@ -109,8 +110,8 @@ def _get_arguments( full_images_paths: Path = output_dir.joinpath("collector", "full_parcel_list.pkg") full_images = pd.read_pickle(full_images_paths) - full_images['completionDate'] = pd.to_datetime(full_images['completionDate']) - full_images = full_images[(full_images['completionDate'].dt.month == month)] + full_images["completionDate"] = pd.to_datetime(full_images["completionDate"]) + full_images = full_images[(full_images["completionDate"].dt.month == month)] if local_dir is not None: full_images["productIdentifier"] = str(local_dir) + full_images[ @@ -121,7 +122,10 @@ def _get_arguments( band_images: pd.DataFrame = pd.read_pickle(band_image_path) band_images = band_images[ - (band_images['productIdentifier'].str.extract(r'/\d{4}/(\d{2})/')[0].astype(int) == month) + ( + band_images["productIdentifier"].str.extract(r"/\d{4}/(\d{2})/")[0].astype(int) + == month + ) ] max_workers = min(mp_orig.cpu_count(), max(1, min(len(band_images), workers))) @@ -237,7 +241,6 @@ def clipping( for month in range(config.months[0], config.months[1] + 1): - args_month, polygon_df, clipping_path = _get_arguments( config=config, workers=workers, @@ -277,7 +280,9 @@ def clipping( logger.info(f"Starting parallel raster clipping for {month}...") te = tqdm(total=len(args_month) - processed, desc=f"Clipping raster tiles for {month}.") while processed < len(args_month): - chunk_args: list[tuple[pd.DataFrame, list]] = args_month[processed : processed + chunk_size] + chunk_args: list[tuple[pd.DataFrame, list]] = args_month[ + processed : processed + chunk_size + ] results: list[pd.DataFrame] = [] with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor: @@ -293,9 +298,11 @@ def clipping( # Process and save results for result in results: if result is not None and not result.empty: - result.columns = [pd.to_datetime(result.columns[0]).strftime('%Y-%m-%d')] - df_final_month.columns = [pd.to_datetime(col).strftime('%Y-%m-%d') for col in - df_final_month.columns] + result.columns = [pd.to_datetime(result.columns[0]).strftime("%Y-%m-%d")] + df_final_month.columns = [ + pd.to_datetime(col).strftime("%Y-%m-%d") + for col in df_final_month.columns + ] df_final_month = df_final_month.fillna(result) te.update(n=1) @@ -319,18 +326,3 @@ def clipping( cast(str, config.parcel_id_name), new_data, ) - -def main(): - config = CollectorConfig(country="Austria", year=2021) - vector_data_dir = Path('/big_volume/data_s1/meta_data/vector_data/') - config.post_init(vector_data_dir) - output_dir = Path('/big_volume/data_s1/output_data/Austria/S1') - shape_dir = Path('/big_volume/data_s1/meta_data/vector_data/AT_2021_clean/') - workers = 16 - chunk_size = 20 - multiplier = 15 - - clipping(config, output_dir, shape_dir, workers, chunk_size, multiplier) - -if __name__ == '__main__': - main() diff --git a/eurocropsml/acquisition/region.py b/eurocropsml/acquisition/region.py index c42ecd4..625f0bd 100644 --- a/eurocropsml/acquisition/region.py +++ b/eurocropsml/acquisition/region.py @@ -95,7 +95,8 @@ def add_nuts_regions( for nuts_level in [1, 2, 3]: nuts_filtered = nuts_df[nuts_df["LEVL_CODE"] == nuts_level] - # we use intersect instead of within since some parcels are at the border of two regions + # we use intersect instead of within + # since some parcels are at the border of two regions shapefile = gpd.sjoin(shapefile, nuts_filtered, how="left", predicate="intersects") no_intersections = shapefile[shapefile.index_right.isna()] no_intersections = no_intersections.to_crs("EPSG:3857") @@ -123,10 +124,13 @@ def add_nuts_regions( cols_shapefile = cols_shapefile + [f"nuts{nuts_level+1}"] # add nuts region to final reflectance dataframe - full_df: pd.DataFrame = pd.read_parquet(output_dir.joinpath("clipper", f"{month}", "clipped", "clipped.parquet")) + full_df: pd.DataFrame = pd.read_parquet( + output_dir.joinpath("clipper", f"{month}", "clipped", "clipped.parquet") + ) - full_df.columns = [full_df.columns[0]] + [pd.to_datetime(col).strftime('%Y-%m-%d') for col in - full_df.columns[1:]] + full_df.columns = [full_df.columns[0]] + [ + pd.to_datetime(col).strftime("%Y-%m-%d") for col in full_df.columns[1:] + ] shapefile[parcel_id_name] = shapefile[parcel_id_name].astype(int) joined_final = pd.merge(full_df, shapefile, on=parcel_id_name, how="left") @@ -162,11 +166,15 @@ def add_nuts_regions( geom_dir.mkdir(exist_ok=True, parents=True) classes_df.to_parquet( - label_dir.joinpath(f"{config.ec_filename}_{config.year}_labels.parquet"), index=False + label_dir.joinpath(f"{config.ec_filename}_{config.year}_labels.parquet"), + index=False, ) joined_final.to_parquet( - final_output_dir.joinpath(f"{month}", f"{config.ec_filename}_{config.year}.parquet"), index=False + final_output_dir.joinpath( + f"{month}", f"{config.ec_filename}_{config.year}.parquet" + ), + index=False, ) geometry_df.to_file( diff --git a/eurocropsml/acquisition/utils.py b/eurocropsml/acquisition/utils.py index 8439a06..0f90a69 100644 --- a/eurocropsml/acquisition/utils.py +++ b/eurocropsml/acquisition/utils.py @@ -109,9 +109,7 @@ def mask_polygon_raster( inv_transform = ~transform # Invert the affine transformation matrix polygon_df["geometry"] = polygon_df["geometry"].apply( - lambda poly, i_trans=inv_transform: _transform_polygon( - poly, i_trans - ) + lambda poly, i_trans=inv_transform: _transform_polygon(poly, i_trans) ) # clipping geometry out of raster tile and saving in dictionary polygon_df.apply( @@ -161,8 +159,9 @@ def _merge_clipper( logger.info("Starting merging of DataFrames...") df_list: list = [file for file in clipped_output_dir.iterdir() if "Final_" in file.name] - full_df.columns = [full_df.columns[0]] + [pd.to_datetime(col).strftime('%Y-%m-%d') for col in - full_df.columns[1:]] + full_df.columns = [full_df.columns[0]] + [ + pd.to_datetime(col).strftime("%Y-%m-%d") for col in full_df.columns[1:] + ] # setting parcel_id column to index full_df.set_index(parcel_id_name, inplace=True) diff --git a/eurocropsml/dataset/preprocess.py b/eurocropsml/dataset/preprocess.py index e968f3f..704a70f 100644 --- a/eurocropsml/dataset/preprocess.py +++ b/eurocropsml/dataset/preprocess.py @@ -1,6 +1,7 @@ """Preprocessing utilities for the EuroCrops dataset.""" import logging +import os import sys from functools import cache, partial from multiprocessing import Pool @@ -13,7 +14,6 @@ import requests import typer from tqdm import tqdm -import os from eurocropsml.acquisition.config import S1_BANDS, S2_BANDS from eurocropsml.dataset.config import EuroCropsDatasetPreprocessConfig @@ -276,7 +276,9 @@ def preprocess( # filter nan-values country_file = country_file[~country_file[f"nuts{nuts_level}"].isna()] points = _get_latlons(month_data_dir.joinpath("geometries"), file_path.stem) - labels = _get_labels(month_data_dir.joinpath("labels"), file_path.stem, preprocess_config) + labels = _get_labels( + month_data_dir.joinpath("labels"), file_path.stem, preprocess_config + ) # country_file.set_index("parcel_id", inplace=True) regions = country_file[f"nuts{nuts_level}"].unique() @@ -297,7 +299,9 @@ def preprocess( # replacing single empty timesteps region_data = region_data.apply( - lambda x, b=len(bands): x.map(lambda y: np.array([0] * b) if y is None else y) + lambda x, b=len(bands): x.map( + lambda y: np.array([0] * b) if y is None else y + ) ) with Pool(processes=num_workers) as p: func = partial(