Airflow Operator
Airflow Operator
DAG을 구성하는 작업을 Task라고 하며, DAG이 수행할 작업을 의미합니다. 한개 이상의 Task를 pipeline으로 연결해서 하나의 DAG을 완성해야 합니다. Task에는
- Operator
- Sensor
- Hook
가 있습니다.
Operator에서는 대표적인. Bash, Python, Empty 또는 이전버전 Dummy Operator가 있습니다.
상세한 Operator 정보는 다음의 링크를 참고하세요
https://airflow.apache.org/docs/apache-airflow/2.2.3/operators-and-hooks-ref.html
Operator는
- Action Operator
- Transfer Operator
- Sensor Operator
로 구분됩니다.
Action Operator 는 작업을 수행하거나 다른 작업을 수행하도록 trigger합니다.
Transfer Operator는 특정 시스템에 다른 시스템으로 데이터를 이동합니다.
Sensor Operator는 특정 조건에 일치할 때 까지 기다렸다가, 만족되면 이후 과정을 진행하도록 기다려는 Operator.
여기에서는 대표적인 Operator 를 알아보겠습니다.
- EmptyOperator
- BashOperator
- PythonOperator
기외 주요 Operator는 다음과 같습니다.
구분 | 클래스 경로 | 설명 |
---|---|---|
BranchPythonOperator | airflow.operators.branch | 파이션 실행결과에 따른 분기를 설정하는 Operator |
TriggerDagRunOperator | airflow.operators.trigger_dagrun | 지정한 dag을 실행 |
ShortCircuitOperator | airflow.operators.python | bool 조건에 맞을 때만 실행 bool 연산 로직은 python_callable로 전달 |
EmailOperator | airflow.operators.email | 이메일 전송 |
그외 operator들은 다음의 링크 참고합니다.
https://airflow.apache.org/docs/apache-airflow/2.2.3/_api/airflow/operators/index.html
Bash Operator 예제
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
default_args = {
'start_date': days_ago(1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
'schedule_interval': '@daily',
'catchup': False
}
with DAG(
dag_id = 'bash-op',
default_args = default_args,
tags = ['training']
) as dag:
start = EmptyOperator(
task_id = 'start'
)
end = EmptyOperator(
task_id = 'end'
)
bash_task = BashOperator(
task_id="test_bash",
bash_command = "echo 'This is the ds: \'$msg\''",
env = { "msg": ''}
)
start >> bash_task >> end
Python Operator 예제
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import time
default_args = {
'start_date': days_ago(1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
'schedule_interval': '@daily',
'tags': ['training'],
'catchup': False
}
def _sleep_func(sleep_time):
print('Start sleep {sleep_time} seconds'.format(sleep_time = sleep_time))
time.sleep(sleep_time)
with DAG(
dag_id = 'python-op',
default_args=default_args,
tags = ['training']
) as dag :
start = EmptyOperator(task_id = 'start_task')
end = EmptyOperator(task_id = 'end_task')
task1 = PythonOperator(
task_id = 'python_callable_task1',
python_callable = _sleep_func,
op_kwargs = {"sleep_time" : 10}
)
start >> task1 >> end
댓글남기기