[feature](dynamic table) support stream part 2: stream query & consumption infrastructure #62453
Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
Force-pushed d35191b to df6b262
|
run buildall |
1 similar comment
|
run buildall |
FE UT Coverage Report: increment line coverage |
|
/review |
Findings:
- `fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java`: stream offsets are mutated inside `commitTransaction()` before the transaction state is journaled. If FE crashes after `checkAndUpdateStreamOffset(transactionState)` but before `OP_UPSERT_TRANSACTION_STATE` is durably written, the stream offset advancement is lost while the source transaction is retried, so the same data can be consumed again after failover. This breaks the transaction/persistence checkpoint from the review guide because the metadata mutation has no replay-equivalent durable state transition.
- `fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java`: every `TableStreamUpdateInfo` is created with `nextOffset = Collections.emptyMap()`, but `OlapTableStream.checkStreamOffset()` now rejects any normal partition entry that has no `nextOffset`. As soon as an insert reads from a non-historical stream partition, commit will fail with `nextOffset missing`, so the new stream-consumption path cannot successfully commit.
Critical checkpoints:
- Goal / correctness: Not satisfied. The PR intends to add stream query + consumption infrastructure, but the new FE commit path cannot successfully advance normal stream offsets, and its offset state is not crash-safe.
- Scope / minimality: Reasonable overall, but the commit-path changes need to be completed so the persistence model stays correct.
- Concurrency / locking: Table-stream write locks are taken in ID order, which is good, but the protected state is mutated before durable journaling, so failover safety is still broken.
- Lifecycle / replay: Fails. The stream offset mutation has no replay-equivalent state transition if the master dies before the transaction upsert is persisted.
- Config / compatibility: No new config or rolling-upgrade blocker identified in the reviewed code.
- Parallel paths: I did not find another producer of `nextOffset`, so the current insert path appears incomplete for normal stream partitions.
- Conditional checks: The new `nextOffset` validation is fine in isolation, but it is inconsistent with the only caller added in this PR.
- Test coverage: I did not see regression/unit coverage for successful stream insert commit or FE restart/replay behavior; both are needed here.
- Observability: Existing exceptions include ids, which is acceptable, but observability does not compensate for the correctness issues above.
- Transaction / persistence: Blocking issue. The write-side mutation and replay semantics are not equivalent.
- Data write / atomicity: Blocking issue. Stream offset advancement is not atomic with transaction durability.
- FE/BE variable passing: No additional blocker identified beyond the commit-path issues above.
- Performance: No blocking performance issue identified in the reviewed code.
- Other issues: None beyond the two blockers above.
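The journaling-order concern above can be illustrated with a minimal, self-contained sketch (hypothetical class and method names, not Doris's actual implementation): the in-memory stream offset is advanced only after the edit-log append returns, so a crash between the two steps leaves the durable journal authoritative and replay can re-derive the offset.

```java
import java.util.ArrayList;
import java.util.List;

public class JournalFirstCommit {
    final List<String> journal = new ArrayList<>();   // stand-in for the FE EditLog
    long committedStreamOffset = 0;                   // in-memory stream metadata

    // Journal the transaction state first; mutate memory only once the
    // entry is durable, so a crash cannot lose the offset advancement
    // relative to what replay will reconstruct from the journal.
    void commitTransaction(long txnId, long nextStreamOffset) {
        journal.add("OP_UPSERT_TRANSACTION_STATE txn=" + txnId
                + " nextOffset=" + nextStreamOffset);  // durable write first
        committedStreamOffset = nextStreamOffset;      // safe: replay re-derives it
    }

    public static void main(String[] args) {
        JournalFirstCommit mgr = new JournalFirstCommit();
        mgr.commitTransaction(1001, 7);
        System.out.println(mgr.committedStreamOffset); // prints 7
        System.out.println(mgr.journal.size());        // prints 1
    }
}
```

The inverse ordering, as flagged in the review, leaves a window where the offset has advanced in memory but no journal entry exists to replay it after failover.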
|
/review |
|
run buildall |
Blocking findings:
- `nextOffset` is never populated for stream scans, so the new commit-time validation will reject normal stream inserts once they hit the `partitionOffset` path.
- The scan path uses the stored stream offset as the scan snapshot version, which rereads the old snapshot instead of the newly arrived data.
- The transaction-model/sub-transaction commit path still skips the new stream-offset validation/locking, so concurrent consumers can double-commit the same stream offsets there.
Critical checkpoints:
- Goal of the task: Not met yet. The PR aims to support stream query/consumption infrastructure, but the current implementation either fails commits, rereads already-consumed snapshots, or misses exactly-once protection on one commit path.
- Modification size/focus: Reasonably focused, but correctness gaps span planner and transaction layers.
- Concurrency/locking: Not correct on all applicable paths. Single-table commit expands locking, but the sub-transaction path still omits stream-table locking/validation.
- Special lifecycle/static init: No blocking lifecycle issue found.
- Configuration changes: No new config item added.
- Compatibility/incompatible changes: No blocking compatibility issue identified.
- Parallel code paths: Not handled consistently; single-table commit was updated, sub-transaction commit was not.
- Special conditional checks: The new checks depend on `nextOffset`, but that value is never prepared.
- Test coverage: Insufficient. Current tests only validate FE plan shape; they do not cover real insert/commit behavior, transaction-model commits, or end-to-end incremental semantics.
- Observability: No additional blocking observability issue found.
- Transaction/persistence: Risk remains. Offset advancement is wired into transaction state, but commit validation is incomplete and scan semantics are wrong.
- Data writes/modifications: Not safe yet; exactly-once consumption can fail or duplicate.
- FE/BE variable passing: Incomplete for incremental semantics; only a snapshot version is sent, no lower-bound offset.
- Performance: No separate blocking performance issue identified beyond the correctness bugs above.
- Other issues: None beyond the blocking findings above.
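The "rereads the old snapshot" finding above comes down to a version-window rule. A minimal sketch (hypothetical names, not the PR's actual code) of the semantics the review asks for: an incremental consume should scan the half-open version window `(consumedOffset, visibleVersion]` and commit `visibleVersion` as the new offset, rather than re-scanning at the already-consumed version.

```java
public class StreamScanWindow {
    // Returns {scanLowerExclusive, scanUpperInclusive, nextOffsetToCommit},
    // or null when the partition has no unconsumed versions.
    static long[] planIncrementalScan(long consumedOffset, long visibleVersion) {
        if (visibleVersion <= consumedOffset) {
            return null; // nothing new arrived since the last consume
        }
        return new long[] { consumedOffset, visibleVersion, visibleVersion };
    }

    public static void main(String[] args) {
        long[] plan = planIncrementalScan(5, 9);
        // scan versions (5, 9], then commit nextOffset = 9
        System.out.println(plan[0] + "," + plan[1] + "," + plan[2]); // prints 5,9,9
    }
}
```

Using `consumedOffset` itself as the scan upper bound, as the reviewed code does, makes the stream replay the old snapshot and never observe newly committed rows.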
FE UT Coverage Report: increment line coverage |
|
/review |
|
run buildall |
Findings:
- `fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java`: the new stream-update path never populates `nextOffset`, so after a `show_initial_rows=true` stream is consumed once, the next incremental consume will fail in `DatabaseTransactionMgr.checkStreamOffset()` with `nextOffset missing ...` for every normal partition.
- `fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java`: `partitionOffset` is later used as the scan visible-version upper bound (`OlapScanNode.addScanRangeLocations()`), but the historical->normal transition stores the old consumed version back into `partitionOffset`. After new data arrives, the next stream read still scans the old snapshot and replays historical rows instead of the new append.
Critical checkpoint conclusions:
- Goal of the task: add stream query plus consumption infrastructure. The planner/binding pieces are in place, but the end-to-end consume semantics are not correct yet because stream offsets do not advance safely. Existing tests do not cover repeated consumption after the first successful consume or after subsequent base-table writes.
- Modification size/focus: broad change across catalog, Nereids, planner, and transaction management. The scope is understandable for the feature, but the offset state machine currently spans too many layers without a validating end-to-end test.
- Concurrency: applicable. The new commit path does sort stream tables into the table-lock list, so lock ordering looks intentional, but the functional stream-state transition performed under commit still violates correctness.
- Lifecycle/static init: applicable but no blocking lifecycle or static-init issue found in the reviewed paths.
- Configuration: no new config items added.
- Compatibility: no obvious FE/BE protocol or storage-format incompatibility introduced by these lines.
- Parallel code paths: applicable. Both query planning and transaction commit paths were updated, but the repeated-consume path is still broken.
- Special conditional checks: no additional blocker here beyond the offset transition logic above.
- Test coverage: insufficient for the new behavior. UT/regression cases cover plan shape and one consume flow, but miss the two blocking sequences above.
- Observability: no major blocker, but offset-transition failures currently rely on generic commit errors only.
- Transaction/persistence: applicable. `streamUpdateInfos` are persisted on `TransactionState`, but the persisted values are not semantically correct for later stream reads/commits.
- Data writes/modifications: applicable and currently not correct for stream consumption progress, so this checkpoint fails.
- FE/BE variable passing: applicable. The new `partitionOffset` is threaded into the FE planner/scan path used here; no separate FE->BE field addition was needed in the reviewed code.
- Performance: no blocking performance issue stood out versus the correctness problems above.
- Other issues: none beyond the blocking findings above.
Overall opinion: request changes until stream offsets can advance correctly across repeated consumes and the missing end-to-end tests are added.
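The producer/validator mismatch called out above can be made concrete with a small sketch (hypothetical names, mirroring but not copying the `checkStreamOffset` contract described in the review): the validator requires a `nextOffset` entry for every normal partition, so the insert side must populate the map before commit instead of passing an empty one.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class NextOffsetCheck {
    // Validator side: a normal (non-historical) partition entry must carry
    // a nextOffset, otherwise commit is rejected.
    static void checkPartitionOffset(Map<Long, Long> nextOffset, long partitionId,
                                     boolean historical) {
        if (!historical && !nextOffset.containsKey(partitionId)) {
            throw new IllegalStateException("nextOffset missing for partition " + partitionId);
        }
    }

    public static void main(String[] args) {
        // Producer side must fill the map; an emptyMap(), as the reviewed
        // insert path currently supplies, always fails this validation.
        Map<Long, Long> populated = new HashMap<>();
        populated.put(10L, 9L); // partition 10 advances to version 9
        checkPartitionOffset(populated, 10L, false); // passes
        try {
            checkPartitionOffset(Collections.emptyMap(), 10L, false);
        } catch (IllegalStateException expected) {
            System.out.println(expected.getMessage());
        }
    }
}
```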
|
/review |
|
run buildall |
I found two blocking issues.
- `TableStreamUpdateInfo.update` is persisted through `TransactionState`, but it is declared as the abstract type `AbstractTableStreamUpdate` without any Gson runtime type adapter. That breaks EditLog/checkpoint replay once a transaction with stream updates is journaled.
- The new stream-update state is only consumed in the shared-nothing transaction manager. The cloud commit path still never locks/checks/applies `streamUpdateInfos`, so stream offset validation can be bypassed in cloud mode.
Critical checkpoint conclusions:
- Goal of current task: Partially accomplished. Query planning and basic consumption flow are implemented, but transaction correctness is incomplete.
- Modification size/focus: Reasonably focused, but the transaction integration is incomplete across parallel commit paths.
- Concurrency: Local FE path adds sorted lock acquisition for stream tables, but cloud mode does not participate in the same locking/validation flow.
- Lifecycle/static init: No special lifecycle issue found beyond persisted polymorphic state.
- Configuration: No new config involved.
- Compatibility/incompatible changes: A new persisted transaction payload was added, but replay compatibility is incomplete because the abstract subtype cannot be reconstructed.
- Parallel code paths: Not all applicable paths were updated; cloud transaction commit is missing the new logic.
- Special conditional checks: No blocking issue here.
- Test coverage: FE UT and regression cover plan/history behavior, but not replay/failover or cloud transaction behavior.
- Observability: No blocking issue here.
- Transaction/persistence: Blocking issues remain in journal replay and commit-path coverage.
- Data writes/modifications: Stream-offset advancement is not guaranteed in cloud mode.
- FE/BE variable passing: No issue found.
- Performance: No blocking performance issue found.
- Other issues: None beyond the two blockers above.
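The polymorphic-persistence blocker above is about subtype information surviving the journal round trip. A pure-stdlib sketch (hypothetical names, deliberately not using Gson) of the role a runtime type adapter plays: serialization records an explicit subtype tag, and replay dispatches on that tag instead of trying to instantiate the abstract declared type.

```java
public class TypeTagPersistence {
    interface StreamUpdateLike { String payload(); }

    static final class OlapStreamUpdate implements StreamUpdateLike {
        private final String body;
        OlapStreamUpdate(String body) { this.body = body; }
        public String payload() { return body; }
    }

    // Serialize with an explicit subtype tag in front of the body; this is
    // the information a Gson runtime type adapter registration would carry.
    static String write(StreamUpdateLike update) {
        return update.getClass().getSimpleName() + "|" + update.payload();
    }

    // Replay dispatches on the tag; an unregistered subtype fails loudly
    // instead of silently losing the concrete type on checkpoint reload.
    static StreamUpdateLike read(String journaled) {
        String[] parts = journaled.split("\\|", 2);
        if ("OlapStreamUpdate".equals(parts[0])) {
            return new OlapStreamUpdate(parts[1]);
        }
        throw new IllegalArgumentException("unknown subtype: " + parts[0]);
    }

    public static void main(String[] args) {
        String journaled = write(new OlapStreamUpdate("partition=10,next=9"));
        System.out.println(read(journaled).payload()); // prints partition=10,next=9
    }
}
```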
|
/review |
|
run buildall |
Blocking findings:
- `fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java`: default/non-historical stream consumption is still incorrect. The scan version for appended data is derived from the already-consumed offset, and the commit path writes that same value back, so a stream created with `show_initial_rows = false` can get stuck on its creation snapshot and never advance to newly committed rows.
- `fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java`: the stream offset override is only applied in the non-cloud path. With `Config.isCloudMode() && Config.enable_cloud_snapshot_version`, `updateScanRangeVersions()` still installs the latest partition visible version and ignores the stream snapshot, so cloud stream reads do not honor the committed stream offset.
Critical checkpoint conclusions:
- Goal of the task: Partially achieved. Historical stream query/planning and virtual-column plumbing are added, but core stream consumption semantics are still wrong on the default append path, and the cloud snapshot path is inconsistent.
- Small/clear/focused change: Mostly focused, but the exposed behavior is not yet correct end-to-end.
- Concurrency review: Transaction commit/update stays under table locks and the added stream tables are lock-ordered by `(dbId, tableId)`; I did not find a new deadlock issue in the locking changes.
- Lifecycle/static initialization: No special lifecycle or static-init issue identified.
- Configuration items: No new config items added.
- Compatibility: No storage-format or API compatibility blocker identified in this review.
- Parallel code paths: Not fully handled. The non-cloud scan path was updated, but the cloud snapshot path was not.
- Special conditional checks: The new conditional in `OlapScanNode.addScanRangeLocations()` is insufficient because the cloud path bypasses it.
- Test coverage: Added FE UT and regression coverage target historical (`show_initial_rows = true`) behavior and virtual columns, but they do not cover the default/non-historical consumption path or cloud snapshot mode; that is why the blocking behavior above slips through.
- Observability: No additional observability issue identified beyond the correctness bugs above.
- Transaction/persistence: `streamUpdateInfos` appear to be persisted/replayed through `TransactionState` correctly, but the semantic value being stored/validated for append consumption is incorrect.
- Data write/modification correctness: Not correct yet for append stream consumption; consumed offsets do not advance to the newly read version.
- FE/BE variable passing: The new stream update is plumbed through the planner to `OlapScanNode`, but the cloud snapshot execution path still misses it.
- Performance: No major performance blocker found; correctness issues dominate.
- Other issues: None beyond the blockers above.
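The cloud/non-cloud divergence flagged above is a classic parallel-path hazard: two code paths compute the scan version independently. One way to avoid it, sketched here with hypothetical names (not the PR's actual structure), is a single shared decision point that both paths call, so a committed stream offset always wins over the latest visible version regardless of deployment mode.

```java
import java.util.OptionalLong;

public class ScanVersionResolver {
    // One shared decision point for both cloud and non-cloud scan paths:
    // a committed stream offset, when present, overrides the partition's
    // latest visible version, so neither path can silently read ahead of it.
    static long resolveScanVersion(OptionalLong streamOffset, long latestVisibleVersion) {
        return streamOffset.isPresent() ? streamOffset.getAsLong() : latestVisibleVersion;
    }

    public static void main(String[] args) {
        System.out.println(resolveScanVersion(OptionalLong.of(7), 12));  // prints 7
        System.out.println(resolveScanVersion(OptionalLong.empty(), 12)); // prints 12
    }
}
```

With separate per-path logic, as reviewed, the cloud snapshot path overwrites the stream override and reads the latest table snapshot instead.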
Force-pushed dee776e to 0835d16
|
/review |
|
run buildall |
Findings:
- `fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java`: the normal incremental path stores `next` as the already-consumed `partitionOffset`, so a stream created with `show_initial_rows = false` never advances to later visible data. `hasData()` becomes true when the base partition version moves forward, but `OlapScanNode` still scans the old snapshot version from `streamUpdate.next`, and commit writes the same offset back again.
- `fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java`: the new stream-version override is only applied in the non-cloud scan-range path. In cloud mode, `ScanNode.setVisibleVersionForOlapScanNodes()` later overwrites scan ranges from a global partition-id map built from `CloudPartition` snapshots, so stream scans ignore the stored offset and read the latest table snapshot instead.
Critical checkpoint conclusions:
- Goal of current task: support stream query and stream consumption for OLAP table streams. Conclusion: not fully accomplished because the incremental consumption version is incorrect on the main path, and cloud mode does not preserve stream offsets. Existing tests do not demonstrate either case.
- Scope/minimality: the change is reasonably focused on FE stream/query/txn wiring, but it spans analysis, planning, planner, transaction, and persistence paths; the remaining blockers are in the core execution path rather than polish.
- Concurrency: the new stream-table locking looks consistent with FE guidance. Stream tables are added into ID-sorted locking for commit/replay, and I did not find a separate deadlock issue in the new code.
- Lifecycle/static initialization: no special lifecycle or static-init issue found.
- Configuration changes: none.
- Compatibility/persistence: Gson subtype registration and transaction-state persistence wiring are present, so the new payload appears serializable/replayable. The blocking issue is the wrong offset value being persisted/consumed, not missing journal integration.
- Parallel code paths: blocker. Cloud and non-cloud scan-version paths are no longer functionally equivalent for stream scans.
- Special condition checks: there are explicit TODOs in the offset logic, and one of them corresponds to a live correctness gap in the non-historical incremental path.
- Test coverage: regression and explain tests were added, but there is still no end-to-end coverage for consuming rows appended after stream creation, and no cloud-path coverage.
- Test result changes: no incorrect `.out` result was proven from the diff alone, but the added tests do not exercise the blocking scenarios above.
- Observability: no blocking observability issue found.
- Transaction/persistence modifications: commit/replay hooks exist and stream tables are included in lock sets, but they operate on an incorrect version value for incremental scans.
- Data write/modification correctness: not preserved for stream-consumption metadata because successful commits can leave the source stream offset stale.
- FE/BE variable passing: `streamUpdate` is passed into `OlapScanNode`, but the later cloud visible-version propagation drops that per-scan override.
- Performance: no primary blocking performance finding.
- Other issues: none beyond the blockers above.
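The ID-sorted locking that the concurrency checkpoints repeatedly credit can be sketched as follows (hypothetical names; Doris's actual lock list lives inside the transaction manager): every committer acquires the table locks in the same `(dbId, tableId)` order, so two commits over overlapping table sets always contend in the same sequence and cannot deadlock.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class OrderedTableLocks {
    static final class TableRef {
        final long dbId;
        final long tableId;
        final ReentrantLock lock = new ReentrantLock();
        TableRef(long dbId, long tableId) { this.dbId = dbId; this.tableId = tableId; }
    }

    // Acquire every table lock in (dbId, tableId) order; callers unlock in
    // reverse order when the commit finishes.
    static List<TableRef> lockAllInOrder(List<TableRef> tables) {
        List<TableRef> sorted = new ArrayList<>(tables);
        sorted.sort(Comparator.comparingLong((TableRef t) -> t.dbId)
                              .thenComparingLong(t -> t.tableId));
        for (TableRef t : sorted) {
            t.lock.lock();
        }
        return sorted;
    }

    public static void main(String[] args) {
        TableRef a = new TableRef(2, 30);
        TableRef b = new TableRef(1, 40);
        List<TableRef> held = lockAllInOrder(Arrays.asList(a, b));
        System.out.println(held.get(0).dbId); // prints 1: (1,40) is locked first
        for (int i = held.size() - 1; i >= 0; i--) {
            held.get(i).lock.unlock();
        }
    }
}
```

The point the reviews keep returning to is that this ordering only protects paths that participate in it; a commit path (such as the cloud one) that skips the sorted lock list reintroduces the race.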
FE Regression Coverage Report: increment line coverage |
Force-pushed 4e1ddde to 928e4bd
|
/review |
|
run buildall |
|
run buildall |
|
/review |
|
run build all |
|
/review |
|
run buildall |
|
run buildall |
|
run buildall |
|
/review |
|
run buildall |
|
/review |
|
run buildall |
|
/review |
|
run buildall |
|
/review |
|
run p0 |
|
run cloud_p0 |
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #57921
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)