Commit 903d15a

fix(worker): Fix issues with gracefully shutting down (#612)

1 parent: 18fad64

7 files changed: +228 -59 lines

CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Fixed
 - Fixed incorrect shutdown of PostHog SDK in the worker. [#609](https://github.com/sourcebot-dev/sourcebot/pull/609)
 - Fixed race condition in job schedulers. [#607](https://github.com/sourcebot-dev/sourcebot/pull/607)
+- Fixed connection sync jobs getting stuck in pending or in progress after restarting the worker. [#612](https://github.com/sourcebot-dev/sourcebot/pull/612)
 
 ### Added
 - Added force resync buttons for connections and repositories. [#610](https://github.com/sourcebot-dev/sourcebot/pull/610)

packages/backend/src/configManager.ts

Lines changed: 2 additions & 2 deletions

@@ -93,8 +93,8 @@ export class ConfigManager {
             });
 
             if (connectionNeedsSyncing) {
-                const [jobId] = await this.connectionManager.createJobs([connection]);
-                logger.info(`Change detected for connection '${key}' (id: ${connection.id}). Created sync job ${jobId}.`);
+                logger.info(`Change detected for connection '${key}' (id: ${connection.id}). Creating sync job.`);
+                await this.connectionManager.createJobs([connection]);
             }
         }
     }

packages/backend/src/connectionManager.ts

Lines changed: 77 additions & 7 deletions

@@ -11,10 +11,12 @@ import { groupmqLifecycleExceptionWrapper, setIntervalAsync } from "./utils.js";
 import { syncSearchContexts } from "./ee/syncSearchContexts.js";
 import { captureEvent } from "./posthog.js";
 import { PromClient } from "./promClient.js";
+import { GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS } from "./constants.js";
 
 const LOG_TAG = 'connection-manager';
 const logger = createLogger(LOG_TAG);
 const createJobLogger = (jobId: string) => createLogger(`${LOG_TAG}:job:${jobId}`);
+const QUEUE_NAME = 'connection-sync-queue';
 
 type JobPayload = {
     jobId: string,
@@ -30,19 +32,19 @@ type JobResult = {
 const JOB_TIMEOUT_MS = 1000 * 60 * 60 * 2; // 2 hour timeout
 
 export class ConnectionManager {
-    private worker: Worker;
+    private worker: Worker<JobPayload>;
     private queue: Queue<JobPayload>;
     private interval?: NodeJS.Timeout;
 
     constructor(
         private db: PrismaClient,
         private settings: Settings,
-        redis: Redis,
+        private redis: Redis,
         private promClient: PromClient,
     ) {
         this.queue = new Queue<JobPayload>({
             redis,
-            namespace: 'connection-sync-queue',
+            namespace: QUEUE_NAME,
             jobTimeoutMs: JOB_TIMEOUT_MS,
             maxAttempts: 3,
             logger: env.DEBUG_ENABLE_GROUPMQ_LOGGING === 'true',
@@ -62,6 +64,10 @@ export class ConnectionManager {
         this.worker.on('failed', this.onJobFailed.bind(this));
         this.worker.on('stalled', this.onJobStalled.bind(this));
         this.worker.on('error', this.onWorkerError.bind(this));
+        // graceful-timeout is triggered when a job is still processing after
+        // worker.close() is called and the timeout period has elapsed. In this case,
+        // we fail the job with no retry.
+        this.worker.on('graceful-timeout', this.onJobGracefulTimeout.bind(this));
     }
 
     public startScheduler() {
@@ -128,6 +134,7 @@
         });
 
         for (const job of jobs) {
+            logger.info(`Scheduling job ${job.id} for connection ${job.connection.name} (id: ${job.connectionId})`);
             await this.queue.add({
                 groupId: `connection:${job.connectionId}`,
                 data: {
@@ -150,6 +157,22 @@
         const logger = createJobLogger(jobId);
         logger.info(`Running connection sync job ${jobId} for connection ${connectionName} (id: ${job.data.connectionId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`);
 
+        const currentStatus = await this.db.connectionSyncJob.findUniqueOrThrow({
+            where: {
+                id: jobId,
+            },
+            select: {
+                status: true,
+            }
+        });
+
+        // Fail safe: if the job is not PENDING (first run) or IN_PROGRESS (retry), it indicates the job
+        // is in an invalid state and should be skipped.
+        if (currentStatus.status !== ConnectionSyncJobStatus.PENDING && currentStatus.status !== ConnectionSyncJobStatus.IN_PROGRESS) {
+            throw new Error(`Job ${jobId} is not in a valid state. Expected: ${ConnectionSyncJobStatus.PENDING} or ${ConnectionSyncJobStatus.IN_PROGRESS}. Actual: ${currentStatus.status}. Skipping.`);
+        }
+
+
         this.promClient.pendingConnectionSyncJobs.dec({ connection: connectionName });
         this.promClient.activeConnectionSyncJobs.inc({ connection: connectionName });
 
@@ -178,7 +201,7 @@
         const result = await (async () => {
             switch (config.type) {
                 case 'github': {
-                    return await compileGithubConfig(config, job.data.connectionId, abortController);
+                    return await compileGithubConfig(config, job.data.connectionId, abortController.signal);
                 }
                 case 'gitlab': {
                     return await compileGitlabConfig(config, job.data.connectionId);
@@ -200,7 +223,7 @@
                 }
             }
         })();
-
+
         let { repoData, warnings } = result;
 
         await this.db.connectionSyncJob.update({
@@ -383,6 +406,33 @@
             });
         });
 
+    private onJobGracefulTimeout = async (job: Job<JobPayload>) =>
+        groupmqLifecycleExceptionWrapper('onJobGracefulTimeout', logger, async () => {
+            const logger = createJobLogger(job.id);
+
+            const { connection } = await this.db.connectionSyncJob.update({
+                where: { id: job.id },
+                data: {
+                    status: ConnectionSyncJobStatus.FAILED,
+                    completedAt: new Date(),
+                    errorMessage: 'Job timed out',
+                },
+                select: {
+                    connection: true,
+                }
+            });
+
+            this.promClient.activeConnectionSyncJobs.dec({ connection: connection.name });
+            this.promClient.connectionSyncJobFailTotal.inc({ connection: connection.name });
+
+            logger.error(`Job ${job.id} timed out for connection ${connection.name} (id: ${connection.id})`);
+
+            captureEvent('backend_connection_sync_job_failed', {
+                connectionId: connection.id,
+                error: 'Job timed out',
+            });
+        });
+
     private async onWorkerError(error: Error) {
         Sentry.captureException(error);
         logger.error(`Connection syncer worker error.`, error);
@@ -392,8 +442,28 @@
         if (this.interval) {
             clearInterval(this.interval);
         }
-        await this.worker.close();
-        await this.queue.close();
+
+        const inProgressJobs = this.worker.getCurrentJobs();
+        await this.worker.close(GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS);
+
+        // Manually release group locks for in progress jobs to prevent deadlocks.
+        // @see: https://github.com/Openpanel-dev/groupmq/issues/8
+        for (const { job } of inProgressJobs) {
+            const lockKey = `groupmq:${QUEUE_NAME}:lock:${job.groupId}`;
+            logger.debug(`Releasing group lock ${lockKey} for in progress job ${job.id}`);
+            try {
+                await this.redis.del(lockKey);
+            } catch (error) {
+                Sentry.captureException(error);
+                logger.error(`Failed to release group lock ${lockKey} for in progress job ${job.id}. Error: `, error);
+            }
+        }
+
+        // @note: As of groupmq v1.0.0, queue.close() will just close the underlying
+        // redis connection. Since we share the same redis client between managers, skip
+        // this step and close the redis client directly in index.ts.
+        // @see: https://github.com/Openpanel-dev/groupmq/blob/main/src/queue.ts#L1900
        // await this.queue.close();
    }
}
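The manual lock release above matters because groupmq group locks otherwise outlive the worker that took them: if the process exits while a job is mid-flight, the `groupmq:<namespace>:lock:<groupId>` key lingers in Redis and no later worker can pick up jobs for that group. A hypothetical diagnostic sketch (not part of this commit) for spotting leftover locks, assuming the key shape referenced in dispose():

import { Redis } from 'ioredis';

const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379');

// List any group locks left behind by a previous worker.
// KEYS is acceptable for a one-off diagnostic; prefer SCAN in production code.
const listGroupLocks = async (queueName: string): Promise<string[]> =>
    redis.keys(`groupmq:${queueName}:lock:*`);

console.log(await listGroupLocks('connection-sync-queue'));
await redis.quit();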

packages/backend/src/constants.ts

Lines changed: 21 additions & 1 deletion

@@ -10,4 +10,24 @@ export const PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES: CodeHostType[] = [
 ];
 
 export const REPOS_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'repos');
-export const INDEX_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'index');
+export const INDEX_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'index');
+
+// Maximum time to wait for the current job to finish
+export const GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS = 5 * 1000; // 5 seconds
+
+// List of shutdown signals
+export const SHUTDOWN_SIGNALS: string[] = [
+    'SIGHUP',
+    'SIGINT',
+    'SIGQUIT',
+    'SIGILL',
+    'SIGTRAP',
+    'SIGABRT',
+    'SIGBUS',
+    'SIGFPE',
+    'SIGSEGV',
+    'SIGUSR2',
+    'SIGTERM',
+    // @note: SIGKILL and SIGSTOP cannot have listeners installed.
+    // @see: https://nodejs.org/api/process.html#signal-events
+];
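As the note in the diff says, Node.js refuses to install listeners for SIGKILL and SIGSTOP. A minimal sketch (independent of this commit) of how a signal list like this is consumed, and what happens if you try to trap an untrappable signal:

import process from 'node:process';

const signals: NodeJS.Signals[] = ['SIGINT', 'SIGTERM', 'SIGHUP'];

for (const signal of signals) {
    // The listener receives the signal name as its first argument.
    process.on(signal, (received) => {
        console.log(`caught ${received}`);
    });
}

// process.on('SIGKILL', () => {}) throws at runtime
// (Error: uv_signal_start EINVAL), which is why SIGKILL and
// SIGSTOP are excluded from SHUTDOWN_SIGNALS above.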

packages/backend/src/index.ts

Lines changed: 66 additions & 43 deletions

@@ -1,5 +1,6 @@
 import "./instrument.js";
 
+import * as Sentry from "@sentry/node";
 import { PrismaClient } from "@sourcebot/db";
 import { createLogger, env, getConfigSettings, getDBConnectionString, hasEntitlement } from "@sourcebot/shared";
 import 'express-async-errors';
@@ -9,14 +10,15 @@ import { Redis } from 'ioredis';
 import { Api } from "./api.js";
 import { ConfigManager } from "./configManager.js";
 import { ConnectionManager } from './connectionManager.js';
-import { INDEX_CACHE_DIR, REPOS_CACHE_DIR } from './constants.js';
+import { INDEX_CACHE_DIR, REPOS_CACHE_DIR, SHUTDOWN_SIGNALS } from './constants.js';
 import { AccountPermissionSyncer } from "./ee/accountPermissionSyncer.js";
 import { GithubAppManager } from "./ee/githubAppManager.js";
 import { RepoPermissionSyncer } from './ee/repoPermissionSyncer.js';
 import { shutdownPosthog } from "./posthog.js";
 import { PromClient } from './promClient.js';
 import { RepoIndexManager } from "./repoIndexManager.js";
 
+
 const logger = createLogger('backend-entrypoint');
 
 const reposPath = REPOS_CACHE_DIR;
@@ -40,13 +42,14 @@ const prisma = new PrismaClient({
 const redis = new Redis(env.REDIS_URL, {
     maxRetriesPerRequest: null
 });
-redis.ping().then(() => {
+
+try {
+    await redis.ping();
     logger.info('Connected to redis');
-}).catch((err: unknown) => {
-    logger.error('Failed to connect to redis');
-    logger.error(err);
+} catch (err: unknown) {
+    logger.error('Failed to connect to redis. Error:', err);
     process.exit(1);
-});
+}
 
 const promClient = new PromClient();
 
@@ -83,45 +86,65 @@ const api = new Api(
 
 logger.info('Worker started.');
 
-const cleanup = async (signal: string) => {
-    logger.info(`Received ${signal}, cleaning up...`);
-
-    const shutdownTimeout = 30000; // 30 seconds
-
-    try {
-        await Promise.race([
-            Promise.all([
-                repoIndexManager.dispose(),
-                connectionManager.dispose(),
-                repoPermissionSyncer.dispose(),
-                accountPermissionSyncer.dispose(),
-                configManager.dispose(),
-            ]),
-            new Promise((_, reject) =>
-                setTimeout(() => reject(new Error('Shutdown timeout')), shutdownTimeout)
-            )
-        ]);
-        logger.info('All workers shut down gracefully');
-    } catch (error) {
-        logger.warn('Shutdown timeout or error, forcing exit:', error instanceof Error ? error.message : String(error));
+const listenToShutdownSignals = () => {
+    const signals = SHUTDOWN_SIGNALS;
+
+    let receivedSignal = false;
+
+    const cleanup = async (signal: string) => {
+        try {
+            if (receivedSignal) {
+                logger.debug(`Received repeat signal ${signal}, ignoring.`);
+                return;
+            }
+            receivedSignal = true;
+
+            logger.info(`Received ${signal}, cleaning up...`);
+
+            await repoIndexManager.dispose();
+            await connectionManager.dispose();
+            await repoPermissionSyncer.dispose();
+            await accountPermissionSyncer.dispose();
+            await configManager.dispose();
+
+            await prisma.$disconnect();
+            await redis.quit();
+            await api.dispose();
+            await shutdownPosthog();
+
+            logger.info('All workers shut down gracefully');
+            signals.forEach(sig => process.removeListener(sig, cleanup));
+        } catch (error) {
+            Sentry.captureException(error);
+            logger.error('Error shutting down worker:', error);
+        }
     }
 
-    await prisma.$disconnect();
-    await redis.quit();
-    await api.dispose();
-    await shutdownPosthog();
-}
+    signals.forEach(signal => {
+        process.on(signal, (err) => {
+            cleanup(err).finally(() => {
+                process.kill(process.pid, signal);
+            });
+        });
+    });
+
+    // Register handlers for uncaught exceptions and unhandled rejections
+    process.on('uncaughtException', (err) => {
+        logger.error(`Uncaught exception: ${err.message}`);
+        cleanup('uncaughtException').finally(() => {
+            process.exit(1);
+        });
+    });
+
+    process.on('unhandledRejection', (reason, promise) => {
+        logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`);
+        cleanup('unhandledRejection').finally(() => {
+            process.exit(1);
+        });
+    });
 
-process.on('SIGINT', () => cleanup('SIGINT').finally(() => process.exit(0)));
-process.on('SIGTERM', () => cleanup('SIGTERM').finally(() => process.exit(0)));
 
-// Register handlers for uncaught exceptions and unhandled rejections
-process.on('uncaughtException', (err) => {
-    logger.error(`Uncaught exception: ${err.message}`);
-    cleanup('uncaughtException').finally(() => process.exit(1));
-});
+}
 
-process.on('unhandledRejection', (reason, promise) => {
-    logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`);
-    cleanup('unhandledRejection').finally(() => process.exit(1));
-});
+listenToShutdownSignals();
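The shutdown path above uses a re-raise idiom worth calling out: after a successful cleanup, the handlers are removed and process.kill(process.pid, signal) re-sends the original signal, so the default action runs and the process exits with the conventional 128 + signal-number status rather than a hard-coded exit code. A standalone sketch of the pattern (assuming no other listeners for the signal are installed):

import process from 'node:process';

const onSignal = async (signal: NodeJS.Signals) => {
    // ... await async cleanup work here ...

    // Remove our listener so the default action applies on the next delivery.
    process.removeListener(signal, onSignal);

    // Re-raise: the default handler now terminates the process, and the
    // exit status reflects the signal (e.g. 143 = 128 + 15 for SIGTERM).
    process.kill(process.pid, signal);
};

process.on('SIGTERM', onSignal);
process.on('SIGINT', onSignal);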

packages/backend/src/repoCompileUtils.ts

Lines changed: 2 additions & 2 deletions

@@ -39,8 +39,8 @@ type CompileResult = {
 export const compileGithubConfig = async (
     config: GithubConnectionConfig,
     connectionId: number,
-    abortController: AbortController): Promise<CompileResult> => {
-    const gitHubReposResult = await getGitHubReposFromConfig(config, abortController.signal);
+    signal: AbortSignal): Promise<CompileResult> => {
+    const gitHubReposResult = await getGitHubReposFromConfig(config, signal);
     const gitHubRepos = gitHubReposResult.repos;
     const warnings = gitHubReposResult.warnings;

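Narrowing the parameter from AbortController to AbortSignal is a least-privilege change: compileGithubConfig can still observe cancellation, but can no longer abort the controller on behalf of every other consumer. A minimal sketch of the resulting calling pattern (fetchAllRepos is a hypothetical stand-in, not part of the codebase):

// Callee accepts only the read-only signal.
const fetchAllRepos = async (signal: AbortSignal): Promise<string[]> => {
    signal.throwIfAborted(); // bail out immediately if already cancelled
    // ... long-running work that periodically re-checks signal.aborted ...
    return [];
};

// The caller keeps the controller, and only the caller can abort.
const controller = new AbortController();
const pending = fetchAllRepos(controller.signal);
controller.abort();
await pending.catch((err) => console.error('aborted:', err));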