Jupyter i Apache Zeppelin to dobre miejsce na eksperymentowanie z danymi. Niestety, specyfika notebook’ów nie zachęca do organizacji kodu, a w tym jego dekompozycji i czytelności. Możemy przekopiować komórki do Intellij IDEA i zbudować JAR’a, ale efekt będzie taki sobie. W artykule dowiesz się jak napisać czytelny kod Scala Apache Spark w Intellij IDEA.
0. Kod bazowy
Jest to prosta aplikacja która:
- pobiera produktów spożywczych dane z pliku
- filtruje owoce
- normalizuje nazwy
- wylicza sumę poszczególnych owoców
val spark = SparkSession
.builder
.appName("MyAwesomeApp")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val groceries = spark.read
.option("inferSchema", "true")
.option("header", "true")
.csv("some-data.csv")
val sumOfFruits = groceries
.filter($"type" === "fruit")
.withColumn("normalized_name", lower($"name"))
.groupBy("normalized_name")
.agg(
sum(($"quantity")).as("sum")
)
val fruits = groceries.filter($"type" === "fruit")
val normalizedFruits = fruits.withColumn("normalized_name", lower($"name"))
val sumOfFruits = normalizedFruits
.groupBy("normalized_name")
.agg(
sum(($"quantity")).as("sum")
)
sumOfFruits.show()
1. Extract Methods
Wykorzystajmy moc IDE, a dokładniej operację refaktoryzacji Extract Method. Pozwala w prosty sposób utworzyć metodę z zaznaczonego fragmentu kodu. Tym sposobem spróbujmy utworzyć metody odpowiadające kolejnym krokom w aplikacji.
Ale jak to nie działa?
def main(args: Array[String]) {
val spark = SparkSession
.builder
.appName("MyAwesomeApp")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val groceries: DataFrame = getGroceries
val fruits: Dataset[Row] = filterFruits(groceries)
val normalizedFruits: DataFrame = withNormalizedName(fruits)
val sumOfFruits: DataFrame = sumByNormalizedName(normalizedFruits)
sumOfFruits.show()
}
private def sumByNormalizedName(normalizedFruits: DataFrame) = {
val sumOfFruits = normalizedFruits
.groupBy("normalized_name")
.agg(
sum(($"quantity")).as("sum")
)
sumOfFruits
}
private def withNormalizedName(fruits: Dataset[Row]) = {
val normalizedFruits = fruits.withColumn("normalized_name", lower($"name"))
normalizedFruits
}
private def filterFruits(groceries: DataFrame) = {
val fruits = groceries.filter($"type" === "fruit")
fruits
}
private def getGroceries: DataFrame = {
val groceries = spark.read
.option("inferSchema","true")
.option("header","true")
.csv("some-data.csv")
groceries
}
Kod w metodzie main jest już bardziej czytelny… ale ten kod nie ma prawa działać. Chcemy w metodach korzystać z Spark Session i spark.implicits._. Niestety wartości tych nie ma w zakresach metod.

