この記事ではRNNによる言語モデルを使った文章生成の方法(主にプログラム)について書いてみます。
はじめに
今回はRNNを使った文章の自動生成をやってみます。
今回やりたいことは単語を学習したモデル(言語モデル)に渡して、その出力結果から次の単語を予測。そしてその予測した単語をまた言語モデルに渡して、結果を得る。これを繰り返して文章を生成することです。
実装にはPyTorchを使いました。
この記事ではWikipediaの文章から言語モデルを作って文章生成をしてみます。ただし、学習時間のことを考えて3つの記事から言語モデルを作ることにします。
環境
実行環境
以下の環境で動作確認済みです。また、記事の後半にGoogle Colaboratoryでの学習方法について書いてみます。
必要なライブラリなど
- Requests
- BeautifulSoup
- MeCab
- JapaneseTextEncoder
WindowsにMeCabをインストールするにはこちらを参考にしてみてください。それからJapaneseTextEncoderについてはこちら。JapaneseTextEncoderのtext_encoder.pyとreserved_tokens.pyを使います。
はじめに、Wikipediaから学習データを取ってきます。今回は「織田信長」、「豊臣秀吉」、「徳川家康」の3つの記事を学習データとして使います。
そのためにRequestsとBeautifulSoupを使ってWikipediaからデータをダウンロードします。
ライブラリのインポート
まずは必要なライブラリのインポート。
import MeCab
import re
import requests
from bs4 import BeautifulSoup
指定したURLの情報を取ってきて、そこから<p></p>で囲まれた部分を抽出し前処理してリストに入れています。今回はミニバッチ学習をするため学習データの長さ(形態素解析したときの単語の数)が50未満になるように前処理をしています。
url = "http://ja.wikipedia.org/wiki/"
wiki_name_list = ["織田信長", "豊臣秀吉", "徳川家康"]
corpus = []
tagger = MeCab.Tagger('-Owakati')
for wiki_name in wiki_name_list:
response = requests.get(url+wiki_name)
html = response.text
soup = BeautifulSoup(html, 'html.parser')
for p_tag in soup.find_all('p'):
sentences = p_tag.text.strip()
for sentence in sentences.split("。"):
if len(sentence.strip()) == 0:
continue
sentence = re.sub(r"\[[0-9]+\]", "", sentence)
sentence = re.sub(r"\[注釈 [0-9]+\]", "", sentence)
tokens = tagger.parse(sentence).strip().split()
if len(tokens) > 46:
continue
corpus.append(sentence+"。")
print(corpus[0])
関連記事
関連記事①:Requestsを使ってみた
関連記事②:BeautifulSoupを使ってWikipediaのテキストを抽出する
2. 単語辞書を作り、学習データをインデックスに変換する
JapaneseTextEncoderのbuild()
で学習データを単語辞書をもとにインデックスに変換します。
from text_encoder import JapaneseTextEncoder
encoder = JapaneseTextEncoder(corpus, append_eos=True, maxlen=50, padding=True)
encoder.build()
indices = encoder.dataset[0]
print(indices)
print(encoder.decode(indices))
全てのデータの長さを50に揃えるために終端記号を表す2の後は0で埋めるようにしています。
3. モデルの構成
ネットワーク
下のプログラムが言語モデルを学習するための構成で、3層からなるネットワークになっています。
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
class RNNLM(nn.Module):
def __init__(self, embedding_dim, hidden_dim, vocab_size, batch_size=50, num_layers=1):
super(RNNLM, self).__init__()
self.batch_size = batch_size
self.num_layers = num_layers
self.hidden_dim = hidden_dim
self.word_embeddings = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
self.dropout = nn.Dropout(p=0.5)
self.gru = nn.GRU(embedding_dim, hidden_dim, batch_first=True, num_layers=self.num_layers)
self.output = nn.Linear(hidden_dim, vocab_size)
def init_hidden(self):
self.hidden_state = torch.zeros(self.num_layers, self.batch_size, self.hidden_dim, device=device)
def forward(self, indices):
embed = self.word_embeddings(indices)
drop_out = self.dropout(embed)
if drop_out.dim() == 2:
drop_out = torch.unsqueeze(drop_out, 1)
gru_out, self.hidden_state = self.gru(drop_out, self.hidden_state)
gru_out = gru_out.contiguous()
return self.output(gru_out)
パラメータとか
学習を行うための前準備。
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
batch_size = 50
n_vocab = len(encoder.word2id)
n_epoch = 75000
EMBEDDING_DIM = HIDDEN_DIM = 256
model = RNNLM(EMBEDDING_DIM, HIDDEN_DIM, n_vocab).to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss(ignore_index=0)
def train2batch(dataset):
batch_dataset = []
for i in range(0, 1350, batch_size):
batch_dataset.append(dataset[i:i+batch_size])
return batch_dataset
4. 学習する
文章をモデルに渡して学習させます。
例えば下のような文章があったとき、
それなまちがいないわ
さらに、この文章の単語ID列が下のようなとき、
[21, 22, 23, 24, 13, 2]
この場合、言語モデルにそれなまちがいないわを学習させるときのイメージはこんな感じになります。

この文章の形態素を1単語ずつモデルに渡して次の単語を正しく予測するように学習していきます。
このイメージを上で用意したネットワークに当てはめると下の図になります。

単語IDと隠れ状態をネットワークに与えて出力を得ます。そして教師データの単語IDとネットワークの出力から損失を計算しています。
以下が学習コードです。
学習を始まる前にtrain()
で学習モードに入ります。ドロップアウトを使うときは必ず学習モードに入る必要があるみたいです。(参考)
model.train()
for epoch in range(1, n_epoch+1):
encoder.shuffle()
batch_dataset = train2batch(encoder.dataset)
for batch_data in batch_dataset:
model.zero_grad()
model.init_hidden()
batch_tensor = torch.tensor(batch_data, device=device)
input_tensor = batch_tensor[:, :-1]
target_tensor = batch_tensor[:, 1:].contiguous()
outputs = model(input_tensor)
outputs = outputs.view(-1, n_vocab)
targets = target_tensor.view(-1)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
torch.save(model.state_dict(), "wikipedia.model")
学習は以下のような流れです。
- 学習データをシャッフル
train2batch()
で学習データからバッチサイズを50にしてbatch_dataset
に入れる。
- バッチデータをネットワークに与え出力を得る。
- 出力と正解データから損失を計算。
- パラメータの更新。
学習が終われば最後にモデルの保存で終了です。
関連記事
PyTorchによる言語モデルの作り方 - どん底から這い上がるまでの記録
5. 学習モデルから文章の生成
今回は信長という単語を学習したモデルに渡して文章を自動生成します。 信長を含めて生成した文章が50語以下もしくは</s>, <pad>, 。が出たら終了するようにします。
学習したモデルを使って文章を生成するプログラムは下のようになります。
import numpy as np
model = RNNLM(EMBEDDING_DIM, HIDDEN_DIM, n_vocab, batch_size=1).to(device)
model_name = "wikipedia.model"
model.load_state_dict(torch.load(model_name))
model.eval()
with torch.no_grad():
for i in range(30):
model.init_hidden()
morpheme = "信長"
sentence = [morpheme]
for j in range(50):
index = encoder.word2id[morpheme]
input_tensor = torch.tensor([index], device=device)
outputs = model(input_tensor)
probs = F.softmax(torch.squeeze(outputs))
p = probs.cpu().detach().numpy()
morpheme = np.random.choice(encoder.vocab, p=p)
sentence.append(morpheme)
if morpheme in ["</s>", "<pad>", "。"]:
break
print("".join(sentence))
学習するときにはtrain()
を使いましたが、今回はeval()
を使って推論モードにします。(参考)
上のプログラムでは信長という単語から始まる文章を30個生成しています。
以下は生成結果の一部です。
信長は細川藤孝に宛てた書状のなかで、「天下安全」の実現のために倒すべき敵は、本願寺のみとなったと述べている。
信長は岐阜城を信忠に発令、完成信玄を城中とした。
信長は設楽原決戦においては佐々成政ら5人の武将に多くの火縄銃を用いた射撃を行わとして、浅井であった。
信長は信雄を厳しく叱責し、謹慎を命じた(第一次天正伊賀の乱)。
信長のそれはある一ヵ所の戦場に集中して運用できたことに特徴があったといえる。
信長は茶の湯に大きな関心を示した。
信長は義昭を上洛させるために、14国に行軍したこと等を掲げと、その存立基盤を維持することになるが合意された。
文章の意味はおいといて、ところどころ変な日本語がありますが、けっこういい感じで文章が生成できているように感じます。
バリエーションのある文章を生成する。
文章生成のプログラムで工夫したところは以下のところです。numpyを使って確率分布から確率の高い単語を選択するようにしています。(赤い部分)
for j in range(50):
index = encoder.word2id[morpheme]
input_tensor = torch.tensor([index], device=device)
outputs = model(input_tensor)
probs = F.softmax(torch.squeeze(outputs))
p = probs.cpu().detach().numpy()
morpheme = np.random.choice(encoder.vocab, p=p)
文章を生成する方法として言語モデルに単語を与えて、その出力をSoftmaxで確率に変換し、最も高い確率の単語を選択していくという方法があります。しかし、この方法だと任意の単語を言語モデルに与えると最も高い確率の単語を常に選択するので、生成される文章は同じものになってしまいます。(下のコード)
for j in range(50):
index = encoder.word2id[morpheme]
input_tensor = torch.tensor([index], device=device)
outputs = model(input_tensor)
probs = F.softmax(torch.squeeze(outputs))
index = torch.argmax(probs.cpu().detach()).item()
morpheme = encoder.vocab[index]
sentence.append(morpheme)
そこで、確率分布に従って単語を選択するようにすれば言語モデルに毎回同じ単語を与えても異なる文章を生成できるようになります。
言語モデルのことをもっと知りたい方は
言語モデルについてはこの記事の説明では不十分だと思うので、より詳しく知りたい方にはこちらの本がオススメです。(^^
Deep Learningによる自然言語処理を勉強したい方へのおすすめの本です。
tensorflow, chainerやPyTorchといったフレームワークを使わずにゼロからnumpyを使ってディープラーニングの実装をしています。
扱っている内容はword2vec, RNN, GRU, seq2seqやAttentionなど、、、
text_encoder.pyとreserved_tokens.pyを用意しといてください。
データの用意
import MeCab
import re
import requests
from bs4 import BeautifulSoup
url = "http://ja.wikipedia.org/wiki/"
wiki_name_list = ["織田信長", "豊臣秀吉", "徳川家康"]
corpus = []
tagger = MeCab.Tagger('-Owakati')
for wiki_name in wiki_name_list:
response = requests.get(url+wiki_name)
html = response.text
soup = BeautifulSoup(html, 'html.parser')
for p_tag in soup.find_all('p'):
sentences = p_tag.text.strip()
for sentence in sentences.split("。"):
if len(sentence.strip()) == 0:
continue
sentence = re.sub(r"\[[0-9]+\]", "", sentence)
sentence = re.sub(r"\[注釈 [0-9]+\]", "", sentence)
tokens = tagger.parse(sentence).strip().split()
if len(tokens) > 46:
continue
corpus.append(sentence+"。")
学習コード
import logging
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from text_encoder import JapaneseTextEncoder
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
logging.info("Building dictionary and dataset.")
encoder = JapaneseTextEncoder(corpus, append_eos=True, maxlen=50, padding=True)
encoder.build()
logging.info("Done...")
n_vocab = len(encoder.word2id)
EMBEDDING_DIM = HIDDEN_DIM = 256
batch_size = 50
logging.info("Vocab has %i elements.", n_vocab)
logging.info("The longest sentence has %i elements.", len(max(encoder.dataset, key=len)))
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class RNNLM(nn.Module):
def __init__(self, embedding_dim, hidden_dim, vocab_size, batch_size=50, num_layers=1):
super(RNNLM, self).__init__()
self.batch_size = batch_size
self.num_layers = num_layers
self.hidden_dim = hidden_dim
self.word_embeddings = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
self.dropout = nn.Dropout(p=0.5)
self.gru = nn.GRU(embedding_dim, hidden_dim, batch_first=True, num_layers=self.num_layers)
self.output = nn.Linear(hidden_dim, vocab_size)
def init_hidden(self):
self.hidden_state = torch.zeros(self.num_layers, self.batch_size, self.hidden_dim, device=device)
def forward(self, indices):
embed = self.word_embeddings(indices)
drop_out = self.dropout(embed)
if drop_out.dim() == 2:
drop_out = torch.unsqueeze(drop_out, 1)
gru_out, self.hidden_state = self.gru(drop_out, self.hidden_state)
gru_out = gru_out.contiguous()
return self.output(gru_out)
def train2batch(dataset):
batch_dataset = []
for i in range(0, 1350, batch_size):
batch_dataset.append(dataset[i:i+batch_size])
return batch_dataset
n_epoch = 75000
model = RNNLM(EMBEDDING_DIM, HIDDEN_DIM, n_vocab).to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss(ignore_index=0)
logging.info("Training mode")
model.train()
for epoch in range(1, n_epoch+1):
if epoch % 100 == 0:
logging.info("Epoch %i: %.2f", epoch, loss.item())
encoder.shuffle()
batch_dataset = train2batch(encoder.dataset)
for batch_data in batch_dataset:
model.zero_grad()
model.init_hidden()
batch_tensor = torch.tensor(batch_data, device=device)
input_tensor = batch_tensor[:, :-1]
target_tensor = batch_tensor[:, 1:].contiguous()
outputs = model(input_tensor)
outputs = outputs.view(-1, n_vocab)
targets = target_tensor.view(-1)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
torch.save(model.state_dict(), "wikipedia.model")
文章生成コード
import numpy as np
model = RNNLM(EMBEDDING_DIM, HIDDEN_DIM, n_vocab, batch_size=1).to(device)
model_name = "wikipedia.model"
model.load_state_dict(torch.load(model_name))
model.eval()
with torch.no_grad():
for i in range(30):
model.init_hidden()
morpheme = "信長"
sentence = [morpheme]
for j in range(50):
index = encoder.word2id[morpheme]
input_tensor = torch.tensor([index], device=device)
outputs = model(input_tensor)
probs = F.softmax(torch.squeeze(outputs))
p = probs.cpu().detach().numpy()
morpheme = np.random.choice(encoder.vocab, p=p)
sentence.append(morpheme)
if morpheme in ["</s>", "<pad>", "。"]:
break
print("".join(sentence))
(おまけ)Google Colaboratoryを使った学習
Google Colaboratoryを使った学習方法を紹介します。(※ランタイムのタイプはPython 3、GPUに設定しておきます。)
まずは、drive内のディレクトリをマウントします。
www.pytry3g.com
以下のコードをセルにコピペして実行します。
from google.colab import drive
drive.mount('/content/drive')
これによりdriveのMy Drive以下にGoogle Driveのルートディレクトリがマウントされます。
cd drive/My\ Drive/任意の作業ディレクトリ
次にGoogle Driveにある任意の作業ディレクトリに移動します。この作業ディレクトリには、あらかじめtext_encoder.pyとreserved_tokens.pyを用意しときます。
!apt install aptitude -qq
!aptitude install mecab libmecab-dev mecab-ipadic-utf8 git make curl xz-utils file -y -q
!pip install mecab-python3 -qqq
!pip install torch -qqq
上のコードをセルにコピペして実行し必要なライブラリをインストールします。
あとはこの記事で紹介した順にデータを集めて学習、文章生成をします。
Google Colaboratoryを使うときの問題点としてランタイムの接続が切れることがあります。そこで、学習が途中で止まってしまっても再学習できるように学習コードの部分を以下のようにします。
(※上のソースコードのコメントアウト# Training以下を変更。)
logging.info("Training mode")
model.train()
for epoch in range(1, n_epoch+1):
if epoch % 100 == 0:
logging.info("Epoch %i: %.2f", epoch, loss.item())
encoder.shuffle()
batch_dataset = train2batch(encoder.dataset)
for batch_data in batch_dataset:
model.zero_grad()
model.init_hidden()
batch_tensor = torch.tensor(batch_data, device=device)
input_tensor = batch_tensor[:, :-1]
target_tensor = batch_tensor[:, 1:].contiguous()
outputs = model(input_tensor)
outputs = outputs.view(-1, n_vocab)
targets = target_tensor.view(-1)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
# 以下変更箇所 1000epochごとに記録する。
if epoch % 1000 == 0:
model_name = "embedding{}_v{}.pt".format(EMBEDDING_DIM, epoch)
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'loss': loss
}, model_name)
logging.info("Saving the checkpoint...")
とりあえず、model_state_dictとoptimizer_state_dictだけ記録しておけばおk。
再学習するには再開したいエポックを指定し、model_state_dictとoptimizer_state_dictを読み込んで学習を再開します。
begin_point = 65000
end_point = 75000
model_name = "embedding{}_v{}.pt".format(EMBEDDING_DIM, begin_point)
checkpoint = torch.load(model_name)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
logging.info("Training mode")
model.train()
loss = checkpoint['loss']
for epoch in range(begin_point+1, end_point+1):
if epoch % 100 == 0:
logging.info("Epoch %i: %.2f", epoch, loss.item())
以下学習コードと同じ...