此内容已使用自动机器翻译服务进行翻译,仅供您参考及阅读便利。其中可能包含错误、遗漏,或与原始英文版本存在理解方面的细微差别。如有疑问,请参考原始英文版本。
这是 Knock 首席技术官 Chris Bell 的客座帖子
现在有很多关于构建 AI 代理的讨论,但却很少讨论如何使这些代理真正有用。
代理是一种自治系统,旨在做出决策和执行行动以实现特定目标或一组目标,而无需人工输入。
无论您的智能体多么擅长决策,您都需要有人为它实现目标提供指导或建议。毕竟,一个无法与外界互动或响应的智能体及管理它的系统将是有限的,它能够解决的问题将会是有限的。
这就是“人机交互”交互模式的用武之地了。您让一个人进入智能体的控制循环,并要求人类进行输入,然后智能体才能继续执行其任务。
在本篇博客文章中,我们将使用 Knock 和 Cloudflare Agents SDK 为虚拟发卡工作流程构建 AI 代理,该工作流程在请求新信用卡时需要人工批准。
您可以在存储库中找到此示例的完整代码。
Knock 是一种消息传递基础设施,可让您用于在应用内、电子邮件、短信、推送和 Slack 中发送多渠道消息,无需编写任何集成代码。
使用 Knock,您可以完全了解发送给用户的消息,同时处理可靠交付、用户通知首选项等。
您可以使用 Knock 的 Agent Toolkit为您的智能体提供人机交互流程,这是一套工具,向您的 AI 智能体公开 Knock 的 API 和消息传递功能。
使用 Agent SDK 作为 AI 智能体的基础
Agents SDK 提供了一个抽象概念,用于在 Durable Object 之上构建有状态的实时代理,这些代理是全局可寻址的,并使用嵌入式、零延迟 SQLite 数据库持久状态。
使用 Agents SDK 和 Cloudflare 平台之外构建 AI 智能体意味着我们需要考虑 WebSocket 服务器、状态持久性以及如何水平扩展服务。由于 Durable Object 支持 Agents SDK,我们免费获得了这些好处,同时拥有一个全球可寻址的内置存储的计算资源,完全无服务器,可扩展至零。
在本示例中,我们将使用这些功能来构建一个代理。用户可以通过聊天与之实时交互,并且可以根据需要暂停和恢复聊天。Agents SDK 是支持异步智能体工作流程的理想平台,例如人机交互交互中所需的工作流程。
在 Knock 中,我们使用可视化工作流构建器来创建跨渠道消息传递逻辑,从而设计审批工作流。然后,我们制作与要将消息发送到的每个渠道相关联的通知模板。
作为工作流执行的一部分,Knock 会自动应用用户的首选项,确保用户的通知设置得到尊重。
您可以在存储库中找到我们已经为此演示创建的示例工作流程。您可以通过 Knock CLI 将这个工作流模板导入您的帐户,以使用该工作流模板。
我们在 Cloudflare Agents SDK 的 AIChatAgent 抽象基础上构建了这个聊天界面的 AI 智能体(文档)。Agents SDK 在这里解决了大部分复杂问题,剩下来就是利用系统提示词实现 LLM 调用代码。
// src/index.ts
import { AIChatAgent } from "agents/ai-chat-agent";
import { openai } from "@ai-sdk/openai";
import { createDataStreamResponse, streamText } from "ai";
export class AIAgent extends AIChatAgent {
async onChatMessage(onFinish) {
return createDataStreamResponse({
execute: async (dataStream) => {
try {
const stream = streamText({
model: openai("gpt-4o-mini"),
system: `You are a helpful assistant for a financial services company. You help customers with credit card issuing.`,
messages: this.messages,
onFinish,
maxSteps: 5,
});
stream.mergeIntoDataStream(dataStream);
} catch (error) {
console.error(error);
}
},
});
}
}
在客户端,我们使用 agents/ai-react 包中的 useAgentChat hook 来支持用户与智能体的实时聊天。
我们已将代理建模为每个用户的聊天,通过将进程的名称指定为 userId ,来使用 useAgent 钩子来设置代理。
// src/index.ts
import { useAgent } from "agents/react";
import { useAgentChat } from "agents/ai-react";
function Chat({ userId }: { userId: string }) {
const agent = useAgent({ agent: "AIAgent", name: userId });
const { messages, input, handleInputChange, handleSubmit, isLoading } = useAgentChat({ agent });
// ...
}
这意味着我们有一个代理进程,因此每个用户的对象也有一个持久化。对于我们的人机交互用例,这在稍后我们要恢复延迟的工具调用时会变得很重要。
我们通过公开 issueCard 工具,为代理提供我们的发卡能力。但是,我们没有自己编写审批流程和跨渠道逻辑,而是通过将问题卡工具包装在 requireHumanInput 方法中,将其完全委托给 Knock。
现在,当用户要求申请新卡片时,我们会调用 Knock 以发起我们的卡片请求,这将通知组织中相应的管理员以请求批准。
为此,我们需要使用 Knock 的智能体工具包,该工具包提供了在我们的 AI 智能体中与 Knock 协同工作的方法,并支持跨渠道消息传递。
import { createKnockToolkit } from "@knocklabs/agent-toolkit/ai-sdk";
import { tool } from "ai";
import { z } from "zod";
import { AIAgent } from "./index";
import { issueCard } from "./api";
import { BASE_URL } from "./constants";
async function initializeToolkit(agent: AIAgent) {
const toolkit = await createKnockToolkit({ serviceToken: agent.env.KNOCK_SERVICE_TOKEN });
const issueCardTool = tool({
description: "Issue a new credit card to a customer.",
parameters: z.object({
customerId: z.string(),
}),
execute: async ({ customerId }) => {
return await issueCard(customerId);
},
});
const { issueCard } = toolkit.requireHumanInput(
{ issueCard: issueCardTool },
{
workflow: "approve-issued-card",
actor: agent.name,
recipients: ["admin_user_1"],
metadata: {
approve_url: `${BASE_URL}/card-issued/approve`,
reject_url: `${BASE_URL}/card-issued/reject`,
},
}
);
return { toolkit, tools: { issueCard } };
}
这里包含很多内容,让我们过一下关键部分:
我们将 issueCard 工具包装在 requireHumanInput 方法中,由 Knock Agent 工具包公开
我们希望调用消息传递工作流,成为批准发放卡工作流
我们传递 Agent.name 作为请求的参与者,后者会转换为用户 ID
我们将此工作流的收件人设置为用户 admin_user_1
我们传递批准和拒绝 URL,以便在我们的消息模板中使用
包装的工具将作为issueCard返回
在底层,这些选项被传递到Knock 工作流触发器 API以调用每个收件人的工作流。此处列出的收件人集可以是动态的,或者转到通过Knock 的订阅 API的一组用户。
然后,我们可以将打包的发行卡工具传递给代理上 onChatMessage 方法中的 LLM 调用,以便可以在与代理进行交互的过程中调用工具调用。
export class AIAgent extends AIChatAgent {
// ... other methods
async onChatMessage(onFinish) {
const { tools } = await initializeToolkit(this);
return createDataStreamResponse({
execute: async (dataStream) => {
const stream = streamText({
model: openai("gpt-4o-mini"),
system: "You are a helpful assistant for a financial services company. You help customers with credit card issuing.",
messages: this.messages,
onFinish,
tools,
maxSteps: 5,
});
stream.mergeIntoDataStream(dataStream);
},
});
}
}
现在,当智能体调用issueCardTool 时,我们调用 Knock 来发送审批通知,从而推迟了工具调用以发行卡片,直到收到批准为止。Knock 的工作流程负责将消息发送到一组指定的收件人,根据每个用户的偏好生成并传递消息。
将 Knock 工作流程用于审批消息,可以轻松构建跨渠道消息传递,以根据用户的通信 偏好到达用户。我们还可以利用延迟、限流、批处理和条件来编排更复杂的消息传递。
消息发送给批准者后,下一步就是处理返回的批准,将人工交互带入智能体的循环中。
批准请求是异步的,这意味着响应可以在未来的任何时候到达。幸运的是,Knock 已经为您处理这里的繁重工作,通过一个跟踪与底层消息交互的 webhook 将事件路由到智能体 worker。在我们的例子中,只需点击“批准”或“拒绝”按钮。
首先,我们在 Knock 仪表板中设置一个 message.interacted webhook 处理程序,将交互转发给我们的 Worker,并最终转发给我们的代理进程。
在我们的示例中,我们将批准点击路由回 Worker 进行处理,将 Knock 消息 ID 附加到批准 URL 和拒绝 URL的末尾,以跟踪对所发送特定消息的参与度。我们通过 Knock 中消息模板中的 Liquid 来实现这一点:{{ data.approve_url }}?messageId={{ current_message.id }} 。需要注意的是,如果这是一个生产应用程序,我们很可能会在另一个应用程序中处理批准点击,而不是运行此代理。我们将其放在这里仅为了演示之目的。
点击链接后,worker 中的处理程序将使用 Knock 的消息交互 API将消息标记为已交互,并将状态作为元数据传递,以便以后使用。
import Knock from '@knocklabs/node';
import { Hono } from "hono";
const app = new Hono();
const client = new Knock();
app.get("/card-issued/approve", async (c) => {
const { messageId } = c.req.query();
if (!messageId) return c.text("No message ID found", { status: 400 });
await client.messages.markAsInteracted(messageId, {
status: "approved",
});
return c.text("Approved");
});
消息交互将通过我们设置的 webhook 从 Knock 进行,确保这个过程完全异步。webhook 的恶意有效负载包括完整的消息,其中包括有关生成原始请求的用户的元数据,并保留了有关请求本身的详细信息(在我们的例子中包含了工具调用)。
import { getAgentByName, routeAgentRequest } from "agents";
import { Hono } from "hono";
const app = new Hono();
app.post("/incoming/knock/webhook", async (c) => {
const body = await c.req.json();
const env = c.env as Env;
// Find the user ID from the tool call for the calling user
const userId = body?.data?.actors[0];
if (!userId) {
return c.text("No user ID found", { status: 400 });
}
// Find the agent DO for the user
const existingAgent = await getAgentByName(env.AIAgent, userId);
if (existingAgent) {
// Route the request to the agent DO to process
const result = await existingAgent.handleIncomingWebhook(body);
return c.json(result);
} else {
return c.text("Not found", { status: 404 });
}
});
我们利用代理通过命名标识符寻址的能力,将请求从 Worker 路由到代理。在我们的例子中,就是 userId 。因为代理由一个 Durable 对象提供支持,所以从传入 Worker 请求到查找并恢复代理的过程非常简单。
然后,我们使用原始工具调用的上下文(传递给 Knock 并往返回给智能体),恢复工具执行并发放卡片。
export class AIAgent extends AIChatAgent {
// ... other methods
async handleIncomingWebhook(body: any) {
const { toolkit } = await initializeToolkit(this);
const deferredToolCall = toolkit.handleMessageInteraction(body);
if (!deferredToolCall) {
return { error: "No deferred tool call given" };
}
// If we received an "approved" status then we know the call was approved
// so we can resume the deferred tool call execution
if (result.interaction.status === "approved") {
const toolCallResult =
await toolkit.resumeToolExecution(result.toolCall);
const { response } = await generateText({
model: openai("gpt-4o-mini"),
prompt: `You were asked to issue a card for a customer. The card is now approved. The result was: ${JSON.stringify(toolCallResult)}.`,
});
const message = responseToAssistantMessage(
response.messages[0],
result.toolCall,
toolCallResult
);
// Save the message so that it's displayed to the user
this.persistMessages([...this.messages, message]);
}
return { status: "success" };
}
}
同样,这里发生了很多事情,所以我们来逐步了解重要的部分:
我们尝试通过 handleMessageInteraction 方法将正文(来自 Knock 的 webhook 有效负载)转换为延迟工具调用
如果我们传递给之前交互调用的元数据状态为“approved”,那么我们通过 restoreToolExecution 方法恢复工具调用
最后,我们从 LLM 生成一条消息并将其保留,以确保用户了解已批准的信用卡
完成最后一部分后,我们现在可以要求签发新卡,从代理发出批准请求,发送批准消息,并将这些批准路由回我们的代理进行处理。只需很少的代码,代理将异步处理我们的发卡请求,并为我们恢复延迟的工具调用。
上述实现的一个问题是,如果有人点击批准按钮以上,我们很容易发放多张卡。为纠正这一问题,我们要跟踪正在发出的工具调用,并确保最多处理一次调用。
为了驱动这一点,我们利用 代理的内置 state,它可用于持久化信息,而无需访问其他持久性存储(如数据库或 Redis),尽管如果我们愿意,我们绝对可以这样做。我们可以通过工具 ID 跟踪工具调用,并在代理进程内捕获其当前状态。
type ToolCallStatus = "requested" | "approved" | "rejected";
export interface AgentState {
toolCalls: Record<string, ToolCallStatus>;
}
class AIAgent extends AIChatAgent<Env, AgentState> {
initialState: AgentState = {
toolCalls: {},
};
setToolCallStatus(toolCallId: string, status: ToolCallStatus) {
this.setState({
...this.state,
toolCalls: { ...this.state.toolCalls, [toolCallId]: status },
});
}
// ...
}
在这里,我们将工具调用的初始状态创建为空对象。我们还添加了一个快速设置辅助方法,以便更轻松地交互。
下一步,我们需要记录正在进行的工具调用。为此,我们可以使用 requireHumanInput 帮助程序中的 onAfterCallKnock 选项来捕获已为用户请求工具调用。
const { issueCard } = toolkit.requireHumanInput(
{ issueCard: issueCardTool },
{
// Keep track of the tool call state once it's been sent to Knock
onAfterCallKnock: async (toolCall) =>
agent.setToolCallStatus(toolCall.id, "requested"),
// ... as before
}
);
最后,我们需要在处理传入 webhook 时检查状态,并将工具调用标记为已批准(为简洁起见,省略了一些代码)。
export class AIAgent extends AIChatAgent {
async handleIncomingWebhook(body: any) {
const { toolkit } = await initializeToolkit(this);
const deferredToolCall = toolkit.handleMessageInteraction(body);
const toolCallId = result.toolCall.id;
// Make sure this is a tool call that can be processed
if (this.state.toolCalls[toolCallId] !== "requested") {
return { error: "Tool call is not requested" };
}
if (result.interaction.status === "approved") {
const toolCallResult = await toolkit.resumeToolExecution(result.toolCall);
this.setToolCallStatus(toolCallId, "approved");
// ... rest as before
}
}
}
使用 Agents SDK 和 Knock,可轻松构建推迟工具调用的高级人机交互体验。
Knock 的工作流构建器和通知引擎为您提供构建块,为您的代理创建复杂的跨渠道消息传递。您能轻松创建升级流程,通过短信、推送、电子邮件或 Slack 发送消息,同时尊重用户的通知偏好。Knock 还可以让您对用户接收的消息拥有完全的可见性。
Agents SDK 下方的 Durable Object 抽象意味着我们获得了一个可全局寻址的代理进程,很容易让出并恢复到这个进程。利用 Durable Object 中的持久性存储,我们可以保留每个用户的完整聊天历史记录,以及在智能体进程中恢复智能体所需的任何其他状态(例如我们的工具调用)。最后,底层 Durable Object 的无服务器特性意味着我们能够水平扩展,毫不费力地支持大量用户。
如果您希望利用多人循环体验构建自己的 AI 智能体聊天体验,可在GitHub中找到本指南中的完整代码。