在处理大量数据时,快速获取概览是非常有帮助的——这正是 SQL 中的聚合所提供的功能。聚合,也被称为“GROUP BY 查询”,能提供鸟瞰式的视角,让您能迅速从海量数据中获得洞察。
正因如此,我们非常激动地宣布:R2 SQL 现已支持聚合功能。R2 SQL 是 Cloudflare 推出的无服务器、分布式分析查询引擎,能够对存储在 R2 Data Catalog 中的数据进行 SQL 查询。聚合功能将帮助 R2 SQL 用户发现数据中的重要趋势与变化、生成报告,并在日志中找出异常。
此次发布基于已支持的过滤查询功能,后者是分析工作负载的基础,允许用户在 Apache Parquet 文件的大量数据中找到所需的信息。
在本文中,我们将详细介绍聚合功能的用途和特点,然后深入探讨我们如何扩展 R2 SQL 以支持在存储在 R2 Data Catalog 中的海量数据上运行此类查询。
聚合也称为“GROUP BY 查询”,可以生成底层数据的简要汇总。
一个常见的聚合用例是生成报告。假设有一个名为“sales”的表,其中包含某组织在各个国家/地区和部门的历史销售数据。我们可以使用如下聚合查询轻松生成按部门统计的销售报告:
SELECT department, sum(value)
FROM sales
GROUP BY department
我们可以使用“GROUP BY”语句,将表中的行划分为多个存储桶。每个存储桶都有一个标签,对应一个特定部门。当所有行被划分进各自的存储桶后,我们就可以对每个存储桶中的所有行计算“sum(value)”,从而得到该部门的总销售量。
对于某些报告,我们可能只关心销售量最高的部门。这时,“ORDER BY”语句就派上用场了:
SELECT department, sum(value)
FROM sales
GROUP BY department
ORDER BY sum(value) DESC
LIMIT 10
这里我们指示查询引擎按照各部门的总销售量降序排列,并仅返回前 10 个销售量最高的部门。
最后,我们有时可能希望过滤掉异常数据。例如,我们只想在报告中包含总销售量大于 5 的部门。我们可以轻松地通过“HAVING”语句实现这一点:
SELECT department, sum(value), count(*)
FROM sales
GROUP BY department
HAVING count(*) > 5
ORDER BY sum(value) DESC
LIMIT 10
我们在查询中添加了一个新的聚合函数“count(*)”,它用于计算每个存储桶中有多少行。这直接对应于该部门的销售次数,因此我们也在“HAVING”子句中添加了一个条件,确保只保留那些行数大于 5 的存储桶。
聚合查询有一个有趣的特性:它们可以引用并不实际存在于原始数据中的列。以“sum(value)”为例:这个列是由查询引擎在运行时动态计算出来的,而像“department”这样的列则是直接从存储在 R2 上的 Parquet 文件中读取的。这个细微差别意味着,任何引用了如“sum”、“count”等聚合函数的查询,都需要分成两个阶段来处理。
第一阶段是计算新列。如果我们要使用“ORDER BY”语句按“count(*)”列对数据进行排序,或使用“HAVING”语句基于该列过滤行,我们需要知道该列的值。一旦知道“count(*)”等列的值,我们就可以继续执行查询的其余部分。
请注意,如果查询在“HAVING”或“ORDER BY”子句中没有引用聚合函数,但仍在“SELECT”子句中使用它们,我们可以使用一种技巧。由于我们直到最后才需要聚合函数的值,因此我们可以部分计算它们,并在准备向用户返回结果之前再合并结果。
这两种方法之间的关键区别在于我们何时计算聚合函数:是提前算好,以便后续做更多处理;还是按需即时计算,边处理边构建最终结果。
首先,我们来探讨“即时构建结果”的方式——我们称之为“分散-聚集聚合”(scatter-gather aggregations)。接着在此基础上,我们会介绍“洗牌式聚合”(shuffling aggregations),它支持在聚合函数之上执行额外的操作,例如“HAVING”和“ORDER BY”。
没有使用“HAVING”和“ORDER BY”子句的聚合查询能够以类似于过滤查询的方式执行。对于过滤查询,R2 SQL 会选择一个节点作为查询执行的协调节点。该节点分析查询内容,并查阅 R2 Data Catalog,以确定哪些 Parquet 行组可能包含与查询相关的数据。每一个 Parquet 行组代表一个相对较小的任务单元,可由单个计算节点处理。协调节点将任务分发给多个工作节点,收集结果并返回给用户。
为了执行聚合查询,我们遵循相同的步骤,将小任务分发给工作节点。但这一次,工作节点不仅要依据 WHERE 子句中的条件过滤行,还要计算“预聚合”(pre-aggregates)。
预聚合是聚合过程的中间状态。它是对一部分数据做部分聚合后的不完整结果。多个预聚合可以被合并,以计算出聚合函数的最终值。将聚合函数拆分为多个预聚合,使我们可以水平扩展聚合计算,充分利用 Cloudflare 网络中的庞大计算资源。
例如,“count(*)”的预聚合结果就是一个数字,代表数据子集中行的数量。计算最终的“count(*)”就像将这些数字相加一样简单。“avg(value)”的预聚合结果包含两个数字:“sum(value)”和“count(*)”。然后,可以通过将所有“sum(value)”值相加,将所有“count(*)”值相加,最后将第一个数字除以第二个数字来计算“avg(value)”的值。
当工作节点完成预聚合的计算后,会将结果流式传输给协调节点。协调节点收集所有结果,根据预聚合计算出聚合函数的最终值,并将最终结果返回给用户。
当协调节点可以通过合并来自各个工作节点的小型、部分状态来计算最终结果时,分散-聚合的方式非常高效。如果您执行类似 SELECT sum(sales) FROM orders 这样的查询,协调节点会从每个工作节点收到一个单一数值并相加。无论 R2 中存储了多少数据,协调节点的内存占用都可以忽略不计。
然而,当查询需要根据聚合结果进行排序或过滤时,这种方式就会变得低效。考虑下面这个查询——它用于找出销售额最高的前两个部门:
SELECT department, sum(sales)
FROM sales
GROUP BY department
ORDER BY sum(sales) DESC
LIMIT 2
要正确确定全局的前 2 名,需要知道整个数据集中每个部门的销售总额。由于数据在底层 Parquet 文件中是随机分布的,某个特定部门的销售记录很可能分散在许多不同的工作节点上。一个部门在每个单独的工作节点上的销售额可能都很低,因此不会进入任何本地的“前 2”列表,但在全局汇总后却可能是销售额最高的部门。
下图展示了分散-聚合方法为何不适用于此查询。“Dept A”是全球销售额冠军,但由于它的销售记录均匀分布在多个工作节点上,它没有进入某些本地的前 2 列表,最终被协调节点丢弃。
因此,当查询按全局聚合结果排序时,协调节点无法依赖来自工作节点的预筛选结果。它必须向每个工作节点请求每个部门的销售总额,以便在计算全局总数后进行排序。如果按高基数列(如 IP 地址或用户 ID)进行分组,这就会迫使协调节点接收并合并数百万行数据,从而在单个节点上造成资源瓶颈。
为了解决这个问题,我们需要引入洗牌 (shuffling)——一种在最终聚合发生之前,将特定分组的数据重新聚集到一起的方法。
为了解决数据随机分布带来的挑战,我们引入了一个洗牌阶段。工作节点不再将结果发送给协调节点,而是直接相互交换数据,根据分组键将行数据进行归类。
这种路由依赖于确定性哈希分区。当工作节点处理一行数据时,它会对 GROUP BY 列进行哈希计算,以确定目标工作节点。由于该哈希是确定性的,集群中的每个工作节点都能独立地就特定数据应发送到哪里达成一致。例如,如果“Engineering”的哈希值指向工作节点 5,那么所有工作节点都知道应将“Engineering”相关的行路由到工作节点 5。无需中央注册表。
下图展示了这一流程。注意“Dept A”最初位于工作节点 1、2 和 3 上。因为哈希函数将“Dept A”映射到工作节点 1,所有工作节点都会将这些行路由到同一个目标节点。
洗牌聚合能够产生正确的结果。然而,这种“全部对全部”(all-to-all) 的数据交换会引入时序依赖。如果工作节点 1 在工作节点 3 尚未完成发送其“Dept A”数据份额时就提前开始计算最终总额,那么结果将是不完整的。
为了解决这个问题,我们强制执行严格的同步屏障。协调节点跟踪整个集群的进度,而工作节点则缓冲它们的输出数据,并通过 gRPC 流刷新到对等节点。只有当每个工作节点确认已完成输入文件的处理并已刷新完洗牌缓冲区时,协调节点才会发出继续执行的命令。这一屏障保证在进入下一阶段时,每个工作节点上的数据集都是完整且准确的。
一旦同步屏障解除,每个工作节点都持有其被分配组的完整数据集。此时,工作节点 1 拥有“Dept A”100% 的销售记录,并能确定地计算出最终总额。
这使得我们可以将过滤、排序等计算逻辑下推到工作节点执行,而不必让协调节点承担这些负担。例如,如果查询包含 HAVING count(*) > 5,工作节点可以在聚合完成后立即过滤掉不满足该条件的分组。
在此阶段的末尾,每个工作节点会为它所负责的分组生成一个已排序的最终结果流。
最后一块拼图是协调节点。在分散-聚合模型中,协调节点需要承担对整个数据集进行聚合与排序的高开销任务。而在洗牌模型中,它的角色发生了变化。
由于工作节点已经在本地计算出了最终的聚合结果并完成了排序,协调节点只需执行一次 k 路归并 (k-way merge)。它会为每个工作节点打开一个数据流,逐行读取结果;比较每个工作节点当前行的排序值,根据排序规则选出“胜出者”,并将其加入即将返回给用户的最终查询结果中。
这种方法对于 LIMIT 查询尤其高效。如果用户请求前 10 个部门,协调节点会在归并过程中找到前 10 条记录后立即停止处理,而无需加载或归并剩余的数百万行数据。这样可以在不大量消耗计算资源的前提下,支持更大规模的操作。
随着聚合功能的加入,R2 SQL 从一个擅长过滤数据的工具,转变为能够在海量数据集上进行数据处理的强大引擎。这得益于我们实现了诸如“分散-聚合”与“洗牌”等分布式执行策略,使我们能够将计算推送到数据存储的位置,充分利用 Cloudflare 的全球计算与网络规模。
无论您是要生成报表、监控大批量日志以发现异常,还是仅仅想从数据中洞察趋势,现在都可以在 Cloudflare 开发人员平台内轻松完成这一切,而无需承担管理复杂 OLAP 基础设施的开销,也不必将数据移出 R2。
R2 SQL 的聚合功能现已可用。我们非常期待看到您使用这些新功能处理 R2 Data Catalog 中的数据。