/**
 * MongoDB connector - digital user-asset queries for the "神射手" product.
 * Connects to a local/Docker MongoDB instance and supports querying 2 billion+ user records.
 * Account and password: see 卡若AI "00_账号与API索引", section 2 "本机 MongoDB(统一)": admin / admin123
 */

import { MongoClient, Db, Collection, ObjectId } from 'mongodb'

/**
 * Default connection string for local development (authenticated):
 * the local MongoDB account admin/admin123 against the `admin` auth database.
 *
 * NOTE(review): credentials are hard-coded here; set MONGODB_URI in any shared
 * or deployed environment instead of relying on this default.
 */
const DEFAULT_MONGODB_URI = 'mongodb://admin:admin123@localhost:27017/?authSource=admin'

/**
 * Resolves the MongoDB connection string.
 *
 * The MONGODB_URI environment variable always wins when it is set to a
 * non-blank value (under Docker it usually points at host.docker.internal or
 * an unauthenticated instance and must not be rewritten to localhost).
 * The local default is used only when the variable is unset or blank.
 *
 * @returns The trimmed MONGODB_URI value, or DEFAULT_MONGODB_URI as a fallback.
 */
function getMongoUri(): string {
  const configured = process.env.MONGODB_URI?.trim()
  return configured ? configured : DEFAULT_MONGODB_URI
}

/** Connection string resolved once, when this module is first loaded. */
const MONGODB_URI = getMongoUri()

/**
 * Database name constants.
 *
 * Each key maps to the MongoDB database of the same name. `as const` keeps the
 * values as readonly string literals so they cannot be reassigned at runtime
 * and typos in key names are caught by the compiler.
 */
const DB_NAMES = {
  KR: 'KR',
  KR_腾讯: 'KR_腾讯',
  KR_微博: 'KR_微博',
  KR_京东: 'KR_京东',
  KR_酒店: 'KR_酒店',
  KR_顺丰: 'KR_顺丰',
  KR_存客宝: 'KR_存客宝',
  KR_点了码: 'KR_点了码'
} as const

// Module-level connection cache.

/** Connected client shared by every caller; assigned by getMongoClient() after a successful connect. */
let cachedClient: MongoClient | null = null

/** Cached database handle. NOTE(review): nothing in the visible part of this file reads or writes it. */
let cachedDb: Db | null = null

/**
 * Shape of a document in the `用户估值` (user valuation) collection of the KR database.
 *
 * Every field is optional: documents are read as-is and no field is guaranteed
 * by the code in this file.
 */
export interface UserValuationDoc {
  _id?: ObjectId
  /** Phone number; the field all phone lookups in this file match against. */
  phone?: string
  /** Masked phone; used by searchByIdCard as a fallback when `phone` is empty. */
  phone_masked?: string
  name?: string
  gender?: string
  age_range?: string
  province?: string
  city?: string
  user_level?: string
  /** Score used as the descending sort key for list and search results. */
  user_evaluation_score?: number
  rfm_composite_score?: number
  tags?: string[]
  data_quality?: {
    completeness: number
    source_count: number
  }
  computed_at?: Date
}

/**
 * Shape of a document in the `QQ+手机` (QQ + phone) collection of the KR_腾讯 database.
 *
 * NOTE(review): lookups in this file query `qq` both as a string and as a
 * number, which suggests stored documents may hold either — confirm against
 * the data before relying on `qq` being a string.
 */
export interface QQPhoneDoc {
  _id?: ObjectId
  qq: string
  /** Phone number (English-named field). */
  phone?: string
  /** Phone number (Chinese-named field); read as a fallback when `phone` is empty. */
  手机号?: string
  /** QQ number score. */
  QQ号评分?: number
  /** Phone number score. */
  手机号评分?: number
  /** Province. */
  省份?: string
  /** Region. */
  地区?: string
  /** Carrier. */
  运营商?: string
  /** Last update time. */
  更新时间?: Date
}

/**
 * Shape of a document in the `用户资产统一视图` (unified user-asset view)
 * collection of the KR_存客宝 database.
 */
export interface CKBUserAssetDoc {
  _id?: ObjectId
  /** Phone number; the field queryFullProfile matches against. */
  phone?: string
  wechat_id?: string
  nickname?: string
  total_assets?: number
  tags?: string[]
}

