-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmetrabsPose3D_TF.py
325 lines (260 loc) · 12.1 KB
/
metrabsPose3D_TF.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
import argparse
import os
import re
import sys
import cv2
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_hub as hub
import toml
from tqdm import tqdm
from trc import TRCData
def _str2bool(value):
    """Parse a CLI string as a boolean.

    BUGFIX: the original used ``type=bool``, under which any non-empty
    string — including ``"False"`` — is truthy, so ``--DEBUG False``
    silently enabled debug mode.
    """
    if isinstance(value, bool):
        return value
    if value.lower() in ('true', 't', 'yes', 'y', '1'):
        return True
    if value.lower() in ('false', 'f', 'no', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError(f"Boolean value expected, got {value!r}")


parser = argparse.ArgumentParser(description='Metrabs 2D Pose Estimation for iDrink using Tensorflow')
parser.add_argument('--identifier', metavar='id', type=str, help='Trial Identifier')
parser.add_argument('--video_file', metavar='dvi', type=str,
                    help='Path to folder containing videos for pose estimation')
parser.add_argument('--calib_file', metavar='c', type=str, help='Path to calibration file')
parser.add_argument('--dir_out_video', metavar='dvo', type=str, help='Path to folder to save output videos')
parser.add_argument('--dir_out_trc', metavar='dtrc', type=str, help='Path to folder to save output trc files')
parser.add_argument('--skeleton', metavar='skel', type=str, default='coco_19',
                    help='Skeleton to use for pose estimation, Default: coco_19')
parser.add_argument('--model_path', metavar='m', type=str, help='Path to the model to use for pose estimation')
parser.add_argument('--verbose', metavar='v', type=int, default=1, help='Verbose Mode (0, 1, 2), Default: 1')
parser.add_argument('--DEBUG', metavar='d', type=_str2bool, default=False, help='Debug Mode, Default: False')
def filter_df(df_unfiltered, fs, verbose, normcutoff=False):
    """
    Apply a zero-phase low-pass Butterworth filter to the 3D keypoints in the
    DataFrame and return the filtered DataFrame.

    Each column is expected to hold one list-like 3D coordinate per frame (as
    produced by add_to_dataframe); filtering runs along the frame axis.

    :param df_unfiltered: DataFrame with 3D coordinates (one column per marker)
    :param fs: Sampling frequency in Hz
    :param verbose: Verbosity level (0 silent, 1 progress bar, 2 details)
    :param normcutoff: If True, normalize the cutoff by the Nyquist frequency
                       and design the filter without the ``fs`` argument
    :return: DataFrame with filtered 3D coordinates
    """
    from scipy.signal import butter, sosfiltfilt

    df_filtered = pd.DataFrame(columns=df_unfiltered.columns)

    cutoff = 5
    # Desired effective order is 4: sosfiltfilt runs the filter forward and
    # backward, which doubles the order, so only half is passed to butter().
    order = 2

    nyquist = 0.5 * fs
    if cutoff >= nyquist:
        if verbose >= 1:
            print(f"Warning: Cutoff frequency {cutoff} is higher than Nyquist frequency {nyquist}.")
            print("Filtering with Nyquist frequency.")
        cutoff = int(nyquist - 1)

    # BUGFIX: the original only designed the filter in the non-normalized
    # branch, so normcutoff=True left `sos` undefined and sosfiltfilt raised
    # a NameError. Design the filter in both branches.
    if normcutoff:
        sos = butter(order, cutoff / nyquist, btype="low", analog=False, output="sos")
    else:
        sos = butter(order, cutoff, btype="low", analog=False, output="sos", fs=fs)

    if verbose >= 2:
        print(f"Filtering 3D keypoints:\n"
              f"Filter: Butterworth\n"
              f"Order: {order}\n"
              f"Sampling frequency: {fs} Hz\n"
              f"cutoff frequency of {cutoff} Hz")
    if verbose >= 1:
        progress = tqdm(total=len(df_unfiltered.columns), desc="Filtering 3D keypoints", position=0, leave=True)

    for column in df_unfiltered.columns:
        data = np.array(df_unfiltered[column].tolist())
        df_filtered[column] = sosfiltfilt(sos, data, axis=0).tolist()
        if verbose >= 1:
            progress.update(1)
    if verbose >= 1:
        progress.close()

    return df_filtered
def df_to_trc(df, trc_file, identifier, fps, n_frames, n_markers, verbose=1):
    """
    Convert the DataFrame to a .trc file following the Pose2Sim nomenclature
    and write it to the given path.

    :param df: DataFrame with 3D coordinates (one column per marker, one row per frame)
    :param trc_file: Output path of the .trc file
    :param identifier: Trial identifier written into the FileName header field
    :param fps: Frame rate of the recording
    :param n_frames: Number of frames
    :param n_markers: Number of markers (joints)
    :param verbose: Verbosity level (0 silent, 1 progress, 2 details)
    """
    trc = TRCData()

    if verbose >= 1:
        print(f"Writing .trc file {os.path.basename(trc_file)}")

    # Header fields expected by the Pose2Sim / OpenSim .trc format.
    trc['PathFileType'] = '4'
    trc['DataFormat'] = '(X/Y/Z)'
    trc['FileName'] = f'{identifier}.trc'
    trc['DataRate'] = fps
    trc['CameraRate'] = fps
    trc['NumFrames'] = n_frames
    trc['NumMarkers'] = n_markers
    trc['Units'] = 'm'
    trc['OrigDataRate'] = fps
    trc['OrigDataStartFrame'] = 0
    trc['OrigNumFrames'] = n_frames
    trc['Frame#'] = [i for i in range(n_frames)]
    trc['Time'] = [round(i / fps, 3) for i in range(n_frames)]
    trc['Markers'] = df.columns.tolist()

    # BUGFIX: the original wrote every marker column twice (a plain loop
    # immediately followed by an identical tqdm loop) and showed the progress
    # bar regardless of `verbose`. Write each column exactly once.
    columns = tqdm(df.columns, desc="Writing .trc file", position=0, leave=True) if verbose >= 1 else df.columns
    for column in columns:
        trc[column] = df[column].tolist()
    if verbose >= 2:
        print(f'columns added to {os.path.basename(trc_file)}')

    # One entry per frame index: [timestamp, list of all marker coordinates].
    for i in range(n_frames):
        trc[i] = [trc['Time'][i], df.iloc[i, :].tolist()]
    if verbose >= 2:
        print(f'Timestamps added to {os.path.basename(trc_file)}')

    trc.save(trc_file)
    if verbose >= 1:
        print(f"Saved .trc file {os.path.basename(trc_file)}")
def get_column_names(joint_names, verbose=1):
    """
    Build DataFrame column names from the list of joint names.

    Each joint currently maps to a single column that holds its [x, y, z]
    coordinate list, so the column names are simply the joint names.

    :param joint_names: Iterable of joint names
    :param verbose: Verbosity level; >= 2 prints the resulting columns
    :return: List of column names
    """
    # Old version might be used if x, y, z coordinates need to be split up
    # into separate DataFrame columns:
    #   columns = [f"{joint}_{axis}" for joint in joint_names for axis in ('x', 'y', 'z')]
    columns = [joint for joint in joint_names]
    if verbose >= 2:
        print(f'Columns created: {columns}')
    return columns
def add_to_dataframe(df, pose_result_3d):
    """
    Append one frame of 3D keypoints to the DataFrame as a new row,
    converting coordinates from millimetres to metres.

    :param df: DataFrame with one column per joint
    :param pose_result_3d: Array of shape (1, n_joints, 3) with the detected pose
    :return: DataFrame with the added row
    """
    n_joints = pose_result_3d.shape[1]
    # Millimetres -> metres for every coordinate of every joint.
    row = [[coord / 1000 for coord in pose_result_3d[0][joint].tolist()]
           for joint in range(n_joints)]
    df.loc[len(df)] = row
    return df
def metrabs_pose_estimation_3d(video_file, calib_file, dir_out_video, dir_out_trc, model_path, identifier,
                               skeleton='coco_19', verbose=1, DEBUG=False):
    """
    3D pose estimation using Metrabs (TensorFlow version).

    Runs the model frame-by-frame over the video and saves the 3D coordinates
    into .trc files (one filtered, one unfiltered) following the Pose2Sim
    nomenclature.

    :param video_file: Path to the input video (filename must contain "cam<N>")
    :param calib_file: Path to the .toml calibration file
    :param dir_out_video: Output folder for videos (created if missing)
    :param dir_out_trc: Output folder for the .trc files (created if missing)
    :param model_path: Path to the Metrabs model, loadable by tensorflow_hub
    :param identifier: Trial identifier written into the .trc headers
    :param skeleton: Skeleton convention used by the model, e.g. 'coco_19'
    :param verbose: Verbosity level (0 silent, 1 progress, 2 details)
    :param DEBUG: If True, stop after 30 frames (development shortcut)
    :raises ValueError: If no camera name can be parsed from the video filename
    """
    try:
        if verbose >= 1:
            print("loading HPE model")
        model = hub.load(model_path)
    # BUGFIX: the original bare `except:` also swallowed KeyboardInterrupt
    # and SystemExit; only fall back to downloading on real load failures.
    except Exception:
        tmp = os.path.join(os.getcwd(), 'metrabs_models')
        # tmp = input("Loading model failed. The model will be donwloaded. Please give a path to save the model.") ##If we want to give the choice of the path to the user
        if not os.path.exists(tmp):
            os.makedirs(tmp)
        # TFHUB_CACHE_DIR tells tensorflow_hub where to cache the download.
        os.environ['TFHUB_CACHE_DIR'] = tmp
        model = hub.load(
            'https://bit.ly/metrabs_l')  # To load the model from the internet and save it in a given tmp folder

    # Check if the output directory exists, if not create it
    if not os.path.exists(dir_out_video):
        os.makedirs(dir_out_video)

    calib = toml.load(calib_file)

    ##################################################
    ############# OPENING THE VIDEO ################
    cap = cv2.VideoCapture(video_file)
    if not cap.isOpened():
        print("Could not open file")
        sys.exit(1)  # BUGFIX: exit() is interactive-only; use sys.exit

    # Camera intrinsics are looked up in the calibration file by the camera
    # name embedded in the video filename (e.g. "cam1_trial_...").
    match = re.search(r"cam\d*", os.path.basename(video_file))
    if match is None:
        # BUGFIX: the original called .group() on None (AttributeError).
        raise ValueError(f"No camera name (cam<N>) found in {os.path.basename(video_file)}")
    cam = match.group()
    intrinsic_matrix = None
    distortions = None
    for key in calib.keys():
        if calib.get(key).get("name") == cam:
            intrinsic_matrix = tf.constant(calib.get(key).get("matrix"), dtype=tf.float32)
            # NOTE(review): distortions are parsed but currently unused by
            # detect_poses below — kept for future undistortion support.
            distortions = tf.constant(calib.get(key).get("distortions"), dtype=tf.float32)

    joint_names = model.per_skeleton_joint_names[skeleton].numpy().astype(str)

    # Initializing variables for the loop
    frame_idx = 0
    df = pd.DataFrame(columns=get_column_names(joint_names))

    n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    n_markers = len(joint_names)
    if verbose >= 2:
        print(f"Number of frames: {n_frames}")
        print(f"FPS: {fps}")
        print(f"Number of markers: {n_markers}")
    if verbose >= 1:
        progress = tqdm(total=n_frames, desc=f"Processing {os.path.basename(video_file)}", position=0, leave=True)

    while True:
        ret, frame = cap.read()
        # If frame is read correctly ret is True
        if not ret:
            break
        # Early stop for development runs.
        if DEBUG and frame_idx == 30:
            break

        # Re-encode the frame as JPEG and decode it into a tensor — the input
        # format expected by the Metrabs model.
        _, frame = cv2.imencode('.jpg', frame)
        frame = tf.image.decode_jpeg(frame.tobytes(), channels=3)

        ##############################################
        ################## DETECTION #################
        pred = model.detect_poses(frame, intrinsic_matrix=intrinsic_matrix, skeleton=skeleton)
        pose_result_3d = pred['poses3d'].numpy()

        ################## Add to DataFrame #################
        df = add_to_dataframe(df, pose_result_3d)

        frame_idx += 1
        if verbose >= 1:
            progress.update(1)

    # Release the VideoCapture object and close progressbar
    cap.release()
    if verbose >= 1:
        progress.close()

    if verbose >= 2:
        print(f'Call filtering function')
    df_filt = filter_df(df, fps, verbose)

    if not os.path.exists(dir_out_trc):
        os.makedirs(dir_out_trc)
    trc_file_filt = os.path.join(dir_out_trc,
                                 f"{os.path.basename(video_file).split('.mp4')[0]}_0-{frame_idx}_filt_iDrinkbutter.trc")
    trc_file_unfilt = os.path.join(dir_out_trc,
                                   f"{os.path.basename(video_file).split('.mp4')[0]}_0-{frame_idx}_unfilt_iDrink.trc")

    df_to_trc(df_filt, trc_file_filt, identifier, fps, n_frames, n_markers)
    df_to_trc(df, trc_file_unfilt, identifier, fps, n_frames, n_markers)

    if verbose >= 2:
        print(f'3D Pose Estimation done and .trc files saved to {dir_out_trc}')
if __name__ == '__main__':
    args = parser.parse_args()

    # Debug defaults: active when running under a debugger or with --DEBUG.
    if sys.gettrace() is not None or args.DEBUG:
        print("Debug Mode is activated\n"
              "Starting debugging script.")

        # BUGFIX: the original assigned hub.load(<path>) — an already-loaded
        # model object — to args.model_path, but metrabs_pose_estimation_3d
        # calls hub.load(model_path) itself, so it must receive the path string.
        if os.name == 'posix':  # if running on WSL
            args.identifier = "S20240501-115510_P01_T01"
            args.model_path = "/mnt/c/iDrink/metrabs_models/tensorflow/metrabs_eff2l_y4_384px_800k_28ds/d8503163f1198d9d4ee97bfd9c7f316ad23f3d90"
            args.video_file = "/mnt/c/iDrink/Session Data/S20240501-115510/S20240501-115510_P07/S20240501-115510_P07_T44/videos/recordings/cam1_trial_44_R_affected.mp4"
            args.dir_out_video = "/mnt/c/iDrink/Session Data/S20240501-115510/S20240501-115510_P07/S20240501-115510_P07_T44/videos/pose"
            args.dir_out_trc = "/mnt/c/iDrink/Session Data/S20240501-115510/S20240501-115510_P07/S20240501-115510_P07_T44/pose-3d"
            args.calib_file = "/mnt/c/iDrink/Session Data/S20240501-115510/S20240501-115510_Calibration/Calib_S20240501-115510.toml"
            args.skeleton = 'coco_19'
            args.verbose = 1
        else:
            args.identifier = "S20240501-115510_P01_T01"
            args.model_path = r"C:\iDrink\metrabs_models\tensorflow\metrabs_eff2l_y4_384px_800k_28ds\d8503163f1198d9d4ee97bfd9c7f316ad23f3d90"
            args.video_file = r"C:\iDrink\Session Data\S20240501-115510\S20240501-115510_P07\S20240501-115510_P07_T44\videos\recordings\cam1_trial_44_R_affected.mp4"
            args.dir_out_video = r"C:\iDrink\Session Data\S20240501-115510\S20240501-115510_P07\S20240501-115510_P07_T44\videos\pose"
            args.dir_out_trc = r"C:\iDrink\Session Data\S20240501-115510\S20240501-115510_P07\S20240501-115510_P07_T44\pose-3d"
            args.calib_file = r"C:\iDrink\Session Data\S20240501-115510\S20240501-115510_Calibration\Calib_S20240501-115510.toml"
            args.skeleton = 'coco_19'
            args.verbose = 1

    metrabs_pose_estimation_3d(args.video_file, args.calib_file, args.dir_out_video, args.dir_out_trc, args.model_path,
                               args.identifier, args.skeleton, args.verbose, args.DEBUG)