"""Scraper for www.ishen365.com.

Collects nutrient articles, health guides, recipes and community posts and
emits a bulk-insert SQL script (OUTPUT_FILE) targeting the ``v2_knowledge``,
``v2_recipes`` and ``v2_community_posts`` tables.
"""

import json
import os
import random
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
BASE_URL = "http://www.ishen365.com"
OUTPUT_FILE = "knowledge_data_insert.sql"
LOG_FILE = "knowledge_scraper.log"
MAX_WORKERS = 10

# Serializes appends to OUTPUT_FILE across worker threads.
file_lock = threading.Lock()

HEADERS = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36"
}


def log(msg):
    """Print *msg* with a timestamp and append the same line to LOG_FILE."""
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    formatted_msg = f"[{timestamp}] {msg}"
    print(formatted_msg)
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(formatted_msg + "\n")


def get_soup(url):
    """Fetch *url* and return a parsed BeautifulSoup tree, or None on error."""
    try:
        response = requests.get(url, headers=HEADERS, timeout=10)
        response.raise_for_status()
        response.encoding = 'utf-8'  # force UTF-8; site may mislabel charset
        return BeautifulSoup(response.text, 'html.parser')
    except Exception as e:  # any network/HTTP/parse failure is non-fatal
        log(f"Error fetching {url}: {e}")
        return None


def clean_text(text):
    """Strip *text* and escape single quotes / backslashes for SQL literals."""
    if not text:
        return ""
    return text.strip().replace("'", "''").replace("\\", "\\\\")


def clean_html(html_content):
    """Stringify an HTML fragment (str or bs4 Tag) and escape it for SQL."""
    if not html_content:
        return ""
    return str(html_content).replace("'", "''").replace("\\", "\\\\")


def save_sql(sql):
    """Append one SQL statement to OUTPUT_FILE (thread-safe)."""
    with file_lock, open(OUTPUT_FILE, "a", encoding="utf-8") as f:
        f.write(sql + "\n")


def generate_sql(data, table_name='v2_knowledge'):
    """Build an INSERT statement for *data* targeting *table_name*.

    Supported tables: ``v2_knowledge`` (default), ``v2_recipes``,
    ``v2_community_posts``.  Returns "" for an unknown table name.
    """
    title = clean_text(data.get('title', ''))
    content = clean_html(data.get('content', ''))
    summary = clean_text(data.get('summary', ''))
    cover_image = data.get('cover_image', '')

    if table_name == 'v2_knowledge':
        type_ = data.get('type', 'article')
        category = data.get('category', '')
        nutrient_name = clean_text(data.get('nutrient_name', ''))
        sql = f"""INSERT INTO `v2_knowledge` (`title`, `content`, `summary`, `cover_image`, `type`, `category`, `nutrient_name`, `status`, `created_at`, `updated_at`) VALUES ('{title}', '{content}', '{summary}', '{cover_image}', '{type_}', '{category}', '{nutrient_name}', 'published', NOW(), NOW());"""
        return sql
    elif table_name == 'v2_recipes':
        # v2_recipes fields: name, description, cover_image, category,
        # ingredients_json, steps_json, status, is_official.
        # We map content -> description; ingredients/steps left empty.
        sql = f"""INSERT INTO `v2_recipes` (`name`, `description`, `cover_image`, `category`, `ingredients_json`, `steps_json`, `status`, `is_official`, `created_at`, `updated_at`) VALUES ('{title}', '{content}', '{cover_image}', 'godkitchen', '[]', '[]', 'published', 1, NOW(), NOW());"""
        return sql
    elif table_name == 'v2_community_posts':
        # user_id = 1 (System account).
        sql = f"""INSERT INTO `v2_community_posts` (`title`, `content`, `cover_image`, `user_id`, `status`, `audit_status`, `privacy`, `created_at`, `updated_at`) VALUES ('{title}', '{content}', '{cover_image}', 1, 'published', 'approved', 'public', NOW(), NOW());"""
        return sql
    return ""


def scrape_detail(url, item_data):
    """Fetch the detail page at *url* and attach its HTML body to *item_data*.

    Articles keep their body in ``div.txt``; recipe pages use
    ``div.content_left`` (headline / timestamp / share-button chrome is
    stripped to avoid duplication).  Returns *item_data* or None on fetch
    failure.
    """
    soup = get_soup(url)
    if not soup:
        return None

    content_div = soup.find('div', class_='txt')
    if not content_div:
        content_left = soup.find('div', class_='content_left')
        if content_left:
            # Remove title / time / button divs so they are not duplicated
            # in the stored content.
            for div in content_left.find_all('div', class_=['tit', 'tit2', 'lst_btns']):
                div.decompose()
            content_div = content_left

    if content_div:
        item_data['content'] = content_div
    else:
        # NOTE(review): the placeholder literal was mangled in the source
        # copy; reconstructed as a plain marker string.
        item_data['content'] = "No content extracted."
    return item_data


def process_item(item, source_type, table_name='v2_knowledge', extra_info=None):
    """Scrape one item's detail page and persist its INSERT statement.

    Returns True on success, False (with a log entry) on any failure.
    """
    try:
        full_data = scrape_detail(item['url'], item)
        if full_data:
            full_data['type'] = source_type
            if extra_info:
                full_data.update(extra_info)
            sql = generate_sql(full_data, table_name)
            save_sql(sql)
            return True
    except Exception as e:
        log(f"Error processing item {item.get('url')}: {e}")
    return False


def _parse_am_list_item(li, base_url):
    """Extract title/url/summary/cover_image from one ``li.am-g`` entry.

    Returns a dict, or None when the entry has no linked headline.
    """
    h3 = li.find('h3', class_='am-list-item-hd')
    if not h3:
        return None
    a = h3.find('a')
    if not a:
        return None
    summary_div = li.find('div', class_='am-list-item-text')
    thumb = li.find('div', class_='am-list-item-thumb')
    img = thumb.find('img') if thumb else None
    return {
        'title': a.get_text(strip=True),
        'url': urljoin(base_url, a.get('href')),
        'summary': summary_div.get_text(strip=True) if summary_div else "",
        'cover_image': urljoin(base_url, img.get('src')) if img else "",
    }


def _next_page_url(soup, base_url):
    """Return the absolute URL of the "下一页"/"Next" pagination link, or None."""
    next_link = soup.find('a', string=re.compile(r'下一页|Next'))
    if not next_link:
        return None
    next_href = next_link.get('href')
    if not next_href or next_href == '#':
        return None
    return urljoin(base_url, next_href)


# ---------------------------------------------------------------------------
# Task Specific Scrapers
# ---------------------------------------------------------------------------

def task2_nutrients():
    """Task 2: scrape every nutrient category and its articles -> v2_knowledge."""
    log("Starting Task 2: Nutrients")
    url = "http://www.ishen365.com/index.php/rsyys"
    soup = get_soup(url)
    if not soup:
        return

    # Nutrient categories are anchors wrapping a div.tianchong label.
    nutrients = []
    for link in soup.find_all('a'):
        div = link.find('div', class_='tianchong')
        if div:
            href = link.get('href')
            if href:
                nutrients.append({
                    'name': div.get_text(strip=True),
                    'url': urljoin(url, href),
                })
    log(f"Found {len(nutrients)} nutrient categories.")

    all_items = []
    for nutrient in nutrients:
        log(f"Scanning Nutrient: {nutrient['name']}")
        n_soup = get_soup(nutrient['url'])
        if not n_soup:
            continue
        for li in n_soup.find_all('li', class_='am-g'):
            item = _parse_am_list_item(li, nutrient['url'])
            if item:
                item['nutrient_name'] = nutrient['name']
                item['category'] = 'nutrients'
                all_items.append(item)

    log(f"Total Nutrient Articles to scrape: {len(all_items)}")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(process_item, item, 'nutrients', 'v2_knowledge')
                   for item in all_items]
        for _ in as_completed(futures):
            pass


def scrape_list_items(task):
    """Collect list items for a paginated section described by *task*.

    *task* keys: name, url, optional limit (max items), optional start_page.
    Follows "next page" links until the limit is hit, the listing ends, or a
    20-page safety cap is reached for unlimited tasks.
    """
    log(f"Processing {task['name']}...")
    all_items = []

    start_page = task.get('start_page', 1)
    page_url = f"{task['url']}?page={start_page}" if start_page > 1 else task['url']
    current_soup = get_soup(page_url)
    if not current_soup:
        return []

    page_count = start_page
    while True:
        if task['limit'] and len(all_items) >= task['limit']:
            all_items = all_items[:task['limit']]
            break

        new_items_found = False
        for li in current_soup.find_all('li', class_='am-g'):
            if task['limit'] and len(all_items) >= task['limit']:
                break
            item = _parse_am_list_item(li, task['url'])
            if item:
                item['category'] = task['name']
                all_items.append(item)
                new_items_found = True

        if not new_items_found:
            # Page may legitimately be empty — e.g. a high start_page past
            # the end of the listing, or a changed page structure.
            log(f" No items found on page {page_count}. Stopping.")
            break

        next_url = _next_page_url(current_soup, task['url'])
        if not next_url:
            break
        log(f" Fetching next page: {next_url}")
        current_soup = get_soup(next_url)
        if not current_soup:
            break
        page_count += 1
        # Safety break for unlimited tasks.
        if not task['limit'] and page_count > 20:
            break

    log(f" Total items collected for {task['name']}: {len(all_items)}")
    return all_items


def task3_articles_guides():
    """Task 3: scrape science articles, wiki guides and doctor news."""
    log("Starting Task 3: Articles & Guides")
    tasks = [
        {'name': 'scienceteach', 'url': 'http://www.ishen365.com/index.php/article/scienceteach',
         'type': 'article', 'table': 'v2_knowledge', 'limit': None},
        {'name': 'wiki', 'url': 'http://www.ishen365.com/index.php/wiki',
         'type': 'guide', 'table': 'v2_knowledge', 'limit': 100},
        {'name': 'doctornews', 'url': 'http://www.ishen365.com/index.php/article/doctornews',
         'type': 'article', 'table': 'v2_knowledge', 'limit': 100},
    ]
    for task in tasks:
        items = scrape_list_items(task)
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = [executor.submit(process_item, item, task['type'], task['table'])
                       for item in items]
            for _ in as_completed(futures):
                pass


def task4_recipes():
    """Task 4: scrape the "godkitchen" video recipe listing -> v2_recipes."""
    log("Starting Task 4: Recipes")
    url = "http://www.ishen365.com/index.php/article/godkitchen"
    current_soup = get_soup(url)
    if not current_soup:
        return

    all_items = []
    page_count = 1
    while True:
        video_list_ul = current_soup.find('ul', class_='video_list')
        if video_list_ul:
            for li in video_list_ul.find_all('li', class_='f_l'):
                a = li.find('a', class_='video')
                if not a:
                    continue
                img = a.find('img')
                title_span = a.find('span', class_='video_title')
                all_items.append({
                    'title': title_span.get_text(strip=True) if title_span else "",
                    'url': urljoin(url, a.get('href')),
                    'cover_image': urljoin(url, img.get('src')) if img else "",
                    'summary': '',
                    'category': 'godkitchen',
                })

        next_url = _next_page_url(current_soup, url)
        if not next_url:
            break
        log(f" Fetching next page: {next_url}")
        current_soup = get_soup(next_url)
        if not current_soup:
            break
        page_count += 1
        if page_count > 20:  # safety cap
            break

    log(f"Total Recipes to scrape: {len(all_items)}")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(process_item, item, 'recipe', 'v2_recipes')
                   for item in all_items]
        for _ in as_completed(futures):
            pass


def task5_community():
    """Task 5: repurpose wiki items 200-500 as community posts.

    Page 21 starts at item 201 (assuming 10 items per page).
    """
    log("Starting Task 5: Community Posts")
    task = {
        'name': 'community_wiki',
        'url': 'http://www.ishen365.com/index.php/wiki',
        'type': 'article',  # not used
        'table': 'v2_community_posts',
        'limit': 300,
        'start_page': 21,
    }
    items = scrape_list_items(task)
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(process_item, item, 'article', 'v2_community_posts')
                   for item in items]
        for _ in as_completed(futures):
            pass


def main():
    """Initialize the output SQL file, then run all scraping tasks."""
    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        f.write("-- Bulk Insert SQL for Knowledge, Recipes, and Community\n")
        f.write("DELETE FROM `v2_knowledge`;\n")
        f.write("DELETE FROM `v2_recipes`;\n")
        f.write("DELETE FROM `v2_community_posts`;\n")

    task2_nutrients()
    task3_articles_guides()
    task4_recipes()
    task5_community()
    log("All tasks completed!")


if __name__ == "__main__":
    main()