Wizualizacja autobusów w Elasticsearch i Kibana – podejście strumieniowe – Kafka Streams, Logstash

Jest to drugie podejście Wizualizacja autobusów w Elasticsearch i Kibana. Tym razem wykorzystam napisany wcześniej program w Kafka Streams do obliczenia prędkości i orientacji autobusów, a następnie wrzucę Logstash-em z Apache Kafka do Elasticsearch.

Analizę zebranych w ten sposób danych znajdziesz w kolejnym wpisie. Jak na darmową licencję, możliwości mogą zdziwić.

Środowisko

Docker-compose już trochę puchnie. Znajduje się w nim Elasticsearch, Kibana, Zookeeper, Kafka, Logstash, no i moja aplikacja Kafka Streams którą wrzuciłem na Docker Hub. Użytkownicy Kafki z niecierpliwością czekają na pozbycie się Zookeeper-a :-).

version: '3.3'
services:
    elasticsearch:
        image: docker.elastic.co/elasticsearch/elasticsearch:7.7.0
        restart: unless-stopped
        environment:
        - discovery.type=single-node
        - bootstrap.memory_lock=true
        - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
        ulimits:
            memlock:
                soft: -1
                hard: -1
        volumes:
        - esdata:/usr/share/elasticsearch/data
        ports:
        - 9200:9200

    kibana:
        image: docker.elastic.co/kibana/kibana:7.6.2
        restart: unless-stopped
        depends_on:
            - elasticsearch
        ports:
            - 5601:5601
        volumes:
            - kibanadata:/usr/share/kibana/data

    zookeeper:
        image: 'bitnami/zookeeper:3'
        ports:
            - '2181:2181'
        volumes:
            - 'zookeeper_data:/bitnami'
        environment:
            - ALLOW_ANONYMOUS_LOGIN=yes
            
    kafka:
        image: 'bitnami/kafka:2'
        ports:
            - '9092:9092'
            - '29092:29092'
        volumes:
            - 'kafka_data:/bitnami'
        environment:
            - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
            - ALLOW_PLAINTEXT_LISTENER=yes
            - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
            - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
            - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
        depends_on:
            - zookeeper

    ztm_kafka_streams:
        image: "maciejszymczyk/ztm_stream:1.0"
        environment:
          - APPLICATION_ID_CONFIG=awesome_overrided_ztm_stream_app_id
          - BOOTSTRAP_SERVERS_CONFIG=kafka:9092
        depends_on:
          - kafka

    logstash:
        image: docker.elastic.co/logstash/logstash:7.7.0
        volumes:
            - "./pipeline:/usr/share/logstash/pipeline"
        environment:
            LS_JAVA_OPTS: "-Xmx256m -Xms256m"
        depends_on:
            - elasticsearch
            - kafka
  
volumes:
    esdata:
        driver: local
    kibanadata:
        driver: local
    zookeeper_data:
        driver: local
    kafka_data:
        driver: local

Data Flow

Wyjaśnijmy sobie, jak wygląda przepływ danych w tym scenariuszu:

  1. Skrypt w Pythonie ściąga dane z API i wrzuca na Apache Kafka
  2. Aplikacja Kafka Streams przetwarza rekordy, dodaje prędkość, obrót i dystans, a następnie zapisuje w kolenym topic-u
  3. Logstash ściąga rekordy z topic-a, wrzuca pola (lat, lon) do obiektu location, aby Elasticsearch mógł za indeksować to jako geo_point (zapomniałem o tym w Kafka Streams). Wrzuca rekordy do Elasticsearch-a
  4. Elasticsearch indeksuje rekordy używając ILM-a
  5. Oglądamy co się dzieje na mapie w Kibanie.

Wrzucanie rekordów do Apache Kafka

Muszę się Wam przyznać, że znów napisałem prostacki skrypt z nieskończoną pętlą w Pythonie, zamiast zrobić to po ludzku np. w Apache Airflow. Takie uroki proof-of-concept i side-project :-). Na Apache Airflow przyjdą osobne posty. Token potrzebny w skrypcie można dostać tutaj: https://api.um.warszawa.pl/#

import requests 
import json
import time
from kafka import KafkaProducer

token = 'use your own API Token'
url = 'https://api.um.warszawa.pl/api/action/busestrams_get/'
resource_id = 'f2e5503e927d-4ad3-9500-4ab9e55deb59'
sleep_time = 15

bus_params = {
    'apikey':token,
    'type':1,
    'resource_id': resource_id
    }
tram_params = {
    'apikey':token,
    'type':2,
    'resource_id': resource_id
    }

while True:
    try:
        r = requests.get(url = url, params = bus_params)
        data = r.json() 
        producer = KafkaProducer(bootstrap_servers=['localhost:29092'],
                                value_serializer=lambda x: json.dumps(x).encode('utf-8'),
                                key_serializer=lambda x: x
                                )

        print('Sending records...')
        for record in data['result']:
            #print(record)
            future = producer.send('ztm-input', value=record, key=record["VehicleNumber"].encode('utf-8'))
            result = future.get(timeout=60)
    except:
        print("¯\_(ツ)_/¯")
    time.sleep(sleep_time)

Logstash

To, że aplikacja Kafka Streams wiadomo z poprzednich wpisów. Więc zajmijmy się Logstashem.

Zapomniałem, że pole geo_point wymaga specyficznego schematu JSON-a. Logstash oprócz przerzucenia danych z Kafki na Elasticsearch naprawia ten błąd. Docelowo myślę, że lepiej poprawić Kafka Streams. Wtedy można zastąpić Logstasha inną aplikacją np. Kafka Connect.

