はじめに
前回に引き続き、Kubeflow pipelineを使って機械学習パイプラインを組んで見る第2段です。
今回は、pipeline処理を行うコンテナと処理を1つ作ってみて、雰囲気に触れてみようと思います。
前回の記事はこちら
t.marufeuille.dev
やってみる
対象とするデータ
今回は学習の前にデータの前処理を行うコンテナを作成することにしてみます。
対象データは個人的に慣れているこれにします。
archive.ics.uci.edu
インフラ的な構成
Kubeflowは前回作ったものを使いますが、ファイルの置き場は外部でアクセス可能なところにする必要があります。
ローカルになにか用意しても良かったのですが、仕事の兼ね合いもありAlibabaCloudがサクッと触れる状態だったので、AlibabaCloudのObjectStorageService上にinput / outputファイルを置くこととします。
(というよりも、ローカルのメモリをほぼkubeflowが食いつぶしていて、ローカルにおけなかった・・・)
絵で書くとこんな感じです。
Dockerfileの作成
今回はpythonさえ使えればよいので、公式のpythonイメージを利用します。
あとは追加で必要なoss2(AlibabaCloud OSSのSDK)を追加し、後段で作るprocess.pyをコピーしておくようにしておきます。
FROM python:latest RUN apt -y update && apt -y upgrade && apt -y install python-dev && mkdir /ml RUN pip3 install oss2 pandas numpy COPY ./src/process.py /ml/process.py ENTRYPOINT [ "python3", "/ml/process.py" ]
処理スクリプトの作成
今回はデモなので処理の中身は超ザックリで、
- train(80%)とtest(20%)に分割
- セパレータのセミコロンをカンマに変換
- 文字列カラムは一律onehot encoding
という処理を行うことにします。
処理スクリプトは基本的に外部から必要なパラメータを渡すようにします。
また、一時的ファイルは/tmpに置くことにしました。
(ちょっと冗長な書き方になっています。途中確認の都合なので、ご容赦を。。。)
import argparse import logging import oss2 import pandas as pd import numpy as np def main(argv=None): # 引数を解析 parser = argparse.ArgumentParser(description='Preprocessor') parser.add_argument('--filename', type=str, help='Input file path on Aliyun ObjectStorage', required=True) parser.add_argument('--output', type=str, help='Output file dir path on Aliyun ObjectStorage', required=True) parser.add_argument('--accesskey', type=str, help='Aliyun AccessKey', required=True) parser.add_argument('--accesskeysecret', type=str, help='Aliyun AccessKeySecret', required=True) parser.add_argument('--endpoint', type=str, help='Aliyun OSS Endpoint', required=True) parser.add_argument('--bucketname', type=str, help='Aliyun OSS bucket name', required=True) args = parser.parse_args() # 表示するLogLevelの指定 logging.getLogger().setLevel(logging.INFO) # OSSからデータの取得 auth = oss2.Auth(args.accesskey, args.accesskeysecret) logging.info('Pulling file...') bucket = oss2.Bucket(auth, args.endpoint, args.bucketname) bucket.get_object_to_file(args.filename, '/tmp/input.csv') logging.info('Pulling file completed.') logging.info('Process file...') # ; -> , with open('/tmp/input.csv') as f: contents = f.read().replace(";", ",") with open('/tmp/replaced.csv', 'w') as f2: f2.write(contents) df = pd.read_csv('/tmp/replaced.csv') # ダミー化 df_r = pd.get_dummies(df, columns=[ "job","marital","education","default", "housing","loan","contact", "month", "poutcome" ], drop_first=True ) df_r = df_r.replace({'y': {"no": 0, "yes": 1}}) # 分割 np.random.seed(10) idx = np.random.randint(0, 10, (df_r.shape[0])) df_r[idx < 8].to_csv("/tmp/train.csv") df_r[idx >= 8].to_csv("/tmp/test.csv") # アップロード logging.info('Pushing file...') bucket.put_object_from_file(args.output + 'train.csv', '/tmp/train.csv') bucket.put_object_from_file(args.output + 'test.csv', '/tmp/test.csv') logging.info('Pushing file completed.') if __name__== "__main__": main()
コンテナのビルド
TAG_NAMEは適切なものに置き換えてください。
docker build -t TAG_NAME .
実行
大文字になっているところは適切なものに置き換えてください。
また、endpointは日本リージョンのエンドポイントを指していますので、必要に応じて書き換えてください。
docker run -it TAG_NAME --filename FILEPATH_ON_OSS --output datasets/bank-marketing/ --bucketname BUCKET_NAME --endpoint oss-ap-northeast-1.aliyuncs.com --accesskey ACCESSKEY --accesskeysecret ACCESSKEYSECRET
うまくいくと以下の様に出力され、OSSにファイルが2つアップロードされますので確認しましょう。
INFO:root:Pulling file... INFO:root:Pulling file completed. INFO:root:Process file... INFO:root:Pushing file... INFO:root:Pushing file completed.
kubeflow pipelineに載せる
コンテナを公開する
コンテナイメージはkubeflowを利用するkubernetesからアクセスできないといけないので、適当なコンテナレジストリに登録する必要があります。
今回は、AlibabaCloudのContainerRegistryを利用します。
ContainerRegistryの使い方自体はここでは書きませんが、以下の様に作成したものとします。
namespace | mykfp |
repogitory name | preprocessor-bm |
まずはコンテナレジストリにログインします。
docker login --username=USERNAME registry-intl.ap-northeast-1.aliyuncs.com
パスワードはコンテナレジストリ作成時に指定したものですが、忘れてしまった場合は初期化もできます。
そしてコンテナをpushします。
TAGNAMEは上の方で指定したものです。
docker tag TAGNAME registry-intl.ap-northeast-1.aliyuncs.com/mykfp/preprocessor-bm:latest docker push registry-intl.ap-northeast-1.aliyuncs.com/mykfp/preprocessor-bm:latest
ちなみにここの操作は公式マニュアルではbuild_image.shというスクリプトとしてまとめています。
その方が圧倒的に楽なので、そちらを使いましょう。また、latestタグ運用は何かとミスが起きやすくなるので、試験とはいえバージョンつけておくほうが何かと便利です。
#!/bin/bash -e image_name=YOUR_IMAGE_NAME_AT_REPOGITORY image_tag=0.0.1 full_image_name=${image_name}:${image_tag} cd "$(dirname "$0")" docker build -t "${full_image_name}" . docker push "$full_image_name" docker inspect --format="{{index .RepoDigests 0}}" "${IMAGE_NAME}"
コンポーネントの作成
それではこのDockerを使ったcomponent.yamlを作成していきましょう。
name: My Preprocess task description: My first work inputs: - {name: filename} - {name: accesskey} - {name: accesskeysecret} - {name: endpoint} - {name: bucketname} - {name: output} implementation: container: image: YOUR_IMAGE_NAME:0.0.1(VERSION) command: [ python3, /ml/process.py, --filename, {inputValue: filename}, --accesskey, {inputValue: accesskey}, --accesskeysecret, {inputValue: accesskeysecret}, --endpoint, {inputValue: endpoint}, --bucketname, {inputValue: bucketname}, --output, {inputValue: output} ]
詳しい解説は公式ドキュメントを読んでいただければわかると思いますが、ざっくり解説すると、inputsで入力値の一覧を作ります。
それをimpelemtation > container > command内にある各引数として宣言します。
{inputValue: filename}とか書いてありますが、これはそのまま引数として与えられたものを使うという宣言です。
inputPathというものもありますが、その場合はPathにあるファイル等からデータを引っ張ってくるみたいです(未検証)
さて、このファイルはkubeflowからアクセスできるところになくてはいけませんが、いいところがないので、またAlibabaCloudを使います。
ObjectStorageServiceにアップロードします。AWSであればS3、GCPであればCloud Storage等利用すればいいと思います。
これで外部からcomponentが利用できるようになりました。
パイプラインの作成
最後にパイプラインを作成していきます。
パイプラインはまずpython(DSL)で定義し、dsl-compileでコンパイルすることで完成します。
import kfp from kfp import components from kfp import dsl preprocess_op = components.load_component_from_url( 'https://ishii-test-upload.oss-ap-northeast-1.aliyuncs.com/kfp/components/component.yaml' ) @dsl.pipeline( name='Preprocess only pipeline', description='This pipeline preprocess the data.' ) def my_pipeline( bucket_name='{{oss-bucketname}}', output_dir_path='{{output-dir}}', input_file_path='{{raw-filename}}', oss_endpoint='oss-ap-northeast-1.aliyuncs.com', access_key='{{accesskey}}', access_key_secret='{{accesskeysecret}}' ): _prepro_task = preprocess_op( filename=input_file_path, output=output_dir_path, bucketname=bucket_name, endpoint=oss_endpoint, accesskey=access_key, accesskeysecret=access_key_secret ) if __name__ == '__main__': kfp.compiler.Compiler().compile(my_pipeline, arguments={})
@dsl.pipelineをつけた関数はpipelineとして扱える関数になるので、それをコンパイルの引数に渡します。
その関数の引数に書いたものはパイプラインを登録するときにGUIから指定できるパラメータになります。
今回は、このあたりをパラメータとして扱っています。
bucket_name | OSSのバケット名 |
output_dir_path | outputを置くパス |
input_file_path | inputファイルの置いてあるパス |
oss_endpoint | OSSのエンドポイント(defaultはoss-ap-northeast-1.aliyuncs.com = tokyoリージョンを指定) |
access_key | アクセスキー |
access_key_secret | アクセスキーシークレット |
そしてコンパイルします。
dsl-compile --py pipeline.py --output pipeline.tar.gz
これができたらほぼ準備は完了なのですが、今回はプライベートリポジトリの都合上、もう1つ設定が必要です。
kubectlでprivate registry利用の設定
今回構築を行うdockerとkubeflow(k8s)が動いているdockerは別物なので、kubernetes側からプライベートリポジトリを参照するための設定が必要です。
参考の4つめのブログに設定の仕方が書かれていましたので、ほぼそのまま実行しました。
kubectl create secret docker-registry regcred --docker-server=コンテナレジストリのエンドポイント --docker-username=ユーザ名 --docker-password=おパスワード --docker-email=email-address@address -n kubeflow kubectl patch serviceaccount default -p '{\"imagePullSecrets\": [{\"name\": \"regcred\"}]}' -n kubeflow
pipelineのアップロード&実行
まずはpipelineをアップロードしましょう。
これはkubeflowのGUIにアクセスして、Uploadを押してファイルを選択するくらいです。
ここではあまりにも見栄えしないので載せませんが、作成されたパイプラインをクリックすると、GUIベースでパイプラインを確認することもできます。
次にExperimentsを作って、その中にいまのpipelineを配置します。
実行すると以下のような画面になるので、Refreshを押しながら少しまちます。
以下のような画面になれば完了です!!
outputを指定したオブジェクトストレージにデータが格納されていることも確認してみてください。
ちなみに、実行後のrunをクリックするとこんな感じで実行結果の確認も可能です。
まとめ
今回は自分でコンポーネントを作成して処理を流すところまでを書いてみました。
あまり意味のないコンポーネントですが、概ね作成のフローはわかったように思います。
次はたぶん新しいことはあまりない気はしますが、学習&予測をパイプラインに組み込んだものを作ってみようと思います。
今回のコードはこちらにおいています。
github.com
おまけ
今回はVagrantつまり、Virtualbox上の仮想マシンに環境がありました。
1回で終わってしまえばあまり関係ないですが、何回にも分けて、つまり途中vagrant suspendしていたりすると、深刻なほど時間がずれていきます。
その時は、vagrant sshで仮想マシンにログインした上で、以下のコマンドを実行すれば時刻をサクッと合わせられます。
例えば2020/3/15の9時10分に合わせたい場合であれば、
sudo date --set='2020/03/15 09:10:00'
とします。
参考にしたもの
公式のXGBoostトレーニング用のコード
github.com
XGBoostのデモのDockerfile
github.com
pipelineの書き方
github.com
kubectlを使ってプライベートリポジトリを設定する方法
medium.com