Spark i Elasticsearch? To tak można? elasticsearch-spark

Elasticsearch można lubić lub nie. Fakty są takie, że robi robotę. Razem z Kibana, Logstash i Beats pozwalają w prosty sposób zbierać logi, metryki i przeprowadzać analizy w czasie rzeczywistym. Gdy potrzebujemy więcej, możemy chwycić za inne narzędzia. W tym wpisie przyjrzymy się jak połączyć Apache Spark i Elasticsearch.

Psst! Repo z kodem na dole wpisu! ?

Czemu Spark?

Ostatnio zwrócił moją uwagę HELK, czyli rozwiązanie do analizy cyber oparte na Elastic Stack. Patrząc na jego architekturę, widzimy wykorzystanie Apache Spark oraz biblioteki Graphframes (grafy, użyłem jej w tym wpisie).

Apache Spark to ujednolicony silnik analityczny i choć kojarzy się z HDFS, może korzystać z innych źródeł. Dzięki temu możemy zapisać przetworzone dane do Elasticsearch, jak i odczytać w celu np. utworzenia modelu w MLlib.

Co jest potrzebne?

Potrzebna jest biblioteka elasticsearch-spark. Po ściągnięciu użyłem PySpark poniższą komendą. Mam go spiętego z Jupyter.

pyspark --jars /sciezka/do/jara

Odczyt z Elasticsearch

Po prostu

reader = spark.read.format("org.elasticsearch.spark.sql")\
                .option("es.read.metadata", "true")\
                .option("es.nodes.wan.only","true")\
                .option("es.port","9200")\
                .option("es.net.ssl","false")\
                .option("es.nodes", "http://localhost")\
df = reader.load("just_testing_witcher")

Wszystko proste i czytelne. Metoda load przyjmuje nazwę indesu, którego dane chcemy pobrać.

Z zapytaniem

reader = spark.read.format("org.elasticsearch.spark.sql")\
                .option("es.read.metadata", "true")\
                .option("es.nodes.wan.only","true")\
                .option("es.port","9200")\
                .option("es.net.ssl","false")\
                .option("es.nodes", "http://localhost")\
                .option("es.query", """{"query": { "query_string": { "query": "*ag*" } } }""")
df = reader.load("just_testing_witcher")

Zapytanie jest to opcja es.query. W tym przypadku szukam po wszystkich polach wzorca *ag*, ograniczając wyniki do magów. Umożliwia nam to filtrowanie na poziomie silnika zapytań Elasticsearch, nie kłopocząc tym Sparka.

Zapis do Elasticsearch

Dane możemy również zapisywać. Czy rozwiązanie będzie wsadowe, czy strumieniowe, warto zastanowić się ile danych Twój Elasticsearch jest w stanie odebrać. Ostatecznie, można rozważyć bufor/kolejkę np. Apache Kafka.

Poniżej przykład jak zbiór danych filmów z MovieLens wrzucić na Elasticsearch. Na początku wczytanie csv-ek. Nic nadzwyczajnego

path = "./ml-latest-small/"
movies = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load(path + "movies.csv")
ratings = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load(path + "ratings.csv")
tags = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load(path + "tags.csv")

Widok filmów z listą gatunków i tagów

Załóżmy, że chcemy wykorzystać funkcję „szukania po wszystkim” na filmach z gatunkami i tagami. Gatunki w filmach rozdzieli się metodą split, natomiast tagi złączy metodą collect_set. Utworzenie widoków umożliwi wykonanie JOIN w SQL.

tags_materialized = tags.groupBy('movieId').agg(F.collect_set('tag').alias('tags'))
tags_materialized.createOrReplaceTempView("tags_materialized")

movies_materialized = movies.select(\
                                F.col("movieId"),
                                F.col("title"),
                                F.split(F.col("genres"), '\|').alias("genres")\
                            )
movies_materialized.createOrReplaceTempView("movies_materialized")
movies_complete = spark.sql("""SELECT m.movieId, title, genres, tags
                                FROM movies_materialized m
                                LEFT JOIN tags_materialized t ON m.movieId = t.movieId
                               """)

Zapis

esURL = "localhost"

movies_complete.write\
  .format("org.elasticsearch.spark.sql")\
  .option("es.port","9200")\
  .option("es.net.ssl","false")\
  .option("es.nodes", esURL)\
  .mode("Overwrite")\
  .save("movielens")

Dużo nie ma co wyjaśniać. Metoda save przyjmuje indeks.

Rekord widoczny w Kibana

Uwaga na typy danych

Załóżmy, że chcesz wrzucić tabelę z ocenami (ratings). Zawiera kolumnę timestamp będącą datą wyrażoną w liczbie sekund od 1.01.1970. Gdybyśmy po prostu wrzucili tę tabelę na Elasticsearch, będziemy mieli pole typu numerycznego. Aby w pełni korzystać z okien czasowych i filtrowania, trzeba wcześniej utworzyć indeks z polem typu date i formatem epoch_second. Inne formaty daty znajdziesz tutaj. Do wysłania PUT-a użyłem VS Code i wtyczki REST Client.

@host = http://127.0.0.1:9200

PUT {{host}}/ratings
Content-Type: application/json

{
  "mappings": {
    "properties": {
      "datetime": {
        "type": "date",
        "format": "epoch_second"
      }
    }
  }
}

Step 2 of 2: Configure settings 
You've defined ratings as your index pattern. Now you can specify some settings before we create it. 
Time Filter field name Refresh 
datetime 
The Time Filter will use this field to filter your data by time. 
You can choose not to have a time field, but you will not be able to 
narrow down your data by a time range. 
> Show advanced options
Przy dodawaniu index pattern w Kibana widać zdefiniowany typ z datą
80 
2000-01-09 
2000-01-23 
2000-02-06 
Count 
2000-02-20 
datetime per day
Kibnaa. Liczba wystawianych ocen w czasie 01.2000 – 02.2000

Repo

https://github.com/zorteran/wiadro-danych-spark-elasticsearch

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *