订阅以接收新文章的通知:

构建 Jetflow:Cloudflare 实现灵活、高性能数据管道的框架

2025-07-23

9 分钟阅读时间
这篇博文也有 English日本語版本。

此内容已使用自动机器翻译服务进行翻译,仅供您参考及阅读便利。其中可能包含错误、遗漏,或与原始英文版本存在理解方面的细微差别。如有疑问,请参考原始英文版本。

Cloudflare 商业智能团队管理着一个 PB 级数据湖,每天从许多不同的来源获取数千个表的数据。包括 Postgres 和 ClickHouse 等内部数据库,以及 Salesforce 等外部 SaaS 应用程序。这些任务通常很复杂,而且表格每天可能会有数亿或数十亿行新数据。它们对于产品决策、增长规划和内部监控也是业务关键型。总体而言,每天大约会摄取 1410 亿行

随着 Cloudflare 的不断发展,数据量也变得越来越大、越来越复杂。我们的 Extract Load Transform (ELT) 解决方案已无法再满足我们的技术和业务要求。在评估了其他常见的 ELT 解决方案后,我们的结论是,它们的性能总体上也无法超越我们当前的系统。

显然,我们需要构建自己的框架来满足我们的独特需求——Jetflow 应运而生。

我们取得的成就

超过 100 倍的 GB-s 效率提升

  • 我们处理 190 亿行数据的运行时间最长的作业在使用 300 GB 内存 时需要 48 小时 ,现在,使用 4 GB 内存 只需 5.5 小时即可完成

  • 根据商业云提供商公布的费率,我们估计通过 Jetflow 从 Postgres 引入 50 TB 的成本可能不到 100 美元

超过 10 倍性能提升:

  • 我们的最大数据集速度为每秒 60-80,000 行,现在是每个数据库连接每秒 2-500 万 行。

  • 此外,这些数字对于某些数据库可以通过多个数据库连接进行扩展。

可扩展性:

  • 模块化设计易于扩展和测试今天 Jetflow 可以与 ClickHouse、Postgres、Kafka、许多不同的 SaaS API、Google BigQuery 等配合使用。它继续良好地运行并通过增加新的用例来保持灵活性。

我们是怎样做到的?

要求

设计新框架的第一步必须是清楚我们要解决的问题,并提出明确的要求,以防造成新的问题。

高性能和高效

我们需要能够在更短的时间内迁移更多数据,因为一些摄取作业需要大约 24 小时,而且我们的数据只会增长。数据应该以流传输的方式摄取,并且使用的内存和计算资源比我们现有的解决方案要少。

向后兼容

考虑到每天摄取数千个表,所选解决方案需要允许按需迁移单个表。由于我们使用 Spark 下游,而 Spark 在合并不同的 Parquet 模式时有限制,所选择的解决方案必须提供灵活性,以生成每种情况所需的精确模式,以匹配传统的模式。

我们还需要与我们的自定义元数据系统无缝集成,用于依赖项检查和作业状态信息。

易用性

我们想要一个可以进行版本控制的配置文件,不会因为许多并发更改而给存储库带来瓶颈。

为了提高团队中不同角色的可访问性,另一个要求是在绝大多数情况下使用无代码(或配置即代码)。用户不应该担心源系统和目标系统之间数据类型的可用性或转换,也不必为每个新的摄取编写新的代码。所需的配置也应该尽可能简单,例如,数据模式应该是从源系统推断出来的,而不需要由用户提供。

可自定义

在与上述无代码要求之间取得平衡,尽管我们希望降低进入门槛,但我们也希望可以根据需要提供调优和覆盖选项,提供一个灵活且可选的配置层。例如,写入 Parquet 文件通常比从数据库中读取更昂贵,因此我们希望能够根据需要分配更多的资源和并发。

此外,我们希望允许控制工作的执行位置,能够在不同的线程、不同的容器或不同的机器上启动并发 Worker。Workers 的执行和数据通信被抽象为一个接口,并且可以编写和注入不同的实现,通过作业配置来控制。

可测试

