Files
toolbox/app/Services/MessageSyncService.php

163 lines
4.6 KiB
PHP

<?php
namespace App\Services;
use App\Clients\MonoClient;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Collection;
use Carbon\Carbon;
/**
 * Reads published events from the `crmslave` connection and re-consumes
 * them through the injected Mono client.
 */
class MessageSyncService
{
    /**
     * @param MonoClient $monoClient Client whose consumeMessage() is invoked once per message ID.
     */
    public function __construct(private MonoClient $monoClient)
    {
    }

    /**
     * Fetch events from `system_publish_event` on the `crmslave` connection by message ID.
     *
     * Each row is returned as an array holding the raw columns plus
     * `parsed_param` / `parsed_property`, the decoded forms of `event_param`
     * and `event_property` (see parseJsonField()).
     *
     * @param array $messageIds Message IDs to look up; an empty array short-circuits without querying.
     *
     * @return Collection One array per matching row.
     *
     * @throws \RuntimeException When the lookup fails; the underlying exception is attached as `previous`.
     */
    public function getMessagesByIds(array $messageIds): Collection
    {
        if ($messageIds === []) {
            return collect();
        }

        try {
            $rows = DB::connection('crmslave')
                ->table('system_publish_event')
                ->whereIn('msg_id', $messageIds)
                ->select([
                    'msg_id',
                    'event_type',
                    'trace_id',
                    'event_param',
                    'event_property',
                    'timestamp',
                ])
                ->get();

            return $rows->map(fn ($row): array => [
                'msg_id' => $row->msg_id,
                'event_type' => $row->event_type,
                'trace_id' => $row->trace_id,
                'event_param' => $row->event_param,
                'event_property' => $row->event_property,
                'timestamp' => $row->timestamp,
                'parsed_param' => $this->parseJsonField($row->event_param),
                'parsed_property' => $this->parseJsonField($row->event_property),
            ]);
        } catch (\Exception $e) {
            // Chain the original exception so the root cause and its stack trace are not lost.
            throw new \RuntimeException('从crmslave数据库获取消息失败: ' . $e->getMessage(), 0, $e);
        }
    }

    /**
     * Consume a batch of messages through the Mono client, one request per ID.
     *
     * @param array $messageIds Message IDs to consume, processed in the given order.
     *
     * @return array List of per-message results in the shape produced by syncSingleMessage().
     */
    public function syncMessages(array $messageIds): array
    {
        $results = [];

        foreach ($messageIds as $msgId) {
            $results[] = $this->syncSingleMessage($msgId);
        }

        return $results;
    }

    /**
     * Consume a single message through the Mono client.
     *
     * The call counts as successful only when the response reports success
     * and the decoded body carries `code` strictly equal to 0. Exceptions
     * raised by the client are converted into a failed result so one bad
     * message does not abort the batch.
     *
     * @param string $msgId Message ID handed to the client.
     *
     * @return array `msg_id` and `success` are always present; `response` holds the decoded body
     *               whenever a response was received; `error` is set on failure.
     */
    private function syncSingleMessage(string $msgId): array
    {
        try {
            $response = $this->monoClient->consumeMessage($msgId);
            $body = $response->json();

            if ($response->successful() && ($body['code'] ?? -1) === 0) {
                return [
                    'msg_id' => $msgId,
                    'success' => true,
                    'response' => $body,
                ];
            }

            return [
                'msg_id' => $msgId,
                'success' => false,
                // Prefer the message from the body; otherwise report the raw HTTP status and payload.
                'error' => $body['message'] ?? ('HTTP ' . $response->status() . ': ' . $response->body()),
                'response' => $body,
            ];
        } catch (\Exception $e) {
            return [
                'msg_id' => $msgId,
                'success' => false,
                'error' => '请求失败: ' . $e->getMessage(),
            ];
        }
    }

    /**
     * Decode a JSON column value.
     *
     * @param string|null $jsonString Raw column content.
     *
     * @return mixed Null for a null or empty input, the decoded value (objects as associative
     *               arrays) for valid JSON, or the original string when it is not valid JSON.
     */
    private function parseJsonField(?string $jsonString): mixed
    {
        // Explicit comparison instead of empty(): the string "0" is a valid JSON document.
        if ($jsonString === null || $jsonString === '') {
            return null;
        }

        try {
            // Without JSON_THROW_ON_ERROR json_decode() reports failure by returning null,
            // which would make the fallback below unreachable.
            return json_decode($jsonString, true, 512, JSON_THROW_ON_ERROR);
        } catch (\JsonException $e) {
            return $jsonString; // Not valid JSON: hand back the raw string.
        }
    }

    /**
     * Validate a list of message IDs.
     *
     * An entry is rejected when it is empty by PHP's empty() rules (which also
     * covers 0 and "0") or when it is neither a string nor numeric. Positions
     * in the error messages are 1-based.
     *
     * @param array $messageIds IDs to validate; keys are assumed to be integer positions.
     *
     * @return array List of error messages; empty when every ID passes.
     */
    public function validateMessageIds(array $messageIds): array
    {
        $errors = [];

        foreach ($messageIds as $index => $messageId) {
            $position = $index + 1;

            if (empty($messageId)) {
                $errors[] = $position . ' 行: 消息ID不能为空';
                continue;
            }

            if (!is_string($messageId) && !is_numeric($messageId)) {
                $errors[] = $position . ' 行: 消息ID格式无效';
            }
            // Further format rules can be added here.
        }

        return $errors;
    }

    /**
     * Summarise how many of the requested messages exist in the database.
     *
     * @param array $messageIds Message IDs to look up.
     *
     * @return array Keys: `total_requested`, `total_found` (number of rows returned),
     *               `total_missing` (number of entries in `missing_ids`), `event_types`
     *               (row count per event type) and `missing_ids` (requested IDs with no row).
     *
     * @throws \RuntimeException When the underlying lookup fails.
     */
    public function getMessageStats(array $messageIds): array
    {
        $messages = $this->getMessagesByIds($messageIds);

        // array_diff() preserves keys; reindex so the result is a plain list
        // (and serialises as a JSON array rather than an object).
        $missingIds = array_values(
            array_diff($messageIds, $messages->pluck('msg_id')->toArray())
        );

        return [
            'total_requested' => count($messageIds),
            'total_found' => $messages->count(),
            // Derived from the missing list so both fields always agree and the count
            // cannot go negative when IDs repeat or several rows share a msg_id.
            'total_missing' => count($missingIds),
            'event_types' => $messages->groupBy('event_type')->map->count(),
            'missing_ids' => $missingIds,
        ];
    }
}