Tym razem do piaskownicy wkracza Apache Spark. Zajmiemy się prostym, ale pokaźnym zbiorem maili i haseł z różnych wycieków danych. W tym przykładzie będę używał DataFrames w Spark 2.0. Środowisko na którym działałem to HDInsight na Azure.
Jest to jeden z artykułów dotyczących tego zbioru danych:
- Oczyszczenie i wykonanie prostych agregacji (tutaj jesteś)
- Porównanie formatów ORC, Parquet, JSON, CSV
- Osobiste haveibeenpwned – partycjonowanie danych
Dane
Danych do przetworzenia jest 41,1 GB. Maile i hasła zapisane są w CSV-kach dzielonych dwukropkiem, w folderach partycjonowanych wg. pierwszego znaku wiersza. Łącznie jest to 1 400 553 869 rekordów.
Niestety zdarzają się zamienione pary, czyli mail jest w miejscu hasła. Mamy też do czynienia z dużą ilością małych plików (HDFS za tym nie przepada).
Przetworzenie pojedynczego pliku
Klasycznie, tak samo jak w Jak zostać Ironmanem? Analiza CSV-ek w pandas, najpierw zajmiemy się pojedynczym plikiem. Szkoda czasu na mielenie za każdym razem ponad 40 GB danych.
val path = "/public/BreachCompilation/data/0/0"
var breach = spark.read
.option("charset", "UTF-8")
.csv(path)
Dla pewności wyświetlmy zawartość tego DataFrame. Apache Zeppelin którego użyłem potrafi w ładny sposób wyświetlić zawartość DataFrame. W przypadku interpretera spark2, służy do tego metoda z.show.
W Apache Zeppelin na HDInsight w chmurze Microsoft Azure niestety nie działał z.show() . Z tego co się zorientowałem, jest to kwestia interpretera livy2. W tym przypadku można zarejestrować DataFrame jako tymczasową tabelę i wyświetlić interpreterem SQL.
breach.registerTempTable("raw_breach")
%sql
SELECT * FROM raw_breach LIMIT 10
Póki co mamy jedną kolumnę w której znajduje się mail i hasło. Ponadto zdarza się, że wartości te są zamienione.
breach = breach
.withColumn("_tmp", split($"_c0", ":")).select(
$"_tmp".getItem(0).as("mail_tmp"),
$"_tmp".getItem(1).as("password_tmp")
)
.withColumn("isPasswordMailAlike", when($"password_tmp" rlike "(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|\"(?:[\\x01-\\x08\\x0b\\x0c\\x0e-\\x1f\\x21\\x23-\\x5b\\x5d-\\x7f]|\\\\[\\x01-\\x09\\x0b\\x0c\\x0e-\\x7f])*\")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\\x01-\\x08\\x0b\\x0c\\x0e-\\x1f\\x21-\\x5a\\x53-\\x7f]|\\\\[\\x01-\\x09\\x0b\\x0c\\x0e-\\x7f])+)\\])", 1).otherwise(0))
.withColumn("mail", when($"isPasswordMailAlike" === 1, lower($"password_tmp") ).otherwise(lower($"mail_tmp")))
.withColumn("password", when($"isPasswordMailAlike" === 1, $"mail_tmp" ).otherwise($"password_tmp"))
.drop("mail_tmp")
.drop("password_tmp")
.drop("isPasswordMailAlike")
W powyższym kodzie sporo się dzieje. Przeanalizujmy idąc od góry:
- linia 2-5 – Podzielenie kolumny na dwie mail_tmp i password_tmp po dwukropku.
- linia 6 – Klasyfikator czy hasło przypomina email. Jeśli tak to 1, jeśli nie to 0. Użyłem wyrażenia regularnego (regex). Jeśli znasz lepszą metodę: proszę napisz w komentarzu.
- Linia 7-8 – Wykorzystanie klasyfikatora do utworzenia nowych kolumn. Hasła do haseł, maile do maili.
- Linia 9-10 – Usunięcie roboczych kolumn.
Agregacje
Dane wyglądają sensownie. Możemy przejść do prostych agregacji. Znajdźmy jakie hasła, maile i domeny najczęściej się powtarzają.
Top 10 haseł
Najpierw zaimportujmy funkcje do operacji na DataFrame
import org.apache.spark.sql.functions._
Prosty groupBy po kolumnie password. DataFrame ograniczam do 10 rekordów i dodaję do tymczasowej tabeli. Całość jest otoczona nawiasami by interpreter nie płakał, że nie wie co zrobić ze znakami nowej linii.
import org.apache.spark.sql.functions._
val top_10_passwords = (breach.groupBy("password")
.count()
.orderBy(desc("count")).limit(10))
top_10_passwords.registerTempTable("top_10_passwords")
Top 10 maili
val top_10_mails = (breach.groupBy("mail")
.count()
.orderBy(desc("count")).limit(10))
top_10_mails.registerTempTable("top_10_mails")
Top 10 domen
val top_10_domains = (breach
.withColumn("domain", substring(col("mail"), -3, 3))
.groupBy("domain")
.count()
.orderBy(desc("count"))
.limit(10))
top_10_domains.registerTempTable("top_10_domains")
Przetworzenie całego zbioru
To co osiągnęliśmy dla jednego pliku jest zadowalające. Teraz czas na powtórzenie operacji dla wszystkich plików. Poniżej przykład jak wczytania wszystkich plików do jednego DataFrame.
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import scala.collection.mutable.ListBuffer
val fs = FileSystem.get(new Configuration())
val files = fs.listFiles(new Path("/public/BreachCompilation/data/"), true)
val filePaths = new ListBuffer[String]
while (files.hasNext()) {
val file = files.next()
filePaths += file.getPath.toString
}
var breach = (spark
.read
.option("charset", "UTF-8")
.csv(filePaths: _*))
Najpierw policzmy ile rekordów mamy w naszym zbiorze. Jak widać proste policzenie trwało już prawie 6 minut.
Top 10 hasel
Dyskusyjne są wartości “null” oraz jej brak. Na pewno hasło “123456” wiedzie prym ?. Problem w tym, że zapytanie trwało ponad 41 minut…
Natomiast kolejne zapytania w ogóle się nie kończą. Sesja Livy po godzinie się skończyła.
Materializacja zbioru danych do pliku Parquet
Działania w Spark-u są “leniwe” (Lazy Evaluation). Machina mieląca odpala się dopiero, gdy chcemy zmaterializować DataFrame/RDD. Operacje w Spark-u działają na abstrakcjach. To dlatego wszystkie operacje przed wykonaniem SELECT-a wykonywały się błyskawicznie.
Zmaterializujmy wszystkie csv-ki do jednego pliku. Będzie to format Parquet (o formatach więcej będzie w kolejnym wpisie). Napisałem pod to program wykorzystujący w.w kody i wrzuciłem w postaci .jar na serwer.
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._
import scala.collection.mutable.ListBuffer
object Breach {
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = getSparkSession
import sparkSession.implicits._
var breach: DataFrame = getBreachDatasetDF(sparkSession)
breach = breach.
withColumn("_tmp", split($"_c0", ":")).select(
$"_tmp".getItem(0).as("mail_tmp"),
$"_tmp".getItem(1).as("password_tmp")
).
withColumn("isPasswordMailAlike", when($"password_tmp" rlike "(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|\"(?:[\\x01-\\x08\\x0b\\x0c\\x0e-\\x1f\\x21\\x23-\\x5b\\x5d-\\x7f]|\\\\[\\x01-\\x09\\x0b\\x0c\\x0e-\\x7f])*\")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\\x01-\\x08\\x0b\\x0c\\x0e-\\x1f\\x21-\\x5a\\x53-\\x7f]|\\\\[\\x01-\\x09\\x0b\\x0c\\x0e-\\x7f])+)\\])", 1).otherwise(0)).
withColumn("mail", when($"isPasswordMailAlike" === 1, lower($"password_tmp")).otherwise(lower($"mail_tmp"))).
withColumn("password", when($"isPasswordMailAlike" === 1, $"mail_tmp").otherwise($"password_tmp")).
drop("mail_tmp").
drop("password_tmp").
drop("isPasswordMailAlike").cache()
breach.write.mode(SaveMode.Append).parquet("/breach_compilation/breach.parquet")
}
private def getBreachDatasetDF(sparkSession: SparkSession) = {
val fs = FileSystem.get(new Configuration())
val files = fs.listFiles(new Path("/breach_compilation/BreachCompilation/data/"), true)
val filePaths = new ListBuffer[String]
while (files.hasNext()) {
val file = files.next()
filePaths += file.getPath.toString
}
var breach = sparkSession
.read
.option("charset", "UTF-8")
.csv(filePaths: _*)
breach
}
private def getSparkSession = {
val conf = new SparkConf()
.setAppName("saveBreachDataSetToFormats");
// .setMaster("yarn");
val sparkSession = SparkSession
.builder()
.config(conf)
.getOrCreate();
sparkSession
}
}
Operacja trwała dłuższą chwilę więc użyłem nohup by zamknięcie terminala nie spowodowało zabicie spark-owego job-a.
nohup spark-submit --num-executors 4 --executor-memory 22G --class Breach myspark_2.11-0.1.jar &
Efektem jest duża ilość plików parquet. Rozwiązałem to specjalną aplikacją do łączenia plików parquet (na podstawie tego artykułu)
hadoop jar parquet-tools-1.9.0.jar merge /breach_compilation/breach.parquet /breach_compilation/breach.parquet.merged/breach.parquet
Jak widać powyżej, dane w formacie parquet (kompresja Snappy) zajmują połowę tego co pierwotne.
Agregacje – one more time
Po załadowaniu do DataFrame pliku Parquet możemy wracać do analiz.
val breach = spark.sqlContext.read.parquet("/breach_compilation/breach.parquet.merged/breach.parquet")
Wniosek
Oczyszczone dane warto zapisać do formatu typu Parquet i dopiero wtedy zacząć głębszą analizę.
Przed zamianą email z hasłem miejscami warto sprawdzić, czy email nie jest poprawny. Jeśli czyjeś hasło załapie się w regex może dojść do zamiany poprawnego emaila na just@nother.pass albo coś w tym stylu.
Słuszna uwaga 🙂
Domena nie zawsze ma 3 znaki.