株式会社クリアコード > ククログ

ククログ

タグ:

Fluentd v0.14.xが安定版になりました

少し前の話ですが、2017年11月01日にFluentd v0.14.22 がリリースされました。 このリリースでは以下のようにstableなリリースだと宣言されました。

v0.14.22 is a stable release of v0.14 series

安定版になったので安定版を待っていたプラグイン開発者の人は、諸々対応していただけると大変ありがたいです pray

このリリースが出てからは、主にドキュメントを充実させるべくdocs.fluentd.orgリポジトリにPull requestを送っています。

11月20日時点で約40件PullRequestを送ってplugin helpersections (buffer sectionは既に書かれていた)のドキュメントを充実させました。 これで、これまではソースコードを読まないとわからなかったplugin helperの使い方や、plugin helperを使ったプラグインの設定の書き方をまとめることができました。

記事執筆時点でざっと見渡したところ、不足しているドキュメントは以下の通りでした。

  • Writing Buffer Plugins: 全体
    • 必要としている人は少ないのであとまわし
  • Writing Storage Plugins: 全体
    • 必要としている人は少ないのであとまわし
  • Output Plugin Overview: secondary output の例を追加
  • その他、不足している部分の改善や追加
    • built-in プラグインのドキュメント

まとめ

Fluentd v0.14 のplugin APIとplugin helperを使ったプラグインを書くためには、ソースコードを読む必要がありましたが、ドキュメントを整備したのでドキュメントを読めばFluentd v0.14のplugin APIやplugin helpersを使って少ない記述量で高度なプラグインを開発できるようになりました。

built-inプラグインのドキュメントについても今後、書いていく予定です。

タグ: Fluentd
2017-11-20

データ分析基盤構築入門

データ分析基盤構築入門という書籍を著者の一人@yoshi_kenさんから「Fluentdのプラグインの件でお世話になっているので〜」ということでいだたきました。ありがとうございます!

この書籍を読むとFluentd,Elasticsearch,Kibanaの体系化された知識を得ることができます。 特にこれからデータ分析基盤を作ろうと考えている人や、データ分析基盤を構築しているけど、自己流なので自信がないなぁという人におすすめの書籍です。

400ページくらいあり分厚いのですが、サンプルコード・実行例・設定例がたくさんあり、まずは真似して始めてみる、ということが大変やりやすくなっています。

第1部 データ分析基盤入門

データ分析基盤を構築する目的はなんなのか、データ分析基盤を構築し、提供する立場ではどういう考え方で行動するべきなのかという心構えから、ビジネスではデータ分析基盤をどのように活用される例があるのかも説明されているので、これから始める人にとってちょうど良い内容になっています。

第2部 Fluentd入門

現在存在する日本語の情報でFluentdについて最もきちんとまとまっている情報です。ドキュメント化されていない情報や見つけにくい情報もまとまっているため、リファレンスとして一冊手元にあるととても便利だと思いました。 またFluentd v0.12とFluentd v0.14の違いについても書かれているため、これから新しくFluentdを使い始める人にとっても役に立つでしょう。

多くの人が最初に使うであろうtailプラグインについては徹底攻略として設定例を含めて約20ページの解説が書かれています。 また、Fluentdノードのデザインパターンとしてよく使用されるであろうノード構成について詳しく解説されています。

第2部の最後に運用Tipsとして、様々な知見が書かれています。これらは実際にFluentdを運用しようとしたときに気になる項目ばかりでした。

第3部 Elasticsearch入門

Elastic社の中の人が書いているので、とても詳しく書いてあります。 Elasticsearchを使ったことがないので、内容の評価はできませんが始めて使う人から、大規模で運用したいと考えている人まで幅広く対応した内容だと思いました。

第4部 Kibana入門

Kibana入門のタイトル通りの内容でした。Kibanaも使ったことがなかったのですが、この書籍を読んで使い始めることができそうという気持ちになりました。

付録

付録というよりも第5部と言った方が適切なくらいの分量がありました。 Fluentd/Embulkのプラグインリストは、日本語でまとめられている情報では質と量で一番です。

まとめ

第1部から第4部までと付録はそれぞれ独立しているので、どこからでも読み始めることができます。 しかし、それぞれのソフトウェアについて独立して書かれているため、FluentdとElasticsearchの連携をするための方法については、あまり詳しく書かれていませんでした。 この書籍が書かれた時期は2017年4月より前だと思うのですが、その時期はfluent-plugin-elasticsearchのメンテンスが滞っていたので詳しく書けなかったのだと思われます。 2017年9月から、クリアコードメンバーがCollaboratorとして開発に参加しているので、FluentdからElasticsearchにデータを送るのにfluent-plugin-elasticsearchを使い続けても安心です。

データ分析基盤構築入門[Fluentd、Elasticsearch、Kibanaによるログ収集と可視化]
鈴木 健太/吉田 健太郎/大谷 純/道井 俊介
技術評論社
¥ 3,218

タグ: Fluentd
2017-09-27

Fluentdで古くなったプラグインを検出する試み

Fluentd v0.14.xはFluentd v0.10.xのAPIがいくつか廃止されていて古いAPIを使っているプラグインが動作しなくなっています。また、他にも以下の観点でFluentd v0.14では使わない方がよい3rd party製プラグインが見つかっています。

  • 古くなっていてメンテナンスされていないもの
  • 最新のFluentdに添付されているプラグインでは、その機能が取り込まれているため使う意味がないもの
  • 対象のミドルウェアがメンテナンスされなくなっているもの
  • 対象のウェブサービスがサービス提供を終了しているもの
  • 旧バージョンのRubyでのみ意味があったもの
  • ある問題を解決するためにrubygems.orgで公開されていたが、アップストリームで対応されるなどして問題が解決されたため不要になったもの

