"""Scrape food nutrition data from ishen365.com and emit bulk-insert SQL.

Flow: discover categories from the start page, walk each category's
paginated food list, fetch every food's detail page concurrently, and
append one INSERT statement per food to OUTPUT_FILE.
"""

import requests
from bs4 import BeautifulSoup
import time
import random
import json
import re
import os
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

# Configuration
BASE_URL = "http://www.ishen365.com"
START_URL = "http://www.ishen365.com/index.php/swcfb"
OUTPUT_FILE = "food_data_insert.sql"
LOG_FILE = "scraper.log"
LIMIT_PER_CATEGORY = None  # Max items scraped per category; None = full scrape
MAX_WORKERS = 10  # Number of concurrent threads

# Serializes writes to OUTPUT_FILE and LOG_FILE across worker threads.
file_lock = threading.Lock()

# Maps a substring of the site's (Chinese) category name to our DB enum.
CATEGORY_MAP = {
    "谷": "grain", "薯": "grain", "豆": "grain",
    "蔬菜": "vegetable", "菌": "vegetable",
    "水果": "fruit",
    "坚果": "other",
    "肉": "meat", "乳": "dairy", "蛋": "meat",
    "鱼": "seafood", "蟹": "seafood", "贝": "seafood",
    "婴": "other", "小吃": "other", "速食": "other",
    "饮料": "other", "酒": "other", "糖": "other",
    "蜜": "other", "调味": "other", "药": "other",
    "油": "other",
}

# Maps a substring of the site's nutrient label to a v2_foods column.
NUTRIENT_MAP = {
    "蛋白质": "protein",
    "脂肪": "fat",
    "碳水化合物": "carbohydrate",
    "能量": "energy",
    "钾": "potassium",
    "磷": "phosphorus",
    "钠": "sodium",
    "钙": "calcium",
    "铁": "iron",
    "维生素C": "vitamin_c",
}

# Headers to mimic a browser
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}


def log(msg):
    """Print a timestamped message and append it to LOG_FILE.

    Uses file_lock so concurrent worker threads don't interleave
    partial lines in the log file.
    """
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    formatted_msg = f"[{timestamp}] {msg}"
    print(formatted_msg)
    with file_lock:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(formatted_msg + "\n")


def clean_number(value_str):
    """Extract the leading numeric value from a string like '39.00mg' or '123kcal'.

    Returns the number as a string, or None when no digits are present.
    The pattern requires at least one digit so a stray '.' is never
    returned (the old pattern ``[\\d.]+`` could match a bare dot, which
    would then be interpolated unquoted into SQL).
    """
    if not value_str:
        return None
    match = re.search(r"(\d+(?:\.\d+)?)", value_str)
    return match.group(1) if match else None


def get_soup(url):
    """GET a page and return it parsed as BeautifulSoup, or None on any error."""
    try:
        response = requests.get(url, headers=HEADERS, timeout=10)
        response.raise_for_status()
        response.encoding = 'utf-8'  # Site declares GBK-ish encodings inconsistently; force UTF-8.
        return BeautifulSoup(response.text, 'html.parser')
    except Exception as e:
        log(f"Error fetching {url}: {e}")
        return None


def extract_categories():
    """Scrape the start page and return a list of category dicts.

    Each dict has: name (site label), url (absolute), db_category (our enum,
    via first substring hit in CATEGORY_MAP, defaulting to 'other').
    """
    log("Extracting categories...")
    soup = get_soup(START_URL)
    if not soup:
        return []

    categories = []
    # Category links live under div.content6_left_bottom (per prior page analysis).
    container = soup.find('div', class_='content6_left_bottom')
    if container:
        for link in container.find_all('a'):
            name = link.get_text(strip=True)
            href = link.get('href')
            if not href:
                continue
            if not href.startswith('http'):
                full_url = BASE_URL + href if href.startswith('/') else BASE_URL + '/' + href
            else:
                full_url = href

            # Map the site's category name to our DB enum by substring match.
            db_category = "other"
            for key, value in CATEGORY_MAP.items():
                if key in name:
                    db_category = value
                    break

            categories.append({
                "name": name,
                "url": full_url,
                "db_category": db_category,
            })

    log(f"Found {len(categories)} categories.")
    return categories


def extract_food_list(category_url):
    """Return all food items from a category, following pagination.

    Each item is a dict: name, url (absolute detail URL), image (absolute).
    List entries are div.as_list blocks wrapped in an <a>; the "下一页"
    (next page) link drives pagination. A visited set guards against
    pagination cycles (the old `next != current` check only caught
    self-links, not A->B->A loops).
    """
    food_items = []
    seen_pages = set()
    current_url = category_url

    while current_url and current_url not in seen_pages:
        seen_pages.add(current_url)
        log(f"Processing list page: {current_url}")
        soup = get_soup(current_url)
        if not soup:
            break

        for div in soup.find_all('div', class_='as_list'):
            parent_a = div.find_parent('a')
            if not parent_a:
                continue
            href = parent_a.get('href')

            img_tag = div.find('img')
            img_src = img_tag.get('src') if img_tag else ""
            if img_src and not img_src.startswith('http'):
                img_src = BASE_URL + img_src if img_src.startswith('/') else BASE_URL + '/' + img_src

            name_div = div.find('div', class_='as_list_tit')
            name = name_div.get_text(strip=True) if name_div else "Unknown"

            if href:
                # Hrefs come in absolute, root-relative and path-relative
                # flavors (e.g. "29/show/36"); resolve path-relative ones
                # against the current page per standard URL resolution.
                if href.startswith('http'):
                    full_detail_url = href
                elif href.startswith('/'):
                    full_detail_url = BASE_URL + href
                else:
                    full_detail_url = urljoin(current_url, href)
                food_items.append({
                    "name": name,
                    "url": full_detail_url,
                    "image": img_src,
                })

        # Pagination: search by link text ("下一页"/"Next") since the
        # pagination container's class is not stable.
        next_page = None
        next_link = soup.find('a', string=re.compile(r'下一页|Next'))
        if next_link:
            href = next_link.get('href')
            if href and href != '#':
                next_page = urljoin(current_url, href)

        if next_page and next_page not in seen_pages:
            current_url = next_page
            time.sleep(random.uniform(1, 2))  # politeness delay between list pages
        else:
            current_url = None

    return food_items


def extract_food_detail(url):
    """Scrape a food detail page and return its nutrient data, or None.

    Returns {'nutrients': {db_field: numeric-string-or-None},
             'other_nutrients': {label: raw value with unit}}.
    Nutrients whose label matches NUTRIENT_MAP go in 'nutrients';
    everything else is kept (with its unit) for the JSON column.
    """
    log(f"  Scraping detail: {url}")
    soup = get_soup(url)
    if not soup:
        return None

    data = {}
    nutrients = {}
    other_nutrients = {}

    # Nutrient rows live in table.am-table (per prior page analysis).
    table = soup.find('table', class_='am-table')
    if table:
        for row in table.find_all('tr'):
            cols = row.find_all('td')
            if len(cols) < 2:
                continue
            # Labels often carry a qualifier, e.g. "钾 (含量低)".
            raw_name = cols[0].get_text(strip=True)

            # Substring match against our schema's nutrient names.
            db_field = None
            for key, field in NUTRIENT_MAP.items():
                if key in raw_name:
                    db_field = field
                    break

            value_str = cols[1].get_text(strip=True)
            value = clean_number(value_str)

            if db_field:
                nutrients[db_field] = value
            elif value:
                # Strip parenthesized qualifiers like "(含量低)" — both
                # full-width （…） and ASCII (…) forms. (The original
                # pattern was mangled: r'\s*(.*?)|s*\(.*?\)'.)
                clean_key = re.sub(r'\s*（.*?）|\s*\(.*?\)', '', raw_name)
                other_nutrients[clean_key] = value_str  # keep unit for the JSON column

    data['nutrients'] = nutrients
    data['other_nutrients'] = other_nutrients
    return data


