Wyślij alerty z Elastic SIEM do Discord za darmoszkę – Kafka Connect + Python

Wrzucenie alertów z Elastic Stack na Apache Kafka daje wiele nowych możliwości. Możemy wysłać notyfikacje na Discord. Wszelka automatyzacja i enrichment stoją otworem. W artykule spróbujemy to zrobić na darmowej licencji 🤫.

Środowisko

W tym przypadku wykorzystałem:

  • Apache Kafka – jako message broker
  • Kafka Connect + kafka-connect-elasticsearch-source – do pobierania danych z Elasticsearch’a
  • Skrypcik w pythonie – szukałem Kafka Connect sink’a do discorda, ale niestety nikt się takowym nie pochwalił na Github 🙂 Może kiedyś napiszę swój connector w Scali…

Przygotowując ten artykuł korzystałem z gotowego klastra Elasticsearch, któy miałem pod ręką. Jeśl takiego nie masz, możesz skorzystać z mojego gotowca z którego często korzystam- zorteran/elastic-stack-docker-boilerplate

Jeśli chodzi o Kafkę, wykorzystalem docker-compose cp-all-in-one/docker-compose.yml od Confluenta. Poniżej fragment który musiałem zmienić. W katalogu connect-plugins umieściłem wspomniany kafka-connect-elasticsearch-source. Plik ca.p12 to certyfikat CA mojego klastra Elasticsearch. Musimy musimy mu zaufać, aby powiodła się weryfikacja TLS. Pamiętaj, aby SAN‘y w certyfikacjie był zgodne z rzeczywistością.

...
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/share/connect-plugins"
    volumes:
      - "./connect-plugins/ca.p12:/etc/certs/ca.p12"
      - "./connect-plugins:/usr/share/connect-plugins"
...

A po co mi ta Kafka?

Twórca Debezium znany jest ze słów Friends Don’t Let Frends Do Dual-Writes. Debezium (Change Data Capture) poświęciłem osobny wpis na blogu. Na począktu mógłby wystarczyć zwykły skrypt w Python… Z czasem zdarzeniami zainteresuje się drugi zespół, potem ktoś wpadnie na pomysł wysyłania zdarzeń typu crtitical prosto na maila. Finalnie skrypt znacznie się rozrośnie, a jego utrzymanie będzie uciążliwe. Nie bez powodu świat ciągnie w kierunku mikroserwisów i wzorców typu outbox.

Alert w Kibanie

Alerty wizualnie mniej więcej w takiej formie. Testowałem to na banalnych regułach typu wykryj wykonania sleep, vi. Klikając w tab JSON zobaczymy źródłowy dokument znajdujący się w indeksie .internal.alerts-security.alerts-default-*.

Konfiguracja Kafka Connect

Do konfiguracji connectoró możemy wykorzystać (płatnego) Confluent Control Center, (płatnego) Conduktor, (darmowego) AKHQ… albo darmowego curl’a 😇. Poniżej przykładowa konfiguracja. Na co zwróciłbym uwagę:

  • “value.converter”: “org.apache.kafka.connect.json.JsonConverter” – ponieważ, dane w Elasticsearch’u to JSON’y
  • value.converter.schemas.enable”: “false” – ponieważ interesje nas dokument, a nie jego JSON schema.
  • “incrementing.field.name”: “@timestamp” – w przypadku alertów, pole @timestamp będzie najlepszym kandydatem. Dla CDC zwykłych indeksów lepszym kandydatem może być event.created lub event.ingested.
{
  "name": "Source - ES alerts",
  "config": {
    "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "es.host": "10.10.10.10",
    "es.scheme": "https",
    "es.port": "9200",
    "es.user": "kafka-connect",
    "es.password": "some-secret-password",
    "es.tls.truststore.location": "/etc/certs/ca.p12",
    "index.prefix": ".internal.alerts-security.alerts-default-",
    "topic.prefix": "es-alerts",
    "fieldname_converter": "nop",
    "incrementing.field.name": "@timestamp"
  }
}

Po przygotowaniu pliku wystarczy go puścić curl’em. Jeśli coś nie działa (pamietasz o prawidłowym certyfikacie i SAN?), zerknij do logów Kafka Connect.

curl -X POST -H "Content-Type: application/json" --data @body.json http://my-kafka.lan:8083/connectors | jq
Tak wygląda działajacy task z perspektywy Conduktor’a.
Alerty pojawiają się w topic’ach

Skrypcik w pythonie

Strumień alertów w topic’ach zapewniony. Można zabrać się za skrypy. Miałem już wygenerowany webhook dla Gitlaba, więc wybrałem tę drogę. W skrócie: niekończąca się pętla, która wrzuca tytuł i opis alertu jako Embed w Discord.

from json import loads
from confluent_kafka import Consumer
from loguru import logger
from discord_webhook import DiscordWebhook, DiscordEmbed

KAFKA_CONF = {'bootstrap.servers': "10.10.10.10:9092",
              'group.id': "dev-wiaderko",
              'auto.offset.reset': 'latest'}  # earliest / latest
KAFKA_TOPICS = ["^es-alerts.*"]
DISCORD_WEBHOOK_URL = "https://discord.com/api/webhooks/something/somethingelse"
DISCORD_NOTIFICATION_USERNAME = "Mały Krzykacz"


def send_discord_notification(title, description):
    webhook = DiscordWebhook(url=DISCORD_WEBHOOK_URL, username=DISCORD_NOTIFICATION_USERNAME)
    embed = DiscordEmbed(title=title, description=description, color='03b2f8')
    embed.set_timestamp()
    webhook.add_embed(embed)
    webhook.execute()


def process_message(msg):
    logger.info("Processing message: topic={} partition={} offset={}", msg.error(), msg.topic(), msg.partition(),
                msg.offset())
    message = loads(msg.value().decode('utf-8'))
    send_discord_notification(title=message["kibana.alert.rule.name"],
                              description=message["kibana.alert.reason"].replace("senuto", 'wiaderko'))


logger.info("If you start me up, I'll never stop!")
try:
    consumer = Consumer(KAFKA_CONF)
    consumer.subscribe(KAFKA_TOPICS)
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None: continue
        if msg.error():
            logger.error("Kafka error: code={} topic={} partition={} offset={}", msg.error(), msg.topic(),
                         msg.partition(), msg.offset())
        process_message(msg)
finally:
    consumer.close()

PS. Nie umieszczaj kredek/api key itp. w kodzie w repozytorium. Kod powyżej to prototyp. Wykorzystaj zmienne środowiskowe, pliki konfiguracyjne itp. (os.getenv() method)

Wynik działania

Wnioski

Rozwiązanie wymagało minimalną ilość kodu, a znacznie zwiększyło możliwości wykorzystania alertów. Po głowie chodzi mi skrypt wykonujący enrichment alertów dla pól typu domena, IP, hash. Można wykorzystać do tego np. VirusTotal.

Rozważam napisanie swojego Discord Sink’a dla Kafka Connect. Zaletą tego warianty było by gotowe HA w przypadku więcej niż 1 węzła Kafka Connect. W przypadku skryptu Python o HA muszę postarać się sam.

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *