ROS 2从入门到精通系列(十八):视觉识别系统 - 目标检测与跟踪

构建完整的计算机视觉系统,从图像获取到目标跟踪。


引言

视觉是机器人最重要的感知手段。现代视觉系统涉及:

  • 目标检测:识别图像中的物体位置
  • 目标跟踪:在视频中连续跟踪目标
  • 语义分割:像素级别的分类
  • 姿态估计:识别物体方向

本篇将展示如何使用深度学习模型进行实时视觉识别。


一、视觉处理架构

1.1 完整的视觉系统架构

输出层

检测框

轨迹ID

标签和置信度

识别层

分类
什么物体

分割
像素标签

姿态
物体方向

跟踪层

卡尔曼滤波
平滑轨迹

深度学习
外观特征

SORT
匈牙利匹配

检测层

YOLO
实时检测

Faster R-CNN
精准检测

SSD
单阶段

采集层

图像采集
cv2/rclpy

相机标定
内外参

图像矫正
去畸变

硬件层

RGB摄像头
USB/GigE

深度摄像头
RGB-D


二、YOLOv8目标检测

2.1 安装YOLOv8

# 安装ultralytics
pip install ultralytics

# 安装ROS2相关包
sudo apt install ros-humble-cv-bridge
sudo apt install python3-opencv-python

2.2 YOLOv8检测节点

#!/usr/bin/env python3
"""
YOLOv8实时目标检测节点
"""

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from vision_msgs.msg import Detection2DArray, Detection2D, BoundingBox2D
from cv_bridge import CvBridge
from ultralytics import YOLO
import cv2
import numpy as np

