Jak zacząć z Apache Spark i Cassandra

Apache Cassandra to specyficzna baza danych. Skaluje się (uwaga) liniowo. Ma to swoją cenę: specyficzne modelowanie tabel, konfigurowalna spójność i ograniczona analityka. Apple wykonuje miliony operacji na sekundę na ponad 160 tys. instancjach Cassandry. Gromadzi przy tym ponad 100 PB danych. Ograniczoną analitykę można „wyleczyć” wykorzystując Apache Spark i connector od DataStax i o tym jest ten wpis.

Środowisko

Wykorzystałem jeden węzeł Apache Cassandra postawiony na Docker’ze.

version: '3'

services:
  cassandra:
    image: cassandra:latest
    ports:
      - "9042:9042"

Apache Spark w wersji 3.0 uruchamiany jest z linii komend. Dołączona jest paczka z connectorem oraz biblioteką, która przyda się przy timeuuid. Wersja connectora kompatybilna z wersją Spark’a.

./spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-beta,com.datastax.cassandra:cassandra-driver-core:3.9.0

Jeśli Cassandra nie jest postawiona lokalnie, trzeba ją wskazać.

spark.conf.set("spark.cassandra.connection.host", "127.0.0.1")

Dane

Aby sprawdzić działanie kombinacj Spark + Cassandra, wygenerowałem dane za pomocą mockaroo.com. Jest to lista sensorów oraz lista pomiarów z tych sensorów. Znajdziesz je w repozytorium na GitHub.

maciej@ubuntu:~/Desktop/spark_and_cassandra$ head sensor_reads.csv 
date,sensor_id,temperature,wind_speed,wind_direction
2020-02-20 13:00:57,11,90.42,72.91,153
2020-05-28 21:31:03,9,51.62,20.07,255
2020-06-04 16:32:02,3,6.68,89.31,309
...

Utworzenie tabel w Apache Cassandra

Apache Cassandra ma dedykowane narzędzie o nazwie cqlsh. Kaśka jest w Dockerze, więc trzeba użyć poniższej komendy.

sudo docker exec -it container_name cqlsh

Najpierw musimy utworzyć keyspace, czyli worek, w którym będą nasze tabele.

CREATE KEYSPACE spark_playground WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };

Informacje o sensorach będą trzymane w tabeli sensors. Nic nadzwyczajnego: id, lokalizacja, id grupy. Zwróć uwagę na to, że kluczem głównym jest id. Czyli nie zrobimy WHERE po pozostałych kolumnach bez całkowitego skanu tabeli (wspominałem o specyficznym modelowaniu danych?), ale filtrowanie po kluczu będzie super szybkie.

CREATE TABLE sensors ( sensor_id int, location text, group_id int, PRIMARY KEY (sensor_id ));

Pomiary będą w tabeli sensors_reads. Klucz składa się z id sensora (partition key) i daty w postaci timeuuid (clustering key). Partition key wskazuje, gdzie znajduje się rekord (w jakim węźle). Timeuuid jako clustering key umożliwia sortowanie rekordów. Jeśli pogubiłeś się, co jest czym, zerknij na wytłumaczenie na stackoverflow. Z wyborem partition key trzeba uważać, od tego zależy czy węzły będą równomiernie wypełnione danymi.

CREATE TABLE sensors_reads ( sensor_id int, date timeuuid, temp double, humidity double, wind_speed double, wind_direction double, PRIMARY KEY (sensor_id, date ));

Zasilenie Cassandry Sparkiem

Wczytanie plików csv

val sensors = spark.read.format("csv").option("header", "true").load("/home/maciej/Desktop/spark_and_cassandra/sensors.csv")
val sensorReads = spark.read.format("csv").option("header", "true").load("/home/maciej/Desktop/spark_and_cassandra/sensor_reads.csv")

Zapis do tabeli z sensorami

Zarówno typy jak i nazwy kolumn zgadzają się, więc operacja jest prosta.

sensors.write
    .format("org.apache.spark.sql.cassandra")
    .option("keyspace","spark_playground")
    .option("table","sensors")
    .mode("append")
    .save()

Zapis do tabeli z pomiarami

Tutaj sytuacja się komplikuje:

  • Niespójność nazw w kolumnie z temperaturą. temperature -> temp
  • Niespójność typów w kolumnie z datą. Z CSV odczytaliśmy string, natomiast w tabeli jest timeuuid. Musimy posłużyć się biblioteką klienta Cassandry, aby dokonać konwersji.

Problem niespójności typów można rozwiązać odpowiednim UDF (User Defined Function). Przy okazji sprawdzi się, czy funkcja odwrotna zwróci tą samą datę. Normalnie kombinowałbym z testem w IntelliJ, skoro spark-shell to trzeba ręcznie 😊.

