ククログ

株式会社クリアコード > ククログ > Fluentdにおける yajl-ruby から json への移行

Fluentdにおける yajl-ruby から json への移行

Fluentdでは、JSONデータのパースにおいて、長らく yajl-ruby gem に依存してきました。 しかし、よりモダンで標準的な環境への移行を目指し、標準ライブラリである json gem への置き換えを進めています。 Fluentd v1.19.0 リリース時に当時対応可能な箇所は移行しましたが、ストリーミングで送られてくるJSONデータのパースにおいて yajl-ruby gem の依存を消せずにいました。 しかし、最新の json gem にストリーミング処理を可能にする JSON::ResumableParser が導入されることで、ついに完全移行への道が開かれました。

本記事では、この移行の過程で直面した技術的な課題と、RubyのC拡張におけるディープなバグの発見、そしてアップストリームへの還元について解説します。

背景:なぜ yajl-ruby を置き換えるのか

Fluentdの in_forwardin_unix、各種 exec 系のプラグインでは、ストリームとして連続的に流れてくるJSONの断片を効率よくパースする必要があります。これまでは yajl-ruby がその役割を長らく担ってきました。

しかし、yajl-ruby が内部で使用している Ruby の C API はすでに deprecated となっており、今後の Ruby 4.1 ではついにその API 自体が完全に削除される予定です(ruby/ruby#15447)。yajl-ruby のアップストリームでも対応パッチ自体はマージされているものの、一向に新バージョンとしてリリースされない状態が続いています(brianmario/yajl-ruby#233)。

このような背景から、このまま yajl-ruby に依存し続けると、今後新しくリリースされる Ruby のバージョンで Fluentd を安定してメンテナンスし続けることが困難になるという強い危機感がありました。

幸いなことに、次期バージョンの標準ライブラリである json gem に、ストリーミングパースを可能にする JSON::ResumableParser が導入されます。今回はこの最新のパーサーを利用し、将来の Ruby バージョンでも安全に動作し続けるよう、Fluentd内部のパース処理を置き換える実験的な取り組みを行いました。

新しいストリーミングパーサーの使い方

Yajl::ParserJSON::ResumableParser では、パース結果を受け取るアーキテクチャが大きく異なります。

従来の yajl-ruby は、コールバック型でした。チャンクを流し込むと、JSONオブジェクトが完成したタイミングでブロックが発火します。

# これまでの yajl-ruby の使い方
yajl_parser = Yajl::Parser.new
yajl_parser.on_parse_complete = ->(record) {
  # JSONパース完了時の処理
}

# ストリームから受信したチャンクデータをparserに入力しパースさせる
yajl_parser << payload

一方、新しい JSON::ResumableParser は、チャンクを流し込んだ後、parse メソッドを呼んで自発的に値を取り出す必要があります。

# 新しい JSON::ResumableParser の使い方
json_parser = JSON::ResumableParser.new({})

# ストリームから受信したチャンクデータをparserに入力する
json_parser << payload

# parse が true を返す(=完全なJSONオブジェクトが抽出できる)間ループする
while json_parser.parse
  record = json_parser.value
  # JSONパース完了時の処理
end

このインターフェースの変更に伴い、Fluentd内部の実装を書き換えていく必要がありました。

移行の過程で立ちはだかった技術的課題

単にクラス名を差し替えるだけで終わらないのが、非同期I/Oやプロセス間通信(IPC)を多用するFluentdの難しいところです。移行作業では、主に3つの壁にぶつかりました。

1. ストリーム読み込みにおけるデッドロックの回避

out_exec_filter のような外部プロセスとパイプで対話するプラグインにおいて、単純に io.read を使ってパースしようとすると、バッファが埋まるかEOFが来るまで処理がブロックされ、デッドロック(タイムアウト)に陥る問題が発生しました。

これは、非同期ストリーム処理の鉄則に従い、データが1バイトでも到達したらすぐに読み込んで返す io.readpartial を使用し、ストリームの終了を EOFError で適切に捕捉するように実装を修正することで解決しました。

2. チャンク境界をまたぐパース時のステート破壊

検証を進めると、ストリーミング特有のバグを発見しました。

JSONの文字列の途中でチャンクが千切れた状態のままパーサーが中断し、その後に次のチャンクが追加されてバッファが再確保された際、C拡張内部で保持していたポインタやオフセットが壊れてしまうというものです。 これにより、パース自体は成功しているように見えて、JSON::ParserError が遅れて顕在化するという非常に厄介な挙動を引き起こしていました。

アップストリームへ報告し、即座に修正していただくことが出来ました。

3. ネットワークバッファとRubyの「Shared String(共有文字列)」の不具合

さらに検証を進めていると、もうひとつ問題に遭遇しました。in_forward などでTCP経由で大量のJSONを受信した際に発生しました。

Fluentdのネットワーク層は、巨大な受信バッファから部分文字列を切り出してパーサーに渡します。Rubyはこの時、メモリを節約するために独立したコピーを作らず、元の巨大なバッファへのポインタを持つ「Shared String(共有文字列)」を生成します。

今回導入を試みた JSON::ResumableParser のC拡張側では、バッファのインプレースなメモリの詰め直しを行う際、この共有状態を解除せずに文字列の長さを変更しようとしてしまっていました。これにより、RubyのVMが共有メモリの保護違反として例外を引き起こす状態になっていました。

そこで、こちらも問題を再現できる最小のスクリプトを用意しアップストリームへ報告しました。

Fluentdのような高負荷なインフラミドルウェアでの負荷テストにより、Ruby標準ライブラリの重大なエッジケースをリリース前に発見・修正することに貢献できました。

さいごに

yajl-ruby から json gem への移行作業はまだ検証段階ですが、修正されたパーサーを用いたCPU負荷やメモリ使用量についてもクリアし、確かな手応えを掴んでいます。

クリアコードはFluentdのサポートサービスやプラグイン開発を行っています。 td-agentやFluent Packageのアップデート、あるいはRubyのC拡張に絡むようなディープなトラブルシューティングについてもサポートいたしますので、詳しくはFluentdのサポートサービスをご覧いただき、お問い合わせフォームよりお気軽にお問い合わせください。

日本コミュニティ向けのXアカウントでは、日々、Fluentdに関する情報を発信しております。 ぜひ @fluentd_jp をフォローしてください!