"""Threaded TCP chat server with SQLite-backed accounts and message history.

Clients exchange UTF-8 encoded JSON objects over a plain TCP stream.  Every
request carries a ``type`` key that selects a handler (``login``,
``register``, ``message``, ``create_group`` and the ``admin_*`` family).
"""
import codecs
import json
import socket
import sqlite3
import threading
from datetime import datetime

DEFAULT_DB_PATH = 'chat_server.db'
DEFAULT_GROUP = 'General'
RECV_BUFFER_SIZE = 4096
# Upper bound on buffered, not-yet-parseable request text per connection.
MAX_PENDING_CHARS = 1 << 20
# How often the accept loop wakes up to check for a shutdown request.
ACCEPT_POLL_SECONDS = 1.0


class _Session:
    """Per-connection state: the socket, the peer address, the logged-in name."""

    __slots__ = ('client', 'address', 'username')

    def __init__(self, client, address):
        self.client = client
        self.address = address
        # Set only after a successful login on this connection.
        self.username = None


class ChatServer:
    """Chat server that runs one thread per connected client.

    Attributes:
        clients: maps a logged-in username to that user's socket.
        groups: maps a group name to the set of member usernames.  Group
            membership lives in memory only; it is not persisted.
        db: the shared SQLite connection (guarded by an internal lock).

    NOTE(review): ``admin_*`` requests are accepted from any connection
    without authentication, passwords are stored in plain text, and the
    ``sender`` of a ``message`` request is taken from the client as-is.
    Fixing these changes the protocol / stored data, so they are only
    flagged here.
    """

    # Request ``type`` -> name of the method that handles it.
    _HANDLERS = {
        'login': '_on_login',
        'register': '_on_register',
        'message': '_on_message',
        'create_group': '_on_create_group',
        # Admin API handlers.
        'admin_get_users': '_on_admin_get_users',
        'admin_create_user': '_on_admin_create_user',
        'admin_delete_user': '_on_admin_delete_user',
        'admin_get_messages': '_on_admin_get_messages',
        'admin_delete_message': '_on_admin_delete_message',
        'admin_get_groups': '_on_admin_get_groups',
        'admin_create_group': '_on_admin_create_group',
        'admin_delete_group': '_on_admin_delete_group',
        'admin_add_group_member': '_on_admin_add_group_member',
        'admin_remove_group_member': '_on_admin_remove_group_member',
        'admin_get_server_status': '_on_admin_get_server_status',
        'admin_restart_server': '_on_admin_restart_server',
        'admin_shutdown_server': '_on_admin_shutdown_server',
    }

    def __init__(self, host='0.0.0.0', port=5555, db_path=DEFAULT_DB_PATH):
        """Bind the listening socket and open the database.

        Args:
            host: interface to bind to.
            port: TCP port to listen on.
            db_path: SQLite database file (``':memory:'`` also works).
        """
        self.host = host
        self.port = port
        self.db_path = db_path
        # Recorded once so the status endpoint can report the real start time.
        self.start_time = datetime.now()
        self._stop_event = threading.Event()
        self.clients = {}
        self.groups = {DEFAULT_GROUP: set()}
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.bind((host, port))
        self.server.listen()
        self.setup_database()

    # ------------------------------------------------------------------
    # Database helpers
    # ------------------------------------------------------------------
    def setup_database(self):
        """Open ``self.db_path`` and create the tables if they are missing."""
        # One connection is shared by every client thread, so all access is
        # serialized through this lock.
        self._db_lock = threading.Lock()
        self.db = sqlite3.connect(self.db_path, check_same_thread=False)
        self._execute_write(
            ('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE,
                    password TEXT
                )
            ''', ()),
            ('''
                CREATE TABLE IF NOT EXISTS messages (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    sender TEXT,
                    receiver TEXT,
                    message TEXT,
                    timestamp DATETIME,
                    is_group INTEGER
                )
            ''', ()),
        )

    def _fetch_all(self, sql, params=()):
        """Run a read query under the DB lock and return all rows."""
        with self._db_lock:
            return self.db.execute(sql, params).fetchall()

    def _execute_write(self, *statements):
        """Run ``(sql, params)`` statements as one transaction.

        The connection context manager commits on success and rolls back if
        any statement raises.
        """
        with self._db_lock, self.db:
            for sql, params in statements:
                self.db.execute(sql, params)

    def authenticate_user(self, username, password) -> bool:
        """Return True when a user row matches both username and password."""
        rows = self._fetch_all(
            'SELECT 1 FROM users WHERE username=? AND password=?',
            (username, password),
        )
        return bool(rows)

    def user_exists(self, username) -> bool:
        """Return True when ``username`` is a registered account."""
        rows = self._fetch_all(
            'SELECT 1 FROM users WHERE username=?', (username,)
        )
        return bool(rows)

    def register_user(self, username, password) -> bool:
        """Insert a new account; return False if the username is taken."""
        try:
            self._execute_write(
                ('INSERT INTO users (username, password) VALUES (?, ?)',
                 (username, password)),
            )
        except sqlite3.IntegrityError:
            return False
        return True

    def save_message(self, sender, receiver, message, is_group):
        """Persist one chat message with the current local timestamp."""
        # Format explicitly instead of relying on sqlite3's default datetime
        # adapter (deprecated since Python 3.12); the stored text is the same.
        stamp = datetime.now().isoformat(' ')
        self._execute_write(
            ('''
                INSERT INTO messages (sender, receiver, message, timestamp, is_group)
                VALUES (?, ?, ?, ?, ?)
            ''', (sender, receiver, message, stamp, 1 if is_group else 0)),
        )

    # ------------------------------------------------------------------
    # Socket helpers
    # ------------------------------------------------------------------
    def _send(self, sock, payload) -> bool:
        """Send ``payload`` as JSON; return False instead of raising on failure.

        ``sendall`` is used so large payloads are never truncated.  A failed
        send to one peer must not abort the handler of whoever triggered it.
        """
        try:
            sock.sendall(json.dumps(payload).encode('utf-8'))
        except OSError as e:
            print(f"Error sending to client: {e}")
            return False
        return True

    def _reply_admin(self, session, **fields):
        """Send an ``admin_response`` carrying ``fields`` to the requester."""
        self._send(session.client, {'type': 'admin_response', **fields})

    @staticmethod
    def _close_socket(sock):
        """Shut down and close ``sock``, ignoring already-closed sockets."""
        try:
            # shutdown() wakes a thread that is blocked in recv() on this
            # socket; close() alone is not guaranteed to.
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # Not connected / already shut down: nothing left to do.
        try:
            sock.close()
        except OSError:
            pass

    def _iter_requests(self, client):
        """Yield each JSON value received on ``client`` until it disconnects.

        TCP is a byte stream: one ``recv`` may hold part of a request or
        several requests.  Bytes are decoded incrementally (so a multi-byte
        character split across reads is handled) and parsed with
        ``raw_decode``.  Unparseable text is kept until more data arrives;
        once it exceeds ``MAX_PENDING_CHARS`` the ``JSONDecodeError`` is
        raised.
        """
        utf8 = codecs.getincrementaldecoder('utf-8')()
        parser = json.JSONDecoder()
        pending = ''
        while True:
            chunk = client.recv(RECV_BUFFER_SIZE)
            if not chunk:
                return
            pending = (pending + utf8.decode(chunk)).lstrip()
            while pending:
                try:
                    request, end = parser.raw_decode(pending)
                except json.JSONDecodeError:
                    if len(pending) > MAX_PENDING_CHARS:
                        raise
                    break  # Probably incomplete; wait for the next chunk.
                yield request
                pending = pending[end:].lstrip()

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------
    def handle_client(self, client, address):
        """Serve one connection until it closes or a request fails.

        Requests with an unknown ``type`` are ignored.  Any exception raised
        while handling a request is logged and ends the connection.
        """
        session = _Session(client, address)
        try:
            for data in self._iter_requests(client):
                kind = data['type']
                name = self._HANDLERS.get(kind) if isinstance(kind, str) else None
                if name is not None:
                    getattr(self, name)(session, data)
        except Exception as e:  # Thread boundary: log and drop the client.
            print(f"Error handling client: {e}")
        finally:
            self._end_session(session)

    def _forget_user(self, username):
        """Remove ``username`` from the online map and from every group."""
        self.clients.pop(username, None)
        for members in list(self.groups.values()):
            members.discard(username)

    def _end_session(self, session):
        """Clean up after a connection ends and close its socket."""
        username = session.username
        # Only unregister the name if it still points at *this* socket; a
        # newer login under the same name must not be evicted.
        if username and self.clients.get(username) is session.client:
            self._forget_user(username)
            self.send_user_list()
        self._close_socket(session.client)

    def start(self):
        """Accept connections, one thread each, until ``shutdown`` is called."""
        print(f"Server started on {self.host}:{self.port}")
        # Poll with a timeout so a shutdown request is noticed promptly.
        self.server.settimeout(ACCEPT_POLL_SECONDS)
        while not self._stop_event.is_set():
            try:
                client, address = self.server.accept()
            except socket.timeout:
                continue
            except OSError:
                if self._stop_event.is_set():
                    break  # Listening socket was closed by shutdown().
                raise
            thread = threading.Thread(
                target=self.handle_client, args=(client, address), daemon=True
            )
            thread.start()

    def shutdown(self):
        """Stop accepting, disconnect logged-in clients and close the DB."""
        self._stop_event.set()
        online = list(self.clients.values())
        # Clear first so the per-connection cleanup does not broadcast user
        # lists to sockets that are being torn down.
        self.clients.clear()
        for sock in online:
            self._close_socket(sock)
        self._close_socket(self.server)
        with self._db_lock:
            self.db.close()

    # ------------------------------------------------------------------
    # Broadcast helpers
    # ------------------------------------------------------------------
    def send_user_list(self):
        """Send the list of online usernames to every online client."""
        payload = {'type': 'user_list', 'users': list(self.clients)}
        for sock in list(self.clients.values()):
            self._send(sock, payload)

    def send_group_list(self, username):
        """Send ``username`` the names of the groups they belong to."""
        sock = self.clients.get(username)
        if sock is None:
            return
        group_list = [
            name for name, members in list(self.groups.items())
            if username in members
        ]
        self._send(sock, {'type': 'group_list', 'groups': group_list})

    def send_group_list_to_all(self):
        """Send every online client their own group list."""
        for username in list(self.clients):
            self.send_group_list(username)

    def send_initial_messages(self, username):
        """Send ``username`` their stored history, oldest first.

        The history contains messages the user sent, messages addressed to
        them, and group messages for the groups they are currently in.
        """
        sock = self.clients.get(username)
        if sock is None:
            return
        # Group membership is kept in memory (there is no ``groups`` table),
        # so the group filter is built from ``self.groups``.
        my_groups = [
            name for name, members in list(self.groups.items())
            if username in members
        ]
        query = ('SELECT sender, receiver, message, timestamp, is_group '
                 'FROM messages WHERE receiver=? OR sender=?')
        params = [username, username]
        if my_groups:
            marks = ','.join('?' * len(my_groups))
            query += f' OR (is_group=1 AND receiver IN ({marks}))'
            params.extend(my_groups)
        query += ' ORDER BY timestamp'
        messages = [
            {
                'sender': sender,
                'receiver': receiver,
                'message': message,
                'timestamp': timestamp,
                'is_group': is_group,
            }
            for sender, receiver, message, timestamp, is_group
            in self._fetch_all(query, params)
        ]
        self._send(sock, {'type': 'initial_messages', 'messages': messages})

    # ------------------------------------------------------------------
    # User-facing request handlers
    # ------------------------------------------------------------------
    def _on_login(self, session, data):
        """Authenticate and, on success, register this connection as online."""
        username = data['username']
        password = data['password']
        if not self.authenticate_user(username, password):
            self._send(session.client, {'type': 'login_fail'})
            return
        previous = session.username
        if (previous and previous != username
                and self.clients.get(previous) is session.client):
            # Same socket switching accounts: drop the old registration.
            self._forget_user(previous)
        session.username = username
        self.clients[username] = session.client
        # setdefault: the default group may have been deleted by an admin.
        self.groups.setdefault(DEFAULT_GROUP, set()).add(username)
        self._send(session.client, {'type': 'login_success'})
        self.send_user_list()
        self.send_group_list(username)
        self.send_initial_messages(username)

    def _on_register(self, session, data):
        """Create an account and report success or failure."""
        created = self.register_user(data['username'], data['password'])
        reply = 'register_success' if created else 'register_fail'
        self._send(session.client, {'type': reply})

    def _on_message(self, session, data):
        """Store a message and forward it to the online recipient(s)."""
        sender = data['sender']
        receiver = data['receiver']
        msg = data['message']
        is_group = data['is_group']
        self.save_message(sender, receiver, msg, is_group)
        payload = {
            'type': 'message',
            'sender': sender,
            'receiver': receiver,
            'message': msg,
            'is_group': bool(is_group),
        }
        if is_group:
            targets = [
                member for member in list(self.groups.get(receiver, ()))
                if member != sender
            ]
        else:
            targets = [receiver]
        for name in targets:
            sock = self.clients.get(name)
            if sock is not None:
                self._send(sock, payload)

    def _on_create_group(self, session, data):
        """Create a group from the online users listed in ``members``."""
        group_name = data['group_name']
        if group_name in self.groups:
            return
        requested = data['members']
        self.groups[group_name] = {
            user for user in requested if user in self.clients
        }
        self.send_group_list_to_all()

    # ------------------------------------------------------------------
    # Admin API handlers
    # ------------------------------------------------------------------
    def _on_admin_get_users(self, session, data):
        """Reply with the id and username of every account."""
        users = [
            {'id': user_id, 'username': name}
            for user_id, name in self._fetch_all('SELECT id, username FROM users')
        ]
        self._reply_admin(session, data=users)

    def _on_admin_create_user(self, session, data):
        """Create an account on behalf of an admin."""
        if self.register_user(data['username'], data['password']):
            self._reply_admin(session, success=True)
        else:
            self._reply_admin(session, success=False, error='Username exists')

    def _on_admin_delete_user(self, session, data):
        """Delete an account, its messages, and disconnect it if online."""
        try:
            username = data['username']
            self._execute_write(
                ('DELETE FROM messages WHERE sender=? OR receiver=?',
                 (username, username)),
                ('DELETE FROM users WHERE username=?', (username,)),
            )
            target = self.clients.get(username)
            self._forget_user(username)
            if target is not None:
                self._close_socket(target)
            self.send_user_list()
            self.send_group_list_to_all()
            self._reply_admin(session, success=True)
        except Exception as e:  # Reported to the admin rather than fatal.
            self._reply_admin(session, success=False, error=str(e))

    def _on_admin_get_messages(self, session, data):
        """Reply with up to 1000 stored messages matching optional filters."""
        query = ('SELECT id, sender, receiver, message, timestamp, is_group '
                 'FROM messages WHERE 1=1')
        params = []
        if data.get('sender'):
            query += ' AND sender=?'
            params.append(data['sender'])
        if data.get('receiver'):
            query += ' AND receiver=?'
            params.append(data['receiver'])
        if data.get('is_group') is not None:
            query += ' AND is_group=?'
            params.append(1 if data['is_group'] else 0)
        if data.get('start_time'):
            query += ' AND timestamp >= ?'
            params.append(data['start_time'])
        if data.get('end_time'):
            query += ' AND timestamp <= ?'
            params.append(data['end_time'])
        query += ' ORDER BY timestamp DESC LIMIT 1000'
        messages = [
            {
                'id': row[0],
                'sender': row[1],
                'receiver': row[2],
                'message': row[3],
                'timestamp': row[4],
                'is_group': bool(row[5]),
            }
            for row in self._fetch_all(query, params)
        ]
        self._reply_admin(session, data=messages)

    def _on_admin_delete_message(self, session, data):
        """Delete one stored message by id."""
        self._execute_write(
            ('DELETE FROM messages WHERE id=?', (data['message_id'],)),
        )
        self._reply_admin(session, success=True)

    def _on_admin_get_groups(self, session, data):
        """Reply with every group and its members."""
        groups = [
            {'name': name, 'members': list(members)}
            for name, members in list(self.groups.items())
        ]
        self._reply_admin(session, data=groups)

    def _on_admin_create_group(self, session, data):
        """Create a group from online or registered users."""
        group_name = data['group_name']
        if group_name in self.groups:
            self._reply_admin(session, success=False, error='Group already exists')
            return
        requested = data['members']
        self.groups[group_name] = {
            user for user in requested
            if user in self.clients or self.user_exists(user)
        }
        self.send_group_list_to_all()
        self._reply_admin(session, success=True)

    def _on_admin_delete_group(self, session, data):
        """Delete a group and its stored group messages."""
        group_name = data['group_name']
        if group_name not in self.groups:
            self._reply_admin(session, success=False, error='Group not found')
            return
        self.groups.pop(group_name, None)
        self._execute_write(
            ('DELETE FROM messages WHERE receiver=? AND is_group=1',
             (group_name,)),
        )
        self.send_group_list_to_all()
        self._reply_admin(session, success=True)

    def _on_admin_add_group_member(self, session, data):
        """Add an online or registered user to an existing group."""
        group_name = data['group_name']
        if group_name not in self.groups:
            self._reply_admin(session, success=False, error='Group not found')
            return
        username = data['username']
        if username not in self.clients and not self.user_exists(username):
            self._reply_admin(session, success=False, error='User not found')
            return
        self.groups[group_name].add(username)
        self.send_group_list(username)  # No-op when the user is offline.
        self._reply_admin(session, success=True)

    def _on_admin_remove_group_member(self, session, data):
        """Remove a user from a group they currently belong to."""
        members = self.groups.get(data['group_name'])
        username = data['username']
        if members is None or username not in members:
            self._reply_admin(session, success=False, error='Member not in group')
            return
        members.discard(username)
        self.send_group_list(username)  # No-op when the user is offline.
        self._reply_admin(session, success=True)

    def _on_admin_get_server_status(self, session, data):
        """Reply with online-user count, group count and server start time."""
        status = {
            'status': 'running',
            'users_online': len(self.clients),
            'groups': len(self.groups),
            'start_time': str(self.start_time),
        }
        self._reply_admin(session, data=status)

    def _on_admin_restart_server(self, session, data):
        """Acknowledge a restart request (no restart is performed here)."""
        self._reply_admin(session, success=True, message='Restart command received')

    def _on_admin_shutdown_server(self, session, data):
        """Acknowledge, then shut the server down on a separate thread."""
        self._reply_admin(session, success=True, message='Shutdown command received')
        # Separate thread so this handler can return before its own socket
        # is torn down by shutdown().
        threading.Thread(target=self.shutdown).start()


if __name__ == "__main__":
    server = ChatServer()
    server.start()