diff --git a/.gitignore b/.gitignore
index 20d5fe7..83634a9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,3 +8,8 @@ target/
.classpath
.project
.settings
+
+# gtags files
+GPATH
+GRTAGS
+GTAGS
\ No newline at end of file
diff --git a/README.md b/README.md
index d7e2f51..b87e680 100644
--- a/README.md
+++ b/README.md
@@ -6,3 +6,11 @@ Java client for [ROMA](http://roma-kvs.org/ "ROMA").
## Promoters
Roma is promoted by [Rakuten, Inc.](http://global.rakuten.com/corp/) and [Rakuten Institute of Technology](http://rit.rakuten.co.jp/).
+
+## Test
+
+Start ROMA server at localhost port 11311 and localhost port 11411, before run test.
+
+Run test as below command.
+
+ mvn test
diff --git a/romac4j/pom.xml b/romac4j/pom.xml
index b452632..723cf3f 100644
--- a/romac4j/pom.xml
+++ b/romac4j/pom.xml
@@ -6,7 +6,7 @@
com.rakuten.rit.roma
romac4j
- 1.0.0
+ 1.1.0
jar
romac4j
diff --git a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/ClientObject.java b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/ClientObject.java
index da4fdf0..6a03341 100644
--- a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/ClientObject.java
+++ b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/ClientObject.java
@@ -19,27 +19,31 @@ public ClientObject(ClientObject obj){
this.routing = obj.routing;
this.maxRetry = obj.maxRetry;
}
-
- public ClientObject(String nodeId){
+
+ public ClientObject(String nodeId) {
+ this(new String[]{ nodeId} );
+ }
+
+ public ClientObject(String[] nodeIds) {
BasicConfigurator.configure();
try{
RomaSocketPool.getInstance();
- }catch(RuntimeException e){
+ } catch(RuntimeException e){
RomaSocketPool.init();
log.warn("ClientObject() : SocketPool initialized in RomaClient().");
}
-
- routing = new Routing(nodeId);
+
+ routing = new Routing(nodeIds);
routing.start();
-
+
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("ClientObject() : " + e.getMessage());
}
}
-
+
public void destroy() {
routing.stopThread();
}
@@ -107,10 +111,10 @@ protected Receiver sendCmd(Receiver rcv, String cmd, String key,
routing.returnConnection(con);
} catch (ParseException e) {
routing.returnConnection(con);
- log.error("sendCmd(): " + e.getMessage());
+ log.error("sendCmd(): ", e);
throw new RuntimeException(e);
} catch (Exception e) {
- log.error("sendCmd(): " + e.getMessage());
+ log.error("sendCmd(): ", e);
retry = true;
log.debug("sendCmd(): retry=" + rcv.retry);
routing.failCount(con);
diff --git a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/RomaClient.java b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/RomaClient.java
index d31ea09..52a50e0 100644
--- a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/RomaClient.java
+++ b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/RomaClient.java
@@ -19,6 +19,10 @@ public RomaClient(String nodeId) {
super(nodeId);
}
+ public RomaClient(String[] nodeIds) {
+ super(nodeIds);
+ }
+
public byte[] get(String key) throws IOException {
return sendCmdV("get", key).getValue();
}
diff --git a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/Connection.java b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/Connection.java
index 73009dc..521777e 100644
--- a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/Connection.java
+++ b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/Connection.java
@@ -29,7 +29,7 @@ public void connect(SocketAddress endpoint, int timeout) throws IOException {
is = new BufferedInputStream(getInputStream());
os = new BufferedOutputStream(getOutputStream());
}
-
+
public void write(String cmd, String key, String opt, byte[] value,
int casid) throws TimeoutException, IOException {
if (cmd == null || cmd.length() == 0) {
@@ -37,6 +37,13 @@ public void write(String cmd, String key, String opt, byte[] value,
// fatal error : stop an application
throw new IllegalArgumentException("fatal : cmd string is null or empty.");
}
+
+ if (os == null) {
+ log.error("write() : os is null");
+ // fatal error : stop an application
+ throw new RuntimeException("fatal : os is null:" + nodeId);
+ }
+
String cmdBuff = cmd;
if (key != null && key.length() != 0) {
cmdBuff += " " + key;
diff --git a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/RomaSocketPool.java b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/RomaSocketPool.java
index 11a8192..e973301 100644
--- a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/RomaSocketPool.java
+++ b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/RomaSocketPool.java
@@ -13,6 +13,7 @@ public class RomaSocketPool {
protected static Logger log = Logger.getLogger(RomaSocketPool.class
.getName());
private static RomaSocketPool instance = null;
+ private static final int GET_CONNECTION_RETRY_MAX = GenericObjectPool.DEFAULT_MAX_IDLE + 1;
private RomaSocketPool() {
poolMap = Collections.synchronizedMap(new HashMap>());
@@ -50,9 +51,13 @@ public static synchronized void init(int maxActive, int maxIdle, int timeout, in
}
public synchronized Connection getConnection(String nodeId) throws Exception {
+ return getConnection(nodeId, 0);
+ }
+
+ public synchronized Connection getConnection(String nodeId, int retryCount) throws Exception {
GenericObjectPool pool = poolMap.get(nodeId);
if (pool == null) {
- PoolableObjectFactory factory =
+ PoolableObjectFactory factory =
new SocketPoolFactory(nodeId, bufferSize, timeout);
pool = new GenericObjectPool(factory);
pool.setMaxActive(maxActive);
@@ -61,7 +66,22 @@ public synchronized Connection getConnection(String nodeId) throws Exception {
poolMap.put(nodeId, pool);
}
Connection con = pool.borrowObject();
- con.setSoTimeout(timeout);
+ try {
+ con.setSoTimeout(timeout);
+ } catch (SocketException e) {
+ log.debug("getConnection setSoTimeout throws exception, so close it", e);
+ try {
+ con.close();
+ } catch (Exception e2) {
+ log.debug("socket close error", e2);
+ }
+ if(retryCount < GET_CONNECTION_RETRY_MAX) {
+ log.debug("getConnection retry");
+ con = getConnection(nodeId, (retryCount + 1));
+ } else {
+ throw e;
+ }
+ }
return con;
}
diff --git a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java
index c4732f0..4970f69 100644
--- a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java
+++ b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java
@@ -71,7 +71,7 @@ public void run() {
}
Thread.sleep(threadSleep);
} catch (Exception e) {
- log.debug("routing check thread : " + e.getMessage());
+ log.debug("routing check thread : ", e);
}
}
log.info("routing check thread : stopped");
@@ -85,26 +85,62 @@ public Connection getConnection() throws Exception {
if (routingData != null) {
return (sps.getConnection(routingData.getRandomNodeId()));
} else {
- int n = rnd.nextInt(initialNodes.length);
- return (sps.getConnection(initialNodes[n]));
+
+ Connection c = null;
+ String node = null;
+ try {
+ int n = rnd.nextInt(initialNodes.length);
+ node = initialNodes[n];
+ c = sps.getConnection(node);
+ getMklHash(c);
+ return c;
+ } catch(Exception e) {
+ log.debug("getConnection error(" + node + ")", e);
+ if (c != null) {
+ failCount(c);
+ c = null;
+ }
+ }
+
+ // randam access node was dead. try to get connection sequentially.
+ for(String n : initialNodes) {
+ try {
+ c = sps.getConnection(n);
+ getMklHash(c);
+ return c;
+ } catch(Exception e) {
+ log.debug("getConnection error(" + n + ")", e);
+ if (c != null) {
+ failCount(c);
+ c = null;
+ }
+ }
+ }
+
+ return null;
}
}
public Connection getConnection(String key) throws Exception {
String nid = null;
try {
+ if(routingData == null) {
+ log.debug("routingData isn't initialized:" + key);
+ return getConnection();
+ }
nid = routingData.getPrimaryNodeId(key);
if (nid == null) {
log.error("getConnection() : can't get a primary node. key = "
+ key);
return getConnection();
}
- return (sps.getConnection(nid));
+ return sps.getConnection(nid);
} catch (NoSuchAlgorithmException ex) {
- log.error("getConnection() : " + ex.getMessage());
+ log.error("getConnection() : ", ex);
// fatal error : stop an application
- throw new RuntimeException("fatal : " + ex.getMessage());
+ throw new RuntimeException("fatal : ", ex);
} catch (Exception ex2) {
+ log.error("getConnection() Exception. Return dummy connection object ", ex2);
// returns a dummy connection for failCount()
return new Connection(nid, 0);
}
@@ -137,7 +173,9 @@ public void failCount(String nid) {
log.info("failCount(): failover");
failCountMap.clear();
prevRoutingData = routingData;
- routingData = routingData.failOver(nid);
+ if(routingData != null) {
+ routingData = routingData.failOver(nid);
+ }
sps.deleteConnection(nid);
} else {
failCountMap.put(nid, n);
@@ -154,15 +192,22 @@ public void setThreadSleep(int n) {
}
private String getMklHash() {
- Connection con = null;
+ try {
+ return getMklHash(getConnection());
+ } catch (Exception e) {
+ log.error("getMklHash() : ", e);
+ return null;
+ }
+ }
+
+ private String getMklHash(Connection con) {
Receiver rcv = new StringReceiver();
try {
- con = getConnection();
con.write("mklhash 0");
rcv.receive(con);
returnConnection(con);
} catch (Exception e) {
- log.error("getMklHash() : " + e.getMessage());
+ log.error("getMklHash() : ", e);
failCount(con);
return null;
}
@@ -181,20 +226,20 @@ private RoutingData getRoutingDump() {
routingData = new RoutingData(buff);
returnConnection(con);
} catch (ParseException e) {
- log.error("getRoutingDump() : " + e.getMessage());
+ log.error("getRoutingDump() : ", e);
returnConnection(con);
return null;
} catch (Exception e) {
- log.warn("getRoutingDump() : " + e.getMessage());
+ log.warn("getRoutingDump() : ", e);
failCount(con);
return null;
}
return routingData;
}
-
+
private class RoutingReceiver extends Receiver {
private byte[] value = null;
-
+
@Override
public void receive(Connection con) throws TimeoutException, IOException, ParseException {
int len = 0;
@@ -206,7 +251,7 @@ public void receive(Connection con) throws TimeoutException, IOException, ParseE
try {
len = Integer.parseInt(str);
} catch (NumberFormatException e) {
- log.error("RoutingReceiver.receive() : NumberFormatException [" + str + "] " + e.getMessage());
+ log.error("RoutingReceiver.receive() : NumberFormatException [" + str + "] ", e);
throw new ParseException(str, -1);
}
if (len > 0) {
diff --git a/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/ClosedSocketTest.java b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/ClosedSocketTest.java
new file mode 100644
index 0000000..e268035
--- /dev/null
+++ b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/ClosedSocketTest.java
@@ -0,0 +1,59 @@
+package com.rakuten.rit.roma.romac4j;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+
+import java.lang.Thread;
+
+import com.rakuten.rit.roma.romac4j.connection.Connection;
+import com.rakuten.rit.roma.romac4j.RomaClient;
+
+/**
+ * This is test class when pooled socked is already closed.
+ *
+ * If client keep ROMA conection long time, ROMA server close connectoin
+ * when socket is opened more than expired time.
+ *
+ * So this is test case that make sure to run properly even if socket is
+ * closed by server side.
+ */
+public class ClosedSocketTest extends TestCase {
+ class TestRomaClient extends RomaClient {
+ public TestRomaClient(String node) {
+ super(node);
+ }
+
+ // emulate that socket is closed by server side
+ public void closeSocketForTest() throws Exception {
+ while(routing == null) {
+ Thread.yield();
+ }
+
+ // routing
+ Connection con = routing.getConnection("localhost_11311");
+ con.close();
+ routing.returnConnection(con);
+ con = routing.getConnection("localhost_11411");
+ con.close();
+ routing.returnConnection(con);
+ }
+ }
+
+ public void testClosedSocket() throws Exception {
+ TestRomaClient rc = null;
+
+ try {
+ rc = new TestRomaClient("localhost_11311");
+ rc.closeSocketForTest();
+
+ assertTrue(rc.set("key", "", 0));
+ assertEquals("", rc.getString("key"));
+ assertTrue(rc.set("key", "test", 0));
+ assertEquals("test", rc.getString("key"));
+ } finally {
+ if(rc != null) {
+ rc.destroy();
+ }
+ }
+ }
+}
diff --git a/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java
new file mode 100644
index 0000000..4e7cbba
--- /dev/null
+++ b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java
@@ -0,0 +1,40 @@
+package com.rakuten.rit.roma.romac4j;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+
+
+/**
+ * This is test case for nodeId array constructor.
+ *
+ * Some test case include dead(can't connect) node information.
+ */
+public class NodeArrayConstructorTest extends TestCase {
+ private void checkSetGet(String[] nodeIds) throws Exception {
+ RomaClient rc = null;
+
+ try {
+ rc = new RomaClient(nodeIds);
+
+ assertTrue(rc.set("key", "test", 0));
+ assertEquals("test", rc.getString("key"));
+ rc.delete("key");
+ } finally {
+ if(rc != null) {
+ rc.destroy();
+ }
+ }
+ }
+
+ public void testFirstNodeIsAlive() throws Exception {
+ checkSetGet(new String[]{"localhost_11311", "localhost_11411"});
+ }
+
+ public void testFirstNodeIsDead() throws Exception {
+ checkSetGet(new String[]{"localhost_11011", "localhost_11311"});
+ }
+
+ public void testSecondNodeIsDead() throws Exception {
+ checkSetGet(new String[]{"localhost_11311", "localhost_11511"});
+ }
+}