fix(integral): 防止个人奖金重复生成积分

将个人奖金转积分流程改为先写唯一流水再加积分,并用 wa_selfbonus_logid 唯一索引兜底多入口并发场景;同时补充历史重复数据修复与索引落地 SQL 脚本。

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
danaisuiyuan
2026-05-11 10:04:07 +08:00
parent f43950eabf
commit 693c66c258
4 changed files with 262 additions and 378 deletions

View File

@@ -1,265 +1,22 @@
package com.zbkj.service.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.zbkj.common.model.consignment.WaSelfbonusLog;
import com.zbkj.common.model.user.User;
import com.zbkj.common.model.user.UserIntegralRecord;
import com.zbkj.common.utils.CrmebUtil;
import com.zbkj.service.dao.UserDao;
import com.zbkj.service.dao.UserIntegralRecordDao;
import com.zbkj.service.dao.consignment.WaSelfbonusLogDao;
import com.zbkj.service.service.UserService;
import com.zbkj.service.service.WaSelfbonusSyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 用户积分并发安全服务实现类
* 专门处理高并发场景下的积分更新,避免数据库锁等待超时
* 保留并发安全服务 bean 名称兼容历史调用。
* 实际逻辑统一委托到 waSelfbonusSyncService避免双实现规则漂移。
*/
@Slf4j
@Service("userIntegralConcurrencyService")
public class UserIntegralConcurrencyServiceImpl {
@Autowired
private WaSelfbonusLogDao waSelfbonusLogDao;
private WaSelfbonusSyncService waSelfbonusSyncService;
@Autowired
private UserIntegralRecordDao userIntegralRecordDao;
@Autowired
private UserDao userDao;
@Autowired
private UserService userService;
// 使用ConcurrentHashMap来缓存正在处理的用户ID防止重复处理
private final ConcurrentHashMap<Integer, Object> processingUsers = new ConcurrentHashMap<>();
/**
* 同步个人奖金变动到用户积分 - 并发安全版本
* 根据个人奖金变动记录为对应的用户增加积分奖金金额的50%
*/
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> syncSelfbonusToIntegral() {
log.info("开始同步个人奖金变动到用户积分(并发安全版)");
int successCount = 0;
int skipCount = 0;
int failCount = 0;
try {
// 查询最新的个人奖金变动记录
LambdaQueryWrapper<WaSelfbonusLog> bonusLogWrapper = new LambdaQueryWrapper<>();
bonusLogWrapper.orderByDesc(WaSelfbonusLog::getCreatedAt); // 按创建时间倒序
bonusLogWrapper.last("LIMIT 500"); // 限制查询500条避免一次性处理过多
List<WaSelfbonusLog> bonusLogList = waSelfbonusLogDao.selectList(bonusLogWrapper);
for (WaSelfbonusLog bonusLog : bonusLogList) {
try {
// 检查该奖金记录是否已经处理过
LambdaQueryWrapper<UserIntegralRecord> checkWrapper = new LambdaQueryWrapper<>();
checkWrapper.eq(UserIntegralRecord::getWaSelfbonusLogid, bonusLog.getId());
Integer existCount = userIntegralRecordDao.selectCount(checkWrapper);
if (existCount != null && existCount > 0) {
log.debug("奖金记录已处理,跳过: bonusLogId={}, userId={}", bonusLog.getId(), bonusLog.getUserId());
skipCount++;
continue;
}
// 获取用户ID
Integer ebUserId = bonusLog.getUserId();
// 使用同步块确保同一用户不会被重复处理
Object lock = processingUsers.computeIfAbsent(ebUserId, k -> new Object());
synchronized (lock) {
try {
// 再次检查积分记录是否已存在(双重检查)
existCount = userIntegralRecordDao.selectCount(checkWrapper);
if (existCount != null && existCount > 0) {
log.debug("奖金记录已在其他线程处理,跳过: bonusLogId={}", bonusLog.getId());
skipCount++;
continue;
}
// 查询用户信息
User user = userDao.selectById(ebUserId);
if (user == null) {
log.warn("未找到对应的系统用户,跳过: waUserId={}", bonusLog.getUserId());
skipCount++;
continue;
}
// 验证奖金类型和金额
if (!isValidBonusLog(bonusLog)) {
log.debug("奖金记录不符合处理条件,跳过: bonusLogId={}, type={}, amount={}",
bonusLog.getId(), bonusLog.getType(), bonusLog.getMoney());
skipCount++;
continue;
}
// 计算积分值
BigDecimal integralValue = calculateIntegralValue(bonusLog.getMoney());
if (integralValue.compareTo(BigDecimal.ZERO) <= 0) {
log.debug("计算出的积分为0或负数跳过: bonusLogId={}, integralValue={}",
bonusLog.getId(), integralValue);
skipCount++;
continue;
}
// 使用CAS方式更新积分避免锁竞争
Boolean updateResult = updateIntegralWithRetry(user.getUid(), integralValue, "add", 3);
if (!updateResult) {
log.error("更新用户积分失败(重试后): userId={}, integralValue={}", user.getUid(), integralValue);
failCount++;
continue;
}
// 插入积分记录
UserIntegralRecord integralRecord = createUserIntegralRecord(user.getUid(), bonusLog, integralValue);
int insertResult = userIntegralRecordDao.insert(integralRecord);
if (insertResult <= 0) {
log.error("插入积分记录失败: userId={}, bonusLogId={}", user.getUid(), bonusLog.getId());
failCount++;
continue;
}
successCount++;
log.info("成功同步奖金到积分: bonusLogId={}, userId={}, bonusAmount={}, integralValue={}",
bonusLog.getId(), user.getUid(), bonusLog.getMoney(), integralValue);
} finally {
// 清理锁对象
processingUsers.remove(ebUserId);
}
}
} catch (Exception e) {
failCount++;
log.error("处理奖金记录失败: bonusLogId={}, error={}", bonusLog.getId(), e.getMessage(), e);
}
}
Map<String, Object> result = new HashMap<>();
result.put("total", bonusLogList.size());
result.put("successCount", successCount);
result.put("skipCount", skipCount);
result.put("failCount", failCount);
log.info("同步个人奖金变动到用户积分完成(并发安全版): 总数={}, 成功={}, 跳过={}, 失败={}",
bonusLogList.size(), successCount, skipCount, failCount);
result.put("message", "同步完成");
return result;
} catch (Exception e) {
log.error("同步个人奖金变动到用户积分异常", e);
Map<String, Object> result = new HashMap<>();
result.put("message", "同步失败: " + e.getMessage());
result.put("error", e.getMessage());
return result;
}
}
/**
* 验证奖金记录是否符合处理条件
*/
private boolean isValidBonusLog(WaSelfbonusLog bonusLog) {
// 只处理收入类型type=1
if (bonusLog.getType() == null || bonusLog.getType() != 1) {
return false;
}
// 验证奖金金额有效
if (bonusLog.getMoney() == null || bonusLog.getMoney().compareTo(BigDecimal.ZERO) <= 0) {
return false;
}
return true;
}
/**
* 计算积分值
*/
private BigDecimal calculateIntegralValue(BigDecimal bonusAmount) {
// 计算积分:奖金金额 * 50%,向下取整
BigDecimal integralDecimal = bonusAmount.multiply(new BigDecimal("0.5"));
return integralDecimal.setScale(3, RoundingMode.DOWN);
}
/**
* 创建积分记录对象
*/
private UserIntegralRecord createUserIntegralRecord(Integer userId, WaSelfbonusLog bonusLog, BigDecimal integralValue) {
User user = userDao.selectById(userId); // 重新查询用户获取最新积分
Integer newIntegral = user != null && user.getIntegral() != null ? user.getIntegral().intValue() : 0;
UserIntegralRecord integralRecord = new UserIntegralRecord();
integralRecord.setUid(userId);
integralRecord.setLinkId(String.valueOf(bonusLog.getId())); // 关联奖金记录ID
integralRecord.setLinkType("selfbonus"); // 关联类型:个人奖金
integralRecord.setType(1); // 类型1-增加
integralRecord.setTitle("个人奖金奖励");
integralRecord.setIntegral(integralValue);
integralRecord.setBalance(newIntegral); // 实际上应该是更新后的积分,这里可能需要调整
integralRecord.setMark(String.format("个人奖金变动奖励,奖金金额:%.3f,积分:%d",
bonusLog.getMoney(), integralValue.intValue()));
integralRecord.setStatus(3); // 状态3-完成
integralRecord.setWaSelfbonusLogid(bonusLog.getId()); // 关联个人奖金记录ID
integralRecord.setCreateTime(new Date());
integralRecord.setUpdateTime(new Date());
return integralRecord;
}
/**
* 带重试机制的积分更新
*/
private Boolean updateIntegralWithRetry(Integer uid, BigDecimal integral, String type, int maxRetries) {
int attempts = 0;
Exception lastException = null;
while (attempts < maxRetries) {
try {
attempts++;
// 直接更新积分,不再依赖乐观锁
Boolean result = userService.operationIntegral(uid, integral, BigDecimal.ZERO, type);
if (result) {
return true;
} else {
log.warn("积分更新失败,准备重试 (attempt {}/{})", attempts, maxRetries);
}
} catch (Exception e) {
lastException = e;
log.warn("积分更新异常,准备重试 (attempt {}/{}), error: {}", attempts, maxRetries, e.getMessage());
// 如果是数据库锁等待超时,等待一段时间再重试
if (e.getMessage() != null && e.getMessage().contains("Lock wait timeout")) {
try {
Thread.sleep(100 * attempts); // 指数退避
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
log.error("积分更新达到最大重试次数仍然失败,最后一次异常: ", lastException);
return false;
return waSelfbonusSyncService.syncSelfbonusToIntegral();
}
}

View File

@@ -11,8 +11,8 @@ import com.zbkj.service.service.UserService;
import com.zbkj.service.service.WaSelfbonusSyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
import java.math.BigDecimal;
@@ -44,12 +44,14 @@ public class WaSelfbonusServiceImpl implements WaSelfbonusSyncService {
@Autowired
private UserService userService;
@Autowired
private TransactionTemplate transactionTemplate;
/**
* 同步个人奖金变动到用户积分
* 根据个人奖金变动记录为对应的用户增加积分奖金金额的50%
*/
@Override
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> syncSelfbonusToIntegral() {
log.info("开始同步个人奖金变动到用户积分");
@@ -66,101 +68,16 @@ public class WaSelfbonusServiceImpl implements WaSelfbonusSyncService {
for (WaSelfbonusLog bonusLog : bonusLogList) {
try {
// 检查该奖金记录是否已经处理过(通过 waSelfbonusLogid 字段查询积分记录)
LambdaQueryWrapper<UserIntegralRecord> checkWrapper = new LambdaQueryWrapper<>();
checkWrapper.eq(UserIntegralRecord::getWaSelfbonusLogid, bonusLog.getId());
Integer existCount = userIntegralRecordDao.selectCount(checkWrapper);
if (existCount != null && existCount > 0) {
// 已处理过,跳过
log.debug("奖金记录已处理,跳过: bonusLogId={}, userId={}", bonusLog.getId(), bonusLog.getUserId());
ProcessStatus processStatus = transactionTemplate.execute(status -> processBonusLogAtomically(bonusLog));
if (processStatus == ProcessStatus.SKIP) {
skipCount++;
continue;
}
// 根据 wa_users 的 user_id 查找对应的 eb_user 表的 uid
// 注意wa_users.id 对应 eb_user.uid在同步时已建立关联
Integer ebUserId = bonusLog.getUserId(); // wa_users.id 就是 eb_user.uid
// 查询 eb_user 表中的用户
User user = userDao.selectById(ebUserId);
if (user == null) {
log.warn("未找到对应的系统用户,跳过: waUserId={}", bonusLog.getUserId());
skipCount++;
if (processStatus == ProcessStatus.SUCCESS) {
successCount++;
continue;
}
// 计算积分值个人奖金变更金额的50%(只处理收入类型的奖金变动)
if (bonusLog.getType() == null || bonusLog.getType() != 1) {
// 只处理收入类型type=1支出类型不处理
log.debug("跳过非收入类型的奖金变动: bonusLogId={}, type={}", bonusLog.getId(), bonusLog.getType());
skipCount++;
continue;
}
// 奖金金额(收入为正数)
BigDecimal bonusAmount = bonusLog.getMoney();
if (bonusAmount == null || bonusAmount.compareTo(BigDecimal.ZERO) <= 0) {
log.debug("奖金金额无效,跳过: bonusLogId={}, money={}", bonusLog.getId(), bonusAmount);
skipCount++;
continue;
}
// 计算积分:奖金金额 * 50%,向下取整
BigDecimal integralDecimal = bonusAmount.multiply(new BigDecimal("0.5"));
BigDecimal integralValue = integralDecimal.setScale(3, RoundingMode.DOWN);
if (integralValue.compareTo(BigDecimal.ZERO) <= 0) {
log.debug("计算出的积分为0跳过: bonusLogId={}, integralValue={}", bonusLog.getId(), integralValue);
skipCount++;
continue;
}
// 更新用户积分 - 不再需要当前积分值作为乐观锁条件
Boolean updateResult = userService.operationIntegral(
user.getUid(),
integralValue,
BigDecimal.valueOf(0), // 不再使用当前积分作为乐观锁条件
"add"
);
if (!updateResult) {
log.error("更新用户积分失败: userId={}, integralValue={}", user.getUid(), integralValue);
failCount++;
continue;
}
// 重新查询用户获取最新积分
user = userDao.selectById(user.getUid());
Integer newIntegral = user.getIntegral() != null ? user.getIntegral().intValue() : 0;
// 新增积分记录
UserIntegralRecord integralRecord = new UserIntegralRecord();
integralRecord.setUid(user.getUid());
integralRecord.setLinkId(String.valueOf(bonusLog.getId())); // 关联奖金记录ID
integralRecord.setLinkType("selfbonus"); // 关联类型:个人奖金
integralRecord.setType(1); // 类型1-增加
integralRecord.setTitle("个人奖金奖励");
integralRecord.setIntegral(integralValue);
integralRecord.setBalance(newIntegral);
integralRecord.setMark(String.format("个人奖金变动奖励,奖金金额:%.3f,积分:%d",
bonusAmount, integralValue.intValue()));
integralRecord.setStatus(3); // 状态3-完成
integralRecord.setWaSelfbonusLogid(bonusLog.getId()); // 关联个人奖金记录ID
integralRecord.setCreateTime(new Date());
integralRecord.setUpdateTime(new Date());
int insertResult = userIntegralRecordDao.insert(integralRecord);
if (insertResult <= 0) {
log.error("插入积分记录失败: userId={}, bonusLogId={}", user.getUid(), bonusLog.getId());
failCount++;
continue;
}
successCount++;
log.info("成功同步奖金到积分: bonusLogId={}, userId={}, bonusAmount={}, integralValue={}",
bonusLog.getId(), user.getUid(), bonusAmount, integralValue);
failCount++;
} catch (Exception e) {
failCount++;
@@ -188,47 +105,104 @@ public class WaSelfbonusServiceImpl implements WaSelfbonusSyncService {
return result;
}
}
/**
* 带重试机制的用户积分更新
* 原子处理单条奖金日志:
* 1) 先插入积分流水(受唯一索引保护)
* 2) 插入成功后再更新用户总积分
* 3) 回填本条流水的 balance
*/
private Boolean updateUserIntegralWithRetry(Integer uid, BigDecimal integralValue, int maxRetries) {
int attempts = 0;
Exception lastException = null;
while (attempts < maxRetries) {
try {
attempts++;
Boolean result = userService.operationIntegral(
uid,
integralValue,
BigDecimal.ZERO, // 不再使用当前积分作为乐观锁条件
"add"
);
if (result) {
return true;
} else {
log.warn("积分更新失败,准备重试 (attempt {}/{})", attempts, maxRetries);
}
} catch (Exception e) {
lastException = e;
log.warn("积分更新异常,准备重试 (attempt {}/{}), error: {}", attempts, maxRetries, e.getMessage());
// 如果是数据库锁等待超时,等待一段时间再重试
if (e.getMessage() != null && e.getMessage().contains("Lock wait timeout")) {
try {
Thread.sleep(100 * attempts); // 指数退避
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
private ProcessStatus processBonusLogAtomically(WaSelfbonusLog bonusLog) {
if (isAlreadyProcessed(bonusLog.getId())) {
log.debug("奖金记录已处理,跳过: bonusLogId={}, userId={}", bonusLog.getId(), bonusLog.getUserId());
return ProcessStatus.SKIP;
}
log.error("积分更新达到最大重试次数仍然失败,最后一次异常: ", lastException);
return false;
Integer ebUserId = bonusLog.getUserId();
User user = userDao.selectById(ebUserId);
if (user == null) {
log.warn("未找到对应的系统用户,跳过: waUserId={}", bonusLog.getUserId());
return ProcessStatus.SKIP;
}
if (bonusLog.getType() == null || bonusLog.getType() != 1) {
log.debug("跳过非收入类型的奖金变动: bonusLogId={}, type={}", bonusLog.getId(), bonusLog.getType());
return ProcessStatus.SKIP;
}
BigDecimal bonusAmount = bonusLog.getMoney();
if (bonusAmount == null || bonusAmount.compareTo(BigDecimal.ZERO) <= 0) {
log.debug("奖金金额无效,跳过: bonusLogId={}, money={}", bonusLog.getId(), bonusAmount);
return ProcessStatus.SKIP;
}
BigDecimal integralValue = bonusAmount.multiply(new BigDecimal("0.5")).setScale(3, RoundingMode.DOWN);
if (integralValue.compareTo(BigDecimal.ZERO) <= 0) {
log.debug("计算出的积分为0跳过: bonusLogId={}, integralValue={}", bonusLog.getId(), integralValue);
return ProcessStatus.SKIP;
}
UserIntegralRecord integralRecord = buildIntegralRecord(user.getUid(), bonusLog, bonusAmount, integralValue);
try {
int insertResult = userIntegralRecordDao.insert(integralRecord);
if (insertResult <= 0) {
throw new IllegalStateException("插入积分记录失败");
}
} catch (DuplicateKeyException duplicateKeyException) {
// 数据库唯一索引兜底,保证多入口并发只处理一次
log.info("奖金记录并发重复处理,已跳过: bonusLogId={}, userId={}", bonusLog.getId(), user.getUid());
return ProcessStatus.SKIP;
}
Boolean updateResult = userService.operationIntegral(
user.getUid(),
integralValue,
BigDecimal.ZERO,
"add"
);
if (!updateResult) {
throw new IllegalStateException(String.format("更新用户积分失败: userId=%s, integralValue=%s", user.getUid(), integralValue));
}
User latestUser = userDao.selectById(user.getUid());
Integer latestIntegral = latestUser != null && latestUser.getIntegral() != null ? latestUser.getIntegral().intValue() : 0;
integralRecord.setBalance(latestIntegral);
integralRecord.setUpdateTime(new Date());
userIntegralRecordDao.updateById(integralRecord);
log.info("成功同步奖金到积分: bonusLogId={}, userId={}, bonusAmount={}, integralValue={}",
bonusLog.getId(), user.getUid(), bonusAmount, integralValue);
return ProcessStatus.SUCCESS;
}
private boolean isAlreadyProcessed(Integer waSelfbonusLogId) {
LambdaQueryWrapper<UserIntegralRecord> checkWrapper = new LambdaQueryWrapper<>();
checkWrapper.eq(UserIntegralRecord::getWaSelfbonusLogid, waSelfbonusLogId);
Integer existCount = userIntegralRecordDao.selectCount(checkWrapper);
return existCount != null && existCount > 0;
}
private UserIntegralRecord buildIntegralRecord(Integer uid, WaSelfbonusLog bonusLog, BigDecimal bonusAmount, BigDecimal integralValue) {
UserIntegralRecord integralRecord = new UserIntegralRecord();
integralRecord.setUid(uid);
integralRecord.setLinkId(String.valueOf(bonusLog.getId()));
integralRecord.setLinkType("selfbonus");
integralRecord.setType(1);
integralRecord.setTitle("个人奖金奖励");
integralRecord.setIntegral(integralValue);
integralRecord.setBalance(0);
integralRecord.setMark(String.format("个人奖金变动奖励,奖金金额:%.3f,积分:%.3f",
bonusAmount, integralValue));
integralRecord.setStatus(3);
integralRecord.setWaSelfbonusLogid(bonusLog.getId());
integralRecord.setCreateTime(new Date());
integralRecord.setUpdateTime(new Date());
return integralRecord;
}
private enum ProcessStatus {
SUCCESS,
SKIP
}
}

View File

@@ -0,0 +1,23 @@
-- Add strong idempotency guard for selfbonus -> integral conversion.
-- This index guarantees one wa_selfbonus_log can map to at most one integral record.
-- Prerequisite: clear duplicate wa_selfbonus_logid rows first.
-- Pre-checks
SELECT wa_selfbonus_logid, COUNT(*) AS cnt
FROM eb_user_integral_record
WHERE wa_selfbonus_logid IS NOT NULL
GROUP BY wa_selfbonus_logid
HAVING COUNT(*) > 1
LIMIT 20;
SELECT COUNT(*) AS zero_cnt
FROM eb_user_integral_record
WHERE wa_selfbonus_logid = 0;
-- Apply unique index
ALTER TABLE eb_user_integral_record
ADD UNIQUE KEY uk_integral_selfbonus_log (wa_selfbonus_logid);
-- Verify index exists
SHOW INDEX FROM eb_user_integral_record
WHERE Key_name = 'uk_integral_selfbonus_log';

View File

@@ -0,0 +1,130 @@
-- Purpose:
-- 1) Find duplicate selfbonus integral rows generated from same wa_selfbonus_log
-- 2) Backup affected data
-- 3) Keep min(id), delete duplicate rows
-- 4) Resync eb_user.integral from remaining ledger rows
-- 5) Rebuild integer balance snapshot for affected users
--
-- Notes:
-- - Designed for MySQL 5.7
-- - Run during low traffic window
-- - Review backup table names before execution
-- 0) Preview duplicate groups
SELECT uid,
wa_selfbonus_logid,
link_id,
COUNT(*) AS cnt,
SUM(integral) AS total_integral,
MIN(id) AS keep_id,
GROUP_CONCAT(id ORDER BY id) AS record_ids
FROM eb_user_integral_record
WHERE link_type = 'selfbonus'
AND type = 1
AND wa_selfbonus_logid IS NOT NULL
GROUP BY uid, wa_selfbonus_logid, link_id
HAVING COUNT(*) > 1;
-- 1) Backup duplicate rows and affected users
DROP TABLE IF EXISTS backup_euir_selfbonus_dups_20260511_0959;
CREATE TABLE backup_euir_selfbonus_dups_20260511_0959 AS
SELECT e.*
FROM eb_user_integral_record e
JOIN (
SELECT uid, wa_selfbonus_logid, link_id, MIN(id) AS keep_id, COUNT(*) AS cnt
FROM eb_user_integral_record
WHERE link_type = 'selfbonus'
AND type = 1
AND wa_selfbonus_logid IS NOT NULL
GROUP BY uid, wa_selfbonus_logid, link_id
HAVING COUNT(*) > 1
) d
ON d.uid = e.uid
AND d.wa_selfbonus_logid = e.wa_selfbonus_logid
AND d.link_id = e.link_id;
DROP TABLE IF EXISTS backup_eb_user_integral_before_fix_20260511_0959;
CREATE TABLE backup_eb_user_integral_before_fix_20260511_0959 AS
SELECT u.*
FROM eb_user u
WHERE u.uid IN (
SELECT DISTINCT uid FROM backup_euir_selfbonus_dups_20260511_0959
);
-- 2) Deduplicate + resync in one transaction
START TRANSACTION;
DROP TEMPORARY TABLE IF EXISTS tmp_dup_groups;
CREATE TEMPORARY TABLE tmp_dup_groups AS
SELECT uid, wa_selfbonus_logid, link_id, MIN(id) AS keep_id
FROM eb_user_integral_record
WHERE link_type = 'selfbonus'
AND type = 1
AND wa_selfbonus_logid IS NOT NULL
GROUP BY uid, wa_selfbonus_logid, link_id
HAVING COUNT(*) > 1;
DROP TEMPORARY TABLE IF EXISTS tmp_affected_uids;
CREATE TEMPORARY TABLE tmp_affected_uids AS
SELECT DISTINCT uid FROM tmp_dup_groups;
DELETE e
FROM eb_user_integral_record e
JOIN tmp_dup_groups d
ON d.uid = e.uid
AND d.wa_selfbonus_logid = e.wa_selfbonus_logid
AND d.link_id = e.link_id
WHERE e.id <> d.keep_id;
UPDATE eb_user u
JOIN (
SELECT r.uid, COALESCE(SUM(r.integral), 0) AS sum_integral
FROM eb_user_integral_record r
JOIN tmp_affected_uids t ON t.uid = r.uid
GROUP BY r.uid
) s ON s.uid = u.uid
SET u.integral = s.sum_integral;
SET @run_uid := 0;
SET @run_bal := 0;
UPDATE eb_user_integral_record e
JOIN (
SELECT t.id, FLOOR(t.running) AS new_balance
FROM (
SELECT s.id,
s.uid,
(@run_bal := IF(@run_uid = s.uid, @run_bal + s.integral, s.integral)) AS running,
(@run_uid := s.uid) AS uid_guard
FROM (
SELECT id, uid, integral
FROM eb_user_integral_record
WHERE uid IN (SELECT uid FROM tmp_affected_uids)
ORDER BY uid, id
) s
) t
) x ON x.id = e.id
SET e.balance = x.new_balance;
COMMIT;
-- 3) Post-checks
SELECT COUNT(*) AS remaining_dup_groups
FROM (
SELECT 1
FROM eb_user_integral_record
WHERE link_type = 'selfbonus'
AND type = 1
AND wa_selfbonus_logid IS NOT NULL
GROUP BY uid, wa_selfbonus_logid, link_id
HAVING COUNT(*) > 1
) a;
SELECT u.uid, u.integral, s.sum_integral
FROM eb_user u
JOIN (
SELECT uid, SUM(integral) AS sum_integral
FROM eb_user_integral_record
GROUP BY uid
) s ON s.uid = u.uid
WHERE u.uid IN (SELECT DISTINCT uid FROM backup_euir_selfbonus_dups_20260511_0959)
ORDER BY u.uid;