在處理大量資料時,快速獲取概觀是非常有幫助的——這正是 SQL 中的彙總所提供的功能。彙總,也被稱為「GROUP BY 查詢」,能提供鳥瞰式的視角,讓您能迅速從海量資料中獲得見解。
正因如此,我們非常激動地宣佈:R2 SQL 現已支援彙總功能。R2 SQL 是 Cloudflare 推出的無伺服器、分散式分析查詢引擎,能夠對儲存在 R2 Data Catalog 中的資料進行 SQL 查詢。彙總功能將協助 R2 SQL 使用者發現資料中的重要趨勢與變化、產生報告,並在記錄中找出異常。
此次發佈基於已支援的篩選查詢功能,後者是分析工作負載的基礎,允許使用者在 Apache Parquet 檔案的大量資料中找到所需的資訊。
在本文中,我們將詳細介紹彙總功能的用途和特點,然後深入探討我們如何擴展 R2 SQL 以支援在儲存在 R2 Data Catalog 中的海量資料上執行此類查詢。
彙總也稱為「GROUP BY 查詢」,可以產生底層資料的簡要匯總。
一個常見的彙總用例是產生報告。假設有一個名為「sales」的表格,其中包含某組織在各個國家/地區和部門的歷史銷售資料。我們可以使用如下彙總查詢輕鬆產生按部門統計的銷售報告:
SELECT department, sum(value)
FROM sales
GROUP BY department
我們可以使用「GROUP BY」陳述式,將表中的列劃分為多個儲存桶。每個儲存桶都有一個標籤,對應一個特定部門。當所有列被劃分進各自的儲存桶後,我們就可以對每個儲存桶中的所有列計算「sum(value)」,從而得到該部門的總銷售量。
對於某些報告,我們可能只關心銷售量最高的部門。這時,「ORDER BY」陳述式就派上用場了:
SELECT department, sum(value)
FROM sales
GROUP BY department
ORDER BY sum(value) DESC
LIMIT 10
這裡我們指示查詢引擎按照各部門的總銷售量降序排列,並僅返回前 10 個銷售量最高的部門。
最後,我們有時可能希望濾除異常資料。例如,我們只想在報告中包含總銷售量大於 5 的部門。我們可以輕鬆地透過「HAVING」陳述式實現這一點:
SELECT department, sum(value), count(*)
FROM sales
GROUP BY department
HAVING count(*) > 5
ORDER BY sum(value) DESC
LIMIT 10
我們在查詢中添加了一個新的彙總函式「count(*)」,它用於計算每個儲存桶中有多少列。這直接對應於該部門的銷售次數,因此我們也在「HAVING」陳述式中新增了一個條件,確保只保留那些列數大於 5 的儲存桶。
彙總查詢有一個有趣的特性:它們可以參照並不實際存在於原始資料中的欄。以「sum(value)」為例:這個欄是由查詢引擎在執行時動態計算出來的,而像「department」這樣的欄則是直接從儲存在 R2 上的 Parquet 檔案中讀取的。這個細微差別意味著,任何參照了如「sum」、「count」等彙總函式的查詢,都需要分成兩個階段來處理。
第一階段是計算新欄。如果我們要使用「ORDER BY」陳述式按「count(*)」欄對資料進行排序,或使用「HAVING」陳述式基於該欄篩選列,我們需要知道該欄的值。一旦知道「count(*)」等欄的值,我們就可以繼續執行查詢的其餘部分。
請注意,如果查詢在「HAVING」或「ORDER BY」子句中沒有參照彙總函式,但仍在「SELECT」子句中使用它們,我們可以使用一種技巧。由於我們直到最後才需要彙總函式的值,因此我們可以部分計算它們,並在準備向使用者傳回結果之前再合併結果。
這兩種方法之間的關鍵區別在於我們何時計算彙總函式:是提前算好,以便後續做更多處理;還是按需即時計算,邊處理邊建立最終結果。
首先,我們來探討「即時建立結果」的方式——我們稱之為「分散-聚集彙總」(scatter-gather aggregations)。接著在此基礎上,我們會介紹「洗牌式彙總」(shuffling aggregations),它支持在彙總函式之上執行額外的操作,例如「HAVING」和「ORDER BY」。
沒有使用「HAVING」和「ORDER BY」子句的彙總查詢能夠以類似於篩選查詢的方式執行。對於篩選查詢,R2 SQL 會選擇一個節點作為查詢執行的協調節點。該節點分析查詢內容,並查閱 R2 Data Catalog,以確定哪些 Parquet 列群組可能包含與查詢相關的資料。每一個 Parquet 列群組代表一個相對較小的任務單元,可由單個計算節點處理。協調節點將任務分發給多個工作節點,收集結果並傳回給使用者。
為了執行彙總查詢,我們遵循相同的步驟,將小任務分發給工作節點。但這一次,工作節點不僅要依據「WHERE」陳述式中的條件篩選列,還要計算「預彙總」(pre-aggregates)。
預彙總是彙總過程的中間狀態。它是對一部分資料做部分彙總後的不完整結果。多個預彙總可以被合併,以計算出彙總函式的最終值。將彙總函式拆分為多個預彙總,使我們可以水平擴展彙總計算,充分利用 Cloudflare 網路中的龐大計算資源。
例如,「count(*)」的預彙總結果就是一個數字,代表資料子集中列的數量。計算最終的「count(*)」就像將這些數字相加一樣簡單。「avg(value)」的預彙總結果包含兩個數字:「sum(value)」和「count(*)」。然後,可以透過將所有「sum(value)」值相加,將所有「count(*)」值相加,最後將第一個數位除以第二個數位來計算「avg(value)」的值。
當工作節點完成預彙總的計算後,會將結果資料流給協調節點。協調節點收集所有結果,根據預彙總計算出彙總函式的最終值,並將最終結果傳回給使用者。
當協調節點可以透過合併來自各個工作節點的小型、部分狀態來計算最終結果時,分散-彙總的方式非常高效。如果您執行類似 SELECT sum(sales) FROM orders 這樣的查詢,協調節點會從每個工作節點收到一個單一數值並相加。無論 R2 中儲存了多少資料,協調節點的記憶體佔用都可以忽略不計。
然而,當查詢需要根據彙總結果進行排序或篩選時,這種方式就會變得低效。考慮下面這個查詢——它用於找出銷售額最高的前兩個部門:
SELECT department, sum(sales)
FROM sales
GROUP BY department
ORDER BY sum(sales) DESC
LIMIT 2
要正確確定全域的前 2 名,需要知道整個資料集中每個部門的銷售總額。由於資料在底層 Parquet 檔案中是隨機分佈的,某個特定部門的銷售記錄很可能分散在許多不同的工作節點上。一個部門在每個單獨的工作節點上的銷售額可能都很低,因此不會進入任何本地的「前 2」清單,但在全域匯總後卻可能是銷售額最高的部門。
下圖展示了分散-彙總方法為何不適用於此查詢。「Dept A」是全球銷售額冠軍,但由於它的銷售記錄均勻分佈在多個工作節點上,它沒有進入某些本地的前 2 清單,最終被協調節點丟棄。
因此,當查詢按全域彙總結果排序時,協調節點無法依賴來自工作節點的預篩選結果。它必須向每個工作節點請求每個部門的銷售總額,以便在計算全域總數後進行排序。如果按高基數欄(如 IP 位址或使用者 ID)進行分組,這就會迫使協調節點接收並合併數百萬列資料,從而在單個節點上造成資源瓶頸。
為了解決這個問題,我們需要引入洗牌 (shuffling)——一種在最終彙總發生之前,將特定分組的資料重新聚集到一起的方法。
為了解決資料隨機分佈帶來的挑戰,我們引入了一個洗牌階段。工作節點不再將結果傳送給協調節點,而是直接相互交換資料,根據分組鍵將列資料進行歸類。
這種路由依賴於確定性雜湊分區。當工作節點處理一列資料時,它會對 GROUP BY 列進行雜湊計算,以確定目標工作節點。由於該雜湊是確定性的,叢集中的每個工作節點都能獨立地就特定資料應傳送到哪裡達成一致。例如,如果「Engineering」的雜湊值指向工作節點 5,那麼所有工作節點都知道應將「Engineering」相關的列路由到工作節點 5。無需中央註冊表。
下圖展示了這一流程。注意「Dept A」最初位於工作節點 1、2 和 3 上。因為雜湊函數將「Dept A」對應到工作節點 1,所有工作節點都會將這些列路由到同一個目的地。
洗牌彙總能夠產生正確的結果。然而,這種「全部對全部」(all-to-all) 的資料交換會引入時序依賴。如果工作節點 1 在工作節點 3 尚未完成傳送其「Dept A」資料份額時就提前開始計算最終總額,那麼結果將是不完整的。
為了解決這個問題,我們強制執行嚴格的同步屏障。協調節點追蹤整個叢集的進度,而工作節點則緩衝它們的輸出資料,並透過 gRPC 流刷新到對等節點。只有當每個工作節點確認已完成輸入檔案的處理並已刷新完洗牌緩衝區時,協調節點才會發出繼續執行的命令。這一屏障保證在進入下一階段時,每個工作節點上的資料集都是完整且準確的。
一旦同步屏障解除,每個工作節點都持有其被指派群組的完整資料集。此時,工作節點 1 擁有「Dept A」100% 的銷售記錄,並能確定地計算出最終總額。
這使得我們可以將篩選、排序等計算邏輯下推到工作節點執行,而不必讓協調節點承擔這些負擔。例如,如果查詢包含 HAVING count(*) > 5,工作節點可以在彙總完成後立即濾除不滿足該條件的群組。
在此階段的末尾,每個工作節點會為它所負責的群組產生一個已排序的最終結果流。
最後一塊拼圖是協調節點。在分散-彙總模型中,協調節點需要承擔對整個資料集進行彙總與排序的高開銷任務。而在洗牌模型中,它的角色發生了變化。
由於工作節點已經在本地計算出了最終的彙總結果並完成了排序,協調節點只需執行一次 k 路歸併 (k-way merge)。它會為每個工作節點打開一個資料流程,逐列讀取結果;比較每個工作節點當前列的排序值,根據排序規則選出「勝出者」,並將其加入即將傳回給使用者的最終查詢結果中。
這種方法對於 LIMIT 查詢尤其高效。如果使用者請求前 10 個部門,協調節點會在歸併過程中找到前 10 條記錄後立即停止處理,而無需載入或歸併剩餘的數百萬行資料。這樣可以在不大量消耗計算資源的前提下,支援更大規模的操作。
隨著彙總功能的加入,R2 SQL 從一個擅長篩選資料的工具,轉變為能夠在海量資料集上進行資料處理的強大引擎。這得益于我們實現了諸如「分散-彙總」與「洗牌」等分散式執行策略,使我們能夠將計算推送到資料儲存的位置,充分利用 Cloudflare 的全球計算與網路規模。
無論您是要產生報表、監控大批量記錄以發現異常,還是僅僅想從資料中洞察趨勢,現在都可以在 Cloudflare 開發人員平台內輕鬆完成這一切,而無需承擔管理複雜 OLAP 基礎架構的開銷,也不必將資料移出 R2。
R2 SQL 的彙總功能現已可用。我們非常期待看到您使用這些新功能處理 R2 Data Catalog 中的資料。