1 분 소요

Airflow Sensor

센서는 정확히 한 가지 일을 하도록 설계된 특별한 유형의 오퍼레이터입니다 - 무언가가 발생할 때까지 기다립니다. 시간 기반이거나 파일 또는 외부 이벤트를 기다리는 것일 수 있지만 어떤 일이 발생할 때까지 기다렸다가 해당조건을 만족하면 다운스트림 작업(이후 Task)을 실행할 수 있습니다.

https://airflow.apache.org/docs/apache-airflow/2.2.3/_api/airflow/sensors/index.html

Sensor Task는 주기적으로 체크하면 다음 단계로 진행하지 못하고 대기모드로 유지되기 때문에 Airflow DAG에서의 Sensor는 Worker의 슬롯 한 개를 점유합니다. Sensor는 BaseSensorOperator를 상속하여 구현합니다. BaseSensorOperator는 다음의 옵션을 지원합니다.

구분 타입 기본값 설명
poke_interval float 60 조건 확인을 위한 재시도 주기이며 단위는 second
timeout float 60 * 60 * 24 * 7 Sesnor의 조건 확인을 위해 대기하는 시간이며 단위는 Second
mode str “poke” poke는 특정 조건을 만족할 때까지 worker 슬롯 점유
reschedule은 조건을 확인할때만 worker 슬롯 점유

Sensor의 유형 중에 대표적인 Sensor는

  • FileSensor

이며 그외 다음과 같은 Sensor들이 있습니다.

구분 모듈 경로 설명
BaseSensor airflow.sensors.bash 조건 확인의 재시도 주기이며 단위는 Second
DatetimeSensor
DateTimeSensorAsync
airflow.sensors.date_time Sensor의 조건 확인 대기 시간이며 단위 Second
ExternalTaskSensor airflow.sensors.external_task 다른 DAG의 Task가 종료까지 대기하면서 모니터링하는 DAG의 external Task가 종료되면 ExternalTaskSensor가 실행

FileSensor 예시

from airflow.operators.empty import EmptyOperator
from airflow.sensors.filesystem import FileSensor 
from airflow import DAG 
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta  

default_args = {
  'start_date' : days_ago(1), 
  'retries': 1,
  'retry_delay': timedelta(minutes=5),
  'catchup': False
}

with DAG (
  dag_id ='filesensor',
  default_args = default_args, 
  schedule_interval='@daily'
) as dag:
  start = EmptyOperator(task_id='start')
  end = EmptyOperator(task_id='end')
  
  sensor_task = FileSensor(
    task_id='file_sensor_task', 
    poke_interval=10,
    fs_conn_id='file-conn',
    filepath='/opt/airflow/data/test1.csv'
  )
  
  start >> sensor_task >> end 

댓글남기기