|
| 1 | +from typing import Dict, List, Optional, Tuple, Union |
| 2 | + |
| 3 | +import numpy as np |
| 4 | +import supervision as sv |
| 5 | +from pydantic import ConfigDict, Field |
| 6 | +from typing_extensions import Literal, Type |
| 7 | + |
| 8 | +from inference.core.workflows.execution_engine.entities.base import ( |
| 9 | + OutputDefinition, |
| 10 | + WorkflowImageData, |
| 11 | +) |
| 12 | +from inference.core.workflows.execution_engine.entities.types import ( |
| 13 | + FLOAT_KIND, |
| 14 | + INSTANCE_SEGMENTATION_PREDICTION_KIND, |
| 15 | + OBJECT_DETECTION_PREDICTION_KIND, |
| 16 | + Selector, |
| 17 | + StepOutputSelector, |
| 18 | + WorkflowImageSelector, |
| 19 | +) |
| 20 | +from inference.core.workflows.prototypes.block import ( |
| 21 | + BlockResult, |
| 22 | + WorkflowBlock, |
| 23 | + WorkflowBlockManifest, |
| 24 | +) |
| 25 | + |
# Name of the field under which this block publishes its (annotated) detections.
OUTPUT_KEY: str = "velocity_detections"
# Short/long descriptions surfaced in the workflow builder UI and JSON schema.
SHORT_DESCRIPTION = "Calculate the velocity and speed of tracked objects with smoothing and unit conversion."
LONG_DESCRIPTION = """
The `VelocityBlock` computes the velocity and speed of objects tracked across video frames.
It includes options to smooth the velocity and speed measurements over time and to convert units from pixels per second to meters per second.
It requires detections from Byte Track with unique `tracker_id` assigned to each object, which persists between frames.
The velocities are calculated based on the displacement of object centers over time.

Note: due to perspective and camera distortions calculated velocity will be different depending on object position in relation to the camera.

"""
| 37 | + |
| 38 | + |
class VelocityManifest(WorkflowBlockManifest):
    """Configuration schema for the Velocity analytics block.

    Declares the block identity (``roboflow_core/velocity@v1``), its inputs
    (image, tracked detections, smoothing factor, pixel-to-meter scale) and
    its outputs (detections enriched with velocity/speed data).
    """

    # Metadata rendered in the workflows UI / JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "name": "Velocity",
            "version": "v1",
            "short_description": SHORT_DESCRIPTION,
            "long_description": LONG_DESCRIPTION,
            "license": "Apache-2.0",
            "block_type": "analytics",
        }
    )
    type: Literal["roboflow_core/velocity@v1"]
    # Input image; its video metadata supplies timestamps / fps for velocity math.
    image: WorkflowImageSelector
    # Tracked predictions (must carry tracker_id, e.g. from Byte Track).
    detections: StepOutputSelector(
        kind=[
            OBJECT_DETECTION_PREDICTION_KIND,
            INSTANCE_SEGMENTATION_PREDICTION_KIND,
        ]
    ) = Field(  # type: ignore
        description="Predictions",
        examples=["$steps.object_detection_model.predictions"],
    )
    # EMA coefficient: weight of the CURRENT measurement; lower -> smoother.
    smoothing_alpha: Union[float, Selector(kind=[FLOAT_KIND])] = Field(  # type: ignore
        default=0.5,
        description="Smoothing factor (alpha) for exponential moving average (0 < alpha <= 1). Lower alpha means more smoothing.",
        examples=[0.5],
    )
    # Scale used to convert px/s to m/s (default 1.0 leaves values in pixel units).
    pixels_per_meter: Union[float, Selector(kind=[FLOAT_KIND])] = Field(  # type: ignore
        default=1.0,
        description="Conversion from pixels to meters. Velocity will be converted to meters per second using this value.",
        examples=[0.01],  # Example: 1 pixel = 0.01 meters
    )

    @classmethod
    def describe_outputs(cls) -> List[OutputDefinition]:
        """Return the single output: detections annotated with velocity data."""
        return [
            OutputDefinition(
                name=OUTPUT_KEY,
                kind=[
                    OBJECT_DETECTION_PREDICTION_KIND,
                    INSTANCE_SEGMENTATION_PREDICTION_KIND,
                ],
            ),
        ]

    @classmethod
    def get_execution_engine_compatibility(cls) -> Optional[str]:
        """Declare the execution-engine version range this block supports."""
        return ">=1.0.0,<2.0.0"
