From 50d62aabcf49a317d50d40e9e0cb396a21490145 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Tue, 7 Apr 2026 12:56:38 -0700 Subject: [PATCH] HBASE-30061 Add EWMA-based BlockCompressedSizePredicator PreviousBlockCompressionRatePredicator has three algorithmic deficiencies that cause compressed blocks to systematically undershoot the configured block size target: integer division truncation, single-sample estimation, and no smoothing of the estimated compression ratio. EWMABlockSizePredicator addresses these issues with double-precision arithmetic and an exponentially weighted moving average (EWMA) estimate of the compression ratio. This produces compressed HFile blocks that are closer to the configured target block size. The ratio is smoothed using a default alpha of 0.5. This adapts quickly to changing data while dampening single-block variance. After 3 blocks, the EWMA captures 87.5% of the true ratio. Alpha = 0.5 is chosen because HFile blocks within a single file tend to have similar compression ratios (same column family, similar data distribution), and fast adaptation matters more than long-term smoothing since predicator state is per-file. Adds HFileBlockPerformanceEvaluation to microbenchmark HFileBlock-related concerns. 
--- .../HFileBlockPerformanceEvaluation.java | 716 ++++++++++++++++++ .../io/hfile/EWMABlockSizePredicator.java | 111 +++ .../io/hfile/TestEWMABlockSizePredicator.java | 249 ++++++ 3 files changed, 1076 insertions(+) create mode 100644 hbase-diagnostics/src/main/java/org/apache/hadoop/hbase/HFileBlockPerformanceEvaluation.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/EWMABlockSizePredicator.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestEWMABlockSizePredicator.java diff --git a/hbase-diagnostics/src/main/java/org/apache/hadoop/hbase/HFileBlockPerformanceEvaluation.java b/hbase-diagnostics/src/main/java/org/apache/hadoop/hbase/HFileBlockPerformanceEvaluation.java new file mode 100644 index 000000000000..2581ab6651ca --- /dev/null +++ b/hbase-diagnostics/src/main/java/org/apache/hadoop/hbase/HFileBlockPerformanceEvaluation.java @@ -0,0 +1,716 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.EWMABlockSizePredicator; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.io.hfile.PreviousBlockCompressionRatePredicator; +import org.apache.hadoop.hbase.io.hfile.UncompressedBlockSizePredicator; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Performance evaluation utility for HFile block encoding, compression algorithms, and block size + * predicators ({@link BlockCompressedSizePredicator} implementations). + *

+ * Tests are parameterized by number of blocks rather than number of rows. A tunable value generator + * produces cell values that compress to approximately the requested target ratio. The tool sweeps + * all combinations of compression algorithm, data block encoding, and block size (powers of two + * between min and max), and for predicator tests, also sweeps all three predicator implementations. + *

+ *

Usage

+ * + *
+ * HFileBlockPerformanceEvaluation [options]
+ *   --blocks N              Number of blocks per test (default: 100)
+ *   --compressions LIST     Comma-separated compression algorithms (default: none,gz)
+ *   --encodings LIST        Comma-separated data block encodings (default: none,fast_diff)
+ *   --min-block-size BYTES  Minimum block size in bytes (default: 8192)
+ *   --max-block-size BYTES  Maximum block size in bytes (default: 131072)
+ *   --target-ratio FLOAT    Target compression ratio (default: 3.0)
+ *   --value-size BYTES      Value size per cell in bytes (default: 1000)
+ *   --predicators-only      Run only predicator accuracy tests
+ *   --throughput-only       Run only read/write throughput tests
+ * 
+ */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) +public class HFileBlockPerformanceEvaluation { + + private static final Logger LOG = LoggerFactory.getLogger(HFileBlockPerformanceEvaluation.class); + + static { + System.setProperty("org.apache.commons.logging.Log", + "org.apache.commons.logging.impl.SimpleLog"); + System.setProperty( + "org.apache.commons.logging.simplelog.log.org.apache.hadoop.hbase.io.compress.CodecPool", + "WARN"); + } + + private static final int ROW_KEY_LENGTH = 16; + + private int numBlocks = 100; + private int minBlockSize = 8 * 1024; + private int maxBlockSize = 128 * 1024; + private double targetRatio = 3.0; + private int valueSize = 1000; + private boolean predicatorsOnly = false; + private boolean throughputOnly = false; + + private List compressions = new ArrayList<>(); + private List encodings = new ArrayList<>(); + + // ---- Value generator ---- + + /** + * Generates byte arrays that compress to approximately the given target ratio. A fraction (1 - + * 1/ratio) of the bytes are a repeating pattern and the rest are random. The repeating portion + * compresses nearly to zero while the random portion is incompressible, so the overall ratio + * approaches the target. 
+ */ + static class CompressibleValueGenerator { + private final int valueSize; + private final int randomBytes; + private final byte patternByte; + + CompressibleValueGenerator(int valueSize, double targetRatio) { + this.valueSize = valueSize; + double incompressibleFraction = 1.0 / targetRatio; + this.randomBytes = Math.max(1, (int) (valueSize * incompressibleFraction)); + this.patternByte = (byte) 0x42; + } + + byte[] generate() { + byte[] value = new byte[valueSize]; + Arrays.fill(value, patternByte); + byte[] rand = new byte[randomBytes]; + Bytes.random(rand); + System.arraycopy(rand, 0, value, 0, Math.min(randomBytes, valueSize)); + return value; + } + } + + // ---- Cell creation ---- + + static byte[] formatRowKey(long i) { + String v = Long.toString(i); + StringBuilder sb = new StringBuilder(ROW_KEY_LENGTH); + for (int pad = ROW_KEY_LENGTH - v.length(); pad > 0; pad--) { + sb.append('0'); + } + sb.append(v); + return Bytes.toBytes(sb.toString()); + } + + static ExtendedCell createCell(long i, byte[] value) { + return ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(formatRowKey(i)) + .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("q")) + .setTimestamp(EnvironmentEdgeManager.currentTime()).setType(KeyValue.Type.Put.getCode()) + .setValue(value).build(); + } + + // ---- Block size sweep ---- + + List blockSizeSweep() { + List sizes = new ArrayList<>(); + int size = Integer.highestOneBit(minBlockSize); + if (size < minBlockSize) { + size <<= 1; + } + while (size <= maxBlockSize) { + sizes.add(size); + size <<= 1; + } + if (sizes.isEmpty()) { + sizes.add(minBlockSize); + } + return sizes; + } + + // ---- Predicator classes ---- + + @SuppressWarnings("unchecked") + static final Class[] PREDICATOR_CLASSES = + new Class[] { UncompressedBlockSizePredicator.class, + PreviousBlockCompressionRatePredicator.class, EWMABlockSizePredicator.class }; + + // ---- Write helper: writes enough cells to fill numBlocks blocks ---- + + /** + * 
Writes an HFile with the given parameters and returns the path. Writes cells until at least + * {@code numBlocks} data blocks are produced. + */ + Path writeHFile(Configuration conf, FileSystem fs, Path dir, Compression.Algorithm compression, + DataBlockEncoding encoding, int blockSize, String label) throws IOException { + Path path = new Path(dir, "blockencpe-" + label + ".hfile"); + if (fs.exists(path)) { + fs.delete(path, false); + } + + HFileContext context = new HFileContextBuilder().withCompression(compression) + .withDataBlockEncoding(encoding).withBlockSize(blockSize).build(); + + CompressibleValueGenerator valueGen = new CompressibleValueGenerator(valueSize, targetRatio); + + long rowIndex = 0; + // Estimate rows per block based on uncompressed cell size to avoid writing unnecessary cells. + // Each cell is roughly ROW_KEY_LENGTH + family + qualifier + value + KV overhead. + int approxCellSize = ROW_KEY_LENGTH + 2 + 1 + valueSize + KeyValue.FIXED_OVERHEAD; + long estimatedRowsPerBlock = Math.max(1, blockSize / approxCellSize); + // For compressed data the predicator will let blocks grow larger, account for the ratio. 
+ if (compression != Compression.Algorithm.NONE) { + estimatedRowsPerBlock = (long) (estimatedRowsPerBlock * targetRatio); + } + long maxRows = estimatedRowsPerBlock * numBlocks * 3L; + + try (HFile.Writer writer = + HFile.getWriterFactoryNoCache(conf).withPath(fs, path).withFileContext(context).create()) { + for (rowIndex = 0; rowIndex < maxRows; rowIndex++) { + writer.append(createCell(rowIndex, valueGen.generate())); + } + } + return path; + } + + // ---- Predicator accuracy benchmark ---- + + static class PredicatorAccuracyResult { + final String predicatorName; + final String compression; + final String encoding; + final int blockSize; + final int actualBlocks; + final double meanCompressedSize; + final double stddevCompressedSize; + final int minCompressedSize; + final int maxCompressedSize; + final double meanDeviationPct; + + PredicatorAccuracyResult(String predicatorName, String compression, String encoding, + int blockSize, int actualBlocks, double meanCompressedSize, double stddevCompressedSize, + int minCompressedSize, int maxCompressedSize, double meanDeviationPct) { + this.predicatorName = predicatorName; + this.compression = compression; + this.encoding = encoding; + this.blockSize = blockSize; + this.actualBlocks = actualBlocks; + this.meanCompressedSize = meanCompressedSize; + this.stddevCompressedSize = stddevCompressedSize; + this.minCompressedSize = minCompressedSize; + this.maxCompressedSize = maxCompressedSize; + this.meanDeviationPct = meanDeviationPct; + } + } + + List runPredicatorAccuracyTests(Configuration baseConf, FileSystem fs, + Path dir) throws IOException { + List results = new ArrayList<>(); + List blockSizes = blockSizeSweep(); + + for (Compression.Algorithm compression : compressions) { + for (DataBlockEncoding encoding : encodings) { + for (int blockSize : blockSizes) { + for (Class predClass : PREDICATOR_CLASSES) { + String predName = predClass.getSimpleName(); + String label = String.format("%s-%s-%d-%s", 
compression.getName(), encoding.name(), + blockSize, predName); + + Configuration conf = new Configuration(baseConf); + conf.setClass(BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR, predClass, + BlockCompressedSizePredicator.class); + + LOG.info("Predicator accuracy test: compression={}, encoding={}, blockSize={}, " + + "predicator={}", compression.getName(), encoding.name(), blockSize, predName); + + long startTime = EnvironmentEdgeManager.currentTime(); + Path hfilePath = writeHFile(conf, fs, dir, compression, encoding, blockSize, label); + long writeElapsed = EnvironmentEdgeManager.currentTime() - startTime; + + List compressedBlockSizes = new ArrayList<>(); + try (HFile.Reader reader = + HFile.createReader(fs, hfilePath, CacheConfig.DISABLED, true, conf)) { + try (HFileScanner scanner = reader.getScanner(conf, false, false)) { + scanner.seekTo(); + HFileBlock prevBlock = null; + do { + Cell cell = scanner.getCell(); + if (cell == null) { + break; + } + // We iterate cells but track block transitions via the scanner's internal block + } while (scanner.next()); + } + + // Read block-level info by traversing the data index + int dataBlockCount = reader.getTrailer().getDataIndexCount(); + // Use the reader's block iterator to get on-disk sizes + try (HFileScanner scanner = reader.getScanner(conf, false, false)) { + if (scanner.seekTo()) { + HFileBlock block = null; + long offset = reader.getTrailer().getFirstDataBlockOffset(); + long lastOffset = -1; + // Read blocks directly + while (offset >= 0 && offset != lastOffset) { + lastOffset = offset; + try { + block = reader.readBlock(offset, -1, false, false, false, true, null, + reader.getDataBlockEncoding()); + if (block == null || !block.getBlockType().isData()) { + break; + } + compressedBlockSizes.add(block.getOnDiskSizeWithHeader()); + long nextOffset = offset + block.getOnDiskSizeWithHeader(); + block.release(); + block = null; + offset = nextOffset; + } catch (Exception e) { + break; + } + } + } 
+ } + } + + // Compute stats + int actualBlocks = compressedBlockSizes.size(); + double mean = 0; + int min = Integer.MAX_VALUE; + int max = Integer.MIN_VALUE; + for (int s : compressedBlockSizes) { + mean += s; + min = Math.min(min, s); + max = Math.max(max, s); + } + if (actualBlocks > 0) { + mean /= actualBlocks; + } + double variance = 0; + for (int s : compressedBlockSizes) { + variance += (s - mean) * (s - mean); + } + double stddev = actualBlocks > 1 ? Math.sqrt(variance / (actualBlocks - 1)) : 0; + double meanDeviationPct = + blockSize > 0 ? Math.abs(mean - blockSize) / blockSize * 100.0 : 0; + + PredicatorAccuracyResult result = new PredicatorAccuracyResult(predName, + compression.getName(), encoding.name(), blockSize, actualBlocks, mean, stddev, + actualBlocks > 0 ? min : 0, actualBlocks > 0 ? max : 0, meanDeviationPct); + results.add(result); + + LOG.info( + " predicator={}: blocks={}, meanOnDisk={}, stddev={}, min={}, max={}, " + + "devFromTarget={}%, writeTime={}ms", + predName, actualBlocks, String.format("%.0f", mean), String.format("%.0f", stddev), + actualBlocks > 0 ? min : "N/A", actualBlocks > 0 ? 
max : "N/A", + String.format("%.1f", meanDeviationPct), writeElapsed); + + // Cleanup + fs.delete(hfilePath, false); + } + } + } + } + return results; + } + + // ---- Write throughput benchmark ---- + + static class ThroughputResult { + final String testName; + final String compression; + final String encoding; + final int blockSize; + final long elapsedMs; + final long totalBytes; + final double mbPerSec; + final int blockCount; + + ThroughputResult(String testName, String compression, String encoding, int blockSize, + long elapsedMs, long totalBytes, double mbPerSec, int blockCount) { + this.testName = testName; + this.compression = compression; + this.encoding = encoding; + this.blockSize = blockSize; + this.elapsedMs = elapsedMs; + this.totalBytes = totalBytes; + this.mbPerSec = mbPerSec; + this.blockCount = blockCount; + } + } + + List runWriteBenchmarks(Configuration baseConf, FileSystem fs, Path dir) + throws IOException { + List results = new ArrayList<>(); + List blockSizes = blockSizeSweep(); + + for (Compression.Algorithm compression : compressions) { + for (DataBlockEncoding encoding : encodings) { + for (int blockSize : blockSizes) { + String label = + String.format("write-%s-%s-%d", compression.getName(), encoding.name(), blockSize); + + Configuration conf = new Configuration(baseConf); + // Use default predicator for throughput tests + conf.setClass(BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR, + UncompressedBlockSizePredicator.class, BlockCompressedSizePredicator.class); + + LOG.info("Write benchmark: compression={}, encoding={}, blockSize={}", + compression.getName(), encoding.name(), blockSize); + + long startTime = EnvironmentEdgeManager.currentTime(); + Path hfilePath = writeHFile(conf, fs, dir, compression, encoding, blockSize, label); + long elapsed = EnvironmentEdgeManager.currentTime() - startTime; + + long fileSize = fs.getFileStatus(hfilePath).getLen(); + int blockCount = 0; + try (HFile.Reader reader = + 
HFile.createReader(fs, hfilePath, CacheConfig.DISABLED, true, conf)) { + blockCount = reader.getTrailer().getDataIndexCount(); + } + + double mbps = elapsed > 0 ? (fileSize / (1024.0 * 1024.0)) / (elapsed / 1000.0) : 0; + + ThroughputResult result = new ThroughputResult("SequentialWrite", compression.getName(), + encoding.name(), blockSize, elapsed, fileSize, mbps, blockCount); + results.add(result); + + LOG.info(" elapsed={}ms, fileSize={}, blocks={}, throughput={} MB/s", elapsed, fileSize, + blockCount, String.format("%.2f", mbps)); + + // Keep file for read benchmarks if needed, otherwise clean up + if (throughputOnly || !predicatorsOnly) { + // Will be read by read benchmarks; clean up after read + } else { + fs.delete(hfilePath, false); + } + } + } + } + return results; + } + + // ---- Read throughput benchmarks ---- + + List runReadBenchmarks(Configuration baseConf, FileSystem fs, Path dir) + throws IOException { + List results = new ArrayList<>(); + List blockSizes = blockSizeSweep(); + + for (Compression.Algorithm compression : compressions) { + for (DataBlockEncoding encoding : encodings) { + for (int blockSize : blockSizes) { + String label = + String.format("write-%s-%s-%d", compression.getName(), encoding.name(), blockSize); + Path hfilePath = new Path(dir, "blockencpe-" + label + ".hfile"); + + if (!fs.exists(hfilePath)) { + // Write the file first if it doesn't exist + Configuration writeConf = new Configuration(baseConf); + writeConf.setClass(BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR, + UncompressedBlockSizePredicator.class, BlockCompressedSizePredicator.class); + hfilePath = writeHFile(writeConf, fs, dir, compression, encoding, blockSize, label); + } + + Configuration conf = new Configuration(baseConf); + + // Sequential read + LOG.info("Sequential read benchmark: compression={}, encoding={}, blockSize={}", + compression.getName(), encoding.name(), blockSize); + + long totalBytesRead = 0; + int cellCount = 0; + long startTime = 
EnvironmentEdgeManager.currentTime(); + try (HFile.Reader reader = + HFile.createReader(fs, hfilePath, CacheConfig.DISABLED, true, conf)) { + try (HFileScanner scanner = reader.getScanner(conf, false, false)) { + scanner.seekTo(); + do { + Cell cell = scanner.getCell(); + if (cell == null) { + break; + } + totalBytesRead += cell.getSerializedSize(); + cellCount++; + } while (scanner.next()); + } + } + long seqElapsed = EnvironmentEdgeManager.currentTime() - startTime; + + double seqMbps = + seqElapsed > 0 ? (totalBytesRead / (1024.0 * 1024.0)) / (seqElapsed / 1000.0) : 0; + + results.add(new ThroughputResult("SequentialRead", compression.getName(), encoding.name(), + blockSize, seqElapsed, totalBytesRead, seqMbps, cellCount)); + + LOG.info(" sequential: elapsed={}ms, cells={}, throughput={} MB/s", seqElapsed, + cellCount, String.format("%.2f", seqMbps)); + + // Random read + LOG.info("Random read benchmark: compression={}, encoding={}, blockSize={}", + compression.getName(), encoding.name(), blockSize); + + int randomReads = numBlocks; + startTime = EnvironmentEdgeManager.currentTime(); + try (HFile.Reader reader = + HFile.createReader(fs, hfilePath, CacheConfig.DISABLED, true, conf)) { + long entryCount = reader.getEntries(); + for (int i = 0; i < randomReads; i++) { + long randomRow = ThreadLocalRandom.current().nextLong(entryCount); + byte[] rowKey = formatRowKey(randomRow); + ExtendedCell seekCell = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) + .setRow(rowKey).setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("q")) + .setTimestamp(Long.MAX_VALUE).setType(KeyValue.Type.Maximum.getCode()) + .setValue(HConstants.EMPTY_BYTE_ARRAY).build(); + try (HFileScanner scanner = reader.getScanner(conf, false, true)) { + scanner.seekTo(seekCell); + Cell cell = scanner.getCell(); + if (cell != null) { + totalBytesRead += cell.getSerializedSize(); + } + } + } + } + long randElapsed = EnvironmentEdgeManager.currentTime() - startTime; + double opsPerSec = 
randElapsed > 0 ? (randomReads * 1000.0) / randElapsed : 0; + + results.add(new ThroughputResult("RandomRead", compression.getName(), encoding.name(), + blockSize, randElapsed, randomReads, opsPerSec, randomReads)); + + LOG.info(" random: elapsed={}ms, reads={}, throughput={} ops/s", randElapsed, + randomReads, String.format("%.0f", opsPerSec)); + + // Cleanup + fs.delete(hfilePath, false); + } + } + } + return results; + } + + // ---- Summary formatting ---- + + static String shortPredicatorName(String className) { + if (className.contains("Uncompressed")) { + return "Uncompressed"; + } else if (className.contains("Previous")) { + return "PrevBlock"; + } else if (className.contains("EWMA")) { + return "EWMA"; + } + return className; + } + + void printPredicatorSummary(List results) { + System.out.println(); + System.out.println( + "=========================================================================================="); + System.out.println(" PREDICATOR ACCURACY RESULTS"); + System.out.println( + "=========================================================================================="); + System.out.println(); + System.out.printf("%-14s %-6s %-10s %7s %6s %10s %8s %10s %10s %7s%n", "Predicator", "Compr", + "Encoding", "BlkSize", "Blocks", "MeanOnDisk", "Stddev", "Min", "Max", "Dev%"); + System.out.printf("%-14s %-6s %-10s %7s %6s %10s %8s %10s %10s %7s%n", "--------------", + "------", "----------", "-------", "------", "----------", "--------", "----------", + "----------", "-------"); + + String prevGroup = null; + for (PredicatorAccuracyResult r : results) { + String group = r.compression + "|" + r.encoding + "|" + r.blockSize; + if (prevGroup != null && !prevGroup.equals(group)) { + System.out.println(); + } + prevGroup = group; + System.out.printf("%-14s %-6s %-10s %7d %6d %10.0f %8.0f %10d %10d %6.1f%%%n", + shortPredicatorName(r.predicatorName), r.compression, r.encoding, r.blockSize, + r.actualBlocks, r.meanCompressedSize, r.stddevCompressedSize, 
r.minCompressedSize, + r.maxCompressedSize, r.meanDeviationPct); + } + System.out.println(); + } + + void printThroughputSummary(List results) { + System.out.println(); + System.out.println( + "=========================================================================================="); + System.out.println(" THROUGHPUT RESULTS"); + System.out.println( + "=========================================================================================="); + System.out.println(); + System.out.printf("%-16s %-6s %-10s %7s %10s %12s %14s%n", "Test", "Compr", "Encoding", + "BlkSize", "Elapsed(ms)", "Bytes/Count", "Throughput"); + System.out.printf("%-16s %-6s %-10s %7s %10s %12s %14s%n", "----------------", "------", + "----------", "-------", "----------", "------------", "--------------"); + + String prevTest = null; + for (ThroughputResult r : results) { + if (prevTest != null && !prevTest.equals(r.testName)) { + System.out.println(); + } + prevTest = r.testName; + String throughput; + if ("RandomRead".equals(r.testName)) { + throughput = String.format("%.0f ops/s", r.mbPerSec); + } else { + throughput = String.format("%.2f MB/s", r.mbPerSec); + } + System.out.printf("%-16s %-6s %-10s %7d %10d %12d %14s%n", r.testName, r.compression, + r.encoding, r.blockSize, r.elapsedMs, r.totalBytes, throughput); + } + System.out.println(); + } + + // ---- Main driver ---- + + void runBenchmarks() throws Exception { + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + Path dir = fs.makeQualified(new Path("blockenc-pe-" + System.currentTimeMillis())); + fs.mkdirs(dir); + + LOG.info("HFileBlockPerformanceEvaluation starting"); + LOG.info(" blocks={}, minBlockSize={}, maxBlockSize={}, targetRatio={}, valueSize={}", + numBlocks, minBlockSize, maxBlockSize, targetRatio, valueSize); + LOG.info(" compressions={}", compressions); + LOG.info(" encodings={}", encodings); + LOG.info(" output dir={}", dir); + + try { + if (!throughputOnly) { + List predResults = 
runPredicatorAccuracyTests(conf, fs, dir); + printPredicatorSummary(predResults); + } + + if (!predicatorsOnly) { + List throughputResults = new ArrayList<>(); + + List writeResults = runWriteBenchmarks(conf, fs, dir); + throughputResults.addAll(writeResults); + + List readResults = runReadBenchmarks(conf, fs, dir); + throughputResults.addAll(readResults); + + printThroughputSummary(throughputResults); + } + + } finally { + fs.delete(dir, true); + } + } + + // ---- CLI parsing ---- + + void parseArgs(String[] args) { + for (int i = 0; i < args.length; i++) { + switch (args[i]) { + case "--blocks": + numBlocks = Integer.parseInt(args[++i]); + break; + case "--compressions": + compressions.clear(); + for (String name : args[++i].split(",")) { + compressions + .add(Compression.getCompressionAlgorithmByName(name.trim().toLowerCase(Locale.ROOT))); + } + break; + case "--encodings": + encodings.clear(); + for (String name : args[++i].split(",")) { + encodings.add(DataBlockEncoding.valueOf(name.trim().toUpperCase(Locale.ROOT))); + } + break; + case "--min-block-size": + minBlockSize = Integer.parseInt(args[++i]); + break; + case "--max-block-size": + maxBlockSize = Integer.parseInt(args[++i]); + break; + case "--target-ratio": + targetRatio = Double.parseDouble(args[++i]); + break; + case "--value-size": + valueSize = Integer.parseInt(args[++i]); + break; + case "--predicators-only": + predicatorsOnly = true; + break; + case "--throughput-only": + throughputOnly = true; + break; + default: + LOG.warn("Unknown argument: {}", args[i]); + printUsage(); + System.exit(1); + } + } + + if (compressions.isEmpty()) { + compressions.add(Compression.Algorithm.NONE); + compressions.add(Compression.Algorithm.GZ); + } + + if (encodings.isEmpty()) { + encodings.add(DataBlockEncoding.NONE); + encodings.add(DataBlockEncoding.FAST_DIFF); + } + + if (targetRatio < 1.0) { + throw new IllegalArgumentException("target-ratio must be >= 1.0, got " + targetRatio); + } + } + + static void 
printUsage() { + System.err.println("Usage: HFileBlockPerformanceEvaluation [options]"); + System.err.println(" --blocks N Number of blocks per test (default: 100)"); + System.err.println( + " --compressions LIST Comma-separated compression algorithms (default: none,gz)"); + System.err.println( + " --encodings LIST Comma-separated data block encodings (default: none,fast_diff)"); + System.err.println(" --min-block-size BYTES Minimum block size (default: 8192)"); + System.err.println(" --max-block-size BYTES Maximum block size (default: 131072)"); + System.err.println(" --target-ratio FLOAT Target compression ratio (default: 3.0)"); + System.err.println(" --value-size BYTES Value size per cell (default: 1000)"); + System.err.println(" --predicators-only Run only predicator accuracy tests"); + System.err.println(" --throughput-only Run only read/write throughput tests"); + } + + public static void main(String[] args) throws Exception { + HFileBlockPerformanceEvaluation eval = new HFileBlockPerformanceEvaluation(); + eval.parseArgs(args); + eval.runBenchmarks(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/EWMABlockSizePredicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/EWMABlockSizePredicator.java new file mode 100644 index 000000000000..ae5664b9e67a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/EWMABlockSizePredicator.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * A {@link BlockCompressedSizePredicator} that uses an Exponentially Weighted Moving Average (EWMA) + * of the compression ratio to predict the uncompressed block size needed to produce compressed + * blocks close to the configured target block size. + */ +@InterfaceAudience.Private +public class EWMABlockSizePredicator implements BlockCompressedSizePredicator, Configurable { + + public static final String EWMA_ALPHA_KEY = "hbase.block.compressed.size.predicator.ewma.alpha"; + static final double DEFAULT_ALPHA = 0.5; + + private Configuration conf; + private double alpha = DEFAULT_ALPHA; + private double ewmaRatio; + private int adjustedBlockSize; + private int configuredMaxBlockSize; + private boolean initialized; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + this.alpha = conf.getDouble(EWMA_ALPHA_KEY, DEFAULT_ALPHA); + } + + @Override + public Configuration getConf() { + return conf; + } + + /** + * Estimates the effective compression ratio and adjusts the block size limit. On the first block, + * the EWMA is seeded with the observed ratio. On subsequent blocks, the ratio is smoothed: + * {@code ewmaRatio = alpha * currentRatio + (1 - alpha) * ewmaRatio}. If {@code compressed} is + * zero or negative, the update is skipped to avoid corrupting the EWMA state with + * {@code Infinity} or {@code NaN}. 
+ * @param context HFileContext containing the configured max block size. + * @param uncompressed the uncompressed (encoded) size of last block written. + * @param compressed the compressed size of last block written. + */ + @Override + public void updateLatestBlockSizes(HFileContext context, int uncompressed, int compressed) { + configuredMaxBlockSize = context.getBlocksize(); + if (compressed <= 0) { + return; + } + double currentRatio = (double) uncompressed / compressed; + if (!initialized) { + ewmaRatio = currentRatio; + initialized = true; + } else { + ewmaRatio = alpha * currentRatio + (1.0 - alpha) * ewmaRatio; + } + adjustedBlockSize = (int) (configuredMaxBlockSize * ewmaRatio); + } + + /** + * Returns {@code true} if the block should be finished. Before any block has been written (cold + * start), returns {@code true} to fall through to the default uncompressed-size-based decision. + * After the first block, returns {@code true} only when the uncompressed size reaches the + * EWMA-adjusted target. + * @param uncompressed the current uncompressed size of the block being written. + */ + @Override + public boolean shouldFinishBlock(int uncompressed) { + if (!initialized) { + return true; + } + if (uncompressed >= configuredMaxBlockSize) { + return uncompressed >= adjustedBlockSize; + } + return false; + } + + /** Returns the current EWMA compression ratio estimate. Visible for testing. */ + double getEwmaRatio() { + return ewmaRatio; + } + + /** Returns the current adjusted block size. Visible for testing. */ + int getAdjustedBlockSize() { + return adjustedBlockSize; + } + + /** Returns the current alpha value. Visible for testing. 
*/ + double getAlpha() { + return alpha; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestEWMABlockSizePredicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestEWMABlockSizePredicator.java new file mode 100644 index 000000000000..1af821a31bad --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestEWMABlockSizePredicator.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Unit tests for {@link EWMABlockSizePredicator}: ratio precision, EWMA convergence and smoothing,
 * cold-start behavior, alpha configuration, and degenerate compressed-size inputs.
 */
@Category({ IOTests.class, SmallTests.class })
public class TestEWMABlockSizePredicator {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestEWMABlockSizePredicator.class);

  private static final int BLOCK_SIZE_64KB = 64 * 1024;
  private static final int BLOCK_SIZE_1MB = 1024 * 1024;

  /** Builds a minimal HFileContext carrying only the target block size used by the predicator. */
  private static HFileContext contextWithBlockSize(int blockSize) {
    return new HFileContextBuilder().withBlockSize(blockSize).build();
  }

  /**
   * Verify that double-precision arithmetic preserves fractional compression ratios and that the
   * adjusted block size reflects the full ratio.
   */
  @Test
  public void testDoublePrecisionRatio() {
    EWMABlockSizePredicator predicator = new EWMABlockSizePredicator();
    HFileContext ctx = contextWithBlockSize(BLOCK_SIZE_64KB);

    // 3.4:1 ratio — the fractional part matters
    // (integer division would truncate 68000/20000 to 3, losing the 0.4)
    predicator.updateLatestBlockSizes(ctx, 68000, 20000);

    assertEquals(3.4, predicator.getEwmaRatio(), 0.001);
    int expectedAdjusted = (int) (BLOCK_SIZE_64KB * 3.4);
    assertEquals(expectedAdjusted, predicator.getAdjustedBlockSize());
  }

  /**
   * Feed a sequence of blocks with a consistent ratio and verify the EWMA remains stable.
   */
  @Test
  public void testConvergenceWithConstantRatio() {
    EWMABlockSizePredicator predicator = new EWMABlockSizePredicator();
    HFileContext ctx = contextWithBlockSize(BLOCK_SIZE_64KB);

    // With a constant input ratio r, alpha*r + (1-alpha)*r == r for any alpha, so the EWMA
    // must hold exactly at 3.0 after every update.
    for (int i = 0; i < 5; i++) {
      predicator.updateLatestBlockSizes(ctx, 60000, 20000); // 3.0:1
      assertEquals("EWMA should be stable at block " + (i + 1), 3.0, predicator.getEwmaRatio(),
        0.001);
    }

    int expectedAdjusted = (int) (BLOCK_SIZE_64KB * 3.0);
    assertEquals(expectedAdjusted, predicator.getAdjustedBlockSize());
  }

  /**
   * Feed blocks with alternating high/low compression ratios and verify the EWMA converges toward
   * the mean and that the adjusted block size swing decreases over successive pairs.
   */
  @Test
  public void testSmoothingUnderVariance() {
    EWMABlockSizePredicator predicator = new EWMABlockSizePredicator();
    HFileContext ctx = contextWithBlockSize(BLOCK_SIZE_64KB);
    double meanRatio = 3.0;

    // Alternating ratios: 4.0:1 and 2.0:1 (mean = 3.0)
    int[][] blocks = { { 80000, 20000 }, // 4.0:1
      { 40000, 20000 }, // 2.0:1
      { 80000, 20000 }, // 4.0:1
      { 40000, 20000 }, // 2.0:1
      { 80000, 20000 }, // 4.0:1
      { 40000, 20000 }, // 2.0:1
    };

    int lastAdj = 0;
    int firstPairSwing = 0;
    int lastPairSwing = 0;

    // Accumulate the adjusted-size swing of the first two transitions (i=1,2) and the last two
    // (i=4,5); smoothing means the later swings should be strictly smaller in total.
    // With alpha=0.5 the EWMA sequence is: 4.0, 3.0, 3.5, 2.75, 3.375, 2.6875.
    for (int i = 0; i < blocks.length; i++) {
      predicator.updateLatestBlockSizes(ctx, blocks[i][0], blocks[i][1]);
      int adj = predicator.getAdjustedBlockSize();

      if (i > 0) {
        int swing = Math.abs(adj - lastAdj);
        if (i <= 2) {
          firstPairSwing += swing;
        }
        if (i >= blocks.length - 2) {
          lastPairSwing += swing;
        }
      }
      lastAdj = adj;
    }

    assertTrue("Swing should decrease as EWMA converges: first pair swing=" + firstPairSwing
      + " last pair swing=" + lastPairSwing, lastPairSwing < firstPairSwing);

    // After several alternating blocks, the ratio should be near the mean.
    // With alpha=0.5 the EWMA is biased toward the most recent sample, so the
    // tolerance must account for ending on a low-ratio block (exact value: 2.6875).
    assertEquals(meanRatio, predicator.getEwmaRatio(), 0.5);
  }

  /**
   * Before any block has been written, {@code shouldFinishBlock} returns {@code true} (falling
   * through to default sizing). After the first block, the predicator gates on the adjusted size.
   */
  @Test
  public void testColdStartBehavior() {
    EWMABlockSizePredicator predicator = new EWMABlockSizePredicator();

    assertTrue("Cold start: shouldFinishBlock should return true before initialization",
      predicator.shouldFinishBlock(BLOCK_SIZE_64KB));
    assertTrue("Cold start: shouldFinishBlock should return true for any size",
      predicator.shouldFinishBlock(1));

    HFileContext ctx = contextWithBlockSize(BLOCK_SIZE_64KB);
    predicator.updateLatestBlockSizes(ctx, 68000, 20000); // 3.4:1

    int adjustedSize = predicator.getAdjustedBlockSize();

    assertFalse("After init: block below configured size should not finish",
      predicator.shouldFinishBlock(BLOCK_SIZE_64KB - 1));
    assertFalse("After init: block at configured size should not finish (needs to grow)",
      predicator.shouldFinishBlock(BLOCK_SIZE_64KB));
    assertTrue("After init: block at adjusted size should finish",
      predicator.shouldFinishBlock(adjustedSize));
    assertTrue("After init: block above adjusted size should finish",
      predicator.shouldFinishBlock(adjustedSize + 1));
  }

  /**
   * Verify the predicator works correctly with a large configured block size.
   */
  @Test
  public void testLargeBlockSize() {
    EWMABlockSizePredicator predicator = new EWMABlockSizePredicator();
    HFileContext ctx = contextWithBlockSize(BLOCK_SIZE_1MB);

    predicator.updateLatestBlockSizes(ctx, 340000, 100000); // 3.4:1

    assertEquals(3.4, predicator.getEwmaRatio(), 0.001);
    int expectedAdjusted = (int) (BLOCK_SIZE_1MB * 3.4);
    assertEquals(expectedAdjusted, predicator.getAdjustedBlockSize());
  }

  /**
   * Verify that the EWMA alpha can be configured. A lower alpha should smooth more aggressively,
   * producing a slower response to a sudden ratio change.
   */
  @Test
  public void testConfigurableAlpha() {
    Configuration conf = new Configuration(false);
    conf.setDouble(EWMABlockSizePredicator.EWMA_ALPHA_KEY, 0.2);

    EWMABlockSizePredicator predicator = new EWMABlockSizePredicator();
    predicator.setConf(conf);
    assertEquals(0.2, predicator.getAlpha(), 0.001);

    HFileContext ctx = contextWithBlockSize(BLOCK_SIZE_64KB);

    // Seed with 3.0:1
    predicator.updateLatestBlockSizes(ctx, 60000, 20000);
    assertEquals(3.0, predicator.getEwmaRatio(), 0.001);

    // Spike to 5.0:1 — with alpha=0.2: ewma = 0.2*5.0 + 0.8*3.0 = 3.4
    predicator.updateLatestBlockSizes(ctx, 100000, 20000);
    assertEquals(3.4, predicator.getEwmaRatio(), 0.001);

    // With default alpha=0.5 the same spike yields 4.0 — lower alpha is more conservative
    EWMABlockSizePredicator fast = new EWMABlockSizePredicator();
    fast.updateLatestBlockSizes(ctx, 60000, 20000);
    fast.updateLatestBlockSizes(ctx, 100000, 20000);
    assertEquals(4.0, fast.getEwmaRatio(), 0.001);

    assertTrue("Lower alpha should dampen the spike more",
      predicator.getEwmaRatio() < fast.getEwmaRatio());
  }

  /**
   * The default alpha is used when no configuration is set or when the configuration omits the
   * alpha key.
   */
  @Test
  public void testDefaultAlphaWithoutConfiguration() {
    EWMABlockSizePredicator predicator = new EWMABlockSizePredicator();
    assertEquals(EWMABlockSizePredicator.DEFAULT_ALPHA, predicator.getAlpha(), 0.0);

    Configuration emptyConf = new Configuration(false);
    predicator.setConf(emptyConf);
    assertEquals(EWMABlockSizePredicator.DEFAULT_ALPHA, predicator.getAlpha(), 0.0);
  }

  /**
   * Verify that {@code compressed <= 0} is handled gracefully: the update is skipped and the EWMA
   * state is not corrupted.
   */
  @Test
  public void testCompressedSizeZeroOrNegative() {
    EWMABlockSizePredicator predicator = new EWMABlockSizePredicator();
    HFileContext ctx = contextWithBlockSize(BLOCK_SIZE_64KB);

    // compressed=0 before initialization — should remain uninitialized
    predicator.updateLatestBlockSizes(ctx, 68000, 0);
    assertTrue("Should still be in cold-start state after compressed=0",
      predicator.shouldFinishBlock(BLOCK_SIZE_64KB));

    // Initialize with a valid block
    predicator.updateLatestBlockSizes(ctx, 68000, 20000);
    double ratioAfterInit = predicator.getEwmaRatio();
    int adjustedAfterInit = predicator.getAdjustedBlockSize();
    assertEquals(3.4, ratioAfterInit, 0.001);

    // compressed=0 after initialization — state should be unchanged
    predicator.updateLatestBlockSizes(ctx, 68000, 0);
    assertEquals(ratioAfterInit, predicator.getEwmaRatio(), 0.0);
    assertEquals(adjustedAfterInit, predicator.getAdjustedBlockSize());

    // compressed=-1 — state should be unchanged
    predicator.updateLatestBlockSizes(ctx, 68000, -1);
    assertEquals(ratioAfterInit, predicator.getEwmaRatio(), 0.0);
    assertEquals(adjustedAfterInit, predicator.getAdjustedBlockSize());
  }
}