Skip to content

Commit a1db3a1

Browse files
committed
Allow scheduling of CF completion in parallel
1 parent e5e5bf5 commit a1db3a1

3 files changed

Lines changed: 65 additions & 18 deletions

File tree

src/main/java/org/dataloader/DataLoaderHelper.java

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -325,40 +325,52 @@ private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Object>
325325
stats.incrementBatchLoadCountBy(keys.size(), new IncrementBatchLoadCountByStatisticsContext<>(keys, callContexts));
326326
CompletableFuture<List<V>> batchLoad = invokeLoader(keys, callContexts, queuedFutures, loaderOptions.cachingEnabled());
327327
return batchLoad
328-
.thenApply(values -> {
328+
.thenCompose(values -> {
329329
assertResultSize(keys, values);
330330
if (isPublisher() || isMappedPublisher()) {
331331
// We have already completed the queued futures by the time the overall batchLoad future has completed.
332-
return values;
332+
return CompletableFuture.completedFuture(values);
333333
}
334334

335335
List<K> clearCacheKeys = new ArrayList<>();
336+
var batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler();
337+
CompletableFuture<Void>[] scheduledCompletions = new CompletableFuture[keys.size()];
336338
for (int idx = 0; idx < queuedFutures.size(); idx++) {
337339
K key = keys.get(idx);
338340
V value = values.get(idx);
339341
Object callContext = callContexts.get(idx);
340342
CompletableFuture<V> future = queuedFutures.get(idx);
341-
if (value instanceof Throwable) {
342-
stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext));
343-
future.completeExceptionally((Throwable) value);
344-
clearCacheKeys.add(keys.get(idx));
345-
} else if (value instanceof Try) {
346-
// we allow the batch loader to return a Try so we can better represent a computation
347-
// that might have worked or not.
348-
Try<V> tryValue = (Try<V>) value;
349-
if (tryValue.isSuccess()) {
350-
future.complete(tryValue.get());
351-
} else {
343+
Runnable completeValueRunnable = () -> {
344+
if (value instanceof Throwable) {
352345
stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext));
353-
future.completeExceptionally(tryValue.getThrowable());
354-
clearCacheKeys.add(keys.get(idx));
346+
future.completeExceptionally((Throwable) value);
347+
clearCacheKeys.add(key);
348+
} else if (value instanceof Try) {
349+
// we allow the batch loader to return a Try so we can better represent a computation
350+
// that might have worked or not.
351+
Try<V> tryValue = (Try<V>) value;
352+
if (tryValue.isSuccess()) {
353+
future.complete(tryValue.get());
354+
} else {
355+
stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext));
356+
future.completeExceptionally(tryValue.getThrowable());
357+
clearCacheKeys.add(key);
358+
}
359+
} else {
360+
future.complete(value);
355361
}
362+
};
363+
if(batchLoaderScheduler != null) {
364+
scheduledCompletions[idx] = batchLoaderScheduler.scheduleCompletion(completeValueRunnable, key, value);
356365
} else {
357-
future.complete(value);
366+
scheduledCompletions[idx] = CompletableFutureKit.run(completeValueRunnable);
358367
}
359368
}
360-
possiblyClearCacheEntriesOnExceptions(clearCacheKeys);
361-
return values;
369+
// Wait for all completions to return
370+
return allOf(scheduledCompletions).thenApply(ignored -> {
371+
possiblyClearCacheEntriesOnExceptions(clearCacheKeys);
372+
return values;
373+
});
362374
}).exceptionally(ex -> {
363375
stats.incrementBatchLoadExceptionCount(new IncrementBatchLoadExceptionCountStatisticsContext<>(keys, callContexts));
364376
if (ex instanceof CompletionException) {

src/main/java/org/dataloader/impl/CompletableFutureKit.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,20 @@ public static <K, V> CompletableFuture<Map<K, V>> allOf(Map<K, CompletableFuture
6767
)
6868
);
6969
}
70+
71+
/**
72+
* Runs the given {@link Runnable} synchronously on the current thread, returning a
73+
* {@link CompletableFuture} that is completed normally or exceptionally based on the outcome.
74+
*
75+
* @param runnable the task to execute
76+
* @return a completed future, or a failed future if the runnable throws
77+
*/
78+
public static CompletableFuture<Void> run(Runnable runnable) {
79+
try {
80+
runnable.run();
81+
return CompletableFuture.completedFuture(null);
82+
} catch (Exception e) {
83+
return CompletableFuture.failedFuture(e);
84+
}
85+
}
7086
}

src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77
import org.dataloader.MappedBatchLoader;
88
import org.dataloader.MappedBatchPublisher;
99
import org.dataloader.BatchPublisher;
10+
import org.dataloader.impl.CompletableFutureKit;
1011

1112
import java.util.List;
1213
import java.util.Map;
14+
import java.util.concurrent.CompletableFuture;
1315
import java.util.concurrent.CompletionStage;
1416

1517
/**
@@ -92,4 +94,21 @@ interface ScheduledBatchPublisherCall {
9294
* @param <K> the key type
9395
*/
9496
<K> void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List<K> keys, BatchLoaderEnvironment environment);
97+
98+
/**
99+
* Schedules the completion of a {@link DataLoader} value's {@link java.util.concurrent.CompletableFuture} after a batch load.
100+
* <p>
101+
* Override this to offload completions to another thread, avoiding slow chained work
102+
* (e.g. {@code thenApply}) from executing inline on the dispatch thread.
103+
* <p>
104+
* By default, completions run synchronously on the current thread, sequentially for each key in the batch.
105+
*
106+
* @param completeValueRunnable the runnable that completes the value
107+
* @param key the key being completed
108+
* @param value the value returned by the batch function for this key
109+
* @return a {@link CompletionStage} representing the scheduled work
110+
*/
111+
default <K, V> CompletableFuture<Void> scheduleCompletion(Runnable completeValueRunnable, K key, V value) {
112+
return CompletableFutureKit.run(completeValueRunnable);
113+
}
95114
}

0 commit comments

Comments
 (0)