Kafka Streams 201 – Obliczanie prędkości, Processor API, KeyValueStore

Czasami klasyczne Kafka DSL nam nie wystarcza. Processor API pozwala na dowolne zdefiniowanie procesora, a co najlepsze, wykorzystanie State Store. W tym przypadku obliczymy prędkość, kierunek i dystans pojazdów komunikacji miejskiej w Warszwie.

Rozkład jazdy

Źródło

W tym wpisie będę wykorzystywał rekordy autobusów ZTM w Warszawie. API znajdziesz na tej stronie. Innych źródeł danych możesz szukać w tym wpisie. Pojedynczy rekord z autobusu/tramwaju wygląda tak:

{
   "Lines":"130",
   "Lon":21.003338,
   "VehicleNumber":"1000",
   "Time":"2020-04-23 20:22:55",
   "Lat":52.206166,
   "Brigade":"3"
}

Plan

Plan jest taki:

  • Producent wrzuca rekord na topic ztm-input
  • Kafka Streams
    • Filtruje strumień z duplikatów i starych rekordów
    • Oblicza prędkość, dystans, kierunek
    • Wrzuca rekord na topic ztm-output
  • Konsument pobiera rekordy z ztm-output i wrzuca je na Elasticsearch
  • Podziwiamy mapy i dashboardy

Tutaj skupimy się na części związanej z Kafka Streams.

Dlaczego Processor API?

Umożliwia wykorzystanie State Store, czyli zapisanie stanu rekordu/strumienia. We wpisie poświęconym Apache Spark, wykorzystywałem funkcje oknowe (Window Functions). Tutaj mechanizm nie polega na micro batchach, więc będziemy mogli precyzyjnie przetwarzać rekord po rekordzie. Moim planem jest odkładanie ostatniego rekordu na KeyValueStore i porównywanie go z kolejnym przychodzącym.

No to lecimy

Poruszałem kwestię testów w Kafce w tym wpisie. Tutaj bez testów się nie obejdzie. Wrzucanie rekordów na prawdziwą Kafkę strasznie wydłużyłoby proces implementacji takiego strumienia.

Odczyt danych z Kafki

Musimy napisać klasę odpowiadającą rekordom z Kafki, dlatego powstała taka pośrednia klasa InputZtmRecord.

public class InputZtmRecord {
    @SerializedName("Lines")
    public String lines;
    @SerializedName("Lon")
    public double lon;
    @SerializedName("Lat")
    public double lat;
    @SerializedName("VehicleNumber")
    public String vehicleNumber;
    @SerializedName("Brigade")
    public String brigade;
    @SerializedName("Time")
    public Date time;
}

Docelowo zmapujemy ją w docelową klasę ZtmRecord.

public class ZtmRecord {
    public String lines;
    public double lon;
    public double lat;
    public String vehicleNumber;
    public String brigade;
    public Date time;
    public double speed;
    public double distance;
    public double bearing;
}

Chciałem zachować pewną elastyczność między klasami. Ekpserymentowałem też z różnymi typami daty, ale ostatecznie zostałem przy starym java.util.Date, bo Gson sprawiał z nią najmniej kłopotów. Deserializer mapuje jedną klasę w drugą.

public class InputZtmRecordToZtmRecordDeserializer implements Deserializer<ZtmRecord> {
    private static final Charset CHARSET = Charset.forName("UTF-8");
    static private Gson gson = new GsonBuilder()
            .setDateFormat("yyyy-MM-dd HH:mm:ss")
            .create();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public ZtmRecord deserialize(String s, byte[] bytes) {
            String rawRecord = new String(bytes, CHARSET);
            InputZtmRecord inputRecord = gson.fromJson(rawRecord, InputZtmRecord.class);
            return new ZtmRecord(inputRecord);
    }

    @Override
    public void close() {

    }
}

Testy

