Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
import org.apache.parquet.bytes.LittleEndianDataOutputStream;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
Expand All @@ -46,11 +45,9 @@ public class DeltaLengthByteArrayValuesWriter extends ValuesWriter {

private ValuesWriter lengthWriter;
private CapacityByteArrayOutputStream arrayOut;
private LittleEndianDataOutputStream out;

public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize, ByteBufferAllocator allocator) {
arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, allocator);
out = new LittleEndianDataOutputStream(arrayOut);
lengthWriter = new DeltaBinaryPackingValuesWriterForInteger(
DeltaBinaryPackingValuesWriter.DEFAULT_NUM_BLOCK_VALUES,
DeltaBinaryPackingValuesWriter.DEFAULT_NUM_MINIBLOCKS,
Expand All @@ -63,24 +60,29 @@ public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize, ByteBuffe
// Writes one value: its length goes to lengthWriter, its bytes are emitted through Binary.writeTo.
// An IOException raised while writing is rethrown as a ParquetEncodingException with the cause attached.
// NOTE(review): two writeTo calls are visible below (targets `out` and `arrayOut`). This text looks
// like a rendered diff showing both the removed and the added line - confirm which one is live.
public void writeBytes(Binary v) {
try {
lengthWriter.writeInteger(v.length());
v.writeTo(out);
v.writeTo(arrayOut);
} catch (IOException e) {
throw new ParquetEncodingException("could not write bytes", e);
}
}

/**
 * Writes one value taken from a range of a raw byte array, avoiding Binary object creation overhead.
 * The value's length is recorded in the length writer and its bytes are appended to the data buffer.
 * Used by {@link org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter}
 * to write suffix bytes without creating an intermediate Binary.slice().
 *
 * <p>The requested range is validated before anything is written, so a rejected call
 * cannot record a length that has no matching bytes in the data buffer.
 *
 * @param data array holding the bytes to write
 * @param offset index in {@code data} of the first byte to write
 * @param length number of bytes to write
 * @throws NullPointerException if {@code data} is null
 * @throws IndexOutOfBoundsException if {@code offset} or {@code length} is negative, or the
 *         range {@code [offset, offset + length)} does not fit inside {@code data}
 */
public void writeBytes(byte[] data, int offset, int length) {
  // Overflow-safe range check: with offset >= 0, data.length - offset cannot overflow.
  if (offset < 0 || length < 0 || length > data.length - offset) {
    throw new IndexOutOfBoundsException(
        "offset " + offset + ", length " + length + ", array length " + data.length);
  }
  lengthWriter.writeInteger(length);
  arrayOut.write(data, offset, length);
}

/**
 * Reports how much is currently buffered by this writer: the length writer's
 * buffered size plus the current size of the byte-array output buffer.
 *
 * @return the combined buffered size
 */
@Override
public long getBufferedSize() {
  final long bufferedLengths = lengthWriter.getBufferedSize();
  final long bufferedData = arrayOut.size();
  return bufferedLengths + bufferedData;
}

/**
 * Returns the buffered content as a single BytesInput: the length writer's bytes
 * followed by the contents of {@code arrayOut}. The size of {@code arrayOut} is
 * logged at debug level first.
 */
@Override
public BytesInput getBytes() {
// NOTE(review): this flush of `out` and its try/catch may be the removed side of a
// diff hunk rendered inline - confirm against the actual file.
try {
out.flush();
} catch (IOException e) {
throw new ParquetEncodingException("could not write page", e);
}
LOG.debug("writing a buffer of size {}", arrayOut.size());
// Order matters to readers of the page: lengths first, then the concatenated value bytes.
return BytesInput.concat(lengthWriter.getBytes(), BytesInput.from(arrayOut));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
public class DeltaByteArrayWriter extends ValuesWriter {

private ValuesWriter prefixLengthWriter;
private ValuesWriter suffixWriter;
private DeltaLengthByteArrayValuesWriter suffixWriter;
private byte[] previous;

public DeltaByteArrayWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
Expand Down Expand Up @@ -95,7 +95,9 @@ public void writeBytes(Binary v) {
for (i = 0; (i < length) && (previous[i] == vb[i]); i++)
;
prefixLengthWriter.writeInteger(i);
suffixWriter.writeBytes(v.slice(i, vb.length - i));
// Write suffix bytes directly from the byte array, avoiding Binary.slice() allocation
// and the virtual dispatch chain through Binary.writeTo()
suffixWriter.writeBytes(vb, i, vb.length - i);
previous = vb;
}
}