Przetwarzanie strumieniowe jest dzisiaj standardem. Skoro i tak większość osób korzysta z Apache Kafka jako kolejki, czemu nie spróbować Kafka Streams? Jest to rozwiązanie skalowalne i nie wymaga specjalnych środowisk typu YARN czy Apache Mesos. Ten wpis rozpoczyna serię wpisów dotyczących Kafka Streams.
Rozkład jazdy
- Kafka Streams 101 – de/serilizacja ✔
- Kafka Streams 102 – Wyjątki i Dead Letter Queue
- Kafka Streams 103 – Pisz testy, zapomnij o Kafce
- Kafka Streams 201 – Obliczanie prędkości, Processor API, KeyValueStore
Cel
We wpisach z tego cyklu będę bawił się Kafka Streams, by poznać możliwości tej technologii. Docelowo chcę napisać program/y w Kafka Streams, który będzie:
- Wzbogacał strumień lokalizacji pojazdów ZTM w Warszawie w prędkość i wektor poruszania się
- Filtrował błędne odczyty
- <tutaj myślę o agregacjach, ale jeszcze nie wiem jakich 🙂 >
- Myślałem też o wykrywaniu czy autobus znajduje się na przystanku
Zobaczymy, co z tego wyjdzie. Nabieram doświadczenia, wiec jak masz jakiś pomysł lub uwagi, nie krępuj się :-).
Czy musimy sprawdzać wiarygodność pobieranych danych? ? Jak widać tak. Pojazd linii 178 nagle teleportował się na Ursynów ? #elasticsearch #elasticstack pic.twitter.com/M1gYhh9ZaJ
— Maciej Szymczyk (@maciej_szymczyk) February 5, 2020
Rozkład jazdy
- Kafka Streams 101 – de/serilizacja ✔
- Kafka Streams 102 – Wyjątki i Dead Letter Queue
- Kafka Streams 102 – Pisz testy, zapomnij o Kafce
Podstawy
Kafka Streams to program napisany w Javie, który jest jednocześnie konsumentem i producentem Kafki. Źródło wejściowe i wyjściowe to oddzielne topic-i w Apache Kafka. Bardzo fajnie mówił o tym temacie Radek z Big Data Passion na Confiturze, tutaj link do YouTube.
Środowisko
Wykorzystałem docker-compose od bitnami, o którym wspominałem we wpisie o Dockerze. Dodałem do niego Kafdrop, bo tak.
version: '2' services: kafdrop: image: obsidiandynamics/kafdrop restart: "no" ports: - "9000:9000" environment: KAFKA_BROKERCONNECT: "kafka:9092" JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify" depends_on: - kafka zookeeper: image: 'bitnami/zookeeper:3' ports: - '2181:2181' volumes: - 'zookeeper_data:/bitnami' environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: image: 'bitnami/kafka:2' ports: - '9092:9092' - '29092:29092' volumes: - 'kafka_data:/bitnami' environment: - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 depends_on: - zookeeper volumes: zookeeper_data: driver: local kafka_data: driver: local
Kafdrop pozwala m.in wyklikać nowy topic. Kafka jest co prawda w dockerze i można się tam dostać docker exec -it <kontener> bash, ale ja ściągnąłem Kafkę, by móc wykonywać operacje z terminala na mojej maszynie. Poniżej kilka komend, które mogą Ci się przydać.
# Utworzenie nowego topic-a bin/kafka-topics.sh --create --bootstrap-server localhost:29092 --replication-factor 1 --partitions 1 --topic wiaderko-input #Wyświetlenie listy topic-ów bin/kafka-topics.sh --list --bootstrap-server localhost:29092 #Uruchomienie producenta w konsoli - key:value bin/kafka-console-producer.sh --broker-list localhost:29092 --topic wiaderko-input --property "parse.key=true" --property "key.separator=:" #Uruchomienie konsumenta w konsoli bin/kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic wiaderko-output \ --from-beginning --formatter kafka.tools.DefaultMessageFormatter \ --property print.key=true --property print.value=true \ --property key.deserialzer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer #Zmiana konfiguracji topic-a, tutaj zmiana retencji na 5 sekund bin/kafka-topics.sh --zookeeper localhost:2181 localhost:29092 --alter --topic wiaderko-input --config retention.ms=5000
Pierwszy strumień – LowerCaseStream
Pierwsze koty za płoty. Na razie nic wyszukanego. Przychodząca wartość będzie zmieniana na małe litery.
public class LowercaseStream { public static final String INPUT_TOPIC = "wiaderko-input"; public static final String OUTPUT_TOPIC = "wiaderko-output"; public static void main(String[] args) throws Exception { Properties props = createProperties(); final StreamsBuilder builder = new StreamsBuilder();builder.<String, String>stream(INPUT_TOPIC) .mapValues(v -> v.toLowerCase()) .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String())); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } private static Properties createProperties() { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wiaderko-lowercase-stream"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } }
- Najpierw musimy skonfigurować strumień (linia 7).
- „Mięsko”, czyli to jak strumień ma działać, znajduje się w liniach 9-1q. Wykonywana jest metoda mapValues i strumień kierowany jest do innego topic-a.
- Metodę peek wykorzystałem do zerknięcia w strumień w celach demonstracjnych. Peek używa się do bezstanowych operacji na rekordach.
- Linie 13-33 jest to utworzenie topologii, strumienia i obsłużenie jego zamknięcia.
W Intellij wystarczy odpalić dany plik. Jeśli zbudowałeś/aś JAR-a to odpalisz go komendą:
java -cp wiadrodanych-kafka-streams.jar wiadrodanych.streams.LowercaseStream
Po wpisaniu do producenta w konsoli wiadomości powinniśmy dostać podobny rezultat.

De/Serializacja z łapy
Najczęściej używanym formatem jest JSON, więc utworzyłem prostą klasę, która przedstawia osobę.
public class Person { public String name; public int age; }
Spróbujmy przekształcić imię tak, by zaczynało się wielką literą. Dodamy również filtr, który będzie zatrzymywał osoby poniżej 18 roku życia.
builder.<String, String>stream(INPUT_TOPIC) .peek( (key, value) -> System.out.println("Input: key=" + key + ", value=" + value) ) .mapValues(v -> { try { return mapper.readValue(v, Person.class); } catch (JsonProcessingException e) { throw new RuntimeException(e); } }) .mapValues((v -> { v.name = v.name.substring(0, 1).toUpperCase() + v.name.substring(1).toLowerCase(); return v; })) .filter((k, v) -> v.age >= 18) .mapValues(v -> { try { return mapper.writeValueAsString((v)); } catch (JsonProcessingException e) { throw new RuntimeException(e); } }) .peek( (key, value) -> System.out.println("Output: key=" + key + ", value=" + value) ) .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
Jak widzisz w liniach 5-11, w związku z tym, że zakładamy na wejściu string, sami musimy sparsować obiekt

Poison Pill
Jak widać wyżej, jakoś to działa. Co natomiast w przypadku gdy serializacja się nie uda?

Kafka Stream rzuca błędem i przestaje działać. Możemy dodać obsługę niezłapanych wyjątków w strumieniu, ale aplikacja i tak przestanie działać. O takich wiadomościach psujących strumień mówi się posion pills. Weź pod uwagę, że dopóki taka wiadomość jest w kolejce, aplikacja będzie się psuć. Dlatego warto ustawić czasową retencję na topic-u gdy bawimy się Kafka Streams.
streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> { System.err.println("oh no! error! " + throwable.getMessage()); });
De/Serializacja nie z łapy
Jest lepszy sposób na de/serializacje. Najpierw musimy napisać klasy implementujące interfejsy Serializer i Deserializer.
public class PersonSerializer implements Serializer { private static final Charset CHARSET = Charset.forName("UTF-8"); static private Gson gson = new Gson(); @Override public void configure(Map configs, boolean isKey) { } @Override public byte[] serialize(String s, Object o) { String object = gson.toJson(o); // Return the bytes from the String 'line' return object.getBytes(CHARSET); } @Override public void close() { } } public class PersonDeserializer implements Deserializer<Person> { private static final Charset CHARSET = Charset.forName("UTF-8"); static private Gson gson = new Gson(); @Override public void configure(Map<String, ?> configs, boolean isKey) { } @Override public Person deserialize(String s, byte[] bytes) { String person = new String(bytes, CHARSET); return gson.fromJson(person, Person.class); } @Override public void close() { } }
Teraz zamiast podawania w konfiguracji domyślnych SERDE, dostarczymy je tworząc strumień. Kod strumienia wygląda teraz tak:
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), personSerde)) .peek( (key, value) -> System.out.println("key=" + key + ", value=" + value) ) .mapValues((v -> { v.name = v.name.substring(0, 1).toUpperCase() + v.name.substring(1).toLowerCase(); return v; })) .filter((k, v) -> v.age >= 18) .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), personSerde));
Peek dużo nam nie mówi, bo teraz dostajemy obiekt. Aplikacja działa tak jak wcześniejsza, ale kod jest czytelniejszy.

Poison Pill
Co się stanie jak wrzucimy do topic-a poprzedni poison pill? Jeśli się przygotujemy to nic. Jak dodamy do konfiguracji strumienia klasę LogAndContinueExceptionHandler lub swoją implementującą DeserializationExceptionHandler, możemy obsłużyć wyjątek dezerializacji i kontynuować pracę programu.
private static Properties createProperties() { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wiaderko-json-stream"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092"); props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); return props; }

Niestety w ten sposób ratujemy się tylko w przypadku błędów z parsowaniem. Gdy podamy pusty string metoda mapValues rzuci wyjątkiem i wszystko się rozsypie.

Co dalej?
Jak widzisz, problem ze strumieniem nadal pozostał. Oto tematy Kafka Streams, które chcę poruszyć w przyszłości:
- Jak obsługiwać błędy?
- Co zrobić z błędnymi wiadomościami?
- Jak to się testuje?
- Agregacje. O co chodzi z tym KTable itp.
Jeśli masz jakieś pomysły/uwagi, proszę, napisz w komentarzu.
Repozytorium
https://github.com/zorteran/wiadro-danych-kafka-streams/tree/kafka_streams_101
3 myśli w temacie “Kafka Streams 101”