ククログ

株式会社クリアコード > ククログ > v0.14 Outputプラグインの仕様解説

v0.14 Outputプラグインの仕様解説

はじめに

クリアコードはFluentdというソフトウェアの開発に参加しています。Fluentdはログ収集や収集したデータの分配・集約などを行うソフトウェアです。

v0.14での新機能を使ったプラグインを作成する際にはこれまでの Fluent 以下のクラスではなく、Fluent::Plugin 以下のクラスを継承し、実装する必要が出てきました。 また、v0.14のOutputプラグインはv0.12とは異なり、Fluent::Plugin::Output クラスに様々な機能が入っています。これらの機能をプラグイン開発者向けに解説することを目指します。

この記事はv0.14.8以降が対象です。 まずは、Outputプラグインが必ず実装するべきメソッドについてのおさらいです。

non-buffered

def emit(tag, es, chain)
  # ...
  chain.next
end

def process(tag, es)
  # ...
end

と読み替えます。 output#process(tag, es) だけを実装するとnon-bufferedプラグインになります。

例えば、out_relabel の使用例があります。

buffered synchronous

output#write(chunk) を実装するとbuffered outputプラグインになります。

def write(chunk)
  # ...
end

例えば、out_stdout の使用例があります。

buffered asynchronous

output#try_write(chunk) を実装するとbuffered asynchronous outputプラグインになります。

def try_write(chunk)
  # ...
end

out_stdout の使用例があります。ただし、これはテスト用の実装のため、実用のものとは異なることに注意してください。

また、#commit_write(chunk_id) を呼び、chunkのwriteを確定させることが必要です。 rollback_writecommit_write が行われないまま指定秒数が経過した chunk に対して自動的に呼ばれるので、プラグイン開発者が明示的に呼ぶ必要は通常はありません(秒数は delayed_commit_timeout で設定から制御可能)。

ここまでがv0.14のOutputプラグインの基本的な事柄です。

では、さらにv0.14のプラグイン開発者にとって必要なことを順々に見ていきましょう。

custom format

#format(tag, time, record) を実装すると、bufferのchunkでmsgpack以外のformatが使用できるようになります。

#format を使用すると、

def formatted_to_msgpack_binary
  true
end

としてtrueを返すようにしなければ chunk#msgpack_each メソッドは使用できません。

chunk#msgpack_each

v0.12のObjectBufferedOutput互換になるのは #format を実装していない場合です。 #format の有無や、 #formatted_to_msgpack_binary の返り値によって挙動が異なってくるのに注意してください。

standard format

chunk#msgpack_each でyieldされてくる値は #format を実装している時とそうでない時で異なります。

def write(chunk)
  chunk.msgpack_each do |time, record|
    # ...
  end
end

ただし、#msgpack_each は互換性のために残されているものです。 通常は chunk.each を使ってください。msgpack_each も(主に互換性の関係から) alias が定義されていますが、本来 chunk の内部フォーマット(msgpack)を意識させたメソッドを使うのは好ましくありません。

tagが必要な場合は、

config_section :buffer do
  config_set_default :@type, DEFAULT_BUFFER_TYPE
  config_set_default :chunk_keys, ['tag']
end

のようなbufferのdefault confを足し、chunk.metadata.tag で取得してください。

また、tag が必要な場合 config_set_default :chunk_keys, ['tag'] を指定しておくのはよいですが、これは設定で上書きされる可能性があるため #configure でチェックを行うべきです。

def configure(conf)
  super

  raise Fluent::ConfigError, "chunk keys must include 'tag' for this plugin" unless @chunk_key_tag
  # ...
end
custom format

#format(tag, time, record) を実装した場合は、to_msgpackでmsgpackへパックした順にmsgpack_eachをすると得られます。 また、#formatted_to_msgpack_binary をオーバーライドしてtrueを返すようにしてください。

def format(tag, time, record)
  [tag, time, record].to_msgpack
end

def formatted_to_msgpack_binary
  true
end

def write(chunk)
  chunk.msgpack_each do |tag, time, record|
    # ...
  end
end

injectヘルパーを使う場合は #format(tag, time, record) を通すことでより見通しが良くなります。そのため、 #format を実装し、その中で inject_values_to_record(tag, time, record) を呼ぶようにしてください。

発展形

v0.14のOutputプラグインはオーバーライドするメソッドや実装するメソッドにより、confの設定により実行時に3種の異なる種別のOutputプラグインへ切り替えることができます。

non-bufferedとbufferedの切り替え

これは以下の優先順位で行われます:

  1. 実装メソッドによる分岐 (例: #process しか実装されていない → non-buffered)

  2. 両方実装されている場合で、かつ設定において <buffer> セクションが指定されている場合 → buffered

  3. 両方実装されており設定にセクションが指定されていない場合 → #prefer_buffered_processing を呼んで判定

buffered synchronous/asynchronousの切り替え

output#writeoutput#try_write を実装して #prefer_delayed_commit の返り値のtrue/falseでbuffered synchronousとbuffered asynchronousを切り替えられます。

  • true -> buffered asynchronous

  • false -> buffered synchronous

output#writeoutput#try_write のどちらか一方だけ実装している場合は、#prefer_delayed_commit は呼ばれません。

bufferedプラグインの注意点

#write, #try_write を実装していないOutputプラグインへのconfigには <buffer> ディレクティブが使用できません。

複合形
#prefer_delayed_commit #prefer_buffered_processing 結果
false false non-buffered
false true buffered synchronous
true true buffered asynchronous
true false 選択不可

secondaryの扱い

secondaryに指定されたプラグインはbufferingのサポートが必要です。out_fileなどのbufferingをサポートしたoutputプラグインを指定できます。

bufferディレクティブのCHUNK_KEYSアトリビュート

<buffer CHUNK_KEYS>のようにbufferディレクティブにはCHUNK_KEYSのアトリビュートの指定が可能です。 tag, timekey, variablesの指定ができるようになっています。これはこのアトリビュートによってチャンクをひとまとめにするためにあります。

  • tag →タグごとにチャンクがまとめられる

  • timekey →time formatごとにチャンクがまとめられる

  • variables →レコードの中のキーごとのチャンクがまとめられる

buffered outputプラグインのflushで用いられるthread

start時に <buffer> ディレクティブにある flush_thread_count で指定されている数のスレッドを作ります。#submit_flush_once は単にそれらのスレッドを明示的にアクティブにしているだけです。

v0.12のbuffered outputプラグインの自前スレッドの書き換え

プラグインが自前で作成していたスレッドは以下のようにできるはずです。

  • 定期的にある処理を行う必要があった場合 → timer plugin helper を使う

  • Fluent::Output プラグインを継承していたが(ある設定が有効なときのみ)バックグラウンドでflushするような処理を自前で書いていた → #process および #write 両方を実装して設定により挙動を切り替える

  • socketをlistenしていた → socket/server plugin helper を使う(これから実装される)

それ以外の場合は thread plugin helper を使います。自前で Thread.new するべきではありません。thread plugin helperを使う場合、plugin test driverがそのスレッドの状態管理などの面倒を見てくれるため、たまに失敗するテスト、などの危険性が大幅に低下します。

プレースホルダ

chunk.metadata が実際にどの値を有しているかは <buffer CHUNK_KEYS>CHUNK_KEYS に何をユーザが指定したか(あるいは config_set_default で何が指定されていたか)により異なります。 が、プラグイン作者が独自にチェックするべきではなく #configure 内で #placeholder_validate!("name_of_parameter", @name_of_parameter) を使うべきです。使われているプレースホルダと chunk key の間に不整合があれば configuration error が上がります。 (もっと細かい制御もやろうと思えばできますが、コーナーケースです。こちらの議論を参照してください。)

つまりプラグイン作者は #configure 内で #placeholder_validate! し、そこが通っているならあとは #writeextract_placeholder(@name_of_parameter, chunk.metadata) するだけでよいです。

${tag}

chunkに含まれるタグに展開されます。 また、tag1.tag2.tag3.... のようなタグとなっている場合、 ${tag[0]}, ${tag[1]}, ${tag[2]},...のようにタグの添え字を指定することで個別に取り出すことができます。

strftime形式(%Y%m%dなど)

strftimeのフォーマットに準じて展開されます。 variable_%Y-%m-%dT%H:%M:%S.%N のように用います。 これは variable_2015-12-25T12:34:56.123450000 のように展開されます。

まとめ

v0.14のOutputプラグインの仕様をFluentdの開発者の協力を仰ぎ1書き出してみました。v0.12のoutputプラグインと変わっている箇所も多く、単純にv0.14への移行は難しい箇所もあります。 v0.14のAPIを使うように移行するとプラグインヘルパーやプレースホルダーの機能により、より柔軟なconfの設定を書くことが可能になります。例えば、プレースホルダーの機能を使ったものとしては、fluent-plugin-mysql のテーブル名へのプレースホルダーを指定可能にする機能2 を実装したものがあります。このようにタグや日付ごとのデータ集計をサポートする機能を簡単に実装できるようになるというメリットがあるため、v0.14のAPIを使うように移行を試みてみるのはいかがでしょうか?

  1. この記事を書くに当たって @tagomoris さんのレビューの協力を仰ぎました。ありがとうございます。

  2. https://github.com/tagomoris/fluent-plugin-mysql#configuration-examplebulk-insert-with-tag-placeholder-for-table-namehttps://github.com/tagomoris/fluent-plugin-mysql#configuration-examplebulk-insert-with-time-format-placeholder-for-table-name を参照。