11/*
2- * Copyright 2016-2023 the original author or authors.
2+ * Copyright 2016-2025 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
@@ -97,37 +97,45 @@ private void testAllData(FileListFilter<String> filter, boolean nullFilter) thro
9797 }
9898 streamer .afterPropertiesSet ();
9999 streamer .start ();
100- Message <byte []> received = (Message <byte []>) this .transformer .transform (streamer .receive ());
100+ Message <InputStream > inputStreamMessage = streamer .receive ();
101+ Message <byte []> received = (Message <byte []>) this .transformer .transform (inputStreamMessage );
101102 assertThat (received .getPayload ()).isEqualTo ("foo\n bar" .getBytes ());
102- assertThat (received .getHeaders ().get (FileHeaders .REMOTE_DIRECTORY )).isEqualTo ("/foo" );
103- assertThat (received .getHeaders ().get (FileHeaders .REMOTE_FILE )).isEqualTo ("foo" );
103+ assertThat (received .getHeaders ())
104+ .containsEntry (FileHeaders .REMOTE_DIRECTORY , "/foo" )
105+ .containsEntry (FileHeaders .REMOTE_FILE , "foo" )
106+ .doesNotContainKey (IntegrationMessageHeaderAccessor .CLOSEABLE_RESOURCE );
104107 String fileInfo = (String ) received .getHeaders ().get (FileHeaders .REMOTE_FILE_INFO );
105- assertThat (fileInfo ).contains ("remoteDirectory\" :\" /foo" );
106- assertThat (fileInfo ).contains ("permissions\" :\" -rw-rw-rw" );
107- assertThat (fileInfo ).contains ("size\" :42" );
108- assertThat (fileInfo ).contains ("directory\" :false" );
109- assertThat (fileInfo ).contains ("filename\" :\" foo" );
110- assertThat (fileInfo ).contains ("modified\" :42000" );
111- assertThat (fileInfo ).contains ("link\" :false" );
108+ assertThat (fileInfo )
109+ .contains ("remoteDirectory\" :\" /foo" )
110+ .contains ("permissions\" :\" -rw-rw-rw" )
111+ .contains ("size\" :42" )
112+ .contains ("directory\" :false" )
113+ .contains ("filename\" :\" foo" )
114+ .contains ("modified\" :42000" )
115+ .contains ("link\" :false" );
112116
113117 // close after list, transform
114- verify (StaticMessageHeaderAccessor .getCloseableResource (received ), times (2 )).close ();
118+ verify (StaticMessageHeaderAccessor .getCloseableResource (inputStreamMessage ), times (2 )).close ();
115119
116- received = (Message <byte []>) this .transformer .transform (streamer .receive ());
120+ inputStreamMessage = streamer .receive ();
121+ received = (Message <byte []>) this .transformer .transform (inputStreamMessage );
117122 assertThat (received .getPayload ()).isEqualTo ("baz\n qux" .getBytes ());
118- assertThat (received .getHeaders ().get (FileHeaders .REMOTE_DIRECTORY )).isEqualTo ("/foo" );
119- assertThat (received .getHeaders ().get (FileHeaders .REMOTE_FILE )).isEqualTo ("bar" );
123+ assertThat (received .getHeaders ())
124+ .containsEntry (FileHeaders .REMOTE_DIRECTORY , "/foo" )
125+ .containsEntry (FileHeaders .REMOTE_FILE , "bar" )
126+ .doesNotContainKey (IntegrationMessageHeaderAccessor .CLOSEABLE_RESOURCE );
120127 fileInfo = (String ) received .getHeaders ().get (FileHeaders .REMOTE_FILE_INFO );
121- assertThat (fileInfo ).contains ("remoteDirectory\" :\" /foo" );
122- assertThat (fileInfo ).contains ("permissions\" :\" -rw-rw-rw" );
123- assertThat (fileInfo ).contains ("size\" :42" );
124- assertThat (fileInfo ).contains ("directory\" :false" );
125- assertThat (fileInfo ).contains ("filename\" :\" bar" );
126- assertThat (fileInfo ).contains ("modified\" :42000" );
127- assertThat (fileInfo ).contains ("link\" :false" );
128+ assertThat (fileInfo )
129+ .contains ("remoteDirectory\" :\" /foo" )
130+ .contains ("permissions\" :\" -rw-rw-rw" )
131+ .contains ("size\" :42" )
132+ .contains ("directory\" :false" )
133+ .contains ("filename\" :\" bar" )
134+ .contains ("modified\" :42000" )
135+ .contains ("link\" :false" );
128136
129137 // close after transform
130- verify (StaticMessageHeaderAccessor .getCloseableResource (received ), times (3 )).close ();
138+ verify (StaticMessageHeaderAccessor .getCloseableResource (inputStreamMessage ), times (3 )).close ();
131139
132140 verify (sessionFactory .getSession ()).list ("/foo" );
133141 }
@@ -142,21 +150,25 @@ public void testAllDataMaxFetch() throws Exception {
142150 streamer .setFilter (new AcceptOnceFileListFilter <>());
143151 streamer .afterPropertiesSet ();
144152 streamer .start ();
145- Message <byte []> received = (Message <byte []>) this .transformer .transform (streamer .receive ());
153+ Message <InputStream > inputStreamMessage = streamer .receive ();
154+ Message <byte []> received = (Message <byte []>) this .transformer .transform (inputStreamMessage );
146155 assertThat (received .getPayload ()).isEqualTo ("foo\n bar" .getBytes ());
147- assertThat (received .getHeaders ().get (FileHeaders .REMOTE_DIRECTORY )).isEqualTo ("/foo" );
148- assertThat (received .getHeaders ().get (FileHeaders .REMOTE_FILE )).isEqualTo ("foo" );
156+ assertThat (received .getHeaders ())
157+ .containsEntry (FileHeaders .REMOTE_DIRECTORY , "/foo" )
158+ .containsEntry (FileHeaders .REMOTE_FILE , "foo" );
149159
150160 // close after list, transform
151- verify (StaticMessageHeaderAccessor .getCloseableResource (received ), times (2 )).close ();
161+ verify (StaticMessageHeaderAccessor .getCloseableResource (inputStreamMessage ), times (2 )).close ();
152162
153- received = (Message <byte []>) this .transformer .transform (streamer .receive ());
163+ inputStreamMessage = streamer .receive ();
164+ received = (Message <byte []>) this .transformer .transform (inputStreamMessage );
154165 assertThat (received .getPayload ()).isEqualTo ("baz\n qux" .getBytes ());
155- assertThat (received .getHeaders ().get (FileHeaders .REMOTE_DIRECTORY )).isEqualTo ("/foo" );
156- assertThat (received .getHeaders ().get (FileHeaders .REMOTE_FILE )).isEqualTo ("bar" );
166+ assertThat (received .getHeaders ())
167+ .containsEntry (FileHeaders .REMOTE_DIRECTORY , "/foo" )
168+ .containsEntry (FileHeaders .REMOTE_FILE , "bar" );
157169
158170 // close after transform
159- verify (new IntegrationMessageHeaderAccessor ( received ) .getCloseableResource (), times (3 )).close ();
171+ verify (StaticMessageHeaderAccessor .getCloseableResource (inputStreamMessage ), times (3 )).close ();
160172
161173 verify (sessionFactory .getSession ()).list ("/foo" );
162174 }
@@ -189,31 +201,35 @@ public void testLineByLine() throws Exception {
189201 splitter .handleMessage (receivedStream );
190202 Message <?> received = out .receive (0 );
191203 assertThat (received .getPayload ()).isEqualTo ("foo" );
192- assertThat (received .getHeaders ().get (FileHeaders .REMOTE_DIRECTORY )).isEqualTo ("/foo" );
193- assertThat (received .getHeaders ().get (FileHeaders .REMOTE_FILE )).isEqualTo ("foo" );
204+ assertThat (received .getHeaders ())
205+ .containsEntry (FileHeaders .REMOTE_DIRECTORY , "/foo" )
206+ .containsEntry (FileHeaders .REMOTE_FILE , "foo" );
194207 received = out .receive (0 );
195208 assertThat (received .getPayload ()).isEqualTo ("bar" );
196- assertThat (received .getHeaders ().get (FileHeaders .REMOTE_DIRECTORY )).isEqualTo ("/foo" );
197- assertThat (received .getHeaders ().get (FileHeaders .REMOTE_FILE )).isEqualTo ("foo" );
209+ assertThat (received .getHeaders ())
210+ .containsEntry (FileHeaders .REMOTE_DIRECTORY , "/foo" )
211+ .containsEntry (FileHeaders .REMOTE_FILE , "foo" );
198212 assertThat (out .receive (0 )).isNull ();
199213
200214 // close by list, splitter
201- verify (new IntegrationMessageHeaderAccessor ( receivedStream ) .getCloseableResource (), times (3 )).close ();
215+ verify (StaticMessageHeaderAccessor .getCloseableResource (receivedStream ), times (3 )).close ();
202216
203217 receivedStream = streamer .receive ();
204218 splitter .handleMessage (receivedStream );
205219 received = out .receive (0 );
206220 assertThat (received .getPayload ()).isEqualTo ("baz" );
207- assertThat (received .getHeaders ().get (FileHeaders .REMOTE_DIRECTORY )).isEqualTo ("/foo" );
208- assertThat (received .getHeaders ().get (FileHeaders .REMOTE_FILE )).isEqualTo ("bar" );
221+ assertThat (received .getHeaders ())
222+ .containsEntry (FileHeaders .REMOTE_DIRECTORY , "/foo" )
223+ .containsEntry (FileHeaders .REMOTE_FILE , "bar" );
209224 received = out .receive (0 );
210225 assertThat (received .getPayload ()).isEqualTo ("qux" );
211- assertThat (received .getHeaders ().get (FileHeaders .REMOTE_DIRECTORY )).isEqualTo ("/foo" );
212- assertThat (received .getHeaders ().get (FileHeaders .REMOTE_FILE )).isEqualTo ("bar" );
226+ assertThat (received .getHeaders ())
227+ .containsEntry (FileHeaders .REMOTE_DIRECTORY , "/foo" )
228+ .containsEntry (FileHeaders .REMOTE_FILE , "bar" );
213229 assertThat (out .receive (0 )).isNull ();
214230
215231 // close by splitter
216- verify (new IntegrationMessageHeaderAccessor ( receivedStream ) .getCloseableResource (), times (5 )).close ();
232+ verify (StaticMessageHeaderAccessor .getCloseableResource (receivedStream ), times (5 )).close ();
217233 }
218234
219235 @ SuppressWarnings ("unchecked" )
0 commit comments