-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmin_max_temp_df.py
49 lines (43 loc) · 1.85 KB
/
min_max_temp_df.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from taming_pyspark.utils.spark_runner import spark_session_runner
from taming_pyspark.config import BaseConfig
def show_min_or_max_temps(spark: SparkSession, show_max: bool = False):
    """
    Show the min weather temperature observed for each weather station by default.

    Setting show_max=True will instead show the max temperature recorded.

    Data:
        weather_station_id, date_identifier, observation_type, measurement, other... data
        date_identifier = f'{4-digit-year}{2-digit-month}-(2-digit-date-integer)'
        observation_type =
            TMIN - minimum temperature
            TMAX - maximum temperature
            PRCP - precipitation %

    :param spark: SparkSession class instance
    :param show_max: Boolean to show the max temp recorded at a station instead of the min
    :return: None
    """
    data_file = f'{BaseConfig.DATA_FOLDER}/{BaseConfig.TEMP_1800S}/1800.csv'
    # Explicit schema: the CSV has no header row, so name/type each column up front.
    schema = StructType() \
        .add(StructField('weather_station_identifier', StringType(), False)) \
        .add(StructField('date_identifier', StringType(), False)) \
        .add(StructField('observation_type', StringType(), False)) \
        .add(StructField('measurement', IntegerType(), False))
    # Dataframe
    df = spark.read.csv(path=data_file, schema=schema)
    # The two original branches duplicated the whole pipeline, differing only in
    # the observation-type literal and the aggregation method; derive both from
    # the flag instead.
    observation_type = "TMAX" if show_max else "TMIN"
    aggregation = "max" if show_max else "min"
    grouped = df.filter(df["observation_type"] == observation_type) \
        .groupby("weather_station_identifier")
    # GroupedData exposes min()/max() as methods; select the right one by name.
    getattr(grouped, aggregation)("measurement").show()
if __name__ == '__main__':
    # Entry point: run the job inside a managed SparkSession, showing maxima.
    run_options = {
        "app_name": "1800s_Min_or_Max_Temp",
        "show_max": True,
    }
    spark_session_runner(show_min_or_max_temps, **run_options)