これは、KnockのCTOであるChris Bell氏によるゲスト投稿です
今、AIエージェントの構築についてはよく話題になっていますが、AIエージェントを真に有用にするために必要なことについてはあまり知られていません。
エージェントは、人間の介入なしに、特定の目標または一連の目標を達成するために意思決定を行い、アクションを実行するように設計された自律システムです。
エージェントがどれほど意思決定が優れていても、エージェントが目標に向けて進む道のりについてガイダンスやインプットを提供する人が必要になります。結局、外部の世界とそれを管理するシステムとの対話や応答ができないエージェントは、解決できる問題が限定的になります。
そこで登場するのが「ヒューマン・イン・ザ・ループ」と呼ばれる対話パターンです。人間をエージェントのループに入れて、エージェントがタスクを継続する前に、その人間からの入力を必要とします。
このブログ記事では、KnockとCloudflareAgents SDKを使用して、新規カードがリクエストされた際に人間の承認が必要な仮想カード発行ワークフロー用のAIエージェントを構築します。
この例の完全なコードは、リポジトリで確認できます。
Knockとは?
Knockはメッセージングインフラストラクチャで、統合コードを書くことなく、アプリ内、メール、SMS、プッシュ、Slackでマルチチャンネルメッセージを送信することができます。
Knockを使用すると、ユーザーに送信されるメッセージを完全に可視化できると同時に、信頼性の高い配信やユーザー通知の設定などの処理が可能になります。
Knockを使用して、KnockのAPIとメッセージ機能をAIエージェントに公開する一連のツールであるKnockのエージェントツールキットを使用して、エージェントのヒューマンインザループフローを強化することができます。
AIエージェントの基盤としてエージェントSDKを使用する
Agents SDKは、Durable Objects上にステートフルなリアルタイムエージェントを構築するための抽象化を提供します。Durable Objectsは、グローバルにアドレス指定でき、埋め込まれた、ゼロ遅延のSQLiteデータベースを使って状態を保持します。
Agents SDKとCloudflareプラットフォームの外でAIエージェントを構築する場合、WebSocketサーバー、状態の永続性、サービスを水平に拡張する方法を考慮する必要があります。Durable ObjectがAgents SDKをサポートしているため、これらのメリットを無料で享受できます。また、ストレージが組み込まれたグローバルでアドレス指定可能なコンピュートを持つことができます。これは完全にサーバーレスで、ゼロに拡張できます。
例では、これらの機能を使って、ユーザーがチャットを介してリアルタイムでやり取りし、必要に応じて一時停止や再開が可能なエージェントを構築します。Agents SDKは、ヒューマンインザループ(Human-in-the-Loop)対話で必要となるような非同期のエージェントワークフローを実行するのに理想的なプラットフォームです。
Knockメッセージングのワークフローを設定する
Knock内では、Visual Workflow Builderを使って承認ワークフローを設計し、クロスチャネルメッセージのロジックを作成しています。次に、メッセージを送信する各チャンネルに関連付けた通知テンプレートを作成します。
Knockは、ワークフローの実行の一部としてユーザーの環境設定を自動的に適用し、ユーザーの通知設定を確実に反映します。
このデモ用にすでに作成したワークフロー例をリポジトリで確認できます。このKnock CLIを介してこのワークフローテンプレートを使用して、アカウントにインポートすることができます。
チャットUIの構築
CloudflareのエージェントSDK(ドキュメント )の AIChatAgent の抽象化の上に、チャットインターフェースとしてAIエージェントを構築しました。
ここのAgents SDKは複雑さの大部分を処理し、私たちはプロンプトでLLM呼び出しコードを実装するだけです。
// src/index.ts
import { AIChatAgent } from "agents/ai-chat-agent";
import { openai } from "@ai-sdk/openai";
import { createDataStreamResponse, streamText } from "ai";
export class AIAgent extends AIChatAgent {
async onChatMessage(onFinish) {
return createDataStreamResponse({
execute: async (dataStream) => {
try {
const stream = streamText({
model: openai("gpt-4o-mini"),
system: `You are a helpful assistant for a financial services company. You help customers with credit card issuing.`,
messages: this.messages,
onFinish,
maxSteps: 5,
});
stream.mergeIntoDataStream(dataStream);
} catch (error) {
console.error(error);
}
},
});
}
}
クライアント側では、 agents/ai-react
パッケージの useAgentChat
フックを使用して、ユーザーとエージェントのリアルタイムチャットを強化します。
エージェントをユーザーごとのチャットとしてモデル化しました。ユーザーIdとしてプロセス名を指定し、
useAgentフックを使用してセットアップします。
// src/index.ts
import { useAgent } from "agents/react";
import { useAgentChat } from "agents/ai-react";
function Chat({ userId }: { userId: string }) {
const agent = useAgent({ agent: "AIAgent", name: userId });
const { messages, input, handleInputChange, handleSubmit, isLoading } = useAgentChat({ agent });
// ...
}
これはつまり、ユーザーごとにエージェントプロセスがあり、耐久性のあるオブジェクトがあるということです。ヒューマン・イン・ザ・ループのユースケースでは、これは後に、遅延したツール呼び出しを再開することについて説明する際に重要になります。
ツールの呼び出しをKnockに先延ばしに
弊社ではissueCard
ツールを公開することで、エージェントにカード発行機能を提供します。しかし、承認フローとクロスチャネルロジックを自分たちで書くのではなく、問題カードツールを必要とするHumanInput
メソッドでラップすることによって、すべてをKnockに委任しました。
今、ユーザーが新しいカードを要求すると、Knockにカード要求を開始するための呼び出しを行い、Knockは組織内の適切な管理者に通知して承認を要求します。
これを設定するには、Knockのエージェントツールキットを使用する必要があります。このキットは、AIエージェントでKnockと連携し、クロスチャネルメッセージングを強化する方法を公開します。
import { createKnockToolkit } from "@knocklabs/agent-toolkit/ai-sdk";
import { tool } from "ai";
import { z } from "zod";
import { AIAgent } from "./index";
import { issueCard } from "./api";
import { BASE_URL } from "./constants";
async function initializeToolkit(agent: AIAgent) {
const toolkit = await createKnockToolkit({ serviceToken: agent.env.KNOCK_SERVICE_TOKEN });
const issueCardTool = tool({
description: "Issue a new credit card to a customer.",
parameters: z.object({
customerId: z.string(),
}),
execute: async ({ customerId }) => {
return await issueCard(customerId);
},
});
const { issueCard } = toolkit.requireHumanInput(
{ issueCard: issueCardTool },
{
workflow: "approve-issued-card",
actor: agent.name,
recipients: ["admin_user_1"],
metadata: {
approve_url: `${BASE_URL}/card-issued/approve`,
reject_url: `${BASE_URL}/card-issued/reject`,
},
}
);
return { toolkit, tools: { issueCard } };
}
多くのことが行われているため、主要な部分について順に説明しましょう。
IsissueCard
ツールをrecreedHumanInput
メソッドでラップし、Knock Agentツールキットから公開されます。メッセージングのワークフローを
カード発行承認
のワークフローにするリクエストのアクター
としてagent.nameを渡します。これはユーザーIDに変換されます。このワークフローの受信者をユーザー
admin_user_1
に設定します。メッセージテンプレートで使用できるように、承認URLと拒否URLを渡します
ラップされたツールが
issueCard
として返される
内部では、これらのオプションがKnockワークフロートリガーAPIに渡され、受信者ごとにワークフローを呼び出します。ここに一覧化されている受信者のセットは動的であるか、KnockのサブスクリプションAPIを通じてユーザーグループに送られているかもしれません。
そして、ラップされた発行カードツールをエージェントのonChatMessage
メソッドでLLMコールに渡すことができ、エージェントとのインタラクションの一部としてツールコールを呼び出すことができます。
export class AIAgent extends AIChatAgent {
// ... other methods
async onChatMessage(onFinish) {
const { tools } = await initializeToolkit(this);
return createDataStreamResponse({
execute: async (dataStream) => {
const stream = streamText({
model: openai("gpt-4o-mini"),
system: "You are a helpful assistant for a financial services company. You help customers with credit card issuing.",
messages: this.messages,
onFinish,
tools,
maxSteps: 5,
});
stream.mergeIntoDataStream(dataStream);
},
});
}
}
エージェントがissueCardTool
を呼び出すと、Knockを呼び出して承認通知を送信し、承認を受けるまでカードを発行するツールコールを遅らせます。Knockのワークフローは、指定された受信者グループへのメッセージの送信を処理し、各ユーザーの好みに合わせてメッセージを生成・配信します。
当社の承認メッセージにKnockワークフローを使用することで、ユーザーのコミュニケーションプリファレンスに合わせたクロスチャネルメッセージングを簡単に構築することができます。また、遅延、スロットル、バッチ、条件を利用して、より複雑なメッセージングを調整することもできます。
承認の処理
メッセージが承認者に送信されたら、次のステップは、返されてくる承認の処理を行い、人間をエージェントのループに入れることです。
承認リクエストは非同期的です。つまり、将来の時点はいつでも応答可能です。幸いなことに、Knockがここでの面倒な作業を担い、基盤となるメッセージとのインタラクションを追跡するWebhookを介してエージェントWorkerにイベントをルーティングします。この場合は、「承認」または「却下」ボタンをクリックします。
まず、Knockダッシュボード内にmessage.interacted
webhookハンドラーを設定し、インタラクションをWorkerに、そして最終的にはエージェントプロセスに転送します。
ここの例では、承認のクリックをWorkerにルーティングして処理し、確認用URL
と排除_URL
の最後にKnockメッセージIDを追加して、送信された特定のメッセージに対してエンゲージメントを追跡します。これは、Knockのメッセージテンプレートの中にあるリキッドを介して行っています: {{ data.approve_url }}?messageId={{ current_message.id }}
。ここで注意すべき点は、これが本番アプリケーションの場合、このエージェントが実行しているわけではなく、別のアプリケーションで承認のクリックを処理する可能性があるということです。このデモのためだけに、こちらに設置しました。
リンクがクリックされると、workerにハンドラーがあり、KnockのメッセージインタラクションAPIを使ってメッセージが対話済みであるとマークし、ステータスをメタデータとして通過させて、後で使用できるようにします。
import Knock from '@knocklabs/node';
import { Hono } from "hono";
const app = new Hono();
const client = new Knock();
app.get("/card-issued/approve", async (c) => {
const { messageId } = c.req.query();
if (!messageId) return c.text("No message ID found", { status: 400 });
await client.messages.markAsInteracted(messageId, {
status: "approved",
});
return c.text("Approved");
});
メッセージの対話は、当社が設定したWebhookを介してKnockからworkerに流れ、プロセスが完全に非同期であるようにします。Webhookの悪意のあるペイロードには、元のリクエストを生成したユーザーに関するメタデータを含む完全なメッセージが含まれ、リクエスト自体(この場合はツール呼び出しを含む)に関する詳細を保持します。
import { getAgentByName, routeAgentRequest } from "agents";
import { Hono } from "hono";
const app = new Hono();
app.post("/incoming/knock/webhook", async (c) => {
const body = await c.req.json();
const env = c.env as Env;
// Find the user ID from the tool call for the calling user
const userId = body?.data?.actors[0];
if (!userId) {
return c.text("No user ID found", { status: 400 });
}
// Find the agent DO for the user
const existingAgent = await getAgentByName(env.AIAgent, userId);
if (existingAgent) {
// Route the request to the agent DO to process
const result = await existingAgent.handleIncomingWebhook(body);
return c.json(result);
} else {
return c.text("Not found", { status: 404 });
}
});
名前付きの識別子によってアドレス指定されるエージェントの能力を利用して、リクエストをworkerからエージェントにルーティングします。この場合、これは userId
です。エージェントは耐久性のあるオブジェクトで支えられているため、incoming workerリクエストからエージェントを見つけて再開するまでのプロセスは簡単です。
遅延したツール呼び出しを再開します
次に、元のツール呼び出しに関するコンテキストを使用し、Knockに渡してエージェントにラウンドトリップし、ツールの実行を再開し、カードを発行します。
export class AIAgent extends AIChatAgent {
// ... other methods
async handleIncomingWebhook(body: any) {
const { toolkit } = await initializeToolkit(this);
const deferredToolCall = toolkit.handleMessageInteraction(body);
if (!deferredToolCall) {
return { error: "No deferred tool call given" };
}
// If we received an "approved" status then we know the call was approved
// so we can resume the deferred tool call execution
if (result.interaction.status === "approved") {
const toolCallResult =
await toolkit.resumeToolExecution(result.toolCall);
const { response } = await generateText({
model: openai("gpt-4o-mini"),
prompt: `You were asked to issue a card for a customer. The card is now approved. The result was: ${JSON.stringify(toolCallResult)}.`,
});
const message = responseToAssistantMessage(
response.messages[0],
result.toolCall,
toolCallResult
);
// Save the message so that it's displayed to the user
this.persistMessages([...this.messages, message]);
}
return { status: "success" };
}
}
ここでも多くのことが行われているため、重要な部分について順に説明します。
本文(Knockからのwebhookペイロード)を
handleMessageInteraction
メソッドを介して遅延ツール呼び出しに変換しようとします。先ほどインタラクション呼び出しに渡したメタデータのステータスが「承認」ステータスの場合、
ResumeToolExecution
メソッドを介してツール呼び出しを再開します最後に、LLMからメッセージを生成して保持し、ユーザーに承認されたカードについて知らせます
この最後のピースが配置されたことで、新しいカードの発行をリクエストし、エージェントから承認リクエストを送付し、承認メッセージを送信し、その承認をエージェントにルーティングして処理することができます。エージェントはカード発行リクエストを非同期的に処理し、遅延ツールコールはほとんど少ないコードで再開されます。
重複承認から保護
上記の実装の問題の1つは、誰かが承認ボタンを複数回クリックすると、複数のカードが発行される傾向があることです。この問題を是正するために、発行されているツールコールを追跡し、呼び出しが最も一度に処理されるようにしたいと考えています。
これを強化するために、データベースやRedisのような他の永続ストアに到達することなく、情報を保持することができるエージェントのビルトインステートを活用しますが、その気になれば絶対にそうすることもできます。エージェントプロセスの中で、IDによってツールの呼び出しを追跡し、現在のステータスを把握することができます。
type ToolCallStatus = "requested" | "approved" | "rejected";
export interface AgentState {
toolCalls: Record<string, ToolCallStatus>;
}
class AIAgent extends AIChatAgent<Env, AgentState> {
initialState: AgentState = {
toolCalls: {},
};
setToolCallStatus(toolCallId: string, status: ToolCallStatus) {
this.setState({
...this.state,
toolCalls: { ...this.state.toolCalls, [toolCallId]: status },
});
}
// ...
}
ここでは、ツール呼び出しの初期状態を空のオブジェクトとして作成します。また、クイックセクターヘルパーメソッドを追加して、インタラクションを容易にします。
次に、ツールの呼び出しを記録する必要があります。これを行うには、 requireHumanInput
ヘルパーのonAfterCallKnockオプションを使って、ツール呼び出しがユーザーにリクエストされたことをキャプチャします。
const { issueCard } = toolkit.requireHumanInput(
{ issueCard: issueCardTool },
{
// Keep track of the tool call state once it's been sent to Knock
onAfterCallKnock: async (toolCall) =>
agent.setToolCallStatus(toolCall.id, "requested"),
// ... as before
}
);
最後に、受信webhookを処理しているときに状態をチェックし、ツール呼び出しを承認としてマークする必要があります(簡潔にするために一部のコードを省略しています)。
export class AIAgent extends AIChatAgent {
async handleIncomingWebhook(body: any) {
const { toolkit } = await initializeToolkit(this);
const deferredToolCall = toolkit.handleMessageInteraction(body);
const toolCallId = result.toolCall.id;
// Make sure this is a tool call that can be processed
if (this.state.toolCalls[toolCallId] !== "requested") {
return { error: "Tool call is not requested" };
}
if (result.interaction.status === "approved") {
const toolCallResult = await toolkit.resumeToolExecution(result.toolCall);
this.setToolCallStatus(toolCallId, "approved");
// ... rest as before
}
}
}
まとめ
Agents SDKとKnockを使用すれば、ツールの呼び出しを遅らせる高度なヒューマンインザループエクスペリエンスを簡単に構築できます。
Knockのワークフロービルダーと通知エンジンは、エージェントに洗練されたクロスチャンネルメッセージを作成するためのビルディングブロックを提供します。SMS、プッシュ、メール、Slackでメッセージを送信し、ユーザーの通知設定を尊重するエスカレーションフローを簡単に作成することができます。また、Knockはユーザーが受信しているメッセージを完全に可視化することもできます。
Agents SDKの下にあるDurable Object抽象化は、中断や再開が簡単にできるグローバルにアドレス指定できるエージェントプロセスを提供することを意味します。Durable Objectの永続ストレージは、ユーザーごとの完全なチャット履歴や、エージェントを再開するために必要なその他の状態(ツール呼び出しなど)を保持できることを意味します。最後に、基盤となるDurable Objectのサーバーレス性質により、労力をかけずに水平方向に拡張し、多数のユーザーをサポートします。
マルチプレイヤーのヒューマンインザループ体験で独自のAIエージェントチャット体験を構築することをお考えの方は、このガイドに掲載されている完全なコードがGitHubで利用可能です。