えんじにあのじゆうちょう

勉強したことを中心にアウトプットしていきます。

コードで理解するPCAを用いた異常検知

はじめに

お仕事の都合もあり、O'Reilly Japan - Pythonではじめる教師なし学習をちょっと読んでいるのですが、PCAで異常検知ができるというところに興味を持ちました。
確かに、他の異常検知も基本は潜在表現を得て(例えばその自称が従う分布のパラメータ)それの95%範囲などから判断する、といったことをよくやるのでできるんだろうなぁ、と思いました。

一方で、そもそもPCA自体よく知らないなと思ったので、一度書いてみることにしました。

実装

まずはPCAの実装

まずは異常検知のことは忘れて、一旦PCAをわかりやすいデータでやってみようと思います。
わかりやすいというところで、mnistを使います。

from sklearn.datasets import fetch_openml
mnist = fetch_openml('mnist_784', version=1,)
x_train = mnist.data[:60000]
x_test = mnist.data[60000:]
y_train = mnist.target[:60000]
y_test = mnist.target[60000:]

こんな感じで、適当にtrainとtestでわけておきます。
なお、yは答え合わせというか、可視化に使うだけです。

PCAは圧縮する次元数を選択できますが、今回はグラフ化しやすさのため2次元に圧縮しようと思います。

import numpy as np
import matplotlib.pyplot as plt
import numpy.linalg as LA

# 共分散行列の計算
c = np.cov(x_train, rowvar=0, bias=0)
# 固有値、固有ベクトルの算出
w, v = LA.eigh(c)

m1 = np.argmax(w)
idx = [True] * w.shape[0]
idx[m1] = False
m2 = np.argmax(w[idx])

# 座標の変換
mean = np.mean(x_train, axis=0)
compressed = (x_train - mean).dot(np.hstack([v[:, m1][:, np.newaxis], v[:, m2][:, np.newaxis]]))

# 描画
for i in range(10):
    idx = (y_train.astype("int") == i)
    plt.scatter(compressed[idx, 0], compressed[idx, 1], label=str(i))
plt.legend()

f:id:marufeuillex:20200424211656p:plain
こんな感じでサクッとできてしまいます。

一応残しておいたテストデータでも試してみましょう。

compressed = (x_test - mean).dot(np.hstack([v[:, m1][:, np.newaxis], v[:, m2][:, np.newaxis]]))
for i in range(10):
    idx = (y_test.astype("int") == i)
    plt.scatter(compressed[idx, 0], compressed[idx, 1], label=str(i))
plt.legend()

f:id:marufeuillex:20200424211719p:plain

まあ良さそうですね。

異常検知

ではやってみましょう。
データはクレジットカードの取引情報で不正取引ラベルがついているデータです。(本の中で使っていたサンプルです)

https://github.com/aapatel09/handson-unsupervised-learning/raw/master/datasets/credit_card_data/credit_card.csv

やり方はPCAで圧縮後逆変換でもとに戻し、元のデータとの差分を取ります。
PCAは決められた次数の中で最もよくデータを表現する軸を残していくので、今回のデータで言うと不正取引は約30000件中400件ととても小さいので、不正取引に特徴があるのだとすればその特徴は圧縮の残す軸として選択されにくいだろうと考えられます。

そのため、逆変換後に差分が大きいものは異常であるというふうに言えます。

では実装します。

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import numpy.linalg as LA

# データの用意
df = pd.read_csv("credit_card.csv")
credit = df.to_numpy()
X = credit[:, 1:30]
Y = credit[:, 30]

X_train = X[:int(X.shape[0] * 0.95)]
X_test = X[int(X.shape[0] * 0.95):]
Y_train = Y[:int(X.shape[0] * 0.95)].astype("int")
Y_test = Y[int(X.shape[0] * 0.95):].astype("int")

# PCA変換
c = np.cov(X_train, rowvar=0, bias=0)
w, v = LA.eigh(c)
to_dim = 10 # ここを変えると次元数をいじれる
vs = []
idx = [True] * w.shape[0]
for i in range(to_dim):
    ch = np.argmax(w[idx])
    vs.append(v[ch][:, np.newaxis])
    idx[ch] = False
mean = np.mean(X_train, axis=0)
compressed = (X_train - mean).dot(np.hstack(vs))

# 逆変換
decompressed = compressed.dot(LA.pinv(np.hstack(vs))) + mean

# 評価 二乗和誤差で順番に並べる
rank = list(reversed(np.argsort(np.sum((X_train - decompressed) ** 2, axis=1), )))
error = np.sum((Y_train.astype("int")==1) * 1)

# ROC曲線の描画
cnt = 0
point = [0, 0]
roc = []
roc.append([0, 0])
for ridx in rank:
    if Y_train[ridx] == 1:
            point[1] = point[1] +1
            cnt = cnt + 1
    else:
        point[0] = point[0] +1
    roc.append([point[0], point[1]])
    if cnt == error:
        roc.append([Y_train.shape[0], point[1]])
        break
roc = np.array(roc)
plt.plot(roc[:, 0] / Y_train.shape, roc[:, 1] / error)
plt.plot([0,1], color='black', linestyle='dashed')

dim=10に圧縮した結果を見てみましょう。
f:id:marufeuillex:20200424212727p:plain
割といい感じですね。

では次にdim=29 つまり無圧縮のときどうなるか見てみましょう。
f:id:marufeuillex:20200424212155p:plain
圧縮したときに比べてとても微妙です。単純に圧縮しないということはそもそも全てを表現できてしまうということで全く分離ができないので、こういうことになってしまいます。。

まとめ

今回はPCAの実装から異常検知の方法についてまとめてみました。
あくまで一番単純なパターンのみですが、本を読むと色々なバリエーションがありますので、興味ある人は読んでみると良いかと思います。

個人的にはここからVAE, Convolutional VAEの画像での異常検知とかをここからやっていきたいな、という気持ちです。

最後に

頑張ってコード化終えて一眠りしたら、twitterで超わかりやすいものが流れてきました。
自分は数式的には正確に理解できていないので、その記載はこの記事ではしていませんので、そういった点に興味がある場合は読んで頂くと良いと思います。
僕はとても勉強になりました!(理解しきれてる自信はない・・・)
noppoman.github.io