Files
msh-system/msh_crmeb_22/scraper/knowledge_scraper.py

381 lines
14 KiB
Python
Raw Permalink Normal View History

import requests
from bs4 import BeautifulSoup
import time
import random
import json
import re
import os
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
# Configuration
BASE_URL = "http://www.ishen365.com"  # root of the site being scraped
OUTPUT_FILE = "knowledge_data_insert.sql"  # generated bulk-insert SQL file
LOG_FILE = "knowledge_scraper.log"  # append-only run log
MAX_WORKERS = 10  # thread-pool size used by every task's detail-page fan-out
# Lock for file writing — serializes appends to OUTPUT_FILE across worker threads
file_lock = threading.Lock()
# Desktop-browser User-Agent so the site serves its normal HTML pages
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36"
}
def log(msg):
    """Echo *msg* to stdout and append it, timestamped, to LOG_FILE."""
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{stamp}] {msg}"
    print(line)
    with open(LOG_FILE, "a", encoding="utf-8") as fh:
        fh.write(line + "\n")
def get_soup(url):
    """GET *url* and return a parsed BeautifulSoup tree, or None on any error.

    Forces UTF-8 decoding of the response body regardless of the declared
    charset; failures (network, HTTP status, parse) are logged, not raised.
    """
    try:
        resp = requests.get(url, headers=HEADERS, timeout=10)
        resp.raise_for_status()
        resp.encoding = 'utf-8'
        return BeautifulSoup(resp.text, 'html.parser')
    except Exception as exc:
        log(f"Error fetching {url}: {exc}")
        return None
def clean_text(text):
    """Strip whitespace and escape a value for use inside a SQL string literal.

    Single quotes are doubled and backslashes doubled (MySQL escaping).
    Falsy input (None, "") yields "".
    """
    if not text:
        return ""
    escaped = text.strip()
    escaped = escaped.replace("'", "''")
    return escaped.replace("\\", "\\\\")
def clean_html(html_content):
    """Coerce *html_content* to str and SQL-escape it (no whitespace stripping).

    Accepts strings or bs4 Tag objects (via str()). Falsy input yields "".
    """
    if not html_content:
        return ""
    text = str(html_content)
    for old, new in (("'", "''"), ("\\", "\\\\")):
        text = text.replace(old, new)
    return text
def save_sql(sql):
    """Append one SQL statement to OUTPUT_FILE, serialized by file_lock."""
    with file_lock, open(OUTPUT_FILE, "a", encoding="utf-8") as out:
        out.write(f"{sql}\n")
def generate_sql(data, table_name='v2_knowledge'):
    """Render an INSERT statement for a scraped *data* dict.

    *table_name* selects the target schema: 'v2_knowledge' (default),
    'v2_recipes', or 'v2_community_posts'; any other name returns "".
    Every interpolated value is escaped via clean_text/clean_html so
    quotes or backslashes in scraped content cannot break the SQL.
    """
    title = clean_text(data.get('title', ''))
    content = clean_html(data.get('content', ''))
    summary = clean_text(data.get('summary', ''))
    # Fix: cover_image is a scraped URL and was previously interpolated raw,
    # so a quote/backslash in it would corrupt the generated statement.
    cover_image = clean_text(data.get('cover_image', ''))
    if table_name == 'v2_knowledge':
        # Escape the remaining free-text fields as well (also previously raw).
        type_ = clean_text(data.get('type', 'article'))
        category = clean_text(data.get('category', ''))
        nutrient_name = clean_text(data.get('nutrient_name', ''))
        return f"""INSERT INTO `v2_knowledge`
(`title`, `content`, `summary`, `cover_image`, `type`, `category`, `nutrient_name`, `status`, `created_at`, `updated_at`)
VALUES
('{title}', '{content}', '{summary}', '{cover_image}', '{type_}', '{category}', '{nutrient_name}', 'published', NOW(), NOW());"""
    if table_name == 'v2_recipes':
        # v2_recipes fields: name, description, cover_image, category, meal_type,
        # ingredients_json, steps_json, status, is_official.
        # Scraped content maps to description; ingredient/step JSON left empty.
        return f"""INSERT INTO `v2_recipes`
(`name`, `description`, `cover_image`, `category`, `ingredients_json`, `steps_json`, `status`, `is_official`, `created_at`, `updated_at`)
VALUES
('{title}', '{content}', '{cover_image}', 'godkitchen', '[]', '[]', 'published', 1, NOW(), NOW());"""
    if table_name == 'v2_community_posts':
        # Community posts are attributed to user_id 1 (the system account).
        return f"""INSERT INTO `v2_community_posts`
(`title`, `content`, `cover_image`, `user_id`, `status`, `audit_status`, `privacy`, `created_at`, `updated_at`)
VALUES
('{title}', '{content}', '{cover_image}', 1, 'published', 'approved', 'public', NOW(), NOW());"""
    return ""
def scrape_detail(url, item_data):
    """Fetch the detail page at *url* and attach its main HTML body to *item_data*.

    Articles keep their body in div.txt; recipe pages use div.content_left,
    whose title/time/button sub-divs are removed to avoid duplication.
    Returns the enriched item_data, or None when the page could not be fetched.
    """
    soup = get_soup(url)
    if not soup:
        return None
    body = soup.find('div', class_='txt')
    if not body:
        # Fall back to the recipe-page container.
        fallback = soup.find('div', class_='content_left')
        if fallback:
            # Strip headers/toolbars so only the article body remains.
            for junk in fallback.find_all('div', class_=['tit', 'tit2', 'lst_btns']):
                junk.decompose()
            body = fallback
    # clean_html() str()s this later, so storing the Tag itself is fine.
    item_data['content'] = body or "<p>No content extracted.</p>"
    return item_data
def process_item(item, source_type, table_name='v2_knowledge', extra_info=None):
    """Scrape one item's detail page, render its INSERT, and persist it.

    Returns True on success; False when the detail fetch fails or any
    exception is raised (logged, never propagated — workers must not die).
    """
    try:
        enriched = scrape_detail(item['url'], item)
        if not enriched:
            return False
        enriched['type'] = source_type
        if extra_info:
            enriched.update(extra_info)
        save_sql(generate_sql(enriched, table_name))
        return True
    except Exception as exc:
        log(f"Error processing item {item.get('url')}: {exc}")
        return False
# ---------------------------------------------------------------------------
# Task Specific Scrapers
# ---------------------------------------------------------------------------
def task2_nutrients():
    """Task 2: collect nutrient-category articles and insert them into v2_knowledge."""
    log("Starting Task 2: Nutrients")
    index_url = "http://www.ishen365.com/index.php/rsyys"
    soup = get_soup(index_url)
    if not soup:
        return
    # Category links are <a> elements wrapping a <div class="tianchong"> label.
    categories = []
    for anchor in soup.find_all('a'):
        label = anchor.find('div', class_='tianchong')
        if not label:
            continue
        href = anchor.get('href')
        if href:
            categories.append({'name': label.get_text(strip=True),
                               'url': urljoin(index_url, href)})
    log(f"Found {len(categories)} nutrient categories.")
    articles = []
    for cat in categories:
        log(f"Scanning Nutrient: {cat['name']}")
        cat_soup = get_soup(cat['url'])
        if not cat_soup:
            continue
        # Each article is an <li class="am-g"> with an h3 title link.
        for li in cat_soup.find_all('li', class_='am-g'):
            heading = li.find('h3', class_='am-list-item-hd')
            anchor = heading.find('a') if heading else None
            if not anchor:
                continue
            summary_div = li.find('div', class_='am-list-item-text')
            thumb = li.find('div', class_='am-list-item-thumb')
            img = thumb.find('img') if thumb else None
            articles.append({
                'title': anchor.get_text(strip=True),
                'url': urljoin(cat['url'], anchor.get('href')),
                'summary': summary_div.get_text(strip=True) if summary_div else "",
                'cover_image': urljoin(cat['url'], img.get('src')) if img else "",
                'nutrient_name': cat['name'],
                'category': 'nutrients'
            })
    log(f"Total Nutrient Articles to scrape: {len(articles)}")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        pending = [pool.submit(process_item, art, 'nutrients', 'v2_knowledge')
                   for art in articles]
        for _ in as_completed(pending):
            pass
def scrape_list_items(task):
    """Collect all list-page items for *task*, following pagination.

    *task* keys: 'name' (stored as each item's category), 'url' (first list
    page), 'limit' (max items, or None for unlimited), and optionally
    'start_page' (1-based page to begin at; pages > 1 use ?page=N).
    Returns a list of dicts with title/url/summary/cover_image/category.
    """
    log(f"Processing {task['name']}...")
    all_items = []
    # Determine start page
    start_page = task.get('start_page', 1)
    page_url = f"{task['url']}?page={start_page}" if start_page > 1 else task['url']
    soup = get_soup(page_url)
    if not soup:
        return []
    current_soup = soup
    page_count = start_page
    while True:
        # Stop once the limit is reached; trim any overshoot from the last page.
        if task['limit'] and len(all_items) >= task['limit']:
            all_items = all_items[:task['limit']]
            break
        # Extract items: each entry is an <li class="am-g"> with an h3 title link.
        items_list = current_soup.find_all('li', class_='am-g')
        new_items_found = False
        for li in items_list:
            if task['limit'] and len(all_items) >= task['limit']:
                break
            h3 = li.find('h3', class_='am-list-item-hd')
            if h3:
                a = h3.find('a')
                if a:
                    title = a.get_text(strip=True)
                    href = urljoin(task['url'], a.get('href'))
                    summary_div = li.find('div', class_='am-list-item-text')
                    summary = summary_div.get_text(strip=True) if summary_div else ""
                    thumb = li.find('div', class_='am-list-item-thumb')
                    img = thumb.find('img') if thumb else None
                    img_src = urljoin(task['url'], img.get('src')) if img else ""
                    all_items.append({
                        'title': title,
                        'url': href,
                        'summary': summary,
                        'cover_image': img_src,
                        'category': task['name']
                    })
                    new_items_found = True
        if not new_items_found:
            # Maybe the page is empty, or structure changed.
            # If start_page was high (e.g. 21), it might be empty if not that many items.
            log(f" No items found on page {page_count}. Stopping.")
            break
        # Next page: follow the link labelled "下一页" ("Next").
        next_link = current_soup.find('a', string=re.compile(r'下一页|Next'))
        if not next_link:
            break
        next_href = next_link.get('href')
        if not next_href or next_href == '#':
            break
        next_url = urljoin(task['url'], next_href)
        log(f" Fetching next page: {next_url}")
        current_soup = get_soup(next_url)
        if not current_soup:
            break
        page_count += 1
        # Safety break for unlimited tasks so a pagination loop cannot run forever.
        if not task['limit'] and page_count > 20:
            break
    log(f" Total items collected for {task['name']}: {len(all_items)}")
    return all_items
def task3_articles_guides():
    """Task 3: scrape science articles, wiki guides and doctor news into v2_knowledge."""
    log("Starting Task 3: Articles & Guides")
    sources = [
        {'name': 'scienceteach', 'url': 'http://www.ishen365.com/index.php/article/scienceteach', 'type': 'article', 'table': 'v2_knowledge', 'limit': None},
        {'name': 'wiki', 'url': 'http://www.ishen365.com/index.php/wiki', 'type': 'guide', 'table': 'v2_knowledge', 'limit': 100},
        {'name': 'doctornews', 'url': 'http://www.ishen365.com/index.php/article/doctornews', 'type': 'article', 'table': 'v2_knowledge', 'limit': 100},
    ]
    for source in sources:
        collected = scrape_list_items(source)
        # Fan out detail-page scraping for this source before moving to the next.
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
            pending = [pool.submit(process_item, entry, source['type'], source['table'])
                       for entry in collected]
            for _ in as_completed(pending):
                pass
def task4_recipes():
    """Task 4: scrape the 'godkitchen' recipe video listing into v2_recipes."""
    log("Starting Task 4: Recipes")
    list_url = "http://www.ishen365.com/index.php/article/godkitchen"
    page = get_soup(list_url)
    if not page:
        return
    recipes = []
    page_no = 1
    while True:
        # Recipe cards live in <ul class="video_list"> as <li class="f_l"> entries.
        gallery = page.find('ul', class_='video_list')
        if gallery:
            for card in gallery.find_all('li', class_='f_l'):
                link = card.find('a', class_='video')
                if not link:
                    continue
                thumb = link.find('img')
                caption = link.find('span', class_='video_title')
                recipes.append({
                    'title': caption.get_text(strip=True) if caption else "",
                    'url': urljoin(list_url, link.get('href')),
                    'cover_image': urljoin(list_url, thumb.get('src')) if thumb else "",
                    'summary': '',
                    'category': 'godkitchen'
                })
        # Follow pagination via the "下一页" ("Next") link, capped at 20 pages.
        nxt = page.find('a', string=re.compile(r'下一页|Next'))
        if not nxt:
            break
        nxt_href = nxt.get('href')
        if not nxt_href or nxt_href == '#':
            break
        nxt_url = urljoin(list_url, nxt_href)
        log(f" Fetching next page: {nxt_url}")
        page = get_soup(nxt_url)
        if not page:
            break
        page_no += 1
        if page_no > 20:
            break
    log(f"Total Recipes to scrape: {len(recipes)}")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        jobs = [pool.submit(process_item, rec, 'recipe', 'v2_recipes') for rec in recipes]
        for _ in as_completed(jobs):
            pass
def task5_community():
    """Task 5: repurpose wiki items 201-500 as community posts (user_id 1)."""
    log("Starting Task 5: Community Posts")
    # Source: wiki list, items 200-500; at 10 items per page, page 21 starts at item 201.
    wiki_task = {
        'name': 'community_wiki',
        'url': 'http://www.ishen365.com/index.php/wiki',
        'type': 'article',  # not used by the community-post path
        'table': 'v2_community_posts',
        'limit': 300,
        'start_page': 21,
    }
    posts = scrape_list_items(wiki_task)
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        jobs = [pool.submit(process_item, post, 'article', 'v2_community_posts')
                for post in posts]
        for _ in as_completed(jobs):
            pass
def main():
    """Reset OUTPUT_FILE with a DELETE preamble, then run every scrape task in order."""
    preamble = [
        "-- Bulk Insert SQL for Knowledge, Recipes, and Community\n",
        "DELETE FROM `v2_knowledge`;\n",
        "DELETE FROM `v2_recipes`;\n",
        "DELETE FROM `v2_community_posts`;\n",
    ]
    # Truncate-and-write so each run starts from a clean SQL file.
    with open(OUTPUT_FILE, "w", encoding="utf-8") as out:
        out.writelines(preamble)
    task2_nutrients()
    task3_articles_guides()
    task4_recipes()
    task5_community()
    log("All tasks completed!")
# Script entry point: run all scrape tasks when executed directly.
if __name__ == "__main__":
    main()