Files
toolbox/app/Services/MessageDispatchService.php
2025-12-02 10:16:32 +08:00

350 lines
12 KiB
PHP
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace App\Services;
use App\Clients\MonoClient;
use Illuminate\Support\Facades\DB;
use Carbon\Carbon;
class MessageDispatchService
{
private MonoClient $monoClient;
public function __construct(MonoClient $monoClient)
{
$this->monoClient = $monoClient;
}
/**
* 查询异常的消息分发数据
*/
public function getAbnormalDispatches(
?array $msgIds = null,
?int $requestStatus = null,
?int $businessStatus = null,
?array $targetServices = null,
?array $countryCodes = null,
?array $domains = null
): array
{
try {
$query = DB::connection('monoslave')
->table('message_dispatch as md')
->join('message_consumer as mc', 'mc.msg_id', '=', 'md.msg_id')
->leftJoin('service_routes as sr', 'md.target_service', '=', 'sr.id')
->where('md.request_status', '<>', 5)
->where(function ($q) {
$q->where('md.request_status', '<>', 1)
->orWhere('md.business_status', '<>', 1);
})
->where('md.created', '<', Carbon::now()->subMinutes(5));
// 筛选条件
if ($msgIds && count($msgIds) > 0) {
$query->whereIn('md.msg_id', $msgIds);
}
if ($requestStatus !== null) {
$query->where('md.request_status', $requestStatus);
}
if ($businessStatus !== null) {
$query->where('md.business_status', $businessStatus);
}
if ($targetServices && count($targetServices) > 0) {
$query->whereIn('md.target_service', $targetServices);
}
if ($countryCodes && count($countryCodes) > 0) {
$query->where(function ($q) use ($countryCodes) {
$hasNull = in_array('', $countryCodes) || in_array(null, $countryCodes);
$nonNullCodes = array_filter($countryCodes, fn($c) => $c !== '' && $c !== null);
if ($hasNull) {
$q->whereNull('sr.country_code');
if (count($nonNullCodes) > 0) {
$q->orWhereIn('sr.country_code', $nonNullCodes);
}
} else {
$q->whereIn('sr.country_code', $nonNullCodes);
}
});
}
if ($domains && count($domains) > 0) {
$query->where(function ($q) use ($domains) {
$hasNull = in_array('', $domains) || in_array(null, $domains);
$nonNullDomains = array_filter($domains, fn($d) => $d !== '' && $d !== null);
if ($hasNull) {
$q->whereNull('sr.service_endpoint');
if (count($nonNullDomains) > 0) {
$q->orWhere(function ($subQ) use ($nonNullDomains) {
foreach ($nonNullDomains as $domain) {
$subQ->orWhere('sr.service_endpoint', 'like', "%{$domain}%");
}
});
}
} else {
$q->where(function ($subQ) use ($nonNullDomains) {
foreach ($nonNullDomains as $domain) {
$subQ->orWhere('sr.service_endpoint', 'like', "%{$domain}%");
}
});
}
});
}
$results = $query->select([
'md.id',
'md.msg_id',
'md.target_service',
'md.request_status',
'md.business_status',
'md.retry_count',
'md.request_error_message',
'md.business_error_message',
'mc.event_name',
'mc.entity_code',
DB::raw("mc.msg_body->>'$.data.delAccountList' as delAccountList"),
DB::raw("mc.msg_body->>'$.data.afterStatus' as afterStatus"),
'mc.msg_body',
'md.created',
'md.updated',
'sr.country_code',
'sr.service_endpoint'
])->get();
// 获取所有msg_id排除US域名的消息
$nonUsMsgIds = $results->filter(function ($item) {
$domain = $item->service_endpoint ? (parse_url($item->service_endpoint, PHP_URL_HOST) ?? $item->service_endpoint) : null;
return $domain !== 'partner-us.eainc.com';
})->pluck('msg_id')->unique()->toArray();
// 从Agent库查询consumer状态仅非US域名
$consumerStatuses = $this->getConsumerStatuses($nonUsMsgIds);
return $results->map(function ($item) use ($consumerStatuses) {
$domain = $item->service_endpoint ? (parse_url($item->service_endpoint, PHP_URL_HOST) ?? $item->service_endpoint) : null;
$isUsDomain = $domain === 'partner-us.eainc.com';
return [
'id' => $item->id,
'msg_id' => $item->msg_id,
'target_service' => $item->target_service,
'country_code' => $item->country_code,
'domain' => $domain,
'request_status' => $item->request_status,
'business_status' => $item->business_status,
'retry_count' => $item->retry_count,
'request_error_message' => $item->request_error_message,
'request_error_code' => $item->request_error_code ?? null,
'business_error_message' => $item->business_error_message,
'business_error_code' => $item->business_error_code ?? null,
'event_name' => $item->event_name,
'entity_code' => $item->entity_code,
'delAccountList' => $item->delAccountList,
'afterStatus' => $item->afterStatus,
'msg_body' => $item->msg_body,
'created' => $item->created,
'updated' => $item->updated,
'consumer_status' => $isUsDomain ? null : ($consumerStatuses[$item->msg_id] ?? null),
];
})->toArray();
} catch (\Exception $e) {
throw new \RuntimeException('查询异常消息分发数据失败: ' . $e->getMessage());
}
}
/**
* 从Agent库查询消费者状态
*/
private function getConsumerStatuses(array $msgIds): array
{
if (empty($msgIds)) {
return [];
}
try {
$consumers = DB::connection('agentslave')
->table('crm_event_consumer')
->whereIn('msg_id', $msgIds)
->select(['msg_id', 'status'])
->get();
$statuses = [];
foreach ($consumers as $consumer) {
$statuses[$consumer->msg_id] = $consumer->status;
}
return $statuses;
} catch (\Exception $e) {
return [];
}
}
/**
* 格式化服务名称country_code(域名)
*/
private function formatServiceName(?string $countryCode, ?string $serviceEndpoint): string
{
if (!$countryCode || !$serviceEndpoint) {
return 'Unknown';
}
// 从URL中提取域名
$domain = parse_url($serviceEndpoint, PHP_URL_HOST) ?? $serviceEndpoint;
return "{$countryCode}({$domain})";
}
/**
* 获取所有可用的服务列表
*/
public function getAvailableServices(): array
{
try {
$services = DB::connection('monoslave')
->table('service_routes')
->select(['id', 'country_code', 'service_endpoint'])
->get();
return $services->map(function ($service) {
return [
'id' => $service->id,
'name' => $this->formatServiceName($service->country_code, $service->service_endpoint),
];
})->toArray();
} catch (\Exception $e) {
return [];
}
}
/**
* 获取所有国家代码列表
*/
public function getAvailableCountryCodes(): array
{
try {
$codes = DB::connection('monoslave')
->table('service_routes')
->select('country_code')
->distinct()
->orderBy('country_code')
->get()
->pluck('country_code')
->filter()
->values()
->toArray();
return $codes;
} catch (\Exception $e) {
return [];
}
}
/**
* 获取所有域名列表
*/
public function getAvailableDomains(): array
{
try {
$endpoints = DB::connection('monoslave')
->table('service_routes')
->select('service_endpoint')
->distinct()
->get()
->pluck('service_endpoint')
->filter()
->map(function ($endpoint) {
return parse_url($endpoint, PHP_URL_HOST) ?? $endpoint;
})
->unique()
->values()
->toArray();
return $endpoints;
} catch (\Exception $e) {
return [];
}
}
/**
* 获取所有服务路由列表
*/
public function getServiceRoutes(): array
{
try {
$routes = DB::connection('monoslave')
->table('service_routes')
->select('id', 'country_code', 'service_endpoint')
->orderBy('country_code')
->orderBy('id')
->get()
->map(function ($route) {
$domain = parse_url($route->service_endpoint, PHP_URL_HOST) ?? $route->service_endpoint;
return [
'id' => $route->id,
'country_code' => $route->country_code,
'domain' => $domain,
'display_name' => ($route->country_code ?: '(空)') . ' - ' . $domain,
];
})
->toArray();
return $routes;
} catch (\Exception $e) {
return [];
}
}
/**
* 批量更新消息分发状态
*/
public function batchUpdateDispatch(array $updates): array
{
$results = [];
foreach ($updates as $update) {
try {
$result = $this->updateDispatch($update);
$results[] = [
'id' => $update['id'],
'success' => $result['success'],
'message' => $result['message'] ?? null,
];
} catch (\Exception $e) {
$results[] = [
'id' => $update['id'],
'success' => false,
'message' => $e->getMessage(),
];
}
}
return $results;
}
/**
* 更新单个消息分发状态
*/
private function updateDispatch(array $data): array
{
$response = $this->monoClient->updateDispatch($data);
if ($response->successful()) {
return [
'success' => true,
'data' => $response->json(),
];
}
return [
'success' => false,
'message' => 'HTTP ' . $response->status() . ': ' . $response->body(),
];
}
}