Initial commit: queue workspace
Made-with: Cursor
This commit is contained in:
87
pro_v3.5.1/app/jobs/hjf/HjfOrderPayJob.php
Normal file
87
pro_v3.5.1/app/jobs/hjf/HjfOrderPayJob.php
Normal file
@@ -0,0 +1,87 @@
|
||||
<?php
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace app\jobs\hjf;
|
||||
|
||||
use app\services\hjf\MemberLevelServices;
|
||||
use app\services\hjf\PointsRewardServices;
|
||||
use app\services\hjf\QueuePoolServices;
|
||||
use crmeb\basic\BaseJobs;
|
||||
use crmeb\traits\QueueTrait;
|
||||
use think\exception\ValidateException;
|
||||
use think\facade\Log;
|
||||
|
||||
/**
|
||||
* 报单商品支付成功异步处理 Job
|
||||
*
|
||||
* 触发时机:Pay 监听器检测到 is_queue_goods=1 时派发。
|
||||
*
|
||||
* 执行流程:
|
||||
* 1. 调用 QueuePoolServices::enqueue() 将订单写入公排池
|
||||
* (内部含 Redis 分布式锁 + 退款触发检测)
|
||||
* 2. 调用 PointsRewardServices::reward() 沿推荐链发放级差积分
|
||||
* 3. 调用 MemberLevelServices::checkUpgrade() 检查下单用户上级链是否触发等级升级
|
||||
*
|
||||
* Class HjfOrderPayJob
|
||||
* @package app\jobs\hjf
|
||||
*/
|
||||
class HjfOrderPayJob extends BaseJobs
|
||||
{
|
||||
use QueueTrait;
|
||||
|
||||
/**
|
||||
* @param int $uid 下单用户 ID
|
||||
* @param string $orderId 订单号(eb_store_order.order_id)
|
||||
* @param float $amount 报单金额(默认 3600.00)
|
||||
* @return bool
|
||||
*/
|
||||
public function doJob(int $uid, string $orderId, float $amount = 3600.00): bool
|
||||
{
|
||||
try {
|
||||
// 1. 公排入队
|
||||
/** @var QueuePoolServices $queueServices */
|
||||
$queueServices = app()->make(QueuePoolServices::class);
|
||||
$queueServices->enqueue($uid, $orderId, $amount);
|
||||
Log::info("[HjfOrderPay] 公排入队成功 uid={$uid} orderId={$orderId}");
|
||||
} catch (ValidateException $e) {
|
||||
// 锁竞争导致入队失败,重新投递到队列(延迟5秒)
|
||||
Log::warning("[HjfOrderPay] 入队被锁,延迟重试 uid={$uid} orderId={$orderId}: " . $e->getMessage());
|
||||
static::dispatchSece(5, [$uid, $orderId, $amount]);
|
||||
return true;
|
||||
} catch (\Throwable $e) {
|
||||
Log::error("[HjfOrderPay] 公排入队异常 uid={$uid} orderId={$orderId}: " . $e->getMessage());
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
// 2. 积分奖励(级差发放)
|
||||
/** @var PointsRewardServices $pointsServices */
|
||||
$pointsServices = app()->make(PointsRewardServices::class);
|
||||
$pointsServices->reward($uid, $orderId);
|
||||
Log::info("[HjfOrderPay] 积分奖励发放完成 uid={$uid} orderId={$orderId}");
|
||||
} catch (\Throwable $e) {
|
||||
// 积分发放失败不阻塞主流程,记录错误即可
|
||||
Log::error("[HjfOrderPay] 积分奖励失败 uid={$uid} orderId={$orderId}: " . $e->getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
// 3. 触发推荐链等级升级检查(对买家本人及其直推上级)
|
||||
/** @var MemberLevelServices $levelServices */
|
||||
$levelServices = app()->make(MemberLevelServices::class);
|
||||
$levelServices->checkUpgrade($uid);
|
||||
|
||||
// 同时检查直推上级(支付行为可能满足上级的伞下业绩门槛)
|
||||
$spreadUid = (int)\think\facade\Db::name('user')
|
||||
->where('uid', $uid)
|
||||
->value('spread_uid');
|
||||
if ($spreadUid > 0) {
|
||||
$levelServices->checkUpgrade($spreadUid);
|
||||
}
|
||||
Log::info("[HjfOrderPay] 等级升级检查完成 uid={$uid}");
|
||||
} catch (\Throwable $e) {
|
||||
Log::error("[HjfOrderPay] 等级升级检查失败 uid={$uid}: " . $e->getMessage());
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
45
pro_v3.5.1/app/jobs/hjf/MemberLevelCheckJob.php
Normal file
45
pro_v3.5.1/app/jobs/hjf/MemberLevelCheckJob.php
Normal file
@@ -0,0 +1,45 @@
|
||||
<?php
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace app\jobs\hjf;
|
||||
|
||||
use app\services\hjf\MemberLevelServices;
|
||||
use crmeb\basic\BaseJobs;
|
||||
use crmeb\traits\QueueTrait;
|
||||
use think\facade\Log;
|
||||
|
||||
/**
|
||||
* 会员等级异步检查 Job
|
||||
*
|
||||
* 每次订单支付回调完成后,对推荐链上的上级异步派发此 Job 检查是否达到升级条件。
|
||||
* 调用方式:MemberLevelCheckJob::dispatch($uid)
|
||||
*
|
||||
* Class MemberLevelCheckJob
|
||||
* @package app\jobs\hjf
|
||||
*/
|
||||
class MemberLevelCheckJob extends BaseJobs
|
||||
{
|
||||
use QueueTrait;
|
||||
|
||||
/**
|
||||
* @param int $uid 需要检查升级的用户 ID
|
||||
* @return bool
|
||||
*/
|
||||
public function doJob(int $uid): bool
|
||||
{
|
||||
try {
|
||||
/** @var MemberLevelServices $levelServices */
|
||||
$levelServices = app()->make(MemberLevelServices::class);
|
||||
$levelServices->checkUpgrade($uid);
|
||||
} catch (\Throwable $e) {
|
||||
response_log_write([
|
||||
'message' => "会员等级检查失败 uid={$uid}: " . $e->getMessage(),
|
||||
'file' => $e->getFile(),
|
||||
'line' => $e->getLine(),
|
||||
]);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
47
pro_v3.5.1/app/jobs/hjf/PointsReleaseJob.php
Normal file
47
pro_v3.5.1/app/jobs/hjf/PointsReleaseJob.php
Normal file
@@ -0,0 +1,47 @@
|
||||
<?php
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace app\jobs\hjf;
|
||||
|
||||
use app\services\hjf\PointsReleaseServices;
|
||||
use crmeb\basic\BaseJobs;
|
||||
use crmeb\traits\QueueTrait;
|
||||
use think\facade\Log;
|
||||
|
||||
/**
|
||||
* 积分每日释放 Job
|
||||
*
|
||||
* 由定时任务(crontab 或 Swoole Timer)在每天凌晨 00:01 触发。
|
||||
* 调用方式:PointsReleaseJob::dispatch()
|
||||
*
|
||||
* Class PointsReleaseJob
|
||||
* @package app\jobs\hjf
|
||||
*/
|
||||
class PointsReleaseJob extends BaseJobs
|
||||
{
|
||||
use QueueTrait;
|
||||
|
||||
/**
|
||||
* 执行积分释放
|
||||
* @return bool
|
||||
*/
|
||||
public function doJob(): bool
|
||||
{
|
||||
try {
|
||||
/** @var PointsReleaseServices $releaseServices */
|
||||
$releaseServices = app()->make(PointsReleaseServices::class);
|
||||
$result = $releaseServices->executeRelease();
|
||||
|
||||
Log::info('[PointsReleaseJob] 执行完成', $result);
|
||||
} catch (\Throwable $e) {
|
||||
response_log_write([
|
||||
'message' => '积分每日释放任务失败: ' . $e->getMessage(),
|
||||
'file' => $e->getFile(),
|
||||
'line' => $e->getLine(),
|
||||
]);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
95
pro_v3.5.1/app/jobs/hjf/QueueRefundJob.php
Normal file
95
pro_v3.5.1/app/jobs/hjf/QueueRefundJob.php
Normal file
@@ -0,0 +1,95 @@
|
||||
<?php
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace app\jobs\hjf;
|
||||
|
||||
use app\dao\hjf\QueuePoolDao;
|
||||
use app\dao\user\UserBillDao;
|
||||
use app\dao\user\UserDao;
|
||||
use crmeb\basic\BaseJobs;
|
||||
use crmeb\traits\QueueTrait;
|
||||
use think\facade\Db;
|
||||
use think\facade\Log;
|
||||
|
||||
/**
|
||||
* 公排退款异步 Job
|
||||
*
|
||||
* 由 QueuePoolServices::checkAndTriggerRefund() 派发。
|
||||
* 执行流程:
|
||||
* 1. 二次检查记录状态(防止重复退款)
|
||||
* 2. 在数据库事务中:标记记录已退款 + 写入用户余额 + 写 user_bill 流水
|
||||
*
|
||||
* Class QueueRefundJob
|
||||
* @package app\jobs\hjf
|
||||
*/
|
||||
class QueueRefundJob extends BaseJobs
|
||||
{
|
||||
use QueueTrait;
|
||||
|
||||
/**
|
||||
* 执行退款
|
||||
*
|
||||
* @param int $queueId eb_queue_pool.id
|
||||
* @param int $uid 用户 ID
|
||||
* @param float $amount 退款金额
|
||||
* @param int $batchNo 批次号
|
||||
* @return bool
|
||||
*/
|
||||
public function doJob(int $queueId, int $uid, float $amount, int $batchNo): bool
|
||||
{
|
||||
try {
|
||||
/** @var QueuePoolDao $queueDao */
|
||||
$queueDao = app()->make(QueuePoolDao::class);
|
||||
|
||||
// 二次检查:防止重复退款
|
||||
$record = $queueDao->get($queueId);
|
||||
if (!$record || (int)$record['status'] === 1) {
|
||||
Log::info("[QueueRefund] 记录 {$queueId} 已退款或不存在,跳过");
|
||||
return true;
|
||||
}
|
||||
|
||||
Db::transaction(function () use ($queueId, $uid, $amount, $batchNo, $queueDao) {
|
||||
// 1. 标记公排记录为已退款
|
||||
$queueDao->markRefunded($queueId, $batchNo);
|
||||
|
||||
// 2. 写入用户余额(使用 bcadd 避免浮点误差)
|
||||
/** @var UserDao $userDao */
|
||||
$userDao = app()->make(UserDao::class);
|
||||
$user = $userDao->get($uid);
|
||||
if (!$user) {
|
||||
throw new \RuntimeException("用户 {$uid} 不存在");
|
||||
}
|
||||
$newMoney = bcadd((string)$user['now_money'], (string)$amount, 2);
|
||||
$userDao->update($uid, ['now_money' => $newMoney], 'uid');
|
||||
|
||||
// 3. 写 user_bill 流水记录
|
||||
/** @var UserBillDao $billDao */
|
||||
$billDao = app()->make(UserBillDao::class);
|
||||
$billDao->save([
|
||||
'uid' => $uid,
|
||||
'link_id' => $queueId,
|
||||
'pm' => 1,
|
||||
'title' => '公排退款',
|
||||
'type' => 'queue_refund',
|
||||
'category' => 'now_money',
|
||||
'number' => $amount,
|
||||
'balance' => $newMoney,
|
||||
'mark' => "公排触发退款,批次#{$batchNo}",
|
||||
'status' => 1,
|
||||
'add_time' => time(),
|
||||
]);
|
||||
});
|
||||
|
||||
Log::info("[QueueRefund] 退款成功 uid={$uid} amount={$amount} batch={$batchNo}");
|
||||
} catch (\Throwable $e) {
|
||||
response_log_write([
|
||||
'message' => '公排退款失败: ' . $e->getMessage(),
|
||||
'file' => $e->getFile(),
|
||||
'line' => $e->getLine(),
|
||||
]);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user