Skip to content

RATIS-2568. Add Streaming Read to FileStore#1488

Open
amaliujia wants to merge 1 commit into
apache:masterfrom
amaliujia:streaming_read_filestore
Open

RATIS-2568. Add Streaming Read to FileStore#1488
amaliujia wants to merge 1 commit into
apache:masterfrom
amaliujia:streaming_read_filestore

Conversation

@amaliujia

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Add streaming read support to the FileStore example, mirroring the existing data-stream write path and building on RATIS-2546 (DataStreamApi.streamReadOnly).

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/RATIS-2568

Please replace this section with the link to the Apache JIRA)

How was this patch tested?

  1. Unit test
  2. Manually test through command line and run FileStore cli.

@amaliujia

Copy link
Copy Markdown
Contributor Author

@peterxcli @szetszwo

return ProtoUtils.toByteString(p.toString());
}

static ByteBuffer getReplyBuffer(DataStreamReply reply) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking if we could make the DataStreamInput#readAsync to return:

ReferenceCountedObject<DataStreamReadChunk>
interface DataStreamReadChunk {
  long getOffset();
  ByteBuffer[] nioBuffers();
}

no terminal reply bytebuffer handling logic anymore, user don't need to aware the DataStreamReplyByteBuf and DataStreamReplyByteBuffer.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peterxcli I found it is quite a lot changes for this API change. So I put it into a different PR: #1492

long remaining = length;
while (remaining > 0) {
final int chunkSize = FileStoreCommon.getChunkSize(remaining);
final ByteBuffer buffer = ByteBuffer.allocateDirect(chunkSize);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we reuse the buffe?

final ByteBuffer buffer =
    ByteBuffer.allocateDirect(FileStoreCommon.getChunkSize(length));

while (remaining > 0) {
  buffer.clear();
  buffer.limit(FileStoreCommon.getChunkSize(remaining));
  ...
}

@szetszwo szetszwo left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amaliujia , thanks for working on this!

Just have two comments inlined.

Comment on lines +100 to +117
try (SeekableByteChannel in = Files.newByteChannel(
resolver.apply(getRelativePath()), StandardOpenOption.READ)) {
in.position(offset);
long remaining = length;
while (remaining > 0) {
final int chunkSize = FileStoreCommon.getChunkSize(remaining);
final ByteBuffer buffer = ByteBuffer.allocateDirect(chunkSize);
final int n = in.read(buffer);
if (n <= 0) {
break;
}
buffer.flip();
stream.write(buffer);
remaining -= n;
}
} finally {
stream.close();
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

long total = 0;
try (DataStreamInput in = getStreamInput(path, offset, length)) {
while (true) {
final ReferenceCountedObject<DataStreamReply> ref = in.readAsync().join();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not call join(). Otherwise, it becomes sync'ed. I think it is fine for now and we can improve it later.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants