• Home
  • コーポレートブログ Geniee’s BLOG
コーポレートブログ

Geniee’s BLOG

ジーニーは最先端の広告テクノロジーで
顧客の収益を最大化します。

はじめに

GENIEE インフラチーム片岡です。2019年に新卒として入社し、最初の二年間は DSP のフロントチームで管理画面の開発をしていましたが、2021年中頃からチームを異動し、今はインフラ寄りのお仕事をしています。
そのころから、DSP のレポート集計基盤を刷新するプロジェクトが動いており、なんやかんやあって遂に数ヶ月前にリプレイスが終わりました。今回はこれについてお話をしたいと思います。

片岡 崇史/高知工科大学大学院を卒業後 2019 年に新卒入社。R&D アドプラットフォームサプライサイド開発部 DevOps チーム所属。

レポート集計基盤について

弊社では、オープンソースの列指向 DBMS のひとつである ClickHouse を使ってレポートデータを閲覧できるようにしています。
過去の ClickHouse の利用については以下をご覧ください。

レポート集計基盤は、弊社の DSP から配信された広告の成果ログから成果を集計し、最終的に ClickHouse のレポートテーブルに結果を入れます。

旧レポート集計基盤

旧レポート集計基盤は以下のような流れになっていました(図1)。ログを出力するサーバは、ログを Logstash によって Kafka に転送します(図1①)。Flink は Kafka からログを読んで集計処理を行います(図1②)。Flink アプリケーションの処理は大きく分けて、重複排除を行うステップ、数値を集約するステップ、ClickHouse のレポート DB に格納するデータの形に変換するステップがあります。各ステップの処理結果は kafka に転送され、次のステップは先のステップの結果を Kafka から読んで処理を行います。最後に ClickHouse は集計結果を Kafka から読み、レポートテーブルに格納します(図1③)。

↑図1

旧レポート集計基盤の辛かったところ

旧レポート集計基盤では主に以下が問題になっていました。

  1. 成果ログを使った調査が面倒
  2. Flink を運用保守できる人が少ない(再集計が必要になったときの作業が面倒)
  3. Logstash が何故か安定しない

運用上、生の成果ログを使った調査を行いたいことがしばしばあります。しかし、ログファイルは大きいので調査対象の期間が長くなると検索するだけで何十分と時間がかかることもあります。
2つ目の問題として、Flink アプリケーションの保守&運用が難しく、誰も触りたがらないものとなってしまっていてなんとかしなければいけません。
また、ログパイプライン上で何か問題が起きて流れているログが一部欠損したような場合は、その時間帯のログからレポートを再集計する必要がありますが、そのときの手順も複雑でかなり面倒なものとなっていました。この手順のミスで再び再集計作業が必要になることもありました。
ログパイプライン上で問題が起きやすかったのは、ログ転送エージェントとして利用していた Logstash です。突然ログの転送が停止し、「よくわからないが再起動したらなおったのでヨシ」ということもしばしばありました。バージョンアップやチューニング、Logstash のソースコード調査などを行いましたが、結局解決されず原因は謎のままです。

方針と期待する効果

新しいレポート集計基盤の方針としては以下のようになりました。

  1. Flink は撤廃し、成果ログをそのまま ClickHouse に流し、ClickHouse 上で集計を行う
    a. ClickHouse に成果ログを保持するテーブルを設けることで、成果ログの調査 が簡単になる
    b. Kafka のストレージ容量・通信量が削減される
  2. ログ転送エージェントを Logstash から td-agent-bit に変える
  3. ログ形式を JSON に変更する

まず Flink は撤廃することにします(さようなら)。代わりに成果ログの処理を行うのは ClickHouse で行うことにしました。弊社ではこれまで ClickHouse をレポート閲覧・分析用の DB としてのみ利用していたので新しい使い方にはなりますが、先輩方のこれまでの知見もありこれ自体に大きな問題もなく実現することができました。使い方としては基本的に普通の SQL なのでチームの人員に入れ替わりがあっても対応できそうです。
これに伴い、生の成果ログを ClickHouse に流すことになるわけですが、この成果ログを ClickHouse のログテーブルとして一定期間保持するようにすることにしました。これによって、その保持期間の間は調査に必要なログを SQL を使って取得することができます。成果ログを頻繁に調査する人にとっては結構嬉しい改善です。
先に述べたように、旧集計基盤の Flink アプリケーションは、各ステップの処理結果のデータが Flink と Kafka の間で往復していました。今回の変更はこのやりとりを無くすことになるので、Kafka のストレージ容量と通信量が大きく削減されます。
ログ転送エージェントの Logstash が安定しない問題の対策として、これを td-agent-bit (fluent-bit) に置き換えることにしました。弊社ではログ転送エージェントに td-agent (fluentd) を使っている部分が多いですが、以下の理由で td-agent-bit を選択しました。

  • Kafka にログ転送を行うにあたり、td-agent-bit は librdkafka のバージョンが新しいものが使われていてかつ細かい設定が可能である
  • CPU 使用率が大きく減少した。遅延が減ると期待できる

また、元々のログ形式は LTSV でしたが、これを JSON にすることにしました。多少ログのサイズが大きくなることが予想されましたが、代わりにログを扱いやすくすることを目指しました。

新しいログパイプラインとレポート集計系

以上を踏まえて設計すると以下のようになりました。(図2)

↑図2Kafka にログを流すところまでは、ログ転送エージェントが変わったこと以外は基本的に同じです。
上で述べたように、生ログの処理も ClickHouse で行うのですが、これまでレポート閲覧に使っていた ClickHouse クラスタ(以下、閲覧用クラスタと呼ぶ)とは別に、新しくログ処理のための ClickHouse クラスタ(以下、ログ処理用クラスタと呼ぶ)を作りました。Kafka からログを読むのはこのログ処理用クラスタのみになります(図2②)。このクラスタは Kafka から読んだログをパースして一定期間保持します。また、パース済みログを ClickHouse の Materialized View を利用して結果を閲覧用クラスタのレポートテーブルに挿入します(図2③)。このレポートテーブルはテーブルエンジンに SummingMergeTree を使っており、ここで数値が集計されます(後にもう少し詳しく説明します)。このようにクラスタを分けることにより、多くのレポート分析クエリが走って負荷が大きくなってもログ処理系に影響が出ないようにしました(その逆も同様)。
旧レポート集計系の良くなかったところとして、広告配信コストの計算のようなビジネスロジックの一部を Flink で行っていたということがあります。基本的にビジネスロジックは、ログを作るアプリケーション側で処理をして結果をログに落とすという形を取っていたので、ビジネスロジックを処理する場所が分散していました。そのため、今回 Flink に残っているビジネスロジックは全てログを作るアプリケーション側に寄せる変更を行いました。これによって、ClickHouse の集計はログに書いてある値を集約するだけで可能になりました。

ClickHouse 上でのレポート集計に使った機能

簡単にですが、ClickHouse 上でのレポート集計を支える機能の一部をご紹介します。

Kafka table engine

Kafka からログを読むのは Kafka table engine を利用しています。このテーブルを select することで Kafka のデータを consume して読むことができます。consume するので同じデータが読めるのは一度だけで、実際には以下で説明する Materialized View を使ってデータを読みます。

Materialized View

一般的な DB の Materialized View は、クエリ結果を期限付きでキャッシュする形で動作しますが、ClickHouse はそうではありません。ClickHouse の Materialized View は参照しているテーブルにレコードが挿入されたのをトリガーとして、そのレコードのみに対して処理を行い、結果をその Materialized View または指定する別テーブルに挿入します(別テーブルに挿入した結果はもちろん消えることはありません)。差分に対してのみ処理が行われるため高速に処理してくれます。

新レポート集計基盤では主に以下の用途で Materialized View を使っています。

  1. 生ログテーブルを読み、パースしてパース済みのログテーブルに挿入する
  2. パース済みのログテーブルを読み、レポートテーブル(SummingMergeTree table engine)に挿入する

1 では JSON 形式のログをパースして、必要なプロパティをカラムにしてログテーブルに挿入しています。
最終的なレポートテーブルは SummingMergeTree table engine を使っています。SummingMergeTree table engine は、挿入されたレコードを、テーブル定義の中で明示的に指定した特定のカラム(数値の型)のみ足し上げてくれます。2 の Materialized View が各成果情報を SummingMergeTree のレポートテーブルに挿入することによって、レポートテーブル上で必要な各種広告配信の指標(インプレッション数やクリック数、その他配信費など)が足し上げられ、結果が閲覧できます。

JSON を扱うための関数群

ClickHouse では JSON を扱うための SQL の関数が用意されており、我々はこれを利用して JSON のパースを行っています。C++ や simdjson を使って実装されているため、大量のログも高速に処理できています。

