From 64356edf21ab5f27bcf1d119eaf0f50ab70d1a07 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Tue, 23 Jun 2026 15:52:33 +0000 Subject: [PATCH 1/2] Added VolumeManager.deleteBulk method to use FileSystem.createBulkDelete Added method to VolumeManager that accepts a collection of Paths to delete. If the underlying FileSystem supports bulk deletion, then that API method is called. Otherwise the Paths are deleted using an ExecutorService. Reviewed the existing code that used the other delete methods in the VolumeManager. Only two places seemed like candidates for using the new deleteBulk method. The other locations either only deleted one file aperiodically, or deleted many files but using some complex logic. The former didn't make sense to modify and the latter could potentially be modified later with some refactoring. Closes #5131 --- .../accumulo/server/fs/VolumeManager.java | 15 ++ .../accumulo/server/fs/VolumeManagerImpl.java | 133 ++++++++++++++++++ .../server/util/FindCompactionTmpFiles.java | 84 ++++------- .../tableOps/bulkVer2/CleanUpBulkImport.java | 4 +- 4 files changed, 174 insertions(+), 62 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java index c1ef3df0de4..5482d3c6698 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java @@ -51,6 +51,10 @@ */ public interface VolumeManager extends AutoCloseable { + enum DeleteStatus { + ERROR, FALSE, TRUE + }; + enum FileType { TABLE(Constants.TABLE_DIR), WAL(Constants.WAL_DIR), RECOVERY(Constants.RECOVERY_DIR); @@ -132,6 +136,17 @@ FSDataOutputStream createSyncable(Path logPath, int buffersize, short replicatio // delete a directory and anything under it boolean deleteRecursively(Path path) throws IOException; + /** + * Deletes a collection of files by grouping the files by FileSystem, then either calling + * {@code FileSystem#createBulkDelete(Path)} on FileSystem implementations that support it or by + * calling {@code #delete(Path)}. + * + * @param paths paths of files to delete + * @return paths map of input path to delete state + * @throws IOException + */ + Map deleteBulk(Collection paths) throws IOException; + // forward to the appropriate FileSystem object boolean exists(Path path) throws IOException; diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index b472539f691..77b80a8cf07 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -29,13 +29,17 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -53,6 +57,7 @@ import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.accumulo.core.volume.VolumeImpl; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BulkDelete; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; @@ -73,6 +78,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; +import com.google.common.util.concurrent.MoreExecutors; public class VolumeManagerImpl implements VolumeManager { @@ -86,6 +92,7 @@ public class VolumeManagerImpl implements VolumeManager { private final Map volumesByName; private final Multimap volumesByFileSystemUri; + private final Map bulkDeleteFileSystems; private final VolumeChooser chooser; private final AccumuloConfiguration conf; private final Configuration hadoopConf; @@ -95,6 +102,29 @@ protected VolumeManagerImpl(Map volumes, AccumuloConfiguration co this.volumesByName = volumes; // We may have multiple directories used in a single FileSystem (e.g. testing) this.volumesByFileSystemUri = invertVolumesByFileSystem(volumesByName); + + this.bulkDeleteFileSystems = new HashMap<>(); + for (Volume v : volumes.values()) { + FileSystem fs = v.getFileSystem(); + String base = v.getBasePath().isBlank() ? "/" : v.getBasePath(); + Path basePath = fs.makeQualified(new Path(base)); + try { + if (fs.hasPathCapability(basePath, "fs.capability.bulk.delete")) { + try (BulkDelete bulk = fs.createBulkDelete(basePath)) { + // Don't use the BulkDelete API if the page size is only 1. + // The DefaultBulkDeleteOperation implementation has a page size of 1. + // The S3A FileSystem implementation has a default of 250 and + // is configured by the property `fs.s3a.bulk.delete.page.size`. + if (bulk.pageSize() > 1) { + this.bulkDeleteFileSystems.put(fs, v); + } + } + } + } catch (IllegalArgumentException | IOException e) { + log.warn("Error determining bulk delete capability for volume {}", v.getBasePath(), e); + } + } + ensureSyncIsEnabled(); // if they supplied a property and we cannot load it, then fail hard VolumeChooser chooser1; @@ -213,6 +243,109 @@ public boolean deleteRecursively(Path path) throws IOException { return getFileSystemByPath(path).delete(path, true); } + public Map deleteBulk(Collection paths) throws IOException { + + requireNonNull(paths); + + if (paths.isEmpty()) { + return Map.of(); + } + + if (paths.size() == 1) { + Path p = paths.iterator().next(); + try { + if (delete(p)) { + return Map.of(p, DeleteStatus.TRUE); + } else { + return Map.of(p, DeleteStatus.FALSE); + } + } catch (IOException e) { + return Map.of(p, DeleteStatus.ERROR); + } + } + + final Map> pathsSupportingBulkDelete = new HashMap<>(); + final List pathsNotSupportingBulkDelete = new ArrayList<>(); + final Map results = new ConcurrentHashMap<>(); + final List> futures = new LinkedList<>(); + + for (Path p : paths) { + FileSystem fs = getFileSystemByPath(p); + Volume v = this.bulkDeleteFileSystems.get(fs); + if (v != null) { + pathsSupportingBulkDelete.computeIfAbsent(v, (k) -> new ArrayList()).add(p); + } else { + pathsNotSupportingBulkDelete.add(p); + } + } + + if (!pathsNotSupportingBulkDelete.isEmpty()) { + final ExecutorService delSvc; + if (pathsNotSupportingBulkDelete.size() < 4) { + // Do not bother creating a thread pool and threads for a few files. + delSvc = MoreExecutors.newDirectExecutorService(); + } else { + delSvc = Executors.newFixedThreadPool(8); + } + pathsNotSupportingBulkDelete.forEach(p -> { + futures.add(delSvc.submit(() -> { + try { + if (delete(p)) { + results.put(p, DeleteStatus.TRUE); + } else { + results.put(p, DeleteStatus.FALSE); + } + return null; + } catch (IOException e) { + log.error("Error deleting file at {}", p, e); + results.put(p, DeleteStatus.ERROR); + return null; + } + })); + }); + delSvc.shutdown(); + } + + for (Entry> e : pathsSupportingBulkDelete.entrySet()) { + Volume v = e.getKey(); + List deletes = e.getValue(); + FileSystem fs = v.getFileSystem(); + Path basePath = fs.makeQualified(new Path(v.getBasePath())); + try (BulkDelete bulk = fs.createBulkDelete(basePath)) { + int batchSize = bulk.pageSize(); + for (int i = 0; i <= deletes.size(); i += batchSize) { + List subset = deletes.subList(i, Math.min(i + batchSize, deletes.size())); + List> errors = bulk.bulkDelete(subset); + errors.forEach((entry) -> { + log.error("Failed to delete file at {}, reason: {}", entry.getKey(), entry.getValue()); + results.put(entry.getKey(), DeleteStatus.ERROR); + if (!subset.remove(entry.getKey())) { + log.error("Did not find error path {} in input set {}", entry.getKey(), subset); + } + }); + subset.forEach(success -> results.put(success, DeleteStatus.TRUE)); + } + } + } + + while (!futures.isEmpty()) { + Iterator> iter = futures.iterator(); + while (iter.hasNext()) { + Future f = iter.next(); + if (f.isDone()) { + iter.remove(); + } + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + } + return results; + } + protected void ensureSyncIsEnabled() { for (Entry entry : volumesByName.entrySet()) { FileSystem fs = entry.getValue().getFileSystem(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java b/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java index 7064b12de37..e156a2e93a3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java @@ -20,12 +20,13 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutionException; @@ -46,6 +47,7 @@ import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.fs.VolumeManager.DeleteStatus; import org.apache.accumulo.server.util.FindCompactionTmpFiles.FindOpts; import org.apache.accumulo.start.spi.CommandGroup; import org.apache.accumulo.start.spi.CommandGroups; @@ -59,7 +61,6 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.google.auto.service.AutoService; -import com.google.common.util.concurrent.MoreExecutors; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; @@ -173,70 +174,33 @@ public static void findTmpFiles(ServerContext ctx, TableId tableId, String dirNa } } - private static boolean deleteTmpFile(ServerContext context, Path p) throws IOException { - if (context.getVolumeManager().exists(p)) { - boolean result = context.getVolumeManager().delete(p); - if (result) { - LOG.debug("Removed old temp file {}", p); - } else { - LOG.error("Unable to remove old temp file {}, operation returned false with no exception", - p); - } - return result; - } - return true; - } - public static DeleteStats deleteTempFiles(ServerContext context, Set filesToDelete) { - final ExecutorService delSvc; - if (filesToDelete.size() < 4) { - // Do not bother creating a thread pool and threads for a few files. - delSvc = MoreExecutors.newDirectExecutorService(); - } else { - delSvc = Executors.newFixedThreadPool(8); - } - final DeleteStats stats = new DeleteStats(); - - // use a linked list to make removal from the middle of the list quick - final List> futures = new LinkedList<>(); - - filesToDelete.forEach(p -> { - futures.add(delSvc.submit(() -> deleteTmpFile(context, p))); - }); - delSvc.shutdown(); - try { - int expectedResponses = filesToDelete.size(); - while (expectedResponses > 0) { - Iterator> iter = futures.iterator(); - while (iter.hasNext()) { - Future future = iter.next(); - if (future.isDone()) { - expectedResponses--; - iter.remove(); - try { - if (future.get()) { - stats.success++; - } else { - stats.failure++; - } - } catch (ExecutionException e) { - stats.error++; - LOG.error("Error deleting a compaction tmp file", e); - } - } - } - if (expectedResponses > 0) { - LOG.debug("Waiting on {} background delete operations", expectedResponses); - UtilWaitThread.sleep(1_000); + Map results = context.getVolumeManager().deleteBulk(filesToDelete); + results.forEach((k, v) -> { + switch (v) { + case ERROR: + LOG.error("Error deleting a compaction tmp file {}", k); + stats.error++; + break; + case FALSE: + LOG.error( + "Unable to remove old temp file {}, operation returned false with no exception", k); + stats.failure++; + break; + case TRUE: + LOG.debug("Removed old temp file {}", k); + stats.success++; + break; + default: + break; } - } - delSvc.awaitTermination(10, TimeUnit.MINUTES); + }); return stats; - } catch (InterruptedException e) { - throw new IllegalStateException(e); + } catch (IOException e) { + throw new UncheckedIOException("Error in VolumeManager.deleteBulk", e); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java index b9229bf8f67..5a4af29b453 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Map; import java.util.Optional; +import java.util.Set; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.data.AbstractId; @@ -78,8 +79,7 @@ public Repo call(FateId fateId, FateEnv env) throws Exception { Path renamingFile = new Path(bulkDir, Constants.BULK_RENAME_FILE); Path mappingFile = new Path(bulkDir, Constants.BULK_LOAD_MAPPING); try { - env.getVolumeManager().delete(renamingFile); - env.getVolumeManager().delete(mappingFile); + env.getVolumeManager().deleteBulk(Set.of(renamingFile, mappingFile)); } catch (IOException ioe) { log.debug("{} Failed to delete renames and/or loadmap", fateId, ioe); } From f722118eb5510e6fd88079fbc025221675d8df5f Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Tue, 23 Jun 2026 18:21:25 +0000 Subject: [PATCH 2/2] Fix build --- .../main/java/org/apache/accumulo/server/fs/VolumeManager.java | 2 +- .../java/org/apache/accumulo/server/fs/VolumeManagerImpl.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java index 5482d3c6698..caa8b9a8bcb 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java @@ -143,7 +143,7 @@ FSDataOutputStream createSyncable(Path logPath, int buffersize, short replicatio * * @param paths paths of files to delete * @return paths map of input path to delete state - * @throws IOException + * @throws IOException on any exception deleting files */ Map deleteBulk(Collection paths) throws IOException; diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index 77b80a8cf07..bd07bacf995 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -243,6 +243,7 @@ public boolean deleteRecursively(Path path) throws IOException { return getFileSystemByPath(path).delete(path, true); } + @Override public Map deleteBulk(Collection paths) throws IOException { requireNonNull(paths);