Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 40 additions & 12 deletions services/apps/data_sink_worker/src/service/activity.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -334,18 +334,40 @@ export default class ActivityService extends LoggerBase {
if (identities.length === 1) {
activity.username = identities[0].value
} else if (identities.length === 0) {
this.log.error(
{ platform, activity },
`Activity's member does not have an identity for the platform!`,
// Fall back to same-platform email identity — handles old gerrit records where
// only a type:email identity was stored (before the gerrit integration
// gained the email-as-username fallback).
const emailFallback = activity.member.identities.find(
(i) => i.platform === platform && i.type === MemberIdentityType.EMAIL && i.value,
)
results.set(resultId, {
success: false,
err: new UnrepeatableError(
`Activity's member does not have an identity for the platform: ${platform}!`,
),
})

continue
if (emailFallback && emailFallback.verified) {
activity.username = emailFallback.value
activity.member.identities.push({
platform,
type: MemberIdentityType.USERNAME,
value: emailFallback.value,
verified: true,
source: emailFallback.source,
})
} else if (emailFallback) {
// Email identity exists but is unverified — cannot safely use as username key.
results.set(resultId, {
success: false,
err: new UnrepeatableError(
`Activity's member has no verified username or email identity for platform: ${platform}!`,
),
})
Comment thread
themarolt marked this conversation as resolved.
continue
} else {
Comment thread
themarolt marked this conversation as resolved.
// No usable identity at all (e.g. git commit with empty author email).
// Nothing to attribute — skip silently rather than error.
this.log.warn(
{ platform, resultId },
`Activity's member has no usable identity for the platform, skipping.`,
)
results.set(resultId, { success: true })
Comment thread
themarolt marked this conversation as resolved.
continue
}
} else {
this.log.error(
{ platform, activity },
Expand Down Expand Up @@ -459,7 +481,13 @@ export default class ActivityService extends LoggerBase {
if (!success) {
resultMap.set(resultId, { success: false, err })
} else {
relevantPayloads.push(single(payloads, (a) => a.resultId === resultId))
const payload = single(payloads, (a) => a.resultId === resultId)
if (!payload.activity.username?.trim()) {
// prepareMemberData found no usable identity — mark as processed and skip.
resultMap.set(resultId, { success: true })
} else {
relevantPayloads.push(payload)
}
}
}

Expand Down
35 changes: 21 additions & 14 deletions services/apps/data_sink_worker/src/service/member.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,10 @@ export default class MemberService extends LoggerBase {
}

const key = orgCacheKey(org)
const cachedOrgPromise = key ? orgPromiseCache?.get(key) : undefined
let orgIdPromise: Promise<string | undefined>
if (key && orgPromiseCache?.has(key)) {
orgIdPromise = orgPromiseCache.get(key)
if (cachedOrgPromise) {
orgIdPromise = cachedOrgPromise
} else {
orgIdPromise = logExecutionTimeV2(
() => orgService.findOrCreate(platform, integrationId, org),
Expand All @@ -492,10 +493,12 @@ export default class MemberService extends LoggerBase {
}
}
const orgId = await orgIdPromise
organizations.push({
id: orgId,
source: org.source,
})
if (orgId) {
organizations.push({
id: orgId,
source: org.source,
})
}
}
}

Expand Down Expand Up @@ -711,9 +714,10 @@ export default class MemberService extends LoggerBase {
this.log.trace({ memberId: id }, 'Finding or creating organization!')

const key = orgCacheKey(org)
const cachedOrgPromise = key ? orgPromiseCache?.get(key) : undefined
let orgIdPromise: Promise<string | undefined>
if (key && orgPromiseCache?.has(key)) {
orgIdPromise = orgPromiseCache.get(key)
if (cachedOrgPromise) {
orgIdPromise = cachedOrgPromise
} else {
orgIdPromise = logExecutionTimeV2(
() => orgService.findOrCreate(platform, integrationId, org),
Expand All @@ -726,10 +730,12 @@ export default class MemberService extends LoggerBase {
}
}
const orgId = await orgIdPromise
organizations.push({
id: orgId,
source: data.source,
})
if (orgId) {
organizations.push({
id: orgId,
source: data.source,
})
}
}
}

Expand Down Expand Up @@ -839,9 +845,10 @@ export default class MemberService extends LoggerBase {
],
}
const key = orgCacheKey(org)
const cachedOrgPromise = key ? orgPromiseCache?.get(key) : undefined
let orgIdPromise: Promise<string | undefined>
if (key && orgPromiseCache?.has(key)) {
orgIdPromise = orgPromiseCache.get(key)
if (cachedOrgPromise) {
orgIdPromise = cachedOrgPromise
} else {
orgIdPromise = orgService.findOrCreate(
OrganizationAttributeSource.EMAIL,
Expand Down
13 changes: 3 additions & 10 deletions services/apps/data_sink_worker/src/service/organization.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,11 @@ export class OrganizationService extends LoggerBase {
source: string,
integrationId: string,
data: IOrganization,
): Promise<string> {
const id = await this.store.transactionally(async (txStore) => {
): Promise<string | undefined> {
return this.store.transactionally(async (txStore) => {
const qe = dbStoreQx(txStore)
const id = await findOrCreateOrganization(qe, source, data, integrationId, true)
return id
return findOrCreateOrganization(qe, source, data, integrationId, true)
})

if (!id) {
throw new Error('Organization not found or created!')
}

return id
}

public async addToMember(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,47 +538,22 @@ def create_activities_from_commit(
committer_name = commit["committer_name"]
committer_email = commit["committer_email"]

# Create author activity
author = {
"username": author_email,
"displayName": author_name,
"emails": [author_email],
}
activity = self.create_activity(
remote=remote,
commit=commit,
activity_type="authored-commit",
member=author,
source_id=commit_hash,
segment_id=segment_id,
re_onboarding_count=re_onboarding_count,
)
activity_db, activity_kafka = self.prepare_activity_for_db_and_queue(
activity, segment_id, integration_id
)
activities_db.append(activity_db)
activities_queue.append(activity_kafka)

# Only create committer activity if author and committer are different
if author_name != committer_name or author_email != committer_email:
# IMPORTANT: hash_input has a typo in "commited" instead of "committed"
# however fixing it requires recalculating sourceId/parentSourceId for ALL git activities in db
# so far the typo doesn't have any major effect, since the activity type "committed-commit" is correct
hash_input = f"{commit_hash}commited-commit{committer_email}"
committer_source_id = hashlib.sha1(hash_input.encode("utf-8")).hexdigest()

committer = {
"username": committer_email,
"displayName": committer_name,
"emails": [committer_email],
# Create author activity — skip if email is empty (no identity to attach to)
author_email = author_email.strip() if author_email else ""
if not author_email:
self.logger.warning(f"Skipping authored-commit for {commit_hash} — empty author email")
Comment thread
themarolt marked this conversation as resolved.
else:
author = {
Comment thread
themarolt marked this conversation as resolved.
"username": author_email,
"displayName": author_name,
"emails": [author_email],
}
activity = self.create_activity(
remote=remote,
commit=commit,
activity_type="committed-commit",
member=committer,
source_id=committer_source_id,
source_parent_id=commit_hash,
activity_type="authored-commit",
member=author,
source_id=commit_hash,
segment_id=segment_id,
re_onboarding_count=re_onboarding_count,
)
Expand All @@ -588,23 +563,65 @@ def create_activities_from_commit(
activities_db.append(activity_db)
activities_queue.append(activity_kafka)

# Only create committer activity if author and committer are different
committer_email = committer_email.strip() if committer_email else ""
if author_name != committer_name or author_email != committer_email:
if not committer_email:
self.logger.warning(
f"Skipping committed-commit for {commit_hash} — empty committer email"
)
else:
# IMPORTANT: hash_input has a typo in "commited" instead of "committed"
# however fixing it requires recalculating sourceId/parentSourceId for ALL git activities in db
# so far the typo doesn't have any major effect, since the activity type "committed-commit" is correct
hash_input = f"{commit_hash}commited-commit{committer_email}"
committer_source_id = hashlib.sha1(hash_input.encode("utf-8")).hexdigest()

committer = {
"username": committer_email,
"displayName": committer_name,
"emails": [committer_email],
}
activity = self.create_activity(
remote=remote,
commit=commit,
activity_type="committed-commit",
member=committer,
source_id=committer_source_id,
source_parent_id=commit_hash,
segment_id=segment_id,
re_onboarding_count=re_onboarding_count,
)
activity_db, activity_kafka = self.prepare_activity_for_db_and_queue(
activity, segment_id, integration_id
)
activities_db.append(activity_db)
activities_queue.append(activity_kafka)

# Process extracted activities from commit message
extracted_activities = self.extract_activities(commit["message"])
for extracted_activity in extracted_activities:
activity_type, member_data = list(extracted_activity.items())[0]

trailer_email = (member_data.get("email") or "").strip()
if not trailer_email:
self.logger.warning(
f"Skipping {activity_type} for {commit_hash} — empty email in commit trailer"
)
continue
Comment thread
themarolt marked this conversation as resolved.

# Convert activity type to lowercase and add "-commit" suffix
# This matches the legacy behavior: "signed-off-by" -> "signed-off-commit"
activity_type = activity_type.lower().replace("-by", "") + "-commit"

member = {
"displayName": member_data["name"],
"emails": [member_data["email"]],
"emails": [trailer_email],
}

# Generate unique source ID for extracted activity
source_id = hashlib.sha1(
(commit_hash + activity_type + member_data["email"]).encode("utf-8")
(commit_hash + activity_type + trailer_email).encode("utf-8")
).hexdigest()
activity = self.create_activity(
remote=remote,
Expand Down
Loading