2. Przedawkowanie Spark Session
Możemy to naprawić przekazując SparkSession w każdej metodzie. Jest to niestety upierdliwe. Musimy również za każdym razem importować spark.implicits._. Moje wrodzone lenistwo na to nie pozwala 😁.
private def sumByNormalizedName(normalizedFruits: DataFrame, spark: SparkSession) = {
import spark.implicits._
val sumOfFruits = normalizedFruits
.groupBy("normalized_name")
.agg(
sum(($"quantity")).as("sum")
)
sumOfFruits
}
private def withNormalizedName(fruits: Dataset[Row], spark: SparkSession) = {
import spark.implicits._
val normalizedFruits = fruits.withColumn("normalized_name", lower($"name"))
normalizedFruits
}
private def filterFruits(groceries: DataFrame, spark: SparkSession) = {
import spark.implicits._
val fruits = groceries.filter($"type" === "fruit")
fruits
}
private def getGroceries(spark: SparkSession): DataFrame = {
val groceries = spark.read
.option("inferSchema","true")
.option("header","true")
.csv("some-data.csv")
groceries
}
3. Wyciągnięcie SparkSession
Musimy zapewnić dostęp do SparkSession w trochę inny sposób. Pomoże nam w tym obiekt SparkJob.
package pl.wiadrodanych.demo.base
import org.apache.spark.sql.SparkSession
trait SparkJob {
val spark: SparkSession = SparkSession
.builder
.appName("SomeApp")
.master("local[*]")
.getOrCreate()
}
object SparkJob extends SparkJob {}
Teraz możemy zaimportować SparkJob i spark.implicits._ w aplikacji. Kod wygląda lepiej. Metod możemy używać wielokrotnie.
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import pl.wiadrodanych.demo.base.SparkJob
import pl.wiadrodanych.demo.base.SparkJob.spark.implicits._
object NiceApp {
val spark = SparkJob.spark
def main(args: Array[String]) = {
val groceries: DataFrame = getGroceries
val fruits: Dataset[Row] = filterFruits(groceries)
val normalizedFruits: DataFrame = addNormalizedNameColumn(fruits)
val sumOfFruits: DataFrame = sumByNormalizedName(normalizedFruits)
sumOfFruits.show()
}
private def sumByNormalizedName(normalizedFruits: DataFrame) = {
val sumOfFruits = normalizedFruits
.groupBy("normalized_name")
.agg(
sum(($"quantity")).as("sum")
)
sumOfFruits
}
private def addNormalizedNameColumn(fruits: Dataset[Row]) = {
val normalizedFruits = fruits.withColumn("normalized_name", lower($"name"))
normalizedFruits
}
private def filterFruits(groceries: DataFrame) = {
val fruits = groceries.filter($"type" === "fruit")
fruits
}
private def getGroceries: DataFrame = {
val groceries = spark.read
.option("inferSchema", "true")
.option("header", "true")
.csv("some-data.csv")
groceries
}
4. Implicit class / Extension method
Napisałem sporo kodu C# w swoim życiu. Ciekawą i przydatną konstrukcją jest Extension Method. Pozwala na “dodanie” metod do istniejącego typu/klasy bez jej modyfikacji. Poniżej przykład. Zamiast pisać
int numberA = 1
int numberB = 2
val sum = Sum(numberA, numberB)
...
public int Sum(int numberA, int numberB)
{
return numberA + numberB
}
Możemy napisać
int numberA = 1
int numberB = 2
val sum = numberA.Add(numberB)
...
public static int Add(this int numberA, int numberB)
{
return numberA + numberB
}
Różnicę czytelności widać w poniższym przykładzie
Sum(A, Sum(B, Sum(C,Sum (D,...)))) // VS A.Add(B).Add(C).Add(D)...
W Scali możemy uzyskać podobny mechanizm stosując Implicit class. Poniżej przekształcona logika omawianej aplikacji Apache Spark.
package pl.wiadrodanych.demo.extensions
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import pl.wiadrodanych.demo.base.SparkJob.spark.implicits._
object GroceryDataFrameExtensions {
implicit class RichDataFrame(df: DataFrame) {
def sumByNormalizedName: DataFrame = {
val sumOfFruits = df
.groupBy("normalized_name")
.agg(
sum(($"quantity")).as("sum")
)
sumOfFruits
}
def addNormalizedNameColumn: DataFrame = {
val normalizedFruits = df.withColumn("normalized_name", lower($"name"))
normalizedFruits
}
def filterFruits: DataFrame = {
val fruits = df.filter($"type" === "fruit")
fruits
}
}
}
Logika aplikacji przeniosła się do innego obiektu, a kod możemy czytać jak prozę.
package pl.wiadrodanych.demo
import org.apache.spark.sql.DataFrame
import pl.wiadrodanych.demo.NiceApp.spark
import pl.wiadrodanych.demo.extensions.GroceryDataFrameExtensions._
object CoolApp {
def main(args: Array[String]) = {
val result = getGroceries
.filterFruits
.addNormalizedNameColumn
.sumByNormalizedName
result.show
}
private def getGroceries: DataFrame = {
val groceries = spark.read
.option("inferSchema", "true")
.option("header", "true")
.csv("some-data.csv")
groceries
}
}
Wróćmy do tego co miała robić aplikacja:
- pobiera produktów spożywczych dane z pliku
- filtruje owoce
- normalizuje nazwy
- wylicza sumę poszczególnych owoców
Może nie słowo w słowo, ale podobnie 😁.
EDIT: Dataset transform
O ile poprzedni sposób jest fajny, czasami może wprowadzać w błąd. Aby oddzielić kod biznesowy od klasy bazowej, możemy użyć Dataset.transform. Szczegóły znajdziesz w tym artykule od MungingData.
Repozytorium
https://github.com/zorteran/wiadro-danych-readable-scala-apache-spark
Proszę, podziel się w komentarzu co o tym sądzisz. Jaki jest Twój sposób na czytelność i złożoność kodu?

