monoClient = $monoClient; } /** * 根据消息ID列表从crmslave数据库获取消息数据 */ public function getMessagesByIds(array $messageIds): Collection { if (empty($messageIds)) { return collect(); } try { $messages = DB::connection('crmslave') ->table('system_publish_event') ->whereIn('msg_id', $messageIds) ->select([ 'msg_id', 'event_type', 'trace_id', 'event_param', 'event_property', 'timestamp' ]) ->get(); return $messages->map(function ($message) { return [ 'msg_id' => $message->msg_id, 'event_type' => $message->event_type, 'trace_id' => $message->trace_id, 'event_param' => $message->event_param, 'event_property' => $message->event_property, 'timestamp' => $message->timestamp, 'parsed_param' => $this->parseJsonField($message->event_param), 'parsed_property' => $this->parseJsonField($message->event_property), ]; }); } catch (\Exception $e) { throw new \RuntimeException('从crmslave数据库获取消息失败: ' . $e->getMessage()); } } /** * 批量同步消息(通过mono消费) */ public function syncMessages(array $messageIds): array { $results = []; foreach ($messageIds as $msgId) { $results[] = $this->syncSingleMessage($msgId); } return $results; } /** * 通过mono消费单个消息 */ private function syncSingleMessage(string $msgId): array { try { $response = $this->monoClient->consumeMessage($msgId); $body = $response->json(); if ($response->successful() && ($body['code'] ?? -1) === 0) { return [ 'msg_id' => $msgId, 'success' => true, 'response' => $body, ]; } else { return [ 'msg_id' => $msgId, 'success' => false, 'error' => $body['message'] ?? ('HTTP ' . $response->status() . ': ' . $response->body()), 'response' => $body, ]; } } catch (\Exception $e) { return [ 'msg_id' => $msgId, 'success' => false, 'error' => '请求失败: ' . $e->getMessage(), ]; } } /** * 解析JSON字段 */ private function parseJsonField(?string $jsonString): mixed { if (empty($jsonString)) { return null; } try { return json_decode($jsonString, true); } catch (\Exception $e) { return $jsonString; // 如果解析失败,返回原始字符串 } } /** * 验证消息ID格式 */ public function validateMessageIds(array $messageIds): array { $errors = []; foreach ($messageIds as $index => $messageId) { if (empty($messageId)) { $errors[] = "第 " . ($index + 1) . " 行: 消息ID不能为空"; continue; } if (!is_string($messageId) && !is_numeric($messageId)) { $errors[] = "第 " . ($index + 1) . " 行: 消息ID格式无效"; continue; } // 可以添加更多的格式验证规则 } return $errors; } /** * 获取消息统计信息 */ public function getMessageStats(array $messageIds): array { $messages = $this->getMessagesByIds($messageIds); $stats = [ 'total_requested' => count($messageIds), 'total_found' => $messages->count(), 'total_missing' => count($messageIds) - $messages->count(), 'event_types' => $messages->groupBy('event_type')->map->count(), 'missing_ids' => array_diff($messageIds, $messages->pluck('msg_id')->toArray()), ]; return $stats; } }