Apache Spark – 2 Kroki do Lepszej Utylizacji Zasobów

Apache Spark

Chcemy, aby nasze aplikacje w Apache Spark wykorzystywały wszystkie przydzielone zasoby. Niestety nie jest to takie proste. Rozproszenie obliczeń niesie za sobą koszty zarządzania zadaniami, a same zadania mają wobec siebie zależności. Z jednej strony ogranicza nas CPU (szybkość obliczeń), z drugiej strony dyski i sieć. MapReduce poświęciłem dedykowany materiał wideo. W artykule dowiesz się jak w 2 prostych krokach poprawić utylizację zasobów w Apache Spark.

Prosta aplikacja

Przyjrzyjmy się strukturze typowej aplikacji w Apache Spark.

val students = spark.read
    .format("...")
    .option("table", "some_students_table")
    .load()

val filteredStudents = students
    .filter("...")

studentsAfterSomeTransformations.write
    .format("...")
    .option("collection", "some_collection")
    .save()

Najpierw określamy źródło danych, następnie wykonujemy na nim pewne operacje, aby finalnie zapisać wyniki. Wszystkie transformacje w Spark’u są lewnie, więc obliczenia rozpoczną się dopiero przy operacji zapisu.

Nie taka prosta aplikacja

Zerknijmy teraz na szablon trochę bardziej zaawansowanej aplikacji. Załóżmy, że mamy dwa lub więcej źródeł. Dodatkowo dane przekształcamy i agregujemy po różnych encjach.

val students = spark.read
    .format("...")
    .option("table", "some_students_table")
    .load()

val groups = spark.read
    .format("...")
    .option("table", "some_groups_table")
    .load()

val teachers = spark.read
    .format("...")
    .option("table", "some_teachers_table")
    .load()

val filteredStudents = students
    .filter("...")

val filteredGroups = groups
    .filter("...")

val filteredteachers = teachers
    .filter("...")

val allInOne = filteredGroups
    .join(filteredStudents,...)
    .join(filteredteachers,...)
    .transform(...)
    .transform(...)
    .transform(...)

// allInOne.cache()
// OR
// allInOne.persist(...)

val studentsStats = allInOne
    .groupBy(...)
    .agg(...)

val groupsStats = allInOne
    .groupBy(...)
    .agg(...)
    
val teachersStats = allInOne
    .groupBy(...)
    .agg(...)

writeToStatsTable(studentsStats, "my_stats_collection")
writeToStatsTable(groupsStats, "my_stats_collection")
writeToStatsTable(teachersStats, "my_stats_collection")

...

def writeToStatsTable(stats: DataFrame, table: String): Unit = {
    stats.write
        .format("...")
        .option("collection", table)
        .save()
}

Sprawa trochę się pokomplikowała. Dane z wielu źródeł są pobierane, złączane i przekształcane. Jest to dobry moment na cache/persist (linia 32-34), aby nie wykonywać tych samych kroków dla każdej operacji zapisu.

Skupmy się na liniach 48-50. Widać tu trochę powtórzeń (copy&paste) gdzie róźni się tylko jedna zmienna. Spróbujmy to trochę poprawić.

Seq(
  studentsStats,
  groupsStats,
  teachersStats
).foreach(writeToStatsTable(_, "my_stats_collection"))

Od razu jakoś tak ładniej . Pod katem wydajności jednak nie ma zmian. Kod wykonywany jest nadal sekwencyjnie przez Driver.

Krok 1 – Parallel Collections

W Scali mamy możliwość zmiany kolekcji sekwencyjnej, na taką obsługiwaną równolegle. W naszym przypadku wystarczy dodać .par

Seq(
  studentsStats,
  groupsStats,
  teachersStats
).par.foreach(writeToStatsTable(_, "my_stats_collection"))

Krok 2 – Zmiana FIFO na FAIR

To jednak nie wystarczy. Domyślnie zadania w Spark’u wykonywane są sekwencyjnie (kolejka FIFO). Szczegóły znajdziesz tutaj. Możemy to zmienić w kodzie lub konfiguracją spark-submit

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

Porównanie wyników

W artykule posługiwałem się czymś w rodzaju pseudo kodu. Poniżej możesz zobaczyć wykres utylizacji CPU dla dwóch aplikacji. W jednej wykorzystałem .par przy zapisie, reszta kodu jest identyczna.

Screenshot jest z wizualizacji TSVB w Kibana, a do zbierania metryk wykorzystałem Metricbeat. Są to dwie funkcje, jedna z nich ma ustawiony offset tak, aby pokrywał się moment uruchomienia aplikacji Apache Spark. Zadania były odpalane na 4-nodowym klastrze.

Wnioski:

  • W czasie 8:30 .par nadal pracuje, gdzie no .par robi „przerwę” na około 10 minut.
  • Widać różnicę w sumarycznym porównania zużycia CPU – .par 0.589 vs no .par 0.311.
  • Nie widać tego po wykresie, ale zadanie z .par kończy się około 10:07. Potem uruchomione zostało kolejne zadanie z DAG’a w Apache Airflow. Zadanie no .par zakończyło się dopiero około 10:30. Jest to ponad 15% poprawa czasu wykonania aplikacji.

Podsumowanie

Niewielką zmienną w kodzie i konfiguracji udało się zmniejszyć czas potrzebny na wykonanie aplikacji. Oczywiście wyniki mogą się różnić zaleznie od poziomu skomplikowania aplikacji.

Warto optymalizować aplikacje Apache Spark. Czas to pieniądz, a w świecie chmury publicznej, dosłownie .

Jeden komentarz do “Apache Spark – 2 Kroki do Lepszej Utylizacji Zasobów”

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *