Elasticsearch można lubić lub nie. Fakty są takie, że robi robotę. Razem z Kibana, Logstash i Beats pozwalają w prosty sposób zbierać logi, metryki i przeprowadzać analizy w czasie rzeczywistym. Gdy potrzebujemy więcej, możemy chwycić za inne narzędzia. W tym wpisie przyjrzymy się jak połączyć Apache Spark i Elasticsearch.
Psst! Repo z kodem na dole wpisu! ?
Czemu Spark?
Ostatnio zwrócił moją uwagę HELK, czyli rozwiązanie do analizy cyber oparte na Elastic Stack. Patrząc na jego architekturę, widzimy wykorzystanie Apache Spark oraz biblioteki Graphframes (grafy, użyłem jej w tym wpisie).
Apache Spark to ujednolicony silnik analityczny i choć kojarzy się z HDFS, może korzystać z innych źródeł. Dzięki temu możemy zapisać przetworzone dane do Elasticsearch, jak i odczytać w celu np. utworzenia modelu w MLlib.
Co jest potrzebne?
Potrzebna jest biblioteka elasticsearch-spark. Po ściągnięciu użyłem PySpark poniższą komendą. Mam go spiętego z Jupyter.
pyspark --jars /sciezka/do/jara
Odczyt z Elasticsearch
Po prostu
reader = spark.read.format("org.elasticsearch.spark.sql")\
.option("es.read.metadata", "true")\
.option("es.nodes.wan.only","true")\
.option("es.port","9200")\
.option("es.net.ssl","false")\
.option("es.nodes", "http://localhost")\
df = reader.load("just_testing_witcher")
Wszystko proste i czytelne. Metoda load przyjmuje nazwę indesu, którego dane chcemy pobrać.

Z zapytaniem
reader = spark.read.format("org.elasticsearch.spark.sql")\
.option("es.read.metadata", "true")\
.option("es.nodes.wan.only","true")\
.option("es.port","9200")\
.option("es.net.ssl","false")\
.option("es.nodes", "http://localhost")\
.option("es.query", """{"query": { "query_string": { "query": "*ag*" } } }""")
df = reader.load("just_testing_witcher")
Zapytanie jest to opcja es.query. W tym przypadku szukam po wszystkich polach wzorca *ag*, ograniczając wyniki do magów. Umożliwia nam to filtrowanie na poziomie silnika zapytań Elasticsearch, nie kłopocząc tym Sparka.

Zapis do Elasticsearch
Dane możemy również zapisywać. Czy rozwiązanie będzie wsadowe, czy strumieniowe, warto zastanowić się ile danych Twój Elasticsearch jest w stanie odebrać. Ostatecznie, można rozważyć bufor/kolejkę np. Apache Kafka.
Poniżej przykład jak zbiór danych filmów z MovieLens wrzucić na Elasticsearch. Na początku wczytanie csv-ek. Nic nadzwyczajnego
path = "./ml-latest-small/"
movies = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load(path + "movies.csv")
ratings = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load(path + "ratings.csv")
tags = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load(path + "tags.csv")

Widok filmów z listą gatunków i tagów
Załóżmy, że chcemy wykorzystać funkcję “szukania po wszystkim” na filmach z gatunkami i tagami. Gatunki w filmach rozdzieli się metodą split, natomiast tagi złączy metodą collect_set. Utworzenie widoków umożliwi wykonanie JOIN w SQL.
tags_materialized = tags.groupBy('movieId').agg(F.collect_set('tag').alias('tags'))
tags_materialized.createOrReplaceTempView("tags_materialized")
movies_materialized = movies.select(\
F.col("movieId"),
F.col("title"),
F.split(F.col("genres"), '\|').alias("genres")\
)
movies_materialized.createOrReplaceTempView("movies_materialized")


movies_complete = spark.sql("""SELECT m.movieId, title, genres, tags
FROM movies_materialized m
LEFT JOIN tags_materialized t ON m.movieId = t.movieId
""")

Zapis
esURL = "localhost"
movies_complete.write\
.format("org.elasticsearch.spark.sql")\
.option("es.port","9200")\
.option("es.net.ssl","false")\
.option("es.nodes", esURL)\
.mode("Overwrite")\
.save("movielens")
Dużo nie ma co wyjaśniać. Metoda save przyjmuje indeks.

Uwaga na typy danych
Załóżmy, że chcesz wrzucić tabelę z ocenami (ratings). Zawiera kolumnę timestamp będącą datą wyrażoną w liczbie sekund od 1.01.1970. Gdybyśmy po prostu wrzucili tę tabelę na Elasticsearch, będziemy mieli pole typu numerycznego. Aby w pełni korzystać z okien czasowych i filtrowania, trzeba wcześniej utworzyć indeks z polem typu date i formatem epoch_second. Inne formaty daty znajdziesz tutaj. Do wysłania PUT-a użyłem VS Code i wtyczki REST Client.
@host = http://127.0.0.1:9200
PUT {{host}}/ratings
Content-Type: application/json
{
"mappings": {
"properties": {
"datetime": {
"type": "date",
"format": "epoch_second"
}
}
}
}


Repo
https://github.com/zorteran/wiadro-danych-spark-elasticsearch
Jeden komentarz do “Spark i Elasticsearch? To tak można? elasticsearch-spark”