diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index 5c4b5b397..820700281 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -35,9 +35,7 @@ import java.util.Deque; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -141,7 +139,6 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } private EndpointDetails endpointDetails; private boolean goAwayReceived; - private final Map priorities = new ConcurrentHashMap<>(); private volatile boolean peerNoRfc7540Priorities; @@ -1020,6 +1017,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PRIORITY_UPDATE payload"); } final int prioritizedId = payload.getInt() & 0x7fffffff; + if (prioritizedId == 0) { + throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "PRIORITY_UPDATE stream id is 0"); + } final int len = payload.remaining(); final String field; if (len > 0) { @@ -1029,9 +1029,13 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio } else { field = ""; } - final PriorityValue pv = PriorityParamsParser.parse(field).toValueWithDefaults(); - priorities.put(prioritizedId, pv); - requestSessionOutput(); + final PriorityValue pv = parsePriorityValue(field); + if (pv != null) { + final H2Stream prioritizedStream = streams.lookup(prioritizedId); + if (prioritizedStream != null) { + prioritizedStream.setPriorityValue(pv); + } + } } break; } @@ -1106,7 +1110,7 @@ private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) thr if (streamListener != null) { streamListener.onHeaderInput(this, streamId, headers); } - recordPriorityFromHeaders(streamId, headers); + recordPriorityFromHeaders(stream, headers); stream.consumeHeader(headers, frame.isFlagSet(FrameFlag.END_STREAM)); } else { continuation.copyPayload(payload); @@ -1125,7 +1129,7 @@ private void consumeContinuationFrame(final RawFrame frame, final H2Stream strea if (streamListener != null) { streamListener.onHeaderInput(this, streamId, headers); } - recordPriorityFromHeaders(streamId, headers); + recordPriorityFromHeaders(stream, headers); if (continuation.type == FrameType.PUSH_PROMISE.getValue()) { stream.consumePromise(headers); } else { @@ -1378,19 +1382,43 @@ H2Stream createStream(final H2StreamChannel channel, final H2StreamHandler strea return stream; } - private void recordPriorityFromHeaders(final int streamId, final List headers) { + private void recordPriorityFromHeaders(final H2Stream stream, final List headers) { if (headers == null || headers.isEmpty()) { return; } for (final Header h : headers) { if (HttpHeaders.PRIORITY.equalsIgnoreCase(h.getName())) { - final PriorityValue pv = PriorityParamsParser.parse(h.getValue()).toValueWithDefaults(); - priorities.put(streamId, pv); + final PriorityValue pv = parsePriorityValue(h); + if (pv != null) { + stream.setPriorityValue(pv); + } break; } } } + private PriorityValue parsePriorityValue(final Header header) { + if (header == null) { + return null; + } + try { + return PriorityParamsParser.parse(header).toValueWithDefaults(); + } catch (final IllegalArgumentException ignore) { + return null; + } + } + + private PriorityValue parsePriorityValue(final String field) { + if (field == null) { + return null; + } + try { + return PriorityParamsParser.parse(field).toValueWithDefaults(); + } catch (final IllegalArgumentException ignore) { + return null; + } + } + class H2StreamChannelImpl implements H2StreamChannel { private final int id; @@ -1438,18 +1466,21 @@ public void submit(final List
headers, final boolean endStream) throws I return; } ensureNotClosed(); - if (peerNoRfc7540Priorities && streams.isSameSide(id)) { + if (peerNoRfc7540Priorities) { for (final Header h : headers) { if (HttpHeaders.PRIORITY.equalsIgnoreCase(h.getName())) { final byte[] ascii = h.getValue() != null ? h.getValue().getBytes(StandardCharsets.US_ASCII) : new byte[0]; + + final int sid = id & 0x7fffffff; final ByteArrayBuffer b = new ByteArrayBuffer(4 + ascii.length); - b.append((byte) (id >> 24)); - b.append((byte) (id >> 16)); - b.append((byte) (id >> 8)); - b.append((byte) id); + b.append((byte) (sid >> 24)); + b.append((byte) (sid >> 16)); + b.append((byte) (sid >> 8)); + b.append((byte) sid); b.append(ascii, 0, ascii.length); + final ByteBuffer pl = ByteBuffer.wrap(b.array(), 0, b.length()); final RawFrame priUpd = new RawFrame(FrameType.PRIORITY_UPDATE.getValue(), 0, 0, pl); commitFrameInternal(priUpd); diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java index c409e352e..404360dce 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java @@ -45,6 +45,7 @@ import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http2.H2Error; import org.apache.hc.core5.http2.H2StreamResetException; +import org.apache.hc.core5.http2.priority.PriorityValue; import org.apache.hc.core5.util.Timeout; class H2Stream implements StreamControl { @@ -64,6 +65,7 @@ class H2Stream implements StreamControl { private volatile long lastActivityNanos; private volatile Timeout idleTimeout; + private volatile PriorityValue priorityValue; H2Stream(final H2StreamChannel channel, final H2StreamHandler handler, final Consumer stateChangeCallback) { this.channel = channel; @@ -313,7 +315,7 @@ public String toString() { buf.append("[") .append("id=").append(channel.getId()) .append(", reserved=").append(reserved) - .append(", removeClosed=").append(remoteClosed) + .append(", remoteClosed=").append(remoteClosed) .append(", localClosed=").append(channel.isLocalClosed()) .append(", localReset=").append(channel.isLocalReset()) .append("]"); @@ -332,4 +334,12 @@ Timeout getIdleTimeout() { return idleTimeout; } -} + PriorityValue getPriorityValue() { + return priorityValue; + } + + void setPriorityValue(final PriorityValue priorityValue) { + this.priorityValue = priorityValue; + } + +} \ No newline at end of file