import spark.implicits._
import com.datastax.driver.core.utils.UUIDs
import org.apache.spark.sql.functions.udf

val toTimeuuid: java.sql.Timestamp => String = x => UUIDs.startOf(x.getTime()).toString()
val fromTimeuuid: String => java.sql.Timestamp = x => new java.sql.Timestamp(UUIDs.unixTimestamp(java.util.UUID.fromString(x)))

val toTimeuuidUDF = udf(toTimeuuid)
val fromTimeuuidUDF = udf(fromTimeuuid)

sensorsReads
    .withColumn("date_as_timestamp", to_timestamp($"date"))
    .withColumn("date_as_timeuuid", toTimeuuidUDF($"date_as_timestamp"))
    .withColumn("timestamp_from_timeuuid",fromTimeuuidUDF($"date_as_timeuuid"))
    .show(false)
Wygląda całkiem nieźle

Pozostaje pozbyć się niepotrzebnych kolumn, zmienić nazwę kolumny z temperaturą i można wrzucać.

val sensorsReads_fixed = sensorsReads
                            .withColumn("date_as_timestamp", to_timestamp($"date"))
                            .withColumn("date_as_timeuuid", toTimeuuidUDF($"date_as_timestamp"))
                            .drop("date").drop("date_as_timestamp")
                            .withColumnRenamed("date_as_timeuuid","date")
                            .withColumnRenamed("temperature","temp")
sensorsReads_fixed.write
    .format("org.apache.spark.sql.cassandra")
    .option("keyspace","spark_playground")
    .option("table","sensors_reads")
    .mode("append")
    .save()

Odczyt z Cassandry

Jest możliwość uproszczenia sobie współpracy z Cassandrą. Wystarczy odpowiednio skonfigurować Spark Context.

spark.conf.set("spark.sql.catalog.casscatalog","com.datastax.spark.connector.datasource.CassandraCatalog")

I możemy w prostszy sposób wskazywać keyspace i tabelę.

scala> spark.read.table("casscatalog.spark_playground.sensors").show()
+---------+--------+--------------------+
|sensor_id|group_id|            location|
+---------+--------+--------------------+
|        2|       4|       027 Heath Way|
|       13|       3|  42585 Ramsey Alley|
|        4|       1|     676 Marcy Point|
|        5|       3|260 Steensland Cr...|
|        9|       3|9385 Comanche Ter...|
|        8|       2|291 Meadow Ridge ...|
|        7|       2|     716 Randy Point|
|       14|       2|    331 Mcbride Road|
|       15|       3|     91 Gateway Hill|
|        1|       3|      66 Vera Avenue|
|        6|       4|87212 Lake View S...|
|       12|       2|    12 Montana Place|
|       10|       3|      60 Spohn Plaza|
|       11|       1|    48 Redwing Court|
|        3|       3|        930 Almo Way|
+---------+--------+--------------------+
val sensors_table = spark.read.table("casscatalog.spark_playground.sensors")
val sensors_reads_table = spark.read.table("casscatalog.spark_playground.sensors_reads")

Koszt operacji

Korzystając ze Sparka wszystko wydaje się łatwe i przyjemne. Musimy pamiętać, że pod spodem jest Cassandra, która ma swój sposób obsługi zapytań. Chodzi mi głównie o szybkość pobierania rekordów szukanych na podstawie kluczy i wartości.

scala> sensors_table.filter("sensor_id in (1,2,3)").select("sensor_id","location").explain
20/07/21 11:09:50 INFO V2ScanRelationPushDown: 
Pushing operators to sensors
Pushed Filters: In(sensor_id, [1,2,3])
Post-Scan Filters: 
Output: sensor_id#749, location#751
         
