FC/FFAICilent/crawlers_core.py
Friendfeng b3b036429a renamed: FFAI/__pycache__/analyzer.cpython-313.pyc -> FFAICilent/__pycache__/analyzer.cpython-313.pyc
renamed:    FFAI/__pycache__/catch.cpython-313.pyc -> FFAICilent/__pycache__/catch.cpython-313.pyc
	renamed:    FFAI/__pycache__/crawlers.cpython-313.pyc -> FFAICilent/__pycache__/crawlers.cpython-313.pyc
	renamed:    FFAI/__pycache__/crawlers_core.cpython-313.pyc -> FFAICilent/__pycache__/crawlers_core.cpython-313.pyc
	renamed:    FFAI/analyzer.py -> FFAICilent/analyzer.py
	renamed:    FFAI/catch.py -> FFAICilent/catch.py
	new file:   FFAICilent/cloud.py
	new file:   FFAICilent/config/config.ini
	new file:   FFAICilent/config/configloder.py
	renamed:    FFAI/crawlers.py -> FFAICilent/crawlers.py
	renamed:    FFAI/crawlers_core.py -> FFAICilent/crawlers_core.py
	new file:   FFAICilent/local.py
	new file:   FFAICilent/logger.py
	renamed:    FFAI/main.py -> FFAICilent/main.py
	new file:   FFAICilent/manger.py
2025-06-15 13:29:11 +08:00

183 lines
6.7 KiB
Python

import urllib.request
import urllib.robotparser
from urllib.parse import urlparse
import time
from bs4 import BeautifulSoup
import random
from urllib.parse import quote
from fake_useragent import UserAgent
class CrawlerEngine:
def __init__(self, cache_manager):
self.cache = cache_manager
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
'Accept-Language': 'zh-CN,zh;q=0.9'
}
self.ua = UserAgent()
self.search_engines = [
"https://www.baidu.com/s?wd={}",
"https://www.sogou.com/web?query={}",
"https://cn.bing.com/search?q={}"
]
self.delay_range = (2, 5) # 随机延迟秒数
def _can_fetch(self, url) -> bool:
"""检查robots.txt权限"""
try:
parsed = urlparse(url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
rp = urllib.robotparser.RobotFileParser()
rp.set_url(f"{base_url}/robots.txt")
rp.read()
return rp.can_fetch(self.headers['User-Agent'], url)
except:
return True
def _get_random_header(self):
return {
'User-Agent': self.ua.random,
'Accept': 'text/html,application/xhtml+xml',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Referer': 'https://www.google.com/'
}
def _smart_delay(self):
time.sleep(random.uniform(*self.delay_range))
def _bypass_anti_spider(self, url):
"""智能绕过反爬策略"""
try:
req = urllib.request.Request(
url,
headers=self._get_random_header(),
method='GET'
)
# 添加代理支持(可选)
# proxy = random.choice(proxies)
# req.set_proxy(proxy, 'http')
with urllib.request.urlopen(req, timeout=15) as response:
if response.status == 200:
return response.read().decode('utf-8', errors='ignore')
return None
except Exception:
return None
def crawl(self, query, max_retries=3):
"""增强版爬取方法"""
cached = self.cache.load_from_cache(query)
if cached:
return cached
for attempt in range(max_retries):
try:
search_url = random.choice(self.search_engines).format(quote(query))
print(f"尝试爬取: {search_url} (第{attempt+1}次)")
html = self._bypass_anti_spider(search_url)
self._smart_delay()
if html:
data = self._extract_data(html)
self.cache.save_to_cache(query, data)
return data
except Exception as e:
print(f"尝试失败: {str(e)}")
if attempt == max_retries - 1:
if cached:
return cached
raise RuntimeError(f"爬取失败且无缓存可用: {str(e)}")
def _extract_data(self, html):
"""使用BeautifulSoup提取数据"""
soup = BeautifulSoup(html, 'html.parser')
# 添加针对不同搜索引擎的解析逻辑
results = []
for item in soup.select('.result, .res, .b_algo')[:10]: # 通用选择器
title = item.find('h3')
link = item.find('a', href=True)
if title and link:
results.append({
'title': title.get_text(strip=True),
'url': link['href'],
'snippet': item.find('p').get_text(strip=True)[:200] if item.find('p') else ''
})
return {'query': query, 'results': results}
def _fetch_html(self, url) -> str:
"""安全获取网页内容"""
if not self._can_fetch(url):
raise PermissionError(f"无权限爬取: {url}")
req = urllib.request.Request(url, headers=self.headers)
try:
with urllib.request.urlopen(req, timeout=10) as response:
if response.status == 200:
return response.read().decode('utf-8')
raise ConnectionError(f"HTTP {response.status}")
except Exception as e:
raise ConnectionError(f"获取失败: {url} - {str(e)}")
def _extract_content(self, html: str) -> dict:
"""从HTML提取结构化数据"""
soup = BeautifulSoup(html, 'html.parser')
# 移除不需要的标签
for tag in ['script', 'style', 'nav', 'footer']:
for element in soup(tag):
element.decompose()
# 提取核心内容
title = soup.title.string if soup.title else ''
text = ' '.join(p.get_text() for p in soup.find_all('p'))
return {
'title': title.strip(),
'content': text.strip(),
'links': [a['href'] for a in soup.find_all('a', href=True)]
}
def crawl(self, query: str, max_results=5) -> dict:
"""执行完整爬取流程"""
# 先检查缓存
cached = self.cache.load_from_cache(query)
if cached:
print(f"使用缓存数据: {query}")
return cached
print(f"开始爬取: {query}")
results = []
try:
# 模拟搜索引擎查询(示例使用百度)
search_url = f"https://www.bing.com/search?q={urllib.parse.quote(query)}"
html = self._fetch_html(search_url)
data = self._extract_content(html)
# 限制抓取数量并添加延迟
for link in data['links'][:max_results]:
if link.startswith('http'):
try:
page_html = self._fetch_html(link)
page_data = self._extract_content(page_html)
results.append({
'source_url': link,
'title': page_data['title'],
'content': page_data['content']
})
time.sleep(self.delay)
except Exception as e:
print(f"子页面抓取失败: {link} - {str(e)}")
# 保存结果到缓存
result_data = {'query': query, 'results': results}
self.cache.save_to_cache(query, result_data)
return result_data
except Exception as e:
print(f"爬取失败: {str(e)}")
if cached:
return cached
raise RuntimeError(f"爬取失败且无缓存可用: {str(e)}")