diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java b/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java index d28f557b..920c9905 100644 --- a/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java +++ b/src/main/java/org/apache/accumulo/testing/continuous/ManySplits.java @@ -168,7 +168,7 @@ public static void main(String[] args) throws Exception { // wait for all tablets to reach the expected sum file size tableNames.stream().parallel().forEach(tableName -> { - int elapsedMillis = 0; + long elapsedMillis = 0; long sleepMillis = SECONDS.toMillis(1); try { // wait for each tablet to reach the expected sum file size diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/sequential/BatchVerify.java b/src/main/java/org/apache/accumulo/testing/randomwalk/sequential/BatchVerify.java index 594c4ab4..e13f21a2 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/sequential/BatchVerify.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/sequential/BatchVerify.java @@ -53,7 +53,7 @@ public void visit(State state, RandWalkEnv env, Properties props) throws Excepti try (BatchScanner scanner = client.createBatchScanner(state.getString("seqTableName"), new Authorizations(), 2)) { - int count = 0; + long count = 0; List ranges = new ArrayList<>(); while (count < numVerify) { long rangeStart = env.getRandom().nextInt((int) numWrites); diff --git a/src/main/java/org/apache/accumulo/testing/stress/DataWriter.java b/src/main/java/org/apache/accumulo/testing/stress/DataWriter.java index 5cc4c90a..959c675b 100644 --- a/src/main/java/org/apache/accumulo/testing/stress/DataWriter.java +++ b/src/main/java/org/apache/accumulo/testing/stress/DataWriter.java @@ -30,15 +30,21 @@ public class DataWriter extends Stream implements AutoCloseable { private final BatchWriter writer; private final RandomMutations mutations; - private final Cleanable cleanable; + private Cleanable cleanable; private final AtomicBoolean closed = new AtomicBoolean(false); private static final Logger log = LoggerFactory.getLogger(DataWriter.class); - public DataWriter(BatchWriter writer, RandomMutations mutations) { + private DataWriter(BatchWriter writer, RandomMutations mutations) { this.writer = writer; this.mutations = mutations; - this.cleanable = CleanerUtil.unclosed(this, DataWriter.class, closed, log, writer); + } + + public static DataWriter create(BatchWriter writer, RandomMutations mutations) { + DataWriter dataWriter = new DataWriter(writer, mutations); + dataWriter.cleanable = + CleanerUtil.unclosed(dataWriter, DataWriter.class, dataWriter.closed, log, writer); + return dataWriter; } @Override @@ -57,12 +63,6 @@ public void close() { // deregister cleanable, but it won't run because it checks // the value of closed first, which is now true cleanable.clean(); - try { - writer.close(); - } catch (MutationsRejectedException e) { - System.err.println("Error closing batch writer."); - e.printStackTrace(); - } } } } diff --git a/src/main/java/org/apache/accumulo/testing/stress/Write.java b/src/main/java/org/apache/accumulo/testing/stress/Write.java index 6f2c71d7..de3b65fa 100644 --- a/src/main/java/org/apache/accumulo/testing/stress/Write.java +++ b/src/main/java/org/apache/accumulo/testing/stress/Write.java @@ -20,6 +20,7 @@ import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; @@ -54,7 +55,7 @@ public static void main(String[] args) throws Exception { writeDelay = 0; } - try (DataWriter dw = new DataWriter(c.createBatchWriter(opts.tableName), new RandomMutations( + RandomMutations mutations = new RandomMutations( // rows new RandomByteArrays(new RandomWithinRange(opts.row_seed, opts.rowMin(), opts.rowMax())), // cfs @@ -67,7 +68,10 @@ public static void main(String[] args) throws Exception { // number of cells per row new RandomWithinRange(opts.row_width_seed, opts.rowWidthMin(), opts.rowWidthMax()), // max cells per mutation - opts.max_cells_per_mutation))) { + opts.max_cells_per_mutation); + + try (BatchWriter batchWriter = c.createBatchWriter(opts.tableName); + DataWriter dw = DataWriter.create(batchWriter, mutations)) { while (true) { dw.next(); if (writeDelay > 0) {