# File: msh-system/msh_crmeb_22/scraper/knowledge_scraper.py
# (Python, 381 lines, 14 KiB)
import requests
from bs4 import BeautifulSoup
import time
import random
import json
import re
import os
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
# Configuration
BASE_URL = "http://www.ishen365.com"        # root of the site being scraped
OUTPUT_FILE = "knowledge_data_insert.sql"   # generated bulk-insert SQL file
LOG_FILE = "knowledge_scraper.log"          # progress / error log
MAX_WORKERS = 10                            # threads per detail-scraping pool
# Lock for file writing (serializes appends to OUTPUT_FILE across worker threads)
file_lock = threading.Lock()
# Desktop-Chrome User-Agent so the site serves its regular HTML layout
HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36"
}
def log(msg):
    """Print *msg* with a timestamp prefix and append the same line to LOG_FILE."""
    line = f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {msg}"
    print(line)
    with open(LOG_FILE, "a", encoding="utf-8") as handle:
        handle.write(line + "\n")
def get_soup(url):
    """GET *url* and parse it; return a BeautifulSoup, or None on any failure."""
    try:
        resp = requests.get(url, headers=HEADERS, timeout=10)
        resp.raise_for_status()
        # Force utf-8 before reading .text; the site's charset detection is unreliable.
        resp.encoding = 'utf-8'
        return BeautifulSoup(resp.text, 'html.parser')
    except Exception as e:
        # Best-effort fetch: log and signal failure with None so callers can skip.
        log(f"Error fetching {url}: {e}")
        return None
def clean_text(text):
    """Trim whitespace and escape quotes/backslashes for use in a SQL literal.

    Falsy input (None, "") yields the empty string.
    """
    if not text:
        return ""
    escaped = text.strip()
    escaped = escaped.replace("'", "''")
    return escaped.replace("\\", "\\\\")
def clean_html(html_content):
    """Stringify an HTML fragment (e.g. a BS4 Tag) and escape it for SQL.

    Falsy input yields the empty string; no whitespace trimming is done.
    """
    if not html_content:
        return ""
    markup = str(html_content)
    return markup.replace("'", "''").replace("\\", "\\\\")
def save_sql(sql):
    """Append one SQL statement to OUTPUT_FILE; the lock serializes writers."""
    with file_lock, open(OUTPUT_FILE, "a", encoding="utf-8") as out:
        out.write(sql + "\n")
def generate_sql(data, table_name='v2_knowledge'):
    """Render one INSERT statement for a scraped item.

    data: item dict carrying title/content/summary/cover_image plus, per
        table, extras such as type / category / nutrient_name.
    table_name: target table — 'v2_knowledge', 'v2_recipes', or
        'v2_community_posts'. Any other value returns "".
    Returns the INSERT statement as a string.
    """
    title = clean_text(data.get('title', ''))
    content = clean_html(data.get('content', ''))
    summary = clean_text(data.get('summary', ''))
    # BUG FIX: cover_image (and the per-table text fields below) were
    # interpolated into the SQL unescaped; a quote or backslash in a scraped
    # URL/value would corrupt the generated statement. Escape every
    # interpolated field consistently with the other text columns.
    cover_image = clean_text(data.get('cover_image', ''))
    if table_name == 'v2_knowledge':
        type_ = clean_text(data.get('type', 'article'))
        category = clean_text(data.get('category', ''))
        nutrient_name = clean_text(data.get('nutrient_name', ''))
        sql = f"""INSERT INTO `v2_knowledge`
(`title`, `content`, `summary`, `cover_image`, `type`, `category`, `nutrient_name`, `status`, `created_at`, `updated_at`)
VALUES
('{title}', '{content}', '{summary}', '{cover_image}', '{type_}', '{category}', '{nutrient_name}', 'published', NOW(), NOW());"""
        return sql
    elif table_name == 'v2_recipes':
        # v2_recipes fields: name, description, cover_image, category, meal_type, ingredients_json, steps_json, status, is_official
        # We map content to description; ingredients/steps are left as empty JSON arrays.
        sql = f"""INSERT INTO `v2_recipes`
(`name`, `description`, `cover_image`, `category`, `ingredients_json`, `steps_json`, `status`, `is_official`, `created_at`, `updated_at`)
VALUES
('{title}', '{content}', '{cover_image}', 'godkitchen', '[]', '[]', 'published', 1, NOW(), NOW());"""
        return sql
    elif table_name == 'v2_community_posts':
        # v2_community_posts fields: title, content, cover_image, user_id, status, audit_status, privacy
        # Posts are attributed to user_id = 1 (System).
        sql = f"""INSERT INTO `v2_community_posts`
(`title`, `content`, `cover_image`, `user_id`, `status`, `audit_status`, `privacy`, `created_at`, `updated_at`)
VALUES
('{title}', '{content}', '{cover_image}', 1, 'published', 'approved', 'public', NOW(), NOW());"""
        return sql
    return ""
