Jednym z podstawowych narzędzi Data Scientist jest Pandas. Niestety nadmiar danych może znacznie utrudnić nam zabawę. Dlatego powstało Koalas. Biblioteka umożliwiająca korzystanie z Apache Spark w taki sposób, jakbyśmy robili to za pomocą Pandas.
Psst! Repozytorium z plikiem Jupyter znajdziesz na końcu wpisu.
Cel
Nie będziemy tutaj dokonywać niesamowitych, przełomowych odkryć. Na danych Movielens sprawdzimy, czy Koalas podoła prostym operacjom. Koalas w wersji 1.0.1, ale wynik może Cię lekko zaskoczyć.
Środowisko
Standardowo wykorzystam tutaj Dockera, a dokładnie kontener jupyter/all-spark-notebook/. Oprócz portu 8888 (jupyter), mapuję też port 4040 (Spark Web UI)
version: '3' services: notebook: image: jupyter/all-spark-notebook ports: - 8888:8888 - 4040:4040 volumes: - ./work:/home/jovyan/work
Pip i Spark Session
import sys !{sys.executable} -m pip install koalas import databricks.koalas as ks import pandas as pd from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate()
Przygotowanie Spark DataFrame
movies_df = (spark.read .option("header", "true") .option("inferSchema", "true") .csv("./movies.csv") ) ratings_df = (spark.read .option("header", "true") .option("inferSchema", "true") .csv("./ratings.csv") ) links_df = (spark.read .option("header", "true") .option("inferSchema", "true") .csv("./links.csv") ) tags_df = (spark.read .option("header", "true") .option("inferSchema", "true") .csv("./tags.csv") ) movies_df.registerTempTable("movies") ratings_df.registerTempTable("ratings") links_df.registerTempTable("link") tags_df.registerTempTable("tags")
Przygotowanie Koalas DataFrame
koalas_movies_df = ks.DataFrame(movies_df) koalas_ratings_df = ks.DataFrame(ratings_df) koalas_links_df = ks.DataFrame(links_df) koalas_tags_df = ks.DataFrame(tags_df)

W Spark Web UI widać, że faktycznie pod skorupą Pandas, Koalas napędza Spark.

Top 10 najczęściej ocenianych filmów
Wszystkie rozwiązania są ok. Osobiście najbardziej pasuje mi deklaratywny styl SQL-a.
PySpark
from pyspark.sql.functions import col movies_with_ratings = movies_df.join(ratings_df, movies_df.movieId == ratings_df.movieId) (movies_with_ratings .groupBy("title") .count() .orderBy(col("count").desc()) .show(10, False))
+-----------------------------------------+-----+ |title |count| +-----------------------------------------+-----+ |Forrest Gump (1994) |329 | |Shawshank Redemption, The (1994) |317 | |Pulp Fiction (1994) |307 | |Silence of the Lambs, The (1991) |279 | |Matrix, The (1999) |278 | |Star Wars: Episode IV - A New Hope (1977)|251 | |Jurassic Park (1993) |238 | |Braveheart (1995) |237 | |Terminator 2: Judgment Day (1991) |224 | |Schindler's List (1993) |220 | +-----------------------------------------+-----+ only showing top 10 rows
Koalas/Pandas
koalas_movies_with_ratings = ks.merge(koalas_movies_df, koalas_ratings_df) (koalas_movies_with_ratings .groupby('title') .size() .sort_values(ascending=False)[:10])
title Forrest Gump (1994) 329 Shawshank Redemption, The (1994) 317 Pulp Fiction (1994) 307 Silence of the Lambs, The (1991) 279 Matrix, The (1999) 278 Star Wars: Episode IV - A New Hope (1977) 251 Jurassic Park (1993) 238 Braveheart (1995) 237 Terminator 2: Judgment Day (1991) 224 Schindler's List (1993) 220 Name: count, dtype: int64
Spark SQL
spark.sql(""" SELECT title, COUNT(*) as CNT FROM movies LEFT JOIN ratings ON movies.movieId = ratings.movieID GROUP BY title ORDER BY CNT DESC LIMIT 10 """).show(10,False)
+-----------------------------------------+---+ |title |CNT| +-----------------------------------------+---+ |Forrest Gump (1994) |329| |Shawshank Redemption, The (1994) |317| |Pulp Fiction (1994) |307| |Silence of the Lambs, The (1991) |279| |Matrix, The (1999) |278| |Star Wars: Episode IV - A New Hope (1977)|251| |Jurassic Park (1993) |238| |Braveheart (1995) |237| |Terminator 2: Judgment Day (1991) |224| |Schindler's List (1993) |220| +-----------------------------------------+---+
Top 10 filmów wg średnich ocen (ale tylko takich, które mają >100 ocen)
PySpark
import pyspark.sql.functions as F (movies_with_ratings .groupBy("title") .agg( F.count(F.lit(1)).alias("cnt"), F.avg(col("rating")).alias("avg_rating") ) .filter("cnt > 100") .orderBy(col("avg_rating").desc()) .select("title","avg_rating") .show(10, False))
+-----------------------------------------+-----------------+ |title |avg_rating | +-----------------------------------------+-----------------+ |Shawshank Redemption, The (1994) |4.429022082018927| |Godfather, The (1972) |4.2890625 | |Fight Club (1999) |4.272935779816514| |Godfather: Part II, The (1974) |4.25968992248062 | |Departed, The (2006) |4.252336448598131| |Goodfellas (1990) |4.25 | |Dark Knight, The (2008) |4.238255033557047| |Usual Suspects, The (1995) |4.237745098039215| |Princess Bride, The (1987) |4.232394366197183| |Star Wars: Episode IV - A New Hope (1977)|4.231075697211155| +-----------------------------------------+-----------------+ only showing top 10 rows
Koalas/Pandas
Jak widać poniżej, nie wszystko jest zaimplementowane. Trzeba kombinować ?.
# PandasNotImplementedError: The method `pd.DataFrame.insert()` is not implemented yet. # :-( koalas_top_100 = (koalas_movies_with_ratings .insert(0, 'cnt', 1) .groupby('title') .agg( { 'rating': 'avg', 'cnt': 'size' } ))
Koalas miał problem z wykorzystaniem DataFrame jako parametr operacji na innym DataFrame. Co ciekawe miałem też tak w innym przypadku, gdzie używałem tego samego DataFrame. Tak czy inaczej, musiałem włączyć opcję compute.ops_on_diff_frames.
koalas_top_10 = (koalas_movies_with_ratings .assign(cnt=1) .groupby('title') .agg({ 'rating': 'avg', 'cnt': 'count' }) ) over_100_cnt = koalas_top_10['cnt'] > 100 ks.set_option('compute.ops_on_diff_frames', True) koalas_top_10 = (koalas_top_10[over_100_cnt] .where(koalas_top_10.cnt>100) .sort_values(by='rating', ascending=False) )

