本文由 资源共享网 – ziyuan 发布,转载请注明出处,如有问题请联系我们![免费]ollama客户端源码
收藏ollama可以本地部署DeepSeek等AI模型,只是没有安装后没有图型界面;所以写了一个
import tkinter as tk from tkinter import ttk, messagebox import requests import threading import json import webbrowser from tkinter import scrolledtext import time import tkinter.font as font # Ollama API 地址 OLLAMA_API_URL = "http://localhost:11434/api" # 定义颜色常量,使用多彩颜色方案 BG_COLOR = "#f5f5f5" # 背景色 FG_COLOR = "#333333" # 前景色 BUTTON_BG_COLORS = ["#00C2FE", "#6CE264", "#FF3A48", "#FF8A30","#84BDFE"] # 按钮背景色数组 BUTTON_HOVER_BG_COLORS = ["#0087B1", "#1E7F18", "#B7000D", "#BE5200","#0059BE"] # 按钮悬停背景色数组 BUTTON_FG_COLOR = "white" # 按钮前景色 LABEL_BG_COLOR = "#e0e0e0" # 标签背景色 CODE_BG_COLOR = "#f8f9fa" # 代码块背景色 class OllamaClient: def __init__(self, root): self.root = root self.root.title("08i8 本地客户端 V1.0") self.root.configure(bg=BG_COLOR) # 使窗口在桌面居中 self.center_window() # 创建界面组件 self.create_widgets() # 初始化模型列表 self.update_model_list() # 下载取消标志位 self.cancel_download = False # 关闭程序标志位 self.cancel_APP = False # 绑定窗口关闭事件 self.root.protocol("WM_DELETE_WINDOW", self.on_close) def center_window(self): # 获取屏幕宽度和高度 screen_width = self.root.winfo_screenwidth() screen_height = self.root.winfo_screenheight() # 窗口宽度和高度 window_width = 800 window_height = 600 # 计算窗口左上角的坐标 x = (screen_width - window_width) // 2 y = (screen_height - window_height) // 2 # 设置窗口的位置和大小 self.root.geometry(f"{window_width}x{window_height}+{x}+{y}") def create_widgets(self): # 使用 grid 布局管理器 self.root.columnconfigure(0, weight=1) self.root.rowconfigure(1, weight=1) # 顶部框架,用于放置模型选择和下载相关组件 top_frame = tk.Frame(self.root, bg=BG_COLOR) top_frame.grid(row=0, column=0, padx=10, pady=10, sticky="ew") top_frame.columnconfigure(0, weight=1) # 模型选择下拉框 self.model_combobox = ttk.Combobox(top_frame, background=LABEL_BG_COLOR) self.model_combobox.grid(row=0, column=0, padx=10, sticky="w") # 打开模型库按钮 open_library_button = tk.Button(top_frame, text="打开模型库", command=self.open_model_library, bg=BUTTON_BG_COLORS[0], fg=BUTTON_FG_COLOR,width=10, height=1) open_library_button.grid(row=0, column=1, padx=10) open_library_button.bind("<Enter>", lambda e: e.widget.config(bg=BUTTON_HOVER_BG_COLORS[0])) open_library_button.bind("<Leave>", lambda e: e.widget.config(bg=BUTTON_BG_COLORS[0])) # 下载模型按钮 self.download_button = tk.Button(top_frame, text="下载模型", command=self.download_model, bg=BUTTON_BG_COLORS[1], fg=BUTTON_FG_COLOR,width=10, height=1) self.download_button.grid(row=0, column=2, padx=10) self.download_button.bind("<Enter>", lambda e: e.widget.config(bg=BUTTON_HOVER_BG_COLORS[1])) self.download_button.bind("<Leave>", lambda e: e.widget.config(bg=BUTTON_BG_COLORS[1])) # 删除模型按钮 self.delete_button = tk.Button(top_frame, text="删除模型", command=self.delete_model, bg=BUTTON_BG_COLORS[2], fg=BUTTON_FG_COLOR,width=10, height=1) self.delete_button.grid(row=0, column=3, padx=10) self.delete_button.bind("<Enter>", lambda e: e.widget.config(bg=BUTTON_HOVER_BG_COLORS[2])) self.delete_button.bind("<Leave>", lambda e: e.widget.config(bg=BUTTON_BG_COLORS[2])) # 下载进度条及相关信息框架 self.download_frame = tk.Frame(self.root, bg=BG_COLOR) self.download_frame.grid(row=2, column=0, padx=10, pady=5, sticky="ew") self.download_frame.grid_forget() self.download_progress = ttk.Progressbar(self.download_frame, orient="horizontal", length=300, mode="determinate") self.download_progress.pack(side=tk.LEFT, padx=10) self.download_info_label = tk.Label(self.download_frame, text="", bg=LABEL_BG_COLOR, fg=FG_COLOR) self.download_info_label.pack(side=tk.LEFT, padx=10) # 创建字体对象 title_font = font.Font(family="黑体", size=12) # 聊天记录显示框,使用 scrolledtext 增加滚动条 self.chat_text = scrolledtext.ScrolledText(self.root, width=80, height=20, bg=LABEL_BG_COLOR, fg=FG_COLOR) self.chat_text.grid(row=1, column=0, padx=10, pady=10, sticky="nsew") self.chat_text.bind("<Button-3>", self.show_chat_context_menu) self.chat_text.tag_configure("title", font=title_font) self.chat_text.tag_configure("code", background=CODE_BG_COLOR) # 输入框使用 scrolledtext self.input_text = scrolledtext.ScrolledText(self.root, width=80, height=5, bg=LABEL_BG_COLOR, fg=FG_COLOR) self.input_text.grid(row=3, column=0, padx=10, pady=10, sticky="ew") self.input_text.bind("<Button-3>", self.show_input_context_menu) # 创建一个新的框架来放置发送按钮和清除按钮 button_frame = tk.Frame(self.root, bg=BG_COLOR) button_frame.grid(row=4, column=0, padx=10, pady=10, sticky="e") # 发送按钮 self.send_button = tk.Button(button_frame, text="发送", command=self.send_message, bg=BUTTON_BG_COLORS[3], fg=BUTTON_FG_COLOR,width=10, height=1) self.send_button.grid(row=0, column=0, padx=10) self.send_button.bind("<Enter>", lambda e: e.widget.config(bg=BUTTON_HOVER_BG_COLORS[3])) self.send_button.bind("<Leave>", lambda e: e.widget.config(bg=BUTTON_BG_COLORS[3])) # 清空聊天记录按钮 self.clear_button = tk.Button(button_frame, text="清空聊天", command=self.clear_chat, bg=BUTTON_BG_COLORS[4], fg=BUTTON_FG_COLOR,width=10, height=1) self.clear_button.grid(row=0, column=1, padx=10) self.clear_button.bind("<Enter>", lambda e: e.widget.config(bg=BUTTON_HOVER_BG_COLORS[4])) self.clear_button.bind("<Leave>", lambda e: e.widget.config(bg=BUTTON_BG_COLORS[4])) def update_model_list(self): try: response = requests.get(f"{OLLAMA_API_URL}/tags") if response.status_code == 200: models = [model["name"] for model in response.json().get("models", [])] self.model_combobox['values'] = models if models: self.model_combobox.set(models[0]) except Exception as e: messagebox.showerror("错误", f"获取模型列表失败: {str(e)}") def open_model_library(self): webbrowser.open("https://ollama.com/library") def download_model(self): # 创建自定义输入对话框并居中显示 dialog = tk.Toplevel(self.root) dialog.title("下载模型") dialog.geometry("300x150") dialog.configure(bg=BG_COLOR) dialog.attributes("-topmost", True) # 使对话框居中 dialog.update_idletasks() width = dialog.winfo_width() height = dialog.winfo_height() x = (self.root.winfo_screenwidth() // 2) - (width // 2) y = (self.root.winfo_screenheight() // 2) - (height // 2) dialog.geometry(f"{width}x{height}+{x}+{y}") tk.Label(dialog, text="请输入要下载的模型名称:", bg=BG_COLOR, fg=FG_COLOR).pack(pady=20) model_entry = tk.Entry(dialog, bg=LABEL_BG_COLOR, fg=FG_COLOR) model_entry.pack(pady=10) def start_download(): model_name = model_entry.get() if model_name: dialog.destroy() self.download_frame.grid() self.cancel_download = False # 重置取消标志位 threading.Thread(target=self._download_model_thread, args=(model_name,)).start() download_btn = tk.Button(dialog, text="开始下载", command=start_download, bg=BUTTON_BG_COLORS[1], fg=BUTTON_FG_COLOR) download_btn.pack(pady=10) download_btn.bind("<Enter>", lambda e: e.widget.config(bg=BUTTON_HOVER_BG_COLORS[1])) download_btn.bind("<Leave>", lambda e: e.widget.config(bg=BUTTON_BG_COLORS[1])) def on_close(self): # 设置下载取消标志 self.cancel_download = True self.cancel_APP = True # 关闭主窗口 self.root.destroy() def _download_model_thread(self, model_name): try: data = { "name": model_name } response = requests.post(f"{OLLAMA_API_URL}/pull", json=data, stream=True) if response.status_code == 200: total_size = None downloaded_size = 0 start_time = time.time() prev_downloaded = 0 prev_progress = 0 prev_time = start_time self.download_progress['value'] = 0 for line in response.iter_lines(): if self.cancel_download: # 检查取消标志位 messagebox.showinfo("提示", "下载已取消") break if line: try: chunk = line.decode('utf-8') data = json.loads(chunk) if 'total' in data: total_size = data['total'] if 'completed' in data: downloaded_size = data['completed'] if total_size: progress = (downloaded_size / total_size) * 100 current_time = time.time() # 设置进度更新阈值和时间间隔 if progress - prev_progress >= 1 or current_time - prev_time >= 0.5: elapsed_time = current_time - start_time if elapsed_time > 0: speed = (downloaded_size - prev_downloaded) / elapsed_time / 1024 # 根据文件大小动态调整单位 if total_size / (1024 * 1024 * 1024) >= 1: total_size_gb = total_size / (1024 * 1024 * 1024) downloaded_size_gb = downloaded_size / (1024 * 1024 * 1024) info_text = f"速度: {speed:.2f} KB/s, 已下载: {downloaded_size_gb:.2f} GB, 总大小: {total_size_gb:.2f} GB, 进度: {progress:.2f}%" else: total_size_kb = total_size / 1024 downloaded_size_kb = downloaded_size / 1024 info_text = f"速度: {speed:.2f} KB/s, 已下载: {downloaded_size_kb:.2f} KB, 总大小: {total_size_kb:.2f} KB, 进度: {progress:.2f}%" self.download_info_label.config(text=info_text) prev_downloaded = downloaded_size self.download_progress['value'] = progress self.root.update_idletasks() prev_progress = progress prev_time = current_time except Exception as e: messagebox.showerror("错误", f"解析下载进度出错: {str(e)}") if not self.cancel_download: messagebox.showinfo("成功", "模型下载成功") self.update_model_list() else: messagebox.showerror("错误", f"模型下载失败: {response.text}") except Exception as e: messagebox.showerror("错误", f"发生异常: {str(e)}") finally: self.download_frame.grid_forget() def delete_model(self): model_name = self.model_combobox.get() if model_name: try: data = { "model": model_name } print(f"尝试删除模型: {model_name}") # 打印要删除的模型名称 response = requests.delete(f"{OLLAMA_API_URL}/delete", json=data) print(f"请求响应状态码: {response.status_code}") # 打印响应状态码 print(f"请求响应内容: {response.text}") # 打印响应内容 if response.status_code == 200: messagebox.showinfo("成功", "模型删除成功") self.update_model_list() else: messagebox.showerror("错误", f"模型删除失败: {response.text}") except Exception as e: messagebox.showerror("错误", f"发生异常: {str(e)}") else: messagebox.showwarning("警告", "请选择要删除的模型") # 在 OllamaClient 类中修改 send_message 方法 def send_message(self): message = self.input_text.get(1.0, tk.END).strip() if message: model_name = self.model_combobox.get() if model_name: # 使用线程来执行推理任务 threading.Thread(target=self._send_message_thread, args=(message, model_name)).start() else: messagebox.showwarning("警告", "请选择要使用的模型") def _send_message_thread(self, message, model_name): try: start_time = time.time() # 记录开始时间 data = { "model": model_name, "prompt": message } # 使用 stream=True 开启流式响应 response = requests.post(f"{OLLAMA_API_URL}/generate", json=data, stream=True) if response.status_code == 200: self.chat_text.insert(tk.END, f"问题: {message}\n","title") end_time = time.time() # 记录结束时间 inference_time = end_time - start_time # 计算推理时间 self.chat_text.insert(tk.END, f"\n思考时间: {inference_time:.2f} 秒\n",) self.root.update_idletasks() # 更新界面 full_response = "" chunk_count = 0 for line in response.iter_lines(): if self.cancel_APP: # 检查取消标志位 break if line: try: chunk = line.decode('utf-8') data = json.loads(chunk) part = data.get("response", "") part = part.replace("<think>", "").replace("</think>", "") full_response += part self.chat_text.insert(tk.END, part) chunk_count += 1 if chunk_count % 10 == 0: # 每接收 10 个数据块更新一次界面 self.root.update_idletasks() except Exception as e: messagebox.showerror("错误", f"解析响应数据出错: {str(e)}") self.root.update_idletasks() # 确保最后一次更新界面 self.input_text.delete(1.0, tk.END) self.highlight_code() else: messagebox.showerror("错误", f"请求失败: {response.text}") except Exception as e: messagebox.showerror("错误", f"发生异常: {str(e)}") def clear_chat(self): self.chat_text.delete(1.0, tk.END) def show_chat_context_menu(self, event): context_menu = tk.Menu(self.root, tearoff=0) context_menu.add_command(label="复制", command=lambda: self.copy_chat_text()) # 获取点击位置的标签 tags = self.chat_text.tag_names(tk.CURRENT) if "code" in tags: context_menu.add_command(label="复制当前代码块", command=lambda: self.copy_current_code_block()) context_menu.add_command(label="复制所有代码块", command=lambda: self.copy_code_block()) context_menu.post(event.x_root, event.y_root) def copy_chat_text(self): selected_text = self.chat_text.get(tk.SEL_FIRST, tk.SEL_LAST) if selected_text: self.root.clipboard_clear() self.root.clipboard_append(selected_text) def copy_code_block(self): tag_ranges = self.chat_text.tag_ranges("code") if tag_ranges: code_text = "" for i in range(0, len(tag_ranges), 2): start = tag_ranges[i] end = tag_ranges[i + 1] code_text += self.chat_text.get(start, end) self.root.clipboard_clear() self.root.clipboard_append(code_text) def copy_current_code_block(self): tag_ranges = self.chat_text.tag_ranges("code") current_pos = self.chat_text.index(tk.CURRENT) print(f"当前鼠标位置: {current_pos}") # 打印当前鼠标位置用于调试 code_text = None # 初始化 code_text 为 None for i in range(0, len(tag_ranges), 2): start = str(tag_ranges[i]) # 将 _tkinter.Tcl_Obj 转换为字符串 end = str(tag_ranges[i + 1]) # 将 _tkinter.Tcl_Obj 转换为字符串 print(f"代码块范围: {start} - {end}") # 打印代码块范围用于调试 if start <= current_pos <= end: code_text = self.chat_text.get(start, end) break if code_text is not None: try: self.root.clipboard_clear() self.root.clipboard_append(code_text) print("代码块复制成功") except tk.TclError as e: print(f"剪贴板操作失败: {e}") else: print("未找到包含当前位置的代码块") def show_input_context_menu(self, event): context_menu = tk.Menu(self.root, tearoff=0) context_menu.add_command(label="复制", command=lambda: self.copy_input_text()) context_menu.add_command(label="粘贴", command=lambda: self.paste_input_text()) context_menu.post(event.x_root, event.y_root) def copy_input_text(self): selected_text = self.input_text.get(tk.SEL_FIRST, tk.SEL_LAST) if selected_text: self.root.clipboard_clear() self.root.clipboard_append(selected_text) def paste_input_text(self): clipboard_text = self.root.clipboard_get() if clipboard_text: self.input_text.insert(tk.INSERT, clipboard_text) def highlight_code(self): content = self.chat_text.get(1.0, tk.END) start_index = 0 while True: start = content.find("===", start_index) if start == -1: break end = content.find("===", start + 3) if end == -1: break start_tag = f"1.0+{start}c" end_tag = f"1.0+{end + 3}c" self.chat_text.tag_add("code", start_tag, end_tag) print(f"代码块范围: {start_tag} - {end_tag}") # 调试信息,打印代码块范围 start_index = end + 3 if __name__ == "__main__": root = tk.Tk() client = OllamaClient(root) root.mainloop()