氢能站Agent系列5 -- 客户端(小程序)Agent Tool Call

2026-06-23

小程序 AI 助手(mp-agent)从"关键词匹配"升级成了真正的 Tool Call 驱动。

以前用户问"哪里能加氢?"触发不了,现在可以正确识别并查数据库。


Tool Call 的思路

LLM 本身支持"工具调用"能力:你告诉它有哪些工具可用、每个工具干什么,它自己判断要不要调、调哪个、传什么参数。

整个流程变成两阶段:

用户问题

[第一次 LLM 调用] 非流式,携带工具定义
   ↓ finish_reason = "tool_calls"
执行工具(查数据库)

[第二次 LLM 调用] 流式,携带工具结果

流式回答推给用户

如果第一次 LLM 判断不需要工具(比如问"氢燃料电池原理"),直接把文字答案返回,跳过数据库,也是流式输出。


核心类型设计

先把消息结构统一成一个 OwnedMsg,能表达所有角色:

#[derive(Serialize, Clone)]
pub struct OwnedMsg {
    pub role: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_calls: Option<Vec<ToolCall>>,   // assistant 返回工具调用时用
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_call_id: Option<String>,        // tool 角色回传结果时用
}

impl OwnedMsg {
    pub fn system(content: impl Into<String>) -> Self { ... }
    pub fn user(content: impl Into<String>) -> Self { ... }
    pub fn assistant_tool_calls(tool_calls: Vec<ToolCall>) -> Self { ... }
    pub fn tool(id: impl Into<String>, content: impl Into<String>) -> Self { ... }
}

工具定义也很简单,就是名字 + 描述 + JSON Schema 参数:

fn tool_defs() -> Vec<ToolDef> {
    vec![
        ToolDef {
            kind: "function",
            function: FunctionDef {
                name: "find_nearby_stations",
                description: "查询用户附近的加氢站,返回距离、价格、压力等级、空闲枪数",
                parameters: serde_json::json!({
                    "type": "object",
                    "properties": {},
                    "required": []
                }),
            },
        },
        ToolDef {
            kind: "function",
            function: FunctionDef {
                name: "get_consumption_report",
                description: "查询当前登录用户的加氢消费记录汇总",
                parameters: serde_json::json!({
                    "type": "object",
                    "properties": {
                        "period": {
                            "type": "string",
                            "enum": ["this_month", "last_month", "last_7_days"],
                            "description": "查询周期"
                        }
                    },
                    "required": ["period"]
                }),
            },
        },
    ]
}

注意 find_nearby_stations 的参数是空的——经纬度从 HTTP 请求里拿,不经过 LLM,这样更安全;get_consumption_report 的 uid 也是服务端注入,LLM 只负责判断查哪个时间段。


LLM 客户端:两个新方法

llm.rs 里新增两个方法,替代原来的 stream_messages

chat_with_tools — 非流式,检测工具意图:

pub async fn chat_with_tools(
    &self,
    messages: &[OwnedMsg],
    tools: &[ToolDef],
) -> Result<ToolCallOutcome> {
    // 发请求,temperature=0.0 让判断更确定
    let resp: ToolChatResp = self.client
        .post(self.url())
        .bearer_auth(&self.api_key)
        .json(&ToolChatReq { model: &self.model, messages, tools,
                             tool_choice: "auto", stream: false, temperature: 0.0 })
        .send().await?
        .json().await?;

    let choice = resp.choices.into_iter().next()?;

    if choice.finish_reason == "tool_calls" && !choice.message.tool_calls.is_empty() {
        let calls = choice.message.tool_calls;
        Ok(ToolCallOutcome::ToolCalls {
            assistant_msg: OwnedMsg::assistant_tool_calls(calls.clone()),
            calls,
        })
    } else {
        Ok(ToolCallOutcome::Text(choice.message.content.unwrap_or_default()))
    }
}

stream_with_history — 流式,带完整上下文(含工具结果)生成最终回答:

pub async fn stream_with_history(
    &self,
    messages: &[OwnedMsg],
) -> Result<BoxStream<'static, Result<String>>> {
    let resp = self.client
        .post(self.url())
        .bearer_auth(&self.api_key)
        .json(&StreamReq { model: &self.model, messages, stream: true, temperature: 0.3 })
        .send().await?;
    Ok(Box::pin(parse_sse(resp.bytes_stream())))
}

Chat Handler:两段式流程