改善されたこと

当初の狙いだった以下のことについては達成することができました。

  1. 成果ログの調査が楽になった
  2. レポート集計基盤が安定し、再集計が必要になることがなくなった
  3. Flink を完全撤廃することができた(以前より運用しやすい状態になった)

上記以外では、Kafka の転送量が減ったことによって Kafka のサーバ台数も減らすことができました。Kafka サーバは約半数減、Flink サーバ全台撤廃、ClickHouse の集計用クラスタに数台のサーバを追加となり、全体のサーバの増減としては 10 台以上のサーバ減となりました。
また、ログが落ちてからレポートに反映されるまでの遅延が大幅に短くなりました。旧レポート集計基盤ではピークタイムに約 30 分の遅延がありましたが、新レポート集計基盤では基本的に 2 分以内に収まっています。

おわりに

レポート集計基盤の刷新した件について、新旧の違いと改善できたこと等を説明しました。と、まあ全てが上手くいったような書き方をしてきましたが、間で多々問題があったので着手から完了までかなり時間がかかり、関係各所にはご迷惑をおかけしました。今回は多くの勉強・反省することがあったので、今後これを活かして頑張っていきたいと思います。

ClickHouseのテーブル構成を考え直してみた


今回が二度目の投稿になります。ClickHouse Meetup TokyoでジーニーのLT(Lightning Talk)を担当しました、R&D本部 元 基盤技術開発部の犬伏です。(現所属はマーケティングテクノロジー開発部)

記事その1: 「ClickHouse Meetup Tokyo」イベントレポート

記事その2: 「ClickHouse Meetup Tokyo」エンジニアレポート

LT担当者の視点からイベントを振り返りました。

今回の記事:「ClickHouseのテーブル構成を考え直してみた」

Meetupの後、過去に弊社で作成した構成をどのように修正したかを概説します。


目次

  • はじめに
  • 2017年に本ブログで紹介した内容の振り返り
    • 紹介した構造の説明
    • 紹介した構造の問題点
  • 新規に構築した構造の決定過程
    • 複数あり得たPARTITION構造
    • 頻出クエリ抽出
    • 性能評価の結果
    • 性能評価結果についての考察
  • 新規に構築した構造の詳細
  • 新規に構築した構造への移行手順
  • おわりに
  • おまけ
    • text_logについて

はじめに

昨年の ClickHouse Meetup からはだいぶ時間が経ってしまいました。部署異動に伴う業務のキャッチアップ、研修やBootcampなどが重なり、3月中としていた本記事の公開がとても遅れてしまい、申し訳ありませんでした。

前回の記事の公表後、ClickHouseのドキュメントに日本語版が(多くがまだ機械翻訳ではありますが)できたり、Changelogが整備されたり、昨年のMeetup内容にも触れたQiitaに記事が投稿されるなど、じわじわと日本でも広がりを見せているかと思われます。(ほんまか?)

2017年に弊社ブログで紹介した内容の振り返り

紹介した構造の説明

まずは、過去に弊社で作成した構成がどのようなものであったかを振り返ります。

テーブル構造:

  • 1日単位のデータを保持する日毎テーブル ( records_YYYYMMDD = SummingMergeTree(…) ) を作る
  • 1日単位のテーブルをまとめた全期間テーブル ( merge_recordes = Merge(…, ‘^records_[0-9]{8}$’) ) を作る

集計値の更新:

  1. 定期的に一時的な日毎テーブル( tmp_records_YYYYMMDD = SummingMergeTree(…) )を作り、ここへINSERTして新しい日毎テーブルを構築
  2. 新しい日毎テーブルの数値の妥当性を検証し、怪しければ警告を発して中断
  3. RENAME TABLEにより対象の日毎テーブルを差し替える

紹介した構造の問題点

弊社では現在、この構造にレプリケーションを加えた構造のクラスター「V2」と、これにさらにシャーディングを追加してデータ量も増やしたクラスター「V3」の2つのClickHouseクラスターがあります。

