#!/usr/bin/env python3
"""Run data cleanup for docs/com-sqszx202-data-imgration.md.

Default mode is a read-only dry run. Use --execute to create a local SQL backup,
delete rows according to the migration document, and commit the transaction.
"""

from __future__ import annotations

import argparse
import gzip
import json
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, TextIO

import pymysql
from pymysql.cursors import SSCursor

ROOT = Path(__file__).resolve().parents[2]
DOC = ROOT / "docs" / "com-sqszx202-data-imgration.md"
DEFAULT_DUMP = Path(
    "/Users/mac/Works26/miao-july/宿迁盛泽鑫/"
    "anpengran-yangtangyoupin_2026-06-14_02-15-02_mysql_data.sql"
)

# wa_merchandise rows whose created_at is at or after this value (and whose
# user is kept) survive; compared as strings, so it must stay in the
# 'YYYY-MM-DD HH:MM:SS' form the dump uses.
CUTOFF = "2026-06-12 00:00:00"

# The only database this script agrees to run against (see inspect_schema).
EXPECTED_DATABASE = "sqszx202"

# Every table the script inspects, backs up and reports on, in report order.
TABLES = [
    "wa_order",
    "wa_withdraw",
    "eb_store_order",
    "wa_merchandise",
    "wa_selfbonus_log",
    "wa_sharebonus_log",
    "wa_coupon_log",
    "eb_user_integral_record",
    "eb_user",
    "wa_users",
]

# Tables filtered by user: table -> column matched against the doc's keep list.
FILTERS = {
    "wa_users": "id",
    "eb_user": "uid",
    "wa_selfbonus_log": "user_id",
    "wa_sharebonus_log": "user_id",
    "wa_coupon_log": "user_id",
    "eb_user_integral_record": "uid",
}

# Tables that are emptied completely.
CLEAR_TABLES = ["wa_order", "wa_withdraw", "eb_store_order"]

# Positional fallback for wa_merchandise columns when the dump's INSERT has no
# explicit column list (mysqldump's default layout).
# NOTE(review): positions assume the dumped table layout — confirm if the
# schema changes.
_MERCH_POSITIONS = {"id": 0, "user_id": 2, "created_at": 8}
_MERCH_MIN_FIELDS = 10
_MERCH_INSERT_PREFIX = "INSERT INTO `wa_merchandise`"

# A created_at value must at least start with a date to be compared to CUTOFF.
_DATETIME_RE = re.compile(r"^\d{4}-\d{2}-\d{2}")

# One token of a MySQL single-quoted literal body: a backslash escape or ''.
_SQL_ESCAPE_RE = re.compile(r"\\(.)|''", re.S)
# MySQL escape sequences; \% and \_ keep their backslash outside LIKE patterns.
_SQL_ESCAPES = {
    "0": "\0",
    "b": "\b",
    "n": "\n",
    "r": "\r",
    "t": "\t",
    "Z": "\x1a",
    "%": "\\%",
    "_": "\\_",
}


def parse_doc() -> tuple[dict[str, str], list[int]]:
    """Read connection settings and the user keep list from the migration doc.

    Returns:
        ``(config, ids)``: ``config`` has ``host``/``database``/``user``/
        ``password`` taken from the doc's ``rds``/``name``/``username``/
        ``password`` lines; ``ids`` is the keep list in document order.

    Raises:
        ValueError: a datasource line or the keep list is missing, the keep
            list is empty, contains duplicates, or has non-integer entries.
    """
    text = DOC.read_text(encoding="utf-8")

    def grab(name: str) -> str:
        m = re.search(rf"^\s*{re.escape(name)}:\s*(.+?)\s*$", text, flags=re.M)
        if not m:
            raise ValueError(f"missing datasource {name} in {DOC}")
        return m.group(1).strip()

    # The keep list is a comma-separated, backtick-quoted line that follows
    # the "保留名单:" ("keep list") label.
    ids_match = re.search(r"保留名单:\s*\n\s*`([^`]+)`", text)
    if not ids_match:
        raise ValueError(f"missing user id keep list in {DOC}")
    ids = [int(x.strip()) for x in ids_match.group(1).split(",") if x.strip()]
    if not ids:
        # An empty keep list would mean "delete every user"; refuse instead.
        raise ValueError(f"empty user id keep list in {DOC}")
    if len(ids) != len(set(ids)):
        raise ValueError("duplicate ids in keep list")
    config = {
        "host": grab("rds"),
        "database": grab("name"),
        "user": grab("username"),
        "password": grab("password"),
    }
    return config, ids


