このコンテンツは自動機械翻訳サービスによる翻訳版であり、皆さまの便宜のために提供しています。原本の英語版と異なる誤り、省略、解釈の微妙な違いが含まれる場合があります。ご不明な点がある場合は、英語版原本をご確認ください。
大量のデータを扱うとき、簡単な概要を把握するのが役立ちます。これはまさにSQLで集約が提供するものです。「GROUP クエリ」と呼ばれる集約は全体像が示されるため、膨大な量のデータからインサイトを素早く得ることができます。
Cloudflareのサーバーレス分散型分析クエリエンジンであるR2 SQLでの集約対応を発表できることを嬉しく思います。R2 SQLは、R2データカタログに格納されたデータに対してSQLクエリを実行することができます。集約により、R2 SQLのユーザーはデータの重要な傾向や変化を見極め、レポートを作成し、ログの異常を発見することができます。
このリリースは、すでにサポートされているフィルタクエリに基づいており、分析ワークロードの基盤となり、ユーザーがApache Parquetファイルの干し草の山から針を見つけることができるようになります。
この記事では、集約の有用性と特異性を解き放ち、R2データカタログに保存された膨大な量のデータに対するクエリーの実行をサポートするために、どのようにR2SQLを拡張したかを掘り下げます。
集約(「クエリ別グループ」)で、元データの短い要約を生成します。
集約の一般的な使用例は、レポートの生成です。「売上」と呼ばれる表を想像してみてください。これには、ある企業のさまざまな国と部門の全売上の履歴データが含まれています。この集計クエリを使って、部門別の売上高に関するレポートを簡単に作成することができます。
SELECT department, sum(value)
FROM sales
GROUP BY department
「GROUP BY」文を使用することで、テーブル行をバケットに分割することができます。各バケットには、特定の部署に対応するラベルが付いています。バケットが満杯になると、各バケットの全行の「合計(value)」を計算し、対応する部門で行われる売上の総量を求めることができます。
レポートによっては、量が最も多い部門だけに関心があるかもしれません。そこで登場するのが「ORDER BY」文です。
SELECT department, sum(value)
FROM sales
GROUP BY department
ORDER BY sum(value) DESC
LIMIT 10
ここでは、すべての部門バケットを合計売上高で降順にソートし、上位10件のみを返すようにクエリーエンジンに指示しています。
最後に、異常をフィルタリングすることに関心があるかもしれません。たとえば、売上合計が5件を超える部門のみをレポートに含めたい場合があります。「HAVING」ステートメントを使うことで、簡単にそれを行うことができます。
SELECT department, sum(value), count(*)
FROM sales
GROUP BY department
HAVING count(*) > 5
ORDER BY sum(value) DESC
LIMIT 10
ここでは、クエリに新しい集計関数「count(*)」を追加しています。これは、各バケットの最終行数を計算するものです。これは、各部門の売上数に直接対応するため、「HAVING」ステートメントに予測条件を追加し、5行以上のバケットのみを残りするようにしました。
集約には2つのアプローチがあります:早い段階での計算
集計クエリには、どこにも保存されていない列を参照することができるという興味深い特性があります。「sum(value)」について考えてみましょう。このカラムは、R2に保存されたParquetファイルから取得される「部署」列とは異なり、クエリーエンジンによってその場で計算されます。この微妙な違いは、「sum」、「count」などの集計を参照するクエリは、2つのフェーズに分割する必要があることを意味します。
最初のフェーズは、新しい列を計算します。「ORDER BY」文を使って「count(*)」列でデータをソートしたり、「HAVING」文を使って行をフィルタリングする場合は、この列の値を知る必要があります。「count(*)」などの列の値が把握できたら、残りのクエリ実行に移ることができます。
クエリがHAVINGまたはORDER BY”で集約関数を参照せず、“SELECT”でそれらを使用している場合、騙すことができることに注意してください。集計関数の値は最後まで必要ないため、ユーザーに返す前に、それらを部分的に計算し、結果を統合することができます。
2つのアプローチの重要な違いは、集約関数を事前に、後で追加の計算を行う場合、その場で、ユーザーが必要とする結果を反復的に構築することもできるのです。
まず、その場で結果を構築していきます。これは「スキャッター集約」と呼ばれるテクニックです。そしてその上に、集計関数の上で「HAVING」や「ORDER BY」などの追加の計算を実行できる「シャッフル集約」を導入します。
「HAVING」と「ORDER BY」を使用しない集約クエリは、フィルタクエリに似た方法で実行できます。フィルタクエリの場合、R2 SQLはクエリ実行のコーディネーターとなるノードを1つ選択します。このノードはクエリを分析し、R2データカタログを参照して、どのParquet行グループにクエリに関連するデータが含まれているかを把握します。Parquetの各行グループは、単一のコンピューティングノードが処理できる比較的小さな作業を表します。コーディネーターノードは多くのWorkerノードに作業を分散し、結果を収集してユーザーに返します。
集約されたクエリを実行するために、すべて同じ手順に従い、Workerノード間で小さな作業を分散します。しかし今回は、「WHERE」ステートメントの前提条件に基づいて行をフィルタリングするだけでなく、Workerノードは事前集約も計算します。
プリ集約は、集約の中間状態を表します。これは、データのサブセット上で部分的に計算された集計関数を表すデータの不完全な断片です。複数の事前集約を結合して、集約関数の最終値を計算することができます。集約関数を事前集約に分割することで、集約の計算を水平にスケールすることができ、Cloudflareのネットワークにある膨大な計算リソースを活用することができます。
例えば、「count(*)」の事前集計は、データのサブセットの行数を表す数値です。最終的な「count(*)」の計算は、これらの数字を加算するのと同じくらい簡単です。「avg(値)」の事前集計は、「合計(値)」と「カウント(*)」の2つの数値で構成されています。「avg(value)」の値は、すべての「sum(value)」値を足し合わせ、すべての「count(*)」値を足し合わせ、最後に1つの数字を他の数字で割ることによって計算できます。
ワーカーノードは事前集計の計算を終えると、結果をコーディネーターノードにストリーミングします。コーディネーターノードは、すべての結果を収集し、事前集約から集計関数の最終値を計算し、結果をユーザーに返します。
スキャッターギャンブルは、コーディネーターがWorkerから小さな部分的な状態をマージすることで最終結果を計算できる場合、非常に効率的です。SELECT sum(sales) FROM ordersのようなクエリーを実行すると、コーディネーターは各workerから単一の数字を受け取り、それらを合計します。R2に存在するデータの量に関係なく、コーディネーターのメモリフットプリントは無視できます。
しかし、クエリが集計の結果に基づいてソートやフィルタリングを必要とする場合、この方法は非効率になります。売上高の上位2つの部門を見つけるクエリーについて考えてみましょう。
SELECT department, sum(sales)
FROM sales
GROUP BY department
ORDER BY sum(sales) DESC
LIMIT 2
グローバルトップ2を正しく判断するには、データセット全体の各部門の合計売上を知る必要があります。データは基盤となるParquetファイル全体にランダムに効果的に分散されるため、特定の部門の売上が多くの異なるWorkerに分割される可能性があります。同じ部署は、個々の労働者の売上が低い場合があり、ローカルトップ2リストから除外すれば、それらの労働者の合計売上がグローバルでは最高売上になります。
次の図は、このクエリに対して分散型収集アプローチが機能しないことを示しています。「Dept A」はグローバルなセールスリーダーですが、売上が従業員全体に均等に行き渡るため、ローカルのトップ2リストにランクインせず、コーディネーターから破棄されてしまいます。
したがって、クエリのグローバル集計による順序の結果がある場合、コーディネーターはWorkerからの事前フィルタリングされた結果に依存することはできません。分類する前に、全ワーカーから各部門の合計カウントをリクエストして、グローバル合計を計算する必要があります。IPアドレスやユーザーIDのような高カーディナリティ列でグループ化している場合、コーディネーターは何百万行もの行を取り込んでマージしなければならないため、単一のノードでリソースのボトルネックが発生します。
これを解決するためには、最終的な集計が行われる前に、特定のグループのデータを同じ場所に配置する方法であるシャッフルが必要になります。
そこで、ランダムなデータ分散の問題を解決するために、シャッフルステージを導入しました。コーディネーターに結果を送信する代わりに、Workerは互いに直接データを交換し、グループ化キーに基づいて行を同じ場所に配置します。
このルーティングは、決定論的なハッシュ分割に依存しています。Workerが行を処理する際、GROUP BY列をハッシュし、宛先Workerを識別します。このハッシュは決定性があるため、クラスター内のすべてのworkerが個別に特定のデータの送信先に同意します。「Engineering」ハッシュがWorker 5に送信された場合、すべてのWorkerは「Engineering」行をWorker 5にルーティングすることを知っています。中央レジストリは不要です。
下図は、この流れを示したものです。「Dept A」がWorkers 1、2、3上で始まることに注目してください。ハッシュ関数は「Dept A」をWorker 1にマッピングするため、すべてのWorkerはそれらの行を同じ宛先にルーティングします。
集計をシャッフルすると、正しい結果が得られます。しかし、この全面的な交換はタイミングの依存関係を生み出します。Worker 3がデータの送信を完了する前に、Worker 1が「Dept A」の最終合計を計算し始めた場合、結果は不完全になります。
これに対処するために、当社では厳格な同期バリアを適用しています。コーディネーターはクラスター全体の進行状況を追跡し、Workerは発信データをバッファリングし、gRPCストリーム経由でピアにフラッシュします。すべてのWorkerが入力ファイルの処理が完了したことを確認してからシャッフルバッファを消去すると、コーディネーターは続行するコマンドを発行します。このバリアにより、次のステージが開始された時に、各Workerのデータセットが完全かつ正確であることを保証します。
同期バリアが取り去られると、すべてのworkerは割り当てられたグループの完全なデータセットを保持するようになります。Worker 1は「Dept A」の売上記録の100%を持ち、最終的な合計を確実に計算できるようになりました。
これにより、コーディネーターに負担をかけるのではなく、フィルタリングやソートのような計算ロジックをworkerにプッシュすることができます。例えば、クエリにHAVING count(*) > 5が含まれている場合、Workerは集約直後にこの基準を満たさないグループを除外できます。
この段階の終了時に、各Workerは所有するグループに対してソートされた最終的な結果のストリームを生成します。
パズルの最後のピースは、コーディネーターです。分散型収集モデルでは、コーディネーターがデータセット全体の集計とソートという費用のかかるタスクを担当しました。シャッフルモデルでは、その役割は変わります。
Workerはすでに最終的な集計を計算し、ローカルでソートしているため、コーディネーターはk-wayマージを実行するだけで済みます。すべてのWorkerにストリームを開き、結果を行ごとに読み込んでいきます。各workerからの現在の行を比較し、ソート順に基づいて「winner」を選択し、ユーザーに送信されるクエリ結果に追加します。
このアプローチは、特にLIMITクエリに強力なアプローチとなります。ユーザーが上位10の部署を要求すると、コーディネーターは上位10の項目を見つけるまでストリームを統合し、その後すぐに処理を停止します。残りの数百万行をロードまたはマージする必要はなく、計算リソースを過剰に消費することなく、運用の規模を拡大できます。
集約の追加により、R2 SQLは、データのフィルタリングに優れたツールから、膨大なデータセットのデータ処理が可能な強力なエンジンに変貌します。これは、スクレイピング、シャッフルなどの分散型実行戦略を実装することで実現します。Cloudflareのグローバルな計算能力とネットワークの規模を使って、データがある場所にコンピューティングをプッシュすることができるのです。
レポートの作成、大量のログの異常の監視、単にデータの傾向を見極めるなど、複雑なOLAPインフラストラクチャの管理やR2からのデータ移動にかかるオーバーヘッドなしに、そのすべてをCloudflareの開発者プラットフォーム内で簡単にできるようになりました。
R2 SQLの集約のサポートは、本日よりご利用いただけます。これらの新機能を、R2 Data Catalogのデータでどのように活用するか、楽しみにしています。