From 7af7e35c50dcd01713768cae84cc27a86b89b28b Mon Sep 17 00:00:00 2001 From: wong <106998207@qq.com> Date: Fri, 30 May 2025 18:04:10 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E6=93=8D=E7=9B=98=E6=89=8B=E3=80=91?= =?UTF-8?q?=20=E6=B5=81=E9=87=8F=E5=88=86=E5=8F=91=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Server/application/command.php | 1 + .../command/WorkbenchAutoLikeCommand.php | 2 +- .../WorkbenchTrafficDistributeCommand.php | 61 ++++++ .../job/WorkbenchTrafficDistributeJob.php | 206 ++++++++++++++++++ 4 files changed, 269 insertions(+), 1 deletion(-) create mode 100644 Server/application/command/WorkbenchTrafficDistributeCommand.php create mode 100644 Server/application/job/WorkbenchTrafficDistributeJob.php diff --git a/Server/application/command.php b/Server/application/command.php index 152597a4..2dcd3178 100644 --- a/Server/application/command.php +++ b/Server/application/command.php @@ -31,4 +31,5 @@ return [ 'workbench:moments' => 'app\command\WorkbenchMomentsCommand', // 工作台朋友圈同步任务 'sync:wechatData' => 'app\command\SyncWechatDataToCkbTask', // 同步微信数据到存客宝 'sync:allFriends' => 'app\command\SyncAllFriendsCommand', // 同步所有在线好友 + 'workbench:trafficDistribute' => 'app\command\WorkbenchTrafficDistributeCommand', // 工作台流量分发任务 ]; diff --git a/Server/application/command/WorkbenchAutoLikeCommand.php b/Server/application/command/WorkbenchAutoLikeCommand.php index 878ee864..0d6e7851 100644 --- a/Server/application/command/WorkbenchAutoLikeCommand.php +++ b/Server/application/command/WorkbenchAutoLikeCommand.php @@ -35,7 +35,7 @@ class WorkbenchAutoLikeCommand extends Command // 检查队列是否已经在运行 $queueLockKey = "queue_lock:{$this->queueName}"; - Cache::rm($queueLockKey); + //Cache::rm($queueLockKey); if (Cache::get($queueLockKey)) { $output->writeln("队列 {$this->queueName} 已经在运行中,跳过执行"); Log::warning("队列 {$this->queueName} 已经在运行中,跳过执行"); diff --git a/Server/application/command/WorkbenchTrafficDistributeCommand.php b/Server/application/command/WorkbenchTrafficDistributeCommand.php new file mode 100644 index 00000000..c3da23c0 --- /dev/null +++ b/Server/application/command/WorkbenchTrafficDistributeCommand.php @@ -0,0 +1,61 @@ +setName('workbench:trafficDistribute') + ->setDescription('工作台流量分发任务队列') + ->addOption('jobId', null, Option::VALUE_OPTIONAL, '任务ID', date('YmdHis') . rand(1000, 9999)); + } + + protected function execute(Input $input, Output $output) + { + $output->writeln('开始处理流量分发任务...'); + try { + $jobId = $input->getOption('jobId'); + $output->writeln('任务ID: ' . $jobId); + + $queueLockKey = "queue_lock:{$this->queueName}"; + Cache::rm($queueLockKey); + if (Cache::get($queueLockKey)) { + $output->writeln("队列 {$this->queueName} 已经在运行中,跳过执行"); + Log::warning("队列 {$this->queueName} 已经在运行中,跳过执行"); + return false; + } + Cache::set($queueLockKey, $jobId, 3600); + $output->writeln("已设置队列运行锁,键名:{$queueLockKey},值:{$jobId},有效期:1小时"); + + $this->addToQueue($jobId, $queueLockKey); + + $output->writeln('流量分发任务已添加到队列'); + } catch (\Exception $e) { + Log::error('流量分发任务添加失败:' . $e->getMessage()); + $output->writeln('流量分发任务添加失败:' . $e->getMessage()); + return false; + } + return true; + } + + public function addToQueue($jobId = '', $queueLockKey = '') + { + $data = [ + 'jobId' => $jobId, + 'queueLockKey' => $queueLockKey + ]; + Queue::push(WorkbenchTrafficDistributeJob::class, $data, $this->queueName); + } +} \ No newline at end of file diff --git a/Server/application/job/WorkbenchTrafficDistributeJob.php b/Server/application/job/WorkbenchTrafficDistributeJob.php new file mode 100644 index 00000000..40d88c36 --- /dev/null +++ b/Server/application/job/WorkbenchTrafficDistributeJob.php @@ -0,0 +1,206 @@ +logJobStart($jobId, $queueLockKey); + $workbenches = $this->getActiveWorkbenches(); + if (empty($workbenches)) { + $this->handleEmptyWorkbenches($job, $queueLockKey); + return true; + } + $this->processWorkbenches($workbenches); + $this->handleJobSuccess($job, $queueLockKey); + return true; + } catch (\Exception $e) { + return $this->handleJobError($e, $job, $queueLockKey); + } + } + + protected function getActiveWorkbenches() + { + return Workbench::where([ + ['status', '=', 1], + ['isDel', '=', 0], + ['type', '=', 5] + ])->order('id DESC')->select(); + } + + protected function processWorkbenches($workbenches) + { + foreach ($workbenches as $workbench) { + try { + $this->processSingleWorkbench($workbench); + } catch (\Exception $e) { + Log::error("处理流量分发工作台 {$workbench->id} 失败: " . $e->getMessage()); + } + } + } + + protected function processSingleWorkbench($workbench) + { + $page = 1; + $pageSize = 20; + $config = WorkbenchTrafficConfig::where('workbenchId', $workbench->id)->find(); + if (!$config) { + Log::error("流量分发工作台 {$workbench->id} 配置获取失败"); + return; + } + + // 验证是否在流量分发时间范围内 + if (!$this->isTimeRange($config) && $config['timeType'] == 2) { + return; + } + + // 获取账号,userName不包含offline和delete + $account = Db::table('s2_company_account') + ->where(['departmentId' => $workbench->companyId, 'status' => 0]) + ->whereNotLike('userName', '%_offline%') + ->whereNotLike('userName', '%_delete%') + ->field('id,userName,realName') + ->select(); + $accountNum = count($account); + if($accountNum < 2){ + Log::info("流量分发工作台 {$workbench->id} 账号少于3个"); + return; + } + + // 默认第一页,每页20条 + $friends = $this->getFriendsByLabels($workbench,$config, $page, $pageSize); + if(empty($friends) || count($friends) == 0){ + Log::info("流量分发工作台 {$workbench->id} 没有可分配的好友"); + return; + } + + + print_r($friends); + exit; + + switch ($config->distributeType) { + case 1: + // 平均分配 + break; + case 2: + // 按客服优先等级 + break; + case 3: + // 比例分配 + break; + } + + + + + print_r($accountNum); + exit; + // 例如:分配好友/客户到设备、客服、流量池等 + Log::info("流量分发工作台 {$workbench->id} 执行分发逻辑"); + } + + + /** + * 检查是否在流量分发时间范围内 + * @param WorkbenchAutoLike $config + * @return bool + */ + protected function isTimeRange($config) + { + $currentTime = date('H:i'); + if ($currentTime < $config['startTime'] || $currentTime > $config['endTime']) { + Log::info("当前时间 {$currentTime} 不在流量分发时间范围内 ({$config['startTime']} - {$config['endTime']})"); + return false; + } + return true; + } + + + /** + * 一次性查出所有包含指定标签数组的好友(支持分页) + * @param array $workbench 标签数组 + * @param array $config 标签数组 + * @param int $page 页码 + * @param int $pageSize 每页数量 + * @return array + */ + protected function getFriendsByLabels($workbench,$config, $page = 1, $pageSize = 20) + { + $labels = json_decode($config['pools'],true); + $device = json_decode($config['devices'],true); + + + $query = Db::table('s2_wechat_friend')->alias('wf') + ->join(['s2_company_account'=>'sa'],'sa.id = wf.accountId','left') + ->join(['s2_wechat_account'=>'wa'],'wa.id = wf.wechatAccountId','left') + ->join('workbench_traffic_config_item wtci', 'wtci.wechatFriendId = wf.id AND wtci.workbenchId = '. $config['workbenchId'], 'left') + ->where([ + ['wf.isDeleted','=',0], + ['sa.departmentId' ,'=', $workbench->companyId], + ['wtci.id', 'null', null] + ]) + ->whereIn('wa.currentDeviceId',$device) + ->field('wf.id,wf.wechatAccountId,wf.wechatId,wf.labels,sa.userName,wa.currentDeviceId as deviceId'); + $query->where(function($q) use ($labels) { + foreach ($labels as $label) { + $q->whereOrRaw("JSON_CONTAINS(wf.labels, '\"{$label}\"')"); + } + }); + + $list = $query->page($page, $pageSize)->select(); + + return $list; + } + + protected function logJobStart($jobId, $queueLockKey) + { + Log::info('开始处理流量分发任务: ' . json_encode([ + 'jobId' => $jobId, + 'queueLockKey' => $queueLockKey + ])); + } + + protected function handleJobSuccess($job, $queueLockKey) + { + $job->delete(); + Cache::rm($queueLockKey); + Log::info('流量分发任务执行成功'); + } + + protected function handleJobError(\Exception $e, $job, $queueLockKey) + { + Log::error('流量分发任务异常:' . $e->getMessage()); + if (!empty($queueLockKey)) { + Cache::rm($queueLockKey); + Log::info("由于异常释放队列锁: {$queueLockKey}"); + } + if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) { + $job->delete(); + } else { + $job->release(Config::get('queue.failed_delay', 10)); + } + return false; + } + + protected function handleEmptyWorkbenches(Job $job, $queueLockKey) + { + Log::info('没有需要处理的流量分发任务'); + $job->delete(); + Cache::rm($queueLockKey); + } +} \ No newline at end of file