From 0f1edf3f9cd47c79cb58055c1ab96c03c91552c0 Mon Sep 17 00:00:00 2001 From: wong <106998207@qq.com> Date: Wed, 10 Sep 2025 16:58:22 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=9A=E8=AE=AF=E5=BD=95=E5=AF=BC=E5=85=A5?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/controller/DeviceController.php | 12 +- Server/application/command.php | 1 + .../command/WorkbenchImportContactCommand.php | 125 ++++++ .../job/WorkbenchImportContactJob.php | 399 ++++++++++++++++++ 4 files changed, 535 insertions(+), 2 deletions(-) create mode 100644 Server/application/command/WorkbenchImportContactCommand.php create mode 100644 Server/application/job/WorkbenchImportContactJob.php diff --git a/Server/application/api/controller/DeviceController.php b/Server/application/api/controller/DeviceController.php index 509a5ee7..7ebe1d28 100644 --- a/Server/application/api/controller/DeviceController.php +++ b/Server/application/api/controller/DeviceController.php @@ -419,9 +419,17 @@ class DeviceController extends BaseController $response = handleApiResponse($result); if(empty($response)){ - return successJson([], '操作成功'); + if($isInner){ + return json_encode(['code'=>200,'msg'=>'更新设备联系人成功' ]); + }else{ + return successJson([],'更新设备联系人失败:通讯录不能为空' ); + } }else{ - return errorJson([],$response); + if($isInner){ + return json_encode(['code'=>200,'msg'=> $response ]); + }else{ + return successJson([],$response ); + } } } catch (\Exception $e) { if($isInner){ diff --git a/Server/application/command.php b/Server/application/command.php index cc0dbacf..bc98bd35 100644 --- a/Server/application/command.php +++ b/Server/application/command.php @@ -37,4 +37,5 @@ return [ 'workbench:trafficDistribute' => 'app\command\WorkbenchTrafficDistributeCommand', // 工作台流量分发任务 'workbench:groupPush' => 'app\command\WorkbenchGroupPushCommand', // 工作台群推送任务 'workbench:groupCreate' => 'app\command\WorkbenchGroupCreateCommand', // 工作台群创建任务 + 'workbench:import-contact' => 'app\command\WorkbenchImportContactCommand', // 工作台通讯录导入任务 ]; diff --git a/Server/application/command/WorkbenchImportContactCommand.php b/Server/application/command/WorkbenchImportContactCommand.php new file mode 100644 index 00000000..55593cc1 --- /dev/null +++ b/Server/application/command/WorkbenchImportContactCommand.php @@ -0,0 +1,125 @@ +setName('workbench:import-contact') + ->setDescription('执行工作台通讯录导入任务'); + } + + /** + * 执行命令 + * @param Input $input + * @param Output $output + * @return int + */ + protected function execute(Input $input, Output $output) + { + $output->writeln('开始执行工作台通讯录导入任务...'); + + try { + // 检查是否有任务正在执行 + $lockKey = 'workbench_import_contact_lock'; + if (Cache::has($lockKey)) { + $output->writeln('通讯录导入任务正在执行中,跳过本次执行'); + return 0; + } + + // 设置执行锁,防止重复执行 + Cache::set($lockKey, time(), 3600); // 1小时锁定时间 + + // 生成任务ID + $jobId = 'workbench_import_contact_' . date('YmdHis') . '_' . mt_rand(1000, 9999); + + // 准备任务数据 + $jobData = [ + 'jobId' => $jobId, + 'queueLockKey' => $lockKey, + 'executeTime' => time() + ]; + // 判断是否使用队列 + if ($this->shouldUseQueue()) { + // 推送到队列 + Queue::push(WorkbenchImportContactJob::class, $jobData, 'workbench_import_contact'); + $output->writeln("通讯录导入任务已推送到队列,任务ID: {$jobId}"); + } else { + // 直接执行 + $job = new WorkbenchImportContactJob(); + $result = $job->execute(); + + // 释放锁 + Cache::rm($lockKey); + + if ($result !== false) { + $output->writeln('通讯录导入任务执行成功'); + } else { + $output->writeln('通讯录导入任务执行失败'); + return 1; + } + } + + } catch (\Exception $e) { + // 释放锁 + Cache::rm($lockKey ?? ''); + + $errorMsg = '通讯录导入任务执行异常: ' . $e->getMessage(); + $output->writeln($errorMsg); + Log::error($errorMsg); + return 1; + } + + return 0; + } + + /** + * 判断是否应该使用队列 + * @return bool + */ + protected function shouldUseQueue() + { + // 检查队列配置是否启用 + $queueConfig = config('queue'); + if (empty($queueConfig) || !isset($queueConfig['default'])) { + return false; + } + + // 检查队列连接是否可用 + try { + $connection = $queueConfig['connections'][$queueConfig['default']] ?? []; + if (empty($connection)) { + return false; + } + + // 如果是数据库队列,检查表是否存在 + if ($connection['type'] === 'database') { + $tableName = $connection['table'] ?? 'jobs'; + $exists = \think\Db::query("SHOW TABLES LIKE '{$tableName}'"); + return !empty($exists); + } + + return true; + } catch (\Exception $e) { + Log::warning('队列检查失败,将使用同步执行: ' . $e->getMessage()); + return false; + } + } +} \ No newline at end of file diff --git a/Server/application/job/WorkbenchImportContactJob.php b/Server/application/job/WorkbenchImportContactJob.php new file mode 100644 index 00000000..ec36380f --- /dev/null +++ b/Server/application/job/WorkbenchImportContactJob.php @@ -0,0 +1,399 @@ +logJobStart($jobId, $queueLockKey); + $this->execute(); + $this->handleJobSuccess($job, $queueLockKey); + return true; + } catch (\Exception $e) { + return $this->handleJobError($e, $job, $queueLockKey); + } + } + + /** + * 执行任务 + * @throws \Exception + */ + public function execute() + { + try { + // 获取所有启用的通讯录导入工作台 + $workbenches = Workbench::where(['status' => 1, 'type' => 6, 'isDel' => 0])->order('id desc')->select(); + foreach ($workbenches as $workbench) { + // 获取工作台配置 + $config = WorkbenchImportContact::where('workbenchId', $workbench->id)->find(); + if (!$config) { + continue; + } + + // 判断是否需要导入 + $shouldImport = $this->shouldImport($workbench, $config); + if (!$shouldImport) { + continue; + } + + // 获取需要导入的设备列表 + $devices = $this->getDeviceList($workbench, $config); + if (empty($devices)) { + continue; + } + + // 获取通讯录数据 + $contactData = $this->getContactFromDatabase($workbench, $config); + if (empty($contactData)) { + continue; + } + + // 执行通讯录导入 + $this->importContactToDevices($workbench, $config, $devices, $contactData); + } + } catch (\Exception $e) { + Log::error("通讯录导入任务异常: " . $e->getMessage()); + throw $e; + } + } + + /** + * 导入通讯录到设备 + * @param Workbench $workbench + * @param WorkbenchImportContact $config + * @param array $devices + * @param array $contactData + */ + public function importContactToDevices($workbench, $config, $devices, $contactData) + { + $deviceController = new DeviceController(); + + // 根据设备数量平分通讯录数据 + $deviceCount = count($devices); + if ($deviceCount == 0) { + Log::warning("没有可用设备进行通讯录导入"); + return; + } + + $contactCount = count($contactData); + if ($contactCount == 0) { + Log::warning("没有通讯录数据需要导入"); + return; + } + + // 计算每个设备分配的联系人数量 + $contactsPerDevice = ceil($contactCount / $deviceCount); + foreach ($devices as $index => $device) { + try { + // 计算当前设备的联系人数据范围 + $startIndex = $index * $contactsPerDevice; + $endIndex = min($startIndex + $contactsPerDevice, $contactCount); + + // 如果起始索引超出范围,跳过 + if ($startIndex >= $contactCount) { + continue; + } + + // 获取当前设备的联系人数据片段 + $deviceContactData = array_slice($contactData, $startIndex, $endIndex - $startIndex); + + if (empty($deviceContactData)) { + continue; + } + // 准备联系人数据 + $contactJson = $this->formatContactData($deviceContactData, $config); + + // 调用设备控制器的导入联系人方法 + $result = $deviceController->importContact([ + 'deviceId' => $device['deviceId'], + 'contactJson' => $contactJson, + 'clearContact' => $config['clearContact'] ?? false + ], true); + + $resultData = json_decode($result, true); + + // 记录导入历史 + $this->recordImportHistory($workbench, $device, $deviceContactData); + + if ($resultData['code'] == 200) { + Log::info("设备 {$device['deviceId']} 通讯录导入成功,导入联系人数量: " . count($deviceContactData)); + } else { + Log::error("设备 {$device['deviceId']} 通讯录导入失败: " . ($resultData['msg'] ?? '未知错误')); + } + + // 添加延迟,避免频繁请求 + if ($config['importInterval'] ?? 0 > 0) { + sleep($config['importInterval']); + } + + } catch (\Exception $e) { + Log::error("设备 {$device['deviceId']} 通讯录导入异常: " . $e->getMessage()); + } + } + } + + /** + * 格式化联系人数据 + * @param array $contactData + * @param WorkbenchImportContact $config + * @return array|string + */ + protected function formatContactData($contactData, $config) + { + $remarkType = $config['remarkType'] ?? 0; + $remark = $config['remark'] ?? ''; + + // 根据remarkType添加备注 + $suffix = ''; + switch ($remarkType) { + case 0: + // 不添加备注 + $suffix = ''; + break; + case 1: + // 添加年月日 + $suffix = date('Ymd') . '_'; + break; + case 2: + // 添加月日 + $suffix = date('md') . '_'; + break; + case 3: + // 自定义备注 + $suffix = $remark . '_'; + break; + default: + $suffix = ''; + break; + } + // 返回数组格式 + $contacts = []; + foreach ($contactData as $contact) { + $name = !empty($contact['name']) ? trim($contact['name']) : trim($contact['phone']); + if (!empty($suffix)) { + $name = $suffix . $name; + } + $contacts[] = [ + 'name' => $name, + 'phone' => trim($contact['phone']) + ]; + } + return $contacts; + + } + + /** + * 记录导入历史 + * @param Workbench $workbench + * @param array $device + * @param array $contactData + * @param array $result + */ + protected function recordImportHistory($workbench, $device, $contactData) + { + $data = []; + foreach ($contactData as $v){ + $data[] = [ + 'workbenchId' => $workbench->id, + 'deviceId' => $device['deviceId'], + 'packageId' => !empty($v['packageId']) ? $v['packageId'] : 0, + 'poolId' => !empty($v['id']) ? $v['id'] : 0, + 'createTime' => time(), + ]; + } + Db::name('workbench_import_contact_item')->insertAll($data); + } + + /** + * 获取设备列表 + * @param Workbench $workbench 工作台 + * @param WorkbenchImportContact $config 配置 + * @return array + */ + protected function getDeviceList($workbench, $config) + { + $deviceIds = json_decode($config['deviceId'], true); + if (empty($deviceIds)) { + return []; + } + + // 从数据库获取设备信息 + $devices = Db::table('s2_device') + ->whereIn('id', $deviceIds) + ->where('isDeleted', 0) + ->where('alive', 1) // 只选择在线设备 + ->field('id as deviceId, imei, nickname') + ->select(); + + return $devices; + } + + /** + * 判断是否需要导入 + * @param Workbench $workbench 工作台 + * @param WorkbenchImportContact $config 配置 + * @return bool + */ + protected function shouldImport($workbench, $config) + { + // 检查导入间隔 + $today = date('Y-m-d'); + $startTimestamp = strtotime($today . ' ' . $config['startTime'] . ':00'); + $endTimestamp = strtotime($today . ' ' . $config['endTime'] . ':00'); + // 如果不在指定时间范围内,则跳过 + if ($startTimestamp > time() || $endTimestamp < time()) { + return false; + } + + $maxPerDay = $config['num']; + if ($maxPerDay <= 0) { + return false; + } + + // 查询今日已导入次数 + $count = Db::name('workbench_import_contact_item') + ->where('workbenchId', $workbench->id) + ->whereTime('createTime', 'between', [$startTimestamp, $endTimestamp]) + ->count(); + + if ($count >= $maxPerDay) { + return false; + } + + // 计算导入间隔 + $totalSeconds = $endTimestamp - $startTimestamp; + $interval = floor($totalSeconds / $maxPerDay); + $nextImportTime = $startTimestamp + $count * $interval; + + if (time() < $nextImportTime) { + return false; + } + + return true; + } + + + + /** + * 从数据库读取通讯录 + * @param WorkbenchImportContact $config + * @return array + */ + protected function getContactFromDatabase($workbench,$config) + { + $pools = json_decode($config['pools'], true); + $deviceIds = json_decode($config['deviceId'], true); + if (empty($pools) || empty($deviceIds)) { + return false; + } + $deviceNum = count($deviceIds); + $contactNum = $deviceNum * $config['num']; + if (empty($contactNum)) { + return false; + } + //过滤已删除的数据 + $packageIds = Db::name('traffic_source_package') + ->where(['isDel' => 0]) + ->whereIn('id', $pools) + ->column('id'); + + if (empty($packageIds)) { + return false; + } + + $data = Db::name('traffic_source_package_item')->alias('tpi') + ->join('traffic_pool tp', 'tp.identifier = tpi.identifier') + ->join('traffic_source ts', 'ts.identifier = tpi.identifier','left') + ->join('workbench_import_contact_item wici', 'wici.poolId = tp.id AND wici.workbenchId = '.$workbench->id,'left') + ->where('tp.mobile', '>',0) + ->where('wici.id','null') + ->whereIn('tpi.packageId',$packageIds) + ->field('tp.id,tpi.packageId,tp.mobile as phone,ts.name') + ->order('tp.id DESC') + ->group('tpi.identifier') + ->limit($contactNum) + ->select(); + return $data; + } + + /** + * 记录任务开始 + * @param string $jobId + * @param string $queueLockKey + */ + protected function logJobStart($jobId, $queueLockKey) + { + Log::info('开始处理工作台通讯录导入任务: ' . json_encode([ + 'jobId' => $jobId, + 'queueLockKey' => $queueLockKey + ])); + } + + /** + * 处理任务成功 + * @param Job $job + * @param string $queueLockKey + */ + protected function handleJobSuccess($job, $queueLockKey) + { + $job->delete(); + Cache::rm($queueLockKey); + Log::info('工作台通讯录导入任务执行成功'); + } + + /** + * 处理任务错误 + * @param \Exception $e + * @param Job $job + * @param string $queueLockKey + * @return bool + */ + protected function handleJobError(\Exception $e, $job, $queueLockKey) + { + Log::error('工作台通讯录导入任务异常:' . $e->getMessage()); + + if (!empty($queueLockKey)) { + Cache::rm($queueLockKey); + Log::info("由于异常释放队列锁: {$queueLockKey}"); + } + + if ($job->attempts() > self::MAX_RETRY_ATTEMPTS) { + $job->delete(); + } else { + $job->release(Config::get('queue.failed_delay', 10)); + } + + return false; + } +} \ No newline at end of file