Kafka Streams 103 – Pisz testy, zapomnij o Kafce

Nie wyobrażam sobie programowania bez pisania testów. Gdy śpieszy mi się i o nich “zapominam”, potem i tak poprawiam kod przez jakąś głupotę. Poprawiają jakość i przyśpieszają czas tworzenia oprogramowania. Nie wierzysz?

Rozkład jazdy

Co zyskamy?

  • Nie będziemy musieli używać Kafki podczas pisania programu Kafka Streams
  • Unikniemy uciążliwego wpisywania non stop tych samych wiadomości do Kafka Console Producera
  • Będziemy mieli gwarancję, że nasz kod działa
  • Szybciej wykryjemy bugi
  • Możemy przygotować swój zestaw danych, a w razie potrzeby zrobić fuzzing
  • Szacunek na dzielni ( i na linkedin )

Zależności

Do projektu z poprzednich postów dodałem kafka-streams-test-utils i junit

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>${kafka.version}</version>
    <scope>test</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13</version>
    <scope>test</scope>
</dependency>

Testowalny kod Kafka Streams

Wróćmy do pierwszego przykładu z Kafka 101. Cały kod znajduje się w metodzie main. Musimy wszystko od inicjalizacji StreamsBuilder, po wykonanie na nim build(), zamknąć w metodzie. Potrzebny jest obiekt klasy Topology, którym potem zasilimy TopologyTestDriver

/// SOME CODE
public static void main(String[] args) throws Exception {
        Properties props = createProperties();

        SerDeJsonStream serDeJsonStream = new SerDeJsonStream();
        final KafkaStreams streams = new KafkaStreams(serDeJsonStream.createTopology(), props);
        streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
            System.err.println("oh no! error! " + throwable.getMessage());
        });
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
/// SOME CODE
    public Topology createTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        final Serde<Person> personSerde = Serdes.serdeFrom(new PersonSerializer(), new PersonDeserializer());

        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), personSerde))
                .peek(
                        (key, value) -> System.out.println("key=" + key + ", value=" + value)
                )
                .mapValues((v -> {
                    v.name = v.name.substring(0, 1).toUpperCase() + v.name.substring(1).toLowerCase();
                    return v;
                }))
                .filter((k, v) -> v.age >= 18)
                .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), personSerde));

        return builder.build();
    }
/// SOME CODE

Test 1 – LowercaseStream

Na wyjściu strumienia, każdy string powinien mieć tylko małe litery. Sprawdźmy to!

TopologyTestDriver

Testy w Kafka Streams opierają się na użyciu TopologyTestDriver, którego zadaniem jest udawanie Kafki.

public class LowercaseStreamTest {

    TopologyTestDriver testDriver;

    StringSerializer stringSerializer = new StringSerializer();
    StringDeserializer stringDeserializer = new StringDeserializer();
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, String> outputTopic;

    @Before
    public void prepareTopologyTestDriver() {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "LowercaseStreamTest");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "doesnt-matter:1337");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        LowercaseStream lowercaseStream = new LowercaseStream();
        Topology topology = lowercaseStream.createTopology();
        testDriver = new TopologyTestDriver(topology, config);
        inputTopic = testDriver.createInputTopic(LowercaseStream.INPUT_TOPIC, stringSerializer, stringSerializer);
        outputTopic = testDriver.createOutputTopic(LowercaseStream.OUTPUT_TOPIC, stringDeserializer, stringDeserializer);
    }

    @After
    public void closeTestDriver() {
        testDriver.close();
    }
...
  • @Before wykonywane jest przed każdym testem
  • @After wykonywane jest po każdym teście. Musimy zamknąć TopologyTestDriver
  • Linie 12-17 -> jest to konfiguracja naszej aplikacji. Najczęściej będzie prawie taka sama jak pierwotna. BOOTSTRAP_SERVERS_CONFIG może być z czapy.
  • Linia 21 -> tworzenie TopologyTestDriver na podstawie topologii z poprzedniej sekcji.
  • Od wersji 2.4 interakcję z kolejkami wejściowymi/wyjściowymi daje nam TestInputTopic i TestOutputTopic. W liniach 22-23 tworzone są na podstawie TopologyTestDriver oraz De/Serializatorów.

Test właściwy

Używając ulubionej konwencji nazewniczej, piszemy test.

...
    @Test
    public void outputShouldBeLowercase()
    {
        String inputText = "Wiadro Danych Rul3Z!";
        inputTopic.pipeInput(inputText);
        String outputText = outputTopic.readValue();
        Assert.assertEquals(outputText, inputText.toLowerCase());
    }
i cieszymy się jego wynikiem 🙂

Test 2 – SerDeJsonStream

A co w przypadku gdy dochodzi de/serializacja? Prawie to samo, tylko z de/serializatorami ?

public class SerDeJsonStreamTest {

    TopologyTestDriver testDriver;

    StringSerializer stringSerializer = new StringSerializer();
    StringDeserializer stringDeserializer = new StringDeserializer();
    PersonSerializer personSerializer = new PersonSerializer();
    PersonDeserializer personDeserializer = new PersonDeserializer();
    private TestInputTopic<String, Person> inputTopic;
    private TestOutputTopic<String, Person> outputTopic;

    @Before
    public void prepareTopologyTestDriver() {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "SerDeJsonStreamTest");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "doesnt-matter:1337");

        SerDeJsonStream serDeJsonStream = new SerDeJsonStream();
        Topology topology = serDeJsonStream.createTopology();
        testDriver = new TopologyTestDriver(topology, config);
        inputTopic = testDriver.createInputTopic(SerDeJsonStream.INPUT_TOPIC, stringSerializer, personSerializer);
        outputTopic = testDriver.createOutputTopic(SerDeJsonStream.OUTPUT_TOPIC, stringDeserializer, personDeserializer);
    }
...

Pisząc testy, posługujemy się obiektami, więc musimy pamiętać o testowaniu de/serializatorów. Przejdźmy jednak do właściwych testów.

...
    @Test
    public void personUnder18ShouldNotBeProcessed(){
        Person inputPerson1 = new Person("grzesiek", 17);
        Person inputPerson2 = new Person("ADAM", 20);
        inputTopic.pipeInput(inputPerson1);
        inputTopic.pipeInput(inputPerson2);
        List<Person> output = outputTopic.readValuesToList();
        Person outputPerson = output.stream().findFirst().get();
        Assert.assertTrue(output.size() == 1);
        Assert.assertEquals("Adam", outputPerson.name);
    }

    @Test
    public void personWithEmptyNameShouldNotBeProcessed(){
        Person inputPerson = new Person("", 17);
        inputTopic.pipeInput(inputPerson);
        Assert.assertTrue(outputTopic.isEmpty());
    }
}

W przypadku pustego name powstał wyjątek StringIndexOutOfBoundsException. O tym jak zabezpieczyć się przed wyjątakmi, pisałem w poprzednim wpisie

Repozytorium

https://github.com/zorteran/wiadro-danych-kafka-streams/tree/kafka_streams_103

branch: kafka_streams_103

Wnioski

Dobrze, że nie odkładałem testów na później. Tak naprawdę szkoda, że od nich nie zacząłem ?. Jest to o wiele szybsze i wydajniejsze niż Kafka Console Producer i Consumer na dwóch terminalach.

3 komentarze do “Kafka Streams 103 – Pisz testy, zapomnij o Kafce”

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *