2 분 소요

Airflow XCom

Airflow 작업(Task) 간에 데이터를 전달하는 첫 번째 방법은 작업 데이터를 공유하기 위한 주요 Airflow 기능인 XCom을 사용하는 것입니다. XCom은 task간 데이터를 공유가 필요할 때, 데이터를 공유하기 위해 push, pull 을 사용하여 값을 전달하고, 값을 가져올 수 있습니다.. XComs는 작업에서 보내는 의미의 “푸시” , 작업에서 수신하는 것을 의미하는 “pulled”일 수 있습니다. 푸시된 XCom은 에어플로우 메타데이터 데이터베이스에 저장되고 다른 모든 작업에서 사용할 수 있게 됩니다. 작업이 값을 반환할 때마다. Airflow에서는 여러 분산환경에서 서로 다른 Worker에서 Task가 실행 될 수 있기 때문에 Xcom을 사용합니다. Variable과 비슷하지만 Xcom은 특정 DAG내부에서만 공유되는 특징이 있습니다. 여러 DAG에서 공유해서 사용하려면 Variable을 사용해야 합니다. PythonOperator를 사용하면 return값이 자동으로 Xcom에 push됩니다.

Airflow UI의 Admin > XComs 메뉴에서XCom을 내용을 볼 수 있습니다. 다음과 같은 내용이 표시되어야 합니다.

XCom 사용시기 및 제약사항

XComs는 작업 간에 소량의 데이터를 전달하는 데 사용해야 합니다. 예를 들어 작업 메타데이터, 날짜, 모델 정확도 또는 단일 값 쿼리 결과는 모두 XCom과 함께 사용하기에 이상적인 데이터입니다. XCom으로 작은 데이터 세트를 전달하는 것을 막을 수는 없지만 그렇게 할 때는 매우 주의하십시오. 이것은 XCom이 설계된 목적이 아니며 팬더 데이터 프레임과 같은 데이터를 전달하는 데 사용하면 DAG의 성능이 저하되고 메타데이터 데이터베이스의 저장소를 차지할 수 있습니다. XCom은 작업 간에 큰 데이터 세트를 전달하는 데 사용할 수 없습니다. XCom의 크기 제한은 사용 중인 메타데이터 데이터베이스에 따라 결정됩니다

  • Postgres: 1 Gb
  • SQLite: 2 Gb
  • MySQL: 64 Kb

이러한 한계가 그리 크지 않다는 것을 알 수 있습니다. 데이터가 최대 허용 한도를 충족한다고 생각되더라도 XComs를 사용하지 마십시오. 대신 더 많은 양의 데이터에 더 적합한 중간 데이터 저장소를 사용하십시오.

Custom XCom Backends

커스텀 XCom 백엔드는 에어플로우 2.0 이상에서 사용할 수 있는 새로운 기능입니다. XCom 백엔드를 사용하면 Airflow의 메타데이터 데이터베이스의 기본값이 아닌 S3, GCS 또는 HDFS와 같은 외부 시스템에서 XCom을 푸시하고 풀 할 수 있습니다. 또한 사용자 고유의 직렬화 및 역직렬화 메서드를 구현하여 XCom이 처리되는 방법을 정의할 수 있습니다. 이것은 그 자체로 개념이며 사용자 지정 XCom 백엔드를 읽으면 더 많은 것을 배울 수 있습니다.

예시)

def push_func(**context):
  contenxt[task_instance].xcom_push(key=변수명, value=전달할 value)

def pull_func(**context):
  value=context[ti].xcom_pull(key=변수명, task_ids=대상 Task이름)

  • Xcom 예시 1)
from airflow import DAG 
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# Utils 
from datetime import datetime,timedelta 
from airflow.utils.dates import days_ago 

default_args = {
  'start_date': days_ago(1),
  'retries': 1,
  'retry_delay': timedelta(minutes=5),
  'schedule_interval': '@daily',
  'tags': 'training',
  'catchup': False
} 

def xcom_push(**context):
  context['task_instance'].xcom_push(
    key='pushed_value',
    value='xcom_push_test_message!')

def pull_func(**context):
  value=context['ti'].xcom_pull(
    key='pushed_value', 
    task_ids='push_by_xcom'
  )
  print(value)

with DAG(
  dag_id='xcom_dag', 
  default_args = default_args 
) as dag:

  push_by_xcom = PythonOperator(
    task_id='push_by_xcom',
    python_callable=xcom_push
  )
  
  pull_task1 = PythonOperator(
    task_id='pull_example1',
    python_callable=pull_func
  )
  
  pull_task2 = BashOperator(
    task_id='pull_example2',
    bash_command='echo ""'
  )
  
  push_by_xcom >> pull_task1 >> pull_task2
  • Xcom 예시 2)
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

import requests
import json

url = 'https://covidtracking.com/api/v1/states/'
state = 'wa'

def get_testing_increase(state, ti):
    """
    Gets totalTestResultsIncrease field from Covid API for given state and returns value
    """
    res = requests.get(url+'{0}/current.json'.format(state))
    testing_increase = json.loads(res.text)['totalTestResultsIncrease']

    ti.xcom_push(key='testing_increase', value=testing_increase)

def analyze_testing_increases(state, ti):
    """
    Evaluates testing increase results
    """
    testing_increases=ti.xcom_pull(key='testing_increase', task_ids='get_testing_increase_data_{0}'.format(state))
    print('Testing increases for {0}:'.format(state), testing_increases)
    #run some analysis here

# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG('xcom_dag',
         start_date=datetime(2021, 1, 1),
         max_active_runs=2,
         schedule_interval=timedelta(minutes=30),
         default_args=default_args,
         catchup=False
         ) as dag:

    opr_get_covid_data = PythonOperator(
        task_id = 'get_testing_increase_data_{0}'.format(state),
        python_callable=get_testing_increase,
        op_kwargs={'state':state}
    )

    opr_analyze_testing_data = PythonOperator(
        task_id = 'analyze_data',
        python_callable=analyze_testing_increases,
                op_kwargs={'state':state}
    )

    opr_get_covid_data >> opr_analyze_testing_data

댓글남기기