diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java index c1ef3df0de4..caa8b9a8bcb 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java @@ -51,6 +51,10 @@ */ public interface VolumeManager extends AutoCloseable { + enum DeleteStatus { + ERROR, FALSE, TRUE + }; + enum FileType { TABLE(Constants.TABLE_DIR), WAL(Constants.WAL_DIR), RECOVERY(Constants.RECOVERY_DIR); @@ -132,6 +136,17 @@ FSDataOutputStream createSyncable(Path logPath, int buffersize, short replicatio // delete a directory and anything under it boolean deleteRecursively(Path path) throws IOException; + /** + * Deletes a collection of files by grouping the files by FileSystem, then either calling + * {@code FileSystem#createBulkDelete(Path)} on FileSystem implementations that support it or by + * calling {@code #delete(Path)}. + * + * @param paths paths of files to delete + * @return paths map of input path to delete state + * @throws IOException on any exception deleting files + */ + Map deleteBulk(Collection paths) throws IOException; + // forward to the appropriate FileSystem object boolean exists(Path path) throws IOException; diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index b472539f691..bd07bacf995 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -29,13 +29,17 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -53,6 +57,7 @@ import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.accumulo.core.volume.VolumeImpl; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BulkDelete; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; @@ -73,6 +78,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; +import com.google.common.util.concurrent.MoreExecutors; public class VolumeManagerImpl implements VolumeManager { @@ -86,6 +92,7 @@ public class VolumeManagerImpl implements VolumeManager { private final Map volumesByName; private final Multimap volumesByFileSystemUri; + private final Map bulkDeleteFileSystems; private final VolumeChooser chooser; private final AccumuloConfiguration conf; private final Configuration hadoopConf; @@ -95,6 +102,29 @@ protected VolumeManagerImpl(Map volumes, AccumuloConfiguration co this.volumesByName = volumes; // We may have multiple directories used in a single FileSystem (e.g. testing) this.volumesByFileSystemUri = invertVolumesByFileSystem(volumesByName); + + this.bulkDeleteFileSystems = new HashMap<>(); + for (Volume v : volumes.values()) { + FileSystem fs = v.getFileSystem(); + String base = v.getBasePath().isBlank() ? "/" : v.getBasePath(); + Path basePath = fs.makeQualified(new Path(base)); + try { + if (fs.hasPathCapability(basePath, "fs.capability.bulk.delete")) { + try (BulkDelete bulk = fs.createBulkDelete(basePath)) { + // Don't use the BulkDelete API if the page size is only 1. + // The DefaultBulkDeleteOperation implementation has a page size of 1. + // The S3A FileSystem implementation has a default of 250 and + // is configured by the property `fs.s3a.bulk.delete.page.size`. + if (bulk.pageSize() > 1) { + this.bulkDeleteFileSystems.put(fs, v); + } + } + } + } catch (IllegalArgumentException | IOException e) { + log.warn("Error determining bulk delete capability for volume {}", v.getBasePath(), e); + } + } + ensureSyncIsEnabled(); // if they supplied a property and we cannot load it, then fail hard VolumeChooser chooser1; @@ -213,6 +243,110 @@ public boolean deleteRecursively(Path path) throws IOException { return getFileSystemByPath(path).delete(path, true); } + @Override + public Map deleteBulk(Collection paths) throws IOException { + + requireNonNull(paths); + + if (paths.isEmpty()) { + return Map.of(); + } + + if (paths.size() == 1) { + Path p = paths.iterator().next(); + try { + if (delete(p)) { + return Map.of(p, DeleteStatus.TRUE); + } else { + return Map.of(p, DeleteStatus.FALSE); + } + } catch (IOException e) { + return Map.of(p, DeleteStatus.ERROR); + } + } + + final Map> pathsSupportingBulkDelete = new HashMap<>(); + final List pathsNotSupportingBulkDelete = new ArrayList<>(); + final Map results = new ConcurrentHashMap<>(); + final List> futures = new LinkedList<>(); + + for (Path p : paths) { + FileSystem fs = getFileSystemByPath(p); + Volume v = this.bulkDeleteFileSystems.get(fs); + if (v != null) { + pathsSupportingBulkDelete.computeIfAbsent(v, (k) -> new ArrayList()).add(p); + } else { + pathsNotSupportingBulkDelete.add(p); + } + } + + if (!pathsNotSupportingBulkDelete.isEmpty()) { + final ExecutorService delSvc; + if (pathsNotSupportingBulkDelete.size() < 4) { + // Do not bother creating a thread pool and threads for a few files. + delSvc = MoreExecutors.newDirectExecutorService(); + } else { + delSvc = Executors.newFixedThreadPool(8); + } + pathsNotSupportingBulkDelete.forEach(p -> { + futures.add(delSvc.submit(() -> { + try { + if (delete(p)) { + results.put(p, DeleteStatus.TRUE); + } else { + results.put(p, DeleteStatus.FALSE); + } + return null; + } catch (IOException e) { + log.error("Error deleting file at {}", p, e); + results.put(p, DeleteStatus.ERROR); + return null; + } + })); + }); + delSvc.shutdown(); + } + + for (Entry> e : pathsSupportingBulkDelete.entrySet()) { + Volume v = e.getKey(); + List deletes = e.getValue(); + FileSystem fs = v.getFileSystem(); + Path basePath = fs.makeQualified(new Path(v.getBasePath())); + try (BulkDelete bulk = fs.createBulkDelete(basePath)) { + int batchSize = bulk.pageSize(); + for (int i = 0; i <= deletes.size(); i += batchSize) { + List subset = deletes.subList(i, Math.min(i + batchSize, deletes.size())); + List> errors = bulk.bulkDelete(subset); + errors.forEach((entry) -> { + log.error("Failed to delete file at {}, reason: {}", entry.getKey(), entry.getValue()); + results.put(entry.getKey(), DeleteStatus.ERROR); + if (!subset.remove(entry.getKey())) { + log.error("Did not find error path {} in input set {}", entry.getKey(), subset); + } + }); + subset.forEach(success -> results.put(success, DeleteStatus.TRUE)); + } + } + } + + while (!futures.isEmpty()) { + Iterator> iter = futures.iterator(); + while (iter.hasNext()) { + Future f = iter.next(); + if (f.isDone()) { + iter.remove(); + } + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + } + return results; + } + protected void ensureSyncIsEnabled() { for (Entry entry : volumesByName.entrySet()) { FileSystem fs = entry.getValue().getFileSystem(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java b/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java index 7064b12de37..e156a2e93a3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java @@ -20,12 +20,13 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutionException; @@ -46,6 +47,7 @@ import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.fs.VolumeManager.DeleteStatus; import org.apache.accumulo.server.util.FindCompactionTmpFiles.FindOpts; import org.apache.accumulo.start.spi.CommandGroup; import org.apache.accumulo.start.spi.CommandGroups; @@ -59,7 +61,6 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.google.auto.service.AutoService; -import com.google.common.util.concurrent.MoreExecutors; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; @@ -173,70 +174,33 @@ public static void findTmpFiles(ServerContext ctx, TableId tableId, String dirNa } } - private static boolean deleteTmpFile(ServerContext context, Path p) throws IOException { - if (context.getVolumeManager().exists(p)) { - boolean result = context.getVolumeManager().delete(p); - if (result) { - LOG.debug("Removed old temp file {}", p); - } else { - LOG.error("Unable to remove old temp file {}, operation returned false with no exception", - p); - } - return result; - } - return true; - } - public static DeleteStats deleteTempFiles(ServerContext context, Set filesToDelete) { - final ExecutorService delSvc; - if (filesToDelete.size() < 4) { - // Do not bother creating a thread pool and threads for a few files. - delSvc = MoreExecutors.newDirectExecutorService(); - } else { - delSvc = Executors.newFixedThreadPool(8); - } - final DeleteStats stats = new DeleteStats(); - - // use a linked list to make removal from the middle of the list quick - final List> futures = new LinkedList<>(); - - filesToDelete.forEach(p -> { - futures.add(delSvc.submit(() -> deleteTmpFile(context, p))); - }); - delSvc.shutdown(); - try { - int expectedResponses = filesToDelete.size(); - while (expectedResponses > 0) { - Iterator> iter = futures.iterator(); - while (iter.hasNext()) { - Future future = iter.next(); - if (future.isDone()) { - expectedResponses--; - iter.remove(); - try { - if (future.get()) { - stats.success++; - } else { - stats.failure++; - } - } catch (ExecutionException e) { - stats.error++; - LOG.error("Error deleting a compaction tmp file", e); - } - } - } - if (expectedResponses > 0) { - LOG.debug("Waiting on {} background delete operations", expectedResponses); - UtilWaitThread.sleep(1_000); + Map results = context.getVolumeManager().deleteBulk(filesToDelete); + results.forEach((k, v) -> { + switch (v) { + case ERROR: + LOG.error("Error deleting a compaction tmp file {}", k); + stats.error++; + break; + case FALSE: + LOG.error( + "Unable to remove old temp file {}, operation returned false with no exception", k); + stats.failure++; + break; + case TRUE: + LOG.debug("Removed old temp file {}", k); + stats.success++; + break; + default: + break; } - } - delSvc.awaitTermination(10, TimeUnit.MINUTES); + }); return stats; - } catch (InterruptedException e) { - throw new IllegalStateException(e); + } catch (IOException e) { + throw new UncheckedIOException("Error in VolumeManager.deleteBulk", e); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java index b9229bf8f67..5a4af29b453 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Map; import java.util.Optional; +import java.util.Set; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.data.AbstractId; @@ -78,8 +79,7 @@ public Repo call(FateId fateId, FateEnv env) throws Exception { Path renamingFile = new Path(bulkDir, Constants.BULK_RENAME_FILE); Path mappingFile = new Path(bulkDir, Constants.BULK_LOAD_MAPPING); try { - env.getVolumeManager().delete(renamingFile); - env.getVolumeManager().delete(mappingFile); + env.getVolumeManager().deleteBulk(Set.of(renamingFile, mappingFile)); } catch (IOException ioe) { log.debug("{} Failed to delete renames and/or loadmap", fateId, ioe); }