Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;

import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.fs.Path;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -85,6 +86,9 @@ public enum WalState {
UNREFERENCED
}

public record WalStatePath(WalState state, Path path) {
}

private final ZooReaderWriter zoo;

private volatile boolean checkedExistance = false;
Expand Down Expand Up @@ -144,9 +148,9 @@ public void walUnreferenced(TServerInstance tsi, Path path) throws WalMarkerExce
updateState(tsi, path, WalState.UNREFERENCED);
}

public static Pair<WalState,Path> parse(byte[] data) {
public static WalStatePath parse(byte[] data) {
String[] parts = new String(data, UTF_8).split(",");
return new Pair<>(WalState.valueOf(parts[0]), new Path(parts[1]));
return new WalStatePath(WalState.valueOf(parts[0]), new Path(parts[1]));
}

// Manager needs to know the logs for the given instance
Expand All @@ -168,9 +172,9 @@ public List<Path> getWalsInUse(TServerInstance tsi) throws WalMarkerException {
}

if (zdata != null) {
Pair<WalState,Path> parts = parse(zdata);
if (parts.getFirst() != WalState.UNREFERENCED) {
result.add(parts.getSecond());
WalStatePath parts = parse(zdata);
if (parts.state() != WalState.UNREFERENCED) {
result.add(parts.path());
}
}
}
Expand Down Expand Up @@ -204,7 +208,7 @@ public Map<TServerInstance,List<UUID>> getAllMarkers() throws WalMarkerException
}

// garbage collector wants to know the state (open/closed) of a log, and the filename to delete
public Pair<WalState,Path> state(TServerInstance instance, UUID uuid) throws WalMarkerException {
public WalStatePath state(TServerInstance instance, UUID uuid) throws WalMarkerException {
try {
String path = root() + "/" + instance + "/" + uuid;
return parse(zoo.getData(path));
Expand All @@ -213,15 +217,15 @@ public Pair<WalState,Path> state(TServerInstance instance, UUID uuid) throws Wal
}
}

// utility combination of getAllMarkers and state
public Map<Path,WalState> getAllState() throws WalMarkerException {
Map<Path,WalState> result = new HashMap<>();
/**
* @return the state and path for all WAL markers.
*/
public Set<WalStatePath> getAllState() throws WalMarkerException {
Set<WalStatePath> result = new HashSet<>();
for (Entry<TServerInstance,List<UUID>> entry : getAllMarkers().entrySet()) {
for (UUID id : entry.getValue()) {
// This function is called by the Accumulo GC which deletes WAL markers. Therefore we do not
// expect the following call to fail because the WAL info in ZK was deleted.
Pair<WalState,Path> state = state(entry.getKey(), id);
result.put(state.getSecond(), state.getFirst());
// WAL markers can be deleted while reading them.
result.add(state(entry.getKey(), id));
}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ private void listTable(Ample.DataLevel level, ServerContext context) throws Exce
volumes.clear();

WalStateManager wals = new WalStateManager(context);
for (Path path : wals.getAllState().keySet()) {
volumes.add(getLogURI(path.toString()));
for (var wal : wals.getAllState()) {
volumes.add(getLogURI(wal.path().toString()));
}
for (String volume : volumes) {
System.out.println("\tVolume : " + volume);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@
import org.apache.accumulo.core.metadata.SystemTables;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.log.WalStateManager;
import org.apache.accumulo.server.log.WalStateManager.WalStatePath;
import org.apache.accumulo.server.util.adminCommand.SystemCheck.Check;
import org.apache.hadoop.fs.Path;

import com.google.common.collect.Sets;

Expand Down Expand Up @@ -169,10 +168,10 @@ private static boolean checkZKWALsMetadata(ServerContext context) throws Excepti
var walsBefore = gatherWalsFromZK(context, zrw);

// gather any wals present in ZooKeeper but missing in DFS
Map<TServerInstance,Set<Pair<WalStateManager.WalState,Path>>> missingWals = new HashMap<>();
Map<TServerInstance,Set<WalStatePath>> missingWals = new HashMap<>();
for (var instanceAndWals : walsBefore.entrySet()) {
for (var wal : instanceAndWals.getValue()) {
if (!context.getVolumeManager().exists(wal.getSecond())) {
if (!context.getVolumeManager().exists(wal.path())) {
missingWals.computeIfAbsent(instanceAndWals.getKey(), k -> new HashSet<>()).add(wal);
}
}
Expand All @@ -195,10 +194,10 @@ private static boolean checkZKWALsMetadata(ServerContext context) throws Excepti
return status;
}

private static Map<TServerInstance,Set<Pair<WalStateManager.WalState,Path>>>
gatherWalsFromZK(ServerContext context, ZooReaderWriter zrw) throws Exception {
private static Map<TServerInstance,Set<WalStatePath>> gatherWalsFromZK(ServerContext context,
ZooReaderWriter zrw) throws Exception {
final var rootWalsDir = WalStateManager.ZWALS;
Map<TServerInstance,Set<Pair<WalStateManager.WalState,Path>>> wals = new HashMap<>();
Map<TServerInstance,Set<WalStatePath>> wals = new HashMap<>();
var tserverInstances = TabletMetadata.getLiveTServers(context);
for (var tsi : tserverInstances) {
wals.put(tsi, new HashSet<>());
Expand All @@ -222,8 +221,8 @@ private static boolean checkZKWALsMetadata(ServerContext context) throws Excepti
}
var parseRes = WalStateManager.parse(data);
log.trace("Successfully parsed WAL metadata at {} result {}", fullWalPath, parseRes);
if (parseRes.getFirst() == WalStateManager.WalState.OPEN
|| parseRes.getFirst() == WalStateManager.WalState.CLOSED) {
if (parseRes.state() == WalStateManager.WalState.OPEN
|| parseRes.state() == WalStateManager.WalState.CLOSED) {
wals.get(tsi).add(parseRes);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@
import org.apache.accumulo.core.metadata.schema.filters.GcWalsFilter;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.log.WalStateManager;
import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
import org.apache.accumulo.server.log.WalStateManager.WalState;
import org.apache.accumulo.server.log.WalStateManager.WalStatePath;
import org.apache.accumulo.server.manager.LiveTServerSet;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -126,7 +126,7 @@ public void collect(GCStatus status) {
long count;
long fileScanStop;
Map<TServerInstance,Set<UUID>> logsByServer;
Map<UUID,Pair<WalState,Path>> logsState;
Map<UUID,WalStatePath> logsState;
Map<UUID,Path> recoveryLogs;

Span span = TraceUtil.startSpan(this.getClass(), "getCandidates");
Expand Down Expand Up @@ -260,18 +260,18 @@ private Future<?> removeFile(ExecutorService deleteThreadPool, Path path, Atomic
});
}

private long removeFiles(Collection<Pair<WalState,Path>> collection, final GCStatus status) {
private long removeFiles(Collection<WalStatePath> collection, final GCStatus status) {

final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools()
.createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS);
final Map<Path,Future<?>> futures = new HashMap<>(collection.size());
final AtomicLong counter = new AtomicLong();

try {
for (Pair<WalState,Path> stateFile : collection) {
Path path = stateFile.getSecond();
for (WalStatePath stateFile : collection) {
Path path = stateFile.path();
futures.put(path, removeFile(deleteThreadPool, path, counter,
"Removing " + stateFile.getFirst() + " WAL " + path));
"Removing " + stateFile.state() + " WAL " + path));
}

while (!futures.isEmpty()) {
Expand Down Expand Up @@ -342,7 +342,7 @@ private long removeFiles(Collection<Path> values) {
}

private Map<UUID,TServerInstance> removeEntriesInUse(Map<TServerInstance,Set<UUID>> candidates,
Set<TServerInstance> liveServers, Map<UUID,Pair<WalState,Path>> logsState,
Set<TServerInstance> liveServers, Map<UUID,WalStatePath> logsState,
Map<UUID,Path> recoveryLogs) {

Map<UUID,TServerInstance> result = new HashMap<>();
Expand Down Expand Up @@ -388,12 +388,8 @@ private Map<UUID,TServerInstance> removeEntriesInUse(Map<TServerInstance,Set<UUI
Set<UUID> idsForServer = candidates.get(liveServer);
// Server may not have any logs yet
if (idsForServer != null) {
for (UUID id : idsForServer) {
Pair<WalState,Path> stateFile = logsState.get(id);
if (stateFile.getFirst() != WalState.UNREFERENCED) {
result.remove(id);
}
}
result.keySet().removeIf(
id -> idsForServer.contains(id) && logsState.get(id).state() != WalState.UNREFERENCED);

recoveryLogs.keySet().removeAll(idsForServer);
}
Expand All @@ -408,7 +404,7 @@ private Map<UUID,TServerInstance> removeEntriesInUse(Map<TServerInstance,Set<UUI
* @return total number of log files
*/
private long getCurrent(Map<TServerInstance,Set<UUID>> logsByServer,
Map<UUID,Pair<WalState,Path>> logState) throws Exception {
Map<UUID,WalStatePath> logState) throws Exception {

// get all the unused WALs in zookeeper
long result = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.log.WalStateManager;
import org.apache.accumulo.server.log.WalStateManager.WalState;
import org.apache.accumulo.server.log.WalStateManager.WalStatePath;
import org.apache.accumulo.server.manager.LiveTServerSet;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
Expand Down Expand Up @@ -98,7 +98,8 @@ public void testRemoveUnusedLog() throws Exception {
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));

EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<>(WalState.UNREFERENCED, path));
EasyMock.expect(marker.state(server1, id))
.andReturn(new WalStatePath(WalState.UNREFERENCED, path));
EasyMock.expect(fs.deleteRecursively(path)).andReturn(true).once();
marker.removeWalMarker(server1, id);
EasyMock.expectLastCall().once();
Expand Down Expand Up @@ -138,7 +139,7 @@ public void testKeepClosedLog() throws Exception {
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));

EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<>(WalState.CLOSED, path));
EasyMock.expect(marker.state(server1, id)).andReturn(new WalStatePath(WalState.CLOSED, path));
EasyMock.replay(conf, context, marker, tserverSet, fs);
var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker) {

Expand Down Expand Up @@ -174,7 +175,7 @@ public void deleteUnreferencedLogOnDeadServer() throws Exception {
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));

EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once();
EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path));
EasyMock.expect(marker.state(server2, id)).andReturn(new WalStatePath(WalState.OPEN, path));

EasyMock.expect(fs.deleteRecursively(path)).andReturn(true).once();
marker.removeWalMarker(server2, id);
Expand Down Expand Up @@ -214,7 +215,7 @@ public void ignoreReferenceLogOnDeadServer() throws Exception {
EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));

EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once();
EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path));
EasyMock.expect(marker.state(server2, id)).andReturn(new WalStatePath(WalState.OPEN, path));

EasyMock.replay(conf, context, fs, marker, tserverSet);
GarbageCollectWriteAheadLogs gc =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ private void writeData(AccumuloClient client, String table, String rowToWrite) t
}

private long getWalCount(ServerContext context) throws Exception {
return new WalStateManager(context).getAllState().values().stream()
.filter(walState -> walState != WalStateManager.WalState.OPEN).count();
return new WalStateManager(context).getAllState().stream()
.filter(wal -> wal.state() != WalStateManager.WalState.OPEN).count();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ public void testSystemConfigCheck2() throws Exception {
var wal = WalStateManager.parse(zrw.getData(fullWalPathZk));

// delete from HDFS
context.getVolumeManager().delete(wal.getSecond());
context.getVolumeManager().delete(wal.path());

var p = getCluster().exec(SystemCheck.class, "run", sysConfCheck.name());
assertEquals(1, p.getProcess().waitFor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,10 @@ public void waLogsSentToConfiguredVolumes() throws Exception {
createAndVerifyTable(client, tableName, alpha_rows, false);
// should only go to v2 as per configuration in configure()
var walMgr = new WalStateManager(getServerContext());
Map<Path,WalStateManager.WalState> allLogs = walMgr.getAllState();
var allLogs = walMgr.getAllState();
assertFalse(allLogs.isEmpty());
String volume = v2.toString();
allLogs.keySet().stream().map(Path::toString).forEach(path -> {
allLogs.stream().map(wal -> wal.path().toString()).forEach(path -> {
assertTrue(path.startsWith(volume), () -> path + " did not contain " + volume);
});
}
Expand Down
7 changes: 3 additions & 4 deletions test/src/main/java/org/apache/accumulo/test/VolumeITBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,13 @@ private void verifyVolumesUsed(AccumuloClient client, String tableName, boolean
retry: while (true) {
WalStateManager wals = new WalStateManager(getServerContext());
try {
outer: for (Map.Entry<Path,WalStateManager.WalState> entry : wals.getAllState()
.entrySet()) {
outer: for (var wal : wals.getAllState()) {
for (Path path : paths) {
if (entry.getKey().toString().startsWith(path.toString())) {
if (wal.path().toString().startsWith(path.toString())) {
continue outer;
}
}
log.warn("Unexpected volume " + entry.getKey() + " (" + entry.getValue() + ")");
log.warn("Unexpected volume " + wal.path() + " (" + wal.state() + ")");
UtilWaitThread.sleep(100);
continue retry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.apache.accumulo.server.log.WalStateManager.WalState;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException.NoNodeException;
Expand Down Expand Up @@ -268,9 +267,9 @@ static Map<String,WalState> _getWals(ServerContext c) throws Exception {
try {
Map<String,WalState> result = new HashMap<>();
WalStateManager wals = new WalStateManager(c);
for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
for (var wal : wals.getAllState()) {
// WALs are in use if they are not unreferenced
result.put(entry.getKey().toString(), entry.getValue());
result.put(wal.path().toString(), wal.state());
}
return result;
} catch (WalMarkerException wme) {
Expand Down