Skip to content

Commit 56a56a0

Browse files
RocMarshal authored and Myasuka committed
[FLINK-36746][core] Fix the deadlock bug in SerializedThrowable
Co-authored-by: raoraoxiong <xiongraorao@gmail.com>
Co-authored-by: Yun Tang <myasuka@live.com>

This closes #27186
1 parent d0c9ed9 commit 56a56a0

File tree

2 files changed

+93
-13
lines changed

2 files changed

+93
-13
lines changed

flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,14 @@ private SerializedThrowable(Throwable exception, Set<Throwable> alreadySeen) {
6868
if (!(exception instanceof SerializedThrowable)) {
6969
// serialize and memoize the original message
7070
byte[] serialized;
71-
try {
72-
serialized = InstantiationUtil.serializeObject(exception);
73-
} catch (Throwable t) {
74-
serialized = null;
71+
// synchronize here to avoid a deadlock when multiple threads serialize exceptions
72+
// concurrently
73+
synchronized (SerializedThrowable.class) {
74+
try {
75+
serialized = InstantiationUtil.serializeObject(exception);
76+
} catch (Throwable t) {
77+
serialized = null;
78+
}
7579
}
7680
this.serializedException = serialized;
7781
this.cachedException = new WeakReference<>(exception);
@@ -94,7 +98,7 @@ private SerializedThrowable(Throwable exception, Set<Throwable> alreadySeen) {
9498
}
9599
}
96100
// mimic suppressed exceptions
97-
addAllSuppressed(exception.getSuppressed());
101+
this.addAllSuppressed(exception.getSuppressed(), alreadySeen);
98102
} else {
99103
// copy from that serialized throwable
100104
SerializedThrowable other = (SerializedThrowable) exception;
@@ -104,7 +108,7 @@ private SerializedThrowable(Throwable exception, Set<Throwable> alreadySeen) {
104108
this.cachedException = other.cachedException;
105109
this.setStackTrace(other.getStackTrace());
106110
this.initCause(other.getCause());
107-
this.addAllSuppressed(other.getSuppressed());
111+
this.addAllSuppressed(other.getSuppressed(), alreadySeen);
108112
}
109113
}
110114

@@ -141,15 +145,23 @@ public String getFullStringifiedStackTrace() {
141145
return fullStringifiedStackTrace;
142146
}
143147

144-
private void addAllSuppressed(Throwable[] suppressed) {
148+
/**
149+
* Add all suppressed exceptions to this exception.
150+
*
151+
* @param suppressed The suppressed exceptions to add.
152+
* @param alreadySeen The set of exceptions that have already been seen.
153+
*/
154+
private void addAllSuppressed(Throwable[] suppressed, Set<Throwable> alreadySeen) {
145155
for (Throwable s : suppressed) {
146-
SerializedThrowable serializedThrowable;
147-
if (s instanceof SerializedThrowable) {
148-
serializedThrowable = (SerializedThrowable) s;
149-
} else {
150-
serializedThrowable = new SerializedThrowable(s);
156+
if (alreadySeen.add(s)) {
157+
SerializedThrowable serializedThrowable;
158+
if (s instanceof SerializedThrowable) {
159+
serializedThrowable = (SerializedThrowable) s;
160+
} else {
161+
serializedThrowable = new SerializedThrowable(s);
162+
}
163+
this.addSuppressed(serializedThrowable);
151164
}
152-
this.addSuppressed(serializedThrowable);
153165
}
154166
}
155167

flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,22 @@
1919
package org.apache.flink.runtime.util;
2020

2121
import org.apache.flink.core.testutils.CommonTestUtils;
22+
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
2223
import org.apache.flink.testutils.ClassLoaderUtils;
2324
import org.apache.flink.util.ExceptionUtils;
2425
import org.apache.flink.util.InstantiationUtil;
2526
import org.apache.flink.util.SerializedThrowable;
2627

2728
import org.junit.jupiter.api.Test;
29+
import org.junit.jupiter.api.Timeout;
30+
31+
import java.io.IOException;
32+
import java.net.InetSocketAddress;
33+
import java.net.SocketAddress;
34+
import java.util.ArrayList;
35+
import java.util.List;
36+
import java.util.concurrent.CountDownLatch;
37+
import java.util.concurrent.TimeUnit;
2838

2939
import static org.assertj.core.api.Assertions.assertThat;
3040
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -179,4 +189,62 @@ void testCopySuppressed() {
179189
.isInstanceOf(SerializedThrowable.class)
180190
.hasMessage("java.lang.Exception: suppressed");
181191
}
192+
193+
@Test
194+
void testCyclicSuppressedThrowableSerialized() {
195+
SerializedThrowable serializedThrowable = new SerializedThrowable(mockThrowable());
196+
assertThat(serializedThrowable).isNotNull();
197+
}
198+
199+
@Test
200+
@Timeout(value = 5, unit = TimeUnit.SECONDS)
201+
void testCyclicSuppressedThrowableConcurrentSerialized() throws InterruptedException {
202+
Throwable throwable = mockThrowable();
203+
int threadNum = 16;
204+
CountDownLatch countDownLatch = new CountDownLatch(threadNum);
205+
List<Thread> threads = new ArrayList<>();
206+
for (int i = 0; i < threadNum; i++) {
207+
String threadName = "thread-" + i;
208+
Thread t = createThread(countDownLatch, throwable, threadName);
209+
t.start();
210+
countDownLatch.countDown();
211+
threads.add(t);
212+
}
213+
for (Thread thread : threads) {
214+
thread.join();
215+
}
216+
}
217+
218+
private static Thread createThread(
219+
CountDownLatch countDownLatch, Throwable throwable, String threadName) {
220+
Thread t =
221+
new Thread(
222+
() -> {
223+
try {
224+
countDownLatch.await();
225+
SerializedThrowable serializedThrowable =
226+
new SerializedThrowable(throwable);
227+
assertThat(serializedThrowable).isNotNull();
228+
} catch (Exception e) {
229+
throw new RuntimeException(e);
230+
}
231+
});
232+
t.setName(threadName);
233+
return t;
234+
}
235+
236+
private static Throwable mockThrowable() {
237+
SocketAddress remoteAddr = new InetSocketAddress(80);
238+
RemoteTransportException remoteTransportException =
239+
new RemoteTransportException(
240+
"Connection unexpectedly closed by remote task manager '"
241+
+ remoteAddr
242+
+ "'. "
243+
+ "This might indicate that the remote task manager was lost.",
244+
remoteAddr,
245+
new IOException("connection reset by peer."));
246+
RuntimeException runtimeException = new RuntimeException(remoteTransportException);
247+
remoteTransportException.addSuppressed(runtimeException);
248+
return remoteTransportException;
249+
}
182250
}

0 commit comments

Comments
 (0)