MinIO to rozproszony storage implementujący API AWS S3. Można go wdrożyć na środowiskach on-premises. Jest przygotowany pod Kubernetes. Stanowi ciekawą alternatywę dla środowisk opartych o HDFS i resztę ekosystemu Hadoop. W końcu Kubernetes staje się coraz ciekawszą alternatywą YARN-a dla Apache Spark. W tym wpisie zapoznamy się z lokalnie postawionym MinIO na docker-compose i wykonamy kilka operacji w Sparku.
Środowisko
Docker compose składający się z jupytera oraz 4 węzłów z MinIO.
version: '3'
services:
notebook:
image: jupyter/all-spark-notebook
ports:
- 8888:8888
- 4040:4040
environment:
- PYSPARK_SUBMIT_ARGS=--packages com.amazonaws:aws-java-sdk-bundle:1.11.819,org.apache.hadoop:hadoop-aws:3.2.0 pyspark-shell
volumes:
- ./work:/home/jovyan/work
minio1:
image: minio/minio:RELEASE.2020-07-02T00-15-09Z
volumes:
- ./minio/data1-1:/data1
- ./minio/data1-2:/data2
ports:
- "9001:9000"
environment:
MINIO_ACCESS_KEY: minio
MINIO_SECRET_KEY: minio123
command: server http://minio{1...4}/data{1...2}
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
minio2:
image: minio/minio:RELEASE.2020-07-02T00-15-09Z
volumes:
- ./minio/data2-1:/data1
- ./minio/data2-2:/data2
ports:
- "9002:9000"
environment:
MINIO_ACCESS_KEY: minio
MINIO_SECRET_KEY: minio123
command: server http://minio{1...4}/data{1...2}
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
minio3:
image: minio/minio:RELEASE.2020-07-02T00-15-09Z
volumes:
- ./minio/data3-1:/data1
- ./minio/data3-2:/data2
ports:
- "9003:9000"
environment:
MINIO_ACCESS_KEY: minio
MINIO_SECRET_KEY: minio123
command: server http://minio{1...4}/data{1...2}
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
minio4:
image: minio/minio:RELEASE.2020-07-02T00-15-09Z
volumes:
- ./minio/data4-1:/data1
- ./minio/data4-2:/data2
ports:
- "9004:9000"
environment:
MINIO_ACCESS_KEY: minio
MINIO_SECRET_KEY: minio123
command: server http://minio{1...4}/data{1...2}
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
Testowymi danymi będzie zbiór MovieLens.
wget http://files.grouplens.org/datasets/movielens/ml-latest.zip
unzip ./ml-latest.zip
Aby odpalic, wystarczy wpisać sudo docker-compose up -d w folderze z repo.
MinIO
GUI
Pod localhost:9001-9004 dostaniemy się do interfejsu graficznego. Jest on bardzo prosty. Po wpisaniu access_key i secret_key podanego w docker-compose.yml, możemy utworzyć testowy bucket i dodać pliki z zbioru MovieLens
Jest opcja korzystania z dedykowanego CLI mc. Pozwala między innymi na administrowanie klastrem.
Co jest pod spodem? Struktura plików
Nie zgłębiałem się jeszcze mocno w dokumentację, by poznać sposób działania MinIO. Zmapowaliśmy foldery z danymi, więc zerknijmy jak zostały dodane pliki MoieLens.
maciej@ubuntu:~/Desktop/fun2/minio$ du -h .
12K ./data1-1/bucket1/movielens/README.txt
182M ./data1-1/bucket1/movielens/ratings.csv
320K ./data1-1/bucket1/movielens/links.csv
9.5M ./data1-1/bucket1/movielens/tags.csv
99M ./data1-1/bucket1/movielens/genome-scores.csv
16K ./data1-1/bucket1/movielens/genome-tags.csv
708K ./data1-1/bucket1/movielens/movies.csv
291M ./data1-1/bucket1/movielens
291M ./data1-1/bucket1
12K ./data1-1/.minio.sys/buckets/.usage-cache.bin
12K ./data1-1/.minio.sys/buckets/bucket1/.usage-cache.bin
12K ./data1-1/.minio.sys/buckets/bucket1/.metadata.bin
28K ./data1-1/.minio.sys/buckets/bucket1
12K ./data1-1/.minio.sys/buckets/.bloomcycle.bin
12K ./data1-1/.minio.sys/buckets/.usage.json
76K ./data1-1/.minio.sys/buckets
12K ./data1-1/.minio.sys/backend-encrypted
4.0K ./data1-1/.minio.sys/multipart
12K ./data1-1/.minio.sys/config/iam/format.json
16K ./data1-1/.minio.sys/config/iam
12K ./data1-1/.minio.sys/config/config.json
32K ./data1-1/.minio.sys/config
4.0K ./data1-1/.minio.sys/tmp
136K ./data1-1/.minio.sys
291M ./data1-1
...
Widać pliki konfiguracje. Nazwy plików, które wrzuciliśmy, są katalogami. Rozmiar dodanego pliku ratings.csv wynosił 724 MB. Tutaj jedna częśc wazy 182 MB, więc licząc 2 katalogi * 4 węzły, wychodzi łącznie ~1456 MB. Czyli 2x tyle, co oryginał. Tutaj znajdziesz konfigurację ilości dysków data i parity. MinIO w tym przypadku ma Storage Usage Ratio o wartości 2. Ciekawe co się stanie, jak wyłączymy jeden node ?.
maciej@ubuntu:~/Desktop/fun2/minio$ md5sum data{1,2,3,4}-{1,2}/bucket1/movielens/ratings.csv/part.1
0f034325431b33725c9938632166f0e6 data1-1/bucket1/movielens/ratings.csv/part.1
559aefe5c66a7b8e1cbbaf34afec36f3 data1-2/bucket1/movielens/ratings.csv/part.1
235e2c8af8779589d7198e4031a37e3e data2-1/bucket1/movielens/ratings.csv/part.1
4573b69e29948539e7400f99117418a3 data2-2/bucket1/movielens/ratings.csv/part.1
76b4d2684bf8f5c1d5d5e728a40d85e1 data3-1/bucket1/movielens/ratings.csv/part.1
1ec1e7a955ef5b17880468ddf9d5052a data3-2/bucket1/movielens/ratings.csv/part.1
c475fafd05739d56ccaf25724ea60362 data4-1/bucket1/movielens/ratings.csv/part.1
6148048b77f83fcc1e2922bff0cef67b data4-2/bucket1/movielens/ratings.csv/part.1
Skróty się różnią. Pomimo tych samych nazw, to nie są identyczne pliki.
Spark
Sprawdźmy, czy Spark (a właściwie PySpark) w wersji 3.0 dogada się z MinIO. Pamiętaj, aby użyć docker logs <id/nazwa_kontenera>, by podejrzeć link aktywujący w kontenerze z jupyter’em.
Zależności
Wróćmy na chwilę do docker-compose.yml. Aby Spark potrafił gadać z API S3, musimy dać mu parę paczek.
...
environment:
- PYSPARK_SUBMIT_ARGS=--packages com.amazonaws:aws-java-sdk-bundle:1.11.819,org.apache.hadoop:hadoop-aws:3.2.0 pyspark-shell
...
Skąd wiedzieć jakie wersje? Jupyter, którego tutaj użyłem, korzysta ze Sparka w wersji 3.0. Spark Core 3.0.0, korzysta z org.apache.hadoop>>hadoop-client w wersji 3.2.1, co odpowiada tej samej wersji org.apache.hadoop>>hadoop-aws, ta natomiast ma zależność na com.amazonaws>>aws-java-sdk-bundle w wersji 1.11.819. Jeśli mój tok rozumowania jest zły, daj znać ? Tak czy inaczej, inne wersje nie były do końca kompatybilne i w Jupyter/PySparku widziałem typowy javowy Stack Trace.
Konfiguracja Spark Context
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.sparkContext._jsc\
.hadoopConfiguration().set("fs.s3a.access.key", "minio")
spark.sparkContext._jsc\
.hadoopConfiguration().set("fs.s3a.secret.key", "minio123")
spark.sparkContext._jsc\
.hadoopConfiguration().set("fs.s3a.endpoint", "http://minio1:9000")
spark.sparkContext._jsc\
.hadoopConfiguration().set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext._jsc\
.hadoopConfiguration().set("spark.hadoop.fs.s3a.path.style.access", "true")
spark.sparkContext._jsc\
.hadoopConfiguration().set("fs.s3a.multipart.size", "104857600")
Oprócz namiarów na MinIO jest tutaj konfiguracja specyficzna dla PySparka. fs.s3a.multipart.size domyślnie wynosi 100M, czego python nie potrafi zinterpretować.
Odczyt z MinIO
Dalej jest już standardowy kod. Jedyna różnica to URI.
ratings = spark.read\
.option("header", "true")\
.option("inferSchema", "true")\
.csv("s3a://bucket1/movielens/ratings.csv")
ratings.registerTempTable("ratings")
movies = spark.read\
.option("header", "true")\
.option("inferSchema", "true")\
.csv("s3a://bucket1/movielens/movies.csv")
movies.registerTempTable("movies")
Zapis do MINIO
Odczytaliśmy w CSV, więc zapiszmy w Parquet.
top_100_movies = spark.sql("""
SELECT title, AVG(rating) as avg_rating
FROM movies m
LEFT JOIN ratings r ON m.movieId = r.movieID
GROUP BY title
HAVING COUNT(*) > 100
ORDER BY avg_rating DESC
LIMIT 100
""")
top_100_movies.write.parquet("s3a://bucket1/movielens/results/top_100_movies")
A jakby wyłączyć kontener…
Wyłączałem po kolei trzy kontenery. Zostawiłem ten pierwszy, by nie zmieniać configu.
Wyłączenie trzeciego uniemożliwiło funkcjonowanie.
I to by się zgadzało z tym co piszą w architekturze referencyjnej. Minio zabezpiecza integralność obiektów korzystając z erasure code i sum kontrolnych zabezpieczających przed degradacją danych. W teorii MinIO powinno działać po utraceniu 50% dysków i 50% klastra i tak też wyszło w tym przypadku. Chyba że oszczędzimy na dyskach 🙂
Repozytorium
https://github.com/zorteran/wiadro-danych-minio-test/tree/master
Co dalej?
Planuję porównać szybkość działań rozwiązania opartego o HDFS i MinIO. MinIO co prawda zrobiło już takie porównanie. Wykorzystali tam 12 węzłów (storage + compute) w przypadku HDFS, gdy dla MinIO było to 12 (storage) + 12 (compute). Rozumiem rozdzielenie storage od compute, ale porównanie 12 vs 24 trochę mnie nie przekonuje ?.
cześć a dostawałeś może “com.amazonaws.AmazonClientException: Unable to execute HTTP request: Timeout waiting for connection from pool” dl większych danych ?
Hej, nie miałem takiego problemu. Probowałeś zwiększyć fs.s3.maxConnections / fs.s3a.connection.maximum ( https://aws.amazon.com/premiumsupport/knowledge-center/emr-timeout-connection-wait/ / https://docs.cloudera.com/documentation/enterprise/5-8-x/topics/impala_s3.html#s3_best_practices)?