| 87 | + |
| 88 | + |
class VelocityBlockV1(WorkflowBlock):
    """Computes per-object velocity and speed from tracked detections.

    Velocity is the displacement of each bounding-box center between the
    previous and current frame divided by elapsed time (px/s), optionally
    smoothed with an exponential moving average and scaled to m/s via
    ``pixels_per_meter``. State is kept per video identifier so the block can
    serve multiple video streams concurrently.

    Bug fix vs. the previous revision: the ``detections.data`` assignments
    were placed OUTSIDE the per-detection loop, so only the last tracked
    object ever received velocity data (and an empty detections set raised
    ``NameError``). Results are now recorded for every detection.
    """

    def __init__(self):
        # Per-video map: tracker_id -> (last center position [px], last timestamp [s]).
        self._previous_positions: Dict[
            str, Dict[Union[int, str], Tuple[np.ndarray, float]]
        ] = {}
        # Per-video map: tracker_id -> exponentially smoothed velocity [px/s].
        self._smoothed_velocities: Dict[str, Dict[Union[int, str], np.ndarray]] = {}

    @classmethod
    def get_manifest(cls) -> Type[WorkflowBlockManifest]:
        return VelocityManifest

    def run(
        self,
        image: WorkflowImageData,
        detections: sv.Detections,
        smoothing_alpha: float,
        pixels_per_meter: float,
    ) -> BlockResult:
        """Annotate `detections` with velocity/speed data and return them.

        Args:
            image: Current frame; its video metadata provides timestamps.
            detections: Tracked detections (``tracker_id`` must be set).
            smoothing_alpha: EMA weight of the current measurement, in (0, 1].
            pixels_per_meter: Pixel-to-meter conversion factor, > 0.

        Returns:
            ``{OUTPUT_KEY: detections}`` where ``detections.data`` gains
            "velocity", "speed", "smoothed_velocity" and "smoothed_speed"
            entries, each a dict keyed by int tracker_id.

        Raises:
            ValueError: If detections are untracked or parameters are out of range.
        """
        if detections.tracker_id is None:
            raise ValueError(
                "tracker_id not initialized, VelocityBlock requires detections to be tracked"
            )
        if not (0 < smoothing_alpha <= 1):
            raise ValueError(
                "smoothing_alpha must be between 0 (exclusive) and 1 (inclusive)"
            )
        if not (pixels_per_meter > 0):
            raise ValueError("pixels_per_meter must be greater than 0")

        # Timestamp source: frame_number / fps for video files (stable under
        # faster/slower-than-realtime processing), wall-clock otherwise.
        if image.video_metadata.comes_from_video_file and image.video_metadata.fps != 0:
            ts_current = image.video_metadata.frame_number / image.video_metadata.fps
        else:
            ts_current = image.video_metadata.frame_timestamp.timestamp()

        video_id = image.video_metadata.video_identifier
        previous_positions = self._previous_positions.setdefault(video_id, {})
        smoothed_velocities = self._smoothed_velocities.setdefault(video_id, {})

        # Current positions = bounding-box centers, shape (num_detections, 2).
        bbox_xyxy = detections.xyxy
        current_positions = np.stack(
            [
                (bbox_xyxy[:, 0] + bbox_xyxy[:, 2]) / 2,
                (bbox_xyxy[:, 1] + bbox_xyxy[:, 3]) / 2,
            ],
            axis=1,
        )

        # Per-tracker result maps attached to detections.data after the loop.
        velocity_data = {}  # tracker_id -> [vx, vy] in m/s
        speed_data = {}  # tracker_id -> scalar speed in m/s
        smoothed_velocity_data = {}  # tracker_id -> smoothed [vx, vy] in m/s
        smoothed_speed_data = {}  # tracker_id -> smoothed scalar speed in m/s

        for i, tracker_id in enumerate(detections.tracker_id):
            tracker_id = int(tracker_id)  # normalize numpy ints to plain int keys
            current_position = current_positions[i]

            # Raw velocity in px/s from displacement since the last sighting;
            # zero for first sightings or non-positive time deltas.
            if tracker_id in previous_positions:
                prev_position, prev_timestamp = previous_positions[tracker_id]
                delta_time = ts_current - prev_timestamp
                if delta_time > 0:
                    velocity = (current_position - prev_position) / delta_time
                else:
                    velocity = np.zeros(2, dtype=float)
            else:
                velocity = np.zeros(2, dtype=float)

            # Exponential moving average; seeded with the first measurement.
            if tracker_id in smoothed_velocities:
                smoothed_velocity = (
                    smoothing_alpha * velocity
                    + (1 - smoothing_alpha) * smoothed_velocities[tracker_id]
                )
            else:
                smoothed_velocity = velocity

            # Persist state for the next frame.
            previous_positions[tracker_id] = (current_position, ts_current)
            smoothed_velocities[tracker_id] = smoothed_velocity

            # Convert px/s -> m/s; speed is the vector magnitude.
            velocity_m_s = velocity / pixels_per_meter
            smoothed_velocity_m_s = smoothed_velocity / pixels_per_meter
            velocity_data[tracker_id] = velocity_m_s.tolist()
            speed_data[tracker_id] = float(np.linalg.norm(velocity_m_s))
            smoothed_velocity_data[tracker_id] = smoothed_velocity_m_s.tolist()
            smoothed_speed_data[tracker_id] = float(
                np.linalg.norm(smoothed_velocity_m_s)
            )

        # Attach results for EVERY detection (previously only the last one
        # was recorded because these assignments sat outside the loop).
        if detections.data is None:
            detections.data = {}
        detections.data["velocity"] = velocity_data
        detections.data["speed"] = speed_data
        detections.data["smoothed_velocity"] = smoothed_velocity_data
        detections.data["smoothed_speed"] = smoothed_speed_data

        return {OUTPUT_KEY: detections}
0 commit comments