[Feature] Support writing CDC data in sink table's column order for StarRocks #4543

Open
zhuxt2015 wants to merge 4 commits into DataLinkDC:dev from zhuxt2015:dev

@zhuxt2015
Contributor

Purpose of the pull request

When using CDCSOURCE to sync data from a source database (e.g. PostgreSQL) to
StarRocks, the generated Flink DDL was built from the source table's schema.
If the column order in the StarRocks sink table differs from the source, data
would be written into the wrong columns.

This PR fetches the sink table's actual column metadata at job submission time
and uses it as the basis for DDL generation and INSERT statements, ensuring
data is written in the correct column order as defined in the destination table.

Brief change log

Core Logic (CreateCDCSourceOperation)

  • Added setSinkTable(): matches each source table to its corresponding sink
    table by name (supports prefix/suffix/schema-prefix rules) and attaches it
    as table.sinkTable.
  • Added getSinkTables(): lists all tables from the sink schema using the
    sink driver.
  • Refactored getDriver(): extracted connector-specific URL resolution
    (starrocks → jdbc-url, doris → fenodes, jdbc → url) into a single
    reusable method, removing duplication with checkAndCreateSinkSchema.
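
The name matching performed by setSinkTable() might look roughly like the sketch below. It is illustrative only, not the code in this PR: the class SinkTableMatcher, both method names, and the "_" separator used for the schema prefix are hypothetical, and the real rules in CreateCDCSourceOperation may differ.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical helper; not part of the PR. Assumes prefix and suffix are non-null. */
final class SinkTableMatcher {
    private SinkTableMatcher() {}

    /** Builds the sink-side name expected for a source table under the assumed naming rules. */
    static String expectedSinkName(String schema, String table,
                                   String prefix, String suffix, boolean prefixSchema) {
        // Assumption: a schema prefix is joined to the table name with an underscore.
        String base = prefixSchema ? schema + "_" + table : table;
        return prefix + base + suffix;
    }

    /** Returns the first sink table whose name matches either candidate name. */
    static Optional<String> findSinkTable(List<String> sinkTables, String schema,
                                          String table, String prefix, String suffix) {
        String plain = expectedSinkName(schema, table, prefix, suffix, false);
        String withSchema = expectedSinkName(schema, table, prefix, suffix, true);
        return sinkTables.stream()
                .filter(name -> name.equals(plain) || name.equals(withSchema))
                .findFirst(); // first match wins, in the order the sink tables were listed
    }
}
```

With sink tables ods_orders and ods_public_users and the prefix "ods_", the source table public.orders resolves to ods_orders and public.users resolves to ods_public_users. A source table with no match yields an empty Optional, which corresponds to leaving table.sinkTable unset.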

Sink DDL Generation (SQLSinkBuilder)

  • addSinkInsert() now resolves the effective table object as
    table.getSinkTable() when available, falling back to the source table.
    This makes both getFlinkDDL and createInsertOperations use the sink
    table's column definitions.

Model (Table)

  • Added sinkTable field to carry the resolved sink-side table metadata
    alongside the source table object.
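
The sinkTable field and the fallback in addSinkInsert() can be sketched together as follows. This is a simplified illustration under stated assumptions: TableMeta is a hypothetical stand-in for the Table model, the generated INSERT text is not the statement Dinky actually produces, and column names are assumed to be identical on both sides, with only their order differing.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical illustration of preferring sink-side column order; not the PR's code. */
final class SinkOrderSketch {
    private SinkOrderSketch() {}

    /** Minimal stand-in for the Table model with the new sinkTable field. */
    static final class TableMeta {
        final String name;
        final List<String> columns;
        TableMeta sinkTable; // stays null when no sink table was matched

        TableMeta(String name, List<String> columns) {
            this.name = name;
            this.columns = columns;
        }
    }

    /** Uses the sink table's metadata when available, otherwise the source table's. */
    static TableMeta effectiveTable(TableMeta source) {
        return source.sinkTable != null ? source.sinkTable : source;
    }

    /** Builds an INSERT whose select list follows the effective table's column order. */
    static String buildInsert(TableMeta source, String sinkName, String sourceView) {
        String columns = effectiveTable(source).columns.stream()
                .map(c -> "`" + c + "`")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + sinkName + " SELECT " + columns + " FROM " + sourceView;
    }
}
```

For a source table with columns (id, name, amount) and a sink table defined as (id, amount, name), the select list becomes `id`, `amount`, `name`. When sinkTable is null, the source order is used, which matches the behavior before this change.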

Metadata Integration (PostgresCDCBuilder)

  • Implemented parseMetaDataConfig() with a helper composeJdbcProperties()
    to correctly append JDBC connection parameters (?key=value&...) to the
    PostgreSQL metadata URL.
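
A minimal sketch of how such a helper could append parameters is shown below. The class name, the Map-based signature, and the metadataUrl() method are assumptions made for illustration; the actual composeJdbcProperties() in PostgresCDCBuilder may take different arguments. Values are not URL-encoded here.

```java
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical illustration of composing a PostgreSQL JDBC URL; not the PR's code. */
final class JdbcUrlSketch {
    private JdbcUrlSketch() {}

    /** Returns "" for null or empty input, otherwise "?k1=v1&k2=v2" with no trailing '&'. */
    static String composeJdbcProperties(Map<String, String> props) {
        if (props == null || props.isEmpty()) {
            return "";
        }
        // StringJoiner inserts '&' only between entries and adds the leading '?' once.
        StringJoiner joiner = new StringJoiner("&", "?", "");
        props.forEach((key, value) -> joiner.add(key + "=" + value));
        return joiner.toString();
    }

    /** Builds a jdbc:postgresql:// URL and appends the composed parameters. */
    static String metadataUrl(String host, int port, String database,
                              Map<String, String> props) {
        return "jdbc:postgresql://" + host + ":" + port + "/" + database
                + composeJdbcProperties(props);
    }
}
```

Using a joiner rather than manual concatenation avoids the trailing & case that the PR's tests guard against. Passing an ordered map such as LinkedHashMap keeps the parameter order stable.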

Verify this pull request

| Test class | Coverage |
| --- | --- |
| StarRocksTypeConvertTest | datetime→TIMESTAMP, date→DATE mappings; regression for other types and convertToDB |
| PostgresCDCBuilderTest | composeJdbcProperties edge cases: null / empty / single / multiple params, trailing & guard |
| CreateCDCSourceOperationTest | setSinkTable via reflection: plain match, schema-prefixed name, no match, driver column loading, first-match-wins |
| TableTest | New sinkTable field: default null, set/get, source field isolation, reset to null |

  1. Set up a PostgreSQL → StarRocks CDC pipeline via CDCSOURCE.
  2. Create the StarRocks target table with columns in a different order than
    the source table.
  3. Submit the job and confirm that data lands in the correct columns.

This change added tests and can be verified as follows:

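- StarRocksTypeConvertTest: verifies the datetime→TIMESTAMP and date→DATE type mappings
- PostgresCDCBuilderTest: covers composeJdbcProperties edge cases (null / empty / single parameter / multiple parameters)
- CreateCDCSourceOperationTest: tests setSinkTable() via reflection (match, no match, schema-prefixed name, column loading through the driver, first match wins)
- TableTest: covers the new sinkTable field (default null, set/get, isolation, reset)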