Change Data Capture – Zmień Bazę W Strumień (Debezium)

change-data-capture-debezium

Myślałeś/aś kiedyś o utworzeniu strumienia z operacji w bazie danych? W tym wpisie dowiesz się czym jest Change Data Capture i jak go wykorzystać planując architekturę naszego systemu. W części praktycznej sprawdzimy działanie Debezium na bazie MySQL

Co to jest Change Data Capture?

Change Data Capture to proces wykrywania zmian dokonywanych w bazie danych. Takie zmiany potem możemy strumieniowo aplikować w innych bazach, przetwarzać, zapisywać i na nie reagować. Innymi słowy: otrzymujemy strumień zdarzeń z naszej bazy danych.

Umożliwia nam to szybsze i precyzyjne decyzje oparte na danych (Stream Processing i Streaming ETL). Nie obciąża systemów i sieci tak jak klasyczne rozwiązania. Często jest to jedyne sensowne rozwiązanie umożliwiające modernizacje systemów legacy.

Rodzaje Change Data Capture

Query-Based

Czy jeśli będę pytał co 5 sekund bazę danych o konkretną tabelę, to jest to Change Data Capture? Na swój sposób tak, ale ma swoje minusy.

  1. Wymaga odpowiedniego przygotowania tabeli: kolumna z czasem modyfikacji rekordu oraz mechanizm aktualizacji tego pola
  2. Nie wykrywamy wszystkich operacji. W przypadku usuwania rekordów, musimy zastosować trick soft delete, czyli zaznaczyć rekord flagą do usunięcia, a fizycznie usunąć go później innym mechanizmem
  3. Nie wykrywamy zmiany w strukturze tabeli
  4. Możemy nadmiernie obciążyć pytaniami bazę danych lub sieć, jeśli danych jest sporo
  5. Nie wykrywamy poprzedniego stanu w którym był rekord.

Czy tyle minusów skreśla wykorzystanie tego sposobu? To wszystko zależy od kontekstu. Logstash’em można w ten sposób synchronizować bazę z Elasticsearch’em. Jeśli nam to wystarcza, nie ma sensu utrzymywać kolejnych technologii.

Log-Based

Generalizując, bazy danych zapisują wszystkie transakcje i modyfikacje bazy w transacation log‘u. Jest to kluczowy element. W przypadku awarii pozwala na powrót do spójnego stanu.

Czytając taki log wykrywamy wszystkie operacje zapisu, zmiany i usunięcia rekordów, a także zmiany schematu tabel. Takie dane możemy wysyłać od razu lub paczkami. Obciążenie bazy jest znikome, a na pewno mniejsze niż w przypadku Query-Based CDC.

Każda baza na swój sposób podchodzi do Change Data Capture. Tutaj znajdziesz informacje jak to robi SQL Server. W dalszej części artykułu użyjemy Debezium, otwartej i rozproszonej platformy CDC.

Jak mogę to wykorzystać?

Zapis do wielu źródeł

Zaczynamy z prostą aplikacją i pojedynczą bazą danych. Z czasem dochodzi cache (Redis, Memcached), silnik wyszukiwarki (Elasticsearch, Solr), kolejki (Kafka, RabbitMQ), mikroserwisy i wiele baz (Polyglot Persistence).

Ostatecznie, generowane dane mają wielu adresatów. Wysyłanie rekordu do wszystkich baz przez jeden serwis komplikuje logikę i utrudnia zachowanie spójności (Two Phase Commit też ma swoje bolączki). Na implementację architektury zdarzeniowej (Event Sourcing) w aplikacji może być już za późno. W takiej sytuacji Change Data Capture “całe na biało”.

Integracja baz danych

Widziałes/aś kiedyś “integrację” danych między bazami na zasadzie SELECT * FROM table ? Ja tak. Nie wiedziałem czy śmiać się czy płakać 😉. Change Data Capture sprawdzi się też w tym przypadku. Na pewno o wiele mniej inwazyjnie.

ETL i Stream Processing

Utworzenie strumienia z bazy danych otwiera nam drogę do przetwarzania strumieniowego w procesach ETL. Architektura typu lambda lub kappa przyśpieszają procesy decyzyjne. Dodatkowo, nie wszystkie operacje można zrównoleglić. Z Prawa Amdahla wiemy że nawet w większości zrównoleglony kod nie skalują się liniowo. Podejście strumieniowe rozciąga przetwarzanie w czasie co ma pozytywny wpływ na utylizację zasobów.

Debezium

Debezium to otwarta i rozproszona platforma CDC. W praktyce jest to zbiór connectorów do Kafka Connect, a co z tym się wiąże, rozwiązanie oparte jest o Apache Kafka. Mamy do dyspozycji konektory do MySQL, PostgreSQL, MongoDb, Cassandra, SQL Server, Oracle, Db2 i Vitesse.

Środowisko

Im nowsze systemy, tym uruchomienie Hello World’a staje się trudniejsze i wymaga coraz więcej klocków 😅. Posłużymy się Docker Compose, a w nim:

  • Zookeeper
  • Apache Kafka
  • AKHQ – Webowe GUI do Kafki
  • Kafka Connect z Debezium
  • MySQL w wersji 8
