Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
*/
public interface VolumeManager extends AutoCloseable {

/**
 * Per-path outcome reported by {@code deleteBulk}.
 */
enum DeleteStatus {
  /** The delete attempt raised an exception or the bulk delete reported a failure. */
  ERROR,
  /** The delete call completed without an exception but returned {@code false}. */
  FALSE,
  /** The path was deleted (or the bulk delete reported no error for it). */
  TRUE
}

enum FileType {
TABLE(Constants.TABLE_DIR), WAL(Constants.WAL_DIR), RECOVERY(Constants.RECOVERY_DIR);

Expand Down Expand Up @@ -132,6 +136,17 @@ FSDataOutputStream createSyncable(Path logPath, int buffersize, short replicatio
/**
 * Deletes a directory and anything under it.
 *
 * @param path the directory to remove
 * @return the boolean reported by the underlying delete operation
 * @throws IOException if the underlying delete fails
 */
boolean deleteRecursively(Path path) throws IOException;

/**
 * Deletes a collection of files by grouping the files by FileSystem, then either calling
 * {@code FileSystem#createBulkDelete(Path)} on FileSystem implementations that support it or by
 * calling {@code #delete(Path)}.
 *
 * <p>
 * Failures on individual files are reported through the returned map as
 * {@link DeleteStatus#ERROR} rather than by throwing.
 *
 * @param paths paths of files to delete
 * @return map of each input path to the outcome of deleting it
 * @throws IOException if the delete operation as a whole could not be carried out
 */
Map<Path,DeleteStatus> deleteBulk(Collection<Path> paths) throws IOException;

/**
 * Existence check for {@code path}; the call is forwarded to the appropriate FileSystem object.
 *
 * @param path the path to check
 * @return the boolean reported by the FileSystem the call is forwarded to
 * @throws IOException if the underlying FileSystem call fails
 */
boolean exists(Path path) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -53,6 +57,7 @@
import org.apache.accumulo.core.volume.VolumeConfiguration;
import org.apache.accumulo.core.volume.VolumeImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
Expand All @@ -73,6 +78,7 @@
import com.github.benmanes.caffeine.cache.Cache;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.MoreExecutors;

public class VolumeManagerImpl implements VolumeManager {

Expand All @@ -86,6 +92,7 @@ public class VolumeManagerImpl implements VolumeManager {

private final Map<String,Volume> volumesByName;
private final Multimap<URI,Volume> volumesByFileSystemUri;
private final Map<FileSystem,Volume> bulkDeleteFileSystems;
private final VolumeChooser chooser;
private final AccumuloConfiguration conf;
private final Configuration hadoopConf;
Expand All @@ -95,6 +102,29 @@ protected VolumeManagerImpl(Map<String,Volume> volumes, AccumuloConfiguration co
this.volumesByName = volumes;
// We may have multiple directories used in a single FileSystem (e.g. testing)
this.volumesByFileSystemUri = invertVolumesByFileSystem(volumesByName);

this.bulkDeleteFileSystems = new HashMap<>();
for (Volume v : volumes.values()) {
FileSystem fs = v.getFileSystem();
String base = v.getBasePath().isBlank() ? "/" : v.getBasePath();
Path basePath = fs.makeQualified(new Path(base));
try {
if (fs.hasPathCapability(basePath, "fs.capability.bulk.delete")) {
try (BulkDelete bulk = fs.createBulkDelete(basePath)) {
// Don't use the BulkDelete API if the page size is only 1.
// The DefaultBulkDeleteOperation implementation has a page size of 1.
// The S3A FileSystem implementation has a default of 250 and
// is configured by the property `fs.s3a.bulk.delete.page.size`.
if (bulk.pageSize() > 1) {
this.bulkDeleteFileSystems.put(fs, v);
}
}
}
} catch (IllegalArgumentException | IOException e) {
log.warn("Error determining bulk delete capability for volume {}", v.getBasePath(), e);
}
}

ensureSyncIsEnabled();
// if they supplied a property and we cannot load it, then fail hard
VolumeChooser chooser1;
Expand Down Expand Up @@ -213,6 +243,110 @@ public boolean deleteRecursively(Path path) throws IOException {
return getFileSystemByPath(path).delete(path, true);
}

/**
 * Deletes the given files and reports a {@link DeleteStatus} for each of them.
 *
 * <p>
 * Paths whose FileSystem was registered in {@code bulkDeleteFileSystems} are deleted in pages
 * through the Hadoop {@link BulkDelete} API. All other paths are deleted one at a time with
 * {@link #delete(Path)}, on a small thread pool when there are four or more of them. A single
 * input path is always deleted directly with {@link #delete(Path)}.
 *
 * <p>
 * NOTE(review): {@code bulkDeleteFileSystems} keeps a single Volume per FileSystem. If several
 * volumes share one FileSystem, a path may be handed to a BulkDelete rooted at a different
 * volume's base path - confirm that this is acceptable to the FileSystem implementation.
 *
 * @param paths paths of files to delete, must not be null
 * @return map of each input path to the outcome of deleting it
 * @throws IOException if the bulk delete API itself fails, if a background delete task fails
 *         unexpectedly, or if the calling thread is interrupted while waiting for background
 *         deletes
 */
@Override
public Map<Path,DeleteStatus> deleteBulk(Collection<Path> paths) throws IOException {

  requireNonNull(paths);

  if (paths.isEmpty()) {
    return Map.of();
  }

  if (paths.size() == 1) {
    Path p = paths.iterator().next();
    try {
      return Map.of(p, delete(p) ? DeleteStatus.TRUE : DeleteStatus.FALSE);
    } catch (IOException e) {
      // Log the cause here as well; the multi-path branch below does the same.
      log.error("Error deleting file at {}", p, e);
      return Map.of(p, DeleteStatus.ERROR);
    }
  }

  final Map<Volume,List<Path>> pathsSupportingBulkDelete = new HashMap<>();
  final List<Path> pathsNotSupportingBulkDelete = new ArrayList<>();
  final Map<Path,DeleteStatus> results = new ConcurrentHashMap<>();
  final List<Future<Void>> futures = new ArrayList<>();

  for (Path p : paths) {
    FileSystem fs = getFileSystemByPath(p);
    Volume v = this.bulkDeleteFileSystems.get(fs);
    if (v != null) {
      pathsSupportingBulkDelete.computeIfAbsent(v, k -> new ArrayList<>()).add(p);
    } else {
      pathsNotSupportingBulkDelete.add(p);
    }
  }

  if (!pathsNotSupportingBulkDelete.isEmpty()) {
    final ExecutorService delSvc;
    if (pathsNotSupportingBulkDelete.size() < 4) {
      // Do not bother creating a thread pool and threads for a few files.
      delSvc = MoreExecutors.newDirectExecutorService();
    } else {
      delSvc = Executors.newFixedThreadPool(8);
    }
    for (Path p : pathsNotSupportingBulkDelete) {
      futures.add(delSvc.submit(() -> {
        try {
          results.put(p, delete(p) ? DeleteStatus.TRUE : DeleteStatus.FALSE);
        } catch (IOException | RuntimeException e) {
          // Record unchecked failures too, so that every submitted path ends up with a status.
          log.error("Error deleting file at {}", p, e);
          results.put(p, DeleteStatus.ERROR);
        }
        return null;
      }));
    }
    // Already-submitted tasks keep running; the pool threads exit once they finish.
    delSvc.shutdown();
  }

  for (Entry<Volume,List<Path>> e : pathsSupportingBulkDelete.entrySet()) {
    Volume v = e.getKey();
    List<Path> deletes = e.getValue();
    FileSystem fs = v.getFileSystem();
    // Same blank-base-path handling as the constructor; Path rejects an empty string.
    String base = v.getBasePath().isBlank() ? "/" : v.getBasePath();
    Path basePath = fs.makeQualified(new Path(base));
    try (BulkDelete bulk = fs.createBulkDelete(basePath)) {
      // Guard against a non-positive page size, which would keep the loop from advancing.
      int batchSize = Math.max(1, bulk.pageSize());
      for (int i = 0; i < deletes.size(); i += batchSize) {
        List<Path> subset = deletes.subList(i, Math.min(i + batchSize, deletes.size()));
        List<Entry<Path,String>> errors = bulk.bulkDelete(subset);
        // Track successes in a separate set. Removing from the subList view would also remove
        // from the backing list and shift the offsets of the following batches.
        Set<Path> succeeded = new HashSet<>(subset);
        for (Entry<Path,String> entry : errors) {
          log.error("Failed to delete file at {}, reason: {}", entry.getKey(), entry.getValue());
          results.put(entry.getKey(), DeleteStatus.ERROR);
          if (!succeeded.remove(entry.getKey())) {
            log.error("Did not find error path {} in input set {}", entry.getKey(), subset);
          }
        }
        succeeded.forEach(success -> results.put(success, DeleteStatus.TRUE));
      }
    }
  }

  // Block until every background delete has completed instead of polling with a sleep.
  for (Future<Void> f : futures) {
    try {
      f.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException(e);
    } catch (ExecutionException e) {
      // The task handles IOException and RuntimeException itself, so this is unexpected.
      throw new IOException("Unexpected failure in background delete task", e.getCause());
    }
  }
  return results;
}

protected void ensureSyncIsEnabled() {
for (Entry<String,Volume> entry : volumesByName.entrySet()) {
FileSystem fs = entry.getValue().getFileSystem();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
Expand All @@ -46,6 +47,7 @@
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager.DeleteStatus;
import org.apache.accumulo.server.util.FindCompactionTmpFiles.FindOpts;
import org.apache.accumulo.start.spi.CommandGroup;
import org.apache.accumulo.start.spi.CommandGroups;
Expand All @@ -59,7 +61,6 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.auto.service.AutoService;
import com.google.common.util.concurrent.MoreExecutors;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
Expand Down Expand Up @@ -173,70 +174,33 @@ public static void findTmpFiles(ServerContext ctx, TableId tableId, String dirNa
}
}

/**
 * Removes a single temp file through the context's VolumeManager.
 *
 * @param context server context supplying the VolumeManager
 * @param p path of the temp file
 * @return {@code true} if the file does not exist or was deleted, {@code false} if the delete
 *         call returned false
 * @throws IOException if the exists or delete call fails
 */
private static boolean deleteTmpFile(ServerContext context, Path p) throws IOException {
  // A file that is already gone counts as success.
  if (!context.getVolumeManager().exists(p)) {
    return true;
  }

  final boolean removed = context.getVolumeManager().delete(p);
  if (!removed) {
    LOG.error("Unable to remove old temp file {}, operation returned false with no exception",
        p);
    return false;
  }

  LOG.debug("Removed old temp file {}", p);
  return true;
}

public static DeleteStats deleteTempFiles(ServerContext context, Set<Path> filesToDelete) {

final ExecutorService delSvc;
if (filesToDelete.size() < 4) {
// Do not bother creating a thread pool and threads for a few files.
delSvc = MoreExecutors.newDirectExecutorService();
} else {
delSvc = Executors.newFixedThreadPool(8);
}

final DeleteStats stats = new DeleteStats();

// use a linked list to make removal from the middle of the list quick
final List<Future<Boolean>> futures = new LinkedList<>();

filesToDelete.forEach(p -> {
futures.add(delSvc.submit(() -> deleteTmpFile(context, p)));
});
delSvc.shutdown();

try {
int expectedResponses = filesToDelete.size();
while (expectedResponses > 0) {
Iterator<Future<Boolean>> iter = futures.iterator();
while (iter.hasNext()) {
Future<Boolean> future = iter.next();
if (future.isDone()) {
expectedResponses--;
iter.remove();
try {
if (future.get()) {
stats.success++;
} else {
stats.failure++;
}
} catch (ExecutionException e) {
stats.error++;
LOG.error("Error deleting a compaction tmp file", e);
}
}
}
if (expectedResponses > 0) {
LOG.debug("Waiting on {} background delete operations", expectedResponses);
UtilWaitThread.sleep(1_000);
Map<Path,DeleteStatus> results = context.getVolumeManager().deleteBulk(filesToDelete);
results.forEach((k, v) -> {
switch (v) {
case ERROR:
LOG.error("Error deleting a compaction tmp file {}", k);
stats.error++;
break;
case FALSE:
LOG.error(
"Unable to remove old temp file {}, operation returned false with no exception", k);
stats.failure++;
break;
case TRUE:
LOG.debug("Removed old temp file {}", k);
stats.success++;
break;
default:
break;
}
}
delSvc.awaitTermination(10, TimeUnit.MINUTES);
});
return stats;
} catch (InterruptedException e) {
throw new IllegalStateException(e);
} catch (IOException e) {
throw new UncheckedIOException("Error in VolumeManager.deleteBulk", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.data.AbstractId;
Expand Down Expand Up @@ -78,8 +79,7 @@ public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception {
Path renamingFile = new Path(bulkDir, Constants.BULK_RENAME_FILE);
Path mappingFile = new Path(bulkDir, Constants.BULK_LOAD_MAPPING);
try {
env.getVolumeManager().delete(renamingFile);
env.getVolumeManager().delete(mappingFile);
env.getVolumeManager().deleteBulk(Set.of(renamingFile, mappingFile));
} catch (IOException ioe) {
log.debug("{} Failed to delete renames and/or loadmap", fateId, ioe);
}
Expand Down