Files
msh-system/msh_crmeb_22/scraper/full_scraper.py

374 lines
14 KiB
Python
Raw Normal View History

import requests
from bs4 import BeautifulSoup
import time
import random
import json
import re
import os
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
# Configuration
BASE_URL = "http://www.ishen365.com"                    # site root, used to absolutize relative links
START_URL = "http://www.ishen365.com/index.php/swcfb"   # category index page
OUTPUT_FILE = "food_data_insert.sql"                    # generated SQL statements are appended here
LOG_FILE = "scraper.log"                                # plain-text run log (also echoed to stdout)
LIMIT_PER_CATEGORY = None # Set to None for full scrape
MAX_WORKERS = 10 # Number of concurrent threads
# Lock for file writing: serializes appends to OUTPUT_FILE across worker threads
file_lock = threading.Lock()
# Database Schema Mapping
# Maps a substring of the site's category name to the DB `category` enum.
# NOTE(review): many keys were corrupted to "" by an encoding mishap.
# Empty keys are fatal with the `key in name` substring match used in
# extract_categories(): "" is a substring of every string, so the first
# empty entry ("grain") would match EVERY category and shadow all later
# entries. The empty keys are therefore removed; names that match nothing
# fall back to "other" in extract_categories().
# TODO: restore the lost Chinese keys for grain/meat/dairy/seafood.
CATEGORY_MAP = {
    "蔬菜": "vegetable",
    "水果": "fruit",
    "坚果": "other",
    "小吃": "other",
    "速食": "other",
    "饮料": "other",
    "调味": "other",
}
# Maps the Chinese nutrient label (as it appears in the detail-page table)
# to the DB column name. Several keys were lost to an encoding mishap and
# became ""; the mineral names are unambiguous from their DB field names
# (potassium = 钾, phosphorus = 磷, sodium = 钠, calcium = 钙, iron = 铁),
# so they are restored here. Empty keys would otherwise match every row
# via the `key in raw_name` substring test in extract_food_detail().
NUTRIENT_MAP = {
    "蛋白质": "protein",
    "脂肪": "fat",
    "碳水化合物": "carbohydrate",
    "能量": "energy",
    "钾": "potassium",
    "磷": "phosphorus",
    "钠": "sodium",
    "钙": "calcium",
    "铁": "iron",
    "维生素C": "vitamin_c",
}
# Headers to mimic a browser (some sites block requests with the default
# python-requests User-Agent); Accept-Language prefers zh-CN since the
# target site serves Chinese content.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}
def log(msg):
    """Echo *msg* to stdout with a timestamp and append it to LOG_FILE."""
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{stamp}] {msg}"
    print(line)
    # Open in append mode per call so concurrent runs / crashes never
    # truncate the log.
    with open(LOG_FILE, "a", encoding="utf-8") as handle:
        handle.write(line + "\n")
def clean_number(value_str):
    """Extract the first numeric value from a string like '39.00mg' or '123kcal'.

    Returns the number as a string (e.g. '39.00'), or None when *value_str*
    is falsy or contains no digits.
    """
    if not value_str:
        return None
    # \d+(?:\.\d+)? instead of the old [\d\.]+ so a bare '.' or a run of
    # dots (e.g. '...') is not mistaken for a number — a stray '.' here
    # would later be interpolated into the generated SQL and break the
    # INSERT statement.
    match = re.search(r"\d+(?:\.\d+)?", value_str)
    return match.group(0) if match else None
def get_soup(url):
    """Fetch *url* and return a parsed BeautifulSoup tree, or None on failure.

    Any request or parse error is logged and swallowed so a single bad
    page never aborts the crawl.
    """
    try:
        resp = requests.get(url, headers=HEADERS, timeout=10)
        resp.raise_for_status()
        # Force UTF-8: the site does not always declare its charset.
        resp.encoding = 'utf-8'
        return BeautifulSoup(resp.text, 'html.parser')
    except Exception as exc:
        log(f"Error fetching {url}: {exc}")
        return None
def extract_categories():
    """Scrape the category index page.

    Returns a list of dicts with keys 'name', 'url' (absolute) and
    'db_category' (the mapped enum value, defaulting to "other").
    """
    log("Extracting categories...")
    soup = get_soup(START_URL)
    if soup is None:
        return []

    categories = []
    # Category links live under div.content6_left_bottom (per prior analysis).
    container = soup.find('div', class_='content6_left_bottom')
    if container:
        for anchor in container.find_all('a'):
            href = anchor.get('href')
            if not href:
                continue
            name = anchor.get_text(strip=True)

            # Absolutize the link.
            if href.startswith('http'):
                full_url = href
            elif href.startswith('/'):
                full_url = BASE_URL + href
            else:
                full_url = BASE_URL + '/' + href

            # Map the category name onto the DB enum via substring match.
            db_category = "other"
            for key, value in CATEGORY_MAP.items():
                if key in name:
                    db_category = value
                    break

            categories.append({
                "name": name,
                "url": full_url,
                "db_category": db_category,
            })

    log(f"Found {len(categories)} categories.")
    return categories
def extract_food_list(category_url):
    """Extract all food items from a category, following pagination.

    Returns a list of dicts with keys 'name', 'url' (absolute detail-page
    URL) and 'image' (absolute image URL, possibly "").
    """
    food_items = []
    visited = set()  # pages already processed; guards against pagination cycles
    current_url = category_url
    while current_url and current_url not in visited:
        visited.add(current_url)
        log(f"Processing list page: {current_url}")
        soup = get_soup(current_url)
        if not soup:
            break

        # Each item is rendered as <a href="..."><div class="as_list">...</div></a>,
        # so find the div and climb to its wrapping anchor.
        for div in soup.find_all('div', class_='as_list'):
            parent_a = div.find_parent('a')
            if not parent_a:
                continue
            href = parent_a.get('href')

            # Image (absolutized when relative).
            img_tag = div.find('img')
            img_src = img_tag.get('src') if img_tag else ""
            if img_src and not img_src.startswith('http'):
                img_src = BASE_URL + img_src if img_src.startswith('/') else BASE_URL + '/' + img_src

            name_div = div.find('div', class_='as_list_tit')
            name = name_div.get_text(strip=True) if name_div else "Unknown"

            if href:
                # Detail hrefs come in absolute, root-relative and
                # path-relative flavours (e.g. "29/show/36"); urljoin
                # performs standard relative resolution against the
                # current page for the last case.
                if href.startswith('http'):
                    full_detail_url = href
                elif href.startswith('/'):
                    full_detail_url = BASE_URL + href
                else:
                    full_detail_url = urljoin(current_url, href)
                food_items.append({
                    "name": name,
                    "url": full_detail_url,
                    "image": img_src,
                })

        # Pagination: follow the "下一页" (next page) link by its text,
        # which is more robust than guessing the pagination container class.
        next_page = None
        next_link = soup.find('a', string=re.compile(r'下一页|Next'))
        if next_link:
            href = next_link.get('href')
            if href and href != '#':
                next_page = urljoin(current_url, href)

        # The old code only compared against the current page, so any
        # longer cycle (page A -> B -> A) would loop forever; the visited
        # set closes that hole.
        if next_page and next_page not in visited:
            current_url = next_page
            time.sleep(random.uniform(1, 2))  # politeness delay between pages
        else:
            current_url = None
    return food_items
def extract_food_detail(url):
    """Scrape one food detail page and return its nutrient data.

    Returns {'nutrients': {...}, 'other_nutrients': {...}} or None when the
    page could not be fetched. Nutrients known to NUTRIENT_MAP are keyed by
    DB column name with the bare numeric value; anything else goes into
    'other_nutrients' keyed by the cleaned label, keeping its unit string
    for the JSON column.
    """
    log(f" Scraping detail: {url}")
    soup = get_soup(url)
    if not soup:
        return None

    nutrients = {}
    other_nutrients = {}
    # Nutrient rows live in table.am-table (per prior analysis).
    table = soup.find('table', class_='am-table')
    if table:
        for row in table.find_all('tr'):
            cols = row.find_all('td')
            if len(cols) < 2:
                continue
            # The name cell often carries a qualifier, e.g. "钾 (含量低)".
            raw_name = cols[0].get_text(strip=True)

            # Map onto a DB column via simple substring match.
            db_field = None
            for key, field in NUTRIENT_MAP.items():
                if key in raw_name:
                    db_field = field
                    break

            value_str = cols[1].get_text(strip=True)
            value = clean_number(value_str)
            if db_field:
                nutrients[db_field] = value
            elif value:
                # Strip parenthesized qualifiers — both full-width （…）
                # and ASCII (...) forms — from the label. The previous
                # pattern had its full-width parens corrupted by an
                # encoding mishap and effectively removed nothing.
                clean_key = re.sub(r'\s*（.*?）|\s*\(.*?\)', '', raw_name)
                other_nutrients[clean_key] = value_str  # keep unit for json

    return {'nutrients': nutrients, 'other_nutrients': other_nutrients}
def generate_sql(food_data, category_enum):
    """Build an INSERT statement for `v2_foods` from one scraped record.

    *food_data* must contain 'name', 'image' and 'details' (the dict
    returned by extract_food_detail); *category_enum* is the DB category
    value. Returns the SQL statement as a string.
    """
    def _esc(text):
        # Escape single quotes for embedding in a SQL string literal.
        return text.replace("'", "''")

    name = _esc(food_data['name'])
    # Escape the image path too: the old code interpolated it raw, so a
    # quote in a file name would have broken the statement.
    image = _esc(food_data['image'])
    nutrients = food_data['details']['nutrients']
    others = _esc(json.dumps(food_data['details']['other_nutrients'], ensure_ascii=False))

    def _num(field):
        # '0' when the nutrient is absent OR was stored as None (i.e.
        # clean_number found no digits). The old `.get(field, '0')` default
        # did not cover a stored None, which emitted the invalid SQL
        # literal `None`.
        value = nutrients.get(field)
        return '0' if value is None else value

    protein = _num('protein')
    fat = _num('fat')
    carbohydrate = _num('carbohydrate')
    energy = _num('energy')
    potassium = _num('potassium')
    phosphorus = _num('phosphorus')
    sodium = _num('sodium')
    calcium = _num('calcium')
    iron = _num('iron')
    vitamin_c = _num('vitamin_c')

    sql = f"""INSERT INTO `v2_foods`
(`name`, `category`, `image`, `protein`, `fat`, `carbohydrate`, `energy`, `potassium`, `phosphorus`, `sodium`, `calcium`, `iron`, `vitamin_c`, `nutrients_json`, `status`, `created_at`, `updated_at`)
VALUES
('{name}', '{category_enum}', '{image}', {protein}, {fat}, {carbohydrate}, {energy}, {potassium}, {phosphorus}, {sodium}, {calcium}, {iron}, {vitamin_c}, '{others}', 'active', NOW(), NOW());"""
    return sql
def process_food_item(food, category_enum):
    """Worker: scrape one food's detail page and append its SQL to OUTPUT_FILE.

    Returns True when the item was processed without raising (even if the
    detail fetch came back empty and nothing was written), False on error.
    """
    try:
        # Small random delay keeps the concurrent workers polite.
        time.sleep(random.uniform(0.1, 0.5))
        details = extract_food_detail(food['url'])
        if details:
            food['details'] = details
            statement = generate_sql(food, category_enum)
            # Serialize appends so concurrent workers never interleave lines.
            with file_lock:
                with open(OUTPUT_FILE, "a", encoding="utf-8") as out:
                    out.write(statement + "\n")
        return True
    except Exception as e:
        log(f"Error processing {food['name']}: {e}")
        return False
def main():
    """Crawl every category, then scrape all food detail pages in parallel.

    Phase 1 walks the category lists sequentially (pagination is stateful),
    phase 2 fans the detail pages out over a thread pool; each worker
    appends its own INSERT statement to OUTPUT_FILE.
    """
    # 1. Get Categories
    categories = extract_categories()

    # 2. Collect all items first (sequentially — list pages are paginated).
    log("Collecting all food links...")
    all_food_items = []
    for cat in categories:
        log(f"Scanning Category: {cat['name']} ({cat['db_category']})")
        food_list = extract_food_list(cat['url'])
        log(f" Found {len(food_list)} items in category.")
        # Honor LIMIT_PER_CATEGORY (None = full scrape). Previously this
        # knob was declared but never actually applied.
        if LIMIT_PER_CATEGORY:
            food_list = food_list[:LIMIT_PER_CATEGORY]
        # Tag each item with its category enum for the worker phase.
        for food in food_list:
            food['category_enum'] = cat['db_category']
            all_food_items.append(food)

    total_items = len(all_food_items)
    log(f"Total items to scrape: {total_items}")
    if not all_food_items:
        # Nothing to do — avoids spinning up an idle thread pool.
        log("Done!")
        return

    # 3. Process detail pages in parallel.
    log(f"Starting parallel scraping with {MAX_WORKERS} workers...")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [
            executor.submit(process_food_item, food, food['category_enum'])
            for food in all_food_items
        ]
        # Monitor progress as futures finish (every 10 items).
        completed = 0
        for future in as_completed(futures):
            completed += 1
            if completed % 10 == 0:
                log(f"Progress: {completed}/{total_items} ({completed/total_items*100:.1f}%)")
    log("Done!")
if __name__ == "__main__":
    # Initialize output file: start each run with a fresh SQL script.
    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        f.write("-- Bulk Insert SQL for v2_foods\n")
        f.write("DELETE FROM `v2_foods`;\n") # Optional: clear old data
    main()