머신러닝 (ML)/MLOps

[Kubeflow] Pipeline 작성 가이드

Kubeflow는 머신러닝 워크플로우를 자동화하고 확장할 수 있도록 도와주는 오픈소스 플랫폼입니다. 본 글에서는 Kubeflow Pipeline을 작성하는 기본 가이드를 예시 코드와 함께 제공하며, pipeline 작성 시 제가 경험했던 문제와 이를 해결한 방법도 함께 다루고자 합니다.

 

* 본 글은 kubeflow 1.8 버전을 기준으로 작성되었습니다.

 


1. 개요

Kubeflow Pipeline은 머신러닝 워크플로우를 구성하기 위한 도구로, 각 컴포넌트개별적으로, 컨테이너로 실행하여 재사용할 수 있는이프라인을 구축할 수 있도록 합니다. 각 컴포넌트는 독립적으로 실행되며, 데이터 파이프라인을 통해 다른 컴포넌트로 데이터를 전달할 수 있습니다. Pipeline을 작성할 때 각 컴포넌트는 컨테이너로 구성되며, 필요한 리소스를 설정하고 다른 컴포넌트와의 연결 방식을 정의하게 됩니다.

2. 예제 코드

아래는 간단한 Pipeline 예제 코드입니다. 

 

import argparse
import sys
from pathlib import Path

import kfp
from kfp import dsl
from kfp.compiler import Compiler

from utils.helper import load_yaml
from utils.kfp_alert import notify_fail_slack
from utils.k8s import get_env


KFP_HOST_ADDRESS = "{your_url}"


def upload_pipeline(file_path, pipeline_name=None):
    if not pipeline_name:
        pipeline_name = Path(file_path).stem

    client = kfp.Client(host=KFP_HOST_ADDRESS)
    pipeline_id = client.get_pipeline_id(pipeline_name)

    client.upload_pipeline(
        pipeline_name=pipeline_name, 
        pipeline_package_path=file_path
    )


def op_first_pipeline(current_time, scheduled_time):
    return dsl.ContainerOp(
        name="first_pipeline",
        image=image,
        command=["python", "first_pipeline.py"],
        arguments=[
            "--pipeline-name", args.pipeline_name,
            "--current-time", current_time,
            "--scheduled-time", scheduled_time,
        ]
    ).set_image_pull_policy("Always") \
        .set_cpu_request("2").set_cpu_limit("4") \
        .set_memory_request("8Gi").set_memory_limit("16Gi")


def op_second_pipeline(current_time, scheduled_time):
    return dsl.ContainerOp(
        name="second_pipeline",
        image=image,
        command=["python", "second_pipeline.py"],
        arguments=[
            "--pipeline-name", args.pipeline_name,
            "--current-time", current_time,
            "--scheduled-time", scheduled_time,
        ]
    ).set_image_pull_policy("Always") \
        .set_cpu_request("2").set_cpu_limit("4") \
        .set_memory_request("8Gi").set_memory_limit("16Gi")


@dsl.pipeline(name="sample-pipeline")
def sample_pipeline(
        CURRENT_TIME='[[CurrentTime]]',
        SCHEDULED_TIME='[[ScheduledTime]]',
        ENVIRONMENTS: str = '["DEV", "STG", "PERF"]'
    ):

    with dsl.ExitHandler(notify_fail_slack(args.env)):
        first_pipeline = op_first_pipeline(
            current_time=CURRENT_TIME,
            scheduled_time=SCHEDULED_TIME,
        ).add_env_variable(get_env("ENVS", target_envs))\
            .set_display_name("first-pipeline")

        second_pipeline = op_second_pipeline(
            current_time=CURRENT_TIME,
            scheduled_time=SCHEDULED_TIME,
        ).add_env_variable(get_env("ENVS", target_envs))\
            .set_display_name("second-pipeline").after(first_pipeline)


