【强化学习实战】第十二章:Gymnasium库的介绍和使用(2)、蛇棋游戏案例
上一个篇章讲了如何使用gymnasium库中内置的游戏环境,本篇讲如何自定义环境,并用一个蛇棋的小游戏展示说明。
【强化学习实战】第十二章:Gymnasium库的介绍和使用(2)、蛇棋游戏案例
上一个篇章讲了如何使用gymnasium库中内置的游戏环境,本篇讲如何自定义环境,并用一个蛇棋的小游戏展示说明。
一、gymnasium自定义环境并封装
gymnasium官方介绍封装自定义环境的文档:https://gymnasium.farama.org/tutorials/gymnasium_basics/environment_creation/
1、官方源码
官方的Gymnaium源码在Github上的下载地址是:https://gitcode.com/GitHub_Trending/gy/Gymnasium?utm_source=csdn_github_accelerator&isLogin=1&from_link=bf0bcf3062539a3fe95979812265b002
Gym的源代码结构包括多个子模块,其中关键的模块有:
- gym.envs: 包含了所有内置的环境。
- gym.spaces: 包含了空间类的定义。
- gym.wrappers: 包含了一些环境包装器,可以用于修改现有环境的行为
二、案例展示:蛇棋游戏
(一)蛇棋规则
1、agent通过掷色子前进,比如当前agent在格子1位置,agent掷的色子是2,那agent就走两个格子,从格子1到达格子3,这算是agent走了一步,系统奖励-1分。
比如当agent位于格子99时,如果agent掷色子掷了3,那agent就从格子99出发到100再返回到格子98。这一步的系统奖励还是-1。只有当agent最后一步正好走到格子100时,系统奖励才是100分。
2、梯子的双向作用:
比如当前agent在格子1,掷的色子是3,agent就走到了格子4,但是格子4有一个梯子直达格子14,所以agent就从格子1直接跳到了格子14。
但是,比如如果当前agent在格子10处,而掷色子掷了4,agent就得走到格子14,但格子14有梯子,agent就又被滑落到格子4处。 只有格子100处的梯子不再滑落。
3、假设action可以有两种随机形式:从1-3随机抽取和从1-6随机抽取,两种方式。
为了使规则简单一点,我们暂定这些规则,暂时不管蛇的作用。
4、这款游戏的目标是看谁从1走到100,获得的奖励最多。因为获得最多的奖励说明它走的步数是最少的,就是是最快达到终点的。而能最快达到终点的策略就是最优的策略。所以换句话说,我们设置reward奖励是为了寻找这个游戏的最优策略。
我们人类看这个游戏,第一直觉就是在游戏最初,也就是在离100较远的格子状态下,我们要尽可能地大步走,也就是我们更希望从1-6中随机掷色子。而在快达到100的时候,或者说离格子100较近的时候,要尽可能地小步走,也就是更希望从1-3中随机掷色子,这样更容易一步到达100,而不是从100再返回去。这是我们人类的直觉和预判。下面我们编写代码,训练一个智能体,看看智能体在奖励的牵引下,它的最优策略是怎样的。
(二)代码实现
#一、重写环境类,需要重写:init函数--step函数--reward函数--reset函数--render函数
import gymnasium as gym
import numpy as np
class Env():
SIZE=100 #用一个类级别的全局变量表示棋盘 棋盘有100个格子
def __init__(self, ladder_num, dice_mode, ladder_seed=17): # 动作模式:dice_mode=[3,6]----见A处, ladder_seed---见B处
self.ladder_num = ladder_num #你希望棋盘上有几个梯子
self.dice_mode = dice_mode # A---表示智能体可以选择从1-3中随机筛色子,也可以从1-6中随机筛色子
self.state_space = gym.spaces.Discrete(self.SIZE, start=1) # Discrete(100, start=1) 状态空间:1-100
self.action_space = gym.spaces.Discrete(len(self.dice_mode)) #Discrete(2) ,动作空间0-1
self.pos = 1 #初始化游戏的开始环境
#生成梯子
if ladder_num == 0: #不要梯子
self.ladders = {0:0}
else: #要梯子,但第一个格子不能有梯子,不合理嘛
random.seed(ladder_seed) # B---控制一下生成梯子的随机性,方便后面复现和对比
temp_num = np.array(random.sample(range(2, self.SIZE+1), ladder_num*2)).reshape(2, -1)
temp_ladders = dict(zip(temp_num[0], temp_num[1]))
ladders = {}
for k, v in temp_ladders.items(): #使梯子都是k小v大的格式
if k<v:
ladders[k] = v
else:
ladders[v] = k
self.ladders = ladders #梯子就是一个字典
print(f"棋盘中的梯子有:{self.ladders},智能体可选的掷色子模式有:{self.dice_mode}")
def step(self, action): # 智能体走一步,状态、奖励、是否结束游戏等信息 action是智能体本次掷的点数
self.pos += action #环境迁移
flag = 0
if self.pos in self.ladders: #如果这一步走到向上的梯子脚下,上升
self.pos = self.ladders[self.pos]
flag = 1
if self.pos > 100: #如果这一步超过了100,返回
self.pos = 200-self.pos
if self.pos == 100: #如果一步到达100就直接return
return 100, 100, 1, {} #位置、奖励100、游戏结束、其他 #1表示游戏结束了
if self.pos in self.ladders.values() and flag == 0: #如果现在的位置在梯子顶端,下降
reverse_ladder = {v:k for k,v in self.ladders.items()}
self.pos = reverse_ladder[self.pos]
return self.pos, -1, 0, {} #0表示游戏还没结束
def reward(self, s):
if s==99:
return 100
else:
return -1
def reset(self): #每次训练都要从起始点重新跑一遍游戏
self.pos = 1
return self.pos
def render(self): #可视化界面
pass
#二、构建智能体 --策略、状态价值、动作价值、gamma折扣系数、转移矩阵
class Agent():
def __init__(self, env):
self.pi = np.ones((env.state_space.n, env.action_space.n)) * 1/env.action_space.n #一开始是均匀随机的策略
self.value_pi = np.zeros(env.state_space.n) #存放每个格子的价值 (100,)
self.value_q = np.zeros((env.state_space.n, env.action_space.n)) #存放每个格子的动作价值 (100, 2)
self.gamma = 0.9 #价值折扣系数
#下面是求状态转移矩阵的
self.pa = np.zeros([env.action_space.n, env.state_space.n, env.state_space.n]) #状态转移矩阵 (2, 100, 100)
reverse_ladder = {v:k for k,v in env.ladders.items()}
for i in range(env.action_space.n): #i=0/1
prob = 1/env.dice_mode[i] #1/3 或者 1/6
for j in range(env.state_space.n-1): #0-99 #格子100不参与循环
for step in range(1,env.dice_mode[i]+1): # step=1,2,3 /1,2,3,4,5,6
k = j+step
flag = 0
if k>99:
self.pa[i,j,198-k] += prob
elif k in env.ladders:
k = env.ladders[k]
if k==100:
self.pa[i,j,k-1]+= prob
else:
self.pa[i,j,k-1]+= prob
flag = 1
elif k in reverse_ladder and flag==0:
k = reverse_ladder[k]
self.pa[i,j,k]+= prob
else:
self.pa[i,j,k]+= prob
#三、评估当前策略下的状态价值--更新智能体的状态价值、动作价值
def Policy_evaluation(env, agent):
pi = agent.pi.copy()
value_pi = agent.value_pi.copy()
value_q = agent.value_q.copy()
max_value_iter = 100
while True: #开始迭代价值函数--也就是评估策略pi下的状态价值
pre_value_pi = value_pi.copy()
for state in range(env.state_space.n): #0-99
q0 = (agent.pa[0,state,:]*value_pi)[agent.pa[0,state,:] !=0].sum() #agent.pa是状态转移矩阵
q1 = (agent.pa[1,state,:]*value_pi)[agent.pa[1,state,:] !=0].sum()
value_q[state] = [q0, q1] #更改q值
value_pi[state] = env.reward(state) + agent.gamma*(pi[state][0]*q0 + pi[state][1]*q1) #更改v值
#设置两个停止迭代的信号
diff = np.sqrt(np.sum(np.power(value_pi-pre_value_pi, 2)))
max_value_iter -= 1
if diff < 1e-5:
# print(f"价值迭代正常收敛,迭代{100-max_value_iter}次")
break
if max_value_iter == 0:
print("到达最大迭代次数")
break
agent.value_pi = value_pi
agent.value_q = value_q
#四、提升agent的策略 --更新智能体的策略
def policy_improvement(env, agent):
pi = agent.pi.copy()
for state in range(env.state_space.n): #0-99
up = agent.value_q[state].argmax()
down = agent.value_q[state].argmin()
if pi[state][up]<= 0.9:
pi[state][up] += 0.1
if pi[state][down]>=0.1:
pi[state][down] -= 0.1
agent.pi = pi
#五、agent用当前策略,玩n=100轮游戏
def play_n_episode(env, agent, n=100):
reward = []
for episode in range(n):
state = env.reset() # state是 1
agent_policy = agent.pi
reward_step = []
trace = []
while True:
rand_num = np.random.rand() #抽取0-1之间均匀分布的随机数
s_policy = agent_policy[state-1] #把当前状态下的策略取出来
dice = None
if s_policy.min() == s_policy.max(): #如果策略0和策略1的概率相同
dice = np.random.randint(0,2) #就随意选择一种模式
else: #如果不相同
if rand_num<s_policy.min(): #如果随机数小于小概率的策略
dice = s_policy.argmin() #就选小概率的策略
else:
dice = s_policy.argmax()
action = np.random.randint(1, env.dice_mode[dice]+1) #掷色子
pre_position = env.pos
new_state, r, done, _ = env.step(action) #走一步
reward_step.append(r)
trace.append([pre_position, dice, action, env.pos, r])
if done:
break
reward.append(sum(reward_step))
return np.mean(reward)
#六、训练10个epoch---也就是评估10次策略-提升10次策略
def fit(env, agent, epochs=10):
score = []
for epoch in range(epochs):
mean_reward = play_n_episode(env, agent, n=100) #玩100次游戏,统计评价得分
score.append(mean_reward)
Policy_evaluation(env, agent) #评估策略
policy_improvement(env, agent) #提升策略
return score
(三)训练结果查看
#1、不要梯子,看看agent的策略和得分
import matplotlib.pyplot as plt
env = Env(0, [3,6]) #不要梯子
agent = Agent(env)
score = fit(env, agent)
np.argmax(agent.pi, axis = 1)
plt.figure(figsize = (10, 2))
plt.plot(score)

#2、设置5个梯子,看看agent的策略和得分
import matplotlib.pyplot as plt
env = Env(5, [3,6]) #设置5个梯子
agent = Agent(env)
score = fit(env, agent)
np.argmax(agent.pi, axis = 1)
plt.figure(figsize = (10, 2))
plt.plot(score)

从结果来看,我们的算法还是有效的:
(1)对于无梯子的情况:在离格子100还有3个格子时,就从0-3之间掷色子。其他地方都从0-6掷色子。显然这是最优的策略。
(2)对于有梯子的情况:在梯子低端附近有部分格子是从0-3掷色子,这样可以尽可能地利用梯子快速上升。在梯子顶端附件则从0-6之间掷色子,加快越过梯子顶端,防止滑落。可见,我们训练出来的结果还是非常有效的策略。
(3)从整体看,有梯子比无梯子的得分要高。这也非常合情理。
三、蛇棋案例说明
该案例是我从 https://www.bilibili.com/video/BV1VY411W7Mm/?spm_id_from=333.1007.top_right_bar_window_history.content.click&vd_source=b6780e06031ac609460f6fbf017bbb39 上看到的。上面(三)的代码是我按照我自己对算法的理解,自己又重构的代码。个人认为我的代码清晰度、逻辑链条更加通畅一些。
原作者是想用蛇棋案例来说明强化学习中的三种算法:策略迭代、价值迭代、泛化迭代
我个人认为,这个案例就是一个动态规划问题,这是一个有模型的强化学习问题,所以agent打的游戏数据是没有什么用处的。就是说最优策略是可以在环境确定下来后,就可以准确算出最优策略了,而没必要让agent去实实在在打几轮游戏去计算最优策略。至于为什么,可从我下面这两篇博文找到理论支持:
https://blog.csdn.net/friday1203/article/details/155533020?spm=1001.2014.3001.5501
https://blog.csdn.net/friday1203/article/details/155946919?spm=1001.2014.3001.5501
所以在我自己重构的代码中,agent实实在在打的游戏都是用来衡量策略的效果。至于策略提升,是完全用不到agent实实在在打的游戏数据的。同时我也仔细研究了原作者的代码,其实她的代码中也没用到agent实实在在打游戏的数据。下面我把原作者的代码也贴出来,大家互相参考:
import numpy as np
import gymnasium as gym
from gymnasium.spaces import Discrete
from contextlib import contextmanager
import time
@contextmanager
def timer(name):
start = time.time()
yield
end = time.time()
print("{} COST:{}".format(name, end-start))
#(一)构建环境
class SnakeEnv(gym.Env):
SIZE=100 #表示游戏是从1-100
def __init__(self, ladder_num, dices):
self.ladder_num = ladder_num #指定游戏有几个梯子
self.dices = dices #透筛子的点数
self.observation_space = Discrete(self.SIZE+1) #0索引弃用,所以设置state的数量为101
self.action_space = Discrete(len(dices)) #有两种action
if ladder_num == 0:
self.ladders = {0:0}
else:
ladders = set(np.random.randint(1, self.SIZE, size=self.ladder_num*2))
while len(ladders) < self.ladder_num*2:
ladders.add(np.random.randint(1, self.SIZE))
ladders = list(ladders)
ladders = np.array(ladders)
np.random.shuffle(ladders)
ladders = ladders.reshape((self.ladder_num, 2))
re_ladders = list()
for i in ladders:
re_ladders.append([i[1], i[0]])
re_ladders = np.array(re_ladders)
self.ladders = dict(np.append(re_ladders, ladders, axis=0))
print(f"ladders info:{self.ladders} dice ranges:{self.dices}")
self.pos = 1 #初始化位置
def reset(self): #每次训练都要从起始点重新跑一遍游戏
self.pos = 1
return self.pos
def step(self, a): #每走一步,状态、奖励、是否结束游戏等信息
step = np.random.randint(1, self.dices[a]+1)
self.pos += step
if self.pos == 100:
return 100, 100, 1, {} #位置、奖励、游戏结束
elif self.pos > 100:
self.pos = 200-self.pos
if self.pos in self.ladders:
self.pos = self.ladders[self.pos]
return self.pos, -1, 0, {} #0表示游戏还没结束
def reward(self, s):
if s==100:
return 100
else:
return -1
def render(self): #可视化界面
pass
#(二)构建智能体
class TableAgent(object):
def __init__(self, env):
self.s_len = env.observation_space.n #state的个数
self.a_len = env.action_space.n #action的个数
self.r = [env.reward(s) for s in range(0, self.s_len)]
self.pi = np.zeros(self.s_len, dtype=int) #确定性策略
self.p = np.zeros([self.a_len, self.s_len, self.s_len], dtype=float) #状态转移概率
ladder_move = np.vectorize(lambda x: env.ladders[x] if x in env.ladders else x)
for i, dice in enumerate(env.dices): #初始化表格所有位置的概率p[A,S,S]
prob = 1.0/dice #dice就是一个数组[3,6]
for src in range(1, 100):
step = np.arange(dice) + 1
step += src
step = np.piecewise(step, [step>100, step<=100], [lambda x: 200-x, lambda x:x])
step = ladder_move(step)
for dst in step:
self.p[i, src, dst] += prob
self.p[:,100,100] = 1
self.value_pi = np.zeros((self.s_len))
self.value_q = np.zeros((self.s_len, self.a_len))
self.gamma = 0.8
def play(self, state):
return self.pi[state]
#(三)策略评估(reward计算)
def eval_game(env, agent):
state = env.reset()
total_reward = 0
state_action = []
while True:
act = agent.play(state)
state_action.append((state, act))
state, reward, done, _ = env.step(act)
total_reward += reward
if done:
break
return total_reward, state_action
#(四)算法--1、策略迭代
class PolicyIteration(object):
dice = [3, 6]
def policy_evaluation(self, agent, max_iter=-1):
iteration = 0
while True:
iteration += 1
new_value_pi = agent.value_pi.copy()
for i in range(1, agent.s_len):
ac = agent.pi[i]
for j in range(0, agent.a_len):
if ac != j:
break
transition = agent.p[ac, i, :]
value_sa = np.dot(transition, agent.r+agent.gamma*agent.value_pi)
new_value_pi[i] = value_sa
diff = np.sqrt(np.sum(np.power(agent.value_pi - new_value_pi, 2)))
if diff < 1e-6:
print("policy evaluation proceed {} iters.".format(iteration))
break
else:
agent.value_pi = new_value_pi
if iteration == max_iter:
print("policy evaluation proceed {} iters.".format(iteration))
break
def policy_improvement(self, agent):
new_policy = np.zeros_like(agent.pi)
for i in range(1, agent.s_len):
for j in range(0, agent.a_len):
transition = agent.p[j, i, :]
agent.value_q[i,j] = np.dot(transition, agent.r+agent.gamma*agent.value_pi)
max_act = np.argmax(agent.value_q[i,:])
new_policy[i] = max_act
if np.all(np.equal(new_policy, agent.pi)):
return False
else:
agent.pi = new_policy
return True
def policy_iteration(self, agent, max_iter=-1):
iteration=0
with timer('Timer PolicyIter'):
while True:
iteration += 1
with timer('Timer PolicyEval'):
self.policy_evaluation(agent, max_iter)
with timer('Timer PolicyImprove'):
ret = self.policy_improvement(agent)
if not ret:
break
print("Iter {} rounds converge".format(iteration))
def policy_iteration_demo(env):
agent = TableAgent(env)
pi_algo = PolicyIteration()
pi_algo.policy_iteration(agent)
print('agent.pi={}'.format(agent.pi))
total_reward, state_action=eval_game(env,agent)
print('total_reward={0}, state_action{1}'.format(total_reward, state_action))
if __name__ == '__main__':
# env1 = SnakeEnv(0, [3, 6])
env2 = SnakeEnv(5, [3, 6])
# policy_iteration_demo(env1)
policy_iteration_demo(env2)

##(四)算法2--价值迭代
def value_iteration(agent, max_iter=-1):
iteration = 0
dice=[3, 6]
with timer('Timer ValueIter'):
while True:
iteration += 1
new_value_pi = agent.value_pi.copy()
for i in range(1, agent.s_len):
value_sas = []
for j in range(0, agent.a_len):
value_sa = np.dot(agent.p[j,i,:], agent.r + agent.gamma*agent.value_pi)
value_sas.append(value_sa)
new_value_pi[i] = max(value_sas)
diff = np.sqrt(np.sum(np.power(agent.value_pi - new_value_pi, 2)))
if diff < 1e-6:
break
else:
agent.value_pi = new_value_pi
if iteration == max_iter:
break
print('Iter {} rounds converge'.format(iteration))
for i in range(1, agent.s_len):
for j in range(0, agent.a_len):
agent.value_q[i,j] = np.dot(agent.p[j,i,:], agent.r+agent.gamma*agent.value_pi)
max_act = np.argmax(agent.value_q[i,:])
agent.pi[i] = max_act
#价值迭代和策略迭代的对比
def policy_vs_value_demo(env):
policy_agent = TableAgent(env)
value_agent = TableAgent(env)
pi_algo = PolicyIteration()
pi_algo.policy_iteration(policy_agent)
print('agent.pi={}'.format(policy_agent.pi))
total_reward, state_action = eval_game(env, policy_agent)
print('total_reward={0}, state_action={1}'.format(total_reward, state_action))
value_iteration(value_agent)
print('agent.pi={}'.format(value_agent.pi))
total_reward, state_action = eval_game(env, value_agent)
print('total_reward={0}, state_action={1}'.format(total_reward, state_action))
if __name__ == '__main__':
# env1 = SnakeEnv(0, [3, 6])
env2 = SnakeEnv(5, [3, 6])
# policy_iteration_demo(env1)
# policy_iteration_demo(env2)
policy_vs_value_demo(env2)
##(四)算法3--泛化迭代
#泛化迭代和前两种迭代的对比
def generalized_policy_compare(env):
policy_vs_value_demo(env)
gener_agent = TableAgent(env)
#用10轮的价值迭代+1轮的策略迭代-->得到一个策略方案
with timer("Timer GeneralizedIter"):
value_iteration(gener_agent, 10) #这里的10你可以自己调,你觉得迭代几次价值评估就大差不差了,那你就迭代几次吧
pi_algo = PolicyIteration()
pi_algo.policy_iteration(gener_agent, 1)
print("agent.pi={}".format(gener_agent.pi))
total_reward, state_action = eval_game(env, gener_agent)
print("total_reward={0}, state_action={1}".format(total_reward, state_action))
if __name__ == '__main__':
env2 = SnakeEnv(5, [3, 6])
generalized_policy_compare(env2)
最后补充:装饰器
假如我原来写的函数是my_funcB,但是我们嫌这个函数的功能不太完善,我现在想补充一些,那补充的内容就可以写成闭包函数funcA()这样的嵌套形式,并且参数是原函数my_funcB,然后我再在原函数my_funcB头上加一个@funcA的帽子,这样我调用并执行my_funcB的时候就执行了my_funcB和我想补充的内容。

更多推荐


所有评论(0)