-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathpruning_clustering_experiments.py
224 lines (186 loc) · 7.29 KB
/
pruning_clustering_experiments.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import argparse
import tensorflow as tf
import tensorflow_model_optimization as tfmot
from pathlib import Path
from typing import Optional
from dl_in_iot_course.misc.pet_dataset import PetDataset
from dl_in_iot_course.misc.modeltester import ModelTester
class TFMOTOptimizedModel(ModelTester):
def compress_and_fine_tune(self, originalmodel: Path):
"""
Runs selected compression algorithm and fine-tunes the model.
This method wraps the usage of TensorFlow Model Optimization Toolkit
optimization, fine-tunes the model, convert it to TensorFlow Lite and
saves the model to self.modelpath file.
All structures required for training are delivered to self object by
the optimize_model method.
Parameters
----------
originalmodel : Path
Path to the original model
"""
pass
def optimize_model(self, originalmodel: Path):
def preprocess_input(path, onehot):
data = tf.io.read_file(path)
img = tf.io.decode_jpeg(data, channels=3)
img = tf.image.resize(img, [224, 224])
img = tf.image.convert_image_dtype(img, dtype=tf.float32)
img /= 255.0
img = tf.image.random_brightness(img, 0.1)
img = tf.image.random_contrast(img, 0.7, 1.0)
img = tf.image.random_flip_left_right(img)
img = (img - self.dataset.mean) / self.dataset.std
return img, tf.convert_to_tensor(onehot)
self.batch_size = 8
self.learning_rate = 0.00001
self.epochs = 1
Xt, Xv, Yt, Yv = self.dataset.split_dataset(0.4)
Yt = list(self.dataset.onehotvectors[Yt])
Yv = list(self.dataset.onehotvectors[Yv])
# TensorFlow Dataset object for training
self.traindataset = (
tf.data.Dataset.from_tensor_slices((Xt, Yt))
.map(preprocess_input, num_parallel_calls=tf.data.experimental.AUTOTUNE)
.batch(self.batch_size)
)
# TensorFlow Dataset object for validation
self.validdataset = (
tf.data.Dataset.from_tensor_slices((Xv, Yv))
.map(preprocess_input, num_parallel_calls=tf.data.experimental.AUTOTUNE)
.batch(self.batch_size)
)
# loss function
self.loss = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
# Adam optimizer
self.optimizer = tf.keras.optimizers.Adam(learning_rate=self.learning_rate)
# Categorical accuracy metric
self.metrics = [tf.keras.metrics.CategoricalAccuracy()]
# method to implement
self.compress_and_fine_tune(originalmodel)
# If l02_quantization tasks are finished, just use the FP32Model class as
# a parent in order not to implement below methods
# TODO def prepare_model(self):
# TODO def preprocess_input(self, X):
# TODO def run_inference(self):
# TODO def postprocess_outputs(self, Y):
class ClusteredModel(TFMOTOptimizedModel):
"""
This tester tests the performance of the clustered model.
"""
def __init__(
self,
dataset: PetDataset,
modelpath: Path,
originalmodel: Optional[Path] = None,
logdir: Optional[Path] = None,
num_clusters: int = 16,
):
"""
Initializer for ClusteredModel.
ClusteredModel runs model clustering with defined number of clusters
per layer.
Parameters
----------
dataset : PetDataset
A dataset object to test on
modelpath : Path
Path to the model to test
originalmodel : Path
Path to the model to optimize before testing.
Optimized model will be saved in modelpath
logdir : Path
Path to the training/optimization logs
num_clusters : int
Number of clusters per layer
"""
self.num_clusters = num_clusters
super().__init__(dataset, modelpath, originalmodel, logdir)
def compress_and_fine_tune(self, originalmodel):
# TODO implement the model clustering and fine-tuning
pass
class PrunedModel(TFMOTOptimizedModel):
"""
This tester tests the performance of the pruned model.
"""
def __init__(
self,
dataset: PetDataset,
modelpath: Path,
originalmodel: Optional[Path] = None,
logdir: Optional[Path] = None,
target_sparsity: float = 0.3,
):
"""
Initializer for PrunedModel.
PrunedModel performs level pruning to a specified target sparsity.
Parameters
----------
dataset : PetDataset
A dataset object to test on
modelpath : Path
Path to the model to test
originalmodel : Path
Path to the model to optimize before testing.
Optimized model will be saved in modelpath
logdir : Path
Path to the training/optimization logs
target_sparsity : float
The target sparsity of the model
"""
self.target_sparsity = target_sparsity
super().__init__(dataset, modelpath, originalmodel, logdir)
def compress_and_fine_tune(self, originalmodel):
self.epochs = 4
self.sched = tfmot.sparsity.keras.ConstantSparsity(
self.target_sparsity, begin_step=0, end_step=1, frequency=1
)
# TODO implement model pruning and fine-tuning
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--model-path", help="Path to the model file", type=Path)
parser.add_argument("--dataset-root", help="Path to the dataset file", type=Path)
parser.add_argument(
"--download-dataset",
help="Download the dataset before training",
action="store_true",
)
parser.add_argument("--results-path", help="Path to the results", type=Path)
parser.add_argument(
"--test-dataset-fraction",
help="What fraction of the test dataset should be used for evaluation",
type=float,
default=1.0,
)
args = parser.parse_args()
args.results_path.mkdir(parents=True, exist_ok=True)
dataset = PetDataset(args.dataset_root, args.download_dataset)
# TODO uncomment once the classes are implemented
# for num_clusters in [2, 4, 8, 16, 32]:
# # test of the model executed with FP32 precision
# tester = ClusteredModel(
# dataset,
# args.results_path / f'{args.model_path.stem}.clustered-{num_clusters}.tflite', # noqa: E501
# args.model_path,
# args.results_path / f'clusterlog-{num_clusters}',
# num_clusters
# )
# tester.test_inference(
# args.results_path,
# f'clustered-{num_clusters}-fp32',
# args.test_dataset_fraction
# )
# for sparsity in [0.2, 0.4, 0.5]:
# # test of the model executed with FP32 precision
# tester = PrunedModel(
# dataset,
# args.results_path / f'{args.model_path.stem}.pruned-{sparsity}.tflite', # noqa: E501
# args.model_path,
# args.results_path / f'prunelog-{sparsity}',
# sparsity
# )
# tester.test_inference(
# args.results_path,
# f'pruned-{sparsity}-fp32',
# args.test_dataset_fraction
# )