== Physical Plan ==
*(1) Project [sensor_id#749, location#751]
+- BatchScan[sensor_id#749, location#751] Cassandra Scan: spark_playground.sensors
 - Cassandra Filters: [["sensor_id" IN (?, ?, ?), 1]]
 - Requested Columns: [sensor_id,location]

Filtr po sensor_id dzieje się na etapie pobrania danych z Cassandry. Filtr po group_id wymaga przeskanowania całej tabeli.

scala> sensors_table.filter("group_id in (1,2,3)").select("sensor_id","location").explain
20/07/21 11:14:34 INFO V2ScanRelationPushDown: 
Pushing operators to sensors
Pushed Filters: 
Post-Scan Filters: group_id#750 IN (1,2,3)
Output: sensor_id#749, group_id#750, location#751
         
== Physical Plan ==
*(1) Project [sensor_id#749, location#751]
+- *(1) Filter group_id#750 IN (1,2,3)
   +- BatchScan[sensor_id#749, group_id#750, location#751] Cassandra Scan: spark_playground.sensors
 - Cassandra Filters: []
 - Requested Columns: [sensor_id,group_id,location]

Proste agregacje

Załóżmy, że pojawiła się potrzeba zrobienia zestawienia liczby sensorów w grupach. Nie przewidzieliśmy tego na poziomie projektowania bazy i CQL nam nie wystarczy.

cqlsh:spark_playground> SELECT group_id, count(1) FROM sensors GROUP BY group_id;
InvalidRequest: Error from server: code=2200 [Invalid query] message="Group by is currently only supported on the columns of the PRIMARY KEY, got group_id"

Więc albo robimy to po stronie aplikacji klienckiej, albo korzystamy z Apache Spark.

scala> sensors_table.groupBy("group_id").count.show
+--------+-----+                                                                
|group_id|count|
+--------+-----+
|       1|    2|
|       3|    7|
|       4|    2|
|       2|    4|
+--------+-----+

Złączenia – joinWithCassandraTable

Najprostszym sposobem na złączenie jest pobranie dwóch zbiorów i wykonanie iloczynu kartezjańskiego. Connector umożliwia jednak szybsze rozwiązanie. W wersji RDD była metoda joinWithCassandraTable, natomiast w DataFrame jest Direct Join, którego dokumentacja, jak widać, jest znikoma. joinWithCassandraTable wykonuje jedno zapytanie dla każdej partycji wymaganej przez źródłowy RDD. W DF dzieje się to automagicznie.

scala> sensors_reads_table.join(sensors_table).explain
...    
== Physical Plan ==
CartesianProduct
:- *(1) Project [date#755, sensor_id#756, humidity#757, temp#758, wind_direction#759, wind_speed#760]
:  +- BatchScan[date#755, sensor_id#756, humidity#757, temp#758, wind_direction#759, wind_speed#760] Cassandra Scan: spark_playground.sensors_reads
 - Cassandra Filters: []
 - Requested Columns: [date,sensor_id,humidity,temp,wind_direction,wind_speed]
+- *(2) Project [sensor_id#749, group_id#750, location#751]
   +- BatchScan[sensor_id#749, group_id#750, location#751] Cassandra Scan: spark_playground.sensors
 - Cassandra Filters: []
 - Requested Columns: [sensor_id,group_id,location]

Teoretycznie ta sama operacja, ale wskazanie złączenia po kluczach generuje o wiele wydajniejszy plan wykonania.

scala> sensors_reads_table.join(sensors_table, sensors_reads_table("sensor_id") === sensors_table("sensor_id"), "inner").explain
...
== Physical Plan ==
*(5) SortMergeJoin [sensor_id#756], [sensor_id#749], Inner
:- *(2) Sort [sensor_id#756 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(sensor_id#756, 200), true, [id=#812]
:     +- *(1) Project [date#755, sensor_id#756, humidity#757, temp#758, wind_direction#759, wind_speed#760]
:        +- BatchScan[date#755, sensor_id#756, humidity#757, temp#758, wind_direction#759, wind_speed#760] Cassandra Scan: spark_playground.sensors_reads
 - Cassandra Filters: []
 - Requested Columns: [date,sensor_id,humidity,temp,wind_direction,wind_speed]
+- *(4) Sort [sensor_id#749 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(sensor_id#749, 200), true, [id=#820]
      +- *(3) Project [sensor_id#749, group_id#750, location#751]
         +- BatchScan[sensor_id#749, group_id#750, location#751] Cassandra Scan: spark_playground.sensors
 - Cassandra Filters: []
 - Requested Columns: [sensor_id,group_id,location]

EDIT (30.07.2020)

Alex Ott z Datastax zwrócł mi uwagę na Linkedin, że nie wykonał się tu jednak Direct Join. Okazuje się, że konieczna jest konfiguracja spark.sql.extensions. Więcej szczegółów znajdziesz tutaj.

spark.conf.set("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")

Repozytorium

https://github.com/zorteran/wiadro-danych-spark-cassandra-101 – docker-compose i csv-ki

Podsumowanie

Temat współpracy Sparka i Cassandry został zaledwie poruszony w tym wpisie. Kolejna ciekawa alternatywa i/lub uzupełnienie ekosystemu Hadoop. Sparka możemy wykorzystać do analityki, ale też utrzymania i integracji danych z i do Cassandry. W końcu ciężko o idealny model danych przy wysokiej dynamice wymagań 😉.

Please follow and like us:

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *