このコンテンツは自動機械翻訳サービスによる翻訳版であり、皆さまの便宜のために提供しています。原本の英語版と異なる誤り、省略、解釈の微妙な違いが含まれる場合があります。ご不明な点がある場合は、英語版原本をご確認ください。
CloudflareのBusiness Intelligenceチームは、ペタバイト規模のデータレイクを管理し、多くの異なるソースから毎日数千のテーブルを取り込みます。これには、PostgresやClickHouseなどの内部データベースや、Salesforceのような外部SaaSアプリケーションが含まれます。これらのタスクはしばしば複雑で、テーブルは毎日何億行もの新しいデータ行があることがあります。また、製品の決定、成長計画、社内の監視にもビジネス上の重要な役割を担っています。毎日合計約1410億行が取り込まれています。
Cloudflareが成長するにつれ、データはこれまで以上に大きく複雑になりました。既存の抽出負荷変換(ELT)ソリューションでは、技術面とビジネス面での要件を満たすことができなくなりました。他の一般的なELTソリューションを評価した結果、そのパフォーマンスも概ね現行システムを上回らないという結論に至りました。
そして、当社独自の要件に対処するために独自のフレームワークを構築する必要があることが明らかになりました。そして、Jetflowが誕生したのです。
当社が実現したこと
100倍以上の効率向上(GB-S):
190億行で最も長時間実行されているジョブは、 300 GBのメモリ を使用して48時間かかっていましたが、4 GBのメモリ を使用して5.5時間で完了します
Cloudflareでは、クラウドプロバイダーが発表した料金を基に、Jetflow経由でPostgresから50TBのコストを取り込む場合、100ドル未満のコストが発生すると推定されています。
10倍以上のパフォーマンス向上:
最大のデータセットは毎秒60~80,000行を取り込んでおり、今ではデータベース接続ごとに毎秒200万~500万行になっています。
さらに、これらの数はデータベースによっては複数のデータベース接続を使っても拡張性があります。
拡張性:
モジュラー設計により、拡張やテストが容易になります。現在、JetflowはClickHouse、Postgres、Kafka、多くの異なるSaaS APIs、Google BigQuery、その他多くの企業と連携しています。新たなユースケースの追加にも柔軟に対応し続けてきました。
これを実現した方法
要件
新しいフレームワークを設計するための第一歩は、解決しようとしている問題を明確に理解し、新しいものを作るのです。
パフォーマンスと効率性
取り込みジョブによっては最大24時間かかることもあり、データは増加の一途になるため、より多くのデータをより短い時間で移動できるようにする必要がありました。データはストリーミング形式で取り込まれ、既存のソリューションよりも少ないメモリと計算リソースを使用する必要があります。
後方互換性
毎日何千ものテーブルが取り込まれることを考えると、このソリューションは必要に応じて個々のテーブルの移行を可能にする必要がありました。Sparkダウンストリームの使用と、異なるParquetスキーマのマージにおけるSparkの制限により、選択したソリューションは、レガシーと一致するために各ケースに必要な正確なスキーマを生成する柔軟性を提供する必要がありました。
また、依存関係チェックとジョブステータス情報に使用されるカスタムメタデータシステムとのシームレスな統合も必要でした。
使いやすさ
同時変更が多いリポジトリにボトルネックが発生することなく、バージョン管理できる設定ファイルが求められています。
チーム内の異なる役割のためのアクセシビリティを高めるために、もう1つの要件はノーコード(またはコードとしての構成)です。ユーザーは、ソースシステムとターゲットシステム間でデータタイプの可用性や変換を心配したり、新しい取り込みのたびに新しいコードを書く必要はありません。必要な設定も最小限に抑える必要があります。たとえば、データスキーマはソースシステムから推測できるものであり、ユーザーからの提供は必要ありません。
カスタマイズ可能
上記のノーコード要件とバランスを取るために、参入のハードルを低く抑えながら、柔軟でオプションの設定レイヤーで、必要に応じてオプションを調整し、オーバーライドできるオプションを持ちたいと考えています。例えば、Parquetファイルの書き込みは、データベースからの読み取りよりもコストが高いことが多いので、必要に応じて、より多くのリソースやコンカレンシーを割り当てることができるようにしたいのです。
さらに、私たちは別のスレッド、異なるコンテナ、または異なるマシンで同時Workerをスピンアップする機能によって、作業が実行される場所を制御できるようにしたいと考えました。Workersの実行とデータの通信はインターフェースで抽象化され、ジョブ設定によってさまざまな実装を書いて注入し、制御することができます。
テスト可能
私たちは、パイプラインのすべての段階でテストを書くことができる、コンテナ化された環境でローカルで実行できるソリューションを求めていました。「ブラックボックス」ソリューションでは、テストは変更を加えた後の出力の検証を意味することが多いですが、これはフィードバックループが遅く、すべてのコードパスの可視性が社内にないと、すべてのエッジケースをテストしないリスクがあり、デバッグが面倒になります。
柔軟な枠組みを設計する
真に柔軟なフレームワークを構築するために、パイプラインを異なるステージに分割し、設定レイヤーを作成して、これらのステージからのパイプラインのコンポジションと、あらゆる設定のオーバーライドを定義します。論理的に意味のあるパイプライン設定はすべて正しく実行されなければならず、ユーザーは機能しないパイプライン設定を作成することができないはずです。
パイプラインの構成
これが以下の意味を持つ異なるカテゴリーに応じて分類された段階を作る設計になりました。
消費者
トランスフォーマー(Transformers)
ローダー
パイプラインは、コンシューマー、0つ以上のトランスフォーマー、少なくとも1つのローダーを必要とするYAMLファイルを介して構築されました。コンシューマーは(ソースシステムからの読み取りで)データストリームを作成し、トランスフォーマー(データTransformations、検証など)は、同じAPIに準拠するデータストリームを入力/出力することで連鎖できるようにします。ローダーは同じデータストリーミングインターフェースを持ちますが、永続的な影響を持つステージ(つまり、データが保存されるステージ)です外部システムに提供することになります
このモジュラー設計は、各ステージが独立してテスト可能であり、共有動作(エラー処理やコンカレンシーなど)は共有ベースステージから受け継がれており、新たなユースケースの開発時間を大幅に短縮し、コードの正確性の信頼性を高めます。
データ分割
次に、パイプライン全体の再実行と、一時的なエラーによるデータパーティションの内部リトライの両方で、パイプラインが偽装されることを可能にするデータ内訳を設計しました。私たちは、パイプラインが再試行に必要なデータのクリーンアップを実行できるような有意義なデータ分割を維持しながら、処理を並列化できる設計を決定しました。
RunInstance:パイプラインの1回の実行に対応するビジネスユニットに対応する最も細かい分類(1か月/日/1時間のデータなど)。
パーティション:RunInstanceの分割で、外部状態なしに行データから決定論的かつ自明な方法で各行がパーティションに割り当てられるようにするため、再試行と偽装する必要がありません。(例:accountId範囲、10分間隔)
バッチ:非決定的な利用傾向で、ストリーミング/パラレル処理のためにデータをより小さなチャンクに分割し、より少ないリソースで高速処理を実現するためにのみ使用されます。(例:1万行、50MB)
ユーザーがコンシューマーステージYAMLで設定するオプションは、ソースシステムからデータを取得するために使用されるクエリを構築し、また、システムに依存しない方法でこのデータ区分の意味をエンコードし、これが何であるかを後続のステージが理解できるようにします。データが表します。例えば、このパーティションには、すべてのアカウントID 0~500のデータが含まれます。これは、標的を絞ったデータクリーンアップを行い、たとえば、エラーにより1つのデータパーティションが再試行される場合、重複するデータエントリを回避できることを意味します。
フレームワークの実装
ステージ互換性のためのStandard内部状態
最も一般的なユースケースは、データベースから読み取り、Parquetフォーマットに変換、そしてオブジェクトストレージに保存するようなもので、これらのステップはそれぞれ別個の段階となります。Jetflowにオンボードされるユースケースが増えるにつれ、誰かが新しいステージを書いた場合、それが他のステージと互換性があることを確認しなければなりませんでした。出力フォーマットやターゲットシステムごとに新しいコードを書く必要があるような状況を生み出したり、異なるユースケースごとにカスタムパイプラインを構築することになりたくありません。
この問題を解決する方法は、ステージ抽出クラスに単一形式でのデータ出力のみを許可することです。つまり、ダウンストリームのステージがこのフォーマットをサポートしている限り、入出力フォーマットでは、パイプラインの残りの部分と互換性があります。これは今にしては当たり前のことのように思えますが、当初、私たちはカスタム型システムを作成し、ステージの相互運用性に苦労していたため、社内では苦痛な学びを経験しました。
この内部フォーマットには、メモリ内の列挙データフォーマットであるArrow を使うことを選びました。この形式の主な利点は次のとおりです。
Arrowエコシステム:現在、多くのデータプロジェクトがアウトプットフォーマットとしてArrowをサポートしています。つまり、新しいデータソースのために抽出ステージを書く場合、Arrow出力を生成するのはたいてい些細なことです。
直列化のオーバーヘッドなし:これにより、最小限のオーバーヘッドで、マシンやプログラミング言語間でArrowデータを簡単に移動することができます。Jetflowは、ジョブコントローラーのインターフェイスを介して幅広いシステムで実行できる柔軟性を持つように最初から設計されており、データ転送のこの効率性により、分散実装を作成する際のパフォーマンス上の妥協が最小限に抑えられます。
メモリ割り当てを避けるために、大きな固定サイズのバッチにメモリを確保する:Goはガベージコレクション(GC)言語であり、GCサイクルタイムはオブジェクトのサイズではなくオブジェクト数によって主に影響を受けるため、ヒープオブジェクトが少なくなり、CPUに費やされる時間が減少します合計サイズが同じであっても、大幅にガベージを収集しています。GCサイクル中にスキャンし、収集するオブジェクトの数は、割り当て数に応じて増加するため、各10列を持つ8192行がある場合、Arrowは10回の割り当てを行うだけで、ほとんどのドライバーは8192回の割り当てを行うだけです。行ごとに割り当てるため、Arrowでオブジェクトをより少なく、GCサイクルタイムを短縮できます。
行を列に変換する
もう1つの重要なパフォーマンス最適化は、データの読み取りや処理時に発生するコンバージョンステップの数を減らすことでした。ほとんどのデータ取り込みのフレームワークは、内部的にはデータを行として表現します。当社の場合、データは主にParquet形式で書き込んでいます。これは列ベースです。列ベースのソース(例:ClickHouse(ほとんどのドライバーがRowBinary形式を受信します)、特定の言語実装のために行ベースのメモリ表現に変換するのは非効率的です。これをさらに行から列に変換し、Parquetファイルを書きます。これらのコンバージョンは、パフォーマンスに大きな影響を与えます。
Jetflowは、その代わりに、列ベースのソースからカラム形式(例:ClickHouse-native Blockフォーマット)でデータを読み取り、このデータをArrow列フォーマットにコピーします。解析ファイルは、矢印列から直接書き込まれます。このプロセスが簡素化されることで、パフォーマンスが向上します。
各パイプライン段階の書き込み
導入事例:ClickHouse
Jetflowの最初のバージョンをテストした時、ClickHouseのアーキテクチャ上、ClickHouseはデータを受信するよりも読み取りが速いため、追加の接続を使用してもメリットがないことがわかりました。そうすれば、より最適化されたデータベースドライバーによって、その単一の接続を活かして、追加の接続を必要とせずに、毎秒はるかに大きな行数を読み取ることが可能になるはずです。
当初、カスタムデータベースドライバはClickHouse用に書かれていましたが、最終的には優れたch-go 低レベルライブラリに切り替えられました。ch-goはカラムフォーマットでClickHouseからブロックを直接読み取るものです。これは、標準的なGoドライバーと比較してパフォーマンスに多大な効果がありました。上記のフレームワーク最適化と組み合わせて、1つのClickHouse接続で、毎秒数百万行を取り込みます。
学んだ貴重な教訓は、他のソフトウェアと同様に、利便性や一般的なユースケースのために、自分のものと一致しない可能性があるということです。ほとんどのデータベースドライバは、行の大量のバッチを読み取るために最適化されていない傾向があり、行ごとのオーバーヘッドが高いです。
導入事例:Postgres
For Postgresには、優れたjackc/pgxドライバーを使用していますが、database/sql Scanインターフェースを使用する代わりに、各行の未加工バイトを直接受信し、Postgres OID(オブジェクト識別子)タイプごとにack/pgx内部スキャン関数を使用します。 .
Goのdatabase/sql Scanインターフェースは、リフレクションを使用して関数に渡される型を理解し、リフレクションを使用して、Postgresから受け取った列の値で各フィールドを設定します。典型的なシナリオでは、これは十分に高速で使いやすいのですが、パフォーマンスの点では、今回のユースケースには不十分です。jackc/pgxドライバは、次のPostgres行が要求されるたびに生成された行バイトを再利用するため、行ごとの割り当てはゼロになります。これにより、Jetflow内で高性能で低割り当てのコードを書くことができます。この設計により、メモリ使用量を非常に低く抑えながら、ほとんどのテーブルでPostgres接続ごとに毎秒60万行近くの行を実現できます。
まとめ
2025年7月初旬、同チームはJetflow を通じて1日あたり770億 件のレコードを取り込んでいます。残りのジョブはJetflowへの移行が進んでおり、1日あたりの総取り込みレコード数は1410億レコードになります。このフレームワークにより、他の方法では不可能だったケースでテーブルを取り込むことが可能になり、また、少ない時間と少ないリソースで取り込みを実行できるため、大幅なコスト削減が実現しました。
将来的には、プロジェクトをオープンソース化する予定です。このようなツールの開発に取り組むことに興味がある方は、https://www.cloudflare.com/careers/jobs/で募集中の職種をご覧いただけます。