Kafka Streams 101

Przetwarzanie strumieniowe jest dzisiaj standardem. Skoro i tak większość osób korzysta z Apache Kafka jako kolejki, czemu nie spróbować Kafka Streams? Jest to rozwiązanie skalowalne i nie wymaga specjalnych środowisk typu YARN czy Apache Mesos. Ten wpis rozpoczyna serię wpisów dotyczących Kafka Streams.

Rozkład jazdy

Cel

We wpisach z tego cyklu będę bawił się Kafka Streams, by poznać możliwości tej technologii. Docelowo chcę napisać program/y w Kafka Streams, który będzie:

Zobaczymy, co z tego wyjdzie. Nabieram doświadczenia, wiec jak masz jakiś pomysł lub uwagi, nie krępuj się :-).

Rozkład jazdy

Podstawy

Kafka Streams to program napisany w Javie, który jest jednocześnie konsumentem i producentem Kafki. Źródło wejściowe i wyjściowe to oddzielne topic-i w Apache Kafka. Bardzo fajnie mówił o tym temacie Radek z Big Data Passion na Confiturze, tutaj link do YouTube.

Środowisko

Wykorzystałem docker-compose od bitnami, o którym wspominałem we wpisie o Dockerze. Dodałem do niego Kafdrop, bo tak.

version: '2'
 
services:
  kafdrop:
    image: obsidiandynamics/kafdrop
    restart: "no"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka:9092"
      JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
    depends_on:
      - kafka
  zookeeper:
    image: 'bitnami/zookeeper:3'
    ports:
      - '2181:2181'
    volumes:
      - 'zookeeper_data:/bitnami'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:2'
    ports:
      - '9092:9092'
      - '29092:29092'
    volumes:
      - 'kafka_data:/bitnami'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
    depends_on:
      - zookeeper
 
volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

Kafdrop pozwala m.in wyklikać nowy topic. Kafka jest co prawda w dockerze i można się tam dostać docker exec -it <kontener> bash, ale ja ściągnąłem Kafkę, by móc wykonywać operacje z terminala na mojej maszynie. Poniżej kilka komend, które mogą Ci się przydać.

# Utworzenie nowego topic-a
bin/kafka-topics.sh --create --bootstrap-server localhost:29092 --replication-factor 1 --partitions 1 --topic wiaderko-input

#Wyświetlenie listy topic-ów
bin/kafka-topics.sh --list --bootstrap-server localhost:29092

#Uruchomienie producenta w konsoli - key:value
bin/kafka-console-producer.sh   --broker-list localhost:29092   --topic wiaderko-input   --property "parse.key=true"   --property "key.separator=:"

#Uruchomienie konsumenta w konsoli
bin/kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic wiaderko-output \
      --from-beginning --formatter kafka.tools.DefaultMessageFormatter \
      --property print.key=true --property print.value=true \
      --property key.deserialzer=org.apache.kafka.common.serialization.StringDeserializer \
      --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

#Zmiana konfiguracji topic-a, tutaj zmiana retencji na 5 sekund
bin/kafka-topics.sh --zookeeper localhost:2181 localhost:29092 --alter --topic wiaderko-input --config retention.ms=5000

Pierwszy strumień – LowerCaseStream

Pierwsze koty za płoty. Na razie nic wyszukanego. Przychodząca wartość będzie zmieniana na małe litery.

public class LowercaseStream {

    public static final String INPUT_TOPIC = "wiaderko-input";
    public static final String OUTPUT_TOPIC = "wiaderko-output";

    public static void main(String[] args) throws Exception {
        Properties props = createProperties();

        final StreamsBuilder builder = new StreamsBuilder();builder.<String, String>stream(INPUT_TOPIC)
                .mapValues(v -> v.toLowerCase())
                .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

    private static Properties createProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wiaderko-lowercase-stream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}
  • Najpierw musimy skonfigurować strumień (linia 7).
  • “Mięsko”, czyli to jak strumień ma działać, znajduje się w liniach 9-1q. Wykonywana jest metoda mapValues i strumień kierowany jest do innego topic-a.
  • Metodę peek wykorzystałem do zerknięcia w strumień w celach demonstracjnych. Peek używa się do bezstanowych operacji na rekordach.
  • Linie 13-33 jest to utworzenie topologii, strumienia i obsłużenie jego zamknięcia.

W Intellij wystarczy odpalić dany plik. Jeśli zbudowałeś/aś JAR-a to odpalisz go komendą:

java -cp wiadrodanych-kafka-streams.jar wiadrodanych.streams.LowercaseStream

Po wpisaniu do producenta w konsoli wiadomości powinniśmy dostać podobny rezultat.

Efekty widzimy w terminalu IntelliJ (generowane w peek) oraz konsoli z konsumentem topic-a wiaderko-output

De/Serializacja z łapy

Najczęściej używanym formatem jest JSON, więc utworzyłem prostą klasę, która przedstawia osobę.

public class Person {
    public String name;
    public int age;
}

Spróbujmy przekształcić imię tak, by zaczynało się wielką literą. Dodamy również filtr, który będzie zatrzymywał osoby poniżej 18 roku życia.