Eksperymentując z datami, miałem trochę problemów z de/serializacją. Pomogłem sobie pisząc testy. Poniżej jeden z nich.

    @Test
    public void ztmRecordSerializationWorks(){
        InputZtmRecordDeserializer deserializer = new InputZtmRecordDeserializer();

        String rawRecord = "{\"Lines\": \"204\", \"Lon\": 21.043399, \"VehicleNumber\": \"1042\", \"Time\": \"2020-04-24 21:14:34\", \"Lat\": 52.26617, \"Brigade\": \"04\"}";
        InputZtmRecord record =  deserializer.deserialize(null, rawRecord.getBytes());

        Assert.assertEquals("204",record.lines);
        Assert.assertEquals(21.043399,record.lon,0.0001);
        Assert.assertEquals(52.26617,record.lat,0.0001);
        Assert.assertEquals("04",record.brigade);
        Assert.assertEquals("1042",record.vehicleNumber);
        Assert.assertEquals("204",record.lines);
        Assert.assertEquals(Date.from(LocalDateTime.of(2020,04,24,21,14,34).atZone(ZoneOffset.systemDefault()).toInstant()),record.time);
    }

Topologia

Podejście wykorzystujące Process API różni się od Kafka DSL. Można je łączyć (co uczynię w następnych wpisach), ale tutaj skupmy się na procesach. W Kafka DSL pipe-owaliśmy flow danych. Tutaj tez pipe-ujemy, ale źródła (rodziców w acyklicznym grafie skierowanym) każdego z procesów definiujemy po prostu stringiem.

    public Topology createTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        final Serde<ZtmRecord> outputZtmRecordSerde = Serdes.serdeFrom(new GenericSerializer(), new ZtmRecordDeserializer());///new ZtmRecordDeserializer());

        StoreBuilder ztmStoreBuilder =
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("ztmStore"),
                        Serdes.String(),
                        outputZtmRecordSerde
                );

        Topology topology = new Topology();
        topology.addSource("Source",new StringDeserializer(), new InputZtmRecordToZtmRecordDeserializer(),INPUT_TOPIC)
                .addProcessor("ZtmProcess", () -> new ZtmProcessor(), "Source")
                .addStateStore(ztmStoreBuilder, "ZtmProcess")
                .addSink("Sink", OUTPUT_TOPIC, new StringSerializer(), new GenericSerializer(),"ZtmProcess");

        return topology;
    }
  • 5 – 10 -> definiujemy skład którego użyjemy w procesorze
  • 13 -> definiujemy z jakiego topic-a pobieramy dane
  • 14 -> dodajemy procesor, wskazujemy jego rodzica
  • 15 -> przypisujemy wcześniej utworzony skład do procesora
  • 16 -> dodajemy ujście naszego strumienia, czyli nazwa topica oraz rodzica skąd rekordy mają płynąć.

Procesor

Najpierw pokażę zarys procesora. Omówimy Pi razy drzwi jak to wygląda. Następnie przejdziemy do omówienia flow przetwarzania.

public class ZtmProcessor implements Processor<String, ZtmRecord> {
    public static final int SUSPICIOUS_SPEED = 120;
    private ProcessorContext context;
    private KeyValueStore<String, ZtmRecord> ztmRecordStore;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        ztmRecordStore = (KeyValueStore) context.getStateStore("ztmStore");
    }

    @Override
    public void process(String key, ZtmRecord record) {
        // some awesome code
    }

    // some awesome code

    @Override
    public void close() {
    }
}

Cała magia dzieje się w metodzie process, jednak najpierw trzeba pobrać StateStore o odpowiedniej nazwie (linia 4) oraz zachować referencję na ProcessorContext. Kontekst pozwoli na odczyt danych o rekordzie (np. headery) oraz akcje takie jak podanie rekordu dalej.

    @Override
    public void process(String key, ZtmRecord record) {
        ZtmRecord previousRecord = ztmRecordStore.get(key);
        if (previousRecord == null) {
            ztmRecordStore.put(key, record);
            context.forward(key, record);
            return;
        }
        if (previousRecord.time.compareTo(record.time) >= 0) {
            return; // ignore old/same record
        }
        record = calculateRecord(previousRecord, record);
        if (record.speed > SUSPICIOUS_SPEED)
        {
            return; // probably measurement error
        }
        ztmRecordStore.put(key, record);
        context.forward(key, record);
    }

    private ZtmRecord calculateRecord(ZtmRecord previousRecord, ZtmRecord record) {
        double lat1 = previousRecord.lat;
        double lat2 = record.lat;
        double lon1 = previousRecord.lon;
        double lon2 = record.lon;
        record.distance = GeoTools.calculateDistanceInKilometers(lat1, lat2, lon1, lon2);
        record.bearing = GeoTools.calculateBearing(lat1, lat2, lon1, lon2);
        record.speed = GeoTools.calculateSpeed(record.distance,previousRecord.time, record.time);
        return record;
    }
  • 3 -> pobranie poprzedniego rekordu
  • 4-8 -> jak nie ma poprzednika, zapisanie rekordu w składzie i przekazanie go dalej
  • 9-11 -> ignorowanie duplikatów, starych rekordów
  • 12 -> obliczenie prędkości, dystansu, kierunku
  • 13-16 -> założyłem, że prędkości ponad 120 kph to błędy czujników GPS.
  • 17-10 -> zapisanie rekordu do składu i przekazanie go dalej

Pewnie sporo rzeczy wyjdzie, jak podepnie się źródło i wrzuci rekordy na Elasticsearch. Obliczenia przeniosłem do osobnej klasy GeoTools.

public class GeoTools {
    public static double calculateDistanceInKilometers(double lat1, double lat2, double lon1, double lon2) {
        if ((lat1 == lat2) && (lon1 == lon2)) {
            return 0;
        } else {
            double theta = lon1 - lon2;
            double dist = Math.sin(Math.toRadians(lat1)) * Math.sin(Math.toRadians(lat2)) + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) * Math.cos(Math.toRadians(theta));
            dist = Math.acos(dist);
            dist = Math.toDegrees(dist);
            dist = dist * 60 * 1.1515;
            dist = dist * 1.609344;
            return dist;
        }
    }

    public static double calculateBearing(double lat1, double lat2, double lon1, double lon2) {
        double latitude1 = Math.toRadians(lat1);
        double latitude2 = Math.toRadians(lat2);
        double longDiff = Math.toRadians(lon2 - lon1);
        double y = Math.sin(longDiff) * Math.cos(latitude2);
        double x = Math.cos(latitude1) * Math.sin(latitude2) - Math.sin(latitude1) * Math.cos(latitude2) * Math.cos(longDiff);
        return (Math.toDegrees(Math.atan2(y, x)) + 360) % 360;
    }

    public static double calculateSpeed(double distance, Date previousTime, Date time) {
        double millis = time.getTime() - previousTime.getTime();
        double hours = millis / (1000 * 60 * 60);
        if (hours <= 0) {
            return 0;
        }
        return distance / hours;
    }
}

Testy

public class ZtmStreamTest {

    TopologyTestDriver testDriver;

    StringSerializer stringSerializer = new StringSerializer();
    StringDeserializer stringDeserializer = new StringDeserializer();
    ZtmRecordDeserializer ztmRecordDeserializer = new ZtmRecordDeserializer();
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, ZtmRecord> outputTopic;

    @Before
    public void prepareTopologyTestDriver() {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wiaderko-ztm-stream-test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "doesnt-matter:1337");

        ZtmStream ztmStream = new ZtmStream();
        Topology topology = ztmStream.createTopology();
        testDriver = new TopologyTestDriver(topology, config);
        inputTopic = testDriver.createInputTopic(ZtmStream.INPUT_TOPIC, stringSerializer, stringSerializer);
        outputTopic = testDriver.createOutputTopic(ZtmStream.OUTPUT_TOPIC, stringDeserializer, ztmRecordDeserializer);
    }

    @After
    public void closeTestDriver() {
        testDriver.close();
    }

    @Test
    public void streamDropsSameRecords() {
        String firstRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:16:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
        String secondRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:16:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
        inputTopic.pipeInput("1037", firstRecord);
        inputTopic.pipeInput("1037", secondRecord);
        List<ZtmRecord> output = outputTopic.readValuesToList();
        Assert.assertEquals(1, output.size());
    }

    @Test
    public void streamDropsOldRecords() {
        String firstRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:17:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
        String secondRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:16:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
        inputTopic.pipeInput("1037", firstRecord);
        inputTopic.pipeInput("1037", secondRecord);
        List<ZtmRecord> output = outputTopic.readValuesToList();
        Assert.assertEquals(1, output.size());
    }

    @Test
    public void streamComputesAreCorrectly() {
        String firstRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:16:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
        String secondRecord = "{\"Lines\": \"108\", \"Lon\": 21.078823, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:17:34\", \"Lat\": 52.199871, \"Brigade\": \"51\"}";
        inputTopic.pipeInput("1037", firstRecord);
        inputTopic.pipeInput("1037", secondRecord);
        List<ZtmRecord> output = outputTopic.readValuesToList();
        ZtmRecord record = output.get(1);
        Assert.assertTrue(record.bearing > 0);
        Assert.assertTrue(record.speed > 0);
        Assert.assertTrue(record.distance > 0);
        Assert.assertEquals(0.1734, record.distance, 0.0001);
        Assert.assertEquals(134, record.bearing, 1);
        Assert.assertEquals(15.227, record.speed, 0.001);
    }

}

Czy z prawdziwą Kafką też działa?

Napisałem na szybko skrypt wrzucający rekordy do Kafki w pythonie.

import requests 
import json
from kafka import KafkaProducer

token = 'paste-your-token-here'
url = 'https://api.um.warszawa.pl/api/action/busestrams_get/'
resource_id = 'f2e5503e927d-4ad3-9500-4ab9e55deb59'
  
bus_params = {
    'apikey':token,
    'type':1,
    'resource_id': resource_id
    }
tram_params = {
    'apikey':token,
    'type':2,
    'resource_id': resource_id
    }
  
r = requests.get(url = url, params = bus_params)
  
data = r.json() 

producer = KafkaProducer(bootstrap_servers=['localhost:29092'],
                            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
                            key_serializer=lambda x: x
                            )

for record in data['result']:
    print(record)
    future = producer.send('ztm-input', value=record, key=record["VehicleNumber"].encode('utf-8'))
    result = future.get(timeout=60)
    

Repozytorium

https://github.com/zorteran/wiadro-danych-kafka-streams/tree/kafka_streams_201

Branch: kafka_streams_201

Podsumowanie

Jak widać powyżej, działa. Pozostaje kwestia podłączenia pod jakąś infrastrukturę i sprawdzenia, czy dane mają sens. Może po drodze próba zDockeryzowania takiej aplikacji Kafka Streams? 😉

Jeszcze raz chcę wspomnieć o wartości, jaką dały testy w tym projekcie. Nie wyobrażam sobie wrzucania za każdym razem ręczne rekordów w celu sprawdzenia implementacji.

5 komentarzy do “Kafka Streams 201 – Obliczanie prędkości, Processor API, KeyValueStore”

  1. Fajnie się bawisz. Mam trochę bardziej praktyczny pomysł na zagadkę. Jak ustalić w którą stronę jedzie autobus. Czyli czy jedzie trasą czy może nią wraca w przeciwną stronę. To taka trochę trudniejsza zagadka – ale wszystko jest dla ludzi 😉

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *