Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public static void main(String[] args) throws Exception {

// wait for all tablets to reach the expected sum file size
tableNames.stream().parallel().forEach(tableName -> {
int elapsedMillis = 0;
long elapsedMillis = 0;
long sleepMillis = SECONDS.toMillis(1);
try {
// wait for each tablet to reach the expected sum file size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void visit(State state, RandWalkEnv env, Properties props) throws Excepti

try (BatchScanner scanner =
client.createBatchScanner(state.getString("seqTableName"), new Authorizations(), 2)) {
int count = 0;
long count = 0;
List<Range> ranges = new ArrayList<>();
while (count < numVerify) {
long rangeStart = env.getRandom().nextInt((int) numWrites);
Expand Down
18 changes: 9 additions & 9 deletions src/main/java/org/apache/accumulo/testing/stress/DataWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,21 @@
public class DataWriter extends Stream<Void> implements AutoCloseable {
private final BatchWriter writer;
private final RandomMutations mutations;
private final Cleanable cleanable;
private Cleanable cleanable;
private final AtomicBoolean closed = new AtomicBoolean(false);

private static final Logger log = LoggerFactory.getLogger(DataWriter.class);

public DataWriter(BatchWriter writer, RandomMutations mutations) {
private DataWriter(BatchWriter writer, RandomMutations mutations) {
this.writer = writer;
this.mutations = mutations;
this.cleanable = CleanerUtil.unclosed(this, DataWriter.class, closed, log, writer);
}

public static DataWriter create(BatchWriter writer, RandomMutations mutations) {
DataWriter dataWriter = new DataWriter(writer, mutations);
dataWriter.cleanable =
CleanerUtil.unclosed(dataWriter, DataWriter.class, dataWriter.closed, log, writer);
return dataWriter;
}

@Override
Expand All @@ -57,12 +63,6 @@ public void close() {
// deregister cleanable, but it won't run because it checks
// the value of closed first, which is now true
cleanable.clean();
try {
writer.close();
} catch (MutationsRejectedException e) {
System.err.println("Error closing batch writer.");
e.printStackTrace();
}
}
}
}
8 changes: 6 additions & 2 deletions src/main/java/org/apache/accumulo/testing/stress/Write.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;

Expand Down Expand Up @@ -54,7 +55,7 @@ public static void main(String[] args) throws Exception {
writeDelay = 0;
}

try (DataWriter dw = new DataWriter(c.createBatchWriter(opts.tableName), new RandomMutations(
RandomMutations mutations = new RandomMutations(
// rows
new RandomByteArrays(new RandomWithinRange(opts.row_seed, opts.rowMin(), opts.rowMax())),
// cfs
Expand All @@ -67,7 +68,10 @@ public static void main(String[] args) throws Exception {
// number of cells per row
new RandomWithinRange(opts.row_width_seed, opts.rowWidthMin(), opts.rowWidthMax()),
// max cells per mutation
opts.max_cells_per_mutation))) {
opts.max_cells_per_mutation);

try (BatchWriter batchWriter = c.createBatchWriter(opts.tableName);
DataWriter dw = DataWriter.create(batchWriter, mutations)) {
while (true) {
dw.next();
if (writeDelay > 0) {
Expand Down
Loading