Czytelny Kod Scala w Apache Spark (4 podejścia)

readable-scala-spark-code

Jupyter i Apache Zeppelin to dobre miejsce na eksperymentowanie z danymi. Niestety, specyfika notebook’ów nie zachęca do organizacji kodu, a w tym jego dekompozycji i czytelności. Możemy przekopiować komórki do Intellij IDEA i zbudować JAR’a, ale efekt będzie taki sobie. W artykule dowiesz się jak napisać czytelny kod Scala Apache Spark w Intellij IDEA.

0. Kod bazowy

Jest to prosta aplikacja która:

  • pobiera produktów spożywczych dane z pliku
  • filtruje owoce
  • normalizuje nazwy
  • wylicza sumę poszczególnych owoców
    val spark = SparkSession
      .builder
      .appName("MyAwesomeApp")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val groceries = spark.read
      .option("inferSchema", "true")
      .option("header", "true")
      .csv("some-data.csv")

    val sumOfFruits = groceries
      .filter($"type" === "fruit")
      .withColumn("normalized_name", lower($"name"))
      .groupBy("normalized_name")
      .agg(
        sum(($"quantity")).as("sum")
      )

    val fruits = groceries.filter($"type" === "fruit")

    val normalizedFruits = fruits.withColumn("normalized_name", lower($"name"))

    val sumOfFruits = normalizedFruits
      .groupBy("normalized_name")
      .agg(
        sum(($"quantity")).as("sum")
      )

    sumOfFruits.show()

1. Extract Methods

Wykorzystajmy moc IDE, a dokładniej operację refaktoryzacji Extract Method. Pozwala w prosty sposób utworzyć metodę z zaznaczonego fragmentu kodu. Tym sposobem spróbujmy utworzyć metody odpowiadające kolejnym krokom w aplikacji.

Ale jak to nie działa?

def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("MyAwesomeApp")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val groceries: DataFrame = getGroceries
    val fruits: Dataset[Row] = filterFruits(groceries)
    val normalizedFruits: DataFrame = withNormalizedName(fruits)
    val sumOfFruits: DataFrame = sumByNormalizedName(normalizedFruits)

    sumOfFruits.show()
  }

private def sumByNormalizedName(normalizedFruits: DataFrame) = {
    val sumOfFruits = normalizedFruits
      .groupBy("normalized_name")
      .agg(
        sum(($"quantity")).as("sum")
      )
    sumOfFruits
  }

private def withNormalizedName(fruits: Dataset[Row]) = {
    val normalizedFruits = fruits.withColumn("normalized_name", lower($"name"))
    normalizedFruits
  }

private def filterFruits(groceries: DataFrame) = {
    val fruits = groceries.filter($"type" === "fruit")
    fruits
  }

private def getGroceries: DataFrame = {

  val groceries = spark.read
    .option("inferSchema","true")
    .option("header","true")
    .csv("some-data.csv")
    groceries
  }

Kod w metodzie main jest już bardziej czytelny… ale ten kod nie ma prawa działać. Chcemy w metodach korzystać z Spark Session i spark.implicits._. Niestety wartości tych nie ma w zakresach metod.

2. Przedawkowanie Spark Session

Możemy to naprawić przekazując SparkSession w każdej metodzie. Jest to niestety upierdliwe. Musimy również za każdym razem importować spark.implicits._. Moje wrodzone lenistwo na to nie pozwala 😁.

private def sumByNormalizedName(normalizedFruits: DataFrame, spark: SparkSession) = {
    import spark.implicits._
    val sumOfFruits = normalizedFruits
      .groupBy("normalized_name")
      .agg(
        sum(($"quantity")).as("sum")
      )
    sumOfFruits
  }

private def withNormalizedName(fruits: Dataset[Row], spark: SparkSession) = {
    import spark.implicits._
    val normalizedFruits = fruits.withColumn("normalized_name", lower($"name"))
    normalizedFruits
  }

private def filterFruits(groceries: DataFrame, spark: SparkSession) = {
    import spark.implicits._
    val fruits = groceries.filter($"type" === "fruit")
    fruits
  }

private def getGroceries(spark: SparkSession): DataFrame  = {
    val groceries = spark.read
      .option("inferSchema","true")
      .option("header","true")
      .csv("some-data.csv")
    groceries
  }

3. Wyciągnięcie SparkSession

Musimy zapewnić dostęp do SparkSession w trochę inny sposób. Pomoże nam w tym obiekt SparkJob.

package pl.wiadrodanych.demo.base

import org.apache.spark.sql.SparkSession

trait SparkJob {
  val spark: SparkSession = SparkSession
    .builder
    .appName("SomeApp")
    .master("local[*]")
    .getOrCreate()
}

object SparkJob extends SparkJob {}

Teraz możemy zaimportować SparkJob i spark.implicits._ w aplikacji. Kod wygląda lepiej. Metod możemy używać wielokrotnie.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import pl.wiadrodanych.demo.base.SparkJob
import pl.wiadrodanych.demo.base.SparkJob.spark.implicits._

object NiceApp {
  val spark = SparkJob.spark

  def main(args: Array[String]) = {
    val groceries: DataFrame = getGroceries
    val fruits: Dataset[Row] = filterFruits(groceries)
    val normalizedFruits: DataFrame = addNormalizedNameColumn(fruits)
    val sumOfFruits: DataFrame = sumByNormalizedName(normalizedFruits)
    sumOfFruits.show()
  }

  private def sumByNormalizedName(normalizedFruits: DataFrame) = {
    val sumOfFruits = normalizedFruits
      .groupBy("normalized_name")
      .agg(
        sum(($"quantity")).as("sum")
      )
    sumOfFruits
  }

  private def addNormalizedNameColumn(fruits: Dataset[Row]) = {
    val normalizedFruits = fruits.withColumn("normalized_name", lower($"name"))
    normalizedFruits
  }

  private def filterFruits(groceries: DataFrame) = {
    val fruits = groceries.filter($"type" === "fruit")
    fruits
  }

  private def getGroceries: DataFrame = {
    val groceries = spark.read
      .option("inferSchema", "true")
      .option("header", "true")
      .csv("some-data.csv")
    groceries
  }

4. Implicit class / Extension method

Napisałem sporo kodu C# w swoim życiu. Ciekawą i przydatną konstrukcją jest Extension Method. Pozwala na „dodanie” metod do istniejącego typu/klasy bez jej modyfikacji. Poniżej przykład. Zamiast pisać

int numberA = 1
int numberB = 2
val sum = Sum(numberA, numberB)
...
public int Sum(int numberA, int numberB)
{
    return numberA + numberB
}

Możemy napisać

int numberA = 1
int numberB = 2
val sum = numberA.Add(numberB)
...
public static int Add(this int numberA, int numberB)
{
    return numberA + numberB
}

Różnicę czytelności widać w poniższym przykładzie

Sum(A, Sum(B, Sum(C,Sum (D,...))))
// VS
A.Add(B).Add(C).Add(D)...

W Scali możemy uzyskać podobny mechanizm stosując Implicit class. Poniżej przekształcona logika omawianej aplikacji Apache Spark.

package pl.wiadrodanych.demo.extensions

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import pl.wiadrodanych.demo.base.SparkJob.spark.implicits._

object GroceryDataFrameExtensions {

  implicit class RichDataFrame(df: DataFrame) {

    def sumByNormalizedName: DataFrame = {
      val sumOfFruits = df
        .groupBy("normalized_name")
        .agg(
          sum(($"quantity")).as("sum")
        )
      sumOfFruits
    }

    def addNormalizedNameColumn: DataFrame = {
      val normalizedFruits = df.withColumn("normalized_name", lower($"name"))
      normalizedFruits
    }

    def filterFruits: DataFrame = {
      val fruits = df.filter($"type" === "fruit")
      fruits
    }
  }

}

Logika aplikacji przeniosła się do innego obiektu, a kod możemy czytać jak prozę.

package pl.wiadrodanych.demo

import org.apache.spark.sql.DataFrame
import pl.wiadrodanych.demo.NiceApp.spark
import pl.wiadrodanych.demo.extensions.GroceryDataFrameExtensions._

object CoolApp {
  def main(args: Array[String]) = {
    val result = getGroceries
      .filterFruits
      .addNormalizedNameColumn
      .sumByNormalizedName

    result.show
  }

  private def getGroceries: DataFrame = {
    val groceries = spark.read
      .option("inferSchema", "true")
      .option("header", "true")
      .csv("some-data.csv")
    groceries
  }
}

Wróćmy do tego co miała robić aplikacja:

  • pobiera produktów spożywczych dane z pliku
  • filtruje owoce
  • normalizuje nazwy
  • wylicza sumę poszczególnych owoców

Może nie słowo w słowo, ale podobnie 😁.

EDIT: Dataset transform

O ile poprzedni sposób jest fajny, czasami może wprowadzać w błąd. Aby oddzielić kod biznesowy od klasy bazowej, możemy użyć Dataset.transform. Szczegóły znajdziesz w tym artykule od MungingData.

Repozytorium

https://github.com/zorteran/wiadro-danych-readable-scala-apache-spark

Proszę, podziel się w komentarzu co o tym sądzisz. Jaki jest Twój sposób na czytelność i złożoność kodu?

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *