Skip to content

Commit e04c76d

Browse files
committed
Allow BatchLoaderScheduler process all CF completions as a batch and potentially in parallel
1 parent 8eadcb2 commit e04c76d

6 files changed

Lines changed: 102 additions & 28 deletions

File tree

README.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -655,15 +655,17 @@ Perhaps you want this completion to happen more asynchronously so that the `.dis
655655
quickly and is not bound to the `.doSomethingSlow(v)` call.
656656

657657
By default, the dispatch completion is done on the current thread in a synchronous manner, which will include
658-
any extra `CompletableFuture` chained methods.
658+
any extra `CompletableFuture` chained methods. If multiple keys were loaded, the dispatch completion is done
659+
sequentially.
659660

660661
This is an example of running the completion step in an asynchronous manner :
661662

662663
```java
663664

664665
@Override
665-
public <K> CompletionStage<Void> scheduleCompletion(Runnable completeValuesRunnable, List<K> keys, BatchLoaderEnvironment environment) {
666-
return CompletableFuture.runAsync(completeValuesRunnable);
666+
public <K> CompletionStage<Void> scheduleCompletion(List<Runnable> completeValueRunnables, List<K> keys, BatchLoaderEnvironment environment) {
667+
return CompletableFuture.allOf(completeValueRunnables.stream()
668+
.map(CompletableFuture::runAsync).toArray(CompletableFuture[]::new));
667669
}
668670
```
669671

src/main/java/org/dataloader/DataLoaderHelper.java

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.time.Instant;
2020
import java.util.ArrayList;
2121
import java.util.Arrays;
22+
import java.util.Collection;
2223
import java.util.LinkedHashSet;
2324
import java.util.List;
2425
import java.util.Map;
@@ -27,6 +28,7 @@
2728
import java.util.concurrent.CompletableFuture;
2829
import java.util.concurrent.CompletionException;
2930
import java.util.concurrent.CompletionStage;
31+
import java.util.concurrent.ConcurrentLinkedQueue;
3032
import java.util.concurrent.atomic.AtomicReference;
3133

3234
import static java.util.Collections.emptyList;
@@ -329,17 +331,18 @@ private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Object>
329331
return CompletableFutureKit.success(values);
330332
}
331333

332-
Runnable completeValuesRunnable = () -> {
333-
List<K> clearCacheKeys = new ArrayList<>();
334-
for (int idx = 0; idx < queuedFutures.size(); idx++) {
335-
K key = keys.get(idx);
336-
V value = values.get(idx);
337-
Object callContext = keyContexts.get(idx);
338-
CompletableFuture<V> future = queuedFutures.get(idx);
334+
Collection<K> clearCacheKeys = new ConcurrentLinkedQueue<>();
335+
List<Runnable> completeValueRunnables = new ArrayList<>();
336+
for (int idx = 0; idx < queuedFutures.size(); idx++) {
337+
K key = keys.get(idx);
338+
V value = values.get(idx);
339+
Object callContext = keyContexts.get(idx);
340+
CompletableFuture<V> future = queuedFutures.get(idx);
341+
Runnable completeValueRunnable = () -> {
339342
if (value instanceof Throwable) {
340343
stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext));
341344
future.completeExceptionally((Throwable) value);
342-
clearCacheKeys.add(keys.get(idx));
345+
clearCacheKeys.add(key);
343346
} else if (value instanceof Try) {
344347
// we allow the batch loader to return a Try so we can better represent a computation
345348
// that might have worked or not.
@@ -349,16 +352,18 @@ private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Object>
349352
} else {
350353
stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext));
351354
future.completeExceptionally(tryValue.getThrowable());
352-
clearCacheKeys.add(keys.get(idx));
355+
clearCacheKeys.add(key);
353356
}
354357
} else {
355358
future.complete(value);
356359
}
357-
}
360+
};
361+
completeValueRunnables.add(completeValueRunnable);
362+
}
363+
return scheduleCompletion(environment, keys, values, completeValueRunnables).thenApply(ignored -> {
358364
possiblyClearCacheEntriesOnExceptions(clearCacheKeys);
359-
};
360-
361-
return scheduleCompletion(environment, keys, values, completeValuesRunnable);
365+
return values;
366+
});
362367
}).exceptionally(ex -> {
363368
stats.incrementBatchLoadExceptionCount(new IncrementBatchLoadExceptionCountStatisticsContext<>(keys, keyContexts));
364369
if (ex instanceof CompletionException) {
@@ -375,14 +380,14 @@ private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Object>
375380
});
376381
}
377382

378-
private CompletableFuture<List<V>> scheduleCompletion(BatchLoaderEnvironment environment, List<K> keys, List<V> values, Runnable completeValuesRunnable) {
383+
private CompletableFuture<List<V>> scheduleCompletion(BatchLoaderEnvironment environment, List<K> keys, List<V> values, List<Runnable> completeValueRunnables) {
379384
BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler();
380385
CompletionStage<?> scheduledCompletion;
381386
if (batchLoaderScheduler != null) {
382387
scheduledCompletion = batchLoaderScheduler
383-
.scheduleCompletion(completeValuesRunnable, keys, environment);
388+
.scheduleCompletion(completeValueRunnables, keys, environment);
384389
} else {
385-
scheduledCompletion = CompletableFutureKit.run(completeValuesRunnable);
390+
scheduledCompletion = CompletableFutureKit.runAll(completeValueRunnables);
386391
}
387392
return scheduledCompletion
388393
.thenApply(ignored -> values)
@@ -400,7 +405,7 @@ private void assertResultSize(List<K> keys, List<V> values) {
400405
assertState(keys.size() == values.size(), () -> "The size of the promised values MUST be the same size as the key list");
401406
}
402407

403-
private void possiblyClearCacheEntriesOnExceptions(List<K> keys) {
408+
private void possiblyClearCacheEntriesOnExceptions(Collection<K> keys) {
404409
if (keys.isEmpty()) {
405410
return;
406411
}

src/main/java/org/dataloader/impl/CompletableFutureKit.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ public static <T> CompletableFuture<List<T>> allOfFlatMap(List<CompletableFuture
8484
.collect(toList()));
8585
}
8686

87+
/**
88+
* Runs the given {@link Runnable} synchronously on the current thread, returning a
89+
* {@link CompletableFuture} that is completed normally or exceptionally based on the outcome.
90+
*
91+
* @param runnable the task to execute
92+
* @return a completed future, or a failed future if the runnable throws
93+
*/
8794
public static CompletableFuture<Void> run(Runnable runnable) {
8895
try {
8996
runnable.run();
@@ -92,4 +99,16 @@ public static CompletableFuture<Void> run(Runnable runnable) {
9299
return CompletableFutureKit.failedFuture(e);
93100
}
94101
}
102+
103+
/**
104+
* Runs all the {@link Runnable} from the list synchronously on the current thread, returning a
105+
* {@link CompletableFuture} that is completed normally or exceptionally based on the outcome.
106+
*
107+
* @param runnables the list of tasks to execute
108+
* @return a completed future, or a failed future if any of the tasks throws
109+
*/
110+
public static CompletableFuture<Void> runAll(List<Runnable> runnables) {
111+
return CompletableFuture.allOf(runnables.stream().map(CompletableFutureKit::run).toArray(CompletableFuture[]::new));
112+
}
113+
95114
}

src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,15 +121,15 @@ interface ScheduledBatchPublisherCall {
121121
* By default, the dispatch completion is done on the current thread in a synchronous manner, which will include
122122
* any extra {@link java.util.concurrent.CompletableFuture} dependent chained methods.
123123
*
124-
* @param completeValuesRunnable this is the runnable that the {@link DataLoader} engine code needs to be run
124+
* @param completeValueRunnables these are the runnable tasks that the {@link DataLoader} engine code needs to be run
125125
* @param keys this is the list of keys that will be passed to the {@link BatchPublisher}.
126126
* This is provided only for informative reasons and, you can't change the keys that are used
127127
* @param environment this is the {@link BatchLoaderEnvironment} in place,
128128
*
129129
* @return a {@link CompletionStage} representing this work is being scheduled
130130
*/
131-
default <K> CompletionStage<Void> scheduleCompletion(Runnable completeValuesRunnable, List<K> keys, BatchLoaderEnvironment environment) {
132-
return CompletableFutureKit.run(completeValuesRunnable);
131+
default <K> CompletionStage<Void> scheduleCompletion(List<Runnable> completeValueRunnables, List<K> keys, BatchLoaderEnvironment environment) {
132+
return CompletableFutureKit.runAll(completeValueRunnables);
133133
}
134134

135135
}

src/test/java/org/dataloader/impl/CompletableFutureKitTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,4 +99,31 @@ void run() {
9999

100100

101101
}
102-
}
102+
103+
@Test
104+
void runAll() {
105+
AtomicBoolean ran1 = new AtomicBoolean(false);
106+
AtomicBoolean ran2 = new AtomicBoolean(false);
107+
Runnable runnable1 = () -> ran1.set(true);
108+
Runnable runnable2 = () -> ran2.set(true);
109+
110+
CompletableFuture<?> runCF = CompletableFutureKit.runAll(List.of(runnable1, runnable2));
111+
runCF.join();
112+
assertThat(ran1.get(), equalTo(true));
113+
assertThat(ran2.get(), equalTo(true));
114+
115+
ran1.set(false);
116+
ran2.set(false);
117+
Runnable runnable3 = () -> {
118+
throw new RuntimeException("BANG");
119+
};
120+
121+
runCF = CompletableFutureKit.runAll(List.of(runnable1, runnable2, runnable3));
122+
123+
CompletionException completionException = assertThrows(CompletionException.class, runCF::join);
124+
assertThat(ran1.get(), equalTo(true));
125+
assertThat(ran2.get(), equalTo(true));
126+
assertThat(completionException.getCause().getMessage(), equalTo("BANG"));
127+
}
128+
129+
}

src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import org.dataloader.BatchLoaderEnvironment;
44
import org.dataloader.DataLoader;
55
import org.dataloader.DataLoaderOptions;
6+
import org.dataloader.impl.CompletableFutureKit;
67
import org.junit.jupiter.api.Test;
78

89
import java.util.List;
@@ -11,6 +12,7 @@
1112
import java.util.concurrent.CompletionStage;
1213
import java.util.concurrent.atomic.AtomicBoolean;
1314
import java.util.function.Function;
15+
import java.util.stream.Collectors;
1416

1517
import static org.awaitility.Awaitility.await;
1618
import static org.dataloader.DataLoaderFactory.newDataLoader;
@@ -188,6 +190,7 @@ public <K> void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall
188190
void can_schedule_cf_completion() {
189191

190192
AtomicBoolean useThreading = new AtomicBoolean(false);
193+
AtomicBoolean parallelCompletion = new AtomicBoolean(false);
191194
BatchLoaderScheduler scheduler = new BatchLoaderScheduler() {
192195
@Override
193196
public <K, V> CompletionStage<List<V>> scheduleBatchLoader(ScheduledBatchLoaderCall<V> scheduledCall, List<K> keys, BatchLoaderEnvironment environment) {
@@ -205,12 +208,17 @@ public <K> void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall
205208
}
206209

207210
@Override
208-
public <K> CompletionStage<Void> scheduleCompletion(Runnable completeValuesRunnable, List<K> keys, BatchLoaderEnvironment environment) {
211+
public <K> CompletionStage<Void> scheduleCompletion(List<Runnable> completeValueRunnables, List<K> keys, BatchLoaderEnvironment environment) {
209212
if (useThreading.get()) {
210213
snooze(500);
211-
return CompletableFuture.runAsync(completeValuesRunnable);
214+
if (!parallelCompletion.get()) {
215+
return CompletableFutureKit.runAll(completeValueRunnables);
216+
} else {
217+
return CompletableFuture.allOf(completeValueRunnables.stream()
218+
.map(CompletableFuture::runAsync).toArray(CompletableFuture[]::new));
219+
}
212220
} else {
213-
return BatchLoaderScheduler.super.scheduleCompletion(completeValuesRunnable, keys, environment);
221+
return BatchLoaderScheduler.super.scheduleCompletion(completeValueRunnables, keys, environment);
214222
}
215223
}
216224
};
@@ -227,7 +235,20 @@ public <K> CompletionStage<Void> scheduleCompletion(Runnable completeValuesRunna
227235
assertThat(cf1.join(), equalTo(1));
228236
assertThat(cf2.join(), equalTo(2));
229237

230-
// switch mode to threading mdoe
238+
// switch mode to threading mode
239+
240+
useThreading.set(true);
241+
242+
cf1 = identityLoader.load(10);
243+
cf2 = identityLoader.load(20);
244+
dispatchCF = identityLoader.dispatch();
245+
246+
247+
await().until(dispatchCF::isDone);
248+
assertThat(cf1.join(), equalTo(10));
249+
assertThat(cf2.join(), equalTo(20));
250+
251+
// switch mode to parallel execution
231252

232253
useThreading.set(true);
233254

0 commit comments

Comments
 (0)