W Apache Spark/PySpark posługujemy się abstrakcjami, a faktyczne przetwarzanie dokonywane jest dopiero gdy chcemy zmaterializować wynik operacji. Do dyspozycji mamy szereg bibliotek, którymi możemy łączyć się z różnymi bazami i systemami plików. W tym artykule dowiesz się jak połączyć dane z MySQL i MongoDB, a następnie zapisać je w Apache Cassandra.
Środowisko
Idealny moment na wykorzystanie Docker, a dokładnie Docker Compose. Postawimy wszystkie bazy i Jupyter’a z Apache Spark.
# Use root/example as user/password credentials version: '3.1' services: notebook: image: jupyter/all-spark-notebook ports: - 8888:8888 - 4040:4040 volumes: - ./work:/home/jovyan/work cassandra: image: 'bitnami/cassandra:latest' mongo: image: mongo environment: MONGO_INITDB_ROOT_USERNAME: root MONGO_INITDB_ROOT_PASSWORD: example mysql: image: mysql:5.7 environment: MYSQL_DATABASE: 'school' MYSQL_USER: 'user' MYSQL_PASSWORD: 'password' MYSQL_ROOT_PASSWORD: 'password'
Dodanie danych do MongoDB
Potrzebne są jakieś dane. Napisałem prosty skrypt w Python. Załóżmy, że w mongo są dane studentów.
Dodanie danych do MySQL
Natomiast w MySQL mamy tabelę słownikową z grupami. Trzeba utworzyć tabelę i dodać do niej dane.
Schemat danych w Cassandrze
Skoro chcemy wrzucić dane do Cassandry, musimy utworzyć tam keyspace i odpowiednią tabelę. Cassandra jest w Dockerze, więc musimy tam wejśc i odpalić cqlsh.
sudo docker exec -it simple-spark-etl_cassandra_1 bash cqlsh --user cassandra --password cassandra
Następnie utworzyć keyspace i tabelę o odpowiednim schemacie.
CREATE KEYSPACE school WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; CREATE TABLE school.students ( name text, surname text, age int, group_id int, group_number text, skills set<text>, something_important int, PRIMARY KEY (name, surname) ) ;
Dane i schemat w tym przypadku nie mają znaczenia (i sensu). W Apache Cassandra modelowanie danych jest specyficzne, więc w warunkach “nieszkoleniowych” najlepiej kilka razy się zastanowić.
ETL do Apache Cassandra w PySpark
Musimy dostarczyć odpowiednie biblioteki za pomocą zmiennej PYSPARK_SUBMIT_ARGS i skonfigurować źródła.
Jak widać, kod nie jest skomplikowany. Całą robotę przejmują na siebie biblioteki. Apache Spark przydziela zadania dzieląc zbiory na podstawie kluczy/indeksów/metadanych.
Ciekawostka
Pierwotnie chciałem napisać w.w kod w Scali używając kernela Spylon w Jupyter. Niestety ma problemy z biblioteką do MongoDB. PySpark i spark-shell radzi sobie z tym bez problemu.