株式会社クリアコード > ククログ

ククログ


fluent-plugin-elasticsearchのSnifferクラスについて

はじめに

fluent-plugin-elasticsearchはよく使われているプラグインの一つです。 このプラグインをメンテナンスするためには、Fluentdの知識だけでなく、Elasticsearchが今後どのようになっていくかも知っておく必要があります。 また、このプラグインはRed Hat社がメンテナンスしているOpenShiftのログコンポーネントの一部としても使われています。

elasticsearch-transportのSnifferクラスとは

elasticsearch-transportには定期的にクラスタの状況を監視するSnifferクラスがあります。このクラスではGET _nodes/httpというクラスタの状況を返答するAPIを叩いており、大抵の場合はこのAPIを叩いておけばElasticsearchクラスタの状況がfluent-plugin-elasticsearchが使っているelasticsearchクライアントに通知されます。 そのため、X-Packを用いない通常の使用方法では問題になりません。

k8sサービス化されたElasticsearchクラスタに接続する

k8sのサービスとはPodから生成したノードを一まとめにしたアクセス手段を提供します。k8sの世界観ではサービスのアクセス先は一定です。しかし、サービスを構成するノードの構成要素はある時は起動していますが、またある時は停止または破棄されています。このノード一つ一つにElasticsearchが立っていても通知速度よりも起動・破棄のサイクルが速ければGET _nodes/httpを使用しても欠点が目立つようになります。 そのため、k8sのサービス化されたElasticsearchクラスタには新たなSnifferクラスの実装が必要になります。

そこで、元々のSnifferクラスのhostsメソッドの実装を見てみると、以下のようになっています。

        # Retrieves the node list from the Elasticsearch's
        # [_Nodes Info API_](http://www.elasticsearch.org/guide/reference/api/admin-cluster-nodes-info/)
        # and returns a normalized Array of information suitable for passing to transport.
        #
        # Shuffles the collection before returning it when the `randomize_hosts` option is set for transport.
        #
        # @return [Array<Hash>]
        # @raise  [SnifferTimeoutError]
        #
        def hosts
          Timeout::timeout(timeout, SnifferTimeoutError) do
            nodes = transport.perform_request('GET', '_nodes/http').body

            hosts = nodes['nodes'].map do |id,info|
              if info[PROTOCOL]
                host, port = info[PROTOCOL]['publish_address'].split(':')

                { :id =>      id,
                  :name =>    info['name'],
                  :version => info['version'],
                  :host =>    host,
                  :port =>    port,
                  :roles =>   info['roles'],
                  :attributes => info['attributes'] }
              end
            end.compact

            hosts.shuffle! if transport.options[:randomize_hosts]
            hosts
          end
        end

nodes = transport.perform_request('GET', '_nodes/http').body の行でElasticsearchクラスタの情報を取りに行き、取りに行った情報から再度クラスタの情報を再構築しています。

もし、接続先のURLやIPアドレスが固定であれば、以下のようなSnifferクラスを作成し、ホスト情報を使い回す振る舞いをさせた方が良いです。

require 'elasticsearch'

class Fluent::Plugin::ElasticsearchIdempotenceSniffer < Elasticsearch::Transport::Transport::Sniffer
  def hosts
    @transport.hosts
  end
end

elasticsearchクライアントは独自Snifferを渡してそのクラスを元にクラスタ情報を再構築するようなカスタマイズをすることができます。

@sniffer = options[:sniffer_class] ? options[:sniffer_class].new(self) : Sniffer.new(self)

これらの変更をfluent-plugin-elasticsearchで扱うには以下のようにすると独自のSnifferクラスを用いてElasticsearchクラスタとやりとりできるようになります。

     config_param :pipeline, :string, :default => nil
     config_param :with_transporter_log, :bool, :default => false
     config_param :emit_error_for_missing_id, :bool, :default => false
+    config_param :sniffer_class_name, :string, :default => nil
     config_param :content_type, :enum, list: [:"application/json", :"application/x-ndjson"], :default => :"application/js
on",
                  :deprecated => <<EOC
 elasticsearch gem v6.0.2 starts to use correct Content-Type. Please upgrade elasticserach gem and stop to use this option
.

#...
     def client
       @_es ||= begin
         adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
         transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(get_connection_options.merge(
                                                                             options: {
                                                                               reload_connections: @reload_connections,
                                                                               reload_on_failure: @reload_on_failure,
                                                                               resurrect_after: @resurrect_after,
                                                                               retry_on_failure: 5,
@@ -287,7 +300,8 @@ EOC
                                                                               http: {
                                                                                 user: @user,
                                                                                 password: @password
-                                                                              }
+                                                                              },
+                                                                              sniffer_class: @sniffer_class,
                                                                             }), &adapter_conf)
         es = Elasticsearch::Client.new transport: transport
 

まとめ

fluent-plugin-elasticsearchのElasticsearchへのリクエストに関わるelasticsearch-transportのSnifferに関するお話を書きました。 記事と同様の働きをするパッチはfluent-plugin-elasticsearchのv2.11.5に取り込んでリリース済みです。 fluent-plugin-elasticsearchでk8sやnginxのプロキシを設置していて接続のリロード時に正常なElasticsearchクラスタの情報が取得できずに困っている場合はsniffer_class_nameの設定項目を初期値から変えてみたり、独自のSnifferクラスを定義したりしてみてください。

タグ: Fluentd
2018-08-22

fluent-plugin-elasticsearchのHTTPバックエンドを切り替えられるようにするには

はじめに

fluent-plugin-elasticsearchはよく使われているプラグインの一つです。 このプラグインをメンテナンスするためには、Fluentdの知識だけでなく、Elasticsearchが今後どのようになっていくかも知っておく必要があります。 取り掛かりとして、fluent-plugin-elasticsearchの構造をまず軽く説明します。fluent-plugin-elasticsearchのElasticsearchのAPIリクエストは自前で実装しているのではなく、elasticsearch, elasticsearch-api, elasticsearch-transportというgemに依存しています。それぞれ、ElasticsearchのRubyクライアントライブラリをカプセル化して共通のインターフェースで使用できるようにgem化したもの、APIリクエストをgem化したもの、HTTPリクエストの方式をgem化したものです。

elasticsearch-transport

この中で、今回はelasticsearch-transportについて取り上げます。elasticsearch-transportは複数のHTTPバックエンドを切り替えて使用することができます。

fluent-plugin-elasticsearchでは、これまで以下のようにHTTPリクエストのバックエンドライブラリとしてexconが固定で使われていました。

    def client
      @_es ||= begin
        excon_options = { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass }
        adapter_conf = lambda {|f| f.adapter :excon, excon_options } # f.adaptorに ':excon' 固定
        transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(get_connection_options.merge(
                                                                            options: {
                                                                              reload_connections: @reload_connections,
                                                                               reload_on_failure: @reload_on_failure,
                                                                               resurrect_after: @resurrect_after,
                                                                               retry_on_failure: 5,
                                                                               logger: @transport_logger,
                                                                               transport_options: {
                                                                                 headers: { 'Content-Type' => @content_type.to_s },
                                                                                 request: { timeout: @request_timeout },
                                                                                 ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
                                                                               },
                                                                               http: {
                                                                                 user: @user,
                                                                                 password: @password
                                                                               }
                                                                             }), &adapter_conf)
      es = Elasticsearch::Client.new transport: transport
# ...

exconの問題点

exconはHTTPのバックエンドライブラリとしては申し分がないのですが、keepaliveがデフォルトで有効にならないという問題がありました。 nginxなどのproxy配下ではkeepaliveが有効でないと接続が頻繁に切れ、効率的な転送が行えないという問題が報告されました。

elasticsearch-transportでFaradyアダプターのHTTPバックエンドを切り替えられるようにする

elasticsearch-transportはいくつかのHTTPバックエンドを使用することができます。その一つがFaradayアダプターを使用するものです。 Faradayはそれ単体ではHTTPを扱う統一的なインターフェースを提供するだけですが、実際のHTTPリクエストはHTTPを扱うライブラリに担当させます。 例えば、exconを使ってHTTPリクエストを出すには以下のようにします。

require 'excon'

client = Elasticsearch::Client.new(host: 'localhost', port: '9200') do |f|
  f.response :logger
  f.adapter  :excon
end

この状態では、exconのみしかHTTPのバックエンドに使用することができません。

例えば、typhoeus を使ってHTTPリクエストを投げるようにするには、

require 'typhoeus'
require 'typhoeus/adapters/faraday'

client = Elasticsearch::Client.new(host: 'localhost', port: '9200') do |f|
  f.response :logger
  f.adapter  :typhoeus
end

のようにすると、HTTPのリクエストはTyphoeusを使って投げられるようになります。

実際のプラグインに適用する

実際のfluent-plugin-elasticsearchにHTTPバックエンドを変更できるようにしたパッチは以下の通りです。 (out_elasticsearch部分のみ示します。)

diff --git a/lib/fluent/plugin/out_elasticsearch.rb b/lib/fluent/plugin/out_elasticsearch.rb
index 42e1a16..cb5f1c0 100644
--- a/lib/fluent/plugin/out_elasticsearch.rb
+++ b/lib/fluent/plugin/out_elasticsearch.rb
@@ -107,6 +107,7 @@ elasticsearch gem v6.0.2 starts to use correct Content-Type. Please upgrade elas
 see: https://github.com/elastic/elasticsearch-ruby/pull/514
 EOC
     config_param :include_index_in_url, :bool, :default => false
+    config_param :http_backend, :enum, list: [:excon, :typhoeus], :default => :excon
 
     config_section :buffer do
       config_set_default :@type, DEFAULT_BUFFER_TYPE
@@ -128,6 +129,7 @@ EOC
       raise Fluent::ConfigError, "'tag' in chunk_keys is required." if not @chunk_key_tag
 
       @time_parser = create_time_parser
+      @backend_options = backend_options
 
       if @remove_keys
         @remove_keys = @remove_keys.split(/\s*,\s*/)
@@ -207,6 +209,18 @@ EOC
       end
     end
 
+    def backend_options
+      case @http_backend
+      when :excon
+        { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass }
+      when :typhoeus
+        require 'typhoeus'
+        { sslkey: @client_key, sslcert: @client_cert, keypasswd: @client_key_pass }
+      end
+    rescue LoadError
+      raise Fluent::ConfigError, "You must install #{@http_backend} gem."
+    end
+
     def detect_es_major_version
       @_es_info ||= client.info
       @_es_info["version"]["number"].to_i
@@ -257,8 +271,7 @@ EOC
 
     def client
       @_es ||= begin
-        excon_options = { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass }
-        adapter_conf = lambda {|f| f.adapter :excon, excon_options }
+        adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
         transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(get_connection_options.merge(
                                                                             options: {
                                                                               reload_connections: @reload_connections,

前述の通り、f.adapter の部分へ切り替えたいHTTPバックエンドのgem名のシンボルを渡してあげれば良いことになります。 この記事では解説していませんが、バックエンドによってはTLSの設定方法に違いがある場合があるので新しいバックエンドを追加した際には無効なハッシュキーを渡さないように注意してください。

まとめ

fluent-plugin-elasticsearchのHTTPバックエンドをexconだけではなくtyphoeusも扱えるように改修したお話を書きました。 記事で引用したパッチはfluent-plugin-elasticsearchのv2.11.4に取り込んでリリース済みです。 fluent-plugin-elasticsearchでkeepaliveが有効にならず、困っている場合はtyphoeus gemをインストールした後、http_backend typhoeusの設定値を加えてkeepaliveが有効になるHTTPバックエンドをぜひ試してみてください。

タグ: Fluentd
2018-08-21

Fluentd UIのFluentd v1対応のロードマップ

fluentd-uiというFluentdの設定を管理できるWebアプリケーションがあります。 Fluentd v1 がリリースされる前から機能の追加やFluentdの新しい機能への対応はされていませんでした。

手を入れる前は、以下のような状態でした。

  • Rails 4.2.8
  • Fluentd v0.12
  • Vue.js v0.11.4
  • sb-admin-v2
  • filter非対応
  • label feature非対応
  • systemd非対応
  • reload非対応
  • Fluentd v1に対応していないプラグインがおすすめプラグインに載っている

2018年4月中旬から手を入れ始めて、新しいバージョンをいくつかリリースしました。

  • v0.4.5
    • 約一年ぶりのリリース
    • Rails 4.2.10に更新
    • 使用しているgemを一通り更新
    • poltergeistからheadless chromeに移行
  • v1.0.0-alpha.1
    • Rails 5.2.0に更新
    • Fluentd v1のサポートを開始し、Fluentd v0.12以前のサポートをやめた
    • Vue.js v2.5.16 に更新
    • startbootstrap-sb-admin 4.0.0 に更新
    • JavaScript まわりを sprockets から webpacker に移行 *1
  • v1.0.0-alpha.2
    • v1.0.0-aplha.1で動かない箇所があったのを修正した

今後の予定は以下の通りです。

  1. Fluent::Config::ConfigureProxy#dump_config_definitionで得られる情報を利用して、設定UIを構築する
  2. 複雑な設定を持つプラグインは個別にフォームを作成し、使いやすくする
  3. owned plugin (parserやformatterなど)の設定方法について検討し、実装する
  4. テストの修正
  5. filterサポート
  6. label featureサポート
  7. おすすめプラグインの更新
  8. issuesの対応

3まで完了したらalpha.3かbeta.1をリリースする予定です。

*1 CSSはsprocketsのまま

タグ: Fluentd
2018-06-08

fluent-plugin-elasticsearch v2.8.6/v1.13.4からのX-Pack向けの認証情報の設定方法の注意点

はじめに

fluent-plugin-elasticsearchはよく使われているプラグインの一つです。 このプラグインをメンテナンスするためには、Fluentdの知識だけでなく、Elasticsearchのエコシステムも知っておく必要があります。 Elasticsearch 5.xからnginxのリバースプロキシの認証だけでなく、Elastic Stack自体で認証をコントロールする仕組みがプラグインとして提供されています。

X-Packには、旧Shieldの機能であるBasic認証などの認証の仕組みがあります。 この認証の仕組みへの対応はElasticsearchのRubyクライアントであるelasticsearch-rubyも提供しています。

認証情報の設定方法の注意点

fluent-plugin-elasticsearchのこれまでの挙動はhost毎に認証情報を紐付ける形で設定していました。 v2.8.6/v1.13.4以降ではクラスタ毎に設定する設定ファイルの書き方が推奨されるようになります。*1 そのため、以下の設定方法は推奨されなくなることに注意してください。 X-Packを設定せずに運用している場合は設定変更の必要はありません。

hosts https://username:password@host-custom.com:443

という設定は推奨されなくなり、

hosts https://host-custom.com:443
user username
password password

という書き方が推奨されます。 前者の書き方ではElasticsearchのクラスタの情報の再読み込み時に認証情報が抜け落ちてしまうということが起きるためです。 後者の書き方では、クラスタの情報の再読み込み時に認証情報が抜け落ちてしまうということを防ぐ対策が動作するようになります。

まとめ

fluent-plugin-elasticsearch v2.8.6/v1.13.4 では、再読み込み時に接続先のElasticsearchの認証情報を保持することが可能となる対策が入っています。 X-PackでElasticsearchクラスタのBasic認証を有効化していて、リロード時に接続情報が失われて困っている運用者の方は是非バージョンアップしてみてください。

*1 これまで困っている方々が結構いたようです。例として https://github.com/uken/fluent-plugin-elasticsearch/issues/307 https://github.com/uken/fluent-plugin-elasticsearch/issues/257 などが報告されていました。

タグ: Fluentd
2018-04-10

td-agent3でGemfileベースのプラグイン管理

Fluentdのプラグイン管理はGemfileベースでやると、きちんとバージョンを管理できるのでよいです。 ドキュメントでもGemfileベースのプラグイン管理について書かれています。 しかし、記事執筆時点ではtd-agentでどのようにするのかは、書かれていませんでした。

td-agent3からsystemdに対応しているので、特に断りがない場合はsystemdについて書いています。 また、パッケージはいくつか種類がありますが主にdebパッージについて書きます。rpmの場合でも、serviceファイルなどの内容は同じなので、そのまま使える知識です。

使用するソフトウェアで特にバージョンを気にするものは以下の通りです。

  • td-agent 3.1.1
  • bundler 1.16.0
bundle install中にsudoを実行される

Fluentdは--gemfileオプションを指定すると、起動時に指定されたGemfileを使ってbundle installを実行します。 このとき、bundlerがある条件下でsudoを実行しようとしますが、td-agentを使用している場合、td-agentユーザーはsudoを使用できるようになっていないのでエラーになってしまいます。 なお、Fluentdは--gemfileオプションのみを指定すると、そのGemfileと同じディレクトリのvendor/bundleというディレクトリをBUNDLE_PATHとして使用します。

--gemfile /etc/td-agent/Gemfileのみ指定したとすると、以下のように動作します。

  1. BUNDLE_PATHは/etc/td-agent/vendor/bundle
  2. BUNDLE_PATHが存在しないので/etc/td-agentをパーミッションチェックの起点とする
  3. /etc/td-agent/build_info/* と /etc/td-agent/* がtd-agentユーザーで書き込み可能かどうかチェックする
  4. ↑で書き込み不可なファイルやディレクトリが一つでもあれば、sudoが必要と判定する

今の場合、td-agentをインストールした後、すぐにGemfileを置いてtd-agentを再起動したとすると以下のファイルやディレクトリのパーミションをチェックします。

  • /etc/td-agent
  • /etc/td-agent/td-agent.conf
  • /etc/td-agent/plugin

そのパーミッションは以下の通りで、pluginディレクトリとtd-agent.confはtd-agentユーザーから書き込みできません。

$ ls -laR /etc/td-agent/
/etc/td-agent/:
total 16
drwxr-xr-x  3 td-agent td-agent 4096 Mar  1 08:44 .
drwxr-xr-x 57 root     root     4096 Mar  5 06:14 ..
drwxr-xr-x  2 root     root     4096 Mar  1 08:43 plugin
-rw-r--r--  1 root     root     2381 Mar  1 08:44 td-agent.conf

よって、bundlerはsudoが必要だと判定してしまいます。

どのようにすればよいかは後述します。

td-agentユーザーのホームディレクトリが存在しない

これはdebパッケージ限定の問題ですが、masterでは修正済みです。 ホームディレクトリが存在しない場合、/tmp/bundler/home/vagrantのようなディレクトリをかわりに使用するため、マシンを再起動するとbundlerのキャッシュが消えます。 また、起動するたびにログにメッセージが残るのも本来着目すべきログを見つけづらくなります。

td-agent 3.1.1のメンテナスクリプトでは、以下のようにホームディレクトリ(/home/td-agent)を作成していませんでした。

adduser --group --system --no-create-home td-agent

ホームディレクトリを /var/lib/td-agent に変更して作成します。 /var/lib/td-agentにしたのはRPMに合わせるためです。また、システムユーザーは/var/lib以下にホームディレクトリを持つことが多いためです。

$ sudo usermod -d /var/lib/td-agent td-agent
$ sudo mkdir -p /var/lib/td-agent
$ sudo chown -R td-agent:td-agent /var/lib/td-agent
systemdのドロップインファイルについて

td-agentはsystemdに対応しているので、systemdの場合はtd-agentのオプションをカスタマイズするのに/etc/default/td-agent*1や/etc/sysconfig/td-agent*2は使いません。

ドロップインファイルというものを使用します。

$ sudo systemctl edit td-agent.service

を実行するとエディターが起動するので、内容を書いて保存すると/etc/systemd/system/td-agent.service.d/override.confというファイルを作成することができます。

td-agent 3.1.1 の場合は以下のようにすると、td-agentのオプションを変更することができます。

[Service]
Environment='TD_AGENT_OPTIONS=--gemfile=/etc/td-agent/Gemfile'
ExecStart=
ExecStart=/opt/td-agent/embedded/bin/fluentd --log /var/log/td-agent/td-agent.log --daemon /var/run/td-agent/td-agent.pid $TD_AGENT_OPTIONS

masterではExecStartに$TD_AGENT_OPTIONSが追加されているので以下のようにEnvironmentだけ指定すれば十分です。

[Service]
Environment='TD_AGENT_OPTIONS=--gemfile=/etc/td-agent/Gemfile'

参考: systemd - ArchWiki

設定例
td-agent 3.1.1

td-agent 3.1.1では次のようにします。

debの場合、td-agentユーザーのホームディレクトリを作成してください。

$ sudo usermod -d /var/lib/td-agent td-agent
$ sudo mkdir -p /var/lib/td-agent
$ sudo chown -R td-agent:td-agent /var/lib/td-agent

bundlerがsudoを必要としないドロップインファイルは以下の通りです。--gem-fileオプションでGemfileを指定し、--gempathオプションでtd-agentユーザーが書き込むことができるパスを指定します。

[Service]
Environment='TD_AGENT_OPTIONS=--gemfile=/etc/td-agent/Gemfile  --gempath=/var/lib/td-agent/vendor/bundle'
ExecStart=
ExecStart=/opt/td-agent/embedded/bin/fluentd --log /var/log/td-agent/td-agent.log --daemon /var/run/td-agent/td-agent.pid $TD_AGENT_OPTIONS
td-agent の次のリリースでは

bundlerがsudoを必要としないドロップインファイルは以下の通りです。

td-agentユーザーのホームディレクトリはパッケージによって作成済みなので、ドロップインファイルで環境変数を指定すればよいだけです。 環境変数の内容はtd-agent 3.1.1と同じです。

[Service]
Environment='TD_AGENT_OPTIONS=--gemfile=/etc/td-agent/Gemfile  --gempath=/var/lib/td-agent/vendor/bundle'
トレードオフ

Gemfileでプラグインを管理する場合、td-agent起動時にbundle installを実行するため、td-agentにバンドルされているgemであってもgemパッケージをダウンロードします。 Gemfileでプラグインのバージョンを固定できるというメリットとのトレードオフです。

まとめ

td-agent3でGemfileを使ってFluentdのプラグインを管理する方法を紹介しました。 きちんと設定すればtd-agentユーザーにsudoできる権限を与えずにGemfileを使ってFluentdのプラグインを管理することができます。

*1 debの場合

*2 rpmの場合

タグ: Fluentd
2018-04-06

Fluent-plugin-elasticsearch v2.8.0対応でわかったElasticsearch 6.0以降の動向

はじめに

fluent-plugin-elasticsearchはよく使われているプラグインの一つです。 このプラグインをメンテナンスするためには、Fluentdの知識だけでなく、Elasticsearchが今後どのようになっていくかも知っておく必要があります。 Elasticsearch 6.xで次のようなアナウンスがありました。

要約すると、Elasticsearch 6.0ではインデックスに対して一つのタイプしか許可されなくなり、 Elasticsearch 7.xではインデックスに付与できるタイプは _doc のみになる、というものです。

fluent-plugin-elasticsearch v2.8.xでの対策

fluent-plugin-elasticsearchのデフォルトの挙動は、 fluentd というインデックスに対して、 fluentd という _type を付加することになっています。 しかし、問題となるパラメーターがあります。 target_type_key というパラメーターを使用している際に、 レコードの中から該当するキーが存在する場合は _type に当てはめるという動作をしていました。 このパラメーターについては deprecated 警告を出すようにしました。 また、Elsaticsearch 7.xでは Removal of Mapping Types in Elasticsearch 6.0 によると、 _type フィールドには _doc というダミーしか許可されなくなるので、type_name パラメーターについても _doc 固定となる動作になるよう変更を加えました。 まだ _doc のような _ 始まりのタイプを許可しない環境もあるため、v2.8.1では INFO レベルのログを出力する挙動に留めるようにしました。

まとめ

fluent-plugin-elasticsearch v2.8.0 では、接続先のElasticsearchのバージョンによって挙動が変わるようになります。 接続先がElasticsearch 5.xでは今までの動作のまま、6.xでは target_type_key が動作しなくなり、 7.xでは target_type_key が動作しなくなるのに加えて、 type_name パラメーターが _doc 固定になります。

タグ: Fluentd
2018-03-07

Kafka Connector for Fluentdのベンチマーク

PLAZMA OSS Day: TD Tech Talk 2018 – Cooperative works for Fluentd Community の補足記事その3です。

fluent-plugin-kafkaのアウトプットプラグインをkafka-connect-fluentdで置き換えられるようにするためには、機能だけでなく性能も重要なので性能を比較した結果をまとめます。

構成

GCP上にn1-standard-2(vCPU 2, memory 7.5GB)で構築しています。OSイメージはdebian-cloud/debian-9です。

システム構成図

  • client
    • fluent-benchmark-client が入っているホスト
  • Fluentd
    • in_forward + fluent-plugin-kafkaのoutputプラグインの入ったホスト
    • Fluentdはtd-agent3で入れた1.0.2
  • Kafka
    • Kafka + kafka-connect-fluentd が入っているホスト
  • metrics
    • Fluentd + influxdb + grafana が入っているホスト
    • Fluentdはtd-agent3で入れた1.0.2
    • influxdb + grafana はDockerイメージを使用している

ベンチマークシナリオ

今回はfluent-plugin-kafkaとkafka-connect-fluentdの比較ができればいいので、送信するレコードはfluent-benchmark-clientのデフォルトのものを使用します。 JSONで表現すると以下のものとなります。MessagePackだと50byteです。

{ "message": "Hello, Fluentd! This is a test message." }

また、スループットを比較したいのでout_kafka, out_kafka_buffered, out_kafka2の各パラメータはデフォルト値のままにします。 ただし、デフォルトだと遅すぎるため、out_kafka2に関しては buffer/flush_thread_countを3に変更しています。

kafka-connect-fluentdもworker pool sizeは1としますが、KAFKA_HEAP_OPTS="-Xmx4G -Xms4G" を指定しています。

fluent-benchmark-client のパラメータは以下の通り、5分間負荷をかけるようにしました。 --n-events-per-secに与える値を変えると、負荷を大きくしたり小さくしたりできます。

fluent-benchmark-client \
        --host=$host \
        --port=$port \
        --max-buffer-size=4g \
        --flush-interval=10 \
        --n-events-per-sec=$1 \
        --period=5m

fluentd-benchmarkにあるシナリオを参考にして、--n-events-per-secの値を決めました。

  • 1000 events/sec
  • 10000 events/sec
  • 30000 events/sec
  • 50000 events/sec

ベンチマーク結果

10000 events/sec 30000 events/sec or 50000 events/sec
out_kafka out_kafka 10k events/sec out_kafka 30k events/sec
out_kafka_buffered out_kafka_buffered 10k events/sec out_kafka_buffered 30k events/sec
out_kafka2 out_kafka2 10k events/sec out_kafka2 50k events/sec
FluentdSourceConnector FluentdSourceConnector 50k events/sec FluentdSourceConnector 50k events/sec

out_kafkaとout_kafka_bufferedは30000events/secでout_kafka2とFluentdSourceConnectorは50000events/secです。

それぞれのグラフは上から順に、CPU使用率、メモリ使用量、Kafkaが処理したイベント数(直近の1分間の平均、5分間の平均、15分間の平均)、Kafkaが処理したイベント数の合計です。

「Kafkaが処理したメッセージ数の合計=指定した秒間イベント数×5分」になっていれば、イベントはKafkaで処理できています。

送信イベント数(events/sec) 平均受信イベント数(events/sec) CPU usage 全部処理できた? 備考
out_kafka 10000 10000 40%-50%
out_kafka 30000 2万弱 90%以上 ×
out_kafka_buffered 10000 10000 0%-85%
out_kafka_buffered 30000 2万弱 100% ×
out_kafka2 10000 10000 40%-50%
out_kafka2 50000 - 100% × Fluentdのバッファがあふれた
FluentdSourceConnector 10000 10000 20%以下
FluentdSourceConnector 50000 5万弱 平均66%

fluent-plugin-kafkaよりもkafka-connect-fluentdの方がメッセージのスループットが約2.5倍高かった。 また、同じイベンント数を処理する場合でもkafka-connect-fluentdの方がCPUの使用率が低かった。

メモリー使用量については、Fluentd側が正しく測定できていなさそうなので追加の検証が必要です。

まとめ

kafka-connect-fluentdとfluent-plugin-kafkaの性能を比較して、kafka-connect-fluentdの方が速いことがわかりました。

kafka-connect-fluentdは、Kafka Connecotr APIのマルチタスクをサポートしていなかったり、fluent-plugin-kafkaと同じ形式で書き込むようにしていたりするので、まだまだ性能を向上する余地があります。

タグ: Fluentd
2018-03-01

KafkaのメトリクスをFluentdに送信するプラグインを書きました

PLAZMA OSS Day: TD Tech Talk 2018 – Cooperative works for Fluentd Community の補足記事です。

背景

kafka-connect-fluentdfluent-pluguin-kafkaの性能を比較したいと考えたときに、同じ指標で性能を比較したいと考えていました。 kafka-connect-fluentdのFluentdSourceConnectorとfluent-plugin-kafkaのoutputプラグインはどちらもKafkaに書き込むので、Kafkaのスループットを見ればよいはずです。

幸いKafkaにはメトリックスを取るためのAPIが提供されていました。

しかしKafkaに組み込みのKafkaCSVMetricsReporterは、動きませんでした*1。他にもいくつか既存のMetricsReporterを探してみましたがちょうどよいものはありませんでした。 Kafkaのメトリックスも、同じフィールドに型の異なる値が入っていることがあり、そのままでは扱いづらいものでした。

そこで、全てのメトリックスをFluentdに流せば、それ以降はFluentdのプラグインで好きなように加工できるので素早くデータを可視化するための環境を作ることができそうだと考えました。 またKafkaの提供するメトリックスは大量にあるので、kafka-connect-fluentdとfluent-plugin-kafkaの性能比較に使えるものだけを選別することも簡単にできそうだと考えていました。

kafka-fluent-metrics-reporter

kafka-fluent-metrics-reporterを作りました。

KafkaのプラグインはScalaで書かれていたり、Javaで書かれていたり、Kotlinで書かれていたり様々なので、これもKotlinで実装しました。

特に工夫したところはなく、Kafkaの提供してくれるメトリックスをmapにつめてFluentdに送るだけでした。

使い方

こちらも未リリースなので自分でビルドする必要があります。

$ git clone https://github.com/okkez/kafka-fluent-metrics-reporter.git
$ cd kafka-fluent-metrics-reporter
$ ./gradlew shadowJar
$ cp build/libs/kafka-fluent-metrics-reporter-1.0-SNAPSHOT-all.jar /path/to/kafka_2.11-1.0.0/libs

以下の設定をKafka Serverの設定ファイル server.properties に追加します。

kafka.metrics.reporters=org.fluentd.kafka.metrics.KafkaFluentMetricsReporter
kafka.metrics.polling.interval.secs=5
kafka.fluent.metrics.enabled=true
kafka.fluent.metrics.host=localhost
kafka.fluent.metrics.port=24224
kafka.fluent.metrics.tagPrefix=kafka-metrics

Fluentd側は以下のように設定します。このように設定してしばらく流してみると、大体様子がわかると思います。

<source>
  @type forward
  port 24224
</source>

<match kafka-metrics.**>
  @type copy
  <store>
    @type file
    path log/${tag}
    <buffer tag>
    </buffer>
  </store>
  <store>
    @type stdout
  </store>
</match>

タグにメトリックスの名前が入ってくるので、必要なメトリックスをタグで絞り込むことができます。

まとめ

kafka-fluent-metrics-reporterを使うことで、Fluentdを経由してKafkaのメトリックスを簡単に可視化することができました。

InfluxDBに直接流すものやPrometheus用のexporterなどもありましたが、Fluentdに流すものはなかったので作りました。

参考

他の Kafka metrics reporter の実装例です。

*1 問題は報告済みです

タグ: Fluentd
2018-02-28

Fluentd用のベンチマークツールを開発しました

PLAZMA OSS Day: TD Tech Talk 2018 – Cooperative works for Fluentd Community の補足記事です。

背景

kafka-connect-fluentdfluent-pluguin-kafkaの性能を比較したいと考えたときに、高い負荷を与える簡単に使えるツールがありませんでした。

dummerとFluentd組み込みのin_tailを使用すれば、負荷をかけることができるのはわかっていましたが、高い負荷を与えるためには複数のFluentdを動作させる必要があり、セットアップや管理が煩雑になることがわかっていました。

例えば、以下のようにたくさんのFluentdプロセスで1つのFluentdに負荷をかけるような感じになります。

dummer+in_tailのイメージ

参考: https://github.com/okumin/influent-benchmark

これはこれできちんと動きますが、以下の点で手軽ではありませんでした。

  • dummer でファイルを作る必要があるのでクライアント側にも Fluentd(in_tail + out_forward) が必須になってしまう
    • in_tailはposファイルを作るので、条件を変えながら繰り返し測定しようとすると、セットアップとクリーンアップが必要となる
  • dummerで流量制御しようとするとI/Oの速度が上限になってしまう
    • I/Oの速度を越えようとすると、上の図のように複数のFluentdプロセスを起動する必要があります

fluent-benchmark-client

そこでdummer + in_tailよりも簡単に使うことができる fluent-benchmark-client というコマンドラインツールを作りました。

Fluentd Forward Protocolを扱う部分はFluencyを採用しました。*1

Kotlinを採用した理由は以下の通りです。

  • Javaで書くのと性能は変わらない
  • Javaで書かれたライブラリーは全部使える
  • IntelliJ IDEAのサポートがある
  • Null安全を試してみたかった
  • 型推論を試したかった
  • コルーチンどうなの?というのを試したかった

初めて書くので慣れてないという以外にデメリットはありませんでした。

fluent-benchmark-clientを使うと、1つのプロセスから大量のイベントをFluentdに送信することができます。

fluent-benchmark-clientのイメージ

使い方

0.5.0がリリース済みなので使えます。

$ tar xf fluent-benchmark-client-0.5.0.tar
$ cd fluent-benchmark-client
$ ./bin/fluent-benchmark-client --max-buffer-size=4g --period=1m --n-events-per-sec=1000

これで、localhost:24224に1000 events/secを1分間送ることができます。 送る内容は、{ "message": "Hello, Fluentd! This is a test message." } です。 --max-buffer-size=4gはFluencyのバッファのサイズを指定しています。これを指定することにより大きなワークロードにも対応可能です。

送信するワークロードの内容はコマンドラインオプションで変更することができます。 また、ファイルから読むこともできるので複雑なワークロードにも対応可能です。対応しているフォーマットはLTSVJSONL*2です。 ここで利用するファイルはdummerで生成することを想定しています。

工夫したところ

初期の実装では、以下のようにビジーループの中でMap<String, Object>を作っていました。さらにemitの内部でMessagePackへの変換が行われていたため速度が出ていませんでした。 約50万 events/sec 出てたかどうかも怪しいという記録が手元に残っていました。

while (true) {
    fluency.emit(tag, mapOf("message" to "Hello Kotlin!"))
}

これを以下のように、ループの外でMessagePackに変換してからemitすると、手元のマシンで200万 events/secくらいの速度でemitできるようになりました。

val buffer = ByteArrayOutputStream()
val packer = MessagePack.newDefaultPacker(buffer)
packer.packMapHeader(1)
packer.packString("message")
packer.packString("Hello Kotlin!")
packer.flush()
packer.close()
val mapValue = buffer.toByteArray()
while (true) {
   fluency.emit(tag, mapValue)
}

ファイルからLTSVやJSONLを読み込む場合も、ファイルを全部読み込んでMessagePackに変換してからemitするようにしているので、速度は変わりません。 しかし、全データをメモリーに載せるので注意が必要です。

まだできていないこと

まだ、できていないことがいくつかあります。

  • マルチワーカー化
    • Kotlinのコルーチンへの理解が進んだら着手したいです
  • SSL/TLS対応
    • Fluencyが対応したら対応したいです

まとめ

kafka-connect-fluentdとfluent-plugin-kafkaの性能を比較するために、ちょうどよいベンチマークツールがなかったのでfluent-benchmark-clientを開発しました。

*1 軽く試した感じではロガー系のライブラリーでは最速でした

*2 主にk8sの標準のロガーで出力されたログも使用できるはずです

タグ: Fluentd
2018-02-27

Kafka ConnectのConnector開発入門

kafka-connect-fluentdを開発したので、その際に得た知見をまとめます。

Kafka Connectとは、簡単に説明するとKafka Consumersまたは、Kafka Producersの一種です。

あるデータソースからKafkaにデータを投入(Kafka Producers)したり、Kafkaから取り出したデータを別のところに流し(Kafka Consumers)たりするときにKafka ConsumersやKafka Producersでは、定型的なコードをたくさん書く必要がありました。 それを汎用化して、データを取り扱う箇所のコードのみを書けばいいようにまとめたものかKafka Connect APIです。

Connector Developer Guideを読めば、Kafka Connectorを開発する方法が書いてあります。 しかし、プロジェクトの初期設定は面倒なものです。定型コードを自動生成できるコマンドを使うと開発を始めるのが楽になります。

$ mvn -e archetype:generate -B -DarchetypeGroupId=io.confluent.maven \
      -DarchetypeArtifactId=kafka-connect-quickstart \
      -DarchetypeVersion=0.10.0.0 \
      -Dpackage=org.fluentd.kafka \
      -DgroupId=org.fluentd.kafka \
      -DartifactId=kafka-connect-fluentd \
      -Dversion=0.0.1

参考: https://github.com/jcustenborder/kafka-connect-archtype

@jcustenborderさんは、Confluentに所属し、多くのKafka Connectorを公開している方です。

これでpom.xmlやソースコードの雛形などが一通り生成されるので、開発を始めることができます。

kafka-connect-fluentdの場合は、自動生成されたpom.xmlをbuild.gradleに変換して利用しています。

Kafka Connector Source

Kafka Producerに対応するConnectorがSourceです。あるデータソースからKafkaにデータを書き込むために使用します。

kafka-connect-fluentd の場合は、Fluentdのout_forwardからkafka-connect-fluentdのSourceにデータを投げてKafkaにデータを流します。

FluentdSourceConnector

Sourceを開発する場合、以下を実装する必要があります。kafka-connect-fluentdの場合は以下のような名前にしました。

  • FluentdSourceConnector (SourceConnectorを継承)
  • FluentdSourceConnectorConfig (AbstractConfigを継承)
  • FluentdSourceTask (SourceTaskを継承)
FluentdSourceConnector

SourceConnectorを継承し、必要なメソッドを実装します。

  • public String version()
    • バージョン文字列を返します
  • public List<Map<String, String>> taskConfigs(int taskMax)
    • マルチスレッドに対応させたい場合は、taskMaxの値に応じて返り値のListMap<String,String>を詰め込みます
    • 設定ファイル(properties)に指定した値をそのままセットします
  • public void start(Map<String, String> properties)
    • Connector開始時にやることを書きます
  • public Class<? extends Task> taskClass()
    • taskクラス (FluentdSourceTaks.class) を返します
  • public void stop()
    • Connectorの終了時にやることを書きます
  • public ConfigDef config()
    • 設定の定義(FluentdSourceConnectorConfig.conf())を返します
FluentdSourceConnectorConfig

public static ConfigDef conf()ConfigDefのAPIを使って設定を定義します。

    public static ConfigDef conf() {
        return new ConfigDef()
                .define(FLUENTD_PORT, Type.INT, 24224, Importance.HIGH,
                        "Port number to listen. Default: 24224")
                .define(FLUENTD_BIND, Type.STRING, "0.0.0.0", Importance.HIGH,
                        "Bind address to listen. Default: 0.0.0.0");
    }

define()は引数が5つあり、前から順に名前、型、デフォルト値、重要度、説明です。引数の数が異なるdefine()もありますが、詳細はjavadocを参照してください。

FluentdSourceTask

Sourceを実装する際に最も重要なクラスです。以下のメソッドを実装します。

  • public void start(Map<String, String> properties)
    • poll()から定期的にアクセスされる共通のリソースを準備します。daemonやスレッドが必要な場合は、ここで起動します。
  • public List<SourceRecord> poll()
    • Kafka Connectのフレームワーク側から定期的に呼ばれます。FluentdSourceTaskではキューに溜めたデータをList<SourceRecord>に詰めて返します。
  • public void stop()
    • このタスクを止めるときに、必要な処理があれば書きます。start で起動したdaemonやスレッドをここで停止します。
  • public String version()
    • バージョン文字列を返します。定型のコードなので詳細は省略します。

このクラスは複数のスレッドから使用される可能性があります。特にpoll()でアクセスするインスタンス変数などについてはマルチスレッドを意識したコードを書く必要があります。

Kafka Connector Sink

Kafka Consumerに対応するConnectorがSinkです。Kafkaから取り出したデータをどこかに書き込みます。

kafka-connect-fluentd の場合は、Kafkaからデータを取り出してkafka-connect-fluentdのSinkからFluentdのin_forwardへデータを流します。

FluentdSinkConnector

Sinkを開発する場合、以下を実装する必要があります。kafka-connect-fluentdの場合は以下のような名前にしました。

  • FluentdSinkConnector (SinkConnectorを継承)
  • FluentdSinkConnectorConfig (AbstractConfigを継承)
  • FluentdSinkTask (SinkTaskを継承)
FluentdSinkConnector

SinkConnectorを継承し必要なメソッドを実装します。

  • public String version()
    • バージョン文字列を返します
  • public Class<? extends Task> taskClass()
    • taskクラス(FluentdSinkTask.class)を返します
  • public List<Map<String, String>> taskConfigs(int maxTasks)
    • マルチスレッドに対応させたい場合は、taskMaxの値に応じて返り値のListMap<String,String>を詰め込みます
    • 設定ファイル(properties)に指定した値をそのままセットします
  • public void stop()
    • Connectorの終了時にやることがあれば書きます
  • public ConfigDef config()
    • 設定の定義('FluentdSinkConnectorConfig.conf()')を返します
FluentdSinkConnectorConfig

FluentdSourceConnectorConfigと同じようにpublic static ConfigDef conf()ConfigDefのAPIを使って設定を定義します。

FluentdSinkTask

Sinkを実装する際に最も重要なクラスです。以下のメソッドを実装します。

  • public String version()
    • バージョン文字列を返します。定型のコードなので詳細は省略します。
  • public void start(Map<String, String> properties)
    • このタスクを初期化します。FluentdSinkTaskではFluencyのインスタンスを作成しています。
  • public void put(Collection<SinkRecord> collection)
    • Kafkaから定期的に呼ばれます。FluentdSinkTaskではFluencyでデータをFluentdに送信しています。
  • public void flush(Map<TopicPartition, OffsetAndMetadata> map)
    • これをきちんと実装すれば exactly-once に対応できるらしいです。しかし Fluentd は exactly-once に対応していないので、今のところ単にFluencyのバッファをflushするだけにしています。
  • public void stop()
    • タスクの停止にやることがあれば、中身を実装します。FluentdSinkTaskではFluencyが全てのバッファをflushするのを待ちます。
スキーマについて

SchemaBuilderStructを利用すれば任意のデータ構造をスキーマにマッピングできるのですが、MessagePackとKafka ConnectのSchemaで表現できる型に違いがあるようです。 サポートしている型の違いついて理解を深め、差異をうまく吸収できるような実装をしたいと考えています。

パッケージング

GradleでMaven Centralにライブラリを公開する - たごもりすメモを参考にしてGradleでやりました。

依存関係にKafkaが入っているけど、配布するjarにはKafka関係のファイルを入れてはいけないことに注意が必要でした。

まとめ

思っていたよりも簡単に実装できたのでKafkaにデータを投入したり、Kafkaからデータを取り込みたいのだけどProducer/Consumerはちょっと難しいという場合にKafka Connectを検討してみるとよいのではないでしょうか。

タグ: Fluentd
2018-02-26

2008|05|06|07|08|09|10|11|12|
2009|01|02|03|04|05|06|07|08|09|10|11|12|
2010|01|02|03|04|05|06|07|08|09|10|11|12|
2011|01|02|03|04|05|06|07|08|09|10|11|12|
2012|01|02|03|04|05|06|07|08|09|10|11|12|
2013|01|02|03|04|05|06|07|08|09|10|11|12|
2014|01|02|03|04|05|06|07|08|09|10|11|12|
2015|01|02|03|04|05|06|07|08|09|10|11|12|
2016|01|02|03|04|05|06|07|08|09|10|11|12|
2017|01|02|03|04|05|06|07|08|09|10|11|12|
2018|01|02|03|04|05|06|07|08|09|
タグ: