Skip to content

Commit 2e7c5d8

Browse files
garyrussellartembilan
authored andcommitted
GH-2974: Fix race in TcpNetConnection.getPayload()
Fixes #2974 There is a race in that we could get a `SocketException` in `inputStream`. Since this is between payloads, it needs to be thrown as a `SoftEndOfStreamException`. Also fix unnecessary `this.` in `MessageHistoryConfigurer.java`. **cherry-pick to 5.0.x, 4.3.x** * * Add javadocs to SoftEndOfStreamException # Conflicts: # spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetConnection.java # Conflicts: # spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetConnection.java # spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionTests.java
1 parent 3c69b26 commit 2e7c5d8

File tree

3 files changed

+89
-13
lines changed

3 files changed

+89
-13
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetConnection.java

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.springframework.integration.ip.tcp.connection;
1818

1919
import java.io.BufferedOutputStream;
20+
import java.io.IOException;
21+
import java.io.InputStream;
2022
import java.io.OutputStream;
2123
import java.net.Socket;
2224
import java.net.SocketException;
@@ -82,7 +84,8 @@ public void close() {
8284
try {
8385
this.socket.close();
8486
}
85-
catch (Exception e) { }
87+
catch (Exception e) {
88+
}
8689
super.close();
8790
}
8891

@@ -117,7 +120,15 @@ public synchronized void send(Message<?> message) throws Exception {
117120

118121
@Override
119122
public Object getPayload() throws Exception {
120-
return this.getDeserializer().deserialize(this.socket.getInputStream());
123+
InputStream inputStream;
124+
try {
125+
inputStream = this.socket.getInputStream();
126+
}
127+
catch (IOException e1) {
128+
throw new SoftEndOfStreamException("Socket closed when getting input stream", e1);
129+
}
130+
return getDeserializer()
131+
.deserialize(inputStream);
121132
}
122133

123134
@Override
@@ -184,9 +195,9 @@ public void run() {
184195
catch (NoListenerException nle) { // could also be thrown by an interceptor
185196
if (logger.isWarnEnabled()) {
186197
logger.warn("Unexpected message - no endpoint registered with connection interceptor: "
187-
+ getConnectionId()
188-
+ " - "
189-
+ message);
198+
+ getConnectionId()
199+
+ " - "
200+
+ message);
190201
}
191202
}
192203
catch (Exception e2) {
@@ -230,24 +241,24 @@ protected boolean handleReadException(Exception e) {
230241
if (noReadErrorOnClose) {
231242
if (logger.isTraceEnabled()) {
232243
logger.trace("Read exception " +
233-
this.getConnectionId(), e);
244+
this.getConnectionId(), e);
234245
}
235246
else if (logger.isDebugEnabled()) {
236247
logger.debug("Read exception " +
237-
this.getConnectionId() + " " +
238-
e.getClass().getSimpleName() +
239-
":" + (e.getCause() != null ? e.getCause() + ":" : "") + e.getMessage());
248+
this.getConnectionId() + " " +
249+
e.getClass().getSimpleName() +
250+
":" + (e.getCause() != null ? e.getCause() + ":" : "") + e.getMessage());
240251
}
241252
}
242253
else if (logger.isTraceEnabled()) {
243254
logger.error("Read exception " +
244-
this.getConnectionId(), e);
255+
this.getConnectionId(), e);
245256
}
246257
else {
247258
logger.error("Read exception " +
248-
this.getConnectionId() + " " +
249-
e.getClass().getSimpleName() +
250-
":" + (e.getCause() != null ? e.getCause() + ":" : "") + e.getMessage());
259+
this.getConnectionId() + " " +
260+
e.getClass().getSimpleName() +
261+
":" + (e.getCause() != null ? e.getCause() + ":" : "") + e.getMessage());
251262
}
252263
}
253264
this.sendExceptionToListener(e);

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/SoftEndOfStreamException.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,29 @@ public class SoftEndOfStreamException extends IOException {
3030

3131
private static final long serialVersionUID = 7309907445617226978L;
3232

33+
/**
34+
* Default constructor.
35+
*/
3336
public SoftEndOfStreamException() {
3437
super();
3538
}
3639

40+
/**
41+
* Construct an instance with the message.
42+
* @param message the message.
43+
*/
3744
public SoftEndOfStreamException(String message) {
3845
super(message);
3946
}
4047

48+
/**
49+
* Construct an instance with the message and cause.
50+
* @param message the message.
51+
* @param cause the cause.
52+
* @since 4.3.21.
53+
*/
54+
public SoftEndOfStreamException(String message, Throwable cause) {
55+
super(message, cause);
56+
}
57+
4158
}

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionTests.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,30 @@
1616

1717
package org.springframework.integration.ip.tcp.connection;
1818

19+
import static org.hamcrest.CoreMatchers.instanceOf;
1920
import static org.junit.Assert.assertEquals;
2021
import static org.junit.Assert.assertNotNull;
22+
import static org.junit.Assert.assertThat;
23+
import static org.junit.Assert.assertTrue;
2124
import static org.mockito.Mockito.doAnswer;
2225
import static org.mockito.Mockito.mock;
2326
import static org.mockito.Mockito.when;
2427

2528
import java.io.ByteArrayOutputStream;
29+
import java.io.IOException;
2630
import java.io.InputStream;
2731
import java.io.PipedInputStream;
2832
import java.io.PipedOutputStream;
2933
import java.net.Socket;
3034
import java.nio.ByteBuffer;
3135
import java.nio.channels.SocketChannel;
36+
import java.util.concurrent.CountDownLatch;
37+
import java.util.concurrent.TimeUnit;
38+
import java.util.concurrent.atomic.AtomicInteger;
3239
import java.util.concurrent.atomic.AtomicReference;
3340

41+
import javax.net.SocketFactory;
42+
3443
import org.apache.commons.logging.Log;
3544
import org.junit.Test;
3645
import org.mockito.Mockito;
@@ -43,6 +52,7 @@
4352
import org.springframework.integration.ip.tcp.connection.TcpNioConnection.ChannelInputStream;
4453
import org.springframework.integration.ip.tcp.serializer.ByteArrayStxEtxSerializer;
4554
import org.springframework.integration.ip.tcp.serializer.MapJsonSerializer;
55+
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
4656
import org.springframework.integration.support.MessageBuilder;
4757
import org.springframework.integration.support.converter.MapMessageConverter;
4858
import org.springframework.integration.test.util.TestUtils;
@@ -157,4 +167,42 @@ public boolean onMessage(Message<?> message) {
157167
assertEquals("baz", inboundMessage.get().getHeaders().get("bar"));
158168
}
159169

170+
@Test
171+
public void socketClosedNextRead() throws InterruptedException, IOException {
172+
TcpNetServerConnectionFactory server = new TcpNetServerConnectionFactory(0);
173+
AtomicInteger port = new AtomicInteger();
174+
CountDownLatch latch = new CountDownLatch(1);
175+
ApplicationEventPublisher publisher = new ApplicationEventPublisher() {
176+
177+
@Override
178+
public void publishEvent(Object ev) {
179+
if (ev instanceof TcpConnectionServerListeningEvent) {
180+
port.set(((TcpConnectionServerListeningEvent) ev).getPort());
181+
latch.countDown();
182+
}
183+
}
184+
185+
@Override
186+
public void publishEvent(ApplicationEvent event) {
187+
publishEvent((Object) event);
188+
}
189+
190+
};
191+
server.setApplicationEventPublisher(publisher);
192+
server.registerListener(message -> false);
193+
server.afterPropertiesSet();
194+
server.start();
195+
assertTrue(latch.await(10, TimeUnit.SECONDS));
196+
Socket socket = SocketFactory.getDefault().createSocket("localhost", port.get());
197+
TcpNetConnection connection = new TcpNetConnection(socket, false, false, publisher, "socketClosedNextRead");
198+
socket.close();
199+
try {
200+
connection.getPayload();
201+
}
202+
catch (Exception e) {
203+
assertThat(e, instanceOf(SoftEndOfStreamException.class));
204+
}
205+
server.stop();
206+
}
207+
160208
}

0 commit comments

Comments
 (0)