...
# Let's look for all 'a' tags that contain 'as_list' div?
# Or just find all divs with class 'as_list' and get parent 'a'?
list_divs = soup.find_all('div', class_='as_list')
for div in list_divs:
parent_a = div.find_parent('a')
if parent_a:
href = parent_a.get('href')
# Extract image and name
img_tag = div.find('img')
img_src = img_tag.get('src') if img_tag else ""
if img_src and not img_src.startswith('http'):
img_src = BASE_URL + img_src if img_src.startswith('/') else BASE_URL + '/' + img_src
name_div = div.find('div', class_='as_list_tit')
name = name_div.get_text(strip=True) if name_div else "Unknown"
if href:
# Construct detail URL. The href found was "29/show/36" (relative to something?)
# If category_url is http://www.ishen365.com/index.php/swcfb/index/29
# and href is "29/show/36", it might be relative to current path or base.
# Let's check the href carefully.
# If href doesn't start with /, it's relative to current path.
# But if we are at .../index/29, "29/show/36" would be .../index/29/29/show/36 which is wrong.
# It's likely relative to index.php or base.
# Let's resolve it properly.
full_detail_url = ""
if href.startswith('http'):
full_detail_url = href
elif href.startswith('/'):
full_detail_url = BASE_URL + href
else:
# Careful with relative paths.
# If the page is /index.php/swcfb/index/29
# and link is 29/show/36, it might mean /index.php/swcfb/index/29/show/36 ??
# Or maybe the href is actually "/index.php/swcfb/show/36" ?
# I need to be robust. I'll assume it needs to be joined with BASE_URL/index.php/swcfb/ maybe?
# Let's try to join with the current URL's directory.
# Actually, easiest is to just print it and see during debug, but I want to get it right.
# Let's assume standard relative URL resolution.
full_detail_url = urljoin(current_url, href)
food_items.append({
"name": name,
"url": full_detail_url,
"image": img_src
})
# Handle pagination
# Look for "下一页"
next_page = None
pagination = soup.find('ul', class_='pagination') # Bootstrap style? Or just search text.
# Searching by text is safer if class is unknown
next_link = soup.find('a', string=re.compile(r'下一页|Next'))
if next_link:
href = next_link.get('href')
if href and href != '#':
next_page = urljoin(current_url, href)
if next_page and next_page != current_url:
current_url = next_page
time.sleep(random.uniform(1, 2))
else:
current_url = None
return food_items
def extract_food_detail(url):
    """Scrape one food detail page and extract its nutrient table.

    Parameters:
        url: absolute URL of the food detail page.

    Returns:
        dict with keys:
            'nutrients'       - DB field name -> numeric value (via NUTRIENT_MAP)
            'other_nutrients' - unmapped nutrient name -> raw value string (unit kept)
        or None if the page could not be fetched.
    """
    log(f" Scraping detail: {url}")
    soup = get_soup(url)
    if not soup:
        return None
    data = {}
    # Nutrient data lives in a <table class="am-table"> (Amaze UI markup).
    table = soup.find('table', class_='am-table')
    nutrients = {}
    other_nutrients = {}
    if table:
        rows = table.find_all('tr')
        for row in rows:
            cols = row.find_all('td')
            if len(cols) >= 2:
                # Name cell often carries extra text like "钾 (含量低)";
                # sometimes parens are part of the name, e.g. "维生素C(抗坏血酸)".
                raw_name = cols[0].get_text(strip=True)
                # Map to our schema with a simple substring match.
                db_field = None
                for key, field in NUTRIENT_MAP.items():
                    if key in raw_name:
                        db_field = field
                        break
                value_str = cols[1].get_text(strip=True)
                value = clean_number(value_str)
                if db_field:
                    nutrients[db_field] = value
                else:
                    # Unmapped nutrient: strip trailing parenthesized notes,
                    # both full-width （…） and ASCII (…) forms.
                    # BUG FIX: the original pattern r'\s*(.*?)|s*\(.*?\)' had an
                    # unescaped capture group (matched whitespace/empty instead
                    # of full-width parens) and a missing backslash ('s*').
                    clean_key = re.sub(r'\s*（.*?）|\s*\(.*?\)', '', raw_name)
                    # NOTE(review): a value of 0 is skipped here (falsy);
                    # presumably intentional to drop empty/zero rows — confirm
                    # against clean_number()'s return convention.
                    if value:
                        other_nutrients[clean_key] = value_str  # keep unit for json
    data['nutrients'] = nutrients
    data['other_nutrients'] = other_nutrients
    return data
def generate_sql(food_data, category_enum):
    """Build an INSERT statement for `v2_foods` from a scraped food item.

    Parameters:
        food_data: dict with 'name', 'image', and 'details' containing
            'nutrients' (DB field -> value) and 'other_nutrients' (name -> raw string).
        category_enum: value for the `category` enum column.

    Returns:
        A single INSERT ... VALUES ...; statement as a string.
    """
    # Escape single quotes for SQL string literals.
    name = food_data['name'].replace("'", "''")
    # BUG FIX: the image URL was interpolated unescaped; a quote in the URL
    # would break (or inject into) the statement. Escape it like the others.
    image = food_data['image'].replace("'", "''")
    nutrients = food_data['details']['nutrients']
    others = json.dumps(food_data['details']['other_nutrients'],
                        ensure_ascii=False).replace("'", "''")
    # Missing nutrients default to '0' so numeric columns stay valid.
    protein = nutrients.get('protein', '0')
    fat = nutrients.get('fat', '0')
    carbohydrate = nutrients.get('carbohydrate', '0')
    energy = nutrients.get('energy', '0')
    potassium = nutrients.get('potassium', '0')
    phosphorus = nutrients.get('phosphorus', '0')
    sodium = nutrients.get('sodium', '0')
    calcium = nutrients.get('calcium', '0')
    iron = nutrients.get('iron', '0')
    vitamin_c = nutrients.get('vitamin_c', '0')
    sql = f"""INSERT INTO `v2_foods`
(`name`, `category`, `image`, `protein`, `fat`, `carbohydrate`, `energy`, `potassium`, `phosphorus`, `sodium`, `calcium`, `iron`, `vitamin_c`, `nutrients_json`, `status`, `created_at`, `updated_at`)
VALUES
('{name}', '{category_enum}', '{image}', {protein}, {fat}, {carbohydrate}, {energy}, {potassium}, {phosphorus}, {sodium}, {calcium}, {iron}, {vitamin_c}, '{others}', 'active', NOW(), NOW());"""
    return sql
def process_food_item(food, category_enum):
    """Worker: scrape one food's detail page and append its INSERT to OUTPUT_FILE.

    Returns True when the SQL line was written, a falsy value otherwise.
    """
    try:
        # Jitter between requests to stay polite to the server.
        time.sleep(random.uniform(0.1, 0.5))
        details = extract_food_detail(food['url'])
        if not details:
            return False
        food['details'] = details
        statement = generate_sql(food, category_enum)
        # Appends from worker threads are serialized through file_lock.
        with file_lock:
            with open(OUTPUT_FILE, "a", encoding="utf-8") as out:
                out.write(statement + "\n")
        return True
    except Exception as exc:
        log(f"Error processing {food['name']}: {exc}")
        return False
def main():
    """Scrape every category, then process all food items in parallel."""
    # 1. Discover categories.
    categories = extract_categories()
    all_food_items = []
    # 2. Collect all item links first so the thread pool gets one flat list.
    log("Collecting all food links...")
    for cat in categories:
        log(f"Scanning Category: {cat['name']} ({cat['db_category']})")
        food_list = extract_food_list(cat['url'])
        log(f" Found {len(food_list)} items in category.")
        # FIX: LIMIT_PER_CATEGORY was checked twice but never applied (both
        # branches were dead `pass` blocks). Honor it here so test runs take
        # only the first N items of each category; None/0 means no limit.
        if LIMIT_PER_CATEGORY:
            food_list = food_list[:LIMIT_PER_CATEGORY]
        for food in food_list:
            # Carry the category enum along for SQL generation later.
            food['category_enum'] = cat['db_category']
            all_food_items.append(food)
    total_items = len(all_food_items)
    log(f"Total items to scrape: {total_items}")
    # 3. Process in parallel; each worker appends its own SQL line.
    log(f"Starting parallel scraping with {MAX_WORKERS} workers...")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [
            executor.submit(process_food_item, food, food['category_enum'])
            for food in all_food_items
        ]
        # Monitor progress every 10 completions.
        completed = 0
        for _ in as_completed(futures):
            completed += 1
            if completed % 10 == 0:
                log(f"Progress: {completed}/{total_items} ({completed/total_items*100:.1f}%)")
    log("Done!")
if __name__ == "__main__":
    # Start a fresh output file: header comment plus an optional table reset.
    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        f.writelines([
            "-- Bulk Insert SQL for v2_foods\n",
            "DELETE FROM `v2_foods`;\n",  # Optional: clear old data
        ])
    main()