解説
概要
異常検知は「正しい状態」を定めて、それからどれだけ乖離しているかで正常/異常を振り分けます。
つまり、正しい状態として時系列モデルを使ってしまい、そのモデルの予測値からの乖離をみて、正常/異常の判断を行います。
実装
以前の記事でARモデルは線形回帰モデルを利用していましたが、今回は王道としてstatsmodelsパッケージを利用しています。
例のごとくデータ生成は以前の記事に譲ります。
https://t.marufeuille.dev/entory/hotelings-theoryt.marufeuille.dev
データはこんな感じです。
import statsmodels.api as sm # モデル化&異常検知&可視化 fig, ax = plt.subplots(3, 2, figsize=(15, 10)) # データによってスケールがまちまちなので、閾値を個別に設定 threshs = [20, 16, 10, 5, 5, 5] for i in range(len(series)): s = series[i] model = sm.tsa.AR(s) LAG = 2 reg = model.fit(maxlag=LAG) predict = np.hstack([[s[j] for j in range(LAG)], reg.predict(start=LAG, end=N+LAG)]) anomaly = np.array([(s[j] - predict[j]) ** 2 for j in range(N)]) thresh = threshs[i] anomaly_idx = np.where(anomaly[:N] > thresh)[0] predict[0] = 0 predict[1] = 0 ax[i //2, i % 2].plot(np.arange(N), s, label="actual") ax[i //2, i % 2].plot(np.arange(N+LAG+1), predict, label="predict", color="orange") if len(anomaly_idx) != 0: anomaly_data = s[anomaly_idx] ax[i //2, i % 2].scatter(anomaly_idx, anomaly_data, label="anomaly", color="red", marker="x") ax[i //2, i % 2].legend()
こんな結果になります。
ARモデルは自己相関モデルなので、それできれいにモデリングできているものであればよく異常を察知できますし、逆にモデリングできないものであれば正常を異常と判断してしまっているところも多くなります。
まとめ
これまで、異常検知の色々なアルゴリズムについて実装しながら見てきました。
結局、万能な方法はないので、データの形を見ながら試していくしかないというオチではあるのですが、一通り試したことでどういった事ができるのかの大枠は掴めたかと思います。