ROS 2从入门到精通系列(十八):视觉识别系统 - 目标检测与跟踪
ROS 2视觉识别系统:目标检测与跟踪 本文介绍了基于ROS 2构建的完整视觉处理系统架构,重点实现YOLOv8目标检测功能。系统包含硬件层(RGB/深度摄像头)、采集层(图像获取与标定)、检测层(YOLO等模型)、跟踪层(卡尔曼滤波等)和识别层(分类/分割)。通过Python节点实现了YOLOv8模型的实时目标检测,包括图像订阅、目标识别、结果处理和可视化输出功能,可输出检测框、类别标签和置信度等信息。
·
ROS 2从入门到精通系列(十八):视觉识别系统 - 目标检测与跟踪
构建完整的计算机视觉系统,从图像获取到目标跟踪。
引言
视觉是机器人最重要的感知手段。现代视觉系统涉及:
- 目标检测:识别图像中的物体位置
- 目标跟踪:在视频中连续跟踪目标
- 语义分割:像素级别的分类
- 姿态估计:识别物体方向
本篇将展示如何使用深度学习模型进行实时视觉识别。
一、视觉处理架构
1.1 完整的视觉系统架构
二、YOLOv8目标检测
2.1 安装YOLOv8
# 安装ultralytics
pip install ultralytics
# 安装ROS2相关包
sudo apt install ros-humble-cv-bridge
sudo apt install python3-opencv
2.2 YOLOv8检测节点
#!/usr/bin/env python3
"""
YOLOv8实时目标检测节点
"""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from vision_msgs.msg import (
    BoundingBox2D,
    Detection2D,
    Detection2DArray,
    ObjectHypothesisWithPose,
)
from cv_bridge import CvBridge
from ultralytics import YOLO
import cv2
import numpy as np
class YOLODetector(Node):
    """ROS 2 node that runs YOLOv8 object detection on a camera stream.

    Subscribes to /camera/image_raw, publishes a Detection2DArray on
    /detections and an annotated debug image on /detection_debug.
    """

    def __init__(self):
        super().__init__('yolo_detector')
        # Load the YOLOv8 model (medium size; weights downloaded on first use).
        self.model = YOLO('yolov8m.pt')
        # Detection thresholds, exposed as ROS parameters.
        self.declare_parameter('confidence_threshold', 0.5)
        self.declare_parameter('iou_threshold', 0.45)
        self.conf_threshold = self.get_parameter('confidence_threshold').value
        self.iou_threshold = self.get_parameter('iou_threshold').value
        # Bridge between ROS Image messages and OpenCV arrays.
        self.bridge = CvBridge()
        # Publishers: structured detections and annotated debug frames.
        self.detection_pub = self.create_publisher(
            Detection2DArray, '/detections', 10)
        self.debug_pub = self.create_publisher(
            Image, '/detection_debug', 10)
        # Camera subscription drives the whole pipeline.
        self.image_sub = self.create_subscription(
            Image, '/camera/image_raw', self.image_callback, 10)
        self.get_logger().info('YOLOv8检测器已启动')

    def image_callback(self, msg):
        """Detect objects on one frame; publish results and a debug image."""
        cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
        results = self.model(cv_image, conf=self.conf_threshold,
                             iou=self.iou_threshold)
        detections = self.process_detections(results[0], msg.header)
        self.detection_pub.publish(detections)
        debug_image = self.draw_detections(cv_image, results[0])
        debug_msg = self.bridge.cv2_to_imgmsg(debug_image, encoding='bgr8')
        self.debug_pub.publish(debug_msg)

    def process_detections(self, result, header):
        """Convert one YOLO result into a Detection2DArray message."""
        detections_msg = Detection2DArray()
        detections_msg.header = header
        if result.boxes is not None:
            for box in result.boxes:
                detection = Detection2D()
                detection.header = header
                x1, y1, x2, y2 = box.xyxy[0]
                # NOTE(review): in Humble's vision_msgs, BoundingBox2D.center
                # is a vision_msgs/Pose2D with a `position` sub-field —
                # confirm against the installed vision_msgs version.
                detection.bbox.center.position.x = float((x1 + x2) / 2)
                detection.bbox.center.position.y = float((y1 + y2) / 2)
                detection.bbox.size_x = float(x2 - x1)
                detection.bbox.size_y = float(y2 - y1)
                if box.conf is not None:
                    # Bug fix: `detection.results` starts empty, so indexing
                    # results[0] raised IndexError; append a hypothesis.
                    class_id = int(box.cls)
                    hypothesis = ObjectHypothesisWithPose()
                    hypothesis.hypothesis.class_id = self.model.names[class_id]
                    hypothesis.hypothesis.score = float(box.conf)
                    detection.results.append(hypothesis)
                detections_msg.detections.append(detection)
        return detections_msg

    @staticmethod
    def draw_detections(image, result):
        """Return a copy of `image` with boxes and class/confidence labels."""
        annotated_frame = image.copy()
        if result.boxes is not None:
            for box in result.boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                conf = float(box.conf)
                class_id = int(box.cls)
                class_name = result.names[class_id]
                # Green bounding box.
                cv2.rectangle(annotated_frame, (x1, y1), (x2, y2),
                              (0, 255, 0), 2)
                # Label drawn just above the box's top-left corner.
                label = f'{class_name} {conf:.2f}'
                cv2.putText(annotated_frame, label, (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        return annotated_frame
def main(args=None):
    """Entry point: spin the YOLO detector node until shutdown."""
    rclpy.init(args=args)
    detector = YOLODetector()
    try:
        rclpy.spin(detector)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the node; exit quietly.
        pass
    finally:
        # Release node resources before tearing down the rclpy context.
        detector.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
三、多目标跟踪(MOT)
3.1 SORT算法实现
#!/usr/bin/env python3
"""
SORT多目标跟踪算法
"""
import numpy as np
from scipy.optimize import linear_sum_assignment
import cv2
class KalmanBoxTracker:
    """Single-object bounding-box tracker using a simplified Kalman filter.

    State vector: [cx, cy, s, r, vx, vy, vs] where (cx, cy) is the box
    center, s the area, r the aspect ratio, and (vx, vy, vs) the
    velocities of cx, cy and s. As in the original SORT formulation,
    the aspect ratio r is assumed constant (no velocity term).
    """

    def __init__(self, bbox):
        """
        Initialize the tracker from a bounding box.

        bbox: [x1, y1, x2, y2] corner coordinates.
        """
        x1, y1, x2, y2 = bbox
        self.cx = (x1 + x2) / 2
        self.cy = (y1 + y2) / 2
        self.s = (x2 - x1) * (y2 - y1)
        # Guard against a degenerate zero-height box.
        self.r = (x2 - x1) / max(y2 - y1, 1e-6)
        # Full state: position/size plus velocities (initially zero).
        self.state = np.array(
            [self.cx, self.cy, self.s, self.r, 0.0, 0.0, 0.0], dtype=float)
        self.P = np.eye(7) * 10    # state covariance
        self.Q = np.eye(7) * 0.1   # process noise
        self.R = np.eye(4) * 1     # observation noise
        self.id = None
        self.hits = 0   # number of successful measurement updates
        self.age = 0    # frames since the last matched update

    def predict(self):
        """Propagate the state one frame ahead (constant-velocity model)."""
        # Bug fix: only cx, cy, s carry velocities (3 values); the original
        # added the length-3 velocity slice to a length-4 slice, which
        # raises a broadcasting error. The aspect ratio r stays fixed.
        self.state[:3] += self.state[4:7]
        # Covariance grows by the process noise: P = A P A^T + Q with A ~ I.
        self.P = self.P + self.Q
        self.age += 1

    def update(self, bbox):
        """Correct the state with an observed bounding box."""
        x1, y1, x2, y2 = bbox
        z = np.array([
            (x1 + x2) / 2,
            (y1 + y2) / 2,
            (x2 - x1) * (y2 - y1),
            (x2 - x1) / max(y2 - y1, 1e-6),
        ])
        # Simplified correction with a fixed gain of 0.5.
        innovation = z - self.state[:4]
        self.state[:4] += innovation * 0.5
        # Bug fix: the original covariance update multiplied a 7x7 matrix by
        # 4x4 identities and raised a shape error. With a fixed scalar gain
        # K = 0.5, the measured block shrinks as P <- (I - K) P.
        self.P[:4, :4] *= 0.5
        self.hits += 1
        # Bug fix: reset the miss counter so that `age` counts frames since
        # the last match and max_age pruning does not kill healthy tracks.
        self.age = 0

    def get_bbox(self):
        """Return the current state as [x1, y1, x2, y2]."""
        cx, cy, s, r = self.state[:4]
        w = np.sqrt(max(s * r, 0.0))
        h = s / w if w > 0 else 0.0
        x1 = cx - w / 2
        y1 = cy - h / 2
        x2 = cx + w / 2
        y2 = cy + h / 2
        return np.array([x1, y1, x2, y2])
class SORTTracker:
    """SORT multi-object tracker: Kalman prediction + Hungarian matching."""

    def __init__(self, max_age=30, min_hits=3):
        """
        max_age: remove a tracker after this many frames without a match.
        min_hits: minimum number of updates before a track is reported.
        """
        self.trackers = []
        self.frame_count = 0
        self.max_age = max_age
        self.min_hits = min_hits
        self.next_id = 1

    def update(self, detections):
        """
        Advance the tracker by one frame.

        detections: N×5 array, each row [x1, y1, x2, y2, conf].
        Returns a list of dicts with keys 'bbox', 'id' and 'conf'.
        """
        self.frame_count += 1
        # Predict every existing tracker forward one frame.
        for tracker in self.trackers:
            tracker.predict()
        # Associate detections with tracker predictions via IoU.
        matched, unmatched_dets, unmatched_trks = \
            self.associate_detections_to_trackers(detections)
        # Correct matched trackers with their associated detections.
        for d, t in matched:
            self.trackers[t].update(detections[d, :4])
        # Start a fresh tracker for each unmatched detection.
        for d in unmatched_dets:
            trk = KalmanBoxTracker(detections[d, :4])
            trk.id = self.next_id
            self.next_id += 1
            self.trackers.append(trk)
        # Prune trackers that have been unmatched for too long.
        self.trackers = [t for t in self.trackers
                         if t.age < self.max_age]
        # Report confirmed tracks (or everything during the warm-up frames).
        results = []
        for t in self.trackers:
            if t.hits >= self.min_hits or self.frame_count <= self.min_hits:
                results.append({
                    'bbox': t.get_bbox(),
                    'id': t.id,
                    'conf': t.hits / self.frame_count,
                })
        return results

    def associate_detections_to_trackers(self, detections, iou_threshold=0.3):
        """
        Match detections to trackers by IoU using the Hungarian algorithm.

        Bug fix: the original was a @staticmethod that referenced an
        undefined `trackers` name and was called without the tracker list;
        it is now an instance method operating on self.trackers.
        Returns (matched M×2 int array of [det, trk] index pairs,
        unmatched detection indices, unmatched tracker indices).
        """
        trackers = self.trackers
        n_detections = len(detections)
        n_trackers = len(trackers)
        if n_trackers == 0 or n_detections == 0:
            # Bug fix: report ALL detections/trackers as unmatched in the
            # empty cases (the original returned an empty unmatched-tracker
            # array even when trackers existed but no detections arrived).
            return (np.empty((0, 2), dtype=int),
                    np.arange(n_detections),
                    np.arange(n_trackers))
        # Pairwise IoU between every detection and tracker prediction.
        iou_matrix = np.zeros((n_detections, n_trackers))
        for d, det in enumerate(detections):
            for t, trk in enumerate(trackers):
                iou_matrix[d, t] = compute_iou(det[:4], trk.get_bbox())
        # Hungarian assignment maximizing total IoU (hence the negation).
        det_idx, trk_idx = linear_sum_assignment(-iou_matrix)
        matched = []
        unmatched_dets = []
        unmatched_trks = []
        for d, t in zip(det_idx, trk_idx):
            if iou_matrix[d, t] < iou_threshold:
                # Assigned pair with too little overlap: treat as unmatched.
                unmatched_dets.append(d)
                unmatched_trks.append(t)
            else:
                matched.append([d, t])
        for d in range(n_detections):
            if d not in det_idx:
                unmatched_dets.append(d)
        for t in range(n_trackers):
            if t not in trk_idx:
                unmatched_trks.append(t)
        # Bug fix: force an integer M×2 shape so an empty match list still
        # iterates correctly and indexes self.trackers with ints.
        return (np.array(matched, dtype=int).reshape(-1, 2),
                np.array(unmatched_dets, dtype=int),
                np.array(unmatched_trks, dtype=int))
def compute_iou(box1, box2):
    """Return the intersection-over-union of two [x1, y1, x2, y2] boxes."""
    ax1, ay1, ax2, ay2 = box1
    bx1, by1, bx2, by2 = box2
    # Overlap extents along each axis; non-positive means no intersection.
    overlap_w = min(ax2, bx2) - max(ax1, bx1)
    overlap_h = min(ay2, by2) - max(ay1, by1)
    if overlap_w <= 0 or overlap_h <= 0:
        return 0.0
    inter_area = overlap_w * overlap_h
    # Union = sum of both areas minus the double-counted intersection.
    area_a = (ax2 - ax1) * (ay2 - ay1)
    area_b = (bx2 - bx1) * (by2 - by1)
    return inter_area / (area_a + area_b - inter_area)
四、融合跟踪节点
#!/usr/bin/env python3
"""
融合YOLO检测和SORT跟踪的节点
"""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from vision_msgs.msg import Detection2DArray
from cv_bridge import CvBridge
import cv2
import numpy as np
class TrackingNode(Node):
    """Node fusing YOLO detection with SORT multi-object tracking.

    NOTE(review): YOLODetector and SORTTracker are not imported in this
    script; they must be imported from the detector/tracker modules
    defined earlier in this series for the node to run.
    """

    def __init__(self):
        super().__init__('tracking_node')
        # NOTE(review): YOLODetector is itself a full node with its own
        # subscription; only its loaded `.model` is used here. Consider
        # loading the YOLO model directly instead of nesting a node.
        self.detector = YOLODetector()
        self.tracker = SORTTracker()
        self.bridge = CvBridge()
        # Camera subscription drives the detect-then-track pipeline.
        self.image_sub = self.create_subscription(
            Image, '/camera/image_raw', self.image_callback, 10)
        # Publisher for the tracking results.
        self.tracking_pub = self.create_publisher(
            Detection2DArray, '/tracks', 10)

    def image_callback(self, msg):
        """Detect, track, publish and visualize one frame."""
        cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
        # Object detection.
        results = self.detector.model(cv_image)
        detections = self.extract_detections(results[0])
        # Multi-object tracking.
        tracks = self.tracker.update(detections)
        # Publish and visualize.
        self.publish_tracks(tracks, msg.header)
        self.visualize(cv_image, tracks)

    @staticmethod
    def extract_detections(result):
        """Convert a YOLO result to an N×5 float array [x1,y1,x2,y2,conf]."""
        detections = []
        if result.boxes is not None:
            for box in result.boxes:
                x1, y1, x2, y2 = box.xyxy[0]
                # Bug fix: convert tensors to plain floats so the result is
                # a numeric array (the original stored tensor objects, which
                # breaks the tracker's numeric slicing `detections[d, :4]`).
                detections.append([float(x1), float(y1),
                                   float(x2), float(y2), float(box.conf)])
        return np.array(detections) if detections else np.empty((0, 5))

    def publish_tracks(self, tracks, header):
        """Publish tracking results (message construction still TODO)."""
        pass

    def visualize(self, image, tracks):
        """Draw each track's box and ID, then show the frame with OpenCV."""
        for track in tracks:
            bbox = track['bbox']
            track_id = track['id']
            x1, y1, x2, y2 = map(int, bbox)
            # Green bounding box per track.
            cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
            # Track ID label above the box.
            cv2.putText(image, f'ID:{track_id}', (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        # On-screen display (requires a GUI environment).
        cv2.imshow('Tracking', image)
        cv2.waitKey(1)
def main(args=None):
    """Entry point: spin the tracking node until shutdown."""
    rclpy.init(args=args)
    node = TrackingNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the node; exit quietly.
        pass
    finally:
        # Release node resources before tearing down the rclpy context.
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
五、本项目要点总结
✅ 目标检测:
- YOLOv8实时检测
- 置信度和NMS筛选
- 多尺度特征
✅ 多目标跟踪:
- 卡尔曼滤波预测
- 匈牙利算法匹配
- ID分配和管理
✅ 融合系统:
- 检测和跟踪集成
- 实时性能
- 可视化输出
✅ 优化技巧:
- 模型量化加速
- 批处理
- 硬件加速(GPU)
下一篇预告:《ROS2从入门到精通系列(十九):多机器人协作 - 分布式系统与协调》
视觉是机器人最强大的感知工具。掌握这套系统,你就掌握了机器人的"眼睛"!
更多推荐
所有评论(0)