Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,19 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchDeleter;
Expand All @@ -60,6 +68,7 @@
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.core.file.FilePrefix;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.SystemTables;
Expand All @@ -81,8 +90,10 @@
import org.apache.accumulo.core.util.Encoding;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
import org.apache.accumulo.core.util.tables.TableNameUtil;
import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.codec.VersionedProperties;
import org.apache.accumulo.server.conf.store.NamespacePropKey;
Expand All @@ -91,7 +102,9 @@
import org.apache.accumulo.server.conf.store.TablePropKey;
import org.apache.accumulo.server.init.FileSystemInitializer;
import org.apache.accumulo.server.init.InitialConfiguration;
import org.apache.accumulo.server.util.FindCompactionTmpFiles.DeleteStats;
import org.apache.accumulo.server.util.PropUtil;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -300,6 +313,8 @@ public void upgradeMetadata(ServerContext context) {
removeCompactColumnsFromTable(context, SystemTables.METADATA.tableName());
LOG.info("Removing bulk file columns from metadata table");
removeBulkFileColumnsFromTable(context, SystemTables.METADATA.tableName());
LOG.info("Removing major compaction temp files from prior versions");
deleteCompactionTempFiles(context, new DeleteStats(), new HashSet<Path>());
}

private static void addAssistantManager(ServerContext context) {
Expand Down Expand Up @@ -921,6 +936,7 @@ private void upgradeTabletsMetadata(@NonNull ServerContext context, String metaN
}
}

@VisibleForTesting
public void removeScanServerRanges(ServerContext context) {
try (BatchDeleter batchDeleter =
context.createBatchDeleter(Ample.DataLevel.USER.metaTable(), Authorizations.EMPTY, 4)) {
Expand Down Expand Up @@ -988,4 +1004,108 @@ void moveTableProperties(ServerContext context) {
LOG.info(
"Moving table properties from system configuration to namespace configurations complete.");
}

@VisibleForTesting
public void deleteCompactionTempFiles(final ServerContext ctx, final DeleteStats stats,
final Collection<Path> deletedFiles) {

final String pattern = "/tables/*/*/*";
final Collection<Volume> vols = ctx.getVolumeManager().getVolumes();
final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
final List<Future<Void>> futures = new ArrayList<>(vols.size());
final Set<Path> oldCompactionTmpFiles = ConcurrentHashMap.newKeySet();

for (Volume vol : vols) {
final Path volPattern = new Path(vol.getBasePath() + pattern);
LOG.info("Looking for old compaction tmp files that match pattern: {}", volPattern);
futures.add(svc.submit(() -> {
try {
FileStatus[] files = vol.getFileSystem().globStatus(volPattern,
(p) -> (p.getName().startsWith("" + FilePrefix.FULL_COMPACTION.getPrefix())
|| p.getName().startsWith("" + FilePrefix.COMPACTION.getPrefix()))
&& p.getName().endsWith(".rf_tmp"));
Arrays.stream(files).forEach(fs -> oldCompactionTmpFiles.add(fs.getPath()));
} catch (IOException e) {
LOG.error("Error looking for old compaction tmp files in volume: {}", vol, e);
}
return null;
}));
}
svc.shutdown();

LOG.info("Waiting for tasks to complete finding files");
while (futures.size() > 0) {
Iterator<Future<Void>> iter = futures.iterator();
while (iter.hasNext()) {
Future<Void> future = iter.next();
if (future.isDone()) {
iter.remove();
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Error getting list of old compaction tmp files", e);
}
}
}
int remaining = futures.size();
if (remaining > 0) {
LOG.debug("Waiting for {} tasks to complete", remaining);
UtilWaitThread.sleep(3_000);
}
}
LOG.info("Found {} old compaction tmp files:", oldCompactionTmpFiles.size());
oldCompactionTmpFiles.forEach(p -> LOG.debug("{}", p));

LOG.info("Deleting old compaction tmp files...");
final ExecutorService delSvc = Executors.newFixedThreadPool(vols.size());
// use a linked list to make removal from the middle of the list quick
final List<Future<Boolean>> delFutures = new LinkedList<>();

final Set<Path> filesDeleted = ConcurrentHashMap.newKeySet();
oldCompactionTmpFiles.forEach(p -> {
delFutures.add(delSvc.submit(() -> {
if (ctx.getVolumeManager().exists(p)) {
boolean result = ctx.getVolumeManager().delete(p);
if (result) {
LOG.debug("Removed old temp file {}", p);
filesDeleted.add(p);
} else {
LOG.error(
"Unable to remove old temp file {}, operation returned false with no exception", p);
}
return result;
}
return true;
}));
});
delSvc.shutdown();

while (delFutures.size() > 0) {
Iterator<Future<Boolean>> iter = delFutures.iterator();
while (iter.hasNext()) {
Future<Boolean> future = iter.next();
if (future.isDone()) {
iter.remove();
try {
if (future.get()) {
stats.success++;
} else {
stats.failure++;
}
} catch (InterruptedException | ExecutionException e) {
stats.error++;
LOG.error("Error deleting a compaction tmp file", e);
}
}
}
int remaining = delFutures.size();
if (remaining > 0) {
LOG.debug("Waiting on {} background delete operations", remaining);
UtilWaitThread.sleep(3_000);
}
}
deletedFiles.addAll(filesDeleted);
LOG.info("Deletion of compaction tmp files completed. Success:{}, Failure:{}, Error:{}",
stats.success, stats.failure, stats.error);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test.upgrade;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.metadata.SystemTables;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.manager.upgrade.Upgrader11to12;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.util.FindCompactionTmpFiles.DeleteStats;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TmpFileUpgrade11to12TestIT extends SharedMiniClusterBase {

private static final Logger LOG = LoggerFactory.getLogger(TmpFileUpgrade11to12TestIT.class);

@BeforeAll
public static void start() throws Exception {
SharedMiniClusterBase.startMiniCluster();
}

@AfterAll
public static void stop() throws Exception {
stopMiniCluster();
}

@Test
public void testDeleteOldCompactionTmpFiles() throws Exception {

final Set<String> fileNames =
Set.of("A00001.rf", "F000002.rf", "F000003.rf_tmp", "C000004.rf", "C000005.rf_tmp",
"C000006.rf_tmp_" + ExternalCompactionId.generate(UUID.randomUUID()), "A000007.rf_tmp");

final Set<String> validFileNames = new HashSet<>(fileNames);
validFileNames.removeAll(Set.of("C000005.rf_tmp", "A000007.rf_tmp"));

final ServerContext ctx = getCluster().getServerContext();
String[] tableNames = getUniqueNames(10);

final Set<String> volumePaths = new HashSet<>();
ctx.getVolumeManager().getVolumes().forEach(v -> volumePaths.add(v.getBasePath()));

SortedSet<Text> splits = new TreeSet<>();
IntStream.range(0, 9).forEach(i -> splits.add(new Text("" + i)));

try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
for (String tableName : tableNames) {
client.tableOperations().create(tableName);
client.tableOperations().addSplits(tableName, splits);
}

final Map<String,String> tableNameIdMap = ctx.tableOperations().tableIdMap();
final Map<TableId,Set<String>> tableNameDirsMap = new HashMap<>();
final AtomicInteger numTabletDirs = new AtomicInteger(0);
for (Entry<String,String> e : tableNameIdMap.entrySet()) {
final TableId tid = TableId.of(e.getValue());
if (SystemTables.containsTableId(tid)) {
continue;
}
try (TabletsMetadata tm =
ctx.getAmple().readTablets().forTable(tid).fetch(ColumnType.DIR).build()) {
Set<String> tableDirs = new HashSet<>();
tm.forEach(t -> {
numTabletDirs.incrementAndGet();
tableDirs.add(t.getDirName());
});
tableNameDirsMap.put(tid, tableDirs);
}
}

final Set<Path> createdPaths = new HashSet<>();
for (Entry<TableId,Set<String>> e : tableNameDirsMap.entrySet()) {
if (SystemTables.containsTableId(e.getKey())) {
continue;
}
for (String tabletDirName : e.getValue()) {
String tabletDirPath = Constants.HDFS_TABLES_DIR + Path.SEPARATOR + e.getKey()
+ Path.SEPARATOR + tabletDirName;
for (String fileName : fileNames) {
for (String volume : volumePaths) {
Path p =
new Path(volume + Path.SEPARATOR + tabletDirPath + Path.SEPARATOR + fileName);
assertTrue(ctx.getVolumeManager().createNewFile(p));
createdPaths.add(p);
}
}
}
}
LOG.info("Created {} files", createdPaths.size());
LOG.info("{}", createdPaths);

Upgrader11to12 upgrader = new Upgrader11to12();
DeleteStats stats = new DeleteStats();
Set<Path> deleted = new HashSet<Path>();
upgrader.deleteCompactionTempFiles(ctx, stats, deleted);
assertEquals(numTabletDirs.get() * 2, deleted.size());
assertEquals(0, stats.error);
assertEquals(0, stats.failure);
assertEquals(deleted.size(), stats.success);

for (Path delete : deleted) {
assertFalse(validFileNames.contains(delete.getName()));
}
}
}

}