RATIS-2568. Add Streaming Read to FileStore#1488
Conversation
| return ProtoUtils.toByteString(p.toString()); | ||
| } | ||
|
|
||
| static ByteBuffer getReplyBuffer(DataStreamReply reply) { |
There was a problem hiding this comment.
I'm thinking if we could make the DataStreamInput#readAsync to return:
ReferenceCountedObject<DataStreamReadChunk>
interface DataStreamReadChunk {
long getOffset();
ByteBuffer[] nioBuffers();
}no terminal reply bytebuffer handling logic anymore, user don't need to aware the DataStreamReplyByteBuf and DataStreamReplyByteBuffer.
There was a problem hiding this comment.
@peterxcli I found it is quite a lot changes for this API change. So I put it into a different PR: #1492
| long remaining = length; | ||
| while (remaining > 0) { | ||
| final int chunkSize = FileStoreCommon.getChunkSize(remaining); | ||
| final ByteBuffer buffer = ByteBuffer.allocateDirect(chunkSize); |
There was a problem hiding this comment.
Could we reuse the buffe?
final ByteBuffer buffer =
ByteBuffer.allocateDirect(FileStoreCommon.getChunkSize(length));
while (remaining > 0) {
buffer.clear();
buffer.limit(FileStoreCommon.getChunkSize(remaining));
...
}
szetszwo
left a comment
There was a problem hiding this comment.
@amaliujia , thanks for working on this!
Just have two comments inlined.
| try (SeekableByteChannel in = Files.newByteChannel( | ||
| resolver.apply(getRelativePath()), StandardOpenOption.READ)) { | ||
| in.position(offset); | ||
| long remaining = length; | ||
| while (remaining > 0) { | ||
| final int chunkSize = FileStoreCommon.getChunkSize(remaining); | ||
| final ByteBuffer buffer = ByteBuffer.allocateDirect(chunkSize); | ||
| final int n = in.read(buffer); | ||
| if (n <= 0) { | ||
| break; | ||
| } | ||
| buffer.flip(); | ||
| stream.write(buffer); | ||
| remaining -= n; | ||
| } | ||
| } finally { | ||
| stream.close(); | ||
| } |
There was a problem hiding this comment.
| long total = 0; | ||
| try (DataStreamInput in = getStreamInput(path, offset, length)) { | ||
| while (true) { | ||
| final ReferenceCountedObject<DataStreamReply> ref = in.readAsync().join(); |
There was a problem hiding this comment.
We should not call join(). Otherwise, it becomes sync'ed. I think it is fine for now and we can improve it later.
What changes were proposed in this pull request?
Add streaming read support to the FileStore example, mirroring the existing data-stream write path and building on RATIS-2546 (DataStreamApi.streamReadOnly).
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/RATIS-2568
Please replace this section with the link to the Apache JIRA)
How was this patch tested?