3939/**
4040 * Protocol implementation abstracted over a {@link DuplexConnection}.
4141 * <p>
42- * Concrete implementations of {@link DuplexConnection} over TCP, WebSockets, Aeron, etc can be passed to this class for protocol handling. The request handlers passed in at creation will be invoked
42+ * Concrete implementations of {@link DuplexConnection} over TCP, WebSockets,
43+ * Aeron, etc can be passed to this class for protocol handling. The request
44+ * handlers passed in at creation will be invoked
4345 * for each request over the connection.
4446 */
4547public class Responder {
@@ -52,7 +54,15 @@ public class Responder {
5254 private final Consumer <ConnectionSetupPayload > setupCallback ;
5355 private final boolean isServer ;
5456
55- private Responder (boolean isServer , DuplexConnection connection , ConnectionSetupHandler connectionHandler , RequestHandler requestHandler , LeaseGovernor leaseGovernor , Consumer <Throwable > errorStream , Consumer <ConnectionSetupPayload > setupCallback ) {
57+ private Responder (
58+ boolean isServer ,
59+ DuplexConnection connection ,
60+ ConnectionSetupHandler connectionHandler ,
61+ RequestHandler requestHandler ,
62+ LeaseGovernor leaseGovernor ,
63+ Consumer <Throwable > errorStream ,
64+ Consumer <ConnectionSetupPayload > setupCallback
65+ ) {
5666 this .isServer = isServer ;
5767 this .connection = connection ;
5868 this .connectionHandler = connectionHandler ;
@@ -64,42 +74,64 @@ private Responder(boolean isServer, DuplexConnection connection, ConnectionSetup
6474 }
6575
6676 /**
67- * @param connectionHandler
68- * Handle connection setup and set up request handling.
69- * @param errorStream
70- * A {@link Consumer<Throwable>} which will receive all errors that occurs processing requests.
71- * <p>
72- * This include fireAndForget which ONLY emit errors server-side via this mechanism.
77+ * @param connectionHandler Handle connection setup and set up request
78+ * handling.
79+ * @param errorStream A {@link Consumer<Throwable>} which will receive
80+ * all errors that occurs processing requests.
81+ * This include fireAndForget which ONLY emit errors
82+ * server-side via this mechanism.
7383 * @return responder instance
7484 */
75- public static <T > Responder createServerResponder (DuplexConnection connection , ConnectionSetupHandler connectionHandler , LeaseGovernor leaseGovernor , Consumer <Throwable > errorStream , Completable responderCompletable , Consumer <ConnectionSetupPayload > setupCallback ) {
76- Responder responder = new Responder (true , connection , connectionHandler , null , leaseGovernor , errorStream , setupCallback );
85+ public static <T > Responder createServerResponder (
86+ DuplexConnection connection ,
87+ ConnectionSetupHandler connectionHandler ,
88+ LeaseGovernor leaseGovernor ,
89+ Consumer <Throwable > errorStream ,
90+ Completable responderCompletable ,
91+ Consumer <ConnectionSetupPayload > setupCallback
92+ ) {
93+ Responder responder = new Responder (true , connection , connectionHandler , null ,
94+ leaseGovernor , errorStream , setupCallback );
7795 responder .start (responderCompletable );
7896 return responder ;
7997 }
8098
81- public static <T > Responder createServerResponder (DuplexConnection connection , ConnectionSetupHandler connectionHandler , LeaseGovernor leaseGovernor , Consumer <Throwable > errorStream , Completable responderCompletable ) {
82- return createServerResponder (connection , connectionHandler , leaseGovernor , errorStream , responderCompletable , s -> {});
99+ public static <T > Responder createServerResponder (
100+ DuplexConnection connection ,
101+ ConnectionSetupHandler connectionHandler ,
102+ LeaseGovernor leaseGovernor ,
103+ Consumer <Throwable > errorStream ,
104+ Completable responderCompletable
105+ ) {
106+ return createServerResponder (connection , connectionHandler , leaseGovernor ,
107+ errorStream , responderCompletable , s -> {});
83108 }
84109
85- public static <T > Responder createClientResponder (DuplexConnection connection , RequestHandler requestHandler , LeaseGovernor leaseGovernor , Consumer <Throwable > errorStream , Completable responderCompletable ) {
86- Responder responder = new Responder (false , connection , null , requestHandler , leaseGovernor , errorStream , s -> {});
110+ public static <T > Responder createClientResponder (
111+ DuplexConnection connection ,
112+ RequestHandler requestHandler ,
113+ LeaseGovernor leaseGovernor ,
114+ Consumer <Throwable > errorStream ,
115+ Completable responderCompletable
116+ ) {
117+ Responder responder = new Responder (false , connection , null , requestHandler ,
118+ leaseGovernor , errorStream , s -> {});
87119 responder .start (responderCompletable );
88120 return responder ;
89121 }
90122
91123 /**
92- * Send a LEASE frame immediately. Only way a LEASE is sent. Handled entirely by application logic.
124+ * Send a LEASE frame immediately. Only way a LEASE is sent. Handled
125+ * entirely by application logic.
93126 *
94127 * @param ttl of lease
95128 * @param numberOfRequests of lease
96129 */
97- public void sendLease (final int ttl , final int numberOfRequests )
98- {
99- connection .addOutput (PublisherUtils .just (Frame . Lease . from ( ttl , numberOfRequests , Frame . NULL_BYTEBUFFER ) ), new Completable () {
130+ public void sendLease (final int ttl , final int numberOfRequests ) {
131+ Frame leaseFrame = Frame . Lease . from ( ttl , numberOfRequests , Frame . NULL_BYTEBUFFER );
132+ connection .addOutput (PublisherUtils .just (leaseFrame ), new Completable () {
100133 @ Override
101- public void success () {
102- }
134+ public void success () {}
103135
104136 @ Override
105137 public void error (Throwable e ) {
@@ -113,8 +145,7 @@ public void error(Throwable e) {
113145 *
114146 * @return time from {@link System#nanoTime()} of last keepalive
115147 */
116- public long timeOfLastKeepalive ()
117- {
148+ public long timeOfLastKeepalive () {
118149 return timeOfLastKeepalive ;
119150 }
120151
@@ -124,8 +155,10 @@ private void start(final Completable responderCompletable) {
124155 /* streams in flight that can receive REQUEST_N messages */
125156 final Int2ObjectHashMap <SubscriptionArbiter > inFlight = new Int2ObjectHashMap <>();
126157 /* bidirectional channels */
127- final Int2ObjectHashMap <UnicastSubject <Payload >> channels = new Int2ObjectHashMap <>(); // TODO should/can we make this optional so that it only gets allocated per connection if channels are
128- // used?
158+ // TODO: should/can we make this optional so that it only gets allocated per connection if
159+ // channels are used?
160+ final Int2ObjectHashMap <UnicastSubject <Payload >> channels = new Int2ObjectHashMap <>();
161+
129162 final AtomicBoolean childTerminated = new AtomicBoolean (false );
130163 final AtomicReference <Disposable > transportSubscription = new AtomicReference <>();
131164
@@ -143,14 +176,16 @@ public void onSubscribe(Disposable d) {
143176 }
144177 }
145178
146- volatile RequestHandler requestHandler = !isServer ? clientRequestHandler : null ; // null until after first Setup frame
179+ // null until after first Setup frame
180+ volatile RequestHandler requestHandler = !isServer ? clientRequestHandler : null ;
147181
148182 @ Override
149183 public void onNext (Frame requestFrame ) {
150184 final int streamId = requestFrame .getStreamId ();
151185 if (requestHandler == null ) { // this will only happen when isServer==true
152186 if (childTerminated .get ()) {
153- // already terminated, but still receiving latent messages ... ignore them while shutdown occurs
187+ // already terminated, but still receiving latent messages...
188+ // ignore them while shutdown occurs
154189 return ;
155190 }
156191 if (requestFrame .getType ().equals (FrameType .SETUP )) {
@@ -210,44 +245,42 @@ public void onNext(Frame requestFrame) {
210245 return ;
211246 } else if (requestFrame .getType () == FrameType .REQUEST_N ) {
212247 SubscriptionArbiter inFlightSubscription = null ;
213- synchronized (Responder .this )
214- {
248+ synchronized (Responder .this ) {
215249 inFlightSubscription = inFlight .get (requestFrame .getStreamId ());
216250 }
217- if (inFlightSubscription != null )
218- {
219- inFlightSubscription .addApplicationRequest (Frame . RequestN . requestN ( requestFrame ) );
251+ if (inFlightSubscription != null ) {
252+ long requestN = Frame . RequestN . requestN ( requestFrame );
253+ inFlightSubscription .addApplicationRequest (requestN );
220254 return ;
221255 }
222- // TODO should we do anything if we don't find the stream? emitting an error is risky as the responder could have terminated and cleaned up already
256+ // TODO should we do anything if we don't find the stream? emitting an
257+ // error is risky as the responder could have terminated and cleaned up already
223258 } else if (requestFrame .getType () == FrameType .KEEPALIVE ) {
224259 // this client is alive.
225260 timeOfLastKeepalive = System .nanoTime ();
226261 // echo back if flag set
227- if (Frame .Keepalive .hasRespondFlag (requestFrame ))
228- {
262+ if (Frame .Keepalive .hasRespondFlag (requestFrame )) {
229263 responsePublisher = PublisherUtils .just (Frame .Keepalive .from (requestFrame .getData (), false ));
230- }
231- else
232- {
264+ } else {
233265 return ;
234266 }
267+ } else if (requestFrame .getType () == FrameType .LEASE ) {
268+ // LEASE only concerns the Requester
235269 } else {
236270 responsePublisher = PublisherUtils .errorFrame (streamId , new IllegalStateException ("Unexpected prefix: " + requestFrame .getType ()));
237271 }
238272 } catch (Throwable e ) {
239- // synchronous try/catch since we execute user functions in the handlers and they could throw
240- errorStream .accept (new RuntimeException ("Error in request handling." , e ));
241- // error message to user
242- responsePublisher = PublisherUtils .errorFrame (streamId , new RuntimeException ("Unhandled error processing request" ));
243- }
273+ // synchronous try/catch since we execute user functions in the handlers and they could throw
274+ errorStream .accept (new RuntimeException ("Error in request handling." , e ));
275+ // error message to user
276+ responsePublisher = PublisherUtils .errorFrame (streamId , new RuntimeException ("Unhandled error processing request" ));
277+ }
244278 } else {
245279 final RejectedException exception = new RejectedException ("No associated lease" );
246280 responsePublisher = PublisherUtils .errorFrame (streamId , exception );
247281 }
248282
249283 connection .addOutput (responsePublisher , new Completable () {
250-
251284 @ Override
252285 public void success () {
253286 // TODO Auto-generated method stub
0 commit comments