Kafka Streams 102 – Wyjątki i Dead Letter Queue

Błędy zdarzają się każdemu. Prędzej czy później nasza aplikacja Kafka Streams dostanie wiadomość, która ją zabije (Poison Pill). Niestety uruchomienie jej ponownie nie pomoże, dopóki wiadomość nie zniknie z kolejki. W tym wpisie spróbujemy obsłużyć takie wiadomości i zapisać je do Dead Letter Queue.

Rozkład jazdy

Poison Pill

Trująca pigułka jest to wiadomość, która zawsze wyrzuci nam wyjątkiem. Będzie się tak działo niezależnie od tego ile razy spróbujemy ją skonsumować. Dopóki nie opuści kolejki (np. z powodu polityki retencji), nasz konsument nic z nią nie zrobi. Powodów dla takiej wiadomości może być wiele, najczęściej są to problemy z deserializacją.

Rozwiązania

  1. Utworzyć logi błędu i zamknąć aplikacje.
  2. Utworzyć logi błędu i pominąć uszkodzony rekord.
  3. Wysłać rekord na oddzielny topic zwany Dead Letter Queue. Takie rekordy możemy potem przeanalizować i poprawić naszą aplikację.

Problemy

Patrząc na strumień z poprzedniego wpisu Kafka Streams 101, widzimy dwa potencjalne problemy:

  1. Producent wrzuci rekord, który nie będzie mógł zostać prawidłowo zdeserializowany do obiektu klasy Person.
  2. Producent wrzuci rekord z pustym atrybutem name. Nie uwzględniliśmy tego w kodzie, więc dostaniemy wyjątek StringIndexOutOfBoundsException.

Dead Letter Queue

Wyprzedźmy trochę fakty. Kiedyś skorzystamy z topica do „spadków” z naszego przetwarzania strumieniowego. Poniżej implementacja producenta do takiego topica. Można ją ugenerycznić, ale dla prostoty skupiłem się na klasie Person. Oprócz klucza i wartości, dołączane są również nagłówki rekordu z powodem zaistniałego problemu.

public class DeadLetterQueue {
    private static final Logger log = LoggerFactory.getLogger(DeadLetterQueue.class);

    private final KafkaProducer<String, String> dlqProducer;
    private final String dlqTopic = "dead_letter_queue";

    private static DeadLetterQueue deadLetterQueueInstance;
    private final Gson gson;

    public static DeadLetterQueue getInstance() {
        if (deadLetterQueueInstance == null) {
            deadLetterQueueInstance = new DeadLetterQueue();
        }
        return deadLetterQueueInstance;
    }

    private DeadLetterQueue() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:29092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.dlqProducer = new KafkaProducer<>(props);
        this.gson = new Gson();
    }

    public void send(String key, Person person, Headers headers, String reason) {
        send(key, gson.toJson(person), headers, reason);
    }

    public void send(String key, String value, Headers headers, String reason) throws KafkaException {
        headers.add(new RecordHeader("failure.reason", reason.getBytes()));
        headers.add(new RecordHeader("failure.time", String.valueOf(System.currentTimeMillis()).getBytes()));

        log.warn("Sending to Dead Letter Queue {}: {}", dlqTopic, reason);

        dlqProducer.send(new ProducerRecord<>(
                dlqTopic,
                null,
                key,
                value,
                headers)
        );
    }

    public void send(byte[] key, byte[] value, Headers headers, String reason) throws KafkaException {
        send(new String(key), new String(value), headers, reason);
    }
}

Filtrowanie wadliwych rekordów

Podczas deserializacji

Konfigurując Kafka Streams mamy możliwość ustawienia domyślnej obsługi wyjątków podczas deserializacji. Możemy użyć gotowych klas LogAndContinueExceptionHandler i LogAndFailExceptionHandler lub napisać własną implementującą interfejs Deserialization ExceptionHandler. Ich nazwy same mówią za siebie.

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
// OR
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class);

Podczas transformacji

Złapanie wyjątku, zwrócenie nulla i przefiltrowanie załatwia sprawę. Niestety z informacji o błędzie mam tylko te, które zalogujemy.

// some code from FilterExceptionsJsonStream.java
...
.mapValues(v -> {
    try {
        v.name = v.name.substring(0, 1).toUpperCase() + v.name.substring(1).toLowerCase();
        return v;
    } catch (Exception exception) {
        System.err.println("Occured an exception has... ");
        exception.printStackTrace();
        return null;
    }
})
.filter((k, v) -> v != null)
...
// some code from FilterExceptionsJsonStream.java

Wykorzystanie Dead Letter Queue

Podczas deserializacji

Tym razem wykorzystamy własną implementacje interfejsu DeserializationExceptionHandler. Wykorzystamy wcześniej utworzonego producenta do kolejki DLQ.

public class MyDeserializationExceptionHandler implements DeserializationExceptionHandler {

    private static final Logger log = LoggerFactory.getLogger(MyDeserializationExceptionHandler.class);

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record, Exception exception) {
        exception.printStackTrace();
        log.warn("Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}", new Object[]{context.taskId(), record.topic(), record.partition(), record.offset(), exception});
        DeadLetterQueue dlq = DeadLetterQueue.getInstance();
        dlq.send(record.key(), record.value(), record.headers(), exception.getMessage());
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

I konfiguracja

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, MyDeserializationExceptionHandler.class);

Podczas transformacji – używając branch

Możemy użyć metody branch. Dzieli ona KStream na podstawie podanych predykatów. Wcześniej jednak, aby w jakiś sposób przekazywać informacje o wyjątku. Utworzyłem klasę EntityBase po której dziedziczy obiekt Person. Wykorzystanie keyword transient powoduje to, że atrybuty nie są brane pod uwagę podczas de/serializacji.

public class EntityBase {
    public transient boolean valid;
    public transient Exception exception;
    public EntityBase(){
        valid = true;
    }
}

public class Person extends EntityBase{
    public String name;
    public int age;
}

W ciele try… catch uzupełniamy valid i exception. Na podstawie valid dzielimy strumień na dwie części: poprawną i niepoprawną.

// some code from BranchExceptionsJsonStream.java
...
int valid = 0;
int invalid = 1;

KStream<String, Person>[] personStream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), personSerde))
        .peek(
                (key, value) -> System.out.println("key=" + key + ", value=" + value)
        )
        .mapValues(v -> {
            try {
                v.name = v.name.substring(0, 1).toUpperCase() + v.name.substring(1).toLowerCase();
                return v;
            } catch (Exception exception) {
                System.err.println("Occured an exception has... ");
                exception.printStackTrace();
                v.valid = false;
                return v;
            }
        })
        .branch(
                (key, value) -> value.valid,
                (key, value) -> true // !value.valid
        );

personStream[valid].filter((k, v) -> v.age >= 18)
        .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), personSerde));

personStream[invalid].to(DLQ_TOPIC, Produced.with(Serdes.String(), personSerde));
...
// some code from BranchExceptionsJsonStream.java

Zmienne valid i invalid zwiększają czytelność. Zwróć uwagę na to, że na DLQ trafią rekordy bez valid i exception. Zmieni się to w następnym przykładzie.

Podczas transformacji – używając branch i procesora

Istnieje możliwość zamknięcia logiki obsługi DLQ w procesorze. W końcu napisaliśmy klasę DeadLetterQueue, a w przykładzie wyżej jej nie użyliśmy. Zacznijmy od implementacji klasy implementującej Processor<String, Person>.

public class MessageFailureHandler implements Processor<String, Person> {
    private ProcessorContext context;

    private static final Logger log = LoggerFactory.getLogger(MessageFailureHandler.class);

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, Person value) {

        DeadLetterQueue.getInstance().send(
                key,
                value,
                context.headers(),
                value.exception.toString()
        );
    }

    @Override
    public void close() {
    }
}

Dla wygody przyda nam się też implementacja ProcessorSupplier

public class MessageFailureHandlerSupplier implements ProcessorSupplier {
    @Override
    public Processor get() {
        return new MessageFailureHandler();
    }
}

Teraz KStream z nieprawidłowymi rekordami możemy prze procesować.

// Some code from BranchExceptionsWithProcessorJsonStream.java
...
MessageFailureHandlerSupplier messageFailureHandlerSupplier = new MessageFailureHandlerSupplier();

...

        .branch(
                (key, value) -> value.valid,
                (key, value) -> true // !value.valid
        );

personStream[valid].filter((k, v) -> v.age >= 18)
        .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), personSerde));

personStream[invalid].process(messageFailureHandlerSupplier);
...
// Some code from BranchExceptionsWithProcessorJsonStream.java

Przykład

Repozytorium

https://github.com/zorteran/wiadro-danych-kafka-streams/tree/kafka_streams_102

branch: kafka_streams_102

Co dalej?

Ręczne testowanie aplikacji jest czasochłonne. Myślę, że kolejnym tematem, jakim się zajmę, będzie pisanie testów dla Kafka Streams.

3 myśli w temacie “Kafka Streams 102 – Wyjątki i Dead Letter Queue”

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *