当用户问“北京今天适合穿什么?”时,传统问答系统只能回复“气温25℃”,而基于博查AI Search的系统能回答:“气温25℃(天气卡),紫外线较强(百科卡),建议穿短袖+防晒衣(大模型生成)”——这才是真正的智能问答!本文将手把手实现这套系统!


引言:问答系统的革命性升级

经典问题:为什么ChatGPT回答“今日金价”总是过时?

传统问答系统的痛点

  • 数据滞后:无法获取实时股价/天气
  • 交互单一:仅支持文本输入
  • 结果简陋:纯文本缺乏可视化

解决方案
博查AI Search API + 流式响应 + 多模态解析 = 会思考的问答系统


一、系统架构设计:三模块解耦

                +-------------------+  
用户提问  -->   | 前端交互层         |  --> 语音/文本输入
                | (Flask+JavaScript)|  <--> 答案流式渲染
                +-------------------++-------------------+  
                | 逻辑处理层         |  --> 博查API调用
                | (多模态解析/会话管理)|  <--> 大模型生成
                +-------------------++-------------------+  
                | 数据持久层         |  --> 会话记录存储
                | (SQLite/Redis)    |  <--> 上下文缓存
                +-------------------+  

二、手把手搭建:从零到一的完整实现

2.1 环境准备

# 创建项目目录
mkdir smart-qa && cd smart-qa

# 安装依赖
pip install flask requests python-dotenv

# 文件结构
touch app.py requirements.txt .env
mkdir templates

2.2 流式API封装

import json
from typing import Generator

def stream_ai_search(query, api_key):
    url = "https://api.bochaai.com/v1/ai-search"
    headers = {"Authorization": f"Bearer {api_key}"}
    payload = {
        "query": query,
        "answer": True,
        "stream": True,
        "count": 5
    }

    try:
        response = requests.post(url, headers=headers, json=payload, stream=True)
        response.raise_for_status()
        
        for line in response.iter_lines():
            if line:
                decoded_line = line.decode('utf-8').strip()
                if decoded_line.startswith('data:'):
                    try:
                        json_str = decoded_line.split('data:', 1)[1].strip()
                        data = json.loads(json_str)
                        yield format_response(data)
                    except json.JSONDecodeError as e:
                        print(f"JSON解析失败: {e}\n原始数据: {decoded_line}")
                        continue
    except requests.exceptions.RequestException as e:
        print(f"API请求失败: {str(e)}")
        yield json.dumps({"error": "服务暂时不可用,请稍后重试"})

def format_response(data):
    """统一格式化响应数据"""
    result = {"type": "unknown", "content": ""}
    
    if data.get("event") == "message":
        msg = data.get("message", {})
        
        # 处理网页搜索结果
        if msg.get("type") == "source" and msg.get("content_type") == "webpage":
            try:
                webpage = json.loads(msg.get("content", "{}"))
                result.update({
                    "type": "webpage",
                    "content": {
                        "title": webpage.get("name", "无标题"),
                        "url": webpage.get("url", "#"),
                        "snippet": webpage.get("snippet", "无摘要")
                    }
                })
            except json.JSONDecodeError:
                pass
        
        # 处理文本回答
        elif msg.get("type") == "answer":
            result.update({
                "type": "answer",
                "content": msg.get("content", "").replace("\n", " ")
            })
        
        # 处理天气卡片
        elif msg.get("content_type") == "weather_china":
            try:
                content = json.loads(msg.get("content", "{}"))
                result.update({
                    "type": "weather",
                    "content": {
                        "city": content.get("city", "未知"),
                        "temp": content.get("temp", "--"),
                        "weather": content.get("weather", "--")
                    }
                })
            except json.JSONDecodeError:
                pass
        
    return f"data: {json.dumps(result)}\n\n"

2.2 多轮对话管理

from uuid import uuid4

class DialogueManager:
    def __init__(self):
        self.sessions = {}

    def create_session(self):
        session_id = str(uuid4())
        self.sessions[session_id] = {"history": []}
        return session_id

# 使用示例
manager = DialogueManager()
session_id = manager.new_session()

三、前端交互:动态流式渲染

HTML实时渲染(EventSource)

