Airflow DAG Declaration Styles
Apache Airflow offers three ways to declare a DAG: the with DAG context manager, the standard constructor, and the @dag decorator.
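- with DAG (recommended)
from airflow import DAG
from airflow.operators.dummy import DummyOperator

# default_args is assumed to be defined beforehand (with a start_date)
with DAG(
    dag_id="myFirstDag",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Tasks created inside the with block are attached to the DAG automatically
    op = DummyOperator(task_id="dummy")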
- with DAG example
from airflow.models import DAG
from airflow.providers.http.sensors.http import HttpSensor
from datetime import datetime, timedelta

default_args = {
    "owner": "airflow",
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "admin@localhost.com",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "date_pipeline_ex02",
    start_date=datetime(2022, 11, 17),
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:
    # Sensor that checks the api/ endpoint of the user_api HTTP connection
    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='user_api',
        endpoint='api/',
    )

    is_api_available
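- Standard constructor DAG
from airflow import DAG
from airflow.operators.empty import EmptyOperator

# default_args is assumed to be defined beforehand (with a start_date)
dag = DAG(
    dag_id="myFirstDag",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
)

# Without a with block, each task must be given dag=dag explicitly
start = EmptyOperator(task_id="start", dag=dag)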
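- Standard constructor DAG example
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
import pendulum

# Korean time zone. It only takes effect when it is used in start_date,
# for example datetime(2022, 11, 17, tzinfo=kst); it is not used below.
kst = pendulum.timezone("Asia/Seoul")

default_args = {
    "start_date": days_ago(1),
}

dag = DAG(
    "myFirstDAG",
    default_args=default_args,
    schedule_interval="@daily",
    # catchup is a DAG argument, so it is set here rather than in default_args
    catchup=False,
)


# Defined in the original example but not called by the BashOperator below
def hello_airflow():
    print("Hello airflow")


t1 = BashOperator(
    task_id="bash",
    bash_command="echo Hello airflow",
    dag=dag,
)

t1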
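- Decorator DAG (@dag)
from airflow.decorators import dag, task

# default_args is assumed to be defined beforehand (with a start_date)
@dag(
    dag_id='data_pipeline_ex05',
    default_args=default_args,
    schedule_interval='@daily',
)
def hello_dag():
    @task
    def hello_task1():
        print('Hello World')

    hello_task1()

# Call the decorated function so that the DAG is actually created
dag = hello_dag()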
- Decorator example
from airflow.decorators import dag, task
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago

default_args = {
    "start_date": days_ago(1),
}

@dag(
    dag_id='data_pipeline_ex05',
    default_args=default_args,
    schedule_interval='@daily',
    # catchup is a DAG argument, so it is set here rather than in default_args
    catchup=False,
)
def hello_dag():
    @task
    def hello_task1():
        print('Hello World')

    hello_task1()

# Call the decorated function so that the DAG is actually created
dag = hello_dag()
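
The three styles differ mainly in how a task finds the DAG it belongs to. The sketch below is a toy model in plain Python and not Airflow code: MiniDAG, MiniTask and mini_dag are hypothetical names invented for this illustration, and Airflow's real implementation is more involved. It only shows the general idea that a with block can register tasks implicitly, that the constructor style needs an explicit dag argument, and that a decorator can wrap the function body in a with block.

```python
# Toy model of the three declaration styles. MiniDAG, MiniTask and mini_dag
# are hypothetical stand-ins for illustration; they are not Airflow classes.
import functools


class MiniDAG:
    _active = []  # stack of DAGs whose `with` block is currently open

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.task_ids = []

    def __enter__(self):
        MiniDAG._active.append(self)
        return self

    def __exit__(self, exc_type, exc, tb):
        MiniDAG._active.pop()
        return False  # do not swallow exceptions


class MiniTask:
    def __init__(self, task_id, dag=None):
        # Prefer the explicit dag; otherwise use the innermost open `with` block.
        if dag is None and MiniDAG._active:
            dag = MiniDAG._active[-1]
        if dag is None:
            raise ValueError(f"task {task_id!r} is not attached to any DAG")
        self.task_id = task_id
        dag.task_ids.append(task_id)


def mini_dag(dag_id):
    """Decorator style: run the function body inside a `with` block."""
    def wrap(func):
        @functools.wraps(func)
        def factory():
            with MiniDAG(dag_id) as dag:
                func()
            return dag
        return factory
    return wrap


# 1) Context manager: the task attaches implicitly.
with MiniDAG("with_style") as dag1:
    MiniTask("dummy")

# 2) Standard constructor: the task must be told which DAG it belongs to.
dag2 = MiniDAG("constructor_style")
MiniTask("start", dag=dag2)

# 3) Decorator: calling the decorated function builds and returns the DAG.
@mini_dag("decorator_style")
def hello_dag():
    MiniTask("hello_task1")

dag3 = hello_dag()

print(dag1.task_ids, dag2.task_ids, dag3.task_ids)
```

In this toy model, as in the decorator examples, nothing is built until hello_dag() is called, which is why the final call matters.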