input {
	kafka {
		topics => "ztm-output"
		bootstrap_servers => "kafka:9092"
        	codec => "json"
	}
}
filter {
    mutate {
        add_field => {
            "[location][lat]" => "%{lat}"
            "[location][lon]" => "%{lon}"
            }
        remove_field => ["lat","lon"]
    }
}
output {

	elasticsearch {
    		hosts => ["elasticsearch:9200"]
            index => "ztm"
  	}
}

Elasticsearch

Zamiast wrzucać rekordy do indeksów w stylu ztm-2020.05.24, ztm-2020.05.25… użyłem mechanizmu Index Lifecycle Management. Pozwala on zautomatyzować życie indeksu. Automatycznie robi rollupy, zmienia właściwości indeksów (architektura hot-warm-cold) w zależności od tego jak skonfigurujemy politykę.

Załóżmy, że chcę by robiony był rollover gdy indeks osiągnie 1 gb lub minie 30 dni.

PUT _ilm/policy/ztm_policy
{
  "policy": {
    "phases": {
      "hot":{
        "actions": {
          "rollover": {
            "max_size": "1gb",
            "max_age": "30d"
          }
        }
      }
    }
  }
}

Potrzebny mi jest też template pod który podepnę ztm_policy. Definiuje tu też mapping. Bez mappingu Elasticsearch nie domysliłby się, że location to geo_point, a pole time byłoby zwykłym tekstem.

PUT _template/ztm_template
{
  "index_patterns": ["ztm-*"],
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "index.lifecycle.name":"ztm_policy",
    "index.lifecycle.rollover_alias": "ztm"
  },
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "@version": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "bearing": {
        "type": "float"
      },
      "brigade": {
        "type": "keyword"
      },
      "distance": {
        "type": "float"
      },
      "lines": {
        "type": "keyword"
      },
      "location": {
        "type": "geo_point"
      },
      "speed": {
        "type": "float"
      },
      "time": {
        "type": "date",
        "format":"MMM d, yyyy h:mm:ss a"
      },
      "vehicleNumber": {
        "type": "keyword"
      }
    }
  }
}

Teraz można utworzyć pierwszy index z odpowiednim aliasem.

PUT ztm-000001
{
  "aliases": {
    "ztm": {
      "is_write_index":true
    }
  }
}

Kibana

Index Pattern używa pola @timestamp jako punktu odniesienia rekordu w czasie. Czas pobierany z API jest przesunięty i trochę zakłamywał.

Mapa

Dodajemy mapę wykorzystującą index pattern ztm oraz pole location i dodajemy nową warstwę.

Chcemy widzieć tylko najnowszy rekord per pojazd

To, co zmieniło się od poprzedniej wersji to:
– Symbol type = arrow-es
– Fill color = zależny od prędkości pojazdu
– Symbol orientation = zalezny od obrotu pojazdu
– Label = numer linii

Działanie

API nie pozwala na częstsze próbkowanie, ale mimo wszystko efekt jest ciekawy.

Repozytorium

https://github.com/zorteran/wiadro-danych-kafka-to-es-ztm

5 komentarzy do “Wizualizacja autobusów w Elasticsearch i Kibana – podejście strumieniowe – Kafka Streams, Logstash”

  1. Dane z kafki do elasticsearch wrzucam przy pomocy kafka-connect.
    Ty wykorzystujesz logstash’a, którego zupełnie znam.
    Zastanawiam się co jest lepsze i dlaczego lub kiedy to, a kiedy tamto.

    1. Kafka Connect po prostu przerzuca dane z kafki/do kafki bez przetwarzania ich po drodze. Widać to po możliwościach konfiguracji connectora Elasticsearch: https://docs.confluent.io/kafka-connect-elasticsearch/current/configuration_options.html#elasticsearch-overview-config . Fajne jest to, że gdy mamy taki klaster Kafka Connect, możemy connector włączyć i wyłączyć jeśli jest taka potrzeba za pomocą REST API.

      Jeśli chodzi o Logstasha, jest to rozwiązanie od Elastic’a, ale dość uniwersalne, bo możemy skonfigurować wiele wejść i wyjść w ramach jednego pipeline’a. Po drodze możemy te dane w jakiś sposób przetworzyć. Jeśli chodzi o zmianę działania, w tym przypadku nie ma REST API, trzeba zmienić plik konfiguracyjny w każdej instancji Logstasha (chyba że mamy płatną licencję od Elastica, w tym centralne zarządzanie Logstashami).

      Jeśli chcesz po prostu przerzucić dane z lewa na prawo, Logstash i Kafka Connect będzie tak samo dobry. Warto wziąć pod uwagę fakt, że nie wszystkie connectory w Kafka Connect są darmowe. Np. connector Cassandra od Confluenta jest w ramach wersji enterprise, ale Datastax już udostępnia swoją wersję za darmo. Można też zarzucić Logstashowi, że nie potrafi rozmawiać ze Splunkiem, a do Kafka Connect możemy dostać connector od Splunka :-). Jeśli po drodze chcesz dane zmienić/zbogacić/okroić, wydaje mi się, że Logstash będzie lepszym rozwiązaniem

      1. Dzięki za info,
        na chwilę obecną dane, które dostaję na kafkę, przerzucam 1 do 1, ale już teraz wiem, że muszę wzbogacić je danymi z relacyjnej bazy i dopiero do Elastic’a.
        Więc chyba bez aplikacji np. w java się nie obędzie. Logstash by sobie z tym poradził ?
        Na razie wszystko jest w fazie dość koncepcyjnej więc mam jakąś dowolność i szukam za i przeciw.
        Potem już nie będzie takiej elastyczności.

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *