新聊天程序

Closes #1
This commit is contained in:
DZY 2025-05-31 19:01:43 +08:00
parent ddb9433d0b
commit 04373736da
2 changed files with 501 additions and 0 deletions

312
chatclient3.0.py Normal file
View File

@ -0,0 +1,312 @@
import socket
import threading
import json
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
from datetime import datetime
from tkinter import font as tkfont
class ChatClient:
def __init__(self, root):
self.root = root
self.root.title("Python Chat")
self.root.geometry("800x600")
self.root.configure(bg='#f0f0f0')
self.current_user = None
self.current_chat = None
self.is_group_chat = False
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.setup_ui()
def setup_ui(self):
self.custom_font = tkfont.Font(family="Helvetica", size=10)
self.bold_font = tkfont.Font(family="Helvetica", size=10, weight="bold")
self.main_frame = ttk.Frame(self.root)
self.main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
self.left_panel = ttk.Frame(self.main_frame, width=200)
self.left_panel.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 10))
self.left_panel.pack_propagate(False)
self.right_panel = ttk.Frame(self.main_frame)
self.right_panel.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)
self.login_frame = ttk.Frame(self.left_panel)
self.login_frame.pack(fill=tk.X, pady=(0, 10))
ttk.Label(self.login_frame, text="Username:").pack(anchor=tk.W)
self.username_entry = ttk.Entry(self.login_frame)
self.username_entry.pack(fill=tk.X)
ttk.Label(self.login_frame, text="Password:").pack(anchor=tk.W, pady=(5, 0))
self.password_entry = ttk.Entry(self.login_frame, show="*")
self.password_entry.pack(fill=tk.X)
self.login_button = ttk.Button(self.login_frame, text="Login", command=self.login)
self.login_button.pack(fill=tk.X, pady=(5, 0))
self.register_button = ttk.Button(self.login_frame, text="Register", command=self.register)
self.register_button.pack(fill=tk.X, pady=(5, 0))
self.user_list_frame = ttk.LabelFrame(self.left_panel, text="Users")
self.user_list_frame.pack(fill=tk.BOTH, expand=True)
self.user_list = tk.Listbox(self.user_list_frame, font=self.custom_font, selectmode=tk.SINGLE)
self.user_list.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
self.user_list.bind('<<ListboxSelect>>', self.select_user)
self.group_list_frame = ttk.LabelFrame(self.left_panel, text="Groups")
self.group_list_frame.pack(fill=tk.BOTH, pady=(10, 0))
self.group_list = tk.Listbox(self.group_list_frame, font=self.custom_font, selectmode=tk.SINGLE)
self.group_list.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
self.group_list.bind('<<ListboxSelect>>', self.select_group)
self.create_group_button = ttk.Button(self.left_panel, text="Create Group", command=self.show_create_group_dialog)
self.create_group_button.pack(fill=tk.X, pady=(10, 0))
self.chat_frame = ttk.Frame(self.right_panel)
self.chat_frame.pack(fill=tk.BOTH, expand=True)
self.chat_header = ttk.Label(self.chat_frame, text="Select a chat", font=self.bold_font)
self.chat_header.pack(anchor=tk.W, padx=5, pady=5)
self.chat_display = scrolledtext.ScrolledText(
self.chat_frame, wrap=tk.WORD, state=tk.DISABLED,
font=self.custom_font, bg='white', padx=10, pady=10
)
self.chat_display.pack(fill=tk.BOTH, expand=True)
self.input_frame = ttk.Frame(self.right_panel)
self.input_frame.pack(fill=tk.X, pady=(10, 0))
self.message_entry = ttk.Entry(self.input_frame)
self.message_entry.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 5))
self.message_entry.bind('<Return>', self.send_message)
self.send_button = ttk.Button(self.input_frame, text="Send", command=self.send_message)
self.send_button.pack(side=tk.RIGHT)
self.update_ui_state(False)
def update_ui_state(self, logged_in):
state = tk.NORMAL if logged_in else tk.DISABLED
self.user_list.config(state=state)
self.group_list.config(state=state)
self.message_entry.config(state=state)
self.send_button.config(state=state)
self.create_group_button.config(state=state)
if not logged_in:
self.user_list.delete(0, tk.END)
self.group_list.delete(0, tk.END)
self.chat_header.config(text="Select a chat")
self.chat_display.config(state=tk.NORMAL)
self.chat_display.delete(1.0, tk.END)
self.chat_display.config(state=tk.DISABLED)
self.current_chat = None
def login(self):
username = self.username_entry.get()
password = self.password_entry.get()
if not username or not password:
messagebox.showerror("Error", "Username and password are required")
return
try:
self.socket.connect(('localhost', 5555))
self.socket.send(json.dumps({
'type': 'login',
'username': username,
'password': password
}).encode('utf-8'))
response = json.loads(self.socket.recv(1024).decode('utf-8'))
if response['type'] == 'login_success':
self.current_user = username
self.update_ui_state(True)
threading.Thread(target=self.receive_messages, daemon=True).start()
else:
messagebox.showerror("Error", "Login failed")
self.socket.close()
except Exception as e:
messagebox.showerror("Error", f"Connection error: {str(e)}")
def register(self):
username = self.username_entry.get()
password = self.password_entry.get()
if not username or not password:
messagebox.showerror("Error", "Username and password are required")
return
try:
temp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
temp_socket.connect(('localhost', 5555))
temp_socket.send(json.dumps({
'type': 'register',
'username': username,
'password': password
}).encode('utf-8'))
response = json.loads(temp_socket.recv(1024).decode('utf-8'))
if response['type'] == 'register_success':
messagebox.showinfo("Success", "Registration successful")
else:
messagebox.showerror("Error", "Registration failed - username may be taken")
temp_socket.close()
except Exception as e:
messagebox.showerror("Error", f"Connection error: {str(e)}")
def receive_messages(self):
while True:
try:
message = self.socket.recv(1024).decode('utf-8')
if not message:
break
data = json.loads(message)
if data['type'] == 'user_list':
self.user_list.delete(0, tk.END)
for user in data['users']:
if user != self.current_user:
self.user_list.insert(tk.END, user)
elif data['type'] == 'group_list':
self.group_list.delete(0, tk.END)
for group in data['groups']:
self.group_list.insert(tk.END, group)
elif data['type'] == 'message':
self.display_message(
data['sender'],
data['receiver'],
data['message'],
data['is_group']
)
elif data['type'] == 'initial_messages':
for msg in data['messages']:
self.display_message(
msg['sender'],
msg['receiver'],
msg['message'],
msg['is_group'],
msg['timestamp']
)
except Exception as e:
print(f"Error receiving message: {e}")
break
def display_message(self, sender, receiver, message, is_group, timestamp=None):
timestamp = timestamp or datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if (is_group and receiver == self.current_chat and sender != self.current_user) or \
(not is_group and ((sender == self.current_chat and receiver == self.current_user) or \
(sender == self.current_user and receiver == self.current_chat))):
self.chat_display.config(state=tk.NORMAL)
if sender == self.current_user:
self.chat_display.tag_config('right', justify='right', foreground='blue')
self.chat_display.insert(tk.END, f"{message} ({timestamp})\n", 'right')
else:
sender_label = f"{sender} (group)" if is_group and sender != self.current_chat else sender
self.chat_display.tag_config('left', justify='left', foreground='green')
self.chat_display.insert(tk.END, f"{sender_label}: {message} ({timestamp})\n", 'left')
self.chat_display.config(state=tk.DISABLED)
self.chat_display.see(tk.END)
def select_user(self, event):
selection = event.widget.curselection()
if selection:
self.current_chat = event.widget.get(selection[0])
self.is_group_chat = False
self.chat_header.config(text=f"Chat with {self.current_chat}")
self.chat_display.config(state=tk.NORMAL)
self.chat_display.delete(1.0, tk.END)
self.chat_display.config(state=tk.DISABLED)
def select_group(self, event):
selection = event.widget.curselection()
if selection:
self.current_chat = event.widget.get(selection[0])
self.is_group_chat = True
self.chat_header.config(text=f"Group: {self.current_chat}")
self.chat_display.config(state=tk.NORMAL)
self.chat_display.delete(1.0, tk.END)
self.chat_display.config(state=tk.DISABLED)
def send_message(self, event=None):
if not self.current_chat or not self.message_entry.get():
return
message = self.message_entry.get()
self.socket.send(json.dumps({
'type': 'message',
'sender': self.current_user,
'receiver': self.current_chat,
'message': message,
'is_group': self.is_group_chat
}).encode('utf-8'))
self.display_message(
self.current_user,
self.current_chat,
message,
self.is_group_chat
)
self.message_entry.delete(0, tk.END)
def show_create_group_dialog(self):
dialog = tk.Toplevel(self.root)
dialog.title("Create Group")
dialog.geometry("300x300")
ttk.Label(dialog, text="Group Name:").pack(anchor=tk.W, padx=10, pady=(10, 0))
group_name_entry = ttk.Entry(dialog)
group_name_entry.pack(fill=tk.X, padx=10, pady=(0, 10))
ttk.Label(dialog, text="Select Members:").pack(anchor=tk.W, padx=10, pady=(10, 0))
members_frame = ttk.Frame(dialog)
members_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=(0, 10))
members_list = tk.Listbox(members_frame, selectmode=tk.MULTIPLE)
members_list.pack(fill=tk.BOTH, expand=True)
for i in range(self.user_list.size()):
members_list.insert(tk.END, self.user_list.get(i))
def create_group():
group_name = group_name_entry.get()
selected_indices = members_list.curselection()
selected_members = [members_list.get(i) for i in selected_indices]
if not group_name or not selected_members:
messagebox.showerror("Error", "Group name and at least one member are required")
return
self.socket.send(json.dumps({
'type': 'create_group',
'group_name': group_name,
'members': selected_members + [self.current_user]
}).encode('utf-8'))
dialog.destroy()
button_frame = ttk.Frame(dialog)
button_frame.pack(fill=tk.X, padx=10, pady=(0, 10))
ttk.Button(button_frame, text="Create", command=create_group).pack(side=tk.RIGHT)
ttk.Button(button_frame, text="Cancel", command=dialog.destroy).pack(side=tk.RIGHT, padx=(0, 5))
def on_closing(self):
if self.current_user:
self.socket.close()
self.root.destroy()
if __name__ == "__main__":
root = tk.Tk()
client = ChatClient(root)
root.protocol("WM_DELETE_WINDOW", client.on_closing)
root.mainloop()

189
chatserver3.0.py Normal file
View File

@ -0,0 +1,189 @@
import socket
import threading
import json
import sqlite3
from datetime import datetime
class ChatServer:
def __init__(self, host='0.0.0.0', port=5555):
self.host = host
self.port = port
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind((host, port))
self.server.listen()
self.clients = {}
self.groups = {'General': set()}
self.setup_database()
def setup_database(self):
self.db = sqlite3.connect('chat_server.db', check_same_thread=False)
cursor = self.db.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE,
password TEXT
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sender TEXT,
receiver TEXT,
message TEXT,
timestamp DATETIME,
is_group INTEGER
)
''')
self.db.commit()
def handle_client(self, client, address):
username = None
try:
while True:
message = client.recv(1024).decode('utf-8')
if not message:
break
data = json.loads(message)
if data['type'] == 'login':
username = data['username']
password = data['password']
if self.authenticate_user(username, password):
self.clients[username] = client
self.groups['General'].add(username)
client.send(json.dumps({'type': 'login_success'}).encode('utf-8'))
self.send_user_list()
self.send_group_list(username)
self.send_initial_messages(username)
else:
client.send(json.dumps({'type': 'login_fail'}).encode('utf-8'))
elif data['type'] == 'register':
username = data['username']
password = data['password']
if self.register_user(username, password):
client.send(json.dumps({'type': 'register_success'}).encode('utf-8'))
else:
client.send(json.dumps({'type': 'register_fail'}).encode('utf-8'))
elif data['type'] == 'message':
sender = data['sender']
receiver = data['receiver']
msg = data['message']
is_group = data['is_group']
self.save_message(sender, receiver, msg, is_group)
if is_group:
for member in self.groups.get(receiver, set()):
if member in self.clients and member != sender:
self.clients[member].send(json.dumps({
'type': 'message',
'sender': sender,
'receiver': receiver,
'message': msg,
'is_group': True
}).encode('utf-8'))
else:
if receiver in self.clients:
self.clients[receiver].send(json.dumps({
'type': 'message',
'sender': sender,
'receiver': receiver,
'message': msg,
'is_group': False
}).encode('utf-8'))
elif data['type'] == 'create_group':
group_name = data['group_name']
if group_name not in self.groups:
self.groups[group_name] = set()
for user in data['members']:
if user in self.clients:
self.groups[group_name].add(user)
self.send_group_list_to_all()
except Exception as e:
print(f"Error handling client: {e}")
finally:
if username and username in self.clients:
del self.clients[username]
for group in self.groups.values():
if username in group:
group.remove(username)
self.send_user_list()
client.close()
def authenticate_user(self, username, password):
cursor = self.db.cursor()
cursor.execute('SELECT * FROM users WHERE username=? AND password=?', (username, password))
return cursor.fetchone() is not None
def register_user(self, username, password):
try:
cursor = self.db.cursor()
cursor.execute('INSERT INTO users (username, password) VALUES (?, ?)', (username, password))
self.db.commit()
return True
except sqlite3.IntegrityError:
return False
def save_message(self, sender, receiver, message, is_group):
cursor = self.db.cursor()
cursor.execute('''
INSERT INTO messages (sender, receiver, message, timestamp, is_group)
VALUES (?, ?, ?, ?, ?)
''', (sender, receiver, message, datetime.now(), 1 if is_group else 0))
self.db.commit()
def send_user_list(self):
user_list = list(self.clients.keys())
for client in self.clients.values():
client.send(json.dumps({
'type': 'user_list',
'users': user_list
}).encode('utf-8'))
def send_group_list(self, username):
group_list = [group for group, members in self.groups.items() if username in members]
if username in self.clients:
self.clients[username].send(json.dumps({
'type': 'group_list',
'groups': group_list
}).encode('utf-8'))
def send_group_list_to_all(self):
for username in self.clients:
self.send_group_list(username)
def send_initial_messages(self, username):
cursor = self.db.cursor()
cursor.execute('''
SELECT sender, receiver, message, timestamp, is_group FROM messages
WHERE receiver=? OR (is_group=1 AND receiver IN (
SELECT group_name FROM groups WHERE member=?
)) OR sender=?
ORDER BY timestamp
''', (username, username, username))
messages = []
for sender, receiver, message, timestamp, is_group in cursor.fetchall():
messages.append({
'sender': sender,
'receiver': receiver,
'message': message,
'timestamp': timestamp,
'is_group': is_group
})
if username in self.clients:
self.clients[username].send(json.dumps({
'type': 'initial_messages',
'messages': messages
}).encode('utf-8'))
def start(self):
print(f"Server started on {self.host}:{self.port}")
while True:
client, address = self.server.accept()
thread = threading.Thread(target=self.handle_client, args=(client, address))
thread.start()
if __name__ == "__main__":
server = ChatServer()
server.start()