Classifying the SMSSpamCollection with PyTorch (2)

Continuing from the previous post, this time I'll try spam classification using TF-IDF.

pytry3g.hatenablog.com

Preparation

import argparse
import codecs
import string
import numpy as np
from nltk import word_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as O
from torch.autograd import Variable


class Tfidf(nn.Module):
    def __init__(self, n_in, n_hidden, n_out):
        super(Tfidf, self).__init__()
        self.l1 = nn.Linear(n_in, n_hidden)
        self.l2 = nn.Linear(n_hidden, n_hidden)
        self.l3 = nn.Linear(n_hidden, n_out)

    def forward(self, x):
        h = F.relu(self.l1(x))
        h = F.relu(self.l2(h))
        # nn.CrossEntropyLoss applies log_softmax internally, so this extra
        # softmax is not strictly needed (raw logits would also work).
        y = F.softmax(self.l3(h))
        return y

def tokenizer(text):
    words = word_tokenize(text)
    return words

parser = argparse.ArgumentParser()
parser.add_argument('--batchsize', '-b', type=int, default=50,
                    help="Minibatch size")
parser.add_argument('--epoch', '-e', type=int, default=100,
                    help="Number of epochs")
parser.add_argument('--learning_rate', '-lr', type=float, default=0.1,
                    help="Learning rate")
parser.add_argument('--units', '-u', type=int, default=100,
                    help="Number of hidden units")
args = parser.parse_args()

TFIDF

As in the previous post, the data is split into texts and labels and normalized. The difference this time is that the text data is represented with TF-IDF, which is easy to do with scikit-learn.

# Create TF-IDF of text data
tfidf = TfidfVectorizer(tokenizer=tokenizer, stop_words="english",
                        max_features=1000)
sparse_tfidf = tfidf.fit_transform(text)
N, _ = sparse_tfidf.shape
# Split the data into a train set and a test set
train_indices = np.random.choice(N, round(0.8*N), replace=False)
test_indices = np.array(list(set(range(N)) - set(train_indices)))
train_x = sparse_tfidf[train_indices]
train_t = np.array([x for i, x in enumerate(label) if i in train_indices])
test_x = sparse_tfidf[test_indices]
test_t = np.array([x for i, x in enumerate(label) if i in test_indices])
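
fit_transform returns a SciPy sparse matrix, so before wiring it into PyTorch it helps to check its shape and the learned vocabulary, and to see how a few rows densify into a float tensor. A minimal sanity-check sketch, assuming the tfidf and sparse_tfidf variables above (get_feature_names() is the older scikit-learn API; recent versions use get_feature_names_out()):

# One row per message, at most 1000 columns because of max_features=1000
print(sparse_tfidf.shape)
print(len(tfidf.get_feature_names()))

# The model expects dense float tensors, so sparse rows get densified before use
sample = torch.FloatTensor(sparse_tfidf[:3].toarray())
print(sample.size())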

Training

The indices are shuffled with permutation and the model is trained in minibatches.

for epoch in range(epochs):
    if epoch % 10 == 0:
        print("Epoch {}".format(epoch))

    # Get random indices of train set
    indices = np.random.permutation([i for i in range(train_x.shape[0])])
    for i in range(0, len(indices), batchsize):
        start = i
        end = min(i+batchsize, len(indices))
        # Densify the sparse TF-IDF rows for this batch before wrapping them as tensors
        x = Variable(torch.FloatTensor([data.tolist()[0] for data in train_x[start:end].todense()]))
        t = Variable(torch.LongTensor(np.asarray(np.transpose(train_t[start:end]).astype(np.int64))))
        optimizer.zero_grad()
        y = model(x)
        loss = criterion(y, t)
        loss.backward()
        optimizer.step()
    history['loss'].append(loss.data[0])
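
The list comprehension over .todense() that builds x above is hard to read; it is equivalent to converting the sparse slice to a dense NumPy array and handing it to torch.FloatTensor. A more explicit sketch of building one minibatch, assuming the variables from the snippets above:

# Build one minibatch explicitly (equivalent to the conversion inside the loop)
start, end = 0, batchsize
dense_batch = train_x[start:end].toarray()   # shape: (batch, n_features)
x = Variable(torch.FloatTensor(dense_batch))
t = Variable(torch.LongTensor(train_t[start:end].astype(np.int64)))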

Results

0.86

The accuracy came out to 0.86. I expected TF-IDF to give a better result, but it ended up slightly worse than last time, and I'm not sure why. One thing worth checking is that forward() applies softmax even though nn.CrossEntropyLoss already applies log_softmax internally, so the loss is effectively computed on already-softmaxed probabilities rather than raw logits.
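
If that redundant softmax is part of the problem, the fix would be to return raw logits from forward(). A minimal sketch of that variant (the class name TfidfLogits is just for illustration; this is not the code that produced the 0.86 above):

class TfidfLogits(nn.Module):
    """Same network, but forward() returns raw logits for CrossEntropyLoss."""
    def __init__(self, n_in, n_hidden, n_out):
        super(TfidfLogits, self).__init__()
        self.l1 = nn.Linear(n_in, n_hidden)
        self.l2 = nn.Linear(n_hidden, n_hidden)
        self.l3 = nn.Linear(n_hidden, n_out)

    def forward(self, x):
        h = F.relu(self.l1(x))
        h = F.relu(self.l2(h))
        return self.l3(h)  # logits; CrossEntropyLoss applies log_softmax itself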

Code

import argparse
import codecs
import string
import numpy as np
from nltk import word_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as O
from torch.autograd import Variable


class Tfidf(nn.Module):
    def __init__(self, n_in, n_hidden, n_out):
        super(Tfidf, self).__init__()
        self.l1 = nn.Linear(n_in, n_hidden)
        self.l2 = nn.Linear(n_hidden, n_hidden)
        self.l3 = nn.Linear(n_hidden, n_out)

    def forward(self, x):
        h = F.relu(self.l1(x))
        h = F.relu(self.l2(h))
        # nn.CrossEntropyLoss applies log_softmax internally, so this extra
        # softmax is not strictly needed (raw logits would also work).
        y = F.softmax(self.l3(h))
        return y

def tokenizer(text):
    words = word_tokenize(text)
    return words

parser = argparse.ArgumentParser()
parser.add_argument('--batchsize', '-b', type=int, default=50,
                    help="Minibatch size")
parser.add_argument('--epoch', '-e', type=int, default=100,
                    help="Number of epochs")
parser.add_argument('--learning_rate', '-lr', type=float, default=0.1,
                    help="Learning rate")
parser.add_argument('--units', '-u', type=int, default=100,
                    help="Number of hidden units")
args = parser.parse_args()

path = "SMSSpamCollection"
with codecs.open(path, "r", "utf-8") as f:
    data = f.read().splitlines()

# Split the data into texts and labels (1 for spam, 0 for ham)
text = [d.split('\t')[1] for d in data]
label = [[0, 1][d.split('\t')[0] == "spam"] for d in data]

# Normalization
# Remove punctuation
text = ["".join(ch for ch in line if ch not in string.punctuation) for line in text]
# Change all characters to lowercase
text = [line.lower() for line in text]

# Create TF-IDF of text data
tfidf = TfidfVectorizer(tokenizer=tokenizer, stop_words="english",
                        max_features=1000)
sparse_tfidf = tfidf.fit_transform(text)
N, _ = sparse_tfidf.shape
# Split the data into a train set and a test set
train_indices = np.random.choice(N, round(0.8*N), replace=False)
test_indices = np.array(list(set(range(N)) - set(train_indices)))
train_x = sparse_tfidf[train_indices]
train_t = np.array([x for i, x in enumerate(label) if i in train_indices])
test_x = sparse_tfidf[test_indices]
test_t = np.array([x for i, x in enumerate(label) if i in test_indices])

batchsize = args.batchsize
epochs = args.epoch
learning_rate = args.learning_rate
n_in = train_x.shape[1]
n_hidden = args.units
n_out = 2
history = {"loss": []}

# Model
model = Tfidf(n_in, n_hidden, n_out)
# Loss
criterion = nn.CrossEntropyLoss()
# Optimizer
optimizer = O.Adam(model.parameters(), lr=learning_rate)

for epoch in range(epochs):
    if epoch % 10 == 0:
        print("Epoch {}".format(epoch))

    # Get random indices of train set
    indices = np.random.permutation([i for i in range(train_x.shape[0])])
    for i in range(0, len(indices), batchsize):
        start = i
        end = min(i+batchsize, len(indices))
        # Densify the sparse TF-IDF rows for this batch before wrapping them as tensors
        x = Variable(torch.FloatTensor([data.tolist()[0] for data in train_x[start:end].todense()]))
        t = Variable(torch.LongTensor(np.asarray(np.transpose(train_t[start:end]).astype(np.int64))))
        optimizer.zero_grad()
        y = model(x)
        loss = criterion(y, t)
        loss.backward()
        optimizer.step()
    history['loss'].append(loss.data[0])

# Evaluate accuracy on the test set
var = Variable(torch.FloatTensor([data.tolist()[0] for data in test_x.todense()]), requires_grad=False)
result = model(var)
predicted = torch.max(result, 1)[1]
print("{:.2f}".format(sum(p == t for p, t in zip(predicted.data, test_t)) / len(test_t)))


github.com