氢能站Agent系列1 —— Function Calling

2026-06-09

今天来给氢能管理系统 chatbot 做了一个智能路由:用 DeepSeek 的 function calling 判断"这个问题该不该查业务数据库",而不是用关键词匹配去猜。

效果很直接——问"现在有多少个氢能站点?"会去真正查数据库然后回答;问"你是谁?"就直接让 DeepSeek 回答,不会傻乎乎地跑一条 SQL。顺手把这个机制从头到尾整理一遍,力求讲清楚。


先说清楚"function calling"到底是啥

名字有点误导——function calling 和"调函数"其实没什么关系

更准确的描述是:你给模型一份"你有什么能力"的菜单,模型看完用户的问题,告诉你"这题需要用第 3 道菜"——具体怎么做那道菜、做完之后把菜端上来,都是你自己的事,模型不管。

所以整个流程是这样的:

你                          DeepSeek
 │                               │
 │── 问题 + 工具菜单 ──────────> │
 │                               │  "这个问题要查数据"
 │ <──── tool_calls ───────────  │
 │                               │
 │  (你自己去查数据库)         │
 │                               │
 │── 查询结果回填 ─────────────> │
 │                               │  "目前系统有 10 个站点"
 │ <──── 最终回答 ─────────────  │

如果模型觉得直接回答就够了,就不会有 tool_calls,直接给你 content


协议长什么样——先看 JSON

在写代码之前,先把请求和响应的结构搞清楚,后面就不会懵了。

请求:你给模型的"菜单"

比标准的 chat completions 多了两个字段:tools(菜单)和 tool_choice(让模型自己决定还是强制点某道菜)。

POST /v1/chat/completions

{
  "model": "deepseek-v4-flash",
  "messages": [
    { "role": "user", "content": "现在系统里有多少个氢能站点?" }
  ],
  "tools": [
    {
      "type": "function",
      "function": {
        "name": "query_hydrogen_business_data",
        "description": "查询本系统(氢能业务管理平台)的真实业务数据,例如站点储量、加氢订单等。仅当问题需要查询本系统数据时调用;闲聊不要调用。",
        "parameters": {
          "type": "object",
          "properties": {
            "question": {
              "type": "string",
              "description": "改写后的、清晰描述待查询数据的自然语言问题"
            }
          },
          "required": ["question"]
        }
      }
    }
  ],
  "tool_choice": "auto",
  "temperature": 0.0
}

tool_choice: "auto" 的意思是"你自己判断要不要调用",这是我们路由场景想要的。如果写 "none" 则禁止调用,强制直接回答;写 {"type":"function","function":{"name":"xxx"}} 则强制调用某个具体工具。


响应 A:模型决定调用工具

contentnull,多了一个 tool_calls 数组:

{
  "choices": [{
    "message": {
      "role": "assistant",
      "content": null,
      "tool_calls": [{
        "id": "call_abc123",
        "type": "function",
        "function": {
          "name": "query_hydrogen_business_data",
          "arguments": "{\"question\": \"系统中氢能站点的总数\"}"
        }
      }]
    }
  }]
}

注意 arguments 是一个 JSON 字符串,不是对象——需要再 JSON.parse 一次才能拿到参数值。这是个容易踩的坑。


响应 B:模型决定直接回答

普通 chat 响应,没有 tool_calls

{
  "choices": [{
    "message": {
      "role": "assistant",
      "content": "你好!我是 DeepSeek,由中国的 DeepSeek 公司开发的 AI 助手……"
    }
  }]
}

工具执行后的"回填"请求

这是第二次请求,需要把"模型决定调用工具"这件事和"工具执行结果"都拼进对话历史:

{
  "model": "deepseek-v4-flash",
  "messages": [
    { "role": "user", "content": "现在系统里有多少个氢能站点?" },

    // 第一次响应里模型说的"我要调用工具"
    {
      "role": "assistant",
      "content": null,
      "tool_calls": [{ "id": "call_abc123", ... }]
    },

    // 你执行工具后的结果,id 必须和上面对应
    {
      "role": "tool",
      "tool_call_id": "call_abc123",
      "content": "{\"sql\": \"SELECT COUNT(*) FROM stations\", \"rows\": [{\"count\": 10}]}"
    }
  ]
}

模型会基于这份完整历史,生成最终的自然语言回答。


代码实现:从简到繁

Step 1:最小可运行版(curl 验证思路)

在写 Rust/Python 之前,先用 curl 确认流程是对的:

curl -s https://api.deepseek.com/v1/chat/completions \
  -H "Authorization: Bearer $DEEPSEEK_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "deepseek-v4-flash",
    "messages": [{"role":"user","content":"你好,今天天气怎么样?"}],
    "tools": [{
      "type": "function",
      "function": {
        "name": "get_weather",
        "description": "获取某地的实时天气数据",
        "parameters": {
          "type":"object",
          "properties":{"city":{"type":"string"}},
          "required":["city"]
        }
      }
    }],
    "tool_choice": "auto",
    "temperature": 0
  }'

"今天天气怎么样"没有指定城市,模型可能直接回答说"请告诉我城市",也可能调用工具问北京——看模型的判断。换成"北京今天天气怎么样",大概率会触发 tool_calls


Step 2:Rust 类型定义——协议类型层(openai.rs

先把协议里的 JSON 结构映射成 Rust 类型。这些类型会贯穿整个实现:

// openai.rs

use serde::{Deserialize, Serialize};

/// 对话消息,涵盖 user/assistant/tool 三种角色
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChatMessage {
    pub role: String,
    // assistant 直接回答时有 content;调用工具时 content 为 null
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub content: Option<String>,
    // assistant 角色、调用工具时才有这个字段
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_calls: Option<Vec<ToolCall>>,
    // tool 角色才有,对应 assistant 消息里的 tool_calls[].id
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_call_id: Option<String>,
}

impl ChatMessage {
    /// 构造"工具执行结果"消息,role=tool,tool_call_id 对应上
    pub fn tool_result(tool_call_id: impl Into<String>, content: impl Into<String>) -> Self {
        Self {
            role: "tool".into(),
            content: Some(content.into()),
            tool_calls: None,
            tool_call_id: Some(tool_call_id.into()),
        }
    }
    /// 构造 assistant 的"我决定调用这些工具"消息
    pub fn assistant_with_tool_calls(tool_calls: Vec<ToolCall>) -> Self {
        Self {
            role: "assistant".into(),
            content: None,
            tool_calls: Some(tool_calls),
            tool_call_id: None,
        }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ToolCall {
    pub id: String,             // 这个 id 要在回填时原样带回去
    #[serde(rename = "type", default = "default_function_type")]
    pub kind: String,           // 目前只有 "function"
    pub function: FunctionCall,
}

fn default_function_type() -> String { "function".into() }

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FunctionCall {
    pub name: String,
    #[serde(default)]
    pub arguments: String,  // JSON 字符串!不是对象
}

/// 你给模型的"工具菜单条目"
#[derive(Clone, Debug, Serialize)]
pub struct ToolDef {
    #[serde(rename = "type")]
    pub kind: &'static str,       // 固定 "function"
    pub function: FunctionDef,
}

#[derive(Clone, Debug, Serialize)]
pub struct FunctionDef {
    pub name: &'static str,
    pub description: &'static str,
    pub parameters: serde_json::Value,  // JSON Schema 对象
}

Step 3:DeepSeek 客户端——两个关键方法(llm.rs

路由判断和最终生成各用一个方法:

// llm.rs(节选)

impl DeepSeek {
    /// 路由判断:非流式,传入工具定义,返回 assistant 消息。
    /// 返回的消息要么有 content(直接回答),要么有 tool_calls(需要执行工具)。
    pub async fn chat_with_tools(
        &self,
        messages: &[ChatMessage],
        tools: &[ToolDef],
    ) -> Result<ChatMessage> {
        let url = format!("{}/v1/chat/completions", self.base_url.trim_end_matches('/'));

        let body = serde_json::json!({
            "model": &self.model,
            "messages": messages,
            "tools": tools,
            "tool_choice": "auto",
            "stream": false,
            "temperature": 0.0   // 路由判断要稳,不需要随机性
        });

        let resp = self.client.post(&url)
            .bearer_auth(&self.api_key)
            .json(&body)
            .send().await?
            .json::<serde_json::Value>().await?;

        // 取出 choices[0].message
        let message: ChatMessage = serde_json::from_value(
            resp["choices"][0]["message"].clone()
        )?;

        Ok(message)
    }

    /// 最终生成:流式,接受完整的多轮 messages(含 tool 角色回填),返回文本增量流。
    /// 工具路由命中后的"生成自然语言回答"走这里;通用问题也走这里。
    pub async fn chat_messages_stream(
        &self,
        messages: &[ChatMessage],
    ) -> Result<BoxStream<'static, Result<String>>> {
        let url = format!("{}/v1/chat/completions", self.base_url.trim_end_matches('/'));

        let body = serde_json::json!({
            "model": &self.model,
            "messages": messages,
            "stream": true,
            "temperature": 0.3   // 生成回答要自然,适当放开随机性
        });

        let resp = self.client.post(&url)
            .bearer_auth(&self.api_key)
            .json(&body)
            .send().await?;

        // 把 SSE 字节流解析成纯文本增量
        Ok(Box::pin(parse_sse(resp.bytes_stream())))
    }
}

注意两个方法的 temperature 不同——这是分两次请求的额外收益:路由判断和最终生成可以独立调参。


Step 4:核心路由逻辑(chat.rs

这是最关键的部分,串起上面所有的东西:

// chat.rs

pub async fn chat(
    State(state): State<AppState>,
    Json(req): Json<ChatCompletionRequest>,  // 前端传来的 { messages: [...] }
) -> Result<Response, AppError> {

    // ── 第一步:定义工具菜单 ──────────────────────────────────────────
    let tools = [ToolDef {
        kind: "function",
        function: FunctionDef {
            name: "query_hydrogen_business_data",
            description: "查询本系统(氢能业务管理平台)数据库中的真实业务数据,\
                          例如站点储量、能源现价、加氢订单、调度单、车辆能耗等。\
                          仅当用户问题需要查询本系统业务数据时才调用;\
                          常识性、闲聊、通用知识类问题不要调用。",
            parameters: serde_json::json!({
                "type": "object",
                "properties": {
                    "question": {
                        "type": "string",
                        "description": "改写后的、清晰描述待查询业务数据的自然语言问题"
                    }
                },
                "required": ["question"],
            }),
        },
    }];

    // ── 第二步:第一次非流式请求,让模型做路由判断 ──────────────────
    let routing = state.llm.chat_with_tools(&req.messages, &tools).await?;

    // 有 tool_calls 且工具名匹配 → 命中,需要查业务数据
    let tool_call = routing
        .tool_calls.as_ref()
        .and_then(|calls| calls.iter().find(|c| c.function.name == "query_hydrogen_business_data"))
        .cloned();

    // ── 第三步:根据路由结果决定走哪条路 ─────────────────────────────
    let stream = async_stream::stream! {
        let final_messages: Vec<ChatMessage> = match &tool_call {

            // ✅ 命中:执行工具,把查询结果回填进对话
            Some(call) => {
                // 从 arguments(JSON 字符串)里解析出 question 参数
                let question = serde_json::from_str::<serde_json::Value>(&call.function.arguments)
                    .ok()
                    .and_then(|v| v["question"].as_str().map(str::to_string))
                    .unwrap_or_else(|| {
                        // 解析失败时兜底:用用户最后说的那句话
                        req.messages.iter().rev()
                            .find(|m| m.role == "user")
                            .and_then(|m| m.content.clone())
                            .unwrap_or_default()
                    });

                // 真正执行工具:NL → SQL → 查库 → JSON
                let tool_content = match run_sql_pipeline(&state, &question).await {
                    Ok((sql, rows_json)) => format!(
                        r#"{{"sql": {}, "rows": {}}}"#,
                        serde_json::to_string(&sql).unwrap_or_default(),
                        rows_json,
                    ),
                    Err(e) => serde_json::json!({ "error": e }).to_string(),
                };

                // 拼接完整对话历史:原始问题 + 助手的"调用决定" + 工具执行结果
                let mut messages = req.messages.clone();
                messages.push(ChatMessage::assistant_with_tool_calls(vec![call.clone()]));
                messages.push(ChatMessage::tool_result(call.id.clone(), tool_content));
                messages
            }

            // ✅ 未命中:原样转发,直接流式生成通用回答
            None => req.messages.clone(),
        };

        // ── 第四步:第二次流式请求,生成最终回答 ────────────────────
        // 两条分支(命中/未命中)在这里汇合,统一流式输出
        let mut tokens = match state.llm.chat_messages_stream(&final_messages).await {
            Ok(s) => s,
            Err(e) => {
                // 生成失败,给前端一个错误提示,然后正常结束流
                yield Ok(Event::default().data(delta_chunk(&format!("生成失败:{e}"))));
                yield Ok(Event::default().data("[DONE]"));
                return;
            }
        };

        while let Some(Ok(text)) = tokens.next().await {
            // delta_chunk() 把文本片段封装成 OpenAI streaming chunk 格式
            // {"choices":[{"delta":{"content":"..."},"index":0}]}
            yield Ok(Event::default().data(delta_chunk(&text)));
        }
        yield Ok(Event::default().data("[DONE]"));
    };

    // 返回 SSE 响应,前端 OpenAIChatProvider 能直接消费
    Ok(Sse::new(Box::pin(stream)).into_response())
}

把这两次请求画成时序图

前端 chatbot          /chat (POST)         DeepSeek API           PostgreSQL
     │                     │                    │                      │
     │─ {messages:[...]} ->│                    │                      │
     │                     │── 第一次请求 ───>  │                      │
     │                     │   非流式,带 tools │                      │
     │                     │                    │  判断:要查数据!    │
     │                     │<── tool_calls ───  │                      │
     │                     │                    │                      │
     │                     │───── NL→SQL ────>  │                      │
     │                     │<─── SQL ────────── │                      │
     │                     │─────────────────────────── 执行 SQL ────> │
     │                     │<─────────────────────────── 查询结果 ──── │
     │                     │                    │                      │
     │                     │── 第二次请求 ───>  │                      │
     │                     │   流式,含工具结果 │                      │
     │<── token token token│<── stream ───────  │                      │
     │    [DONE]           │                    │                      │

总共两次 LLM 请求 + 一次数据库查询。

对比"未命中"的路径(纯闲聊):

前端 chatbot          /chat (POST)         DeepSeek API
     │                     │                    │
     │── {messages:[...]} ->│                    │
     │                     │── 第一次请求 ───>  │
     │                     │<── content:"你好…" │  (没有 tool_calls,直接有内容)
     │                     │                    │
     │                     │── 第二次请求 ───>  │
     │<── token token token │<── stream ─────── │

还是两次 LLM 请求,只是省掉了数据库那一趟。


几个容易踩的坑

坑 1:arguments 是字符串,不是对象

// ❌ 错误:直接把 arguments 当 Value 用
let question = call.function.arguments["question"];

// ✅ 正确:先 parse,再取字段
let args: Value = serde_json::from_str(&call.function.arguments)?;
let question = args["question"].as_str().unwrap_or_default();

坑 2:tool_call_id 必须和 id 对应

assistant 消息里的 tool_calls[].id(比如 "call_abc123")和随后 tool 消息里的 tool_call_id 必须完全一致。模型靠这个 id 把"调用请求"和"执行结果"对应起来。写错了,模型会困惑,或者干脆报错。

坑 3:路由判断和流式生成无法合并成一次请求

这是架构层面的硬约束:

  • 流式响应(SSE)是增量到达的,等所有 chunk 拼完才能知道有没有 tool_calls
  • 在拼完之前,你没法决定"该不该查数据库"
  • 所以:先发一次非流式请求拿路由结果,再发一次流式请求生成回答——这是 function calling + streaming 的标准做法

坑 4:description 直接影响路由准确率

description 写得越精确,模型判断越准。"查询本系统的业务数据""有用的工具" 好很多。建议加上几个典型例子,也加上"不适用"的情况("闲聊、通用知识类问题不要调用")——这个负向说明非常有效,能防止模型"什么都往工具上靠"。

坑 5:temperature=0 for 路由判断

路由判断要稳定可复现,同一个问题问 10 次应该得到同样的路由结果。temperature=0(或尽量低)能保证这一点。最终生成回答才需要适当的随机性,让语言更自然。


为什么不用关键词匹配或意图分类

我最开始也想过用正则:包含"多少个""查询""站点"这些词就走数据库,其他走通用回答。

但这个方案很快就会撑不住:

  • "帮我看看最近的补氢调度" → 没有关键词但要查库
  • "一共有多少种加氢方式" → 有"多少"但这是常识问题
  • "补氢速率计算公式是什么" → 要查专业知识还是查数据库?

维护这套规则会越来越累,而且永远有边界情况。

function calling 把这个判断交给了模型本身,只要 description 写得好,它能基于语义理解做出准确判断——这才是真正适合 AI 时代的"路由器"。


下一步

现在的实现是单轮的——每次请求都是独立的,没有上下文。"那上个月的数据呢?"这类追问会丢失上下文,SQL 生成会失败。

多轮追问的支持需要在 function-calling 路由里引入对话历史管理,后面继续做。