【强化学习实战】第十二章:Gymnasium库的介绍和使用(2)、蛇棋游戏案例

上一个篇章讲了如何使用gymnasium库中内置的游戏环境,本篇讲如何自定义环境,并用一个蛇棋的小游戏展示说明。

一、gymnasium自定义环境并封装

gymnasium官方介绍封装自定义环境的文档:https://gymnasium.farama.org/tutorials/gymnasium_basics/environment_creation/

1、官方源码
官方的Gymnaium源码在Github上的下载地址是:https://gitcode.com/GitHub_Trending/gy/Gymnasium?utm_source=csdn_github_accelerator&isLogin=1&from_link=bf0bcf3062539a3fe95979812265b002

Gym的源代码结构包括多个子模块,其中关键的模块有:

  • gym.envs: 包含了所有内置的环境。
  • gym.spaces: 包含了空间类的定义。
  • gym.wrappers: 包含了一些环境包装器,可以用于修改现有环境的行为

这里偷个懒就不写了,下面这篇博文讲得非常清晰:
https://blog.csdn.net/Ever_____/article/details/139503558?ops_request_misc=&request_id=&biz_id=102&utm_term=gymnasium&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduweb~default-1-139503558.142^v102^pc_search_result_base7&spm=1018.2226.3001.4187

二、案例展示:蛇棋游戏

(一)蛇棋规则1、agent通过掷色子前进,比如当前agent在格子1位置,agent掷的色子是2,那agent就走两个格子,从格子1到达格子3,这算是agent走了一步,系统奖励-1分。

比如当agent位于格子99时,如果agent掷色子掷了3,那agent就从格子99出发到100再返回到格子98。这一步的系统奖励还是-1。只有当agent最后一步正好走到格子100时,系统奖励才是100分。

2、梯子的双向作用:
比如当前agent在格子1,掷的色子是3,agent就走到了格子4,但是格子4有一个梯子直达格子14,所以agent就从格子1直接跳到了格子14。
但是,比如如果当前agent在格子10处,而掷色子掷了4,agent就得走到格子14,但格子14有梯子,agent就又被滑落到格子4处。 只有格子100处的梯子不再滑落。

3、假设action可以有两种随机形式:从1-3随机抽取从1-6随机抽取,两种方式。

为了使规则简单一点,我们暂定这些规则,暂时不管蛇的作用。

4、这款游戏的目标是看谁从1走到100,获得的奖励最多。因为获得最多的奖励说明它走的步数是最少的,就是是最快达到终点的。而能最快达到终点的策略就是最优的策略。所以换句话说,我们设置reward奖励是为了寻找这个游戏的最优策略

我们人类看这个游戏,第一直觉就是在游戏最初,也就是在离100较远的格子状态下,我们要尽可能地大步走,也就是我们更希望从1-6中随机掷色子。而在快达到100的时候,或者说离格子100较近的时候,要尽可能地小步走,也就是更希望从1-3中随机掷色子,这样更容易一步到达100,而不是从100再返回去。这是我们人类的直觉和预判。下面我们编写代码,训练一个智能体,看看智能体在奖励的牵引下,它的最优策略是怎样的。

(二)代码实现

#一、重写环境类,需要重写:init函数--step函数--reward函数--reset函数--render函数     
import gymnasium as gym
import numpy as np 

class Env():
    SIZE=100    #用一个类级别的全局变量表示棋盘    棋盘有100个格子
    
    def __init__(self, ladder_num, dice_mode, ladder_seed=17):  # 动作模式:dice_mode=[3,6]----见A处, ladder_seed---见B处
        self.ladder_num = ladder_num  #你希望棋盘上有几个梯子
        self.dice_mode = dice_mode    #  A---表示智能体可以选择从1-3中随机筛色子,也可以从1-6中随机筛色子
        self.state_space = gym.spaces.Discrete(self.SIZE, start=1)  # Discrete(100, start=1)   状态空间:1-100   
        self.action_space = gym.spaces.Discrete(len(self.dice_mode))  #Discrete(2) ,动作空间0-1
        self.pos = 1   #初始化游戏的开始环境
        
        #生成梯子
        if ladder_num == 0:   #不要梯子
            self.ladders = {0:0}
        else:                 #要梯子,但第一个格子不能有梯子,不合理嘛
            random.seed(ladder_seed)    # B---控制一下生成梯子的随机性,方便后面复现和对比
            temp_num = np.array(random.sample(range(2, self.SIZE+1), ladder_num*2)).reshape(2, -1)
            temp_ladders = dict(zip(temp_num[0], temp_num[1]))
            ladders = {}
            for k, v in temp_ladders.items():  #使梯子都是k小v大的格式
                if k<v:
                    ladders[k] = v
                else:
                    ladders[v] = k
            self.ladders = ladders   #梯子就是一个字典
        print(f"棋盘中的梯子有:{self.ladders},智能体可选的掷色子模式有:{self.dice_mode}")
     
    def step(self, action):   # 智能体走一步,状态、奖励、是否结束游戏等信息   action是智能体本次掷的点数
        self.pos += action   #环境迁移
        flag = 0
        
        if self.pos in self.ladders:  #如果这一步走到向上的梯子脚下,上升
            self.pos = self.ladders[self.pos]
            flag = 1
        
        if self.pos > 100:   #如果这一步超过了100,返回
            self.pos = 200-self.pos
            
        if self.pos == 100:    #如果一步到达100就直接return
            return 100, 100, 1, {}   #位置、奖励100、游戏结束、其他    #1表示游戏结束了
            
        if self.pos in self.ladders.values() and flag == 0:  #如果现在的位置在梯子顶端,下降
            reverse_ladder = {v:k for k,v in self.ladders.items()}
            self.pos = reverse_ladder[self.pos]
            
        return self.pos, -1, 0, {}    #0表示游戏还没结束
    
    def reward(self, s):
        if s==99:
            return 100
        else:
            return -1
        
    def reset(self):   #每次训练都要从起始点重新跑一遍游戏
        self.pos = 1
        return self.pos
    
    def render(self):   #可视化界面
        pass
#二、构建智能体 --策略、状态价值、动作价值、gamma折扣系数、转移矩阵  
class Agent():
    def __init__(self, env):
        self.pi = np.ones((env.state_space.n, env.action_space.n)) * 1/env.action_space.n   #一开始是均匀随机的策略
        self.value_pi = np.zeros(env.state_space.n)    #存放每个格子的价值   (100,)
        self.value_q = np.zeros((env.state_space.n, env.action_space.n))   #存放每个格子的动作价值   (100, 2)
        self.gamma = 0.9    #价值折扣系数
        
        #下面是求状态转移矩阵的
        self.pa = np.zeros([env.action_space.n, env.state_space.n, env.state_space.n])  #状态转移矩阵 (2, 100, 100)
        reverse_ladder = {v:k for k,v in env.ladders.items()}
        for i in range(env.action_space.n):    #i=0/1
            prob = 1/env.dice_mode[i]       #1/3 或者 1/6
            for j in range(env.state_space.n-1): #0-99  #格子100不参与循环
                for step in range(1,env.dice_mode[i]+1): # step=1,2,3 /1,2,3,4,5,6
                    k = j+step
                    flag = 0
               
                    if k>99:
                        self.pa[i,j,198-k] += prob
                        
                    elif k in env.ladders:
                        k = env.ladders[k]
                        if k==100:
                            self.pa[i,j,k-1]+= prob 
                        else:
                            self.pa[i,j,k-1]+= prob 
                        flag = 1
                        
                    elif k in reverse_ladder and flag==0:
                        k = reverse_ladder[k]
                        self.pa[i,j,k]+= prob

                    else:
                        self.pa[i,j,k]+= prob      
#三、评估当前策略下的状态价值--更新智能体的状态价值、动作价值
def Policy_evaluation(env, agent):
    pi = agent.pi.copy()
    value_pi = agent.value_pi.copy()
    value_q = agent.value_q.copy()
    
    max_value_iter = 100
    while True:    #开始迭代价值函数--也就是评估策略pi下的状态价值     
        pre_value_pi = value_pi.copy()
        for state in range(env.state_space.n):    #0-99
            q0 = (agent.pa[0,state,:]*value_pi)[agent.pa[0,state,:] !=0].sum()   #agent.pa是状态转移矩阵
            q1 = (agent.pa[1,state,:]*value_pi)[agent.pa[1,state,:] !=0].sum()
            value_q[state] = [q0, q1]    #更改q值
            value_pi[state] = env.reward(state) + agent.gamma*(pi[state][0]*q0 + pi[state][1]*q1) #更改v值

        #设置两个停止迭代的信号
        diff = np.sqrt(np.sum(np.power(value_pi-pre_value_pi, 2)))
        max_value_iter -= 1    
        if diff < 1e-5:
#            print(f"价值迭代正常收敛,迭代{100-max_value_iter}次")
            break
        if max_value_iter == 0:
            print("到达最大迭代次数")
            break
            
    agent.value_pi = value_pi
    agent.value_q = value_q
#四、提升agent的策略   --更新智能体的策略
def policy_improvement(env, agent):  
    pi = agent.pi.copy()
    for state in range(env.state_space.n):    #0-99
        up = agent.value_q[state].argmax()
        down = agent.value_q[state].argmin()
        
        if pi[state][up]<= 0.9:
            pi[state][up] += 0.1
        if pi[state][down]>=0.1:
            pi[state][down] -= 0.1
    agent.pi = pi
#五、agent用当前策略,玩n=100轮游戏
def play_n_episode(env, agent, n=100):
    reward = []
    
    for episode in range(n):
        state = env.reset()     # state是 1
        agent_policy = agent.pi  
        
        reward_step = []
        trace = []
        while True:
            rand_num = np.random.rand()    #抽取0-1之间均匀分布的随机数
            s_policy = agent_policy[state-1]   #把当前状态下的策略取出来
            dice = None
    
            if s_policy.min()  == s_policy.max():  #如果策略0和策略1的概率相同
                dice = np.random.randint(0,2)    #就随意选择一种模式
            else:                                   #如果不相同
                if rand_num<s_policy.min():    #如果随机数小于小概率的策略
                    dice = s_policy.argmin()   #就选小概率的策略
                else:
                    dice = s_policy.argmax()
    
            action = np.random.randint(1, env.dice_mode[dice]+1)  #掷色子
            pre_position = env.pos
            new_state, r, done, _ = env.step(action)  #走一步
            reward_step.append(r)
            trace.append([pre_position, dice, action, env.pos, r])
            if done:
                break
        reward.append(sum(reward_step))
    return np.mean(reward)
#六、训练10个epoch---也就是评估10次策略-提升10次策略
def fit(env, agent, epochs=10):
    score = []
    for epoch in range(epochs):
        mean_reward = play_n_episode(env, agent, n=100)   #玩100次游戏,统计评价得分
        score.append(mean_reward)
        Policy_evaluation(env, agent)   #评估策略
        policy_improvement(env, agent)  #提升策略
    return score

(三)训练结果查看

#1、不要梯子,看看agent的策略和得分
import matplotlib.pyplot as plt
env = Env(0, [3,6])         #不要梯子
agent = Agent(env)
score = fit(env, agent)
np.argmax(agent.pi, axis = 1)

plt.figure(figsize = (10, 2))
plt.plot(score)

#2、设置5个梯子,看看agent的策略和得分
import matplotlib.pyplot as plt
env = Env(5, [3,6])     #设置5个梯子
agent = Agent(env)
score = fit(env, agent)
np.argmax(agent.pi, axis = 1)

plt.figure(figsize = (10, 2))
plt.plot(score)

从结果来看,我们的算法还是有效的:
(1)对于无梯子的情况:在离格子100还有3个格子时,就从0-3之间掷色子。其他地方都从0-6掷色子。显然这是最优的策略。
(2)对于有梯子的情况:在梯子低端附近有部分格子是从0-3掷色子,这样可以尽可能地利用梯子快速上升。在梯子顶端附件则从0-6之间掷色子,加快越过梯子顶端,防止滑落。可见,我们训练出来的结果还是非常有效的策略。
(3)从整体看,有梯子比无梯子的得分要高。这也非常合情理。

三、蛇棋案例说明

该案例是我从 https://www.bilibili.com/video/BV1VY411W7Mm/?spm_id_from=333.1007.top_right_bar_window_history.content.click&vd_source=b6780e06031ac609460f6fbf017bbb39 上看到的。上面(三)的代码是我按照我自己对算法的理解,自己又重构的代码。个人认为我的代码清晰度、逻辑链条更加通畅一些。

原作者是想用蛇棋案例来说明强化学习中的三种算法:策略迭代、价值迭代、泛化迭代

我个人认为,这个案例就是一个动态规划问题,这是一个有模型的强化学习问题,所以agent打的游戏数据是没有什么用处的。就是说最优策略是可以在环境确定下来后,就可以准确算出最优策略了,而没必要让agent去实实在在打几轮游戏去计算最优策略。至于为什么,可从我下面这两篇博文找到理论支持:
https://blog.csdn.net/friday1203/article/details/155533020?spm=1001.2014.3001.5501
https://blog.csdn.net/friday1203/article/details/155946919?spm=1001.2014.3001.5501
所以在我自己重构的代码中,agent实实在在打的游戏都是用来衡量策略的效果。至于策略提升,是完全用不到agent实实在在打的游戏数据的。同时我也仔细研究了原作者的代码,其实她的代码中也没用到agent实实在在打游戏的数据。下面我把原作者的代码也贴出来,大家互相参考:

import numpy as np
import gymnasium as gym
from gymnasium.spaces import Discrete

from contextlib import contextmanager
import time

@contextmanager
def timer(name):
    start = time.time()
    yield
    end = time.time()
    print("{} COST:{}".format(name, end-start))
#(一)构建环境
class SnakeEnv(gym.Env):
    SIZE=100    #表示游戏是从1-100
    
    def __init__(self, ladder_num, dices):  
        self.ladder_num = ladder_num  #指定游戏有几个梯子
        self.dices = dices    #透筛子的点数
        self.observation_space = Discrete(self.SIZE+1)  #0索引弃用,所以设置state的数量为101
        self.action_space = Discrete(len(dices))  #有两种action
        
        if ladder_num == 0:
            self.ladders = {0:0}
        else:
            ladders = set(np.random.randint(1, self.SIZE, size=self.ladder_num*2))
            while len(ladders) < self.ladder_num*2:
                ladders.add(np.random.randint(1, self.SIZE))
            ladders = list(ladders)
            ladders = np.array(ladders)
            np.random.shuffle(ladders)
            ladders = ladders.reshape((self.ladder_num, 2))
            
            re_ladders = list()
            for i in ladders:
                re_ladders.append([i[1], i[0]])
            re_ladders = np.array(re_ladders)
            self.ladders = dict(np.append(re_ladders, ladders, axis=0))
        print(f"ladders info:{self.ladders} dice ranges:{self.dices}")
        self.pos = 1   #初始化位置
    
    def reset(self):   #每次训练都要从起始点重新跑一遍游戏
        self.pos = 1
        return self.pos

    def step(self, a):   #每走一步,状态、奖励、是否结束游戏等信息
        step = np.random.randint(1, self.dices[a]+1)
        self.pos += step
        if self.pos == 100:
            return 100, 100, 1, {}   #位置、奖励、游戏结束
        elif self.pos > 100:
            self.pos = 200-self.pos
            
        if self.pos in self.ladders:
            self.pos = self.ladders[self.pos]
        return self.pos, -1, 0, {}    #0表示游戏还没结束
    
    def reward(self, s):
        if s==100:
            return 100
        else:
            return -1
    
    def render(self):   #可视化界面
        pass
#(二)构建智能体   
class TableAgent(object):
    def __init__(self, env):
        self.s_len = env.observation_space.n  #state的个数
        self.a_len = env.action_space.n   #action的个数
        self.r = [env.reward(s) for s in range(0, self.s_len)]
        self.pi = np.zeros(self.s_len, dtype=int)   #确定性策略
        self.p = np.zeros([self.a_len, self.s_len, self.s_len], dtype=float)  #状态转移概率
        
        ladder_move = np.vectorize(lambda x: env.ladders[x] if x in env.ladders else x)
        
        for i, dice in enumerate(env.dices):   #初始化表格所有位置的概率p[A,S,S]
            prob = 1.0/dice    #dice就是一个数组[3,6]
            for src in range(1, 100):
                step = np.arange(dice) + 1
                step += src
                step = np.piecewise(step, [step>100, step<=100], [lambda x: 200-x, lambda x:x])
                step = ladder_move(step)
                for dst in step:
                    self.p[i, src, dst] += prob
            
        self.p[:,100,100] = 1
        self.value_pi = np.zeros((self.s_len))
        self.value_q = np.zeros((self.s_len, self.a_len))
        self.gamma = 0.8
        
    def play(self, state):
        return self.pi[state]
#(三)策略评估(reward计算)
def eval_game(env, agent):
    state = env.reset()
    total_reward = 0
    state_action = []
    
    while True:
        act = agent.play(state)
        state_action.append((state, act))
        state, reward, done, _ = env.step(act)
        total_reward += reward
        if done:
            break
    return total_reward, state_action
#(四)算法--1、策略迭代
class PolicyIteration(object):
    dice = [3, 6]
    def policy_evaluation(self, agent, max_iter=-1):
        iteration = 0
        while True:
            iteration += 1
            new_value_pi = agent.value_pi.copy()
            for i in range(1, agent.s_len):
                ac = agent.pi[i]
                for j in range(0, agent.a_len):
                    if ac != j:
                        break
                    transition = agent.p[ac, i, :]
                    value_sa = np.dot(transition, agent.r+agent.gamma*agent.value_pi)
                    new_value_pi[i] = value_sa
            diff = np.sqrt(np.sum(np.power(agent.value_pi - new_value_pi, 2)))
            if diff < 1e-6:
                print("policy evaluation proceed {} iters.".format(iteration))
                break
            else:
                agent.value_pi = new_value_pi
            if iteration == max_iter:
                print("policy evaluation proceed {} iters.".format(iteration))
                break
                
    def policy_improvement(self, agent):
        new_policy = np.zeros_like(agent.pi)
        for i in range(1, agent.s_len):
            for j in range(0, agent.a_len):
                transition = agent.p[j, i, :]
                agent.value_q[i,j] = np.dot(transition, agent.r+agent.gamma*agent.value_pi)
            max_act = np.argmax(agent.value_q[i,:])
            new_policy[i] = max_act
        
        if np.all(np.equal(new_policy, agent.pi)):
            return False
        else:
            agent.pi = new_policy
            return True
        
    def policy_iteration(self, agent, max_iter=-1):
        iteration=0
        with timer('Timer PolicyIter'):
            while True:
                iteration += 1
                with timer('Timer PolicyEval'):
                    self.policy_evaluation(agent, max_iter)
                with timer('Timer PolicyImprove'):
                    ret = self.policy_improvement(agent)
                if not ret:
                    break
        print("Iter {} rounds converge".format(iteration))
def policy_iteration_demo(env):
    agent = TableAgent(env)
    pi_algo = PolicyIteration()
    pi_algo.policy_iteration(agent)
    print('agent.pi={}'.format(agent.pi))
    total_reward, state_action=eval_game(env,agent)
    print('total_reward={0}, state_action{1}'.format(total_reward, state_action))
if __name__ == '__main__':
#    env1 = SnakeEnv(0, [3, 6])
    env2 = SnakeEnv(5, [3, 6])
#    policy_iteration_demo(env1)
    policy_iteration_demo(env2)

##(四)算法2--价值迭代
def value_iteration(agent, max_iter=-1):
    iteration = 0
    dice=[3, 6]
    with timer('Timer ValueIter'):
        while True:
            iteration += 1
            new_value_pi = agent.value_pi.copy()
            for i in range(1, agent.s_len):
                value_sas = []
                for j in range(0, agent.a_len):
                    value_sa = np.dot(agent.p[j,i,:], agent.r + agent.gamma*agent.value_pi)
                    value_sas.append(value_sa)
                new_value_pi[i] = max(value_sas)
                
            diff = np.sqrt(np.sum(np.power(agent.value_pi - new_value_pi, 2)))
            if diff < 1e-6:
                break
            else:
                agent.value_pi = new_value_pi
            if iteration == max_iter:
                break
        print('Iter {} rounds converge'.format(iteration))
        for i in range(1, agent.s_len):
            for j in range(0, agent.a_len):
                agent.value_q[i,j] = np.dot(agent.p[j,i,:], agent.r+agent.gamma*agent.value_pi)
            max_act = np.argmax(agent.value_q[i,:])
            agent.pi[i] = max_act
#价值迭代和策略迭代的对比
def policy_vs_value_demo(env):
    policy_agent = TableAgent(env)
    value_agent = TableAgent(env)
    
    pi_algo = PolicyIteration()
    pi_algo.policy_iteration(policy_agent)
    print('agent.pi={}'.format(policy_agent.pi))
    total_reward, state_action = eval_game(env, policy_agent)
    print('total_reward={0}, state_action={1}'.format(total_reward, state_action))
    
    value_iteration(value_agent)
    print('agent.pi={}'.format(value_agent.pi))
    total_reward, state_action = eval_game(env, value_agent)
    print('total_reward={0}, state_action={1}'.format(total_reward, state_action))
if __name__ == '__main__':
#    env1 = SnakeEnv(0, [3, 6])
    env2 = SnakeEnv(5, [3, 6])
#    policy_iteration_demo(env1)
#    policy_iteration_demo(env2)

    policy_vs_value_demo(env2)
##(四)算法3--泛化迭代
#泛化迭代和前两种迭代的对比
def generalized_policy_compare(env):
    policy_vs_value_demo(env)
    gener_agent = TableAgent(env)
    #用10轮的价值迭代+1轮的策略迭代-->得到一个策略方案
    with timer("Timer GeneralizedIter"):
        value_iteration(gener_agent, 10)     #这里的10你可以自己调,你觉得迭代几次价值评估就大差不差了,那你就迭代几次吧
        pi_algo = PolicyIteration()
        pi_algo.policy_iteration(gener_agent, 1)
    print("agent.pi={}".format(gener_agent.pi))
    total_reward, state_action = eval_game(env, gener_agent)
    print("total_reward={0}, state_action={1}".format(total_reward, state_action))
if __name__ == '__main__':
    env2 = SnakeEnv(5, [3, 6])
    generalized_policy_compare(env2)

最后补充:装饰器

假如我原来写的函数是my_funcB,但是我们嫌这个函数的功能不太完善,我现在想补充一些,那补充的内容就可以写成闭包函数funcA()这样的嵌套形式,并且参数是原函数my_funcB,然后我再在原函数my_funcB头上加一个@funcA的帽子,这样我调用并执行my_funcB的时候就执行了my_funcB和我想补充的内容。

更多推荐