77import org .HdrHistogram .DoubleHistogram ;
88import org .neo4j .graphalgo .core .ProcedureConfiguration ;
99import org .neo4j .graphalgo .core .ProcedureConstants ;
10- import org .neo4j .graphalgo .core .utils .ParallelUtil ;
11- import org .neo4j .graphalgo .core .utils .Pools ;
12- import org .neo4j .graphalgo .core .utils .QueueBasedSpliterator ;
13- import org .neo4j .graphalgo .core .utils .TerminationFlag ;
10+ import org .neo4j .graphalgo .core .utils .*;
1411import org .neo4j .graphalgo .impl .util .TopKConsumer ;
1512import org .neo4j .graphdb .Result ;
1613import org .neo4j .kernel .api .KernelTransaction ;
@@ -77,7 +74,76 @@ Long getWriteBatchSize(ProcedureConfiguration configuration) {
7774 return configuration .get ("writeBatchSize" , 10000L );
7875 }
7976
77+
78+ public class SimilarityResultBuilder {
79+ protected long writeDuration = -1 ;
80+ protected boolean write = false ;
81+ private int nodes ;
82+ private String writeRelationshipType ;
83+ private String writeProperty ;
84+ private AtomicLong similarityPairs ;
85+ private DoubleHistogram histogram ;
86+
87+
88+ public SimilarityResultBuilder withWriteDuration (long writeDuration ) {
89+ this .writeDuration = writeDuration ;
90+ return this ;
91+ }
92+
93+ public SimilarityResultBuilder withWrite (boolean write ) {
94+ this .write = write ;
95+ return this ;
96+ }
97+
98+ /**
99+ * returns an AutoClosable which measures the time
100+ * until it gets closed. Saves the duration as writeMillis
101+ *
102+ * @return
103+ */
104+ public ProgressTimer timeWrite () {
105+ return ProgressTimer .start (this ::withWriteDuration );
106+ }
107+
108+ public SimilaritySummaryResult build () {
109+ return SimilaritySummaryResult .from (nodes , similarityPairs , writeRelationshipType , writeProperty , write , histogram , writeDuration );
110+ }
111+
112+ public SimilarityResultBuilder nodes (int nodes ) {
113+ this .nodes = nodes ;
114+ return this ;
115+ }
116+
117+ public SimilarityResultBuilder write (boolean write ) {
118+ this .write = write ;
119+ return this ;
120+ }
121+
122+ public SimilarityResultBuilder writeRelationshipType (String writeRelationshipType ) {
123+ this .writeRelationshipType = writeRelationshipType ;
124+ return this ;
125+ }
126+
127+ public SimilarityResultBuilder writeProperty (String writeProperty ) {
128+ this .writeProperty = writeProperty ;
129+ return this ;
130+ }
131+
132+ public SimilarityResultBuilder similarityPairs (AtomicLong similarityPairs ) {
133+ this .similarityPairs = similarityPairs ;
134+ return this ;
135+ }
136+
137+ public SimilarityResultBuilder histogram (DoubleHistogram histogram ) {
138+ this .histogram = histogram ;
139+ return this ;
140+ }
141+ }
142+
80143 Stream <SimilaritySummaryResult > writeAndAggregateResults (Stream <SimilarityResult > stream , int length , ProcedureConfiguration configuration , boolean write , String writeRelationshipType , String writeProperty , boolean writeParallel ) {
144+ SimilarityResultBuilder builder = new SimilarityResultBuilder ();
145+ builder .nodes (length ).write (write ).writeRelationshipType (writeRelationshipType ).writeProperty (writeProperty );
146+
81147 long writeBatchSize = getWriteBatchSize (configuration );
82148 AtomicLong similarityPairs = new AtomicLong ();
83149 DoubleHistogram histogram = new DoubleHistogram (5 );
@@ -88,24 +154,31 @@ Stream<SimilaritySummaryResult> writeAndAggregateResults(Stream<SimilarityResult
88154
89155 if (write ) {
90156 if (writeParallel ) {
91- ParallelSimilarityExporter parallelSimilarityExporter = new ParallelSimilarityExporter (api , log , writeRelationshipType , writeProperty , length );
92- parallelSimilarityExporter .export (stream .peek (recorder ), writeBatchSize );
157+ try (ProgressTimer timer = builder .timeWrite ()) {
158+ ParallelSimilarityExporter parallelSimilarityExporter = new ParallelSimilarityExporter (api , log , writeRelationshipType , writeProperty , length );
159+ parallelSimilarityExporter .export (stream .peek (recorder ), writeBatchSize );
160+ }
93161
94162 } else {
95- SimilarityExporter similarityExporter = new SimilarityExporter (api , log , writeRelationshipType , writeProperty );
96- similarityExporter .export (stream .peek (recorder ), writeBatchSize );
163+ try (ProgressTimer timer = builder .timeWrite ()) {
164+ SimilarityExporter similarityExporter = new SimilarityExporter (api , log , writeRelationshipType , writeProperty );
165+ similarityExporter .export (stream .peek (recorder ), writeBatchSize );
166+ }
97167 }
98168
99169 } else {
100170 stream .forEach (recorder );
101171 }
102172
103- return Stream .of (SimilaritySummaryResult .from (length , similarityPairs , writeRelationshipType , writeProperty , write , histogram ));
173+ builder .similarityPairs (similarityPairs ).histogram (histogram );
174+ return Stream .of (builder .build ());
175+
176+ // return Stream.of(SimilaritySummaryResult.from(length, similarityPairs, writeRelationshipType, writeProperty, write, histogram));
104177 }
105178
106179 Stream <SimilaritySummaryResult > emptyStream (String writeRelationshipType , String writeProperty ) {
107180 return Stream .of (SimilaritySummaryResult .from (0 , new AtomicLong (0 ), writeRelationshipType ,
108- writeProperty , false , new DoubleHistogram (5 )));
181+ writeProperty , false , new DoubleHistogram (5 ), - 1 ));
109182 }
110183
111184 Double getSimilarityCutoff (ProcedureConfiguration configuration ) {
0 commit comments