diff --git a/CHANGELOG.md b/CHANGELOG.md
index e4c1f9f9..37ce8d24 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Fixed
 - Fixed incorrect shutdown of PostHog SDK in the worker. [#609](https://github.com/sourcebot-dev/sourcebot/pull/609)
 - Fixed race condition in job schedulers. [#607](https://github.com/sourcebot-dev/sourcebot/pull/607)
+- Fixed connection sync jobs getting stuck in pending or in progress after restarting the worker. [#612](https://github.com/sourcebot-dev/sourcebot/pull/612)
 
 ### Added
 - Added force resync buttons for connections and repositories. [#610](https://github.com/sourcebot-dev/sourcebot/pull/610)
diff --git a/packages/backend/src/configManager.ts b/packages/backend/src/configManager.ts
index 55dbd6ed..6049a52f 100644
--- a/packages/backend/src/configManager.ts
+++ b/packages/backend/src/configManager.ts
@@ -93,8 +93,8 @@ export class ConfigManager {
             });
 
             if (connectionNeedsSyncing) {
-                const [jobId] = await this.connectionManager.createJobs([connection]);
-                logger.info(`Change detected for connection '${key}' (id: ${connection.id}). Created sync job ${jobId}.`);
+                logger.info(`Change detected for connection '${key}' (id: ${connection.id}). Creating sync job.`);
+                await this.connectionManager.createJobs([connection]);
             }
         }
     }
diff --git a/packages/backend/src/connectionManager.ts b/packages/backend/src/connectionManager.ts
index ee17543a..bfb414df 100644
--- a/packages/backend/src/connectionManager.ts
+++ b/packages/backend/src/connectionManager.ts
@@ -11,10 +11,12 @@ import { groupmqLifecycleExceptionWrapper, setIntervalAsync } from "./utils.js";
 import { syncSearchContexts } from "./ee/syncSearchContexts.js";
 import { captureEvent } from "./posthog.js";
 import { PromClient } from "./promClient.js";
+import { GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS } from "./constants.js";
 
 const LOG_TAG = 'connection-manager';
 const logger = createLogger(LOG_TAG);
 const createJobLogger = (jobId: string) => createLogger(`${LOG_TAG}:job:${jobId}`);
+const QUEUE_NAME = 'connection-sync-queue';
 
 type JobPayload = {
     jobId: string,
@@ -30,19 +32,19 @@ type JobResult = {
 
 const JOB_TIMEOUT_MS = 1000 * 60 * 60 * 2; // 2 hour timeout
 
 export class ConnectionManager {
-    private worker: Worker;
+    private worker: Worker;
     private queue: Queue;
     private interval?: NodeJS.Timeout;
 
     constructor(
         private db: PrismaClient,
         private settings: Settings,
-        redis: Redis,
+        private redis: Redis,
         private promClient: PromClient,
     ) {
         this.queue = new Queue({
             redis,
-            namespace: 'connection-sync-queue',
+            namespace: QUEUE_NAME,
             jobTimeoutMs: JOB_TIMEOUT_MS,
             maxAttempts: 3,
             logger: env.DEBUG_ENABLE_GROUPMQ_LOGGING === 'true',
@@ -62,6 +64,10 @@
         this.worker.on('failed', this.onJobFailed.bind(this));
         this.worker.on('stalled', this.onJobStalled.bind(this));
         this.worker.on('error', this.onWorkerError.bind(this));
+        // graceful-timeout is triggered when a job is still processing after
+        // worker.close() is called and the timeout period has elapsed. In this case,
+        // we fail the job with no retry.
+        this.worker.on('graceful-timeout', this.onJobGracefulTimeout.bind(this));
     }
 
     public startScheduler() {
@@ -128,6 +134,7 @@
         });
 
         for (const job of jobs) {
+            logger.info(`Scheduling job ${job.id} for connection ${job.connection.name} (id: ${job.connectionId})`);
             await this.queue.add({
                 groupId: `connection:${job.connectionId}`,
                 data: {
@@ -150,6 +157,22 @@
         const logger = createJobLogger(jobId);
         logger.info(`Running connection sync job ${jobId} for connection ${connectionName} (id: ${job.data.connectionId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`);
 
+        const currentStatus = await this.db.connectionSyncJob.findUniqueOrThrow({
+            where: {
+                id: jobId,
+            },
+            select: {
+                status: true,
+            }
+        });
+
+        // Fail safe: if the job is not PENDING (first run) or IN_PROGRESS (retry), it indicates the job
+        // is in an invalid state and should be skipped.
+        if (currentStatus.status !== ConnectionSyncJobStatus.PENDING && currentStatus.status !== ConnectionSyncJobStatus.IN_PROGRESS) {
+            throw new Error(`Job ${jobId} is not in a valid state. Expected: ${ConnectionSyncJobStatus.PENDING} or ${ConnectionSyncJobStatus.IN_PROGRESS}. Actual: ${currentStatus.status}. Skipping.`);
+        }
+
         this.promClient.pendingConnectionSyncJobs.dec({ connection: connectionName });
         this.promClient.activeConnectionSyncJobs.inc({ connection: connectionName });
@@ -178,7 +201,7 @@
             const result = await (async () => {
                 switch (config.type) {
                     case 'github': {
-                        return await compileGithubConfig(config, job.data.connectionId, abortController);
+                        return await compileGithubConfig(config, job.data.connectionId, abortController.signal);
                     }
                     case 'gitlab': {
                         return await compileGitlabConfig(config, job.data.connectionId);
@@ -200,7 +223,7 @@
                 }
             }
         })();
-        
+
         let { repoData, warnings } = result;
 
         await this.db.connectionSyncJob.update({
@@ -383,6 +406,33 @@
         });
     });
 
+    private onJobGracefulTimeout = async (job: Job) =>
+        groupmqLifecycleExceptionWrapper('onJobGracefulTimeout', logger, async () => {
+            const logger = createJobLogger(job.id);
+
+            const { connection } = await this.db.connectionSyncJob.update({
+                where: { id: job.id },
+                data: {
+                    status: ConnectionSyncJobStatus.FAILED,
+                    completedAt: new Date(),
+                    errorMessage: 'Job timed out',
+                },
+                select: {
+                    connection: true,
+                }
+            });
+
+            this.promClient.activeConnectionSyncJobs.dec({ connection: connection.name });
+            this.promClient.connectionSyncJobFailTotal.inc({ connection: connection.name });
+
+            logger.error(`Job ${job.id} timed out for connection ${connection.name} (id: ${connection.id})`);
+
+            captureEvent('backend_connection_sync_job_failed', {
+                connectionId: connection.id,
+                error: 'Job timed out',
+            });
+        });
+
     private async onWorkerError(error: Error) {
         Sentry.captureException(error);
         logger.error(`Connection syncer worker error.`, error);
@@ -392,8 +442,28 @@
         if (this.interval) {
             clearInterval(this.interval);
         }
-        await this.worker.close();
-        await this.queue.close();
+
+        const inProgressJobs = this.worker.getCurrentJobs();
+        await this.worker.close(GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS);
+
+        // Manually release group locks for in progress jobs to prevent deadlocks.
+        // @see: https://github.com/Openpanel-dev/groupmq/issues/8
+        for (const { job } of inProgressJobs) {
+            const lockKey = `groupmq:${QUEUE_NAME}:lock:${job.groupId}`;
+            logger.debug(`Releasing group lock ${lockKey} for in progress job ${job.id}`);
+            try {
+                await this.redis.del(lockKey);
+            } catch (error) {
+                Sentry.captureException(error);
+                logger.error(`Failed to release group lock ${lockKey} for in progress job ${job.id}. Error: `, error);
+            }
+        }
+
+        // @note: As of groupmq v1.0.0, queue.close() will just close the underlying
+        // redis connection. Since we share the same redis client between managers, skip this
+        // step and close the redis client directly in index.ts.
+        // @see: https://github.com/Openpanel-dev/groupmq/blob/main/src/queue.ts#L1900
+        // await this.queue.close();
     }
 }
diff --git a/packages/backend/src/constants.ts b/packages/backend/src/constants.ts
index a52d822e..b11f5102 100644
--- a/packages/backend/src/constants.ts
+++ b/packages/backend/src/constants.ts
@@ -10,4 +10,24 @@ export const PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES: CodeHostType[] = [
 ];
 
 export const REPOS_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'repos');
-export const INDEX_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'index');
\ No newline at end of file
+export const INDEX_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'index');
+
+// Maximum time to wait for the current job to finish when stopping a groupmq worker
+export const GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS = 5 * 1000; // 5 seconds
+
+// List of shutdown signals to listen for
+export const SHUTDOWN_SIGNALS: string[] = [
+    'SIGHUP',
+    'SIGINT',
+    'SIGQUIT',
+    'SIGILL',
+    'SIGTRAP',
+    'SIGABRT',
+    'SIGBUS',
+    'SIGFPE',
+    'SIGSEGV',
+    'SIGUSR2',
+    'SIGTERM',
+    // @note: SIGKILL and SIGSTOP cannot have listeners installed.
+    // @see: https://nodejs.org/api/process.html#signal-events
+];
diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts
index 5e6d6ba0..c3674834 100644
--- a/packages/backend/src/index.ts
+++ b/packages/backend/src/index.ts
@@ -1,5 +1,6 @@
 import "./instrument.js";
 
+import * as Sentry from "@sentry/node";
 import { PrismaClient } from "@sourcebot/db";
 import { createLogger, env, getConfigSettings, getDBConnectionString, hasEntitlement } from "@sourcebot/shared";
 import 'express-async-errors';
@@ -9,7 +10,7 @@ import { Redis } from 'ioredis';
 import { Api } from "./api.js";
 import { ConfigManager } from "./configManager.js";
 import { ConnectionManager } from './connectionManager.js';
-import { INDEX_CACHE_DIR, REPOS_CACHE_DIR } from './constants.js';
+import { INDEX_CACHE_DIR, REPOS_CACHE_DIR, SHUTDOWN_SIGNALS } from './constants.js';
 import { AccountPermissionSyncer } from "./ee/accountPermissionSyncer.js";
 import { GithubAppManager } from "./ee/githubAppManager.js";
 import { RepoPermissionSyncer } from './ee/repoPermissionSyncer.js';
@@ -17,6 +18,7 @@ import { shutdownPosthog } from "./posthog.js";
 import { PromClient } from './promClient.js';
 import { RepoIndexManager } from "./repoIndexManager.js";
 
+
 const logger = createLogger('backend-entrypoint');
 
 const reposPath = REPOS_CACHE_DIR;
@@ -40,13 +42,14 @@ const prisma = new PrismaClient({
 const redis = new Redis(env.REDIS_URL, {
     maxRetriesPerRequest: null
 });
 
-redis.ping().then(() => {
+try {
+    await redis.ping();
     logger.info('Connected to redis');
-}).catch((err: unknown) => {
-    logger.error('Failed to connect to redis');
-    logger.error(err);
+} catch (err: unknown) {
+    logger.error('Failed to connect to redis. Error:', err);
     process.exit(1);
-});
+}
 
 const promClient = new PromClient();
@@ -83,45 +86,65 @@ const api = new Api(
 
 logger.info('Worker started.');
 
-const cleanup = async (signal: string) => {
-    logger.info(`Received ${signal}, cleaning up...`);
-
-    const shutdownTimeout = 30000; // 30 seconds
-
-    try {
-        await Promise.race([
-            Promise.all([
-                repoIndexManager.dispose(),
-                connectionManager.dispose(),
-                repoPermissionSyncer.dispose(),
-                accountPermissionSyncer.dispose(),
-                configManager.dispose(),
-            ]),
-            new Promise((_, reject) =>
-                setTimeout(() => reject(new Error('Shutdown timeout')), shutdownTimeout)
-            )
-        ]);
-        logger.info('All workers shut down gracefully');
-    } catch (error) {
-        logger.warn('Shutdown timeout or error, forcing exit:', error instanceof Error ? error.message : String(error));
+const listenToShutdownSignals = () => {
+    const signals = SHUTDOWN_SIGNALS;
+
+    let receivedSignal = false;
+
+    const cleanup = async (signal: string) => {
+        try {
+            if (receivedSignal) {
+                logger.debug(`Received repeat signal ${signal}, ignoring.`);
+                return;
+            }
+            receivedSignal = true;
+
+            logger.info(`Received ${signal}, cleaning up...`);
+
+            await repoIndexManager.dispose();
+            await connectionManager.dispose();
+            await repoPermissionSyncer.dispose();
+            await accountPermissionSyncer.dispose();
+            await configManager.dispose();
+
+            await prisma.$disconnect();
+            await redis.quit();
+            await api.dispose();
+            await shutdownPosthog();
+
+            logger.info('All workers shut down gracefully');
+            signals.forEach(sig => process.removeListener(sig, cleanup));
+        } catch (error) {
+            Sentry.captureException(error);
+            logger.error('Error shutting down worker:', error);
+        }
     }
 
-    await prisma.$disconnect();
-    await redis.quit();
-    await api.dispose();
-    await shutdownPosthog();
-}
+    signals.forEach(signal => {
+        process.on(signal, () => {
+            cleanup(signal).finally(() => {
+                process.kill(process.pid, signal);
+            });
+        });
+    });
+
+    // Register handlers for uncaught exceptions and unhandled rejections
+    process.on('uncaughtException', (err) => {
+        logger.error(`Uncaught exception: ${err.message}`);
+        cleanup('uncaughtException').finally(() => {
+            process.exit(1);
+        });
+    });
+
+    process.on('unhandledRejection', (reason, promise) => {
+        logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`);
+        cleanup('unhandledRejection').finally(() => {
+            process.exit(1);
+        });
+    });
+}
 
-process.on('SIGINT', () => cleanup('SIGINT').finally(() => process.exit(0)));
-process.on('SIGTERM', () => cleanup('SIGTERM').finally(() => process.exit(0)));
-
-// Register handlers for uncaught exceptions and unhandled rejections
-process.on('uncaughtException', (err) => {
-    logger.error(`Uncaught exception: ${err.message}`);
-    cleanup('uncaughtException').finally(() => process.exit(1));
-});
-
-process.on('unhandledRejection', (reason, promise) => {
-    logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`);
-    cleanup('unhandledRejection').finally(() => process.exit(1));
-});
+listenToShutdownSignals();
diff --git a/packages/backend/src/repoCompileUtils.ts b/packages/backend/src/repoCompileUtils.ts
index 10c748a8..5b2c0349 100644
--- a/packages/backend/src/repoCompileUtils.ts
+++ b/packages/backend/src/repoCompileUtils.ts
@@ -39,8 +39,8 @@ type CompileResult = {
 
 export const compileGithubConfig = async (
     config: GithubConnectionConfig,
     connectionId: number,
-    abortController: AbortController): Promise<CompileResult> => {
-    const gitHubReposResult = await getGitHubReposFromConfig(config, abortController.signal);
+    signal: AbortSignal): Promise<CompileResult> => {
+    const gitHubReposResult = await getGitHubReposFromConfig(config, signal);
     const gitHubRepos = gitHubReposResult.repos;
     const warnings = gitHubReposResult.warnings;
diff --git a/packages/backend/src/repoIndexManager.ts b/packages/backend/src/repoIndexManager.ts
index 17ed2d8a..5a576d05 100644
--- a/packages/backend/src/repoIndexManager.ts
+++ b/packages/backend/src/repoIndexManager.ts
@@ -7,7 +7,7 @@ import { readdir, rm } from 'fs/promises';
 import { Job, Queue, ReservedJob, Worker } from "groupmq";
 import { Redis } from 'ioredis';
 import micromatch from 'micromatch';
-import { INDEX_CACHE_DIR } from './constants.js';
+import { GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS, INDEX_CACHE_DIR } from './constants.js';
 import { cloneRepository, fetchRepository, getBranches, getCommitHashForRefName, getTags, isPathAValidGitRepoRoot, unsetGitConfig, upsertGitConfig } from './git.js';
 import { captureEvent } from './posthog.js';
 import { PromClient } from './promClient.js';
@@ -45,7 +45,7 @@ export class RepoIndexManager {
     constructor(
         private db: PrismaClient,
         private settings: Settings,
-        redis: Redis,
+        private redis: Redis,
         private promClient: PromClient,
     ) {
         this.queue = new Queue({
@@ -70,6 +70,10 @@
         this.worker.on('failed', this.onJobFailed.bind(this));
         this.worker.on('stalled', this.onJobStalled.bind(this));
         this.worker.on('error', this.onWorkerError.bind(this));
+        // graceful-timeout is triggered when a job is still processing after
+        // worker.close() is called and the timeout period has elapsed. In this case,
+        // we fail the job with no retry.
+        this.worker.on('graceful-timeout', this.onJobGracefulTimeout.bind(this));
     }
 
     public startScheduler() {
@@ -230,6 +234,23 @@
         const logger = createJobLogger(id);
         logger.info(`Running ${job.data.type} job ${id} for repo ${job.data.repoName} (id: ${job.data.repoId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`);
 
+        const currentStatus = await this.db.repoIndexingJob.findUniqueOrThrow({
+            where: {
+                id,
+            },
+            select: {
+                status: true,
+            }
+        });
+
+        // Fail safe: if the job is not PENDING (first run) or IN_PROGRESS (retry), it indicates the job
+        // is in an invalid state and should be skipped.
+        if (
+            currentStatus.status !== RepoIndexingJobStatus.PENDING &&
+            currentStatus.status !== RepoIndexingJobStatus.IN_PROGRESS
+        ) {
+            throw new Error(`Job ${id} is not in a valid state. Expected: ${RepoIndexingJobStatus.PENDING} or ${RepoIndexingJobStatus.IN_PROGRESS}. Actual: ${currentStatus.status}. Skipping.`);
+        }
 
         const { repo, type: jobType } = await this.db.repoIndexingJob.update({
             where: {
@@ -540,6 +561,28 @@
         logger.error(`Job ${jobId} stalled for repo ${repo.name} (id: ${repo.id})`);
     });
 
+    private onJobGracefulTimeout = async (job: Job) =>
+        groupmqLifecycleExceptionWrapper('onJobGracefulTimeout', logger, async () => {
+            const logger = createJobLogger(job.data.jobId);
+            const jobTypeLabel = getJobTypePrometheusLabel(job.data.type);
+
+            const { repo } = await this.db.repoIndexingJob.update({
+                where: { id: job.data.jobId },
+                data: {
+                    status: RepoIndexingJobStatus.FAILED,
+                    completedAt: new Date(),
+                    errorMessage: 'Job timed out',
+                },
+                select: { repo: true }
+            });
+
+            this.promClient.activeRepoIndexJobs.dec({ repo: job.data.repoName, type: jobTypeLabel });
+            this.promClient.repoIndexJobFailTotal.inc({ repo: job.data.repoName, type: jobTypeLabel });
+
+            logger.error(`Job ${job.data.jobId} timed out for repo ${repo.name} (id: ${repo.id}). Failing job.`);
+        });
+
     private async onWorkerError(error: Error) {
         Sentry.captureException(error);
         logger.error(`Index syncer worker error.`, error);
@@ -549,8 +592,20 @@
         if (this.interval) {
             clearInterval(this.interval);
         }
-        await this.worker.close();
-        await this.queue.close();
+        const inProgressJobs = this.worker.getCurrentJobs();
+        await this.worker.close(GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS);
+        // Manually release group locks for in progress jobs to prevent deadlocks.
+        // @see: https://github.com/Openpanel-dev/groupmq/issues/8
+        for (const { job } of inProgressJobs) {
+            const lockKey = `groupmq:repo-index-queue:lock:${job.groupId}`;
+            logger.debug(`Releasing group lock ${lockKey} for in progress job ${job.id}`);
+            await this.redis.del(lockKey);
+        }
+
+        // @note: As of groupmq v1.0.0, queue.close() will just close the underlying
+        // redis connection. Since we share the same redis client between managers, skip this
+        // step and close the redis client directly in index.ts.
+        // await this.queue.close();
     }
 }