version: '2'

services:
  zookeeper:
    image: 'docker.io/bitnami/zookeeper:3-debian-10'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'docker.io/bitnami/kafka:2-debian-10'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
    depends_on:
      - zookeeper
  connect:
    image: debezium/connect:1.4
    ports:
      - 8083:8083
    environment:
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - GROUP_ID=1
      - BOOTSTRAP_SERVERS=kafka:9092
  akhq:
    image: tchiotludo/akhq
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            docker-kafka-server:
              properties:
                bootstrap.servers: "kafka:9092"
              connect:
                - name: "connect"
                  url: "http://connect:8083"
    ports:
      - 8080:8080
    links:
      - kafka
      - zookeeper

  mysql:
    image: mysql:8
    ports:
      - 3306:3306
    environment:
      MYSQL_DATABASE: 'school'
      MYSQL_USER: 'user'
      MYSQL_PASSWORD: 'password'
      MYSQL_ROOT_PASSWORD: 'password'
AKHQ ułatwia zarządzanie klastrem Apache Kafka

Konfiguracja Debezium MySQL w Kafka Connect

Kafka Connect poświęciłem osobny artykuł. Skoro już mamy AKHQ, spróbujmy coś wyklikać.

Widać dostępne wszystkie connectory od Debezium

Wykorzystana konfiguracja znajduje się poniżej. AKHQ podpowiada nam wszystkie możliwe opcje razem z opisami.

tasks.max = 1
database.history.kafka.topic = schema-changes.school
database.hostname = mysql
database.user = user
database.password = password
database.server.name = school
database.include.list = school
database.history.kafka.bootstrap.servers = kafka:9092

Jeśli konfiguracja jest poprawna to powinniśmy zobaczyć zielony status conenctora, nowe topic’ki w Kafce i “pozytywne” logi

onnect_1    | 2021-01-10 17:42:06,243 INFO   MySQL|school|task  Creating thread debezium-mysqlconnector-school-binlog-client   [io.debezium.util.Threads]
connect_1    | 2021-01-10 17:42:06,246 INFO   MySQL|school|task  Creating thread debezium-mysqlconnector-school-binlog-client   [io.debezium.util.Threads]
connect_1    | Jan 10, 2021 5:42:06 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
connect_1    | INFO: Connected to mysql:3306 at binlog.000001/156 (sid:5793, cid:14)
connect_1    | 2021-01-10 17:42:06,262 INFO   MySQL|school|binlog  Connected to MySQL binlog at mysql:3306, starting at binlog file 'binlog.000001', pos=156, skipping 0 events plus 0 rows   [io.debezium.connector.mysql.BinlogReader]
connect_1    | 2021-01-10 17:42:06,262 INFO   MySQL|school|task  Waiting for keepalive thread to start   [io.debezium.connector.mysql.BinlogReader]
connect_1    | 2021-01-10 17:42:06,263 INFO   MySQL|school|binlog  Creating thread debezium-mysqlconnector-school-binlog-client   [io.debezium.util.Threads]
connect_1    | 2021-01-10 17:42:06,363 INFO   MySQL|school|task  Keepalive thread is running   [io.debezium.connector.mysql.BinlogReader]

Dodanie danych do MySQL

Dobrać się do konsoli MySQL w Docker Compose możemy poniższą komendą.

 sudo docker-compose exec mysql mysql -uroot -p

Sensowność danych w tym przypadku nie ma znaczenia. Wziąłem gotowe kawałki SQL z integracji Logstash’em bazy MySQL z Elasticsearch z mojego kursu.

use school;

CREATE TABLE IF NOT EXISTS groups(
    group_id int(11) NOT NULL AUTO_INCREMENT,
    group_number char(4) NOT NULL,
    description VARCHAR(100),
    something_important int(11),
    modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (group_id),
    KEY group_number (group_number)
    ) ENGINE=InnoDB;

INSERT INTO groups
(group_number, description, something_important)
VALUES
('1A','some group',100),
('2A','some group',102),
('3A','some group',101),
('1B','some group',123),
('2B','some group',133),
('3B','some group',144);

UPDATE school.groups
SET
   description = 'updated'
WHERE
    group_id % 2 = 0;

GIF z działania connectora

Podsumowanie

Jak właśnie się przekonałeś/aś, CDC w Debezium nie jest takie trudne, a otwiera sporo możliwości. Czasami będą potrzebne drobne aplikacje po stronie serwera (np. PostgreSQL lub Cassandra). Jeśli zainteresował Cię ten temat, warto posłuchać podcastu od Confluent. Wypowiada w nim współautor Debezium.

3 komentarze do “Change Data Capture – Zmień Bazę W Strumień (Debezium)”

  1. Cześć,
    wydaje mi się, że jest jakiś błąd w komunikacji. Utknąłem na utworzeniu definicji.
    Dostaje następujący komunikat:
    Connector configuration is invalid and contains the following 1 error(s): Unable to connect: Public Key Retrieval is not allowed You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`

    Na screenie widzę, że podany w konfiguracji jest user “root” a w listingu user “user”. Który z nich jest prawidłowy?

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *