Skip to content

Commit

Permalink
revise module 2 and 3 to compute damage based on road segments
Browse files Browse the repository at this point in the history
  • Loading branch information
YueeeeeLi committed Dec 16, 2024
1 parent 80befd1 commit 27c81cc
Show file tree
Hide file tree
Showing 2 changed files with 586 additions and 363 deletions.
210 changes: 136 additions & 74 deletions scripts/2_disruption_analysis.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# %%
import sys
from pathlib import Path

Expand All @@ -16,7 +15,6 @@
raster_path = Path(load_config()["paths"]["JBA_data"])


# %%
def intersect_features_with_raster(raster_path, raster_key, features):
print(f"Intersecting features with raster {raster_key}...")
# read the raster data: depth (meter)
Expand Down Expand Up @@ -62,17 +60,17 @@ def identify_disrupted_links(road_links, flood_key, flood_path, clip_path, out_p
flood_path, flood_key, clipped_features
)
# export intersected featrues
intersections.to_csv(out_path, index=False)
intersections.to_parquet(out_path, index=False)
print("Complete identifying the disrupted links!")

return intersections


def compute_maximum_speed_on_flooded_roads(depth, free_flow_speed):
depth = depth * 1000 # m to mm
if depth < 300: # mm: !!! uncertainty analysis (150, 300, 600)
def compute_maximum_speed_on_flooded_roads(depth, free_flow_speed, threshold=30):
depth = depth * 100 # m to cm
if depth < threshold: # cm
# value = 0.0009 * depth**2 - 0.5529 * depth + 86.9448 # kmph
value = free_flow_speed * (depth / 300 - 1) ** 2 # mph
value = free_flow_speed * (depth / threshold - 1) ** 2 # mph
return value # mph
else:
return 0.0 # mph
Expand All @@ -92,80 +90,80 @@ def compute_damage_level_on_flooded_roads(
or (road_classification == "A Road" and trunk_road == "true")
):
if depth < 150:
return "no"
return 0 # "no"
if 150 <= depth < 200:
return "minor"
return 1 # "minor"
elif 200 <= depth < 300:
return "moderate"
return 2 # "moderate"
elif 300 <= depth < 700:
return "extensive"
return 3 # "extensive"
else:
return "severe"
return 4 # "severe"
elif hasTunnel == 0 and (
road_classification == "Motorway"
or (road_classification == "A Road" and trunk_road == "true")
):
if depth < 150:
return "no"
return 0 # "no"
if 150 <= depth < 200:
return "no"
return 0 # "no"
elif 200 <= depth < 300:
return "no"
return 0 # "no"
elif 300 <= depth < 700:
return "minor"
return 1 # "minor"
else:
return "moderate"
return 2 # "moderate"
else:
if depth < 50:
return "no"
return 0 # "no"
if 50 <= depth < 100:
return "no"
return 0 # "no"
elif 100 <= depth < 200:
return "minor"
return 1 # "minor"
elif 200 <= depth < 600:
return "minor"
return 1 # "minor"
else:
return "moderate"
return 2 # "moderate"
elif fldType == "river":
if hasTunnel == 1 and (
road_classification == "Motorway"
or (road_classification == "A Road" and trunk_road == "true")
):
if depth < 250:
return "no"
return 0 # "no"
if 250 <= depth < 300:
return "minor"
return 1 # "minor"
elif 300 <= depth < 400:
return "minor"
return 1 # "minor"
elif 400 <= depth < 800:
return "moderate"
return 2 # "moderate"
else:
return "extensive"
return 3 # "extensive"
elif hasTunnel == 0 and (
road_classification == "Motorway"
or (road_classification == "A Road" and trunk_road == "true")
):
if depth < 250:
return "no"
return 0 # "no"
if 250 <= depth < 300:
return "minor"
return 1 # "minor"
elif 300 <= depth < 400:
return "moderate"
return 2 # "moderate"
elif 400 <= depth < 800:
return "extensive"
return 3 # "extensive"
else:
return "severe"
return 4 # "severe"
else:
if depth < 50:
return "minor"
return 1 # "minor"
if 50 <= depth < 100:
return "moderate"
return 2 # "moderate"
elif 100 <= depth < 200:
return "moderate"
return 2 # "moderate"
elif 200 <= depth < 600:
return "extensive"
return 3 # "extensive"
else:
return "severe"
return 4 # "severe"
else:
print("Please enter the type of flood!")

Expand All @@ -185,6 +183,16 @@ def main(flood_type=None):
},
inplace=True,
)

# damage level dict
damage_level_dict = {
0: "no",
1: "minor",
2: "moderate",
3: "extensive",
4: "severe",
}