V2、V3それぞれの場合について、どのような問題が、どの程度の深刻度で存在していたかを説明します。

  • P1. 全期間テーブルへのクエリは大量の資源を消費し、かつ時間もかかる
    • 全期間テーブルは、V2ではマージテーブル、V3では分散テーブルのマージテーブルという構成であるが、それぞれが性質の異なった問題を生じる。
    • マージテーブル(V2): マージテーブルは内部的にはマッチするテーブル全てに対してクエリを発行する。例えば3年分のデータがDBに入っていれば、内部的には1000を超えるクエリが発行される。マージテーブルへのクエリの結果はこれらすべての内部クエリに対する結果を得るまで返されないので、最悪ケースに引っ張られてトータルでは時間もかかってしまう。特に日付横断的でフルスキャンに近いようなクエリの実行ではメモリ不足に陥ってしまう場合がある。
    • マージテーブル+分散テーブル(V3): マージテーブルの配下に分散テーブルが存在する場合、上記に加えてもっと厄介な問題が発生する。ClickHouseでは分散テーブルへのクエリに対しては他のシャードからデータを取得する必要があるが、マージテーブルによって N 個の内部クエリが発生した後、さらにシャードの数 S に対し、 S-1 個のコネクションが生成され、これと同じ数のクエリが外部シャードに対して発行される。デフォルト値は 1024 ( 根拠はこちら ) なので、年単位のデータを入れていけばすぐに限界が訪れる。また、 ClickHouse v19.7.3.9 時点ではこのようなクエリを投入すると再起動以外では ( KILL QUERY でも ) 停止できないクエリが生じてしまうという問題があった。 ( 注: 実験は行っていないが、この問題はこの修正で解消しているかもしれない。 )
  • P2. 大量の Replicated テーブルは ZooKeeper XID を高速に消費し XID overflow エラーを頻発させる
    • Replicated テーブルは、毎分ZooKeeperに対してリクエストを行うが、リクエストの度に順序一貫性を保つための XID をインクリメントする。これは符号付き32bit整数であるが、これは数千のテーブルに対しては一週間持つか持たないかという程度の猶予であり、また XID overflow となった場合、一時的にではあるが ZooKeeper とのセッションが断絶する。これは次のような問題を生じている
      • 一時期はより重篤な不具合があり、一部のテーブルが readonly になってしまうという状況もあったが、 ClickHouse v19.7.3.9 では改善されている。
      • また DROP TABLE などがアトミックな処理になっておらず、途中のZooKeeperリクエストが失敗すると中途半端な状態で放置される場合があったが、こちらも改善される見込みである。
    • このように、セッション断絶による致命的な不具合は既に修正されているものの、当時のバージョンにおいては INSERT や CREATE TABLE が中途半端に失敗することはあまり好ましい状況とは言えないため、弊社ではやむなく clickhouse-server を定期的に再起動し XID をリセットすることで対処しているが、これもやはり一時的とは言え部分的には可用性・耐障害性が低下しているため、望ましくない。
  • P3. RENAME TABLE の操作が遅延する場合がある
    • 置き換え対象のテーブルに対して実行中のSELECTがあると、ロックが解除されるまで実行されない。そのため、時間のかかるクエリがロックを開放しなければ、データの反映が遅延してしまう状況がある。
    • RENAME TABLE によってデータを更新するのは「V2」のみであるため、この問題については本記事では扱わない。

P1. によるメモリ消費は、仮に1日分のデータを集計するクエリであっても、インデックス ( 詳細はこちら ) が効かないような条件を指定すれば、その列はすべてメモリ上に展開されるため、全期間分の条件列がメモリ上に展開され、大きなメモリ消費に繋がります。 ( これは日付範囲で絞っているつもりのクエリでも、書き方が良くなければ同じ状況が発生します。これについては後述します。 )

単にこの問題に対処するだけであれば、1日分の結果を得る場合に限り、全期間テーブルではなく日毎テーブルを参照すれば良いのですが、期間が1日ではなく1週間や1月といった単位になってしまうと対応できなくなる他 ( 注: merge table function を使うことでこの問題には対処できるが、この機能を使うためには、 readonly=2 以上の権限が必要となる。この権限を持っていると、 max_memory_usage の変更など、管理者側が望まない設定変更も行うことができてしまうため、望ましくない。 ) 、「V3」では時間軸が一次元ではない(後述します)ために、この対策も良い解決策とはなりませんでした。

また P2. によるサーバー再起動は、タイミングを制御して行っているためクラスター全体としての可用性は保っているもの、再起動の時間帯にはそのサーバーで実行中のクエリが失敗してしまうため、クエリ単位では失敗してしまう確率を上げてしまっていました。

そこで、クラスターの安定性を向上させるとともに、あわよくば応答速度も高速化しようという目的で、抜本的にテーブルの構成を変更することとなりました。

新規に構築した構造の決定過程

新規にテーブル構造を設計するにあたり、これまで登場していなかったパーティションという概念が必要になるので、先にこれを説明します。

次に、 Partitioning key をどのように選択したかを順を追って説明します。Partitioning key は通常データの取り回し方から決定されるもので、ClickHouseでは月単位でのパーティショニングが推奨されていると言えるでしょう (注: 明言されているドキュメントがあったか定かではないが、想定する最大のパーティション数が1000-10000とされているほか、多くの例示において toYYYYMM が用いられている) 。

しかし私たちは、とある理由でパフォーマンス上の問題も考慮する必要がありました。この経緯や詳細については「複数あり得たPARTITION構造」で述べます。

次に私たちは、最終的なテーブル構造を選定するため、実際によく叩かれているクエリを集め、それらの典型的な集計期間を定めて、それぞれの構造に対してパフォーマンス計測を行いました。この手順の詳細及び結果、考察については「頻出クエリ抽出」及び「性能評価の結果」、「性能評価結果についての考察」で述べます。

複数あり得たPARTITION構造

私たちのClickHouseテーブルには、キーとなる日時 DateTime 型のカラムが2つあります。やや込み入った内容になるので、一旦進みたい方は、「統一時刻」と「現地時刻」という意味の違う2種類の日付が必要になった、とご理解ください。

★☆★ 込み入った説明ここから ★☆★

前提知識:

  1. ClickHouseの DateTime 型は時刻をUNIXタイムスタンプで保持する。
  2. UNIXタイムスタンプにはタイムゾーン情報は含まれない。UNIXタイムスタンプとタイムゾーンの組み合わせで、はじめて「ある国・地域のタイムゾーンでの時刻」を表現できる。

一つは正当なUNIXタイムスタンプを記録するカラムで、従ってすべての行が同じタイムゾーンで記録されるものです ( 説明のため「統一時刻」と呼びます ) 。

もう一つがトリッキーで、海外での配信成果の集計をその国・地域でのタイムゾーンに沿って行うために、「タイムゾーンをUTCとして取得すると現地での時刻を得られるUNIXタイムスタンプ」を記録するカラムです ( 説明のため「現地時刻」と呼びます ) 。例えば日本時間 (JST, UTC+9) であれば、通常のUNIXタイムスタンプに9時間分、 32400 (秒) を加えたものを記録します。こうしておくことで、UTCとして取得したときに、9時間進んだ時刻を取得できます。 ( 注: 本当は行ごとにタイムゾーン情報を保持できれば最も望ましかったのですが、そういった機能はなかった上、たとえあったとしても「現地時刻」の取得で計算が必要になるため、「統一時刻」と同等のクエリ性能を発揮しにくいため、 DateTime 型で2種類の時刻を保持するのが最も高速に要求を実現できると判断され、このような構成となりました。 )

★☆★ 込み入った説明ここまで ★☆★

私たちは日本時間での時間 (hour) 単位で集計してデータベースへの投入を行っていたため、再集計のことだけを考えれば、日本時間での日単位でパーティショニングしてしまえばよいのですが、その後に行われるバッチ類のSELECTクエリでは各国・地域のタイムゾーンでの日単位で行っているものも多く、これらについては日本時間での日単位でのパーティショニングのみであると、パーティションの中からは該当するものを発見できなくなり、すべてのパーティションのインデックスを参照しに行くことになってしまいます。

このオーバーヘッドがどの程度かを見積もるため、私たちは日本時間の日単位のみのパーティショニングと、日本時間の日単位と現地時間の日単位の組による二次元 (注: 見た目上は二次元ではあるものの、日本時間と現地時間の差分は一定の範囲に限定されているので、パーティション数の増え方は線形オーダーです。) のパーティショニングの両方を試そうと考えました。

頻出クエリ抽出

さて、パフォーマンステストのためには妥当なベンチマークが必要です。しかしこのベンチマークの用意は当初想定した以上に厄介なものでした。

クエリをいくつか用意すれば良い、と漠然と考えていましたが、実際にどういうクエリが速いと嬉しいかとなると、単純に全件取ってくるとか、特定の1列だけを取得してくるとか、集約単位 (group by) の細かさはどうするかとか、自由度が高すぎてわからなくなってしまいました。

そこで、実際に使われているクエリを16個程度 (注: 数は勘で決めた) 集め、それらの結果がどの程度改善するかを見ることでベンチマークとする、という方針を立て、クエリを集め始めました。このテーブルを使っているチームに「こういうクエリを使っているとか、こういうクエリが早くなると嬉しいとか、教えてください」と呼びかけ、協力を求めました。

しかしここでも課題が生じました。エンジニアに質問するとたいていバッチ処理のためのクエリが返ってきますが、実際に飛んできているクエリを見てみると、ビジネス側の社員も社内のRedashを経由してそれなりの数のクエリを投げていることがわかりました。こういったクエリもできるだけベンチマークに組み込みたくなりました。

そして最終的には、クエリログ (注: 公式ドキュメント: https://clickhouse.tech/docs/en/operations/system-tables/query_log/) からよく飛んできている、あるいは資源を多く消費しているクエリを集めることにしました。

しかし、クエリを集めると言っても、通常テンプレートとなるクエリが存在し、実際に投入されるクエリはその中の文字列や IN 節の括弧内、数値など、様々な部分が変化します。この部分を同じに扱ってやらないと同じクエリとみなすことができず、回数のカウントが漏れてしまいます。そこで私たちは次のようなクエリを作り、これを調査しました。

SELECT
    replaceRegexpAll(
        replaceRegexpAll(
            replaceRegexpAll(
                replaceRegexpAll(
                    replaceRegexpAll(query, '(--.*?)?\n', ''),  -- コメント・改行除去
                    '\\s+', ' '),                               -- 連続する空白文字を同一視
                '\\s*\\d+\\s*', '0'),                           -- 数を同一視
            '0(,0)*', ' 0 '),                                   -- 数・数列を同一視
        '\'.*?\'', '\'\'') AS query_template,                   -- 文字列を同一視
    any(query) as query_sample,
    count(*) AS execution_count
FROM
    system.query_log
WHERE
    is_initial_query                -- ユーザーが直接投入したクエリを対象とする
    AND
    NOT match(query, '.*INSERT.*')  -- INSERTクエリは除外する (INSERT ... SELECT は使っていない)
GROUP BY
    query_template
ORDER BY
    execution_count DESC
LIMIT 64
FORMAT Vertical

replaceRegexpAll によって、実際に実行されたクエリをそのテンプレートのようなものに変換して、この数を数えることで実行回数を調べました。

実際にはこのあと、バッチかヒトかを推測するためや使われなくなった古いクエリを除外するためにクエリの実行周期を調べたり、実行時間やメモリ消費量などの指標も加えて取得するようにするなども行いましたが、割愛します。

性能評価の結果

性能評価にあたって、テーブル構造以外に一つ注視すべき設定項目がありました。分散テーブルのマージテーブルに対しては ClickHouse v19.7.3.9 時点では誤った結果が返却されるため使えなかった、 distributed_aggregation_memory_efficient (注: 公式ドキュメント: https://clickhouse.tech/docs/en/sql-reference/statements/select/group-by/#select-group-by-in-external-memory) という設定項目です。(長いので、以下 DAME と略します。)

分散テーブルのマージテーブルという構成をやめ、パーティションを使うことにした結果、この設定項目も使えるようになったので、これを有効化した場合にそれぞれの構造でどの程度パフォーマンスが向上するかも加えて評価することにしました。

よって性能評価は以下の5種類の比較となりました。

  1. base(旧構成)
  2. single(新構成、日本時間のみのパーティショニング)
  3. single_DAME(新構成、日本時間のみのパーティショニング + DAME )
  4. twin(新構成、二次元のパーティショニング)
  5. twin_DAME(新構成、二次元のパーティショニング + DAME )

また、集めたクエリは以下の13個となりました。

  • No.3(頻度3位、現地時間で細かい単位で集約する、大量の行を返す重いクエリ)
  • No.5(頻度5位、現地時間で細かい単位で集約するが、結果行数は少ないクエリ)
  • No.7(頻度7位、現地時間で細かい単位で集約する、大量の行を返す重いクエリ)
  • No.8(頻度8位、日本時間で「直近24時間」を粗い単位で集約する軽いはずのクエリ)
  • No.9(頻度9位、日本時間で「直近28日間」を粗い単位で集約するちょい重いクエリ)
  • No.100(人間がたまに叩く、重いが速いと嬉しいクエリその0)
  • No.110(人間がたまに叩く、重いが速いと嬉しいクエリその1)
  • No.19(頻度19位、日本時間で、決められた4時間の間隔のデータを無差別に合計する監視用クエリ)
    • このクエリでは、旧構造についてのみ、日本時間での1日分のデータのみを集約していることを利用して、全期間テーブルではなくその日の日毎テーブルを直接参照するという最適化を行っている。
    • またこのクエリでは PREWHERE での最適化が行われておらず、インデックスで範囲を絞れないようになっていたため、実質的に意味のないベンチマークとなってしまった。
    • しかしながら、No.919にて最適化されたものの結果も得ているのでこちらを信用すれば良い。(呼び出し箇所は後で書き換えました。)
  • No.29(頻度29位、日本時間で、1ヶ月間のデータを荒い単位で集約するクエリ)
  • No.903(頻度3位を筆者が最適化したもの)
  • No.905(頻度5位を筆者が最適化したもの)
  • No.907(頻度7位を筆者が最適化したもの)
  • No.919(頻度19位を筆者が最適化したもの)

ベンチマークの計測方法は次のとおりです:

  • それぞれのテーブル構造に対して、若いナンバーから順に一つずつ実行していきます。これを9回繰り返します。
  • 他のクエリと重複する範囲をSELECTするなどによって有利不利が生じないよう、最初の2回の結果は捨てます。(注: この間に、データがOSによってキャッシュされることを期待します。)
  • その後の7回から、「最速値」及び「期待される値」を計算します。「期待される値」には、7回のうち遅い2回を除いた5回の平均を使います。(注: クラスターそれぞれの計算資源やネットワークなどを再現することが難しかったため、本番環境上でベンチマークを実行した都合上、運悪く他の重いクエリと同時に動作した場合に不利になってしまうのを回避しようという意図です。)
  • 実行エラー(すべてメモリ制限超過と確認済み)になった実行の実行時間及びメモリ消費量は、例外処理が面倒だったのでそれぞれ 999,999 ms , 999,999,999,999 Byte に設定しました。
  • パフォーマンス計測の際には、クラスター上でのクエリの実行についてのみ興味があるため、 FORMAT Null を指定することで結果を捨てています。
  • 結果の集計では、実行時間やメモリ消費量の統計をあとから楽に計算できるように、それぞれのクエリの先頭に定型コメントを付与した上で、あとからまとめてクエリログを参照しました。このようにしたことで、集計の計算も含めてクエリログに対するクエリ内で完結させることができました。

ベンチマーク結果は次のとおりです:

  • 左側は、大雑把に緑色が最良、薄いグレーが次点(1.2倍以内)、赤が最悪、白はその他です。中央のグラフは左側の対数プロット、右側のグラフは最良を1としたときの相対値です。
  • 上から、最速実行時間、期待実行時間、最小メモリ消費量、期待メモリ消費量です。

テーブル構造のSELECT性能比較

性能評価結果についての考察

実行速度上は、全体的に twin_DAME が多くの最善を叩き出しており、最速を逃した6つのうち4つはDAMEでないほうに僅差で負けているのみで、残った7番は全体的に僅差であり、19番はもともと勝ち目のない比較を行っているため考慮する必要がありません。

次にメモリ消費ですが、横並びから19番、905番、919番にてやや twin 系が多くのメモリを消費する傾向が見て取れます。

このことから、テーブル構造としては二次元の構造を採用することとしました。

ここからは蛇足ですが、No.3の旧構造では結局全ての実行がメモリエラーで落ちてしまいました。一応本番で何度かは成功しているはずのクエリなので、定常的なメモリ負荷があったのかもしれません。

またNo.3からNo.9までは無視できない確率で(少なくとも9回に1回以上は)メモリ不足で失敗してしまうことがありましたが、クエリを書き直すことである程度症状は緩和できています。(それでもNo.905では8倍近く時間を使ってしまっていますが…)

新規に構築した構造の詳細

ということで、旧来の「分散テーブルのマージテーブル」という構造は、「パーティション付きのReplicatedSummingMergeTree」という構造に置き換えられることになりました。

DDL上は、日毎テーブルの名前から日付部分を取り去って全期間テーブルのように改め、以下の Partitioning key の指定を加えるのみです。しかし、私たちの場合は後述する移行手順のために、直接古いマージテーブルを上書きすることはせず、別に新しい名前で新しいテーブルを作り、それのみを含むマージテーブルを新しく作って古いマージテーブルと差し替える ( RENAME TABLE ) という方法を取りました。

この部分をマージテーブルではなくビューテーブルを使うことも検討しましたが、 PREWHERE が使えなくなるなどマージテーブルの場合と比べて互換性を維持できなかったために棄却されました。

結局、マージテーブルの部分がパーティションに変化しただけであるため、基本的には旧来の全期間テーブル向けにかかれているクエリであれば書き換えなく動作します。(書き換えることでさらに速くなる可能性はありますが…)

CREATE TABLE our_database.shard_reports ON CLUSTER our_cluster
ENGINE = ReplicatedSummingMergeTree(...)
-- ここから
PARTITION BY (
    toYYYYMMDD(現地時刻みたいな日時カラム, 'UTC'),
    toYYYYMMDD(普通の日時カラム, 'Asia/Tokyo')
)
-- ここまで
ORDER BY ...

新規に構築した構造への移行手順

ディスク容量に余裕があったので、新しい構造でテーブルを作り、そこへINSERTの速度をプロダクトに影響が出ない範囲に抑えつつデータを流し込むことで基本的には対応できました。

しかし、ある程度時間のたった範囲のデータについては既に確定しているので特に考慮することなく入れ直すだけで済みますが、直近のデータは常に古いテーブルのほうへ流入してくるので対応にやや苦慮しました。

また、参照しているクエリをすべて把握しているわけでもなかったため、ユーザーからは同じテーブル名のデータが差し替わることが期待されました。

そのため、一度古いテーブルへのデータの流入を止め、直近分のデータを新しいテーブルに入れてから、互換性のためにマージテーブルを差し替え、その後新しいテーブルに向けてデータの流入を再開する必要がありました。この手順を時系列で表すと次のようになります。

概略詳細・遷移条件
A: 確認用の数値を取得D, G あたりで確認するため、過去分の各列の合計等を取得

(直近分は B のあとに行うが、過去分は分量が多く実行に時間がかかってしまうので、先に行っておくとよい)

B: Flinkを停止Flinkを停止し、ClickHouseへのデータ流入を止める。

【ClickHouseへの流入が止まったら goto C】

C: バッチを停止本当は長時間かかる可能性があるクエリは全部止めたいところ、 E 向けの布石
D: 直近データ移行すべてのreplicaについてreplicationが完了していることを確認したのち、直近分のデータを旧テーブルから新テーブルにINSERT

【数値の一致を見たら goto E】

E: 全期間テーブル置き換えRENAME TABLE によって行う

【入れ替え完了が確認されたら goto F】

F: Flinkを再開新しいテーブルの方にINSERTするように変更するのを忘れない(忘れるとだいぶ面倒なことに)
G: みまもる大丈夫そうならCで止めたバッチを戻す

様子がおかしかったら臨機応変に対応

テーブル構造更新の手順

この手順により、実際には無事「臨機応変に頑張る」ことなく移行を完了することができました。

おわりに

今回は、2017年に弊社ブログで紹介した内容を振り返り、そこで紹介されていたClickHouseのテーブルについて構造上の課題を指摘し、これをより良い構造で再構築した過程を、順を追って説明しました。

特にメモリ使用量やクエリの応答速度で悩まれている方は、パーティションを用いた構造に修正されることをおすすめします。

もし「次回」があれば、ClickHouseクラスターのバージョンアップ( v19.7.3.9 to v19.16.14.65)についての記事になるかと思います。

おまけ

text_log について

text_log は ClickHouse v19.14 以降で使える機能で、 /var/log/clickhouse-server/clickhouse-server.log に出ていたClickHouseのログをデータベース上のテーブルにも保存することができます。

特に、「何時頃のこういうエラーログ」という条件さえわかっていればすばやくログにアクセスできる、どういうエラーがどういうタイミングで発生しているのかの調査など、テーブルとして存在していることによる恩恵は大きいと感じました。

ただし、 system.query_log と同様にストレージ容量を食いつぶす可能性もあるので、パーティショニングや容量の監視は常に行う必要があります。

公式ドキュメント: https://clickhouse.tech/docs/en/operations/system-tables/text_log/

設定の書き方等参考PR: https://github.com/ClickHouse/ClickHouse/pull/6164/files

Date
Author
エンジニアの視点から、様々な技術、サービス開発秘話、イベントをご紹介していきます。 ジーニーエンジニアチーム
Tag
Back to top