2828import reactor .core .publisher .Operators ;
2929import reactor .util .context .Context ;
3030
31- /** Default implementation of {@link PooledRSocket } stored in {@link RSocketPool} */
32- final class DefaultPooledRSocket extends ResolvingOperator <RSocket >
33- implements CoreSubscriber <RSocket >, PooledRSocket {
31+ /** Default implementation of {@link WeightedRSocket } stored in {@link RSocketPool} */
32+ final class PooledWeightedRSocket extends ResolvingOperator <RSocket >
33+ implements CoreSubscriber <RSocket >, WeightedRSocket {
3434
3535 final RSocketPool parent ;
3636 final LoadbalanceRSocketSource loadbalanceRSocketSource ;
3737 final Stats stats ;
3838
3939 volatile Subscription s ;
4040
41- static final AtomicReferenceFieldUpdater <DefaultPooledRSocket , Subscription > S =
42- AtomicReferenceFieldUpdater .newUpdater (DefaultPooledRSocket .class , Subscription .class , "s" );
41+ static final AtomicReferenceFieldUpdater <PooledWeightedRSocket , Subscription > S =
42+ AtomicReferenceFieldUpdater .newUpdater (PooledWeightedRSocket .class , Subscription .class , "s" );
4343
44- DefaultPooledRSocket (
44+ PooledWeightedRSocket (
4545 RSocketPool parent , LoadbalanceRSocketSource loadbalanceRSocketSource , Stats stats ) {
4646 this .parent = parent ;
4747 this .stats = stats ;
@@ -128,7 +128,7 @@ public void dispose() {
128128 protected void doOnDispose () {
129129 final RSocketPool parent = this .parent ;
130130 for (; ; ) {
131- final PooledRSocket [] sockets = parent .activeSockets ;
131+ final PooledWeightedRSocket [] sockets = parent .activeSockets ;
132132 final int activeSocketsCount = sockets .length ;
133133
134134 int index = -1 ;
@@ -144,7 +144,7 @@ protected void doOnDispose() {
144144 }
145145
146146 final int lastIndex = activeSocketsCount - 1 ;
147- final PooledRSocket [] newSockets = new PooledRSocket [lastIndex ];
147+ final PooledWeightedRSocket [] newSockets = new PooledWeightedRSocket [lastIndex ];
148148 if (index != 0 ) {
149149 System .arraycopy (sockets , 0 , newSockets , 0 , index );
150150 }
@@ -196,8 +196,7 @@ public Stats stats() {
196196 return stats ;
197197 }
198198
199- @ Override
200- public LoadbalanceRSocketSource source () {
199+ LoadbalanceRSocketSource source () {
201200 return loadbalanceRSocketSource ;
202201 }
203202
@@ -211,7 +210,7 @@ static final class RequestTrackingMonoInner<RESULT>
211210
212211 long startTime ;
213212
214- RequestTrackingMonoInner (DefaultPooledRSocket parent , Payload payload , FrameType requestType ) {
213+ RequestTrackingMonoInner (PooledWeightedRSocket parent , Payload payload , FrameType requestType ) {
215214 super (parent , payload , requestType );
216215 }
217216
@@ -245,7 +244,7 @@ public void accept(RSocket rSocket, Throwable t) {
245244 return ;
246245 }
247246
248- startTime = ((DefaultPooledRSocket ) parent ).stats .startRequest ();
247+ startTime = ((PooledWeightedRSocket ) parent ).stats .startRequest ();
249248
250249 source .subscribe ((CoreSubscriber ) this );
251250 } else {
@@ -257,7 +256,7 @@ public void accept(RSocket rSocket, Throwable t) {
257256 public void onComplete () {
258257 final long state = this .requested ;
259258 if (state != TERMINATED_STATE && REQUESTED .compareAndSet (this , state , TERMINATED_STATE )) {
260- final Stats stats = ((DefaultPooledRSocket ) parent ).stats ;
259+ final Stats stats = ((PooledWeightedRSocket ) parent ).stats ;
261260 final long now = stats .stopRequest (startTime );
262261 stats .record (now - startTime );
263262 super .onComplete ();
@@ -268,7 +267,7 @@ public void onComplete() {
268267 public void onError (Throwable t ) {
269268 final long state = this .requested ;
270269 if (state != TERMINATED_STATE && REQUESTED .compareAndSet (this , state , TERMINATED_STATE )) {
271- Stats stats = ((DefaultPooledRSocket ) parent ).stats ;
270+ Stats stats = ((PooledWeightedRSocket ) parent ).stats ;
272271 stats .stopRequest (startTime );
273272 stats .recordError (0.0 );
274273 super .onError (t );
@@ -284,7 +283,7 @@ public void cancel() {
284283
285284 if (state == STATE_SUBSCRIBED ) {
286285 this .s .cancel ();
287- ((DefaultPooledRSocket ) parent ).stats .stopRequest (startTime );
286+ ((PooledWeightedRSocket ) parent ).stats .stopRequest (startTime );
288287 } else {
289288 this .parent .remove (this );
290289 ReferenceCountUtil .safeRelease (this .payload );
@@ -296,7 +295,7 @@ static final class RequestTrackingFluxInner<INPUT>
296295 extends FluxDeferredResolution <INPUT , RSocket > {
297296
298297 RequestTrackingFluxInner (
299- DefaultPooledRSocket parent , INPUT fluxOrPayload , FrameType requestType ) {
298+ PooledWeightedRSocket parent , INPUT fluxOrPayload , FrameType requestType ) {
300299 super (parent , fluxOrPayload , requestType );
301300 }
302301
@@ -329,7 +328,7 @@ public void accept(RSocket rSocket, Throwable t) {
329328 return ;
330329 }
331330
332- ((DefaultPooledRSocket ) parent ).stats .startStream ();
331+ ((PooledWeightedRSocket ) parent ).stats .startStream ();
333332
334333 source .subscribe (this );
335334 } else {
@@ -341,7 +340,7 @@ public void accept(RSocket rSocket, Throwable t) {
341340 public void onComplete () {
342341 final long state = this .requested ;
343342 if (state != TERMINATED_STATE && REQUESTED .compareAndSet (this , state , TERMINATED_STATE )) {
344- ((DefaultPooledRSocket ) parent ).stats .stopStream ();
343+ ((PooledWeightedRSocket ) parent ).stats .stopStream ();
345344 super .onComplete ();
346345 }
347346 }
@@ -350,7 +349,7 @@ public void onComplete() {
350349 public void onError (Throwable t ) {
351350 final long state = this .requested ;
352351 if (state != TERMINATED_STATE && REQUESTED .compareAndSet (this , state , TERMINATED_STATE )) {
353- ((DefaultPooledRSocket ) parent ).stats .stopStream ();
352+ ((PooledWeightedRSocket ) parent ).stats .stopStream ();
354353 super .onError (t );
355354 }
356355 }
@@ -364,7 +363,7 @@ public void cancel() {
364363
365364 if (state == STATE_SUBSCRIBED ) {
366365 this .s .cancel ();
367- ((DefaultPooledRSocket ) parent ).stats .stopStream ();
366+ ((PooledWeightedRSocket ) parent ).stats .stopStream ();
368367 } else {
369368 this .parent .remove (this );
370369 if (requestType == FrameType .REQUEST_STREAM ) {
0 commit comments