Chcemy, aby nasze aplikacje w Apache Spark wykorzystywały wszystkie przydzielone zasoby. Niestety nie jest to takie proste. Rozproszenie obliczeń niesie za sobą koszty zarządzania zadaniami, a same zadania mają wobec siebie zależności. Z jednej strony ogranicza nas CPU (szybkość obliczeń), z drugiej strony dyski i sieć. MapReduce poświęciłem dedykowany materiał wideo. W artykule dowiesz się jak w 2 prostych krokach poprawić utylizację zasobów w Apache Spark.
Prosta aplikacja
Przyjrzyjmy się strukturze typowej aplikacji w Apache Spark.
val students = spark.read .format("...") .option("table", "some_students_table") .load() val filteredStudents = students .filter("...") studentsAfterSomeTransformations.write .format("...") .option("collection", "some_collection") .save()
Najpierw określamy źródło danych, następnie wykonujemy na nim pewne operacje, aby finalnie zapisać wyniki. Wszystkie transformacje w Spark’u są lewnie, więc obliczenia rozpoczną się dopiero przy operacji zapisu.
Nie taka prosta aplikacja
Zerknijmy teraz na szablon trochę bardziej zaawansowanej aplikacji. Załóżmy, że mamy dwa lub więcej źródeł. Dodatkowo dane przekształcamy i agregujemy po różnych encjach.
val students = spark.read .format("...") .option("table", "some_students_table") .load() val groups = spark.read .format("...") .option("table", "some_groups_table") .load() val teachers = spark.read .format("...") .option("table", "some_teachers_table") .load() val filteredStudents = students .filter("...") val filteredGroups = groups .filter("...") val filteredteachers = teachers .filter("...") val allInOne = filteredGroups .join(filteredStudents,...) .join(filteredteachers,...) .transform(...) .transform(...) .transform(...) // allInOne.cache() // OR // allInOne.persist(...) val studentsStats = allInOne .groupBy(...) .agg(...) val groupsStats = allInOne .groupBy(...) .agg(...) val teachersStats = allInOne .groupBy(...) .agg(...) writeToStatsTable(studentsStats, "my_stats_collection") writeToStatsTable(groupsStats, "my_stats_collection") writeToStatsTable(teachersStats, "my_stats_collection") ... def writeToStatsTable(stats: DataFrame, table: String): Unit = { stats.write .format("...") .option("collection", table) .save() }
Sprawa trochę się pokomplikowała. Dane z wielu źródeł są pobierane, złączane i przekształcane. Jest to dobry moment na cache/persist (linia 32-34), aby nie wykonywać tych samych kroków dla każdej operacji zapisu.
Skupmy się na liniach 48-50. Widać tu trochę powtórzeń (copy&paste) gdzie róźni się tylko jedna zmienna. Spróbujmy to trochę poprawić.
Seq( studentsStats, groupsStats, teachersStats ).foreach(writeToStatsTable(_, "my_stats_collection"))
Od razu jakoś tak ładniej 😊. Pod katem wydajności jednak nie ma zmian. Kod wykonywany jest nadal sekwencyjnie przez Driver.
Krok 1 – Parallel Collections
W Scali mamy możliwość zmiany kolekcji sekwencyjnej, na taką obsługiwaną równolegle. W naszym przypadku wystarczy dodać .par
Seq( studentsStats, groupsStats, teachersStats ).par.foreach(writeToStatsTable(_, "my_stats_collection"))
Krok 2 – Zmiana FIFO na FAIR
To jednak nie wystarczy. Domyślnie zadania w Spark’u wykonywane są sekwencyjnie (kolejka FIFO). Szczegóły znajdziesz tutaj. Możemy to zmienić w kodzie lub konfiguracją spark-submit
val conf = new SparkConf().setMaster(...).setAppName(...) conf.set("spark.scheduler.mode", "FAIR") val sc = new SparkContext(conf)
Porównanie wyników
W artykule posługiwałem się czymś w rodzaju pseudo kodu. Poniżej możesz zobaczyć wykres utylizacji CPU dla dwóch aplikacji. W jednej wykorzystałem .par
przy zapisie, reszta kodu jest identyczna.
Screenshot jest z wizualizacji TSVB w Kibana, a do zbierania metryk wykorzystałem Metricbeat. Są to dwie funkcje, jedna z nich ma ustawiony offset tak, aby pokrywał się moment uruchomienia aplikacji Apache Spark. Zadania były odpalane na 4-nodowym klastrze.
Wnioski:
- W czasie 8:30
.par
nadal pracuje, gdzieno .par
robi “przerwę” na około 10 minut. - Widać różnicę w sumarycznym porównania zużycia CPU –
.par
0.589 vsno .par
0.311. - Nie widać tego po wykresie, ale zadanie z
.par
kończy się około 10:07. Potem uruchomione zostało kolejne zadanie z DAG’a w Apache Airflow. Zadanieno .par
zakończyło się dopiero około 10:30. Jest to ponad 15% poprawa czasu wykonania aplikacji.
Podsumowanie
Niewielką zmienną w kodzie i konfiguracji udało się zmniejszyć czas potrzebny na wykonanie aplikacji. Oczywiście wyniki mogą się różnić zaleznie od poziomu skomplikowania aplikacji.
Warto optymalizować aplikacje Apache Spark. Czas to pieniądz, a w świecie chmury publicznej, dosłownie 😁.
Jeden komentarz do “Apache Spark – 2 Kroki do Lepszej Utylizacji Zasobów”