table('crm_api.crm_event_consumer');

            // Only query the topics the Agent listens to
            $query->whereIn('event_name', self::AGENT_LISTENED_TOPICS);

            if ($startTime) {
                $query->where('created', '>=', $startTime);
            }

            if ($endTime) {
                $query->where('created', '<=', $endTime);
            }

            if (!empty($excludeMessages)) {
                $query->whereNotIn('event_name', $excludeMessages);
            }

            $messages = $query->select([
                'msg_id',
                'event_name',
                'msg_body',
                'created',
                'updated'
            ])->get();

            return $messages->map(function ($msg) {
                return [
                    'msg_id' => $msg->msg_id,
                    'event_name' => $msg->event_name,
                    'msg_body' => $msg->msg_body,
                    'created' => $msg->created,
                    'updated' => $msg->updated,
                ];
            })->toArray();
        } catch (\Exception $e) {
            throw new \RuntimeException('查询CRM事件消费者表失败: ' . $e->getMessage());
        }
    }

    /**
     * Fetch Agent-side event consumer rows for the given msg_id values, in batches.
     *
     * Reads the `crm_event_consumer` table on the `agentslave` connection using
     * one `whereIn` query per batch, so a very large id list never ends up in a
     * single statement.
     *
     * @param array $msgIds    msg_id values to look up
     * @param int   $batchSize number of ids per query (default 1000)
     *
     * @return array list of rows, each holding msg_id, event_name, msg_body, created, updated
     *
     * @throws \RuntimeException when a query fails; the underlying exception is attached as `previous`
     */
    public function getAgentEventConsumersByMsgIds(array $msgIds, int $batchSize = 1000): array
    {
        if (empty($msgIds)) {
            return [];
        }

        // De-duplicate first: an id repeated across a batch boundary would
        // otherwise return the same row once per batch it appears in.
        $uniqueMsgIds = array_unique($msgIds);

        try {
            $messages = [];

            foreach (array_chunk($uniqueMsgIds, $batchSize) as $batch) {
                $batchMessages = DB::connection('agentslave')
                    ->table('crm_event_consumer')
                    ->whereIn('msg_id', $batch)
                    ->select([
                        'msg_id',
                        'event_name',
                        'msg_body',
                        'created',
                        'updated',
                    ])->get();

                foreach ($batchMessages as $msg) {
                    $messages[] = [
                        'msg_id' => $msg->msg_id,
                        'event_name' => $msg->event_name,
                        'msg_body' => $msg->msg_body,
                        'created' => $msg->created,
                        'updated' => $msg->updated,
                    ];
                }
            }

            return $messages;
        } catch (\Exception $e) {
            // Keep the original exception chained so the stack trace is not lost.
            throw new \RuntimeException('查询Agent事件消费者表失败: ' . $e->getMessage(), 0, $e);
        }
    }

    /**
     * Compare CRM and Agent messages and report the ones missing on the Agent side.
     *
     * Strategy: load the CRM messages first, then look up exactly those msg_id
     * values in Agent (with no time filter), so that timestamp differences
     * between the two stores cannot produce false "missing" results.
     *
     * @param Carbon|null $startTime       lower bound passed to the CRM query
     * @param Carbon|null $endTime         upper bound passed to the CRM query
     * @param array       $excludeMessages event names passed to the CRM query as exclusions
     * @param string|null $messageName     when given, only CRM messages with this event_name are compared
     *
     * @return array report with keys crm_total, agent_total, missing_count, sync_rate,
     *               missing_messages, missing_by_topic and summary
     */
    public function compareSyncStatus(
        ?Carbon $startTime = null,
        ?Carbon $endTime = null,
        array $excludeMessages = [],
        ?string $messageName = null
    ): array {
        // 1. Load the CRM messages.
        $crmMessages = $this->getCrmEventConsumers($startTime, $endTime, $excludeMessages);

        // Narrow down to a single event name when one was requested. An explicit
        // null/empty check is used so a name such as "0" is still honoured.
        if ($messageName !== null && $messageName !== '') {
            $crmMessages = array_filter(
                $crmMessages,
                static fn ($msg) => $msg['event_name'] === $messageName
            );
        }

        $crmMsgIds = array_column($crmMessages, 'msg_id');

        // 2. Look up the CRM msg_ids in Agent (not restricted by time).
        $agentMessages = $this->getAgentEventConsumersByMsgIds($crmMsgIds);

        // 3. Keep the CRM messages whose msg_id is absent from Agent. Ids are
        //    compared as strings through a keyed set: this avoids loose-equality
        //    false matches between numeric-looking ids (e.g. "007" vs "7") and
        //    replaces a linear scan per message with a constant-time lookup.
        $agentMsgIdSet = [];
        foreach ($agentMessages as $agentMsg) {
            $agentMsgIdSet[(string) $agentMsg['msg_id']] = true;
        }

        $missingMessages = array_values(array_filter(
            $crmMessages,
            static fn ($msg) => !isset($agentMsgIdSet[(string) $msg['msg_id']])
        ));

        // 4. Count the missing messages per topic.
        $missingByTopic = $this->groupMissingMessagesByTopic($missingMessages);

        $crmTotal = count($crmMessages);
        $missingCount = count($missingMessages);

        return [
            'crm_total' => $crmTotal,
            'agent_total' => count($agentMessages),
            'missing_count' => $missingCount,
            // With nothing to compare, the sync rate is reported as 100.
            'sync_rate' => $crmTotal > 0
                ? round(($crmTotal - $missingCount) / $crmTotal * 100, 2)
                : 100,
            'missing_messages' => $missingMessages,
            'missing_by_topic' => $missingByTopic,
            'summary' => [
                'start_time' => $startTime?->toDateTimeString(),
                'end_time' => $endTime?->toDateTimeString(),
                'message_name' => $messageName,
                'excluded_messages' => $excludeMessages,
            ]
        ];
    }

    /**
     * Return the list of topics the Agent listens to.
     */
    public function getAgentListenedTopics(): array
    {
        return self::AGENT_LISTENED_TOPICS;
    }

    /**
     * Group missing messages by topic and count them.
     *
     * @param array $missingMessages rows with at least msg_id; rows without event_name
     *                               are grouped under 'unknown'
     *
     * @return array list of ['topic' => ..., 'count' => ..., 'messages' => [msg_id, ...]],
     *               ordered by count, highest first
     */
    private function groupMissingMessagesByTopic(array $missingMessages): array
    {
        $grouped = [];

        foreach ($missingMessages as $msg) {
            $topic = $msg['event_name'] ?? 'unknown';

            // Create the bucket the first time a topic is seen.
            $grouped[$topic] ??= [
                'topic' => $topic,
                'count' => 0,
                'messages' => [],
            ];

            $grouped[$topic]['count']++;
            $grouped[$topic]['messages'][] = $msg['msg_id'];
        }

        // Sort by missing count, descending.
        uasort($grouped, static fn (array $a, array $b): int => $b['count'] <=> $a['count']);

        return array_values($grouped);
    }
}