Initial commit: MSH System

- msh_single_uniapp: Vue 2 + UniApp 前端(微信小程序/H5/App/支付宝小程序)
- msh_crmeb_22: Spring Boot 2.2 后端(C端API/管理端/业务逻辑)
- models-integration: AI服务集成(Coze/KieAI/腾讯ASR)
- docs: 产品文档与设计稿

This commit is contained in:
2026-02-28 05:40:21 +08:00
commit 14d29d51c0
2182 changed files with 482509 additions and 0 deletions

View File

@@ -0,0 +1,373 @@
import requests
from bs4 import BeautifulSoup
import time
import random
import json
import re
import os
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
# Configuration
BASE_URL = "http://www.ishen365.com"  # site root, used to absolutize relative links
START_URL = "http://www.ishen365.com/index.php/swcfb"  # food-category landing page
OUTPUT_FILE = "food_data_insert.sql"  # generated bulk-INSERT script
LOG_FILE = "scraper.log"  # file copy of every console log line
LIMIT_PER_CATEGORY = None  # Set to None for full scrape
MAX_WORKERS = 10  # Number of concurrent threads
# Lock for file writing
file_lock = threading.Lock()  # serializes appends to OUTPUT_FILE across worker threads
# Database Schema Mapping: Chinese category-name substrings -> v2_foods enum.
# NOTE(review): several Chinese keys were lost to encoding corruption and came
# through as empty strings. A dict literal collapses duplicate keys into a
# single "" entry, and because `"" in name` is True for every name, the
# substring matcher in extract_categories() classified EVERY category as the
# ""-entry's value. The empty keys are dropped here; unmatched categories fall
# back to "other". Restore the missing keys (grain/meat/dairy/seafood groups)
# once they can be confirmed against the live site.
CATEGORY_MAP = {
    "蔬菜": "vegetable",
    "水果": "fruit",
    "坚果": "other",
    "小吃": "other",
    "速食": "other",
    "饮料": "other",
    "调味": "other",
}
# Maps Chinese nutrient labels (as scraped) to v2_foods column names.
# NOTE(review): the single-character mineral keys were lost to encoding
# corruption and collapsed into one "" entry; since "" is a substring of every
# label, the matcher in extract_food_detail() sent every unmapped nutrient to
# the last ""-value. The mineral keys below (钾/磷/钠/钙/铁) are reconstructed
# from the target column names — confirm against the live site's labels.
NUTRIENT_MAP = {
    "蛋白质": "protein",
    "脂肪": "fat",
    "碳水化合物": "carbohydrate",
    "能量": "energy",
    "钾": "potassium",
    "磷": "phosphorus",
    "钠": "sodium",
    "钙": "calcium",
    "铁": "iron",
    "维生素C": "vitamin_c",
}
# Headers to mimic a browser (plain script requests are often rejected).
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}
def log(msg):
    """Print *msg* with a timestamp prefix and append the same line to LOG_FILE."""
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{stamp}] {msg}"
    print(line)
    # Open in append mode per call so the log survives crashes mid-run.
    with open(LOG_FILE, "a", encoding="utf-8") as log_fh:
        log_fh.write(line + "\n")
def clean_number(value_str):
    """Extract the first numeric value from a string like '39.00mg' or '123kcal'.

    Returns the number as a string (callers interpolate it directly into SQL),
    or None when the input is empty/None or contains no digits.
    """
    if not value_str:
        return None
    # Require a digit-anchored number: the previous pattern r"[\d\.]+" could
    # match a bare "." or a malformed "1.2.3" in full, which would corrupt
    # the generated SQL. This stops cleanly after one decimal part.
    match = re.search(r"(\d+(?:\.\d+)?)", value_str)
    if match:
        return match.group(1)
    return None
def get_soup(url):
    """Fetch *url* and return a parsed BeautifulSoup document, or None on any failure."""
    try:
        resp = requests.get(url, headers=HEADERS, timeout=10)
        resp.raise_for_status()
        # The site serves Chinese text; force UTF-8 before decoding.
        resp.encoding = 'utf-8'
        return BeautifulSoup(resp.text, 'html.parser')
    except Exception as exc:
        # Best-effort scraper: log the failure and let the caller skip the page.
        log(f"Error fetching {url}: {exc}")
        return None
def extract_categories():
    """Scrape the start page for food-category links.

    Returns a list of dicts: {"name", "url", "db_category"}, where
    db_category is a v2_foods enum value (defaults to "other").
    Returns [] when the start page cannot be fetched.
    """
    log("Extracting categories...")
    soup = get_soup(START_URL)
    if not soup:
        return []
    categories = []
    # Category links live under div.content6_left_bottom (site-specific layout).
    container = soup.find('div', class_='content6_left_bottom')
    if container:
        for link in container.find_all('a'):
            name = link.get_text(strip=True)
            href = link.get('href')
            if not href:
                continue
            # Ensure full URL
            if not href.startswith('http'):
                full_url = BASE_URL + href if href.startswith('/') else BASE_URL + '/' + href
            else:
                full_url = href
            # Map the category name onto the DB enum by substring match.
            # Skip empty keys: "" is a substring of every name, so a single
            # mojibake-corrupted key would otherwise claim every category.
            db_category = "other"
            for key, value in CATEGORY_MAP.items():
                if key and key in name:
                    db_category = value
                    break
            categories.append({
                "name": name,
                "url": full_url,
                "db_category": db_category,
            })
    log(f"Found {len(categories)} categories.")
    return categories
def extract_food_list(category_url):
    """Extract every food item from a category listing, following pagination.

    Returns a list of dicts: {"name", "url", "image"} with absolute URLs.
    """
    food_items = []
    visited = set()  # pages already processed — guards against pagination cycles
    current_url = category_url
    while current_url:
        visited.add(current_url)
        log(f"Processing list page: {current_url}")
        soup = get_soup(current_url)
        if not soup:
            break
        # Each item is rendered as <a href="..."><div class="as_list">...</div></a>,
        # so locate the item divs and climb to the wrapping anchor for the link.
        for div in soup.find_all('div', class_='as_list'):
            parent_a = div.find_parent('a')
            if not parent_a:
                continue
            href = parent_a.get('href')
            # Thumbnail: absolutize relative src values against the site root.
            img_tag = div.find('img')
            img_src = img_tag.get('src') if img_tag else ""
            if img_src and not img_src.startswith('http'):
                img_src = BASE_URL + img_src if img_src.startswith('/') else BASE_URL + '/' + img_src
            name_div = div.find('div', class_='as_list_tit')
            name = name_div.get_text(strip=True) if name_div else "Unknown"
            if href:
                # Detail links come back in several shapes (absolute,
                # root-relative, or path-relative like "29/show/36");
                # resolve each against the appropriate base.
                if href.startswith('http'):
                    full_detail_url = href
                elif href.startswith('/'):
                    full_detail_url = BASE_URL + href
                else:
                    # Standard relative-URL resolution against the current page.
                    full_detail_url = urljoin(current_url, href)
                food_items.append({
                    "name": name,
                    "url": full_detail_url,
                    "image": img_src,
                })
        # Pagination: the pager's class is unknown, so find the "next page"
        # anchor by its link text instead.
        next_page = None
        next_link = soup.find('a', string=re.compile(r'下一页|Next'))
        if next_link:
            href = next_link.get('href')
            if href and href != '#':
                next_page = urljoin(current_url, href)
        # Stop on any page we have already seen (the previous check only
        # compared against the current page, so an A -> B -> A link cycle
        # would have looped forever).
        if next_page and next_page not in visited:
            current_url = next_page
            time.sleep(random.uniform(1, 2))
        else:
            current_url = None
    return food_items
def extract_food_detail(url):
    """Scrape one food detail page for its nutrient table.

    Returns {"nutrients": {db_field: value_str}, "other_nutrients": {label: raw}},
    or None when the page could not be fetched.
    """
    log(f"  Scraping detail: {url}")
    soup = get_soup(url)
    if not soup:
        return None
    data = {}
    # Nutrients are laid out as two-column rows in table.am-table:
    # a label cell, then a value cell such as "39.00mg".
    table = soup.find('table', class_='am-table')
    nutrients = {}
    other_nutrients = {}
    if table:
        for row in table.find_all('tr'):
            cols = row.find_all('td')
            if len(cols) < 2:
                continue
            # Labels may carry extra text, e.g. "钾 (含量低)".
            raw_name = cols[0].get_text(strip=True)
            # Map to a schema column by substring. Skip empty keys: "" is a
            # substring of every label, so a mojibake-corrupted key would
            # otherwise claim every row.
            db_field = None
            for key, field in NUTRIENT_MAP.items():
                if key and key in raw_name:
                    db_field = field
                    break
            value_str = cols[1].get_text(strip=True)
            value = clean_number(value_str)
            if db_field:
                nutrients[db_field] = value
            else:
                # Unmapped nutrient: keep it (with its unit) in the JSON blob,
                # after stripping trailing parenthesized descriptions from the
                # label. (The previous pattern r'\s*.*?|s*\(.*?\)' was broken:
                # its first branch matches the empty string, so nothing was
                # ever removed, and "s*" was a typo for "\s*".)
                clean_key = re.sub(r'\s*（.*?）|\s*\(.*?\)', '', raw_name)
                if value:
                    other_nutrients[clean_key] = value_str  # keep the unit for JSON
    data['nutrients'] = nutrients
    data['other_nutrients'] = other_nutrients
    return data
def generate_sql(food_data, category_enum):
    """Render one INSERT statement for the v2_foods table.

    *food_data* must carry 'name', 'image' and a 'details' dict holding
    'nutrients' (schema-mapped values) and 'other_nutrients' (raw extras).
    Values are interpolated directly into the statement, so single quotes
    are doubled for SQL string escaping.
    """
    name = food_data['name'].replace("'", "''")
    # Escape the image URL too — it is interpolated into a quoted literal
    # just like the name (previously left unescaped).
    image = food_data['image'].replace("'", "''")
    nutrients = food_data['details']['nutrients']
    others = json.dumps(food_data['details']['other_nutrients'],
                        ensure_ascii=False).replace("'", "''")

    def _num(field):
        # clean_number() can store None for unparseable values; .get(field, '0')
        # only defaults on a MISSING key, so a stored None used to render as the
        # bare word "None" and break the INSERT. Coerce None to '0' explicitly.
        value = nutrients.get(field)
        return '0' if value is None else value

    protein = _num('protein')
    fat = _num('fat')
    carbohydrate = _num('carbohydrate')
    energy = _num('energy')
    potassium = _num('potassium')
    phosphorus = _num('phosphorus')
    sodium = _num('sodium')
    calcium = _num('calcium')
    iron = _num('iron')
    vitamin_c = _num('vitamin_c')
    sql = f"""INSERT INTO `v2_foods`
(`name`, `category`, `image`, `protein`, `fat`, `carbohydrate`, `energy`, `potassium`, `phosphorus`, `sodium`, `calcium`, `iron`, `vitamin_c`, `nutrients_json`, `status`, `created_at`, `updated_at`)
VALUES
('{name}', '{category_enum}', '{image}', {protein}, {fat}, {carbohydrate}, {energy}, {potassium}, {phosphorus}, {sodium}, {calcium}, {iron}, {vitamin_c}, '{others}', 'active', NOW(), NOW());"""
    return sql
def process_food_item(food, category_enum):
    """Worker: scrape one food's detail page and append its INSERT to OUTPUT_FILE.

    Returns True on success, False on error (falls through with None when the
    detail page yielded no data).
    """
    try:
        # Small politeness delay so the worker pool doesn't hammer the site.
        time.sleep(random.uniform(0.1, 0.5))
        details = extract_food_detail(food['url'])
        if details:
            food['details'] = details
            statement = generate_sql(food, category_enum)
            # Serialize appends: every worker thread shares one output file.
            with file_lock:
                with open(OUTPUT_FILE, "a", encoding="utf-8") as out:
                    out.write(statement + "\n")
            return True
    except Exception as exc:
        log(f"Error processing {food['name']}: {exc}")
        return False
def main():
    """Orchestrate the scrape: categories -> item links -> parallel detail pages."""
    # 1. Discover categories.
    categories = extract_categories()

    # 2. Collect every food link first (serial — pagination-heavy), tagging
    #    each item with its category enum for the SQL-generation step.
    log("Collecting all food links...")
    all_food_items = []
    for cat in categories:
        log(f"Scanning Category: {cat['name']} ({cat['db_category']})")
        food_list = extract_food_list(cat['url'])
        log(f"  Found {len(food_list)} items in category.")
        # Honor the per-category test limit when configured (None = full
        # scrape). Previously the constant was declared but never applied.
        if LIMIT_PER_CATEGORY:
            food_list = food_list[:LIMIT_PER_CATEGORY]
        for food in food_list:
            food['category_enum'] = cat['db_category']
            all_food_items.append(food)

    total_items = len(all_food_items)
    log(f"Total items to scrape: {total_items}")

    # 3. Scrape detail pages in parallel; each worker appends its own SQL line
    #    under file_lock, so nothing needs to be collected from the futures.
    log(f"Starting parallel scraping with {MAX_WORKERS} workers...")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [
            executor.submit(process_food_item, food, food['category_enum'])
            for food in all_food_items
        ]
        completed = 0
        for _ in as_completed(futures):
            completed += 1
            # Report every 10 items (also avoids division when total is 0).
            if completed % 10 == 0:
                log(f"Progress: {completed}/{total_items} ({completed/total_items*100:.1f}%)")
    log("Done!")
if __name__ == "__main__":
    # Initialize the output file: truncate any previous run's SQL and write
    # the header before worker threads start appending rows.
    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        f.write("-- Bulk Insert SQL for v2_foods\n")
        f.write("DELETE FROM `v2_foods`;\n")  # Optional: clear old data
    main()