/**
 * MongoDB connector for the "神射手" user-asset digitization queries.
 *
 * Talks to a MongoDB instance (by default a local Docker container) and
 * exposes read-only lookups over the `KR*` databases. The collections are
 * described as holding 2 billion+ user records, so the helpers below avoid
 * unbounded result sets and full-collection counts where possible.
 */
import { MongoClient, Db, Collection, ObjectId } from 'mongodb'
import type { Filter } from 'mongodb'

// Connection string. NOTE(review): the fallback embeds development
// credentials; production deployments should always set MONGODB_URI.
const MONGODB_URI =
  process.env.MONGODB_URI || 'mongodb://admin:admin123@localhost:27017/?authSource=admin'

// Database names used by this module.
const DB_NAMES = {
  KR: 'KR',
  KR_腾讯: 'KR_腾讯',
  KR_微博: 'KR_微博',
  KR_京东: 'KR_京东',
  KR_酒店: 'KR_酒店',
  KR_顺丰: 'KR_顺丰',
  KR_存客宝: 'KR_存客宝',
  KR_点了码: 'KR_点了码',
} as const

// Collection names used by this module.
const VALUATION_COLLECTION = '用户估值'
const QQ_PHONE_COLLECTION = 'QQ+手机'
const CKB_ASSET_COLLECTION = '用户资产统一视图'

// Process-wide connection cache.
let cachedClient: MongoClient | null = null
// In-flight connection attempt, shared by concurrent callers.
let pendingClient: Promise<MongoClient> | null = null

/** Document shape of `KR.用户估值` (user valuation). */
export interface UserValuationDoc {
  _id?: ObjectId
  phone?: string
  phone_masked?: string
  name?: string
  gender?: string
  age_range?: string
  province?: string
  city?: string
  user_level?: string
  user_evaluation_score?: number
  rfm_composite_score?: number
  tags?: string[]
  data_quality?: {
    completeness: number
    source_count: number
  }
  computed_at?: Date
}

/** Document shape of `KR_腾讯.QQ+手机` (QQ number to phone mapping). */
export interface QQPhoneDoc {
  _id?: ObjectId
  qq: string
  phone?: string
  手机号?: string
  QQ号评分?: number
  手机号评分?: number
  省份?: string
  地区?: string
  运营商?: string
  更新时间?: Date
}

/** Document shape of `KR_存客宝.用户资产统一视图` (unified user-asset view). */
export interface CKBUserAssetDoc {
  _id?: ObjectId
  phone?: string
  wechat_id?: string
  nickname?: string
  total_assets?: number
  tags?: string[]
}

/** Creates and connects a new client with this module's pool settings. */
async function connectClient(): Promise<MongoClient> {
  const client = new MongoClient(MONGODB_URI, {
    maxPoolSize: 10,
    minPoolSize: 2,
    maxIdleTimeMS: 60000,
    serverSelectionTimeoutMS: 5000,
  })
  await client.connect()
  return client
}

/**
 * Returns the shared MongoDB client, connecting on first use.
 *
 * Concurrent callers share a single connection attempt, so only one client
 * (and one pool) is ever created. A failed attempt is not cached; the next
 * call retries.
 *
 * @throws whatever `MongoClient.connect()` rejects with.
 */
export async function getMongoClient(): Promise<MongoClient> {
  if (cachedClient) {
    return cachedClient
  }
  if (!pendingClient) {
    pendingClient = connectClient()
  }
  const attempt = pendingClient
  try {
    cachedClient = await attempt
    return cachedClient
  } finally {
    // Only clear our own attempt; a newer retry may already be in flight.
    if (pendingClient === attempt) {
      pendingClient = null
    }
  }
}

/**
 * Returns a handle to the named database (defaults to `KR`).
 */
export async function getDatabase(dbName: string = DB_NAMES.KR): Promise<Db> {
  const client = await getMongoClient()
  return client.db(dbName)
}

/**
 * Expands a phone number into the stored formats it may match.
 *
 * - 13 digits starting with `86`: bare 11-digit, `+86…` and `86…` forms.
 * - 11 digits starting with `1`: the same three forms.
 * - anything else: the trimmed input and its digits-only form.
 *
 * Empty variants are dropped, so the result may be an empty array.
 */
function normalizePhone(phone: string): string[] {
  const trimmed = phone.trim()
  const digits = trimmed.replace(/\D/g, '')
  const variants: string[] = []

  if (digits.length === 13 && digits.startsWith('86')) {
    const base = digits.slice(2)
    variants.push(base, `+86${base}`, digits)
  } else if (digits.length === 11 && digits.startsWith('1')) {
    variants.push(digits, `+86${digits}`, `86${digits}`)
  } else {
    variants.push(trimmed, digits)
  }

  // An empty variant would match documents whose phone is "".
  return [...new Set(variants)].filter(variant => variant.length > 0)
}

/**
 * Builds an equality filter on `phone` covering every normalized variant,
 * or `null` when the input yields nothing to search for.
 */
function buildPhoneFilter(phone: string): { phone: { $in: string[] } } | null {
  const variants = normalizePhone(phone)
  return variants.length > 0 ? { phone: { $in: variants } } : null
}

/**
 * Coerces an optional numeric option to an integer no smaller than `min`,
 * falling back to `fallback` when it is missing or not finite.
 */
function clampInt(value: number | undefined, fallback: number, min: number): number {
  if (value === undefined || !Number.isFinite(value)) {
    return fallback
  }
  return Math.max(min, Math.floor(value))
}

/** Escapes regex metacharacters so user input is matched literally. */
function escapeRegex(text: string): string {
  return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
}

/**
 * Masks a phone number for display.
 *
 * With 11 or more digits, the last 11 digits are shown as `138****8000`.
 * Shorter input keeps its first three characters followed by `****`.
 * Missing or empty input yields `***`.
 */
export function maskPhone(phone?: string): string {
  if (!phone) return '***'
  const digits = phone.replace(/\D/g, '')
  if (digits.length >= 11) {
    const last11 = digits.slice(-11)
    return `${last11.slice(0, 3)}****${last11.slice(-4)}`
  }
  return phone.slice(0, 3) + '****'
}

/**
 * Looks up a single user valuation document by phone number.
 *
 * @returns the first matching document, or `null` when none matches or the
 *          input contains nothing searchable.
 */
export async function queryUserByPhone(phone: string): Promise<UserValuationDoc | null> {
  const filter = buildPhoneFilter(phone)
  if (!filter) {
    return null
  }
  const db = await getDatabase(DB_NAMES.KR)
  return db.collection<UserValuationDoc>(VALUATION_COLLECTION).findOne(filter)
}

/**
 * Fetches a user's profile across three databases in parallel.
 *
 * The 存客宝 lookup is best-effort: if it fails, `ckbAsset` is `null` and the
 * other results are still returned. Failures of the other two lookups
 * propagate to the caller.
 */
export async function queryFullProfile(phone: string): Promise<{
  valuation: UserValuationDoc | null
  qqPhone: QQPhoneDoc | null
  ckbAsset: CKBUserAssetDoc | null
}> {
  const filter = buildPhoneFilter(phone)
  if (!filter) {
    return { valuation: null, qqPhone: null, ckbAsset: null }
  }

  const client = await getMongoClient()

  const [valuation, qqPhone, ckbAsset] = await Promise.all([
    // KR.用户估值
    client.db(DB_NAMES.KR).collection<UserValuationDoc>(VALUATION_COLLECTION).findOne(filter),
    // KR_腾讯.QQ+手机
    // NOTE(review): only the `phone` field is queried here; documents that
    // store the number under `手机号` are not matched — confirm intent/indexes.
    client.db(DB_NAMES.KR_腾讯).collection<QQPhoneDoc>(QQ_PHONE_COLLECTION).findOne(filter),
    // KR_存客宝.用户资产统一视图 (best-effort)
    client
      .db(DB_NAMES.KR_存客宝)
      .collection<CKBUserAssetDoc>(CKB_ASSET_COLLECTION)
      .findOne(filter)
      .catch(() => null),
  ])

  return { valuation, qqPhone, ckbAsset }
}

/**
 * Returns one page of user valuation documents, highest score first.
 *
 * @param options.page     1-based page number (default 1; values below 1 are treated as 1).
 * @param options.pageSize page length (default 20; values below 1 are treated as 1,
 *                         because a limit of 0 means "no limit" in MongoDB).
 * @param options.filters  MongoDB filter applied to both the page and the total.
 * @returns the page plus the total match count. With an empty filter the total
 *          comes from collection metadata (`estimatedDocumentCount`) to avoid
 *          scanning the whole collection.
 */
export async function queryUserList(
  options: {
    page?: number
    pageSize?: number
    filters?: Filter<UserValuationDoc>
  } = {},
): Promise<{ users: UserValuationDoc[]; total: number }> {
  const page = clampInt(options.page, 1, 1)
  const pageSize = clampInt(options.pageSize, 20, 1)
  const filters = options.filters ?? {}

  const db = await getDatabase(DB_NAMES.KR)
  const collection = db.collection<UserValuationDoc>(VALUATION_COLLECTION)

  const unfiltered = Object.keys(filters).length === 0
  const totalPromise = unfiltered
    ? collection.estimatedDocumentCount()
    : collection.countDocuments(filters)

  const [users, total] = await Promise.all([
    collection
      .find(filters)
      .sort({ user_evaluation_score: -1 })
      .skip((page - 1) * pageSize)
      .limit(pageSize)
      .toArray(),
    totalPromise,
  ])

  return { users, total }
}

/**
 * Summarizes every database whose name starts with `KR`.
 *
 * Document counts are estimates taken from collection metadata. Any failure
 * is reported as `connected: false` with zeroed totals rather than thrown.
 * `latency` is the wall-clock duration of the whole call in milliseconds.
 */
export async function getDatabaseStats(): Promise<{
  connected: boolean
  databases: string[]
  totalDocuments: number
  totalSize: number
  latency: number
}> {
  const startTime = Date.now()

  try {
    const client = await getMongoClient()
    const dbList = await client.db().admin().listDatabases()
    const krDatabases = dbList.databases.filter(info => info.name.startsWith('KR'))

    // Count each database's collections concurrently instead of one by one.
    const perDatabaseCounts = await Promise.all(
      krDatabases.map(async info => {
        const db = client.db(info.name)
        const collections = await db.listCollections().toArray()
        const counts = await Promise.all(
          collections.map(coll => db.collection(coll.name).estimatedDocumentCount()),
        )
        return counts.reduce((sum, count) => sum + count, 0)
      }),
    )

    return {
      connected: true,
      databases: krDatabases.map(info => info.name),
      totalDocuments: perDatabaseCounts.reduce((sum, count) => sum + count, 0),
      totalSize: krDatabases.reduce((sum, info) => sum + (info.sizeOnDisk ?? 0), 0),
      latency: Date.now() - startTime,
    }
  } catch {
    // Deliberate: this is a status probe, so failure is a result, not an error.
    return {
      connected: false,
      databases: [],
      totalDocuments: 0,
      totalSize: 0,
      latency: Date.now() - startTime,
    }
  }
}

/**
 * Returns overall user count and average evaluation score.
 *
 * NOTE(review): the grade and value buckets are placeholders — every user is
 * reported under grade `D` and value `低`; no per-bucket aggregation is done.
 */
export async function getRFMGroupSummary(): Promise<{
  gradeCount: Record<string, number>
  valueCount: Record<string, number>
  totalUsers: number
  avgScore: number
}> {
  const db = await getDatabase(DB_NAMES.KR)
  const collection = db.collection<UserValuationDoc>(VALUATION_COLLECTION)

  const pipeline = [
    {
      $group: {
        _id: null,
        total: { $sum: 1 },
        avgScore: { $avg: '$user_evaluation_score' },
      },
    },
  ]

  const [stats] = await collection
    .aggregate<{ total: number; avgScore: number | null }>(pipeline)
    .toArray()
  const totalUsers = stats?.total ?? 0

  return {
    gradeCount: { S: 0, A: 0, B: 0, C: 0, D: totalUsers },
    valueCount: { '高': 0, '中': 0, '低': totalUsers },
    totalUsers,
    avgScore: stats?.avgScore ?? 0,
  }
}

// A full mainland mobile number, optionally prefixed with 86 / +86.
const PHONE_QUERY_PATTERN = /^(?:\+?86)?1[3-9]\d{9}$/
// QQ numbers are treated as 5 to 11 digits.
const QQ_QUERY_PATTERN = /^\d{5,11}$/

/**
 * Searches user valuations by phone number, QQ number, or keyword.
 *
 * The query type is detected from the input:
 * - `phone`: a mainland mobile number (spaces and dashes ignored, optional 86 / +86 prefix).
 * - `qq`: 5 to 11 digits; resolved to a phone via `KR_腾讯.QQ+手机`, then searched by phone.
 *   Returns an empty result when the QQ number has no linked phone.
 * - `keyword`: case-insensitive literal substring match on name, city, or province.
 *
 * @param options.limit  maximum results (default 50; values below 1 are treated as 1).
 * @param options.offset results to skip (default 0; negative values are treated as 0).
 */
export async function intelligentSearch(
  query: string,
  options: {
    limit?: number
    offset?: number
  } = {},
): Promise<{
  users: UserValuationDoc[]
  total: number
  queryType: string
}> {
  const limit = clampInt(options.limit, 50, 1)
  const offset = clampInt(options.offset, 0, 0)
  const term = query.trim()

  let queryType = 'keyword'
  let searchQuery: Filter<UserValuationDoc>

  if (PHONE_QUERY_PATTERN.test(term.replace(/[\s-]/g, ''))) {
    queryType = 'phone'
    const filter = buildPhoneFilter(term)
    if (!filter) {
      return { users: [], total: 0, queryType }
    }
    searchQuery = filter
  } else if (QQ_QUERY_PATTERN.test(term)) {
    queryType = 'qq'
    // Resolve the QQ number to a phone first. The collection is left untyped
    // because the lookup matches `qq` stored either as a string or a number.
    const client = await getMongoClient()
    const qqDoc = await client
      .db(DB_NAMES.KR_腾讯)
      .collection(QQ_PHONE_COLLECTION)
      .findOne({ $or: [{ qq: term }, { qq: Number.parseInt(term, 10) }] })

    // `||` on purpose: an empty `phone` should fall through to `手机号`.
    const linkedPhone: unknown = qqDoc?.phone || qqDoc?.手机号
    const filter = linkedPhone ? buildPhoneFilter(String(linkedPhone)) : null
    if (!filter) {
      return { users: [], total: 0, queryType }
    }
    searchQuery = filter
  } else {
    // Escape the input so it is matched literally and cannot inject a pattern.
    const pattern = escapeRegex(term)
    searchQuery = {
      $or: [
        { name: { $regex: pattern, $options: 'i' } },
        { city: { $regex: pattern, $options: 'i' } },
        { province: { $regex: pattern, $options: 'i' } },
      ],
    }
  }

  const db = await getDatabase(DB_NAMES.KR)
  const collection = db.collection<UserValuationDoc>(VALUATION_COLLECTION)

  const [users, total] = await Promise.all([
    collection
      .find(searchQuery)
      .sort({ user_evaluation_score: -1 })
      .skip(offset)
      .limit(limit)
      .toArray(),
    collection.countDocuments(searchQuery),
  ])

  return { users, total, queryType }
}

/**
 * Pings the server and reports the round-trip time.
 *
 * Never throws: a failure is returned as `status: 'unhealthy'` with the
 * error message. `latency` is in milliseconds.
 */
export async function healthCheck(): Promise<{
  status: string
  latency: number
  message: string
}> {
  const startTime = Date.now()

  try {
    const client = await getMongoClient()
    await client.db().admin().ping()

    return {
      status: 'healthy',
      latency: Date.now() - startTime,
      message: 'MongoDB 连接正常',
    }
  } catch (error: unknown) {
    return {
      status: 'unhealthy',
      latency: Date.now() - startTime,
      message: error instanceof Error ? error.message : String(error),
    }
  }
}