Nie wyobrażam sobie programowania bez pisania testów. Gdy śpieszy mi się i o nich “zapominam”, potem i tak poprawiam kod przez jakąś głupotę. Poprawiają jakość i przyśpieszają czas tworzenia oprogramowania. Nie wierzysz?
Rozkład jazdy
- Kafka Streams 101 – de/serilizacja
- Kafka Streams 102 – Wyjątki i Dead Letter Queue
- Kafka Streams 103 – Pisz testy, zapomnij o Kafce ✔
- Kafka Streams 201 – Obliczanie prędkości, Processor API, KeyValueStore
Co zyskamy?
- Nie będziemy musieli używać Kafki podczas pisania programu Kafka Streams
- Unikniemy uciążliwego wpisywania non stop tych samych wiadomości do Kafka Console Producera
- Będziemy mieli gwarancję, że nasz kod działa
- Szybciej wykryjemy bugi
- Możemy przygotować swój zestaw danych, a w razie potrzeby zrobić fuzzing
- Szacunek na dzielni ( i na linkedin )

Zależności
Do projektu z poprzednich postów dodałem kafka-streams-test-utils i junit
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
Testowalny kod Kafka Streams
Wróćmy do pierwszego przykładu z Kafka 101. Cały kod znajduje się w metodzie main. Musimy wszystko od inicjalizacji StreamsBuilder, po wykonanie na nim build(), zamknąć w metodzie. Potrzebny jest obiekt klasy Topology, którym potem zasilimy TopologyTestDriver
/// SOME CODE
public static void main(String[] args) throws Exception {
Properties props = createProperties();
SerDeJsonStream serDeJsonStream = new SerDeJsonStream();
final KafkaStreams streams = new KafkaStreams(serDeJsonStream.createTopology(), props);
streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
System.err.println("oh no! error! " + throwable.getMessage());
});
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
/// SOME CODE
public Topology createTopology() {
final StreamsBuilder builder = new StreamsBuilder();
final Serde<Person> personSerde = Serdes.serdeFrom(new PersonSerializer(), new PersonDeserializer());
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), personSerde))
.peek(
(key, value) -> System.out.println("key=" + key + ", value=" + value)
)
.mapValues((v -> {
v.name = v.name.substring(0, 1).toUpperCase() + v.name.substring(1).toLowerCase();
return v;
}))
.filter((k, v) -> v.age >= 18)
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), personSerde));
return builder.build();
}
/// SOME CODE
Test 1 – LowercaseStream
Na wyjściu strumienia, każdy string powinien mieć tylko małe litery. Sprawdźmy to!
TopologyTestDriver
Testy w Kafka Streams opierają się na użyciu TopologyTestDriver, którego zadaniem jest udawanie Kafki.
public class LowercaseStreamTest {
TopologyTestDriver testDriver;
StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
private TestInputTopic<String, String> inputTopic;
private TestOutputTopic<String, String> outputTopic;
@Before
public void prepareTopologyTestDriver() {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "LowercaseStreamTest");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "doesnt-matter:1337");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
LowercaseStream lowercaseStream = new LowercaseStream();
Topology topology = lowercaseStream.createTopology();
testDriver = new TopologyTestDriver(topology, config);
inputTopic = testDriver.createInputTopic(LowercaseStream.INPUT_TOPIC, stringSerializer, stringSerializer);
outputTopic = testDriver.createOutputTopic(LowercaseStream.OUTPUT_TOPIC, stringDeserializer, stringDeserializer);
}
@After
public void closeTestDriver() {
testDriver.close();
}
...
- @Before wykonywane jest przed każdym testem
- @After wykonywane jest po każdym teście. Musimy zamknąć TopologyTestDriver
- Linie 12-17 -> jest to konfiguracja naszej aplikacji. Najczęściej będzie prawie taka sama jak pierwotna. BOOTSTRAP_SERVERS_CONFIG może być z czapy.
- Linia 21 -> tworzenie TopologyTestDriver na podstawie topologii z poprzedniej sekcji.
- Od wersji 2.4 interakcję z kolejkami wejściowymi/wyjściowymi daje nam TestInputTopic i TestOutputTopic. W liniach 22-23 tworzone są na podstawie TopologyTestDriver oraz De/Serializatorów.
Test właściwy
Używając ulubionej konwencji nazewniczej, piszemy test.
...
@Test
public void outputShouldBeLowercase()
{
String inputText = "Wiadro Danych Rul3Z!";
inputTopic.pipeInput(inputText);
String outputText = outputTopic.readValue();
Assert.assertEquals(outputText, inputText.toLowerCase());
}

Test 2 – SerDeJsonStream
A co w przypadku gdy dochodzi de/serializacja? Prawie to samo, tylko z de/serializatorami ?
public class SerDeJsonStreamTest {
TopologyTestDriver testDriver;
StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
PersonSerializer personSerializer = new PersonSerializer();
PersonDeserializer personDeserializer = new PersonDeserializer();
private TestInputTopic<String, Person> inputTopic;
private TestOutputTopic<String, Person> outputTopic;
@Before
public void prepareTopologyTestDriver() {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "SerDeJsonStreamTest");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "doesnt-matter:1337");
SerDeJsonStream serDeJsonStream = new SerDeJsonStream();
Topology topology = serDeJsonStream.createTopology();
testDriver = new TopologyTestDriver(topology, config);
inputTopic = testDriver.createInputTopic(SerDeJsonStream.INPUT_TOPIC, stringSerializer, personSerializer);
outputTopic = testDriver.createOutputTopic(SerDeJsonStream.OUTPUT_TOPIC, stringDeserializer, personDeserializer);
}
...
Pisząc testy, posługujemy się obiektami, więc musimy pamiętać o testowaniu de/serializatorów. Przejdźmy jednak do właściwych testów.
...
@Test
public void personUnder18ShouldNotBeProcessed(){
Person inputPerson1 = new Person("grzesiek", 17);
Person inputPerson2 = new Person("ADAM", 20);
inputTopic.pipeInput(inputPerson1);
inputTopic.pipeInput(inputPerson2);
List<Person> output = outputTopic.readValuesToList();
Person outputPerson = output.stream().findFirst().get();
Assert.assertTrue(output.size() == 1);
Assert.assertEquals("Adam", outputPerson.name);
}
@Test
public void personWithEmptyNameShouldNotBeProcessed(){
Person inputPerson = new Person("", 17);
inputTopic.pipeInput(inputPerson);
Assert.assertTrue(outputTopic.isEmpty());
}
}

W przypadku pustego name powstał wyjątek StringIndexOutOfBoundsException. O tym jak zabezpieczyć się przed wyjątakmi, pisałem w poprzednim wpisie
Repozytorium
https://github.com/zorteran/wiadro-danych-kafka-streams/tree/kafka_streams_103
branch: kafka_streams_103
Wnioski
Dobrze, że nie odkładałem testów na później. Tak naprawdę szkoda, że od nich nie zacząłem ?. Jest to o wiele szybsze i wydajniejsze niż Kafka Console Producer i Consumer na dwóch terminalach.
3 komentarze do “Kafka Streams 103 – Pisz testy, zapomnij o Kafce”