import random
import time
import urllib.request
import urllib.robotparser
from urllib.parse import urlparse, quote

from bs4 import BeautifulSoup
from fake_useragent import UserAgent


class CrawlerEngine:
    def __init__(self, cache_manager):
        self.cache = cache_manager
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
            'Accept-Language': 'zh-CN,zh;q=0.9'
        }
        self.ua = UserAgent()
        self.search_engines = [
            "https://www.baidu.com/s?wd={}",
            "https://www.sogou.com/web?query={}",
            "https://cn.bing.com/search?q={}"
        ]
        self.delay_range = (2, 5)  # random delay range in seconds

    def _can_fetch(self, url) -> bool:
        """Check robots.txt permissions for the given URL."""
        try:
            parsed = urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
            rp = urllib.robotparser.RobotFileParser()
            rp.set_url(f"{base_url}/robots.txt")
            rp.read()
            return rp.can_fetch(self.headers['User-Agent'], url)
        except Exception:
            return True

    def _get_random_header(self):
        return {
            'User-Agent': self.ua.random,
            'Accept': 'text/html,application/xhtml+xml',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Referer': 'https://www.google.com/'
        }

    def _smart_delay(self):
        time.sleep(random.uniform(*self.delay_range))

    def _bypass_anti_spider(self, url):
        """Fetch a URL with randomized headers to work around basic anti-crawling measures."""
        try:
            req = urllib.request.Request(
                url,
                headers=self._get_random_header(),
                method='GET'
            )
            # Optional proxy support:
            # proxy = random.choice(proxies)
            # req.set_proxy(proxy, 'http')
            with urllib.request.urlopen(req, timeout=15) as response:
                if response.status == 200:
                    return response.read().decode('utf-8', errors='ignore')
                return None
        except Exception:
            return None

    def crawl(self, query, max_retries=3):
        """Enhanced crawl: query a randomly chosen search engine, with retries."""
        cached = self.cache.load_from_cache(query)
        if cached:
            return cached

        for attempt in range(max_retries):
            try:
                search_url = random.choice(self.search_engines).format(quote(query))
                print(f"Crawling attempt {attempt + 1}: {search_url}")
                html = self._bypass_anti_spider(search_url)
                self._smart_delay()
                if html:
                    data = self._extract_data(html, query)
                    self.cache.save_to_cache(query, data)
                    return data
            except Exception as e:
                print(f"Attempt failed: {e}")
                if attempt == max_retries - 1:
                    if cached:
                        return cached
                    raise RuntimeError(f"Crawl failed and no cache available: {e}")
        # All attempts returned no HTML without raising
        raise RuntimeError(f"Crawl failed and no cache available: {query}")

    def _extract_data(self, html, query):
        """Extract search results from HTML using BeautifulSoup."""
        soup = BeautifulSoup(html, 'html.parser')
        # Parsing logic covering the different search engines
        results = []
        for item in soup.select('.result, .res, .b_algo')[:10]:  # generic result selectors
            title = item.find('h3')
            link = item.find('a', href=True)
            if title and link:
                snippet = item.find('p')
                results.append({
                    'title': title.get_text(strip=True),
                    'url': link['href'],
                    'snippet': snippet.get_text(strip=True)[:200] if snippet else ''
                })
        return {'query': query, 'results': results}

    def _fetch_html(self, url) -> str:
        """Fetch page content, respecting robots.txt."""
        if not self._can_fetch(url):
            raise PermissionError(f"Not allowed to crawl: {url}")

        req = urllib.request.Request(url, headers=self.headers)
        try:
            with urllib.request.urlopen(req, timeout=10) as response:
                if response.status == 200:
                    return response.read().decode('utf-8')
                raise ConnectionError(f"HTTP {response.status}")
        except Exception as e:
            raise ConnectionError(f"Fetch failed: {url} - {e}")

    def _extract_content(self, html: str) -> dict:
        """Extract structured data from HTML."""
        soup = BeautifulSoup(html, 'html.parser')

        # Remove unwanted tags
        for tag in ['script', 'style', 'nav', 'footer']:
            for element in soup(tag):
                element.decompose()

        # Extract core content
        title = soup.title.string if soup.title and soup.title.string else ''
        text = ' '.join(p.get_text() for p in soup.find_all('p'))

        return {
            'title': title.strip(),
            'content': text.strip(),
            'links': [a['href'] for a in soup.find_all('a', href=True)]
        }

    # NOTE: this second crawl() definition overrides the search-engine version above;
    # Python keeps only the last method defined with a given name.
    def crawl(self, query: str, max_results=5) -> dict:
        """Run the full crawl pipeline."""
        # Check the cache first
        cached = self.cache.load_from_cache(query)
        if cached:
            print(f"Using cached data: {query}")
            return cached

        print(f"Starting crawl: {query}")
        results = []
        try:
            # Query a search engine (Bing in this example)
            search_url = f"https://www.bing.com/search?q={quote(query)}"
            html = self._fetch_html(search_url)
            data = self._extract_content(html)

            # Limit the number of pages fetched and pause between requests
            for link in data['links'][:max_results]:
                if link.startswith('http'):
                    try:
                        page_html = self._fetch_html(link)
                        page_data = self._extract_content(page_html)
                        results.append({
                            'source_url': link,
                            'title': page_data['title'],
                            'content': page_data['content']
                        })
                        self._smart_delay()
                    except Exception as e:
                        print(f"Sub-page fetch failed: {link} - {e}")

            # Save results to the cache
            result_data = {'query': query, 'results': results}
            self.cache.save_to_cache(query, result_data)
            return result_data

        except Exception as e:
            print(f"Crawl failed: {e}")
            if cached:
                return cached
            raise RuntimeError(f"Crawl failed and no cache available: {e}")
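

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the engine itself): CrawlerEngine only
# assumes a cache object exposing load_from_cache(query) and
# save_to_cache(query, data), as called above. DictCacheManager below is a
# hypothetical in-memory stand-in for illustration; any real cache manager
# with those two methods would work. Running this performs live HTTP requests.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    class DictCacheManager:
        """Hypothetical in-memory cache implementing the interface used above."""

        def __init__(self):
            self._store = {}

        def load_from_cache(self, query):
            # Return None on a miss so `if cached:` in crawl() falls through
            return self._store.get(query)

        def save_to_cache(self, query, data):
            self._store[query] = data

    engine = CrawlerEngine(DictCacheManager())
    try:
        result = engine.crawl("python web scraping")
        print(f"Fetched {len(result['results'])} pages for: {result['query']}")
    except RuntimeError as err:
        print(f"Crawl did not succeed: {err}")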