ksqlDB to rozwiązanie z rodziny Apache Kafka i Confluent. Pozwala na wykorzystanie języka SQL do definiowania zadań przetwarzania strumieniowego. Wpis ten zaczyna serię o ksqlDB. Spróbujemy zrobić coś fajnego na podstawie danych z Packetbeat’a (monitoring ruchu sieciowego) i zobaczymy jak to dalej się rozwinie.
Co to jest ksqlDB?
ksqlDB (wcześniej ksql) to platforma do budowania aplikacji przetwarzających strumienie danych z Kafki (O Apache Kafka dowiesz się z mojego filmu na YT). Pod spodem działa Kafka Streams, czyli biblioteka do budowania aplikacji przetwarzających dane z i do Kafki.
DB sugeruje, że jest to strumieniowa baza danych. Coś w tym jest. Mamy do dyspozycji strumienie, tabele, widoki zmaterializowane, zapytania, a przede wszystkim operujemy SQL’em.
Dlaczego SQL jest tak dużą zaletą? Na moim blogu znajdziesz serię wpisów o Kafka Streams. Utworzenie aplikacji Kafka Streams wymaga posiadania IDE, wiedzy programistycznej, zbudowania aplikacji, czyli jest to złożony i skomplikowany proces. Napisanie kilku linijek SQL w ksqlDB jest znacznie szybsze. Spark SQL, Flink SQL, Beam SQL… okazuje się, że SQL jest spoko 😊.
ksqlDB w Cybersecurity?
ksqlDB zwrócił moją uwagę nie bez powodu. Okazuje się, że można tłumaczyć reguły sigma na zapytania właśnie w ksqlDB. Reguły sigma są to takie generyczne moduł detekcji, które możemy przetłumaczyć na dowolne inne rozwiązanie (Splunk, Elasticserach, QRadar itp.)
Jako osoba łącząca światy Big Data i Cybersecurity, widzę tu spory potencjał 😁. Możliwość detekcji na danych które jeszcze nie są w SIEM’ie (a może nie muszą być?) daje spore możliwości architektoniczne. No ale temat potencjału ekosystemu Kafki w rozwiązanich Security zostawię na inny artykuł.
Problem
We wpisie postaramy się wykorzystać ksqlDB do rozwiązania problemów dotyczących monitorowania komunikacji sieciowej (np. Netflow, flowy z Packetbeat/Zeek/Suricata). Póki co wpadły mi do głowy takie:
- Wolumen danych jest ogromny. Nawet “bezczynne” stacje mogą generować dużą liczbę zapytań sieciowych. – Spróbujemy zagregować dane w oknach 1 minuty po adresach ip i portach. Powinno to znacznie ograniczyć rozmiar danych zachowując jednocześnie potencjał analityczny.
- Liczenie liczności zabioru to nie lada problem(dla zbiorów o dużej liczności), a skanowanie sieci można wykrywać na podstawie unikalnej liczby adresatów i portów w czasie. – Niektóre rozwiązania podają przybliżone wyniki, np. Cardinality aggregation w Elasticsearch oraz Terms Query o czym napisałem artykuł. Przygotowane agregaty przyśpieszą detekcje.
W tym przypadku użyję Packetbeat’a, czyli aplikacji ze stajni Elastic do monitorowania sieci.
Jeśli masz więcej pomysłow, daj znać 😁.
Środowisko
Kilka VM’ek, jedna z Packetbeatem, druga z Kafką i ksqlDB, trzecia do robienia “hałasu”.
Apache Kafka + Zookeeper + ksqlDB
Poniżej docker-compose.yml
ze strony https://ksqldb.io. Jeśli chcesz wystawić Kafkę poza localhost
, zmień wartość z KAFKA_ADVERTISED_LISTENERS
na adres ip lub nazwę hosta.
--- version: '2' services: zookeeper: image: confluentinc/cp-zookeeper:7.0.0 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 broker: image: confluentinc/cp-kafka:7.0.0 hostname: broker container_name: broker depends_on: - zookeeper ports: - "29092:29092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 ksqldb-server: image: confluentinc/ksqldb-server:0.22.0 hostname: ksqldb-server container_name: ksqldb-server depends_on: - broker ports: - "8088:8088" environment: KSQL_LISTENERS: http://0.0.0.0:8088 KSQL_BOOTSTRAP_SERVERS: broker:9092 KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true" KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true" ksqldb-cli: image: confluentinc/ksqldb-cli:0.22.0 container_name: ksqldb-cli depends_on: - broker - ksqldb-server entrypoint: /bin/sh tty: true
docker-compose up
postawi wszystko na nogi.
Packetbeat
Po instalacji w ścieżce /etc/packetbeat.packetbeat.yml
znajdziesz plik konfiguracyjny. W moim przypadku:
- Maszyna wirtualna ma dwa interfejsy sieciowe. Jeden z nich ustawiłem w tryb nasłuchu (
ip link set nazwa_interfejsu promisc on
). W konfiguracji można wskazać konkretny interfejs na którym słuchamypacketbeat.interfaces.device: nazwa_interfejsu
- Dane lądują bezpośrednio w kafce
... output.kafka: hosts: ["tam_gdzie_mamy_kafka:29092"] topic: "packetbeat" ...
no i odpalamy systemctl start packetbeat
.
Jeśli wszystko działa to w Kafce powinien pojawić się topic packetbeat
. Poniżej użycie aplikacji kafkacat
.
maciej@node-01:~/kafka$ kafkacat -b 192.168.63.132:29092 -L Metadata for all topics (from broker 1: 192.168.63.132:29092/1): 1 brokers: broker 1 at 192.168.63.132:29092 (controller) 4 topics: topic "packetbeat" with 1 partitions: partition 0, leader 1, replicas: 1, isrs: 1 topic "_confluent-ksql-default__command_topic" with 1 partitions: partition 0, leader 1, replicas: 1, isrs: 1 topic "default_ksql_processing_log" with 1 partitions: partition 0, leader 1, replicas: 1, isrs: 1 topic "__transaction_state" with 50 partitions: partition 0, leader 1, replicas: 1, isrs: 1 partition 1, leader 1, replicas: 1, isrs: 1 partition 2, leader 1, replicas: 1, isrs: 1 partition 3, leader 1, replicas: 1, isrs: 1 ...
ksqlDB
Aby dostać się do CLI ksqldb wpisz docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
.
maciej@node-01:~/kafka$ sudo docker exec -it ksqldb-cli ksql http://ksqldb-server:8088 OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release. =========================================== = _ _ ____ ____ = = | | _____ __ _| | _ \| __ ) = = | |/ / __|/ _` | | | | | _ \ = = | <\__ \ (_| | | |_| | |_) | = = |_|\_\___/\__, |_|____/|____/ = = |_| = = The Database purpose-built = = for stream processing apps = =========================================== Copyright 2017-2021 Confluent Inc. CLI v0.22.0, Server v0.22.0 located at http://ksqldb-server:8088 Server Status: RUNNING Having trouble? Type 'help' (case-insensitive) for a rundown of how things work! ksql>
Możemy wypróbować SHOW STREAMS
oraz SHOW TABLES
.
ksql> show streams; Stream Name | Kafka Topic | Key Format | Value Format | Windowed ------------------------------------------------------------------------------------------ KSQL_PROCESSING_LOG | default_ksql_processing_log | KAFKA | JSON | false ------------------------------------------------------------------------------------------ ksql> show tables; Table Name | Kafka Topic | Key Format | Value Format | Windowed ----------------------------------------------------------------- -----------------------------------------------------------------
Gdzie strumyk płynie z wolna
Przykładowy rekord
Czas utworzyć strumień na bazie topic’a packetbeat
. Strumień wymaga konkretnego schematu. Spójrzmy na przykładowy dokument z kolejki.
{ "@timestamp" : "2021-12-12T16:39:46.106Z", "@metadata" : { "beat" : "packetbeat", "type" : "_doc", "version" : "7.16.0" }, "source" : { "ip" : "192.168.63.1", "port" : 61668, "packets" : 5, "bytes" : 607, "mac" : "00:50:56:c0:00:0b" }, "event" : { "category" : [ "network_traffic", "network" ], "action" : "network_flow", "type" : [ "connection" ], "start" : "2021-12-12T16:39:42.257Z", "end" : "2021-12-12T16:39:42.258Z", "duration" : 59589, "dataset" : "flow", "kind" : "event" }, "flow" : { "id" : "EQQA////DP//////FP8BAAEAUFYxBLMAUFbAAAvAqD+EwKg/AZgf5PA", "final" : false }, "agent" : { "id" : "bc13a45f-d15e-4f8f-9dd8-b8a415c77cba", "name" : "sonda", "type" : "packetbeat", "version" : "7.16.0", "hostname" : "sonda", "ephemeral_id" : "3b163a1e-4015-4269-8bbd-9b5b6ed15275" }, "destination" : { "mac" : "00:50:56:31:04:b3", "ip" : "192.168.63.132", "port" : 8088, "packets" : 4, "bytes" : 691 }, "type" : "flow", "network" : { "community_id" : "1:lDm3gUH3XVecHFO7Rgl1pM9ys7k=", "bytes" : 1298, "packets" : 9, "type" : "ipv4", "transport" : "tcp" }, "ecs" : { "version" : "1.12.0" }, "host" : { "os" : { "type" : "linux", "platform" : "ubuntu", "version" : "20.04.3 LTS (Focal Fossa)", "family" : "debian", "name" : "Ubuntu", "kernel" : "5.4.0-91-generic", "codename" : "focal" }, "id" : "a6be3dd21ef0448e9f1148cc0ce23900", "containerized" : false, "ip" : [ "192.168.63.131", "fe80::250:56ff:fe3e:a6b4", "fe80::250:56ff:fe31:676a" ], "mac" : [ "00:50:56:3e:a6:b4", "00:50:56:31:67:6a" ], "hostname" : "sonda", "architecture" : "x86_64", "name" : "sonda" } }
Stream ‘packetbeat’
Nie potrzebujemy wszystkich pól. Ograniczymy się do nadawcy, adresata, typu oraz informacji sieciowych.
CREATE STREAM packetbeat ( "@timestamp" VARCHAR, "type" VARCHAR, destination STRUCT < packets INT, bytes INT, mac VARCHAR, ip VARCHAR, port INT >, source STRUCT < packets INT, bytes INT, mac VARCHAR, ip VARCHAR, port INT >, network STRUCT < community_id VARCHAR, bytes INT, packets INT, "type" VARCHAR, ksqltransport VARCHAR > ) WITH ( kafka_topic='packetbeat', value_format='json', partitions=1 );
“Niepoprawne” nazwy kolumn otoczone są "
lub '
. W sekcji WITH
można zdefiniować pole odpowiadające za czas rekordu (TIMESTAMP
oraz TIMESTAMP_FORMAT
). Prawdopodobnie z powodu nazwy nie mogłem zrobić tego na tym etapie. Zediniujemy to w kolejnej iteracji strumienia.
Zajrzeć w strumień można za pomocą SELECT * FROM PACKETBEAT EMIT CHANGES;
ksql> SELECT * FROM PACKETBEAT EMIT CHANGES; +---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+ |@timestamp |type |DESTINATION |SOURCE |NETWORK | +---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+ |2021-12-12T17:08:06.106Z |flow |{PACKETS=981, BYTES=99900, |{PACKETS=1334, BYTES=185319|{COMMUNITY_ID=1:dLXfK8dyy8V| | | |MAC=00:50:56:31:04:b3, IP=1|4, MAC=00:50:56:3e:a6:b4, I|KZRcQtDWxdwExpmE=, BYTES=19| | | |92.168.63.132, PORT=29092} |P=192.168.63.131, PORT=5060|53094, PACKETS=2315, type=i| | | | |2} |pv4, TRANSPORT=tcp} | |2021-12-12T17:08:06.106Z |flow |{PACKETS=202, BYTES=13947, |{PACKETS=205, BYTES=13710, |{COMMUNITY_ID=1:yOSZ3T04I7D| | | |MAC=00:50:56:31:04:b3, IP=1|MAC=00:50:56:3e:a6:b4, IP=1|5ymG8mdUZFO/lsqc=, BYTES=27| | | |92.168.63.132, PORT=29092} |92.168.63.131, PORT=50600} |657, PACKETS=407, type=ipv4| | | | | |, TRANSPORT=tcp} | |2021-12-12T17:08:06.106Z |flow |{PACKETS=11862, BYTES=88076|{PACKETS=5983, BYTES=687565|{COMMUNITY_ID=1:x89DUAxMC1t| | | |7, MAC=00:50:56:31:04:b3, I|, MAC=00:50:56:c0:00:0b, IP|Qo+dR7Sz8qb8xV4g=, BYTES=15| | | |P=192.168.63.132, PORT=2909|=192.168.63.1, PORT=52238} |68332, PACKETS=17845, type=| | | |2} | |ipv4, TRANSPORT=tcp} | |2021-12-12T17:08:06.106Z |flow |{PACKETS=5, BYTES=870, MAC=|{PACKETS=7, BYTES=727, MAC=|{COMMUNITY_ID=1:6ykEYsDO1g6| | | |00:50:56:31:04:b3, IP=192.1|00:50:56:c0:00:0b, IP=192.1|jlUviXxYLxdWxyKE=, BYTES=15| | | |68.63.132, PORT=8088} |68.63.1, PORT=50703} |97, PACKETS=12, type=ipv4, | | | | | |TRANSPORT=tcp} |
Stream ‘packetbeat_renamed’
Przy tworzeniu strumienia packetbeat
był problem z nazwą dwóch pól. Zrómy kolejny strumień w którym użyjemy aliasów.
CREATE STREAM packetbeat_renamed AS SELECT "@timestamp" as ts, destination, source, network, "type" as flow_type FROM packetbeat EMIT CHANGES;
Stream ‘packetbeat_flows’
Załóżmy, że interesują nas tylko rekordy type = 'flow'
. Przy okazji poprawimy też kwestię czasu utworzenia rekordu.
CREATE STREAM packetbeat_flows WITH ( timestamp = 'ts', timestamp_format = 'yyyy-MM-dd''T''HH:mm:ss.SSSX' ) AS SELECT ts, destination, source, network FROM packetbeat_renamed WHERE flow_type = 'flow' EMIT CHANGES;
Zwróć uwagę na format znacznika czasu. Tutaj masz ściągę. Przy szukaniu odpowiedniego formatu timestamp’a przydały mi się logi ksqldb. SELECT * FROM...
nic nie zwracał przy błędnym formacie (nawet błędu).
Table ‘packetbeat_flows_by_1m’
Tym razem utworzymy tabelę. Będzie to agregacja z wykorzystaniem Tumbling Window o stałym odstępie 1 minuty.
CREATE TABLE packetbeat_flows_by_1m WITH (KEY_FORMAT='JSON') AS SELECT source -> ip as srcip, source -> port as srcport, destination -> ip as dstip, destination -> port as dstport, network -> transport, SUM(source -> packets) as source_packets, SUM(source -> bytes) as source_bytes, SUM(destination -> packets) as destination_packets, SUM(destination -> bytes) as destination_bytes, SUM(network -> packets) as network_packets, SUM(network -> bytes) as network_bytes, COUNT(*) as cnt FROM packetbeat_flows WINDOW TUMBLING (SIZE 1 MINUTE) GROUP BY source->ip, source-> port, destination->ip, destination->port, network->transport EMIT CHANGES;
Kod jest dość prosty. Zwróć uwagę na:
- KEY_FORMAT – Format JSON wymagany jest dla kluczy składających się z wielu kolumn.
- -> – W ten sposób odnosimy się do pól w polach typu STRUCT.
- WINDOW TUMBLING (SIZE 1 MINUTE) – Okno o stałej długości 1 minuty które nie nachodzi na siebie.
Pewnie zastanawiasz się co to jest tabela i czym różni się od strumienia. Tabela jest to odpowiednik topic’a w kafce z kompaktowaniem danych ( log compaction). Polega to na tym, że rekordy nadpisane (rekord o tym samym kluczu, ale nowszy) znikają z kolejki.
W powyższym screenie widać, że cleanup.policy
jest zarówno delete
jak i compact
. W przypadku samego compact
tabela/topic może bardzo spuchnąć, aczkolwiek czasami jest ku temu powód. delete
to domyślna polityka dla topicu w kafce (rekordy znikają po tygodniu).
maciej@node-01:~$ kafkacat -t PACKETBEAT_FLOWS_BY_1M -b localhost:29092 -c3 -K : % Auto-selecting Consumer mode (use -P or -C to override) {"SRCIP":"192.168.63.131","SRCPORT":50602,"DSTIP":"192.168.63.132","DSTPORT":29092,"TRANSPORT":"tcp"}}:{"SOURCE_PACKETS":4889,"SOURCE_BYTES":5456387,"DESTINATION_PACKETS":3630,"DESTINATION_BYTES":370056,"NETWORK_PACKETS":8519,"NETWORK_BYTES":5826443,"CNT":1} {"SRCIP":"192.168.63.131","SRCPORT":50600,"DSTIP":"192.168.63.132","DSTPORT":29092,"TRANSPORT":"tcp"}}:{"SOURCE_PACKETS":772,"SOURCE_BYTES":51636,"DESTINATION_PACKETS":755,"DESTINATION_BYTES":52167,"NETWORK_PACKETS":1527,"NETWORK_BYTES":103803,"CNT":1} {"SRCIP":"192.168.63.1","SRCPORT":52238,"DSTIP":"192.168.63.132","DSTPORT":29092,"TRANSPORT":"tcp"}}:{"SOURCE_PACKETS":23272,"SOURCE_BYTES":2713747,"DESTINATION_PACKETS":45827,"DESTINATION_BYTES":3790956,"NETWORK_PACKETS":69099,"NETWORK_BYTES":6504703,"CNT":1}
W pobranych danych za pomocą kafkacat
widać klucz (kolumny po których grupowaliśmy) oraz wartości (w tym przypadku sumy).
Table ‘packetbeat_flow_unique_dest_by_srcip’
Zajmijmy się kolejnym problemem, czyli skanowaniem portów.
CREATE TABLE packetbeat_flow_unique_dest_by_srcip AS SELECT source -> ip as srcip, COUNT_DISTINCT(destination -> ip) as unique_dstip_cnt, COUNT_DISTINCT(destination -> port) as unique_dstport_cnt FROM packetbeat_flows WINDOW TUMBLING (SIZE 1 MINUTE, GRACE PERIOD 1 MINUTE) GROUP BY source->ip EMIT CHANGES;
Kod SQL jest dość prosty. Tym razem nie musimy wskazywać formatu klucza. Dodałem GRACE PERIOD, czyli maksymalny czas na jaki pozwalamy “spóźnić się” rekordom z tego okna czasowego. We wcześniejszych wersjach ksqlDB domyślnyą wartością były 24 godziny. Zostało to zmienione w KIP-633.
Returns the approximate number of unique values of col1 in a group. The function implementation uses HyperLogLog to estimate cardinalities of 10^9 with a typical standard error of 2%.
Powyższy fragment pochodzi z dokumentacji COUNT_DISTINCT. Pamiętasz jak wspominałem, że liczenie liczności to nie lada problem?
Wnioski
ksqlDB zapowiada się obiecująco. Można w znaczny sposób ograniczyć wolumen danych i uprościć dalszą analizę/detekcję.
Co dalej?
Pewnie spróbuję wrzucić wyniki do Elasticsearch’a. Zastanawiam się czy będzie trzeba definiować id dokumentów (w końcu wynik agregacji może się zmienić jak przyjdzie jakaś zguba).
Repozytorium
Utworzyłem repo zorteran/cybersecurity-ksqldb. Zobaczymy, może z czasem wrzucę tam wiecej skryptów związanych z ksqlDB.
Super artykuł. Trzeba było by w końcu zobaczyć, czy ksql będzie działał z RedPanda.
Jak możesz do dorzuć plik z docker-compose do Swojego repo, zawsze jest szybszy start.