江苏省华建建设股份有限公司网站微信引流推广
目标跟踪之DeepSort
- 1 安装
- 1.1 代码下载与安装
- 1. 2 DeepSort检测流程
- 1.3 模型初始化流程
- 2. 模型推理
- 2.1 模型推理代码解析
- 2.2 对预测结果跟踪代码解析
- 2.3 轨迹预测
- 2.4 轨迹跟踪
- 2.5 轨迹与特征匹配
- 2.6 计算轨迹与检测的特征余弦距离
- 2.7 用轨迹与检测的马氏距离跟新cost_matrix矩阵
1 安装
参考:https://github.com/beeduchai/YOLOv8-DeepSORT-Object-Tracking
1.1 代码下载与安装
1 创建一个全新的虚拟环境
conda create -n yolov8_deepsort python=3.9
2 激活虚拟环境
conda activate yolov8_deepsort
3 在当前地址创建一个文件夹存放将要下载的YOLOv8-DeepSORT-Object-Tracking
mkdir my_yolov8_deepsort
4 跳转到yolov8_deepsort 文件夹
cd my_yolov8_deepsort
5 下载代码
git clone https://github.com/MuhammadMoinFaisal/YOLOv8-DeepSORT-Object-Tracking.git
6 安装依赖库
pip install -e .
7 在https://drive.google.com/drive/folders/1kna8eWGrSfzaR6DtNJ8_GchGgPMv3VC8下载文件,将文件夹解压放在ultralytics/yolo/v8/detect目录下
8 在ultralytics/yolo/v8/detect放一个视频,执行以下命令.首次执行以下代码,会自动取下载yolov8l.pt模型
python predict.py model=yolov8l.pt source="traffic.mp4" show=True
注释:如果出现错误,可能是库的版本不对,根据报错提示更改版本。
1. 2 DeepSort检测流程
DeepSort检测流程:
- 模型初始化
- 模型推理
# ultralytics/yolo/v8/detect/predict.py
def predict(cfg): # cfg:ultralytics/yolo/configs/default.yaml # 1.模型初始化init_tracker() cfg.model = cfg.model or "yolov8n.pt"cfg.imgsz = check_imgsz(cfg.imgsz, min_dim=2) # check image sizecfg.source = cfg.source if cfg.source is not None else ROOT / "assets"# 2. 模型推理predictor = DetectionPredictor(cfg)predictor()
1.3 模型初始化流程
- 实例化对象获取"deep_sort_pytorch/configs/deep_sort.yaml"的参数。
- 初始化跟踪器
2.1 实例化特征提取器
2.2 实例化匹配代价矩阵
2.3 实例化跟踪器
# ultralytics/yolo/v8/detect/predict.py
def init_tracker():global deepsort# 1 实例化对象获取"deep_sort_pytorch/configs/deep_sort.yaml"的参数cfg_deep = get_config() cfg_deep.merge_from_file("deep_sort_pytorch/configs/deep_sort.yaml")# 2 初始化跟踪器deepsort= DeepSort(cfg_deep.DEEPSORT.REID_CKPT,max_dist=cfg_deep.DEEPSORT.MAX_DIST, min_confidence=cfg_deep.DEEPSORT.MIN_CONFIDENCE,nms_max_overlap=cfg_deep.DEEPSORT.NMS_MAX_OVERLAP, max_iou_distance=cfg_deep.DEEPSORT.MAX_IOU_DISTANCE,max_age=cfg_deep.DEEPSORT.MAX_AGE, n_init=cfg_deep.DEEPSORT.N_INIT, nn_budget=cfg_deep.DEEPSORT.NN_BUDGET,use_cuda=True)# ultralytics/yolo/v8/detect/deep_sort_pytorch/deep_sort/deep_sort.py
class DeepSort(object):def __init__(self, model_path, max_dist=0.2, min_confidence=0.3, nms_max_overlap=1.0, max_iou_distance=0.7, max_age=70, n_init=3, nn_budget=100, use_cuda=True):self.min_confidence = min_confidence # 检测物体的最小置信度self.nms_max_overlap = nms_max_overlap # NMS时iou的最大值# 2.1 实例化特征提取器self.extractor = Extractor(model_path, use_cuda=use_cuda) # 提取检测到的物体的特征,用于计算轨迹与检测框的余弦距离max_cosine_distance = max_dist # 计算距离的最大值2.2 实例化匹配代价矩阵metric = NearestNeighborDistanceMetric("cosine", max_cosine_distance, nn_budget) # metric计算匹配代价矩阵。在级联匹配中用欧式距离计算特级特征和检测特征的距离2.3 实例化跟踪器self.tracker = Tracker(metric, max_iou_distance=max_iou_distance, max_age=max_age, n_init=n_init)
2. 模型推理
模型推理流程:
- 准备模型和数据,对数据进行预处理
1.1 数据进行预处理 - 数据带入模型得到预测结果
- 处理预测结果
3.1 处理预测结果 - 对预测结果跟踪
2.1 模型推理代码解析
# ultralytics/yolo/engine/predictor.py@smart_inference_mode()def __call__(self, source=None, model=None):# 1.准备模型和数据,对数据进行预处理self.run_callbacks("on_predict_start")model = self.model if self.done_setup else self.setup(source, model)model.eval()self.seen, self.windows, self.dt = 0, [], (ops.Profile(), ops.Profile(), ops.Profile())self.all_outputs = []for batch in self.dataset:self.run_callbacks("on_predict_batch_start")path, im, im0s, vid_cap, s = batchvisualize = increment_path(self.save_dir / Path(path).stem, mkdir=True) if self.args.visualize else Falsewith self.dt[0]:im = self.preprocess(im)if len(im.shape) == 3:im = im[None] # expand for batch dim# 2. 数据带入模型得到预测结果# Inferencewith self.dt[1]:preds = model(im, augment=self.args.augment, visualize=visualize)# 3. 处理预测结果# postprocesswith self.dt[2]:preds = self.postprocess(preds, im, im0s)for i in range(len(im)):if self.webcam:path, im0s = path[i], im0s[i]p = Path(path)# 4. 对预测结果跟踪s += self.write_results(i, preds, (p, im, im0s))if self.args.show:self.show(p)if self.args.save:self.save_preds(vid_cap, i, str(self.save_dir / p.name))# Print time (inference-only)LOGGER.info(f"{s}{'' if len(preds) else '(no detections), '}{self.dt[1].dt * 1E3:.1f}ms")self.run_callbacks("on_predict_batch_end")# Print resultst = tuple(x.t / self.seen * 1E3 for x in self.dt) # speeds per imageLOGGER.info(f'Speed: %.1fms pre-process, %.1fms inference, %.1fms postprocess per image at shape {(1, 3, *self.imgsz)}'% t)if self.args.save_txt or self.args.save:s = f"\n{len(list(self.save_dir.glob('labels/*.txt')))} labels saved to {self.save_dir / 'labels'}" if self.args.save_txt else ''LOGGER.info(f"Results saved to {colorstr('bold', self.save_dir)}{s}")self.run_callbacks("on_predict_end")return self.all_outputs# 1.1 数据进行预处理 ultralytics/yolo/v8/detect/predict.pydef preprocess(self, img):img = torch.from_numpy(img).to(self.model.device) # 图片数据转换成tensor格式img = img.half() if self.model.fp16 else img.float() # uint8 to fp16/32 选择半精度还是双精度img /= 255 # 0 - 255 to 0.0 - 1.0 # 图片数据缩放到0~1之间return img# 3.1 处理预测结果 ultralytics/yolo/v8/detect/predict.pydef postprocess(self, preds, img, orig_img):# 1 非极大值抑制preds = ops.non_max_suppression(preds,self.args.conf,self.args.iou,agnostic=self.args.agnostic_nms,max_det=self.args.max_det) for i, pred in enumerate(preds):shape = orig_img[i].shape if self.webcam else orig_img.shape# 2 把检测到的框映射到原图pred[:, :4] = ops.scale_boxes(img.shape[2:], pred[:, :4], shape).round()return preds
2.2 对预测结果跟踪代码解析
把检测框的中心点宽高、置信度、类别、原图输入 deepsort.update()中进行跟踪,跟踪代码流程如下:
- 提取特征。
- 轨迹预测。
- 轨迹跟踪。
# ultralytics/yolo/v8/detect/predict.py-->def write_results(self, idx, preds, batch)-->outputs = deepsort.update(xywhs, confss, oids, im0)--> self.tracker.predict() self.tracker.update(detections)def update(self, bbox_xywh, confidences, oids, ori_img):self.height, self.width = ori_img.shape[:2]# generate detections # 1. 提取特征features = self._get_features(bbox_xywh, ori_img) # 提取检测框对应物体的特征bbox_tlwh = self._xywh_to_tlwh(bbox_xywh) # 把框由中心点宽高的格式转换为左上角坐标和宽高的格式detections = [Detection(bbox_tlwh[i], conf, features[i],oid) for i, (conf,oid) in enumerate(zip(confidences,oids)) if conf > self.min_confidence] # 删除资信度小于阈值的检测框,并把检测框的特征添加到detections中# run on non-maximum supressionboxes = np.array([d.tlwh for d in detections])scores = np.array([d.confidence for d in detections])# update tracker# 2. 轨迹预测self.tracker.predict()# 3. 轨迹跟踪self.tracker.update(detections)# output bbox identitiesoutputs = []for track in self.tracker.tracks:if not track.is_confirmed() or track.time_since_update > 1:continue box = track.to_tlwh()x1, y1, x2, y2 = self._tlwh_to_xyxy(box)track_id = track.track_idtrack_oid = track.oidoutputs.append(np.array([x1, y1, x2, y2, track_id, track_oid], dtype=np.int))if len(outputs) > 0:outputs = np.stack(outputs, axis=0)return outputs
2.3 轨迹预测
ultralytics/yolo/v8/detect/deep_sort_pytorch/deep_sort/sort/kalman_filter.py
self.tracker.predict()–>track.predict(self.kf)–>def predict(self, mean, covariance)
轨迹预测预测流程:
- 状态转移矩阵。
- 根据状态转移矩阵获取下一状态和其协方差。
def predict(self, mean, covariance):"""Run Kalman filter prediction step.Parameters----------mean : ndarrayThe 8 dimensional mean vector of the object state at the previoustime step.covariance : ndarrayThe 8x8 dimensional covariance matrix of the object state at theprevious time step.Returns-------(ndarray, ndarray)Returns the mean vector and covariance matrix of the predictedstate. Unobserved velocities are initialized to 0 mean."""std_pos = [self._std_weight_position * mean[3],self._std_weight_position * mean[3],1e-2,self._std_weight_position * mean[3]]std_vel = [self._std_weight_velocity * mean[3],self._std_weight_velocity * mean[3],1e-5,self._std_weight_velocity * mean[3]]# 1. 状态转移矩阵motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))# 2. 根据状态转移矩阵获取下一状态和其协方差mean = np.dot(self._motion_mat, mean)covariance = np.linalg.multi_dot((self._motion_mat, covariance, self._motion_mat.T)) + motion_covreturn mean, covariance
2.4 轨迹跟踪
ultralytics/yolo/v8/detect/deep_sort_pytorch/deep_sort/sort/tracker.py
self.tracker.update(detections)
轨迹跟踪步骤:
- 轨迹与特征匹配
- 更新匹配轨迹、未匹配轨迹和未匹配检测
- 更新轨迹的特征
def update(self, detections):"""Perform measurement update and track management.Parameters---------- bbox_tlwh[i], conf, features[i],oiddetections : List[deep_sort.detection.Detection]A list of detections at the current time step."""# Run matching cascade. 1. 轨迹与特征 级联匹配和IOU匹配matches, unmatched_tracks, unmatched_detections = \self._match(detections)# Update track set. 2. 更新匹配轨迹、未匹配轨迹和未匹配检测for track_idx, detection_idx in matches:self.tracks[track_idx].update(self.kf, detections[detection_idx]) # 对匹配轨迹更新得到当前最优轨迹for track_idx in unmatched_tracks:self.tracks[track_idx].mark_missed() # 对未匹配轨迹进行标识for detection_idx in unmatched_detections:self._initiate_track(detections[detection_idx]) # 对新检测框初始化轨迹self.tracks = [t for t in self.tracks if not t.is_deleted()] # 剔除掉删除的轨迹# Update distance metric. 3. 更新轨迹的历史特征active_targets = [t.track_id for t in self.tracks if t.is_confirmed()] # 存放确认状态轨迹的idfeatures, targets = [], [] # 分别存放当前所有轨迹的特征和idfor track in self.tracks:if not track.is_confirmed():continuefeatures += track.featurestargets += [track.track_id for _ in track.features]track.features = []self.metric.partial_fit( #np.asarray(features), np.asarray(targets), active_targets)
2.5 轨迹与特征匹配
轨迹与特征匹配步骤:
- 把轨迹分为确认状态和非确认状态。
- 级联匹配。通过级联匹配得到确认匹配轨迹和检测的id
matches_a,未匹配轨迹unmatched_tracks_a, 未匹配检测unmatched_detections - IOU匹配。把级联匹配步骤中只有一次未匹配的轨迹并如非确认轨迹用于IOU匹配;保留级联匹配步骤中未匹配的轨迹中连续2帧或2帧以上没有匹配的轨迹。
- 更新级联匹配和IOU匹配的结果。
def _match(self, detections):def gated_metric(tracks, dets, track_indices, detection_indices):features = np.array([dets[i].feature for i in detection_indices]) # 检测框的特征targets = np.array([tracks[i].track_id for i in track_indices]) # 轨迹的idcost_matrix = self.metric.distance(features, targets) # 计算轨迹与检测的特征余弦距离cost_matrix = linear_assignment.gate_cost_matrix( # 用轨迹与检测的马氏距离跟新cost_matrix矩阵self.kf, cost_matrix, tracks, dets, track_indices,detection_indices)return cost_matrix# Split track set into confirmed and unconfirmed tracks.# 1 把轨迹分为确认状态和非确认状态confirmed_tracks = [i for i, t in enumerate(self.tracks) if t.is_confirmed()]unconfirmed_tracks = [i for i, t in enumerate(self.tracks) if not t.is_confirmed()]# Associate confirmed tracks using appearance features. # 2 级联匹配matches_a, unmatched_tracks_a, unmatched_detections = \linear_assignment.matching_cascade(gated_metric, self.metric.matching_threshold, self.max_age,self.tracks, detections, confirmed_tracks)# Associate remaining tracks together with unconfirmed tracks using IOU.# 3 IOU匹配iou_track_candidates = unconfirmed_tracks + [k for k in unmatched_tracks_a ifself.tracks[k].time_since_update == 1] # 如果级联未匹配轨迹上一帧匹配成功,这一帧匹配失败,则把其添加到不确认轨迹中unmatched_tracks_a = [k for k in unmatched_tracks_a ifself.tracks[k].time_since_update != 1] # 跟新unmatched_tracks_a,只保留大于等于连续两帧没有被匹配上的matches_b, unmatched_tracks_b, unmatched_detections = \linear_assignment.min_cost_matching(iou_matching.iou_cost, self.max_iou_distance, self.tracks,detections, iou_track_candidates, unmatched_detections) # IOU匹配# 4 更新级联匹配和IOU匹配的结果matches = matches_a + matches_bunmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))return matches, unmatched_tracks, unmatched_detectionsdef _initiate_track(self, detection):mean, covariance = self.kf.initiate(detection.to_xyah()) # 根据新检测到的框初始化轨迹的数值和协方差self.tracks.append(Track(mean, covariance, self._next_id, self.n_init, self.max_age,detection.oid,detection.feature)) # 添加新的轨迹self._next_id += 1# 2 级联匹配
级联匹配步骤:
2.1 获取(1 + level)次没有被匹配上的轨迹
2.2 计算轨迹特征和检测特征的代价矩阵2.2.1 计算轨迹与检测的特征余弦距离2.2.2 用轨迹与检测的马氏距离跟新cost_matrix矩阵def matching_cascade(distance_metric, max_distance, cascade_depth, tracks, detections,track_indices=None, detection_indices=None):"""Run matching cascade.Parameters----------distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarrayThe distance metric is given a list of tracks and detections as well asa list of N track indices and M detection indices. The metric shouldreturn the NxM dimensional cost matrix, where element (i, j) is theassociation cost between the i-th track in the given track indices andthe j-th detection in the given detection indices.max_distance : floatGating threshold. Associations with cost larger than this value aredisregarded.cascade_depth: intThe cascade depth, should be se to the maximum track age.tracks : List[track.Track]A list of predicted tracks at the current time step.detections : List[detection.Detection]A list of detections at the current time step.track_indices : Optional[List[int]]List of track indices that maps rows in `cost_matrix` to tracks in`tracks` (see description above). Defaults to all tracks.detection_indices : Optional[List[int]]List of detection indices that maps columns in `cost_matrix` todetections in `detections` (see description above). Defaults to alldetections.Returns-------(List[(int, int)], List[int], List[int])Returns a tuple with the following three entries:* A list of matched track and detection indices.* A list of unmatched track indices.* A list of unmatched detection indices."""if track_indices is None:track_indices = list(range(len(tracks)))if detection_indices is None:detection_indices = list(range(len(detections)))unmatched_detections = detection_indices # 初始所有的检测都没有匹配matches = []for level in range(cascade_depth):if len(unmatched_detections) == 0: # No detections leftbreak# 1 获取(1 + level)次没有被匹配上的轨迹track_indices_l = [k for k in track_indicesif tracks[k].time_since_update == 1 + level ]if len(track_indices_l) == 0: # Nothing to match at this levelcontinue# 2 计算轨迹特征和检测特征的马氏距离matches_l, _, unmatched_detections = \min_cost_matching(distance_metric, max_distance, tracks, detections, # max_distance阈值track_indices_l, unmatched_detections)matches += matches_lunmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))return matches, unmatched_tracks, unmatched_detections
2.6 计算轨迹与检测的特征余弦距离
对检测特征和历史的轨迹特征先归一化,再1减去特征的矩阵相乘得到余弦距离
def _cosine_distance(a, b, data_is_normalized=False):"""Compute pair-wise cosine distance between points in `a` and `b`.Parameters----------a : array_likeAn NxM matrix of N samples of dimensionality M.b : array_likeAn LxM matrix of L samples of dimensionality M.data_is_normalized : Optional[bool]If True, assumes rows in a and b are unit length vectors.Otherwise, a and b are explicitly normalized to lenght 1.Returns-------ndarrayReturns a matrix of size len(a), len(b) such that element (i, j)contains the squared distance between `a[i]` and `b[j]`."""if not data_is_normalized:a = np.asarray(a) / np.linalg.norm(a, axis=1, keepdims=True) # a是轨迹特征归一化b = np.asarray(b) / np.linalg.norm(b, axis=1, keepdims=True) # b是检测特征归一化return 1. - np.dot(a, b.T)
2.7 用轨迹与检测的马氏距离跟新cost_matrix矩阵
根据公式计算马氏距离
def gate_cost_matrix(kf, cost_matrix, tracks, detections, track_indices, detection_indices,gated_cost=INFTY_COST, only_position=False):"""Invalidate infeasible entries in cost matrix based on the statedistributions obtained by Kalman filtering.Parameters----------kf : The Kalman filter.cost_matrix : ndarrayThe NxM dimensional cost matrix, where N is the number of track indicesand M is the number of detection indices, such that entry (i, j) is theassociation cost between `tracks[track_indices[i]]` and`detections[detection_indices[j]]`.tracks : List[track.Track]A list of predicted tracks at the current time step.detections : List[detection.Detection]A list of detections at the current time step.track_indices : List[int]List of track indices that maps rows in `cost_matrix` to tracks in`tracks` (see description above).detection_indices : List[int]List of detection indices that maps columns in `cost_matrix` todetections in `detections` (see description above).gated_cost : Optional[float]Entries in the cost matrix corresponding to infeasible associations areset this value. Defaults to a very large value.only_position : Optional[bool]If True, only the x, y position of the state distribution is consideredduring gating. Defaults to False.Returns-------ndarrayReturns the modified cost matrix."""gating_dim = 2 if only_position else 4gating_threshold = kalman_filter.chi2inv95[gating_dim]measurements = np.asarray([detections[i].to_xyah() for i in detection_indices])for row, track_idx in enumerate(track_indices):track = tracks[track_idx] # gating_distance马氏距离gating_distance = kf.gating_distance( # track.mean, track.covariance是轨迹的均值和协方差。measurements是检测框track.mean, track.covariance, measurements, only_position)cost_matrix[row, gating_distance > gating_threshold] = gated_cost # 如果马氏距离大于阈值,则赋值为极大值。return cost_matrix# 马氏距离def gating_distance(self, mean, covariance, measurements,only_position=False):"""Compute gating distance between state distribution and measurements.A suitable distance threshold can be obtained from `chi2inv95`. If`only_position` is False, the chi-square distribution has 4 degrees offreedom, otherwise 2.Parameters----------mean : ndarrayMean vector over the state distribution (8 dimensional).covariance : ndarrayCovariance of the state distribution (8x8 dimensional).measurements : ndarrayAn Nx4 dimensional matrix of N measurements, each informat (x, y, a, h) where (x, y) is the bounding box centerposition, a the aspect ratio, and h the height.only_position : Optional[bool]If True, distance computation is done with respect to the boundingbox center position only.Returns-------ndarrayReturns an array of length N, where the i-th element contains thesquared Mahalanobis distance between (mean, covariance) and`measurements[i]`."""mean, covariance = self.project(mean, covariance) # 轨迹特征if only_position: # 计算马氏距离用几个值mean, covariance = mean[:2], covariance[:2, :2]measurements = measurements[:, :2]cholesky_factor = np.linalg.cholesky(covariance) # 对轨迹的协方差分解成下三角矩阵和,前提covariance正定d = measurements - mean #(det-track).T *track's covariance.INV *(det-track)z = scipy.linalg.solve_triangular( # 求线性方程组的解 mash = d.T *covariance^{-1} *d = d.T *(L.T*L) *d = (Ld).T*(Ld)cholesky_factor, d.T, lower=True, check_finite=False, # L.T*? = d.T--> ? = ((L.T)^{-1} d.T=(dL^{-1}).Toverwrite_b=True)squared_maha = np.sum(z * z, axis=0)return squared_maha