えんじにあのじゆうちょう

勉強したことを中心にアウトプットしていきます。

【異常検知】ホテリング理論による単純な実装

はじめに

異常検知はいろいろな方法がありますが、どれもよくわかっていませんでした。
まずは、調べた中で最も単純だと思われる「ホテリング理論」についてまとめてみます。


解説

概要

ぶっちゃけてしまうと、以下のAlbert社の記事がとてもとてもわかりやすいです。
www.albert2005.co.jp

そもそも異常検知は「正常」となる状態を定めてその正常から逸脱する場合に「異常」として扱うようなロジックとなります。
ホテリング理論の場合、正常となるデータの出現する確率分布を定義して、その分布から大きく逸脱する場合に異常値であると判定します。
また、確率分布として正規分布を仮定します。そのため、母平均を\mu, 標準偏差を\sigmaとすると、この事象の確率密度関数は以下の通り表されます。

 p(x|\mu, \sigma) = \frac{1}{\sqrt{2\pi\sigma^2}}e^{-\frac{(x-\mu)^2}{\sigma^2}}

これの対数を取ったものをアノマリ度として利用します。

 a(x) = log(p(x|\mu, \sigma))

これを整理すると、

 a(x) = \frac{(x-\mu)^2}{\sigma^2}

となりますが、これは自由度1のχ2乗分布に従います。

これについては、以下の記事がわかりやすいです。

bellcurve.jp

分布さえ定義できてしまえば、あとはχ2乗分布の分布に対して閾値となるp値を決定し、そのp値を上回るか下回るかで判断ができます。

やってみよう

それではコードを書いて実現してみます。
まずはデータを用意します。

import numpy as np
import matplotlib.pyplot as plt

N = 100
series = []

np.random.seed(5)
series1 = np.random.normal(loc=0, scale=1, size=(N,))
series1[66] = 7
series.append(series1)

np.random.seed(4)
series_a = np.random.normal(loc=0, scale=1, size=(N//2,))
series_b = np.random.normal(loc=5, scale=1, size=(N//2,))
series_b[6] = 0
series2 = np.hstack([series_a, series_b])
series.append(series2)

np.random.seed(4)
series_a = np.random.normal(loc=0, scale=1, size=(N//2,))
series_b = np.random.normal(loc=5, scale=1, size=(N//2,))
series_b[16] = 10
series3 = np.hstack([series_a, series_b])
series.append(series3)

def f(x):
    return 0.01 * x
np.random.seed(5)
base = f(np.arange(N))
series4 = base + np.random.normal(loc=0, scale=1, size=(N,))
series4[10] = 20
series4[15] = 20
series4[20] = 20
series4[25] = 20
series4[30] = 20
series.append(series4)

def sin(x):
    return 2 * np.sin(0.1 * x)
np.random.seed(5)
base = sin(np.arange(N))
series5 = base + np.random.normal(loc=0, scale=0.5, size=(N,))
series5[16] = 10
series.append(series5)

def f(x_t1, x_t2, x_t3):
    return 1.3 * x_t1 - 0.4 * x_t2  + np.random.normal(loc=0, scale=1)
np.random.seed(5)
series6 = np.zeros((N,))
series6[0] = np.random.normal(1, 1)
series6[1] = np.random.normal(2.0, 1)
for i in range(2, N):
    series6[i] = f(series6[i-1],series6[i-2],series6[i-3])
series.append(series6)

fig, ax = plt.subplots(3, 2, figsize=(15, 10))
for i in range(NUM_SERIES):
    ax[i //2, i % 2].plot(np.arange(N), series[i], label="actual")
    ax[i //2, i % 2].legend()

f:id:marufeuillex:20191214205628p:plain

適当にいろいろな種類用意しています。
ちなみに今回は1~3までくらいしか使わないのですが、他の実験もやってる都合6種類作っています。
それについてはまた後日書きます。。。

それぞれの分布について \mu \sigmaを求めます。

mu = np.mean(series, axis=1)
sigma = np.std(series, axis=1)

あとはこの母数を元に、自由度1のχ2乗分布を見ればよいわけです。
ここでは、異常とみなす確率を0.5%として計算してみます。

# アノマリ値を計算
def a(x, mu, sigma):
    return ((x - mu) / sigma) ** 2

result = []
for i in range(len(series)):
    tmp = []
    s = series[i]
    m = mu[i]
    sig = sigma[i]
    for x in s:
        tmp.append(scipy.stats.chi2.cdf(a(x, m, sig), df=1))
    result.append(tmp)

# 発生確率がP%以下のものをアノマリとして扱う
P = 0.5
anomaly_thresh = 1 - (P/100)
anomaly_idx = np.array(np.where(np.array(result) > anomaly_thresh))

# 可視化
fig, ax = plt.subplots(3, 2, figsize=(15, 10))
for i in range(NUM_SERIES):
    anomaly_data = np.array(series[i])[anomaly_idx.T[anomaly_idx[0] == i][:, 1]]
    ax[i //2, i % 2].plot(np.arange(N), series[i], label="actual")
    ax[i //2, i % 2].scatter(anomaly_idx.T[anomaly_idx[0] == i][:, 1], anomaly_data, color="red", marker="x", label="anomaly")
    ax[i //2, i % 2].legend()

f:id:marufeuillex:20191214210335p:plain

非常に単純な実装ができました。

うまくいくケース、いかないケース

これも前述のAlbert社のブログにしっかり書かれているので、それが詳しいですが改めて。

1番目の絵のようなさも正規分布で表されそうなものであればうまく検知できます。
一方で、2番目、3番目のように途中で大きく\muの値が変化するようなケースでは正しく検知することはできません。
ホテリング理論は実装も単純で良いのですが、データの変動が大きい場合はあまりうまくいかないことに注意しなくてはいけません。

まとめ

今回は異常検知で最も単純そうなホテリング理論について、Pythonでの実装を通して学習しました。
単純な方法ですが、場面を見極めれば十分使える思うので、しっかり理解しておきましょう。