diff --git a/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java b/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java index 519e08f4e5f..b83d4281858 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java @@ -22,16 +22,17 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.UUID; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.ServerContext; import org.apache.hadoop.fs.Path; import org.apache.zookeeper.KeeperException; @@ -85,6 +86,9 @@ public enum WalState { UNREFERENCED } + public record WalStatePath(WalState state, Path path) { + } + private final ZooReaderWriter zoo; private volatile boolean checkedExistance = false; @@ -144,9 +148,9 @@ public void walUnreferenced(TServerInstance tsi, Path path) throws WalMarkerExce updateState(tsi, path, WalState.UNREFERENCED); } - public static Pair parse(byte[] data) { + public static WalStatePath parse(byte[] data) { String[] parts = new String(data, UTF_8).split(","); - return new Pair<>(WalState.valueOf(parts[0]), new Path(parts[1])); + return new WalStatePath(WalState.valueOf(parts[0]), new Path(parts[1])); } // Manager needs to know the logs for the given instance @@ -168,9 +172,9 @@ public List getWalsInUse(TServerInstance tsi) throws WalMarkerException { } if (zdata != null) { - Pair parts = parse(zdata); - if (parts.getFirst() != WalState.UNREFERENCED) { - result.add(parts.getSecond()); + WalStatePath parts = parse(zdata); + if (parts.state() != WalState.UNREFERENCED) { + result.add(parts.path()); } } } @@ -204,7 +208,7 @@ public Map> getAllMarkers() throws WalMarkerException } // garbage collector wants to know the state (open/closed) of a log, and the filename to delete - public Pair state(TServerInstance instance, UUID uuid) throws WalMarkerException { + public WalStatePath state(TServerInstance instance, UUID uuid) throws WalMarkerException { try { String path = root() + "/" + instance + "/" + uuid; return parse(zoo.getData(path)); @@ -213,15 +217,15 @@ public Pair state(TServerInstance instance, UUID uuid) throws Wal } } - // utility combination of getAllMarkers and state - public Map getAllState() throws WalMarkerException { - Map result = new HashMap<>(); + /** + * @return the state and path for all WAL markers. + */ + public Set getAllState() throws WalMarkerException { + Set result = new HashSet<>(); for (Entry> entry : getAllMarkers().entrySet()) { for (UUID id : entry.getValue()) { - // This function is called by the Accumulo GC which deletes WAL markers. Therefore we do not - // expect the following call to fail because the WAL info in ZK was deleted. - Pair state = state(entry.getKey(), id); - result.put(state.getSecond(), state.getFirst()); + // WAL markers can be deleted while reading them. + result.add(state(entry.getKey(), id)); } } return result; diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/adminCommand/ListVolumesUsed.java b/server/base/src/main/java/org/apache/accumulo/server/util/adminCommand/ListVolumesUsed.java index e88d739e10f..90d585b6748 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/adminCommand/ListVolumesUsed.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/adminCommand/ListVolumesUsed.java @@ -125,8 +125,8 @@ private void listTable(Ample.DataLevel level, ServerContext context) throws Exce volumes.clear(); WalStateManager wals = new WalStateManager(context); - for (Path path : wals.getAllState().keySet()) { - volumes.add(getLogURI(path.toString())); + for (var wal : wals.getAllState()) { + volumes.add(getLogURI(wal.path().toString())); } for (String volume : volumes) { System.out.println("\tVolume : " + volume); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/SystemConfigCheckRunner.java b/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/SystemConfigCheckRunner.java index 93a94184994..3318a8b4c4a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/SystemConfigCheckRunner.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/SystemConfigCheckRunner.java @@ -30,11 +30,10 @@ import org.apache.accumulo.core.metadata.SystemTables; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.log.WalStateManager; +import org.apache.accumulo.server.log.WalStateManager.WalStatePath; import org.apache.accumulo.server.util.adminCommand.SystemCheck.Check; -import org.apache.hadoop.fs.Path; import com.google.common.collect.Sets; @@ -169,10 +168,10 @@ private static boolean checkZKWALsMetadata(ServerContext context) throws Excepti var walsBefore = gatherWalsFromZK(context, zrw); // gather any wals present in ZooKeeper but missing in DFS - Map>> missingWals = new HashMap<>(); + Map> missingWals = new HashMap<>(); for (var instanceAndWals : walsBefore.entrySet()) { for (var wal : instanceAndWals.getValue()) { - if (!context.getVolumeManager().exists(wal.getSecond())) { + if (!context.getVolumeManager().exists(wal.path())) { missingWals.computeIfAbsent(instanceAndWals.getKey(), k -> new HashSet<>()).add(wal); } } @@ -195,10 +194,10 @@ private static boolean checkZKWALsMetadata(ServerContext context) throws Excepti return status; } - private static Map>> - gatherWalsFromZK(ServerContext context, ZooReaderWriter zrw) throws Exception { + private static Map> gatherWalsFromZK(ServerContext context, + ZooReaderWriter zrw) throws Exception { final var rootWalsDir = WalStateManager.ZWALS; - Map>> wals = new HashMap<>(); + Map> wals = new HashMap<>(); var tserverInstances = TabletMetadata.getLiveTServers(context); for (var tsi : tserverInstances) { wals.put(tsi, new HashSet<>()); @@ -222,8 +221,8 @@ private static boolean checkZKWALsMetadata(ServerContext context) throws Excepti } var parseRes = WalStateManager.parse(data); log.trace("Successfully parsed WAL metadata at {} result {}", fullWalPath, parseRes); - if (parseRes.getFirst() == WalStateManager.WalState.OPEN - || parseRes.getFirst() == WalStateManager.WalState.CLOSED) { + if (parseRes.state() == WalStateManager.WalState.OPEN + || parseRes.state() == WalStateManager.WalState.CLOSED) { wals.get(tsi).add(parseRes); } } diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 528621d49fd..dd01b635990 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -53,13 +53,13 @@ import org.apache.accumulo.core.metadata.schema.filters.GcWalsFilter; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.log.WalStateManager; import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; import org.apache.accumulo.server.log.WalStateManager.WalState; +import org.apache.accumulo.server.log.WalStateManager.WalStatePath; import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -126,7 +126,7 @@ public void collect(GCStatus status) { long count; long fileScanStop; Map> logsByServer; - Map> logsState; + Map logsState; Map recoveryLogs; Span span = TraceUtil.startSpan(this.getClass(), "getCandidates"); @@ -260,7 +260,7 @@ private Future removeFile(ExecutorService deleteThreadPool, Path path, Atomic }); } - private long removeFiles(Collection> collection, final GCStatus status) { + private long removeFiles(Collection collection, final GCStatus status) { final ExecutorService deleteThreadPool = ThreadPools.getServerThreadPools() .createExecutorService(context.getConfiguration(), Property.GC_DELETE_WAL_THREADS); @@ -268,10 +268,10 @@ private long removeFiles(Collection> collection, final GCSta final AtomicLong counter = new AtomicLong(); try { - for (Pair stateFile : collection) { - Path path = stateFile.getSecond(); + for (WalStatePath stateFile : collection) { + Path path = stateFile.path(); futures.put(path, removeFile(deleteThreadPool, path, counter, - "Removing " + stateFile.getFirst() + " WAL " + path)); + "Removing " + stateFile.state() + " WAL " + path)); } while (!futures.isEmpty()) { @@ -342,7 +342,7 @@ private long removeFiles(Collection values) { } private Map removeEntriesInUse(Map> candidates, - Set liveServers, Map> logsState, + Set liveServers, Map logsState, Map recoveryLogs) { Map result = new HashMap<>(); @@ -388,12 +388,8 @@ private Map removeEntriesInUse(Map idsForServer = candidates.get(liveServer); // Server may not have any logs yet if (idsForServer != null) { - for (UUID id : idsForServer) { - Pair stateFile = logsState.get(id); - if (stateFile.getFirst() != WalState.UNREFERENCED) { - result.remove(id); - } - } + result.keySet().removeIf( + id -> idsForServer.contains(id) && logsState.get(id).state() != WalState.UNREFERENCED); recoveryLogs.keySet().removeAll(idsForServer); } @@ -408,7 +404,7 @@ private Map removeEntriesInUse(Map> logsByServer, - Map> logState) throws Exception { + Map logState) throws Exception { // get all the unused WALs in zookeeper long result = 0; diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java index 896afcc2c31..99064550840 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java @@ -39,11 +39,11 @@ import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.log.WalStateManager; import org.apache.accumulo.server.log.WalStateManager.WalState; +import org.apache.accumulo.server.log.WalStateManager.WalStatePath; import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -98,7 +98,8 @@ public void testRemoveUnusedLog() throws Exception { EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1)); EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once(); - EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<>(WalState.UNREFERENCED, path)); + EasyMock.expect(marker.state(server1, id)) + .andReturn(new WalStatePath(WalState.UNREFERENCED, path)); EasyMock.expect(fs.deleteRecursively(path)).andReturn(true).once(); marker.removeWalMarker(server1, id); EasyMock.expectLastCall().once(); @@ -138,7 +139,7 @@ public void testKeepClosedLog() throws Exception { EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1)); EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once(); - EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<>(WalState.CLOSED, path)); + EasyMock.expect(marker.state(server1, id)).andReturn(new WalStatePath(WalState.CLOSED, path)); EasyMock.replay(conf, context, marker, tserverSet, fs); var gc = new GarbageCollectWriteAheadLogs(context, fs, tserverSet, marker) { @@ -174,7 +175,7 @@ public void deleteUnreferencedLogOnDeadServer() throws Exception { EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1)); EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once(); - EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path)); + EasyMock.expect(marker.state(server2, id)).andReturn(new WalStatePath(WalState.OPEN, path)); EasyMock.expect(fs.deleteRecursively(path)).andReturn(true).once(); marker.removeWalMarker(server2, id); @@ -214,7 +215,7 @@ public void ignoreReferenceLogOnDeadServer() throws Exception { EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1)); EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once(); - EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path)); + EasyMock.expect(marker.state(server2, id)).andReturn(new WalStatePath(WalState.OPEN, path)); EasyMock.replay(conf, context, fs, marker, tserverSet); GarbageCollectWriteAheadLogs gc = diff --git a/test/src/main/java/org/apache/accumulo/test/MaxWalReferencedIT.java b/test/src/main/java/org/apache/accumulo/test/MaxWalReferencedIT.java index 9f20c03fdc4..e8aed2b98b1 100644 --- a/test/src/main/java/org/apache/accumulo/test/MaxWalReferencedIT.java +++ b/test/src/main/java/org/apache/accumulo/test/MaxWalReferencedIT.java @@ -132,8 +132,8 @@ private void writeData(AccumuloClient client, String table, String rowToWrite) t } private long getWalCount(ServerContext context) throws Exception { - return new WalStateManager(context).getAllState().values().stream() - .filter(walState -> walState != WalStateManager.WalState.OPEN).count(); + return new WalStateManager(context).getAllState().stream() + .filter(wal -> wal.state() != WalStateManager.WalState.OPEN).count(); } } diff --git a/test/src/main/java/org/apache/accumulo/test/SystemCheckIT.java b/test/src/main/java/org/apache/accumulo/test/SystemCheckIT.java index 4996b573b17..7e52b8afd88 100644 --- a/test/src/main/java/org/apache/accumulo/test/SystemCheckIT.java +++ b/test/src/main/java/org/apache/accumulo/test/SystemCheckIT.java @@ -612,7 +612,7 @@ public void testSystemConfigCheck2() throws Exception { var wal = WalStateManager.parse(zrw.getData(fullWalPathZk)); // delete from HDFS - context.getVolumeManager().delete(wal.getSecond()); + context.getVolumeManager().delete(wal.path()); var p = getCluster().exec(SystemCheck.class, "run", sysConfCheck.name()); assertEquals(1, p.getProcess().waitFor()); diff --git a/test/src/main/java/org/apache/accumulo/test/VolumeChooserIT.java b/test/src/main/java/org/apache/accumulo/test/VolumeChooserIT.java index 9ac74280b99..d635aeb4aa8 100644 --- a/test/src/main/java/org/apache/accumulo/test/VolumeChooserIT.java +++ b/test/src/main/java/org/apache/accumulo/test/VolumeChooserIT.java @@ -305,10 +305,10 @@ public void waLogsSentToConfiguredVolumes() throws Exception { createAndVerifyTable(client, tableName, alpha_rows, false); // should only go to v2 as per configuration in configure() var walMgr = new WalStateManager(getServerContext()); - Map allLogs = walMgr.getAllState(); + var allLogs = walMgr.getAllState(); assertFalse(allLogs.isEmpty()); String volume = v2.toString(); - allLogs.keySet().stream().map(Path::toString).forEach(path -> { + allLogs.stream().map(wal -> wal.path().toString()).forEach(path -> { assertTrue(path.startsWith(volume), () -> path + " did not contain " + volume); }); } diff --git a/test/src/main/java/org/apache/accumulo/test/VolumeITBase.java b/test/src/main/java/org/apache/accumulo/test/VolumeITBase.java index abf7abed8a8..3c1b301744b 100644 --- a/test/src/main/java/org/apache/accumulo/test/VolumeITBase.java +++ b/test/src/main/java/org/apache/accumulo/test/VolumeITBase.java @@ -207,14 +207,13 @@ private void verifyVolumesUsed(AccumuloClient client, String tableName, boolean retry: while (true) { WalStateManager wals = new WalStateManager(getServerContext()); try { - outer: for (Map.Entry entry : wals.getAllState() - .entrySet()) { + outer: for (var wal : wals.getAllState()) { for (Path path : paths) { - if (entry.getKey().toString().startsWith(path.toString())) { + if (wal.path().toString().startsWith(path.toString())) { continue outer; } } - log.warn("Unexpected volume " + entry.getKey() + " (" + entry.getValue() + ")"); + log.warn("Unexpected volume " + wal.path() + " (" + wal.state() + ")"); UtilWaitThread.sleep(100); continue retry; } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayITBase.java b/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayITBase.java index a5686364a2e..e197c49002d 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayITBase.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayITBase.java @@ -62,7 +62,6 @@ import org.apache.accumulo.server.log.WalStateManager.WalState; import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.Text; import org.apache.zookeeper.KeeperException.NoNodeException; @@ -268,9 +267,9 @@ static Map _getWals(ServerContext c) throws Exception { try { Map result = new HashMap<>(); WalStateManager wals = new WalStateManager(c); - for (Entry entry : wals.getAllState().entrySet()) { + for (var wal : wals.getAllState()) { // WALs are in use if they are not unreferenced - result.put(entry.getKey().toString(), entry.getValue()); + result.put(wal.path().toString(), wal.state()); } return result; } catch (WalMarkerException wme) {