	new file:   FFAI/__pycache__/catch.cpython-313.pyc

modified:   FFAI/__pycache__/crawlers.cpython-313.pyc
	new file:   FFAI/__pycache__/crawlers_core.cpython-313.pyc
	new file:   cache file
	modified:   old file
	new file:   crawler file
	modified:   main file
	new file:   cache/cache__E4_BA_BA_E5_B7_A5_E6_99_BA_E8_83_BD_json.txt
	new file:   test file
	modified:   readme.md
This commit is contained in:
Friendfeng 2025-06-07 09:01:37 +08:00
parent 583e7574ee
commit 6b7ae8f26e
10 changed files with 274 additions and 108 deletions

Binary file not shown.

Binary file not shown.

49
FFAI/catch.py Normal file
View File

@ -0,0 +1,49 @@
import os
import json
import hashlib
from datetime import datetime

class CacheManager:
    def __init__(self, cache_dir=".cache"):
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)

    def _get_cache_path(self, query: str) -> str:
        """Build a cache file name derived from the query content"""
        query_hash = hashlib.md5(query.encode('utf-8')).hexdigest()
        return os.path.join(self.cache_dir, f"{query_hash}.json")

    def save_to_cache(self, query: str, data: dict) -> bool:
        """Save data to the cache (with a timestamp)"""
        cache_data = {
            'timestamp': datetime.now().isoformat(),
            'query': query,
            'data': data
        }
        try:
            with open(self._get_cache_path(query), 'w', encoding='utf-8') as f:
                json.dump(cache_data, f, ensure_ascii=False, indent=2)
            return True
        except Exception as e:
            print(f"Cache save failed: {e}")
            return False

    def load_from_cache(self, query: str, max_age_hours=24) -> dict:
        """Load data from the cache (with a configurable maximum age)"""
        cache_file = self._get_cache_path(query)
        if not os.path.exists(cache_file):
            return None
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)
            # Check the cache age
            cache_time = datetime.fromisoformat(cache_data['timestamp'])
            if (datetime.now() - cache_time).total_seconds() > max_age_hours * 3600:
                return None
            return cache_data['data']
        except Exception as e:
            print(f"Cache read failed: {e}")
            return None
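
A minimal usage sketch of the CacheManager above. The sample query and payload are made up for illustration; the import mirrors the `from catch import CacheManager` line used later in this commit.

from catch import CacheManager  # assumed import path, matching how this commit imports it

cache = CacheManager(cache_dir=".cache")
payload = {"results": [{"title": "example", "url": "https://example.com"}]}  # made-up payload
if cache.save_to_cache("人工智能", payload):
    # Within the default 24-hour window the same payload comes back; after that, None.
    print(cache.load_from_cache("人工智能", max_age_hours=24))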

FFAI/crawlers.py
View File

@ -1,119 +1,128 @@
(Previous content of FFAI/crawlers.py, removed by this commit:)

import urllib.request
import os
import time
from urllib.parse import quote
from html.parser import HTMLParser
import requests  # type: ignore
from bs4 import BeautifulSoup  # type: ignore
from urllib.parse import quote_plus

class PureHTMLParser(HTMLParser):
    # ...the earlier HTML parser code is unchanged...
    def __init__(self, cache_dir="cache"):
        self.user_agent = "Mozilla/5.0"
        # self.parser = PureHTMLParser()
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)

    def _is_cache_valid(self, cache_file):
        """Check whether the cache is still valid"""
        if not os.path.exists(cache_file):
            return False
        file_time = os.path.getmtime(cache_file)
        return (time.time() - file_time) < self.cache_expiry

    def _get_cache_path(self, query: str) -> str:
        """Build the cache file name"""
        safe_query = "".join(c if c.isalnum() else "_" for c in query)
        return f"{self.cache_dir}/{safe_query}.txt"

    def _save_to_cache(self, query: str, data: list):
        """Save search results to the cache"""
        with open(self._get_cache_path(query), "w", encoding="utf-8") as f:
            for item in data:
                f.write(f"URL: {item['url','']}\n")
                f.write(f"Text: {'abstract', item.get('text', '')}\n")
                f.write("="*50 + "\n")

    def _load_from_cache(self, query: str) -> list:
        """Load data from the cache"""
        cache_file = self._get_cache_path(query)
        if not os.path.exists(cache_file):
            return None
        with open(cache_file, "r", encoding="utf-8") as f:
            content = f.read()
        # Parse the cache file
        items = []
        for block in content.split("="*50):
            if not block.strip():
                continue
            url = text = ""
            for line in block.split("\n"):
                if line.startswith("URL: "):
                    url = line[5:]
                elif line.startswith("Text: "):
                    text = line[6:]
            if url:
                items.append({"url": url, "text": text})
        return items

    def fetch(self, query, force_update=False):
        # Make sure default headers exist
        if not hasattr(self, 'headers'):
            self.headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
        cache_file = os.path.join(self.cache_dir, f"{quote_plus(query)}.json")
        # Check whether the cache is still valid
        if not force_update and os.path.exists(cache_file) and self._is_cache_valid(cache_file):
            return self._load_from_cache(cache_file)
        try:
            # Actual scraping logic - Baidu search as an example
            search_url = f"https://www.baidu.com/s?wd={quote_plus(query)}"
            response = requests.get(search_url, headers=self.headers, timeout=10)
            response.raise_for_status()
            # Parse the page content
            soup = BeautifulSoup(response.text, 'html.parser')
            results = []
            # Extract search results - the actual Baidu result selectors may need adjusting
            for item in soup.select('.result.c-container'):
                title_elem = item.select_one('h3')
                link_elem = item.find('a')
                abstract_elem = item.select_one('.c-abstract')
                if title_elem and link_elem:
                    results.append({
                        'title': title_elem.get_text(strip=True),
                        'url': link_elem.get('href'),
                        'abstract': abstract_elem.get_text(strip=True) if abstract_elem else ""
                    })
            data = {
                'query': query,
                'results': results if results else [{'title': 'No results', 'url': '', 'abstract': ''}],
                'timestamp': int(time.time()),
                'sources': [search_url]
            }
            # Save to cache
            self._save_to_cache(cache_file, data)
            return {
                'data': data,
                'sources': ["www.baidu.com"]
            }
        except Exception as e:
            # If fetching fails but a cache exists, fall back to the cache
            if os.path.exists(cache_file):
                print(f"Fetch failed, using cached data: {str(e)}")
                return self._load_from_cache(cache_file)
            raise RuntimeError(f"Fetch failed and no cache is available: {str(e)}")

(New content of FFAI/crawlers.py: the whole module body above is carried over commented out, preceded by a commented-out "# from typing import Self" import. Apart from that, the only body that differs is the reworked _save_to_cache, which appears, still commented out, as:)

    # def _save_to_cache(self, query: str, data: list):
    #     """Save search results to the cache (corrected version)"""
    #     cache_file = self._get_cache_path(query)
    #     try:
    #         with open(cache_file, "w", encoding="utf-8") as f:
    #             for item in data:
    #                 # Fix: make sure item is a dict and contains a 'url' key
    #                 url = item.get('url', '')   # safe access
    #                 text = item.get('text', '')
    #                 f.write(f"URL: {url}\n")
    #                 f.write(f"Text: {text}\n")
    #                 f.write("="*50 + "\n")
    #     except Exception as e:
    #         print(f"Cache save failed: {e}")
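
For reference, the bug that the commented-out "corrected version" of _save_to_cache addresses: item['url',''] subscripts the dict with the tuple ('url', '') instead of supplying a default, so it raises KeyError for an ordinary result dict, whereas item.get('url', '') does not. A tiny sketch with a made-up result entry:

item = {"url": "https://example.com", "text": "sample"}  # hypothetical search-result entry

print(item.get("url", ""))     # safe access with a default: prints the URL
try:
    item["url", ""]            # actually a lookup of the tuple key ('url', '')
except KeyError as err:
    print(f"KeyError: {err}")  # raised, because the dict has no tuple key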

102
FFAI/crawlers_core.py Normal file
View File

@ -0,0 +1,102 @@
import urllib.request
import urllib.robotparser
from urllib.parse import urlparse
import time
from bs4 import BeautifulSoup

class CrawlerEngine:
    def __init__(self, cache_manager):
        self.cache = cache_manager
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
            'Accept-Language': 'zh-CN,zh;q=0.9'
        }
        self.delay = 2  # crawl delay in seconds

    def _can_fetch(self, url) -> bool:
        """Check robots.txt permissions"""
        try:
            parsed = urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
            rp = urllib.robotparser.RobotFileParser()
            rp.set_url(f"{base_url}/robots.txt")
            rp.read()
            return rp.can_fetch(self.headers['User-Agent'], url)
        except:
            return True

    def _fetch_html(self, url) -> str:
        """Fetch page content safely"""
        if not self._can_fetch(url):
            raise PermissionError(f"Not allowed to crawl: {url}")
        req = urllib.request.Request(url, headers=self.headers)
        try:
            with urllib.request.urlopen(req, timeout=10) as response:
                if response.status == 200:
                    return response.read().decode('utf-8')
                raise ConnectionError(f"HTTP {response.status}")
        except Exception as e:
            raise ConnectionError(f"Fetch failed: {url} - {str(e)}")

    def _extract_content(self, html: str) -> dict:
        """Extract structured data from the HTML"""
        soup = BeautifulSoup(html, 'html.parser')
        # Remove unwanted tags
        for tag in ['script', 'style', 'nav', 'footer']:
            for element in soup(tag):
                element.decompose()
        # Extract the core content
        title = soup.title.string if soup.title else ''
        text = ' '.join(p.get_text() for p in soup.find_all('p'))
        return {
            'title': title.strip(),
            'content': text.strip(),
            'links': [a['href'] for a in soup.find_all('a', href=True)]
        }

    def crawl(self, query: str, max_results=5) -> dict:
        """Run the full crawl pipeline"""
        # Check the cache first
        cached = self.cache.load_from_cache(query)
        if cached:
            print(f"Using cached data: {query}")
            return cached

        print(f"Starting crawl: {query}")
        results = []
        try:
            # Simulated search-engine query (Baidu is used as the example)
            search_url = f"https://www.baidu.com/s?wd={urllib.parse.quote(query)}"
            html = self._fetch_html(search_url)
            data = self._extract_content(html)
            # Limit the number of pages fetched and add a delay
            for link in data['links'][:max_results]:
                if link.startswith('http'):
                    try:
                        page_html = self._fetch_html(link)
                        page_data = self._extract_content(page_html)
                        results.append({
                            'source_url': link,
                            'title': page_data['title'],
                            'content': page_data['content']
                        })
                        time.sleep(self.delay)
                    except Exception as e:
                        print(f"Sub-page fetch failed: {link} - {str(e)}")
            # Save the results to the cache
            result_data = {'query': query, 'results': results}
            self.cache.save_to_cache(query, result_data)
            return result_data
        except Exception as e:
            print(f"Crawl failed: {str(e)}")
            if cached:
                return cached
            raise RuntimeError(f"Crawl failed and no cache is available: {str(e)}")
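
A minimal sketch of wiring the two new modules together, assuming catch.py and crawlers_core.py are importable as in this commit and using an example query:

from catch import CacheManager
from crawlers_core import CrawlerEngine

cache = CacheManager(cache_dir=".cache")
engine = CrawlerEngine(cache)

# The first call fetches live pages (respecting robots.txt and the 2-second delay)
# and stores {'query': ..., 'results': [...]} via CacheManager; a repeat call within
# 24 hours is answered from the cache instead.
data = engine.crawl("人工智能", max_results=3)
for page in data["results"]:
    print(page["source_url"], page["title"])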

View File

@ -1,14 +1,19 @@
-from crawlers import PureHTMLParser  # type: ignore
+# from crawlers import PureHTMLParser  # type: ignore
 from analyzer import PureAnalyzer  # type: ignore
+from crawlers_core import CrawlerEngine
+from catch import CacheManager

 class PureInfoHunter:
     def __init__(self):
-        self.crawler = PureHTMLParser()
+        self.cache_manager = CacheManager()
+        self.crawler = CrawlerEngine(self.cache_manager)
         self.analyzer = PureAnalyzer()
+        self.catch = CacheManager()

     def run(self, query: str):
         # 1. Fetch the data (cache first)
-        data = self.crawler.fetch(query)
+        data = self.catch(query)
         # 2. Analyze (the cached history is consulted automatically)
         result = self.analyzer.analyze(data, query)
@ -46,16 +51,17 @@ if __name__ == "__main__":
         print("Usage: python pure_main.py '<search keywords>' [force_update]")
         print("Example: python pure_main.py '人工智能' true")
         query = input("Enter the keywords to search for: ")  # changed to interactive input
         force_update = input("Force update (true/false)? ").lower() == "true"
     else:
-        query = sys.argv[1]
+        # query = sys.argv[1]
         force_update = len(sys.argv) > 2 and sys.argv[2].lower() == "true"

     hunter = PureInfoHunter()
     if force_update:
         print("Force-update mode (cache ignored)")
-        data = hunter.crawler.fetch(query)  # use the method name that actually exists
+        data = hunter.crawler.crawl(query)  # use the method name that actually exists
         result = hunter.analyzer.analyze(data, query)
     else:
         result = hunter.run(query)
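
For reference, a sketch of driving the force-update path from code rather than the command line. The module name pure_main is an assumption taken from the script's own usage string:

from pure_main import PureInfoHunter  # hypothetical module name, per "python pure_main.py ..."

hunter = PureInfoHunter()

# Mirrors the force-update branch of the __main__ block: skip the cache,
# crawl through CrawlerEngine, then hand the result to the analyzer.
data = hunter.crawler.crawl("人工智能")
result = hunter.analyzer.analyze(data, "人工智能")
print(result)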

0
cache/cache_bilibili_json.txt vendored Normal file
View File

readme.md
View File

@ -8,7 +8,7 @@
 In Windows CMD, change into the FFAIall or FFAInobug folder, then run ***python main.py <what you want to ask>***
-# Notice
+# Notice: it runs normally now
 ## Debug version