Files
integral-shop/docs/sql/run_com_bygsf212_cleanup.py
2026-06-15 09:33:00 +08:00

402 lines
14 KiB
Python

#!/usr/bin/env python3
"""Run data cleanup for docs/com-bygsf212-data-imgration.md.
Default mode is a read-only dry run. Use --execute to create a local SQL backup,
delete rows according to the migration document, and commit the transaction.
"""
from __future__ import annotations
import argparse
import gzip
import json
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Any
import pymysql
from pymysql.cursors import SSCursor
ROOT = Path(__file__).resolve().parents[2]
DOC = ROOT / "docs" / "com-bygsf212-data-imgration.md"
DEFAULT_DUMPS = [
Path("/Users/mac/Works26/miao-july/宝应鼎信汇/bsy-yangtangyoupin_2026-06-14_14-25-01_mysql_data.sql"),
Path("/Users/mac/Works26/miao-july/宝应鼎信汇/jyw-yangtangyoupin_2026-06-14_14-55-01_mysql_data.sql"),
]
CUTOFF = "2026-06-12 00:00:00"
EXPECTED_DATABASE = "bygsf212"
TABLES = [
"wa_order",
"wa_withdraw",
"eb_store_order",
"wa_merchandise",
"wa_selfbonus_log",
"wa_sharebonus_log",
"wa_coupon_log",
"eb_user_integral_record",
"eb_user",
"wa_users",
]
FILTERS = {
"wa_users": "id",
"eb_user": "uid",
"wa_selfbonus_log": "user_id",
"wa_sharebonus_log": "user_id",
"wa_coupon_log": "user_id",
"eb_user_integral_record": "uid",
}
CLEAR_TABLES = ["wa_order", "wa_withdraw", "eb_store_order"]
def parse_doc() -> tuple[dict[str, str], list[int]]:
text = DOC.read_text(encoding="utf-8")
def grab(name: str) -> str:
m = re.search(rf"^\s*{name}:\s*(.+?)\s*$", text, flags=re.M)
if not m:
raise ValueError(f"missing datasource {name} in {DOC}")
return m.group(1).strip()
ids_match = re.search(r"保留名单:\s*\n\s*`([^`]+)`", text)
if not ids_match:
raise ValueError(f"missing user id keep list in {DOC}")
ids = [int(x.strip()) for x in ids_match.group(1).split(",") if x.strip()]
if len(ids) != len(set(ids)):
raise ValueError("duplicate ids in keep list")
config = {
"host": grab("rds"),
"database": grab("name"),
"user": grab("username"),
"password": grab("password"),
}
return config, ids
def split_top_level_tuples(values_blob: str) -> list[str]:
out: list[str] = []
i = 0
n = len(values_blob)
while i < n:
if values_blob[i] != "(":
i += 1
continue
depth = 0
in_quote = False
start = i
j = i
while j < n:
c = values_blob[j]
if in_quote:
if c == "\\":
j += 2
continue
if c == "'":
if j + 1 < n and values_blob[j + 1] == "'":
j += 2
continue
in_quote = False
j += 1
continue
if c == "'":
in_quote = True
elif c == "(":
depth += 1
elif c == ")":
depth -= 1
if depth == 0:
out.append(values_blob[start : j + 1])
j += 1
break
j += 1
i = j
return out
def split_mysql_fields(inner: str) -> list[str]:
out: list[str] = []
cur: list[str] = []
i = 0
n = len(inner)
while i < n:
c = inner[i]
if c == "'":
cur.append(c)
i += 1
while i < n:
c = inner[i]
cur.append(c)
if c == "\\":
if i + 1 < n:
cur.append(inner[i + 1])
i += 2
continue
if c == "'":
if i + 1 < n and inner[i + 1] == "'":
cur.append(inner[i + 1])
i += 2
continue
i += 1
break
i += 1
continue
if c == ",":
out.append("".join(cur).strip())
cur = []
i += 1
continue
cur.append(c)
i += 1
out.append("".join(cur).strip())
return out
def unquote_sql_string(raw: str) -> str:
raw = raw.strip()
if raw.startswith("'") and raw.endswith("'"):
body = raw[1:-1]
body = body.replace("''", "'")
body = body.replace("\\'", "'").replace("\\\\", "\\")
return body
return raw
def extract_wa_merchandise_keep_ids(dumps: list[Path], keep_users: set[int]) -> list[int]:
keep_ids: set[int] = set()
for dump in dumps:
if not dump.is_file():
raise FileNotFoundError(f"dump not found: {dump}")
total_rows = 0
insert_lines = 0
file_keep_ids: set[int] = set()
with dump.open("r", encoding="utf-8", errors="replace") as f:
for line in f:
if "INSERT INTO `wa_merchandise`" not in line or "VALUES" not in line:
continue
insert_lines += 1
blob = line[line.index("VALUES") + len("VALUES") :].strip()
if blob.endswith(";"):
blob = blob[:-1].strip()
for tup in split_top_level_tuples(blob):
total_rows += 1
fields = split_mysql_fields(tup.strip()[1:-1])
if len(fields) < 9:
raise ValueError(f"malformed wa_merchandise tuple: {tup[:120]}")
row_id = int(fields[0])
user_id = int(fields[2])
created_at = unquote_sql_string(fields[8])
if created_at >= CUTOFF and user_id in keep_users:
file_keep_ids.add(row_id)
if insert_lines == 0:
raise ValueError(f"no INSERT INTO `wa_merchandise` found in dump: {dump}")
keep_ids.update(file_keep_ids)
print(f"dump={dump.name} wa_merchandise_rows={total_rows} keep_by_rule={len(file_keep_ids)}")
return sorted(keep_ids)
def connect(config: dict[str, str], cursorclass=None):
kwargs = {
"host": config["host"],
"user": config["user"],
"password": config["password"],
"database": config["database"],
"charset": "utf8mb4",
"autocommit": False,
"connect_timeout": 10,
"read_timeout": 120,
"write_timeout": 120,
}
if cursorclass is not None:
kwargs["cursorclass"] = cursorclass
return pymysql.connect(**kwargs)
def placeholders(items: list[int] | set[int]) -> str:
if not items:
return "NULL"
return ",".join(["%s"] * len(items))
def count_where(cur, table: str, where: str = "1=1", params: tuple[Any, ...] = ()) -> int:
cur.execute(f"SELECT COUNT(*) FROM `{table}` WHERE {where}", params)
return int(cur.fetchone()[0])
def inspect_schema(cur) -> None:
cur.execute("SELECT DATABASE()")
database = cur.fetchone()[0]
if database != EXPECTED_DATABASE:
raise RuntimeError(f"refusing to run against database {database!r}")
for table in TABLES:
cur.execute("SHOW TABLES LIKE %s", (table,))
if not cur.fetchone():
raise RuntimeError(f"missing table `{table}`")
required = {
"wa_users": {"id"},
"eb_user": {"uid"},
"wa_order": set(),
"wa_withdraw": set(),
"eb_store_order": set(),
"wa_merchandise": {"id", "user_id", "created_at"},
"wa_selfbonus_log": {"user_id"},
"wa_sharebonus_log": {"user_id"},
"wa_coupon_log": {"user_id"},
"eb_user_integral_record": {"uid"},
}
for table, cols in required.items():
if not cols:
continue
cur.execute(f"SHOW COLUMNS FROM `{table}`")
actual = {row[0] for row in cur.fetchall()}
missing = cols - actual
if missing:
raise RuntimeError(f"table `{table}` missing columns: {sorted(missing)}")
def collect_counts(cur, keep_users: list[int], keep_merchandise_ids: list[int]) -> dict[str, dict[str, int]]:
counts: dict[str, dict[str, int]] = {}
user_clause = placeholders(keep_users)
merch_clause = placeholders(keep_merchandise_ids)
for table in CLEAR_TABLES:
total = count_where(cur, table)
counts[table] = {"before": total, "keep": 0, "delete": total}
for table, col in FILTERS.items():
total = count_where(cur, table)
keep = count_where(cur, table, f"`{col}` IN ({user_clause})", tuple(keep_users))
counts[table] = {"before": total, "keep": keep, "delete": total - keep}
total = count_where(cur, "wa_merchandise")
keep = (
count_where(cur, "wa_merchandise", f"`id` IN ({merch_clause})", tuple(keep_merchandise_ids))
if keep_merchandise_ids
else 0
)
counts["wa_merchandise"] = {"before": total, "keep": keep, "delete": total - keep}
return counts
def print_counts(title: str, counts: dict[str, dict[str, int]]) -> None:
print(title)
for table in TABLES:
c = counts[table]
print(f" {table}: before={c['before']} keep={c['keep']} delete={c['delete']}")
def backup_tables(config: dict[str, str], backup_path: Path) -> None:
backup_path.parent.mkdir(parents=True, exist_ok=True)
backup_conn = connect(config, cursorclass=SSCursor)
try:
with gzip.open(backup_path, "wt", encoding="utf-8") as out:
out.write("-- bygsf212 cleanup backup\n")
out.write(f"-- created_at: {datetime.now().isoformat(timespec='seconds')}\n")
out.write("SET NAMES utf8mb4;\n")
for table in TABLES:
with backup_conn.cursor() as cur:
cur.execute(f"SHOW CREATE TABLE `{table}`")
row = cur.fetchone()
create_sql = row[1]
out.write(f"\n-- Table `{table}`\n")
out.write(f"DROP TABLE IF EXISTS `{table}`;\n")
out.write(create_sql + ";\n")
with backup_conn.cursor() as cur:
cur.execute(f"SELECT * FROM `{table}`")
batch: list[str] = []
row_count = 0
for row in cur:
batch.append("(" + ",".join(backup_conn.literal(v) for v in row) + ")")
row_count += 1
if len(batch) >= 200:
out.write(f"INSERT INTO `{table}` VALUES\n")
out.write(",\n".join(batch) + ";\n")
batch = []
if batch:
out.write(f"INSERT INTO `{table}` VALUES\n")
out.write(",\n".join(batch) + ";\n")
print(f"backup {table}: rows={row_count}")
finally:
backup_conn.close()
def execute_cleanup(cur, keep_users: list[int], keep_merchandise_ids: list[int]) -> dict[str, int]:
user_clause = placeholders(keep_users)
deleted: dict[str, int] = {}
cur.execute("SET FOREIGN_KEY_CHECKS = 0")
for table in CLEAR_TABLES:
cur.execute(f"DELETE FROM `{table}`")
deleted[table] = cur.rowcount
if keep_merchandise_ids:
merch_clause = placeholders(keep_merchandise_ids)
cur.execute(f"DELETE FROM `wa_merchandise` WHERE `id` NOT IN ({merch_clause})", tuple(keep_merchandise_ids))
else:
cur.execute("DELETE FROM `wa_merchandise`")
deleted["wa_merchandise"] = cur.rowcount
for table in ["wa_selfbonus_log", "wa_sharebonus_log", "wa_coupon_log"]:
cur.execute(f"DELETE FROM `{table}` WHERE `user_id` NOT IN ({user_clause})", tuple(keep_users))
deleted[table] = cur.rowcount
cur.execute(f"DELETE FROM `eb_user_integral_record` WHERE `uid` NOT IN ({user_clause})", tuple(keep_users))
deleted["eb_user_integral_record"] = cur.rowcount
cur.execute(f"DELETE FROM `eb_user` WHERE `uid` NOT IN ({user_clause})", tuple(keep_users))
deleted["eb_user"] = cur.rowcount
cur.execute(f"DELETE FROM `wa_users` WHERE `id` NOT IN ({user_clause})", tuple(keep_users))
deleted["wa_users"] = cur.rowcount
cur.execute("SET FOREIGN_KEY_CHECKS = 1")
return deleted
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--execute", action="store_true", help="perform DELETEs and COMMIT")
parser.add_argument("--dump", action="append", type=Path, dest="dumps")
parser.add_argument("--backup-dir", type=Path, default=ROOT / "docs" / "sql" / "backups")
args = parser.parse_args()
dumps = args.dumps if args.dumps else DEFAULT_DUMPS
config, keep_users = parse_doc()
keep_merchandise_ids = extract_wa_merchandise_keep_ids(dumps, set(keep_users))
print(f"keep_user_ids={len(keep_users)} keep_wa_merchandise_ids={len(keep_merchandise_ids)}")
conn = connect(config)
try:
with conn.cursor() as cur:
inspect_schema(cur)
before = collect_counts(cur, keep_users, keep_merchandise_ids)
print_counts("before_counts", before)
if not args.execute:
print("dry_run_only=true")
conn.rollback()
return 0
stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = args.backup_dir / f"bygsf212_cleanup_before_{stamp}.sql.gz"
print(f"backup_path={backup_path}")
backup_tables(config, backup_path)
with conn.cursor() as cur:
deleted = execute_cleanup(cur, keep_users, keep_merchandise_ids)
after = collect_counts(cur, keep_users, keep_merchandise_ids)
print("deleted_rows")
print(json.dumps(deleted, ensure_ascii=False, indent=2))
print_counts("after_counts_before_commit", after)
conn.commit()
print("COMMIT ok")
with conn.cursor() as cur:
final_counts = collect_counts(cur, keep_users, keep_merchandise_ids)
print_counts("final_counts", final_counts)
return 0
except Exception as e:
try:
with conn.cursor() as cur:
cur.execute("SET FOREIGN_KEY_CHECKS = 1")
except Exception:
pass
conn.rollback()
print(f"ROLLBACK: {e}", file=sys.stderr)
raise
finally:
conn.close()
if __name__ == "__main__":
raise SystemExit(main())