272 lines
8.3 KiB
PHP
272 lines
8.3 KiB
PHP
<?php
|
|
|
|
namespace App\Services;
|
|
|
|
use Illuminate\Support\Facades\DB;
|
|
use Carbon\Carbon;
|
|
|
|
class EventConsumerSyncService
|
|
{
|
|
/**
|
|
* Agent监听的所有topic列表
|
|
*/
|
|
private const AGENT_LISTENED_TOPICS = [
|
|
// Case
|
|
'CASE_CREATE',
|
|
'CASE_FILE',
|
|
'CASE_3D_SEND',
|
|
'CASE_3D_CONFIRM',
|
|
'CASE_3D_CANCEL_CONFIRM',
|
|
'CASE_STAGE_FINISH',
|
|
'CASE_STAGE_CREATE',
|
|
'CASE_FINISH',
|
|
'CASE_PAUSE',
|
|
'CASE_DELETED',
|
|
'CASE_CONTINUE',
|
|
'CASE_MONEY_IN',
|
|
'CASE_BASIC_INFO_CHANGE',
|
|
'CASE_REFERRAL',
|
|
'CASE_TAGS_CHANGE',
|
|
'CASE_RECOVER',
|
|
'CASE_PRODUCT_WAIT_CONFIRM',
|
|
'CASE_PRODUCT_CONFIRM',
|
|
'CASE_PRODUCT_CANCEL_CONFIRM',
|
|
'CASE_START_DESIGN',
|
|
'CASE_NOT_TREATED',
|
|
'CASE_REOPEN',
|
|
'MEDICAL_DESIGN_EVENT',
|
|
// Production
|
|
'PRODUCTION_CREATE',
|
|
'PRODUCTION_DELIVER',
|
|
'SHIPPING_STATUS',
|
|
'PRODUCTION_INFO_CHANGE',
|
|
// Doctor
|
|
'DOCTOR_CREATE',
|
|
'DOCTOR_INFO_CHANGE',
|
|
'DOCTOR_STATUS_CHANGE',
|
|
'DOCTOR_DELETE',
|
|
// Hospital
|
|
'ACCOUNT_CREATE',
|
|
'ACCOUNT_INFO_CHANGE',
|
|
'ACCOUNT_AUTH_CHANGE',
|
|
'ACCOUNT_STATUS_CHANGE',
|
|
// BA
|
|
'BA_CREATE',
|
|
'BA_INFO_CHANGE',
|
|
// Business Document
|
|
'BUSINESS_ORDER_CREATE',
|
|
'BUSINESS_ORDER_DATA_CHANGE',
|
|
'BUSINESS_ORDER_STATUS_CHANGE',
|
|
'BUSINESS_ORDER_CHECK',
|
|
// Sale Document
|
|
'SALES_ORDER_CREATE',
|
|
'SALES_ORDER_DATA_CHANGE',
|
|
'SALES_ORDER_STATUS_CHANGE',
|
|
// Contract
|
|
'CONTRACT_CREATE',
|
|
'CONTRACT_INFO_CHANGE',
|
|
// Lead Hospital
|
|
'LEAD_HOSPITAL_CREATE',
|
|
'LEAD_HOSPITAL_CHANGE',
|
|
// Lead Doctor
|
|
'CREATE_LEADS',
|
|
'LEAD_UPDATE',
|
|
// GROUP
|
|
'GROUP_CREATE',
|
|
'GROUP_INFO_CHANGE',
|
|
// Hospital Enter
|
|
'ACCOUNT_ENTER_INFO_REJECT',
|
|
// Orthodontic
|
|
'APPLIANCE_CREATE',
|
|
'APPLIANCE_CHANGE',
|
|
'RETAINER_CHANGE',
|
|
'SET_MENU_CHANGE',
|
|
];
|
|
|
|
/**
|
|
* 查询CRM事件消费者表的消息数据
|
|
* 只查询Agent监听的topic
|
|
*/
|
|
public function getCrmEventConsumers(
|
|
?Carbon $startTime = null,
|
|
?Carbon $endTime = null,
|
|
array $excludeMessages = []
|
|
): array {
|
|
try {
|
|
$query = DB::connection('crmslave')
|
|
->table('crm_api.crm_event_consumer');
|
|
|
|
// 只查询Agent监听的topic
|
|
$query->whereIn('event_name', self::AGENT_LISTENED_TOPICS);
|
|
|
|
if ($startTime) {
|
|
$query->where('created', '>=', $startTime);
|
|
}
|
|
|
|
if ($endTime) {
|
|
$query->where('created', '<=', $endTime);
|
|
}
|
|
|
|
if (!empty($excludeMessages)) {
|
|
$query->whereNotIn('event_name', $excludeMessages);
|
|
}
|
|
|
|
$messages = $query->select([
|
|
'msg_id',
|
|
'event_name',
|
|
'msg_body',
|
|
'created',
|
|
'updated'
|
|
])->get();
|
|
|
|
return $messages->map(function ($msg) {
|
|
return [
|
|
'msg_id' => $msg->msg_id,
|
|
'event_name' => $msg->event_name,
|
|
'msg_body' => $msg->msg_body,
|
|
'created' => $msg->created,
|
|
'updated' => $msg->updated,
|
|
];
|
|
})->toArray();
|
|
} catch (\Exception $e) {
|
|
throw new \RuntimeException('查询CRM事件消费者表失败: ' . $e->getMessage());
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 查询Agent事件消费者表的消息数据(通过msg_id列表,分批查询)
|
|
* @param array $msgIds msg_id列表
|
|
* @param int $batchSize 每批查询的数量,默认1000
|
|
*/
|
|
public function getAgentEventConsumersByMsgIds(array $msgIds, int $batchSize = 1000): array {
|
|
try {
|
|
if (empty($msgIds)) {
|
|
return [];
|
|
}
|
|
|
|
$messages = [];
|
|
|
|
// 分批查询,避免一次性查询过多数据导致慢查询
|
|
$batches = array_chunk($msgIds, $batchSize);
|
|
|
|
foreach ($batches as $batch) {
|
|
$batchMessages = DB::connection('agentslave')
|
|
->table('crm_event_consumer')
|
|
->whereIn('msg_id', $batch)
|
|
->select([
|
|
'msg_id',
|
|
'event_name',
|
|
'msg_body',
|
|
'created',
|
|
'updated'
|
|
])->get();
|
|
|
|
foreach ($batchMessages as $msg) {
|
|
$messages[] = [
|
|
'msg_id' => $msg->msg_id,
|
|
'event_name' => $msg->event_name,
|
|
'msg_body' => $msg->msg_body,
|
|
'created' => $msg->created,
|
|
'updated' => $msg->updated,
|
|
];
|
|
}
|
|
}
|
|
|
|
return $messages;
|
|
} catch (\Exception $e) {
|
|
throw new \RuntimeException('查询Agent事件消费者表失败: ' . $e->getMessage());
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 对比CRM和Agent的消息,找出缺失的消息
|
|
* 策略:先查CRM的msg_id,然后用这些msg_id到Agent中查询,避免时间差异导致的缺失
|
|
*/
|
|
public function compareSyncStatus(
|
|
?Carbon $startTime = null,
|
|
?Carbon $endTime = null,
|
|
array $excludeMessages = [],
|
|
?string $messageName = null
|
|
): array {
|
|
// 1. 先查询CRM中的所有消息
|
|
$crmMessages = $this->getCrmEventConsumers($startTime, $endTime, $excludeMessages);
|
|
|
|
// 如果指定了消息名称,则进一步过滤
|
|
if ($messageName) {
|
|
$crmMessages = array_filter($crmMessages, function ($msg) use ($messageName) {
|
|
return $msg['event_name'] === $messageName;
|
|
});
|
|
}
|
|
|
|
$crmMsgIds = array_column($crmMessages, 'msg_id');
|
|
|
|
// 2. 用CRM的msg_id到Agent中查询(不受时间限制)
|
|
$agentMessages = $this->getAgentEventConsumersByMsgIds($crmMsgIds);
|
|
$agentMsgIds = array_column($agentMessages, 'msg_id');
|
|
|
|
// 3. 找出在CRM中但不在Agent中的消息
|
|
$missingMsgIds = array_diff($crmMsgIds, $agentMsgIds);
|
|
|
|
$missingMessages = array_filter($crmMessages, function ($msg) use ($missingMsgIds) {
|
|
return in_array($msg['msg_id'], $missingMsgIds);
|
|
});
|
|
|
|
// 4. 按topic统计缺失消息数量
|
|
$missingByTopic = $this->groupMissingMessagesByTopic($missingMessages);
|
|
|
|
return [
|
|
'crm_total' => count($crmMessages),
|
|
'agent_total' => count($agentMessages),
|
|
'missing_count' => count($missingMessages),
|
|
'sync_rate' => count($crmMessages) > 0
|
|
? round((count($crmMessages) - count($missingMessages)) / count($crmMessages) * 100, 2)
|
|
: 100,
|
|
'missing_messages' => array_values($missingMessages),
|
|
'missing_by_topic' => $missingByTopic,
|
|
'summary' => [
|
|
'start_time' => $startTime?->toDateTimeString(),
|
|
'end_time' => $endTime?->toDateTimeString(),
|
|
'message_name' => $messageName,
|
|
'excluded_messages' => $excludeMessages,
|
|
]
|
|
];
|
|
}
|
|
|
|
/**
|
|
* 获取Agent监听的所有topic列表
|
|
*/
|
|
public function getAgentListenedTopics(): array {
|
|
return self::AGENT_LISTENED_TOPICS;
|
|
}
|
|
|
|
/**
|
|
* 按topic统计缺失消息数量
|
|
*/
|
|
private function groupMissingMessagesByTopic(array $missingMessages): array {
|
|
$grouped = [];
|
|
|
|
foreach ($missingMessages as $msg) {
|
|
$topic = $msg['event_name'] ?? 'unknown';
|
|
|
|
if (!isset($grouped[$topic])) {
|
|
$grouped[$topic] = [
|
|
'topic' => $topic,
|
|
'count' => 0,
|
|
'messages' => []
|
|
];
|
|
}
|
|
|
|
$grouped[$topic]['count']++;
|
|
$grouped[$topic]['messages'][] = $msg['msg_id'];
|
|
}
|
|
|
|
// 按缺失数量降序排序
|
|
uasort($grouped, function ($a, $b) {
|
|
return $b['count'] - $a['count'];
|
|
});
|
|
|
|
return array_values($grouped);
|
|
}
|
|
}
|
|
|