diff --git a/Server/application/command/WorkbenchCommand.php b/Server/application/command/WorkbenchAutoLikeCommand.php similarity index 72% rename from Server/application/command/WorkbenchCommand.php rename to Server/application/command/WorkbenchAutoLikeCommand.php index ce09c5a9..878ee864 100644 --- a/Server/application/command/WorkbenchCommand.php +++ b/Server/application/command/WorkbenchAutoLikeCommand.php @@ -8,24 +8,24 @@ use think\console\Output; use think\console\input\Option; use think\facade\Log; use think\Queue; -use app\job\WorkbenchJob; +use app\job\WorkbenchAutoLikeJob; use think\facade\Cache; -class WorkbenchCommand extends Command +class WorkbenchAutoLikeCommand extends Command { // 队列名称 - protected $queueName = 'workbench'; + protected $queueName = 'workbench_auto_like'; protected function configure() { - $this->setName('workbench:run') - ->setDescription('工作台任务队列') + $this->setName('workbench:autoLike') + ->setDescription('工作台自动点赞任务队列') ->addOption('jobId', null, Option::VALUE_OPTIONAL, '任务ID,用于区分不同实例', date('YmdHis') . rand(1000, 9999)); } protected function execute(Input $input, Output $output) { - $output->writeln('开始处理工作台任务...'); + $output->writeln('开始处理工作台自动点赞任务...'); try { // 获取任务ID @@ -49,10 +49,10 @@ class WorkbenchCommand extends Command // 将任务添加到队列 $this->addToQueue($jobId, $queueLockKey); - $output->writeln('工作台任务已添加到队列'); + $output->writeln('工作台自动点赞任务已添加到队列'); } catch (\Exception $e) { - Log::error('工作台任务添加失败:' . $e->getMessage()); - $output->writeln('工作台任务添加失败:' . $e->getMessage()); + Log::error('工作台自动点赞任务添加失败:' . $e->getMessage()); + $output->writeln('工作台自动点赞任务添加失败:' . $e->getMessage()); return false; } @@ -71,7 +71,7 @@ class WorkbenchCommand extends Command 'queueLockKey' => $queueLockKey ]; - // 添加到队列,设置任务名为 workbench - Queue::push(WorkbenchJob::class, $data, $this->queueName); + // 添加到队列,设置任务名为 workbench_auto_like + Queue::push(WorkbenchAutoLikeJob::class, $data, $this->queueName); } } \ No newline at end of file diff --git a/Server/application/command/WorkbenchMomentsCommand.php b/Server/application/command/WorkbenchMomentsCommand.php new file mode 100644 index 00000000..760b5a0c --- /dev/null +++ b/Server/application/command/WorkbenchMomentsCommand.php @@ -0,0 +1,76 @@ +setName('workbench:moments') + ->setDescription('工作台朋友圈同步任务队列') + ->addOption('jobId', null, Option::VALUE_OPTIONAL, '任务ID,用于区分不同实例', date('YmdHis') . rand(1000, 9999)); + } + + protected function execute(Input $input, Output $output) + { + $output->writeln('开始处理工作台朋友圈同步任务...'); + + try { + // 获取任务ID + $jobId = $input->getOption('jobId'); + + $output->writeln('任务ID: ' . $jobId); + + // 检查队列是否已经在运行 + $queueLockKey = "queue_lock:{$this->queueName}"; + Cache::rm($queueLockKey); + if (Cache::get($queueLockKey)) { + $output->writeln("队列 {$this->queueName} 已经在运行中,跳过执行"); + Log::warning("队列 {$this->queueName} 已经在运行中,跳过执行"); + return false; + } + + // 设置队列运行锁,有效期1小时 + Cache::set($queueLockKey, $jobId, 3600); + $output->writeln("已设置队列运行锁,键名:{$queueLockKey},值:{$jobId},有效期:1小时"); + + // 将任务添加到队列 + $this->addToQueue($jobId, $queueLockKey); + + $output->writeln('工作台朋友圈同步任务已添加到队列'); + } catch (\Exception $e) { + Log::error('工作台朋友圈同步任务添加失败:' . $e->getMessage()); + $output->writeln('工作台朋友圈同步任务添加失败:' . $e->getMessage()); + return false; + } + + return true; + } + + /** + * 添加任务到队列 + * @param string $jobId 任务ID + * @param string $queueLockKey 队列锁键名 + */ + public function addToQueue($jobId = '', $queueLockKey = '') + { + $data = [ + 'jobId' => $jobId, + 'queueLockKey' => $queueLockKey + ]; + + // 添加到队列,设置任务名为 workbench_moments + Queue::push(WorkbenchMomentsJob::class, $data, $this->queueName); + } +} \ No newline at end of file diff --git a/Server/application/job/WorkbenchJob.php b/Server/application/job/WorkbenchJob.php deleted file mode 100644 index 01f06dd8..00000000 --- a/Server/application/job/WorkbenchJob.php +++ /dev/null @@ -1,620 +0,0 @@ -logJobStart($jobId, $queueLockKey); - $workbenches = $this->getActiveWorkbenches(); - if (empty($workbenches)) { - $this->handleEmptyWorkbenches($job, $queueLockKey); - return true; - } - $this->processWorkbenches($workbenches); - $this->handleJobSuccess($job, $queueLockKey); - return true; - - } catch (\Exception $e) { - return $this->handleJobError($e, $job, $queueLockKey); - } - } - - /************************************ - * 工作台基础功能 - ************************************/ - - /** - * 获取活跃的工作台 - * @return \think\Collection - */ - protected function getActiveWorkbenches() - { - return Workbench::where([ - ['status', '=', 1], - ['isDel', '=', 0] - ])->order('id DESC')->select(); - } - - /** - * 处理工作台列表 - * @param \think\Collection $workbenches - */ - protected function processWorkbenches($workbenches) - { - foreach ($workbenches as $workbench) { - try { - $this->processSingleWorkbench($workbench); - } catch (\Exception $e) { - Log::error("处理工作台 {$workbench->id} 失败: " . $e->getMessage()); - } - } - } - - /** - * 处理单个工作台 - * @param Workbench $workbench - */ - protected function processSingleWorkbench($workbench) - { - $config = $this->getWorkbenchConfig($workbench); - if (!$config) { - Log::error("工作台 {$workbench->id} 配置获取失败"); - return; - } - - $handler = $this->getWorkbenchHandler($workbench->type); - if ($handler) { - $handler($workbench, $config); - } - } - - /** - * 获取工作台处理器 - * @param int $type - * @return callable|null - */ - protected function getWorkbenchHandler($type) - { - $handlers = [ - self::TYPE_AUTO_LIKE => [$this, 'handleAutoLike'], - self::TYPE_MOMENTS_SYNC => [$this, 'handleMomentsSync'], - self::TYPE_GROUP_PUSH => [$this, 'handleGroupPush'], - self::TYPE_GROUP_CREATE => [$this, 'handleGroupCreate'] - ]; - - return $handlers[$type] ?? null; - } - - /** - * 获取工作台配置 - * @param Workbench $workbench 工作台实例 - * @return mixed - */ - protected function getWorkbenchConfig($workbench) - { - switch ($workbench->type) { - case self::TYPE_AUTO_LIKE: - return WorkbenchAutoLike::where('workbenchId', $workbench->id)->find(); - - case self::TYPE_MOMENTS_SYNC: - return WorkbenchMomentsSync::where('workbenchId', $workbench->id)->find(); - - case self::TYPE_GROUP_PUSH: - return WorkbenchGroupPush::where('workbenchId', $workbench->id)->find(); - - case self::TYPE_GROUP_CREATE: - return WorkbenchGroupCreate::where('workbenchId', $workbench->id)->find(); - - default: - return null; - } - } - - /************************************ - * 自动点赞功能 - ************************************/ - - /** - * 处理自动点赞任务 - * @param Workbench $workbench - * @param WorkbenchAutoLike $config - */ - protected function handleAutoLike($workbench, $config) - { - if (!$this->validateAutoLikeConfig($workbench, $config)) { - return; - } - - // 验证是否在点赞时间范围内 - if (!$this->isWithinLikeTimeRange($config)) { - return; - } - - // 处理分页获取好友列表 - $this->processAllFriends($workbench, $config); - } - - /** - * 处理所有好友分页 - * @param Workbench $workbench - * @param WorkbenchAutoLike $config - * @param int $page 当前页码 - * @param int $pageSize 每页大小 - */ - protected function processAllFriends($workbench, $config, $page = 1, $pageSize = 100) - { - $friendList = $this->getFriendList($config, $page, $pageSize); - if (empty($friendList)) { - return; - } - - // 将好友列表分成20组 - $friendGroups = array_chunk($friendList, 20); - $processes = []; - - foreach ($friendGroups as $groupIndex => $friendGroup) { - // 创建子进程 - $pid = pcntl_fork(); - - if ($pid == -1) { - // 创建进程失败 - Log::error("工作台 {$workbench->id} 创建进程失败"); - continue; - } else if ($pid) { - // 父进程 - $processes[] = $pid; - } else { - // 子进程 - try { - foreach ($friendGroup as $friend) { - // 验证是否达到点赞次数上限 - $likeCount = $this->getTodayLikeCount($workbench, $config, $friend['deviceId']); - if ($likeCount >= $config['maxLikes']) { - Log::info("工作台 {$workbench->id} 点赞次数已达上限"); - continue; - } - - // 验证是否达到好友点赞次数上限 - $friendMaxLikes = Db::name('workbench_auto_like_item') - ->where('workbenchId', $workbench->id) - ->where('wechatFriendId', $friend['friendId']) - ->count(); - - if ($friendMaxLikes < $config['friendMaxLikes']) { - $this->processFriendMoments($workbench, $config, $friend); - } - } - } catch (\Exception $e) { - Log::error("工作台 {$workbench->id} 子进程异常: " . $e->getMessage()); - } - - // 子进程执行完毕后退出 - exit(0); - } - } - - // 等待所有子进程完成 - foreach ($processes as $pid) { - pcntl_waitpid($pid, $status); - } - - // 如果当前页数据量等于页大小,说明可能还有更多数据,继续处理下一页 - if (count($friendList) == $pageSize) { - $this->processAllFriends($workbench, $config, $page + 1, $pageSize); - } - } - - /** - * 获取好友列表 - * @param WorkbenchAutoLike $config 配置 - * @param int $page 页码 - * @param int $pageSize 每页大小 - * @return array - */ - protected function getFriendList($config, $page = 1, $pageSize = 100) - { - $friends = json_decode($config['friends'], true); - $devices = json_decode($config['devices'], true); - - $list = Db::table('s2_company_account') - ->alias('ca') - ->join(['s2_wechat_account' => 'wa'], 'ca.id = wa.deviceAccountId') - ->join(['s2_wechat_friend' => 'wf'], 'ca.id = wf.accountId AND wf.wechatAccountId = wa.id') - ->join('workbench_auto_like_item wali', 'wali.wechatFriendId = wf.id AND wali.workbenchId = ' . $config['workbenchId'], 'left') - ->where([ - 'ca.status' => 0, - 'wf.isDeleted' => 0, - 'wa.deviceAlive' => 1, - 'wa.wechatAlive' => 1 - ]) - ->whereIn('wa.currentDeviceId', $devices) - ->field([ - 'ca.id as accountId', - 'ca.userName', - 'wf.id as friendId', - 'wf.wechatId', - 'wf.wechatAccountId', - 'wa.wechatId as wechatAccountWechatId', - 'wa.currentDeviceId as deviceId', - 'COUNT(wali.id) as like_count' - ]); - - if (!empty($friends) && is_array($friends) && count($friends) > 0) { - $list = $list->whereIn('wf.id', $friends); - } - - $list = $list->group('wf.wechatId') - ->having('like_count < ' . $config['friendMaxLikes']) - ->order('wf.id DESC') - ->page($page, $pageSize) - ->select(); - - return $list; - } - - /** - * 处理好友朋友圈 - * @param Workbench $workbench - * @param WorkbenchAutoLike $config - * @param array $friend - */ - protected function processFriendMoments($workbench, $config, $friend) - { - $toAccountId = ''; - $username = Env::get('api.username', ''); - $password = Env::get('api.password', ''); - if (!empty($username) || !empty($password)) { - $toAccountId = Db::name('users')->where('account',$username)->value('s2_accountId'); - } - - try { - // 执行切换好友命令 - $automaticAssign = new AutomaticAssign(); - $automaticAssign->allotWechatFriend(['wechatFriendId' => $friend['friendId'], 'toAccountId' => $toAccountId], true); - - // 执行采集朋友圈命令 - $webSocket = new WebSocketController(['userName' => $username, 'password' => $password, 'accountId' => $toAccountId]); - $webSocket->getMoments(['wechatFriendId' => $friend['friendId'], 'wechatAccountId' => $friend['wechatAccountId']]); - - // 查询未点赞的朋友圈 - $moments = $this->getUnlikedMoments($friend['friendId']); - if (empty($moments)) { - Log::info("好友 {$friend['friendId']} 没有需要点赞的朋友圈"); - // 处理完毕切换回原账号 - $automaticAssign->allotWechatFriend(['wechatFriendId' => $friend['friendId'], 'toAccountId' => $friend['accountId']], true); - return; - } - - foreach ($moments as $moment) { - // 点赞朋友圈 - $this->likeMoment($workbench, $config, $friend, $moment, $webSocket); - - if(!empty($config['enableFriendTags']) && !empty($config['friendTags'])){ - // 修改好友标签 - $labels = $this->getFriendLabels($friend); - $labels[] = $config['friendTags']; - $webSocket->modifyFriendLabel(['wechatFriendId' => $friend['friendId'], 'wechatAccountId' => $friend['wechatAccountId'], 'labels' => $labels]); - } - - // 每个好友只点赞一条朋友圈,然后退出 - break; - } - - // 处理完毕切换回原账号 - $automaticAssign->allotWechatFriend(['wechatFriendId' => $friend['friendId'], 'toAccountId' => $friend['accountId']], true); - } catch (\Exception $e) { - // 异常情况下也要确保切换回原账号 - $automaticAssign->allotWechatFriend(['wechatFriendId' => $friend['friendId'], 'toAccountId' => $friend['accountId']], true); - - Log::error("处理好友 {$friend['friendId']} 朋友圈失败: " . $e->getMessage()); - } - } - - /** - * 获取未点赞的朋友圈 - * @param int $friendId - * @return \think\Collection - */ - protected function getUnlikedMoments($friendId) - { - return Db::table('s2_wechat_moments') - ->alias('wm') - ->join('workbench_auto_like_item wali', 'wali.momentsId = wm.id', 'left') - ->where([ - ['wm.wechatFriendId', '=', $friendId], - ['wali.id', 'null', null] - ]) - ->field('wm.id, wm.snsId') - ->group('wali.wechatFriendId') - ->order('wm.createTime DESC') - ->select(); - } - - /** - * 点赞朋友圈 - * @param Workbench $workbench - * @param WorkbenchAutoLike $config - * @param array $friend - * @param array $moment - * @param WebSocketController $webSocket - */ - protected function likeMoment($workbench, $config, $friend, $moment, $webSocket) - { - try { - $result = $webSocket->momentInteract([ - 'snsId' => $moment['snsId'], - 'wechatAccountId' => $friend['wechatAccountId'], - ]); - - $result = json_decode($result, true); - - if ($result['code'] == 200) { - $this->recordLike($workbench, $moment, $friend); - - // 添加间隔时间 - if (!empty($config['interval'])) { - sleep($config['interval']); - } - } else { - Log::error("工作台 {$workbench->id} 点赞失败: " . ($result['msg'] ?? '未知错误')); - } - } catch (\Exception $e) { - Log::error("工作台 {$workbench->id} 点赞异常: " . $e->getMessage()); - } - } - - /** - * 记录点赞 - * @param Workbench $workbench - * @param array $moment - * @param array $friend - */ - protected function recordLike($workbench, $moment, $friend) - { - Db::name('workbench_auto_like_item')->insert([ - 'workbenchId' => $workbench->id, - 'deviceId' => $friend['deviceId'], - 'momentsId' => $moment['id'], - 'snsId' => $moment['snsId'], - 'wechatAccountId' => $friend['wechatAccountId'], - 'wechatFriendId' => $friend['friendId'], - 'createTime' => time() - ]); - Log::info("工作台 {$workbench->id} 点赞成功: {$moment['snsId']}"); - } - - /** - * 获取好友标签 - * @param array $friend - * @return array - */ - protected function getFriendLabels($friend) - { - $wechatFriendController = new WechatFriendController(); - $result = $wechatFriendController->getlist([ - 'friendKeyword' => $friend['wechatId'], - 'wechatAccountKeyword' => $friend['wechatAccountWechatId'] - ], true); - - $result = json_decode($result, true); - $labels = []; - - if(!empty($result['data'])){ - foreach($result['data'] as $item){ - $labels = array_merge($labels, $item['labels']); - } - } - - return $labels; - } - - /** - * 验证自动点赞配置 - * @param Workbench $workbench - * @param WorkbenchAutoLike $config - * @return bool - */ - protected function validateAutoLikeConfig($workbench, $config) - { - $requiredFields = ['contentTypes', 'interval', 'maxLikes', 'startTime', 'endTime']; - foreach ($requiredFields as $field) { - if (empty($config[$field])) { - Log::error("工作台 {$workbench->id} 配置字段 {$field} 为空"); - return false; - } - } - return true; - } - - /** - * 获取今日点赞次数 - * @param Workbench $workbench - * @param WorkbenchAutoLike $config - * @return int - */ - protected function getTodayLikeCount($workbench, $config, $deviceId) - { - return Db::name('workbench_auto_like_item') - ->where('workbenchId', $workbench->id) - ->where('deviceId', $deviceId) - ->whereTime('createTime', 'between', [ - strtotime(date('Y-m-d') . ' ' . $config['startTime'] . ':00'), - strtotime(date('Y-m-d') . ' ' . $config['endTime'] . ':00') - ]) - ->count(); - } - - /** - * 检查是否在点赞时间范围内 - * @param WorkbenchAutoLike $config - * @return bool - */ - protected function isWithinLikeTimeRange($config) - { - $currentTime = date('H:i'); - if ($currentTime < $config['startTime'] || $currentTime > $config['endTime']) { - Log::info("当前时间 {$currentTime} 不在点赞时间范围内 ({$config['startTime']} - {$config['endTime']})"); - return false; - } - return true; - } - - /************************************ - * 朋友圈同步功能 - ************************************/ - - /** - * 处理朋友圈同步任务 - * @param Workbench $workbench 工作台实例 - * @param WorkbenchMomentsSync $config 配置实例 - */ - protected function handleMomentsSync($workbench, $config) - { - // TODO: 实现朋友圈同步逻辑 - Log::info("处理朋友圈同步任务: {$workbench->id}"); - } - - /************************************ - * 群消息推送功能 - ************************************/ - - /** - * 处理群消息推送任务 - * @param Workbench $workbench 工作台实例 - * @param WorkbenchGroupPush $config 配置实例 - */ - protected function handleGroupPush($workbench, $config) - { - // TODO: 实现群消息推送逻辑 - Log::info("处理群消息推送任务: {$workbench->id}"); - } - - /************************************ - * 自动建群功能 - ************************************/ - - /** - * 处理自动建群任务 - * @param Workbench $workbench 工作台实例 - * @param WorkbenchGroupCreate $config 配置实例 - */ - protected function handleGroupCreate($workbench, $config) - { - // TODO: 实现自动建群逻辑 - Log::info("处理自动建群任务: {$workbench->id}"); - } - - /************************************ - * 任务处理辅助方法 - ************************************/ - - /** - * 记录任务开始 - * @param string $jobId - * @param string $queueLockKey - */ - protected function logJobStart($jobId, $queueLockKey) - { - Log::info('开始处理工作台任务: ' . json_encode([ - 'jobId' => $jobId, - 'queueLockKey' => $queueLockKey - ])); - } - - /** - * 处理任务成功 - * @param Job $job - * @param string $queueLockKey - */ - protected function handleJobSuccess($job, $queueLockKey) - { - $job->delete(); - Cache::rm($queueLockKey); - Log::info('工作台任务执行成功'); - } - - /** - * 处理任务错误 - * @param \Exception $e - * @param Job $job - * @param string $queueLockKey - * @return bool - */ - protected function handleJobError(\Exception $e, $job, $queueLockKey) - { - Log::error('工作台任务异常:' . $e->getMessage()); - - if (!empty($queueLockKey)) { - Cache::rm($queueLockKey); - Log::info("由于异常释放队列锁: {$queueLockKey}"); - } - - if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) { - $job->delete(); - } else { - $job->release(Config::get('queue.failed_delay', 10)); - } - - return false; - } - - /** - * 处理空工作台情况 - * @param Job $job - * @param string $queueLockKey - */ - protected function handleEmptyWorkbenches(Job $job, $queueLockKey) - { - Log::info('没有需要处理的工作台任务'); - $job->delete(); - Cache::rm($queueLockKey); - } -} \ No newline at end of file