氢能站Agent系列4 -- 第二个 Agent

2026-06-18

这是一个多 Agent 系统,在开始第二个 Agent 之前, 把现在 Agent 工程代码整体再浏览一遍。


先搞清楚现在站在哪里

analytics-agent/src/chat.rs,发现 /chat 接口并不是直接把问题塞给 LLM:

// 第一次非流式请求:让 DeepSeek 判断是否需要调用工具
let routing = state.llm.chat_with_tools(&req.messages, &tools).await?;
let tool_call = routing.tool_calls.as_ref().and_then(|calls| {
    calls.iter().find(|c| {
        c.function.name == QUERY_TOOL || c.function.name == DISPATCH_TOOL
    })
});

每次对话,先把历史消息 + 工具定义一起发给 DeepSeek,让它决定:这个问题需要查数据库,还是直接闲聊?

命中工具就走数据管线(NL→SQL→结果→流式回答),没命中就直接流式回答。用户问"我们有几个站",DeepSeek 调 query_hydrogen_business_data,查的是真实数据库。这已经是工具调用了,不是 RAG,不是 few-shot。

现有工具一共两个:

工具干什么
query_hydrogen_business_dataNL→SQL,查任意业务数据
plan_hydrogen_dispatch调度规划,返回候选车+候选罐

这个结论影响了后续所有决策:既然工具调用已经通了,Multi-Agent 的增量价值在于分治,而不在于「能查数据」这件事。单 Agent 跑不好的场景,才值得拆。


改动一:mp-agent 消费报表接真实数据

mp-agent 是给小程序车主用的。扫到 consumption_report 函数:

fn consumption_report(q: &str) -> String {
    let period = if q.contains("上月") { "上月" } else { "本月" };
    format!(
        "<b>{period}</b>加氢消费摘要:<br>\
        累计加注 <b>32 kg</b><br>\
        合计支出 <b>¥1,241.60</b><br>\
        均价 <b>¥38.80/kg</b><br>\
        出行 <b>11 次</b><br>\
        最常去:虹桥示范站"
    )
}

硬编码假数据。这段代码大概率是占位符,得先改一下。

改法分两步。

第一步,ChatBodyuid 字段——小程序端知道当前用户是谁,把 uid 传过来:

#[derive(Deserialize)]
pub struct ChatBody {
    pub q: String,
    pub lat: Option<f64>,
    pub lng: Option<f64>,
    pub uid: Option<String>,   // 新增:车主 users.uid
    pub history: Vec<HistMsg>,
}

第二步,consumption_report 改成异步函数,查 refuel_orders 真实记录:

async fn consumption_report(state: &AppState, uid: &str, q: &str) -> String {
    if uid.is_empty() {
        return "请先登录,再查看你的消费记录。".into();
    }

    let (start, end, label) = period_range(q);  // 按关键词算时间范围

    let summary = sqlx::query_as::<_, ConsumptionSummary>(
        "SELECT
           CAST(COUNT(*) AS bigint) AS trips,
           CAST(COALESCE(SUM(actual_h2_kg), 0) AS float8) AS total_kg,
           CAST(COALESCE(SUM(total_amount), 0) AS float8) AS total_amount,
           CAST(CASE WHEN SUM(actual_h2_kg) > 0
                THEN SUM(total_amount) / SUM(actual_h2_kg)
                ELSE 0 END AS float8) AS avg_price
         FROM refuel_orders
         WHERE driver_user_id = $1
           AND order_status = 'completed'
           AND completed_at >= $2
           AND completed_at < $3
           AND deleted_at IS NULL",
    )
    .bind(uid).bind(start).bind(end)
    .fetch_one(&state.pool)
    .await;
    // ...
}

时间段的计算用 chrono

fn period_range(q: &str) -> (DateTime<Utc>, DateTime<Utc>, &'static str) {
    let now = Utc::now();
    if q.contains("上月") || q.contains("上个月") {
        let (year, month) = if now.month() == 1 {
            (now.year() - 1, 12u32)
        } else {
            (now.year(), now.month() - 1)
        };
        let start = Utc.with_ymd_and_hms(year, month, 1, 0, 0, 0).single().unwrap_or(now);
        let end   = Utc.with_ymd_and_hms(now.year(), now.month(), 1, 0, 0, 0).single().unwrap_or(now);
        (start, end, "上月")
    } else if q.contains("近七天") || q.contains("近7天") {
        (now - Duration::days(7), now, "近 7 天")
    } else {
        let start = Utc.with_ymd_and_hms(now.year(), now.month(), 1, 0, 0, 0).single().unwrap_or(now);
        (start, now, "本月")
    }
}

最常去哪个站,再发一条查询:

let top_station: Option<String> = sqlx::query_scalar(
    "SELECT station_name FROM refuel_orders
     WHERE driver_user_id = $1
       AND order_status = 'completed'
       AND completed_at >= $2 AND completed_at < $3
       AND deleted_at IS NULL AND station_name != ''
     GROUP BY station_name ORDER BY COUNT(*) DESC LIMIT 1",
)
.bind(uid).bind(start).bind(end)
.fetch_optional(&state.pool).await
.unwrap_or(None);

这次就算用户加氢记录真的是 0 条,也会如实返回"本月暂无加氢记录",而不是永远给同一个假数字。


改动二:station-agent 的定位转了一次

原计划:新建 station-agent crate,定时扫描低库存站点,写入 supply_alert_events

写完 monitor.rs,准备提交,顺手看了眼 mng-api 的 supply_chain.go

func (s *supplyChainService) StartJobs() {
    go func() {
        s.ScanAlerts()
        t := time.NewTicker(2 * time.Minute)
        defer t.Stop()
        for range t.C {
            s.ScanAlerts()  // 每 2 分钟扫一次
        }
    }()
    // ...
}

ScanAlerts() 里:查各站点真实储量,扣掉近期已锁定的预约预估消耗,低于安全下限就写 supply_alert_events逻辑比我写的还要精细——它还会估算「几小时后跌破下限」,区分 warningurgent

如果我的 station-agent 也往同一张表写,预警就会重复,运营人员每次看到的都是两条一样的记录。

所以改了定位:station-agent 不做预警检测,改为响应预警

mng-api 扫出预警,但它不会自动建 supply_request(补给需求单),还是要运营人员手动操作。station-agent 填补这个空白——轮询 supply_alert_eventsstatus=active 且没有 supply_request_id 的记录,自动建草稿单。

async fn find_pending_alerts(pool: &PgPool) -> anyhow::Result<Vec<PendingAlert>> {
    sqlx::query_as::<_, PendingAlert>(
        "SELECT event_id, station_id, enterprise_id
         FROM supply_alert_events
         WHERE status = 'active'
           AND (supply_request_id IS NULL OR supply_request_id = '')
           AND deleted_at IS NULL
         ORDER BY created_at ASC
         LIMIT 20",
    )
    .fetch_all(pool).await
    .map_err(Into::into)
}

建草稿单有个麻烦:需求单号是 SRQ_yyyyMMdd0001 格式的流水号,两个 goroutine/异步任务同时写就会冲突。

解法是 PostgreSQL advisory lock

let mut tx = pool.begin().await?;

// 事务内的 advisory lock,事务结束自动释放
// 所有写需求单的路径都用同一个 lock key,串行化编号生成
sqlx::query("SELECT pg_advisory_xact_lock(hashtext('supply_request_no'))")
    .execute(&mut *tx).await?;

let request_no: String = sqlx::query_scalar(
    "SELECT 'SRQ_' || TO_CHAR(NOW(), 'YYYYMMDD') || LPAD(
         (SELECT COALESCE(COUNT(*), 0) + 1
          FROM supply_requests
          WHERE request_no LIKE 'SRQ_' || TO_CHAR(NOW(), 'YYYYMMDD') || '%'
            AND deleted_at IS NULL
         )::text, 4, '0'
     )",
)
.fetch_one(&mut *tx).await?;

hashtext('supply_request_no') 把字符串变成整数用作 lock key,同一个 key 在同一时刻只有一个事务能持有。不同服务(station-agent 和 mng-api 自己)都用这个 key,编号就不会撞车。

插完需求单,把预警状态改成 converted 并写入 supply_request_id,下次轮询就不会重复处理:

sqlx::query(
    "UPDATE supply_alert_events
     SET supply_request_id = $1, status = 'converted', updated_at = NOW()
     WHERE event_id = $2",
)
.bind(&request_id).bind(&alert.event_id)
.execute(&mut *tx).await?;

tx.commit().await?;

这样整条链路变成:

mng-api ScanAlerts()(每 2 分钟)
   → 写 supply_alert_events(status=active)

station-agent(每 120 秒轮询)
   → 发现 active + 无 supply_request 的预警
   → 自动建草稿 supply_request
   → 预警更新为 converted

运营人员
   → 在管理端看到草稿单,选工厂、确认价格,提交

关于 Multi-Agent,今天得到的结论

Multi-Agent 不是「多写几个 crate」,而是分治:把超出单 Agent 可靠处理范围的任务,拆给专注的多个角色。

衡量标准很简单:如果一句话能说清楚某个 Agent 的职责边界,它就值得存在。

  • analytics-agent:把自然语言翻译成 SQL,查业务数据 ✓
  • mp-agent:给车主回答附近站点和个人消费 ✓
  • station-agent:响应低库存预警,自动建补给草稿单 ✓

三个 Agent,三条清晰的边界。还没动手的「Orchestrator」要等到一个问题需要同时跨越两个以上 Agent 时再说——过早的路由层只会让调试变成噩梦。

下一步:让 analytics-agent 的调度工具从「只读建议」变成「确认后落单」——这才是 Agent 从「说话」到「做事」的关键一跳。