FC/FFAICilent/manger.py
Friendfeng b3b036429a renamed: FFAI/__pycache__/analyzer.cpython-313.pyc -> FFAICilent/__pycache__/analyzer.cpython-313.pyc
renamed:    FFAI/__pycache__/catch.cpython-313.pyc -> FFAICilent/__pycache__/catch.cpython-313.pyc
	renamed:    FFAI/__pycache__/crawlers.cpython-313.pyc -> FFAICilent/__pycache__/crawlers.cpython-313.pyc
	renamed:    FFAI/__pycache__/crawlers_core.cpython-313.pyc -> FFAICilent/__pycache__/crawlers_core.cpython-313.pyc
	renamed:    FFAI/analyzer.py -> FFAICilent/analyzer.py
	renamed:    FFAI/catch.py -> FFAICilent/catch.py
	new file:   FFAICilent/cloud.py
	new file:   FFAICilent/config/config.ini
	new file:   FFAICilent/config/configloder.py
	renamed:    FFAI/crawlers.py -> FFAICilent/crawlers.py
	renamed:    FFAI/crawlers_core.py -> FFAICilent/crawlers_core.py
	new file:   FFAICilent/local.py
	new file:   FFAICilent/logger.py
	renamed:    FFAI/main.py -> FFAICilent/main.py
	new file:   FFAICilent/manger.py
2025-06-15 13:29:11 +08:00

40 lines
1.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from cloud import CloudConnector
from local import LocalConnector
from ..FFAICilent.config.configloder import load_config
class ConnectionManager:
    """Dispatch commands to a cloud connector or a local connector.

    The active mode is read from the ``default_mode`` option of the
    ``Deployment`` configuration section. When the mode is ``'cloud'`` and the
    cloud connector raises ``ConnectionError``, the manager can degrade to the
    local connector if the ``local_fallback`` option of the same section is
    true.

    Attributes are plain instance attributes:
        config: object returned by ``load_config()``; it is indexed as
            ``config['Deployment'][...]`` and queried with ``getboolean``.
        mode:   current mode string; ``'cloud'`` selects the cloud connector,
            any other value selects the local connector.
        cloud:  ``CloudConnector`` built from ``config``.
        local:  ``LocalConnector`` built from ``config``.
    """

    def __init__(self):
        """Load the configuration, read the default mode and build connectors."""
        self.config = load_config()
        self.mode = self.config['Deployment']['default_mode']
        self._init_connectors()

    def _init_connectors(self):
        """Create the cloud and local connectors from the loaded configuration."""
        self.cloud = CloudConnector(self.config)
        self.local = LocalConnector(self.config)

    def execute(self, command):
        """Run ``command`` on the connector selected by the current mode.

        Args:
            command: the command handed unchanged to the selected connector's
                ``execute`` method.

        Returns:
            Whatever the connector's ``execute`` returns. After a fallback it
            is the local connector's result for the adapted query.

        Raises:
            ConnectionError: re-raised from the cloud connector when the
                ``local_fallback`` option is disabled.
        """
        # Prefer the configured default mode; anything that is not 'cloud'
        # is served directly by the local connector.
        if self.mode != 'cloud':
            return self.local.execute(command)

        try:
            return self.cloud.execute(command)
        except ConnectionError:
            if self.config.getboolean('Deployment', 'local_fallback'):
                return self._fallback_to_local(command)
            raise

    def _fallback_to_local(self, command):
        """Automatically degrade to local mode and run ``command`` there.

        Logs a warning, switches ``self.mode`` to ``'local'``, converts the
        cloud command with ``_adapt_command`` and executes the resulting query
        on the local connector.
        """
        from logger import log

        log.warning("Falling back to local mode")
        # NOTE(review): the switch is sticky -- later execute() calls take the
        # local branch and pass their command WITHOUT going through
        # _adapt_command. Confirm that callers expect this.
        self.mode = 'local'
        # Convert the cloud command into a local query.
        local_query = self._adapt_command(command)
        return self.local.execute(local_query)

    def _adapt_command(self, cloud_cmd):
        """Example conversion of a cloud API command into a local query.

        Args:
            cloud_cmd: the cloud command; it is converted with ``str()`` and
                used as the cache key.

        Returns:
            A ``SELECT`` statement string that looks the command up in the
            ``cache`` table by ``key``.
        """
        # Add your business-logic conversion rules here.
        # Double embedded single quotes so the command cannot terminate the
        # quoted literal early (broken query / SQL injection).
        # Presumably LocalConnector.execute runs this text as SQL -- verify,
        # and prefer bound parameters if that connector supports them.
        escaped_key = str(cloud_cmd).replace("'", "''")
        return f"SELECT * FROM cache WHERE key='{escaped_key}'"