株式会社クリアコード > ククログ

ククログ


PLAZMA OSS Day: TD Tech Talk 2018 – Cooperative works for Fluentd Community

PLAZMA OSS Day: TD Tech Talk 2018 にて登壇しました。 fluent-plugin-kafkaのスループット問題を解消するために開発したkafka-connect-fluentdの紹介とfluent-plugins-nurseryに代表されるFluentdのプラグインを引き取り、メンテナンスしている事例の概要を紹介しました。

内容

  • kafka-connect-fluentdについての現状とベンチマーク
  • Fluentdのプラグイン周りのメンテナンスの実績

を紹介しました。

スライドにもありますが、Fluentdのプラグインの作者によるメンテナンスが滞っているプラグインがよく見かけられます。そのため、よく使われているプラグインのうち、作者がメンテナンスをする時間が取れないものに関しては引き取ることにしました。その引き取り場所として、fluent-plugins-nursery という organization を作成しました。

まとめ

PLAZMA OSS Day: TD Tech Talk 2018で先日リリースしたkafka-connect-fluentdの紹介と、Fluentd周りを良くして行く活動の実績を紹介しました。

タグ: Fluentd
2018-02-16

Firefox ESR60以降でのPolicy Engineによるポリシー定義の手順

Firefox ESR60以降のバージョンでは、Policy Engineと呼ばれる新しいポリシー設定の仕組みが導入されます。近いうちに公式なドキュメントが用意されるものと予想されますが、Firefoxを法人で利用中の場合、一日も早く実際の動作を検証したいというニーズもある事でしょう。この記事では、現時点で実装されているPolicy Engineの具体的な使用手順を解説します。

準備

Policy EngineはNightly 60で既に使用可能な状態になっています。Firefox開発版のダウンロードページから最新の開発版であるNightlyをダウンロードし、インストールしておきます。

次に、Nightlyの起動用ショートカットを編集します。Firefoxは通常版と開発版(Nightly)でプロファイルの内容に互換性が無い場合があり、普段使いのFirefoxのプロファイルでNightlyを起動してしまうと、不可逆的な移行処理が行われてしまって、以後通常版のFirefoxでそのプロファイルを使用できなくなる可能性があります。Firefoxのショートカットのプロパティを開き、「リンク先」欄の末尾にC:\Program Files\Nightly\firefox.exe -profile %temp%\NightlyProfile -no-remoteのように起動プロファイルを明示しておく事で、普段使いのプロファイルとは別の専用プロファイルでNightlyを起動できるようになります。

Nightlyの準備が終わったら、Policy Engineの設定の準備です。

Policy Engine用のポリシー設定は、Windowsの場合はレジストリ経由(グループポリシーオブジェクトでの設定)とJSONファイル経由での設定の2通りの方法があります。 簡単のため、ここではJSONファイルを使う方法のみ解説します。

ポリシー設定のためのJSONファイルは、Firefoxのインストール先フォルダ配下にdistributionという名前でフォルダを作成し、さらにその中にpolicies.jsonという名前で以下の内容のテキストファイル(JSON形式)を設置します。

{
  "policies": {
  }
}

以上で準備は完了です。

ポリシー設定の記述の仕方

JSONファイルでのポリシー設定はpolicies.jsonpoliciesのプロパティとして記述します。現時点で使用できるポリシー設定にどのような物かがあるかはpolicies.jsonのスキーマ定義に列挙されています。

例えば、about:configで設定を変更される事とFirefoxの自動更新のそれぞれを禁止したい場合、policies.jsonは以下のように記述します。

{
  "policies": {
    "BlockAboutConfig": true,
    "DisableAppUpdate": true
  }
}

