> /path/to/log/scheduler.log 2>&1 */ class TaskSchedulerCommand extends Command { /** * 任务配置 */ protected $tasks = []; /** * 最大并发进程数 */ protected $maxConcurrent = 10; /** * 当前运行的进程数 */ protected $runningProcesses = []; /** * 日志目录 */ protected $logDir = ''; protected function configure() { $this->setName('scheduler:run') ->setDescription('统一任务调度器,支持多进程并发执行所有定时任务'); } protected function execute(Input $input, Output $output) { $output->writeln('=========================================='); $output->writeln('任务调度器启动'); $output->writeln('时间: ' . date('Y-m-d H:i:s')); $output->writeln('=========================================='); // 检查是否支持 pcntl 扩展 if (!function_exists('pcntl_fork')) { $output->writeln('错误:系统不支持 pcntl 扩展,无法使用多进程功能'); $output->writeln('提示:将使用单进程顺序执行任务'); $this->maxConcurrent = 1; } // 加载任务配置(优先使用框架配置,其次直接引入配置文件,避免加载失败) $this->tasks = Config::get('task_scheduler', []); // 如果通过 Config 没有读到,再尝试直接 include 配置文件 if (empty($this->tasks)) { // 以项目根目录为基准查找 config/task_scheduler.php $configFile = dirname(__DIR__, 2) . DIRECTORY_SEPARATOR . 'config' . DIRECTORY_SEPARATOR . 'task_scheduler.php'; if (is_file($configFile)) { $config = include $configFile; if (is_array($config) && !empty($config)) { $this->tasks = $config; } } } if (empty($this->tasks)) { $output->writeln('错误:未找到任务配置(task_scheduler),请检查 config/task_scheduler.php 是否存在且返回数组'); return false; } // 设置日志目录 $this->logDir = runtime_path() . 'log' . DIRECTORY_SEPARATOR; if (!is_dir($this->logDir)) { mkdir($this->logDir, 0755, true); } // 获取当前时间 $currentTime = time(); $currentMinute = date('i', $currentTime); $currentHour = date('H', $currentTime); $currentDay = date('d', $currentTime); $currentMonth = date('m', $currentTime); $currentWeekday = date('w', $currentTime); // 0=Sunday, 6=Saturday $output->writeln("当前时间: {$currentHour}:{$currentMinute}"); $output->writeln("已加载 " . count($this->tasks) . " 个任务配置"); // 筛选需要执行的任务 $tasksToRun = []; foreach ($this->tasks as $taskId => $task) { if (!isset($task['enabled']) || !$task['enabled']) { continue; } if ($this->shouldRun($task['schedule'], $currentMinute, $currentHour, $currentDay, $currentMonth, $currentWeekday)) { $tasksToRun[$taskId] = $task; } } if (empty($tasksToRun)) { $output->writeln('当前时间没有需要执行的任务'); return true; } $output->writeln("找到 " . count($tasksToRun) . " 个需要执行的任务"); // 执行任务 if ($this->maxConcurrent > 1 && function_exists('pcntl_fork')) { $this->executeConcurrent($tasksToRun, $output); } else { $this->executeSequential($tasksToRun, $output); } // 清理僵尸进程 $this->cleanupZombieProcesses(); $output->writeln('=========================================='); $output->writeln('任务调度器执行完成'); $output->writeln('=========================================='); return true; } /** * 判断任务是否应该执行 * * @param string $schedule cron表达式,格式:分钟 小时 日 月 星期 * @param int $minute 当前分钟 * @param int $hour 当前小时 * @param int $day 当前日期 * @param int $month 当前月份 * @param int $weekday 当前星期 * @return bool */ protected function shouldRun($schedule, $minute, $hour, $day, $month, $weekday) { $parts = preg_split('/\s+/', trim($schedule)); if (count($parts) < 5) { return false; } list($scheduleMinute, $scheduleHour, $scheduleDay, $scheduleMonth, $scheduleWeekday) = $parts; // 解析分钟 if (!$this->matchCronField($scheduleMinute, $minute)) { return false; } // 解析小时 if (!$this->matchCronField($scheduleHour, $hour)) { return false; } // 解析日期 if (!$this->matchCronField($scheduleDay, $day)) { return false; } // 解析月份 if (!$this->matchCronField($scheduleMonth, $month)) { return false; } // 解析星期(注意:cron中0和7都表示星期日) if ($scheduleWeekday !== '*') { $scheduleWeekday = str_replace('7', '0', $scheduleWeekday); if (!$this->matchCronField($scheduleWeekday, $weekday)) { return false; } } return true; } /** * 匹配cron字段 * * @param string $field cron字段表达式 * @param int $value 当前值 * @return bool */ protected function matchCronField($field, $value) { // 通配符 if ($field === '*') { return true; } // 列表(逗号分隔) if (strpos($field, ',') !== false) { $values = explode(',', $field); foreach ($values as $v) { if ($this->matchCronField(trim($v), $value)) { return true; } } return false; } // 范围(如 1-5) if (strpos($field, '-') !== false) { list($start, $end) = explode('-', $field); return $value >= (int)$start && $value <= (int)$end; } // 步长(如 */5 或 0-59/5) if (strpos($field, '/') !== false) { $parts = explode('/', $field); $base = $parts[0]; $step = (int)$parts[1]; if ($base === '*') { return $value % $step === 0; } else { // 处理范围步长,如 0-59/5 if (strpos($base, '-') !== false) { list($start, $end) = explode('-', $base); if ($value >= (int)$start && $value <= (int)$end) { return ($value - (int)$start) % $step === 0; } return false; } else { return $value % $step === 0; } } } // 精确匹配 return (int)$field === $value; } /** * 并发执行任务(多进程) * * @param array $tasks 任务列表 * @param Output $output 输出对象 */ protected function executeConcurrent($tasks, Output $output) { $output->writeln('使用多进程并发执行任务(最大并发数:' . $this->maxConcurrent . ')'); foreach ($tasks as $taskId => $task) { // 等待可用进程槽 while (count($this->runningProcesses) >= $this->maxConcurrent) { $this->waitForProcesses(); usleep(100000); // 等待100ms } // 检查任务是否已经在运行(防止重复执行) $lockKey = "scheduler_task_lock:{$taskId}"; $lockTime = Cache::get($lockKey); if ($lockTime && (time() - $lockTime) < 300) { // 5分钟内不重复执行 $output->writeln("任务 {$taskId} 正在运行中,跳过"); continue; } // 创建子进程 $pid = pcntl_fork(); if ($pid == -1) { // 创建进程失败 $output->writeln("创建子进程失败:{$taskId}"); Log::error("任务调度器:创建子进程失败", ['task' => $taskId]); continue; } elseif ($pid == 0) { // 子进程:执行任务 $this->runTask($taskId, $task); exit(0); } else { // 父进程:记录子进程PID $this->runningProcesses[$pid] = [ 'task_id' => $taskId, 'start_time' => time(), ]; $output->writeln("启动任务:{$taskId} (PID: {$pid})"); // 设置任务锁 Cache::set($lockKey, time(), 600); // 10分钟过期 } } // 等待所有子进程完成 while (!empty($this->runningProcesses)) { $this->waitForProcesses(); usleep(500000); // 等待500ms } } /** * 顺序执行任务(单进程) * * @param array $tasks 任务列表 * @param Output $output 输出对象 */ protected function executeSequential($tasks, Output $output) { $output->writeln('使用单进程顺序执行任务'); foreach ($tasks as $taskId => $task) { $output->writeln("执行任务:{$taskId}"); $this->runTask($taskId, $task); } } /** * 执行单个任务 * * @param string $taskId 任务ID * @param array $task 任务配置 */ protected function runTask($taskId, $task) { $startTime = microtime(true); $logFile = $this->logDir . ($task['log_file'] ?? "scheduler_{$taskId}.log"); // 确保日志目录存在 $logDir = dirname($logFile); if (!is_dir($logDir)) { mkdir($logDir, 0755, true); } // 构建命令 $thinkPath = root_path() . 'think'; $command = "php {$thinkPath} {$task['command']}"; if (!empty($task['options'])) { foreach ($task['options'] as $option) { $command .= ' ' . escapeshellarg($option); } } // 添加日志重定向 $command .= " >> " . escapeshellarg($logFile) . " 2>&1"; // 记录任务开始 $logMessage = "\n" . str_repeat('=', 60) . "\n"; $logMessage .= "任务开始执行: {$taskId}\n"; $logMessage .= "执行时间: " . date('Y-m-d H:i:s') . "\n"; $logMessage .= "命令: {$command}\n"; $logMessage .= str_repeat('=', 60) . "\n"; file_put_contents($logFile, $logMessage, FILE_APPEND); // 执行命令 $descriptorspec = [ 0 => ['file', (PHP_OS_FAMILY === 'Windows' ? 'NUL' : '/dev/null'), 'r'], // stdin 1 => ['file', $logFile, 'a'], // stdout 2 => ['file', $logFile, 'a'], // stderr ]; $process = @proc_open($command, $descriptorspec, $pipes, root_path()); if (is_resource($process)) { // 关闭管道 if (isset($pipes[0])) @fclose($pipes[0]); if (isset($pipes[1])) @fclose($pipes[1]); if (isset($pipes[2])) @fclose($pipes[2]); // 设置超时 $timeout = $task['timeout'] ?? 3600; $startWaitTime = time(); // 等待进程完成或超时 while (true) { $status = proc_get_status($process); if (!$status['running']) { break; } // 检查超时 if ((time() - $startWaitTime) > $timeout) { if (function_exists('proc_terminate')) { proc_terminate($process, SIGTERM); // 等待进程终止 sleep(2); $status = proc_get_status($process); if ($status['running']) { // 强制终止 proc_terminate($process, SIGKILL); } } Log::warning("任务执行超时", [ 'task' => $taskId, 'timeout' => $timeout, ]); break; } usleep(500000); // 等待500ms } // 关闭进程 proc_close($process); } else { // 如果 proc_open 失败,尝试直接执行(后台执行) if (PHP_OS_FAMILY === 'Windows') { pclose(popen("start /B " . $command, "r")); } else { exec($command . ' > /dev/null 2>&1 &'); } } $endTime = microtime(true); $duration = round($endTime - $startTime, 2); // 记录任务完成 $logMessage = "\n" . str_repeat('=', 60) . "\n"; $logMessage .= "任务执行完成: {$taskId}\n"; $logMessage .= "完成时间: " . date('Y-m-d H:i:s') . "\n"; $logMessage .= "执行时长: {$duration} 秒\n"; $logMessage .= str_repeat('=', 60) . "\n"; file_put_contents($logFile, $logMessage, FILE_APPEND); Log::info("任务执行完成", [ 'task' => $taskId, 'duration' => $duration, ]); } /** * 等待进程完成 */ protected function waitForProcesses() { foreach ($this->runningProcesses as $pid => $info) { $status = 0; $result = pcntl_waitpid($pid, $status, WNOHANG); if ($result == $pid || $result == -1) { // 进程已结束 unset($this->runningProcesses[$pid]); $duration = time() - $info['start_time']; Log::info("子进程执行完成", [ 'pid' => $pid, 'task' => $info['task_id'], 'duration' => $duration, ]); } } } /** * 清理僵尸进程 */ protected function cleanupZombieProcesses() { if (!function_exists('pcntl_waitpid')) { return; } $status = 0; while (($pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) { // 清理僵尸进程 } } }