# > Delivery ETA task and dataset
import kaggle
import polars as pl
import numpy as np
from loguru import logger
from sklearn.preprocessing import OrdinalEncoder
import lib
from preprocessing.util import save_dataset

TMP_DATA_PATH = lib.PROJECT_DIR / 'preprocessing/tmp/delivery-eta'
TMP_DATA_PATH.mkdir(exist_ok=True, parents=True)


def main():
    # ================================================================
    # >> Load and split into num, bin, cat
    # ================================================================
    logger.info('Preprocessing delivery eta dataset')
    kaggle.api.dataset_download_files('pcovkrd84mejm/delivery-eta', path=TMP_DATA_PATH)
    lib.unzip(TMP_DATA_PATH / 'delivery-eta.zip')

    # Store the row index in the full data (index_in_full) to help with potential
    # future experiments on large data
    data = pl.read_parquet(TMP_DATA_PATH / 'delivery_eta.parquet').with_row_index(name="index_in_full")
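    # Feature columns are grouped by name prefix; the time-derived columns created
    # below (day_of_week, minute_of_day, hour_of_day) are treated as numerical.
    # Note that cat_1 and cat_2 are excluded from the categorical group.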
    bin_cols = [c for c in data.columns if c.startswith('bin')]
    cat_cols = [c for c in data.columns if c.startswith('cat') and c not in ['cat_1', 'cat_2']]
    num_cols = [c for c in data.columns if c.startswith('num') or c in ['day_of_week', 'minute_of_day', 'hour_of_day']]

    # Filter out samples with less than 1 minute delivery time (only a few of those)
    data = data.filter(pl.col('delivery_eta_minutes').ge(1.0))
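    # Ordinal-encode the categorical columns; categories that appear in less than 1%
    # of rows are grouped into a single infrequent category per column by OrdinalEncoder.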
    data_cat = data.select(cat_cols)
    data = data.drop(cat_cols)
    data = pl.concat([
        data,
        pl.DataFrame(
            OrdinalEncoder(min_frequency=1/100).fit_transform(data_cat.to_numpy()),
            schema={c: pl.Int64 for c in data_cat.columns}
        )
    ], how="horizontal")

    # Subsample to allow for faster experiment iterations
    data = data.sample(fraction=0.025, seed=0)
    data = data.sort('timestamp')
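    # Derive calendar/time-of-day features from the timestamp; they are consumed
    # as numerical columns (see num_cols above)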
    data = data.with_columns(
        pl.col('timestamp').dt.weekday().alias('day_of_week'),
        pl.col('timestamp').dt.time().cast(pl.Duration).dt.total_minutes().alias('minute_of_day'),
        pl.col('timestamp').dt.time().cast(pl.Duration).dt.total_hours().alias('hour_of_day'),
    )
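    # The regression target is the log of the delivery time in minutes; the feature
    # groups are materialised as separate frames for saving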
    Y_data = data.select(pl.col('delivery_eta_minutes').log())
    X_meta_data = data.select('timestamp', 'index_in_full')
    X_bin_data = data.select(bin_cols)
    X_cat_data = data.select(cat_cols)
    X_num_data = data.select(num_cols)

    # ======================================================================================
    # >>> Default task split <<<
    # ======================================================================================
    # Validation and test are the last two weeks (one week each); train is the prior month
    default_train_idx = data.with_row_index().filter(
        pl.col('timestamp').lt(pl.datetime(2023, 12, 11, 0, 0, 0))
    )['index'].to_numpy()
    default_val_idx = data.with_row_index().filter(
        pl.col('timestamp').ge(pl.datetime(2023, 12, 11, 0, 0, 0)) &
        pl.col('timestamp').lt(pl.datetime(2023, 12, 18, 0, 0, 0))
    )['index'].to_numpy()
    default_test_idx = data.with_row_index().filter(
        pl.col('timestamp').ge(pl.datetime(2023, 12, 18, 0, 0, 0)) &
        pl.col('timestamp').lt(pl.datetime(2023, 12, 25, 0, 0, 0))
    )['index'].to_numpy()
    default_split = {
        'train_idx': default_train_idx, 'val_idx': default_val_idx, 'test_idx': default_test_idx,
    }

    # ======================================================================================
    # >>> Sliding window splits <<<
    # ======================================================================================
    # Test/val windows are roughly a week each, and the train window is roughly a month
    test_val_size = 40_000
    num_splits = 3
    # Calculate the remaining train samples
    train_size = data.shape[0] - (num_splits - 1) * test_val_size - 2 * test_val_size
    sliding_window_splits = [
        {
            'train_idx': np.arange(train_size),
            'val_idx': np.arange(train_size, train_size + test_val_size),
            'test_idx': np.arange(train_size + test_val_size, train_size + 2 * test_val_size),
        }
    ]
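    # Each subsequent window is the previous one shifted forward by test_val_size samples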
    for _ in range(1, num_splits):
        sliding_window_splits.append(
            {
                k: v + test_val_size for k, v in sliding_window_splits[-1].items()
            }
        )
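
    # Sanity check: print the time span covered by each train/val/test window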
    for s in sliding_window_splits:
        t_ = data[:s['train_idx'][-1]]['timestamp']
        v_ = data[s['val_idx'][0]:s['val_idx'][-1]]['timestamp']
        tes_ = data[s['test_idx'][0]:s['test_idx'][-1]]['timestamp']
        print(
            t_.max() - t_.min(),
            v_.max() - v_.min(),
            tes_.max() - tes_.min(),
        )

    # ======================================================================================
    # >>> Random splits <<<
    # ======================================================================================
    # Random splits are created from the sliding-window splits by shuffling all the data in the
    # respective window and resplitting into the same train, validation, and test dataset sizes
    np.random.seed(0)
    random_splits = []
    for split in sliding_window_splits:
        idxs = np.concatenate([v for v in split.values()])
        np.random.shuffle(idxs)
        random_splits.append({
            'train_idx': idxs[:train_size],
            'val_idx': idxs[train_size:train_size + test_val_size],
            'test_idx': idxs[train_size + test_val_size:],
        })

    # Save the dataset in the following format:
    #   x_[bin|num|cat], y, split/default/ids_[train|val|test],
    #   split/sliding-window-N/ids-[train|val|test], split/random-N/ids-[train|val|test]
    # Sliding-window splits are formed by a custom increment, with the same train/test/val sizes.
    # Random splits match the sliding-window time-based splits in train/test/val sizes,
    # but are otherwise just random.
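    # Collect all local frames whose names end in '_data' (Y_data, X_meta_data, X_bin_data,
    # X_cat_data, X_num_data), keyed by the name without the trailing '_data'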
    data_parts = {n.rsplit('_', maxsplit=1)[0]: v for n, v in locals().items() if n.endswith('_data')}
    all_splits = (
        {'default': default_split} |
        {f'sliding-window-{i}': split for i, split in enumerate(sliding_window_splits)} |
        {f'random-{i}': split for i, split in enumerate(random_splits)}
    )

    logger.info('Writing data to disk')
    save_dataset(
        name='delivery-eta',
        task_type='regression',
        data=data_parts,
        splits=all_splits,
    )


if __name__ == "__main__":
    main()