def split_top_level_tuples(values_blob: str) -> list[str]:
    """Split the text after ``VALUES`` into its top-level ``(...)`` tuples.

    Single-quoted literals are tracked (backslash escapes and doubled ``''``)
    so parentheses inside strings are ignored; text between tuples is skipped.

    Raises:
        ValueError: a tuple is still open at the end of ``values_blob`` (e.g. a
            truncated dump line). Dropping it silently would drop the row's id
            from the keep set and get the row deleted.
    """
    out: list[str] = []
    i = 0
    n = len(values_blob)
    while i < n:
        if values_blob[i] != "(":
            i += 1
            continue
        depth = 0
        in_quote = False
        closed = False
        start = i
        j = i
        while j < n:
            c = values_blob[j]
            if in_quote:
                if c == "\\":
                    j += 2  # skip the escaped character
                    continue
                if c == "'":
                    if j + 1 < n and values_blob[j + 1] == "'":
                        j += 2  # doubled quote stays inside the literal
                        continue
                    in_quote = False
                j += 1
                continue
            if c == "'":
                in_quote = True
            elif c == "(":
                depth += 1
            elif c == ")":
                depth -= 1
                if depth == 0:
                    out.append(values_blob[start : j + 1])
                    j += 1
                    closed = True
                    break
            j += 1
        if not closed:
            raise ValueError(
                f"unterminated tuple in VALUES: {values_blob[start : start + 120]}"
            )
        i = j
    return out


def split_mysql_fields(inner: str) -> list[str]:
    """Split the inside of one VALUES tuple on top-level commas.

    Quoted literals are copied verbatim (quotes and escapes included) and each
    field is whitespace-stripped; decoding is left to ``unquote_sql_string``.
    """
    out: list[str] = []
    cur: list[str] = []
    i = 0
    n = len(inner)
    while i < n:
        c = inner[i]
        if c == "'":
            cur.append(c)
            i += 1
            while i < n:
                c = inner[i]
                cur.append(c)
                if c == "\\":
                    if i + 1 < n:
                        cur.append(inner[i + 1])
                    i += 2
                    continue
                if c == "'":
                    if i + 1 < n and inner[i + 1] == "'":
                        cur.append(inner[i + 1])
                        i += 2
                        continue
                    i += 1
                    break
                i += 1
            continue
        if c == ",":
            out.append("".join(cur).strip())
            cur = []
            i += 1
            continue
        cur.append(c)
        i += 1
    out.append("".join(cur).strip())
    return out


def unquote_sql_string(raw: str) -> str:
    """Decode a single-quoted MySQL string literal; return other tokens as-is.

    Decoding is a single left-to-right pass over ``''`` and backslash escapes,
    including MySQL's control escapes (``\\n``, ``\\t``, ``\\0`` ...).
    Unquoted tokens such as ``NULL`` or numbers are returned stripped.
    """
    raw = raw.strip()
    if len(raw) < 2 or not (raw.startswith("'") and raw.endswith("'")):
        return raw

    def _decode(m: re.Match[str]) -> str:
        escaped = m.group(1)
        if escaped is None:  # matched a doubled quote
            return "'"
        return _SQL_ESCAPES.get(escaped, escaped)

    return _SQL_ESCAPE_RE.sub(_decode, raw[1:-1])


def _merch_layout(column_list: str) -> tuple[int, int, int, int]:
    """Return ``(id, user_id, created_at, min_fields)`` positions for one INSERT.

    ``column_list`` is the text between the table name and ``VALUES``. When it
    names columns, positions are looked up by name; otherwise the positional
    fallback ``_MERCH_POSITIONS`` is used.
    """
    names = re.findall(r"`([^`]+)`", column_list)
    if not names:
        p = _MERCH_POSITIONS
        return p["id"], p["user_id"], p["created_at"], _MERCH_MIN_FIELDS
    try:
        return (
            names.index("id"),
            names.index("user_id"),
            names.index("created_at"),
            len(names),
        )
    except ValueError:
        raise ValueError(
            f"wa_merchandise INSERT column list lacks id/user_id/created_at: {names}"
        ) from None


def extract_wa_merchandise_keep_ids(dump: Path, keep_users: set[int]) -> list[int]:
    """Return the sorted ids of wa_merchandise rows in ``dump`` to keep.

    A row is kept when ``created_at >= CUTOFF`` and ``user_id`` is in
    ``keep_users``. Rows with a NULL ``user_id`` or ``created_at`` are never
    kept, mirroring SQL, where a comparison with NULL is not true.

    Raises:
        FileNotFoundError: ``dump`` does not exist.
        ValueError: no wa_merchandise INSERT line is found, a tuple is
            malformed, or a ``created_at`` value does not look like a date.
    """
    if not dump.is_file():
        raise FileNotFoundError(f"dump not found: {dump}")
    keep_ids: set[int] = set()
    total_rows = 0
    insert_lines = 0
    with dump.open("r", encoding="utf-8", errors="replace") as f:
        for line in f:
            stmt = line.lstrip()
            # Anchor at the start of the statement so data of other tables
            # that merely contains this text is never parsed as merchandise.
            if not stmt.startswith(_MERCH_INSERT_PREFIX) or "VALUES" not in stmt:
                continue
            insert_lines += 1
            head, _, blob = stmt[len(_MERCH_INSERT_PREFIX) :].partition("VALUES")
            blob = blob.strip()
            if blob.endswith(";"):
                blob = blob[:-1].strip()
            id_idx, user_idx, created_idx, min_fields = _merch_layout(head)
            for tup in split_top_level_tuples(blob):
                total_rows += 1
                fields = split_mysql_fields(tup.strip()[1:-1])
                if len(fields) < min_fields:
                    raise ValueError(f"malformed wa_merchandise tuple: {tup[:120]}")
                row_id = int(unquote_sql_string(fields[id_idx]))
                raw_user = fields[user_idx]
                raw_created = fields[created_idx]
                if raw_user.upper() == "NULL" or raw_created.upper() == "NULL":
                    # Unquoted "NULL" would otherwise compare >= CUTOFF as a string.
                    continue
                user_id = int(unquote_sql_string(raw_user))
                created_at = unquote_sql_string(raw_created)
                if not _DATETIME_RE.match(created_at):
                    raise ValueError(
                        f"unexpected wa_merchandise created_at {created_at!r} "
                        f"in row id={row_id}"
                    )
                if created_at >= CUTOFF and user_id in keep_users:
                    keep_ids.add(row_id)
    if insert_lines == 0:
        raise ValueError("no INSERT INTO `wa_merchandise` found in dump")
    print(f"dump_wa_merchandise_rows={total_rows} keep_by_rule={len(keep_ids)}")
    return sorted(keep_ids)


def connect(config: dict[str, str], cursorclass=None):
    """Open a non-autocommit utf8mb4 PyMySQL connection from a ``parse_doc`` config."""
    kwargs = {
        "host": config["host"],
        "user": config["user"],
        "password": config["password"],
        "database": config["database"],
        "charset": "utf8mb4",
        "autocommit": False,
        "read_timeout": 120,
        "write_timeout": 120,
    }
    if cursorclass is not None:
        kwargs["cursorclass"] = cursorclass
    return pymysql.connect(**kwargs)


def placeholders(items: list[int] | set[int]) -> str:
    """Return ``%s,%s,...`` for an IN list, or ``NULL`` for an empty one.

    ``x IN (NULL)`` matches no row, which is the right answer for keep counts.
    Do not use the ``NULL`` form with ``NOT IN``: ``x NOT IN (NULL)`` is never
    true either, so it would match nothing instead of everything.
    """
    if not items:
        return "NULL"
    return ",".join(["%s"] * len(items))


def count_where(cur, table: str, where: str = "1=1", params: tuple[Any, ...] = ()) -> int:
    """Return ``COUNT(*)`` of ``table`` filtered by the trusted SQL ``where``."""
    cur.execute(f"SELECT COUNT(*) FROM `{table}` WHERE {where}", params)
    return int(cur.fetchone()[0])


def inspect_schema(cur) -> None:
    """Refuse to run unless connected to the expected database and schema.

    Raises:
        RuntimeError: wrong database, a table in ``TABLES`` is missing, or a
            column the cleanup filters on is missing.
    """
    cur.execute("SELECT DATABASE()")
    database = cur.fetchone()[0]
    if database != EXPECTED_DATABASE:
        raise RuntimeError(f"refusing to run against database {database!r}")
    for table in TABLES:
        # LIKE treats "_" and "%" as wildcards; escape them for an exact match.
        pattern = table.replace("\\", "\\\\").replace("_", "\\_").replace("%", "\\%")
        cur.execute("SHOW TABLES LIKE %s", (pattern,))
        if not cur.fetchone():
            raise RuntimeError(f"missing table `{table}`")
    required: dict[str, set[str]] = {table: {col} for table, col in FILTERS.items()}
    required["wa_merchandise"] = {"id", "user_id", "created_at"}
    for table, cols in required.items():
        cur.execute(f"SHOW COLUMNS FROM `{table}`")
        actual = {row[0] for row in cur.fetchall()}
        missing = cols - actual
        if missing:
            raise RuntimeError(f"table `{table}` missing columns: {sorted(missing)}")


def collect_counts(
    cur, keep_users: list[int], keep_merchandise_ids: list[int]
) -> dict[str, dict[str, int]]:
    """Count rows per table as ``{"before": total, "keep": kept, "delete": total - kept}``.

    ``before`` is the table's current row count at the time of the call, so
    after the deletes it is the remaining total. CLEAR_TABLES keep nothing;
    FILTERS tables keep rows whose user column is in ``keep_users``;
    wa_merchandise keeps rows whose id is in ``keep_merchandise_ids``.
    """
    counts: dict[str, dict[str, int]] = {}
    user_clause = placeholders(keep_users)
    merch_clause = placeholders(keep_merchandise_ids)
    for table in CLEAR_TABLES:
        total = count_where(cur, table)
        counts[table] = {"before": total, "keep": 0, "delete": total}
    for table, col in FILTERS.items():
        total = count_where(cur, table)
        keep = count_where(cur, table, f"`{col}` IN ({user_clause})", tuple(keep_users))
        counts[table] = {"before": total, "keep": keep, "delete": total - keep}
    total = count_where(cur, "wa_merchandise")
    keep = (
        count_where(cur, "wa_merchandise", f"`id` IN ({merch_clause})", tuple(keep_merchandise_ids))
        if keep_merchandise_ids
        else 0
    )
    counts["wa_merchandise"] = {"before": total, "keep": keep, "delete": total - keep}
    return counts


def print_counts(title: str, counts: dict[str, dict[str, int]]) -> None:
    """Print ``title`` and one ``before/keep/delete`` line per table in ``TABLES``."""
    print(title)
    for table in TABLES:
        c = counts[table]
        print(f"  {table}: before={c['before']} keep={c['keep']} delete={c['delete']}")


def _write_insert(out: TextIO, table: str, rows: list[str]) -> None:
    """Write one multi-row INSERT statement for already-literalized ``rows``."""
    out.write(f"INSERT INTO `{table}` VALUES\n")
    out.write(",\n".join(rows) + ";\n")