def generate_sql(food_data, category_enum):
    """Build one INSERT statement for v2_foods from a scraped food dict.

    food_data must carry 'name', 'image' and 'details' (as produced by
    extract_food_detail). Missing or unparseable nutrient values fall
    back to 0 — the `or '0'` guard also covers values stored as None,
    which `.get(key, '0')` would have passed through as the literal
    string 'None' in the SQL.
    """
    name = food_data['name'].replace("'", "''")
    image = food_data['image'].replace("'", "''")
    nutrients = food_data['details']['nutrients']
    others = json.dumps(
        food_data['details']['other_nutrients'], ensure_ascii=False
    ).replace("'", "''")

    def num(key):
        # Numeric column value with a safe default of 0.
        return nutrients.get(key) or '0'

    sql = f"""INSERT INTO `v2_foods` (`name`, `category`, `image`, `protein`, `fat`, `carbohydrate`, `energy`, `potassium`, `phosphorus`, `sodium`, `calcium`, `iron`, `vitamin_c`, `nutrients_json`, `status`, `created_at`, `updated_at`) VALUES ('{name}', '{category_enum}', '{image}', {num('protein')}, {num('fat')}, {num('carbohydrate')}, {num('energy')}, {num('potassium')}, {num('phosphorus')}, {num('sodium')}, {num('calcium')}, {num('iron')}, {num('vitamin_c')}, '{others}', 'active', NOW(), NOW());"""
    return sql


def process_food_item(food, category_enum):
    """Worker: scrape one food's detail page and append its INSERT to OUTPUT_FILE.

    Returns True on success, False when the detail scrape failed or raised.
    File writes are serialized with file_lock since many workers run at once.
    """
    try:
        time.sleep(random.uniform(0.1, 0.5))  # politeness delay per request
        details = extract_food_detail(food['url'])
        if details:
            food['details'] = details
            sql = generate_sql(food, category_enum)
            with file_lock:
                with open(OUTPUT_FILE, "a", encoding="utf-8") as f:
                    f.write(sql + "\n")
            return True
    except Exception as e:
        log(f"Error processing {food['name']}: {e}")
    return False


def main():
    """Collect every food link, then scrape detail pages in parallel."""
    # 1. Discover categories.
    categories = extract_categories()

    # 2. Collect all food links (sequential: list pages are cheap and few).
    all_food_items = []
    log("Collecting all food links...")
    for cat in categories:
        log(f"Scanning Category: {cat['name']} ({cat['db_category']})")
        food_list = extract_food_list(cat['url'])
        log(f"  Found {len(food_list)} items in category.")
        if LIMIT_PER_CATEGORY:
            # Cap items per category (useful for test runs).
            food_list = food_list[:LIMIT_PER_CATEGORY]
        for food in food_list:
            food['category_enum'] = cat['db_category']
            all_food_items.append(food)

    total_items = len(all_food_items)
    log(f"Total items to scrape: {total_items}")

    # 3. Scrape detail pages in parallel.
    log(f"Starting parallel scraping with {MAX_WORKERS} workers...")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [
            executor.submit(process_food_item, food, food['category_enum'])
            for food in all_food_items
        ]
        completed = 0
        for future in as_completed(futures):
            completed += 1
            if completed % 10 == 0:
                log(f"Progress: {completed}/{total_items} ({completed/total_items*100:.1f}%)")

    log("Done!")


if __name__ == "__main__":
    # Initialize output file
    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        f.write("-- Bulk Insert SQL for v2_foods\n")
        f.write("DELETE FROM `v2_foods`;\n")  # Optional: clear old data
    main()