Skip to content
This repository was archived by the owner on Apr 22, 2020. It is now read-only.

Commit 81c8bfb

Browse files
mneedhamjexp
authored andcommitted
Fix bug with degree centrality when processing multiple batches (#865)
1 parent 1f27da0 commit 81c8bfb

File tree

2 files changed

+139
-1
lines changed

2 files changed

+139
-1
lines changed

algo/src/main/java/org/neo4j/graphalgo/impl/degree/DegreeCentrality.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ public class DegreeCentrality extends Algorithm<DegreeCentrality> implements Deg
4040
private Graph graph;
4141
private final ExecutorService executor;
4242
private final int concurrency;
43-
private volatile AtomicInteger nodeQueue = new AtomicInteger();
4443

4544
private double[] degrees;
4645

@@ -70,12 +69,15 @@ public void compute() {
7069
this.starts = new long[taskCount];
7170
this.partitions = new double[taskCount][batchSize];
7271

72+
long startNode = 0L;
7373
for (int i = 0; i < taskCount; i++) {
74+
starts[i] = startNode;
7475
if(weighted) {
7576
tasks.add(new WeightedDegreeTask(starts[i], partitions[i]));
7677
} else {
7778
tasks.add(new DegreeTask(starts[i], partitions[i]));
7879
}
80+
startNode += batchSize;
7981
}
8082
ParallelUtil.runWithConcurrency(concurrency, tasks, executor);
8183
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/**
2+
* Copyright (c) 2017 "Neo4j, Inc." <http://neo4j.com>
3+
*
4+
* This file is part of Neo4j Graph Algorithms <http://github.com/neo4j-contrib/neo4j-graph-algorithms>.
5+
*
6+
* Neo4j Graph Algorithms is free software: you can redistribute it and/or modify
7+
* it under the terms of the GNU General Public License as published by
8+
* the Free Software Foundation, either version 3 of the License, or
9+
* (at your option) any later version.
10+
*
11+
* This program is distributed in the hope that it will be useful,
12+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
* GNU General Public License for more details.
15+
*
16+
* You should have received a copy of the GNU General Public License
17+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
18+
*/
19+
package org.neo4j.graphalgo.algo;
20+
21+
import org.junit.AfterClass;
22+
import org.junit.BeforeClass;
23+
import org.junit.Test;
24+
import org.junit.runner.RunWith;
25+
import org.junit.runners.Parameterized;
26+
import org.neo4j.graphalgo.DegreeCentralityProc;
27+
import org.neo4j.graphalgo.TestDatabaseCreator;
28+
import org.neo4j.graphdb.Label;
29+
import org.neo4j.graphdb.Result;
30+
import org.neo4j.graphdb.Transaction;
31+
import org.neo4j.internal.kernel.api.exceptions.KernelException;
32+
import org.neo4j.kernel.impl.proc.Procedures;
33+
import org.neo4j.kernel.internal.GraphDatabaseAPI;
34+
35+
import java.util.*;
36+
import java.util.function.Consumer;
37+
38+
import static org.junit.Assert.*;
39+
40+
@RunWith(Parameterized.class)
41+
public class DegreeProcIssue848IntegrationTest {
42+
43+
private static GraphDatabaseAPI db;
44+
45+
private static final String DB_CYPHER = "" +
46+
"UNWIND range(1,10001) as s\n" +
47+
"CREATE (:Node{id:s});\n";
48+
49+
@AfterClass
50+
public static void tearDown() throws Exception {
51+
if (db != null) db.shutdown();
52+
}
53+
54+
@BeforeClass
55+
public static void setup() throws KernelException {
56+
db = TestDatabaseCreator.createTestDatabase();
57+
try (Transaction tx = db.beginTx()) {
58+
db.execute(DB_CYPHER).close();
59+
tx.success();
60+
}
61+
62+
db.getDependencyResolver()
63+
.resolveDependency(Procedures.class)
64+
.registerProcedure(DegreeCentralityProc.class);
65+
}
66+
67+
@Parameterized.Parameters(name = "{0}")
68+
public static Collection<Object[]> data() {
69+
return Arrays.asList(
70+
new Object[]{"Heavy"},
71+
new Object[]{"Light"},
72+
new Object[]{"Kernel"},
73+
new Object[]{"Huge"}
74+
);
75+
}
76+
77+
@Parameterized.Parameter
78+
public String graphImpl;
79+
80+
@Test
81+
public void multipleBatches() throws Exception {
82+
final Map<Long, Double> actual = new HashMap<>();
83+
runQuery(
84+
"CALL algo.degree.stream(\"Node\", \"REL\", {direction: \"incoming\"})\n" +
85+
"YIELD nodeId, score\n",
86+
row -> actual.put(
87+
(Long)row.get("nodeId"),
88+
(Double) row.get("score")));
89+
90+
Map<Long, Double> expected = new HashMap<>();
91+
for (long i = 0; i < 10001; i++) {
92+
expected.put(i, 0.0);
93+
}
94+
95+
assertMapEquals(expected, actual);
96+
}
97+
98+
private static void assertMapEquals(
99+
Map<Long, Double> expected,
100+
Map<Long, Double> actual) {
101+
assertEquals("number of elements", expected.size(), actual.size());
102+
HashSet<Long> expectedKeys = new HashSet<>(expected.keySet());
103+
for (Map.Entry<Long, Double> entry : actual.entrySet()) {
104+
assertTrue(
105+
"unknown key " + entry.getKey(),
106+
expectedKeys.remove(entry.getKey()));
107+
assertEquals(
108+
"value for " + entry.getKey(),
109+
expected.get(entry.getKey()),
110+
entry.getValue(),
111+
0.1);
112+
}
113+
for (Long expectedKey : expectedKeys) {
114+
fail("missing key " + expectedKey);
115+
}
116+
}
117+
118+
private static void runQuery(
119+
String query,
120+
Consumer<Result.ResultRow> check) {
121+
runQuery(query, new HashMap<>(), check);
122+
}
123+
124+
private static void runQuery(
125+
String query,
126+
Map<String, Object> params,
127+
Consumer<Result.ResultRow> check) {
128+
try (Result result = db.execute(query, params)) {
129+
result.accept(row -> {
130+
check.accept(row);
131+
return true;
132+
});
133+
}
134+
}
135+
136+
}

0 commit comments

Comments
 (0)