/**
 * Connection attempt currently in flight, shared by all callers that arrive
 * before it settles. Cleared once the attempt succeeds or fails.
 */
let pendingClient: Promise<MongoClient> | null = null

/**
 * Creates a new client, connects it and stores it in the module-level cache.
 *
 * @throws Error with a user-facing message when the connection fails; the
 *         half-initialized client is closed first so its resources are released.
 */
async function connectMongoClient(): Promise<MongoClient> {
  const client = new MongoClient(MONGODB_URI, {
    maxPoolSize: 10,
    minPoolSize: 2,
    maxIdleTimeMS: 60000,
    serverSelectionTimeoutMS: 15000,
    connectTimeoutMS: 10000,
  })

  try {
    await client.connect()
  } catch (err: unknown) {
    // Best-effort cleanup: a failed client must not keep sockets or timers alive.
    await client.close().catch(() => undefined)
    const reason = err instanceof Error ? err.message : String(err)
    throw new Error(`数据库连接失败: ${reason}(请确认 MongoDB 已启动且 MONGODB_URI 正确)`)
  }

  cachedClient = client
  return client
}

/**
 * Returns the shared MongoDB client (connection pool reuse).
 *
 * The first successful connection is cached and returned to every later
 * caller. Callers that arrive while the first connection is still being
 * established wait on the same attempt instead of each opening (and leaking)
 * their own client. A failed attempt is not cached, so the next call retries.
 *
 * @returns The connected, cached MongoClient.
 * @throws Error when the connection cannot be established.
 */
export async function getMongoClient(): Promise<MongoClient> {
  if (cachedClient) {
    return cachedClient
  }

  if (!pendingClient) {
    pendingClient = connectMongoClient().finally(() => {
      pendingClient = null
    })
  }
  return pendingClient
}

/**
 * Returns a handle to the named database on the shared client.
 *
 * @param dbName Database name; defaults to the KR database.
 * @returns The Db handle for `dbName`.
 * @throws Error when the shared client cannot connect (see getMongoClient).
 */
export async function getDatabase(dbName: string = DB_NAMES.KR): Promise<Db> {
  const client = await getMongoClient()
  return client.db(dbName)
}

/**
 * Normalizes a phone number into the list of spellings to match against
 * stored data.
 *
 * - 13 digits starting with `86`, or 11 digits starting with `1`: returns the
 *   bare 11-digit number, the `+86` form and the `86` form (in that order).
 * - Anything else: returns the input as given plus its digits-only form.
 *
 * Empty variants are dropped so that input without digits (for example
 * `'abc'`) does not add an empty-string clause that would match documents
 * whose phone is `''`. The result is never an empty array, because callers
 * place it directly inside a MongoDB `$or`, which rejects empty arrays; when
 * nothing else remains, the original input is returned as the only variant.
 *
 * @param phone Raw phone text; may contain `+`, spaces, dashes, etc.
 * @returns De-duplicated, non-empty list of variants.
 */
function normalizePhone(phone: string): string[] {
  const digits = phone.replace(/\D/g, '')
  let candidates: string[]

  if (digits.length === 13 && digits.startsWith('86')) {
    const local = digits.slice(2)
    candidates = [local, `+86${local}`, digits]
  } else if (digits.length === 11 && digits.startsWith('1')) {
    candidates = [digits, `+86${digits}`, `86${digits}`]
  } else {
    candidates = [phone, digits]
  }

  const variants = [...new Set(candidates)].filter(variant => variant !== '')
  return variants.length > 0 ? variants : [phone]
}

/**
 * Masks a phone number for display.
 *
 * - Missing, empty or whitespace-only input: returns `'***'`.
 * - 11 or more digits: takes the last 11 digits and keeps only the first 3
 *   and last 4 (`138****8000`).
 * - Otherwise: keeps the first 3 characters of the trimmed input followed by `****`.
 *
 * @param phone Raw phone text, possibly formatted or prefixed with a country code.
 * @returns The masked representation.
 */
export function maskPhone(phone?: string): string {
  const raw = phone?.trim()
  if (!raw) return '***'

  const digits = raw.replace(/\D/g, '')
  if (digits.length >= 11) {
    const mobile = digits.slice(-11)
    return `${mobile.slice(0, 3)}****${mobile.slice(-4)}`
  }
  return `${raw.slice(0, 3)}****`
}

/**
 * Looks up a single user by phone number in KR.`用户估值`.
 *
 * The phone is expanded into its normalized variants and the first document
 * whose `phone` equals any of them is returned.
 *
 * @param phone Raw phone text.
 * @returns The matching document, or `null` when none matches.
 */
export async function queryUserByPhone(phone: string): Promise<UserValuationDoc | null> {
  const db = await getDatabase(DB_NAMES.KR)
  const collection = db.collection<UserValuationDoc>('用户估值')

  const phoneFilter = {
    $or: normalizePhone(phone).map(variant => ({ phone: variant }))
  }

  return collection.findOne(phoneFilter)
}

/**
 * Fetches the full user profile for a phone number across three databases.
 *
 * The three lookups run in parallel and all match the `phone` field against
 * the normalized phone variants:
 * - KR.`用户估值`                → `valuation`
 * - KR_腾讯.`QQ+手机`            → `qqPhone`
 * - KR_存客宝.`用户资产统一视图` → `ckbAsset`
 *
 * Only the 存客宝 lookup is best-effort: if it fails, `ckbAsset` is `null`.
 * A failure in either of the other two lookups rejects the whole call.
 *
 * @param phone Raw phone text.
 * @returns One entry per source; each is `null` when no document matches.
 */
export async function queryFullProfile(phone: string): Promise<{
  valuation: UserValuationDoc | null
  qqPhone: QQPhoneDoc | null
  ckbAsset: CKBUserAssetDoc | null
}> {
  const client = await getMongoClient()

  // Built once and shared: all three collections are matched on the same field.
  const phoneFilter = {
    $or: normalizePhone(phone).map(variant => ({ phone: variant }))
  }

  const [valuation, qqPhone, ckbAsset] = await Promise.all([
    client.db(DB_NAMES.KR).collection<UserValuationDoc>('用户估值').findOne(phoneFilter),
    client.db(DB_NAMES.KR_腾讯).collection<QQPhoneDoc>('QQ+手机').findOne(phoneFilter),
    client.db(DB_NAMES.KR_存客宝).collection<CKBUserAssetDoc>('用户资产统一视图')
      .findOne(phoneFilter)
      .catch(() => null)
  ])

  return { valuation, qqPhone, ckbAsset }
}

/**
 * Returns one page of users from KR.`用户估值`, highest evaluation score first.
 *
 * Paging values are sanitized before they reach the driver:
 * - `page` below 1 (or not finite) would otherwise produce a negative skip,
 *   which the server rejects;
 * - `pageSize` of 0 would otherwise mean "no limit" and stream the whole
 *   collection.
 * Both are floored to integers and clamped to a minimum of 1; non-finite
 * values fall back to the defaults (page 1, pageSize 20).
 *
 * @param options.page     1-based page number (default 1).
 * @param options.pageSize Documents per page (default 20).
 * @param options.filters  MongoDB filter applied to both the page and the count (default `{}`).
 * @returns The page of users plus the total number of documents matching `filters`.
 */
export async function queryUserList(options: {
  page?: number
  pageSize?: number
  filters?: Record<string, any>
} = {}): Promise<{ users: UserValuationDoc[], total: number }> {
  const { page = 1, pageSize = 20, filters = {} } = options

  const toPositiveInt = (value: number, fallback: number): number =>
    Number.isFinite(value) ? Math.max(1, Math.floor(value)) : fallback
  const safePage = toPositiveInt(page, 1)
  const safePageSize = toPositiveInt(pageSize, 20)

  const db = await getDatabase(DB_NAMES.KR)
  const collection = db.collection<UserValuationDoc>('用户估值')

  const [users, total] = await Promise.all([
    collection.find(filters)
      .sort({ user_evaluation_score: -1 })
      .skip((safePage - 1) * safePageSize)
      .limit(safePageSize)
      .toArray(),
    collection.countDocuments(filters)
  ])

  return { users, total }
}

/**
 * Collects summary statistics for every database whose name starts with `KR`.
 *
 * For each such database the on-disk size is added to `totalSize` and the
 * estimated document count of each of its collections is added to
 * `totalDocuments`. Views are skipped and a collection whose count fails
 * contributes 0, so one problematic namespace does not make the whole server
 * look disconnected.
 *
 * This function never throws: if the client cannot be obtained or the
 * database/collection listing fails, the error is logged and a
 * `connected: false` result with zeroed totals is returned.
 *
 * @returns Connection flag, matching database names, totals, and the elapsed
 *          time of this call in milliseconds (`latency`).
 */
export async function getDatabaseStats(): Promise<{
  connected: boolean
  databases: string[]
  totalDocuments: number
  totalSize: number
  latency: number
}> {
  const startTime = Date.now()

  try {
    const client = await getMongoClient()
    const dbList = await client.db().admin().listDatabases()
    const krDatabases = dbList.databases.filter(info => info.name.startsWith('KR'))

    let totalDocuments = 0
    let totalSize = 0

    for (const info of krDatabases) {
      totalSize += info.sizeOnDisk ?? 0

      // Document counts are estimates; the per-collection lookups run in parallel.
      const db = client.db(info.name)
      const collections = await db.listCollections().toArray()
      const counts = await Promise.all(
        collections
          .filter(coll => coll.type !== 'view')
          .map(coll => db.collection(coll.name).estimatedDocumentCount().catch(() => 0))
      )
      totalDocuments += counts.reduce((sum, count) => sum + count, 0)
    }

    return {
      connected: true,
      databases: krDatabases.map(info => info.name),
      totalDocuments,
      totalSize,
      latency: Date.now() - startTime
    }
  } catch (error: unknown) {
    // Keep the "never throws" contract but leave a trace for diagnosis.
    console.error('getDatabaseStats failed:', error)
    return {
      connected: false,
      databases: [],
      totalDocuments: 0,
      totalSize: 0,
      latency: Date.now() - startTime
    }
  }
}

/**
 * Returns the RFM group summary for KR.`用户估值`.
 *
 * One aggregation over the whole collection yields the document count and the
 * average `user_evaluation_score`.
 *
 * NOTE(review): the per-grade and per-value breakdowns are placeholders — the
 * entire total is reported under grade `D` and value `低`, and every other
 * bucket is 0. No real grouping is computed here.
 *
 * @returns Placeholder grade/value counts, the total user count and the
 *          average score (0 when the collection yields no result).
 */
export async function getRFMGroupSummary(): Promise<{
  gradeCount: Record<string, number>
  valueCount: Record<string, number>
  totalUsers: number
  avgScore: number
}> {
  const db = await getDatabase(DB_NAMES.KR)
  const collection = db.collection('用户估值')

  const [stats] = await collection.aggregate([
    {
      $group: {
        _id: null,
        total: { $sum: 1 },
        avgScore: { $avg: '$user_evaluation_score' }
      }
    }
  ]).toArray()

  // `stats` is undefined for an empty collection; avgScore is null when no
  // document carries a numeric score.
  const totalUsers: number = stats?.total ?? 0
  const avgScore: number = stats?.avgScore ?? 0

  return {
    gradeCount: { S: 0, A: 0, B: 0, C: 0, D: totalUsers },
    valueCount: { '高': 0, '中': 0, '低': totalUsers },
    totalUsers,
    avgScore
  }
}

/**
 * Smart search over KR.`用户估值`.
 *
 * The (trimmed) query is classified and searched as one of:
 * - `phone`: an 11-digit mainland mobile number, optionally prefixed with
 *   `86`/`+86` and containing spaces or dashes. Matched on `phone` using the
 *   normalized variants. A query that merely starts with `86` is no longer
 *   treated as a phone number, so QQ numbers and keywords beginning with `86`
 *   are classified correctly.
 * - `qq`: 5–11 digits. The phone linked to the QQ number is looked up in
 *   KR_腾讯.`QQ+手机` (QQ matched as string or number); when none is found an
 *   empty result is returned.
 * - `keyword`: anything else. Case-insensitive substring match on `name`,
 *   `city` and `province`. The text is escaped before it is used as a regular
 *   expression, so characters such as `(` or `*` are matched literally
 *   instead of causing an invalid-regex error or acting as a pattern.
 *
 * Results are sorted by `user_evaluation_score` descending.
 *
 * @param query          Raw search text.
 * @param options.limit  Maximum documents to return (default 50; values below 1 are raised to 1
 *                       because a limit of 0 means "no limit" to the driver).
 * @param options.offset Documents to skip (default 0; negative values are raised to 0).
 * @returns Matching users, the total match count and the detected query type.
 */
export async function intelligentSearch(query: string, options: {
  limit?: number
  offset?: number
} = {}): Promise<{
  users: UserValuationDoc[]
  total: number
  queryType: string
}> {
  const { limit = 50, offset = 0 } = options
  const safeLimit = Number.isFinite(limit) ? Math.max(1, Math.floor(limit)) : 50
  const safeOffset = Number.isFinite(offset) ? Math.max(0, Math.floor(offset)) : 0

  const db = await getDatabase(DB_NAMES.KR)
  const collection = db.collection<UserValuationDoc>('用户估值')

  const term = query.trim()
  // Separators are ignored only for phone detection.
  const compact = term.replace(/[\s-]/g, '')
  const phoneFilter = (phone: string) => ({
    $or: normalizePhone(phone).map(variant => ({ phone: variant }))
  })

  let queryType: string
  let searchQuery: Parameters<typeof collection.find>[0]

  if (/^(?:\+?86)?1[3-9]\d{9}$/.test(compact)) {
    // Phone number
    queryType = 'phone'
    searchQuery = phoneFilter(compact)
  } else if (/^\d{5,11}$/.test(term)) {
    // QQ number: resolve it to a phone via the QQ database first.
    queryType = 'qq'
    const client = await getMongoClient()
    const qqDoc = await client.db(DB_NAMES.KR_腾讯).collection<QQPhoneDoc>('QQ+手机').findOne({
      $or: [{ qq: term }, { qq: Number.parseInt(term, 10) }]
    })

    const linkedPhone = qqDoc?.phone || qqDoc?.手机号
    if (!linkedPhone) {
      return { users: [], total: 0, queryType }
    }
    searchQuery = phoneFilter(String(linkedPhone))
  } else {
    // Keyword search: escape regex metacharacters so user text is matched literally.
    queryType = 'keyword'
    const pattern = term.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
    searchQuery = {
      $or: [
        { name: { $regex: pattern, $options: 'i' } },
        { city: { $regex: pattern, $options: 'i' } },
        { province: { $regex: pattern, $options: 'i' } }
      ]
    }
  }

  const [users, total] = await Promise.all([
    collection.find(searchQuery)
      .sort({ user_evaluation_score: -1 })
      .skip(safeOffset)
      .limit(safeLimit)
      .toArray(),
    collection.countDocuments(searchQuery)
  ])

  return { users, total, queryType }
}

/**
 * Normalizes an ID-card number.
 *
 * Whitespace is removed and letters are upper-cased; if the result is 18
 * characters made of 17 digits followed by a digit or `X`, that cleaned form
 * is returned. Any other input is returned unchanged (including its original
 * whitespace and case).
 *
 * @param idCard Raw ID-card text.
 * @returns The cleaned 18-character ID, or the original input when it does not fit that shape.
 */
function normalizeIdCard(idCard: string): string {
  const compact = idCard.replace(/\s/g, '').toUpperCase()
  return /^\d{17}[\dX]$/.test(compact) ? compact : idCard
}

/**
 * Looks up a user by ID-card number in KR.`用户估值`, matching either the
 * `id_card` or the `身份证` field (if present), and returns the user's phone
 * so the caller can fetch the full profile afterwards.
 *
 * Both the normalized ID and the raw input are tried; when they are identical
 * each field is queried only once.
 *
 * The phone comes from `phone`, falling back to `phone_masked`. It is reduced
 * to its last 11 digits; if it contains no digits at all the stored text is
 * returned as-is.
 * NOTE(review): when only a masked phone is stored, the digits that remain
 * after stripping the mask are not a usable phone number — callers should
 * validate the returned value before using it for further lookups.
 *
 * @param idCard Raw ID-card text.
 * @returns `{ phone, valuation }`, or `null` when no document matches or the
 *          matching document has neither `phone` nor `phone_masked`.
 */
export async function searchByIdCard(idCard: string): Promise<{ phone?: string; valuation?: UserValuationDoc } | null> {
  const client = await getMongoClient()
  const col = client.db(DB_NAMES.KR).collection<UserValuationDoc & { id_card?: string; 身份证?: string }>('用户估值')

  // Normalized form first, then the raw input when it differs.
  const candidates = [...new Set([normalizeIdCard(idCard), idCard])]
  const doc = await col.findOne({
    $or: candidates.flatMap(value => [{ id_card: value }, { 身份证: value }])
  } as any)
  if (!doc) return null

  const storedPhone = doc.phone || doc.phone_masked
  if (!storedPhone) return null

  const rawPhone = String(storedPhone)
  return { phone: rawPhone.replace(/\D/g, '').slice(-11) || rawPhone, valuation: doc }
}

/**
 * Resolves a mixed list of identifiers into a de-duplicated list of 11-digit
 * mobile numbers, at most `max` entries, in the order they were found.
 *
 * Per segment type:
 * - `phone`:   the value itself.
 * - `qq`:      the phone linked to the QQ number in KR_腾讯.`QQ+手机`
 *              (QQ matched as string, and also as number when it is all digits).
 * - `id_card`: the phone returned by searchByIdCard.
 * - `keyword`: the phones of the users returned by intelligentSearch.
 *
 * Every candidate, whatever its source, is reduced to its last 11 digits and
 * kept only if it is a valid mainland mobile number (`1[3-9]` + 9 digits).
 * This includes the `id_card` branch, so unusable values (for example digits
 * left over from a masked phone) are no longer returned.
 *
 * The database is contacted only for segment types that need it, so a list of
 * plain phone numbers resolves without a connection. Blank segment values are
 * skipped.
 *
 * @param segments Identifiers to resolve.
 * @param max      Maximum number of phones to return (default 10).
 * @returns Unique valid mobile numbers, at most `max`.
 */
export async function unifiedResolveToPhones(segments: { type: 'phone' | 'qq' | 'id_card' | 'keyword'; value: string }[], max: number = 10): Promise<string[]> {
  const mobilePattern = /^1[3-9]\d{9}$/
  const seen = new Set<string>()
  const result: string[] = []

  // Normalizes a candidate and records it when valid, new and within the cap.
  const addPhone = (candidate: unknown): void => {
    if (candidate == null || result.length >= max) return
    const mobile = String(candidate).replace(/\D/g, '').slice(-11)
    if (mobilePattern.test(mobile) && !seen.has(mobile)) {
      seen.add(mobile)
      result.push(mobile)
    }
  }

  for (const seg of segments) {
    if (result.length >= max) break
    const value = seg.value.trim()
    if (!value) continue

    switch (seg.type) {
      case 'phone':
        addPhone(value)
        break

      case 'qq': {
        const client = await getMongoClient()
        // Only add the numeric clause when the value is numeric; otherwise it would be NaN.
        const qqClauses = /^\d+$/.test(value)
          ? [{ qq: value }, { qq: Number.parseInt(value, 10) }]
          : [{ qq: value }]
        const qqDoc = await client.db(DB_NAMES.KR_腾讯).collection<QQPhoneDoc>('QQ+手机').findOne({
          $or: qqClauses
        })
        addPhone(qqDoc?.phone || qqDoc?.手机号)
        break
      }

      case 'id_card': {
        const idRes = await searchByIdCard(value)
        addPhone(idRes?.phone)
        break
      }

      case 'keyword': {
        const searchRes = await intelligentSearch(value, { limit: max - result.length })
        for (const user of searchRes.users) {
          if (result.length >= max) break
          addPhone(user.phone)
        }
        break
      }
    }
  }

  return result.slice(0, max)
}

/**
 * Health check: obtains the shared client and pings the server.
 *
 * This function never throws; failures are reported in the result.
 *
 * @returns `status` `'healthy'` with a fixed success message, or `'unhealthy'`
 *          with the error text; `latency` is the elapsed time of this call in
 *          milliseconds in both cases.
 */
export async function healthCheck(): Promise<{
  status: string
  latency: number
  message: string
}> {
  const startTime = Date.now()

  try {
    const client = await getMongoClient()
    await client.db().admin().ping()

    return {
      status: 'healthy',
      latency: Date.now() - startTime,
      message: 'MongoDB 连接正常'
    }
  } catch (error: unknown) {
    return {
      status: 'unhealthy',
      latency: Date.now() - startTime,
      // Non-Error throwables are stringified instead of yielding `undefined`.
      message: error instanceof Error ? error.message : String(error)
    }
  }
}