Word2Vecの単語ベクトルから、いい感じな文書ベクトルが作れるSCDVというのを使ってテキスト分類をしてみました。
SCDVって?
SCDVについて分かりやすく説明されている記事がQiitaにあるので詳細は下の記事を参照してください。
文書ベクトルをお手軽に高い精度で作れるSCDVって実際どうなのか日本語コーパスで実験した(EMNLP2017)
こちらは本家のGithubです。
GitHub - dheeraj7596/SCDV: Text classification with Sparse Composite Document Vectors.
今回やること
今回は2値分類、つまり与えられたデータが2つのクラスのうちどちらのクラスに属するのかをSCDVを使ってやってみます。
今回用いるデータはTwitterのツイートとWikipediaの記事です。これを使ってSCDVを構築し、与えられたデータがツイートなのかWikipediaの記事なのか正しく分類できるのかを見てみます。
今回使うもの
私の実行環境とか使うものについて。
requestsとBeautifulSoupについて以前少しだけ触れました。
tqdmはpipでインストールできます。
pip install tqdm
MeCabをインストールしていない方は下の記事を参考にしてみてください。
www.pytry3g.com
1. データの用意
まずデータの用意します。
import codecs
import requests
from bs4 import BeautifulSoup
corpus = {"Wikipedia": [], "Twitter": []}
必要なライブラリをインポートし、corpus
にWikipediaの記事とTwitterのツイートを入れていきます。
Wikipediaの記事をダウンロードするにはrequestsとBeautifulSoupを使います。
今回ダウンロードする記事は戦国時代に活躍した大名の記事です。
url = "http://ja.wikipedia.org/wiki/"
wiki_name_list = ["織田信長", "豊臣秀吉", "徳川家康",
"武田信玄", "上杉謙信", "北条氏康",
"今川義元", "島津義弘", "毛利元就",
"伊達政宗"]
for wiki_name in wiki_name_list:
response = requests.get(url+wiki_name)
html = response.text
soup = BeautifulSoup(html, 'lxml')
for p_tag in soup.find_all('p'):
sentence = p_tag.text.strip()
if sentence == '':
continue
corpus["Wikipedia"].append(sentence)
ツイートの用意
次にツイートを用意します。今回使うツイートは以前ロシアワールドカップの期間中に集めたツイートです。
自分が用意したデータはpositive.txt
に入っているので以下のようなプログラムになります。
corpus["Twitter"] = codecs.open('positive.txt', 'r', 'utf-8').read().splitlines()
SCDVを作るにはWord2Vecが必要になります。
gensimのWord2Vecを使うので、Wikipediaのデータとツイートを形態素解析し List of lists of tokens の形(リストの中にリストがあり、そのリストの中身は形態素)にします。
import MeCab
tagger = MeCab.Tagger('-Owakati')
sentences = []
corpus_size = min(len(corpus["Wikipedia"]), len(corpus["Twitter"]))
for key in ["Wikipedia", "Twitter"]:
for _, sentence in zip(range(corpus_size), corpus[key]):
sentences.append(tagger.parse(sentence).strip().split())
Wikipediaとツイートのデータを同じ数にしたいので少ないほうのデータのサイズに合わせました。sentences
にはWikipediaとツイートがそれぞれ742件のデータが形態素解析された状態で入っています。Word2Vecにはこのsentences
を渡して単語ベクトルを作ります。
訓練データとテストデータに分ける
SCDVの性能を測るために訓練データとテストデータに分けます。
Wikipediaは1、Twitterは0にラベル付けしました。
from sklearn.model_selection import train_test_split
train_x, test_x, train_t, test_t = train_test_split(sentences, [1]*corpus_size+[0]*corpus_size, test_size=0.1)
SCDVを作る
データの用意ができたので、これからSCDVを作ります。本家のプログラムではSCDVを作るコードは関数で定義されています。個人的に使いずらいなと感じたので、それらの関数をまとめてSCDVを作りやすいように?クラスで書き直してみました。ちょっと長いので書き直したコード(scdv.py)はこの記事の下のソースコードのところに置いておきます。
新しく書き直したコードを使えばSCDVの作成手順は大雑把に言えば次の通りです。
- Word2Vecを作る
- 確率重み付き単語ベクトルを求める
- SCDVを作る
より詳細な説明はこちら。
これからこの記事の下に置いておいたscdv.pyを使ってSCDVを作っていきます。
1. Word2Vecを作る
さっき用意したsentences
を渡して次元が100の単語ベクトルを作っています。ちなみに、モデルはskip-gramモデルです。
from scdv import SparseCompositeDocumentVectors, build_word2vec
model = build_word2vec(sentences, 100, 5, 5, 1)
2. 確率重み付き単語ベクトルを求める
訓練データを使って作成した単語ベクトルから確率重み付き単語ベクトルを求めます。
今回は2クラスのテキスト分類なのでクラスタの数は2としました。
pname1 = "gmm_cluster.pkl"
pname2 = "gmm_prob_cluster.pkl"
vec = SparseCompositeDocumentVectors(model, 2, 100, pname1, pname2)
vec.get_probability_word_vectors(train_x)
3. SCDVを求める。
確率重み付き単語ベクトルを求めたら訓練データとテストデータのSCDVをそれぞれ求めます。
train_gwbowv = vec.make_gwbowv(train_x)
test_gwbowv = vec.make_gwbowv(test_x)
テキスト分類
SCDVを求めることができたので、SCDVの性能をサポートベクターマシンを使って見てみます。
from sklearn.svm import SVC
clf = SVC(kernel="linear")
y_pred = clf.fit(train_gwbowv, train_t).predict(test_gwbowv)
result = sum(1*(p == t) for p, t in zip(y_pred.tolist(), test_t)) / len(test_t)
print("Accuracy: {:.2f}".format(result))
訓練データのSCDVとラベルをサポートベクターマシンの学習モデルに渡して学習し、テストデータのSCDVを学習モデルに渡して予測結果を出力させます。
結果
結果はこうなりました。
Accuracy: 0.99
かなり高いですね。Wikipediaの記事とTwitterのツイートがこんなに高い精度で分類できるとは思いませんでした。
ちなみにWord2VecのモデルをCBOWモデルにすると精度は少し悪くなりました。
この記事で紹介したデータを用意する部分はdataset.pyにまとめました。
main.pyでデータの読み込み、Word2Vecの作成、SCDVを求めてテキスト分類をしています。
scdv.py
import logging
import pickle
import numpy as np
from gensim.models.word2vec import Word2Vec
from tqdm import tqdm
from sklearn.mixture import GaussianMixture
from sklearn.feature_extraction.text import TfidfVectorizer
def build_word2vec(sentences, embedding_dim, min_count, window_size, sg):
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
model = Word2Vec(sentences, size=embedding_dim, min_count=min_count, window=window_size, sg=sg)
return model
class SparseCompositeDocumentVectors:
def __init__(self, model, num_clusters, embedding_dim, pname1, pname2):
self.min_no = 0
self.max_no = 0
self.prob_wordvecs = {}
self.model = model
self.word_vectors = model.wv.syn0
self.num_clusters = num_clusters
self.num_features = embedding_dim
self.pname1 = pname1
self.pname2 = pname2
def cluster_GMM(self):
clf = GaussianMixture(
n_components=self.num_clusters,
covariance_type="tied",
init_params="kmeans",
max_iter=50
)
clf.fit(self.word_vectors)
idx = clf.predict(self.word_vectors)
print("Clustering Done...")
idx_proba = clf.predict_proba(self.word_vectors)
pickle.dump(idx, open(self.pname1, "wb"))
print("Cluster Assignments Saved...")
pickle.dump(idx_proba, open(self.pname2, "wb"))
print("Probabilities of Cluster Assignments saved...")
return (idx, idx_proba)
def read_GMM(self):
idx = pickle.load(open(self.idx_name, "rb"))
idx_proba = pickle.load(open(self.idx_proba_name, "rb"))
print("Cluster Model Loaded...")
return (idx, idx_proba)
def get_probability_word_vectors(self, corpus):
"""
corpus: list of lists of tokens
"""
idx, idx_proba = self.cluster_GMM()
word_centroid_map = dict(zip(self.model.wv.index2word, idx))
word_centroid_prob_map = dict(zip(self.model.wv.index2word, idx_proba))
tfv = TfidfVectorizer(dtype=np.float32)
corpus = [" ".join(data) for data in corpus]
tfidfmatrix_traindata = tfv.fit_transform(corpus)
featurenames = tfv.get_feature_names()
idf = tfv._tfidf.idf_
print("Creating word-idf dictionary for dataset...")
word_idf_dict = {}
for pair in zip(featurenames, idf):
word_idf_dict[pair[0]] = pair[1]
for word in word_centroid_map:
self.prob_wordvecs[word] = np.zeros(self.num_clusters * self.num_features, dtype="float32")
for index in range(self.num_clusters):
try:
self.prob_wordvecs[word][index*self.num_features:(index+1)*self.num_features] = \
self.model[word] * word_centroid_prob_map[word][index] * word_idf_dict[word]
except:
continue
self.word_centroid_map = word_centroid_map
def create_cluster_vector_and_gwbowv(self, tokens, flag):
bag_of_centroids = np.zeros(self.num_clusters * self.num_features, dtype="float32")
for token in tokens:
try:
temp = self.word_centroid_map[token]
except:
continue
bag_of_centroids += self.prob_wordvecs[token]
norm = np.sqrt(np.einsum('...i,...i', bag_of_centroids, bag_of_centroids))
if norm != 0:
bag_of_centroids /= norm
if flag:
self.min_no += min(bag_of_centroids)
self.max_no += max(bag_of_centroids)
return bag_of_centroids
def plain_word2vec_document_vectors(self, tokens):
bag_of_centroids = np.zeros(self.num_features, dtype="float32")
for token in tokens:
try:
temp = self.model[token]
except:
continue
bag_of_centroids += temp
bag_of_centroids = bag_of_centroids / len(tokens)
return bag_of_centroids
def make_gwbowv(self, corpus, train=True):
gwbowv = np.zeros((len(corpus), self.num_clusters*self.num_features)).astype(np.float32)
cnt = 0
for tokens in tqdm(corpus):
gwbowv[cnt] = self.create_cluster_vector_and_gwbowv(tokens, train)
cnt += 1
return gwbowv
def make_word2vec(self, corpus, model):
self.model = model
w2docv = np.zeros((len(corpus), self.num_features)).astype(np.float32)
cnt = 0
for tokens in tqdm(corpus):
w2docv[cnt] = self.plain_word2vec_document_vectors(tokens)
cnt += 1
return w2docv
def get_word2vec_document_vector(self, tokens):
return self.plain_word2vec_document_vectors(tokens)
def dump_gwbowv(self, gwbowv, path="gwbowv_matrix.npy", percentage=0.04):
min_no = self.min_no*1.0/gwbowv.shape[0]
max_no = self.max_no*1.0/gwbowv.shape[0]
print("Average min: ", min_no)
print("Average max: ", max_no)
thres = (abs(max_no) + abs(min_no))/2
thres = thres * percentage
temp = abs(gwbowv) < thres
gwbowv[temp] = 0
np.save(path, gwbowv)
print("SDV created and dumped...")
def load_matrix(self, name):
return np.load(name)
dataset.py
Twitterのツイートを読み込む部分は各自の環境に合わせて変えてください。
import MeCab
import codecs
import requests
from bs4 import BeautifulSoup
from sklearn.model_selection import train_test_split
corpus = {"Wikipedia": [], "Twitter": []}
def load_corpus():
url = "http://ja.wikipedia.org/wiki/"
wiki_name_list = ["織田信長", "豊臣秀吉", "徳川家康",
"武田信玄", "上杉謙信", "北条氏康",
"今川義元", "島津義弘", "毛利元就",
"伊達政宗"]
for wiki_name in wiki_name_list:
response = requests.get(url+wiki_name)
html = response.text
soup = BeautifulSoup(html, 'lxml')
for p_tag in soup.find_all('p'):
sentence = p_tag.text.strip()
if sentence == '':
continue
corpus["Wikipedia"].append(sentence)
corpus["Twitter"] = codecs.open('positive.txt', 'r', 'utf-8').read().splitlines()
tagger = MeCab.Tagger('-Owakati')
sentences = []
corpus_size = min(len(corpus["Wikipedia"]), len(corpus["Twitter"]))
for key in ["Wikipedia", "Twitter"]:
for _, sentence in zip(range(corpus_size), corpus[key]):
sentences.append(tagger.parse(sentence).strip().split())
train_x, test_x, train_t, test_t = train_test_split(sentences, [1]*corpus_size+[0]*corpus_size, test_size=0.1)
return sentences, train_x, test_x, train_t, test_t
main.py
import argparse
import dataset
from sklearn.svm import SVC
from scdv import SparseCompositeDocumentVectors, build_word2vec
def parse_args():
parser = argparse.ArgumentParser(
description="Word2VecとSCDVのパラメータの設定"
)
parser.add_argument(
'--embedding_dim', type=int, default=100
)
parser.add_argument(
'--min_count', type=int, default=5
)
parser.add_argument(
'--window_size', type=int, default=5
)
parser.add_argument(
'--sg', type=int, default=1
)
parser.add_argument(
'--num_clusters', type=int, default=2
)
parser.add_argument(
'--pname1', type=str, default="gmm_cluster.pkl"
)
parser.add_argument(
'--pname2', type=str, default="gmm_prob_cluster.pkl"
)
return parser.parse_args()
def main(args):
sentences, train_x, test_x, train_t, test_t = dataset.load_corpus()
model = build_word2vec(
sentences,
args.embedding_dim,
args.min_count,
args.window_size,
args.sg
)
vec = SparseCompositeDocumentVectors(
model,
args.num_clusters,
args.embedding_dim,
args.pname1,
args.pname2
)
vec.get_probability_word_vectors(train_x)
train_gwbowv = vec.make_gwbowv(train_x)
test_gwbowv = vec.make_gwbowv(test_x)
clf = SVC(kernel="linear")
y_pred = clf.fit(train_gwbowv, train_t).predict(test_gwbowv)
result = sum(1*(p == t) for p, t in zip(y_pred.tolist(), test_t)) / len(test_t)
print("Accuracy: {:.2f}".format(result))
if __name__ == "__main__":
main(parse_args())