Made a Tool to Stream Changes from Microsoft SQL Server to Apache Kafka

Original link: https://github.com/Niyko/Athena

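Athena is a lightweight, Go-based Change Data Capture (CDC) solution that streams changes (creates, updates, deletes) from MSSQL databases to Apache Kafka. Unlike complex alternatives such as Debezium, Athena offers a simplified plug-and-play experience: it automates the database CDC configuration and gives downstream consumers an intuitive event format.

**Key features:**

* **Easy to use:** CDC setup is automated; only a simple `config.json` file is required.
* **Kafka integration:** Streams changes to a pre-created Kafka topic, with support for SASL authentication and TLS.
* **Operational flexibility:** Specific tables can be excluded via `skippedTables`, and optional ClickHouse logging is included.
* **Management:** Helper commands are provided for uninstalling, adding or removing CDC, and clearing history.

**Getting started:**

1. Download the pre-compiled binary.
2. Create a `config.json` file containing your MSSQL, Kafka, and (optionally) ClickHouse credentials.
3. Create the target Kafka topic manually.
4. Run the `setup` command to configure the database, then deploy a service (for example with `systemd` on Linux) to run the `athena run` process in the background.

Athena is open source under the MIT License and fully supports custom builds with Go.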

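A developer has released **Athena**, a lightweight Golang-based change data capture (CDC) tool that streams changes from Microsoft SQL Server to Apache Kafka. Athena is designed for operational simplicity and aims to be an easier-to-use alternative to complex tools such as Debezium. It offers simple configuration, support for SASL-authenticated brokers, and automatic management of the CDC setup. The tool publishes database changes to a single Kafka topic and delivers events in a clean, intuitive format, which simplifies consumption by downstream applications.

In the discussion, users noted that while Kafka-based solutions work well, similar implementations exist for other databases, such as Zendesk's Maxwell for MySQL, which can stream changes to other message brokers such as ActiveMQ for tasks like cache invalidation.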
Original text

Athena is a lightweight Change Data Capture (CDC) solution that streams changes from Microsoft SQL Server to Apache Kafka. Built in Golang, it supports SASL-authenticated Kafka brokers and provides a straightforward setup experience. Unlike alternatives such as Debezium, which can be complex to configure and manage, Athena offers greater simplicity and operational ease. It automatically manages CDC setup, publishes database changes to a single Kafka topic, and delivers a clean, intuitive event format that is easy for downstream consumers to understand and process.

  • Creates a message on a single Kafka topic for each change (create, update, delete) to rows in MSSQL database tables.
  • Athena creates messages only for new table changes. Existing ones are ignored.
  • The Kafka topic has to be created beforehand. Unlike Debezium, Athena will not create the topic on its own.
  • All CDC setup in MSSQL is done automatically by Athena when the setup command is run.
  • By default, Athena polls all tables for changes. You can use the skippedTables option in config.json to ignore specific tables.
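To make the skippedTables behaviour concrete, here is a minimal Go sketch of how a list of tables could be narrowed down before polling. It is an illustration only, not Athena's actual code: the function name `filterTables` and the case-insensitive comparison are assumptions made for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// filterTables returns the tables from all that are not listed in skipped.
// Names are compared case-insensitively, which is an assumption of this sketch.
func filterTables(all, skipped []string) []string {
	skip := make(map[string]struct{}, len(skipped))
	for _, t := range skipped {
		skip[strings.ToLower(t)] = struct{}{}
	}
	kept := make([]string, 0, len(all))
	for _, t := range all {
		if _, found := skip[strings.ToLower(t)]; !found {
			kept = append(kept, t)
		}
	}
	return kept
}

func main() {
	// Hypothetical table names, standing in for the tables of the MSSQL database.
	tables := []string{"orders", "customers", "audit_log"}
	fmt.Println(filterTables(tables, []string{"audit_log"}))
}
```

With an empty skippedTables array, as in the default configuration, every table is kept.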

You can download the pre-compiled binaries from the GitHub releases page and copy them to the desired location. After that, follow the steps below in order.

Create a config.json file in the folder where you put the Athena binary. The format of the JSON file is shown below. Fill in all the credentials as well.

You can find more details about the parameters of the config file in the sections below.

{
    "dbHost": "127.0.0.1",
    "dbPort": 1433,
    "dbUser": "",
    "dbPassword": "",
    "dbName": "",

    "kafkaHost": "",
    "kafkaEnableTLS": false,
    "kafkaTopic": "",

    "kafkaSASLMechanisms": "NONE",
    "kafkaSASLUsername": "",
    "kafkaSASLPassword": "",

    "pollInterval": 10,
    "fetchLimit": 50,
    "skippedTables": [],

    // If you want to collect logs in clickhouse
    "clickHouse": false,
    "clickHouseHost": "<host>:<port>",
    "clickHouseUsername": "",
    "clickHousePassword": "",
    "clickHouseDatabase": "",
    "clickHouseTableName": "",
    "clickHouseTableTTL": 12
}
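Before running setup, it can help to check that the file parses and that the essential keys are filled in. The following Go sketch does that for a subset of the keys above. The `Config` struct, the `parseConfig` function, and the validation rules are assumptions for illustration and are not taken from Athena's source. Note that Go's standard `encoding/json` rejects `//` comments, so this sketch assumes the comment line has been removed from the file being checked.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Config mirrors a subset of the keys shown in config.json.
// Keys that are not listed here are ignored by json.Unmarshal.
type Config struct {
	DBHost        string   `json:"dbHost"`
	DBPort        int      `json:"dbPort"`
	DBName        string   `json:"dbName"`
	KafkaHost     string   `json:"kafkaHost"`
	KafkaTopic    string   `json:"kafkaTopic"`
	PollInterval  int      `json:"pollInterval"`
	FetchLimit    int      `json:"fetchLimit"`
	SkippedTables []string `json:"skippedTables"`
}

// parseConfig decodes raw JSON and applies a few sanity checks.
// The checks are hypothetical; Athena may validate differently.
func parseConfig(raw []byte) (Config, error) {
	var c Config
	if err := json.Unmarshal(raw, &c); err != nil {
		return c, fmt.Errorf("invalid JSON: %w", err)
	}
	if c.DBHost == "" || c.DBName == "" {
		return c, errors.New("dbHost and dbName must be set")
	}
	if c.KafkaHost == "" || c.KafkaTopic == "" {
		return c, errors.New("kafkaHost and kafkaTopic must be set")
	}
	if c.PollInterval <= 0 || c.FetchLimit <= 0 {
		return c, errors.New("pollInterval and fetchLimit must be positive")
	}
	return c, nil
}

// sample uses placeholder values; in practice the bytes would come from
// os.ReadFile("config.json").
const sample = `{
    "dbHost": "127.0.0.1", "dbPort": 1433, "dbName": "shop",
    "kafkaHost": "127.0.0.1:9092", "kafkaTopic": "table-changes",
    "pollInterval": 10, "fetchLimit": 50, "skippedTables": []
}`

func main() {
	cfg, err := parseConfig([]byte(sample))
	if err != nil {
		fmt.Println("config problem:", err)
		return
	}
	fmt.Printf("config looks usable: %s -> topic %s\n", cfg.DBName, cfg.KafkaTopic)
}
```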

Create a topic in your Kafka broker with the name given in config.json, without a schema and with the required number of partitions.

Run the setup command (for example, ./athena setup) to enable CDC in the database and perform the other required setup.

Set up a service to run Athena in the background. The procedure differs between Windows and Linux. The steps below show how to create the service on a Linux distribution.

Create a service file called athena_mssql_kafka.service in the directory /etc/systemd/system using the following commands.

cd /etc/systemd/system
nano athena_mssql_kafka.service

Copy and paste the contents below into the newly created service file athena_mssql_kafka.service.

[Unit]
Description=Athena MSSQL Kafka Service
After=network.target

[Service]
Type=simple
ExecStart=athena run

[Install]
WantedBy=multi-user.target

Please note that the path in ExecStart needs to be changed to the location of the Athena binary when creating the service file.

Now you can start the service and also check the status of the service.

systemctl start athena_mssql_kafka.service
systemctl status athena_mssql_kafka.service

⚙️ Configuring Athena

Athena is configured through the config.json file created in the same folder as the Athena binary. The table below lists the configuration keys and what they do.

| Option | Description | Example |
| --- | --- | --- |
| dbHost | Host of the MSSQL database | 127.0.0.1 |
| dbPort | Port of the MSSQL database | 1433 |
| dbUser | Username for the MSSQL database | |
| dbPassword | Password for the MSSQL database | |
| dbName | Name of the MSSQL database | |
| kafkaHost | Host with port of the Kafka server | 127.0.0.1:9092 |
| kafkaTopic | Kafka topic that you created to receive the table changes | |
| kafkaEnableTLS | Enables TLS for the Kafka connection | true, false |
| kafkaSASLMechanisms | SASL mechanism to use for the Kafka connection | NONE, SASL-PLAIN, SASL-SCRAM-SHA-256, SASL-SCRAM-SHA-512 |
| kafkaSASLUsername | SASL username for the Kafka server | |
| kafkaSASLPassword | SASL password for the Kafka server | |
| pollInterval | Interval, in seconds, between polls of the database | 10 |
| fetchLimit | Number of CDC change rows pulled from the table at once | 50 |
| skippedTables | Array of tables to skip when collecting CDC changes | ["table1", "table2"] |
| clickHouse | Enables ClickHouse logs. The ClickHouse table and its structure are created automatically by Athena when the setup command is run | true, false |
| clickHouseHost | Host with port of the ClickHouse server | 127.0.0.1:8123 |
| clickHouseUsername | Username for the ClickHouse server | |
| clickHousePassword | Password for the ClickHouse server | |
| clickHouseDatabase | Database name on the ClickHouse server | |
| clickHouseTableName | Table name on the ClickHouse server | |
| clickHouseTableTTL | Time to live for each record, in hours | 24 |

🍄 Helper options in Athena

Apart from setup and run, the Athena executable has some other helper commands, which are explained below. They can be run like this, for example: ./athena uninstall

| Option | Description |
| --- | --- |
| uninstall | Disables CDC in the MSSQL database and removes the SQLite database |
| add-cdc | Runs the CDC setup in the MSSQL database |
| remove-cdc | Disables CDC in the MSSQL database |
| clear-cdc-history | Clears the CDC history, or changes that Athena has not processed yet, from the MSSQL database |
| recreate-clickhouse | Reruns the ClickHouse migration |
| recreate-sqlite | Recreates the SQLite database and reruns the migration |
| help | Shows all the available options |

To set up a development environment, there is a Docker Compose file in the dev folder. It creates all the necessary services, such as MSSQL with a sample database, Kafka, and so on. The same environment can be used for running the integration tests.

  • Install the latest version of Go.
  • Clone the project from GitHub.
  • Run the go mod download command to download all modules.
  • Then run the commands below as needed.
cd dev
docker compose up -d
cd ..
# GORUN marks that Athena is started via go run, so that it picks the correct path for config.json and db.sqlite.
# On Windows (cmd), use: set GORUN=true
export GORUN=true
go run . setup
go run . run
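The GORUN variable exists because go run compiles the program into a temporary directory, so the location of the running executable no longer points at the project folder. The Go sketch below shows one way such a switch could work. It is an assumption about the general approach, not Athena's actual code, and `baseDir` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// baseDir returns the directory in which to look for config.json and db.sqlite.
// Assumption of this sketch: with GORUN=true the working directory is used,
// otherwise the directory that contains the executable.
func baseDir(goRun, workDir, exePath string) string {
	if goRun == "true" {
		return workDir
	}
	return filepath.Dir(exePath)
}

func main() {
	workDir, err := os.Getwd()
	if err != nil {
		fmt.Println("cannot determine working directory:", err)
		return
	}
	exePath, err := os.Executable()
	if err != nil {
		fmt.Println("cannot determine executable path:", err)
		return
	}
	dir := baseDir(os.Getenv("GORUN"), workDir, exePath)
	fmt.Println("config path:", filepath.Join(dir, "config.json"))
}
```

Running this with go run and without GORUN set prints a path inside a temporary build directory, which illustrates why the variable is needed during development.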

Please note that the docker-compose.yml in the dev folder should only be used for development purposes.

Before running the tests, make sure you have set up the development environment and that config.json is set up correctly.

cd tests
go test -v -run TestIntegration

You can build the binaries or work on Athena itself by following the steps below. Athena is built entirely in Golang, so you should install the latest version of Go. Note that building the binaries is managed with GoReleaser.

  • Clone the project from GitHub.
  • Run the go mod download command to download all modules.
  • Run the SET GORUN=true command to set the GORUN variable (export GORUN=true on Linux or macOS).
  • Run the command goreleaser release --snapshot --clean to build the binaries.

Athena is licensed under the MIT License.
