@@ -123,50 +123,74 @@ public ZeroMqChannel(ZContext context, boolean pubSub) {
123123
124124 Supplier <String > localPairConnection = () -> "inproc://" + getComponentName () + ".pair" ;
125125
126- Mono <?> proxyMono = proxyMono ();
127-
128- this .sendSocket =
129- proxyMono .publishOn (this .publisherScheduler )
130- .then (Mono .fromCallable (() ->
131- this .context .createSocket (
132- this .connectSendUrl == null
133- ? SocketType .PAIR
134- : (this .pubSub ? SocketType .XPUB : SocketType .PUSH ))
135- ))
136- .doOnNext (this .sendSocketConfigurer )
137- .doOnNext ((socket ) ->
138- socket .connect (this .connectSendUrl != null
139- ? this .connectSendUrl
140- : localPairConnection .get ()))
141- .delayUntil ((socket ) ->
142- (this .pubSub && this .connectSendUrl != null )
143- ? Mono .just (socket ).map (ZMQ .Socket ::recv )
144- : Mono .empty ())
145- .cache ()
146- .publishOn (this .publisherScheduler );
147-
148- this .subscribeSocket =
149- proxyMono .publishOn (this .subscriberScheduler )
150- .then (Mono .fromCallable (() ->
151- this .context .createSocket (
152- this .connectSubscribeUrl == null
153- ? SocketType .PAIR
154- : (this .pubSub ? SocketType .SUB : SocketType .PULL ))))
155- .doOnNext (this .subscribeSocketConfigurer )
156- .doOnNext ((socket ) -> {
157- if (this .connectSubscribeUrl != null ) {
158- socket .connect (this .connectSubscribeUrl );
159- if (this .pubSub ) {
160- socket .subscribe (ZMQ .SUBSCRIPTION_ALL );
161- }
162- }
163- else {
164- socket .bind (localPairConnection .get ());
165- }
166- })
167- .cache ()
168- .publishOn (this .subscriberScheduler );
126+ Mono <?> proxyMono = prepareProxyMono ();
127+ this .sendSocket = prepareSendSocketMono (localPairConnection , proxyMono );
128+ this .subscribeSocket = prepareSubscribeSocketMono (localPairConnection , proxyMono );
129+ this .subscriberData = prepareSubscriberDataFlux ();
130+ }
131+
132+ private Mono <Integer > prepareProxyMono () {
133+ if (this .zeroMqProxy != null ) {
134+ return Mono .fromCallable (() -> this .zeroMqProxy .getBackendPort ())
135+ .filter ((proxyPort ) -> proxyPort > 0 )
136+ .repeatWhenEmpty (100 , (repeat ) -> repeat .delayElements (Duration .ofMillis (100 ))) // NOSONAR
137+ .doOnNext ((proxyPort ) ->
138+ setConnectUrl ("tcp://localhost:" + this .zeroMqProxy .getFrontendPort () +
139+ ':' + this .zeroMqProxy .getBackendPort ()))
140+ .doOnError ((error ) ->
141+ logger .error ("The provided '" + this .zeroMqProxy + "' has not been started" , error ))
142+ .cache ();
143+ }
144+ else {
145+ return Mono .empty ();
146+ }
147+ }
169148
149+ private Mono <ZMQ .Socket > prepareSendSocketMono (Supplier <String > localPairConnection , Mono <?> proxyMono ) {
150+ return proxyMono .publishOn (this .publisherScheduler )
151+ .then (Mono .fromCallable (() ->
152+ this .context .createSocket (
153+ this .connectSendUrl == null
154+ ? SocketType .PAIR
155+ : (this .pubSub ? SocketType .XPUB : SocketType .PUSH ))
156+ ))
157+ .doOnNext (this .sendSocketConfigurer )
158+ .doOnNext ((socket ) ->
159+ socket .connect (this .connectSendUrl != null
160+ ? this .connectSendUrl
161+ : localPairConnection .get ()))
162+ .delayUntil ((socket ) ->
163+ (this .pubSub && this .connectSendUrl != null )
164+ ? Mono .just (socket ).map (ZMQ .Socket ::recv )
165+ : Mono .empty ())
166+ .cache ()
167+ .publishOn (this .publisherScheduler );
168+ }
169+
170+ private Mono <ZMQ .Socket > prepareSubscribeSocketMono (Supplier <String > localPairConnection , Mono <?> proxyMono ) {
171+ return proxyMono .publishOn (this .subscriberScheduler )
172+ .then (Mono .fromCallable (() ->
173+ this .context .createSocket (
174+ this .connectSubscribeUrl == null
175+ ? SocketType .PAIR
176+ : (this .pubSub ? SocketType .SUB : SocketType .PULL ))))
177+ .doOnNext (this .subscribeSocketConfigurer )
178+ .doOnNext ((socket ) -> {
179+ if (this .connectSubscribeUrl != null ) {
180+ socket .connect (this .connectSubscribeUrl );
181+ if (this .pubSub ) {
182+ socket .subscribe (ZMQ .SUBSCRIPTION_ALL );
183+ }
184+ }
185+ else {
186+ socket .bind (localPairConnection .get ());
187+ }
188+ })
189+ .cache ()
190+ .publishOn (this .subscriberScheduler );
191+ }
192+
193+ private Flux <? extends Message <?>> prepareSubscriberDataFlux () {
170194 Flux <? extends Message <?>> receiveData =
171195 this .subscribeSocket
172196 .flatMap ((socket ) -> {
@@ -192,26 +216,7 @@ public ZeroMqChannel(ZContext context, boolean pubSub) {
192216 receiveData .publish ()
193217 .autoConnect (1 , (disposable ) -> this .subscriberDataDisposable = disposable );
194218 }
195-
196- this .subscriberData = receiveData ;
197-
198- }
199-
200- private Mono <Integer > proxyMono () {
201- if (this .zeroMqProxy != null ) {
202- return Mono .fromCallable (() -> this .zeroMqProxy .getBackendPort ())
203- .filter ((proxyPort ) -> proxyPort > 0 )
204- .repeatWhenEmpty (100 , (repeat ) -> repeat .delayElements (Duration .ofMillis (100 ))) // NOSONAR
205- .doOnNext ((proxyPort ) ->
206- setConnectUrl ("tcp://localhost:" + this .zeroMqProxy .getFrontendPort () +
207- ':' + this .zeroMqProxy .getBackendPort ()))
208- .doOnError ((error ) ->
209- logger .error ("The provided '" + this .zeroMqProxy + "' has not been started" , error ))
210- .cache ();
211- }
212- else {
213- return Mono .empty ();
214- }
219+ return receiveData ;
215220 }
216221
217222 /**
0 commit comments