2 분 소요

Airflow Task

Task는 airflow의 기본 실행단위로 한개 이상의 Task를 이용해서 하나의 DAG을 정의합니다. Task간 순서를 표현하기 위해 작업간 «(스트림업), »(스트림다운) 종속성을 설정하여 합니다. Task는

  • Operator : 지정한 작읍을 수행하는 Operator
  • Sensor : 어떤 조건이 만족하는지 주기적으로 스캔이 필요할 때 사용하며 조건이 만족하는 경우 Task가 수행.
  • Hook : DB나 서비스 같은 외부 시스템과 통신하기 위한 인터페이스를 제공하여 연결 상태를 유지 등을 사용할 수 있습니다.

Task Instance

DAG실행될 때 마다 Task Instance를 생성하여 Executor로 전달하여 해당작업을 실행합니다. 그리고 해당 Task Instance를 다시 Metadata로 보내서 상태를 업데이트하며, Task Instance의 작업이 남아 있으면 Executor로 다시 보내집니다. 작업이 완료가 되면 스케줄러에게 보냅니다. Operator Operator는 task를 어떻게 실행시킬지 정의합니다. 하나의 워크플로우안에서 한개 이상의 task를 정의할 수 있습니다. 하나의 Task가 하나의 Operator라고 할 수 있다. Operator는 Action Operator와 Transfer Operator로 구분됩니다.

  • Action Operator : 작업을 수행하거나 다른 시스템에 작업을 수행하도록 trigger합니다.
  • Transfer Operator : 특정 시스템에 다른 시스템으로 데이터를 이동
  • Sensor Operator : 특정 조건에 일치할 때 까지 기다렸다가, 만족되면 이후 과정을 진행하도록 기다려는 Operator.

Airflow는 기본 Operator는 Bash와 Python Operator가 대표적이며 그외 많은 Operator를 지원하고 있습니다. Operator에 공통적으로 **kwargs라는 keywoard Arguments를 전달하는 부분이 있으며, DAG을 정의할 때 default_args 전달하는 것처럼 전달합니다.

Task Dependencies

Apache Airflow의 DAG 내에 task들의 dependency를 설정함으로써 task 실행 순서와 병렬 실행 task들 등을 정의할 수 있습니다. Task 간 의존성은 다음과 같이

  • set_downstream 또는 » 기호
  • set_upstream 또는 « 기호 같은 함수 또는 기호로 설정할 수 있습니다. set_downstream 는 Task 실행 후에 수행할 task를 설정 set_upstream 는 Task 실행 전에 수행할 task를 설정

예시)

from airflow import DAG 
from airflow.operators.bash import BashOperator 
from airflow.operators.python import PythonOperator

from airflow.utils.dates import days_ago 
from datetime import datetime, timedelta 
from textwrap import dedent 

default_args = {
  'start_date': days_ago(1),
  'retries': 1,
  'retry_delay': timedelta(minutes=5),
  'catchup': False
}

dag = DAG(
  'data_pipeline_ex09',
  default_args = default_args,
  description='Hello world',
  schedule_interval=timedelta(days=1)
)

def hello():
  print('Hello!')

t1 =  BashOperator(
    task_id='echo_hello',
    bash_command='echo "Hi from bash operator"',
    dag=dag
)

t2 = python_task = PythonOperator(
    task_id="python_task",
    python_callable=hello,
    dag=dag
)

templated_command = dedent(
  """
  
  """
)

t3 = BashOperator(
  task_id='templated',
  bash_command=templated_command,
  params={'my_param': 'Parameter I passed in'},
  dag=dag
)

t1 >> t2 >> t3
# t1.set_downstream(t2) 
# t3.set_upstream(t2)

t1, t2, t3 task가 순차적으로 실행됩니다.

t1.set_downstream(t2)

t2는 성공적으로 실행되는 t1에 의존하여 실행됩니다.

t2.set_upstream(t1)

비트 시프트 연산자를 사용하여 작업을 연결할 수도 있습니다.: t1 » t2

그리고 비트 시프트 연산자와의 업스트림 종속성 표기: t2 « t1

여러 종속성을 연결하는 것은 비트 시프트 연산자로 간결해집니다. t1 » t2 » t3

작업 목록을 종속성으로 설정할 수도 있습니다. 이러한 작업은 모두 동일한 효과를 갖습니다

t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

댓글남기기