1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Fixed incorrect shutdown of PostHog SDK in the worker. [#609](https://github.com/sourcebot-dev/sourcebot/pull/609)
- Fixed race condition in job schedulers. [#607](https://github.com/sourcebot-dev/sourcebot/pull/607)
- Fixed connection sync jobs getting stuck in pending or in progress after restarting the worker. [#612](https://github.com/sourcebot-dev/sourcebot/pull/612)

### Added
- Added force resync buttons for connections and repositories. [#610](https://github.com/sourcebot-dev/sourcebot/pull/610)
4 changes: 2 additions & 2 deletions packages/backend/src/configManager.ts
@@ -93,8 +93,8 @@ export class ConfigManager {
});

if (connectionNeedsSyncing) {
const [jobId] = await this.connectionManager.createJobs([connection]);
logger.info(`Change detected for connection '${key}' (id: ${connection.id}). Created sync job ${jobId}.`);
logger.info(`Change detected for connection '${key}' (id: ${connection.id}). Creating sync job.`);
await this.connectionManager.createJobs([connection]);
}
}
}
84 changes: 77 additions & 7 deletions packages/backend/src/connectionManager.ts
@@ -11,10 +11,12 @@ import { groupmqLifecycleExceptionWrapper, setIntervalAsync } from "./utils.js";
import { syncSearchContexts } from "./ee/syncSearchContexts.js";
import { captureEvent } from "./posthog.js";
import { PromClient } from "./promClient.js";
import { GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS } from "./constants.js";

const LOG_TAG = 'connection-manager';
const logger = createLogger(LOG_TAG);
const createJobLogger = (jobId: string) => createLogger(`${LOG_TAG}:job:${jobId}`);
const QUEUE_NAME = 'connection-sync-queue';

type JobPayload = {
jobId: string,
@@ -30,19 +32,19 @@ type JobResult = {
const JOB_TIMEOUT_MS = 1000 * 60 * 60 * 2; // 2 hour timeout

export class ConnectionManager {
private worker: Worker;
private worker: Worker<JobPayload>;
private queue: Queue<JobPayload>;
private interval?: NodeJS.Timeout;

constructor(
private db: PrismaClient,
private settings: Settings,
redis: Redis,
private redis: Redis,
private promClient: PromClient,
) {
this.queue = new Queue<JobPayload>({
redis,
namespace: 'connection-sync-queue',
namespace: QUEUE_NAME,
jobTimeoutMs: JOB_TIMEOUT_MS,
maxAttempts: 3,
logger: env.DEBUG_ENABLE_GROUPMQ_LOGGING === 'true',
@@ -62,6 +64,10 @@ export class ConnectionManager {
this.worker.on('failed', this.onJobFailed.bind(this));
this.worker.on('stalled', this.onJobStalled.bind(this));
this.worker.on('error', this.onWorkerError.bind(this));
// graceful-timeout is triggered when a job is still processing after
// worker.close() is called and the timeout period has elapsed. In this case,
// we fail the job with no retry.
this.worker.on('graceful-timeout', this.onJobGracefulTimeout.bind(this));
}

public startScheduler() {
@@ -128,6 +134,7 @@ export class ConnectionManager {
});

for (const job of jobs) {
logger.info(`Scheduling job ${job.id} for connection ${job.connection.name} (id: ${job.connectionId})`);
await this.queue.add({
groupId: `connection:${job.connectionId}`,
data: {
@@ -150,6 +157,22 @@ export class ConnectionManager {
const logger = createJobLogger(jobId);
logger.info(`Running connection sync job ${jobId} for connection ${connectionName} (id: ${job.data.connectionId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`);

const currentStatus = await this.db.connectionSyncJob.findUniqueOrThrow({
where: {
id: jobId,
},
select: {
status: true,
}
});

// Fail safe: if the job is not PENDING (first run) or IN_PROGRESS (retry), it indicates the job
// is in an invalid state and should be skipped.
if (currentStatus.status !== ConnectionSyncJobStatus.PENDING && currentStatus.status !== ConnectionSyncJobStatus.IN_PROGRESS) {
throw new Error(`Job ${jobId} is not in a valid state. Expected: ${ConnectionSyncJobStatus.PENDING} or ${ConnectionSyncJobStatus.IN_PROGRESS}. Actual: ${currentStatus.status}. Skipping.`);
}


this.promClient.pendingConnectionSyncJobs.dec({ connection: connectionName });
this.promClient.activeConnectionSyncJobs.inc({ connection: connectionName });

@@ -178,7 +201,7 @@ export class ConnectionManager {
const result = await (async () => {
switch (config.type) {
case 'github': {
return await compileGithubConfig(config, job.data.connectionId, abortController);
return await compileGithubConfig(config, job.data.connectionId, abortController.signal);
}
case 'gitlab': {
return await compileGitlabConfig(config, job.data.connectionId);
@@ -200,7 +223,7 @@ export class ConnectionManager {
}
}
})();

let { repoData, warnings } = result;

await this.db.connectionSyncJob.update({
@@ -383,6 +406,33 @@ export class ConnectionManager {
});
});

private onJobGracefulTimeout = async (job: Job<JobPayload>) =>
groupmqLifecycleExceptionWrapper('onJobGracefulTimeout', logger, async () => {
const logger = createJobLogger(job.id);

const { connection } = await this.db.connectionSyncJob.update({
where: { id: job.id },
data: {
status: ConnectionSyncJobStatus.FAILED,
completedAt: new Date(),
errorMessage: 'Job timed out',
},
select: {
connection: true,
}
});

this.promClient.activeConnectionSyncJobs.dec({ connection: connection.name });
this.promClient.connectionSyncJobFailTotal.inc({ connection: connection.name });

logger.error(`Job ${job.id} timed out for connection ${connection.name} (id: ${connection.id})`);

captureEvent('backend_connection_sync_job_failed', {
connectionId: connection.id,
error: 'Job timed out',
});
});

private async onWorkerError(error: Error) {
Sentry.captureException(error);
logger.error(`Connection syncer worker error.`, error);
@@ -392,8 +442,28 @@ export class ConnectionManager {
if (this.interval) {
clearInterval(this.interval);
}
await this.worker.close();
await this.queue.close();

const inProgressJobs = this.worker.getCurrentJobs();
await this.worker.close(GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS);

// Manually release group locks for in progress jobs to prevent deadlocks.
// @see: https://github.com/Openpanel-dev/groupmq/issues/8
for (const { job } of inProgressJobs) {
const lockKey = `groupmq:${QUEUE_NAME}:lock:${job.groupId}`;
logger.debug(`Releasing group lock ${lockKey} for in progress job ${job.id}`);
try {
await this.redis.del(lockKey);
} catch (error) {
Sentry.captureException(error);
logger.error(`Failed to release group lock ${lockKey} for in progress job ${job.id}. Error: `, error);
}
}

// @note: As of groupmq v1.0.0, queue.close() will just close the underlying
// redis connection. Since we share the same redis client between multiple managers, skip this
// step and close the redis client directly in index.ts.
// @see: https://github.com/Openpanel-dev/groupmq/blob/main/src/queue.ts#L1900
// await this.queue.close();
}
}
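Each of the groupmq lifecycle handlers above ('failed', 'stalled', 'graceful-timeout') is wrapped in `groupmqLifecycleExceptionWrapper`, which is imported from `./utils.js` but not shown in this diff. Below is a minimal sketch of what such a wrapper plausibly looks like, inferred only from the call sites above (a name, a logger, and an async callback); the real implementation in `packages/backend/src/utils.ts` may differ.

```typescript
import * as Sentry from "@sentry/node";

// Minimal logger shape assumed for this sketch.
type MinimalLogger = { error: (message: string, ...meta: unknown[]) => void };

// Hypothetical sketch: run a groupmq lifecycle handler and report any exception
// instead of letting it propagate out of the worker's event emitter.
export const groupmqLifecycleExceptionWrapper = async (
    name: string,
    logger: MinimalLogger,
    fn: () => Promise<void>,
): Promise<void> => {
    try {
        await fn();
    } catch (error) {
        Sentry.captureException(error);
        logger.error(`Uncaught exception in lifecycle handler '${name}'.`, error);
    }
};
```

Wrapping the handlers this way keeps a failure in, say, `onJobGracefulTimeout` from crashing the worker during shutdown.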

22 changes: 21 additions & 1 deletion packages/backend/src/constants.ts
@@ -10,4 +10,24 @@ export const PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES: CodeHostType[] = [
];

export const REPOS_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'repos');
export const INDEX_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'index');
export const INDEX_CACHE_DIR = path.join(env.DATA_CACHE_DIR, 'index');

// Maximum time to wait for current job to finish
export const GROUPMQ_WORKER_STOP_GRACEFUL_TIMEOUT_MS = 5 * 1000; // 5 seconds

// List of shutdown signals
export const SHUTDOWN_SIGNALS: string[] = [
'SIGHUP',
'SIGINT',
'SIGQUIT',
'SIGILL',
'SIGTRAP',
'SIGABRT',
'SIGBUS',
'SIGFPE',
'SIGSEGV',
'SIGUSR2',
'SIGTERM',
// @note: SIGKILL and SIGSTOP cannot have listeners installed.
// @see: https://nodejs.org/api/process.html#signal-events
];
109 changes: 66 additions & 43 deletions packages/backend/src/index.ts
@@ -1,5 +1,6 @@
import "./instrument.js";

import * as Sentry from "@sentry/node";
import { PrismaClient } from "@sourcebot/db";
import { createLogger, env, getConfigSettings, getDBConnectionString, hasEntitlement } from "@sourcebot/shared";
import 'express-async-errors';
@@ -9,14 +10,15 @@ import { Redis } from 'ioredis';
import { Api } from "./api.js";
import { ConfigManager } from "./configManager.js";
import { ConnectionManager } from './connectionManager.js';
import { INDEX_CACHE_DIR, REPOS_CACHE_DIR } from './constants.js';
import { INDEX_CACHE_DIR, REPOS_CACHE_DIR, SHUTDOWN_SIGNALS } from './constants.js';
import { AccountPermissionSyncer } from "./ee/accountPermissionSyncer.js";
import { GithubAppManager } from "./ee/githubAppManager.js";
import { RepoPermissionSyncer } from './ee/repoPermissionSyncer.js';
import { shutdownPosthog } from "./posthog.js";
import { PromClient } from './promClient.js';
import { RepoIndexManager } from "./repoIndexManager.js";


const logger = createLogger('backend-entrypoint');

const reposPath = REPOS_CACHE_DIR;
Expand All @@ -40,13 +42,14 @@ const prisma = new PrismaClient({
const redis = new Redis(env.REDIS_URL, {
maxRetriesPerRequest: null
});
redis.ping().then(() => {

try {
await redis.ping();
logger.info('Connected to redis');
}).catch((err: unknown) => {
logger.error('Failed to connect to redis');
logger.error(err);
} catch (err: unknown) {
logger.error('Failed to connect to redis. Error:', err);
process.exit(1);
});
}

const promClient = new PromClient();

@@ -83,45 +86,65 @@ const api = new Api(

logger.info('Worker started.');

const cleanup = async (signal: string) => {
logger.info(`Received ${signal}, cleaning up...`);

const shutdownTimeout = 30000; // 30 seconds

try {
await Promise.race([
Promise.all([
repoIndexManager.dispose(),
connectionManager.dispose(),
repoPermissionSyncer.dispose(),
accountPermissionSyncer.dispose(),
configManager.dispose(),
]),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Shutdown timeout')), shutdownTimeout)
)
]);
logger.info('All workers shut down gracefully');
} catch (error) {
logger.warn('Shutdown timeout or error, forcing exit:', error instanceof Error ? error.message : String(error));
const listenToShutdownSignals = () => {
const signals = SHUTDOWN_SIGNALS;

let receivedSignal = false;

const cleanup = async (signal: string) => {
try {
if (receivedSignal) {
logger.debug(`Received repeat signal ${signal}, ignoring.`);
return;
}
receivedSignal = true;

logger.info(`Received ${signal}, cleaning up...`);

await repoIndexManager.dispose()
await connectionManager.dispose()
await repoPermissionSyncer.dispose()
await accountPermissionSyncer.dispose()
await configManager.dispose()

await prisma.$disconnect();
await redis.quit();
await api.dispose();
await shutdownPosthog();


logger.info('All workers shut down gracefully');
signals.forEach(sig => process.removeListener(sig, cleanup));
} catch (error) {
Sentry.captureException(error);
logger.error('Error shutting down worker:', error);
}
}

await prisma.$disconnect();
await redis.quit();
await api.dispose();
await shutdownPosthog();
}
signals.forEach(signal => {
process.on(signal, (err) => {
cleanup(err).finally(() => {
process.kill(process.pid, signal);
});
});
});

// Register handlers for uncaught exceptions and unhandled rejections
process.on('uncaughtException', (err) => {
logger.error(`Uncaught exception: ${err.message}`);
cleanup('uncaughtException').finally(() => {
process.exit(1);
});
});

process.on('unhandledRejection', (reason, promise) => {
logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`);
cleanup('unhandledRejection').finally(() => {
process.exit(1);
});
});

process.on('SIGINT', () => cleanup('SIGINT').finally(() => process.exit(0)));
process.on('SIGTERM', () => cleanup('SIGTERM').finally(() => process.exit(0)));

// Register handlers for uncaught exceptions and unhandled rejections
process.on('uncaughtException', (err) => {
logger.error(`Uncaught exception: ${err.message}`);
cleanup('uncaughtException').finally(() => process.exit(1));
});
}

process.on('unhandledRejection', (reason, promise) => {
logger.error(`Unhandled rejection at: ${promise}, reason: ${reason}`);
cleanup('unhandledRejection').finally(() => process.exit(1));
});
listenToShutdownSignals();
4 changes: 2 additions & 2 deletions packages/backend/src/repoCompileUtils.ts
@@ -39,8 +39,8 @@ type CompileResult = {
export const compileGithubConfig = async (
config: GithubConnectionConfig,
connectionId: number,
abortController: AbortController): Promise<CompileResult> => {
const gitHubReposResult = await getGitHubReposFromConfig(config, abortController.signal);
signal: AbortSignal): Promise<CompileResult> => {
const gitHubReposResult = await getGitHubReposFromConfig(config, signal);
const gitHubRepos = gitHubReposResult.repos;
const warnings = gitHubReposResult.warnings;

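The `compileGithubConfig` change above narrows the parameter from an `AbortController` to its `AbortSignal`: the compile step can now only observe cancellation, while the caller keeps sole ownership of `abort()`. A minimal sketch of that pattern with hypothetical names, not code from this PR:

```typescript
// Hypothetical sketch: the caller owns the AbortController and hands only the
// read-only signal down, mirroring the new compileGithubConfig signature.
async function compileExample(signal: AbortSignal): Promise<string[]> {
    // The callee can observe cancellation...
    signal.throwIfAborted();
    // ...but cannot trigger it, since it never sees the controller.
    return ["example/repo"];
}

const controller = new AbortController();
const repos = await compileExample(controller.signal);
// Only the caller can cancel: controller.abort() aborts any in-flight work.
```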