From 53cfac970894d2c8f81b1d24e584a2784a39ebb0 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Thu, 7 May 2026 18:50:51 +0530 Subject: [PATCH 1/6] fix: separate base and msa timelines to prevent stale activityRelations Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../member-organization-affiliation/index.ts | 73 +++++++++++++------ .../member-organization-affiliation/types.ts | 1 + 2 files changed, 52 insertions(+), 22 deletions(-) diff --git a/services/libs/data-access-layer/src/member-organization-affiliation/index.ts b/services/libs/data-access-layer/src/member-organization-affiliation/index.ts index f574c468e9..18fc552ac3 100644 --- a/services/libs/data-access-layer/src/member-organization-affiliation/index.ts +++ b/services/libs/data-access-layer/src/member-organization-affiliation/index.ts @@ -116,15 +116,12 @@ async function prepareMemberOrganizationAffiliationTimeline( } } - // solves conflicts in timeranges, always decides on one org when there are overlapping ranges const buildTimeline = ( - memberOrganizations: MemberOrganizationWithOverrides[], - manualAffiliations: IManualAffiliationData[], + affiliations: AffiliationItem[], fallbackOrganizationId: string | null, + includeFallback = true, ): TimelineItem[] => { - const allAffiliationsWithDates = [...memberOrganizations, ...manualAffiliations].filter( - (row) => !!row.dateStart, - ) + const allAffiliationsWithDates = affiliations.filter((row) => !!row.dateStart) const earliestStartDate = allAffiliationsWithDates.length > 0 @@ -148,7 +145,7 @@ async function prepareMemberOrganizationAffiliationTimeline( let gapStartDate = null for (let date = new Date(earliestStartDate); date <= now; date.setDate(date.getDate() + 1)) { - const orgs = findOrgsWithRolesInDate(date, [...memberOrganizations, ...manualAffiliations]) + const orgs = findOrgsWithRolesInDate(date, affiliations) if (orgs.length === 0) { // means there's a gap in the timeline, close the current range if there's one @@ -226,13 +223,14 @@ async function prepareMemberOrganizationAffiliationTimeline( fallbackEnd = oneDayBefore(earliestStartDate) } - // prepend range to cover all activities before the earliest affiliation date - // also handles edge case where fallback org is null and the timeline is empty. - timeline.unshift({ - organizationId: fallbackOrganizationId, - dateStart: fallbackStart.toISOString(), - dateEnd: fallbackEnd.toISOString(), - }) + if (includeFallback) { + // Covers activity before the first dated stint (and empty-timeline edge cases). Manual passes omit this. + timeline.unshift({ + organizationId: fallbackOrganizationId, + dateStart: fallbackStart.toISOString(), + dateEnd: fallbackEnd.toISOString(), + }) + } return timeline } @@ -308,7 +306,23 @@ async function prepareMemberOrganizationAffiliationTimeline( .value() ?? null } - return buildTimeline(memberOrganizations, manualAffiliations, fallbackOrganizationId) + // we separate global and manual timelines to prevent 'stale' organizationIds. + // member organizations apply globally, while member segment affiliations only override specific segments. + const baseTimeline = buildTimeline(memberOrganizations, fallbackOrganizationId).map((item) => ({ + ...item, + skipManualAffiliationSegments: manualAffiliations.length > 0, + })) + + // Only keep items with an actual org. Gaps (null org) are already handled by the base timeline. + const manualTimeline = _.flatMap( + _.groupBy(manualAffiliations, 'segmentId'), + (affiliations, segmentId) => + buildTimeline(affiliations, null, false) + .filter((item) => item.organizationId !== null) + .map((item) => ({ ...item, segmentId })), + ) + + return [...baseTimeline, ...manualTimeline] } async function processAffiliationActivities( @@ -327,31 +341,46 @@ async function processAffiliationActivities( } // Build the where conditions for the subquery - const conditions = [`"memberId" = $(memberId)`] + const conditions = [`ar."memberId" = $(memberId)`] // Organization filtering if (affiliation.organizationId) { - conditions.push(`("organizationId" is null or "organizationId" <> $(organizationId))`) + conditions.push(`(ar."organizationId" is null or ar."organizationId" <> $(organizationId))`) } else { - conditions.push(`"organizationId" is not null`) + conditions.push(`ar."organizationId" is not null`) } // Date filtering if (affiliation.dateStart) { - conditions.push(`"timestamp" >= $(dateStart)::date`) + conditions.push(`ar."timestamp" >= $(dateStart)::date`) params.dateStart = affiliation.dateStart } if (affiliation.dateEnd) { - conditions.push(`"timestamp" < $(dateEnd)::date + interval '1 day'`) + conditions.push(`ar."timestamp" < $(dateEnd)::date + interval '1 day'`) params.dateEnd = affiliation.dateEnd } // Segment filtering (for manual affiliations) if (affiliation.segmentId) { - conditions.push(`"segmentId" = $(segmentId)`) + conditions.push(`ar."segmentId" = $(segmentId)`) params.segmentId = affiliation.segmentId } + // Don't overwrite activities that an member segment affiliation covers + // Those are handled in the manual timeline. + if (affiliation.skipManualAffiliationSegments) { + conditions.push(` + NOT EXISTS ( + SELECT 1 + FROM "memberSegmentAffiliations" msa + WHERE msa."memberId" = $(memberId) + AND msa."segmentId" = ar."segmentId" + AND (msa."dateStart" IS NULL OR ar."timestamp" >= msa."dateStart"::date) + AND (msa."dateEnd" IS NULL OR ar."timestamp" < msa."dateEnd"::date + interval '1 day') + ) + `) + } + const whereClause = conditions.join(' and ') do { @@ -360,7 +389,7 @@ async function processAffiliationActivities( UPDATE "activityRelations" SET "organizationId" = $(organizationId), "updatedAt" = CURRENT_TIMESTAMP WHERE "activityId" in ( - select "activityId" from "activityRelations" + select ar."activityId" from "activityRelations" ar where ${whereClause} limit $(batchSize) ) diff --git a/services/libs/data-access-layer/src/member-organization-affiliation/types.ts b/services/libs/data-access-layer/src/member-organization-affiliation/types.ts index 1ee9b4a677..8e954b8799 100644 --- a/services/libs/data-access-layer/src/member-organization-affiliation/types.ts +++ b/services/libs/data-access-layer/src/member-organization-affiliation/types.ts @@ -10,4 +10,5 @@ export type TimelineItem = { dateEnd: string | null organizationId: string | null segmentId?: string + skipManualAffiliationSegments?: boolean } From f77174bc7504e739bfcbe1a67130453ce0ba844a Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Thu, 7 May 2026 19:33:47 +0530 Subject: [PATCH 2/6] fix: filter out null organization IDs in affiliation activities query Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../src/member-organization-affiliation/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/services/libs/data-access-layer/src/member-organization-affiliation/index.ts b/services/libs/data-access-layer/src/member-organization-affiliation/index.ts index 18fc552ac3..d62c1d3dea 100644 --- a/services/libs/data-access-layer/src/member-organization-affiliation/index.ts +++ b/services/libs/data-access-layer/src/member-organization-affiliation/index.ts @@ -375,6 +375,7 @@ async function processAffiliationActivities( FROM "memberSegmentAffiliations" msa WHERE msa."memberId" = $(memberId) AND msa."segmentId" = ar."segmentId" + AND msa."organizationId" IS NOT NULL AND (msa."dateStart" IS NULL OR ar."timestamp" >= msa."dateStart"::date) AND (msa."dateEnd" IS NULL OR ar."timestamp" < msa."dateEnd"::date + interval '1 day') ) From 5c2352aba6c42f9d0fca81e242981dd413a5fac2 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Thu, 7 May 2026 20:17:04 +0530 Subject: [PATCH 3/6] fix: improve manual timeline handling Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../member-organization-affiliation/index.ts | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/services/libs/data-access-layer/src/member-organization-affiliation/index.ts b/services/libs/data-access-layer/src/member-organization-affiliation/index.ts index d62c1d3dea..d84046633a 100644 --- a/services/libs/data-access-layer/src/member-organization-affiliation/index.ts +++ b/services/libs/data-access-layer/src/member-organization-affiliation/index.ts @@ -306,8 +306,8 @@ async function prepareMemberOrganizationAffiliationTimeline( .value() ?? null } - // we separate global and manual timelines to prevent 'stale' organizationIds. - // member organizations apply globally, while member segment affiliations only override specific segments. + // We separate global and manual timelines to prevent 'stale' organizationIds + // Member organizations apply globally, while member segment affiliations only override specific segments. const baseTimeline = buildTimeline(memberOrganizations, fallbackOrganizationId).map((item) => ({ ...item, skipManualAffiliationSegments: manualAffiliations.length > 0, @@ -316,10 +316,25 @@ async function prepareMemberOrganizationAffiliationTimeline( // Only keep items with an actual org. Gaps (null org) are already handled by the base timeline. const manualTimeline = _.flatMap( _.groupBy(manualAffiliations, 'segmentId'), - (affiliations, segmentId) => - buildTimeline(affiliations, null, false) + (affiliations, segmentId) => { + const items = buildTimeline(affiliations, null, false) .filter((item) => item.organizationId !== null) - .map((item) => ({ ...item, segmentId })), + .map((item) => ({ ...item, segmentId })) + + // Undated MSAs are invisible to buildTimeline (no dateStart to anchor the loop). + // Create a catch-all so the base pass's NOT EXISTS still has a matching manual item. + if (items.length === 0) { + const primary = selectPrimaryWorkExperience(affiliations) + items.push({ + organizationId: primary.organizationId, + dateStart: new Date('1970-01-01').toISOString(), + dateEnd: null, + segmentId, + }) + } + + return items + }, ) return [...baseTimeline, ...manualTimeline] @@ -366,7 +381,7 @@ async function processAffiliationActivities( params.segmentId = affiliation.segmentId } - // Don't overwrite activities that an member segment affiliation covers + // Don't overwrite activities that a member segment affiliation covers // Those are handled in the manual timeline. if (affiliation.skipManualAffiliationSegments) { conditions.push(` From e2453f3c5cd25c737fb141d62b110d0113e1637c Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Thu, 7 May 2026 20:26:29 +0530 Subject: [PATCH 4/6] revert: code comments we had before Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../src/member-organization-affiliation/index.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/services/libs/data-access-layer/src/member-organization-affiliation/index.ts b/services/libs/data-access-layer/src/member-organization-affiliation/index.ts index d84046633a..d309dc9713 100644 --- a/services/libs/data-access-layer/src/member-organization-affiliation/index.ts +++ b/services/libs/data-access-layer/src/member-organization-affiliation/index.ts @@ -116,6 +116,7 @@ async function prepareMemberOrganizationAffiliationTimeline( } } + // solves conflicts in timeranges, always decides on one org when there are overlapping ranges const buildTimeline = ( affiliations: AffiliationItem[], fallbackOrganizationId: string | null, @@ -224,7 +225,8 @@ async function prepareMemberOrganizationAffiliationTimeline( } if (includeFallback) { - // Covers activity before the first dated stint (and empty-timeline edge cases). Manual passes omit this. + // prepend range to cover all activities before the earliest affiliation date + // also handles edge case where fallback org is null and the timeline is empty. timeline.unshift({ organizationId: fallbackOrganizationId, dateStart: fallbackStart.toISOString(), From ef5ebaefc4e38104ed08cd0e8d0971c3ad7a5669 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Thu, 7 May 2026 21:12:46 +0530 Subject: [PATCH 5/6] fix: update dateEnd handling in prepareMemberOrganizationAffiliationTimeline func Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../src/member-organization-affiliation/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/libs/data-access-layer/src/member-organization-affiliation/index.ts b/services/libs/data-access-layer/src/member-organization-affiliation/index.ts index d309dc9713..eddefbaf6a 100644 --- a/services/libs/data-access-layer/src/member-organization-affiliation/index.ts +++ b/services/libs/data-access-layer/src/member-organization-affiliation/index.ts @@ -330,7 +330,7 @@ async function prepareMemberOrganizationAffiliationTimeline( items.push({ organizationId: primary.organizationId, dateStart: new Date('1970-01-01').toISOString(), - dateEnd: null, + dateEnd: primary.dateEnd ? new Date(primary.dateEnd).toISOString() : null, segmentId, }) } From 826182307024a493f494ec9ff6b6cccea9e3c758 Mon Sep 17 00:00:00 2001 From: Yeganathan S <63534555+skwowet@users.noreply.github.com> Date: Fri, 8 May 2026 17:25:46 +0530 Subject: [PATCH 6/6] chore: make prettier and linter happy Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com> --- .../src/activities/member.ts | 46 +++++++------------ 1 file changed, 17 insertions(+), 29 deletions(-) diff --git a/services/apps/members_enrichment_worker/src/activities/member.ts b/services/apps/members_enrichment_worker/src/activities/member.ts index 36225a1176..06f41841f8 100644 --- a/services/apps/members_enrichment_worker/src/activities/member.ts +++ b/services/apps/members_enrichment_worker/src/activities/member.ts @@ -77,16 +77,8 @@ export async function getIdentitiesExistInOtherMembers( excludeMemberId: string, identities: IMemberIdentity[], ): Promise { - let rows: IMemberIdentity[] = [] - - try { - const db = svc.postgres.reader - rows = await getIdentitiesExistInOthers(db, excludeMemberId, identities) - } catch (err) { - throw err - } - - return rows + const db = svc.postgres.reader + return getIdentitiesExistInOthers(db, excludeMemberId, identities) } export async function updateMemberWithEnrichmentData( @@ -94,25 +86,21 @@ export async function updateMemberWithEnrichmentData( identities: IMemberIdentity[], attributes?: IAttributes, ): Promise { - try { - await svc.postgres.writer.connection().tx(async (tx) => { - for (const identity of identities) { - await createMemberIdentity(new PgPromiseQueryExecutor(tx), { - memberId, - platform: identity.platform, - value: identity.value, - type: identity.type, - verified: identity.verified || false, - source: 'enrichment', - }) - } - if (attributes) { - await updateMemberAttributes(tx, memberId, attributes) - } - }) - } catch (err) { - throw err - } + await svc.postgres.writer.connection().tx(async (tx) => { + for (const identity of identities) { + await createMemberIdentity(new PgPromiseQueryExecutor(tx), { + memberId, + platform: identity.platform, + value: identity.value, + type: identity.type, + verified: identity.verified || false, + source: 'enrichment', + }) + } + if (attributes) { + await updateMemberAttributes(tx, memberId, attributes) + } + }) } export async function mergeMembers(