Skip to content
This repository was archived by the owner on Apr 22, 2020. It is now read-only.

Commit 7633780

Browse files
committed
add retries to pagerank parallel tasks
1 parent 1c8baa1 commit 7633780

File tree

5 files changed

+21
-7
lines changed

5 files changed

+21
-7
lines changed

algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/EigenvectorCentralityComputeStep.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,11 @@ void normalizeDeltas() {
100100
}
101101
}
102102

103+
@Override
104+
public String toString() {
105+
return "EigenvectorCentralityComputeStep{" +
106+
"partitionSize=" + partitionSize +
107+
", startNode=" + startNode +
108+
'}';
109+
}
103110
}

algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/PageRank.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
import java.util.*;
3333
import java.util.concurrent.ExecutorService;
34+
import java.util.concurrent.TimeUnit;
3435
import java.util.stream.LongStream;
3536

3637

@@ -353,15 +354,15 @@ private void run(int iterations) {
353354
ParallelUtil.runWithConcurrency(concurrency, steps, pool);
354355
for (int i = 0; i < iterations && running(); i++) {
355356
// calculate scores
356-
ParallelUtil.runWithConcurrency(concurrency, steps, pool);
357+
ParallelUtil.runWithConcurrency(concurrency, steps, 3, 1, TimeUnit.SECONDS, pool);
357358

358359
// sync scores
359360
synchronizeScores();
360-
ParallelUtil.runWithConcurrency(concurrency, steps, pool);
361+
ParallelUtil.runWithConcurrency(concurrency, steps, 3, 1, TimeUnit.SECONDS, pool);
361362

362363
// normalize deltas
363364
normalizeDeltas();
364-
ParallelUtil.runWithConcurrency(concurrency, steps, pool);
365+
ParallelUtil.runWithConcurrency(concurrency, steps, 3, 1, TimeUnit.SECONDS, pool);
365366
}
366367
}
367368

core/src/main/java/org/neo4j/graphalgo/core/utils/ParallelUtil.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -705,7 +705,9 @@ private static void runWithConcurrency(
705705
while (ts.hasNext()) {
706706
if (completionService.hasTasks()) {
707707
try {
708+
System.out.println("[ParallelUtil#runWithConcurrency] waiting for task to finish... " + executor);
708709
completionService.awaitNext();
710+
System.out.println("[ParallelUtil#runWithConcurrency] task finished... " + executor);
709711
} catch (ExecutionException e) {
710712
error = Exceptions.chain(error, e.getCause());
711713
} catch (CancellationException ignore) {
@@ -882,6 +884,7 @@ protected void done() {
882884
if (executor instanceof ThreadPoolExecutor) {
883885
pool = (ThreadPoolExecutor) executor;
884886
availableConcurrency = pool.getCorePoolSize();
887+
// availableConcurrency = 2;
885888
int capacity = Math.max(targetConcurrency, availableConcurrency) + 1;
886889
completionQueue = new ArrayBlockingQueue<>(capacity);
887890
} else {

core/src/main/java/org/neo4j/graphalgo/core/utils/Pools.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class Pools {
4242
DEFAULT_CONCURRENCY = definedProcessors;
4343
} else {
4444
DEFAULT_CONCURRENCY = Runtime.getRuntime().availableProcessors();
45+
// DEFAULT_CONCURRENCY = 2;
4546
}
4647
}
4748

tests/src/test/java/org/neo4j/graphalgo/algo/EigenvectorCentralityProcIntegrationTest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,10 +219,12 @@ public void testWriteBackUnderDifferentProperty() throws Exception {
219219
@Test
220220
public void testParallelWriteBack() throws Exception {
221221
runQuery(
222-
"CALL algo.eigenvector('Character', 'INTERACTS_SEASON1', {batchSize:3, write:true, graph:'"+graphImpl+"', direction: 'BOTH'}) YIELD writeMillis, write, writeProperty",
223-
row -> assertTrue(
224-
"write time not set",
225-
row.getNumber("writeMillis").intValue() >= 0));
222+
"CALL algo.eigenvector('Character', 'INTERACTS_SEASON1', {batchSize:3, concurrency:2, write:true, graph:'"+graphImpl+"', direction: 'BOTH'}) YIELD writeMillis, write, writeProperty, iterations",
223+
row -> {
224+
assertTrue(
225+
"write time not set",
226+
row.getNumber("writeMillis").intValue() >= 0);
227+
});
226228

227229
assertResult("eigenvector", expected);
228230
}

0 commit comments

Comments
 (0)