diff --git a/backend/src/api/integration/helpers/githubOrgRepos.ts b/backend/src/api/integration/helpers/githubOrgRepos.ts index 24a1942822..2f9a1be2e0 100644 --- a/backend/src/api/integration/helpers/githubOrgRepos.ts +++ b/backend/src/api/integration/helpers/githubOrgRepos.ts @@ -1,5 +1,6 @@ +import { GithubIntegrationService } from '@crowd/common_services' + import Permissions from '@/security/permissions' -import GithubIntegrationService from '@/services/githubIntegrationService' import PermissionChecker from '@/services/user/permissionChecker' export default async (req, res) => { diff --git a/backend/src/api/integration/helpers/githubSearchOrgs.ts b/backend/src/api/integration/helpers/githubSearchOrgs.ts index bd5b2a1678..a6527bcd2f 100644 --- a/backend/src/api/integration/helpers/githubSearchOrgs.ts +++ b/backend/src/api/integration/helpers/githubSearchOrgs.ts @@ -1,5 +1,6 @@ +import { GithubIntegrationService } from '@crowd/common_services' + import Permissions from '@/security/permissions' -import GithubIntegrationService from '@/services/githubIntegrationService' import PermissionChecker from '@/services/user/permissionChecker' export default async (req, res) => { diff --git a/backend/src/api/integration/helpers/githubSearchRepos.ts b/backend/src/api/integration/helpers/githubSearchRepos.ts index 943a0d0388..f4725b906a 100644 --- a/backend/src/api/integration/helpers/githubSearchRepos.ts +++ b/backend/src/api/integration/helpers/githubSearchRepos.ts @@ -1,11 +1,12 @@ +import { GithubIntegrationService } from '@crowd/common_services' + import Permissions from '@/security/permissions' -import GithubIntegrationService from '@/services/githubIntegrationService' import PermissionChecker from '@/services/user/permissionChecker' export default async (req, res) => { new PermissionChecker(req).validateHas(Permissions.values.integrationEdit) - const payload = await new GithubIntegrationService(req).findGithubRepos( + const payload = await new GithubIntegrationService(req.log).findGithubRepos( req.query.query, req.query.limit, req.query.offset, diff --git a/backend/src/bin/jobs/index.ts b/backend/src/bin/jobs/index.ts index 20912e0edb..ed97533441 100644 --- a/backend/src/bin/jobs/index.ts +++ b/backend/src/bin/jobs/index.ts @@ -4,7 +4,7 @@ import autoImportGroups from './autoImportGroupsioGroups' import checkStuckIntegrationRuns from './checkStuckIntegrationRuns' import cleanUp from './cleanUp' import integrationTicks from './integrationTicks' -// import refreshGithubRepoSettingsJob from './refreshGithubRepoSettings' +import refreshGithubRepoSettingsJob from './refreshGithubRepoSettings' import refreshGitlabToken from './refreshGitlabToken' import refreshGroupsioToken from './refreshGroupsioToken' import refreshMaterializedViews from './refreshMaterializedViews' @@ -16,7 +16,7 @@ const jobs: CrowdJob[] = [ checkStuckIntegrationRuns, refreshGroupsioToken, refreshGitlabToken, - // refreshGithubRepoSettingsJob, + refreshGithubRepoSettingsJob, autoImportGroups, ] diff --git a/backend/src/bin/jobs/refreshGithubRepoSettings.ts b/backend/src/bin/jobs/refreshGithubRepoSettings.ts index e235347041..20a385c56e 100644 --- a/backend/src/bin/jobs/refreshGithubRepoSettings.ts +++ b/backend/src/bin/jobs/refreshGithubRepoSettings.ts @@ -1,7 +1,7 @@ /* eslint-disable no-continue */ import cronGenerator from 'cron-time-generator' -import { timeout } from '@crowd/common' +import { IS_DEV_ENV, timeout } from '@crowd/common' import { getServiceChildLogger } from '@crowd/logging' import SequelizeRepository from '../../database/repositories/sequelizeRepository' @@ -59,7 +59,7 @@ export const refreshGithubRepoSettings = async () => { const job: CrowdJob = { name: 'Refresh Github repo settings', // every day - cronTime: cronGenerator.every(1).days(), + cronTime: IS_DEV_ENV ? cronGenerator.every(5).minutes() : cronGenerator.every(1).days(), onTrigger: async () => { await refreshGithubRepoSettings() }, diff --git a/backend/src/database/migrations/U1762508771__autobackup-integrations.sql b/backend/src/database/migrations/U1762508771__autobackup-integrations.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/src/database/migrations/V1762508771__autobackup-integrations.sql b/backend/src/database/migrations/V1762508771__autobackup-integrations.sql new file mode 100644 index 0000000000..c9c407df01 --- /dev/null +++ b/backend/src/database/migrations/V1762508771__autobackup-integrations.sql @@ -0,0 +1,29 @@ +create table "integrationsHistory" as +select * +from "integrations" +where 1 = 2; + +alter table "integrationsHistory" + add column "historyCreatedAt" timestamptz not null default now(); + +create index if not exists ix_integration_history_integration_id on "integrationsHistory" ("id"); + +create function log_integrations_changes() + returns trigger as +$$ +begin + insert into "integrationsHistory" select old.*; + + if (tg_op = 'DELETE') then + return old; + else + return new; + end if; +end; +$$ language plpgsql; + +create trigger integrations_audit_trigger + after update or delete + on integrations + for each row +execute function log_integrations_changes(); \ No newline at end of file diff --git a/backend/src/segment/track.ts b/backend/src/segment/track.ts index 40221762f1..53f7087d56 100644 --- a/backend/src/segment/track.ts +++ b/backend/src/segment/track.ts @@ -1,7 +1,7 @@ import { getServiceChildLogger } from '@crowd/logging' import { Edition } from '@crowd/types' -import { API_CONFIG, IS_TEST_ENV, SEGMENT_CONFIG } from '../conf' +import { API_CONFIG, IS_DEV_ENV, IS_TEST_ENV, SEGMENT_CONFIG } from '../conf' import SequelizeRepository from '../database/repositories/sequelizeRepository' import { CROWD_ANALYTICS_PLATORM_NAME } from './addProductDataToCrowdTenant' @@ -21,6 +21,7 @@ export default async function identify( }).email if ( !IS_TEST_ENV && + !IS_DEV_ENV && SEGMENT_CONFIG.writeKey && // This is only for events in the hosted version. Self-hosted has less telemetry. (API_CONFIG.edition === Edition.CROWD_HOSTED || API_CONFIG.edition === Edition.LFX) && @@ -41,6 +42,10 @@ export default async function identify( const { userIdOut, tenantIdOut } = getTenatUser(userId, options) + if (!userIdOut) { + return + } + const payload = { userId: userIdOut, event, diff --git a/backend/src/services/collectionService.ts b/backend/src/services/collectionService.ts index 9492fcf798..0dfb3c7bc8 100644 --- a/backend/src/services/collectionService.ts +++ b/backend/src/services/collectionService.ts @@ -1,6 +1,7 @@ import { uniq } from 'lodash' import { getCleanString } from '@crowd/common' +import { GithubIntegrationService } from '@crowd/common_services' import { OrganizationField, QueryExecutor, findOrgById, queryOrgs } from '@crowd/data-access-layer' import { listCategoriesByIds } from '@crowd/data-access-layer/src/categories' import { @@ -49,7 +50,6 @@ import SequelizeRepository from '@/database/repositories/sequelizeRepository' import { IGithubInsights } from '@/types/githubTypes' import { IServiceOptions } from './IServiceOptions' -import GithubIntegrationService from './githubIntegrationService' export class CollectionService extends LoggerBase { options: IServiceOptions diff --git a/backend/src/services/helpers/githubToken.ts b/backend/src/services/helpers/githubToken.ts deleted file mode 100644 index c4bd951d73..0000000000 --- a/backend/src/services/helpers/githubToken.ts +++ /dev/null @@ -1,44 +0,0 @@ -import axios from 'axios' -import * as jwt from 'jsonwebtoken' - -import { GITHUB_TOKEN_CONFIG } from '@/conf' - -let token: string | undefined -let expiration: Date | undefined - -export const getGithubInstallationToken = async (): Promise => { - if (token && expiration && expiration.getTime() > Date.now()) { - return token - } - - // refresh token - const config = GITHUB_TOKEN_CONFIG - - const now = Math.floor(Date.now() / 1000) - const payload = { - iat: now - 60, - exp: now + 10 * 60, - iss: config.clientId, - } - - const privateKey = Buffer.from(config.privateKey, 'base64').toString('ascii') - - const jwtToken = jwt.sign(payload, privateKey, { algorithm: 'RS256' }) - - const response = await axios.post( - `https://api.github.com/app/installations/${config.installationId}/access_tokens`, - {}, - { - headers: { - Authorization: `Bearer ${jwtToken}`, - Accept: 'application/vnd.github+json', - 'X-GitHub-Api-Version': '2022-11-28', - }, - }, - ) - - token = response.data.token - expiration = new Date(response.data.expires_at) - - return token -} diff --git a/backend/src/services/integrationService.ts b/backend/src/services/integrationService.ts index 17979d5bea..885d42ec7e 100644 --- a/backend/src/services/integrationService.ts +++ b/backend/src/services/integrationService.ts @@ -4,10 +4,11 @@ import { request } from '@octokit/request' import axios, { AxiosRequestConfig, AxiosResponse } from 'axios' import lodash from 'lodash' import moment from 'moment' -import { QueryTypes, Transaction } from 'sequelize' -import { v4 as uuidv4 } from 'uuid' +import { Transaction } from 'sequelize' import { EDITION, Error400, Error404, Error542 } from '@crowd/common' +import { getGithubInstallationToken } from '@crowd/common_services' +import { syncRepositoriesToGitV2 } from '@crowd/data-access-layer' import { ICreateInsightsProject, deleteMissingSegmentRepositories, @@ -68,7 +69,6 @@ import { encryptData } from '../utils/crypto' import { IServiceOptions } from './IServiceOptions' import { CollectionService } from './collectionService' -import { getGithubInstallationToken } from './helpers/githubToken' const discordToken = DISCORD_CONFIG.token || DISCORD_CONFIG.token2 @@ -1340,11 +1340,15 @@ export default class IntegrationService { ) // upsert repositories to git.repositories in order to be processed by git-integration V2 - await this.syncRepositoriesToGitV2( - remotes, - options || this.options, + const qx = SequelizeRepository.getQueryExecutor({ + ...(options || this.options), transaction, + }) + await syncRepositoriesToGitV2( + qx, + remotes, integration.id, + (options || this.options).currentSegments[0].id, ) // Only commit if we created the transaction ourselves @@ -1362,95 +1366,6 @@ export default class IntegrationService { return integration } - /** - * Syncs repositories to git.repositories table (git-integration V2) - * @param remotes Array of repository objects with url and optional forkedFrom - * @param options Repository options - * @param transaction Database transaction - * @param integrationId The integration ID from the git integration - * @param inheritFromExistingRepos If true, queries githubRepos and gitlabRepos for IDs; if false, generates new UUIDs - * - * TODO: @Mouad After migration is complete, simplify this function by: - * 1. Using an object parameter instead of multiple parameters for better maintainability - * 2. Removing the inheritFromExistingRepos parameter since git.repositories will be the source of truth - * 3. Simplifying the logic to only handle git.repositories operations - */ - private async syncRepositoriesToGitV2( - remotes: Array<{ url: string; forkedFrom?: string | null }>, - options: IRepositoryOptions, - transaction: Transaction, - integrationId: string, - ) { - const seq = SequelizeRepository.getSequelize(options) - - let repositoriesToSync: Array<{ - id: string - url: string - integrationId: string - segmentId: string - forkedFrom?: string | null - }> = [] - // check GitHub repos first, fallback to GitLab repos if none found - const existingRepos: Array<{ - id: string - url: string - }> = await seq.query( - ` - WITH github_repos AS ( - SELECT id, url FROM "githubRepos" - WHERE url IN (:urls) AND "deletedAt" IS NULL - ), - gitlab_repos AS ( - SELECT id, url FROM "gitlabRepos" - WHERE url IN (:urls) AND "deletedAt" IS NULL - ) - SELECT id, url FROM github_repos - UNION ALL - SELECT id, url FROM gitlab_repos - WHERE NOT EXISTS (SELECT 1 FROM github_repos) - `, - { - replacements: { - urls: remotes.map((r) => r.url), - }, - type: QueryTypes.SELECT, - transaction, - }, - ) - - // Create a map of url to forkedFrom for quick lookup - const forkedFromMap = new Map(remotes.map((r) => [r.url, r.forkedFrom])) - - repositoriesToSync = existingRepos.map((repo) => ({ - id: repo.id, - url: repo.url, - integrationId, - segmentId: options.currentSegments[0].id, - forkedFrom: forkedFromMap.get(repo.url) || null, - })) - - if (repositoriesToSync.length === 0) { - this.options.log.warn( - 'No existing repos found in githubRepos or gitlabRepos - inserting new to git.repositories with new uuid', - ) - repositoriesToSync = remotes.map((remote) => ({ - id: uuidv4(), // Generate new UUID - url: remote.url, - integrationId, - segmentId: options.currentSegments[0].id, - forkedFrom: remote.forkedFrom || null, - })) - } - - // Sync to git.repositories v2 - await GitReposRepository.upsert(repositoriesToSync, { - ...options, - transaction, - }) - - this.options.log.info(`Synced ${repositoriesToSync.length} repos to git v2`) - } - async atlassianAdminConnect(adminApi: string, organizationId: string) { const nangoPayload = { params: { diff --git a/backend/src/services/memberService.ts b/backend/src/services/memberService.ts index 987d5836b6..1321f47e13 100644 --- a/backend/src/services/memberService.ts +++ b/backend/src/services/memberService.ts @@ -6,7 +6,7 @@ import validator from 'validator' import { captureApiChange, memberUnmergeAction } from '@crowd/audit-logs' import { Error400, calculateReach, getProperDisplayName, isDomainExcluded } from '@crowd/common' -import { CommonMemberService } from '@crowd/common_services' +import { CommonMemberService, getGithubInstallationToken } from '@crowd/common_services' import { findMemberAffiliations } from '@crowd/data-access-layer/src/member_segment_affiliations' import { MemberField, @@ -58,7 +58,6 @@ import { import telemetryTrack from '../segment/telemetryTrack' import { IServiceOptions } from './IServiceOptions' -import { getGithubInstallationToken } from './helpers/githubToken' import MemberAttributeSettingsService from './memberAttributeSettingsService' import MemberOrganizationService from './memberOrganizationService' import OrganizationService from './organizationService' diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5502a87566..205f038d54 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1117,6 +1117,9 @@ importers: '@crowd/common': specifier: workspace:* version: link:../../libs/common + '@crowd/common_services': + specifier: workspace:* + version: link:../../libs/common_services '@crowd/data-access-layer': specifier: workspace:* version: link:../../libs/data-access-layer @@ -1817,6 +1820,9 @@ importers: '@crowd/database': specifier: workspace:* version: link:../database + '@crowd/integrations': + specifier: workspace:* + version: link:../integrations '@crowd/logging': specifier: workspace:* version: link:../logging @@ -1829,6 +1835,18 @@ importers: '@crowd/types': specifier: workspace:* version: link:../types + '@octokit/request': + specifier: ^5.6.3 + version: 5.6.3(encoding@0.1.13) + '@octokit/rest': + specifier: ^22.0.0 + version: 22.0.0 + axios: + specifier: ^1.13.1 + version: 1.13.1 + jsonwebtoken: + specifier: 8.5.1 + version: 8.5.1 lodash.isequal: specifier: ^4.5.0 version: 4.5.0 @@ -5235,6 +5253,9 @@ packages: axios@1.11.0: resolution: {integrity: sha512-1Lx3WLFQWm3ooKDYZD1eXmoGO9fxYQjrycfHFC8P0sCfQVXyROp0p9PFWBehewBOdCwHc+f/b8I0fMto5eSfwA==} + axios@1.13.1: + resolution: {integrity: sha512-hU4EGxxt+j7TQijx1oYdAjw4xuIp1wRQSsbMFwSthCWeBQur1eF+qJ5iQ5sN3Tw8YRzQNKb8jszgBdMDVqwJcw==} + axios@1.6.8: resolution: {integrity: sha512-v/ZHtJDU39mDpyBoFVkETcd/uNdxrWRrg3bKpOKzXFA6Bvqopts6ALSMU3y6ijYxbw2B+wPrIv46egTzJXCLGQ==} @@ -6159,10 +6180,6 @@ packages: resolution: {integrity: sha512-FGgH2h8zKNim9ljj7dankFPcICIK9Cp5bm+c2gQSYePhpaG5+esrLODihIorn+Pe6FGJzWhXQotPv73jTaldXA==} engines: {node: '>= 0.4'} - es-set-tostringtag@2.0.3: - resolution: {integrity: sha512-3T8uNMC3OQTHkFUsFq8r/BwAXLHvU/9O9mE0fBc/MY5iq/8H7ncvO947LmYA6ldWw9Uh8Yhf25zu6n7nML5QWQ==} - engines: {node: '>= 0.4'} - es-set-tostringtag@2.1.0: resolution: {integrity: sha512-j6vWzfrGVfyXxge+O0x5sh6cvxAog0a/4Rdd2K36zCMV5eJ+/+tOAngRO8cODMNWbVRdVlmGZQL2YS3yR8bIUA==} engines: {node: '>= 0.4'} @@ -11205,7 +11222,7 @@ snapshots: '@azure/logger': 1.1.4 '@types/node-fetch': 2.6.12 '@types/tunnel': 0.0.3 - form-data: 4.0.0 + form-data: 4.0.4 node-fetch: 2.7.0(encoding@0.1.13) process: 0.11.10 tslib: 2.6.2 @@ -12981,7 +12998,7 @@ snapshots: '@slack/types': 2.11.0 '@types/is-stream': 1.1.0 '@types/node': 20.12.7 - axios: 1.8.4 + axios: 1.11.0 eventemitter3: 3.1.2 form-data: 2.5.1 is-electron: 2.2.2 @@ -13939,7 +13956,7 @@ snapshots: '@types/node-fetch@2.6.12': dependencies: '@types/node': 20.12.7 - form-data: 4.0.0 + form-data: 4.0.4 '@types/node@18.0.6': {} @@ -14484,7 +14501,7 @@ snapshots: define-properties: 1.2.1 es-abstract: 1.23.3 es-errors: 1.3.0 - get-intrinsic: 1.2.4 + get-intrinsic: 1.3.0 is-array-buffer: 3.0.4 is-shared-array-buffer: 1.0.3 @@ -14560,6 +14577,14 @@ snapshots: transitivePeerDependencies: - debug + axios@1.13.1: + dependencies: + follow-redirects: 1.15.6 + form-data: 4.0.4 + proxy-from-env: 1.1.0 + transitivePeerDependencies: + - debug + axios@1.6.8: dependencies: follow-redirects: 1.15.6 @@ -15596,7 +15621,7 @@ snapshots: es-define-property: 1.0.0 es-errors: 1.3.0 es-object-atoms: 1.0.0 - es-set-tostringtag: 2.0.3 + es-set-tostringtag: 2.1.0 es-to-primitive: 1.2.1 function.prototype.name: 1.1.6 get-intrinsic: 1.2.4 @@ -15635,7 +15660,7 @@ snapshots: es-define-property@1.0.0: dependencies: - get-intrinsic: 1.2.4 + get-intrinsic: 1.3.0 es-define-property@1.0.1: {} @@ -15651,12 +15676,6 @@ snapshots: dependencies: es-errors: 1.3.0 - es-set-tostringtag@2.0.3: - dependencies: - get-intrinsic: 1.2.4 - has-tostringtag: 1.0.2 - hasown: 2.0.2 - es-set-tostringtag@2.1.0: dependencies: es-errors: 1.3.0 @@ -16415,7 +16434,7 @@ snapshots: dependencies: call-bind: 1.0.7 es-errors: 1.3.0 - get-intrinsic: 1.2.4 + get-intrinsic: 1.3.0 get-tsconfig@4.7.3: dependencies: @@ -16526,7 +16545,7 @@ snapshots: gopd@1.0.1: dependencies: - get-intrinsic: 1.2.4 + get-intrinsic: 1.3.0 gopd@1.2.0: {} @@ -16594,7 +16613,7 @@ snapshots: has-tostringtag@1.0.2: dependencies: - has-symbols: 1.0.3 + has-symbols: 1.1.0 has-unicode@2.0.1: {} @@ -16791,7 +16810,7 @@ snapshots: is-array-buffer@3.0.4: dependencies: call-bind: 1.0.7 - get-intrinsic: 1.2.4 + get-intrinsic: 1.3.0 is-arrayish@0.2.1: {} @@ -16911,7 +16930,7 @@ snapshots: is-symbol@1.0.4: dependencies: - has-symbols: 1.0.3 + has-symbols: 1.1.0 is-text-path@2.0.0: dependencies: @@ -18070,7 +18089,7 @@ snapshots: peopledatalabs@6.1.5: dependencies: - axios: 1.8.4 + axios: 1.11.0 copy-anything: 3.0.5 transitivePeerDependencies: - debug @@ -18527,7 +18546,7 @@ snapshots: safe-array-concat@1.1.2: dependencies: call-bind: 1.0.7 - get-intrinsic: 1.2.4 + get-intrinsic: 1.3.0 has-symbols: 1.0.3 isarray: 2.0.5 @@ -18725,7 +18744,7 @@ snapshots: define-data-property: 1.1.4 es-errors: 1.3.0 function-bind: 1.1.2 - get-intrinsic: 1.2.4 + get-intrinsic: 1.3.0 gopd: 1.0.1 has-property-descriptors: 1.0.2 diff --git a/services/apps/cron_service/src/jobs/nangoGithubSync.job.ts b/services/apps/cron_service/src/jobs/nangoGithubSync.job.ts index b377103f59..281fcaec74 100644 --- a/services/apps/cron_service/src/jobs/nangoGithubSync.job.ts +++ b/services/apps/cron_service/src/jobs/nangoGithubSync.job.ts @@ -1,5 +1,6 @@ import CronTime from 'cron-time-generator' +import { IS_DEV_ENV } from '@crowd/common' import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database' import { fetchNangoIntegrationData } from '@crowd/data-access-layer/src/integrations' import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' @@ -12,7 +13,7 @@ import { IJobDefinition } from '../types' const job: IJobDefinition = { name: 'nango-github-sync', cronTime: CronTime.every( - Number(process.env.CROWD_GH_NANGO_SYNC_INTERVAL_MINUTES || 60), + Number(process.env.CROWD_GH_NANGO_SYNC_INTERVAL_MINUTES || IS_DEV_ENV ? 5 : 60), ).minutes(), timeout: 10 * 60, process: async (ctx) => { diff --git a/services/apps/nango_worker/package.json b/services/apps/nango_worker/package.json index 227381f2a3..41344353ff 100644 --- a/services/apps/nango_worker/package.json +++ b/services/apps/nango_worker/package.json @@ -16,6 +16,7 @@ "@crowd/archetype-worker": "workspace:*", "@crowd/common": "workspace:*", "@crowd/data-access-layer": "workspace:*", + "@crowd/common_services": "workspace:*", "@crowd/logging": "workspace:*", "@crowd/nango": "workspace:*", "@crowd/redis": "workspace:*", diff --git a/services/apps/nango_worker/src/activities.ts b/services/apps/nango_worker/src/activities.ts index 7de4ff80e0..97ce68a799 100644 --- a/services/apps/nango_worker/src/activities.ts +++ b/services/apps/nango_worker/src/activities.ts @@ -2,12 +2,14 @@ import { analyzeGithubIntegration, createGithubConnection, deleteConnection, + mapGithubRepo, numberOfGithubConnectionsToCreate, processNangoWebhook, removeGithubConnection, setGithubConnection, startNangoSync, unmapGithubRepo, + updateGitIntegrationWithRepo, } from './activities/nangoActivities' export { @@ -18,6 +20,8 @@ export { removeGithubConnection, setGithubConnection, startNangoSync, + mapGithubRepo, unmapGithubRepo, numberOfGithubConnectionsToCreate, + updateGitIntegrationWithRepo, } diff --git a/services/apps/nango_worker/src/activities/nangoActivities.ts b/services/apps/nango_worker/src/activities/nangoActivities.ts index 0018c6efc1..7aa548c0f5 100644 --- a/services/apps/nango_worker/src/activities/nangoActivities.ts +++ b/services/apps/nango_worker/src/activities/nangoActivities.ts @@ -1,10 +1,14 @@ import { IS_DEV_ENV, IS_STAGING_ENV, singleOrDefault } from '@crowd/common' +import { GithubIntegrationService } from '@crowd/common_services' import { + addGitHubRepoMapping, addGithubNangoConnection, + addRepoToGitIntegration, fetchIntegrationById, findIntegrationDataForNangoWebhookProcessing, removeGitHubRepoMapping, removeGithubNangoConnection, + setGithubIntegrationSettingsOrgs, setNangoIntegrationCursor, } from '@crowd/data-access-layer/src/integrations' import IntegrationStreamRepository from '@crowd/data-access-layer/src/old/apps/integration_stream_worker/integrationStream.repo' @@ -53,7 +57,7 @@ export async function numberOfGithubConnectionsToCreate(): Promise { if (IS_DEV_ENV || IS_STAGING_ENV) { svc.log.info('[GITHUB] Number of github connections to create: 5') - return 5 + return 10 } const lastConnectDate = await getLastConnectTs() @@ -184,6 +188,10 @@ export async function analyzeGithubIntegration( repo: IGithubRepoData connectionId: string }[] = [] + const duplicatesToDelete: { + repo: IGithubRepoData + connectionId: string + }[] = [] const reposToSync: IGithubRepoData[] = [] const integration = await fetchIntegrationById(dbStoreQx(svc.postgres.writer), integrationId) @@ -192,6 +200,30 @@ export async function analyzeGithubIntegration( if (integration.platform === PlatformType.GITHUB_NANGO) { const settings = integration.settings + // check if we need to sync org repos + let added = 0 + for (const org of settings.orgs) { + if (org.fullSync) { + const results = await GithubIntegrationService.getOrgRepos(org.name) + for (const result of results) { + // we didn't find the repo so we add it + if (!org.repos.some((r) => r.url === result.url)) { + org.repos.push(result) + added++ + } + } + } + } + + if (added > 0) { + // we need to update the integration settings in the database + await setGithubIntegrationSettingsOrgs( + dbStoreQx(svc.postgres.writer), + integrationId, + settings.orgs, + ) + } + const repos = new Set() if (settings.orgs) { for (const org of settings.orgs) { @@ -212,19 +244,44 @@ export async function analyzeGithubIntegration( if (settings.nangoMapping) { const nangoMapping = settings.nangoMapping as Record - for (const connectionId of Object.keys(nangoMapping)) { + const connectionIds = Object.keys(nangoMapping) + + // check for duplicates as well by tracking which repos have connectionIds + const existingConnectedRepos = [] + for (const connectionId of connectionIds) { const mappedRepo = nangoMapping[connectionId] - const found = singleOrDefault( - finalRepos, - (r) => r.owner === mappedRepo.owner && r.repoName === mappedRepo.repoName, - ) - // if repo is in nangoMapping but not in settings delete the connection - if (!found) { - reposToDelete.push({ + if ( + existingConnectedRepos.some( + (r) => r.owner === mappedRepo.owner && r.repoName === mappedRepo.repoName, + ) + ) { + // found duplicate connectionId for the same repo + duplicatesToDelete.push({ repo: mappedRepo, connectionId, }) + + // just so that later singleOrDefault doesn't find it + delete nangoMapping[connectionId] + } else { + const found = singleOrDefault( + finalRepos, + (r) => r.owner === mappedRepo.owner && r.repoName === mappedRepo.repoName, + ) + + // if repo is in nangoMapping but not in settings delete the connection + if (!found) { + reposToDelete.push({ + repo: mappedRepo, + connectionId, + }) + + // just so that later singleOrDefault doesn't find it + delete nangoMapping[connectionId] + } else { + existingConnectedRepos.push(mappedRepo) + } } } } @@ -264,6 +321,7 @@ export async function analyzeGithubIntegration( return { providerConfigKey: NangoIntegration.GITHUB, reposToDelete, + duplicatesToDelete, reposToSync, } } @@ -356,6 +414,15 @@ export async function deleteConnection( await deleteNangoConnection(providerConfigKey as NangoIntegration, connectionId) } +export async function mapGithubRepo(integrationId: string, repo: IGithubRepoData): Promise { + await addGitHubRepoMapping( + dbStoreQx(svc.postgres.writer), + integrationId, + repo.owner, + repo.repoName, + ) +} + export async function unmapGithubRepo(integrationId: string, repo: IGithubRepoData): Promise { // remove repo from githubRepos mapping await removeGitHubRepoMapping( @@ -367,6 +434,15 @@ export async function unmapGithubRepo(integrationId: string, repo: IGithubRepoDa ) } +export async function updateGitIntegrationWithRepo( + integrationId: string, + repo: IGithubRepoData, +): Promise { + const repoUrl = `https://github.com/${repo.owner}/${repo.repoName}` + const forkedFrom = await GithubIntegrationService.getForkedFrom(repo.owner, repo.repoName) + await addRepoToGitIntegration(dbStoreQx(svc.postgres.writer), integrationId, repoUrl, forkedFrom) +} + function parseGithubUrl(url: string): IGithubRepoData { // Create URL object const parsedUrl = new URL(url) diff --git a/services/apps/nango_worker/src/types.ts b/services/apps/nango_worker/src/types.ts index 06d45ddee1..d7cd7c048e 100644 --- a/services/apps/nango_worker/src/types.ts +++ b/services/apps/nango_worker/src/types.ts @@ -17,6 +17,10 @@ export interface IGithubIntegrationSyncInstructions { repo: IGithubRepoData connectionId: string }[] + duplicatesToDelete: { + repo: IGithubRepoData + connectionId: string + }[] reposToSync: IGithubRepoData[] } diff --git a/services/apps/nango_worker/src/workflows/syncGithubIntegration.ts b/services/apps/nango_worker/src/workflows/syncGithubIntegration.ts index 33c5f16364..fdf3aebd73 100644 --- a/services/apps/nango_worker/src/workflows/syncGithubIntegration.ts +++ b/services/apps/nango_worker/src/workflows/syncGithubIntegration.ts @@ -26,6 +26,17 @@ export async function syncGithubIntegration(args: ISyncGithubIntegrationArgument await activity.unmapGithubRepo(integrationId, repo.repo) } + // delete duplicate connections + for (const repo of result.duplicatesToDelete) { + // delete nango connection + await activity.deleteConnection(result.providerConfigKey, repo.connectionId) + + // delete connection from integrations.settings.nangoMapping object + await activity.removeGithubConnection(integrationId, repo.connectionId) + + // we don't unmap because this one was duplicated + } + // create connections for repos that are not already connected for (const repo of result.reposToSync) { if (created >= limit) { @@ -38,6 +49,12 @@ export async function syncGithubIntegration(args: ISyncGithubIntegrationArgument // add connection to integrations.settings.nangoMapping object await activity.setGithubConnection(integrationId, repo, connectionId) + // add repo to githubRepos mapping if it's not already mapped + await activity.mapGithubRepo(integrationId, repo) + + // add repo to git integration + await activity.updateGitIntegrationWithRepo(integrationId, repo) + // start nango sync await activity.startNangoSync(result.providerConfigKey, connectionId) diff --git a/services/archetypes/worker/src/worker.ts b/services/archetypes/worker/src/worker.ts index 570b7fa3df..9d59814dee 100644 --- a/services/archetypes/worker/src/worker.ts +++ b/services/archetypes/worker/src/worker.ts @@ -293,9 +293,11 @@ export class ServiceWorker extends Service { }, ], }, - dataConverter: { - ...dataConverter, - }, + dataConverter: IS_DEV_ENV + ? undefined + : { + ...dataConverter, + }, maxTaskQueueActivitiesPerSecond: this.options.maxTaskQueueActivitiesPerSecond, maxConcurrentActivityTaskExecutions: this.options.maxConcurrentActivityTaskExecutions, }) diff --git a/services/libs/common_services/package.json b/services/libs/common_services/package.json index ad4293ac0a..c886b25356 100644 --- a/services/libs/common_services/package.json +++ b/services/libs/common_services/package.json @@ -17,10 +17,15 @@ "@crowd/common": "workspace:*", "@crowd/data-access-layer": "workspace:*", "@crowd/database": "workspace:*", + "@crowd/integrations": "workspace:*", "@crowd/logging": "workspace:*", "@crowd/queue": "workspace:*", "@crowd/temporal": "workspace:*", "@crowd/types": "workspace:*", + "@octokit/request": "^5.6.3", + "@octokit/rest": "^22.0.0", + "axios": "^1.13.1", + "jsonwebtoken": "8.5.1", "lodash.isequal": "^4.5.0", "lodash.pick": "~4.4.0", "moment": "^2.30.1" diff --git a/backend/src/services/githubIntegrationService.ts b/services/libs/common_services/src/services/github.integration.service.ts similarity index 82% rename from backend/src/services/githubIntegrationService.ts rename to services/libs/common_services/src/services/github.integration.service.ts index caaf076a76..fe185654a4 100644 --- a/backend/src/services/githubIntegrationService.ts +++ b/services/libs/common_services/src/services/github.integration.service.ts @@ -1,20 +1,67 @@ import { request } from '@octokit/request' -import { Octokit } from '@octokit/rest' +import axios from 'axios' +import * as jwt from 'jsonwebtoken' -import { LlmService } from '@crowd/common_services' import { QueryExecutor } from '@crowd/data-access-layer/src/queryExecutor' import { GithubIntegrationSettings } from '@crowd/integrations' -import { getServiceLogger } from '@crowd/logging' +import { Logger, getServiceLogger } from '@crowd/logging' import { PageData } from '@crowd/types' -import { IServiceOptions } from './IServiceOptions' -import { getGithubInstallationToken } from './helpers/githubToken' +import { LlmService } from './' + +/* eslint-disable @typescript-eslint/no-explicit-any */ // this is a hard limit for search endpoints https://docs.github.com/en/rest/search/search?apiVersion=2022-11-28#about-search const githubMaxSearchResult = 1000 -export default class GithubIntegrationService { - constructor(private readonly options: IServiceOptions) {} +let token: string | undefined +let expiration: Date | undefined + +export const getGithubInstallationToken = async (): Promise => { + if (token && expiration && expiration.getTime() > Date.now()) { + return token + } + + // refresh token + const clientId = process.env.GITHUB_TOKEN_CLIENT_ID + const installationId = process.env.GITHUB_TOKEN_INSTALLATION_ID + const privateKeyRaw = process.env.GITHUB_TOKEN_PRIVATE_KEY + + if (!clientId || !installationId || !privateKeyRaw) { + throw new Error('Github token client id, installation id or private key is not set!') + } + + const now = Math.floor(Date.now() / 1000) + const payload = { + iat: now - 60, + exp: now + 10 * 60, + iss: clientId, + } + + const privateKey = Buffer.from(privateKeyRaw, 'base64').toString('ascii') + + const jwtToken = jwt.sign(payload, privateKey, { algorithm: 'RS256' }) + + const response = await axios.post( + `https://api.github.com/app/installations/${installationId}/access_tokens`, + {}, + { + headers: { + Authorization: `Bearer ${jwtToken}`, + Accept: 'application/vnd.github+json', + 'X-GitHub-Api-Version': '2022-11-28', + }, + }, + ) + + token = response.data.token + expiration = new Date(response.data.expires_at) + + return token +} + +export class GithubIntegrationService { + constructor(private readonly log: Logger) {} /** * Normalizes forkedFrom URL for special cases. @@ -39,7 +86,7 @@ export default class GithubIntegrationService { * @param repo - The repository name * @returns The parent repository information or null if not available */ - private static async getForkedFrom(owner: string, repo: string): Promise { + public static async getForkedFrom(owner: string, repo: string): Promise { try { const auth = await getGithubInstallationToken() const { data } = await request(`GET /repos/${owner}/${repo}`, { @@ -58,14 +105,10 @@ export default class GithubIntegrationService { return null } - public async findGithubRepos( - query: string, - limit: number = 30, - offset: number = 0, - ): Promise> { + public async findGithubRepos(query: string, limit = 30, offset = 0): Promise> { const auth = await getGithubInstallationToken() const page = Math.floor(offset / limit) + 1 // offset to page conversion - this.options.log.info(`Searching repo ${query}, page ${page} and limit: ${limit}`) + this.log.info(`Searching repo ${query}, page ${page} and limit: ${limit}`) const [orgRepos, repos] = await Promise.all([ request('GET /search/repositories', { q: `owner:${query} fork:true`, @@ -75,7 +118,7 @@ export default class GithubIntegrationService { authorization: `bearer ${auth}`, }, }).catch((err) => { - this.options.log.error(`Error getting GitHub repositories for org: ${query}`, err) + this.log.error(`Error getting GitHub repositories for org: ${query}`, err) return { data: { items: [], total_count: 0 } } }), request('GET /search/repositories', { @@ -86,7 +129,7 @@ export default class GithubIntegrationService { authorization: `bearer ${auth}`, }, }).catch((err) => { - this.options.log.error(`Error getting GitHub repositories for org: ${query}`, err) + this.log.error(`Error getting GitHub repositories for org: ${query}`, err) return { data: { items: [], total_count: 0 } } }), ]) @@ -127,11 +170,7 @@ export default class GithubIntegrationService { } } - public static async findOrgs( - query: string, - limit: number = 30, - offset: number = 0, - ): Promise> { + public static async findOrgs(query: string, limit = 30, offset = 0): Promise> { const auth = await getGithubInstallationToken() const page = Math.floor(offset / limit) + 1 // offset to page conversion @@ -159,12 +198,16 @@ export default class GithubIntegrationService { } } - public static async getOrgRepos(org: string) { + public static async getOrgRepos( + org: string, + ): Promise> { const token = await getGithubInstallationToken() + const { Octokit } = await import('@octokit/rest') const octokit = new Octokit({ auth: `Bearer ${token}`, }) + // octokit.paginate automatically fetches all pages const repos = await octokit.paginate(octokit.rest.repos.listForOrg, { org, per_page: 100, // max results per page is 100 diff --git a/services/libs/common_services/src/services/index.ts b/services/libs/common_services/src/services/index.ts index 76c322bfa4..afe75c733d 100644 --- a/services/libs/common_services/src/services/index.ts +++ b/services/libs/common_services/src/services/index.ts @@ -3,3 +3,4 @@ export * from './llm.service' export * from './common.member.service' export * from './bot.service' export * from './emitters' +export * from './github.integration.service' diff --git a/services/libs/data-access-layer/src/index.ts b/services/libs/data-access-layer/src/index.ts index 0052e9cd99..c7963a6a40 100644 --- a/services/libs/data-access-layer/src/index.ts +++ b/services/libs/data-access-layer/src/index.ts @@ -6,3 +6,4 @@ export * from './prompt-history' export * from './queryExecutor' export * from './security_insights' export * from './systemSettings' +export * from './integrations' diff --git a/services/libs/data-access-layer/src/integrations/index.ts b/services/libs/data-access-layer/src/integrations/index.ts index a9e8350e63..f61e1ee6fc 100644 --- a/services/libs/data-access-layer/src/integrations/index.ts +++ b/services/libs/data-access-layer/src/integrations/index.ts @@ -1,3 +1,4 @@ +import { DEFAULT_TENANT_ID, generateUUIDv4 } from '@crowd/common' import { getServiceChildLogger } from '@crowd/logging' import { RedisCache, RedisClient } from '@crowd/redis' import { IIntegration, PlatformType } from '@crowd/types' @@ -230,6 +231,24 @@ export async function fetchIntegrationById( ) } +export async function setGithubIntegrationSettingsOrgs( + qx: QueryExecutor, + integrationId: string, + orgs: unknown, +): Promise { + await qx.result( + ` + update integrations + set settings = jsonb_set(settings, '{orgs}', $(orgs)) + where id = $(integrationId) + `, + { + integrationId, + orgs: JSON.stringify(orgs), + }, + ) +} + export async function fetchNangoIntegrationData( qx: QueryExecutor, platforms: string[], @@ -437,6 +456,226 @@ export async function removeGitHubRepoMapping( await cache.deleteAll() } +export async function addGitHubRepoMapping( + qx: QueryExecutor, + integrationId: string, + owner: string, + repoName: string, +): Promise { + await qx.result( + ` + insert into "githubRepos"("tenantId", "integrationId", "segmentId", url) + values( + $(tenantId), + $(integrationId), + (select "segmentId" from integrations where id = $(integrationId) limit 1), + $(url) + ) + on conflict ("tenantId", url) do update + set + "deletedAt" = null, + "segmentId" = (select "segmentId" from integrations where id = $(integrationId) limit 1), + "integrationId" = $(integrationId), + "updatedAt" = now() + -- in case there is a row already only update it if it's deleted so deletedAt is not null + -- otherwise leave it as is + where "githubRepos"."deletedAt" is not null + `, + { + tenantId: DEFAULT_TENANT_ID, + integrationId, + url: `https://github.com/${owner}/${repoName}`, + }, + ) +} + +/** + * Syncs repositories to git.repositories table (git-integration V2) + * + * Finds existing repository IDs from githubRepos or gitlabRepos tables, + * or generates new UUIDs, then upserts to git.repositories table. + * + * @param qx - Query executor + * @param remotes - Array of repository objects with url and optional forkedFrom + * @param gitIntegrationId - The git integration ID + * @param segmentId - The segment ID for the repositories + */ +export async function syncRepositoriesToGitV2( + qx: QueryExecutor, + remotes: Array<{ url: string; forkedFrom?: string | null }>, + gitIntegrationId: string, + segmentId: string, +): Promise { + if (!remotes || remotes.length === 0) { + log.warn('No remotes provided to syncRepositoriesToGitV2') + return + } + + // Check GitHub repos first, fallback to GitLab repos if none found + const existingRepos: Array<{ + id: string + url: string + }> = await qx.select( + ` + WITH github_repos AS ( + SELECT id, url FROM "githubRepos" + WHERE url IN ($(urls:csv)) AND "deletedAt" IS NULL + ), + gitlab_repos AS ( + SELECT id, url FROM "gitlabRepos" + WHERE url IN ($(urls:csv)) AND "deletedAt" IS NULL + ) + SELECT id, url FROM github_repos + UNION ALL + SELECT id, url FROM gitlab_repos + WHERE NOT EXISTS (SELECT 1 FROM github_repos) + `, + { + urls: remotes.map((r) => r.url), + }, + ) + + // Create a map of url to forkedFrom for quick lookup + const forkedFromMap = new Map(remotes.map((r) => [r.url, r.forkedFrom])) + + let repositoriesToSync: Array<{ + id: string + url: string + integrationId: string + segmentId: string + forkedFrom?: string | null + }> = [] + + // Map existing repos with their IDs + if (existingRepos.length > 0) { + repositoriesToSync = existingRepos.map((repo) => ({ + id: repo.id, + url: repo.url, + integrationId: gitIntegrationId, + segmentId, + forkedFrom: forkedFromMap.get(repo.url) || null, + })) + } else { + // If no existing repos found, create new ones with generated UUIDs + log.warn( + 'No existing repos found in githubRepos or gitlabRepos - inserting new to git.repositories with new UUIDs', + ) + repositoriesToSync = remotes.map((remote) => ({ + id: generateUUIDv4(), + url: remote.url, + integrationId: gitIntegrationId, + segmentId, + forkedFrom: remote.forkedFrom || null, + })) + } + + // Build SQL placeholders and parameters + const placeholders: string[] = [] + const params: Record = {} + + repositoriesToSync.forEach((repo, idx) => { + placeholders.push( + `($(id_${idx}), $(url_${idx}), $(integrationId_${idx}), $(segmentId_${idx}), $(forkedFrom_${idx}))`, + ) + params[`id_${idx}`] = repo.id + params[`url_${idx}`] = repo.url + params[`integrationId_${idx}`] = repo.integrationId + params[`segmentId_${idx}`] = repo.segmentId + params[`forkedFrom_${idx}`] = repo.forkedFrom || null + }) + + const placeholdersString = placeholders.join(', ') + + // Upsert to git.repositories + await qx.result( + ` + INSERT INTO git.repositories (id, url, "integrationId", "segmentId", "forkedFrom") + VALUES ${placeholdersString} + ON CONFLICT (id) DO UPDATE SET + "integrationId" = EXCLUDED."integrationId", + "segmentId" = EXCLUDED."segmentId", + "forkedFrom" = COALESCE(EXCLUDED."forkedFrom", git.repositories."forkedFrom"), + "updatedAt" = NOW(), + "deletedAt" = NULL + `, + params, + ) + + log.info(`Synced ${repositoriesToSync.length} repos to git.repositories`) +} + +export async function addRepoToGitIntegration( + qx: QueryExecutor, + integrationId: string, + repoUrl: string, + forkedFrom: string | null, +): Promise { + // Get the github integration to find its segmentId + const githubIntegration = await qx.selectOneOrNone( + ` + select "segmentId" from integrations where id = $(integrationId) and "deletedAt" is null + `, + { integrationId }, + ) + + if (!githubIntegration) { + log.warn({ integrationId }, 'GitHub integration not found!') + return + } + + // Find the git integration for this segment + const gitIntegration = await qx.selectOneOrNone( + ` + select id, settings from integrations + where "segmentId" = $(segmentId) + and platform = 'git' + and "deletedAt" is null + `, + { segmentId: githubIntegration.segmentId }, + ) + + if (!gitIntegration) { + log.warn({ segmentId: githubIntegration.segmentId }, 'Git integration not found for segment!') + return + } + + // Get existing remotes + const existingRemotes = gitIntegration.settings?.remotes || [] + + // Check if repo already exists + if (existingRemotes.includes(repoUrl)) { + log.debug({ repoUrl }, 'Repo already exists in git integration, skipping!') + return + } + + // Add new repo to remotes array + const updatedRemotes = [...existingRemotes, repoUrl] + + // Update git integration settings + await qx.result( + ` + update integrations + set settings = jsonb_set(settings, '{remotes}', $(remotes)::jsonb), + "updatedAt" = now() + where id = $(id) + `, + { + id: gitIntegration.id, + remotes: JSON.stringify(updatedRemotes), + }, + ) + + log.info({ integrationId: gitIntegration.id, repoUrl }, 'Added repo to git integration settings!') + + // Also sync to git.repositories table (git-integration V2) + await syncRepositoriesToGitV2( + qx, + [{ url: repoUrl, forkedFrom }], + gitIntegration.id, + githubIntegration.segmentId, + ) +} + export async function removePlainGitHubRepoMapping( qx: QueryExecutor, redisClient: RedisClient, diff --git a/services/libs/nango/src/client.ts b/services/libs/nango/src/client.ts index 2a0fc12163..afa5b9ea61 100644 --- a/services/libs/nango/src/client.ts +++ b/services/libs/nango/src/client.ts @@ -355,6 +355,19 @@ export const deleteNangoConnection = async ( try { await backendClient.deleteConnection(integration, connectionId) } catch (err) { + if (axios.isAxiosError(err)) { + if (err.response?.status === 404) { + return + } + + if ( + err.response?.status === 400 && + err.response?.data?.error?.code === 'unknown_connection' + ) { + return + } + } + if (retries <= MAX_RETRIES) { await timeout(100) return await deleteNangoConnection(integration, connectionId, retries + 1) diff --git a/services/libs/temporal/src/index.ts b/services/libs/temporal/src/index.ts index be2896efa2..5020473b9b 100644 --- a/services/libs/temporal/src/index.ts +++ b/services/libs/temporal/src/index.ts @@ -1,6 +1,6 @@ import { Client, Connection } from '@temporalio/client' -import { SERVICE } from '@crowd/common' +import { IS_DEV_ENV, SERVICE } from '@crowd/common' import { getServiceChildLogger } from '@crowd/logging' import { getDataConverter } from './encryption/data-converter' @@ -64,7 +64,7 @@ export const getTemporalClient = async (cfg: ITemporalConfig): Promise = connection, namespace: cfg.namespace, identity: cfg.identity, - dataConverter: await getDataConverter(), + dataConverter: IS_DEV_ENV ? undefined : await getDataConverter(), }) log.info(