Czasami klasyczne Kafka DSL nam nie wystarcza. Processor API pozwala na dowolne zdefiniowanie procesora, a co najlepsze, wykorzystanie State Store. W tym przypadku obliczymy prędkość, kierunek i dystans pojazdów komunikacji miejskiej w Warszwie.
Rozkład jazdy
- Kafka Streams 101 – de/serilizacja
- Kafka Streams 102 – Wyjątki i Dead Letter Queue
- Kafka Streams 103 – Pisz testy, zapomnij o Kafce
- Kafka Streams 201 – Obliczanie prędkości, Processor API, KeyValueStore ✔
Źródło
W tym wpisie będę wykorzystywał rekordy autobusów ZTM w Warszawie. API znajdziesz na tej stronie. Innych źródeł danych możesz szukać w tym wpisie. Pojedynczy rekord z autobusu/tramwaju wygląda tak:
{
"Lines":"130",
"Lon":21.003338,
"VehicleNumber":"1000",
"Time":"2020-04-23 20:22:55",
"Lat":52.206166,
"Brigade":"3"
}
Plan
Plan jest taki:
- Producent wrzuca rekord na topic ztm-input
- Kafka Streams
- Filtruje strumień z duplikatów i starych rekordów
- Oblicza prędkość, dystans, kierunek
- Wrzuca rekord na topic ztm-output
- Konsument pobiera rekordy z ztm-output i wrzuca je na Elasticsearch
- Podziwiamy mapy i dashboardy
Tutaj skupimy się na części związanej z Kafka Streams.
Dlaczego Processor API?
Umożliwia wykorzystanie State Store, czyli zapisanie stanu rekordu/strumienia. We wpisie poświęconym Apache Spark, wykorzystywałem funkcje oknowe (Window Functions). Tutaj mechanizm nie polega na micro batchach, więc będziemy mogli precyzyjnie przetwarzać rekord po rekordzie. Moim planem jest odkładanie ostatniego rekordu na KeyValueStore i porównywanie go z kolejnym przychodzącym.
No to lecimy
Poruszałem kwestię testów w Kafce w tym wpisie. Tutaj bez testów się nie obejdzie. Wrzucanie rekordów na prawdziwą Kafkę strasznie wydłużyłoby proces implementacji takiego strumienia.
Odczyt danych z Kafki
Musimy napisać klasę odpowiadającą rekordom z Kafki, dlatego powstała taka pośrednia klasa InputZtmRecord.
public class InputZtmRecord {
@SerializedName("Lines")
public String lines;
@SerializedName("Lon")
public double lon;
@SerializedName("Lat")
public double lat;
@SerializedName("VehicleNumber")
public String vehicleNumber;
@SerializedName("Brigade")
public String brigade;
@SerializedName("Time")
public Date time;
}
Docelowo zmapujemy ją w docelową klasę ZtmRecord.
public class ZtmRecord {
public String lines;
public double lon;
public double lat;
public String vehicleNumber;
public String brigade;
public Date time;
public double speed;
public double distance;
public double bearing;
}
Chciałem zachować pewną elastyczność między klasami. Ekpserymentowałem też z różnymi typami daty, ale ostatecznie zostałem przy starym java.util.Date, bo Gson sprawiał z nią najmniej kłopotów. Deserializer mapuje jedną klasę w drugą.
public class InputZtmRecordToZtmRecordDeserializer implements Deserializer<ZtmRecord> {
private static final Charset CHARSET = Charset.forName("UTF-8");
static private Gson gson = new GsonBuilder()
.setDateFormat("yyyy-MM-dd HH:mm:ss")
.create();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public ZtmRecord deserialize(String s, byte[] bytes) {
String rawRecord = new String(bytes, CHARSET);
InputZtmRecord inputRecord = gson.fromJson(rawRecord, InputZtmRecord.class);
return new ZtmRecord(inputRecord);
}
@Override
public void close() {
}
}
Testy
Eksperymentując z datami, miałem trochę problemów z de/serializacją. Pomogłem sobie pisząc testy. Poniżej jeden z nich.
@Test
public void ztmRecordSerializationWorks(){
InputZtmRecordDeserializer deserializer = new InputZtmRecordDeserializer();
String rawRecord = "{\"Lines\": \"204\", \"Lon\": 21.043399, \"VehicleNumber\": \"1042\", \"Time\": \"2020-04-24 21:14:34\", \"Lat\": 52.26617, \"Brigade\": \"04\"}";
InputZtmRecord record = deserializer.deserialize(null, rawRecord.getBytes());
Assert.assertEquals("204",record.lines);
Assert.assertEquals(21.043399,record.lon,0.0001);
Assert.assertEquals(52.26617,record.lat,0.0001);
Assert.assertEquals("04",record.brigade);
Assert.assertEquals("1042",record.vehicleNumber);
Assert.assertEquals("204",record.lines);
Assert.assertEquals(Date.from(LocalDateTime.of(2020,04,24,21,14,34).atZone(ZoneOffset.systemDefault()).toInstant()),record.time);
}
Topologia
Podejście wykorzystujące Process API różni się od Kafka DSL. Można je łączyć (co uczynię w następnych wpisach), ale tutaj skupmy się na procesach. W Kafka DSL pipe-owaliśmy flow danych. Tutaj tez pipe-ujemy, ale źródła (rodziców w acyklicznym grafie skierowanym) każdego z procesów definiujemy po prostu stringiem.
public Topology createTopology() {
final StreamsBuilder builder = new StreamsBuilder();
final Serde<ZtmRecord> outputZtmRecordSerde = Serdes.serdeFrom(new GenericSerializer(), new ZtmRecordDeserializer());///new ZtmRecordDeserializer());
StoreBuilder ztmStoreBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("ztmStore"),
Serdes.String(),
outputZtmRecordSerde
);
Topology topology = new Topology();
topology.addSource("Source",new StringDeserializer(), new InputZtmRecordToZtmRecordDeserializer(),INPUT_TOPIC)
.addProcessor("ZtmProcess", () -> new ZtmProcessor(), "Source")
.addStateStore(ztmStoreBuilder, "ZtmProcess")
.addSink("Sink", OUTPUT_TOPIC, new StringSerializer(), new GenericSerializer(),"ZtmProcess");
return topology;
}
- 5 – 10 -> definiujemy skład którego użyjemy w procesorze
- 13 -> definiujemy z jakiego topic-a pobieramy dane
- 14 -> dodajemy procesor, wskazujemy jego rodzica
- 15 -> przypisujemy wcześniej utworzony skład do procesora
- 16 -> dodajemy ujście naszego strumienia, czyli nazwa topica oraz rodzica skąd rekordy mają płynąć.
Procesor
Najpierw pokażę zarys procesora. Omówimy Pi razy drzwi jak to wygląda. Następnie przejdziemy do omówienia flow przetwarzania.
public class ZtmProcessor implements Processor<String, ZtmRecord> {
public static final int SUSPICIOUS_SPEED = 120;
private ProcessorContext context;
private KeyValueStore<String, ZtmRecord> ztmRecordStore;
@Override
public void init(ProcessorContext context) {
this.context = context;
ztmRecordStore = (KeyValueStore) context.getStateStore("ztmStore");
}
@Override
public void process(String key, ZtmRecord record) {
// some awesome code
}
// some awesome code
@Override
public void close() {
}
}
Cała magia dzieje się w metodzie process, jednak najpierw trzeba pobrać StateStore o odpowiedniej nazwie (linia 4) oraz zachować referencję na ProcessorContext. Kontekst pozwoli na odczyt danych o rekordzie (np. headery) oraz akcje takie jak podanie rekordu dalej.
@Override
public void process(String key, ZtmRecord record) {
ZtmRecord previousRecord = ztmRecordStore.get(key);
if (previousRecord == null) {
ztmRecordStore.put(key, record);
context.forward(key, record);
return;
}
if (previousRecord.time.compareTo(record.time) >= 0) {
return; // ignore old/same record
}
record = calculateRecord(previousRecord, record);
if (record.speed > SUSPICIOUS_SPEED)
{
return; // probably measurement error
}
ztmRecordStore.put(key, record);
context.forward(key, record);
}
private ZtmRecord calculateRecord(ZtmRecord previousRecord, ZtmRecord record) {
double lat1 = previousRecord.lat;
double lat2 = record.lat;
double lon1 = previousRecord.lon;
double lon2 = record.lon;
record.distance = GeoTools.calculateDistanceInKilometers(lat1, lat2, lon1, lon2);
record.bearing = GeoTools.calculateBearing(lat1, lat2, lon1, lon2);
record.speed = GeoTools.calculateSpeed(record.distance,previousRecord.time, record.time);
return record;
}
- 3 -> pobranie poprzedniego rekordu
- 4-8 -> jak nie ma poprzednika, zapisanie rekordu w składzie i przekazanie go dalej
- 9-11 -> ignorowanie duplikatów, starych rekordów
- 12 -> obliczenie prędkości, dystansu, kierunku
- 13-16 -> założyłem, że prędkości ponad 120 kph to błędy czujników GPS.
- 17-10 -> zapisanie rekordu do składu i przekazanie go dalej
Pewnie sporo rzeczy wyjdzie, jak podepnie się źródło i wrzuci rekordy na Elasticsearch. Obliczenia przeniosłem do osobnej klasy GeoTools.
public class GeoTools {
public static double calculateDistanceInKilometers(double lat1, double lat2, double lon1, double lon2) {
if ((lat1 == lat2) && (lon1 == lon2)) {
return 0;
} else {
double theta = lon1 - lon2;
double dist = Math.sin(Math.toRadians(lat1)) * Math.sin(Math.toRadians(lat2)) + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) * Math.cos(Math.toRadians(theta));
dist = Math.acos(dist);
dist = Math.toDegrees(dist);
dist = dist * 60 * 1.1515;
dist = dist * 1.609344;
return dist;
}
}
public static double calculateBearing(double lat1, double lat2, double lon1, double lon2) {
double latitude1 = Math.toRadians(lat1);
double latitude2 = Math.toRadians(lat2);
double longDiff = Math.toRadians(lon2 - lon1);
double y = Math.sin(longDiff) * Math.cos(latitude2);
double x = Math.cos(latitude1) * Math.sin(latitude2) - Math.sin(latitude1) * Math.cos(latitude2) * Math.cos(longDiff);
return (Math.toDegrees(Math.atan2(y, x)) + 360) % 360;
}
public static double calculateSpeed(double distance, Date previousTime, Date time) {
double millis = time.getTime() - previousTime.getTime();
double hours = millis / (1000 * 60 * 60);
if (hours <= 0) {
return 0;
}
return distance / hours;
}
}
Testy
public class ZtmStreamTest {
TopologyTestDriver testDriver;
StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
ZtmRecordDeserializer ztmRecordDeserializer = new ZtmRecordDeserializer();
private TestInputTopic<String, String> inputTopic;
private TestOutputTopic<String, ZtmRecord> outputTopic;
@Before
public void prepareTopologyTestDriver() {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wiaderko-ztm-stream-test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "doesnt-matter:1337");
ZtmStream ztmStream = new ZtmStream();
Topology topology = ztmStream.createTopology();
testDriver = new TopologyTestDriver(topology, config);
inputTopic = testDriver.createInputTopic(ZtmStream.INPUT_TOPIC, stringSerializer, stringSerializer);
outputTopic = testDriver.createOutputTopic(ZtmStream.OUTPUT_TOPIC, stringDeserializer, ztmRecordDeserializer);
}
@After
public void closeTestDriver() {
testDriver.close();
}
@Test
public void streamDropsSameRecords() {
String firstRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:16:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
String secondRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:16:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
inputTopic.pipeInput("1037", firstRecord);
inputTopic.pipeInput("1037", secondRecord);
List<ZtmRecord> output = outputTopic.readValuesToList();
Assert.assertEquals(1, output.size());
}
@Test
public void streamDropsOldRecords() {
String firstRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:17:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
String secondRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:16:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
inputTopic.pipeInput("1037", firstRecord);
inputTopic.pipeInput("1037", secondRecord);
List<ZtmRecord> output = outputTopic.readValuesToList();
Assert.assertEquals(1, output.size());
}
@Test
public void streamComputesAreCorrectly() {
String firstRecord = "{\"Lines\": \"108\", \"Lon\": 21.076998, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:16:53\", \"Lat\": 52.200958, \"Brigade\": \"51\"}";
String secondRecord = "{\"Lines\": \"108\", \"Lon\": 21.078823, \"VehicleNumber\": \"1037\", \"Time\": \"2020-04-25 20:17:34\", \"Lat\": 52.199871, \"Brigade\": \"51\"}";
inputTopic.pipeInput("1037", firstRecord);
inputTopic.pipeInput("1037", secondRecord);
List<ZtmRecord> output = outputTopic.readValuesToList();
ZtmRecord record = output.get(1);
Assert.assertTrue(record.bearing > 0);
Assert.assertTrue(record.speed > 0);
Assert.assertTrue(record.distance > 0);
Assert.assertEquals(0.1734, record.distance, 0.0001);
Assert.assertEquals(134, record.bearing, 1);
Assert.assertEquals(15.227, record.speed, 0.001);
}
}
Czy z prawdziwą Kafką też działa?
Napisałem na szybko skrypt wrzucający rekordy do Kafki w pythonie.
import requests
import json
from kafka import KafkaProducer
token = 'paste-your-token-here'
url = 'https://api.um.warszawa.pl/api/action/busestrams_get/'
resource_id = 'f2e5503e927d-4ad3-9500-4ab9e55deb59'
bus_params = {
'apikey':token,
'type':1,
'resource_id': resource_id
}
tram_params = {
'apikey':token,
'type':2,
'resource_id': resource_id
}
r = requests.get(url = url, params = bus_params)
data = r.json()
producer = KafkaProducer(bootstrap_servers=['localhost:29092'],
value_serializer=lambda x: json.dumps(x).encode('utf-8'),
key_serializer=lambda x: x
)
for record in data['result']:
print(record)
future = producer.send('ztm-input', value=record, key=record["VehicleNumber"].encode('utf-8'))
result = future.get(timeout=60)
Repozytorium
https://github.com/zorteran/wiadro-danych-kafka-streams/tree/kafka_streams_201
Branch: kafka_streams_201
Podsumowanie
Jak widać powyżej, działa. Pozostaje kwestia podłączenia pod jakąś infrastrukturę i sprawdzenia, czy dane mają sens. Może po drodze próba zDockeryzowania takiej aplikacji Kafka Streams? 😉
Jeszcze raz chcę wspomnieć o wartości, jaką dały testy w tym projekcie. Nie wyobrażam sobie wrzucania za każdym razem ręczne rekordów w celu sprawdzenia implementacji.
Fajnie się bawisz. Mam trochę bardziej praktyczny pomysł na zagadkę. Jak ustalić w którą stronę jedzie autobus. Czyli czy jedzie trasą czy może nią wraca w przeciwną stronę. To taka trochę trudniejsza zagadka – ale wszystko jest dla ludzi 😉
Chyba mniej więcej udało mi się to zrobić na https://czynaczas.pl – nie zawsze to działa poprawnie, ale jest mapka Warszawy i na podstawie kierunku podróży autobusu/tramwaju wykrywany jest kierunek jazdy (w sensie przystanek końcowy) 🙂 .
Super to wygląda. Szacun 🙂