【硬核实战】用博查AI Search API打造智能问答系统:多模态+流式响应,让AI“会思考”!
经典问题:为什么ChatGPT回答“今日金价”总是过时?传统问答系统的痛点:解决方案:博查AI Search API + 流式响应 + 多模态解析 = 会思考的问答系统!
·
当用户问“北京今天适合穿什么?”时,传统问答系统只能回复“气温25℃”,而基于博查AI Search的系统能回答:“气温25℃(天气卡),紫外线较强(百科卡),建议穿短袖+防晒衣(大模型生成)”——这才是真正的智能问答!本文将手把手实现这套系统!
引言:问答系统的革命性升级
经典问题:为什么ChatGPT回答“今日金价”总是过时?
传统问答系统的痛点:
- 数据滞后:无法获取实时股价/天气
- 交互单一:仅支持文本输入
- 结果简陋:纯文本缺乏可视化
解决方案:
博查AI Search API + 流式响应 + 多模态解析 = 会思考的问答系统!
一、系统架构设计:三模块解耦
+-------------------+
用户提问 --> | 前端交互层 | --> 语音/文本输入
| (Flask+JavaScript)| <--> 答案流式渲染
+-------------------+
↓
+-------------------+
| 逻辑处理层 | --> 博查API调用
| (多模态解析/会话管理)| <--> 大模型生成
+-------------------+
↓
+-------------------+
| 数据持久层 | --> 会话记录存储
| (SQLite/Redis) | <--> 上下文缓存
+-------------------+
二、手把手搭建:从零到一的完整实现
2.1 环境准备
# 创建项目目录
mkdir smart-qa && cd smart-qa
# 安装依赖
pip install flask requests python-dotenv
# 文件结构
touch app.py requirements.txt .env
mkdir templates
2.2 流式API封装
import json
from typing import Generator
def stream_ai_search(query, api_key):
url = "https://api.bochaai.com/v1/ai-search"
headers = {"Authorization": f"Bearer {api_key}"}
payload = {
"query": query,
"answer": True,
"stream": True,
"count": 5
}
try:
response = requests.post(url, headers=headers, json=payload, stream=True)
response.raise_for_status()
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8').strip()
if decoded_line.startswith('data:'):
try:
json_str = decoded_line.split('data:', 1)[1].strip()
data = json.loads(json_str)
yield format_response(data)
except json.JSONDecodeError as e:
print(f"JSON解析失败: {e}\n原始数据: {decoded_line}")
continue
except requests.exceptions.RequestException as e:
print(f"API请求失败: {str(e)}")
yield json.dumps({"error": "服务暂时不可用,请稍后重试"})
def format_response(data):
"""统一格式化响应数据"""
result = {"type": "unknown", "content": ""}
if data.get("event") == "message":
msg = data.get("message", {})
# 处理网页搜索结果
if msg.get("type") == "source" and msg.get("content_type") == "webpage":
try:
webpage = json.loads(msg.get("content", "{}"))
result.update({
"type": "webpage",
"content": {
"title": webpage.get("name", "无标题"),
"url": webpage.get("url", "#"),
"snippet": webpage.get("snippet", "无摘要")
}
})
except json.JSONDecodeError:
pass
# 处理文本回答
elif msg.get("type") == "answer":
result.update({
"type": "answer",
"content": msg.get("content", "").replace("\n", " ")
})
# 处理天气卡片
elif msg.get("content_type") == "weather_china":
try:
content = json.loads(msg.get("content", "{}"))
result.update({
"type": "weather",
"content": {
"city": content.get("city", "未知"),
"temp": content.get("temp", "--"),
"weather": content.get("weather", "--")
}
})
except json.JSONDecodeError:
pass
return f"data: {json.dumps(result)}\n\n"
2.2 多轮对话管理
from uuid import uuid4
class DialogueManager:
def __init__(self):
self.sessions = {}
def create_session(self):
session_id = str(uuid4())
self.sessions[session_id] = {"history": []}
return session_id
# 使用示例
manager = DialogueManager()
session_id = manager.new_session()
三、前端交互:动态流式渲染
HTML实时渲染(EventSource)
<script>
let answerBuffer = '';
let isStreaming = false;
let eventSource = null;
function askQuestion() {
if (isStreaming) return;
const query = document.getElementById('query-input').value;
if (!query) return;
// 初始化界面
const container = document.getElementById('answer-container');
const loading = document.getElementById('loading');
container.innerHTML = '<div class="answer-text"></div><span class="cursor"></span>';
loading.style.display = 'block';
answerBuffer = '';
isStreaming = true;
// 关闭已有连接
if (eventSource) eventSource.close();
eventSource = new EventSource(`/ask?q=${encodeURIComponent(query)}`);
eventSource.onmessage = (e) => {
loading.style.display = 'none';
try {
const data = JSON.parse(e.data);
switch(data.type) {
case 'answer':
answerBuffer += data.content;
updateAnswerDisplay();
break;
case 'webpage':
container.innerHTML += `
<div class="webpage-card">
<a href="${data.content.url}" target="_blank" class="webpage-title">
${data.content.title}
</a>
<p class="webpage-snippet">${data.content.snippet}</p>
</div>`;
break;
// 其他类型处理...
}
} catch (error) {
console.error('数据处理错误:', error);
}
};
eventSource.onerror = () => {
isStreaming = false;
loading.style.display = 'none';
eventSource.close();
};
}
function updateAnswerDisplay() {
const answerDiv = document.querySelector('.answer-text');
if (answerDiv) {
answerDiv.textContent = answerBuffer;
// 自动滚动
answerDiv.scrollIntoView({ behavior: 'smooth', block: 'end' });
}
}
function followUp(question) {
document.getElementById('query-input').value = question;
askQuestion();
}
</script>
四、实战效果:从搜索到智能问答
4.1 问答流程示例
用户输入:杭州今天天气
系统响应:
亮点:
- 搜索返回时长为毫秒级
- 智能生成回答
五、优化方向:让系统更“聪明”
-
语义缓存:
# 缓存相似问题的答案 from sentence_transformers import SentenceTransformer encoder = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2') def get_semantic_key(query: str) -> str: embedding = encoder.encode(query) return hash(tuple(embedding)) # 生成语义哈希
-
异常熔断:
from circuitbreaker import circuit @circuit(failure_threshold=5, recovery_timeout=60) def call_api_safely(query: str): # 添加重试和降级逻辑
六、项目完整代码
注意事项:
- 代码文件命名为 app.py
- 在 app.py 所在的工作目录下新建一个文件夹:templates
- 在 templates 文件夹中新建 html 文件 index.html ,并复制5.1节中的代码
- 在 app.py 所在的工作目录下新建 .env 文件,然后在里面写入:BOCHA_API_KEY = “sk-你的密钥”
- 最后运行 app.py ,打开
http://localhost:5000
,即可访问!
后端代码
# app.py
import os
import json
import requests
from flask import Flask, render_template, request, Response, jsonify
from dotenv import load_dotenv
from uuid import uuid4
load_dotenv()
app = Flask(__name__)
app.secret_key = os.getenv("FLASK_SECRET_KEY", "supersecretkey")
class DialogueManager:
def __init__(self):
self.sessions = {}
def create_session(self):
session_id = str(uuid4())
self.sessions[session_id] = {"history": []}
return session_id
manager = DialogueManager()
def stream_ai_search(query, api_key):
url = "https://api.bochaai.com/v1/ai-search"
headers = {"Authorization": f"Bearer {api_key}"}
payload = {
"query": query,
"answer": True,
"stream": True,
"count": 5
}
try:
response = requests.post(url, headers=headers, json=payload, stream=True)
response.raise_for_status()
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8').strip()
if decoded_line.startswith('data:'):
try:
json_str = decoded_line.split('data:', 1)[1].strip()
data = json.loads(json_str)
yield format_response(data)
except json.JSONDecodeError as e:
print(f"JSON解析失败: {e}\n原始数据: {decoded_line}")
continue
except requests.exceptions.RequestException as e:
print(f"API请求失败: {str(e)}")
yield json.dumps({"error": "服务暂时不可用,请稍后重试"})
def format_response(data):
"""统一格式化响应数据"""
result = {"type": "unknown", "content": ""}
if data.get("event") == "message":
msg = data.get("message", {})
# 处理网页搜索结果
if msg.get("type") == "source" and msg.get("content_type") == "webpage":
try:
webpage = json.loads(msg.get("content", "{}"))
result.update({
"type": "webpage",
"content": {
"title": webpage.get("name", "无标题"),
"url": webpage.get("url", "#"),
"snippet": webpage.get("snippet", "无摘要")
}
})
except json.JSONDecodeError:
pass
# 处理文本回答
elif msg.get("type") == "answer":
result.update({
"type": "answer",
"content": msg.get("content", "").replace("\n", " ")
})
# 处理天气卡片
elif msg.get("content_type") == "weather_china":
try:
content = json.loads(msg.get("content", "{}"))
result.update({
"type": "weather",
"content": {
"city": content.get("city", "未知"),
"temp": content.get("temp", "--"),
"weather": content.get("weather", "--")
}
})
except json.JSONDecodeError:
pass
return f"data: {json.dumps(result)}\n\n"
@app.route('/')
def index():
session_id = manager.create_session()
return render_template('index.html', session_id=session_id)
@app.route('/ask')
def ask():
query = request.args.get('q')
if not query:
return jsonify({"error": "缺少查询参数"}), 400
return Response(
stream_ai_search(
query=query,
api_key=os.getenv("BOCHA_API_KEY")
),
mimetype='text/event-stream'
)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
前端代码
<!-- templates/index.html -->
<!DOCTYPE html>
<html>
<head>
<title>智能问答系统 - 优化版</title>
<style>
body { max-width: 800px; margin: 20px auto; font-family: Arial, sans-serif; }
#answer-container {
min-height: 300px;
border: 1px solid #ddd;
padding: 15px;
border-radius: 8px;
margin: 10px 0;
}
.card {
padding: 10px;
margin: 10px 0;
border-radius: 4px;
}
.weather-card { background: #e3f2fd; }
.loading {
display: none;
color: #666;
margin: 10px 0;
}
#query-input {
width: 70%;
padding: 8px;
border: 1px solid #ddd;
border-radius: 4px;
}
button {
padding: 8px 15px;
background: #007bff;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
.cursor {
display: inline-block;
width: 8px;
height: 1em;
background: #333;
vertical-align: middle;
animation: blink 1s infinite;
}
.webpage-card {
border-left: 4px solid #007bff;
margin: 15px 0;
padding: 10px;
background: #f8f9fa;
}
.webpage-title {
color: #007bff;
font-weight: bold;
}
.webpage-snippet {
color: #666;
font-size: 0.9em;
}
@keyframes blink {
50% { opacity: 0; }
}
</style>
</head>
<body>
<h1>智能问答系统</h1>
<div>
<input type="text" id="query-input" placeholder="输入您的问题...">
<button onclick="askQuestion()">提问</button>
</div>
<div id="loading" class="loading">正在思考中...</div>
<div id="answer-container"></div>
<script>
let answerBuffer = '';
let isStreaming = false;
let eventSource = null;
function askQuestion() {
if (isStreaming) return;
const query = document.getElementById('query-input').value;
if (!query) return;
// 初始化界面
const container = document.getElementById('answer-container');
const loading = document.getElementById('loading');
container.innerHTML = '<div class="answer-text"></div><span class="cursor"></span>';
loading.style.display = 'block';
answerBuffer = '';
isStreaming = true;
// 关闭已有连接
if (eventSource) eventSource.close();
eventSource = new EventSource(`/ask?q=${encodeURIComponent(query)}`);
eventSource.onmessage = (e) => {
loading.style.display = 'none';
try {
const data = JSON.parse(e.data);
switch(data.type) {
case 'answer':
answerBuffer += data.content;
updateAnswerDisplay();
break;
case 'webpage':
container.innerHTML += `
<div class="webpage-card">
<a href="${data.content.url}" target="_blank" class="webpage-title">
${data.content.title}
</a>
<p class="webpage-snippet">${data.content.snippet}</p>
</div>`;
break;
// 其他类型处理...
}
} catch (error) {
console.error('数据处理错误:', error);
}
};
eventSource.onerror = () => {
isStreaming = false;
loading.style.display = 'none';
eventSource.close();
};
}
function updateAnswerDisplay() {
const answerDiv = document.querySelector('.answer-text');
if (answerDiv) {
answerDiv.textContent = answerBuffer;
// 自动滚动
answerDiv.scrollIntoView({ behavior: 'smooth', block: 'end' });
}
}
function followUp(question) {
document.getElementById('query-input').value = question;
askQuestion();
}
</script>
</body>
</html>
总结:为什么选择博查AI Search API?
- 全链路支持:从搜索到生成一站式解决
- 成本效益:比自建爬虫节省70%开发成本
- 合规保障:所有结果均来自合法公开数据源
下一步计划:
- 实现语音输入
- 实现跨模态问答(如“显示北京天气的卫星云图”)
Q&A:欢迎在评论区留言讨论,遇到问题可提供完整错误日志以便快速定位!
🔗 系列文章推荐:
【实战教程】用博查Web Search API构建定制化搜索引擎:从零到一打造专属搜索工具!
【极简实战】用博查AI Search + Python+DeepSeek打造你的私有知识库!无需Elasticsearch,三步搞定RAG!
【硬核教程】博查AI Search API实战:全网搜索+智能问答+多模态卡片一键集成!AI时代的搜索引擎!
让AI告别“古董知识库”,拥抱实时智能新时代!🚀
更多推荐
所有评论(0)