Skip to content

Commit 8cb2993

Browse files
authored
GH-10624: PartitionedChannel: customize worker queue size (#10647)
Fixes: #10624 * Expose `workerQueueSize` from the `PartitionedDispatcher` and respective delegates from the `PartitionedChannel` and `PartitionedChannelSpec` **Auto-cherry-pick to `6.5.x` & `6.4.x`**
1 parent 45a519d commit 8cb2993

File tree

4 files changed

+80
-5
lines changed

4 files changed

+80
-5
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
* The {@code partitionKeyFunction} is used to determine to which partition the message
3737
* has to be dispatched.
3838
* By default, the {@link IntegrationMessageHeaderAccessor#CORRELATION_ID} message header is used
39-
* for partition key.
39+
* for a partition key.
4040
* <p>
4141
* The actual dispatching and threading logic is implemented in the {@link PartitionedDispatcher}.
4242
* <p>
@@ -71,7 +71,7 @@ public PartitionedChannel(int partitionCount) {
7171
}
7272

7373
/**
74-
* Instantiate based on a provided number of partitions and function for partition key against
74+
* Instantiate based on a provided number of partitions and function for a partition key against
7575
* the message.
7676
* @param partitionCount the number of partitions in this channel.
7777
* @param partitionKeyFunction the function to resolve a partition key against the message
@@ -123,6 +123,16 @@ public void setLoadBalancingStrategy(@Nullable LoadBalancingStrategy loadBalanci
123123
getDispatcher().setLoadBalancingStrategy(loadBalancingStrategy);
124124
}
125125

126+
/**
127+
* Provide a size of the queue in the partition executor's worker.
128+
* Default to zero.
129+
* @param workerQueueSize the size of the partition executor's worker queue.
130+
* @since 6.4.10
131+
*/
132+
public void setWorkerQueueSize(int workerQueueSize) {
133+
getDispatcher().setWorkerQueueSize(workerQueueSize);
134+
}
135+
126136
@Override
127137
protected PartitionedDispatcher getDispatcher() {
128138
return (PartitionedDispatcher) this.dispatcher;

spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,22 @@
1919
import java.util.ArrayList;
2020
import java.util.List;
2121
import java.util.Set;
22+
import java.util.concurrent.BlockingQueue;
2223
import java.util.concurrent.Executor;
2324
import java.util.concurrent.ExecutorService;
24-
import java.util.concurrent.Executors;
25+
import java.util.concurrent.LinkedBlockingQueue;
26+
import java.util.concurrent.SynchronousQueue;
2527
import java.util.concurrent.ThreadFactory;
28+
import java.util.concurrent.ThreadPoolExecutor;
29+
import java.util.concurrent.TimeUnit;
2630
import java.util.concurrent.locks.Lock;
2731
import java.util.concurrent.locks.ReentrantLock;
2832
import java.util.function.Function;
2933
import java.util.function.Predicate;
3034

3135
import org.jspecify.annotations.Nullable;
3236

37+
import org.springframework.integration.util.CallerBlocksPolicy;
3338
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
3439
import org.springframework.messaging.Message;
3540
import org.springframework.messaging.MessageHandler;
@@ -78,6 +83,8 @@ public class PartitionedDispatcher extends AbstractDispatcher {
7883

7984
private final Lock lock = new ReentrantLock();
8085

86+
private int workerQueueSize;
87+
8188
/**
8289
* Instantiate based on a provided number of partitions and function for a partition key against
8390
* the message to dispatch.
@@ -153,6 +160,17 @@ public void setMessageHandlingTaskDecorator(MessageHandlingTaskDecorator message
153160
this.messageHandlingTaskDecorator = messageHandlingTaskDecorator;
154161
}
155162

163+
/**
164+
* Provide a size of the queue in the partition executor's worker.
165+
* Default to zero.
166+
* @param workerQueueSize the size of the partition executor's worker queue.
167+
* @since 6.4.10
168+
*/
169+
public void setWorkerQueueSize(int workerQueueSize) {
170+
Assert.isTrue(workerQueueSize >= 0, "'workerQueueSize' must be greater than or equal to 0.");
171+
this.workerQueueSize = workerQueueSize;
172+
}
173+
156174
/**
157175
* Shutdown this dispatcher on application close.
158176
* The partition executors are shutdown and the internal state of this instance is cleared.
@@ -188,7 +206,16 @@ private void populatedPartitions() {
188206
}
189207

190208
private UnicastingDispatcher newPartition() {
191-
ExecutorService executor = Executors.newSingleThreadExecutor(this.threadFactory);
209+
BlockingQueue<Runnable> workQueue =
210+
this.workerQueueSize == 0
211+
? new SynchronousQueue<>()
212+
: new LinkedBlockingQueue<>(this.workerQueueSize);
213+
ExecutorService executor =
214+
new ThreadPoolExecutor(1, 1,
215+
0L, TimeUnit.MILLISECONDS,
216+
workQueue,
217+
this.threadFactory,
218+
new CallerBlocksPolicy(Long.MAX_VALUE));
192219
this.executors.add(executor);
193220

194221
Executor effectiveExecutor = this.errorHandler != null

spring-integration-core/src/main/java/org/springframework/integration/dsl/PartitionedChannelSpec.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public class PartitionedChannelSpec extends LoadBalancingChannelSpec<Partitioned
3939

4040
private @Nullable ThreadFactory threadFactory;
4141

42+
private int workerQueueSize;
43+
4244
protected PartitionedChannelSpec(int partitionCount) {
4345
this.partitionCount = partitionCount;
4446
}
@@ -53,6 +55,18 @@ public PartitionedChannelSpec threadFactory(ThreadFactory threadFactory) {
5355
return this;
5456
}
5557

58+
/**
59+
* Provide a size of the queue in the partition executor's worker.
60+
* Default to zero.
61+
* @param workerQueueSize the size of the partition executor's worker queue.
62+
* @return the spec.
63+
* @since 6.4.10
64+
*/
65+
public PartitionedChannelSpec workerQueueSize(int workerQueueSize) {
66+
this.workerQueueSize = workerQueueSize;
67+
return this;
68+
}
69+
5670
@Override
5771
protected PartitionedChannel doGet() {
5872
if (this.partitionKeyFunction != null) {
@@ -62,6 +76,7 @@ protected PartitionedChannel doGet() {
6276
this.channel = new PartitionedChannel(this.partitionCount);
6377
}
6478
this.channel.setLoadBalancingStrategy(this.loadBalancingStrategy);
79+
this.channel.setWorkerQueueSize(this.workerQueueSize);
6580
if (this.failoverStrategy != null) {
6681
this.channel.setFailoverStrategy(this.failoverStrategy);
6782
}

spring-integration-core/src/test/java/org/springframework/integration/channel/PartitionedChannelTests.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,13 @@
1818

1919
import java.util.Collection;
2020
import java.util.HashSet;
21+
import java.util.List;
2122
import java.util.Set;
23+
import java.util.concurrent.BlockingQueue;
2224
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.LinkedBlockingQueue;
26+
import java.util.concurrent.SynchronousQueue;
27+
import java.util.concurrent.ThreadPoolExecutor;
2328
import java.util.concurrent.TimeUnit;
2429
import java.util.concurrent.locks.Lock;
2530
import java.util.concurrent.locks.ReentrantLock;
@@ -36,6 +41,7 @@
3641
import org.springframework.integration.config.EnableIntegration;
3742
import org.springframework.integration.dsl.IntegrationFlow;
3843
import org.springframework.integration.support.MessageBuilder;
44+
import org.springframework.integration.test.util.TestUtils;
3945
import org.springframework.messaging.Message;
4046
import org.springframework.messaging.MessageChannel;
4147
import org.springframework.messaging.MessageHandler;
@@ -48,6 +54,7 @@
4854
import org.springframework.util.MultiValueMap;
4955

5056
import static org.assertj.core.api.Assertions.assertThat;
57+
import static org.assertj.core.api.InstanceOfAssertFactories.type;
5158
import static org.mockito.Mockito.mock;
5259

5360
/**
@@ -128,6 +135,11 @@ public void afterMessageHandled(Message<?> message, MessageChannel ch, MessageHa
128135
String partitionForLastMessage = partitionedMessages.keySet().iterator().next();
129136
assertThat(partitionForLastMessage).isIn(allocatedPartitions);
130137

138+
List<?> partitionExecutors = TestUtils.getPropertyValue(partitionedChannel, "dispatcher.executors", List.class);
139+
BlockingQueue<?> workQueue = ((ThreadPoolExecutor) partitionExecutors.get(0)).getQueue();
140+
141+
assertThat(workQueue).isInstanceOf(SynchronousQueue.class);
142+
131143
partitionedChannel.destroy();
132144
}
133145

@@ -138,6 +150,9 @@ public void afterMessageHandled(Message<?> message, MessageChannel ch, MessageHa
138150
@Autowired
139151
PollableChannel resultChannel;
140152

153+
@Autowired
154+
PartitionedChannel testChannel;
155+
141156
@Test
142157
void messagesArePartitionedByCorrelationId() {
143158
this.inputChannel.send(new GenericMessage<>(IntStream.range(0, 5).toArray()));
@@ -153,6 +168,14 @@ void messagesArePartitionedByCorrelationId() {
153168
Set<String> strings = new HashSet<>((Collection<? extends String>) receive.getPayload());
154169
assertThat(strings).hasSize(1)
155170
.allMatch(value -> value.startsWith("testChannel-partition-thread-"));
171+
172+
List<?> partitionExecutors = TestUtils.getPropertyValue(this.testChannel, "dispatcher.executors", List.class);
173+
BlockingQueue<?> workQueue = ((ThreadPoolExecutor) partitionExecutors.get(0)).getQueue();
174+
175+
assertThat(workQueue)
176+
.asInstanceOf(type(LinkedBlockingQueue.class))
177+
.extracting(LinkedBlockingQueue::remainingCapacity)
178+
.isEqualTo(1);
156179
}
157180

158181
@Configuration
@@ -163,7 +186,7 @@ public static class TestConfiguration {
163186
IntegrationFlow someFlow() {
164187
return f -> f
165188
.split()
166-
.channel(c -> c.partitioned("testChannel", 10))
189+
.channel(c -> c.partitioned("testChannel", 10).workerQueueSize(1))
167190
.transform(p -> Thread.currentThread().getName())
168191
.aggregate()
169192
.channel(c -> c.queue("resultChannel"));

0 commit comments

Comments
 (0)