packages/backend/src/connectionManager.ts (77 additions, 7 deletions)
@@ -11,10 +11,12 @@ import { groupmqLifecycleExceptionWrapper, setIntervalAsync } from "./utils.js";
import { syncSearchContexts } from "./ee/syncSearchContexts.js";
import { captureEvent } from "./posthog.js";
import { PromClient } from "./promClient.js";
import { GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS } from "./constants.js";

const LOG_TAG = 'connection-manager';
const logger = createLogger(LOG_TAG);
const createJobLogger = (jobId: string) => createLogger(`${LOG_TAG}:job:${jobId}`);
const QUEUE_NAME = 'connection-sync-queue';

type JobPayload = {
jobId: string,
@@ -30,19 +32,19 @@ type JobResult = {
const JOB_TIMEOUT_MS = 1000 * 60 * 60 * 2; // 2 hour timeout

export class ConnectionManager {
private worker: Worker;
private worker: Worker<JobPayload>;
private queue: Queue<JobPayload>;
private interval?: NodeJS.Timeout;

constructor(
private db: PrismaClient,
private settings: Settings,
redis: Redis,
private redis: Redis,
private promClient: PromClient,
) {
this.queue = new Queue<JobPayload>({
redis,
namespace: 'connection-sync-queue',
namespace: QUEUE_NAME,
jobTimeoutMs: JOB_TIMEOUT_MS,
maxAttempts: 3,
logger: env.DEBUG_ENABLE_GROUPMQ_LOGGING === 'true',
@@ -62,6 +64,10 @@ export class ConnectionManager {
this.worker.on('failed', this.onJobFailed.bind(this));
this.worker.on('stalled', this.onJobStalled.bind(this));
this.worker.on('error', this.onWorkerError.bind(this));
// graceful-timeout is triggered when a job is still processing after
// worker.close() is called and the timeout period has elapsed. In this case,
// we fail the job with no retry.
this.worker.on('graceful-timeout', this.onJobGracefulTimeout.bind(this));
}

public startScheduler() {
@@ -128,6 +134,7 @@
});

for (const job of jobs) {
logger.info(`Scheduling job ${job.id} for connection ${job.connection.name} (id: ${job.connectionId})`);
await this.queue.add({
groupId: `connection:${job.connectionId}`,
data: {
@@ -150,6 +157,22 @@
const logger = createJobLogger(jobId);
logger.info(`Running connection sync job ${jobId} for connection ${connectionName} (id: ${job.data.connectionId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`);

const currentStatus = await this.db.connectionSyncJob.findUniqueOrThrow({
where: {
id: jobId,
},
select: {
status: true,
}
});

// Fail safe: if the job is not PENDING (first run) or IN_PROGRESS (retry), it indicates the job
// is in an invalid state and should be skipped.
if (currentStatus.status !== ConnectionSyncJobStatus.PENDING && currentStatus.status !== ConnectionSyncJobStatus.IN_PROGRESS) {
throw new Error(`Job ${jobId} is not in a valid state. Expected: ${ConnectionSyncJobStatus.PENDING} or ${ConnectionSyncJobStatus.IN_PROGRESS}. Actual: ${currentStatus.status}. Skipping.`);
}


this.promClient.pendingConnectionSyncJobs.dec({ connection: connectionName });
this.promClient.activeConnectionSyncJobs.inc({ connection: connectionName });

@@ -178,7 +201,7 @@
const result = await (async () => {
switch (config.type) {
case 'github': {
return await compileGithubConfig(config, job.data.connectionId, abortController);
return await compileGithubConfig(config, job.data.connectionId, abortController.signal);
}
case 'gitlab': {
return await compileGitlabConfig(config, job.data.connectionId);
@@ -200,7 +223,7 @@
}
}
})();

let { repoData, warnings } = result;

await this.db.connectionSyncJob.update({
@@ -383,6 +406,33 @@
});
});

private onJobGracefulTimeout = async (job: Job<JobPayload>) =>
groupmqLifecycleExceptionWrapper('onJobGracefulTimeout', logger, async () => {
const logger = createJobLogger(job.id);

const { connection } = await this.db.connectionSyncJob.update({
where: { id: job.id },
data: {
status: ConnectionSyncJobStatus.FAILED,
completedAt: new Date(),
errorMessage: 'Job timed out',
},
select: {
connection: true,
}
});

this.promClient.activeConnectionSyncJobs.dec({ connection: connection.name });
this.promClient.connectionSyncJobFailTotal.inc({ connection: connection.name });

logger.error(`Job ${job.id} timed out for connection ${connection.name} (id: ${connection.id})`);

captureEvent('backend_connection_sync_job_failed', {
connectionId: connection.id,
error: 'Job timed out',
});
});

private async onWorkerError(error: Error) {
Sentry.captureException(error);
logger.error(`Connection syncer worker error.`, error);
@@ -392,8 +442,28 @@
if (this.interval) {
clearInterval(this.interval);
}
await this.worker.close();
await this.queue.close();

const inProgressJobs = this.worker.getCurrentJobs();
await this.worker.close(GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS);

// Manually release group locks for in progress jobs to prevent deadlocks.
// @see: https://github.com/Openpanel-dev/groupmq/issues/8
for (const { job } of inProgressJobs) {
const lockKey = `groupmq:${QUEUE_NAME}:lock:${job.groupId}`;
logger.debug(`Releasing group lock ${lockKey} for in progress job ${job.id}`);
try {
await this.redis.del(lockKey);
} catch (error) {
Sentry.captureException(error);
logger.error(`Failed to release group lock ${lockKey} for in progress job ${job.id}. Error: `, error);
}
}

// @note: As of groupmq v1.0.0, queue.close() will just close the underlying
// redis connection. Since we share the same redis client between managers, skip
// this step and close the redis client directly in index.ts.
// @see: https://github.com/Openpanel-dev/groupmq/blob/main/src/queue.ts#L1900
// await this.queue.close();
}
}
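
A minimal standalone sketch of the `dispose()` workaround above, assuming groupmq's `Worker` exposes `getCurrentJobs()` and `close(timeoutMs)` and uses the `groupmq:<queue>:lock:<groupId>` key format, exactly as the diff does (see groupmq issue #8):

```typescript
import { Redis } from 'ioredis';
import { Worker } from 'groupmq';

type JobPayload = { jobId: string };

const GRACEFUL_TIMEOUT_MS = 5 * 1000;

// Snapshot in-flight jobs *before* close(): once the graceful timeout
// elapses, close() returns even though those jobs may still hold their
// group locks, so the lock keys are deleted manually to avoid deadlocking
// a restarted worker.
async function disposeWorker(
    worker: Worker<JobPayload>,
    redis: Redis,
    queueName: string,
) {
    const inProgressJobs = worker.getCurrentJobs();
    await worker.close(GRACEFUL_TIMEOUT_MS);

    for (const { job } of inProgressJobs) {
        const lockKey = `groupmq:${queueName}:lock:${job.groupId}`;
        // Best-effort cleanup; the real dispose() logs and reports failures to Sentry.
        await redis.del(lockKey).catch(() => undefined);
    }
}
```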

packages/backend/src/constants.ts (21 additions, 1 deletion)
@@ -10,4 +10,24 @@ export const PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES: CodeHostType[] = [
];

export const REPOS_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'repos');
export const INDEX_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'index');

// Maximum time to wait for current job to finish
export const GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS = 5 * 1000; // 5 seconds

// List of shutdown signals
export const SHUTDOWN_SIGNALS: string[] = [
'SIGHUP',
'SIGINT',
'SIGQUIT',
'SIGILL',
'SIGTRAP',
'SIGABRT',
'SIGBUS',
'SIGFPE',
'SIGSEGV',
'SIGUSR2',
'SIGTERM',
// @note: SIGKILL and SIGSTOP cannot have listeners installed.
// @see: https://nodejs.org/api/process.html#signal-events
];
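
The trailing note matters in practice: Node.js refuses to install listeners for SIGKILL and SIGSTOP, so including them in the list would make registration throw at startup. A small sketch illustrating both halves, assuming the constants module above:

```typescript
import { SHUTDOWN_SIGNALS } from './constants.js';

// Every signal in the list can safely have a listener installed.
for (const signal of SHUTDOWN_SIGNALS) {
    process.on(signal, (sig) => console.log(`caught ${sig}`));
}

// SIGKILL (and SIGSTOP) cannot: Node.js throws when the listener is installed.
try {
    process.on('SIGKILL', () => { /* unreachable */ });
} catch (error) {
    console.error('cannot listen for SIGKILL:', (error as Error).message);
}
```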
packages/backend/src/index.ts (57 additions, 38 deletions)
@@ -1,5 +1,6 @@
import "./instrument.js";

import * as Sentry from "@sentry/node";
import { PrismaClient } from "@sourcebot/db";
import { createLogger, env, getConfigSettings, getDBConnectionString, hasEntitlement } from "@sourcebot/shared";
import 'express-async-errors';
@@ -9,14 +10,15 @@ import { Redis } from 'ioredis';
import { Api } from "./api.js";
import { ConfigManager } from "./configManager.js";
import { ConnectionManager } from './connectionManager.js';
import { INDEX_CACHE_DIR, REPOS_CACHE_DIR } from './constants.js';
import { INDEX_CACHE_DIR, REPOS_CACHE_DIR, SHUTDOWN_SIGNALS } from './constants.js';
import { AccountPermissionSyncer } from "./ee/accountPermissionSyncer.js";
import { GithubAppManager } from "./ee/githubAppManager.js";
import { RepoPermissionSyncer } from './ee/repoPermissionSyncer.js';
import { shutdownPosthog } from "./posthog.js";
import { PromClient } from './promClient.js';
import { RepoIndexManager } from "./repoIndexManager.js";


const logger = createLogger('backend-entrypoint');

const reposPath = REPOS_CACHE_DIR;
@@ -83,45 +85,62 @@ const api = new Api(

logger.info('Worker started.');

const cleanup = async (signal: string) => {
logger.info(`Received ${signal}, cleaning up...`);

const shutdownTimeout = 30000; // 30 seconds

try {
await Promise.race([
Promise.all([
repoIndexManager.dispose(),
connectionManager.dispose(),
repoPermissionSyncer.dispose(),
accountPermissionSyncer.dispose(),
configManager.dispose(),
]),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Shutdown timeout')), shutdownTimeout)
)
]);
logger.info('All workers shut down gracefully');
} catch (error) {
logger.warn('Shutdown timeout or error, forcing exit:', error instanceof Error ? error.message : String(error));


const listenToShutdownSignals = () => {
const signals = SHUTDOWN_SIGNALS;

let receivedSignal = false;

const cleanup = async (signal: string) => {
try {
if (receivedSignal) {
logger.debug(`Received repeat signal ${signal}, ignoring.`);
return;
}
receivedSignal = true;

logger.info(`Received ${signal}, cleaning up...`);

await repoIndexManager.dispose()
await connectionManager.dispose()
await repoPermissionSyncer.dispose()
await accountPermissionSyncer.dispose()
await configManager.dispose()

await prisma.$disconnect();
await redis.quit();
await api.dispose();
await shutdownPosthog();


logger.info('All workers shut down gracefully');

signals.forEach(sig => process.removeListener(sig, cleanup));
process.kill(process.pid, signal);
} catch (error) {
Sentry.captureException(error);
logger.error('Error shutting down worker:', error);
process.exit(1);
}
}

await prisma.$disconnect();
await redis.quit();
await api.dispose();
await shutdownPosthog();
}
signals.forEach(signal => {
process.on(signal, cleanup);
});

process.on('SIGINT', () => cleanup('SIGINT').finally(() => process.exit(0)));
process.on('SIGTERM', () => cleanup('SIGTERM').finally(() => process.exit(0)));
// Register handlers for uncaught exceptions and unhandled rejections
process.on('uncaughtException', (err) => {
logger.error(`Uncaught exception: ${err.message}`);
cleanup('uncaughtException').finally(() => process.exit(1));
});

// Register handlers for uncaught exceptions and unhandled rejections
process.on('uncaughtException', (err) => {
logger.error(`Uncaught exception: ${err.message}`);
cleanup('uncaughtException').finally(() => process.exit(1));
});
process.on('unhandledRejection', (reason, promise) => {
logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`);
cleanup('unhandledRejection').finally(() => process.exit(1));
});

process.on('unhandledRejection', (reason, promise) => {
logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`);
cleanup('unhandledRejection').finally(() => process.exit(1));
});

}

listenToShutdownSignals();
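
The `process.kill(process.pid, signal)` at the end of `cleanup` follows the standard re-raise idiom: once our listener is removed, the default handler terminates the process with the conventional exit status for that signal. A condensed sketch of the pattern, where `disposeEverything` stands in for the manager dispose calls above:

```typescript
let receivedSignal = false;

const disposeEverything = async () => { /* dispose managers, db, redis, ... */ };

const cleanup = async (signal: string) => {
    if (receivedSignal) return; // ignore repeat signals while draining
    receivedSignal = true;

    await disposeEverything();

    // Re-raise so the process exit status reflects the original signal.
    process.removeListener(signal, cleanup);
    process.kill(process.pid, signal);
};

process.on('SIGTERM', cleanup);
```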
packages/backend/src/repoCompileUtils.ts (2 additions, 2 deletions)
@@ -39,8 +39,8 @@ type CompileResult = {
export const compileGithubConfig = async (
config: GithubConnectionConfig,
connectionId: number,
abortController: AbortController): Promise<CompileResult> => {
const gitHubReposResult = await getGitHubReposFromConfig(config, abortController.signal);
signal: AbortSignal): Promise<CompileResult> => {
const gitHubReposResult = await getGitHubReposFromConfig(config, signal);
const gitHubRepos = gitHubReposResult.repos;
const warnings = gitHubReposResult.warnings;

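
Finally, the repoCompileUtils.ts change narrows the compile API from an `AbortController` to its read-only `AbortSignal`: callees can observe cancellation, but only the owner of the controller can trigger it. A minimal sketch of the pattern (the function body is illustrative, not the real implementation):

```typescript
// Callee: accepts only the signal and checks it before/while doing work.
async function compileConfig(connectionId: number, signal: AbortSignal): Promise<string[]> {
    signal.throwIfAborted();
    // ... fetch and compile repos, polling `signal.aborted` between steps ...
    return [];
}

// Caller: keeps the controller, and with it the sole right to cancel.
const abortController = new AbortController();
const pending = compileConfig(42, abortController.signal);
abortController.abort(); // e.g. on graceful-timeout
```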