どん底から這い上がるまでの記録

どん底から這い上がりたいけど這い上がれない人がいろいろ書くブログ(主にプログラミング)

SCDVを使ったテキスト分類をしてみる

 

Word2Vecの単語ベクトルから、いい感じな文書ベクトルが作れるSCDVというのを使ってテキスト分類をしてみました。

SCDVって?

SCDVについて分かりやすく説明されている記事がQiitaにあるので詳細は下の記事を参照してください。

文書ベクトルをお手軽に高い精度で作れるSCDVって実際どうなのか日本語コーパスで実験した(EMNLP2017)

 

こちらは本家のGithubです。

GitHub - dheeraj7596/SCDV: Text classification with Sparse Composite Document Vectors.

今回やること

今回は2値分類、つまり与えられたデータが2つのクラスのうちどちらのクラスに属するのかをSCDVを使ってやってみます。

今回用いるデータはTwitterのツイートとWikipediaの記事です。これを使ってSCDVを構築し、与えられたデータがツイートなのかWikipediaの記事なのか正しく分類できるのかを見てみます。

今回使うもの

私の実行環境とか使うものについて。

requestsとBeautifulSoupについて以前少しだけ触れました。

tqdmはpipでインストールできます。

pip install tqdm

MeCabをインストールしていない方は下の記事を参考にしてみてください。

www.pytry3g.com

1. データの用意

まずデータの用意します。

import codecs
import requests
from bs4 import BeautifulSoup


# Wikipediaの記事とTwitterのツイートを入れる
corpus = {"Wikipedia": [], "Twitter": []}

必要なライブラリをインポートし、corpusWikipediaの記事とTwitterのツイートを入れていきます。

Wikipediaの記事をダウンロード

Wikipediaの記事をダウンロードするにはrequestsBeautifulSoupを使います。

今回ダウンロードする記事は戦国時代に活躍した大名の記事です。

### データの用意 ###
# Wikipediaの記事をダウンロード
url = "http://ja.wikipedia.org/wiki/"
# 記事のリスト
wiki_name_list = ["織田信長", "豊臣秀吉", "徳川家康",
                  "武田信玄", "上杉謙信", "北条氏康",
                  "今川義元", "島津義弘", "毛利元就",
                  "伊達政宗"]

for wiki_name in wiki_name_list:
    # 指定したページの情報を取得
    response = requests.get(url+wiki_name)
    # HTMLフォーマットに変換
    html = response.text
    soup = BeautifulSoup(html, 'lxml')
    # pタグで囲まれた部分を抽出
    for p_tag in soup.find_all('p'):
        sentence = p_tag.text.strip()
        if sentence == '':
            continue
        corpus["Wikipedia"].append(sentence)
# corpus["Wikipedia"][0]
# '織田 信長(おだ のぶなが)は、戦国時代から安土桃山時代にかけての武将・戦国大名。三英傑の一人。'

ツイートの用意

次にツイートを用意します。今回使うツイートは以前ロシアワールドカップの期間中に集めたツイートです。

自分が用意したデータはpositive.txtに入っているので以下のようなプログラムになります。

# Twitterのツイートを読み込む
corpus["Twitter"] = codecs.open('positive.txt', 'r', 'utf-8').read().splitlines()
# corpus["Twitter"][0]
# '原口と槙野も先発だと期待してます。'

形態素解析

SCDVを作るにはWord2Vecが必要になります。

gensimのWord2Vecを使うので、Wikipediaのデータとツイートを形態素解析List of lists of tokens の形(リストの中にリストがあり、そのリストの中身は形態素)にします。

import MeCab

tagger = MeCab.Tagger('-Owakati')
sentences = []
corpus_size = min(len(corpus["Wikipedia"]), len(corpus["Twitter"]))
for key in ["Wikipedia", "Twitter"]:
    for _, sentence in zip(range(corpus_size), corpus[key]):
        sentences.append(tagger.parse(sentence).strip().split())
# sentences[-1]
# ['まあ', '日本', '単独', 'だ', 'と', '無理', 'か']

Wikipediaとツイートのデータを同じ数にしたいので少ないほうのデータのサイズに合わせました。sentencesにはWikipediaとツイートがそれぞれ742件のデータが形態素解析された状態で入っています。Word2Vecにはこのsentencesを渡して単語ベクトルを作ります。

訓練データとテストデータに分ける

SCDVの性能を測るために訓練データとテストデータに分けます。

Wikipediaは1、Twitterは0にラベル付けしました。

from sklearn.model_selection import train_test_split
# Wikipedia-> 1, Twitter-> 0 にラベル付けする。
train_x, test_x, train_t, test_t = train_test_split(sentences, [1]*corpus_size+[0]*corpus_size, test_size=0.1)

SCDVを作る

データの用意ができたので、これからSCDVを作ります。本家のプログラムではSCDVを作るコードは関数で定義されています。個人的に使いずらいなと感じたので、それらの関数をまとめてSCDVを作りやすいように?クラスで書き直してみました。ちょっと長いので書き直したコード(scdv.py)はこの記事の下のソースコードのところに置いておきます。

新しく書き直したコードを使えばSCDVの作成手順は大雑把に言えば次の通りです。

  1. Word2Vecを作る
  2. 確率重み付き単語ベクトルを求める
  3. SCDVを作る

より詳細な説明はこちら

これからこの記事の下に置いておいたscdv.pyを使ってSCDVを作っていきます。

1. Word2Vecを作る

さっき用意したsentencesを渡して次元が100の単語ベクトルを作っています。ちなみに、モデルはskip-gramモデルです。

from scdv import SparseCompositeDocumentVectors, build_word2vec
model = build_word2vec(sentences, 100, 5, 5, 1)

2. 確率重み付き単語ベクトルを求める

訓練データを使って作成した単語ベクトルから確率重み付き単語ベクトルを求めます。

今回は2クラスのテキスト分類なのでクラスタの数は2としました。

pname1 = "gmm_cluster.pkl"
pname2 = "gmm_prob_cluster.pkl"
vec = SparseCompositeDocumentVectors(model, 2, 100, pname1, pname2)
# 確率重み付き単語ベクトルを求める
vec.get_probability_word_vectors(train_x)

3. SCDVを求める。

確率重み付き単語ベクトルを求めたら訓練データとテストデータのSCDVをそれぞれ求めます。

# 訓練データからSCDVを求める
train_gwbowv = vec.make_gwbowv(train_x)
# テストデータからSCDVを求める 
test_gwbowv = vec.make_gwbowv(test_x)

テキスト分類

SCDVを求めることができたので、SCDVの性能をサポートベクターマシンを使って見てみます。

from sklearn.svm import SVC
# Test
clf = SVC(kernel="linear")
y_pred = clf.fit(train_gwbowv, train_t).predict(test_gwbowv)
result = sum(1*(p == t) for p, t in zip(y_pred.tolist(), test_t)) / len(test_t)
print("Accuracy: {:.2f}".format(result))

訓練データのSCDVとラベルをサポートベクターマシンの学習モデルに渡して学習し、テストデータのSCDVを学習モデルに渡して予測結果を出力させます。

結果

結果はこうなりました。

Accuracy: 0.99

かなり高いですね。Wikipediaの記事とTwitterのツイートがこんなに高い精度で分類できるとは思いませんでした。

ちなみにWord2VecのモデルをCBOWモデルにすると精度は少し悪くなりました。

ソースコード

この記事で紹介したデータを用意する部分はdataset.pyにまとめました。

main.pyでデータの読み込み、Word2Vecの作成、SCDVを求めてテキスト分類をしています。

scdv.py

import logging
import pickle
import numpy as np
from gensim.models.word2vec import Word2Vec
from tqdm import tqdm
from sklearn.mixture import GaussianMixture
from sklearn.feature_extraction.text import TfidfVectorizer


def build_word2vec(sentences, embedding_dim, min_count, window_size, sg):
    logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
    # モデルを作る
    model = Word2Vec(sentences, size=embedding_dim, min_count=min_count, window=window_size, sg=sg)
    return model


class SparseCompositeDocumentVectors:
    def __init__(self, model, num_clusters, embedding_dim, pname1, pname2):
        self.min_no = 0
        self.max_no = 0
        self.prob_wordvecs = {}
        self.model = model
        self.word_vectors = model.wv.syn0
        self.num_clusters = num_clusters
        self.num_features = embedding_dim
        self.pname1 = pname1
        self.pname2 = pname2

    def cluster_GMM(self):
        # Initalize a GMM object and use it for clustering.
        clf = GaussianMixture(
            n_components=self.num_clusters,
            covariance_type="tied",
            init_params="kmeans",
            max_iter=50
        )
        # Get cluster assignments.
        clf.fit(self.word_vectors)
        idx = clf.predict(self.word_vectors)
        print("Clustering Done...")
        # Get probabilities of cluster assignments.
        idx_proba = clf.predict_proba(self.word_vectors)
        # Dump cluster assignments and probability of cluster assignments.
        pickle.dump(idx, open(self.pname1, "wb"))
        print("Cluster Assignments Saved...")
        pickle.dump(idx_proba, open(self.pname2, "wb"))
        print("Probabilities of Cluster Assignments saved...")
        return (idx, idx_proba)

    def read_GMM(self):
        # Loads cluster assignments and probability of cluster assignments.
        idx = pickle.load(open(self.idx_name, "rb"))
        idx_proba = pickle.load(open(self.idx_proba_name, "rb"))
        print("Cluster Model Loaded...")
        return (idx, idx_proba)

    def get_probability_word_vectors(self, corpus):
        """
        corpus: list of lists of tokens
        """
        # This function computes probability word-cluster vectors.
        idx, idx_proba = self.cluster_GMM()

        # Create a Word / Index dictionary, mapping each vocabulary word
        # to a cluster number
        word_centroid_map = dict(zip(self.model.wv.index2word, idx))
        # Create Word / Probability of cluster assignment dictionary, mapping
        # each vocabulary word to list of probabilities of cluster assignments.
        word_centroid_prob_map = dict(zip(self.model.wv.index2word, idx_proba))

        # Comoputing tf-idf values
        tfv = TfidfVectorizer(dtype=np.float32)
        # transform corpus to get tfidf value
        corpus = [" ".join(data) for data in corpus]
        tfidfmatrix_traindata = tfv.fit_transform(corpus)
        featurenames = tfv.get_feature_names()
        idf = tfv._tfidf.idf_
        # Creating a dictionary with word mapped to its idf value
        print("Creating word-idf dictionary for dataset...")
        word_idf_dict = {}
        for pair in zip(featurenames, idf):
            word_idf_dict[pair[0]] = pair[1]

        for word in word_centroid_map:
            self.prob_wordvecs[word] = np.zeros(self.num_clusters * self.num_features, dtype="float32")
            for index in range(self.num_clusters):
                try:
                    self.prob_wordvecs[word][index*self.num_features:(index+1)*self.num_features] = \
                        self.model[word] * word_centroid_prob_map[word][index] * word_idf_dict[word]
                except:
                    continue
        self.word_centroid_map = word_centroid_map

    def create_cluster_vector_and_gwbowv(self, tokens, flag):
        # This function computes SDV feature vectors.
        bag_of_centroids = np.zeros(self.num_clusters * self.num_features, dtype="float32")
        for token in tokens:
            try:
                temp = self.word_centroid_map[token]
            except:
                continue
            bag_of_centroids += self.prob_wordvecs[token]
        norm = np.sqrt(np.einsum('...i,...i', bag_of_centroids, bag_of_centroids))
        if norm != 0:
            bag_of_centroids /= norm
        # To make feature vector sparse, make note of minimum and maximum values.
        if flag:
            self.min_no += min(bag_of_centroids)
            self.max_no += max(bag_of_centroids)
        return bag_of_centroids

    def plain_word2vec_document_vectors(self, tokens):
        bag_of_centroids = np.zeros(self.num_features, dtype="float32")
        for token in tokens:
            try:
                temp = self.model[token]
            except:
                continue
            bag_of_centroids += temp

        bag_of_centroids = bag_of_centroids / len(tokens)
        return bag_of_centroids

    def make_gwbowv(self, corpus, train=True):
        # gwbowv is a matrix which contains normalized document vectors.
        gwbowv = np.zeros((len(corpus), self.num_clusters*self.num_features)).astype(np.float32)
        cnt = 0
        for tokens in tqdm(corpus):
            gwbowv[cnt] = self.create_cluster_vector_and_gwbowv(tokens, train)
            cnt += 1
        return gwbowv

    def make_word2vec(self, corpus, model):
        self.model = model
        w2docv = np.zeros((len(corpus), self.num_features)).astype(np.float32)
        cnt = 0
        for tokens in tqdm(corpus):
            w2docv[cnt] = self.plain_word2vec_document_vectors(tokens)
            cnt += 1
        return w2docv

    def get_word2vec_document_vector(self, tokens):
        # tokens: list of tokens
        return self.plain_word2vec_document_vectors(tokens)

    def dump_gwbowv(self, gwbowv, path="gwbowv_matrix.npy", percentage=0.04):
        # Set the threshold percentage for making it sparse.
        min_no = self.min_no*1.0/gwbowv.shape[0]
        max_no = self.max_no*1.0/gwbowv.shape[0]
        print("Average min: ", min_no)
        print("Average max: ", max_no)
        thres = (abs(max_no) + abs(min_no))/2
        thres = thres * percentage
        # Make values of matrices which are less than threshold to zero.
        temp = abs(gwbowv) < thres
        gwbowv[temp] = 0
        np.save(path, gwbowv)
        print("SDV created and dumped...")

    def load_matrix(self, name):
        return np.load(name)

dataset.py

Twitterのツイートを読み込む部分は各自の環境に合わせて変えてください。

import MeCab
import codecs
import requests
from bs4 import BeautifulSoup
from sklearn.model_selection import train_test_split


# Wikipediaの記事とTwitterのツイートを入れる
corpus = {"Wikipedia": [], "Twitter": []}
def load_corpus():
    ### データの用意 ###
    # Wikipediaの記事をダウンロード
    url = "http://ja.wikipedia.org/wiki/"
    # 記事のリスト
    wiki_name_list = ["織田信長", "豊臣秀吉", "徳川家康",
                      "武田信玄", "上杉謙信", "北条氏康",
                      "今川義元", "島津義弘", "毛利元就",
                      "伊達政宗"]

    for wiki_name in wiki_name_list:
        # 指定したページの情報を取得
        response = requests.get(url+wiki_name)
        # HTMLフォーマットに変換
        html = response.text
        soup = BeautifulSoup(html, 'lxml')
        # pタグで囲まれた部分を抽出
        for p_tag in soup.find_all('p'):
            sentence = p_tag.text.strip()
            if sentence == '':
                continue
            corpus["Wikipedia"].append(sentence)
    # corpus["Wikipedia"][0]
    # '織田 信長(おだ のぶなが)は、戦国時代から安土桃山時代にかけての武将・戦国大名。三英傑の一人。'

    # Twitterのツイートを読み込む
    corpus["Twitter"] = codecs.open('positive.txt', 'r', 'utf-8').read().splitlines()
    # corpus["Twitter"][0]
    # '原口と槙野も先発だと期待してます。'
    tagger = MeCab.Tagger('-Owakati')
    sentences = []
    corpus_size = min(len(corpus["Wikipedia"]), len(corpus["Twitter"]))
    for key in ["Wikipedia", "Twitter"]:
        for _, sentence in zip(range(corpus_size), corpus[key]):
            sentences.append(tagger.parse(sentence).strip().split())
    # sentences[-1]
    # ['まあ', '日本', '単独', 'だ', 'と', '無理', 'か']

    # Wikipedia-> 1, Twitter-> 0 にラベル付けする。
    train_x, test_x, train_t, test_t = train_test_split(sentences, [1]*corpus_size+[0]*corpus_size, test_size=0.1)
    return sentences, train_x, test_x, train_t, test_t

main.py

import argparse
import dataset
from sklearn.svm import SVC
from scdv import SparseCompositeDocumentVectors, build_word2vec

def parse_args():
    parser = argparse.ArgumentParser(
        description="Word2VecとSCDVのパラメータの設定"
    )
    parser.add_argument(
        '--embedding_dim', type=int, default=100
    )
    parser.add_argument(
        '--min_count', type=int, default=5
    )
    parser.add_argument(
        '--window_size', type=int, default=5
    )
    parser.add_argument(
        '--sg', type=int, default=1
    )
    parser.add_argument(
        '--num_clusters', type=int, default=2
    )
    parser.add_argument(
        '--pname1', type=str, default="gmm_cluster.pkl"
    )
    parser.add_argument(
        '--pname2', type=str, default="gmm_prob_cluster.pkl"
    )

    return parser.parse_args()

def main(args):
    sentences, train_x, test_x, train_t, test_t = dataset.load_corpus()
    # Word2Vecを作る
    model = build_word2vec(
        sentences,
        args.embedding_dim,
        args.min_count,
        args.window_size,
        args.sg
    )
    vec = SparseCompositeDocumentVectors(
        model,
        args.num_clusters,
        args.embedding_dim,
        args.pname1,
        args.pname2
    )
    # 確率重み付き単語ベクトルを求める
    vec.get_probability_word_vectors(train_x)
    # 訓練データからSCDVを求める
    train_gwbowv = vec.make_gwbowv(train_x)
    # テストデータからSCDVを求める
    test_gwbowv = vec.make_gwbowv(test_x)

    # Test
    clf = SVC(kernel="linear")
    y_pred = clf.fit(train_gwbowv, train_t).predict(test_gwbowv)
    result = sum(1*(p == t) for p, t in zip(y_pred.tolist(), test_t)) / len(test_t)
    print("Accuracy: {:.2f}".format(result))

if __name__ == "__main__":
    main(parse_args())