Skip to content
This repository was archived by the owner on Apr 22, 2020. It is now read-only.

Commit e028080

Browse files
committed
Revert "bump the batzh size - I think the GraphView ComputeSteps are just taking too long"
This reverts commit 9824ffd.
1 parent 9824ffd commit e028080

File tree

4 files changed

+34
-3
lines changed

4 files changed

+34
-3
lines changed

algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/EigenvectorCentralityComputeStep.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,19 @@ void synchronizeScores(int[] allScores) {
9393
deltas[i] = delta;
9494
allScores[i] = 0;
9595
}
96+
97+
String name = String.format("[%s,%d,%d, %d, synchronizeScores]", this.getClass().getSimpleName(), this.startNode, this.partitionSize, this.iteration);
98+
System.out.println(name + "\nbefore: " + Arrays.toString(before) + "\nafter: " + Arrays.toString(deltas));
9699
}
97100

98101
@Override
99102
void normalizeDeltas() {
103+
// double[] before = deltas.clone();
100104
for (int i = 0; i < deltas.length; i++) {
101105
deltas[i] = deltas[i] / l2Norm;
102106
}
107+
// String name = String.format("[%s,%d,%d, %d]", this.getClass().getSimpleName(), this.startNode, this.partitionSize, this.iteration);
108+
// System.out.println(name + "\nnorm: " + l2Norm + "\nbefore: " + Arrays.toString(before) + "\nafter: " + Arrays.toString(deltas));
103109
}
104110

105111
}

algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/PageRank.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,18 +350,26 @@ CentralityResult getPageRank() {
350350

351351
private void run(int iterations) {
352352
// initialize data structures
353+
System.out.println("[pre iterations] init data structures");
353354
ParallelUtil.runWithConcurrency(concurrency, steps, pool);
354355
for (int iteration = 0; iteration < iterations && running(); iteration++) {
356+
System.out.println("-------");
357+
System.out.println("[iteration started] iteration:" + iteration);
355358
// calculate scores
356359
ParallelUtil.runWithConcurrency(concurrency, steps, pool);
357360

358361
// sync scores
362+
System.out.println("[sync scores] iteration:" + iteration + ", steps:" + steps.size());
359363
synchronizeScores();
360364
ParallelUtil.runWithConcurrency(concurrency, steps, pool);
361365

362366
// normalize deltas
367+
System.out.println("[norm computation] iteration:" + iteration + ", steps:" + steps.size());
363368
normalizeDeltas(iteration);
364369
ParallelUtil.runWithConcurrency(concurrency, steps, pool);
370+
371+
System.out.println("[iteration finished] iteration:" + iteration);
372+
System.out.println("-------");
365373
}
366374
}
367375

@@ -377,11 +385,14 @@ private double computeNorm(int iteration) {
377385
double l2Norm = 0.0;
378386
for (ComputeStep step : steps) {
379387
double[] deltas = step.deltas();
388+
System.out.println("[norm computation] iteration:" + iteration + ", deltas:" + Arrays.toString(deltas));
380389
l2Norm += Arrays.stream(deltas).map(score -> score * score).sum();
381390
}
382391

383392
l2Norm = Math.sqrt(l2Norm);
384393

394+
System.out.println("[norm computation] iteration:" + iteration + ", l2Norm: " + l2Norm );
395+
385396
l2Norm = l2Norm <= 0 ? 1 : l2Norm;
386397
return l2Norm;
387398
}

core/src/main/java/org/neo4j/graphalgo/core/utils/ParallelUtil.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -671,6 +671,7 @@ private static void runWithConcurrency(
671671
int maxWaitRetries,
672672
TerminationFlag terminationFlag,
673673
ExecutorService executor) {
674+
System.out.println("[ParallelUtil#runWithConcurrency] running with " + tasks.size() + " tasks" + ", and concurrency of " + concurrency);
674675
if (!canRunInParallel(executor)
675676
|| tasks.size() == 1
676677
|| concurrency <= 1) {
@@ -691,6 +692,7 @@ private static void runWithConcurrency(
691692
// generally assumes that tasks.size is notably larger than concurrency
692693
try {
693694
//noinspection StatementWithEmptyBody - add first concurrency tasks
695+
System.out.println("[ParallelUtil#runWithConcurrency] submit tasks");
694696
while (concurrency-- > 0
695697
&& terminationFlag.running()
696698
&& completionService.trySubmit(ts));
@@ -700,6 +702,7 @@ private static void runWithConcurrency(
700702
}
701703

702704

705+
System.out.println("[ParallelUtil#runWithConcurrency] submit more tasks");
703706
// submit all remaining tasks
704707
int tries = 0;
705708
while (ts.hasNext()) {
@@ -716,15 +719,18 @@ private static void runWithConcurrency(
716719
}
717720
if (!completionService.trySubmit(ts) && !completionService.hasTasks()) {
718721
if (++tries >= maxWaitRetries) {
722+
System.out.println("[ParallelUtil#runWithConcurrency] exceeded max wait retriesS");
719723
break;
720724
}
721725
LockSupport.parkNanos(waitNanos);
722726
}
723727
}
724728

729+
System.out.println("[ParallelUtil#runWithConcurrency] wait for tasks");
725730
// wait for all tasks to finish
726731
while (completionService.hasTasks() && terminationFlag.running()) {
727732
try {
733+
System.out.println("[ParallelUtil#runWithConcurrency] waiting for next task");
728734
completionService.awaitNext();
729735
} catch (ExecutionException e) {
730736
error = Exceptions.chain(error, e.getCause());
@@ -736,6 +742,7 @@ private static void runWithConcurrency(
736742
} finally {
737743
finishRunWithConcurrency(completionService, error);
738744
}
745+
System.out.println("[ParallelUtil#runWithConcurrency] finished running with " + tasks.size() + " tasks");
739746
}
740747

741748
private static void finishRunWithConcurrency(
@@ -912,6 +919,7 @@ boolean submit(Runnable task) {
912919
executor.execute(future);
913920
return true;
914921
}
922+
System.out.println("[ParallelUtil#runWithConcurrency] unable to submit task " + task);
915923
return false;
916924
}
917925

@@ -930,7 +938,13 @@ void cancelAll() {
930938

931939
private boolean canSubmit() {
932940
int activeCount = pool.getActiveCount();
933-
return pool == null || activeCount < availableConcurrency;
941+
boolean canSubmit = pool == null || activeCount < availableConcurrency;
942+
943+
if(!canSubmit) {
944+
System.out.println("[ParallelUtil#runWithConcurrency] unable to submit task and pool:" + pool + ", activeCount:" + activeCount + ", availableConcurrency:" + availableConcurrency);
945+
}
946+
947+
return canSubmit;
934948
}
935949

936950
private void stopFutures(Collection<Future<Void>> futures) {

tests/src/test/java/org/neo4j/graphalgo/algo/EigenvectorCentralityProcIntegrationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ public void testWriteBackUnderDifferentProperty() throws Exception {
219219
@Test
220220
public void testParallelWriteBack() throws Exception {
221221
runQuery(
222-
"CALL algo.eigenvector('Character', 'INTERACTS_SEASON1', {batchSize: 100, concurrency:2, write:true, graph:'"+graphImpl+"', direction: 'BOTH'}) YIELD writeMillis, write, writeProperty, iterations",
222+
"CALL algo.eigenvector('Character', 'INTERACTS_SEASON1', {batchSize:3, concurrency:2, write:true, graph:'"+graphImpl+"', direction: 'BOTH'}) YIELD writeMillis, write, writeProperty, iterations",
223223
row -> {
224224
assertTrue(
225225
"write time not set",
@@ -232,7 +232,7 @@ public void testParallelWriteBack() throws Exception {
232232
public void testParallelExecution() throws Exception {
233233
final Map<Long, Double> actual = new HashMap<>();
234234
runQuery(
235-
"CALL algo.eigenvector.stream('Character', 'INTERACTS_SEASON1', {batchSize:100, concurrency:2, graph:'"+graphImpl+"', direction: 'BOTH'}) " +
235+
"CALL algo.eigenvector.stream('Character', 'INTERACTS_SEASON1', {batchSize:2, concurrency:2, graph:'"+graphImpl+"', direction: 'BOTH'}) " +
236236
"YIELD nodeId, score " +
237237
"RETURN nodeId, score " +
238238
"ORDER BY score DESC " +

0 commit comments

Comments
 (0)