Kafka Connect to część platformy Apache Kafka. Służy do łączenia Kafki z zewnętrznymi serwisami takimi jak systemy plików lub bazy danych. W artykule dowiesz się jaki problem rozwiązuje i jak ją uruchomić.
Dlaczego Kafka Connect?
Apache Kafka wykorzystywana jest w architekturze mikroserwisów, agregacji logów, Change data capture (CDC), integracji, platforma do przetwarzania strumieniowego i warstwa akwizycji danych do Data Lake‘a. Niezależnie do czego wykorzystujesz Kafkę, dane płyną ze źródła (source) i trafiają do ujścia (sink).
Implementacja konsumenta lub producenta Apache Kafka zgodnie ze sztuką wymaga czasu i wiedzy. Rzecz w tym, że źródła i ujścia często powtarzają się. Wiele firm zapisuje dane z Kafki do HDFS/S3 i Elasticsearch. A gdyby tak napisać takie połączenie raz a porządnie?
Nie ma sensu wymyślać koła na nowo. Kafka Connect rozwiązuje ten problem. Jest to platforma do łączenia Kafki z zewnętrznymi komponentami. Za pomocą systemu konektorów, możemy elastycznie konfigurować wejścia/wyjścia do /z kolejki. Może działać samodzielnie i w trybie rozproszonym, dzięki czemu uzyskujemy rozproszoną, skalowalną, odporną na awarię platformę o niskiej barierze wejścia.
Alternatywy
Alternatywy, które przychodzą mi do głowy to:
Konektory
Konektory znajdziesz na stronie Confluent Hub. Dostępne są darmowe jak i płatne, dostępne w ramach platformy Confluent. Przykładowo: Konektor do Cassandry jest dostępny w wersji płatnej (od Confluent), ale jest też wersja darmowa od DataStax. Jeśli nie znalazłeś gotowego rozwiązania, można napisać swój konektor.
Środowisko
Standardowo, wykorzystany jest tu Docker. W tym przypadku Kafka, Zookeeper i Minio są w postaci kontenerów. Kafka Connect będzie włączane na maszynie hosta. Jest częścią Apache Kafka, więc wystarczy ściągnąć i rozpakować binarkę. Obsługę Kafki możemy ułatwić sobie za pomocą narzędzia Conduktor.
version: '2' services: zookeeper: image: 'bitnami/zookeeper:3' ports: - '2181:2181' volumes: - 'zookeeper_data:/bitnami' environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: image: 'bitnami/kafka:2' ports: - '9092:9092' - '29092:29092' volumes: - 'kafka_data:/bitnami' environment: - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 depends_on: - zookeeper minio: image: 'bitnami/minio:latest' ports: - '9000:9000' environment: - MINIO_ACCESS_KEY=minio-access-key - MINIO_SECRET_KEY=minio-secret-key volumes: zookeeper_data: driver: local kafka_data: driver: local
Tryb Standalone – Zapis do pliku
Apache Connect możemy uruchomić w dwóch trybach: Standalone i Distributed. Ten pierwszy służy do testowania i deweloperki. Aby go uruchomić potrzebne są pliki konfiguracyjne.
bootstrap.servers=localhost:29092 key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter offset.storage.file.filename=/tmp/connect.offsets
Key i value converter określają format w jakim Kafka ma odczytać/zapisać dane.
name=standalone-file-sink connector.class=FileStreamSink tasks.max=1 file=output.txt topics=standalone-test
Pola wydają się oczywiste. Poniżej GIF z wykonania.
kafka_2.13-2.6.0/bin/connect-standalone.sh connect-standalone.properties connect-file-sink.properties
Tryb Distributed
To właśnie ten tryb zapewnia skalowalność i odporność na awarie. Wystarczy odpalić wiele workerów z tym samym group.id, a orchiestracja zadaniami zapewniona jest przez samą platformę. W przykładzie z generowaniem danych zobaczysz jak zachowa się Kafka Connect gdy ubijemy jednego z workerów. Poniżej plik konfiguracyjny jednego z workerów.
bootstrap.servers=localhost:29092 group.id=connect-cluster key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false offset.storage.topic=connect-offsets offset.storage.replication.factor=1 config.storage.topic=connect-configs config.storage.replication.factor=1 status.storage.topic=connect-status status.storage.replication.factor=1 #rest.host.name= rest.port=8083 plugin.path=/tmp/plugins/
- group.id = działa podobnie jak consumer group. Jeśli chcemy, by workery pracowały w klastrze, group.id muszą byc identyczne
- Kafka Connect trzyma dane w… Kafce. Trzeba wskazać nazwy topiców i oczekiwany replication factor.
- Do katalogu podanego plugin.path wrzucamy konektory. Wszystkie workery powinny mieć dostęp do tych samych konektorów.
Pozostaje odpalić worker Kafka Connect
kafka_2.13-2.6.0/bin/connect-distributed.sh connect-distributed-1.properties
Generowanie danych – Datagen Source
Wykorzystamy Kafka Connect Datagen. Jest to konektor do generowania danych. Może przydać się do jakiegoś PoC. Wystarczy go ściągnąć i rozpakowany katalog umieścić do katalogu podanym w plugin.path.
Zadania w Kafka Connect uruchamiamy za pomocą REST API. Najpierw trzeba przygotować konfigurację konektora.
{ "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", "kafka.topic": "distributed-test", "quickstart": "ratings", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "max.interval": 100, "tasks.max": "1" }
Aby potem wysłać go pod scieżkę …/connectors/nazwa/config.
curl -X PUT -H 'Content-Type:application/json' http://localhost:8083/connectors/source-datagen/config -d @connect-datagen-source.json
Ale możemy skorzystać też z innych narzędzi, np. z wspomnianego wcześniej Conduktora.
Poniżej możesz zobaczyć GIF w którym odpalam konektor Datagen, a następnie stawiam i ubijam workery Kafka Connect.
Zapis danych – AWS S3 Sink
Dane są generowane, przejdźmy do kwestii zapisu. Pierwotnie chciałem tu wykorzystać Elasticsearch Sink, ale nie mogłem obejść problemu z brakiem guavy w bibliotekach Kafki, a chciałem wypuścić ten artykuł w miarę szybko. Konektor AWS S3 znajdziesz tutaj. Kroki takie same jak w przypadku Datagen.
{ "name":"sink-minio", "connector.class":"io.confluent.connect.s3.S3SinkConnector", "tasks.max":"1", "topics":"distributed-test", "s3.bucket.name":"test", "s3.part.size":"5242880", "s3.compression.type":"gzip", "flush.size":"100", "aws.secret.access.key": "minio-secret-key", "aws.access.key.id": "minio-access-key", "store.url":"http://localhost:9000", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "format.class": "io.confluent.connect.s3.format.json.JsonFormat", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "schema.compatibility": "NONE", "schemas.enable":false }
- store.url wskazuje na lokalny kontener z MinIO
- flush.size – tyle rekordów będzie zawierać jedna paczka
- s3.compression.type – każda paczka będzie kompresowana (lub nie)
- value.converter – rekordy mogą być w róznych formatach. Tutaj akurat korzystamy z JSON.
curl -X PUT -H 'Content-Type:application/json' http://localhost:8083/connectors/sink-minio/config -d @connect-minio-sink.json
Zadanie z MinIO zostało dodane. Możemy sprawdzić czy faktycznie dane trafiają do kubełka.
Podsumowanie
Jeśli dane mają iść prosto do/z Kafki, pokusiłbym się o Kafka Connect. W przypadku bardziej zaawansowanych operacji raczej widzę rozwiązanie w klimatach przetwarzania strumieniowego takie jak Logstash lub Spark Structured Streaming.
Repozytorium
https://github.com/zorteran/wiadro-danych-kafka-connect-nutshell
2 komentarze do “Kafka Connect w pigułce”