RNNで言語モデルを作る – 実装編


代表の島田です。前回の記事 RNNで言語モデルを作るための理論 では、言語モデルを作るという目的で一般的なRNNの構造についての解説を行いました。それを踏まえて、今回の記事では Python で実際に言語モデルを実装し、その言語モデルを用いて自動で生成された文章の内容を確認してみます。 scoutyでもRNNは今後文生成や、スカウトメールの文面と返信率の相関性検証などに使っていこうと考えている技術です。

今はフルスクラッチで書かなくても、Chainerのような便利なライブラリがあるので、実践で使うならそちらの方が便利でしょう。 ただ、実際にフルスクラッチで実装することで、中身の原理を理解し、チューニングすることも自分でできるようになるので、今回はそういった趣旨でRNNの実装を行ってみます。

モジュール構成

一般的なNNと同様、以下のようなモジュール構成になります:

  • predict: Forward Propagation を行う関数
    • 入力 x: 文(単語 \(x^{(t)}\) を並べた1次元配列)
      • e.g., "I have a" を表現する [0, 4, 2]
    • 出力 y: 予測値(単語 \(y^{(t)}\) を並べた1次元配列)
      • e.g., "have a pen" を表現する [4, 2, 3]
    • 出力 s: 隠れ層
  • compute_mean_loss: 損失関数
    • 入力 X: 訓練データ(文 \(\boldsymbol{x}\) を並べた2次元配列)
      • e.g., "I have a" と "You want to" を表現する [[0, 4, 2], [1, 3, 0]]
    • 入力 D: 教師データ(文 \(\boldsymbol{d}\) を並べた2次元配列)
      • e.g., "have a pen" と "want to make" を表現する [[4, 2, 3], [3, 0, 3]]
    • 出力 mean_loss: XD から計算された損失関数の値
  • acc_deltas_bptt: 重み更新値(デルタ)を計算する関数
    • 入力 x: 文(単語 \(x^{(t)}\) を並べた1次元配列)
    • 入力 d: 教師データ(単語 \(d^{(t)}\) を並べた1次元配列)
    • 入力 y: 予測値(単語 \(y^{(t)}\) を並べた1次元配列)
    • 入力 s: 隠れ層
    • 入力 steps: BPTTで遡るタイムステップ数
  • train: トレーニングを実行する関数
    • 入力は数が多いので省略します(コード内の docstring をご参照ください)

なお、もちろんこれは実装の一例であり、入出力や関数の切り分けパターンは他にも存在します。

各モジュール

今回の実装では、以下のように RNN クラスのインスタンス変数として重みや重み更新値を設定します:

class RNN(object):
    def __init__(self, vocab_size, hidden_dims):
        self.vocab_size = vocab_size
        self.hidden_dims = hidden_dims 
        
        # matrices
        # V (input -> hidden)
        self.V = random.randn(self.hidden_dims, self.vocab_size) * sqrt(0.1)
        # W (hidden -> output)
        self.W = random.randn(self.vocab_size, self.hidden_dims) * sqrt(0.1)
        # U (hidden ->; hidden)
        self.U = random.randn(self.hidden_dims, self.hidden_dims) * sqrt(0.1)
        
        # aggregated weight changes for V, W, U
        self.deltaV = zeros((self.hidden_dims, self.vocab_size))
        self.deltaW = zeros((self.vocab_size, self.hidden_dims))
        self.deltaU = zeros((self.hidden_dims, self.hidden_dims))

predict の実装

Forward Propagation, つまり入力(例えば "The bank went bankrupt")から次単語列(例えば "bank went bankrupt again")の予測を行う関数 predict は、次のように実装されます:

def predict(self, x):
    '''
    Args:
        x: list of words, as indices (e.g.: [0, 4, 2])
    Returns:
        y: matrix of probability vectors for each input word
        s: matrix of hidden layers for each input word
    '''
    # NOTE: in this implement, s[0] = [0. 0. ... 0.]
    #       and s[t+1] is the hidden layer corresponding to y[t]
    s = zeros((len(x) + 1, self.hidden_dims))
    y = zeros((len(x), self.vocab_size))

    for t in range(len(x)):
        x_vector = self.get_one_hot_vector(x[t])

        net_in = dot(self.V, x_vector) + dot(self.U, s[t])
        s[t + 1] =  sigmoid(net_in)

        net_out = dot(self.W, s[t + 1])
        y[t] = softmax(net_out)

    return y, s

前回記事の図に示したように、単語の長さに応じてレイヤーの枚数が変わっていくようなイメージです。

\(t\) 番目の単語を one-hot ベクトル \(\boldsymbol{x}^{(t)}\) で表した場合は文の表現は行列になりますが、本記事の実装では \(\boldsymbol{x}^{(t)}\) のうち1となっているインデックスだけを整数で表すことで、文を1次元のベクトルで表現することとします。つまり、 predict の引数 x は1次元配列 (list) になります。

損失関数 compute_loss, compute_mean_loss の実装

今回の実装では、損失関数として Cross Entropy Loss を用います。このとき、文一つの損失 \(J\) は各単語の損失 \(J^{(t)}\) の和*1として、次のように表されます:
$$\begin{align}
J^{(t)} &= -\sum^{|V|}_{j=1} d^{(t)}_j \log y_j^{(t)}\\
J &= \sum^{n}_{t=1} J^{(t)}
\end{align}$$
今回はデータセットの評価にはmean_loss関数、つまり、データ全体でみた損失関数の単語ごとの平均を用いました。

def compute_loss(self, x, d):
    y, s = self.predict(x)
    loss_t = zeros(len(y))

    for t in range(len(y)):
        d_t_vector = self.get_one_hot_vector(d[t])
        y_t_vector = y[t]

        # dot product works as product & summation
        loss_t[t] = -dot(d_t_vector, log(y_t_vector)) 

    # combined loss is summation of loss over t
    loss = sum(loss_t)

    return loss

def compute_mean_loss(self, X, D):
    '''
    Args:
        X: a list of input vectors   (e.g., [[0, 4, 2], [1, 3, 0]]
        D: a list of desired outputs (e.g., [[4, 2, 3], [3, 0, 3]]
    Returns:
        mean_loss
    '''
    sum_of_loss = 0
    sum_of_length = 0

    for i in range(len(X)):
       sum_of_loss += self.compute_loss(X[i], D[i])
       sum_of_length += len(X[i])

    # loss per word = total loss in the dataset devided by total number of words in the dataset.
    mean_loss = sum_of_loss / float(sum_of_length)

    return mean_loss

重み更新値デルタ関数 acc_deltas_bptt の実装

前回記事の更新式をそのままコードにすることで、重み更新値デルタを計算する関数 acc_deltas_bptt を以下のように実装できます:

def acc_deltas_bptt(self, x, d, y, s, steps):
    '''
    Args:
        steps: number of time steps to go back in BPTT
    Return:
        None
    '''

    net_out_grad = ones(len(x))
    net_in_grad = array([s_t * (ones(len(s_t)) - s_t) for s_t in s])
    sum_deltaW = zeros((self.vocab_size, self.hidden_dims))
    sum_deltaV = zeros((self.hidden_dims, self.vocab_size))
    sum_deltaU = zeros((self.hidden_dims, self.hidden_dims))

    # NOTE: in this implement, s[0] = [0. 0. ... 0.] 
    #       and s[t+1] is the hidden layer corresponding to y[t] (same in net_in_grad)
    for t in reversed(range(len(x))):
        d_t_vector = self.get_one_hot_vector(d[t])
        delta_out_t = (d_t_vector - y[t]) * net_out_grad[t]
        sum_deltaW += outer(delta_out_t, s[t + 1])

        delta_in = zeros((len(x), self.hidden_dims))

        for tau in range(0, 1 + steps):
            if t - tau < 0:
                break
            if tau == 0:
                delta_in[t - tau] = dot(self.W.T, delta_out_t) * net_in_grad[t + 1]
            else:
                delta_in[t - tau] = dot(self.U.T, delta_in[t - tau + 1]) * net_in_grad[t - tau + 1]
            sum_deltaV += outer(delta_in[t - tau], x[t - tau])
            sum_deltaU += outer(delta_in[t - tau], s[t - tau])

    # multiply learning rate when actually applying delta
    self.deltaW = sum_deltaW
    self.deltaV = sum_deltaV
    self.deltaU = sum_deltaU

