این پروژه نمونهای برای راهاندازی Change Data Capture (CDC) از دیتابیس PostgreSQL و ارسال تغییرات به Kafka (Redpanda) است. همچنین امکان خواندن داده از فایلهای CSV و ارسال به Kafka فراهم شده است.
- Docker و Docker Compose
- دسترسی به اینترنت برای دریافت ایمیجها
.env: تنظیم نسخه PostgreSQLcompose.yml: تعریف سرویسهای Docker (Postgres, Redpanda, Redpanda Connect, Kafka Connect, Redpanda Console)postgres_cdc.yml: تنظیمات CDC برای خواندن تغییرات جداول Postgres و ارسال به Kafkaconnect-config.yml: تنظیمات خواندن فایلهای CSV و ارسال به Kafkainit.sql: اسکریپت ساخت جداول نمونهcommand.sql: درج داده نمونه در جداولdata/: محل قرارگیری فایلهای CSVredpanda_data/,db_data/: دادههای دائمی سرویسهاredpanda_license.txt: لایسنس Redpanda
در فایل .env نسخه مورد نظر را وارد کنید (مثلاً ۱۵):
PG_VERSION=15
ابتدا پوشه داده Redpanda را بسازید و دسترسی لازم را بدهید:
mkdir redpanda_data
chmod -R 777 redpanda_dataسپس سرویسها را اجرا کنید:
docker compose up -dبرای اجرای Redpanda Connect (پروفایل red-connect):
docker compose --profile=red-connect run --rm -it redpanda_connectتوجه داشته باشید که کانکتور ردپاندا برای پستگرس نیازمند لایسنس می باشد. بنابراین برای اجرای آن لازم است لایسنس معتبر را وارد کنید.
پس از بالا آمدن سرویسها، کانکتور Debezium را به Kafka Connect اضافه کنید:
curl -X POST http://127.0.0.1:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "pg-cdc",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres123",
"database.dbname": "modai",
"database.server.name": "pgserver1",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "debezium_pub",
"table.include.list": "public.users",
"topic.prefix": "pg",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}'پس از اینکه دستور فوق اجرا شد با مراجعه به http://127.0.0.1:9100/topics می توانید تاپیک های ایجاد شده را مشاهده کنید.
می توانید کوئری های درج، ویرایش و حذف را روی جدول users انجام دهید و دیتای ذخیره شده در تاپیک را مشاهده کنید. چیزی شبیه زیر:
"payload": {
"before": null,
"after": {
"id": 2,
"name": "ali alavi"
},
"source": {
"version": "2.7.3.Final",
"connector": "postgresql",
"name": "pg",
"ts_ms": 1749561987091,
"snapshot": "false",
"db": "modai",
"sequence": "[null,\"26718008\"]",
"ts_us": 1749561987091292,
"ts_ns": 1749561987091292000,
"schema": "public",
"table": "users",
"txId": 750,
"lsn": 26718008,
"xmin": null
},
"transaction": null,
"op": "u",
"ts_ms": 1749561987185,
"ts_us": 1749561987185745,
"ts_ns": 1749561987185746000
}فایل postgres_cdc.yml برای Redpanda Connect تنظیم شده و جداول users و posts را مانیتور میکند.
خروجی به Kafka روی تاپیک modai ارسال میشود.
برای خواندن فایلهای CSV و ارسال به Kafka، فایل connect-config.yml را به جای postgres_cdc.yml به کانتینر redpanda_connect متصل کنید (در compose.yml خط مربوطه را فعال کنید):