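Schlably: A Python Framework for Deep Reinforcement Learning Experiments in Shop Scheduling
I recently came across an interesting open-source project, Schlably: a Python framework for running scheduling experiments with deep reinforcement learning (DRL). It provides extensible gym environments and DRL agents, along with utilities for data generation, training, and testing.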

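1. Introduction
Production scheduling (PS) is an important and complex problem in operations research (OR) and optimization. It involves allocating resources over time to complete production tasks, with the goal of minimizing metrics such as time, cost, and resource usage. PS has attracted broad attention in applied artificial intelligence, particularly through deep reinforcement learning (DRL).
Although the field has seen a large number of experiments and studies, their setups and solution approaches often differ only slightly, so researchers end up repeating similar programming work, which raises the upfront cost and difficulty of research. To address this, the University of Wuppertal in Germany developed Schlably, a Python framework that provides a set of tools to simplify developing and evaluating PS solutions.
Source code: https://github.com/tmdt-buw/schlably
Documentation: https://schlably.readthedocs.io/en/latest/index.html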
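2. Background and Related Work
Schlably grew out of a university research project that, in collaboration with an industrial partner, needed to solve a real-world production scheduling problem. The authors therefore fixed several design goals early on: provide out-of-the-box DRL methods and heuristics, cover different scheduling scenarios, support detailed evaluation, and keep the code easy to work with so that students and researchers can get started quickly.
The authors also evaluated and compared existing related frameworks. Schlably aims to be a flexible, modular, and easy-to-use experimentation framework, with particular strengths in generating scheduling problem instances and integrating third-party DRL libraries.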
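3. Software Architecture
Schlably's architecture emphasizes modularity and extensibility. Its core components are:
- Environment
- Scheduling Problem Generator
- DRL Agent Algorithm
- Logging and Evaluation Tools
This design simplifies experiment setup and also makes it easier to swap out and extend individual components.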
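To make the division of labor between these components concrete, here is a minimal sketch of how an environment, an agent, and a simple log might interact in a gym-style loop. `ToyEnv`, `RandomAgent`, and `run_episode` are hypothetical names invented for illustration and are not part of Schlably; the 4-tuple returned by `step` follows the older gym convention.

```python
import random
from typing import List, Tuple


class ToyEnv:
    """Hypothetical stand-in for a scheduling environment (not Schlably's API)."""

    def __init__(self, num_tasks: int = 3) -> None:
        self.num_tasks = num_tasks
        self.remaining = num_tasks

    def reset(self) -> List[int]:
        self.remaining = self.num_tasks
        return [self.remaining]

    def step(self, action: int) -> Tuple[List[int], float, bool, dict]:
        # Every action schedules one task; the reward is a constant step penalty.
        self.remaining -= 1
        done = self.remaining == 0
        return [self.remaining], -1.0, done, {}


class RandomAgent:
    """Hypothetical agent that picks one of n_actions uniformly at random."""

    def __init__(self, n_actions: int, seed: int = 0) -> None:
        self.n_actions = n_actions
        self.rng = random.Random(seed)

    def get_action(self, obs: List[int]) -> int:
        return self.rng.randrange(self.n_actions)


def run_episode(env: ToyEnv, agent: RandomAgent) -> List[float]:
    """Run one episode and return the per-step rewards as a simple log."""
    rewards = []
    obs, done = env.reset(), False
    while not done:
        obs, reward, done, _ = env.step(agent.get_action(obs))
        rewards.append(reward)
    return rewards


episode_rewards = run_episode(ToyEnv(num_tasks=3), RandomAgent(n_actions=2))
print(sum(episode_rewards))
```

Because the agent only depends on `reset` and `step`, either side can be replaced without touching the other, which is the kind of modularity the component list describes.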
The figure below shows Schlably's overall architecture:

Within this architecture, the authors make use of Q-learning from reinforcement learning. Its update rule is:
$$
Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \left[ r_t + \gamma \max_a Q(s_{t+1}, a) - Q(s_t, a_t) \right]
$$
where:
- $ Q(s_t, a_t) $ is the Q-value of taking action $ a_t $ in state $ s_t $.
- $ \alpha $ is the learning rate.
- $ r_t $ is the reward at the current time step.
- $ \gamma $ is the discount factor.
- $ \max_a Q(s_{t+1}, a) $ is the largest Q-value over all possible actions in the next state $ s_{t+1} $.
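The update rule can be illustrated with a small tabular example. This is a sketch under stated assumptions, not Schlably code: the function `q_update`, the dictionary-based Q-table, and the state and action labels are all made up for illustration.

```python
from collections import defaultdict
from typing import DefaultDict, Hashable, List, Tuple

QTable = DefaultDict[Tuple[Hashable, Hashable], float]


def q_update(q: QTable, state: Hashable, action: Hashable, reward: float,
             next_state: Hashable, next_actions: List[Hashable],
             alpha: float = 0.1, gamma: float = 0.99) -> float:
    """Apply one tabular Q-learning update and return the new Q(state, action)."""
    # max_a Q(s_{t+1}, a); treated as 0 when the next state offers no actions
    best_next = max((q[(next_state, a)] for a in next_actions), default=0.0)
    td_target = reward + gamma * best_next
    q[(state, action)] += alpha * (td_target - q[(state, action)])
    return q[(state, action)]


q_table: QTable = defaultdict(float)
q_table[("s1", "a0")] = 2.0  # pretend one next-state value is already known

new_value = q_update(q_table, "s0", "a0", reward=1.0,
                     next_state="s1", next_actions=["a0", "a1"],
                     alpha=0.5, gamma=0.9)
print(new_value)
```

With these numbers the update works out to 0 + 0.5 · (1 + 0.9 · 2 − 0) = 1.4, up to floating-point rounding.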
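4. Algorithms
Schlably integrates several scheduling approaches, including heuristics, reinforcement learning agents, and solvers.
4.1 Heuristics
The heuristics include: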
- EDD: earliest due date
- SPT: shortest processing time first
- MTR: most tasks remaining
- LTR: least tasks remaining
- Random: random action
The implementation is as follows:
"""
This module provides the following scheduling heuristics as function:

- EDD: earliest due date
- SPT: shortest processing time first
- MTR: most tasks remaining
- LTR: least tasks remaining
- Random: random action

You can implement additional heuristics in this file by specifying a function that takes a list of tasks and an action
mask and returns the index of the job to be scheduled next.

If you want to call your heuristic via the HeuristicSelectionAgent or edit an existing shortcut,
adapt/extend the task_selection dict attribute of the HeuristicSelectionAgent class.

:Example:

Add a heuristic that returns zeros (this is not a practical example!)

1. Define the according function

.. code-block:: python

    def return_0_heuristic(tasks: List[Task], action_mask: np.array) -> int:
        return 0

2. Add the function to the task_selection dict within the HeuristicSelectionAgent class:

.. code-block:: python

    self.task_selections = {
        'rand': random_task,
        'EDD': edd,
        'SPT': spt,
        'MTR': mtr,
        'LTR': ltr,
        'ZERO': return_0_heuristic
    }
"""
import numpy as np
from typing import List

from src.data_generator.task import Task
def get_active_task_dict(tasks: List[Task]) -> dict:
    """
    Helper function to determining the next unfinished task to be processed for each job

    :param tasks: List of task objects, so one instance
    :return: Dictionary containing the next tasks to be processed for each job
        Would be an empty dictionary if all tasks were completed
    """
    active_job_task_dict = {}
    for task_i, task in enumerate(tasks):
        if not task.done and task.job_index not in active_job_task_dict.keys():
            active_job_task_dict[task.job_index] = task_i
    return active_job_task_dict


def edd(tasks: List[Task], action_mask: np.array) -> int:
    """
    EDD: earliest due date. Determines the job with the smallest deadline

    :param tasks: List of task objects, so one instance
    :param action_mask: Action mask from the environment that is to receive the action selected by this heuristic
    :return: Index of the job selected according to the heuristic
    """
    if np.sum(action_mask) == 1:
        chosen_job = np.argmax(action_mask)
    else:
        num_jobs = action_mask.shape[0] - 1
        num_tasks_per_job = len(tasks) / num_jobs
        deadlines = np.full(num_jobs + 1, np.inf)
        for job_i in range(num_jobs):
            idx = int(num_tasks_per_job * job_i)
            deadlines[job_i] = tasks[idx].deadline
        deadlines = np.where(action_mask == 1, deadlines, np.full(deadlines.shape, np.inf))
        chosen_job = np.argmin(deadlines)
    return chosen_job
def spt(tasks: List[Task], action_mask: np.array) -> int:
    """
    SPT: shortest processing time first. Determines the job of which the next unfinished task has the lowest runtime

    :param tasks: List of task objects, so one instance
    :param action_mask: Action mask from the environment that is to receive the action selected by this heuristic
    :return: Index of the job selected according to the heuristic
    """
    if np.sum(action_mask) == 1:
        chosen_job = np.argmax(action_mask)
    else:
        num_jobs = action_mask.shape[0] - 1
        runtimes = np.full(num_jobs + 1, np.inf)
        active_task_dict = get_active_task_dict(tasks)
        for i in range(num_jobs):
            if i in active_task_dict.keys():
                task_idx = active_task_dict[i]
                runtimes[i] = tasks[task_idx].runtime
        runtimes = np.where(action_mask == 1, runtimes, np.full(runtimes.shape, np.inf))
        chosen_job = np.argmin(runtimes)
    return chosen_job
def mtr(tasks: List[Task], action_mask: np.array) -> int:
    """
    MTR: most tasks remaining. Determines the job with the least completed tasks

    :param tasks: List of task objects, so one instance
    :param action_mask: Action mask from the environment that is to receive the action selected by this heuristic
    :return: Index of the job selected according to the heuristic
    """
    if np.sum(action_mask) == 1:
        chosen_job = np.argmax(action_mask)
    else:
        tasks_done = np.zeros(len(tasks) + 1)
        possible_tasks = get_active_task_dict(tasks)
        for _, task in enumerate(tasks):
            if task.done and task.job_index in possible_tasks.keys():
                tasks_done[possible_tasks[task.job_index]] += 1
        task_mask = np.zeros(len(tasks) + 1)
        for job_id, task_id in possible_tasks.items():
            if action_mask[job_id] == 1:
                task_mask[task_id] += 1
        tasks_done = np.where(task_mask == 1, tasks_done, np.full(tasks_done.shape, np.inf))
        tasks_done[-1] = np.inf
        chosen_task = np.argmin(tasks_done)
        chosen_job = tasks[chosen_task].job_index
    return chosen_job


def ltr(tasks: List[Task], action_mask: np.array) -> int:
    """
    LTR: least tasks remaining. Determines the job with the most completed tasks

    :param tasks: List of task objects, so one instance
    :param action_mask: Action mask from the environment that is to receive the action selected by this heuristic
    :return: Index of the job selected according to the heuristic
    """
    if np.sum(action_mask) == 1:
        chosen_job = np.argmax(action_mask)
    else:
        tasks_done = np.zeros(len(tasks) + 1)
        possible_tasks = get_active_task_dict(tasks)
        for _, task in enumerate(tasks):
            if task.done and task.job_index in possible_tasks.keys():
                tasks_done[possible_tasks[task.job_index]] += 1
        task_mask = np.zeros(len(tasks) + 1)
        for job_id, task_id in possible_tasks.items():
            if action_mask[job_id] == 1:
                task_mask[task_id] += 1
        tasks_done = np.where(task_mask == 1, tasks_done, np.full(tasks_done.shape, -1))
        tasks_done[-1] = -1
        chosen_task = np.argmax(tasks_done)
        chosen_job = tasks[chosen_task].job_index
    return chosen_job
def random_task(tasks: List[Task], action_mask: np.array) -> int:
    """
    Returns a random task

    :param tasks: Not needed
    :param action_mask: Action mask from the environment that is to receive the action selected by this heuristic
    :return: Index of the job selected according to the heuristic
    """
    chosen_job = None
    if np.sum(action_mask) == 1:
        chosen_job = np.argmax(action_mask)
    else:
        valid_values_0 = np.where(action_mask > 0)[0]
        if len(valid_values_0) > 2:
            chosen_job = np.random.choice(valid_values_0, size=1)[0]
        elif len(valid_values_0) == 0:
            print('this is not possible')
        else:
            chosen_job = np.random.choice(valid_values_0, size=1)[0]
    return chosen_job
def choose_random_machine(chosen_task, machine_mask) -> int:
    """
    Determines a random machine which is available according to the mask and chosen task. Useful for the FJSSP.

    :param chosen_task: ID of the task that is scheduled on the selected machine
    :param machine_mask: Machine mask from the environment that is to receive the machine action chosen by this function
    :return: Index of the chosen machine
    """
    machine_mask = np.array(np.where(machine_mask > 0))
    idx_valid_machine = np.where(machine_mask[0] == chosen_task)
    valid_machines = machine_mask[1][idx_valid_machine]
    chosen_machine = np.random.choice(valid_machines, size=1)[0]
    return chosen_machine


def choose_first_machine(chosen_task, machine_mask) -> int:
    """
    Determines the first (by index) machine which is available according to the mask and chosen task. Useful for the
    FJSSP

    :param chosen_task: ID of the task that is scheduled on the selected machine
    :param machine_mask: Machine mask from the environment that is to receive the machine action chosen by this function
    :return: Index of the chosen machine
    """
    machine_mask = np.array(np.where(machine_mask > 0))
    idx_valid_machine = np.where(machine_mask[0] == chosen_task)
    valid_machines = machine_mask[1][idx_valid_machine]
    return valid_machines[0]
class HeuristicSelectionAgent:
    """
    This class can be used to get the next task according to the heuristic passed as string abbreviation (e.g. EDD).
    If you want to edit a shortcut, or add one for your custom heuristic, adapt/extend the task_selection dict.

    :Example:

    .. code-block:: python

        def my_custom_heuristic():
            ...<function body>...

    or

    .. code-block:: python

        self.task_selections = {
            'rand': random_task,
            'XYZ': my_custom_heuristic
        }
    """
    def __init__(self) -> None:
        super().__init__()
        # Map heuristic ids to corresponding function
        self.task_selections = {
            'rand': random_task,
            'EDD': edd,
            'SPT': spt,
            'MTR': mtr,
            'LTR': ltr
        }

    def __call__(self, tasks: List, action_mask: np.array, task_selection: str) -> int:
        """
        Selects the next heuristic function according to the heuristic passed as string abbreviation
        and the assignment in the task_selections dictionary

        :param tasks: List of task objects, so one instance
        :param action_mask: Action mask from the environment that is to receive the action selected by this heuristic
        :param task_selection: Heuristic string abbreviation (e.g. EDD)
        :return: Index of the job selected according to the heuristic
        """
        choose_task = self.task_selections[task_selection]
        chosen_task = choose_task(tasks, action_mask)
        return chosen_task
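To see how such a heuristic behaves on concrete data, the following standalone sketch re-implements the SPT idea in plain Python. It is an illustration under assumptions, not the module's code: `SimpleTask` is a hypothetical stand-in for Schlably's `Task` with only the fields needed here, `next_task_per_job` and `spt_sketch` are made-up names, and the mask has exactly one entry per job (the original sizes its arrays as `num_jobs + 1`, which the sketch ignores).

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class SimpleTask:
    """Hypothetical stand-in for Schlably's Task with only the fields used here."""
    job_index: int
    runtime: int
    done: bool = False


def next_task_per_job(tasks: List[SimpleTask]) -> Dict[int, int]:
    """Map each unfinished job to the index of its first unfinished task."""
    active: Dict[int, int] = {}
    for task_i, task in enumerate(tasks):
        if not task.done and task.job_index not in active:
            active[task.job_index] = task_i
    return active


def spt_sketch(tasks: List[SimpleTask], action_mask: List[int]) -> int:
    """Pick the allowed job whose next unfinished task has the shortest runtime."""
    active = next_task_per_job(tasks)
    candidates = [job for job in active if action_mask[job] == 1]
    return min(candidates, key=lambda job: tasks[active[job]].runtime)


tasks = [
    SimpleTask(job_index=0, runtime=5, done=True),
    SimpleTask(job_index=0, runtime=2),
    SimpleTask(job_index=1, runtime=4),
    SimpleTask(job_index=1, runtime=1),
    SimpleTask(job_index=2, runtime=1),
]
chosen = spt_sketch(tasks, action_mask=[1, 1, 0])
print(chosen)
```

Job 2 has the shortest next task, but the mask forbids it, so the sketch falls back to job 0, whose next task has runtime 2. Masking before taking the minimum plays the same role as the `np.where(action_mask == 1, ...)` step in the listing.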
Schlably ships with some out-of-the-box examples to help users get started quickly. Here is a simple usage example:
import pathlib
import pickle
import yaml

from src.data_generator.instance_factory import main as data_generation_main
from src.agents.train import main as training_main
from src.utils.file_handler.config_handler import ConfigHandler
# constants
from src.utils.file_handler.data_handler import DATA_DIRECTORY
from src.utils.file_handler.config_handler import CONFIG_DIRECTORY

DEFAULT_DATA_GEN_FILE: str = 'data_generation/jssp/config_job3_task4_tools0.yaml'
DEFAULT_TRAINING_FILE: str = 'training/dqn/config_job3_task4_tools0.yaml'

if __name__ == "__main__":
    sp_types = ['jssp', 'fjssp']
    algorithms = ['dqn', 'ppo']
    num_jobs = [1, 3, 6]
    num_tasks = [1, 3, 6]
    num_tools = [0, 1, 6]
    # kwargs to overwrite defaults according to the current test setup
    data_gen_overwrite_kwargs = {
        'num_instances': 10,
        'num_processes': 10,
        'write_to_file': True
    }
    train_overwrite_kwargs = {
        'total_timesteps': 100,
        'intermediate_test_interval': 100,
        'saved_model_name': 'model_from_code_test'
    }
    files_to_delete = []
    # TODO understand why and where configs are updated and if all works as intended
    for num_j, num_ta, num_to in zip(num_jobs, num_tasks, num_tools):
        # for each sp_type
        for sp_type in sp_types:
            # specify instances file
            instances_file = f"config_job{num_j}_task{num_ta}_tools{num_to}.pkl"
            instances_file_path = DATA_DIRECTORY / sp_type / instances_file
            # if file does not exist, create a file
            if not instances_file_path.exists():
                # get default config
                data_gen_default_config = ConfigHandler.get_config(DEFAULT_DATA_GEN_FILE)
                # update data_gen_overwrite_kwargs with current jobs, task, tool setup
                data_gen_overwrite_kwargs.update({
                    'sp_type': sp_type,
                    'num_jobs': num_j,
                    'num_tasks': num_ta,
                    'num_tools': num_to,
                    'num_machines': num_j,
                })
                # update default config with overwrite kwargs
                data_gen_default_config.update(data_gen_overwrite_kwargs)
                data_gen_config = data_gen_overwrite_kwargs
                # use (new) config to run data generation
                data_generation_main(external_config=data_gen_config)
                # add config and data path to files_to_delete
                files_to_delete.append(instances_file_path)
            for algorithm in algorithms:
                # test train function (includes intermediate_test, test)
                # load default config
                train_default_config = ConfigHandler.get_config(DEFAULT_TRAINING_FILE)
                # update train_overwrite_kwargs with current algorithm and instances file
                train_overwrite_kwargs.update({
                    'algorithm': algorithm,
                    'instances_file': f"{sp_type}/{instances_file}"
                })
                # update default config with overwrite kwargs
                train_default_config.update(train_overwrite_kwargs)
                print(f"Testing train file for job{num_j}_task{num_ta}_tools{num_to}, {sp_type}, {algorithm}")
                # start training
                training_main(external_config=train_default_config)
    # delete data files created only for this code test
    for file in files_to_delete:
        pathlib.Path.unlink(file)
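One detail of the script that is easy to misread is the outer loop: `zip` pairs the three lists element by element, so only three job/task/tool setups are tested rather than every combination. The short sketch below contrasts this with a full grid; the helper `instances_file_name` is a hypothetical name that simply repeats the f-string from the script.

```python
from itertools import product
from typing import List, Tuple

num_jobs = [1, 3, 6]
num_tasks = [1, 3, 6]
num_tools = [0, 1, 6]

# zip walks the lists in lockstep: one setup per position
paired: List[Tuple[int, int, int]] = list(zip(num_jobs, num_tasks, num_tools))
# product would enumerate every combination instead
full_grid: List[Tuple[int, int, int]] = list(product(num_jobs, num_tasks, num_tools))


def instances_file_name(num_j: int, num_ta: int, num_to: int) -> str:
    """Hypothetical helper mirroring the file-name pattern used in the script."""
    return f"config_job{num_j}_task{num_ta}_tools{num_to}.pkl"


file_names = [instances_file_name(*setup) for setup in paired]
print(paired)
print(len(full_grid))
print(file_names)
```

With two problem types and two algorithms per setup, the lockstep pairing leads to 3 × 2 × 2 = 12 training runs, whereas a full grid would require 27 × 2 × 2 = 108.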
4.2 Reinforcement Learning Agents
Three reinforcement learning agents are included: DQN, PPO, and PPO_masked.
4.2.1 DQN
"""
DQN Implementation with target net and epsilon greedy. Follows the Stable Baselines 3 implementation.
To reuse trained models, you can make use of the save and load function.
To adapt policy and value network structure, specify the layer and activation parameter in your train config or
change the constants in this file
"""
import numpy as np
import pickle
import random
from collections import deque
import torch as T
import torch.nn as nn
import torch.optim as optim
from torch.nn import functional as F
from typing import Tuple, List

from src.utils.logger import Logger

# constants
LAYER: List[int] = [64, 64]
ACTIVATION: str = 'ReLU'
class MemoryBuffer:
    """
    Handles episode data collection and sample generation

    :param buffer_size: Buffer size
    :param batch_size: Size for batches to be generated
    :param obs_dim: Size of the observation to be stored in the buffer
    :param obs_type: Type of the observation to be stored in the buffer
    :param action_type: Type of the action to be stored in the buffer
    """
    def __init__(self, buffer_size: int, batch_size: int, obs_dim: int, obs_type: type, action_type: type):
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        self.pos = 0
        self.full = False
        # buffer data
        self.obs = np.zeros((buffer_size, obs_dim), dtype=obs_type)
        self.actions = np.zeros((buffer_size, 1), dtype=action_type)
        self.rewards = np.zeros((buffer_size, 1), dtype=np.float32)
        self.dones = np.zeros((buffer_size, 1), dtype=np.float32)  # try with dtype=np.bool
        self.new_obs = np.zeros((buffer_size, obs_dim), dtype=np.float32)

    def __len__(self):
        if self.full:
            return self.buffer_size
        else:
            return self.pos

    def store_memory(self, obs, action, reward, done, new_obs) -> None:
        """
        Appends all data from the recent step

        :param obs: Observation at the beginning of the step
        :param action: Index of the selected action
        :param reward: Reward the env returned in this step
        :param done: True if the episode ended in this step
        :param new_obs: Observation after the step
        :return:
        """
        self.obs[self.pos] = np.array(obs).copy()
        self.actions[self.pos] = np.array(action).copy()
        self.rewards[self.pos] = np.array(reward).copy()
        self.dones[self.pos] = np.array(done).copy()
        self.new_obs[self.pos] = np.array(new_obs)
        self.pos += 1
        # if pos behind last element -> buffer full.
        # Return pos to 0. Next step, the oldest data in the buffer is then replaced by the newest one
        if self.pos == self.buffer_size:
            self.full = True
            self.pos = 0

    def get_samples(self) -> Tuple:
        """
        Generates random samples from the stored data

        :return: batch_size samples from the buffer. e.g. obs, actions, ..., new_obs from step 21
        """
        # generate batch_size random indices in range of current buffer len
        indices = np.random.randint(0, len(self), size=self.batch_size)
        return self.obs[indices], self.actions[indices], self.rewards[indices], \
            self.dones[indices], self.new_obs[indices]
class Policy(nn.Module):
    """
    Network structure used for both the Q network and the target network

    :param obs_dim: Observation size to determine input dimension
    :param action_dim: Number of action to determine output size
    :param learning_rate: Learning rate for the network
    :param hidden_layers: List of hidden layer sizes (int)
    :param activation: String naming activation function for hidden layers
    """
    def __init__(self, obs_dim: int, action_dim: int, learning_rate: float, hidden_layers: List[int], activation: str):
        super(Policy, self).__init__()
        net_structure = []
        # get activation class according to string
        activation = getattr(nn, activation)()
        # create first hidden layer in accordance with the input dim and the first hidden dim
        net_structure.extend([nn.Linear(obs_dim, hidden_layers[0]), activation])
        # create the other hidden layers
        for i, layer_dim in enumerate(hidden_layers):
            if not i + 1 == len(hidden_layers):
                net_structure.extend([nn.Linear(layer_dim, hidden_layers[i + 1]), activation])
            else:
                # create output layer
                net_structure.append(nn.Linear(layer_dim, action_dim))
        self.q_net = nn.Sequential(*net_structure)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)

    def forward(self, obs):
        """ forward pass through the Q-network """
        q_values = self.q_net(obs)
        return q_values
class DQN:
    """DQN Implementation with target net and epsilon greedy. Follows the Stable Baselines 3 implementation."""
    def __init__(self, env, config: dict, logger: Logger = None):
        """
        | batch_size: Number of samples that are chosen and passed through the net per update
        | gradient_steps: Number of updates per training
        | train_freq: Environment steps between two trainings
        | buffer_size: Size of the memory buffer = max number of rollouts that can be stored before the oldest are deleted
        | target_net_update: Number of steps between target_net_updates
        | training_starts = Learning_starts: steps after which training can start for the first time
        | initial_eps: Initial epsilon value
        | final_eps: Final epsilon value
        | fraction_eps: If the percentage progress of learn exceeds fraction_eps, epsilon takes the final_eps value
        | e.g. 50/100 total_timesteps done -> progress = 0.5 > fraction_eps -> eps=final_eps
        | max_grad_norm: Value to clip the policy update of the q_net

        :param env: Pregenerated, gym-based environment. If no env is passed, env = None -> DQN can only be used
            for evaluation (action prediction)
        :param config: Dictionary with parameters to specify DQN attributes
        :param logger: Logger
        """
        self.env = env
        self.gamma = config.get('gamma', 0.99)
        self.learning_rate = config.get('learning_rate', 1e-4)
        self.batch_size = config.get('batch_size', 32)
        self.gradient_steps = config.get('gradient_steps', 1)
        self.train_freq = config.get('train_freq', 4)
        self.buffer_size = config.get('buffer_size', 1_000_000)
        self.target_net_update = config.get('target_net_update', 10_000)
        self.training_starts = config.get('training_starts', 50_000)
        self.initial_eps = config.get('initial_eps', 1.0)
        self.final_eps = config.get('final_eps', 0.05)
        self.fraction_eps = config.get('fraction_eps', 0.1)
        self.max_grad_norm = config.get('max_grad_norm', 10.0)
        self.epsilon = self.initial_eps  # epsilon is the exploration rate
        self.remaining_progress = 1  # tracks how much % of total steps remain -> value between 1 and 0
        self.num_timesteps = 0
        self.n_updates = 0
        self.logger = logger if logger else Logger(config=config)
        self.seed = config.get('seed', None)
        self.reward_info = deque(maxlen=100)
        # torch seed setting
        if self.seed is not None:
            random.seed(self.seed)
            np.random.seed(self.seed)
            T.manual_seed(self.seed)
            self.env.action_space.seed(self.seed)
            self.env.seed(self.seed)
        # create networks and buffer
        self.q_net = Policy(env.observation_space.shape[0], env.action_space.n, self.learning_rate,
                            config.get('layer', LAYER),
                            config.get('activation', ACTIVATION))
        self.q_target_net = Policy(env.observation_space.shape[0], env.action_space.n, self.learning_rate,
                                   config.get('layer', LAYER),
                                   config.get('activation', ACTIVATION))
        # copy weights to target_net
        self.q_target_net.load_state_dict(self.q_net.state_dict())
        self.memory_buffer = MemoryBuffer(self.buffer_size, self.batch_size, env.observation_space.shape[0],
                                          env.observation_space.dtype, env.action_space.dtype)
    def save(self, file: str) -> None:
        """
        Save model as pickle file

        :param file: Path under which the file will be saved
        :return: None
        """
        params_dict = self.__dict__.copy()
        del params_dict['logger']
        data = {
            "params": params_dict,
            "q_params": self.q_net.state_dict(),
            "target_params": self.q_target_net.state_dict()
        }
        with open(f"{file}.pkl", "wb") as handle:
            pickle.dump(data, handle)

    @classmethod
    def load(cls, file: str, config: dict, logger: Logger = None):
        """
        Creates a DQN object according to the parameters saved in file.pkl

        :param file: Path and filename (without .pkl) of your saved model pickle file
        :param config: Dictionary with parameters to specify DQN attributes
        :param logger: Logger
        :return: DQN object
        """
        with open(f"{file}.pkl", "rb") as handle:
            data = pickle.load(handle)
        env = data["params"]["env"]
        # create DQN object. Commit necessary parameters. Update remaining parameters
        model = cls(env=env, config=config, logger=logger)
        model.__dict__.update(data["params"])
        # set weights
        model.q_net.load_state_dict(data["q_params"])
        model.q_target_net.load_state_dict(data["target_params"])
        return model
    def get_action(self, obs: np.ndarray) -> int:
        """
        Random action or action according to the policy and epsilon

        :return: action index
        """
        if np.random.random() < self.epsilon:
            # random action from the action space
            action = self.env.action_space.sample()
        else:
            obs = T.tensor(obs, dtype=T.float).to(self.q_net.device)
            q_values = self.q_net(obs)
            # choose action with highest Q value -> greedy policy
            action = T.argmax(q_values)
            action = T.squeeze(action).item()
        return action

    def predict(self, observation: np.ndarray, action_mask: np.ndarray = np.ones(1),
                deterministic: bool = True, state=None) -> Tuple:
        """
        Action prediction for testing

        :param observation: Current observation of the environment
        :param action_mask: Mask of actions, which can logically be taken. NOTE: currently not implemented!
        :param deterministic: Set True, to force a deterministic prediction
        :param state: The last states (used in rnn policies)
        :return: Predicted action and next state (used in rnn policies)
        """
        observation = T.tensor(np.array([observation]), dtype=T.float).to(self.q_net.device)
        with T.no_grad():
            q_values = self.q_net(observation)
            if deterministic:
                action = T.argmax(q_values)
            else:
                # a plain tensor has no .sample(); draw an action from the softmax over the Q-values instead
                action = T.multinomial(T.softmax(q_values, dim=-1), num_samples=1)
        action = T.squeeze(action).item()
        return action, state
    def train(self) -> None:
        """
        Trains Q-network and Target-Network

        :return: None
        """
        # Switch to train mode (this affects batch norm / dropout)
        self.q_net.train()
        losses = []
        for _ in range(self.gradient_steps):
            # get samples from the buffer
            obs_arr, action_arr, reward_arr, done_array, new_obs_array = self.memory_buffer.get_samples()
            # convert to tensors
            obs = T.tensor(obs_arr, dtype=T.float).to(self.q_target_net.device)
            actions = T.tensor(action_arr, dtype=T.float).to(self.q_target_net.device)
            rewards = T.tensor(reward_arr, dtype=T.float).to(self.q_target_net.device)
            dones = T.tensor(done_array, dtype=T.float).to(self.q_target_net.device)
            new_obs = T.tensor(new_obs_array, dtype=T.float).to(self.q_target_net.device)
            # no update on the target net -> use no_grad
            with T.no_grad():
                # Compute the next Q-values using the target network
                next_q_values = self.q_target_net(new_obs)
                # Follow greedy policy: use the one with the highest value
                next_q_values, _ = next_q_values.max(dim=1)
                # Avoid potential broadcast issue
                next_q_values = next_q_values.reshape(-1, 1)
                # 1-dones -> reward + 0 if step is last in episode
                target_q_values = rewards + (1 - dones) * self.gamma * next_q_values
            # get all current Q-values for each obs
            current_q_values = self.q_net(obs)
            # choose Q-Values according to actions
            current_q_values = T.gather(current_q_values, dim=1, index=actions.long())
            # loss computation. MSE also possible
            loss = F.smooth_l1_loss(current_q_values, target_q_values)
            losses.append(loss.item())
            # update
            self.q_net.optimizer.zero_grad()
            loss.backward()
            # clip
            nn.utils.clip_grad_norm_(self.q_net.parameters(), self.max_grad_norm)
            self.q_net.optimizer.step()
        self.n_updates += self.gradient_steps
        # logs
        self.logger.record(
            {
                'agent_training/exploration rate': self.epsilon,
                'agent_training/n_updates': self.n_updates,
                'agent_training/loss': np.mean(losses),
                'agent_training/mean_rwd': np.mean(self.reward_info)
            }
        )
        self.logger.dump()
        # if self.num_timesteps % 10_000 == 0:
        #     print(f'Update at {self.num_timesteps} Mean reward {np.mean(self.reward_info)}')
    def on_step(self, total_timesteps):
        """
        Tracks progress and checks several conditions, e.g. whether the q_target_net or epsilon needs to be updated
        """
        # update progress
        self.remaining_progress = 1 - float(self.num_timesteps) / float(total_timesteps)
        # update target_net with parameters from main q_net
        if self.num_timesteps % self.target_net_update == 0:
            self.q_target_net.load_state_dict(self.q_net.state_dict())
        # update epsilon
        if (1-self.remaining_progress) > self.fraction_eps:
            # constant if fraction reached
            self.epsilon = self.final_eps
        else:
            # linear function. Goes from initial eps to final eps. Reaches final values in the step,
            # where the function turns constant
            self.epsilon = self.initial_eps + \
                (1-self.remaining_progress) * (self.final_eps-self.initial_eps) / self.fraction_eps
    def learn(self, total_instances: int, total_timesteps: int, intermediate_test=None) -> None:
        """
        Learn over n problem instances or n timesteps (environment steps).
        Breaks depending on which condition is met first.
        One learning iteration consists of collecting rollouts and training the networks on the rollout data

        :param total_instances: Instance limit
        :param total_timesteps: Timestep limit
        :param intermediate_test: (IntermediateTest) intermediate test object. Must be created before.
        """
        instances = 0
        # iterate over n episodes = the agents has n episodes to interact with the environment
        for _ in range(total_instances):
            obs = self.env.reset()
            done = False
            instances += 1
            episode_reward = 0
            # run agent on env until done
            while not done:
                # observe and fill buffer
                action = self.get_action(obs)
                new_obs, reward, done, info = self.env.step(action)
                self.num_timesteps += 1
                episode_reward += reward
                self.memory_buffer.store_memory(obs, action, reward, done, new_obs)
                # call intermediate_test on_step
                if intermediate_test:
                    intermediate_test.on_step(self.num_timesteps, instances, self)
                # call function intern on_step
                self.on_step(total_timesteps)
                # break learn if total_timesteps are reached
                if self.num_timesteps >= total_timesteps:
                    print(f'Total timesteps reached: {total_timesteps}')
                    self.logger.record(
                        {
                            'results_on_train_dataset/instances': instances,
                            'results_on_train_dataset/num_timesteps': self.num_timesteps
                        }
                    )
                    self.logger.dump()
                    return None
                # train if training_starts is reached and then every n rollout_steps
                if self.num_timesteps >= self.training_starts and self.num_timesteps % self.train_freq == 0:
                    self.train()
                    # switch back to eval mode
                    self.q_net.train(False)
                obs = new_obs
            self.reward_info.append(episode_reward)
            if instances % len(self.env.data) == len(self.env.data) - 1:
                mean_training_reward = np.mean(self.env.episodes_rewards)
                mean_training_makespan = np.mean(self.env.episodes_makespans)
                mean_training_tardiness = np.mean(self.env.tardiness)
                self.logger.record(
                    {
                        'results_on_train_dataset/mean_reward': mean_training_reward,
                        'results_on_train_dataset/mean_makespan': mean_training_makespan,
                        'results_on_train_dataset/mean_tardiness': mean_training_tardiness
                    }
                )
                self.logger.dump()
        print("TRAINING DONE")
        self.logger.record(
            {
                'results_on_train_dataset/instances': instances,
                'results_on_train_dataset/num_timesteps': self.num_timesteps
            }
        )
        self.logger.dump()
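The linear epsilon decay in `on_step` can be isolated as a small pure function, which makes the schedule easier to reason about. The sketch below mirrors the arithmetic in the listing above with the same default values (`initial_eps=1.0`, `final_eps=0.05`, `fraction_eps=0.1`); the function name `epsilon_at` is an illustrative assumption, not part of Schlably.

```python
def epsilon_at(num_timesteps: int, total_timesteps: int, initial_eps: float = 1.0,
               final_eps: float = 0.05, fraction_eps: float = 0.1) -> float:
    """Exploration rate after num_timesteps out of total_timesteps."""
    progress = num_timesteps / total_timesteps
    if progress > fraction_eps:
        # past the decay window: epsilon stays constant
        return final_eps
    # inside the decay window: interpolate linearly from initial_eps to final_eps
    return initial_eps + progress * (final_eps - initial_eps) / fraction_eps


for step in (0, 50, 100, 500):
    print(step, epsilon_at(step, total_timesteps=1000))
```

With 1000 total timesteps, epsilon starts at 1.0, is roughly 0.525 halfway through the decay window (step 50), reaches about 0.05 at step 100, and stays at `final_eps` afterwards. Only the first 10% of training is spent on the decay, so most steps run with the low final exploration rate.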
4.2.2 PPO
"""
PPO implementation inspired by the StableBaselines3 implementation.
To reuse trained models, you can make use of the save and load function
To adapt policy and value network structure, specify the policy and value layer and activation parameter
in your train config or change the constants in this file
"""
import numpy as np
import random
import torch as T
import torch.nn as nn
import torch.optim as optim
from torch.distributions.categorical import Categorical
import pickle
from typing import Tuple, Any, List

from src.utils.logger import Logger

# constants
POLICY_LAYER: List[int] = [256, 256]
POLICY_ACTIVATION: str = 'ReLU'
VALUE_LAYER: List[int] = [256, 256]
VALUE_ACTIVATION: str = 'ReLU'
class RolloutBuffer:
    """
    Handles episode data collection and batch generation

    :param buffer_size: Buffer size
    :param batch_size: Size for batches to be generated
    """
    def __init__(self, buffer_size: int, batch_size: int):
        self.observations = []
        self.probs = []
        self.values = []
        self.actions = []
        self.rewards = []
        self.dones = []
        self.advantages = None
        self.returns = None
        if buffer_size % batch_size != 0:
            raise TypeError("rollout_steps has to be a multiple of batch_size")
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        self.reset()

    def generate_batches(self) -> Tuple:
        """
        Generates batches from the stored data

        :return: batches: Lists with all indices from the rollout_data, shuffled and sampled in lists with batch_size
            e.g. [[0,34,1,768,...(len: batch size)], [], ...(len: len(rollout_data) / batch size)]
        """
        # create random index list and split into arrays with batch size
        indices = np.random.permutation(self.buffer_size)
        num_batches = int(self.buffer_size / self.batch_size)
        batches = indices.reshape((num_batches, self.batch_size))
        return np.array(self.observations), np.array(self.actions), np.array(self.probs), batches
    def compute_advantages_and_returns(self, last_value, gamma, gae_lambda) -> None:
        """
        Computes advantage values and returns for all stored episodes.

        :param last_value: Value from the next step to calculate the advantage for the last episode in the buffer
        :param gamma: Discount factor for the advantage calculation
        :param gae_lambda: Smoothing parameter for the advantage calculation
        :return: None
        """
        # advantage: advantage from the actual returned rewards over the baseline value from step t onwards
        last_advantage = 0
        for step in reversed(range(self.buffer_size)):
            # use the predicted reward for the advantage computation of the last step of the buffer
            if step == self.buffer_size - 1:
                # if a step is the last one of the episode (done = 1) -> not_done = 0 => the advantage
                # doesn't contain values outside the own episode
                not_done = 1.0 - self.dones[step]
                next_values = last_value
            else:
                not_done = 1.0 - self.dones[step]
                next_values = self.values[step + 1]
            delta = self.rewards[step] + gamma * next_values * not_done - self.values[step]
            last_advantage = delta + gamma * gae_lambda * not_done * last_advantage
            self.advantages[step] = last_advantage
        # compute returns = discounted rewards, advantages = discounted rewards - values
        # Necessary to update the value network
        self.returns = self.values + self.advantages

    def store_memory(self, observation: np.ndarray, action: int, prob: float, value: float,
                     reward: Any, done: bool) -> None:
        """
        Appends all data from the recent step

        :param observation: Observation at the beginning of the step
        :param action: Index of the selected action
        :param prob: Log-probability of the selected action (computed from the policy_net output)
        :param value: Baseline value that the value_net estimated from this step onwards
            according to the observation (output from the value_net)
        :param reward: Reward the env returned in this step
        :param done: True if the episode ended in this step

        :return: None
        """
        self.observations.append(observation)
        self.actions.append(action)
        self.probs.append(prob)
        self.values.append(value)
        self.rewards.append(reward)
        self.dones.append(done)

    def reset(self) -> None:
        """
        Resets all buffer lists

        :return: None
        """
        self.observations = []
        self.probs = []
        self.actions = []
        self.rewards = []
        self.dones = []
        self.values = []
        self.advantages = np.zeros(self.buffer_size, dtype=np.float32)


class PolicyNetwork(nn.Module):
    """
    Policy Network for the agent

    :param input_dim: Observation size to determine input dimension
    :param n_actions: Number of actions to determine output size
    :param learning_rate: Learning rate for the network
    :param hidden_layers: List of hidden layer sizes (int)
    :param activation: String naming activation function for hidden layers
    """
    def __init__(self, input_dim: int, n_actions: int, learning_rate: float, hidden_layers: List[int], activation: str):
        super(PolicyNetwork, self).__init__()

        net_structure = []
        # get activation class according to string
        activation = getattr(nn, activation)()
        # create first hidden layer in accordance with the input dim and the first hidden dim
        net_structure.extend([nn.Linear(input_dim, hidden_layers[0]), activation])
        # create the other hidden layers
        for i, layer_dim in enumerate(hidden_layers):
            if i + 1 != len(hidden_layers):
                net_structure.extend([nn.Linear(layer_dim, hidden_layers[i + 1]), activation])
            else:
                # create output layer
                net_structure.extend([nn.Linear(layer_dim, n_actions), nn.Softmax(dim=-1)])

        self.policy_net = nn.Sequential(*net_structure)

        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)
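
    def forward(self, observation):
        """forward function"""
        # Tensor.to() is not in-place, so the result has to be assigned
        observation = observation.to(self.device)
        # note: policy_net ends with a Softmax layer, so these values are probabilities;
        # Categorical(logits=...) normalizes them once more
        logits = self.policy_net(observation)
        dist = Categorical(logits=logits)
        return dist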


class ValueNetwork(nn.Module):
    """
    Value Network for the agent

    :param input_dim: Observation shape (tuple, unpacked below) to determine input dimension
    :param learning_rate: Learning rate for the network
    :param hidden_layers: List of hidden layer sizes (int)
    :param activation: String naming activation function for hidden layers
    """
    def __init__(self, input_dim: Tuple[int, ...], learning_rate: float, hidden_layers: List[int], activation: str):
        super(ValueNetwork, self).__init__()

        net_structure = []
        # get activation class according to string
        activation = getattr(nn, activation)()
        # create first hidden layer in accordance with the input dim and the first hidden dim
        net_structure.extend([nn.Linear(*input_dim, hidden_layers[0]), activation])
        # create the other hidden layers
        for i, layer_dim in enumerate(hidden_layers):
            if i + 1 != len(hidden_layers):
                net_structure.extend([nn.Linear(layer_dim, hidden_layers[i + 1]), activation])
            else:
                # create output layer
                net_structure.append(nn.Linear(layer_dim, 1))

        self.value_net = nn.Sequential(*net_structure)

        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)

    def forward(self, observation):
        """forward function"""
        value = self.value_net(observation)
        return value


class PPO:
    """PPO Agent class"""
    def __init__(self, env, config: dict, logger: Logger = None):
        """
        | gamma: Discount factor for the advantage calculation
        | learning_rate: Learning rate for both, policy_net and value_net
        | gae_lambda: Smoothing parameter for the advantage calculation
        | clip_range: Limitation for the ratio between old and new policy
        | batch_size: Size of batches which were sampled from the buffer and fed into the nets during training
        | n_epochs: Number of repetitions for each training iteration
        | rollout_steps: Step interval within the update is performed. Has to be a multiple of batch_size
        """
        self.env = env
        self.gamma = config.get('gamma', 0.99)
        self.gae_lambda = config.get('gae_lambda', 0.95)
        self.clip_range = config.get('clip_range', 0.2)
        # n_epochs is used in range(), so the fallback must be an int (the original 0.5 raises a
        # TypeError); 10 is an assumed fallback, set 'n_epochs' in the config to control it
        self.n_epochs = config.get('n_epochs', 10)
        self.rollout_steps = config.get('rollout_steps', 2048)
        self.ent_coef = config.get('ent_coef', 0.0)
        self.num_timesteps = 0
        self.n_updates = 0
        self.learning_rate = config.get('learning_rate', 0.002)
        self.batch_size = config.get('batch_size', 256)

        self.logger = logger if logger else Logger(config=config)
        self.seed = config.get('seed', None)

        # torch seed setting
        if self.seed is not None:
            random.seed(self.seed)
            np.random.seed(self.seed)
            T.manual_seed(self.seed)
            # self.env.action_space.seed(seed)
            self.env.seed(self.seed)

        # create networks and buffer
        self.policy_net = PolicyNetwork(env.observation_space.shape[0], env.action_space.n, self.learning_rate,
                                        config.get('policy_layer', POLICY_LAYER),
                                        config.get('policy_activation', POLICY_ACTIVATION))
        self.value_net = ValueNetwork(env.observation_space.shape, self.learning_rate,
                                      config.get('value_layer', VALUE_LAYER),
                                      config.get('value_activation', VALUE_ACTIVATION))
        self.rollout_buffer = RolloutBuffer(self.rollout_steps, self.batch_size)

    @classmethod
    def load(cls, file: str, config: dict, logger: Logger = None):
        """
        Creates a PPO object according to the parameters saved in file.pkl

        :param file: Path and filename (without .pkl) of your saved model pickle file
        :param config: Dictionary with parameters to specify PPO attributes
        :param logger: Logger

        :return: PPO object
        """
        with open(f"{file}.pkl", "rb") as handle:
            data = pickle.load(handle)

        env = data["params"]["env"]

        # create PPO object, commit necessary parameters. Update remaining parameters
        model = cls(env=env, config=config, logger=logger)
        model.__dict__.update(data["params"])

        # set weights from policy and value
        model.policy_net.load_state_dict(data["policy_params"])
        model.value_net.load_state_dict(data["value_params"])

        return model

    def save(self, file: str) -> None:
        """
        Save model as pickle file

        :param file: Path under which the file will be saved

        :return: None
        """
        params_dict = self.__dict__.copy()
        del params_dict['logger']
        data = {
            "params": params_dict,
            "policy_params": self.policy_net.state_dict(),
            "value_params": self.value_net.state_dict()
        }

        with open(f"{file}.pkl", "wb") as handle:
            pickle.dump(data, handle)

    def forward(self, observation: np.ndarray, **kwargs) -> Tuple:
        """
        Predicts an action according to the current policy based on the observation
        and the value for the next state

        :param observation: Current observation of the environment
        :param kwargs: Used to accept but ignore passing actions masks from the environment.

        :return: Predicted action, log-probability of this action, and predicted value for the next state
        """
        observation = T.tensor(observation, dtype=T.float).to(self.policy_net.device)

        dist = self.policy_net(observation)
        value = self.value_net(observation)
        action = dist.sample()

        # the log-probability is stored; train() exponentiates it to form the policy ratio
        prob = T.squeeze(dist.log_prob(action)).item()
        action = T.squeeze(action).item()
        value = T.squeeze(value).item()

        return action, prob, value

    def predict(self, observation: np.ndarray, deterministic: bool = True, state=None, **kwargs) -> Tuple:
        """
        Action prediction for testing

        :param observation: Current observation of the environment
        :param deterministic: Set True, to force a deterministic prediction
        :param state: The last states (used in rnn policies)
        :param kwargs: Used to accept but ignore passing actions masks from the environment.

        :return: Predicted action and next state (used in rnn policies)
        """
        observation = T.tensor(np.array([observation]), dtype=T.float).to(self.policy_net.device)

        with T.no_grad():
            dist = self.policy_net(observation)
            if deterministic:
                action = T.argmax(dist.probs)
            else:
                # choose random action according to the predicted probs
                action = dist.sample()
        # convert the tensor to a plain Python int in both branches
        action = T.squeeze(action).item()

        return action, state

    def train(self) -> None:
        """
        Trains policy and value

        :return: None
        """
        # switch to train mode
        self.policy_net.train(True)
        self.value_net.train(True)

        policy_losses, value_losses, entropy_losses, total_losses = [], [], [], []

        for _ in range(self.n_epochs):
            # get data from buffer and random batches(index lists) to iterate over
            # e.g. obs[batch] returns the observations for all indices in batch
            obs_arr, action_arr, old_prob_arr, batches = self.rollout_buffer.generate_batches()

            # get advantage and return values from buffer
            advantages = T.tensor(self.rollout_buffer.advantages).to(self.policy_net.device)
            returns = T.tensor(self.rollout_buffer.returns).to(self.value_net.device)

            # normalize advantages
            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

            for batch in batches:
                observations = T.tensor(obs_arr[batch], dtype=T.float).to(self.policy_net.device)
                old_probs = T.tensor(old_prob_arr[batch]).to(self.policy_net.device)
                actions = T.tensor(action_arr[batch]).to(self.policy_net.device)

                dist = self.policy_net(observations)
                values = self.value_net(observations)
                values = T.squeeze(values)

                # ratio between old and new policy (probs of selected actions)
                # Should be one at the first batch of every train iteration
                new_probs = dist.log_prob(actions)
                prob_ratio = new_probs.exp() / old_probs.exp()

                # policy clip
                policy_loss_1 = prob_ratio * advantages[batch]
                policy_loss_2 = T.clamp(prob_ratio, 1 - self.clip_range, 1 + self.clip_range) * advantages[batch]
                # we want to maximize the reward, but running gradient descent -> negate the loss here
                policy_loss = -T.min(policy_loss_1, policy_loss_2).mean()

                value_loss = (returns[batch] - values) ** 2
                value_loss = value_loss.mean()

                # entropy loss
                entropy_loss = -T.mean(dist.entropy())
                entropy_losses.append(entropy_loss.item())

                total_loss = policy_loss + 0.5 * value_loss + self.ent_coef * entropy_loss

                self.policy_net.optimizer.zero_grad()
                self.value_net.optimizer.zero_grad()
                total_loss.backward()
                self.policy_net.optimizer.step()
                self.value_net.optimizer.step()

                policy_losses.append(policy_loss.item())
                value_losses.append(value_loss.item())
                total_losses.append(total_loss.item())

        self.n_updates += self.n_epochs

        # logs
        # compute explained variance
        explained_var = explained_variance(np.asarray(self.rollout_buffer.values), self.rollout_buffer.returns)
        self.logger.record(
            {
                'agent_training/n_updates': self.n_updates,
                'agent_training/loss': np.mean(total_losses),
                'agent_training/policy_gradient_loss': np.mean(policy_losses),
                'agent_training/value_loss': np.mean(value_losses),
                'agent_training/entropy_loss': np.mean(entropy_losses),
                'agent_training/explained_variance': explained_var
            }
        )
        self.logger.dump()

    def learn(self, total_instances: int, total_timesteps: int, intermediate_test=None) -> None:
        """
        Learn over n environment instances or n timesteps. Break depending on which condition is met first
        One learning iteration consists of collecting rollouts and training the networks

        :param total_instances: Instance limit
        :param total_timesteps: Timestep limit
        :param intermediate_test: (IntermediateTest) intermediate test object. Must be created before.
        """
        instances = 0

        # iterate over n episodes = the agent has n episodes to interact with the environment
        for _ in range(total_instances):
            obs = self.env.reset()
            done = False
            instances += 1

            # run agent on env until done
            while not done:
                action, prob, val = self.forward(obs)
                new_obs, reward, done, info = self.env.step(action)
                self.num_timesteps += 1
                self.rollout_buffer.store_memory(obs, action, prob, val, reward, done)

                # call intermediate_test on_step
                if intermediate_test:
                    intermediate_test.on_step(self.num_timesteps, instances, self)

                # break learn if total_timesteps are reached
                if self.num_timesteps >= total_timesteps:
                    print("total_timesteps reached")
                    self.logger.record(
                        {
                            'results_on_train_dataset/instances': instances,
                            'results_on_train_dataset/num_timesteps': self.num_timesteps
                        }
                    )
                    self.logger.dump()
                    return None

                # update every n rollout_steps
                if self.num_timesteps % self.rollout_steps == 0:
                    # predict the next value, needed for the advantage computation of the last collected step
                    with T.no_grad():
                        _, _, val = self.forward(new_obs)
                    self.rollout_buffer.compute_advantages_and_returns(val, self.gamma, self.gae_lambda)

                    # train networks
                    self.train()

                    # switch back to normal mode
                    self.policy_net.train(False)
                    self.value_net.train(False)

                    # reset buffer to continue collecting rollouts
                    self.rollout_buffer.reset()

                obs = new_obs

            if instances % len(self.env.data) == len(self.env.data) - 1:
                mean_training_reward = np.mean(self.env.episodes_rewards)
                mean_training_makespan = np.mean(self.env.episodes_makespans)
                if len(self.env.episodes_tardinesses) == 0:
                    mean_training_tardiness = 0
                else:
                    mean_training_tardiness = np.mean(self.env.episodes_tardinesses)
                self.logger.record(
                    {
                        'results_on_train_dataset/mean_reward': mean_training_reward,
                        'results_on_train_dataset/mean_makespan': mean_training_makespan,
                        'results_on_train_dataset/mean_tardiness': mean_training_tardiness
                    }
                )
                self.logger.dump()

        print("TRAINING DONE")
        self.logger.record(
            {
                'results_on_train_dataset/instances': instances,
                'results_on_train_dataset/num_timesteps': self.num_timesteps
            }
        )
        self.logger.dump()


def explained_variance(y_pred: np.ndarray, y_true: np.ndarray) -> np.ndarray:
    """
    From Stable-Baselines
    Computes fraction of variance that ypred explains about y.
    Returns 1 - Var[y-ypred] / Var[y]

    interpretation:
        ev=0  =>  might as well have predicted zero
        ev=1  =>  perfect prediction
        ev<0  =>  worse than just predicting zero

    :param y_pred: the prediction
    :param y_true: the expected value

    :return: explained variance of ypred and y
    """
    assert y_true.ndim == 1 and y_pred.ndim == 1
    var_y = np.var(y_true)
    return np.nan if var_y == 0 else 1 - np.var(y_true - y_pred) / var_y


if __name__ == "__main__":
    policy_net = PolicyNetwork(4, 10, 0.003, [64, 64], 'ReLU')
    for name, para in policy_net.named_parameters():
        print('{}: {}'.format(name, para.shape))
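
The backward recursion in `compute_advantages_and_returns` is generalized advantage estimation (GAE). As a rough, hand-checkable illustration, the sketch below restates the same recursion in pure Python without the buffer class. The function name `gae` and the toy inputs are assumptions made for this example and are not part of Schlably.

```python
from typing import List, Sequence, Tuple


def gae(rewards: Sequence[float], values: Sequence[float], dones: Sequence[bool],
        last_value: float, gamma: float, gae_lambda: float) -> Tuple[List[float], List[float]]:
    """Hypothetical helper: backward GAE recursion, returns (advantages, returns)."""
    n = len(rewards)
    advantages = [0.0] * n
    last_advantage = 0.0
    for step in reversed(range(n)):
        # not_done = 0 at an episode end, so nothing leaks in from the next episode
        not_done = 1.0 - float(dones[step])
        # the last buffered step bootstraps from the value predicted for the next state
        next_value = last_value if step == n - 1 else values[step + 1]
        delta = rewards[step] + gamma * next_value * not_done - values[step]
        last_advantage = delta + gamma * gae_lambda * not_done * last_advantage
        advantages[step] = last_advantage
    # returns are the regression targets of the value network
    returns = [a + v for a, v in zip(advantages, values)]
    return advantages, returns


advantages, returns = gae(rewards=[1.0, 1.0, 1.0], values=[0.5, 0.5, 0.5],
                          dones=[False, False, True], last_value=0.0,
                          gamma=1.0, gae_lambda=1.0)
print(advantages)  # [2.5, 1.5, 0.5]
print(returns)     # [3.0, 2.0, 1.0]
```

With `gamma = gae_lambda = 1` the returns reduce to the plain sum of remaining rewards (3, 2, 1), which makes the recursion easy to verify by hand.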
4.2.3 PPO_MASKED
"""
PPO implementation with action mask according to the StableBaselines3 implementation.
To reuse trained models, you can make use of the save and load function
"""
import numpy as np
import random
import torch as T
import torch.nn as nn
import torch.optim as optim
from torch.distributions.categorical import Categorical
import pickle
from typing import Tuple, Any, List
from src.utils.logger import Logger
# constants
POLICY_LAYER: List[int] = [256, 256]
POLICY_ACTIVATION: str = 'ReLU'
VALUE_LAYER: List[int] = [256, 256]
VALUE_ACTIVATION: str = 'ReLU'


class RolloutBuffer:
    """
    Handles episode data collection and batch generation

    :param buffer_size: Buffer size
    :param batch_size: Size for batches to be generated
    """
    def __init__(self, buffer_size: int, batch_size: int):
        self.observations = []
        self.probs = []
        self.values = []
        self.actions = []
        self.rewards = []
        self.dones = []
        self.action_masks = []
        self.advantages = None
        self.returns = None

        if buffer_size % batch_size != 0:
            raise TypeError("rollout_steps has to be a multiple of batch_size")
        self.buffer_size = buffer_size
        self.batch_size = batch_size

        self.reset()

    def generate_batches(self) -> Tuple:
        """
        Generates batches from the stored data

        :return: batches: Lists with all indices from the rollout_data, shuffled and sampled in lists with batch_size
            e.g. [[0,34,1,768,...(len: batch size)], [], ...(len: len(rollout_data) / batch size)]
        """
        # create random index list and split into arrays with batch size
        indices = np.random.permutation(self.buffer_size)
        num_batches = int(self.buffer_size / self.batch_size)
        batches = indices.reshape((num_batches, self.batch_size))

        return (np.array(self.observations), np.array(self.actions), np.array(self.probs),
                np.array(self.action_masks), batches)

    def compute_advantages_and_returns(self, last_value, gamma, gae_lambda) -> None:
        """
        Computes advantage values and returns for all stored episodes.

        :param last_value: Value from the next step to calculate the advantage for the last episode in the buffer
        :param gamma: Discount factor for the advantage calculation
        :param gae_lambda: Smoothing parameter for the advantage calculation

        :return: None
        """
        # advantage: advantage from the actual returned rewards over the baseline value from step t onwards
        last_advantage = 0
        for step in reversed(range(self.buffer_size)):
            # use the predicted value for the advantage computation of the last step of the buffer
            if step == self.buffer_size - 1:
                # if a step is the last one of the episode (done = 1) -> not_done = 0 => the advantage
                # doesn't contain values outside the own episode
                not_done = 1.0 - self.dones[step]
                next_values = last_value
            else:
                not_done = 1.0 - self.dones[step]
                next_values = self.values[step + 1]
            delta = self.rewards[step] + gamma * next_values * not_done - self.values[step]
            last_advantage = delta + gamma * gae_lambda * not_done * last_advantage
            self.advantages[step] = last_advantage

        # compute returns = discounted rewards, advantages = discounted rewards - values
        # Necessary to update the value network
        self.returns = self.values + self.advantages

    def store_memory(self, observation: np.ndarray, action: int, prob: float, value: float,
                     reward: Any, done: bool, action_mask: np.ndarray) -> None:
        """
        Appends all data from the recent step

        :param observation: Observation at the beginning of the step
        :param action: Index of the selected action
        :param prob: Log-probability of the selected action (computed from the policy_net output)
        :param value: Baseline value that the value_net estimated from this step onwards
            according to the observation (output from the value_net)
        :param reward: Reward the env returned in this step
        :param done: True if the episode ended in this step
        :param action_mask: One hot vector with ones for all possible actions

        :return: None
        """
        self.observations.append(observation)
        self.actions.append(action)
        self.probs.append(prob)
        self.values.append(value)
        self.rewards.append(reward)
        self.dones.append(done)
        self.action_masks.append(action_mask)

    def reset(self) -> None:
        """
        Resets all buffer lists

        :return: None
        """
        self.observations = []
        self.probs = []
        self.actions = []
        self.rewards = []
        self.dones = []
        self.values = []
        self.action_masks = []
        self.advantages = np.zeros(self.buffer_size, dtype=np.float32)


class PolicyNetwork(nn.Module):
    """
    Policy Network for the agent

    :param input_dim: Observation size to determine input dimension
    :param n_actions: Number of actions to determine output size
    :param learning_rate: Learning rate for the network
    :param hidden_layers: List of hidden layer sizes (int)
    :param activation: String naming activation function for hidden layers
    """
    def __init__(self, input_dim: int, n_actions: int, learning_rate: float, hidden_layers: List[int], activation: str):
        super(PolicyNetwork, self).__init__()

        net_structure = []
        # get activation class according to string
        activation = getattr(nn, activation)()
        # create first hidden layer in accordance with the input dim and the first hidden dim
        net_structure.extend([nn.Linear(input_dim, hidden_layers[0]), activation])
        # create the other hidden layers
        for i, layer_dim in enumerate(hidden_layers):
            if i + 1 != len(hidden_layers):
                net_structure.extend([nn.Linear(layer_dim, hidden_layers[i + 1]), activation])
            else:
                # create output layer
                net_structure.extend([nn.Linear(layer_dim, n_actions), nn.Softmax(dim=-1)])

        self.policy_net = nn.Sequential(*net_structure)

        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)
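
    def forward(self, observation, action_mask):
        """forward through the actor network"""
        # Tensor.to() is not in-place, so the results have to be assigned
        observation = observation.to(self.device)
        # note: policy_net ends with a Softmax layer, so these values are probabilities;
        # Categorical(logits=...) normalizes them once more
        logits = self.policy_net(observation)
        # mask probabilities if action_mask is not None (for env.reset)
        if action_mask is not None:
            action_mask = action_mask.to(self.device)
            logits = T.where(action_mask, logits, T.tensor(-1e+8).to(self.device))
        dist = Categorical(logits=logits)
        return dist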


class ValueNetwork(nn.Module):
    """
    Value Network for the agent

    :param input_dim: Observation shape (tuple, unpacked below) to determine input dimension
    :param learning_rate: Learning rate for the network
    :param hidden_layers: List of hidden layer sizes (int)
    :param activation: String naming activation function for hidden layers
    """
    def __init__(self, input_dim: Tuple[int, ...], learning_rate: float, hidden_layers: List[int], activation: str):
        super(ValueNetwork, self).__init__()

        net_structure = []
        # get activation class according to string
        activation = getattr(nn, activation)()
        # create first hidden layer in accordance with the input dim and the first hidden dim
        net_structure.extend([nn.Linear(*input_dim, hidden_layers[0]), activation])
        # create the other hidden layers
        for i, layer_dim in enumerate(hidden_layers):
            if i + 1 != len(hidden_layers):
                net_structure.extend([nn.Linear(layer_dim, hidden_layers[i + 1]), activation])
            else:
                # create output layer
                net_structure.append(nn.Linear(layer_dim, 1))

        self.value_net = nn.Sequential(*net_structure)

        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)

    def forward(self, observation):
        """forward through the value network"""
        value = self.value_net(observation)
        return value


class MaskedPPO:
    """PPO Agent class with action masking"""
    def __init__(self, env, config: dict, logger: Logger = None):
        """
        | gamma: Discount factor for the advantage calculation
        | learning_rate: Learning rate for both, policy_net and value_net
        | gae_lambda: Smoothing parameter for the advantage calculation
        | clip_range: Limitation for the ratio between old and new policy
        | batch_size: Size of batches which were sampled from the buffer and fed into the nets during training
        | n_epochs: Number of repetitions for each training iteration
        | rollout_steps: Step interval within the update is performed. Has to be a multiple of batch_size
        """
        self.env = env
        self.gamma = config.get('gamma', 0.99)
        self.gae_lambda = config.get('gae_lambda', 0.95)
        self.clip_range = config.get('clip_range', 0.2)
        # n_epochs is used in range(), so the fallback must be an int (the original 0.5 raises a
        # TypeError); 10 is an assumed fallback, set 'n_epochs' in the config to control it
        self.n_epochs = config.get('n_epochs', 10)
        self.rollout_steps = config.get('rollout_steps', 2048)
        self.ent_coef = config.get('ent_coef', 0.0)
        self.num_timesteps = 0
        self.n_updates = 0
        self.learning_rate = config.get('learning_rate', 0.002)
        self.batch_size = config.get('batch_size', 256)

        self.logger = logger if logger else Logger(config=config)
        self.seed = config.get('seed', None)

        # torch seed setting
        if self.seed is not None:
            random.seed(self.seed)
            np.random.seed(self.seed)
            T.manual_seed(self.seed)
            # self.env.action_space.seed(seed)
            self.env.seed(self.seed)

        # create networks and buffer
        self.policy_net = PolicyNetwork(env.observation_space.shape[0], env.action_space.n, self.learning_rate,
                                        config.get('policy_layer', POLICY_LAYER),
                                        config.get('policy_activation', POLICY_ACTIVATION))
        self.value_net = ValueNetwork(env.observation_space.shape, self.learning_rate,
                                      config.get('value_layer', VALUE_LAYER),
                                      config.get('value_activation', VALUE_ACTIVATION))
        self.rollout_buffer = RolloutBuffer(self.rollout_steps, self.batch_size)

    @classmethod
    def load(cls, file: str, config: dict, logger: Logger = None):
        """
        Creates a MaskedPPO object according to the parameters saved in file.pkl

        :param file: Path and filename (without .pkl) of your saved model pickle file
        :param config: Dictionary with parameters to specify PPO attributes
        :param logger: Logger

        :return: MaskedPPO object
        """
        with open(f"{file}.pkl", "rb") as handle:
            data = pickle.load(handle)

        env = data["params"]["env"]

        # create PPO object, commit necessary parameters. Update remaining parameters
        model = cls(env=env, config=config, logger=logger)
        model.__dict__.update(data["params"])

        # set weights from policy and value
        model.policy_net.load_state_dict(data["policy_params"])
        model.value_net.load_state_dict(data["value_params"])

        return model

    def save(self, file: str) -> None:
        """
        Save model as pickle file

        :param file: Path under which the file will be saved

        :return: None
        """
        params_dict = self.__dict__.copy()
        del params_dict['logger']
        data = {
            "params": params_dict,
            "policy_params": self.policy_net.state_dict(),
            "value_params": self.value_net.state_dict()
        }

        with open(f"{file}.pkl", "wb") as handle:
            pickle.dump(data, handle)

    def forward(self, observation: np.ndarray, action_mask: np.ndarray) -> Tuple:
        """
        Predicts an action according to the current policy and based on the action_mask and observation
        and the value for the next state

        :param observation: Current observation of the environment
        :param action_mask: One hot vector with ones for all possible actions

        :return: Predicted action, log-probability of this action, and predicted value for the next state
        """
        observation = T.tensor(observation, dtype=T.float).to(self.policy_net.device)
        if action_mask is not None:
            action_mask = T.tensor(action_mask, dtype=T.bool).to(self.policy_net.device)

        dist = self.policy_net(observation, action_mask)
        value = self.value_net(observation)
        action = dist.sample()

        # the log-probability is stored; train() exponentiates it to form the policy ratio
        prob = T.squeeze(dist.log_prob(action)).item()
        action = T.squeeze(action).item()
        value = T.squeeze(value).item()

        return action, prob, value

    def predict(self, observation: np.ndarray, action_mask: np.ndarray,
                deterministic: bool = True, state=None) -> Tuple:
        """
        Action prediction for testing

        :param observation: Current observation of the environment
        :param action_mask: One hot vector with ones for all possible actions
        :param deterministic: Set True, to force a deterministic prediction
        :param state: The last states (used in rnn policies)

        :return: Predicted action and next state (used in rnn policies)
        """
        observation = T.tensor(np.array(observation), dtype=T.float).to(self.policy_net.device)
        action_mask = T.tensor(action_mask, dtype=T.bool).to(self.policy_net.device)

        with T.no_grad():
            dist = self.policy_net(observation, action_mask)
            if deterministic:
                action = T.argmax(dist.probs)
            else:
                # choose random action according to the predicted probs
                action = dist.sample()
        # convert the tensor to a plain Python int in both branches
        action = T.squeeze(action).item()

        return action, state

    def train(self) -> None:
        """
        Trains policy and value

        :return: None
        """
        # switch to train mode
        self.policy_net.train(True)
        self.value_net.train(True)

        policy_losses, value_losses, entropy_losses, total_losses = [], [], [], []

        for _ in range(self.n_epochs):
            # get data from buffer and random batches(index lists) to iterate over
            # e.g. obs[batch] returns the observations for all indices in batch
            obs_arr, action_arr, old_prob_arr, action_mask_arr, batches = self.rollout_buffer.generate_batches()

            # get advantage and return values from buffer
            advantages = T.tensor(self.rollout_buffer.advantages).to(self.policy_net.device)
            returns = T.tensor(self.rollout_buffer.returns).to(self.value_net.device)

            # normalize advantages
            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

            for batch in batches:
                observations = T.tensor(obs_arr[batch], dtype=T.float).to(self.policy_net.device)
                old_probs = T.tensor(old_prob_arr[batch]).to(self.policy_net.device)
                actions = T.tensor(action_arr[batch]).to(self.policy_net.device)
                action_masks = T.tensor(action_mask_arr[batch], dtype=T.bool).to(self.policy_net.device)

                dist = self.policy_net(observations, action_masks)
                values = self.value_net(observations)
                values = T.squeeze(values)

                # ratio between old and new policy (probs of selected actions)
                # Should be one at the first batch of every train iteration
                new_probs = dist.log_prob(actions)
                prob_ratio = new_probs.exp() / old_probs.exp()

                # policy clip
                policy_loss_1 = prob_ratio * advantages[batch]
                policy_loss_2 = T.clamp(prob_ratio, 1 - self.clip_range, 1 + self.clip_range) * advantages[batch]
                # we want to maximize the reward, but running gradient descent -> negate the loss here
                policy_loss = -T.min(policy_loss_1, policy_loss_2).mean()

                value_loss = (returns[batch] - values) ** 2
                value_loss = value_loss.mean()

                # entropy loss
                entropy_loss = -T.mean(dist.entropy())
                entropy_losses.append(entropy_loss.item())

                total_loss = policy_loss + 0.5 * value_loss + self.ent_coef * entropy_loss

                self.policy_net.optimizer.zero_grad()
                self.value_net.optimizer.zero_grad()
                total_loss.backward()
                self.policy_net.optimizer.step()
                self.value_net.optimizer.step()

                policy_losses.append(policy_loss.item())
                value_losses.append(value_loss.item())
                total_losses.append(total_loss.item())

        self.n_updates += self.n_epochs

        # logs
        # compute explained variance
        explained_var = explained_variance(np.asarray(self.rollout_buffer.values), self.rollout_buffer.returns)
        self.logger.record(
            {
                'agent_training/n_updates': self.n_updates,
                'agent_training/loss': np.mean(total_losses),
                'agent_training/policy_gradient_loss': np.mean(policy_losses),
                'agent_training/value_loss': np.mean(value_losses),
                'agent_training/entropy_loss': np.mean(entropy_losses),
                'agent_training/explained_variance': explained_var
            }
        )
        self.logger.dump()

    def learn(self, total_instances: int, total_timesteps: int, intermediate_test=None) -> None:
        """
        Learn over n environment instances or n timesteps. Break depending on which condition is met first
        One learning iteration consists of collecting rollouts and training the networks

        :param total_instances: Instance limit
        :param total_timesteps: Timestep limit
        :param intermediate_test: (IntermediateTest) intermediate test object. Must be created before.
        """
        instances = 0

        # iterate over n episodes = the agent has n episodes to interact with the environment
        for _ in range(total_instances):
            obs = self.env.reset()
            # no mask is available before the first step, so the first action is sampled unmasked
            info = {'mask': None}
            done = False
            instances += 1

            # run agent on env until done
            while not done:
                action, prob, val = self.forward(obs, action_mask=info['mask'])
                new_obs, reward, done, info = self.env.step(action)
                self.num_timesteps += 1
                self.rollout_buffer.store_memory(obs, action, prob, val, reward, done, info['mask'])

                # call intermediate_test on_step
                if intermediate_test:
                    intermediate_test.on_step(self.num_timesteps, instances, self)

                # break learn if total_timesteps are reached
                if self.num_timesteps >= total_timesteps:
                    print("total_timesteps reached")
                    self.logger.record(
                        {
                            'results_on_train_dataset/instances': instances,
                            'results_on_train_dataset/num_timesteps': self.num_timesteps
                        }
                    )
                    self.logger.dump()
                    return None

                # update every n rollout_steps
                if self.num_timesteps % self.rollout_steps == 0:
                    # predict the next value, needed for the advantage computation of the last collected step
                    with T.no_grad():
                        _, _, val = self.forward(new_obs, info['mask'])
                    self.rollout_buffer.compute_advantages_and_returns(val, self.gamma, self.gae_lambda)

                    # train networks
                    self.train()

                    # switch back to normal mode
                    self.policy_net.train(False)
                    self.value_net.train(False)

                    # reset buffer to continue collecting rollouts
                    self.rollout_buffer.reset()

                obs = new_obs

            if instances % len(self.env.data) == len(self.env.data) - 1:
                mean_training_reward = np.mean(self.env.episodes_rewards)
                mean_training_makespan = np.mean(self.env.episodes_makespans)
                if len(self.env.episodes_tardinesses) == 0:
                    mean_training_tardiness = 0
                else:
                    mean_training_tardiness = np.mean(self.env.episodes_tardinesses)
                self.logger.record(
                    {
                        'results_on_train_dataset/mean_reward': mean_training_reward,
                        'results_on_train_dataset/mean_makespan': mean_training_makespan,
                        'results_on_train_dataset/mean_tardiness': mean_training_tardiness
                    }
                )
                self.logger.dump()

        print("TRAINING DONE")
        self.logger.record(
            {
                'results_on_train_dataset/instances': instances,
                'results_on_train_dataset/num_timesteps': self.num_timesteps
            }
        )
        self.logger.dump()


def explained_variance(y_pred: np.ndarray, y_true: np.ndarray) -> np.ndarray:
    """
    From Stable-Baselines
    Computes fraction of variance that ypred explains about y.
    Returns 1 - Var[y-ypred] / Var[y]

    interpretation:
        ev=0  =>  might as well have predicted zero
        ev=1  =>  perfect prediction
        ev<0  =>  worse than just predicting zero

    :param y_pred: the prediction
    :param y_true: the expected value

    :return: explained variance of ypred and y
    """
    assert y_true.ndim == 1 and y_pred.ndim == 1
    var_y = np.var(y_true)
    return np.nan if var_y == 0 else 1 - np.var(y_true - y_pred) / var_y


if __name__ == "__main__":
    # hidden_layers and activation are required arguments of PolicyNetwork
    policy_net = PolicyNetwork(4, 10, 0.003, [64, 64], 'ReLU')
    for name, para in policy_net.named_parameters():
        print('{}: {}'.format(name, para.shape))
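
The main difference from the unmasked agent is that the scores of infeasible actions are replaced by a very large negative number (`-1e8`) before the categorical distribution is built, so those actions are practically never sampled. The sketch below shows that effect with the standard library only. The helper `masked_softmax` and the example numbers are assumptions for illustration; Schlably itself does this with `T.where` and `Categorical` as shown above.

```python
import math
from typing import List, Sequence


def masked_softmax(scores: Sequence[float], mask: Sequence[bool], fill: float = -1e8) -> List[float]:
    """Hypothetical helper: softmax after replacing masked-out scores with a large negative value."""
    masked = [s if m else fill for s, m in zip(scores, mask)]
    peak = max(masked)  # subtract the maximum for numerical stability
    exps = [math.exp(s - peak) for s in masked]
    total = sum(exps)
    return [e / total for e in exps]


# action 1 is infeasible, so its probability underflows to zero
probs = masked_softmax([1.0, 2.0, 3.0], [True, False, True])
print(probs)
```

The remaining probability mass is shared among the feasible actions in proportion to their exponentiated scores. If every action were masked, all scores would be equal and the distribution would fall back to uniform, so the environment should always leave at least one action feasible.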
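5. Visualization
Schlably also provides visualizations, including training curves of agent learning performance, Gantt charts of scheduling test cases, and comparisons of results across algorithms.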
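
To make concrete what a Gantt chart of a schedule conveys, here is a minimal text-based sketch. The operation format `(job label, machine name, start, end)` and the function `text_gantt` are assumptions made for this example; Schlably's own `GanttChartPlotter` works on its task objects and produces images instead.

```python
from typing import Dict, List, Tuple

# hypothetical operation format: (single-character job label, machine name, start, end)
Operation = Tuple[str, str, int, int]


def text_gantt(operations: List[Operation]) -> List[str]:
    """Render one text row per machine; each time unit shows the job label or '.' when idle."""
    makespan = max(end for _, _, _, end in operations)
    rows: Dict[str, List[str]] = {}
    for job, machine, start, end in operations:
        row = rows.setdefault(machine, ["."] * makespan)
        for t in range(start, end):
            row[t] = job
    return [f"{machine}: {''.join(row)}" for machine, row in sorted(rows.items())]


schedule = [("A", "M1", 0, 3), ("B", "M1", 3, 5), ("B", "M2", 0, 2), ("A", "M2", 3, 4)]
for line in text_gantt(schedule):
    print(line)
# M1: AAABB
# M2: BB.A.
```

Each row is a machine and each column a time unit, so idle gaps and the makespan (here 5) can be read off directly.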
"""
Gantt chart tests.
"""
import unittest
import copy
from pathlib import Path
from typing import List
import PIL.Image
from src.agents.heuristic.heuristic_agent import HeuristicSelectionAgent
from src.data_generator.task import Task
from src.environments.env_tetris_scheduling import Env
from src.utils.file_handler.config_handler import ConfigHandler
from src.utils.file_handler.data_handler import DataHandler
from src.visuals_generator.gantt_chart import GanttChartPlotter


class TestGanttChart(unittest.TestCase):
    """
    Class with gantt chart tests.
    """
    _test_tasks: List[Task]

    @classmethod
    def setUpClass(cls) -> None:
        """
        :return: None
        """
        env_config = ConfigHandler.get_config(config_file_path='training/dqn/config_job3_task4_tools0.yaml')
        data = DataHandler.load_instances_data_file(config=env_config)
        cls._test_tasks = copy.deepcopy(data[0])
        done = False
        env = Env(env_config, [data[0]])
        heuristic_agent = HeuristicSelectionAgent()
        while not done:
            # obs = env.state_obs
            mask = env.get_action_mask()
            cls._test_tasks = env.tasks
            task_mask = mask
            action = heuristic_agent(cls._test_tasks, task_mask, 'rand')
            res = env.step(action)
            done = res[2]

    @classmethod
    def tearDownClass(cls) -> None:
        """
        Tear down class
        :return: None
        """
        del cls._test_tasks
        cls._trap = None

    def test_get_gantt_chart_image(self) -> None:
        """
        Test gantt chart image
        :return: None
        """
        test_image = GanttChartPlotter.get_gantt_chart_image(self._test_tasks)
        self.assertIsInstance(test_image, PIL.Image.Image)

    def test_get_gantt_chart_image_and_save(self) -> None:
        """
        Test gantt chart image and save
        :return: None
        """
        test_image_path: Path = \
            GanttChartPlotter.get_gantt_chart_image_and_save(self._test_tasks,
                                                             filename="automated_test_random_name_dc55c0e399428u7e",
                                                             file_type="png")
        test_image_path.unlink()

    def test_get_gantt_chart_gif_and_save(self) -> None:
        """
        Test gantt chart gif and save
        :return: None
        """
        # TODO - prevent print output - redirect_stdout does not work
        test_gif_path: Path = \
            GanttChartPlotter.get_gantt_chart_gif_and_save(self._test_tasks,
                                                           filename="automated_test_random_name_dc55c0e399428e3e",
                                                           save_intermediate_images=False,
                                                           quality_dpi=55)
        test_gif_path.unlink()


if __name__ == '__main__':
    unittest.main()
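To make concrete what a Gantt chart of a finished schedule encodes, here is a minimal text-only sketch. It does not use Schlably's `GanttChartPlotter` or `Task` class: `ScheduledTask`, `makespan`, and `text_gantt` are hypothetical names introduced purely for illustration, the schedule is made up, and job indices are assumed to be single digits so that each time unit fits in one character.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ScheduledTask:
    """Hypothetical stand-in for a finished task (not Schlably's Task class)."""
    job: int
    machine: int
    start: int
    end: int  # exclusive finish time


def makespan(tasks: List[ScheduledTask]) -> int:
    """Latest finish time over all tasks (0 for an empty schedule)."""
    return max((t.end for t in tasks), default=0)


def text_gantt(tasks: List[ScheduledTask]) -> List[str]:
    """One row per machine; each time unit shows the job index, or '.' when idle."""
    horizon = makespan(tasks)
    rows = []
    for machine in sorted({t.machine for t in tasks}):
        cells = ["."] * horizon
        for t in tasks:
            if t.machine == machine:
                for time in range(t.start, t.end):
                    cells[time] = str(t.job)  # assumes single-digit job indices
        rows.append(f"M{machine} |" + "".join(cells) + "|")
    return rows


# Made-up schedule: two jobs with two operations each on two machines.
schedule = [
    ScheduledTask(job=0, machine=0, start=0, end=3),
    ScheduledTask(job=1, machine=1, start=0, end=2),
    ScheduledTask(job=1, machine=0, start=3, end=5),
    ScheduledTask(job=0, machine=1, start=3, end=4),
]

chart = text_gantt(schedule)
print("\n".join(chart))
# M0 |00011|
# M1 |11.0.|
print("makespan:", makespan(schedule))  # makespan: 5
```

Each row is a machine and each column a time unit, so idle gaps and the makespan can be read off directly. A graphical plotter conveys the same information with colored bars.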
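6. Conclusion
In summary, Schlably is a powerful, easy-to-use framework. It addresses many recurring problems in DRL scheduling experiments and provides tools and resources that let researchers concentrate on novel work. We expect Schlably to play a larger role in future research and to help advance the PS field.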
A. References
- [1] C. W. de Puiseau, J. Peters, C. Dorpelkus, H. Tercan, and T. Meisen, “schlably: A Python framework for deep reinforcement learning based scheduling experiments,” SoftwareX, vol. 22, May 2023, Art. no. 101383, doi: 10.1016/j.softx.2023.101383.