PySpark ETL z MySQL i MongoDB do Cassandra

apache-spark-etl-mongo-mysql-cassandra

W Apache Spark/PySpark posługujemy się abstrakcjami, a faktyczne przetwarzanie dokonywane jest dopiero gdy chcemy zmaterializować wynik operacji. Do dyspozycji mamy szereg bibliotek, którymi możemy łączyć się z różnymi bazami i systemami plików. W tym artykule dowiesz się jak połączyć dane z MySQL i MongoDB, a następnie zapisać je w Apache Cassandra.

Środowisko

Idealny moment na wykorzystanie Docker, a dokładnie Docker Compose. Postawimy wszystkie bazy i Jupyter’a z Apache Spark.

# Use root/example as user/password credentials
version: '3.1'

services:
  notebook:
    image: jupyter/all-spark-notebook
    ports:
      - 8888:8888
      - 4040:4040
    volumes:
      - ./work:/home/jovyan/work

  cassandra:
    image: 'bitnami/cassandra:latest'

  mongo:
    image: mongo
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example

  mysql:
    image: mysql:5.7
    environment:
      MYSQL_DATABASE: 'school'
      MYSQL_USER: 'user'
      MYSQL_PASSWORD: 'password'
      MYSQL_ROOT_PASSWORD: 'password'

Dodanie danych do MongoDB

Potrzebne są jakieś dane. Napisałem prosty skrypt w Python. Załóżmy, że w mongo są dane studentów.

Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
view raw seed_mongo.ipynb hosted with ❤ by GitHub

Dodanie danych do MySQL

Natomiast w MySQL mamy tabelę słownikową z grupami. Trzeba utworzyć tabelę i dodać do niej dane.

Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
view raw seed_mysql.ipynb hosted with ❤ by GitHub

Schemat danych w Cassandrze

Skoro chcemy wrzucić dane do Cassandry, musimy utworzyć tam keyspace i odpowiednią tabelę. Cassandra jest w Dockerze, więc musimy tam wejśc i odpalić cqlsh.

sudo docker exec -it simple-spark-etl_cassandra_1 bash
cqlsh --user cassandra --password cassandra

Następnie utworzyć keyspace i tabelę o odpowiednim schemacie.

CREATE KEYSPACE school WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};

CREATE TABLE school.students (
    name text,
    surname text,
    age int,
    group_id int,
    group_number text,
    skills set<text>,
    something_important int,
    PRIMARY KEY (name, surname)
) ;

Dane i schemat w tym przypadku nie mają znaczenia (i sensu). W Apache Cassandra modelowanie danych jest specyficzne, więc w warunkach „nieszkoleniowych” najlepiej kilka razy się zastanowić.

ETL do Apache Cassandra w PySpark

Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

Musimy dostarczyć odpowiednie biblioteki za pomocą zmiennej PYSPARK_SUBMIT_ARGS i skonfigurować źródła.

Jak widać, kod nie jest skomplikowany. Całą robotę przejmują na siebie biblioteki. Apache Spark przydziela zadania dzieląc zbiory na podstawie kluczy/indeksów/metadanych.

Ciekawostka

Pierwotnie chciałem napisać w.w kod w Scali używając kernela Spylon w Jupyter. Niestety ma problemy z biblioteką do MongoDB. PySpark i spark-shell radzi sobie z tym bez problemu.

Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

Repozytorium

https://github.com/zorteran/wiadro-danych-simple-spark-etl

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *