
Commit 02a18ce

add examples for algorithm API calls and string ID mapping (#21)

* add algorithm examples
* add ID mapping example
* update the HDFS address
1 parent a37e0db commit 02a18ce

File tree

15 files changed: +550 -34 lines changed
Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
+src,dst,weight
+1,2,1.0
+1,3,2.0
+1,4,3.0
+1,5,1.5
+2,3,2.6
+2,4,2.0
+3,4,3.0
+3,5,4.0
+2,5,1.0
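
The Scala examples below read this edge list through a ReadData helper that is part of the same commit but not shown in this excerpt. As a rough sketch of what loading it could look like — the object name, method body, and resource path here are assumptions for illustration, not the commit's actual helper:

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadDataSketch {
  // Hypothetical loader: parse the header-prefixed CSV above into the
  // (src, dst, weight) DataFrame shape the algorithm examples expect.
  def readCsvData(spark: SparkSession): DataFrame =
    spark.read
      .option("header", true)
      .option("delimiter", ",")
      .csv("example/src/main/resources/edge.csv") // assumed path
}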
Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
+src,dst,weight
+a,b,1.0
+c,d,2.0
+1,4,3.0
+1,5,1.5
+2,3,2.6
+2,4,2.0
+3,4,3.0
+3,5,4.0
+2,5,1.0
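
Note that this second edge file mixes string vertex ids (a, b, c, d) with numeric ones. The GraphX-backed algorithms need numeric ids, which is exactly what the string-ID mapping added in PageRankExample at the end of this commit takes care of.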
Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
+1,1
+2,1
+3,1
+4,4
+5,4
+6,4
+7,4
+8,4
+9,9
+10,9
example/src/main/scala/com/vesoft/nebula/algorithm/ClusteringCoefficientExample.scala

Lines changed: 55 additions & 3 deletions

@@ -1,9 +1,61 @@
 /* Copyright (c) 2020 vesoft inc. All rights reserved.
  *
- * This source code is licensed under Apache 2.0 License,
- * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ * This source code is licensed under Apache 2.0 License.
  */

 package com.vesoft.nebula.algorithm

-object ClusteringCoefficientExample {}
+import com.facebook.thrift.protocol.TCompactProtocol
+import com.vesoft.nebula.algorithm.config.CoefficientConfig
+import com.vesoft.nebula.algorithm.lib.ClusteringCoefficientAlgo
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+object ClusteringCoefficientExample {
+
+  def main(args: Array[String]): Unit = {
+    val sparkConf = new SparkConf()
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .config(sparkConf)
+      .getOrCreate()
+
+    val csvDF = ReadData.readCsvData(spark)
+    val nebulaDF = ReadData.readNebulaData(spark)
+    val liveJournalDF = ReadData.readLiveJournalData(spark)
+
+    localClusteringCoefficient(spark, csvDF)
+    globalClusteringCoefficient(spark, csvDF)
+  }
+
+  /**
+    * compute the local clustering coefficient
+    */
+  def localClusteringCoefficient(spark: SparkSession, df: DataFrame): Unit = {
+    val localClusteringCoefficientConfig = new CoefficientConfig("local")
+    val localClusterCoeff =
+      ClusteringCoefficientAlgo.apply(spark, df, localClusteringCoefficientConfig)
+    localClusterCoeff.show()
+    localClusterCoeff
+      .filter(row => !row.get(1).toString.equals("0.0"))
+      .orderBy(col("clustercoefficient"))
+      .write
+      .option("header", true)
+      .csv("hdfs://127.0.0.1:9000/tmp/ccresult")
+  }
+
+  /**
+    * compute the global clustering coefficient
+    */
+  def globalClusteringCoefficient(spark: SparkSession, df: DataFrame): Unit = {
+    val globalClusteringCoefficientConfig = new CoefficientConfig("global")
+    val globalClusterCoeff =
+      ClusteringCoefficientAlgo.apply(spark, df, globalClusteringCoefficientConfig)
+    globalClusterCoeff.show()
+  }
+
+}
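
For reference, the two modes differ in shape as well as meaning: the "local" coefficient is computed per vertex (how close each vertex's neighborhood is to a clique), which is why that result is filtered, ordered, and written out to HDFS, while the "global" coefficient summarizes the whole graph and is only printed.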

example/src/main/scala/com/vesoft/nebula/algorithm/DegreeStaticExample.scala

Lines changed: 29 additions & 1 deletion
@@ -5,4 +5,32 @@

 package com.vesoft.nebula.algorithm

-object DegreeStaticExample {}
+import com.facebook.thrift.protocol.TCompactProtocol
+import com.vesoft.nebula.algorithm.lib.DegreeStaticAlgo
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+object DegreeStaticExample {
+
+  def main(args: Array[String]): Unit = {
+    val sparkConf = new SparkConf()
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .config(sparkConf)
+      .getOrCreate()
+
+    // val csvDF = ReadData.readCsvData(spark)
+    // val nebulaDF = ReadData.readNebulaData(spark)
+    val journalDF = ReadData.readLiveJournalData(spark)
+
+    degree(spark, journalDF)
+  }
+
+  def degree(spark: SparkSession, df: DataFrame): Unit = {
+    val degree = DegreeStaticAlgo.apply(spark, df)
+    degree.show()
+  }
+}
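
DegreeStaticAlgo needs no extra configuration. To run it against the small CSV dataset instead of LiveJournal, the commented-out reader above can simply be swapped in:

    // reuse the commented-out CSV reader instead of the LiveJournal data
    val csvDF = ReadData.readCsvData(spark)
    degree(spark, csvDF)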

example/src/main/scala/com/vesoft/nebula/algorithm/GraphTriangleCountExample.scala

Lines changed: 33 additions & 1 deletion
@@ -5,4 +5,36 @@

 package com.vesoft.nebula.algorithm

-object GraphTriangleCountExample {}
+import com.facebook.thrift.protocol.TCompactProtocol
+import com.vesoft.nebula.algorithm.lib.GraphTriangleCountAlgo
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+object GraphTriangleCountExample {
+
+  def main(args: Array[String]): Unit = {
+    val sparkConf = new SparkConf()
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .config(sparkConf)
+      .getOrCreate()
+
+    // val csvDF = ReadData.readCsvData(spark)
+    // val nebulaDF = ReadData.readNebulaData(spark)
+    val journalDF = ReadData.readLiveJournalData(spark)
+
+    graphTriangleCount(spark, journalDF)
+  }
+
+  def graphTriangleCount(spark: SparkSession, df: DataFrame): Unit = {
+    val graphTriangleCount = GraphTriangleCountAlgo.apply(spark, df)
+    graphTriangleCount.show()
+  }
+}
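
GraphTriangleCountAlgo is run here without any configuration and, judging by its name, reports triangle statistics for the graph as a whole; per-vertex triangle counts would come from the library's separate triangle count algorithm, if needed.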
example/src/main/scala/com/vesoft/nebula/algorithm/LouvainExample.scala

Lines changed: 37 additions & 0 deletions

@@ -0,0 +1,37 @@
+/* Copyright (c) 2021 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+package com.vesoft.nebula.algorithm
+
+import com.facebook.thrift.protocol.TCompactProtocol
+import com.vesoft.nebula.algorithm.config.LouvainConfig
+import com.vesoft.nebula.algorithm.lib.LouvainAlgo
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+object LouvainExample {
+  def main(args: Array[String]): Unit = {
+    val sparkConf = new SparkConf()
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .config(sparkConf)
+      .getOrCreate()
+
+    // val csvDF = ReadData.readCsvData(spark)
+    // val nebulaDF = ReadData.readNebulaData(spark)
+    val journalDF = ReadData.readLiveJournalData(spark)
+
+    louvain(spark, journalDF)
+  }
+
+  def louvain(spark: SparkSession, df: DataFrame): Unit = {
+    val louvainConfig = LouvainConfig(10, 5, 0.5)
+    val louvain = LouvainAlgo.apply(spark, df, louvainConfig, false)
+    louvain.show()
+  }
+}
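
The three positional values in LouvainConfig(10, 5, 0.5) are easier to audit when named. The meanings below are assumptions about the config, not something this commit states:

    val maxIter      = 10  // assumed: outer Louvain passes
    val internalIter = 5   // assumed: node-moving iterations per pass
    val tol          = 0.5 // assumed: minimum modularity gain to continue
    val louvainConfig = LouvainConfig(maxIter, internalIter, tol)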

example/src/main/scala/com/vesoft/nebula/algorithm/LpaExample.scala

Lines changed: 30 additions & 1 deletion
@@ -5,4 +5,33 @@

 package com.vesoft.nebula.algorithm

-object LpaExample {}
+import com.facebook.thrift.protocol.TCompactProtocol
+import com.vesoft.nebula.algorithm.config.LPAConfig
+import com.vesoft.nebula.algorithm.lib.LabelPropagationAlgo
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+object LpaExample {
+  def main(args: Array[String]): Unit = {
+    val sparkConf = new SparkConf()
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .config(sparkConf)
+      .getOrCreate()
+
+    // val csvDF = ReadData.readCsvData(spark)
+    // val nebulaDF = ReadData.readNebulaData(spark)
+    val journalDF = ReadData.readLiveJournalData(spark)
+
+    lpa(spark, journalDF)
+  }
+
+  def lpa(spark: SparkSession, df: DataFrame): Unit = {
+    val lpaConfig = LPAConfig(Int.MaxValue)
+    val lpa = LabelPropagationAlgo.apply(spark, df, lpaConfig, false)
+    lpa.show()
+  }
+}
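
LPAConfig takes a single value, presumably an iteration bound, and Int.MaxValue effectively removes it. That is fine for the small sample data, but on large graphs a modest bound is safer:

    // a bounded configuration; 10 is an illustrative value, not a recommendation from this commit
    val lpaConfig = LPAConfig(10)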

example/src/main/scala/com/vesoft/nebula/algorithm/PageRankExample.scala

Lines changed: 102 additions & 1 deletion
@@ -5,4 +5,105 @@

 package com.vesoft.nebula.algorithm

-object PageRankExample {}
+import com.facebook.thrift.protocol.TCompactProtocol
+import com.vesoft.nebula.algorithm.config.PRConfig
+import com.vesoft.nebula.algorithm.lib.PageRankAlgo
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions.{col, dense_rank}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+object PageRankExample {
+  def main(args: Array[String]): Unit = {
+    val sparkConf = new SparkConf()
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .config(sparkConf)
+      .getOrCreate()
+
+    // the edge data has numerical vids
+    val csvDF = ReadData.readCsvData(spark)
+    pagerank(spark, csvDF)
+
+    // the edge data has string vids
+    val stringCsvDF = ReadData.readStringCsvData(spark)
+    pagerankWithIdMapping(spark, stringCsvDF)
+  }
+
+  /**
+    * the src and dst ids are numerical values
+    */
+  def pagerank(spark: SparkSession, df: DataFrame): Unit = {
+    val pageRankConfig = PRConfig(3, 0.85)
+    val pr = PageRankAlgo.apply(spark, df, pageRankConfig, false)
+    pr.show()
+  }
+
+  /**
+    * convert the src and dst ids to numerical values before running the algorithm
+    */
+  def pagerankWithIdMapping(spark: SparkSession, df: DataFrame): Unit = {
+    val encodedDF = convertStringId2LongId(df)
+    val pageRankConfig = PRConfig(3, 0.85)
+    val pr = PageRankAlgo.apply(spark, encodedDF, pageRankConfig, false)
+    val decodedPr = reconvertLongId2StringId(spark, pr)
+    decodedPr.show()
+  }
+
+  /**
+    * If your edge data has String-typed src_id and dst_id, you need to convert the String ids to Long ids.
+    *
+    * In this example, the columns of the edge dataframe are: src, dst
+    */
+  def convertStringId2LongId(dataframe: DataFrame): DataFrame = {
+    // get all vertex ids from the edge dataframe
+    val srcIdDF: DataFrame = dataframe.select("src").withColumnRenamed("src", "id")
+    val dstIdDF: DataFrame = dataframe.select("dst").withColumnRenamed("dst", "id")
+    val idDF = srcIdDF.union(dstIdDF).distinct()
+    idDF.show()
+
+    // encode each id to a Long value using dense_rank; encodeId has two columns: id, encodedId.
+    // save encodeId so the algorithm's result can be converted back later.
+    val encodeId = idDF.withColumn("encodedId", dense_rank().over(Window.orderBy("id")))
+    encodeId.write.option("header", true).csv("file:///tmp/encodeId.csv")
+    encodeId.show()
+
+    // convert the edge data's src and dst
+    val srcJoinDF = dataframe
+      .join(encodeId)
+      .where(col("src") === col("id"))
+      .drop("src")
+      .drop("id")
+      .withColumnRenamed("encodedId", "src")
+    srcJoinDF.cache()
+    val dstJoinDF = srcJoinDF
+      .join(encodeId)
+      .where(col("dst") === col("id"))
+      .drop("dst")
+      .drop("id")
+      .withColumnRenamed("encodedId", "dst")
+    dstJoinDF.show()
+
+    // make sure the first two columns of the edge dataframe are the src and dst ids
+    dstJoinDF.select("src", "dst", "weight")
+  }
+
+  /**
+    * re-convert the algorithm's result back to String ids
+    * @return dataframe with columns: id, {algo_name}
+    */
+  def reconvertLongId2StringId(spark: SparkSession, dataframe: DataFrame): DataFrame = {
+    // the mapping between String ids and Long ids
+    val encodeId = spark.read.option("header", true).csv("file:///tmp/encodeId.csv")

+    encodeId
+      .join(dataframe)
+      .where(col("encodedId") === col("_id"))
+      .drop("encodedId")
+      .drop("_id")
+  }
+}
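
One design note on convertStringId2LongId: dense_rank over an un-partitioned Window produces compact, contiguous Long ids, but it forces every id through a single partition to do the global sort. A common alternative for large graphs — sketched here as an option, not as what this commit does — is monotonically_increasing_id, which scales out but yields sparse rather than contiguous ids:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.monotonically_increasing_id

// alternative encoding: no global sort, so it scales; ids are unique but not contiguous
def encodeIdsScalably(idDF: DataFrame): DataFrame =
  idDF.withColumn("encodedId", monotonically_increasing_id())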
