Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public final class IOReactorConfig {
private final SocketAddress socksProxyAddress;
private final String socksProxyUsername;
private final String socksProxyPassword;
private final int maxCommandsPerSession;

IOReactorConfig(
final TimeValue selectInterval,
Expand All @@ -82,7 +83,8 @@ public final class IOReactorConfig {
final int tcpKeepCount,
final SocketAddress socksProxyAddress,
final String socksProxyUsername,
final String socksProxyPassword) {
final String socksProxyPassword,
final int maxCommandsPerSession) {
super();
this.selectInterval = selectInterval;
this.ioThreadCount = ioThreadCount;
Expand All @@ -101,6 +103,7 @@ public final class IOReactorConfig {
this.socksProxyAddress = socksProxyAddress;
this.socksProxyUsername = socksProxyUsername;
this.socksProxyPassword = socksProxyPassword;
this.maxCommandsPerSession = maxCommandsPerSession;
}

/**
Expand Down Expand Up @@ -240,6 +243,16 @@ public String getSocksProxyPassword() {
return this.socksProxyPassword;
}

/**
* Maximum number of commands that can be enqueued per I/O session.
* A value of {@code 0} means unlimited.
*
* @since 5.5
*/
public int getMaxCommandsPerSession() {
return this.maxCommandsPerSession;
}

public static Builder custom() {
return new Builder();
}
Expand All @@ -262,7 +275,9 @@ public static Builder copy(final IOReactorConfig config) {
.setTcpKeepCount(config.getTcpKeepCount())
.setSocksProxyAddress(config.getSocksProxyAddress())
.setSocksProxyUsername(config.getSocksProxyUsername())
.setSocksProxyPassword(config.getSocksProxyPassword());
.setSocksProxyPassword(config.getSocksProxyPassword())
.setMaxCommandsPerSession(config.getMaxCommandsPerSession());

}

public static class Builder {
Expand Down Expand Up @@ -311,6 +326,7 @@ public static void setDefaultMaxIOThreadCount(final int defaultMaxIOThreadCount)
private SocketAddress socksProxyAddress;
private String socksProxyUsername;
private String socksProxyPassword;
private int maxCommandsPerSession;

Builder() {
this.selectInterval = TimeValue.ofSeconds(1);
Expand All @@ -330,6 +346,7 @@ public static void setDefaultMaxIOThreadCount(final int defaultMaxIOThreadCount)
this.socksProxyAddress = null;
this.socksProxyUsername = null;
this.socksProxyPassword = null;
this.maxCommandsPerSession = 0;
}

/**
Expand Down Expand Up @@ -596,6 +613,17 @@ public Builder setSocksProxyPassword(final String socksProxyPassword) {
return this;
}

/**
* Sets maximum number of commands enqueued per I/O session.
* A value of {@code 0} means unlimited.
*
* @since 5.5
*/
public Builder setMaxCommandsPerSession(final int maxCommandsPerSession) {
this.maxCommandsPerSession = Args.notNegative(maxCommandsPerSession, "Max commands per session");
return this;
}

public IOReactorConfig build() {
return new IOReactorConfig(
selectInterval != null ? selectInterval : TimeValue.ofSeconds(1),
Expand All @@ -608,7 +636,7 @@ public IOReactorConfig build() {
trafficClass,
sndBufSize, rcvBufSize, backlogSize,
tcpKeepIdle, tcpKeepInterval, tcpKeepCount,
socksProxyAddress, socksProxyUsername, socksProxyPassword);
socksProxyAddress, socksProxyUsername, socksProxyPassword, maxCommandsPerSession);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.nio.channels.SocketChannel;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -65,9 +67,11 @@ class IOSessionImpl implements IOSession {
private volatile long lastReadTime;
private volatile long lastWriteTime;
private volatile long lastEventTime;
private final int maxCommandsPerSession;
private final AtomicInteger queuedCommands;

public IOSessionImpl(final String type, final SelectionKey key, final SocketChannel socketChannel,
final Callback<IOSession> sessionClosedCallback) {
final Callback<IOSession> sessionClosedCallback, final int maxCommandsPerSession) {
super();
this.key = Args.notNull(key, "Selection key");
this.channel = Args.notNull(socketChannel, "Socket channel");
Expand All @@ -82,6 +86,8 @@ public IOSessionImpl(final String type, final SelectionKey key, final SocketChan
this.lastReadTime = currentTimeMillis;
this.lastWriteTime = currentTimeMillis;
this.lastEventTime = currentTimeMillis;
this.maxCommandsPerSession = maxCommandsPerSession;
this.queuedCommands = maxCommandsPerSession > 0 ? new AtomicInteger(0) : null;
}

@Override
Expand All @@ -104,17 +110,47 @@ public Lock getLock() {
return lock;
}

private boolean tryIncrementQueuedCommands(final Command command) {
for (;;) {
final int q = queuedCommands.get();
if (q >= maxCommandsPerSession) {
command.cancel();
return false;
}
if (queuedCommands.compareAndSet(q, q + 1)) {
return true;
}
}
}

@Override
public void enqueue(final Command command, final Command.Priority priority) {
if (priority == Command.Priority.IMMEDIATE) {
commandQueue.addFirst(command);
} else {
commandQueue.add(command);
if (command == null) {
return;
}
if (maxCommandsPerSession > 0 && !tryIncrementQueuedCommands(command)) {
throw new RejectedExecutionException("I/O session command queue limit reached (max=" + maxCommandsPerSession + ")");
}
if (isOpen()) {
if (!isOpen()) {
command.cancel();
if (maxCommandsPerSession > 0) {
queuedCommands.decrementAndGet();
}
return;
}
try {
if (priority == Command.Priority.IMMEDIATE) {
commandQueue.addFirst(command);
} else {
commandQueue.add(command);
}
setEvent(SelectionKey.OP_WRITE);
} else {
} catch (final RuntimeException ex) {
command.cancel();
if (maxCommandsPerSession > 0) {
queuedCommands.decrementAndGet();
}
throw ex;
}
}

Expand All @@ -125,7 +161,11 @@ public boolean hasCommands() {

@Override
public Command poll() {
return commandQueue.poll();
final Command command = commandQueue.poll();
if (command != null && maxCommandsPerSession > 0) {
queuedCommands.decrementAndGet();
}
return command;
}

@Override
Expand Down Expand Up @@ -211,7 +251,7 @@ public void setSocketTimeout(final Timeout timeout) {

@Override
public int read(final ByteBuffer dst) throws IOException {
return this.channel.read(dst);
return this.channel.read(dst);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ private void processPendingChannels() throws IOException {
} catch (final ClosedChannelException ex) {
return;
}
final IOSessionImpl ioSession = new IOSessionImpl("a", key, socketChannel, closedSessions::add);
final IOSessionImpl ioSession = new IOSessionImpl("a", key, socketChannel, closedSessions::add, reactorConfig.getMaxCommandsPerSession());
final InternalDataChannel dataChannel = new InternalDataChannel(
ioSession,
null,
Expand Down Expand Up @@ -391,7 +391,7 @@ private void processConnectionRequest(final SocketChannel socketChannel, final I
validateAddress(remoteAddress);
final boolean connected = socketChannel.connect(remoteAddress);
final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
final IOSessionImpl ioSession = new IOSessionImpl("c", key, socketChannel, closedSessions::add);
final IOSessionImpl ioSession = new IOSessionImpl("c", key, socketChannel, closedSessions::add, reactorConfig.getMaxCommandsPerSession());
final InternalDataChannel dataChannel = new InternalDataChannel(
ioSession,
sessionRequest.remoteEndpoint,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/

package org.apache.hc.core5.http.examples;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.util.Timeout;

public class AsyncPipelinedRequestExecutionWithPerSessionCapExample {

public static void main(final String[] args) throws Exception {

final int maxCommandsPerSession = 2;

final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setSoTimeout(5, TimeUnit.SECONDS)
.setMaxCommandsPerSession(maxCommandsPerSession)
.build();

final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
.setIOReactorConfig(ioReactorConfig)
.create();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("HTTP requester shutting down");
requester.close(CloseMode.GRACEFUL);
}));

requester.start();

final HttpHost target = new HttpHost("httpbin.org");
final String[] requestUris = new String[] {"/delay/3?i=0", "/delay/3?i=1", "/delay/3?i=2", "/delay/3?i=3"};

final Future<AsyncClientEndpoint> future = requester.connect(target, Timeout.ofSeconds(5));
final AsyncClientEndpoint clientEndpoint = future.get();

final CountDownLatch latch = new CountDownLatch(requestUris.length);

for (final String requestUri: requestUris) {
try {
clientEndpoint.execute(
AsyncRequestBuilder.get()
.setHttpHost(target)
.setPath(requestUri)
.build(),
new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
new FutureCallback<Message<HttpResponse, String>>() {

@Override
public void completed(final Message<HttpResponse, String> message) {
latch.countDown();
final HttpResponse response = message.getHead();
System.out.println(requestUri + "->" + response.getCode());
}

@Override
public void failed(final Exception ex) {
latch.countDown();
System.out.println(requestUri + "->" + ex);
}

@Override
public void cancelled() {
latch.countDown();
System.out.println(requestUri + " cancelled");
}

});
} catch (final RejectedExecutionException ex) {
latch.countDown();
System.out.println(requestUri + "-> rejected: " + ex.getMessage());
}
}

latch.await();

clientEndpoint.releaseAndDiscard();

System.out.println("Shutting down I/O reactor");
requester.initiateShutdown();
}

}
Loading
Loading