Delta Lake w Pigułce (czyli o podróżach w czasie)

Delta Lake zdobywa ostatnio coraz większa popularność. Słychać o nim na konferencjach na całym świecie. W tym artykule przyjrzymy się jakie problemy rozwiązuje.

Co to Delta Lake?

Delta Lake to nakładka na Apache Spark. Pełni rolę warstwy persystencji i gwarantuje ACID na HDFS, S3, ADLS. Czemu jest to takie istotne?

Kiedyś porównałem różne formaty danych na HDFS. Porównałem tam czas odczytów, ale nie było nic o zapisie. HDFS dodaje i usuwa pliki. Update to tak naprawdę usunięcie i zapisanie, czyli nie ma mowy o atomiczności. Do tego proces zapisu może działać na wielu węzłach. Wyobraź sobie, że chcesz nadpisać dane i w połowie operacji występuje błąd. W rezultacie masz niespójne dane. Nie możesz wrócić do poprzedniej wersji. Jeśli zrobił to jeden ze współpracowników, ciężko wskazać kto zawinił. Jest to jeden z problemów, które rozwiązuje.

Jupyter + Delta Lake

Aby uprościć konfigurację środowiska do minimum, użyłem Docker. Poniżej docker compose. Wykorzystałem gotowy obraz jupyter/all-spark-notebook. Jedyny trik to zmienna środowiskowa, którą wykorzysta pyspark przy tworzeniu contextu.

version: '3'
services:
  notebook:
    image: jupyter/all-spark-notebook
    ports:
      - 8888:8888
    environment:
      - PYSPARK_SUBMIT_ARGS=--packages io.delta:delta-core_2.11:0.4.0 pyspark-shell
    volumes:
      - ./work:/home/jovyan/work

Przygotowanie danych

Nie jest ich dużo, ale nie o to chodzi. Dane zwierają imię i wiek, zmienna data3 dodatkowo zawiera rozmiar buta.

data1 = [ ['Maciej', 29], ['John', 31], ['Jack',35]]
data2 = [ ['Beck', 24], ['Wick', 21], ['Samantha',35]]
data3 = [ ['Homer', 23, 9], ['Barney', 24, 7], ['Edward', 45, 8]]
dataSchema = StructType([ \
    StructField("name", StringType()), \
    StructField("age", IntegerType()) \
    ])
differentSchema = StructType([ \
    StructField("name", StringType()), \
    StructField("age", IntegerType()), \
    StructField("foot_size", IntegerType()) \
    ])
df1 = spark.sparkContext.parallelize(data1).toDF(dataSchema)
df2 = spark.sparkContext.parallelize(data2).toDF(dataSchema)
df3 = spark.sparkContext.parallelize(data3).toDF(differentSchema)

Bez Delta Lake

Przypomnijmy jak to wygląda bez delta lake. Nadpisujemy dane i je bezpowrotnie tracimy. Czy zawsze chcemy do nich wracać? Nie, ale w niektórych przypadkach dobrze mieć taką możliwość.

Z Delta Lake

Jedyne co trzeba dodać to .format(„delta”)

df1.write.format("delta").save(path)

Historia i podróże w czasie

Delta Lake zapisuje dane w taki sposób, że nie tracimy tego, co było wcześniej.

Dane zostały kolejno nadpisane i dodane, ale ilość plików w katalogu ciągle rośnie.

Widzimy wszystkie zmiany dokonane na zbiorze. Możemy równiez odczytać konkretną wersję używając opcji versionAsOf. Można posłużyć się też timestamp-em.

Delta Lake pod spodem używa Parquet. Można w ten sposób odczytać delta, ale raczej nie ma to sensu.

Zarządzanie schematem

Jeśli kiedyś przypadkiem dodałeś kolumnę lub pomyliłeś typ danych, warto rozważyć delta lake. Nie bez powodu data3 posiada inny schemat. Sprawdźmy co się stanie jak spróbujemy dodać te dane do utworzonej delty.

Delta Lake nie pozwoli nam zmienić schmatu jeśli go nie „uprzedzimy”. Nie chcemy, aby zły format lub przypadkowa kolumna wpadła do naszych danych. Decyzję o zmianie schematu podejmujemy świadomie, stąd wymagana opcja .option(„mergeSchema”, „true”).

df3.write.mode("append").option("mergeSchema", "true").format("delta").save(path)
Nowy schemat
Wersje historyczne są bezpieczne.

Vacuum

Do wyczyszczenia historii służy metoda .vacuum(). Domyślnie Delta Lake trzyma dane 7 dni. Między innymi dlatego przykład ten nie ma sensu, bo wykonanie vacuum nic nie zmienia ?

Delta Lake API

Delete

Normalnie w DataFrame musielibyśmy odfiltrować wiersze, które chcemy usunąć. W tym przypadku możemy zrobić następująco:

Update

Podobnie z aktualizacją wartości

Merge

Jednak najciekawszą opcją jest merge. Umożliwia warunkowe operacje przy łączeniu dwóch zbiorów. O wydajnych upsert-ach możesz przeczytać w dokumentacji databricks

Aby zrobić przykład trochę bardziej „sensownym”, dodajmy kolumnę count i załóżmy, że kolumna name jest unikalna. Dodajmy dwa zbiory do siebie, jednocześnie zwiększając count, jeśli dany rekord się powtórzył.

deltaTable.alias("clients").merge(
    source = df3.alias("new_clients"),
    condition = expr("clients.name == new_clients.name")
  ).whenMatchedUpdate(set =
    {
      "count": col("clients.count") + 1
    }
  ).whenNotMatchedInsert(values =
    {
      "name": col("new_clients.name"),
      "age": col("new_clients.age"),
      "foot_size": col("new_clients.foot_size"),
      "count": lit("1")
    }
  ).execute()
Efekt końcowy merge. Usunięty wcześniej Edward wrócił do zbioru, a Barney ma count równy 2.

Streaming Sink

Delte można wykorzystać jako input/output w Spark Streaming. Aby to zobrazować, utworzyłem stream liczący średnią wieku z delty wykorzystanej w poprzednich przykładach.

Zwróć uwagę na linijkę nr 5. Jest konieczna, ponieważ dane zarówno usuwamy, jak i zmieniamy. Według dokumentacji:

Structured Streaming does not handle input that is not an append and throws an exception if any modifications occur on the table being used as a source. 

streaming_path = './data/example_1/delta_stream'
streaming_checkpoint = './data/example_1/delta_stream_checkpount'
stream = spark.readStream \
  .format("delta") \
  .option("ignoreChanges", "true") \
  .load(path) \
  .agg(avg(col('age')).alias("average")) \
  .writeStream \
  .format("delta") \
  .outputMode("complete") \
  .option("checkpointLocation", streaming_checkpoint) \
  .start(streaming_path)

Mamy dostęp do danych aktualnych i historycznych. Może pipe-owanie takich delt używając Spark Structured Streaming to dobry pomysł? Kiedyś się dowiemy.

Co jest pod maską?

To co się dzieje w folderze z danymi przypomina trochę git-a. Mamy pliki typu parquet oraz folder z metadanymi

maciej@ubuntu:~/Desktop/deltalake/work/data/example_1/delta$ ls
_delta_log
part-00000-0af6e4cc-78be-4c85-8d87-97fb80a0fa6e-c000.snappy.parquet
part-00000-12eecac6-7315-4d79-8f8e-58f21b5b18e2-c000.snappy.parquet
part-00000-5efb5622-8738-491a-8811-817294922727-c000.snappy.parquet
part-00000-7c029367-2807-4136-916a-3495c926728d-c000.snappy.parquet
part-00000-92eae11f-a036-4af6-a0d5-5fb8a5467cf7-c000.snappy.parquet
part-00000-9d838eda-f53f-4c13-9844-974689312738-c000.snappy.parquet
part-00000-d0acc365-4799-42ed-9ca2-d1480a2cddd7-c000.snappy.parquet
part-00000-f4b2c672-939c-45b4-bd7c-5ede2fde80e1-c000.snappy.parquet
part-00001-2bc5f331-370f-4219-a00b-7ae213826b73-c000.snappy.parquet
...
maciej@ubuntu:~/Desktop/deltalake/work/data/example_1/delta/_delta_log$ ll
total 48
drwxr-xr-x 2 maciej users  4096 Feb  6 23:15 ./
drwxr-xr-x 3 maciej users 12288 Feb  6 23:15 ../
-rw-r--r-- 1 maciej users  1260 Feb  6 23:11 00000000000000000000.json
-rw-r--r-- 1 maciej users  1411 Feb  6 23:12 00000000000000000001.json
-rw-r--r-- 1 maciej users   839 Feb  6 23:12 00000000000000000002.json
-rw-r--r-- 1 maciej users  1292 Feb  6 23:12 00000000000000000003.json
-rw-r--r-- 1 maciej users   478 Feb  6 23:14 00000000000000000004.json
-rw-r--r-- 1 maciej users  1525 Feb  6 23:15 00000000000000000005.json
-rw-r--r-- 1 maciej users  2312 Feb  6 23:15 00000000000000000006.json
-rw-r--r-- 1 maciej users  2170 Feb  6 23:15 00000000000000000007.json

Spójrzmy na jeden 0..4.json.

{"commitInfo":{"timestamp":1581027297648,"operation":"DELETE","operationParameters":{"predicate":"[\"(`name` = 'Edward')\"]"},"readVersion":3,"isBlindAppend":false}}
{"remove":{"path":"part-00003-02426997-e65a-4b08-b820-3614d2c74e01-c000.snappy.parquet","deletionTimestamp":1581027297642,"dataChange":true}}
{"add":{"path":"part-00000-7c029367-2807-4136-916a-3495c926728d-c000.snappy.parquet","partitionValues":{},"size":433,"modificationTime":1581027297000,"dataChange":true}}

predicate…’name’ = ‚Edward’... To operacja usuwania z przykładu wykorzystania Delta Lake API. Poniżej commitInfo widzimy dodanie oraz usunięcie plików parquet. Działa to podobnie jak git lub commit log w „standardowej” bazie danych. Po przetworzeniu wszystkich metadanych otrzymamy aktualną wersję.

Repo

https://github.com/zorteran/wiadro-danych-delta-lake-nutshell

2 myśli w temacie “Delta Lake w Pigułce (czyli o podróżach w czasie)”

  1. Super, dzięki za idealnie przejrzyście przedstawione przykłady!
    Do tej pory myślałem, że delta w zasadzie jest do ACID i merge, bez kwestii historii zmian w schema.
    A świat zmian w schema znałem pobieżnie tylko z punktu widzenia Avro schema evolution, niemniej jednak jakoś za głęboko w tym nie siedzę i wiedza jest szczątkowa.

    Od jakiegoś czasu śledzę artykuły (odkąd wrzucasz na grupę DC, jakoś od końca 2019). Dobra robota!

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *