-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathzip.py
241 lines (220 loc) · 9.8 KB
/
zip.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
#!/usr/bin/env python3
"""
This is a python script to annotate the final output image and produce a final
JSON output file. It's called zip because we zip multiple data sources back
together again from the original data source.
Usage:
python zip.py /path/to/input/files \
/path/to/output \
/path/to/text/crops \
/path/to/ocr/bboxes \
/path/to/aggregate|bib
If no aggregate dir exists (i.e., NOT cropping on a person), then
pass in the bib directory for this value instead.
Author: Alex Cummaudo
Date: 23 Aug 2017
"""
import os
import sys
from glob import glob
import cv2
import json
import numpy as np
import re
def annotate_bbox(img, bib_bbox, color):
    """Draw a 2px rectangle on an image for the given bounding box.

    Args:
        img (cv2 image): Image read by cv2.
        bib_bbox (dict): Bounding box with "x1", "y1", "x2", "y2" keys.
        color (list): BGR color for the rectangle outline.

    Returns:
        img (cv2 image): The annotated cv2 image.
    """
    # Corner points of the region to outline.
    top_left = (bib_bbox["x1"], bib_bbox["y1"])
    bottom_right = (bib_bbox["x2"], bib_bbox["y2"])
    cv2.rectangle(img, top_left, bottom_right, color, 2)
    return img
def annotate_label(img, bib_bbox, label, color, fontcolor = (0,0,0)):
    """Draw a text label in a filled box at the top-left of a bounding box.

    Args:
        img (cv2 image): Image read by cv2.
        bib_bbox (dict): Bounding box with "x1", "y1", "x2", "y2" keys.
        label (string): The string to render.
        color (list): BGR fill color of the label background.
        fontcolor (list): BGR color of the label text. Default black.

    Returns:
        img (cv2 image): The annotated cv2 image.
    """
    font = cv2.FONT_HERSHEY_PLAIN
    anchor_x = bib_bbox["x1"]
    anchor_y = bib_bbox["y1"]
    text_size, baseline = cv2.getTextSize(label, font, 1, 1)
    # Filled background sized to the rendered text, sitting just above the
    # bbox's top-left corner (with a 5px margin).
    background_pt1 = (anchor_x, anchor_y + baseline - 5)
    background_pt2 = (anchor_x + text_size[0] + 5, anchor_y - text_size[1] - 5)
    cv2.rectangle(img, background_pt1, background_pt2, color, -1)
    cv2.putText(img, label, (anchor_x, anchor_y), font, 1, fontcolor)
    return img
def read_json(json_filename):
    """Parse a JSON file into a dictionary.

    Args:
        json_filename (string): Path of the JSON file to read.

    Returns:
        dict: The JSON data, parsed as a dictionary.
    """
    with open(json_filename, 'r') as handle:
        return json.load(handle)
def main():
    """Entry point: zip per-image bib/text/OCR JSON back together.

    Expects five positional command-line arguments:
        argv[1]: input directory of .jpg images
        argv[2]: output directory (created if missing)
        argv[3]: directory of text-crop JSON files
        argv[4]: directory of OCR bounding-box JSON files
        argv[5]: aggregate (or bib) JSON directory

    Writes one JPEG per input image (annotated when detections exist) plus a
    combined results.json into the output directory.
    """
    # Explicit validation instead of assert: asserts are stripped under -O.
    if len(sys.argv) - 1 < 5:
        sys.exit("Must provide 5 arguments (in_dir, out_dir, text_dir, ocr_dir, aggregate_dir)")
    in_dir, out_dir, text_dir, ocr_dir, aggregate_dir = sys.argv[1:6]
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)
    all_json = {}
    all_jpgs = {}
    for file in glob("%s/*.jpg" % in_dir):
        image_id = os.path.splitext(os.path.basename(file))[0]
        # Explicitly indicate nothing for output file UNLESS overriden at end...
        all_json[image_id] = None
        img = cv2.imread(file)
        all_jpgs[image_id] = img
        aggregate_json_file = "%s/%s.json" % (aggregate_dir, image_id)
        if not os.path.exists(aggregate_json_file):
            print("No aggregate json file for '%s'. Skipping..." % image_id)
            continue
        aggregate_json = read_json(aggregate_json_file)
        aggregate_json["text"] = []
        aggregate_json["ocr"] = []
        for text_crop_file in glob("%s/%s*.json" % (text_dir, image_id)):
            text_crop_id = os.path.splitext(os.path.basename(text_crop_file))[0]
            # This maps the text crop back to the respective bib...
            print("Looking for %s_crop_bib_x.json in %s " % (image_id, text_crop_file))
            # Raw string and escaped dot: the original pattern's bare '.'
            # matched any character and '\d' was an invalid str escape.
            matches = re.search(re.escape(image_id) + r"_crop_bib_(\d+)\.json", text_crop_file)
            if matches is None:
                print("No matches found for %s_crop_bib_X.json in %s " % (image_id, text_crop_file))
                continue
            bib_idx = int(matches.group(1))
            bib_for_text_crop = aggregate_json["bib"]["regions"][bib_idx]
            bib_for_text_crop["crop_idx"] = bib_idx
            bib_for_text_crop["is_text_detected"] = True
            # Attempt to read string for this text_crop json...
            ocr_bbox_file = "%s/%s_crop_text.json" % (ocr_dir, text_crop_id)
            if not os.path.exists(ocr_bbox_file):
                print("No string file for '%s'. Skipping..." % text_crop_id)
                bib_for_text_crop["is_text_detected"] = False
                continue
            # Translate the crop-relative text bbox into full-image
            # coordinates by shifting with the bib's origin.
            txt_crop_json = read_json(text_crop_file)
            bib_origin_x1 = bib_for_text_crop["x1"]
            bib_origin_y1 = bib_for_text_crop["y1"]
            text_region = txt_crop_json["text"]["regions"][0]
            text_region["x1"] += bib_origin_x1
            text_region["y1"] += bib_origin_y1
            text_region["x2"] += bib_origin_x1
            text_region["y2"] += bib_origin_y1
            # Do the same for each individual character (relative to the
            # already-shifted text region origin).
            ocr_bbox_json = read_json(ocr_bbox_file)
            for ocr in ocr_bbox_json["ocr"]:
                for region in ocr["regions"]:
                    region["x1"] += text_region["x1"]
                    region["y1"] += text_region["y1"]
                    region["x2"] += text_region["x1"]
                    region["y2"] += text_region["y1"]
            # Now annotate the image and JSON
            all_strings = [ocr["string"] for ocr in ocr_bbox_json["ocr"] if "string" in ocr]
            aggregate_json["bib"]["regions"][bib_idx]["rbns"] = all_strings
            aggregate_json["text"].append(txt_crop_json["text"])
            aggregate_json["ocr"] = aggregate_json["ocr"] + ocr_bbox_json["ocr"]
        # Annotation colors (BGR order, per OpenCV convention).
        lime = (0, 255, 0)
        cyan = (255, 255, 0)
        white = (255, 255, 255)
        black = (0, 0, 0)
        # Annotate each person if exists
        if "person" in aggregate_json:
            for r in aggregate_json["person"]["regions"]:
                s = ("Person [c:%0.2f]" % r["accuracy"])
                img = annotate_bbox(img, r, cyan)
                img = annotate_label(img, r, s, cyan)
        # Annotate each bib region that has at least one recognised number
        for r in aggregate_json["bib"]["regions"]:
            rbns = r.get("rbns", [])
            if len(rbns) == 0:
                continue
            s = ("Bib [#:%s][c:%0.2f]" % (",".join(rbns), r["accuracy"]))
            img = annotate_bbox(img, r, lime)
            img = annotate_label(img, r, s, lime)
        # Flatten region lists with plain comprehensions. The original used
        # np.array(...).flatten(), which does NOT flatten ragged nested lists
        # (leaving list elements where dicts were expected) and raises
        # ValueError outright on numpy >= 1.24.
        all_txt_regions = [r for txt in aggregate_json["text"] for r in txt["regions"]]
        all_ocr_regions = [r for ocr in aggregate_json["ocr"] for r in ocr["regions"]]
        # Annotate each char region
        for r in all_ocr_regions:
            s = ("%s" % r["char"])
            img = annotate_bbox(img, r, black)
            img = annotate_label(img, r, s, black, white)
        # Annotate each text region, padded so the box clears the characters
        for r in all_txt_regions:
            s = ("Text [c:%0.2f]" % r["accuracy"])
            bbox = {
                "x1": r["x1"] - 18,
                "x2": r["x2"] + 18,
                "y1": r["y1"] - 18,
                "y2": r["y2"] + 18,
                "accuracy": r["accuracy"]
            }
            img = annotate_bbox(img, bbox, white)
            img = annotate_label(img, bbox, s, white)
        # Statistics (plain sums; no need to round-trip through numpy)
        all_txt_runtime = sum(txt["elapsed_seconds"] for txt in aggregate_json["text"])
        all_ocr_runtime = sum(ocr["elapsed_seconds"] for ocr in aggregate_json["ocr"])
        aggregate_json["stats"] = {
            "num_regions": {
                "bib": len(aggregate_json["bib"]["regions"]),
                "text": len(all_txt_regions),
                "ocr": len(all_ocr_regions)
            },
            "runtime": {
                "bib": aggregate_json["bib"]["elapsed_seconds"],
                "text": all_txt_runtime,
                "ocr": all_ocr_runtime
            }
        }
        # Add person stats if exists
        if "person" in aggregate_json:
            aggregate_json["stats"]["runtime"]["person"] = aggregate_json["person"]["elapsed_seconds"]
            aggregate_json["stats"]["num_regions"]["person"] = len(aggregate_json["person"]["regions"])
        # Now finally spit everything out!
        if len(aggregate_json["text"]) == 0 or len(aggregate_json["ocr"]) == 0:
            print("No annotations to be made for '%s' - no text detections. Skipping..." % image_id)
            continue
        # Indicate all json output should now be the updated aggregate
        all_json[image_id] = aggregate_json
    # Output all jpegs (annotated in place above where detections existed)
    for (image_id, img) in all_jpgs.items():
        out_jpeg_file = ("%s/%s.jpg" % (out_dir, image_id))
        print("Writing annotated JPEG '%s' to '%s'" % (image_id, out_jpeg_file))
        cv2.imwrite(out_jpeg_file, img)
    # Writeout global stats file
    out_json_file = ("%s/results.json" % out_dir)
    print("Writing results JSON to '%s'" % out_json_file)
    with open(out_json_file, 'w') as f:
        json.dump(all_json, f)
# Run only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()