我们需要一个能够在容器化环境中本地运行的解决方案,这将允许我们为管道的每个阶段编写测试。对于“黑匣子”解决方案,测试往往意味着在更改后验证输出,这是一个缓慢的反馈循环,有可能因为内部对所有代码路径没有良好的可见性而无法测试所有的边缘情况,而且调试问题也很痛苦。

设计灵活的框架

为了构建一个真正灵活的框架,我们将管道分解为不同的阶段,然后创建一个配置层来定义这些阶段中管道的组成,以及任何配置覆盖。每个逻辑上有意义的管道配置都应该正确执行,并且用户不应该创建不起作用的管道配置。

管道配置

因此,我们在设计中创建了一个阶段,这些阶段根据有意义的不同类别进行了分类:

  • 消费者

  • 变换器

  • 加载程序

该管道是通过一个 YAML 文件构建的,需要一个消费者、零个或多个 Transformer 以及至少一个加载程序。消费者(通过从源系统读取)创建数据流,变换器(例如数据转换、验证)接受数据流输入并输出符合相同 API 的数据流,以便它们可以链接起来。Loader 具有相同的数据流式传输接口,但这些阶段具有持续影响,即保存数据的阶段丢弃在外部系统中的

这种模块化设计意味着每个阶段都是可独立测试的,并且具有从共享基本阶段继承的共享行为(例如错误处理和并发),从而显着减少新用例的开发时间,并增强对代码正确性的信心。

数据部门

接下来,我们为数据设计了一个分解,这允许管道在整个管道重新运行时以及由于暂时性错误而内部重试任何数据分区时都是幂等的。我们决定进行一种设计,让我们在维持有意义的数据划分的同时,进行并行处理,使管道能够在需要重试的地方执行数据清理。

  • Runinstance:最细粒度的划分,对应于管道单次运行的业务单位(例如,一个月/天/小时的数据)。

  • 分区: RunInstance 的一个划分,允许从行数据分配到一个分区,这种方式是确定性和不言而喻的,无需外部状态,因此在重试时具有幂等性。(例如 accountId 范围,10 分钟间隔)

  • 批处理:对分区数据的一种非确定性划分,仅用于将数据分解成较小的块进行流式/并行处理,以便使用更少的资源实现更快的处理。(例如,1 万行,50 MB)

用户在消费者阶段 YAML 中配置的选项,不仅可以构建用于从源系统检索数据的查询,还以一种与系统无关的方式对该数据划分的语义进行编码,以便后续阶段能够理解此数据划分的含义。例如,此分区包含所有帐户 ID 0-500 的数据。这意味着,我们可以进行有针对性的数据清理,例如,如果由于错误而重试单个数据分区,则可以避免重复的数据条目。

框架实施

阶段兼容性的标准内部状态

我们最常见的用例是从数据库读取,转换为 Parquet 格式,然后保存到对象存储,其中每个步骤都是一个单独的阶段。随着越来越多的用例加入到 Jetflow,我们必须确保,如果有人编写了一个新的阶段,它将与其他阶段兼容。我们不想造成这种情况:需要为每种输出格式和目标系统编写新的代码,或者最终会为每个不同的用例使用自定义管道。

解决这个问题的方法是让我们的阶段提取器类只允许以单一格式输出数据。这意味着,只要任何下游阶段在输入和输出格式中支持这种格式,它们就会与管道的其余部分兼容。回想起来,这似乎是显而易见的,但在内部这是一次痛苦的学习经历,因为我们最初创建了一个自定义类型系统,并为不同阶段的互操作性而苦苦挣扎。

对于这种内部格式,我们选择使用 Arrow,这是一种内存中列数据格式。这种格式给我们带来的主要好处是:

  • Arrow 生态系统:现在许多数据项目都支持 Arrow 作为输出格式。这意味着,当我们为新数据源编写提取器阶段时,生成 Arrow 输出通常很容易。

  • 无序列化开销:可以以最小的开销在机器甚至编程语言之间轻松移动 Arrow 数据。Jetflow 的设计从一开始就通过作业控制器接口灵活地通过,以便能够在各种系统中运行。这种数据传输效率意味着在创建分布式实现时对性能的影响最小。

  • 以固定大小的大批次保留内存,以避免内存分配:由于 Go 是一种垃圾收集 (GC) 语言,并且 GC 周期时间主要受对象数量而非大小的影响,因此更少的堆对象可以减少所花费的 CPU 时间进行大量垃圾收集,即使总大小相同。在一个 GC 周期中,要扫描和可能收集的对象数量会随着分配次数而增加。如果我们有 8192 行,每行 10 列,Arrow 只需要我们进行 10 次分配,而大多数驱动程序需要进行 8192 次分配。逐行分配,意味着 Arrow 可以减少对象数量和缩短 GC 周期时间。

