FC/FFAICilent/cloud.py

37 lines
1.3 KiB
Python
Raw Permalink Normal View History

import requests
from ..security import decrypt_secret
from logger import log
class CloudConnector:
def __init__(self, config):
self.endpoint = config['Cloud']['endpoint']
self.timeout = int(config['Deployment']['cloud_timeout'])
self.retry = int(config['Cloud']['retry_times'])
self.token = decrypt_secret('api_token')
def _make_request(self, data):
headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
try:
resp = requests.post(
self.endpoint,
json=data,
headers=headers,
timeout=self.timeout
)
resp.raise_for_status()
return resp.json()
except requests.exceptions.RequestException as e:
log.error(f"Cloud request failed: {str(e)}")
raise ConnectionError("Cloud service unavailable")
def execute(self, command):
for attempt in range(self.retry):
try:
return self._make_request({"command": command})
except ConnectionError:
if attempt == self.retry - 1:
raise
log.warning(f"Retrying... ({attempt + 1}/{self.retry})")