diff --git a/services/apps/members_enrichment_worker/src/activities/member.ts b/services/apps/members_enrichment_worker/src/activities/member.ts index 36225a1176..06f41841f8 100644 --- a/services/apps/members_enrichment_worker/src/activities/member.ts +++ b/services/apps/members_enrichment_worker/src/activities/member.ts @@ -77,16 +77,8 @@ export async function getIdentitiesExistInOtherMembers( excludeMemberId: string, identities: IMemberIdentity[], ): Promise { - let rows: IMemberIdentity[] = [] - - try { - const db = svc.postgres.reader - rows = await getIdentitiesExistInOthers(db, excludeMemberId, identities) - } catch (err) { - throw err - } - - return rows + const db = svc.postgres.reader + return getIdentitiesExistInOthers(db, excludeMemberId, identities) } export async function updateMemberWithEnrichmentData( @@ -94,25 +86,21 @@ export async function updateMemberWithEnrichmentData( identities: IMemberIdentity[], attributes?: IAttributes, ): Promise { - try { - await svc.postgres.writer.connection().tx(async (tx) => { - for (const identity of identities) { - await createMemberIdentity(new PgPromiseQueryExecutor(tx), { - memberId, - platform: identity.platform, - value: identity.value, - type: identity.type, - verified: identity.verified || false, - source: 'enrichment', - }) - } - if (attributes) { - await updateMemberAttributes(tx, memberId, attributes) - } - }) - } catch (err) { - throw err - } + await svc.postgres.writer.connection().tx(async (tx) => { + for (const identity of identities) { + await createMemberIdentity(new PgPromiseQueryExecutor(tx), { + memberId, + platform: identity.platform, + value: identity.value, + type: identity.type, + verified: identity.verified || false, + source: 'enrichment', + }) + } + if (attributes) { + await updateMemberAttributes(tx, memberId, attributes) + } + }) } export async function mergeMembers( diff --git a/services/libs/data-access-layer/src/member-organization-affiliation/index.ts b/services/libs/data-access-layer/src/member-organization-affiliation/index.ts index f574c468e9..eddefbaf6a 100644 --- a/services/libs/data-access-layer/src/member-organization-affiliation/index.ts +++ b/services/libs/data-access-layer/src/member-organization-affiliation/index.ts @@ -118,13 +118,11 @@ async function prepareMemberOrganizationAffiliationTimeline( // solves conflicts in timeranges, always decides on one org when there are overlapping ranges const buildTimeline = ( - memberOrganizations: MemberOrganizationWithOverrides[], - manualAffiliations: IManualAffiliationData[], + affiliations: AffiliationItem[], fallbackOrganizationId: string | null, + includeFallback = true, ): TimelineItem[] => { - const allAffiliationsWithDates = [...memberOrganizations, ...manualAffiliations].filter( - (row) => !!row.dateStart, - ) + const allAffiliationsWithDates = affiliations.filter((row) => !!row.dateStart) const earliestStartDate = allAffiliationsWithDates.length > 0 @@ -148,7 +146,7 @@ async function prepareMemberOrganizationAffiliationTimeline( let gapStartDate = null for (let date = new Date(earliestStartDate); date <= now; date.setDate(date.getDate() + 1)) { - const orgs = findOrgsWithRolesInDate(date, [...memberOrganizations, ...manualAffiliations]) + const orgs = findOrgsWithRolesInDate(date, affiliations) if (orgs.length === 0) { // means there's a gap in the timeline, close the current range if there's one @@ -226,13 +224,15 @@ async function prepareMemberOrganizationAffiliationTimeline( fallbackEnd = oneDayBefore(earliestStartDate) } - // prepend range to cover all activities before the earliest affiliation date - // also handles edge case where fallback org is null and the timeline is empty. - timeline.unshift({ - organizationId: fallbackOrganizationId, - dateStart: fallbackStart.toISOString(), - dateEnd: fallbackEnd.toISOString(), - }) + if (includeFallback) { + // prepend range to cover all activities before the earliest affiliation date + // also handles edge case where fallback org is null and the timeline is empty. + timeline.unshift({ + organizationId: fallbackOrganizationId, + dateStart: fallbackStart.toISOString(), + dateEnd: fallbackEnd.toISOString(), + }) + } return timeline } @@ -308,7 +308,38 @@ async function prepareMemberOrganizationAffiliationTimeline( .value() ?? null } - return buildTimeline(memberOrganizations, manualAffiliations, fallbackOrganizationId) + // We separate global and manual timelines to prevent 'stale' organizationIds + // Member organizations apply globally, while member segment affiliations only override specific segments. + const baseTimeline = buildTimeline(memberOrganizations, fallbackOrganizationId).map((item) => ({ + ...item, + skipManualAffiliationSegments: manualAffiliations.length > 0, + })) + + // Only keep items with an actual org. Gaps (null org) are already handled by the base timeline. + const manualTimeline = _.flatMap( + _.groupBy(manualAffiliations, 'segmentId'), + (affiliations, segmentId) => { + const items = buildTimeline(affiliations, null, false) + .filter((item) => item.organizationId !== null) + .map((item) => ({ ...item, segmentId })) + + // Undated MSAs are invisible to buildTimeline (no dateStart to anchor the loop). + // Create a catch-all so the base pass's NOT EXISTS still has a matching manual item. + if (items.length === 0) { + const primary = selectPrimaryWorkExperience(affiliations) + items.push({ + organizationId: primary.organizationId, + dateStart: new Date('1970-01-01').toISOString(), + dateEnd: primary.dateEnd ? new Date(primary.dateEnd).toISOString() : null, + segmentId, + }) + } + + return items + }, + ) + + return [...baseTimeline, ...manualTimeline] } async function processAffiliationActivities( @@ -327,31 +358,47 @@ async function processAffiliationActivities( } // Build the where conditions for the subquery - const conditions = [`"memberId" = $(memberId)`] + const conditions = [`ar."memberId" = $(memberId)`] // Organization filtering if (affiliation.organizationId) { - conditions.push(`("organizationId" is null or "organizationId" <> $(organizationId))`) + conditions.push(`(ar."organizationId" is null or ar."organizationId" <> $(organizationId))`) } else { - conditions.push(`"organizationId" is not null`) + conditions.push(`ar."organizationId" is not null`) } // Date filtering if (affiliation.dateStart) { - conditions.push(`"timestamp" >= $(dateStart)::date`) + conditions.push(`ar."timestamp" >= $(dateStart)::date`) params.dateStart = affiliation.dateStart } if (affiliation.dateEnd) { - conditions.push(`"timestamp" < $(dateEnd)::date + interval '1 day'`) + conditions.push(`ar."timestamp" < $(dateEnd)::date + interval '1 day'`) params.dateEnd = affiliation.dateEnd } // Segment filtering (for manual affiliations) if (affiliation.segmentId) { - conditions.push(`"segmentId" = $(segmentId)`) + conditions.push(`ar."segmentId" = $(segmentId)`) params.segmentId = affiliation.segmentId } + // Don't overwrite activities that a member segment affiliation covers + // Those are handled in the manual timeline. + if (affiliation.skipManualAffiliationSegments) { + conditions.push(` + NOT EXISTS ( + SELECT 1 + FROM "memberSegmentAffiliations" msa + WHERE msa."memberId" = $(memberId) + AND msa."segmentId" = ar."segmentId" + AND msa."organizationId" IS NOT NULL + AND (msa."dateStart" IS NULL OR ar."timestamp" >= msa."dateStart"::date) + AND (msa."dateEnd" IS NULL OR ar."timestamp" < msa."dateEnd"::date + interval '1 day') + ) + `) + } + const whereClause = conditions.join(' and ') do { @@ -360,7 +407,7 @@ async function processAffiliationActivities( UPDATE "activityRelations" SET "organizationId" = $(organizationId), "updatedAt" = CURRENT_TIMESTAMP WHERE "activityId" in ( - select "activityId" from "activityRelations" + select ar."activityId" from "activityRelations" ar where ${whereClause} limit $(batchSize) ) diff --git a/services/libs/data-access-layer/src/member-organization-affiliation/types.ts b/services/libs/data-access-layer/src/member-organization-affiliation/types.ts index 1ee9b4a677..8e954b8799 100644 --- a/services/libs/data-access-layer/src/member-organization-affiliation/types.ts +++ b/services/libs/data-access-layer/src/member-organization-affiliation/types.ts @@ -10,4 +10,5 @@ export type TimelineItem = { dateEnd: string | null organizationId: string | null segmentId?: string + skipManualAffiliationSegments?: boolean }