There are three ways to declare a DAG in Apache Airflow:

  1. with DAG (recommended)
with DAG(
    dag_id="myFirstDag",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False
) as dag:
    op = DummyOperator(task_id="dummy")

  • with DAG example
from airflow.models import DAG 
from airflow.providers.http.sensors.http import HttpSensor
from datetime import datetime, timedelta

default_args = {
    "owner": "airflow",
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "admin@localhost.com",
    "retries": 1,
    "retry_delay": timedelta(minutes=5)
}

with DAG("date_pipeline_ex02",
    start_date=datetime(2022, 11, 17),
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False
) as dag:
    # Sensor that waits for the HTTP endpoint to become available
    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='user_api',
        endpoint='api/'
    )

    # Only one task, so no dependency chaining is needed
    is_api_available
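
Because every operator instantiated inside the with block is attached to the DAG automatically, there is no need to pass dag= to each task, and dependencies can be chained directly with the >> operator. A minimal sketch of that pattern (the dag_id and task names here are hypothetical):

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG("with_dag_chaining",  # hypothetical dag_id for illustration
    start_date=datetime(2022, 11, 17),
    schedule_interval="@daily",
    catchup=False
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extract")
    load = BashOperator(task_id="load", bash_command="echo load")

    # Both tasks were attached by the context manager; >> sets the run order
    extract >> load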
  2. Standard constructor DAG
dag = DAG(
    dag_id="myFirstDag",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False
)
start = EmptyOperator(task_id="start", dag=dag)

  • Standard constructor DAG example

from airflow import DAG
from datetime import datetime
from airflow.operators.bash import BashOperator
import pendulum

# Set the timezone to Korean Standard Time
kst = pendulum.timezone("Asia/Seoul")

default_args = {
    "start_date": datetime(2022, 11, 17, tzinfo=kst),
}

dag = DAG(
    "myFirstDAG",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False  # catchup is a DAG-level argument, not a task default
)

t1 = BashOperator(
    task_id="bash",
    bash_command="echo Hello airflow",
    dag=dag
)

t1
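
With the standard constructor, the DAG object must be wired to every operator by hand via the dag= argument, and dependencies are set with >> (or set_downstream). A minimal sketch of that wiring (the dag_id and task names here are hypothetical):

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

dag = DAG(
    "constructor_chaining",  # hypothetical dag_id for illustration
    start_date=datetime(2022, 11, 17),
    schedule_interval="@daily",
    catchup=False
)

# Each operator must reference the DAG explicitly
t1 = BashOperator(task_id="t1", bash_command="echo one", dag=dag)
t2 = BashOperator(task_id="t2", bash_command="echo two", dag=dag)

t1 >> t2  # equivalent to t1.set_downstream(t2)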
  3. Decorator DAG (@dag)
@dag(
    dag_id='data_pipeline_ex05',
    default_args=default_args,
    schedule_interval='@daily'
)
def hello_dag():
    @task
    def hello_task1():
        print('Hello World')

    hello_task1()

# Invoke the DAG by calling the decorated function
dag = hello_dag()

  • Decorator example
from airflow.decorators import dag, task
from datetime import datetime

default_args = {
    "start_date": datetime(2022, 11, 17),
}

@dag(
    dag_id='data_pipeline_ex05',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False  # catchup is a DAG-level argument, not a task default
)
def hello_dag():
    @task
    def hello_task1():
        print('Hello World')

    hello_task1()

# Invoke the DAG by calling the decorated function
dag = hello_dag()
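
One advantage of the decorator (TaskFlow) style is that the return value of a @task function is passed to downstream tasks via XCom, and calling one task with another's output creates the dependency implicitly. A minimal sketch (the dag_id and task names here are hypothetical):

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    dag_id="taskflow_xcom_ex",  # hypothetical dag_id for illustration
    start_date=datetime(2022, 11, 17),
    schedule_interval="@daily",
    catchup=False
)
def xcom_dag():
    @task
    def extract():
        # The return value is shipped to downstream tasks via XCom
        return {"value": 42}

    @task
    def load(payload):
        print(payload["value"])

    # Calling load() with extract()'s output sets extract >> load implicitly
    load(extract())

dag = xcom_dag()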
