ククログ

株式会社クリアコード > ククログ > FluentdのInputプラグインでProtocol Buffersを扱う

FluentdのInputプラグインでProtocol Buffersを扱う

はじめに

Fluentdの周辺のIssueチケットを見ていたらProtocol BuffersをFluentdのInputプラグインで扱えると面白そう1ということで、対応を考えてみた畑ケです。

クリアコードはFluentdの開発に参加しています。

Fluentdにはプラグインというしくみがあり、たくさんのプラグインが開発されています。 Fluentdのプラグインには、Parserプラグインという種類のプラグインがあります。 この種類のプラグインはInputプラグインが受け取ったテキストやバイナリーデータをFluentdで扱いやすくするために使用されます。

Protocol Buffersとは

Protocol Buffersとは、プログラミング言語間の差異を吸収してデータのやり取りを行うデータ形式です。 データ構造を定義して、それをいくつかの言語のProtocol Buffersを扱える表現にコンパイルすることでProtocol Buffersの形式にシリアライズしたり、デシリアライズできます。

例えば、Goで書いたプログラムでProtocol Buffers形式でシリアライズされたデータをC++で書いたプログラムからProtocol Buffersのデータ定義を用いて元のデータを復元できます。

FluentdにProtocol Buffersを組み込む

Fluentdにはin_tcpin_httpのような<parse>ディレクティブを処理できるInputプラグインがあります。InputプラグインはParserプラグインとしてテキストやバイナリーのパースの処理をプラグインとして持ってこれるものがあります。この任意のパース処理を差し替えることができるようにする仕組みがParserプラグインです。

まとめると、FluentdでProtocol Buffersを処理するのに最適な場所はParserです。

そこで、InputプラグインでProtocol Buffersでシリアライズされたバイナリデータを処理できるようにProtocol Buffersを処理するParserプラグインを作成しました。

例えば、このParserプラグインを使ったin_httpプラグインを使ったHTTPエンドポイントを立てると、 FluentdがProtocol BuffersのAPIのHTTPエンドポイントとして振る舞うことができ、色んなシステムと直接つなげることができます。

fluent-plugin-parser-protobufの使い方

fluent-plugin-parser-protobufはProtocol Buffersのコンパイラーが必要です。この記事ではProtocol Buffers v3の場合について解説します。

Protocol Buffers v3のコンパイラーや各言語ごとのライブラリーは https://github.com/protocolbuffers/protobuf/releases からダウンロードできます。 Protocol Buffers v3をFluentdで使う手順では、Protocol Buffersのコンパイラーであるprotocがインストール済みであると仮定して解説をします。

Protocol BuffersのIDLの文法はProtocol Buffersの公式ドキュメントの概要を参照してください。

例えば、下記のようなprotobufのIDLを作成します。

simple.proto
syntax = "proto3";
import "google/protobuf/timestamp.proto";

message SearchRequest {
  string query = 1;
  int32 page_number = 2;
  int32 result_per_page = 3;
  enum Corpus {
    UNIVERSAL = 0;
    WEB = 1;
    IMAGES = 2;
    LOCAL = 3;
    NEWS = 4;
    PRODUCTS = 5;
    VIDEO = 6;
  }
  Corpus corpus = 4;
  google.protobuf.Timestamp timestamp = 5;
}

これを、protocでコンパイルします。

$ protoc --proto_path=/path/to/idl --ruby_out=/path/to/output simple.proto

すると、下記のRubyのクラスが生成されます。

simple_pb.rb
# Generated by the protocol buffer compiler.  DO NOT EDIT!
# source: simple.proto

require 'google/protobuf'

require 'google/protobuf/timestamp_pb'
Google::Protobuf::DescriptorPool.generated_pool.build do
  add_file("simple.proto", :syntax => :proto3) do
    add_message "SearchRequest" do
      optional :query, :string, 1
      optional :page_number, :int32, 2
      optional :result_per_page, :int32, 3
      optional :corpus, :enum, 4, "SearchRequest.Corpus"
      optional :timestamp, :message, 5, "google.protobuf.Timestamp"
    end
    add_enum "SearchRequest.Corpus" do
      value :UNIVERSAL, 0
      value :WEB, 1
      value :IMAGES, 2
      value :LOCAL, 3
      value :NEWS, 4
      value :PRODUCTS, 5
      value :VIDEO, 6
    end
  end
end

SearchRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("SearchRequest").msgclass
SearchRequest::Corpus = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("SearchRequest.Corpus").enummodule

このprotobufクラスがあれば、fluent-plugin-parser-protobufにProtocol Buffersの定義を読み込ませることができます。 <parse>ディレクティブの中にprotobufパーサーに関連する設定を書きます。

fluent.conf
<source>
  @type http
  port 8080
  <parse>
    @type protobuf
    class_name SearchRequest
    class_file "#{File.expand_path(File.join('path', 'to', 'simple_pb.rb'))}"
    protobuf_version protobuf3
  </parse>
</source>
<match protobuf>
  @type stdout
</match>

疎通確認テストとしてHTTPでProtocol Buffersでシリアライズしたリクエストを送るようにします。 そのため、以下のRubyスクリプトを用意します。

test_http.rb
require 'google/protobuf'
require "net/http"

require_relative "path/to/simple_pb"

def encoded_simple_binary
  request = SearchRequest.new(query: "q=Fluentd",
                              page_number: 404,
                              result_per_page: 10,
                              corpus: :WEB,
                              timestamp: Time.now)
  SearchRequest.encode(request)
end

uri = URI.parse("http://localhost:8080/protobuf")
params = encoded_simple_binary
req = Net::HTTP.new(uri.host, uri.port)
req.post(uri.path, params.to_s)
実行結果

Fluentdとテストスクリプトはそれぞれ、別の端末で実行します。

$ bundle exec ruby test_http.rb
$ bundle exec fluentd -c fluent.conf -p lib/fluent/plugin
<snip>
2020-06-01 16:45:43 +0900 [info]: #0 fluentd worker is now running worker=0
2020-06-01 16:45:46.377515000 +0900 protobuf: {"query":"q=Fluentd","page_number":404,"result_per_page":10,"corpus":"WEB","timestamp":{"seconds":1590997546,"nanos":371940000}}

最後の2020-06-01 16:45:46.377515000 +0900 protobuf: {"query":"q=Fluentd","page_number":404,"result_per_page":10,"corpus":"WEB","timestamp":{"seconds":1590997546,"nanos":371940000}} により、Protocol BuffersでシリアライズされていたHTTPリクエストbodyがfluent-plugin-parser-protobufによりパースされ、Hashオブジェクトに分解されているのが確認できます。

まとめ

FluentdでProtocol Buffersを扱うにはどのようにしたら良いのかの方針を立て、実際にProtocol Buffersをパースするための手順を解説しました。 記事で解説した方法でFluentdがProtocol BuffersのAPIのHTTPエンドポイントとして振る舞うことができればより色んなシステムと直接つなげることができます。 今回のParserプラグインでは取り込むだけですが、FluentdのProtocol BuffersのFormatterプラグインを作成するとProtocol Buffersを用いてデータのやり取りを行うシステムに直接データを送ることができるようになるでしょう。

当社では、お客さまからの技術的なご質問・ご依頼に有償にて対応するFluentdサポートサービスを提供しています。Fluentd/Fluent Bitをエンタープライズ環境において導入/運用されるSIer様、サービス提供事業者様は、お問い合わせフォームよりお問い合わせください。

  1. https://github.com/fluent/fluentd/issues/3000