Fluentdを使いたいユーザーがプラグインを探すときはプラグイン一覧を見ると思いますが、ここのリストは機械的に更新されています。ユーザーが使いたいプラグインをより見つけやすくするために、もう使わない方がよいプラグインをリスト化してメンテナンスしています

このリストは手動でメンテナンスされていてfluent/fluentd-websiteにPull requestを送ると取り込まれて、翌日にはウェブサイト上に反映されるはずです。

記事執筆時点では約40個のプラグインがこのリストに載っています。このリストを見れば、使わない方がよいプラグインを確認することができます。 新しく使用するプラグインを検討するときは、このリストを目視で確認していくとよいのですが、既にインストール済みのプラグインがたくさんある場合は、目視で確認するのはとても大変です。

そこでFluentd起動時に確認できるfluent-plugin-obsolete-pluginsというプラグインを作ってみました。

このプラグインは、Fluentd起動時に非推奨プラグインのリストを読み込んでその環境で利用可能なgemのリストと比較し、非推奨なプラグインがインストールされていたら警告します。このプラグイン自体はFluentd v0.14 APIを使用して実装したレコードを素通しするFilterプラグインです。v0.14 APIを使用して実装したので、Fluentd v0.12以前のバージョンでは利用できません。

Fluentd v0.14の検証のついでに、非推奨なプラグインのチェックも一緒にしてみてはいかがでしょうか。

タグ: Fluentd
2017-07-03

Fluentd pluginのconfigure内でのアンチパターン

多くのFluentdのプラグインを見てきて知見が増えてきたのでまとめます。

config_sectionを使っていない

Fluentdのプラグインでは、configureメソッド内で設定を自力で解釈することによりconfig_sectionconfig_paramで定義していない設定値も扱えるようになっています。

以下のような設定ファイルを

<source>
  @type sample
  user hoge
  pass secret_passowrd
  <pattern>
    name foo
  </pattern>
  <pattern>
    name bar
  </pattern>
</source>

次のコードで自力で解釈することができます。

def configure(conf)
  super
  @patterns = conf.select {|element| element.name == "pattern" }
  @user = conf["user"]
  @pass = conf["pass"]
end

このように自力で解釈すると、テストも書かなければならないし、間違いも発生しやすくなるので以下のように、Fluentdの本体でよくテストされているconfig_sectionconfig_paramを組み合わせて使いましょう。config_sectionconfig_paramを使うと設定を宣言的に書くことができて、とてもわかりやすくなります。その上、fluent-plugin-config-formatというコマンドでMarkdownやJSONで出力することができます。descを使って説明も加えておくと、fluent-plugin-config-formatでも説明が出力されて便利です。

config_section :pattern, param_name: "patterns", multi: true do
  desc "name for pattern"
  config_param :name, :string
end
desc "user name for login"
config_param :user, :string
desc "password for login"
config_params :pass, :string, secret: true

def configure(conf)
  super
  # 何か追加の処理
end

config_paramのarrayを使っていない

<source>
  @type github-activities
  users ashie,cosmo0920,kenhys,kou
</source>

上記のような設定を以下のようなコードで、配列化して使っているプラグインがありました。

desc "user names e.g. user1,user2,user3"
config_param :users, :string, default: ""
def configure(conf)
  super

  @users = @users.split(",").map(&:strip)
end

これは以下のように書き換えることができます。

desc "user names e.g. user1,user2,user3"
config_param :users, :array, default: [], value_type: :string
def configure(conf)
  super
end

config_paramのenumを使っていない

<source>
  @type groonga
  protocol http
  # ...
</source>

上のような設定を次のようなコードでパースして使用しているプラグインがありました。

config_param :protocol, default: :http do |value|
  case value
  when "http", "gqtp", "command"
    value.to_sym 
  else
    rails Fluent::ConfigError, "must be http, gqtp or command: <#{value}>"
  end
end
def configure(conf)
  super
  # @protocol を使う
end

これは以下のように書き換えることができます。

config_param :protocol, :enum, list: [:http, :gqtp, :command], default: :http
def configure(conf)
  super
  # @protocol を使う
end

書き換えることで、コードがすっきりしました。

秘密の値をsecretにしていない

<match dummy.log>
  @type honeycomb
  writekey very-secret-key
  # ...
</match>

パスワードやAPIキーなどの秘密にしておきたい情報を設定ファイルに書かせるときは、secretオプションを付けましょう。 secretオプションをtrueにするとFluentdの起動時に表示されるダンプなどで値がマスクされます。

config_param :writekey, :string, secret: true

以下のようにsecretオプションを忘れてしまうと、Fluentd起動時のダンプなどにそのまま出力されてしまうので、バグ報告などのときにユーザーが自分でマスクしなければなりません。

config_param :writekey, :string

必須チェックを自分でしている

config_param :tag, :string, default: nil
def configure(conf)
  super
  raise Fluent::ConfigError, "tag is required" unless @tag
end

上のように必須の値のデフォルト値をnilにして、自分でチェックしているコードをたまに見かけます。 下のようにデフォルト値を省略するとその設定は必須になり、Fluentd起動時にチェックが実行されます。

config_param :tag, :string
def configure(conf)
  super
end

desc を書いていない

config_param :tag, :string

descを書くとfluent-plugin-config-formatというコマンドで、説明を見やすくフォーマットしたものを出力することができます。これはREADMEに設定の説明を書くときに使うととても便利です。

desc "tag for records"
config_param :tag, :string

まとめ

Fluentdで用意されている機能を活用して、メンテナンスを続けやすいプラグインを開発しましょう。

タグ: Fluentd
2017-06-21

Fluentd pluginでconfig_paramとconfig_sectionを使いこなす

Fluentdのプラグインの開発をする上で避けて通れないのが設定です。ユーザーに設定ファイルを書いてもらってプラグインの動作をカスタマイズできるようにすることは必須です。

config_paramconfig_sectionという2つのAPIを使いこなすとさまざまな設定を簡単にきれいに書くことができます。

config_param

config_paramは設定を定義するために使います。

次のように名前と型を指定するのが基本形です。

desc "description for name"
config_param :name, :string

この場合は:nameという名前の文字列型の設定を定義しています。型によって、使えるオプションが異なるので型ごとに説明します。なお、config_paramの直前にdescを書くと設定の説明を書くことができます。説明を書いておくとfluent-plugin-config-formatコマンドで説明付きで設定の定義をダンプすることができるようになります。

typeとして以下の9つの型を指定可能です。

  • :string
  • :integer
  • :float
  • :size
  • :time
  • :bool
  • :enum
  • :array
  • :hash
string, integer, float, bool

それぞれの型に変換して、指定された名前のインスタンス変数に値をセットします。

config_param :name, :string, default: "", secret: true, alias: :full_name
  • default: デフォルト値を指定します。省略するとこの設定は必須になり、Fluentd起動時に設定されているかどうかチェックされます。
  • secret: 真を指定すると、Fluentd起動時のログなどで値がマスクされます。
  • alias: この設定の別名を指定します。
size

バイト単位のサイズを設定します。k,m,g,tというサフィックスを利用可能です。それぞれ、キロ、メガ、ギガ、テラを表します。大文字、小文字は区別しません。 サイズは以下のように整数に変換されます。

limit 10  # 10 byte
limit 10k # 10240 byte
limit 10m # 10485760 byte
limit 10g # 10737418240 byte
limit 10t # 10995116277760 byte

使えるオプションはstring等と同じです。

time

時間の長さを指定します。s,m,h,dというサフィックスを利用可能です。それぞれ、秒、分、時、日を表します。小文字のみ有効です。 単位を省略するとto_fした値を使用します。秒に変換されます。

interval 0.5 # 0.5秒
interval 1s  # 1秒
interval 1m  # 1分 = 60秒
interval 1h  # 1時間 = 3600秒
interval 1d  # 1日 = 86400秒

使えるオプションはstring等と同じです。

enum

列挙型です。ユーザーに特定の値のリスト内から設定値を選ばせたいときに使います。ユーザーがリスト内に存在しない値を設定した場合、Fluentd起動時にエラーが発生します。

config_param :backend_library, :enum, list: [:geoip, :geoip2_c, :geoip2_compat], default: :geoip
  • default: デフォルト値を指定します。
array

配列です。

config_param :users, :array, default: [], value_type: :string

このように書くと、以下のような設定で@users = ["user1", "user2", "user3"]と設定されます。

users user1,user2,user3
  • default: デフォルト値を指定します。
  • value_type: 値の型を指定します。:string, :integer, :float, :size, :bool, :timeのいずれかを指定できます。
hash

ハッシュです。設定ファイルの書き方は2通りあります。

config_param :key_values, :hash, default: {}, symbolize_keys: true, value_type: :string

設定例:

key_values {"key1": "value1", "key2": "value2"} # JSONで書く
key_values key1:value1,key2:value2

どちらも以下のようにパースされます。

{ key1: "value1", key2: "value2" }
  • default: デフォルト値をハッシュリテラルで指定します。
  • symbolize_keys: 真を指定するとキーをシンボル化します。
  • value_type: 値の型を指定します。全ての値で同じ型を使用します。

config_section

config_sectionconfig_paramをグループ化するために使用します。 具体例としては、組み込みのバッファーの設定、パーサーの設定などがあります。

例えば、fluent-plugin-s3では以下のように認証情報の設定、バッファーの設定、フォーマッターの設定をグループ化して記述することができます。fluent-plugin-s3では認証方法が複数あるので、認証方法ごとにセクションを作ることによって、利用している認証方法がわかりやすくなっています。このように設定をグループ化することによって設定ファイルの可読性が向上しています。

<match *.log>
  @type s3
  <assume_role_credentials>
    role_arn xxxxx
    role_session_name xxxxx
  </assume_role_crecentials>
  <buffer tag,time>
    @type file
    path /var/log/fluent/s3
    timekey 3600 # 1 hour partition
    timekey_wait 10m
    timekey_use_utc true # use utc
  </buffer>
  <format>
    @type json
  </format>
</match>

config_section のメソッドシグニチャーは以下の通りです。

config_section(name, root: false, param_name: nil, final: nil, init: nil, required: nil, multi: nil, alias: nil, &block)
  • name: セクション名をシンボルで指定します。デフォルトではここで指定した名前と同じ名前のインスタンス変数をプラグインインスタンス内で参照することができます。
  • キーワード引数
    • root: ルートセクションである場合に真を指定します。Fluentd内部で利用するもので、一般のプラグインでは利用しません。
    • param_name: プラグインのインスタンス内で利用する、インスタンス変数名を@を除いたシンボルまたは文字列で指定します。
    • final: プラグインのサブクラスでこのセクションの上書きを禁止するならば真を指定します。一般のプラグインでは、あまり意識しなくてもよいです。
    • init: このセクションでは初期値が必須であるパラメータのみを定義します。一般のプラグインでは、あまり意識しなくてもよいです。
    • required: このセクションが必須であれば真を指定します。真を指定した場合、設定ファイルにこのセクションがないとFluentdの起動時にエラーが発生します。
    • multi: このセクションを複数指定可能であれば真を指定します。
    • alias: このセクションの別名を指定します。

例: sample というプラグインに以下のようなセクションがある場合の設定ファイルの書き方を例示します

config_section :child, param_name: 'children', required: false, multi: true, alias: 'node' do
  config_param :name, :string
  config_param :power, :integer, default: nil
  config_section :item, multi: true do
    config_param :name, :string
  end
end
<source>
  @type sample
  <child>
    name  gohan
    power 100
    <item>
      name senzu
    </item>
  </child>
  <child>
    name  goten
    power 10
  </child>
</source>

まとめ

config_paramconfig_sectionを使うことで得られる利点をまとめます。

  • plus1 設定を宣言的に定義できる
  • plus1 fluent-plugin-config-formatコマンドでMarkdownやJSONで定義を出力することができる
    • Markdownで出力したものはREADMEにそのまま貼り付けることも可能 100
  • plus1 コード量を少なくすることができる

config_paramconfig_sectionを使うことで、デメリットはありません。 Fluentd組み込みのAPIを使いこなして、使いやすくメンテナンスしやすいプラグインを書きましょう。

タグ: Fluentd
2017-06-14

Fluentd v0.14のEventTimeに関する話

はじめに

Fluentd v0.14ではログをmsgpackでエンコードし、新たに時間をForwardプロトコルで送る際に時間をEventTimeへエンコードして送信することができるようになりました。 このエンコード形式を用いて時間をForwardプロトコルで送るようにすると、秒よりもさらに細かな精度でログのやりとりができるようになります。 Fluent Loggerでログを送る際に、一秒間に2つ以上のログが発生する環境で秒精度までのログ転送を行った場合、Fluentdが扱うログの順番が送り先で発生した順ではなくなることがあります。 そのため、ログの順番をより正確に時刻でソートするために考えられたv0.14で新たにログの時刻の形式として秒精度以下も扱えるEventTimeが追加されました。

EventTime

Forwardプロトコルのv1の仕様にEventTimeについての解説があります。

仕様を見ると、EventTimeはmsgpackの拡張型として表されることがわかります。 また、fixext8とext8のどちらの場合でも秒とナノ秒は32bit integerで表し、msgpackへエンコードする必要があります。 fixext8形式は実行時に長さを取る必要がないので、構造体を直にEventTimeへエンコードする使い方をすると実行時のことを考えなくてもよくなります。

fruentlyではfixext8を使ったEventTime対応を行いました。

fixext8を用いたEventTime拡張型の場合もext8を用いたEventTime拡張型も秒とナノ秒を表す32bit Integerをエンコードする際にはBig Endianでエンコードする必要があることに注意してください。

fixext8
+-------+----+----+----+----+----+----+----+----+----+
|     1 |  2 |  3 |  4 |  5 |  6 |  7 |  8 |  9 | 10 |
+-------+----+----+----+----+----+----+----+----+----+
|    D7 | 00 | second from epoch |     nanosecond    |
+-------+----+----+----+----+----+----+----+----+----+
|fixext8|type| 32bits integer BE | 32bits integer BE |
+-------+----+----+----+----+----+----+----+----+----+
ext8
+--------+----+----+----+----+----+----+----+----+----+----+
|      1 |  2 |  3 |  4 |  5 |  6 |  7 |  8 |  9 | 10 | 11 |
+--------+----+----+----+----+----+----+----+----+----+----+
|     C7 | 08 | 00 | second from epoch |     nanosecond    |
+--------+----+----+----+----+----+----+----+----+----+----+
|   ext8 | len|type| 32bits integer BE | 32bits integer BE |
+--------+----+----+----+----+----+----+----+----+----+----+

Unix epoch時間(v0.12互換)

EventTimeなしでFluent Loggerを動かす場合は、秒精度までのログしか送ることができません。Unix epochで表現が可能な精度の時刻がログに含まれます。

2017-05-17 17:29:57.000000000 +0900 test: {"name":"fruently"}
EventTime(v0.14から)

EventTime形式にエンコードして時間をログに含めた際には、処理系がサポートする秒以下の精度の時間も含めてログに含めることができます。

2017-05-17 17:29:57.587226000 +0900 test: {"name":"fruently"}

まとめ

Fluent LoggerのEventTime対応をする際に参照する必要になったEventTimeの仕様を解説しました。 EventTimeを用いてエンコードされた時刻はこれまでのものよりもさらに精度の高い時刻情報を持てるようになります。 1秒に1つ以上のログをFluent Loggerに送る必要がある際にFluent LoggerがEventTime対応をしているかどうかを調べて、未対応であればバグ報告やプルリクエストをしてみるのはいかがでしょうか。

タグ: Fluentd
2017-05-24

Fluentd v0.14 API移行のすすめ

クリアコードではFluentd本体の開発だけでなくFluentdの600以上あるプラグインの開発にも参加しています*1。具体的にどういうことをやっているかは過去の記事を参照してください。

FluentdのプラグインをFluentd v0.14 APIに移行するための記事がいくつかあります。

具体的な移行方法についてはそれぞれの記事で解説しましたが、肝心のFluentd v0.14 API移行のPros/Consを説明していませんでした。 この記事ではプラグイン開発者視点でFluentd v0.14 APIへの移行について扱います。既に稼働しているFluentdをv0.14へ移行することについては扱いません。

Pros
  • 洗練されたプラグインAPIを使うことでメンテナンスしやすいプラグインを開発することが可能
  • non-buffered/buffered/async な output プラグインを1つのファイルで開発できる
  • non-buffered/buffered/async な output プラグインを設定ファイルで切り替えできる
  • 便利なプラグインヘルパーによってコードの見通しがよくなる
  • 簡単に使える本物のテストドライバーによって、メンテナンスしやすいテストコードを書くことができる
  • 組み込みのマルチプロセスサポート
  • bufferのchunkの分け方を柔軟に指定できる
    • タグごとにchunkを分ける
    • 時間ごとにchunkを分ける
    • chunkに含まれる任意のレコードごとにチャンクを分ける
  • storageプラグインのAPIが追加され、プラグインの状態を管理する汎用的な機構が備わった
Cons
  • 新APIは後方互換性がないので、基本的には全部書き直しが必要
    • ただし、Fluentd v0.14には互換レイヤーがあるので、移行しなくても基本的にはそのままで動作する
  • テストコードは全面的に書き直しが必要
    • Fluentd本体ではtest-unitを使用してテストコードを書いているので本体のテストコードを見ればどう書けばよいかわかる
  • Fluentd v0.14 API に移行すると Fluentd v0.12 以前でも動くようにするのは不可能*2
まとめ

Fluentd v0.14 APIに移行するPros/Consをまとめてみました。既に多くのプラグインがFluentd v0.14のAPIに移行済みです。最近は、最初からFluentd v0.14のAPIを使用している3rdパーティのプラグインも見かけるようになりました。

まだ移行していないプラグイン作者の方はtd-agent3が正式リリースされる前に、自分たちのプラグインをFluentd v0.14 APIに移行するとよいのではないでしょうか。

*1 すべてではありません

*2 Fluentd v0.10.x と Fluentd v0.12.x はプラグイン開発者が少しコードを追加することでFluentd v0.10.xとFluentd v0.12.xのどちらでも動くようにできた

タグ: Fluentd
2017-05-23

Fluent Loggerの信頼性を高めるには

はじめに

Fluentdにログを送る方法として、Fluent Loggerを使う方法があります。 RubyやJavaにはそれぞれfluent-logger-rubyやfluent-logger-javaなどのFluent Loggerがあり、よくメンテナンスされています。 この記事ではFluent Loggerを使ってFluentd v0.12またはv0.14にログを送信する時にどのようにするとより確実にログ転送ができるようになるかを解説します。

最小構成のFluent_Loggerを作成するには では最小構成のFluent Loggerはどのような仕様に基づき実装されるべきかを解説しました。この記事はその続編です。

確実にログを送るには

確実にログを送るにはエラーが起きた時にそのエラーを回復する手段を提供されていることが必要です。

ログが送れたことをFluent Logger側で検出する

より確実にログを送信したことをLogger側で確認するにはOptionやResponse節にあるようにoptionを使う事が重要になります。 optionの中に128bitユニークなIDのbase64を取ったものをchunkをキーとしたペアに入れ、ackで返って来たbase64の値と比較してやる事で、Logger側で確実に送信されたものと判定出来ます。 *1 確実に送信されたログに関してはバッファから削除してしまって問題ありません。

TCP接続のエラーから回復して何度か再試行する

また、ネットワークの状況によっては、一回でTCPの接続を確立するのが難しく、何回か再試行する必要があることがあります。 このときに、接続を複数回繰り返す方法としては、一定期間ごとに試行する方法(periodic)、試行間隔を指数関数的に増やして行く方法(exponential back-off)のどちらかが取られます。

送信の再試行回数を超えてエラーとなった時のログ消失を防ぐ方法

アプリケーションが予期しない理由により停止してしまった場合に備えて、送信していないログをメモリ上のバッファに溜めておくだけでなく、ファイルに書き込む必要がある場合もあります。 このとき、ファイルにバッファを書き込む際にはFluentdのプラグインで扱いやすい形にしておくほうがよいです。 例えば、msgpackのバイナリ列のバッファをそのまま吐き出したり、TSV形式にすることで、in_tailにより送信ができなかったファイルを後から送信する、という回復処理が行えるようになっているとFluentdに長期間繋がらなかった際にログの消失を抑える有効な手立てとなります。

Fluent Loggerのよくある実装を踏まえての解説

ここからはFluent Loggerのよくある実装を踏まえて解説を行います。 これら3つの仕様をFluent Loggerに入れることができればより強固にFluentdへログを転送することが可能になるでしょう。

Require Ack Response

確実にログを送ったことをFluent Logger側で検出するのに実装するべきことは次の通りです。 Fluent protocolでは最後のoptionというフィールドに12byteのbase64エンコードされた値をchunkをキーとするKey-Valueを持たせることができます。*2 このオプションを用いることで、Fluentdへのログ送信が完了したことがFluent Logger側で確認できるようになります。

このオプションが実装されているFluent LoggerにはFluencyfluent-logger-nodeがあります。

送信の再試行

TCP接続のエラーから回復して何度か再試行する、ということを実現するには再試行の戦略を決める必要があります。 大きく分けてTCP接続に失敗した時に等間隔で再試行するか、それとも再試行の間隔を指数関数的に増やしていくかの二つの方法が取れます。 等間隔で再試行を行う場合は再試行の時間まであとどれくらいかを予測しやすくなりますが、一方で送信先のノードが落ちている場合は再試行回数が極端に増える結果となります。 そのため、最初は再試行間隔が最初は徐々に増やされ、だんだんと間隔が開いていく指数関数的に再試行時間を決める方法を筆者はとることが多いです。

送信がうまくいかなかった時のファイルへの出力

送信の再試行回数を超えてエラーとなった時のログ消失を防ぐにはどうしてもログが送信できない場合に、ローカルのストレージへファイルとして出力する方法が取れます。in_tailではmsgpackやTSV形式などでログファイルをパースすることが可能です。 どうしてもエラーの回復ができない場合には最終的にはFluent Loggerに渡したログをファイルに出力し、後日改めてFluentdのin_tailなどで送信エラーの起きたログを回収するようにすると良いでしょう。

おわりに

Fluent Loggerは単純にログを転送するだけではなく、Logger側でログがFluentdへ転送できたことを検知する仕組みを入れることができたり、TCP接続を確実に確立するための再試行の機構を取り入れたり、再試行回数の上限を超えてしまった時はファイルに転送しようとしたログをダンプする戦略が取れることを解説しました。 なお、この記事では解説できませんでしたが、この記事は筆者が作成したRustのFluent Loggerのfruentlyを作成するにあたって得られた知見を元にしています。

*1 より詳細には https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#option を参照すること。

*2 https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#grammar

タグ: Fluentd
2017-04-28

Fluentd v0.14で導入されたstorageプラグインとは

はじめに

Fluentd v0.14では新たにstorageプラグインという新しいタイプのプラグインが導入されました。 FluentdをインストールしただけではJSON形式により保存される storage_local プラグインしかありませんが、このstorageプラグインはFluentdのプラグインのインスタンスが保持する値をKVSに集約することにも使用することができます。

storageプラグインとstorageプラグインヘルパー

Fluentd v0.14ではさらにプラグインヘルパーという概念も追加されました。storageプラグインにおいてもstorageプラグインを直接使うのではなく、storageプラグインヘルパーを通じて使うことが推奨されます。

storageプラグインのAPI

storageプラグインは以下のAPIを持ちます。これはKVSから値を取り出したり、保存したり、また取り出した値をキャッシュしておくのに合うAPIとなっています。

# basically, interact with KVS
def load
end

# basically, interact with KVS
def save
end

# Normally, the following methods work as `cache`.
def get(key)
end

def fetch(key, defval)
end

def put(key, value)
end

def delete(key)
end

def update(key, &block) # transactional get-and-update
end

このうち、 #load#save については実際のKVSに対して値を読み込んできたり、保存したりする役割を担います。 一方、 #get#fetch#put#delete#updateについてはstorageプラグインだけではキャッシュとして振る舞うことが求められます。

storageプラグインヘルパー

storageプラグインヘルパーはstorageプラグインを直接使わずにstorageプラグインの性質を変化させるように作成されています。 storageプラグインヘルパーの #wrap_instance メソッドにより、storageプラグインのインスタンスをそのまま使用するか、storageプラグインの値を永続化して同期を取るか、単に同期を取るかが決定されます。

Fluentdの実際のコードでは以下のようになっています。

def wrap_instance(storage)
  if storage.persistent && storage.persistent_always?
    storage
  elsif storage.persistent
    PersistentWrapper.new(storage)
  elsif !storage.synchronized?
    SynchronizeWrapper.new(storage)
  else
    storage
  end
end

<storage> セクションに persistent true が設定されていることや、 #persistent_always? の返す値、#synchronized? が返す値が振る舞いを変えることがわかります。

実際のstorageプラグインの例

これらを踏まえて、筆者はMongo、Redis、Memchachedについてのstorageプラグインをそれぞれ作成しました。

これらを用いると上記3つのKVSに対してstorageプラグインによりownerプラグインであるinput, output, filterプラグインの情報をKVSへ集約することができます。

まとめ

Fluentd v0.14で導入されたstorageプラグインの概要とstorageプラグインヘルパーを通した場合の振る舞いの変化について解説しました。 storageプラグインをうまく活用するとstorageプラグイン対応が入っているfluent-plugin-systemdfluent-plugin-windows-eventlogのようにどこまで読んだかの位置の記録をKVSに集約することができるようになります。

つづき: 2017-05-23
タグ: Fluentd
2017-04-19

fluent-plugin-geoip の geoip2 対応した話

fluent-plugin-geoipというIPアドレスから国や州・県などの情報を取得してレコードを加工するFluentdのプラグインがあります。

以前はGeoIP Legacyにあるデータベースを使ってIPアドレスから情報を取得していましたが、GeoIP2がリリースされてしばらく経過したのでGeoIP2に対応した話を書きます。

geoip2_cの開発

経緯はSupport GeoLite2 format · Issue #39 · y-ken/fluent-plugin-geoipに書いてありますが、少し抜粋します。