# paths
flood_path = (
raster_path
Expand All @@ -208,8 +216,8 @@ def main(flood_type=None):
out_path = (
base_path.parent
/ "outputs"
/ "disruption_analysis"
/ f"{flood_key}_fld_depth.csv"
/ "disruption_analysis_1129"
/ f"{flood_key}_fld_depth.geopq"
)

# intersection analysis
Expand All @@ -218,26 +226,52 @@ def main(flood_type=None):
flood_key,
flood_path,
clip_path,
out_path,
out_path, # export to file
)

# for debug
# flood_key = "UK_2007_May_FLSW"
# intersections = pd.read_csv(
# base_path.parent
# / "outputs"
# / "disruption_analysis"
# / "UK_2007_May_FLSW_fld_depth.csv"
# )
# intersections = gpd.read_parquet(out_path)

intersections = intersections.groupby("id", as_index=False)["flood_depth"].max()
# DAMAGE LEVEL ESTIMATION
if flood_type == "surface":
intersections["damage_level"] = intersections.apply(
lambda row: compute_damage_level_on_flooded_roads(
"surface",
row["road_classification"],
row["trunk_road"],
row["hasTunnel"],
row["flood_depth"],
),
axis=1,
)
elif flood_type == "river":
intersections["damage_level"] = intersections.apply(
lambda row: compute_damage_level_on_flooded_roads(
"river",
row.road_classification,
row.trunk_road,
row.hasTunnel,
row["flood_depth"],
),
axis=1,
)
else:
print("Damage level error: Please enter flood type!")
breakpoint()
intersections.to_parquet(out_path)
intersections = intersections.groupby("id", as_index=False).agg(
{"flood_depth": "max", "damage_level": "max"}
)

intersections["damage_level"] = intersections.damage_level.map(damage_level_dict)
road_links = road_links.merge(
intersections[["id", "flood_depth"]], how="left", on="id"
intersections[["id", "flood_depth", "damage_level"]], how="left", on="id"
)
road_links["flood_depth"] = road_links["flood_depth"].fillna(0.0)
road_links["damage_level"] = road_links["damage_level"].fillna("no")

# attach capacity and speed info on D-zero (Edge_flows)
# MAX SPEED ESTIMATION
# attach capacity and speed info on D-0
road_links = road_links.merge(
base_scenario_links[
[
Expand All @@ -253,39 +287,67 @@ def main(flood_type=None):
on="id",
)

# calculate the maximum speeds on flooded road links
road_links["max_speed"] = road_links.apply(
lambda row: compute_maximum_speed_on_flooded_roads(
row["flood_depth"], row["free_flow_speeds"]
),
axis=1,
)

# estimate damage levels
# uncertainties: flood depth threshold for road closure (15, 30, 60 cm)
"""
Embankment height (cm):
Surface River
Motorway and A-trunk 100 200
Others 0 0
"""
# motorway and a-trunk
if flood_type == "surface":
road_links["damage_level"] = road_links.apply(
lambda row: compute_damage_level_on_flooded_roads(
"surface",
row["road_classification"],
row["trunk_road"],
row["hasTunnel"],
row["flood_depth"],
road_links.loc[
(road_links["road_classification"] == "Motorway")
| (
(road_links["road_classification"] == "A Road")
& (road_links["trunk_road"])
),
"max_speed",
] = road_links.apply(
lambda row: compute_maximum_speed_on_flooded_roads(
max(row["flood_depth"] - 1.0, 0.0), # 100 cm embankment height
row["free_flow_speeds"],
threshold=30,
),
axis=1,
)
elif flood_type == "river":
road_links["damage_level"] = road_links.apply(
lambda row: compute_damage_level_on_flooded_roads(
"river",
row.road_classification,
row.trunk_road,
row.hasTunnel,
row["flood_depth"],
road_links.loc[
(road_links["road_classification"] == "Motorway")
| (
(road_links["road_classification"] == "A Road")
& (road_links["trunk_road"])
),
"max_speed",
] = road_links.apply(
lambda row: compute_maximum_speed_on_flooded_roads(
max(row["flood_depth"] - 2.0, 0.0), # 200 cm embankment height
row["free_flow_speeds"],
threshold=30,
),
axis=1,
)
else:
print("Please enter flood type!")
print("Max speed error: please enter flood type!")

# others
road_links.loc[
~(
(road_links["road_classification"] == "Motorway")
| (
(road_links["road_classification"] == "A Road")
& (road_links["trunk_road"])
)
),
"max_speed",
] = road_links.apply(
lambda row: compute_maximum_speed_on_flooded_roads(
row["flood_depth"], # no embankment
row["free_flow_speeds"],
threshold=30,
),
axis=1,
)

road_links.to_parquet(base_path / "disruption_analysis_1129" / "road_links.pq")
road_links.drop(columns="geometry", inplace=True)
Expand Down
Loading

0 comments on commit 27c81cc

Please sign in to comment.