
Conversation

@MehulBatra (Contributor) commented Jan 11, 2026

Purpose

Linked issue: close #2333

Brief change log

| File | Change |
| --- | --- |
| ChangelogFlinkTableSource.java | Flink table source for $changelog virtual tables |
| ChangelogDeserializationSchema.java | Deserializes LogRecord → RowData with metadata columns |
| ChangelogRowConverter.java | Converts records and adds _change_type, _log_offset, _commit_timestamp |
| RecordToFlinkRowConverter.java | Interface for record converters |
| FlinkTableFactory.java | Routes changelog tables to ChangelogFlinkTableSource |
| TableDescriptor.java | Added CHANGELOG_* column name constants |
| TableDescriptorValidation.java | Validates reserved column names |
| FlinkCatalog.java | Detects the $changelog suffix and builds the virtual table schema |

Test Coverage

| Test | Type | What it verifies |
| --- | --- | --- |
| testChangelogVirtualTableSchema | Schema | DESCRIBE shows the correct columns |

  • Access via SELECT * FROM table$changelog
  • Metadata columns: _change_type (VARCHAR), _log_offset (BIGINT), _commit_timestamp (TIMESTAMP)
  • All records output as INSERT (change type in column)
  • Supports PK tables with change types: +I, -U, +U, -D
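A minimal usage sketch, assuming a Fluss catalog is already registered and a primary-key table named `orders` exists (both the catalog setup and the table name are illustrative, not part of this PR):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ChangelogReadExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Catalog registration omitted; `orders` is a hypothetical primary-key table.
        // The $changelog virtual table exposes _change_type, _log_offset and _commit_timestamp
        // alongside the physical columns, and every change arrives as an INSERT row.
        tEnv.executeSql("SELECT * FROM orders$changelog").print();
    }
}
```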

Tests

| File | Purpose |
| --- | --- |
| ChangelogVirtualTableITCase.java | Integration tests (7 tests) |
| ChangelogRowConverterTest.java | Unit tests for the converter |

API and Format

Documentation

MehulBatra requested a review from wuchong on January 11, 2026 20:13
@wuchong (Member) commented Jan 12, 2026

Great! Thank you @MehulBatra , I will review it.

@wuchong (Member) left a comment

Thanks @MehulBatra for the contribution. I left some comments.

website/versioned_sidebars
website/versions.json
website/pnpm-lock.yaml
/.claude/

Should it be .claude/?

nit: and we could move it next to the .vscode/ entry.

Comment on lines +212 to +219
assertThat(schemaRows.get(0)).contains("_change_type");
assertThat(schemaRows.get(0)).contains("STRING");
// Flink DESCRIBE shows nullability as 'false' for NOT NULL columns
assertThat(schemaRows.get(0)).contains("false");

assertThat(schemaRows.get(1)).contains("_log_offset");
assertThat(schemaRows.get(1)).contains("BIGINT");
assertThat(schemaRows.get(1)).contains("false");

The DESCRIBE result is deterministic, so we should assert the entire output string rather than using contains.

Using contains makes the test intent unclear to readers and risks missing regressions in other parts of the output that should also be validated.
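For example (a sketch only; the exact DESCRIBE row layout below is an assumption and should be copied from the real test output):

```java
// Assumed DESCRIBE row format: name, type, null, key, extras, watermark.
assertThat(schemaRows.get(0)).isEqualTo("+I[_change_type, STRING, false, null, null, null]");
assertThat(schemaRows.get(1)).isEqualTo("+I[_log_offset, BIGINT, false, null, null, null]");
```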

Comment on lines +436 to +438
assertThat(updateResults.get(1)).contains("+U");
assertThat(updateResults.get(1)).contains("Item-1-Updated");
assertThat(updateResults.get(1)).contains("150");

Simplify to assertThat(updateResults.get(0)).contains("-U", "Item-1", "100");

Update others like this.
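Applied to the lines shown above, that becomes:

```java
assertThat(updateResults.get(1)).contains("+U", "Item-1-Updated", "150");
```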

// Collect delete record
List<String> deleteResult = collectRowsWithTimeout(rowIter, 1, true);
assertThat(deleteResult).hasSize(1);
// Verify the delete record contains the row data (the change type may be -D or -U)

If this is a delete, the change type must be -D, so we should assert the change type here.
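For example:

```java
// The change type of a delete must be -D, so assert it explicitly.
assertThat(deleteResult.get(0)).contains("-D");
```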

import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Integration test for $changelog virtual table functionality. */
abstract class ChangelogVirtualTableITCase extends AbstractTestBase {

This is a thorough test. However, we are facing limited testing resources (the Flink test run is approaching the 1-hour limit), and IT cases are the most time-consuming. Many tests in this IT case duplicate each other, so we can trim them:

  1. testChangelogVirtualTableWithPrimaryKeyTable and testBasicChangelogScanWithMetadataValidation do the same thing, but testBasicChangelogScanWithMetadataValidation verifies more (the metadata columns), so we can remove testChangelogVirtualTableWithPrimaryKeyTable or merge it into testBasicChangelogScanWithMetadataValidation.

  2. testAllChangeTypes() and testBasicChangelogScanWithMetadataValidation both verify all change types, so we can repurpose testAllChangeTypes into testProjectionOnChangelogTable to cover projections: add one more column to the source table and select _change_type plus two other columns (see the sketch after this list).

  3. testChangelogVirtualTableConcurrentChanges can be removed as it doesn't test concurrent updates.

  4. testChangelogVirtualTableWithComplexSchema can be merged into testChangelogVirtualTableSchemaIntrospection.
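A rough sketch of the suggested projection test, reusing the IT case's existing table environment; the table name, column names, inserted values, and imports (org.apache.flink.types.Row, org.apache.flink.util.CloseableIterator) are assumptions, not from this PR:

```java
@Test
void testProjectionOnChangelogTable() throws Exception {
    tEnv.executeSql(
            "CREATE TABLE proj_pk_table (id INT, name STRING, amount INT,"
                    + " PRIMARY KEY (id) NOT ENFORCED)");
    tEnv.executeSql("INSERT INTO proj_pk_table VALUES (1, 'Item-1', 100)").await();

    // Project the change-type metadata column plus two physical columns.
    try (CloseableIterator<Row> it =
            tEnv.executeSql("SELECT _change_type, name, amount FROM proj_pk_table$changelog")
                    .collect()) {
        assertThat(it.next().toString()).contains("+I", "Item-1", "100");
    }
}
```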


Besides, please add tests verifying that $changelog also works when the scan.startup.mode option is applied, and that it works for partitioned tables. Take org.apache.fluss.flink.source.FlinkTableSourceITCase#testReadLogTableWithDifferentScanStartupMode as an example.
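A sketch of the kind of query such a test could issue (the option value 'earliest' and the table name are assumptions; mirror the referenced test for the full flow):

```java
// Read the changelog with an explicit startup mode via a dynamic table option hint.
tEnv.executeSql(
        "SELECT * FROM orders$changelog /*+ OPTIONS('scan.startup.mode' = 'earliest') */");
```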

import java.io.Serializable;

/** Interface for converting Fluss {@link LogRecord} to Flink {@link RowData}. */
public interface RecordToFlinkRowConverter extends Serializable {

Since we will remove PlainRowConverter, there is only one implementation of this interface, so the abstraction adds no value. I think we can remove this interface class (and have ChangelogRowConverter implement Serializable directly if we drop RecordToFlinkRowConverter).
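For example, the resulting declaration might simply be:

```java
import java.io.Serializable;

/** Converts Fluss changelog records to Flink RowData; Serializable so it can be shipped to tasks. */
public class ChangelogRowConverter implements Serializable {
    private static final long serialVersionUID = 1L;
    // existing conversion logic unchanged
}
```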

BUCKET_COLUMN_NAME,
CHANGELOG_CHANGE_TYPE_COLUMN,
CHANGELOG_LOG_OFFSET_COLUMN,
CHANGELOG_COMMIT_TIMESTAMP_COLUMN)));

Add tests for the columns in org.apache.fluss.client.admin.FlussAdminITCase#testSystemsColumns

package org.apache.fluss.flink.source;

/** IT case for {@link ChangelogVirtualTableITCase} in Flink 1.20. */
public class Flink120ChangelogVirtualTableITCase extends ChangelogVirtualTableITCase {}

Create IT cases for all Flink versions to test compatibility?
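For example, a sibling for another supported Flink version mirroring the 1.20 class above (the exact set of supported versions and module layout in the repo is an assumption):

```java
package org.apache.fluss.flink.source;

/** IT case for {@link ChangelogVirtualTableITCase} in Flink 1.19. */
public class Flink119ChangelogVirtualTableITCase extends ChangelogVirtualTableITCase {}
```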

Comment on lines +404 to +408
// Parse the row to validate structure
String[] parts = result.substring(3, result.length() - 1).split(", ", 6);

// Validate change type column
assertThat(parts[0]).isEqualTo("+I");

We can set a ManualClock on the FlussClusterExtension to control the commit timestamp, advancing the current time with CLOCK.advanceTime(xxxxx, TimeUnit.MILLISECONDS); see the usage in FlinkTableSourceITCase.

In this way, we can assert the changelog result by comparing the whole row string (set the bucket count to 1 to get deterministic log_offset values).
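A sketch of the suggested pattern; the clock constant, the advanced duration, and the full row string below are assumptions and should be taken from real output:

```java
// Advance the shared manual clock so the commit timestamp is deterministic.
CLOCK.advanceTime(1000, TimeUnit.MILLISECONDS);
tEnv.executeSql("INSERT INTO pk_table VALUES (1, 'Item-1', 100)").await();

// With a single bucket the log offset is deterministic too, so the whole row can be compared.
assertThat(rowIter.next().toString())
        .isEqualTo("+I[+I, 0, 1970-01-01T00:00:01, 1, Item-1, 100]");
```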

/**
* Creates a virtual $changelog table by modifying the base table's schema to include metadata columns.
*/
private CatalogBaseTable getVirtualChangelogTable(ObjectPath objectPath)

Please add tests in FlinkCatalogITCase to assert the CatalogTable obtained for a $changelog table.
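A possible assertion, assuming the catalog fixture used in FlinkCatalogITCase; the database and table names are illustrative, and Schema here is org.apache.flink.table.api.Schema:

```java
CatalogBaseTable changelogTable =
        catalog.getTable(new ObjectPath("fluss", "pk_table$changelog"));
// The virtual table's schema should expose the changelog metadata columns.
assertThat(changelogTable.getUnresolvedSchema().getColumns())
        .extracting(Schema.UnresolvedColumn::getName)
        .contains("_change_type", "_log_offset", "_commit_timestamp");
```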


Development

Successfully merging this pull request may close these issues.

[flink] Basic $changelog read support without pushdown optimizations for primary key table