class YOLODetector(Node):
    def __init__(self):
        super().__init__('yolo_detector')

        # 加载YOLOv8模型
        self.model = YOLO('yolov8m.pt')  # 中等大小模型

        # 参数
        self.declare_parameter('confidence_threshold', 0.5)
        self.declare_parameter('iou_threshold', 0.45)

        self.conf_threshold = self.get_parameter('confidence_threshold').value
        self.iou_threshold = self.get_parameter('iou_threshold').value

        # OpenCV桥接
        self.bridge = CvBridge()

        # 发布检测结果
        self.detection_pub = self.create_publisher(
            Detection2DArray, '/detections', 10)
        self.debug_pub = self.create_publisher(
            Image, '/detection_debug', 10)

        # 订阅图像
        self.image_sub = self.create_subscription(
            Image, '/camera/image_raw', self.image_callback, 10)

        self.get_logger().info('YOLOv8检测器已启动')

    def image_callback(self, msg):
        """图像回调"""
        # ROS消息转OpenCV
        cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')

        # 运行检测
        results = self.model(cv_image, conf=self.conf_threshold,
                           iou=self.iou_threshold)

        # 处理检测结果
        detections = self.process_detections(results[0], msg.header)

        # 发布检测结果
        self.detection_pub.publish(detections)

        # 发布调试图像
        debug_image = self.draw_detections(cv_image, results[0])
        debug_msg = self.bridge.cv2_to_imgmsg(debug_image, encoding='bgr8')
        self.debug_pub.publish(debug_msg)

    def process_detections(self, result, header):
        """处理检测结果"""
        detections_msg = Detection2DArray()
        detections_msg.header = header

        if result.boxes is not None:
            for box in result.boxes:
                detection = Detection2D()
                detection.header = header

                # 边界框
                x1, y1, x2, y2 = box.xyxy[0]
                detection.bbox.center.x = float((x1 + x2) / 2)
                detection.bbox.center.y = float((y1 + y2) / 2)
                detection.bbox.size_x = float(x2 - x1)
                detection.bbox.size_y = float(y2 - y1)

                # 类别和置信度
                if box.conf is not None:
                    confidence = float(box.conf)
                    class_id = int(box.cls)
                    class_name = self.model.names[class_id]

                    detection.results[0].hypothesis.class_id = class_name
                    detection.results[0].hypothesis.score = confidence

                detections_msg.detections.append(detection)

        return detections_msg

    @staticmethod
    def draw_detections(image, result):
        """在图像上绘制检测框"""
        annotated_frame = image.copy()

        if result.boxes is not None:
            for box in result.boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                conf = float(box.conf)
                class_id = int(box.cls)
                class_name = result.names[class_id]

                # 绘制边界框
                cv2.rectangle(annotated_frame, (x1, y1), (x2, y2),
                            (0, 255, 0), 2)

                # 绘制标签
                label = f'{class_name} {conf:.2f}'
                cv2.putText(annotated_frame, label, (x1, y1 - 10),
                          cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        return annotated_frame

def main(args=None):
    rclpy.init(args=args)
    detector = YOLODetector()
    rclpy.spin(detector)
    rclpy.shutdown()

if __name__ == '__main__':
    main()

三、多目标跟踪(MOT)

3.1 SORT算法实现

#!/usr/bin/env python3
"""
SORT多目标跟踪算法
"""

import numpy as np
from scipy.optimize import linear_sum_assignment
import cv2

class KalmanBoxTracker:
    """使用卡尔曼滤波的边界框跟踪器"""

    def __init__(self, bbox):
        """
        初始化跟踪器

        bbox: [x1, y1, x2, y2] 边界框坐标
        """
        # 转换为[cx, cy, s, r]格式
        # cx, cy: 中心坐标
        # s: 面积
        # r: 宽高比
        x1, y1, x2, y2 = bbox
        self.cx = (x1 + x2) / 2
        self.cy = (y1 + y2) / 2
        self.s = (x2 - x1) * (y2 - y1)
        self.r = (x2 - x1) / (y2 - y1)

        # 卡尔曼滤波状态
        self.state = np.array([self.cx, self.cy, self.s, self.r, 0, 0, 0])

        # 协方差矩阵
        self.P = np.eye(7) * 10

        # 过程噪声和观测噪声
        self.Q = np.eye(7) * 0.1
        self.R = np.eye(4) * 1

        self.id = None
        self.hits = 0
        self.age = 0

    def predict(self):
        """卡尔曼滤波预测"""
        # 简单的线性模型:x_{k+1} = A * x_k + B * u_k
        self.state[:4] += self.state[4:7] * 1  # 加上速度

        # 协方差预测:P = A*P*A^T + Q
        self.P = self.P + self.Q

        self.age += 1

    def update(self, bbox):
        """卡尔曼滤波更新"""
        x1, y1, x2, y2 = bbox
        z = np.array([
            (x1 + x2) / 2,
            (y1 + y2) / 2,
            (x2 - x1) * (y2 - y1),
            (x2 - x1) / (y2 - y1)
        ])

        # 更新状态
        innovation = z - self.state[:4]
        self.state[:4] += innovation * 0.5

        # 更新协方差
        self.P = self.P - np.outer(self.P @ np.eye(4), np.eye(4) @ self.P)

        self.hits += 1

    def get_bbox(self):
        """获取当前边界框"""
        cx, cy, s, r = self.state[:4]
        w = np.sqrt(s * r)
        h = s / w
        x1 = cx - w / 2
        y1 = cy - h / 2
        x2 = cx + w / 2
        y2 = cy + h / 2
        return np.array([x1, y1, x2, y2])


class SORTTracker:
    """SORT多目标跟踪器"""

    def __init__(self, max_age=30, min_hits=3):
        self.trackers = []
        self.frame_count = 0
        self.max_age = max_age
        self.min_hits = min_hits
        self.next_id = 1

    def update(self, detections):
        """
        更新跟踪器

        detections: N×5 数组,每行是[x1,y1,x2,y2,conf]
        """
        self.frame_count += 1

        # 预测
        for tracker in self.trackers:
            tracker.predict()

        # 关联
        matched, unmatched_dets, unmatched_trks = \
            self.associate_detections_to_trackers(detections)

        # 更新已匹配的跟踪器
        for d, t in matched:
            self.trackers[t].update(detections[d, :4])

        # 创建新跟踪器
        for d in unmatched_dets:
            trk = KalmanBoxTracker(detections[d, :4])
            trk.id = self.next_id
            self.next_id += 1
            self.trackers.append(trk)

        # 移除旧跟踪器
        self.trackers = [t for t in self.trackers
                        if t.age < self.max_age]

        # 获取结果
        results = []
        for t in self.trackers:
            if t.hits >= self.min_hits or self.frame_count <= self.min_hits:
                results.append({
                    'bbox': t.get_bbox(),
                    'id': t.id,
                    'conf': t.hits / self.frame_count
                })

        return results

    @staticmethod
    def associate_detections_to_trackers(detections, iou_threshold=0.3):
        """
        匹配检测和跟踪的IoU关联
        """
        n_detections = len(detections)
        n_trackers = len(trackers)

        if n_trackers == 0 or n_detections == 0:
            return np.empty((0, 2)), np.arange(n_detections), np.empty((0,))

        # 计算IoU矩阵
        iou_matrix = np.zeros((n_detections, n_trackers))

        for d, det in enumerate(detections):
            for t, trk in enumerate(trackers):
                iou = compute_iou(det[:4], trk.get_bbox())
                iou_matrix[d, t] = iou

        # 匈牙利算法
        det_idx, trk_idx = linear_sum_assignment(-iou_matrix)

        matched = []
        unmatched_dets = []
        unmatched_trks = []

        for d, t in zip(det_idx, trk_idx):
            if iou_matrix[d, t] < iou_threshold:
                unmatched_dets.append(d)
                unmatched_trks.append(t)
            else:
                matched.append([d, t])

        for d in range(n_detections):
            if d not in det_idx:
                unmatched_dets.append(d)

        for t in range(n_trackers):
            if t not in trk_idx:
                unmatched_trks.append(t)

        return np.array(matched), np.array(unmatched_dets), np.array(unmatched_trks)


def compute_iou(box1, box2):
    """计算两个边界框的IoU"""
    x1_min, y1_min, x1_max, y1_max = box1
    x2_min, y2_min, x2_max, y2_max = box2

    # 交集
    inter_xmin = max(x1_min, x2_min)
    inter_ymin = max(y1_min, y2_min)
    inter_xmax = min(x1_max, x2_max)
    inter_ymax = min(y1_max, y2_max)

    if inter_xmin >= inter_xmax or inter_ymin >= inter_ymax:
        return 0.0

    inter_area = (inter_xmax - inter_xmin) * (inter_ymax - inter_ymin)

    # 并集
    box1_area = (x1_max - x1_min) * (y1_max - y1_min)
    box2_area = (x2_max - x2_min) * (y2_max - y2_min)
    union_area = box1_area + box2_area - inter_area

    return inter_area / union_area

四、融合跟踪节点

#!/usr/bin/env python3
"""
融合YOLO检测和SORT跟踪的节点
"""

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from vision_msgs.msg import Detection2DArray
from cv_bridge import CvBridge
import cv2
import numpy as np

class TrackingNode(Node):
    def __init__(self):
        super().__init__('tracking_node')

        self.detector = YOLODetector()
        self.tracker = SORTTracker()
        self.bridge = CvBridge()

        # 订阅图像
        self.image_sub = self.create_subscription(
            Image, '/camera/image_raw', self.image_callback, 10)

        # 发布跟踪结果
        self.tracking_pub = self.create_publisher(
            Detection2DArray, '/tracks', 10)

    def image_callback(self, msg):
        """处理图像"""
        cv_image = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')

        # 目标检测
        results = self.detector.model(cv_image)
        detections = self.extract_detections(results[0])

        # 多目标跟踪
        tracks = self.tracker.update(detections)

        # 发布跟踪结果
        self.publish_tracks(tracks, msg.header)

        # 可视化
        self.visualize(cv_image, tracks)

    @staticmethod
    def extract_detections(result):
        """从YOLO结果提取检测"""
        detections = []
        if result.boxes is not None:
            for box in result.boxes:
                x1, y1, x2, y2 = box.xyxy[0]
                conf = box.conf
                detections.append([x1, y1, x2, y2, conf])
        return np.array(detections) if detections else np.empty((0, 5))

    def publish_tracks(self, tracks, header):
        """发布跟踪结果"""
        # 创建消息...
        pass

    def visualize(self, image, tracks):
        """可视化跟踪结果"""
        for track in tracks:
            bbox = track['bbox']
            track_id = track['id']
            x1, y1, x2, y2 = map(int, bbox)

            # 绘制边界框
            cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # 绘制ID
            cv2.putText(image, f'ID:{track_id}', (x1, y1 - 10),
                      cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        # 显示
        cv2.imshow('Tracking', image)
        cv2.waitKey(1)

def main(args=None):
    rclpy.init(args=args)
    node = TrackingNode()
    rclpy.spin(node)
    rclpy.shutdown()

if __name__ == '__main__':
    main()

五、本项目要点总结

目标检测

  • YOLOv8实时检测
  • 置信度和NMS筛选
  • 多尺度特征

多目标跟踪

  • 卡尔曼滤波预测
  • 匈牙利算法匹配
  • ID分配和管理

融合系统

  • 检测和跟踪集成
  • 实时性能
  • 可视化输出

优化技巧

  • 模型量化加速
  • 批处理
  • 硬件加速(GPU)

下一篇预告《ROS2从入门到精通系列(十九):多机器人协作 - 分布式系统与协调》

️ 视觉是机器人最强大的感知工具。掌握这套系统,你就掌握了机器人的"眼睛"!

更多推荐