Skip to content

Commit 8aed63c

Browse files
authored
fix rpc close leakage (#6483)
1 parent 1b637fd commit 8aed63c

File tree

2 files changed

+58
-12
lines changed

2 files changed

+58
-12
lines changed

framework/src/main/java/org/tron/common/application/RpcService.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.grpc.netty.NettyServerBuilder;
2020
import io.grpc.protobuf.services.ProtoReflectionService;
2121
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.ExecutorService;
2223
import java.util.concurrent.TimeUnit;
2324
import lombok.extern.slf4j.Slf4j;
2425
import org.springframework.beans.factory.annotation.Autowired;
@@ -34,6 +35,7 @@
3435
public abstract class RpcService extends AbstractService {
3536

3637
private Server apiServer;
38+
private ExecutorService executorService;
3739
protected String executorName;
3840

3941
@Autowired
@@ -58,7 +60,24 @@ public void innerStart() throws Exception {
5860
@Override
5961
public void innerStop() throws Exception {
6062
if (this.apiServer != null) {
61-
this.apiServer.shutdown().awaitTermination(5, TimeUnit.SECONDS);
63+
this.apiServer.shutdown();
64+
try {
65+
if (!this.apiServer.awaitTermination(5, TimeUnit.SECONDS)) {
66+
logger.warn("gRPC server did not shutdown gracefully, forcing shutdown");
67+
this.apiServer.shutdownNow();
68+
}
69+
} catch (InterruptedException e) {
70+
logger.warn("Interrupted while waiting for gRPC server shutdown", e);
71+
this.apiServer.shutdownNow();
72+
Thread.currentThread().interrupt();
73+
}
74+
}
75+
76+
// Close executor
77+
if (this.executorService != null) {
78+
ExecutorServiceManager.shutdownAndAwaitTermination(
79+
this.executorService, this.executorName);
80+
this.executorService = null;
6281
}
6382
}
6483

@@ -76,9 +95,9 @@ protected NettyServerBuilder initServerBuilder() {
7695
NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(this.port);
7796
CommonParameter parameter = Args.getInstance();
7897
if (parameter.getRpcThreadNum() > 0) {
79-
serverBuilder = serverBuilder
80-
.executor(ExecutorServiceManager.newFixedThreadPool(
81-
this.executorName, parameter.getRpcThreadNum()));
98+
this.executorService = ExecutorServiceManager.newFixedThreadPool(
99+
this.executorName, parameter.getRpcThreadNum());
100+
serverBuilder = serverBuilder.executor(this.executorService);
82101
}
83102
// Set configs from config.conf or default value
84103
serverBuilder

framework/src/test/java/org/tron/core/services/RpcApiServicesTest.java

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.grpc.ManagedChannelBuilder;
1212
import java.io.IOException;
1313
import java.util.Objects;
14+
import java.util.concurrent.ExecutorService;
1415
import java.util.concurrent.TimeUnit;
1516
import org.junit.AfterClass;
1617
import org.junit.Assert;
@@ -53,6 +54,7 @@
5354
import org.tron.common.application.Application;
5455
import org.tron.common.application.ApplicationFactory;
5556
import org.tron.common.application.TronApplicationContext;
57+
import org.tron.common.es.ExecutorServiceManager;
5658
import org.tron.common.utils.ByteArray;
5759
import org.tron.common.utils.PublicMethod;
5860
import org.tron.common.utils.Sha256Hash;
@@ -140,6 +142,9 @@ public class RpcApiServicesTest {
140142
private static ByteString ivk;
141143
private static ByteString d;
142144

145+
private static ExecutorService executorService;
146+
private static final String executorName = "rpc-test-executor";
147+
143148
@BeforeClass
144149
public static void init() throws IOException {
145150
Args.setParam(new String[] {"-d", temporaryFolder.newFolder().toString()}, Constant.TEST_CONF);
@@ -163,16 +168,22 @@ public static void init() throws IOException {
163168
String pBFTNode = String.format("%s:%d", Constant.LOCAL_HOST,
164169
getInstance().getRpcOnPBFTPort());
165170

171+
executorService = ExecutorServiceManager.newFixedThreadPool(
172+
executorName, 3);
173+
166174
channelFull = ManagedChannelBuilder.forTarget(fullNode)
167175
.usePlaintext()
176+
.executor(executorService)
168177
.intercept(new TimeoutInterceptor(5000))
169178
.build();
170179
channelPBFT = ManagedChannelBuilder.forTarget(pBFTNode)
171180
.usePlaintext()
181+
.executor(executorService)
172182
.intercept(new TimeoutInterceptor(5000))
173183
.build();
174184
channelSolidity = ManagedChannelBuilder.forTarget(solidityNode)
175185
.usePlaintext()
186+
.executor(executorService)
176187
.intercept(new TimeoutInterceptor(5000))
177188
.build();
178189
context = new TronApplicationContext(DefaultConfig.class);
@@ -197,19 +208,35 @@ public static void init() throws IOException {
197208

198209
@AfterClass
199210
public static void destroy() {
200-
if (channelFull != null) {
201-
channelFull.shutdownNow();
202-
}
203-
if (channelPBFT != null) {
204-
channelPBFT.shutdownNow();
205-
}
206-
if (channelSolidity != null) {
207-
channelSolidity.shutdownNow();
211+
shutdownChannel(channelFull);
212+
shutdownChannel(channelPBFT);
213+
shutdownChannel(channelSolidity);
214+
215+
if (executorService != null) {
216+
ExecutorServiceManager.shutdownAndAwaitTermination(
217+
executorService, executorName);
218+
executorService = null;
208219
}
220+
209221
context.close();
210222
Args.clearParam();
211223
}
212224

225+
private static void shutdownChannel(ManagedChannel channel) {
226+
if (channel == null) {
227+
return;
228+
}
229+
try {
230+
channel.shutdown();
231+
if (!channel.awaitTermination(5, TimeUnit.SECONDS)) {
232+
channel.shutdownNow();
233+
}
234+
} catch (InterruptedException e) {
235+
channel.shutdownNow();
236+
Thread.currentThread().interrupt();
237+
}
238+
}
239+
213240
@Test
214241
public void testGetBlockByNum() {
215242
NumberMessage message = NumberMessage.newBuilder().setNum(0).build();

0 commit comments

Comments
 (0)