diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java index 89b538860d..abd3f48254 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/config/H2Config.java @@ -47,19 +47,21 @@ public class H2Config { private final boolean pushEnabled; private final int maxConcurrentStreams; private final int initialWindowSize; + private final int connectionWindowSize; private final int maxFrameSize; private final int maxHeaderListSize; private final boolean compressionEnabled; private final int maxContinuations; H2Config(final int headerTableSize, final boolean pushEnabled, final int maxConcurrentStreams, - final int initialWindowSize, final int maxFrameSize, final int maxHeaderListSize, - final boolean compressionEnabled, final int maxContinuations) { + final int initialWindowSize, final int connectionWindowSize, final int maxFrameSize, + final int maxHeaderListSize, final boolean compressionEnabled, final int maxContinuations) { super(); this.headerTableSize = headerTableSize; this.pushEnabled = pushEnabled; this.maxConcurrentStreams = maxConcurrentStreams; this.initialWindowSize = initialWindowSize; + this.connectionWindowSize = connectionWindowSize; this.maxFrameSize = maxFrameSize; this.maxHeaderListSize = maxHeaderListSize; this.compressionEnabled = compressionEnabled; @@ -82,6 +84,16 @@ public int getInitialWindowSize() { return initialWindowSize; } + /** + * Returns the connection-level receive window size. This controls the flow-control + * window for the entire connection as opposed to individual streams. + * + * @since 5.5 + */ + public int getConnectionWindowSize() { + return connectionWindowSize; + } + public int getMaxFrameSize() { return maxFrameSize; } @@ -105,6 +117,7 @@ public String toString() { .append(", pushEnabled=").append(this.pushEnabled) .append(", maxConcurrentStreams=").append(this.maxConcurrentStreams) .append(", initialWindowSize=").append(this.initialWindowSize) + .append(", connectionWindowSize=").append(this.connectionWindowSize) .append(", maxFrameSize=").append(this.maxFrameSize) .append(", maxHeaderListSize=").append(this.maxHeaderListSize) .append(", compressionEnabled=").append(this.compressionEnabled) @@ -130,6 +143,7 @@ public static H2Config.Builder initial() { .setMaxConcurrentStreams(Integer.MAX_VALUE) // no limit .setMaxFrameSize(INIT_MAX_FRAME_SIZE) .setInitialWindowSize(INIT_WINDOW_SIZE) + .setConnectionWindowSize(INIT_WINDOW_SIZE) .setMaxHeaderListSize(Integer.MAX_VALUE); // unlimited } @@ -140,6 +154,7 @@ public static H2Config.Builder copy(final H2Config config) { .setPushEnabled(config.isPushEnabled()) .setMaxConcurrentStreams(config.getMaxConcurrentStreams()) .setInitialWindowSize(config.getInitialWindowSize()) + .setConnectionWindowSize(config.getConnectionWindowSize()) .setMaxFrameSize(config.getMaxFrameSize()) .setMaxHeaderListSize(config.getMaxHeaderListSize()) .setCompressionEnabled(config.isCompressionEnabled()); @@ -151,6 +166,7 @@ public static class Builder { private boolean pushEnabled; private int maxConcurrentStreams; private int initialWindowSize; + private int connectionWindowSize; private int maxFrameSize; private int maxHeaderListSize; private boolean compressionEnabled; @@ -161,6 +177,7 @@ public static class Builder { this.pushEnabled = INIT_ENABLE_PUSH; this.maxConcurrentStreams = INIT_CONCURRENT_STREAM; this.initialWindowSize = INIT_WINDOW_SIZE; + this.connectionWindowSize = Integer.MAX_VALUE; this.maxFrameSize = FrameConsts.MIN_FRAME_SIZE * 4; this.maxHeaderListSize = FrameConsts.MAX_FRAME_SIZE; this.compressionEnabled = true; @@ -188,6 +205,19 @@ public Builder setInitialWindowSize(final int initialWindowSize) { return this; } + /** + * Sets the connection-level receive window size. This controls the flow-control + * window for the entire connection as opposed to individual streams governed by + * {@link #setInitialWindowSize(int)}. + * + * @since 5.5 + */ + public Builder setConnectionWindowSize(final int connectionWindowSize) { + this.connectionWindowSize = Args.checkRange(connectionWindowSize, INIT_WINDOW_SIZE, + Integer.MAX_VALUE, "Connection window size"); + return this; + } + public Builder setMaxFrameSize(final int maxFrameSize) { this.maxFrameSize = Args.checkRange(maxFrameSize, FrameConsts.MIN_FRAME_SIZE, FrameConsts.MAX_FRAME_SIZE, "Invalid max frame size"); @@ -222,6 +252,7 @@ public H2Config build() { pushEnabled, maxConcurrentStreams, initialWindowSize, + connectionWindowSize, maxFrameSize, maxHeaderListSize, compressionEnabled, diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index 8e9171b57e..1113b24af2 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -104,7 +104,7 @@ abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnection { - private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024; + private static final int DEFAULT_CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024; enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN } enum SettingsHandshake { READY, TRANSMITTED, ACKED } @@ -132,6 +132,8 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } private SettingsHandshake localSettingState = SettingsHandshake.READY; private SettingsHandshake remoteSettingState = SettingsHandshake.READY; + private final int connWindowSize; + private final long connWindowLowMark; private int initInputWinSize; private int initOutputWinSize; private int lowMark; @@ -178,6 +180,10 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor"); this.streams = new H2Streams(idGenerator); this.localConfig = h2Config != null ? h2Config : H2Config.DEFAULT; + this.connWindowSize = this.localConfig.getConnectionWindowSize(); + this.connWindowLowMark = this.connWindowSize == Integer.MAX_VALUE + ? DEFAULT_CONNECTION_WINDOW_LOW_MARK + : this.connWindowSize / 2; this.inputMetrics = new BasicH2TransportMetrics(); this.outputMetrics = new BasicH2TransportMetrics(); this.connMetrics = new BasicHttpConnectionMetrics(this.inputMetrics, this.outputMetrics); @@ -258,11 +264,15 @@ private int updateWindow(final AtomicInteger window, final int delta) throws Ari } } - private int updateWindowMax(final AtomicInteger window) throws ArithmeticException { + private int updateWindowTo(final AtomicInteger window, final int target) throws ArithmeticException { for (;;) { final int current = window.get(); - if (window.compareAndSet(current, Integer.MAX_VALUE)) { - return Integer.MAX_VALUE - current; + final int delta = target - current; + if (delta <= 0) { + return 0; + } + if (window.compareAndSet(current, target)) { + return delta; } } } @@ -446,7 +456,7 @@ public final void onConnect() throws HttpException, IOException { commitFrame(settingsFrame); localSettingState = SettingsHandshake.TRANSMITTED; - maximizeWindow(0, connInputWindow); + replenishWindow(0, connInputWindow, connWindowSize); if (streamListener != null) { final int initInputWindow = connInputWindow.get(); @@ -1119,15 +1129,15 @@ private void consumeDataFrame(final RawFrame frame, final H2Stream stream) throw stream.produceInputCapacityUpdate(); } final int connWinSize = updateInputWindow(0, connInputWindow, -frameLength); - if (connWinSize < CONNECTION_WINDOW_LOW_MARK) { - maximizeWindow(0, connInputWindow); + if (connWinSize < connWindowLowMark) { + replenishWindow(0, connInputWindow, connWindowSize); } } stream.consumeData(payload, frame.isFlagSet(FrameFlag.END_STREAM)); } - private void maximizeWindow(final int streamId, final AtomicInteger window) throws IOException { - final int delta = updateWindowMax(window); + private void replenishWindow(final int streamId, final AtomicInteger window, final int target) throws IOException { + final int delta = updateWindowTo(window, target); if (delta > 0) { final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(streamId, delta); commitFrame(windowUpdateFrame); @@ -1594,7 +1604,10 @@ public void push(final List
headers, final AsyncPushProducer pushProduce @Override public void update(final int increment) throws IOException { - incrementInputCapacity(0, connInputWindow, increment); + final int connCeiling = connWindowSize - connInputWindow.get(); + if (connCeiling > 0) { + incrementInputCapacity(0, connInputWindow, Math.min(increment, connCeiling)); + } incrementInputCapacity(id, inputWindow, increment); } diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/config/H2ConfigTest.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/config/H2ConfigTest.java index a6a2859d5f..73c535bd5b 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/config/H2ConfigTest.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/config/H2ConfigTest.java @@ -37,28 +37,32 @@ class H2ConfigTest { @Test - void builder() { - // Create and start requester - final H2Config h2Config = H2Config.custom() - .setPushEnabled(false) - .build(); + void defaults() { + final H2Config h2Config = H2Config.custom().build(); assertNotNull(h2Config); + assertEquals(65535, h2Config.getInitialWindowSize()); + assertEquals(Integer.MAX_VALUE, h2Config.getConnectionWindowSize()); } @Test void checkValues() { - // Create and start requester final H2Config h2Config = H2Config.custom() .setHeaderTableSize(1) .setMaxConcurrentStreams(1) + .setInitialWindowSize(131072) + .setConnectionWindowSize(1048576) .setMaxFrameSize(16384) + .setMaxHeaderListSize(4096) .setPushEnabled(true) .setCompressionEnabled(true) .build(); assertEquals(1, h2Config.getHeaderTableSize()); assertEquals(1, h2Config.getMaxConcurrentStreams()); + assertEquals(131072, h2Config.getInitialWindowSize()); + assertEquals(1048576, h2Config.getConnectionWindowSize()); assertEquals(16384, h2Config.getMaxFrameSize()); + assertEquals(4096, h2Config.getMaxHeaderListSize()); assertTrue(h2Config.isPushEnabled()); assertTrue(h2Config.isCompressionEnabled()); } @@ -70,6 +74,7 @@ void copy() { .setHeaderTableSize(1) .setMaxConcurrentStreams(1) .setMaxFrameSize(16384) + .setConnectionWindowSize(2097152) .setPushEnabled(true) .setCompressionEnabled(true) .build(); @@ -80,6 +85,7 @@ void copy() { assertAll( () -> assertEquals(h2Config.getHeaderTableSize(), h2Config2.getHeaderTableSize()), () -> assertEquals(h2Config.getInitialWindowSize(), h2Config2.getInitialWindowSize()), + () -> assertEquals(h2Config.getConnectionWindowSize(), h2Config2.getConnectionWindowSize()), () -> assertEquals(h2Config.getMaxConcurrentStreams(), h2Config2.getMaxConcurrentStreams()), () -> assertEquals(h2Config.getMaxFrameSize(), h2Config2.getMaxFrameSize()), () -> assertEquals(h2Config.getMaxHeaderListSize(), h2Config2.getMaxHeaderListSize()) diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ConnectionWindowSizeExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ConnectionWindowSizeExample.java new file mode 100644 index 0000000000..9a44f7e70a --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ConnectionWindowSizeExample.java @@ -0,0 +1,339 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.http2.examples; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.function.Supplier; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpConnection; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.Method; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.impl.routing.RequestRouter; +import org.apache.hc.core5.http.message.BasicHttpResponse; +import org.apache.hc.core5.http.nio.AsyncClientEndpoint; +import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler; +import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer; +import org.apache.hc.core5.http.nio.support.AsyncClientPipeline; +import org.apache.hc.core5.http.nio.support.AsyncServerPipeline; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.frame.RawFrame; +import org.apache.hc.core5.http2.impl.nio.H2StreamListener; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.ListenerEndpoint; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; + +/** + * HTTP/2 client demo illustrating the effect of the connection-level receive + * window on a multiplexed multi-stream exchange while keeping the per-stream + * initial window constant. + * + * @since 5.7 + */ +public final class H2ConnectionWindowSizeExample { + + private static final int STREAM_COUNT = 128; + private static final int RESPONSE_SIZE = 2 * 1024 * 1024; + + private static final int STREAM_WINDOW_SIZE = 1024 * 1024; + private static final int SMALL_CONNECTION_WINDOW = 65_535; + private static final int LARGE_CONNECTION_WINDOW = 16 * 1024 * 1024; + + private static final Timeout CONNECT_TIMEOUT = Timeout.ofSeconds(30); + private static final long COMPLETION_TIMEOUT_MINUTES = 5; + + private H2ConnectionWindowSizeExample() { + } + + public static void main(final String[] args) throws Exception { + final String payload = createPayload(RESPONSE_SIZE); + final ServerHandle serverHandle = startServer(payload); + try { + System.out.println("Listening on " + serverHandle.port); + System.out.println("Streams: " + STREAM_COUNT); + System.out.println("Response size per stream: " + RESPONSE_SIZE + " bytes"); + System.out.println("Per-stream initial window: " + STREAM_WINDOW_SIZE + " bytes"); + System.out.println(); + + final RunResult small = runScenario(serverHandle.port, SMALL_CONNECTION_WINDOW); + final RunResult large = runScenario(serverHandle.port, LARGE_CONNECTION_WINDOW); + + printResult("small connection window", small); + printResult("large connection window", large); + } finally { + serverHandle.server.close(CloseMode.GRACEFUL); + serverHandle.server.awaitShutdown(TimeValue.ofSeconds(5)); + } + } + + private static ServerHandle startServer(final String payload) throws Exception { + final H2Config serverH2Config = H2Config.custom() + .setPushEnabled(false) + .setMaxConcurrentStreams(STREAM_COUNT) + .build(); + + final Supplier exchangeHandlerSupplier = AsyncServerPipeline.assemble() + .request(Method.GET) + .consumeContent(contentType -> DiscardingEntityConsumer::new) + .response() + .asString(ContentType.TEXT_PLAIN) + .handle((request, context) -> Message.of( + new BasicHttpResponse(HttpStatus.SC_OK), + payload)) + .supplier(); + + final HttpAsyncServer server = H2ServerBootstrap.bootstrap() + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setH2Config(serverH2Config) + .setRequestRouter(RequestRouter.>builder() + .addRoute(RequestRouter.LOCAL_AUTHORITY, "*", exchangeHandlerSupplier) + .resolveAuthority(RequestRouter.LOCAL_AUTHORITY_RESOLVER) + .build()) + .create(); + + server.start(); + final Future future = server.listen(new InetSocketAddress(0), URIScheme.HTTP); + final ListenerEndpoint listenerEndpoint = future.get(); + final InetSocketAddress address = (InetSocketAddress) listenerEndpoint.getAddress(); + return new ServerHandle(server, address.getPort()); + } + + private static RunResult runScenario(final int port, final int connectionWindowSize) throws Exception { + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setSoTimeout(30, TimeUnit.SECONDS) + .setTcpNoDelay(true) + .build(); + + final H2Config h2Config = H2Config.custom() + .setPushEnabled(false) + .setInitialWindowSize(STREAM_WINDOW_SIZE) + .setConnectionWindowSize(connectionWindowSize) + .setMaxConcurrentStreams(STREAM_COUNT) + .build(); + + final AtomicInteger inputConnFlowControlEvents = new AtomicInteger(); + final AtomicInteger failures = new AtomicInteger(); + + final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setH2Config(h2Config) + .setStreamListener(new H2StreamListener() { + + @Override + public void onHeaderInput( + final HttpConnection connection, + final int streamId, + final List headers) { + } + + @Override + public void onHeaderOutput( + final HttpConnection connection, + final int streamId, + final List headers) { + } + + @Override + public void onFrameInput( + final HttpConnection connection, + final int streamId, + final RawFrame frame) { + } + + @Override + public void onFrameOutput( + final HttpConnection connection, + final int streamId, + final RawFrame frame) { + } + + @Override + public void onInputFlowControl( + final HttpConnection connection, + final int streamId, + final int delta, + final int actualSize) { + if (streamId == 0) { + inputConnFlowControlEvents.incrementAndGet(); + } + } + + @Override + public void onOutputFlowControl( + final HttpConnection connection, + final int streamId, + final int delta, + final int actualSize) { + } + }) + .create(); + + requester.start(); + try { + final HttpHost target = new HttpHost("http", "localhost", port); + final Future future = requester.connect(target, CONNECT_TIMEOUT); + final AsyncClientEndpoint clientEndpoint = future.get(); + + try { + final CountDownLatch latch = new CountDownLatch(STREAM_COUNT); + final long started = System.nanoTime(); + + for (int i = 0; i < STREAM_COUNT; i++) { + final String requestUri = "/bytes/" + i; + clientEndpoint.execute( + AsyncClientPipeline.assemble() + .request() + .get(target, requestUri) + .response() + .consumeContent(contentType -> DiscardingEntityConsumer::new) + .result(new FutureCallback>() { + + @Override + public void completed(final Message message) { + try { + final HttpResponse response = message.head(); + if (response.getCode() != HttpStatus.SC_OK) { + failures.incrementAndGet(); + System.out.println(requestUri + " -> unexpected status: " + response.getCode()); + } + } finally { + latch.countDown(); + } + } + + @Override + public void failed(final Exception ex) { + failures.incrementAndGet(); + ex.printStackTrace(System.out); + latch.countDown(); + } + + @Override + public void cancelled() { + failures.incrementAndGet(); + System.out.println(requestUri + " cancelled"); + latch.countDown(); + } + }) + .create(), + null, + HttpCoreContext.create()); + } + + final boolean completed = latch.await(COMPLETION_TIMEOUT_MINUTES, TimeUnit.MINUTES); + final long elapsedNanos = System.nanoTime() - started; + + if (!completed) { + throw new IllegalStateException("Timed out waiting for responses"); + } + if (failures.get() > 0) { + throw new IllegalStateException("Scenario failed with " + failures.get() + " error(s)"); + } + + return new RunResult( + connectionWindowSize, + elapsedNanos, + inputConnFlowControlEvents.get()); + } finally { + clientEndpoint.releaseAndDiscard(); + } + } finally { + requester.close(CloseMode.GRACEFUL); + } + } + + private static void printResult(final String label, final RunResult result) { + System.out.println(label + ":"); + System.out.println(" connection window size: " + result.connectionWindowSize + " bytes"); + System.out.printf(" elapsed: %.3f s%n", nanosToSeconds(result.elapsedNanos)); + System.out.println(" stream-0 input flow-control events: " + result.inputConnFlowControlEvents); + System.out.println(); + } + + private static double nanosToSeconds(final long nanos) { + return nanos / 1_000_000_000.0; + } + + private static String createPayload(final int size) { + final StringBuilder buffer = new StringBuilder(size); + while (buffer.length() < size) { + buffer.append("0123456789abcdef"); + } + if (buffer.length() > size) { + buffer.setLength(size); + } + return buffer.toString(); + } + + private static final class ServerHandle { + + private final HttpAsyncServer server; + private final int port; + + private ServerHandle(final HttpAsyncServer server, final int port) { + this.server = server; + this.port = port; + } + } + + private static final class RunResult { + + private final int connectionWindowSize; + private final long elapsedNanos; + private final int inputConnFlowControlEvents; + + private RunResult( + final int connectionWindowSize, + final long elapsedNanos, + final int inputConnFlowControlEvents) { + this.connectionWindowSize = connectionWindowSize; + this.elapsedNanos = elapsedNanos; + this.inputConnFlowControlEvents = inputConnFlowControlEvents; + } + } +} \ No newline at end of file diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java index 6c9e88e4f1..fd47cef345 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java @@ -35,6 +35,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.stream.IntStream; @@ -2017,5 +2018,132 @@ void testHeadersWithPrioritySelfDependencyIsStreamProtocolError() throws Excepti .consumeHeader(ArgumentMatchers.anyList(), ArgumentMatchers.anyBoolean()); } + @Test + void testConnectionWindowSizeOnConnect() throws Exception { + final int configuredWindowSize = 1048576; // 1 MB + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, + H2Config.custom() + .setConnectionWindowSize(configuredWindowSize) + .build(), + h2StreamListener, () -> streamHandler); + + mux.onConnect(); + + final List frames = parseFrames(concat(writes)); + final int expectedDelta = configuredWindowSize - 65535; + final boolean found = frames.stream().anyMatch(f -> + f.type == FrameType.WINDOW_UPDATE.getValue() + && f.streamId == 0 + && f.payload.length == 4 + && ((f.payload[0] & 0xff) << 24 + | (f.payload[1] & 0xff) << 16 + | (f.payload[2] & 0xff) << 8 + | (f.payload[3] & 0xff)) == expectedDelta); + Assertions.assertTrue(found, + "onConnect must send WINDOW_UPDATE(stream=0, delta=" + expectedDelta + ")"); + + final Field connInputWindowField = + AbstractH2StreamMultiplexer.class.getDeclaredField("connInputWindow"); + connInputWindowField.setAccessible(true); + final AtomicInteger connInputWindow = (AtomicInteger) connInputWindowField.get(mux); + Assertions.assertEquals(configuredWindowSize, connInputWindow.get()); + } + + @Test + void testDefaultConnectionWindowSizeMaximizesOnConnect() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, H2Config.DEFAULT, + h2StreamListener, () -> streamHandler); + + mux.onConnect(); + + final Field connInputWindowField = + AbstractH2StreamMultiplexer.class.getDeclaredField("connInputWindow"); + connInputWindowField.setAccessible(true); + final AtomicInteger connInputWindow = (AtomicInteger) connInputWindowField.get(mux); + Assertions.assertEquals(Integer.MAX_VALUE, connInputWindow.get(), + "Default config must maximize connection input window"); + } + + @Test + void testConnectionWindowSizeDoesNotAffectSettingsInitialWindowSize() throws Exception { + final int streamWindowSize = 32768; + final int connWindowSize = 1048576; + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, + H2Config.custom() + .setInitialWindowSize(streamWindowSize) + .setConnectionWindowSize(connWindowSize) + .build(), + h2StreamListener, () -> streamHandler); + + mux.onConnect(); + + final List frames = parseFrames(concat(writes)); + final FrameStub settingsFrame = frames.stream() + .filter(f -> f.type == FrameType.SETTINGS.getValue() + && f.streamId == 0 + && !f.isAck()) + .findFirst() + .orElseThrow(() -> new AssertionError("No SETTINGS frame found")); + + final int initialWindowSizeParamCode = 0x4; + int advertisedWindowSize = -1; + for (int i = 0; i + 5 < settingsFrame.payload.length; i += 6) { + final int id = ((settingsFrame.payload[i] & 0xff) << 8) + | (settingsFrame.payload[i + 1] & 0xff); + final int val = ((settingsFrame.payload[i + 2] & 0xff) << 24) + | ((settingsFrame.payload[i + 3] & 0xff) << 16) + | ((settingsFrame.payload[i + 4] & 0xff) << 8) + | (settingsFrame.payload[i + 5] & 0xff); + if (id == initialWindowSizeParamCode) { + advertisedWindowSize = val; + } + } + + Assertions.assertEquals(streamWindowSize, advertisedWindowSize, + "SETTINGS INITIAL_WINDOW_SIZE must reflect initialWindowSize, not connectionWindowSize"); + } + } \ No newline at end of file