こんにちは、エンジニアの竹内です。
以前の記事でDQNに模倣学習の仕組みを取り入れたDeep Q-Learning from Demonstrationsというアルゴリズムを紹介しましたが、模倣学習には他にもいろいろなアプローチが存在します。
特にエキスパートの行動軌跡から環境の報酬関数を推定する逆強化学習(Inverse Reinforcement Learning)という手法を利用したものは模倣学習アルゴリズムの中でも代表的な手法の1つであり、環境からの報酬が得られない場合でも模倣学習を行う事ができます。
そこで今回は逆強化学習を用いた模倣学習アルゴリズムの中でも特に有用な手法である、敵対的生成ネットワーク(Generative Adversarial Network)を組み合わせたGenerative Adversarial Imitation Learning(GAIL)という手法を紹介し、Tensorflow2で実装していきたいと思います。
逆強化学習とは?
環境から得られる報酬を最大化する方策を求めるのが通常の強化学習の問題設定ですが、逆強化学習では報酬を最大化するような方策、つまりエキスパートの方策から得られた行動軌跡のサンプルを使って逆に報酬関数の方を推定することが目的です。さらに推定した報酬関数を用いて再度強化学習を行うことによって環境から得られる報酬を利用せずにエキスパートの方策を再現することができます。
ちなみに逆強化学習の問題設定では学習に利用するフィードバックとして報酬関数ではなくコスト関数を用いることが多いです。報酬関数の符号を変えればコスト関数になるので基本的には両者は同じものとみなせます。*1
逆強化学習アルゴリズムの例としては、ニューラルネットを用いないMaximum Entropy Inverse Reinforcement Learningやニューラルネットを用いたDeep IRL、Guided Cost Learning、AIRLそしてこれから紹介するGAILなどがあります。
GAILの概要
逆強化学習を用いた模倣学習における課題の一つに、エキスパートの行動軌跡から報酬関数を求めるという問題と、得られた報酬関数から強化学習によってエキスパートの方策を求めるという問題の2つを解かなければならないという点があります。GAILの論文ではこれら2つの問題を1つの問題として定式化し、敵対的生成ネットワーク(GAN)の仕組みを利用することでエキスパートの行動軌跡から直接エキスパートの方策を抽出する手法が示されています。最終的に提案されているアルゴリズム自体は割とシンプルなものなのに対し、逆強化学習と強化学習の2つの問題を1つの問題に定式化するための理論立てについては凸最適化の枠組みを利用しながらかなり詳細に述べられており、強化学習の他の論文と比較すると若干難解な印象を受けます。*2
この記事ではアルゴリズムの実装をメインとして、定式化の理論部分についての解説は省略させていただきます。(理論部分についてはGenerative Adversarial Imitation Learningの紹介(RLアーキテクチャ勉強会)や[DL輪読会]逆強化学習とGANsなどのスライドがわかりやすく、非常に参考になります。)
アルゴリズムの概要
論文に記載されている擬似コードは以下のようなものです。
通常のGANと同様にGeneratorとDiscriminatorの2つのニューラルネットを使用します。
GeneratorはPPOやTRPOで使用するエージェントと同じく現在の状態を入力として選択する行動の確率分布を出力します。Discriminatorは状態と行動のペアを入力として、その状態と行動のペアがエキスパートのものである確率を出力します。学習の際Discriminatorはサンプルされた状態と行動のペアがエキスパートのものかGeneratorが生成したものかを見分けるようにパラメータを更新し、Generatorは自身が生成した状態と行動に対するDiscriminatorからの出力を報酬とみなして通常の強化学習の要領で学習します。つまり、DIscriminatorが出力した「エキスパートのものである確率」が高い行動に対しては得られる報酬が高く、そうでない行動に対しては得られる報酬が低いものとして学習を行うことで、GeneratorはよりDiscriminatorが騙されやすい(=エキスパートの方策に近い)方策を学習していくことになります。DiscriminatorとGeneratorの学習は交互に行っていきます。
Lossの計算部分は論文の擬似コードと若干異なっていますが、Stable Baselinesの実装に合わせています。
実装
エージェントの方策の更新にはTRPOの代わりにPPOを用いています。今回の実装は以前ブログで紹介したPPOの実装に基づいているため、ぜひそちらの方もご参照ください。学習する環境にはCartPole-v0を使用します。
なお、ブログ内のコードは簡略化して載せているため、全体の実装についてはgithubを御覧ください。
メイン部分
大まかな流れは、予め作成したエキスパートの行動軌跡を読み込んでメモリーに保存した後
- 現在の方策に従ってエージェントの行動軌跡をサンプル
- Discriminatorを学習し、報酬を計算
- 計算された報酬を使いGeneratorを学習
の3つの手順の繰り返しとなります。
def main(): env = gym.make(config.env_name) # 環境の作成 env = CartPoleWrapper(env) # with tf.device("/gpu:0"): # gpuを使用する場合 with tf.device("/cpu:0"): # generatorとdiscriminatorの作成 ppo = PPO( num_actions=env.action_space.n, input_shape=env.observation_space.shape, config=config ) discriminator = Discriminator( num_obs=env.observation_space.shape[0], num_actions=env.action_space.n, config=config ) num_episodes = 0 episode_rewards = deque([] * 100, maxlen=100) memory = Memory(env.observation_space.shape, config) # 学習のためのサンプルを保存しておくメモリ expert_dataset = ExpertDataset(f"demo/{config.demo}") # デモデータをロードしてメモリに保存 reward_sum = 0 obs = env.reset() for t in tqdm(range(config.num_updates)): # ===== 1. get samples ===== for _ in range(config.num_steps): policy, value = ppo.step(tf.constant(obs)) policy = policy.numpy() action = np.random.choice(2, p=policy) next_obs, rew, done, _ = env.step(action) # 環境からの報酬は学習時には使用しない memory.add(obs, action, rew, done, value, policy[action]) obs = next_obs reward_sum += rew if done: episode_rewards.append(env.steps) num_episodes += 1 reward_sum = 0 obs = env.reset() _, last_value = ppo.step(obs[np.newaxis, :]) memory.add(None, None, None, None, last_value, None) # ===== 2. train reward giver(discriminator) ===== for _ in range(config.num_discriminator_epochs): idxes = [idx for idx in range(config.num_steps)] # エージェントが集めたサンプル数と同じ数のデータを学習に使用 random.shuffle(idxes) for start in range(0, len(memory), config.batch_size): minibatch_indexes = idxes[start:start + config.batch_size] agent_obs, agent_act, _, _, _ = memory.sample(minibatch_indexes) # エージェントが集めたサンプルn demo_obs, demo_act = expert_dataset.sample(config.batch_size) # エキスパートのデモデータ total_loss, agent_loss, demo_loss, agent_acc, demo_acc = discriminator.train(demo_obs, demo_act, agent_obs, agent_act) actions = tf.constant(memory.actions, dtype=tf.int32) observations = tf.constant(memory.obses, dtype=tf.float32) reward_signals = discriminator.inference(observations, actions).numpy() # discriminatorの出力を報酬として利用 memory.rewards = reward_signals # エージェントの学習用の報酬をdiscriminatorの出力に置き換える rew_mean = np.mean(reward_signals) # ===== train agent(generator) ===== memory.compute_gae() # GAEの計算 for _ in range(config.num_generator_epochs): idxes = [idx for idx in range(config.num_steps)] random.shuffle(idxes) for start in range(0, len(memory), config.batch_size): minibatch_indexes = idxes[start:start+config.batch_size] batch_obs, batch_act, batch_adv, batch_sum, batch_pi_old = memory.sample(minibatch_indexes) loss, policy_loss, value_loss, entropy_loss, policy, kl, frac = ppo.train(batch_obs, batch_act, batch_pi_old, batch_adv, batch_sum) memory.reset()
Generator(PPO Agent)
行動軌跡を生成するエージェントを実装していきます。といっても中身は以前紹介したPPOの実装と全く同じものとなります。
import tensorflow as tf EPS = 1e-8 class PPO(tf.Module): def __init__(self, num_actions, input_shape, config): super(PPO, self).__init__(name='ppo_model') self.num_actions = num_actions self.input_shape = input_shape self.batch_size = config.batch_size self.config = config self.policy_value_network = self._build_model() self.optimizer = tf.keras.optimizers.Adam(learning_rate=config.learning_rate) self.gradient_clip = config.gradient_clip @tf.function def step(self, obs): obs = tf.expand_dims(obs, 0) pi, v = self.policy_value_network(obs) pi = tf.squeeze(pi) return pi, v @tf.function def train(self, obs, action, pi_old, advantage, rew_sum): # GradientTapeコンテクスト内で行われる計算は勾配を記録することが出来ます。 with tf.GradientTape() as tape: policy, value = self.policy_value_network(obs) value = tf.squeeze(value) one_hot_action = tf.one_hot(action, self.num_actions) pi = tf.squeeze(tf.reduce_sum(policy * one_hot_action, axis=1, keepdims=True)) ratio = tf.divide(pi, pi_old + EPS) # log内や割り算で0が発生し、パラメータがnanになるのを避けるため予め小さい数を加えます clipped_advantage = tf.where(advantage > 0, (1+self.config.clip)*advantage, (1-self.config.clip)*advantage) l_clip = (-1) * tf.reduce_mean(tf.minimum(ratio * advantage, clipped_advantage)) # 損失は最小化されるので最大化したいものにはマイナスを付けます l_vf = tf.reduce_mean(tf.square(rew_sum-value)) entropy = - tf.reduce_sum(policy * tf.math.log(policy + EPS), axis=1) l_ent = tf.reduce_mean(entropy) loss = l_clip + l_vf * self.config.vf_coef - l_ent * self.config.ent_coef # エントロピーは最大化するように符号を付けます grads = tape.gradient(loss, self.policy_value_network.trainable_variables) # 記録した計算のパラメータに関する勾配を取得します if self.gradient_clip is not None: # 勾配を一定の範囲にクリッピングします(baselinesで用いられている手法です) clipped_grads = [] for grad in grads: clipped_grads.append(tf.clip_by_norm(grad, self.gradient_clip)) grads = clipped_grads grads_and_vars = zip(grads, self.policy_value_network.trainable_variables) self.optimizer.apply_gradients(grads_and_vars) # optimizerを適用してパラメータを更新します # ↓ここから先はデバッグ用に監視する値なので学習には不要です new_policy, _ = self.policy_value_network(tf.convert_to_tensor(obs, dtype=tf.float32)) new_prob = tf.reduce_sum(new_policy * one_hot_action, axis=1) kl = tf.reduce_mean(pi * tf.math.log(new_prob+EPS) - pi * tf.math.log(pi+EPS)) clipfrac = tf.reduce_mean(tf.cast(tf.greater(tf.abs(ratio - 1.0), self.config.clip), tf.float32)) return loss, l_clip, l_vf, l_ent, policy, kl, clipfrac def _build_model(self): input_x = tf.keras.Input(shape=self.input_shape) h1 = tf.keras.layers.Dense(units=self.config.num_units, activation="relu")(input_x) h2 = tf.keras.layers.Dense(units=self.config.num_units, activation="relu")(h1) policy = tf.keras.layers.Dense(self.num_actions, activation="softmax")(h2) value = tf.keras.layers.Dense(1)(h2) model = tf.keras.Model(inputs=input_x, outputs=[policy, value]) return model
Discriminator
Discriminatorを実装していきます。
Discriminatorは状態sとone-hotエンコーディングした行動aを結合したものを入力として、その組み合わせがエキスパートのものである確率を出力します。
この出力はそのままGeneratorが学習するための報酬の代わりとして利用します。論文の擬似コードを参照すると本来出力のlogをとったものを報酬シグナルとすべきですが、使用している学習アルゴリズムが違うためか単なる誤読かはわかりませんがその方法では学習がうまく進みませんでした。(誤読していたら指摘していただけるとありがたいです。)
ネットワークの構造はGeneratorとほぼ同じものを使用しています。
EPS = 1e-8 class Discriminator(tf.Module): def __init__(self, num_obs, num_actions, config): super(Discriminator, self).__init__(name='discriminator') self.num_actions = num_actions self.input_shape = (num_obs + num_actions, ) self.batch_size = config.batch_size self.config = config self.network = self._build_model() self.optimizer = tf.keras.optimizers.Adam(learning_rate=config.learning_rate) self.gradient_clip = config.gradient_clip @tf.function def inference(self, obs, action): one_hot_action = tf.one_hot(action, self.num_actions, dtype=tf.float32) agent_input = tf.concat([obs, one_hot_action], axis=1) p = self.network(agent_input) # reward = tf.math.log(p + EPS) # 論文通りに実装するとこうなるはずですが、何故かうまく学習できませんでした。 reward = p return reward @tf.function def train(self, demo_obs, demo_act, agent_obs, agent_act): with tf.GradientTape() as tape: agent_one_hot_act = tf.one_hot(agent_act, self.num_actions) demo_one_hot_act = tf.one_hot(demo_act, self.num_actions) agent_input = tf.concat([agent_obs, agent_one_hot_act], axis=1) demo_input = tf.concat([demo_obs, demo_one_hot_act], axis=1) agent_p = self.network(agent_input) demo_p = self.network(demo_input) agent_loss = - tf.reduce_mean(tf.math.log(1. - agent_p + EPS)) demo_loss = - tf.reduce_mean(tf.math.log(demo_p + EPS)) loss = agent_loss + demo_loss grads = tape.gradient(loss, self.network.trainable_variables) if self.gradient_clip is not None: clipped_grads = [] for grad in grads: clipped_grads.append(tf.clip_by_norm(grad, self.gradient_clip)) grads = clipped_grads agent_acc = tf.reduce_mean(tf.cast(agent_p < 0.5, dtype=tf.float32)) demo_acc = tf.reduce_mean(tf.cast(demo_p > 0.5, dtype=tf.float32)) grads_and_vars = zip(grads, self.network.trainable_variables) self.optimizer.apply_gradients(grads_and_vars) return loss, agent_loss, demo_loss, agent_acc, demo_acc def _build_model(self): input_x = tf.keras.Input(shape=self.input_shape) h1 = tf.keras.layers.Dense(units=self.config.num_units, activation="relu")(input_x) h2 = tf.keras.layers.Dense(units=self.config.num_units, activation="relu")(h1) p = tf.keras.layers.Dense(1, activation="sigmoid")(h2) model = tf.keras.Model(inputs=input_x, outputs=p) return model
デモデータの作成
デモデータはpygameを使って人力で作成するパターンと、予めPPOで学習したモデルを使用するパターンの2通りで作成して結果の比較を行いました。人力で作成するためコードについてはOpenAIのコードを利用しました。
デモデータは両者ともmaxの200ステップまでキープできたエピソードを10本分使用しました。*3
学習結果
実際に10000エピソード学習させてみた結果です。マシンはIntel Core i7-9750H 6core RAM64GBのPCでCPUのみを使用し、学習にはだいたい40分程度を要しました。
人間が生成したデモと学習済みppoモデルから生成したデモの2種類で学習させた結果、ppoからのデモを使用したほうが安定して高いスコアを挙げることが出来ました。
学習済みモデルの方が同じ状態に対して同じ行動を選択する確率が高く方策が一貫しているのに対して、人間のプレイでは一貫した方策を取りにくく、不完全なデモが生成されやすいため、このような差が生じたと考えられます。
まとめ
敵対的学習の仕組みを利用した逆強化学習型模倣学習アルゴリズムGAILを紹介しました。
GAILはUnityエディタ上で強化学習を行うためのプラグインであるUnity ML-Agentsの標準アルゴリズムとしても採用されており、高次元の状態空間で模倣学習を行う際、環境によっては単純に教師あり学習を行うよりも高いパフォーマンスを発揮することが出来ます。また以前紹介した模倣学習アルゴリズムDQfDと比較したときの一番の利点は報酬を設定しなくても模倣学習が可能であるという点です。実装もそこまで複雑でないため模倣学習を行う際の選択肢の1つとして有力な候補になるでしょう。
Reference
- Jonathan Ho, Stefano Ermon, 2016, Generative Adversarial Imitation Learning
https://arxiv.org/abs/1606.03476
- Inverse Reinforcement Learning CS 294-112: Deep Reinforcement Learning Sergey Levine
http://rail.eecs.berkeley.edu/deeprlcourse-fa17/f17docs/lecture_12_irl.pdf
- Stable Baselines GAIL
https://stable-baselines.readthedocs.io/en/master/modules/gail.html
- Generative Adversarial Imitation Learningの紹介(RLアーキテクチャ勉強会)
https://www.slideshare.net/YusukeNakata1/generative-adversarial-imitation-learningrl
- [DL輪読会]逆強化学習とGANs
https://www.slideshare.net/DeepLearningJP2016/dlgenerative-adversarial-imitation-learning-82875615