はじめに
クリアコードではFluentdというソフトウェアの開発に参加しています。Fluentdはログ収集や収集したデータの分配・集約などを行うソフトウェアです。
Fluentdのv0.14はv0.12とある程度の後方互換性が保たれているメジャーバージョンアップです。
v0.14での新機能を使ったプラグインを作成する際にはこれまでの Fluent 以下のクラスではなく、Fluent::Plugin 以下のクラスを継承し、実装する必要が出てきました。
また、v0.14からはプラグインでよく書かれるコードをカプセル化し共通に使えるヘルパーを提供することで、よりプラグイン開発者が簡潔で良くテストされたコードを使ってプラグインが開発出来るようになる、とアナウンスされています。1
Inputプラグインの場合
Inputプラグインをv0.14のプラグインに移行する際には Fluent::Input の継承を止め、Fluent::Plugin::Input を継承するようにします。
また、Fluentdのテストを読むと、v0.14でのInputプラグインのテストにはfluent/test/driver/input.rbにある Fluent::Test::Driver::Input クラスのテストドライバを用いるようにすると良いことがわかります。
driver#run の書き方がv0.12と比べて変わっていると言う点に注意しなければなりません。
v0.14のテストドライバでは driver#run の終了条件が確定しない場合は例外が上がるようになっています。 2
そのため、v0.14向けのInputプラグインのテストでは driver#run へブロックを渡すか、 driver#end_if で終了条件を指定することが必要です。
-
requireするInputクラスを
fluent/inputからfluent/plugin/inputへ変更する -
Time.nowをナノ秒に対応した現在時刻を返すFluent::EventTime.nowに置き換える 3 -
前述のプラグインヘルパーを使う事のできる箇所は置き換える
-
テストドライバをv0.14のものを使用するようにする。
という点に注意してInputプラグインのv0.14への移行作業を行います。
実際に、in_object_spaceプラグインをv0.14化したプルリクエストを見てみましょう。
1.と2.は見ての通りほぼそのままなので、とくにここでは詳細に解説しません。 Inputプラグインのv0.14への移行作業は3.の作業と4.の作業が特に重い作業です。
3.の作業に該当するものは、in_object_spaceプラグインをv0.14化したプルリクエストでは、timerヘルパーの使用にあたります。 v0.14ではタイマーを用いてイベントを発生させるのに汎用的なtimerヘルパーが提供されており、v0.12の頃はCool.ioのクラスを継承したクラスを作成してこの手の処理を行うコードを書く必要がありました。v0.14ではヘルパーを使うだけになっています。
また、プラグインヘルパーは helpers に :inject のように使いたいプラグインヘルパー名をシンボルで渡す形式になっています。
4.の作業では、既に driver#run にブロックが渡されていたため、driver#emits を driver#events に書き換える作業と、v0.14のInput Driverクラスの Fluent::Test::Driver::Input を用い、プラグインのクラスをv0.14のテストドライバに渡す作業のみでした。v0.14でのInputプラグインのテストドライバを利用するには fluent/test/driver/input をrequireする必要があります。
Outputプラグインの場合
v0.14のOutputプラグインが継承するべき Fluent::Plugin::Output クラスは実装されているメソッドによって
-
バッファリングしないOutputプラグイン (non-Buffered Output)
-
バッファリングし、同期的にバッファをコミットするOutputプラグイン (Buffered Synchronous Output)
-
バッファリングし、非同期的にバッファをコミットするOutputプラグイン (Buffered Asynchronous Output)
の3つのOutputプラグインの性質を持つようになります。
また、v0.14でのOutputプラグインのテストドライバを利用するには fluent/test/driver/output をrequireする必要があります。
driver#run の書き方が変更になっており、例えば
driver.run(default_tag: 'test') do
driver.feed(time, {"a"=>1})
driver.feed(time, {"a"=>2})
end
のように、driver#run のdefault_tag: キーワード引数にemitする時のタグを渡したり、driver#emit ではなくdriver#feed を用いてイベントを流し込む必要があるのに注意してください。
流し込まれたイベントはInputと同様に driver#eventsで取得することができます。
non-Buffered Output
まずは、1.の場合のv0.14への移行のプルリクエストを見ていきます。
この場合は #process メソッドのみOutputプラグインが実装する必要があります。
また、driver#run の新しい書き方に対応させました。
Buffered Synchronous Output
out_fileのv0.14化対応のプルリクエストを見ていきます。
Buffered Synchronous Outputのv0.14のプラグインは #write メソッドと <buffer> セクションを解釈出来るようにするか、compat_parametersプラグインヘルパーをプラグイン中で使用するようにします。
このプルリクエストではTimeSlicedOutputを継承したOutputプラグインのv0.14化対応をしています。 また、Outputプラグインからfomatterプラグインやbufferプラグインを使う事ができるため、それに関するプラグインヘルパーのconfig sectionの追加も行っています。 v0.14形式とv0.12形式のconfigは書き方が大幅に異なっています。4
ですが、その差異を埋めるcompat_parametersプラグインヘルパーもあります。
このプラグインヘルパーの compat_parameters_convert メソッドを使う事により、v0.12形式のconfigでもv0.14のFluentdで引き続き使う事ができます。
Buffered Asynchronous Output
Buffered Asynchronous Outputのv0.14のプラグインは #try_write メソッドと <buffer> セクションを解釈出来るようにするか、compat_parametersプラグインヘルパーをプラグイン中で使用するようにします。
また、 prefer_delayed_commit でtrueを返すようにします。
まだこの非同期コミットに絞ったv0.14のプラグインはこの記事の執筆時点では書かれていません。
Buffered Synchronous/Asynchronous Output
実は、Buffered Outputプラグインはsynchronousとasynchronousの両方の機能をconfigで切り替えられるように書く事ができます。
Buffered Outputに対応したテスト用のプラグインを追加したプルリクエストを見てみます。
ここでは、Fluent::Plugin::Output を継承したOutputプラグインを追加しています。そこで#write、#try_write、そして、#prefer_delayed_commit をそれぞれ実装しています。
このプラグインではバッファのコミットを非同期にする設定を追加してはいませんが、その設定をconfigからできるようにすることでバッファを同期的または非同期的にコミットする動作をconfigで切り替えられるプラグインを書く事ができます。
Outputプラグインのcustom format
また、v0.14のOutputプラグインはレコードのformatを行うためにプラグイン固有の#formatメソッドも定義しておく事が可能です。
Parserプラグインの場合
ParserプラグインはParserプラグイン単体で使われるプラグインではなく、Input・Outputプラグインなどから使われるOwnedプラグインと呼ばれる範疇のプラグインです。
これらParserプラグインのインスタンスはv0.14ではparserヘルパーの parser_create メソッドを用いて作成することが推奨されます。
v0.14のParserプラグインは Fluent::Plugin::Parser クラスを継承する必要があります。
Parserプラグインが持つべきメソッドは #parse のみです。また、v0.14対応をするには fluent/test/driver/parser にある Fluent::Test::Driver::Parser クラスのParserテストドライバを用いる必要があります。
v0.12では driver#parse にてパース結果をテストする形になっていましたが、v0.14では driver#instance によりParserプラグインのインスタンスを取り出してからパーサープラグインのインスタンスの #parse メソッドを呼ぶような規約に変更になったので注意が必要です。
Formatterプラグインの場合
FormatterプラグインはFormatterプラグイン単体で使われるプラグインではなく、Input・Outputプラグインなどから使われるOwnedプラグインと呼ばれる範疇のプラグインです。
v0.14のFormatterプラグインは Fluent::Plugin::Formatter クラスを継承する必要があります。
これらFormatterプラグインのインスタンスはv0.14ではformatterヘルパーの formatter_create メソッドを用いて作成することが推奨されます。
v0.14において、Formatterプラグインが持つべきメソッドは #format です。
また、v0.14対応をするには fluent/test/driver/formatter にある Fluent::Test::Driver::Formatter クラスのFormatterテストドライバを用いる必要があります。
v0.12ではテストドライバを使う事は少なかったのですが、
v0.14では driver#instance によりFormatterプラグインのインスタンスを取り出してからパーサープラグインのインスタンスの #format メソッドを呼ぶとformat後の値が取得し、テストするようになったので注意が必要です。
Filterプラグインの場合
v0.14のFilterプラグインは Fluent::Plugin::Filter クラスを継承する必要があります。
また、v0.14でのFilterプラグインのテストドライバは fluent/test/driver/filter をrequireする必要があります。
v0.12の頃は #filter よりも #filter_stream の方が10%程度高速だったため高速化のために利用されていましたが、v0.14ではfilterをパイプライン化できる時はする機能がサポートされることになり、 #filter を使うようにすることが推奨されます。 #filter_stream は互換性のために残されますが、非推奨扱いになります。
driver#run の書き方が変更になっており、例えば
driver.run do
driver.feed('test', time, {"a"=>1})
driver.feed('test', time, {"a"=>2})
end
のように、driver#emit ではなくdriver#feed を用いてイベントを流し込む必要があるのに注意してください。
流し込まれてFilterされた後のイベントは driver#filtered_records で取得することができます。
まとめ
Fluentdのv0.12向けに書かれたプラグインのv0.14対応の概要について説明しました。Fluentd向けに多くのプラグインが公開されていますが、v0.12の書き方のままで公開されているプラグインが多くあるのが現状です。 Fluentdのプラグインのv0.14化対応はやり方を把握すれば挑戦できないことはないので、v0.14らしい書き方をしてみたいFluentdのプラグイン開発者やプラグインを良くしたいユーザーの方々はv0.14のAPIを使ってみるといいのではないでしょうか。 この記事ではプラグインヘルパーについては深入りする事ができませんでした。プラグインヘルパーについてはまたの機会に説明することとします。 最後に、v0.14でこのプラグインが動いていないというIssueを上げるだけでもありがたいのでそのようなフィードバックをして頂けると幸いです。
-
http://www.slideshare.net/tagomoris/fluentd-overview-now-and-then ↩
-
https://github.com/fluent/fluentd/blob/12718a218a1e78126108a573b85d4b18e8bd56d5/lib/fluent/test/driver/base.rb#L134 ↩
-
Fluent::Engine.nowは内部的にFluent::EventTime.nowを呼んでいるのでこのままでよいようです。 ↩ -
例えば、formatterやbufferも一つのconfigセクションとして書けるようになっています。 ↩