diff --git a/.gitignore b/.gitignore index 1b6f070..832efa4 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,8 @@ deploy/docker/scripts/server.env deploy/docker/step1-integral/.env deploy/docker/step1-integral/houtai.env deploy/docker/step2-single-shop/.env + +# Local migration backups and generated Python cache +docs/sql/backups/ +__pycache__/ +**/__pycache__/ diff --git a/docs/com-sqszx202-data-imgration.md b/docs/com-sqszx202-data-imgration.md new file mode 100644 index 0000000..23b9fdc --- /dev/null +++ b/docs/com-sqszx202-data-imgration.md @@ -0,0 +1,80 @@ +# 公司名称:宿迁盛泽鑫商贸 + +- host ip: 59.110.91.202 + +## mysql数据库配置信息 + +- datasource: + rds: rm-bp1a178eq62lxba9xbo.mysql.rds.aliyuncs.com + name: sqszx202 + username: yangtangyoupin + password: 5Fn8eWrbYFtAhCZw + + +## 数据清理任务 + +- **用户数据范围**:`wa_users.id` / `eb_user.uid` 保留名单: + `93164, 93133, 93132, 93113, 93216, 93230, 93238, 93258, 93277, 93283, 93284, 93291, 93293, 93308, 93299, 93290, 93280, 93279, 93255, 93252, 93251, 93250, 93249, 93248, 93223, 93221, 93111, 93099, 93263, 93265, 93272, 93270, 93269, 93268, 93267, 93266, 93276, 93278, 93286, 93296, 93309, 93281, 93275, 93271, 93139, 93264, 93247, 93218, 93217, 93194, 93142` + +- 保留wa_users表中id在用户id数据范围的 ,删除其余用户数据 +- 保留eb_user表中uid在用户id数据范围的 ,删除其余用户数据 + +- wa_order +清空wa_order表中数据 + +- wa_merchandise +保留数据库中“created_at >= 2026-06-12”并且seller_id或buyer_id在用户id数据范围的寄售商品,删除其余数据 +(当前库表字段为 `user_id` 表示卖家,实现时按 `user_id` 与日期条件过滤。) + +- wa_selfbonus_log +只保留 `user_id` 在用户id数据范围内的记录,删除其余数据 + +- wa_sharebonus_log +只保留 `user_id` 在用户id数据范围内的记录,删除其余数据 + +- wa_coupon_log +只保留 `user_id` 在用户id数据范围内的记录,删除其余数据 + +- wa_withdraw +清空wa_withdraw表中数据 + +- eb_store_order +清空eb_store_order表中数据 + +- eb_user_integral_record +只保留用户在名单内的记录;表字段为 `uid`(与 `wa_users.id` / `eb_user.uid` 对应),实现按 `uid` 过滤。 + +## 执行结果 + +- 已于 **2026-06-14** 按当前保留名单执行清理并 `COMMIT`。 +- 执行脚本:`docs/sql/run_com_sqszx202_cleanup.py` +- 执行前备份:`docs/sql/backups/sqszx202_cleanup_before_20260614_090658.sql.gz`(已通过 `gzip -t` 校验) +- `wa_merchandise` 从源 dump 解析结果:dump 中共 3156 行,满足 `created_at >= 2026-06-12` 且 `user_id` 在名单内的保留商品为 54 行。 +- 删除行数: + - `wa_order`:3168 + - `wa_withdraw`:171 + - `eb_store_order`:348 + - `wa_merchandise`:3102 + - `wa_selfbonus_log`:2255 + - `wa_sharebonus_log`:2514 + - `wa_coupon_log`:208 + - `eb_user_integral_record`:2343 + - `eb_user`:79 + - `wa_users`:79 +- 保留后行数: + - `wa_order`:0 + - `wa_withdraw`:0 + - `eb_store_order`:0 + - `wa_merchandise`:54 + - `wa_selfbonus_log`:1644 + - `wa_sharebonus_log`:1791 + - `wa_coupon_log`:8 + - `eb_user_integral_record`:1993 + - `eb_user`:51 + - `wa_users`:51 +- 复核:清理后再次 dry-run,以上表剩余待删除行数均为 0。 + +## 相关文件 + +- 新公司初始会员信息: '/Users/mac/Works26/miao-july/宿迁盛泽鑫/盛泽鑫团队成员信息表.xlsx' +- 源数据dump文件: '/Users/mac/Works26/miao-july/宿迁盛泽鑫/anpengran-yangtangyoupin_2026-06-14_02-15-02_mysql_data.sql' diff --git a/docs/sql/run_com_sqszx202_cleanup.py b/docs/sql/run_com_sqszx202_cleanup.py new file mode 100644 index 0000000..27e4b11 --- /dev/null +++ b/docs/sql/run_com_sqszx202_cleanup.py @@ -0,0 +1,388 @@ +#!/usr/bin/env python3 +"""Run data cleanup for docs/com-sqszx202-data-imgration.md. + +Default mode is a read-only dry run. Use --execute to create a local SQL backup, +delete rows according to the migration document, and commit the transaction. +""" +from __future__ import annotations + +import argparse +import gzip +import json +import re +import sys +from datetime import datetime +from pathlib import Path +from typing import Any + +import pymysql +from pymysql.cursors import SSCursor + +ROOT = Path(__file__).resolve().parents[2] +DOC = ROOT / "docs" / "com-sqszx202-data-imgration.md" +DEFAULT_DUMP = Path("/Users/mac/Works26/miao-july/宿迁盛泽鑫/anpengran-yangtangyoupin_2026-06-14_02-15-02_mysql_data.sql") +CUTOFF = "2026-06-12 00:00:00" +TABLES = [ + "wa_order", + "wa_withdraw", + "eb_store_order", + "wa_merchandise", + "wa_selfbonus_log", + "wa_sharebonus_log", + "wa_coupon_log", + "eb_user_integral_record", + "eb_user", + "wa_users", +] +FILTERS = { + "wa_users": "id", + "eb_user": "uid", + "wa_selfbonus_log": "user_id", + "wa_sharebonus_log": "user_id", + "wa_coupon_log": "user_id", + "eb_user_integral_record": "uid", +} +CLEAR_TABLES = ["wa_order", "wa_withdraw", "eb_store_order"] + + +def parse_doc() -> tuple[dict[str, str], list[int]]: + text = DOC.read_text(encoding="utf-8") + def grab(name: str) -> str: + m = re.search(rf"^\s*{name}:\s*(.+?)\s*$", text, flags=re.M) + if not m: + raise ValueError(f"missing datasource {name} in {DOC}") + return m.group(1).strip() + + ids_match = re.search(r"保留名单:\s*\n\s*`([^`]+)`", text) + if not ids_match: + raise ValueError(f"missing user id keep list in {DOC}") + ids = [int(x.strip()) for x in ids_match.group(1).split(",") if x.strip()] + if len(ids) != len(set(ids)): + raise ValueError("duplicate ids in keep list") + config = { + "host": grab("rds"), + "database": grab("name"), + "user": grab("username"), + "password": grab("password"), + } + return config, ids + + +def split_top_level_tuples(values_blob: str) -> list[str]: + out: list[str] = [] + i = 0 + n = len(values_blob) + while i < n: + if values_blob[i] != "(": + i += 1 + continue + depth = 0 + in_quote = False + start = i + j = i + while j < n: + c = values_blob[j] + if in_quote: + if c == "\\": + j += 2 + continue + if c == "'": + if j + 1 < n and values_blob[j + 1] == "'": + j += 2 + continue + in_quote = False + j += 1 + continue + if c == "'": + in_quote = True + elif c == "(": + depth += 1 + elif c == ")": + depth -= 1 + if depth == 0: + out.append(values_blob[start : j + 1]) + j += 1 + break + j += 1 + i = j + return out + + +def split_mysql_fields(inner: str) -> list[str]: + out: list[str] = [] + cur: list[str] = [] + i = 0 + n = len(inner) + while i < n: + c = inner[i] + if c == "'": + cur.append(c) + i += 1 + while i < n: + c = inner[i] + cur.append(c) + if c == "\\": + if i + 1 < n: + cur.append(inner[i + 1]) + i += 2 + continue + if c == "'": + if i + 1 < n and inner[i + 1] == "'": + cur.append(inner[i + 1]) + i += 2 + continue + i += 1 + break + i += 1 + continue + if c == ",": + out.append("".join(cur).strip()) + cur = [] + i += 1 + continue + cur.append(c) + i += 1 + out.append("".join(cur).strip()) + return out + + +def unquote_sql_string(raw: str) -> str: + raw = raw.strip() + if raw.startswith("'") and raw.endswith("'"): + body = raw[1:-1] + body = body.replace("''", "'") + body = body.replace("\\'", "'").replace("\\\\", "\\") + return body + return raw + + +def extract_wa_merchandise_keep_ids(dump: Path, keep_users: set[int]) -> list[int]: + if not dump.is_file(): + raise FileNotFoundError(f"dump not found: {dump}") + keep_ids: set[int] = set() + total_rows = 0 + insert_lines = 0 + with dump.open("r", encoding="utf-8", errors="replace") as f: + for line in f: + if "INSERT INTO `wa_merchandise`" not in line or "VALUES" not in line: + continue + insert_lines += 1 + blob = line[line.index("VALUES") + len("VALUES") :].strip() + if blob.endswith(";"): + blob = blob[:-1].strip() + for tup in split_top_level_tuples(blob): + total_rows += 1 + fields = split_mysql_fields(tup.strip()[1:-1]) + if len(fields) < 10: + raise ValueError(f"malformed wa_merchandise tuple: {tup[:120]}") + row_id = int(fields[0]) + user_id = int(fields[2]) + created_at = unquote_sql_string(fields[8]) + if created_at >= CUTOFF and user_id in keep_users: + keep_ids.add(row_id) + if insert_lines == 0: + raise ValueError("no INSERT INTO `wa_merchandise` found in dump") + print(f"dump_wa_merchandise_rows={total_rows} keep_by_rule={len(keep_ids)}") + return sorted(keep_ids) + + +def connect(config: dict[str, str], cursorclass=None): + kwargs = { + "host": config["host"], + "user": config["user"], + "password": config["password"], + "database": config["database"], + "charset": "utf8mb4", + "autocommit": False, + "read_timeout": 120, + "write_timeout": 120, + } + if cursorclass is not None: + kwargs["cursorclass"] = cursorclass + return pymysql.connect(**kwargs) + + +def placeholders(items: list[int] | set[int]) -> str: + if not items: + return "NULL" + return ",".join(["%s"] * len(items)) + + +def count_where(cur, table: str, where: str = "1=1", params: tuple[Any, ...] = ()) -> int: + cur.execute(f"SELECT COUNT(*) FROM `{table}` WHERE {where}", params) + return int(cur.fetchone()[0]) + + +def inspect_schema(cur) -> None: + cur.execute("SELECT DATABASE()") + database = cur.fetchone()[0] + if database != "sqszx202": + raise RuntimeError(f"refusing to run against database {database!r}") + for table in TABLES: + cur.execute("SHOW TABLES LIKE %s", (table,)) + if not cur.fetchone(): + raise RuntimeError(f"missing table `{table}`") + required = { + "wa_users": {"id"}, + "eb_user": {"uid"}, + "wa_order": set(), + "wa_withdraw": set(), + "eb_store_order": set(), + "wa_merchandise": {"id", "user_id", "created_at"}, + "wa_selfbonus_log": {"user_id"}, + "wa_sharebonus_log": {"user_id"}, + "wa_coupon_log": {"user_id"}, + "eb_user_integral_record": {"uid"}, + } + for table, cols in required.items(): + if not cols: + continue + cur.execute(f"SHOW COLUMNS FROM `{table}`") + actual = {row[0] for row in cur.fetchall()} + missing = cols - actual + if missing: + raise RuntimeError(f"table `{table}` missing columns: {sorted(missing)}") + + +def collect_counts(cur, keep_users: list[int], keep_merchandise_ids: list[int]) -> dict[str, dict[str, int]]: + counts: dict[str, dict[str, int]] = {} + user_clause = placeholders(keep_users) + merch_clause = placeholders(keep_merchandise_ids) + for table in CLEAR_TABLES: + total = count_where(cur, table) + counts[table] = {"before": total, "keep": 0, "delete": total} + for table, col in FILTERS.items(): + total = count_where(cur, table) + keep = count_where(cur, table, f"`{col}` IN ({user_clause})", tuple(keep_users)) + counts[table] = {"before": total, "keep": keep, "delete": total - keep} + total = count_where(cur, "wa_merchandise") + keep = ( + count_where(cur, "wa_merchandise", f"`id` IN ({merch_clause})", tuple(keep_merchandise_ids)) + if keep_merchandise_ids + else 0 + ) + counts["wa_merchandise"] = {"before": total, "keep": keep, "delete": total - keep} + return counts + + +def print_counts(title: str, counts: dict[str, dict[str, int]]) -> None: + print(title) + for table in TABLES: + c = counts[table] + print(f" {table}: before={c['before']} keep={c['keep']} delete={c['delete']}") + + +def backup_tables(config: dict[str, str], backup_path: Path) -> None: + backup_path.parent.mkdir(parents=True, exist_ok=True) + backup_conn = connect(config, cursorclass=SSCursor) + try: + with gzip.open(backup_path, "wt", encoding="utf-8") as out: + out.write("-- sqszx202 cleanup backup\n") + out.write(f"-- created_at: {datetime.now().isoformat(timespec='seconds')}\n") + out.write("SET NAMES utf8mb4;\n") + for table in TABLES: + with backup_conn.cursor() as cur: + cur.execute(f"SHOW CREATE TABLE `{table}`") + row = cur.fetchone() + create_sql = row[1] + out.write(f"\n-- Table `{table}`\n") + out.write(f"DROP TABLE IF EXISTS `{table}`;\n") + out.write(create_sql + ";\n") + with backup_conn.cursor() as cur: + cur.execute(f"SELECT * FROM `{table}`") + batch: list[str] = [] + row_count = 0 + for row in cur: + batch.append("(" + ",".join(backup_conn.literal(v) for v in row) + ")") + row_count += 1 + if len(batch) >= 200: + out.write(f"INSERT INTO `{table}` VALUES\n") + out.write(",\n".join(batch) + ";\n") + batch = [] + if batch: + out.write(f"INSERT INTO `{table}` VALUES\n") + out.write(",\n".join(batch) + ";\n") + print(f"backup {table}: rows={row_count}") + finally: + backup_conn.close() + + +def execute_cleanup(cur, keep_users: list[int], keep_merchandise_ids: list[int]) -> dict[str, int]: + user_clause = placeholders(keep_users) + merch_clause = placeholders(keep_merchandise_ids) + deleted: dict[str, int] = {} + + cur.execute("SET FOREIGN_KEY_CHECKS = 0") + for table in CLEAR_TABLES: + cur.execute(f"DELETE FROM `{table}`") + deleted[table] = cur.rowcount + cur.execute(f"DELETE FROM `wa_merchandise` WHERE `id` NOT IN ({merch_clause})", tuple(keep_merchandise_ids)) + deleted["wa_merchandise"] = cur.rowcount + for table in ["wa_selfbonus_log", "wa_sharebonus_log", "wa_coupon_log"]: + cur.execute(f"DELETE FROM `{table}` WHERE `user_id` NOT IN ({user_clause})", tuple(keep_users)) + deleted[table] = cur.rowcount + cur.execute(f"DELETE FROM `eb_user_integral_record` WHERE `uid` NOT IN ({user_clause})", tuple(keep_users)) + deleted["eb_user_integral_record"] = cur.rowcount + cur.execute(f"DELETE FROM `eb_user` WHERE `uid` NOT IN ({user_clause})", tuple(keep_users)) + deleted["eb_user"] = cur.rowcount + cur.execute(f"DELETE FROM `wa_users` WHERE `id` NOT IN ({user_clause})", tuple(keep_users)) + deleted["wa_users"] = cur.rowcount + cur.execute("SET FOREIGN_KEY_CHECKS = 1") + return deleted + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--execute", action="store_true", help="perform DELETEs and COMMIT") + parser.add_argument("--dump", type=Path, default=DEFAULT_DUMP) + parser.add_argument("--backup-dir", type=Path, default=ROOT / "docs" / "sql" / "backups") + args = parser.parse_args() + + config, keep_users = parse_doc() + keep_merchandise_ids = extract_wa_merchandise_keep_ids(args.dump, set(keep_users)) + print(f"keep_user_ids={len(keep_users)} keep_wa_merchandise_ids={len(keep_merchandise_ids)}") + + conn = connect(config) + try: + with conn.cursor() as cur: + inspect_schema(cur) + before = collect_counts(cur, keep_users, keep_merchandise_ids) + print_counts("before_counts", before) + if not args.execute: + print("dry_run_only=true") + conn.rollback() + return 0 + + stamp = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_path = args.backup_dir / f"sqszx202_cleanup_before_{stamp}.sql.gz" + print(f"backup_path={backup_path}") + backup_tables(config, backup_path) + + with conn.cursor() as cur: + deleted = execute_cleanup(cur, keep_users, keep_merchandise_ids) + after = collect_counts(cur, keep_users, keep_merchandise_ids) + print("deleted_rows") + print(json.dumps(deleted, ensure_ascii=False, indent=2)) + print_counts("after_counts_before_commit", after) + conn.commit() + print("COMMIT ok") + + with conn.cursor() as cur: + final_counts = collect_counts(cur, keep_users, keep_merchandise_ids) + print_counts("final_counts", final_counts) + return 0 + except Exception as e: + try: + with conn.cursor() as cur: + cur.execute("SET FOREIGN_KEY_CHECKS = 1") + except Exception: + pass + conn.rollback() + print(f"ROLLBACK: {e}", file=sys.stderr) + raise + finally: + conn.close() + + +if __name__ == "__main__": + raise SystemExit(main())