Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracking.py full functionality #15

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 90 additions & 35 deletions acquisition/tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,20 @@
import math

# --------------------------- Constants Definition ---------------------------------------------- #
# NOTE(review): the scraped diff carried both the deleted and the added
# constant lines; this is the post-PR (added) configuration only.
VIDEO_FILE = "Sample Video/8-18Trial19.avi"
VIEW_TYPE = "image"  # Options: "binary", "image"
POOR_TRACKING = False  # Ignores index of the particle if varying (Only for single particle!)
FPS = 20  # frame rate used to convert seconds to frame indices
CHANGE_INTERVAL = 5  # seconds per collection window (see prepare_frame)
POINTS_TO_IMAGE = []  # Frames to capture
SAMPLE_FRAMES = 15  # frames averaged into one data point
BIN_THRESH = 26  # presumably the binarization threshold — used in a hunk not visible here
X_RANGE = (900, 1300)  # horizontal crop bounds (pixels)
Y_RANGE = (547, 933)  # vertical crop bounds (pixels, measured from image bottom)
BOTTOM_BAR = 100  # thickness of the bottom edge-erasure rectangle
TOP_BAR = 0
LEFT_BAR = 0
RIGHT_BAR = 0
INDEX_LIST = []  # Indices of desired tracking particles


# ------------------------------- Helper Functions ---------------------------------------------- #

Expand All @@ -30,30 +27,28 @@ def initialize_video(cap):
filling_kernel = np.ones((2, 2), np.uint8)
return total_frames, cleaning_kernel, filling_kernel


def setup_detector():
    """Build a SimpleBlobDetector tuned for small bright particles.

    Keeps only the color filter (white blobs, value 255) and the area
    filter (5-300 px); circularity, convexity and inertia filters are
    switched off.  The inertia-ratio bounds are still set, matching the
    original configuration, even though that filter is disabled.
    """
    params = cv2.SimpleBlobDetector.Params()
    settings = (
        ("filterByColor", True),
        ("blobColor", 255),
        ("filterByArea", True),
        ("minArea", 5),
        ("maxArea", 300),
        ("filterByCircularity", False),
        ("filterByConvexity", False),
        ("filterByInertia", False),
        ("minInertiaRatio", 0.01),
        ("maxInertiaRatio", 0.3),
    )
    for attribute, value in settings:
        setattr(params, attribute, value)
    return cv2.SimpleBlobDetector_create(params)


def get_frame(cap, got_frame_num):
    """Seek *cap* to frame index *got_frame_num* and read it.

    Returns the ``(success_flag, frame)`` pair produced by ``cap.read()``.
    """
    cap.set(cv2.CAP_PROP_POS_FRAMES, got_frame_num)
    return cap.read()


def frame_dimensions(cap, frame_num):
ret, start_frame = get_frame(cap, frame_num)
start_frame_dim = start_frame.shape
Expand All @@ -62,22 +57,15 @@ def frame_dimensions(cap, frame_num):
y_start, y_end = (imageheight - Y_RANGE[1]), (imageheight - Y_RANGE[0])
return x_start, x_end, y_start, y_end

def gen_initial_frame(cap, display_frame):
    """Compute the crop window from frame 1, optionally displaying it.

    When *display_frame* is true the cropped first frame is shown in a
    window named "Frame".  Returns the (x_start, x_end, y_start, y_end)
    crop bounds.

    NOTE(review): the scraped diff contained both the old zero-argument
    variant and this new one; only the new (added) version is kept here,
    and the non-idiomatic ``== True`` comparison is simplified.
    """
    total_frames, cleaning_kernel, filling_kernel = initialize_video(cap)
    x_start, x_end, y_start, y_end = frame_dimensions(cap, 1)
    ret, start_frame = get_frame(cap, 1)
    if display_frame:
        cv2.imshow("Frame", start_frame[y_start:y_end, x_start:x_end])
    return x_start, x_end, y_start, y_end

def define_blockers(frame_num, cap=None):
    """Compute the corner points of the four edge-erasure rectangles.

    The bar constants (TOP_BAR, LEFT_BAR, RIGHT_BAR, BOTTOM_BAR) give the
    thickness of each masking strip inside the cropped region.  Returns an
    8-tuple of (pt1, pt2) pairs in top, left, right, bottom order.

    Fix: ``frame_dimensions`` takes ``(cap, frame_num)``, but this function
    previously called it with ``frame_num`` alone, which always raised
    TypeError.  *cap* is added as a backward-compatible keyword parameter
    and forwarded.
    """
    x_start, x_end, y_start, y_end = frame_dimensions(cap, frame_num)
    ylength = y_end - y_start
    xlength = x_end - x_start
    # 1616 x 1240 appears to be the full sensor size — TODO confirm against camera.
    top_rect_pt1, top_rect_pt2 = (0, 0), (1616, TOP_BAR)
    left_rect_pt1, left_rect_pt2 = (0, 0), (LEFT_BAR, 1240)
    right_rect_pt1, right_rect_pt2 = (xlength - RIGHT_BAR, 0), (xlength, 1240)
    bottom_rect_pt1, bottom_rect_pt2 = (0, ylength - BOTTOM_BAR), (1616, ylength)
    return (top_rect_pt1, top_rect_pt2, left_rect_pt1, left_rect_pt2,
            right_rect_pt1, right_rect_pt2, bottom_rect_pt1, bottom_rect_pt2)

def post_processing(cap, frame, frame_num):
x_start, x_end, y_start, y_end = frame_dimensions(cap, frame_num)
Expand All @@ -102,15 +90,17 @@ def post_processing(cap, frame, frame_num):
cv2.rectangle(clean_thresh, right_rect_pt1, right_rect_pt2, rectangle_color, -1) # Right Erasure
cv2.rectangle(clean_thresh, bottom_rect_pt1, bottom_rect_pt2, rectangle_color, -1) # Bottom erasure
closing = cv2.morphologyEx(clean_thresh, cv2.MORPH_CLOSE, filling_kernel, iterations=2)
return roi_frame, closing, clean_thresh
return roi_frame, closing, clean_thresh, x_start, x_end, y_start, y_end


def setup_tracker():
    """Return fresh tracking state.

    Yields a 3-tuple of (tracking_objects dict keyed by track id,
    next track id counter starting at 0, empty previous-frame keypoints).
    """
    return {}, 0, []

def locate_particles(roi_frame, closing, keypoints_prev_frame, frame_num, tracking_objects, track_id, y_end, y_start):

def locate_particles(roi_frame, closing, keypoints_prev_frame, frame_num, tracking_objects, track_id, y_end, y_start, INDEX_LIST):
detector = setup_detector()
keypoints = detector.detect(closing)
keypoints_cur_frame = []
Expand Down Expand Up @@ -151,16 +141,29 @@ def locate_particles(roi_frame, closing, keypoints_prev_frame, frame_num, tracki
for key in tracking_objects.keys():
if x <= tracking_objects[key][0][0] <= x + w and y <= tracking_objects[key][0][1] <= y + h:
tracking_objects[key].append(h)
if frame_num >= 2:
if frame_num >= 2 and POOR_TRACKING == False:
try:
if len(tracking_objects.keys()) > 0:
try:
x_position, y_position, height = int(tracking_objects[0][0][0]), int(tracking_objects[0][0][1]), int(tracking_objects[0][1])
x_position, y_position, height = int(tracking_objects[0][0][0]), int(
tracking_objects[0][0][1]), int(tracking_objects[0][1])
except KeyError:
pass
except KeyError or IndexError:
pass
return x_position, y_position, height
if frame_num >= 2 and POOR_TRACKING == True:
try:
for i in range(1000):
if len(tracking_objects.keys()) > 0 and tracking_objects[i][0][0] > 0:
x_position, y_position, height = int(tracking_objects[i][0][0]), int(
tracking_objects[i][0][1]), int(tracking_objects[i][1])
else:
pass
except KeyError or IndexError:
pass

return x_position, y_position, height, image_with_keypoints


def analyze_trial(datapoint):
x, y, h = [], [], []
Expand All @@ -171,39 +174,91 @@ def analyze_trial(datapoint):
xav, yav, hav = round(np.mean(x), 2), round(np.mean(y), 2), round(np.mean(h), 2)
return xav, yav, hav

def auto_run(cap):

def prepare_frame(cap):
    """Initialise all per-run state shared by auto_run() and run_frame().

    Returns the total frame count, tracker state, crop bounds, the frame
    indices at which data collection starts and stops, and empty
    trial/datapoint accumulators.

    NOTE(review): the scraped diff duplicated the gen_initial_frame call
    (old one-argument form plus the new two-argument form); only the new
    call is kept.
    """
    total_frames, cleaning_kernel, filling_kernel = initialize_video(cap)
    tracking_objects, track_id, keypoints_prev_frame = setup_tracker()
    # Crop bounds only; the initial frame is not displayed here.
    x_start, x_end, y_start, y_end = gen_initial_frame(cap, False)
    # Start each sample burst 40% of the way into every CHANGE_INTERVAL-second
    # window; each burst lasts SAMPLE_FRAMES frames.
    collection_frames = [int((FPS * CHANGE_INTERVAL * i) + (FPS * CHANGE_INTERVAL * 0.4)) for i in range(100)]
    end_collection_frames = [cf + SAMPLE_FRAMES for cf in collection_frames]
    trial, datapoint = [], []
    collect_data = False
    return total_frames, keypoints_prev_frame, tracking_objects, track_id, y_end, y_start, collection_frames, end_collection_frames, trial, collect_data, datapoint


def auto_run(cap):
    """Process the whole video, saving one averaged data point per window.

    Frames inside each collection window are accumulated into *datapoint*;
    when the window closes the points are averaged by analyze_trial() and
    persisted via save_data().

    NOTE(review): the scraped diff kept the deleted old-signature calls to
    post_processing/locate_particles alongside the new ones; the stale
    duplicates are removed here.
    """
    total_frames, keypoints_prev_frame, tracking_objects, track_id, y_end, y_start, collection_frames, end_collection_frames, trial, collect_data, datapoint = prepare_frame(
        cap)
    for frame_num in range(total_frames):
        ret, frame = get_frame(cap, frame_num)
        roi_frame, closing, clean_thresh, x_start, x_end, y_start, y_end = post_processing(cap, frame, frame_num)
        x, y, h, image_with_keypoints = locate_particles(roi_frame, closing, keypoints_prev_frame, frame_num, tracking_objects, track_id, y_end, y_start, INDEX_LIST)
        if frame_num in collection_frames:
            collect_data = True
        if frame_num in end_collection_frames:
            collect_data = False
            xav, yav, hav = analyze_trial(datapoint)
            trial.append([xav, yav, hav])
            save_data(yav, hav, y_start, y_end, frame_num)
            datapoint = []  # reset the accumulator for the next window
        if collect_data and x != "NaN":
            datapoint.append([x, y, h])


def run_frame(cap, frame_num):
    """Process and display a single frame; return the next frame index.

    The window shown depends on VIEW_TYPE: "image" shows the annotated
    frame, "binary" shows the cleaned binary mask.
    """
    (total_frames, keypoints_prev_frame, tracking_objects, track_id, y_end,
     y_start, collection_frames, end_collection_frames, trial, collect_data,
     datapoint) = prepare_frame(cap)
    ret, frame = get_frame(cap, frame_num)
    roi_frame, closing, clean_thresh, x_start, x_end, y_start, y_end = post_processing(cap, frame, frame_num)
    x, y, h, image_with_keypoints = locate_particles(roi_frame, closing, keypoints_prev_frame, frame_num,
                                                     tracking_objects, track_id, y_end, y_start, INDEX_LIST)
    if VIEW_TYPE == "image":
        # y_start:y_end, x_start:x_end
        cv2.imshow("drawn_frame", image_with_keypoints[0:1240, 0:1616])
    elif VIEW_TYPE == "binary":
        cv2.imshow("binary", closing)
    return frame_num + 1


def save_data(yav, hav, y_start, y_end, frame_num):
    """Append one averaged [height, micromotion] pair to Tuple.txt.

    *yav* is measured from the top of the ROI, so it is flipped to a
    bottom-origin value before writing.  Early in a run (frame_num < 150)
    an existing non-empty Tuple.txt makes the user type "continue" before
    data is appended, guarding against accidentally mixing runs.

    Fix: the append and create branches previously duplicated the write
    logic verbatim; it is factored into a single local writer.
    """
    def _write(mode):
        # Shared writer for both the append ('a') and create ('w') paths.
        with open('Tuple.txt', mode) as f:
            yav_oriented = (y_end - y_start) - yav
            f.write('[' + str(round(yav_oriented, 2)) + ', ' + str(round(hav, 2)) + ']\n')
            print("Saved: " + str(yav_oriented) + ', ' + str(hav))

    try:
        if os.stat('Tuple.txt').st_size != 0 and frame_num < 150:
            acknowledgement = ""
            while acknowledgement != "continue":
                acknowledgement = input(
                    'Tuple.txt already contains data. Type "continue" to add to the existing file, otherwise stop. ')
            print("\ncontinuing...")
        _write('a')
    except FileNotFoundError:
        _write('w')


# --------------------------- Main Functionality ---------------------------------------------- #
def main():
    """Entry point.

    Shows the first cropped frame, then reads one key: Esc quits, Space
    runs the whole video via auto_run(), and any other key enters manual
    mode, advancing one frame per keypress (Esc quits there too).

    NOTE(review): the scraped diff duplicated the gen_initial_frame call
    (old one-argument form plus the new two-argument form); only the new
    call is kept, and the dead trailing ``if key != 27: pass`` is removed.
    """
    cap = cv2.VideoCapture(VIDEO_FILE)
    x_start, x_end, y_start, y_end = gen_initial_frame(cap, True)
    key = cv2.waitKey()
    if key == 27:  # Esc: quit immediately
        exit()
    if key == 32:  # Space: automatic processing of the full video
        auto_run(cap)
    else:
        # Manual mode: step through the video one frame per keypress.
        cv2.destroyAllWindows()
        frame_num = 0
        for i in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            frame_num = run_frame(cap, frame_num)
            key = cv2.waitKey()
            if key == 27:
                exit()


if __name__ == "__main__":
    main()