Fluentd v0.14のプラグインヘルパーの使い方 - 2016-08-30 - ククログ

ククログ

株式会社クリアコード > ククログ > Fluentd v0.14のプラグインヘルパーの使い方

Fluentd v0.14のプラグインヘルパーの使い方

はじめに

クリアコードではOSS開発支援サービスの一環でTreasureDataさんが中心になって開発し公開しているFluentdとそのプラグインなど1の開発を支援しています。 Fluentd v0.14からはプラグインでよく使われる定型の処理をより簡易に扱えるようにするプラグインヘルパーが実装されました。 この記事ではv0.14の執筆時点のプラグインヘルパーの使用方法の概説を行います。また、プラグインヘルパーを解説した各節の最後には実際の使われている箇所の一例を示します。

プラグインの使い方の概要

プラグインヘルパーは以下のようにhelpersに可変長引数で指定します。

helpers :plugin_name1, :plugin_name2, ..., plugin_nameN

また、helpersは複数回呼ぶこともできます。プラグイン毎にhelpersで使いたいプラグインヘルパーを可変長引数で指定するか複数回指定するかのポリシーを決めておくとよいでしょう。

child_process

プラグインから子プロセスを起動する定型の処理をカプセル化し、より簡易に扱えるようにするためのプラグインヘルパーです。

helpers :child_process
# ...
def start
  super
  child_process_execute(title, command[, ...]) do |io|
  end
  # ...
end
# ...
end

のように使用します。一定間隔で子プロセスを起動するには

config_param :command, :string
config_param :run_interval, :time
# ...
def start
  super
  child_process_execute(:example_child_process, @command, interval: @run_interval, mode: [:read]) do |io|
  end
  # ...
end
# ...
end

のようにします。一回だけ起動する場合は

config_param :command, :string
config_param :run_interval, :time
# ...
def start
  super
  child_process_execute(:example_child_process, @command, immediate: true, mode: [:read]) do |io|
  end
  # ...
end
# ...
end

のように使用します。このプラグインヘルパーは例えばin_execプラグインで使用されています。

child_processプラグインヘルパーの注意点

第一引数はシンボルである必要があります。また、この引数で渡されたシンボルはグローバルになるのでプラグイン名_使用用途としておくことが推奨されています。

compat_parameters

v0.12のconfigのflat styleからv0.14のstructured styleへ自動的にマッピングさせるためのプラグインヘルパーです。

v0.12のconfigではbufferのconfigは以下のような形式でした。

<match **>
  @type file
  path /path/to/file/*.log
  buffer_type file
  buffer_path /path/to/buffer/*.log
  time_sliced_format %Y%m%d%H # sliced per hour
  buffer_chunk_limit 16m
</match>

v0.14のconfigでは以下のような形式で書く必要があります。

<match **>
  @type file
  path /path/to/file/*.log
  <buffer>
     @type file
     path /path/to/buffer/*.log
     time_key 60m # sliced per hour
     chunk_size_limit 16m
  </buffer>
</match>

compat_parametersプラグインヘルパーを使用する事でflat styleからstructured styleへ自動的にマッピングできます。 以下、その概要です。

helpers :compat_parameters
# ...

def configure(conf)
  # ...
  compat_parameters_convert(conf, :buffer)
  super
  # ...
end

のように使用します。また、このプラグインヘルパーは後述するformatterやinject、parserプラグインと一緒に使用することがあります。

その場合は、

helpers :inject, :formatter, :compat_parameters
# ...
def configure(conf)
  # ...
  compat_parameters_convert(conf, :inject, :formatter)
  # ...
end

のように、conpat_parametersを通して変換したいプラグインのtypeを指定します。 例えばfilter_stdoutプラグインがこのプラグインヘルパーを使用しています。

event_emitter

v0.14のFluentdの Fluent::Plugin::Output クラスはデフォルトで emit が使用できなくなりました。 v0.14のOutputプラグインでは Engine.emit を直接呼べなくなるので router.emit を行いたい場合はこのプラグインヘルパーを使う必要があります。

使い方は

helpers :event_emitter

とするのみです。 v0.12のOutputプラグインの時と同じように router.emit を呼べるようになります。 このプラグインヘルパーは例えばout_relabelプラグインヘルパーで使用されています。

event_loop

event_loopヘルパーは Coolio::Loop クラスをカプセル化し、より手軽に扱えるようにしたプラグインヘルパーです。

helpers :event_loop
# ...
def start
  # ...
  @handler = ...
  event_loop_attach(@handler)
  # ...
end

のようにして使います。 @handler の中身は Fluent::SocketUtil::UdpHandlerCoolio::TCPServer などのインスタンスを入れることになります。 このプラグインヘルパーは後述のtimerプラグインヘルパーの内部で使われているため、暗黙的に使われていることが多いです。 in_syslogプラグインは明示的にこのプラグインヘルパーを使っています。

formatter

formatterを使うにはv0.12では Fluent::Plugin.new_formatter を呼び、Formatterのインスタンスを作成する一連の処理を書く必要がありました。 この処理ををカプセル化し、より手軽にformatterをプラグイン中で使用するためのプラグインヘルパーです。

このプラグインヘルパーは

helpers :formatter
DEFAULT_FORMAT_TYPE = 'out_file'
# ...
def configure(conf)
  # ...
  super
  @formatter = formatter_create(usage: 'example_format', conf: conf.elements('format').first, default_type: DEFAULT_FORMAT_TYPE)
  # ...
end

def format(tag, time, record)
  # ...
  @formatter.format(tag, time, record)
  # ...
end

のようにして使います。

また、実際のFluentdのプラグインでv0.12形式のconfigも扱う必要がある場合はcompat_parametersプラグインと一緒に以下のようにして使います。

helpers :compat_parameters, :formatter
DEFAULT_FORMAT_TYPE = 'out_file'
# ...
def configure(conf)
  compat_parameters_convert(conf, :formatter)
  super
  @formatter = formatter_create(usage: 'example_format', conf: conf.elements('format').first, default_type: DEFAULT_FORMAT_TYPE)
  # ...
end

def format(tag, time, record)
  # ...
  @formatter.format(tag, time, record)
  # ...
end

formatterプラグインヘルパーは多くのプラグインで使用されています。compat_parametersプラグインヘルパーと組み合わせて使っている例として、filter_stdoutプラグインがあります。 また、Fluent::Plugin::Output クラスを継承している場合、formatter#format はプラグイン中の #format を実装している場合、このメソッドがバッファを書き出す前に呼ばれるようになります。 #format にレコードのフォーマット操作のみの役割を持たせる場合、テストもしやすくなるのでおすすめです。

formatterプラグインヘルパーの注意点

第一引数は文字列である必要があります。また、この引数で渡された文字列はグローバルになるのでプラグイン名_使用用途としておくことが推奨されています。

inject

injectプラグインヘルパーはホスト名、タグ、時刻をレコードへ挿入するためのプラグインヘルパーです。 このプラグインヘルパーは

helpers :inject
# ...
def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)
  # ...
end

のようにして使用します。基本的にrecordにアクセス出来る箇所であればどこでも使えますが、テストコードの関係上#format メソッドの中で利用するのが良いでしょう。

このプラグインヘルパーはv0.12の SetTagKeyMixinSetTimeKeyMixinhostname プレースホルダーの置き換えを狙ったものです。 このプラグインヘルパーは例えば、out_stdoutプラグインで使用されています。 3rdパーティ製のプラグインでもこのプラグインヘルパーの使いどころはかなりあるはずなので、使えそうな箇所を探してみてください。

parser

formatterを使うにはv0.12では Fluent::Plugin.new_parser を呼び、Parserのインスタンスを作成する一連の処理を書く必要がありました。 この処理ををカプセル化し、より手軽にParserをプラグイン中で使用するためのプラグインヘルパーです。

このプラグインヘルパーは

helpers :parser
DEFAULT_PARSER_TYPE = 'syslog'
# ...
def configure(conf)
  # ...
  super
  @parser = parser_create(usage: 'example_parse', type: DEFAULT_PARSER_TYPE, conf: conf)
  # ...
end

def do_something(text)
  # ...
  @parser.parse(text) do {|time, record|
     # ...
  }
  # ...
end

のようにして使われます。 このプラグインヘルパーは使われ方が若干特殊ですがin_syslogプラグインで使われています。

parserプラグインヘルパーの注意点

第一引数は文字列である必要があります。また、この引数で渡された文字列はグローバルになるのでプラグイン名_使用用途としておくことが推奨されています。

retry_state

retry_stateプラグインヘルパーはOutputプラグイン等でリトライ処理を行う際の決まったコードをカプセル化し、より簡易に利用出来るようにする機能を提供します。 このプラグインヘルパーはOutputプラグインのリトライ処理を切り出したもので、組み込み以外のプラグインでも組み込みのプラグインと同じようなリトライのロジックをより簡易に扱えるようになります。

helpers :retry_state
# ...
config_section :buffer, param_name: :buffer_config, init: true, required: false, multi: false, final: true do
  config_param :retry_timeout, :time, default: 72 * 60 * 60
  config_param :retry_type, :enum, list: [:exponential_backoff, :periodic], default: :exponential_backoff
  config_param :retry_wait, :time, default: 1
end
# ...
def start
  super
  @retry_state = retry_state_create(
    :example_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout
  )
  # ...
end

のようにして使います。このプラグインヘルパーはOutputプラグインの継承元クラスを実装しているoutput.rbで使用されています。

storage

v0.14ではプラグインの状態をKey-Value形式で保存するのに用いるStorageプラグインが新たに導入されました。 storageプラグインヘルパーはv0.14で新たに導入されたこのプラグインを使用する際の決まったコードをカプセル化し、より簡易に利用出来るようにする機能を提供します。

このプラグインヘルパーは

helpers :storage

DEFAULT_STORAGE_TYPE = 'local'
#...
def configure(conf)
  super
  @storage = storage_create(usage: 'example_storing_value', conf: config, default_type: DEFAULT_STORAGE_TYPE)
end

def start
  super
  @storage.put(:example_value, 0) unless @storage.get(:example_value)
  # ...
end

def do_something
  @sutorage.update(:example_value){|v| v + 1 }
end

のようにして使用します。 このプラグインヘルパーはin_dummyプラグインのsuspendオプションの実装に使用しました。

storageプラグインヘルパーの注意点

第一引数は文字列である必要があります。また、この引数で渡された文字列はグローバルになるのでプラグイン名_使用用途としておくことが推奨されています。

thread

threadプラグインヘルパーはプラグインで新たにThreadを立てる必要がある際の決まったコードをカプセル化し、より簡易に利用できるようにする機能を提供します。

このプラグインヘルパーは

helpers :thread
# ...
def start(conf)
  super
  thread_create(:example_usage, &method(:run))
  # ...
end

def run
  # executed on other thread
end

# And thread lifecycle is managed by thread plugin helper automatically.

のようにして利用します。 thread_create のブロックは必須です。このブロックが別スレッドで実行されます。 このプラグインはevent_loopプラグインヘルパーの中で使用され、また、timerプラグインがevent_loopプラグインヘルパーに依存しているため、知らず知らずの内に使っている事が多いです。 明示的に使用している例としてはin_dummyプラグインがあります。

threadプラグインヘルパーの注意点

第一引数はシンボルである必要があります。また、この引数で渡されたシンボルはグローバルになるのでプラグイン名_使用用途としておくことが推奨されています。

timer

timerプラグインヘルパーはプラグインで新たに高精度なタイマーを実装する必要がある際の決まったコードをカプセル化し、より簡易に利用できるようにする機能を提供します。

このプラグインヘルパーは

helpers :timer
# ...
config_param :emit_interval, :time, default: 60
#...
def start(conf)
  super
  timer_execute(:example_timer, @emit_interval, &method(:on_timer))
  # ...
end

def on_timer
  # periodically execution block by timer
end

# And timer lifecycle is managed by timer plugin helper automatically.

のようにして利用します。 timer_execute のブロックは必須です。このブロックが別スレッドで実行されます。

また、一回のみの実行で良い場合は repeat: false をtimer_executeに渡します。

timer_execute(:example_timer, @emit_interval, repeat: false) do
  # one-shot timer execution block
end

in_gc_statプラグインではこのプラグインヘルパーを使っています。

timerプラグインヘルパーの注意点

第一引数はシンボルである必要があります。また、この引数で渡されたシンボルはグローバルになるのでプラグイン名_使用用途としておくことが推奨されています。

まとめ

執筆時点でのプラグインヘルパーの使い方と使われている箇所について解説しました。 v0.14ではプラグインでよく使われる定型の処理についてより簡易に扱えるようにプラグインヘルパーが実装されました。 プラグインでより高度なことをするのにためらっていたプラグイン開発者はv0.14向けのプラグインではこれらのプラグインヘルパーを使い、よりよいプラグインを目指されてみてはいかがでしょうか?

  1. fluent-plugin-xxxやfluent-logger-xxxが多い