10个与奖励函数设计相关的关键概念、技术或方法,并为每个提供一个简化的Python代码片段来帮助理解其思路。

  1. 碰撞惩罚 (Collision Penalty)
    思路: 最基础的安全奖励。如果发生碰撞,则给予一个大的负奖励。
    创新点: 直接且易于实现,强制代理避免事故。
def reward_collision(has_collision):
    if has_collision:
        return -1000  # 大的负奖励
    else:
        return 0      # 无碰撞则无奖励

# 示例调用
current_reward = reward_collision(vehicle.crashed)
  1. 基于速度的碰撞惩罚 (Speed-Based Collision Penalty)
    思路: 惩罚不仅取决于是否碰撞,还取决于碰撞时的速度,高速碰撞惩罚更重。
    创新点: 更符合现实,鼓励代理在高速时更加谨慎。
def reward_speed_based_collision(has_collision, current_speed):
    if has_collision:
        # 惩罚随速度平方增长
        penalty = -50 * (current_speed ** 2 + 0.5)
        return penalty
    else:
        return 0

# 示例调用
current_reward = reward_speed_based_collision(vehicle.crashed, vehicle.speed)
  1. 时间到碰撞 (Time-to-Collision, TTC)
    思路: 计算与前方障碍物发生碰撞前的剩余时间,TTC越小,风险越高,应给予负奖励。
    创新点: 提供了一个连续的、密集的风险信号,而不仅仅是稀疏的碰撞事件。
def calculate_ttc(distance_to_front, ego_speed, front_speed):
    relative_speed = max(ego_speed - front_speed, 0.1)  # 避免除零
    ttc = distance_to_front / relative_speed
    return ttc

def reward_ttc(ttc, threshold=3.0):
    if ttc < threshold:
        # TTC低于阈值时,奖励为负,且TTC越小,惩罚越大
        return -1 / ttc
    else:
        return 0  # 安全

# 示例调用
ttc = calculate_ttc(dist_front, ego_speed, front_speed)
current_reward += reward_ttc(ttc)
  1. 进展奖励 (Progress Reward based on Distance)
    思路: 根据车辆在道路上前进的距离给予正奖励,以激励代理向目标移动。
    创新点: 解决了目标奖励延迟的问题,提供了密集的正向反馈。
class ProgressTracker:
    def __init__(self):
        self.last_distance = 0
    
    def reward_progress(self, current_distance_on_route):
        progress_made = current_distance_on_route - self.last_distance
        self.last_distance = current_distance_on_route
        return progress_made * 10  # 每米奖励10分

# 示例调用
tracker = ProgressTracker()
current_reward += tracker.reward_progress(agent.current_route_distance)
  1. 舒适度奖励 (Comfort Reward via Jerk Penalty)
    思路: 通过惩罚纵向和横向加速度的变化率(即“急促度”jerk)来保证乘坐舒适性。
    创新点: 将物理上的舒适度指标量化到奖励函数中。
def reward_comfort(longitudinal_jerk, lateral_jerk):
    # 对急促度进行惩罚,数值越大惩罚越重
    jerk_penalty = -(longitudinal_jerk**2 + lateral_jerk**2) * 0.1
    return jerk_penalty

# 示例调用
current_reward += reward_comfort(vehicle.long_jerk, vehicle.lat_jerk)
  1. 速度限制惩罚 (Speed Limit Violation Penalty)
    思路: 当车辆速度超过道路限速时,根据超速程度施加惩罚。
    创新点: 鼓励代理遵守基本交通法规。
def reward_speed_limit(current_speed, speed_limit):
    if current_speed > speed_limit:
        over_speed = current_speed - speed_limit
        return -over_speed * 5  # 每超速1m/s,扣5分
    else:
        return 0  # 不超速则无惩罚

# 示例调用
current_reward += reward_speed_limit(vehicle.speed, road.speed_limit)
  1. 加权求和奖励聚合 (Weighted Sum Aggregation)
    思路: 将不同类别的奖励乘以权重后相加,形成最终奖励。这是最常用的方法。
    创新点: 允许研究者手动调整不同目标的重要性。
def final_reward_weighted_sum(safety_r, progress_r, comfort_r, rules_r):
    w_safety = 10.0
    w_progress = 1.0
    w_comfort = 0.5
    w_rules = 2.0
    
    total_reward = (w_safety * safety_r + 
                    w_progress * progress_r + 
                    w_comfort * comfort_r + 
                    w_rules * rules_r)
    return total_reward

# 示例调用
final_r = final_reward_weighted_sum(rs, rp, rc, rr)
  1. 基于规则书的奖励 (Rulebook-based Reward)
    思路: 不使用权重,而是定义一个有优先级顺序的规则列表(如安全>规则>舒适)。代理必须满足高优先级规则,再优化低优先级目标。
    创新点: 避免了权重调整的难题,明确表达了目标间的优先级。
class Rulebook:
    def __init__(self):
        self.rules = [
            {"name": "No_Collision", "priority": 1, "check": lambda state: not state.has_collision()},
            {"name": "Obey_Speed_Limit", "priority": 2, "check": lambda state: state.speed <= state.speed_limit},
            {"name": "Smooth_Driving", "priority": 3, "check": lambda state: state.jerk < 5.0}
        ]
    
    def evaluate_action(self, state):
        # 按优先级检查规则
        for rule in sorted(self.rules, key=lambda x: x["priority"]):
            if not rule["check"](state):
                return -float('inf')  # 违反高优先级规则,奖励为负无穷
        # 如果所有规则都满足,则返回一个基于进展的奖励
        return state.progress_made * 10

# 示例调用
rulebook = Rulebook()
current_reward = rulebook.evaluate_action(current_state)
  1. 奖励机 (Reward Machines)
    思路: 使用类似有限状态机的结构,根据当前驾驶情境(如“跟车”、“变道”、“汇入”)切换不同的子奖励函数。
    创新点: 实现了情境感知的奖励,使奖励函数更具适应性和通用性。
class RewardMachine:
    def __init__(self):
        self.state = "NORMAL_DRIVING"
    
    def update_state(self, environment):
        if environment.is_merging_scenario():
            self.state = "MERGING"
        elif environment.is_lane_change_allowed():
            self.state = "LANE_CHANGE"
        else:
            self.state = "NORMAL_DRIVING"
    
    def get_reward(self, state, action):
        self.update_state(state)
        
        if self.state == "MERGING":
            return self._merging_reward(state, action)
        elif self.state == "LANE_CHANGE":
            return self._lane_change_reward(state, action)
        else:
            return self._normal_driving_reward(state, action)

# 每个情境下的具体奖励函数...
  1. 逆向强化学习 (Inverse Reinforcement Learning, IRL)
    思路: 不手动设计奖励函数,而是从人类专家的驾驶数据中学习出一个能解释这些行为的奖励函数。
    创新点: 可以学习到复杂、难以显式编程的驾驶偏好。
# 此处仅为概念示意,真实IRL非常复杂
def inverse_rl_learn_reward(human_trajectories):
    """
    伪代码:从人类轨迹数据中学习奖励函数
    """
    # 初始化一个参数化的奖励函数 R(s; θ)
    theta = initialize_parameters()
    
    for trajectory in human_trajectories:
        # 计算在当前θ下,该轨迹的概率
        prob = calculate_trajectory_probability(trajectory, theta)
        # 更新θ以最大化人类轨迹出现的概率
        theta = optimize(theta, prob)
    
    learned_reward_function = lambda s, a: R(s, a; theta)
    return learned_reward_function

# 学习到的奖励函数可以用于训练新的RL代理
learned_reward = inverse_rl_learn_reward(expert_demos)
agent.train_with_reward(learned_reward)

更多推荐