# Source listing: API/CC3.1.py (originally 239 lines, 8.3 KiB, Python)

import codecs
import json
import queue
import socket
import threading
import tkinter as tk
from tkinter import scrolledtext, messagebox
class ChatClient:
    """Tkinter chat client that exchanges JSON documents over a TCP socket.

    All widget access happens on the Tk main thread.  Blocking network work
    runs in daemon threads which report back by putting ``(msg_type, data)``
    tuples on ``message_queue``; ``check_messages`` drains that queue every
    100 ms from the Tk event loop.

    NOTE(review): credentials are sent as plain JSON over an unencrypted
    socket, exactly as in the original code.
    """

    # Seconds allowed for the TCP connect and for each later socket operation.
    CONNECT_TIMEOUT = 5
    RESPONSE_TIMEOUT = 10
    # Bytes requested per recv() call.
    RECV_SIZE = 1024
    # Upper bound on undecoded stream text kept while waiting for the rest of
    # a JSON document; exceeding it is treated as a protocol error.
    MAX_PENDING_CHARS = 1 << 20

    def __init__(self, root, host="localhost", login_port=8889, register_port=8888):
        """Build the UI inside *root* and start polling the message queue.

        Args:
            root: the ``tk.Tk`` (or toplevel) window that hosts the client.
            host: server host name used for login and registration.
            login_port: TCP port used by ``perform_login``.
            register_port: TCP port used by ``perform_register``.

        NOTE(review): the original code hard-coded 8889 for login and 8888
        for registration.  Those defaults are preserved here, but the
        mismatch looks accidental -- confirm against the server.
        """
        self.root = root
        self.root.title("聊天客户端")
        self.host = host
        self.login_port = login_port
        self.register_port = register_port
        self.socket = None
        self.connected = False
        self.username = None
        self.message_queue = queue.Queue()
        self.network_thread = None
        self.setup_ui()
        self.check_messages()

    def setup_ui(self):
        """Create the login form (shown) and the chat view (packed after login)."""
        self.login_frame = tk.Frame(self.root)
        self.login_frame.pack(pady=10)
        tk.Label(self.login_frame, text="用户名:").grid(row=0, column=0)
        self.username_entry = tk.Entry(self.login_frame)
        self.username_entry.grid(row=0, column=1)
        tk.Label(self.login_frame, text="密码:").grid(row=1, column=0)
        self.password_entry = tk.Entry(self.login_frame, show="*")
        self.password_entry.grid(row=1, column=1)
        self.login_btn = tk.Button(self.login_frame, text="登录", command=self.start_login)
        self.login_btn.grid(row=2, column=0, pady=5)
        self.register_btn = tk.Button(self.login_frame, text="注册", command=self.start_register)
        self.register_btn.grid(row=2, column=1, pady=5)

        # The chat frame is built now but only packed by handle_login_result.
        self.chat_frame = tk.Frame(self.root)
        self.chat_history = scrolledtext.ScrolledText(self.chat_frame, state='disabled')
        self.chat_history.pack(pady=5, padx=5, fill=tk.BOTH, expand=True)
        self.message_entry = tk.Entry(self.chat_frame)
        self.message_entry.pack(pady=5, padx=5, fill=tk.X)
        self.send_btn = tk.Button(self.chat_frame, text="发送", command=self.send_message)
        self.send_btn.pack(pady=5)

    def check_messages(self):
        """Dispatch every queued worker event, then reschedule in 100 ms."""
        try:
            while True:
                msg_type, data = self.message_queue.get_nowait()
                if msg_type == "login_result":
                    self.handle_login_result(data)
                elif msg_type == "register_result":
                    self.handle_register_result(data)
                elif msg_type == "chat_message":
                    self.display_message(data)
                elif msg_type == "error":
                    messagebox.showerror("错误", data)
                elif msg_type == "disconnect":
                    # data is the socket the event refers to (or None).
                    self.handle_disconnect(data)
        except queue.Empty:
            pass
        finally:
            # Reschedule even if a handler raised, so polling never stops.
            self.root.after(100, self.check_messages)

    def start_network_thread(self, task, *args):
        """Run ``task(*args)`` in a daemon thread unless one is still running.

        Returns:
            True if the thread was started, False if a previous network
            thread is still alive.
        """
        if self.network_thread and self.network_thread.is_alive():
            return False
        self.network_thread = threading.Thread(
            target=task,
            args=args,
            daemon=True,
        )
        self.network_thread.start()
        return True

    def start_login(self):
        """Button callback: read the credentials and log in on a worker thread."""
        # Tk widgets must only be touched from the GUI thread, so the entry
        # values are captured here and handed to the worker as plain strings.
        username = self.username_entry.get()
        password = self.password_entry.get()
        if not self.start_network_thread(self.perform_login, username, password):
            messagebox.showwarning("警告", "已有操作正在进行")

    def start_register(self):
        """Button callback: read the credentials and register on a worker thread."""
        username = self.username_entry.get()
        password = self.password_entry.get()
        if not self.start_network_thread(self.perform_register, username, password):
            messagebox.showwarning("警告", "已有操作正在进行")

    def _exchange(self, sock, port, payload):
        """Connect *sock* to ``(self.host, port)``, send *payload*, read one reply.

        Returns:
            The decoded reply as a dict, or None if the peer closed the
            connection without sending anything.

        Raises:
            socket.timeout / OSError on network failure, ValueError if the
            reply is not a JSON object.
        """
        sock.settimeout(self.CONNECT_TIMEOUT)
        sock.connect((self.host, port))
        sock.settimeout(self.RESPONSE_TIMEOUT)
        sock.sendall(json.dumps(payload).encode())
        response = sock.recv(self.RECV_SIZE)
        if not response:
            return None
        result = json.loads(response.decode())
        if not isinstance(result, dict):
            # The result handlers call .get() on the reply.
            raise ValueError("服务器响应格式无效")
        return result

    def perform_login(self, username=None, password=None):
        """Worker: send a login request and queue the outcome.

        On a reply, queues ``("login_result", (result, sock))`` and leaves
        the socket open for the chat session; on any failure queues an
        ``"error"`` event and closes the socket.

        Args:
            username, password: credentials captured on the GUI thread.  If
                omitted they are read from the entry widgets, which is only
                safe when this method is called from the GUI thread.
        """
        try:
            if username is None:
                username = self.username_entry.get()
            if password is None:
                password = self.password_entry.get()
            if not username or not password:
                self.message_queue.put(("error", "用户名和密码不能为空"))
                return
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            handed_over = False
            try:
                result = self._exchange(sock, self.login_port, {
                    "type": "login",
                    "username": username,
                    "password": password,
                })
                if result is None:
                    self.message_queue.put(("error", "服务器无响应"))
                    return
                self.message_queue.put(("login_result", (result, sock)))
                handed_over = True
            except socket.timeout:
                self.message_queue.put(("error", "连接超时"))
            except Exception as e:
                self.message_queue.put(("error", f"登录错误: {str(e)}"))
            finally:
                # Close on every path except a successful hand-over; the
                # original leaked the socket on timeout and empty response.
                if not handed_over:
                    sock.close()
        except Exception as e:
            self.message_queue.put(("error", f"系统错误: {str(e)}"))

    def perform_register(self, username=None, password=None):
        """Worker: send a registration request and queue the outcome.

        Queues ``("register_result", result)`` on a reply, otherwise an
        ``"error"`` event.  The socket is always closed before returning.

        Args:
            username, password: credentials captured on the GUI thread.  If
                omitted they are read from the entry widgets, which is only
                safe when this method is called from the GUI thread.
        """
        try:
            if username is None:
                username = self.username_entry.get()
            if password is None:
                password = self.password_entry.get()
            if not username or not password:
                self.message_queue.put(("error", "用户名和密码不能为空"))
                return
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                result = self._exchange(sock, self.register_port, {
                    "type": "register",
                    "username": username,
                    "password": password,
                })
                if result is None:
                    self.message_queue.put(("error", "服务器无响应"))
                    return
                self.message_queue.put(("register_result", result))
            except socket.timeout:
                self.message_queue.put(("error", "连接超时"))
            except Exception as e:
                self.message_queue.put(("error", f"注册错误: {str(e)}"))
            finally:
                sock.close()
        except Exception as e:
            self.message_queue.put(("error", f"系统错误: {str(e)}"))

    def handle_login_result(self, data):
        """GUI thread: switch to the chat view on success, else report failure.

        Args:
            data: ``(result, sock)`` as queued by ``perform_login``.
        """
        result, sock = data
        # NOTE(review): login checks result["status"] == "success" while
        # registration checks a truthy result["success"]; kept as in the
        # original -- confirm both against the server protocol.
        if result.get("status") == "success":
            self.socket = sock
            self.connected = True
            self.username = self.username_entry.get()
            self.login_frame.pack_forget()
            self.chat_frame.pack(fill=tk.BOTH, expand=True)
            threading.Thread(target=self.receive_messages, daemon=True).start()
        else:
            self.message_queue.put(("error", result.get("message", "登录失败")))
            if sock:
                sock.close()

    def handle_register_result(self, result):
        """GUI thread: tell the user whether registration succeeded."""
        if result.get("success"):
            messagebox.showinfo("注册成功", "账号注册成功,请登录")
        else:
            self.message_queue.put(("error", result.get("message", "注册失败")))

    def send_message(self):
        """Send the text in the message entry as a ``chat`` JSON document."""
        message = self.message_entry.get()
        sock = self.socket
        if not message or not self.connected or sock is None:
            return
        try:
            sock.sendall(json.dumps({
                "type": "chat",
                "message": message,
            }).encode())
        except OSError as e:
            self.message_queue.put(("error", f"发送消息失败: {str(e)}"))
            self.connected = False
            self.message_queue.put(("disconnect", sock))
        else:
            self.message_entry.delete(0, tk.END)

    @staticmethod
    def _extract_messages(buffer):
        """Split every complete JSON document off the front of *buffer*.

        TCP is a byte stream, so one ``recv`` may hold several documents or
        only part of one.  Documents are parsed back to back; whatever cannot
        be parsed yet (an incomplete tail) is returned for the next call.

        Args:
            buffer: decoded text received so far.

        Returns:
            ``(messages, remainder)`` -- the parsed documents in arrival
            order and the unparsed tail of *buffer*.
        """
        decoder = json.JSONDecoder()
        messages = []
        pos = 0
        end = len(buffer)
        while True:
            # raw_decode() does not accept leading whitespace.
            while pos < end and buffer[pos].isspace():
                pos += 1
            if pos >= end:
                break
            try:
                obj, pos = decoder.raw_decode(buffer, pos)
            except json.JSONDecodeError:
                break  # incomplete so far; wait for more data
            messages.append(obj)
        return messages, buffer[pos:]

    def receive_messages(self):
        """Worker: read chat messages until the connection ends.

        Queues a ``"chat_message"`` event per decoded JSON document and a
        final ``("disconnect", sock)`` event when the loop exits.
        """
        sock = self.socket
        # Incremental decoding keeps a multi-byte UTF-8 character intact even
        # when it is split across two recv() calls.
        utf8 = codecs.getincrementaldecoder("utf-8")()
        pending = ""
        try:
            while self.connected and sock is not None and sock is self.socket:
                try:
                    chunk = sock.recv(self.RECV_SIZE)
                except socket.timeout:
                    # The socket keeps the timeout set during login; an idle
                    # period is not a disconnect, so just poll again.
                    continue
                if not chunk:
                    break  # peer closed the connection
                pending += utf8.decode(chunk)
                messages, pending = self._extract_messages(pending)
                for message in messages:
                    self.message_queue.put(("chat_message", message))
                if len(pending) > self.MAX_PENDING_CHARS:
                    break  # never forms valid JSON: treat as protocol error
        except (OSError, ValueError):
            # Socket failure/closure, or bytes that are not valid UTF-8.
            pass
        if sock is self.socket:
            self.connected = False
        self.message_queue.put(("disconnect", sock))

    def display_message(self, message):
        """Append one received message to the read-only chat history."""
        if isinstance(message, dict):
            line = f"{message.get('user', '系统')}: {message.get('message')}\n"
        else:
            # Non-object JSON payloads are shown as a system line instead of
            # raising AttributeError inside the Tk callback.
            line = f"系统: {message}\n"
        self.chat_history.config(state='normal')
        self.chat_history.insert(tk.END, line)
        self.chat_history.config(state='disabled')
        self.chat_history.see(tk.END)

    @staticmethod
    def _close_socket(sock):
        """Shut down and close *sock*, ignoring errors; None is accepted."""
        if sock is None:
            return
        try:
            # shutdown() wakes up a thread blocked in recv() on this socket.
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass
        try:
            sock.close()
        except OSError:
            pass

    def handle_disconnect(self, sock=None):
        """GUI thread: tear down the session and return to the login form.

        Args:
            sock: the socket the disconnect event refers to.  Events for a
                socket that is no longer the current one are ignored, as are
                repeated events after the session was already torn down, so
                the dialog is shown once per session.
        """
        if sock is not None and sock is not self.socket:
            return
        if self.socket is None:
            return
        # The event producers clear `connected` before queueing, so the
        # socket must be closed regardless of that flag.
        self._close_socket(self.socket)
        self.socket = None
        self.connected = False
        self.chat_frame.pack_forget()
        self.login_frame.pack(pady=10)
        messagebox.showinfo("断开连接", "与服务器的连接已断开")

    def on_closing(self):
        """Window-close handler: release the socket and destroy the window."""
        self.connected = False
        self._close_socket(self.socket)
        self.socket = None
        self.root.destroy()
def main():
    """Create the 500x400 main window, attach the chat client, run the Tk loop."""
    root = tk.Tk()
    root.geometry("500x400")
    client = ChatClient(root)
    # Route the window-manager close button through the client's cleanup.
    root.protocol("WM_DELETE_WINDOW", client.on_closing)
    root.mainloop()


if __name__ == "__main__":
    main()