<script>
        let answerBuffer = '';
        let isStreaming = false;
        let eventSource = null;

        function askQuestion() {
            if (isStreaming) return;
            
            const query = document.getElementById('query-input').value;
            if (!query) return;

            // 初始化界面
            const container = document.getElementById('answer-container');
            const loading = document.getElementById('loading');
            container.innerHTML = '<div class="answer-text"></div><span class="cursor"></span>';
            loading.style.display = 'block';
            answerBuffer = '';
            isStreaming = true;

            // 关闭已有连接
            if (eventSource) eventSource.close();

            eventSource = new EventSource(`/ask?q=${encodeURIComponent(query)}`);

            eventSource.onmessage = (e) => {
                loading.style.display = 'none';
                try {
                    const data = JSON.parse(e.data);
                    
                    switch(data.type) {
                        case 'answer':
                            answerBuffer += data.content;
                            updateAnswerDisplay();
                            break;
                            
                        case 'webpage':
                            container.innerHTML += `
                                <div class="webpage-card">
                                    <a href="${data.content.url}" target="_blank" class="webpage-title">
                                        ${data.content.title}
                                    </a>
                                    <p class="webpage-snippet">${data.content.snippet}</p>
                                </div>`;
                            break;
                            
                        // 其他类型处理...
                    }
                } catch (error) {
                    console.error('数据处理错误:', error);
                }
            };

            eventSource.onerror = () => {
                isStreaming = false;
                loading.style.display = 'none';
                eventSource.close();
            };
        }

        function updateAnswerDisplay() {
            const answerDiv = document.querySelector('.answer-text');
            if (answerDiv) {
                answerDiv.textContent = answerBuffer;
                // 自动滚动
                answerDiv.scrollIntoView({ behavior: 'smooth', block: 'end' });
            }
        }

        function followUp(question) {
            document.getElementById('query-input').value = question;
            askQuestion();
        }
    </script>

四、实战效果:从搜索到智能问答

4.1 问答流程示例

用户输入:杭州今天天气
系统响应
在这里插入图片描述

亮点:

  • 搜索返回时长为毫秒级
  • 智能生成回答

五、优化方向:让系统更“聪明”

  1. 语义缓存

    # 缓存相似问题的答案
    from sentence_transformers import SentenceTransformer
    encoder = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
    
    def get_semantic_key(query: str) -> str:
        embedding = encoder.encode(query)
        return hash(tuple(embedding))  # 生成语义哈希
    
  2. 异常熔断

    from circuitbreaker import circuit
    
    @circuit(failure_threshold=5, recovery_timeout=60)
    def call_api_safely(query: str):
        # 添加重试和降级逻辑
    

六、项目完整代码

注意事项:

  • 代码文件命名为 app.py
  • app.py 所在的工作目录下新建一个文件夹:templates
  • templates 文件夹中新建 html 文件 index.html ,并复制5.1节中的代码
  • app.py 所在的工作目录下新建 .env 文件,然后在里面写入:BOCHA_API_KEY = “sk-你的密钥”
  • 最后运行 app.py ,打开http://localhost:5000,即可访问!

后端代码

# app.py
import os
import json
import requests
from flask import Flask, render_template, request, Response, jsonify
from dotenv import load_dotenv
from uuid import uuid4

load_dotenv()
app = Flask(__name__)
app.secret_key = os.getenv("FLASK_SECRET_KEY", "supersecretkey")

class DialogueManager:
    def __init__(self):
        self.sessions = {}

    def create_session(self):
        session_id = str(uuid4())
        self.sessions[session_id] = {"history": []}
        return session_id

manager = DialogueManager()

def stream_ai_search(query, api_key):
    url = "https://api.bochaai.com/v1/ai-search"
    headers = {"Authorization": f"Bearer {api_key}"}
    payload = {
        "query": query,
        "answer": True,
        "stream": True,
        "count": 5
    }

    try:
        response = requests.post(url, headers=headers, json=payload, stream=True)
        response.raise_for_status()
        
        for line in response.iter_lines():
            if line:
                decoded_line = line.decode('utf-8').strip()
                if decoded_line.startswith('data:'):
                    try:
                        json_str = decoded_line.split('data:', 1)[1].strip()
                        data = json.loads(json_str)
                        yield format_response(data)
                    except json.JSONDecodeError as e:
                        print(f"JSON解析失败: {e}\n原始数据: {decoded_line}")
                        continue
    except requests.exceptions.RequestException as e:
        print(f"API请求失败: {str(e)}")
        yield json.dumps({"error": "服务暂时不可用,请稍后重试"})

def format_response(data):
    """统一格式化响应数据"""
    result = {"type": "unknown", "content": ""}
    
    if data.get("event") == "message":
        msg = data.get("message", {})
        
        # 处理网页搜索结果
        if msg.get("type") == "source" and msg.get("content_type") == "webpage":
            try:
                webpage = json.loads(msg.get("content", "{}"))
                result.update({
                    "type": "webpage",
                    "content": {
                        "title": webpage.get("name", "无标题"),
                        "url": webpage.get("url", "#"),
                        "snippet": webpage.get("snippet", "无摘要")
                    }
                })
            except json.JSONDecodeError:
                pass
        
        # 处理文本回答
        elif msg.get("type") == "answer":
            result.update({
                "type": "answer",
                "content": msg.get("content", "").replace("\n", " ")
            })
        
        # 处理天气卡片
        elif msg.get("content_type") == "weather_china":
            try:
                content = json.loads(msg.get("content", "{}"))
                result.update({
                    "type": "weather",
                    "content": {
                        "city": content.get("city", "未知"),
                        "temp": content.get("temp", "--"),
                        "weather": content.get("weather", "--")
                    }
                })
            except json.JSONDecodeError:
                pass
        
    return f"data: {json.dumps(result)}\n\n"

@app.route('/')
def index():
    session_id = manager.create_session()
    return render_template('index.html', session_id=session_id)

@app.route('/ask')
def ask():
    query = request.args.get('q')
    if not query:
        return jsonify({"error": "缺少查询参数"}), 400
    
    return Response(
        stream_ai_search(
            query=query,
            api_key=os.getenv("BOCHA_API_KEY")
        ),
        mimetype='text/event-stream'
    )

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

前端代码

<!-- templates/index.html -->
<!DOCTYPE html>
<html>
<head>
    <title>智能问答系统 - 优化版</title>
    <style>
        body { max-width: 800px; margin: 20px auto; font-family: Arial, sans-serif; }
        #answer-container { 
            min-height: 300px; 
            border: 1px solid #ddd; 
            padding: 15px; 
            border-radius: 8px;
            margin: 10px 0;
        }
        .card {
            padding: 10px;
            margin: 10px 0;
            border-radius: 4px;
        }
        .weather-card { background: #e3f2fd; }
        .loading { 
            display: none; 
            color: #666;
            margin: 10px 0;
        }
        #query-input {
            width: 70%;
            padding: 8px;
            border: 1px solid #ddd;
            border-radius: 4px;
        }
        button {
            padding: 8px 15px;
            background: #007bff;
            color: white;
            border: none;
            border-radius: 4px;
            cursor: pointer;
        }
        .cursor {
            display: inline-block;
            width: 8px;
            height: 1em;
            background: #333;
            vertical-align: middle;
            animation: blink 1s infinite;
        }
        .webpage-card {
            border-left: 4px solid #007bff;
            margin: 15px 0;
            padding: 10px;
            background: #f8f9fa;
        }
        .webpage-title {
            color: #007bff;
            font-weight: bold;
        }
        .webpage-snippet {
            color: #666;
            font-size: 0.9em;
        }
        @keyframes blink {
            50% { opacity: 0; }
        }
    </style>
</head>
<body>
    <h1>智能问答系统</h1>
    <div>
        <input type="text" id="query-input" placeholder="输入您的问题...">
        <button onclick="askQuestion()">提问</button>
    </div>
    <div id="loading" class="loading">正在思考中...</div>
    <div id="answer-container"></div>

    <script>
        let answerBuffer = '';
        let isStreaming = false;
        let eventSource = null;

        function askQuestion() {
            if (isStreaming) return;
            
            const query = document.getElementById('query-input').value;
            if (!query) return;

            // 初始化界面
            const container = document.getElementById('answer-container');
            const loading = document.getElementById('loading');
            container.innerHTML = '<div class="answer-text"></div><span class="cursor"></span>';
            loading.style.display = 'block';
            answerBuffer = '';
            isStreaming = true;

            // 关闭已有连接
            if (eventSource) eventSource.close();

            eventSource = new EventSource(`/ask?q=${encodeURIComponent(query)}`);

            eventSource.onmessage = (e) => {
                loading.style.display = 'none';
                try {
                    const data = JSON.parse(e.data);
                    
                    switch(data.type) {
                        case 'answer':
                            answerBuffer += data.content;
                            updateAnswerDisplay();
                            break;
                            
                        case 'webpage':
                            container.innerHTML += `
                                <div class="webpage-card">
                                    <a href="${data.content.url}" target="_blank" class="webpage-title">
                                        ${data.content.title}
                                    </a>
                                    <p class="webpage-snippet">${data.content.snippet}</p>
                                </div>`;
                            break;
                            
                        // 其他类型处理...
                    }
                } catch (error) {
                    console.error('数据处理错误:', error);
                }
            };

            eventSource.onerror = () => {
                isStreaming = false;
                loading.style.display = 'none';
                eventSource.close();
            };
        }

        function updateAnswerDisplay() {
            const answerDiv = document.querySelector('.answer-text');
            if (answerDiv) {
                answerDiv.textContent = answerBuffer;
                // 自动滚动
                answerDiv.scrollIntoView({ behavior: 'smooth', block: 'end' });
            }
        }

        function followUp(question) {
            document.getElementById('query-input').value = question;
            askQuestion();
        }
    </script>
</body>
</html>

总结:为什么选择博查AI Search API?

  1. 全链路支持:从搜索到生成一站式解决
  2. 成本效益:比自建爬虫节省70%开发成本
  3. 合规保障:所有结果均来自合法公开数据源

下一步计划

  • 实现语音输入
  • 实现跨模态问答(如“显示北京天气的卫星云图”)

Q&A:欢迎在评论区留言讨论,遇到问题可提供完整错误日志以便快速定位!

🔗 系列文章推荐:

【实战教程】用博查Web Search API构建定制化搜索引擎:从零到一打造专属搜索工具!

【极简实战】用博查AI Search + Python+DeepSeek打造你的私有知识库!无需Elasticsearch,三步搞定RAG!

【硬核教程】博查AI Search API实战:全网搜索+智能问答+多模态卡片一键集成!AI时代的搜索引擎!

让AI告别“古董知识库”,拥抱实时智能新时代!🚀

更多推荐