RFR 8184285: Buffer sizes of Flow based BodyProcessor API

RFR 8184285: Buffer sizes of Flow based BodyProcessor API

Michael McMahon
Hi,

The HTTP client work is continuing in a new branch of the JDK 10 sandbox
forest (http-client-branch),
and here is the first of a number of changes we want to make.

This one addresses the feedback we received that
HttpResponse.BodyProcessors would
be easier to implement if there were control over the size of the buffers
being supplied.

To that end we have added APIs for creating buffered response processors
(and handlers)

So, HttpResponse.BodyProcessor has a new static method with the
following signature:

public static <T> BodyProcessor<T> buffering(BodyProcessor<T> downstream, long buffersize)

This returns a new processor which delivers data to the supplied
downstream processor, buffered according to the 'buffersize' parameter.
It guarantees that all data is delivered in chunks of exactly that size,
except for the final chunk, which may be smaller.

This should allow other BodyProcessor implementations that require
buffering to wrap themselves
in this way, be guaranteed that the data they receive is buffered, and
then return that composite
processor to their user.

A similar method is added to HttpResponse.BodyHandler.
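The fixed-size chunking guarantee described above can be sketched independently of the proposed API. The following standalone class (illustrative names only, not the webrev's implementation) shows one way incoming buffers of arbitrary size can be re-chunked so that every emitted chunk is full except possibly the last:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch of the buffering guarantee: incoming ByteBuffers of arbitrary size
// are re-chunked so that every emitted chunk holds exactly 'chunkSize' bytes,
// except possibly the final one. All names here are illustrative.
class Rechunker {
    private final int chunkSize;
    private ByteBuffer current;                       // partially filled chunk
    private final List<ByteBuffer> ready = new ArrayList<>();

    Rechunker(int chunkSize) {
        this.chunkSize = chunkSize;
        this.current = ByteBuffer.allocate(chunkSize);
    }

    /** Accept an incoming buffer, copying bytes into fixed-size chunks. */
    void onNext(ByteBuffer src) {
        while (src.hasRemaining()) {
            int n = Math.min(src.remaining(), current.remaining());
            ByteBuffer slice = src.slice();           // view, no copy yet
            slice.limit(n);
            current.put(slice);                       // copy exactly n bytes
            src.position(src.position() + n);
            if (!current.hasRemaining()) {            // chunk is full: emit it
                current.flip();
                ready.add(current);
                current = ByteBuffer.allocate(chunkSize);
            }
        }
    }

    /** Flush the final, possibly smaller, chunk at end of stream. */
    List<ByteBuffer> onComplete() {
        if (current.position() > 0) {
            current.flip();
            ready.add(current);
        }
        return ready;
    }
}
```

Feeding buffers of 3, 5, and 4 bytes through a Rechunker of size 4 yields three full 4-byte chunks, matching the "final chunk may be smaller" contract.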

Note also that we have changed HttpResponse.BodyProcessor from a
Flow.Subscriber<ByteBuffer>
to a Flow.Subscriber<List<ByteBuffer>>. That change is technically
orthogonal to this one, but is motivated
by it: transferring ByteBuffers in lists makes it easier to buffer
them efficiently.

The webrev is at: http://cr.openjdk.java.net/~michaelm/8184285/webrev.1/

Thanks,
Michael



Re: RFR 8184285: Buffer sizes of Flow based BodyProcessor API

Tobias Thierer
Hi Michael -

thanks for your work! The fact that BufferingProcessor wraps another BodyProcessor looks like it could be quite useful/flexible.

Some things I noticed upon a quick glance:
  • BufferingProcessor's documentation cuts out mid-sentence.
  • BufferingProcessor passes modifiable LinkedList instances. Why? Do you want to guarantee that remove-from-front is efficient? If not, consider singleton and array-based unmodifiable instances: they tend to be faster and more memory efficient, and they don't risk applications accidentally starting to rely on the lists being modifiable.
  • Unsurprisingly, getBuffersOf() performs a copy via getNBytesFrom() - just a thing to be aware of, but I don't expect it can be avoided.
  • The buffer size is fixed, so one can't change the buffer size dynamically (e.g. in response to bitrate changes in a video player app) or even make the first buffer a different size than the later ones (e.g. file header vs. later chunks). I don't know a good solution to the latter either, since Flow.request(long) documents that the long refers to the number of calls; the former looks like it could be made more flexible in a future version of BufferingProcessor, but is probably okay for now.
  • So far it looks like the new code will interoperate with blocking facades such as PipedResponseStream, which is nice. I hope to verify this empirically next week.
I intend to write more comments later, just wanted to send a quick first impression. Thanks for the promising update!
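As an aside on the unmodifiable-list suggestion above: singleton lists from List.of are cheap to create and fail fast if a downstream accidentally tries to mutate them. A small illustrative sketch (not code from the webrev):

```java
import java.nio.ByteBuffer;
import java.util.List;

// Illustration of the unmodifiable-list suggestion: List.of produces an
// array-based, unmodifiable list, so callers cannot come to depend on
// being able to mutate the delivered lists.
class UnmodifiableListDemo {
    static List<ByteBuffer> deliver(ByteBuffer bb) {
        return List.of(bb);   // unmodifiable singleton, no LinkedList nodes
    }

    public static void main(String[] args) {
        List<ByteBuffer> items = deliver(ByteBuffer.allocate(8));
        try {
            items.add(ByteBuffer.allocate(8));   // would succeed on LinkedList
            throw new AssertionError("list was modifiable");
        } catch (UnsupportedOperationException expected) {
            // downstream code cannot mutate the delivered list
        }
    }
}
```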

Tobias


Re: RFR 8184285: Buffer sizes of Flow based BodyProcessor API

Michael McMahon
Hi Tobias,

On 04/08/2017, 10:22, Tobias Thierer wrote:
Hi Michael -

thanks for your work! The fact that BufferingProcessor wraps another BodyProcessor looks like it could be quite useful/flexible.

Some things I noticed upon a quick glance:
  • BufferingProcessor's documentation cuts out mid-sentence.
That could be just how the unformatted text looks. I've uploaded the HTML apidoc
for the two interfaces to:
http://cr.openjdk.java.net/~michaelm/8184285/apidoc/
which should be easier to read. Links won't work but you can just search for the buffering() method in both docs.
  • BufferingProcessor passes modifiable LinkedList instances. Why? Do you want to guarantee that remove-from-front is efficient? If not, consider singleton and array-based unmodifiable instances since they tend to be faster, more memory efficient, and don't risk applications starting to accidentally rely on the lists being modifiable?
That seems like a good idea.
  • Unsurprisingly, getBuffersOf() performs a copy via getNBytesFrom() - just a thing to be aware of, but I don't expect it can be avoided.
  • The buffer size is fixed, so one can't change the buffer size dynamically (e.g. in response to bitrate changes in a video player app) or even make the first buffer a different size than the later ones (e.g. file header vs. later chunks). I don't know a good solution to the latter either, since Flow.request(long) documents that the long refers to the number of calls; the former looks like it could be made more flexible in a future version of BufferingProcessor, but is probably okay for now.
At the very least it would require defining a new subtype of HttpResponse.BufferingProcessor, and it complicates the Flow semantics
a bit, because the buffer size could change while there is outstanding demand at the old size. It could be usable
if you stick to doing request(1) all the time, but I think we should defer that for now.
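The request(1)-at-a-time discipline mentioned above can be sketched with the standard java.util.concurrent.Flow API. This illustrative subscriber (not the webrev's code) never has more than one item of outstanding demand, which is what would make a dynamic buffer-size change tractable:

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of "request(1) all the time": at most one item of demand is ever
// outstanding, so a (hypothetical) buffer-size change could take effect
// cleanly between deliveries. Names are illustrative.
class OneAtATime implements Flow.Subscriber<List<ByteBuffer>> {
    final AtomicInteger received = new AtomicInteger();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);                      // initial demand of exactly one
    }
    public void onNext(List<ByteBuffer> item) {
        received.incrementAndGet();
        subscription.request(1);           // re-request only after delivery
    }
    public void onError(Throwable t) { done.countDown(); }
    public void onComplete()         { done.countDown(); }

    public static void main(String[] args) throws Exception {
        OneAtATime sub = new OneAtATime();
        try (SubmissionPublisher<List<ByteBuffer>> pub = new SubmissionPublisher<>()) {
            pub.subscribe(sub);
            for (int i = 0; i < 5; i++)
                pub.submit(List.of(ByteBuffer.allocate(4)));
        }                                   // close() triggers onComplete
        sub.done.await();
        if (sub.received.get() != 5) throw new AssertionError();
    }
}
```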
  • So far it looks like the new code will interoperate with blocking facades such as PipedResponseStream, which is nice. I hope to verify this empirically next week.
I intend to write more comments later, just wanted to send a quick first impression. Thanks for the promising update!

Great, thanks for the first impression.

Michael

Re: RFR 8184285: Buffer sizes of Flow based BodyProcessor API

Michael McMahon
In reply to this post by Tobias Thierer


On 04/08/2017, 10:22, Tobias Thierer wrote:
Hi Michael -

thanks for your work! The fact that BufferingProcessor wraps another BodyProcessor looks like it could be quite useful/flexible.

Some things I noticed upon a quick glance:
  • BufferingProcessor's documentation cuts out mid-sentence.
I realise now you are referring to the implementation class. Yes, I'll fix that too.

Thanks
Michael


Re: RFR 8184285: Buffer sizes of Flow based BodyProcessor API

Chris Hegarty
In reply to this post by Michael McMahon
Tobias, Michael,

I did a few rounds of iteration on this, addressing Tobias's comments (modulo the variable buffer sizing, which we agreed to defer for now unless there is a compelling use case). With this change there is zero copying of data. The composite container, List, is relatively lightweight, and we need some type for composition anyway (no need to invent something new).

http://cr.openjdk.java.net/~chegar/httpBuffering/

I pushed this version to the sandbox, where we can get further experience with it [1].

-Chris.

[1] http://hg.openjdk.java.net/jdk10/sandbox/jdk/rev/c243b0e0f9a6

hg clone http://hg.openjdk.java.net/jdk10/sandbox
cd sandbox
bash get_source.sh
bash common/bin/hgforest.sh -s update http-client-branch
# configure, make, etc



Re: RFR 8184285: Buffer sizes of Flow based BodyProcessor API

Tobias Thierer
In reply to this post by Michael McMahon
Hi Michael & Chris -

apologies for the slow follow-up. I couldn't get my IDE (IntelliJ) to accept a locally built OpenJDK 9 into which I had patched your proposed changes ("The selected directory is not a valid home for the JDK"). Not your problem to solve, but it led me to procrastinate a little because I had to run command lines to compile & see failures. Also, note that I had only patched in your initial change rather than Chris's later revision, but from a quick inspection it looks like that doesn't affect the comments below.

I've run into one limitation with the new List<ByteBuffer> based approach that I wasn't aware of when I wrote my initial reaction. It's not necessarily a deal-breaker (BufferingProcessor is still useful), but I wanted to mention it. I also have an idea that would allow us to go back to ByteBuffer (rather than List<ByteBuffer>) being the unit of data that's passed through the subscription, without losing any flexibility/capability of the API.

===== Limitation of the new API

The two goals that I expected your change to achieve are:
  1. Give an application control over the size of the data chunks that it has to process at a time, and 
  2. Give an application control (lower/upper bound) on how many bytes, as opposed to how many ByteBuffers, are being held in memory.
I only realized today that your change actually achieves only the first goal, not the second. I also had an idea of how the first goal could be achieved without changing the unit of data from ByteBuffer to List<ByteBuffer> (see more below).

The issue with the second goal is that while the new BodyProcessor.buffering() API gives the application control over the size of the ByteBuffers delivered to it, it doesn't give it control over the number of bytes buffered, because it doesn't know how long the List<ByteBuffer> delivered to it on each call to onNext() will be.

Note that both before and after your proposed change, an application can arrange for a request for more data to be issued immediately when the number of bytes buffered drops below a lower bound; it just can't stop the system from giving it more data than it'd like (upper bound).

For example, I've adjusted my example of a PipedResponseStream to the new API. The way I've implemented the lower bound is:
  • Previously, it started with subscription.request(initialBuffersToRequest) and followed that up with subscription.request(1) each time a buffer was cleared from the queue.
  • I now changed it to keep track of whether there is currently a request outstanding (the time between subscription.request(1) and the corresponding onNext()). Every time there is (a) no request outstanding, and (b) buffers.size < numByteBuffersToBuffer, a new subscription.request(1) is made. This can happen at three different times: 1.) during onSubscribe(), 2.) when a request completes in onNext() but we discover that we still have too little data, and 3.) during take() when a ByteBuffer is taken out of the internal buffer. Of course, if the latency between request(1) and onNext() is too high, it could be that the internal buffer runs out and we don't keep up refilling it.
I can ask Martin to upload the latest code to his workspace if you like, but I suspect you get the idea.
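The refill rule described in the second bullet can be sketched as a small state machine. Everything here is illustrative: Object stands in for ByteBuffer, and requestsIssued stands in for calls to subscription.request(1):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the lower-bound refill rule: issue request(1) whenever
// (a) no request is outstanding and (b) fewer than 'target' buffers
// are queued. Checked from onSubscribe(), onNext(), and take().
class RefillTracker {
    private final int target;
    private final Deque<Object> buffers = new ArrayDeque<>();
    private boolean requestOutstanding;
    int requestsIssued;                    // visible for demonstration

    RefillTracker(int target) { this.target = target; }

    private void maybeRequest() {
        if (!requestOutstanding && buffers.size() < target) {
            requestOutstanding = true;
            requestsIssued++;              // stands in for subscription.request(1)
        }
    }

    void onSubscribe()         { maybeRequest(); }
    void onNext(Object buffer) { requestOutstanding = false; buffers.add(buffer); maybeRequest(); }
    Object take()              { Object b = buffers.poll(); maybeRequest(); return b; }
}
```

With target = 2: onSubscribe() issues the first request; each onNext() re-requests only while the queue is below target; and take() triggers a refill as soon as the queue drops below target again.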

===== How to change back to the old API without losing the new capability

I think it's not actually necessary to change from ByteBuffer to List<ByteBuffer> in order to achieve goal (1.), i.e. the fixed ByteBuffer size delivered by BufferingProcessor.

Here's how:
  • BufferingProcessor keeps a Queue<ByteBuffer> internally, instead of passing the whole List to the delegate BodyProcessor.
  • When the delegate calls request(n), BufferingProcessor calls onNext(ByteBuffer) n times, supplying ByteBuffers from its internal Queue.
  • When the internal queue size gets small, BufferingProcessor calls request(1) (or request() with some value > 1) on its subscription to get more data to feed into its Queue<ByteBuffer>.
Notes:
  • Obviously, BufferingProcessor will run the risk of running out of data buffered internally if the delegate processor is requesting the data too quickly. And obviously, the BufferingProcessor has a hard time deciding on the correct number n to pass to request(n) on its subscription. But, because BufferingProcessor is part of the HTTP Client implementation, it is in a much better position than the application to know about internal buffer sizes, to apply a heuristic for determining n based on the ratio between its internal buffer size and the chunk size requested by the delegate BodyProcessor, and potentially (as a later refinement) to measure throughput and latency to guess how much more data it should request, and how early.
  • Also obviously, the application would have no control over how much extra data will be used up by the BufferingProcessor. But that's not too different from how the application currently has no control over how many extra ByteBuffers will be delivered by the system during onNext(List<ByteBuffer>). And again, BufferingProcessor is in a better position to deal with this than the application because it can hard-code more knowledge about the rest of the implementation.
  • By going back from List<ByteBuffer> to ByteBuffer, everyone who implements BodyProcessor is saved the onerous task of writing the "for (ByteBuffer item : items) { ... }" loop in onNext().
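The queue-and-drain mechanics of this proposal can be sketched as follows; this is an illustrative skeleton only (threading, refill heuristics, and completion are omitted, and the names are invented):

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Sketch of the alternative proposed above: buffer internally in a
// Queue<ByteBuffer> and satisfy request(n) by invoking the delegate's
// onNext(ByteBuffer) up to n times, as buffers become available.
class QueueBasedBuffering {
    private final Queue<ByteBuffer> queue = new ArrayDeque<>();
    private final Consumer<ByteBuffer> downstreamOnNext;
    private long demand;                   // items requested but not yet delivered

    QueueBasedBuffering(Consumer<ByteBuffer> downstreamOnNext) {
        this.downstreamOnNext = downstreamOnNext;
    }

    /** Upstream feeds re-chunked buffers into the internal queue. */
    void enqueue(ByteBuffer chunk) { queue.add(chunk); drain(); }

    /** Models the delegate's subscription.request(n). */
    void request(long n) { demand += n; drain(); }

    private void drain() {
        while (demand > 0 && !queue.isEmpty()) {
            demand--;
            downstreamOnNext.accept(queue.poll());
        }
    }
}
```

Nothing is delivered until the delegate requests; excess demand is remembered, so a later enqueue() is delivered immediately.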
Thoughts?

Tobias


Re: RFR 8184285: Buffer sizes of Flow based BodyProcessor API

Chris Hegarty
Tobias,

Thank you for trying this out and giving such a detailed reply. Comments inline ...

On 18 Aug 2017, at 18:45, Tobias Thierer [hidden email] wrote:

Hi Michael & Chris -

apologies for the slow follow-up. I couldn't get my IDE (IntelliJ) to accept a locally built OpenJDK 9 into which I had patched your proposed changes ("The selected directory is not a valid home for the JDK"). Not your problem to solve, but it led me to procrastinate a little because I had to run command lines to compile & see failures. Also, note that I had only patched initial your change rather than Chris's later revision, but from quick inspection it looks like that doesn't affect the comments below.

I've run into one limitation with the new List<ByteBuffer> based approach that I wasn't aware of when I wrote my initial reaction. It's not necessarily a deal-breaker (BufferingProcessor is still useful), but I wanted to mention it. I also have an idea that would allow us to go back to ByteBuffer (rather than List<ByteBuffer>) being the unit of data that's passed through the subscription, without losing any flexibility/capability of the API.

===== Limitation of the new API

The two goals that I expected your change to achieve is:
• Give an application control over the size of the data chunks that it has to process at a time, and 
• Give an application control (lower/upper bound) on how many bytes, as opposed to how many ByteBuffers, are being held in memory.
I only realized today that your change actually only achieves the first goal, but not the second. I also had an idea how the first goal could be achieved without changing the unit of data from ByteBuffer to List<ByteBuffer> (see more below).

I do not agree with this, or your goals. You seem to be equating ByteBuffers with data chunks, and that is not always the case. When reading HTTP/2 frames it is not practical, or even possible, to try to have a one-to-one association of frames to ByteBuffers. Sizing ByteBuffers to a given size does not translate well to a fixed size unit of body data delivery. At least not without copying, which we want to avoid baking into the API.

The buffering processor has just one goal: ensure that the downstream processor is invoked with a predetermined / fixed number of bytes each time its onNext method is called.

From the HTTP Client’s point of view, there should be no excessive ByteBuffers being held in memory. From the perspective of Flow, once an item ( a list of ByteBuffers ) is passed to onNext, control of said item is also passed. If the processor stuffs the buffers into some container for later use, then yes they may be held there for some period of time, but that is up to the processor itself. Clearly buffering processor does do this, but then again that’s the point of it.

In the case of HTTP/2, its own flow control ensures an upper bound on the amount of data that can be sent, hence potentially buffered. The HTTP/1.1 implementation should ensure that "reasonably" sized ByteBuffers are used. I don't think that a tuning knob for this needs to bubble up to the API level. How would a Java developer know how best to size the internal ByteBuffers used by the client implementation when making a request to an HTTP/2 server that may send multiple push promises (i.e. may have many streams)? This is not something that we want Java developers to have to think about. The implementation should have some reasonable defaults.

The issue with the second goal is that while the new BodyProcessor.buffering() API gives the application control over the size of the ByteBuffers delivered to it, it doesn't give it control over the number of bytes buffered, because it doesn't know how long a List<ByteBuffer> will be delivered to it on each call to onNext(). 

The new BodyProcessor.buffering() does give control over the number of bytes. It guarantees that the List<ByteBuffer> will contain N number of bytes each time the downstream onNext is invoked. The tests assert this. Internally, the buffering processor cannot grow more than the size of one ByteBuffer, whose size is determined by the default client implementation.

Note: List<ByteBuffer> is used as the Flow since there is no composite ByteBuffer available ( or constructible ) as things stand in the Java Platform. In many cases the BB’s are heap BB’s so relatively small wrappers around byte[]’s that offer positional constraints.  Using a combination of both allows an implementation to minimise, or eliminate, the need to copy the data as it flows through.
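The point about List<ByteBuffer> acting as a lightweight composite can be shown concretely: two heap buffers can present disjoint windows onto one shared byte[] without any copying (illustrative sketch, not client code):

```java
import java.nio.ByteBuffer;
import java.util.List;

// Illustrates List<ByteBuffer> as a composite view over existing storage.
// Both buffers below share one backing array; only their positional
// constraints (position/limit) differ, so no bytes are copied.
class CompositeViewDemo {
    static long remaining(List<ByteBuffer> bufs) {
        long n = 0;
        for (ByteBuffer b : bufs) n += b.remaining();
        return n;
    }

    public static void main(String[] args) {
        byte[] backing = new byte[16];                 // one shared array
        ByteBuffer first  = ByteBuffer.wrap(backing, 0, 10);   // bytes 0..9
        ByteBuffer second = ByteBuffer.wrap(backing, 10, 6);   // bytes 10..15
        List<ByteBuffer> composite = List.of(first, second);

        if (remaining(composite) != 16) throw new AssertionError();
        // Zero-copy: both views report the same backing array.
        if (first.array() != backing || second.array() != backing)
            throw new AssertionError();
    }
}
```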

Note that both before and after your proposed change, an application can achieve the state where a request for more data is issued immediately when the number of bytes buffered drops below a lower bound; it just can't stop the system from giving it more data than it'd like (upper bound). This is true both before and after your change.

Not true. The new buffering processor will put an upper bound on the NUMBER OF BYTES being passed to the downstream processor.   As for back pressure, that can be achieved through the Flow API, by the processor not requesting more data. I do accept that the buffering processor cannot limit the max amount of data received, but I do not see that as a problem given my comments about reasonable defaults above.

[ Note: the internal implementation of some of the convenience request processors eagerly read ALL data and put it into a queue ( no flow control ). There are changes coming to address this. ]

For example, I've adjusted my example of a PipedResponseStream to a new API. The way I've implemented the lower bound is:
• Previously, it started with subscription.request(initialBuffersToRequest) and followed that up with subscription.request(1) each time a buffer was cleared from the queue.
• I now changed it to keep track of whether there is currently a request outstanding (the time between subscription.request(1) and the corresponding onNext()). Everytime there is (a) no request outstanding, and (b) buffers.size < numByteBuffersToBuffer, a new subscription.request(1) is made. This can happen at three different times: 1.) during onSubscribe(), 2.) when on request completes in onNext() but we discover that we still have too little data, and 3.) during take() when a ByteBuffer is taken out of the internal buffer. Of course, if the latency between onRequest(1) and onNext() is too high, it could be that the internal buffer runs out and we don't keep up refilling it.
I can ask Martin to upload the latest code to his workspace if you like, but I suspect you get the idea.
Your description is sufficient, no need to upload.

===== How to change back to the old API without losing the new capability

I think it's not actually necessary to change from ByteBuffer to List<ByteBuffer> in order to achieve goal (1.), i.e. the fixed ByteBuffer size delivered by BufferingProcessor.

Given my comment above, with HTTP/2 the client cannot guarantee that a single channel read will result in a ByteBuffer that contains only data from a single frame. If you accept this, then maybe you can stop reading here; otherwise I'll try to reply to your comments / suggestions.

Here's how:
• BufferingProcessor keeps a Queue<ByteBuffer> internally, instead of passing the whole List to the delegate BodyProcessor.
• When the delegate calls request(n), BufferingProcessor calls onNext(ByteBuffer) n times, supplying ByteBuffers from its internal Queue.

So the downstream processor may get fewer than N bytes in each onNext call. I'm not sure how that helps. But maybe you are no longer concerned about this.

• When the internal queue size gets small, BufferingProcessor calls request(1) (or request(n) with some n > 1) on its subscription to get more data to feed into its Queue<ByteBuffer>.
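The queue-based scheme above could be sketched roughly as follows, assuming a single delivery thread (the names REFILL_THRESHOLD and UPSTREAM_BATCH are illustrative heuristics, not real API; a production version would also handle demand overflow and cancellation):

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: queue ByteBuffers internally and hand them to the
// delegate one at a time, one per request(n) slot, refilling from upstream
// when the queue runs low.
class BufferingProcessor implements Flow.Subscriber<ByteBuffer> {
    static final int REFILL_THRESHOLD = 2;   // refill when queue gets this small
    static final int UPSTREAM_BATCH = 4;     // heuristic n for upstream request(n)

    private final Flow.Subscriber<ByteBuffer> delegate;
    private final Queue<ByteBuffer> queue = new ArrayDeque<>();
    private Flow.Subscription upstream;
    private long downstreamDemand;           // slots requested by the delegate
    private long upstreamOutstanding;        // buffers requested but not yet arrived

    BufferingProcessor(Flow.Subscriber<ByteBuffer> delegate) { this.delegate = delegate; }

    @Override public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        // Hand the delegate a subscription whose request(n) drains our queue.
        delegate.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { downstreamDemand += n; drain(); refill(); }
            @Override public void cancel() { upstream.cancel(); }
        });
        refill();                            // prime the internal queue
    }

    @Override public void onNext(ByteBuffer item) {
        upstreamOutstanding--;
        queue.add(item);
        drain();
        refill();
    }

    private void drain() {                   // deliver while the delegate has demand
        while (downstreamDemand > 0 && !queue.isEmpty()) {
            downstreamDemand--;
            delegate.onNext(queue.poll());
        }
    }

    private void refill() {                  // top the queue back up from upstream
        if (upstreamOutstanding == 0 && queue.size() < REFILL_THRESHOLD) {
            upstreamOutstanding = UPSTREAM_BATCH;
            upstream.request(UPSTREAM_BATCH);
        }
    }

    @Override public void onError(Throwable t) { delegate.onError(t); }
    @Override public void onComplete() { delegate.onComplete(); }

    // Small self-contained demo: publish 'a','b','c' through the processor
    // to a delegate that requests one buffer at a time.
    static String demo() throws Exception {
        StringBuilder seen = new StringBuilder();
        CountDownLatch done = new CountDownLatch(1);
        Flow.Subscriber<ByteBuffer> delegate = new Flow.Subscriber<ByteBuffer>() {
            Flow.Subscription s;
            public void onSubscribe(Flow.Subscription s) { this.s = s; s.request(1); }
            public void onNext(ByteBuffer b) { seen.append((char) b.get()); s.request(1); }
            public void onError(Throwable t) { done.countDown(); }
            public void onComplete() { done.countDown(); }
        };
        SubmissionPublisher<ByteBuffer> pub = new SubmissionPublisher<>();
        pub.subscribe(new BufferingProcessor(delegate));
        for (char c : "abc".toCharArray()) pub.submit(ByteBuffer.wrap(new byte[]{ (byte) c }));
        pub.close();
        done.await(5, TimeUnit.SECONDS);
        return seen.toString();
    }
}
```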
Notes:
• Obviously, BufferingProcessor runs the risk of running out of internally buffered data if the delegate processor requests data too quickly. And obviously, BufferingProcessor has a hard time deciding on the correct n to pass to request(n) on its subscription. But, because BufferingProcessor is part of the HTTP client implementation, it is in a much better position than the application to know about internal buffer sizes, to apply some heuristic for n based on the ratio between that internal buffer size and the chunk size requested by the delegate BodyProcessor, and potentially (as a further refinement) to measure throughput / latency to guess how much more data it should request, and how early.
• Also obviously, the application would have no control over how much extra data is consumed by the BufferingProcessor. But that's not too different from how the application currently has no control over how many extra ByteBuffers are delivered by the system during onNext(List<ByteBuffer>).

Sure, but without a composite ByteBuffer, if we want to guarantee that onNext is delivered N bytes, we cannot avoid a sequence of ByteBuffers (without copying).

And again, BufferingProcessor is in a better position to deal with this than the application, because it can hard-code more knowledge about the rest of the implementation.
Well, it should just use the processor API, but I see that you are suggesting that it could have a closer relationship with the client implementation.

• By going back from List<ByteBuffer> to ByteBuffer, everyone who implements BodyProcessor is saved the onerous task of writing the "for (ByteBuffer item : items) { ... }" loop in onNext().
I do not accept that dealing with List<ByteBuffer> is onerous. Java developers are already familiar with the scatter / gather pattern, so this is not introducing a new concept. Code dealing with a single ByteBuffer already needs to handle the case where the data is incomplete; all that is changing here is a straightforward foreach loop.
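For concreteness, a small illustrative sketch of what that loop amounts to (the class and method names here are made up for the example):

```java
import java.nio.ByteBuffer;
import java.util.List;

// Illustrative only: consuming onNext(List<ByteBuffer>) is the same
// per-buffer partial-data handling as before, wrapped in one foreach.
class ListOnNextExample {

    // What a BodyProcessor implementation might do inside onNext(List<ByteBuffer>).
    static int countBytes(List<ByteBuffer> items) {
        int total = 0;
        for (ByteBuffer item : items) {    // the foreach in question
            total += item.remaining();     // per-buffer handling is unchanged
            item.position(item.limit());   // mark the buffer as consumed
        }
        return total;
    }

    static int demo() {
        // Two buffers of 3 and 2 bytes, as a scatter/gather-style delivery.
        return countBytes(List.of(ByteBuffer.allocate(3), ByteBuffer.allocate(2)));
    }
}
```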

Thoughts?

Speaking generally, your concern now seems to have moved to how much memory will be consumed by the client, perhaps per request. This is different from what was said earlier, but I suggest that in general the implementation should not use/buffer more data than fits in a single ByteBuffer per HTTP/1.1 request/reply or HTTP/2 stream, and that the client choose a reasonable default size (maybe configurable by a system property or similar, not at the API level).

-Chris.

Tobias
