Skip to content

Commit 7897698

Browse files
committed
Add SSE perf client , minimal test server and H2 examples
1 parent d07c8e2 commit 7897698

File tree

3 files changed

+752
-0
lines changed

3 files changed

+752
-0
lines changed
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.client5.http.sse.example;
28+
29+
import java.net.URI;
30+
import java.util.HashMap;
31+
import java.util.Locale;
32+
import java.util.Map;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.Executor;
35+
import java.util.concurrent.Future;
36+
import java.util.concurrent.ScheduledThreadPoolExecutor;
37+
import java.util.concurrent.TimeUnit;
38+
import java.util.concurrent.atomic.AtomicInteger;
39+
40+
import javax.net.ssl.SSLSession;
41+
42+
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
43+
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
44+
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
45+
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
46+
import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
47+
import org.apache.hc.client5.http.config.TlsConfig;
48+
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
49+
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
50+
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
51+
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
52+
import org.apache.hc.client5.http.protocol.HttpClientContext;
53+
import org.apache.hc.client5.http.sse.EventSource;
54+
import org.apache.hc.client5.http.sse.EventSourceConfig;
55+
import org.apache.hc.client5.http.sse.EventSourceListener;
56+
import org.apache.hc.client5.http.sse.SseExecutor;
57+
import org.apache.hc.client5.http.sse.impl.ExponentialJitterBackoff;
58+
import org.apache.hc.client5.http.sse.impl.SseParser;
59+
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
60+
import org.apache.hc.core5.http.HttpHost;
61+
import org.apache.hc.core5.http.HttpVersion;
62+
import org.apache.hc.core5.http.message.StatusLine;
63+
import org.apache.hc.core5.http2.HttpVersionPolicy;
64+
import org.apache.hc.core5.http2.config.H2Config;
65+
import org.apache.hc.core5.reactor.IOReactorConfig;
66+
import org.apache.hc.core5.util.TimeValue;
67+
68+
/**
69+
* HTTP/2 SSE demo.
70+
* <p>
71+
* This example connects to an SSE endpoint using the async transport with HTTP/2 forced via TLS + ALPN,
72+
* probes the negotiated protocol, and then opens multiple SSE subscriptions concurrently to demonstrate
73+
* HTTP/2 stream multiplexing.
74+
* </p>
75+
*
76+
* <h2>Usage</h2>
77+
* <pre>
78+
* ClientSseH2Example [uri] [streamCount] [maxEventsPerStream]
79+
* </pre>
80+
*
81+
* <p>
82+
* Defaults:
83+
* </p>
84+
* <ul>
85+
* <li>{@code uri=https://stream.wikimedia.org/v2/stream/recentchange}</li>
86+
* <li>{@code streamCount=4}</li>
87+
* <li>{@code maxEventsPerStream=25}</li>
88+
* </ul>
89+
*
90+
* <h2>Notes</h2>
91+
* <ul>
92+
* <li>HTTP/2 is enforced with {@link org.apache.hc.core5.http2.HttpVersionPolicy#FORCE_HTTP_2}. If the origin
93+
* cannot negotiate H2, the connection attempt fails (no silent downgrade).</li>
94+
* <li>The probe request prints {@code HTTP/2.0} when ALPN negotiation succeeds.</li>
95+
* <li>With {@code maxConnPerRoute=1}, multiple active SSE subscriptions are only possible with HTTP/2
96+
* multiplexing (each subscription is a separate H2 stream).</li>
97+
* </ul>
98+
*/
99+
public final class ClientSseH2Example {
100+
101+
private static void probeProtocol(final CloseableHttpAsyncClient httpClient, final URI sseUri) throws Exception {
102+
final HttpHost target = new HttpHost(sseUri.getScheme(), sseUri.getHost(), sseUri.getPort());
103+
final HttpClientContext ctx = HttpClientContext.create();
104+
105+
final SimpleHttpRequest req = SimpleRequestBuilder.get()
106+
.setHttpHost(target)
107+
.setPath("/")
108+
.build();
109+
110+
final Future<SimpleHttpResponse> f = httpClient.execute(
111+
SimpleRequestProducer.create(req),
112+
SimpleResponseConsumer.create(),
113+
ctx,
114+
null);
115+
116+
final SimpleHttpResponse resp = f.get(10, TimeUnit.SECONDS);
117+
118+
System.out.println("[probe] " + req + " -> " + new StatusLine(resp));
119+
System.out.println("[probe] negotiated protocol: " + ctx.getProtocolVersion());
120+
121+
final SSLSession sslSession = ctx.getSSLSession();
122+
if (sslSession != null) {
123+
System.out.println("[probe] TLS protocol: " + sslSession.getProtocol());
124+
System.out.println("[probe] TLS cipher: " + sslSession.getCipherSuite());
125+
}
126+
127+
if (!HttpVersion.HTTP_2.equals(ctx.getProtocolVersion())) {
128+
System.out.println("[probe] WARNING: not HTTP/2 (server / proxy downgraded?)");
129+
}
130+
}
131+
132+
public static void main(final String[] args) throws Exception {
133+
final URI uri = URI.create(args.length > 0
134+
? args[0]
135+
: "https://stream.wikimedia.org/v2/stream/recentchange");
136+
137+
final int streamCount = args.length > 1 ? Integer.parseInt(args[1]) : 4;
138+
final int maxEventsPerStream = args.length > 2 ? Integer.parseInt(args[2]) : 25;
139+
140+
final IOReactorConfig ioCfg = IOReactorConfig.custom()
141+
.setIoThreadCount(Math.max(2, Runtime.getRuntime().availableProcessors()))
142+
.setSoKeepAlive(true)
143+
.setTcpNoDelay(true)
144+
.build();
145+
146+
final PoolingAsyncClientConnectionManager connMgr =
147+
PoolingAsyncClientConnectionManagerBuilder.create()
148+
.useSystemProperties()
149+
.setMessageMultiplexing(true)
150+
.setMaxConnPerRoute(1)
151+
.setMaxConnTotal(4)
152+
.setDefaultTlsConfig(TlsConfig.custom()
153+
.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
154+
.build())
155+
.build();
156+
157+
final CloseableHttpAsyncClient httpClient = HttpAsyncClientBuilder.create()
158+
.setIOReactorConfig(ioCfg)
159+
.setConnectionManager(connMgr)
160+
.setH2Config(H2Config.custom()
161+
.setPushEnabled(false)
162+
.setMaxConcurrentStreams(Math.max(64, streamCount * 8))
163+
.build())
164+
.useSystemProperties()
165+
.evictExpiredConnections()
166+
.evictIdleConnections(TimeValue.ofMinutes(1))
167+
.build();
168+
169+
httpClient.start();
170+
probeProtocol(httpClient, uri);
171+
172+
final ScheduledThreadPoolExecutor scheduler =
173+
new ScheduledThreadPoolExecutor(2, new DefaultThreadFactory("sse-backoff", true));
174+
scheduler.setRemoveOnCancelPolicy(true);
175+
176+
final Executor callbacks = Runnable::run;
177+
178+
final EventSourceConfig cfg = EventSourceConfig.builder()
179+
.backoff(new ExponentialJitterBackoff(500L, 30_000L, 2.0, 250L))
180+
.maxReconnects(-1)
181+
.build();
182+
183+
final Map<String, String> defaultHeaders = new HashMap<>();
184+
defaultHeaders.put("User-Agent", "Apache-HttpClient-SSE/5.x");
185+
defaultHeaders.put("Accept-Language", "en");
186+
187+
final SseExecutor exec = SseExecutor.custom()
188+
.setHttpClient(httpClient)
189+
.setScheduler(scheduler)
190+
.setCallbackExecutor(callbacks)
191+
.setEventSourceConfig(cfg)
192+
.setDefaultHeaders(defaultHeaders)
193+
.setParserStrategy(SseParser.BYTE)
194+
.build();
195+
196+
final CountDownLatch done = new CountDownLatch(streamCount);
197+
final EventSource[] sources = new EventSource[streamCount];
198+
199+
for (int i = 0; i < streamCount; i++) {
200+
final int idx = i;
201+
final AtomicInteger count = new AtomicInteger(0);
202+
203+
final Map<String, String> headers = new HashMap<>();
204+
headers.put("X-Client-Stream", Integer.toString(idx));
205+
206+
final EventSourceListener listener = new EventSourceListener() {
207+
208+
@Override
209+
public void onOpen() {
210+
System.out.printf(Locale.ROOT, "[SSE/%d] open: %s%n", idx, uri);
211+
}
212+
213+
@Override
214+
public void onEvent(final String id, final String type, final String data) {
215+
final int n = count.incrementAndGet();
216+
final String shortData = data.length() > 120 ? data.substring(0, 120) + "…" : data;
217+
System.out.printf(Locale.ROOT, "[SSE/%d] #%d %s id=%s %s%n",
218+
idx, n, type != null ? type : "message", id, shortData);
219+
220+
if (n >= maxEventsPerStream) {
221+
sources[idx].cancel();
222+
}
223+
}
224+
225+
@Override
226+
public void onClosed() {
227+
System.out.printf(Locale.ROOT, "[SSE/%d] closed%n", idx);
228+
done.countDown();
229+
}
230+
231+
@Override
232+
public void onFailure(final Throwable t, final boolean willReconnect) {
233+
System.err.printf(Locale.ROOT, "[SSE/%d] failure: %s willReconnect=%s%n",
234+
idx, t, willReconnect);
235+
if (!willReconnect) {
236+
done.countDown();
237+
}
238+
}
239+
};
240+
241+
sources[i] = exec.open(uri, headers, listener, cfg, SseParser.BYTE, scheduler, callbacks);
242+
}
243+
244+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
245+
for (final EventSource es : sources) {
246+
if (es != null) {
247+
es.cancel();
248+
}
249+
}
250+
try {
251+
exec.close();
252+
} catch (final Exception ignore) {
253+
}
254+
scheduler.shutdownNow();
255+
}, "sse-shutdown"));
256+
257+
for (final EventSource es : sources) {
258+
es.start();
259+
}
260+
261+
done.await();
262+
263+
for (final EventSource es : sources) {
264+
es.cancel();
265+
}
266+
exec.close();
267+
scheduler.shutdownNow();
268+
}
269+
}

0 commit comments

Comments
 (0)