diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..4af24f06 --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ +.idea/ +Cunkebao/.next/ +Store_vue/node_modules/ +*.zip +Cunkebao/.specstory/ +*.cursorindexingignore +Server/.specstory/ +Store_vue/.specstory/ +Store_vue/unpackage/ +Store_vue/.vscode/ +SuperAdmin/.specstory/ diff --git a/Server/application/command/SyncWechatDataToCkbTask.php b/Server/application/command/SyncWechatDataToCkbTask.php index fe95c1c0..420040d9 100644 --- a/Server/application/command/SyncWechatDataToCkbTask.php +++ b/Server/application/command/SyncWechatDataToCkbTask.php @@ -52,6 +52,7 @@ class SyncWechatDataToCkbTask extends Command $this->syncWechatDevice($ChuKeBaoAdapter); $this->syncWechatCustomer($ChuKeBaoAdapter); $this->syncWechatGroup($ChuKeBaoAdapter); + $this->syncWechatGroupCustomer($ChuKeBaoAdapter); $this->syncWechatFriendToTrafficPoolBatch($ChuKeBaoAdapter); $this->syncTrafficSourceUser($ChuKeBaoAdapter); $this->syncTrafficSourceGroup($ChuKeBaoAdapter); @@ -113,6 +114,10 @@ class SyncWechatDataToCkbTask extends Command { return $ChuKeBaoAdapter->syncWechatGroup(); } + protected function syncWechatGroupCustomer(ChuKeBaoAdapter $ChuKeBaoAdapter) + { + return $ChuKeBaoAdapter->syncWechatGroupCustomer(); + } } \ No newline at end of file diff --git a/Server/application/command/WorkbenchGroupPushCommand.php b/Server/application/command/WorkbenchGroupPushCommand.php index 4fa7dfd7..59c70ddc 100644 --- a/Server/application/command/WorkbenchGroupPushCommand.php +++ b/Server/application/command/WorkbenchGroupPushCommand.php @@ -18,13 +18,13 @@ class WorkbenchGroupPushCommand extends Command protected function configure() { $this->setName('workbench:groupPush') - ->setDescription('工作台群组同步任务队列') + ->setDescription('工作台群发同步任务队列') ->addOption('jobId', null, Option::VALUE_OPTIONAL, '任务ID,用于区分不同实例', date('YmdHis') . rand(1000, 9999)); } protected function execute(Input $input, Output $output) { - $output->writeln('开始处理工作台群组同步任务...'); + $output->writeln('开始处理工作台群发同步任务...'); try { // 获取任务ID @@ -48,10 +48,10 @@ class WorkbenchGroupPushCommand extends Command // 将任务添加到队列 $this->addToQueue($jobId, $queueLockKey); - $output->writeln('工作台群组同步任务已添加到队列'); + $output->writeln('工作台群发同步任务已添加到队列'); } catch (\Exception $e) { - Log::error('工作台群组同步任务添加失败:' . $e->getMessage()); - $output->writeln('工作台群组同步任务添加失败:' . $e->getMessage()); + Log::error('工作台群发同步任务添加失败:' . $e->getMessage()); + $output->writeln('工作台群发同步任务添加失败:' . $e->getMessage()); return false; } diff --git a/Server/application/cunkebao/controller/plan/PostCreateAddFriendPlanV1Controller.php b/Server/application/cunkebao/controller/plan/PostCreateAddFriendPlanV1Controller.php index a6d0cec2..3429f4b7 100644 --- a/Server/application/cunkebao/controller/plan/PostCreateAddFriendPlanV1Controller.php +++ b/Server/application/cunkebao/controller/plan/PostCreateAddFriendPlanV1Controller.php @@ -127,7 +127,6 @@ class PostCreateAddFriendPlanV1Controller extends Controller ]; - try { Db::startTrans(); // 插入数据 @@ -263,6 +262,12 @@ class PostCreateAddFriendPlanV1Controller extends Controller } } + //群获客 + if($params['sceneId'] == 7){ + + } + + Db::commit(); return ResponseHelper::success(['planId' => $planId], '添加计划任务成功'); diff --git a/Server/application/job/WorkbenchGroupPushJob.php b/Server/application/job/WorkbenchGroupPushJob.php new file mode 100644 index 00000000..cf9e77c2 --- /dev/null +++ b/Server/application/job/WorkbenchGroupPushJob.php @@ -0,0 +1,401 @@ +logJobStart($jobId, $queueLockKey); + $this->execute(); + $this->handleJobSuccess($job, $queueLockKey); + return true; + } catch (\Exception $e) { + return $this->handleJobError($e, $job, $queueLockKey); + } + } + + /** + * 执行任务 + * @throws \Exception + */ + public function execute() + { + try { + // 获取所有工作台 + $workbenches = Workbench::where(['status' => 1, 'type' => 3, 'isDel' => 0])->order('id desc')->select(); + foreach ($workbenches as $workbench) { + // 获取工作台配置 + $config = WorkbenchGroupPush::where('workbenchId', $workbench->id)->find(); + if (!$config) { + continue; + } + + //判断是否推送 + $isPush = $this->isPush($workbench, $config); + if (empty($isPush)) { + continue; + } + + // 获取内容库 + $contentLibrary = $this->getContentLibrary($workbench, $config); + if (empty($contentLibrary)) { + continue; + } + // 处理内容发送 + $this->sendMsgToGroup($workbench, $config, $contentLibrary); + } + } catch (\Exception $e) { + Log::error("消息群发任务异常: " . $e->getMessage()); + throw $e; + } + } + + + // 发微信个人消息 + public function sendMsgToGroup($workbench, $config, $msgConf) + { + // 消息拼接 msgType(1:文本 3:图片 43:视频 47:动图表情包(gif、其他表情包) 49:小程序/其他:图文、文件) + // 当前,type 为文本、图片、动图表情包的时候,content为string, 其他情况为对象 {type: 'file/link/...', url: '', title: '', thunmbPath: '', desc: ''} + // $result = [ + // "content" => $dataArray['content'], + // "msgSubType" => 0, + // "msgType" => $dataArray['msgType'], + // "seq" => time(), + // "wechatAccountId" => $dataArray['wechatAccountId'], + // "wechatChatroomId" => 0, + // "wechatFriendId" => $dataArray['wechatFriendId'], + // ]; + + + $groups = json_decode($config['groups'], true); + $groupsData = Db::name('wechat_group')->whereIn('id', $groups)->field('id,wechatAccountId,chatroomId,companyId,ownerWechatId')->select(); + if (empty($groupsData)) { + return false; + } + + $toAccountId = ''; + $username = Env::get('api.username', ''); + $password = Env::get('api.password', ''); + if (!empty($username) || !empty($password)) { + $toAccountId = Db::name('users')->where('account', $username)->value('s2_accountId'); + } + // 建立WebSocket + $wsController = new WebSocketController(['userName' => $username, 'password' => $password, 'accountId' => $toAccountId]); + foreach ($msgConf as $content) { + $sendData = []; + $sqlData = []; + + foreach ($groupsData as $groups) { + // msgType(1:文本 3:图片 43:视频 47:动图表情包(gif、其他表情包) 49:小程序/其他:图文、文件) + $sqlData[] = [ + 'workbenchId' => $workbench['id'], + 'contentId' => $content['id'], + 'groupId' => $groups['id'], + 'wechatAccountId' => $groups['wechatAccountId'], + 'createTime' => time() + ]; + + //内容 + if (!empty($content['content'])) { + $sendData[] = [ + 'content' => $content['content'], + 'msgType' => 1, + 'wechatAccountId' => $groups['wechatAccountId'], + 'wechatChatroomId' => $groups['id'], + ]; + } + + switch ($content['contentType']) { + case 1: + //图片解析 + $imgs = json_decode($content['resUrls'], true); + if (!empty($imgs)) { + foreach ($imgs as $img) { + $sendData[] = [ + 'content' => $img, + 'msgType' => 3, + 'wechatAccountId' => $groups['wechatAccountId'], + 'wechatChatroomId' => $groups['id'], + ]; + } + } + break; + case 2: + //链接解析 + $url = json_decode($content['urls'], true); + if (!empty($url[0])) { + $url = $url[0]; + $sendData[] = [ + 'content' => [ + 'desc' => '', + 'thumbPath' => $url['image'], + 'title' => $url['desc'], + 'type' => 'link', + 'url' => $url['url'], + ], + 'msgType' => 49, + 'wechatAccountId' => $groups['wechatAccountId'], + 'wechatChatroomId' => $groups['id'], + ]; + } + + break; + case 3: + //视频解析 + $video = json_decode($content['urls'], true); + if (!empty($video)) { + $video = $video[0]; + } + $sendData[] = [ + 'content' => $video, + 'msgType' => 43, + 'wechatAccountId' => $groups['wechatAccountId'], + 'wechatChatroomId' => $groups['id'], + ]; + break; + } + + if (empty($sendData)) { + continue; + } + + //发送消息 + foreach ($sendData as $send) { + $wsController->sendCommunity($send); + } + //插入发送记录 + Db::name('workbench_group_push_item')->insertAll($sqlData); + } + } + } + + + /** + * 记录发送历史 + * @param Workbench $workbench + * @param array $devices + * @param array $contentLibrary + */ + protected function recordSendHistory($workbench, $devices, $contentLibrary) + { + $now = time(); + $data = []; + foreach ($devices as $device) { + $data = [ + 'workbenchId' => $workbench->id, + 'deviceId' => $device['deviceId'], + 'contentId' => $contentLibrary['id'], + 'wechatAccountId' => $device['wechatAccountId'], + 'createTime' => $now, + ]; + Db::name('workbench_group_push_item')->insert($data); + } + + } + + /** + * 获取设备列表 + * @param Workbench $workbench 工作台 + * @param WorkbenchGroupPush $config 配置 + * @return array|bool + */ + protected function isPush($workbench, $config) + { + // 检查发送间隔(新逻辑:根据startTime、endTime、maxPerDay动态计算) + $today = date('Y-m-d'); + $startTimestamp = strtotime($today . ' ' . $config['startTime'] . ':00'); + $endTimestamp = strtotime($today . ' ' . $config['endTime'] . ':00'); + + // 如果时间不符,则跳过 + if (($startTimestamp > time() || $endTimestamp < time()) && empty($config['pushType'])) { + return false; + } + + $totalSeconds = $endTimestamp - $startTimestamp; + if ($totalSeconds <= 0 || empty($config['maxPerDay'])) { + return false; + } + $interval = floor($totalSeconds / $config['maxPerDay']); + + + // 查询今日已同步次数 + $count = Db::name('workbench_group_push_item') + ->where('workbenchId', $workbench->id) + ->whereTime('createTime', 'between', [$startTimestamp, $endTimestamp]) + ->count(); + if ($count >= $config['maxPerDay']) { + return false; + } + + // 计算本次同步的最早允许时间 + $nextSyncTime = $startTimestamp + $count * $interval; + if (time() < $nextSyncTime) { + return false; + } + return true; + } + + /** + * 获取内容库 + * @param Workbench $workbench 工作台 + * @param WorkbenchGroupPush $config 配置 + * @return array|bool + */ + protected function getContentLibrary($workbench, $config) + { + $contentids = json_decode($config['contentLibraries'], true); + if (empty($contentids)) { + return false; + } + + if ($config['pushType'] == 1) { + $limit = 10; + } else { + $limit = 1; + } + + + //推送顺序 + if ($config['pushOrder'] == 1) { + $order = 'ci.sendTime desc, ci.id asc'; + } else { + $order = 'ci.sendTime desc, ci.id desc'; + } + + // 基础查询 + $query = Db::name('content_library')->alias('cl') + ->join('content_item ci', 'ci.libraryId = cl.id') + ->join('workbench_group_push_item wgpi', 'wgpi.contentId = ci.id and wgpi.workbenchId = ' . $workbench->id, 'left') + ->where(['cl.isDel' => 0, 'ci.isDel' => 0]) + ->where('ci.sendTime <= ' . (time() + 60)) + ->whereIn('cl.id', $contentids) + ->field([ + 'ci.id', + 'ci.libraryId', + 'ci.contentType', + 'ci.title', + 'ci.content', + 'ci.resUrls', + 'ci.urls', + 'ci.comment', + 'ci.sendTime' + ]); + // 复制 query + $query2 = clone $query; + $query3 = clone $query; + // 根据accountType处理不同的发送逻辑 + if ($config['isLoop'] == 1) { + // 可以循环发送 + // 1. 优先获取未发送的内容 + $unsentContent = $query->where('wgpi.id', 'null') + ->order($order) + ->limit(0, $limit) + ->select(); + + if (!empty($unsentContent)) { + return $unsentContent; + } + $lastSendData = Db::name('workbench_group_push_item')->where('workbenchId', $workbench->id)->order('id desc')->find(); + $fastSendData = Db::name('workbench_group_push_item')->where('workbenchId', $workbench->id)->order('id asc')->find(); + + $sentContent = $query2->where('wgpi.contentId', '<', $lastSendData['contentId'])->order('wgpi.id ASC')->group('wgpi.contentId')->limit(0, $limit)->select(); + + if (empty($sentContent)) { + $sentContent = $query3->where('wgpi.contentId', '=', $fastSendData['contentId'])->order('wgpi.id ASC')->group('wgpi.contentId')->limit(0, $limit)->select(); + } + return $sentContent; + } else { + // 不能循环发送,只获取未发送的内容 + $list = $query->where('wgpi.id', 'null') + ->order($order) + ->limit(0, $limit) + ->select(); + return $list; + } + } + + /** + * 记录任务开始 + * @param string $jobId + * @param string $queueLockKey + */ + protected function logJobStart($jobId, $queueLockKey) + { + Log::info('开始处理工作台消息群发任务: ' . json_encode([ + 'jobId' => $jobId, + 'queueLockKey' => $queueLockKey + ])); + } + + /** + * 处理任务成功 + * @param Job $job + * @param string $queueLockKey + */ + protected function handleJobSuccess($job, $queueLockKey) + { + $job->delete(); + Cache::rm($queueLockKey); + Log::info('工作台消息群发任务执行成功'); + } + + /** + * 处理任务错误 + * @param \Exception $e + * @param Job $job + * @param string $queueLockKey + * @return bool + */ + protected function handleJobError(\Exception $e, $job, $queueLockKey) + { + Log::error('工作台消息群发任务异常:' . $e->getMessage()); + + if (!empty($queueLockKey)) { + Cache::rm($queueLockKey); + Log::info("由于异常释放队列锁: {$queueLockKey}"); + } + + if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) { + $job->delete(); + } else { + $job->release(Config::get('queue.failed_delay', 10)); + } + + return false; + } +} \ No newline at end of file diff --git a/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php b/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php index a3751975..5c4159ad 100644 --- a/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php +++ b/Server/extend/WeChatDeviceApi/Adapters/ChuKeBao/Adapter.php @@ -1356,6 +1356,39 @@ class Adapter implements WeChatServiceInterface } while ($affected > 0); } + public function syncWechatGroupCustomer() + { + $sql = "insert into ck_wechat_group_member(`identifier`,`chatroomId`,`companyId`,`groupId`,`createTime`) + SELECT + m.wechatId identifier, + g.chatroomId chatroomId, + c.departmentId companyId, + g.id groupId, + m.createTime createTime + FROM + s2_wechat_chatroom_member m + LEFT JOIN s2_wechat_chatroom g ON g.chatroomId = m.chatroomId + LEFT JOIN s2_company_account c ON g.accountId = c.id + ORDER BY m.id DESC + LIMIT ?, ? + ON DUPLICATE KEY UPDATE + identifier=VALUES(identifier), + chatroomId=VALUES(chatroomId), + companyId=VALUES(companyId), + groupId=VALUES(groupId)"; + + $offset = 0; + $limit = 2000; + $usleepTime = 50000; + do { + $affected = Db::execute($sql, [$offset, $limit]); + $offset += $limit; + if ($affected > 0) { + usleep($usleepTime); + } + } while ($affected > 0); + } + } diff --git a/nkebao/.env.development b/nkebao/.env.development index 7afdd84c..b9f53856 100644 --- a/nkebao/.env.development +++ b/nkebao/.env.development @@ -1,4 +1,4 @@ # 基础环境变量示例 # VITE_API_BASE_URL=http://www.yishi.com -VITE_API_BASE_URL=https://ckbapi.quwanzhi.com +VITE_API_BASE_URL=http://www.yishi.com VITE_APP_TITLE=存客宝