Błędy zdarzają się każdemu. Prędzej czy później nasza aplikacja Kafka Streams dostanie wiadomość, która ją zabije (Poison Pill). Niestety uruchomienie jej ponownie nie pomoże, dopóki wiadomość nie zniknie z kolejki. W tym wpisie spróbujemy obsłużyć takie wiadomości i zapisać je do Dead Letter Queue.
Rozkład jazdy
- Kafka Streams 101 – de/serilizacja
- Kafka Streams 102 – Wyjątki i Dead Letter Queue ✔
- Kafka Streams 102 – Pisz testy, zapomnij o Kafce
- Kafka Streams 201 – Obliczanie prędkości, Processor API, KeyValueStore
Poison Pill
Trująca pigułka jest to wiadomość, która zawsze wyrzuci nam wyjątkiem. Będzie się tak działo niezależnie od tego ile razy spróbujemy ją skonsumować. Dopóki nie opuści kolejki (np. z powodu polityki retencji), nasz konsument nic z nią nie zrobi. Powodów dla takiej wiadomości może być wiele, najczęściej są to problemy z deserializacją.
Rozwiązania
- Utworzyć logi błędu i zamknąć aplikacje.
- Utworzyć logi błędu i pominąć uszkodzony rekord.
- Wysłać rekord na oddzielny topic zwany Dead Letter Queue. Takie rekordy możemy potem przeanalizować i poprawić naszą aplikację.
Problemy
Patrząc na strumień z poprzedniego wpisu Kafka Streams 101, widzimy dwa potencjalne problemy:
- Producent wrzuci rekord, który nie będzie mógł zostać prawidłowo zdeserializowany do obiektu klasy Person.
- Producent wrzuci rekord z pustym atrybutem name. Nie uwzględniliśmy tego w kodzie, więc dostaniemy wyjątek StringIndexOutOfBoundsException.
Dead Letter Queue
Wyprzedźmy trochę fakty. Kiedyś skorzystamy z topica do “spadków” z naszego przetwarzania strumieniowego. Poniżej implementacja producenta do takiego topica. Można ją ugenerycznić, ale dla prostoty skupiłem się na klasie Person. Oprócz klucza i wartości, dołączane są również nagłówki rekordu z powodem zaistniałego problemu.
public class DeadLetterQueue {
private static final Logger log = LoggerFactory.getLogger(DeadLetterQueue.class);
private final KafkaProducer<String, String> dlqProducer;
private final String dlqTopic = "dead_letter_queue";
private static DeadLetterQueue deadLetterQueueInstance;
private final Gson gson;
public static DeadLetterQueue getInstance() {
if (deadLetterQueueInstance == null) {
deadLetterQueueInstance = new DeadLetterQueue();
}
return deadLetterQueueInstance;
}
private DeadLetterQueue() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:29092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
this.dlqProducer = new KafkaProducer<>(props);
this.gson = new Gson();
}
public void send(String key, Person person, Headers headers, String reason) {
send(key, gson.toJson(person), headers, reason);
}
public void send(String key, String value, Headers headers, String reason) throws KafkaException {
headers.add(new RecordHeader("failure.reason", reason.getBytes()));
headers.add(new RecordHeader("failure.time", String.valueOf(System.currentTimeMillis()).getBytes()));
log.warn("Sending to Dead Letter Queue {}: {}", dlqTopic, reason);
dlqProducer.send(new ProducerRecord<>(
dlqTopic,
null,
key,
value,
headers)
);
}
public void send(byte[] key, byte[] value, Headers headers, String reason) throws KafkaException {
send(new String(key), new String(value), headers, reason);
}
}
Filtrowanie wadliwych rekordów
Podczas deserializacji
Konfigurując Kafka Streams mamy możliwość ustawienia domyślnej obsługi wyjątków podczas deserializacji. Możemy użyć gotowych klas LogAndContinueExceptionHandler i LogAndFailExceptionHandler lub napisać własną implementującą interfejs Deserialization ExceptionHandler. Ich nazwy same mówią za siebie.
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
// OR
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class);
Podczas transformacji
Złapanie wyjątku, zwrócenie nulla i przefiltrowanie załatwia sprawę. Niestety z informacji o błędzie mam tylko te, które zalogujemy.
// some code from FilterExceptionsJsonStream.java
...
.mapValues(v -> {
try {
v.name = v.name.substring(0, 1).toUpperCase() + v.name.substring(1).toLowerCase();
return v;
} catch (Exception exception) {
System.err.println("Occured an exception has... ");
exception.printStackTrace();
return null;
}
})
.filter((k, v) -> v != null)
...
// some code from FilterExceptionsJsonStream.java
Wykorzystanie Dead Letter Queue
Podczas deserializacji
Tym razem wykorzystamy własną implementacje interfejsu DeserializationExceptionHandler. Wykorzystamy wcześniej utworzonego producenta do kolejki DLQ.
public class MyDeserializationExceptionHandler implements DeserializationExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(MyDeserializationExceptionHandler.class);
@Override
public DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record, Exception exception) {
exception.printStackTrace();
log.warn("Exception caught during Deserialization, taskId: {}, topic: {}, partition: {}, offset: {}", new Object[]{context.taskId(), record.topic(), record.partition(), record.offset(), exception});
DeadLetterQueue dlq = DeadLetterQueue.getInstance();
dlq.send(record.key(), record.value(), record.headers(), exception.getMessage());
return DeserializationHandlerResponse.CONTINUE;
}
@Override
public void configure(Map<String, ?> map) {
}
}
I konfiguracja
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, MyDeserializationExceptionHandler.class);
Podczas transformacji – używając branch
Możemy użyć metody branch. Dzieli ona KStream na podstawie podanych predykatów. Wcześniej jednak, aby w jakiś sposób przekazywać informacje o wyjątku. Utworzyłem klasę EntityBase po której dziedziczy obiekt Person. Wykorzystanie keyword transient powoduje to, że atrybuty nie są brane pod uwagę podczas de/serializacji.
public class EntityBase {
public transient boolean valid;
public transient Exception exception;
public EntityBase(){
valid = true;
}
}
public class Person extends EntityBase{
public String name;
public int age;
}
W ciele try… catch uzupełniamy valid i exception. Na podstawie valid dzielimy strumień na dwie części: poprawną i niepoprawną.
// some code from BranchExceptionsJsonStream.java
...
int valid = 0;
int invalid = 1;
KStream<String, Person>[] personStream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), personSerde))
.peek(
(key, value) -> System.out.println("key=" + key + ", value=" + value)
)
.mapValues(v -> {
try {
v.name = v.name.substring(0, 1).toUpperCase() + v.name.substring(1).toLowerCase();
return v;
} catch (Exception exception) {
System.err.println("Occured an exception has... ");
exception.printStackTrace();
v.valid = false;
return v;
}
})
.branch(
(key, value) -> value.valid,
(key, value) -> true // !value.valid
);
personStream[valid].filter((k, v) -> v.age >= 18)
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), personSerde));
personStream[invalid].to(DLQ_TOPIC, Produced.with(Serdes.String(), personSerde));
...
// some code from BranchExceptionsJsonStream.java
Zmienne valid i invalid zwiększają czytelność. Zwróć uwagę na to, że na DLQ trafią rekordy bez valid i exception. Zmieni się to w następnym przykładzie.
Podczas transformacji – używając branch i procesora
Istnieje możliwość zamknięcia logiki obsługi DLQ w procesorze. W końcu napisaliśmy klasę DeadLetterQueue, a w przykładzie wyżej jej nie użyliśmy. Zacznijmy od implementacji klasy implementującej Processor<String, Person>.
public class MessageFailureHandler implements Processor<String, Person> {
private ProcessorContext context;
private static final Logger log = LoggerFactory.getLogger(MessageFailureHandler.class);
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(String key, Person value) {
DeadLetterQueue.getInstance().send(
key,
value,
context.headers(),
value.exception.toString()
);
}
@Override
public void close() {
}
}
Dla wygody przyda nam się też implementacja ProcessorSupplier
public class MessageFailureHandlerSupplier implements ProcessorSupplier {
@Override
public Processor get() {
return new MessageFailureHandler();
}
}
Teraz KStream z nieprawidłowymi rekordami możemy prze procesować.
// Some code from BranchExceptionsWithProcessorJsonStream.java
...
MessageFailureHandlerSupplier messageFailureHandlerSupplier = new MessageFailureHandlerSupplier();
...
.branch(
(key, value) -> value.valid,
(key, value) -> true // !value.valid
);
personStream[valid].filter((k, v) -> v.age >= 18)
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), personSerde));
personStream[invalid].process(messageFailureHandlerSupplier);
...
// Some code from BranchExceptionsWithProcessorJsonStream.java
Przykład
Repozytorium
https://github.com/zorteran/wiadro-danych-kafka-streams/tree/kafka_streams_102
branch: kafka_streams_102
Co dalej?
Ręczne testowanie aplikacji jest czasochłonne. Myślę, że kolejnym tematem, jakim się zajmę, będzie pisanie testów dla Kafka Streams.
Właśnie czytam wszystkie Twoje wpisy po kolei i naprawdę fajny blog! Po przeczytaniu tego jedna rzecz rzuca mi się w oczy. Że kod jest “niskiej jakości”. Co mam dokładnie na myśli? Że używasz singletonów, że korzystasz ze statycznych metod, że nie można go testować w łatwy sposób. Z czego to wynika? Czy np. DeadLetterQueue musi być singletonem ze względu na wielowątkowość? Jeśli tak to i tak jest źle zaimplementowany (brak double checking). Czy może jest jakiś inny powód?
Hej. Dzięki za opinie. Miło mi, że podobają Ci się treści na blogu 😁.
Co do kodu: Czasami po prostu eksperymentuje, robie PoC, chcę pokazać jakąś koncepcje w miarę jasny sposób i w skończonym czasie. To prawda, że pewnie w nie jednym miejscu przydałby się refactor.
Niestety nie pamiętam czemu akurat wykorzystałem tutaj singleton i faktycznie przydałby się double checking.
Jasne, rozumiem. Dzięki za odpowiedź 🙂