アルゴリズムとかオーダーとか

仕事で勉強したことなどをまとめてます

KubeflowのNotebookからPipelineを実行

この記事では、KubeflowのパイプラインをNotebookから実行する方法を説明する。Kubeflowがローカル環境に構築されている場合、ServiceAccountの設定から実際にパイプラインを実行する手順を記述する。
以下の公式のドキュメントが実行できなかったので、ローカルのkubeflow notebookで実行できる手順をまとめた。
www.kubeflow.org

どうやらデフォルトで提供されるServiceAccountには、Kubeflow Pipelineを操作するための十分な権限がない。
このため、必要な権限を付与した新しいServiceAccountを作成し、Notebook上からパイプラインを実行する方法をまとめた。
また、作成したServiceAccountを適切に設定し、ローカルのnamespaceと合わせることで、Kubeflowダッシュボードからも実行状況を確認できるようにする。
これにより、notebookで生成したpipelineのyamlをいちいちダウンロードしてからダッシュボードにアップロードして実行みたいなめんどくさい手順を経ずに、直接notebookからpipelineを登録・実行できるようになる。

前提条件

  • Kubeflowがローカル環境にセットアップされていること。
  • kubectlやkfpライブラリが適切にインストールされていること。
  • Notebook環境で実行することを前提とする。

Kubeflowの構築は前回の記事を参照。
y-nakajo.hatenablog.com


1. ServiceAccountの作成

最初に、KubeflowでPipelineを実行するためのServiceAccountを作成。これにより、必要な権限が与えられたサービスアカウントが作成され、パイプラインを実行できるようになる。以下はそのYAML定義。このYAMLを例えばaccount-service.yamlとして保存する。

apiVersion: v1
kind: ServiceAccount
metadata:
  name: pipeline-user
  namespace: kubeflow
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: kubeflow-user-example-com
  name: ml-pipeline-full-access
rules:
  - apiGroups: ["pipelines.kubeflow.org"]
    resources:
      - pipelines
      - experiments
      - jobs
      - runs
    verbs: ["get", "list", "watch", "create", "delete", "update", "patch"]
  - apiGroups: [""] # Core API group
    resources:
      - secrets
    verbs: ["get", "list", "create", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: pipeline-user-binding
subjects:
  - kind: ServiceAccount
    name: pipeline-user
    namespace: kubeflow
roleRef:
  kind: ClusterRole
  name: ml-pipeline-full-access
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: v1
kind: Secret
metadata:
  name: pipeline-user-token
  namespace: kubeflow
  annotations:
    kubernetes.io/service-account.name: pipeline-user
type: kubernetes.io/service-account-token

このYAMLファイルを以下のコマンドで適用することで、ServiceAccountの作成とその権限設定を反映。

kubectl apply -f service_account.yaml

2. Tokenの発行

次に、先ほど作成したServiceAccountにKFPからアクセスするためのTokenを発行する。ここで注意点としてTokenは`pipelines.kubeflow.org` のaudienceで有効なものを発行する必要がある。具体的には以下のコマンドを実行する。実行結果にTokenが発行されるでメモしておく。

このコマンドはUbuntu上から実行。特にkubeflowの特定のPodsにログインする必要はない。

 kubectl create token pipeline-user -n kubeflow --audience=pipelines.kubeflow.org

Tokenは以下のコマンドでいつでも確認可能。

 kubectl create token pipeline-user -n kubeflow --audience=pipelines.kubeflow.org

ここで発行しているTokenは一時的なのでおそらく再起動とかすると無効になるはず。その場合は再度発行すると良い。

3. NotebookでKubeflowパイプラインを実行

次に、Notebook環境からKubeflowパイプラインを実行する。

pipelineの作成

まずはGetting Startedを参考にして以下の簡単なpipelineを作成。

from kfp import dsl

@dsl.component
def say_hello(name: str) -> str:
    hello_text = f'Hello, {name}!'
    print(hello_text)
    return hello_text

@dsl.pipeline
def hello_pipeline(recipient: str) -> str:
    hello_task = say_hello(name=recipient)
    return hello_task.output

YAMLファイルにコンパイルしておく。

from kfp import compiler

compiler.Compiler().compile(hello_pipeline, 'pipeline.yaml')

Kubeflowのエンドポイントとトークンの設定

先ほど発行したTokenを利用してNotebookからpipelineを作成・実行する方法を説明する。
Kubeflow Pipelinesのエンドポイントと認証トークンを設定する。ローカルで構築されたKubeflow環境では、エンドポイントは「http://ml-pipeline.kubeflow.svc.cluster.local:8888」になる。

kubernetesに慣れてる人はすぐに分かるのかもだが、自分はこのエンドポイントが分からず結構調べた。
もしかしたらエンドポイントの指定は不要かもしれないが、自分の環境では必要だった。

import kfp

KFP_ENDPOINT = "http://ml-pipeline.kubeflow.svc.cluster.local:8888"
TOKEN = "your-service-account-token-here"
client = kfp.Client(host=KFP_ENDPOINT, existing_token=TOKEN)

Experiment, Pipelineの取得

Kubeflowでは、パイプラインはExperiment内で実行される。まず、Experimentをリストアップし、特定のExperimentを取得する。なお、Experimentを指定しない場合は「Default」Experimentが生成され実行される。が、ここではあとから実行結果を整理できるように特定のExperimentを指定してpipelineを実行することにする。ここでは「test」というExperimentでpipelineを実行している。

なお、「test」Experimentはダッシュボード上であらかじめ作成しておく。

# Experimentの取得
experiments = client.list_experiments(namespace="kubeflow-user-example-com")
experiment = [experiment for experiment in experiments.experiments if experiment.display_name == "test"]

# pipelineを取得もしくは作成

pipelines = client.list_pipelines(namespace="kubeflow-user-example-com")
pipeline = [pipeline for pipeline in pipelines.pipelines if pipeline.display_name == "hello-pipeline"]

if len(pipeline) == 0:
    print("not found. create pipeline")
    pipeline_path = "./pipeline.yaml"
    pipeline = client.upload_pipeline(pipeline_path, namespace="kubeflow-user-example-com")
    pipeline = [pipeline] # 後述のコードのためにリストにしておく

# version_idの取得
version_id = client.list_pipeline_versions(pipeline_id=pipeline[0].pipeline_id).pipeline_versions[0].pipeline_version_id

パイプラインの実行

最後に、pipelineを実行する。

run = client.run_pipeline(
    experiment_id=experiment[0].experiment_id,
    pipeline_id=pipeline[0].pipeline_id,
    version_id=version_id,
    job_name="simple-pipeline-run",
    params={
        'recipient': 'World',
    }
)

Experiment設定せず直接pipelineを実行する方法

以下のコードでは直接YAMLファイルを指定してpipelineを実行可能。

run = client.create_run_from_pipeline_package(
    'pipeline.yaml',
    arguments={
        'recipient': 'World',
    },
)