Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,8 @@ target/
.classpath
.project
.settings

# gtags files
GPATH
GRTAGS
GTAGS
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,11 @@ Java client for [ROMA](http://roma-kvs.org/ "ROMA").
## Promoters

Roma is promoted by [Rakuten, Inc.](http://global.rakuten.com/corp/) and [Rakuten Institute of Technology](http://rit.rakuten.co.jp/).

## Test

Start ROMA server at localhost port 11311 and localhost port 11411, before run test.

Run test as below command.

mvn test
2 changes: 1 addition & 1 deletion romac4j/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.rakuten.rit.roma</groupId>
<artifactId>romac4j</artifactId>
<version>1.0.0</version>
<version>1.1.0</version>
<packaging>jar</packaging>

<name>romac4j</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,31 @@ public ClientObject(ClientObject obj){
this.routing = obj.routing;
this.maxRetry = obj.maxRetry;
}

public ClientObject(String nodeId){

public ClientObject(String nodeId) {
this(new String[]{ nodeId} );
}

public ClientObject(String[] nodeIds) {
BasicConfigurator.configure();

try{
RomaSocketPool.getInstance();
}catch(RuntimeException e){
} catch(RuntimeException e){
RomaSocketPool.init();
log.warn("ClientObject() : SocketPool initialized in RomaClient().");
}
routing = new Routing(nodeId);

routing = new Routing(nodeIds);
routing.start();

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("ClientObject() : " + e.getMessage());
}
}

public void destroy() {
routing.stopThread();
}
Expand Down Expand Up @@ -107,10 +111,10 @@ protected Receiver sendCmd(Receiver rcv, String cmd, String key,
routing.returnConnection(con);
} catch (ParseException e) {
routing.returnConnection(con);
log.error("sendCmd(): " + e.getMessage());
log.error("sendCmd(): ", e);
throw new RuntimeException(e);
} catch (Exception e) {
log.error("sendCmd(): " + e.getMessage());
log.error("sendCmd(): ", e);
retry = true;
log.debug("sendCmd(): retry=" + rcv.retry);
routing.failCount(con);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ public RomaClient(String nodeId) {
super(nodeId);
}

public RomaClient(String[] nodeIds) {
super(nodeIds);
}

public byte[] get(String key) throws IOException {
return sendCmdV("get", key).getValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,21 @@ public void connect(SocketAddress endpoint, int timeout) throws IOException {
is = new BufferedInputStream(getInputStream());
os = new BufferedOutputStream(getOutputStream());
}

public void write(String cmd, String key, String opt, byte[] value,
int casid) throws TimeoutException, IOException {
if (cmd == null || cmd.length() == 0) {
log.error("write() : cmd string is null or empty.");
// fatal error : stop an application
throw new IllegalArgumentException("fatal : cmd string is null or empty.");
}

if (os == null) {
log.error("write() : os is null");
// fatal error : stop an application
throw new RuntimeException("fatal : os is null:" + nodeId);
}

String cmdBuff = cmd;
if (key != null && key.length() != 0) {
cmdBuff += " " + key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class RomaSocketPool {
protected static Logger log = Logger.getLogger(RomaSocketPool.class
.getName());
private static RomaSocketPool instance = null;
private static final int GET_CONNECTION_RETRY_MAX = GenericObjectPool.DEFAULT_MAX_IDLE + 1;

private RomaSocketPool() {
poolMap = Collections.synchronizedMap(new HashMap<String, GenericObjectPool<Connection>>());
Expand Down Expand Up @@ -50,9 +51,13 @@ public static synchronized void init(int maxActive, int maxIdle, int timeout, in
}

public synchronized Connection getConnection(String nodeId) throws Exception {
return getConnection(nodeId, 0);
}

public synchronized Connection getConnection(String nodeId, int retryCount) throws Exception {
GenericObjectPool<Connection> pool = poolMap.get(nodeId);
if (pool == null) {
PoolableObjectFactory<Connection> factory =
PoolableObjectFactory<Connection> factory =
new SocketPoolFactory(nodeId, bufferSize, timeout);
pool = new GenericObjectPool<Connection>(factory);
pool.setMaxActive(maxActive);
Expand All @@ -61,7 +66,22 @@ public synchronized Connection getConnection(String nodeId) throws Exception {
poolMap.put(nodeId, pool);
}
Connection con = pool.borrowObject();
con.setSoTimeout(timeout);
try {
con.setSoTimeout(timeout);
} catch (SocketException e) {
log.debug("getConnection setSoTimeout throws exception, so close it", e);
try {
con.close();
} catch (Exception e2) {
log.debug("socket close error", e2);
}
if(retryCount < GET_CONNECTION_RETRY_MAX) {
log.debug("getConnection retry");
con = getConnection(nodeId, (retryCount + 1));
} else {
throw e;
}
}

return con;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void run() {
}
Thread.sleep(threadSleep);
} catch (Exception e) {
log.debug("routing check thread : " + e.getMessage());
log.debug("routing check thread : ", e);
}
}
log.info("routing check thread : stopped");
Expand All @@ -85,26 +85,62 @@ public Connection getConnection() throws Exception {
if (routingData != null) {
return (sps.getConnection(routingData.getRandomNodeId()));
} else {
int n = rnd.nextInt(initialNodes.length);
return (sps.getConnection(initialNodes[n]));

Connection c = null;
String node = null;
try {
int n = rnd.nextInt(initialNodes.length);
node = initialNodes[n];
c = sps.getConnection(node);
getMklHash(c);
return c;
} catch(Exception e) {
log.debug("getConnection error(" + node + ")", e);
if (c != null) {
failCount(c);
c = null;
}
}

// randam access node was dead. try to get connection sequentially.
for(String n : initialNodes) {
try {
c = sps.getConnection(n);
getMklHash(c);
return c;
} catch(Exception e) {
log.debug("getConnection error(" + n + ")", e);
if (c != null) {
failCount(c);
c = null;
}
}
}

return null;
}
}

public Connection getConnection(String key) throws Exception {
String nid = null;
try {
if(routingData == null) {
log.debug("routingData isn't initialized:" + key);
return getConnection();
}
nid = routingData.getPrimaryNodeId(key);
if (nid == null) {
log.error("getConnection() : can't get a primary node. key = "
+ key);
return getConnection();
}
return (sps.getConnection(nid));
return sps.getConnection(nid);
} catch (NoSuchAlgorithmException ex) {
log.error("getConnection() : " + ex.getMessage());
log.error("getConnection() : ", ex);
// fatal error : stop an application
throw new RuntimeException("fatal : " + ex.getMessage());
throw new RuntimeException("fatal : ", ex);
} catch (Exception ex2) {
log.error("getConnection() Exception. Return dummy connection object ", ex2);
// returns a dummy connection for failCount()
return new Connection(nid, 0);
}
Expand Down Expand Up @@ -137,7 +173,9 @@ public void failCount(String nid) {
log.info("failCount(): failover");
failCountMap.clear();
prevRoutingData = routingData;
routingData = routingData.failOver(nid);
if(routingData != null) {
routingData = routingData.failOver(nid);
}
sps.deleteConnection(nid);
} else {
failCountMap.put(nid, n);
Expand All @@ -154,15 +192,22 @@ public void setThreadSleep(int n) {
}

private String getMklHash() {
Connection con = null;
try {
return getMklHash(getConnection());
} catch (Exception e) {
log.error("getMklHash() : ", e);
return null;
}
}

private String getMklHash(Connection con) {
Receiver rcv = new StringReceiver();
try {
con = getConnection();
con.write("mklhash 0");
rcv.receive(con);
returnConnection(con);
} catch (Exception e) {
log.error("getMklHash() : " + e.getMessage());
log.error("getMklHash() : ", e);
failCount(con);
return null;
}
Expand All @@ -181,20 +226,20 @@ private RoutingData getRoutingDump() {
routingData = new RoutingData(buff);
returnConnection(con);
} catch (ParseException e) {
log.error("getRoutingDump() : " + e.getMessage());
log.error("getRoutingDump() : ", e);
returnConnection(con);
return null;
} catch (Exception e) {
log.warn("getRoutingDump() : " + e.getMessage());
log.warn("getRoutingDump() : ", e);
failCount(con);
return null;
}
return routingData;
}

private class RoutingReceiver extends Receiver {
private byte[] value = null;

@Override
public void receive(Connection con) throws TimeoutException, IOException, ParseException {
int len = 0;
Expand All @@ -206,7 +251,7 @@ public void receive(Connection con) throws TimeoutException, IOException, ParseE
try {
len = Integer.parseInt(str);
} catch (NumberFormatException e) {
log.error("RoutingReceiver.receive() : NumberFormatException [" + str + "] " + e.getMessage());
log.error("RoutingReceiver.receive() : NumberFormatException [" + str + "] ", e);
throw new ParseException(str, -1);
}
if (len > 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.rakuten.rit.roma.romac4j;

import junit.framework.Test;
import junit.framework.TestCase;

import java.lang.Thread;

import com.rakuten.rit.roma.romac4j.connection.Connection;
import com.rakuten.rit.roma.romac4j.RomaClient;

/**
* This is test class when pooled socked is already closed.
*
* If client keep ROMA conection long time, ROMA server close connectoin
* when socket is opened more than expired time.
*
* So this is test case that make sure to run properly even if socket is
* closed by server side.
*/
public class ClosedSocketTest extends TestCase {
class TestRomaClient extends RomaClient {
public TestRomaClient(String node) {
super(node);
}

// emulate that socket is closed by server side
public void closeSocketForTest() throws Exception {
while(routing == null) {
Thread.yield();
}

// routing
Connection con = routing.getConnection("localhost_11311");
con.close();
routing.returnConnection(con);
con = routing.getConnection("localhost_11411");
con.close();
routing.returnConnection(con);
}
}

public void testClosedSocket() throws Exception {
TestRomaClient rc = null;

try {
rc = new TestRomaClient("localhost_11311");
rc.closeSocketForTest();

assertTrue(rc.set("key", "", 0));
assertEquals("", rc.getString("key"));
assertTrue(rc.set("key", "test", 0));
assertEquals("test", rc.getString("key"));
} finally {
if(rc != null) {
rc.destroy();
}
}
}
}
Loading