From fbb5584d9e991653c9a12d66ef51b252cfff6c48 Mon Sep 17 00:00:00 2001 From: YUE LI Date: Wed, 18 Dec 2024 16:02:19 +0800 Subject: [PATCH] revise intersection and damage level calculation to batch process --- scripts/2_disruption_analysis.py | 613 +++++++++++++++++++------------ 1 file changed, 372 insertions(+), 241 deletions(-) diff --git a/scripts/2_disruption_analysis.py b/scripts/2_disruption_analysis.py index 5ff9041..d9af313 100644 --- a/scripts/2_disruption_analysis.py +++ b/scripts/2_disruption_analysis.py @@ -1,21 +1,28 @@ +# %% import sys from pathlib import Path -# import pandas as pd import geopandas as gpd +from collections import defaultdict from snail import io, intersection from nird.utils import load_config import warnings -warnings.simplefilter("ignore") +warnings.filterwarnings("ignore") base_path = Path(load_config()["paths"]["base_path"]) -raster_path = Path(load_config()["paths"]["JBA_data"]) +raster_path = Path(load_config()["paths"]["JBA_data"]) / "completed" -def intersect_features_with_raster(raster_path, raster_key, features): +# %% +def intersect_features_with_raster( + raster_path, + raster_key, + features, + flood_type, +): print(f"Intersecting features with raster {raster_key}...") # read the raster data: depth (meter) raster = io.read_raster_band_data(raster_path) @@ -27,8 +34,8 @@ def intersect_features_with_raster(raster_path, raster_key, features): if intersections is None: sys.exit() intersections = intersection.apply_indices(intersections, grid) - intersections["flood_depth"] = intersection.get_raster_values_for_splits( - intersections, raster + intersections[f"flood_depth_{flood_type}"] = ( + intersection.get_raster_values_for_splits(intersections, raster) ) # reproject back @@ -36,7 +43,11 @@ def intersect_features_with_raster(raster_path, raster_key, features): return intersections -def clip_features(features, clip_path, raster_key): +def clip_features( + features, + clip_path, + raster_key, +): print(f"Clipping features based on {raster_key}...") clips = gpd.read_file(clip_path, engine="pyogrio") if clips.crs != "epsg:4326": @@ -50,23 +61,11 @@ def clip_features(features, clip_path, raster_key): return clipped_features -def identify_disrupted_links(road_links, flood_key, flood_path, clip_path, out_path): - # convert feature crs to WGS-84 (same as raster) - road_links = road_links.to_crs("epsg:4326") - # mask - clipped_features = clip_features(road_links, clip_path, flood_key) - # intersection - intersections = intersect_features_with_raster( - flood_path, flood_key, clipped_features - ) - # export intersected featrues - intersections.to_parquet(out_path, index=False) - print("Complete identifying the disrupted links!") - - return intersections - - -def compute_maximum_speed_on_flooded_roads(depth, free_flow_speed, threshold=30): +def compute_maximum_speed_on_flooded_roads( + depth, + free_flow_speed, + threshold=30, +): depth = depth * 100 # m to cm if depth < threshold: # cm # value = 0.0009 * depth**2 - 0.5529 * depth + 86.9448 # kmph @@ -80,99 +79,249 @@ def compute_damage_level_on_flooded_roads( fldType, road_classification, trunk_road, - hasTunnel, + road_label, fldDepth, ): depth = fldDepth * 100 # cm if fldType == "surface": - if hasTunnel == 1 and ( + if road_label == "tunnel" and ( road_classification == "Motorway" - or (road_classification == "A Road" and trunk_road == "true") + or (road_classification == "A Road" and trunk_road) ): - if depth < 150: - return 0 # "no" - if 150 <= depth < 200: - return 1 # "minor" - elif 200 <= depth < 300: - return 2 # "moderate" - elif 300 <= depth < 700: - return 3 # "extensive" + if depth < 50: + return "no" + if 50 <= depth < 100: + return "minor" + elif 100 <= depth < 200: + return "moderate" + elif 200 <= depth < 600: + return "extensive" else: - return 4 # "severe" - elif hasTunnel == 0 and ( + return "severe" + elif road_label != "tunnel" and ( road_classification == "Motorway" - or (road_classification == "A Road" and trunk_road == "true") + or (road_classification == "A Road" and trunk_road) ): - if depth < 150: - return 0 # "no" - if 150 <= depth < 200: - return 0 # "no" - elif 200 <= depth < 300: - return 0 # "no" - elif 300 <= depth < 700: - return 1 # "minor" + if depth < 50: + return "no" + if 50 <= depth < 100: + return "no" + elif 100 <= depth < 200: + return "no" + elif 200 <= depth < 600: + return "minor" else: - return 2 # "moderate" + return "moderate" else: if depth < 50: - return 0 # "no" + return "no" if 50 <= depth < 100: - return 0 # "no" + return "no" elif 100 <= depth < 200: - return 1 # "minor" + return "minor" elif 200 <= depth < 600: - return 1 # "minor" + return "minor" else: - return 2 # "moderate" + return "moderate" elif fldType == "river": - if hasTunnel == 1 and ( + if road_label == "tunnel" and ( road_classification == "Motorway" - or (road_classification == "A Road" and trunk_road == "true") + or (road_classification == "A Road" and trunk_road) ): - if depth < 250: - return 0 # "no" - if 250 <= depth < 300: - return 1 # "minor" - elif 300 <= depth < 400: - return 1 # "minor" - elif 400 <= depth < 800: - return 2 # "moderate" + if depth < 50: + return "no" + if 50 <= depth < 100: + return "minor" + elif 100 <= depth < 200: + return "minor" + elif 200 <= depth < 600: + return "moderate" else: - return 3 # "extensive" - elif hasTunnel == 0 and ( + return "extensive" + elif road_label != "tunnel" and ( road_classification == "Motorway" - or (road_classification == "A Road" and trunk_road == "true") + or (road_classification == "A Road" and trunk_road) ): - if depth < 250: - return 0 # "no" - if 250 <= depth < 300: - return 1 # "minor" - elif 300 <= depth < 400: - return 2 # "moderate" - elif 400 <= depth < 800: - return 3 # "extensive" + if depth < 50: + return "no" + if 50 <= depth < 100: + return "minor" + elif 100 <= depth < 200: + return "moderate" + elif 200 <= depth < 600: + return "extensive" else: - return 4 # "severe" + return "severe" else: if depth < 50: - return 1 # "minor" + return "minor" if 50 <= depth < 100: - return 2 # "moderate" + return "moderate" elif 100 <= depth < 200: - return 2 # "moderate" + return "moderate" elif 200 <= depth < 600: - return 3 # "extensive" + return "extensive" else: - return 4 # "severe" + return "severe" else: print("Please enter the type of flood!") -def main(flood_type=None): - # road links - road_links = gpd.read_parquet( - base_path / "networks" / "road" / "GB_road_link_file.pq" +def intersections_with_damage( + road_links: gpd.GeoDataFrame, + flood_key: str, + flood_type: str, + flood_path: str, + clip_path: str, +) -> gpd.GeoDataFrame: + """ + Calculate flood depth and damage levels for individual road segments. + + Parameters: + road_links (GeoDataFrame): Road links with geometries and classifications. + flood_key (str): Key identifier for the flood dataset. + flood_type (str): Type of flood ("surface" or "river"). + flood_path (str): Path to the flood raster file. + clip_path (str): Path to the vector file used for clipping. + + Returns: + GeoDataFrame: Intersections with calculated flood depths and damage levels. + """ + # Clip road links with features in the provided vector file + road_links = road_links.to_crs("epsg:4326") + clipped_features = clip_features(road_links, clip_path, flood_key) + + # Perform intersection analysis with the flood raster + intersections = intersect_features_with_raster( + flood_path, + flood_key, + clipped_features, + flood_type, + ) + intersections.reset_index(drop=True, inplace=True) + # Adjust flood depths for embankment heights based on road classification + """ + embankment against surface flood: 100 cm + embankment against river flood: 200 cm + """ + if flood_type == "surface": + intersections.loc[ + (intersections.road_classification == "Motorway") + | ( + (intersections.road_classification == "A Road") + & (intersections["trunk_road"]) + ), + "flood_depth_surface", + ] = (intersections["flood_depth_surface"] - 100).clip(lower=0) + else: + intersections.loc[ + (intersections.road_classification == "Motorway") + | ( + (intersections.road_classification == "A Road") + & (intersections["trunk_road"]) + ), + "flood_depth_river", + ] = (intersections["flood_depth_river"] - 200).clip(lower=0) + + # Compute damage levels for flooded road segments + intersections[f"damage_level_{flood_type}"] = intersections.apply( + lambda row: compute_damage_level_on_flooded_roads( + flood_type, + row["road_classification"], + row["trunk_road"], + row["road_label"], + row[f"flood_depth_{flood_type}"], + ), + axis=1, + ) + + return intersections + + +def features_with_damage( + features, + intersections, + damage_level_dict, + damage_level_dict_reverse, +): + """ + Calculate damage levels and maximum flood depth for road links. + + Parameters: + features (GeoDataFrame): GeoDataFrame of road links. + intersections (GeoDataFrame): GeoDataFrame of intersections with flood data. + damage_level_dict (dict): Mapping of damage levels to numerical values. + damage_level_dict_reverse (dict): Reverse mapping of numerical values to damage levels. + + Returns: + GeoDataFrame: Updated features with maximum flood depth and damage level. + """ + # Flood depth + if ( + "flood_depth_surface" in intersections.columns + and "flood_depth_river" in intersections.columns + ): + intersections["flood_depth_max"] = intersections[ + ["flood_depth_surface", "flood_depth_river"] + ].max(axis=1) + elif "flood_depth_surface" in intersections.columns: + intersections["flood_depth_max"] = intersections.flood_depth_surface + elif "flood_depth_river" in intersections.columns: + intersections["flood_depth_max"] = intersections.flood_depth_river + else: + print("Error: flood depth columns are missing!") + sys.exit() + + # Damage level + if ( + "damage_level_surface" in intersections.columns + and "damage_level_river" in intersections.columns + ): + intersections["damage_level_surface"] = intersections[ + "damage_level_surface" + ].map(damage_level_dict) + intersections["damage_level_river"] = intersections["damage_level_river"].map( + damage_level_dict + ) + intersections["damage_level_max"] = intersections[ + ["damage_level_surface", "damage_level_river"] + ].max(axis=1) + elif "damage_level_surface" in intersections.columns: + intersections["damage_level_surface"] = intersections[ + "damage_level_surface" + ].map(damage_level_dict) + intersections["damage_level_max"] = intersections.damage_level_surface + elif "damage_level_river" in intersections.columns: + intersections["damage_level_river"] = intersections["damage_level_river"].map( + damage_level_dict + ) + intersections["damage_level_max"] = intersections.damage_level_river + else: + print("Error: damage level columns are missing!") + + intersections_gp = intersections.groupby("e_id", as_index=False).agg( + { + "flood_depth_max": "max", + "damage_level_max": "max", + } ) + intersections_gp["damage_level_max"] = intersections_gp.damage_level_max.astype( + int + ).map(damage_level_dict_reverse) + + features = features.merge( + intersections_gp[["e_id", "flood_depth_max", "damage_level_max"]], + how="left", + on="e_id", + ) + features["flood_depth_max"] = features["flood_depth_max"].fillna(0.0) + features["damage_level_max"] = features["damage_level_max"].fillna("no") + + return features + + +def main(): + # base scenario simulation results base_scenario_links = gpd.read_parquet( base_path.parent / "outputs" / "edge_flows_partial_1128.pq" ) @@ -184,177 +333,159 @@ def main(flood_type=None): inplace=True, ) - # damage level dict + # damage level dicts damage_level_dict = { - 0: "no", - 1: "minor", - 2: "moderate", - 3: "extensive", - 4: "severe", + "no": 0, + "minor": 1, + "moderate": 2, + "extensive": 3, + "severe": 4, } - - # paths - flood_path = ( - raster_path - / "completed" - / "12-14_2007 Summer_UK Floods" - / "May" - / "Raster" - / "UK_2007_May_FLSW_RD_5m_4326.tif" - ) - flood_key = flood_path.stem - - clip_path = ( - raster_path - / "completed" - / "12-14_2007 Summer_UK Floods" - / "May" - / "Vector" - / "UK_2007_May_FLSW_VE_5m_4326.shp" - ) - - out_path = ( - base_path.parent - / "outputs" - / "disruption_analysis_1129" - / f"{flood_key}_fld_depth.geopq" - ) - - # intersection analysis - intersections = identify_disrupted_links( - road_links, - flood_key, - flood_path, - clip_path, - out_path, # export to file - ) - - # for debug - # intersections = gpd.read_parquet(out_path) - - # DAMAGE LEVEL ESTIMATION - if flood_type == "surface": - intersections["damage_level"] = intersections.apply( - lambda row: compute_damage_level_on_flooded_roads( - "surface", - row["road_classification"], - row["trunk_road"], - row["hasTunnel"], - row["flood_depth"], - ), - axis=1, + damage_level_dict_reverse = {i: k for k, i in damage_level_dict.items()} + + # flood event classification into surface/river flood + flood_types = ["surface", "river", "both"] + event_files = {flood_type: [] for flood_type in ["surface", "river"]} + # Iterate through flood types and process files + for flood_type in flood_types: + folder_path = raster_path / flood_type + if folder_path.exists(): + for raster_dir in folder_path.rglob( + "Raster" + ): # Search for "Raster" directories + for tif_file in raster_dir.rglob( + "*.tif" + ): # Find .tif files recursively + # Filter files with "RD" in the name and exclude those with "IE" + if "RD" in tif_file.name and "IE" not in tif_file.name: + if flood_type == "both": + if "FLSW" in tif_file.name: + target_flood_type = "surface" + elif "FLRF" in tif_file.name: + target_flood_type = "river" + else: + continue + else: + target_flood_type = flood_type + + # Append the file path to the appropriate flood_type list + event_files[target_flood_type].append(str(tif_file)) + + event_dict = defaultdict(lambda: defaultdict(list)) + for flood_type, list_of_events in event_files.items(): + for event_path in list_of_events: + event_key = "_".join( + Path(event_path).stem.split("_")[:2] + ) # rename events under "both" category + event_dict[event_key][flood_type].append(event_path) + + # analysis + for flood_key, v in event_dict.items(): + # road links + road_links = gpd.read_parquet( + base_path / "networks" / "road" / "GB_road_link_file.pq" ) - elif flood_type == "river": - intersections["damage_level"] = intersections.apply( - lambda row: compute_damage_level_on_flooded_roads( - "river", - row.road_classification, - row.trunk_road, - row.hasTunnel, - row["flood_depth"], - ), - axis=1, + # added columns for debug + road_links["road_label"] = "road" + road_links.loc[road_links.road_structure == "Road In Tunnel", "road_label"] = ( + "tunnel" + ) + road_links.loc[road_links.aveBridgeWidth > 0, "road_label"] = "bridge" + + # out path + out_path = ( + base_path.parent + / "outputs" + / "disruption_analysis_1129" + / f"intersections_{flood_key}.pq" + ) + intersections = gpd.GeoDataFrame(columns=["e_id", "index_i", "index_j"]) + # debug + if flood_key != "UK_1998": + continue + + for flood_type, flood_paths in v.items(): + for flood_path in flood_paths: + # clip path + clip_path = Path( + flood_path.replace("Raster", "Vector").replace(".tif", ".shp") + ) + clip_path1 = clip_path.with_name(clip_path.name.replace("RD", "VE")) + clip_path2 = clip_path.with_name(clip_path.name.replace("RD", "PR")) + + if clip_path1.exists(): + clip_path = clip_path1 + elif clip_path2.exists(): + clip_path = clip_path2 + else: + print(f"Cannot find vector file for: {flood_path}") + continue # Skip further processing for this file + + # intersections + temp_file = intersections_with_damage( + road_links, flood_key, flood_type, flood_path, clip_path + ) + intersections = intersections.merge( + temp_file[ + [ + "e_id", + "index_i", + "index_j", + f"flood_depth_{flood_type}", + f"damage_level_{flood_type}", + ] + ], + on=["e_id", "index_i", "index_j"], + how="outer", + ) + + # save intersectiosn for damage analysis + intersections.to_parquet(out_path) + + # road integrations + road_links = features_with_damage( + road_links, + intersections, + damage_level_dict, + damage_level_dict_reverse, ) - else: - print("Damage level error: Please enter flood type!") - breakpoint() - intersections.to_parquet(out_path) - intersections = intersections.groupby("id", as_index=False).agg( - {"flood_depth": "max", "damage_level": "max"} - ) - intersections["damage_level"] = intersections.damage_level.map(damage_level_dict) - road_links = road_links.merge( - intersections[["id", "flood_depth", "damage_level"]], how="left", on="id" - ) - road_links["flood_depth"] = road_links["flood_depth"].fillna(0.0) - road_links["damage_level"] = road_links["damage_level"].fillna("no") - - # MAX SPEED ESTIMATION - # attach capacity and speed info on D-0 - road_links = road_links.merge( - base_scenario_links[ - [ - "id", - "current_capacity", - "current_speed", - "free_flow_speeds", - "initial_flow_speeds", - "min_flow_speeds", - ] - ], - how="left", - on="id", - ) + # max speed estimation + """ + Uncertainties of flood depth threshold for road closure (cm): 15, 30, 60 + """ + # attach capacity and speed info on D-0 + road_links = road_links.merge( + base_scenario_links[ + [ + "e_id", + "current_capacity", + "current_speed", + "free_flow_speeds", + "initial_flow_speeds", + "min_flow_speeds", + ] + ], + how="left", + on="e_id", + ) - # uncertainties: flood depth threshold for road closure (15, 30, 60 cm) - """ - Embankment height (cm): - Surface River - Motorway and A-trunk 100 200 - Others 0 0 - """ - # motorway and a-trunk - if flood_type == "surface": - road_links.loc[ - (road_links["road_classification"] == "Motorway") - | ( - (road_links["road_classification"] == "A Road") - & (road_links["trunk_road"]) - ), - "max_speed", - ] = road_links.apply( + # compute maximum speed restriction on individual road links + road_links["max_speed"] = road_links.apply( lambda row: compute_maximum_speed_on_flooded_roads( - max(row["flood_depth"] - 1.0, 0.0), # 100 cm embankment height + row["flood_depth_max"], row["free_flow_speeds"], threshold=30, ), axis=1, ) - elif flood_type == "river": - road_links.loc[ - (road_links["road_classification"] == "Motorway") - | ( - (road_links["road_classification"] == "A Road") - & (road_links["trunk_road"]) - ), - "max_speed", - ] = road_links.apply( - lambda row: compute_maximum_speed_on_flooded_roads( - max(row["flood_depth"] - 2.0, 0.0), # 200 cm embankment height - row["free_flow_speeds"], - threshold=30, - ), - axis=1, + road_links.to_parquet( + base_path.parent + / "outputs" + / "disruption_analysis_1129" + / f"road_links_{flood_key}.pq" ) - else: - print("Max speed error: please enter flood type!") - - # others - road_links.loc[ - ~( - (road_links["road_classification"] == "Motorway") - | ( - (road_links["road_classification"] == "A Road") - & (road_links["trunk_road"]) - ) - ), - "max_speed", - ] = road_links.apply( - lambda row: compute_maximum_speed_on_flooded_roads( - row["flood_depth"], # no embankment - row["free_flow_speeds"], - threshold=30, - ), - axis=1, - ) - - road_links.to_parquet(base_path / "disruption_analysis_1129" / "road_links.pq") - road_links.drop(columns="geometry", inplace=True) - road_links.to_csv( - base_path / "disruption_analysis_1129" / "road_links.csv", index=False - ) if __name__ == "__main__": - main(flood_type="surface") # "surface"/"river" + main()