diff --git a/ltsv2.py b/ltsv2.py new file mode 100644 index 0000000..a75d4de --- /dev/null +++ b/ltsv2.py @@ -0,0 +1,471 @@ +import sqlite3 +from datetime import datetime +import hashlib +import uuid +import json +import socket +import threading +from flask import Flask, request, jsonify +from werkzeug.serving import make_server +import time +class IntegratedChatServer: + def __init__(self, http_host='0.0.0.0', http_port=5000, socket_host='0.0.0.0', socket_port=12345): + self.http_host = http_host + self.http_port = http_port + self.socket_host = socket_host + self.socket_port = socket_port + + self.init_db() + + + self.app = Flask(__name__) + self.setup_http_routes() + self.http_server = make_server(self.http_host, self.http_port, self.app) + + + self.socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.clients = {} + self.lock = threading.Lock() + + + self.http_thread = None + self.socket_thread = None + + def init_db(self): + """初始化数据库""" + conn = sqlite3.connect('chat_server.db') + cursor = conn.cursor() + + + cursor.execute(''' + CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + username TEXT UNIQUE NOT NULL, + password TEXT NOT NULL, + created_at TEXT NOT NULL, + last_login TEXT, + is_online INTEGER DEFAULT 0 + ) + ''') + + + cursor.execute(''' + CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + sender_id INTEGER NOT NULL, + receiver_id INTEGER NOT NULL, + content TEXT NOT NULL, + timestamp TEXT NOT NULL, + is_recalled INTEGER DEFAULT 0, + FOREIGN KEY (sender_id) REFERENCES users(id), + FOREIGN KEY (receiver_id) REFERENCES users(id) + ) + ''') + + conn.commit() + conn.close() + + def get_db_connection(self): + """获取数据库连接""" + conn = sqlite3.connect('chat_server.db') + conn.row_factory = sqlite3.Row + return conn + + + def setup_http_routes(self): + """设置HTTP路由""" + + @self.app.route('/api/register', methods=['POST']) + def register(): + data = request.get_json() + username = data.get('username') + password = data.get('password') + + if not username or not password: + return jsonify({'success': False, 'message': 'Username and password are required'}), 400 + + hashed_password = hashlib.sha256(password.encode()).hexdigest() + created_at = datetime.now().isoformat() + + try: + conn = self.get_db_connection() + cursor = conn.cursor() + cursor.execute(''' + INSERT INTO users (username, password, created_at) + VALUES (?, ?, ?) + ''', (username, hashed_password, created_at)) + conn.commit() + return jsonify({'success': True, 'message': 'User registered successfully'}), 201 + except sqlite3.IntegrityError: + return jsonify({'success': False, 'message': 'Username already exists'}), 400 + finally: + conn.close() + + @self.app.route('/api/login', methods=['POST']) + def login(): + data = request.get_json() + username = data.get('username') + password = data.get('password') + + if not username or not password: + return jsonify({'success': False, 'message': 'Username and password are required'}), 400 + + hashed_password = hashlib.sha256(password.encode()).hexdigest() + + conn = None + try: + conn = self.get_db_connection() + cursor = conn.cursor() + cursor.execute('SELECT * FROM users WHERE username = ? AND password = ?', (username, hashed_password)) + user = cursor.fetchone() + + if user: + last_login = datetime.now().isoformat() + cursor.execute('UPDATE users SET last_login = ?, is_online = 1 WHERE id = ?', (last_login, user['id'])) + conn.commit() + + token = str(uuid.uuid4()) + return jsonify({ + 'success': True, + 'message': 'Login successful', + 'token': token, + 'user_id': user['id'], + 'username': user['username'] + }), 200 + else: + return jsonify({'success': False, 'message': 'Invalid username or password'}), 401 + except Exception as e: + return jsonify({'success': False, 'message': str(e)}), 500 + finally: + if conn: + conn.close() + + @self.app.route('/api/users', methods=['GET']) + def get_users(): + conn = self.get_db_connection() + cursor = conn.cursor() + cursor.execute('SELECT id, username, is_online, last_login FROM users') + users = cursor.fetchall() + conn.close() + + users_list = [dict(user) for user in users] + return jsonify({'success': True, 'users': users_list}), 200 + + @self.app.route('/api/messages', methods=['GET']) + def get_messages(): + user_id = request.args.get('user_id') + other_id = request.args.get('other_id') + limit = request.args.get('limit', 100) + + if not user_id or not other_id: + return jsonify({'success': False, 'message': 'user_id and other_id are required'}), 400 + + conn = self.get_db_connection() + cursor = conn.cursor() + cursor.execute(''' + SELECT m.id, m.sender_id, m.receiver_id, m.content, m.timestamp, m.is_recalled, + u1.username as sender_name, u2.username as receiver_name + FROM messages m + JOIN users u1 ON m.sender_id = u1.id + JOIN users u2 ON m.receiver_id = u2.id + WHERE (m.sender_id = ? AND m.receiver_id = ?) OR (m.sender_id = ? AND m.receiver_id = ?) + ORDER BY m.timestamp DESC + LIMIT ? + ''', (user_id, other_id, other_id, user_id, limit)) + + messages = cursor.fetchall() + conn.close() + + messages_list = [] + for msg in messages: + msg_dict = dict(msg) + if msg_dict['is_recalled']: + msg_dict['content'] = '[消息已撤回]' + messages_list.append(msg_dict) + + return jsonify({'success': True, 'messages': messages_list}), 200 + + @self.app.route('/api/recall_message', methods=['POST']) + def recall_message(): + data = request.get_json() + message_id = data.get('message_id') + user_id = data.get('user_id') + + if not message_id or not user_id: + return jsonify({'success': False, 'message': 'message_id and user_id are required'}), 400 + + conn = self.get_db_connection() + cursor = conn.cursor() + + #紧 急 撤 离 ! + cursor.execute('SELECT sender_id, receiver_id FROM messages WHERE id = ?', (message_id,)) + message = cursor.fetchone() + + if not message: + return jsonify({'success': False, 'message': 'Message not found'}), 404 + + if message['sender_id'] != int(user_id): + return jsonify({'success': False, 'message': 'You can only recall your own messages'}), 403 + + + cursor.execute('UPDATE messages SET is_recalled = 1 WHERE id = ?', (message_id,)) + conn.commit() + conn.close() + + recall_notification = { + 'type': 'message_recalled', + 'message_id': message_id, + 'sender_id': user_id, + 'receiver_id': message['receiver_id'], + 'timestamp': datetime.now().isoformat() + } + + self.notify_clients(recall_notification, message['receiver_id'], user_id) + + return jsonify({'success': True, 'message': 'Message recalled successfully'}), 200 + + #socket来喽 + def broadcast(self, message, exclude_user_id=None): + """广播消息给所有客户端""" + with self.lock: + for user_id, (client_socket, _) in self.clients.items(): + if user_id != exclude_user_id: + try: + client_socket.send((json.dumps(message) + '\n').encode('utf-8')) + except: + self.remove_client(user_id) + + def remove_client(self, user_id): + """移除客户端并更新在线状态""" + with self.lock: + if user_id in self.clients: + try: + self.clients[user_id][0].close() + except: + pass + del self.clients[user_id] + + + conn = self.get_db_connection() + cursor = conn.cursor() + cursor.execute('UPDATE users SET is_online = 0 WHERE id = ?', (user_id,)) + conn.commit() + conn.close() + + + self.broadcast({ + 'type': 'user_offline', + 'user_id': user_id, + 'timestamp': datetime.now().isoformat() + }) + + def notify_clients(self, message, *user_ids): + """通知特定客户端""" + with self.lock: + for user_id in user_ids: + if str(user_id) in self.clients: + try: + self.clients[str(user_id)][0].send((json.dumps(message) + '\n').encode('utf-8')) + except: + self.remove_client(str(user_id)) + + def handle_client(self, client_socket, address): + """处理客户端连接""" + print(f"New connection from {address}") + + try: + while True: + data = client_socket.recv(4096) + if not data: + break + + + messages = data.decode('utf-8').split('\n') + for msg in messages: + if not msg.strip(): + continue + + try: + message = json.loads(msg) + self.process_message(message, client_socket) + except json.JSONDecodeError: + print(f"Invalid JSON from {address}: {msg}") + except ConnectionResetError: + print(f"Client {address} disconnected abruptly") + finally: + + user_id_to_remove = None + with self.lock: + for user_id, (sock, _) in self.clients.items(): + if sock == client_socket: + user_id_to_remove = user_id + break + + if user_id_to_remove: + self.remove_client(user_id_to_remove) + + client_socket.close() + print(f"Connection from {address} closed") + + def process_message(self, message, client_socket): + """处理不同类型的消息""" + msg_type = message.get('type') + + if msg_type == 'login': + self.handle_login(message, client_socket) + elif msg_type == 'message': + self.handle_chat_message(message) + elif msg_type == 'recall': + self.handle_recall_message(message) + + def handle_login(self, message, client_socket): + """处理用户登录到socket服务器""" + user_id = message.get('user_id') + username = message.get('username') + token = message.get('token') + + if not user_id or not username: + return + + + with self.lock: + self.clients[user_id] = (client_socket, username) + + conn = self.get_db_connection() + cursor = conn.cursor() + cursor.execute('UPDATE users SET is_online = 1 WHERE id = ?', (user_id,)) + conn.commit() + conn.close() + + self.broadcast({ + 'type': 'user_online', + 'user_id': user_id, + 'username': username, + 'timestamp': datetime.now().isoformat() + }) + + online_users = [] + with self.lock: + for uid, (_, uname) in self.clients.items(): + online_users.append({'user_id': uid, 'username': uname}) + + client_socket.send(json.dumps({ + 'type': 'online_users', + 'users': online_users, + 'timestamp': datetime.now().isoformat() + }).encode('utf-8')) + + def handle_chat_message(self, message): + """处理聊天消息""" + sender_id = message.get('sender_id') + receiver_id = message.get('receiver_id') + content = message.get('content') + + if not sender_id or not receiver_id or not content: + return + + + conn = self.get_db_connection() + cursor = conn.cursor() + cursor.execute(''' + INSERT INTO messages (sender_id, receiver_id, content, timestamp) + VALUES (?, ?, ?, ?) + ''', (sender_id, receiver_id, content, datetime.now().isoformat())) + conn.commit() + message_id = cursor.lastrowid + conn.close() + + + message_to_send = { + 'type': 'message', + 'message_id': message_id, + 'sender_id': sender_id, + 'receiver_id': receiver_id, + 'content': content, + 'timestamp': datetime.now().isoformat(), + 'is_recalled': False + } + + + self.notify_clients(message_to_send, receiver_id, sender_id) + + def handle_recall_message(self, message): + """处理撤回消息请求""" + message_id = message.get('message_id') + user_id = message.get('user_id') + + if not message_id or not user_id: + return + + + conn = self.get_db_connection() + cursor = conn.cursor() + + + cursor.execute('SELECT sender_id, receiver_id FROM messages WHERE id = ?', (message_id,)) + msg = cursor.fetchone() + + if not msg or msg['sender_id'] != int(user_id): + conn.close() + return + + + cursor.execute('UPDATE messages SET is_recalled = 1 WHERE id = ?', (message_id,)) + conn.commit() + conn.close() + + + recall_notification = { + 'type': 'message_recalled', + 'message_id': message_id, + 'sender_id': user_id, + 'receiver_id': msg['receiver_id'], + 'timestamp': datetime.now().isoformat() + } + + self.notify_clients(recall_notification, msg['receiver_id'], user_id) + + def start_http_server(self): + """启动HTTP服务器""" + print(f"HTTP server running on http://{self.http_host}:{self.http_port}") + self.http_server.serve_forever() + + def start_socket_server(self): + """启动Socket服务器""" + self.socket_server.bind((self.socket_host, self.socket_port)) + self.socket_server.listen(5) + print(f"Socket server listening on {self.socket_host}:{self.socket_port}") + + try: + while True: + client_socket, address = self.socket_server.accept() + client_thread = threading.Thread( + target=self.handle_client, + args=(client_socket, address), + daemon=True + ) + client_thread.start() + except KeyboardInterrupt: + print("Shutting down socket server...") + finally: + self.socket_server.close() + + def run(self): + """启动服务器""" + self.http_thread = threading.Thread(target=self.start_http_server, daemon=True) + self.http_thread.start() + self.socket_thread = threading.Thread(target=self.start_socket_server, daemon=True) + self.socket_thread.start() + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("Shutting down servers...") + self.http_server.shutdown() + self.socket_server.close() + +if __name__ == '__main__': + server = IntegratedChatServer() + server.run() \ No newline at end of file