2018年3月9日現在、以下のポリシー設定が存在します。これらは仕様が変更または削除される可能性がある事、あるいは新しいポリシー設定が今後追加される可能性がある事に注意して下さい。

  • "BlockAboutAddons": trueabout:addons(アドオンマネージャ)の使用を禁止する。間接的に、アドオンのインストールを禁止する効果がある。
  • "BlockAboutConfig": trueabout:configの使用を禁止する。同時に、副作用として"DisableDeveloperTools": trueの効果も反映される。
  • "BlockAboutProfiles": trueabout:profilesの使用を禁止する。
  • "BlockAboutSupport": trueabout:supportの使用を禁止する。
  • "BlockSetDesktopBackground": true :画像をコンテキストメニューからデスクトップの壁紙に設定する機能の使用を禁止する。
  • "Bookmarks": [{"Title": "...", "URL": "...", "Favicon": "...", "Placement": "toolbar", "Folder": true/false }, ...] :ブックマークツールバーに既定のブックマーク項目を追加する。
  • "Bookmarks": [{"Title": "...", "URL": "...", "Favicon": "...", "Placement": "menu", "Folder": true/false }, ...] :ブックマークメニューに既定のブックマーク項目を追加する。
  • "Cookies": { "Allow": ["http://example.com", "https://example.org:8080"] } :指定のWebサイト(オリジンで指定)でCookie、IndexedDB、Web Storage、およびService Worker用Cacheを保存する(任意に無効化はできない)。
  • "Cookies": { "Block": ["http://example.com", "https://example.org:8080"] } :指定のWebサイト(オリジンで指定)でCookie、IndexedDB、Web Storage、およびService Worker用Cacheを保存しない(任意に有効化はできない)。また、これらのホストに保存済みのCookieがあった場合、それらは削除される。
  • "CreateMasterPassword": false : マスターパスワードを設定する事を禁止する。
  • "DisableAppUpdate": true :Firefoxの自動更新を停止する。
  • "DisableDeveloperTools": true :開発ツールの使用を禁止する。
  • "DisableFirefoxAccounts": true :Firefoxアカウントの使用を禁止する(ひいては、Firefox Syncの使用も禁止される)。
  • "DisableFirefoxScreenshots": true :Firefox Screenshotsの使用を禁止する。
  • "DisableFirefoxStudies": trueFirefoxの新機能のテストへの参加を禁止する。
  • "DisableFormHistory": true :フォームの入力履歴の保存とオートコンプリートを禁止する。
  • "DisablePocket": true :Pocketの使用を禁止する。
  • "DisablePrivateBrowsing": true :プライベートブラウジング機能の使用を禁止する。
  • "DisableSysAddonUpdate": true :システムアドオンの更新を禁止する。
  • "DisplayBookmarksToolbar": true :初期状態でブックマークツールバーを表示する。(ただしこの設定は強制でなく、ユーザーが任意に非表示にする事もでき、非表示にした場合は次回以降の起動時も非表示のままとなる。)
  • "DisplayMenuBar": true :初期状態でメニューバーを表示する。(ただしこの設定は強制でなく、ユーザーが任意に非表示にする事もでき、非表示にした場合は次回以降の起動時も非表示のままとなる。)
  • "DontCheckDefaultBrowser": true :起動時に既定のブラウザにするかどうかを確認しない。
  • "FlashPlugin": { "Allow": ["http://example.com", "https://example.org:8080"] } :指定のWebサイト(オリジンで指定)でAdobe FlashをClick to Play無しで自動実行する(任意に無効化はできない)。
  • "FlashPlugin": { "Block": ["http://example.com", "https://example.org:8080"] } :指定のWebサイト(オリジンで指定)でAdobe Flashの実行を禁止する(任意に有効化はできない)。
  • "Homepage": { "URL": "http://example.com", "Locked": true/false, "Additional": ["https://example.org:8080", ...] } :既定のホームページを設定する。"Locked"trueの場合はホームページを固定する。また、"Additional"を指定した場合は2番目以降のホームページとして設定する。
  • "InstallAddons": { "Allow": ["https://example.com", "https://example.org:8080"] } :指定のWebサイト(オリジンで指定)でアドオンのインストール時に警告しない(httpsのみ指定可能)。
  • "Popups": { "Allow": ["http://example.com", "https://example.org:8080"] } :指定のWebサイト(オリジンで指定)でwindow.open()によるポップアップを常に自動的に開く(任意に無効化はできない)。
  • "RememberPasswords": true/false :パスワードマネージャの使用または使用禁止を強制する。

ポリシー設定はFirefoxの起動時に読み込まれます。Firefoxの動作中に変更したポリシー設定は、次回のFirefox起動時から反映されます。

まとめ

以上、Firefox ESR60から使用可能になるPolicy Engineによるポリシー設定の手順を解説しました。

