Airflow XCom
Airflow XCom
Airflow 작업(Task) 간에 데이터를 전달하는 첫 번째 방법은 작업 데이터를 공유하기 위한 주요 Airflow 기능인 XCom을 사용하는 것입니다. XCom은 task간 데이터를 공유가 필요할 때, 데이터를 공유하기 위해 push, pull 을 사용하여 값을 전달하고, 값을 가져올 수 있습니다.. XComs는 작업에서 보내는 의미의 “푸시” , 작업에서 수신하는 것을 의미하는 “pulled”일 수 있습니다. 푸시된 XCom은 에어플로우 메타데이터 데이터베이스에 저장되고 다른 모든 작업에서 사용할 수 있게 됩니다. 작업이 값을 반환할 때마다. Airflow에서는 여러 분산환경에서 서로 다른 Worker에서 Task가 실행 될 수 있기 때문에 Xcom을 사용합니다. Variable과 비슷하지만 Xcom은 특정 DAG내부에서만 공유되는 특징이 있습니다. 여러 DAG에서 공유해서 사용하려면 Variable을 사용해야 합니다. PythonOperator를 사용하면 return값이 자동으로 Xcom에 push됩니다.
Airflow UI의 Admin > XComs 메뉴에서XCom을 내용을 볼 수 있습니다. 다음과 같은 내용이 표시되어야 합니다.
XCom 사용시기 및 제약사항
XComs는 작업 간에 소량의 데이터를 전달하는 데 사용해야 합니다. 예를 들어 작업 메타데이터, 날짜, 모델 정확도 또는 단일 값 쿼리 결과는 모두 XCom과 함께 사용하기에 이상적인 데이터입니다. XCom으로 작은 데이터 세트를 전달하는 것을 막을 수는 없지만 그렇게 할 때는 매우 주의하십시오. 이것은 XCom이 설계된 목적이 아니며 팬더 데이터 프레임과 같은 데이터를 전달하는 데 사용하면 DAG의 성능이 저하되고 메타데이터 데이터베이스의 저장소를 차지할 수 있습니다. XCom은 작업 간에 큰 데이터 세트를 전달하는 데 사용할 수 없습니다. XCom의 크기 제한은 사용 중인 메타데이터 데이터베이스에 따라 결정됩니다
- Postgres: 1 Gb
- SQLite: 2 Gb
- MySQL: 64 Kb
이러한 한계가 그리 크지 않다는 것을 알 수 있습니다. 데이터가 최대 허용 한도를 충족한다고 생각되더라도 XComs를 사용하지 마십시오. 대신 더 많은 양의 데이터에 더 적합한 중간 데이터 저장소를 사용하십시오.
Custom XCom Backends
커스텀 XCom 백엔드는 에어플로우 2.0 이상에서 사용할 수 있는 새로운 기능입니다. XCom 백엔드를 사용하면 Airflow의 메타데이터 데이터베이스의 기본값이 아닌 S3, GCS 또는 HDFS와 같은 외부 시스템에서 XCom을 푸시하고 풀 할 수 있습니다. 또한 사용자 고유의 직렬화 및 역직렬화 메서드를 구현하여 XCom이 처리되는 방법을 정의할 수 있습니다. 이것은 그 자체로 개념이며 사용자 지정 XCom 백엔드를 읽으면 더 많은 것을 배울 수 있습니다.
예시)
def push_func(**context):
contenxt[‘task_instance’].xcom_push(key=변수명, value=전달할 value)
def pull_func(**context):
value=context[‘ti’].xcom_pull(key=변수명, task_ids=대상 Task이름)
- Xcom 예시 1)
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
# Utils
from datetime import datetime,timedelta
from airflow.utils.dates import days_ago
default_args = {
'start_date': days_ago(1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
'schedule_interval': '@daily',
'tags': 'training',
'catchup': False
}
def xcom_push(**context):
context['task_instance'].xcom_push(
key='pushed_value',
value='xcom_push_test_message!')
def pull_func(**context):
value=context['ti'].xcom_pull(
key='pushed_value',
task_ids='push_by_xcom'
)
print(value)
with DAG(
dag_id='xcom_dag',
default_args = default_args
) as dag:
push_by_xcom = PythonOperator(
task_id='push_by_xcom',
python_callable=xcom_push
)
pull_task1 = PythonOperator(
task_id='pull_example1',
python_callable=pull_func
)
pull_task2 = BashOperator(
task_id='pull_example2',
bash_command='echo ""'
)
push_by_xcom >> pull_task1 >> pull_task2
- Xcom 예시 2)
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import requests
import json
url = 'https://covidtracking.com/api/v1/states/'
state = 'wa'
def get_testing_increase(state, ti):
"""
Gets totalTestResultsIncrease field from Covid API for given state and returns value
"""
res = requests.get(url+'{0}/current.json'.format(state))
testing_increase = json.loads(res.text)['totalTestResultsIncrease']
ti.xcom_push(key='testing_increase', value=testing_increase)
def analyze_testing_increases(state, ti):
"""
Evaluates testing increase results
"""
testing_increases=ti.xcom_pull(key='testing_increase', task_ids='get_testing_increase_data_{0}'.format(state))
print('Testing increases for {0}:'.format(state), testing_increases)
#run some analysis here
# Default settings applied to all tasks
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG('xcom_dag',
start_date=datetime(2021, 1, 1),
max_active_runs=2,
schedule_interval=timedelta(minutes=30),
default_args=default_args,
catchup=False
) as dag:
opr_get_covid_data = PythonOperator(
task_id = 'get_testing_increase_data_{0}'.format(state),
python_callable=get_testing_increase,
op_kwargs={'state':state}
)
opr_analyze_testing_data = PythonOperator(
task_id = 'analyze_data',
python_callable=analyze_testing_increases,
op_kwargs={'state':state}
)
opr_get_covid_data >> opr_analyze_testing_data
댓글남기기