Delta Lake zdobywa ostatnio coraz większa popularność. Słychać o nim na konferencjach na całym świecie. W tym artykule przyjrzymy się jakie problemy rozwiązuje.
Co to Delta Lake?
Delta Lake to nakładka na Apache Spark. Pełni rolę warstwy persystencji i gwarantuje ACID na HDFS, S3, ADLS. Czemu jest to takie istotne?
Kiedyś porównałem różne formaty danych na HDFS. Porównałem tam czas odczytów, ale nie było nic o zapisie. HDFS dodaje i usuwa pliki. Update to tak naprawdę usunięcie i zapisanie, czyli nie ma mowy o atomiczności. Do tego proces zapisu może działać na wielu węzłach. Wyobraź sobie, że chcesz nadpisać dane i w połowie operacji występuje błąd. W rezultacie masz niespójne dane. Nie możesz wrócić do poprzedniej wersji. Jeśli zrobił to jeden ze współpracowników, ciężko wskazać kto zawinił. Jest to jeden z problemów, które rozwiązuje.
Jupyter + Delta Lake
Aby uprościć konfigurację środowiska do minimum, użyłem Docker. Poniżej docker compose. Wykorzystałem gotowy obraz jupyter/all-spark-notebook. Jedyny trik to zmienna środowiskowa, którą wykorzysta pyspark przy tworzeniu contextu.
version: '3'
services:
notebook:
image: jupyter/all-spark-notebook
ports:
- 8888:8888
environment:
- PYSPARK_SUBMIT_ARGS=--packages io.delta:delta-core_2.11:0.4.0 pyspark-shell
volumes:
- ./work:/home/jovyan/work
Przygotowanie danych
Nie jest ich dużo, ale nie o to chodzi. Dane zwierają imię i wiek, zmienna data3 dodatkowo zawiera rozmiar buta.
data1 = [ ['Maciej', 29], ['John', 31], ['Jack',35]]
data2 = [ ['Beck', 24], ['Wick', 21], ['Samantha',35]]
data3 = [ ['Homer', 23, 9], ['Barney', 24, 7], ['Edward', 45, 8]]
dataSchema = StructType([ \
StructField("name", StringType()), \
StructField("age", IntegerType()) \
])
differentSchema = StructType([ \
StructField("name", StringType()), \
StructField("age", IntegerType()), \
StructField("foot_size", IntegerType()) \
])
df1 = spark.sparkContext.parallelize(data1).toDF(dataSchema)
df2 = spark.sparkContext.parallelize(data2).toDF(dataSchema)
df3 = spark.sparkContext.parallelize(data3).toDF(differentSchema)
Bez Delta Lake
Przypomnijmy jak to wygląda bez delta lake. Nadpisujemy dane i je bezpowrotnie tracimy. Czy zawsze chcemy do nich wracać? Nie, ale w niektórych przypadkach dobrze mieć taką możliwość.
Z Delta Lake
Jedyne co trzeba dodać to .format(“delta”)
df1.write.format("delta").save(path)
Historia i podróże w czasie
Delta Lake zapisuje dane w taki sposób, że nie tracimy tego, co było wcześniej.
Dane zostały kolejno nadpisane i dodane, ale ilość plików w katalogu ciągle rośnie.
Widzimy wszystkie zmiany dokonane na zbiorze. Możemy równiez odczytać konkretną wersję używając opcji versionAsOf. Można posłużyć się też timestamp-em.
Zarządzanie schematem
Jeśli kiedyś przypadkiem dodałeś kolumnę lub pomyliłeś typ danych, warto rozważyć delta lake. Nie bez powodu data3 posiada inny schemat. Sprawdźmy co się stanie jak spróbujemy dodać te dane do utworzonej delty.
Delta Lake nie pozwoli nam zmienić schmatu jeśli go nie “uprzedzimy”. Nie chcemy, aby zły format lub przypadkowa kolumna wpadła do naszych danych. Decyzję o zmianie schematu podejmujemy świadomie, stąd wymagana opcja .option(“mergeSchema”, “true”).
df3.write.mode("append").option("mergeSchema", "true").format("delta").save(path)
Vacuum
Do wyczyszczenia historii służy metoda .vacuum(). Domyślnie Delta Lake trzyma dane 7 dni. Między innymi dlatego przykład ten nie ma sensu, bo wykonanie vacuum nic nie zmienia ?
Delta Lake API
Delete
Normalnie w DataFrame musielibyśmy odfiltrować wiersze, które chcemy usunąć. W tym przypadku możemy zrobić następująco:
Update
Podobnie z aktualizacją wartości
Merge
Jednak najciekawszą opcją jest merge. Umożliwia warunkowe operacje przy łączeniu dwóch zbiorów. O wydajnych upsert-ach możesz przeczytać w dokumentacji databricks
Aby zrobić przykład trochę bardziej “sensownym”, dodajmy kolumnę count i załóżmy, że kolumna name jest unikalna. Dodajmy dwa zbiory do siebie, jednocześnie zwiększając count, jeśli dany rekord się powtórzył.
deltaTable.alias("clients").merge(
source = df3.alias("new_clients"),
condition = expr("clients.name == new_clients.name")
).whenMatchedUpdate(set =
{
"count": col("clients.count") + 1
}
).whenNotMatchedInsert(values =
{
"name": col("new_clients.name"),
"age": col("new_clients.age"),
"foot_size": col("new_clients.foot_size"),
"count": lit("1")
}
).execute()
Streaming Sink
Delte można wykorzystać jako input/output w Spark Streaming. Aby to zobrazować, utworzyłem stream liczący średnią wieku z delty wykorzystanej w poprzednich przykładach.
Zwróć uwagę na linijkę nr 5. Jest konieczna, ponieważ dane zarówno usuwamy, jak i zmieniamy. Według dokumentacji:
Structured Streaming does not handle input that is not an append and throws an exception if any modifications occur on the table being used as a source.
streaming_path = './data/example_1/delta_stream'
streaming_checkpoint = './data/example_1/delta_stream_checkpount'
stream = spark.readStream \
.format("delta") \
.option("ignoreChanges", "true") \
.load(path) \
.agg(avg(col('age')).alias("average")) \
.writeStream \
.format("delta") \
.outputMode("complete") \
.option("checkpointLocation", streaming_checkpoint) \
.start(streaming_path)
Mamy dostęp do danych aktualnych i historycznych. Może pipe-owanie takich delt używając Spark Structured Streaming to dobry pomysł? Kiedyś się dowiemy.
Co jest pod maską?
To co się dzieje w folderze z danymi przypomina trochę git-a. Mamy pliki typu parquet oraz folder z metadanymi
maciej@ubuntu:~/Desktop/deltalake/work/data/example_1/delta$ ls
_delta_log
part-00000-0af6e4cc-78be-4c85-8d87-97fb80a0fa6e-c000.snappy.parquet
part-00000-12eecac6-7315-4d79-8f8e-58f21b5b18e2-c000.snappy.parquet
part-00000-5efb5622-8738-491a-8811-817294922727-c000.snappy.parquet
part-00000-7c029367-2807-4136-916a-3495c926728d-c000.snappy.parquet
part-00000-92eae11f-a036-4af6-a0d5-5fb8a5467cf7-c000.snappy.parquet
part-00000-9d838eda-f53f-4c13-9844-974689312738-c000.snappy.parquet
part-00000-d0acc365-4799-42ed-9ca2-d1480a2cddd7-c000.snappy.parquet
part-00000-f4b2c672-939c-45b4-bd7c-5ede2fde80e1-c000.snappy.parquet
part-00001-2bc5f331-370f-4219-a00b-7ae213826b73-c000.snappy.parquet
...
maciej@ubuntu:~/Desktop/deltalake/work/data/example_1/delta/_delta_log$ ll
total 48
drwxr-xr-x 2 maciej users 4096 Feb 6 23:15 ./
drwxr-xr-x 3 maciej users 12288 Feb 6 23:15 ../
-rw-r--r-- 1 maciej users 1260 Feb 6 23:11 00000000000000000000.json
-rw-r--r-- 1 maciej users 1411 Feb 6 23:12 00000000000000000001.json
-rw-r--r-- 1 maciej users 839 Feb 6 23:12 00000000000000000002.json
-rw-r--r-- 1 maciej users 1292 Feb 6 23:12 00000000000000000003.json
-rw-r--r-- 1 maciej users 478 Feb 6 23:14 00000000000000000004.json
-rw-r--r-- 1 maciej users 1525 Feb 6 23:15 00000000000000000005.json
-rw-r--r-- 1 maciej users 2312 Feb 6 23:15 00000000000000000006.json
-rw-r--r-- 1 maciej users 2170 Feb 6 23:15 00000000000000000007.json
Spójrzmy na jeden 0..4.json.
{"commitInfo":{"timestamp":1581027297648,"operation":"DELETE","operationParameters":{"predicate":"[\"(`name` = 'Edward')\"]"},"readVersion":3,"isBlindAppend":false}}
{"remove":{"path":"part-00003-02426997-e65a-4b08-b820-3614d2c74e01-c000.snappy.parquet","deletionTimestamp":1581027297642,"dataChange":true}}
{"add":{"path":"part-00000-7c029367-2807-4136-916a-3495c926728d-c000.snappy.parquet","partitionValues":{},"size":433,"modificationTime":1581027297000,"dataChange":true}}
predicate…’name’ = ‘Edward’... To operacja usuwania z przykładu wykorzystania Delta Lake API. Poniżej commitInfo widzimy dodanie oraz usunięcie plików parquet. Działa to podobnie jak git lub commit log w “standardowej” bazie danych. Po przetworzeniu wszystkich metadanych otrzymamy aktualną wersję.
Repo
https://github.com/zorteran/wiadro-danych-delta-lake-nutshell
Super, dzięki za idealnie przejrzyście przedstawione przykłady!
Do tej pory myślałem, że delta w zasadzie jest do ACID i merge, bez kwestii historii zmian w schema.
A świat zmian w schema znałem pobieżnie tylko z punktu widzenia Avro schema evolution, niemniej jednak jakoś za głęboko w tym nie siedzę i wiedza jest szczątkowa.
Od jakiegoś czasu śledzę artykuły (odkąd wrzucasz na grupę DC, jakoś od końca 2019). Dobra robota!
Dzięki za feedback! Morele rosną po takim komentarzu 🙂 Ciesze się, że wpis się przydał.