ククログ

株式会社クリアコード > ククログ > fluent-plugin-elasticsearchのSnifferクラスについて

fluent-plugin-elasticsearchのSnifferクラスについて

はじめに

fluent-plugin-elasticsearchはよく使われているプラグインの一つです。 このプラグインをメンテナンスするためには、Fluentdの知識だけでなく、Elasticsearchが今後どのようになっていくかも知っておく必要があります。 また、このプラグインはRed Hat社がメンテナンスしているOpenShiftのログコンポーネントの一部としても使われています。

elasticsearch-transportのSnifferクラスとは

elasticsearch-transportには定期的にクラスタの状況を監視するSnifferクラスがあります。このクラスではGET _nodes/httpというクラスタの状況を返答するAPIを叩いており、大抵の場合はこのAPIを叩いておけばElasticsearchクラスタの状況がfluent-plugin-elasticsearchが使っているelasticsearchクライアントに通知されます。 そのため、X-Packを用いない通常の使用方法では問題になりません。

k8sサービス化されたElasticsearchクラスタに接続する

k8sのサービスとはPodから生成したノードを一まとめにしたアクセス手段を提供します。k8sの世界観ではサービスのアクセス先は一定です。しかし、サービスを構成するノードの構成要素はある時は起動していますが、またある時は停止または破棄されています。このノード一つ一つにElasticsearchが立っていても通知速度よりも起動・破棄のサイクルが速ければGET _nodes/httpを使用しても欠点が目立つようになります。 そのため、k8sのサービス化されたElasticsearchクラスタには新たなSnifferクラスの実装が必要になります。

そこで、元々のSnifferクラスのhostsメソッドの実装を見てみると、以下のようになっています。

        # Retrieves the node list from the Elasticsearch's
        # [_Nodes Info API_](http://www.elasticsearch.org/guide/reference/api/admin-cluster-nodes-info/)
        # and returns a normalized Array of information suitable for passing to transport.
        #
        # Shuffles the collection before returning it when the `randomize_hosts` option is set for transport.
        #
        # @return [Array<Hash>]
        # @raise  [SnifferTimeoutError]
        #
        def hosts
          Timeout::timeout(timeout, SnifferTimeoutError) do
            nodes = transport.perform_request('GET', '_nodes/http').body

            hosts = nodes['nodes'].map do |id,info|
              if info[PROTOCOL]
                host, port = info[PROTOCOL]['publish_address'].split(':')

                { :id =>      id,
                  :name =>    info['name'],
                  :version => info['version'],
                  :host =>    host,
                  :port =>    port,
                  :roles =>   info['roles'],
                  :attributes => info['attributes'] }
              end
            end.compact

            hosts.shuffle! if transport.options[:randomize_hosts]
            hosts
          end
        end

nodes = transport.perform_request('GET', '_nodes/http').body の行でElasticsearchクラスタの情報を取りに行き、取りに行った情報から再度クラスタの情報を再構築しています。

もし、接続先のURLやIPアドレスが固定であれば、以下のようなSnifferクラスを作成し、ホスト情報を使い回す振る舞いをさせた方が良いです。

require 'elasticsearch'

class Fluent::Plugin::ElasticsearchIdempotenceSniffer < Elasticsearch::Transport::Transport::Sniffer
  def hosts
    @transport.hosts
  end
end

elasticsearchクライアントは独自Snifferを渡してそのクラスを元にクラスタ情報を再構築するようなカスタマイズをすることができます。

@sniffer = options[:sniffer_class] ? options[:sniffer_class].new(self) : Sniffer.new(self)

これらの変更をfluent-plugin-elasticsearchで扱うには以下のようにすると独自のSnifferクラスを用いてElasticsearchクラスタとやりとりできるようになります。

     config_param :pipeline, :string, :default => nil
     config_param :with_transporter_log, :bool, :default => false
     config_param :emit_error_for_missing_id, :bool, :default => false
+    config_param :sniffer_class_name, :string, :default => nil
     config_param :content_type, :enum, list: [:"application/json", :"application/x-ndjson"], :default => :"application/js
on",
                  :deprecated => <<EOC
 elasticsearch gem v6.0.2 starts to use correct Content-Type. Please upgrade elasticserach gem and stop to use this option
.

#...
     def client
       @_es ||= begin
         adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
         transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(get_connection_options.merge(
                                                                             options: {
                                                                               reload_connections: @reload_connections,
                                                                               reload_on_failure: @reload_on_failure,
                                                                               resurrect_after: @resurrect_after,
                                                                               retry_on_failure: 5,
@@ -287,7 +300,8 @@ EOC
                                                                               http: {
                                                                                 user: @user,
                                                                                 password: @password
-                                                                              }
+                                                                              },
+                                                                              sniffer_class: @sniffer_class,
                                                                             }), &adapter_conf)
         es = Elasticsearch::Client.new transport: transport
 

まとめ

fluent-plugin-elasticsearchのElasticsearchへのリクエストに関わるelasticsearch-transportのSnifferに関するお話を書きました。 記事と同様の働きをするパッチはfluent-plugin-elasticsearchのv2.11.5に取り込んでリリース済みです。 fluent-plugin-elasticsearchでk8sやnginxのプロキシを設置していて接続のリロード時に正常なElasticsearchクラスタの情報が取得できずに困っている場合はsniffer_class_nameの設定項目を初期値から変えてみたり、独自のSnifferクラスを定義したりしてみてください。