@@ -75,8 +75,26 @@ namespace
7575 const USHORT CTL_VERSION1 = 1 ;
7676 const USHORT CTL_CURRENT_VERSION = CTL_VERSION1;
7777
78- volatile bool * shutdownPtr = NULL ;
78+ volatile bool shutdownFlag = false ;
7979 AtomicCounter activeThreads;
80+ Semaphore shutdownSemaphore;
81+
82+ int shutdownHandler (const int , const int , void *)
83+ {
84+ if (activeThreads.value ())
85+ {
86+ gds__log (" Shutting down the replication server with %d replicated database(s)" ,
87+ (int ) activeThreads.value ());
88+
89+ shutdownFlag = true ;
90+ shutdownSemaphore.release (activeThreads.value () + 1 );
91+
92+ while (activeThreads.value ())
93+ Thread::sleep (10 );
94+ }
95+
96+ return 0 ;
97+ }
8098
8199 struct ActiveTransaction
82100 {
@@ -626,7 +644,7 @@ namespace
626644 }
627645 }
628646
629- enum ProcessStatus { PROCESS_SUSPEND, PROCESS_CONTINUE, PROCESS_ERROR };
647+ enum ProcessStatus { PROCESS_SUSPEND, PROCESS_CONTINUE, PROCESS_ERROR, PROCESS_SHUTDOWN };
630648
631649 ProcessStatus process_archive (MemoryPool& pool, Target* target)
632650 {
@@ -645,6 +663,9 @@ namespace
645663 for (iter = PathUtils::newDirIterator (pool, config->sourceDirectory );
646664 *iter; ++(*iter))
647665 {
666+ if (shutdownFlag)
667+ return PROCESS_SHUTDOWN;
668+
648669 const auto filename = **iter;
649670
650671#ifdef PRESERVE_LOG
@@ -755,6 +776,9 @@ namespace
755776
756777 for (Segment** iter = queue.begin (); iter != queue.end (); ++iter)
757778 {
779+ if (shutdownFlag)
780+ return PROCESS_SHUTDOWN;
781+
758782 Segment* const segment = *iter;
759783 const FB_UINT64 sequence = segment->header .hdr_sequence ;
760784 const Guid& guid = segment->header .hdr_guid ;
@@ -845,6 +869,9 @@ namespace
845869 ULONG totalLength = sizeof (SegmentHeader);
846870 while (totalLength < segment->header .hdr_length )
847871 {
872+ if (shutdownFlag)
873+ return PROCESS_SHUTDOWN;
874+
848875 Block header;
849876 if (read (file, &header, sizeof (Block)) != sizeof (Block))
850877 raiseError (" Journal file %s read failed (error %d)" , segment->filename .c_str (), ERRNO);
@@ -959,14 +986,12 @@ namespace
959986
960987 THREAD_ENTRY_DECLARE process_thread (THREAD_ENTRY_PARAM arg)
961988 {
962- fb_assert (shutdownPtr);
963-
964989 AutoPtr<Target> target (static_cast <Target*>(arg));
965990 const auto config = target->getConfig ();
966991
967992 target->verbose (" Started replication thread" );
968993
969- while (!*shutdownPtr )
994+ while (!shutdownFlag )
970995 {
971996 AutoMemoryPool workingPool (MemoryPool::createPool ());
972997 ContextPoolHolder threadContext (workingPool);
@@ -978,42 +1003,47 @@ namespace
9781003
9791004 target->shutdown ();
9801005
981- if (!*shutdownPtr)
1006+ if (ret == PROCESS_SHUTDOWN)
1007+ break ;
1008+
1009+ if (!shutdownFlag)
9821010 {
9831011 const ULONG timeout =
9841012 (ret == PROCESS_SUSPEND) ? config->applyIdleTimeout : config->applyErrorTimeout ;
9851013
986- Thread::sleep (timeout * 1000 );
1014+ shutdownSemaphore. tryEnter (timeout);
9871015 }
9881016 }
9891017
9901018 target->verbose (" Finished replication thread" );
991-
9921019 --activeThreads;
9931020
9941021 return 0 ;
9951022 }
9961023}
9971024
998- bool REPL_server (CheckStatusWrapper* status, bool wait, bool * aShutdownPtr)
1025+
1026+ bool REPL_server (CheckStatusWrapper* status, bool wait)
9991027{
10001028 try
10011029 {
1002- shutdownPtr = aShutdownPtr ;
1030+ fb_shutdown_callback ( 0 , shutdownHandler, fb_shut_finish, 0 ) ;
10031031
10041032 TargetList targets;
10051033 readConfig (targets);
10061034
10071035 for (auto target : targets)
10081036 {
1037+ Thread::start (process_thread, target, THREAD_medium, NULL );
10091038 ++activeThreads;
1010- Thread::start ((ThreadEntryPoint*) process_thread, target, THREAD_medium, NULL );
10111039 }
10121040
10131041 if (wait)
10141042 {
1043+ shutdownSemaphore.enter ();
1044+
10151045 do {
1016- Thread::sleep (100 );
1046+ Thread::sleep (10 );
10171047 } while (activeThreads.value ());
10181048 }
10191049 }
0 commit comments