Spark SQL
Tutaj wygląda to dziecinnie prosto. Jak dla mnie, SQL znów wygrywa.
spark.sql(""" SELECT title, AVG(rating) as avg_rating FROM movies LEFT JOIN ratings ON movies.movieId = ratings.movieID GROUP BY title HAVING COUNT(*) > 100 ORDER BY avg_rating DESC LIMIT 10 """).show(10,False)
+-----------------------------------------+-----------------+ |title |avg_rating | +-----------------------------------------+-----------------+ |Shawshank Redemption, The (1994) |4.429022082018927| |Godfather, The (1972) |4.2890625 | |Fight Club (1999) |4.272935779816514| |Godfather: Part II, The (1974) |4.25968992248062 | |Departed, The (2006) |4.252336448598131| |Goodfellas (1990) |4.25 | |Dark Knight, The (2008) |4.238255033557047| |Usual Suspects, The (1995) |4.237745098039215| |Princess Bride, The (1987) |4.232394366197183| |Star Wars: Episode IV - A New Hope (1977)|4.231075697211155| +-----------------------------------------+-----------------+
Top 10 lat wg średnich ocen
Nie ma kolumny rok. Trzeba wyciągnąć tę informację z tytułu.
PySpark
(movies_with_ratings .withColumn('year', F.regexp_extract(F.col('title'),'.*\((.*)\).*', 1)) .groupBy("year") .agg( F.avg(col("rating")).alias("avg_rating") ) .orderBy(col("avg_rating").desc()) .show(10, False))
+---------+-----------------+ |year |avg_rating | +---------+-----------------+ |2006–2007|5.0 | |1917 |4.5 | |1930 |4.205882352941177| |1921 |4.1 | |1934 |4.088235294117647| |1944 |4.043478260869565| |1957 |4.03953488372093 | |1954 |4.009191176470588| |1926 |4.0 | |1908 |4.0 | +---------+-----------------+ only showing top 10 rows
Zaraz zaraz, co to ma być?
Pewnie zastanawiasz się, czy operacja withColumn po join jest dobrym pomysłem. Otóż ostatecznie kolejność może być różna. Wywołania w Apache Spark są leniwe (Lazy Evaluation). Zanim ruszy maszyna, wywoływana jest seria optymalizacji (na poziomie zarówno logicznym, jak i fizycznym) która przyśpiesza realizację kodu. Co więcej, w najnowszym Apache Spark 3.0 dodano Dynamic Partition Prunning i Adaptive Query Execution, co czyni Sparka jeszce szybszym. Sprawdźmy, co będzie w tym przypadku.
Wykonanie metody explain na dataframe pokazuje, że join jest wykonywany przed regexp, czyli mój kod jest słaby ?. Regexp przed joinem da to samo, a będzie operował na mniejszym zbiorze danych.
== Physical Plan == *(4) Sort [avg_rating#942 DESC NULLS LAST], true, 0 +- Exchange rangepartitioning(avg_rating#942 DESC NULLS LAST, 200) +- *(3) HashAggregate(keys=[year#924], functions=[avg(rating#472)]) +- Exchange hashpartitioning(year#924, 200) +- *(2) HashAggregate(keys=[year#924], functions=[partial_avg(rating#472)]) +- *(2) Project [rating#472, regexp_extract(title#455, .*\((.*)\).*, 1) AS year#924] +- *(2) BroadcastHashJoin [movieId#454], [movieId#471], Inner, BuildLeft :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))) : +- *(1) Project [movieId#454, title#455] : +- *(1) Filter isnotnull(movieId#454) : +- *(1) FileScan csv [movieId#454,title#455] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/movies.csv], PartitionFilters: [], PushedFilters: [IsNotNull(movieId)], ReadSchema: struct<movieId:int,title:string> +- *(2) Project [movieId#471, rating#472] +- *(2) Filter isnotnull(movieId#471) +- *(2) FileScan csv [movieId#471,rating#472] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/ratings.csv], PartitionFilters: [], PushedFilters: [IsNotNull(movieId)], ReadSchema: struct<movieId:int,rating:double>
Koalas/Pandas
Tutaj niestety znów pojawił się błąd NotImplementedError. Tym razem nie ma metody extract. W tym miejscu postanowiłem przerwać „eksperyment”. Doświadczony „pandziaż” pewnie obszedłby ten problem na 10 innych sposobów. Koalas jest to naprawdę obiecujący projekt, ale jak widać, mogą być z nim problemy.

Matplotlib
To może z innej beczki. Czy wykresiki działają? Ależ owszem.

Konwersja Koalas => Pandas
Konwersja opiera się na wykonaniu metody toPandas()

Nie wszystkie operacje musimy wykonywać na klastrze. Możemy wykroić kawałek tortu i pójśc z nim do swojego domowego warsztatu.
Repozytorium
https://github.com/zorteran/wiadro-danych-koalas-pandas-fun
Podsumowanie
Uważam, że warto obserwować ten projekt. Stoi za nim databricks. Wiedzą, że grono Data Scientists mają swoje przyzwyczajenia i tak łatwo nie przejdą na PySparka.
> toPandas()
kthxbye
?♂️