株式会社クリアコード > ククログ

ククログ

タグ:

Fluentd v0.14で導入されたstorageプラグインとは

はじめに

Fluentd v0.14では新たにstorageプラグインという新しいタイプのプラグインが導入されました。 FluentdをインストールしただけではJSON形式により保存される storage_local プラグインしかありませんが、このstorageプラグインはFluentdのプラグインのインスタンスが保持する値をKVSに集約することにも使用することができます。

storageプラグインとstorageプラグインヘルパー

Fluentd v0.14ではさらにプラグインヘルパーという概念も追加されました。storageプラグインにおいてもstorageプラグインを直接使うのではなく、storageプラグインヘルパーを通じて使うことが推奨されます。

storageプラグインのAPI

storageプラグインは以下のAPIを持ちます。これはKVSから値を取り出したり、保存したり、また取り出した値をキャッシュしておくのに合うAPIとなっています。

# basically, interact with KVS
def load
end

# basically, interact with KVS
def save
end

# Normally, the following methods work as `cache`.
def get(key)
end

def fetch(key, defval)
end

def put(key, value)
end

def delete(key)
end

def update(key, &block) # transactional get-and-update
end

このうち、 #load#save については実際のKVSに対して値を読み込んできたり、保存したりする役割を担います。 一方、 #get#fetch#put#delete#updateについてはstorageプラグインだけではキャッシュとして振る舞うことが求められます。

storageプラグインヘルパー

storageプラグインヘルパーはstorageプラグインを直接使わずにstorageプラグインの性質を変化させるように作成されています。 storageプラグインヘルパーの #wrap_instance メソッドにより、storageプラグインのインスタンスをそのまま使用するか、storageプラグインの値を永続化して同期を取るか、単に同期を取るかが決定されます。

Fluentdの実際のコードでは以下のようになっています。

def wrap_instance(storage)
  if storage.persistent && storage.persistent_always?
    storage
  elsif storage.persistent
    PersistentWrapper.new(storage)
  elsif !storage.synchronized?
    SynchronizeWrapper.new(storage)
  else
    storage
  end
end

<storage> セクションに persistent true が設定されていることや、 #persistent_always? の返す値、#synchronized? が返す値が振る舞いを変えることがわかります。

実際のstorageプラグインの例

これらを踏まえて、筆者はMongo、Redis、Memchachedについてのstorageプラグインをそれぞれ作成しました。

これらを用いると上記3つのKVSに対してstorageプラグインによりownerプラグインであるinput, output, filterプラグインの情報をKVSへ集約することができます。

まとめ

Fluentd v0.14で導入されたstorageプラグインの概要とstorageプラグインヘルパーを通した場合の振る舞いの変化について解説しました。 storageプラグインをうまく活用するとstorageプラグイン対応が入っているfluent-plugin-systemdfluent-plugin-windows-eventlogのようにどこまで読んだかの位置の記録をKVSに集約することができるようになります。

タグ: Fluentd
2017-04-19

fluent-plugin-geoip の geoip2 対応した話

fluent-plugin-geoipというIPアドレスから国や州・県などの情報を取得してレコードを加工するFluentdのプラグインがあります。

以前はGeoIP Legacyにあるデータベースを使ってIPアドレスから情報を取得していましたが、GeoIP2がリリースされてしばらく経過したのでGeoIP2に対応した話を書きます。

geoip2_cの開発

経緯はSupport GeoLite2 format · Issue #39 · y-ken/fluent-plugin-geoipに書いてありますが、少し抜粋します。

GeoIPについて調べているとGeoIP2を見つけ、さらにGeoIP2に対応したfluent-plugin-filter-geoipを見つけました。 そこで、fluent-plugin-filter-geoipの内部を調べ、どのようにしてGeoIP2に対応させているかを確認したところmaxminddbというピュアRuby実装のライブラリを使っていました。 他にGeoIP2を利用できるライブラリがあるかどうかを調査したところいくつか既存の実装がありました。

新たにfluent-plugin-geoipにGeoIP2対応を追加するにあたって、GeoIPを使用していたときと同等の性能を維持できるかどうかを確認するためにベンチマークをとりました。

ベンチマークによると、geoip2_compatであれば性能に問題はなさそうなことがわかりましたが、geoip2_compatだとGeoIP Legacyと同等のデータしか取得することができません。GeoIP2にはそれ以外のデータも多数追加されているので、できれば全ての機能を使えるようにしたいと考えていました。maxminddbとhive_geoip2はGeoIPよりも遅くなってしまうので使えません。maxmind_geoip2は速度的には問題なさそうでしたがAPIが独特で使い辛い感じでした。

それぞれの拡張ライブラリのコードを読んでみたところ、遅くなっていた原因は全ての属性値を取得していたことでした。速くするためには、必要な属性値のみ取得するようにすればよいはずです。 この仮説を検証するためにgeoip2_cを開発しました。

先程のベンチマークにgeoip2_cを追加したベンチマークによるとgeoip2_cが最速です。

Rehearsal ---------------------------------------------------------
geoip                   0.140000   0.000000   0.140000 (  0.147379)
geoip2_compat           0.110000   0.010000   0.120000 (  0.108135)
maxminddb (pure ruby)   4.310000   0.000000   4.310000 (  4.320897)
hive                    0.320000   0.000000   0.320000 (  0.321934)
maxmind_geoip2          1.240000   0.320000   1.560000 (  1.561630)
geoip2_c                0.070000   0.000000   0.070000 (  0.067715)
------------------------------------------------ total: 6.520000sec

                            user     system      total        real
geoip                   0.140000   0.000000   0.140000 (  0.142973)
geoip2_compat           0.160000   0.000000   0.160000 (  0.162996)
maxminddb (pure ruby)   4.650000   0.000000   4.650000 (  4.654088)
hive                    0.310000   0.000000   0.310000 (  0.308363)
maxmind_geoip2          1.350000   0.430000   1.780000 (  1.780049)
geoip2_c                0.080000   0.010000   0.090000 (  0.078209)
bundle exec ruby bench.rb  13.26s user 0.83s system 99% cpu 14.134 total

geoip2_cはIPアドレスでlookupを実行しただけでは、実際の値を取得しません。他のライブラリはlookupの時点で値を取得しています。GeoIP2のライブラリでは取得する値が多ければ多いほど処理に時間がかかります。geoip2_cでもGeoIP2で利用できる値を全て取得すると処理に時間がかかるようになります。 利用可能な属性数は、表の通りです。

ライブラリ 利用可能な属性数
geoip 9
geoip2_compat 8
geoip2_c 7+17+(4*7)=52
hive_geoip2 7+17+(4*7)=52

geoip2_cでは例えば、以下のような属性を取得することができますが、実際のアプリケーションでは全ての属性を必要とすることは少ないでしょう。よって必要な属性を必要なときに取得するようにした方が効率がよいです。なお、GeoIP2ではIPアドレスによって取得できる属性に違いがあります。

{"city"=>{"geoname_id"=>10300919, "names"=>{"en"=>"Fort Huachuaca"}},
 "continent"=>
  {"code"=>"NA",
   "geoname_id"=>6255149,
   "names"=>
    {"de"=>"Nordamerika",
     "en"=>"North America",
     "es"=>"Norteamérica",
     "fr"=>"Amérique du Nord",
     "ja"=>"北アメリカ",
     "pt-BR"=>"América do Norte",
     "ru"=>"Северная Америка",
     "zh-CN"=>"北美洲"}},
 "country"=>
  {"geoname_id"=>6252001,
   "iso_code"=>"US",
   "names"=>
    {"de"=>"USA",
     "en"=>"United States",
     "es"=>"Estados Unidos",
     "fr"=>"États-Unis",
     "ja"=>"アメリカ合衆国",
     "pt-BR"=>"Estados Unidos",
     "ru"=>"США",
     "zh-CN"=>"美国"}},
 "location"=>
  {"accuracy_radius"=>1000,
   "latitude"=>31.5273,
   "longitude"=>-110.3607,
   "metro_code"=>789,
   "time_zone"=>"America/Phoenix"},
 "postal"=>{"code"=>"85613"},
 "registered_country"=>
  {"geoname_id"=>6252001,
   "iso_code"=>"US",
   "names"=>
    {"de"=>"USA",
     "en"=>"United States",
     "es"=>"Estados Unidos",
     "fr"=>"États-Unis",
     "ja"=>"アメリカ合衆国",
     "pt-BR"=>"Estados Unidos",
     "ru"=>"США",
     "zh-CN"=>"美国"}},
 "subdivisions"=>
  [{"geoname_id"=>5551752,
    "iso_code"=>"AZ",
    "names"=>
     {"de"=>"Arizona",
      "en"=>"Arizona",
      "es"=>"Arizona",
      "fr"=>"Arizona",
      "ja"=>"アリゾナ州",
      "pt-BR"=>"Arizona",
      "ru"=>"Аризона"}}]}

