ML & AI/Data Engineering

[Airflow] dag 스케쥴 관련 date 정의 알아보기 (start_date, execution_date, schedule_interval)

채얼음 2023. 12. 10. 23:57

개요

Apache Airflow는 데이터 파이프라인 관리와 작업 스케줄링을 위한 오픈 소스 플랫폼으로, 데이터 엔지니어링 및 데이터 과학 작업에서 널리 사용됩니다. 저도 올해 처음으로 해당 플랫폼을 사용하여 데이터 ETL 작업을 처리했습니다. 작업 중에 가장 헷갈렸던 여러 기준의 date 들을 한번 자세히 정리해보겠습니다.

 

태스크 스케줄링의 중요성


Apache Airflow는 작업 스케줄링 및 자동화를 위한 강력한 도구입니다. 사용자는 작업의 실행 시간, 간격 및 의존성을 정의하고, Airflow는 해당 작업을 예약된 시간에 실행하거나 재시도할 수 있습니다.

Airflow에서 작업은 Directed Acyclic Graph (DAG)로 구성됩니다. DAG는 여러 작업(Task)으로 구성된 워크플로우를 나타냅니다. 각 작업은 DAG 내에서 의존성을 가질 수 있으며, 작업 간의 실행 순서를 정의합니다.
이때, 다양한 스케줄링 옵션을 제공하며, 실패한 작업에 대한 자동 재시도 및 알림 메커니즘을 지원합니다. 이를 통해 안정적인 데이터 파이프라인을 유지할 수 있습니다.

데이터 파이프라인을 구축할 때, 각 태스크(task)의 실행 시점을 관리하는 것은 매우 중요합니다. 실행 시점을 적절하게 설정하여 DAG 내의 각 태스크 간의 의존성을 관리할 수 있습니다. 의존성은 어떤 태스크가 다른 태스크 이전에 실행되어야 함을 나타내며, 스케줄링은 이러한 의존성을 준수하여 작업의 순서를 관리합니다.

또한, 데이터 파이프라인 작업은 정해진 시간 내에 실행되어야 하는 경우가 많습니다. 태스크 스케줄링은 이러한 시간 제한을 준수하며 데이터 파이프라인이 제 시간에 실행되도록 보장합니다. 제 시간에 실행되지 않은 경우, 또는 장애 발생 시 자동으로 재시도하고 알림을 보내는 기능도 관리할 수 있습니다. 이것은 파이프라인이 예기치 않은 문제에 대응하고 안정성을 유지하는 데 중요합니다.

 

정의

그럼 이제 각 태스크의 실행 시점을 정확하게 설정하여 데이터 파이프라인을 효과적으로 운영하기 위해 사용되는 핵심 요소인 start_date와 execution_date를 알아보겠습니다.

start_date는 DAG의 시작 날짜를 나타내며, DAG가 언제부터 실행될 수 있는지를 결정하는데 사용됩니다.

execution_date는 각 태스크의 실행 시간을 나타내며, 각 태스크가 언제 실행되어야 하는지 정의합니다. 또한, execution_date는 각 태스크가 start_date를 기준으로 언제 실행되어야 하는지 계산하는 데 사용됩니다. 예를 들어, start_date가 2023년 11월 1일로 설정되고, DAG가 매일 실행되는 경우, 각 태스크의 execution_date는 날짜가 하루씩 증가하는 형태로 정의됩니다. (실제로 실행되는 시각이 아니라 스케쥴 간격의 시작을 표시한다고 생각하면 편합니다.)

마지막으로 schedule_interval은 DAG의 실행 주기를 정의하는 파라미터로, DAG가 얼마나 자주 실행되어야 하는지를 나타냅니다. schedule_interval을 설정하면 Airflow는 해당 주기에 맞게 DAG를 스케줄링합니다. 예를 들어, interval을 timedelta(days=1)로 설정하면 DAG는 매일 실행됩니다.

 

코드 예시와 함께 알아보기

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

# DAG 정의
dag = DAG(
    'example_dag',
    start_date=datetime(2023, 11, 1),
    schedule_interval=timedelta(days=1),  # 매일 실행
    catchup=False,  # 과거 실행을 잡지 않음
)

# 시작 태스크
start_task = DummyOperator(task_id='start_task', dag=dag)

# 다른 태스크들...

# 태스크 간 의존성 설정
start_task >> ...

if __name__ == "__main__":
    dag.cli()

 

위의 코드에서 start_date는 2023년 11월 1일로 설정되어 있으며, schedule_interval은 timedelta(days=1)로 설정되어 있습니다. 이는 DAG가 매일 실행됨을 의미합니다.

첫 번째 DAG 실행의 경우 execution_date는 해당 실행의 날짜 및 시간으로 설정됩니다. 예를 들어, 2023년 11월 1일의 실행은 execution_date=datetime(2023, 11, 1)이 됩니다.
이후 DAG는 매일 스케줄에 따라 실행되며, 각 실행에서 execution_date가 동적으로 변경됩니다.

 

하지만, 우리는 보통 한국 날짜를 기준으로 많이 작업을 하게 됩니다. 아래는 시간대를 고려한 예시입니다.

*Airflow는 기본적으로 UTC를 사용하므로 execution_date 역시 UTC 시간을 기준으로 설정됩니다. 이로 인해 로컬 시간대와의 차이가 발생할 수 있습니다.

# DAG 정의
dag = DAG(
    'timezone_example',
    start_date=datetime(2023, 11, 1),
    schedule_interval=timedelta(days=1),  # 매일 실행
    catchup=False,  # 과거 실행을 잡지 않음
    timezone=timezone('Asia/Seoul'),  # 타임존 설정 (대한민국 표준시, UTC+9)
)

 

이 예시에서, DAG는 Asia/Seoul 타임존을 사용하도록 설정되어 있습니다. 즉, 한국 시간대인 대한민국 표준시 (UTC+9)를 기준으로 합니다.

위 DAG 실행의 경우, start_date인 2023년 11월 1일이 Asia/Seoul 시간으로 해석되고, 이에 따라 execution_date도 동적으로 설정됩니다. 그러나 이 값은 내부적으로는 항상 UTC로 저장됩니다.