@@ -429,6 +429,7 @@ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
429429
430430 @ Test
431431 void consumerReattachesToOtherReplicaWhenReplicaGoesAway () throws Exception {
432+ LOGGER .info ("Stream name is {}" , stream );
432433 executorService = Executors .newCachedThreadPool ();
433434 Client metadataClient = cf .get (new Client .ClientParameters ().port (streamPortNode1 ()));
434435 Map <String , Client .StreamMetadata > metadata = metadataClient .metadata (stream );
@@ -514,42 +515,64 @@ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception {
514515
515516 CountDownLatch reconnectionLatch = new CountDownLatch (1 );
516517 AtomicReference <Client .ShutdownListener > shutdownListenerReference = new AtomicReference <>();
518+ Runnable resubscribe =
519+ () -> {
520+ AtomicInteger newReplicaPort = new AtomicInteger (-1 );
521+ waitAtMost (
522+ Duration .ofSeconds (5 ),
523+ () -> {
524+ try {
525+ Client .StreamMetadata m = metadataClient .metadata (stream ).get (stream );
526+ newReplicaPort .set (m .getReplicas ().get (0 ).getPort ());
527+ LOGGER .info ("Metadata: {}" , m );
528+ return true ;
529+ } catch (Exception e ) {
530+ return false ;
531+ }
532+ });
533+ LOGGER .info ("Replica port is {}" , newReplicaPort );
534+
535+ Client newConsumer =
536+ cf .get (
537+ new Client .ClientParameters ()
538+ .port (newReplicaPort .get ())
539+ .shutdownListener (shutdownListenerReference .get ())
540+ .chunkListener (credit ())
541+ .messageListener (messageListener ));
542+
543+ LOGGER .info ("Subscribing..." );
544+ newConsumer .subscribe (
545+ (byte ) 1 , stream , OffsetSpecification .offset (lastProcessedOffset .get () + 1 ), 10 );
546+ LOGGER .info ("Subscribed" );
547+
548+ generation .incrementAndGet ();
549+ reconnectionLatch .countDown ();
550+ LOGGER .info ("Shutdown listener done" );
551+ };
517552 Client .ShutdownListener shutdownListener =
518553 shutdownContext -> {
554+ LOGGER .info ("Shutdown reason: {}" , shutdownContext .getShutdownReason ());
519555 if (shutdownContext .getShutdownReason ()
520556 == Client .ShutdownContext .ShutdownReason .UNKNOWN ) {
521557 // avoid long-running task in the IO thread
522558 executorService .submit (
523559 () -> {
524- AtomicInteger newReplicaPort = new AtomicInteger (-1 );
525- waitAtMost (
526- Duration .ofSeconds (5 ),
527- () -> {
528- try {
529- Client .StreamMetadata m = metadataClient .metadata (stream ).get (stream );
530- newReplicaPort .set (m .getReplicas ().get (0 ).getPort ());
531- return true ;
532- } catch (Exception e ) {
533- return false ;
534- }
535- });
536-
537- Client newConsumer =
538- cf .get (
539- new Client .ClientParameters ()
540- .port (newReplicaPort .get ())
541- .shutdownListener (shutdownListenerReference .get ())
542- .chunkListener (credit ())
543- .messageListener (messageListener ));
544-
545- newConsumer .subscribe (
546- (byte ) 1 ,
547- stream ,
548- OffsetSpecification .offset (lastProcessedOffset .get () + 1 ),
549- 10 );
550-
551- generation .incrementAndGet ();
552- reconnectionLatch .countDown ();
560+ int attempts = 0 ;
561+ while (attempts < 3 ) {
562+ try {
563+ resubscribe .run ();
564+ break ;
565+ } catch (RuntimeException e ) {
566+ LOGGER .warn ("Error while re-subscribing: {}" , e .getMessage ());
567+ try {
568+ Thread .sleep (1000 );
569+ } catch (InterruptedException ex ) {
570+ Thread .currentThread ().interrupt ();
571+ break ;
572+ }
573+ attempts ++;
574+ }
575+ }
553576 });
554577 }
555578 };
0 commit comments