Skip to content
This repository was archived by the owner on Apr 22, 2020. It is now read-only.

Commit cc4edad

Browse files
mneedham (Mark Needham)
authored and committed
3.4 cache weights pagerank (#734)
* remove unnecessary weight lookup + introduce DegreeCache abstraction
* unused imports
* unused comment
* Cache weights when calculating weighted degree centrality
* Test for weights array
* cached weights configurable
* benchmarks
1 parent e77d486 commit cc4edad

22 files changed

+506
-120
lines changed

algo/src/main/java/org/neo4j/graphalgo/PageRankProc.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,14 +201,16 @@ private PageRankResult evaluate(
201201

202202
PageRankAlgorithm prAlgo;
203203
if(weightPropertyKey != null) {
204+
final boolean cacheWeights = configuration.get("cacheWeights", false);
204205
prAlgo = PageRankAlgorithm.weightedOf(
205206
tracker,
206207
graph,
207208
dampingFactor,
208209
sourceNodeIds,
209210
Pools.DEFAULT,
210211
concurrency,
211-
batchSize);
212+
batchSize,
213+
cacheWeights);
212214
} else {
213215
prAlgo = PageRankAlgorithm.of(
214216
tracker,

algo/src/main/java/org/neo4j/graphalgo/impl/WeightedDegreeCentrality.java

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
11
package org.neo4j.graphalgo.impl;
22

33
import org.neo4j.graphalgo.api.Graph;
4-
import org.neo4j.graphalgo.api.WeightedRelationshipConsumer;
54
import org.neo4j.graphalgo.core.utils.ParallelUtil;
65
import org.neo4j.graphalgo.core.utils.Pools;
7-
import org.neo4j.graphalgo.impl.pagerank.HugeComputeStep;
86
import org.neo4j.graphdb.Direction;
97

108
import java.util.ArrayList;
119
import java.util.List;
1210
import java.util.concurrent.ExecutorService;
13-
import java.util.concurrent.Future;
1411
import java.util.concurrent.atomic.AtomicInteger;
1512
import java.util.stream.IntStream;
1613
import java.util.stream.Stream;
@@ -22,7 +19,9 @@ public class WeightedDegreeCentrality extends Algorithm<WeightedDegreeCentrality
2219
private final ExecutorService executor;
2320
private final int concurrency;
2421
private volatile AtomicInteger nodeQueue = new AtomicInteger();
22+
2523
private double[] degrees;
24+
private double[][] weights;
2625

2726
public WeightedDegreeCentrality(
2827
Graph graph,
@@ -40,14 +39,19 @@ public WeightedDegreeCentrality(
4039
nodeCount = Math.toIntExact(graph.nodeCount());
4140
this.direction = direction;
4241
degrees = new double[nodeCount];
42+
weights = new double[nodeCount][];
4343
}
4444

45-
public WeightedDegreeCentrality compute() {
45+
public WeightedDegreeCentrality compute(boolean cacheWeights) {
4646
nodeQueue.set(0);
4747

48-
List<DegreeTask> tasks = new ArrayList<>();
48+
List<Runnable> tasks = new ArrayList<>();
4949
for (int i = 0; i < concurrency; i++) {
50-
tasks.add(new DegreeTask());
50+
if(cacheWeights) {
51+
tasks.add(new DegreeAndWeightsTask());
52+
} else {
53+
tasks.add(new DegreeTask());
54+
}
5155
}
5256
ParallelUtil.runWithConcurrency(concurrency, tasks, executor);
5357

@@ -79,6 +83,36 @@ public void run() {
7983
if(weight > 0) {
8084
weightedDegree[0] += weight;
8185
}
86+
87+
return true;
88+
});
89+
90+
degrees[nodeId] = weightedDegree[0];
91+
92+
}
93+
}
94+
}
95+
96+
private class DegreeAndWeightsTask implements Runnable {
97+
@Override
98+
public void run() {
99+
for (; ; ) {
100+
final int nodeId = nodeQueue.getAndIncrement();
101+
if (nodeId >= nodeCount || !running()) {
102+
return;
103+
}
104+
105+
weights[nodeId] = new double[graph.degree(nodeId, direction)];
106+
107+
int[] index = {0};
108+
double[] weightedDegree = new double[1];
109+
graph.forEachRelationship(nodeId, direction, (sourceNodeId, targetNodeId, relationId, weight) -> {
110+
if(weight > 0) {
111+
weightedDegree[0] += weight;
112+
}
113+
114+
weights[nodeId][index[0]] = weight;
115+
index[0]++;
82116
return true;
83117
});
84118

@@ -91,6 +125,9 @@ public void run() {
91125
public double[] degrees() {
92126
return degrees;
93127
}
128+
public double[][] weights() {
129+
return weights;
130+
}
94131

95132
public Stream<DegreeCentrality.Result> resultStream() {
96133
return IntStream.range(0, nodeCount)

algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/BaseComputeStep.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,10 @@
11
package org.neo4j.graphalgo.impl.pagerank;
22

33
import org.neo4j.graphalgo.api.Degrees;
4-
import org.neo4j.graphalgo.api.RelationshipIterator;
5-
import org.neo4j.graphalgo.api.RelationshipWeights;
6-
import org.neo4j.graphdb.Direction;
74

85
import java.util.Arrays;
96
import java.util.stream.IntStream;
107

11-
import static org.neo4j.graphalgo.core.utils.ArrayUtil.binaryLookup;
12-
138
public abstract class BaseComputeStep implements ComputeStep {
149
private static final int S_INIT = 0;
1510
private static final int S_CALC = 1;
@@ -20,7 +15,6 @@ public abstract class BaseComputeStep implements ComputeStep {
2015
int[] starts;
2116
private int[] lengths;
2217
private int[] sourceNodeIds;
23-
final RelationshipIterator relationshipIterator;
2418
final Degrees degrees;
2519

2620
private final double alpha;
@@ -38,14 +32,12 @@ public abstract class BaseComputeStep implements ComputeStep {
3832
BaseComputeStep(
3933
double dampingFactor,
4034
int[] sourceNodeIds,
41-
RelationshipIterator relationshipIterator,
4235
Degrees degrees,
4336
int partitionSize,
4437
int startNode) {
4538
this.dampingFactor = dampingFactor;
4639
this.alpha = 1.0 - dampingFactor;
4740
this.sourceNodeIds = sourceNodeIds;
48-
this.relationshipIterator = relationshipIterator;
4941
this.degrees = degrees;
5042
this.partitionSize = partitionSize;
5143
this.startNode = startNode;

algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/DegreeCache.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.neo4j.graphalgo.impl.pagerank;
2+
3+
public class DegreeCache {
4+
5+
public final static DegreeCache EMPTY = new DegreeCache(new double[0], new double[0][0]);
6+
7+
private double[] aggregatedDegrees;
8+
private double[][] weights;
9+
10+
public DegreeCache(double[] aggregatedDegrees, double[][] weights) {
11+
this.aggregatedDegrees = aggregatedDegrees;
12+
this.weights = weights;
13+
}
14+
15+
double[] aggregatedDegrees() {
16+
return aggregatedDegrees;
17+
}
18+
19+
double[][] weights() {
20+
return weights;
21+
}
22+
23+
boolean hasCachedValues() {
24+
return weights.length > 0;
25+
}
26+
}

algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/DegreeComputer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@
33
import java.util.concurrent.ExecutorService;
44

55
public interface DegreeComputer {
6-
double[] degree(ExecutorService executor, int concurrency);
6+
DegreeCache degree(ExecutorService executor, int concurrency);
77
}

algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/HugePageRank.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,7 @@ private void initializeSteps() {
205205
? this.executor : null;
206206

207207
WeightedDegreeCentrality degreeCentrality = new WeightedDegreeCentrality(graph, executor, concurrency, Direction.OUTGOING);
208-
degreeCentrality.compute();
209-
210-
DegreeComputer degreeComputer = pageRankVariant.degreeComputer(graph);
208+
degreeCentrality.compute(false);
211209

212210
computeSteps = createComputeSteps(
213211
concurrency,
@@ -270,7 +268,7 @@ private ComputeSteps createComputeSteps(
270268
Iterator<Partition> parts = partitions.iterator();
271269

272270
DegreeComputer degreeComputer = pageRankVariant.degreeComputer(graph);
273-
double[] aggregatedDegrees = degreeComputer.degree(pool, concurrency);
271+
DegreeCache degreeCache = degreeComputer.degree(pool, concurrency);
274272

275273
while (parts.hasNext()) {
276274
Partition partition = parts.next();
@@ -296,7 +294,7 @@ private ComputeSteps createComputeSteps(
296294
tracker,
297295
partitionCount,
298296
start,
299-
aggregatedDegrees));
297+
degreeCache));
300298
}
301299

302300
long[] startArray = starts.toArray();

algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/HugeWeightedComputeStep.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ public class HugeWeightedComputeStep extends HugeBaseComputeStep implements Huge
2323
HugeRelationshipWeights relationshipWeights,
2424
AllocationTracker tracker,
2525
int partitionSize,
26-
long startNode, double[] aggregatedDegrees) {
26+
long startNode,
27+
DegreeCache degreeCache) {
2728
super(dampingFactor,
2829
sourceNodeIds,
2930
relationshipIterator,
@@ -32,7 +33,7 @@ public class HugeWeightedComputeStep extends HugeBaseComputeStep implements Huge
3233
partitionSize,
3334
startNode);
3435
this.relationshipWeights = relationshipWeights;
35-
this.aggregatedDegrees = aggregatedDegrees;
36+
this.aggregatedDegrees = degreeCache.aggregatedDegrees();
3637
}
3738

3839
void singleIteration() {

algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/NoOpDegreeComputer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
public class NoOpDegreeComputer implements DegreeComputer {
66

77
@Override
8-
public double[] degree(ExecutorService executor, int concurrency) {
9-
return new double[0];
8+
public DegreeCache degree(ExecutorService executor, int concurrency) {
9+
return DegreeCache.EMPTY;
1010
}
1111
}

algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/NonWeightedComputeStep.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
final class NonWeightedComputeStep extends BaseComputeStep implements RelationshipConsumer {
1111

12+
private final RelationshipIterator relationshipIterator;
1213

1314
NonWeightedComputeStep(
1415
double dampingFactor,
@@ -17,7 +18,8 @@ final class NonWeightedComputeStep extends BaseComputeStep implements Relationsh
1718
Degrees degrees,
1819
int partitionSize,
1920
int startNode) {
20-
super(dampingFactor, sourceNodeIds, relationshipIterator, degrees, partitionSize, startNode);
21+
super(dampingFactor, sourceNodeIds, degrees, partitionSize, startNode);
22+
this.relationshipIterator = relationshipIterator;
2123
}
2224

2325

algo/src/main/java/org/neo4j/graphalgo/impl/pagerank/NonWeightedPageRankVariant.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33
import org.neo4j.graphalgo.api.*;
44
import org.neo4j.graphalgo.core.utils.paged.AllocationTracker;
55

6-
import java.util.concurrent.ExecutorService;
7-
86
public class NonWeightedPageRankVariant implements PageRankVariant {
9-
public ComputeStep createComputeStep(double dampingFactor, int[] sourceNodeIds, RelationshipIterator relationshipIterator, Degrees degrees, RelationshipWeights relationshipWeights, int partitionCount, int start, double[] aggregatedDegrees) {
7+
public ComputeStep createComputeStep(double dampingFactor, int[] sourceNodeIds,
8+
RelationshipIterator relationshipIterator,
9+
WeightedRelationshipIterator weightedRelationshipIterator,
10+
Degrees degrees,
11+
int partitionCount, int start,
12+
DegreeCache degreeCache) {
1013
return new NonWeightedComputeStep(
1114
dampingFactor,
1215
sourceNodeIds,
@@ -18,7 +21,7 @@ public ComputeStep createComputeStep(double dampingFactor, int[] sourceNodeIds,
1821
}
1922

2023
@Override
21-
public HugeNonWeightedComputeStep createHugeComputeStep(double dampingFactor, long[] sourceNodeIds, HugeRelationshipIterator relationshipIterator, HugeDegrees degrees, HugeRelationshipWeights relationshipWeights, AllocationTracker tracker, int partitionCount, long start, double[] aggregatedDegrees) {
24+
public HugeNonWeightedComputeStep createHugeComputeStep(double dampingFactor, long[] sourceNodeIds, HugeRelationshipIterator relationshipIterator, HugeDegrees degrees, HugeRelationshipWeights relationshipWeights, AllocationTracker tracker, int partitionCount, long start, DegreeCache aggregatedDegrees) {
2225
return new HugeNonWeightedComputeStep(
2326
dampingFactor,
2427
sourceNodeIds,

0 commit comments

Comments
 (0)