Files
my-mom-system/prd/db/sync_data_model_from_sql.py
panchengyong c28ada5050 commit content
2026-03-06 02:02:59 +08:00

201 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
从 mom-db-260206.sql 解析 CREATE TABLE 定义,更新 mom系统数据库设计-数据模型.md。
用实际 DDL 中的表、字段、类型、允许空、注释 覆盖文档中的 2.2 表清单 和 4-13 节表结构。
"""
import re
from pathlib import Path
from collections import OrderedDict
BASE = Path(__file__).resolve().parent
SQL_PATH = BASE / "mom-db-260206.sql"
MD_PATH = BASE / "mom系统数据库设计-数据模型.md"
# 表前缀 -> (节号, 模块标题),仅 4-13 与文档一致erp_/mo_ 等归入“其他”不单独成节
PREFIX_TO_MODULE = [
("md_", 4, "基础数据模块"),
("pro_", 5, "生产管理模块"),
("qc_", 6, "质量管理模块"),
("dv_", 7, "设备管理模块"),
("wm_", 8, "仓库管理模块"),
("cal_", 9, "排班管理模块"),
("iot_", 10, "IoT数据采集模块"),
("tm_", 11, "工具管理模块"),
("sys_", 12, "系统管理模块"),
("print_", 13, "打印管理模块"),
("erp_", None, None),
("mo_", None, None),
("gen_", None, None),
("report_", None, None),
]
def table_module(table_name):
"""返回 (节号, 模块标题) 或 (None, None)"""
t = table_name.lower()
for prefix, num, title in PREFIX_TO_MODULE:
if num is not None and t.startswith(prefix):
return (num, title)
return (None, None)
def parse_sql(path):
"""解析 SQL 文件,返回 [(table_name, table_comment, columns), ...]."""
text = path.read_text(encoding="utf-8")
tables = []
i = 0
while True:
i = text.find("CREATE TABLE ", i)
if i == -1:
break
# 表名: CREATE TABLE `name` (
name_m = re.match(r"CREATE\s+TABLE\s+`([^`]+)`\s*\(", text[i:], re.IGNORECASE)
if not name_m:
i += 1
continue
name = name_m.group(1).strip()
paren_start = i + name_m.end() - 1 # position of '('
# 找匹配的 ')'
depth = 1
pos = paren_start + 1
while pos < len(text) and depth > 0:
if text[pos] == "(":
depth += 1
elif text[pos] == ")":
depth -= 1
pos += 1
body = text[paren_start + 1 : pos - 1]
# 表注释: ) ENGINE=... COMMENT='...'
rest = text[pos - 1 : pos + 200]
comment_m = re.search(r"COMMENT\s*=\s*'([^']*)'", rest)
table_comment = comment_m.group(1).strip() if comment_m else ""
columns = parse_columns(body)
tables.append((name, table_comment, columns))
i = pos
return tables
def parse_columns(body):
"""从 CREATE TABLE 体解析列: `name` type NOT NULL ... COMMENT '...', 跳过 PRIMARY KEY/KEY/UNIQUE."""
lines = [ln.strip() for ln in body.split("\n") if ln.strip()]
cols = []
for line in lines:
if line.startswith("PRIMARY KEY") or line.startswith("KEY ") or line.startswith("UNIQUE KEY"):
continue
if not line.startswith("`"):
continue
# 列行: `col_name` type [NOT NULL] [NULL] [AUTO_INCREMENT] [DEFAULT ...] [COMMENT '...'],
match = re.match(r"`([^`]+)`\s+([a-z]+(?:\([^)]+\))?)\s*(NOT NULL|NULL)?", line, re.IGNORECASE)
if not match:
continue
col_name = match.group(1)
col_type = match.group(2)
not_null = match.group(3)
nullable = "N" if (not_null and not_null.upper() == "NOT NULL") else "Y"
# COMMENT: 找 COMMENT '...',注意可能 '' 转义
comment_match = re.search(r"COMMENT\s+'((?:[^']|'')*)'", line)
comment = comment_match.group(1).replace("''", "'").strip() if comment_match else "-"
cols.append({"name": col_name, "type": col_type, "nullable": nullable, "comment": comment or "-"})
return cols
def build_table_list_and_sections(tables):
"""按模块分组,生成 2.2 表清单和 4-13 节内容。仅包含 4-13 模块表其他表erp_/mo_/gen_ 等)仍入表清单模块「其他」."""
by_module = OrderedDict()
table_list = []
for name, comment, columns in tables:
num, title = table_module(name)
if num is not None:
by_module.setdefault((num, title), []).append((name, comment, columns))
table_list.append((name, comment, num, title))
else:
if name.startswith(("gen_", "report_", "ureport_", "test_", "db_test")):
continue
table_list.append((name, comment, 99, "其他"))
table_list.sort(key=lambda x: (x[2], x[0]))
return table_list, by_module
def gen_table_list_md(table_list):
"""生成 2.2 表清单总览 markdown."""
module_short = {
4: "基础数据", 5: "生产管理", 6: "质量管理", 7: "设备管理", 8: "仓库管理",
9: "排班管理", 10: "IoT数据采集", 11: "工具管理", 12: "系统管理", 13: "打印管理",
99: "其他",
}
lines = ["### 2.2 表清单总览", "", "| 序号 | 表名 | 中文名 | 模块 |", "|------|------|--------|------|"]
for idx, (name, comment, num, title) in enumerate(table_list, 1):
short = module_short.get(num, title or "其他")
cn = comment or name
lines.append(f"| {idx} | {name} | {cn} | {short} |")
lines.append("")
return "\n".join(lines)
def gen_module_section(module_num, title, tables):
"""生成一个模块的 markdown."""
lines = [f"## {module_num}. {title}", ""]
for idx, (name, comment, columns) in enumerate(tables, 1):
lines.append(f"### {module_num}.{idx} {comment or name} ({name})")
lines.append("")
lines.append("| 字段名 | 类型 | 允许空 | 说明 |")
lines.append("|--------|------|--------|------|")
for col in columns:
lines.append(f"| {col['name']} | {col['type']} | {col['nullable']} | {col['comment']} |")
lines.append("")
lines.append("---")
lines.append("")
return "\n".join(lines)
def main():
if not SQL_PATH.exists():
raise SystemExit(f"SQL 文件不存在: {SQL_PATH}")
if not MD_PATH.exists():
raise SystemExit(f"Markdown 文件不存在: {MD_PATH}")
tables = parse_sql(SQL_PATH)
table_list, by_module = build_table_list_and_sections(tables)
table_list_md = gen_table_list_md(table_list)
section_parts = []
for (num, title), tbls in sorted(by_module.items(), key=lambda x: x[0][0]):
section_parts.append(gen_module_section(num, title, tbls))
new_content_4_13 = "\n".join(section_parts)
md_text = MD_PATH.read_text(encoding="utf-8")
# 替换 2.2 表清单总览
start_22 = md_text.find("### 2.2 表清单总览")
if start_22 != -1:
end_22 = md_text.find("\n## ", start_22 + 1)
if end_22 == -1:
end_22 = len(md_text)
md_text = md_text[:start_22] + table_list_md.rstrip() + "\n\n" + md_text[end_22:]
# 替换 ## 4. 基础数据模块 到 ## 14. 枚举值定义 之间的内容
# 若新增了 14 ERP、15 模具,则替换到 “## 15. 枚举值定义” 或原 “## 14. 枚举值定义” 前
start_marker = "## 4. 基础数据模块"
end_marker = "\n## 14. 枚举值定义"
start = md_text.find(start_marker)
end = md_text.find(end_marker)
if start == -1:
raise SystemExit("未找到「## 4. 基础数据模块」")
if end == -1:
end = len(md_text)
md_text = md_text[:start] + new_content_4_13.rstrip() + "\n\n" + md_text[end:]
MD_PATH.write_text(md_text, encoding="utf-8")
print(f"已更新: {MD_PATH}")
print(f" - 解析表数: {len(tables)}")
print(f" - 表清单: {len(table_list)}")
for (num, title), tbls in sorted(by_module.items(), key=lambda x: x[0][0]):
print(f" - {num}. {title}: {len(tbls)} 张表")
if __name__ == "__main__":
main()