def scrape_detail(url, item_data):
    """Fetch the detail page at *url* and attach its main content to *item_data*.

    Returns the enriched dict, or None when the page could not be fetched.
    """
    soup = get_soup(url)
    if not soup:
        return None
    # Articles keep their body in div.txt; recipe (godkitchen) pages and some
    # other layouts use div.content_left instead.
    body = soup.find('div', class_='txt')
    if not body:
        wrapper = soup.find('div', class_='content_left')
        if wrapper:
            # Strip the title/time/button bars so they are not duplicated in content.
            for junk in wrapper.find_all('div', class_=['tit', 'tit2', 'lst_btns']):
                junk.decompose()
            body = wrapper
    item_data['content'] = body if body else "<p>No content extracted.</p>"
    return item_data
def process_item(item, source_type, table_name='v2_knowledge', extra_info=None):
    """Scrape one list item's detail page and append its INSERT to the SQL file.

    Returns True on success, False when an exception was caught and logged.
    """
    try:
        detail = scrape_detail(item['url'], item)
        if detail:
            detail['type'] = source_type
            if extra_info:
                detail.update(extra_info)
            save_sql(generate_sql(detail, table_name))
            return True
    except Exception as e:
        # Worker-thread boundary: never let one bad page kill the pool.
        log(f"Error processing item {item.get('url')}: {e}")
        return False
# ---------------------------------------------------------------------------
# Task Specific Scrapers
# ---------------------------------------------------------------------------
def task2_nutrients():
    """Task 2: scrape nutrient-category articles into v2_knowledge.

    Walks the nutrient index page, collects one link per category
    (the <a> tiles wrapping div.tianchong), lists each category's
    articles, then scrapes every detail page in a thread pool.
    """
    log("Starting Task 2: Nutrients")
    url = "http://www.ishen365.com/index.php/rsyys"
    soup = get_soup(url)
    if not soup:
        return
    nutrients = []
    links = soup.find_all('a')
    for link in links:
        # Category tiles look like <a><div class="tianchong">NAME</div></a>
        div = link.find('div', class_='tianchong')
        if div:
            name = div.get_text(strip=True)
            href = link.get('href')
            if href:
                nutrients.append({'name': name, 'url': urljoin(url, href)})
    log(f"Found {len(nutrients)} nutrient categories.")
    all_items = []
    for nutrient in nutrients:
        log(f"Scanning Nutrient: {nutrient['name']}")
        n_soup = get_soup(nutrient['url'])
        if not n_soup:
            continue
        # Article rows share the Amaze-UI list markup: li.am-g > h3 > a
        list_items = n_soup.find_all('li', class_='am-g')
        for li in list_items:
            h3 = li.find('h3', class_='am-list-item-hd')
            if h3:
                a = h3.find('a')
                if a:
                    title = a.get_text(strip=True)
                    href = urljoin(nutrient['url'], a.get('href'))
                    summary_div = li.find('div', class_='am-list-item-text')
                    summary = summary_div.get_text(strip=True) if summary_div else ""
                    thumb = li.find('div', class_='am-list-item-thumb')
                    img = thumb.find('img') if thumb else None
                    img_src = urljoin(nutrient['url'], img.get('src')) if img else ""
                    all_items.append({
                        'title': title,
                        'url': href,
                        'summary': summary,
                        'cover_image': img_src,
                        'nutrient_name': nutrient['name'],
                        'category': 'nutrients'
                    })
    log(f"Total Nutrient Articles to scrape: {len(all_items)}")
    # Fan out detail-page fetches; each worker writes its own INSERT via save_sql.
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(process_item, item, 'nutrients', 'v2_knowledge') for item in all_items]
        for _ in as_completed(futures):
            pass
def scrape_list_items(task):
    """Collect all list-page items for one task definition.

    task keys: 'name', 'url', 'limit' (int cap or None for unlimited),
    and optionally 'start_page' (1-based; >1 appends ?page=N to the URL).
    Returns a list of item dicts (title, url, summary, cover_image,
    category) capped at task['limit'] when set.
    """
    log(f"Processing {task['name']}...")
    all_items = []
    # Determine start page
    start_page = task.get('start_page', 1)
    page_url = f"{task['url']}?page={start_page}" if start_page > 1 else task['url']
    soup = get_soup(page_url)
    if not soup:
        return []
    current_soup = soup
    page_count = start_page
    while True:
        # Stop (and trim any overshoot) once the cap is reached.
        if task['limit'] and len(all_items) >= task['limit']:
            all_items = all_items[:task['limit']]
            break
        # Extract items — Amaze-UI list rows: li.am-g > h3.am-list-item-hd > a
        items_list = current_soup.find_all('li', class_='am-g')
        new_items_found = False
        for li in items_list:
            if task['limit'] and len(all_items) >= task['limit']:
                break
            h3 = li.find('h3', class_='am-list-item-hd')
            if h3:
                a = h3.find('a')
                if a:
                    title = a.get_text(strip=True)
                    href = urljoin(task['url'], a.get('href'))
                    summary_div = li.find('div', class_='am-list-item-text')
                    summary = summary_div.get_text(strip=True) if summary_div else ""
                    thumb = li.find('div', class_='am-list-item-thumb')
                    img = thumb.find('img') if thumb else None
                    img_src = urljoin(task['url'], img.get('src')) if img else ""
                    all_items.append({
                        'title': title,
                        'url': href,
                        'summary': summary,
                        'cover_image': img_src,
                        'category': task['name']
                    })
                    new_items_found = True
        if not new_items_found:
            # Maybe the page is empty, or structure changed
            # If start_page was high (e.g. 21), it might be empty if not that many items.
            log(f" No items found on page {page_count}. Stopping.")
            break
        # Next page
        next_link = current_soup.find('a', string=re.compile(r'下一页|Next'))
        if not next_link:
            break
        next_href = next_link.get('href')
        if not next_href or next_href == '#':
            break
        next_url = urljoin(task['url'], next_href)
        log(f" Fetching next page: {next_url}")
        current_soup = get_soup(next_url)
        if not current_soup:
            break
        page_count += 1
        # Safety break for unlimited tasks
        if not task['limit'] and page_count > 20:
            break
    log(f" Total items collected for {task['name']}: {len(all_items)}")
    return all_items
def task3_articles_guides():
    """Task 3: scrape science articles, wiki guides, and doctor news into v2_knowledge."""
    log("Starting Task 3: Articles & Guides")
    sources = (
        {'name': 'scienceteach', 'url': 'http://www.ishen365.com/index.php/article/scienceteach', 'type': 'article', 'table': 'v2_knowledge', 'limit': None},
        {'name': 'wiki', 'url': 'http://www.ishen365.com/index.php/wiki', 'type': 'guide', 'table': 'v2_knowledge', 'limit': 100},
        {'name': 'doctornews', 'url': 'http://www.ishen365.com/index.php/article/doctornews', 'type': 'article', 'table': 'v2_knowledge', 'limit': 100},
    )
    for source in sources:
        listing = scrape_list_items(source)
        # One pool per source: fetch every detail page concurrently.
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
            pending = [pool.submit(process_item, entry, source['type'], source['table']) for entry in listing]
            for _ in as_completed(pending):
                pass
def task4_recipes():
    """Task 4: scrape 'God Kitchen' recipe videos into v2_recipes.

    Paginates the godkitchen listing (ul.video_list), collecting the
    title, link, and thumbnail of each entry, then scrapes the detail
    pages in a thread pool.
    """
    log("Starting Task 4: Recipes")
    url = "http://www.ishen365.com/index.php/article/godkitchen"
    soup = get_soup(url)
    if not soup:
        return
    all_items = []
    current_soup = soup
    page_count = 1
    while True:
        # Recipe tiles: ul.video_list > li.f_l > a.video (img + span.video_title)
        video_list_ul = current_soup.find('ul', class_='video_list')
        if video_list_ul:
            lis = video_list_ul.find_all('li', class_='f_l')
            for li in lis:
                a = li.find('a', class_='video')
                if a:
                    href = urljoin(url, a.get('href'))
                    img = a.find('img')
                    img_src = urljoin(url, img.get('src')) if img else ""
                    title_span = a.find('span', class_='video_title')
                    title = title_span.get_text(strip=True) if title_span else ""
                    all_items.append({
                        'title': title,
                        'url': href,
                        'cover_image': img_src,
                        'summary': '',
                        'category': 'godkitchen'
                    })
        # Pagination: follow the "next page" link until it disappears or is inert.
        next_link = current_soup.find('a', string=re.compile(r'下一页|Next'))
        if not next_link:
            break
        next_href = next_link.get('href')
        if not next_href or next_href == '#':
            break
        next_url = urljoin(url, next_href)
        log(f" Fetching next page: {next_url}")
        current_soup = get_soup(next_url)
        if not current_soup:
            break
        page_count += 1
        # Safety valve against pagination loops.
        if page_count > 20:
            break
    log(f"Total Recipes to scrape: {len(all_items)}")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(process_item, item, 'recipe', 'v2_recipes') for item in all_items]
        for _ in as_completed(futures):
            pass
def task5_community():
    """Task 5: repurpose wiki items 201-500 as community posts (user_id = 1)."""
    log("Starting Task 5: Community Posts")
    # The wiki lists 10 items per page, so page 21 starts at item 201;
    # limit 300 then covers the 201-500 range.
    wiki_task = {
        'name': 'community_wiki',
        'url': 'http://www.ishen365.com/index.php/wiki',
        'type': 'article',  # ignored for the community table
        'table': 'v2_community_posts',
        'limit': 300,
        'start_page': 21
    }
    posts = scrape_list_items(wiki_task)
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        jobs = [pool.submit(process_item, post, 'article', 'v2_community_posts') for post in posts]
        for _ in as_completed(jobs):
            pass
def main():
    """Reset the output SQL file, then run every scraping task in order."""
    # Truncate OUTPUT_FILE and write the header + DELETE preamble so the
    # generated script replaces (not appends to) existing rows.
    preamble = [
        "-- Bulk Insert SQL for Knowledge, Recipes, and Community\n",
        "DELETE FROM `v2_knowledge`;\n",
        "DELETE FROM `v2_recipes`;\n",
        "DELETE FROM `v2_community_posts`;\n",
    ]
    with open(OUTPUT_FILE, "w", encoding="utf-8") as out:
        out.writelines(preamble)
    task2_nutrients()
    task3_articles_guides()
    task4_recipes()
    task5_community()
    log("All tasks completed!")
# Script entry point: run the full scrape only when executed directly.
if __name__ == "__main__":
    main()