Skip to content

Commit 6f64d5b

Browse files
fix(worker): Run setInterval as blocking (#607)
1 parent 1be6e88 commit 6f64d5b

File tree

6 files changed

+34
-8
lines changed

6 files changed

+34
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Fixed
1111
- Fixed incorrect shutdown of PostHog SDK in the worker. [#609](https://github.com/sourcebot-dev/sourcebot/pull/609)
12+
- Fixed race condition in job schedulers. [#607](https://github.com/sourcebot-dev/sourcebot/pull/607)
1213

1314
## [4.9.1] - 2025-11-07
1415

packages/backend/src/connectionManager.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { Job, Queue, ReservedJob, Worker } from "groupmq";
77
import { Redis } from 'ioredis';
88
import { compileAzureDevOpsConfig, compileBitbucketConfig, compileGenericGitHostConfig, compileGerritConfig, compileGiteaConfig, compileGithubConfig, compileGitlabConfig } from "./repoCompileUtils.js";
99
import { Settings } from "./types.js";
10-
import { groupmqLifecycleExceptionWrapper } from "./utils.js";
10+
import { groupmqLifecycleExceptionWrapper, setIntervalAsync } from "./utils.js";
1111
import { syncSearchContexts } from "./ee/syncSearchContexts.js";
1212
import { captureEvent } from "./posthog.js";
1313
import { PromClient } from "./promClient.js";
@@ -66,7 +66,7 @@ export class ConnectionManager {
6666

6767
public startScheduler() {
6868
logger.debug('Starting scheduler');
69-
this.interval = setInterval(async () => {
69+
this.interval = setIntervalAsync(async () => {
7070
const thresholdDate = new Date(Date.now() - this.settings.resyncConnectionIntervalMs);
7171
const timeoutDate = new Date(Date.now() - JOB_TIMEOUT_MS);
7272

packages/backend/src/ee/accountPermissionSyncer.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES } from "../constants.js";
77
import { createOctokitFromToken, getReposForAuthenticatedUser } from "../github.js";
88
import { createGitLabFromOAuthToken, getProjectsForAuthenticatedUser } from "../gitlab.js";
99
import { Settings } from "../types.js";
10+
import { setIntervalAsync } from "../utils.js";
1011

1112
const LOG_TAG = 'user-permission-syncer';
1213
const logger = createLogger(LOG_TAG);
@@ -46,7 +47,7 @@ export class AccountPermissionSyncer {
4647

4748
logger.debug('Starting scheduler');
4849

49-
this.interval = setInterval(async () => {
50+
this.interval = setIntervalAsync(async () => {
5051
const thresholdDate = new Date(Date.now() - this.settings.experiment_userDrivenPermissionSyncIntervalMs);
5152

5253
const accounts = await this.db.account.findMany({

packages/backend/src/ee/repoPermissionSyncer.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES } from "../constants.js";
88
import { createOctokitFromToken, getRepoCollaborators, GITHUB_CLOUD_HOSTNAME } from "../github.js";
99
import { createGitLabFromPersonalAccessToken, getProjectMembers } from "../gitlab.js";
1010
import { Settings } from "../types.js";
11-
import { getAuthCredentialsForRepo } from "../utils.js";
11+
import { getAuthCredentialsForRepo, setIntervalAsync } from "../utils.js";
1212

1313
type RepoPermissionSyncJob = {
1414
jobId: string;
@@ -48,7 +48,7 @@ export class RepoPermissionSyncer {
4848

4949
logger.debug('Starting scheduler');
5050

51-
this.interval = setInterval(async () => {
51+
this.interval = setIntervalAsync(async () => {
5252
// @todo: make this configurable
5353
const thresholdDate = new Date(Date.now() - this.settings.experiment_repoDrivenPermissionSyncIntervalMs);
5454

packages/backend/src/repoIndexManager.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { cloneRepository, fetchRepository, getBranches, getCommitHashForRefName,
1212
import { captureEvent } from './posthog.js';
1313
import { PromClient } from './promClient.js';
1414
import { RepoWithConnections, Settings } from "./types.js";
15-
import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, groupmqLifecycleExceptionWrapper, measure } from './utils.js';
15+
import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, groupmqLifecycleExceptionWrapper, measure, setIntervalAsync } from './utils.js';
1616
import { indexGitRepository } from './zoekt.js';
1717

1818
const LOG_TAG = 'repo-index-manager';
@@ -72,9 +72,9 @@ export class RepoIndexManager {
7272
this.worker.on('error', this.onWorkerError.bind(this));
7373
}
7474

75-
public async startScheduler() {
75+
public startScheduler() {
7676
logger.debug('Starting scheduler');
77-
this.interval = setInterval(async () => {
77+
this.interval = setIntervalAsync(async () => {
7878
await this.scheduleIndexJobs();
7979
await this.scheduleCleanupJobs();
8080
}, this.settings.reindexRepoPollingIntervalMs);

packages/backend/src/utils.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,3 +268,27 @@ export const groupmqLifecycleExceptionWrapper = async (name: string, logger: Log
268268
}
269269
}
270270

271+
272+
// setInterval wrapper that ensures async callbacks are not executed concurrently.
273+
// @see: https://mottaquikarim.github.io/dev/posts/setinterval-that-blocks-on-await/
274+
export const setIntervalAsync = (target: () => Promise<void>, pollingIntervalMs: number): NodeJS.Timeout => {
275+
const setIntervalWithPromise = <T extends (...args: any[]) => Promise<any>>(
276+
target: T
277+
): (...args: Parameters<T>) => Promise<void> => {
278+
return async function (...args: Parameters<T>): Promise<void> {
279+
if ((target as any).isRunning) return;
280+
281+
(target as any).isRunning = true;
282+
try {
283+
await target(...args);
284+
} finally {
285+
(target as any).isRunning = false;
286+
}
287+
};
288+
}
289+
290+
return setInterval(
291+
setIntervalWithPromise(target),
292+
pollingIntervalMs
293+
);
294+
}

0 commit comments

Comments
 (0)