From 693c66c25892bcd1d40fd4b8a6cd9a772f13b9d2 Mon Sep 17 00:00:00 2001 From: danaisuiyuan Date: Mon, 11 May 2026 10:04:07 +0800 Subject: [PATCH] =?UTF-8?q?fix(integral):=20=E9=98=B2=E6=AD=A2=E4=B8=AA?= =?UTF-8?q?=E4=BA=BA=E5=A5=96=E9=87=91=E9=87=8D=E5=A4=8D=E7=94=9F=E6=88=90?= =?UTF-8?q?=E7=A7=AF=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将个人奖金转积分流程改为先写唯一流水再加积分,并用 wa_selfbonus_logid 唯一索引兜底多入口并发场景;同时补充历史重复数据修复与索引落地 SQL 脚本。 Co-authored-by: Cursor --- .../UserIntegralConcurrencyServiceImpl.java | 251 +----------------- .../service/impl/WaSelfbonusServiceImpl.java | 236 ++++++++-------- ...unique_index_uk_integral_selfbonus_log.sql | 23 ++ ...x_duplicate_selfbonus_integral_records.sql | 130 +++++++++ 4 files changed, 262 insertions(+), 378 deletions(-) create mode 100644 backend/sql/add_unique_index_uk_integral_selfbonus_log.sql create mode 100644 backend/sql/fix_duplicate_selfbonus_integral_records.sql diff --git a/backend/crmeb-service/src/main/java/com/zbkj/service/service/impl/UserIntegralConcurrencyServiceImpl.java b/backend/crmeb-service/src/main/java/com/zbkj/service/service/impl/UserIntegralConcurrencyServiceImpl.java index a0054b3..eca6b9e 100644 --- a/backend/crmeb-service/src/main/java/com/zbkj/service/service/impl/UserIntegralConcurrencyServiceImpl.java +++ b/backend/crmeb-service/src/main/java/com/zbkj/service/service/impl/UserIntegralConcurrencyServiceImpl.java @@ -1,265 +1,22 @@ package com.zbkj.service.service.impl; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.zbkj.common.model.consignment.WaSelfbonusLog; -import com.zbkj.common.model.user.User; -import com.zbkj.common.model.user.UserIntegralRecord; -import com.zbkj.common.utils.CrmebUtil; -import com.zbkj.service.dao.UserDao; -import com.zbkj.service.dao.UserIntegralRecordDao; -import com.zbkj.service.dao.consignment.WaSelfbonusLogDao; -import com.zbkj.service.service.UserService; import com.zbkj.service.service.WaSelfbonusSyncService; -import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; -import java.math.BigDecimal; -import java.math.RoundingMode; -import java.util.Date; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; /** - * 用户积分并发安全服务实现类 - * 专门处理高并发场景下的积分更新,避免数据库锁等待超时 + * 保留并发安全服务 bean 名称兼容历史调用。 + * 实际逻辑统一委托到 waSelfbonusSyncService,避免双实现规则漂移。 */ -@Slf4j @Service("userIntegralConcurrencyService") public class UserIntegralConcurrencyServiceImpl { @Autowired - private WaSelfbonusLogDao waSelfbonusLogDao; + private WaSelfbonusSyncService waSelfbonusSyncService; - @Autowired - private UserIntegralRecordDao userIntegralRecordDao; - - @Autowired - private UserDao userDao; - - @Autowired - private UserService userService; - - // 使用ConcurrentHashMap来缓存正在处理的用户ID,防止重复处理 - private final ConcurrentHashMap processingUsers = new ConcurrentHashMap<>(); - - /** - * 同步个人奖金变动到用户积分 - 并发安全版本 - * 根据个人奖金变动记录,为对应的用户增加积分(奖金金额的50%) - */ - @Transactional(rollbackFor = Exception.class) public Map syncSelfbonusToIntegral() { - log.info("开始同步个人奖金变动到用户积分(并发安全版)"); - - int successCount = 0; - int skipCount = 0; - int failCount = 0; - - try { - // 查询最新的个人奖金变动记录 - LambdaQueryWrapper bonusLogWrapper = new LambdaQueryWrapper<>(); - bonusLogWrapper.orderByDesc(WaSelfbonusLog::getCreatedAt); // 按创建时间倒序 - bonusLogWrapper.last("LIMIT 500"); // 限制查询500条,避免一次性处理过多 - List bonusLogList = waSelfbonusLogDao.selectList(bonusLogWrapper); - - for (WaSelfbonusLog bonusLog : bonusLogList) { - try { - // 检查该奖金记录是否已经处理过 - LambdaQueryWrapper checkWrapper = new LambdaQueryWrapper<>(); - checkWrapper.eq(UserIntegralRecord::getWaSelfbonusLogid, bonusLog.getId()); - Integer existCount = userIntegralRecordDao.selectCount(checkWrapper); - - if (existCount != null && existCount > 0) { - log.debug("奖金记录已处理,跳过: bonusLogId={}, userId={}", bonusLog.getId(), bonusLog.getUserId()); - skipCount++; - continue; - } - - // 获取用户ID - Integer ebUserId = bonusLog.getUserId(); - - // 使用同步块确保同一用户不会被重复处理 - Object lock = processingUsers.computeIfAbsent(ebUserId, k -> new Object()); - synchronized (lock) { - try { - // 再次检查积分记录是否已存在(双重检查) - existCount = userIntegralRecordDao.selectCount(checkWrapper); - if (existCount != null && existCount > 0) { - log.debug("奖金记录已在其他线程处理,跳过: bonusLogId={}", bonusLog.getId()); - skipCount++; - continue; - } - - // 查询用户信息 - User user = userDao.selectById(ebUserId); - if (user == null) { - log.warn("未找到对应的系统用户,跳过: waUserId={}", bonusLog.getUserId()); - skipCount++; - continue; - } - - // 验证奖金类型和金额 - if (!isValidBonusLog(bonusLog)) { - log.debug("奖金记录不符合处理条件,跳过: bonusLogId={}, type={}, amount={}", - bonusLog.getId(), bonusLog.getType(), bonusLog.getMoney()); - skipCount++; - continue; - } - - // 计算积分值 - BigDecimal integralValue = calculateIntegralValue(bonusLog.getMoney()); - if (integralValue.compareTo(BigDecimal.ZERO) <= 0) { - log.debug("计算出的积分为0或负数,跳过: bonusLogId={}, integralValue={}", - bonusLog.getId(), integralValue); - skipCount++; - continue; - } - - // 使用CAS方式更新积分,避免锁竞争 - Boolean updateResult = updateIntegralWithRetry(user.getUid(), integralValue, "add", 3); - - if (!updateResult) { - log.error("更新用户积分失败(重试后): userId={}, integralValue={}", user.getUid(), integralValue); - failCount++; - continue; - } - - // 插入积分记录 - UserIntegralRecord integralRecord = createUserIntegralRecord(user.getUid(), bonusLog, integralValue); - int insertResult = userIntegralRecordDao.insert(integralRecord); - if (insertResult <= 0) { - log.error("插入积分记录失败: userId={}, bonusLogId={}", user.getUid(), bonusLog.getId()); - failCount++; - continue; - } - - successCount++; - log.info("成功同步奖金到积分: bonusLogId={}, userId={}, bonusAmount={}, integralValue={}", - bonusLog.getId(), user.getUid(), bonusLog.getMoney(), integralValue); - - } finally { - // 清理锁对象 - processingUsers.remove(ebUserId); - } - } - - } catch (Exception e) { - failCount++; - log.error("处理奖金记录失败: bonusLogId={}, error={}", bonusLog.getId(), e.getMessage(), e); - } - } - - Map result = new HashMap<>(); - result.put("total", bonusLogList.size()); - result.put("successCount", successCount); - result.put("skipCount", skipCount); - result.put("failCount", failCount); - - log.info("同步个人奖金变动到用户积分完成(并发安全版): 总数={}, 成功={}, 跳过={}, 失败={}", - bonusLogList.size(), successCount, skipCount, failCount); - - result.put("message", "同步完成"); - return result; - - } catch (Exception e) { - log.error("同步个人奖金变动到用户积分异常", e); - Map result = new HashMap<>(); - result.put("message", "同步失败: " + e.getMessage()); - result.put("error", e.getMessage()); - return result; - } - } - - /** - * 验证奖金记录是否符合处理条件 - */ - private boolean isValidBonusLog(WaSelfbonusLog bonusLog) { - // 只处理收入类型(type=1) - if (bonusLog.getType() == null || bonusLog.getType() != 1) { - return false; - } - - // 验证奖金金额有效 - if (bonusLog.getMoney() == null || bonusLog.getMoney().compareTo(BigDecimal.ZERO) <= 0) { - return false; - } - - return true; - } - - /** - * 计算积分值 - */ - private BigDecimal calculateIntegralValue(BigDecimal bonusAmount) { - // 计算积分:奖金金额 * 50%,向下取整 - BigDecimal integralDecimal = bonusAmount.multiply(new BigDecimal("0.5")); - return integralDecimal.setScale(3, RoundingMode.DOWN); - } - - /** - * 创建积分记录对象 - */ - private UserIntegralRecord createUserIntegralRecord(Integer userId, WaSelfbonusLog bonusLog, BigDecimal integralValue) { - User user = userDao.selectById(userId); // 重新查询用户获取最新积分 - Integer newIntegral = user != null && user.getIntegral() != null ? user.getIntegral().intValue() : 0; - - UserIntegralRecord integralRecord = new UserIntegralRecord(); - integralRecord.setUid(userId); - integralRecord.setLinkId(String.valueOf(bonusLog.getId())); // 关联奖金记录ID - integralRecord.setLinkType("selfbonus"); // 关联类型:个人奖金 - integralRecord.setType(1); // 类型:1-增加 - integralRecord.setTitle("个人奖金奖励"); - integralRecord.setIntegral(integralValue); - integralRecord.setBalance(newIntegral); // 实际上应该是更新后的积分,这里可能需要调整 - integralRecord.setMark(String.format("个人奖金变动奖励,奖金金额:%.3f,积分:%d", - bonusLog.getMoney(), integralValue.intValue())); - integralRecord.setStatus(3); // 状态:3-完成 - integralRecord.setWaSelfbonusLogid(bonusLog.getId()); // 关联个人奖金记录ID - integralRecord.setCreateTime(new Date()); - integralRecord.setUpdateTime(new Date()); - - return integralRecord; - } - - /** - * 带重试机制的积分更新 - */ - private Boolean updateIntegralWithRetry(Integer uid, BigDecimal integral, String type, int maxRetries) { - int attempts = 0; - Exception lastException = null; - - while (attempts < maxRetries) { - try { - attempts++; - - // 直接更新积分,不再依赖乐观锁 - Boolean result = userService.operationIntegral(uid, integral, BigDecimal.ZERO, type); - - if (result) { - return true; - } else { - log.warn("积分更新失败,准备重试 (attempt {}/{})", attempts, maxRetries); - } - } catch (Exception e) { - lastException = e; - log.warn("积分更新异常,准备重试 (attempt {}/{}), error: {}", attempts, maxRetries, e.getMessage()); - - // 如果是数据库锁等待超时,等待一段时间再重试 - if (e.getMessage() != null && e.getMessage().contains("Lock wait timeout")) { - try { - Thread.sleep(100 * attempts); // 指数退避 - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - break; - } - } - } - } - - log.error("积分更新达到最大重试次数仍然失败,最后一次异常: ", lastException); - return false; + return waSelfbonusSyncService.syncSelfbonusToIntegral(); } } \ No newline at end of file diff --git a/backend/crmeb-service/src/main/java/com/zbkj/service/service/impl/WaSelfbonusServiceImpl.java b/backend/crmeb-service/src/main/java/com/zbkj/service/service/impl/WaSelfbonusServiceImpl.java index 300f43e..57f287e 100644 --- a/backend/crmeb-service/src/main/java/com/zbkj/service/service/impl/WaSelfbonusServiceImpl.java +++ b/backend/crmeb-service/src/main/java/com/zbkj/service/service/impl/WaSelfbonusServiceImpl.java @@ -11,8 +11,8 @@ import com.zbkj.service.service.UserService; import com.zbkj.service.service.WaSelfbonusSyncService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.TransactionTemplate; import java.math.BigDecimal; @@ -44,12 +44,14 @@ public class WaSelfbonusServiceImpl implements WaSelfbonusSyncService { @Autowired private UserService userService; + @Autowired + private TransactionTemplate transactionTemplate; + /** * 同步个人奖金变动到用户积分 * 根据个人奖金变动记录,为对应的用户增加积分(奖金金额的50%) */ @Override - @Transactional(rollbackFor = Exception.class) public Map syncSelfbonusToIntegral() { log.info("开始同步个人奖金变动到用户积分"); @@ -66,101 +68,16 @@ public class WaSelfbonusServiceImpl implements WaSelfbonusSyncService { for (WaSelfbonusLog bonusLog : bonusLogList) { try { - // 检查该奖金记录是否已经处理过(通过 waSelfbonusLogid 字段查询积分记录) - LambdaQueryWrapper checkWrapper = new LambdaQueryWrapper<>(); - checkWrapper.eq(UserIntegralRecord::getWaSelfbonusLogid, bonusLog.getId()); - Integer existCount = userIntegralRecordDao.selectCount(checkWrapper); - - if (existCount != null && existCount > 0) { - // 已处理过,跳过 - log.debug("奖金记录已处理,跳过: bonusLogId={}, userId={}", bonusLog.getId(), bonusLog.getUserId()); + ProcessStatus processStatus = transactionTemplate.execute(status -> processBonusLogAtomically(bonusLog)); + if (processStatus == ProcessStatus.SKIP) { skipCount++; continue; } - - // 根据 wa_users 的 user_id 查找对应的 eb_user 表的 uid - // 注意:wa_users.id 对应 eb_user.uid(在同步时已建立关联) - Integer ebUserId = bonusLog.getUserId(); // wa_users.id 就是 eb_user.uid - - // 查询 eb_user 表中的用户 - User user = userDao.selectById(ebUserId); - if (user == null) { - log.warn("未找到对应的系统用户,跳过: waUserId={}", bonusLog.getUserId()); - skipCount++; + if (processStatus == ProcessStatus.SUCCESS) { + successCount++; continue; } - - // 计算积分值:个人奖金变更金额的50%(只处理收入类型的奖金变动) - if (bonusLog.getType() == null || bonusLog.getType() != 1) { - // 只处理收入类型(type=1),支出类型不处理 - log.debug("跳过非收入类型的奖金变动: bonusLogId={}, type={}", bonusLog.getId(), bonusLog.getType()); - skipCount++; - continue; - } - - // 奖金金额(收入为正数) - BigDecimal bonusAmount = bonusLog.getMoney(); - if (bonusAmount == null || bonusAmount.compareTo(BigDecimal.ZERO) <= 0) { - log.debug("奖金金额无效,跳过: bonusLogId={}, money={}", bonusLog.getId(), bonusAmount); - skipCount++; - continue; - } - - // 计算积分:奖金金额 * 50%,向下取整 - BigDecimal integralDecimal = bonusAmount.multiply(new BigDecimal("0.5")); - - BigDecimal integralValue = integralDecimal.setScale(3, RoundingMode.DOWN); - - if (integralValue.compareTo(BigDecimal.ZERO) <= 0) { - log.debug("计算出的积分为0,跳过: bonusLogId={}, integralValue={}", bonusLog.getId(), integralValue); - skipCount++; - continue; - } - - // 更新用户积分 - 不再需要当前积分值作为乐观锁条件 - Boolean updateResult = userService.operationIntegral( - user.getUid(), - integralValue, - BigDecimal.valueOf(0), // 不再使用当前积分作为乐观锁条件 - "add" - ); - - if (!updateResult) { - log.error("更新用户积分失败: userId={}, integralValue={}", user.getUid(), integralValue); - failCount++; - continue; - } - - // 重新查询用户获取最新积分 - user = userDao.selectById(user.getUid()); - Integer newIntegral = user.getIntegral() != null ? user.getIntegral().intValue() : 0; - - // 新增积分记录 - UserIntegralRecord integralRecord = new UserIntegralRecord(); - integralRecord.setUid(user.getUid()); - integralRecord.setLinkId(String.valueOf(bonusLog.getId())); // 关联奖金记录ID - integralRecord.setLinkType("selfbonus"); // 关联类型:个人奖金 - integralRecord.setType(1); // 类型:1-增加 - integralRecord.setTitle("个人奖金奖励"); - integralRecord.setIntegral(integralValue); - integralRecord.setBalance(newIntegral); - integralRecord.setMark(String.format("个人奖金变动奖励,奖金金额:%.3f,积分:%d", - bonusAmount, integralValue.intValue())); - integralRecord.setStatus(3); // 状态:3-完成 - integralRecord.setWaSelfbonusLogid(bonusLog.getId()); // 关联个人奖金记录ID - integralRecord.setCreateTime(new Date()); - integralRecord.setUpdateTime(new Date()); - - int insertResult = userIntegralRecordDao.insert(integralRecord); - if (insertResult <= 0) { - log.error("插入积分记录失败: userId={}, bonusLogId={}", user.getUid(), bonusLog.getId()); - failCount++; - continue; - } - - successCount++; - log.info("成功同步奖金到积分: bonusLogId={}, userId={}, bonusAmount={}, integralValue={}", - bonusLog.getId(), user.getUid(), bonusAmount, integralValue); + failCount++; } catch (Exception e) { failCount++; @@ -188,47 +105,104 @@ public class WaSelfbonusServiceImpl implements WaSelfbonusSyncService { return result; } } - + /** - * 带重试机制的用户积分更新 + * 原子处理单条奖金日志: + * 1) 先插入积分流水(受唯一索引保护) + * 2) 插入成功后再更新用户总积分 + * 3) 回填本条流水的 balance */ - private Boolean updateUserIntegralWithRetry(Integer uid, BigDecimal integralValue, int maxRetries) { - int attempts = 0; - Exception lastException = null; - - while (attempts < maxRetries) { - try { - attempts++; - - Boolean result = userService.operationIntegral( - uid, - integralValue, - BigDecimal.ZERO, // 不再使用当前积分作为乐观锁条件 - "add" - ); - - if (result) { - return true; - } else { - log.warn("积分更新失败,准备重试 (attempt {}/{})", attempts, maxRetries); - } - } catch (Exception e) { - lastException = e; - log.warn("积分更新异常,准备重试 (attempt {}/{}), error: {}", attempts, maxRetries, e.getMessage()); - - // 如果是数据库锁等待超时,等待一段时间再重试 - if (e.getMessage() != null && e.getMessage().contains("Lock wait timeout")) { - try { - Thread.sleep(100 * attempts); // 指数退避 - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - break; - } - } - } + private ProcessStatus processBonusLogAtomically(WaSelfbonusLog bonusLog) { + if (isAlreadyProcessed(bonusLog.getId())) { + log.debug("奖金记录已处理,跳过: bonusLogId={}, userId={}", bonusLog.getId(), bonusLog.getUserId()); + return ProcessStatus.SKIP; } - - log.error("积分更新达到最大重试次数仍然失败,最后一次异常: ", lastException); - return false; + + Integer ebUserId = bonusLog.getUserId(); + User user = userDao.selectById(ebUserId); + if (user == null) { + log.warn("未找到对应的系统用户,跳过: waUserId={}", bonusLog.getUserId()); + return ProcessStatus.SKIP; + } + + if (bonusLog.getType() == null || bonusLog.getType() != 1) { + log.debug("跳过非收入类型的奖金变动: bonusLogId={}, type={}", bonusLog.getId(), bonusLog.getType()); + return ProcessStatus.SKIP; + } + + BigDecimal bonusAmount = bonusLog.getMoney(); + if (bonusAmount == null || bonusAmount.compareTo(BigDecimal.ZERO) <= 0) { + log.debug("奖金金额无效,跳过: bonusLogId={}, money={}", bonusLog.getId(), bonusAmount); + return ProcessStatus.SKIP; + } + + BigDecimal integralValue = bonusAmount.multiply(new BigDecimal("0.5")).setScale(3, RoundingMode.DOWN); + if (integralValue.compareTo(BigDecimal.ZERO) <= 0) { + log.debug("计算出的积分为0,跳过: bonusLogId={}, integralValue={}", bonusLog.getId(), integralValue); + return ProcessStatus.SKIP; + } + + UserIntegralRecord integralRecord = buildIntegralRecord(user.getUid(), bonusLog, bonusAmount, integralValue); + try { + int insertResult = userIntegralRecordDao.insert(integralRecord); + if (insertResult <= 0) { + throw new IllegalStateException("插入积分记录失败"); + } + } catch (DuplicateKeyException duplicateKeyException) { + // 数据库唯一索引兜底,保证多入口并发只处理一次 + log.info("奖金记录并发重复处理,已跳过: bonusLogId={}, userId={}", bonusLog.getId(), user.getUid()); + return ProcessStatus.SKIP; + } + + Boolean updateResult = userService.operationIntegral( + user.getUid(), + integralValue, + BigDecimal.ZERO, + "add" + ); + + if (!updateResult) { + throw new IllegalStateException(String.format("更新用户积分失败: userId=%s, integralValue=%s", user.getUid(), integralValue)); + } + + User latestUser = userDao.selectById(user.getUid()); + Integer latestIntegral = latestUser != null && latestUser.getIntegral() != null ? latestUser.getIntegral().intValue() : 0; + integralRecord.setBalance(latestIntegral); + integralRecord.setUpdateTime(new Date()); + userIntegralRecordDao.updateById(integralRecord); + + log.info("成功同步奖金到积分: bonusLogId={}, userId={}, bonusAmount={}, integralValue={}", + bonusLog.getId(), user.getUid(), bonusAmount, integralValue); + return ProcessStatus.SUCCESS; + } + + private boolean isAlreadyProcessed(Integer waSelfbonusLogId) { + LambdaQueryWrapper checkWrapper = new LambdaQueryWrapper<>(); + checkWrapper.eq(UserIntegralRecord::getWaSelfbonusLogid, waSelfbonusLogId); + Integer existCount = userIntegralRecordDao.selectCount(checkWrapper); + return existCount != null && existCount > 0; + } + + private UserIntegralRecord buildIntegralRecord(Integer uid, WaSelfbonusLog bonusLog, BigDecimal bonusAmount, BigDecimal integralValue) { + UserIntegralRecord integralRecord = new UserIntegralRecord(); + integralRecord.setUid(uid); + integralRecord.setLinkId(String.valueOf(bonusLog.getId())); + integralRecord.setLinkType("selfbonus"); + integralRecord.setType(1); + integralRecord.setTitle("个人奖金奖励"); + integralRecord.setIntegral(integralValue); + integralRecord.setBalance(0); + integralRecord.setMark(String.format("个人奖金变动奖励,奖金金额:%.3f,积分:%.3f", + bonusAmount, integralValue)); + integralRecord.setStatus(3); + integralRecord.setWaSelfbonusLogid(bonusLog.getId()); + integralRecord.setCreateTime(new Date()); + integralRecord.setUpdateTime(new Date()); + return integralRecord; + } + + private enum ProcessStatus { + SUCCESS, + SKIP } } diff --git a/backend/sql/add_unique_index_uk_integral_selfbonus_log.sql b/backend/sql/add_unique_index_uk_integral_selfbonus_log.sql new file mode 100644 index 0000000..cfe5f52 --- /dev/null +++ b/backend/sql/add_unique_index_uk_integral_selfbonus_log.sql @@ -0,0 +1,23 @@ +-- Add strong idempotency guard for selfbonus -> integral conversion. +-- This index guarantees one wa_selfbonus_log can map to at most one integral record. +-- Prerequisite: clear duplicate wa_selfbonus_logid rows first. + +-- Pre-checks +SELECT wa_selfbonus_logid, COUNT(*) AS cnt +FROM eb_user_integral_record +WHERE wa_selfbonus_logid IS NOT NULL +GROUP BY wa_selfbonus_logid +HAVING COUNT(*) > 1 +LIMIT 20; + +SELECT COUNT(*) AS zero_cnt +FROM eb_user_integral_record +WHERE wa_selfbonus_logid = 0; + +-- Apply unique index +ALTER TABLE eb_user_integral_record +ADD UNIQUE KEY uk_integral_selfbonus_log (wa_selfbonus_logid); + +-- Verify index exists +SHOW INDEX FROM eb_user_integral_record +WHERE Key_name = 'uk_integral_selfbonus_log'; diff --git a/backend/sql/fix_duplicate_selfbonus_integral_records.sql b/backend/sql/fix_duplicate_selfbonus_integral_records.sql new file mode 100644 index 0000000..8879c7d --- /dev/null +++ b/backend/sql/fix_duplicate_selfbonus_integral_records.sql @@ -0,0 +1,130 @@ +-- Purpose: +-- 1) Find duplicate selfbonus integral rows generated from same wa_selfbonus_log +-- 2) Backup affected data +-- 3) Keep min(id), delete duplicate rows +-- 4) Resync eb_user.integral from remaining ledger rows +-- 5) Rebuild integer balance snapshot for affected users +-- +-- Notes: +-- - Designed for MySQL 5.7 +-- - Run during low traffic window +-- - Review backup table names before execution + +-- 0) Preview duplicate groups +SELECT uid, + wa_selfbonus_logid, + link_id, + COUNT(*) AS cnt, + SUM(integral) AS total_integral, + MIN(id) AS keep_id, + GROUP_CONCAT(id ORDER BY id) AS record_ids +FROM eb_user_integral_record +WHERE link_type = 'selfbonus' + AND type = 1 + AND wa_selfbonus_logid IS NOT NULL +GROUP BY uid, wa_selfbonus_logid, link_id +HAVING COUNT(*) > 1; + +-- 1) Backup duplicate rows and affected users +DROP TABLE IF EXISTS backup_euir_selfbonus_dups_20260511_0959; +CREATE TABLE backup_euir_selfbonus_dups_20260511_0959 AS +SELECT e.* +FROM eb_user_integral_record e +JOIN ( + SELECT uid, wa_selfbonus_logid, link_id, MIN(id) AS keep_id, COUNT(*) AS cnt + FROM eb_user_integral_record + WHERE link_type = 'selfbonus' + AND type = 1 + AND wa_selfbonus_logid IS NOT NULL + GROUP BY uid, wa_selfbonus_logid, link_id + HAVING COUNT(*) > 1 +) d + ON d.uid = e.uid + AND d.wa_selfbonus_logid = e.wa_selfbonus_logid + AND d.link_id = e.link_id; + +DROP TABLE IF EXISTS backup_eb_user_integral_before_fix_20260511_0959; +CREATE TABLE backup_eb_user_integral_before_fix_20260511_0959 AS +SELECT u.* +FROM eb_user u +WHERE u.uid IN ( + SELECT DISTINCT uid FROM backup_euir_selfbonus_dups_20260511_0959 +); + +-- 2) Deduplicate + resync in one transaction +START TRANSACTION; + +DROP TEMPORARY TABLE IF EXISTS tmp_dup_groups; +CREATE TEMPORARY TABLE tmp_dup_groups AS +SELECT uid, wa_selfbonus_logid, link_id, MIN(id) AS keep_id +FROM eb_user_integral_record +WHERE link_type = 'selfbonus' + AND type = 1 + AND wa_selfbonus_logid IS NOT NULL +GROUP BY uid, wa_selfbonus_logid, link_id +HAVING COUNT(*) > 1; + +DROP TEMPORARY TABLE IF EXISTS tmp_affected_uids; +CREATE TEMPORARY TABLE tmp_affected_uids AS +SELECT DISTINCT uid FROM tmp_dup_groups; + +DELETE e +FROM eb_user_integral_record e +JOIN tmp_dup_groups d + ON d.uid = e.uid + AND d.wa_selfbonus_logid = e.wa_selfbonus_logid + AND d.link_id = e.link_id +WHERE e.id <> d.keep_id; + +UPDATE eb_user u +JOIN ( + SELECT r.uid, COALESCE(SUM(r.integral), 0) AS sum_integral + FROM eb_user_integral_record r + JOIN tmp_affected_uids t ON t.uid = r.uid + GROUP BY r.uid +) s ON s.uid = u.uid +SET u.integral = s.sum_integral; + +SET @run_uid := 0; +SET @run_bal := 0; +UPDATE eb_user_integral_record e +JOIN ( + SELECT t.id, FLOOR(t.running) AS new_balance + FROM ( + SELECT s.id, + s.uid, + (@run_bal := IF(@run_uid = s.uid, @run_bal + s.integral, s.integral)) AS running, + (@run_uid := s.uid) AS uid_guard + FROM ( + SELECT id, uid, integral + FROM eb_user_integral_record + WHERE uid IN (SELECT uid FROM tmp_affected_uids) + ORDER BY uid, id + ) s + ) t +) x ON x.id = e.id +SET e.balance = x.new_balance; + +COMMIT; + +-- 3) Post-checks +SELECT COUNT(*) AS remaining_dup_groups +FROM ( + SELECT 1 + FROM eb_user_integral_record + WHERE link_type = 'selfbonus' + AND type = 1 + AND wa_selfbonus_logid IS NOT NULL + GROUP BY uid, wa_selfbonus_logid, link_id + HAVING COUNT(*) > 1 +) a; + +SELECT u.uid, u.integral, s.sum_integral +FROM eb_user u +JOIN ( + SELECT uid, SUM(integral) AS sum_integral + FROM eb_user_integral_record + GROUP BY uid +) s ON s.uid = u.uid +WHERE u.uid IN (SELECT DISTINCT uid FROM backup_euir_selfbonus_dups_20260511_0959) +ORDER BY u.uid;