def _backup_one_table(conn, out: TextIO, table: str, batch_size: int = 200) -> None:
    """Write DROP/CREATE plus all rows of ``table`` (in ``batch_size`` INSERTs)."""
    with conn.cursor() as cur:
        cur.execute(f"SHOW CREATE TABLE `{table}`")
        create_sql = cur.fetchone()[1]
    out.write(f"\n-- Table `{table}`\n")
    out.write(f"DROP TABLE IF EXISTS `{table}`;\n")
    out.write(create_sql + ";\n")
    with conn.cursor() as cur:
        cur.execute(f"SELECT * FROM `{table}`")
        batch: list[str] = []
        row_count = 0
        for row in cur:  # streamed row by row (SSCursor)
            batch.append("(" + ",".join(conn.literal(v) for v in row) + ")")
            row_count += 1
            if len(batch) >= batch_size:
                _write_insert(out, table, batch)
                batch = []
        if batch:
            _write_insert(out, table, batch)
    print(f"backup {table}: rows={row_count}")


def backup_tables(config: dict[str, str], backup_path: Path) -> None:
    """Write a gzip'd SQL backup (DDL + data) of every table in ``TABLES``.

    All tables are read inside one ``START TRANSACTION WITH CONSISTENT
    SNAPSHOT`` on a dedicated streaming connection, so (for InnoDB tables) the
    backup is one point in time. The restore script disables foreign key checks
    so DROP/CREATE order between related tables does not matter. Output goes to
    ``<backup_path>.partial`` and is renamed to ``backup_path`` only after every
    table was written, so a failed run never leaves a truncated file under the
    final name.
    """
    backup_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = backup_path.with_name(backup_path.name + ".partial")
    backup_conn = connect(config, cursorclass=SSCursor)
    try:
        with backup_conn.cursor() as cur:
            cur.execute("START TRANSACTION WITH CONSISTENT SNAPSHOT")
        with gzip.open(tmp_path, "wt", encoding="utf-8") as out:
            out.write("-- sqszx202 cleanup backup\n")
            out.write(f"-- created_at: {datetime.now().isoformat(timespec='seconds')}\n")
            out.write("SET NAMES utf8mb4;\n")
            out.write("SET FOREIGN_KEY_CHECKS = 0;\n")
            for table in TABLES:
                _backup_one_table(backup_conn, out, table)
            out.write("\nSET FOREIGN_KEY_CHECKS = 1;\n")
        tmp_path.replace(backup_path)
    finally:
        backup_conn.close()
def execute_cleanup(cur, keep_users: list[int], keep_merchandise_ids: list[int]) -> dict[str, int]:
    """Delete every row the cleanup plan does not keep; return rows deleted per table.

    Runs inside the caller's transaction; nothing is committed here.
    ``FOREIGN_KEY_CHECKS`` is turned off for the deletes and restored in a
    ``finally`` block, so the session setting is reset even if a DELETE fails.

    Filtered deletes remove rows whose key is NULL or not in the keep list,
    which is exactly the complement of the ``IN (...)`` keep count reported by
    ``collect_counts``. An empty keep list deletes every row of that table.
    """
    deleted: dict[str, int] = {}

    def delete_unkept(table: str, column: str, keep: list[int]) -> int:
        if keep:
            cur.execute(
                f"DELETE FROM `{table}` "
                f"WHERE `{column}` IS NULL OR `{column}` NOT IN ({placeholders(keep)})",
                tuple(keep),
            )
        else:
            # "NOT IN (NULL)" is never true in SQL, so an empty keep list must
            # not go through the IN-list form: it would silently delete nothing.
            cur.execute(f"DELETE FROM `{table}`")
        return cur.rowcount

    cur.execute("SET FOREIGN_KEY_CHECKS = 0")
    try:
        for table in CLEAR_TABLES:
            cur.execute(f"DELETE FROM `{table}`")
            deleted[table] = cur.rowcount
        deleted["wa_merchandise"] = delete_unkept("wa_merchandise", "id", keep_merchandise_ids)
        for table in ["wa_selfbonus_log", "wa_sharebonus_log", "wa_coupon_log"]:
            deleted[table] = delete_unkept(table, "user_id", keep_users)
        deleted["eb_user_integral_record"] = delete_unkept("eb_user_integral_record", "uid", keep_users)
        deleted["eb_user"] = delete_unkept("eb_user", "uid", keep_users)
        deleted["wa_users"] = delete_unkept("wa_users", "id", keep_users)
    finally:
        cur.execute("SET FOREIGN_KEY_CHECKS = 1")
    return deleted


def _verify_cleanup(
    before: dict[str, dict[str, int]],
    after: dict[str, dict[str, int]],
    deleted: dict[str, int],
) -> None:
    """Raise before COMMIT if the in-transaction counts do not match the plan.

    Every table must now hold only kept rows (``after[t]["delete"] == 0``), and
    no kept row may have disappeared. Differences between rows deleted and rows
    planned for deletion are only warned about: a DELETE may also remove rows
    written after the plan was counted. Concurrent writes can make this check
    fail; the caller then rolls back and the script can simply be rerun.
    """
    problems: list[str] = []
    for table in TABLES:
        if after[table]["delete"] != 0:
            problems.append(f"{table}: {after[table]['delete']} unkept rows remain")
        if after[table]["keep"] < before[table]["keep"]:
            problems.append(
                f"{table}: kept rows dropped from {before[table]['keep']} to {after[table]['keep']}"
            )
        if deleted.get(table) != before[table]["delete"]:
            print(
                f"warning: {table} deleted={deleted.get(table)} planned={before[table]['delete']}",
                file=sys.stderr,
            )
    if problems:
        raise RuntimeError("post-delete verification failed: " + "; ".join(problems))


def main() -> int:
    """CLI entry point: dry run by default; ``--execute`` backs up, deletes and commits.

    Returns 0 on success. Any failure before COMMIT rolls the transaction back
    and re-raises the original error.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--execute", action="store_true", help="perform DELETEs and COMMIT")
    parser.add_argument("--dump", type=Path, default=DEFAULT_DUMP)
    parser.add_argument("--backup-dir", type=Path, default=ROOT / "docs" / "sql" / "backups")
    args = parser.parse_args()

    config, keep_users = parse_doc()
    keep_merchandise_ids = extract_wa_merchandise_keep_ids(args.dump, set(keep_users))
    print(f"keep_user_ids={len(keep_users)} keep_wa_merchandise_ids={len(keep_merchandise_ids)}")

    conn = connect(config)
    committed = False
    try:
        with conn.cursor() as cur:
            inspect_schema(cur)
            before = collect_counts(cur, keep_users, keep_merchandise_ids)
        print_counts("before_counts", before)
        if not args.execute:
            print("dry_run_only=true")
            conn.rollback()
            return 0

        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = args.backup_dir / f"sqszx202_cleanup_before_{stamp}.sql.gz"
        print(f"backup_path={backup_path}")
        backup_tables(config, backup_path)

        with conn.cursor() as cur:
            deleted = execute_cleanup(cur, keep_users, keep_merchandise_ids)
            after = collect_counts(cur, keep_users, keep_merchandise_ids)
        print("deleted_rows")
        print(json.dumps(deleted, ensure_ascii=False, indent=2))
        print_counts("after_counts_before_commit", after)
        # Last chance to abort: raising here rolls everything back.
        _verify_cleanup(before, after, deleted)

        conn.commit()
        committed = True
        print("COMMIT ok")
        with conn.cursor() as cur:
            final_counts = collect_counts(cur, keep_users, keep_merchandise_ids)
        print_counts("final_counts", final_counts)
        return 0
    except Exception as e:
        if committed:
            # The deletes are already durable; reporting a rollback would lie.
            print(f"ERROR after COMMIT: {e}", file=sys.stderr)
            raise
        try:
            with conn.cursor() as cur:
                cur.execute("SET FOREIGN_KEY_CHECKS = 1")
        except Exception:
            pass  # best effort: the session is closed below anyway
        try:
            conn.rollback()
        except Exception as rollback_error:
            # Do not let a broken connection mask the original error.
            print(f"rollback failed: {rollback_error}", file=sys.stderr)
        print(f"ROLLBACK: {e}", file=sys.stderr)
        raise
    finally:
        if conn.open:
            conn.close()


if __name__ == "__main__":
    raise SystemExit(main())