Skip to content

Commit a4fbb0c

Browse files
authored
feat(db/rocksdb): improve resource management with try-with-resources (#6472)
* feat(db/rocksdb): improve resource management with try-with-resources * feat(db/rocksdb):reactor code for ReadOptions and Options
1 parent 48cad61 commit a4fbb0c

File tree

22 files changed

+273
-127
lines changed

22 files changed

+273
-127
lines changed

actuator/src/main/java/org/tron/core/vm/repository/WriteOptionsWrapper.java

Lines changed: 0 additions & 24 deletions
This file was deleted.

chainbase/src/main/java/org/tron/common/storage/WriteOptionsWrapper.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package org.tron.common.storage;
22

3-
public class WriteOptionsWrapper {
3+
import java.io.Closeable;
4+
5+
public class WriteOptionsWrapper implements Closeable {
46

57
public org.rocksdb.WriteOptions rocks = null;
68
public org.iq80.leveldb.WriteOptions level = null;
@@ -9,6 +11,23 @@ private WriteOptionsWrapper() {
911

1012
}
1113

14+
/**
15+
* Returns an WriteOptionsWrapper.
16+
*
17+
* <p><b>CRITICAL:</b> The returned WriteOptionsWrapper holds native resources
18+
* and <b>MUST</b> be closed
19+
* after use to prevent memory leaks. It is strongly recommended to use a try-with-resources
20+
* statement.
21+
*
22+
* <p>Example of correct usage:
23+
* <pre>{@code
24+
* try ( WriteOptionsWrapper readOptions = WriteOptionsWrapper.getInstance()) {
25+
* // do something
26+
* }
27+
* }</pre>
28+
*
29+
* @return a new WriteOptionsWrapper that must be closed.
30+
*/
1231
public static WriteOptionsWrapper getInstance() {
1332
WriteOptionsWrapper wrapper = new WriteOptionsWrapper();
1433
wrapper.level = new org.iq80.leveldb.WriteOptions();
@@ -23,4 +42,12 @@ public WriteOptionsWrapper sync(boolean bool) {
2342
this.rocks.setSync(bool);
2443
return this;
2544
}
45+
46+
@Override
47+
public void close() {
48+
if (rocks != null) {
49+
rocks.close();
50+
}
51+
// leveldb WriteOptions has no close method, and does not need to be closed
52+
}
2653
}

chainbase/src/main/java/org/tron/common/storage/rocksdb/RocksDbDataSourceImpl.java

Lines changed: 76 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class RocksDbDataSourceImpl extends DbStat implements DbSourceInter<byte[
5252
private volatile boolean alive;
5353
private String parentPath;
5454
private ReadWriteLock resetDbLock = new ReentrantReadWriteLock();
55+
private Options options;
5556

5657
public RocksDbDataSourceImpl(String parentPath, String name) {
5758
this.dataBaseName = name;
@@ -78,6 +79,9 @@ public void closeDB() {
7879
if (!isAlive()) {
7980
return;
8081
}
82+
if (this.options != null) {
83+
this.options.close();
84+
}
8185
database.close();
8286
alive = false;
8387
} catch (Exception e) {
@@ -117,7 +121,8 @@ private static void checkArgNotNull(Object value, String name) {
117121
@Override
118122
public Set<byte[]> allKeys() throws RuntimeException {
119123
resetDbLock.readLock().lock();
120-
try (final RocksIterator iter = getRocksIterator()) {
124+
try (final ReadOptions readOptions = getReadOptions();
125+
final RocksIterator iter = getRocksIterator(readOptions)) {
121126
Set<byte[]> result = Sets.newHashSet();
122127
for (iter.seekToFirst(); iter.isValid(); iter.next()) {
123128
result.add(iter.key());
@@ -133,7 +138,8 @@ public Set<byte[]> allKeys() throws RuntimeException {
133138
@Override
134139
public Set<byte[]> allValues() throws RuntimeException {
135140
resetDbLock.readLock().lock();
136-
try (final RocksIterator iter = getRocksIterator()) {
141+
try (final ReadOptions readOptions = getReadOptions();
142+
final RocksIterator iter = getRocksIterator(readOptions)) {
137143
Set<byte[]> result = Sets.newHashSet();
138144
for (iter.seekToFirst(); iter.isValid(); iter.next()) {
139145
result.add(iter.value());
@@ -149,7 +155,8 @@ public Set<byte[]> allValues() throws RuntimeException {
149155
@Override
150156
public long getTotal() throws RuntimeException {
151157
resetDbLock.readLock().lock();
152-
try (final RocksIterator iter = getRocksIterator()) {
158+
try (final ReadOptions readOptions = getReadOptions();
159+
final RocksIterator iter = getRocksIterator(readOptions)) {
153160
long total = 0;
154161
for (iter.seekToFirst(); iter.isValid(); iter.next()) {
155162
total++;
@@ -180,7 +187,7 @@ private void initDB() {
180187
throw new IllegalArgumentException("No name set to the dbStore");
181188
}
182189

183-
try (Options options = RocksDbSettings.getOptionsByDbName(dataBaseName)) {
190+
try {
184191
logger.debug("Opening database {}.", dataBaseName);
185192
final Path dbPath = getDbPath();
186193

@@ -191,14 +198,19 @@ private void initDB() {
191198
try {
192199
DbSourceInter.checkOrInitEngine(getEngine(), dbPath.toString(),
193200
TronError.ErrCode.ROCKSDB_INIT);
194-
database = RocksDB.open(options, dbPath.toString());
201+
this.options = RocksDbSettings.getOptionsByDbName(dataBaseName);
202+
database = RocksDB.open(this.options, dbPath.toString());
195203
} catch (RocksDBException e) {
196204
if (Objects.equals(e.getStatus().getCode(), Status.Code.Corruption)) {
197205
logger.error("Database {} corrupted, please delete database directory({}) "
198206
+ "and restart.", dataBaseName, parentPath, e);
199207
} else {
200208
logger.error("Open Database {} failed", dataBaseName, e);
201209
}
210+
211+
if (this.options != null) {
212+
this.options.close();
213+
}
202214
throw new TronError(e, TronError.ErrCode.ROCKSDB_INIT);
203215
}
204216

@@ -282,7 +294,8 @@ public boolean flush() {
282294
*/
283295
@Override
284296
public org.tron.core.db.common.iterator.DBIterator iterator() {
285-
return new RockStoreIterator(getRocksIterator());
297+
ReadOptions readOptions = getReadOptions();
298+
return new RockStoreIterator(getRocksIterator(readOptions), readOptions);
286299
}
287300

288301
private void updateByBatchInner(Map<byte[], byte[]> rows, WriteOptions options)
@@ -308,7 +321,9 @@ public void updateByBatch(Map<byte[], byte[]> rows, WriteOptionsWrapper optionsW
308321

309322
@Override
310323
public void updateByBatch(Map<byte[], byte[]> rows) {
311-
this.updateByBatch(rows, new WriteOptions());
324+
try (WriteOptions writeOptions = new WriteOptions()) {
325+
this.updateByBatch(rows, writeOptions);
326+
}
312327
}
313328

314329
private void updateByBatch(Map<byte[], byte[]> rows, WriteOptions options) {
@@ -331,7 +346,8 @@ public List<byte[]> getKeysNext(byte[] key, long limit) {
331346
return new ArrayList<>();
332347
}
333348
resetDbLock.readLock().lock();
334-
try (RocksIterator iter = getRocksIterator()) {
349+
try (final ReadOptions readOptions = getReadOptions();
350+
final RocksIterator iter = getRocksIterator(readOptions)) {
335351
List<byte[]> result = new ArrayList<>();
336352
long i = 0;
337353
for (iter.seek(key); iter.isValid() && i < limit; iter.next(), i++) {
@@ -348,7 +364,8 @@ public Map<byte[], byte[]> getNext(byte[] key, long limit) {
348364
return Collections.emptyMap();
349365
}
350366
resetDbLock.readLock().lock();
351-
try (RocksIterator iter = getRocksIterator()) {
367+
try (final ReadOptions readOptions = getReadOptions();
368+
final RocksIterator iter = getRocksIterator(readOptions)) {
352369
Map<byte[], byte[]> result = new HashMap<>();
353370
long i = 0;
354371
for (iter.seek(key); iter.isValid() && i < limit; iter.next(), i++) {
@@ -363,7 +380,8 @@ public Map<byte[], byte[]> getNext(byte[] key, long limit) {
363380
@Override
364381
public Map<WrappedByteArray, byte[]> prefixQuery(byte[] key) {
365382
resetDbLock.readLock().lock();
366-
try (RocksIterator iterator = getRocksIterator()) {
383+
try (final ReadOptions readOptions = getReadOptions();
384+
final RocksIterator iterator = getRocksIterator(readOptions)) {
367385
Map<WrappedByteArray, byte[]> result = new HashMap<>();
368386
for (iterator.seek(key); iterator.isValid(); iterator.next()) {
369387
if (Bytes.indexOf(iterator.key(), key) == 0) {
@@ -383,7 +401,8 @@ public Set<byte[]> getlatestValues(long limit) {
383401
return Sets.newHashSet();
384402
}
385403
resetDbLock.readLock().lock();
386-
try (RocksIterator iter = getRocksIterator()) {
404+
try (final ReadOptions readOptions = getReadOptions();
405+
final RocksIterator iter = getRocksIterator(readOptions)) {
387406
Set<byte[]> result = Sets.newHashSet();
388407
long i = 0;
389408
for (iter.seekToLast(); iter.isValid() && i < limit; iter.prev(), i++) {
@@ -400,7 +419,8 @@ public Set<byte[]> getValuesNext(byte[] key, long limit) {
400419
return Sets.newHashSet();
401420
}
402421
resetDbLock.readLock().lock();
403-
try (RocksIterator iter = getRocksIterator()) {
422+
try (final ReadOptions readOptions = getReadOptions();
423+
final RocksIterator iter = getRocksIterator(readOptions)) {
404424
Set<byte[]> result = Sets.newHashSet();
405425
long i = 0;
406426
for (iter.seek(key); iter.isValid() && i < limit; iter.next(), i++) {
@@ -419,11 +439,50 @@ public void backup(String dir) throws RocksDBException {
419439
}
420440
}
421441

422-
private RocksIterator getRocksIterator() {
423-
try (ReadOptions readOptions = new ReadOptions().setFillCache(false)) {
424-
throwIfNotAlive();
425-
return database.newIterator(readOptions);
426-
}
442+
/**
443+
* Returns an iterator over the database.
444+
*
445+
* <p><b>CRITICAL:</b> The returned iterator holds native resources and <b>MUST</b> be closed
446+
* after use to prevent memory leaks. It is strongly recommended to use a try-with-resources
447+
* statement.
448+
*
449+
* <p>Example of correct usage:
450+
* <pre>{@code
451+
* try ( ReadOptions readOptions = new ReadOptions().setFillCache(false);
452+
* RocksIterator iterator = getRocksIterator(readOptions)) {
453+
* iterator.seekToFirst();
454+
* // do something
455+
* }
456+
* }</pre>
457+
*
458+
* @return a new database iterator that must be closed.
459+
*/
460+
private RocksIterator getRocksIterator(ReadOptions readOptions) {
461+
throwIfNotAlive();
462+
return database.newIterator(readOptions);
463+
}
464+
465+
/**
466+
* Returns an ReadOptions.
467+
*
468+
* <p><b>CRITICAL:</b> The returned ReadOptions holds native resources and <b>MUST</b> be closed
469+
* after use to prevent memory leaks. It is strongly recommended to use a try-with-resources
470+
* statement.
471+
*
472+
* <p>Example of correct usage:
473+
* <pre>{@code
474+
* try (ReadOptions readOptions = getReadOptions();
475+
* RocksIterator iterator = getRocksIterator(readOptions)) {
476+
* iterator.seekToFirst();
477+
* // do something
478+
* }
479+
* }</pre>
480+
*
481+
* @return a new database iterator that must be closed.
482+
*/
483+
private ReadOptions getReadOptions() {
484+
throwIfNotAlive();
485+
return new ReadOptions().setFillCache(false);
427486
}
428487

429488
public boolean deleteDbBakPath(String dir) {

chainbase/src/main/java/org/tron/core/db/TronDatabase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public abstract class TronDatabase<T> implements ITronChainBase<T> {
2727
protected DbSourceInter<byte[]> dbSource;
2828
@Getter
2929
private String dbName;
30-
private WriteOptionsWrapper writeOptions = WriteOptionsWrapper.getInstance()
30+
private final WriteOptionsWrapper writeOptions = WriteOptionsWrapper.getInstance()
3131
.sync(CommonParameter.getInstance().getStorage().isDbSync());
3232

3333
@Autowired
@@ -77,6 +77,7 @@ public void reset() {
7777
public void close() {
7878
logger.info("******** Begin to close {}. ********", getName());
7979
try {
80+
writeOptions.close();
8081
dbSource.closeDB();
8182
} catch (Exception e) {
8283
logger.warn("Failed to close {}.", getName(), e);

chainbase/src/main/java/org/tron/core/db/common/iterator/RockStoreIterator.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.NoSuchElementException;
66
import java.util.concurrent.atomic.AtomicBoolean;
77
import lombok.extern.slf4j.Slf4j;
8+
import org.rocksdb.ReadOptions;
89
import org.rocksdb.RocksIterator;
910

1011

@@ -15,14 +16,17 @@ public final class RockStoreIterator implements DBIterator {
1516
private boolean first = true;
1617

1718
private final AtomicBoolean close = new AtomicBoolean(false);
19+
private final ReadOptions readOptions;
1820

19-
public RockStoreIterator(RocksIterator dbIterator) {
21+
public RockStoreIterator(RocksIterator dbIterator, ReadOptions readOptions) {
22+
this.readOptions = readOptions;
2023
this.dbIterator = dbIterator;
2124
}
2225

2326
@Override
2427
public void close() throws IOException {
2528
if (close.compareAndSet(false, true)) {
29+
readOptions.close();
2630
dbIterator.close();
2731
}
2832
}

chainbase/src/main/java/org/tron/core/db2/common/LevelDB.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public class LevelDB implements DB<byte[], byte[]>, Flusher {
1313

1414
@Getter
1515
private LevelDbDataSourceImpl db;
16-
private WriteOptionsWrapper writeOptions = WriteOptionsWrapper.getInstance()
16+
private final WriteOptionsWrapper writeOptions = WriteOptionsWrapper.getInstance()
1717
.sync(CommonParameter.getInstance().getStorage().isDbSync());
1818

1919
public LevelDB(LevelDbDataSourceImpl db) {
@@ -65,6 +65,7 @@ public void flush(Map<WrappedByteArray, WrappedByteArray> batch) {
6565

6666
@Override
6767
public void close() {
68+
this.writeOptions.close();
6869
db.closeDB();
6970
}
7071

chainbase/src/main/java/org/tron/core/db2/common/RocksDB.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public class RocksDB implements DB<byte[], byte[]>, Flusher {
1414
@Getter
1515
private RocksDbDataSourceImpl db;
1616

17-
private WriteOptionsWrapper optionsWrapper = WriteOptionsWrapper.getInstance()
17+
private final WriteOptionsWrapper writeOptions = WriteOptionsWrapper.getInstance()
1818
.sync(CommonParameter.getInstance().getStorage().isDbSync());
1919

2020
public RocksDB(RocksDbDataSourceImpl db) {
@@ -61,11 +61,12 @@ public void flush(Map<WrappedByteArray, WrappedByteArray> batch) {
6161
Map<byte[], byte[]> rows = batch.entrySet().stream()
6262
.map(e -> Maps.immutableEntry(e.getKey().getBytes(), e.getValue().getBytes()))
6363
.collect(HashMap::new, (m, k) -> m.put(k.getKey(), k.getValue()), HashMap::putAll);
64-
db.updateByBatch(rows, optionsWrapper);
64+
db.updateByBatch(rows, writeOptions);
6565
}
6666

6767
@Override
6868
public void close() {
69+
writeOptions.close();
6970
db.closeDB();
7071
}
7172

chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.tron.common.error.TronDBException;
3030
import org.tron.common.es.ExecutorServiceManager;
3131
import org.tron.common.parameter.CommonParameter;
32-
import org.tron.common.storage.WriteOptionsWrapper;
3332
import org.tron.common.utils.FileUtil;
3433
import org.tron.common.utils.StorageUtils;
3534
import org.tron.core.db.RevokingDatabase;
@@ -357,7 +356,6 @@ public void flush() {
357356

358357
public void createCheckpoint() {
359358
TronDatabase<byte[]> checkPointStore = null;
360-
boolean syncFlag;
361359
try {
362360
Map<WrappedByteArray, WrappedByteArray> batch = new HashMap<>();
363361
for (Chainbase db : dbs) {
@@ -389,16 +387,13 @@ public void createCheckpoint() {
389387
if (isV2Open()) {
390388
String dbName = String.valueOf(System.currentTimeMillis());
391389
checkPointStore = getCheckpointDB(dbName);
392-
syncFlag = CommonParameter.getInstance().getStorage().isCheckpointSync();
393390
} else {
394391
checkPointStore = checkTmpStore;
395-
syncFlag = CommonParameter.getInstance().getStorage().isDbSync();
396392
}
397393

398-
checkPointStore.getDbSource().updateByBatch(batch.entrySet().stream()
394+
checkPointStore.updateByBatch(batch.entrySet().stream()
399395
.map(e -> Maps.immutableEntry(e.getKey().getBytes(), e.getValue().getBytes()))
400-
.collect(HashMap::new, (m, k) -> m.put(k.getKey(), k.getValue()), HashMap::putAll),
401-
WriteOptionsWrapper.getInstance().sync(syncFlag));
396+
.collect(HashMap::new, (m, k) -> m.put(k.getKey(), k.getValue()), HashMap::putAll));
402397

403398
} catch (Exception e) {
404399
throw new TronDBException(e);

0 commit comments

Comments
 (0)