GeoIPについて調べているとGeoIP2を見つけ、さらにGeoIP2に対応したfluent-plugin-filter-geoipを見つけました。 そこで、fluent-plugin-filter-geoipの内部を調べ、どのようにしてGeoIP2に対応させているかを確認したところmaxminddbというピュアRuby実装のライブラリを使っていました。 他にGeoIP2を利用できるライブラリがあるかどうかを調査したところいくつか既存の実装がありました。

新たにfluent-plugin-geoipにGeoIP2対応を追加するにあたって、GeoIPを使用していたときと同等の性能を維持できるかどうかを確認するためにベンチマークをとりました。

ベンチマークによると、geoip2_compatであれば性能に問題はなさそうなことがわかりましたが、geoip2_compatだとGeoIP Legacyと同等のデータしか取得することができません。GeoIP2にはそれ以外のデータも多数追加されているので、できれば全ての機能を使えるようにしたいと考えていました。maxminddbとhive_geoip2はGeoIPよりも遅くなってしまうので使えません。maxmind_geoip2は速度的には問題なさそうでしたがAPIが独特で使い辛い感じでした。

それぞれの拡張ライブラリのコードを読んでみたところ、遅くなっていた原因は全ての属性値を取得していたことでした。速くするためには、必要な属性値のみ取得するようにすればよいはずです。 この仮説を検証するためにgeoip2_cを開発しました。

先程のベンチマークにgeoip2_cを追加したベンチマークによるとgeoip2_cが最速です。

Rehearsal ---------------------------------------------------------
geoip                   0.140000   0.000000   0.140000 (  0.147379)
geoip2_compat           0.110000   0.010000   0.120000 (  0.108135)
maxminddb (pure ruby)   4.310000   0.000000   4.310000 (  4.320897)
hive                    0.320000   0.000000   0.320000 (  0.321934)
maxmind_geoip2          1.240000   0.320000   1.560000 (  1.561630)
geoip2_c                0.070000   0.000000   0.070000 (  0.067715)
------------------------------------------------ total: 6.520000sec

                            user     system      total        real
geoip                   0.140000   0.000000   0.140000 (  0.142973)
geoip2_compat           0.160000   0.000000   0.160000 (  0.162996)
maxminddb (pure ruby)   4.650000   0.000000   4.650000 (  4.654088)
hive                    0.310000   0.000000   0.310000 (  0.308363)
maxmind_geoip2          1.350000   0.430000   1.780000 (  1.780049)
geoip2_c                0.080000   0.010000   0.090000 (  0.078209)
bundle exec ruby bench.rb  13.26s user 0.83s system 99% cpu 14.134 total

geoip2_cはIPアドレスでlookupを実行しただけでは、実際の値を取得しません。他のライブラリはlookupの時点で値を取得しています。GeoIP2のライブラリでは取得する値が多ければ多いほど処理に時間がかかります。geoip2_cでもGeoIP2で利用できる値を全て取得すると処理に時間がかかるようになります。 利用可能な属性数は、表の通りです。

ライブラリ 利用可能な属性数
geoip 9
geoip2_compat 8
geoip2_c 7+17+(4*7)=52
hive_geoip2 7+17+(4*7)=52

geoip2_cでは例えば、以下のような属性を取得することができますが、実際のアプリケーションでは全ての属性を必要とすることは少ないでしょう。よって必要な属性を必要なときに取得するようにした方が効率がよいです。なお、GeoIP2ではIPアドレスによって取得できる属性に違いがあります。

{"city"=>{"geoname_id"=>10300919, "names"=>{"en"=>"Fort Huachuaca"}},
 "continent"=>
  {"code"=>"NA",
   "geoname_id"=>6255149,
   "names"=>
    {"de"=>"Nordamerika",
     "en"=>"North America",
     "es"=>"Norteamérica",
     "fr"=>"Amérique du Nord",
     "ja"=>"北アメリカ",
     "pt-BR"=>"América do Norte",
     "ru"=>"Северная Америка",
     "zh-CN"=>"北美洲"}},
 "country"=>
  {"geoname_id"=>6252001,
   "iso_code"=>"US",
   "names"=>
    {"de"=>"USA",
     "en"=>"United States",
     "es"=>"Estados Unidos",
     "fr"=>"États-Unis",
     "ja"=>"アメリカ合衆国",
     "pt-BR"=>"Estados Unidos",
     "ru"=>"США",
     "zh-CN"=>"美国"}},
 "location"=>
  {"accuracy_radius"=>1000,
   "latitude"=>31.5273,
   "longitude"=>-110.3607,
   "metro_code"=>789,
   "time_zone"=>"America/Phoenix"},
 "postal"=>{"code"=>"85613"},
 "registered_country"=>
  {"geoname_id"=>6252001,
   "iso_code"=>"US",
   "names"=>
    {"de"=>"USA",
     "en"=>"United States",
     "es"=>"Estados Unidos",
     "fr"=>"États-Unis",
     "ja"=>"アメリカ合衆国",
     "pt-BR"=>"Estados Unidos",
     "ru"=>"США",
     "zh-CN"=>"美国"}},
 "subdivisions"=>
  [{"geoname_id"=>5551752,
    "iso_code"=>"AZ",
    "names"=>
     {"de"=>"Arizona",
      "en"=>"Arizona",
      "es"=>"Arizona",
      "fr"=>"Arizona",
      "ja"=>"アリゾナ州",
      "pt-BR"=>"Arizona",
      "ru"=>"Аризона"}}]}

fluent-plugin-geoipのGeoIP2対応について

GeoIP2サポートする際、なるべくGeoIP Legacyと互換性を保つためにgeoip2_compatを利用し、GeoIP2で利用できる属性を全て使用するためにgeoip2_cを使用することにしました。 設定によってGeoIP Legacyも利用できるようにしました。

それぞれで利用できる属性は以下の通りです。

GeoIP Legacy:

placeholder attributes output example type note
${city[lookup_field]} "Ithaca" varchar(255) -
${latitude[lookup_field]} 42.4277992248535 decimal -
${longitude[lookup_field]} -76.4981994628906 decimal -
${country_code3[lookup_field]} "USA" varchar(3) -
${country_code[lookup_field]} "US" varchar(2) A two-character ISO 3166-1 country code
${country_name[lookup_field]} "United States" varchar(50) -
${dma_code[lookup_field]} 555 unsigned int only for US
${area_code[lookup_field]} 607 char(3) only for US
${region[lookup_field]} "NY" char(2) A two character ISO-3166-2 or FIPS 10-4 code

geoip2_c backend:

placeholder attributes output example note
${city.names.en[lookup_field]} "Mountain View" -
${location.latitude[lookup_field]} 37.419200000000004 -
${location.longitude[lookup_field]} -122.0574 -
${country.iso_code[lookup_field]} "US" -
${country.names.en[lookup_field]} "United States" -
${postal.code[lookup_field]} "94043" -
${subdivisions.0.iso_code[lookup_field]} "CA" -
${subdivisions.0.names.en[lookup_field]} "California" -

geoip2_cバックエンドでは、上記の属性だけでなくGeoIP2のデータベースに含まれる全ての属性を使用可能です。

geoip2_compat backend:

placeholder attributes output example note
${city[lookup_field]} "Mountain View" -
${latitude[lookup_field]} 37.419200000000004 -
${longitude[lookup_field]} -122.0574 -
${country_code[lookup_field]} "US" -
${country_name[lookup_field]} "United States" -
${postal_code[lookup_field]} "94043"
${region[lookup_field]} "CA" -
${region_name[lookup_field]} "California" -

geoip2_compatバックエンドでは、上記の属性のみ使用可能です。

geoip2_c/geoip2_compatを利用するにはlibmaxminddbを事前にインストールする必要があります*1

libmaxminddb*2は多くのLinuxディスリビューションでパッケージ化されていてそれぞれのパッケージマネージャで簡単にインストールすることができます。

GeoIP2を利用できるfluent-plugin-geoip 0.7.0がリリース済みです。互換性のためにgeoip2_cとgeoip2_compatはdevelopment dependenciesになっているので、GeoIP2を利用したい場合は、利用したいバックエンドに対応したGemを事前にgem installするかGemfileに記載してbundle installするかしてください。

なおFluentd v0.14 APIへの対応はこのプルリクエストで進行中です。

類似プロダクト

調査過程で見つけたGeoIP Legacy/GeoIP2に対応したfluent-pluginの比較を載せておきます。おすすめはもちろんfluent-plugin-geoipです。

| | GeoIP Legacy | GeoIP2 | 速度 | 特徴 | |---------------------------------+--------------+--------+------+--------------------------------------| | fluent-plugin-geoip | ○ | ○ | 速い | | | fluent-plugin-filter-geoip | × | ○ | 遅い | データベースを自動ダウンロードできる | | fluent-plugin-geoip-filter | ○ | × | 速い | LRUキャッシュ搭載 | | fluent-plugin-filter-geo | × | ○ | 遅い | fluent-plugin-filter-geoipのfork |

まとめ

fluent-plugin-geoipのGeoIP2対応を進めたときの流れをまとめてみました。

  1. 既存のライブラリが要件を満たしているかどうか調査した
  2. 既存のライブラリだと要件を満たせなさそうなので、自分で拡張ライブラリを書いた
  3. ベンチマークで性能を確認
  4. 実際にfluent-plugin-geoipに組み込んでプルリクエストを出した
  5. プルリクエストを出した後も、project ownerのy-kenさんやFluentd開発者のrepeatedlyさんと協力して改善しfluent-plugin-geoip 0.7.0をリリースしていただいた
  6. 今後はFluentd v0.14 API対応したfluent-plugin-geoipリリースを目指す

Rubyの拡張ライブラリは簡単に書ける*3ので今後も機会があればどんどん書きたいです。

*1 geoip2_compatはlibmaxminddbのソースコードをバンドルしているのでgeoip2_compatを利用する場合は事前にインストールしなくてもよい

*2 geoip2_cで必要なのは開発用のファイルなのでlibmaxminddb-devまたはlibmaxminddb-develをインストールする

*3 geoip2_cは最初FFIを使って実装するつもりでしたがstructやunionのalignmentを考慮するのが辛かったので止めました

タグ: Fluentd
2017-04-18

2008|05|06|07|08|09|10|11|12|
2009|01|02|03|04|05|06|07|08|09|10|11|12|
2010|01|02|03|04|05|06|07|08|09|10|11|12|
2011|01|02|03|04|05|06|07|08|09|10|11|12|
2012|01|02|03|04|05|06|07|08|09|10|11|12|
2013|01|02|03|04|05|06|07|08|09|10|11|12|
2014|01|02|03|04|05|06|07|08|09|10|11|12|
2015|01|02|03|04|05|06|07|08|09|10|11|12|
2016|01|02|03|04|05|06|07|08|09|10|11|12|
2017|01|02|03|04|05|06|07|08|09|10|11|12|
タグ: