fix(integral): 防止个人奖金重复生成积分
将个人奖金转积分流程改为先写唯一流水再加积分,并用 wa_selfbonus_logid 唯一索引兜底多入口并发场景;同时补充历史重复数据修复与索引落地 SQL 脚本。 Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -1,265 +1,22 @@
|
|||||||
package com.zbkj.service.service.impl;
|
package com.zbkj.service.service.impl;
|
||||||
|
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
||||||
import com.zbkj.common.model.consignment.WaSelfbonusLog;
|
|
||||||
import com.zbkj.common.model.user.User;
|
|
||||||
import com.zbkj.common.model.user.UserIntegralRecord;
|
|
||||||
import com.zbkj.common.utils.CrmebUtil;
|
|
||||||
import com.zbkj.service.dao.UserDao;
|
|
||||||
import com.zbkj.service.dao.UserIntegralRecordDao;
|
|
||||||
import com.zbkj.service.dao.consignment.WaSelfbonusLogDao;
|
|
||||||
import com.zbkj.service.service.UserService;
|
|
||||||
import com.zbkj.service.service.WaSelfbonusSyncService;
|
import com.zbkj.service.service.WaSelfbonusSyncService;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
|
||||||
|
|
||||||
import java.math.BigDecimal;
|
|
||||||
import java.math.RoundingMode;
|
|
||||||
import java.util.Date;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 用户积分并发安全服务实现类
|
* 保留并发安全服务 bean 名称兼容历史调用。
|
||||||
* 专门处理高并发场景下的积分更新,避免数据库锁等待超时
|
* 实际逻辑统一委托到 waSelfbonusSyncService,避免双实现规则漂移。
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
|
||||||
@Service("userIntegralConcurrencyService")
|
@Service("userIntegralConcurrencyService")
|
||||||
public class UserIntegralConcurrencyServiceImpl {
|
public class UserIntegralConcurrencyServiceImpl {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private WaSelfbonusLogDao waSelfbonusLogDao;
|
private WaSelfbonusSyncService waSelfbonusSyncService;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private UserIntegralRecordDao userIntegralRecordDao;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private UserDao userDao;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private UserService userService;
|
|
||||||
|
|
||||||
// 使用ConcurrentHashMap来缓存正在处理的用户ID,防止重复处理
|
|
||||||
private final ConcurrentHashMap<Integer, Object> processingUsers = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 同步个人奖金变动到用户积分 - 并发安全版本
|
|
||||||
* 根据个人奖金变动记录,为对应的用户增加积分(奖金金额的50%)
|
|
||||||
*/
|
|
||||||
@Transactional(rollbackFor = Exception.class)
|
|
||||||
public Map<String, Object> syncSelfbonusToIntegral() {
|
public Map<String, Object> syncSelfbonusToIntegral() {
|
||||||
log.info("开始同步个人奖金变动到用户积分(并发安全版)");
|
return waSelfbonusSyncService.syncSelfbonusToIntegral();
|
||||||
|
|
||||||
int successCount = 0;
|
|
||||||
int skipCount = 0;
|
|
||||||
int failCount = 0;
|
|
||||||
|
|
||||||
try {
|
|
||||||
// 查询最新的个人奖金变动记录
|
|
||||||
LambdaQueryWrapper<WaSelfbonusLog> bonusLogWrapper = new LambdaQueryWrapper<>();
|
|
||||||
bonusLogWrapper.orderByDesc(WaSelfbonusLog::getCreatedAt); // 按创建时间倒序
|
|
||||||
bonusLogWrapper.last("LIMIT 500"); // 限制查询500条,避免一次性处理过多
|
|
||||||
List<WaSelfbonusLog> bonusLogList = waSelfbonusLogDao.selectList(bonusLogWrapper);
|
|
||||||
|
|
||||||
for (WaSelfbonusLog bonusLog : bonusLogList) {
|
|
||||||
try {
|
|
||||||
// 检查该奖金记录是否已经处理过
|
|
||||||
LambdaQueryWrapper<UserIntegralRecord> checkWrapper = new LambdaQueryWrapper<>();
|
|
||||||
checkWrapper.eq(UserIntegralRecord::getWaSelfbonusLogid, bonusLog.getId());
|
|
||||||
Integer existCount = userIntegralRecordDao.selectCount(checkWrapper);
|
|
||||||
|
|
||||||
if (existCount != null && existCount > 0) {
|
|
||||||
log.debug("奖金记录已处理,跳过: bonusLogId={}, userId={}", bonusLog.getId(), bonusLog.getUserId());
|
|
||||||
skipCount++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 获取用户ID
|
|
||||||
Integer ebUserId = bonusLog.getUserId();
|
|
||||||
|
|
||||||
// 使用同步块确保同一用户不会被重复处理
|
|
||||||
Object lock = processingUsers.computeIfAbsent(ebUserId, k -> new Object());
|
|
||||||
synchronized (lock) {
|
|
||||||
try {
|
|
||||||
// 再次检查积分记录是否已存在(双重检查)
|
|
||||||
existCount = userIntegralRecordDao.selectCount(checkWrapper);
|
|
||||||
if (existCount != null && existCount > 0) {
|
|
||||||
log.debug("奖金记录已在其他线程处理,跳过: bonusLogId={}", bonusLog.getId());
|
|
||||||
skipCount++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 查询用户信息
|
|
||||||
User user = userDao.selectById(ebUserId);
|
|
||||||
if (user == null) {
|
|
||||||
log.warn("未找到对应的系统用户,跳过: waUserId={}", bonusLog.getUserId());
|
|
||||||
skipCount++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 验证奖金类型和金额
|
|
||||||
if (!isValidBonusLog(bonusLog)) {
|
|
||||||
log.debug("奖金记录不符合处理条件,跳过: bonusLogId={}, type={}, amount={}",
|
|
||||||
bonusLog.getId(), bonusLog.getType(), bonusLog.getMoney());
|
|
||||||
skipCount++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 计算积分值
|
|
||||||
BigDecimal integralValue = calculateIntegralValue(bonusLog.getMoney());
|
|
||||||
if (integralValue.compareTo(BigDecimal.ZERO) <= 0) {
|
|
||||||
log.debug("计算出的积分为0或负数,跳过: bonusLogId={}, integralValue={}",
|
|
||||||
bonusLog.getId(), integralValue);
|
|
||||||
skipCount++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 使用CAS方式更新积分,避免锁竞争
|
|
||||||
Boolean updateResult = updateIntegralWithRetry(user.getUid(), integralValue, "add", 3);
|
|
||||||
|
|
||||||
if (!updateResult) {
|
|
||||||
log.error("更新用户积分失败(重试后): userId={}, integralValue={}", user.getUid(), integralValue);
|
|
||||||
failCount++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 插入积分记录
|
|
||||||
UserIntegralRecord integralRecord = createUserIntegralRecord(user.getUid(), bonusLog, integralValue);
|
|
||||||
int insertResult = userIntegralRecordDao.insert(integralRecord);
|
|
||||||
if (insertResult <= 0) {
|
|
||||||
log.error("插入积分记录失败: userId={}, bonusLogId={}", user.getUid(), bonusLog.getId());
|
|
||||||
failCount++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
successCount++;
|
|
||||||
log.info("成功同步奖金到积分: bonusLogId={}, userId={}, bonusAmount={}, integralValue={}",
|
|
||||||
bonusLog.getId(), user.getUid(), bonusLog.getMoney(), integralValue);
|
|
||||||
|
|
||||||
} finally {
|
|
||||||
// 清理锁对象
|
|
||||||
processingUsers.remove(ebUserId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
|
||||||
failCount++;
|
|
||||||
log.error("处理奖金记录失败: bonusLogId={}, error={}", bonusLog.getId(), e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Map<String, Object> result = new HashMap<>();
|
|
||||||
result.put("total", bonusLogList.size());
|
|
||||||
result.put("successCount", successCount);
|
|
||||||
result.put("skipCount", skipCount);
|
|
||||||
result.put("failCount", failCount);
|
|
||||||
|
|
||||||
log.info("同步个人奖金变动到用户积分完成(并发安全版): 总数={}, 成功={}, 跳过={}, 失败={}",
|
|
||||||
bonusLogList.size(), successCount, skipCount, failCount);
|
|
||||||
|
|
||||||
result.put("message", "同步完成");
|
|
||||||
return result;
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("同步个人奖金变动到用户积分异常", e);
|
|
||||||
Map<String, Object> result = new HashMap<>();
|
|
||||||
result.put("message", "同步失败: " + e.getMessage());
|
|
||||||
result.put("error", e.getMessage());
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 验证奖金记录是否符合处理条件
|
|
||||||
*/
|
|
||||||
private boolean isValidBonusLog(WaSelfbonusLog bonusLog) {
|
|
||||||
// 只处理收入类型(type=1)
|
|
||||||
if (bonusLog.getType() == null || bonusLog.getType() != 1) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 验证奖金金额有效
|
|
||||||
if (bonusLog.getMoney() == null || bonusLog.getMoney().compareTo(BigDecimal.ZERO) <= 0) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 计算积分值
|
|
||||||
*/
|
|
||||||
private BigDecimal calculateIntegralValue(BigDecimal bonusAmount) {
|
|
||||||
// 计算积分:奖金金额 * 50%,向下取整
|
|
||||||
BigDecimal integralDecimal = bonusAmount.multiply(new BigDecimal("0.5"));
|
|
||||||
return integralDecimal.setScale(3, RoundingMode.DOWN);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 创建积分记录对象
|
|
||||||
*/
|
|
||||||
private UserIntegralRecord createUserIntegralRecord(Integer userId, WaSelfbonusLog bonusLog, BigDecimal integralValue) {
|
|
||||||
User user = userDao.selectById(userId); // 重新查询用户获取最新积分
|
|
||||||
Integer newIntegral = user != null && user.getIntegral() != null ? user.getIntegral().intValue() : 0;
|
|
||||||
|
|
||||||
UserIntegralRecord integralRecord = new UserIntegralRecord();
|
|
||||||
integralRecord.setUid(userId);
|
|
||||||
integralRecord.setLinkId(String.valueOf(bonusLog.getId())); // 关联奖金记录ID
|
|
||||||
integralRecord.setLinkType("selfbonus"); // 关联类型:个人奖金
|
|
||||||
integralRecord.setType(1); // 类型:1-增加
|
|
||||||
integralRecord.setTitle("个人奖金奖励");
|
|
||||||
integralRecord.setIntegral(integralValue);
|
|
||||||
integralRecord.setBalance(newIntegral); // 实际上应该是更新后的积分,这里可能需要调整
|
|
||||||
integralRecord.setMark(String.format("个人奖金变动奖励,奖金金额:%.3f,积分:%d",
|
|
||||||
bonusLog.getMoney(), integralValue.intValue()));
|
|
||||||
integralRecord.setStatus(3); // 状态:3-完成
|
|
||||||
integralRecord.setWaSelfbonusLogid(bonusLog.getId()); // 关联个人奖金记录ID
|
|
||||||
integralRecord.setCreateTime(new Date());
|
|
||||||
integralRecord.setUpdateTime(new Date());
|
|
||||||
|
|
||||||
return integralRecord;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 带重试机制的积分更新
|
|
||||||
*/
|
|
||||||
private Boolean updateIntegralWithRetry(Integer uid, BigDecimal integral, String type, int maxRetries) {
|
|
||||||
int attempts = 0;
|
|
||||||
Exception lastException = null;
|
|
||||||
|
|
||||||
while (attempts < maxRetries) {
|
|
||||||
try {
|
|
||||||
attempts++;
|
|
||||||
|
|
||||||
// 直接更新积分,不再依赖乐观锁
|
|
||||||
Boolean result = userService.operationIntegral(uid, integral, BigDecimal.ZERO, type);
|
|
||||||
|
|
||||||
if (result) {
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
log.warn("积分更新失败,准备重试 (attempt {}/{})", attempts, maxRetries);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
lastException = e;
|
|
||||||
log.warn("积分更新异常,准备重试 (attempt {}/{}), error: {}", attempts, maxRetries, e.getMessage());
|
|
||||||
|
|
||||||
// 如果是数据库锁等待超时,等待一段时间再重试
|
|
||||||
if (e.getMessage() != null && e.getMessage().contains("Lock wait timeout")) {
|
|
||||||
try {
|
|
||||||
Thread.sleep(100 * attempts); // 指数退避
|
|
||||||
} catch (InterruptedException ie) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log.error("积分更新达到最大重试次数仍然失败,最后一次异常: ", lastException);
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -11,8 +11,8 @@ import com.zbkj.service.service.UserService;
|
|||||||
import com.zbkj.service.service.WaSelfbonusSyncService;
|
import com.zbkj.service.service.WaSelfbonusSyncService;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.dao.DuplicateKeyException;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
|
||||||
import org.springframework.transaction.support.TransactionTemplate;
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
|
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
@@ -44,12 +44,14 @@ public class WaSelfbonusServiceImpl implements WaSelfbonusSyncService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private UserService userService;
|
private UserService userService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TransactionTemplate transactionTemplate;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 同步个人奖金变动到用户积分
|
* 同步个人奖金变动到用户积分
|
||||||
* 根据个人奖金变动记录,为对应的用户增加积分(奖金金额的50%)
|
* 根据个人奖金变动记录,为对应的用户增加积分(奖金金额的50%)
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
@Transactional(rollbackFor = Exception.class)
|
|
||||||
public Map<String, Object> syncSelfbonusToIntegral() {
|
public Map<String, Object> syncSelfbonusToIntegral() {
|
||||||
log.info("开始同步个人奖金变动到用户积分");
|
log.info("开始同步个人奖金变动到用户积分");
|
||||||
|
|
||||||
@@ -66,101 +68,16 @@ public class WaSelfbonusServiceImpl implements WaSelfbonusSyncService {
|
|||||||
|
|
||||||
for (WaSelfbonusLog bonusLog : bonusLogList) {
|
for (WaSelfbonusLog bonusLog : bonusLogList) {
|
||||||
try {
|
try {
|
||||||
// 检查该奖金记录是否已经处理过(通过 waSelfbonusLogid 字段查询积分记录)
|
ProcessStatus processStatus = transactionTemplate.execute(status -> processBonusLogAtomically(bonusLog));
|
||||||
LambdaQueryWrapper<UserIntegralRecord> checkWrapper = new LambdaQueryWrapper<>();
|
if (processStatus == ProcessStatus.SKIP) {
|
||||||
checkWrapper.eq(UserIntegralRecord::getWaSelfbonusLogid, bonusLog.getId());
|
|
||||||
Integer existCount = userIntegralRecordDao.selectCount(checkWrapper);
|
|
||||||
|
|
||||||
if (existCount != null && existCount > 0) {
|
|
||||||
// 已处理过,跳过
|
|
||||||
log.debug("奖金记录已处理,跳过: bonusLogId={}, userId={}", bonusLog.getId(), bonusLog.getUserId());
|
|
||||||
skipCount++;
|
skipCount++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if (processStatus == ProcessStatus.SUCCESS) {
|
||||||
// 根据 wa_users 的 user_id 查找对应的 eb_user 表的 uid
|
successCount++;
|
||||||
// 注意:wa_users.id 对应 eb_user.uid(在同步时已建立关联)
|
|
||||||
Integer ebUserId = bonusLog.getUserId(); // wa_users.id 就是 eb_user.uid
|
|
||||||
|
|
||||||
// 查询 eb_user 表中的用户
|
|
||||||
User user = userDao.selectById(ebUserId);
|
|
||||||
if (user == null) {
|
|
||||||
log.warn("未找到对应的系统用户,跳过: waUserId={}", bonusLog.getUserId());
|
|
||||||
skipCount++;
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
failCount++;
|
||||||
// 计算积分值:个人奖金变更金额的50%(只处理收入类型的奖金变动)
|
|
||||||
if (bonusLog.getType() == null || bonusLog.getType() != 1) {
|
|
||||||
// 只处理收入类型(type=1),支出类型不处理
|
|
||||||
log.debug("跳过非收入类型的奖金变动: bonusLogId={}, type={}", bonusLog.getId(), bonusLog.getType());
|
|
||||||
skipCount++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 奖金金额(收入为正数)
|
|
||||||
BigDecimal bonusAmount = bonusLog.getMoney();
|
|
||||||
if (bonusAmount == null || bonusAmount.compareTo(BigDecimal.ZERO) <= 0) {
|
|
||||||
log.debug("奖金金额无效,跳过: bonusLogId={}, money={}", bonusLog.getId(), bonusAmount);
|
|
||||||
skipCount++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 计算积分:奖金金额 * 50%,向下取整
|
|
||||||
BigDecimal integralDecimal = bonusAmount.multiply(new BigDecimal("0.5"));
|
|
||||||
|
|
||||||
BigDecimal integralValue = integralDecimal.setScale(3, RoundingMode.DOWN);
|
|
||||||
|
|
||||||
if (integralValue.compareTo(BigDecimal.ZERO) <= 0) {
|
|
||||||
log.debug("计算出的积分为0,跳过: bonusLogId={}, integralValue={}", bonusLog.getId(), integralValue);
|
|
||||||
skipCount++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 更新用户积分 - 不再需要当前积分值作为乐观锁条件
|
|
||||||
Boolean updateResult = userService.operationIntegral(
|
|
||||||
user.getUid(),
|
|
||||||
integralValue,
|
|
||||||
BigDecimal.valueOf(0), // 不再使用当前积分作为乐观锁条件
|
|
||||||
"add"
|
|
||||||
);
|
|
||||||
|
|
||||||
if (!updateResult) {
|
|
||||||
log.error("更新用户积分失败: userId={}, integralValue={}", user.getUid(), integralValue);
|
|
||||||
failCount++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 重新查询用户获取最新积分
|
|
||||||
user = userDao.selectById(user.getUid());
|
|
||||||
Integer newIntegral = user.getIntegral() != null ? user.getIntegral().intValue() : 0;
|
|
||||||
|
|
||||||
// 新增积分记录
|
|
||||||
UserIntegralRecord integralRecord = new UserIntegralRecord();
|
|
||||||
integralRecord.setUid(user.getUid());
|
|
||||||
integralRecord.setLinkId(String.valueOf(bonusLog.getId())); // 关联奖金记录ID
|
|
||||||
integralRecord.setLinkType("selfbonus"); // 关联类型:个人奖金
|
|
||||||
integralRecord.setType(1); // 类型:1-增加
|
|
||||||
integralRecord.setTitle("个人奖金奖励");
|
|
||||||
integralRecord.setIntegral(integralValue);
|
|
||||||
integralRecord.setBalance(newIntegral);
|
|
||||||
integralRecord.setMark(String.format("个人奖金变动奖励,奖金金额:%.3f,积分:%d",
|
|
||||||
bonusAmount, integralValue.intValue()));
|
|
||||||
integralRecord.setStatus(3); // 状态:3-完成
|
|
||||||
integralRecord.setWaSelfbonusLogid(bonusLog.getId()); // 关联个人奖金记录ID
|
|
||||||
integralRecord.setCreateTime(new Date());
|
|
||||||
integralRecord.setUpdateTime(new Date());
|
|
||||||
|
|
||||||
int insertResult = userIntegralRecordDao.insert(integralRecord);
|
|
||||||
if (insertResult <= 0) {
|
|
||||||
log.error("插入积分记录失败: userId={}, bonusLogId={}", user.getUid(), bonusLog.getId());
|
|
||||||
failCount++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
successCount++;
|
|
||||||
log.info("成功同步奖金到积分: bonusLogId={}, userId={}, bonusAmount={}, integralValue={}",
|
|
||||||
bonusLog.getId(), user.getUid(), bonusAmount, integralValue);
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
failCount++;
|
failCount++;
|
||||||
@@ -190,45 +107,102 @@ public class WaSelfbonusServiceImpl implements WaSelfbonusSyncService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 带重试机制的用户积分更新
|
* 原子处理单条奖金日志:
|
||||||
|
* 1) 先插入积分流水(受唯一索引保护)
|
||||||
|
* 2) 插入成功后再更新用户总积分
|
||||||
|
* 3) 回填本条流水的 balance
|
||||||
*/
|
*/
|
||||||
private Boolean updateUserIntegralWithRetry(Integer uid, BigDecimal integralValue, int maxRetries) {
|
private ProcessStatus processBonusLogAtomically(WaSelfbonusLog bonusLog) {
|
||||||
int attempts = 0;
|
if (isAlreadyProcessed(bonusLog.getId())) {
|
||||||
Exception lastException = null;
|
log.debug("奖金记录已处理,跳过: bonusLogId={}, userId={}", bonusLog.getId(), bonusLog.getUserId());
|
||||||
|
return ProcessStatus.SKIP;
|
||||||
while (attempts < maxRetries) {
|
|
||||||
try {
|
|
||||||
attempts++;
|
|
||||||
|
|
||||||
Boolean result = userService.operationIntegral(
|
|
||||||
uid,
|
|
||||||
integralValue,
|
|
||||||
BigDecimal.ZERO, // 不再使用当前积分作为乐观锁条件
|
|
||||||
"add"
|
|
||||||
);
|
|
||||||
|
|
||||||
if (result) {
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
log.warn("积分更新失败,准备重试 (attempt {}/{})", attempts, maxRetries);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
lastException = e;
|
|
||||||
log.warn("积分更新异常,准备重试 (attempt {}/{}), error: {}", attempts, maxRetries, e.getMessage());
|
|
||||||
|
|
||||||
// 如果是数据库锁等待超时,等待一段时间再重试
|
|
||||||
if (e.getMessage() != null && e.getMessage().contains("Lock wait timeout")) {
|
|
||||||
try {
|
|
||||||
Thread.sleep(100 * attempts); // 指数退避
|
|
||||||
} catch (InterruptedException ie) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log.error("积分更新达到最大重试次数仍然失败,最后一次异常: ", lastException);
|
Integer ebUserId = bonusLog.getUserId();
|
||||||
return false;
|
User user = userDao.selectById(ebUserId);
|
||||||
|
if (user == null) {
|
||||||
|
log.warn("未找到对应的系统用户,跳过: waUserId={}", bonusLog.getUserId());
|
||||||
|
return ProcessStatus.SKIP;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (bonusLog.getType() == null || bonusLog.getType() != 1) {
|
||||||
|
log.debug("跳过非收入类型的奖金变动: bonusLogId={}, type={}", bonusLog.getId(), bonusLog.getType());
|
||||||
|
return ProcessStatus.SKIP;
|
||||||
|
}
|
||||||
|
|
||||||
|
BigDecimal bonusAmount = bonusLog.getMoney();
|
||||||
|
if (bonusAmount == null || bonusAmount.compareTo(BigDecimal.ZERO) <= 0) {
|
||||||
|
log.debug("奖金金额无效,跳过: bonusLogId={}, money={}", bonusLog.getId(), bonusAmount);
|
||||||
|
return ProcessStatus.SKIP;
|
||||||
|
}
|
||||||
|
|
||||||
|
BigDecimal integralValue = bonusAmount.multiply(new BigDecimal("0.5")).setScale(3, RoundingMode.DOWN);
|
||||||
|
if (integralValue.compareTo(BigDecimal.ZERO) <= 0) {
|
||||||
|
log.debug("计算出的积分为0,跳过: bonusLogId={}, integralValue={}", bonusLog.getId(), integralValue);
|
||||||
|
return ProcessStatus.SKIP;
|
||||||
|
}
|
||||||
|
|
||||||
|
UserIntegralRecord integralRecord = buildIntegralRecord(user.getUid(), bonusLog, bonusAmount, integralValue);
|
||||||
|
try {
|
||||||
|
int insertResult = userIntegralRecordDao.insert(integralRecord);
|
||||||
|
if (insertResult <= 0) {
|
||||||
|
throw new IllegalStateException("插入积分记录失败");
|
||||||
|
}
|
||||||
|
} catch (DuplicateKeyException duplicateKeyException) {
|
||||||
|
// 数据库唯一索引兜底,保证多入口并发只处理一次
|
||||||
|
log.info("奖金记录并发重复处理,已跳过: bonusLogId={}, userId={}", bonusLog.getId(), user.getUid());
|
||||||
|
return ProcessStatus.SKIP;
|
||||||
|
}
|
||||||
|
|
||||||
|
Boolean updateResult = userService.operationIntegral(
|
||||||
|
user.getUid(),
|
||||||
|
integralValue,
|
||||||
|
BigDecimal.ZERO,
|
||||||
|
"add"
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!updateResult) {
|
||||||
|
throw new IllegalStateException(String.format("更新用户积分失败: userId=%s, integralValue=%s", user.getUid(), integralValue));
|
||||||
|
}
|
||||||
|
|
||||||
|
User latestUser = userDao.selectById(user.getUid());
|
||||||
|
Integer latestIntegral = latestUser != null && latestUser.getIntegral() != null ? latestUser.getIntegral().intValue() : 0;
|
||||||
|
integralRecord.setBalance(latestIntegral);
|
||||||
|
integralRecord.setUpdateTime(new Date());
|
||||||
|
userIntegralRecordDao.updateById(integralRecord);
|
||||||
|
|
||||||
|
log.info("成功同步奖金到积分: bonusLogId={}, userId={}, bonusAmount={}, integralValue={}",
|
||||||
|
bonusLog.getId(), user.getUid(), bonusAmount, integralValue);
|
||||||
|
return ProcessStatus.SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isAlreadyProcessed(Integer waSelfbonusLogId) {
|
||||||
|
LambdaQueryWrapper<UserIntegralRecord> checkWrapper = new LambdaQueryWrapper<>();
|
||||||
|
checkWrapper.eq(UserIntegralRecord::getWaSelfbonusLogid, waSelfbonusLogId);
|
||||||
|
Integer existCount = userIntegralRecordDao.selectCount(checkWrapper);
|
||||||
|
return existCount != null && existCount > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private UserIntegralRecord buildIntegralRecord(Integer uid, WaSelfbonusLog bonusLog, BigDecimal bonusAmount, BigDecimal integralValue) {
|
||||||
|
UserIntegralRecord integralRecord = new UserIntegralRecord();
|
||||||
|
integralRecord.setUid(uid);
|
||||||
|
integralRecord.setLinkId(String.valueOf(bonusLog.getId()));
|
||||||
|
integralRecord.setLinkType("selfbonus");
|
||||||
|
integralRecord.setType(1);
|
||||||
|
integralRecord.setTitle("个人奖金奖励");
|
||||||
|
integralRecord.setIntegral(integralValue);
|
||||||
|
integralRecord.setBalance(0);
|
||||||
|
integralRecord.setMark(String.format("个人奖金变动奖励,奖金金额:%.3f,积分:%.3f",
|
||||||
|
bonusAmount, integralValue));
|
||||||
|
integralRecord.setStatus(3);
|
||||||
|
integralRecord.setWaSelfbonusLogid(bonusLog.getId());
|
||||||
|
integralRecord.setCreateTime(new Date());
|
||||||
|
integralRecord.setUpdateTime(new Date());
|
||||||
|
return integralRecord;
|
||||||
|
}
|
||||||
|
|
||||||
|
private enum ProcessStatus {
|
||||||
|
SUCCESS,
|
||||||
|
SKIP
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
23
backend/sql/add_unique_index_uk_integral_selfbonus_log.sql
Normal file
23
backend/sql/add_unique_index_uk_integral_selfbonus_log.sql
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
-- Add strong idempotency guard for selfbonus -> integral conversion.
|
||||||
|
-- This index guarantees one wa_selfbonus_log can map to at most one integral record.
|
||||||
|
-- Prerequisite: clear duplicate wa_selfbonus_logid rows first.
|
||||||
|
|
||||||
|
-- Pre-checks
|
||||||
|
SELECT wa_selfbonus_logid, COUNT(*) AS cnt
|
||||||
|
FROM eb_user_integral_record
|
||||||
|
WHERE wa_selfbonus_logid IS NOT NULL
|
||||||
|
GROUP BY wa_selfbonus_logid
|
||||||
|
HAVING COUNT(*) > 1
|
||||||
|
LIMIT 20;
|
||||||
|
|
||||||
|
SELECT COUNT(*) AS zero_cnt
|
||||||
|
FROM eb_user_integral_record
|
||||||
|
WHERE wa_selfbonus_logid = 0;
|
||||||
|
|
||||||
|
-- Apply unique index
|
||||||
|
ALTER TABLE eb_user_integral_record
|
||||||
|
ADD UNIQUE KEY uk_integral_selfbonus_log (wa_selfbonus_logid);
|
||||||
|
|
||||||
|
-- Verify index exists
|
||||||
|
SHOW INDEX FROM eb_user_integral_record
|
||||||
|
WHERE Key_name = 'uk_integral_selfbonus_log';
|
||||||
130
backend/sql/fix_duplicate_selfbonus_integral_records.sql
Normal file
130
backend/sql/fix_duplicate_selfbonus_integral_records.sql
Normal file
@@ -0,0 +1,130 @@
|
|||||||
|
-- Purpose:
|
||||||
|
-- 1) Find duplicate selfbonus integral rows generated from same wa_selfbonus_log
|
||||||
|
-- 2) Backup affected data
|
||||||
|
-- 3) Keep min(id), delete duplicate rows
|
||||||
|
-- 4) Resync eb_user.integral from remaining ledger rows
|
||||||
|
-- 5) Rebuild integer balance snapshot for affected users
|
||||||
|
--
|
||||||
|
-- Notes:
|
||||||
|
-- - Designed for MySQL 5.7
|
||||||
|
-- - Run during low traffic window
|
||||||
|
-- - Review backup table names before execution
|
||||||
|
|
||||||
|
-- 0) Preview duplicate groups
|
||||||
|
SELECT uid,
|
||||||
|
wa_selfbonus_logid,
|
||||||
|
link_id,
|
||||||
|
COUNT(*) AS cnt,
|
||||||
|
SUM(integral) AS total_integral,
|
||||||
|
MIN(id) AS keep_id,
|
||||||
|
GROUP_CONCAT(id ORDER BY id) AS record_ids
|
||||||
|
FROM eb_user_integral_record
|
||||||
|
WHERE link_type = 'selfbonus'
|
||||||
|
AND type = 1
|
||||||
|
AND wa_selfbonus_logid IS NOT NULL
|
||||||
|
GROUP BY uid, wa_selfbonus_logid, link_id
|
||||||
|
HAVING COUNT(*) > 1;
|
||||||
|
|
||||||
|
-- 1) Backup duplicate rows and affected users
|
||||||
|
DROP TABLE IF EXISTS backup_euir_selfbonus_dups_20260511_0959;
|
||||||
|
CREATE TABLE backup_euir_selfbonus_dups_20260511_0959 AS
|
||||||
|
SELECT e.*
|
||||||
|
FROM eb_user_integral_record e
|
||||||
|
JOIN (
|
||||||
|
SELECT uid, wa_selfbonus_logid, link_id, MIN(id) AS keep_id, COUNT(*) AS cnt
|
||||||
|
FROM eb_user_integral_record
|
||||||
|
WHERE link_type = 'selfbonus'
|
||||||
|
AND type = 1
|
||||||
|
AND wa_selfbonus_logid IS NOT NULL
|
||||||
|
GROUP BY uid, wa_selfbonus_logid, link_id
|
||||||
|
HAVING COUNT(*) > 1
|
||||||
|
) d
|
||||||
|
ON d.uid = e.uid
|
||||||
|
AND d.wa_selfbonus_logid = e.wa_selfbonus_logid
|
||||||
|
AND d.link_id = e.link_id;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS backup_eb_user_integral_before_fix_20260511_0959;
|
||||||
|
CREATE TABLE backup_eb_user_integral_before_fix_20260511_0959 AS
|
||||||
|
SELECT u.*
|
||||||
|
FROM eb_user u
|
||||||
|
WHERE u.uid IN (
|
||||||
|
SELECT DISTINCT uid FROM backup_euir_selfbonus_dups_20260511_0959
|
||||||
|
);
|
||||||
|
|
||||||
|
-- 2) Deduplicate + resync in one transaction
|
||||||
|
START TRANSACTION;
|
||||||
|
|
||||||
|
DROP TEMPORARY TABLE IF EXISTS tmp_dup_groups;
|
||||||
|
CREATE TEMPORARY TABLE tmp_dup_groups AS
|
||||||
|
SELECT uid, wa_selfbonus_logid, link_id, MIN(id) AS keep_id
|
||||||
|
FROM eb_user_integral_record
|
||||||
|
WHERE link_type = 'selfbonus'
|
||||||
|
AND type = 1
|
||||||
|
AND wa_selfbonus_logid IS NOT NULL
|
||||||
|
GROUP BY uid, wa_selfbonus_logid, link_id
|
||||||
|
HAVING COUNT(*) > 1;
|
||||||
|
|
||||||
|
DROP TEMPORARY TABLE IF EXISTS tmp_affected_uids;
|
||||||
|
CREATE TEMPORARY TABLE tmp_affected_uids AS
|
||||||
|
SELECT DISTINCT uid FROM tmp_dup_groups;
|
||||||
|
|
||||||
|
DELETE e
|
||||||
|
FROM eb_user_integral_record e
|
||||||
|
JOIN tmp_dup_groups d
|
||||||
|
ON d.uid = e.uid
|
||||||
|
AND d.wa_selfbonus_logid = e.wa_selfbonus_logid
|
||||||
|
AND d.link_id = e.link_id
|
||||||
|
WHERE e.id <> d.keep_id;
|
||||||
|
|
||||||
|
UPDATE eb_user u
|
||||||
|
JOIN (
|
||||||
|
SELECT r.uid, COALESCE(SUM(r.integral), 0) AS sum_integral
|
||||||
|
FROM eb_user_integral_record r
|
||||||
|
JOIN tmp_affected_uids t ON t.uid = r.uid
|
||||||
|
GROUP BY r.uid
|
||||||
|
) s ON s.uid = u.uid
|
||||||
|
SET u.integral = s.sum_integral;
|
||||||
|
|
||||||
|
SET @run_uid := 0;
|
||||||
|
SET @run_bal := 0;
|
||||||
|
UPDATE eb_user_integral_record e
|
||||||
|
JOIN (
|
||||||
|
SELECT t.id, FLOOR(t.running) AS new_balance
|
||||||
|
FROM (
|
||||||
|
SELECT s.id,
|
||||||
|
s.uid,
|
||||||
|
(@run_bal := IF(@run_uid = s.uid, @run_bal + s.integral, s.integral)) AS running,
|
||||||
|
(@run_uid := s.uid) AS uid_guard
|
||||||
|
FROM (
|
||||||
|
SELECT id, uid, integral
|
||||||
|
FROM eb_user_integral_record
|
||||||
|
WHERE uid IN (SELECT uid FROM tmp_affected_uids)
|
||||||
|
ORDER BY uid, id
|
||||||
|
) s
|
||||||
|
) t
|
||||||
|
) x ON x.id = e.id
|
||||||
|
SET e.balance = x.new_balance;
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- 3) Post-checks
|
||||||
|
SELECT COUNT(*) AS remaining_dup_groups
|
||||||
|
FROM (
|
||||||
|
SELECT 1
|
||||||
|
FROM eb_user_integral_record
|
||||||
|
WHERE link_type = 'selfbonus'
|
||||||
|
AND type = 1
|
||||||
|
AND wa_selfbonus_logid IS NOT NULL
|
||||||
|
GROUP BY uid, wa_selfbonus_logid, link_id
|
||||||
|
HAVING COUNT(*) > 1
|
||||||
|
) a;
|
||||||
|
|
||||||
|
SELECT u.uid, u.integral, s.sum_integral
|
||||||
|
FROM eb_user u
|
||||||
|
JOIN (
|
||||||
|
SELECT uid, SUM(integral) AS sum_integral
|
||||||
|
FROM eb_user_integral_record
|
||||||
|
GROUP BY uid
|
||||||
|
) s ON s.uid = u.uid
|
||||||
|
WHERE u.uid IN (SELECT DISTINCT uid FROM backup_euir_selfbonus_dups_20260511_0959)
|
||||||
|
ORDER BY u.uid;
|
||||||
Reference in New Issue
Block a user