2121import com .carrotsearch .hppc .IntArrayList ;
2222import com .carrotsearch .hppc .LongArrayList ;
2323import org .neo4j .collection .primitive .PrimitiveLongIterator ;
24- import org .neo4j .graphalgo .api .HugeDegrees ;
25- import org .neo4j .graphalgo .api .HugeIdMapping ;
26- import org .neo4j .graphalgo .api .HugeNodeIterator ;
27- import org .neo4j .graphalgo .api .HugeRelationshipConsumer ;
28- import org .neo4j .graphalgo .api .HugeRelationshipIterator ;
24+ import org .neo4j .graphalgo .api .*;
2925import org .neo4j .graphalgo .core .utils .ParallelUtil ;
3026import org .neo4j .graphalgo .core .utils .paged .AllocationTracker ;
3127import org .neo4j .graphalgo .core .write .Exporter ;
3935import java .util .Iterator ;
4036import java .util .List ;
4137import java .util .concurrent .ExecutorService ;
38+ import java .util .stream .LongStream ;
4239
4340import static org .neo4j .graphalgo .core .utils .ArrayUtil .binaryLookup ;
4441import static org .neo4j .graphalgo .core .utils .paged .AllocationTracker .humanReadable ;
45- import static org .neo4j .graphalgo .core .utils .paged .MemoryUsage .shallowSizeOfInstance ;
46- import static org .neo4j .graphalgo .core .utils .paged .MemoryUsage .sizeOfDoubleArray ;
47- import static org .neo4j .graphalgo .core .utils .paged .MemoryUsage .sizeOfIntArray ;
48- import static org .neo4j .graphalgo .core .utils .paged .MemoryUsage .sizeOfLongArray ;
49- import static org .neo4j .graphalgo .core .utils .paged .MemoryUsage .sizeOfObjectArray ;
42+ import static org .neo4j .graphalgo .core .utils .paged .MemoryUsage .*;
5043
5144
5245/**
@@ -111,6 +104,8 @@ public class HugePageRank extends Algorithm<HugePageRank> implements PageRankAlg
111104 private final HugeRelationshipIterator relationshipIterator ;
112105 private final HugeDegrees degrees ;
113106 private final double dampingFactor ;
107+ private final HugeGraph graph ;
108+ private LongStream sourceNodeIds ;
114109
115110 private Log log ;
116111 private ComputeSteps computeSteps ;
@@ -121,21 +116,17 @@ public class HugePageRank extends Algorithm<HugePageRank> implements PageRankAlg
121116 */
122117 HugePageRank (
123118 AllocationTracker tracker ,
124- HugeIdMapping idMapping ,
125- HugeNodeIterator nodeIterator ,
126- HugeRelationshipIterator relationshipIterator ,
127- HugeDegrees degrees ,
128- double dampingFactor ) {
119+ HugeGraph graph ,
120+ double dampingFactor ,
121+ LongStream sourceNodeIds ) {
129122 this (
130123 null ,
131124 -1 ,
132125 ParallelUtil .DEFAULT_BATCH_SIZE ,
133126 tracker ,
134- idMapping ,
135- nodeIterator ,
136- relationshipIterator ,
137- degrees ,
138- dampingFactor );
127+ graph ,
128+ dampingFactor ,
129+ sourceNodeIds );
139130 }
140131
141132 /**
@@ -148,20 +139,20 @@ public class HugePageRank extends Algorithm<HugePageRank> implements PageRankAlg
148139 int concurrency ,
149140 int batchSize ,
150141 AllocationTracker tracker ,
151- HugeIdMapping idMapping ,
152- HugeNodeIterator nodeIterator ,
153- HugeRelationshipIterator relationshipIterator ,
154- HugeDegrees degrees ,
155- double dampingFactor ) {
142+ HugeGraph graph ,
143+ double dampingFactor ,
144+ LongStream sourceNodeIds ) {
156145 this .executor = executor ;
157146 this .concurrency = concurrency ;
158147 this .batchSize = batchSize ;
159148 this .tracker = tracker ;
160- this .idMapping = idMapping ;
161- this .nodeIterator = nodeIterator ;
162- this .relationshipIterator = relationshipIterator ;
163- this .degrees = degrees ;
149+ this .idMapping = graph ;
150+ this .nodeIterator = graph ;
151+ this .relationshipIterator = graph ;
152+ this .degrees = graph ;
153+ this .graph = graph ;
164154 this .dampingFactor = dampingFactor ;
155+ this .sourceNodeIds = sourceNodeIds ;
165156 }
166157
167158 /**
@@ -209,6 +200,7 @@ private void initializeSteps() {
209200 concurrency ,
210201 idMapping .nodeCount (),
211202 dampingFactor ,
203+ sourceNodeIds .map (graph ::toHugeMappedNodeId ).filter (mappedId -> mappedId != -1L ).toArray (),
212204 relationshipIterator ,
213205 degrees ,
214206 partitions ,
@@ -246,6 +238,7 @@ private ComputeSteps createComputeSteps(
246238 int concurrency ,
247239 long nodeCount ,
248240 double dampingFactor ,
241+ long [] sourceNodeIds ,
249242 HugeRelationshipIterator relationshipIterator ,
250243 HugeDegrees degrees ,
251244 List <Partition > partitions ,
@@ -281,6 +274,7 @@ private ComputeSteps createComputeSteps(
281274
282275 computeSteps .add (new ComputeStep (
283276 dampingFactor ,
277+ sourceNodeIds ,
284278 relationshipIterator ,
285279 degrees ,
286280 tracker ,
@@ -542,6 +536,7 @@ private static final class ComputeStep implements Runnable, HugeRelationshipCons
542536
543537 private long [] starts ;
544538 private int [] lengths ;
539+ private long [] sourceNodeIds ;
545540 private final HugeRelationshipIterator relationshipIterator ;
546541 private final HugeDegrees degrees ;
547542 private final AllocationTracker tracker ;
@@ -562,13 +557,15 @@ private static final class ComputeStep implements Runnable, HugeRelationshipCons
562557
563558 ComputeStep (
564559 double dampingFactor ,
560+ long [] sourceNodeIds ,
565561 HugeRelationshipIterator relationshipIterator ,
566562 HugeDegrees degrees ,
567563 AllocationTracker tracker ,
568564 int partitionSize ,
569565 long startNode ) {
570566 this .dampingFactor = dampingFactor ;
571567 this .alpha = 1.0 - dampingFactor ;
568+ this .sourceNodeIds = sourceNodeIds ;
572569 this .relationshipIterator = relationshipIterator .concurrentCopy ();
573570 this .degrees = degrees ;
574571 this .tracker = tracker ;
@@ -606,8 +603,21 @@ private void initialize() {
606603 });
607604
608605 tracker .add (sizeOfDoubleArray (partitionSize ) << 1 );
606+
609607 double [] partitionRank = new double [partitionSize ];
610- Arrays .fill (partitionRank , alpha );
608+ if (sourceNodeIds .length == 0 ) {
609+ Arrays .fill (partitionRank , alpha );
610+ } else {
611+ Arrays .fill (partitionRank ,0 );
612+
613+ long [] partitionSourceNodeIds = LongStream .of (sourceNodeIds )
614+ .filter (sourceNodeId -> sourceNodeId >= startNode && sourceNodeId <= endNode )
615+ .toArray ();
616+
617+ for (long sourceNodeId : partitionSourceNodeIds ) {
618+ partitionRank [Math .toIntExact (sourceNodeId - this .startNode )] = alpha ;
619+ }
620+ }
611621
612622 this .pageRank = partitionRank ;
613623 this .deltas = Arrays .copyOf (partitionRank , partitionSize );
0 commit comments