遡るタイムステップを指定する引数 steps はハイパーパラメータで、 steps を大きくすれば大きくするほど計算結果は正確になりますが、計算量も大きくなります。

train の実装

トレーニングの際には、「エポック」という単位で処理を繰り返します。各エポックでは、まずトレーニングセンテンスごとに acc_delta_bptt を実行し、誤差逆伝搬でデルタを計算します。デルタの計算が終わると重みを更新し、次のエポックに移り、これを繰り返す流れです。 ここで、重みの更新の大きさをコントロールする 学習率 というパラメータがありますが、学習率は各エポックごとにだんだん小さくしていくのが一般的です。学習率をどのように変化させて行くかもハイパーパラメータのひとつとして実装者が決定します。今回の実装では、 \(m\) エポック目での学習率 \(\eta_m\) を以下のように定義します:
$$\eta_m = \eta_0 \frac{r}{m + r}.$$
\(r\) は実数の定数で、今回は \(r=5\) としています。 train は以下のように実装します:

def train(self, X, D, X_dev, D_dev, epochs, learning_rate, anneal, back_steps, batch_size, min_change):
    '''
  
    Args:
        X: a list of input vectors       e.g., [[0, 4, 2], [1, 3, 0]]
        D: a list of desired outputs     e.g., [[4, 2, 3], [3, 0, 3]]
        X_dev: a list of input vectors   e.g., [[0, 4, 2], [1, 3, 0]]
        D_dev: a list of desired outputs e.g., [[4, 2, 3], [3, 0, 3]]
        epochs: maximum number of epochs (iterations) over the training set
        learning_rate: initial learning rate for training
        anneal: positive integer. if > 0, lowers the learning rate in a harmonically after each epoch.
                higher annealing rate means less change per epoch.
                anneal=0 will not change the learning rate over time
        back_steps: positive integer. number of timesteps for BPTT. if back_steps < 2, standard BP will be used
        batch_size: number of training instances(sentence?) to use before updating the RNN's weight matrices.
                    if set to 1, weights will be updated after each instance. if set to len(X), weights are only updated after each epoch
        min_change: minimum change in loss between 2 epochs. if the change in loss is smaller than min_change, training stops regardless of
                    number of epochs left

    Returns:
        None
    '''

    print("Training model for {0} epochs\ntraining set: {1} sentences (batch size {2})".format(epochs, len(X), batch_size))
    print("Optimizing loss on {0} sentences".format(len(X_dev)))
    print("Vocab size: {0}\nHidden units: {1}".format(self.vocab_size, self.hidden_dims))
    print("Steps for back propagation: {0}".format(back_steps))
    print("Initial learning rate set to {0}, annealing set to {1}".format(learning_rate, anneal))
    print("\ncalculating initial mean loss on dev set")

    initial_loss = self.compute_mean_loss(X_dev, D_dev)
    print("initial mean loss: {0}".format(initial_loss))

    prev_loss = initial_loss
    loss_watch_count = -1
    min_change_count = -1

    a0 = learning_rate

    best_loss = initial_loss
    bestU, bestV, bestW = self.U, self.V, self.W
    best_epoch = 0

    for epoch in range(epochs):
        t0 = time.time()
        
        if anneal > 0:
            learning_rate = a0 / ((epoch + 0.0 + anneal) / anneal)
        else:
            learning_rate = a0
        print("\nepoch %d, learning rate %.04f" % (epoch + 1, learning_rate))

        count = 0

        # use random sequence of instances in the training set (tries to avoid local maxima when training on batches)
        for i in random.permutation(range(len(X))):
            count += 1
            stdout.write("\r\tpair {0}".format(count))

            x_i = X[i]
            d_i = D[i]
            y_i, s_i = self.predict(x_i)
            if back_steps < 2:
                self.acc_deltas(x_i, d_i, y_i, s_i)
            else:
                self.acc_deltas_bptt(x_i, d_i, y_i, s_i, back_steps)

            if count % batch_size == 0:
                self.deltaU /= batch_size
                self.deltaV /= batch_size
                self.deltaW /= batch_size
                self.apply_deltas(learning_rate)

        if count % batch_size > 0:
            self.deltaU /= (count % batch_size)
            self.deltaV /= (count % batch_size)
            self.deltaW /= (count % batch_size)
            self.apply_deltas(learning_rate)

        print("\n\tcalculating new loss on dev set")
        loss = self.compute_mean_loss(X_dev, D_dev)
        print("\tmean loss: {0}".format(loss))
        print("\tepoch done in %.02f seconds" % (time.time() - t0))
        
        if loss < best_loss:
            best_loss = loss
            bestU, bestV, bestW = self.U.copy(), self.V.copy(), self.W.copy()
            best_epoch = epoch

        # make sure we change the RNN enough
        if abs(prev_loss - loss) < min_change:
            min_change_count += 1
        else:
            min_change_count = 0
        if min_change_count > 2:
            print("\ntraining finished after {0} epochs due to minimal change in loss".format(epoch + 1))
            break

        prev_loss = loss
    print("\ntraining finished after reaching maximum of {0} epochs".format(epochs))
    print("best observed loss was {0}, at epoch {1}".format(best_loss, (best_epoch + 1)))
    print("setting U, V, W to matrices from best epoch")

    self.U, self.V, self.W = bestU, bestV, bestW

トレーニング結果と文章生成結果

今回の実装では、隠れ層のユニット数 \(D_h\), BPTTにおいて遡るタイムステップ数 \(\tau\), 学習率の初期値 \(\eta_0\) をそれぞれ以下のような範囲で変更しました:

  • \(D_h \in \{10,\ 50,\ 100\}\)
  • \(\tau \in \{0,\ 3,\ 10\}\)
  • \(\eta_0 \in \{0.5,\ 0.1,\ 0.05\}\)

パラメータの組み合わせは計27通りです。実験のためのデータセットとして Penn Treebank の WSJ Corpus を利用しています。

まず、ハイパーパラメータのチューニングを行います。このとき、全てのデータを利用するとトレーニングに時間がかかるので、トレーニングに用いるセンテンスを1000個に絞ります。なお、絞られたデータセットでのボキャブラリーサイズ \(|V|\) は \(|V|=2000\) であり、エポック数は25として実験を行いました。 各パラメータに対する最終的な平均損失 (mean loss) の値は以下の表1のようになりました:

表1: train の際の mean loss の一覧表。 \(\tau\) を定めたときの mean loss を \(\mathcal{L}(\tau)\) とすると、各セルの中の値はそれぞれ \(\mathcal{L}(0)\), \(\mathcal{L}(3)\),\(\mathcal{L}(10)\) を表す。

今回の実装で試したパラメータでは、 \(\eta_0 = 0.5\), \(D_h = 50\), \(\tau = 5.94\) が最も良い組み合わせだとわかりました。

次に、このハイパーパラメータの組み合わせを固定してフルトレーニングセット(6万件くらい)でネットワークのトレーニングをやり直し*2、できた言語モデルを使って自動で文章を生成させてみました。文章を作成する関数 generate_sequence のコードは以下のようになります:

def generate_sequence(self, start, end, maxLength):
    sequence = [start]
    loss = 0.
    x = [start]
    while True:
        # predict next word from current sequene x
        y,s = self.predict(x)

        # generate next word by sampling the word  according to the last element of y
        word_index = multinomial_sample(y[-1])

        x.append(word_index)
        sequence.append(word_index)
        pointwise_loss = -log(y[-1][word_index])
        loss += pointwise_loss

        if word_index == end or len(sequence) > maxLength:
            break

    return sequence, loss

文頭記号 <s> から出発し、 predict 関数を使って \(\boldsymbol{y}^{(t)}\) ベクトルを計算し、その確率分布に応じて次の単語を選択、今までのセンテンスと結合させて、再度 predict 関数で次の単語を推定する... という流れで文章を作り、文末記号 </s> に到達したらセンテンスを返す、という処理です。

出力結果のうち、特に mean loss の小さかった例が以下になります。

mean loss: 3.36850018043
[’<s>’, ’for’, ’offered’, ’market’, ’.’, ’,’, ’that’, ’says’, ’,’, ’</s>’] 

mean loss: 3.63943955362
[’<s>;’, ’net’, ’was’, ’to’, ’share’, ’the’, ’</s>’]

mean loss: 3.78469000827
[’<s>;’, ’these’, ’has’, ’the’, ’resources’, ’the’, ’he’, ’,’, ’.’, ’it’,
’which’, ’fund’, ’</s>’]

意味の無い文が多くなっていますが、今回はデータセットも絞った上でハイパーパラメータのチューニングもまだ十分にできていなかったので、改善の余地はありそうです。 ", that says," や "was to share" といった部分的に意味の通っている表現は見受けられるものの、全体を通しては意味不明な文章が多いですね。この精度であればマルコフ連鎖モデル(品詞 → 品詞の遷移 (transition) 確率と、品詞 → 単語の emission 確率を組み合わせるモデル)などを用いる方がいいかもしれません。 意味不明な文章が生成されてしまった原因のひとつとして考えられるのは、ボキャブラリーサイズ2000のデータセットに対して one-hot ベクトルを使うことで、単語を表現するベクトル空間がスパースになってしまっていることです。 \(\boldsymbol{y}^{(t)}\) ベクトルの一番大きな値でも0.01か0.02(つまり一番出やすい単語でも1%か2%の確率、他が0.5%くらい)というケースが多く、この分布に従ったらほとんどランダムに単語をピックアップしているようなものです。ピックアップのアルゴリズム(例えば上位100単語だけ見て確率を正規化し直すとか)を変えるだけで性能は向上するかもしれません。 今回は実験と原理理解のために実装した形なので、精度向上に関しての考察はこのあたりに留めておきます。

まとめ

言語モデルにおいて最も伝統的なNgramは、直前N個の単語まで考慮してコーパス内の単語のつながりをカウントし、単にカウントとカウントの割り算で確率を計算する、というシンプルなモデルでした。 このモデルの問題点は、コーパスにある単語のつながりしかカウントできないので、実際には出現する可能性があってもコーパスに出ていないというだけで単語の生起確率が0になってしまう、というものです。 これを解消する手法が単語カウントに他の単語カウントから得られた適当な数を足して0をなくすというSmoothingですが、これにもいろいろな問題があります(ここでは省略します)。端的に言うと、Ngramは汎化能力が低いという弱みがあります。

この点、RNNは単語をカウントしているのではなく、重みを学ぶことで単語と単語のつながり方を学習しているようなものなので、汎化能力が相対的に高いのです。つまり、コーパスに無い初見の単語でも確率0を与えないような出力を行うことができます。 しかし、今回のようなモデルでは one-hot ベクトルを利用するという性質上ベクトルがスパースになるという(おなじみの)問題は避けられません。今回のRNNアーキテクチャは単語の分散表現 (embedding) ベクトルでも応用可能なのか、少し考えてみる必要はあるかもしれません。

また、RNNは言語以外のシーケンシャルなデータに対しても応用できます。例えば、各タイムステップでの画像を入力にして動画を解析したり、株価の推移やFX取引のアルゴリズムとして使うことが可能です。


*1: あるいは平均を用いることもできます
*2: 筆者の環境では1エポックに30分近くかかりました。