From 512cfe7bfd7a5d48f30b470c356ec009d6034cc5 Mon Sep 17 00:00:00 2001 From: byplayer Date: Tue, 25 Jul 2017 12:50:45 +0800 Subject: [PATCH 01/14] add gtags files into ignore list --- .gitignore | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.gitignore b/.gitignore index 20d5fe7..83634a9 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,8 @@ target/ .classpath .project .settings + +# gtags files +GPATH +GRTAGS +GTAGS \ No newline at end of file From 15f69b17725cfb24fca6f89fad341c851c3c2ec0 Mon Sep 17 00:00:00 2001 From: byplayer Date: Tue, 25 Jul 2017 12:51:15 +0800 Subject: [PATCH 02/14] support String[] of RomaClient constructor --- .../rit/roma/romac4j/ClientObject.java | 18 ++++++---- .../rakuten/rit/roma/romac4j/RomaClient.java | 4 +++ .../romac4j/NodeArrayConstractorTest.java | 33 +++++++++++++++++++ 3 files changed, 48 insertions(+), 7 deletions(-) create mode 100644 romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstractorTest.java diff --git a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/ClientObject.java b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/ClientObject.java index da4fdf0..3d534c0 100644 --- a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/ClientObject.java +++ b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/ClientObject.java @@ -19,27 +19,31 @@ public ClientObject(ClientObject obj){ this.routing = obj.routing; this.maxRetry = obj.maxRetry; } - - public ClientObject(String nodeId){ + + public ClientObject(String nodeId) { + this(new String[]{ nodeId} ); + } + + public ClientObject(String[] nodeIds) { BasicConfigurator.configure(); try{ RomaSocketPool.getInstance(); - }catch(RuntimeException e){ + } catch(RuntimeException e){ RomaSocketPool.init(); log.warn("ClientObject() : SocketPool initialized in RomaClient()."); } - - routing = new Routing(nodeId); + + routing = new Routing(nodeIds); routing.start(); - + try { Thread.sleep(1000); } catch (InterruptedException e) { log.error("ClientObject() : " + e.getMessage()); } } - + public void destroy() { routing.stopThread(); } diff --git a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/RomaClient.java b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/RomaClient.java index d31ea09..52a50e0 100644 --- a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/RomaClient.java +++ b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/RomaClient.java @@ -19,6 +19,10 @@ public RomaClient(String nodeId) { super(nodeId); } + public RomaClient(String[] nodeIds) { + super(nodeIds); + } + public byte[] get(String key) throws IOException { return sendCmdV("get", key).getValue(); } diff --git a/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstractorTest.java b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstractorTest.java new file mode 100644 index 0000000..ecbfde0 --- /dev/null +++ b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstractorTest.java @@ -0,0 +1,33 @@ +package com.rakuten.rit.roma.romac4j; + +import junit.framework.Test; +import junit.framework.TestCase; + +public class NodeArrayConstractorTest extends TestCase { + private void checkSetGet(String[] nodeIds) throws Exception { + RomaClient rc = null; + + try { + rc = new RomaClient(nodeIds); + + assertTrue(rc.set("key", "test", 0)); + assertEquals("test", rc.getString("key")); + } finally { + if(rc != null) { + rc.destroy(); + } + } + } + + public void testAliveNodes() throws Exception { + checkSetGet(new String[]{"localhost_11211", "localhost_11311"}); + } + + public void testFirstNodeIsDead() throws Exception { + checkSetGet(new String[]{"localhost_11111", "localhost_11211"}); + } + + public void testSecondNodeIsDead() throws Exception { + checkSetGet(new String[]{"localhost_11211", "localhost_11311"}); + } +} From f3a59b8b106540b568b46f9ef984ab6ac7636b3f Mon Sep 17 00:00:00 2001 From: byplayer Date: Tue, 25 Jul 2017 12:53:27 +0800 Subject: [PATCH 03/14] fix typo --- ...eArrayConstractorTest.java => NodeArrayConstructorTest.java} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename romac4j/src/test/java/com/rakuten/rit/roma/romac4j/{NodeArrayConstractorTest.java => NodeArrayConstructorTest.java} (93%) diff --git a/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstractorTest.java b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java similarity index 93% rename from romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstractorTest.java rename to romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java index ecbfde0..0900e64 100644 --- a/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstractorTest.java +++ b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java @@ -3,7 +3,7 @@ import junit.framework.Test; import junit.framework.TestCase; -public class NodeArrayConstractorTest extends TestCase { +public class NodeArrayConstructorTest extends TestCase { private void checkSetGet(String[] nodeIds) throws Exception { RomaClient rc = null; From 89262ae1e245f96625157ed3a555aefc5232b941 Mon Sep 17 00:00:00 2001 From: byplayer Date: Thu, 21 Sep 2017 11:52:19 +0800 Subject: [PATCH 04/14] clean up key for test --- .../com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java index 0900e64..298ff03 100644 --- a/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java +++ b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java @@ -12,6 +12,7 @@ private void checkSetGet(String[] nodeIds) throws Exception { assertTrue(rc.set("key", "test", 0)); assertEquals("test", rc.getString("key")); + rc.delete("key"); } finally { if(rc != null) { rc.destroy(); From 60d51116bdcbf2b8bea8428426b7621e87592389 Mon Sep 17 00:00:00 2001 From: byplayer Date: Thu, 21 Sep 2017 11:52:43 +0800 Subject: [PATCH 05/14] change test name more meaning --- .../com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java index 298ff03..486e5cf 100644 --- a/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java +++ b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java @@ -20,7 +20,7 @@ private void checkSetGet(String[] nodeIds) throws Exception { } } - public void testAliveNodes() throws Exception { + public void testFirstNodeIsAlive() throws Exception { checkSetGet(new String[]{"localhost_11211", "localhost_11311"}); } From 299749960034d66bc61235ec466e714c505d94a4 Mon Sep 17 00:00:00 2001 From: byplayer Date: Thu, 28 Sep 2017 11:05:52 +0800 Subject: [PATCH 06/14] fix dead node issue for initialize --- .../rit/roma/romac4j/ClientObject.java | 4 +- .../rit/roma/romac4j/routing/Routing.java | 63 +++++++++++++++---- 2 files changed, 52 insertions(+), 15 deletions(-) diff --git a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/ClientObject.java b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/ClientObject.java index 3d534c0..6a03341 100644 --- a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/ClientObject.java +++ b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/ClientObject.java @@ -111,10 +111,10 @@ protected Receiver sendCmd(Receiver rcv, String cmd, String key, routing.returnConnection(con); } catch (ParseException e) { routing.returnConnection(con); - log.error("sendCmd(): " + e.getMessage()); + log.error("sendCmd(): ", e); throw new RuntimeException(e); } catch (Exception e) { - log.error("sendCmd(): " + e.getMessage()); + log.error("sendCmd(): ", e); retry = true; log.debug("sendCmd(): retry=" + rcv.retry); routing.failCount(con); diff --git a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java index c4732f0..14e7e83 100644 --- a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java +++ b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java @@ -71,7 +71,7 @@ public void run() { } Thread.sleep(threadSleep); } catch (Exception e) { - log.debug("routing check thread : " + e.getMessage()); + log.debug("routing check thread : ", e); } } log.info("routing check thread : stopped"); @@ -85,8 +85,38 @@ public Connection getConnection() throws Exception { if (routingData != null) { return (sps.getConnection(routingData.getRandomNodeId())); } else { - int n = rnd.nextInt(initialNodes.length); - return (sps.getConnection(initialNodes[n])); + + Connection c = null; + try { + int n = rnd.nextInt(initialNodes.length); + String node = initialNodes[n]; + c = sps.getConnection(node); + getMklHash(c); + return c; + } catch(Exception e) { + log.debug("getConnection error ", e); + if (c != null) { + failCount(c); + c = null; + } + } + + // randam access node was dead. try to get connection sequentially. + for(String node : initialNodes) { + try { + c = sps.getConnection(node); + getMklHash(c); + return c; + } catch(Exception e) { + log.debug("getConnection error ", e); + if (c != null) { + failCount(c); + c = null; + } + } + } + + return null; } } @@ -101,9 +131,9 @@ public Connection getConnection(String key) throws Exception { } return (sps.getConnection(nid)); } catch (NoSuchAlgorithmException ex) { - log.error("getConnection() : " + ex.getMessage()); + log.error("getConnection() : ", ex); // fatal error : stop an application - throw new RuntimeException("fatal : " + ex.getMessage()); + throw new RuntimeException("fatal : ", ex); } catch (Exception ex2) { // returns a dummy connection for failCount() return new Connection(nid, 0); @@ -154,15 +184,22 @@ public void setThreadSleep(int n) { } private String getMklHash() { - Connection con = null; + try { + return getMklHash(getConnection()); + } catch (Exception e) { + log.error("getMklHash() : ", e); + return null; + } + } + + private String getMklHash(Connection con) { Receiver rcv = new StringReceiver(); try { - con = getConnection(); con.write("mklhash 0"); rcv.receive(con); returnConnection(con); } catch (Exception e) { - log.error("getMklHash() : " + e.getMessage()); + log.error("getMklHash() : ", e); failCount(con); return null; } @@ -181,20 +218,20 @@ private RoutingData getRoutingDump() { routingData = new RoutingData(buff); returnConnection(con); } catch (ParseException e) { - log.error("getRoutingDump() : " + e.getMessage()); + log.error("getRoutingDump() : ", e); returnConnection(con); return null; } catch (Exception e) { - log.warn("getRoutingDump() : " + e.getMessage()); + log.warn("getRoutingDump() : ", e); failCount(con); return null; } return routingData; } - + private class RoutingReceiver extends Receiver { private byte[] value = null; - + @Override public void receive(Connection con) throws TimeoutException, IOException, ParseException { int len = 0; @@ -206,7 +243,7 @@ public void receive(Connection con) throws TimeoutException, IOException, ParseE try { len = Integer.parseInt(str); } catch (NumberFormatException e) { - log.error("RoutingReceiver.receive() : NumberFormatException [" + str + "] " + e.getMessage()); + log.error("RoutingReceiver.receive() : NumberFormatException [" + str + "] ", e); throw new ParseException(str, -1); } if (len > 0) { From fa156f66cdf939c8702e5e1b906dc044bdeb8b96 Mon Sep 17 00:00:00 2001 From: byplayer Date: Fri, 29 Sep 2017 14:51:41 +0800 Subject: [PATCH 07/14] Fix send commands error when socket is closed If pooled socket expired by ROMA server, socket is closed, but client still has that connection. In that case, client received error. So I checked exception, and catch it, then retry once. --- .../roma/romac4j/connection/Connection.java | 9 ++- .../romac4j/connection/RomaSocketPool.java | 22 ++++++- .../rit/roma/romac4j/routing/Routing.java | 14 +++-- .../rit/roma/romac4j/ClosedSocketTest.java | 59 +++++++++++++++++++ .../romac4j/NodeArrayConstructorTest.java | 6 ++ 5 files changed, 101 insertions(+), 9 deletions(-) create mode 100644 romac4j/src/test/java/com/rakuten/rit/roma/romac4j/ClosedSocketTest.java diff --git a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/Connection.java b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/Connection.java index 24738f2..9a48def 100644 --- a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/Connection.java +++ b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/Connection.java @@ -29,7 +29,7 @@ public void connect(SocketAddress endpoint) throws IOException { is = new BufferedInputStream(getInputStream()); os = new BufferedOutputStream(getOutputStream()); } - + public void write(String cmd, String key, String opt, byte[] value, int casid) throws TimeoutException, IOException { if (cmd == null || cmd.length() == 0) { @@ -37,6 +37,13 @@ public void write(String cmd, String key, String opt, byte[] value, // fatal error : stop an application throw new IllegalArgumentException("fatal : cmd string is null or empty."); } + + if (os == null) { + log.error("write() : os is null"); + // fatal error : stop an application + throw new RuntimeException("fatal : os is null:" + nodeId); + } + String cmdBuff = cmd; if (key != null && key.length() != 0) { cmdBuff += " " + key; diff --git a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/RomaSocketPool.java b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/RomaSocketPool.java index 7a96504..fa61c8c 100644 --- a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/RomaSocketPool.java +++ b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/RomaSocketPool.java @@ -50,9 +50,13 @@ public static synchronized void init(int maxActive, int maxIdle, int timeout, in } public synchronized Connection getConnection(String nodeId) throws Exception { + return getConnection(nodeId, false); + } + + public synchronized Connection getConnection(String nodeId, boolean retry) throws Exception { GenericObjectPool pool = poolMap.get(nodeId); if (pool == null) { - PoolableObjectFactory factory = + PoolableObjectFactory factory = new SocketPoolFactory(nodeId, bufferSize); pool = new GenericObjectPool(factory); pool.setMaxActive(maxActive); @@ -61,7 +65,21 @@ public synchronized Connection getConnection(String nodeId) throws Exception { poolMap.put(nodeId, pool); } Connection con = pool.borrowObject(); - con.setSoTimeout(timeout); + try { + con.setSoTimeout(timeout); + } catch (SocketException e) { + log.debug("getConnection setSoTimeout throws exception, so close it", e); + try { + con.close(); + } catch (Exception e2) { + log.debug("socket close error", e2); + } + if(!retry) { + con = getConnection(nodeId); + } else { + throw e; + } + } return con; } diff --git a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java index 14e7e83..7fafa75 100644 --- a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java +++ b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java @@ -87,14 +87,15 @@ public Connection getConnection() throws Exception { } else { Connection c = null; + String node = null; try { int n = rnd.nextInt(initialNodes.length); - String node = initialNodes[n]; + node = initialNodes[n]; c = sps.getConnection(node); getMklHash(c); return c; } catch(Exception e) { - log.debug("getConnection error ", e); + log.debug("getConnection error(" + node + ")", e); if (c != null) { failCount(c); c = null; @@ -102,13 +103,13 @@ public Connection getConnection() throws Exception { } // randam access node was dead. try to get connection sequentially. - for(String node : initialNodes) { + for(String n : initialNodes) { try { - c = sps.getConnection(node); + c = sps.getConnection(n); getMklHash(c); return c; } catch(Exception e) { - log.debug("getConnection error ", e); + log.debug("getConnection error(" + n + ")", e); if (c != null) { failCount(c); c = null; @@ -129,12 +130,13 @@ public Connection getConnection(String key) throws Exception { + key); return getConnection(); } - return (sps.getConnection(nid)); + return sps.getConnection(nid); } catch (NoSuchAlgorithmException ex) { log.error("getConnection() : ", ex); // fatal error : stop an application throw new RuntimeException("fatal : ", ex); } catch (Exception ex2) { + log.error("getConnection() Exception. Return dummy connection object ", ex2); // returns a dummy connection for failCount() return new Connection(nid, 0); } diff --git a/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/ClosedSocketTest.java b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/ClosedSocketTest.java new file mode 100644 index 0000000..e561bb7 --- /dev/null +++ b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/ClosedSocketTest.java @@ -0,0 +1,59 @@ +package com.rakuten.rit.roma.romac4j; + +import junit.framework.Test; +import junit.framework.TestCase; + +import java.lang.Thread; + +import com.rakuten.rit.roma.romac4j.connection.Connection; +import com.rakuten.rit.roma.romac4j.RomaClient; + +/** + * This is test class when pooled socked is already closed. + * + * If client keep ROMA conection long time, ROMA server close connectoin + * when socket is opened more than expired time. + * + * So this is test case that make sure to run properly even if socket is + * closed by server side. + */ +public class ClosedSocketTest extends TestCase { + class TestRomaClient extends RomaClient { + public TestRomaClient(String node) { + super(node); + } + + // emulate that socket is closed by server side + public void closeSocketForTest() throws Exception { + while(routing == null) { + Thread.yield(); + } + + // routing + Connection con = routing.getConnection("localhost_11211"); + con.close(); + routing.returnConnection(con); + con = routing.getConnection("localhost_11311"); + con.close(); + routing.returnConnection(con); + } + } + + public void testClosedSocket() throws Exception { + TestRomaClient rc = null; + + try { + rc = new TestRomaClient("localhost_11211"); + rc.closeSocketForTest(); + + assertTrue(rc.set("key", "", 0)); + assertEquals("", rc.getString("key")); + assertTrue(rc.set("key", "test", 0)); + assertEquals("test", rc.getString("key")); + } finally { + if(rc != null) { + rc.destroy(); + } + } + } +} diff --git a/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java index 486e5cf..7983dd9 100644 --- a/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java +++ b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java @@ -3,6 +3,12 @@ import junit.framework.Test; import junit.framework.TestCase; + +/** + * This is test case for nodeId array constructor. + * + * Some test case include dead(can't connect) node information. + */ public class NodeArrayConstructorTest extends TestCase { private void checkSetGet(String[] nodeIds) throws Exception { RomaClient rc = null; From b0c5cd2f153d72336acec93749c216bcd443bdfd Mon Sep 17 00:00:00 2001 From: byplayer Date: Fri, 29 Sep 2017 14:55:19 +0800 Subject: [PATCH 08/14] add test section --- README.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.md b/README.md index d7e2f51..54253a9 100644 --- a/README.md +++ b/README.md @@ -6,3 +6,11 @@ Java client for [ROMA](http://roma-kvs.org/ "ROMA"). ## Promoters Roma is promoted by [Rakuten, Inc.](http://global.rakuten.com/corp/) and [Rakuten Institute of Technology](http://rit.rakuten.co.jp/). + +## Test + +Start ROMA server at localhost port 11211 and localhost port 11311, before run test. + +Run test as below command. + + mvn test From 1e5f43d6735503a0e4058870bae5c0ee3e777005 Mon Sep 17 00:00:00 2001 From: byplayer Date: Fri, 29 Sep 2017 14:59:57 +0800 Subject: [PATCH 09/14] bump up version --- romac4j/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/romac4j/pom.xml b/romac4j/pom.xml index b452632..723cf3f 100644 --- a/romac4j/pom.xml +++ b/romac4j/pom.xml @@ -6,7 +6,7 @@ com.rakuten.rit.roma romac4j - 1.0.0 + 1.1.0 jar romac4j From 2b7cb430fdb36bae180340abca505d9275bf116c Mon Sep 17 00:00:00 2001 From: byplayer Date: Fri, 29 Sep 2017 15:30:13 +0800 Subject: [PATCH 10/14] fix null pointer exception When routing isn't created, then null pointer exception occur --- .../java/com/rakuten/rit/roma/romac4j/routing/Routing.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java index 7fafa75..b0dc548 100644 --- a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java +++ b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java @@ -169,7 +169,9 @@ public void failCount(String nid) { log.info("failCount(): failover"); failCountMap.clear(); prevRoutingData = routingData; - routingData = routingData.failOver(nid); + if(routingData != null) { + routingData = routingData.failOver(nid); + } sps.deleteConnection(nid); } else { failCountMap.put(nid, n); From 9b0d11d4cbc99608828d17f91517ac0442325b10 Mon Sep 17 00:00:00 2001 From: byplayer Date: Tue, 10 Oct 2017 14:17:17 +0800 Subject: [PATCH 11/14] Fix crash issue. When getConnection is called before initialize When getConnection is called before initialize routingData, target function throws null pointer exception. --- .../java/com/rakuten/rit/roma/romac4j/routing/Routing.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java index b0dc548..4970f69 100644 --- a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java +++ b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/routing/Routing.java @@ -124,6 +124,10 @@ public Connection getConnection() throws Exception { public Connection getConnection(String key) throws Exception { String nid = null; try { + if(routingData == null) { + log.debug("routingData isn't initialized:" + key); + return getConnection(); + } nid = routingData.getPrimaryNodeId(key); if (nid == null) { log.error("getConnection() : can't get a primary node. key = " From 7750d7489c74c6e23bb479d3a2e749b52beab083 Mon Sep 17 00:00:00 2001 From: byplayer Date: Tue, 10 Oct 2017 14:19:30 +0800 Subject: [PATCH 12/14] fix retry logic, when socket exception has occurred When Pooled connection are closed by server, this method return exception. So add retry logic for it. --- .../rit/roma/romac4j/connection/RomaSocketPool.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/RomaSocketPool.java b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/RomaSocketPool.java index fa61c8c..6d5a3c9 100644 --- a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/RomaSocketPool.java +++ b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/RomaSocketPool.java @@ -13,6 +13,7 @@ public class RomaSocketPool { protected static Logger log = Logger.getLogger(RomaSocketPool.class .getName()); private static RomaSocketPool instance = null; + private static final int GET_CONNECTION_RETRY_MAX = 5; private RomaSocketPool() { poolMap = Collections.synchronizedMap(new HashMap>()); @@ -50,10 +51,10 @@ public static synchronized void init(int maxActive, int maxIdle, int timeout, in } public synchronized Connection getConnection(String nodeId) throws Exception { - return getConnection(nodeId, false); + return getConnection(nodeId, 0); } - public synchronized Connection getConnection(String nodeId, boolean retry) throws Exception { + public synchronized Connection getConnection(String nodeId, int retryCount) throws Exception { GenericObjectPool pool = poolMap.get(nodeId); if (pool == null) { PoolableObjectFactory factory = @@ -74,8 +75,9 @@ public synchronized Connection getConnection(String nodeId, boolean retry) throw } catch (Exception e2) { log.debug("socket close error", e2); } - if(!retry) { - con = getConnection(nodeId); + if(retryCount < GET_CONNECTION_RETRY_MAX) { + log.debug("getConnection retry"); + con = getConnection(nodeId, (retryCount + 1)); } else { throw e; } From 3992d5ce520ef206f8165d15003cebdc3e6111e8 Mon Sep 17 00:00:00 2001 From: byplayer Date: Tue, 10 Oct 2017 16:58:38 +0800 Subject: [PATCH 13/14] fix default retry count --- .../com/rakuten/rit/roma/romac4j/connection/RomaSocketPool.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/RomaSocketPool.java b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/RomaSocketPool.java index 6d5a3c9..ff87fa5 100644 --- a/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/RomaSocketPool.java +++ b/romac4j/src/main/java/com/rakuten/rit/roma/romac4j/connection/RomaSocketPool.java @@ -13,7 +13,7 @@ public class RomaSocketPool { protected static Logger log = Logger.getLogger(RomaSocketPool.class .getName()); private static RomaSocketPool instance = null; - private static final int GET_CONNECTION_RETRY_MAX = 5; + private static final int GET_CONNECTION_RETRY_MAX = GenericObjectPool.DEFAULT_MAX_IDLE + 1; private RomaSocketPool() { poolMap = Collections.synchronizedMap(new HashMap>()); From 0c224a2e7b0f9f57d4ac237d18e3df7dbed75ffa Mon Sep 17 00:00:00 2001 From: byplayer Date: Thu, 19 Oct 2017 13:48:02 +0800 Subject: [PATCH 14/14] change test ROMA port Change test ROMA port 11211,11311 to 11311,11411, because 11211 port is unstable on travis CI. --- README.md | 2 +- .../java/com/rakuten/rit/roma/romac4j/ClosedSocketTest.java | 6 +++--- .../rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 54253a9..b87e680 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ Roma is promoted by [Rakuten, Inc.](http://global.rakuten.com/corp/) and [Rakute ## Test -Start ROMA server at localhost port 11211 and localhost port 11311, before run test. +Start ROMA server at localhost port 11311 and localhost port 11411, before run test. Run test as below command. diff --git a/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/ClosedSocketTest.java b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/ClosedSocketTest.java index e561bb7..e268035 100644 --- a/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/ClosedSocketTest.java +++ b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/ClosedSocketTest.java @@ -30,10 +30,10 @@ public void closeSocketForTest() throws Exception { } // routing - Connection con = routing.getConnection("localhost_11211"); + Connection con = routing.getConnection("localhost_11311"); con.close(); routing.returnConnection(con); - con = routing.getConnection("localhost_11311"); + con = routing.getConnection("localhost_11411"); con.close(); routing.returnConnection(con); } @@ -43,7 +43,7 @@ public void testClosedSocket() throws Exception { TestRomaClient rc = null; try { - rc = new TestRomaClient("localhost_11211"); + rc = new TestRomaClient("localhost_11311"); rc.closeSocketForTest(); assertTrue(rc.set("key", "", 0)); diff --git a/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java index 7983dd9..4e7cbba 100644 --- a/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java +++ b/romac4j/src/test/java/com/rakuten/rit/roma/romac4j/NodeArrayConstructorTest.java @@ -27,14 +27,14 @@ private void checkSetGet(String[] nodeIds) throws Exception { } public void testFirstNodeIsAlive() throws Exception { - checkSetGet(new String[]{"localhost_11211", "localhost_11311"}); + checkSetGet(new String[]{"localhost_11311", "localhost_11411"}); } public void testFirstNodeIsDead() throws Exception { - checkSetGet(new String[]{"localhost_11111", "localhost_11211"}); + checkSetGet(new String[]{"localhost_11011", "localhost_11311"}); } public void testSecondNodeIsDead() throws Exception { - checkSetGet(new String[]{"localhost_11211", "localhost_11311"}); + checkSetGet(new String[]{"localhost_11311", "localhost_11511"}); } }