Files
my-mom-system/prd/db/sync_data_model_from_excel.py
panchengyong c28ada5050 commit content
2026-03-06 02:02:59 +08:00

276 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
从 mom系统数据库设计.xlsx 读取表与字段定义,更新 mom系统数据库设计-数据模型.md。
保留 md 的 1-3 节(概述、模块清单、公共字段)和 14 节及以后枚举、ER、附录等
用 Excel 内容更新 2.2 表清单总览 和 4-13 节各模块表结构。
"""
import re
from pathlib import Path
try:
import openpyxl
except ImportError:
raise SystemExit("请安装 openpyxl: pip install openpyxl")
# 路径
BASE = Path(__file__).resolve().parent
XLSX_PATH = BASE / "mom系统数据库设计.xlsx"
MD_PATH = BASE / "mom系统数据库设计-数据模型.md"
# Excel 表清单中的「模块」-> 文档中的 (节号, 模块标题, 表前缀)
# 节号与原文一致: 4基础数据 5生产 6质量 7设备 8仓库 9排班 10IOT 11工具 12系统 13打印
MODULE_DOC = {
"主数据管理": (4, "基础数据模块", "md_*"),
"生产管理": (5, "生产管理模块", "pro_*"),
"质量管理": (6, "质量管理模块", "qc_*"),
"设备管理": (7, "设备管理模块", "dv_*"),
"仓储管理": (8, "仓库管理模块", "wm_*"),
"排班管理": (9, "排班管理模块", "cal_*"),
"IoT数据采集": (10, "IoT数据采集模块", "iot_*"),
"工装夹具管理": (11, "工具管理模块", "tm_*"),
"系统管理": (12, "系统管理模块", "sys_*"),
"打印机配置": (13, "打印管理模块", "print_*"),
}
# Excel 工作表名 -> 表清单中的模块名
SHEET_TO_MODULE = {
"MD主数据": "主数据管理",
"PRO生产管理": "生产管理",
"CAL排班管理": "排班管理",
"QC质量管理": "质量管理",
"WM仓储管理": "仓储管理",
"DV设备管理": "设备管理",
"TM工装夹具管理": "工装夹具管理",
"MO模具管理": None,
"SYS系统管理": "系统管理",
"PRINT打印管理": "打印机配置",
"IOT数采": "IoT数据采集",
}
def load_table_list(wb):
"""从「表清单」sheet 加载 (表名, 模块, 表描述, 备注),并建 (模块, 表描述)->表名 映射"""
ws = wb["表清单"]
rows = list(ws.iter_rows(min_row=2, values_only=True))
table_list = []
desc_to_name = {}
for row in rows:
if not row or not row[1]:
continue
name = (row[1] or "").strip()
module = (row[2] or "").strip()
desc = (row[3] or "").strip()
remark = (row[4] or "").strip() if len(row) > 4 else ""
if not name:
continue
table_list.append((name, module, desc, remark))
if module and desc:
key = (module, desc)
if key not in desc_to_name:
desc_to_name[key] = name
return table_list, desc_to_name
def parse_sheet_tables(ws, module_name, desc_to_name):
"""
解析一个 sheet 中所有表块。
每个表块:一行 表名称 + 一行 中文名称 + 一行 列序号|列名|列中文名|列类型|列设置|备注 + 数据行。
返回 [(table_name, cn_name, remark, columns), ...]
"""
rows = list(ws.iter_rows(values_only=True))
result = []
i = 0
while i < len(rows):
row = list(rows[i]) if rows[i] else []
# 找「表名称」在第二列
if len(row) >= 3 and row[1] == "表名称" and row[2]:
excel_table_name = (row[2] or "").strip()
cn_name = ""
if i + 1 < len(rows):
next_row = list(rows[i + 1]) if rows[i + 1] else []
if len(next_row) >= 3 and next_row[1] == "中文名称":
cn_name = (next_row[2] or "").strip()
# 表名解析:优先用 表清单 中 (模块, 中文名称) 对应的表名
if module_name and cn_name and (module_name, cn_name) in desc_to_name:
table_name = desc_to_name[(module_name, cn_name)]
else:
table_name = excel_table_name if excel_table_name and len(excel_table_name) > 2 else (cn_name or excel_table_name)
# 列头在 i+2
col_start = i + 2
if col_start >= len(rows):
i += 1
continue
header = list(rows[col_start]) if rows[col_start] else []
if len(header) < 5 or header[1] != "列序号":
i += 1
continue
columns = []
for j in range(col_start + 1, len(rows)):
r = list(rows[j]) if rows[j] else []
if len(r) < 4:
break
col_no, col_name, col_cn, col_type, col_setting, col_remark = (
r[1], r[2] if len(r) > 2 else "", r[3] if len(r) > 3 else "",
r[4] if len(r) > 4 else "", r[5] if len(r) > 5 else "", r[6] if len(r) > 6 else ""
)
if col_name is None or not str(col_name).strip():
break
col_name = str(col_name).strip()
col_type = (col_type or "").strip()
nullable = "N" if col_setting and "not null" in str(col_setting).lower() else "Y"
desc = (col_cn or "").strip()
if col_remark and str(col_remark).strip():
desc = desc + " " + str(col_remark).strip()
columns.append({
"name": col_name.lower(),
"type": col_type or "-",
"nullable": nullable,
"desc": desc or "-",
})
result.append((table_name, cn_name, "", columns))
i = col_start + len(columns) + 1
continue
i += 1
return result
def collect_all_tables(wb, table_list, desc_to_name):
"""从各数据 sheet 收集表,按模块归类。返回 { 模块名: [(表名, 中文名, 备注, columns), ...] }"""
by_module = {}
for sheet_name in wb.sheetnames:
if sheet_name == "表清单":
continue
module_name = SHEET_TO_MODULE.get(sheet_name)
if not module_name or module_name not in MODULE_DOC:
continue
ws = wb[sheet_name]
tables = parse_sheet_tables(ws, module_name, desc_to_name)
for t in tables:
by_module.setdefault(module_name, []).append(t)
return by_module
def table_name_to_lower(name):
"""将 Excel 表名转为小写(如 WM_WAREHOUSE -> wm_warehouse"""
if not name:
return name
return name.strip().lower()
def gen_table_list_md(table_list):
"""生成 2.2 表清单总览 markdown"""
module_order = list(MODULE_DOC.keys())
# 模块简写(用于表格第三列)
module_short = {
"主数据管理": "基础数据",
"生产管理": "生产管理",
"质量管理": "质量管理",
"设备管理": "设备管理",
"仓储管理": "仓库管理",
"排班管理": "排班管理",
"IoT数据采集": "IoT数据采集",
"工装夹具管理": "工具管理",
"系统管理": "系统管理",
"打印机配置": "打印管理",
}
lines = [
"### 2.2 表清单总览",
"",
"| 序号 | 表名 | 中文名 | 模块 |",
"|------|------|--------|------|",
]
for idx, (name, module, desc, _) in enumerate(table_list, 1):
short = module_short.get(module, module)
cn_display = (desc or "").strip() or name # 表描述为空时用表名
lines.append(f"| {idx} | {table_name_to_lower(name)} | {cn_display} | {short} |")
lines.append("")
return "\n".join(lines)
def gen_module_section(module_name, tables, section_num):
"""生成一个模块的 markdown## 4. 基础数据模块 及下属 ### 4.x 表)"""
if not tables:
return ""
title = MODULE_DOC[module_name][1]
lines = [
f"## {section_num}. {title}",
"",
]
for idx, (table_name, cn_name, remark, columns) in enumerate(tables, 1):
tname_lower = table_name_to_lower(table_name)
lines.append(f"### {section_num}.{idx} {cn_name or table_name} ({tname_lower})")
lines.append("")
if remark:
lines.append(f"> {remark}")
lines.append("")
lines.append("| 字段名 | 类型 | 允许空 | 说明 |")
lines.append("|--------|------|--------|------|")
for col in columns:
lines.append(f"| {col['name']} | {col['type']} | {col['nullable']} | {col['desc']} |")
lines.append("")
lines.append("---")
lines.append("")
return "\n".join(lines)
def main():
if not XLSX_PATH.exists():
raise SystemExit(f"Excel 文件不存在: {XLSX_PATH}")
if not MD_PATH.exists():
raise SystemExit(f"Markdown 文件不存在: {MD_PATH}")
wb = openpyxl.load_workbook(XLSX_PATH, read_only=True, data_only=True)
table_list, desc_to_name = load_table_list(wb)
by_module = collect_all_tables(wb, table_list, desc_to_name)
wb.close()
# 生成 2.2 表清单
table_list_md = gen_table_list_md(table_list)
# 生成 4-12 节(按 MODULE_DOC 顺序)
module_sections = []
for mod_name in MODULE_DOC:
if mod_name not in by_module:
continue
num, _, _ = MODULE_DOC[mod_name]
module_sections.append(gen_module_section(mod_name, by_module[mod_name], num))
new_content_4_13 = "\n".join(module_sections)
# 读取现有 md
md_text = MD_PATH.read_text(encoding="utf-8")
# 替换 2.2 表清单总览:从 ### 2.2 表清单总览 到下一个 ## 之前
def replace_2_2(content):
start = content.find("### 2.2 表清单总览")
if start == -1:
return content
end = content.find("\n## ", start + 1)
if end == -1:
end = len(content)
return content[:start] + table_list_md.rstrip() + "\n\n" + content[end:]
md_text = replace_2_2(md_text)
# 替换 ## 4. ... 到 ## 14. 之前
start_marker = "## 4. 基础数据模块"
end_marker = "\n## 14. 枚举值定义"
start = md_text.find(start_marker)
end = md_text.find(end_marker)
if start == -1:
raise SystemExit("未找到「## 4. 基础数据模块」")
if end == -1:
end = len(md_text)
md_text = md_text[:start] + new_content_4_13.rstrip() + "\n\n" + md_text[end:]
MD_PATH.write_text(md_text, encoding="utf-8")
print(f"已更新: {MD_PATH}")
print(f" - 表清单: {len(table_list)} 张表")
for mod_name, tbls in by_module.items():
print(f" - {mod_name}: {len(tbls)} 张表")
if __name__ == "__main__":
main()