えんじにあのじゆうちょう

勉強したことを中心にアウトプットしていきます。

kubeflowで機械学習パイプラインを試してみる(処理コンテナ実装編)

はじめに

前回に引き続き、Kubeflow pipelineを使って機械学習パイプラインを組んで見る第2段です。

今回は、pipeline処理を行うコンテナと処理を1つ作ってみて、雰囲気に触れてみようと思います。

前回の記事はこちら
t.marufeuille.dev

やってみる

対象とするデータ

今回は学習の前にデータの前処理を行うコンテナを作成することにしてみます。
対象データは個人的に慣れているこれにします。
archive.ics.uci.edu

インフラ的な構成

Kubeflowは前回作ったものを使いますが、ファイルの置き場は外部でアクセス可能なところにする必要があります。
ローカルになにか用意しても良かったのですが、仕事の兼ね合いもありAlibabaCloudがサクッと触れる状態だったので、AlibabaCloudのObjectStorageService上にinput / outputファイルを置くこととします。
(というよりも、ローカルのメモリをほぼkubeflowが食いつぶしていて、ローカルにおけなかった・・・)
絵で書くとこんな感じです。

f:id:marufeuillex:20200308094414p:plain

Dockerfileの作成

今回はpythonさえ使えればよいので、公式のpythonイメージを利用します。
あとは追加で必要なoss2(AlibabaCloud OSSのSDK)を追加し、後段で作るprocess.pyをコピーしておくようにしておきます。

FROM python:latest
RUN apt -y update && apt -y upgrade && apt -y install python-dev && mkdir /ml
RUN pip3 install oss2 pandas numpy
COPY ./src/process.py /ml/process.py
ENTRYPOINT [ "python3", "/ml/process.py" ]

処理スクリプトの作成

今回はデモなので処理の中身は超ザックリで、

  1. train(80%)とtest(20%)に分割
  2. セパレータのセミコロンをカンマに変換
  3. 文字列カラムは一律onehot encoding

という処理を行うことにします。

処理スクリプトは基本的に外部から必要なパラメータを渡すようにします。
また、一時的ファイルは/tmpに置くことにしました。
(ちょっと冗長な書き方になっています。途中確認の都合なので、ご容赦を。。。)

import argparse
import logging
import oss2
import pandas as pd
import numpy as np

def main(argv=None):
    # 引数を解析
    parser = argparse.ArgumentParser(description='Preprocessor')
    parser.add_argument('--filename', type=str, help='Input file path on Aliyun ObjectStorage', required=True)
    parser.add_argument('--output', type=str, help='Output file dir path on Aliyun ObjectStorage', required=True)
    parser.add_argument('--accesskey', type=str, help='Aliyun AccessKey', required=True)
    parser.add_argument('--accesskeysecret', type=str, help='Aliyun AccessKeySecret', required=True)
    parser.add_argument('--endpoint', type=str, help='Aliyun OSS Endpoint', required=True)
    parser.add_argument('--bucketname', type=str, help='Aliyun OSS bucket name', required=True)
    args = parser.parse_args()

    # 表示するLogLevelの指定
    logging.getLogger().setLevel(logging.INFO)

    # OSSからデータの取得
    auth = oss2.Auth(args.accesskey, args.accesskeysecret)
    logging.info('Pulling file...')
    bucket = oss2.Bucket(auth, args.endpoint, args.bucketname)
    bucket.get_object_to_file(args.filename, '/tmp/input.csv')
    logging.info('Pulling file completed.')

    logging.info('Process file...')
    # ; -> ,
    with open('/tmp/input.csv') as f:
        contents = f.read().replace(";", ",")
        with open('/tmp/replaced.csv', 'w') as f2:
            f2.write(contents)
    df = pd.read_csv('/tmp/replaced.csv')

    # ダミー化
    df_r = pd.get_dummies(df, columns=[
            "job","marital","education","default",
            "housing","loan","contact", "month",
            "poutcome"
        ],
        drop_first=True
    )
    df_r = df_r.replace({'y': {"no": 0, "yes": 1}})

    # 分割
    np.random.seed(10)
    idx = np.random.randint(0, 10, (df_r.shape[0]))
    df_r[idx < 8].to_csv("/tmp/train.csv")
    df_r[idx >= 8].to_csv("/tmp/test.csv")
    
    # アップロード
    logging.info('Pushing file...')
    bucket.put_object_from_file(args.output + 'train.csv', '/tmp/train.csv')
    bucket.put_object_from_file(args.output + 'test.csv', '/tmp/test.csv')
    logging.info('Pushing file completed.')

if __name__== "__main__":
  main()
コンテナのビルド

TAG_NAMEは適切なものに置き換えてください。

docker build -t TAG_NAME .
実行

大文字になっているところは適切なものに置き換えてください。
また、endpointは日本リージョンのエンドポイントを指していますので、必要に応じて書き換えてください。

docker run -it TAG_NAME --filename FILEPATH_ON_OSS --output datasets/bank-marketing/ --bucketname BUCKET_NAME --endpoint oss-ap-northeast-1.aliyuncs.com --accesskey ACCESSKEY --accesskeysecret ACCESSKEYSECRET

うまくいくと以下の様に出力され、OSSにファイルが2つアップロードされますので確認しましょう。

INFO:root:Pulling file...
INFO:root:Pulling file completed.
INFO:root:Process file...
INFO:root:Pushing file...
INFO:root:Pushing file completed.

kubeflow pipelineに載せる

コンテナを公開する

コンテナイメージはkubeflowを利用するkubernetesからアクセスできないといけないので、適当なコンテナレジストリに登録する必要があります。
今回は、AlibabaCloudのContainerRegistryを利用します。

ContainerRegistryの使い方自体はここでは書きませんが、以下の様に作成したものとします。

namespace mykfp
repogitory name preprocessor-bm

まずはコンテナレジストリにログインします。

docker login --username=USERNAME registry-intl.ap-northeast-1.aliyuncs.com

パスワードはコンテナレジストリ作成時に指定したものですが、忘れてしまった場合は初期化もできます。

そしてコンテナをpushします。
TAGNAMEは上の方で指定したものです。

docker tag TAGNAME registry-intl.ap-northeast-1.aliyuncs.com/mykfp/preprocessor-bm:latest
docker push registry-intl.ap-northeast-1.aliyuncs.com/mykfp/preprocessor-bm:latest

ちなみにここの操作は公式マニュアルではbuild_image.shというスクリプトとしてまとめています。
その方が圧倒的に楽なので、そちらを使いましょう。また、latestタグ運用は何かとミスが起きやすくなるので、試験とはいえバージョンつけておくほうが何かと便利です。

#!/bin/bash -e
image_name=YOUR_IMAGE_NAME_AT_REPOGITORY
image_tag=0.0.1
full_image_name=${image_name}:${image_tag}

cd "$(dirname "$0")" 
docker build -t "${full_image_name}" .
docker push "$full_image_name"

docker inspect --format="{{index .RepoDigests 0}}" "${IMAGE_NAME}"

コンポーネントの作成

それではこのDockerを使ったcomponent.yamlを作成していきましょう。

name: My Preprocess task
description: My first work
inputs:
    - {name: filename}
    - {name: accesskey}
    - {name: accesskeysecret}
    - {name: endpoint}
    - {name: bucketname}
    - {name: output}
implementation:
    container:
        image: YOUR_IMAGE_NAME:0.0.1(VERSION)
        command: [
            python3, /ml/process.py,
            --filename, {inputValue: filename},
            --accesskey, {inputValue: accesskey},
            --accesskeysecret, {inputValue: accesskeysecret},
            --endpoint, {inputValue: endpoint},
            --bucketname, {inputValue: bucketname},
            --output, {inputValue: output}
    ]

詳しい解説は公式ドキュメントを読んでいただければわかると思いますが、ざっくり解説すると、inputsで入力値の一覧を作ります。
それをimpelemtation > container > command内にある各引数として宣言します。
{inputValue: filename}とか書いてありますが、これはそのまま引数として与えられたものを使うという宣言です。
inputPathというものもありますが、その場合はPathにあるファイル等からデータを引っ張ってくるみたいです(未検証)

さて、このファイルはkubeflowからアクセスできるところになくてはいけませんが、いいところがないので、またAlibabaCloudを使います。
ObjectStorageServiceにアップロードします。AWSであればS3、GCPであればCloud Storage等利用すればいいと思います。

f:id:marufeuillex:20200314152442p:plain

これで外部からcomponentが利用できるようになりました。

パイプラインの作成

最後にパイプラインを作成していきます。
パイプラインはまずpython(DSL)で定義し、dsl-compileでコンパイルすることで完成します。

import kfp
from kfp import components
from kfp import dsl

preprocess_op = components.load_component_from_url(
    'https://ishii-test-upload.oss-ap-northeast-1.aliyuncs.com/kfp/components/component.yaml'
)

@dsl.pipeline(
    name='Preprocess only pipeline',
    description='This pipeline preprocess the data.'
)
def my_pipeline(
    bucket_name='{{oss-bucketname}}',
    output_dir_path='{{output-dir}}',
    input_file_path='{{raw-filename}}',
    oss_endpoint='oss-ap-northeast-1.aliyuncs.com',
    access_key='{{accesskey}}',
    access_key_secret='{{accesskeysecret}}'
):
    _prepro_task = preprocess_op(
        filename=input_file_path,
        output=output_dir_path,
        bucketname=bucket_name,
        endpoint=oss_endpoint,
        accesskey=access_key,
        accesskeysecret=access_key_secret
    )
if __name__ == '__main__':
    kfp.compiler.Compiler().compile(my_pipeline, arguments={})

@dsl.pipelineをつけた関数はpipelineとして扱える関数になるので、それをコンパイルの引数に渡します。
その関数の引数に書いたものはパイプラインを登録するときにGUIから指定できるパラメータになります。

今回は、このあたりをパラメータとして扱っています。

bucket_name OSSのバケット名
output_dir_path outputを置くパス
input_file_path inputファイルの置いてあるパス
oss_endpoint OSSのエンドポイント(defaultはoss-ap-northeast-1.aliyuncs.com = tokyoリージョンを指定)
access_key アクセスキー
access_key_secret アクセスキーシークレット

そしてコンパイルします。

dsl-compile --py pipeline.py --output pipeline.tar.gz

これができたらほぼ準備は完了なのですが、今回はプライベートリポジトリの都合上、もう1つ設定が必要です。

kubectlでprivate registry利用の設定

今回構築を行うdockerとkubeflow(k8s)が動いているdockerは別物なので、kubernetes側からプライベートリポジトリを参照するための設定が必要です。
参考の4つめのブログに設定の仕方が書かれていましたので、ほぼそのまま実行しました。

kubectl create secret docker-registry regcred --docker-server=コンテナレジストリのエンドポイント --docker-username=ユーザ名 --docker-password=おパスワード --docker-email=email-address@address -n kubeflow
kubectl patch serviceaccount default -p '{\"imagePullSecrets\": [{\"name\": \"regcred\"}]}' -n kubeflow

pipelineのアップロード&実行

まずはpipelineをアップロードしましょう。
これはkubeflowのGUIにアクセスして、Uploadを押してファイルを選択するくらいです。

f:id:marufeuillex:20200314153854p:plain

f:id:marufeuillex:20200314154155p:plain

f:id:marufeuillex:20200314154306p:plain


ここではあまりにも見栄えしないので載せませんが、作成されたパイプラインをクリックすると、GUIベースでパイプラインを確認することもできます。

次にExperimentsを作って、その中にいまのpipelineを配置します。
f:id:marufeuillex:20200314154549p:plain

f:id:marufeuillex:20200314155046p:plain

f:id:marufeuillex:20200315094159p:plain

f:id:marufeuillex:20200315094215p:plain

実行すると以下のような画面になるので、Refreshを押しながら少しまちます。
f:id:marufeuillex:20200314155136p:plain

以下のような画面になれば完了です!!
f:id:marufeuillex:20200314155308p:plain

outputを指定したオブジェクトストレージにデータが格納されていることも確認してみてください。

ちなみに、実行後のrunをクリックするとこんな感じで実行結果の確認も可能です。

f:id:marufeuillex:20200314155626p:plain

まとめ

今回は自分でコンポーネントを作成して処理を流すところまでを書いてみました。
あまり意味のないコンポーネントですが、概ね作成のフローはわかったように思います。

次はたぶん新しいことはあまりない気はしますが、学習&予測をパイプラインに組み込んだものを作ってみようと思います。

今回のコードはこちらにおいています。
github.com

おまけ

今回はVagrantつまり、Virtualbox上の仮想マシンに環境がありました。
1回で終わってしまえばあまり関係ないですが、何回にも分けて、つまり途中vagrant suspendしていたりすると、深刻なほど時間がずれていきます。
その時は、vagrant sshで仮想マシンにログインした上で、以下のコマンドを実行すれば時刻をサクッと合わせられます。

例えば2020/3/15の9時10分に合わせたい場合であれば、

sudo date --set='2020/03/15 09:10:00'

とします。

参考にしたもの

公式のXGBoostトレーニング用のコード
github.com

XGBoostのデモのDockerfile
github.com

pipelineの書き方
github.com

kubectlを使ってプライベートリポジトリを設定する方法
medium.com