Półtora miliarda haseł w Spark – część 1 – czyszczenie

Tym razem do piaskownicy wkracza Apache Spark. Zajmiemy się prostym, ale pokaźnym zbiorem maili i haseł z różnych wycieków danych. W tym przykładzie będę używał DataFrames w Spark 2.0. Środowisko na którym działałem to HDInsight na Azure.

Jest to jeden z artykułów dotyczących tego zbioru danych:

Dane

Danych do przetworzenia jest 41,1 GB. Maile i hasła zapisane są w CSV-kach dzielonych dwukropkiem, w folderach partycjonowanych wg. pierwszego znaku wiersza. Łącznie jest to 1 400 553 869 rekordów.

Wyświetlenie pierwszych 10 wierszy jednego z plików

Niestety zdarzają się zamienione pary, czyli mail jest w miejscu hasła. Mamy też do czynienia z dużą ilością małych plików (HDFS za tym nie przepada).

Przetworzenie pojedynczego pliku

Klasycznie, tak samo jak w Jak zostać Ironmanem? Analiza CSV-ek w pandas, najpierw zajmiemy się pojedynczym plikiem. Szkoda czasu na mielenie za każdym razem ponad 40 GB danych.

val path = "/public/BreachCompilation/data/0/0"
var breach = spark.read
    .option("charset", "UTF-8")
    .csv(path)

Dla pewności wyświetlmy zawartość tego DataFrame. Apache Zeppelin którego użyłem potrafi w ładny sposób wyświetlić zawartość DataFrame. W przypadku interpretera spark2, służy do tego metoda z.show.

Wykonanie metody z.show w Apache Zeppelin

W Apache Zeppelin na HDInsight w chmurze Microsoft Azure niestety nie działał z.show() . Z tego co się zorientowałem, jest to kwestia interpretera livy2. W tym przypadku można zarejestrować DataFrame jako tymczasową tabelę i wyświetlić interpreterem SQL.

breach.registerTempTable("raw_breach")
%sql
SELECT * FROM raw_breach LIMIT 10

Póki co mamy jedną kolumnę w której znajduje się mail i hasło. Ponadto zdarza się, że wartości te są zamienione.

breach = breach
    .withColumn("_tmp", split($"_c0", ":")).select(
      $"_tmp".getItem(0).as("mail_tmp"),
      $"_tmp".getItem(1).as("password_tmp")
    )
    .withColumn("isPasswordMailAlike", when($"password_tmp" rlike "(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|\"(?:[\\x01-\\x08\\x0b\\x0c\\x0e-\\x1f\\x21\\x23-\\x5b\\x5d-\\x7f]|\\\\[\\x01-\\x09\\x0b\\x0c\\x0e-\\x7f])*\")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\\x01-\\x08\\x0b\\x0c\\x0e-\\x1f\\x21-\\x5a\\x53-\\x7f]|\\\\[\\x01-\\x09\\x0b\\x0c\\x0e-\\x7f])+)\\])", 1).otherwise(0))
    .withColumn("mail", when($"isPasswordMailAlike" === 1, lower($"password_tmp") ).otherwise(lower($"mail_tmp")))
    .withColumn("password", when($"isPasswordMailAlike" === 1, $"mail_tmp" ).otherwise($"password_tmp"))
    .drop("mail_tmp")
    .drop("password_tmp")
    .drop("isPasswordMailAlike")

W powyższym kodzie sporo się dzieje. Przeanalizujmy idąc od góry:

  • linia 2-5 – Podzielenie kolumny na dwie mail_tmp i password_tmp po dwukropku.
  • linia 6 – Klasyfikator czy hasło przypomina email. Jeśli tak to 1, jeśli nie to 0. Użyłem wyrażenia regularnego (regex). Jeśli znasz lepszą metodę: proszę napisz w komentarzu.
  • Linia 7-8 – Wykorzystanie klasyfikatora do utworzenia nowych kolumn. Hasła do haseł, maile do maili.
  • Linia 9-10 – Usunięcie roboczych kolumn.
Agregacje

Dane wyglądają sensownie. Możemy przejść do prostych agregacji. Znajdźmy jakie hasła, maile i domeny najczęściej się powtarzają.

Top 10 haseł

Najpierw zaimportujmy funkcje do operacji na DataFrame

import org.apache.spark.sql.functions._

Prosty groupBy po kolumnie password. DataFrame ograniczam do 10 rekordów i dodaję do tymczasowej tabeli. Całość jest otoczona nawiasami by interpreter nie płakał, że nie wie co zrobić ze znakami nowej linii.

import org.apache.spark.sql.functions._
val top_10_passwords = (breach.groupBy("password")
                            .count()
                            .orderBy(desc("count")).limit(10))
top_10_passwords.registerTempTable("top_10_passwords")
Top 10 haseł w jednym z plików
Top 10 maili
val top_10_mails = (breach.groupBy("mail")
                            .count()
                            .orderBy(desc("count")).limit(10))
top_10_mails.registerTempTable("top_10_mails")
Top 10 domen
val top_10_domains = (breach
                        .withColumn("domain", substring(col("mail"), -3, 3))
                        .groupBy("domain")
                        .count()
                        .orderBy(desc("count"))
                        .limit(10))
top_10_domains.registerTempTable("top_10_domains")

Przetworzenie całego zbioru

To co osiągnęliśmy dla jednego pliku jest zadowalające. Teraz czas na powtórzenie operacji dla wszystkich plików. Poniżej przykład jak wczytania wszystkich plików do jednego DataFrame.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import scala.collection.mutable.ListBuffer

val fs = FileSystem.get(new Configuration())
val files = fs.listFiles(new Path("/public/BreachCompilation/data/"), true)
val filePaths = new ListBuffer[String]
while (files.hasNext()) {
    val file = files.next()
    filePaths += file.getPath.toString
}

var breach = (spark
    .read
    .option("charset", "UTF-8")
    .csv(filePaths: _*))

Najpierw policzmy ile rekordów mamy w naszym zbiorze. Jak widać proste policzenie trwało już prawie 6 minut.

Top 10 hasel

Dyskusyjne są wartości „null” oraz jej brak. Na pewno hasło „123456” wiedzie prym 😎. Problem w tym, że zapytanie trwało ponad 41 minut…

Natomiast kolejne zapytania w ogóle się nie kończą. Sesja Livy po godzinie się skończyła.

Materializacja zbioru danych do pliku Parquet

Działania w Spark-u są „leniwe” (Lazy Evaluation). Machina mieląca odpala się dopiero, gdy chcemy zmaterializować DataFrame/RDD. Operacje w Spark-u działają na abstrakcjach. To dlatego wszystkie operacje przed wykonaniem SELECT-a wykonywały się błyskawicznie.

Zmaterializujmy wszystkie csv-ki do jednego pliku. Będzie to format Parquet (o formatach więcej będzie w kolejnym wpisie). Napisałem pod to program wykorzystujący w.w kody i wrzuciłem w postaci .jar na serwer.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

object Breach {

  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = getSparkSession
    import sparkSession.implicits._

    var breach: DataFrame = getBreachDatasetDF(sparkSession)

    breach = breach.
      withColumn("_tmp", split($"_c0", ":")).select(
      $"_tmp".getItem(0).as("mail_tmp"),
      $"_tmp".getItem(1).as("password_tmp")
    ).
      withColumn("isPasswordMailAlike", when($"password_tmp" rlike "(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|\"(?:[\\x01-\\x08\\x0b\\x0c\\x0e-\\x1f\\x21\\x23-\\x5b\\x5d-\\x7f]|\\\\[\\x01-\\x09\\x0b\\x0c\\x0e-\\x7f])*\")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\\x01-\\x08\\x0b\\x0c\\x0e-\\x1f\\x21-\\x5a\\x53-\\x7f]|\\\\[\\x01-\\x09\\x0b\\x0c\\x0e-\\x7f])+)\\])", 1).otherwise(0)).
      withColumn("mail", when($"isPasswordMailAlike" === 1, lower($"password_tmp")).otherwise(lower($"mail_tmp"))).
      withColumn("password", when($"isPasswordMailAlike" === 1, $"mail_tmp").otherwise($"password_tmp")).
      drop("mail_tmp").
      drop("password_tmp").
      drop("isPasswordMailAlike").cache()

    breach.write.mode(SaveMode.Append).parquet("/breach_compilation/breach.parquet")
  }

  private def getBreachDatasetDF(sparkSession: SparkSession) = {
    val fs = FileSystem.get(new Configuration())
    val files = fs.listFiles(new Path("/breach_compilation/BreachCompilation/data/"), true)
    val filePaths = new ListBuffer[String]
    while (files.hasNext()) {
      val file = files.next()
      filePaths += file.getPath.toString
    }
    var breach = sparkSession
      .read
      .option("charset", "UTF-8")
      .csv(filePaths: _*)
    breach
  }

  private def getSparkSession = {
    val conf = new SparkConf()
      .setAppName("saveBreachDataSetToFormats");
//      .setMaster("yarn");

    val sparkSession = SparkSession
      .builder()
      .config(conf)
      .getOrCreate();
    sparkSession
  }
}

Operacja trwała dłuższą chwilę więc użyłem nohup by zamknięcie terminala nie spowodowało zabicie spark-owego job-a.

nohup spark-submit --num-executors 4 --executor-memory 22G --class Breach  myspark_2.11-0.1.jar &
Sporo tych plików parquet

Efektem jest duża ilość plików parquet. Rozwiązałem to specjalną aplikacją do łączenia plików parquet (na podstawie tego artykułu)

 hadoop jar parquet-tools-1.9.0.jar merge /breach_compilation/breach.parquet /breach_compilation/breach.parquet.merged/breach.parquet

Jak widać powyżej, dane w formacie parquet (kompresja Snappy) zajmują połowę tego co pierwotne.

Agregacje – one more time

Po załadowaniu do DataFrame pliku Parquet możemy wracać do analiz.

val breach = spark.sqlContext.read.parquet("/breach_compilation/breach.parquet.merged/breach.parquet")
Polska na 8-mym miejscu 😅

Wniosek

Oczyszczone dane warto zapisać do formatu typu Parquet i dopiero wtedy zacząć głębszą analizę.

2 myśli w temacie “Półtora miliarda haseł w Spark – część 1 – czyszczenie”

  1. Przed zamianą email z hasłem miejscami warto sprawdzić, czy email nie jest poprawny. Jeśli czyjeś hasło załapie się w regex może dojść do zamiany poprawnego emaila na just@nother.pass albo coś w tym stylu.

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *