Jest to drugie podejście Wizualizacja autobusów w Elasticsearch i Kibana. Tym razem wykorzystam napisany wcześniej program w Kafka Streams do obliczenia prędkości i orientacji autobusów, a następnie wrzucę Logstash-em z Apache Kafka do Elasticsearch.
Analizę zebranych w ten sposób danych znajdziesz w kolejnym wpisie. Jak na darmową licencję, możliwości mogą zdziwić.
Środowisko
Docker-compose już trochę puchnie. Znajduje się w nim Elasticsearch, Kibana, Zookeeper, Kafka, Logstash, no i moja aplikacja Kafka Streams którą wrzuciłem na Docker Hub. Użytkownicy Kafki z niecierpliwością czekają na pozbycie się Zookeeper-a :-).
version: '3.3'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.7.0
restart: unless-stopped
environment:
- discovery.type=single-node
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- esdata:/usr/share/elasticsearch/data
ports:
- 9200:9200
kibana:
image: docker.elastic.co/kibana/kibana:7.6.2
restart: unless-stopped
depends_on:
- elasticsearch
ports:
- 5601:5601
volumes:
- kibanadata:/usr/share/kibana/data
zookeeper:
image: 'bitnami/zookeeper:3'
ports:
- '2181:2181'
volumes:
- 'zookeeper_data:/bitnami'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:2'
ports:
- '9092:9092'
- '29092:29092'
volumes:
- 'kafka_data:/bitnami'
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
depends_on:
- zookeeper
ztm_kafka_streams:
image: "maciejszymczyk/ztm_stream:1.0"
environment:
- APPLICATION_ID_CONFIG=awesome_overrided_ztm_stream_app_id
- BOOTSTRAP_SERVERS_CONFIG=kafka:9092
depends_on:
- kafka
logstash:
image: docker.elastic.co/logstash/logstash:7.7.0
volumes:
- "./pipeline:/usr/share/logstash/pipeline"
environment:
LS_JAVA_OPTS: "-Xmx256m -Xms256m"
depends_on:
- elasticsearch
- kafka
volumes:
esdata:
driver: local
kibanadata:
driver: local
zookeeper_data:
driver: local
kafka_data:
driver: local
Data Flow
Wyjaśnijmy sobie, jak wygląda przepływ danych w tym scenariuszu:
- Skrypt w Pythonie ściąga dane z API i wrzuca na Apache Kafka
- Aplikacja Kafka Streams przetwarza rekordy, dodaje prędkość, obrót i dystans, a następnie zapisuje w kolenym topic-u
- Logstash ściąga rekordy z topic-a, wrzuca pola (lat, lon) do obiektu location, aby Elasticsearch mógł za indeksować to jako geo_point (zapomniałem o tym w Kafka Streams). Wrzuca rekordy do Elasticsearch-a
- Elasticsearch indeksuje rekordy używając ILM-a
- Oglądamy co się dzieje na mapie w Kibanie.
Wrzucanie rekordów do Apache Kafka
Muszę się Wam przyznać, że znów napisałem prostacki skrypt z nieskończoną pętlą w Pythonie, zamiast zrobić to po ludzku np. w Apache Airflow. Takie uroki proof-of-concept i side-project :-). Na Apache Airflow przyjdą osobne posty. Token potrzebny w skrypcie można dostać tutaj: https://api.um.warszawa.pl/#
import requests
import json
import time
from kafka import KafkaProducer
token = 'use your own API Token'
url = 'https://api.um.warszawa.pl/api/action/busestrams_get/'
resource_id = 'f2e5503e927d-4ad3-9500-4ab9e55deb59'
sleep_time = 15
bus_params = {
'apikey':token,
'type':1,
'resource_id': resource_id
}
tram_params = {
'apikey':token,
'type':2,
'resource_id': resource_id
}
while True:
try:
r = requests.get(url = url, params = bus_params)
data = r.json()
producer = KafkaProducer(bootstrap_servers=['localhost:29092'],
value_serializer=lambda x: json.dumps(x).encode('utf-8'),
key_serializer=lambda x: x
)
print('Sending records...')
for record in data['result']:
#print(record)
future = producer.send('ztm-input', value=record, key=record["VehicleNumber"].encode('utf-8'))
result = future.get(timeout=60)
except:
print("¯\_(ツ)_/¯")
time.sleep(sleep_time)
Logstash
To, że aplikacja Kafka Streams wiadomo z poprzednich wpisów. Więc zajmijmy się Logstashem.
Zapomniałem, że pole geo_point wymaga specyficznego schematu JSON-a. Logstash oprócz przerzucenia danych z Kafki na Elasticsearch naprawia ten błąd. Docelowo myślę, że lepiej poprawić Kafka Streams. Wtedy można zastąpić Logstasha inną aplikacją np. Kafka Connect.
input {
kafka {
topics => "ztm-output"
bootstrap_servers => "kafka:9092"
codec => "json"
}
}
filter {
mutate {
add_field => {
"[location][lat]" => "%{lat}"
"[location][lon]" => "%{lon}"
}
remove_field => ["lat","lon"]
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "ztm"
}
}
Elasticsearch
Zamiast wrzucać rekordy do indeksów w stylu ztm-2020.05.24, ztm-2020.05.25… użyłem mechanizmu Index Lifecycle Management. Pozwala on zautomatyzować życie indeksu. Automatycznie robi rollupy, zmienia właściwości indeksów (architektura hot-warm-cold) w zależności od tego jak skonfigurujemy politykę.
Załóżmy, że chcę by robiony był rollover gdy indeks osiągnie 1 gb lub minie 30 dni.
PUT _ilm/policy/ztm_policy
{
"policy": {
"phases": {
"hot":{
"actions": {
"rollover": {
"max_size": "1gb",
"max_age": "30d"
}
}
}
}
}
}
Potrzebny mi jest też template pod który podepnę ztm_policy. Definiuje tu też mapping. Bez mappingu Elasticsearch nie domysliłby się, że location to geo_point, a pole time byłoby zwykłym tekstem.
PUT _template/ztm_template
{
"index_patterns": ["ztm-*"],
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"index.lifecycle.name":"ztm_policy",
"index.lifecycle.rollover_alias": "ztm"
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"@version": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"bearing": {
"type": "float"
},
"brigade": {
"type": "keyword"
},
"distance": {
"type": "float"
},
"lines": {
"type": "keyword"
},
"location": {
"type": "geo_point"
},
"speed": {
"type": "float"
},
"time": {
"type": "date",
"format":"MMM d, yyyy h:mm:ss a"
},
"vehicleNumber": {
"type": "keyword"
}
}
}
}
Teraz można utworzyć pierwszy index z odpowiednim aliasem.
PUT ztm-000001
{
"aliases": {
"ztm": {
"is_write_index":true
}
}
}
Kibana
Index Pattern używa pola @timestamp jako punktu odniesienia rekordu w czasie. Czas pobierany z API jest przesunięty i trochę zakłamywał.

Mapa
Dodajemy mapę wykorzystującą index pattern ztm oraz pole location i dodajemy nową warstwę.


To, co zmieniło się od poprzedniej wersji to:
– Symbol type = arrow-es
– Fill color = zależny od prędkości pojazdu
– Symbol orientation = zalezny od obrotu pojazdu
– Label = numer linii
Działanie

API nie pozwala na częstsze próbkowanie, ale mimo wszystko efekt jest ciekawy.
Dane z kafki do elasticsearch wrzucam przy pomocy kafka-connect.
Ty wykorzystujesz logstash’a, którego zupełnie znam.
Zastanawiam się co jest lepsze i dlaczego lub kiedy to, a kiedy tamto.
Kafka Connect po prostu przerzuca dane z kafki/do kafki bez przetwarzania ich po drodze. Widać to po możliwościach konfiguracji connectora Elasticsearch: https://docs.confluent.io/kafka-connect-elasticsearch/current/configuration_options.html#elasticsearch-overview-config . Fajne jest to, że gdy mamy taki klaster Kafka Connect, możemy connector włączyć i wyłączyć jeśli jest taka potrzeba za pomocą REST API.
Jeśli chodzi o Logstasha, jest to rozwiązanie od Elastic’a, ale dość uniwersalne, bo możemy skonfigurować wiele wejść i wyjść w ramach jednego pipeline’a. Po drodze możemy te dane w jakiś sposób przetworzyć. Jeśli chodzi o zmianę działania, w tym przypadku nie ma REST API, trzeba zmienić plik konfiguracyjny w każdej instancji Logstasha (chyba że mamy płatną licencję od Elastica, w tym centralne zarządzanie Logstashami).
Jeśli chcesz po prostu przerzucić dane z lewa na prawo, Logstash i Kafka Connect będzie tak samo dobry. Warto wziąć pod uwagę fakt, że nie wszystkie connectory w Kafka Connect są darmowe. Np. connector Cassandra od Confluenta jest w ramach wersji enterprise, ale Datastax już udostępnia swoją wersję za darmo. Można też zarzucić Logstashowi, że nie potrafi rozmawiać ze Splunkiem, a do Kafka Connect możemy dostać connector od Splunka :-). Jeśli po drodze chcesz dane zmienić/zbogacić/okroić, wydaje mi się, że Logstash będzie lepszym rozwiązaniem
Dzięki za info,
na chwilę obecną dane, które dostaję na kafkę, przerzucam 1 do 1, ale już teraz wiem, że muszę wzbogacić je danymi z relacyjnej bazy i dopiero do Elastic’a.
Więc chyba bez aplikacji np. w java się nie obędzie. Logstash by sobie z tym poradził ?
Na razie wszystko jest w fazie dość koncepcyjnej więc mam jakąś dowolność i szukam za i przeciw.
Potem już nie będzie takiej elastyczności.
Możesz spróbować użyć jednego z filtrów JDBC:
https://www.elastic.co/guide/en/logstash/current/plugins-filters-jdbc_streaming.html
https://www.elastic.co/guide/en/logstash/current/plugins-filters-jdbc_static.html