diff --git a/README.md b/README.md index 42d5891..1ac4977 100644 --- a/README.md +++ b/README.md @@ -28,9 +28,24 @@ sh launch.sh ## ROS Interface Input / output topics and format +Input topics : +- `[namespace]/rgb` : `Image` message (`bgr8` format) +- `[namespace]/depth` : `Image` message (`mono16` format) +- `[namespace]/pcl` : `PointCloud2` message (`.data` buffer is expected to contain 3 or 6 fields of `float32` such that it can be reshaped to a numpy array of size `[image_height, image_width, n_fields]` that offers pixel correspondence with the `rgb` and `depth` image) + +Output topics : +- `[namespace]/human` : `TransformStamped` message with detected human position + +Output tf : +- `TransformStamped` with child frame id `[namespace]/human` + ## Options Flags to run with options +See `python rgbd_detect_3d_dir.py -h` + +Use `export VERBOSE=0` to set minimal verbosity (warnings, debug, errors and success). Level 1 adds info and 2 adds timing info. + ## Acknowledgments MMDet, MMPose, 6DRepNet... diff --git a/rgbd_detect_3d_dir.py b/rgbd_detect_3d_dir.py index bbbc26e..ae1039a 100644 --- a/rgbd_detect_3d_dir.py +++ b/rgbd_detect_3d_dir.py @@ -14,7 +14,6 @@ try: from mmdet.apis import inference_detector, init_detector - has_mmdet = True except (ImportError, ModuleNotFoundError): has_mmdet = False @@ -37,38 +36,40 @@ import warnings import numpy as np from PyKDL import Rotation -import copy -# import imageio from PIL import Image as PILImage +# custom utils import from utils import * +# torch import import torch import torch.nn as nn import torch.nn.functional as F from torch.utils.data import DataLoader -try: - has_mb = True - # motion bert import - from lib.utils.tools import * - from lib.utils.learning import * - from lib.utils.utils_data import flip_data - from lib.data.dataset_wild import WildDetDataset - from lib.utils.vismo import render_and_save -except: - has_mb = False - prWarning("No MotionBERT import, fail") +# # MotionBERT +# try: +# has_mb = True +# # motion bert import +# from lib.utils.tools import * +# from lib.utils.learning import * +# from lib.utils.utils_data import flip_data +# from lib.data.dataset_wild import WildDetDataset +# from lib.utils.vismo import render_and_save +# except: +# has_mb = False +# prWarning("No MotionBERT import, fail") -try: - # gafa import (optional) - has_gafa = True - from gafa_utils import body_transform, head_transform, head_transform_rest, normalize_img, body_transform_from_bb, normalize_img_torch, head_transform_face - from gafa.models.gazenet import GazeNet -except: - has_gafa = False - prWarning("No GAFA import, fail") - from gafa_utils import body_transform, head_transform, head_transform_rest, normalize_img, body_transform_from_bb, normalize_img_torch, head_transform_face +# try: +# # gafa import (optional) +# has_gafa = True +# from transform_utils import body_transform, head_transform, head_transform_rest, normalize_img, body_transform_from_bb, normalize_img_torch, head_transform_face +# from gafa.models.gazenet import GazeNet +# except: +# has_gafa = False +# prWarning("No GAFA import, fail") + +from transform_utils import body_transform, head_transform, head_transform_rest, normalize_img, body_transform_from_bb, normalize_img_torch, head_transform_face # 6D Rep import try: @@ -81,14 +82,14 @@ # gaze estimation simple models import -try: - has_gaze_est = True - from gaze_estimation.models import resnet18, mobilenet_v2, mobileone_s0 - from gaze_estimation.utils import pre_process - from gaze_estimation.models import SCRFD
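To make the ROS interface documented in the README hunk above concrete, a minimal client node matching those topic conventions could look like the sketch below. This is not part of the repository: the `my_robot` namespace, node name, and callback names are illustrative. The point-cloud interpretation mirrors `callback_pcl` in `rgbd_detect_3d_dir.py` (float32 buffer reshaped to `[height, width, n_fields]`, xyz in the first 3 fields).

```python
#!/usr/bin/env python
# Hedged sketch of a client node for the topics described in the README.
import rospy
import numpy as np
from sensor_msgs.msg import PointCloud2
from geometry_msgs.msg import TransformStamped

def pcl_cb(msg):
    # same buffer interpretation as callback_pcl in rgbd_detect_3d_dir.py
    pcl = np.frombuffer(msg.data, dtype=np.float32).reshape(msg.height, msg.width, -1)
    xyz = pcl[:, :, :3]
    rospy.loginfo("pcl received with shape {}".format(xyz.shape))

def human_cb(msg):
    t = msg.transform.translation
    rospy.loginfo("human detected at ({:.2f}, {:.2f}, {:.2f})".format(t.x, t.y, t.z))

if __name__ == "__main__":
    rospy.init_node("human_listener")
    rospy.Subscriber("my_robot/pcl", PointCloud2, pcl_cb)
    rospy.Subscriber("my_robot/human", TransformStamped, human_cb)
    rospy.spin()
```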
-except: - has_gaze_est = False - prWarning("No GazeEst import, fail") +# try: +# has_gaze_est = True +# from gaze_estimation.models import resnet18, mobilenet_v2, mobileone_s0 +# from gaze_estimation.utils import pre_process +# from gaze_estimation.models import SCRFD +# except: +# has_gaze_est = False +# prWarning("No GazeEst import, fail") # remove numpy scientific notation @@ -110,61 +111,61 @@ def __init__(self, args): args.pose_config, args.pose_checkpoint, device=args.device.lower() ) - # if enabled, init MotionBERT - if self.args.use_mb: - # init 3d MotionBERT model - prInfo('Initialiazing 3D Pose Lifter {}'.format(args.mb_checkpoint)) - mb_3d_args = get_config(args.mb_3d_config) - self.motionbert_3d_model = load_backbone(mb_3d_args) - if torch.cuda.is_available(): - self.motionbert_3d_model = nn.DataParallel(self.motionbert_3d_model) - self.motionbert_3d_model = self.motionbert_3d_model.cuda() - else: - prError("Expect cuda to be available but is_available returned false") - exit(0) - - prInfo('Loading checkpoint {}'.format(args.mb_checkpoint)) - mb_checkpoint = torch.load(args.mb_checkpoint, map_location=lambda storage, loc: storage) - self.motionbert_3d_model.load_state_dict(mb_checkpoint['model_pos'], strict=True) - self.motionbert_3d_model.eval() - prInfo('Loaded motionbert_3d_model') - # no need for the whole WildDetDataset stuff, just manually make the input trajectories for the tracks - - # if enabled, init GAFA - if self.args.use_gafa: - self.gafa_model = GazeNet(n_frames=self.args.gafa_n_frames) - self.gafa_model.load_state_dict(torch.load( - self.args.gafa_checkpoint)) #, map_location=torch.device("cpu"))['state_dict']) - - self.gafa_model.cuda() - self.gafa_model.eval() - - prInfo( - "Loaded GAFA model from {}".format( - self.args.gafa_checkpoint)) + # # if enabled, init MotionBERT + # if self.args.use_mb: + # # init 3d MotionBERT model + # prInfo('Initialiazing 3D Pose Lifter {}'.format(args.mb_checkpoint)) + # mb_3d_args = get_config(args.mb_3d_config) + # self.motionbert_3d_model = load_backbone(mb_3d_args) + # if torch.cuda.is_available(): + # self.motionbert_3d_model = nn.DataParallel(self.motionbert_3d_model) + # self.motionbert_3d_model = self.motionbert_3d_model.cuda() + # else: + # prError("Expect cuda to be available but is_available returned false") + # exit(0) + + # prInfo('Loading checkpoint {}'.format(args.mb_checkpoint)) + # mb_checkpoint = torch.load(args.mb_checkpoint, map_location=lambda storage, loc: storage) + # self.motionbert_3d_model.load_state_dict(mb_checkpoint['model_pos'], strict=True) + # self.motionbert_3d_model.eval() + # prInfo('Loaded motionbert_3d_model') + # # no need for the whole WildDetDataset stuff, just manually make the input trajectories for the tracks + + # # if enabled, init GAFA + # if self.args.use_gafa: + # self.gafa_model = GazeNet(n_frames=self.args.gafa_n_frames) + # self.gafa_model.load_state_dict(torch.load( + # self.args.gafa_checkpoint)) #, map_location=torch.device("cpu"))['state_dict']) + + # self.gafa_model.cuda() + # self.gafa_model.eval() + + # prInfo( + # "Loaded GAFA model from {}".format( + # self.args.gafa_checkpoint)) - # if enabled, init gaze resnet - if self.args.use_gaze_resnet: - self.face_detector = SCRFD(model_path="./gaze_estimation/weights/det_10g.onnx") - self.gaze_estimation_model = resnet18(pretrained = False, num_classes = 90) - state_dict = torch.load("./gaze_estimation/weights/resnet18.pt", map_location=args.device.lower()) - self.gaze_estimation_model.load_state_dict(state_dict) - 
self.gaze_estimation_model.to(args.device.lower()) - self.gaze_estimation_model.eval() - prInfo('Loaded ResNet18 for gaze estimation') + # # if enabled, init gaze resnet + # if self.args.use_gaze_resnet: + # self.face_detector = SCRFD(model_path="./gaze_estimation/weights/det_10g.onnx") + # self.gaze_estimation_model = resnet18(pretrained = False, num_classes = 90) + # state_dict = torch.load("./gaze_estimation/weights/resnet18.pt", map_location=args.device.lower()) + # self.gaze_estimation_model.load_state_dict(state_dict) + # self.gaze_estimation_model.to(args.device.lower()) + # self.gaze_estimation_model.eval() + # prInfo('Loaded ResNet18 for gaze estimation') # if enabled, init 6DRep if self.args.use_six_d_rep: - self.sixdrep_model = torch.load(f='./sixdrep/weights/best.pt', map_location='cuda') + self.sixdrep_model = torch.load(f='./sixdrep/weights/best.pt', map_location='cuda') # TODO: Change path with arg self.sixdrep_model = self.sixdrep_model['model'].float().fuse() - self.sixdrep_detector = FaceDetector('./sixdrep/weights/detection.onnx') + self.sixdrep_detector = FaceDetector('./sixdrep/weights/detection.onnx') # TODO: Load only if use face detector, change path with arg self.sixdrep_model.half() self.sixdrep_model.eval() - # dataset for detection and pose + # dataset object for detection and pose from MMPose self.dataset = self.pose_model.cfg.data["test"]["type"] self.dataset_info = self.pose_model.cfg.data["test"].get( "self.dataset_info", None @@ -186,7 +187,7 @@ def __init__(self, args): self.tracks_in_current_image = {} self.tracks = {} # all the tracks along time, we need to keep and history with some data - # shared variables for the received images and pcl + # shared variables for the received images and pcl self.rgb = None # Image frame self.depth = None # Image frame @@ -208,12 +209,12 @@ def __init__(self, args): self.last_inferred_seq = -1 self.depth_current_seq = -1 self.current_image_count = 0 - self.rgb_frame_id = None # received from ROS image + self.rgb_frame_id = None # received from ROS image, used as frame_id in publisher - # CV Bridge for receiving frames + # CV Bridge for receiving frames self.br = CvBridge() - # Set ROS node rate + # Set ROS node rate prInfo("Setting node rate to {} fps".format(args.fps)) self.loop_rate = rospy.Rate(args.fps) @@ -287,7 +288,6 @@ def callback_pcl(self, msg): (msg.height, msg.width, -1) ) - # pcl_array = pcl_array[::-1, :, :] self.pcl_array_xyz = pcl_array[:, :, :3] # self.pcl_array_rgb = pcl_array[:,:,3:] self.pcl_current_seq = msg.header.seq @@ -296,12 +296,12 @@ def callback_pcl(self, msg): def callback_rgb(self, msg): if self.rgb_frame_id != msg.header.frame_id: self.rgb_frame_id = msg.header.frame_id + if self.args.flip: self.rgb = cv2.flip(self.br.imgmsg_to_cv2(msg, "bgr8"), -1) else: self.rgb = self.br.imgmsg_to_cv2(msg, "bgr8") - # self.rgb = cv2.rotate(self.rgb, cv2.ROTATE_180) self.rgb_current_seq = msg.header.seq # rospy.loginfo('RGB received ({})...'.format(msg.header.seq)) self.rgb_timestamp = msg.header.stamp @@ -316,6 +316,11 @@ def callback_depth(self, msg): # rospy.loginfo('Depth received ({})...'.format(msg.header.seq)) def is_ready(self): + """ Make sure the node is ready to process (i.e. 
it has received an RGB image, a depth image and a PCL) + + Returns: + bool: True if ready + """ ready = ( (self.rgb is not None) and (self.depth is not None) @@ -325,7 +330,6 @@ def is_ready(self): @timeit def save_rgb(self, image_count, image_seq_unique, timestamp): - prWarning("Saving images here may suffer synchronization issues, use visualizer.py for lighter save") rgb_path = os.path.join( self.save_dir_rgb, "{:08d}_seq_{:010d}_ts_{}.png".format( @@ -337,7 +341,6 @@ def save_rgb(self, image_count, image_seq_unique, timestamp): @timeit def save_depth(self, image_count, image_seq_unique, timestamp): - prWarning("Saving images here may suffer synchronization issues, use visualizer.py for lighter save") depth_path = os.path.join( self.save_dir_depth, "{:08d}_seq_{:010d}_ts_{}.png".format( @@ -349,7 +352,6 @@ def save_depth(self, image_count, image_seq_unique, timestamp): @timeit def save_output_image(self, image_count, image_seq_unique, timestamp): - prWarning("Saving images here may suffer synchronization issues, use visualizer.py for lighter save") results_path = os.path.join( self.save_dir_result, "{:08d}_seq_{:010d}_ts_{}.png".format( @@ -361,7 +363,6 @@ def save_output_image(self, image_count, image_seq_unique, timestamp): @timeit def save_pcl(self, image_count, image_seq_unique, timestamp): - prWarning("Saving images here may suffer synchronization issues, use visualizer.py for lighter save") pcl_path = os.path.join( self.save_dir_pcl_bin, "{:08d}_seq_{:010d}_ts_{}.bin".format( @@ -373,6 +374,12 @@ def save_pcl(self, image_count, image_seq_unique, timestamp): @timeit def plot_mmdet_bbox(self, mmdet_results, array_shape): + """ Plot all the bounding box detected in the image by the mmdet model (eg. YOLOv3) if score is superior to bbox_thr. Add classes names (assuming it is from YOLO_COCO_80_CLASSES). + + Args: + mmdet_results (list): list of detections following the format of mmdet output + array_shape (np.array): array describing the output image width and height + """ for c in range(len(mmdet_results)): if len(mmdet_results[c]) > 0: for bi in range(mmdet_results[c].shape[0]): @@ -426,6 +433,15 @@ def plot_mmdet_bbox(self, mmdet_results, array_shape): @timeit def plot_xyxy_person_bbox(self, idx, bbox, array_shape, track, poses_torso = None): + """ Plot the bounding box of detected humans with associated informations + + Args: + idx (int): track if of the detected human + bbox (np.array or list): bounding box of the detected human [x_min, y_min, x_max, y_max, score] + array_shape (np.array or list): array describing the output image width and height + track (dict): current track (human) information, may include gaze orientation information + poses_torso (np.ndarray, optional): position of the body center in camera frame. Defaults to None. 
+ """ bbox_ints = [ int(bbox[0]), int(bbox[1]), @@ -444,10 +460,10 @@ def plot_xyxy_person_bbox(self, idx, bbox, array_shape, track, poses_torso = Non # color_tuple = (int(color[0]), int(color[1]), int(color[2])) color_tuple = (255,255,255) - # yolo score + # yolo score score = bbox[4] - # current gaze + # current gaze if len(track["gaze_yaw_rad"]) > 0: yaw_g = int(np.rad2deg(track["gaze_yaw_rad"][-1])) pitch_g = int(np.rad2deg(track["gaze_pitch_rad"][-1])) @@ -477,7 +493,7 @@ def plot_xyxy_person_bbox(self, idx, bbox, array_shape, track, poses_torso = Non else: pose_body_n = ["Unk", "Unk", "Unk"] - # attention + # attention heat_count = 0 history = 20 length = min(len(track["depth_face"]), history) @@ -487,18 +503,20 @@ def plot_xyxy_person_bbox(self, idx, bbox, array_shape, track, poses_torso = Non pitch = np.rad2deg(track["gaze_pitch_rad"][index]) depth = track["depth_face"][index] - thresh = (int(depth / 1000) + 1) * 3 # 3 deg per meter + thresh = (int(depth / 1000) + 1) * 6 # 6 deg per meter # TODO : set attention condition somewhere else if np.abs(yaw) < thresh and pitch < 0: + # assume we are looking down heat_count += 1 - # suppose we are looking down attention_score = int( min((heat_count * 2 / history), 1) * 100) draw_bbox_with_corners(self.vis_img, bbox_ints, color = color_tuple, thickness = 5, proportion = 0.2) - text = "Person : {}% | Attention : {}%".format(score, attention_score) + text = "ID : {} | Person : {}% | Attention : {}%".format(idx, score, attention_score) if poses_torso is not None and type(pose_body) == np.ndarray: text2 = "Yaw = {} | Pitch = {} | pos = ({:.2f}, {:.2f}, {:.2f})".format(yaw_g, pitch_g, pose_body_n[0], pose_body_n[1], pose_body_n[2]) + else: + text2 = "Pose undefined (shoulder and/or hips not visible) | Yaw = {} | Pitch = {}".format(yaw_g, pitch_g) cv2.putText( self.vis_img, @@ -510,21 +528,29 @@ def plot_xyxy_person_bbox(self, idx, bbox, array_shape, track, poses_torso = Non 1, ) - if poses_torso is not None and type(pose_body) == np.ndarray: - cv2.putText( - self.vis_img, - text2, - (bbox_ints[0], bbox_ints[1] - 15), - cv2.FONT_HERSHEY_SIMPLEX, - 0.35 * TEXT_SCALE, - color_tuple, - 1, - ) - - # cv2.rectangle(self.vis_img, pt1, pt2, color_tuple, 2) + # if poses_torso is not None and type(pose_body) == np.ndarray: + cv2.putText( + self.vis_img, + text2, + (bbox_ints[0], bbox_ints[1] - 15), + cv2.FONT_HERSHEY_SIMPLEX, + 0.35 * TEXT_SCALE, + color_tuple, + 1, + ) @timeit def process_keypoints(self, keypoints, depth_array, idx): + """Iterate over all keypoints detected for the given track and process them, add special keypoints (wrists) to the track informations in self.tracks_in_current_image + + Args: + keypoints (np.ndarray): array of keypoints detected in the image shape is expected to be ([17,3] : COCO17 format with x,y pixel coordinates + score) + depth_array (np.ndarray): array of depth calibrated with the RGB image (depth in mm) + idx (int): id of the current track + + Returns: + List: list of body center joints (arrays of size [3] : x,y pixel coordinates + score), i.e. 
hips and shoulders, only if they are detected with a score > kpt_thr and inside the frame + """ body_center_joints = ( [] ) # to store center of lsho, rsho, lhip, rhip in pixels @@ -670,6 +696,19 @@ def process_keypoints(self, keypoints, depth_array, idx): @timeit def get_depth_and_poses_of_torso(self, depth_array, lsho, rsho, lhip, rhip, idx): + """ Compute the depth and position of the detected human using multiple points on its torso + + Args: + depth_array (np.ndarray): depth image calibrated with RGB image (in mm) + lsho (np.array): array of size [3] with left shoulder keypoint data : x,y pixel coordinates and score + rsho (np.array): array of size [3] with right shoulder keypoint data : x,y pixel coordinates and score + lhip (np.array): array of size [3] with left hip keypoint data : x,y pixel coordinates and score + rhip (np.array): array of size [3] with right hip keypoint data : x,y pixel coordinates and score + idx (int): id of the current description + + Returns: + List, List: lists of valid depth and poses (if depth is 0, ie. pixel is a blank spot in the depth image, then it is discarded) + """ color = RANDOM_COLORS[idx] # color_tuple = (int(color[0]), int(color[1]), int(color[2])) @@ -774,7 +813,12 @@ def plot_body_pose_data(self, body_center, depth_body, pose_body, idx): @timeit def plot_skeleton_2d(self, keypoints, idx): + """Plot skeleton limbds assuming the keypoints are COCO17 format + Args: + keypoints (np.ndarray): array of shape [17,3] with the body keypoints + idx (int): id of the track + """ color = RANDOM_COLORS[idx] # color_tuple = (int(color[0]), int(color[1]), int(color[2])) color_tuple = (255,255,255) @@ -839,81 +883,90 @@ def plot_det_text_info(self, pose_closest): 3, ) - @timeit - def plot_gaze_text_info(self, gaze_res, head_outputs, body_outputs, head_bb_abs, idx): - prediction = gaze_res['direction'] - kappa = gaze_res['kappa'][0, -1].item() - prediction_body = body_outputs['direction'] - prediction_head = head_outputs['direction'] + # @timeit + # def plot_gaze_text_info(self, gaze_res, head_outputs, body_outputs, head_bb_abs, idx): + # prediction = gaze_res['direction'] + # kappa = gaze_res['kappa'][0, -1].item() + # prediction_body = body_outputs['direction'] + # prediction_head = head_outputs['direction'] - prediction_show = prediction.clone().cpu().detach().numpy()[0, -1, :] - prediction_show_body = prediction_body.clone().cpu().detach().numpy()[0, -1, :] - prediction_show_head = prediction_head.clone().cpu().detach().numpy()[0, -1, :] + # prediction_show = prediction.clone().cpu().detach().numpy()[0, -1, :] + # prediction_show_body = prediction_body.clone().cpu().detach().numpy()[0, -1, :] + # prediction_show_head = prediction_head.clone().cpu().detach().numpy()[0, -1, :] - prediction_show_norm = prediction_show / np.linalg.norm(prediction_show) - prediction_show_norm_body = prediction_show_body / np.linalg.norm(prediction_show_body) - prediction_show_norm_head = prediction_show_head / np.linalg.norm(prediction_show_head) + # prediction_show_norm = prediction_show / np.linalg.norm(prediction_show) + # prediction_show_norm_body = prediction_show_body / np.linalg.norm(prediction_show_body) + # prediction_show_norm_head = prediction_show_head / np.linalg.norm(prediction_show_head) - cv2.putText( - self.vis_img, - "Gaze {:.2f} {:.2f} {:.2f} ({:.2f})".format( - prediction_show_norm[0], prediction_show_norm[1], prediction_show_norm[2], kappa - ), - (30, 70), - cv2.FONT_HERSHEY_SIMPLEX, - 1 * TEXT_SCALE, - (255, 255, 255), - 5, - ) + # cv2.putText( + # 
self.vis_img, + # "Gaze {:.2f} {:.2f} {:.2f} ({:.2f})".format( + # prediction_show_norm[0], prediction_show_norm[1], prediction_show_norm[2], kappa + # ), + # (30, 70), + # cv2.FONT_HERSHEY_SIMPLEX, + # 1 * TEXT_SCALE, + # (255, 255, 255), + # 5, + # ) - cv2.putText( - self.vis_img, - "Gaze {:.2f} {:.2f} {:.2f} ({:.2f})".format( - prediction_show_norm[0], prediction_show_norm[1], prediction_show_norm[2], kappa - ), - (30, 70), - cv2.FONT_HERSHEY_SIMPLEX, - 1 * TEXT_SCALE, - (0, 255, 0), - 3, - ) - - - @timeit - def plot_gaze_and_body_dir(self, gaze_res, head_outputs, body_outputs, head_bb_abs, body_bbox): - head_bb_abs[2] += head_bb_abs[0] - head_bb_abs[3] += head_bb_abs[1] + # cv2.putText( + # self.vis_img, + # "Gaze {:.2f} {:.2f} {:.2f} ({:.2f})".format( + # prediction_show_norm[0], prediction_show_norm[1], prediction_show_norm[2], kappa + # ), + # (30, 70), + # cv2.FONT_HERSHEY_SIMPLEX, + # 1 * TEXT_SCALE, + # (0, 255, 0), + # 3, + # ) + + + # @timeit + # def plot_gaze_and_body_dir(self, gaze_res, head_outputs, body_outputs, head_bb_abs, body_bbox): + # head_bb_abs[2] += head_bb_abs[0] + # head_bb_abs[3] += head_bb_abs[1] - prediction = gaze_res['direction'] - prediction_body = body_outputs['direction'] - prediction_head = head_outputs['direction'] + # prediction = gaze_res['direction'] + # prediction_body = body_outputs['direction'] + # prediction_head = head_outputs['direction'] - prediction_show = prediction.clone().cpu().detach().numpy()[0, -1, :] - prediction_show_body = prediction_body.clone().cpu().detach().numpy()[0, -1, :] - prediction_show_head = prediction_head.clone().cpu().detach().numpy()[0, -1, :] + # prediction_show = prediction.clone().cpu().detach().numpy()[0, -1, :] + # prediction_show_body = prediction_body.clone().cpu().detach().numpy()[0, -1, :] + # prediction_show_head = prediction_head.clone().cpu().detach().numpy()[0, -1, :] - prediction_show_norm = prediction_show / np.linalg.norm(prediction_show) - prediction_show_norm_body = prediction_show_body / np.linalg.norm(prediction_show_body) - prediction_show_norm_head = prediction_show_head / np.linalg.norm(prediction_show_head) + # prediction_show_norm = prediction_show / np.linalg.norm(prediction_show) + # prediction_show_norm_body = prediction_show_body / np.linalg.norm(prediction_show_body) + # prediction_show_norm_head = prediction_show_head / np.linalg.norm(prediction_show_head) - gaze_dir_2d = prediction_show_norm[0:2] - body_dir_2d = prediction_show_norm_body[0:2] - head_dir_2d = prediction_show_norm_head[0:2] + # gaze_dir_2d = prediction_show_norm[0:2] + # body_dir_2d = prediction_show_norm_body[0:2] + # head_dir_2d = prediction_show_norm_head[0:2] - body_center = (int((body_bbox[0] + body_bbox[2]) / 2), int((body_bbox[1] + body_bbox[3]) / 2)) - head_center = (int(head_bb_abs[0] / 2 + head_bb_abs[2] / 2), int(head_bb_abs[1] / 2 + head_bb_abs[3] / 2)) + # body_center = (int((body_bbox[0] + body_bbox[2]) / 2), int((body_bbox[1] + body_bbox[3]) / 2)) + # head_center = (int(head_bb_abs[0] / 2 + head_bb_abs[2] / 2), int(head_bb_abs[1] / 2 + head_bb_abs[3] / 2)) - des = (head_center[0] + int(gaze_dir_2d[0]*150), int(head_center[1] + gaze_dir_2d[1]*150)) - des_body = (body_center[0] + int(body_dir_2d[0]*150), int(body_center[1] + body_dir_2d[1]*150)) - des_head = (head_center[0] + int(head_dir_2d[0]*150), int(head_center[1] + head_dir_2d[1]*150)) + # des = (head_center[0] + int(gaze_dir_2d[0]*150), int(head_center[1] + gaze_dir_2d[1]*150)) + # des_body = (body_center[0] + int(body_dir_2d[0]*150), 
int(body_center[1] + body_dir_2d[1]*150)) + # des_head = (head_center[0] + int(head_dir_2d[0]*150), int(head_center[1] + head_dir_2d[1]*150)) - cv2.arrowedLine(self.vis_img, head_center, des, (0, 255, 0), 3, tipLength=0.3) - cv2.arrowedLine(self.vis_img, body_center, des_body, (0, 255, 255), 3, tipLength=0.3) - cv2.arrowedLine(self.vis_img, head_center, des_head, (255, 255, 255), 3, tipLength=0.3) + # cv2.arrowedLine(self.vis_img, head_center, des, (0, 255, 0), 3, tipLength=0.3) + # cv2.arrowedLine(self.vis_img, body_center, des_body, (0, 255, 255), 3, tipLength=0.3) + # cv2.arrowedLine(self.vis_img, head_center, des_head, (255, 255, 255), 3, tipLength=0.3) @timeit def plot_gaze_from_pitch_yaw(self, pitch, yaw, head_bb_abs, idx, keypoints): + """ Plot 2D reprojection of the computed gaze direction + + Args: + pitch (float): pitch of gaze in rad + yaw (float): yaw of gaze in rad (0 = facing camera) + head_bb_abs (lsit of np.array): bounding box of the head with format [x_min, y_min, width, height] + idx (int): id of the current track + keypoints (np.ndarray): array of shape [17,3] with the body keypoints + """ # color = RANDOM_COLORS[idx] # color_tuple = (int(color[0]), int(color[1]), int(color[2])) @@ -940,141 +993,148 @@ def plot_gaze_from_pitch_yaw(self, pitch, yaw, head_bb_abs, idx, keypoints): cv2.arrowedLine(self.vis_img, head_center, des, color_tuple, 2, tipLength=0.1) cv2.circle(self.vis_img, head_center, 5, color = color_tuple, thickness=-1) - @timeit - def plot_gaze_angle_info(self, pitch, yaw, head_bb, idx): - color = RANDOM_COLORS[idx] - color_tuple = (int(color[0]), int(color[1]), int(color[2])) - - cv2.putText( - self.vis_img, - "{:.2f} {:.2f} deg".format( - pitch, yaw - ), - (head_bb[0] + 30, head_bb[1] + 30), - cv2.FONT_HERSHEY_SIMPLEX, - 0.5 * TEXT_SCALE, - (255,255,255), - 2, - ) - cv2.putText( - self.vis_img, - "{:.2f} {:.2f} deg".format( - pitch, yaw - ), - (head_bb[0] + 30, head_bb[1] + 30), - cv2.FONT_HERSHEY_SIMPLEX, - 0.5 * TEXT_SCALE, - color_tuple, - 1, - ) + # @timeit + # def plot_gaze_angle_info(self, pitch, yaw, head_bb, idx): + # color = RANDOM_COLORS[idx] + # color_tuple = (int(color[0]), int(color[1]), int(color[2])) + + # cv2.putText( + # self.vis_img, + # "{:.2f} {:.2f} deg".format( + # pitch, yaw + # ), + # (head_bb[0] + 30, head_bb[1] + 30), + # cv2.FONT_HERSHEY_SIMPLEX, + # 0.5 * TEXT_SCALE, + # (255,255,255), + # 2, + # ) + # cv2.putText( + # self.vis_img, + # "{:.2f} {:.2f} deg".format( + # pitch, yaw + # ), + # (head_bb[0] + 30, head_bb[1] + 30), + # cv2.FONT_HERSHEY_SIMPLEX, + # 0.5 * TEXT_SCALE, + # color_tuple, + # 1, + # ) - @timeit - def get_gafa_input_from_current_image(self, image, keypoints, body_yolo_bbox): - - body_yolo_bbox_int = {} - body_yolo_bbox_int["u"] = int(body_yolo_bbox[0]) - body_yolo_bbox_int["v"] = int(body_yolo_bbox[1]) - body_yolo_bbox_int["w"] = int(body_yolo_bbox[2] - body_yolo_bbox[0]) - body_yolo_bbox_int["h"] = int(body_yolo_bbox[3] - body_yolo_bbox[1]) - - # use torch instead of PIL because faster conversion - # image_pil = PILImage.fromarray(image) - image_torch = torch.from_numpy(image.copy()).moveaxis(2, 0) + # @timeit + # def get_gafa_input_from_current_image(self, image, keypoints, body_yolo_bbox): + + # body_yolo_bbox_int = {} + # body_yolo_bbox_int["u"] = int(body_yolo_bbox[0]) + # body_yolo_bbox_int["v"] = int(body_yolo_bbox[1]) + # body_yolo_bbox_int["w"] = int(body_yolo_bbox[2] - body_yolo_bbox[0]) + # body_yolo_bbox_int["h"] = int(body_yolo_bbox[3] - body_yolo_bbox[1]) + + # # use torch instead of PIL because 
faster conversion + # # image_pil = PILImage.fromarray(image) + # image_torch = torch.from_numpy(image.copy()).moveaxis(2, 0) - item = { - "image": image_torch, - "keypoints": keypoints[:, :2], - } - - # get head bb in pixels - head_trans = head_transform(item) - head_bb = head_trans['bb'] - head_bb = np.array([head_bb['u'], head_bb['v'], head_bb['w'], head_bb['h']]).astype(np.float32) + # item = { + # "image": image_torch, + # "keypoints": keypoints[:, :2], + # } + + # # get head bb in pixels + # head_trans = head_transform(item) + # head_bb = head_trans['bb'] + # head_bb = np.array([head_bb['u'], head_bb['v'], head_bb['w'], head_bb['h']]).astype(np.float32) - # get body bb in pixels - # body_trans = body_transform(item) - body_trans = body_transform(item) - body_bb = body_trans['bb'] - body_bb = np.array([body_bb['u'], body_bb['v'], body_bb['w'], body_bb['h']]) - body_image = body_trans['image'] # keep as tensor + # # get body bb in pixels + # # body_trans = body_transform(item) + # body_trans = body_transform(item) + # body_bb = body_trans['bb'] + # body_bb = np.array([body_bb['u'], body_bb['v'], body_bb['w'], body_bb['h']]) + # body_image = body_trans['image'] # keep as tensor - # change head bb to relative to body bb - head_bb_abs = head_bb.copy() + # # change head bb to relative to body bb + # head_bb_abs = head_bb.copy() - head_bb[0] -= body_bb[0] - head_bb[1] -= body_bb[1] + # head_bb[0] -= body_bb[0] + # head_bb[1] -= body_bb[1] - head_bb[0] = head_bb[0] / body_bb[2] - head_bb[1] = head_bb[1] / body_bb[3] - head_bb[2] = head_bb[2] / body_bb[2] - head_bb[3] = head_bb[3] / body_bb[3] + # head_bb[0] = head_bb[0] / body_bb[2] + # head_bb[1] = head_bb[1] / body_bb[3] + # head_bb[2] = head_bb[2] / body_bb[2] + # head_bb[3] = head_bb[3] / body_bb[3] - # store body center - norm_body_center = (body_bb[[0, 1]] + body_bb[[2, 3]] / 2) / body_bb[[2,3]] + # # store body center + # norm_body_center = (body_bb[[0, 1]] + body_bb[[2, 3]] / 2) / body_bb[[2,3]] - # normalize image - # img = normalize_img(image = body_image)['image'] # with albumnentations normalization - # img = img.transpose(2, 0, 1) # with albumnentations normalization - img = normalize_img_torch((body_image.float())/255) # ith torchvision normalization, to float and in range [0-1] before normalization - - assert(img.shape[0] == 3) - assert(img.shape[1] == 256) - assert(img.shape[2] == 192) + # # normalize image + # # img = normalize_img(image = body_image)['image'] # with albumnentations normalization + # # img = img.transpose(2, 0, 1) # with albumnentations normalization + # img = normalize_img_torch((body_image.float())/255) # ith torchvision normalization, to float and in range [0-1] before normalization + + # assert(img.shape[0] == 3) + # assert(img.shape[1] == 256) + # assert(img.shape[2] == 192) - # create mask of head bounding box - head_mask = np.zeros((1, img.shape[1], img.shape[2])) - head_bb_int = head_bb.copy() - head_bb_int[[0, 2]] *= img.shape[2] - head_bb_int[[1, 3]] *= img.shape[1] - head_bb_int[2] += head_bb_int[0] - head_bb_int[3] += head_bb_int[1] - head_bb_int = head_bb_int.astype(np.int64) - head_bb_int[head_bb_int < 0] = 0 - - head_mask[:, head_bb_int[1]:head_bb_int[3], head_bb_int[0]:head_bb_int[2]] = 1 + # # create mask of head bounding box + # head_mask = np.zeros((1, img.shape[1], img.shape[2])) + # head_bb_int = head_bb.copy() + # head_bb_int[[0, 2]] *= img.shape[2] + # head_bb_int[[1, 3]] *= img.shape[1] + # head_bb_int[2] += head_bb_int[0] + # head_bb_int[3] += head_bb_int[1] + # head_bb_int = 
head_bb_int.astype(np.int64) + # head_bb_int[head_bb_int < 0] = 0 + + # head_mask[:, head_bb_int[1]:head_bb_int[3], head_bb_int[0]:head_bb_int[2]] = 1 - return img, head_mask, norm_body_center, head_bb_abs + # return img, head_mask, norm_body_center, head_bb_abs - @timeit - def plot_overlay_face_attention(self, track, head_bbox): - x_min, y_min, x_max, y_max = map(int, head_bbox[:4]) + # @timeit + # def plot_overlay_face_attention(self, track, head_bbox): + # x_min, y_min, x_max, y_max = map(int, head_bbox[:4]) - valid_depths = [] - valid_yaws = [] - valid_pitchs = [] + # valid_depths = [] + # valid_yaws = [] + # valid_pitchs = [] - heat_count = 0 - history = 1 - length = min(len(track["depth_face"]), history) - for i in range(length): - index = len(track["depth_face"]) - i - 1 - yaw = np.rad2deg(track["gaze_yaw_rad"][index]) - pitch = np.rad2deg(track["gaze_pitch_rad"][index]) - depth = track["depth_face"][index] + # heat_count = 0 + # history = 1 + # length = min(len(track["depth_face"]), history) + # for i in range(length): + # index = len(track["depth_face"]) - i - 1 + # yaw = np.rad2deg(track["gaze_yaw_rad"][index]) + # pitch = np.rad2deg(track["gaze_pitch_rad"][index]) + # depth = track["depth_face"][index] - thresh = (int(depth / 1000) + 1) * 5 # 5 deg per meter - if np.abs(yaw) < thresh and np.abs(pitch) < thresh: - heat_count += 1 + # thresh = (int(depth / 1000) + 1) * 5 # 5 deg per meter + # if np.abs(yaw) < thresh and np.abs(pitch) < thresh: + # heat_count += 1 - cv2.putText( - self.vis_img, - "{:d}".format( - heat_count - ), - (x_min - 30, y_min + 30), - cv2.FONT_HERSHEY_SIMPLEX, - 0.5 * TEXT_SCALE, - (255,0,255), - 2, - ) + # cv2.putText( + # self.vis_img, + # "{:d}".format( + # heat_count + # ), + # (x_min - 30, y_min + 30), + # cv2.FONT_HERSHEY_SIMPLEX, + # 0.5 * TEXT_SCALE, + # (255,0,255), + # 2, + # ) - overlay_img = self.vis_img.copy() - cv2.rectangle(overlay_img, (x_min,y_min), (x_max,y_max), color = (0,255,0), thickness = -1) - strength = (heat_count / history) * 0.75 - self.vis_img = cv2.addWeighted(self.vis_img,(1-strength),overlay_img,strength,0) + # overlay_img = self.vis_img.copy() + # cv2.rectangle(overlay_img, (x_min,y_min), (x_max,y_max), color = (0,255,0), thickness = -1) + # strength = (heat_count / history) * 0.75 + # self.vis_img = cv2.addWeighted(self.vis_img,(1-strength),overlay_img,strength,0) @timeit def plot_overlay_face_attention_6d(self, track, head_bbox, keypoints): + """ Plot ellipse around the eyes, colored depending on the attention of the person + + Args: + track (dict): information oin the current track including yaw and pitch of gaze in the last frames + head_bbox (np.array): array of size [4] or [5] with bounding box as [x_min, y_min, x_max, y_max, (score)] + keypoints (np.ndarray): array of shape [17,3] with the body keypoints + """ x_min, y_min, x_max, y_max = map(int, head_bbox[:4]) valid_depths = [] @@ -1293,209 +1353,209 @@ def start(self): } # if history is long enough, process the trajectory with MotionBERT - if self.args.use_mb and len(self.tracks[idx]["keypoints_2d"]) >= self.args.mb_clip_len: - prInfo("Running MotionBERT for track {}".format(idx)) + # if self.args.use_mb and len(self.tracks[idx]["keypoints_2d"]) >= self.args.mb_clip_len: + # prInfo("Running MotionBERT for track {}".format(idx)) - # prepare motion - motion = np.asarray(self.tracks[idx]["keypoints_2d"]) # T, 17, 3 - motion = motion[-self.args.mb_clip_len:, :, :] # keep only the required len - assert(motion.shape[1] == 17) - assert(motion.shape[2] == 3) - motion_h36 = 
coco2h36m(motion) # input is h36 format - motion_h36_scaled = crop_scale(motion_h36) # scale [1,1], normalize, crop + # # prepare motion + # motion = np.asarray(self.tracks[idx]["keypoints_2d"]) # T, 17, 3 + # motion = motion[-self.args.mb_clip_len:, :, :] # keep only the required len + # assert(motion.shape[1] == 17) + # assert(motion.shape[2] == 3) + # motion_h36 = coco2h36m(motion) # input is h36 format + # motion_h36_scaled = crop_scale(motion_h36) # scale [1,1], normalize, crop - with torch.no_grad(): - current_input = torch.Tensor(motion_h36_scaled).unsqueeze(0).cuda() - tic = time.time() - predicted_3d_pos = self.motionbert_3d_model(current_input) - tac = time.time() - prTimer("MotionBERT", tic, tac) - # root relative - predicted_3d_pos[:,:,0,:] = 0 # [1,T,17,3] + # with torch.no_grad(): + # current_input = torch.Tensor(motion_h36_scaled).unsqueeze(0).cuda() + # tic = time.time() + # predicted_3d_pos = self.motionbert_3d_model(current_input) + # tac = time.time() + # prTimer("MotionBERT", tic, tac) + # # root relative + # predicted_3d_pos[:,:,0,:] = 0 # [1,T,17,3] - predicted_3d_pos_np = predicted_3d_pos[0,-1,:,:].cpu().numpy() # keep only the last prediction - if "keypoints_3d" in self.tracks[idx].keys(): - self.tracks[idx]["keypoints_3d"].append(predicted_3d_pos_np) - else: - self.tracks[idx]["keypoints_3d"] = [predicted_3d_pos_np] * self.args.mb_clip_len # add fake padding at the begining so the lists align + # predicted_3d_pos_np = predicted_3d_pos[0,-1,:,:].cpu().numpy() # keep only the last prediction + # if "keypoints_3d" in self.tracks[idx].keys(): + # self.tracks[idx]["keypoints_3d"].append(predicted_3d_pos_np) + # else: + # self.tracks[idx]["keypoints_3d"] = [predicted_3d_pos_np] * self.args.mb_clip_len # add fake padding at the begining so the lists align # print("len compare", idx, len(self.tracks[idx]["keypoints_3d"]), len(self.tracks[idx]["keypoints_2d"]), color = "yan") - # (run for every track or only closest ?) add input for gafa processing - # if for everyone should run in batch - if self.args.use_gafa and (len(self.tracks[idx]["images_crop"]) >= self.args.gafa_n_frames or self.args.gafa_no_history): - gafa_tic = time.time() + # # (run for every track or only closest ?) add input for gafa processing + # # if for everyone should run in batch + # if self.args.use_gafa and (len(self.tracks[idx]["images_crop"]) >= self.args.gafa_n_frames or self.args.gafa_no_history): + # gafa_tic = time.time() - # Make sure that the image is rgb and not bgr, may need conversion ! - # img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) - # im_pil = Image.fromarray(img) - crop_img, head_mask, norm_body_center, head_bb_abs = self.get_gafa_input_from_current_image(rgb_array[:,:,::-1], keypoints, bbox) - - if self.args.gafa_no_history: - # no history : duplicate the last image - images = np.repeat(crop_img[np.newaxis, :, :, :], self.args.gafa_n_frames, axis = 0) # torch.Tensor of size [n_frames, 3, 256, 192] - head_masks = np.repeat(head_mask[np.newaxis, :, :, :], self.args.gafa_n_frames, axis = 0) # numpy.ndarray of size [n_frames, 3, 256, 192] - body_dvs = np.zeros((self.args.gafa_n_frames, 2)) # numpy.ndarray of size n_frames, 2 + # # Make sure that the image is rgb and not bgr, may need conversion ! 
+ # # img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) + # # im_pil = Image.fromarray(img) + # crop_img, head_mask, norm_body_center, head_bb_abs = self.get_gafa_input_from_current_image(rgb_array[:,:,::-1], keypoints, bbox) + + # if self.args.gafa_no_history: + # # no history : duplicate the last image + # images = np.repeat(crop_img[np.newaxis, :, :, :], self.args.gafa_n_frames, axis = 0) # torch.Tensor of size [n_frames, 3, 256, 192] + # head_masks = np.repeat(head_mask[np.newaxis, :, :, :], self.args.gafa_n_frames, axis = 0) # numpy.ndarray of size [n_frames, 3, 256, 192] + # body_dvs = np.zeros((self.args.gafa_n_frames, 2)) # numpy.ndarray of size n_frames, 2 - else: - # history : use the last saved n images - self.tracks[idx]["images_crop"].append(crop_img) - self.tracks[idx]["head_masks"].append(head_mask) - self.tracks[idx]["norm_body_centers"].append(norm_body_center) + # else: + # # history : use the last saved n images + # self.tracks[idx]["images_crop"].append(crop_img) + # self.tracks[idx]["head_masks"].append(head_mask) + # self.tracks[idx]["norm_body_centers"].append(norm_body_center) - images = torch.stack(self.tracks[idx]["images_crop"][-self.args.gafa_n_frames:], dim = 0) # torch.Tensor of size n_frames, 3, 256, 192 - head_masks = np.asarray(self.tracks[idx]["head_masks"][-self.args.gafa_n_frames:]) # numpy.ndarray of size n_frames, 1, 256, 192 - norm_body_centers = np.asarray(self.tracks[idx]["norm_body_centers"][-self.args.gafa_n_frames:]) # numpy.ndarray of size n_frames, 2 - body_dvs = norm_body_centers - np.roll(norm_body_centers, shift=1, axis=0) # numpy.ndarray of size n_frames, 2 + # images = torch.stack(self.tracks[idx]["images_crop"][-self.args.gafa_n_frames:], dim = 0) # torch.Tensor of size n_frames, 3, 256, 192 + # head_masks = np.asarray(self.tracks[idx]["head_masks"][-self.args.gafa_n_frames:]) # numpy.ndarray of size n_frames, 1, 256, 192 + # norm_body_centers = np.asarray(self.tracks[idx]["norm_body_centers"][-self.args.gafa_n_frames:]) # numpy.ndarray of size n_frames, 2 + # body_dvs = norm_body_centers - np.roll(norm_body_centers, shift=1, axis=0) # numpy.ndarray of size n_frames, 2 - with torch.no_grad(): - # debug_dic = {} + # with torch.no_grad(): + # # debug_dic = {} - images = images.unsqueeze(0) #.cuda().float() - head_masks = torch.from_numpy(head_masks).unsqueeze(0) #.cuda().float() - body_dvs = torch.from_numpy(body_dvs).unsqueeze(0) #.cuda().float() + # images = images.unsqueeze(0) #.cuda().float() + # head_masks = torch.from_numpy(head_masks).unsqueeze(0) #.cuda().float() + # body_dvs = torch.from_numpy(body_dvs).unsqueeze(0) #.cuda().float() - # last_img = images[0, -1, : ,: ,:].clone() - # for i in range(7): - # images[0, i, : ,: ,:] = last_img - images = images.cuda().float() + # # last_img = images[0, -1, : ,: ,:].clone() + # # for i in range(7): + # # images[0, i, : ,: ,:] = last_img + # images = images.cuda().float() - # last_mask = head_masks[0, -1, : ,: ,:].clone() - # for i in range(7): - # head_masks[0, i, : ,: ,:] = last_mask - head_masks = head_masks.cuda().float() + # # last_mask = head_masks[0, -1, : ,: ,:].clone() + # # for i in range(7): + # # head_masks[0, i, : ,: ,:] = last_mask + # head_masks = head_masks.cuda().float() - # body_dvs = torch.zeros(body_dvs.shape) - body_dvs = body_dvs.cuda().float() + # # body_dvs = torch.zeros(body_dvs.shape) + # body_dvs = body_dvs.cuda().float() - # debug_dic["images"] = images.clone().cpu().numpy() - # debug_dic["head_masks"] = head_masks.clone().cpu().numpy() - # debug_dic["body_dvs"] = 
body_dvs.clone().cpu().numpy() + # # debug_dic["images"] = images.clone().cpu().numpy() + # # debug_dic["head_masks"] = head_masks.clone().cpu().numpy() + # # debug_dic["body_dvs"] = body_dvs.clone().cpu().numpy() - tic = time.time() - gaze_res, head_outputs, body_outputs = self.gafa_model(images, head_masks, body_dvs) - tac = time.time() - prTimer("GAFA", tic, tac) - - # debug_dic["gaze_res"] = gaze_res["direction"].clone().cpu().numpy() - # debug_dic["head_outputs"] = head_outputs["direction"].clone().cpu().numpy() - # debug_dic["body_outputs"] = body_outputs["direction"].clone().cpu().numpy() + # tic = time.time() + # gaze_res, head_outputs, body_outputs = self.gafa_model(images, head_masks, body_dvs) + # tac = time.time() + # prTimer("GAFA", tic, tac) + + # # debug_dic["gaze_res"] = gaze_res["direction"].clone().cpu().numpy() + # # debug_dic["head_outputs"] = head_outputs["direction"].clone().cpu().numpy() + # # debug_dic["body_outputs"] = body_outputs["direction"].clone().cpu().numpy() - # with open('./debug_dic.pickle', 'wb') as handle: - # pickle.dump(debug_dic, handle) + # # with open('./debug_dic.pickle', 'wb') as handle: + # # pickle.dump(debug_dic, handle) - # print("GAFA done", color = "green", background = "white") + # # print("GAFA done", color = "green", background = "white") - if not self.args.no_show: - self.plot_gaze_and_body_dir(gaze_res, head_outputs, body_outputs, head_bb_abs, bbox) - self.plot_xyxy_person_bbox(idx, head_bb_abs, depth_array.shape) - self.plot_gaze_text_info(gaze_res, head_outputs, body_outputs, head_bb_abs, idx) + # if not self.args.no_show: + # self.plot_gaze_and_body_dir(gaze_res, head_outputs, body_outputs, head_bb_abs, bbox) + # self.plot_xyxy_person_bbox(idx, head_bb_abs, depth_array.shape) + # self.plot_gaze_text_info(gaze_res, head_outputs, body_outputs, head_bb_abs, idx) - # For debug only - # plt.clf() - # plt.imshow(np.moveaxis(images.clone().cpu().numpy()[0,-1,:,:,:], 0, 2)) - # plt.title("Custom data") - # plt.pause(0.01) + # # For debug only + # # plt.clf() + # # plt.imshow(np.moveaxis(images.clone().cpu().numpy()[0,-1,:,:,:], 0, 2)) + # # plt.title("Custom data") + # # plt.pause(0.01) - gafa_tac = time.time() - prTimer("GAFA full", gafa_tic, gafa_tac) + # gafa_tac = time.time() + # prTimer("GAFA full", gafa_tic, gafa_tac) - else: - if self.args.use_gafa and self.args.gafa_no_history: - prInfo("Did not add inputs because no GAFA history required") - elif self.args.use_gafa: - # do not accumulate if unused - crop_img, head_mask, norm_body_center, head_bb_abs = self.get_gafa_input_from_current_image(rgb_array[:,:,::-1], keypoints, bbox) - self.tracks[idx]["images_crop"].append(crop_img) - self.tracks[idx]["head_masks"].append(head_mask) - self.tracks[idx]["norm_body_centers"].append(norm_body_center) - prInfo("Didn't run GAFA yet, waiting for history") + # else: + # if self.args.use_gafa and self.args.gafa_no_history: + # prInfo("Did not add inputs because no GAFA history required") + # elif self.args.use_gafa: + # # do not accumulate if unused + # crop_img, head_mask, norm_body_center, head_bb_abs = self.get_gafa_input_from_current_image(rgb_array[:,:,::-1], keypoints, bbox) + # self.tracks[idx]["images_crop"].append(crop_img) + # self.tracks[idx]["head_masks"].append(head_mask) + # self.tracks[idx]["norm_body_centers"].append(norm_body_center) + # prInfo("Didn't run GAFA yet, waiting for history") - if self.args.use_gaze_resnet: - with torch.no_grad(): + # if self.args.use_gaze_resnet: + # with torch.no_grad(): - # debug_dic = 
{"image_full": rgb_array} + # # debug_dic = {"image_full": rgb_array} - item = {"keypoints" : keypoints[:,:2]} - head_trans = head_transform_face(item) - head_bb = head_trans["bb"] - head_bb = np.array([head_bb['u'], head_bb['v'], head_bb['w'], head_bb['h']]).astype(np.int32) + # item = {"keypoints" : keypoints[:,:2]} + # head_trans = head_transform_face(item) + # head_bb = head_trans["bb"] + # head_bb = np.array([head_bb['u'], head_bb['v'], head_bb['w'], head_bb['h']]).astype(np.int32) - tic = time.time() - if self.args.use_face_detector: - face_bboxes, fd_kp = self.face_detector.detect(rgb_array) # or convert to bgr ?? ## only use the body detection so that we match easily... - prWarning("Using face_detector does not provide any matching to the current idx of the frame, only using first detection !") - else: - face_bboxes = np.array([[head_bb[0],head_bb[1],head_bb[0]+head_bb[2],head_bb[1]+head_bb[3]]]) - tac = time.time() - prTimer("Face detetction", tic, tac) + # tic = time.time() + # if self.args.use_face_detector: + # face_bboxes, fd_kp = self.face_detector.detect(rgb_array) # or convert to bgr ?? ## only use the body detection so that we match easily... + # prWarning("Using face_detector does not provide any matching to the current idx of the frame, only using first detection !") + # else: + # face_bboxes = np.array([[head_bb[0],head_bb[1],head_bb[0]+head_bb[2],head_bb[1]+head_bb[3]]]) + # tac = time.time() + # prTimer("Face detetction", tic, tac) - if (face_bboxes.shape[0] > 0): + # if (face_bboxes.shape[0] > 0): - x_min, y_min, x_max, y_max = map(int, face_bboxes[0,:4]) - head_image = rgb_array[y_min:y_max, x_min:x_max] + # x_min, y_min, x_max, y_max = map(int, face_bboxes[0,:4]) + # head_image = rgb_array[y_min:y_max, x_min:x_max] - if (head_image.shape[0] > 10) and (head_image.shape[1] > 10): - head_image = pre_process(head_image) + # if (head_image.shape[0] > 10) and (head_image.shape[1] > 10): + # head_image = pre_process(head_image) - # For debug - # plt.clf() - # plt.imshow(np.moveaxis(head_image.clone().cpu().numpy()[0,:,:,:], 0, 2)) - # plt.title("Custom data") - # plt.pause(0.01) + # # For debug + # # plt.clf() + # # plt.imshow(np.moveaxis(head_image.clone().cpu().numpy()[0,:,:,:], 0, 2)) + # # plt.title("Custom data") + # # plt.pause(0.01) - # debug_dic["image"] = head_image + # # debug_dic["image"] = head_image - pitch, yaw = self.gaze_estimation_model(head_image) + # pitch, yaw = self.gaze_estimation_model(head_image) - # debug_dic["pitch"] = pitch - # debug_dic["yaw"] = yaw + # # debug_dic["pitch"] = pitch + # # debug_dic["yaw"] = yaw - # with open('debuig_dic.pkl', 'wb') as fp: - # pickle.dump(debug_dic, fp) + # # with open('debuig_dic.pkl', 'wb') as fp: + # # pickle.dump(debug_dic, fp) - # Softmax beofre sum - pitch_predicted, yaw_predicted = F.softmax(pitch, dim=1), F.softmax(yaw, dim=1) + # # Softmax beofre sum + # pitch_predicted, yaw_predicted = F.softmax(pitch, dim=1), F.softmax(yaw, dim=1) - # Mapping from binned (0 to 90) to angles (-180 to 180) or (0 to 28) to angles (-42, 42) - idx_tensor = torch.arange(90, device=self.args.device.lower(), dtype=torch.float32) + # # Mapping from binned (0 to 90) to angles (-180 to 180) or (0 to 28) to angles (-42, 42) + # idx_tensor = torch.arange(90, device=self.args.device.lower(), dtype=torch.float32) - pitch_predicted = torch.sum(pitch_predicted * idx_tensor, dim=1) * 4 - 180 - yaw_predicted = torch.sum(yaw_predicted * idx_tensor, dim=1) * 4 - 180 + # pitch_predicted = torch.sum(pitch_predicted * idx_tensor, dim=1) * 4 - 
180 + # yaw_predicted = torch.sum(yaw_predicted * idx_tensor, dim=1) * 4 - 180 - pitch_predicted = pitch_predicted.cpu().numpy() - yaw_predicted = yaw_predicted.cpu().numpy() + # pitch_predicted = pitch_predicted.cpu().numpy() + # yaw_predicted = yaw_predicted.cpu().numpy() - # Degrees to Radians - pitch_predicted_rad = np.radians(pitch_predicted) - yaw_predicted_rad = np.radians(yaw_predicted) + # # Degrees to Radians + # pitch_predicted_rad = np.radians(pitch_predicted) + # yaw_predicted_rad = np.radians(yaw_predicted) - self.tracks_in_current_image[idx]["gaze_pitch_rad"] = pitch_predicted_rad - self.tracks_in_current_image[idx]["gaze_yaw_rad"] = yaw_predicted_rad - self.tracks[idx]["gaze_pitch_rad"].append(pitch_predicted_rad) - self.tracks[idx]["gaze_yaw_rad"].append(yaw_predicted_rad) + # self.tracks_in_current_image[idx]["gaze_pitch_rad"] = pitch_predicted_rad + # self.tracks_in_current_image[idx]["gaze_yaw_rad"] = yaw_predicted_rad + # self.tracks[idx]["gaze_pitch_rad"].append(pitch_predicted_rad) + # self.tracks[idx]["gaze_yaw_rad"].append(yaw_predicted_rad) - self.plot_gaze_from_pitch_yaw(pitch_predicted_rad[0], yaw_predicted_rad[0], head_bb, idx, keypoints) - self.plot_gaze_angle_info(pitch_predicted[0], yaw_predicted[0], head_bb, idx) + # self.plot_gaze_from_pitch_yaw(pitch_predicted_rad[0], yaw_predicted_rad[0], head_bb, idx, keypoints) + # self.plot_gaze_angle_info(pitch_predicted[0], yaw_predicted[0], head_bb, idx) - # get face depth - nose = keypoints[0,:2].astype(np.uint32) - leye = keypoints[1,:2].astype(np.uint32) - reye = keypoints[2,:2].astype(np.uint32) + # # get face depth + # nose = keypoints[0,:2].astype(np.uint32) + # leye = keypoints[1,:2].astype(np.uint32) + # reye = keypoints[2,:2].astype(np.uint32) - depth_nose = depth_array[np.clip(nose[1], 0, depth_array.shape[0] - 1), np.clip(nose[0], 0, depth_array.shape[1] - 1)] - depth_leye = depth_array[np.clip(leye[1], 0, depth_array.shape[0] - 1), np.clip(leye[0], 0, depth_array.shape[1] - 1)] - depth_reye = depth_array[np.clip(reye[1], 0, depth_array.shape[0] - 1), np.clip(reye[0], 0, depth_array.shape[1] - 1)] + # depth_nose = depth_array[np.clip(nose[1], 0, depth_array.shape[0] - 1), np.clip(nose[0], 0, depth_array.shape[1] - 1)] + # depth_leye = depth_array[np.clip(leye[1], 0, depth_array.shape[0] - 1), np.clip(leye[0], 0, depth_array.shape[1] - 1)] + # depth_reye = depth_array[np.clip(reye[1], 0, depth_array.shape[0] - 1), np.clip(reye[0], 0, depth_array.shape[1] - 1)] - depth_face = np.median([depth_nose, depth_leye, depth_reye]) + # depth_face = np.median([depth_nose, depth_leye, depth_reye]) - self.tracks_in_current_image[idx]["depth_face"] = depth_face - self.tracks[idx]["depth_face"].append(depth_face) + # self.tracks_in_current_image[idx]["depth_face"] = depth_face + # self.tracks[idx]["depth_face"].append(depth_face) - self.plot_overlay_face_attention(self.tracks[idx], face_bboxes[0,:4]) + # self.plot_overlay_face_attention(self.tracks[idx], face_bboxes[0,:4]) @@ -1611,14 +1671,14 @@ def start(self): self.tracks[idx]["depth_face"].append(depth_face) - # Draw bb - bbox[4] *= 100 + # Draw bb + bbox[4] *= 100 # score percentage bbox = bbox.astype(np.int32) if not self.args.no_show: self.plot_xyxy_person_bbox(idx, bbox, depth_array.shape, self.tracks[idx]) - # return the list of body center joints and also fill self.tracks_in_current_image[idx] + # return the list of body center joints and also fill self.tracks_in_current_image[idx] body_center_joints = self.process_keypoints(keypoints, depth_array, idx) # find 
the body center @@ -1923,42 +1983,42 @@ def start(self): type=int, default=2, help='Number frames without the track present to keep going before removing a track | default = %(default)s') - parser.add_argument( - "--use_mb", - "-mb", - action="store_true", - default=False, - help="whether to use MotionBERT 3D Lifter | default = %(default)s", - ) - - parser.add_argument('--gafa_checkpoint', type=str, default = "./checkpoint/gafa/GazeNet_PyTorch.pt", help='Checkpoint file for 3D gaze estimation GAFA | default = %(default)s') - parser.add_argument( - '--gafa_n_frames', - type=int, - default=7, - help='Number of past frames to use for GAFA (default in model is 7) | default = %(default)s') - parser.add_argument( - "--use_gafa", - "-gafa", - action="store_true", - default=False, - help="whether to use GAFA 3D Gaze Estimation | default = %(default)s", - ) - parser.add_argument( - "--gafa_no_history", - "-gnh", - action="store_true", - default=False, - help="whether to use history in the GAFA sequence or fake it by copying last image | default = %(default)s", - ) - - parser.add_argument( - "--use_gaze_resnet", - "-resnet", - action="store_true", - default=False, - help="whether to use Gaze ResNet18 3D Gaze Estimation | default = %(default)s", - ) + # parser.add_argument( + # "--use_mb", + # "-mb", + # action="store_true", + # default=False, + # help="whether to use MotionBERT 3D Lifter | default = %(default)s", + # ) + + # parser.add_argument('--gafa_checkpoint', type=str, default = "./checkpoint/gafa/GazeNet_PyTorch.pt", help='Checkpoint file for 3D gaze estimation GAFA | default = %(default)s') + # parser.add_argument( + # '--gafa_n_frames', + # type=int, + # default=7, + # help='Number of past frames to use for GAFA (default in model is 7) | default = %(default)s') + # parser.add_argument( + # "--use_gafa", + # "-gafa", + # action="store_true", + # default=False, + # help="whether to use GAFA 3D Gaze Estimation | default = %(default)s", + # ) + # parser.add_argument( + # "--gafa_no_history", + # "-gnh", + # action="store_true", + # default=False, + # help="whether to use history in the GAFA sequence or fake it by copying last image | default = %(default)s", + # ) + + # parser.add_argument( + # "--use_gaze_resnet", + # "-resnet", + # action="store_true", + # default=False, + # help="whether to use Gaze ResNet18 3D Gaze Estimation | default = %(default)s", + # ) parser.add_argument( "--use_face_detector", "-ufd", @@ -1989,33 +2049,33 @@ def start(self): args = parser.parse_args() - assert has_mmdet, "Please install mmdet to run the demo." + assert has_mmdet, "mmdet install error." 
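The assertion kept here relies on the optional-import guards at the top of the file: each guarded dependency sets a `has_*` flag when its import succeeds, and the flag is only enforced once the CLI arguments are known. A minimal sketch of that pattern follows; the `check_requirements` helper is hypothetical and only illustrates how the guard and the later asserts fit together.

```python
# Hedged sketch of the optional-dependency pattern used for mmdet (and 6DRepNet).
try:
    from mmdet.apis import inference_detector, init_detector
    has_mmdet = True  # set only when the import succeeds
except (ImportError, ModuleNotFoundError):
    has_mmdet = False

def check_requirements(args):
    # hypothetical helper mirroring the asserts that follow parser.parse_args()
    assert has_mmdet, "mmdet install error."
    assert args.det_config is not None
    assert args.det_checkpoint is not None
```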
assert args.det_config is not None assert args.det_checkpoint is not None - if args.use_mb: - assert(has_mb), "Option --use_mb requires MotionBERT install" + # if args.use_mb: + # assert(has_mb), "Option --use_mb requires MotionBERT install" - if args.use_gafa: - assert(args.use_gaze_resnet == False), "Option --use_gafa and --use_gaze_resnet are not compatible" - assert(args.use_six_d_rep == False), "Option --use_gafa and --use_six_d_rep are not compatible" + # if args.use_gafa: + # assert(args.use_gaze_resnet == False), "Option --use_gafa and --use_gaze_resnet are not compatible" + # assert(args.use_six_d_rep == False), "Option --use_gafa and --use_six_d_rep are not compatible" - if args.use_gaze_resnet: - assert(args.use_gafa == False), "Option --use_gaze_resnet and --use_gafa are not compatible" - assert(args.use_six_d_rep == False), "Option --use_gaze_resnet and --use_six_d_rep are not compatible" + # if args.use_gaze_resnet: + # assert(args.use_gafa == False), "Option --use_gaze_resnet and --use_gafa are not compatible" + # assert(args.use_six_d_rep == False), "Option --use_gaze_resnet and --use_six_d_rep are not compatible" - if args.use_six_d_rep: - assert(args.use_gaze_resnet == False), "Option --use_six_d_rep and --use_gaze_resnet are not compatible" - assert(args.use_gafa == False), "Option --use_six_d_rep and --use_gafa are not compatible" + # if args.use_six_d_rep: + # assert(args.use_gaze_resnet == False), "Option --use_six_d_rep and --use_gaze_resnet are not compatible" + # assert(args.use_gafa == False), "Option --use_six_d_rep and --use_gafa are not compatible" - if args.use_gafa: - assert(has_gafa), "Option --use_gafa requires GAFA install" + # if args.use_gafa: + # assert(has_gafa), "Option --use_gafa requires GAFA install" if args.use_six_d_rep: assert(has_sixdrep), "Option --use_six_d_rep requires 6D Rep install" - if args.use_gaze_resnet: - assert(has_gaze_est), "Option --use_gaze_resnet requires Gaze Estimation" + # if args.use_gaze_resnet: + # assert(has_gaze_est), "Option --use_gaze_resnet requires Gaze Estimation" prInfo("Loaded with args : {}".format(args)) diff --git a/gafa_utils.py b/transform_utils.py similarity index 100% rename from gafa_utils.py rename to transform_utils.py diff --git a/utils.py b/utils.py index 67885e6..ce2a4f1 100644 --- a/utils.py +++ b/utils.py @@ -1,6 +1,17 @@ #!/usr/bin/env python # -*- encoding: utf-8 -*- import os + +key = 'VERBOSE' +VERBOSITY = os.getenv(key) +# 0 = only warnings, errors, success and debug, 1 = + info, 2 = + timer +if VERBOSITY is not None: + VERBOSITY = int(VERBOSITY) +else: + VERBOSITY = 2 + +print("Set verbosity level to : ", VERBOSITY) + import numpy as np import matplotlib.pyplot as plt from print_color import print @@ -28,13 +39,16 @@ def prSuccess(text): print(text, tag = "ok", tag_color = "green", color = "white") def prInfo(text): - print(text, tag = "info", tag_color = "cyan", color = "white") + if VERBOSITY >= 1: + print(text, tag = "info", tag_color = "cyan", color = "white") def prTimer(text, tic, tac): - print("{} {:.0f} ms".format(text, (tac-tic)*1000), tag = "timer", tag_color = "purple", color = "white") + if VERBOSITY >= 2: + print("{} {:.0f} ms".format(text, (tac-tic)*1000), tag = "timer", tag_color = "purple", color = "white") def prInfoBold(text): - print(text, tag = "info", tag_color = "cyan", color = "white", format = "bold") + if VERBOSITY >= 1: + print(text, tag = "info", tag_color = "cyan", color = "white", format = "bold") def prDebug(text): print(text, tag = "debug", tag_color = "red", 
background = "white", color = "white") @@ -496,6 +510,7 @@ def wrapper_function(*args, **kwargs): tic = time.time() res = func(*args, **kwargs) tac = time.time() - print("{} {:.0f} ms".format(func.__name__, (tac-tic)*1000), tag = "timer", tag_color = "purple", color = "white") + if VERBOSITY >= 2: + print("{} {:.0f} ms".format(func.__name__, (tac-tic)*1000), tag = "timer", tag_color = "purple", color = "white") return res return wrapper_function