fluent-plugin-geoipのGeoIP2対応について

GeoIP2サポートする際、なるべくGeoIP Legacyと互換性を保つためにgeoip2_compatを利用し、GeoIP2で利用できる属性を全て使用するためにgeoip2_cを使用することにしました。 設定によってGeoIP Legacyも利用できるようにしました。

それぞれで利用できる属性は以下の通りです。

GeoIP Legacy:

placeholder attributes output example type note
${city[lookup_field]} "Ithaca" varchar(255) -
${latitude[lookup_field]} 42.4277992248535 decimal -
${longitude[lookup_field]} -76.4981994628906 decimal -
${country_code3[lookup_field]} "USA" varchar(3) -
${country_code[lookup_field]} "US" varchar(2) A two-character ISO 3166-1 country code
${country_name[lookup_field]} "United States" varchar(50) -
${dma_code[lookup_field]} 555 unsigned int only for US
${area_code[lookup_field]} 607 char(3) only for US
${region[lookup_field]} "NY" char(2) A two character ISO-3166-2 or FIPS 10-4 code

geoip2_c backend:

placeholder attributes output example note
${city.names.en[lookup_field]} "Mountain View" -
${location.latitude[lookup_field]} 37.419200000000004 -
${location.longitude[lookup_field]} -122.0574 -
${country.iso_code[lookup_field]} "US" -
${country.names.en[lookup_field]} "United States" -
${postal.code[lookup_field]} "94043" -
${subdivisions.0.iso_code[lookup_field]} "CA" -
${subdivisions.0.names.en[lookup_field]} "California" -

geoip2_cバックエンドでは、上記の属性だけでなくGeoIP2のデータベースに含まれる全ての属性を使用可能です。

geoip2_compat backend:

placeholder attributes output example note
${city[lookup_field]} "Mountain View" -
${latitude[lookup_field]} 37.419200000000004 -
${longitude[lookup_field]} -122.0574 -
${country_code[lookup_field]} "US" -
${country_name[lookup_field]} "United States" -
${postal_code[lookup_field]} "94043"
${region[lookup_field]} "CA" -
${region_name[lookup_field]} "California" -

geoip2_compatバックエンドでは、上記の属性のみ使用可能です。

geoip2_c/geoip2_compatを利用するにはlibmaxminddbを事前にインストールする必要があります*1

libmaxminddb*2は多くのLinuxディスリビューションでパッケージ化されていてそれぞれのパッケージマネージャで簡単にインストールすることができます。

GeoIP2を利用できるfluent-plugin-geoip 0.7.0がリリース済みです。互換性のためにgeoip2_cとgeoip2_compatはdevelopment dependenciesになっているので、GeoIP2を利用したい場合は、利用したいバックエンドに対応したGemを事前にgem installするかGemfileに記載してbundle installするかしてください。

なおFluentd v0.14 APIへの対応はこのプルリクエストで進行中です。

類似プロダクト

調査過程で見つけたGeoIP Legacy/GeoIP2に対応したfluent-pluginの比較を載せておきます。おすすめはもちろんfluent-plugin-geoipです。

GeoIP Legacy GeoIP2 速度 特徴
fluent-plugin-geoip 速い
fluent-plugin-filter-geoip × 遅い データベースを自動ダウンロードできる
fluent-plugin-geoip-filter × 速い LRUキャッシュ搭載
fluent-plugin-filter-geo × 遅い fluent-plugin-filter-geoipのfork

まとめ

fluent-plugin-geoipのGeoIP2対応を進めたときの流れをまとめてみました。

  1. 既存のライブラリが要件を満たしているかどうか調査した
  2. 既存のライブラリだと要件を満たせなさそうなので、自分で拡張ライブラリを書いた
  3. ベンチマークで性能を確認
  4. 実際にfluent-plugin-geoipに組み込んでプルリクエストを出した
  5. プルリクエストを出した後も、project ownerのy-kenさんやFluentd開発者のrepeatedlyさんと協力して改善しfluent-plugin-geoip 0.7.0をリリースしていただいた
  6. 今後はFluentd v0.14 API対応したfluent-plugin-geoipリリースを目指す

Rubyの拡張ライブラリは簡単に書ける*3ので今後も機会があればどんどん書きたいです。

*1 geoip2_compatはlibmaxminddbのソースコードをバンドルしているのでgeoip2_compatを利用する場合は事前にインストールしなくてもよい

*2 geoip2_cで必要なのは開発用のファイルなのでlibmaxminddb-devまたはlibmaxminddb-develをインストールする

*3 geoip2_cは最初FFIを使って実装するつもりでしたがstructやunionのalignmentを考慮するのが辛かったので止めました

タグ: Fluentd
2017-04-18

Fluentd v0.14.13で追加されたコマンドのご紹介

Fluentd v0.14.13が2017-02-27にリリースされていました。新しいコマンドが2つ追加されたのでご紹介します。

  • fluent-plugin-config-format
  • fluent-plugin-generate

fluent-plugin-config-format

プラグインの設定をいくつかのフォーマットで出力するコマンドです。 README.mdを書くのを楽にしたりコマンドラインからプラグインの設定値の意味を調べるためのものです。 Fluentd v0.12.16でプラグインの設定をコマンドラインから確認する方法で紹介したものの実装を改善して使いやすくしました。

$ fluent-plugin-config-format --help
Usage: fluent-plugin-config-format [options] <type> <name>

Output plugin config definitions

Arguments:
        type: input,output,filter,buffer,parser,formatter,storage
        name: registered plugin name

Options:
        --verbose                    Be verbose
    -c, --compact                    Compact output
    -f, --format=FORMAT              Specify format. (markdown,txt,json)
    -I PATH                          Add PATH to $LOAD_PATH
    -r NAME                          Load library
    -p, --plugin=DIR                 Add plugin directory

以下のコマンドは同じ意味です。

$ fluentd --show-plugin-config input:tail
$ fluent-plugin-config-format --format=txt input tail

plugin helperや Fluent::Plugin::Output の設定についてはdocs.fluentd.orgへのリンクにしたいところですが、2017年2月某日時点ではまだページがないので、リンクを生成することができていません。 ドキュメントが書かれたら、リンクを追加したいと考えています。

fluent-plugin-generate

新規プラグインを開発するときに使用するテンプレートを生成するためのコマンドです。 これまでは、既存のプラグインからコピーしたり、公式のドキュメントからコピーしたりしていましたが、このコマンドを使うことによって、これまでよりも簡単にプラグインの雛形を生成することができるようになりました。

$ fluent-plugin-generate --help
Usage: fluent-plugin-generate [options] <type> <name>

Generate a project skeleton for creating a Fluentd plugin

Arguments:
        type: input,output,filter,parser,formatter
        name: Your plugin name

Options:
        --[no-]license=NAME          Specify license name (default: Apache-2.0)

例えば、Zulipに出力するプラグインを作成するときは以下のように利用します。

$ fluent-plugin-generate output zulip
License: Apache-2.0
        create Gemfile
        create README.md
        create Rakefile
        create fluent-plugin-zulip.gemspec
        create lib/fluent/plugin/out_zulip.rb
        create test/helper.rb
        create test/plugin/test_out_zulip.rb
Initialized empty Git repository in /tmp/fluent-plugin-zulip/.git/

このように、必要なファイルが一通り生成されるのですぐに開発を始めることができます。 ユニットテストにはFluentd本体と同じ test-unit を使用しています。 ただし、このコマンドではFluentd v0.12以前のAPIを使用したプラグインを開発するためのファイルを生成することはできません。

fluent-plugins-nursery/fluent-plugin-zulip はこのコマンドを使用して開発を始めました。 README.mdのConfigurationの項は fluent-plugin-config-format を使用して生成しました。

まとめ

Fluentd v0.14.13で追加された2つの新しいコマンドについて紹介しました。 他の変更点についてはChangeLogを参照してください。

タグ: Fluentd
2017-03-09

fluent-plugins-nurseryのご紹介

クリアコードではFluentdの開発に参加しています。 以前の記事でも紹介した通り多くのプラグインの開発にも参加しています。

Fluentd v0.14系がリリースされたので、最近はプラグインをFluentdのv0.14 APIに対応させるようなプルリクエストを送っています。

移行手順については公式のドキュメントに説明があり、ククログにもいくつか記事があります。なので、やろうと思えばすぐにFluentd v0.14 APIに移行することができます。

Fluentdには600以上のプラグインがありますが、中にはメンテナンスが活発でない*1プラグインもあります。 今のところ足りないドキュメントがあったり、新APIに移行した事例が少なかったりして、新APIへの移行をためらっているプラグイン作者もいると思われます。

そこでFluentdのプラグインを安定してメンテナンスできる体制を構築し、Fluentdのプラグインを新しいAPIに対応する活動を活発にするためにfluent-plugins-nurseryというGitHub Organizationを作りました*2

fluent-plugins-nurseryの目的

Fluentdのプラグインのために、安定したメンテナンス体制を提供することが目的です。

元のプラグイン作者が、何らかの理由でメンテナンスできない状態になったときに、メンテンスを引き取ることができる場所を提供します。

fluent-plugins-nurseryへ参加することのメリット・デメリット

  • plus1 元のプラグイン作者が何もしなくてもIssueやPull requestに対応する
  • plus1 元のプラグイン作者が何もしなくてもFluentdの新機能に対応する
  • plus1 元のプラグイン作者が何もしなくてもRubyやTravis CIの設定が最新のものに対応する
  • plus1 差分付きコミットメールがメーリングリストに飛ぶ *3
  • -1 fluent-plugins-nurseryチームが苦手な分野*4については、対応が遅れる可能性がある

FAQ

  • question 乗っ取りなの?
  • bulb いいえ。元のプラグイン作者が望まない限りfluent-plugins-nurseryに権限が移行されることはありません。
  • question 元のプラグイン作者はメンテナンスもリリースもできなくなるの?
  • bulb いいえ。元のプラグイン作者は自身が望めば、権限を維持することができます。完全に引き継いで手を引くこともできます。
  • question 元のプラグイン作者が行方不明です!どうしましょう!?
  • bulb 落ち着いてください。インターネット上で見つかるはずなので探してみましょう。もし見つけられなかったら相談してください。
  • question 私、プラグイン作者なんですが、Fluentdの新機能に対応したいです!でも、どうしたらいいかわかりません。
  • bulb Fluentdのコミュニティで相談してください。
  • question Issueをたてる前に相談したい。
  • bulb fluent-plugins-nusery/Lobby で細かい相談ができます。

fluent-plugins-nursery への参加方法

プラグイン作者がやることは少ないです。以下の簡単なステップを実行するだけで、最新のFluentdに対応する作業、issueやPull requestへの対応をfluent-plugins-nurseryのメンバーがベストエフォートで行います。 詳細はfluent-plugins-nursery/contactを確認してください。

  1. プラグイン作者がfluent-plugins-nursery/contactにissueを作る

  2. fluent-plugins-nurseryの管理者が accepted ラベルを付ける

  3. プラグイン作者がTransfer ownershipする

  4. プラグイン作者がGemのownerを追加する

  5. fluent-plugins-nurseryの管理者が他の管理者もGemのリリースができるようにする

まとめ

Fluentdのプラグインのメンテナンスに使う時間が足りないなー、新機能に対応したいけど難しそうだなーと思っているプラグイン作者はぜひfluent-plugins-nurseryまでご連絡ください。 もし、身の回りのプラグイン作者が、メンテナンスに困っている様子だったらfluent-pluguins-nurseryを紹介してあげてください。

*1 小さなプルリクエストを送っても半年以上マージしてもらえない

*2 rust-lang-nurseryを参考にしました

*3 See http://www.commit-email.info/

*4 マイナーなウェブサービスや使うのにものすごく手間がかかるもの

つづき: 2017-01-05
タグ: Fluentd
2016-11-09

v0.14 Outputプラグインの仕様解説

はじめに

クリアコードはFluentdというソフトウェアの開発に参加しています。Fluentdはログ収集や収集したデータの分配・集約などを行うソフトウェアです。

v0.14での新機能を使ったプラグインを作成する際にはこれまでの Fluent 以下のクラスではなく、Fluent::Plugin 以下のクラスを継承し、実装する必要が出てきました。 また、v0.14のOutputプラグインはv0.12とは異なり、Fluent::Plugin::Output クラスに様々な機能が入っています。これらの機能をプラグイン開発者向けに解説することを目指します。

この記事はv0.14.8以降が対象です。 まずは、Outputプラグインが必ず実装するべきメソッドについてのおさらいです。

non-buffered

def emit(tag, es, chain)
  # ...
  chain.next
end

def process(tag, es)
  # ...
end

と読み替えます。 output#process(tag, es) だけを実装するとnon-bufferedプラグインになります。

例えば、out_relabel の使用例があります。

buffered synchronous

output#write(chunk) を実装するとbuffered outputプラグインになります。

def write(chunk)
  # ...
end

例えば、out_stdout の使用例があります。

buffered asynchronous

output#try_write(chunk) を実装するとbuffered asynchronous outputプラグインになります。

def try_write(chunk)
  # ...
end

out_stdout の使用例があります。ただし、これはテスト用の実装のため、実用のものとは異なることに注意してください。

また、#commit_write(chunk_id) を呼び、chunkのwriteを確定させることが必要です。 rollback_writecommit_write が行われないまま指定秒数が経過した chunk に対して自動的に呼ばれるので、プラグイン開発者が明示的に呼ぶ必要は通常はありません(秒数は delayed_commit_timeout で設定から制御可能)。

ここまでがv0.14のOutputプラグインの基本的な事柄です。

では、さらにv0.14のプラグイン開発者にとって必要なことを順々に見ていきましょう。

custom format

#format(tag, time, record) を実装すると、bufferのchunkでmsgpack以外のformatが使用できるようになります。

#format を使用すると、

def formatted_to_msgpack_binary
  true
end

としてtrueを返すようにしなければ chunk#msgpack_each メソッドは使用できません。

chunk#msgpack_each

v0.12のObjectBufferedOutput互換になるのは #format を実装していない場合です。 #format の有無や、 #formatted_to_msgpack_binary の返り値によって挙動が異なってくるのに注意してください。

standard format

chunk#msgpack_each でyieldされてくる値は #format を実装している時とそうでない時で異なります。

def write(chunk)
  chunk.msgpack_each do |time, record|
    # ...
  end
end

ただし、#msgpack_each は互換性のために残されているものです。 通常は chunk.each を使ってください。msgpack_each も(主に互換性の関係から) alias が定義されていますが、本来 chunk の内部フォーマット(msgpack)を意識させたメソッドを使うのは好ましくありません。

tagが必要な場合は、

config_section :buffer do
  config_set_default :@type, DEFAULT_BUFFER_TYPE
  config_set_default :chunk_keys, ['tag']
end

のようなbufferのdefault confを足し、chunk.metadata.tag で取得してください。

また、tag が必要な場合 config_set_default :chunk_keys, ['tag'] を指定しておくのはよいですが、これは設定で上書きされる可能性があるため #configure でチェックを行うべきです。

def configure(conf)
  super

  raise Fluent::ConfigError, "chunk keys must include 'tag' for this plugin" unless @chunk_key_tag
  # ...
end
custom format

#format(tag, time, record) を実装した場合は、to_msgpackでmsgpackへパックした順にmsgpack_eachをすると得られます。 また、#formatted_to_msgpack_binary をオーバーライドしてtrueを返すようにしてください。

def format(tag, time, record)
  [tag, time, record].to_msgpack
end

def formatted_to_msgpack_binary
  true
end

def write(chunk)
  chunk.msgpack_each do |tag, time, record|
    # ...
  end
end

injectヘルパーを使う場合は #format(tag, time, record) を通すことでより見通しが良くなります。そのため、 #format を実装し、その中で inject_values_to_record(tag, time, record) を呼ぶようにしてください。

発展形

v0.14のOutputプラグインはオーバーライドするメソッドや実装するメソッドにより、confの設定により実行時に3種の異なる種別のOutputプラグインへ切り替えることができます。

non-bufferedとbufferedの切り替え

これは以下の優先順位で行われます:

  1. 実装メソッドによる分岐 (例: #process しか実装されていない → non-buffered)
  2. 両方実装されている場合で、かつ設定において <buffer> セクションが指定されている場合 → buffered
  3. 両方実装されており設定にセクションが指定されていない場合 → #prefer_buffered_processing を呼んで判定
buffered synchronous/asynchronousの切り替え

output#writeoutput#try_write を実装して #prefer_delayed_commit の返り値のtrue/falseでbuffered synchronousとbuffered asynchronousを切り替えられます。

  • true -> buffered asynchronous
  • false -> buffered synchronous

output#writeoutput#try_write のどちらか一方だけ実装している場合は、#prefer_delayed_commit は呼ばれません。

bufferedプラグインの注意点

#write, #try_write を実装していないOutputプラグインへのconfigには <buffer> ディレクティブが使用できません。

複合形
#prefer_delayed_commit #prefer_buffered_processing 結果
false false non-buffered
false true buffered synchronous
true true buffered asynchronous
true false 選択不可

secondaryの扱い

secondaryに指定されたプラグインはbufferingのサポートが必要です。out_fileなどのbufferingをサポートしたoutputプラグインを指定できます。

bufferディレクティブのCHUNK_KEYSアトリビュート

<buffer CHUNK_KEYS>のようにbufferディレクティブにはCHUNK_KEYSのアトリビュートの指定が可能です。 tag, timekey, variablesの指定ができるようになっています。これはこのアトリビュートによってチャンクをひとまとめにするためにあります。

  • tag →タグごとにチャンクがまとめられる
  • timekey →time formatごとにチャンクがまとめられる
  • variables →レコードの中のキーごとのチャンクがまとめられる

buffered outputプラグインのflushで用いられるthread

start時に <buffer> ディレクティブにある flush_thread_count で指定されている数のスレッドを作ります。#submit_flush_once は単にそれらのスレッドを明示的にアクティブにしているだけです。

v0.12のbuffered outputプラグインの自前スレッドの書き換え

プラグインが自前で作成していたスレッドは以下のようにできるはずです。

  • 定期的にある処理を行う必要があった場合 → timer plugin helper を使う
  • Fluent::Output プラグインを継承していたが(ある設定が有効なときのみ)バックグラウンドでflushするような処理を自前で書いていた → #process および #write 両方を実装して設定により挙動を切り替える
  • socketをlistenしていた → socket/server plugin helper を使う(これから実装される)

それ以外の場合は thread plugin helper を使います。自前で Thread.new するべきではありません。thread plugin helperを使う場合、plugin test driverがそのスレッドの状態管理などの面倒を見てくれるため、たまに失敗するテスト、などの危険性が大幅に低下します。

プレースホルダ

chunk.metadata が実際にどの値を有しているかは <buffer CHUNK_KEYS>CHUNK_KEYS に何をユーザが指定したか(あるいは config_set_default で何が指定されていたか)により異なります。 が、プラグイン作者が独自にチェックするべきではなく #configure 内で #placeholder_validate!("name_of_parameter", @name_of_parameter) を使うべきです。使われているプレースホルダと chunk key の間に不整合があれば configuration error が上がります。 (もっと細かい制御もやろうと思えばできますが、コーナーケースです。こちらの議論を参照してください。)

つまりプラグイン作者は #configure 内で #placeholder_validate! し、そこが通っているならあとは #writeextract_placeholder(@name_of_parameter, chunk.metadata) するだけでよいです。

${tag}

chunkに含まれるタグに展開されます。 また、tag1.tag2.tag3.... のようなタグとなっている場合、 ${tag[0]}, ${tag[1]}, ${tag[2]},...のようにタグの添え字を指定することで個別に取り出すことができます。

strftime形式(%Y%m%dなど)

strftimeのフォーマットに準じて展開されます。 variable_%Y-%m-%dT%H:%M:%S.%N のように用います。 これは variable_2015-12-25T12:34:56.123450000 のように展開されます。

まとめ

v0.14のOutputプラグインの仕様をFluentdの開発者の協力を仰ぎ*1書き出してみました。v0.12のoutputプラグインと変わっている箇所も多く、単純にv0.14への移行は難しい箇所もあります。 v0.14のAPIを使うように移行するとプラグインヘルパーやプレースホルダーの機能により、より柔軟なconfの設定を書くことが可能になります。例えば、プレースホルダーの機能を使ったものとしては、fluent-plugin-mysql のテーブル名へのプレースホルダーを指定可能にする機能*2 を実装したものがあります。このようにタグや日付ごとのデータ集計をサポートする機能を簡単に実装できるようになるというメリットがあるため、v0.14のAPIを使うように移行を試みてみるのはいかがでしょうか?

*1 この記事を書くに当たって @tagomoris さんのレビューの協力を仰ぎました。ありがとうございます。

*2 https://github.com/tagomoris/fluent-plugin-mysql#configuration-examplebulk-insert-with-tag-placeholder-for-table-name や https://github.com/tagomoris/fluent-plugin-mysql#configuration-examplebulk-insert-with-time-format-placeholder-for-table-name を参照。

つづき: 2017-01-05
タグ: Fluentd
2016-10-20

Fluentd v0.14のプラグインヘルパーの使い方

はじめに

クリアコードではOSS開発支援サービスの一環でTreasureDataさんが中心になって開発し公開しているFluentdとそのプラグインなど*1の開発を支援しています。 Fluentd v0.14からはプラグインでよく使われる定型の処理をより簡易に扱えるようにするプラグインヘルパーが実装されました。 この記事ではv0.14の執筆時点のプラグインヘルパーの使用方法の概説を行います。また、プラグインヘルパーを解説した各節の最後には実際の使われている箇所の一例を示します。

プラグインの使い方の概要

プラグインヘルパーは以下のようにhelpersに可変長引数で指定します。

helpers :plugin_name1, :plugin_name2, ..., plugin_nameN

また、helpersは複数回呼ぶこともできます。プラグイン毎にhelpersで使いたいプラグインヘルパーを可変長引数で指定するか複数回指定するかのポリシーを決めておくとよいでしょう。

child_process

プラグインから子プロセスを起動する定型の処理をカプセル化し、より簡易に扱えるようにするためのプラグインヘルパーです。

helpers :child_process
# ...
def start
  super
  child_process_execute(title, command[, ...]) do |io|
  end
  # ...
end
# ...
end

のように使用します。一定間隔で子プロセスを起動するには

config_param :command, :string
config_param :run_interval, :time
# ...
def start
  super
  child_process_execute(:example_child_process, @command, interval: @run_interval, mode: [:read]) do |io|
  end
  # ...
end
# ...
end

のようにします。一回だけ起動する場合は

config_param :command, :string
config_param :run_interval, :time
# ...
def start
  super
  child_process_execute(:example_child_process, @command, immediate: true, mode: [:read]) do |io|
  end
  # ...
end
# ...
end

のように使用します。このプラグインヘルパーは例えばin_execプラグインで使用されています。

child_processプラグインヘルパーの注意点

第一引数はシンボルである必要があります。また、この引数で渡されたシンボルはグローバルになるのでプラグイン名_使用用途としておくことが推奨されています。

compat_parameters

v0.12のconfigのflat styleからv0.14のstructured styleへ自動的にマッピングさせるためのプラグインヘルパーです。

v0.12のconfigではbufferのconfigは以下のような形式でした。

<match **>
  @type file
  path /path/to/file/*.log
  buffer_type file
  buffer_path /path/to/buffer/*.log
  time_sliced_format %Y%m%d%H # sliced per hour
  buffer_chunk_limit 16m
</match>

v0.14のconfigでは以下のような形式で書く必要があります。

<match **>
  @type file
  path /path/to/file/*.log
  <buffer>
     @type file
     path /path/to/buffer/*.log
     time_key 60m # sliced per hour
     chunk_size_limit 16m
  </buffer>
</match>

compat_parametersプラグインヘルパーを使用する事でflat styleからstructured styleへ自動的にマッピングできます。 以下、その概要です。

helpers :compat_parameters
# ...

def configure(conf)
  # ...
  compat_parameters_convert(conf, :buffer)
  super
  # ...
end

のように使用します。また、このプラグインヘルパーは後述するformatterやinject、parserプラグインと一緒に使用することがあります。

その場合は、

helpers :inject, :formatter, :compat_parameters
# ...
def configure(conf)
  # ...
  compat_parameters_convert(conf, :inject, :formatter)
  # ...
end

のように、conpat_parametersを通して変換したいプラグインのtypeを指定します。 例えばfilter_stdoutプラグインがこのプラグインヘルパーを使用しています。

event_emitter

v0.14のFluentdの Fluent::Plugin::Output クラスはデフォルトで emit が使用できなくなりました。 v0.14のOutputプラグインでは Engine.emit を直接呼べなくなるので router.emit を行いたい場合はこのプラグインヘルパーを使う必要があります。

使い方は

helpers :event_emitter

とするのみです。 v0.12のOutputプラグインの時と同じように router.emit を呼べるようになります。 このプラグインヘルパーは例えばout_relabelプラグインヘルパーで使用されています。

event_loop

event_loopヘルパーは Coolio::Loop クラスをカプセル化し、より手軽に扱えるようにしたプラグインヘルパーです。

helpers :event_loop
# ...
def start
  # ...
  @handler = ...
  event_loop_attach(@handler)
  # ...
end

のようにして使います。 @handler の中身は Fluent::SocketUtil::UdpHandlerCoolio::TCPServer などのインスタンスを入れることになります。 このプラグインヘルパーは後述のtimerプラグインヘルパーの内部で使われているため、暗黙的に使われていることが多いです。 in_syslogプラグインは明示的にこのプラグインヘルパーを使っています。

formatter

formatterを使うにはv0.12では Fluent::Plugin.new_formatter を呼び、Formatterのインスタンスを作成する一連の処理を書く必要がありました。 この処理ををカプセル化し、より手軽にformatterをプラグイン中で使用するためのプラグインヘルパーです。

このプラグインヘルパーは

helpers :formatter
DEFAULT_FORMAT_TYPE = 'out_file'
# ...
def configure(conf)
  # ...
  super
  @formatter = formatter_create(usage: 'example_format', conf: conf.elements('format').first, default_type: DEFAULT_FORMAT_TYPE)
  # ...
end

def format(tag, time, record)
  # ...
  @formatter.format(tag, time, record)
  # ...
end

のようにして使います。

また、実際のFluentdのプラグインでv0.12形式のconfigも扱う必要がある場合はcompat_parametersプラグインと一緒に以下のようにして使います。

helpers :compat_parameters, :formatter
DEFAULT_FORMAT_TYPE = 'out_file'
# ...
def configure(conf)
  compat_parameters_convert(conf, :formatter)
  super
  @formatter = formatter_create(usage: 'example_format', conf: conf.elements('format').first, default_type: DEFAULT_FORMAT_TYPE)
  # ...
end

def format(tag, time, record)
  # ...
  @formatter.format(tag, time, record)
  # ...
end

formatterプラグインヘルパーは多くのプラグインで使用されています。compat_parametersプラグインヘルパーと組み合わせて使っている例として、filter_stdoutプラグインがあります。 また、Fluent::Plugin::Output クラスを継承している場合、formatter#format はプラグイン中の #format を実装している場合、このメソッドがバッファを書き出す前に呼ばれるようになります。 #format にレコードのフォーマット操作のみの役割を持たせる場合、テストもしやすくなるのでおすすめです。

formatterプラグインヘルパーの注意点

第一引数は文字列である必要があります。また、この引数で渡された文字列はグローバルになるのでプラグイン名_使用用途としておくことが推奨されています。

inject

injectプラグインヘルパーはホスト名、タグ、時刻をレコードへ挿入するためのプラグインヘルパーです。 このプラグインヘルパーは

helpers :inject
# ...
def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)
  # ...
end

のようにして使用します。基本的にrecordにアクセス出来る箇所であればどこでも使えますが、テストコードの関係上#format メソッドの中で利用するのが良いでしょう。

このプラグインヘルパーはv0.12の SetTagKeyMixinSetTimeKeyMixinhostname プレースホルダーの置き換えを狙ったものです。 このプラグインヘルパーは例えば、out_stdoutプラグインで使用されています。 3rdパーティ製のプラグインでもこのプラグインヘルパーの使いどころはかなりあるはずなので、使えそうな箇所を探してみてください。

parser

formatterを使うにはv0.12では Fluent::Plugin.new_parser を呼び、Parserのインスタンスを作成する一連の処理を書く必要がありました。 この処理ををカプセル化し、より手軽にParserをプラグイン中で使用するためのプラグインヘルパーです。

このプラグインヘルパーは

helpers :parser
DEFAULT_PARSER_TYPE = 'syslog'
# ...
def configure(conf)
  # ...
  super
  @parser = parser_create(usage: 'example_parse', type: DEFAULT_PARSER_TYPE, conf: conf)
  # ...
end

def do_something(text)
  # ...
  @parser.parse(text) do {|time, record|
     # ...
  }
  # ...
end

のようにして使われます。 このプラグインヘルパーは使われ方が若干特殊ですがin_syslogプラグインで使われています。

parserプラグインヘルパーの注意点

第一引数は文字列である必要があります。また、この引数で渡された文字列はグローバルになるのでプラグイン名_使用用途としておくことが推奨されています。

retry_state

retry_stateプラグインヘルパーはOutputプラグイン等でリトライ処理を行う際の決まったコードをカプセル化し、より簡易に利用出来るようにする機能を提供します。 このプラグインヘルパーはOutputプラグインのリトライ処理を切り出したもので、組み込み以外のプラグインでも組み込みのプラグインと同じようなリトライのロジックをより簡易に扱えるようになります。

helpers :retry_state
# ...
config_section :buffer, param_name: :buffer_config, init: true, required: false, multi: false, final: true do
  config_param :retry_timeout, :time, default: 72 * 60 * 60
  config_param :retry_type, :enum, list: [:exponential_backoff, :periodic], default: :exponential_backoff
  config_param :retry_wait, :time, default: 1
end
# ...
def start
  super
  @retry_state = retry_state_create(
    :example_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout
  )
  # ...
end

のようにして使います。このプラグインヘルパーはOutputプラグインの継承元クラスを実装しているoutput.rbで使用されています。

storage

v0.14ではプラグインの状態をKey-Value形式で保存するのに用いるStorageプラグインが新たに導入されました。 storageプラグインヘルパーはv0.14で新たに導入されたこのプラグインを使用する際の決まったコードをカプセル化し、より簡易に利用出来るようにする機能を提供します。

このプラグインヘルパーは

helpers :storage

DEFAULT_STORAGE_TYPE = 'local'
#...
def configure(conf)
  super
  @storage = storage_create(usage: 'example_storing_value', conf: config, default_type: DEFAULT_STORAGE_TYPE)
end

def start
  super
  @storage.put(:example_value, 0) unless @storage.get(:example_value)
  # ...
end

def do_something
  @sutorage.update(:example_value){|v| v + 1 }
end

のようにして使用します。 このプラグインヘルパーはin_dummyプラグインのsuspendオプションの実装に使用しました。

storageプラグインヘルパーの注意点

第一引数は文字列である必要があります。また、この引数で渡された文字列はグローバルになるのでプラグイン名_使用用途としておくことが推奨されています。

thread

threadプラグインヘルパーはプラグインで新たにThreadを立てる必要がある際の決まったコードをカプセル化し、より簡易に利用できるようにする機能を提供します。

このプラグインヘルパーは

helpers :thread
# ...
def start(conf)
  super
  thread_create(:example_usage, &method(:run))
  # ...
end

def run
  # executed on other thread
end

# And thread lifecycle is managed by thread plugin helper automatically.

のようにして利用します。 thread_create のブロックは必須です。このブロックが別スレッドで実行されます。 このプラグインはevent_loopプラグインヘルパーの中で使用され、また、timerプラグインがevent_loopプラグインヘルパーに依存しているため、知らず知らずの内に使っている事が多いです。 明示的に使用している例としてはin_dummyプラグインがあります。

threadプラグインヘルパーの注意点

第一引数はシンボルである必要があります。また、この引数で渡されたシンボルはグローバルになるのでプラグイン名_使用用途としておくことが推奨されています。

timer

timerプラグインヘルパーはプラグインで新たに高精度なタイマーを実装する必要がある際の決まったコードをカプセル化し、より簡易に利用できるようにする機能を提供します。

このプラグインヘルパーは

helpers :timer
# ...
config_param :emit_interval, :time, default: 60
#...
def start(conf)
  super
  timer_execute(:example_timer, @emit_interval, &method(:on_timer))
  # ...
end

def on_timer
  # periodically execution block by timer
end

# And timer lifecycle is managed by timer plugin helper automatically.

のようにして利用します。 timer_execute のブロックは必須です。このブロックが別スレッドで実行されます。

また、一回のみの実行で良い場合は repeat: false をtimer_executeに渡します。

timer_execute(:example_timer, @emit_interval, repeat: false) do
  # one-shot timer execution block
end

in_gc_statプラグインではこのプラグインヘルパーを使っています。

timerプラグインヘルパーの注意点

第一引数はシンボルである必要があります。また、この引数で渡されたシンボルはグローバルになるのでプラグイン名_使用用途としておくことが推奨されています。

まとめ

執筆時点でのプラグインヘルパーの使い方と使われている箇所について解説しました。 v0.14ではプラグインでよく使われる定型の処理についてより簡易に扱えるようにプラグインヘルパーが実装されました。 プラグインでより高度なことをするのにためらっていたプラグイン開発者はv0.14向けのプラグインではこれらのプラグインヘルパーを使い、よりよいプラグインを目指されてみてはいかがでしょうか?

*1 fluent-plugin-xxxやfluent-logger-xxxが多い

つづき: 2017-01-05
タグ: Fluentd
2016-08-30

OSS開発支援の一例: Fluentdとその関連ソフトウェア

クリアコードではOSS開発支援サービスの一環でTreasureDataさんが中心になって開発し公開しているFluentdとそのプラグインなど*1の開発を支援しています。 かれこれ一年ほどやってきたので、どのようなことをやってきたのかふりかえります。

TreasureDataさんの目的はFluentdの普及促進です。Fluentdが普及すると、TreasureDataさんのサービスを利用する人が増えるためです。

クリアコードはTreasureDataさんの目的を達成するために以下のことをやっています。

  • TreasureData さんが手の回っていないところをサポートする
    • IssueやPRに対応する
    • メンテナのいないプロダクトでよく使われているものはメンテナンスを引き取る
    • 新機能の開発
    • バグの修正
    • その他、Fluentdの普及促進につながること
  • Fluentd やその関連ソフトウェアに関する技術情報を発信する
    • ククログやQiita等に記事を書く

直接依頼されたことだけでなく、Fluentdの普及促進につながることを積極的にやっています。

なお、TreasureDataさんとは基本的にはIssue/PullRequestでやりとりしています。作業の優先順位やIssue/PullRequestで相談しづらいことを相談するときはSlackを使用しています。

IssueやPRに対応する

IssueやPRにコメントを書くことによって、コミュニティが活発になります。コミュニティが活発になると次のような理由で新規ユーザーを惹き付ける要因の一つになります。よってFluentdの普及促進につながります。

  • 今後の発展を期待できる
  • 問題が見つかりやすくなり安定化につながる
  • ユーザー間での情報共有が活発になり早く問題解決できる

コメントを書いたIssueやPRのうち特に問題の解決につながったものをリストアップしました。

メンテナのいないプロダクトでよく使われているものはメンテナンスを引き取る

これらは依頼されて引き取ったものもありますが、Fluentdが内部で利用しているライブラリもあります。 こういったプロジェクトを引き取りメンテナンスを継続することによって、既存ユーザーは安心して使い続けることができますし、新規ユーザーも安心して使い始められます。安心して使えていると既存ユーザーは他の人から意見を求められたときに「安心して使える」という情報を伝えてくれます。このようにFluentdの普及促進につながります。

新機能の開発

既存のFluentdのままでもかなり便利ですが、以下の点をより強化することによりさらにFluentdの普及促進につながります。

  • 機能追加
    • Fluentdを活用できるケースが増えて普及促進につながります。
  • 既存機能の改良
    • 運用中のFluentdのメンテナンスが楽になります。運用が楽になると既存ユーザーはよりFluentdを活用でき、まわりにその情報を提供してくれる可能性が増えます。また、新しくFluentdを活用できるケースが増えて普及促進につながります。
  • テスト周りの改良
    • プラグインを開発しやすくなり、既存プラグインのメンテナンス・新規プラグインの開発がはかどります。プラグインが増えるとFluentdを活用できるケースが増えて普及促進につながります。

依頼されたものだけでなく、特にFluentdの普及推進につながるものをリストアップしました。

バグの修正

バグを修正することにより、それが原因で使えなかったケースでもFluentdを活用できるようになり、Fluentdの普及促進につながります。また、既存ユーザーは運用で回避せずに済むため、よりFluentdを楽に運用できるようになります。このような既存ユーザーが情報共有といった形でコミュニティで活躍してくれるとコミュニティがさらに活発になりFluendの普及促進につながります。

依頼されたものではなく、自分たちで見つけたバグを修正したものをリストアップします。

Fluentd関連の技術情報発信

QiitaやククログでFluentd関連の技術情報を発信しています。

その他、Fluentdの普及促進につながること

他にも以下のことを実施しました。

  • Fluentdのプラグインを新機能に対応させる*2
    • v0.12.11で追加された secret パラメータ対応
    • v0.12.16/v0.12.17で追加された desc 対応
    • v0.12.0で追加されたfilterプラグインに対応するための修正
  • Fluentd v0.14のプラグインAPIへの対応*3
  • ライセンス表記の修正
  • TravisCIの設定の追加・修正・更新
  • Rubyのバージョンアップに伴う修正
  • ユニットテストの追加
  • 警告の除去
  • flume-ng-fluentd-sinkの開発
  • fluent-bitのHomebrewのFormulaの作成のプルリクエスト
  • RustのFluent Loggerのfruentlyの開発

これらは少し遠回しなものもありますが、次の理由でFluentdの普及促進につながると考えて実施しました。

  • Fluentdの新しいユーザーが簡単に新しいFluentdやFluentdの関連ソフトウェアを使えるようになるため
  • ライセンス表記を正しいものに修正すると、ユーザーがそのソフトウェアを使ってもよいものなのか簡単に正しく判断することができるようになるため
  • メンテナンス性を向上させて開発を続けやすくするため
  • Flume NGからFluentdへの移行や接続を容易にするため
  • Fluentdを動かすには厳しい環境に対してもFluentdのエコシステムに乗っかれるかどうかの検証環境の導入を容易にするため
  • 新しい言語のユーザーにFluentdへの関心を持ってもらえるようにするため

まとめ

このようにクリアコードではOSSの開発支援を実施しているので、OSSの開発をしているけれど手が足りなくて思うように進められないといったことがあれば相談してみてください。 相談はフォームからメッセージを送ってください。

*1 fluent-plugin-xxxやfluent-logger-xxxが多い

*2 3rdパーティ製も含む

*3 絶賛対応中

タグ: Fluentd
2016-08-29

Fluentdのプラグインのv0.14への移行の仕方

はじめに

クリアコードではFluentdというソフトウェアの開発に参加しています。Fluentdはログ収集や収集したデータの分配・集約などを行うソフトウェアです。

Fluentdのv0.14はv0.12とある程度の後方互換性が保たれているメジャーバージョンアップです。

v0.14での新機能を使ったプラグインを作成する際にはこれまでの Fluent 以下のクラスではなく、Fluent::Plugin 以下のクラスを継承し、実装する必要が出てきました。 また、v0.14からはプラグインでよく書かれるコードをカプセル化し共通に使えるヘルパーを提供することで、よりプラグイン開発者が簡潔で良くテストされたコードを使ってプラグインが開発出来るようになる、とアナウンスされています。*1

Inputプラグインの場合

Inputプラグインをv0.14のプラグインに移行する際には Fluent::Input の継承を止め、Fluent::Plugin::Input を継承するようにします。

また、Fluentdのテストを読むと、v0.14でのInputプラグインのテストにはfluent/test/driver/input.rbにある Fluent::Test::Driver::Input クラスのテストドライバを用いるようにすると良いことがわかります。

driver#run の書き方がv0.12と比べて変わっていると言う点に注意しなければなりません。 v0.14のテストドライバでは driver#run の終了条件が確定しない場合は例外が上がるようになっています。 *2

そのため、v0.14向けのInputプラグインのテストでは driver#run へブロックを渡すか、 driver#end_if で終了条件を指定することが必要です。

  1. requireするInputクラスを fluent/input から fluent/plugin/input へ変更する
  2. Time.now をナノ秒に対応した現在時刻を返す Fluent::EventTime.now に置き換える *3
  3. 前述のプラグインヘルパーを使う事のできる箇所は置き換える
  4. テストドライバをv0.14のものを使用するようにする。

という点に注意してInputプラグインのv0.14への移行作業を行います。

実際に、in_object_spaceプラグインをv0.14化したプルリクエストを見てみましょう。

1.と2.は見ての通りほぼそのままなので、とくにここでは詳細に解説しません。 Inputプラグインのv0.14への移行作業は3.の作業と4.の作業が特に重い作業です。

3.の作業に該当するものは、in_object_spaceプラグインをv0.14化したプルリクエストでは、timerヘルパーの使用にあたります。 v0.14ではタイマーを用いてイベントを発生させるのに汎用的なtimerヘルパーが提供されており、v0.12の頃はCool.ioのクラスを継承したクラスを作成してこの手の処理を行うコードを書く必要がありました。v0.14ではヘルパーを使うだけになっています。

また、プラグインヘルパーは helpers:inject のように使いたいプラグインヘルパー名をシンボルで渡す形式になっています。

4.の作業では、既に driver#run にブロックが渡されていたため、driver#emitsdriver#events に書き換える作業と、v0.14のInput Driverクラスの Fluent::Test::Driver::Input を用い、プラグインのクラスをv0.14のテストドライバに渡す作業のみでした。v0.14でのInputプラグインのテストドライバを利用するには fluent/test/driver/input をrequireする必要があります。

Outputプラグインの場合

v0.14のOutputプラグインが継承するべき Fluent::Plugin::Output クラスは実装されているメソッドによって

  1. バッファリングしないOutputプラグイン (non-Buffered Output)
  2. バッファリングし、同期的にバッファをコミットするOutputプラグイン (Buffered Synchronous Output)
  3. バッファリングし、非同期的にバッファをコミットするOutputプラグイン (Buffered Asynchronous Output)

の3つのOutputプラグインの性質を持つようになります。 また、v0.14でのOutputプラグインのテストドライバを利用するには fluent/test/driver/output をrequireする必要があります。 driver#run の書き方が変更になっており、例えば

driver.run(default_tag: 'test') do
  driver.feed(time, {"a"=>1})
  driver.feed(time, {"a"=>2})
end

のように、driver#rundefault_tag: キーワード引数にemitする時のタグを渡したり、driver#emit ではなくdriver#feed を用いてイベントを流し込む必要があるのに注意してください。 流し込まれたイベントはInputと同様に driver#eventsで取得することができます。

non-Buffered Output

まずは、1.の場合のv0.14への移行のプルリクエストを見ていきます。

この場合は #process メソッドのみOutputプラグインが実装する必要があります。 また、driver#run の新しい書き方に対応させました。

Buffered Synchronous Output

out_fileのv0.14化対応のプルリクエストを見ていきます。

Buffered Synchronous Outputのv0.14のプラグインは #write メソッドと <buffer> セクションを解釈出来るようにするか、compat_parametersプラグインヘルパーをプラグイン中で使用するようにします。

このプルリクエストではTimeSlicedOutputを継承したOutputプラグインのv0.14化対応をしています。 また、Outputプラグインからfomatterプラグインやbufferプラグインを使う事ができるため、それに関するプラグインヘルパーのconfig sectionの追加も行っています。 v0.14形式とv0.12形式のconfigは書き方が大幅に異なっています。*4

ですが、その差異を埋めるcompat_parametersプラグインヘルパーもあります。 このプラグインヘルパーの compat_parameters_convert メソッドを使う事により、v0.12形式のconfigでもv0.14のFluentdで引き続き使う事ができます。

Buffered Asynchronous Output

Buffered Asynchronous Outputのv0.14のプラグインは #try_write メソッドと <buffer> セクションを解釈出来るようにするか、compat_parametersプラグインヘルパーをプラグイン中で使用するようにします。 また、 prefer_delayed_commit でtrueを返すようにします。

まだこの非同期コミットに絞ったv0.14のプラグインはこの記事の執筆時点では書かれていません。

Buffered Synchronous/Asynchronous Output

実は、Buffered Outputプラグインはsynchronousとasynchronousの両方の機能をconfigで切り替えられるように書く事ができます。

Buffered Outputに対応したテスト用のプラグインを追加したプルリクエストを見てみます。

ここでは、Fluent::Plugin::Output を継承したOutputプラグインを追加しています。そこで#write#try_write、そして、#prefer_delayed_commit をそれぞれ実装しています。 このプラグインではバッファのコミットを非同期にする設定を追加してはいませんが、その設定をconfigからできるようにすることでバッファを同期的または非同期的にコミットする動作をconfigで切り替えられるプラグインを書く事ができます。

Outputプラグインのcustom format

また、v0.14のOutputプラグインはレコードのformatを行うためにプラグイン固有の#formatメソッドも定義しておく事が可能です。

Parserプラグインの場合

ParserプラグインはParserプラグイン単体で使われるプラグインではなく、Input・Outputプラグインなどから使われるOwnedプラグインと呼ばれる範疇のプラグインです。

これらParserプラグインのインスタンスはv0.14ではparserヘルパーの parser_create メソッドを用いて作成することが推奨されます。

v0.14のParserプラグインは Fluent::Plugin::Parser クラスを継承する必要があります。 Parserプラグインが持つべきメソッドは #parse のみです。また、v0.14対応をするには fluent/test/driver/parser にある Fluent::Test::Driver::Parser クラスのParserテストドライバを用いる必要があります。 v0.12では driver#parse にてパース結果をテストする形になっていましたが、v0.14では driver#instance によりParserプラグインのインスタンスを取り出してからパーサープラグインのインスタンスの #parse メソッドを呼ぶような規約に変更になったので注意が必要です。

Formatterプラグインの場合

FormatterプラグインはFormatterプラグイン単体で使われるプラグインではなく、Input・Outputプラグインなどから使われるOwnedプラグインと呼ばれる範疇のプラグインです。

v0.14のFormatterプラグインは Fluent::Plugin::Formatter クラスを継承する必要があります。 これらFormatterプラグインのインスタンスはv0.14ではformatterヘルパーの formatter_create メソッドを用いて作成することが推奨されます。 v0.14において、Formatterプラグインが持つべきメソッドは #format です。

また、v0.14対応をするには fluent/test/driver/formatter にある Fluent::Test::Driver::Formatter クラスのFormatterテストドライバを用いる必要があります。 v0.12ではテストドライバを使う事は少なかったのですが、 v0.14では driver#instance によりFormatterプラグインのインスタンスを取り出してからパーサープラグインのインスタンスの #format メソッドを呼ぶとformat後の値が取得し、テストするようになったので注意が必要です。

Filterプラグインの場合

v0.14のFilterプラグインは Fluent::Plugin::Filter クラスを継承する必要があります。

また、v0.14でのFilterプラグインのテストドライバは fluent/test/driver/filter をrequireする必要があります。

v0.12の頃は #filter よりも #filter_stream の方が10%程度高速だったため高速化のために利用されていましたが、v0.14ではfilterをパイプライン化できる時はする機能がサポートされることになり、 #filter を使うようにすることが推奨されます。 #filter_stream は互換性のために残されますが、非推奨扱いになります。

driver#run の書き方が変更になっており、例えば

driver.run do
  driver.feed('test', time, {"a"=>1})
  driver.feed('test', time, {"a"=>2})
end

のように、driver#emit ではなくdriver#feed を用いてイベントを流し込む必要があるのに注意してください。 流し込まれてFilterされた後のイベントは driver#filtered_records で取得することができます。

まとめ

Fluentdのv0.12向けに書かれたプラグインのv0.14対応の概要について説明しました。Fluentd向けに多くのプラグインが公開されていますが、v0.12の書き方のままで公開されているプラグインが多くあるのが現状です。 Fluentdのプラグインのv0.14化対応はやり方を把握すれば挑戦できないことはないので、v0.14らしい書き方をしてみたいFluentdのプラグイン開発者やプラグインを良くしたいユーザーの方々はv0.14のAPIを使ってみるといいのではないでしょうか。 この記事ではプラグインヘルパーについては深入りする事ができませんでした。プラグインヘルパーについてはまたの機会に説明することとします。 最後に、v0.14でこのプラグインが動いていないというIssueを上げるだけでもありがたいのでそのようなフィードバックをして頂けると幸いです。

*1 http://www.slideshare.net/tagomoris/fluentd-overview-now-and-then

*2 https://github.com/fluent/fluentd/blob/12718a218a1e78126108a573b85d4b18e8bd56d5/lib/fluent/test/driver/base.rb#L134

*3 `Fluent::Engine.now`は内部的に`Fluent::EventTime.now`を呼んでいるのでこのままでよいようです。

*4 例えば、formatterやbufferも一つのconfigセクションとして書けるようになっています。

つづき: 2017-01-05
タグ: Fluentd
2016-08-26

最小構成のFluent Loggerを作成するには

はじめに

クリアコードではFluentdというソフトウェアの開発に参加しています。Fluentdはログ収集や収集したデータの分配・集約などを行うソフトウェアです。

また、Fluent Loggerと呼ばれるFluentdへデータを転送出来る実装が各言語で公開されています。

Fluentdのv0.10とv0.12のイベントの転送の仕組みはForward protocolとして仕様化されています。登場したばかりの新しいプログラミング言語でFluent Loggerがまだない場合、この仕様に基づいて自分で実装する必要があります。

Forward Protocol v0

この記事ではFluentd v0.10とv0.12で用いられているForward Protocol v0を元に解説します。

Forwardプロトコル

Fluent Loggerでは次の3つのうちどれかを実装している必要があります:

  • Message Mode
  • Forward Mode
  • PackedForward Mode

このうち、Fluent loggerの実装によく使われているのがMessage modeと呼ばれる形式です。現在のところPackedForward対応を謳っているFluent logger実装は多くありません。*1

この記事ではMessage modeでの最小構成のFluent Loggerの作り方について解説をするため、Message modeに絞って解説します。

Message modeとは

Message modeは一度に一つのイベントのみを送ることができます。また、そのイベントの中身は次の通りです。

  • タグ
  • Unix時間
  • レコード
  • オプション

このうち、オプションは必須ではありません。そのため、タグ・Unix時間・レコードをひとまとめにしてForwardプロトコルを実装しているサーバーに送ることが出来ればよい、となります。

また、送る形式はJSONまたはmsgpackのどちらかを選択する事ができます。

Message modeでイベントを送るには

ここまででMessage modeでイベントを送るための形式が分かりました。実際にMessage modeでイベントをForward protocol v0を実装したサーバーに送る際には接続の種類や、接続の切断まで仕様から読み解く必要があります。 では、その仕様を見てみましょう。その仕様はFluentdのWikiの以下の部分に書かれています:

OptionとResponseの節でMessage modeでoptionを送らない場合はForwardプロトコルを実装しているサーバーは接続を切る動作をします。 なので、Message modeで送る際には以下のような流れでFluent LoggerはForwardプロトコルを実装するサーバーにイベントを送ります。

  1. Forwardプロトコルを実装しているサーバーにTCPで接続する
  2. タグ・Unix時間・レコードを含むイベントをForwardプロトコルを実装しているサーバーに送る
  3. Forwardプロトコルを実装しているサーバーとの接続を切る

という流れになります。また、毎回サーバーがTCP接続を切るのでTCP接続を永続化する事はできないことに注意してください。

おわりに

Fluent Loggerに最低限必要な機能には何があるかを解説しました。以上はForwardプロトコルに載せてイベントを送る際に最低限必要な事柄です。 この方針で実装されたFluent Loggerはネットワーク上の問題やアプリケーションの異常によるログの消失を抑える工夫がなされていないため、これらを抑える工夫が必要になります。 その工夫については次の機会に解説します。

*1 例えば、 Fluency | GitHub はPacked Forwardを実装しているのを確認しています。

タグ: Fluentd
2016-04-22

Fluentd Sinkを用いてApache Flume NGからFluentdへレコードを出力するには

はじめに

クリアコードではFluentdというソフトウェアの開発に参加しています。Fluentdはログ収集や収集したデータの分配・集約などを行うソフトウェアです。

また、fluent-loggerと呼ばれるFluentdへデータを転送出来る実装が各言語で公開されています。

同じようなログ収集や収集したデータ分配・集約のソフトウェアとして、Apache Flume NGというものもあります。

Apache Flume NGにはデータの入力とデータの出力のAPIが公開されており、プラグインを実装することが可能です。 Apache Flume NGの場合はそれぞれ入力がSource、出力がSinkと呼ばれています。

Flume NGのSinkについて

FluentdのようにApache Flume NGに対してもサードパーティがプラグインを公開しています。 ただし、サードパーティのプラグインはJVMで動き、Flume NGのjarが公開しているAPIを使用できる言語なら好きなものを使う事ができます。 中にはFlumeの実装に使われているJavaではなくScalaで実装されたFlume NGプラグインも存在します。

FlumeのSinkでは次のメソッドを実装することが必要です。

public void start(); // @Override org.apache.flume.AbstractSink's start()
public void stop(); // @Override org.apache.flume.AbstractSink's stop()
public void process(); // @Override org.apache.flume.Sink's process()
public void configure(); // Implement org.apache.flume.conf.Configurable 

そのほか、トランザクションを使う時にはorg.apache.flume.Transaction をimportし、チャンネル毎のトランザクションを発行する必要があったり、org.apache.flume.Sink にある Status 定数を用いて、トランザクションが成功すれば READY、失敗であれば BACKOFF とするように書きます。

Fluentd Sinkの作成

Apache Flume NGはJavaで実装されている上に、特段他の言語を採用する理由が見当たらなかったのでJavaで実装しました。また、ビルドツールにはGradleを使用しました。 リポジトリはこちらです。

Javaのfluent-loggerにはfluent-logger-javaが存在しますが、より速いと謳っているfluencyをfluent-loggerとして用いる事にしました。

FlumeのSinkとして作成をしてみると以下の利点がある事が分かりました。

  • Sinkの中でTransactionが発行出来る
  • Transactionが発行出来るため、FlumeのconfでtransactionCapacityで指定した分だけトランザクションを実行出来る

という点です。このため、設定によっては安全に転送する事が出来ます。

使い方

ほとんどREADMEに書いていますが、Flume NGの${FLUME_HOME}/lib 以下に gradle shadowJar を実行して作成されたflume-ng-fluentd-sink-${version}-all.jarを配置し、confを書くだけです。 0.0.1のリリースタグを打ってあるので、このv0.0.1 リリースページからflume-ng-fluentd-sink-0.0.1-all.jarをFlume NGの${FLUME_HOME}/lib 以下に配置することでも使う事ができます。

おわりに

Flume NGを使っていて、Fluentdに送れる機能が欲しいなぁという方にはぜひ試して頂きたいです。そうでなくてもJavaでこうした方が良いですよ、というものでもPRして下されば取り込みますのでよろしくお願いします。

つづき: 2016-01-06
タグ: Fluentd
2015-12-02

2008|05|06|07|08|09|10|11|12|
2009|01|02|03|04|05|06|07|08|09|10|11|12|
2010|01|02|03|04|05|06|07|08|09|10|11|12|
2011|01|02|03|04|05|06|07|08|09|10|11|12|
2012|01|02|03|04|05|06|07|08|09|10|11|12|
2013|01|02|03|04|05|06|07|08|09|10|11|12|
2014|01|02|03|04|05|06|07|08|09|10|11|12|
2015|01|02|03|04|05|06|07|08|09|10|11|12|
2016|01|02|03|04|05|06|07|08|09|10|11|12|
2017|01|02|03|04|
タグ:
RubyKaigi 2015 sponsor RubyKaigi 2015 speaker RubyKaigi 2015 committer RubyKaigi 2014 official-sponsor RubyKaigi 2014 speaker RubyKaigi 2014 committer RubyKaigi 2013 OfficialSponsor RubyKaigi 2013 Speaker RubyKaigi 2013 Committer SapporoRubyKaigi 2012 OfficialSponsor SapporoRubyKaigi 2012 Speaker RubyKaigi2010 Sponsor RubyKaigi2010 Speaker RubyKaigi2010 Committer badge_speaker.gif RubyKaigi2010 Sponsor RubyKaigi2010 Speaker RubyKaigi2010 Committer
SapporoRubyKaigi02Sponsor
SapporoRubyKaigi02Speaker
RubyKaigi2009Sponsor
RubyKaigi2009Speaker
RubyKaigi2008Speaker