Jest to drugie podejście Wizualizacja autobusów w Elasticsearch i Kibana. Tym razem wykorzystam napisany wcześniej program w Kafka Streams do obliczenia prędkości i orientacji autobusów, a następnie wrzucę Logstash-em z Apache Kafka do Elasticsearch.
Analizę zebranych w ten sposób danych znajdziesz w kolejnym wpisie. Jak na darmową licencję, możliwości mogą zdziwić.
Środowisko
Docker-compose już trochę puchnie. Znajduje się w nim Elasticsearch, Kibana, Zookeeper, Kafka, Logstash, no i moja aplikacja Kafka Streams którą wrzuciłem na Docker Hub. Użytkownicy Kafki z niecierpliwością czekają na pozbycie się Zookeeper-a :-).
version: '3.3' services: elasticsearch: image: docker.elastic.co/elasticsearch/elasticsearch:7.7.0 restart: unless-stopped environment: - discovery.type=single-node - bootstrap.memory_lock=true - "ES_JAVA_OPTS=-Xms512m -Xmx512m" ulimits: memlock: soft: -1 hard: -1 volumes: - esdata:/usr/share/elasticsearch/data ports: - 9200:9200 kibana: image: docker.elastic.co/kibana/kibana:7.6.2 restart: unless-stopped depends_on: - elasticsearch ports: - 5601:5601 volumes: - kibanadata:/usr/share/kibana/data zookeeper: image: 'bitnami/zookeeper:3' ports: - '2181:2181' volumes: - 'zookeeper_data:/bitnami' environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: image: 'bitnami/kafka:2' ports: - '9092:9092' - '29092:29092' volumes: - 'kafka_data:/bitnami' environment: - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 depends_on: - zookeeper ztm_kafka_streams: image: "maciejszymczyk/ztm_stream:1.0" environment: - APPLICATION_ID_CONFIG=awesome_overrided_ztm_stream_app_id - BOOTSTRAP_SERVERS_CONFIG=kafka:9092 depends_on: - kafka logstash: image: docker.elastic.co/logstash/logstash:7.7.0 volumes: - "./pipeline:/usr/share/logstash/pipeline" environment: LS_JAVA_OPTS: "-Xmx256m -Xms256m" depends_on: - elasticsearch - kafka volumes: esdata: driver: local kibanadata: driver: local zookeeper_data: driver: local kafka_data: driver: local
Data Flow
Wyjaśnijmy sobie, jak wygląda przepływ danych w tym scenariuszu:
- Skrypt w Pythonie ściąga dane z API i wrzuca na Apache Kafka
- Aplikacja Kafka Streams przetwarza rekordy, dodaje prędkość, obrót i dystans, a następnie zapisuje w kolenym topic-u
- Logstash ściąga rekordy z topic-a, wrzuca pola (lat, lon) do obiektu location, aby Elasticsearch mógł za indeksować to jako geo_point (zapomniałem o tym w Kafka Streams). Wrzuca rekordy do Elasticsearch-a
- Elasticsearch indeksuje rekordy używając ILM-a
- Oglądamy co się dzieje na mapie w Kibanie.
Wrzucanie rekordów do Apache Kafka
Muszę się Wam przyznać, że znów napisałem prostacki skrypt z nieskończoną pętlą w Pythonie, zamiast zrobić to po ludzku np. w Apache Airflow. Takie uroki proof-of-concept i side-project :-). Na Apache Airflow przyjdą osobne posty. Token potrzebny w skrypcie można dostać tutaj: https://api.um.warszawa.pl/#
import requests import json import time from kafka import KafkaProducer token = 'use your own API Token' url = 'https://api.um.warszawa.pl/api/action/busestrams_get/' resource_id = 'f2e5503e927d-4ad3-9500-4ab9e55deb59' sleep_time = 15 bus_params = { 'apikey':token, 'type':1, 'resource_id': resource_id } tram_params = { 'apikey':token, 'type':2, 'resource_id': resource_id } while True: try: r = requests.get(url = url, params = bus_params) data = r.json() producer = KafkaProducer(bootstrap_servers=['localhost:29092'], value_serializer=lambda x: json.dumps(x).encode('utf-8'), key_serializer=lambda x: x ) print('Sending records...') for record in data['result']: #print(record) future = producer.send('ztm-input', value=record, key=record["VehicleNumber"].encode('utf-8')) result = future.get(timeout=60) except: print("¯\_(ツ)_/¯") time.sleep(sleep_time)
Logstash
To, że aplikacja Kafka Streams wiadomo z poprzednich wpisów. Więc zajmijmy się Logstashem.
Zapomniałem, że pole geo_point wymaga specyficznego schematu JSON-a. Logstash oprócz przerzucenia danych z Kafki na Elasticsearch naprawia ten błąd. Docelowo myślę, że lepiej poprawić Kafka Streams. Wtedy można zastąpić Logstasha inną aplikacją np. Kafka Connect.
input { kafka { topics => "ztm-output" bootstrap_servers => "kafka:9092" codec => "json" } } filter { mutate { add_field => { "[location][lat]" => "%{lat}" "[location][lon]" => "%{lon}" } remove_field => ["lat","lon"] } } output { elasticsearch { hosts => ["elasticsearch:9200"] index => "ztm" } }
Elasticsearch
Zamiast wrzucać rekordy do indeksów w stylu ztm-2020.05.24, ztm-2020.05.25… użyłem mechanizmu Index Lifecycle Management. Pozwala on zautomatyzować życie indeksu. Automatycznie robi rollupy, zmienia właściwości indeksów (architektura hot-warm-cold) w zależności od tego jak skonfigurujemy politykę.
Załóżmy, że chcę by robiony był rollover gdy indeks osiągnie 1 gb lub minie 30 dni.
PUT _ilm/policy/ztm_policy { "policy": { "phases": { "hot":{ "actions": { "rollover": { "max_size": "1gb", "max_age": "30d" } } } } } }
Potrzebny mi jest też template pod który podepnę ztm_policy. Definiuje tu też mapping. Bez mappingu Elasticsearch nie domysliłby się, że location to geo_point, a pole time byłoby zwykłym tekstem.
PUT _template/ztm_template { "index_patterns": ["ztm-*"], "settings": { "number_of_shards": 3, "number_of_replicas": 1, "index.lifecycle.name":"ztm_policy", "index.lifecycle.rollover_alias": "ztm" }, "mappings": { "properties": { "@timestamp": { "type": "date" }, "@version": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "bearing": { "type": "float" }, "brigade": { "type": "keyword" }, "distance": { "type": "float" }, "lines": { "type": "keyword" }, "location": { "type": "geo_point" }, "speed": { "type": "float" }, "time": { "type": "date", "format":"MMM d, yyyy h:mm:ss a" }, "vehicleNumber": { "type": "keyword" } } } }
Teraz można utworzyć pierwszy index z odpowiednim aliasem.
PUT ztm-000001 { "aliases": { "ztm": { "is_write_index":true } } }
Kibana
Index Pattern używa pola @timestamp jako punktu odniesienia rekordu w czasie. Czas pobierany z API jest przesunięty i trochę zakłamywał.

Mapa
Dodajemy mapę wykorzystującą index pattern ztm oraz pole location i dodajemy nową warstwę.


To, co zmieniło się od poprzedniej wersji to:
– Symbol type = arrow-es
– Fill color = zależny od prędkości pojazdu
– Symbol orientation = zalezny od obrotu pojazdu
– Label = numer linii
Działanie

API nie pozwala na częstsze próbkowanie, ale mimo wszystko efekt jest ciekawy.
Dane z kafki do elasticsearch wrzucam przy pomocy kafka-connect.
Ty wykorzystujesz logstash’a, którego zupełnie znam.
Zastanawiam się co jest lepsze i dlaczego lub kiedy to, a kiedy tamto.
Kafka Connect po prostu przerzuca dane z kafki/do kafki bez przetwarzania ich po drodze. Widać to po możliwościach konfiguracji connectora Elasticsearch: https://docs.confluent.io/kafka-connect-elasticsearch/current/configuration_options.html#elasticsearch-overview-config . Fajne jest to, że gdy mamy taki klaster Kafka Connect, możemy connector włączyć i wyłączyć jeśli jest taka potrzeba za pomocą REST API.
Jeśli chodzi o Logstasha, jest to rozwiązanie od Elastic’a, ale dość uniwersalne, bo możemy skonfigurować wiele wejść i wyjść w ramach jednego pipeline’a. Po drodze możemy te dane w jakiś sposób przetworzyć. Jeśli chodzi o zmianę działania, w tym przypadku nie ma REST API, trzeba zmienić plik konfiguracyjny w każdej instancji Logstasha (chyba że mamy płatną licencję od Elastica, w tym centralne zarządzanie Logstashami).
Jeśli chcesz po prostu przerzucić dane z lewa na prawo, Logstash i Kafka Connect będzie tak samo dobry. Warto wziąć pod uwagę fakt, że nie wszystkie connectory w Kafka Connect są darmowe. Np. connector Cassandra od Confluenta jest w ramach wersji enterprise, ale Datastax już udostępnia swoją wersję za darmo. Można też zarzucić Logstashowi, że nie potrafi rozmawiać ze Splunkiem, a do Kafka Connect możemy dostać connector od Splunka :-). Jeśli po drodze chcesz dane zmienić/zbogacić/okroić, wydaje mi się, że Logstash będzie lepszym rozwiązaniem
Dzięki za info,
na chwilę obecną dane, które dostaję na kafkę, przerzucam 1 do 1, ale już teraz wiem, że muszę wzbogacić je danymi z relacyjnej bazy i dopiero do Elastic’a.
Więc chyba bez aplikacji np. w java się nie obędzie. Logstash by sobie z tym poradził ?
Na razie wszystko jest w fazie dość koncepcyjnej więc mam jakąś dowolność i szukam za i przeciw.
Potem już nie będzie takiej elastyczności.
Możesz spróbować użyć jednego z filtrów JDBC:
https://www.elastic.co/guide/en/logstash/current/plugins-filters-jdbc_streaming.html
https://www.elastic.co/guide/en/logstash/current/plugins-filters-jdbc_static.html