#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 从 mom-db-260206.sql 解析 CREATE TABLE 定义,更新 mom系统数据库设计-数据模型.md。 用实际 DDL 中的表、字段、类型、允许空、注释 覆盖文档中的 2.2 表清单 和 4-13 节表结构。 """ import re from pathlib import Path from collections import OrderedDict BASE = Path(__file__).resolve().parent SQL_PATH = BASE / "mom-db-260206.sql" MD_PATH = BASE / "mom系统数据库设计-数据模型.md" # 表前缀 -> (节号, 模块标题),仅 4-13 与文档一致,erp_/mo_ 等归入“其他”不单独成节 PREFIX_TO_MODULE = [ ("md_", 4, "基础数据模块"), ("pro_", 5, "生产管理模块"), ("qc_", 6, "质量管理模块"), ("dv_", 7, "设备管理模块"), ("wm_", 8, "仓库管理模块"), ("cal_", 9, "排班管理模块"), ("iot_", 10, "IoT数据采集模块"), ("tm_", 11, "工具管理模块"), ("sys_", 12, "系统管理模块"), ("print_", 13, "打印管理模块"), ("erp_", None, None), ("mo_", None, None), ("gen_", None, None), ("report_", None, None), ] def table_module(table_name): """返回 (节号, 模块标题) 或 (None, None)""" t = table_name.lower() for prefix, num, title in PREFIX_TO_MODULE: if num is not None and t.startswith(prefix): return (num, title) return (None, None) def parse_sql(path): """解析 SQL 文件,返回 [(table_name, table_comment, columns), ...].""" text = path.read_text(encoding="utf-8") tables = [] i = 0 while True: i = text.find("CREATE TABLE ", i) if i == -1: break # 表名: CREATE TABLE `name` ( name_m = re.match(r"CREATE\s+TABLE\s+`([^`]+)`\s*\(", text[i:], re.IGNORECASE) if not name_m: i += 1 continue name = name_m.group(1).strip() paren_start = i + name_m.end() - 1 # position of '(' # 找匹配的 ')' depth = 1 pos = paren_start + 1 while pos < len(text) and depth > 0: if text[pos] == "(": depth += 1 elif text[pos] == ")": depth -= 1 pos += 1 body = text[paren_start + 1 : pos - 1] # 表注释: ) ENGINE=... COMMENT='...' rest = text[pos - 1 : pos + 200] comment_m = re.search(r"COMMENT\s*=\s*'([^']*)'", rest) table_comment = comment_m.group(1).strip() if comment_m else "" columns = parse_columns(body) tables.append((name, table_comment, columns)) i = pos return tables def parse_columns(body): """从 CREATE TABLE 体解析列: `name` type NOT NULL ... COMMENT '...', 跳过 PRIMARY KEY/KEY/UNIQUE.""" lines = [ln.strip() for ln in body.split("\n") if ln.strip()] cols = [] for line in lines: if line.startswith("PRIMARY KEY") or line.startswith("KEY ") or line.startswith("UNIQUE KEY"): continue if not line.startswith("`"): continue # 列行: `col_name` type [NOT NULL] [NULL] [AUTO_INCREMENT] [DEFAULT ...] [COMMENT '...'], match = re.match(r"`([^`]+)`\s+([a-z]+(?:\([^)]+\))?)\s*(NOT NULL|NULL)?", line, re.IGNORECASE) if not match: continue col_name = match.group(1) col_type = match.group(2) not_null = match.group(3) nullable = "N" if (not_null and not_null.upper() == "NOT NULL") else "Y" # COMMENT: 找 COMMENT '...',注意可能 '' 转义 comment_match = re.search(r"COMMENT\s+'((?:[^']|'')*)'", line) comment = comment_match.group(1).replace("''", "'").strip() if comment_match else "-" cols.append({"name": col_name, "type": col_type, "nullable": nullable, "comment": comment or "-"}) return cols def build_table_list_and_sections(tables): """按模块分组,生成 2.2 表清单和 4-13 节内容。仅包含 4-13 模块表;其他表(erp_/mo_/gen_ 等)仍入表清单模块「其他」.""" by_module = OrderedDict() table_list = [] for name, comment, columns in tables: num, title = table_module(name) if num is not None: by_module.setdefault((num, title), []).append((name, comment, columns)) table_list.append((name, comment, num, title)) else: if name.startswith(("gen_", "report_", "ureport_", "test_", "db_test")): continue table_list.append((name, comment, 99, "其他")) table_list.sort(key=lambda x: (x[2], x[0])) return table_list, by_module def gen_table_list_md(table_list): """生成 2.2 表清单总览 markdown.""" module_short = { 4: "基础数据", 5: "生产管理", 6: "质量管理", 7: "设备管理", 8: "仓库管理", 9: "排班管理", 10: "IoT数据采集", 11: "工具管理", 12: "系统管理", 13: "打印管理", 99: "其他", } lines = ["### 2.2 表清单总览", "", "| 序号 | 表名 | 中文名 | 模块 |", "|------|------|--------|------|"] for idx, (name, comment, num, title) in enumerate(table_list, 1): short = module_short.get(num, title or "其他") cn = comment or name lines.append(f"| {idx} | {name} | {cn} | {short} |") lines.append("") return "\n".join(lines) def gen_module_section(module_num, title, tables): """生成一个模块的 markdown.""" lines = [f"## {module_num}. {title}", ""] for idx, (name, comment, columns) in enumerate(tables, 1): lines.append(f"### {module_num}.{idx} {comment or name} ({name})") lines.append("") lines.append("| 字段名 | 类型 | 允许空 | 说明 |") lines.append("|--------|------|--------|------|") for col in columns: lines.append(f"| {col['name']} | {col['type']} | {col['nullable']} | {col['comment']} |") lines.append("") lines.append("---") lines.append("") return "\n".join(lines) def main(): if not SQL_PATH.exists(): raise SystemExit(f"SQL 文件不存在: {SQL_PATH}") if not MD_PATH.exists(): raise SystemExit(f"Markdown 文件不存在: {MD_PATH}") tables = parse_sql(SQL_PATH) table_list, by_module = build_table_list_and_sections(tables) table_list_md = gen_table_list_md(table_list) section_parts = [] for (num, title), tbls in sorted(by_module.items(), key=lambda x: x[0][0]): section_parts.append(gen_module_section(num, title, tbls)) new_content_4_13 = "\n".join(section_parts) md_text = MD_PATH.read_text(encoding="utf-8") # 替换 2.2 表清单总览 start_22 = md_text.find("### 2.2 表清单总览") if start_22 != -1: end_22 = md_text.find("\n## ", start_22 + 1) if end_22 == -1: end_22 = len(md_text) md_text = md_text[:start_22] + table_list_md.rstrip() + "\n\n" + md_text[end_22:] # 替换 ## 4. 基础数据模块 到 ## 14. 枚举值定义 之间的内容 # 若新增了 14 ERP、15 模具,则替换到 “## 15. 枚举值定义” 或原 “## 14. 枚举值定义” 前 start_marker = "## 4. 基础数据模块" end_marker = "\n## 14. 枚举值定义" start = md_text.find(start_marker) end = md_text.find(end_marker) if start == -1: raise SystemExit("未找到「## 4. 基础数据模块」") if end == -1: end = len(md_text) md_text = md_text[:start] + new_content_4_13.rstrip() + "\n\n" + md_text[end:] MD_PATH.write_text(md_text, encoding="utf-8") print(f"已更新: {MD_PATH}") print(f" - 解析表数: {len(tables)}") print(f" - 表清单: {len(table_list)} 张") for (num, title), tbls in sorted(by_module.items(), key=lambda x: x[0][0]): print(f" - {num}. {title}: {len(tbls)} 张表") if __name__ == "__main__": main()