Files
users/services/IngestionService.ts
v0 4eed69520c feat: refactor data asset center for enhanced search and analytics
Refactor homepage for focused search and data display; streamline data platform; enhance user and tag management; focus AI assistant on data analysis and report generation.

Co-authored-by: null <4804959+fnvtk@users.noreply.github.com>
2025-07-25 06:42:34 +00:00

447 lines
13 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// 数据接入服务
// 实现数据映射、转换、标准化和关联全局userId的核心逻辑
import { DATA_DICTIONARY } from "@/lib/data-dictionary"
import { IdentityService } from "./IdentityService"
import { getCollectionData } from "@/lib/mongodb-mock-connector"
/**
 * A single raw record submitted for ingestion.
 *
 * `IngestionService` maps `originalData` through the data dictionary entry
 * for `source` and files the record under a source profile of the resolved user.
 */
export interface IngestionRequest {
  /**
   * Identifier of the originating system. `IngestionService` has source-specific
   * handling for "douyin", "xiaohongshu", "cunkebao_form" and "touchkebao_call".
   */
  source: string

  /** The user's ID inside the source system, when known. */
  sourceUserId?: string

  /** The record's ID inside the source system, when known. */
  sourceRecordId?: string

  /** Untransformed payload, keyed by the source system's own field names. */
  originalData: Record<string, any>

  /**
   * Stored as the source profile's `ingestionTimestamp`; the service falls back
   * to the current time when omitted. Presumably an ISO-8601 string — confirm with callers.
   */
  timestamp?: string
}
/**
 * The unified user record produced by `IngestionService` after a raw record
 * has been mapped, linked to a global user ID and merged with any stored data.
 */
export interface ProcessedUserData {
  /** Global user ID the record was linked to. */
  userId: string

  /** Values for the data dictionary's core fields. */
  coreProfile: Record<string, any>

  /** De-duplicated union of previously stored tags and newly generated ones. */
  unifiedTags: string[]

  /** Previously stored attributes overlaid with newly generated ones. */
  unifiedAttributes: Record<string, any>

  /** One entry per source-system profile known for this user. */
  sourceProfiles: {
    /** Identifier of the originating system. */
    source: string
    /** The user's ID inside the source system, when known. */
    sourceUserId?: string
    /** The record's ID inside the source system, when known. */
    sourceRecordId?: string
    /** Untransformed payload as received from the source. */
    originalData: Record<string, any>
    /** Content items (videos, notes) extracted from the payload. */
    contentActivity?: any[]
    /** Interaction entries extracted from the payload. */
    interactionHistory?: any[]
    /** Request timestamp, or the processing time when the request had none. */
    ingestionTimestamp: string
  }[]

  /** Derived insights such as persona tags, sentiment score and potential needs. */
  aiInsights?: Record<string, any>

  /** CRM data carried over from the stored user, if any. */
  crmInfo?: Record<string, any>

  /** ISO timestamp of first creation; preserved from the stored user when present. */
  createdAt?: string

  /** ISO timestamp of the most recent processing run. */
  updatedAt: string
}
/**
 * Data ingestion service (singleton).
 *
 * Takes raw records from external sources, maps and transforms their fields
 * through `DATA_DICTIONARY`, links them to a global user ID via
 * `IdentityService`, and builds the unified user record.
 */
export class IngestionService {
  private static instance: IngestionService
  private identityService: IdentityService

  private constructor() {
    this.identityService = IdentityService.getInstance()
  }

  /** Returns the shared instance, creating it on first use. */
  public static getInstance(): IngestionService {
    if (!IngestionService.instance) {
      IngestionService.instance = new IngestionService()
    }
    return IngestionService.instance
  }

  /**
   * Runs the full ingestion pipeline for one request.
   *
   * @param request The raw record to ingest.
   * @returns The unified user record that was built and handed to storage.
   * @throws Error wrapping the message of any failure in the pipeline,
   *         including data-quality violations.
   */
  public async processIngestionRequest(request: IngestionRequest): Promise<ProcessedUserData> {
    try {
      // 1. Field mapping and transformation
      const mappedData = await this.mapAndTransformData(request)
      // 2. Identity resolution
      const userId = await this.identifyOrCreateUser(mappedData)
      // 3. Build the unified user structure
      const processedData = await this.buildUnifiedUserData(userId, request, mappedData)
      // 4. Data quality check
      await this.validateDataQuality(processedData)
      // 5. Persist (mocked)
      await this.storeUserData(processedData)
      return processedData
    } catch (error) {
      console.error("数据接入处理失败:", error)
      throw new Error(`数据接入处理失败: ${this.describeError(error)}`)
    }
  }

  /** Extracts a readable message from any thrown value, not only `Error` instances. */
  private describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error)
  }

  /**
   * Maps source fields to target fields using the data dictionary entry for
   * `request.source`, applying each mapping's transform and validation rules.
   *
   * Fields that fail validation are dropped with a warning. Fields without a
   * mapping are copied through under their original name.
   */
  private async mapAndTransformData(request: IngestionRequest): Promise<Record<string, any>> {
    const mappedData: Record<string, any> = {}
    const sourceMappings = DATA_DICTIONARY.sourceMappings[request.source] || []

    for (const [sourceField, sourceValue] of Object.entries(request.originalData)) {
      const mapping = sourceMappings.find((m) => m.sourceField === sourceField)

      if (!mapping) {
        // No mapping rule: keep the original field name.
        mappedData[sourceField] = sourceValue
        continue
      }

      const transformedValue = mapping.transformRule
        ? this.applyTransformRule(sourceValue, mapping.transformRule)
        : sourceValue

      if (mapping.validationRule && !this.applyValidationRule(transformedValue, mapping.validationRule)) {
        console.warn(`字段 ${sourceField} 验证失败,跳过映射`)
        continue
      }

      mappedData[mapping.targetField] = transformedValue
    }

    return mappedData
  }

  /**
   * Applies a named transform rule to a value. Unknown rules return the value unchanged.
   *
   * @param value The raw field value.
   * @param rule One of "prefix_dy_", "prefix_xhs_", "lowercase", "uppercase", "trim".
   */
  private applyTransformRule(value: any, rule: string): any {
    switch (rule) {
      case "prefix_dy_":
        return this.addPrefix(value, "dy_")
      case "prefix_xhs_":
        return this.addPrefix(value, "xhs_")
      case "lowercase":
        return typeof value === "string" ? value.toLowerCase() : value
      case "uppercase":
        return typeof value === "string" ? value.toUpperCase() : value
      case "trim":
        return typeof value === "string" ? value.trim() : value
      default:
        return value
    }
  }

  /**
   * Prefixes a value, leaving missing or empty values untouched.
   *
   * Prefixing `undefined` would produce the literal "dy_undefined", which then
   * passes the "required" validation rule as if it were a real identifier.
   */
  private addPrefix(value: any, prefix: string): any {
    if (value === null || value === undefined || value === "") {
      return value
    }
    return `${prefix}${value}`
  }

  /**
   * Checks a value against a named validation rule. Unknown rules always pass.
   *
   * @param value The (already transformed) field value.
   * @param rule One of "required", "email", "phone".
   */
  private applyValidationRule(value: any, rule: string): boolean {
    switch (rule) {
      case "required":
        return value !== null && value !== undefined && value !== ""
      case "email":
        return typeof value === "string" && /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(value)
      case "phone":
        // 11 digits, starting with 1 followed by a digit in 3-9.
        return typeof value === "string" && /^1[3-9]\d{9}$/.test(value)
      default:
        return true
    }
  }

  /**
   * Resolves the global user ID for the mapped data.
   *
   * Reuses (and updates) the first match when its confidence exceeds 0.8;
   * otherwise creates a new identity.
   */
  private async identifyOrCreateUser(mappedData: Record<string, any>): Promise<string> {
    const matches = await this.identityService.findMatchingIdentity(mappedData)

    if (matches.length > 0 && matches[0].confidence > 0.8) {
      // High-confidence match: reuse the existing user ID.
      const bestMatch = matches[0]
      await this.identityService.updateIdentity(bestMatch.userId, mappedData)
      return bestMatch.userId
    }

    return await this.identityService.createNewIdentity(mappedData)
  }

  /**
   * Builds the unified user record by merging the mapped data with whatever
   * is already stored for `userId` in the "users" collection.
   */
  private async buildUnifiedUserData(
    userId: string,
    request: IngestionRequest,
    mappedData: Record<string, any>,
  ): Promise<ProcessedUserData> {
    // Load the stored user, if any (mocked data source).
    const existingUsers = await getCollectionData("users")
    const existingUser = existingUsers.find((user: any) => user._id === userId)
    const now = new Date().toISOString()

    // Core profile: a new value wins, otherwise keep the stored one.
    // NOTE(review): the truthiness checks treat 0 / false / "" as missing.
    const coreProfile: Record<string, any> = {}
    Object.keys(DATA_DICTIONARY.coreFields).forEach((field) => {
      if (mappedData[field]) {
        coreProfile[field] = mappedData[field]
      } else if (existingUser?.core_profile?.[field]) {
        coreProfile[field] = existingUser.core_profile[field]
      }
    })

    const unifiedTags = this.generateUnifiedTags(mappedData, request.source)
    const unifiedAttributes = this.generateUnifiedAttributes(mappedData, request.source)

    const newSourceProfile = {
      source: request.source,
      sourceUserId: request.sourceUserId,
      sourceRecordId: request.sourceRecordId,
      originalData: request.originalData,
      contentActivity: this.extractContentActivity(request.originalData, request.source),
      interactionHistory: this.extractInteractionHistory(request.originalData, request.source),
      ingestionTimestamp: request.timestamp || now,
    }

    // Merge on a copy so the stored user's array is never mutated in place.
    const sourceProfiles = this.mergeSourceProfile(existingUser?.source_profiles || [], newSourceProfile)

    const aiInsights = this.generateAIInsights(mappedData, request.source)

    return {
      userId,
      coreProfile,
      unifiedTags: [...new Set([...(existingUser?.unified_tags || []), ...unifiedTags])],
      unifiedAttributes: { ...(existingUser?.unified_attributes || {}), ...unifiedAttributes },
      sourceProfiles,
      aiInsights: { ...(existingUser?.ai_insights || {}), ...aiInsights },
      crmInfo: existingUser?.crm_info || {},
      createdAt: existingUser?.createdAt || now,
      updatedAt: now,
    }
  }

  /**
   * Returns a new profile list with `incoming` either replacing the stored
   * profile it corresponds to or appended at the end. The input array is not modified.
   *
   * Two profiles correspond when they share the same source and either the
   * same non-empty `sourceUserId` or the same non-empty `sourceRecordId`.
   * A missing ID never matches another missing ID, so an unrelated profile
   * that merely lacks the same optional field is not overwritten. An incoming
   * profile with no IDs at all only replaces a stored same-source profile
   * that also has no IDs.
   */
  private mergeSourceProfile<T extends { source: string; sourceUserId?: string; sourceRecordId?: string }>(
    existingProfiles: readonly T[],
    incoming: T,
  ): T[] {
    const hasId = (id: string | undefined): boolean => id != null && id !== ""
    const incomingHasUserId = hasId(incoming.sourceUserId)
    const incomingHasRecordId = hasId(incoming.sourceRecordId)
    const incomingIsAnonymous = !incomingHasUserId && !incomingHasRecordId

    const merged = [...existingProfiles]
    const matchIndex = merged.findIndex((profile) => {
      if (profile.source !== incoming.source) {
        return false
      }
      if (incomingIsAnonymous) {
        return !hasId(profile.sourceUserId) && !hasId(profile.sourceRecordId)
      }
      return (
        (incomingHasUserId && profile.sourceUserId === incoming.sourceUserId) ||
        (incomingHasRecordId && profile.sourceRecordId === incoming.sourceRecordId)
      )
    })

    if (matchIndex >= 0) {
      merged[matchIndex] = incoming
    } else {
      merged.push(incoming)
    }
    return merged
  }

  /**
   * Derives tags from the source name and from the `city`, `followerCount`
   * and `birthDate` fields of the mapped data.
   */
  private generateUnifiedTags(mappedData: Record<string, any>, source: string): string[] {
    const tags: string[] = []

    // Source-based tags
    switch (source) {
      case "douyin":
        tags.push("抖音用户")
        if (mappedData.followerCount > 10000) {
          tags.push("抖音达人")
        }
        break
      case "xiaohongshu":
        tags.push("小红书用户")
        break
      case "cunkebao_form":
        tags.push("表单用户")
        break
      case "touchkebao_call":
        tags.push("电话咨询用户")
        break
    }

    // Content-based tags
    if (mappedData.city) {
      tags.push(`${mappedData.city}地区`)
    }

    if (mappedData.birthDate) {
      // Age is approximated from calendar years only; an unparseable date
      // yields NaN and therefore no age tag.
      const age = new Date().getFullYear() - new Date(mappedData.birthDate).getFullYear()
      if (age >= 18 && age < 30) {
        tags.push("年轻用户")
      } else if (age >= 30 && age < 50) {
        tags.push("中年用户")
      }
    }

    return tags
  }

  /** Derives source-specific attributes and marks the user as active today. */
  private generateUnifiedAttributes(mappedData: Record<string, any>, source: string): Record<string, any> {
    const attributes: Record<string, any> = {}

    if (source === "douyin" && mappedData.followerCount) {
      attributes.socialInfluence = mappedData.followerCount
    }

    if (source === "touchkebao_call") {
      attributes.lastContactTime = mappedData.callTime || new Date().toISOString()
      attributes.contactChannel = "phone"
    }

    if (source === "cunkebao_form") {
      attributes.lastContactTime = new Date().toISOString()
      attributes.contactChannel = "form"
    }

    // Freshly ingested data counts as active.
    attributes.lastActiveDays = 0

    return attributes
  }

  /**
   * Extracts published content from the raw payload: `videos` for "douyin",
   * `notes` for "xiaohongshu". A payload field that is not an array is ignored
   * rather than aborting the ingestion.
   */
  private extractContentActivity(originalData: Record<string, any>, source: string): any[] {
    const activities: any[] = []

    if (source === "douyin" && Array.isArray(originalData.videos)) {
      for (const video of originalData.videos) {
        activities.push({
          contentId: video.id,
          type: "video",
          url: video.url,
          text: video.description,
          publishTime: video.createTime,
          likes: video.likeCount,
          comments: video.commentCount,
        })
      }
    }

    if (source === "xiaohongshu" && Array.isArray(originalData.notes)) {
      for (const note of originalData.notes) {
        activities.push({
          contentId: note.id,
          type: "note",
          url: note.url,
          text: note.content,
          publishTime: note.createTime,
          likes: note.likeCount,
          comments: note.commentCount,
        })
      }
    }

    return activities
  }

  /**
   * Extracts interaction entries from the payload's `interactions` field.
   * A field that is not an array is ignored. `source` is currently unused.
   */
  private extractInteractionHistory(originalData: Record<string, any>, source: string): any[] {
    if (!Array.isArray(originalData.interactions)) {
      return []
    }

    return originalData.interactions.map((interaction: any) => ({
      type: interaction.type,
      targetId: interaction.targetId,
      content: interaction.content,
      time: interaction.time,
    }))
  }

  /** Builds rule-based insights: persona tags, a fixed sentiment score and potential needs. */
  private generateAIInsights(mappedData: Record<string, any>, source: string): Record<string, any> {
    const personaTags: string[] = []

    if (source === "douyin") {
      personaTags.push("视频爱好者")
    }
    if (source === "xiaohongshu") {
      personaTags.push("生活分享者")
    }
    if (mappedData.city === "北京") {
      personaTags.push("一线城市用户")
    }

    const insights: Record<string, any> = {}
    insights.personaTags = personaTags
    insights.sentimentScore = 0.7 // Default: neutral, leaning positive
    insights.potentialNeeds = this.inferPotentialNeeds(mappedData, source)
    return insights
  }

  /** Infers potential needs from the source name. `mappedData` is currently unused. */
  private inferPotentialNeeds(mappedData: Record<string, any>, source: string): string[] {
    const needs: string[] = []

    if (source === "touchkebao_call") {
      needs.push("产品咨询", "客服支持")
    }
    if (source === "cunkebao_form") {
      needs.push("产品了解", "营销活动")
    }
    if (source === "douyin") {
      needs.push("内容创作工具", "社交媒体管理")
    }

    return needs
  }

  /**
   * Checks the record for a user ID, a non-empty core profile and at least
   * one source profile.
   *
   * @throws Error listing every failed check.
   */
  private async validateDataQuality(data: ProcessedUserData): Promise<void> {
    const errors: string[] = []

    if (!data.userId) {
      errors.push("缺少用户ID")
    }
    if (!data.coreProfile || Object.keys(data.coreProfile).length === 0) {
      errors.push("核心档案为空")
    }
    if (!data.sourceProfiles || data.sourceProfiles.length === 0) {
      errors.push("缺少源档案数据")
    }

    if (errors.length > 0) {
      throw new Error(`数据质量检查失败: ${errors.join(", ")}`)
    }
  }

  /** Persists the user record. Mocked: only logs a summary of what would be stored. */
  private async storeUserData(data: ProcessedUserData): Promise<void> {
    console.log("存储用户数据:", {
      userId: data.userId,
      coreProfileFields: Object.keys(data.coreProfile).length,
      tagsCount: data.unifiedTags.length,
      sourceProfilesCount: data.sourceProfiles.length,
    })
  }

  /**
   * Processes requests one after another, so a later request sees identities
   * created or updated by earlier ones.
   *
   * Failures do not stop the batch: they are collected and logged as a
   * warning, and only the successful results are returned.
   */
  public async processBatchIngestion(requests: IngestionRequest[]): Promise<ProcessedUserData[]> {
    const results: ProcessedUserData[] = []
    const errors: Array<{ request: IngestionRequest; error: string }> = []

    for (const request of requests) {
      try {
        results.push(await this.processIngestionRequest(request))
      } catch (error) {
        errors.push({ request, error: this.describeError(error) })
      }
    }

    if (errors.length > 0) {
      console.warn("批量处理中的错误:", errors)
    }

    return results
  }
}