Skip to content
46 changes: 17 additions & 29 deletions services/apps/members_enrichment_worker/src/activities/member.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,42 +77,30 @@ export async function getIdentitiesExistInOtherMembers(
excludeMemberId: string,
identities: IMemberIdentity[],
): Promise<IMemberIdentity[]> {
let rows: IMemberIdentity[] = []

try {
const db = svc.postgres.reader
rows = await getIdentitiesExistInOthers(db, excludeMemberId, identities)
} catch (err) {
throw err
}

return rows
const db = svc.postgres.reader
return getIdentitiesExistInOthers(db, excludeMemberId, identities)
}

export async function updateMemberWithEnrichmentData(
memberId: string,
identities: IMemberIdentity[],
attributes?: IAttributes,
): Promise<void> {
try {
await svc.postgres.writer.connection().tx(async (tx) => {
for (const identity of identities) {
await createMemberIdentity(new PgPromiseQueryExecutor(tx), {
memberId,
platform: identity.platform,
value: identity.value,
type: identity.type,
verified: identity.verified || false,
source: 'enrichment',
})
}
if (attributes) {
await updateMemberAttributes(tx, memberId, attributes)
}
})
} catch (err) {
throw err
}
await svc.postgres.writer.connection().tx(async (tx) => {
for (const identity of identities) {
await createMemberIdentity(new PgPromiseQueryExecutor(tx), {
memberId,
platform: identity.platform,
value: identity.value,
type: identity.type,
verified: identity.verified || false,
source: 'enrichment',
})
}
if (attributes) {
await updateMemberAttributes(tx, memberId, attributes)
}
})
}

export async function mergeMembers(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,11 @@ async function prepareMemberOrganizationAffiliationTimeline(

// solves conflicts in timeranges, always decides on one org when there are overlapping ranges
const buildTimeline = (
memberOrganizations: MemberOrganizationWithOverrides[],
manualAffiliations: IManualAffiliationData[],
affiliations: AffiliationItem[],
fallbackOrganizationId: string | null,
includeFallback = true,
): TimelineItem[] => {
const allAffiliationsWithDates = [...memberOrganizations, ...manualAffiliations].filter(
(row) => !!row.dateStart,
)
const allAffiliationsWithDates = affiliations.filter((row) => !!row.dateStart)

const earliestStartDate =
allAffiliationsWithDates.length > 0
Expand All @@ -148,7 +146,7 @@ async function prepareMemberOrganizationAffiliationTimeline(
let gapStartDate = null

for (let date = new Date(earliestStartDate); date <= now; date.setDate(date.getDate() + 1)) {
const orgs = findOrgsWithRolesInDate(date, [...memberOrganizations, ...manualAffiliations])
const orgs = findOrgsWithRolesInDate(date, affiliations)

if (orgs.length === 0) {
// means there's a gap in the timeline, close the current range if there's one
Expand Down Expand Up @@ -226,13 +224,15 @@ async function prepareMemberOrganizationAffiliationTimeline(
fallbackEnd = oneDayBefore(earliestStartDate)
}

// prepend range to cover all activities before the earliest affiliation date
// also handles edge case where fallback org is null and the timeline is empty.
timeline.unshift({
organizationId: fallbackOrganizationId,
dateStart: fallbackStart.toISOString(),
dateEnd: fallbackEnd.toISOString(),
})
if (includeFallback) {
// prepend range to cover all activities before the earliest affiliation date
// also handles edge case where fallback org is null and the timeline is empty.
timeline.unshift({
organizationId: fallbackOrganizationId,
dateStart: fallbackStart.toISOString(),
dateEnd: fallbackEnd.toISOString(),
})
}

return timeline
}
Expand Down Expand Up @@ -308,7 +308,38 @@ async function prepareMemberOrganizationAffiliationTimeline(
.value() ?? null
}

return buildTimeline(memberOrganizations, manualAffiliations, fallbackOrganizationId)
// We separate global and manual timelines to prevent 'stale' organizationIds
// Member organizations apply globally, while member segment affiliations only override specific segments.
const baseTimeline = buildTimeline(memberOrganizations, fallbackOrganizationId).map((item) => ({
...item,
skipManualAffiliationSegments: manualAffiliations.length > 0,
}))

// Only keep items with an actual org. Gaps (null org) are already handled by the base timeline.
const manualTimeline = _.flatMap(
_.groupBy(manualAffiliations, 'segmentId'),
(affiliations, segmentId) => {
const items = buildTimeline(affiliations, null, false)
.filter((item) => item.organizationId !== null)
.map((item) => ({ ...item, segmentId }))

// Undated MSAs are invisible to buildTimeline (no dateStart to anchor the loop).
// Create a catch-all so the base pass's NOT EXISTS still has a matching manual item.
if (items.length === 0) {
const primary = selectPrimaryWorkExperience(affiliations)
items.push({
organizationId: primary.organizationId,
dateStart: new Date('1970-01-01').toISOString(),
Comment thread
skwowet marked this conversation as resolved.
dateEnd: primary.dateEnd ? new Date(primary.dateEnd).toISOString() : null,
segmentId,
})
}
Comment thread
skwowet marked this conversation as resolved.

return items
},
)
Comment thread
skwowet marked this conversation as resolved.

return [...baseTimeline, ...manualTimeline]
}

async function processAffiliationActivities(
Expand All @@ -327,31 +358,47 @@ async function processAffiliationActivities(
}

// Build the where conditions for the subquery
const conditions = [`"memberId" = $(memberId)`]
const conditions = [`ar."memberId" = $(memberId)`]

// Organization filtering
if (affiliation.organizationId) {
conditions.push(`("organizationId" is null or "organizationId" <> $(organizationId))`)
conditions.push(`(ar."organizationId" is null or ar."organizationId" <> $(organizationId))`)
} else {
conditions.push(`"organizationId" is not null`)
conditions.push(`ar."organizationId" is not null`)
}

// Date filtering
if (affiliation.dateStart) {
conditions.push(`"timestamp" >= $(dateStart)::date`)
conditions.push(`ar."timestamp" >= $(dateStart)::date`)
params.dateStart = affiliation.dateStart
}
if (affiliation.dateEnd) {
conditions.push(`"timestamp" < $(dateEnd)::date + interval '1 day'`)
conditions.push(`ar."timestamp" < $(dateEnd)::date + interval '1 day'`)
params.dateEnd = affiliation.dateEnd
}

// Segment filtering (for manual affiliations)
if (affiliation.segmentId) {
conditions.push(`"segmentId" = $(segmentId)`)
conditions.push(`ar."segmentId" = $(segmentId)`)
params.segmentId = affiliation.segmentId
}

// Don't overwrite activities that a member segment affiliation covers
// Those are handled in the manual timeline.
if (affiliation.skipManualAffiliationSegments) {
conditions.push(`
NOT EXISTS (
SELECT 1
FROM "memberSegmentAffiliations" msa
WHERE msa."memberId" = $(memberId)
AND msa."segmentId" = ar."segmentId"
AND msa."organizationId" IS NOT NULL
AND (msa."dateStart" IS NULL OR ar."timestamp" >= msa."dateStart"::date)
AND (msa."dateEnd" IS NULL OR ar."timestamp" < msa."dateEnd"::date + interval '1 day')
Comment thread
skwowet marked this conversation as resolved.
)
Comment thread
skwowet marked this conversation as resolved.
`)
}
Comment thread
skwowet marked this conversation as resolved.

const whereClause = conditions.join(' and ')

do {
Expand All @@ -360,7 +407,7 @@ async function processAffiliationActivities(
UPDATE "activityRelations"
SET "organizationId" = $(organizationId), "updatedAt" = CURRENT_TIMESTAMP
WHERE "activityId" in (
select "activityId" from "activityRelations"
select ar."activityId" from "activityRelations" ar
where ${whereClause}
limit $(batchSize)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ export type TimelineItem = {
dateEnd: string | null
organizationId: string | null
segmentId?: string
skipManualAffiliationSegments?: boolean
}
Loading