if __name__ == "__main__":
    # Set parser
    parser = argparse.ArgumentParser(description='Create Kubeflow Pipeline')
    parser.add_argument("--env", type=str, help='', required=True)
    parser.add_argument("--pipeline-name", type=str, help='', required=True)
    parser.add_argument("--image", type=str, help='', required=True)
    args = parser.parse_args()

    # Define global vars
    image = args.image
    conf = load_yaml("config.yaml")

    # Compile This python code to kubeflow pipeline
    package_path = f"{args.pipeline_name}.yaml"

    Compiler().compile(sample_pipeline, package_path)
    upload_pipeline(package_path)

컴포넌트 간의 연결 방식 예시를 보여드리고자 2개의 컴포넌트로 정의했습니다. 기본적으로 예제에서는 ContainerOp를 사용하여 컨테이너 컴포넌트를 정의하고, 필요한 환경 변수와 자원을 설정합니다.

 

3. Pipeline 작성 가이드

Kubeflow Pipeline을 작성하는 기본 순서는 개별 작업을 정의하는 컴포넌트 생성, 이를 모아 하나의 파이프라인으로 연결하는 파이프라인 정의, 마지막으로 파이프라인을 클러스터에서 실행하기 위한 파이프라인 빌드 및 실행으로 이루어집니다. 단계별로 자세히 살펴보겠습니다.

1) 컴포넌트 생성

컴포넌트는 파이프라인에서 수행될 개별 작업을 정의합니다. 일반적으로 각 컴포넌트는 컨테이너로 패키징된 Python 함수로 정의되며, dsl.ContainerOp 객체를 통해 파이프라인 내에 작업으로 등록됩니다.

컴포넌트 작성 예시

def op_first_pipeline(current_time, scheduled_time):
    return dsl.ContainerOp(
        name="first_pipeline",
        image=image,
        command=["python", "first_pipeline.py"],
        arguments=[
            "--pipeline-name", args.pipeline_name,
            "--current-time", current_time,
            "--scheduled-time", scheduled_time,
        ]
    ).set_image_pull_policy("Always") \
        .set_cpu_request("2").set_cpu_limit("4") \
        .set_memory_request("8Gi").set_memory_limit("16Gi")
  • name: 컴포넌트의 이름을 설정합니다.
  • image: 실행할 컨테이너 이미지 경로를 지정합니다.
  • command와 arguments: 컨테이너에서 실행할 명령어와 전달할 인자를 설정합니다. 예시에서는 first_pipeline.py라는 Python 스크립트를 실행하며, current_time 과 scheduled_time 파라미터와 파이프라인 이름을 인자로 넘깁니다.
  • 리소스 설정: set_cpu_request와 set_memory_request 메서드를 통해 컴포넌트가 사용하는 리소스를 제한합니다. 

이와 같은 방식으로, 각 작업에서 요구사항에 맞는 컴포넌트를 정의합니다.

2) 파이프라인 정의

정의된 컴포넌트를 연결하여 하나의 파이프라인으로 구성하는 단계입니다. Kubeflow Pipeline에서는 @dsl.pipeline 데코레이터를 사용하여 전체 워크플로우를 정의할 수 있으며, 여러 컴포넌트를 순차적으로 또는 병렬로 실행할 수 있습니다. 이번 예시는 순차적으로 연결하는 방식을 알아봅니다.

파이프라인 정의 및 컴포넌트 연결 예시

@dsl.pipeline(name="sample-pipeline")
def sample_pipeline(
        CURRENT_TIME='[[CurrentTime]]',
        SCHEDULED_TIME='[[ScheduledTime]]',
        ENVIRONMENTS: str = '["DEV", "STG", "PERF"]'
    ):
    
    with dsl.ExitHandler(notify_fail_slack(args.env)):
        first_pipeline = op_first_pipeline(
            current_time=CURRENT_TIME,
            scheduled_time=SCHEDULED_TIME,
        ).add_env_variable(get_env("ENVS", target_envs))\
            .set_display_name("first-pipeline")

        second_pipeline = op_second_pipeline(
            current_time=CURRENT_TIME,
            scheduled_time=SCHEDULED_TIME,
        ).add_env_variable(get_env("ENVS", target_envs))\
            .set_display_name("second-pipeline").after(first_pipeline)
 
  • 환경 변수 설정: add_env_variable을 통해 컴포넌트 내에서 사용할 환경 변수를 설정합니다.
  • 컴포넌트 간 연결: after 메서드를 통해 first_pipeline이 완료된 후 second_pipeline이 실행되도록 합니다. 이 방식으로 순차적인 워크플로우를 관리할 수 있습니다.
  • ExitHandler: 파이프라인 종료 시 특정 알림을 보내도록 설정해, 실패 시 슬랙 알림을 받을 수도 있습니다.
*Kubeflow에서 제공하는 유용한 동적 변수들 참고
- CurrentTime : 파이프라인이 실행되는 시점을 기준으로 현재 시간
- ScheduledTime : 파이프라인의 예약된 실행 시간

 

3) 파이프라인 빌드 및 실행

파이프라인을 작성한 후, 이를 클러스터에서 실행하기 위해 YAML 파일로 빌드하고, 필요한 설정과 함께 클러스터에 업로드하는 단계입니다.

Python 코드를 YAML로 컴파일하고 클러스터에 업로드하는 예시

아래는 명령어 인자를 통해 파이프라인 이름, 환경, 이미지를 설정하고, Compiler()와 upload_pipeline()을 통해 파이프라인을 컴파일하고 업로드하는 코드입니다.

if __name__ == "__main__":
    # Set parser
    parser = argparse.ArgumentParser(description='Create Kubeflow Pipeline')
    parser.add_argument("--env", type=str, help='', required=True)
    parser.add_argument("--pipeline-name", type=str, help='', required=True)
    parser.add_argument("--image", type=str, help='', required=True)
    args = parser.parse_args()

    # Define global vars
    image = args.image
    conf = load_yaml("config.yaml")

    # Compile This python code to kubeflow pipeline
    package_path = f"{args.pipeline_name}.yaml"

    Compiler().compile(sample_pipeline, package_path)
    upload_pipeline(package_path)
  • Compiler: update_autocompletion_keyword 파이프라인을 pipeline.yaml 파일로 컴파일합니다.
  • upload_pipeline: 컴파일된 YAML 파일을 클러스터에 업로드하여 파이프라인을 실행 가능한 상태로 만듭니다. 업로드된 파이프라인은 Kubeflow 대시보드에서 관리할 수 있습니다.

4. pipeline 작성 시 겪었던 문제들과 해결 방법

Pipeline 작성 시 제가 직접 경험했던 문제와 그 해결 방법을 공유합니다. 미리 참고하시고, 이 글을 읽는 분들은 겪지 않기를 바랍니다..!

1) 컨테이너 이미지 생성 오류

Kubeflow Pipeline은 컴포넌트를 개별 컨테이너로 실행하기 때문에, 필요한 패키지와 라이브러리가 포함된 Docker 이미지를 올바르게 생성하는 것이 중요합니다. 이미지가 올바르게 생성되지 않으면 파이프라인 실행 시 오류가 발생할 수 있습니다.

  • 문제: Docker 이미지 생성 시 필요한 라이브러리가 누락되거나 이미지 빌드가 실패하는 경우가 있습니다. 라이브러리나 패키지를 추가로 설치해야 하거나, Dockerfile 설정이 올바르지 않을 수 있습니다.
  • 해결 방법: requirements.txt를 사용해 필요한 Python 라이브러리를 명시하고, 이를 Dockerfile 내에서 설치하도록 설정합니다. 특히 패키지 설치 순서도 중요하게 고려해주세요.
COPY requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt
  • Tip: 빌드와 배포 과정이 빠르게 진행되도록 이미지 크기를 최소화하는 것도 중요합니다. 컴파일시 필요한 패키지의 경우 도커 파일 내에서 정의하는 게 아니라 깃헙 액션 등 pipeline을 업로드하는 환경에서 커맨드를 통해 설치해야할 수도 있습니다.

2) 컴포넌트 간 데이터 전송

