Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions services/apps/data_sink_worker/src/service/member.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ export default class MemberService extends LoggerBase {
(!member.identities || member.identities.length === 0)
) {
const errorMessage = `Member can't be enriched. It is missing both emails and identities fields.`
this.log.error(errorMessage)
throw new Error(errorMessage)
this.log.warn(errorMessage)
return
}

await this.store.transactionally(async (txStore) => {
Expand All @@ -317,7 +317,10 @@ export default class MemberService extends LoggerBase {
const segmentId = dbIntegration.segmentId

// first try finding the member using the identity
const identity = singleOrDefault(member.identities, (i) => i.platform === platform)
const identity = singleOrDefault(
member.identities,
(i) => i.platform === platform && i.sourceId !== null,
)
let dbMember = await txRepo.findMember(tenantId, segmentId, platform, identity.username)

if (!dbMember && member.emails && member.emails.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,8 @@ export class OrganizationService extends LoggerBase {
const txRepo = new OrganizationRepository(txStore, this.log)
const txIntegrationRepo = new IntegrationRepository(txStore, this.log)

const txService = new OrganizationService(txStore, this.log)

const dbIntegration = await txIntegrationRepo.findById(integrationId)
const segmentId = dbIntegration.segmentId

Expand Down Expand Up @@ -381,7 +383,7 @@ export class OrganizationService extends LoggerBase {
organization.identities.unshift(...existingIdentities)
}

await this.findOrCreate(tenantId, segmentId, integrationId, organization)
await txService.findOrCreate(tenantId, segmentId, integrationId, organization)
} else {
this.log.debug(
'No organization found for enriching. This organization enrich process had no affect.',
Expand Down
5 changes: 5 additions & 0 deletions services/apps/integration_sync_worker/src/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export const integrationNotFound = (integrationId: string): string =>
`Integration ${integrationId} not found!`

export const automationNotFound = (automationId: string): string =>
`Automation ${automationId} not found!`
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
import { IDbIntegration } from '@/repo/integration.data'
import { AutomationRepository } from '@/repo/automation.repo'
import { AutomationExecutionRepository } from '@/repo/automationExecution.repo'
import { automationNotFound, integrationNotFound } from '@/errors'

export class MemberSyncService extends LoggerBase {
private readonly memberRepo: MemberRepository
Expand Down Expand Up @@ -45,14 +46,33 @@ export class MemberSyncService extends LoggerBase {
): Promise<void> {
const integration = await this.integrationRepo.findById(integrationId)

if (!integration) {
const message = integrationNotFound(integrationId)
this.log.warn(message)
return
}

const member = await this.memberRepo.findMember(memberId)

if (!member) {
this.log.warn(`Member ${memberId} is not found for syncing remote!`)
return
}

const syncRemote = await this.memberRepo.findSyncRemoteById(syncRemoteId)

const membersToCreate = []
const membersToUpdate = []

if (syncRemote.sourceId) {
member.attributes = {
...member.attributes,
sourceId: {
...(member.attributes.sourceId || {}),
[integration.platform]: syncRemote.sourceId,
},
}

membersToUpdate.push(member)
} else {
membersToCreate.push(member)
Expand Down Expand Up @@ -96,6 +116,12 @@ export class MemberSyncService extends LoggerBase {
): Promise<void> {
const integration: IDbIntegration = await this.integrationRepo.findById(integrationId)

if (!integration) {
const message = integrationNotFound(integrationId)
this.log.warn(message)
return
}

const platforms = await this.memberRepo.getExistingPlatforms(tenantId)

const attributes = await this.memberRepo.getTenantMemberAttributes(tenantId)
Expand Down Expand Up @@ -173,6 +199,7 @@ export class MemberSyncService extends LoggerBase {
translatedMembers[0].attributes = {
...translatedMembers[0].attributes,
sourceId: {
...(translatedMembers[0].attributes.sourceId || {}),
[integration.platform]: memberToSync.sourceId,
},
}
Expand Down Expand Up @@ -228,8 +255,20 @@ export class MemberSyncService extends LoggerBase {
batchSize = 50,
) {
const integration: IDbIntegration = await this.integrationRepo.findById(integrationId)

if (!integration) {
const message = integrationNotFound(integrationId)
this.log.warn(message)
return
}
const automation = await this.automationRepo.findById(automationId)

if (!automation) {
const message = automationNotFound(automationId)
this.log.warn(message)
return
}

const platforms = await this.memberRepo.getExistingPlatforms(tenantId)

const attributes = await this.memberRepo.getTenantMemberAttributes(tenantId)
Expand Down Expand Up @@ -362,6 +401,7 @@ export class MemberSyncService extends LoggerBase {
memberToSync.attributes = {
...memberToSync.attributes,
sourceId: {
...(memberToSync.attributes.sourceId || {}),
[integration.platform]: syncRemote.sourceId,
},
}
Expand Down Expand Up @@ -453,8 +493,20 @@ export class MemberSyncService extends LoggerBase {
) {
const integration: IDbIntegration = await this.integrationRepo.findById(integrationId)

if (!integration) {
const message = integrationNotFound(integrationId)
this.log.warn(message)
return
}

const automation = await this.automationRepo.findById(automationId)

if (!automation) {
const message = automationNotFound(automationId)
this.log.warn(message)
return
}

let organizationMembers
let offset

Expand Down Expand Up @@ -495,6 +547,7 @@ export class MemberSyncService extends LoggerBase {
member.attributes = {
...member.attributes,
sourceId: {
...(member.attributes.sourceId || {}),
[integration.platform]: syncRemote.sourceId,
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ export class OrganizationSyncService extends LoggerBase {
const organizationsToUpdate = []

if (syncRemote.sourceId) {
organization.attributes = {
...organization.attributes,
sourceId: {
...(organization.attributes.sourceId || {}),
[integration.platform]: syncRemote.sourceId,
},
}
organizationsToUpdate.push(organization)
} else {
oranizationsToCreate.push(organization)
Expand Down Expand Up @@ -121,6 +128,7 @@ export class OrganizationSyncService extends LoggerBase {
organization.attributes = {
...organization.attributes,
sourceId: {
...(organization.attributes.sourceId || {}),
[integration.platform]: organizationToSync.sourceId,
},
}
Expand Down Expand Up @@ -235,6 +243,7 @@ export class OrganizationSyncService extends LoggerBase {
organization.attributes = {
...organization.attributes,
sourceId: {
...(organization.attributes.sourceId || {}),
[integration.platform]: syncRemote.sourceId,
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,49 +26,51 @@ export const batchUpdateMembers = async (
const hubspotMembers = []

for (const member of members) {
const hubspotSourceId = member.attributes?.sourceId?.hubspot

if (!hubspotSourceId) {
ctx.log.warn(
`Member ${member.id} can't be updated in hubspot! Member doesn't have a hubspot sourceId in attributes.`,
)
} else {
const hsMember = {
id: hubspotSourceId,
properties: {},
} as any

const fields = memberMapper.getAllCrowdFields()

for (const crowdField of fields) {
const hubspotField = memberMapper.getHubspotFieldName(crowdField)
if (crowdField.startsWith('attributes')) {
const attributeName = crowdField.split('.')[1] || null

if (
attributeName &&
hubspotField &&
member.attributes[attributeName]?.default !== undefined
) {
hsMember.properties[hubspotField] = member.attributes[attributeName].default
if (member) {
const hubspotSourceId = member.attributes?.sourceId?.hubspot

if (!hubspotSourceId) {
ctx.log.warn(
`Member ${member.id} can't be updated in hubspot! Member doesn't have a hubspot sourceId in attributes.`,
)
} else {
const hsMember = {
id: hubspotSourceId,
properties: {},
} as any

const fields = memberMapper.getAllCrowdFields()

for (const crowdField of fields) {
const hubspotField = memberMapper.getHubspotFieldName(crowdField)
if (crowdField.startsWith('attributes')) {
const attributeName = crowdField.split('.')[1] || null

if (
attributeName &&
hubspotField &&
member.attributes[attributeName]?.default !== undefined
) {
hsMember.properties[hubspotField] = member.attributes[attributeName].default
}
} else if (crowdField.startsWith('identities')) {
const identityPlatform = crowdField.split('.')[1] || null

const identityFound = member.identities.find((i) => i.platform === identityPlatform)

if (identityPlatform && hubspotField && identityFound) {
hsMember.properties[hubspotField] = identityFound.username
}
} else if (crowdField === 'organizationName') {
// send latest org of member as value
} else if (hubspotField && member[crowdField] !== undefined) {
hsMember.properties[hubspotField] = memberMapper.getHubspotValue(member, crowdField)
}
} else if (crowdField.startsWith('identities')) {
const identityPlatform = crowdField.split('.')[1] || null

const identityFound = member.identities.find((i) => i.platform === identityPlatform)

if (identityPlatform && hubspotField && identityFound) {
hsMember.properties[hubspotField] = identityFound.username
}
} else if (crowdField === 'organizationName') {
// send latest org of member as value
} else if (hubspotField && member[crowdField] !== undefined) {
hsMember.properties[hubspotField] = memberMapper.getHubspotValue(member, crowdField)
}
}

if (Object.keys(hsMember.properties).length > 0) {
hubspotMembers.push(hsMember)
if (Object.keys(hsMember.properties).length > 0) {
hubspotMembers.push(hsMember)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,35 @@ export const batchUpdateOrganizations = async (
const hubspotCompanies = []

for (const organization of organizations) {
const hubspotSourceId = organization.attributes?.sourceId?.hubspot
if (organization) {
const hubspotSourceId = organization.attributes?.sourceId?.hubspot

if (!hubspotSourceId) {
ctx.log.warn(
`Organization ${organization.id} can't be updated in hubspot! Organization doesn't have a hubspot sourceId.`,
)
} else {
const hubspotCompany = {
id: hubspotSourceId,
properties: {},
} as any
if (!hubspotSourceId) {
ctx.log.warn(
`Organization ${organization.id} can't be updated in hubspot! Organization doesn't have a hubspot sourceId.`,
)
} else {
const hubspotCompany = {
id: hubspotSourceId,
properties: {},
} as any

const fields = organizationMapper.getAllCrowdFields()
const fields = organizationMapper.getAllCrowdFields()

for (const crowdField of fields) {
const hubspotField = organizationMapper.getHubspotFieldName(crowdField)
for (const crowdField of fields) {
const hubspotField = organizationMapper.getHubspotFieldName(crowdField)

if (hubspotField && organization[crowdField] !== undefined) {
hubspotCompany.properties[hubspotField] = organizationMapper.getHubspotValue(
organization,
crowdField,
)
if (hubspotField && organization[crowdField] !== undefined) {
hubspotCompany.properties[hubspotField] = organizationMapper.getHubspotValue(
organization,
crowdField,
)
}
}
}

if (Object.keys(hubspotCompany.properties).length > 0) {
hubspotCompanies.push(hubspotCompany)
if (Object.keys(hubspotCompany.properties).length > 0) {
hubspotCompanies.push(hubspotCompany)
}
}
}
}
Expand Down