测试版本确实有助于收集反馈和迭代改进,但并非所有用户都愿意充当测试者,或能够容忍测试软件中偶尔出现的问题。有时您需要那个大而闪亮的“正式发布”标签(或博客文章),现在轮到 Workflows 了。
我们的无服务器持久执行引擎 Workflows 现已正式发布,它允许您在 Workers 上构建长时间运行的多步骤应用(又称为“状态机”)。
简而言之,这意味着它已经生产就绪,但这并不意味着 Workflows 将变得僵化。我们将继续扩展 Workflows(包括更多并发实例),引入新功能(如新的 waitForEvent
API),并使利用我们的 Agents SDK 和 Workflows 构建 AI 智能体更加便捷。
如果您喜欢代码胜于文字,可以通过如下命令快速安装 Workflows 入门项目,并开始探索代码和 API:
npm create cloudflare@latest workflows-starter --
--template="cloudflare/workflows-starter"
Workflows 如何工作?我可以用它构建什么?我如何考虑使用 Workflows 和 Agents SDK 构建 AI 智能体?请继续阅读。
使用 Workflows 构建
Workflows 是一个基于 Cloudflare Workers 构建的持久执行引擎,让您能够构建有韧性的多步骤应用。
在其核心,Workflows 实现了一个基于步骤的架构,应用中到步骤均可独立重试,并且步骤间的状态会自动持久化。这意味着即使某个步骤因暂时性错误或网络问题而失败,Workflows 也可以仅重试该步骤,无需从头重新启动整个应用。
在定义 Workflow 时,您需要将应用拆分为逻辑步骤。
每个步骤可以执行代码(
step.do
),让 Workflow 进入休眠状态(step.sleep
或step.sleepUntil
),或等待事件(step.waitForEvent
)。在 Workflow 执行时,它会自动保留每个步骤返回的状态,从而确保您的应用即使在发生故障或休眠期之后也能从中断的地方继续执行。
这种持久执行模型特别适用于这行如下任务的应用:在多个系统间协调,按顺序处理数据,或需要处理可能持续数分钟、数小时甚至数天的长时间运行任务。
Workflows 特别适合处理传统无状态函数难以应对的复杂业务流程。
例如,一个电子商务订单处理 Workflow 可能包括检查库存、使用某个支付方式扣款、发送确认邮件以及更新数据库 —— 这些都是独立的步骤。如果支付处理步骤因临时服务中断而失败,Workflows 将在支付服务恢复可用时自动仅重试该步骤,且不会重复库存检查或重新启动整个流程。
您可以在下面看到其工作原理:对服务的每个调用都可以建模为一个步骤,可以独立重试,并在需要时从该步骤开始恢复:
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';
// The params we expect when triggering this Workflow
type OrderParams = {
orderId: string;
customerId: string;
items: Array<{ productId: string; quantity: number }>;
paymentMethod: {
type: string;
id: string;
};
};
// Our Workflow definition
export class OrderProcessingWorkflow extends WorkflowEntrypoint<Env, OrderParams> {
async run(event: WorkflowEvent<OrderParams>, step: WorkflowStep) {
// Step 1: Check inventory
const inventoryResult = await step.do('check-inventory', async () => {
console.log(`Checking inventory for order ${event.payload.orderId}`);
// Mock: In a real workflow, you'd query your inventory system
const inventoryCheck = await this.env.INVENTORY_SERVICE.checkAvailability(event.payload.items);
// Return inventory status as state for the next step
return {
inStock: true,
reservationId: 'inv-123456',
itemsChecked: event.payload.items.length,
};
});
// Exit workflow if items aren't in stock
if (!inventoryResult.inStock) {
return { status: 'failed', reason: 'out-of-stock' };
}
// Step 2: Process payment
// Configure specific retry logic for payment processing
const paymentResult = await step.do(
'process-payment',
{
retries: {
limit: 3,
delay: '30 seconds',
backoff: 'exponential',
},
timeout: '2 minutes',
},
async () => {
console.log(`Processing payment for order ${event.payload.orderId}`);
// Mock: In a real workflow, you'd call your payment processor
const paymentResponse = await this.env.PAYMENT_SERVICE.processPayment({
customerId: event.payload.customerId,
orderId: event.payload.orderId,
amount: calculateTotal(event.payload.items),
paymentMethodId: event.payload.paymentMethod.id,
});
// If payment failed, throw an error that will trigger retry logic
if (paymentResponse.status !== 'success') {
throw new Error(`Payment failed: ${paymentResponse.message}`);
}
// Return payment info as state for the next step
return {
transactionId: 'txn-789012',
amount: 129.99,
timestamp: new Date().toISOString(),
};
},
);
// Step 3: Send email confirmation
await step.do('send-confirmation-email', async () => {
console.log(`Sending confirmation email for order ${event.payload.orderId}`);
console.log(`Including payment confirmation ${paymentResult.transactionId}`);
return await this.env.EMAIL_SERVICE.sendOrderConfirmation({ ... })
});
// Step 4: Update database
const dbResult = await step.do('update-database', async () => {
console.log(`Updating database for order ${event.payload.orderId}`);
await this.updateOrderStatus(...)
return { dbUpdated: true };
});
// Return final workflow state
return {
orderId: event.payload.orderId,
processedAt: new Date().toISOString(),
};
}
}
这种持久性、自动重试和状态持久化的组合,使 Workflows 非常适用于构建可靠的分布式应用,能够优雅地处理现实世界中的故障。
人机交互
Workflows 本质上就是代码,这为它们赋予了强大能力:您可以动态、实时定义步骤,按条件执行分支,并向任何所需的系统发起 API 调用。但有时,您还需要一个 Workflow 等待现实世界中的某个事件发生。
例如:
需要人工批准以继续进行。
传入的 webhook,例如来自 Stripe 付款或 GitHub 活动的 webhook。
状态更改,例如一个文件上传到 R2,会触发事件通知,然后将对文件的引用推送到 Workflow,以便后者处理文件(或由 AI 模型处理)。
Workflows 新增的 waitForEvent
API 就能帮您实现这一点:
let event = await step.waitForEvent<IncomingStripeWebhook>("receive invoice paid webhook from Stripe", { type: "stripe-webhook", timeout: "1 hour" })
然后,您可以从任何可以发出 HTTP 请求的外部服务:
curl -d '{"transaction":"complete","id":"1234-6789"}' \
-H "Authorization: Bearer ${CF_TOKEN}" \
\ "https://api.cloudflare.com/client/v4/accounts/{account_id}/workflows/{workflow_name}/instances/{instance_id}/events/{event_type}"
……或通过 Worker 本身内的 Workers API 向一个特定实例发送事件:
interface Env {
MY_WORKFLOW: Workflow;
}
interface Payload {
transaction: string;
id: string;
}
export default {
async fetch(req: Request, env: Env) {
const instanceId = new URL(req.url).searchParams.get("instanceId")
const webhookPayload = await req.json<Payload>()
let instance = await env.MY_WORKFLOW.get(instanceId);
// Send our event, with `type` matching the event type defined in
// our step.waitForEvent call
await instance.sendEvent({type: "stripe-webhook", payload: webhookPayload})
return Response.json({
status: await instance.status(),
});
},
};
您甚至可以使用 type
参数来等待多个事件,和/或使用 Promise.race
以便根据哪个事件最先接收到来继续执行:
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
let state = await step.do("get some data", () => { /* step call here /* })
// Race the events, resolving the Promise based on which event
// we receive first
let value = Promise.race([
step.waitForEvent("payment success", { type: "payment-success-webhook", timeout: "4 hours" ),
step.waitForEvent("payment failure", { type: "payment-failure-webhook", timeout: "4 hours" ),
])
// Continue on based on the value and event received
}
}
为了更详细一点可视化 waitForEvent
,我们假设有一个 Workflow,由监视 GitHub 存储库的代码审查代理触发。
如果没有等待事件的能力,我们的 Workflow 无法轻松获得人类批准以写回建议(甚至不能提交自己的 PR)。它可以轮询某个已更新的状态,但这意味着我们必须在任意时间段调用 step.sleep
,轮询存储服务以获取更新值,如果不存在则重复此操作。这需要大量代码,并存在可能出错的空间:
如果没有 waitForEvent,会更难将数据发送到运行中的 Workflow 实例
如果我们修改同一示例以整合新的 waitForEvent API,就可以在进行变更操作前等待人工批准:
将 waitForEvent 添加到我们的代码审查工作流程,以便它可以寻求明确的批准。
您甚至可以设想一个 AI 智能体自己代表人类发送和/或执行操作:waitForEvent
只是为 Workflow 提供了一种方法,可以在继续(或不继续)之前检索并暂停等待现实世界中的某件事发生变化。
最重要的是,您可以像调用 Workflows 中的任何其他步骤一样调用 waitForEvent
:您可以有条件地调用它,和/或多次调用,和/或在循环中调用。Workflows 就是 Workers:您拥有某种编程语言的全部能力,不受领域特定语言(DSL)或配置语言的限制。
定价
好消息:自最初的测试版公告以来,我们并没有做出太多调整!我们加上了 Workflows 存储状态所用存储的价格,并维持基于 CPU 和请求(调用)的定价,具体如下:
单位 | Workers Free | Workers Paid |
CPU 时间 (ms) | 10 ms / Workflow | 包含 3000 万 CPU ms/月 +0.02 美元/每增加 100 万 CPU ms |
请求 | 100,000 次/日 Workflow 调用(与Workers 共享) | 包含 1000 万/月 + 0.30 美元/每增加 100 万 |
存储 (GB) | 1 GB | 包含 1 GB 每月 + 0.20 美元/ GB-月 |
由于存储定价是新增项目,我们将到 2025 年 9 月 15 日才才开始对存储进行计费。对于超过 1 GB 限制的情况,我们将在收费前提前通知用户。默认情况下,Workflows 的存储状态将在三 (3) 天(Free 计划)或三十 (30) 天(付费计划)后过期。
这里的“CPU 时间”是指您的 Workflow 消耗计算资源的时间。它不包括等待API调用、推理 LLM 或其他 I/O(如写入数据库)所花费的时间。这看似微小,但在实践中会产生累积效应:大多数应用的 CPU 时间只有几毫秒,而挂钟时间达到数秒。一两个 API 响应加起来耗时就可能长达 100 - 250 毫秒。
按 CPU 计费,而不是按 Workflow 空闲或等待时所花费的时间计费。
Workflow 引擎往往会花费大量时间等待:从对象存储(例如 Cloudflare R2)读取数据,调用第三方 API 或 LLM (例如 o3-mini 或 Claude 3.7),甚至是查询数据库(例如 D1、 Postgres 或 MySQL)。使用 Workflows 与 Workers 完全相同:您不需要为应用的等待时间付费。
开始构建
现在,您已经明白 Workflows 是什么,如何工作,并且准备开始构建。下一步是什么?
访问 Workflows 文档,了解其工作原理、 Workflows API 和最佳实践
查看入门项目的代码
最后,只需点击几下,即可将入门项目部署到您自己的 Cloudflare 帐户: