[fix](streaming-job) fix S3 offset and job statistics lost after FE checkpoint restart #62449
…heckpoint restart 1. S3SourceOffsetProvider did not implement getPersistInfo(), so offsetProviderPersist was always null. After checkpoint, the S3 offset was not in the image, causing duplicate data consumption on restart. Fix: implement getPersistInfo() and replayIfNeed() for S3, and sync offsetProviderPersist in updateJobStatisticAndOffset() so the checkpoint image always contains the latest offset. 2. StreamingJobStatistic.fileNumber, fileSize, filteredRows are missing @SerializedName annotations. GsonUtils.GSON uses HiddenAnnotationExclusionStrategy which skips fields without @SerializedName during serialization, causing these statistics to reset to 0 after checkpoint restart. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
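The exclusion behavior described in point 2 can be illustrated with a toy serializer. This is a minimal sketch, not Gson and not Doris code: `Persisted`, `DemoStatistic`, and `AnnotationOnlySerializer` are hypothetical names standing in for `@SerializedName`, `StreamingJobStatistic`, and the annotation-based exclusion strategy.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for Gson's @SerializedName; not the real annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Persisted {
    String value();
}

// Hypothetical statistics holder: only scannedRows carries the annotation.
class DemoStatistic {
    @Persisted("scannedRows")
    long scannedRows = 100;

    long fileNumber = 3; // no annotation, so the toy serializer drops it
}

class AnnotationOnlySerializer {
    // Collects only annotated fields, mimicking a strategy that skips unannotated ones.
    static Map<String, Object> serialize(Object obj) {
        Map<String, Object> out = new LinkedHashMap<>();
        try {
            for (Field f : obj.getClass().getDeclaredFields()) {
                Persisted p = f.getAnnotation(Persisted.class);
                if (p == null) {
                    continue; // unannotated field never reaches the image
                }
                f.setAccessible(true);
                out.put(p.value(), f.get(obj));
            }
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(serialize(new DemoStatistic()));
    }
}
```

In this sketch `fileNumber` is silently absent from the serialized map, so a reload would see its default value, which mirrors the reset-to-0 symptom reported for the unannotated statistics fields.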
When ALTER JOB modifies the offset property, offsetProvider.updateOffset() was called but offsetProviderPersist was not synced. After FE restart, replayIfNeed() would restore the stale offset from offsetProviderPersist, overwriting the ALTER'd offset. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… by EditLog replay During FE restart, txn replay (replayOnCommitted) may run after ALTER replay (replayOnUpdated), causing offsetProviderPersist to be overwritten with the pre-ALTER offset. In replayIfNeed(), if currentOffset is already set by EditLog replay, skip restoring from offsetProviderPersist to avoid using a stale value. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
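The two commit messages above (sync on offset update, skip restore when `currentOffset` is already set) can be modeled roughly as follows. This is a simplified sketch under stated assumptions: `SketchOffsetProvider` and `SketchJob` are hypothetical classes, offsets are plain strings, and the real Doris signatures may differ.

```java
// Hypothetical, simplified model of an offset provider; not the Doris class.
class SketchOffsetProvider {
    private String currentOffset; // null until set by journal replay or restore

    void updateOffset(String offset) {
        this.currentOffset = offset;
    }

    String getCurrentOffset() {
        return currentOffset;
    }

    // Serialized form that would be stored in the checkpoint image.
    String getPersistInfo() {
        return currentOffset;
    }

    // Restore from the image only if journal replay has not already set an offset.
    void replayIfNeed(String offsetProviderPersist) {
        if (currentOffset != null) {
            return; // keep the value set by replay
        }
        if (offsetProviderPersist != null) {
            currentOffset = offsetProviderPersist;
        }
    }
}

// Hypothetical job wrapper that keeps the persisted copy in sync.
class SketchJob {
    final SketchOffsetProvider provider = new SketchOffsetProvider();
    String offsetProviderPersist;

    // Called after a commit and after ALTER: update the provider, then sync the persisted copy.
    void updateOffset(String offset) {
        provider.updateOffset(offset);
        offsetProviderPersist = provider.getPersistInfo();
    }
}
```

Note that the guard only checks whether some offset was replayed, not whether it is the newest one, which is the limitation the later review comment raises.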
run buildall
/review
run buildall
Pull request overview
Fixes streaming insert job state loss after FE checkpoint restart by persisting S3 offsets and ensuring streaming job statistics fields are serialized into the checkpoint image.
Changes:
- Implement S3 offset persistence and checkpoint-time restore logic in `S3SourceOffsetProvider`.
- Sync `offsetProviderPersist` whenever offsets are updated (normal operation + replay + ALTER JOB offset changes).
- Add missing `@SerializedName` annotations for streaming job statistics fields and add a regression test for checkpoint-restart retention.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| regression-test/suites/job_p0/streaming_job/test_streaming_job_checkpoint_restart_fe.groovy | Adds regression coverage for streaming job loadStatistic retention across FE checkpoint restart. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java | Persists current S3 offset into checkpoint image and restores it on checkpoint-only recovery without overwriting newer EditLog state. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java | Ensures fileNumber, fileSize, filteredRows are included in Gson checkpoint serialization. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java | Keeps offsetProviderPersist in sync after offset updates and ALTER offset changes so checkpoint images contain the latest offset. |
Findings
- `fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java`: the new `currentOffset != null` guard is not a safe proxy for "latest state already replayed". `currentOffset` can also be written by `replayOnUpdated()` via `modifyPropertiesInternal()`, and transaction replay is not ordered with job-update replay. If a newer `ALTER JOB ... offset` has already been replayed into `offsetProviderPersist`, a later replay of an older txn attachment can still set `currentOffset` back to the pre-ALTER file; this branch will then skip the persisted ALTER offset and the job can resume from the stale location.
- `regression-test/suites/job_p0/streaming_job/test_streaming_job_checkpoint_restart_fe.groovy`: the new regression only validates `loadStatistic` after a pure checkpoint restart. It does not assert `currentOffset`, and it does not cover the checkpoint + `ALTER JOB ... offset` ordering case that motivated the new S3 `replayIfNeed()` logic, so the stale-offset regression above can still pass CI.
Critical Checkpoints
- Goal of current task: Partially accomplished. The statistics serialization fix is covered, but the S3 offset recovery ordering issue is still not correctly handled.
- Modification size/focus: Yes, the patch is small and focused.
- Concurrency: No new locks or thread-safety primitives were added, but replay ordering between transaction replay and job-update replay is still incorrect for S3 offset restoration.
- Lifecycle/static initialization: No special lifecycle or static initialization issue in the touched code.
- Configuration items: None added.
- Compatibility: Adding missing serialized fields is backward-compatible; no incompatible FE/BE or storage-format change found.
- Parallel code paths: Only the S3 path was changed; JDBC/Jdbc TVF keep different replay semantics.
- Special conditional checks: The new `if (currentOffset != null)` check is not backed by a reliable invariant and is the source of the main correctness issue.
- Test coverage: Insufficient for the claimed fix. The new test does not validate offset restoration or the ALTER + checkpoint replay order.
- Observability: The added INFO logs are adequate for this path.
- Transaction/persistence: Not correct yet for the S3 replay path; restart order can still restore a stale offset.
- Data writes/modifications: Risk remains that a restarted job resumes from the wrong S3 file after an offset alteration.
- FE/BE variable passing: Not applicable.
- Performance: No significant issue found in the touched lines.
- Other issues: None beyond the findings above.
Overall opinion: not ready yet because the restart/replay correctness issue for S3 offsets is still open.
…mprove checkpoint test 1. New test: test_streaming_job_alter_offset_checkpoint_restart_fe - ALTER offset → wait checkpoint → restart FE → verify offset preserved - RESUME → verify job consumes from altered offset 2. Improve test_streaming_job_checkpoint_restart_fe: - Wait for SucceedTaskCount >= 2 to ensure all files processed - Add filteredRows assertion Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
run buildall
PR approved by anyone and no changes requested.
…eckpoint test 1. Add restoreFromPersistInfo() interface to SourceOffsetProvider with default no-op. S3SourceOffsetProvider implements it to restore currentOffset from offsetProviderPersist during gsonPostProcess, so PAUSED jobs show correct offset after checkpoint restart. 2. Call restoreFromPersistInfo() in gsonPostProcess for TVF providers. 3. New test: test_streaming_job_alter_offset_checkpoint_restart_fe - ALTER offset → checkpoint → restart → verify offset preserved 4. Improve test_streaming_job_checkpoint_restart_fe: - Wait for SucceedTaskCount >= 2 - Add filteredRows assertion Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
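The "default no-op" interface method described in this commit message follows a common Java pattern, sketched below. All names here (`SketchSourceOffsetProvider`, `SketchS3Provider`, `SketchOtherProvider`) are hypothetical, and passing the persisted state as a single string is an assumption made for illustration; the actual Doris signature may differ.

```java
// Hypothetical interface: providers that need image-time restore override the hook.
interface SketchSourceOffsetProvider {
    // Default no-op, so existing providers keep compiling and behave as before.
    default void restoreFromPersistInfo(String persistInfo) {
    }

    String currentOffset();
}

// Hypothetical S3-like provider that restores its offset from the persisted copy.
class SketchS3Provider implements SketchSourceOffsetProvider {
    private String offset;

    @Override
    public void restoreFromPersistInfo(String persistInfo) {
        if (persistInfo != null) {
            offset = persistInfo;
        }
    }

    @Override
    public String currentOffset() {
        return offset;
    }
}

// Hypothetical provider that relies on the inherited no-op.
class SketchOtherProvider implements SketchSourceOffsetProvider {
    @Override
    public String currentOffset() {
        return null; // offset is recovered through some other path in this sketch
    }
}
```

A caller in a post-deserialization hook can then invoke `restoreFromPersistInfo(...)` on any provider without type checks; only providers that override it change state.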
run buildall
run cloud_p0
run feut
run feut
PR approved by at least one committer and no changes requested.
/review
Findings:
- Cloud-mode paused S3 jobs can still come back with an empty or stale `currentOffset` after checkpoint restart. `gsonPostProcess()` now restores TVF providers from `offsetProviderPersist`, but the cloud commit path still never refreshes that field, and `StreamingJobSchedulerTask` only calls `replayOnCloudMode()` in `PENDING`, not `PAUSED`.
- `filteredRows` is still never propagated into txn-backed streaming job statistics. `beforeCommitted()` only writes scanned rows, load bytes, file count and file size into `StreamingTaskTxnCommitAttachment`, and `updateJobStatisticAndOffset()` never increments `jobStatistic.filteredRows`. Adding `@SerializedName("filteredRows")` only preserves the default zero, so this part of the stated fix is still incomplete.
Critical checkpoint conclusions:
- Goal of the task: Partially met. The non-cloud S3 checkpoint/ALTER recovery and `fileNumber`/`fileSize` image serialization are addressed, but the cloud paused-job replay path and `filteredRows` persistence are still incorrect.
- Scope/minimality: The change is reasonably focused.
- Concurrency: No new lock-order or thread-safety issue found in the modified code.
- Lifecycle/replay ordering: Not fully safe yet; image-load restoration now depends on `offsetProviderPersist`, but not every replay path keeps that state up to date.
- Configuration: No new configs.
- Compatibility: No obvious rolling-upgrade or storage-format compatibility break beyond additive Gson fields.
- Parallel code paths: Not fully covered; the non-cloud S3 path was updated but the cloud S3 commit/recovery path was not.
- Special conditional checks: The new `currentOffset != null` guard is understandable, but it does not fix the stale-state case above.
- Test coverage: Non-cloud restart coverage improved, but there is still no cloud regression coverage and no test that proves a non-zero `filteredRows` survives restart.
- Observability: Existing logs are sufficient for these paths.
- Transaction/persistence correctness: Incomplete for the reasons above.
- Data write/modification correctness: No BE-side data visibility issue identified in this diff.
- FE/BE variable passing: Not applicable.
- Performance: No significant regression found.
- Other issues: None beyond the findings above.
`offsetProviderPersist` only affects checkpoint image persistence in BDBJE mode (integrated storage and compute). In storage-compute separation (cloud mode), the offset is stored in the meta service and restored via `replayOnCloudMode()`, without using `offsetProviderPersist`.
What problem does this PR solve?
Problem Summary:
After FE checkpoint restart, a streaming job's S3 offset and part of its statistics (fileNumber, fileSize, filteredRows) are lost, causing:

1. S3 offset lost → duplicate data consumption: `S3SourceOffsetProvider` did not implement `getPersistInfo()`, so `offsetProviderPersist` was always null. The checkpoint image did not contain offset state. After restart from the checkpoint image, the job re-consumed all files from the beginning.
2. Job statistics partially lost: `StreamingJobStatistic.fileNumber`, `fileSize`, `filteredRows` were missing `@SerializedName` annotations. `GsonUtils.GSON` uses `HiddenAnnotationExclusionStrategy`, which skips fields without `@SerializedName` during serialization, causing these fields to reset to 0 after checkpoint restart.
3. ALTER offset not synced to offsetProviderPersist: When `ALTER JOB` modified the offset property, `offsetProviderPersist` was not updated. After FE restart, `replayIfNeed()` could restore a stale offset.

Root cause: Issues 1 and 2 share the same root cause: fields that should be persisted in the checkpoint image were not being serialized by `GsonUtils.GSON`. Issue 3 is a missing sync when altering the offset.

Fix

S3 offset persistence (`S3SourceOffsetProvider.java`):
- Implement `getPersistInfo()` to return the serialized current offset
- `replayIfNeed()`: if `currentOffset` is already set by EditLog replay (via `replayOnCommitted` → `updateOffset`), skip restoring from `offsetProviderPersist` to avoid overwriting with stale values; only restore from `offsetProviderPersist` when `currentOffset` is null (pure checkpoint recovery without subsequent EditLog replay)

Sync offsetProviderPersist on offset updates (`StreamingInsertJob.java`):
- Set `this.offsetProviderPersist = offsetProvider.getPersistInfo()` in `updateJobStatisticAndOffset()` after `updateOffset()`. This runs during both normal operation and replay: the checkpoint thread replays journals on its own Env copy and needs this field updated before `saveImage()`
- Do the same in `modifyPropertiesInternal()` after the ALTER offset update

Statistics serialization (`StreamingJobStatistic.java`):
- Add `@SerializedName` to `fileNumber`, `fileSize`, `filteredRows`

Release note
Fix streaming job S3 offset and statistics (fileNumber, fileSize, filteredRows) lost after FE checkpoint restart, which could cause duplicate data consumption.
Check List (For Author)
Test
Behavior changed:
offsetProviderPersist, preventing duplicate data consumption after FE checkpoint restart.
Does this need documentation?
Check List (For Reviewer who merge this PR)