// Data ingestion service.
// Core logic for mapping, transforming and normalising incoming source data and
// linking each record to a global userId.
import { DATA_DICTIONARY } from "@/lib/data-dictionary"
import { IdentityService } from "./IdentityService"
import { getCollectionData } from "@/lib/mongodb-mock-connector"

/** One raw record submitted to the ingestion pipeline by a single source system. */
export interface IngestionRequest {
  /** Source key; selects the field mappings in `DATA_DICTIONARY.sourceMappings` (e.g. "douyin"). */
  source: string
  /** The user's id inside the source system, when known. */
  sourceUserId?: string
  /** The record's id inside the source system, when known. */
  sourceRecordId?: string
  /** Payload exactly as received from the source. */
  originalData: Record<string, any>
  /** Stored as the source profile's `ingestionTimestamp`; the current time is used when absent. */
  timestamp?: string
}

/** Unified per-user document produced by the pipeline. */
export interface ProcessedUserData {
  /** Global user id obtained from IdentityService. */
  userId: string
  /** Values for the fields listed in `DATA_DICTIONARY.coreFields`. */
  coreProfile: Record<string, any>
  /** De-duplicated union of previously stored tags and tags derived from the current record. */
  unifiedTags: string[]
  /** Previously stored attributes overlaid with attributes derived from the current record. */
  unifiedAttributes: Record<string, any>
  /** Per-source snapshots of the raw records this user has been ingested from. */
  sourceProfiles: Array<{
    source: string
    sourceUserId?: string
    sourceRecordId?: string
    originalData: Record<string, any>
    contentActivity?: any[]
    interactionHistory?: any[]
    ingestionTimestamp: string
  }>
  aiInsights?: Record<string, any>
  crmInfo?: Record<string, any>
  createdAt?: string
  updatedAt: string
}

/**
 * Singleton that turns raw source records into unified user documents.
 * Obtain it via `IngestionService.getInstance()`.
 */
export class IngestionService {
  private static instance: IngestionService
  /** An identity match must score strictly above this to reuse an existing userId. */
  private static readonly MATCH_CONFIDENCE_THRESHOLD = 0.8

  private identityService: IdentityService

  private constructor() {
    this.identityService = IdentityService.getInstance()
  }

  /** Returns the lazily created shared instance. */
  public static getInstance(): IngestionService {
    if (!IngestionService.instance) {
      IngestionService.instance = new IngestionService()
    }
    return IngestionService.instance
  }

  /**
   * Runs one record through the full pipeline:
   * map/transform -> resolve identity -> merge into the unified document -> quality check -> store.
   *
   * @param request Raw record from a single source.
   * @returns The merged user document that was handed to storage.
   * @throws Error whose message is prefixed with "数据接入处理失败: " if any stage fails.
   *   NOTE(review): identity creation/update (step 2) runs before the quality check (step 4),
   *   so a record rejected by the check may already have touched IdentityService.
   */
  public async processIngestionRequest(request: IngestionRequest): Promise<ProcessedUserData> {
    try {
      // 1. Field mapping and value transformation
      const mappedData = await this.mapAndTransformData(request)

      // 2. Identity resolution: reuse a matching userId or create a new one
      const userId = await this.identifyOrCreateUser(mappedData)

      // 3. Merge into the unified user document
      const processedData = await this.buildUnifiedUserData(userId, request, mappedData)

      // 4. Data-quality gate (throws on failure)
      await this.validateDataQuality(processedData)

      // 5. Persist (currently a mock that only logs)
      await this.storeUserData(processedData)

      return processedData
    } catch (error) {
      console.error("数据接入处理失败:", error)
      // Narrow instead of casting: a non-Error throw would otherwise yield "undefined".
      const message = error instanceof Error ? error.message : String(error)
      throw new Error(`数据接入处理失败: ${message}`)
    }
  }

  /**
   * Maps source fields onto dictionary target fields for `request.source`,
   * applying each mapping's transform rule and then its validation rule.
   * Mapped fields that fail validation are dropped with a warning; fields with
   * no mapping are kept under their original name.
   */
  private async mapAndTransformData(request: IngestionRequest): Promise<Record<string, any>> {
    const mappedData: Record<string, any> = {}
    const sourceMappings = DATA_DICTIONARY.sourceMappings[request.source] || []

    for (const [sourceField, sourceValue] of Object.entries(request.originalData)) {
      const mapping = sourceMappings.find((m) => m.sourceField === sourceField)

      if (!mapping) {
        // No mapping rule: keep the raw field name.
        mappedData[sourceField] = sourceValue
        continue
      }

      const transformedValue = mapping.transformRule
        ? this.applyTransformRule(sourceValue, mapping.transformRule)
        : sourceValue

      if (mapping.validationRule && !this.applyValidationRule(transformedValue, mapping.validationRule)) {
        console.warn(`字段 ${sourceField} 验证失败,跳过映射`)
        continue
      }

      mappedData[mapping.targetField] = transformedValue
    }

    return mappedData
  }

  /**
   * Applies a named transform rule. Unknown rules are a no-op.
   * null/undefined pass through untouched so the prefix rules never fabricate
   * values such as "dy_undefined" (which would then pass "required" validation).
   */
  private applyTransformRule(value: any, rule: string): any {
    if (value === null || value === undefined) {
      return value
    }
    switch (rule) {
      case "prefix_dy_":
        return `dy_${value}`
      case "prefix_xhs_":
        return `xhs_${value}`
      case "lowercase":
        return typeof value === "string" ? value.toLowerCase() : value
      case "uppercase":
        return typeof value === "string" ? value.toUpperCase() : value
      case "trim":
        return typeof value === "string" ? value.trim() : value
      default:
        return value
    }
  }

  /** Checks a value against a named validation rule. Unknown rules always pass. */
  private applyValidationRule(value: any, rule: string): boolean {
    switch (rule) {
      case "required":
        return this.isPresent(value)
      case "email":
        return typeof value === "string" && /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(value)
      case "phone":
        // 11-digit number starting with 1 followed by 3-9
        return typeof value === "string" && /^1[3-9]\d{9}$/.test(value)
      default:
        return true
    }
  }

  /** True when a value counts as provided: not null, not undefined and not "". */
  private isPresent(value: any): boolean {
    return value !== null && value !== undefined && value !== ""
  }

  /**
   * Resolves the global userId for a mapped record.
   * If the first candidate from IdentityService scores above the threshold, that
   * identity is updated with the new data and its userId reused; otherwise a
   * new identity is created.
   * NOTE(review): assumes `findMatchingIdentity` returns candidates best-first — confirm.
   */
  private async identifyOrCreateUser(mappedData: Record<string, any>): Promise<string> {
    const matches = await this.identityService.findMatchingIdentity(mappedData)
    const bestMatch = matches[0]

    if (bestMatch !== undefined && bestMatch.confidence > IngestionService.MATCH_CONFIDENCE_THRESHOLD) {
      await this.identityService.updateIdentity(bestMatch.userId, mappedData)
      return bestMatch.userId
    }

    return await this.identityService.createNewIdentity(mappedData)
  }

  /**
   * Merges the current record into the user's stored document (read from the mock
   * "users" collection by `_id`).
   * - coreProfile: freshly mapped values win; stored values fill the gaps.
   * - tags / attributes / aiInsights: stored values combined with newly derived ones.
   * - sourceProfiles: replaces the profile with the same source AND the same
   *   sourceUserId or sourceRecordId; otherwise appends. Ids absent on the request
   *   are never used for matching.
   * The stored document itself is not mutated.
   */
  private async buildUnifiedUserData(
    userId: string,
    request: IngestionRequest,
    mappedData: Record<string, any>,
  ): Promise<ProcessedUserData> {
    const existingUsers = await getCollectionData("users")
    const existingUser = existingUsers.find((user: any) => user._id === userId)
    const now = new Date().toISOString()

    // Core profile: presence check (not truthiness) so valid 0/false values are not
    // replaced by stale stored ones.
    const coreProfile: Record<string, any> = {}
    for (const field of Object.keys(DATA_DICTIONARY.coreFields)) {
      const freshValue = mappedData[field]
      const storedValue = existingUser?.core_profile?.[field]
      if (this.isPresent(freshValue)) {
        coreProfile[field] = freshValue
      } else if (this.isPresent(storedValue)) {
        coreProfile[field] = storedValue
      }
    }

    const unifiedTags = this.generateUnifiedTags(mappedData, request.source)
    const unifiedAttributes = this.generateUnifiedAttributes(mappedData, request.source)

    const newSourceProfile = {
      source: request.source,
      sourceUserId: request.sourceUserId,
      sourceRecordId: request.sourceRecordId,
      originalData: request.originalData,
      contentActivity: this.extractContentActivity(request.originalData, request.source),
      interactionHistory: this.extractInteractionHistory(request.originalData, request.source),
      ingestionTimestamp: request.timestamp || now,
    }

    // Copy: mutating the fetched array in place would alter the stored user even if
    // validation or storage later fails.
    const sourceProfiles: any[] = Array.isArray(existingUser?.source_profiles)
      ? [...existingUser.source_profiles]
      : []

    const hasUserId = request.sourceUserId != null
    const hasRecordId = request.sourceRecordId != null
    const existingSourceIndex = sourceProfiles.findIndex(
      (profile: any) =>
        profile.source === request.source &&
        ((hasUserId && profile.sourceUserId === request.sourceUserId) ||
          (hasRecordId && profile.sourceRecordId === request.sourceRecordId)),
    )

    if (existingSourceIndex >= 0) {
      sourceProfiles[existingSourceIndex] = newSourceProfile
    } else {
      sourceProfiles.push(newSourceProfile)
    }

    const aiInsights = this.generateAIInsights(mappedData, request.source)

    return {
      userId,
      coreProfile,
      unifiedTags: [...new Set([...(existingUser?.unified_tags || []), ...unifiedTags])],
      unifiedAttributes: { ...(existingUser?.unified_attributes || {}), ...unifiedAttributes },
      sourceProfiles,
      aiInsights: { ...(existingUser?.ai_insights || {}), ...aiInsights },
      crmInfo: existingUser?.crm_info || {},
      // NOTE(review): every other stored field is snake_case; confirm the stored key is `createdAt`.
      createdAt: existingUser?.createdAt || now,
      updatedAt: now,
    }
  }

  /** Derives tags from the source and from city / birthDate in the mapped data. */
  private generateUnifiedTags(mappedData: Record<string, any>, source: string): string[] {
    const tags: string[] = []

    // Source-based tags
    if (source === "douyin") {
      tags.push("抖音用户")
      if (mappedData.followerCount > 10000) {
        tags.push("抖音达人")
      }
    } else if (source === "xiaohongshu") {
      tags.push("小红书用户")
    } else if (source === "cunkebao_form") {
      tags.push("表单用户")
    } else if (source === "touchkebao_call") {
      tags.push("电话咨询用户")
    }

    // Content-based tags
    if (mappedData.city) {
      tags.push(`${mappedData.city}地区`)
    }

    if (mappedData.birthDate) {
      // NaN for unparseable dates, in which case neither age band matches.
      const age = this.calculateAge(mappedData.birthDate)
      if (age >= 18 && age < 30) {
        tags.push("年轻用户")
      } else if (age >= 30 && age < 50) {
        tags.push("中年用户")
      }
    }

    return tags
  }

  /**
   * Whole years between `birthDate` and `today`, subtracting one if this year's
   * birthday has not yet occurred (a plain year difference over-counts by one).
   * Returns NaN when `birthDate` cannot be parsed by `Date`.
   */
  private calculateAge(birthDate: any, today: Date = new Date()): number {
    const birth = new Date(birthDate)
    let age = today.getFullYear() - birth.getFullYear()
    const birthdayStillAhead =
      today.getMonth() < birth.getMonth() ||
      (today.getMonth() === birth.getMonth() && today.getDate() < birth.getDate())
    if (birthdayStillAhead) {
      age -= 1
    }
    return age
  }

  /** Derives source-specific attributes; always marks the user as just active. */
  private generateUnifiedAttributes(mappedData: Record<string, any>, source: string): Record<string, any> {
    const attributes: Record<string, any> = {}

    if (source === "douyin" && mappedData.followerCount) {
      attributes.socialInfluence = mappedData.followerCount
    }

    if (source === "touchkebao_call") {
      attributes.lastContactTime = mappedData.callTime || new Date().toISOString()
      attributes.contactChannel = "phone"
    }

    if (source === "cunkebao_form") {
      attributes.lastContactTime = new Date().toISOString()
      attributes.contactChannel = "form"
    }

    // Freshly ingested data counts as activity.
    attributes.lastActiveDays = 0

    return attributes
  }

  /**
   * Normalises published content into a common activity shape:
   * douyin `videos` and xiaohongshu `notes`. Non-array payloads are ignored.
   */
  private extractContentActivity(originalData: Record<string, any>, source: string): any[] {
    if (source === "douyin" && Array.isArray(originalData.videos)) {
      return originalData.videos.map((video: any) => ({
        contentId: video.id,
        type: "video",
        url: video.url,
        text: video.description,
        publishTime: video.createTime,
        likes: video.likeCount,
        comments: video.commentCount,
      }))
    }

    if (source === "xiaohongshu" && Array.isArray(originalData.notes)) {
      return originalData.notes.map((note: any) => ({
        contentId: note.id,
        type: "note",
        url: note.url,
        text: note.content,
        publishTime: note.createTime,
        likes: note.likeCount,
        comments: note.commentCount,
      }))
    }

    return []
  }

  /**
   * Copies type/targetId/content/time from `originalData.interactions`, for any source.
   * Non-array payloads yield an empty list. `source` is currently unused.
   */
  private extractInteractionHistory(originalData: Record<string, any>, source: string): any[] {
    if (!Array.isArray(originalData.interactions)) {
      return []
    }
    return originalData.interactions.map((interaction: any) => ({
      type: interaction.type,
      targetId: interaction.targetId,
      content: interaction.content,
      time: interaction.time,
    }))
  }

  /** Rule-based placeholder "AI" insights: persona tags, a fixed sentiment score, and inferred needs. */
  private generateAIInsights(mappedData: Record<string, any>, source: string): Record<string, any> {
    const insights: Record<string, any> = {}

    const personaTags: string[] = []
    if (source === "douyin") {
      personaTags.push("视频爱好者")
    }
    if (source === "xiaohongshu") {
      personaTags.push("生活分享者")
    }
    if (mappedData.city === "北京") {
      personaTags.push("一线城市用户")
    }

    insights.personaTags = personaTags
    insights.sentimentScore = 0.7 // fixed default, described by the author as "neutral leaning positive"
    insights.potentialNeeds = this.inferPotentialNeeds(mappedData, source)

    return insights
  }

  /** Maps the source to a fixed list of likely needs. `mappedData` is currently unused. */
  private inferPotentialNeeds(mappedData: Record<string, any>, source: string): string[] {
    const needs: string[] = []

    if (source === "touchkebao_call") {
      needs.push("产品咨询", "客服支持")
    }
    if (source === "cunkebao_form") {
      needs.push("产品了解", "营销活动")
    }
    if (source === "douyin") {
      needs.push("内容创作工具", "社交媒体管理")
    }

    return needs
  }

  /**
   * Rejects documents missing a userId, with an empty core profile, or with no source profiles.
   * @throws Error listing every failed check.
   */
  private async validateDataQuality(data: ProcessedUserData): Promise<void> {
    const errors: string[] = []

    if (!data.userId) {
      errors.push("缺少用户ID")
    }

    if (!data.coreProfile || Object.keys(data.coreProfile).length === 0) {
      errors.push("核心档案为空")
    }

    if (!data.sourceProfiles || data.sourceProfiles.length === 0) {
      errors.push("缺少源档案数据")
    }

    if (errors.length > 0) {
      throw new Error(`数据质量检查失败: ${errors.join(", ")}`)
    }
  }

  /** Mock persistence: only logs a summary. A real implementation would write to MongoDB. */
  private async storeUserData(data: ProcessedUserData): Promise<void> {
    console.log("存储用户数据:", {
      userId: data.userId,
      coreProfileFields: Object.keys(data.coreProfile).length,
      tagsCount: data.unifiedTags.length,
      sourceProfilesCount: data.sourceProfiles.length,
    })
  }

  /**
   * Processes requests one after another, in order. Failed requests are logged
   * (one warning with all failures) and omitted from the result instead of aborting the batch.
   * NOTE(review): kept sequential; running in parallel may race on identity creation
   * for records of the same user — confirm before changing.
   *
   * @returns Successfully processed documents, in input order.
   */
  public async processBatchIngestion(requests: IngestionRequest[]): Promise<ProcessedUserData[]> {
    const results: ProcessedUserData[] = []
    const errors: Array<{ request: IngestionRequest; error: string }> = []

    for (const request of requests) {
      try {
        results.push(await this.processIngestionRequest(request))
      } catch (error) {
        errors.push({
          request,
          error: error instanceof Error ? error.message : String(error),
        })
      }
    }

    if (errors.length > 0) {
      console.warn("批量处理中的错误:", errors)
    }

    return results
  }
}