従来Firefoxでは、AutoConfigの一般的な設定でできないカスタマイズが必要な場合、アドオンやAutoConfig内に埋め込まれたスクリプトによってそれらを強引に実施するという方法を取る事ができました。ESR60以降のバージョンではXULアドオンが廃止され、またAutoConfigのスクリプト内での特権が必要なコードの実行が禁止される見込みであることから、それらの方法は取れなくなります。Policy Engineで実現可能な部分はPolicy Engineで設定するように、今のうちに備えておくようにしましょう。

タグ: Mozilla
2018-02-19

Node-REDをYoctoレシピによりインストールする

2017年7月6日の記事で紹介した通り、クリアコードは組み込みLinux向けにMozilla Firefoxを移植するプロジェクトGecko EmbeddedWebDINO Japan(旧Mozilla Japan)様と共同で立ち上げ、開発を進めております。Yoctoを使用してFirefoxをビルドしたりハードウェアクセラレーションを有効化する際のノウハウを蓄積して公開することで、同じ問題に悩む開発者の助けになることを目指しています。

この記事では、Gecko Embedded本体ではなく周辺のミドルウェアのNode-REDをRZ/G1で動かす話を書きます。 Node-REDは、ハードウェアデバイス/APIおよびオンラインサービスを接続するためのツールです。

現在のステータス

ターゲットボード

2月時点では、Node-REDのビルド及び動作は以下のボードで確認しています。

ビルド方法

レシピはGitHubにて公開されているものを使用します。

Yoctoに組み込むには、meta-nodejs-contribを git clone したのち、(bitbakeのビルドディレクトリ)/conf/local.confへ

IMAGE_INSTALL_append = " node-red nodejs nodejs-npm "
PREFERRED_VERSION_nodejs = "6.11.2"
PREFERRED_VERSION_nodejs-native = "6.11.2"

(bitbakeのビルドディレクトリ)/conf/bblayers.confへ

BBLAYEYS += " ${TOPDIR}/../meta-nodejs "
BBLAYEYS += " ${TOPDIR}/../meta-nodejs-contrib "

をそれぞれ追加し、bitbakeを実行します。 ここで、RZ/G1向けのYoctoではNode.js 6.11.2の動作確認が取れているため、 PREFERRED_VERSION_nodejs には 6.11.2 を指定しています。

動作確認

Node-REDを上記の設定でビルドした場合、node-red コマンドが起動イメージにインストールされます。

$ node-red
21 Feb 04:53:01 - [info] 

Welcome to Node-RED
===================

21 Feb 04:53:01 - [info] Node-RED version: v0.17.5
21 Feb 04:53:01 - [info] Node.js  version: v6.11.2
21 Feb 04:53:01 - [info] Linux 4.4.55-cip3 arm LE
21 Feb 04:53:07 - [info] Loading palette nodes
21 Feb 04:53:12 - [warn] ------------------------------------------------------
21 Feb 04:53:12 - [warn] [rpi-gpio] Info : Ignoring Raspberry Pi specific node
21 Feb 04:53:12 - [warn] ------------------------------------------------------
21 Feb 04:53:12 - [info] Settings file  : /home/root/.node-red/settings.js
21 Feb 04:53:12 - [info] User directory : /home/root/.node-red
21 Feb 04:53:12 - [info] Flows file     : /home/root/.node-red/flows_iwg20m.json
21 Feb 04:53:12 - [info] Creating new flow file
21 Feb 04:53:12 - [info] Starting flows
21 Feb 04:53:12 - [info] Started flows
21 Feb 04:53:12 - [info] Server now running at http://127.0.0.1:1880/

となれば動作確認は完了です。 Node-REDの使い方に関してはNode-REDの公式サイトのドキュメントを参照してください。

まとめ

Node-REDのYoctoレシピを用いてRZ/Gシリーズのボードに載せた話を紹介しました。

2018-02-22

Rubyとクリアコード #ruby25th

これはRuby25周年へのメッセージです。

クリアコードの社長の須藤です。そんなにコミットしていないのであんまり自分から言わないのですが、Rubyのコミット権を持っています。

同い年のRubyコミッター8人の中では一番最初にコミット権をもらいました。2004年の1月のことなので、なんともう14年前!当時は大学生だったのですが、すごくドキドキしたことを覚えています。私がコミット権をもらったのは自分が作っているライブラリーがRuby本体に取り込まれたからなんですが、会ったこともない人がしっくりくると推薦してくれたことがうれしかったです。Rubyが使いやすいなぁと思って使っていたので、他のRubyistからしっくりくると思ってもらえる使い勝手のライブラリーを作れているのがわかったのが嬉しかったんでしょうねぇ。

クリアコードが始まったのは私が社会人になった2006年の7月で、私は(たしか)9月からクリアコードに合流しました。私の社会人歴のほぼすべてはクリアコードでのものです。

クリアコードはフリーソフトウェアを推進するのが一番大事な会社であって、Rubyを応援するのが一番大事な会社ではないので、クリアコードのメンバーみんながRubyistというわけではありません。クリアコードを始めた当時、Rubyistは私だけでした。

私はフリーソフトウェアも推進したいしRubyも応援したかったので、なにかしらRubyを活かせる場所をみつけてRubyを活用していました。たとえば、独立行政法人情報処理推進機構(IPA)平成20年度オープンソフトウェア利用促進事業(リンク切れ)が「迷惑メール対策でなにか」みたいなテーマで募集していたときは、「Rubyを組み込んだ迷惑メール対策システム」を応募しました。それに採択されてお金をもらって開発したのがmilter managerというフリーソフトウェアです。「Rubyを組み込むと動的にいろいろできて捗るよ!」というようにRubyを活かす場所を考えました。

milter managerの開発を始めたのが10年前なのですが、実は、milter managerきっかけでいくつかRubyをよくしたことがあります。1つがメモリーリークの修正で、もう1つが拡張ライブラリーのメモリー使用量を抑えやすくするAPIの追加です。

前者は再現スクリプトを作るのに数週間とか使った気がする(もちろん業務時間内でやっていた)ので、なかなか大変だったなぁという記憶があります。作業していたのは私ではないですが。

後者はmilter managerを開発し始めてから8年後の東京Ruby会議11での発表がきっかけで話が進みました。なにがつながるかわからないものですね。

直接お金を稼いでいるわけではないですが、仕事ですごくRubyを活用している例があります。それはRabbitという私がRubyを使って開発しているプレゼンテーションツールです。

クリアコードは「お客さんを探す」ではなく「お客さんに見つけてもらう」という仕事の探し方をしているので、クリアコードがいろいろ情報発信をすることはお金を稼ぐためにとても大事なことです。ここにいろいろ記事を書くこともそうですし、イベントで発表することもそうです。そして、イベントを発表するときに役立つのがRabbitです。

Rabbitは私が大学にいたときに研究関係の発表をするために作り始めたものです。私にとってはRubyで作ることが大事だったので、プレゼンテーションツールを作るためにRubyでできないことがあれば、それらをRubyでできるようにしながら作ってきました。たとえば、PDF出力機能GUI・画像処理・マルチメディア機能などです。これらの機能があるからRubyを使っているという人がいるといいな。

Rabbitはすごくヒットしているツールではありません。RubyのイベントでもRabbitを使っている人は極少数派です。ただ、まつもとさんがMagicPointからRabbitに乗り換えたので、ヒットしていなくても私は満足です。みんながまつもとさんのいい話を聴けるのは私のおかげでもあるはず!(RubyKaigi 2017でまつもとさんに教えてもらったRabbitの問題の修正は25周年イベントには間に合わなかったなぁ。残念。)

本当のところを言うと、私は自分が使うために作っているのでユーザーが私だけでも満足だったりします。言い方を変えると、ヒットしていようがしていまいが私は別にどうでもいいです。そういえば、ここ数年、なぜか私以外のクリアコードのメンバーもRabbitを使っているのが不思議です。特に強制していないはずなんですが。。。

Ruby25周年ということで、クリアコードでのRubyの関わり方を一部紹介しました。Rubyを積極的に使っていく(人がいる)し、Rubyを使う中で得られた知見はRuby本体にフィードバックするし、まつもとさんのプレゼンをツールでサポートする、とかやっています。そうそう、RubyKaigiのスポンサーとしてお金を出すというのもやっていました。

これからも引き続き同じような感じでRubyと関わっていくつもりです。近い将来、Red Data Tools関連のことでも稼げるようになるといいなぁと思っています。今はあまり稼げていませんが、開発中に気づいたKeyErrorの改良案をRuby 2.6に入れたり、Rubyのcsvをよくしたり、Rubyでできることを増やしたり、といった点でRubyをよくすることはでき始めています。

タグ: Ruby
2018-02-23

Kafka ConnectのConnector開発入門

kafka-connect-fluentdを開発したので、その際に得た知見をまとめます。

Kafka Connectとは、簡単に説明するとKafka Consumersまたは、Kafka Producersの一種です。

あるデータソースからKafkaにデータを投入(Kafka Producers)したり、Kafkaから取り出したデータを別のところに流し(Kafka Consumers)たりするときにKafka ConsumersやKafka Producersでは、定型的なコードをたくさん書く必要がありました。 それを汎用化して、データを取り扱う箇所のコードのみを書けばいいようにまとめたものかKafka Connect APIです。

Connector Developer Guideを読めば、Kafka Connectorを開発する方法が書いてあります。 しかし、プロジェクトの初期設定は面倒なものです。定型コードを自動生成できるコマンドを使うと開発を始めるのが楽になります。

$ mvn -e archetype:generate -B -DarchetypeGroupId=io.confluent.maven \
      -DarchetypeArtifactId=kafka-connect-quickstart \
      -DarchetypeVersion=0.10.0.0 \
      -Dpackage=org.fluentd.kafka \
      -DgroupId=org.fluentd.kafka \
      -DartifactId=kafka-connect-fluentd \
      -Dversion=0.0.1

参考: https://github.com/jcustenborder/kafka-connect-archtype

@jcustenborderさんは、Confluentに所属し、多くのKafka Connectorを公開している方です。

これでpom.xmlやソースコードの雛形などが一通り生成されるので、開発を始めることができます。

kafka-connect-fluentdの場合は、自動生成されたpom.xmlをbuild.gradleに変換して利用しています。

Kafka Connector Source

Kafka Producerに対応するConnectorがSourceです。あるデータソースからKafkaにデータを書き込むために使用します。

kafka-connect-fluentd の場合は、Fluentdのout_forwardからkafka-connect-fluentdのSourceにデータを投げてKafkaにデータを流します。

FluentdSourceConnector

Sourceを開発する場合、以下を実装する必要があります。kafka-connect-fluentdの場合は以下のような名前にしました。

  • FluentdSourceConnector (SourceConnectorを継承)
  • FluentdSourceConnectorConfig (AbstractConfigを継承)
  • FluentdSourceTask (SourceTaskを継承)
FluentdSourceConnector

SourceConnectorを継承し、必要なメソッドを実装します。

  • public String version()
    • バージョン文字列を返します
  • public List<Map<String, String>> taskConfigs(int taskMax)
    • マルチスレッドに対応させたい場合は、taskMaxの値に応じて返り値のListMap<String,String>を詰め込みます
    • 設定ファイル(properties)に指定した値をそのままセットします
  • public void start(Map<String, String> properties)
    • Connector開始時にやることを書きます
  • public Class<? extends Task> taskClass()
    • taskクラス (FluentdSourceTaks.class) を返します
  • public void stop()
    • Connectorの終了時にやることを書きます
  • public ConfigDef config()
    • 設定の定義(FluentdSourceConnectorConfig.conf())を返します
FluentdSourceConnectorConfig

public static ConfigDef conf()ConfigDefのAPIを使って設定を定義します。

    public static ConfigDef conf() {
        return new ConfigDef()
                .define(FLUENTD_PORT, Type.INT, 24224, Importance.HIGH,
                        "Port number to listen. Default: 24224")
                .define(FLUENTD_BIND, Type.STRING, "0.0.0.0", Importance.HIGH,
                        "Bind address to listen. Default: 0.0.0.0");
    }

define()は引数が5つあり、前から順に名前、型、デフォルト値、重要度、説明です。引数の数が異なるdefine()もありますが、詳細はjavadocを参照してください。

FluentdSourceTask

Sourceを実装する際に最も重要なクラスです。以下のメソッドを実装します。

  • public void start(Map<String, String> properties)
    • poll()から定期的にアクセスされる共通のリソースを準備します。daemonやスレッドが必要な場合は、ここで起動します。
  • public List<SourceRecord> poll()
    • Kafka Connectのフレームワーク側から定期的に呼ばれます。FluentdSourceTaskではキューに溜めたデータをList<SourceRecord>に詰めて返します。
  • public void stop()
    • このタスクを止めるときに、必要な処理があれば書きます。start で起動したdaemonやスレッドをここで停止します。
  • public String version()
    • バージョン文字列を返します。定型のコードなので詳細は省略します。

このクラスは複数のスレッドから使用される可能性があります。特にpoll()でアクセスするインスタンス変数などについてはマルチスレッドを意識したコードを書く必要があります。

Kafka Connector Sink

Kafka Consumerに対応するConnectorがSinkです。Kafkaから取り出したデータをどこかに書き込みます。

kafka-connect-fluentd の場合は、Kafkaからデータを取り出してkafka-connect-fluentdのSinkからFluentdのin_forwardへデータを流します。

FluentdSinkConnector

Sinkを開発する場合、以下を実装する必要があります。kafka-connect-fluentdの場合は以下のような名前にしました。

  • FluentdSinkConnector (SinkConnectorを継承)
  • FluentdSinkConnectorConfig (AbstractConfigを継承)
  • FluentdSinkTask (SinkTaskを継承)
FluentdSinkConnector

SinkConnectorを継承し必要なメソッドを実装します。

  • public String version()
    • バージョン文字列を返します
  • public Class<? extends Task> taskClass()
    • taskクラス(FluentdSinkTask.class)を返します
  • public List<Map<String, String>> taskConfigs(int maxTasks)
    • マルチスレッドに対応させたい場合は、taskMaxの値に応じて返り値のListMap<String,String>を詰め込みます
    • 設定ファイル(properties)に指定した値をそのままセットします
  • public void stop()
    • Connectorの終了時にやることがあれば書きます
  • public ConfigDef config()
    • 設定の定義('FluentdSinkConnectorConfig.conf()')を返します
FluentdSinkConnectorConfig

FluentdSourceConnectorConfigと同じようにpublic static ConfigDef conf()ConfigDefのAPIを使って設定を定義します。

FluentdSinkTask

Sinkを実装する際に最も重要なクラスです。以下のメソッドを実装します。

  • public String version()
    • バージョン文字列を返します。定型のコードなので詳細は省略します。
  • public void start(Map<String, String> properties)
    • このタスクを初期化します。FluentdSinkTaskではFluencyのインスタンスを作成しています。
  • public void put(Collection<SinkRecord> collection)
    • Kafkaから定期的に呼ばれます。FluentdSinkTaskではFluencyでデータをFluentdに送信しています。
  • public void flush(Map<TopicPartition, OffsetAndMetadata> map)
    • これをきちんと実装すれば exactly-once に対応できるらしいです。しかし Fluentd は exactly-once に対応していないので、今のところ単にFluencyのバッファをflushするだけにしています。
  • public void stop()
    • タスクの停止にやることがあれば、中身を実装します。FluentdSinkTaskではFluencyが全てのバッファをflushするのを待ちます。
スキーマについて

SchemaBuilderStructを利用すれば任意のデータ構造をスキーマにマッピングできるのですが、MessagePackとKafka ConnectのSchemaで表現できる型に違いがあるようです。 サポートしている型の違いついて理解を深め、差異をうまく吸収できるような実装をしたいと考えています。

パッケージング

GradleでMaven Centralにライブラリを公開する - たごもりすメモを参考にしてGradleでやりました。

依存関係にKafkaが入っているけど、配布するjarにはKafka関係のファイルを入れてはいけないことに注意が必要でした。

まとめ

思っていたよりも簡単に実装できたのでKafkaにデータを投入したり、Kafkaからデータを取り込みたいのだけどProducer/Consumerはちょっと難しいという場合にKafka Connectを検討してみるとよいのではないでしょうか。

タグ: Fluentd
2018-02-26

Fluentd用のベンチマークツールを開発しました

PLAZMA OSS Day: TD Tech Talk 2018 – Cooperative works for Fluentd Community の補足記事です。

背景

kafka-connect-fluentdfluent-pluguin-kafkaの性能を比較したいと考えたときに、高い負荷を与える簡単に使えるツールがありませんでした。

dummerとFluentd組み込みのin_tailを使用すれば、負荷をかけることができるのはわかっていましたが、高い負荷を与えるためには複数のFluentdを動作させる必要があり、セットアップや管理が煩雑になることがわかっていました。

例えば、以下のようにたくさんのFluentdプロセスで1つのFluentdに負荷をかけるような感じになります。

dummer+in_tailのイメージ

参考: https://github.com/okumin/influent-benchmark

これはこれできちんと動きますが、以下の点で手軽ではありませんでした。

  • dummer でファイルを作る必要があるのでクライアント側にも Fluentd(in_tail + out_forward) が必須になってしまう
    • in_tailはposファイルを作るので、条件を変えながら繰り返し測定しようとすると、セットアップとクリーンアップが必要となる
  • dummerで流量制御しようとするとI/Oの速度が上限になってしまう
    • I/Oの速度を越えようとすると、上の図のように複数のFluentdプロセスを起動する必要があります

fluent-benchmark-client

そこでdummer + in_tailよりも簡単に使うことができる fluent-benchmark-client というコマンドラインツールを作りました。

Fluentd Forward Protocolを扱う部分はFluencyを採用しました。*1

Kotlinを採用した理由は以下の通りです。

  • Javaで書くのと性能は変わらない
  • Javaで書かれたライブラリーは全部使える
  • IntelliJ IDEAのサポートがある
  • Null安全を試してみたかった
  • 型推論を試したかった
  • コルーチンどうなの?というのを試したかった

初めて書くので慣れてないという以外にデメリットはありませんでした。

fluent-benchmark-clientを使うと、1つのプロセスから大量のイベントをFluentdに送信することができます。

fluent-benchmark-clientのイメージ

使い方

0.5.0がリリース済みなので使えます。

$ tar xf fluent-benchmark-client-0.5.0.tar
$ cd fluent-benchmark-client
$ ./bin/fluent-benchmark-client --max-buffer-size=4g --period=1m --n-events-per-sec=1000

これで、localhost:24224に1000 events/secを1分間送ることができます。 送る内容は、{ "message": "Hello, Fluentd! This is a test message." } です。 --max-buffer-size=4gはFluencyのバッファのサイズを指定しています。これを指定することにより大きなワークロードにも対応可能です。

送信するワークロードの内容はコマンドラインオプションで変更することができます。 また、ファイルから読むこともできるので複雑なワークロードにも対応可能です。対応しているフォーマットはLTSVJSONL*2です。 ここで利用するファイルはdummerで生成することを想定しています。

工夫したところ

初期の実装では、以下のようにビジーループの中でMap<String, Object>を作っていました。さらにemitの内部でMessagePackへの変換が行われていたため速度が出ていませんでした。 約50万 events/sec 出てたかどうかも怪しいという記録が手元に残っていました。

while (true) {
    fluency.emit(tag, mapOf("message" to "Hello Kotlin!"))
}

これを以下のように、ループの外でMessagePackに変換してからemitすると、手元のマシンで200万 events/secくらいの速度でemitできるようになりました。

val buffer = ByteArrayOutputStream()
val packer = MessagePack.newDefaultPacker(buffer)
packer.packMapHeader(1)
packer.packString("message")
packer.packString("Hello Kotlin!")
packer.flush()
packer.close()
val mapValue = buffer.toByteArray()
while (true) {
   fluency.emit(tag, mapValue)
}

ファイルからLTSVやJSONLを読み込む場合も、ファイルを全部読み込んでMessagePackに変換してからemitするようにしているので、速度は変わりません。 しかし、全データをメモリーに載せるので注意が必要です。

まだできていないこと

まだ、できていないことがいくつかあります。

  • マルチワーカー化
    • Kotlinのコルーチンへの理解が進んだら着手したいです
  • SSL/TLS対応
    • Fluencyが対応したら対応したいです

まとめ

kafka-connect-fluentdとfluent-plugin-kafkaの性能を比較するために、ちょうどよいベンチマークツールがなかったのでfluent-benchmark-clientを開発しました。

*1 軽く試した感じではロガー系のライブラリーでは最速でした

*2 主にk8sの標準のロガーで出力されたログも使用できるはずです

タグ: Fluentd
2018-02-27

KafkaのメトリクスをFluentdに送信するプラグインを書きました

PLAZMA OSS Day: TD Tech Talk 2018 – Cooperative works for Fluentd Community の補足記事です。

背景

kafka-connect-fluentdfluent-pluguin-kafkaの性能を比較したいと考えたときに、同じ指標で性能を比較したいと考えていました。 kafka-connect-fluentdのFluentdSourceConnectorとfluent-plugin-kafkaのoutputプラグインはどちらもKafkaに書き込むので、Kafkaのスループットを見ればよいはずです。

幸いKafkaにはメトリックスを取るためのAPIが提供されていました。

しかしKafkaに組み込みのKafkaCSVMetricsReporterは、動きませんでした*1。他にもいくつか既存のMetricsReporterを探してみましたがちょうどよいものはありませんでした。 Kafkaのメトリックスも、同じフィールドに型の異なる値が入っていることがあり、そのままでは扱いづらいものでした。

そこで、全てのメトリックスをFluentdに流せば、それ以降はFluentdのプラグインで好きなように加工できるので素早くデータを可視化するための環境を作ることができそうだと考えました。 またKafkaの提供するメトリックスは大量にあるので、kafka-connect-fluentdとfluent-plugin-kafkaの性能比較に使えるものだけを選別することも簡単にできそうだと考えていました。

kafka-fluent-metrics-reporter

kafka-fluent-metrics-reporterを作りました。

KafkaのプラグインはScalaで書かれていたり、Javaで書かれていたり、Kotlinで書かれていたり様々なので、これもKotlinで実装しました。

特に工夫したところはなく、Kafkaの提供してくれるメトリックスをmapにつめてFluentdに送るだけでした。

使い方

こちらも未リリースなので自分でビルドする必要があります。

$ git clone https://github.com/okkez/kafka-fluent-metrics-reporter.git
$ cd kafka-fluent-metrics-reporter
$ ./gradlew shadowJar
$ cp build/libs/kafka-fluent-metrics-reporter-1.0-SNAPSHOT-all.jar /path/to/kafka_2.11-1.0.0/libs

以下の設定をKafka Serverの設定ファイル server.properties に追加します。

kafka.metrics.reporters=org.fluentd.kafka.metrics.KafkaFluentMetricsReporter
kafka.metrics.polling.interval.secs=5
kafka.fluent.metrics.enabled=true
kafka.fluent.metrics.host=localhost
kafka.fluent.metrics.port=24224
kafka.fluent.metrics.tagPrefix=kafka-metrics

Fluentd側は以下のように設定します。このように設定してしばらく流してみると、大体様子がわかると思います。

<source>
  @type forward
  port 24224
</source>

<match kafka-metrics.**>
  @type copy
  <store>
    @type file
    path log/${tag}
    <buffer tag>
    </buffer>
  </store>
  <store>
    @type stdout
  </store>
</match>

タグにメトリックスの名前が入ってくるので、必要なメトリックスをタグで絞り込むことができます。

まとめ

kafka-fluent-metrics-reporterを使うことで、Fluentdを経由してKafkaのメトリックスを簡単に可視化することができました。

InfluxDBに直接流すものやPrometheus用のexporterなどもありましたが、Fluentdに流すものはなかったので作りました。

参考

他の Kafka metrics reporter の実装例です。

*1 問題は報告済みです

タグ: Fluentd
2018-02-28

«前月 最新 翌月»
2008|05|06|07|08|09|10|11|12|
2009|01|02|03|04|05|06|07|08|09|10|11|12|
2010|01|02|03|04|05|06|07|08|09|10|11|12|
2011|01|02|03|04|05|06|07|08|09|10|11|12|
2012|01|02|03|04|05|06|07|08|09|10|11|12|
2013|01|02|03|04|05|06|07|08|09|10|11|12|
2014|01|02|03|04|05|06|07|08|09|10|11|12|
2015|01|02|03|04|05|06|07|08|09|10|11|12|
2016|01|02|03|04|05|06|07|08|09|10|11|12|
2017|01|02|03|04|05|06|07|08|09|10|11|12|
2018|01|02|03|04|05|06|07|08|09|
タグ: