Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed incorrect shutdown of PostHog SDK in the worker. [#609](https://github.com/sourcebot-dev/sourcebot/pull/609)
- Fixed race condition in job schedulers. [#607](https://github.com/sourcebot-dev/sourcebot/pull/607)

### Added
- Added force resync buttons for connections and repositories. [#610](https://github.com/sourcebot-dev/sourcebot/pull/610)

## [4.9.1] - 2025-11-07

### Added
Expand Down
1 change: 1 addition & 0 deletions packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
"cross-fetch": "^4.0.0",
"dotenv": "^16.4.5",
"express": "^4.21.2",
"express-async-errors": "^3.1.1",
"git-url-parse": "^16.1.0",
"gitea-js": "^1.22.0",
"glob": "^11.0.0",
Expand Down
103 changes: 103 additions & 0 deletions packages/backend/src/api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { PrismaClient, RepoIndexingJobType } from '@sourcebot/db';
import { createLogger } from '@sourcebot/shared';
import express, { Request, Response } from 'express';
import 'express-async-errors';
import * as http from "http";
import z from 'zod';
import { ConnectionManager } from './connectionManager.js';
import { PromClient } from './promClient.js';
import { RepoIndexManager } from './repoIndexManager.js';

const logger = createLogger('api');
const PORT = 3060;

export class Api {
private server: http.Server;

constructor(
promClient: PromClient,
private prisma: PrismaClient,
private connectionManager: ConnectionManager,
private repoIndexManager: RepoIndexManager,
) {
const app = express();
app.use(express.json());
app.use(express.urlencoded({ extended: true }));

// Prometheus metrics endpoint
app.use('/metrics', async (_req: Request, res: Response) => {
res.set('Content-Type', promClient.registry.contentType);
const metrics = await promClient.registry.metrics();
res.end(metrics);
});

app.post('/api/sync-connection', this.syncConnection.bind(this));
app.post('/api/index-repo', this.indexRepo.bind(this));

this.server = app.listen(PORT, () => {
logger.info(`API server is running on port ${PORT}`);
});
}

private async syncConnection(req: Request, res: Response) {
const schema = z.object({
connectionId: z.number(),
}).strict();

const parsed = schema.safeParse(req.body);
if (!parsed.success) {
res.status(400).json({ error: parsed.error.message });
return;
}

const { connectionId } = parsed.data;
const connection = await this.prisma.connection.findUnique({
where: {
id: connectionId,
}
});

if (!connection) {
res.status(404).json({ error: 'Connection not found' });
return;
}

const [jobId] = await this.connectionManager.createJobs([connection]);

res.status(200).json({ jobId });
}

private async indexRepo(req: Request, res: Response) {
const schema = z.object({
repoId: z.number(),
}).strict();

const parsed = schema.safeParse(req.body);
if (!parsed.success) {
res.status(400).json({ error: parsed.error.message });
return;
}

const { repoId } = parsed.data;
const repo = await this.prisma.repo.findUnique({
where: { id: repoId },
});

if (!repo) {
res.status(404).json({ error: 'Repo not found' });
return;
}

const [jobId] = await this.repoIndexManager.createJobs([repo], RepoIndexingJobType.INDEX);
res.status(200).json({ jobId });
}

public async dispose() {
return new Promise<void>((resolve, reject) => {
this.server.close((err) => {
if (err) reject(err);
else resolve(undefined);
});
});
}
}
19 changes: 13 additions & 6 deletions packages/backend/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import "./instrument.js";

import { PrismaClient } from "@sourcebot/db";
import { createLogger } from "@sourcebot/shared";
import { env, getConfigSettings, hasEntitlement, getDBConnectionString } from '@sourcebot/shared';
import { createLogger, env, getConfigSettings, getDBConnectionString, hasEntitlement } from "@sourcebot/shared";
import 'express-async-errors';
import { existsSync } from 'fs';
import { mkdir } from 'fs/promises';
import { Redis } from 'ioredis';
import { Api } from "./api.js";
import { ConfigManager } from "./configManager.js";
import { ConnectionManager } from './connectionManager.js';
import { INDEX_CACHE_DIR, REPOS_CACHE_DIR } from './constants.js';
import { AccountPermissionSyncer } from "./ee/accountPermissionSyncer.js";
import { GithubAppManager } from "./ee/githubAppManager.js";
import { RepoPermissionSyncer } from './ee/repoPermissionSyncer.js';
import { AccountPermissionSyncer } from "./ee/accountPermissionSyncer.js";
import { shutdownPosthog } from "./posthog.js";
import { PromClient } from './promClient.js';
import { RepoIndexManager } from "./repoIndexManager.js";
import { shutdownPosthog } from "./posthog.js";


const logger = createLogger('backend-entrypoint');

Expand Down Expand Up @@ -74,6 +74,13 @@ else if (env.EXPERIMENT_EE_PERMISSION_SYNC_ENABLED === 'true' && hasEntitlement(
accountPermissionSyncer.startScheduler();
}

const api = new Api(
promClient,
prisma,
connectionManager,
repoIndexManager,
);

logger.info('Worker started.');

const cleanup = async (signal: string) => {
Expand All @@ -88,7 +95,6 @@ const cleanup = async (signal: string) => {
connectionManager.dispose(),
repoPermissionSyncer.dispose(),
accountPermissionSyncer.dispose(),
promClient.dispose(),
configManager.dispose(),
]),
new Promise((_, reject) =>
Expand All @@ -102,6 +108,7 @@ const cleanup = async (signal: string) => {

await prisma.$disconnect();
await redis.quit();
await api.dispose();
await shutdownPosthog();
}

Expand Down
33 changes: 1 addition & 32 deletions packages/backend/src/promClient.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
import express, { Request, Response } from 'express';
import { Server } from 'http';
import client, { Registry, Counter, Gauge } from 'prom-client';
import { createLogger } from "@sourcebot/shared";

const logger = createLogger('prometheus-client');

export class PromClient {
private registry: Registry;
private app: express.Application;
private server: Server;
public registry: Registry;

public activeRepoIndexJobs: Gauge<string>;
public pendingRepoIndexJobs: Gauge<string>;
Expand All @@ -22,8 +14,6 @@ export class PromClient {
public connectionSyncJobFailTotal: Counter<string>;
public connectionSyncJobSuccessTotal: Counter<string>;

public readonly PORT = 3060;

constructor() {
this.registry = new Registry();

Expand Down Expand Up @@ -100,26 +90,5 @@ export class PromClient {
client.collectDefaultMetrics({
register: this.registry,
});

this.app = express();
this.app.get('/metrics', async (req: Request, res: Response) => {
res.set('Content-Type', this.registry.contentType);

const metrics = await this.registry.metrics();
res.end(metrics);
});

this.server = this.app.listen(this.PORT, () => {
logger.info(`Prometheus metrics server is running on port ${this.PORT}`);
});
}

async dispose() {
return new Promise<void>((resolve, reject) => {
this.server.close((err) => {
if (err) reject(err);
else resolve();
});
});
}
}
4 changes: 3 additions & 1 deletion packages/backend/src/repoIndexManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ export class RepoIndexManager {
}
}

private async createJobs(repos: Repo[], type: RepoIndexingJobType) {
public async createJobs(repos: Repo[], type: RepoIndexingJobType) {
// @note: we don't perform this in a transaction because
// we want to avoid the situation where a job is created and run
// prior to the transaction being committed.
Expand Down Expand Up @@ -221,6 +221,8 @@ export class RepoIndexManager {
const jobTypeLabel = getJobTypePrometheusLabel(type);
this.promClient.pendingRepoIndexJobs.inc({ repo: job.repo.name, type: jobTypeLabel });
}

return jobs.map(job => job.id);
}

private async runJob(job: ReservedJob<JobPayload>) {
Expand Down
14 changes: 12 additions & 2 deletions packages/web/src/app/[domain]/repos/[id]/page.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { sew } from "@/actions"
import { getCurrentUserRole, sew } from "@/actions"
import { Badge } from "@/components/ui/badge"
import { Button } from "@/components/ui/button"
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card"
Expand All @@ -19,6 +19,7 @@ import { BackButton } from "../../components/backButton"
import { DisplayDate } from "../../components/DisplayDate"
import { RepoBranchesTable } from "../components/repoBranchesTable"
import { RepoJobsTable } from "../components/repoJobsTable"
import { OrgRole } from "@sourcebot/db"

export default async function RepoDetailPage({ params }: { params: Promise<{ id: string }> }) {
const { id } = await params
Expand Down Expand Up @@ -51,6 +52,11 @@ export default async function RepoDetailPage({ params }: { params: Promise<{ id:

const repoMetadata = repoMetadataSchema.parse(repo.metadata);

const userRole = await getCurrentUserRole(SINGLE_TENANT_ORG_DOMAIN);
if (isServiceError(userRole)) {
throw new ServiceErrorException(userRole);
}

return (
<>
<div className="mb-6">
Expand Down Expand Up @@ -172,7 +178,11 @@ export default async function RepoDetailPage({ params }: { params: Promise<{ id:
</CardHeader>
<CardContent>
<Suspense fallback={<Skeleton className="h-96 w-full" />}>
<RepoJobsTable data={repo.jobs} />
<RepoJobsTable
data={repo.jobs}
repoId={repo.id}
isIndexButtonVisible={userRole === OrgRole.OWNER}
/>
</Suspense>
</CardContent>
</Card>
Expand Down
Loading
Loading