diff --git a/FFAI/__pycache__/catch.cpython-313.pyc b/FFAI/__pycache__/catch.cpython-313.pyc
new file mode 100644
index 0000000..bfaa071
Binary files /dev/null and b/FFAI/__pycache__/catch.cpython-313.pyc differ
diff --git a/FFAI/__pycache__/crawlers.cpython-313.pyc b/FFAI/__pycache__/crawlers.cpython-313.pyc
index 7502fc1..4e6ee63 100644
Binary files a/FFAI/__pycache__/crawlers.cpython-313.pyc and b/FFAI/__pycache__/crawlers.cpython-313.pyc differ
diff --git a/FFAI/__pycache__/crawlers_core.cpython-313.pyc b/FFAI/__pycache__/crawlers_core.cpython-313.pyc
new file mode 100644
index 0000000..d09c6e8
Binary files /dev/null and b/FFAI/__pycache__/crawlers_core.cpython-313.pyc differ
diff --git a/FFAI/catch.py b/FFAI/catch.py
new file mode 100644
index 0000000..33029f8
--- /dev/null
+++ b/FFAI/catch.py
@@ -0,0 +1,49 @@
+import os
+import json
+import hashlib
+from datetime import datetime
+
+class CacheManager:
+    def __init__(self, cache_dir=".cache"):
+        self.cache_dir = cache_dir
+        os.makedirs(cache_dir, exist_ok=True)
+
+    def _get_cache_path(self, query: str) -> str:
+        """Build a cache file name derived from the query content"""
+        query_hash = hashlib.md5(query.encode('utf-8')).hexdigest()
+        return os.path.join(self.cache_dir, f"{query_hash}.json")
+
+    def save_to_cache(self, query: str, data: dict) -> bool:
+        """Save data to the cache (with a timestamp)"""
+        cache_data = {
+            'timestamp': datetime.now().isoformat(),
+            'query': query,
+            'data': data
+        }
+        try:
+            with open(self._get_cache_path(query), 'w', encoding='utf-8') as f:
+                json.dump(cache_data, f, ensure_ascii=False, indent=2)
+            return True
+        except Exception as e:
+            print(f"Failed to save cache: {e}")
+            return False
+
+    def load_from_cache(self, query: str, max_age_hours=24) -> dict:
+        """Load data from the cache (maximum age is configurable)"""
+        cache_file = self._get_cache_path(query)
+        if not os.path.exists(cache_file):
+            return None
+
+        try:
+            with open(cache_file, 'r', encoding='utf-8') as f:
+                cache_data = json.load(f)
+
+            # Check whether the cached entry has expired
+            cache_time = datetime.fromisoformat(cache_data['timestamp'])
+            if (datetime.now() - cache_time).total_seconds() > max_age_hours * 3600:
+                return None
+
+            return cache_data['data']
+        except Exception as e:
+            print(f"Failed to read cache: {e}")
+            return None
\ No newline at end of file
diff --git a/FFAI/crawlers.py b/FFAI/crawlers.py
index ed37cb3..fceb4a6 100644
--- a/FFAI/crawlers.py
+++ b/FFAI/crawlers.py
@@ -1,119 +1,128 @@
-import urllib.request
-import os
-import time
-from urllib.parse import quote
-from html.parser import HTMLParser
-import requests # type: ignore
-from bs4 import BeautifulSoup # type: ignore
-from urllib.parse import quote_plus
+# from typing import Self
+# import urllib.request
+# import os
+# import time
+# from urllib.parse import quote
+# from html.parser import HTMLParser
+# import requests # type: ignore
+# from bs4 import BeautifulSoup # type: ignore
+# from urllib.parse import quote_plus
 
-class PureHTMLParser(HTMLParser):
+# class PureHTMLParser(HTMLParser):
 
-    # ...(keep the previous HTML parser code unchanged)...
-    def __init__(self, cache_dir="cache"):
-        self.user_agent = "Mozilla/5.0"
-        # self.parser = PureHTMLParser()
-        self.cache_dir = cache_dir
-        os.makedirs(cache_dir, exist_ok=True)
+#     def __init__(self, cache_dir="cache"):
+#         self.user_agent = "Mozilla/5.0"
+#         # self.parser = PureHTMLParser()
+#         self.cache_dir = cache_dir
+#         os.makedirs(cache_dir, exist_ok=True)
 
-    def _is_cache_valid(self, cache_file):
-        """Check whether the cache is still valid"""
-        if not os.path.exists(cache_file):
-            return False
+#     def _is_cache_valid(self, cache_file):
+#         """Check whether the cache is still valid"""
+#         if not os.path.exists(cache_file):
+#             return False
 
-        file_time = os.path.getmtime(cache_file)
-        return (time.time() - file_time) < self.cache_expiry
+#         file_time = os.path.getmtime(cache_file)
+#         return (time.time() - file_time) < self.cache_expiry
 
-    def _get_cache_path(self, query: str) -> str:
-        """Build the cache file name"""
-        safe_query = "".join(c if c.isalnum() else "_" for c in query)
-        return f"{self.cache_dir}/{safe_query}.txt"
+#     def _get_cache_path(self, query: str) -> str:
+#         """Build the cache file name"""
+#         safe_query = "".join(c if c.isalnum() else "_" for c in query)
+#         return f"{self.cache_dir}/{safe_query}.txt"
 
-    def _save_to_cache(self, query: str, data: list):
-        """Save search results to the cache"""
-        with open(self._get_cache_path(query), "w", encoding="utf-8") as f:
-            for item in data:
-                f.write(f"URL: {item['url','']}\n")
-                f.write(f"Text: {'abstract', item.get('text', '')}\n")
-                f.write("="*50 + "\n")
+#     def _save_to_cache(self, query: str, data: list):
+#         """Save search results to the cache (corrected version)"""
+#         cache_file = self._get_cache_path(query)
+#         try:
+#             with open(cache_file, "w", encoding="utf-8") as f:
+#                 for item in data:
+#                     # Fix: make sure item is a dict and contains a 'url' key
+#                     url = item.get('url', '')  # safe access
+#                     text = item.get('text', '')
+#                     f.write(f"URL: {url}\n")
+#                     f.write(f"Text: {text}\n")
+#                     f.write("="*50 + "\n")
+#         except Exception as e:
+#             print(f"Failed to save cache: {e}")
+#     def _load_from_cache(self, query: str) -> list:
+#         """Load data from the cache"""
+#         cache_file = self._get_cache_path(query)
+#         if not os.path.exists(cache_file):
+#             return None
+
+#         with open(cache_file, "r", encoding="utf-8") as f:
+#             content = f.read()
+
+#         # Parse the cache file
+#         items = []
+#         for block in content.split("="*50):
+#             if not block.strip():
+#                 continue
+#             url = text = ""
+#             for line in block.split("\n"):
+#                 if line.startswith("URL: "):
+#                     url = line[5:]
+#                 elif line.startswith("Text: "):
+#                     text = line[6:]
+#             if url:
+#                 items.append({"url": url, "text": text})
+#         return items
 
-    def _load_from_cache(self, query: str) -> list:
-        """Load data from the cache"""
-        cache_file = self._get_cache_path(query)
-        if not os.path.exists(cache_file):
-            return None
+#     def fetch(self, query, force_update=False):
+
-        with open(cache_file, "r", encoding="utf-8") as f:
-            content = f.read()
+
+#         # Make sure default headers exist
+#         if not hasattr(self, 'headers'):
+#             self.headers = {
+#                 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
+#             }
-        # Parse the cache file
-        items = []
-        for block in content.split("="*50):
-            if not block.strip():
-                continue
-            url = text = ""
-            for line in block.split("\n"):
-                if line.startswith("URL: "):
-                    url = line[5:]
-                elif line.startswith("Text: "):
-                    text = line[6:]
-            if url:
-                items.append({"url": url, "text": text})
-        return items
-
-    def fetch(self, query, force_update=False):
-        # Make sure default headers exist
-        if not hasattr(self, 'headers'):
-            self.headers = {
-                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
-            }
+#         cache_file = os.path.join(self.cache_dir, f"{quote_plus(query)}.json")
f"{quote_plus(query)}.json") - cache_file = os.path.join(self.cache_dir, f"{quote_plus(query)}.json") - - # 检查缓存是否有效 - if not force_update and os.path.exists(cache_file) and self._is_cache_valid(cache_file): - return self._load_from_cache(cache_file) +# # 检查缓存是否有效 +# if not force_update and os.path.exists(cache_file) and self._is_cache_valid(cache_file): +# return self._load_from_cache(cache_file) - try: - # 实际抓取逻辑 - 以百度搜索为例 - search_url = f"https://www.baidu.com/s?wd={quote_plus(query)}" - response = requests.get(search_url, headers=self.headers, timeout=10) - response.raise_for_status() +# try: +# # 实际抓取 +# search_url = f"https://www.baidu.com/s?wd={quote_plus(query)}" +# response = requests.get(search_url, headers=self.headers, timeout=10) +# response.raise_for_status() - # 解析网页内容 - soup = BeautifulSoup(response.text, 'html.parser') - results = [] +# # 解析网页内容 +# soup = BeautifulSoup(response.text, 'html.parser') +# results = [] - # 提取搜索结果 - 百度搜索结果的实际选择器可能需要调整 - for item in soup.select('.result.c-container'): - title_elem = item.select_one('h3') - link_elem = item.find('a') - abstract_elem = item.select_one('.c-abstract') +# # 提取搜索结果 - 百度搜索结果的实际选择器可能需要调整 +# for item in soup.select('.result.c-container'): +# title_elem = item.select_one('h3') +# link_elem = item.find('a') +# abstract_elem = item.select_one('.c-abstract') - if title_elem and link_elem: - results.append({ - 'title': title_elem.get_text(strip=True), - 'url': link_elem.get('href'), - 'abstract': abstract_elem.get_text(strip=True) if abstract_elem else "" - }) +# if title_elem and link_elem: +# results.append({ +# 'title': title_elem.get_text(strip=True), +# 'url': link_elem.get('href'), +# 'abstract': abstract_elem.get_text(strip=True) if abstract_elem else "" +# }) - data = { - 'query': query, - 'results': results if results else [{'title': '无结果', 'url': '', 'abstract': ''}], - 'timestamp': int(time.time()), - 'sources': [search_url] - } +# data = { +# 'query': query, +# 'results': results if results else [{'title': '无结果', 'url': '', 'abstract': ''}], +# 'timestamp': int(time.time()), +# 'sources': [search_url] +# } - # 保存到缓存 - self._save_to_cache(cache_file, data) - return { - 'data': data, - 'sources': ["www.baidu.com"] - } +# # 保存到缓存 +# self._save_to_cache(cache_file, data) +# return { +# 'data': data, +# 'sources': ["www.baidu.com"] +# } - except Exception as e: - # 如果抓取失败但缓存存在,使用缓存 - if os.path.exists(cache_file): - print(f"抓取失败,使用缓存数据: {str(e)}") - return self._load_from_cache(cache_file) - raise RuntimeError(f"抓取失败且无缓存可用: {str(e)}") \ No newline at end of file +# except Exception as e: +# # 如果抓取失败但缓存存在,使用缓存 +# if os.path.exists(cache_file): +# print(f"抓取失败,使用缓存数据: {str(e)}") +# return self._load_from_cache(cache_file) +# raise RuntimeError(f"抓取失败且无缓存可用: {str(e)}") \ No newline at end of file diff --git a/FFAI/crawlers_core.py b/FFAI/crawlers_core.py new file mode 100644 index 0000000..af6d6ec --- /dev/null +++ b/FFAI/crawlers_core.py @@ -0,0 +1,102 @@ +import urllib.request +import urllib.robotparser +from urllib.parse import urlparse +import time +from bs4 import BeautifulSoup + +class CrawlerEngine: + def __init__(self, cache_manager): + self.cache = cache_manager + self.headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)', + 'Accept-Language': 'zh-CN,zh;q=0.9' + } + self.delay = 2 # 爬取延迟(秒) + + def _can_fetch(self, url) -> bool: + """检查robots.txt权限""" + try: + parsed = urlparse(url) + base_url = f"{parsed.scheme}://{parsed.netloc}" + rp = urllib.robotparser.RobotFileParser() + 
rp.set_url(f"{base_url}/robots.txt") + rp.read() + return rp.can_fetch(self.headers['User-Agent'], url) + except: + return True + + def _fetch_html(self, url) -> str: + """安全获取网页内容""" + if not self._can_fetch(url): + raise PermissionError(f"无权限爬取: {url}") + + req = urllib.request.Request(url, headers=self.headers) + try: + with urllib.request.urlopen(req, timeout=10) as response: + if response.status == 200: + return response.read().decode('utf-8') + raise ConnectionError(f"HTTP {response.status}") + except Exception as e: + raise ConnectionError(f"获取失败: {url} - {str(e)}") + + def _extract_content(self, html: str) -> dict: + """从HTML提取结构化数据""" + soup = BeautifulSoup(html, 'html.parser') + + # 移除不需要的标签 + for tag in ['script', 'style', 'nav', 'footer']: + for element in soup(tag): + element.decompose() + + # 提取核心内容 + title = soup.title.string if soup.title else '' + text = ' '.join(p.get_text() for p in soup.find_all('p')) + + return { + 'title': title.strip(), + 'content': text.strip(), + 'links': [a['href'] for a in soup.find_all('a', href=True)] + } + + def crawl(self, query: str, max_results=5) -> dict: + """执行完整爬取流程""" + # 先检查缓存 + cached = self.cache.load_from_cache(query) + if cached: + print(f"使用缓存数据: {query}") + return cached + + print(f"开始爬取: {query}") + results = [] + + try: + # 模拟搜索引擎查询(示例使用百度) + search_url = f"https://www.baidu.com/s?wd={urllib.parse.quote(query)}" + html = self._fetch_html(search_url) + data = self._extract_content(html) + + # 限制抓取数量并添加延迟 + for link in data['links'][:max_results]: + if link.startswith('http'): + try: + page_html = self._fetch_html(link) + page_data = self._extract_content(page_html) + results.append({ + 'source_url': link, + 'title': page_data['title'], + 'content': page_data['content'] + }) + time.sleep(self.delay) + except Exception as e: + print(f"子页面抓取失败: {link} - {str(e)}") + + # 保存结果到缓存 + result_data = {'query': query, 'results': results} + self.cache.save_to_cache(query, result_data) + return result_data + + except Exception as e: + print(f"爬取失败: {str(e)}") + if cached: + return cached + raise RuntimeError(f"爬取失败且无缓存可用: {str(e)}") \ No newline at end of file diff --git a/FFAI/main.py b/FFAI/main.py index 11a846d..80c0d2a 100644 --- a/FFAI/main.py +++ b/FFAI/main.py @@ -1,14 +1,19 @@ -from crawlers import PureHTMLParser # type: ignore +# from crawlers import PureHTMLParser # type: ignore from analyzer import PureAnalyzer # type: ignore +from crawlers_core import CrawlerEngine +from catch import CacheManager + class PureInfoHunter: def __init__(self): - self.crawler = PureHTMLParser() + self.cache_manager = CacheManager() + self.crawler = CrawlerEngine(self.cache_manager) self.analyzer = PureAnalyzer() + self.catch = CacheManager() def run(self, query: str): # 1. 获取数据(优先缓存) - data = self.crawler.fetch(query) + data = self.catch(query) # 2. 分析(自动检索历史缓存) result = self.analyzer.analyze(data, query) @@ -46,16 +51,17 @@ if __name__ == "__main__": print("使用方法: python pure_main.py '搜索关键词' [force_update]") print("示例: python pure_main.py '人工智能' true") query = input("请输入要搜索的关键词: ") # 改为交互式输入 + force_update = input("是否强制更新(true/false)? 
").lower() == "true" else: - query = sys.argv[1] + # query = sys.argv[1] force_update = len(sys.argv) > 2 and sys.argv[2].lower() == "true" hunter = PureInfoHunter() if force_update: print("强制更新模式(忽略缓存)") - data = hunter.crawler.fetch(query) # 使用实际存在的方法名 + data = hunter.crawler.crawl(query) # 使用实际存在的方法名 result = hunter.analyzer.analyze(data, query) else: result = hunter.run(query) diff --git a/cache/cache__E4_BA_BA_E5_B7_A5_E6_99_BA_E8_83_BD_json.txt b/cache/cache__E4_BA_BA_E5_B7_A5_E6_99_BA_E8_83_BD_json.txt new file mode 100644 index 0000000..e69de29 diff --git a/cache/cache_bilibili_json.txt b/cache/cache_bilibili_json.txt new file mode 100644 index 0000000..e69de29 diff --git a/readme.md b/readme.md index 3105392..6a84d1c 100644 --- a/readme.md +++ b/readme.md @@ -8,7 +8,7 @@ Windows:CMD到FFAIall或者FFAInobug文件夹下然后使用***python main.py 你要问的内容*** -# 通知 +# 通知:可以正常运行了 ## 调试版本