Skip to content

[feature](dynamic table) support stream part 2: stream query & comsuption infrastructure#62453

Merged
morningman merged 9 commits into
apache:masterfrom
TsukiokaKogane:support_table_stream.part_comsume
May 8, 2026
Merged

[feature](dynamic table) support stream part 2: stream query & comsuption infrastructure#62453
morningman merged 9 commits into
apache:masterfrom
TsukiokaKogane:support_table_stream.part_comsume

Conversation

@TsukiokaKogane
Copy link
Copy Markdown
Contributor

What problem does this PR solve?

Issue Number: close #xxx

Related PR: #57921

Problem Summary:

Release note

None

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

@hello-stephen
Copy link
Copy Markdown
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@TsukiokaKogane TsukiokaKogane force-pushed the support_table_stream.part_comsume branch from d35191b to df6b262 Compare April 13, 2026 13:35
@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

run buildall

1 similar comment
@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

run buildall

@hello-stephen
Copy link
Copy Markdown
Contributor

FE UT Coverage Report

Increment line coverage 17.15% (53/309) 🎉
Increment coverage report
Complete coverage report

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

/review

Copy link
Copy Markdown
Contributor

@github-actions github-actions Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java: stream offsets are mutated inside commitTransaction() before the transaction state is journaled. If FE crashes after checkAndUpdateStreamOffset(transactionState) but before OP_UPSERT_TRANSACTION_STATE is durably written, the stream offset advancement is lost while the source transaction is retried, so the same data can be consumed again after failover. This breaks the transaction/persistence checkpoint from the review guide because the metadata mutation has no replay-equivalent durable state transition.
  2. fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java: every TableStreamUpdateInfo is created with nextOffset = Collections.emptyMap(), but OlapTableStream.checkStreamOffset() now rejects any normal partition entry that has no nextOffset. As soon as an insert reads from a non-historical stream partition, commit will fail with nextOffset missing, so the new stream-consumption path cannot successfully commit.

Critical checkpoints:

  • Goal / correctness: Not satisfied. The PR intends to add stream query + consumption infrastructure, but the new FE commit path cannot successfully advance normal stream offsets, and its offset state is not crash-safe.
  • Scope / minimality: Reasonable overall, but the commit-path changes need to be completed so the persistence model stays correct.
  • Concurrency / locking: Table-stream write locks are taken in ID order, which is good, but the protected state is mutated before durable journaling, so failover safety is still broken.
  • Lifecycle / replay: Fails. The stream offset mutation has no replay-equivalent state transition if the master dies before the transaction upsert is persisted.
  • Config / compatibility: No new config or rolling-upgrade blocker identified in the reviewed code.
  • Parallel paths: I did not find another producer of nextOffset, so the current insert path appears incomplete for normal stream partitions.
  • Conditional checks: The new nextOffset validation is fine in isolation, but it is inconsistent with the only caller added in this PR.
  • Test coverage: I did not see regression/unit coverage for successful stream insert commit or FE restart/replay behavior; both are needed here.
  • Observability: Existing exceptions include ids, which is acceptable, but observability does not compensate for the correctness issues above.
  • Transaction / persistence: Blocking issue. The write-side mutation and replay semantics are not equivalent.
  • Data write / atomicity: Blocking issue. Stream offset advancement is not atomic with transaction durability.
  • FE/BE variable passing: No additional blocker identified beyond the commit-path issues above.
  • Performance: No blocking performance issue identified in the reviewed code.
  • Other issues: None beyond the two blockers above.

Comment thread fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java Outdated
@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

/review

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

run buildall

Copy link
Copy Markdown
Contributor

@github-actions github-actions Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking findings:

  1. nextOffset is never populated for stream scans, so the new commit-time validation will reject normal stream inserts once they hit the partitionOffset path.
  2. The scan path uses the stored stream offset as the scan snapshot version, which rereads the old snapshot instead of the newly arrived data.
  3. The transaction-model/sub-transaction commit path still skips the new stream-offset validation/locking, so concurrent consumers can double-commit the same stream offsets there.

Critical checkpoints:

  • Goal of the task: Not met yet. The PR aims to support stream query/consumption infrastructure, but the current implementation either fails commits, rereads already-consumed snapshots, or misses exactly-once protection on one commit path.
  • Modification size/focus: Reasonably focused, but correctness gaps span planner and transaction layers.
  • Concurrency/locking: Not correct on all applicable paths. Single-table commit expands locking, but the sub-transaction path still omits stream-table locking/validation.
  • Special lifecycle/static init: No blocking lifecycle issue found.
  • Configuration changes: No new config item added.
  • Compatibility/incompatible changes: No blocking compatibility issue identified.
  • Parallel code paths: Not handled consistently; single-table commit was updated, sub-transaction commit was not.
  • Special conditional checks: The new checks depend on nextOffset, but that value is never prepared.
  • Test coverage: Insufficient. Current tests only validate FE plan shape; they do not cover real insert/commit behavior, transaction-model commits, or end-to-end incremental semantics.
  • Observability: No additional blocking observability issue found.
  • Transaction/persistence: Risk remains. Offset advancement is wired into transaction state, but commit validation is incomplete and scan semantics are wrong.
  • Data writes/modifications: Not safe yet; exactly-once consumption can fail or duplicate.
  • FE/BE variable passing: Incomplete for incremental semantics; only a snapshot version is sent, no lower-bound offset.
  • Performance: No separate blocking performance issue identified beyond the correctness bugs above.
  • Other issues: None beyond the blocking findings above.

Comment thread fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java Outdated
@hello-stephen
Copy link
Copy Markdown
Contributor

FE UT Coverage Report

Increment line coverage 43.57% (149/342) 🎉
Increment coverage report
Complete coverage report

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

/review

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

run buildall

Copy link
Copy Markdown
Contributor

@github-actions github-actions Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Findings:

  1. fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java: the new stream-update path never populates nextOffset, so after a show_initial_rows=true stream is consumed once, the next incremental consume will fail in DatabaseTransactionMgr.checkStreamOffset() with nextOffset missing ... for every normal partition.
  2. fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java: partitionOffset is later used as the scan visible-version upper bound (OlapScanNode.addScanRangeLocations()), but the historical->normal transition stores the old consumed version back into partitionOffset. After new data arrives, the next stream read still scans the old snapshot and replays historical rows instead of the new append.

Critical checkpoint conclusions:

  • Goal of the task: add stream query plus consumption infrastructure. The planner/binding pieces are in place, but the end-to-end consume semantics are not correct yet because stream offsets do not advance safely. Existing tests do not cover repeated consumption after the first successful consume or after subsequent base-table writes.
  • Modification size/focus: broad change across catalog, Nereids, planner, and transaction management. The scope is understandable for the feature, but the offset state machine currently spans too many layers without a validating end-to-end test.
  • Concurrency: applicable. The new commit path does sort stream tables into the table-lock list, so lock ordering looks intentional, but the functional stream-state transition performed under commit still violates correctness.
  • Lifecycle/static init: applicable but no blocking lifecycle or static-init issue found in the reviewed paths.
  • Configuration: no new config items added.
  • Compatibility: no obvious FE/BE protocol or storage-format incompatibility introduced by these lines.
  • Parallel code paths: applicable. Both query planning and transaction commit paths were updated, but the repeated-consume path is still broken.
  • Special conditional checks: no additional blocker here beyond the offset transition logic above.
  • Test coverage: insufficient for the new behavior. UT/regression cases cover plan shape and one consume flow, but miss the two blocking sequences above.
  • Observability: no major blocker, but offset-transition failures currently rely on generic commit errors only.
  • Transaction/persistence: applicable. streamUpdateInfos are persisted on TransactionState, but the persisted values are not semantically correct for later stream reads/commits.
  • Data writes/modifications: applicable and currently not correct for stream consumption progress, so this checkpoint fails.
  • FE/BE variable passing: applicable. The new partitionOffset is threaded into the FE planner/scan path used here; no separate FE->BE field addition was needed in the reviewed code.
  • Performance: no blocking performance issue stood out versus the correctness problems above.
  • Other issues: none beyond the blocking findings above.

Overall opinion: request changes until stream offsets can advance correctly across repeated consumes and the missing end-to-end tests are added.

Comment thread fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java Outdated
Comment thread fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java Outdated
Comment thread fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java Outdated
Comment thread fe/fe-core/src/main/java/org/apache/doris/catalog/stream/TableStreamManager.java Outdated
Comment thread fe/fe-catalog/src/main/java/org/apache/doris/catalog/Column.java
Comment thread fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java Outdated
@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

/review

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

run buildall

Copy link
Copy Markdown
Contributor

@github-actions github-actions Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found two blocking issues.

  1. TableStreamUpdateInfo.update is persisted through TransactionState, but it is declared as the abstract type AbstractTableStreamUpdate without any Gson runtime type adapter. That breaks EditLog/checkpoint replay once a transaction with stream updates is journaled.
  2. The new stream-update state is only consumed in the shared-nothing transaction manager. The cloud commit path still never locks/checks/applies streamUpdateInfos, so stream offset validation can be bypassed in cloud mode.

Critical checkpoint conclusions:

  • Goal of current task: Partially accomplished. Query planning and basic consumption flow are implemented, but transaction correctness is incomplete.
  • Modification size/focus: Reasonably focused, but the transaction integration is incomplete across parallel commit paths.
  • Concurrency: Local FE path adds sorted lock acquisition for stream tables, but cloud mode does not participate in the same locking/validation flow.
  • Lifecycle/static init: No special lifecycle issue found beyond persisted polymorphic state.
  • Configuration: No new config involved.
  • Compatibility/incompatible changes: A new persisted transaction payload was added, but replay compatibility is incomplete because the abstract subtype cannot be reconstructed.
  • Parallel code paths: Not all applicable paths were updated; cloud transaction commit is missing the new logic.
  • Special conditional checks: No blocking issue here.
  • Test coverage: FE UT and regression cover plan/history behavior, but not replay/failover or cloud transaction behavior.
  • Observability: No blocking issue here.
  • Transaction/persistence: Blocking issues remain in journal replay and commit-path coverage.
  • Data writes/modifications: Stream-offset advancement is not guaranteed in cloud mode.
  • FE/BE variable passing: No issue found.
  • Performance: No blocking performance issue found.
  • Other issues: None beyond the two blockers above.

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

/review

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

run buildall

Copy link
Copy Markdown
Contributor

@github-actions github-actions Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking findings:

  1. fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java: default/non-historical stream consumption is still incorrect. The scan version for appended data is derived from the already-consumed offset, and the commit path writes that same value back, so a stream created with show_initial_rows = false can get stuck on its creation snapshot and never advance to newly committed rows.
  2. fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java: the stream offset override is only applied in the non-cloud path. With Config.isCloudMode() && Config.enable_cloud_snapshot_version, updateScanRangeVersions() still installs the latest partition visible version and ignores the stream snapshot, so cloud stream reads do not honor the committed stream offset.

Critical checkpoint conclusions:

  • Goal of the task: Partially achieved. Historical stream query/planning and virtual-column plumbing are added, but core stream consumption semantics are still wrong on the default append path, and the cloud snapshot path is inconsistent.
  • Small/clear/focused change: Mostly focused, but the exposed behavior is not yet correct end-to-end.
  • Concurrency review: Transaction commit/update stays under table locks and the added stream tables are lock-ordered by (dbId, tableId); I did not find a new deadlock issue in the locking changes.
  • Lifecycle/static initialization: No special lifecycle or static-init issue identified.
  • Configuration items: No new config items added.
  • Compatibility: No storage-format or API compatibility blocker identified in this review.
  • Parallel code paths: Not fully handled. The non-cloud scan path was updated, but the cloud snapshot path was not.
  • Special conditional checks: The new conditional in OlapScanNode.addScanRangeLocations() is insufficient because the cloud path bypasses it.
  • Test coverage: Added FE UT and regression coverage target historical (show_initial_rows = true) behavior and virtual columns, but they do not cover the default/non-historical consumption path or cloud snapshot mode; that is why the blocking behavior above slips through.
  • Observability: No additional observability issue identified beyond the correctness bugs above.
  • Transaction/persistence: streamUpdateInfos appear to be persisted/replayed through TransactionState correctly, but the semantic value being stored/validated for append consumption is incorrect.
  • Data write/modification correctness: Not correct yet for append stream consumption; consumed offsets do not advance to the newly read version.
  • FE/BE variable passing: The new stream update is plumbed through the planner to OlapScanNode, but the cloud snapshot execution path still misses it.
  • Performance: No major performance blocker found; correctness issues dominate.
  • Other issues: None beyond the blockers above.

Comment thread fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java Outdated
Comment thread fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@TsukiokaKogane TsukiokaKogane force-pushed the support_table_stream.part_comsume branch from dee776e to 0835d16 Compare April 15, 2026 11:38
@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

/review

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

run buildall

Copy link
Copy Markdown
Contributor

@github-actions github-actions Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Findings:

  1. fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java: the normal incremental path stores next as the already-consumed partitionOffset, so a stream created with show_initial_rows = false never advances to later visible data. hasData() becomes true when the base partition version moves forward, but OlapScanNode still scans the old snapshot version from streamUpdate.next, and commit writes the same offset back again.
  2. fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java: the new stream-version override is only applied in the non-cloud scan-range path. In cloud mode, ScanNode.setVisibleVersionForOlapScanNodes() later overwrites scan ranges from a global partition-id map built from CloudPartition snapshots, so stream scans ignore the stored offset and read the latest table snapshot instead.

Critical checkpoint conclusions:

  • Goal of current task: support stream query and stream consumption for OLAP table streams. Conclusion: not fully accomplished because the incremental consumption version is incorrect on the main path, and cloud mode does not preserve stream offsets. Existing tests do not demonstrate either case.
  • Scope/minimality: the change is reasonably focused on FE stream/query/txn wiring, but it spans analysis, planning, planner, transaction, and persistence paths; the remaining blockers are in the core execution path rather than polish.
  • Concurrency: the new stream-table locking looks consistent with FE guidance. Stream tables are added into ID-sorted locking for commit/replay, and I did not find a separate deadlock issue in the new code.
  • Lifecycle/static initialization: no special lifecycle or static-init issue found.
  • Configuration changes: none.
  • Compatibility/persistence: Gson subtype registration and transaction-state persistence wiring are present, so the new payload appears serializable/replayable. The blocking issue is the wrong offset value being persisted/consumed, not missing journal integration.
  • Parallel code paths: blocker. Cloud and non-cloud scan-version paths are no longer functionally equivalent for stream scans.
  • Special condition checks: there are explicit TODOs in the offset logic, and one of them corresponds to a live correctness gap in the non-historical incremental path.
  • Test coverage: regression and explain tests were added, but there is still no end-to-end coverage for consuming rows appended after stream creation, and no cloud-path coverage.
  • Test result changes: no incorrect .out result was proven from the diff alone, but the added tests do not exercise the blocking scenarios above.
  • Observability: no blocking observability issue found.
  • Transaction/persistence modifications: commit/replay hooks exist and stream tables are included in lock sets, but they operate on an incorrect version value for incremental scans.
  • Data write/modification correctness: not preserved for stream-consumption metadata because successful commits can leave the source stream offset stale.
  • FE/BE variable passing: streamUpdate is passed into OlapScanNode, but the later cloud visible-version propagation drops that per-scan override.
  • Performance: no primary blocking performance finding.
  • Other issues: none beyond the blockers above.

Comment thread fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java Outdated
Comment thread fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@hello-stephen
Copy link
Copy Markdown
Contributor

FE Regression Coverage Report

Increment line coverage 74.65% (265/355) 🎉
Increment coverage report
Complete coverage report

@TsukiokaKogane TsukiokaKogane force-pushed the support_table_stream.part_comsume branch 2 times, most recently from 4e1ddde to 928e4bd Compare April 16, 2026 13:24
@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

/review

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

run buildall

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

run buildall

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

/review

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

run build all

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

/review

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

run buildall

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

run buildall

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

run buildall

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

/review

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

run buildall

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

/review

Copy link
Copy Markdown
Contributor

@yujun777 yujun777 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@morningman
Copy link
Copy Markdown
Contributor

run buildall

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

/review

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

run buildall

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

/review

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

run p0

@TsukiokaKogane
Copy link
Copy Markdown
Contributor Author

run cloud_p0

@morningman morningman merged commit 6e6729c into apache:master May 8, 2026
28 of 30 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants