import os
import time
import urllib.request
from html.parser import HTMLParser
from urllib.parse import quote, quote_plus

import requests  # type: ignore
from bs4 import BeautifulSoup  # type: ignore


class PureHTMLParser(HTMLParser):
    # ...(previous HTML parser code kept unchanged)...

    def __init__(self, cache_dir="cache"):
        super().__init__()  # initialise the underlying HTMLParser
        self.user_agent = "Mozilla/5.0"
        # self.parser = PureHTMLParser()
        self.cache_dir = cache_dir
        # Cache lifetime in seconds; the original never set this attribute,
        # so one hour is assumed here.
        self.cache_expiry = 3600
        os.makedirs(cache_dir, exist_ok=True)

    def _is_cache_valid(self, cache_file):
        """Check whether the cache file exists and has not expired."""
        if not os.path.exists(cache_file):
            return False
        file_time = os.path.getmtime(cache_file)
        return (time.time() - file_time) < self.cache_expiry

    def _get_cache_path(self, query: str) -> str:
        """Build the cache file path for a query."""
        safe_query = "".join(c if c.isalnum() else "_" for c in query)
        return os.path.join(self.cache_dir, f"{safe_query}.txt")

    def _save_to_cache(self, query: str, data: list):
        """Save search results to the cache."""
        with open(self._get_cache_path(query), "w", encoding="utf-8") as f:
            for item in data:
                f.write(f"URL: {item.get('url', '')}\n")
                f.write(f"Text: {item.get('abstract', item.get('text', ''))}\n")
                f.write("=" * 50 + "\n")

    def _load_from_cache(self, query: str) -> list:
        """Load results from the cache; return None if no cache file exists."""
        cache_file = self._get_cache_path(query)
        if not os.path.exists(cache_file):
            return None
        with open(cache_file, "r", encoding="utf-8") as f:
            content = f.read()

        # Parse the cache file back into a list of result dicts
        items = []
        for block in content.split("=" * 50):
            if not block.strip():
                continue
            url = text = ""
            for line in block.split("\n"):
                if line.startswith("URL: "):
                    url = line[5:]
                elif line.startswith("Text: "):
                    text = line[6:]
            if url:
                items.append({"url": url, "text": text})
        return items

    def fetch(self, query, force_update=False):
        # Make sure default headers exist
        if not hasattr(self, 'headers'):
            self.headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                              'AppleWebKit/537.36 (KHTML, like Gecko) '
                              'Chrome/91.0.4472.124 Safari/537.36'
            }

        cache_file = self._get_cache_path(query)

        # Serve the cached result list while the cache is still valid
        if not force_update and self._is_cache_valid(cache_file):
            return self._load_from_cache(query)

        try:
            # Actual fetch logic - Baidu search used as an example
            search_url = f"https://www.baidu.com/s?wd={quote_plus(query)}"
            response = requests.get(search_url, headers=self.headers, timeout=10)
            response.raise_for_status()

            # Parse the page content
            soup = BeautifulSoup(response.text, 'html.parser')
            results = []

            # Extract search results - the selectors for Baidu results may need adjusting
            for item in soup.select('.result.c-container'):
                title_elem = item.select_one('h3')
                link_elem = item.find('a')
                abstract_elem = item.select_one('.c-abstract')

                if title_elem and link_elem:
                    results.append({
                        'title': title_elem.get_text(strip=True),
                        'url': link_elem.get('href'),
                        'abstract': abstract_elem.get_text(strip=True) if abstract_elem else ""
                    })

            data = {
                'query': query,
                'results': results if results else [{'title': 'No results', 'url': '', 'abstract': ''}],
                'timestamp': int(time.time()),
                'sources': [search_url]
            }

            # Save the result list to the cache
            self._save_to_cache(query, data['results'])

            return {
                'data': data,
                'sources': ["www.baidu.com"]
            }

        except Exception as e:
            # If the fetch fails but a cache file exists, fall back to it
            if os.path.exists(cache_file):
                print(f"Fetch failed, using cached data: {e}")
                return self._load_from_cache(query)
            raise RuntimeError(f"Fetch failed and no cache is available: {e}")
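

# Minimal usage sketch (an assumption, not part of the original code): it
# presumes the elided HTML parser methods above are intact and that Baidu is
# reachable from the current network. The query string is an arbitrary example.
if __name__ == "__main__":
    fetcher = PureHTMLParser(cache_dir="cache")
    # The first call hits Baidu and writes the cache; later calls within
    # cache_expiry seconds return the cached result list instead.
    result = fetcher.fetch("python web scraping")
    print(result)
    # Bypass the cache and force a fresh fetch:
    # result = fetcher.fetch("python web scraping", force_update=True)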