chatserver/ltsv2.py
2025-05-31 18:33:51 +08:00

473 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sqlite3
from datetime import datetime
import hashlib
import uuid
import json
import socket
import threading
from flask import Flask, request, jsonify
from werkzeug.serving import make_server
import time
print("man !!! what can I say ! Manba out!!!!")
class IntegratedChatServer:
def __init__(self, http_host='0.0.0.0', http_port=5000, socket_host='0.0.0.0', socket_port=12345):
self.http_host = http_host
self.http_port = http_port
self.socket_host = socket_host
self.socket_port = socket_port
self.init_db()
self.app = Flask(__name__)
self.setup_http_routes()
self.http_server = make_server(self.http_host, self.http_port, self.app)
self.socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.clients = {}
self.lock = threading.Lock()
self.http_thread = None
self.socket_thread = None
def init_db(self):
"""初始化数据库"""
conn = sqlite3.connect('chat_server.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL,
created_at TEXT NOT NULL,
last_login TEXT,
is_online INTEGER DEFAULT 0
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sender_id INTEGER NOT NULL,
receiver_id INTEGER NOT NULL,
content TEXT NOT NULL,
timestamp TEXT NOT NULL,
is_recalled INTEGER DEFAULT 0,
FOREIGN KEY (sender_id) REFERENCES users(id),
FOREIGN KEY (receiver_id) REFERENCES users(id)
)
''')
conn.commit()
conn.close()
def get_db_connection(self):
"""获取数据库连接"""
conn = sqlite3.connect('chat_server.db')
conn.row_factory = sqlite3.Row
return conn
def setup_http_routes(self):
"""设置HTTP路由"""
@self.app.route('/api/register', methods=['POST'])
def register():
global conn
data = request.get_json()
username = data.get('username')
password = data.get('password')
if not username or not password:
return jsonify({'success': False, 'message': 'Username and password are required'}), 400
hashed_password = hashlib.sha256(password.encode()).hexdigest()
created_at = datetime.now().isoformat()
try:
conn = self.get_db_connection()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO users (username, password, created_at)
VALUES (?, ?, ?)
''', (username, hashed_password, created_at))
conn.commit()
return jsonify({'success': True, 'message': 'User registered successfully'}), 201
except sqlite3.IntegrityError:
return jsonify({'success': False, 'message': 'Username already exists'}), 400
finally:
conn.close()
@self.app.route('/api/login', methods=['POST'])
def login():
data = request.get_json()
username = data.get('username')
password = data.get('password')
if not username or not password:
return jsonify({'success': False, 'message': 'Username and password are required'}), 400
hashed_password = hashlib.sha256(password.encode()).hexdigest()
conn = None
try:
conn = self.get_db_connection()
cursor = conn.cursor()
cursor.execute('SELECT * FROM users WHERE username = ? AND password = ?', (username, hashed_password))
user = cursor.fetchone()
if user:
last_login = datetime.now().isoformat()
cursor.execute('UPDATE users SET last_login = ?, is_online = 1 WHERE id = ?', (last_login, user['id']))
conn.commit()
token = str(uuid.uuid4())
return jsonify({
'success': True,
'message': 'Login successful',
'token': token,
'user_id': user['id'],
'username': user['username']
}), 200
else:
return jsonify({'success': False, 'message': 'Invalid username or password'}), 401
except Exception as e:
return jsonify({'success': False, 'message': str(e)}), 500
finally:
if conn:
conn.close()
@self.app.route('/api/users', methods=['GET'])
def get_users():
conn = self.get_db_connection()
cursor = conn.cursor()
cursor.execute('SELECT id, username, is_online, last_login FROM users')
users = cursor.fetchall()
conn.close()
users_list = [dict(user) for user in users]
return jsonify({'success': True, 'users': users_list}), 200
@self.app.route('/api/messages', methods=['GET'])
def get_messages():
user_id = request.args.get('user_id')
other_id = request.args.get('other_id')
limit = request.args.get('limit', 100)
if not user_id or not other_id:
return jsonify({'success': False, 'message': 'user_id and other_id are required'}), 400
conn = self.get_db_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT m.id, m.sender_id, m.receiver_id, m.content, m.timestamp, m.is_recalled,
u1.username as sender_name, u2.username as receiver_name
FROM messages m
JOIN users u1 ON m.sender_id = u1.id
JOIN users u2 ON m.receiver_id = u2.id
WHERE (m.sender_id = ? AND m.receiver_id = ?) OR (m.sender_id = ? AND m.receiver_id = ?)
ORDER BY m.timestamp DESC
LIMIT ?
''', (user_id, other_id, other_id, user_id, limit))
messages = cursor.fetchall()
conn.close()
messages_list = []
for msg in messages:
msg_dict = dict(msg)
if msg_dict['is_recalled']:
msg_dict['content'] = '[消息已撤回]'
messages_list.append(msg_dict)
return jsonify({'success': True, 'messages': messages_list}), 200
@self.app.route('/api/recall_message', methods=['POST'])
def recall_message():
data = request.get_json()
message_id = data.get('message_id')
user_id = data.get('user_id')
if not message_id or not user_id:
return jsonify({'success': False, 'message': 'message_id and user_id are required'}), 400
conn = self.get_db_connection()
cursor = conn.cursor()
#紧 急 撤 离
cursor.execute('SELECT sender_id, receiver_id FROM messages WHERE id = ?', (message_id,))
message = cursor.fetchone()
if not message:
return jsonify({'success': False, 'message': 'Message not found'}), 404
if message['sender_id'] != int(user_id):
return jsonify({'success': False, 'message': 'You can only recall your own messages'}), 403
cursor.execute('UPDATE messages SET is_recalled = 1 WHERE id = ?', (message_id,))
conn.commit()
conn.close()
recall_notification = {
'type': 'message_recalled',
'message_id': message_id,
'sender_id': user_id,
'receiver_id': message['receiver_id'],
'timestamp': datetime.now().isoformat()
}
self.notify_clients(recall_notification, message['receiver_id'], user_id)
return jsonify({'success': True, 'message': 'Message recalled successfully'}), 200
#socket来喽
def broadcast(self, message, exclude_user_id=None):
"""广播消息给所有客户端"""
with self.lock:
for user_id, (client_socket, _) in self.clients.items():
if user_id != exclude_user_id:
try:
client_socket.send((json.dumps(message) + '\n').encode('utf-8'))
except:
self.remove_client(user_id)
def remove_client(self, user_id):
"""移除客户端并更新在线状态"""
with self.lock:
if user_id in self.clients:
try:
self.clients[user_id][0].close()
except:
pass
del self.clients[user_id]
conn = self.get_db_connection()
cursor = conn.cursor()
cursor.execute('UPDATE users SET is_online = 0 WHERE id = ?', (user_id,))
conn.commit()
conn.close()
self.broadcast({
'type': 'user_offline',
'user_id': user_id,
'timestamp': datetime.now().isoformat()
})
def notify_clients(self, message, *user_ids):
"""通知特定客户端"""
with self.lock:
for user_id in user_ids:
if str(user_id) in self.clients:
try:
self.clients[str(user_id)][0].send((json.dumps(message) + '\n').encode('utf-8'))
except:
self.remove_client(str(user_id))
def handle_client(self, client_socket, address):
"""处理客户端连接"""
print(f"New connection from {address}")
try:
while True:
data = client_socket.recv(4096)
if not data:
break
messages = data.decode('utf-8').split('\n')
for msg in messages:
if not msg.strip():
continue
try:
message = json.loads(msg)
self.process_message(message, client_socket)
except json.JSONDecodeError:
print(f"Invalid JSON from {address}: {msg}")
except ConnectionResetError:
print(f"Client {address} disconnected abruptly")
finally:
user_id_to_remove = None
with self.lock:
for user_id, (sock, _) in self.clients.items():
if sock == client_socket:
user_id_to_remove = user_id
break
if user_id_to_remove:
self.remove_client(user_id_to_remove)
client_socket.close()
print(f"Connection from {address} closed")
def process_message(self, message, client_socket):
"""处理不同类型的消息"""
msg_type = message.get('type')
if msg_type == 'login':
self.handle_login(message, client_socket)
elif msg_type == 'message':
self.handle_chat_message(message)
elif msg_type == 'recall':
self.handle_recall_message(message)
def handle_login(self, message, client_socket):
"""处理用户登录到socket服务器"""
user_id = message.get('user_id')
username = message.get('username')
token = message.get('token')
if not user_id or not username:
return
with self.lock:
self.clients[user_id] = (client_socket, username)
conn = self.get_db_connection()
cursor = conn.cursor()
cursor.execute('UPDATE users SET is_online = 1 WHERE id = ?', (user_id,))
conn.commit()
conn.close()
self.broadcast({
'type': 'user_online',
'user_id': user_id,
'username': username,
'timestamp': datetime.now().isoformat()
})
online_users = []
with self.lock:
for uid, (_, uname) in self.clients.items():
online_users.append({'user_id': uid, 'username': uname})
client_socket.send(json.dumps({
'type': 'online_users',
'users': online_users,
'timestamp': datetime.now().isoformat()
}).encode('utf-8'))
def handle_chat_message(self, message):
"""处理聊天消息"""
sender_id = message.get('sender_id')
receiver_id = message.get('receiver_id')
content = message.get('content')
if not sender_id or not receiver_id or not content:
return
conn = self.get_db_connection()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO messages (sender_id, receiver_id, content, timestamp)
VALUES (?, ?, ?, ?)
''', (sender_id, receiver_id, content, datetime.now().isoformat()))
conn.commit()
message_id = cursor.lastrowid
conn.close()
message_to_send = {
'type': 'message',
'message_id': message_id,
'sender_id': sender_id,
'receiver_id': receiver_id,
'content': content,
'timestamp': datetime.now().isoformat(),
'is_recalled': False
}
self.notify_clients(message_to_send, receiver_id, sender_id)
def handle_recall_message(self, message):
"""处理撤回消息请求"""
message_id = message.get('message_id')
user_id = message.get('user_id')
if not message_id or not user_id:
return
conn = self.get_db_connection()
cursor = conn.cursor()
cursor.execute('SELECT sender_id, receiver_id FROM messages WHERE id = ?', (message_id,))
msg = cursor.fetchone()
if not msg or msg['sender_id'] != int(user_id):
conn.close()
return
cursor.execute('UPDATE messages SET is_recalled = 1 WHERE id = ?', (message_id,))
conn.commit()
conn.close()
recall_notification = {
'type': 'message_recalled',
'message_id': message_id,
'sender_id': user_id,
'receiver_id': msg['receiver_id'],
'timestamp': datetime.now().isoformat()
}
self.notify_clients(recall_notification, msg['receiver_id'], user_id)
def start_http_server(self):
"""启动HTTP服务器"""
print(f"HTTP server running on http://{self.http_host}:{self.http_port}")
self.http_server.serve_forever()
def start_socket_server(self):
"""启动Socket服务器"""
self.socket_server.bind((self.socket_host, self.socket_port))
self.socket_server.listen(5)
print(f"Socket server listening on {self.socket_host}:{self.socket_port}")
try:
while True:
client_socket, address = self.socket_server.accept()
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket, address),
daemon=True
)
client_thread.start()
except KeyboardInterrupt:
print("Shutting down socket server...")
finally:
self.socket_server.close()
def run(self):
"""启动服务器"""
self.http_thread = threading.Thread(target=self.start_http_server, daemon=True)
self.http_thread.start()
self.socket_thread = threading.Thread(target=self.start_socket_server, daemon=True)
self.socket_thread.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Shutting down servers...")
self.http_server.shutdown()
self.socket_server.close()
if __name__ == '__main__':
server = IntegratedChatServer()
server.run()