1515 * limitations under the License.
1616 */
1717
18- import static io .rsocket .frame .FrameHeaderCodec .frameType ;
19- import static org .hamcrest .MatcherAssert .assertThat ;
20- import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
21- import static org .hamcrest .Matchers .hasSize ;
22-
2318import io .netty .buffer .ByteBuf ;
2419import io .netty .util .CharsetUtil ;
2520import io .netty .util .ReferenceCountUtil ;
3732import java .time .Duration ;
3833import java .util .ArrayList ;
3934import java .util .Collection ;
40- import java .util .List ;
4135import java .util .Map ;
4236import java .util .concurrent .CancellationException ;
4337import java .util .concurrent .atomic .AtomicBoolean ;
5246import org .junit .jupiter .params .ParameterizedTest ;
5347import org .junit .jupiter .params .provider .Arguments ;
5448import org .junit .jupiter .params .provider .MethodSource ;
55- import org .junit .runners .model .Statement ;
5649import org .reactivestreams .Publisher ;
5750import reactor .core .Disposable ;
5851import reactor .core .publisher .Flux ;
6457import reactor .test .publisher .TestPublisher ;
6558import reactor .test .util .RaceTestUtils ;
6659import reactor .util .context .Context ;
60+ import reactor .util .context .ContextView ;
6761import reactor .util .retry .Retry ;
6862
6963public class DefaultRSocketClientTests {
@@ -75,13 +69,7 @@ public void setUp() throws Throwable {
7569 Hooks .onNextDropped (ReferenceCountUtil ::safeRelease );
7670 Hooks .onErrorDropped ((t ) -> {});
7771 rule = new ClientSocketRule ();
78- rule .apply (
79- new Statement () {
80- @ Override
81- public void evaluate () {}
82- },
83- null )
84- .evaluate ();
72+ rule .init ();
8573 }
8674
8775 @ AfterEach
@@ -179,19 +167,12 @@ public void shouldSentFrameOnResolution(
179167 @ MethodSource ("interactions" )
180168 @ SuppressWarnings ({"unchecked" , "rawtypes" })
181169 public void shouldHaveNoLeaksOnPayloadInCaseOfRacingOfOnNextAndCancel (
182- BiFunction <RSocketClient , Publisher <Payload >, Publisher <?>> request , FrameType requestType )
183- throws Throwable {
170+ BiFunction <RSocketClient , Publisher <Payload >, Publisher <?>> request , FrameType requestType ) {
184171 Assumptions .assumeThat (requestType ).isNotEqualTo (FrameType .REQUEST_CHANNEL );
185172
186173 for (int i = 0 ; i < RaceTestConstants .REPEATS ; i ++) {
187174 ClientSocketRule rule = new ClientSocketRule ();
188- rule .apply (
189- new Statement () {
190- @ Override
191- public void evaluate () {}
192- },
193- null )
194- .evaluate ();
175+ rule .init ();
195176 Payload payload = ByteBufPayload .create ("test" , "testMetadata" );
196177 TestPublisher <Payload > testPublisher =
197178 TestPublisher .createNoncompliant (TestPublisher .Violation .DEFER_CANCELLATION );
@@ -241,19 +222,12 @@ public void evaluate() {}
241222 @ MethodSource ("interactions" )
242223 @ SuppressWarnings ({"unchecked" , "rawtypes" })
243224 public void shouldHaveNoLeaksOnPayloadInCaseOfRacingOfRequestAndCancel (
244- BiFunction <RSocketClient , Publisher <Payload >, Publisher <?>> request , FrameType requestType )
245- throws Throwable {
225+ BiFunction <RSocketClient , Publisher <Payload >, Publisher <?>> request , FrameType requestType ) {
246226 Assumptions .assumeThat (requestType ).isNotEqualTo (FrameType .REQUEST_CHANNEL );
247227
248228 for (int i = 0 ; i < RaceTestConstants .REPEATS ; i ++) {
249229 ClientSocketRule rule = new ClientSocketRule ();
250- rule .apply (
251- new Statement () {
252- @ Override
253- public void evaluate () {}
254- },
255- null )
256- .evaluate ();
230+ rule .init ();
257231 ByteBuf dataBuffer = rule .allocator .buffer ();
258232 dataBuffer .writeCharSequence ("test" , CharsetUtil .UTF_8 );
259233
@@ -311,14 +285,17 @@ public void shouldPropagateDownstreamContext(
311285 Payload payload = ByteBufPayload .create (dataBuffer , metadataBuffer );
312286 AssertSubscriber assertSubscriber = new AssertSubscriber (Context .of ("test" , "test" ));
313287
314- Context [] receivedContext = new Context [1 ];
288+ ContextView [] receivedContext = new Context [1 ];
315289 Publisher <?> publisher =
316290 request .apply (
317291 rule .client ,
318292 Mono .just (payload )
319293 .mergeWith (
320- Mono .subscriberContext ()
321- .doOnNext (c -> receivedContext [0 ] = c )
294+ Mono .deferContextual (
295+ c -> {
296+ receivedContext [0 ] = c ;
297+ return Mono .empty ();
298+ })
322299 .then (Mono .empty ())));
323300 publisher .subscribe (assertSubscriber );
324301
@@ -481,16 +458,11 @@ public void shouldDisposeOriginalSource() {
481458 }
482459
483460 @ Test
484- public void shouldDisposeOriginalSourceIfRacing () throws Throwable {
461+ public void shouldDisposeOriginalSourceIfRacing () {
485462 for (int i = 0 ; i < RaceTestConstants .REPEATS ; i ++) {
486463 ClientSocketRule rule = new ClientSocketRule ();
487- rule .apply (
488- new Statement () {
489- @ Override
490- public void evaluate () {}
491- },
492- null )
493- .evaluate ();
464+
465+ rule .init ();
494466
495467 AssertSubscriber <RSocket > assertSubscriber = AssertSubscriber .create ();
496468 rule .client .source ().subscribe (assertSubscriber );
@@ -520,8 +492,8 @@ public static class ClientSocketRule extends AbstractSocketRule<RSocketRequester
520492 protected Sinks .One <RSocket > producer ;
521493
522494 @ Override
523- protected void init () {
524- super .init ();
495+ protected void doInit () {
496+ super .doInit ();
525497 delayer = () -> producer .tryEmitValue (socket );
526498 producer = Sinks .one ();
527499 client =
@@ -547,22 +519,5 @@ protected RSocketRequester newRSocket() {
547519 __ -> null ,
548520 null );
549521 }
550-
551- public int getStreamIdForRequestType (FrameType expectedFrameType ) {
552- assertThat ("Unexpected frames sent." , connection .getSent (), hasSize (greaterThanOrEqualTo (1 )));
553- List <FrameType > framesFound = new ArrayList <>();
554- for (ByteBuf frame : connection .getSent ()) {
555- FrameType frameType = frameType (frame );
556- if (frameType == expectedFrameType ) {
557- return FrameHeaderCodec .streamId (frame );
558- }
559- framesFound .add (frameType );
560- }
561- throw new AssertionError (
562- "No frames sent with frame type: "
563- + expectedFrameType
564- + ", frames found: "
565- + framesFound );
566- }
567522 }
568523}
0 commit comments