3131import io .rsocket .internal .ClientSetup ;
3232import io .rsocket .internal .ServerSetup ;
3333import io .rsocket .keepalive .KeepAliveHandler ;
34+ import io .rsocket .lease .*;
3435import io .rsocket .plugins .DuplexConnectionInterceptor ;
3536import io .rsocket .plugins .PluginRegistry ;
3637import io .rsocket .plugins .Plugins ;
4041import io .rsocket .transport .ServerTransport ;
4142import io .rsocket .util .ConnectionUtils ;
4243import io .rsocket .util .EmptyPayload ;
44+ import io .rsocket .util .MultiSubscriberRSocket ;
4345import java .time .Duration ;
4446import java .util .Objects ;
45- import java .util .function .BiFunction ;
46- import java .util .function .Consumer ;
47- import java .util .function .Function ;
48- import java .util .function .Supplier ;
47+ import java .util .function .*;
4948import reactor .core .publisher .Mono ;
5049
5150/** Factory for creating RSocket clients and servers. */
@@ -92,6 +91,8 @@ default <T extends Closeable> Start<T> transport(ServerTransport<T> transport) {
9291 }
9392
9493 public static class ClientRSocketFactory implements ClientTransportAcceptor {
94+ private static final String CLIENT_TAG = "client" ;
95+
9596 private Supplier <Function <RSocket , RSocket >> acceptor =
9697 () -> rSocket -> new AbstractRSocket () {};
9798
@@ -115,13 +116,17 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor {
115116 private boolean resumeCleanupStoreOnKeepAlive ;
116117 private Supplier <ByteBuf > resumeTokenSupplier = ResumeFrameFlyweight ::generateResumeToken ;
117118 private Function <? super ByteBuf , ? extends ResumableFramesStore > resumeStoreFactory =
118- token -> new InMemoryResumableFramesStore ("client" , 100_000 );
119+ token -> new InMemoryResumableFramesStore (CLIENT_TAG , 100_000 );
119120 private Duration resumeSessionDuration = Duration .ofMinutes (2 );
120121 private Duration resumeStreamTimeout = Duration .ofSeconds (10 );
121122 private Supplier <ResumeStrategy > resumeStrategySupplier =
122123 () ->
123124 new ExponentialBackoffResumeStrategy (Duration .ofSeconds (1 ), Duration .ofSeconds (16 ), 2 );
124125
126+ private boolean multiSubscriberRequester = true ;
127+ private boolean leaseEnabled ;
128+ private Supplier <Leases <?>> leasesSupplier = Leases ::new ;
129+
125130 private ByteBufAllocator allocator = ByteBufAllocator .DEFAULT ;
126131
127132 public ClientRSocketFactory byteBufAllocator (ByteBufAllocator allocator ) {
@@ -205,6 +210,22 @@ public ClientRSocketFactory metadataMimeType(String metadataMimeType) {
205210 return this ;
206211 }
207212
213+ public ClientRSocketFactory lease (Supplier <Leases <? extends LeaseStats >> leasesSupplier ) {
214+ this .leaseEnabled = true ;
215+ this .leasesSupplier = Objects .requireNonNull (leasesSupplier );
216+ return this ;
217+ }
218+
219+ public ClientRSocketFactory lease () {
220+ this .leaseEnabled = true ;
221+ return this ;
222+ }
223+
224+ public ClientRSocketFactory singleSubscriberRequester () {
225+ this .multiSubscriberRequester = false ;
226+ return this ;
227+ }
228+
208229 public ClientRSocketFactory resume () {
209230 this .resumeEnabled = true ;
210231 return this ;
@@ -300,9 +321,16 @@ public Mono<RSocket> start() {
300321 DuplexConnection wrappedConnection = clientSetup .connection ();
301322
302323 ClientServerInputMultiplexer multiplexer =
303- new ClientServerInputMultiplexer (wrappedConnection , plugins );
324+ new ClientServerInputMultiplexer (wrappedConnection , plugins , true );
325+
326+ boolean isLeaseEnabled = leaseEnabled ;
327+ Leases <?> leases = leasesSupplier .get ();
328+ RequesterLeaseHandler requesterLeaseHandler =
329+ isLeaseEnabled
330+ ? new RequesterLeaseHandler .Impl (CLIENT_TAG , leases .receiver ())
331+ : RequesterLeaseHandler .None ;
304332
305- RSocketRequester rSocketRequester =
333+ RSocket rSocketRequester =
306334 new RSocketRequester (
307335 allocator ,
308336 multiplexer .asClientConnection (),
@@ -311,12 +339,17 @@ public Mono<RSocket> start() {
311339 StreamIdSupplier .clientSupplier (),
312340 keepAliveTickPeriod (),
313341 keepAliveTimeout (),
314- keepAliveHandler );
342+ keepAliveHandler ,
343+ requesterLeaseHandler );
344+
345+ if (multiSubscriberRequester ) {
346+ rSocketRequester = new MultiSubscriberRSocket (rSocketRequester );
347+ }
315348
316349 ByteBuf setupFrame =
317350 SetupFrameFlyweight .encode (
318351 allocator ,
319- false ,
352+ isLeaseEnabled ,
320353 keepAliveTickPeriod (),
321354 keepAliveTimeout (),
322355 resumeToken ,
@@ -337,13 +370,20 @@ public Mono<RSocket> start() {
337370
338371 RSocket wrappedRSocketHandler = plugins .applyResponder (rSocketHandler );
339372
340- RSocketResponder rSocketResponder =
373+ ResponderLeaseHandler responderLeaseHandler =
374+ isLeaseEnabled
375+ ? new ResponderLeaseHandler .Impl <>(
376+ CLIENT_TAG , allocator , leases .sender (), errorConsumer , leases .stats ())
377+ : ResponderLeaseHandler .None ;
378+
379+ RSocket rSocketResponder =
341380 new RSocketResponder (
342381 allocator ,
343382 multiplexer .asServerConnection (),
344383 wrappedRSocketHandler ,
345384 payloadDecoder ,
346- errorConsumer );
385+ errorConsumer ,
386+ responderLeaseHandler );
347387
348388 return wrappedConnection .sendOne (setupFrame ).thenReturn (wrappedRSocketRequester );
349389 });
@@ -382,16 +422,23 @@ private Mono<DuplexConnection> newConnection() {
382422 }
383423
384424 public static class ServerRSocketFactory {
425+ private static final String SERVER_TAG = "server" ;
426+
385427 private SocketAcceptor acceptor ;
386428 private PayloadDecoder payloadDecoder = PayloadDecoder .DEFAULT ;
387429 private Consumer <Throwable > errorConsumer = Throwable ::printStackTrace ;
388430 private int mtu = 0 ;
389431 private PluginRegistry plugins = new PluginRegistry (Plugins .defaultPlugins ());
432+
390433 private boolean resumeSupported ;
391434 private Duration resumeSessionDuration = Duration .ofSeconds (120 );
392435 private Duration resumeStreamTimeout = Duration .ofSeconds (10 );
393436 private Function <? super ByteBuf , ? extends ResumableFramesStore > resumeStoreFactory =
394- token -> new InMemoryResumableFramesStore ("server" , 100_000 );
437+ token -> new InMemoryResumableFramesStore (SERVER_TAG , 100_000 );
438+
439+ private boolean multiSubscriberRequester = true ;
440+ private boolean leaseEnabled ;
441+ private Supplier <Leases <?>> leasesSupplier = Leases ::new ;
395442
396443 private ByteBufAllocator allocator = ByteBufAllocator .DEFAULT ;
397444 private boolean resumeCleanupStoreOnKeepAlive ;
@@ -450,6 +497,22 @@ public ServerRSocketFactory errorConsumer(Consumer<Throwable> errorConsumer) {
450497 return this ;
451498 }
452499
500+ public ServerRSocketFactory lease (Supplier <Leases <?>> leasesSupplier ) {
501+ this .leaseEnabled = true ;
502+ this .leasesSupplier = Objects .requireNonNull (leasesSupplier );
503+ return this ;
504+ }
505+
506+ public ServerRSocketFactory lease () {
507+ this .leaseEnabled = true ;
508+ return this ;
509+ }
510+
511+ public ServerRSocketFactory singleSubscriberRequester () {
512+ this .multiSubscriberRequester = false ;
513+ return this ;
514+ }
515+
453516 public ServerRSocketFactory resume () {
454517 this .resumeSupported = true ;
455518 return this ;
@@ -500,7 +563,7 @@ public <T extends Closeable> Start<T> transport(Supplier<ServerTransport<T>> tra
500563
501564 private Mono <Void > acceptor (ServerSetup serverSetup , DuplexConnection connection ) {
502565 ClientServerInputMultiplexer multiplexer =
503- new ClientServerInputMultiplexer (connection , plugins );
566+ new ClientServerInputMultiplexer (connection , plugins , false );
504567
505568 return multiplexer
506569 .asSetupConnection ()
@@ -541,20 +604,45 @@ private Mono<Void> acceptSetup(
541604 multiplexer .dispose ();
542605 });
543606 }
607+
608+ boolean isLeaseEnabled = leaseEnabled ;
609+
610+ if (SetupFrameFlyweight .honorLease (setupFrame ) && !isLeaseEnabled ) {
611+ return sendError (multiplexer , new InvalidSetupException ("lease is not supported" ))
612+ .doFinally (
613+ signalType -> {
614+ setupFrame .release ();
615+ multiplexer .dispose ();
616+ });
617+ }
618+
544619 return serverSetup .acceptRSocketSetup (
545620 setupFrame ,
546621 multiplexer ,
547622 (keepAliveHandler , wrappedMultiplexer ) -> {
548623 ConnectionSetupPayload setupPayload = ConnectionSetupPayload .create (setupFrame );
549624
550- RSocketRequester rSocketRequester =
625+ Leases <?> leases = leasesSupplier .get ();
626+ RequesterLeaseHandler requesterLeaseHandler =
627+ isLeaseEnabled
628+ ? new RequesterLeaseHandler .Impl (SERVER_TAG , leases .receiver ())
629+ : RequesterLeaseHandler .None ;
630+
631+ RSocket rSocketRequester =
551632 new RSocketRequester (
552633 allocator ,
553634 wrappedMultiplexer .asServerConnection (),
554635 payloadDecoder ,
555636 errorConsumer ,
556- StreamIdSupplier .serverSupplier ());
557-
637+ StreamIdSupplier .serverSupplier (),
638+ setupPayload .keepAliveInterval (),
639+ setupPayload .keepAliveMaxLifetime (),
640+ keepAliveHandler ,
641+ requesterLeaseHandler );
642+
643+ if (multiSubscriberRequester ) {
644+ rSocketRequester = new MultiSubscriberRSocket (rSocketRequester );
645+ }
558646 RSocket wrappedRSocketRequester = plugins .applyRequester (rSocketRequester );
559647
560648 return acceptor
@@ -565,16 +653,24 @@ private Mono<Void> acceptSetup(
565653 rSocketHandler -> {
566654 RSocket wrappedRSocketHandler = plugins .applyResponder (rSocketHandler );
567655
568- RSocketResponder rSocketResponder =
656+ ResponderLeaseHandler responderLeaseHandler =
657+ isLeaseEnabled
658+ ? new ResponderLeaseHandler .Impl <>(
659+ SERVER_TAG ,
660+ allocator ,
661+ leases .sender (),
662+ errorConsumer ,
663+ leases .stats ())
664+ : ResponderLeaseHandler .None ;
665+
666+ RSocket rSocketResponder =
569667 new RSocketResponder (
570668 allocator ,
571669 wrappedMultiplexer .asClientConnection (),
572670 wrappedRSocketHandler ,
573671 payloadDecoder ,
574672 errorConsumer ,
575- setupPayload .keepAliveInterval (),
576- setupPayload .keepAliveMaxLifetime (),
577- keepAliveHandler );
673+ responderLeaseHandler );
578674 })
579675 .doFinally (signalType -> setupPayload .release ())
580676 .then ();
0 commit comments