# Source listing: 119 lines, 4.6 KiB, Python
import urllib.request
|
||
import os
|
||
import time
|
||
from urllib.parse import quote
|
||
from html.parser import HTMLParser
|
||
import requests # type: ignore
|
||
from bs4 import BeautifulSoup # type: ignore
|
||
from urllib.parse import quote_plus
|
||
|
||
class PureHTMLParser(HTMLParser):
    """Fetch Baidu search results and keep them in a small on-disk cache.

    Each query is cached in its own text file inside ``cache_dir``. A cache
    file holds one entry per result, written as ``Title:``, ``URL:`` and
    ``Text:`` lines followed by a separator line of 50 ``=`` characters.

    NOTE(review): the original source carried a placeholder comment saying the
    earlier HTML-parser handler code was kept unchanged but did not include it;
    restore those handlers here if they are still needed.
    """

    # Browser-like headers sent with every search request unless the instance
    # already carries its own ``headers`` attribute.
    DEFAULT_HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }

    # Line written after every cached entry.
    _CACHE_SEPARATOR = "=" * 50

    # (line prefix, entry key) pairs understood by the cache reader.
    _CACHE_FIELDS = (("Title: ", "title"), ("URL: ", "url"), ("Text: ", "text"))

    def __init__(self, cache_dir="cache", cache_expiry=3600):
        """Create the fetcher and make sure the cache directory exists.

        Args:
            cache_dir: Directory that receives one cache file per query.
            cache_expiry: Maximum age of a cache file, in seconds, for it to
                be served without re-fetching.
        """
        # HTMLParser keeps internal state that only its own __init__ sets up.
        super().__init__()
        self.user_agent = "Mozilla/5.0"
        self.cache_dir = cache_dir
        self.cache_expiry = cache_expiry
        os.makedirs(cache_dir, exist_ok=True)

    def _is_cache_valid(self, cache_file):
        """Return True when ``cache_file`` exists and is younger than ``cache_expiry``."""
        try:
            age = time.time() - os.path.getmtime(cache_file)
        except OSError:
            # Missing or unreadable file: treat as "no valid cache".
            return False
        return age < self.cache_expiry

    def _get_cache_path(self, query: str) -> str:
        """Return the cache file path for ``query``.

        Every non-alphanumeric character of the query is replaced by ``_`` so
        the result is usable as a file name.
        """
        safe_query = "".join(c if c.isalnum() else "_" for c in query)
        return f"{self.cache_dir}/{safe_query}.txt"

    @staticmethod
    def _one_line(value) -> str:
        """Flatten ``value`` to one line so it fits the line-based cache format."""
        return " ".join(str(value or "").splitlines())

    def _save_to_cache(self, query: str, data: list):
        """Write search results for ``query`` to its cache file.

        Args:
            query: The search query the results belong to.
            data: List of result dicts. ``title``, ``url`` and ``abstract``
                are stored; ``text`` is used when ``abstract`` is absent.
                A dict carrying the list under ``'results'`` is accepted too.
        """
        if isinstance(data, dict):
            data = data.get('results', [])

        lines = []
        for item in data:
            body = item.get('abstract', item.get('text', ''))
            lines.append(f"Title: {self._one_line(item.get('title', ''))}\n")
            lines.append(f"URL: {self._one_line(item.get('url', ''))}\n")
            lines.append(f"Text: {self._one_line(body)}\n")
            lines.append(self._CACHE_SEPARATOR + "\n")

        with open(self._get_cache_path(query), "w", encoding="utf-8") as f:
            f.writelines(lines)

    def _load_from_cache(self, query: str) -> "list | None":
        """Read the cached results for ``query``.

        Returns:
            ``None`` when no cache file exists; otherwise a list of dicts with
            the keys ``title``, ``url``, ``abstract`` and ``text`` (``text``
            mirrors ``abstract``). Entries without a URL are skipped.
        """
        try:
            with open(self._get_cache_path(query), "r", encoding="utf-8") as f:
                lines = [raw.rstrip("\n") for raw in f]
        except FileNotFoundError:
            return None

        items = []
        fields = {}
        # The extra separator flushes a final entry that lacks its own.
        for line in lines + [self._CACHE_SEPARATOR]:
            if line.strip() == self._CACHE_SEPARATOR:
                if fields.get("url"):
                    text = fields.get("text", "")
                    items.append({
                        "title": fields.get("title", ""),
                        "url": fields["url"],
                        "abstract": text,
                        "text": text,
                    })
                fields = {}
                continue
            for prefix, key in self._CACHE_FIELDS:
                if line.startswith(prefix):
                    fields[key] = line[len(prefix):]
                    break
        return items

    def _build_response(self, query, results, search_url, timestamp):
        """Wrap ``results`` in the response structure returned by ``fetch``."""
        data = {
            'query': query,
            'results': results if results else [{'title': '无结果', 'url': '', 'abstract': ''}],
            'timestamp': timestamp,
            'sources': [search_url],
        }
        return {
            'data': data,
            'sources': ["www.baidu.com"],
        }

    def _cached_response(self, query, search_url):
        """Return a ``fetch``-style response built from the cache, or None."""
        cached = self._load_from_cache(query)
        if cached is None:
            return None
        try:
            stamp = int(os.path.getmtime(self._get_cache_path(query)))
        except OSError:
            stamp = int(time.time())
        return self._build_response(query, cached, search_url, stamp)

    def _search(self, search_url):
        """Download ``search_url`` and extract the result entries from the page."""
        response = requests.get(search_url, headers=self.headers, timeout=10)
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'html.parser')
        results = []
        # NOTE: these selectors follow Baidu's result markup and may need
        # adjusting when the page layout changes.
        for item in soup.select('.result.c-container'):
            title_elem = item.select_one('h3')
            link_elem = item.find('a')
            abstract_elem = item.select_one('.c-abstract')
            if title_elem and link_elem:
                results.append({
                    'title': title_elem.get_text(strip=True),
                    'url': link_elem.get('href'),
                    'abstract': abstract_elem.get_text(strip=True) if abstract_elem else "",
                })
        return results

    def fetch(self, query, force_update=False):
        """Return search results for ``query``, using the cache when possible.

        Args:
            query: Search terms.
            force_update: When True, skip the cache and always hit the network.

        Returns:
            ``{'data': {...}, 'sources': ['www.baidu.com']}`` where ``data``
            holds ``query``, ``results`` (a list of dicts with ``title``,
            ``url`` and ``abstract``), ``timestamp`` and ``sources``. The same
            structure is returned for fresh and cached results.

        Raises:
            RuntimeError: The fetch failed and no cache file exists for the query.
        """
        # Instances may carry custom headers; only fill in the default.
        if not hasattr(self, 'headers'):
            self.headers = dict(self.DEFAULT_HEADERS)

        search_url = f"https://www.baidu.com/s?wd={quote_plus(query)}"

        if not force_update and self._is_cache_valid(self._get_cache_path(query)):
            cached = self._cached_response(query, search_url)
            if cached is not None:
                return cached

        try:
            results = self._search(search_url)
        except Exception as e:
            # Fetch failed: fall back to a cache file of any age if one exists.
            cached = self._cached_response(query, search_url)
            if cached is not None:
                print(f"抓取失败，使用缓存数据: {str(e)}")
                return cached
            raise RuntimeError(f"抓取失败且无缓存可用: {str(e)}") from e

        try:
            self._save_to_cache(query, results)
        except OSError as e:
            # Caching is best-effort; fresh results are still returned.
            print(f"缓存写入失败: {str(e)}")

        return self._build_response(query, results, search_url, int(time.time()))