Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

29 process clipping of sentinel tiles on monthly basis #52

Open
wants to merge 32 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
0497e6d
added processing on monthly basis and adjusted the CRS transformation
katyagikalo Oct 23, 2024
337c0aa
fixing month range
katyagikalo Oct 23, 2024
0b8994a
adjusting the files with make
katyagikalo Oct 28, 2024
9d9c555
minor changes
katyagikalo Nov 2, 2024
0da07f9
Merge branch 'main' into 29-Process-clipping-of-Sentinel-tiles-on-mon…
katyagikalo Nov 2, 2024
d44ffa9
Merge branch 'main' into 29-Process-clipping-of-Sentinel-tiles-on-mon…
jmaces Nov 11, 2024
39dbcc6
update example identifier string and fix typo
jmaces Nov 11, 2024
cc3392a
change the default bands
katyagikalo Nov 11, 2024
23ecd69
Merging with main
jsreuss Nov 18, 2024
0fc9bab
Fixing parenthesis
jsreuss Nov 18, 2024
0ab954c
Removed debugging code
jsreuss Nov 19, 2024
61d4d5b
Updated logging message
jsreuss Nov 19, 2024
b220ba5
Updated clipping directory
jsreuss Nov 19, 2024
d9b8897
Fixed crs string for S1
jsreuss Nov 19, 2024
9c0462e
Removed unnecessary try-catch-block
jsreuss Nov 20, 2024
6f96fdd
Updated final raw_data dir
jsreuss Nov 21, 2024
8ae251e
Updated logging message for NUTS acquisition
jsreuss Nov 21, 2024
e0eb445
Closing progress bar
jsreuss Nov 21, 2024
f10baa1
Raising FileNotFoundError
jsreuss Nov 21, 2024
d588897
Adding year git config
jsreuss Nov 21, 2024
1926dfe
Adding year to directory structure
jsreuss Nov 21, 2024
4a2492a
Merging monthly npz files
jsreuss Nov 22, 2024
a237265
Using different NUTS crs if not available
jsreuss Nov 22, 2024
72de3b8
split functions
jsreuss Nov 22, 2024
26b71db
Removing datetime, specifying exception
jsreuss Nov 22, 2024
1c749e8
Removed conversion back to string
jsreuss Nov 22, 2024
8b1edfe
Removed debugging code
jsreuss Nov 22, 2024
4801f1c
Removed debugging code
jsreuss Nov 22, 2024
be43878
Checking identifier within filename instead of path
jsreuss Nov 22, 2024
51de8d3
Updating thermal noise interpolation
jsreuss Dec 4, 2024
6330b84
Added FileNotFoundError
jsreuss Dec 4, 2024
14c3e6f
Removed changes to noise removal (cf. new PR)
jsreuss Jan 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ Changes from previous releases are listed below.
- Add documentation of acquiring and processing Sentinel-1 data _(see #38)_
- Add preprocessing of Sentinel-1 _(see #31)_
- Renaming and moving of Sentinel-2 bands _(see #42)_
- Process clipping of Sentinel tiles on monthly basis _(see #29)_
- Adjust the shapefile CRS transformation for Sentinel-1 _(see #49)_
- Adjusting split generation _(see #45)_
- Fixing padding mask _(see #50)_
- Fasten up padding to 366 days _(see #54)_
Expand Down
3 changes: 2 additions & 1 deletion eurocropsml/acquisition/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def build_dataset(
config.country_config.post_init(vector_data_dir)
ct_config = config.country_config

final_output_dir = config.raw_data_dir.joinpath(ct_config.satellite)
final_output_dir = config.raw_data_dir.joinpath(ct_config.satellite, str(ct_config.year))
output_dir = config.output_dir
local_dir = config.local_dir

Expand Down Expand Up @@ -81,6 +81,7 @@ def build_dataset(
config.chunk_size,
config.multiplier,
local_dir,
config.rebuild,
)

logger.info("Finished step 3: Clipping parcels from raster tiles.")
Expand Down
256 changes: 141 additions & 115 deletions eurocropsml/acquisition/clipping/clipper.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Clipping parcel polygons from raster files."""

import calendar
import concurrent.futures
import gc
import logging
Expand All @@ -12,7 +13,6 @@
import geopandas as gpd
import pandas as pd
import pyogrio
import typer
from tqdm import tqdm

from eurocropsml.acquisition.clipping.utils import _merge_clipper, mask_polygon_raster
Expand All @@ -26,52 +26,55 @@ def _merge_dataframe(
clipped_output_dir: Path,
output_dir: Path,
parcel_id_name: str,
month: str,
new_data: bool,
rebuild: bool,
) -> None:
"""Merging all clipped parcels into one final DataFrame.

full_df: Final dataframe containing all parcels without clipped values.
clipped_output_dir: Directory where the individual clipped parcels are stored.
output_dir: Directory to save the final output.
parcel_id_name: Country-specific parcel ID name.
month: Month that is being processed.
new_data: Whether new data has been processed.
rebuild: Whether to re-build the clipped parquet files for each month.
This will overwrite the existing ones.

"""
process: bool = True
if not output_dir.joinpath("clipped.parquet").exists():
_merge_clipper(
full_df,
clipped_output_dir,
output_dir,
parcel_id_name,
)
else:
if new_data:
reprocess = typer.confirm(
f"{output_dir.joinpath('clipped.parquet')} already exists but new data has been "
"processed. Do you want to reprocess the file?"
process = True
elif not new_data:
if not rebuild:
process = False
logger.info(
f"{output_dir.joinpath('clipped.parquet')} already exists, no new data has "
"ben processed and rebuild set to False. Nothing to be done."
)
else:
reprocess = typer.confirm(
f"{output_dir.joinpath('clipped.parquet')} already exists and no new data has been"
" processed. Do you want to reprocess the file?"
)

if reprocess:
logger.info("File is being deleted and reprocessed.")
output_dir.joinpath("clipped.parquet").unlink()
_merge_clipper(
full_df,
clipped_output_dir,
output_dir,
parcel_id_name,
logger.info(
f"{output_dir.joinpath('clipped.parquet')} already exists, no new data has "
"been processed but rebuild set to True. Redoing merge..."
)
elif new_data:
output_dir.joinpath("clipped.parquet").unlink()
logger.info(
f"{output_dir.joinpath('clipped.parquet')} already exists and new data has "
"processed. Redoing merge..."
)

if process:
_merge_clipper(full_df, clipped_output_dir, output_dir, parcel_id_name, month)


def _get_arguments(
config: CollectorConfig,
workers: int,
shape_dir: Path,
output_dir: Path,
month: int,
local_dir: Path | None = None,
) -> tuple[list[tuple[pd.DataFrame, list]], gpd.GeoDataFrame, Path]:
"""Get arguments for clipping polygons from raster files.
Expand All @@ -82,6 +85,7 @@ def _get_arguments(
shape_dir: Directory where EuroCrops shapefile is stored.
output_dir: Directory to get the list of .SAFE files from and to store the
argument list.
month: Month that is being processed.
local_dir: Local directory where the .SAFE files were copied to.

Returns:
Expand All @@ -94,7 +98,7 @@ def _get_arguments(
parcel_id_name: str = cast(str, config.parcel_id_name)
bands: list[str] = cast(list[str], config.bands)

clipping_path = output_dir.joinpath("clipper")
clipping_path = output_dir.joinpath("clipper", f"{month}")
clipping_path.mkdir(exist_ok=True, parents=True)

if clipping_path.joinpath("args.pkg").exists():
Expand All @@ -108,6 +112,8 @@ def _get_arguments(
full_images_paths: Path = output_dir.joinpath("collector", "full_parcel_list.pkg")
full_images = pd.read_pickle(full_images_paths)

full_images = full_images[pd.to_datetime(full_images["completionDate"]).dt.month == month]

if local_dir is not None:
full_images["productIdentifier"] = str(local_dir) + full_images[
"productIdentifier"
Expand All @@ -116,6 +122,11 @@ def _get_arguments(
band_image_path: Path = output_dir.joinpath("copier", "band_images.pkg")
band_images: pd.DataFrame = pd.read_pickle(band_image_path)

# filter out month
band_images = band_images[
band_images["productIdentifier"].isin(full_images["productIdentifier"])
]

max_workers = min(mp_orig.cpu_count(), max(1, min(len(band_images), workers)))
args = []
with mp_orig.Pool(processes=max_workers) as p:
Expand Down Expand Up @@ -186,26 +197,22 @@ def _process_raster_parallel(
Final DataFrame with clipped parcel reflectance values for this given raster tile.
"""

try:
# all parcel ids that match product Identifier
parcel_ids = list(filtered_images[parcel_id_name])
parcel_ids = [int(id) for id in parcel_ids]
# observation date
product_date = str(filtered_images["completionDate"].unique()[0])
# all parcel ids that match product Identifier
parcel_ids = list(filtered_images[parcel_id_name])
parcel_ids = [int(id) for id in parcel_ids]
# observation date
product_date = str(filtered_images["completionDate"].unique()[0])

# geometry information of all parcels
filtered_geom = polygon_df[polygon_df[parcel_id_name].isin(parcel_ids)]
# geometry information of all parcels
filtered_geom = polygon_df[polygon_df[parcel_id_name].isin(parcel_ids)]

result = mask_polygon_raster(
satellite, band_tiles, bands, filtered_geom, parcel_id_name, product_date, denoise
)

if result is not None:
result.set_index(parcel_id_name, inplace=True)
else:
result = pd.DataFrame()
result = mask_polygon_raster(
satellite, band_tiles, bands, filtered_geom, parcel_id_name, product_date, denoise
)

except Exception:
if result is not None:
result.set_index(parcel_id_name, inplace=True)
else:
result = pd.DataFrame()

return result
Expand All @@ -219,6 +226,7 @@ def clipping(
chunk_size: int,
multiplier: int,
local_dir: Path | None = None,
rebuild: bool = False,
) -> None:
"""Main function to conduct polygon clipping.

Expand All @@ -230,83 +238,101 @@ def clipping(
chunk_size: Chunk size used for multiprocessed raster clipping.
multiplier: Intermediate results will be saved every multiplier steps.
local_dir: Local directory where the .SAFE files were copied to.
rebuild: Whether to re-build the clipped parquet files for each month.
This will overwrite the existing ones.
"""
args, polygon_df, clipping_path = _get_arguments(
config=config,
workers=workers,
shape_dir=shape_dir,
output_dir=output_dir,
local_dir=local_dir,
)
for month in tqdm(
range(config.months[0], config.months[1] + 1), desc="Clipping rasters on monthly basis"
):

month_name = calendar.month_name[month]
logger.info(f"Starting clipping process for {month_name.upper()}...")

args_month, polygon_df_month, clipping_path = _get_arguments(
config=config,
workers=workers,
shape_dir=shape_dir,
output_dir=output_dir,
month=month,
local_dir=local_dir,
)

max_workers = min(mp_orig.cpu_count(), max(1, min(len(args), workers)))
max_workers = min(mp_orig.cpu_count(), max(1, min(len(args_month), workers)))

clipped_dir = clipping_path.joinpath("clipped")
clipped_dir.mkdir(exist_ok=True, parents=True)
clipped_dir = clipping_path.joinpath("clipped")
clipped_dir.mkdir(exist_ok=True, parents=True)

# Process data in smaller chunks
file_counts = len(list(clipped_dir.rglob("Final_*.pkg")))
# Process data in smaller chunks
file_counts = len(list(clipped_dir.rglob("Final_*.pkg")))

processed = file_counts * multiplier * chunk_size
save_files = multiplier * chunk_size
file_counts += 1
processed = file_counts * multiplier * chunk_size
save_files = multiplier * chunk_size
file_counts += 1

polygon_df[config.parcel_id_name] = polygon_df[config.parcel_id_name].astype(int)
func = partial(
_process_raster_parallel,
config.satellite,
config.denoise,
polygon_df,
cast(str, config.parcel_id_name),
config.bands,
)
polygon_df_month[config.parcel_id_name] = polygon_df_month[config.parcel_id_name].astype(
int
)
func = partial(
_process_raster_parallel,
config.satellite,
config.denoise,
polygon_df_month,
cast(str, config.parcel_id_name),
config.bands,
)

polygon_df = polygon_df.drop(["geometry"], axis=1)
df_final = polygon_df.copy()
df_final.set_index(config.parcel_id_name, inplace=True)

new_data: bool = False
if processed < len(args):
new_data = True
logger.info("Starting parallel raster clipping...")
te = tqdm(total=len(args) - processed, desc="Clipping raster tiles.")
while processed < len(args):
chunk_args: list[tuple[pd.DataFrame, list]] = args[processed : processed + chunk_size]
results: list[pd.DataFrame] = []

with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(func, *arg) for arg in chunk_args]

for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception:
results.append(None)

# Process and save results
for result in results:
if result is not None and not result.empty:
df_final = df_final.fillna(result)
te.update(n=1)

processed += len(chunk_args)
if processed % save_files == 0:
df_final.to_pickle(clipped_dir.joinpath(f"Final_{file_counts}.pkg"))
del df_final
df_final = polygon_df.copy()
df_final.set_index(config.parcel_id_name, inplace=True)
file_counts += 1
# Clear variables to release memory
del chunk_args, futures
gc.collect()

df_final.to_pickle(clipped_dir.joinpath(f"Final_{file_counts}.pkg"))

_merge_dataframe(
polygon_df,
clipped_dir,
clipping_path,
cast(str, config.parcel_id_name),
new_data,
)
polygon_df_month = polygon_df_month.drop(["geometry"], axis=1)
df_final_month = polygon_df_month.copy()
df_final_month.set_index(config.parcel_id_name, inplace=True)

new_data: bool = False
if processed < len(args_month):
new_data = True
te = tqdm(
total=len(args_month) - processed, desc=f"Clipping raster tiles for {month_name}..."
)
while processed < len(args_month):
chunk_args: list[tuple[pd.DataFrame, list]] = args_month[
processed : processed + chunk_size
]
results: list[pd.DataFrame] = []

with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(func, *arg) for arg in chunk_args]

for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
results.append(result)
except AttributeError:
results.append(None)

# Process and save results
for result in results:
if result is not None and not result.empty:
df_final_month = df_final_month.fillna(result)
te.update(n=1)

processed += len(chunk_args)
if processed % save_files == 0:
df_final_month.to_pickle(clipped_dir.joinpath(f"Final_{file_counts}.pkg"))
del df_final_month
df_final_month = polygon_df_month.copy()
df_final_month.set_index(config.parcel_id_name, inplace=True)
file_counts += 1
# Clear variables to release memory
del chunk_args, futures
gc.collect()

df_final_month.to_pickle(clipped_dir.joinpath(f"Final_{file_counts}.pkg"))
te.close()

_merge_dataframe(
polygon_df_month,
clipped_dir,
clipping_path,
cast(str, config.parcel_id_name),
month_name,
new_data,
rebuild,
)
Loading