Mówi się, że Apache Airflow to CRON na sterydach. Zdobywa uznanie wśród narzędzi do orchestracji ETL’i. Harmonogramowanie, zarządzanie i monitorowanie zadań mu nie straszne. Podstawowym sposobem definiowania zadań są acyklicze grafy skierowane (DAG). Zadania w nich muszą wymieniać się informacjami. We wpisie dowiesz się jak używać Variables i XCom w Apache Airflow.
Środowisko
W przypadku Apache Airflow dobrze sprawdza się wariant z wykorzystaniem Docker’a puckel/docker-airflow. Najczęściej używam wariantu docker-compose-LocalExecutor.yml.
sudo docker-compose -f docker-compose-LocalExecutor.yml up -d
Na co komu Variables i XCom?
Variables i XCom to zmienne używane w ramach środowiska Apache Airflow.
Variable to rodzaj globalnej zmiennej. Jeśli jakaś wartość używana jest przez wiele DAG’ów (a nie uśmiecha nam się edycja N plików w przypadku jej zmiany), możemy rozważyć wrzucenie jej do Variables.
XCom (od cross-communication) to wiadomości pozwalające na wymianę danych między zadaniami. Definiowane są jako klucz, wartość, timestamp i task/DAG z którego pochodzą. Każdy obiekt który da się pickle'ować może być wykorzystany jako XCom.
Dajmy na to, że napisałeś/aś aplikację w Apache Spark, która efekty pracy zapisuje w katalogu na HDFS/S3. Ścieżkę trzeba podać jako argument tej aplikacji, a ona generowana jest przez skrypt Python, który napisał Twój kolega z zespołu. Później, kolejne zadania pobierają te dane i robią z nimi kolejne cuda wianki. Wszedzie krąży parametr ze ścieżką. Po to właśnie jest XCom 😁.
UWAGA! Nie używaj XCom’ów do przesyłania dużej ilości danych! Wartości te zapisywane są w bazie używanej przez Apache Airflow. Wrzucanie olbrzymich obiektów Numpy to częsty przypadek nieprawidłowego używania XCom.
Przykład
Zadania DAG są proste:
- Pobrać (a jeśli nie istnieje, to wygenerować) wartość z
Variables - Na jej podstawie utworzyć kolejną wartość i dodać do
XCom - Iterować pobraną wartość z
Variablesi ją ponowanie zapisać - Pobrać datę za pomocą
BashOperatori dodać ją doXCom - Wyświetlić obie wartości w konsoli na zdalnej maszynie używając
SSHOperator
Przygotowanie Variables i XCom
Obsługa Variables dostępna jest poprzez Variable, a dokładnie metody get i set. W przypadku XCom, potrzebny jest context (provide_context=True), a do zapisania wartości do XCom służy metoda xcom_push. Poniżej kod pierwszego zadania.
...
def setup_var_and_xcom(**kwargs):
ti = kwargs['ti']
iterator = int(Variable.get('my_iterator', default_var=0))
ti.xcom_push(key='important_xcom_value', value = f'something_important_{iterator}')
iterator += 1
Variable.set('my_iterator', iterator)
t1 = PythonOperator(
task_id='setup_var_and_xcom',
python_callable=setup_var_and_xcom,
provide_context=True,
dag=dag
)
...
Pobranie wyniku BashOperator do XCom
Według dokumentacji, argument xcom_push w operatorze umożliwia zapisanie do XCom ostatniej linii z stdout.
...
t2 = BashOperator(
task_id="get_date",
bash_command="date",
xcom_push=True,
dag=dag
)
...
Wyświetlenie XCom w zdalnej konsoli
W tym przypadku wykorzystamy SSHOperator w połączeniu z Jinja Templating. Wstrzykniemy dodane wcześniej do XCom wartości. Jedna wartość jest pobierana po kluczu, druga po id zadania.
...
t3 = SSHOperator(
ssh_conn_id='ssh_default',
task_id='ssh_xcom_check',
command='echo "{{ ti.xcom_pull(key="important_xcom_value") }} {{ ti.xcom_pull(task_ids="get_date") }}"',
dag=dag)
...

Efekty
Wszystkie przekazywane wartości zostały prawidłowo wyświetlone w ostatnim zadaniu.

[2020-11-22 18:22:59,048] {{logging_mixin.py:112}} INFO - [2020-11-22 18:22:59,047] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.6p1)
[2020-11-22 18:22:59,065] {{logging_mixin.py:112}} INFO - [2020-11-22 18:22:59,065] {{transport.py:1819}} INFO - Authentication (password) successful!
[2020-11-22 18:22:59,065] {{ssh_operator.py:109}} INFO - Running command: echo "something_important_6 Sun 22 Nov 2020 06:22:56 PM UTC"
[2020-11-22 18:22:59,767] {{ssh_operator.py:143}} INFO - something_important_6 Sun 22 Nov 2020 06:22:56 PM UTC
[2020-11-22 18:22:59,780] {{taskinstance.py:1048}} INFO - Marking task as SUCCESS.dag_id=var_xcom_example, task_id=ssh_xcom_check, execution_date=20201122T182252, start_date=20201122T182258, end_date=20201122T182259
[2020-11-22 18:23:08,905] {{logging_mixin.py:112}} INFO - [2020-11-22 18:23:08,905] {{local_task_job.py:103}} INFO - Task exited with return code 0
Utworzone XCom’y widoczne są w UI Apache Airflow

To samo tyczy się Variables.

Kod
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
default_args = {
"owner": "Maciej Szymczyk",
"depends_on_past": False,
"start_date": datetime(2020, 9, 27),
"email": ["maciej@wiadrodanych.pl"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
"retry_delay": timedelta(minutes=1)
}
dag = DAG("var_xcom_example",
default_args=default_args,
schedule_interval='@daily',
catchup=False)
def setup_var_and_xcom(**kwargs):
ti = kwargs['ti']
iterator = int(Variable.get('my_iterator', default_var=0))
ti.xcom_push(key='important_xcom_value', value = f'something_important_{iterator}')
iterator += 1
Variable.set('my_iterator', iterator)
t1 = PythonOperator(
task_id='setup_var_and_xcom',
python_callable=setup_var_and_xcom,
provide_context=True,
dag=dag
)
t2 = BashOperator(
task_id="get_date",
bash_command="date",
xcom_push=True,
dag=dag
)
t3 = SSHOperator(
ssh_conn_id='ssh_default',
task_id='ssh_xcom_check',
command='echo "{{ ti.xcom_pull(key="important_xcom_value") }} {{ ti.xcom_pull(task_ids="get_date") }}"',
dag=dag)
t1 >> t2 >> t3
Wnioski
I w ten oto sposób można utworzyć prosty DAG w Apache Airflow wykorzystując XCom i Variables. W przyszłości postaram się dać jakiś bardziej życiowy przykład np. z wykorzystaniem Apache Spark.