컴포넌트는 독립적인 컨테이너로 실행되므로, 한 컴포넌트의 출력을 다른 컴포넌트의 입력으로 전달할 때 적절한 방법을 사용해야 합니다. 가장 간단한 방법으로는 Kubeflow가 제공하는 PipelineParam을 통해 전달할 수 있습니다.

  • 문제: 파이프라인에서 컴포넌트 간 데이터를 전달해야 합니다.
  • 해결 방법: PipelineParam 객체를 사용합니다. 예를 들어, 첫 번째 컴포넌트의 출력이 문자열이라면, 이를 다음 컴포넌트의 인자로 지정하여 전달할 수 있습니다.
  • Tip: 간단한 문자열이 아닌 경우, 출력 데이터를 각 컴포넌트의 output_path 옵션을 사용해 외부 저장소(GCS나 S3 같은 저장소)에 저장하면 데이터 관리가 편리해집니다. 이 때는 output_path로 지정한 경로를 다른 컴포넌트의 인자로 사용합니다.

3) PipelineParam 관련 문제

Kubeflow에서 파이프라인 정의 시 특정 인자에 대해 PipelineParam 클래스로 인식하여 컴파일합니다. PipelineParam은 파이프라인 실행 시에만 값이 할당되므로, 파이프라인 정의 시점에는 직접적인 타입 조작이나 연산이 제한됩니다.

  • 문제: 컴파일 시 PipelineParam은 파이프라인 실행 시점에서만 값이 설정되는 특수한 변수로 취급되므로, 예를 들어 PipelineParam을 문자열, 정수, 리스트 등으로 변환해 사용하려고 하면 컴파일 시점에 오류가 발생할 수 있습니다.
  • 해결 방법: PipelineParam을 바로 조작할 수 없으므로, 컴포넌트 내에서 PipelineParam 값을 필요한 형태로 변환하도록 코드를 작성합니다. 컴포넌트의 command나 arguments에서 PipelineParam을 받아 실제 동작할 Python 스크립트 내부에서 타입 변환 및 처리를 수행하게 해야 합니다.
  • Tip: JSON 문자열로 인코딩한 후 문자열 그대로 PipelineParam으로 전달하는 방법을 사용할 수 있습니다. 그 후 컴포넌트 내부에서 JSON 디코딩을 통해 파라미터를 해석하는 방식이 예시 코드에서 사용한 방식입니다.

4) 자원 제한 및 메모리 초과 오류

기본적으로 클라우드 환경에서 pipeline이 실행되므로, 각 컴포넌트에 할당할 리소스(cpu, 메모리 등)을 설정해야 합니다. 적절한 리소스를 설정하지 않으면 작업이 실패하거나 실행 시간이 오래 걸릴 수 있습니다.

  • 문제: 리소스가 과도하게 할당되어 비용이 높아지거나, 자원 부족으로 인해 작업이 실패하는 문제가 발생할 수 있습니다.
  • 해결 방법: set_cpu_request, set_cpu_limit, set_memory_request, set_memory_limit 메서드를 사용해 각 컴포넌트의 리소스 요구량을 적절히 제한합니다. 개발 초기에는 충분한 리소스를 할당한 후에 모니터링을 통해 점차 최적화하는 방식으로 설정할 수 있습니다.

 


 

본 글에서는 Kubeflow Pipeline의 기초 개념과 작성 방법을 소개했습니다.

 

Kubeflow Pipeline은 머신러닝 워크플로우를 관리하기 위해 각 작업을 독립적인 컨테이너로 연결하여 구성합니다. 각 컴포넌트를 정의하고 파이프라인으로 연결하여 의존성 설정, 자원 할당과 환경 설정을 통해 실행할 수 있습니다. 작성 시 이미지 빌드 오류나 데이터 전달 문제, PipelineParam 제한 사항을 미리 확인하는 것이 발생할 수 있는 다양한 문제를 예방할 수 있게 해줍니다.

 

다음 글로는 kubeflow 워크플로우 개발 작업 중에 있었던 트러블 슈팅이나 이 외에 좀 더 살펴볼만한 설정들 소개 내용으로 찾아오겠습니다. 감사합니다.