Files
users/lib/mongodb.ts

516 lines
12 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* 神射手 MongoDB 连接器
* 连接到卡若AI神射手后端数据库
*
* 数据库概览:
* - KR.用户估值: 1436万条 - 统一画像、RFM评分
* - KR_腾讯.QQ+手机: 7.05亿条 - QQ↔手机关联
* - KR_微博.微博uid+手机: 2.17亿条 - 微博UID↔手机
* - KR_京东.jd_com: 1.42亿条 - 京东用户
* - KR_存客宝.用户资产统一视图: 21.6万条 - 存客宝用户
*/
import { MongoClient, Db, Collection, Document } from 'mongodb'
// MongoDB connection configuration.
// The URI is read from the MONGODB_URI environment variable; because `||` is used, an unset OR
// empty variable falls back to the literal local URI below.
// NOTE(review): the fallback embeds a username and password in source — consider requiring the
// environment variable instead of shipping a credential-bearing default.
const MONGODB_URI = process.env.MONGODB_URI || 'mongodb://admin:admin123@localhost:27017/?authSource=admin'
// Database name constants.
// Maps a stable ASCII key to the actual database name string passed to the driver.
// `as const` keeps each value as a string literal type and makes the object readonly.
export const DB_NAMES = {
KR: 'KR',
KR_TENCENT: 'KR_腾讯',
KR_WEIBO: 'KR_微博',
KR_JD: 'KR_京东',
KR_HOTEL: 'KR_酒店',
KR_SF: 'KR_顺丰',
KR_CKB: 'KR_存客宝',
} as const
// Collection name constants.
// Maps a stable ASCII key to the actual collection name string passed to the driver.
// `as const` keeps each value as a string literal type and makes the object readonly.
export const COLLECTION_NAMES = {
USER_VALUATION: '用户估值',
QQ_PHONE: 'QQ+手机',
WEIBO_UID_PHONE: '微博uid+手机',
JD_COM: 'jd_com',
HOTEL_RECORDS: '酒店开房记录_2013年8月_2000万内',
SF_EXPRESS: '顺丰快递数据',
CKB_UNIFIED: '用户资产统一视图',
} as const
// Module-level client state (one client, and therefore one connection pool, reused by all callers).
// `client` holds the connected instance once available; `clientPromise` holds the promise of the
// connection attempt so that concurrent callers can await the same attempt.
let client: MongoClient | null = null
let clientPromise: Promise<MongoClient> | null = null
/**
 * Returns the shared MongoClient, connecting on first use (singleton).
 *
 * The connected client is cached in the module-level `client`. While a
 * connection attempt is pending, concurrent callers await the same
 * `clientPromise` rather than starting additional attempts.
 *
 * If the attempt fails, the cached promise is cleared so that the next call
 * starts a fresh attempt instead of re-throwing the same stale rejection for
 * the lifetime of the process.
 *
 * @returns The connected client.
 * @throws The error `MongoClient.connect` rejected with.
 */
export async function getMongoClient(): Promise<MongoClient> {
  if (client) {
    return client
  }
  if (!clientPromise) {
    clientPromise = MongoClient.connect(MONGODB_URI, {
      maxPoolSize: 10,
      minPoolSize: 2,
      maxIdleTimeMS: 60000,
      serverSelectionTimeoutMS: 5000,
      connectTimeoutMS: 10000,
    }).catch((err: unknown) => {
      // Drop the failed attempt so a later call can retry.
      clientPromise = null
      throw err
    })
  }
  client = await clientPromise
  return client
}
/**
 * Resolves a handle to the named database on the shared client.
 *
 * @param dbName Name of the database to open.
 * @returns The `Db` handle obtained from the client returned by `getMongoClient`.
 */
export async function getDatabase(dbName: string): Promise<Db> {
  const mongo = await getMongoClient()
  const database = mongo.db(dbName)
  return database
}
/**
 * Resolves a typed handle to a collection.
 *
 * @typeParam T Document shape of the collection (defaults to the driver's `Document`).
 * @param dbName Name of the database that holds the collection.
 * @param collectionName Name of the collection.
 * @returns The collection handle obtained via `getDatabase(dbName)`.
 */
export async function getCollection<T extends Document = Document>(
  dbName: string,
  collectionName: string
): Promise<Collection<T>> {
  return (await getDatabase(dbName)).collection<T>(collectionName)
}
/**
* User valuation document type (per the original author's note, it mirrors the actual
* database structure).
* Every field is declared optional, so consumers must handle missing values.
* Within this file it is the document type used for the
* `DB_NAMES.KR` / `COLLECTION_NAMES.USER_VALUATION` collection.
*/
export interface UserValuationDoc {
_id?: any
user_key?: string
phone?: string
phone_masked?: string
name?: string
email?: string
email_masked?: string
id_number_masked?: string
gender?: string
age_range?: string
province?: string
city?: string
address?: string
// Scoring fields
// (rfm_r/f/m presumably stand for recency / frequency / monetary — confirm against the data pipeline)
user_evaluation_score?: number // user valuation score
rfm_composite_score?: number
rfm_r_score?: number
rfm_f_score?: number
rfm_m_score?: number
user_level?: string
// Sources and tags
source_collections?: string[]
source_channels?: string[]
first_channel?: string
merge_evidence?: string[]
tags?: string[]
// Data quality
data_quality?: {
completeness?: number
source_count?: number
}
// Timestamps
computed_at?: Date
created_at?: Date
updated_at?: Date
version?: string
}
/**
* QQ + phone document type.
* Within this file it is the document type used for the
* `DB_NAMES.KR_TENCENT` / `COLLECTION_NAMES.QQ_PHONE` collection.
* Several property names are Chinese identifiers; the trailing comments give
* their literal English translation.
*/
export interface QQPhoneDoc {
_id?: any
qq?: string
phone?: string
省份?: string // "province"
地区?: string // "region"
运营商?: string // "carrier"
手机号评分?: number // "phone number score"
QQ号评分?: number // "QQ number score"
}
/**
* CKB (存客宝) user-asset document type.
* Within this file it is the document type used for the
* `DB_NAMES.KR_CKB` / `COLLECTION_NAMES.CKB_UNIFIED` collection.
* Every field, including each nested field, is declared optional.
*/
export interface CKBUserAssetDoc {
_id?: any
user_key?: string
phone?: string
phone_masked?: string
// Basic profile attributes
core_profile?: {
name?: string
province?: string
city?: string
address?: string
email?: string
}
// Linked social account identifiers
social_accounts?: {
wechat?: string
qq?: string
}
// RFM scoring (R / F / M presumably recency / frequency / monetary — confirm)
rfm_scores?: {
composite_score?: number
user_level?: string
R?: number
F?: number
M?: number
}
user_evaluation_score?: number
unified_tags?: string[]
// Traffic pool reference (id and display name)
traffic_pool?: {
pool_id?: string
pool_name?: string
}
source_channels?: string[]
}
// ========== 快捷查询函数 ==========
/**
 * Expands a phone number into the set of textual formats to try when matching.
 *
 * Recognised inputs (non-digit characters are ignored when classifying):
 * - `13800138000`      (11 digits starting with `1`)
 * - `+8613800138000` / `8613800138000` (13 digits starting with `86`)
 * - `008613800138000`  (15 digits starting with `0086`)
 *
 * For these, the result contains the bare number plus its `+86…` and `86…`
 * forms. Anything else is returned as given, plus its digits-only form when
 * that form is non-empty.
 *
 * @param phone Raw phone string as supplied by the caller.
 * @returns De-duplicated variants; never an empty array.
 */
function normalizePhone(phone: string): string[] {
  const digits = phone.replace(/\D/g, '')
  const variants: string[] = []
  if (digits.startsWith('86') && digits.length === 13) {
    // 8613800138000 -> every supported format
    const base = digits.slice(2)
    variants.push(base, `+86${base}`, `86${base}`, digits)
  } else if (digits.length === 11 && digits.startsWith('1')) {
    // 13800138000 -> every supported format
    variants.push(digits, `+86${digits}`, `86${digits}`)
  } else if (digits.startsWith('0086') && digits.length === 15) {
    // 008613800138000 -> every supported format, and keep the input forms as well
    const base = digits.slice(4)
    variants.push(base, `+86${base}`, `86${base}`, phone, digits)
  } else {
    variants.push(phone)
    // Do not emit '' for inputs without digits: it would match records whose phone is empty.
    if (digits !== '') {
      variants.push(digits)
    }
  }
  return [...new Set(variants)]
}
/**
 * Looks up a single user-valuation document by phone number.
 *
 * The phone is expanded with `normalizePhone` and the query matches a document
 * whose `phone` equals any of the resulting variants.
 *
 * @param phone Phone number in any format `normalizePhone` accepts.
 * @returns The first matching document, or `null` when none matches.
 */
export async function queryUserByPhone(phone: string): Promise<UserValuationDoc | null> {
  const valuations = await getCollection<UserValuationDoc>(DB_NAMES.KR, COLLECTION_NAMES.USER_VALUATION)
  // One equality clause per phone format.
  const candidates = normalizePhone(phone).map(variant => ({ phone: variant }))
  return valuations.findOne({ $or: candidates })
}
/**
 * Looks up a QQ + phone document by phone number.
 *
 * The phone string is matched exactly as given; no format variants are tried.
 *
 * @param phone Phone number to match against the `phone` field.
 * @returns The first matching document, or `null` when none matches.
 */
export async function queryQQByPhone(phone: string): Promise<QQPhoneDoc | null> {
  const qqPhoneLinks = await getCollection<QQPhoneDoc>(DB_NAMES.KR_TENCENT, COLLECTION_NAMES.QQ_PHONE)
  const match = await qqPhoneLinks.findOne({ phone: phone })
  return match
}
/**
 * Looks up a QQ + phone document by QQ number.
 *
 * @param qq QQ number to match exactly against the `qq` field.
 * @returns The first matching document, or `null` when none matches.
 */
export async function queryPhoneByQQ(qq: string): Promise<QQPhoneDoc | null> {
  const qqPhoneLinks = await getCollection<QQPhoneDoc>(DB_NAMES.KR_TENCENT, COLLECTION_NAMES.QQ_PHONE)
  const match = await qqPhoneLinks.findOne({ qq: qq })
  return match
}
/**
 * Runs the three per-source lookups for one phone number concurrently.
 *
 * The lookups are `queryUserByPhone`, `queryQQByPhone`, and an exact `phone`
 * match against the CKB unified collection. They are started together and
 * awaited with `Promise.all`, so a failure in any one rejects the whole call.
 *
 * @param phone Phone number to look up.
 * @returns One slot per source; a slot is `null` when that source has no match.
 */
export async function queryFullProfile(phone: string): Promise<{
  valuation: UserValuationDoc | null
  qq: QQPhoneDoc | null
  ckb: CKBUserAssetDoc | null
}> {
  const lookupCkb = async (): Promise<CKBUserAssetDoc | null> => {
    const assets = await getCollection<CKBUserAssetDoc>(DB_NAMES.KR_CKB, COLLECTION_NAMES.CKB_UNIFIED)
    return assets.findOne({ phone })
  }
  const [valuationDoc, qqDoc, ckbDoc] = await Promise.all([
    queryUserByPhone(phone),
    queryQQByPhone(phone),
    lookupCkb(),
  ])
  return { valuation: valuationDoc, qq: qqDoc, ckb: ckbDoc }
}
/**
 * Paginated listing of user-valuation documents, ordered by
 * `rfm_composite_score` descending.
 *
 * @param options.page 1-based page number (default 1). Non-finite values fall
 *   back to the default; values below 1 are raised to 1; fractions are floored.
 * @param options.pageSize Page length (default 20). Non-finite values fall
 *   back to the default; values below 1 are raised to 1 so the driver never
 *   receives `limit(0)`, which means "no limit"; fractions are floored.
 * @param options.userLevel Exact match on `user_level`.
 * @param options.minRfm Inclusive lower bound on `rfm_composite_score`.
 * @param options.maxRfm Inclusive upper bound on `rfm_composite_score`.
 * @param options.search Literal, case-insensitive substring matched against
 *   `phone`, `name` and `phone_masked`. Regex metacharacters are escaped, so
 *   input such as `+86` or `(` is searched for verbatim.
 * @param options.tags Matches documents whose `tags` contain any listed tag.
 * @returns The requested page and the total number of documents matching the filter.
 */
export async function queryUserList(options: {
  page?: number
  pageSize?: number
  userLevel?: string
  minRfm?: number
  maxRfm?: number
  search?: string
  tags?: string[]
}): Promise<{ data: UserValuationDoc[], total: number }> {
  const {
    page = 1,
    pageSize = 20,
    userLevel,
    minRfm,
    maxRfm,
    search,
    tags
  } = options
  // A negative skip is rejected by the server and limit(0) disables the limit; normalise both.
  const safePage = Number.isFinite(page) ? Math.max(1, Math.floor(page)) : 1
  const safePageSize = Number.isFinite(pageSize) ? Math.max(1, Math.floor(pageSize)) : 20
  const coll = await getCollection<UserValuationDoc>(DB_NAMES.KR, COLLECTION_NAMES.USER_VALUATION)
  // Build the filter incrementally; only supplied options contribute a clause.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- filter is assembled dynamically
  const query: any = {}
  if (userLevel) {
    query.user_level = userLevel
  }
  if (minRfm !== undefined || maxRfm !== undefined) {
    query.rfm_composite_score = {}
    if (minRfm !== undefined) query.rfm_composite_score.$gte = minRfm
    if (maxRfm !== undefined) query.rfm_composite_score.$lte = maxRfm
  }
  if (search) {
    // Escape regex metacharacters so user text is matched literally.
    const literal = search.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
    query.$or = [
      { phone: { $regex: literal, $options: 'i' } },
      { name: { $regex: literal, $options: 'i' } },
      { phone_masked: { $regex: literal, $options: 'i' } }
    ]
  }
  if (tags && tags.length > 0) {
    query.tags = { $in: tags }
  }
  const [data, total] = await Promise.all([
    coll.find(query)
      .sort({ rfm_composite_score: -1 })
      .skip((safePage - 1) * safePageSize)
      .limit(safePageSize)
      .toArray(),
    coll.countDocuments(query)
  ])
  return { data, total }
}
/**
 * Collects size and document-count statistics for every database whose name
 * starts with `KR`.
 *
 * Document counts use `estimatedDocumentCount` per collection, one collection
 * at a time. A collection whose count fails is skipped and contributes 0.
 * Per-database sizes are rounded to two decimals; the overall size is the
 * rounded sum of the unrounded per-database sizes.
 *
 * Never rejects: on any other failure the error is logged and a
 * `connected: false` result with empty totals is returned.
 *
 * @returns Connection flag, per-database rows, and overall totals.
 */
export async function getDatabaseStats(): Promise<{
  connected: boolean
  databases: { name: string, collections: number, documents: number, sizeGB: number }[]
  totalDocuments: number
  totalSizeGB: number
}> {
  const roundTo2 = (value: number): number => Math.round(value * 100) / 100
  try {
    const mongo = await getMongoClient()
    // List databases through the admin database.
    const listing = await mongo.db('admin').admin().listDatabases()
    const databases: { name: string, collections: number, documents: number, sizeGB: number }[] = []
    let totalDocuments = 0
    let totalSizeGB = 0
    for (const entry of listing.databases) {
      if (!entry.name.startsWith('KR')) {
        continue
      }
      const handle = mongo.db(entry.name)
      const collectionInfos = await handle.listCollections().toArray()
      let documents = 0
      for (const info of collectionInfos) {
        try {
          const estimated = await handle.collection(info.name).estimatedDocumentCount()
          documents += estimated
        } catch {
          // Best effort: a collection that cannot be counted is skipped.
        }
      }
      const sizeGB = (entry.sizeOnDisk || 0) / (1024 * 1024 * 1024)
      databases.push({
        name: entry.name,
        collections: collectionInfos.length,
        documents,
        sizeGB: roundTo2(sizeGB),
      })
      totalDocuments += documents
      totalSizeGB += sizeGB
    }
    return { connected: true, databases, totalDocuments, totalSizeGB: roundTo2(totalSizeGB) }
  } catch (error) {
    console.error('MongoDB stats error:', error)
    return { connected: false, databases: [], totalDocuments: 0, totalSizeGB: 0 }
  }
}
/**
 * Counts user-valuation documents per `user_level`.
 *
 * Groups are produced by an aggregation and ordered by count, largest first.
 * A group whose `user_level` is missing or falsy is reported as `'未分类'`.
 *
 * @returns Per-level rows (`percentage` is the share of `total`, rounded to
 *   two decimals) and the total number of documents counted.
 */
export async function getRFMGroupSummary(): Promise<{
  levels: { level: string, count: number, percentage: number }[]
  total: number
}> {
  const coll = await getCollection<UserValuationDoc>(DB_NAMES.KR, COLLECTION_NAMES.USER_VALUATION)
  const grouped = await coll
    .aggregate([
      { $group: { _id: '$user_level', count: { $sum: 1 } } },
      { $sort: { count: -1 } },
    ])
    .toArray()
  let total = 0
  for (const row of grouped) {
    total += row.count
  }
  const levels = grouped.map(row => {
    const share = Math.round((row.count / total) * 10000) / 100
    return { level: row._id || '未分类', count: row.count, percentage: share }
  })
  return { levels, total }
}
/**
 * Lists the distinct values of the `tags` field across the user-valuation
 * collection, with falsy entries (such as empty strings or null) removed.
 *
 * @returns The distinct tag values.
 */
export async function getDistinctTags(): Promise<string[]> {
  const valuations = await getCollection<UserValuationDoc>(DB_NAMES.KR, COLLECTION_NAMES.USER_VALUATION)
  const distinctTags = await valuations.distinct('tags')
  return distinctTags.filter(tag => Boolean(tag)) as string[]
}
/**
 * Free-text search over the user-valuation collection.
 *
 * The trimmed query is classified by the first rule that matches:
 * 1. `phone`: 11 digits matching `1[3-9]…`; matched against all phone format variants.
 * 2. `qq`: 5–12 digits; the QQ number is resolved to a phone via
 *    `queryPhoneByQQ`, then searched like a phone. If no phone is found, an
 *    empty result is returned without querying the valuation collection.
 * 3. `email`: contains `@`; case-insensitive literal substring match on `email`.
 * 4. `name`: 2–4 CJK characters; exact match on `name`.
 * 5. `general`: anything else; case-insensitive literal substring match on
 *    `phone`, `name` and `phone_masked`.
 *
 * User text is escaped before being used as a regular expression, so regex
 * metacharacters are matched literally.
 *
 * @param query Raw search text.
 * @param options.limit Maximum rows to return (default 20). Non-finite values
 *   fall back to the default; values below 1 are raised to 1; fractions are floored.
 * @param options.offset Rows to skip (default 0). Non-finite values fall back
 *   to the default; negative values are raised to 0; fractions are floored.
 * @returns Matching documents, the total match count, and the detected query type.
 */
export async function intelligentSearch(query: string, options: {
  limit?: number
  offset?: number
} = {}): Promise<{
  users: UserValuationDoc[]
  total: number
  queryType: 'phone' | 'name' | 'qq' | 'email' | 'general'
}> {
  const { limit = 20, offset = 0 } = options
  // A negative skip is rejected by the server and limit(0) disables the limit; normalise both.
  const safeLimit = Number.isFinite(limit) ? Math.max(1, Math.floor(limit)) : 20
  const safeOffset = Number.isFinite(offset) ? Math.max(0, Math.floor(offset)) : 0
  // Classify the query.
  let queryType: 'phone' | 'name' | 'qq' | 'email' | 'general' = 'general'
  const trimmed = query.trim()
  if (/^1[3-9]\d{9}$/.test(trimmed)) {
    queryType = 'phone'
  } else if (/^\d{5,12}$/.test(trimmed)) {
    queryType = 'qq'
  } else if (/@/.test(trimmed)) {
    queryType = 'email'
  } else if (/^[\u4e00-\u9fa5]{2,4}$/.test(trimmed)) {
    queryType = 'name'
  }
  // Escape regex metacharacters so user text is matched literally.
  const literal = trimmed.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
  const coll = await getCollection<UserValuationDoc>(DB_NAMES.KR, COLLECTION_NAMES.USER_VALUATION)
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- filter shape depends on the query type
  let filter: any = {}
  switch (queryType) {
    case 'phone': {
      // Match any supported phone format.
      const phoneVariants = normalizePhone(trimmed)
      filter = { $or: phoneVariants.map(p => ({ phone: p })) }
      break
    }
    case 'qq': {
      // Resolve QQ -> phone first, then search valuations by that phone.
      const qqResult = await queryPhoneByQQ(trimmed)
      if (!qqResult?.phone) {
        // No phone linked to this QQ number: nothing can match.
        return { users: [], total: 0, queryType }
      }
      const qqPhoneVariants = normalizePhone(qqResult.phone)
      filter = { $or: qqPhoneVariants.map(p => ({ phone: p })) }
      break
    }
    case 'email':
      filter = { email: { $regex: literal, $options: 'i' } }
      break
    case 'name':
      filter = { name: trimmed }
      break
    default:
      filter = {
        $or: [
          { phone: { $regex: literal, $options: 'i' } },
          { name: { $regex: literal, $options: 'i' } },
          { phone_masked: { $regex: literal, $options: 'i' } }
        ]
      }
  }
  const [users, total] = await Promise.all([
    coll.find(filter).skip(safeOffset).limit(safeLimit).toArray(),
    coll.countDocuments(filter)
  ])
  return { users, total, queryType }
}
/**
 * Pings the server through the `admin` database and reports the round-trip time.
 *
 * Never rejects: a failure is reported as `mongodb: false` together with the
 * error message (or `'Unknown error'` when the thrown value is not an `Error`).
 *
 * @returns Reachability flag, elapsed milliseconds since the call started,
 *   and an error message on failure.
 */
export async function healthCheck(): Promise<{
  mongodb: boolean
  latencyMs: number
  error?: string
}> {
  const startedAt = Date.now()
  const elapsed = (): number => Date.now() - startedAt
  try {
    const mongo = await getMongoClient()
    await mongo.db('admin').command({ ping: 1 })
    return { mongodb: true, latencyMs: elapsed() }
  } catch (error) {
    return {
      mongodb: false,
      latencyMs: elapsed(),
      error: error instanceof Error ? error.message : 'Unknown error',
    }
  }
}