PySpark ETL z MySQL i MongoDB do Cassandra

apache-spark-etl-mongo-mysql-cassandra

W Apache Spark/PySpark posługujemy się abstrakcjami, a faktyczne przetwarzanie dokonywane jest dopiero gdy chcemy zmaterializować wynik operacji. Do dyspozycji mamy szereg bibliotek, którymi możemy łączyć się z różnymi bazami i systemami plików. W tym artykule dowiesz się jak połączyć dane z MySQL i MongoDB, a następnie zapisać je w Apache Cassandra.

Środowisko

Idealny moment na wykorzystanie Docker, a dokładnie Docker Compose. Postawimy wszystkie bazy i Jupyter’a z Apache Spark.

# Use root/example as user/password credentials
version: '3.1'

services:
  notebook:
    image: jupyter/all-spark-notebook
    ports:
      - 8888:8888
      - 4040:4040
    volumes:
      - ./work:/home/jovyan/work

  cassandra:
    image: 'bitnami/cassandra:latest'

  mongo:
    image: mongo
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example

  mysql:
    image: mysql:5.7
    environment:
      MYSQL_DATABASE: 'school'
      MYSQL_USER: 'user'
      MYSQL_PASSWORD: 'password'
      MYSQL_ROOT_PASSWORD: 'password'

Dodanie danych do MongoDB

Potrzebne są jakieś dane. Napisałem prosty skrypt w Python. Załóżmy, że w mongo są dane studentów.

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

Dodanie danych do MySQL

Natomiast w MySQL mamy tabelę słownikową z grupami. Trzeba utworzyć tabelę i dodać do niej dane.

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

Schemat danych w Cassandrze

Skoro chcemy wrzucić dane do Cassandry, musimy utworzyć tam keyspace i odpowiednią tabelę. Cassandra jest w Dockerze, więc musimy tam wejśc i odpalić cqlsh.

sudo docker exec -it simple-spark-etl_cassandra_1 bash
cqlsh --user cassandra --password cassandra

Następnie utworzyć keyspace i tabelę o odpowiednim schemacie.

CREATE KEYSPACE school WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};

CREATE TABLE school.students (
    name text,
    surname text,
    age int,
    group_id int,
    group_number text,
    skills set<text>,
    something_important int,
    PRIMARY KEY (name, surname)
) ;

Dane i schemat w tym przypadku nie mają znaczenia (i sensu). W Apache Cassandra modelowanie danych jest specyficzne, więc w warunkach “nieszkoleniowych” najlepiej kilka razy się zastanowić.

ETL do Apache Cassandra w PySpark

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

Musimy dostarczyć odpowiednie biblioteki za pomocą zmiennej PYSPARK_SUBMIT_ARGS i skonfigurować źródła.

Jak widać, kod nie jest skomplikowany. Całą robotę przejmują na siebie biblioteki. Apache Spark przydziela zadania dzieląc zbiory na podstawie kluczy/indeksów/metadanych.

Ciekawostka

Pierwotnie chciałem napisać w.w kod w Scali używając kernela Spylon w Jupyter. Niestety ma problemy z biblioteką do MongoDB. PySpark i spark-shell radzi sobie z tym bez problemu.

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

Repozytorium

https://github.com/zorteran/wiadro-danych-simple-spark-etl

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *