Skip to content

[fix](streaming-job) fix S3 offset and job statistics lost after FE checkpoint restart#62449

Merged
JNSimba merged 7 commits intoapache:masterfrom
JNSimba:fix/streaming-job-s3-offset-checkpoint
Apr 15, 2026
Merged

[fix](streaming-job) fix S3 offset and job statistics lost after FE checkpoint restart#62449
JNSimba merged 7 commits intoapache:masterfrom
JNSimba:fix/streaming-job-s3-offset-checkpoint

Conversation

@JNSimba
Copy link
Copy Markdown
Member

@JNSimba JNSimba commented Apr 13, 2026

What problem does this PR solve?

Problem Summary:

After FE checkpoint restart, streaming job's S3 offset and partial statistics (fileNumber, fileSize, filteredRows) are lost, causing:

  1. S3 offset lost → duplicate data consumption: S3SourceOffsetProvider did not implement getPersistInfo(), so offsetProviderPersist was always null. The checkpoint image did not contain offset state. After restart from checkpoint image, the job re-consumed all files from the beginning.

  2. Job statistics partially lost: StreamingJobStatistic.fileNumber, fileSize, filteredRows were missing @SerializedName annotations. GsonUtils.GSON uses HiddenAnnotationExclusionStrategy which skips fields without @SerializedName during serialization, causing these fields to reset to 0 after checkpoint restart.

  3. ALTER offset not synced to offsetProviderPersist: When ALTER JOB modified the offset property, offsetProviderPersist was not updated. After FE restart, replayIfNeed() could restore a stale offset.

Root cause: Both issues 1 and 2 share the same root cause — fields that should be persisted in the checkpoint image were not being serialized by GsonUtils.GSON. Issue 3 is a missing sync when altering offset.

Fix

  1. S3 offset persistence (S3SourceOffsetProvider.java):

    • Implement getPersistInfo() to return serialized current offset
    • Implement replayIfNeed(): if currentOffset is already set by EditLog replay (via replayOnCommittedupdateOffset), skip restoring from offsetProviderPersist to avoid overwriting with stale values; only restore from offsetProviderPersist when currentOffset is null (pure checkpoint recovery without subsequent EditLog replay)
  2. Sync offsetProviderPersist on offset updates (StreamingInsertJob.java):

    • Add this.offsetProviderPersist = offsetProvider.getPersistInfo() in updateJobStatisticAndOffset() after updateOffset(). This runs during both normal operation and replay — the checkpoint thread replays journals on its own Env copy and needs this field updated before saveImage()
    • Add the same sync in modifyPropertiesInternal() after ALTER offset update
  3. Statistics serialization (StreamingJobStatistic.java):

    • Add @SerializedName to fileNumber, fileSize, filteredRows

Release note

Fix streaming job S3 offset and statistics (fileNumber, fileSize, filteredRows) lost after FE checkpoint restart, which could cause duplicate data consumption.

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes. S3 streaming job offset is now persisted in checkpoint image via offsetProviderPersist, preventing duplicate data consumption after FE checkpoint restart.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

JNSimba and others added 2 commits April 13, 2026 18:21
…heckpoint restart

1. S3SourceOffsetProvider did not implement getPersistInfo(), so
   offsetProviderPersist was always null. After checkpoint, the S3
   offset was not in the image, causing duplicate data consumption
   on restart.

   Fix: implement getPersistInfo() and replayIfNeed() for S3, and
   sync offsetProviderPersist in updateJobStatisticAndOffset() so
   the checkpoint image always contains the latest offset.

2. StreamingJobStatistic.fileNumber, fileSize, filteredRows are
   missing @SerializedName annotations. GsonUtils.GSON uses
   HiddenAnnotationExclusionStrategy which skips fields without
   @SerializedName during serialization, causing these statistics
   to reset to 0 after checkpoint restart.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@Thearas
Copy link
Copy Markdown
Contributor

Thearas commented Apr 13, 2026

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

JNSimba and others added 2 commits April 13, 2026 19:30
When ALTER JOB modifies the offset property, offsetProvider.updateOffset()
was called but offsetProviderPersist was not synced. After FE restart,
replayIfNeed() would restore the stale offset from offsetProviderPersist,
overwriting the ALTER'd offset.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… by EditLog replay

During FE restart, txn replay (replayOnCommitted) may run after ALTER replay
(replayOnUpdated), causing offsetProviderPersist to be overwritten with
the pre-ALTER offset. In replayIfNeed(), if currentOffset is already set
by EditLog replay, skip restoring from offsetProviderPersist to avoid
using a stale value.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 14, 2026

run buildall

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@JNSimba JNSimba requested a review from Copilot April 14, 2026 02:27
@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 14, 2026

/review

@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 14, 2026

run buildall

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Fixes streaming insert job state loss after FE checkpoint restart by persisting S3 offsets and ensuring streaming job statistics fields are serialized into the checkpoint image.

Changes:

  • Implement S3 offset persistence and checkpoint-time restore logic in S3SourceOffsetProvider.
  • Sync offsetProviderPersist whenever offsets are updated (normal operation + replay + ALTER JOB offset changes).
  • Add missing @SerializedName annotations for streaming job statistics fields and add a regression test for checkpoint-restart retention.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

File Description
regression-test/suites/job_p0/streaming_job/test_streaming_job_checkpoint_restart_fe.groovy Adds regression coverage for streaming job loadStatistic retention across FE checkpoint restart.
fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java Persists current S3 offset into checkpoint image and restores it on checkpoint-only recovery without overwriting newer EditLog state.
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java Ensures fileNumber, fileSize, filteredRows are included in Gson checkpoint serialization.
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java Keeps offsetProviderPersist in sync after offset updates and ALTER offset changes so checkpoint images contain the latest offset.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Copy Markdown
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Findings

  1. fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java: the new currentOffset != null guard is not a safe proxy for "latest state already replayed". currentOffset can also be written by replayOnUpdated() via modifyPropertiesInternal(), and transaction replay is not ordered with job-update replay. If a newer ALTER JOB ... offset has already been replayed into offsetProviderPersist, a later replay of an older txn attachment can still set currentOffset back to the pre-ALTER file; this branch will then skip the persisted ALTER offset and the job can resume from the stale location.

  2. regression-test/suites/job_p0/streaming_job/test_streaming_job_checkpoint_restart_fe.groovy: the new regression only validates loadStatistic after a pure checkpoint restart. It does not assert currentOffset, and it does not cover the checkpoint + ALTER JOB ... offset ordering case that motivated the new S3 replayIfNeed() logic, so the stale-offset regression above can still pass CI.

Critical Checkpoints

  • Goal of current task: Partially accomplished. The statistics serialization fix is covered, but the S3 offset recovery ordering issue is still not correctly handled.
  • Modification size/focus: Yes, the patch is small and focused.
  • Concurrency: No new locks or thread-safety primitives were added, but replay ordering between transaction replay and job-update replay is still incorrect for S3 offset restoration.
  • Lifecycle/static initialization: No special lifecycle or static initialization issue in the touched code.
  • Configuration items: None added.
  • Compatibility: Adding missing serialized fields is backward-compatible; no incompatible FE/BE or storage-format change found.
  • Parallel code paths: Only the S3 path was changed; JDBC/Jdbc TVF keep different replay semantics.
  • Special conditional checks: The new if (currentOffset != null) check is not backed by a reliable invariant and is the source of the main correctness issue.
  • Test coverage: Insufficient for the claimed fix. The new test does not validate offset restoration or the ALTER + checkpoint replay order.
  • Observability: The added INFO logs are adequate for this path.
  • Transaction/persistence: Not correct yet for the S3 replay path; restart order can still restore a stale offset.
  • Data writes/modifications: Risk remains that a restarted job resumes from the wrong S3 file after an offset alteration.
  • FE/BE variable passing: Not applicable.
  • Performance: No significant issue found in the touched lines.
  • Other issues: None beyond the findings above.

Overall opinion: not ready yet because the restart/replay correctness issue for S3 offsets is still open.

…mprove checkpoint test

1. New test: test_streaming_job_alter_offset_checkpoint_restart_fe
   - ALTER offset → wait checkpoint → restart FE → verify offset preserved
   - RESUME → verify job consumes from altered offset

2. Improve test_streaming_job_checkpoint_restart_fe:
   - Wait for SucceedTaskCount >= 2 to ensure all files processed
   - Add filteredRows assertion

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 14, 2026

run buildall

Copy link
Copy Markdown
Contributor

@sollhui sollhui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@github-actions
Copy link
Copy Markdown
Contributor

PR approved by anyone and no changes requested.

…eckpoint test

1. Add restoreFromPersistInfo() interface to SourceOffsetProvider with
   default no-op. S3SourceOffsetProvider implements it to restore
   currentOffset from offsetProviderPersist during gsonPostProcess,
   so PAUSED jobs show correct offset after checkpoint restart.

2. Call restoreFromPersistInfo() in gsonPostProcess for TVF providers.

3. New test: test_streaming_job_alter_offset_checkpoint_restart_fe
   - ALTER offset → checkpoint → restart → verify offset preserved

4. Improve test_streaming_job_checkpoint_restart_fe:
   - Wait for SucceedTaskCount >= 2
   - Add filteredRows assertion

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 14, 2026

run buildall

@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 14, 2026

run cloud_p0

@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 14, 2026

run feut

@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 15, 2026

run feut

Copy link
Copy Markdown
Contributor

@liaoxin01 liaoxin01 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@github-actions
Copy link
Copy Markdown
Contributor

PR approved by at least one committer and no changes requested.

@github-actions github-actions bot added the approved Indicates a PR has been approved by one committer. label Apr 15, 2026
@JNSimba
Copy link
Copy Markdown
Member Author

JNSimba commented Apr 15, 2026

/review

Copy link
Copy Markdown
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Findings:

  1. Cloud-mode paused S3 jobs can still come back with an empty or stale currentOffset after checkpoint restart. gsonPostProcess() now restores TVF providers from offsetProviderPersist, but the cloud commit path still never refreshes that field, and StreamingJobSchedulerTask only calls replayOnCloudMode() in PENDING, not PAUSED.
  2. filteredRows is still never propagated into txn-backed streaming job statistics. beforeCommitted() only writes scanned rows, load bytes, file count and file size into StreamingTaskTxnCommitAttachment, and updateJobStatisticAndOffset() never increments jobStatistic.filteredRows. Adding @SerializedName("filteredRows") only preserves the default zero, so this part of the stated fix is still incomplete.

Critical checkpoint conclusions:

  • Goal of the task: Partially met. The non-cloud S3 checkpoint/ALTER recovery and fileNumber/fileSize image serialization are addressed, but the cloud paused-job replay path and filteredRows persistence are still incorrect.
  • Scope/minimality: The change is reasonably focused.
  • Concurrency: No new lock-order or thread-safety issue found in the modified code.
  • Lifecycle/replay ordering: Not fully safe yet; image-load restoration now depends on offsetProviderPersist, but not every replay path keeps that state up to date.
  • Configuration: No new configs.
  • Compatibility: No obvious rolling-upgrade or storage-format compatibility break beyond additive Gson fields.
  • Parallel code paths: Not fully covered; the non-cloud S3 path was updated but the cloud S3 commit/recovery path was not.
  • Special conditional checks: The new currentOffset != null guard is understandable, but it does not fix the stale-state case above.
  • Test coverage: Non-cloud restart coverage improved, but there is still no cloud regression coverage and no test that proves a non-zero filteredRows survives restart.
  • Observability: Existing logs are sufficient for these paths.
  • Transaction/persistence correctness: Incomplete for the reasons above.
  • Data write/modification correctness: No BE-side data visibility issue identified in this diff.
  • FE/BE variable passing: Not applicable.
  • Performance: No significant regression found.
  • Other issues: None beyond the findings above.

@JNSimba JNSimba dismissed github-actions[bot]’s stale review April 15, 2026 06:32

offsetProviderPersist only affects the persistence of checkpoint images in BDBJE mode (integrated storage and compute). In compute-delivery separation (cloud mode), the offset exists in the meta service and is restored via replayOnCloudMode(), without using offsetProviderPersist.

@JNSimba JNSimba merged commit f70678a into apache:master Apr 15, 2026
32 of 33 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants