
Classifying SMSSpamCollection with PyTorch (2)

Continuing from the previous post, this time I try spam classification using TF-IDF.

pytry3g.hatenablog.com

Preparation

import argparse
import codecs
import string
import numpy as np
from nltk import word_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as O
from torch.autograd import Variable


class Tfidf(nn.Module):
    def __init__(self, n_in, n_hidden, n_out):
        super(Tfidf, self).__init__()
        self.l1 = nn.Linear(n_in, n_hidden)
        self.l2 = nn.Linear(n_hidden, n_hidden)
        self.l3 = nn.Linear(n_hidden, n_out)

    def forward(self, x):
        h = F.relu(self.l1(x))
        h = F.relu(self.l2(h))
        # nn.CrossEntropyLoss applies log-softmax internally, so returning
        # the raw l3 output would be the more standard choice here
        y = F.softmax(self.l3(h), dim=1)
        return y

def tokenizer(text):
    words = word_tokenize(text)
    return words

parser = argparse.ArgumentParser()
parser.add_argument('--batchsize', '-b', type=int, default=50,
                    help="Number of examples in each mini-batch")
parser.add_argument('--epoch', '-e', type=int, default=100,
                    help="Number of training epochs")
parser.add_argument('--learning_rate', '-lr', type=float, default=0.1,
                    help="Learning rate for the optimizer")
parser.add_argument('--units', '-u', type=int, default=100,
                    help="Number of hidden units")
args = parser.parse_args()

TF-IDF

As in the previous post, the data is split into texts and labels and then normalized. The difference this time is that the text data is represented as TF-IDF vectors, which is easy to do with scikit-learn.

# Create TF-IDF of text data
tfidf = TfidfVectorizer(tokenizer=tokenizer, stop_words="english",
                        max_features=1000)
sparse_tfidf = tfidf.fit_transform(text)
N, _ = sparse_tfidf.shape
# Split data into train and test sets
train_indices = np.random.choice(N, round(0.8*N), replace=False)
test_indices = np.array(list(set(range(N)) - set(train_indices)))
train_x = sparse_tfidf[train_indices]
train_t = np.array([x for i, x in enumerate(label) if i in train_indices])
test_x = sparse_tfidf[test_indices]
test_t = np.array([x for i, x in enumerate(label) if i in test_indices])
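As a quick sanity check of the vectorizer (my own aside, using the variables above), the shape of the sparse matrix and the size of the kept vocabulary can be printed; the number of columns is what the model's input size has to match later:

# Sparse matrix of shape (number of messages, number of features), at most 1000 columns
print(sparse_tfidf.shape)
# Vocabulary actually kept by the vectorizer after stop-word removal (<= max_features)
print(len(tfidf.vocabulary_))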

Training

Shuffle the training indices with np.random.permutation and train on mini-batches.

for epoch in range(epochs):
    if epoch % 10 == 0:
        print("Epoch {}".format(epoch))

    # Get random indices of train set
    indices = np.random.permutation([i for i in range(train_x.shape[0])])
    for i in range(0, len(indices), batchsize):
        start = i
        end = min(i+batchsize, len(indices))
        x = Variable(torch.FloatTensor([data.tolist()[0] for data in train_x[start:end].todense()]))
        t = Variable(torch.LongTensor(np.asarray(np.transpose(train_t[start:end]).astype(np.int64))))
        optimizer.zero_grad()
        y = model(x)
        loss = criterion(y, t)
        loss.backward()
        optimizer.step()
    history['loss'].append(loss.data[0])
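As a side note (not in the original code), the list comprehension that densifies each mini-batch can also be written with toarray(); a small equivalent sketch, using the same variable names inside the batch loop:

# Densify the scipy sparse slice and wrap it as a FloatTensor Variable
x = Variable(torch.from_numpy(train_x[start:end].toarray()).float())
# Labels of the same slice as a LongTensor Variable
t = Variable(torch.from_numpy(train_t[start:end].astype(np.int64)))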

Results

0.86

The accuracy came out to 0.86. I expected TF-IDF to give a better result, but it ended up slightly worse than last time. I'm not sure why.
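For reference, this figure is the test-set accuracy. It is computed as in the full listing below; an equivalent compact version using toarray() looks like this:

# Predict a class for every test message and compare with the true labels
test_var = Variable(torch.FloatTensor(test_x.toarray()), requires_grad=False)
predicted = torch.max(model(test_var), 1)[1]
accuracy = sum(int(p == t) for p, t in zip(predicted.data, test_t)) / len(test_t)
print("{:.2f}".format(accuracy))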

Full code

import argparse
import codecs
import string
import numpy as np
from nltk import word_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as O
from torch.autograd import Variable


class Tfidf(nn.Module):
    def __init__(self, n_in, n_hidden, n_out):
        super(Tfidf, self).__init__()
        self.l1 = nn.Linear(n_in, n_hidden)
        self.l2 = nn.Linear(n_hidden, n_hidden)
        self.l3 = nn.Linear(n_hidden, n_out)

    def forward(self, x):
        h = F.relu(self.l1(x))
        h = F.relu(self.l2(h))
        # nn.CrossEntropyLoss applies log-softmax internally, so returning
        # the raw l3 output would be the more standard choice here
        y = F.softmax(self.l3(h), dim=1)
        return y

def tokenizer(text):
    words = word_tokenize(text)
    return words

parser = argparse.ArgumentParser()
parser.add_argument('--batchsize', '-b', type=int, default=50,
                    help="Number of examples in each mini-batch")
parser.add_argument('--epoch', '-e', type=int, default=100,
                    help="Number of training epochs")
parser.add_argument('--learning_rate', '-lr', type=float, default=0.1,
                    help="Learning rate for the optimizer")
parser.add_argument('--units', '-u', type=int, default=100,
                    help="Number of hidden units")
args = parser.parse_args()

path = "SMSSpamCollection"
with codecs.open(path, "r", "utf-8") as f:
    data = f.read().splitlines()

# Split each line into its text and label (ham -> 0, spam -> 1)
text = [d.split('\t')[1] for d in data]
label = [[0, 1][d.split('\t')[0] == "spam"] for d in data]

# Normalization
# Remove punctuation
text = ["".join(ch for ch in line if ch not in string.punctuation) for line in text]
# Change all characters to lowercase
text = [line.lower() for line in text]

# Create TF-IDF of text data
tfidf = TfidfVectorizer(tokenizer=tokenizer, stop_words="english",
                        max_features=1000)
sparse_tfidf = tfidf.fit_transform(text)
N, _ = sparse_tfidf.shape
# Split data into train and test sets
train_indices = np.random.choice(N, round(0.8*N), replace=False)
test_indices = np.array(list(set(range(N)) - set(train_indices)))
train_x = sparse_tfidf[train_indices]
train_t = np.array([x for i, x in enumerate(label) if i in train_indices])
test_x = sparse_tfidf[test_indices]
test_t = np.array([x for i, x in enumerate(label) if i in test_indices])

batchsize = args.batchsize
epochs = args.epoch
learning_rate = args.learning_rate
n_in = train_x.shape[1]
n_hidden = args.units
n_out = 2
history = {"loss": []}

# Model
model = Tfidf(n_in, n_hidden, n_out)
# Loss
criterion = nn.CrossEntropyLoss()
# Optimizer
optimizer = O.Adam(model.parameters(), lr=learning_rate)

for epoch in range(epochs):
    if epoch % 10 == 0:
        print("Epoch {}".format(epoch))

    # Get random indices of train set
    indices = np.random.permutation([i for i in range(train_x.shape[0])])
    for i in range(0, len(indices), batchsize):
        start = i
        end = min(i+batchsize, len(indices))
        x = Variable(torch.FloatTensor([data.tolist()[0] for data in train_x[start:end].todense()]))
        t = Variable(torch.LongTensor(np.asarray(np.transpose(train_t[start:end]).astype(np.int64))))
        optimizer.zero_grad()
        y = model(x)
        loss = criterion(y, t)
        loss.backward()
        optimizer.step()
    history['loss'].append(loss.data[0])

# Evaluate the trained model on the whole test set
var = Variable(torch.FloatTensor([data.tolist()[0] for data in test_x.todense()]), requires_grad=False)
result = model(var)
# The predicted class is the index of the larger of the two output scores
predicted = torch.max(result, 1)[1]
print("{:.2f}".format(sum(p == t for p, t in zip(predicted.data, test_t)) / len(test_t)))


github.com