将行转换为列

另一个重要的性能优化是减少读取和处理数据时发生的转换步骤数。大多数数据摄取框架在内部将数据表示为行。在我们的案例中,我们主要以 Parquet 格式写入数据,这是基于列的。从基于列的源(例如ClickHouse,其中大多数驱动程序接收 RowBinary 格式),转换为特定语言实现的基于行的内存表示的效率很低。然后,将其从行转换为列,以写入 Parquet 文件。这些转换会对性能产生显着的影响。

Jetflow 从基于列的源以列格式(例如 ClickHouse 原生的块格式)读取数据,然后将这些数据复制为 Arrow 列格式。然后直接从 Arrow 列写入 Parquet 文件。简化这个过程能够提高性能。

编写管道的每个阶段

案例研究:ClickHouse

在测试 Jetflow 的初始版本时,我们发现,由于 ClickHouse 的架构,使用额外的连接不会有任何好处,因为 ClickHouse 的读取数据的速度比我们接收数据的速度快。然后,通过更优化的数据库驱动程序,更好地利用这个连接来每秒读取更多的行,而不需要额外的连接。

最初,有一个为 ClickHouse 编写的自定义数据库驱动程序,但我们最终切换到出色的 ch-go 底层库,它直接从 ClickHouse 以列式格式读取区块。与标准的 Go 驱动相比,这对性能产生了显着的影响。结合上述框架优化,我们现在可以通过单个 ClickHouse 连接吸收每秒数百万行的数据

一个宝贵的经验是,与任何软件一样,通常是为了方便起见,或常见用例可能与您自己的用例不匹配而做出权衡。大多数数据库驱动程序往往不适合读取大批量的行,并且每行的开销很高。

案例研究:Postgres

对于 Postgres,我们使用优秀的 Jackc/pgx 驱动,但没有使用 database/sql Scan 接口,而是直接接收每一行的原始字节,并对每个 Postgres OID(对象标识符)类型使用 Jackc/pgx 内部扫描功能.

Go 中的 database/sql Scan 接口使用反射来理解传递给函数的类型,然后也使用反射将从 Postgres 接收的列值设置到每个字段。在典型场景中,这足够快速且易于使用,但就性能而言,我们的用例无法满足。Jackc/pgx 驱动程序会重用每次请求下一 Postgres 行时产生的行字节,从而导致每行零分配。这使我们能够在 Jetflow 内编写高性能、低分配的代码。通过这种设计,我们能够实现大多数表的每个 Postgres 连接每秒处理近 60 万行数据,而且内存使用量极低。

总结

截至 2025 年 7 月初,该团队每天通过 Jetflow 摄取 770 亿条记录。其余作业正在迁移到 Jetflow ,这将使每日总摄取量达到 1410 亿条记录。借助该框架,我们能够摄取原本无法实现的情况下的表。并且,由于摄取的运行时间更短、使用的资源更少,因此我们节省了大量成本。

未来,我们计划将该项目的源代码开源,如果您有兴趣加入我们的团队来帮助开发这样的工具,那么可以访问 https://www.cloudflare.com/careers/jobs/ 查看空缺职位。

我们保护整个企业网络,帮助客户高效构建互联网规模的应用程序,加速任何网站或互联网应用程序抵御 DDoS 攻击,防止黑客入侵,并能协助您实现 Zero Trust 的过程

从任何设备访问 1.1.1.1,以开始使用我们的免费应用程序,帮助您更快、更安全地访问互联网。要进一步了解我们帮助构建更美好互联网的使命,请从这里开始。如果您正在寻找新的职业方向,请查看我们的空缺职位
DataGo性能DesignEngineering

在 X 上关注

Cloudflare|@cloudflare

相关帖子