Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2241,7 +2241,7 @@ int getAddedQueueSize() {
*
* @return all memcached nodes from node locator
*/
protected Collection<MemcachedNode> getAllNodes() {
public Collection<MemcachedNode> getAllNodes() {
return conn.getLocator().getAll();
}

Expand Down
281 changes: 217 additions & 64 deletions src/main/java/net/spy/memcached/v2/AsyncArcusCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package net.spy.memcached.v2;

import java.net.SocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -98,6 +99,7 @@
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.StatsOperation;
import net.spy.memcached.ops.StatusCode;
import net.spy.memcached.ops.StoreType;
import net.spy.memcached.transcoders.Transcoder;
Expand Down Expand Up @@ -627,34 +629,6 @@ public void complete() {
return future;
}

public ArcusFuture<Boolean> flush(int delay) {
ArcusClient client = arcusClientSupplier.get();
Collection<MemcachedNode> nodes = client.getFlushNodes();

Collection<CompletableFuture<?>> futures = new ArrayList<>();

for (MemcachedNode node : nodes) {
CompletableFuture<Boolean> future = flush(client, node, delay).toCompletableFuture();
futures.add(future);
}

/*
* Combine all futures. Returns true if all flush operations succeed.
* Returns false if any flush operation fails.
*/
return new ArcusMultiFuture<>(futures, () -> {
for (CompletableFuture<?> future : futures) {
if (future.isCompletedExceptionally()) {
return false;
}
Boolean result = (Boolean) future.join();
if (result == null || !result) {
return false;
}
}
return true;
});
}

public ArcusFuture<Boolean> delete(String key) {
AbstractArcusResult<Boolean> result = new AbstractArcusResult<>(new AtomicReference<>());
Expand Down Expand Up @@ -714,41 +688,6 @@ public ArcusFuture<Map<String, Boolean>> multiDelete(List<String> keys) {
});
}

/**
* Use only in flush method.
*
* @param client the ArcusClient instance to use
* @param node the MemcachedNode to flush
* @param delay flush delay
* @return ArcusFuture with flush result
*/
private ArcusFuture<Boolean> flush(ArcusClient client, MemcachedNode node, int delay) {
AbstractArcusResult<Boolean> result = new AbstractArcusResult<>(new AtomicReference<>());
ArcusFutureImpl<Boolean> future = new ArcusFutureImpl<>(result);

OperationCallback cb = new OperationCallback() {
@Override
public void receivedStatus(OperationStatus status) {
if (status.getStatusCode() == StatusCode.CANCELLED) {
future.internalCancel();
return;
}
result.set(status.isSuccess());
}

@Override
public void complete() {
future.complete();
}
};

Operation op = client.getOpFact().flush(delay, cb);
future.setOp(op);
client.addOp(node, op);

return future;
}

public ArcusFuture<Boolean> bopCreate(String key, ElementValueType type,
CollectionAttributes attributes) {
if (attributes == null) {
Expand Down Expand Up @@ -1175,7 +1114,7 @@ public ArcusFuture<Map<String, BTreeElements<T>>> bopMultiGet(List<String> keys,
return new ArcusMultiFuture<>(futures, () -> {
Map<String, BTreeElements<T>> results = new HashMap<>();
for (Map.Entry<CompletableFuture<Map<String, BTreeElements<T>>>, List<String>> entry
: futureToKeys.entrySet()) {
: futureToKeys.entrySet()) {
if (entry.getKey().isCompletedExceptionally()) {
for (String key : entry.getValue()) {
results.put(key, null);
Expand Down Expand Up @@ -2125,4 +2064,218 @@ public ArcusFuture<Boolean> mopDelete(String key, List<String> mKeys, boolean dr
MapDelete delete = new MapDelete(mKeys, dropIfEmpty, false);
return collectionDelete(key, delete);
}

public ArcusFuture<Boolean> flush() {
return flush(-1);
}

public ArcusFuture<Boolean> flush(int delay) {
if (delay < -1) {
throw new IllegalArgumentException("Delay should be greater than or equal to -1");
}

ArcusClient client = arcusClientSupplier.get();
Collection<MemcachedNode> nodes = client.getFlushNodes();
Collection<CompletableFuture<?>> futures = new ArrayList<>(nodes.size());

for (MemcachedNode node : nodes) {
AbstractArcusResult<Boolean> result = new AbstractArcusResult<>(new AtomicReference<>());
ArcusFutureImpl<Boolean> future = new ArcusFutureImpl<>(result);

OperationCallback cb = new OperationCallback() {
@Override
public void receivedStatus(OperationStatus status) {
switch (status.getStatusCode()) {
case SUCCESS:
result.set(true);
break;
case CANCELLED:
future.internalCancel();
break;
default:
result.addError(node.getSocketAddress().toString(), status);
break;
}
}

@Override
public void complete() {
future.complete();
}
};

Operation op = client.getOpFact().flush(delay, cb);
future.setOp(op);
client.addOp(node, op);
futures.add(future);
}

return new ArcusMultiFuture<>(futures, () -> true);
Comment thread
oliviarla marked this conversation as resolved.
}

public ArcusFuture<Boolean> flush(String prefix) {
return flush(prefix, -1);
}

public ArcusFuture<Boolean> flush(String prefix, int delay) {
Comment thread
f1v3-dev marked this conversation as resolved.
if (prefix == null) {
throw new IllegalArgumentException("Prefix should not be null");
}
Comment thread
oliviarla marked this conversation as resolved.

if (delay < -1) {
throw new IllegalArgumentException("Delay should be greater than or equal to -1");
}

ArcusClient client = arcusClientSupplier.get();
Collection<MemcachedNode> nodes = client.getFlushNodes();
Collection<CompletableFuture<?>> futures = new ArrayList<>(nodes.size());

for (MemcachedNode node : nodes) {
AbstractArcusResult<Boolean> result = new AbstractArcusResult<>(new AtomicReference<>());
ArcusFutureImpl<Boolean> future = new ArcusFutureImpl<>(result);

OperationCallback cb = new OperationCallback() {
@Override
public void receivedStatus(OperationStatus status) {
switch (status.getStatusCode()) {
case SUCCESS:
result.set(true);
break;
case ERR_NOT_FOUND:
result.set(false);
break;
case CANCELLED:
future.internalCancel();
break;
default:
result.addError(node.getSocketAddress().toString(), status);
break;
}
}

@Override
public void complete() {
future.complete();
}
};

Operation op = client.getOpFact()
.flush(prefix.isEmpty() ? "<null>" : prefix, delay, false, cb);
future.setOp(op);
client.addOp(node, op);
futures.add(future);
}


return new ArcusMultiFuture<>(futures, () -> {
for (CompletableFuture<?> future : futures) {
if (!future.isCompletedExceptionally() && Boolean.TRUE.equals(future.join())) {
return true;
}
}
return false;
Comment thread
jhpark816 marked this conversation as resolved.
Comment thread
oliviarla marked this conversation as resolved.
});
}

public ArcusFuture<Map<SocketAddress, Map<String, String>>> stats() {
return stats(StatsArg.GENERAL);
}

public ArcusFuture<Map<SocketAddress, Map<String, String>>> stats(StatsArg arg) {
ArcusClient client = arcusClientSupplier.get();
Collection<MemcachedNode> nodes = client.getAllNodes();

Map<SocketAddress, CompletableFuture<?>> addressToFuture = new HashMap<>(nodes.size());

for (MemcachedNode node : nodes) {
SocketAddress address = node.getSocketAddress();
AbstractArcusResult<Map<String, String>> result
= new AbstractArcusResult<>(new AtomicReference<>(new HashMap<>()));
ArcusFutureImpl<Map<String, String>> future = new ArcusFutureImpl<>(result);

StatsOperation.Callback cb = new StatsOperation.Callback() {
@Override
public void gotStat(String name, String val) {
result.get().put(name, val);
}

@Override
public void receivedStatus(OperationStatus status) {
if (status.getStatusCode() == StatusCode.CANCELLED) {
future.internalCancel();
}
}

@Override
public void complete() {
future.complete();
}
};
Operation op = client.getOpFact().stats(arg.getArg(), cb);
future.setOp(op);
client.addOp(node, op);

addressToFuture.put(address, future);
}

return new ArcusMultiFuture<>(addressToFuture.values(), () -> {
Map<SocketAddress, Map<String, String>> resultMap = new HashMap<>(addressToFuture.size());
addressToFuture.forEach((address, future) -> {
if (future.isCompletedExceptionally()) {
resultMap.put(address, null);
} else {
@SuppressWarnings("unchecked")
Map<String, String> stats = (Map<String, String>) future.join();
Comment thread
oliviarla marked this conversation as resolved.
resultMap.put(address, stats);
}
});
return resultMap;
});
}

public ArcusFuture<Map<SocketAddress, String>> versions() {
ArcusClient client = arcusClientSupplier.get();
Collection<MemcachedNode> nodes = client.getAllNodes();

Map<SocketAddress, CompletableFuture<?>> addressToFuture = new HashMap<>(nodes.size());

for (MemcachedNode node : nodes) {
SocketAddress address = node.getSocketAddress();
AbstractArcusResult<String> result = new AbstractArcusResult<>(new AtomicReference<>());
ArcusFutureImpl<String> future = new ArcusFutureImpl<>(result);

OperationCallback cb = new OperationCallback() {
@Override
public void receivedStatus(OperationStatus status) {
if (status.getStatusCode() == StatusCode.CANCELLED) {
future.internalCancel();
return;
}
result.set(status.getMessage());
}

@Override
public void complete() {
future.complete();
}
};
Operation op = client.getOpFact().version(cb);
future.setOp(op);
client.addOp(node, op);

addressToFuture.put(address, future);
}

return new ArcusMultiFuture<>(addressToFuture.values(), () -> {
Map<SocketAddress, String> resultMap = new HashMap<>(addressToFuture.size());
addressToFuture.forEach((address, future) -> {
if (future.isCompletedExceptionally()) {
resultMap.put(address, null);
} else {
resultMap.put(address, (String) future.join());
Comment thread
oliviarla marked this conversation as resolved.
}
});
return resultMap;
});
}
}
Loading
Loading