Jak Używać Variables i XCom w Apache Airflow?

apache-airflow-xcom-variables

Mówi się, że Apache Airflow to CRON na sterydach. Zdobywa uznanie wśród narzędzi do orchestracji ETL’i. Harmonogramowanie, zarządzanie i monitorowanie zadań mu nie straszne. Podstawowym sposobem definiowania zadań są acyklicze grafy skierowane (DAG). Zadania w nich muszą wymieniać się informacjami. We wpisie dowiesz się jak używać Variables i XCom w Apache Airflow.

Środowisko

W przypadku Apache Airflow dobrze sprawdza się wariant z wykorzystaniem Docker’a puckel/docker-airflow. Najczęściej używam wariantu docker-compose-LocalExecutor.yml.

sudo docker-compose -f docker-compose-LocalExecutor.yml up -d

Na co komu Variables i XCom?

Variables i XCom to zmienne używane w ramach środowiska Apache Airflow.

Variable to rodzaj globalnej zmiennej. Jeśli jakaś wartość używana jest przez wiele DAG’ów (a nie uśmiecha nam się edycja N plików w przypadku jej zmiany), możemy rozważyć wrzucenie jej do Variables.

XCom (od cross-communication) to wiadomości pozwalające na wymianę danych między zadaniami. Definiowane są jako klucz, wartość, timestamp i task/DAG z którego pochodzą. Każdy obiekt który da się pickle'ować może być wykorzystany jako XCom.

Dajmy na to, że napisałeś/aś aplikację w Apache Spark, która efekty pracy zapisuje w katalogu na HDFS/S3. Ścieżkę trzeba podać jako argument tej aplikacji, a ona generowana jest przez skrypt Python, który napisał Twój kolega z zespołu. Później, kolejne zadania pobierają te dane i robią z nimi kolejne cuda wianki. Wszedzie krąży parametr ze ścieżką. Po to właśnie jest XCom 😁.

UWAGA! Nie używaj XCom’ów do przesyłania dużej ilości danych! Wartości te zapisywane są w bazie używanej przez Apache Airflow. Wrzucanie olbrzymich obiektów Numpy to częsty przypadek nieprawidłowego używania XCom.

Przykład

Zadania DAG są proste:

  1. Pobrać (a jeśli nie istnieje, to wygenerować) wartość z Variables
  2. Na jej podstawie utworzyć kolejną wartość i dodać do XCom
  3. Iterować pobraną wartość z Variables i ją ponowanie zapisać
  4. Pobrać datę za pomocą BashOperator i dodać ją do XCom
  5. Wyświetlić obie wartości w konsoli na zdalnej maszynie używając SSHOperator

Przygotowanie Variables i XCom

Obsługa Variables dostępna jest poprzez Variable, a dokładnie metody get i set. W przypadku XCom, potrzebny jest context (provide_context=True), a do zapisania wartości do XCom służy metoda xcom_push. Poniżej kod pierwszego zadania.

...
def setup_var_and_xcom(**kwargs):
    ti = kwargs['ti']
    iterator = int(Variable.get('my_iterator', default_var=0))
    ti.xcom_push(key='important_xcom_value', value = f'something_important_{iterator}')
    iterator += 1
    Variable.set('my_iterator', iterator)

t1 = PythonOperator(
    task_id='setup_var_and_xcom',
    python_callable=setup_var_and_xcom,
    provide_context=True,
    dag=dag
)
...

Pobranie wyniku BashOperator do XCom

Według dokumentacji, argument xcom_push w operatorze umożliwia zapisanie do XCom ostatniej linii z stdout.

...
t2 = BashOperator(
    task_id="get_date",
    bash_command="date",
    xcom_push=True,
    dag=dag
)
...

Wyświetlenie XCom w zdalnej konsoli

W tym przypadku wykorzystamy SSHOperator w połączeniu z Jinja Templating. Wstrzykniemy dodane wcześniej do XCom wartości. Jedna wartość jest pobierana po kluczu, druga po id zadania.

...
t3 = SSHOperator(
    ssh_conn_id='ssh_default',
    task_id='ssh_xcom_check',
    command='echo "{{ ti.xcom_pull(key="important_xcom_value") }} {{ ti.xcom_pull(task_ids="get_date") }}"',
    dag=dag)
...
Podany Connection Id wymaga wcześniejszej konfiguracji

Efekty

Wszystkie przekazywane wartości zostały prawidłowo wyświetlone w ostatnim zadaniu.

[2020-11-22 18:22:59,048] {{logging_mixin.py:112}} INFO - [2020-11-22 18:22:59,047] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.6p1)
[2020-11-22 18:22:59,065] {{logging_mixin.py:112}} INFO - [2020-11-22 18:22:59,065] {{transport.py:1819}} INFO - Authentication (password) successful!
[2020-11-22 18:22:59,065] {{ssh_operator.py:109}} INFO - Running command: echo "something_important_6 Sun 22 Nov 2020 06:22:56 PM UTC"
[2020-11-22 18:22:59,767] {{ssh_operator.py:143}} INFO - something_important_6 Sun 22 Nov 2020 06:22:56 PM UTC
[2020-11-22 18:22:59,780] {{taskinstance.py:1048}} INFO - Marking task as SUCCESS.dag_id=var_xcom_example, task_id=ssh_xcom_check, execution_date=20201122T182252, start_date=20201122T182258, end_date=20201122T182259
[2020-11-22 18:23:08,905] {{logging_mixin.py:112}} INFO - [2020-11-22 18:23:08,905] {{local_task_job.py:103}} INFO - Task exited with return code 0

Utworzone XCom’y widoczne są w UI Apache Airflow

To samo tyczy się Variables.

Kod

from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable

default_args = {
    "owner": "Maciej Szymczyk",
    "depends_on_past": False,
    "start_date": datetime(2020, 9, 27),
    "email": ["maciej@wiadrodanych.pl"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=1)
}

dag = DAG("var_xcom_example",
          default_args=default_args,
          schedule_interval='@daily',
          catchup=False)


def setup_var_and_xcom(**kwargs):
    ti = kwargs['ti']
    iterator = int(Variable.get('my_iterator', default_var=0))
    ti.xcom_push(key='important_xcom_value', value = f'something_important_{iterator}')
    iterator += 1
    Variable.set('my_iterator', iterator)

t1 = PythonOperator(
    task_id='setup_var_and_xcom',
    python_callable=setup_var_and_xcom,
    provide_context=True,
    dag=dag
)

t2 = BashOperator(
    task_id="get_date",
    bash_command="date",
    xcom_push=True,
    dag=dag
)

t3 = SSHOperator(
    ssh_conn_id='ssh_default',
    task_id='ssh_xcom_check',
    command='echo "{{ ti.xcom_pull(key="important_xcom_value") }} {{ ti.xcom_pull(task_ids="get_date") }}"',
    dag=dag)

t1 >> t2 >> t3

Wnioski

I w ten oto sposób można utworzyć prosty DAG w Apache Airflow wykorzystując XCom i Variables. W przyszłości postaram się dać jakiś bardziej życiowy przykład np. z wykorzystaniem Apache Spark.

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *