From 2db4acbd2d06c1da77a5c649b7f5a638f896f440 Mon Sep 17 00:00:00 2001 From: Dhriti Chopra Date: Wed, 20 May 2026 15:06:40 +0000 Subject: [PATCH 1/7] feat(storage): Adding CumulativeHasher wrapper class for Full object checksum --- .../cloud/storage/CumulativeHasher.java | 152 ++++++++++++++++++ .../java/com/google/cloud/storage/Hasher.java | 5 +- 2 files changed, 155 insertions(+), 2 deletions(-) create mode 100644 java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/CumulativeHasher.java diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/CumulativeHasher.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/CumulativeHasher.java new file mode 100644 index 000000000000..ad4b5a2360e5 --- /dev/null +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/CumulativeHasher.java @@ -0,0 +1,152 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; +import com.google.protobuf.ByteString; +import io.grpc.Status.Code; +import com.google.storage.v2.Object; +import java.nio.ByteBuffer; +import java.util.OptionalLong; +import java.util.function.Supplier; + +/** + * A wrapper around hasher that accumulates checksums and validates them at the + * end of the read if it was a full object read. + */ +final class CumulativeHasher implements Hasher { + private final Hasher delegate; + private final long startOffset; + private final OptionalLong limit; + private Crc32cLengthKnown cumulativeHash; + + CumulativeHasher(Hasher delegate, long startOffset, OptionalLong limit) { + this.delegate = delegate; + this.startOffset = startOffset; + this.limit = limit; + this.cumulativeHash = Crc32cValue.zero(); + } + + @Override + public Crc32cLengthKnown hash(ByteBuffer b) { + return delegate.hash(b); + } + + @Override + public Crc32cLengthKnown hash(ByteString byteString) { + return delegate.hash(byteString); + } + + @Override + public void validate(Crc32cValue expected, Supplier b) throws ChecksumMismatchException { + ByteBuffer byteBuffer = b.get(); + Crc32cLengthKnown actual = delegate.hash(byteBuffer); + if (actual != null) { + if (expected != null && !actual.eqValue(expected)) { + throw new ChecksumMismatchException(expected, actual); + } + accumulate(actual); + } + } + + @Override + public void validate(Crc32cValue expected, ByteString byteString) throws ChecksumMismatchException { + Crc32cLengthKnown actual = delegate.hash(byteString); + if (actual != null) { + if (expected != null && !actual.eqValue(expected)) { + throw new ChecksumMismatchException(expected, actual); + } + accumulate(actual); + } + } + + @Override + public void validateUnchecked(Crc32cValue expected, ByteString byteString) + throws UncheckedChecksumMismatchException { + Crc32cLengthKnown actual = delegate.hash(byteString); + if (actual != null) { + if (expected != null && !actual.eqValue(expected)) { + throw new UncheckedChecksumMismatchException(expected, actual); + } + accumulate(actual); + } + } + + @Override + public > C nullSafeConcat(C r1, Crc32cLengthKnown r2) { + return delegate.nullSafeConcat(r1, r2); + } + + @Override + public Crc32cLengthKnown initialValue() { + return delegate.initialValue(); + } + + // Checks if it was a full object read. + boolean qualifiesForVerification(Object metadata) { + return startOffset == 0 + && metadata != null + && metadata.hasChecksums() + && metadata.getChecksums().hasCrc32C() + && (!limit.isPresent() || limit.getAsLong() >= metadata.getSize()); + } + + void validateCumulativeChecksum(Object metadata) throws UncheckedCumulativeChecksumMismatchException { + if (qualifiesForVerification(metadata)) { + Crc32cValue expected = Crc32cValue.of(metadata.getChecksums().getCrc32C()); + Crc32cLengthKnown actual = getCumulativeHash(); + if (!actual.eqValue(expected)) { + throw new UncheckedCumulativeChecksumMismatchException(expected, actual); + } + } + } + + private void accumulate(Crc32cLengthKnown actual) { + cumulativeHash = cumulativeHash.concat(actual); + } + + Crc32cLengthKnown getCumulativeHash() { + return cumulativeHash; + } +} + +class UncheckedCumulativeChecksumMismatchException extends com.google.api.gax.rpc.DataLossException { + private static final GrpcStatusCode STATUS_CODE = GrpcStatusCode.of(Code.DATA_LOSS); + private final Crc32cValue expected; + private final Crc32cLengthKnown actual; + + UncheckedCumulativeChecksumMismatchException(Crc32cValue expected, Crc32cLengthKnown actual) { + super( + String.format( + "Mismatch cumulative checksum value. Expected %s actual %s", + expected.debugString(), actual.debugString()), + /* cause= */ null, + STATUS_CODE, + /* retryable= */ false); + this.expected = expected; + this.actual = actual; + } + + Crc32cValue getExpected() { + return expected; + } + + Crc32cLengthKnown getActual() { + return actual; + } +} diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java index c1b506de2f7e..59f90fec0601 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java @@ -212,7 +212,7 @@ final class ChecksumMismatchException extends IOException { private final Crc32cValue expected; private final Crc32cLengthKnown actual; - private ChecksumMismatchException(Crc32cValue expected, Crc32cLengthKnown actual) { + ChecksumMismatchException(Crc32cValue expected, Crc32cLengthKnown actual) { super( String.format( Locale.US, @@ -237,7 +237,7 @@ final class UncheckedChecksumMismatchException extends DataLossException { private final Crc32cValue expected; private final Crc32cLengthKnown actual; - private UncheckedChecksumMismatchException(Crc32cValue expected, Crc32cLengthKnown actual) { + UncheckedChecksumMismatchException(Crc32cValue expected, Crc32cLengthKnown actual) { super( String.format( "Mismatch checksum value. Expected %s actual %s", @@ -258,3 +258,4 @@ Crc32cLengthKnown getActual() { } } } + From 351729c281b051401fa3e3d84531f0135c4d0ad3 Mon Sep 17 00:00:00 2001 From: Dhriti Chopra Date: Mon, 25 May 2026 14:53:36 +0000 Subject: [PATCH 2/7] Fixing lint and small updates --- .../cloud/storage/CumulativeHasher.java | 23 ++++++++++++------- .../java/com/google/cloud/storage/Hasher.java | 1 - 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/CumulativeHasher.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/CumulativeHasher.java index ad4b5a2360e5..1614a3595a75 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/CumulativeHasher.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/CumulativeHasher.java @@ -19,15 +19,16 @@ import com.google.api.gax.grpc.GrpcStatusCode; import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; import com.google.protobuf.ByteString; -import io.grpc.Status.Code; import com.google.storage.v2.Object; +import io.grpc.Status.Code; import java.nio.ByteBuffer; +import java.util.Locale; import java.util.OptionalLong; import java.util.function.Supplier; /** - * A wrapper around hasher that accumulates checksums and validates them at the - * end of the read if it was a full object read. + * A wrapper around hasher that accumulates checksums and validates them at the end of the read if + * it was a full object read. */ final class CumulativeHasher implements Hasher { private final Hasher delegate; @@ -53,7 +54,8 @@ public Crc32cLengthKnown hash(ByteString byteString) { } @Override - public void validate(Crc32cValue expected, Supplier b) throws ChecksumMismatchException { + public void validate(Crc32cValue expected, Supplier b) + throws ChecksumMismatchException { ByteBuffer byteBuffer = b.get(); Crc32cLengthKnown actual = delegate.hash(byteBuffer); if (actual != null) { @@ -65,7 +67,8 @@ public void validate(Crc32cValue expected, Supplier b) throws Che } @Override - public void validate(Crc32cValue expected, ByteString byteString) throws ChecksumMismatchException { + public void validate(Crc32cValue expected, ByteString byteString) + throws ChecksumMismatchException { Crc32cLengthKnown actual = delegate.hash(byteString); if (actual != null) { if (expected != null && !actual.eqValue(expected)) { @@ -106,7 +109,8 @@ boolean qualifiesForVerification(Object metadata) { && (!limit.isPresent() || limit.getAsLong() >= metadata.getSize()); } - void validateCumulativeChecksum(Object metadata) throws UncheckedCumulativeChecksumMismatchException { + void validateCumulativeChecksum(Object metadata) + throws UncheckedCumulativeChecksumMismatchException { if (qualifiesForVerification(metadata)) { Crc32cValue expected = Crc32cValue.of(metadata.getChecksums().getCrc32C()); Crc32cLengthKnown actual = getCumulativeHash(); @@ -125,7 +129,8 @@ Crc32cLengthKnown getCumulativeHash() { } } -class UncheckedCumulativeChecksumMismatchException extends com.google.api.gax.rpc.DataLossException { +final class UncheckedCumulativeChecksumMismatchException + extends com.google.api.gax.rpc.DataLossException { private static final GrpcStatusCode STATUS_CODE = GrpcStatusCode.of(Code.DATA_LOSS); private final Crc32cValue expected; private final Crc32cLengthKnown actual; @@ -133,8 +138,10 @@ class UncheckedCumulativeChecksumMismatchException extends com.google.api.gax.rp UncheckedCumulativeChecksumMismatchException(Crc32cValue expected, Crc32cLengthKnown actual) { super( String.format( + Locale.US, "Mismatch cumulative checksum value. Expected %s actual %s", - expected.debugString(), actual.debugString()), + expected.debugString(), + actual.debugString()), /* cause= */ null, STATUS_CODE, /* retryable= */ false); diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java index 59f90fec0601..02dfe9efef41 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java @@ -258,4 +258,3 @@ Crc32cLengthKnown getActual() { } } } - From 72af919d11ffbcf0a46ff3767f9a37bf3998acde Mon Sep 17 00:00:00 2001 From: Dhriti Chopra Date: Mon, 25 May 2026 15:08:52 +0000 Subject: [PATCH 3/7] feat(storage): add full object checksum validation for grpc flow --- .../GapicUnbufferedReadableByteChannel.java | 18 +- ...apicUnbufferedReadableByteChannelTest.java | 426 ++++++++++++++++++ 2 files changed, 443 insertions(+), 1 deletion(-) diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index cef751213c6d..776cb13e2ebf 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -33,6 +33,7 @@ import com.google.cloud.storage.Hasher.UncheckedChecksumMismatchException; import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef; import com.google.cloud.storage.Retrying.Retrier; +import java.util.OptionalLong; import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel; import com.google.common.base.Suppliers; import com.google.protobuf.ByteString; @@ -91,7 +92,9 @@ final class GapicUnbufferedReadableByteChannel this.result = result; this.read = read; this.req = req; - this.hasher = hasher; + this.hasher = (req.getReadOffset() == 0) + ? new CumulativeHasher(hasher, 0, req.getReadLimit() <= 0 ? OptionalLong.empty() : OptionalLong.of(req.getReadLimit())) + : hasher; this.fetchOffset = new AtomicLong(req.getReadOffset()); this.blobOffset = req.getReadOffset(); this.retrier = retrier; @@ -174,6 +177,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { } if (take == EOF_MARKER) { complete = true; + validateCumulativeChecksum(); break; } @@ -311,6 +315,18 @@ private IOException createError(String message) throws IOException { return new IOException(message, cause); } + private void validateCumulativeChecksum() throws IOException { + if (hasher instanceof CumulativeHasher) { + CumulativeHasher cumulativeHasher = (CumulativeHasher) hasher; + try { + cumulativeHasher.validateCumulativeChecksum(metadata); + } catch (UncheckedCumulativeChecksumMismatchException exception) { + throw new IOException(StorageException.coalesce(exception)); + } + } + } + + private final class ReadObjectObserver extends StateCheckingResponseObserver { private final SettableApiFuture open = SettableApiFuture.create(); diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannelTest.java index 27d96ef6f06f..44553923ca54 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannelTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannelTest.java @@ -18,6 +18,7 @@ import static com.google.cloud.storage.TestUtils.xxd; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.ApiCallContext; @@ -26,6 +27,8 @@ import com.google.cloud.storage.GrpcUtils.ZeroCopyServerStreamingCallable; import com.google.cloud.storage.Retrying.Retrier; import com.google.cloud.storage.it.ChecksummedTestContent; +import com.google.storage.v2.Object; +import com.google.storage.v2.ObjectChecksums; import com.google.storage.v2.ReadObjectRequest; import com.google.storage.v2.ReadObjectResponse; import java.io.IOException; @@ -75,4 +78,427 @@ public void call( assertThat(close.get()).isTrue(); } } + + @Test + public void validateCumulativeChecksum_success() throws IOException { + ChecksummedTestContent testContent = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(10)); + + Object metadata = + Object.newBuilder() + .setSize(testContent.length()) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(testContent.getCrc32c()) + .build()) + .build(); + + ResponseContentLifecycleManager manager = + ResponseContentLifecycleManager.noop(); + + try (GapicUnbufferedReadableByteChannel c = + new GapicUnbufferedReadableByteChannel( + SettableApiFuture.create(), + new ZeroCopyServerStreamingCallable<>( + new ServerStreamingCallable() { + @Override + public void call( + ReadObjectRequest request, + ResponseObserver respond, + ApiCallContext context) { + respond.onStart(TestUtils.nullStreamController()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(testContent.asChecksummedData()) + .setMetadata(metadata) + .build()); + respond.onComplete(); + } + }, + manager), + ReadObjectRequest.getDefaultInstance(), + Hasher.defaultHasher(), + Retrier.attemptOnce(), + Retrying.neverRetry())) { + + ByteBuffer buffer = ByteBuffer.allocate(15); + int read = (int) c.read(new ByteBuffer[] {buffer}, 0, 1); + assertThat(read).isEqualTo(testContent.length()); + assertThat(xxd(buffer)).isEqualTo(xxd(testContent.getBytes())); + } + } + + @Test + public void validateCumulativeChecksum_failure() throws IOException { + ChecksummedTestContent testContent = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(10)); + + Object metadata = + Object.newBuilder() + .setSize(testContent.length()) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(testContent.getCrc32c() + 1) + .build()) + .build(); + + ResponseContentLifecycleManager manager = + ResponseContentLifecycleManager.noop(); + + try (GapicUnbufferedReadableByteChannel c = + new GapicUnbufferedReadableByteChannel( + SettableApiFuture.create(), + new ZeroCopyServerStreamingCallable<>( + new ServerStreamingCallable() { + @Override + public void call( + ReadObjectRequest request, + ResponseObserver respond, + ApiCallContext context) { + respond.onStart(TestUtils.nullStreamController()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(testContent.asChecksummedData()) + .setMetadata(metadata) + .build()); + respond.onComplete(); + } + }, + manager), + ReadObjectRequest.getDefaultInstance(), + Hasher.defaultHasher(), + Retrier.attemptOnce(), + Retrying.neverRetry())) { + + ByteBuffer buffer = ByteBuffer.allocate(15); + IOException exception = assertThrows(IOException.class, () -> c.read(buffer)); + assertThat(exception.getCause()).isInstanceOf(StorageException.class); + assertThat(exception.getCause().getCause()).isInstanceOf(UncheckedCumulativeChecksumMismatchException.class); + } + } + + @Test + public void validateCumulativeChecksum_skippedForRangedRead() throws IOException { + ChecksummedTestContent testContent = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(10)); + + Object metadata = + Object.newBuilder() + .setSize(testContent.length()) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(testContent.getCrc32c() + 1) + .build()) + .build(); + + ResponseContentLifecycleManager manager = + ResponseContentLifecycleManager.noop(); + + ReadObjectRequest req = ReadObjectRequest.newBuilder().setReadLimit(5).build(); + + try (GapicUnbufferedReadableByteChannel c = + new GapicUnbufferedReadableByteChannel( + SettableApiFuture.create(), + new ZeroCopyServerStreamingCallable<>( + new ServerStreamingCallable() { + @Override + public void call( + ReadObjectRequest request, + ResponseObserver respond, + ApiCallContext context) { + respond.onStart(TestUtils.nullStreamController()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(testContent.slice(0, 5).asChecksummedData()) + .setMetadata(metadata) + .build()); + respond.onComplete(); + } + }, + manager), + req, + Hasher.defaultHasher(), + Retrier.attemptOnce(), + Retrying.neverRetry())) { + + ByteBuffer buffer = ByteBuffer.allocate(15); + int read = (int) c.read(buffer); + assertThat(read).isEqualTo(5); + assertThat(xxd(buffer)).isEqualTo(xxd(testContent.slice(0, 5).getBytes())); + } + } + + @Test + public void validateCumulativeChecksum_multipleChunks_success() throws IOException { + ChecksummedTestContent chunk1 = ChecksummedTestContent.of("abcde".getBytes()); + ChecksummedTestContent chunk2 = ChecksummedTestContent.of("fghij".getBytes()); + ChecksummedTestContent chunk3 = ChecksummedTestContent.of("klmno".getBytes()); + byte[] fullBytes = "abcdefghijklmno".getBytes(); + ChecksummedTestContent fullContent = ChecksummedTestContent.of(fullBytes); + + Object metadata = + Object.newBuilder() + .setSize(fullContent.length()) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(fullContent.getCrc32c()) + .build()) + .build(); + + ResponseContentLifecycleManager manager = + ResponseContentLifecycleManager.noop(); + + try (GapicUnbufferedReadableByteChannel c = + new GapicUnbufferedReadableByteChannel( + SettableApiFuture.create(), + new ZeroCopyServerStreamingCallable<>( + new ServerStreamingCallable() { + @Override + public void call( + ReadObjectRequest request, + ResponseObserver respond, + ApiCallContext context) { + respond.onStart(TestUtils.nullStreamController()); + new Thread(() -> { + try { + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(chunk1.asChecksummedData()) + .build()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(chunk2.asChecksummedData()) + .build()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(chunk3.asChecksummedData()) + .setMetadata(metadata) + .build()); + respond.onComplete(); + } catch (Throwable t) { + respond.onError(t); + } + }).start(); + } + }, + manager), + ReadObjectRequest.getDefaultInstance(), + Hasher.defaultHasher(), + Retrier.attemptOnce(), + Retrying.neverRetry())) { + + ByteBuffer buffer = ByteBuffer.allocate(20); + int read = (int) c.read(new ByteBuffer[] {buffer}, 0, 1); + assertThat(read).isEqualTo(15); + assertThat(xxd(buffer)).isEqualTo(xxd(fullContent.getBytes())); + } + } + + @Test + public void validateCumulativeChecksum_multipleChunks_failure() throws IOException { + ChecksummedTestContent chunk1 = ChecksummedTestContent.of("abcde".getBytes()); + ChecksummedTestContent chunk2 = ChecksummedTestContent.of("fghij".getBytes()); + ChecksummedTestContent chunk3 = ChecksummedTestContent.of("klmno".getBytes()); + byte[] fullBytes = "abcdefghijklmno".getBytes(); + ChecksummedTestContent fullContent = ChecksummedTestContent.of(fullBytes); + + Object metadata = + Object.newBuilder() + .setSize(fullContent.length()) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(fullContent.getCrc32c() + 1) + .build()) + .build(); + + ResponseContentLifecycleManager manager = + ResponseContentLifecycleManager.noop(); + + try (GapicUnbufferedReadableByteChannel c = + new GapicUnbufferedReadableByteChannel( + SettableApiFuture.create(), + new ZeroCopyServerStreamingCallable<>( + new ServerStreamingCallable() { + @Override + public void call( + ReadObjectRequest request, + ResponseObserver respond, + ApiCallContext context) { + respond.onStart(TestUtils.nullStreamController()); + new Thread(() -> { + try { + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(chunk1.asChecksummedData()) + .build()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(chunk2.asChecksummedData()) + .build()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(chunk3.asChecksummedData()) + .setMetadata(metadata) + .build()); + respond.onComplete(); + } catch (Throwable t) { + respond.onError(t); + } + }).start(); + } + }, + manager), + ReadObjectRequest.getDefaultInstance(), + Hasher.defaultHasher(), + Retrier.attemptOnce(), + Retrying.neverRetry())) { + + ByteBuffer buffer = ByteBuffer.allocate(20); + IOException exception = assertThrows(IOException.class, () -> { + c.read(new ByteBuffer[] {buffer}, 0, 1); + }); + assertThat(exception.getCause()).isInstanceOf(StorageException.class); + assertThat(exception.getCause().getCause()).isInstanceOf(UncheckedCumulativeChecksumMismatchException.class); + } + } + + @Test + public void validateCumulativeChecksum_metadataMissingCrc32c_skipped() throws IOException { + ChecksummedTestContent testContent = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(10)); + + Object metadata = + Object.newBuilder() + .setSize(testContent.length()) + .setChecksums(ObjectChecksums.newBuilder().build()) + .build(); + + ResponseContentLifecycleManager manager = + ResponseContentLifecycleManager.noop(); + + try (GapicUnbufferedReadableByteChannel c = + new GapicUnbufferedReadableByteChannel( + SettableApiFuture.create(), + new ZeroCopyServerStreamingCallable<>( + new ServerStreamingCallable() { + @Override + public void call( + ReadObjectRequest request, + ResponseObserver respond, + ApiCallContext context) { + respond.onStart(TestUtils.nullStreamController()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(testContent.asChecksummedData()) + .setMetadata(metadata) + .build()); + respond.onComplete(); + } + }, + manager), + ReadObjectRequest.getDefaultInstance(), + Hasher.defaultHasher(), + Retrier.attemptOnce(), + Retrying.neverRetry())) { + + ByteBuffer buffer = ByteBuffer.allocate(15); + int read = (int) c.read(buffer); + assertThat(read).isEqualTo(10); + } + } + + @Test + public void validateCumulativeChecksum_nonZeroOffset_skipped() throws IOException { + ChecksummedTestContent testContent = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(10)); + + Object metadata = + Object.newBuilder() + .setSize(testContent.length()) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(testContent.getCrc32c() + 1) + .build()) + .build(); + + ResponseContentLifecycleManager manager = + ResponseContentLifecycleManager.noop(); + + ReadObjectRequest req = ReadObjectRequest.newBuilder().setReadOffset(5).build(); + + try (GapicUnbufferedReadableByteChannel c = + new GapicUnbufferedReadableByteChannel( + SettableApiFuture.create(), + new ZeroCopyServerStreamingCallable<>( + new ServerStreamingCallable() { + @Override + public void call( + ReadObjectRequest request, + ResponseObserver respond, + ApiCallContext context) { + respond.onStart(TestUtils.nullStreamController()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(testContent.slice(5, 5).asChecksummedData()) + .setMetadata(metadata) + .build()); + respond.onComplete(); + } + }, + manager), + req, + Hasher.defaultHasher(), + Retrier.attemptOnce(), + Retrying.neverRetry())) { + + ByteBuffer buffer = ByteBuffer.allocate(15); + int read = (int) c.read(buffer); + assertThat(read).isEqualTo(5); + } + } + + @Test + public void validateCumulativeChecksum_zeroByteObject_success() throws IOException { + Object metadata = + Object.newBuilder() + .setSize(0) + .setChecksums( + ObjectChecksums.newBuilder() + .setCrc32C(0) + .build()) + .build(); + + ResponseContentLifecycleManager manager = + ResponseContentLifecycleManager.noop(); + + try (GapicUnbufferedReadableByteChannel c = + new GapicUnbufferedReadableByteChannel( + SettableApiFuture.create(), + new ZeroCopyServerStreamingCallable<>( + new ServerStreamingCallable() { + @Override + public void call( + ReadObjectRequest request, + ResponseObserver respond, + ApiCallContext context) { + respond.onStart(TestUtils.nullStreamController()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setMetadata(metadata) + .build()); + respond.onComplete(); + } + }, + manager), + ReadObjectRequest.getDefaultInstance(), + Hasher.defaultHasher(), + Retrier.attemptOnce(), + Retrying.neverRetry())) { + + ByteBuffer buffer = ByteBuffer.allocate(15); + int read = (int) c.read(buffer); + assertThat(read).isEqualTo(0); + assertThat(c.read(buffer)).isEqualTo(-1); + } + } } From 91e8f36400d90ef7e81b7d6ce9df95ed7d1cd3db Mon Sep 17 00:00:00 2001 From: Dhriti Chopra Date: Mon, 25 May 2026 15:15:22 +0000 Subject: [PATCH 4/7] fix: fix lint --- .../GapicUnbufferedReadableByteChannel.java | 15 +- ...apicUnbufferedReadableByteChannelTest.java | 134 ++++++++---------- 2 files changed, 72 insertions(+), 77 deletions(-) diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index 776cb13e2ebf..26bc6ac37cef 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -33,7 +33,6 @@ import com.google.cloud.storage.Hasher.UncheckedChecksumMismatchException; import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef; import com.google.cloud.storage.Retrying.Retrier; -import java.util.OptionalLong; import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel; import com.google.common.base.Suppliers; import com.google.protobuf.ByteString; @@ -50,6 +49,7 @@ import java.nio.channels.ScatteringByteChannel; import java.util.List; import java.util.Locale; +import java.util.OptionalLong; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -92,9 +92,15 @@ final class GapicUnbufferedReadableByteChannel this.result = result; this.read = read; this.req = req; - this.hasher = (req.getReadOffset() == 0) - ? new CumulativeHasher(hasher, 0, req.getReadLimit() <= 0 ? OptionalLong.empty() : OptionalLong.of(req.getReadLimit())) - : hasher; + this.hasher = + (req.getReadOffset() == 0) + ? new CumulativeHasher( + hasher, + 0, + req.getReadLimit() <= 0 + ? OptionalLong.empty() + : OptionalLong.of(req.getReadLimit())) + : hasher; this.fetchOffset = new AtomicLong(req.getReadOffset()); this.blobOffset = req.getReadOffset(); this.retrier = retrier; @@ -326,7 +332,6 @@ private void validateCumulativeChecksum() throws IOException { } } - private final class ReadObjectObserver extends StateCheckingResponseObserver { private final SettableApiFuture open = SettableApiFuture.create(); diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannelTest.java index 44553923ca54..0fce7f04afd8 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannelTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannelTest.java @@ -87,10 +87,7 @@ public void validateCumulativeChecksum_success() throws IOException { Object metadata = Object.newBuilder() .setSize(testContent.length()) - .setChecksums( - ObjectChecksums.newBuilder() - .setCrc32C(testContent.getCrc32c()) - .build()) + .setChecksums(ObjectChecksums.newBuilder().setCrc32C(testContent.getCrc32c()).build()) .build(); ResponseContentLifecycleManager manager = @@ -137,9 +134,7 @@ public void validateCumulativeChecksum_failure() throws IOException { Object.newBuilder() .setSize(testContent.length()) .setChecksums( - ObjectChecksums.newBuilder() - .setCrc32C(testContent.getCrc32c() + 1) - .build()) + ObjectChecksums.newBuilder().setCrc32C(testContent.getCrc32c() + 1).build()) .build(); ResponseContentLifecycleManager manager = @@ -173,7 +168,8 @@ public void call( ByteBuffer buffer = ByteBuffer.allocate(15); IOException exception = assertThrows(IOException.class, () -> c.read(buffer)); assertThat(exception.getCause()).isInstanceOf(StorageException.class); - assertThat(exception.getCause().getCause()).isInstanceOf(UncheckedCumulativeChecksumMismatchException.class); + assertThat(exception.getCause().getCause()) + .isInstanceOf(UncheckedCumulativeChecksumMismatchException.class); } } @@ -186,9 +182,7 @@ public void validateCumulativeChecksum_skippedForRangedRead() throws IOException Object.newBuilder() .setSize(testContent.length()) .setChecksums( - ObjectChecksums.newBuilder() - .setCrc32C(testContent.getCrc32c() + 1) - .build()) + ObjectChecksums.newBuilder().setCrc32C(testContent.getCrc32c() + 1).build()) .build(); ResponseContentLifecycleManager manager = @@ -239,10 +233,7 @@ public void validateCumulativeChecksum_multipleChunks_success() throws IOExcepti Object metadata = Object.newBuilder() .setSize(fullContent.length()) - .setChecksums( - ObjectChecksums.newBuilder() - .setCrc32C(fullContent.getCrc32c()) - .build()) + .setChecksums(ObjectChecksums.newBuilder().setCrc32C(fullContent.getCrc32c()).build()) .build(); ResponseContentLifecycleManager manager = @@ -259,26 +250,28 @@ public void call( ResponseObserver respond, ApiCallContext context) { respond.onStart(TestUtils.nullStreamController()); - new Thread(() -> { - try { - respond.onResponse( - ReadObjectResponse.newBuilder() - .setChecksummedData(chunk1.asChecksummedData()) - .build()); - respond.onResponse( - ReadObjectResponse.newBuilder() - .setChecksummedData(chunk2.asChecksummedData()) - .build()); - respond.onResponse( - ReadObjectResponse.newBuilder() - .setChecksummedData(chunk3.asChecksummedData()) - .setMetadata(metadata) - .build()); - respond.onComplete(); - } catch (Throwable t) { - respond.onError(t); - } - }).start(); + new Thread( + () -> { + try { + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(chunk1.asChecksummedData()) + .build()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(chunk2.asChecksummedData()) + .build()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(chunk3.asChecksummedData()) + .setMetadata(metadata) + .build()); + respond.onComplete(); + } catch (Throwable t) { + respond.onError(t); + } + }) + .start(); } }, manager), @@ -306,9 +299,7 @@ public void validateCumulativeChecksum_multipleChunks_failure() throws IOExcepti Object.newBuilder() .setSize(fullContent.length()) .setChecksums( - ObjectChecksums.newBuilder() - .setCrc32C(fullContent.getCrc32c() + 1) - .build()) + ObjectChecksums.newBuilder().setCrc32C(fullContent.getCrc32c() + 1).build()) .build(); ResponseContentLifecycleManager manager = @@ -325,26 +316,28 @@ public void call( ResponseObserver respond, ApiCallContext context) { respond.onStart(TestUtils.nullStreamController()); - new Thread(() -> { - try { - respond.onResponse( - ReadObjectResponse.newBuilder() - .setChecksummedData(chunk1.asChecksummedData()) - .build()); - respond.onResponse( - ReadObjectResponse.newBuilder() - .setChecksummedData(chunk2.asChecksummedData()) - .build()); - respond.onResponse( - ReadObjectResponse.newBuilder() - .setChecksummedData(chunk3.asChecksummedData()) - .setMetadata(metadata) - .build()); - respond.onComplete(); - } catch (Throwable t) { - respond.onError(t); - } - }).start(); + new Thread( + () -> { + try { + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(chunk1.asChecksummedData()) + .build()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(chunk2.asChecksummedData()) + .build()); + respond.onResponse( + ReadObjectResponse.newBuilder() + .setChecksummedData(chunk3.asChecksummedData()) + .setMetadata(metadata) + .build()); + respond.onComplete(); + } catch (Throwable t) { + respond.onError(t); + } + }) + .start(); } }, manager), @@ -354,11 +347,15 @@ public void call( Retrying.neverRetry())) { ByteBuffer buffer = ByteBuffer.allocate(20); - IOException exception = assertThrows(IOException.class, () -> { - c.read(new ByteBuffer[] {buffer}, 0, 1); - }); + IOException exception = + assertThrows( + IOException.class, + () -> { + c.read(new ByteBuffer[] {buffer}, 0, 1); + }); assertThat(exception.getCause()).isInstanceOf(StorageException.class); - assertThat(exception.getCause().getCause()).isInstanceOf(UncheckedCumulativeChecksumMismatchException.class); + assertThat(exception.getCause().getCause()) + .isInstanceOf(UncheckedCumulativeChecksumMismatchException.class); } } @@ -416,9 +413,7 @@ public void validateCumulativeChecksum_nonZeroOffset_skipped() throws IOExceptio Object.newBuilder() .setSize(testContent.length()) .setChecksums( - ObjectChecksums.newBuilder() - .setCrc32C(testContent.getCrc32c() + 1) - .build()) + ObjectChecksums.newBuilder().setCrc32C(testContent.getCrc32c() + 1).build()) .build(); ResponseContentLifecycleManager manager = @@ -462,10 +457,7 @@ public void validateCumulativeChecksum_zeroByteObject_success() throws IOExcepti Object metadata = Object.newBuilder() .setSize(0) - .setChecksums( - ObjectChecksums.newBuilder() - .setCrc32C(0) - .build()) + .setChecksums(ObjectChecksums.newBuilder().setCrc32C(0).build()) .build(); ResponseContentLifecycleManager manager = @@ -483,9 +475,7 @@ public void call( ApiCallContext context) { respond.onStart(TestUtils.nullStreamController()); respond.onResponse( - ReadObjectResponse.newBuilder() - .setMetadata(metadata) - .build()); + ReadObjectResponse.newBuilder().setMetadata(metadata).build()); respond.onComplete(); } }, From e67803a97bd33df83d883ca1bfa09722f2c04515 Mon Sep 17 00:00:00 2001 From: Dhriti Chopra Date: Mon, 25 May 2026 15:24:33 +0000 Subject: [PATCH 5/7] feat(storage): add full object checksum validation for bidi flow --- .../BaseObjectReadSessionStreamRead.java | 10 +- .../storage/ObjectReadSessionStream.java | 15 + .../storage/ObjectReadSessionStreamTest.java | 466 ++++++++++++++++++ 3 files changed, 489 insertions(+), 2 deletions(-) diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java index dc71350d70ca..a590f404e57b 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java @@ -150,7 +150,10 @@ private AccumulatingRead( IOAutoCloseable onCloseCallback) { super(rangeSpec, retryContext, onCloseCallback); this.readId = readId; - this.hasher = hasher; + this.hasher = + (rangeSpec.begin() == 0) + ? new CumulativeHasher(hasher, 0, rangeSpec.maxLength()) + : hasher; this.complete = SettableApiFuture.create(); this.childRefs = Collections.synchronizedList(new ArrayList<>()); } @@ -280,7 +283,10 @@ static class StreamingRead extends BaseObjectReadSessionStreamRead(2); diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java index 6f02b16866a8..beda7e5f7732 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java @@ -351,6 +351,12 @@ public void onResponse(BidiReadObjectResponse response) { executor.execute( StorageException.liftToRunnable( () -> { + try { + validateCumulativeChecksum(read); + } catch (UncheckedCumulativeChecksumMismatchException e) { + state.removeOutstandingReadOnFailure(id, read::fail).onFailure(e); + return; + } read.eof(); // don't remove the outstanding read until the future has been resolved state.removeOutstandingRead(id); @@ -545,6 +551,15 @@ public void onComplete() { } } + private void validateCumulativeChecksum(ObjectReadSessionStreamRead read) { + Hasher hasher = read.hasher(); + if (hasher instanceof CumulativeHasher) { + CumulativeHasher cumulativeHasher = (CumulativeHasher) hasher; + com.google.storage.v2.Object metadata = state.getMetadata(); + cumulativeHasher.validateCumulativeChecksum(metadata); + } + } + static ObjectReadSessionStream create( ScheduledExecutorService executor, ZeroCopyBidiStreamingCallable callable, diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStreamTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStreamTest.java index 75fec2cb2de6..925ae4c1fe5f 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStreamTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStreamTest.java @@ -18,6 +18,7 @@ import static com.google.cloud.storage.ByteSizeConstants._2MiB; import static com.google.cloud.storage.TestUtils.assertAll; +import static com.google.cloud.storage.TestUtils.xxd; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; @@ -41,14 +42,17 @@ import com.google.cloud.storage.RetryContext.RetryContextProvider; import com.google.cloud.storage.RetryContextTest.BlockingOnSuccess; import com.google.cloud.storage.Retrying.RetryingDependencies; +import com.google.cloud.storage.it.ChecksummedTestContent; import com.google.protobuf.ByteString; import com.google.storage.v2.BidiReadObjectRequest; import com.google.storage.v2.BidiReadObjectResponse; import com.google.storage.v2.BidiReadObjectSpec; import com.google.storage.v2.BucketName; import com.google.storage.v2.Object; +import com.google.storage.v2.ObjectChecksums; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -388,4 +392,466 @@ static TestObjectReadSessionStreamRead of() { id, RangeSpec.of(0, 10), RetryContext.neverRetry()); } } + + @Test + public void validateCumulativeChecksum_bidi_success() throws Exception { + ChecksummedTestContent testContent = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(10)); + + Object bidiMetadata = + Object.newBuilder() + .setSize(testContent.length()) + .setChecksums(ObjectChecksums.newBuilder().setCrc32C(testContent.getCrc32c()).build()) + .build(); + + SettableApiFuture> observerFuture = + SettableApiFuture.create(); + + ZeroCopyBidiStreamingCallable customCallable = + new ZeroCopyBidiStreamingCallable<>( + new BidiStreamingCallable() { + @Override + public ClientStream internalCall( + ResponseObserver responseObserver, + ClientStreamReadyObserver onReady, + ApiCallContext context) { + observerFuture.set(responseObserver); + responseObserver.onStart(TestUtils.nullStreamController()); + return new ClientStream() { + @Override + public void send(BidiReadObjectRequest request) {} + + @Override + public void closeSendWithError(Throwable t) {} + + @Override + public void closeSend() { + responseObserver.onComplete(); + } + + @Override + public boolean isSendReady() { + return true; + } + }; + } + }, + ResponseContentLifecycleManager.noopBidiReadObjectResponse()); + + try (AccumulatingRead read1 = + ObjectReadSessionStreamRead.createByteArrayAccumulatingRead( + 1, RangeSpec.all(), Hasher.defaultHasher(), RetryContext.neverRetry())) { + state.putOutstandingRead(1, read1); + + try (ObjectReadSessionStream stream = + ObjectReadSessionStream.create(exec, customCallable, state, RetryContext.neverRetry())) { + + stream.send(BidiReadObjectRequest.getDefaultInstance()); + + ResponseObserver observer = observerFuture.get(2, TimeUnit.SECONDS); + + BidiReadObjectResponse resp = + BidiReadObjectResponse.newBuilder() + .setMetadata(bidiMetadata) + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange( + com.google.storage.v2.ReadRange.newBuilder() + .setReadId(1) + .setReadOffset(0) + .build()) + .setChecksummedData(testContent.asChecksummedData()) + .setRangeEnd(true) + .build()) + .build(); + + observer.onResponse(resp); + + byte[] resultBytes = read1.get(2, TimeUnit.SECONDS); + assertThat(xxd(ByteBuffer.wrap(resultBytes))).isEqualTo(xxd(testContent.asByteBuffer())); + } + } + } + + @Test + public void validateCumulativeChecksum_bidi_failure() throws Exception { + ChecksummedTestContent testContent = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(10)); + + Object bidiMetadata = + Object.newBuilder() + .setSize(testContent.length()) + .setChecksums( + ObjectChecksums.newBuilder().setCrc32C(testContent.getCrc32c() + 1).build()) + .build(); + + SettableApiFuture> observerFuture = + SettableApiFuture.create(); + + ZeroCopyBidiStreamingCallable customCallable = + new ZeroCopyBidiStreamingCallable<>( + new BidiStreamingCallable() { + @Override + public ClientStream internalCall( + ResponseObserver responseObserver, + ClientStreamReadyObserver onReady, + ApiCallContext context) { + observerFuture.set(responseObserver); + responseObserver.onStart(TestUtils.nullStreamController()); + return new ClientStream() { + @Override + public void send(BidiReadObjectRequest request) {} + + @Override + public void closeSendWithError(Throwable t) {} + + @Override + public void closeSend() { + responseObserver.onComplete(); + } + + @Override + public boolean isSendReady() { + return true; + } + }; + } + }, + ResponseContentLifecycleManager.noopBidiReadObjectResponse()); + + try (AccumulatingRead read1 = + ObjectReadSessionStreamRead.createByteArrayAccumulatingRead( + 1, RangeSpec.all(), Hasher.defaultHasher(), RetryContext.neverRetry())) { + state.putOutstandingRead(1, read1); + + try (ObjectReadSessionStream stream = + ObjectReadSessionStream.create(exec, customCallable, state, RetryContext.neverRetry())) { + + stream.send(BidiReadObjectRequest.getDefaultInstance()); + + ResponseObserver observer = observerFuture.get(2, TimeUnit.SECONDS); + + BidiReadObjectResponse resp = + BidiReadObjectResponse.newBuilder() + .setMetadata(bidiMetadata) + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange( + com.google.storage.v2.ReadRange.newBuilder() + .setReadId(1) + .setReadOffset(0) + .build()) + .setChecksummedData(testContent.asChecksummedData()) + .setRangeEnd(true) + .build()) + .build(); + + observer.onResponse(resp); + + ExecutionException exception = + assertThrows(ExecutionException.class, () -> read1.get(2, TimeUnit.SECONDS)); + assertThat(exception.getCause()) + .isInstanceOf(UncheckedCumulativeChecksumMismatchException.class); + } + } + } + + @Test + public void validateCumulativeChecksum_bidi_skippedForRangedRead() throws Exception { + ChecksummedTestContent testContent = + ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(10)); + + Object bidiMetadata = + Object.newBuilder() + .setSize(testContent.length()) + .setChecksums( + ObjectChecksums.newBuilder().setCrc32C(testContent.getCrc32c() + 1).build()) + .build(); + + SettableApiFuture> observerFuture = + SettableApiFuture.create(); + + ZeroCopyBidiStreamingCallable customCallable = + new ZeroCopyBidiStreamingCallable<>( + new BidiStreamingCallable() { + @Override + public ClientStream internalCall( + ResponseObserver responseObserver, + ClientStreamReadyObserver onReady, + ApiCallContext context) { + observerFuture.set(responseObserver); + responseObserver.onStart(TestUtils.nullStreamController()); + return new ClientStream() { + @Override + public void send(BidiReadObjectRequest request) {} + + @Override + public void closeSendWithError(Throwable t) {} + + @Override + public void closeSend() { + responseObserver.onComplete(); + } + + @Override + public boolean isSendReady() { + return true; + } + }; + } + }, + ResponseContentLifecycleManager.noopBidiReadObjectResponse()); + + try (AccumulatingRead read1 = + ObjectReadSessionStreamRead.createByteArrayAccumulatingRead( + 1, RangeSpec.of(0, 5), Hasher.defaultHasher(), RetryContext.neverRetry())) { + state.putOutstandingRead(1, read1); + + try (ObjectReadSessionStream stream = + ObjectReadSessionStream.create(exec, customCallable, state, RetryContext.neverRetry())) { + + stream.send(BidiReadObjectRequest.getDefaultInstance()); + + ResponseObserver observer = observerFuture.get(2, TimeUnit.SECONDS); + + BidiReadObjectResponse resp = + BidiReadObjectResponse.newBuilder() + .setMetadata(bidiMetadata) + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange( + com.google.storage.v2.ReadRange.newBuilder() + .setReadId(1) + .setReadOffset(0) + .build()) + .setChecksummedData(testContent.slice(0, 5).asChecksummedData()) + .setRangeEnd(true) + .build()) + .build(); + + observer.onResponse(resp); + + byte[] resultBytes = read1.get(2, TimeUnit.SECONDS); + assertThat(xxd(ByteBuffer.wrap(resultBytes))) + .isEqualTo(xxd(testContent.slice(0, 5).asByteBuffer())); + } + } + } + + @Test + public void validateCumulativeChecksum_bidi_multipleChunks_success() throws Exception { + ChecksummedTestContent chunk1 = ChecksummedTestContent.of("abcde".getBytes()); + ChecksummedTestContent chunk2 = ChecksummedTestContent.of("fghij".getBytes()); + ChecksummedTestContent chunk3 = ChecksummedTestContent.of("klmno".getBytes()); + byte[] fullBytes = "abcdefghijklmno".getBytes(); + ChecksummedTestContent fullContent = ChecksummedTestContent.of(fullBytes); + + Object bidiMetadata = + Object.newBuilder() + .setSize(fullContent.length()) + .setChecksums(ObjectChecksums.newBuilder().setCrc32C(fullContent.getCrc32c()).build()) + .build(); + + SettableApiFuture> observerFuture = + SettableApiFuture.create(); + + ZeroCopyBidiStreamingCallable customCallable = + new ZeroCopyBidiStreamingCallable<>( + new BidiStreamingCallable() { + @Override + public ClientStream internalCall( + ResponseObserver responseObserver, + ClientStreamReadyObserver onReady, + ApiCallContext context) { + observerFuture.set(responseObserver); + responseObserver.onStart(TestUtils.nullStreamController()); + return new ClientStream() { + @Override + public void send(BidiReadObjectRequest request) {} + + @Override + public void closeSendWithError(Throwable t) {} + + @Override + public void closeSend() { + responseObserver.onComplete(); + } + + @Override + public boolean isSendReady() { + return true; + } + }; + } + }, + ResponseContentLifecycleManager.noopBidiReadObjectResponse()); + + try (AccumulatingRead read1 = + ObjectReadSessionStreamRead.createByteArrayAccumulatingRead( + 1, RangeSpec.all(), Hasher.defaultHasher(), RetryContext.neverRetry())) { + state.putOutstandingRead(1, read1); + + try (ObjectReadSessionStream stream = + ObjectReadSessionStream.create(exec, customCallable, state, RetryContext.neverRetry())) { + + stream.send(BidiReadObjectRequest.getDefaultInstance()); + + ResponseObserver observer = observerFuture.get(2, TimeUnit.SECONDS); + + observer.onResponse( + BidiReadObjectResponse.newBuilder() + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange( + com.google.storage.v2.ReadRange.newBuilder() + .setReadId(1) + .setReadOffset(0) + .build()) + .setChecksummedData(chunk1.asChecksummedData()) + .build()) + .build()); + + observer.onResponse( + BidiReadObjectResponse.newBuilder() + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange( + com.google.storage.v2.ReadRange.newBuilder() + .setReadId(1) + .setReadOffset(5) + .build()) + .setChecksummedData(chunk2.asChecksummedData()) + .build()) + .build()); + + observer.onResponse( + BidiReadObjectResponse.newBuilder() + .setMetadata(bidiMetadata) + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange( + com.google.storage.v2.ReadRange.newBuilder() + .setReadId(1) + .setReadOffset(10) + .build()) + .setChecksummedData(chunk3.asChecksummedData()) + .setRangeEnd(true) + .build()) + .build()); + + byte[] resultBytes = read1.get(2, TimeUnit.SECONDS); + assertThat(xxd(ByteBuffer.wrap(resultBytes))).isEqualTo(xxd(fullContent.asByteBuffer())); + } + } + } + + @Test + public void validateCumulativeChecksum_bidi_multipleChunks_failure() throws Exception { + ChecksummedTestContent chunk1 = ChecksummedTestContent.of("abcde".getBytes()); + ChecksummedTestContent chunk2 = ChecksummedTestContent.of("fghij".getBytes()); + ChecksummedTestContent chunk3 = ChecksummedTestContent.of("klmno".getBytes()); + byte[] fullBytes = "abcdefghijklmno".getBytes(); + ChecksummedTestContent fullContent = ChecksummedTestContent.of(fullBytes); + + Object bidiMetadata = + Object.newBuilder() + .setSize(fullContent.length()) + .setChecksums( + ObjectChecksums.newBuilder().setCrc32C(fullContent.getCrc32c() + 1).build()) + .build(); + + SettableApiFuture> observerFuture = + SettableApiFuture.create(); + + ZeroCopyBidiStreamingCallable customCallable = + new ZeroCopyBidiStreamingCallable<>( + new BidiStreamingCallable() { + @Override + public ClientStream internalCall( + ResponseObserver responseObserver, + ClientStreamReadyObserver onReady, + ApiCallContext context) { + observerFuture.set(responseObserver); + responseObserver.onStart(TestUtils.nullStreamController()); + return new ClientStream() { + @Override + public void send(BidiReadObjectRequest request) {} + + @Override + public void closeSendWithError(Throwable t) {} + + @Override + public void closeSend() { + responseObserver.onComplete(); + } + + @Override + public boolean isSendReady() { + return true; + } + }; + } + }, + ResponseContentLifecycleManager.noopBidiReadObjectResponse()); + + try (AccumulatingRead read1 = + ObjectReadSessionStreamRead.createByteArrayAccumulatingRead( + 1, RangeSpec.all(), Hasher.defaultHasher(), RetryContext.neverRetry())) { + state.putOutstandingRead(1, read1); + + try (ObjectReadSessionStream stream = + ObjectReadSessionStream.create(exec, customCallable, state, RetryContext.neverRetry())) { + + stream.send(BidiReadObjectRequest.getDefaultInstance()); + + ResponseObserver observer = observerFuture.get(2, TimeUnit.SECONDS); + + observer.onResponse( + BidiReadObjectResponse.newBuilder() + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange( + com.google.storage.v2.ReadRange.newBuilder() + .setReadId(1) + .setReadOffset(0) + .build()) + .setChecksummedData(chunk1.asChecksummedData()) + .build()) + .build()); + + observer.onResponse( + BidiReadObjectResponse.newBuilder() + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange( + com.google.storage.v2.ReadRange.newBuilder() + .setReadId(1) + .setReadOffset(5) + .build()) + .setChecksummedData(chunk2.asChecksummedData()) + .build()) + .build()); + + observer.onResponse( + BidiReadObjectResponse.newBuilder() + .setMetadata(bidiMetadata) + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange( + com.google.storage.v2.ReadRange.newBuilder() + .setReadId(1) + .setReadOffset(10) + .build()) + .setChecksummedData(chunk3.asChecksummedData()) + .setRangeEnd(true) + .build()) + .build()); + + ExecutionException exception = + assertThrows(ExecutionException.class, () -> read1.get(2, TimeUnit.SECONDS)); + assertThat(exception.getCause()) + .isInstanceOf(UncheckedCumulativeChecksumMismatchException.class); + } + } + } } From a24e32a62687d67915ec77283e359577ce50fda5 Mon Sep 17 00:00:00 2001 From: Dhriti Chopra Date: Tue, 26 May 2026 05:52:41 +0000 Subject: [PATCH 6/7] Disabling cumulative hasher for noop --- .../BaseObjectReadSessionStreamRead.java | 4 +- .../storage/ObjectReadSessionStreamTest.java | 81 +++++++++++++++++++ 2 files changed, 83 insertions(+), 2 deletions(-) diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java index a590f404e57b..0032d1a2da91 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java @@ -151,7 +151,7 @@ private AccumulatingRead( super(rangeSpec, retryContext, onCloseCallback); this.readId = readId; this.hasher = - (rangeSpec.begin() == 0) + (rangeSpec.begin() == 0 && !(hasher instanceof Hasher.NoOpHasher)) ? new CumulativeHasher(hasher, 0, rangeSpec.maxLength()) : hasher; this.complete = SettableApiFuture.create(); @@ -284,7 +284,7 @@ static class StreamingRead extends BaseObjectReadSessionStreamRead> observerFuture = + SettableApiFuture.create(); + + ZeroCopyBidiStreamingCallable customCallable = + new ZeroCopyBidiStreamingCallable<>( + new BidiStreamingCallable() { + @Override + public ClientStream internalCall( + ResponseObserver responseObserver, + ClientStreamReadyObserver onReady, + ApiCallContext context) { + observerFuture.set(responseObserver); + responseObserver.onStart(TestUtils.nullStreamController()); + return new ClientStream() { + @Override + public void send(BidiReadObjectRequest request) {} + + @Override + public void closeSendWithError(Throwable t) {} + + @Override + public void closeSend() { + responseObserver.onComplete(); + } + + @Override + public boolean isSendReady() { + return true; + } + }; + } + }, + ResponseContentLifecycleManager.noopBidiReadObjectResponse()); + + try (AccumulatingRead read1 = + ObjectReadSessionStreamRead.createByteArrayAccumulatingRead( + 1, RangeSpec.all(), Hasher.noop(), RetryContext.neverRetry())) { + state.putOutstandingRead(1, read1); + + try (ObjectReadSessionStream stream = + ObjectReadSessionStream.create(exec, customCallable, state, RetryContext.neverRetry())) { + + stream.send(BidiReadObjectRequest.getDefaultInstance()); + + ResponseObserver observer = observerFuture.get(2, TimeUnit.SECONDS); + + BidiReadObjectResponse resp = + BidiReadObjectResponse.newBuilder() + .setMetadata(bidiMetadata) + .addObjectDataRanges( + com.google.storage.v2.ObjectRangeData.newBuilder() + .setReadRange( + com.google.storage.v2.ReadRange.newBuilder() + .setReadId(1) + .setReadOffset(0) + .build()) + .setChecksummedData(testContent.asChecksummedData()) + .setRangeEnd(true) + .build()) + .build(); + + observer.onResponse(resp); + + byte[] resultBytes = read1.get(2, TimeUnit.SECONDS); + assertThat(xxd(ByteBuffer.wrap(resultBytes))).isEqualTo(xxd(testContent.asByteBuffer())); + } + } + } + @Test public void validateCumulativeChecksum_bidi_skippedForRangedRead() throws Exception { ChecksummedTestContent testContent = From 50a27bf741a93ef6b33a2122b2d6769f034ab5df Mon Sep 17 00:00:00 2001 From: Dhriti Chopra Date: Thu, 4 Jun 2026 05:33:32 +0000 Subject: [PATCH 7/7] feat: Adding takeover operation checksum --- .../google/cloud/storage/GrpcStorageImpl.java | 7 +- .../storage/ITAppendableUploadFakeTest.java | 79 +++++++++++++++++++ .../cloud/storage/ITAppendableUploadTest.java | 12 +-- 3 files changed, 91 insertions(+), 7 deletions(-) diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index 120b7a269724..c14d676d1ea5 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -1514,13 +1514,18 @@ public AppendableUploadState getAppendableState( : getBidiWriteObjectRequest(info, opts, /* appendable= */ true); AppendableUploadState state; if (takeOver) { + Crc32cValue.Crc32cLengthKnown initialCrc32c = null; + if (info.getCrc32c() != null && info.getSize() != null) { + initialCrc32c = + Crc32cValue.of(Utils.crc32cCodec.decode(info.getCrc32c()), info.getSize()); + } state = BidiUploadState.appendableTakeover( req, Retrying::newCallContext, maxPendingBytes, SettableApiFuture.create(), - /* initialCrc32c= */ null); + initialCrc32c); } else { state = BidiUploadState.appendableNew( diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java index 78d07c4d9425..65e1006d3cb0 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java @@ -921,6 +921,85 @@ public void crc32cWorks() throws Exception { } } + @Test + public void takeoverChecksumsWorks() throws Exception { + byte[] b = new byte[10]; + DataGenerator.base64Characters().fill(b, 0, 3); // ABC + DataGenerator.base64Characters().fill(b, 3, 7); // DEFGHIJ + ChecksummedTestContent existing = ChecksummedTestContent.of(b, 0, 3); + ChecksummedTestContent appendData = ChecksummedTestContent.of(b, 3, 7); + ChecksummedTestContent all = ChecksummedTestContent.of(b); + + BidiWriteObjectRequest takeoverInitialReq = + BidiWriteObjectRequest.newBuilder() + .setAppendObjectSpec( + AppendObjectSpec.newBuilder() + .setBucket(METADATA.getBucket()) + .setObject(METADATA.getName()) + .setGeneration(METADATA.getGeneration()) + .build()) + .setStateLookup(true) + .build(); + BidiWriteObjectResponse takeoverResNoResource = + BidiWriteObjectResponse.newBuilder() + .setPersistedSize(3) + .build(); + + BidiWriteObjectRequest req1 = + BidiWriteObjectRequest.newBuilder() + .setWriteOffset(3) + .setChecksummedData(appendData.asChecksummedData()) + .setFinishWrite(true) + .setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(all.getCrc32c()).build()) + .build(); + BidiWriteObjectResponse res1 = + BidiWriteObjectResponse.newBuilder() + .setResource( + Object.newBuilder() + .setName(METADATA.getName()) + .setBucket(METADATA.getBucket()) + .setGeneration(METADATA.getGeneration()) + .setSize(10) + .setFinalizeTime(timestampNow()) + .setChecksums(ObjectChecksums.newBuilder().setCrc32C(all.getCrc32c()).build()) + .build()) + .build(); + + FakeStorage fake = + FakeStorage.of( + ImmutableMap.of( + takeoverInitialReq, + respond -> respond.onNext(takeoverResNoResource), + req1, + respond -> { + respond.onNext(res1); + respond.onCompleted(); + })); + try (FakeServer fakeServer = FakeServer.of(fake); + Storage storage = fakeServer.getGrpcStorageOptions().toBuilder().build().getService()) { + BlobId id = BlobId.of("b", "o", METADATA.getGeneration()); + + BlobInfo done1 = + BlobInfo.newBuilder(id) + .setSize(3L) + .setCrc32c(Utils.crc32cCodec.encode(existing.getCrc32c())) + .build(); + + BlobAppendableUploadConfig config = + BlobAppendableUploadConfig.of() + .withFlushPolicy(FlushPolicy.maxFlushSize(10)) + .withCloseAction(CloseAction.FINALIZE_WHEN_CLOSING); + BlobAppendableUpload upload = + storage.blobAppendableUpload(done1, config); + try (AppendableUploadWriteableByteChannel channel = upload.open()) { + StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(appendData.getBytes()), channel); + } + ApiFuture result = upload.getResult(); + BlobInfo finalInfo = result.get(5, TimeUnit.SECONDS); + assertThat(finalInfo.getCrc32c()).isEqualTo(Utils.crc32cCodec.encode(all.getCrc32c())); + } + } + /** * If a stream is held open for an extended period (i.e. longer than the configured retry timeout) * and the server returns an error, we want to make sure the currently pending request is able to diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadTest.java index 9e3ae209fbbe..6df2efc3b843 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadTest.java @@ -189,8 +189,7 @@ public void appendableBlobUploadTakeover() throws Exception { assertThat(done1.getCrc32c()).isEqualTo(Utils.crc32cCodec.encode(c1.getCrc32c())); BlobAppendableUpload takeOver = - storage.blobAppendableUpload( - BlobInfo.newBuilder(done1.getBlobId()).build(), p.uploadConfig); + storage.blobAppendableUpload(done1, p.uploadConfig); try (AppendableUploadWriteableByteChannel channel = takeOver.open()) { int written = Buffers.emptyTo(ByteBuffer.wrap(c2.getBytes()), channel); assertThat(written).isEqualTo(c2.length()); @@ -198,7 +197,7 @@ public void appendableBlobUploadTakeover() throws Exception { BlobInfo done2 = takeOver.getResult().get(5, TimeUnit.SECONDS); assertThat(done2.getSize()).isEqualTo(p.content.length()); - assertThat(done2.getCrc32c()).isAnyOf(Utils.crc32cCodec.encode(p.content.getCrc32c()), null); + assertThat(done2.getCrc32c()).isEqualTo(Utils.crc32cCodec.encode(p.content.getCrc32c())); } @Test @@ -251,15 +250,16 @@ public void takeoverJustToFinalizeWorks() throws Exception { assertThat(done1.getCrc32c()).isEqualTo(Utils.crc32cCodec.encode(p.content.getCrc32c())); BlobAppendableUpload takeOver = - storage.blobAppendableUpload( - BlobInfo.newBuilder(done1.getBlobId()).build(), p.uploadConfig); + storage.blobAppendableUpload(done1, p.uploadConfig); takeOver.open().finalizeAndClose(); BlobInfo done2 = takeOver.getResult().get(5, TimeUnit.SECONDS); assertAll( () -> assertThat(done2).isNotNull(), () -> assertThat(done2.getSize()).isEqualTo(p.content.length()), - () -> assertThat(done2.getCrc32c()).isNotNull()); + () -> + assertThat(done2.getCrc32c()) + .isEqualTo(Utils.crc32cCodec.encode(p.content.getCrc32c()))); } private void checkTestbenchIssue733() {