Airflow Trigger_rules
Airflow Trigger rules
일반적으로 Task는 이전 Task들이 성공할 때만 실행됩니다. trigger rule이 default로 all_success이기 때문입니다. 기본적으로 모든 상위 작업이 성공하면 작업이 실행됩니다. 이 action은 일반적으로 기대하는 것입니다.
그러나 더 복잡한 것을 원한다면 어떻게 해야 할까요? 상위 task 중 한 개 task가 성공하자마자 작업을 수행하고 싶다면 어떻게? 아니면 작업이 실패하면 다른 작업 세트를 실행하고 싶습니까? 또는 작업이 성공하거나 실패하거나 이벤트가 건너 뛸 경우에 따라 다르게 action 을 해야 하는 경우?
좀더 복잡한 workflow는 Task간 다양한 의존성이 존재합니다. 일반적인 워크플로 동작은 모든 직접 업스트림 작업이 성공할 때 작업을 트리거하는 것이지만 Airflow는 더 복잡한 종속성 설정을 허용합니다. 이러한 다양한 의존성을 지원하기 위한 Trigger Rule들이 다음과 같이 존재합니다
모든 Operator 에는 생성된 작업이 트리거되는 규칙을 정의하는 trigger_rule 인수가 있습니다. trigger_rule 의 기본값은 all_success이며 “모든 직접 업스트림 작업이 성공하면 이 작업 트리거”로 정의할 수 있습니다. 여기에 설명된 다른 모든 규칙은 직접 상위 작업을 기반으로 하며 작업을 만드는 동안 모든위 Operator에게 전달할 수 있는 값입니다.
- all_success: (default) 모든 상위 Task가 성공한 경우
- all_failed: 모든 Parent Task가 실패 또는 upstream_failed 상태일 떄 하위 Task가 실행
- all_done: 모든 상위 Task가 완료된 경우 하위 Task 실행.
- one_failed: 적어도 한 부모가 실패하자마자 모든 부모가 완료 될 때까지 기다리지 않고 실행됩니다.
- one_success: 적어도 한 부모가 성공하자마자 모든 부모가 완료 될 때까지 기다리지 않고 Trigger 됩니다.
- none_failed: 모든 상위 Task가 실패가 없는 경우(failed or upstream_failed) i.e. 모든 부모가 성공했거나 skip인 경우 하위 Task 실행.
- none_skipped:상위 Task의 상태가 Skip이 없는 경우 하위 Task 실행, i.e. all parents are in a success, failed, or upstream_failed state
아래의 task Graph view 처럼 trigger rule이 all_success인 경우 end task는 실행되지 않고 skip하게 되는 경우도 있습니다.
어떤 dag은 이전 Task에 실패가 없고 최소한 한개 이상 성공한 경우end task를 항상 실행하고자 할 때 end task에 trigger rule을 none_failed_min_one_success 로 설정하면 다음과 같이 end task를 실행할 수 있습니다.
예시)
t1 = EmptyOperator(task_id=’end’, trigger_rule=’ none_failed_min_one_success’)
all_success
이것은 매우 간단하고 이미 보았습니다. 모든 업스트림 작업 (부모)이 성공했을 때 작업이 시작됩니다
all_failed
모든 상위 작업이 실패하면 Task C 는 작업이 Trigger 됩니다
all_done
모든 업스트림 작업 (상위)이 자신의 상태에 관계없이 실행을 수행하면 작업을 트리거하고 합니다. 이 트리거 규칙은 업스트림 작업의 상태에 관계없이 항상 실행하려는 작업이 있는 경우 유용 할 수 있습니다.
none_failed
모든 업스트림 작업이 성공하거나 skip이면 Task D는 트리거 됩니다
one_success
한 명의 상위 또는 업스트림 작업이 성공하자마자 트리거됩니다. 단 모든 상위 task가 종료될 때까지 기다리지 않습니다.
one_failed
한 개의 상위 또는 업스트림 작업이 최소한 1개라도 실패하면 task D는 trigger 됩니다.단 모든 상위 task가 종료될 때까지 기다리지 않습니다
none_failed_min_one_success
한 개의 상위 또는 업스트림 작업이 모두 실패가 없고 최소한 1개라도 성공하면 task D는 trigger 됩니다.단 모든 상위 task가 종료될 때까지 기다리지 않습니다
예제 )
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
# Utils
from airflow.utils.dates import days_ago
from datetime import timedelta
import random
default_args = {
'start_date': days_ago(1),
'retries': 0,
'retry_delay': timedelta(minutes=5),
'schedule_interval': '@daily',
'catchup': False
}
def check_condition():
num = random.randint(0, 10)
if num > 6:
return 'greater'
else:
return 'smaller'
with DAG(
dag_id = 'trigger_rule',
default_args = default_args,
tags = ['training']
) as dag:
start = EmptyOperator(task_id = 'start')
end = EmptyOperator(task_id='end', trigger_rule = 'all_done')
greater = BashOperator(task_id = 'greater', bash_command='echo "value is greater than 6" && sleep 30')
smaller = BashOperator(task_id = 'smaller', bash_command ='echo "value is smaller thant 6" && exit 1')
start >> [greater, smaller] >> end
댓글남기기