Jupyter i Apache Zeppelin to dobre miejsce na eksperymentowanie z danymi. Niestety, specyfika notebook’ów nie zachęca do organizacji kodu, a w tym jego dekompozycji i czytelności. Możemy przekopiować komórki do Intellij IDEA i zbudować JAR’a, ale efekt będzie taki sobie. W artykule dowiesz się jak napisać czytelny kod Scala Apache Spark w Intellij IDEA.
0. Kod bazowy
Jest to prosta aplikacja która:
- pobiera produktów spożywczych dane z pliku
- filtruje owoce
- normalizuje nazwy
- wylicza sumę poszczególnych owoców
val spark = SparkSession .builder .appName("MyAwesomeApp") .master("local[*]") .getOrCreate() import spark.implicits._ val groceries = spark.read .option("inferSchema", "true") .option("header", "true") .csv("some-data.csv") val sumOfFruits = groceries .filter($"type" === "fruit") .withColumn("normalized_name", lower($"name")) .groupBy("normalized_name") .agg( sum(($"quantity")).as("sum") ) val fruits = groceries.filter($"type" === "fruit") val normalizedFruits = fruits.withColumn("normalized_name", lower($"name")) val sumOfFruits = normalizedFruits .groupBy("normalized_name") .agg( sum(($"quantity")).as("sum") ) sumOfFruits.show()
1. Extract Methods
Wykorzystajmy moc IDE, a dokładniej operację refaktoryzacji Extract Method. Pozwala w prosty sposób utworzyć metodę z zaznaczonego fragmentu kodu. Tym sposobem spróbujmy utworzyć metody odpowiadające kolejnym krokom w aplikacji.
Ale jak to nie działa?
def main(args: Array[String]) { val spark = SparkSession .builder .appName("MyAwesomeApp") .master("local[*]") .getOrCreate() import spark.implicits._ val groceries: DataFrame = getGroceries val fruits: Dataset[Row] = filterFruits(groceries) val normalizedFruits: DataFrame = withNormalizedName(fruits) val sumOfFruits: DataFrame = sumByNormalizedName(normalizedFruits) sumOfFruits.show() } private def sumByNormalizedName(normalizedFruits: DataFrame) = { val sumOfFruits = normalizedFruits .groupBy("normalized_name") .agg( sum(($"quantity")).as("sum") ) sumOfFruits } private def withNormalizedName(fruits: Dataset[Row]) = { val normalizedFruits = fruits.withColumn("normalized_name", lower($"name")) normalizedFruits } private def filterFruits(groceries: DataFrame) = { val fruits = groceries.filter($"type" === "fruit") fruits } private def getGroceries: DataFrame = { val groceries = spark.read .option("inferSchema","true") .option("header","true") .csv("some-data.csv") groceries }
Kod w metodzie main
jest już bardziej czytelny… ale ten kod nie ma prawa działać. Chcemy w metodach korzystać z Spark Session
i spark.implicits._
. Niestety wartości tych nie ma w zakresach metod.

2. Przedawkowanie Spark Session
Możemy to naprawić przekazując SparkSession
w każdej metodzie. Jest to niestety upierdliwe. Musimy również za każdym razem importować spark.implicits._
. Moje wrodzone lenistwo na to nie pozwala 😁.
private def sumByNormalizedName(normalizedFruits: DataFrame, spark: SparkSession) = { import spark.implicits._ val sumOfFruits = normalizedFruits .groupBy("normalized_name") .agg( sum(($"quantity")).as("sum") ) sumOfFruits } private def withNormalizedName(fruits: Dataset[Row], spark: SparkSession) = { import spark.implicits._ val normalizedFruits = fruits.withColumn("normalized_name", lower($"name")) normalizedFruits } private def filterFruits(groceries: DataFrame, spark: SparkSession) = { import spark.implicits._ val fruits = groceries.filter($"type" === "fruit") fruits } private def getGroceries(spark: SparkSession): DataFrame = { val groceries = spark.read .option("inferSchema","true") .option("header","true") .csv("some-data.csv") groceries }
3. Wyciągnięcie SparkSession
Musimy zapewnić dostęp do SparkSession
w trochę inny sposób. Pomoże nam w tym obiekt SparkJob
.
package pl.wiadrodanych.demo.base import org.apache.spark.sql.SparkSession trait SparkJob { val spark: SparkSession = SparkSession .builder .appName("SomeApp") .master("local[*]") .getOrCreate() } object SparkJob extends SparkJob {}
Teraz możemy zaimportować SparkJob
i spark.implicits._
w aplikacji. Kod wygląda lepiej. Metod możemy używać wielokrotnie.
import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import pl.wiadrodanych.demo.base.SparkJob import pl.wiadrodanych.demo.base.SparkJob.spark.implicits._ object NiceApp { val spark = SparkJob.spark def main(args: Array[String]) = { val groceries: DataFrame = getGroceries val fruits: Dataset[Row] = filterFruits(groceries) val normalizedFruits: DataFrame = addNormalizedNameColumn(fruits) val sumOfFruits: DataFrame = sumByNormalizedName(normalizedFruits) sumOfFruits.show() } private def sumByNormalizedName(normalizedFruits: DataFrame) = { val sumOfFruits = normalizedFruits .groupBy("normalized_name") .agg( sum(($"quantity")).as("sum") ) sumOfFruits } private def addNormalizedNameColumn(fruits: Dataset[Row]) = { val normalizedFruits = fruits.withColumn("normalized_name", lower($"name")) normalizedFruits } private def filterFruits(groceries: DataFrame) = { val fruits = groceries.filter($"type" === "fruit") fruits } private def getGroceries: DataFrame = { val groceries = spark.read .option("inferSchema", "true") .option("header", "true") .csv("some-data.csv") groceries }
4. Implicit class / Extension method
Napisałem sporo kodu C# w swoim życiu. Ciekawą i przydatną konstrukcją jest Extension Method. Pozwala na “dodanie” metod do istniejącego typu/klasy bez jej modyfikacji. Poniżej przykład. Zamiast pisać
int numberA = 1 int numberB = 2 val sum = Sum(numberA, numberB) ... public int Sum(int numberA, int numberB) { return numberA + numberB }
Możemy napisać
int numberA = 1 int numberB = 2 val sum = numberA.Add(numberB) ... public static int Add(this int numberA, int numberB) { return numberA + numberB }
Różnicę czytelności widać w poniższym przykładzie
Sum(A, Sum(B, Sum(C,Sum (D,...)))) // VS A.Add(B).Add(C).Add(D)...
W Scali możemy uzyskać podobny mechanizm stosując Implicit class
. Poniżej przekształcona logika omawianej aplikacji Apache Spark.
package pl.wiadrodanych.demo.extensions import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import pl.wiadrodanych.demo.base.SparkJob.spark.implicits._ object GroceryDataFrameExtensions { implicit class RichDataFrame(df: DataFrame) { def sumByNormalizedName: DataFrame = { val sumOfFruits = df .groupBy("normalized_name") .agg( sum(($"quantity")).as("sum") ) sumOfFruits } def addNormalizedNameColumn: DataFrame = { val normalizedFruits = df.withColumn("normalized_name", lower($"name")) normalizedFruits } def filterFruits: DataFrame = { val fruits = df.filter($"type" === "fruit") fruits } } }
Logika aplikacji przeniosła się do innego obiektu, a kod możemy czytać jak prozę.
package pl.wiadrodanych.demo import org.apache.spark.sql.DataFrame import pl.wiadrodanych.demo.NiceApp.spark import pl.wiadrodanych.demo.extensions.GroceryDataFrameExtensions._ object CoolApp { def main(args: Array[String]) = { val result = getGroceries .filterFruits .addNormalizedNameColumn .sumByNormalizedName result.show } private def getGroceries: DataFrame = { val groceries = spark.read .option("inferSchema", "true") .option("header", "true") .csv("some-data.csv") groceries } }
Wróćmy do tego co miała robić aplikacja:
- pobiera produktów spożywczych dane z pliku
- filtruje owoce
- normalizuje nazwy
- wylicza sumę poszczególnych owoców
Może nie słowo w słowo, ale podobnie 😁.
EDIT: Dataset transform
O ile poprzedni sposób jest fajny, czasami może wprowadzać w błąd. Aby oddzielić kod biznesowy od klasy bazowej, możemy użyć Dataset.transform. Szczegóły znajdziesz w tym artykule od MungingData.
Repozytorium
https://github.com/zorteran/wiadro-danych-readable-scala-apache-spark
Proszę, podziel się w komentarzu co o tym sądzisz. Jaki jest Twój sposób na czytelność i złożoność kodu?