        builder.<String, String>stream(INPUT_TOPIC)
                .peek(
                    (key, value) -> System.out.println("Input: key=" + key + ", value=" + value)
                )
                .mapValues(v -> {
                    try {
                        return mapper.readValue(v, Person.class);
                    } catch (JsonProcessingException e) {
                        throw new RuntimeException(e);
                    }
                })
                .mapValues((v -> {
                    v.name = v.name.substring(0, 1).toUpperCase() + v.name.substring(1).toLowerCase();
                    return v;
                }))
                .filter((k, v) -> v.age >= 18)
                .mapValues(v -> {
                    try {
                        return mapper.writeValueAsString((v));
                    } catch (JsonProcessingException e) {
                        throw new RuntimeException(e);
                    }
                })
                .peek(
                        (key, value) -> System.out.println("Output: key=" + key + ", value=" + value)
                )
                .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

Jak widzisz w liniach 5-11, w związku z tym, że zakładamy na wejściu string, sami musimy sparsować obiekt

To żyje!

Poison Pill

Jak widać wyżej, jakoś to działa. Co natomiast w przypadku gdy serializacja się nie uda?

Kafka Stream rzuca błędem i przestaje działać. Możemy dodać obsługę niezłapanych wyjątków w strumieniu, ale aplikacja i tak przestanie działać. O takich wiadomościach psujących strumień mówi się posion pills. Weź pod uwagę, że dopóki taka wiadomość jest w kolejce, aplikacja będzie się psuć. Dlatego warto ustawić czasową retencję na topic-u gdy bawimy się Kafka Streams.

streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
    System.err.println("oh no! error! " + throwable.getMessage());
});

De/Serializacja nie z łapy

Jest lepszy sposób na de/serializacje. Najpierw musimy napisać klasy implementujące interfejsy Serializer i Deserializer.

public class PersonSerializer implements Serializer {

    private static final Charset CHARSET = Charset.forName("UTF-8");
    static private Gson gson = new Gson();
    @Override
    public void configure(Map configs, boolean isKey) {

    }

    @Override
    public byte[] serialize(String s, Object o) {
        String object = gson.toJson(o);
        // Return the bytes from the String 'line'
        return object.getBytes(CHARSET);
    }

    @Override
    public void close() {

    }
}

public class PersonDeserializer implements Deserializer<Person> {
    private static final Charset CHARSET = Charset.forName("UTF-8");
    static private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public Person deserialize(String s, byte[] bytes) {
            String person = new String(bytes, CHARSET);
            return gson.fromJson(person, Person.class);
    }

    @Override
    public void close() {

    }
}

Teraz zamiast podawania w konfiguracji domyślnych SERDE, dostarczymy je tworząc strumień. Kod strumienia wygląda teraz tak:

builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), personSerde))
        .peek(
                (key, value) -> System.out.println("key=" + key + ", value=" + value)
        )
        .mapValues((v -> {
            v.name = v.name.substring(0, 1).toUpperCase() + v.name.substring(1).toLowerCase();
            return v;
        }))
        .filter((k, v) -> v.age >= 18)
        .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), personSerde));

Peek dużo nam nie mówi, bo teraz dostajemy obiekt. Aplikacja działa tak jak wcześniejsza, ale kod jest czytelniejszy.

Poison Pill

Co się stanie jak wrzucimy do topic-a poprzedni poison pill? Jeśli się przygotujemy to nic. Jak dodamy do konfiguracji strumienia klasę LogAndContinueExceptionHandler lub swoją implementującą DeserializationExceptionHandler, możemy obsłużyć wyjątek dezerializacji i kontynuować pracę programu.

    private static Properties createProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wiaderko-json-stream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
        return props;
    }

Niestety w ten sposób ratujemy się tylko w przypadku błędów z parsowaniem. Gdy podamy pusty string metoda mapValues rzuci wyjątkiem i wszystko się rozsypie.

Co dalej?

Jak widzisz, problem ze strumieniem nadal pozostał. Oto tematy Kafka Streams, które chcę poruszyć w przyszłości:

  • Jak obsługiwać błędy?
  • Co zrobić z błędnymi wiadomościami?
  • Jak to się testuje?
  • Agregacje. O co chodzi z tym KTable itp.

Jeśli masz jakieś pomysły/uwagi, proszę, napisz w komentarzu.

Repozytorium

https://github.com/zorteran/wiadro-danych-kafka-streams/tree/kafka_streams_101

4 komentarze do “Kafka Streams 101”

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *