Mówi się, że Apache Airflow to CRON na sterydach. Zdobywa uznanie wśród narzędzi do orchestracji ETL’i. Harmonogramowanie, zarządzanie i monitorowanie zadań mu nie straszne. Podstawowym sposobem definiowania zadań są acyklicze grafy skierowane (DAG). Zadania w nich muszą wymieniać się informacjami. We wpisie dowiesz się jak używać Variables i XCom w Apache Airflow.
Środowisko
W przypadku Apache Airflow dobrze sprawdza się wariant z wykorzystaniem Docker’a puckel/docker-airflow. Najczęściej używam wariantu docker-compose-LocalExecutor.yml.
sudo docker-compose -f docker-compose-LocalExecutor.yml up -d
Na co komu Variables i XCom?
Variables i XCom to zmienne używane w ramach środowiska Apache Airflow.
Variable to rodzaj globalnej zmiennej. Jeśli jakaś wartość używana jest przez wiele DAG’ów (a nie uśmiecha nam się edycja N plików w przypadku jej zmiany), możemy rozważyć wrzucenie jej do Variables.
XCom (od cross-communication) to wiadomości pozwalające na wymianę danych między zadaniami. Definiowane są jako klucz, wartość, timestamp i task/DAG z którego pochodzą. Każdy obiekt który da się pickle'ować może być wykorzystany jako XCom.
Dajmy na to, że napisałeś/aś aplikację w Apache Spark, która efekty pracy zapisuje w katalogu na HDFS/S3. Ścieżkę trzeba podać jako argument tej aplikacji, a ona generowana jest przez skrypt Python, który napisał Twój kolega z zespołu. Później, kolejne zadania pobierają te dane i robią z nimi kolejne cuda wianki. Wszedzie krąży parametr ze ścieżką. Po to właśnie jest XCom 😁.
UWAGA! Nie używaj XCom’ów do przesyłania dużej ilości danych! Wartości te zapisywane są w bazie używanej przez Apache Airflow. Wrzucanie olbrzymich obiektów Numpy to częsty przypadek nieprawidłowego używania XCom.
Przykład
Zadania DAG są proste:
- Pobrać (a jeśli nie istnieje, to wygenerować) wartość z Variables
- Na jej podstawie utworzyć kolejną wartość i dodać do XCom
- Iterować pobraną wartość z Variablesi ją ponowanie zapisać
- Pobrać datę za pomocą BashOperatori dodać ją doXCom
- Wyświetlić obie wartości w konsoli na zdalnej maszynie używając SSHOperator
Przygotowanie Variables i XCom
Obsługa Variables dostępna jest poprzez Variable, a dokładnie metody get i set. W przypadku XCom, potrzebny jest context (provide_context=True), a do zapisania wartości do XCom służy metoda xcom_push. Poniżej kod pierwszego zadania.
...
def setup_var_and_xcom(**kwargs):
    ti = kwargs['ti']
    iterator = int(Variable.get('my_iterator', default_var=0))
    ti.xcom_push(key='important_xcom_value', value = f'something_important_{iterator}')
    iterator += 1
    Variable.set('my_iterator', iterator)
t1 = PythonOperator(
    task_id='setup_var_and_xcom',
    python_callable=setup_var_and_xcom,
    provide_context=True,
    dag=dag
)
...
Pobranie wyniku BashOperator do XCom
Według dokumentacji, argument xcom_push w operatorze umożliwia zapisanie do XCom ostatniej linii z stdout.
...
t2 = BashOperator(
    task_id="get_date",
    bash_command="date",
    xcom_push=True,
    dag=dag
)
...
Wyświetlenie XCom w zdalnej konsoli
W tym przypadku wykorzystamy SSHOperator w połączeniu z Jinja Templating. Wstrzykniemy dodane wcześniej do XCom wartości. Jedna wartość jest pobierana po kluczu, druga po id zadania.
...
t3 = SSHOperator(
    ssh_conn_id='ssh_default',
    task_id='ssh_xcom_check',
    command='echo "{{ ti.xcom_pull(key="important_xcom_value") }} {{ ti.xcom_pull(task_ids="get_date") }}"',
    dag=dag)
...

Efekty
Wszystkie przekazywane wartości zostały prawidłowo wyświetlone w ostatnim zadaniu.

[2020-11-22 18:22:59,048] {{logging_mixin.py:112}} INFO - [2020-11-22 18:22:59,047] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.6p1)
[2020-11-22 18:22:59,065] {{logging_mixin.py:112}} INFO - [2020-11-22 18:22:59,065] {{transport.py:1819}} INFO - Authentication (password) successful!
[2020-11-22 18:22:59,065] {{ssh_operator.py:109}} INFO - Running command: echo "something_important_6 Sun 22 Nov 2020 06:22:56 PM UTC"
[2020-11-22 18:22:59,767] {{ssh_operator.py:143}} INFO - something_important_6 Sun 22 Nov 2020 06:22:56 PM UTC
[2020-11-22 18:22:59,780] {{taskinstance.py:1048}} INFO - Marking task as SUCCESS.dag_id=var_xcom_example, task_id=ssh_xcom_check, execution_date=20201122T182252, start_date=20201122T182258, end_date=20201122T182259
[2020-11-22 18:23:08,905] {{logging_mixin.py:112}} INFO - [2020-11-22 18:23:08,905] {{local_task_job.py:103}} INFO - Task exited with return code 0
Utworzone XCom’y widoczne są w UI Apache Airflow

To samo tyczy się Variables.

Kod
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
default_args = {
    "owner": "Maciej Szymczyk",
    "depends_on_past": False,
    "start_date": datetime(2020, 9, 27),
    "email": ["maciej@wiadrodanych.pl"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=1)
}
dag = DAG("var_xcom_example",
          default_args=default_args,
          schedule_interval='@daily',
          catchup=False)
def setup_var_and_xcom(**kwargs):
    ti = kwargs['ti']
    iterator = int(Variable.get('my_iterator', default_var=0))
    ti.xcom_push(key='important_xcom_value', value = f'something_important_{iterator}')
    iterator += 1
    Variable.set('my_iterator', iterator)
t1 = PythonOperator(
    task_id='setup_var_and_xcom',
    python_callable=setup_var_and_xcom,
    provide_context=True,
    dag=dag
)
t2 = BashOperator(
    task_id="get_date",
    bash_command="date",
    xcom_push=True,
    dag=dag
)
t3 = SSHOperator(
    ssh_conn_id='ssh_default',
    task_id='ssh_xcom_check',
    command='echo "{{ ti.xcom_pull(key="important_xcom_value") }} {{ ti.xcom_pull(task_ids="get_date") }}"',
    dag=dag)
t1 >> t2 >> t3
Wnioski
I w ten oto sposób można utworzyć prosty DAG w Apache Airflow wykorzystując XCom i Variables. W przyszłości postaram się dać jakiś bardziej życiowy przykład np. z wykorzystaniem Apache Spark.

 
	