Using chat2db with ollama for database operations
Published: 2025-06-24 17:21:52 · Author: 北方职教升学中心
I only tried it with MySQL; I don't use the other databases anyway. I originally considered just paying for the built-in AI, but the prices turned out to be in US dollars, so I gave up on that idea and tinkered my way to a free setup instead.
Running chat2db and the chat2db UI locally
https://gitee.com/ooooinfo/Chat2DB
After cloning, start the chat2db Java backend. (To be honest, I'm still not sure whether this process is what actually connects to the database.)
Run it from the project in IDEA.
The frontend lives in chat2db-client. My environment is Node 20 with yarn; note that you may need to run yarn add electron first.
Then start it directly: yarn start
Installing ollama and the qwen2.5 model. I don't know much about this part either, and wasn't sure which model would be best.
https://ollama.com/
Run in PowerShell: ollama run qwen2.5 (or ollama pull qwen2.5 to just download the model).
Whichever you use, in the end you have to make sure ollama is running.
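Before going further, it helps to confirm that ollama is actually reachable. Here is a minimal Python sketch for that check; it assumes ollama's default address 127.0.0.1:11434 (the same one the adapter below talks to) and that qwen2.5 has already been pulled:

```python
# check_ollama.py -- quick sanity check that ollama is up and the model answers.
# Assumes ollama's default address (127.0.0.1:11434) and that qwen2.5 is pulled.
import httpx

# /api/tags lists the locally installed models
resp = httpx.get("http://127.0.0.1:11434/api/tags", timeout=5.0)
resp.raise_for_status()
print("installed models:", [m["name"] for m in resp.json().get("models", [])])

# One non-streaming chat round-trip to confirm the model responds
chat = httpx.post(
    "http://127.0.0.1:11434/api/chat",
    json={
        "model": "qwen2.5:latest",
        "messages": [{"role": "user", "content": "Reply with the single word: ok"}],
        "stream": False,
    },
    timeout=120.0,
)
chat.raise_for_status()
print(chat.json()["message"]["content"])
```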
Faking the OpenAI-style interface so chat2db can call ollama through it:
chat2db is configured with an OpenAI-style endpoint, which is why I wrote an app.py to proxy those requests through to ollama. It's not that I refused to try the custom-AI option; it simply never worked for me, so imitating OpenAI directly was the easier path.
The app.py code is shown further below. I used conda to create a Python 3.9 environment:
requirements.txt
```
fastapi==0.104.1
uvicorn==0.24.0
httpx==0.25.1
tenacity==8.2.3
backoff
```
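For reference, recreating that environment looks roughly like this (the environment name is my own choice):

conda create -n chat2db-proxy python=3.9
conda activate chat2db-proxy
pip install -r requirements.txt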
And here is app.py itself:
````python
from fastapi import FastAPI, HTTPException, Request, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
import httpx
import uvicorn
import traceback
import time
import json
import re
import backoff  # make sure the backoff package is installed

app = FastAPI(
    title="Ollama API Adapter",
    description="An adapter for Ollama API that mimics OpenAI API format",
    version="1.0.0"
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

API_KEY = "sk-123456"
OLLAMA_BASE_URL = "http://127.0.0.1:11434"
DEFAULT_MODEL = "qwen2.5:latest"


def verify_api_key(request: Request):
    authorization: str = request.headers.get('authorization')
    if not authorization:
        raise HTTPException(status_code=401, detail="Authorization header is missing.")
    token_type, _, token = authorization.partition(' ')
    if token_type.lower() != 'bearer' or token != API_KEY:
        raise HTTPException(status_code=401, detail="Unauthorized: API Key is invalid or missing.")


@app.get("/v1", dependencies=[Depends(verify_api_key)])
async def root():
    """Root endpoint that returns API information."""
    return {
        "version": "1.0.0",
        "status": "ok",
        "endpoints": ["/v1/chat/completions", "/v1/models", "/health"]
    }


@app.get("/v1/models", dependencies=[Depends(verify_api_key)])
async def list_models():
    """List available models."""
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(f"{OLLAMA_BASE_URL}/api/tags")
            if response.status_code == 200:
                models = response.json().get("models", [])
                return {
                    "data": [
                        {"id": model["name"], "object": "model", "created": 0, "owned_by": "ollama"}
                        for model in models
                    ]
                }
            else:
                raise HTTPException(status_code=503, detail="Ollama service unavailable")
    except Exception as e:
        print(f"Error listing models: {str(e)}")
        raise HTTPException(status_code=503, detail=str(e))


@app.get("/health", dependencies=[Depends(verify_api_key)])
async def health_check():
    """Health check endpoint."""
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(f"{OLLAMA_BASE_URL}/api/tags")
            if response.status_code == 200:
                print("Ollama service is healthy.")
                return {"status": "healthy", "message": "Service is running normally"}
            else:
                print("Ollama service is not healthy.")
                raise HTTPException(status_code=503, detail="Ollama service unavailable")
    except httpx.HTTPStatusError as exc:
        print(f"HTTP error occurred: {exc.response.status_code}")
        raise HTTPException(status_code=exc.response.status_code, detail=str(exc))
    except httpx.RequestError as exc:
        print(f"An error occurred while requesting {exc.request.url!r}.")
        raise HTTPException(status_code=500, detail=str(exc))


async def generate_sse_response(content):
    # Extract the SQL statement (if any) and append a newline
    sql_match = re.search(r'```sql\n(.*?)\n```', content, re.DOTALL)
    sql = sql_match.group(1).strip() if sql_match else content
    formatted_content = f"{sql}\n"
    # Build a chunk in OpenAI chat-completion format
    response_data = {
        "id": f"chatcmpl-{int(time.time())}",
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": "gpt-3.5-turbo",
        "choices": [{
            "delta": {"content": formatted_content},  # content with trailing newline
            "finish_reason": None,
            "index": 0
        }]
    }
    # Emit the main content
    yield f"data: {json.dumps(response_data, ensure_ascii=False)}\n\n"
    # Emit the finish chunk
    finish_response = {
        "id": f"chatcmpl-{int(time.time())}",
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": "gpt-3.5-turbo",
        "choices": [{"delta": {}, "finish_reason": "stop", "index": 0}]
    }
    yield f"data: {json.dumps(finish_response, ensure_ascii=False)}\n\n"
    yield "data: [DONE]\n\n"


# Retry policy: exponential backoff on read timeouts
@backoff.on_exception(backoff.expo, httpx.ReadTimeout, max_tries=5, max_time=300)
async def send_request(ollama_request):
    timeout_config = httpx.Timeout(10.0, read=120.0)  # 10 s connect timeout, 120 s read timeout
    async with httpx.AsyncClient(timeout=timeout_config) as client:
        try:
            response = await client.post(f"{OLLAMA_BASE_URL}/api/chat", json=ollama_request)
            print(f"Response received with status {response.status_code}")
            return response
        except httpx.RequestError as exc:
            print(f"An error occurred while requesting {exc.request.url!r}.")
            raise HTTPException(status_code=500, detail=str(exc))


@app.post("/v1/chat/completions", dependencies=[Depends(verify_api_key)])
@app.post("/chat/completions", dependencies=[Depends(verify_api_key)])
@app.post("/", dependencies=[Depends(verify_api_key)])
async def chat_completions(request: Request):
    try:
        body = await request.json()
        messages = body.get("messages", [])
        stream = body.get("stream", True)
        print(f"Received request with body: {body}")
        # Always ask ollama for a complete (non-streamed) answer,
        # then re-stream it ourselves if the client asked for SSE.
        ollama_request = {
            "model": DEFAULT_MODEL,
            "messages": messages,
            "stream": False
        }
        response = await send_request(ollama_request)
        print(f"Received response: {response.text}")
        if response.status_code != 200:
            print(f"Failed to get response from model, status code: {response.status_code}")
            raise HTTPException(status_code=400, detail="Failed to get response from model")
        ollama_response = response.json()
        content = ollama_response.get("message", {}).get("content", "")
        print(f"Processed content: {content}")
        if not stream:
            result = {
                "id": f"chatcmpl-{int(time.time())}",
                "object": "chat.completion",
                "created": int(time.time()),
                "model": DEFAULT_MODEL,
                "choices": [{
                    "message": {
                        "role": "assistant",  # OpenAI format expects "assistant" here
                        "content": content
                    },
                    "finish_reason": "stop",
                    "index": 0
                }]
            }
            print(f"Returning non-stream response: {result}")
            return result
        headers = {
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
        return StreamingResponse(
            generate_sse_response(content),
            media_type="text/event-stream",
            headers=headers
        )
    except json.JSONDecodeError as e:
        print(f"JSON decoding error: {str(e)}")
        return JSONResponse(status_code=400, content={"message": "Invalid JSON data"})
    except Exception as e:
        print(f"Error during chat completions: {str(e)}")
        print(traceback.format_exc())
        return JSONResponse(status_code=500, content={"message": "Internal server error"})


if __name__ == "__main__":
    print("Starting server on 0.0.0.0:8080")
    uvicorn.run(app, host="0.0.0.0", port=8080)
````
Note that the API key and the model are both hard-coded in the code above, so make sure you have pulled the corresponding model first. Then start the adapter:
python app.py
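Once the adapter is up, you can smoke-test it before touching chat2db. A minimal sketch; the port, path, and key below are simply the values hard-coded in app.py:

```python
# test_proxy.py -- smoke test for the adapter started with `python app.py`
import httpx

resp = httpx.post(
    "http://127.0.0.1:8080/v1/chat/completions",
    headers={"Authorization": "Bearer sk-123456"},  # must match API_KEY in app.py
    json={
        "messages": [{"role": "user", "content": "Write a SQL query that counts the rows in table t1"}],
        "stream": False,  # request a plain JSON response instead of SSE
    },
    timeout=180.0,
)
resp.raise_for_status()
print(resp.json()["choices"][0]["message"]["content"])
```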
Also double-check the configuration in chat2db: the AI endpoint should point at this proxy (http://127.0.0.1:8080) and the key should be the sk-123456 hard-coded in app.py.
Set up your database connection in chat2db, then type in your prompt and see what comes back.
Responses take a few seconds, depending of course on your machine. Since it costs nothing, I'm not going to ask for more.