pub async fn chat(State(state): State<AppState>, Json(body): Json<ChatBody>)
    -> Result<Response, AppError>
{
    // 组装消息历史
    let mut msgs = vec![OwnedMsg::system(SYSTEM)];
    for h in &body.history { /* 追加历史 */ }
    msgs.push(OwnedMsg::user(&question));

    // 第一阶段:让 LLM 决策
    let outcome = state.llm.chat_with_tools(&msgs, &tool_defs()).await?;

    match outcome {
        // 不需要工具,直接返回
        ToolCallOutcome::Text(text) => {
            let stream = async_stream::stream! {
                yield sse(&text.replace('\n', "<br>"));
                yield sse("[DONE]");
            };
            Ok(make_sse(stream))
        }

        // 需要工具:执行 → 注入结果 → 流式生成答案
        ToolCallOutcome::ToolCalls { assistant_msg, calls } => {
            msgs.push(assistant_msg);
            for call in &calls {
                let result = dispatch_tool(&state, &call.function.name,
                                          &call.function.arguments,
                                          body.lat, body.lng, &uid).await;
                msgs.push(OwnedMsg::tool(call.id.clone(), result));
            }

            let mut tokens = state.llm.stream_with_history(&msgs).await?;
            let stream = async_stream::stream! {
                while let Some(Ok(t)) = tokens.next().await {
                    if !t.is_empty() { yield sse(&t.replace('\n', "<br>")); }
                }
                yield sse("[DONE]");
            };
            Ok(make_sse(stream))
        }
    }
}

关键点:两次 LLM 调用都在进入 SSE stream 之前完成,所以第一次调用失败可以直接返回 HTTP 错误,而不是把错误混进流里。


工具实现:把数据查出来交给 LLM 组织语言

find_nearby_stations 的工作就是查 DB 返回原始文本,LLM 负责把它变成自然语言:

async fn query_stations(pool: &PgPool) -> Result<Vec<StationRow>> {
    let mut tx = pool.begin().await?;
    // 设置 RLS 会话变量,让当前事务能看到所有企业的站点
    sqlx::query("SELECT set_config('app.enterprise_id', '*', true)")
        .execute(&mut *tx).await?;

    let rows = sqlx::query_as::<_, StationRow>(r#"
        SELECT
            s.station_name AS name,
            CAST(s.latitude  AS float8) AS lat,
            CAST(s.longitude AS float8) AS lng,
            CAST(COALESCE(g.rated_pressure, 0) AS float8) AS pressure,
            CAST(COALESCE(p.price, 0) AS float8) AS price,
            CAST(COALESCE(gc.free,  0) AS int8) AS free_guns,
            CAST(COALESCE(gc.total, 0) AS int8) AS total_guns
        FROM stations s
        LEFT JOIN LATERAL (
            SELECT MAX(rated_pressure) FROM station_guns
            WHERE station_id = s.station_id AND deleted_at IS NULL
        ) g ON true
        LEFT JOIN LATERAL (
            SELECT price FROM prices
            WHERE owner_id = s.station_id AND owner_type = 'station'
              AND deleted_at IS NULL AND status = 1
            ORDER BY effective_date DESC LIMIT 1
        ) p ON true
        LEFT JOIN (
            SELECT station_id,
                COUNT(*) AS total,
                COUNT(*) FILTER (WHERE status = 'idle') AS free
            FROM station_guns WHERE deleted_at IS NULL GROUP BY station_id
        ) gc ON gc.station_id = s.station_id
        WHERE s.deleted_at IS NULL
    "#).fetch_all(&mut *tx).await?;

    tx.commit().await?;
    Ok(rows)
}

查出来的数据格式化成这样的文本再交给 LLM:

找到最近 5 个氢站:
1. 广州越秀区政府配套氢能站 — 0.7km · ¥30.50/kg · 35MPa · 空闲 4/7
2. 广州白云机场配套氢能站 — 5.7km · ¥30.00/kg · 35MPa · 空闲 5/9
...

LLM 拿到这个文本之后,会自己做推荐、排版、补充建议——这才是它该干的事。


实际效果

问:附近哪里能加氢
答:以下是您附近 5 个加氢站(按距离排序):

1. **广州越秀区政府配套氢能站** — 0.7 km,35MPa,¥30.50/kg,空闲 4/7
2. **广州白云机场配套氢能站** — 5.7 km,35MPa,¥30.00/kg,空闲 5/9
...

推荐距离近、价格低的 **越秀站** 或 **白云机场站**。
需要导航或查询消费记录请告诉我 😊

下一步

接下来打算做前端的小程序页面,把这个 SSE 接口接进去,让车主真正用起来。