どん底から這い上がるまでの記録

どん底から這い上がりたいけど這い上がれない人がいろいろ書くブログ(主にプログラミング)

RNNを使った文章の自動生成

 

この記事ではRNNによる言語モデルを使った文章生成の方法(主にプログラム)について書いてみます。

 

 

はじめに

今回はRNNを使った文章の自動生成をやってみます。

今回やりたいことは単語を学習したモデル(言語モデル)に渡して、その出力結果から次の単語を予測。そしてその予測した単語をまた言語モデルに渡して、結果を得る。これを繰り返して文章を生成することです。

実装にはPyTorchを使いました。

この記事ではWikipediaの文章から言語モデルを作って文章生成をしてみます。ただし、学習時間のことを考えて3つの記事から言語モデルを作ることにします。

環境

実行環境

以下の環境で動作確認済みです。また、記事の後半にGoogle Colaboratoryでの学習方法について書いてみます。

必要なライブラリなど

  • Requests
  • BeautifulSoup
  • MeCab
  • JapaneseTextEncoder

WindowsMeCabをインストールするにはこちらを参考にしてみてください。それからJapaneseTextEncoderについてはこちら。JapaneseTextEncoderのtext_encoder.pyreserved_tokens.pyを使います。

1. Wikipediaからデータを取ってくる

はじめに、Wikipediaから学習データを取ってきます。今回は「織田信長」、「豊臣秀吉」、「徳川家康」の3つの記事を学習データとして使います。

そのためにRequestsBeautifulSoupを使ってWikipediaからデータをダウンロードします。

ライブラリのインポート

まずは必要なライブラリのインポート。

import MeCab
import re
import requests
from bs4 import BeautifulSoup

Wikipediaの記事をダウンロード

指定したURLの情報を取ってきて、そこから<p></p>で囲まれた部分を抽出し前処理してリストに入れています。今回はミニバッチ学習をするため学習データの長さ(形態素解析したときの単語の数)が50未満になるように前処理をしています。

### データの用意 ###
# Wikipediaの記事をダウンロード
url = "http://ja.wikipedia.org/wiki/"
# 記事のリスト
wiki_name_list = ["織田信長", "豊臣秀吉", "徳川家康"]
# データを入れる
corpus = []
tagger = MeCab.Tagger('-Owakati')
for wiki_name in wiki_name_list:
    # 指定したページの情報を取得
    response = requests.get(url+wiki_name)
    # HTMLフォーマットに変換
    html = response.text
    soup = BeautifulSoup(html, 'html.parser')
    # pタグで囲まれた部分を抽出
    for p_tag in soup.find_all('p'):
        sentences = p_tag.text.strip()
        for sentence in sentences.split("。"):
            if len(sentence.strip()) == 0:
                continue
            # 前処理
            sentence = re.sub(r"\[[0-9]+\]", "", sentence)
            sentence = re.sub(r"\[注釈 [0-9]+\]", "", sentence)
            tokens = tagger.parse(sentence).strip().split()
            if len(tokens) > 46:
                continue
            corpus.append(sentence+"。")
print(corpus[0])
# 織田 信長(おだ のぶなが)は、戦国時代から安土桃山時代にかけての武将、戦国大名。

関連記事

関連記事①:Requestsを使ってみた

関連記事②:BeautifulSoupを使ってWikipediaのテキストを抽出する

2. 単語辞書を作り、学習データをインデックスに変換する

JapaneseTextEncoderのbuild()で学習データを単語辞書をもとにインデックスに変換します。

from text_encoder import JapaneseTextEncoder

encoder = JapaneseTextEncoder(corpus, append_eos=True, maxlen=50, padding=True)
encoder.build()

indices = encoder.dataset[0]
print(indices)
# [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 15, 19, 20, 21, 13, 14, 22, 23, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
print(encoder.decode(indices))
# 織田信長(おだのぶなが)は、戦国時代から安土桃山時代にかけての武将、戦国大名。</s>

全てのデータの長さを50に揃えるために終端記号を表すの後はで埋めるようにしています。

3. モデルの構成

ネットワーク

下のプログラムが言語モデルを学習するための構成で、3層からなるネットワークになっています。

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


class RNNLM(nn.Module):
    def __init__(self, embedding_dim, hidden_dim, vocab_size, batch_size=50, num_layers=1):
        super(RNNLM, self).__init__()
        self.batch_size = batch_size
        self.num_layers = num_layers
        self.hidden_dim = hidden_dim

        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.dropout = nn.Dropout(p=0.5)

        self.gru = nn.GRU(embedding_dim, hidden_dim, batch_first=True, num_layers=self.num_layers)

        self.output = nn.Linear(hidden_dim, vocab_size)

    def init_hidden(self):
        self.hidden_state = torch.zeros(self.num_layers, self.batch_size, self.hidden_dim, device=device)

    def forward(self, indices):
        embed = self.word_embeddings(indices) # batch_len x sequence_length x embedding_dim
        drop_out = self.dropout(embed)
        if drop_out.dim() == 2:
            drop_out = torch.unsqueeze(drop_out, 1)
        gru_out, self.hidden_state = self.gru(drop_out, self.hidden_state)# batch_len x sequence_length x hidden_dim
        gru_out = gru_out.contiguous()
        return self.output(gru_out)

パラメータとか

学習を行うための前準備。

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
batch_size = 50
n_vocab = len(encoder.word2id)
n_epoch = 75000
EMBEDDING_DIM = HIDDEN_DIM = 256
model = RNNLM(EMBEDDING_DIM, HIDDEN_DIM, n_vocab).to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss(ignore_index=0)

def train2batch(dataset):
    batch_dataset = []
    for i in range(0, 1350, batch_size):
        batch_dataset.append(dataset[i:i+batch_size])
    return batch_dataset

4. 学習する

文章をモデルに渡して学習させます。

例えば下のような文章があったとき、

それなまちがいないわ

さらに、この文章の単語ID列が下のようなとき、

[21, 22, 23, 24, 13, 2]

この場合、言語モデルにそれなまちがいないわを学習させるときのイメージはこんな感じになります。

f:id:pytry3g:20180914153727p:plain

この文章の形態素を1単語ずつモデルに渡して次の単語を正しく予測するように学習していきます。

このイメージを上で用意したネットワークに当てはめると下の図になります。

f:id:pytry3g:20180914150607p:plain

単語IDと隠れ状態をネットワークに与えて出力を得ます。そして教師データの単語IDとネットワークの出力から損失を計算しています。

 

以下が学習コードです。

学習を始まる前にtrain()で学習モードに入ります。ドロップアウトを使うときは必ず学習モードに入る必要があるみたいです。(参考)

# Training
model.train()
for epoch in range(1, n_epoch+1):

    encoder.shuffle()
    # len(encoder.dataset) == 1361
    batch_dataset = train2batch(encoder.dataset)
    for batch_data in batch_dataset:
        model.zero_grad()
        model.init_hidden()
        
        batch_tensor = torch.tensor(batch_data, device=device)
        input_tensor = batch_tensor[:, :-1]
        target_tensor = batch_tensor[:, 1:].contiguous()
        outputs = model(input_tensor)
        outputs = outputs.view(-1, n_vocab)
        targets = target_tensor.view(-1)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

torch.save(model.state_dict(), "wikipedia.model")

学習は以下のような流れです。

  1. 学習データをシャッフル
  2. train2batch()で学習データからバッチサイズを50にしてbatch_datasetに入れる。
  3. バッチデータをネットワークに与え出力を得る。
  4. 出力と正解データから損失を計算。
  5. パラメータの更新。

学習が終われば最後にモデルの保存で終了です。

 

関連記事

PyTorchによる言語モデルの作り方 - どん底から這い上がるまでの記録

5. 学習モデルから文章の生成

今回は信長という単語を学習したモデルに渡して文章を自動生成します。 信長を含めて生成した文章が50語以下もしくは</s>, <pad>, 。が出たら終了するようにします。

学習したモデルを使って文章を生成するプログラムは下のようになります。

import numpy as np
model = RNNLM(EMBEDDING_DIM, HIDDEN_DIM, n_vocab, batch_size=1).to(device)
model_name = "wikipedia.model"
model.load_state_dict(torch.load(model_name))

# Evaluation
model.eval()
with torch.no_grad():
    for i in range(30):
        model.init_hidden()
        morpheme = "信長"
        sentence = [morpheme]
        for j in range(50):
            index = encoder.word2id[morpheme]
            input_tensor = torch.tensor([index], device=device)
            outputs = model(input_tensor)
            probs = F.softmax(torch.squeeze(outputs))
            p = probs.cpu().detach().numpy()
            morpheme = np.random.choice(encoder.vocab, p=p)
            sentence.append(morpheme)
            if morpheme in ["</s>", "<pad>", "。"]:
                break
        print("".join(sentence))

学習するときにはtrain()を使いましたが、今回はeval()を使って推論モードにします。(参考)

上のプログラムでは信長という単語から始まる文章を30個生成しています。

以下は生成結果の一部です。

信長は細川藤孝に宛てた書状のなかで、「天下安全」の実現のために倒すべき敵は、本願寺のみとなったと述べている。

信長は岐阜城を信忠に発令、完成信玄を城中とした。

信長は設楽原決戦においては佐々成政ら5人の武将に多くの火縄銃を用いた射撃を行わとして、浅井であった。

信長は信雄を厳しく叱責し、謹慎を命じた(第一次天正伊賀の乱)。

信長のそれはある一ヵ所の戦場に集中して運用できたことに特徴があったといえる。

信長は茶の湯に大きな関心を示した。

信長は義昭を上洛させるために、14国に行軍したこと等を掲げと、その存立基盤を維持することになるが合意された。

 

文章の意味はおいといて、ところどころ変な日本語がありますが、けっこういい感じで文章が生成できているように感じます。

バリエーションのある文章を生成する。

文章生成のプログラムで工夫したところは以下のところです。numpyを使って確率分布から確率の高い単語を選択するようにしています。(赤い部分)

        for j in range(50):
            index = encoder.word2id[morpheme]
            input_tensor = torch.tensor([index], device=device)
            outputs = model(input_tensor)
            probs = F.softmax(torch.squeeze(outputs))
            p = probs.cpu().detach().numpy()
            morpheme = np.random.choice(encoder.vocab, p=p)

文章を生成する方法として言語モデルに単語を与えて、その出力をSoftmaxで確率に変換し、最も高い確率の単語を選択していくという方法があります。しかし、この方法だと任意の単語を言語モデルに与えると最も高い確率の単語を常に選択するので、生成される文章は同じものになってしまいます。(下のコード)

        for j in range(50):
            index = encoder.word2id[morpheme]
            input_tensor = torch.tensor([index], device=device)
            outputs = model(input_tensor)
            probs = F.softmax(torch.squeeze(outputs))
            #p = probs.cpu().detach().numpy()
            #morpheme = np.random.choice(encoder.vocab, p=p)
            index = torch.argmax(probs.cpu().detach()).item()
            morpheme = encoder.vocab[index]
            sentence.append(morpheme)

そこで、確率分布に従って単語を選択するようにすれば言語モデルに毎回同じ単語を与えても異なる文章を生成できるようになります。

言語モデルのことをもっと知りたい方は

言語モデルについてはこの記事の説明では不十分だと思うので、より詳しく知りたい方にはこちらの本がオススメです。(^^

Deep Learningによる自然言語処理を勉強したい方へのおすすめの本です。

tensorflow, chainerやPyTorchといったフレームワークを使わずにゼロからnumpyを使ってディープラーニングの実装をしています。

扱っている内容はword2vec, RNN, GRU, seq2seqやAttentionなど、、、

ソースコード

text_encoder.pyreserved_tokens.pyを用意しといてください。

データの用意

import MeCab
import re
import requests
from bs4 import BeautifulSoup

### データの用意 ###
# Wikipediaの記事をダウンロード
url = "http://ja.wikipedia.org/wiki/"
# 記事のリスト
wiki_name_list = ["織田信長", "豊臣秀吉", "徳川家康"]
# データを入れる
corpus = []
tagger = MeCab.Tagger('-Owakati')
for wiki_name in wiki_name_list:
    # 指定したページの情報を取得
    response = requests.get(url+wiki_name)
    # HTMLフォーマットに変換
    html = response.text
    soup = BeautifulSoup(html, 'html.parser')
    # pタグで囲まれた部分を抽出
    for p_tag in soup.find_all('p'):
        sentences = p_tag.text.strip()
        for sentence in sentences.split("。"):
            if len(sentence.strip()) == 0:
                continue
            # 前処理
            sentence = re.sub(r"\[[0-9]+\]", "", sentence)
            sentence = re.sub(r"\[注釈 [0-9]+\]", "", sentence)
            tokens = tagger.parse(sentence).strip().split()
            if len(tokens) > 46:
                continue
            corpus.append(sentence+"。")

学習コード

import logging
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from text_encoder import JapaneseTextEncoder


logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
logging.info("Building dictionary and dataset.")
encoder = JapaneseTextEncoder(corpus, append_eos=True, maxlen=50, padding=True)
encoder.build()
logging.info("Done...")

n_vocab = len(encoder.word2id)
EMBEDDING_DIM = HIDDEN_DIM = 256
batch_size = 50
logging.info("Vocab has %i elements.", n_vocab)
logging.info("The longest sentence has %i elements.", len(max(encoder.dataset, key=len)))
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class RNNLM(nn.Module):
    def __init__(self, embedding_dim, hidden_dim, vocab_size, batch_size=50, num_layers=1):
        super(RNNLM, self).__init__()
        self.batch_size = batch_size
        self.num_layers = num_layers
        self.hidden_dim = hidden_dim

        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.dropout = nn.Dropout(p=0.5)

        self.gru = nn.GRU(embedding_dim, hidden_dim, batch_first=True, num_layers=self.num_layers)

        self.output = nn.Linear(hidden_dim, vocab_size)

    def init_hidden(self):
        self.hidden_state = torch.zeros(self.num_layers, self.batch_size, self.hidden_dim, device=device)

    def forward(self, indices):
        embed = self.word_embeddings(indices) # batch_len x sequence_length x embedding_dim
        drop_out = self.dropout(embed)
        if drop_out.dim() == 2:
            drop_out = torch.unsqueeze(drop_out, 1)
        gru_out, self.hidden_state = self.gru(drop_out, self.hidden_state)# batch_len x sequence_length x hidden_dim
        gru_out = gru_out.contiguous()
        return self.output(gru_out)
    
    
def train2batch(dataset):
    batch_dataset = []
    for i in range(0, 1350, batch_size):
        batch_dataset.append(dataset[i:i+batch_size])
    return batch_dataset

n_epoch = 75000
model = RNNLM(EMBEDDING_DIM, HIDDEN_DIM, n_vocab).to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss(ignore_index=0)

# Training
logging.info("Training mode")
model.train()
for epoch in range(1, n_epoch+1):
    
    if epoch % 100 == 0:
        logging.info("Epoch %i: %.2f", epoch, loss.item())
    
    encoder.shuffle()
    # len(encoder.dataset) == 1361
    batch_dataset = train2batch(encoder.dataset)
    for batch_data in batch_dataset:
        model.zero_grad()
        model.init_hidden()
        
        batch_tensor = torch.tensor(batch_data, device=device)
        input_tensor = batch_tensor[:, :-1]
        target_tensor = batch_tensor[:, 1:].contiguous()
        outputs = model(input_tensor)
        outputs = outputs.view(-1, n_vocab)
        targets = target_tensor.view(-1)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

torch.save(model.state_dict(), "wikipedia.model")

文章生成コード

import numpy as np
model = RNNLM(EMBEDDING_DIM, HIDDEN_DIM, n_vocab, batch_size=1).to(device)
model_name = "wikipedia.model"
model.load_state_dict(torch.load(model_name))

model.eval()
with torch.no_grad():
    for i in range(30):
        model.init_hidden()
        morpheme = "信長"
        sentence = [morpheme]
        for j in range(50):
            index = encoder.word2id[morpheme]
            input_tensor = torch.tensor([index], device=device)
            outputs = model(input_tensor)
            probs = F.softmax(torch.squeeze(outputs))
            p = probs.cpu().detach().numpy()
            morpheme = np.random.choice(encoder.vocab, p=p)
            sentence.append(morpheme)
            if morpheme in ["</s>", "<pad>", "。"]:
                break
        print("".join(sentence))

(おまけ)Google Colaboratoryを使った学習

Google Colaboratoryを使った学習方法を紹介します。(※ランタイムのタイプはPython 3、GPUに設定しておきます。)

まずは、drive内のディレクトリをマウントします。

www.pytry3g.com

 

以下のコードをセルにコピペして実行します。

from google.colab import drive
drive.mount('/content/drive')

これによりdriveのMy Drive以下にGoogle Driveのルートディレクトリがマウントされます。

cd drive/My\ Drive/任意の作業ディレクト

次にGoogle Driveにある任意の作業ディレクトリに移動します。この作業ディレクトリには、あらかじめtext_encoder.pyreserved_tokens.pyを用意しときます。

!apt install aptitude -qq
!aptitude install mecab libmecab-dev mecab-ipadic-utf8 git make curl xz-utils file -y -q
!pip install mecab-python3 -qqq
!pip install torch -qqq

上のコードをセルにコピペして実行し必要なライブラリをインストールします。

あとはこの記事で紹介した順にデータを集めて学習、文章生成をします。

 

Google Colaboratoryを使うときの問題点としてランタイムの接続が切れることがあります。そこで、学習が途中で止まってしまっても再学習できるように学習コードの部分を以下のようにします。

(※上のソースコードコメントアウト# Training以下を変更。)

# Training
logging.info("Training mode")
model.train()
for epoch in range(1, n_epoch+1):
    
    if epoch % 100 == 0:
        logging.info("Epoch %i: %.2f", epoch, loss.item())
    
    encoder.shuffle()
    # len(encoder.dataset) == 1361
    batch_dataset = train2batch(encoder.dataset)
    for batch_data in batch_dataset:
        model.zero_grad()
        model.init_hidden()
        
        batch_tensor = torch.tensor(batch_data, device=device)
        input_tensor = batch_tensor[:, :-1]
        target_tensor = batch_tensor[:, 1:].contiguous()
        outputs = model(input_tensor)
        outputs = outputs.view(-1, n_vocab)
        targets = target_tensor.view(-1)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
    # 以下変更箇所 1000epochごとに記録する。
    if epoch % 1000 == 0:
        model_name = "embedding{}_v{}.pt".format(EMBEDDING_DIM, epoch)
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': loss
        }, model_name)
        logging.info("Saving the checkpoint...")

とりあえず、model_state_dictとoptimizer_state_dictだけ記録しておけばおk。

 

再学習するには再開したいエポックを指定し、model_state_dictとoptimizer_state_dictを読み込んで学習を再開します。

begin_point = 65000
end_point = 75000
model_name = "embedding{}_v{}.pt".format(EMBEDDING_DIM, begin_point)
checkpoint = torch.load(model_name)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
# Training
logging.info("Training mode")
model.train()
loss = checkpoint['loss']
for epoch in range(begin_point+1, end_point+1):
    
    if epoch % 100 == 0:
        logging.info("Epoch %i: %.2f", epoch, loss.item())
    
以下学習コードと同じ...