Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .github/workflows/computer-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ jobs:
TRAVIS_DIR: computer-dist/src/assembly/travis
BSP_ETCD_URL: http://localhost:2579
KUBERNETES_VERSION: 1.20.1
# TODO: adapt the HugeGraph Server/Loader version to 1.5.0 (EdgeID has 5 parts now)
# NOTE: Remember to adaptor/update the version before new release
GRAPH_ENV_VERSION: 1.3.0
GRAPH_ENV_VERSION: 1.7.0

steps:
- name: Checkout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.apache.hugegraph.computer.core.output.hg.exceptions.WriteBackException;
import org.apache.hugegraph.computer.core.output.hg.metrics.LoadSummary;
import org.apache.hugegraph.computer.core.output.hg.metrics.Printer;
import org.apache.hugegraph.computer.core.util.HugeClientUtil;
import org.apache.hugegraph.driver.HugeClient;
import org.apache.hugegraph.driver.HugeClientBuilder;
import org.apache.hugegraph.structure.graph.Vertex;
import org.apache.hugegraph.util.ExecutorUtil;
import org.apache.hugegraph.util.Log;
Expand Down Expand Up @@ -58,7 +58,8 @@ public TaskManager(Config config) {
String graph = config.get(ComputerOptions.HUGEGRAPH_GRAPH_NAME);
String username = config.get(ComputerOptions.HUGEGRAPH_USERNAME);
String password = config.get(ComputerOptions.HUGEGRAPH_PASSWORD);
this.client = new HugeClientBuilder(url, graph).configUser(username, password).build();
this.client = HugeClientUtil.newHugeClient(url, graph, username,
password);
// Try to make all batch threads running and don't wait for producer
this.batchSemaphore = new Semaphore(this.batchSemaphoreNum());
/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.computer.core.util;

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hugegraph.driver.HugeClient;
import org.apache.hugegraph.driver.HugeClientBuilder;
import org.apache.hugegraph.rest.RestResult;
import org.apache.hugegraph.structure.schema.EdgeLabel;
import org.apache.hugegraph.util.JsonUtil;

import com.fasterxml.jackson.databind.BeanDescription;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.deser.BeanDeserializerBuilder;
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
import com.fasterxml.jackson.databind.module.SimpleModule;

public final class HugeClientUtil {

private static final AtomicBoolean COMPATIBILITY_REGISTERED =
new AtomicBoolean(false);

public static HugeClient newHugeClient(String url, String graph,
String username, String password) {
registerCompatibilityModule();
return new HugeClientBuilder(url, graph).configUser(username, password)
.build();
}

public static HugeClient newHugeClient(String url, String graph,
String username, String password,
int timeout) {
registerCompatibilityModule();
return new HugeClientBuilder(url, graph).configUser(username, password)
.configTimeout(timeout)
.build();
}

public static void registerCompatibilityModule() {
if (!COMPATIBILITY_REGISTERED.compareAndSet(false, true)) {
return;
}
RestResult.registerModule(newCompatibilityModule());
JsonUtil.registerModule(newCompatibilityModule());
}

private static SimpleModule newCompatibilityModule() {
SimpleModule module = new SimpleModule(
"hugegraph-computer-client-compatibility");
module.setDeserializerModifier(new BeanDeserializerModifier() {

@Override
public BeanDeserializerBuilder updateBuilder(
DeserializationConfig config, BeanDescription beanDesc,
BeanDeserializerBuilder builder) {
if (EdgeLabel.class.equals(beanDesc.getBeanClass())) {
builder.addIgnorable("edgelabel_type");
builder.addIgnorable("parent_label");
builder.addIgnorable("links");
}
return builder;
}
});
return module;
}

private HugeClientUtil() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,19 @@
import org.apache.hugegraph.computer.core.graph.value.NullValue;
import org.apache.hugegraph.computer.core.graph.value.StringValue;
import org.apache.hugegraph.computer.core.graph.value.Value;
import org.apache.hugegraph.structure.graph.Edge;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.SplicingIdGenerator;

public final class HugeConverter {

private static final int LEGACY_EDGE_ID_PARTS = 4;
private static final int PARENT_EDGE_ID_PARTS = 5;
private static final int DIRECTIONAL_EDGE_ID_PARTS = 6;
private static final int LEGACY_EDGE_NAME_INDEX = 2;
private static final int PARENT_EDGE_NAME_INDEX = 3;
private static final int DIRECTIONAL_EDGE_NAME_INDEX = 4;

private static final GraphFactory GRAPH_FACTORY =
ComputerContext.instance().graphFactory();

Expand Down Expand Up @@ -96,4 +105,27 @@ public static Properties convertProperties(
}
return properties;
}

public static String convertEdgeName(Edge edge) {
E.checkArgumentNotNull(edge, "The edge can't be null");
String edgeId = edge.id();
if (edgeId == null) {
return edge.name();
}

String[] parts = SplicingIdGenerator.split(edgeId);
if (parts.length == LEGACY_EDGE_ID_PARTS) {
return parts[LEGACY_EDGE_NAME_INDEX];
} else if (parts.length == PARENT_EDGE_ID_PARTS) {
return parts[PARENT_EDGE_NAME_INDEX];
} else if (parts.length == DIRECTIONAL_EDGE_ID_PARTS &&
isEdgeDirection(parts[1])) {
return parts[DIRECTIONAL_EDGE_NAME_INDEX];
}
return edge.name();
}

private static boolean isEdgeDirection(String part) {
return "EDGE_OUT".equals(part) || "EDGE_IN".equals(part);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.hugegraph.computer.core.input.InputSplit;
import org.apache.hugegraph.computer.core.input.VertexFetcher;
import org.apache.hugegraph.computer.core.rpc.InputSplitRpcService;
import org.apache.hugegraph.computer.core.util.HugeClientUtil;
import org.apache.hugegraph.driver.HugeClient;
import org.apache.hugegraph.driver.HugeClientBuilder;

public class HugeGraphFetcher implements GraphFetcher {

Expand All @@ -39,7 +39,8 @@ public HugeGraphFetcher(Config config, InputSplitRpcService rpcService) {
String graph = config.get(ComputerOptions.HUGEGRAPH_GRAPH_NAME);
String username = config.get(ComputerOptions.HUGEGRAPH_USERNAME);
String password = config.get(ComputerOptions.HUGEGRAPH_PASSWORD);
this.client = new HugeClientBuilder(url, graph).configUser(username, password).build();
this.client = HugeClientUtil.newHugeClient(url, graph, username,
password);
this.vertexFetcher = new HugeVertexFetcher(config, this.client);
this.edgeFetcher = new HugeEdgeFetcher(config, this.client);
this.rpcService = rpcService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.input.InputSplit;
import org.apache.hugegraph.computer.core.input.InputSplitFetcher;
import org.apache.hugegraph.computer.core.util.HugeClientUtil;
import org.apache.hugegraph.driver.HugeClient;
import org.apache.hugegraph.driver.HugeClientBuilder;
import org.apache.hugegraph.structure.graph.Shard;
import org.apache.hugegraph.util.E;

Expand All @@ -41,9 +41,8 @@ public HugeInputSplitFetcher(Config config) {
String username = config.get(ComputerOptions.HUGEGRAPH_USERNAME);
String password = config.get(ComputerOptions.HUGEGRAPH_PASSWORD);
int timeout = config.get(ComputerOptions.INPUT_SPLIT_FETCH_TIMEOUT);
this.client = new HugeClientBuilder(url, graph).configUser(username, password)
.configTimeout(timeout)
.build();
this.client = HugeClientUtil.newHugeClient(url, graph, username,
password, timeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ private Edge convert(org.apache.hugegraph.structure.graph.Edge edge) {
Properties properties = HugeConverter.convertProperties(
edge.properties());
Edge computerEdge = graphFactory.createEdge(edge.label(),
edge.name(), targetId
HugeConverter.convertEdgeName(edge),
targetId
);
computerEdge.label(edge.label());
computerEdge.properties(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
import org.apache.hugegraph.computer.core.graph.value.StringValue;
import org.apache.hugegraph.computer.core.graph.value.ValueType;
import org.apache.hugegraph.computer.suite.unit.UnitTestBase;
import org.apache.hugegraph.structure.graph.Edge;
import org.apache.hugegraph.testutil.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import com.google.common.collect.ImmutableList;

Expand Down Expand Up @@ -126,4 +128,73 @@ public void testConvertProperties() {
Assert.assertEquals(properties,
HugeConverter.convertProperties(rawProperties));
}

@Test
public void testConvertEdgeNameWithLegacyFourPartEdgeId() {
Edge edge = Mockito.mock(Edge.class);
Mockito.when(edge.id()).thenReturn(
"S1:178201>5>参数标准!3BA0>S4:239464");
Mockito.when(edge.name()).thenReturn("stale_client_name");

Assert.assertEquals("参数标准!3BA0",
HugeConverter.convertEdgeName(edge));
}

@Test
public void testConvertEdgeNameWithFivePartEdgeId() {
Edge edge = new Edge("belong_to_el_defect");
edge.id("S1:178201>5>5>参数标准!3BA0>S4:239464");

Assert.assertEquals("参数标准!3BA0",
HugeConverter.convertEdgeName(edge));
}

@Test
public void testConvertEdgeNameWithSixPartEdgeId() {
Edge edge = new Edge("belong_to_el_defect");
edge.id("S1:178201>EDGE_OUT>5>5>参数标准!3BA0>S4:239464");

Assert.assertEquals("参数标准!3BA0",
HugeConverter.convertEdgeName(edge));
}

@Test
public void testConvertEdgeNameWithSixPartInEdgeId() {
Edge edge = new Edge("belong_to_el_defect");
edge.id("S4:239464>EDGE_IN>5>5>参数标准!3BA0>S1:178201");

Assert.assertEquals("参数标准!3BA0",
HugeConverter.convertEdgeName(edge));
}

@Test
public void testConvertEdgeNameWithNullEdgeId() {
Edge edge = Mockito.mock(Edge.class);
Mockito.when(edge.id()).thenReturn(null);
Mockito.when(edge.name()).thenReturn("fallback_name");

Assert.assertEquals("fallback_name",
HugeConverter.convertEdgeName(edge));
}

@Test
public void testConvertEdgeNameWithUnknownEdgeIdFormat() {
Edge edge = Mockito.mock(Edge.class);
Mockito.when(edge.id()).thenReturn(
"S1:178201>VERTEX>5>5>参数标准!3BA0>S4:239464");
Mockito.when(edge.name()).thenReturn("fallback_name");

Assert.assertEquals("fallback_name",
HugeConverter.convertEdgeName(edge));

Mockito.when(edge.id()).thenReturn("S1:178201>bad>edge");
Assert.assertEquals("fallback_name",
HugeConverter.convertEdgeName(edge));
}

@Test
public void testConvertEdgeNameWithNullEdge() {
Assert.assertThrows(IllegalArgumentException.class,
() -> HugeConverter.convertEdgeName(null));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.computer.core.input.hg;

import org.apache.hugegraph.computer.core.util.HugeClientUtil;
import org.apache.hugegraph.rest.RestResult;
import org.apache.hugegraph.structure.schema.EdgeLabel;
import org.apache.hugegraph.testutil.Assert;
import org.junit.Test;

public class HugeClientCompatibilityTest {

@Test
public void testReadEdgeLabelWithCurrentServerFields() {
HugeClientUtil.registerCompatibilityModule();

String content = "{" +
"\"id\":1," +
"\"name\":\"link\"," +
"\"edgelabel_type\":\"NORMAL\"," +
"\"source_label\":\"user\"," +
"\"target_label\":\"user\"," +
"\"links\":[{\"user\":\"user\"}]," +
"\"frequency\":\"SINGLE\"," +
"\"sort_keys\":[]," +
"\"nullable_keys\":[]," +
"\"index_labels\":[]," +
"\"properties\":[]," +
"\"status\":\"CREATED\"," +
"\"ttl\":0," +
"\"enable_label_index\":true," +
"\"user_data\":{\"~create_time\":\"2026-06-22 15:26:42.781\"}" +
"}";

EdgeLabel edgeLabel = new RestResult(200, content, null).readObject(
EdgeLabel.class);

Assert.assertEquals("link", edgeLabel.name());
Assert.assertEquals("user", edgeLabel.sourceLabel());
Assert.assertEquals("user", edgeLabel.targetLabel());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hugegraph.computer.core.io.Writable;
import org.apache.hugegraph.computer.core.store.entry.EntryInput;
import org.apache.hugegraph.computer.core.store.entry.EntryInputImpl;
import org.apache.hugegraph.computer.core.util.HugeClientUtil;
import org.apache.hugegraph.computer.core.util.ComputerContextUtil;
import org.apache.hugegraph.computer.core.worker.MockComputationParams;
import org.apache.hugegraph.config.OptionSpace;
Expand Down Expand Up @@ -287,7 +288,8 @@ protected static StreamGraphOutput newStreamGraphOutput(

protected static synchronized HugeClient client() {
if (CLIENT == null) {
CLIENT = HugeClient.builder(URL, GRAPH).configUser(USERNAME, PASSWORD).build();
CLIENT = HugeClientUtil.newHugeClient(URL, GRAPH, USERNAME,
PASSWORD);
}
return CLIENT;
}
Expand Down
Loading